checkpoint

This commit is contained in:
John Smith 2023-04-16 13:16:00 -04:00
parent 0769d032fc
commit 00b0edf687
8 changed files with 173 additions and 68 deletions

View File

@ -22,7 +22,7 @@ pub struct RecordStore {
dead_records: Vec<(RecordTableKey, Record)>,
changed_records: HashSet<RecordTableKey>,
purge_dead_records_mutex: AsyncMutex<()>,
purge_dead_records_mutex: Arc<AsyncMutex<()>>,
}
impl RecordStore {
@ -40,7 +40,7 @@ impl RecordStore {
total_storage_space: 0,
dead_records: Vec::new(),
changed_records: HashSet::new(),
purge_dead_records_mutex: AsyncMutex::new(()),
purge_dead_records_mutex: Arc::new(AsyncMutex::new(())),
}
}
@ -79,7 +79,7 @@ impl RecordStore {
dead_records.push((k, v));
}) {
// This shouldn't happen, but deduplicate anyway
log_stor!(warn "duplicate record in table: {}", ri.0);
log_stor!(warn "duplicate record in table: {:?}", ri.0);
dead_records.push((ri.0, v));
}
}
@ -132,8 +132,9 @@ impl RecordStore {
}
async fn purge_dead_records(&mut self, lazy: bool) {
let purge_dead_records_mutex = self.purge_dead_records_mutex.clone();
let lock = if lazy {
match self.purge_dead_records_mutex.try_lock().await {
match purge_dead_records_mutex.try_lock() {
Ok(v) => v,
Err(_) => {
// If not ready now, just skip it if we're lazy
@ -142,7 +143,7 @@ impl RecordStore {
}
} else {
// Not lazy, must wait
self.purge_dead_records_mutex.lock().await;
purge_dead_records_mutex.lock().await
};
// Delete dead keys
@ -251,7 +252,7 @@ impl RecordStore {
dead_records.push((k, v));
}) {
// Shouldn't happen but log it
log_stor!(warn "new duplicate record in table: {}", rtk);
log_stor!(warn "new duplicate record in table: {:?}", rtk);
self.add_dead_record(rtk, v);
}
for dr in dead_records {

View File

@ -211,10 +211,10 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
"network.dht.resolve_node_count" => Ok(Box::new(20u32)),
"network.dht.resolve_node_fanout" => Ok(Box::new(3u32)),
"network.dht.max_find_node_count" => Ok(Box::new(20u32)),
"network.dht.get_value_timeout_ms" => Ok(Box::new(Option::<u32>::None)),
"network.dht.get_value_timeout_ms" => Ok(Box::new(10u32)),
"network.dht.get_value_count" => Ok(Box::new(20u32)),
"network.dht.get_value_fanout" => Ok(Box::new(3u32)),
"network.dht.set_value_timeout_ms" => Ok(Box::new(Option::<u32>::None)),
"network.dht.set_value_timeout_ms" => Ok(Box::new(10u32)),
"network.dht.set_value_count" => Ok(Box::new(20u32)),
"network.dht.set_value_fanout" => Ok(Box::new(5u32)),
"network.dht.min_peer_count" => Ok(Box::new(20u32)),
@ -317,7 +317,7 @@ pub async fn test_config() {
assert_eq!(inner.network.hole_punch_receipt_time_ms, 5_000u32);
assert_eq!(inner.network.rpc.concurrency, 2u32);
assert_eq!(inner.network.rpc.queue_size, 1024u32);
assert_eq!(inner.network.rpc.timeout_ms, 10_000u32);
assert_eq!(inner.network.rpc.timeout_ms, 5_000u32);
assert_eq!(inner.network.rpc.max_route_hop_count, 4u8);
assert_eq!(inner.network.rpc.default_route_hop_count, 1u8);
assert_eq!(inner.network.routing_table.node_id.len(), 0);
@ -329,16 +329,13 @@ pub async fn test_config() {
assert_eq!(inner.network.routing_table.limit_attached_good, 8u32);
assert_eq!(inner.network.routing_table.limit_attached_weak, 4u32);
assert_eq!(
inner.network.dht.resolve_node_timeout_ms,
Option::<u32>::None
);
assert_eq!(inner.network.dht.resolve_node_timeout_ms, 10_000u32);
assert_eq!(inner.network.dht.resolve_node_count, 20u32);
assert_eq!(inner.network.dht.resolve_node_fanout, 3u32);
assert_eq!(inner.network.dht.get_value_timeout_ms, Option::<u32>::None);
assert_eq!(inner.network.dht.get_value_timeout_ms, 10_000u32);
assert_eq!(inner.network.dht.get_value_count, 20u32);
assert_eq!(inner.network.dht.get_value_fanout, 3u32);
assert_eq!(inner.network.dht.set_value_timeout_ms, Option::<u32>::None);
assert_eq!(inner.network.dht.set_value_timeout_ms, 10_000u32);
assert_eq!(inner.network.dht.set_value_count, 20u32);
assert_eq!(inner.network.dht.set_value_fanout, 5u32);
assert_eq!(inner.network.dht.min_peer_count, 20u32);

View File

@ -2557,7 +2557,7 @@ impl DHTSchemaSMPL {
/// Get the data size of this schema beyond the size of the structure itself
pub fn data_size(&self) -> usize {
self.members.len() * mem::size_of::<DHTSchemaSMPLMember>
self.members.len() * mem::size_of::<DHTSchemaSMPLMember>()
}
}

View File

@ -92,6 +92,13 @@ packages:
url: "https://pub.dartlang.org"
source: hosted
version: "6.1.4"
file_utils:
dependency: transitive
description:
name: file_utils
url: "https://pub.dartlang.org"
source: hosted
version: "1.0.1"
flutter:
dependency: "direct main"
description: flutter
@ -121,6 +128,13 @@ packages:
description: flutter
source: sdk
version: "0.0.0"
globbing:
dependency: transitive
description:
name: globbing
url: "https://pub.dartlang.org"
source: hosted
version: "1.0.0"
js:
dependency: transitive
description:
@ -287,6 +301,13 @@ packages:
url: "https://pub.dartlang.org"
source: hosted
version: "1.1.1"
system_info2:
dependency: transitive
description:
name: system_info2
url: "https://pub.dartlang.org"
source: hosted
version: "3.0.2"
term_glyph:
dependency: transitive
description:

View File

@ -1,8 +1,53 @@
import 'package:flutter/foundation.dart' show kIsWeb;
import 'package:path_provider/path_provider.dart';
import 'package:path/path.dart' as p;
import 'package:system_info2/system_info2.dart' as sysinfo;
import 'veilid.dart';
const int megaByte = 1024 * 1024;
int getLocalSubkeyCacheSize() {
if (kIsWeb) {
return 128;
}
return 1024;
}
int getLocalMaxSubkeyCacheMemoryMb() {
if (kIsWeb) {
return 256;
}
return sysinfo.SysInfo.getTotalPhysicalMemory() ~/ 32 ~/ megaByte;
}
int getRemoteSubkeyCacheSize() {
if (kIsWeb) {
return 64;
}
return 128;
}
int getRemoteMaxRecords() {
if (kIsWeb) {
return 64;
}
return 128;
}
int getRemoteMaxSubkeyCacheMemoryMb() {
if (kIsWeb) {
return 256;
}
return sysinfo.SysInfo.getTotalPhysicalMemory() ~/ 32 ~/ megaByte;
}
int getRemoteMaxStorageSpaceMb() {
if (kIsWeb) {
return 128;
}
return 256;
}
Future<VeilidConfig> getDefaultVeilidConfig(String programName) async {
return VeilidConfig(
programName: programName,
@ -63,25 +108,30 @@ Future<VeilidConfig> getDefaultVeilidConfig(String programName) async {
queueSize: 1024,
maxTimestampBehindMs: 10000,
maxTimestampAheadMs: 10000,
timeoutMs: 10000,
timeoutMs: 5000,
maxRouteHopCount: 4,
defaultRouteHopCount: 1,
),
dht: VeilidConfigDHT(
resolveNodeTimeoutMs: null,
resolveNodeCount: 20,
resolveNodeFanout: 3,
maxFindNodeCount: 20,
getValueTimeoutMs: null,
getValueCount: 20,
getValueFanout: 3,
setValueTimeoutMs: null,
setValueCount: 20,
setValueFanout: 5,
minPeerCount: 20,
minPeerRefreshTimeMs: 2000,
validateDialInfoReceiptTimeMs: 2000,
),
resolveNodeTimeoutMs: 10000,
resolveNodeCount: 20,
resolveNodeFanout: 3,
maxFindNodeCount: 20,
getValueTimeoutMs: 10000,
getValueCount: 20,
getValueFanout: 3,
setValueTimeoutMs: 10000,
setValueCount: 20,
setValueFanout: 5,
minPeerCount: 20,
minPeerRefreshTimeMs: 2000,
validateDialInfoReceiptTimeMs: 2000,
localSubkeyCacheSize: getLocalSubkeyCacheSize(),
localMaxSubkeyCacheMemoryMb: getLocalMaxSubkeyCacheMemoryMb(),
remoteSubkeyCacheSize: getRemoteSubkeyCacheSize(),
remoteMaxRecords: getRemoteMaxRecords(),
remoteMaxSubkeyCacheMemoryMb: getRemoteMaxSubkeyCacheMemoryMb(),
remoteMaxStorageSpaceMb: getRemoteMaxStorageSpaceMb()),
upnp: true,
detectAddressChanges: true,
restrictedNatRetries: 0,

View File

@ -588,34 +588,46 @@ class VeilidConfigTLS {
////////////
class VeilidConfigDHT {
int? resolveNodeTimeoutMs;
int resolveNodeTimeoutMs;
int resolveNodeCount;
int resolveNodeFanout;
int maxFindNodeCount;
int? getValueTimeoutMs;
int getValueTimeoutMs;
int getValueCount;
int getValueFanout;
int? setValueTimeoutMs;
int setValueTimeoutMs;
int setValueCount;
int setValueFanout;
int minPeerCount;
int minPeerRefreshTimeMs;
int validateDialInfoReceiptTimeMs;
int localSubkeyCacheSize;
int localMaxSubkeyCacheMemoryMb;
int remoteSubkeyCacheSize;
int remoteMaxRecords;
int remoteMaxSubkeyCacheMemoryMb;
int remoteMaxStorageSpaceMb;
VeilidConfigDHT(
{this.resolveNodeTimeoutMs,
{required this.resolveNodeTimeoutMs,
required this.resolveNodeCount,
required this.resolveNodeFanout,
required this.maxFindNodeCount,
this.getValueTimeoutMs,
required this.getValueTimeoutMs,
required this.getValueCount,
required this.getValueFanout,
this.setValueTimeoutMs,
required this.setValueTimeoutMs,
required this.setValueCount,
required this.setValueFanout,
required this.minPeerCount,
required this.minPeerRefreshTimeMs,
required this.validateDialInfoReceiptTimeMs});
required this.validateDialInfoReceiptTimeMs,
required this.localSubkeyCacheSize,
required this.localMaxSubkeyCacheMemoryMb,
required this.remoteSubkeyCacheSize,
required this.remoteMaxRecords,
required this.remoteMaxSubkeyCacheMemoryMb,
required this.remoteMaxStorageSpaceMb});
Map<String, dynamic> get json {
return {
@ -631,7 +643,13 @@ class VeilidConfigDHT {
'set_value_fanout': setValueFanout,
'min_peer_count': minPeerCount,
'min_peer_refresh_time_ms': minPeerRefreshTimeMs,
'validate_dial_info_receipt_time_ms': validateDialInfoReceiptTimeMs
'validate_dial_info_receipt_time_ms': validateDialInfoReceiptTimeMs,
'local_subkey_cache_size: 128': localSubkeyCacheSize,
'local_max_subkey_cache_memory_mb': localMaxSubkeyCacheMemoryMb,
'remote_subkey_cache_size': remoteSubkeyCacheSize,
'remote_max_records': remoteMaxRecords,
'remote_max_subkey_cache_memory_mb': remoteMaxSubkeyCacheMemoryMb,
'remote_max_storage_space_mb': remoteMaxStorageSpaceMb,
};
}
@ -649,7 +667,14 @@ class VeilidConfigDHT {
minPeerCount = json['min_peer_count'],
minPeerRefreshTimeMs = json['min_peer_refresh_time_ms'],
validateDialInfoReceiptTimeMs =
json['validate_dial_info_receipt_time_ms'];
json['validate_dial_info_receipt_time_ms'],
localSubkeyCacheSize = json['local_subkey_cache_size'],
localMaxSubkeyCacheMemoryMb = json['local_max_subkey_cache_memory_mb'],
remoteSubkeyCacheSize = json['remote_subkey_cache_size'],
remoteMaxRecords = json['remote_max_records'],
remoteMaxSubkeyCacheMemoryMb =
json['remote_max_subkey_cache_memory_mb'],
remoteMaxStorageSpaceMb = json['remote_max_storage_space_mb'];
}
////////////

View File

@ -17,6 +17,7 @@ dependencies:
change_case: ^1.0.1
path_provider: ^2.0.9
path: ^1.8.0
system_info2: ^3.0.2
dev_dependencies:
flutter_test:

View File

@ -79,18 +79,18 @@ core:
queue_size: 1024
max_timestamp_behind_ms: 10000
max_timestamp_ahead_ms: 10000
timeout_ms: 10000
timeout_ms: 5000
max_route_hop_count: 4
default_route_hop_count: 1
dht:
resolve_node_timeout:
resolve_node_timeout: 10000
resolve_node_count: 20
resolve_node_fanout: 3
max_find_node_count: 20
get_value_timeout:
get_value_timeout: 10000
get_value_count: 20
get_value_fanout: 3
set_value_timeout:
set_value_timeout: 10000
set_value_count: 20
set_value_fanout: 5
min_peer_count: 20
@ -175,7 +175,7 @@ core:
.replace(
"%REMOTE_MAX_SUBKEY_CACHE_MEMORY_MB%",
&Settings::get_default_remote_max_subkey_cache_memory_mb().to_string_lossy(),
)
);
config::Config::builder()
.add_source(config::File::from_str(
&default_config,
@ -514,10 +514,10 @@ pub struct Dht {
pub resolve_node_count: u32,
pub resolve_node_fanout: u32,
pub max_find_node_count: u32,
pub get_value_timeout_ms: Option<u32>,
pub get_value_timeout_ms: u32,
pub get_value_count: u32,
pub get_value_fanout: u32,
pub set_value_timeout_ms: Option<u32>,
pub set_value_timeout_ms: u32,
pub set_value_count: u32,
pub set_value_fanout: u32,
pub min_peer_count: u32,
@ -642,7 +642,8 @@ impl Settings {
// Fill in missing defaults
if inner.core.network.dht.remote_max_storage_space_mb == 0 {
inner.core.network.dht.remote_max_storage_space_mb = Self::get_default_remote_max_storage_space_mb(&inner);
inner.core.network.dht.remote_max_storage_space_mb =
Self::get_default_remote_max_storage_space_mb(&inner);
}
//
@ -865,8 +866,11 @@ impl Settings {
let dht_storage_path = inner.core.table_store.directory.clone();
let mut available_mb = 0usize;
// Sort longer mount point paths first since we want the mount point closest to our table store directory
sys.sort_disks_by(|a,b| {
b.mount_point().to_string_lossy().len().cmp(&a.mount_point().to_string_lossy().len())
sys.sort_disks_by(|a, b| {
b.mount_point()
.to_string_lossy()
.len()
.cmp(&a.mount_point().to_string_lossy().len())
});
for disk in sys.disks() {
if dht_storage_path.starts_with(disk.mount_point()) {
@ -988,10 +992,16 @@ impl Settings {
value
);
set_config_value!(inner.core.network.dht.local_subkey_cache_size, value);
set_config_value!(inner.core.network.dht.local_max_subkey_cache_memory_mb, value);
set_config_value!(
inner.core.network.dht.local_max_subkey_cache_memory_mb,
value
);
set_config_value!(inner.core.network.dht.remote_subkey_cache_size, value);
set_config_value!(inner.core.network.dht.remote_max_records, value);
set_config_value!(inner.core.network.dht.remote_max_subkey_cache_memory_mb, value);
set_config_value!(
inner.core.network.dht.remote_max_subkey_cache_memory_mb,
value
);
set_config_value!(inner.core.network.dht.remote_max_storage_space_mb, value);
set_config_value!(inner.core.network.upnp, value);
set_config_value!(inner.core.network.detect_address_changes, value);
@ -1201,25 +1211,25 @@ impl Settings {
"network.dht.validate_dial_info_receipt_time_ms" => Ok(Box::new(
inner.core.network.dht.validate_dial_info_receipt_time_ms,
)),
"network.dht.local_subkey_cache_size" => Ok(Box::new(
inner.core.network.dht.local_subkey_cache_size,
)),
"network.dht.local_subkey_cache_size" => {
Ok(Box::new(inner.core.network.dht.local_subkey_cache_size))
}
"network.dht.local_max_subkey_cache_memory_mb" => Ok(Box::new(
inner.core.network.dht.local_max_subkey_cache_memory_mb,
)),
"network.dht.remote_subkey_cache_size" => Ok(Box::new(
inner.core.network.dht.remote_subkey_cache_size
)),
"network.dht.remote_max_records" => Ok(Box::new(
inner.core.network.dht.remote_max_records,
)),
"network.dht.remote_subkey_cache_size" => {
Ok(Box::new(inner.core.network.dht.remote_subkey_cache_size))
}
"network.dht.remote_max_records" => {
Ok(Box::new(inner.core.network.dht.remote_max_records))
}
"network.dht.remote_max_subkey_cache_memory_mb" => Ok(Box::new(
inner.core.network.dht.remote_max_subkey_cache_memory_mb,
)),
"network.dht.remote_max_storage_space_mb" => Ok(Box::new(
inner.core.network.dht.remote_max_storage_space_mb,
)),
"network.dht.remote_max_storage_space_mb" => {
Ok(Box::new(inner.core.network.dht.remote_max_storage_space_mb))
}
"network.upnp" => Ok(Box::new(inner.core.network.upnp)),
"network.detect_address_changes" => {
Ok(Box::new(inner.core.network.detect_address_changes))
@ -1521,7 +1531,7 @@ mod tests {
assert_eq!(s.core.network.rpc.queue_size, 1024);
assert_eq!(s.core.network.rpc.max_timestamp_behind_ms, Some(10_000u32));
assert_eq!(s.core.network.rpc.max_timestamp_ahead_ms, Some(10_000u32));
assert_eq!(s.core.network.rpc.timeout_ms, 10_000u32);
assert_eq!(s.core.network.rpc.timeout_ms, 5_000u32);
assert_eq!(s.core.network.rpc.max_route_hop_count, 4);
assert_eq!(s.core.network.rpc.default_route_hop_count, 1);
//
@ -1529,10 +1539,10 @@ mod tests {
assert_eq!(s.core.network.dht.resolve_node_count, 20u32);
assert_eq!(s.core.network.dht.resolve_node_fanout, 3u32);
assert_eq!(s.core.network.dht.max_find_node_count, 20u32);
assert_eq!(s.core.network.dht.get_value_timeout_ms, None);
assert_eq!(s.core.network.dht.get_value_timeout_ms, 10_000u32);
assert_eq!(s.core.network.dht.get_value_count, 20u32);
assert_eq!(s.core.network.dht.get_value_fanout, 3u32);
assert_eq!(s.core.network.dht.set_value_timeout_ms, None);
assert_eq!(s.core.network.dht.set_value_timeout_ms, 10_000u32);
assert_eq!(s.core.network.dht.set_value_count, 20u32);
assert_eq!(s.core.network.dht.set_value_fanout, 5u32);
assert_eq!(s.core.network.dht.min_peer_count, 20u32);