Add the crate skeleton

This commit is contained in:
Maksim Kirillov 2025-07-24 14:50:49 +02:00
parent 65a3ebdbe2
commit 088331a933
17 changed files with 2181 additions and 2 deletions

196
Cargo.lock generated
View file

@ -750,6 +750,29 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "automerge"
version = "0.5.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f0dae93622d3c6850d196503480004576249e0e391bddb3f54600974d92a790"
dependencies = [
"cfg-if",
"flate2",
"fxhash",
"hex",
"im",
"itertools 0.13.0",
"leb128",
"serde",
"sha2 0.10.9",
"smol_str",
"thiserror 1.0.69",
"tinyvec",
"tracing",
"unicode-segmentation",
"uuid",
]
[[package]]
name = "axum"
version = "0.7.9"
@ -1178,6 +1201,15 @@ dependencies = [
"serde",
]
[[package]]
name = "bitmaps"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "031043d04099746d8db04daf1fa424b2bc8bd69d92b25962dcde24da39ab64a2"
dependencies = [
"typenum",
]
[[package]]
name = "bitvec"
version = "1.0.1"
@ -2889,6 +2921,34 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "eigensync"
version = "0.1.0"
dependencies = [
"anyhow",
"automerge",
"chrono",
"clap 4.5.41",
"futures",
"libp2p",
"opentelemetry",
"opentelemetry-prometheus",
"prometheus",
"rusqlite",
"rusqlite_migration",
"serde",
"serde_cbor",
"serde_json",
"tempfile",
"thiserror 1.0.69",
"time 0.3.41",
"tokio",
"toml 0.8.23",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]]
name = "either"
version = "1.15.0"
@ -4582,6 +4642,20 @@ dependencies = [
"xmltree",
]
[[package]]
name = "im"
version = "15.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0acd33ff0285af998aaf9b57342af478078f53492322fafc47450e09397e0e9"
dependencies = [
"bitmaps",
"rand_core 0.6.4",
"rand_xoshiro",
"sized-chunks",
"typenum",
"version_check",
]
[[package]]
name = "image"
version = "0.25.6"
@ -4977,6 +5051,12 @@ dependencies = [
"spin 0.9.8",
]
[[package]]
name = "leb128"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67"
[[package]]
name = "libappindicator"
version = "0.9.0"
@ -6759,6 +6839,51 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "opentelemetry"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b69a91d4893e713e06f724597ad630f1fa76057a5e1026c0ca67054a9032a76"
dependencies = [
"futures-core",
"futures-sink",
"js-sys",
"once_cell",
"pin-project-lite",
"thiserror 1.0.69",
]
[[package]]
name = "opentelemetry-prometheus"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e1a24eafe47b693cb938f8505f240dc26c71db60df9aca376b4f857e9653ec7"
dependencies = [
"once_cell",
"opentelemetry",
"opentelemetry_sdk",
"prometheus",
"protobuf",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae312d58eaa90a82d2e627fd86e075cf5230b3f11794e2ed74199ebbe572d4fd"
dependencies = [
"async-trait",
"futures-channel",
"futures-executor",
"futures-util",
"glob",
"lazy_static",
"once_cell",
"opentelemetry",
"ordered-float 4.6.0",
"thiserror 1.0.69",
]
[[package]]
name = "option-ext"
version = "0.2.0"
@ -6774,6 +6899,15 @@ dependencies = [
"num-traits",
]
[[package]]
name = "ordered-float"
version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bb71e1b3fa6ca1c61f383464aaf2bb0e2f8e772a1f01d486832464de363b951"
dependencies = [
"num-traits",
]
[[package]]
name = "ordered-stream"
version = "0.2.0"
@ -7463,6 +7597,21 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prometheus"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot 0.12.4",
"protobuf",
"thiserror 1.0.69",
]
[[package]]
name = "prometheus-client"
version = "0.22.3"
@ -7506,6 +7655,12 @@ dependencies = [
"unarray",
]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "ptr_meta"
version = "0.1.4"
@ -7815,6 +7970,15 @@ dependencies = [
"rand_core 0.9.3",
]
[[package]]
name = "rand_xoshiro"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa"
dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "raw-window-handle"
version = "0.6.2"
@ -8149,6 +8313,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e"
dependencies = [
"bitflags 2.9.1",
"chrono",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink 0.9.1",
@ -8157,6 +8322,16 @@ dependencies = [
"time 0.3.41",
]
[[package]]
name = "rusqlite_migration"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "923b42e802f7dc20a0a6b5e097ba7c83fe4289da07e49156fecf6af08aa9cd1c"
dependencies = [
"log",
"rusqlite",
]
[[package]]
name = "rust_decimal"
version = "1.37.2"
@ -8755,7 +8930,7 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c"
dependencies = [
"ordered-float",
"ordered-float 2.10.1",
"serde",
]
@ -9174,6 +9349,16 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]]
name = "sized-chunks"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d69225bde7a69b235da73377861095455d298f2b970996eec25ddbb42b3d1e"
dependencies = [
"bitmaps",
"typenum",
]
[[package]]
name = "slab"
version = "0.4.10"
@ -9228,6 +9413,15 @@ dependencies = [
"serde",
]
[[package]]
name = "smol_str"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd538fb6910ac1099850255cf94a94df6551fbdd602454387d0adb2d1ca6dead"
dependencies = [
"serde",
]
[[package]]
name = "snow"
version = "0.9.6"

View file

