From 0291ff72247ebd86b000c5ba226c8bf175f1fae8 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Thu, 25 Jan 2024 09:30:02 -0500 Subject: [PATCH] short array cubit --- .../src/dht_short_array_cubit.dart | 104 ++++++++---------- 1 file changed, 48 insertions(+), 56 deletions(-) diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart index c1f1bd8..735d79a 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart @@ -1,5 +1,4 @@ import 'dart:async'; -import 'dart:typed_data'; import 'package:bloc/bloc.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; @@ -10,34 +9,58 @@ class DHTShortArrayCubit extends Cubit>> { DHTShortArrayCubit({ required DHTShortArray shortArray, required T Function(List data) decodeElement, - }) : super(const AsyncValue.loading()) { + }) : _shortArray = shortArray, + _decodeElement = decodeElement, + _wantsUpdate = false, + _isUpdating = false, + super(const AsyncValue.loading()) { + // Make initial state update + _update(); Future.delayed(Duration.zero, () async { - // Make initial state update - try { - final initialState = await initialStateFunction(record); - if (initialState != null) { - emit(AsyncValue.data(initialState)); - } - } on Exception catch (e) { - emit(AsyncValue.error(e)); - } -xxx do this now - shortArray. xxx add listen to head and linked records in dht_short_array + _subscription = await shortArray.listen(_update); + }); + } - _subscription = await record.listen((update) async { + void _update() { + // Run at most one background update process + _wantsUpdate = true; + if (_isUpdating) { + return; + } + _isUpdating = true; + Future.delayed(Duration.zero, () async { + // Keep updating until we don't want to update any more + // Because this is async, we could get an update while we're + // still processing the last one + do { + _wantsUpdate = false; try { - final newState = - await stateFunction(record, update.subkeys, update.valueData); - if (newState != null) { - emit(AsyncValue.data(newState)); - } + final initialState = await _getElements(); + emit(AsyncValue.data(initialState)); } on Exception catch (e) { emit(AsyncValue.error(e)); } - }); + } while (_wantsUpdate); + + // Note that this update future has finished + _isUpdating = false; }); } + // Get and decode the entire short array + Future> _getElements() async { + var out = IList(); + for (var i = 0; i < _shortArray.length; i++) { + // Get the element bytes (throw if fails, array state is invalid) + final bytes = (await _shortArray.getItem(i))!; + // Decode the element + final elem = _decodeElement(bytes); + // Append to the output list + out = out.add(elem); + } + return out; + } + @override Future close() async { await _subscription?.cancel(); @@ -45,40 +68,9 @@ xxx do this now await super.close(); } - StreamSubscription? _subscription; -} - -// Cubit that watches the default subkey value of a dhtrecord -class DefaultDHTRecordCubit extends DHTRecordCubit { - DefaultDHTRecordCubit({ - required super.record, - required T Function(List data) decodeState, - }) : super( - initialStateFunction: (record) async { - final initialData = await record.get(); - if (initialData == null) { - return null; - } - return decodeState(initialData); - }, - stateFunction: (record, subkeys, valueData) async { - final defaultSubkey = record.subkeyOrDefault(-1); - if (subkeys.containsSubkey(defaultSubkey)) { - final Uint8List data; - final firstSubkey = subkeys.firstOrNull!.low; - if (firstSubkey != defaultSubkey) { - final maybeData = await record.get(forceRefresh: true); - if (maybeData == null) { - return null; - } - data = maybeData; - } else { - data = valueData.data; - } - final newState = decodeState(data); - return newState; - } - return null; - }, - ); + final DHTShortArray _shortArray; + final T Function(List data) _decodeElement; + StreamSubscription? _subscription; + bool _wantsUpdate; + bool _isUpdating; }