import 'dart:async';
import 'dart:convert';

import 'package:flutter_test/flutter_test.dart';
import 'package:veilid/veilid.dart';

/// Record key used by the negative tests below.
///
/// No test in this file creates or opens a record under this key before
/// using it, so operations on it are expected to fail.
final bogusKey =
    TypedKey.fromString('VLD0:qD10lHHPD1_Qr23_Qy-1JnxTht12eaWwENVG_m2v7II');

/// Verifies that reading a subkey of a record that was never opened throws.
Future<void> testGetDHTValueUnopened() async {
  final rc = await Veilid.instance.routingContext();
  try {
    await expectLater(() async => rc.getDHTValue(bogusKey, 0),
        throwsA(isA<VeilidAPIException>()));
  } finally {
    rc.close();
  }
}

/// Verifies that opening [bogusKey] without a writer throws.
Future<void> testOpenDHTRecordNonexistentNoWriter() async {
  final rc = await Veilid.instance.routingContext();
  try {
    await expectLater(() async => rc.openDHTRecord(bogusKey),
        throwsA(isA<VeilidAPIException>()));
  } finally {
    rc.close();
  }
}

/// Verifies that closing [bogusKey], which was never opened, throws.
Future<void> testCloseDHTRecordNonexistent() async {
  final rc = await Veilid.instance.routingContext();
  try {
    await expectLater(() async => rc.closeDHTRecord(bogusKey),
        throwsA(isA<VeilidAPIException>()));
  } finally {
    rc.close();
  }
}

/// Verifies that deleting [bogusKey], which was never created, throws.
Future<void> testDeleteDHTRecordNonexistent() async {
  final rc = await Veilid.instance.routingContext();
  try {
    await expectLater(() async => rc.deleteDHTRecord(bogusKey),
        throwsA(isA<VeilidAPIException>()));
  } finally {
    rc.close();
  }
}

/// Creates a one-subkey record, then closes and deletes it.
///
/// Passes when none of the three calls throws.
Future<void> testCreateDeleteDHTRecordSimple() async {
  final rc = await Veilid.instance.routingContext();
  try {
    final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1));
    await rc.closeDHTRecord(rec.key);
    await rc.deleteDHTRecord(rec.key);
  } finally {
    rc.close();
  }
}

/// Creates a one-subkey record and deletes it without closing it first.
///
/// Passes when neither call throws.
Future<void> testCreateDeleteDHTRecordNoClose() async {
  final rc = await Veilid.instance.routingContext();
  try {
    final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1));
    await rc.deleteDHTRecord(rec.key);
  } finally {
    rc.close();
  }
}

/// Verifies that reading a never-written subkey of a fresh record yields
/// `null`.
Future<void> testGetDHTValueNonexistent() async {
  final rc = await Veilid.instance.routingContext();
  try {
    final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 1));
    expect(await rc.getDHTValue(rec.key, 0), isNull);
    await rc.deleteDHTRecord(rec.key);
  } finally {
    rc.close();
  }
}

/// Writes subkey 0 of a two-subkey record and reads it back.
///
/// Checks that the plain read and the `forceRefresh` read return equal
/// values, and that the untouched subkey 1 still reads as `null`.
Future<void> testSetGetDHTValue() async {
  final rc = await Veilid.instance.routingContext();
  try {
    final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2));
    expect(await rc.setDHTValue(rec.key, 0, utf8.encode('BLAH BLAH BLAH')),
        isNull);

    final vd2 = await rc.getDHTValue(rec.key, 0);
    expect(vd2, isNotNull);

    final vd3 = await rc.getDHTValue(rec.key, 0, forceRefresh: true);
    expect(vd3, isNotNull);

    final vd4 = await rc.getDHTValue(rec.key, 1);
    expect(vd4, isNull);

    expect(vd2, equals(vd3));

    await rc.deleteDHTRecord(rec.key);
  } finally {
    rc.close();
  }
}