@ -1,6 +1,6 @@
[workspace]
resolver = "2"
members = [
members = [ "eigensync",
"electrum-pool",
"monero-rpc",
"monero-rpc-pool",

59
eigensync/Cargo.toml Normal file
View file

@ -0,0 +1,59 @@
[package]
name = "eigensync"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "eigensync-server"
path = "src/bin/server.rs"
required-features = ["server"]
[features]
default = ["client"]
server = ["dep:clap", "dep:toml", "dep:tracing-subscriber"]
client = []
metrics = ["dep:opentelemetry", "dep:opentelemetry-prometheus", "dep:prometheus"]
[dependencies]
# Core dependencies
anyhow = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time", "fs", "signal"] }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true }
# Automerge
automerge = "0.5"
# Networking
libp2p = { workspace = true, features = ["request-response", "tcp", "noise", "yamux", "identify", "ping", "mdns"] }
futures = { workspace = true }
# Database
rusqlite = { version = "0.32", features = ["bundled", "chrono"] }
rusqlite_migration = "1.2"
# Serialization
serde_cbor = "0.11"
# Time handling
chrono = { version = "0.4", features = ["serde"] }
time = { version = "0.3", features = ["serde", "formatting"] }
# Server-only dependencies
clap = { version = "4.0", features = ["derive"], optional = true }
toml = { version = "0.8", optional = true }
tracing-subscriber = { workspace = true, optional = true }
# Metrics (optional)
opentelemetry = { version = "0.23", optional = true }
opentelemetry-prometheus = { version = "0.16", optional = true }
prometheus = { version = "0.13", optional = true }
[dev-dependencies]
tempfile = "3.0"
[lints]
workspace = true

171
eigensync/src/bin/server.rs Normal file
View file

@ -0,0 +1,171 @@
//! Eigensync server binary
use clap::{Parser, Subcommand};
use eigensync::{Server, ServerConfig};
use std::path::PathBuf;
use tracing::{info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
/// Command-line interface for the eigensync server binary.
///
/// NOTE(review): `listen_address`, `port`, and `max_peers` carry clap
/// defaults, so `run_server` always overwrites the corresponding values from
/// a configuration file with them — confirm this precedence is intended.
#[derive(Parser)]
#[command(name = "eigensync-server")]
#[command(about = "Eigensync distributed state synchronization server")]
#[command(version)]
struct Cli {
    /// Configuration file path
    #[arg(short, long)]
    config: Option<PathBuf>,
    /// Database path
    #[arg(long)]
    database: Option<PathBuf>,
    /// Listen address
    #[arg(short, long, default_value = "0.0.0.0")]
    listen_address: String,
    /// Listen port
    #[arg(short, long, default_value = "9944")]
    port: u16,
    /// Maximum number of peers
    #[arg(long, default_value = "100")]
    max_peers: u32,
    /// Enable debug logging
    #[arg(short, long)]
    debug: bool,
    /// Enable JSON logging
    #[arg(long)]
    json: bool,
    /// Optional subcommand; `main` falls back to `Commands::Run` when omitted.
    #[command(subcommand)]
    command: Option<Commands>,
}
/// Subcommands understood by the server binary.
#[derive(Clone, Subcommand)]
enum Commands {
    /// Run the server
    Run,
    /// Generate default configuration
    GenerateConfig {
        /// Output file path; when absent the TOML is printed to stdout.
        #[arg(short, long)]
        output: Option<PathBuf>,
    },
}
/// Binary entry point: parse flags, wire up tracing, then dispatch to the
/// selected subcommand (defaulting to running the server).
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cli = Cli::parse();

    // Initialize logging: --debug raises the crate to trace level.
    let filter = if cli.debug {
        "debug,eigensync=trace"
    } else {
        "info,eigensync=debug"
    };
    // Both output formats share the same filtered registry; only the
    // formatting layer differs.
    let registry = tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::new(filter));
    if cli.json {
        registry.with(tracing_subscriber::fmt::layer().json()).init();
    } else {
        registry.with(tracing_subscriber::fmt::layer().pretty()).init();
    }

    info!("Starting eigensync server v{}", env!("CARGO_PKG_VERSION"));

    // No subcommand means "run the server".
    match cli.command.clone().unwrap_or(Commands::Run) {
        Commands::Run => run_server(cli).await?,
        Commands::GenerateConfig { output } => generate_config(output)?,
    }
    Ok(())
}
/// Build the effective `ServerConfig` from (future) config file plus CLI
/// flags, then run the server until it finishes or Ctrl-C arrives.
///
/// NOTE(review): the CLI args have clap defaults, so listen address/port and
/// max_peers from a config file will always be clobbered below — confirm
/// this precedence once config-file loading is implemented.
async fn run_server(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
    // Load or create configuration (file loading is still a TODO; every
    // branch currently yields the defaults).
    let mut config = if let Some(config_path) = cli.config {
        if config_path.exists() {
            info!("Loading configuration from {:?}", config_path);
            // TODO: Implement config file loading
            ServerConfig::default()
        } else {
            warn!("Configuration file {:?} not found, using defaults", config_path);
            ServerConfig::default()
        }
    } else {
        info!("No configuration file specified, using defaults");
        ServerConfig::default()
    };
    // Override config with CLI arguments
    if let Some(database_path) = cli.database {
        config.database_path = database_path;
    }
    config.listen_address = cli.listen_address;
    config.listen_port = cli.port;
    config.max_peers = cli.max_peers;
    info!("Server configuration:");
    info!(" Database: {:?}", config.database_path);
    info!(" Listen: {}:{}", config.listen_address, config.listen_port);
    info!(" Max peers: {}", config.max_peers);
    // Create and run server
    let server = Server::new(config).await?;
    // Set up signal handling for graceful shutdown
    let shutdown_signal = async {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to install CTRL+C signal handler");
        info!("Received shutdown signal");
    };
    // Run server with graceful shutdown: whichever future completes first
    // (server exit or Ctrl-C) decides the outcome.
    tokio::select! {
        result = server.run() => {
            match result {
                Ok(_) => info!("Server completed successfully"),
                Err(e) => {
                    tracing::error!("Server error: {}", e);
                    return Err(e.into());
                }
            }
        }
        _ = shutdown_signal => {
            info!("Shutting down server gracefully");
        }
    }
    Ok(())
}
/// Serialize the default `ServerConfig` as pretty TOML and either write it
/// to `output` or print it to stdout when no path was given.
fn generate_config(output: Option<PathBuf>) -> Result<(), Box<dyn std::error::Error>> {
    let config_toml = toml::to_string_pretty(&ServerConfig::default())?;
    if let Some(path) = output {
        std::fs::write(&path, config_toml)?;
        info!("Generated configuration file: {:?}", path);
    } else {
        println!("{}", config_toml);
    }
    Ok(())
}

View file

@ -0,0 +1,72 @@
//! libp2p networking behaviour for eigensync client
use crate::protocol::{EigensyncRequest, EigensyncResponse};
use crate::types::{Result, PeerId};
/// libp2p behaviour for eigensync client (placeholder)
///
/// Currently field-less; the commented fields sketch the intended
/// request-response / identify / ping composition.
pub struct ClientBehaviour {
    // TODO: Add actual behaviour components
    // request_response: RequestResponse<EigensyncCodec>,
    // identify: Identify,
    // ping: Ping,
}
impl ClientBehaviour {
    /// Create a new client behaviour.
    ///
    /// Placeholder: no components are wired up yet, so construction is free.
    pub fn new() -> Self {
        tracing::debug!("Creating client behaviour");
        // TODO: Initialize behaviour components
        Self {
            // request_response: RequestResponse::new(...),
            // identify: Identify::new(...),
            // ping: Ping::default(),
        }
    }

    /// Send a request to the server.
    ///
    /// Placeholder: every variant currently panics via `todo!` until the
    /// request-response transport is implemented.
    pub async fn send_request(
        &mut self,
        server_peer_id: PeerId,
        request: EigensyncRequest,
    ) -> Result<EigensyncResponse> {
        tracing::debug!("Sending request to server {}: {:?}", server_peer_id, request);
        // TODO: Implement request sending for each variant.
        match request {
            EigensyncRequest::GetChanges(..) => todo!("GetChanges request not implemented"),
            EigensyncRequest::SubmitChanges(..) => todo!("SubmitChanges request not implemented"),
            EigensyncRequest::Ping(..) => todo!("Ping request not implemented"),
            EigensyncRequest::GetStatus(..) => todo!("GetStatus request not implemented"),
        }
    }
}
// Default simply delegates to `ClientBehaviour::new`.
impl Default for ClientBehaviour {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Smoke test: constructing the (currently field-less) behaviour works.
    #[test]
    fn test_behaviour_creation() {
        let _behaviour = ClientBehaviour::new();
        // Behaviour creation should not panic
    }
}

View file

@ -0,0 +1,81 @@
//! Client database layer for caching documents and metadata
use crate::types::{Result, ActorId};
use rusqlite::Connection;
use std::path::Path;
/// Client database for caching Automerge documents and metadata
///
/// Wraps a single synchronous `rusqlite::Connection`.
pub struct ClientDatabase {
    connection: Connection,
}
impl ClientDatabase {
    /// Open or create a client cache database
    ///
    /// NOTE(review): `rusqlite::Connection::open` is synchronous; calling it
    /// inside an async fn blocks the executor thread. Consider
    /// `tokio::task::spawn_blocking` once real I/O lands — confirm intent.
    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        tracing::info!("Opening client cache database at {:?}", path.as_ref());
        // TODO: Implement actual database opening and migration
        let connection = Connection::open(path)?;
        Ok(Self { connection })
    }
    /// Store an Automerge document
    ///
    /// Placeholder: `document_data` is currently ignored and `Ok(())` is
    /// returned unconditionally.
    pub async fn store_document(
        &self,
        document_id: &str,
        document_data: &[u8],
    ) -> Result<()> {
        tracing::debug!("Storing document {}", document_id);
        // TODO: Implement document storage
        Ok(())
    }
    /// Load an Automerge document
    ///
    /// Placeholder: always returns `Ok(None)` until storage is implemented.
    pub async fn load_document(&self, document_id: &str) -> Result<Option<Vec<u8>>> {
        tracing::debug!("Loading document {}", document_id);
        // TODO: Implement document loading
        Ok(None)
    }
    /// Store document metadata
    ///
    /// Placeholder: `last_sync` and `heads` are accepted but not persisted.
    pub async fn store_metadata(
        &self,
        document_id: &str,
        last_sync: chrono::DateTime<chrono::Utc>,
        heads: &[automerge::ChangeHash],
    ) -> Result<()> {
        tracing::debug!("Storing metadata for document {}", document_id);
        // TODO: Implement metadata storage
        Ok(())
    }
    /// Load document metadata
    ///
    /// Placeholder: always returns `Ok(None)`.
    pub async fn load_metadata(
        &self,
        document_id: &str,
    ) -> Result<Option<(chrono::DateTime<chrono::Utc>, Vec<automerge::ChangeHash>)>> {
        tracing::debug!("Loading metadata for document {}", document_id);
        // TODO: Implement metadata loading
        Ok(None)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    // Opening a database at a fresh path must create it successfully.
    #[tokio::test]
    async fn test_client_database_creation() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test_cache.db");
        let db = ClientDatabase::open(db_path).await;
        assert!(db.is_ok());
    }
}

View file

