From fcccacfafa6ec6b7dc0ed70ced13f552fed08a01 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Thu, 15 Feb 2024 11:05:59 -0700 Subject: [PATCH] checkpoint --- .../lib/dht_support/src/dht_record.dart | 181 ++++++++++++------ .../lib/dht_support/src/dht_record_cubit.dart | 2 +- .../lib/dht_support/src/dht_record_pool.dart | 2 +- .../lib/dht_support/src/dht_short_array.dart | 30 +-- .../veilid_support/lib/src/json_tools.dart | 9 +- .../lib/src/protobuf_tools.dart | 10 +- packages/veilid_support/pubspec.lock | 2 +- packages/veilid_support/pubspec.yaml | 3 + 8 files changed, 152 insertions(+), 87 deletions(-) diff --git a/packages/veilid_support/lib/dht_support/src/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record.dart index a9064d0..f7f66bd 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record.dart @@ -1,12 +1,28 @@ import 'dart:async'; import 'dart:typed_data'; +import 'package:equatable/equatable.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:meta/meta.dart'; import 'package:protobuf/protobuf.dart'; import '../../../veilid_support.dart'; +@immutable +class DHTRecordWatchChange extends Equatable { + const DHTRecordWatchChange( + {required this.local, required this.data, required this.subkeys}); + + final bool local; + final Uint8List data; + final List subkeys; + + @override + List get props => [local, data, subkeys]; +} + +///////////////////////////////////////////////// + class DHTRecord { DHTRecord( {required VeilidRoutingContext routingContext, @@ -34,7 +50,7 @@ class DHTRecord { bool _open; bool _valid; @internal - StreamController? watchController; + StreamController? watchController; @internal bool needsWatchStateUpdate; @internal @@ -160,76 +176,100 @@ class DHTRecord { Future tryWriteBytes(Uint8List newValue, {int subkey = -1}) async { subkey = subkeyOrDefault(subkey); - newValue = await _crypto.encrypt(newValue, subkey); + final lastSeq = _subkeySeqCache[subkey]; + final encryptedNewValue = await _crypto.encrypt(newValue, subkey); // Set the new data if possible - var valueData = await _routingContext.setDHTValue( - _recordDescriptor.key, subkey, newValue); - if (valueData == null) { - // Get the data to check its sequence number - valueData = await _routingContext.getDHTValue( + var newValueData = await _routingContext.setDHTValue( + _recordDescriptor.key, subkey, encryptedNewValue); + if (newValueData == null) { + // A newer value wasn't found on the set, but + // we may get a newer value when getting the value for the sequence number + newValueData = await _routingContext.getDHTValue( _recordDescriptor.key, subkey, false); - assert(valueData != null, "can't get value that was just set"); - _subkeySeqCache[subkey] = valueData!.seq; + if (newValueData == null) { + assert(newValueData != null, "can't get value that was just set"); + return null; + } + } + + // Record new sequence number + final isUpdated = newValueData.seq != lastSeq; + _subkeySeqCache[subkey] = newValueData.seq; + + // See if the encrypted data returned is exactly the same + // if so, shortcut and don't bother decrypting it + if (newValueData.data == encryptedNewValue) { + if (isUpdated) { + addLocalValueChange(newValue, subkey); + } return null; } - _subkeySeqCache[subkey] = valueData.seq; - return valueData.data; + + // Decrypt value to return it + final decryptedNewValue = await _crypto.decrypt(newValueData.data, subkey); + if (isUpdated) { + addLocalValueChange(decryptedNewValue, subkey); + } + return decryptedNewValue; } Future eventualWriteBytes(Uint8List newValue, {int subkey = -1}) async { subkey = subkeyOrDefault(subkey); - newValue = await _crypto.encrypt(newValue, subkey); + final lastSeq = _subkeySeqCache[subkey]; + final encryptedNewValue = await _crypto.encrypt(newValue, subkey); - ValueData? valueData; + ValueData? newValueData; do { - // Set the new data - valueData = await _routingContext.setDHTValue( - _recordDescriptor.key, subkey, newValue); + do { + // Set the new data + newValueData = await _routingContext.setDHTValue( + _recordDescriptor.key, subkey, encryptedNewValue); - // Repeat if newer data on the network was found - } while (valueData != null); + // Repeat if newer data on the network was found + } while (newValueData != null); - // Get the data to check its sequence number - valueData = - await _routingContext.getDHTValue(_recordDescriptor.key, subkey, false); - assert(valueData != null, "can't get value that was just set"); - _subkeySeqCache[subkey] = valueData!.seq; + // Get the data to check its sequence number + newValueData = await _routingContext.getDHTValue( + _recordDescriptor.key, subkey, false); + if (newValueData == null) { + assert(newValueData != null, "can't get value that was just set"); + return; + } + + // Record new sequence number + _subkeySeqCache[subkey] = newValueData.seq; + + // The encrypted data returned should be exactly the same + // as what we are trying to set, + // otherwise we still need to keep trying to set the value + } while (newValueData.data != encryptedNewValue); + + final isUpdated = newValueData.seq != lastSeq; + if (isUpdated) { + addLocalValueChange(newValue, subkey); + } } Future eventualUpdateBytes( - Future Function(Uint8List oldValue) update, + Future Function(Uint8List? oldValue) update, {int subkey = -1}) async { subkey = subkeyOrDefault(subkey); - // Get existing identity key, do not allow force refresh here + + // Get the existing data, do not allow force refresh here // because if we need a refresh the setDHTValue will fail anyway - var valueData = - await _routingContext.getDHTValue(_recordDescriptor.key, subkey, false); - // Ensure it exists already - if (valueData == null) { - throw const FormatException('value does not exist'); - } + var oldValue = + await get(subkey: subkey, forceRefresh: false, onlyUpdates: false); + do { - // Update cache - _subkeySeqCache[subkey] = valueData!.seq; - // Update the data - final oldData = await _crypto.decrypt(valueData.data, subkey); - final updatedData = await update(oldData); - final newData = await _crypto.encrypt(updatedData, subkey); + final updatedValue = await update(oldValue); - // Set it back - valueData = await _routingContext.setDHTValue( - _recordDescriptor.key, subkey, newData); + // Try to write it back to the network + oldValue = await tryWriteBytes(updatedValue, subkey: subkey); - // Repeat if newer data on the network was found - } while (valueData != null); - - // Get the data to check its sequence number - valueData = - await _routingContext.getDHTValue(_recordDescriptor.key, subkey, false); - assert(valueData != null, "can't get value that was just set"); - _subkeySeqCache[subkey] = valueData!.seq; + // Repeat update if newer data on the network was found + } while (oldValue != null); } Future tryWriteJson(T Function(dynamic) fromJson, T newValue, @@ -259,12 +299,12 @@ class DHTRecord { eventualWriteBytes(newValue.writeToBuffer(), subkey: subkey); Future eventualUpdateJson( - T Function(dynamic) fromJson, Future Function(T) update, + T Function(dynamic) fromJson, Future Function(T?) update, {int subkey = -1}) => eventualUpdateBytes(jsonUpdate(fromJson, update), subkey: subkey); Future eventualUpdateProtobuf( - T Function(List) fromBuffer, Future Function(T) update, + T Function(List) fromBuffer, Future Function(T?) update, {int subkey = -1}) => eventualUpdateBytes(protobufUpdate(fromBuffer, update), subkey: subkey); @@ -281,25 +321,34 @@ class DHTRecord { } } - Future> listen( - Future Function( - DHTRecord record, Uint8List data, List subkeys) - onUpdate, - ) async { + Future> listen( + Future Function( + DHTRecord record, Uint8List data, List subkeys) + onUpdate, + {bool localChanges = true}) async { // Set up watch requirements watchController ??= - StreamController.broadcast(onCancel: () { + StreamController.broadcast(onCancel: () { // If there are no more listeners then we can get rid of the controller watchController = null; }); return watchController!.stream.listen( - (update) { + (change) { + if (change.local && !localChanges) { + return; + } Future.delayed(Duration.zero, () async { - final out = await _crypto.decrypt( - update.valueData.data, update.subkeys.first.low); - - await onUpdate(this, out, update.subkeys); + final Uint8List data; + if (change.local) { + // local changes are not encrypted + data = change.data; + } else { + // incoming/remote changes are encrypted + data = + await _crypto.decrypt(change.data, change.subkeys.first.low); + } + await onUpdate(this, data, change.subkeys); }); }, cancelOnError: true, @@ -316,4 +365,14 @@ class DHTRecord { needsWatchStateUpdate = true; } } + + void addLocalValueChange(Uint8List data, int subkey) { + watchController?.add(DHTRecordWatchChange( + local: true, data: data, subkeys: [ValueSubkeyRange.single(subkey)])); + } + + void addRemoteValueChange(VeilidUpdateValueChange update) { + watchController?.add(DHTRecordWatchChange( + local: false, data: update.valueData.data, subkeys: update.subkeys)); + } } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart index 86806cf..9c809b2 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart @@ -102,7 +102,7 @@ class DHTRecordCubit extends Cubit> { DHTRecord get record => _record; - StreamSubscription? _subscription; + StreamSubscription? _subscription; late DHTRecord _record; bool _wantsCloseRecord; final StateFunction _stateFunction; diff --git a/packages/veilid_support/lib/dht_support/src/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record_pool.dart index 208ac98..7c85d96 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record_pool.dart @@ -351,7 +351,7 @@ class DHTRecordPool with TableDBBacked { // Change for (final kv in _opened.entries) { if (kv.key == updateValueChange.key) { - kv.value.watchController?.add(updateValueChange); + kv.value.addRemoteValueChange(updateValueChange); break; } } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array.dart index 1f7142e..11af592 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array.dart @@ -63,8 +63,7 @@ class DHTShortArray { _DHTShortArrayCache _head; // Subscription to head and linked record internal changes - final Map> - _subscriptions; + final Map> _subscriptions; // Stream of external changes StreamController? _watchController; // Watch mutex to ensure we keep the representation valid @@ -545,10 +544,10 @@ class DHTShortArray { } final result = await record!.get(subkey: recordSubkey); - if (result != null) { - // A change happened, notify any listeners - _watchController?.sink.add(null); - } + + // A change happened, notify any listeners + _watchController?.sink.add(null); + return result; } on Exception catch (_) { // Exception on write means state needs to be reverted @@ -607,8 +606,8 @@ class DHTShortArray { final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0); final result = await record.tryWriteBytes(newValue, subkey: recordSubkey); - if (result != null) { - // A change happened, notify any listeners + if (result == null) { + // A newer value was not found, so the change took _watchController?.sink.add(null); } return result; @@ -625,7 +624,7 @@ class DHTShortArray { } Future eventualUpdateItem( - int pos, Future Function(Uint8List oldValue) update) async { + int pos, Future Function(Uint8List? oldValue) update) async { var oldData = await getItem(pos); // Ensure it exists already if (oldData == null) { @@ -633,7 +632,7 @@ class DHTShortArray { } do { // Update the data - final updatedData = await update(oldData!); + final updatedData = await update(oldData); // Set it back oldData = await tryWriteItem(pos, updatedData); @@ -673,14 +672,14 @@ class DHTShortArray { Future eventualUpdateItemJson( T Function(dynamic) fromJson, int pos, - Future Function(T) update, + Future Function(T?) update, ) => eventualUpdateItem(pos, jsonUpdate(fromJson, update)); Future eventualUpdateItemProtobuf( T Function(List) fromBuffer, int pos, - Future Function(T) update, + Future Function(T?) update, ) => eventualUpdateItem(pos, protobufUpdate(fromBuffer, update)); @@ -692,14 +691,17 @@ class DHTShortArray { .wait; // Update changes to the head record + // Don't watch for local changes because this class already handles + // notifying listeners and knows when it makes local changes if (!_subscriptions.containsKey(_headRecord.key)) { _subscriptions[_headRecord.key] = - await _headRecord.listen(_onUpdateRecord); + await _headRecord.listen(localChanges: false, _onUpdateRecord); } // Update changes to any linked records for (final lr in _head.linkedRecords) { if (!_subscriptions.containsKey(lr.key)) { - _subscriptions[lr.key] = await lr.listen(_onUpdateRecord); + _subscriptions[lr.key] = + await lr.listen(localChanges: false, _onUpdateRecord); } } } on Exception { diff --git a/packages/veilid_support/lib/src/json_tools.dart b/packages/veilid_support/lib/src/json_tools.dart index e7bfd09..c5895d0 100644 --- a/packages/veilid_support/lib/src/json_tools.dart +++ b/packages/veilid_support/lib/src/json_tools.dart @@ -13,14 +13,15 @@ Uint8List jsonEncodeBytes(Object? object, utf8.encode(jsonEncode(object, toEncodable: toEncodable))); Future jsonUpdateBytes(T Function(dynamic) fromJson, - Uint8List oldBytes, Future Function(T) update) async { - final oldObj = fromJson(jsonDecode(utf8.decode(oldBytes))); + Uint8List? oldBytes, Future Function(T?) update) async { + final oldObj = + oldBytes == null ? null : fromJson(jsonDecode(utf8.decode(oldBytes))); final newObj = await update(oldObj); return jsonEncodeBytes(newObj); } -Future Function(Uint8List) jsonUpdate( - T Function(dynamic) fromJson, Future Function(T) update) => +Future Function(Uint8List?) jsonUpdate( + T Function(dynamic) fromJson, Future Function(T?) update) => (oldBytes) => jsonUpdateBytes(fromJson, oldBytes, update); T Function(Object?) genericFromJson( diff --git a/packages/veilid_support/lib/src/protobuf_tools.dart b/packages/veilid_support/lib/src/protobuf_tools.dart index c24302c..94dc6d1 100644 --- a/packages/veilid_support/lib/src/protobuf_tools.dart +++ b/packages/veilid_support/lib/src/protobuf_tools.dart @@ -4,14 +4,14 @@ import 'package:protobuf/protobuf.dart'; Future protobufUpdateBytes( T Function(List) fromBuffer, - Uint8List oldBytes, - Future Function(T) update) async { - final oldObj = fromBuffer(oldBytes); + Uint8List? oldBytes, + Future Function(T?) update) async { + final oldObj = oldBytes == null ? null : fromBuffer(oldBytes); final newObj = await update(oldObj); return Uint8List.fromList(newObj.writeToBuffer()); } -Future Function(Uint8List) +Future Function(Uint8List?) protobufUpdate( - T Function(List) fromBuffer, Future Function(T) update) => + T Function(List) fromBuffer, Future Function(T?) update) => (oldBytes) => protobufUpdateBytes(fromBuffer, oldBytes, update); diff --git a/packages/veilid_support/pubspec.lock b/packages/veilid_support/pubspec.lock index 54c4755..3ce09e2 100644 --- a/packages/veilid_support/pubspec.lock +++ b/packages/veilid_support/pubspec.lock @@ -411,7 +411,7 @@ packages: source: hosted version: "1.0.4" mutex: - dependency: transitive + dependency: "direct main" description: path: "../mutex" relative: true diff --git a/packages/veilid_support/pubspec.yaml b/packages/veilid_support/pubspec.yaml index b1d21c1..acbe5af 100644 --- a/packages/veilid_support/pubspec.yaml +++ b/packages/veilid_support/pubspec.yaml @@ -16,6 +16,9 @@ dependencies: json_annotation: ^4.8.1 loggy: ^2.0.3 meta: ^1.10.0 + mutex: + path: ../mutex + protobuf: ^3.0.0 veilid: # veilid: ^0.0.1