From 088331a93319414d61fccbb1f6c3585b18a926cf Mon Sep 17 00:00:00 2001 From: Maksim Kirillov Date: Thu, 24 Jul 2025 14:50:49 +0200 Subject: [PATCH] Add the crate skeleton --- Cargo.lock | 196 +++++++++++++- Cargo.toml | 2 +- eigensync/Cargo.toml | 59 +++++ eigensync/src/bin/server.rs | 171 +++++++++++++ eigensync/src/client/behaviour.rs | 72 ++++++ eigensync/src/client/database.rs | 81 ++++++ eigensync/src/client/document.rs | 156 ++++++++++++ eigensync/src/client/mod.rs | 124 +++++++++ eigensync/src/client/sync_loop.rs | 104 ++++++++ eigensync/src/lib.rs | 70 +++++ eigensync/src/metrics.rs | 190 ++++++++++++++ eigensync/src/protocol.rs | 395 +++++++++++++++++++++++++++++ eigensync/src/server/behaviour.rs | 72 ++++++ eigensync/src/server/database.rs | 81 ++++++ eigensync/src/server/event_loop.rs | 74 ++++++ eigensync/src/server/mod.rs | 92 +++++++ eigensync/src/types.rs | 244 ++++++++++++++++++ 17 files changed, 2181 insertions(+), 2 deletions(-) create mode 100644 eigensync/Cargo.toml create mode 100644 eigensync/src/bin/server.rs create mode 100644 eigensync/src/client/behaviour.rs create mode 100644 eigensync/src/client/database.rs create mode 100644 eigensync/src/client/document.rs create mode 100644 eigensync/src/client/mod.rs create mode 100644 eigensync/src/client/sync_loop.rs create mode 100644 eigensync/src/lib.rs create mode 100644 eigensync/src/metrics.rs create mode 100644 eigensync/src/protocol.rs create mode 100644 eigensync/src/server/behaviour.rs create mode 100644 eigensync/src/server/database.rs create mode 100644 eigensync/src/server/event_loop.rs create mode 100644 eigensync/src/server/mod.rs create mode 100644 eigensync/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index 102c1181..6262416a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -750,6 +750,29 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "automerge" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f0dae93622d3c6850d196503480004576249e0e391bddb3f54600974d92a790" +dependencies = [ + "cfg-if", + "flate2", + "fxhash", + "hex", + "im", + "itertools 0.13.0", + "leb128", + "serde", + "sha2 0.10.9", + "smol_str", + "thiserror 1.0.69", + "tinyvec", + "tracing", + "unicode-segmentation", + "uuid", +] + [[package]] name = "axum" version = "0.7.9" @@ -1178,6 +1201,15 @@ dependencies = [ "serde", ] +[[package]] +name = "bitmaps" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "031043d04099746d8db04daf1fa424b2bc8bd69d92b25962dcde24da39ab64a2" +dependencies = [ + "typenum", +] + [[package]] name = "bitvec" version = "1.0.1" @@ -2889,6 +2921,34 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "eigensync" +version = "0.1.0" +dependencies = [ + "anyhow", + "automerge", + "chrono", + "clap 4.5.41", + "futures", + "libp2p", + "opentelemetry", + "opentelemetry-prometheus", + "prometheus", + "rusqlite", + "rusqlite_migration", + "serde", + "serde_cbor", + "serde_json", + "tempfile", + "thiserror 1.0.69", + "time 0.3.41", + "tokio", + "toml 0.8.23", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "either" version = "1.15.0" @@ -4582,6 +4642,20 @@ dependencies = [ "xmltree", ] +[[package]] +name = "im" +version = "15.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0acd33ff0285af998aaf9b57342af478078f53492322fafc47450e09397e0e9" +dependencies = [ + "bitmaps", + "rand_core 0.6.4", + "rand_xoshiro", + "sized-chunks", + "typenum", + "version_check", +] + [[package]] name = "image" version = "0.25.6" @@ -4977,6 +5051,12 @@ dependencies = [ "spin 0.9.8", ] +[[package]] +name = "leb128" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" + [[package]] name = "libappindicator" version = "0.9.0" @@ -6759,6 +6839,51 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "opentelemetry" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b69a91d4893e713e06f724597ad630f1fa76057a5e1026c0ca67054a9032a76" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror 1.0.69", +] + +[[package]] +name = "opentelemetry-prometheus" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e1a24eafe47b693cb938f8505f240dc26c71db60df9aca376b4f857e9653ec7" +dependencies = [ + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "prometheus", + "protobuf", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae312d58eaa90a82d2e627fd86e075cf5230b3f11794e2ed74199ebbe572d4fd" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "lazy_static", + "once_cell", + "opentelemetry", + "ordered-float 4.6.0", + "thiserror 1.0.69", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -6774,6 +6899,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bb71e1b3fa6ca1c61f383464aaf2bb0e2f8e772a1f01d486832464de363b951" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-stream" version = "0.2.0" @@ -7463,6 +7597,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.12.4", + "protobuf", + "thiserror 1.0.69", +] + [[package]] name = "prometheus-client" version = "0.22.3" @@ -7506,6 +7655,12 @@ dependencies = [ "unarray", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "ptr_meta" version = "0.1.4" @@ -7815,6 +7970,15 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rand_xoshiro" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "raw-window-handle" version = "0.6.2" @@ -8149,6 +8313,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" dependencies = [ "bitflags 2.9.1", + "chrono", "fallible-iterator", "fallible-streaming-iterator", "hashlink 0.9.1", @@ -8157,6 +8322,16 @@ dependencies = [ "time 0.3.41", ] +[[package]] +name = "rusqlite_migration" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "923b42e802f7dc20a0a6b5e097ba7c83fe4289da07e49156fecf6af08aa9cd1c" +dependencies = [ + "log", + "rusqlite", +] + [[package]] name = "rust_decimal" version = "1.37.2" @@ -8755,7 +8930,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" dependencies = [ - "ordered-float", + "ordered-float 2.10.1", "serde", ] @@ -9174,6 +9349,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" +[[package]] +name = "sized-chunks" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d69225bde7a69b235da73377861095455d298f2b970996eec25ddbb42b3d1e" +dependencies = [ + "bitmaps", + "typenum", +] + [[package]] name = "slab" version = "0.4.10" @@ -9228,6 +9413,15 @@ dependencies = [ "serde", ] +[[package]] +name = "smol_str" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd538fb6910ac1099850255cf94a94df6551fbdd602454387d0adb2d1ca6dead" +dependencies = [ + "serde", +] + [[package]] name = "snow" version = "0.9.6" diff --git a/Cargo.toml b/Cargo.toml index 4ed59411..47c5e94c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = [ +members = [ "eigensync", "electrum-pool", "monero-rpc", "monero-rpc-pool", diff --git a/eigensync/Cargo.toml b/eigensync/Cargo.toml new file mode 100644 index 00000000..bde1c8b0 --- /dev/null +++ b/eigensync/Cargo.toml @@ -0,0 +1,59 @@ +[package] +name = "eigensync" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "eigensync-server" +path = "src/bin/server.rs" +required-features = ["server"] + +[features] +default = ["client"] +server = ["dep:clap", "dep:toml", "dep:tracing-subscriber"] +client = [] +metrics = ["dep:opentelemetry", "dep:opentelemetry-prometheus", "dep:prometheus"] + +[dependencies] +# Core dependencies +anyhow = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time", "fs", "signal"] } +tracing = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +uuid = { workspace = true } + +# Automerge +automerge = "0.5" + +# Networking +libp2p = { workspace = true, features = ["request-response", "tcp", "noise", "yamux", "identify", "ping", "mdns"] } +futures = { workspace = true } + +# Database +rusqlite = { version = "0.32", features = ["bundled", "chrono"] } +rusqlite_migration = "1.2" + +# Serialization +serde_cbor = "0.11" + +# Time handling +chrono = { version = "0.4", features = ["serde"] } +time = { version = "0.3", features = ["serde", "formatting"] } + +# Server-only dependencies +clap = { version = "4.0", features = ["derive"], optional = true } +toml = { version = "0.8", optional = true } +tracing-subscriber = { workspace = true, optional = true } + +# Metrics (optional) +opentelemetry = { version = "0.23", optional = true } +opentelemetry-prometheus = { version = "0.16", optional = true } +prometheus = { version = "0.13", optional = true } + +[dev-dependencies] +tempfile = "3.0" + +[lints] +workspace = true diff --git a/eigensync/src/bin/server.rs b/eigensync/src/bin/server.rs new file mode 100644 index 00000000..d9ea39ca --- /dev/null +++ b/eigensync/src/bin/server.rs @@ -0,0 +1,171 @@ +//! Eigensync server binary + +use clap::{Parser, Subcommand}; +use eigensync::{Server, ServerConfig}; +use std::path::PathBuf; +use tracing::{info, warn}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +#[derive(Parser)] +#[command(name = "eigensync-server")] +#[command(about = "Eigensync distributed state synchronization server")] +#[command(version)] +struct Cli { + /// Configuration file path + #[arg(short, long)] + config: Option, + + /// Database path + #[arg(long)] + database: Option, + + /// Listen address + #[arg(short, long, default_value = "0.0.0.0")] + listen_address: String, + + /// Listen port + #[arg(short, long, default_value = "9944")] + port: u16, + + /// Maximum number of peers + #[arg(long, default_value = "100")] + max_peers: u32, + + /// Enable debug logging + #[arg(short, long)] + debug: bool, + + /// Enable JSON logging + #[arg(long)] + json: bool, + + #[command(subcommand)] + command: Option, +} + +#[derive(Clone, Subcommand)] +enum Commands { + /// Run the server + Run, + /// Generate default configuration + GenerateConfig { + /// Output file path + #[arg(short, long)] + output: Option, + }, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cli = Cli::parse(); + + // Initialize logging + let filter = if cli.debug { + "debug,eigensync=trace" + } else { + "info,eigensync=debug" + }; + + if cli.json { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new(filter)) + .with(tracing_subscriber::fmt::layer().json()) + .init(); + } else { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new(filter)) + .with(tracing_subscriber::fmt::layer().pretty()) + .init(); + } + + info!("Starting eigensync server v{}", env!("CARGO_PKG_VERSION")); + + // Handle subcommands + let command = cli.command.clone().unwrap_or(Commands::Run); + match command { + Commands::Run => { + run_server(cli).await?; + } + Commands::GenerateConfig { output } => { + generate_config(output)?; + } + } + + Ok(()) +} + +async fn run_server(cli: Cli) -> Result<(), Box> { + // Load or create configuration + let mut config = if let Some(config_path) = cli.config { + if config_path.exists() { + info!("Loading configuration from {:?}", config_path); + // TODO: Implement config file loading + ServerConfig::default() + } else { + warn!("Configuration file {:?} not found, using defaults", config_path); + ServerConfig::default() + } + } else { + info!("No configuration file specified, using defaults"); + ServerConfig::default() + }; + + // Override config with CLI arguments + if let Some(database_path) = cli.database { + config.database_path = database_path; + } + config.listen_address = cli.listen_address; + config.listen_port = cli.port; + config.max_peers = cli.max_peers; + + info!("Server configuration:"); + info!(" Database: {:?}", config.database_path); + info!(" Listen: {}:{}", config.listen_address, config.listen_port); + info!(" Max peers: {}", config.max_peers); + + // Create and run server + let server = Server::new(config).await?; + + // Set up signal handling for graceful shutdown + let shutdown_signal = async { + tokio::signal::ctrl_c() + .await + .expect("Failed to install CTRL+C signal handler"); + info!("Received shutdown signal"); + }; + + // Run server with graceful shutdown + tokio::select! { + result = server.run() => { + match result { + Ok(_) => info!("Server completed successfully"), + Err(e) => { + tracing::error!("Server error: {}", e); + return Err(e.into()); + } + } + } + _ = shutdown_signal => { + info!("Shutting down server gracefully"); + } + } + + Ok(()) +} + +fn generate_config(output: Option) -> Result<(), Box> { + let config = ServerConfig::default(); + let config_toml = toml::to_string_pretty(&config)?; + + match output { + Some(path) => { + std::fs::write(&path, config_toml)?; + info!("Generated configuration file: {:?}", path); + } + None => { + println!("{}", config_toml); + } + } + + Ok(()) +} \ No newline at end of file diff --git a/eigensync/src/client/behaviour.rs b/eigensync/src/client/behaviour.rs new file mode 100644 index 00000000..8c86261a --- /dev/null +++ b/eigensync/src/client/behaviour.rs @@ -0,0 +1,72 @@ +//! libp2p networking behaviour for eigensync client + +use crate::protocol::{EigensyncRequest, EigensyncResponse}; +use crate::types::{Result, PeerId}; + +/// libp2p behaviour for eigensync client (placeholder) +pub struct ClientBehaviour { + // TODO: Add actual behaviour components + // request_response: RequestResponse, + // identify: Identify, + // ping: Ping, +} + +impl ClientBehaviour { + /// Create a new client behaviour + pub fn new() -> Self { + tracing::debug!("Creating client behaviour"); + + // TODO: Initialize behaviour components + Self { + // request_response: RequestResponse::new(...), + // identify: Identify::new(...), + // ping: Ping::default(), + } + } + + /// Send a request to the server + pub async fn send_request( + &mut self, + server_peer_id: PeerId, + request: EigensyncRequest, + ) -> Result { + tracing::debug!("Sending request to server {}: {:?}", server_peer_id, request); + + // TODO: Implement request sending + match request { + EigensyncRequest::GetChanges(_params) => { + // TODO: Handle GetChanges request + todo!("GetChanges request not implemented") + }, + EigensyncRequest::SubmitChanges(_params) => { + // TODO: Handle SubmitChanges request + todo!("SubmitChanges request not implemented") + }, + EigensyncRequest::Ping(_params) => { + // TODO: Handle Ping request + todo!("Ping request not implemented") + }, + EigensyncRequest::GetStatus(_params) => { + // TODO: Handle GetStatus request + todo!("GetStatus request not implemented") + }, + } + } +} + +impl Default for ClientBehaviour { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_behaviour_creation() { + let _behaviour = ClientBehaviour::new(); + // Behaviour creation should not panic + } +} \ No newline at end of file diff --git a/eigensync/src/client/database.rs b/eigensync/src/client/database.rs new file mode 100644 index 00000000..0853c328 --- /dev/null +++ b/eigensync/src/client/database.rs @@ -0,0 +1,81 @@ +//! Client database layer for caching documents and metadata + +use crate::types::{Result, ActorId}; +use rusqlite::Connection; +use std::path::Path; + +/// Client database for caching Automerge documents and metadata +pub struct ClientDatabase { + connection: Connection, +} + +impl ClientDatabase { + /// Open or create a client cache database + pub async fn open>(path: P) -> Result { + tracing::info!("Opening client cache database at {:?}", path.as_ref()); + + // TODO: Implement actual database opening and migration + let connection = Connection::open(path)?; + + Ok(Self { connection }) + } + + /// Store an Automerge document + pub async fn store_document( + &self, + document_id: &str, + document_data: &[u8], + ) -> Result<()> { + tracing::debug!("Storing document {}", document_id); + + // TODO: Implement document storage + Ok(()) + } + + /// Load an Automerge document + pub async fn load_document(&self, document_id: &str) -> Result>> { + tracing::debug!("Loading document {}", document_id); + + // TODO: Implement document loading + Ok(None) + } + + /// Store document metadata + pub async fn store_metadata( + &self, + document_id: &str, + last_sync: chrono::DateTime, + heads: &[automerge::ChangeHash], + ) -> Result<()> { + tracing::debug!("Storing metadata for document {}", document_id); + + // TODO: Implement metadata storage + Ok(()) + } + + /// Load document metadata + pub async fn load_metadata( + &self, + document_id: &str, + ) -> Result, Vec)>> { + tracing::debug!("Loading metadata for document {}", document_id); + + // TODO: Implement metadata loading + Ok(None) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[tokio::test] + async fn test_client_database_creation() { + let temp_dir = tempdir().unwrap(); + let db_path = temp_dir.path().join("test_cache.db"); + + let db = ClientDatabase::open(db_path).await; + assert!(db.is_ok()); + } +} \ No newline at end of file diff --git a/eigensync/src/client/document.rs b/eigensync/src/client/document.rs new file mode 100644 index 00000000..5ecfe8c2 --- /dev/null +++ b/eigensync/src/client/document.rs @@ -0,0 +1,156 @@ +//! Automerge document management for client + +use crate::types::{Result, ActorId}; +use automerge::{AutoCommit, ChangeHash}; +use serde_json::Value; +use std::collections::HashMap; + +/// Manager for Automerge documents +pub struct DocumentManager { + documents: HashMap, + actor_id: ActorId, +} + +impl DocumentManager { + /// Create a new document manager + pub fn new(actor_id: ActorId) -> Self { + tracing::debug!("Creating document manager for actor {}", actor_id); + + Self { + documents: HashMap::new(), + actor_id, + } + } + + /// Get or create a document for a swap + pub fn get_or_create_document(&mut self, document_id: &str) -> Result<&mut AutoCommit> { + tracing::debug!("Getting or creating document {}", document_id); + + if !self.documents.contains_key(document_id) { + // TODO: Try to load from cache first + let mut doc = AutoCommit::new(); + doc.set_actor(self.actor_id.0.clone()); + self.documents.insert(document_id.to_string(), doc); + } + + Ok(self.documents.get_mut(document_id).unwrap()) + } + + /// Append a swap state to a document + pub fn append_swap_state( + &mut self, + document_id: &str, + state_json: Value, + timestamp: chrono::DateTime, + ) -> Result> { + tracing::debug!("Appending swap state to document {}", document_id); + + let doc = self.get_or_create_document(document_id)?; + + // TODO: Implement state appending to Automerge document + // Structure: { "states": [ { "timestamp": "...", "state": {...} } ] } + + // For now, return empty changes + Ok(vec![]) + } + + /// Get the latest state from a document + pub fn get_latest_state(&self, document_id: &str) -> Result> { + tracing::debug!("Getting latest state from document {}", document_id); + + if let Some(doc) = self.documents.get(document_id) { + // TODO: Implement state retrieval from Automerge document + // Find entry with latest timestamp + } + + Ok(None) + } + + /// Apply changes to a document + pub fn apply_changes( + &mut self, + document_id: &str, + changes: &[automerge::Change], + ) -> Result<()> { + tracing::debug!("Applying {} changes to document {}", changes.len(), document_id); + + let doc = self.get_or_create_document(document_id)?; + + for change in changes { + doc.load_incremental(change.raw_bytes())?; + } + + Ok(()) + } + + /// Get document heads + pub fn get_heads(&mut self, document_id: &str) -> Vec { + if let Some(doc) = self.documents.get_mut(document_id) { + doc.get_heads() + } else { + vec![] + } + } + + /// Generate changes since given heads + pub fn changes_since( + &mut self, + document_id: &str, + heads: &[ChangeHash], + ) -> Result> { + if let Some(doc) = self.documents.get_mut(document_id) { + let changes = doc.get_changes(heads); + Ok(changes.into_iter().cloned().collect()) + } else { + Ok(vec![]) + } + } + + /// Serialize a document for storage + pub fn serialize_document(&mut self, document_id: &str) -> Result>> { + if let Some(doc) = self.documents.get_mut(document_id) { + let bytes = doc.save(); + Ok(Some(bytes)) + } else { + Ok(None) + } + } + + /// Load a document from serialized data + pub fn load_document(&mut self, document_id: &str, data: &[u8]) -> Result<()> { + tracing::debug!("Loading document {} from {} bytes", document_id, data.len()); + + let mut doc = AutoCommit::load(data)?; + doc.set_actor(self.actor_id.0.clone()); + self.documents.insert(document_id.to_string(), doc); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_document_manager_creation() { + let actor_id = ActorId(automerge::ActorId::random()); + let _manager = DocumentManager::new(actor_id); + // Manager creation should not panic + } + + #[test] + fn test_get_or_create_document() { + let actor_id = ActorId(automerge::ActorId::random()); + let mut manager = DocumentManager::new(actor_id); + + // First call should create the document + let _doc1 = manager.get_or_create_document("test-doc").unwrap(); + + // Second call should return the existing document (test that no panic occurs) + let _doc2 = manager.get_or_create_document("test-doc").unwrap(); + + // Document should exist in the manager + assert!(manager.documents.contains_key("test-doc")); + } +} \ No newline at end of file diff --git a/eigensync/src/client/mod.rs b/eigensync/src/client/mod.rs new file mode 100644 index 00000000..5ba8d006 --- /dev/null +++ b/eigensync/src/client/mod.rs @@ -0,0 +1,124 @@ +//! Client-side components for eigensync + +use crate::types::{Result, PeerId, ActorId}; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use std::time::Duration; + +pub mod database; +pub mod behaviour; +pub mod sync_loop; +pub mod document; + +/// Configuration for the eigensync client +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClientConfig { + /// Path to client cache database + pub cache_database_path: PathBuf, + /// Server address to connect to + pub server_address: String, + /// Server port to connect to + pub server_port: u16, + /// Sync interval + pub sync_interval: Duration, + /// Connection timeout + pub connection_timeout: Duration, + /// Actor ID for this client + pub actor_id: ActorId, +} + +impl Default for ClientConfig { + fn default() -> Self { + Self { + cache_database_path: PathBuf::from("eigensync_cache.sqlite"), + server_address: "127.0.0.1".to_string(), + server_port: 9944, + sync_interval: Duration::from_secs(30), + connection_timeout: Duration::from_secs(10), + actor_id: ActorId(automerge::ActorId::random()), + } + } +} + +/// Main client struct (placeholder implementation) +pub struct Client { + config: ClientConfig, +} + +impl Client { + /// Create a new client with the given configuration + pub async fn new(config: ClientConfig) -> Result { + tracing::info!("Creating eigensync client with config: {:?}", config); + + // TODO: Initialize database, networking, etc. + + Ok(Self { config }) + } + + /// Start the client sync loop + pub async fn start_sync(&mut self) -> Result<()> { + tracing::info!("Starting eigensync client sync"); + + // TODO: Implement client sync loop + + Ok(()) + } + + /// Append a swap state to the local document + pub async fn append_swap_state( + &mut self, + swap_id: uuid::Uuid, + state_json: serde_json::Value, + timestamp: chrono::DateTime, + ) -> Result<()> { + tracing::debug!("Appending swap state for {}: {:?}", swap_id, state_json); + + // TODO: Implement state appending + // 1. Load Automerge document for swap_id + // 2. Add new state entry with timestamp + // 3. Generate patch + // 4. Store locally and mark for sync + + Ok(()) + } + + /// Get the latest state for a swap + pub async fn get_latest_swap_state( + &self, + swap_id: uuid::Uuid, + ) -> Result> { + tracing::debug!("Getting latest swap state for {}", swap_id); + + // TODO: Implement state retrieval + // 1. Load Automerge document for swap_id + // 2. Find entry with latest timestamp + // 3. Return state JSON + + Ok(None) + } + + /// Get client configuration + pub fn config(&self) -> &ClientConfig { + &self.config + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_client_config_default() { + let config = ClientConfig::default(); + assert_eq!(config.server_address, "127.0.0.1"); + assert_eq!(config.server_port, 9944); + assert_eq!(config.sync_interval, Duration::from_secs(30)); + } + + #[tokio::test] + async fn test_client_creation() { + let config = ClientConfig::default(); + let client = Client::new(config).await; + assert!(client.is_ok()); + } +} \ No newline at end of file diff --git a/eigensync/src/client/sync_loop.rs b/eigensync/src/client/sync_loop.rs new file mode 100644 index 00000000..77ad88a9 --- /dev/null +++ b/eigensync/src/client/sync_loop.rs @@ -0,0 +1,104 @@ +//! Client sync loop for periodic synchronization with server + +use crate::client::behaviour::ClientBehaviour; +use crate::client::database::ClientDatabase; +use crate::types::{Result, PeerId}; +use std::time::Duration; + +/// Client sync loop for periodic synchronization +pub struct ClientSyncLoop { + behaviour: ClientBehaviour, + database: ClientDatabase, + server_peer_id: PeerId, + sync_interval: Duration, +} + +impl ClientSyncLoop { + /// Create a new client sync loop + pub fn new( + behaviour: ClientBehaviour, + database: ClientDatabase, + server_peer_id: PeerId, + sync_interval: Duration, + ) -> Self { + tracing::debug!("Creating client sync loop with interval {:?}", sync_interval); + + Self { + behaviour, + database, + server_peer_id, + sync_interval, + } + } + + /// Run the sync loop + pub async fn run(mut self) -> Result<()> { + tracing::info!("Starting client sync loop"); + + let mut interval = tokio::time::interval(self.sync_interval); + + loop { + interval.tick().await; + + match self.perform_sync().await { + Ok(_) => { + tracing::debug!("Sync completed successfully"); + }, + Err(e) => { + tracing::warn!("Sync failed: {}", e); + // Continue syncing despite errors + } + } + } + } + + /// Perform a single sync operation + async fn perform_sync(&mut self) -> Result<()> { + tracing::debug!("Performing sync with server {}", self.server_peer_id); + + // TODO: Implement sync logic: + // 1. Get list of documents to sync + // 2. For each document: + // - Get local heads/metadata + // - Request changes since last sync + // - Apply received changes + // - Submit any local changes + // - Update local metadata + + Ok(()) + } + + /// Sync a specific document + pub async fn sync_document(&mut self, document_id: &str) -> Result<()> { + tracing::debug!("Syncing document {}", document_id); + + // TODO: Implement document-specific sync + Ok(()) + } + + /// Force immediate sync (outside of normal interval) + pub async fn sync_now(&mut self) -> Result<()> { + tracing::info!("Forcing immediate sync"); + + self.perform_sync().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[tokio::test] + async fn test_sync_loop_creation() { + let behaviour = ClientBehaviour::new(); + let temp_dir = tempdir().unwrap(); + let db_path = temp_dir.path().join("test_cache.db"); + let database = crate::client::database::ClientDatabase::open(db_path).await.unwrap(); + let server_peer_id = PeerId::random(); + let sync_interval = Duration::from_secs(1); + + let _sync_loop = ClientSyncLoop::new(behaviour, database, server_peer_id, sync_interval); + // Sync loop creation should not panic + } +} \ No newline at end of file diff --git a/eigensync/src/lib.rs b/eigensync/src/lib.rs new file mode 100644 index 00000000..493a23ab --- /dev/null +++ b/eigensync/src/lib.rs @@ -0,0 +1,70 @@ +//! Eigensync: Distributed State Synchronization using Automerge CRDTs +//! +//! This crate provides a distributed state synchronization system built on top of +//! Automerge CRDTs and libp2p networking. It enables synchronizing append-only state +//! machines across multiple devices. +//! +//! # Features +//! +//! - **Append-only state synchronization**: Designed for state machines that only add states +//! - **Conflict-free replication**: Uses Automerge CRDTs to handle concurrent updates +//! - **Peer-to-peer networking**: Built on libp2p for reliable P2P communication +//! - **Persistent storage**: SQLite-based persistence for both server and client +//! - **Authentication**: PeerId-based authentication with ActorId mapping +//! +//! # Architecture +//! +//! The system consists of: +//! - **Server**: Stores and distributes patches per PeerId +//! - **Client**: Maintains local Automerge document and syncs with server +//! - **Protocol**: Request/response protocol for patch exchange + +pub mod types; +pub mod protocol; + +#[cfg(feature = "server")] +pub mod server; + +#[cfg(feature = "client")] +pub mod client; + +#[cfg(feature = "metrics")] +pub mod metrics; + +// Re-export commonly used types +pub use types::{Error, Result, ActorId, PeerId, DocumentState, PatchInfo}; +pub use protocol::{EigensyncMessage, EigensyncRequest, EigensyncResponse}; + +#[cfg(feature = "client")] +pub use client::{Client, ClientConfig}; + +#[cfg(feature = "server")] +pub use server::{Server, ServerConfig}; + +/// Version of the eigensync protocol +pub const PROTOCOL_VERSION: u32 = 1; + +/// Protocol name for libp2p +pub const PROTOCOL_NAME: &str = "/eigensync/1.0.0"; + +/// Main entry point for integrating eigensync with swap state machine +#[cfg(feature = "client")] +pub async fn append_state( + client: &mut Client, + swap_id: uuid::Uuid, + state_json: serde_json::Value, + timestamp: chrono::DateTime, +) -> Result<()> { + client.append_swap_state(swap_id, state_json, timestamp).await +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_protocol_constants() { + assert_eq!(PROTOCOL_VERSION, 1); + assert_eq!(PROTOCOL_NAME, "/eigensync/1.0.0"); + } +} diff --git a/eigensync/src/metrics.rs b/eigensync/src/metrics.rs new file mode 100644 index 00000000..88c627b1 --- /dev/null +++ b/eigensync/src/metrics.rs @@ -0,0 +1,190 @@ +//! Metrics and observability for eigensync + +#[cfg(feature = "metrics")] +use prometheus::{Counter, Histogram, Registry}; +use std::time::Instant; + +/// Metrics collector for eigensync operations +pub struct EigensyncMetrics { + #[cfg(feature = "metrics")] + registry: Registry, + #[cfg(feature = "metrics")] + changes_sent: Counter, + #[cfg(feature = "metrics")] + changes_received: Counter, + #[cfg(feature = "metrics")] + sync_duration: Histogram, + #[cfg(feature = "metrics")] + rtt_histogram: Histogram, +} + +impl EigensyncMetrics { + /// Create a new metrics collector + pub fn new() -> Self { + tracing::debug!("Creating eigensync metrics collector"); + + #[cfg(feature = "metrics")] + { + let registry = Registry::new(); + + let changes_sent = Counter::new( + "eigensync_changes_sent_total", + "Total number of changes sent" + ).expect("Failed to create changes_sent counter"); + + let changes_received = Counter::new( + "eigensync_changes_received_total", + "Total number of changes received" + ).expect("Failed to create changes_received counter"); + + let sync_duration = Histogram::with_opts( + prometheus::HistogramOpts::new( + "eigensync_sync_duration_seconds", + "Duration of sync operations in seconds" + ) + ).expect("Failed to create sync_duration histogram"); + + let rtt_histogram = Histogram::with_opts( + prometheus::HistogramOpts::new( + "eigensync_request_rtt_seconds", + "Round-trip time for requests in seconds" + ) + ).expect("Failed to create rtt histogram"); + + registry.register(Box::new(changes_sent.clone())).unwrap(); + registry.register(Box::new(changes_received.clone())).unwrap(); + registry.register(Box::new(sync_duration.clone())).unwrap(); + registry.register(Box::new(rtt_histogram.clone())).unwrap(); + + Self { + registry, + changes_sent, + changes_received, + sync_duration, + rtt_histogram, + } + } + + #[cfg(not(feature = "metrics"))] + { + Self {} + } + } + + /// Record changes sent + pub fn record_changes_sent(&self, count: u64) { + #[cfg(feature = "metrics")] + { + self.changes_sent.inc_by(count as f64); + } + + tracing::debug!("Recorded {} changes sent", count); + } + + /// Record changes received + pub fn record_changes_received(&self, count: u64) { + #[cfg(feature = "metrics")] + { + self.changes_received.inc_by(count as f64); + } + + tracing::debug!("Recorded {} changes received", count); + } + + /// Record sync operation duration + pub fn record_sync_duration(&self, duration: std::time::Duration) { + #[cfg(feature = "metrics")] + { + self.sync_duration.observe(duration.as_secs_f64()); + } + + tracing::debug!("Recorded sync duration: {:?}", duration); + } + + /// Record request round-trip time + pub fn record_rtt(&self, rtt: std::time::Duration) { + #[cfg(feature = "metrics")] + { + self.rtt_histogram.observe(rtt.as_secs_f64()); + } + + tracing::debug!("Recorded RTT: {:?}", rtt); + } + + /// Get metrics registry (for Prometheus export) + #[cfg(feature = "metrics")] + pub fn registry(&self) -> &Registry { + &self.registry + } +} + +impl Default for EigensyncMetrics { + fn default() -> Self { + Self::new() + } +} + +/// Timer for measuring operation duration +pub struct Timer { + start: Instant, + label: String, +} + +impl Timer { + /// Start a new timer + pub fn new(label: impl Into) -> Self { + Self { + start: Instant::now(), + label: label.into(), + } + } + + /// Stop the timer and return the elapsed duration + pub fn stop(self) -> std::time::Duration { + let duration = self.start.elapsed(); + tracing::debug!("Timer '{}' finished in {:?}", self.label, duration); + duration + } +} + +/// Macro for timing operations +#[macro_export] +macro_rules! time_operation { + ($metrics:expr, $operation:expr, $block:block) => {{ + let timer = crate::metrics::Timer::new($operation); + let result = $block; + let duration = timer.stop(); + $metrics.record_sync_duration(duration); + result + }}; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_metrics_creation() { + let _metrics = EigensyncMetrics::new(); + // Metrics creation should not panic + } + + #[test] + fn test_timer() { + let timer = Timer::new("test"); + std::thread::sleep(std::time::Duration::from_millis(1)); + let duration = timer.stop(); + assert!(duration.as_millis() >= 1); + } + + #[test] + fn test_metrics_recording() { + let metrics = EigensyncMetrics::new(); + + // These should not panic + metrics.record_changes_sent(5); + metrics.record_changes_received(3); + metrics.record_sync_duration(std::time::Duration::from_millis(100)); + metrics.record_rtt(std::time::Duration::from_millis(50)); + } +} \ No newline at end of file diff --git a/eigensync/src/protocol.rs b/eigensync/src/protocol.rs new file mode 100644 index 00000000..0c2c45d5 --- /dev/null +++ b/eigensync/src/protocol.rs @@ -0,0 +1,395 @@ +//! Protocol definitions for eigensync communication +//! +//! This module defines the wire protocol used for communication between +//! eigensync clients and servers. The protocol is versioned and uses +//! serde_cbor for serialization with length-prefixed frames. + +use crate::types::{ActorId, PeerId, Result, Error}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +/// Protocol version for version negotiation +pub const CURRENT_VERSION: u32 = 1; + +/// Maximum message size to prevent DoS attacks (10 MB) +pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024; + +/// Default request timeout +pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); + +/// Main message envelope for all eigensync communications +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EigensyncMessage { + /// Protocol version + pub version: u32, + /// Unique request identifier for matching responses + pub request_id: uuid::Uuid, + /// Message payload + pub payload: EigensyncPayload, +} + +/// Union type for all message payloads +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", content = "data")] +pub enum EigensyncPayload { + Request(EigensyncRequest), + Response(EigensyncResponse), +} + +/// All possible request types +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", content = "params")] +pub enum EigensyncRequest { + /// Get changes from server since a given point + GetChanges(GetChangesParams), + /// Submit new changes to server + SubmitChanges(SubmitChangesParams), + /// Ping for connectivity testing + Ping(PingParams), + /// Get server status/info + GetStatus(GetStatusParams), +} + +/// All possible response types +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", content = "result")] +pub enum EigensyncResponse { + /// Response to GetChanges request + GetChanges(GetChangesResult), + /// Response to SubmitChanges request + SubmitChanges(SubmitChangesResult), + /// Response to Ping request + Ping(PingResult), + /// Response to GetStatus request + GetStatus(GetStatusResult), + /// Error response for any request + Error(ErrorResult), +} + +// Request parameters + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetChangesParams { + /// Document to get changes for (typically swap_id) + pub document_id: String, + /// Only return changes after this sequence number + pub since_sequence: Option, + /// Only return changes after this timestamp + pub since_timestamp: Option>, + /// Maximum number of changes to return (for pagination) + pub limit: Option, + /// Automerge heads we already have (to optimize sync) + pub have_heads: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubmitChangesParams { + /// Document these changes apply to + pub document_id: String, + /// Serialized Automerge changes + pub changes: Vec>, + /// Actor ID that created these changes + pub actor_id: ActorId, + /// Expected sequence number for optimistic concurrency control + pub expected_sequence: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PingParams { + /// Timestamp when ping was sent + pub timestamp: chrono::DateTime, + /// Optional payload for bandwidth testing + pub payload: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetStatusParams { + /// Include detailed statistics + pub include_stats: bool, + /// Include information about other peers + pub include_peers: bool, +} + +// Response results + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetChangesResult { + /// Document ID these changes apply to + pub document_id: String, + /// Serialized Automerge changes + pub changes: Vec>, + /// Sequence numbers for each change + pub sequences: Vec, + /// Whether there are more changes available + pub has_more: bool, + /// Current document heads after applying these changes + pub new_heads: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubmitChangesResult { + /// Document ID changes were applied to + pub document_id: String, + /// Sequence numbers assigned to the submitted changes + pub assigned_sequences: Vec, + /// Number of changes that were actually new (not duplicates) + pub new_changes_count: u32, + /// Current document heads after applying changes + pub new_heads: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PingResult { + /// Timestamp from the request + pub request_timestamp: chrono::DateTime, + /// Timestamp when server processed the request + pub response_timestamp: chrono::DateTime, + /// Echo back any payload that was sent + pub payload: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetStatusResult { + /// Server version/build info + pub server_version: String, + /// Protocol versions supported + pub supported_versions: Vec, + /// Server uptime in seconds + pub uptime_seconds: u64, + /// Number of connected peers + pub connected_peers: u32, + /// Number of documents being tracked + pub document_count: u64, + /// Total number of changes stored + pub total_changes: u64, + /// Optional detailed statistics + pub stats: Option, + /// Optional peer information + pub peers: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ErrorResult { + /// Error code for programmatic handling + pub code: ErrorCode, + /// Human-readable error message + pub message: String, + /// Optional additional details + pub details: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ServerStats { + /// Total bytes stored + pub total_bytes: u64, + /// Bytes sent since startup + pub bytes_sent: u64, + /// Bytes received since startup + pub bytes_received: u64, + /// Number of requests processed + pub requests_processed: u64, + /// Average request processing time in milliseconds + pub avg_request_time_ms: f64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PeerStatus { + /// Peer ID + pub peer_id: String, + /// Associated actor ID if authenticated + pub actor_id: Option, + /// When peer connected + pub connected_at: chrono::DateTime, + /// Last activity timestamp + pub last_activity: chrono::DateTime, + /// Number of documents this peer is syncing + pub document_count: u32, +} + +/// Error codes for programmatic error handling +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[repr(u32)] +pub enum ErrorCode { + /// Unknown or internal server error + InternalError = 1000, + /// Invalid request format or parameters + InvalidRequest = 1001, + /// Authentication failed + AuthenticationFailed = 1002, + /// Requested resource not found + NotFound = 1003, + /// Rate limit exceeded + RateLimitExceeded = 1004, + /// Storage quota exceeded + QuotaExceeded = 1005, + /// Version not supported + UnsupportedVersion = 1006, + /// Request timeout + Timeout = 1007, + /// Conflict in optimistic concurrency control + Conflict = 1008, + /// Invalid actor/peer mapping + InvalidActorMapping = 1009, +} + +impl ErrorCode { + /// Convert error code to human-readable string + pub fn as_str(&self) -> &'static str { + match self { + ErrorCode::InternalError => "internal_error", + ErrorCode::InvalidRequest => "invalid_request", + ErrorCode::AuthenticationFailed => "authentication_failed", + ErrorCode::NotFound => "not_found", + ErrorCode::RateLimitExceeded => "rate_limit_exceeded", + ErrorCode::QuotaExceeded => "quota_exceeded", + ErrorCode::UnsupportedVersion => "unsupported_version", + ErrorCode::Timeout => "timeout", + ErrorCode::Conflict => "conflict", + ErrorCode::InvalidActorMapping => "invalid_actor_mapping", + } + } +} + +impl From for Error { + fn from(code: ErrorCode) -> Self { + Error::Protocol { + message: format!("Protocol error: {}", code.as_str()), + } + } +} + +/// Codec for serializing/deserializing eigensync messages +pub struct EigensyncCodec; + +impl EigensyncCodec { + /// Serialize message to bytes with length prefix + pub fn encode(message: &EigensyncMessage) -> Result> { + let payload = serde_cbor::to_vec(message)?; + + if payload.len() > MAX_MESSAGE_SIZE { + return Err(Error::Protocol { + message: format!( + "Message too large: {} bytes > {} max", + payload.len(), + MAX_MESSAGE_SIZE + ), + }); + } + + let mut result = Vec::with_capacity(4 + payload.len()); + result.extend_from_slice(&(payload.len() as u32).to_be_bytes()); + result.extend_from_slice(&payload); + Ok(result) + } + + /// Deserialize message from bytes (assumes length prefix already read) + pub fn decode(data: &[u8]) -> Result { + if data.len() > MAX_MESSAGE_SIZE { + return Err(Error::Protocol { + message: format!( + "Message too large: {} bytes > {} max", + data.len(), + MAX_MESSAGE_SIZE + ), + }); + } + + let message: EigensyncMessage = serde_cbor::from_slice(data)?; + + // Validate version + if message.version > CURRENT_VERSION { + return Err(Error::Protocol { + message: format!( + "Unsupported protocol version: {} > {}", + message.version, + CURRENT_VERSION + ), + }); + } + + Ok(message) + } + + /// Create a request message + pub fn create_request(request: EigensyncRequest) -> EigensyncMessage { + EigensyncMessage { + version: CURRENT_VERSION, + request_id: uuid::Uuid::new_v4(), + payload: EigensyncPayload::Request(request), + } + } + + /// Create a response message + pub fn create_response( + request_id: uuid::Uuid, + response: EigensyncResponse, + ) -> EigensyncMessage { + EigensyncMessage { + version: CURRENT_VERSION, + request_id, + payload: EigensyncPayload::Response(response), + } + } + + /// Create an error response + pub fn create_error_response( + request_id: uuid::Uuid, + code: ErrorCode, + message: String, + ) -> EigensyncMessage { + Self::create_response( + request_id, + EigensyncResponse::Error(ErrorResult { + code, + message, + details: None, + }), + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_codec_roundtrip() { + let request = EigensyncRequest::Ping(PingParams { + timestamp: chrono::Utc::now(), + payload: Some(b"test".to_vec()), + }); + + let message = EigensyncCodec::create_request(request); + let encoded = EigensyncCodec::encode(&message).unwrap(); + + // Skip the length prefix for decoding + let decoded = EigensyncCodec::decode(&encoded[4..]).unwrap(); + + assert_eq!(message.version, decoded.version); + assert_eq!(message.request_id, decoded.request_id); + } + + #[test] + fn test_message_size_limit() { + let large_payload = vec![0u8; MAX_MESSAGE_SIZE + 1]; + let request = EigensyncRequest::SubmitChanges(SubmitChangesParams { + document_id: "test".to_string(), + changes: vec![large_payload], + actor_id: ActorId(automerge::ActorId::random()), + expected_sequence: None, + }); + + let message = EigensyncCodec::create_request(request); + let result = EigensyncCodec::encode(&message); + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Message too large")); + } + + #[test] + fn test_error_codes() { + assert_eq!(ErrorCode::InternalError.as_str(), "internal_error"); + assert_eq!(ErrorCode::AuthenticationFailed.as_str(), "authentication_failed"); + assert_eq!(ErrorCode::NotFound.as_str(), "not_found"); + } +} \ No newline at end of file diff --git a/eigensync/src/server/behaviour.rs b/eigensync/src/server/behaviour.rs new file mode 100644 index 00000000..33e6fb19 --- /dev/null +++ b/eigensync/src/server/behaviour.rs @@ -0,0 +1,72 @@ +//! libp2p networking behaviour for eigensync server + +use crate::protocol::{EigensyncMessage, EigensyncRequest, EigensyncResponse}; +use crate::types::{Result, PeerId}; + +/// libp2p behaviour for eigensync server (placeholder) +pub struct ServerBehaviour { + // TODO: Add actual behaviour components + // request_response: RequestResponse, + // identify: Identify, + // ping: Ping, +} + +impl ServerBehaviour { + /// Create a new server behaviour + pub fn new() -> Self { + tracing::debug!("Creating server behaviour"); + + // TODO: Initialize behaviour components + Self { + // request_response: RequestResponse::new(...), + // identify: Identify::new(...), + // ping: Ping::default(), + } + } + + /// Handle incoming request + pub async fn handle_request( + &mut self, + peer_id: PeerId, + request: EigensyncRequest, + ) -> Result { + tracing::debug!("Handling request from peer {}: {:?}", peer_id, request); + + // TODO: Implement request handling + match request { + EigensyncRequest::GetChanges(_params) => { + // TODO: Handle GetChanges + todo!("GetChanges not implemented") + }, + EigensyncRequest::SubmitChanges(_params) => { + // TODO: Handle SubmitChanges + todo!("SubmitChanges not implemented") + }, + EigensyncRequest::Ping(_params) => { + // TODO: Handle Ping + todo!("Ping not implemented") + }, + EigensyncRequest::GetStatus(_params) => { + // TODO: Handle GetStatus + todo!("GetStatus not implemented") + }, + } + } +} + +impl Default for ServerBehaviour { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_behaviour_creation() { + let _behaviour = ServerBehaviour::new(); + // Behaviour creation should not panic + } +} \ No newline at end of file diff --git a/eigensync/src/server/database.rs b/eigensync/src/server/database.rs new file mode 100644 index 00000000..7d71804e --- /dev/null +++ b/eigensync/src/server/database.rs @@ -0,0 +1,81 @@ +//! Server database layer for patch storage and peer management + +use crate::types::{Result, PeerId, ActorId}; +use rusqlite::Connection; +use std::path::Path; + +/// Server database for storing patches and peer information +pub struct ServerDatabase { + connection: Connection, +} + +impl ServerDatabase { + /// Open or create a server database + pub async fn open>(path: P) -> Result { + tracing::info!("Opening server database at {:?}", path.as_ref()); + + // TODO: Implement actual database opening and migration + let connection = Connection::open(path)?; + + Ok(Self { connection }) + } + + /// Store a patch for a peer + pub async fn store_patch( + &self, + peer_id: PeerId, + actor_id: ActorId, + document_id: &str, + patch_data: &[u8], + ) -> Result { + tracing::debug!("Storing patch for peer {} in document {}", peer_id, document_id); + + // TODO: Implement patch storage + Ok(0) + } + + /// Get patches for a peer since a given sequence number + pub async fn get_patches( + &self, + peer_id: PeerId, + document_id: &str, + since_sequence: Option, + ) -> Result)>> { + tracing::debug!("Getting patches for peer {} in document {} since {:?}", + peer_id, document_id, since_sequence); + + // TODO: Implement patch retrieval + Ok(vec![]) + } + + /// Bind a peer ID to an actor ID + pub async fn bind_peer_actor(&self, peer_id: PeerId, actor_id: ActorId) -> Result<()> { + tracing::debug!("Binding peer {} to actor {}", peer_id, actor_id); + + // TODO: Implement peer-actor binding + Ok(()) + } + + /// Get actor ID for a peer + pub async fn get_actor_for_peer(&self, peer_id: PeerId) -> Result> { + tracing::debug!("Getting actor for peer {}", peer_id); + + // TODO: Implement actor lookup + Ok(None) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[tokio::test] + async fn test_database_creation() { + let temp_dir = tempdir().unwrap(); + let db_path = temp_dir.path().join("test.db"); + + let db = ServerDatabase::open(db_path).await; + assert!(db.is_ok()); + } +} \ No newline at end of file diff --git a/eigensync/src/server/event_loop.rs b/eigensync/src/server/event_loop.rs new file mode 100644 index 00000000..7cf74711 --- /dev/null +++ b/eigensync/src/server/event_loop.rs @@ -0,0 +1,74 @@ +//! Server event loop for handling network events + +use crate::server::behaviour::ServerBehaviour; +use crate::server::database::ServerDatabase; +use crate::types::{Result, PeerId}; +use std::time::Duration; + +/// Server event loop for handling network events and database operations +pub struct ServerEventLoop { + behaviour: ServerBehaviour, + database: ServerDatabase, +} + +impl ServerEventLoop { + /// Create a new server event loop + pub fn new(behaviour: ServerBehaviour, database: ServerDatabase) -> Self { + tracing::debug!("Creating server event loop"); + + Self { + behaviour, + database, + } + } + + /// Run the server event loop + pub async fn run(mut self) -> Result<()> { + tracing::info!("Starting server event loop"); + + // TODO: Implement actual event loop with libp2p swarm + loop { + // Placeholder: just sleep for now + tokio::time::sleep(Duration::from_secs(1)).await; + + // TODO: + // - Poll swarm for events + // - Handle incoming requests + // - Manage peer connections + // - Perform periodic maintenance tasks + } + } + + /// Handle peer connection + pub async fn handle_peer_connected(&mut self, peer_id: PeerId) -> Result<()> { + tracing::info!("Peer connected: {}", peer_id); + + // TODO: Implement peer connection handling + Ok(()) + } + + /// Handle peer disconnection + pub async fn handle_peer_disconnected(&mut self, peer_id: PeerId) -> Result<()> { + tracing::info!("Peer disconnected: {}", peer_id); + + // TODO: Implement peer disconnection handling + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[tokio::test] + async fn test_event_loop_creation() { + let behaviour = ServerBehaviour::new(); + let temp_dir = tempdir().unwrap(); + let db_path = temp_dir.path().join("test.db"); + let database = ServerDatabase::open(db_path).await.unwrap(); + + let _event_loop = ServerEventLoop::new(behaviour, database); + // Event loop creation should not panic + } +} \ No newline at end of file diff --git a/eigensync/src/server/mod.rs b/eigensync/src/server/mod.rs new file mode 100644 index 00000000..c748d797 --- /dev/null +++ b/eigensync/src/server/mod.rs @@ -0,0 +1,92 @@ +//! Server-side components for eigensync + +use crate::types::{Result, PeerId, ActorId}; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +pub mod database; +pub mod behaviour; +pub mod event_loop; + +/// Configuration for the eigensync server +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ServerConfig { + /// Path to server database + pub database_path: PathBuf, + /// Address to listen on + pub listen_address: String, + /// Port to listen on + pub listen_port: u16, + /// Maximum number of connected peers + pub max_peers: u32, + /// Rate limiting configuration + pub rate_limit: crate::types::RateLimitConfig, + /// Snapshot configuration + pub snapshot_config: crate::types::SnapshotConfig, +} + +impl Default for ServerConfig { + fn default() -> Self { + Self { + database_path: PathBuf::from("server_patches.sqlite"), + listen_address: "0.0.0.0".to_string(), + listen_port: 9944, + max_peers: 100, + rate_limit: crate::types::RateLimitConfig::default(), + snapshot_config: crate::types::SnapshotConfig::default(), + } + } +} + +/// Main server struct (placeholder implementation) +pub struct Server { + config: ServerConfig, +} + +impl Server { + /// Create a new server with the given configuration + pub async fn new(config: ServerConfig) -> Result { + tracing::info!("Creating eigensync server with config: {:?}", config); + + // TODO: Initialize database, networking, etc. + + Ok(Self { config }) + } + + /// Start the server + pub async fn run(self) -> Result<()> { + tracing::info!("Starting eigensync server on {}:{}", + self.config.listen_address, self.config.listen_port); + + // TODO: Implement server event loop + + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + + /// Get server configuration + pub fn config(&self) -> &ServerConfig { + &self.config + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_server_config_default() { + let config = ServerConfig::default(); + assert_eq!(config.listen_address, "0.0.0.0"); + assert_eq!(config.listen_port, 9944); + assert_eq!(config.max_peers, 100); + } + + #[tokio::test] + async fn test_server_creation() { + let config = ServerConfig::default(); + let server = Server::new(config).await; + assert!(server.is_ok()); + } +} \ No newline at end of file diff --git a/eigensync/src/types.rs b/eigensync/src/types.rs new file mode 100644 index 00000000..7544ae67 --- /dev/null +++ b/eigensync/src/types.rs @@ -0,0 +1,244 @@ +//! Common types and error definitions for eigensync + +use serde::{Deserialize, Serialize}; + +/// Result type alias for eigensync operations +pub type Result = std::result::Result; + +/// PeerId type alias +pub type PeerId = libp2p::PeerId; + +/// ActorId uniquely identifies an actor in the Automerge document +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ActorId(pub automerge::ActorId); + +impl From for ActorId { + fn from(actor_id: automerge::ActorId) -> Self { + Self(actor_id) + } +} + +impl From for automerge::ActorId { + fn from(actor_id: ActorId) -> Self { + actor_id.0 + } +} + +impl std::fmt::Display for ActorId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +/// Comprehensive error types for eigensync operations +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("Database error: {0}")] + Database(#[from] rusqlite::Error), + + #[error("Migration error: {0}")] + Migration(#[from] rusqlite_migration::Error), + + #[error("Serialization error: {0}")] + Serialization(#[from] serde_cbor::Error), + + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + + #[error("Automerge error: {0}")] + Automerge(#[from] automerge::AutomergeError), + + #[error("Network error: {0}")] + Network(#[from] libp2p::swarm::DialError), + + #[error("Protocol error: {message}")] + Protocol { message: String }, + + #[error("Authentication failed for peer {peer_id}: {reason}")] + Authentication { peer_id: PeerId, reason: String }, + + #[error("Document not found: {document_id}")] + DocumentNotFound { document_id: String }, + + #[error("Invalid configuration: {message}")] + InvalidConfig { message: String }, + + #[error("Timeout: {operation}")] + Timeout { operation: String }, + + #[error("Actor mapping conflict: peer {peer_id} tried to use actor {actor_id} already mapped to different peer")] + ActorMappingConflict { peer_id: PeerId, actor_id: ActorId }, + + #[error("Storage quota exceeded: {current_size} bytes")] + StorageQuotaExceeded { current_size: u64 }, +} + +/// Information about a patch/change in the system +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PatchInfo { + /// Unique identifier for this patch + pub id: uuid::Uuid, + /// Actor that created this patch + pub actor_id: ActorId, + /// Timestamp when patch was created + pub timestamp: chrono::DateTime, + /// Size of the patch data in bytes + pub size_bytes: u64, + /// Hash of the patch content for integrity checking + pub content_hash: String, +} + +/// Current state of a document +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DocumentState { + /// Document identifier (typically swap_id) + pub document_id: String, + /// Current number of patches applied + pub patch_count: u64, + /// Total size of all patches in bytes + pub total_size_bytes: u64, + /// Timestamp of last update + pub last_updated: chrono::DateTime, + /// Current document heads + pub heads: Vec, +} + +/// Configuration for snapshot and garbage collection +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SnapshotConfig { + /// Trigger snapshot after this many changes + pub max_changes: u64, + /// Trigger snapshot after this many bytes + pub max_size_bytes: u64, + /// Whether to compress snapshots above this size + pub compress_threshold_bytes: u64, +} + +impl Default for SnapshotConfig { + fn default() -> Self { + Self { + max_changes: 10_000, + max_size_bytes: 10 * 1024 * 1024, // 10 MB + compress_threshold_bytes: 1024 * 1024, // 1 MB + } + } +} + +/// Metrics for monitoring sync operations +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct SyncMetrics { + /// Total number of changes sent + pub changes_sent: u64, + /// Total number of changes received + pub changes_received: u64, + /// Total number of bytes sent + pub bytes_sent: u64, + /// Total number of bytes received + pub bytes_received: u64, + /// Number of sync operations performed + pub sync_operations: u64, + /// Number of failed sync operations + pub sync_failures: u64, + /// Average round-trip time in milliseconds + pub avg_rtt_ms: f64, + /// Number of conflicts resolved + pub conflicts_resolved: u64, +} + +/// State of a peer connection +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum PeerState { + /// Peer is disconnected + Disconnected, + /// Peer is connecting + Connecting, + /// Peer is connected and authenticated + Connected, + /// Peer authentication failed + AuthenticationFailed, + /// Peer connection failed + ConnectionFailed, +} + +/// Information about a connected peer +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PeerInfo { + /// Peer ID (as string) + pub peer_id: String, + /// Associated actor ID + pub actor_id: Option, + /// Current connection state + pub state: PeerState, + /// When the peer was first seen + pub first_seen: chrono::DateTime, + /// When the peer was last seen + pub last_seen: chrono::DateTime, + /// Sync metrics for this peer + pub metrics: SyncMetrics, +} + +/// A batch of changes/patches for efficient transmission +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChangeBatch { + /// Unique identifier for this batch + pub batch_id: uuid::Uuid, + /// Document this batch applies to + pub document_id: String, + /// The actual changes (serialized) + pub changes: Vec>, + /// Metadata about each change + pub patch_info: Vec, +} + +/// Configuration for rate limiting +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RateLimitConfig { + /// Maximum requests per second per peer + pub max_requests_per_second: u32, + /// Maximum bytes per second per peer + pub max_bytes_per_second: u64, + /// Burst allowance + pub burst_size: u32, +} + +impl Default for RateLimitConfig { + fn default() -> Self { + Self { + max_requests_per_second: 10, + max_bytes_per_second: 1024 * 1024, // 1 MB/s + burst_size: 5, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_actor_id_conversion() { + let automerge_actor = automerge::ActorId::random(); + let actor_id = ActorId::from(automerge_actor.clone()); + let converted_back: automerge::ActorId = actor_id.into(); + assert_eq!(automerge_actor, converted_back); + } + + #[test] + fn test_snapshot_config_defaults() { + let config = SnapshotConfig::default(); + assert_eq!(config.max_changes, 10_000); + assert_eq!(config.max_size_bytes, 10 * 1024 * 1024); + assert_eq!(config.compress_threshold_bytes, 1024 * 1024); + } + + #[test] + fn test_rate_limit_config_defaults() { + let config = RateLimitConfig::default(); + assert_eq!(config.max_requests_per_second, 10); + assert_eq!(config.max_bytes_per_second, 1024 * 1024); + assert_eq!(config.burst_size, 5); + } +} \ No newline at end of file