From b51c660b9c05fc16ef706c2de30560533479104c Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sun, 17 Mar 2024 16:00:34 -0400 Subject: [PATCH] add watchvalue test and some more routing context convenience functions --- .../example/integration_test/app_test.dart | 2 + .../example/integration_test/test_dht.dart | 154 ++++++++++++++++-- .../test_routing_context.dart | 9 +- veilid-flutter/lib/veilid.dart | 16 ++ veilid-flutter/lib/veilid_ffi.dart | 2 +- veilid-flutter/lib/veilid_js.dart | 2 +- veilid-python/tests/test_dht.py | 8 +- 7 files changed, 174 insertions(+), 19 deletions(-) diff --git a/veilid-flutter/example/integration_test/app_test.dart b/veilid-flutter/example/integration_test/app_test.dart index 001d0953..2af1db03 100644 --- a/veilid-flutter/example/integration_test/app_test.dart +++ b/veilid-flutter/example/integration_test/app_test.dart @@ -66,6 +66,8 @@ void main() { test('get dht value nonexistent', testGetDHTValueNonexistent); test('set get dht value', testSetGetDHTValue); test('open writer dht value', testOpenWriterDHTValue); + test( + 'watch dht values', () => testWatchDHTValues(fixture.updateStream)); test('inspect dht record', testInspectDHTRecord); }); }); diff --git a/veilid-flutter/example/integration_test/test_dht.dart b/veilid-flutter/example/integration_test/test_dht.dart index d2dcebbc..5af537c0 100644 --- a/veilid-flutter/example/integration_test/test_dht.dart +++ b/veilid-flutter/example/integration_test/test_dht.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:convert'; import 'package:flutter_test/flutter_test.dart'; @@ -7,7 +8,8 @@ final bogusKey = TypedKey.fromString("VLD0:qD10lHHPD1_Qr23_Qy-1JnxTht12eaWwENVG_m2v7II"); Future testGetDHTValueUnopened() async { - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { await expectLater( () async => await rc.getDHTValue(bogusKey, 0, forceRefresh: false), @@ -18,7 +20,8 @@ Future testGetDHTValueUnopened() async { } Future testOpenDHTRecordNonexistentNoWriter() async { - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { await expectLater(() async => await rc.openDHTRecord(bogusKey), throwsA(isA())); @@ -28,7 +31,8 @@ Future testOpenDHTRecordNonexistentNoWriter() async { } Future testCloseDHTRecordNonexistent() async { - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { await expectLater(() async => await rc.closeDHTRecord(bogusKey), throwsA(isA())); @@ -38,7 +42,8 @@ Future testCloseDHTRecordNonexistent() async { } Future testDeleteDHTRecordNonexistent() async { - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { await expectLater(() async => await rc.deleteDHTRecord(bogusKey), throwsA(isA())); @@ -48,7 +53,8 @@ Future testDeleteDHTRecordNonexistent() async { } Future testCreateDeleteDHTRecordSimple() async { - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1)); await rc.closeDHTRecord(rec.key); @@ -59,7 +65,8 @@ Future testCreateDeleteDHTRecordSimple() async { } Future testCreateDeleteDHTRecordNoClose() async { - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1)); await rc.deleteDHTRecord(rec.key); @@ -69,7 +76,8 @@ Future testCreateDeleteDHTRecordNoClose() async { } Future testGetDHTValueNonexistent() async { - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1)); expect(await rc.getDHTValue(rec.key, 0), isNull); @@ -80,7 +88,8 @@ Future testGetDHTValueNonexistent() async { } Future testSetGetDHTValue() async { - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2)); expect(await rc.setDHTValue(rec.key, 0, utf8.encode("BLAH BLAH BLAH")), @@ -103,7 +112,8 @@ Future testSetGetDHTValue() async { } Future testOpenWriterDHTValue() async { - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { var rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2)); final key = rec.key; @@ -223,8 +233,132 @@ Future testOpenWriterDHTValue() async { } } +Future testWatchDHTValues(Stream updateStream) async { + final valueChangeQueue = StreamController(); + final valueChangeSubscription = updateStream.listen((update) { + if (update is VeilidUpdateValueChange) { + // print("valuechange: " + update.toString()); + valueChangeQueue.sink.add(update); + } + }); + final valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream); + + try { + // Make two routing contexts, one with and one without safety + // So we can pretend to be a different node and get the watch updates + // Normally they would not get sent if the set comes from the same target + // as the watch's target + + final rcWatch = (await Veilid.instance.routingContext()) + .withSequencing(Sequencing.ensureOrdered, closeSelf: true); + final rcSet = (await Veilid.instance.routingContext()).withSafety( + const SafetySelectionUnsafe(sequencing: Sequencing.ensureOrdered), + closeSelf: true); + try { + // Make a DHT record + var rec = await rcWatch.createDHTRecord(const DHTSchema.dflt(oCnt: 10)); + + // Set some subkey we care about + expect( + await rcWatch.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH")), + isNull); + + // Make a watch on that subkey + expect(await rcWatch.watchDHTValues(rec.key), + isNot(equals(Timestamp.zero()))); + + // Reopen without closing to change routing context and not lose watch + rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); + + // Now set the subkey and trigger an update + expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull); + + // Wait for the update + await valueChangeQueueIterator + .moveNext() + .timeout(const Duration(seconds: 5), onTimeout: () { + fail("should have a change"); + }); + + // Verify the update + expect(valueChangeQueueIterator.current.key, equals(rec.key)); + expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFE)); + expect(valueChangeQueueIterator.current.subkeys, + equals([ValueSubkeyRange.single(3)])); + expect(valueChangeQueueIterator.current.value.seq, equals(1)); + expect(valueChangeQueueIterator.current.value.data, + equals(utf8.encode("BLAH"))); + expect(valueChangeQueueIterator.current.value.writer, equals(rec.owner)); + + // Reopen without closing to change routing context and not lose watch + rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); + + // Cancel some subkeys we don't care about + expect( + await rcWatch + .cancelDHTWatch(rec.key, subkeys: [ValueSubkeyRange.make(0, 2)]), + isTrue); + + // Reopen without closing to change routing context and not lose watch + rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); + + // Change our subkey + expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH")), + isNull); + + // Wait for the update + await valueChangeQueueIterator + .moveNext() + .timeout(const Duration(seconds: 5), onTimeout: () { + fail("should have a change"); + }); + + // Verify the update + expect(valueChangeQueueIterator.current.key, equals(rec.key)); + expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFD)); + expect(valueChangeQueueIterator.current.subkeys, + equals([ValueSubkeyRange.single(3)])); + expect(valueChangeQueueIterator.current.value.seq, equals(2)); + expect(valueChangeQueueIterator.current.value.data, + equals(utf8.encode("BLAH BLAH BLAH"))); + expect(valueChangeQueueIterator.current.value.writer, equals(rec.owner)); + + // Reopen without closing to change routing context and not lose watch + rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); + + // Now cancel the update + expect( + await rcWatch + .cancelDHTWatch(rec.key, subkeys: [ValueSubkeyRange.make(3, 9)]), + isFalse); + + // Reopen without closing to change routing context and not lose watch + rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); + + expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull); + + if (await valueChangeQueueIterator + .moveNext() + .timeout(const Duration(seconds: 5), onTimeout: () { + return false; + })) { + fail("should not have a change"); + } + + await rcSet.closeDHTRecord(rec.key); + await rcSet.deleteDHTRecord(rec.key); + } finally { + rcWatch.close(); + rcSet.close(); + } + } finally { + await valueChangeSubscription.cancel(); + } +} + Future testInspectDHTRecord() async { - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { var rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2)); diff --git a/veilid-flutter/example/integration_test/test_routing_context.dart b/veilid-flutter/example/integration_test/test_routing_context.dart index dc8dee4c..cfc0367c 100644 --- a/veilid-flutter/example/integration_test/test_routing_context.dart +++ b/veilid-flutter/example/integration_test/test_routing_context.dart @@ -55,7 +55,8 @@ Future testAppMessageLoopback(Stream updateStream) async { await Veilid.instance.debug("purge routes"); // make a routing context that uses a safety route - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { // make a new local private route final prl = await Veilid.instance.newPrivateRoute(); @@ -96,7 +97,8 @@ Future testAppCallLoopback(Stream updateStream) async { await Veilid.instance.debug("purge routes"); // make a routing context that uses a safety route - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { // make a new local private route final prl = await Veilid.instance.newPrivateRoute(); @@ -153,7 +155,8 @@ Future testAppMessageLoopbackBigPackets( await Veilid.instance.debug("purge routes"); // make a routing context that uses a safety route - final rc = await Veilid.instance.routingContext(); + final rc = await Veilid.instance + .safeRoutingContext(sequencing: Sequencing.ensureOrdered); try { // make a new local private route final prl = await Veilid.instance.newPrivateRoute(); diff --git a/veilid-flutter/lib/veilid.dart b/veilid-flutter/lib/veilid.dart index d31c0a44..7f0d9aea 100644 --- a/veilid-flutter/lib/veilid.dart +++ b/veilid-flutter/lib/veilid.dart @@ -63,6 +63,7 @@ class VeilidVersion extends Equatable { @immutable class Timestamp extends Equatable implements Comparable { const Timestamp({required this.value}); + factory Timestamp.zero() => Timestamp(value: BigInt.zero); factory Timestamp.fromInt64(Int64 i64) => Timestamp( value: (BigInt.from((i64 >> 32).toUnsigned(32).toInt()) << 32) | BigInt.from(i64.toUnsigned(32).toInt())); @@ -144,6 +145,21 @@ abstract class Veilid { // Routing context Future routingContext(); + Future safeRoutingContext( + {Stability stability = Stability.lowLatency, + Sequencing sequencing = Sequencing.preferOrdered}) async { + final rc = await routingContext(); + final originalSafety = await rc.safety() as SafetySelectionSafe; + final safetySpec = originalSafety.safetySpec + .copyWith(stability: stability, sequencing: sequencing); + return rc.withSafety(SafetySelectionSafe(safetySpec: safetySpec), + closeSelf: true); + } + + Future unsafeRoutingContext( + {Sequencing sequencing = Sequencing.preferOrdered}) async => + (await routingContext()) + .withSafety(SafetySelectionUnsafe(sequencing: sequencing)); // Private route allocation Future newPrivateRoute(); diff --git a/veilid-flutter/lib/veilid_ffi.dart b/veilid-flutter/lib/veilid_ffi.dart index 4f8f8b81..fa04e20c 100644 --- a/veilid-flutter/lib/veilid_ffi.dart +++ b/veilid-flutter/lib/veilid_ffi.dart @@ -702,7 +702,7 @@ class VeilidRoutingContextFFI extends VeilidRoutingContext { Timestamp? expiration, int? count}) async { subkeys ??= []; - expiration ??= Timestamp(value: BigInt.zero); + expiration ??= Timestamp.zero(); count ??= 0xFFFFFFFF; _ctx.ensureValid(); diff --git a/veilid-flutter/lib/veilid_js.dart b/veilid-flutter/lib/veilid_js.dart index 12c613c2..951bf2f7 100644 --- a/veilid-flutter/lib/veilid_js.dart +++ b/veilid-flutter/lib/veilid_js.dart @@ -207,7 +207,7 @@ class VeilidRoutingContextJS extends VeilidRoutingContext { Timestamp? expiration, int? count}) async { subkeys ??= []; - expiration ??= Timestamp(value: BigInt.zero); + expiration ??= Timestamp.zero(); count ??= 0xFFFFFFFF; final id = _ctx.requireId(); diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index 4f98d959..ec3015a9 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -218,14 +218,14 @@ async def test_watch_dht_values(api_connection: veilid.VeilidAPI): vd = await rc.set_dht_value(rec.key, 3, b"BLAH") assert vd == None - all_gone = await rc.cancel_dht_watch(rec.key, [(0, 2)]) - assert all_gone == True + still_active = await rc.cancel_dht_watch(rec.key, [(0, 2)]) + assert still_active == True vd = await rc.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH") assert vd == None - all_gone = await rc.cancel_dht_watch(rec.key, [(3, 9)]) - assert all_gone == False + still_active = await rc.cancel_dht_watch(rec.key, [(3, 9)]) + assert still_active == False vd = await rc.set_dht_value(rec.key, 3, b"BLAH") assert vd == None