From d3fb4b5b6a565c77f6f0db68b05eca6466d6844d Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Thu, 1 Feb 2024 09:41:58 -0500 Subject: [PATCH] xfer --- .../lib/dht_support/src/dht_record.dart | 11 +++++-- .../lib/dht_support/src/dht_record_cubit.dart | 31 ++++++++++++------- .../lib/dht_support/src/dht_short_array.dart | 11 +++---- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/packages/veilid_support/lib/dht_support/src/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record.dart index b6f1b6d..a9064d0 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record.dart @@ -282,7 +282,9 @@ class DHTRecord { } Future> listen( - Future Function(VeilidUpdateValueChange update) onUpdate, + Future Function( + DHTRecord record, Uint8List data, List subkeys) + onUpdate, ) async { // Set up watch requirements watchController ??= @@ -293,7 +295,12 @@ class DHTRecord { return watchController!.stream.listen( (update) { - Future.delayed(Duration.zero, () => onUpdate(update)); + Future.delayed(Duration.zero, () async { + final out = await _crypto.decrypt( + update.valueData.data, update.subkeys.first.low); + + await onUpdate(this, out, update.subkeys); + }); }, cancelOnError: true, onError: (e) async { diff --git a/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart index 78e6eb6..94bd861 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart @@ -9,7 +9,7 @@ class DHTRecordCubit extends Cubit> { DHTRecordCubit({ required Future Function() open, required Future Function(DHTRecord) initialStateFunction, - required Future Function(DHTRecord, List, ValueData) + required Future Function(DHTRecord, List, Uint8List) stateFunction, }) : _wantsCloseRecord = false, super(const AsyncValue.loading()) { @@ -24,7 +24,7 @@ class DHTRecordCubit extends Cubit> { DHTRecordCubit.value({ required DHTRecord record, required Future Function(DHTRecord) initialStateFunction, - required Future Function(DHTRecord, List, ValueData) + required Future Function(DHTRecord, List, Uint8List) stateFunction, }) : _record = record, _wantsCloseRecord = false, @@ -36,7 +36,7 @@ class DHTRecordCubit extends Cubit> { Future _init( Future Function(DHTRecord) initialStateFunction, - Future Function(DHTRecord, List, ValueData) + Future Function(DHTRecord, List, Uint8List) stateFunction, ) async { // Make initial state update @@ -49,10 +49,9 @@ class DHTRecordCubit extends Cubit> { emit(AsyncValue.error(e)); } - _subscription = await _record.listen((update) async { + _subscription = await _record.listen((record, data, subkeys) async { try { - final newState = - await stateFunction(_record, update.subkeys, update.valueData); + final newState = await stateFunction(record, subkeys, data); if (newState != null) { emit(AsyncValue.data(newState)); } @@ -73,6 +72,16 @@ class DHTRecordCubit extends Cubit> { await super.close(); } + Future refresh(List subkeys) async { + for (final skr in subkeys) { + for (var sk = skr.low; sk <= skr.high; sk++) { + final data = await _record.get(subkey: sk, forceRefresh: true); + final newState = await stateFunction(_record, subkeys, data); + xxx continue here + } + } + } + StreamSubscription? _subscription; late DHTRecord _record; bool _wantsCloseRecord; @@ -105,9 +114,9 @@ class DefaultDHTRecordCubit extends DHTRecordCubit { return decodeState(initialData); }; - static Future Function(DHTRecord, List, ValueData) + static Future Function(DHTRecord, List, Uint8List) _makeStateFunction(T Function(List data) decodeState) => - (record, subkeys, valueData) async { + (record, subkeys, updatedata) async { final defaultSubkey = record.subkeyOrDefault(-1); if (subkeys.containsSubkey(defaultSubkey)) { final Uint8List data; @@ -119,7 +128,7 @@ class DefaultDHTRecordCubit extends DHTRecordCubit { } data = maybeData; } else { - data = valueData.data; + data = updatedata; } final newState = decodeState(data); return newState; @@ -127,6 +136,6 @@ class DefaultDHTRecordCubit extends DHTRecordCubit { return null; }; - xxx add refresh/get mechanism to DHTRecordCubit and here too, then propagage to conversation_cubit - xxx should just be a 'get' like in dht_short_array_cubit + // xxx add refresh/get mechanism to DHTRecordCubit and here too, then propagage to conversation_cubit + // xxx should just be a 'get' like in dht_short_array_cubit } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array.dart index 55f28e3..40d8b90 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array.dart @@ -721,21 +721,18 @@ class DHTShortArray { } // Called when a head or linked record changes - Future _onUpdateRecord(VeilidUpdateValueChange update) async { - final record = _head.linkedRecords.firstWhere( - (element) => element.key == update.key, - orElse: () => _headRecord); - + Future _onUpdateRecord( + DHTRecord record, Uint8List data, List subkeys) async { // If head record subkey zero changes, then the layout // of the dhtshortarray has changed var updateHead = false; - if (record == _headRecord && update.subkeys.containsSubkey(0)) { + if (record == _headRecord && subkeys.containsSubkey(0)) { updateHead = true; } // If we have any other subkeys to update, do them first final unord = List>.empty(growable: true); - for (final skr in update.subkeys) { + for (final skr in subkeys) { for (var subkey = skr.low; subkey <= skr.high; subkey++) { // Skip head subkey if (subkey == 0) {