fix head issue and refactor

This commit is contained in:
Christien Rioux 2024-04-01 09:04:54 -04:00
parent c48305cba1
commit 48c9c67ab8
20 changed files with 229 additions and 327 deletions

View file

@ -342,7 +342,7 @@ class DHTRecord {
void _addValueChange(
{required bool local,
required Uint8List data,
required Uint8List? data,
required List<ValueSubkeyRange> subkeys}) {
final ws = watchState;
if (ws != null) {
@ -378,6 +378,6 @@ class DHTRecord {
void _addRemoteValueChange(VeilidUpdateValueChange update) {
_addValueChange(
local: false, data: update.value.data, subkeys: update.subkeys);
local: false, data: update.value?.data, subkeys: update.subkeys);
}
}

View file

@ -28,19 +28,19 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
});
}
DHTRecordCubit.value({
required DHTRecord record,
required InitialStateFunction<T> initialStateFunction,
required StateFunction<T> stateFunction,
required WatchFunction watchFunction,
}) : _record = record,
_stateFunction = stateFunction,
_wantsCloseRecord = false,
super(const AsyncValue.loading()) {
Future.delayed(Duration.zero, () async {
await _init(initialStateFunction, stateFunction, watchFunction);
});
}
// DHTRecordCubit.value({
// required DHTRecord record,
// required InitialStateFunction<T> initialStateFunction,
// required StateFunction<T> stateFunction,
// required WatchFunction watchFunction,
// }) : _record = record,
// _stateFunction = stateFunction,
// _wantsCloseRecord = false,
// super(const AsyncValue.loading()) {
// Future.delayed(Duration.zero, () async {
// await _init(initialStateFunction, stateFunction, watchFunction);
// });
// }
Future<void> _init(
InitialStateFunction<T> initialStateFunction,
@ -123,13 +123,13 @@ class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
stateFunction: _makeStateFunction(decodeState),
watchFunction: _makeWatchFunction());
DefaultDHTRecordCubit.value({
required super.record,
required T Function(List<int> data) decodeState,
}) : super.value(
initialStateFunction: _makeInitialStateFunction(decodeState),
stateFunction: _makeStateFunction(decodeState),
watchFunction: _makeWatchFunction());
// DefaultDHTRecordCubit.value({
// required super.record,
// required T Function(List<int> data) decodeState,
// }) : super.value(
// initialStateFunction: _makeInitialStateFunction(decodeState),
// stateFunction: _makeStateFunction(decodeState),
// watchFunction: _makeWatchFunction());
static InitialStateFunction<T> _makeInitialStateFunction<T>(
T Function(List<int> data) decodeState) =>

View file

