better message status support

This commit is contained in:
Christien Rioux 2024-04-17 21:31:26 -04:00
parent 4f02435964
commit 809f6d69bf
31 changed files with 1046 additions and 248 deletions

View file

@ -5,12 +5,20 @@ import 'package:bloc/bloc.dart';
abstract class FutureCubit<State> extends Cubit<AsyncValue<State>> {
FutureCubit(Future<State> fut) : super(const AsyncValue.loading()) {
unawaited(fut.then((value) {
emit(AsyncValue.data(value));
// ignore: avoid_types_on_closure_parameters
}, onError: (Object e, StackTrace stackTrace) {
emit(AsyncValue.error(e, stackTrace));
}));
_initWait.add(() async => fut.then((value) {
emit(AsyncValue.data(value));
// ignore: avoid_types_on_closure_parameters
}, onError: (Object e, StackTrace stackTrace) {
emit(AsyncValue.error(e, stackTrace));
}));
}
FutureCubit.value(State state) : super(AsyncValue.data(state));
@override
Future<void> close() async {
await _initWait();
await super.close();
}
final WaitSet _initWait = WaitSet();
}

View file

@ -86,7 +86,7 @@ class DHTRecord {
if (_open) {
await close();
}
await DHTRecordPool.instance.delete(key);
await DHTRecordPool.instance.deleteRecord(key);
rethrow;
}
}

View file

@ -109,7 +109,7 @@ class OpenedRecordInfo {
String get sharedDetails => shared.toString();
}
class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
DHTRecordPool._(Veilid veilid, VeilidRoutingContext routingContext)
: _state = const DHTRecordPoolAllocations(),
_mutex = Mutex(),
@ -150,7 +150,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
? DHTRecordPoolAllocations.fromJson(obj)
: const DHTRecordPoolAllocations();
@override
Object? valueToJson(DHTRecordPoolAllocations val) => val.toJson();
Object? valueToJson(DHTRecordPoolAllocations? val) => val?.toJson();
//////////////////////////////////////////////////////////////
@ -161,7 +161,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
final globalPool = DHTRecordPool._(Veilid.instance, routingContext);
globalPool
.._logger = logger
.._state = await globalPool.load();
.._state = await globalPool.load() ?? const DHTRecordPoolAllocations();
_singleton = globalPool;
}
@ -279,7 +279,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
if (openedRecordInfo.records.isEmpty) {
await _routingContext.closeDHTRecord(key);
if (openedRecordInfo.shared.deleteOnClose) {
await _deleteInner(key);
await _deleteRecordInner(key);
}
_opened.remove(key);
}
@ -316,7 +316,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
}
}
Future<void> _deleteInner(TypedKey recordKey) async {
Future<void> _deleteRecordInner(TypedKey recordKey) async {
log('deleteDHTRecord: key=$recordKey');
// Remove this child from parents
@ -324,7 +324,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
await _routingContext.deleteDHTRecord(recordKey);
}
Future<void> delete(TypedKey recordKey) async {
Future<void> deleteRecord(TypedKey recordKey) async {
await _mutex.protect(() async {
final allDeps = _collectChildrenInner(recordKey);
@ -339,7 +339,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
ori.shared.deleteOnClose = true;
} else {
// delete now
await _deleteInner(recordKey);
await _deleteRecordInner(recordKey);
}
});
}

View file

@ -69,7 +69,7 @@ class DHTShortArray {
return dhtShortArray;
} on Exception catch (_) {
await dhtRecord.close();
await pool.delete(dhtRecord.key);
await pool.deleteRecord(dhtRecord.key);
rethrow;
}
}
@ -152,7 +152,7 @@ class DHTShortArray {
/// Free all resources for the DHTShortArray and delete it from the DHT
Future<void> delete() async {
await close();
await DHTRecordPool.instance.delete(recordKey);
await DHTRecordPool.instance.deleteRecord(recordKey);
}
/// Runs a closure that guarantees the DHTShortArray
@ -212,6 +212,8 @@ class DHTShortArray {
return closure(writer);
}, timeout: timeout);
/// Listen to and any all changes to the structure of this short array
/// regardless of where the changes are coming from
Future<StreamSubscription<void>> listen(
void Function() onChanged,
) =>

