make valuechanged update no longer happen when value hasn't changed or is older

This commit is contained in:
Christien Rioux 2024-03-31 16:34:12 -04:00
parent 2ec00e18da
commit 6e1439306a
13 changed files with 190 additions and 112 deletions

View file

@ -225,14 +225,15 @@ Future<void> testOpenWriterDHTValue() async {
}
Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
final valueChangeQueue = StreamController<VeilidUpdateValueChange>();
final valueChangeQueue =
StreamController<VeilidUpdateValueChange>.broadcast();
final valueChangeSubscription = updateStream.listen((update) {
if (update is VeilidUpdateValueChange) {
// print("valuechange: " + update.toString());
valueChangeQueue.sink.add(update);
}
});
final valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream);
var valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream);
try {
// Make two routing contexts, one with and one without safety
@ -262,6 +263,25 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
// Now set the subkey and trigger an update
expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull);
// Now we should NOT get an update because the update
// is the same as our local copy
if (await valueChangeQueueIterator
.moveNext()
.timeout(const Duration(seconds: 5), onTimeout: () {
return false;
})) {
fail("should not have a change");
}
valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream);
// Now set multiple subkeys and trigger an update
expect(
await [
rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH")),
rcSet.setDHTValue(rec.key, 4, utf8.encode("BZORT"))
].wait,
equals([null, null]));
// Wait for the update
await valueChangeQueueIterator
.moveNext()
@ -271,13 +291,10 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
// Verify the update
expect(valueChangeQueueIterator.current.key, equals(rec.key));
expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFE));
expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFD));
expect(valueChangeQueueIterator.current.subkeys,
equals([ValueSubkeyRange.single(3)]));
expect(valueChangeQueueIterator.current.value.seq, equals(1));
expect(valueChangeQueueIterator.current.value.data,
equals(utf8.encode("BLAH")));
expect(valueChangeQueueIterator.current.value.writer, equals(rec.owner));
equals([ValueSubkeyRange.make(3, 4)]));
expect(valueChangeQueueIterator.current.value, isNull);
// Reopen without closing to change routing context and not lose watch
rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
@ -291,9 +308,13 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
// Reopen without closing to change routing context and not lose watch
rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
// Change our subkey
expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH")),
isNull);
// Now set multiple subkeys and trigger an update
expect(
await [
rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH")),
rcSet.setDHTValue(rec.key, 5, utf8.encode("BZORT BZORT"))
].wait,
equals([null, null]));
// Wait for the update
await valueChangeQueueIterator
@ -302,15 +323,12 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
fail("should have a change");
});
// Verify the update
// Verify the update came back but we don't get a new value because the sequence number is the same
expect(valueChangeQueueIterator.current.key, equals(rec.key));
expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFD));
expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFC));
expect(valueChangeQueueIterator.current.subkeys,
equals([ValueSubkeyRange.single(3)]));
expect(valueChangeQueueIterator.current.value.seq, equals(2));
expect(valueChangeQueueIterator.current.value.data,
equals(utf8.encode("BLAH BLAH BLAH")));
expect(valueChangeQueueIterator.current.value.writer, equals(rec.owner));
equals([ValueSubkeyRange.single(3), ValueSubkeyRange.single(5)]));
expect(valueChangeQueueIterator.current.value, isNull);
// Reopen without closing to change routing context and not lose watch
rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
@ -324,8 +342,13 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
// Reopen without closing to change routing context and not lose watch
rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
// Set the value without a watch
expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull);
// Now set multiple subkeys and trigger an update
expect(
await [
rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH BLAH")),
rcSet.setDHTValue(rec.key, 5, utf8.encode("BZORT BZORT BZORT"))
].wait,
equals([null, null]));
// Now we should NOT get an update
if (await valueChangeQueueIterator