@ -0,0 +1,156 @@
//! Automerge document management for client
use crate::types::{Result, ActorId};
use automerge::{AutoCommit, ChangeHash};
use serde_json::Value;
use std::collections::HashMap;
/// Manager for Automerge documents
///
/// Keeps one in-memory `AutoCommit` per document id; every document is
/// stamped with this manager's `actor_id` so concurrent edits merge cleanly.
pub struct DocumentManager {
    documents: HashMap<String, AutoCommit>,
    actor_id: ActorId,
}
impl DocumentManager {
    /// Create a new document manager
    pub fn new(actor_id: ActorId) -> Self {
        tracing::debug!("Creating document manager for actor {}", actor_id);
        Self {
            documents: HashMap::new(),
            actor_id,
        }
    }
    /// Get or create a document for a swap
    ///
    /// A freshly created document gets this manager's actor id.
    pub fn get_or_create_document(&mut self, document_id: &str) -> Result<&mut AutoCommit> {
        tracing::debug!("Getting or creating document {}", document_id);
        if !self.documents.contains_key(document_id) {
            // TODO: Try to load from cache first
            let mut doc = AutoCommit::new();
            doc.set_actor(self.actor_id.0.clone());
            self.documents.insert(document_id.to_string(), doc);
        }
        // Safe: the key was inserted above if it was missing.
        Ok(self.documents.get_mut(document_id).unwrap())
    }
    /// Append a swap state to a document
    ///
    /// Placeholder: returns an empty change set until implemented.
    pub fn append_swap_state(
        &mut self,
        document_id: &str,
        state_json: Value,
        timestamp: chrono::DateTime<chrono::Utc>,
    ) -> Result<Vec<automerge::Change>> {
        tracing::debug!("Appending swap state to document {}", document_id);
        let doc = self.get_or_create_document(document_id)?;
        // TODO: Implement state appending to Automerge document
        // Structure: { "states": [ { "timestamp": "...", "state": {...} } ] }
        // For now, return empty changes
        Ok(vec![])
    }
    /// Get the latest state from a document
    ///
    /// Placeholder: always returns `Ok(None)`; the lookup only checks the
    /// document exists. (The unused `doc` binding will warn until the TODO
    /// is implemented.)
    pub fn get_latest_state(&self, document_id: &str) -> Result<Option<Value>> {
        tracing::debug!("Getting latest state from document {}", document_id);
        if let Some(doc) = self.documents.get(document_id) {
            // TODO: Implement state retrieval from Automerge document
            // Find entry with latest timestamp
        }
        Ok(None)
    }
    /// Apply changes to a document
    ///
    /// NOTE(review): this feeds each change's raw bytes through
    /// `load_incremental`; automerge also exposes `apply_changes` which takes
    /// `Change` values directly — confirm `load_incremental` accepts single
    /// change chunks in automerge 0.5 before relying on this.
    pub fn apply_changes(
        &mut self,
        document_id: &str,
        changes: &[automerge::Change],
    ) -> Result<()> {
        tracing::debug!("Applying {} changes to document {}", changes.len(), document_id);
        let doc = self.get_or_create_document(document_id)?;
        for change in changes {
            doc.load_incremental(change.raw_bytes())?;
        }
        Ok(())
    }
    /// Get document heads
    ///
    /// Unknown document ids yield an empty head list.
    pub fn get_heads(&mut self, document_id: &str) -> Vec<ChangeHash> {
        if let Some(doc) = self.documents.get_mut(document_id) {
            doc.get_heads()
        } else {
            vec![]
        }
    }
    /// Generate changes since given heads
    ///
    /// Unknown document ids yield an empty change list.
    pub fn changes_since(
        &mut self,
        document_id: &str,
        heads: &[ChangeHash],
    ) -> Result<Vec<automerge::Change>> {
        if let Some(doc) = self.documents.get_mut(document_id) {
            let changes = doc.get_changes(heads);
            Ok(changes.into_iter().cloned().collect())
        } else {
            Ok(vec![])
        }
    }
    /// Serialize a document for storage
    ///
    /// Returns `Ok(None)` for unknown document ids.
    pub fn serialize_document(&mut self, document_id: &str) -> Result<Option<Vec<u8>>> {
        if let Some(doc) = self.documents.get_mut(document_id) {
            let bytes = doc.save();
            Ok(Some(bytes))
        } else {
            Ok(None)
        }
    }
    /// Load a document from serialized data
    ///
    /// Replaces any in-memory document under the same id and re-stamps it
    /// with this manager's actor id.
    pub fn load_document(&mut self, document_id: &str, data: &[u8]) -> Result<()> {
        tracing::debug!("Loading document {} from {} bytes", document_id, data.len());
        let mut doc = AutoCommit::load(data)?;
        doc.set_actor(self.actor_id.0.clone());
        self.documents.insert(document_id.to_string(), doc);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_document_manager_creation() {
        let actor_id = ActorId(automerge::ActorId::random());
        let _manager = DocumentManager::new(actor_id);
        // Manager creation should not panic
    }
    // get_or_create_document must be idempotent for the same id.
    #[test]
    fn test_get_or_create_document() {
        let actor_id = ActorId(automerge::ActorId::random());
        let mut manager = DocumentManager::new(actor_id);
        // First call should create the document
        let _doc1 = manager.get_or_create_document("test-doc").unwrap();
        // Second call should return the existing document (test that no panic occurs)
        let _doc2 = manager.get_or_create_document("test-doc").unwrap();
        // Document should exist in the manager
        assert!(manager.documents.contains_key("test-doc"));
    }
}

124
eigensync/src/client/mod.rs Normal file
View file

@ -0,0 +1,124 @@
//! Client-side components for eigensync
use crate::types::{Result, PeerId, ActorId};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;
pub mod database;
pub mod behaviour;
pub mod sync_loop;
pub mod document;
/// Configuration for the eigensync client
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientConfig {
    /// Path to client cache database
    pub cache_database_path: PathBuf,
    /// Server address to connect to
    pub server_address: String,
    /// Server port to connect to
    pub server_port: u16,
    /// Sync interval — how often the background loop contacts the server.
    pub sync_interval: Duration,
    /// Connection timeout
    pub connection_timeout: Duration,
    /// Actor ID for this client — identifies this device's edits in
    /// Automerge documents.
    pub actor_id: ActorId,
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
cache_database_path: PathBuf::from("eigensync_cache.sqlite"),
server_address: "127.0.0.1".to_string(),
server_port: 9944,
sync_interval: Duration::from_secs(30),
connection_timeout: Duration::from_secs(10),
actor_id: ActorId(automerge::ActorId::random()),
}
}
}
/// Main client struct (placeholder implementation)
pub struct Client {
    config: ClientConfig,
}
impl Client {
    /// Create a new client with the given configuration
    ///
    /// Placeholder: only stores the config; database and networking are TODO.
    pub async fn new(config: ClientConfig) -> Result<Self> {
        tracing::info!("Creating eigensync client with config: {:?}", config);
        // TODO: Initialize database, networking, etc.
        Ok(Self { config })
    }
    /// Start the client sync loop
    ///
    /// Placeholder: returns immediately.
    pub async fn start_sync(&mut self) -> Result<()> {
        tracing::info!("Starting eigensync client sync");
        // TODO: Implement client sync loop
        Ok(())
    }
    /// Append a swap state to the local document
    ///
    /// Placeholder: arguments are logged but not persisted yet.
    pub async fn append_swap_state(
        &mut self,
        swap_id: uuid::Uuid,
        state_json: serde_json::Value,
        timestamp: chrono::DateTime<chrono::Utc>,
    ) -> Result<()> {
        tracing::debug!("Appending swap state for {}: {:?}", swap_id, state_json);
        // TODO: Implement state appending
        // 1. Load Automerge document for swap_id
        // 2. Add new state entry with timestamp
        // 3. Generate patch
        // 4. Store locally and mark for sync
        Ok(())
    }
    /// Get the latest state for a swap
    ///
    /// Placeholder: always returns `Ok(None)`.
    pub async fn get_latest_swap_state(
        &self,
        swap_id: uuid::Uuid,
    ) -> Result<Option<serde_json::Value>> {
        tracing::debug!("Getting latest swap state for {}", swap_id);
        // TODO: Implement state retrieval
        // 1. Load Automerge document for swap_id
        // 2. Find entry with latest timestamp
        // 3. Return state JSON
        Ok(None)
    }
    /// Get client configuration
    pub fn config(&self) -> &ClientConfig {
        &self.config
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Defaults must match the documented localhost/9944/30 s values.
    #[test]
    fn test_client_config_default() {
        let config = ClientConfig::default();
        assert_eq!(config.server_address, "127.0.0.1");
        assert_eq!(config.server_port, 9944);
        assert_eq!(config.sync_interval, Duration::from_secs(30));
    }
    #[tokio::test]
    async fn test_client_creation() {
        let config = ClientConfig::default();
        let client = Client::new(config).await;
        assert!(client.is_ok());
    }
}

View file

@ -0,0 +1,104 @@
//! Client sync loop for periodic synchronization with server
use crate::client::behaviour::ClientBehaviour;
use crate::client::database::ClientDatabase;
use crate::types::{Result, PeerId};
use std::time::Duration;
/// Client sync loop for periodic synchronization
///
/// Owns the networking behaviour and local cache database; `run` ticks every
/// `sync_interval` against `server_peer_id`.
pub struct ClientSyncLoop {
    behaviour: ClientBehaviour,
    database: ClientDatabase,
    server_peer_id: PeerId,
    sync_interval: Duration,
}
impl ClientSyncLoop {
    /// Create a new client sync loop.
    pub fn new(
        behaviour: ClientBehaviour,
        database: ClientDatabase,
        server_peer_id: PeerId,
        sync_interval: Duration,
    ) -> Self {
        tracing::debug!("Creating client sync loop with interval {:?}", sync_interval);
        Self {
            behaviour,
            database,
            server_peer_id,
            sync_interval,
        }
    }