View file

@ -3,11 +3,24 @@ import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:equatable/equatable.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';
import '../../../veilid_support.dart';
typedef DHTShortArrayState<T> = AsyncValue<IList<T>>;
@immutable
class DHTShortArrayElementState<T> extends Equatable {
const DHTShortArrayElementState(
{required this.value, required this.isOffline});
final T value;
final bool isOffline;
@override
List<Object?> get props => [value, isOffline];
}
typedef DHTShortArrayState<T> = AsyncValue<IList<DHTShortArrayElementState<T>>>;
typedef DHTShortArrayBusyState<T> = BlocBusyState<DHTShortArrayState<T>>;
class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
@ -49,13 +62,19 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
Future<void> _refreshNoWait({bool forceRefresh = false}) async =>
busy((emit) async => _refreshInner(emit, forceRefresh: forceRefresh));
Future<void> _refreshInner(void Function(AsyncValue<IList<T>>) emit,
Future<void> _refreshInner(void Function(DHTShortArrayState<T>) emit,
{bool forceRefresh = false}) async {
try {
final newState = (await _shortArray.operate(
(reader) => reader.getAllItems(forceRefresh: forceRefresh)))
?.map(_decodeElement)
.toIList();
final newState = await _shortArray.operate((reader) async {
final offlinePositions = await reader.getOfflinePositions();
final allItems = (await reader.getAllItems(forceRefresh: forceRefresh))
?.indexed
.map((x) => DHTShortArrayElementState(
value: _decodeElement(x.$2),
isOffline: offlinePositions.contains(x.$1)))
.toIList();
return allItems;
});
if (newState != null) {
emit(AsyncValue.data(newState));
}

View file

@ -15,6 +15,9 @@ abstract class DHTShortArrayRead {
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
Future<List<Uint8List>?> getAllItems({bool forceRefresh = false});
/// Get a list of the positions that were written offline and not flushed yet
Future<Set<int>> getOfflinePositions();
}
extension DHTShortArrayReadExt on DHTShortArrayRead {
@ -96,6 +99,40 @@ class _DHTShortArrayRead implements DHTShortArrayRead {
return out;
}
/// Get a list of the positions that were written offline and not flushed yet
@override
Future<Set<int>> getOfflinePositions() async {
final indexOffline = <int>{};
final inspects = await [
_head._headRecord.inspect(),
..._head._linkedRecords.map((lr) => lr.inspect())
].wait;
// Add to offline index
var strideOffset = 0;
for (final inspect in inspects) {
for (final r in inspect.offlineSubkeys) {
for (var i = r.low; i <= r.high; i++) {
// If this is the head record, ignore the first head subkey
if (strideOffset != 0 || i != 0) {
indexOffline.add(i + ((strideOffset == 0) ? -1 : strideOffset));
}
}
}
strideOffset += _head._stride;
}
// See which positions map to offline indexes
final positionOffline = <int>{};
for (var i = 0; i < _head._index.length; i++) {
final idx = _head._index[i];
if (indexOffline.contains(idx)) {
positionOffline.add(i);
}
}
return positionOffline;
}
////////////////////////////////////////////////////////////////////////////
// Fields
final _DHTShortArrayHead _head;

View file

@ -92,12 +92,11 @@ extension DHTShortArrayWriteExt on DHTShortArrayWrite {
}
////////////////////////////////////////////////////////////////////////////
// Writer-only implementation
// Writer implementation
class _DHTShortArrayWrite implements DHTShortArrayWrite {
_DHTShortArrayWrite._(_DHTShortArrayHead head)
: _head = head,
_reader = _DHTShortArrayRead._(head);
class _DHTShortArrayWrite extends _DHTShortArrayRead
implements DHTShortArrayWrite {
_DHTShortArrayWrite._(super.head) : super._();
@override
Future<bool> tryAddItem(Uint8List value) async {
@ -187,23 +186,4 @@ class _DHTShortArrayWrite implements DHTShortArrayWrite {
}
return (oldValue, true);
}
////////////////////////////////////////////////////////////////////////////
// Reader passthrough
@override
int get length => _reader.length;
@override
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) =>
_reader.getItem(pos, forceRefresh: forceRefresh);
@override
Future<List<Uint8List>?> getAllItems({bool forceRefresh = false}) =>
_reader.getAllItems(forceRefresh: forceRefresh);
////////////////////////////////////////////////////////////////////////////
// Fields
final _DHTShortArrayHead _head;
final _DHTShortArrayRead _reader;
}

View file

@ -2,48 +2,47 @@ import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:meta/meta.dart';
import 'package:mutex/mutex.dart';
import 'table_db.dart';
abstract class AsyncTableDBBackedCubit<State> extends Cubit<AsyncValue<State>>
with TableDBBacked<State> {
abstract class AsyncTableDBBackedCubit<T> extends Cubit<AsyncValue<T?>>
with TableDBBackedJson<T?> {
AsyncTableDBBackedCubit() : super(const AsyncValue.loading()) {
unawaited(Future.delayed(Duration.zero, _build));
_initWait.add(_build);
}
@override
Future<void> close() async {
// Ensure the init finished
await _initWait();
// Wait for any setStates to finish
await _mutex.acquire();
await super.close();
}
Future<void> _build() async {
try {
emit(AsyncValue.data(await load()));
await _mutex.protect(() async {
emit(AsyncValue.data(await load()));
});
} on Exception catch (e, stackTrace) {
emit(AsyncValue.error(e, stackTrace));
}
}
Future<State> readyData() async {
final stateStream = stream.distinct();
await for (final AsyncValue<State> av in stateStream) {
final d = av.when(
data: (value) => value, loading: () => null, error: (e, s) => null);
if (d != null) {
return d;
}
final ef = av.when(
data: (value) => null,
loading: () => null,
error: Future<State>.error);
if (ef != null) {
return ef;
}
}
return Future<State>.error(
StateError("data never became ready in cubit '$runtimeType'"));
}
Future<void> setState(State newState) async {
@protected
Future<void> setState(T? newState) async {
await _initWait();
try {
emit(AsyncValue.data(await store(newState)));
} on Exception catch (e, stackTrace) {
emit(AsyncValue.error(e, stackTrace));
}
}
final WaitSet _initWait = WaitSet();
final Mutex _mutex = Mutex();
}

View file

@ -93,7 +93,7 @@ extension IdentityMasterExtension on IdentityMaster {
/// Deletes a master identity and the identity record under it
Future<void> delete() async {
final pool = DHTRecordPool.instance;
await pool.delete(masterRecordKey);
await pool.deleteRecord(masterRecordKey);
}
Future<VeilidCryptoSystem> get identityCrypto =>
@ -111,6 +111,9 @@ extension IdentityMasterExtension on IdentityMaster {
TypedKey identityPublicTypedKey() =>
TypedKey(kind: identityRecordKey.kind, value: identityPublicKey);
TypedKey masterPublicTypedKey() =>
TypedKey(kind: identityRecordKey.kind, value: masterPublicKey);
Future<VeilidCryptoSystem> validateIdentitySecret(
SecretKey identitySecret) async {
final cs = await identityCrypto;

View file

@ -0,0 +1,194 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart';
import 'table_db.dart';
class PersistentQueueCubit<T extends GeneratedMessage>
extends Cubit<AsyncValue<IList<T>>> with TableDBBackedFromBuffer<IList<T>> {
//
PersistentQueueCubit(
{required String table,
required String key,
required T Function(Uint8List) fromBuffer,
bool deleteOnClose = true})
: _table = table,
_key = key,
_fromBuffer = fromBuffer,
_deleteOnClose = deleteOnClose,
super(const AsyncValue.loading()) {
_initWait.add(_build);
unawaited(Future.delayed(Duration.zero, () async {
await for (final elem in _syncAddController.stream) {
await addAll(elem);
}
}));
}
@override
Future<void> close() async {
// Ensure the init finished
await _initWait();
// Close the sync add stream
await _syncAddController.close();
// Wait for any setStates to finish
await _stateMutex.acquire();
// Clean up table if desired
if (_deleteOnClose) {
await delete();
}
await super.close();
}
Future<void> _build() async {
await _stateMutex.protect(() async {
try {
emit(AsyncValue.data(await load() ?? await store(IList<T>.empty())));
} on Exception catch (e, stackTrace) {
emit(AsyncValue.error(e, stackTrace));
}
});
}
Future<void> _setStateInner(IList<T> newState) async {
emit(AsyncValue.data(await store(newState)));
}
Future<void> add(T item) async {
await _initWait();
await _stateMutex.protect(() async {
final queue = state.asData!.value.add(item);
await _setStateInner(queue);
});
}
Future<void> addAll(IList<T> items) async {
await _initWait();
await _stateMutex.protect(() async {
var queue = state.asData!.value;
for (final item in items) {
queue = queue.add(item);
}
await _setStateInner(queue);
});
}
void addSync(T item) {
_syncAddController.sink.add([item].toIList());
}
void addAllSync(IList<T> items) {
_syncAddController.sink.add(items.toIList());
}
Future<bool> get isEmpty async {
await _initWait();
return state.asData!.value.isEmpty;
}
Future<bool> get isNotEmpty async {
await _initWait();
return state.asData!.value.isNotEmpty;
}
Future<int> get length async {
await _initWait();
return state.asData!.value.length;
}
// Future<T?> pop() async {
// await _initWait();
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
// final removedItem = Output<T>();
// final queue = state.asData!.value.removeAt(0, removedItem);
// await _setStateInner(queue);
// return removedItem.value;
// }));
// }
// Future<IList<T>> popAll() async {
// await _initWait();
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
// final queue = state.asData!.value;
// await _setStateInner(IList<T>.empty);
// return queue;
// }));
// }
Future<R> process<R>(Future<R> Function(IList<T>) closure,
{int? count}) async {
await _initWait();
// Only one processor at a time
return _processingMutex.protect(() async {
// Take 'count' items from the front of the list
final toProcess = await _stateMutex.protect(() async {
final queue = state.asData!.value;
final processCount = (count ?? queue.length).clamp(0, queue.length);
return queue.take(processCount).toIList();
});
// Run the processing closure
final processCount = toProcess.length;
final out = await closure(toProcess);
// If there was nothing to process just return
if (toProcess.isEmpty) {
return out;
}
// If there was no exception, remove the processed items
return _stateMutex.protect(() async {
// Get the queue from the state again as items could
// have been added during processing
final queue = state.asData!.value;
final newQueue = queue.skip(processCount).toIList();
await _setStateInner(newQueue);
return out;
});
});
}
// TableDBBacked
@override
String tableKeyName() => _key;
@override
String tableName() => _table;
@override
IList<T> valueFromBuffer(Uint8List bytes) {
final reader = CodedBufferReader(bytes);
var out = IList<T>();
while (!reader.isAtEnd()) {
out = out.add(_fromBuffer(reader.readBytesAsView()));
}
return out;
}
@override
Uint8List valueToBuffer(IList<T> val) {
final writer = CodedBufferWriter();
for (final elem in val) {
writer.writeRawBytes(elem.writeToBuffer());
}
return writer.toBuffer();
}
final String _table;
final String _key;
final T Function(Uint8List) _fromBuffer;
final bool _deleteOnClose;
final WaitSet _initWait = WaitSet();
final Mutex _stateMutex = Mutex();
final Mutex _processingMutex = Mutex();
final StreamController<IList<T>> _syncAddController = StreamController();
}

View file

@ -1,6 +1,9 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:meta/meta.dart';
import 'package:veilid/veilid.dart';
Future<T> tableScope<T>(
@ -32,14 +35,19 @@ Future<T> transactionScope<T>(
}
}
abstract mixin class TableDBBacked<T> {
abstract mixin class TableDBBackedJson<T> {
@protected
String tableName();
@protected
String tableKeyName();
T valueFromJson(Object? obj);
Object? valueToJson(T val);
@protected
T? valueFromJson(Object? obj);
@protected
Object? valueToJson(T? val);
/// Load things from storage
Future<T> load() async {
@protected
Future<T?> load() async {
final obj = await tableScope(tableName(), (tdb) async {
final objJson = await tdb.loadStringJson(0, tableKeyName());
return valueFromJson(objJson);
@ -48,28 +56,98 @@ abstract mixin class TableDBBacked<T> {
}
/// Store things to storage
@protected
Future<T> store(T obj) async {
await tableScope(tableName(), (tdb) async {
await tdb.storeStringJson(0, tableKeyName(), valueToJson(obj));
});
return obj;
}
/// Delete things from storage
@protected
Future<T?> delete() async {
final obj = await tableScope(tableName(), (tdb) async {
final objJson = await tdb.deleteStringJson(0, tableKeyName());
return valueFromJson(objJson);
});
return obj;
}
}
class TableDBValue<T> extends TableDBBacked<T> {
abstract mixin class TableDBBackedFromBuffer<T> {
@protected
String tableName();
@protected
String tableKeyName();
@protected
T valueFromBuffer(Uint8List bytes);
@protected
Uint8List valueToBuffer(T val);
/// Load things from storage
@protected
Future<T?> load() async {
final obj = await tableScope(tableName(), (tdb) async {
final objBytes = await tdb.load(0, utf8.encode(tableKeyName()));
if (objBytes == null) {
return null;
}
return valueFromBuffer(objBytes);
});
return obj;
}
/// Store things to storage
@protected
Future<T> store(T obj) async {
await tableScope(tableName(), (tdb) async {
await tdb.store(0, utf8.encode(tableKeyName()), valueToBuffer(obj));
});
return obj;
}
/// Delete things from storage
@protected
Future<T?> delete() async {
final obj = await tableScope(tableName(), (tdb) async {
final objBytes = await tdb.delete(0, utf8.encode(tableKeyName()));
if (objBytes == null) {
return null;
}
return valueFromBuffer(objBytes);
});
return obj;
}
}
class TableDBValue<T> extends TableDBBackedJson<T> {
TableDBValue({
required String tableName,
required String tableKeyName,
required T Function(Object? obj) valueFromJson,
required Object? Function(T obj) valueToJson,
required T? Function(Object? obj) valueFromJson,
required Object? Function(T? obj) valueToJson,
required T Function() makeInitialValue,
}) : _tableName = tableName,
_valueFromJson = valueFromJson,
_valueToJson = valueToJson,
_tableKeyName = tableKeyName,
_streamController = StreamController<T>.broadcast();
_makeInitialValue = makeInitialValue,
_streamController = StreamController<T>.broadcast() {
_initWait.add(() async {
await get();
});
}
AsyncData<T>? get value => _value;
T get requireValue => _value!.value;
Future<void> init() async {
await _initWait();
}
Future<void> close() async {
await _initWait();
}
T get value => _value!.value;
Stream<T> get stream => _streamController.stream;
Future<T> get() async {
@ -77,7 +155,7 @@ class TableDBValue<T> extends TableDBBacked<T> {
if (val != null) {
return val.value;
}
final loadedValue = await load();
final loadedValue = await load() ?? await store(_makeInitialValue());
_value = AsyncData(loadedValue);
return loadedValue;
}
@ -88,11 +166,13 @@ class TableDBValue<T> extends TableDBBacked<T> {
}
AsyncData<T>? _value;
final T Function() _makeInitialValue;
final String _tableName;
final String _tableKeyName;
final T Function(Object? obj) _valueFromJson;
final Object? Function(T obj) _valueToJson;
final T? Function(Object? obj) _valueFromJson;
final Object? Function(T? obj) _valueToJson;
final StreamController<T> _streamController;
final WaitSet _initWait = WaitSet();
//////////////////////////////////////////////////////////////
/// AsyncTableDBBacked
@ -101,7 +181,7 @@ class TableDBValue<T> extends TableDBBacked<T> {
@override
String tableKeyName() => _tableKeyName;
@override
T valueFromJson(Object? obj) => _valueFromJson(obj);
T? valueFromJson(Object? obj) => _valueFromJson(obj);
@override
Object? valueToJson(T val) => _valueToJson(val);
Object? valueToJson(T? val) => _valueToJson(val);
}

View file

@ -10,6 +10,7 @@ export 'src/config.dart';
export 'src/identity.dart';
export 'src/json_tools.dart';
export 'src/memory_tools.dart';
export 'src/persistent_queue_cubit.dart';
export 'src/protobuf_tools.dart';
export 'src/table_db.dart';
export 'src/veilid_log.dart' hide veilidLoggy;