short array cubit

This commit is contained in:
Christien Rioux 2024-01-25 09:30:02 -05:00
parent 1534a77ab1
commit 0291ff7224

View File

@ -1,5 +1,4 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:bloc/bloc.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
@ -10,34 +9,58 @@ class DHTShortArrayCubit<T> extends Cubit<AsyncValue<IList<T>>> {
DHTShortArrayCubit({
required DHTShortArray shortArray,
required T Function(List<int> data) decodeElement,
}) : super(const AsyncValue.loading()) {
}) : _shortArray = shortArray,
_decodeElement = decodeElement,
_wantsUpdate = false,
_isUpdating = false,
super(const AsyncValue.loading()) {
// Make initial state update
_update();
Future.delayed(Duration.zero, () async {
// Make initial state update
try {
final initialState = await initialStateFunction(record);
if (initialState != null) {
emit(AsyncValue.data(initialState));
}
} on Exception catch (e) {
emit(AsyncValue.error(e));
}
xxx do this now
shortArray. xxx add listen to head and linked records in dht_short_array
_subscription = await shortArray.listen(_update);
});
}
_subscription = await record.listen((update) async {
void _update() {
// Run at most one background update process
_wantsUpdate = true;
if (_isUpdating) {
return;
}
_isUpdating = true;
Future.delayed(Duration.zero, () async {
// Keep updating until we don't want to update any more
// Because this is async, we could get an update while we're
// still processing the last one
do {
_wantsUpdate = false;
try {
final newState =
await stateFunction(record, update.subkeys, update.valueData);
if (newState != null) {
emit(AsyncValue.data(newState));
}
final initialState = await _getElements();
emit(AsyncValue.data(initialState));
} on Exception catch (e) {
emit(AsyncValue.error(e));
}
});
} while (_wantsUpdate);
// Note that this update future has finished
_isUpdating = false;
});
}
// Get and decode the entire short array
Future<IList<T>> _getElements() async {
var out = IList<T>();
for (var i = 0; i < _shortArray.length; i++) {
// Get the element bytes (throw if fails, array state is invalid)
final bytes = (await _shortArray.getItem(i))!;
// Decode the element
final elem = _decodeElement(bytes);
// Append to the output list
out = out.add(elem);
}
return out;
}
@override
Future<void> close() async {
await _subscription?.cancel();
@ -45,40 +68,9 @@ xxx do this now
await super.close();
}
StreamSubscription<VeilidUpdateValueChange>? _subscription;
}
// Cubit that watches the default subkey value of a dhtrecord
class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
DefaultDHTRecordCubit({
required super.record,
required T Function(List<int> data) decodeState,
}) : super(
initialStateFunction: (record) async {
final initialData = await record.get();
if (initialData == null) {
return null;
}
return decodeState(initialData);
},
stateFunction: (record, subkeys, valueData) async {
final defaultSubkey = record.subkeyOrDefault(-1);
if (subkeys.containsSubkey(defaultSubkey)) {
final Uint8List data;
final firstSubkey = subkeys.firstOrNull!.low;
if (firstSubkey != defaultSubkey) {
final maybeData = await record.get(forceRefresh: true);
if (maybeData == null) {
return null;
}
data = maybeData;
} else {
data = valueData.data;
}
final newState = decodeState(data);
return newState;
}
return null;
},
);
final DHTShortArray _shortArray;
final T Function(List<int> data) _decodeElement;
StreamSubscription<void>? _subscription;
bool _wantsUpdate;
bool _isUpdating;
}