    /// Run the sync loop forever, syncing once per interval tick.
    ///
    /// Individual sync failures are logged and swallowed so one bad round
    /// does not terminate the loop.
    pub async fn run(mut self) -> Result<()> {
        tracing::info!("Starting client sync loop");
        let mut ticker = tokio::time::interval(self.sync_interval);
        loop {
            ticker.tick().await;
            if let Err(e) = self.perform_sync().await {
                tracing::warn!("Sync failed: {}", e);
            } else {
                tracing::debug!("Sync completed successfully");
            }
        }
    }

    /// Perform a single sync operation (placeholder).
    async fn perform_sync(&mut self) -> Result<()> {
        tracing::debug!("Performing sync with server {}", self.server_peer_id);
        // TODO: Implement sync logic:
        // 1. Get list of documents to sync
        // 2. For each document:
        //    - Get local heads/metadata
        //    - Request changes since last sync
        //    - Apply received changes
        //    - Submit any local changes
        //    - Update local metadata
        Ok(())
    }

    /// Sync a specific document (placeholder).
    pub async fn sync_document(&mut self, document_id: &str) -> Result<()> {
        tracing::debug!("Syncing document {}", document_id);
        // TODO: Implement document-specific sync
        Ok(())
    }

    /// Force immediate sync (outside of normal interval).
    pub async fn sync_now(&mut self) -> Result<()> {
        tracing::info!("Forcing immediate sync");
        self.perform_sync().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    // Wiring the loop together from its parts should not panic.
    #[tokio::test]
    async fn test_sync_loop_creation() {
        let behaviour = ClientBehaviour::new();
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test_cache.db");
        let database = crate::client::database::ClientDatabase::open(db_path).await.unwrap();
        let server_peer_id = PeerId::random();
        let sync_interval = Duration::from_secs(1);
        let _sync_loop = ClientSyncLoop::new(behaviour, database, server_peer_id, sync_interval);
        // Sync loop creation should not panic
    }
}

70
eigensync/src/lib.rs Normal file
View file

@ -0,0 +1,70 @@
//! Eigensync: Distributed State Synchronization using Automerge CRDTs
//!
//! This crate provides a distributed state synchronization system built on top of
//! Automerge CRDTs and libp2p networking. It enables synchronizing append-only state
//! machines across multiple devices.
//!
//! # Features
//!
//! - **Append-only state synchronization**: Designed for state machines that only add states
//! - **Conflict-free replication**: Uses Automerge CRDTs to handle concurrent updates
//! - **Peer-to-peer networking**: Built on libp2p for reliable P2P communication
//! - **Persistent storage**: SQLite-based persistence for both server and client
//! - **Authentication**: PeerId-based authentication with ActorId mapping
//!
//! # Architecture
//!
//! The system consists of:
//! - **Server**: Stores and distributes patches per PeerId
//! - **Client**: Maintains local Automerge document and syncs with server
//! - **Protocol**: Request/response protocol for patch exchange
pub mod types;
pub mod protocol;
#[cfg(feature = "server")]
pub mod server;
#[cfg(feature = "client")]
pub mod client;
#[cfg(feature = "metrics")]
pub mod metrics;
// Re-export commonly used types
pub use types::{Error, Result, ActorId, PeerId, DocumentState, PatchInfo};
pub use protocol::{EigensyncMessage, EigensyncRequest, EigensyncResponse};
#[cfg(feature = "client")]
pub use client::{Client, ClientConfig};
#[cfg(feature = "server")]
pub use server::{Server, ServerConfig};
/// Version of the eigensync protocol
pub const PROTOCOL_VERSION: u32 = 1;
/// Protocol name for libp2p
pub const PROTOCOL_NAME: &str = "/eigensync/1.0.0";
/// Main entry point for integrating eigensync with swap state machine
///
/// Thin convenience wrapper that forwards directly to
/// `Client::append_swap_state` (itself a placeholder for now).
#[cfg(feature = "client")]
pub async fn append_state(
    client: &mut Client,
    swap_id: uuid::Uuid,
    state_json: serde_json::Value,
    timestamp: chrono::DateTime<chrono::Utc>,
) -> Result<()> {
    client.append_swap_state(swap_id, state_json, timestamp).await
}
#[cfg(test)]
mod tests {
    use super::*;
    // Guards against accidental protocol constant drift.
    #[test]
    fn test_protocol_constants() {
        assert_eq!(PROTOCOL_VERSION, 1);
        assert_eq!(PROTOCOL_NAME, "/eigensync/1.0.0");
    }
}

190
eigensync/src/metrics.rs Normal file
View file

@ -0,0 +1,190 @@
//! Metrics and observability for eigensync
#[cfg(feature = "metrics")]
use prometheus::{Counter, Histogram, Registry};
use std::time::Instant;
/// Metrics collector for eigensync operations
///
/// With the `metrics` feature all fields exist and are registered on a
/// private Prometheus registry; without it the struct is zero-sized and all
/// recording methods degrade to trace logging only.
pub struct EigensyncMetrics {
    #[cfg(feature = "metrics")]
    registry: Registry,
    #[cfg(feature = "metrics")]
    changes_sent: Counter,
    #[cfg(feature = "metrics")]
    changes_received: Counter,
    #[cfg(feature = "metrics")]
    sync_duration: Histogram,
    #[cfg(feature = "metrics")]
    rtt_histogram: Histogram,
}
impl EigensyncMetrics {
    /// Create a new metrics collector.
    ///
    /// With the `metrics` feature this builds a dedicated Prometheus
    /// `Registry` and registers four instruments on it; without the feature
    /// it returns a zero-sized no-op collector.
    ///
    /// # Panics
    /// Panics if a metric cannot be created or registered (e.g. duplicate
    /// name) — a programming error, not a runtime condition.
    pub fn new() -> Self {
        tracing::debug!("Creating eigensync metrics collector");
        #[cfg(feature = "metrics")]
        {
            let registry = Registry::new();
            let changes_sent = Counter::new(
                "eigensync_changes_sent_total",
                "Total number of changes sent"
            ).expect("Failed to create changes_sent counter");
            let changes_received = Counter::new(
                "eigensync_changes_received_total",
                "Total number of changes received"
            ).expect("Failed to create changes_received counter");
            let sync_duration = Histogram::with_opts(
                prometheus::HistogramOpts::new(
                    "eigensync_sync_duration_seconds",
                    "Duration of sync operations in seconds"
                )
            ).expect("Failed to create sync_duration histogram");
            let rtt_histogram = Histogram::with_opts(
                prometheus::HistogramOpts::new(
                    "eigensync_request_rtt_seconds",
                    "Round-trip time for requests in seconds"
                )
            ).expect("Failed to create rtt histogram");
            // expect() with context instead of bare unwrap() so a duplicate
            // registration fails with an actionable message.
            registry
                .register(Box::new(changes_sent.clone()))
                .expect("Failed to register changes_sent counter");
            registry
                .register(Box::new(changes_received.clone()))
                .expect("Failed to register changes_received counter");
            registry
                .register(Box::new(sync_duration.clone()))
                .expect("Failed to register sync_duration histogram");
            registry
                .register(Box::new(rtt_histogram.clone()))
                .expect("Failed to register rtt histogram");
            Self {
                registry,
                changes_sent,
                changes_received,
                sync_duration,
                rtt_histogram,
            }
        }
        #[cfg(not(feature = "metrics"))]
        {
            Self {}
        }
    }
    /// Record changes sent (counter no-op without the `metrics` feature).
    pub fn record_changes_sent(&self, count: u64) {
        #[cfg(feature = "metrics")]
        {
            self.changes_sent.inc_by(count as f64);
        }
        tracing::debug!("Recorded {} changes sent", count);
    }
    /// Record changes received.
    pub fn record_changes_received(&self, count: u64) {
        #[cfg(feature = "metrics")]
        {
            self.changes_received.inc_by(count as f64);
        }
        tracing::debug!("Recorded {} changes received", count);
    }
    /// Record sync operation duration.
    pub fn record_sync_duration(&self, duration: std::time::Duration) {
        #[cfg(feature = "metrics")]
        {
            self.sync_duration.observe(duration.as_secs_f64());
        }
        tracing::debug!("Recorded sync duration: {:?}", duration);
    }
    /// Record request round-trip time.
    pub fn record_rtt(&self, rtt: std::time::Duration) {
        #[cfg(feature = "metrics")]
        {
            self.rtt_histogram.observe(rtt.as_secs_f64());
        }
        tracing::debug!("Recorded RTT: {:?}", rtt);
    }
    /// Get metrics registry (for Prometheus export)
    #[cfg(feature = "metrics")]
    pub fn registry(&self) -> &Registry {
        &self.registry
    }
}
// Default delegates to `EigensyncMetrics::new`.
impl Default for EigensyncMetrics {
    fn default() -> Self {
        Self::new()
    }
}
/// Timer for measuring operation duration.
///
/// Consumed by [`Timer::stop`], which logs the elapsed time under `label`.
pub struct Timer {
    start: Instant,
    label: String,
}

impl Timer {
    /// Start a new timer labelled for later logging.
    pub fn new(label: impl Into<String>) -> Self {
        Timer {
            start: Instant::now(),
            label: label.into(),
        }
    }

