checkpoint

This commit is contained in:
Christien Rioux 2024-01-04 22:29:31 -05:00
parent 92cb5a07cf
commit 37ed0239f3
5 changed files with 47 additions and 26 deletions

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
impl StorageManager { impl StorageManager {
// Flush records stores to disk and remove dead records and send watch notifications // Check if watches either have dead nodes or if the watch has expired
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(super) async fn check_active_watches_task_routine( pub(super) async fn check_active_watches_task_routine(
self, self,
@ -44,8 +44,15 @@ impl StorageManager {
} }
} }
} }
// See if the watch is expired
if !is_dead && active_watch.expiration_ts <= cur_ts {
// Watch has expired
is_dead = true;
}
if is_dead { if is_dead {
v.clear_active_watch();
if let Some(update_callback) = opt_update_callback.clone() { if let Some(update_callback) = opt_update_callback.clone() {
// Send valuechange with dead count and no subkeys // Send valuechange with dead count and no subkeys
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
@ -55,8 +62,6 @@ impl StorageManager {
value: ValueData::default(), value: ValueData::default(),
}))); })));
} }
v.clear_active_watch();
} }
} }
} }

View File

@ -61,10 +61,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: collection name: collection
sha256: f092b211a4319e98e5ff58223576de6c2803db36221657b46c82574721240687 sha256: ee67cb0715911d28db6bf4af1026078bd6f0128b07a5f66fb2ed94ec6783c09a
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "1.17.2" version: "1.18.0"
convert: convert:
dependency: transitive dependency: transitive
description: description:
@ -220,10 +220,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: meta name: meta
sha256: "3c74dbf8763d36539f114c799d8a2d87343b5067e9d796ca22b5eb8437090ee3" sha256: a6e590c838b18133bb482a2745ad77c5bb7715fb0451209e1a7567d416678b8e
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "1.9.1" version: "1.10.0"
path: path:
dependency: "direct main" dependency: "direct main"
description: description:
@ -329,18 +329,18 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: stack_trace name: stack_trace
sha256: c3c7d8edb15bee7f0f74debd4b9c5f3c2ea86766fe4178eb2a18eb30a0bdaed5 sha256: "73713990125a6d93122541237550ee3352a2d84baad52d375a4cad2eb9b7ce0b"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "1.11.0" version: "1.11.1"
stream_channel: stream_channel:
dependency: transitive dependency: transitive
description: description:
name: stream_channel name: stream_channel
sha256: "83615bee9045c1d322bbbd1ba209b7a749c2cbcdcb3fdd1df8eb488b3279c1c8" sha256: ba2aa5d8cc609d96bbb2899c28934f9e1af5cddbd60a827822ea467161eb54e7
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "2.1.1" version: "2.1.2"
string_scanner: string_scanner:
dependency: transitive dependency: transitive
description: description:
@ -377,10 +377,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: test_api name: test_api
sha256: "75760ffd7786fffdfb9597c35c5b27eaeec82be8edfb6d71d32651128ed7aab8" sha256: "5c2f730018264d276c20e4f1503fd1308dfbbae39ec8ee63c5236311ac06954b"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "0.6.0" version: "0.6.1"
typed_data: typed_data:
dependency: transitive dependency: transitive
description: description:
@ -408,10 +408,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: web name: web
sha256: dc8ccd225a2005c1be616fe02951e2e342092edf968cf0844220383757ef8f10 sha256: afe077240a270dcfd2aafe77602b4113645af95d0ad31128cc02bce5ac5d5152
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "0.1.4-beta" version: "0.3.0"
win32: win32:
dependency: transitive dependency: transitive
description: description:
@ -437,5 +437,5 @@ packages:
source: hosted source: hosted
version: "3.5.0" version: "3.5.0"
sdks: sdks:
dart: ">=3.1.0-185.0.dev <4.0.0" dart: ">=3.2.0-194.0.dev <4.0.0"
flutter: ">=3.10.6" flutter: ">=3.10.6"

View File

