mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-01-13 00:09:47 -05:00
add watchvalue test and some more routing context convenience functions
This commit is contained in:
parent
30cc4a814b
commit
b51c660b9c
@ -66,6 +66,8 @@ void main() {
|
|||||||
test('get dht value nonexistent', testGetDHTValueNonexistent);
|
test('get dht value nonexistent', testGetDHTValueNonexistent);
|
||||||
test('set get dht value', testSetGetDHTValue);
|
test('set get dht value', testSetGetDHTValue);
|
||||||
test('open writer dht value', testOpenWriterDHTValue);
|
test('open writer dht value', testOpenWriterDHTValue);
|
||||||
|
test(
|
||||||
|
'watch dht values', () => testWatchDHTValues(fixture.updateStream));
|
||||||
test('inspect dht record', testInspectDHTRecord);
|
test('inspect dht record', testInspectDHTRecord);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
import 'dart:async';
|
||||||
import 'dart:convert';
|
import 'dart:convert';
|
||||||
|
|
||||||
import 'package:flutter_test/flutter_test.dart';
|
import 'package:flutter_test/flutter_test.dart';
|
||||||
@ -7,7 +8,8 @@ final bogusKey =
|
|||||||
TypedKey.fromString("VLD0:qD10lHHPD1_Qr23_Qy-1JnxTht12eaWwENVG_m2v7II");
|
TypedKey.fromString("VLD0:qD10lHHPD1_Qr23_Qy-1JnxTht12eaWwENVG_m2v7II");
|
||||||
|
|
||||||
Future<void> testGetDHTValueUnopened() async {
|
Future<void> testGetDHTValueUnopened() async {
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
await expectLater(
|
await expectLater(
|
||||||
() async => await rc.getDHTValue(bogusKey, 0, forceRefresh: false),
|
() async => await rc.getDHTValue(bogusKey, 0, forceRefresh: false),
|
||||||
@ -18,7 +20,8 @@ Future<void> testGetDHTValueUnopened() async {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> testOpenDHTRecordNonexistentNoWriter() async {
|
Future<void> testOpenDHTRecordNonexistentNoWriter() async {
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
await expectLater(() async => await rc.openDHTRecord(bogusKey),
|
await expectLater(() async => await rc.openDHTRecord(bogusKey),
|
||||||
throwsA(isA<VeilidAPIException>()));
|
throwsA(isA<VeilidAPIException>()));
|
||||||
@ -28,7 +31,8 @@ Future<void> testOpenDHTRecordNonexistentNoWriter() async {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> testCloseDHTRecordNonexistent() async {
|
Future<void> testCloseDHTRecordNonexistent() async {
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
await expectLater(() async => await rc.closeDHTRecord(bogusKey),
|
await expectLater(() async => await rc.closeDHTRecord(bogusKey),
|
||||||
throwsA(isA<VeilidAPIException>()));
|
throwsA(isA<VeilidAPIException>()));
|
||||||
@ -38,7 +42,8 @@ Future<void> testCloseDHTRecordNonexistent() async {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> testDeleteDHTRecordNonexistent() async {
|
Future<void> testDeleteDHTRecordNonexistent() async {
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
await expectLater(() async => await rc.deleteDHTRecord(bogusKey),
|
await expectLater(() async => await rc.deleteDHTRecord(bogusKey),
|
||||||
throwsA(isA<VeilidAPIException>()));
|
throwsA(isA<VeilidAPIException>()));
|
||||||
@ -48,7 +53,8 @@ Future<void> testDeleteDHTRecordNonexistent() async {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> testCreateDeleteDHTRecordSimple() async {
|
Future<void> testCreateDeleteDHTRecordSimple() async {
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1));
|
final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1));
|
||||||
await rc.closeDHTRecord(rec.key);
|
await rc.closeDHTRecord(rec.key);
|
||||||
@ -59,7 +65,8 @@ Future<void> testCreateDeleteDHTRecordSimple() async {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> testCreateDeleteDHTRecordNoClose() async {
|
Future<void> testCreateDeleteDHTRecordNoClose() async {
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1));
|
final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1));
|
||||||
await rc.deleteDHTRecord(rec.key);
|
await rc.deleteDHTRecord(rec.key);
|
||||||
@ -69,7 +76,8 @@ Future<void> testCreateDeleteDHTRecordNoClose() async {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> testGetDHTValueNonexistent() async {
|
Future<void> testGetDHTValueNonexistent() async {
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1));
|
final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1));
|
||||||
expect(await rc.getDHTValue(rec.key, 0), isNull);
|
expect(await rc.getDHTValue(rec.key, 0), isNull);
|
||||||
@ -80,7 +88,8 @@ Future<void> testGetDHTValueNonexistent() async {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> testSetGetDHTValue() async {
|
Future<void> testSetGetDHTValue() async {
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2));
|
final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2));
|
||||||
expect(await rc.setDHTValue(rec.key, 0, utf8.encode("BLAH BLAH BLAH")),
|
expect(await rc.setDHTValue(rec.key, 0, utf8.encode("BLAH BLAH BLAH")),
|
||||||
@ -103,7 +112,8 @@ Future<void> testSetGetDHTValue() async {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> testOpenWriterDHTValue() async {
|
Future<void> testOpenWriterDHTValue() async {
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
var rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2));
|
var rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2));
|
||||||
final key = rec.key;
|
final key = rec.key;
|
||||||
@ -223,8 +233,132 @@ Future<void> testOpenWriterDHTValue() async {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
|
||||||
|
final valueChangeQueue = StreamController<VeilidUpdateValueChange>();
|
||||||
|
final valueChangeSubscription = updateStream.listen((update) {
|
||||||
|
if (update is VeilidUpdateValueChange) {
|
||||||
|
// print("valuechange: " + update.toString());
|
||||||
|
valueChangeQueue.sink.add(update);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
final valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Make two routing contexts, one with and one without safety
|
||||||
|
// So we can pretend to be a different node and get the watch updates
|
||||||
|
// Normally they would not get sent if the set comes from the same target
|
||||||
|
// as the watch's target
|
||||||
|
|
||||||
|
final rcWatch = (await Veilid.instance.routingContext())
|
||||||
|
.withSequencing(Sequencing.ensureOrdered, closeSelf: true);
|
||||||
|
final rcSet = (await Veilid.instance.routingContext()).withSafety(
|
||||||
|
const SafetySelectionUnsafe(sequencing: Sequencing.ensureOrdered),
|
||||||
|
closeSelf: true);
|
||||||
|
try {
|
||||||
|
// Make a DHT record
|
||||||
|
var rec = await rcWatch.createDHTRecord(const DHTSchema.dflt(oCnt: 10));
|
||||||
|
|
||||||
|
// Set some subkey we care about
|
||||||
|
expect(
|
||||||
|
await rcWatch.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH")),
|
||||||
|
isNull);
|
||||||
|
|
||||||
|
// Make a watch on that subkey
|
||||||
|
expect(await rcWatch.watchDHTValues(rec.key),
|
||||||
|
isNot(equals(Timestamp.zero())));
|
||||||
|
|
||||||
|
// Reopen without closing to change routing context and not lose watch
|
||||||
|
rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
|
||||||
|
|
||||||
|
// Now set the subkey and trigger an update
|
||||||
|
expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull);
|
||||||
|
|
||||||
|
// Wait for the update
|
||||||
|
await valueChangeQueueIterator
|
||||||
|
.moveNext()
|
||||||
|
.timeout(const Duration(seconds: 5), onTimeout: () {
|
||||||
|
fail("should have a change");
|
||||||
|
});
|
||||||
|
|
||||||
|
// Verify the update
|
||||||
|
expect(valueChangeQueueIterator.current.key, equals(rec.key));
|
||||||
|
expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFE));
|
||||||
|
expect(valueChangeQueueIterator.current.subkeys,
|
||||||
|
equals([ValueSubkeyRange.single(3)]));
|
||||||
|
expect(valueChangeQueueIterator.current.value.seq, equals(1));
|
||||||
|
expect(valueChangeQueueIterator.current.value.data,
|
||||||
|
equals(utf8.encode("BLAH")));
|
||||||
|
expect(valueChangeQueueIterator.current.value.writer, equals(rec.owner));
|
||||||
|
|
||||||
|
// Reopen without closing to change routing context and not lose watch
|
||||||
|
rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
|
||||||
|
|
||||||
|
// Cancel some subkeys we don't care about
|
||||||
|
expect(
|
||||||
|
await rcWatch
|
||||||
|
.cancelDHTWatch(rec.key, subkeys: [ValueSubkeyRange.make(0, 2)]),
|
||||||
|
isTrue);
|
||||||
|
|
||||||
|
// Reopen without closing to change routing context and not lose watch
|
||||||
|
rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
|
||||||
|
|
||||||
|
// Change our subkey
|
||||||
|
expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH")),
|
||||||
|
isNull);
|
||||||
|
|
||||||
|
// Wait for the update
|
||||||
|
await valueChangeQueueIterator
|
||||||
|
.moveNext()
|
||||||
|
.timeout(const Duration(seconds: 5), onTimeout: () {
|
||||||
|
fail("should have a change");
|
||||||
|
});
|
||||||
|
|
||||||
|
// Verify the update
|
||||||
|
expect(valueChangeQueueIterator.current.key, equals(rec.key));
|
||||||
|
expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFD));
|
||||||
|
expect(valueChangeQueueIterator.current.subkeys,
|
||||||
|
equals([ValueSubkeyRange.single(3)]));
|
||||||
|
expect(valueChangeQueueIterator.current.value.seq, equals(2));
|
||||||
|
expect(valueChangeQueueIterator.current.value.data,
|
||||||
|
equals(utf8.encode("BLAH BLAH BLAH")));
|
||||||
|
expect(valueChangeQueueIterator.current.value.writer, equals(rec.owner));
|
||||||
|
|
||||||
|
// Reopen without closing to change routing context and not lose watch
|
||||||
|
rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
|
||||||
|
|
||||||
|
// Now cancel the update
|
||||||
|
expect(
|
||||||
|
await rcWatch
|
||||||
|
.cancelDHTWatch(rec.key, subkeys: [ValueSubkeyRange.make(3, 9)]),
|
||||||
|
isFalse);
|
||||||
|
|
||||||
|
// Reopen without closing to change routing context and not lose watch
|
||||||
|
rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
|
||||||
|
|
||||||
|
expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull);
|
||||||
|
|
||||||
|
if (await valueChangeQueueIterator
|
||||||
|
.moveNext()
|
||||||
|
.timeout(const Duration(seconds: 5), onTimeout: () {
|
||||||
|
return false;
|
||||||
|
})) {
|
||||||
|
fail("should not have a change");
|
||||||
|
}
|
||||||
|
|
||||||
|
await rcSet.closeDHTRecord(rec.key);
|
||||||
|
await rcSet.deleteDHTRecord(rec.key);
|
||||||
|
} finally {
|
||||||
|
rcWatch.close();
|
||||||
|
rcSet.close();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
await valueChangeSubscription.cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Future<void> testInspectDHTRecord() async {
|
Future<void> testInspectDHTRecord() async {
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
var rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2));
|
var rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2));
|
||||||
|
|
||||||
|
@ -55,7 +55,8 @@ Future<void> testAppMessageLoopback(Stream<VeilidUpdate> updateStream) async {
|
|||||||
await Veilid.instance.debug("purge routes");
|
await Veilid.instance.debug("purge routes");
|
||||||
|
|
||||||
// make a routing context that uses a safety route
|
// make a routing context that uses a safety route
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
// make a new local private route
|
// make a new local private route
|
||||||
final prl = await Veilid.instance.newPrivateRoute();
|
final prl = await Veilid.instance.newPrivateRoute();
|
||||||
@ -96,7 +97,8 @@ Future<void> testAppCallLoopback(Stream<VeilidUpdate> updateStream) async {
|
|||||||
await Veilid.instance.debug("purge routes");
|
await Veilid.instance.debug("purge routes");
|
||||||
|
|
||||||
// make a routing context that uses a safety route
|
// make a routing context that uses a safety route
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
// make a new local private route
|
// make a new local private route
|
||||||
final prl = await Veilid.instance.newPrivateRoute();
|
final prl = await Veilid.instance.newPrivateRoute();
|
||||||
@ -153,7 +155,8 @@ Future<void> testAppMessageLoopbackBigPackets(
|
|||||||
await Veilid.instance.debug("purge routes");
|
await Veilid.instance.debug("purge routes");
|
||||||
|
|
||||||
// make a routing context that uses a safety route
|
// make a routing context that uses a safety route
|
||||||
final rc = await Veilid.instance.routingContext();
|
final rc = await Veilid.instance
|
||||||
|
.safeRoutingContext(sequencing: Sequencing.ensureOrdered);
|
||||||
try {
|
try {
|
||||||
// make a new local private route
|
// make a new local private route
|
||||||
final prl = await Veilid.instance.newPrivateRoute();
|
final prl = await Veilid.instance.newPrivateRoute();
|
||||||
|
@ -63,6 +63,7 @@ class VeilidVersion extends Equatable {
|
|||||||
@immutable
|
@immutable
|
||||||
class Timestamp extends Equatable implements Comparable<Timestamp> {
|
class Timestamp extends Equatable implements Comparable<Timestamp> {
|
||||||
const Timestamp({required this.value});
|
const Timestamp({required this.value});
|
||||||
|
factory Timestamp.zero() => Timestamp(value: BigInt.zero);
|
||||||
factory Timestamp.fromInt64(Int64 i64) => Timestamp(
|
factory Timestamp.fromInt64(Int64 i64) => Timestamp(
|
||||||
value: (BigInt.from((i64 >> 32).toUnsigned(32).toInt()) << 32) |
|
value: (BigInt.from((i64 >> 32).toUnsigned(32).toInt()) << 32) |
|
||||||
BigInt.from(i64.toUnsigned(32).toInt()));
|
BigInt.from(i64.toUnsigned(32).toInt()));
|
||||||
@ -144,6 +145,21 @@ abstract class Veilid {
|
|||||||
|
|
||||||
// Routing context
|
// Routing context
|
||||||
Future<VeilidRoutingContext> routingContext();
|
Future<VeilidRoutingContext> routingContext();
|
||||||
|
Future<VeilidRoutingContext> safeRoutingContext(
|
||||||
|
{Stability stability = Stability.lowLatency,
|
||||||
|
Sequencing sequencing = Sequencing.preferOrdered}) async {
|
||||||
|
final rc = await routingContext();
|
||||||
|
final originalSafety = await rc.safety() as SafetySelectionSafe;
|
||||||
|
final safetySpec = originalSafety.safetySpec
|
||||||
|
.copyWith(stability: stability, sequencing: sequencing);
|
||||||
|
return rc.withSafety(SafetySelectionSafe(safetySpec: safetySpec),
|
||||||
|
closeSelf: true);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<VeilidRoutingContext> unsafeRoutingContext(
|
||||||
|
{Sequencing sequencing = Sequencing.preferOrdered}) async =>
|
||||||
|
(await routingContext())
|
||||||
|
.withSafety(SafetySelectionUnsafe(sequencing: sequencing));
|
||||||
|
|
||||||
// Private route allocation
|
// Private route allocation
|
||||||
Future<RouteBlob> newPrivateRoute();
|
Future<RouteBlob> newPrivateRoute();
|
||||||
|
@ -702,7 +702,7 @@ class VeilidRoutingContextFFI extends VeilidRoutingContext {
|
|||||||
Timestamp? expiration,
|
Timestamp? expiration,
|
||||||
int? count}) async {
|
int? count}) async {
|
||||||
subkeys ??= [];
|
subkeys ??= [];
|
||||||
expiration ??= Timestamp(value: BigInt.zero);
|
expiration ??= Timestamp.zero();
|
||||||
count ??= 0xFFFFFFFF;
|
count ??= 0xFFFFFFFF;
|
||||||
|
|
||||||
_ctx.ensureValid();
|
_ctx.ensureValid();
|
||||||
|
@ -207,7 +207,7 @@ class VeilidRoutingContextJS extends VeilidRoutingContext {
|
|||||||
Timestamp? expiration,
|
Timestamp? expiration,
|
||||||
int? count}) async {
|
int? count}) async {
|
||||||
subkeys ??= [];
|
subkeys ??= [];
|
||||||
expiration ??= Timestamp(value: BigInt.zero);
|
expiration ??= Timestamp.zero();
|
||||||
count ??= 0xFFFFFFFF;
|
count ??= 0xFFFFFFFF;
|
||||||
|
|
||||||
final id = _ctx.requireId();
|
final id = _ctx.requireId();
|
||||||
|
@ -218,14 +218,14 @@ async def test_watch_dht_values(api_connection: veilid.VeilidAPI):
|
|||||||
vd = await rc.set_dht_value(rec.key, 3, b"BLAH")
|
vd = await rc.set_dht_value(rec.key, 3, b"BLAH")
|
||||||
assert vd == None
|
assert vd == None
|
||||||
|
|
||||||
all_gone = await rc.cancel_dht_watch(rec.key, [(0, 2)])
|
still_active = await rc.cancel_dht_watch(rec.key, [(0, 2)])
|
||||||
assert all_gone == True
|
assert still_active == True
|
||||||
|
|
||||||
vd = await rc.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH")
|
vd = await rc.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH")
|
||||||
assert vd == None
|
assert vd == None
|
||||||
|
|
||||||
all_gone = await rc.cancel_dht_watch(rec.key, [(3, 9)])
|
still_active = await rc.cancel_dht_watch(rec.key, [(3, 9)])
|
||||||
assert all_gone == False
|
assert still_active == False
|
||||||
|
|
||||||
vd = await rc.set_dht_value(rec.key, 3, b"BLAH")
|
vd = await rc.set_dht_value(rec.key, 3, b"BLAH")
|
||||||
assert vd == None
|
assert vd == None
|
||||||
|
Loading…
Reference in New Issue
Block a user