@ -1,7 +1,6 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart';
@ -169,90 +168,36 @@ class DHTShortArray {
}
/// Runs a closure allowing read-only access to the shortarray
Future<T> operate<T>(Future<T> Function(DHTShortArrayRead) closure) async =>
Future<T?> operate<T>(Future<T?> Function(DHTShortArrayRead) closure) async =>
_head.operate((head) async {
final reader = _DHTShortArrayRead._(head);
return closure(reader);
});
/// Runs a closure allowing read-write access to the shortarray
/// Makes only one attempt to consistently write the changes to the DHT
/// Returns (result, true) of the closure if the write could be performed
/// Returns (null, false) if the write could not be performed at this time
Future<(T?, bool)> operateWrite<T>(
Future<T> Function(DHTShortArrayWrite) closure) async =>
Future<T?> Function(DHTShortArrayWrite) closure) async =>
_head.operateWrite((head) async {
final writer = _DHTShortArrayWrite._(head);
return closure(writer);
});
/// Set an item at position 'pos' of the DHTShortArray. Retries until the
/// value being written is successfully made the newest value of the element.
/// This may throw an exception if the position elements the built-in limit of
/// 'maxElements = 256' entries.
Future<void> eventualWriteItem(int pos, Uint8List newValue,
{Duration? timeout}) async {
await _head.operateWriteEventual((head) async {
bool wasSet;
(_, wasSet) = await _tryWriteItemInner(head, pos, newValue);
return wasSet;
}, timeout: timeout);
}
/// Change an item at position 'pos' of the DHTShortArray.
/// Runs with the value of the old element at that position such that it can
/// be changed to the returned value from tha closure. Retries until the
/// value being written is successfully made the newest value of the element.
/// This may throw an exception if the position elements the built-in limit of
/// 'maxElements = 256' entries.
Future<void> eventualUpdateItem(
int pos, Future<Uint8List> Function(Uint8List? oldValue) update,
{Duration? timeout}) async {
await _head.operateWriteEventual((head) async {
final oldData = await getItem(pos);
// Update the data
final updatedData = await update(oldData);
// Set it back
bool wasSet;
(_, wasSet) = await _tryWriteItemInner(head, pos, updatedData);
return wasSet;
}, timeout: timeout);
}
/// Convenience function:
/// Like eventualWriteItem but also encodes the input value as JSON and parses
/// the returned element as JSON
Future<void> eventualWriteItemJson<T>(int pos, T newValue,
{Duration? timeout}) =>
eventualWriteItem(pos, jsonEncodeBytes(newValue), timeout: timeout);
/// Convenience function:
/// Like eventualWriteItem but also encodes the input value as a protobuf
/// object and parses the returned element as a protobuf object
Future<void> eventualWriteItemProtobuf<T extends GeneratedMessage>(
int pos, T newValue,
{int subkey = -1, Duration? timeout}) =>
eventualWriteItem(pos, newValue.writeToBuffer(), timeout: timeout);
/// Convenience function:
/// Like eventualUpdateItem but also encodes the input value as JSON
Future<void> eventualUpdateItemJson<T>(
T Function(dynamic) fromJson, int pos, Future<T> Function(T?) update,
{Duration? timeout}) =>
eventualUpdateItem(pos, jsonUpdate(fromJson, update), timeout: timeout);
/// Convenience function:
/// Like eventualUpdateItem but also encodes the input value as a protobuf
/// object
Future<void> eventualUpdateItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int pos,
Future<T> Function(T?) update,
{Duration? timeout}) =>
eventualUpdateItem(pos, protobufUpdate(fromBuffer, update),
timeout: timeout);
/// Runs a closure allowing read-write access to the shortarray
/// Will execute the closure multiple times if a consistent write to the DHT
/// is not achieved. Timeout if specified will be thrown as a
/// TimeoutException. The closure should return true if its changes also
/// succeeded, returning false will trigger another eventual consistency
/// attempt.
Future<void> operateWriteEventual(
Future<bool> Function(DHTShortArrayWrite) closure,
{Duration? timeout}) async =>
_head.operateWriteEventual((head) async {
final writer = _DHTShortArrayWrite._(head);
return closure(writer);
}, timeout: timeout);
Future<StreamSubscription<void>> listen(
void Function() onChanged,

View file

@ -4,7 +4,6 @@ import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:mutex/mutex.dart';
import '../../../veilid_support.dart';
@ -29,18 +28,18 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
});
}
DHTShortArrayCubit.value({
required DHTShortArray shortArray,
required T Function(List<int> data) decodeElement,
}) : _shortArray = shortArray,
_decodeElement = decodeElement,
super(const BlocBusyState(AsyncValue.loading())) {
_initFuture = Future(() async {
// Make initial state update
unawaited(_refreshNoWait());
_subscription = await shortArray.listen(_update);
});
}
// DHTShortArrayCubit.value({
// required DHTShortArray shortArray,
// required T Function(List<int> data) decodeElement,
// }) : _shortArray = shortArray,
// _decodeElement = decodeElement,
// super(const BlocBusyState(AsyncValue.loading())) {
// _initFuture = Future(() async {
// // Make initial state update
// unawaited(_refreshNoWait());
// _subscription = await shortArray.listen(_update);
// });
// }
Future<void> refresh({bool forceRefresh = false}) async {
await _initFuture;
@ -48,16 +47,15 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
}
Future<void> _refreshNoWait({bool forceRefresh = false}) async =>
busy((emit) async => _operateMutex.protect(
() async => _refreshInner(emit, forceRefresh: forceRefresh)));
busy((emit) async => _refreshInner(emit, forceRefresh: forceRefresh));
Future<void> _refreshInner(void Function(AsyncValue<IList<T>>) emit,
{bool forceRefresh = false}) async {
try {
final newState =
(await _shortArray.getAllItems(forceRefresh: forceRefresh))
?.map(_decodeElement)
.toIList();
final newState = (await _shortArray.operate(
(reader) => reader.getAllItems(forceRefresh: forceRefresh)))
?.map(_decodeElement)
.toIList();
if (newState != null) {
emit(AsyncValue.data(newState));
}
@ -71,8 +69,8 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
// Because this is async, we could get an update while we're
// still processing the last one. Only called after init future has run
// so we dont have to wait for that here.
_sspUpdate.busyUpdate<T, AsyncValue<IList<T>>>(busy,
(emit) async => _operateMutex.protect(() async => _refreshInner(emit)));
_sspUpdate.busyUpdate<T, AsyncValue<IList<T>>>(
busy, (emit) async => _refreshInner(emit));
}
@override
@ -86,12 +84,24 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
await super.close();
}
Future<R?> operate<R>(Future<R?> Function(DHTShortArray) closure) async {
Future<R?> operate<R>(Future<R?> Function(DHTShortArrayRead) closure) async {
await _initFuture;
return _operateMutex.protect(() async => closure(_shortArray));
return _shortArray.operate(closure);
}
Future<(R?, bool)> operateWrite<R>(
Future<R?> Function(DHTShortArrayWrite) closure) async {
await _initFuture;
return _shortArray.operateWrite(closure);
}
Future<void> operateWriteEventual(
Future<bool> Function(DHTShortArrayWrite) closure,
{Duration? timeout}) async {
await _initFuture;
return _shortArray.operateWriteEventual(closure, timeout: timeout);
}
final _operateMutex = Mutex();
late final Future<void> _initFuture;
late final DHTShortArray _shortArray;
final T Function(List<int> data) _decodeElement;

View file

@ -32,7 +32,7 @@ class _DHTShortArrayHead {
final head = proto.DHTShortArray();
head.keys.addAll(_linkedRecords.map((lr) => lr.key.toProto()));
head.index.addAll(_index);
head.index = List.of(_index);
head.seqs.addAll(_seqs);
// Do not serialize free list, it gets recreated
// Do not serialize local seqs, they are only locally relevant
@ -58,7 +58,7 @@ class _DHTShortArrayHead {
});
Future<(T?, bool)> operateWrite<T>(
Future<T> Function(_DHTShortArrayHead) closure) async =>
Future<T?> Function(_DHTShortArrayHead) closure) async =>
_headMutex.protect(() async {
final oldLinkedRecords = List.of(_linkedRecords);
final oldIndex = List.of(_index);
@ -111,14 +111,22 @@ class _DHTShortArrayHead {
oldSeqs = List.of(_seqs);
// Try to do the element write
do {
while (true) {
if (timeoutTs != null) {
final now = Veilid.instance.now();
if (now >= timeoutTs) {
throw TimeoutException('timeout reached');
}
}
} while (!await closure(this));
if (await closure(this)) {
break;
}
// Failed to write in closure resets state
_linkedRecords = List.of(oldLinkedRecords);
_index = List.of(oldIndex);
_free = List.of(oldFree);
_seqs = List.of(oldSeqs);
}
// Try to do the head write
} while (!await _writeHead());