/// Exercises writing to a record after reopening it with different writers.
///
/// The record is created, written, deleted locally, then reopened twice:
/// once with the owner key pair (writes must succeed) and once with an
/// unrelated key pair (writes must throw unless the owner key pair is passed
/// explicitly through the `writer` argument of `setDHTValue`).
Future<void> testOpenWriterDHTValue() async {
  final rc = await Veilid.instance.routingContext();
  try {
    var rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2));
    final key = rec.key;
    final owner = rec.owner;
    final secret = rec.ownerSecret!;

    final cs = await Veilid.instance.getCryptoSystem(rec.key.kind);
    expect(await cs.validateKeyPair(owner, secret), isTrue);
    final otherKeyPair = await cs.generateKeyPair();

    final va = utf8.encode('Qwertyuiop Asdfghjkl Zxcvbnm');
    final vb = utf8.encode('1234567890');
    final vc = utf8.encode(r'!@#$%^&*()');

    // Test subkey writes
    expect(await rc.setDHTValue(key, 1, va), isNull);

    var vdtemp = await rc.getDHTValue(key, 1);
    expect(vdtemp, isNotNull);
    expect(vdtemp!.data, equals(va));
    expect(vdtemp.seq, equals(0));
    expect(vdtemp.writer, equals(owner));

    expect(await rc.getDHTValue(key, 0), isNull);

    expect(await rc.setDHTValue(key, 0, vb), isNull);

    expect(
        await rc.getDHTValue(key, 0, forceRefresh: true),
        equals(ValueData(
          data: vb,
          seq: 0,
          writer: owner,
        )));

    expect(
        await rc.getDHTValue(key, 1, forceRefresh: true),
        equals(ValueData(
          data: va,
          seq: 0,
          writer: owner,
        )));

    // Equal value should not trigger sequence number update
    expect(await rc.setDHTValue(key, 1, va), isNull);

    // Different value should trigger sequence number update
    expect(await rc.setDHTValue(key, 1, vb), isNull);

    // Now that we initialized some subkeys
    // and verified they stored correctly
    // Delete things locally and reopen and see if we can write
    // with the same writer key
    //
    await rc.closeDHTRecord(key);
    await rc.deleteDHTRecord(key);

    rec = await rc.openDHTRecord(key,
        writer: KeyPair(key: owner, secret: secret));
    expect(rec, isNotNull);
    expect(rec.key, equals(key));
    expect(rec.owner, equals(owner));
    expect(rec.ownerSecret, equals(secret));
    expect(rec.schema, isA<DHTSchemaDFLT>());
    expect(rec.schema.oCnt, equals(2));

    // Verify subkey 1 can be set before it has been fetched while a newer
    // value is available online: the set returns that newer value (vb, seq 1)
    // instead of null.
    vdtemp = await rc.setDHTValue(key, 1, vc);
    expect(vdtemp, isNotNull);
    expect(vdtemp!.data, equals(vb));
    expect(vdtemp.seq, equals(1));
    expect(vdtemp.writer, equals(owner));

    // Verify subkey 1 can be set a second time
    // and it updates because seq is newer
    expect(await rc.setDHTValue(key, 1, vc), isNull);

    // Verify the network got the subkey update with a refresh check
    vdtemp = await rc.getDHTValue(key, 1, forceRefresh: true);
    expect(vdtemp, isNotNull);
    expect(vdtemp!.data, equals(vc));
    expect(vdtemp.seq, equals(2));
    expect(vdtemp.writer, equals(owner));

    // Delete things locally and reopen and see if we can write
    // with a different writer key (should fail)
    await rc.closeDHTRecord(key);
    await rc.deleteDHTRecord(key);

    rec = await rc.openDHTRecord(key, writer: otherKeyPair);
    expect(rec, isNotNull);
    expect(rec.key, equals(key));
    expect(rec.owner, equals(owner));
    expect(rec.ownerSecret, isNull);
    expect(rec.schema, isA<DHTSchemaDFLT>());
    expect(rec.schema.oCnt, equals(2));

    // Verify subkey 1 can NOT be set because we have the wrong writer
    await expectLater(() async => rc.setDHTValue(key, 1, va),
        throwsA(isA<VeilidAPIException>()));

    // Verify subkey 0 can NOT be set because we have the wrong writer
    await expectLater(() async => rc.setDHTValue(key, 0, va),
        throwsA(isA<VeilidAPIException>()));

    // Verify subkey 0 can be set when the call overrides the record's
    // writer with the right (owner) key pair
    expect(
        await rc.setDHTValue(key, 0, va,
            writer: KeyPair(key: owner, secret: secret)),
        isNull);

    // Clean up
    await rc.closeDHTRecord(key);
    await rc.deleteDHTRecord(key);
  } finally {
    rc.close();
  }
}