    /// Stop the timer, log the elapsed time, and return it.
    pub fn stop(self) -> std::time::Duration {
        let elapsed = self.start.elapsed();
        tracing::debug!("Timer '{}' finished in {:?}", self.label, elapsed);
        elapsed
    }
}
/// Macro for timing operations
///
/// Runs `$block`, records its wall-clock duration on `$metrics` via
/// `record_sync_duration`, and evaluates to the block's result.
///
/// Bug fix: `#[macro_export]` macros are invoked from *other* crates, where
/// a plain `crate::` path resolves to the caller's crate and fails to find
/// `metrics::Timer`. `$crate` always names the defining crate.
#[macro_export]
macro_rules! time_operation {
    ($metrics:expr, $operation:expr, $block:block) => {{
        let timer = $crate::metrics::Timer::new($operation);
        let result = $block;
        let duration = timer.stop();
        $metrics.record_sync_duration(duration);
        result
    }};
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_metrics_creation() {
        let _metrics = EigensyncMetrics::new();
        // Metrics creation should not panic
    }
    // Timer must report at least the slept duration.
    #[test]
    fn test_timer() {
        let timer = Timer::new("test");
        std::thread::sleep(std::time::Duration::from_millis(1));
        let duration = timer.stop();
        assert!(duration.as_millis() >= 1);
    }
    // Recording must work with and without the `metrics` feature.
    #[test]
    fn test_metrics_recording() {
        let metrics = EigensyncMetrics::new();
        // These should not panic
        metrics.record_changes_sent(5);
        metrics.record_changes_received(3);
        metrics.record_sync_duration(std::time::Duration::from_millis(100));
        metrics.record_rtt(std::time::Duration::from_millis(50));
    }
}

395
eigensync/src/protocol.rs Normal file
View file

@ -0,0 +1,395 @@
//! Protocol definitions for eigensync communication
//!
//! This module defines the wire protocol used for communication between
//! eigensync clients and servers. The protocol is versioned and uses
//! serde_cbor for serialization with length-prefixed frames.
use crate::types::{ActorId, PeerId, Result, Error};
use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Protocol version for version negotiation
///
/// [`EigensyncCodec::decode`] rejects messages whose version is greater
/// than this value.
pub const CURRENT_VERSION: u32 = 1;
/// Maximum message size to prevent DoS attacks (10 MB)
///
/// Enforced on both sides: [`EigensyncCodec::encode`] refuses to produce
/// larger frames and [`EigensyncCodec::decode`] refuses to parse them.
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
/// Default request timeout
///
/// NOTE(review): not referenced within this module — presumably consumed by
/// the transport/request layer; confirm once dispatch is implemented.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Main message envelope for all eigensync communications
///
/// Every frame carries exactly one envelope; a response echoes the
/// `request_id` of the request it answers (see
/// [`EigensyncCodec::create_response`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EigensyncMessage {
    /// Protocol version
    pub version: u32,
    /// Unique request identifier for matching responses
    pub request_id: uuid::Uuid,
    /// Message payload
    pub payload: EigensyncPayload,
}
/// Union type for all message payloads
///
/// Adjacently tagged on the wire (`type` discriminant + `data` content),
/// so message direction is explicit in the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum EigensyncPayload {
    Request(EigensyncRequest),
    Response(EigensyncResponse),
}
/// All possible request types
///
/// Adjacently tagged (`kind` + `params`); each variant pairs with the
/// same-named variant of [`EigensyncResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", content = "params")]
pub enum EigensyncRequest {
    /// Get changes from server since a given point
    GetChanges(GetChangesParams),
    /// Submit new changes to server
    SubmitChanges(SubmitChangesParams),
    /// Ping for connectivity testing
    Ping(PingParams),
    /// Get server status/info
    GetStatus(GetStatusParams),
}
/// All possible response types
///
/// Adjacently tagged (`kind` + `result`). `Error` is the catch-all failure
/// reply usable for any request kind.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", content = "result")]
pub enum EigensyncResponse {
    /// Response to GetChanges request
    GetChanges(GetChangesResult),
    /// Response to SubmitChanges request
    SubmitChanges(SubmitChangesResult),
    /// Response to Ping request
    Ping(PingResult),
    /// Response to GetStatus request
    GetStatus(GetStatusResult),
    /// Error response for any request
    Error(ErrorResult),
}
// Request parameters
/// Parameters for [`EigensyncRequest::GetChanges`].
///
/// NOTE(review): how the `since_*`/`have_heads` filters combine when more
/// than one is set is not defined here — pin down before implementing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetChangesParams {
    /// Document to get changes for (typically swap_id)
    pub document_id: String,
    /// Only return changes after this sequence number
    pub since_sequence: Option<u64>,
    /// Only return changes after this timestamp
    pub since_timestamp: Option<chrono::DateTime<chrono::Utc>>,
    /// Maximum number of changes to return (for pagination)
    pub limit: Option<u32>,
    /// Automerge heads we already have (to optimize sync)
    pub have_heads: Vec<automerge::ChangeHash>,
}
/// Parameters for [`EigensyncRequest::SubmitChanges`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmitChangesParams {
    /// Document these changes apply to
    pub document_id: String,
    /// Serialized Automerge changes
    pub changes: Vec<Vec<u8>>,
    /// Actor ID that created these changes
    pub actor_id: ActorId,
    /// Expected sequence number for optimistic concurrency control
    pub expected_sequence: Option<u64>,
}
/// Parameters for [`EigensyncRequest::Ping`]; the payload is echoed back
/// in [`PingResult`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PingParams {
    /// Timestamp when ping was sent
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional payload for bandwidth testing
    pub payload: Option<Vec<u8>>,
}
/// Parameters for [`EigensyncRequest::GetStatus`]; the flags opt in to the
/// corresponding optional sections of [`GetStatusResult`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetStatusParams {
    /// Include detailed statistics
    pub include_stats: bool,
    /// Include information about other peers
    pub include_peers: bool,
}
// Response results
/// Result for [`EigensyncResponse::GetChanges`].
///
/// `sequences` is parallel to `changes` (one sequence number per change);
/// `has_more` signals the client should page with another request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetChangesResult {
    /// Document ID these changes apply to
    pub document_id: String,
    /// Serialized Automerge changes
    pub changes: Vec<Vec<u8>>,
    /// Sequence numbers for each change
    pub sequences: Vec<u64>,
    /// Whether there are more changes available
    pub has_more: bool,
    /// Current document heads after applying these changes
    pub new_heads: Vec<automerge::ChangeHash>,
}
/// Result for [`EigensyncResponse::SubmitChanges`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmitChangesResult {
    /// Document ID changes were applied to
    pub document_id: String,
    /// Sequence numbers assigned to the submitted changes
    pub assigned_sequences: Vec<u64>,
    /// Number of changes that were actually new (not duplicates)
    pub new_changes_count: u32,
    /// Current document heads after applying changes
    pub new_heads: Vec<automerge::ChangeHash>,
}
/// Result for [`EigensyncResponse::Ping`]; carrying both timestamps lets
/// the client estimate round-trip time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PingResult {
    /// Timestamp from the request
    pub request_timestamp: chrono::DateTime<chrono::Utc>,
    /// Timestamp when server processed the request
    pub response_timestamp: chrono::DateTime<chrono::Utc>,
    /// Echo back any payload that was sent
    pub payload: Option<Vec<u8>>,
}
/// Result for [`EigensyncResponse::GetStatus`].
///
/// `stats` and `peers` are populated only when the corresponding
/// [`GetStatusParams`] flags were set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetStatusResult {
    /// Server version/build info
    pub server_version: String,
    /// Protocol versions supported
    pub supported_versions: Vec<u32>,
    /// Server uptime in seconds
    pub uptime_seconds: u64,
    /// Number of connected peers
    pub connected_peers: u32,
    /// Number of documents being tracked
    pub document_count: u64,
    /// Total number of changes stored
    pub total_changes: u64,
    /// Optional detailed statistics
    pub stats: Option<ServerStats>,
    /// Optional peer information
    pub peers: Option<Vec<PeerStatus>>,
}
/// Payload of [`EigensyncResponse::Error`]: a machine-readable code plus a
/// human-readable message, with room for structured extras.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorResult {
    /// Error code for programmatic handling
    pub code: ErrorCode,
    /// Human-readable error message
    pub message: String,
    /// Optional additional details
    pub details: Option<serde_json::Value>,
}
/// Detailed server statistics, attached to [`GetStatusResult`] on request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Total bytes stored
    pub total_bytes: u64,
    /// Bytes sent since startup
    pub bytes_sent: u64,
    /// Bytes received since startup
    pub bytes_received: u64,
    /// Number of requests processed
    pub requests_processed: u64,
    /// Average request processing time in milliseconds
    pub avg_request_time_ms: f64,
}
/// Per-peer information attached to [`GetStatusResult`] on request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStatus {
    /// Peer ID
    pub peer_id: String,
    /// Associated actor ID if authenticated
    pub actor_id: Option<String>,
    /// When peer connected
    pub connected_at: chrono::DateTime<chrono::Utc>,
    /// Last activity timestamp
    pub last_activity: chrono::DateTime<chrono::Utc>,
    /// Number of documents this peer is syncing
    pub document_count: u32,
}
/// Error codes for programmatic error handling
///
/// Discriminants start at 1000 and are part of the wire contract — do not
/// renumber existing variants; append new ones. [`ErrorCode::as_str`] gives
/// the stable snake_case name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u32)]
pub enum ErrorCode {
    /// Unknown or internal server error
    InternalError = 1000,
    /// Invalid request format or parameters
    InvalidRequest = 1001,
    /// Authentication failed
    AuthenticationFailed = 1002,
    /// Requested resource not found
    NotFound = 1003,
    /// Rate limit exceeded
    RateLimitExceeded = 1004,
    /// Storage quota exceeded
    QuotaExceeded = 1005,
    /// Version not supported
    UnsupportedVersion = 1006,
    /// Request timeout
    Timeout = 1007,
    /// Conflict in optimistic concurrency control
    Conflict = 1008,
    /// Invalid actor/peer mapping
    InvalidActorMapping = 1009,
}
impl ErrorCode {
/// Convert error code to human-readable string
pub fn as_str(&self) -> &'static str {
match self {
ErrorCode::InternalError => "internal_error",
ErrorCode::InvalidRequest => "invalid_request",
ErrorCode::AuthenticationFailed => "authentication_failed",
ErrorCode::NotFound => "not_found",
ErrorCode::RateLimitExceeded => "rate_limit_exceeded",
ErrorCode::QuotaExceeded => "quota_exceeded",
ErrorCode::UnsupportedVersion => "unsupported_version",
ErrorCode::Timeout => "timeout",
ErrorCode::Conflict => "conflict",
ErrorCode::InvalidActorMapping => "invalid_actor_mapping",
}
}
}
impl From<ErrorCode> for Error {
fn from(code: ErrorCode) -> Self {
Error::Protocol {
message: format!("Protocol error: {}", code.as_str()),
}
}
}
/// Codec for serializing/deserializing eigensync messages
///
/// Stateless namespace type: all methods are associated functions. Frames
/// are a 4-byte big-endian length prefix followed by a CBOR body.
pub struct EigensyncCodec;
impl EigensyncCodec {
/// Serialize message to bytes with length prefix
pub fn encode(message: &EigensyncMessage) -> Result<Vec<u8>> {
let payload = serde_cbor::to_vec(message)?;
if payload.len() > MAX_MESSAGE_SIZE {
return Err(Error::Protocol {
message: format!(
"Message too large: {} bytes > {} max",
payload.len(),
MAX_MESSAGE_SIZE
),
});
}
let mut result = Vec::with_capacity(4 + payload.len());
result.extend_from_slice(&(payload.len() as u32).to_be_bytes());
result.extend_from_slice(&payload);
Ok(result)
}
/// Deserialize message from bytes (assumes length prefix already read)
pub fn decode(data: &[u8]) -> Result<EigensyncMessage> {
if data.len() > MAX_MESSAGE_SIZE {
return Err(Error::Protocol {
message: format!(
"Message too large: {} bytes > {} max",
data.len(),
MAX_MESSAGE_SIZE
),
});
}
let message: EigensyncMessage = serde_cbor::from_slice(data)?;
// Validate version
if message.version > CURRENT_VERSION {
return Err(Error::Protocol {
message: format!(
"Unsupported protocol version: {} > {}",
message.version,
CURRENT_VERSION
),
});
}
Ok(message)
}
/// Create a request message
pub fn create_request(request: EigensyncRequest) -> EigensyncMessage {
EigensyncMessage {
version: CURRENT_VERSION,
request_id: uuid::Uuid::new_v4(),
payload: EigensyncPayload::Request(request),
}
}
/// Create a response message
pub fn create_response(
request_id: uuid::Uuid,
response: EigensyncResponse,
) -> EigensyncMessage {
EigensyncMessage {
version: CURRENT_VERSION,
request_id,
payload: EigensyncPayload::Response(response),
}
}
/// Create an error response
pub fn create_error_response(
request_id: uuid::Uuid,
code: ErrorCode,
message: String,
) -> EigensyncMessage {
Self::create_response(
request_id,
EigensyncResponse::Error(ErrorResult {
code,
message,
details: None,
}),
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_codec_roundtrip() {
let request = EigensyncRequest::Ping(PingParams {
timestamp: chrono::Utc::now(),
payload: Some(b"test".to_vec()),
});
let message = EigensyncCodec::create_request(request);
let encoded = EigensyncCodec::encode(&message).unwrap();
// Skip the length prefix for decoding
let decoded = EigensyncCodec::decode(&encoded[4..]).unwrap();
assert_eq!(message.version, decoded.version);
assert_eq!(message.request_id, decoded.request_id);
}
#[test]
fn test_message_size_limit() {
let large_payload = vec![0u8; MAX_MESSAGE_SIZE + 1];
let request = EigensyncRequest::SubmitChanges(SubmitChangesParams {
document_id: "test".to_string(),
changes: vec![large_payload],
actor_id: ActorId(automerge::ActorId::random()),
expected_sequence: None,
});
let message = EigensyncCodec::create_request(request);
let result = EigensyncCodec::encode(&message);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Message too large"));
}
#[test]
fn test_error_codes() {
assert_eq!(ErrorCode::InternalError.as_str(), "internal_error");
assert_eq!(ErrorCode::AuthenticationFailed.as_str(), "authentication_failed");
assert_eq!(ErrorCode::NotFound.as_str(), "not_found");
}
}

View file

@ -0,0 +1,72 @@
//! libp2p networking behaviour for eigensync server
use crate::protocol::{EigensyncMessage, EigensyncRequest, EigensyncResponse};
use crate::types::{Result, PeerId};
/// libp2p behaviour for eigensync server (placeholder)
///
/// Currently an empty shell; the commented-out fields sketch the intended
/// composition (request/response, identify, ping sub-behaviours).
pub struct ServerBehaviour {
    // TODO: Add actual behaviour components
    // request_response: RequestResponse<EigensyncCodec>,
    // identify: Identify,
    // ping: Ping,
}
impl ServerBehaviour {
    /// Create a new server behaviour
    ///
    /// Placeholder: logs and returns an empty struct until the libp2p
    /// sub-behaviours are wired in.
    pub fn new() -> Self {
        tracing::debug!("Creating server behaviour");
        // TODO: Initialize behaviour components
        Self {
            // request_response: RequestResponse::new(...),
            // identify: Identify::new(...),
            // ping: Ping::default(),
        }
    }
    /// Handle incoming request
    ///
    /// WARNING: every request variant currently panics via `todo!` — this
    /// must not be called until the handlers below are implemented.
    pub async fn handle_request(
        &mut self,
        peer_id: PeerId,
        request: EigensyncRequest,
    ) -> Result<EigensyncResponse> {
        tracing::debug!("Handling request from peer {}: {:?}", peer_id, request);
        // TODO: Implement request handling
        match request {
            EigensyncRequest::GetChanges(_params) => {
                // TODO: Handle GetChanges
                todo!("GetChanges not implemented")
            },
            EigensyncRequest::SubmitChanges(_params) => {
                // TODO: Handle SubmitChanges
                todo!("SubmitChanges not implemented")
            },
            EigensyncRequest::Ping(_params) => {
                // TODO: Handle Ping
                todo!("Ping not implemented")
            },
            EigensyncRequest::GetStatus(_params) => {
                // TODO: Handle GetStatus
                todo!("GetStatus not implemented")
            },
        }
    }
}
impl Default for ServerBehaviour {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    // Smoke test: the placeholder constructor must not panic.
    #[test]
    fn test_behaviour_creation() {
        let _behaviour = ServerBehaviour::new();
        // Behaviour creation should not panic
    }
}

View file

@ -0,0 +1,81 @@
//! Server database layer for patch storage and peer management
use crate::types::{Result, PeerId, ActorId};
use rusqlite::Connection;
use std::path::Path;
/// Server database for storing patches and peer information
///
/// Thin wrapper around a single rusqlite connection; all query methods are
/// currently stubs.
pub struct ServerDatabase {
    /// Owned SQLite connection opened by [`ServerDatabase::open`].
    connection: Connection,
}
impl ServerDatabase {
    /// Open or create a server database
    ///
    /// NOTE(review): `Connection::open` is a blocking call inside an async
    /// fn — consider `spawn_blocking` once this is fleshed out. Migrations
    /// are not yet applied.
    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        tracing::info!("Opening server database at {:?}", path.as_ref());
        // TODO: Implement actual database opening and migration
        let connection = Connection::open(path)?;
        Ok(Self { connection })
    }
    /// Store a patch for a peer
    ///
    /// Stub: logs and returns a placeholder sequence number of 0 without
    /// writing anything.
    pub async fn store_patch(
        &self,
        peer_id: PeerId,
        actor_id: ActorId,
        document_id: &str,
        patch_data: &[u8],
    ) -> Result<u64> {
        tracing::debug!("Storing patch for peer {} in document {}", peer_id, document_id);
        // TODO: Implement patch storage
        Ok(0)
    }
    /// Get patches for a peer since a given sequence number
    ///
    /// Stub: always returns an empty list. Intended to return
    /// (sequence, patch bytes) pairs.
    pub async fn get_patches(
        &self,
        peer_id: PeerId,
        document_id: &str,
        since_sequence: Option<u64>,
    ) -> Result<Vec<(u64, Vec<u8>)>> {
        tracing::debug!("Getting patches for peer {} in document {} since {:?}",
            peer_id, document_id, since_sequence);
        // TODO: Implement patch retrieval
        Ok(vec![])
    }
    /// Bind a peer ID to an actor ID
    ///
    /// Stub: logs only; no mapping is persisted yet.
    pub async fn bind_peer_actor(&self, peer_id: PeerId, actor_id: ActorId) -> Result<()> {
        tracing::debug!("Binding peer {} to actor {}", peer_id, actor_id);
        // TODO: Implement peer-actor binding
        Ok(())
    }
    /// Get actor ID for a peer
    ///
    /// Stub: always returns `None` until the mapping table exists.
    pub async fn get_actor_for_peer(&self, peer_id: PeerId) -> Result<Option<ActorId>> {
        tracing::debug!("Getting actor for peer {}", peer_id);
        // TODO: Implement actor lookup
        Ok(None)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    // Opening a database at a fresh path must succeed (creates the file).
    #[tokio::test]
    async fn test_database_creation() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let db = ServerDatabase::open(db_path).await;
        assert!(db.is_ok());
    }
}

View file

@ -0,0 +1,74 @@
//! Server event loop for handling network events
use crate::server::behaviour::ServerBehaviour;
use crate::server::database::ServerDatabase;
use crate::types::{Result, PeerId};
use std::time::Duration;
/// Server event loop for handling network events and database operations
///
/// Owns both halves of the server: the (placeholder) libp2p behaviour and
/// the patch database. Driven by [`ServerEventLoop::run`].
pub struct ServerEventLoop {
    /// Network behaviour (currently a stub; unused until the swarm exists).
    behaviour: ServerBehaviour,
    /// Patch/peer storage (currently a stub).
    database: ServerDatabase,
}
impl ServerEventLoop {
    /// Create a new server event loop
    pub fn new(behaviour: ServerBehaviour, database: ServerDatabase) -> Self {
        tracing::debug!("Creating server event loop");
        Self {
            behaviour,
            database,
        }
    }
    /// Run the server event loop
    ///
    /// Placeholder: loops forever sleeping one second at a time, so this
    /// future never resolves. Callers should treat it as running until
    /// cancelled.
    pub async fn run(mut self) -> Result<()> {
        tracing::info!("Starting server event loop");
        // TODO: Implement actual event loop with libp2p swarm
        loop {
            // Placeholder: just sleep for now
            tokio::time::sleep(Duration::from_secs(1)).await;
            // TODO:
            // - Poll swarm for events
            // - Handle incoming requests
            // - Manage peer connections
            // - Perform periodic maintenance tasks
        }
    }
    /// Handle peer connection
    ///
    /// Stub: logs the connection and returns Ok.
    pub async fn handle_peer_connected(&mut self, peer_id: PeerId) -> Result<()> {
        tracing::info!("Peer connected: {}", peer_id);
        // TODO: Implement peer connection handling
        Ok(())
    }
    /// Handle peer disconnection
    ///
    /// Stub: logs the disconnection and returns Ok.
    pub async fn handle_peer_disconnected(&mut self, peer_id: PeerId) -> Result<()> {
        tracing::info!("Peer disconnected: {}", peer_id);
        // TODO: Implement peer disconnection handling
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    // Wiring a behaviour and a fresh database into an event loop must not
    // panic (run() is intentionally not called — it never returns).
    #[tokio::test]
    async fn test_event_loop_creation() {
        let behaviour = ServerBehaviour::new();
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let database = ServerDatabase::open(db_path).await.unwrap();
        let _event_loop = ServerEventLoop::new(behaviour, database);
        // Event loop creation should not panic
    }
}

View file

@ -0,0 +1,92 @@
//! Server-side components for eigensync
use crate::types::{Result, PeerId, ActorId};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
pub mod database;
pub mod behaviour;
pub mod event_loop;
/// Configuration for the eigensync server
///
/// Serde-serializable so it can be loaded from a config file; see
/// `Default` for the out-of-the-box values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Path to server database
    pub database_path: PathBuf,
    /// Address to listen on
    pub listen_address: String,
    /// Port to listen on
    pub listen_port: u16,
    /// Maximum number of connected peers
    pub max_peers: u32,
    /// Rate limiting configuration
    pub rate_limit: crate::types::RateLimitConfig,
    /// Snapshot configuration
    pub snapshot_config: crate::types::SnapshotConfig,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
database_path: PathBuf::from("server_patches.sqlite"),
listen_address: "0.0.0.0".to_string(),
listen_port: 9944,
max_peers: 100,
rate_limit: crate::types::RateLimitConfig::default(),
snapshot_config: crate::types::SnapshotConfig::default(),
}
}
}
/// Main server struct (placeholder implementation)
///
/// Currently holds only its configuration; networking and storage are
/// initialized elsewhere (see the TODOs in `Server::new`/`Server::run`).
pub struct Server {
    /// Immutable configuration supplied at construction.
    config: ServerConfig,
}
impl Server {
    /// Create a new server with the given configuration
    ///
    /// Placeholder: stores the config and logs it; database and networking
    /// setup are still TODO.
    pub async fn new(config: ServerConfig) -> Result<Self> {
        tracing::info!("Creating eigensync server with config: {:?}", config);
        // TODO: Initialize database, networking, etc.
        Ok(Self { config })
    }
    /// Start the server
    ///
    /// Placeholder: loops forever sleeping one second at a time, so this
    /// future never resolves; run it under a task that can be cancelled.
    pub async fn run(self) -> Result<()> {
        tracing::info!("Starting eigensync server on {}:{}",
            self.config.listen_address, self.config.listen_port);
        // TODO: Implement server event loop
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    }
    /// Get server configuration
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Pin the documented default listen address/port/peer cap.
    #[test]
    fn test_server_config_default() {
        let config = ServerConfig::default();
        assert_eq!(config.listen_address, "0.0.0.0");
        assert_eq!(config.listen_port, 9944);
        assert_eq!(config.max_peers, 100);
    }
    // Smoke test: constructing a server from defaults must succeed.
    #[tokio::test]
    async fn test_server_creation() {
        let config = ServerConfig::default();
        let server = Server::new(config).await;
        assert!(server.is_ok());
    }
}