@ -256,7 +256,7 @@ abstract class VeilidRoutingContext {
Future<void> deleteDHTRecord(TypedKey key); Future<void> deleteDHTRecord(TypedKey key);
Future<ValueData?> getDHTValue(TypedKey key, int subkey, bool forceRefresh); Future<ValueData?> getDHTValue(TypedKey key, int subkey, bool forceRefresh);
Future<ValueData?> setDHTValue(TypedKey key, int subkey, Uint8List data); Future<ValueData?> setDHTValue(TypedKey key, int subkey, Uint8List data);
Future<Timestamp> watchDHTValues(TypedKey key, List<ValueSubkeyRange> subkeys, Future<Timestamp> watchDHTValues(TypedKey key,
Timestamp expiration, int count); {List<ValueSubkeyRange>? subkeys, Timestamp? expiration, int? count});
Future<bool> cancelDHTWatch(TypedKey key, List<ValueSubkeyRange> subkeys); Future<bool> cancelDHTWatch(TypedKey key, {List<ValueSubkeyRange>? subkeys});
} }

View File

@ -671,8 +671,14 @@ class VeilidRoutingContextFFI extends VeilidRoutingContext {
} }
@override @override
Future<Timestamp> watchDHTValues(TypedKey key, List<ValueSubkeyRange> subkeys, Future<Timestamp> watchDHTValues(TypedKey key,
Timestamp expiration, int count) async { {List<ValueSubkeyRange>? subkeys,
Timestamp? expiration,
int? count}) async {
subkeys ??= [];
expiration ??= Timestamp(value: BigInt.zero);
count ??= 0xFFFFFFFF;
_ctx.ensureValid(); _ctx.ensureValid();
final nativeKey = jsonEncode(key).toNativeUtf8(); final nativeKey = jsonEncode(key).toNativeUtf8();
final nativeSubkeys = jsonEncode(subkeys).toNativeUtf8(); final nativeSubkeys = jsonEncode(subkeys).toNativeUtf8();
@ -688,8 +694,10 @@ class VeilidRoutingContextFFI extends VeilidRoutingContext {
} }
@override @override
Future<bool> cancelDHTWatch( Future<bool> cancelDHTWatch(TypedKey key,
TypedKey key, List<ValueSubkeyRange> subkeys) async { {List<ValueSubkeyRange>? subkeys}) async {
subkeys ??= [];
_ctx.ensureValid(); _ctx.ensureValid();
final nativeKey = jsonEncode(key).toNativeUtf8(); final nativeKey = jsonEncode(key).toNativeUtf8();
final nativeSubkeys = jsonEncode(subkeys).toNativeUtf8(); final nativeSubkeys = jsonEncode(subkeys).toNativeUtf8();

View File

@ -185,8 +185,14 @@ class VeilidRoutingContextJS extends VeilidRoutingContext {
} }
@override @override
Future<Timestamp> watchDHTValues(TypedKey key, List<ValueSubkeyRange> subkeys, Future<Timestamp> watchDHTValues(TypedKey key,
Timestamp expiration, int count) async { {List<ValueSubkeyRange>? subkeys,
Timestamp? expiration,
int? count}) async {
subkeys ??= [];
expiration ??= Timestamp(value: BigInt.zero);
count ??= 0xFFFFFFFF;
final id = _ctx.requireId(); final id = _ctx.requireId();
final ts = await _wrapApiPromise<String>(js_util.callMethod( final ts = await _wrapApiPromise<String>(js_util.callMethod(
wasm, 'routing_context_watch_dht_values', [ wasm, 'routing_context_watch_dht_values', [
@ -200,7 +206,9 @@ class VeilidRoutingContextJS extends VeilidRoutingContext {
} }
@override @override
Future<bool> cancelDHTWatch(TypedKey key, List<ValueSubkeyRange> subkeys) { Future<bool> cancelDHTWatch(TypedKey key, {List<ValueSubkeyRange>? subkeys}) {
subkeys ??= [];
final id = _ctx.requireId(); final id = _ctx.requireId();
return _wrapApiPromise(js_util.callMethod( return _wrapApiPromise(js_util.callMethod(
wasm, wasm,