/// Exercises DHT watches: creation, change notification, partial cancel and
/// full cancel.
///
/// [updateStream] is the stream of Veilid updates for the running node; only
/// [VeilidUpdateValueChange] events are forwarded to the internal queue that
/// the test waits on. Each wait is bounded by a five second timeout.
Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
  final valueChangeQueue =
      StreamController<VeilidUpdateValueChange>.broadcast();
  final valueChangeSubscription = updateStream.listen((update) {
    if (update is VeilidUpdateValueChange) {
      // print("valuechange: " + update.toString());
      valueChangeQueue.sink.add(update);
    }
  });
  var valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream);

  try {
    // Make two routing contexts, one with and one without safety
    // So we can pretend to be a different node and get the watch updates
    // Normally they would not get sent if the set comes from the same target
    // as the watch's target
    final rcWatch = await Veilid.instance.routingContext();
    final rcSet = await Veilid.instance
        .unsafeRoutingContext(sequencing: Sequencing.ensureOrdered);
    try {
      // Make a DHT record
      var rec = await rcWatch.createDHTRecord(const DHTSchema.dflt(oCnt: 10));

      // Set some subkey we care about
      expect(
          await rcWatch.setDHTValue(rec.key, 3, utf8.encode('BLAH BLAH BLAH')),
          isNull);

      // Make a watch on that subkey
      expect(await rcWatch.watchDHTValues(rec.key),
          isNot(equals(Timestamp.zero())));

      // Reopen without closing to change routing context and not lose watch
      rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair());

      // Now set the subkey and trigger an update
      expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode('BLAH')), isNull);

      // Now we should NOT get an update because the update
      // is the same as our local copy
      if (await valueChangeQueueIterator
          .moveNext()
          .timeout(const Duration(seconds: 5), onTimeout: () => false)) {
        fail('should not have a change');
      }
      // The timed-out moveNext() is still pending on the old iterator, so
      // discard it and start a fresh one on the broadcast stream.
      await valueChangeQueueIterator.cancel();
      valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream);

      // Now set multiple subkeys and trigger an update
      expect(
          await [
            rcSet.setDHTValue(rec.key, 3, utf8.encode('BLAH BLAH')),
            rcSet.setDHTValue(rec.key, 4, utf8.encode('BZORT'))
          ].wait,
          equals([null, null]));

      // Wait for the update
      await valueChangeQueueIterator
          .moveNext()
          .timeout(const Duration(seconds: 5), onTimeout: () {
        fail('should have a change');
      });

      // Verify the update
      expect(valueChangeQueueIterator.current.key, equals(rec.key));
      expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFD));
      expect(valueChangeQueueIterator.current.subkeys,
          equals([ValueSubkeyRange.make(3, 4)]));
      expect(valueChangeQueueIterator.current.value, isNull);

      // Reopen without closing to change routing context and not lose watch
      rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair());

      // Cancel some subkeys we don't care about
      expect(
          await rcWatch
              .cancelDHTWatch(rec.key, subkeys: [ValueSubkeyRange.make(0, 2)]),
          isTrue);

      // Reopen without closing to change routing context and not lose watch
      rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair());

      // Now set multiple subkeys and trigger an update
      expect(
          await [
            rcSet.setDHTValue(rec.key, 3, utf8.encode('BLAH BLAH BLAH')),
            rcSet.setDHTValue(rec.key, 5, utf8.encode('BZORT BZORT'))
          ].wait,
          equals([null, null]));

      // Wait for the update
      await valueChangeQueueIterator
          .moveNext()
          .timeout(const Duration(seconds: 5), onTimeout: () {
        fail('should have a change');
      });

      // Verify the update came back but we don't get a new value because the
      // sequence number is the same
      expect(valueChangeQueueIterator.current.key, equals(rec.key));
      expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFC));
      expect(valueChangeQueueIterator.current.subkeys,
          equals([ValueSubkeyRange.single(3), ValueSubkeyRange.single(5)]));
      expect(valueChangeQueueIterator.current.value, isNull);

      // Reopen without closing to change routing context and not lose watch
      rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair());

      // Now cancel the update
      expect(
          await rcWatch
              .cancelDHTWatch(rec.key, subkeys: [ValueSubkeyRange.make(3, 9)]),
          isFalse);

      // Reopen without closing to change routing context and not lose watch
      rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair());

      // Now set multiple subkeys and trigger an update
      expect(
          await [
            rcSet.setDHTValue(rec.key, 3, utf8.encode('BLAH BLAH BLAH BLAH')),
            rcSet.setDHTValue(rec.key, 5, utf8.encode('BZORT BZORT BZORT'))
          ].wait,
          equals([null, null]));

      // Now we should NOT get an update
      if (await valueChangeQueueIterator
          .moveNext()
          .timeout(const Duration(seconds: 5), onTimeout: () => false)) {
        fail('should not have a change');
      }

      // Clean up
      await rcSet.closeDHTRecord(rec.key);
      await rcSet.deleteDHTRecord(rec.key);
    } finally {
      rcWatch.close();
      rcSet.close();
    }
  } finally {
    await valueChangeQueueIterator.cancel();
    await valueChangeSubscription.cancel();
    await valueChangeQueue.close();
  }
}

/// Inspects a two-subkey record after writing only subkey 0.
///
/// The default-scope report is expected to carry local sequence numbers and
/// an empty network list; the [DHTReportScope.syncGet] report is expected to
/// carry both. Subkey 1 is never written and is expected to report
/// `0xFFFFFFFF` (presumably the "no value" sequence sentinel — confirm
/// against the veilid API docs).
Future<void> testInspectDHTRecord() async {
  final rc = await Veilid.instance.routingContext();
  try {
    final rec = await rc.createDHTRecord(const DHTSchema.dflt(oCnt: 2));

    expect(await rc.setDHTValue(rec.key, 0, utf8.encode('BLAH BLAH BLAH')),
        isNull);

    final rr = await rc.inspectDHTRecord(rec.key);
    expect(rr.subkeys, equals([ValueSubkeyRange.make(0, 1)]));
    expect(rr.localSeqs, equals([0, 0xFFFFFFFF]));
    expect(rr.networkSeqs, equals([]));

    final rr2 =
        await rc.inspectDHTRecord(rec.key, scope: DHTReportScope.syncGet);
    expect(rr2.subkeys, equals([ValueSubkeyRange.make(0, 1)]));
    expect(rr2.localSeqs, equals([0, 0xFFFFFFFF]));
    expect(rr2.networkSeqs, equals([0, 0xFFFFFFFF]));

    await rc.closeDHTRecord(rec.key);
    await rc.deleteDHTRecord(rec.key);
  } finally {
    rc.close();
  }
}