diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs
index 45808558..38b89504 100644
--- a/veilid-core/src/routing_table/mod.rs
+++ b/veilid-core/src/routing_table/mod.rs
@@ -54,6 +54,9 @@ const ROUTING_TABLE: &str = "routing_table";
 const SERIALIZED_BUCKET_MAP: &[u8] = b"serialized_bucket_map";
 const CACHE_VALIDITY_KEY: &[u8] = b"cache_validity_key";
 
+// Critical sections
+const LOCK_TAG_TICK: &str = "TICK";
+
 pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>;
 pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>;
 #[derive(Clone, Debug)]
diff --git a/veilid-core/src/routing_table/routing_domain_editor.rs b/veilid-core/src/routing_table/routing_domain_editor.rs
index 1b22657e..385fe4b5 100644
--- a/veilid-core/src/routing_table/routing_domain_editor.rs
+++ b/veilid-core/src/routing_table/routing_domain_editor.rs
@@ -129,9 +129,11 @@ impl RoutingDomainEditor {
         }
 
         // Briefly pause routing table ticker while changes are made
-        if pause_tasks {
-            self.routing_table.pause_tasks(true).await;
-        }
+        let _tick_guard = if pause_tasks {
+            Some(self.routing_table.pause_tasks().await)
+        } else {
+            None
+        };
 
         // Apply changes
         let mut changed = false;
@@ -262,8 +264,5 @@ impl RoutingDomainEditor {
                 rss.reset();
             }
         }
-
-        // Unpause routing table ticker
-        self.routing_table.pause_tasks(false).await;
     }
 }
diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs
index 44c001a4..6da5cefa 100644
--- a/veilid-core/src/routing_table/routing_table_inner.rs
+++ b/veilid-core/src/routing_table/routing_table_inner.rs
@@ -2,6 +2,7 @@ use super::*;
 use weak_table::PtrWeakHashSet;
 
 const RECENT_PEERS_TABLE_SIZE: usize = 64;
+
 pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>;
 
 //////////////////////////////////////////////////////////////////////////
@@ -34,8 +35,9 @@ pub struct RoutingTableInner {
     pub(super) recent_peers: LruCache<TypedKey, RecentPeersEntry>,
     /// Storage for private/safety RouteSpecs
    pub(super) route_spec_store: Option<RouteSpecStore>,
-    /// Tick paused or not
-    pub(super) tick_paused: bool,
+    /// Async tagged critical sections table
+    /// Tag: "tick" -> in ticker
+    pub(super) critical_sections: AsyncTagLockTable<&'static str>,
 }
 
 impl RoutingTableInner {
@@ -52,7 +54,7 @@ impl RoutingTableInner {
             self_transfer_stats: TransferStatsDownUp::default(),
             recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE),
             route_spec_store: None,
-            tick_paused: false,
+            critical_sections: AsyncTagLockTable::new(),
         }
     }
diff --git a/veilid-core/src/routing_table/tasks/mod.rs b/veilid-core/src/routing_table/tasks/mod.rs
index 25a67cd0..0a599b26 100644
--- a/veilid-core/src/routing_table/tasks/mod.rs
+++ b/veilid-core/src/routing_table/tasks/mod.rs
@@ -126,9 +126,13 @@ impl RoutingTable {
     /// to run tick tasks which may run at slower tick rates as configured
     pub async fn tick(&self) -> EyreResult<()> {
         // Don't tick if paused
-        if self.inner.read().tick_paused {
+        let opt_tick_guard = {
+            let inner = self.inner.read();
+            inner.critical_sections.try_lock_tag(LOCK_TAG_TICK)
+        };
+        let Some(_tick_guard) = opt_tick_guard else {
             return Ok(());
-        }
+        };
 
         // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
         self.unlocked_inner.rolling_transfers_task.tick().await?;
@@ -183,22 +187,9 @@ impl RoutingTable {
         Ok(())
     }
 
-    pub(crate) async fn pause_tasks(&self, paused: bool) {
-        let cancel = {
-            let mut inner = self.inner.write();
-            if !inner.tick_paused && paused {
-                inner.tick_paused = true;
-                true
-            } else if inner.tick_paused && !paused {
-                inner.tick_paused = false;
-                false
-            } else {
-                false
-            }
-        };
-        if cancel {
-            self.cancel_tasks().await;
-        }
+    pub(crate) async fn pause_tasks(&self) -> AsyncTagLockGuard<&'static str> {
+        let critical_sections = self.inner.read().critical_sections.clone();
+        critical_sections.lock_tag(LOCK_TAG_TICK).await
    }
 
     pub(crate) async fn cancel_tasks(&self) {
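With this change the pause API becomes RAII-style: `pause_tasks()` returns an `AsyncTagLockGuard` and the ticker resumes as soon as the guard is dropped, so `RoutingDomainEditor` can no longer forget the matching unpause on an early return. Here is a minimal, self-contained sketch of the same guard pattern, independent of the Veilid types (`Ticker` and `PauseGuard` are hypothetical names):

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// Hypothetical stand-in for the routing table ticker: ticking is
// skipped while any PauseGuard is alive.
#[derive(Clone, Default)]
struct Ticker {
    pauses: Arc<AtomicUsize>,
}

struct PauseGuard {
    pauses: Arc<AtomicUsize>,
}

impl Ticker {
    fn pause(&self) -> PauseGuard {
        self.pauses.fetch_add(1, Ordering::SeqCst);
        PauseGuard {
            pauses: self.pauses.clone(),
        }
    }

    fn tick(&self) -> bool {
        // Equivalent of try_lock_tag() returning None: skip the work
        if self.pauses.load(Ordering::SeqCst) > 0 {
            return false;
        }
        true // ... periodic work would run here ...
    }
}

impl Drop for PauseGuard {
    // Resuming is automatic; no unpause call can be forgotten
    fn drop(&mut self) {
        self.pauses.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    let ticker = Ticker::default();
    {
        let _guard = ticker.pause();
        assert!(!ticker.tick()); // paused while the guard is held
    }
    assert!(ticker.tick()); // resumed once the guard dropped
}
```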
diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs
index 4dff0c0d..caeb894d 100644
--- a/veilid-core/src/rpc_processor/rpc_get_value.rs
+++ b/veilid-core/src/rpc_processor/rpc_get_value.rs
@@ -78,7 +78,7 @@ impl RPCProcessor {
         log_rpc!(debug "{}", debug_string);
 
         let waitable_reply = network_result_try!(
-            self.question(dest, question, Some(question_context))
+            self.question(dest.clone(), question, Some(question_context))
                 .await?
         );
 
@@ -99,29 +99,35 @@ impl RPCProcessor {
         };
 
         let (value, peers, descriptor) = get_value_a.destructure();
-
-        let debug_string_value = value.as_ref().map(|v| {
-            format!(" len={} seq={} writer={}",
-                v.value_data().data().len(),
-                v.value_data().seq(),
-                v.value_data().writer(),
-            )
-        }).unwrap_or_default();
-
-        let debug_string_answer = format!(
-            "OUT <== GetValueA({} #{}{}{} peers={})",
-            key,
-            subkey,
-            debug_string_value,
-            if descriptor.is_some() {
-                " +desc"
-            } else {
-                ""
-            },
-            peers.len(),
-        );
-
-        log_rpc!(debug "{}", debug_string_answer);
+
+        #[cfg(feature="debug-dht")]
+        {
+            let debug_string_value = value.as_ref().map(|v| {
+                format!(" len={} seq={} writer={}",
+                    v.value_data().data().len(),
+                    v.value_data().seq(),
+                    v.value_data().writer(),
+                )
+            }).unwrap_or_default();
+
+            let debug_string_answer = format!(
+                "OUT <== GetValueA({} #{}{}{} peers={}) <= {}",
+                key,
+                subkey,
+                debug_string_value,
+                if descriptor.is_some() {
+                    " +desc"
+                } else {
+                    ""
+                },
+                peers.len(),
+                dest
+            );
+
+            log_rpc!(debug "{}", debug_string_answer);
+
+            let peer_ids: Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
+            log_rpc!(debug "Peers: {:#?}", peer_ids);
+        }
 
         // Validate peers returned are, in fact, closer to the key than the node we sent this to
         let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
@@ -203,19 +209,22 @@ impl RPCProcessor {
         let routing_table = self.routing_table();
         let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]));
 
-        let debug_string = format!(
-            "IN <=== GetValueQ({} #{}{}) <== {}",
-            key,
-            subkey,
-            if want_descriptor {
-                " +wantdesc"
-            } else {
-                ""
-            },
-            msg.header.direct_sender_node_id()
-        );
+        #[cfg(feature="debug-dht")]
+        {
+            let debug_string = format!(
+                "IN <=== GetValueQ({} #{}{}) <== {}",
+                key,
+                subkey,
+                if want_descriptor {
+                    " +wantdesc"
+                } else {
+                    ""
+                },
+                msg.header.direct_sender_node_id()
+            );
 
-        log_rpc!(debug "{}", debug_string);
+            log_rpc!(debug "{}", debug_string);
+        }
 
         // See if we have this record ourselves
         let storage_manager = self.storage_manager();
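The verbose GetValue traces are now compiled in only when the crate is built with the `debug-dht` cargo feature, so release builds pay neither the `format!` cost nor the log noise. A minimal sketch of the gating pattern (the `trace_answer` function is hypothetical; the feature would be declared as `debug-dht = []` under `[features]` in Cargo.toml):

```rust
fn trace_answer(key: &str, subkey: u32, peer_count: usize) {
    // This whole block disappears from the binary unless built with
    // `cargo build --features debug-dht`; the format!() only runs in
    // debugging builds.
    #[cfg(feature = "debug-dht")]
    {
        let debug_string = format!("OUT <== GetValueA({} #{} peers={})", key, subkey, peer_count);
        eprintln!("{}", debug_string);
    }

    // Non-diagnostic processing continues unconditionally
    let _ = (key, subkey, peer_count);
}

fn main() {
    trace_answer("VLD0:abc123", 0, 4);
}
```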
diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs
index 62ad8da8..cab0d6e6 100644
--- a/veilid-core/src/rpc_processor/rpc_set_value.rs
+++ b/veilid-core/src/rpc_processor/rpc_set_value.rs
@@ -92,7 +92,7 @@ impl RPCProcessor {
         log_rpc!(debug "{}", debug_string);
 
         let waitable_reply = network_result_try!(
-            self.question(dest, question, Some(question_context))
+            self.question(dest.clone(), question, Some(question_context))
                 .await?
         );
 
@@ -114,28 +114,35 @@ impl RPCProcessor {
         };
 
         let (set, value, peers) = set_value_a.destructure();
-
-        let debug_string_value = value.as_ref().map(|v| {
-            format!(" len={} writer={}",
-                v.value_data().data().len(),
-                v.value_data().writer(),
-            )
-        }).unwrap_or_default();
-
-        let debug_string_answer = format!(
-            "OUT <== SetValueA({} #{}{}{} peers={})",
-            key,
-            subkey,
-            if set {
-                " +set"
-            } else {
-                ""
-            },
-            debug_string_value,
-            peers.len(),
-        );
-
-        log_rpc!(debug "{}", debug_string_answer);
+
+        #[cfg(feature="debug-dht")]
+        {
+            let debug_string_value = value.as_ref().map(|v| {
+                format!(" len={} writer={}",
+                    v.value_data().data().len(),
+                    v.value_data().writer(),
+                )
+            }).unwrap_or_default();
+
+            let debug_string_answer = format!(
+                "OUT <== SetValueA({} #{}{}{} peers={}) <= {}",
+                key,
+                subkey,
+                if set {
+                    " +set"
+                } else {
+                    ""
+                },
+                debug_string_value,
+                peers.len(),
+                dest,
+            );
+
+            log_rpc!(debug "{}", debug_string_answer);
+
+            let peer_ids: Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
+            log_rpc!(debug "Peers: {:#?}", peer_ids);
+        }
 
         // Validate peers returned are, in fact, closer to the key than the node we sent this to
         let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
@@ -235,7 +242,7 @@ impl RPCProcessor {
         // If there are less than 'set_value_count' peers that are closer, then store here too
         let set_value_count = {
             let c = self.config.get();
-            c.network.dht.set_value_fanout as usize
+            c.network.dht.set_value_count as usize
         };
         let (set, new_value) = if closer_to_key_peers.len() >= set_value_count {
             // Not close enough
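The last hunk above is a behavior fix: the inbound handler should compare the number of closer peers against `set_value_count`, the number of nodes that ought to end up holding the value, not `set_value_fanout`, which only controls outbound query parallelism. A toy model of that decision rule (names are hypothetical), consistent with the Flutter defaults below moving to count = 5, fanout = 4:

```rust
/// Hypothetical model of the handler's store-or-forward choice:
/// if at least `set_value_count` known peers are closer to the key,
/// they are the better holders; otherwise this node stores a copy too.
fn should_store_here(closer_peer_count: usize, set_value_count: usize) -> bool {
    closer_peer_count < set_value_count
}

fn main() {
    // With set_value_count = 5, a node that only knows 3 closer
    // peers keeps a copy itself...
    assert!(should_store_here(3, 5));
    // ...while a node with 8 closer peers just returns them.
    assert!(!should_store_here(8, 5));
}
```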
diff --git a/veilid-flutter/lib/default_config.dart b/veilid-flutter/lib/default_config.dart
index 660722cd..eac855bc 100644
--- a/veilid-flutter/lib/default_config.dart
+++ b/veilid-flutter/lib/default_config.dart
@@ -120,16 +120,16 @@ Future<VeilidConfig> getDefaultVeilidConfig(String programName) async {
           defaultRouteHopCount: 1,
         ),
         dht: VeilidConfigDHT(
+          maxFindNodeCount: 20,
           resolveNodeTimeoutMs: 10000,
           resolveNodeCount: 1,
           resolveNodeFanout: 4,
-          maxFindNodeCount: 20,
           getValueTimeoutMs: 10000,
           getValueCount: 3,
           getValueFanout: 4,
           setValueTimeoutMs: 10000,
-          setValueCount: 4,
-          setValueFanout: 6,
+          setValueCount: 5,
+          setValueFanout: 4,
           minPeerCount: 20,
           minPeerRefreshTimeMs: 60000,
           validateDialInfoReceiptTimeMs: 2000,
diff --git a/veilid-tools/run_tests.sh b/veilid-tools/run_tests.sh
index be64d808..615f3e63 100755
--- a/veilid-tools/run_tests.sh
+++ b/veilid-tools/run_tests.sh
@@ -3,7 +3,7 @@ SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 pushd $SCRIPTDIR 2>/dev/null
 
 if [[ "$1" == "wasm" ]]; then
-    WASM_BINDGEN_TEST_TIMEOUT=120 wasm-pack test --firefox --headless --features=rt-wasm-bindgen
+    WASM_BINDGEN_TEST_TIMEOUT=120 wasm-pack test --firefox --headless --no-default-features --features=rt-wasm-bindgen
 elif [[ "$1" == "ios" ]]; then
     SYMROOT=/tmp/testout
     APPNAME=veilidtools-tests
diff --git a/veilid-tools/src/async_tag_lock.rs b/veilid-tools/src/async_tag_lock.rs
index 7dcaec02..cf3950b8 100644
--- a/veilid-tools/src/async_tag_lock.rs
+++ b/veilid-tools/src/async_tag_lock.rs
@@ -135,4 +135,50 @@ where
         // Return the locked guard
         AsyncTagLockGuard::new(self.clone(), tag, guard)
     }
+
+    pub fn try_lock_tag(&self, tag: T) -> Option<AsyncTagLockGuard<T>> {
+        // Get or create a tag lock entry
+        let mutex = {
+            let mut inner = self.inner.lock();
+
+            // See if this tag is in the table
+            // and if not, add a new mutex for this tag
+            let entry = inner
+                .table
+                .entry(tag.clone())
+                .or_insert_with(|| AsyncTagLockTableEntry {
+                    mutex: Arc::new(AsyncMutex::new(())),
+                    waiters: 0,
+                });
+
+            // Increment the number of waiters
+            entry.waiters += 1;
+
+            // Return the mutex associated with the tag
+            entry.mutex.clone()
+
+            // Drop the table guard
+        };
+
+        // Try to lock the tag lock without waiting
+        let opt_guard = asyncmutex_try_lock_arc!(mutex);
+
+        // If the mutex was busy, back out the waiter we just added
+        // so the table entry does not leak
+        if opt_guard.is_none() {
+            let mut inner = self.inner.lock();
+            let entry = inner
+                .table
+                .get_mut(&tag)
+                .expect("tag should be in the table while counted as a waiter");
+            entry.waiters -= 1;
+            if entry.waiters == 0 {
+                inner.table.remove(&tag);
+            }
+            return None;
+        }
+
+        // Return the locked guard
+        opt_guard.map(|guard| AsyncTagLockGuard::new(self.clone(), tag, guard))
+    }
 }
diff --git a/veilid-tools/src/tests/common/test_async_tag_lock.rs b/veilid-tools/src/tests/common/test_async_tag_lock.rs
index c1b2ecc4..caef3001 100644
--- a/veilid-tools/src/tests/common/test_async_tag_lock.rs
+++ b/veilid-tools/src/tests/common/test_async_tag_lock.rs
@@ -55,6 +55,29 @@ pub async fn test_simple_single_contention() {
     assert_eq!(table.len(), 1);
 }
 
+pub async fn test_simple_try() {
+    info!("test_simple_try");
+
+    let table = AsyncTagLockTable::new();
+
+    let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234);
+    let a2 = SocketAddr::new("1.2.3.5".parse().unwrap(), 1235);
+
+    {
+        let _g1 = table.lock_tag(a1).await;
+
+        let opt_g2 = table.try_lock_tag(a1);
+        let opt_g3 = table.try_lock_tag(a2);
+
+        assert!(opt_g2.is_none());
+        assert!(opt_g3.is_some());
+    }
+    let opt_g4 = table.try_lock_tag(a1);
+    assert!(opt_g4.is_some());
+
+    assert_eq!(table.len(), 1);
+}
+
 pub async fn test_simple_double_contention() {
     info!("test_simple_double_contention");
 
@@ -153,6 +176,7 @@ pub async fn test_parallel_single_contention() {
 
 pub async fn test_all() {
     test_simple_no_contention().await;
+    test_simple_try().await;
     test_simple_single_contention().await;
     test_parallel_single_contention().await;
 }
diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs
index 92ce1444..76d0a2d8 100644
--- a/veilid-tools/src/tools.rs
+++ b/veilid-tools/src/tools.rs
@@ -47,6 +47,13 @@ cfg_if::cfg_if! {
                 $x.clone().lock_owned().await
             };
         }
+
+        #[macro_export]
+        macro_rules! asyncmutex_try_lock_arc {
+            ($x:expr) => {
+                $x.try_lock_owned().ok()
+            };
+        }
     } else {
         #[macro_export]
         macro_rules! asyncmutex_try_lock {
@@ -60,6 +67,12 @@ cfg_if::cfg_if! {
                 $x.lock_arc().await
             };
         }
+        #[macro_export]
+        macro_rules! asyncmutex_try_lock_arc {
+            ($x:expr) => {
+                $x.try_lock_arc()
+            };
+        }
     }
 }
diff --git a/veilid-tools/tests/web.rs b/veilid-tools/tests/web.rs
index e3997307..af72b762 100644
--- a/veilid-tools/tests/web.rs
+++ b/veilid-tools/tests/web.rs
@@ -4,7 +4,6 @@
 use cfg_if::*;
 use parking_lot::Once;
 use veilid_tools::tests::*;
-use veilid_tools::*;
 
 use wasm_bindgen_test::*;
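Taken together, `tick()` grabs the `TICK` tag only if it is free, while `pause_tasks()` waits until it owns it. A usage sketch mirroring `test_simple_try`, assuming `veilid-tools` as a dependency built with its tokio runtime feature and `AsyncTagLockTable` re-exported at the crate root:

```rust
use veilid_tools::AsyncTagLockTable;

#[tokio::main]
async fn main() {
    let table = AsyncTagLockTable::new();

    // What pause_tasks() does: wait for and hold the tag
    let guard = table.lock_tag("TICK").await;

    // What tick() does: bail out without blocking if the tag is busy
    assert!(table.try_lock_tag("TICK").is_none());

    // Dropping the guard releases the tag so the next tick() proceeds
    drop(guard);
    assert!(table.try_lock_tag("TICK").is_some());
}
```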