244
eigensync/src/types.rs Normal file
View file

@ -0,0 +1,244 @@
//! Common types and error definitions for eigensync
use serde::{Deserialize, Serialize};
/// Result type alias for eigensync operations
///
/// Uses the crate-wide [`Error`] enum as the error type.
pub type Result<T> = std::result::Result<T, Error>;
/// PeerId type alias
///
/// Re-exported from libp2p; identifies a network peer, distinct from the
/// Automerge-level [`ActorId`].
pub type PeerId = libp2p::PeerId;
/// ActorId uniquely identifies an actor in the Automerge document
///
/// Newtype over [`automerge::ActorId`] so this crate can attach its own
/// serde/Display impls; convert with the `From` impls below.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ActorId(pub automerge::ActorId);
impl From<automerge::ActorId> for ActorId {
fn from(actor_id: automerge::ActorId) -> Self {
Self(actor_id)
}
}
impl From<ActorId> for automerge::ActorId {
fn from(actor_id: ActorId) -> Self {
actor_id.0
}
}
// Human-readable form: delegates to the inner automerge::ActorId's `{}`
// rendering (formatting flags such as width are intentionally not
// forwarded).
impl std::fmt::Display for ActorId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
/// Comprehensive error types for eigensync operations
///
/// The `#[from]` variants make `?` work directly on the underlying
/// library results (io, rusqlite, serde_cbor, serde_json, automerge,
/// libp2p dial errors).
#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    #[error("Database error: {0}")]
    Database(#[from] rusqlite::Error),
    #[error("Migration error: {0}")]
    Migration(#[from] rusqlite_migration::Error),
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_cbor::Error),
    #[error("JSON error: {0}")]
    Json(#[from] serde_json::Error),
    #[error("Automerge error: {0}")]
    Automerge(#[from] automerge::AutomergeError),
    #[error("Network error: {0}")]
    Network(#[from] libp2p::swarm::DialError),
    /// Wire-protocol violation (framing, size, version); also produced by
    /// `From<ErrorCode>`.
    #[error("Protocol error: {message}")]
    Protocol { message: String },
    #[error("Authentication failed for peer {peer_id}: {reason}")]
    Authentication { peer_id: PeerId, reason: String },
    #[error("Document not found: {document_id}")]
    DocumentNotFound { document_id: String },
    #[error("Invalid configuration: {message}")]
    InvalidConfig { message: String },
    #[error("Timeout: {operation}")]
    Timeout { operation: String },
    #[error("Actor mapping conflict: peer {peer_id} tried to use actor {actor_id} already mapped to different peer")]
    ActorMappingConflict { peer_id: PeerId, actor_id: ActorId },
    #[error("Storage quota exceeded: {current_size} bytes")]
    StorageQuotaExceeded { current_size: u64 },
}
/// Information about a patch/change in the system
///
/// Metadata only — the patch bytes themselves travel separately (see
/// [`ChangeBatch`], where `patch_info` parallels `changes`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatchInfo {
    /// Unique identifier for this patch
    pub id: uuid::Uuid,
    /// Actor that created this patch
    pub actor_id: ActorId,
    /// Timestamp when patch was created
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Size of the patch data in bytes
    pub size_bytes: u64,
    /// Hash of the patch content for integrity checking
    pub content_hash: String,
}
/// Current state of a document
///
/// Summary snapshot of one tracked Automerge document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocumentState {
    /// Document identifier (typically swap_id)
    pub document_id: String,
    /// Current number of patches applied
    pub patch_count: u64,
    /// Total size of all patches in bytes
    pub total_size_bytes: u64,
    /// Timestamp of last update
    pub last_updated: chrono::DateTime<chrono::Utc>,
    /// Current document heads
    pub heads: Vec<automerge::ChangeHash>,
}
/// Configuration for snapshot and garbage collection
///
/// A snapshot is triggered when either the change-count or the byte-size
/// threshold is crossed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotConfig {
    /// Trigger snapshot after this many changes
    pub max_changes: u64,
    /// Trigger snapshot after this many bytes
    pub max_size_bytes: u64,
    /// Whether to compress snapshots above this size
    pub compress_threshold_bytes: u64,
}
impl Default for SnapshotConfig {
fn default() -> Self {
Self {
max_changes: 10_000,
max_size_bytes: 10 * 1024 * 1024, // 10 MB
compress_threshold_bytes: 1024 * 1024, // 1 MB
}
}
}
/// Metrics for monitoring sync operations
///
/// Plain counters (all zero via `Default`); tracked per peer in
/// [`PeerInfo`].
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct SyncMetrics {
    /// Total number of changes sent
    pub changes_sent: u64,
    /// Total number of changes received
    pub changes_received: u64,
    /// Total number of bytes sent
    pub bytes_sent: u64,
    /// Total number of bytes received
    pub bytes_received: u64,
    /// Number of sync operations performed
    pub sync_operations: u64,
    /// Number of failed sync operations
    pub sync_failures: u64,
    /// Average round-trip time in milliseconds
    pub avg_rtt_ms: f64,
    /// Number of conflicts resolved
    pub conflicts_resolved: u64,
}
/// State of a peer connection
///
/// Connection lifecycle for a remote peer as tracked in [`PeerInfo`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PeerState {
    /// Peer is disconnected
    Disconnected,
    /// Peer is connecting
    Connecting,
    /// Peer is connected and authenticated
    Connected,
    /// Peer authentication failed
    AuthenticationFailed,
    /// Peer connection failed
    ConnectionFailed,
}
/// Information about a connected peer
///
/// `actor_id` is `None` until the peer authenticates and is bound to an
/// Automerge actor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
    /// Peer ID (as string)
    pub peer_id: String,
    /// Associated actor ID
    pub actor_id: Option<ActorId>,
    /// Current connection state
    pub state: PeerState,
    /// When the peer was first seen
    pub first_seen: chrono::DateTime<chrono::Utc>,
    /// When the peer was last seen
    pub last_seen: chrono::DateTime<chrono::Utc>,
    /// Sync metrics for this peer
    pub metrics: SyncMetrics,
}
/// A batch of changes/patches for efficient transmission
///
/// `patch_info` is parallel to `changes`: entry *i* describes the bytes in
/// `changes[i]`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeBatch {
    /// Unique identifier for this batch
    pub batch_id: uuid::Uuid,
    /// Document this batch applies to
    pub document_id: String,
    /// The actual changes (serialized)
    pub changes: Vec<Vec<u8>>,
    /// Metadata about each change
    pub patch_info: Vec<PatchInfo>,
}
/// Configuration for rate limiting
///
/// Per-peer caps on request rate and bandwidth, with a short burst
/// allowance on top of the steady-state rate.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimitConfig {
    /// Maximum requests per second per peer
    pub max_requests_per_second: u32,
    /// Maximum bytes per second per peer
    pub max_bytes_per_second: u64,
    /// Burst allowance
    pub burst_size: u32,
}
impl Default for RateLimitConfig {
fn default() -> Self {
Self {
max_requests_per_second: 10,
max_bytes_per_second: 1024 * 1024, // 1 MB/s
burst_size: 5,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    // Round-trip: wrapping and unwrapping through the From impls must be
    // lossless.
    #[test]
    fn test_actor_id_conversion() {
        let automerge_actor = automerge::ActorId::random();
        let actor_id = ActorId::from(automerge_actor.clone());
        let converted_back: automerge::ActorId = actor_id.into();
        assert_eq!(automerge_actor, converted_back);
    }
    // Pin the documented snapshot defaults.
    #[test]
    fn test_snapshot_config_defaults() {
        let config = SnapshotConfig::default();
        assert_eq!(config.max_changes, 10_000);
        assert_eq!(config.max_size_bytes, 10 * 1024 * 1024);
        assert_eq!(config.compress_threshold_bytes, 1024 * 1024);
    }
    // Pin the documented rate-limit defaults.
    #[test]
    fn test_rate_limit_config_defaults() {
        let config = RateLimitConfig::default();
        assert_eq!(config.max_requests_per_second, 10);
        assert_eq!(config.max_bytes_per_second, 1024 * 1024);
        assert_eq!(config.burst_size, 5);
    }
}