diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index de281fe..72c820e 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -227,7 +227,7 @@ class SingleContactMessagesCubit extends Cubit { } Future _reconcileMessagesInner( - {required DHTShortArrayWrite reconciledMessagesWriter, + {required DHTRandomReadWrite reconciledMessagesWriter, required IList messages}) async { // Ensure remoteMessages is sorted by timestamp final newMessages = messages @@ -236,7 +236,7 @@ class SingleContactMessagesCubit extends Cubit { // Existing messages will always be sorted by timestamp so merging is easy final existingMessages = await reconciledMessagesWriter - .getAllItemsProtobuf(proto.Message.fromBuffer); + .getItemRangeProtobuf(proto.Message.fromBuffer, 0); if (existingMessages == null) { throw Exception( 'Could not load existing reconciled messages at this time'); diff --git a/lib/chat_list/cubits/chat_list_cubit.dart b/lib/chat_list/cubits/chat_list_cubit.dart index 5023d51..9204a0a 100644 --- a/lib/chat_list/cubits/chat_list_cubit.dart +++ b/lib/chat_list/cubits/chat_list_cubit.dart @@ -92,31 +92,29 @@ class ChatListCubit extends DHTShortArrayCubit // Remove Chat from account's list // if this fails, don't keep retrying, user can try again later - final (deletedItem, success) = + final deletedItem = // Ensure followers get their changes before we return await syncFollowers(() => operateWrite((writer) async { if (activeChatCubit.state == remoteConversationRecordKey) { activeChatCubit.setActiveChat(null); } for (var i = 0; i < writer.length; i++) { - final cbuf = await writer.getItem(i); - if (cbuf == null) { + final c = + await writer.getItemProtobuf(proto.Chat.fromBuffer, i); + if (c == null) { throw Exception('Failed to get chat'); } - final c = proto.Chat.fromBuffer(cbuf); if (c.remoteConversationRecordKey == remoteConversationKey) { // Found the right chat - if (await writer.tryRemoveItem(i) != null) { - return c; - } - return null; + await writer.removeItem(i); + return c; } } return null; })); // Since followers are synced, we can safetly remove the reconciled // chat record now - if (success && deletedItem != null) { + if (deletedItem != null) { try { await DHTRecordPool.instance.deleteRecord( deletedItem.reconciledChatRecord.toVeilid().recordKey); diff --git a/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart b/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart index 8e133b1..afe91c0 100644 --- a/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart +++ b/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart @@ -177,7 +177,7 @@ class ContactInvitationListCubit _activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey; // Remove ContactInvitationRecord from account's list - final (deletedItem, success) = await operateWrite((writer) async { + final deletedItem = await operateWrite((writer) async { for (var i = 0; i < writer.length; i++) { final item = await writer.getItemProtobuf( proto.ContactInvitationRecord.fromBuffer, i); @@ -186,16 +186,14 @@ class ContactInvitationListCubit } if (item.contactRequestInbox.recordKey.toVeilid() == contactRequestInboxRecordKey) { - if (await writer.tryRemoveItem(i) != null) { - return item; - } - return null; + await writer.removeItem(i); + return item; } } return null; }); - if (success && deletedItem != null) { + if (deletedItem != null) { // Delete the contact request inbox final contactRequestInbox = deletedItem.contactRequestInbox.toVeilid(); await (await pool.openRecordOwned(contactRequestInbox, diff --git a/lib/contacts/cubits/contact_list_cubit.dart b/lib/contacts/cubits/contact_list_cubit.dart index e30c8ed..a139b89 100644 --- a/lib/contacts/cubits/contact_list_cubit.dart +++ b/lib/contacts/cubits/contact_list_cubit.dart @@ -70,7 +70,7 @@ class ContactListCubit extends DHTShortArrayCubit { contact.remoteConversationRecordKey.toVeilid(); // Remove Contact from account's list - final (deletedItem, success) = await operateWrite((writer) async { + final deletedItem = await operateWrite((writer) async { for (var i = 0; i < writer.length; i++) { final item = await writer.getItemProtobuf(proto.Contact.fromBuffer, i); if (item == null) { @@ -78,16 +78,14 @@ class ContactListCubit extends DHTShortArrayCubit { } if (item.remoteConversationRecordKey == contact.remoteConversationRecordKey) { - if (await writer.tryRemoveItem(i) != null) { - return item; - } - return null; + await writer.removeItem(i); + return item; } } return null; }); - if (success && deletedItem != null) { + if (deletedItem != null) { try { // Make a conversation cubit to manipulate the conversation final conversationCubit = ConversationCubit( diff --git a/lib/contacts/cubits/conversation_cubit.dart b/lib/contacts/cubits/conversation_cubit.dart index 253dbba..b4d8ee9 100644 --- a/lib/contacts/cubits/conversation_cubit.dart +++ b/lib/contacts/cubits/conversation_cubit.dart @@ -295,7 +295,7 @@ class ConversationCubit extends Cubit> { debugName: 'ConversationCubit::initLocalMessages::LocalMessages', parent: localConversationKey, crypto: crypto, - smplWriter: writer)) + writer: writer)) .deleteScope((messages) async => await callback(messages)); } diff --git a/packages/veilid_support/build.yaml b/packages/veilid_support/build.yaml new file mode 100644 index 0000000..84fde8c --- /dev/null +++ b/packages/veilid_support/build.yaml @@ -0,0 +1,10 @@ +targets: + $default: + sources: + exclude: + - example/** + builders: + json_serializable: + options: + explicit_to_json: true + field_rename: snake diff --git a/packages/veilid_support/example/integration_test/test_dht_short_array.dart b/packages/veilid_support/example/integration_test/test_dht_short_array.dart index ad3e22a..c2fcc2b 100644 --- a/packages/veilid_support/example/integration_test/test_dht_short_array.dart +++ b/packages/veilid_support/example/integration_test/test_dht_short_array.dart @@ -63,7 +63,7 @@ Future Function() makeTestDHTShortArrayAdd({required int stride}) => print('adding\n'); { - final (res, ok) = await arr.operateWrite((w) async { + final res = await arr.operateWrite((w) async { for (var n = 0; n < dataset.length; n++) { print('$n '); final success = await w.tryAddItem(dataset[n]); @@ -71,26 +71,28 @@ Future Function() makeTestDHTShortArrayAdd({required int stride}) => } }); expect(res, isNull); - expect(ok, isTrue); } //print('get all\n'); { - final dataset2 = await arr.operate((r) async => r.getAllItems()); + final dataset2 = await arr.operate((r) async => r.getItemRange(0)); expect(dataset2, equals(dataset)); } + { + final dataset3 = + await arr.operate((r) async => r.getItemRange(64, length: 128)); + expect(dataset3, equals(dataset.sublist(64, 64 + 128))); + } //print('clear\n'); { - final (res, ok) = await arr.operateWrite((w) async => w.tryClear()); - expect(res, isTrue); - expect(ok, isTrue); + await arr.operateWrite((w) async => w.clear()); } //print('get all\n'); { - final dataset3 = await arr.operate((r) async => r.getAllItems()); - expect(dataset3, isEmpty); + final dataset4 = await arr.operate((r) async => r.getItemRange(0)); + expect(dataset4, isEmpty); } await arr.delete(); diff --git a/packages/veilid_support/lib/dht_support/dht_support.dart b/packages/veilid_support/lib/dht_support/dht_support.dart index 869a267..cc2a8be 100644 --- a/packages/veilid_support/lib/dht_support/dht_support.dart +++ b/packages/veilid_support/lib/dht_support/dht_support.dart @@ -2,5 +2,7 @@ library dht_support; +export 'src/dht_log/barrel.dart'; export 'src/dht_record/barrel.dart'; export 'src/dht_short_array/barrel.dart'; +export 'src/interfaces/interfaces.dart'; diff --git a/packages/veilid_support/lib/dht_support/proto/dht.proto b/packages/veilid_support/lib/dht_support/proto/dht.proto index 023c3cf..6796753 100644 --- a/packages/veilid_support/lib/dht_support/proto/dht.proto +++ b/packages/veilid_support/lib/dht_support/proto/dht.proto @@ -23,6 +23,18 @@ message DHTData { uint32 size = 4; } + +// DHTLog - represents a ring buffer of many elements with append/truncate semantics +// Header in subkey 0 of first key follows this structure +message DHTLog { + // Position of the start of the log (oldest items) + uint32 head = 1; + // Position of the end of the log (newest items) + uint32 tail = 2; + // Stride of each segment of the dhtlog + uint32 stride = 3; +} + // DHTShortArray - represents a re-orderable collection of up to 256 individual elements // Header in subkey 0 of first key follows this structure // @@ -50,20 +62,6 @@ message DHTShortArray { // calculated through iteration } -// DHTLog - represents a long ring buffer of elements utilizing a multi-level -// indirection table of DHTShortArrays. - -message DHTLog { - // Keys to concatenate - repeated veilid.TypedKey keys = 1; - // Back link to another DHTLog further back - veilid.TypedKey back = 2; - // Count of subkeys in all keys in this DHTLog - repeated uint32 subkey_counts = 3; - // Total count of subkeys in all keys in this DHTLog including all backlogs - uint32 total_subkeys = 4; -} - // DataReference // Pointer to data somewhere in Veilid // Abstraction over DHTData and BlockStore diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/barrel.dart b/packages/veilid_support/lib/dht_support/src/dht_log/barrel.dart new file mode 100644 index 0000000..18686f2 --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/dht_log/barrel.dart @@ -0,0 +1,2 @@ +export 'dht_array.dart'; +export 'dht_array_cubit.dart'; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart new file mode 100644 index 0000000..a132bdb --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart @@ -0,0 +1,273 @@ +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:async_tools/async_tools.dart'; +import 'package:collection/collection.dart'; +import 'package:equatable/equatable.dart'; + +import '../../../veilid_support.dart'; +import '../../proto/proto.dart' as proto; +import '../interfaces/dht_append_truncate.dart'; + +part 'dht_log_spine.dart'; +part 'dht_log_read.dart'; +part 'dht_log_append.dart'; + +/////////////////////////////////////////////////////////////////////// + +/// DHTLog is a ring-buffer queue like data structure with the following +/// operations: +/// * Add elements to the tail +/// * Remove elements from the head +/// The structure has a 'spine' record that acts as an indirection table of +/// DHTShortArray record pointers spread over its subkeys. +/// Subkey 0 of the DHTLog is a head subkey that contains housekeeping data: +/// * The head and tail position of the log +/// - subkeyIdx = pos / recordsPerSubkey +/// - recordIdx = pos % recordsPerSubkey +class DHTLog implements DHTOpenable { + //////////////////////////////////////////////////////////////// + // Constructors + + DHTLog._({required _DHTLogSpine spine}) : _spine = spine { + _spine.onUpdatedSpine = () { + _watchController?.sink.add(null); + }; + } + + /// Create a DHTLog + static Future create( + {required String debugName, + int stride = DHTShortArray.maxElements, + VeilidRoutingContext? routingContext, + TypedKey? parent, + DHTRecordCrypto? crypto, + KeyPair? writer}) async { + assert(stride <= DHTShortArray.maxElements, 'stride too long'); + final pool = DHTRecordPool.instance; + + late final DHTRecord spineRecord; + if (writer != null) { + final schema = DHTSchema.smpl( + oCnt: 0, + members: [DHTSchemaMember(mKey: writer.key, mCnt: spineSubkeys + 1)]); + spineRecord = await pool.createRecord( + debugName: debugName, + parent: parent, + routingContext: routingContext, + schema: schema, + crypto: crypto, + writer: writer); + } else { + const schema = DHTSchema.dflt(oCnt: spineSubkeys + 1); + spineRecord = await pool.createRecord( + debugName: debugName, + parent: parent, + routingContext: routingContext, + schema: schema, + crypto: crypto); + } + + try { + final spine = await _DHTLogSpine.create( + spineRecord: spineRecord, segmentStride: stride); + return DHTLog._(spine: spine); + } on Exception catch (_) { + await spineRecord.close(); + await spineRecord.delete(); + rethrow; + } + } + + static Future openRead(TypedKey logRecordKey, + {required String debugName, + VeilidRoutingContext? routingContext, + TypedKey? parent, + DHTRecordCrypto? crypto}) async { + final spineRecord = await DHTRecordPool.instance.openRecordRead( + logRecordKey, + debugName: debugName, + parent: parent, + routingContext: routingContext, + crypto: crypto); + try { + final spine = await _DHTLogSpine.load(spineRecord: spineRecord); + final dhtLog = DHTLog._(spine: spine); + return dhtLog; + } on Exception catch (_) { + await spineRecord.close(); + rethrow; + } + } + + static Future openWrite( + TypedKey logRecordKey, + KeyPair writer, { + required String debugName, + VeilidRoutingContext? routingContext, + TypedKey? parent, + DHTRecordCrypto? crypto, + }) async { + final spineRecord = await DHTRecordPool.instance.openRecordWrite( + logRecordKey, writer, + debugName: debugName, + parent: parent, + routingContext: routingContext, + crypto: crypto); + try { + final spine = await _DHTLogSpine.load(spineRecord: spineRecord); + final dhtLog = DHTLog._(spine: spine); + return dhtLog; + } on Exception catch (_) { + await spineRecord.close(); + rethrow; + } + } + + static Future openOwned( + OwnedDHTRecordPointer ownedLogRecordPointer, { + required String debugName, + required TypedKey parent, + VeilidRoutingContext? routingContext, + DHTRecordCrypto? crypto, + }) => + openWrite( + ownedLogRecordPointer.recordKey, + ownedLogRecordPointer.owner, + debugName: debugName, + routingContext: routingContext, + parent: parent, + crypto: crypto, + ); + + //////////////////////////////////////////////////////////////////////////// + // DHTOpenable + + /// Check if the DHTLog is open + @override + bool get isOpen => _spine.isOpen; + + /// Free all resources for the DHTLog + @override + Future close() async { + if (!isOpen) { + return; + } + await _watchController?.close(); + _watchController = null; + await _spine.close(); + } + + /// Free all resources for the DHTLog and delete it from the DHT + /// Will wait until the short array is closed to delete it + @override + Future delete() async { + await _spine.delete(); + } + + //////////////////////////////////////////////////////////////////////////// + // Public API + + /// Get the record key for this log + TypedKey get recordKey => _spine.recordKey; + + /// Get the record pointer foir this log + OwnedDHTRecordPointer get recordPointer => _spine.recordPointer; + + /// Runs a closure allowing read-only access to the log + Future operate(Future Function(DHTRandomRead) closure) async { + if (!isOpen) { + throw StateError('log is not open"'); + } + + return _spine.operate((spine) async { + final reader = _DHTLogRead._(spine); + return closure(reader); + }); + } + + /// Runs a closure allowing append/truncate access to the log + /// Makes only one attempt to consistently write the changes to the DHT + /// Returns result of the closure if the write could be performed + /// Throws DHTOperateException if the write could not be performed + /// at this time + Future operateAppend( + Future Function(DHTAppendTruncateRandomRead) closure) async { + if (!isOpen) { + throw StateError('log is not open"'); + } + + return _spine.operateAppend((spine) async { + final writer = _DHTLogAppend._(spine); + return closure(writer); + }); + } + + /// Runs a closure allowing append/truncate access to the log + /// Will execute the closure multiple times if a consistent write to the DHT + /// is not achieved. Timeout if specified will be thrown as a + /// TimeoutException. The closure should return true if its changes also + /// succeeded, returning false will trigger another eventual consistency + /// attempt. + Future operateAppendEventual( + Future Function(DHTAppendTruncateRandomRead) closure, + {Duration? timeout}) async { + if (!isOpen) { + throw StateError('log is not open"'); + } + + return _spine.operateAppendEventual((spine) async { + final writer = _DHTLogAppend._(spine); + return closure(writer); + }, timeout: timeout); + } + + /// Listen to and any all changes to the structure of this log + /// regardless of where the changes are coming from + Future> listen( + void Function() onChanged, + ) { + if (!isOpen) { + throw StateError('log is not open"'); + } + + return _listenMutex.protect(() async { + // If don't have a controller yet, set it up + if (_watchController == null) { + // Set up watch requirements + _watchController = StreamController.broadcast(onCancel: () { + // If there are no more listeners then we can get + // rid of the controller and drop our subscriptions + unawaited(_listenMutex.protect(() async { + // Cancel watches of head record + await _spine.cancelWatch(); + _watchController = null; + })); + }); + + // Start watching head subkey of the spine + await _spine.watch(); + } + // Return subscription + return _watchController!.stream.listen((_) => onChanged()); + }); + } + + //////////////////////////////////////////////////////////////// + // Fields + + // 56 subkeys * 512 segments * 36 bytes per typedkey = + // 1032192 bytes per record + // 512*36 = 18432 bytes per subkey + // 28672 shortarrays * 256 elements = 7340032 elements + static const spineSubkeys = 56; + static const segmentsPerSubkey = 512; + + // Internal representation refreshed from spine record + final _DHTLogSpine _spine; + + // Watch mutex to ensure we keep the representation valid + final Mutex _listenMutex = Mutex(); + // Stream of external changes + StreamController? _watchController; +} diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_append.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_append.dart new file mode 100644 index 0000000..6a172a7 --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_append.dart @@ -0,0 +1,42 @@ +part of 'dht_log.dart'; + +//////////////////////////////////////////////////////////////////////////// +// Append/truncate implementation + +class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead { + _DHTLogAppend._(super.spine) : super._(); + + @override + Future tryAppendItem(Uint8List value) async { + // Allocate empty index at the end of the list + final endPos = _spine.length; + _spine.allocateTail(1); + final lookup = await _spine.lookupPosition(endPos); + if (lookup == null) { + throw StateError("can't write to dht log"); + } + // Write item to the segment + return lookup.shortArray + .operateWrite((write) async => write.tryWriteItem(lookup.pos, value)); + } + + @override + Future truncate(int count) async { + final len = _spine.length; + if (count > len) { + count = len; + } + if (count == 0) { + return; + } + if (count < 0) { + throw StateError('can not remove negative items'); + } + _spine.releaseHead(count); + } + + @override + Future clear() async { + _spine.releaseHead(_spine.length); + } +} diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart new file mode 100644 index 0000000..a7d5333 --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart @@ -0,0 +1,119 @@ +import 'dart:async'; + +import 'package:async_tools/async_tools.dart'; +import 'package:bloc/bloc.dart'; +import 'package:bloc_advanced_tools/bloc_advanced_tools.dart'; +import 'package:equatable/equatable.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:meta/meta.dart'; + +import '../../../veilid_support.dart'; + +// xxx paginate and remember to paginate watches (could use short array cubit as a subcubit here?) + +// @immutable +// class DHTArrayElementState extends Equatable { +// const DHTArrayElementState( +// {required this.value, required this.isOffline}); +// final T value; +// final bool isOffline; + +// @override +// List get props => [value, isOffline]; +// } + +// typedef DHTArrayState = AsyncValue>>; +// typedef DHTArrayBusyState = BlocBusyState>; + +// class DHTArrayCubit extends Cubit> +// with BlocBusyWrapper> { +// DHTArrayCubit({ +// required Future Function() open, +// required T Function(List data) decodeElement, +// }) : _decodeElement = decodeElement, +// super(const BlocBusyState(AsyncValue.loading())) { +// _initWait.add(() async { +// // Open DHT record +// _array = await open(); +// _wantsCloseRecord = true; + +// // Make initial state update +// await _refreshNoWait(); +// _subscription = await _array.listen(_update); +// }); +// } + +// Future refresh({bool forceRefresh = false}) async { +// await _initWait(); +// await _refreshNoWait(forceRefresh: forceRefresh); +// } + +// Future _refreshNoWait({bool forceRefresh = false}) async => +// busy((emit) async => _refreshInner(emit, forceRefresh: forceRefresh)); + +// Future _refreshInner(void Function(DHTShortArrayState) emit, +// {bool forceRefresh = false}) async { +// try { +// final newState = await _shortArray.operate((reader) async { +// final offlinePositions = await reader.getOfflinePositions(); +// final allItems = (await reader.getAllItems(forceRefresh: forceRefresh)) +// ?.indexed +// .map((x) => DHTShortArrayElementState( +// value: _decodeElement(x.$2), +// isOffline: offlinePositions.contains(x.$1))) +// .toIList(); +// return allItems; +// }); +// if (newState != null) { +// emit(AsyncValue.data(newState)); +// } +// } on Exception catch (e) { +// emit(AsyncValue.error(e)); +// } +// } + +// void _update() { +// // Run at most one background update process +// // Because this is async, we could get an update while we're +// // still processing the last one. Only called after init future has run +// // so we dont have to wait for that here. +// _sspUpdate.busyUpdate>( +// busy, (emit) async => _refreshInner(emit)); +// } + +// @override +// Future close() async { +// await _initWait(); +// await _subscription?.cancel(); +// _subscription = null; +// if (_wantsCloseRecord) { +// await _shortArray.close(); +// } +// await super.close(); +// } + +// Future operate(Future Function(DHTShortArrayRead) closure) async { +// await _initWait(); +// return _shortArray.operate(closure); +// } + +// Future<(R?, bool)> operateWrite( +// Future Function(DHTShortArrayWrite) closure) async { +// await _initWait(); +// return _shortArray.operateWrite(closure); +// } + +// Future operateWriteEventual( +// Future Function(DHTShortArrayWrite) closure, +// {Duration? timeout}) async { +// await _initWait(); +// return _shortArray.operateWriteEventual(closure, timeout: timeout); +// } + +// final WaitSet _initWait = WaitSet(); +// late final DHTShortArray _shortArray; +// final T Function(List data) _decodeElement; +// StreamSubscription? _subscription; +// bool _wantsCloseRecord = false; +// final _sspUpdate = SingleStatelessProcessor(); +// } diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart new file mode 100644 index 0000000..0919412 --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart @@ -0,0 +1,103 @@ +part of 'dht_log.dart'; + +//////////////////////////////////////////////////////////////////////////// +// Reader-only implementation + +class _DHTLogRead implements DHTRandomRead { + _DHTLogRead._(_DHTLogSpine spine) : _spine = spine; + + @override + int get length => _spine.length; + + @override + Future getItem(int pos, {bool forceRefresh = false}) async { + if (pos < 0 || pos >= length) { + throw IndexError.withLength(pos, length); + } + final lookup = await _spine.lookupPosition(pos); + if (lookup == null) { + return null; + } + + return lookup.shortArray.operate( + (read) => read.getItem(lookup.pos, forceRefresh: forceRefresh)); + } + + (int, int) _clampStartLen(int start, int? len) { + len ??= _spine.length; + if (start < 0) { + throw IndexError.withLength(start, _spine.length); + } + if (start > _spine.length) { + throw IndexError.withLength(start, _spine.length); + } + if ((len + start) > _spine.length) { + len = _spine.length - start; + } + return (start, len); + } + + @override + Future?> getItemRange(int start, + {int? length, bool forceRefresh = false}) async { + final out = []; + (start, length) = _clampStartLen(start, length); + + final chunks = Iterable.generate(length).slices(maxDHTConcurrency).map( + (chunk) => chunk + .map((pos) => getItem(pos + start, forceRefresh: forceRefresh))); + + for (final chunk in chunks) { + final elems = await chunk.wait; + if (elems.contains(null)) { + return null; + } + out.addAll(elems.cast()); + } + + return out; + } + + @override + Future> getOfflinePositions() async { + final positionOffline = {}; + + // Iterate positions backward from most recent + for (var pos = _spine.length - 1; pos >= 0; pos--) { + final lookup = await _spine.lookupPosition(pos); + if (lookup == null) { + throw StateError('Unable to look up position'); + } + + // Check each segment for offline positions + var foundOffline = false; + await lookup.shortArray.operate((read) async { + final segmentOffline = await read.getOfflinePositions(); + + // For each shortarray segment go through their segment positions + // in reverse order and see if they are offline + for (var segmentPos = lookup.pos; + segmentPos >= 0 && pos >= 0; + segmentPos--, pos--) { + // If the position in the segment is offline, then + // mark the position in the log as offline + if (segmentOffline.contains(segmentPos)) { + positionOffline.add(pos); + foundOffline = true; + } + } + }); + + // If we found nothing offline in this segment then we can stop + if (!foundOffline) { + break; + } + } + + return positionOffline; + } + + //////////////////////////////////////////////////////////////////////////// + // Fields + final _DHTLogSpine _spine; +} diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart new file mode 100644 index 0000000..76a3f0c --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart @@ -0,0 +1,527 @@ +part of 'dht_log.dart'; + +class DHTLogPositionLookup { + const DHTLogPositionLookup({required this.shortArray, required this.pos}); + final DHTShortArray shortArray; + final int pos; +} + +class _DHTLogSegmentLookup extends Equatable { + const _DHTLogSegmentLookup({required this.subkey, required this.segment}); + final int subkey; + final int segment; + + @override + List get props => [subkey, segment]; +} + +class _DHTLogSpine { + _DHTLogSpine._( + {required DHTRecord spineRecord, + required int head, + required int tail, + required int stride}) + : _spineRecord = spineRecord, + _head = head, + _tail = tail, + _segmentStride = stride, + _spineCache = []; + + // Create a new spine record and push it to the network + static Future<_DHTLogSpine> create( + {required DHTRecord spineRecord, required int segmentStride}) async { + // Construct new spinehead + final spine = _DHTLogSpine._( + spineRecord: spineRecord, head: 0, tail: 0, stride: segmentStride); + + // Write new spine head record to the network + await spine.operate((spine) async { + final success = await spine.writeSpineHead(); + assert(success, 'false return should never happen on create'); + }); + + return spine; + } + + // Pull the latest or updated copy of the spine head record from the network + static Future<_DHTLogSpine> load({required DHTRecord spineRecord}) async { + // Get an updated spine head record copy if one exists + final spineHead = await spineRecord.getProtobuf(proto.DHTLog.fromBuffer, + subkey: 0, refreshMode: DHTRecordRefreshMode.refresh); + if (spineHead == null) { + throw StateError('spine head missing during refresh'); + } + return _DHTLogSpine._( + spineRecord: spineRecord, + head: spineHead.head, + tail: spineHead.tail, + stride: spineHead.stride); + } + + proto.DHTLog _toProto() { + assert(_spineMutex.isLocked, 'should be in mutex here'); + + final logHead = proto.DHTLog() + ..head = _head + ..tail = _tail + ..stride = _segmentStride; + return logHead; + } + + Future close() async { + await _spineMutex.protect(() async { + if (!isOpen) { + return; + } + final futures = >[_spineRecord.close()]; + for (final (_, sc) in _spineCache) { + futures.add(sc.close()); + } + await Future.wait(futures); + }); + } + + Future delete() async { + await _spineMutex.protect(() async { + final pool = DHTRecordPool.instance; + final futures = >[pool.deleteRecord(_spineRecord.key)]; + for (final (_, sc) in _spineCache) { + futures.add(sc.delete()); + } + await Future.wait(futures); + }); + } + + Future operate(Future Function(_DHTLogSpine) closure) async => + // ignore: prefer_expression_function_bodies + _spineMutex.protect(() async { + return closure(this); + }); + + Future operateAppend(Future Function(_DHTLogSpine) closure) async => + _spineMutex.protect(() async { + final oldHead = _head; + final oldTail = _tail; + try { + final out = await closure(this); + // Write head assuming it has been changed + if (!await writeSpineHead()) { + // Failed to write head means head got overwritten so write should + // be considered failed + throw DHTExceptionTryAgain(); + } + + onUpdatedSpine?.call(); + return out; + } on Exception { + // Exception means state needs to be reverted + _head = oldHead; + _tail = oldTail; + rethrow; + } + }); + + Future operateAppendEventual( + Future Function(_DHTLogSpine) closure, + {Duration? timeout}) async { + final timeoutTs = timeout == null + ? null + : Veilid.instance.now().offset(TimestampDuration.fromDuration(timeout)); + + await _spineMutex.protect(() async { + late int oldHead; + late int oldTail; + + try { + // Iterate until we have a successful element and head write + + do { + // Save off old values each pass of writeSpineHead because the head + // will have changed + oldHead = _head; + oldTail = _tail; + + // Try to do the element write + while (true) { + if (timeoutTs != null) { + final now = Veilid.instance.now(); + if (now >= timeoutTs) { + throw TimeoutException('timeout reached'); + } + } + if (await closure(this)) { + break; + } + // Failed to write in closure resets state + _head = oldHead; + _tail = oldTail; + } + + // Try to do the head write + } while (!await writeSpineHead()); + + onUpdatedSpine?.call(); + } on Exception { + // Exception means state needs to be reverted + _head = oldHead; + _tail = oldTail; + rethrow; + } + }); + } + + /// Serialize and write out the current spine head subkey, possibly updating + /// it if a newer copy is available online. Returns true if the write was + /// successful + Future writeSpineHead() async { + assert(_spineMutex.isLocked, 'should be in mutex here'); + + final headBuffer = _toProto().writeToBuffer(); + + final existingData = await _spineRecord.tryWriteBytes(headBuffer); + if (existingData != null) { + // Head write failed, incorporate update + await _updateHead(proto.DHTLog.fromBuffer(existingData)); + return false; + } + + return true; + } + + /// Validate a new spine head subkey that has come in from the network + Future _updateHead(proto.DHTLog spineHead) async { + assert(_spineMutex.isLocked, 'should be in mutex here'); + + _head = spineHead.head; + _tail = spineHead.tail; + } + + ///////////////////////////////////////////////////////////////////////////// + // Spine element management + + static final Uint8List _emptySegmentKey = + Uint8List.fromList(List.filled(TypedKey.decodedLength(), 0)); + static Uint8List _makeEmptySubkey() => Uint8List.fromList(List.filled( + DHTLog.segmentsPerSubkey * TypedKey.decodedLength(), 0)); + + static TypedKey? _getSegmentKey(Uint8List subkeyData, int segment) { + final decodedLength = TypedKey.decodedLength(); + final segmentKeyBytes = subkeyData.sublist( + decodedLength * segment, (decodedLength + 1) * segment); + if (segmentKeyBytes.equals(_emptySegmentKey)) { + return null; + } + return TypedKey.fromBytes(segmentKeyBytes); + } + + static void _setSegmentKey( + Uint8List subkeyData, int segment, TypedKey? segmentKey) { + final decodedLength = TypedKey.decodedLength(); + late final Uint8List segmentKeyBytes; + if (segmentKey == null) { + segmentKeyBytes = _emptySegmentKey; + } else { + segmentKeyBytes = segmentKey.decode(); + } + subkeyData.setRange(decodedLength * segment, (decodedLength + 1) * segment, + segmentKeyBytes); + } + + Future _getOrCreateSegmentInner(int segmentNumber) async { + assert(_spineMutex.isLocked, 'should be in mutex here'); + assert(_spineRecord.writer != null, 'should be writable'); + + // Lookup what subkey and segment subrange has this position's segment + // shortarray + final l = lookupSegment(segmentNumber); + final subkey = l.subkey; + final segment = l.segment; + + var subkeyData = await _spineRecord.get(subkey: subkey); + subkeyData ??= _makeEmptySubkey(); + while (true) { + final segmentKey = _getSegmentKey(subkeyData!, segment); + if (segmentKey == null) { + // Create a shortarray segment + final segmentRec = await DHTShortArray.create( + debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment', + stride: _segmentStride, + crypto: _spineRecord.crypto, + parent: _spineRecord.key, + routingContext: _spineRecord.routingContext, + writer: _spineRecord.writer, + ); + var success = false; + try { + // Write it back to the spine record + _setSegmentKey(subkeyData, segment, segmentRec.recordKey); + subkeyData = + await _spineRecord.tryWriteBytes(subkeyData, subkey: subkey); + // If the write was successful then we're done + if (subkeyData == null) { + // Return it + success = true; + return segmentRec; + } + } finally { + if (!success) { + await segmentRec.close(); + await segmentRec.delete(); + } + } + } else { + // Open a shortarray segment + final segmentRec = await DHTShortArray.openWrite( + segmentKey, + _spineRecord.writer!, + debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment', + crypto: _spineRecord.crypto, + parent: _spineRecord.key, + routingContext: _spineRecord.routingContext, + ); + return segmentRec; + } + // Loop if we need to try again with the new data from the network + } + } + + Future _getSegmentInner(int segmentNumber) async { + assert(_spineMutex.isLocked, 'should be in mutex here'); + + // Lookup what subkey and segment subrange has this position's segment + // shortarray + final l = lookupSegment(segmentNumber); + final subkey = l.subkey; + final segment = l.segment; + + final subkeyData = await _spineRecord.get(subkey: subkey); + if (subkeyData == null) { + return null; + } + final segmentKey = _getSegmentKey(subkeyData, segment); + if (segmentKey == null) { + return null; + } + + // Open a shortarray segment + final segmentRec = await DHTShortArray.openRead( + segmentKey, + debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment', + crypto: _spineRecord.crypto, + parent: _spineRecord.key, + routingContext: _spineRecord.routingContext, + ); + return segmentRec; + } + + Future getOrCreateSegment(int segmentNumber) async { + assert(_spineMutex.isLocked, 'should be in mutex here'); + + // See if we already have this in the cache + for (var i = 0; i < _spineCache.length; i++) { + if (_spineCache[i].$1 == segmentNumber) { + // Touch the element + final x = _spineCache.removeAt(i); + _spineCache.add(x); + // Return the shortarray for this position + return x.$2; + } + } + + // If we don't have it in the cache, get/create it and then cache it + final segment = await _getOrCreateSegmentInner(segmentNumber); + _spineCache.add((segmentNumber, segment)); + if (_spineCache.length > _spineCacheLength) { + // Trim the LRU cache + _spineCache.removeAt(0); + } + return segment; + } + + Future getSegment(int segmentNumber) async { + assert(_spineMutex.isLocked, 'should be in mutex here'); + + // See if we already have this in the cache + for (var i = 0; i < _spineCache.length; i++) { + if (_spineCache[i].$1 == segmentNumber) { + // Touch the element + final x = _spineCache.removeAt(i); + _spineCache.add(x); + // Return the shortarray for this position + return x.$2; + } + } + + // If we don't have it in the cache, get it and then cache it + final segment = await _getSegmentInner(segmentNumber); + if (segment == null) { + return null; + } + _spineCache.add((segmentNumber, segment)); + if (_spineCache.length > _spineCacheLength) { + // Trim the LRU cache + _spineCache.removeAt(0); + } + return segment; + } + + _DHTLogSegmentLookup lookupSegment(int segmentNumber) { + assert(_spineMutex.isLocked, 'should be in mutex here'); + + if (segmentNumber < 0) { + throw IndexError.withLength( + segmentNumber, DHTLog.spineSubkeys * DHTLog.segmentsPerSubkey); + } + final subkey = segmentNumber ~/ DHTLog.segmentsPerSubkey; + if (subkey >= DHTLog.spineSubkeys) { + throw IndexError.withLength( + segmentNumber, DHTLog.spineSubkeys * DHTLog.segmentsPerSubkey); + } + final segment = segmentNumber % DHTLog.segmentsPerSubkey; + return _DHTLogSegmentLookup(subkey: subkey + 1, segment: segment); + } + + /////////////////////////////////////////// + // API for public interfaces + + Future lookupPosition(int pos) async { + assert(_spineMutex.isLocked, 'should be locked'); + + // Check if our position is in bounds + final endPos = length; + if (pos < 0 || pos >= endPos) { + throw IndexError.withLength(pos, endPos); + } + + // Calculate absolute position, ring-buffer style + final absolutePosition = (_head + pos) % _positionLimit; + + // Determine the segment number and position within the segment + final segmentNumber = absolutePosition ~/ DHTShortArray.maxElements; + final segmentPos = absolutePosition % DHTShortArray.maxElements; + + // Get the segment shortArray + final shortArray = (_spineRecord.writer == null) + ? await getSegment(segmentNumber) + : await getOrCreateSegment(segmentNumber); + if (shortArray == null) { + return null; + } + return DHTLogPositionLookup(shortArray: shortArray, pos: segmentPos); + } + + void allocateTail(int count) { + assert(_spineMutex.isLocked, 'should be locked'); + + final currentLength = length; + if (count <= 0) { + throw StateError('count should be > 0'); + } + if (currentLength + count >= _positionLimit) { + throw StateError('ring buffer overflow'); + } + + _tail = (_tail + count) % _positionLimit; + } + + void releaseHead(int count) { + assert(_spineMutex.isLocked, 'should be locked'); + + final currentLength = length; + if (count <= 0) { + throw StateError('count should be > 0'); + } + if (count > currentLength) { + throw StateError('ring buffer underflow'); + } + + _head = (_head + count) % _positionLimit; + } + + ///////////////////////////////////////////////////////////////////////////// + // Watch For Updates + + // Watch head for changes + Future watch() async { + // This will update any existing watches if necessary + try { + await _spineRecord.watch(subkeys: [ValueSubkeyRange.single(0)]); + + // Update changes to the head record + // Don't watch for local changes because this class already handles + // notifying listeners and knows when it makes local changes + _subscription ??= + await _spineRecord.listen(localChanges: false, _onSpineChanged); + } on Exception { + // If anything fails, try to cancel the watches + await cancelWatch(); + rethrow; + } + } + + // Stop watching for changes to head and linked records + Future cancelWatch() async { + await _spineRecord.cancelWatch(); + await _subscription?.cancel(); + _subscription = null; + } + + // Called when the log changes online and we find out from a watch + // but not when we make a change locally + Future _onSpineChanged( + DHTRecord record, Uint8List? data, List subkeys) async { + // If head record subkey zero changes, then the layout + // of the dhtshortarray has changed + if (data == null) { + throw StateError('spine head changed without data'); + } + if (record.key != _spineRecord.key || + subkeys.length != 1 || + subkeys[0] != ValueSubkeyRange.single(0)) { + throw StateError('watch returning wrong subkey range'); + } + + // Decode updated head + final headData = proto.DHTLog.fromBuffer(data); + + // Then update the head record + await _spineMutex.protect(() async { + await _updateHead(headData); + onUpdatedSpine?.call(); + }); + } + + //////////////////////////////////////////////////////////////////////////// + + TypedKey get recordKey => _spineRecord.key; + OwnedDHTRecordPointer get recordPointer => _spineRecord.ownedDHTRecordPointer; + int get length => + (_tail < _head) ? (_positionLimit - _head) + _tail : _tail - _head; + bool get isOpen => _spineRecord.isOpen; + + static const _positionLimit = DHTLog.segmentsPerSubkey * + DHTLog.spineSubkeys * + DHTShortArray.maxElements; + + // Spine head mutex to ensure we keep the representation valid + final Mutex _spineMutex = Mutex(); + // Subscription to head record internal changes + StreamSubscription? _subscription; + // Notify closure for external spine head changes + void Function()? onUpdatedSpine; + + // Spine DHT record + final DHTRecord _spineRecord; + + // Position of the start of the log (oldest items) + int _head; + // Position of the end of the log (newest items) + int _tail; + + // LRU cache of DHT spine elements accessed recently + // Pair of position and associated shortarray segment + final List<(int, DHTShortArray)> _spineCache; + static const int _spineCacheLength = 3; + // Segment stride to use for spine elements + final int _segmentStride; +} diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/default_dht_record_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_record/default_dht_record_cubit.dart index 1cf97d5..0b4e0b6 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/default_dht_record_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/default_dht_record_cubit.dart @@ -38,7 +38,8 @@ class DefaultDHTRecordCubit extends DHTRecordCubit { final Uint8List data; final firstSubkey = subkeys.firstOrNull!.low; if (firstSubkey != defaultSubkey || updatedata == null) { - final maybeData = await record.get(forceRefresh: true); + final maybeData = + await record.get(refreshMode: DHTRecordRefreshMode.refresh); if (maybeData == null) { return null; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart index ccb09f8..3d625f8 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart @@ -13,9 +13,22 @@ class DHTRecordWatchChange extends Equatable { List get props => [local, data, subkeys]; } +/// Refresh mode for DHT record 'get' +enum DHTRecordRefreshMode { + /// Return existing subkey values if they exist locally already + existing, + + /// Always check the network for a newer subkey value + refresh, + + /// Always check the network for a newer subkey value but only + /// return that value if its sequence number is newer than the local value + refreshOnlyUpdates, +} + ///////////////////////////////////////////////// -class DHTRecord { +class DHTRecord implements DHTOpenable { DHTRecord._( {required VeilidRoutingContext routingContext, required SharedDHTRecordData sharedDHTRecordData, @@ -30,20 +43,33 @@ class DHTRecord { _open = true, _sharedDHTRecordData = sharedDHTRecordData; - final SharedDHTRecordData _sharedDHTRecordData; - final VeilidRoutingContext _routingContext; - final int _defaultSubkey; - final KeyPair? _writer; - final DHTRecordCrypto _crypto; - final String debugName; + //////////////////////////////////////////////////////////////////////////// + // DHTOpenable - bool _open; - @internal - StreamController? watchController; - @internal - WatchState? watchState; + /// Check if the DHTRecord is open + @override + bool get isOpen => _open; - int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey; + /// Free all resources for the DHTRecord + @override + Future close() async { + if (!_open) { + return; + } + await watchController?.close(); + await DHTRecordPool.instance._recordClosed(this); + _open = false; + } + + /// Free all resources for the DHTRecord and delete it from the DHT + /// Will wait until the record is closed to delete it + @override + Future delete() async { + await DHTRecordPool.instance.deleteRecord(key); + } + + //////////////////////////////////////////////////////////////////////////// + // Public API VeilidRoutingContext get routingContext => _routingContext; TypedKey get key => _sharedDHTRecordData.recordDescriptor.key; @@ -57,64 +83,30 @@ class DHTRecord { DHTRecordCrypto get crypto => _crypto; OwnedDHTRecordPointer get ownedDHTRecordPointer => OwnedDHTRecordPointer(recordKey: key, owner: ownerKeyPair!); - bool get isOpen => _open; - - Future close() async { - if (!_open) { - return; - } - await watchController?.close(); - await DHTRecordPool.instance._recordClosed(this); - _open = false; - } - - Future scope(Future Function(DHTRecord) scopeFunction) async { - try { - return await scopeFunction(this); - } finally { - await close(); - } - } - - Future deleteScope(Future Function(DHTRecord) scopeFunction) async { - try { - final out = await scopeFunction(this); - if (_open) { - await close(); - } - return out; - } on Exception catch (_) { - if (_open) { - await close(); - } - await DHTRecordPool.instance.deleteRecord(key); - rethrow; - } - } - - Future maybeDeleteScope( - bool delete, Future Function(DHTRecord) scopeFunction) async { - if (delete) { - return deleteScope(scopeFunction); - } else { - return scope(scopeFunction); - } - } + int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey; + /// Get a subkey value from this record. + /// Returns the most recent value data for this subkey or null if this subkey + /// has not yet been written to. + /// * 'refreshMode' determines whether or not to return a locally existing + /// value or always check the network + /// * 'outSeqNum' optionally returns the sequence number of the value being + /// returned if one was returned. Future get( {int subkey = -1, DHTRecordCrypto? crypto, - bool forceRefresh = false, - bool onlyUpdates = false, + DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing, Output? outSeqNum}) async { subkey = subkeyOrDefault(subkey); final valueData = await _routingContext.getDHTValue(key, subkey, - forceRefresh: forceRefresh); + forceRefresh: refreshMode != DHTRecordRefreshMode.existing); if (valueData == null) { return null; } final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey]; - if (onlyUpdates && lastSeq != null && valueData.seq <= lastSeq) { + if (refreshMode == DHTRecordRefreshMode.refreshOnlyUpdates && + lastSeq != null && + valueData.seq <= lastSeq) { return null; } final out = (crypto ?? _crypto).decrypt(valueData.data, subkey); @@ -125,17 +117,23 @@ class DHTRecord { return out; } + /// Get a subkey value from this record. + /// Process the record returned with a JSON unmarshal function 'fromJson'. + /// Returns the most recent value data for this subkey or null if this subkey + /// has not yet been written to. + /// * 'refreshMode' determines whether or not to return a locally existing + /// value or always check the network + /// * 'outSeqNum' optionally returns the sequence number of the value being + /// returned if one was returned. Future getJson(T Function(dynamic) fromJson, {int subkey = -1, DHTRecordCrypto? crypto, - bool forceRefresh = false, - bool onlyUpdates = false, + DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing, Output? outSeqNum}) async { final data = await get( subkey: subkey, crypto: crypto, - forceRefresh: forceRefresh, - onlyUpdates: onlyUpdates, + refreshMode: refreshMode, outSeqNum: outSeqNum); if (data == null) { return null; @@ -143,18 +141,25 @@ class DHTRecord { return jsonDecodeBytes(fromJson, data); } + /// Get a subkey value from this record. + /// Process the record returned with a protobuf unmarshal + /// function 'fromBuffer'. + /// Returns the most recent value data for this subkey or null if this subkey + /// has not yet been written to. + /// * 'refreshMode' determines whether or not to return a locally existing + /// value or always check the network + /// * 'outSeqNum' optionally returns the sequence number of the value being + /// returned if one was returned. Future getProtobuf( T Function(List i) fromBuffer, {int subkey = -1, DHTRecordCrypto? crypto, - bool forceRefresh = false, - bool onlyUpdates = false, + DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing, Output? outSeqNum}) async { final data = await get( subkey: subkey, crypto: crypto, - forceRefresh: forceRefresh, - onlyUpdates: onlyUpdates, + refreshMode: refreshMode, outSeqNum: outSeqNum); if (data == null) { return null; @@ -162,6 +167,9 @@ class DHTRecord { return fromBuffer(data.toList()); } + /// Attempt to write a byte buffer to a DHTRecord subkey + /// If a newer value was found on the network, it is returned + /// If the value was succesfully written, null is returned Future tryWriteBytes(Uint8List newValue, {int subkey = -1, DHTRecordCrypto? crypto, @@ -211,6 +219,9 @@ class DHTRecord { return decryptedNewValue; } + /// Attempt to write a byte buffer to a DHTRecord subkey + /// If a newer value was found on the network, another attempt + /// will be made to write the subkey until this succeeds Future eventualWriteBytes(Uint8List newValue, {int subkey = -1, DHTRecordCrypto? crypto, @@ -256,6 +267,11 @@ class DHTRecord { } } + /// Attempt to write a byte buffer to a DHTRecord subkey + /// If a newer value was found on the network, another attempt + /// will be made to write the subkey until this succeeds + /// Each attempt to write the value calls an update function with the + /// old value to determine what new value should be attempted for that write. Future eventualUpdateBytes( Future Function(Uint8List? oldValue) update, {int subkey = -1, @@ -281,6 +297,7 @@ class DHTRecord { } while (oldValue != null); } + /// Like 'tryWriteBytes' but with JSON marshal/unmarshal of the value Future tryWriteJson(T Function(dynamic) fromJson, T newValue, {int subkey = -1, DHTRecordCrypto? crypto, @@ -298,6 +315,7 @@ class DHTRecord { return jsonDecodeBytes(fromJson, out); }); + /// Like 'tryWriteBytes' but with protobuf marshal/unmarshal of the value Future tryWriteProtobuf( T Function(List) fromBuffer, T newValue, {int subkey = -1, @@ -316,6 +334,7 @@ class DHTRecord { return fromBuffer(out); }); + /// Like 'eventualWriteBytes' but with JSON marshal/unmarshal of the value Future eventualWriteJson(T newValue, {int subkey = -1, DHTRecordCrypto? crypto, @@ -324,6 +343,7 @@ class DHTRecord { eventualWriteBytes(jsonEncodeBytes(newValue), subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); + /// Like 'eventualWriteBytes' but with protobuf marshal/unmarshal of the value Future eventualWriteProtobuf(T newValue, {int subkey = -1, DHTRecordCrypto? crypto, @@ -332,6 +352,7 @@ class DHTRecord { eventualWriteBytes(newValue.writeToBuffer(), subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); + /// Like 'eventualUpdateBytes' but with JSON marshal/unmarshal of the value Future eventualUpdateJson( T Function(dynamic) fromJson, Future Function(T?) update, {int subkey = -1, @@ -341,6 +362,7 @@ class DHTRecord { eventualUpdateBytes(jsonUpdate(fromJson, update), subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); + /// Like 'eventualUpdateBytes' but with protobuf marshal/unmarshal of the value Future eventualUpdateProtobuf( T Function(List) fromBuffer, Future Function(T?) update, {int subkey = -1, @@ -350,6 +372,8 @@ class DHTRecord { eventualUpdateBytes(protobufUpdate(fromBuffer, update), subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); + /// Watch a subkey range of this DHT record for changes + /// Takes effect on the next DHTRecordPool tick Future watch( {List? subkeys, Timestamp? expiration, @@ -363,6 +387,13 @@ class DHTRecord { } } + /// Register a callback for changes made on this this DHT record. + /// You must 'watch' the record as well as listen to it in order for this + /// call back to be called. + /// * 'localChanges' also enables calling the callback if changed are made + /// locally, otherwise only changes seen from the network itself are + /// reported + /// Future> listen( Future Function( DHTRecord record, Uint8List? data, List subkeys) @@ -405,6 +436,8 @@ class DHTRecord { }); } + /// Stop watching this record for changes + /// Takes effect on the next DHTRecordPool tick Future cancelWatch() async { // Tear down watch requirements if (watchState != null) { @@ -413,11 +446,15 @@ class DHTRecord { } } + /// Return the inspection state of a set of subkeys of the DHTRecord + /// See Veilid's 'inspectDHTRecord' call for details on how this works Future inspect( {List? subkeys, DHTReportScope scope = DHTReportScope.local}) => _routingContext.inspectDHTRecord(key, subkeys: subkeys, scope: scope); + ////////////////////////////////////////////////////////////////////////// + void _addValueChange( {required bool local, required Uint8List? data, @@ -458,4 +495,19 @@ class DHTRecord { _addValueChange( local: false, data: update.value?.data, subkeys: update.subkeys); } + + ////////////////////////////////////////////////////////////// + + final SharedDHTRecordData _sharedDHTRecordData; + final VeilidRoutingContext _routingContext; + final int _defaultSubkey; + final KeyPair? _writer; + final DHTRecordCrypto _crypto; + final String debugName; + + bool _open; + @internal + StreamController? watchController; + @internal + WatchState? watchState; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart index 15919f9..8616658 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart @@ -93,7 +93,7 @@ class DHTRecordCubit extends Cubit> { for (final skr in subkeys) { for (var sk = skr.low; sk <= skr.high; sk++) { final data = await _record.get( - subkey: sk, forceRefresh: true, onlyUpdates: true); + subkey: sk, refreshMode: DHTRecordRefreshMode.refreshOnlyUpdates); if (data != null) { final newState = await _stateFunction(_record, updateSubkeys, data); if (newState != null) { diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart index f15987a..a305e22 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart @@ -3,7 +3,6 @@ import 'dart:typed_data'; import 'package:async_tools/async_tools.dart'; import 'package:collection/collection.dart'; -import 'package:protobuf/protobuf.dart'; import '../../../veilid_support.dart'; import '../../proto/proto.dart' as proto; @@ -14,7 +13,7 @@ part 'dht_short_array_write.dart'; /////////////////////////////////////////////////////////////////////// -class DHTShortArray { +class DHTShortArray implements DHTOpenable { //////////////////////////////////////////////////////////////// // Constructors @@ -34,22 +33,22 @@ class DHTShortArray { VeilidRoutingContext? routingContext, TypedKey? parent, DHTRecordCrypto? crypto, - KeyPair? smplWriter}) async { + KeyPair? writer}) async { assert(stride <= maxElements, 'stride too long'); final pool = DHTRecordPool.instance; late final DHTRecord dhtRecord; - if (smplWriter != null) { + if (writer != null) { final schema = DHTSchema.smpl( oCnt: 0, - members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: stride + 1)]); + members: [DHTSchemaMember(mKey: writer.key, mCnt: stride + 1)]); dhtRecord = await pool.createRecord( debugName: debugName, parent: parent, routingContext: routingContext, schema: schema, crypto: crypto, - writer: smplWriter); + writer: writer); } else { final schema = DHTSchema.dflt(oCnt: stride + 1); dhtRecord = await pool.createRecord( @@ -120,15 +119,15 @@ class DHTShortArray { } static Future openOwned( - OwnedDHTRecordPointer ownedDHTRecordPointer, { + OwnedDHTRecordPointer ownedShortArrayRecordPointer, { required String debugName, required TypedKey parent, VeilidRoutingContext? routingContext, DHTRecordCrypto? crypto, }) => openWrite( - ownedDHTRecordPointer.recordKey, - ownedDHTRecordPointer.owner, + ownedShortArrayRecordPointer.recordKey, + ownedShortArrayRecordPointer.owner, debugName: debugName, routingContext: routingContext, parent: parent, @@ -136,18 +135,14 @@ class DHTShortArray { ); //////////////////////////////////////////////////////////////////////////// - // Public API - - /// Get the record key for this shortarray - TypedKey get recordKey => _head.recordKey; - - /// Get the record pointer foir this shortarray - OwnedDHTRecordPointer get recordPointer => _head.recordPointer; + // DHTOpenable /// Check if the shortarray is open + @override bool get isOpen => _head.isOpen; /// Free all resources for the DHTShortArray + @override Future close() async { if (!isOpen) { return; @@ -159,44 +154,22 @@ class DHTShortArray { /// Free all resources for the DHTShortArray and delete it from the DHT /// Will wait until the short array is closed to delete it + @override Future delete() async { await _head.delete(); } - /// Runs a closure that guarantees the DHTShortArray - /// will be closed upon exit, even if an uncaught exception is thrown - Future scope(Future Function(DHTShortArray) scopeFunction) async { - if (!isOpen) { - throw StateError('short array is not open"'); - } - try { - return await scopeFunction(this); - } finally { - await close(); - } - } + //////////////////////////////////////////////////////////////////////////// + // Public API - /// Runs a closure that guarantees the DHTShortArray - /// will be closed upon exit, and deleted if an an - /// uncaught exception is thrown - Future deleteScope( - Future Function(DHTShortArray) scopeFunction) async { - if (!isOpen) { - throw StateError('short array is not open"'); - } + /// Get the record key for this shortarray + TypedKey get recordKey => _head.recordKey; - try { - final out = await scopeFunction(this); - await close(); - return out; - } on Exception catch (_) { - await delete(); - rethrow; - } - } + /// Get the record pointer foir this shortarray + OwnedDHTRecordPointer get recordPointer => _head.recordPointer; /// Runs a closure allowing read-only access to the shortarray - Future operate(Future Function(DHTShortArrayRead) closure) async { + Future operate(Future Function(DHTRandomRead) closure) async { if (!isOpen) { throw StateError('short array is not open"'); } @@ -209,14 +182,19 @@ class DHTShortArray { /// Runs a closure allowing read-write access to the shortarray /// Makes only one attempt to consistently write the changes to the DHT - /// Returns (result, true) of the closure if the write could be performed - /// Returns (null, false) if the write could not be performed at this time - Future<(T?, bool)> operateWrite( - Future Function(DHTShortArrayWrite) closure) async => - _head.operateWrite((head) async { - final writer = _DHTShortArrayWrite._(head); - return closure(writer); - }); + /// Returns result of the closure if the write could be performed + /// Throws DHTOperateException if the write could not be performed at this time + Future operateWrite( + Future Function(DHTRandomReadWrite) closure) async { + if (!isOpen) { + throw StateError('short array is not open"'); + } + + return _head.operateWrite((head) async { + final writer = _DHTShortArrayWrite._(head); + return closure(writer); + }); + } /// Runs a closure allowing read-write access to the shortarray /// Will execute the closure multiple times if a consistent write to the DHT @@ -225,7 +203,7 @@ class DHTShortArray { /// succeeded, returning false will trigger another eventual consistency /// attempt. Future operateWriteEventual( - Future Function(DHTShortArrayWrite) closure, + Future Function(DHTRandomReadWrite) closure, {Duration? timeout}) async { if (!isOpen) { throw StateError('short array is not open"'); diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart index 7465715..e0b2504 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart @@ -41,19 +41,6 @@ class DHTShortArrayCubit extends Cubit> }); } - // DHTShortArrayCubit.value({ - // required DHTShortArray shortArray, - // required T Function(List data) decodeElement, - // }) : _shortArray = shortArray, - // _decodeElement = decodeElement, - // super(const BlocBusyState(AsyncValue.loading())) { - // _initFuture = Future(() async { - // // Make initial state update - // unawaited(_refreshNoWait()); - // _subscription = await shortArray.listen(_update); - // }); - // } - Future refresh({bool forceRefresh = false}) async { await _initWait(); await _refreshNoWait(forceRefresh: forceRefresh); @@ -67,12 +54,13 @@ class DHTShortArrayCubit extends Cubit> try { final newState = await _shortArray.operate((reader) async { final offlinePositions = await reader.getOfflinePositions(); - final allItems = (await reader.getAllItems(forceRefresh: forceRefresh)) - ?.indexed - .map((x) => DHTShortArrayElementState( - value: _decodeElement(x.$2), - isOffline: offlinePositions.contains(x.$1))) - .toIList(); + final allItems = + (await reader.getItemRange(0, forceRefresh: forceRefresh)) + ?.indexed + .map((x) => DHTShortArrayElementState( + value: _decodeElement(x.$2), + isOffline: offlinePositions.contains(x.$1))) + .toIList(); return allItems; }); if (newState != null) { @@ -103,19 +91,19 @@ class DHTShortArrayCubit extends Cubit> await super.close(); } - Future operate(Future Function(DHTShortArrayRead) closure) async { + Future operate(Future Function(DHTRandomRead) closure) async { await _initWait(); return _shortArray.operate(closure); } - Future<(R?, bool)> operateWrite( - Future Function(DHTShortArrayWrite) closure) async { + Future operateWrite( + Future Function(DHTRandomReadWrite) closure) async { await _initWait(); return _shortArray.operateWrite(closure); } Future operateWriteEventual( - Future Function(DHTShortArrayWrite) closure, + Future Function(DHTRandomReadWrite) closure, {Duration? timeout}) async { await _initWait(); return _shortArray.operateWriteEventual(closure, timeout: timeout); diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart index 6da7791..0a2b7d2 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart @@ -82,8 +82,8 @@ class _DHTShortArrayHead { return closure(this); }); - Future<(T?, bool)> operateWrite( - Future Function(_DHTShortArrayHead) closure) async => + Future operateWrite( + Future Function(_DHTShortArrayHead) closure) async => _headMutex.protect(() async { final oldLinkedRecords = List.of(_linkedRecords); final oldIndex = List.of(_index); @@ -95,11 +95,11 @@ class _DHTShortArrayHead { if (!await _writeHead()) { // Failed to write head means head got overwritten so write should // be considered failed - return (null, false); + throw DHTExceptionTryAgain(); } onUpdatedHead?.call(); - return (out, true); + return out; } on Exception { // Exception means state needs to be reverted _linkedRecords = oldLinkedRecords; @@ -249,22 +249,15 @@ class _DHTShortArrayHead { } // Pull the latest or updated copy of the head record from the network - Future _loadHead( - {bool forceRefresh = true, bool onlyUpdates = false}) async { + Future _loadHead() async { // Get an updated head record copy if one exists final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer, - subkey: 0, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates); + subkey: 0, refreshMode: DHTRecordRefreshMode.refresh); if (head == null) { - if (onlyUpdates) { - // No update - return false; - } - throw StateError('head missing during refresh'); + throw StateError('shortarray head missing during refresh'); } await _updateHead(head); - - return true; } ///////////////////////////////////////////////////////////////////////////// diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart index 342e67a..88cefde 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart @@ -1,70 +1,14 @@ part of 'dht_short_array.dart'; -//////////////////////////////////////////////////////////////////////////// -// Reader interface -abstract class DHTShortArrayRead { - /// Returns the number of elements in the DHTShortArray - int get length; - - /// Return the item at position 'pos' in the DHTShortArray. If 'forceRefresh' - /// is specified, the network will always be checked for newer values - /// rather than returning the existing locally stored copy of the elements. - Future getItem(int pos, {bool forceRefresh = false}); - - /// Return a list of all of the items in the DHTShortArray. If 'forceRefresh' - /// is specified, the network will always be checked for newer values - /// rather than returning the existing locally stored copy of the elements. - Future?> getAllItems({bool forceRefresh = false}); - - /// Get a list of the positions that were written offline and not flushed yet - Future> getOfflinePositions(); -} - -extension DHTShortArrayReadExt on DHTShortArrayRead { - /// Convenience function: - /// Like getItem but also parses the returned element as JSON - Future getItemJson(T Function(dynamic) fromJson, int pos, - {bool forceRefresh = false}) => - getItem(pos, forceRefresh: forceRefresh) - .then((out) => jsonDecodeOptBytes(fromJson, out)); - - /// Convenience function: - /// Like getAllItems but also parses the returned elements as JSON - Future?> getAllItemsJson(T Function(dynamic) fromJson, - {bool forceRefresh = false}) => - getAllItems(forceRefresh: forceRefresh) - .then((out) => out?.map(fromJson).toList()); - - /// Convenience function: - /// Like getItem but also parses the returned element as a protobuf object - Future getItemProtobuf( - T Function(List) fromBuffer, int pos, - {bool forceRefresh = false}) => - getItem(pos, forceRefresh: forceRefresh) - .then((out) => (out == null) ? null : fromBuffer(out)); - - /// Convenience function: - /// Like getAllItems but also parses the returned elements as protobuf objects - Future?> getAllItemsProtobuf( - T Function(List) fromBuffer, - {bool forceRefresh = false}) => - getAllItems(forceRefresh: forceRefresh) - .then((out) => out?.map(fromBuffer).toList()); -} - //////////////////////////////////////////////////////////////////////////// // Reader-only implementation -class _DHTShortArrayRead implements DHTShortArrayRead { +class _DHTShortArrayRead implements DHTRandomRead { _DHTShortArrayRead._(_DHTShortArrayHead head) : _head = head; - /// Returns the number of elements in the DHTShortArray @override int get length => _head.length; - /// Return the item at position 'pos' in the DHTShortArray. If 'forceRefresh' - /// is specified, the network will always be checked for newer values - /// rather than returning the existing locally stored copy of the elements. @override Future getItem(int pos, {bool forceRefresh = false}) async { if (pos < 0 || pos >= length) { @@ -77,7 +21,9 @@ class _DHTShortArrayRead implements DHTShortArrayRead { final outSeqNum = Output(); final out = lookup.record.get( subkey: lookup.recordSubkey, - forceRefresh: refresh, + refreshMode: refresh + ? DHTRecordRefreshMode.refresh + : DHTRecordRefreshMode.existing, outSeqNum: outSeqNum); if (outSeqNum.value != null) { _head.updatePositionSeq(pos, false, outSeqNum.value!); @@ -86,17 +32,29 @@ class _DHTShortArrayRead implements DHTShortArrayRead { return out; } - /// Return a list of all of the items in the DHTShortArray. If 'forceRefresh' - /// is specified, the network will always be checked for newer values - /// rather than returning the existing locally stored copy of the elements. - @override - Future?> getAllItems({bool forceRefresh = false}) async { - final out = []; + (int, int) _clampStartLen(int start, int? len) { + len ??= _head.length; + if (start < 0) { + throw IndexError.withLength(start, _head.length); + } + if (start > _head.length) { + throw IndexError.withLength(start, _head.length); + } + if ((len + start) > _head.length) { + len = _head.length - start; + } + return (start, len); + } - final chunks = Iterable.generate(_head.length) - .slices(maxDHTConcurrency) - .map((chunk) => - chunk.map((pos) => getItem(pos, forceRefresh: forceRefresh))); + @override + Future?> getItemRange(int start, + {int? length, bool forceRefresh = false}) async { + final out = []; + (start, length) = _clampStartLen(start, length); + + final chunks = Iterable.generate(length).slices(maxDHTConcurrency).map( + (chunk) => chunk + .map((pos) => getItem(pos + start, forceRefresh: forceRefresh))); for (final chunk in chunks) { final elems = await chunk.wait; @@ -109,9 +67,10 @@ class _DHTShortArrayRead implements DHTShortArrayRead { return out; } - /// Get a list of the positions that were written offline and not flushed yet @override Future> getOfflinePositions() async { + final (start, length) = _clampStartLen(0, DHTShortArray.maxElements); + final indexOffline = {}; final inspects = await [ _head._headRecord.inspect(), @@ -134,7 +93,7 @@ class _DHTShortArrayRead implements DHTShortArrayRead { // See which positions map to offline indexes final positionOffline = {}; - for (var i = 0; i < _head._index.length; i++) { + for (var i = start; i < (start + length); i++) { final idx = _head._index[i]; if (indexOffline.contains(idx)) { positionOffline.add(i); diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart index d1c8b2f..0d51663 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart @@ -1,101 +1,10 @@ part of 'dht_short_array.dart'; -//////////////////////////////////////////////////////////////////////////// -// Writer interface -abstract class DHTShortArrayWrite implements DHTShortArrayRead { - /// Try to add an item to the end of the DHTShortArray. Return true if the - /// element was successfully added, and false if the state changed before - /// the element could be added or a newer value was found on the network. - /// This may throw an exception if the number elements added exceeds the - /// built-in limit of 'maxElements = 256' entries. - Future tryAddItem(Uint8List value); - - /// Try to insert an item as position 'pos' of the DHTShortArray. - /// Return true if the element was successfully inserted, and false if the - /// state changed before the element could be inserted or a newer value was - /// found on the network. - /// This may throw an exception if the number elements added exceeds the - /// built-in limit of 'maxElements = 256' entries. - Future tryInsertItem(int pos, Uint8List value); - - /// Try to swap items at position 'aPos' and 'bPos' in the DHTShortArray. - /// Return true if the elements were successfully swapped, and false if the - /// state changed before the elements could be swapped or newer values were - /// found on the network. - /// This may throw an exception if either of the positions swapped exceed - /// the length of the list - Future trySwapItem(int aPos, int bPos); - - /// Try to remove an item at position 'pos' in the DHTShortArray. - /// Return the element if it was successfully removed, and null if the - /// state changed before the elements could be removed or newer values were - /// found on the network. - /// This may throw an exception if the position removed exceeeds the length of - /// the list. - Future tryRemoveItem(int pos); - - /// Try to remove all items in the DHTShortArray. - /// Return true if it was successfully cleared, and false if the - /// state changed before the elements could be cleared or newer values were - /// found on the network. - Future tryClear(); - - /// Try to set an item at position 'pos' of the DHTShortArray. - /// If the set was successful this returns: - /// * The prior contents of the element, or null if there was no value yet - /// * A boolean true - /// If the set was found a newer value on the network: - /// * The newer value of the element, or null if the head record - /// changed. - /// * A boolean false - /// This may throw an exception if the position exceeds the built-in limit of - /// 'maxElements = 256' entries. - Future<(Uint8List?, bool)> tryWriteItem(int pos, Uint8List newValue); -} - -extension DHTShortArrayWriteExt on DHTShortArrayWrite { - /// Convenience function: - /// Like removeItem but also parses the returned element as JSON - Future tryRemoveItemJson( - T Function(dynamic) fromJson, - int pos, - ) => - tryRemoveItem(pos).then((out) => jsonDecodeOptBytes(fromJson, out)); - - /// Convenience function: - /// Like removeItem but also parses the returned element as JSON - Future tryRemoveItemProtobuf( - T Function(List) fromBuffer, int pos) => - getItem(pos).then((out) => (out == null) ? null : fromBuffer(out)); - - /// Convenience function: - /// Like tryWriteItem but also encodes the input value as JSON and parses the - /// returned element as JSON - Future<(T?, bool)> tryWriteItemJson( - T Function(dynamic) fromJson, - int pos, - T newValue, - ) => - tryWriteItem(pos, jsonEncodeBytes(newValue)) - .then((out) => (jsonDecodeOptBytes(fromJson, out.$1), out.$2)); - - /// Convenience function: - /// Like tryWriteItem but also encodes the input value as a protobuf object - /// and parses the returned element as a protobuf object - Future<(T?, bool)> tryWriteItemProtobuf( - T Function(List) fromBuffer, - int pos, - T newValue, - ) => - tryWriteItem(pos, newValue.writeToBuffer()).then( - (out) => ((out.$1 == null ? null : fromBuffer(out.$1!)), out.$2)); -} - //////////////////////////////////////////////////////////////////////////// // Writer implementation class _DHTShortArrayWrite extends _DHTShortArrayRead - implements DHTShortArrayWrite { + implements DHTRandomReadWrite { _DHTShortArrayWrite._(super.head) : super._(); @override @@ -105,12 +14,12 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead _head.allocateIndex(pos); // Write item - final (_, wasSet) = await tryWriteItem(pos, value); - if (!wasSet) { - return false; + final ok = await tryWriteItem(pos, value); + if (!ok) { + _head.freeIndex(pos); } - return true; + return ok; } @override @@ -119,16 +28,15 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead _head.allocateIndex(pos); // Write item - final (_, wasSet) = await tryWriteItem(pos, value); - if (!wasSet) { - return false; + final ok = await tryWriteItem(pos, value); + if (!ok) { + _head.freeIndex(pos); } - return true; } @override - Future trySwapItem(int aPos, int bPos) async { + Future swapItem(int aPos, int bPos) async { if (aPos < 0 || aPos >= _head.length) { throw IndexError.withLength(aPos, _head.length); } @@ -137,12 +45,10 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead } // Swap indices _head.swapIndex(aPos, bPos); - - return true; } @override - Future tryRemoveItem(int pos) async { + Future removeItem(int pos, {Output? output}) async { if (pos < 0 || pos >= _head.length) { throw IndexError.withLength(pos, _head.length); } @@ -162,17 +68,17 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead throw StateError('Element does not exist'); } _head.freeIndex(pos); - return result; + output?.save(result); } @override - Future tryClear() async { + Future clear() async { _head.clearIndex(); - return true; } @override - Future<(Uint8List?, bool)> tryWriteItem(int pos, Uint8List newValue) async { + Future tryWriteItem(int pos, Uint8List newValue, + {Output? output}) async { if (pos < 0 || pos >= _head.length) { throw IndexError.withLength(pos, _head.length); } @@ -198,8 +104,10 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead if (result != null) { // A result coming back means the element was overwritten already - return (result, false); + output?.save(result); + return false; } - return (oldValue, true); + output?.save(oldValue); + return true; } } diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_append_truncate.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_append_truncate.dart new file mode 100644 index 0000000..babcc7d --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_append_truncate.dart @@ -0,0 +1,44 @@ +import 'dart:typed_data'; + +import 'package:protobuf/protobuf.dart'; + +import '../../../veilid_support.dart'; + +//////////////////////////////////////////////////////////////////////////// +// Append/truncate interface +abstract class DHTAppendTruncate { + /// Try to add an item to the end of the DHT data structure. + /// Return true if the element was successfully added, and false if the state + /// changed before the element could be added or a newer value was found on + /// the network. + /// This may throw an exception if the number elements added exceeds limits. + Future tryAppendItem(Uint8List value); + + /// Try to remove a number of items from the head of the DHT data structure. + /// Throws StateError if count < 0 + Future truncate(int count); + + /// Remove all items in the DHT data structure. + Future clear(); +} + +abstract class DHTAppendTruncateRandomRead + implements DHTAppendTruncate, DHTRandomRead {} + +extension DHTAppendTruncateExt on DHTAppendTruncate { + /// Convenience function: + /// Like tryAppendItem but also encodes the input value as JSON and parses the + /// returned element as JSON + Future tryAppendItemJson( + T newValue, + ) => + tryAppendItem(jsonEncodeBytes(newValue)); + + /// Convenience function: + /// Like tryAppendItem but also encodes the input value as a protobuf object + /// and parses the returned element as a protobuf object + Future tryAppendItemProtobuf( + T newValue, + ) => + tryAppendItem(newValue.writeToBuffer()); +} diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_openable.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_openable.dart new file mode 100644 index 0000000..1ee1140 --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_openable.dart @@ -0,0 +1,49 @@ +import 'dart:async'; + +abstract class DHTOpenable { + bool get isOpen; + Future close(); + Future delete(); +} + +extension DHTOpenableExt on D { + /// Runs a closure that guarantees the DHTOpenable + /// will be closed upon exit, even if an uncaught exception is thrown + Future scope(Future Function(D) scopeFunction) async { + if (!isOpen) { + throw StateError('not open in scope'); + } + try { + return await scopeFunction(this); + } finally { + await close(); + } + } + + /// Runs a closure that guarantees the DHTOpenable + /// will be closed upon exit, and deleted if an an + /// uncaught exception is thrown + Future deleteScope(Future Function(D) scopeFunction) async { + if (!isOpen) { + throw StateError('not open in deleteScope'); + } + + try { + final out = await scopeFunction(this); + await close(); + return out; + } on Exception catch (_) { + await delete(); + rethrow; + } + } + + /// Scopes a closure that conditionally deletes the DHTOpenable on exit + Future maybeDeleteScope( + bool delete, Future Function(D) scopeFunction) async { + if (delete) { + return deleteScope(scopeFunction); + } + return scope(scopeFunction); + } +} diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart new file mode 100644 index 0000000..d52676e --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart @@ -0,0 +1,63 @@ +import 'dart:typed_data'; + +import 'package:protobuf/protobuf.dart'; + +import '../../../veilid_support.dart'; + +//////////////////////////////////////////////////////////////////////////// +// Reader interface +abstract class DHTRandomRead { + /// Returns the number of elements in the DHTArray + /// This number will be >= 0 and <= DHTShortArray.maxElements (256) + int get length; + + /// Return the item at position 'pos' in the DHTArray. If 'forceRefresh' + /// is specified, the network will always be checked for newer values + /// rather than returning the existing locally stored copy of the elements. + /// * 'pos' must be >= 0 and < 'length' + Future getItem(int pos, {bool forceRefresh = false}); + + /// Return a list of a range of items in the DHTArray. If 'forceRefresh' + /// is specified, the network will always be checked for newer values + /// rather than returning the existing locally stored copy of the elements. + /// * 'start' must be >= 0 + /// * 'len' must be >= 0 and <= DHTShortArray.maxElements (256) and defaults + /// to the maximum length + Future?> getItemRange(int start, + {int? length, bool forceRefresh = false}); + + /// Get a list of the positions that were written offline and not flushed yet + Future> getOfflinePositions(); +} + +extension DHTRandomReadExt on DHTRandomRead { + /// Convenience function: + /// Like getItem but also parses the returned element as JSON + Future getItemJson(T Function(dynamic) fromJson, int pos, + {bool forceRefresh = false}) => + getItem(pos, forceRefresh: forceRefresh) + .then((out) => jsonDecodeOptBytes(fromJson, out)); + + /// Convenience function: + /// Like getAllItems but also parses the returned elements as JSON + Future?> getItemRangeJson(T Function(dynamic) fromJson, int start, + {int? length, bool forceRefresh = false}) => + getItemRange(start, length: length, forceRefresh: forceRefresh) + .then((out) => out?.map(fromJson).toList()); + + /// Convenience function: + /// Like getItem but also parses the returned element as a protobuf object + Future getItemProtobuf( + T Function(List) fromBuffer, int pos, + {bool forceRefresh = false}) => + getItem(pos, forceRefresh: forceRefresh) + .then((out) => (out == null) ? null : fromBuffer(out)); + + /// Convenience function: + /// Like getAllItems but also parses the returned elements as protobuf objects + Future?> getItemRangeProtobuf( + T Function(List) fromBuffer, int start, + {int? length, bool forceRefresh = false}) => + getItemRange(start, length: length, forceRefresh: forceRefresh) + .then((out) => out?.map(fromBuffer).toList()); +} diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_write.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_write.dart new file mode 100644 index 0000000..53f307c --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_write.dart @@ -0,0 +1,104 @@ +import 'dart:typed_data'; + +import 'package:protobuf/protobuf.dart'; + +import '../../../veilid_support.dart'; + +//////////////////////////////////////////////////////////////////////////// +// Writer interface +abstract class DHTRandomWrite { + /// Try to set an item at position 'pos' of the DHTArray. + /// If the set was successful this returns: + /// * A boolean true + /// * outValue will return the prior contents of the element, + /// or null if there was no value yet + /// + /// If the set was found a newer value on the network this returns: + /// * A boolean false + /// * outValue will return the newer value of the element, + /// or null if the head record changed. + /// + /// This may throw an exception if the position exceeds the built-in limit of + /// 'maxElements = 256' entries. + Future tryWriteItem(int pos, Uint8List newValue, + {Output? output}); + + /// Try to add an item to the end of the DHTArray. Return true if the + /// element was successfully added, and false if the state changed before + /// the element could be added or a newer value was found on the network. + /// This may throw an exception if the number elements added exceeds the + /// built-in limit of 'maxElements = 256' entries. + Future tryAddItem(Uint8List value); + + /// Try to insert an item as position 'pos' of the DHTArray. + /// Return true if the element was successfully inserted, and false if the + /// state changed before the element could be inserted or a newer value was + /// found on the network. + /// This may throw an exception if the number elements added exceeds the + /// built-in limit of 'maxElements = 256' entries. + Future tryInsertItem(int pos, Uint8List value); + + /// Swap items at position 'aPos' and 'bPos' in the DHTArray. + /// Throws IndexError if either of the positions swapped exceed + /// the length of the list + Future swapItem(int aPos, int bPos); + + /// Remove an item at position 'pos' in the DHTArray. + /// If the remove was successful this returns: + /// * outValue will return the prior contents of the element + /// Throws IndexError if the position removed exceeds the length of + /// the list. + Future removeItem(int pos, {Output? output}); + + /// Remove all items in the DHTShortArray. + Future clear(); +} + +extension DHTRandomWriteExt on DHTRandomWrite { + /// Convenience function: + /// Like tryWriteItem but also encodes the input value as JSON and parses the + /// returned element as JSON + Future tryWriteItemJson( + T Function(dynamic) fromJson, int pos, T newValue, + {Output? output}) async { + final outValueBytes = output == null ? null : Output(); + final out = await tryWriteItem(pos, jsonEncodeBytes(newValue), + output: outValueBytes); + output.mapSave(outValueBytes, (b) => jsonDecodeBytes(fromJson, b)); + return out; + } + + /// Convenience function: + /// Like tryWriteItem but also encodes the input value as a protobuf object + /// and parses the returned element as a protobuf object + Future tryWriteItemProtobuf( + T Function(List) fromBuffer, int pos, T newValue, + {Output? output}) async { + final outValueBytes = output == null ? null : Output(); + final out = await tryWriteItem(pos, newValue.writeToBuffer(), + output: outValueBytes); + output.mapSave(outValueBytes, fromBuffer); + return out; + } + + /// Convenience function: + /// Like removeItem but also parses the returned element as JSON + Future removeItemJson(T Function(dynamic) fromJson, int pos, + {Output? output}) async { + final outValueBytes = output == null ? null : Output(); + await removeItem(pos, output: outValueBytes); + output.mapSave(outValueBytes, (b) => jsonDecodeBytes(fromJson, b)); + } + + /// Convenience function: + /// Like removeItem but also parses the returned element as JSON + Future removeItemProtobuf( + T Function(List) fromBuffer, int pos, + {Output? output}) async { + final outValueBytes = output == null ? null : Output(); + await removeItem(pos, output: outValueBytes); + output.mapSave(outValueBytes, fromBuffer); + } +} + +abstract class DHTRandomReadWrite implements DHTRandomRead, DHTRandomWrite {} diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart b/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart new file mode 100644 index 0000000..2b95033 --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart @@ -0,0 +1,5 @@ +class DHTExceptionTryAgain implements Exception { + DHTExceptionTryAgain( + [this.cause = 'operation failed due to newer dht value']); + String cause; +} diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/interfaces.dart b/packages/veilid_support/lib/dht_support/src/interfaces/interfaces.dart new file mode 100644 index 0000000..6c61075 --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/interfaces/interfaces.dart @@ -0,0 +1,4 @@ +export 'dht_openable.dart'; +export 'dht_random_read.dart'; +export 'dht_random_write.dart'; +export 'exceptions.dart'; diff --git a/packages/veilid_support/lib/proto/dht.pb.dart b/packages/veilid_support/lib/proto/dht.pb.dart index 7c96a7f..4007d3d 100644 --- a/packages/veilid_support/lib/proto/dht.pb.dart +++ b/packages/veilid_support/lib/proto/dht.pb.dart @@ -83,6 +83,68 @@ class DHTData extends $pb.GeneratedMessage { void clearSize() => clearField(4); } +class DHTLog extends $pb.GeneratedMessage { + factory DHTLog() => create(); + DHTLog._() : super(); + factory DHTLog.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory DHTLog.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + + static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'DHTLog', package: const $pb.PackageName(_omitMessageNames ? '' : 'dht'), createEmptyInstance: create) + ..a<$core.int>(1, _omitFieldNames ? '' : 'head', $pb.PbFieldType.OU3) + ..a<$core.int>(2, _omitFieldNames ? '' : 'tail', $pb.PbFieldType.OU3) + ..a<$core.int>(3, _omitFieldNames ? '' : 'stride', $pb.PbFieldType.OU3) + ..hasRequiredFields = false + ; + + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + DHTLog clone() => DHTLog()..mergeFromMessage(this); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + DHTLog copyWith(void Function(DHTLog) updates) => super.copyWith((message) => updates(message as DHTLog)) as DHTLog; + + $pb.BuilderInfo get info_ => _i; + + @$core.pragma('dart2js:noInline') + static DHTLog create() => DHTLog._(); + DHTLog createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static DHTLog getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static DHTLog? _defaultInstance; + + @$pb.TagNumber(1) + $core.int get head => $_getIZ(0); + @$pb.TagNumber(1) + set head($core.int v) { $_setUnsignedInt32(0, v); } + @$pb.TagNumber(1) + $core.bool hasHead() => $_has(0); + @$pb.TagNumber(1) + void clearHead() => clearField(1); + + @$pb.TagNumber(2) + $core.int get tail => $_getIZ(1); + @$pb.TagNumber(2) + set tail($core.int v) { $_setUnsignedInt32(1, v); } + @$pb.TagNumber(2) + $core.bool hasTail() => $_has(1); + @$pb.TagNumber(2) + void clearTail() => clearField(2); + + @$pb.TagNumber(3) + $core.int get stride => $_getIZ(2); + @$pb.TagNumber(3) + set stride($core.int v) { $_setUnsignedInt32(2, v); } + @$pb.TagNumber(3) + $core.bool hasStride() => $_has(2); + @$pb.TagNumber(3) + void clearStride() => clearField(3); +} + class DHTShortArray extends $pb.GeneratedMessage { factory DHTShortArray() => create(); DHTShortArray._() : super(); @@ -133,68 +195,6 @@ class DHTShortArray extends $pb.GeneratedMessage { $core.List<$core.int> get seqs => $_getList(2); } -class DHTLog extends $pb.GeneratedMessage { - factory DHTLog() => create(); - DHTLog._() : super(); - factory DHTLog.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); - factory DHTLog.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); - - static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'DHTLog', package: const $pb.PackageName(_omitMessageNames ? '' : 'dht'), createEmptyInstance: create) - ..pc<$0.TypedKey>(1, _omitFieldNames ? '' : 'keys', $pb.PbFieldType.PM, subBuilder: $0.TypedKey.create) - ..aOM<$0.TypedKey>(2, _omitFieldNames ? '' : 'back', subBuilder: $0.TypedKey.create) - ..p<$core.int>(3, _omitFieldNames ? '' : 'subkeyCounts', $pb.PbFieldType.KU3) - ..a<$core.int>(4, _omitFieldNames ? '' : 'totalSubkeys', $pb.PbFieldType.OU3) - ..hasRequiredFields = false - ; - - @$core.Deprecated( - 'Using this can add significant overhead to your binary. ' - 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' - 'Will be removed in next major version') - DHTLog clone() => DHTLog()..mergeFromMessage(this); - @$core.Deprecated( - 'Using this can add significant overhead to your binary. ' - 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' - 'Will be removed in next major version') - DHTLog copyWith(void Function(DHTLog) updates) => super.copyWith((message) => updates(message as DHTLog)) as DHTLog; - - $pb.BuilderInfo get info_ => _i; - - @$core.pragma('dart2js:noInline') - static DHTLog create() => DHTLog._(); - DHTLog createEmptyInstance() => create(); - static $pb.PbList createRepeated() => $pb.PbList(); - @$core.pragma('dart2js:noInline') - static DHTLog getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); - static DHTLog? _defaultInstance; - - @$pb.TagNumber(1) - $core.List<$0.TypedKey> get keys => $_getList(0); - - @$pb.TagNumber(2) - $0.TypedKey get back => $_getN(1); - @$pb.TagNumber(2) - set back($0.TypedKey v) { setField(2, v); } - @$pb.TagNumber(2) - $core.bool hasBack() => $_has(1); - @$pb.TagNumber(2) - void clearBack() => clearField(2); - @$pb.TagNumber(2) - $0.TypedKey ensureBack() => $_ensure(1); - - @$pb.TagNumber(3) - $core.List<$core.int> get subkeyCounts => $_getList(2); - - @$pb.TagNumber(4) - $core.int get totalSubkeys => $_getIZ(3); - @$pb.TagNumber(4) - set totalSubkeys($core.int v) { $_setUnsignedInt32(3, v); } - @$pb.TagNumber(4) - $core.bool hasTotalSubkeys() => $_has(3); - @$pb.TagNumber(4) - void clearTotalSubkeys() => clearField(4); -} - enum DataReference_Kind { dhtData, notSet diff --git a/packages/veilid_support/lib/proto/dht.pbjson.dart b/packages/veilid_support/lib/proto/dht.pbjson.dart index bf31c30..6c99cb7 100644 --- a/packages/veilid_support/lib/proto/dht.pbjson.dart +++ b/packages/veilid_support/lib/proto/dht.pbjson.dart @@ -30,6 +30,21 @@ final $typed_data.Uint8List dHTDataDescriptor = $convert.base64Decode( 'gCIAEoCzIQLnZlaWxpZC5UeXBlZEtleVIEaGFzaBIUCgVjaHVuaxgDIAEoDVIFY2h1bmsSEgoE' 'c2l6ZRgEIAEoDVIEc2l6ZQ=='); +@$core.Deprecated('Use dHTLogDescriptor instead') +const DHTLog$json = { + '1': 'DHTLog', + '2': [ + {'1': 'head', '3': 1, '4': 1, '5': 13, '10': 'head'}, + {'1': 'tail', '3': 2, '4': 1, '5': 13, '10': 'tail'}, + {'1': 'stride', '3': 3, '4': 1, '5': 13, '10': 'stride'}, + ], +}; + +/// Descriptor for `DHTLog`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List dHTLogDescriptor = $convert.base64Decode( + 'CgZESFRMb2cSEgoEaGVhZBgBIAEoDVIEaGVhZBISCgR0YWlsGAIgASgNUgR0YWlsEhYKBnN0cm' + 'lkZRgDIAEoDVIGc3RyaWRl'); + @$core.Deprecated('Use dHTShortArrayDescriptor instead') const DHTShortArray$json = { '1': 'DHTShortArray', @@ -45,23 +60,6 @@ final $typed_data.Uint8List dHTShortArrayDescriptor = $convert.base64Decode( 'Cg1ESFRTaG9ydEFycmF5EiQKBGtleXMYASADKAsyEC52ZWlsaWQuVHlwZWRLZXlSBGtleXMSFA' 'oFaW5kZXgYAiABKAxSBWluZGV4EhIKBHNlcXMYAyADKA1SBHNlcXM='); -@$core.Deprecated('Use dHTLogDescriptor instead') -const DHTLog$json = { - '1': 'DHTLog', - '2': [ - {'1': 'keys', '3': 1, '4': 3, '5': 11, '6': '.veilid.TypedKey', '10': 'keys'}, - {'1': 'back', '3': 2, '4': 1, '5': 11, '6': '.veilid.TypedKey', '10': 'back'}, - {'1': 'subkey_counts', '3': 3, '4': 3, '5': 13, '10': 'subkeyCounts'}, - {'1': 'total_subkeys', '3': 4, '4': 1, '5': 13, '10': 'totalSubkeys'}, - ], -}; - -/// Descriptor for `DHTLog`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List dHTLogDescriptor = $convert.base64Decode( - 'CgZESFRMb2cSJAoEa2V5cxgBIAMoCzIQLnZlaWxpZC5UeXBlZEtleVIEa2V5cxIkCgRiYWNrGA' - 'IgASgLMhAudmVpbGlkLlR5cGVkS2V5UgRiYWNrEiMKDXN1YmtleV9jb3VudHMYAyADKA1SDHN1' - 'YmtleUNvdW50cxIjCg10b3RhbF9zdWJrZXlzGAQgASgNUgx0b3RhbFN1YmtleXM='); - @$core.Deprecated('Use dataReferenceDescriptor instead') const DataReference$json = { '1': 'DataReference', diff --git a/packages/veilid_support/lib/src/identity.dart b/packages/veilid_support/lib/src/identity.dart index baae797..2645894 100644 --- a/packages/veilid_support/lib/src/identity.dart +++ b/packages/veilid_support/lib/src/identity.dart @@ -300,8 +300,8 @@ Future openIdentityMaster( debugName: 'IdentityMaster::openIdentityMaster::IdentityMasterRecord')) .deleteScope((masterRec) async { - final identityMaster = - (await masterRec.getJson(IdentityMaster.fromJson, forceRefresh: true))!; + final identityMaster = (await masterRec.getJson(IdentityMaster.fromJson, + refreshMode: DHTRecordRefreshMode.refresh))!; // Validate IdentityMaster final masterRecordKey = masterRec.key; diff --git a/packages/veilid_support/lib/src/output.dart b/packages/veilid_support/lib/src/output.dart new file mode 100644 index 0000000..78902b3 --- /dev/null +++ b/packages/veilid_support/lib/src/output.dart @@ -0,0 +1,33 @@ +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; + +export 'package:fast_immutable_collections/fast_immutable_collections.dart' + show Output; + +extension OutputNullExt on Output? { + void mapSave(Output? other, T Function(S output) closure) { + if (this == null) { + return; + } + if (other == null) { + return; + } + final v = other.value; + if (v == null) { + return; + } + return this!.save(closure(v)); + } +} + +extension OutputExt on Output { + void mapSave(Output? other, T Function(S output) closure) { + if (other == null) { + return; + } + final v = other.value; + if (v == null) { + return; + } + return save(closure(v)); + } +} diff --git a/packages/veilid_support/lib/veilid_support.dart b/packages/veilid_support/lib/veilid_support.dart index fcbbaf4..e741990 100644 --- a/packages/veilid_support/lib/veilid_support.dart +++ b/packages/veilid_support/lib/veilid_support.dart @@ -10,6 +10,7 @@ export 'src/config.dart'; export 'src/identity.dart'; export 'src/json_tools.dart'; export 'src/memory_tools.dart'; +export 'src/output.dart'; export 'src/persistent_queue.dart'; export 'src/protobuf_tools.dart'; export 'src/table_db.dart'; diff --git a/pubspec.lock b/pubspec.lock index 21e4f2e..07b05e5 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -37,10 +37,10 @@ packages: dependency: "direct main" description: name: archive - sha256: "22600aa1e926be775fa5fe7e6894e7fb3df9efda8891c73f70fb3262399a432d" + sha256: ecf4273855368121b1caed0d10d4513c7241dfc813f7d3c8933b36622ae9b265 url: "https://pub.dev" source: hosted - version: "3.4.10" + version: "3.5.1" args: dependency: transitive description: @@ -203,18 +203,18 @@ packages: dependency: transitive description: name: cached_network_image_web - sha256: "42a835caa27c220d1294311ac409a43361088625a4f23c820b006dd9bffb3316" + sha256: "205d6a9f1862de34b93184f22b9d2d94586b2f05c581d546695e3d8f6a805cd7" url: "https://pub.dev" source: hosted - version: "1.1.1" + version: "1.2.0" camera: dependency: transitive description: name: camera - sha256: "9499cbc2e51d8eb0beadc158b288380037618ce4e30c9acbc4fae1ac3ecb5797" + sha256: dfa8fc5a1adaeb95e7a54d86a5bd56f4bb0e035515354c8ac6d262e35cec2ec8 url: "https://pub.dev" source: hosted - version: "0.10.5+9" + version: "0.10.6" camera_android: dependency: transitive description: @@ -227,10 +227,10 @@ packages: dependency: transitive description: name: camera_avfoundation - sha256: "9dbbb253aaf201a69c40cf95571f366ca936305d2de012684e21f6f1b1433d31" + sha256: "7d021e8cd30d9b71b8b92b4ad669e80af432d722d18d6aac338572754a786c15" url: "https://pub.dev" source: hosted - version: "0.9.15+4" + version: "0.9.16" camera_platform_interface: dependency: transitive description: @@ -456,10 +456,10 @@ packages: dependency: transitive description: name: flutter_cache_manager - sha256: "8207f27539deb83732fdda03e259349046a39a4c767269285f449ade355d54ba" + sha256: "395d6b7831f21f3b989ebedbb785545932adb9afe2622c1ffacf7f4b53a7e544" url: "https://pub.dev" source: hosted - version: "3.3.1" + version: "3.3.2" flutter_chat_types: dependency: "direct main" description: @@ -634,10 +634,10 @@ packages: dependency: "direct main" description: name: go_router - sha256: "771c8feb40ad0ef639973d7ecf1b43d55ffcedb2207fd43fab030f5639e40446" + sha256: b465e99ce64ba75e61c8c0ce3d87b66d8ac07f0b35d0a7e0263fcfc10f99e836 url: "https://pub.dev" source: hosted - version: "13.2.4" + version: "13.2.5" graphs: dependency: transitive description: @@ -898,10 +898,10 @@ packages: dependency: transitive description: name: path_provider_foundation - sha256: "5a7999be66e000916500be4f15a3633ebceb8302719b47b9cc49ce924125350f" + sha256: f234384a3fdd67f989b4d54a5d73ca2a6c422fa55ae694381ae0f4375cd1ea16 url: "https://pub.dev" source: hosted - version: "2.3.2" + version: "2.4.0" path_provider_linux: dependency: transitive description: @@ -970,10 +970,10 @@ packages: dependency: transitive description: name: pointycastle - sha256: "79fbafed02cfdbe85ef3fd06c7f4bc2cbcba0177e61b765264853d4253b21744" + sha256: "4be0097fcf3fd3e8449e53730c631200ebc7b88016acecab2b0da2f0149222fe" url: "https://pub.dev" source: hosted - version: "3.9.0" + version: "3.9.1" pool: dependency: transitive description: @@ -1106,10 +1106,10 @@ packages: dependency: "direct main" description: name: searchable_listview - sha256: d8513a968bdd540cb011220a5670b23b346e04a7bcb99690a859ed58092f72a4 + sha256: f9bc1a57dfcba49ce2d190d642567fb82309dd23849b3b0a328266e3f90054db url: "https://pub.dev" source: hosted - version: "2.11.2" + version: "2.12.0" share_plus: dependency: "direct main" description: @@ -1146,10 +1146,10 @@ packages: dependency: transitive description: name: shared_preferences_foundation - sha256: "7708d83064f38060c7b39db12aefe449cb8cdc031d6062280087bc4cdb988f5c" + sha256: "0a8a893bf4fd1152f93fec03a415d11c27c74454d96e2318a7ac38dd18683ab7" url: "https://pub.dev" source: hosted - version: "2.3.5" + version: "2.4.0" shared_preferences_linux: dependency: transitive description: @@ -1263,10 +1263,10 @@ packages: dependency: transitive description: name: sqflite - sha256: "5ce2e1a15e822c3b4bfb5400455775e421da7098eed8adc8f26298ada7c9308c" + sha256: a43e5a27235518c03ca238e7b4732cf35eabe863a369ceba6cbefa537a66f16d url: "https://pub.dev" source: hosted - version: "2.3.3" + version: "2.3.3+1" sqflite_common: dependency: transitive description: @@ -1407,10 +1407,10 @@ packages: dependency: transitive description: name: url_launcher_ios - sha256: "9149d493b075ed740901f3ee844a38a00b33116c7c5c10d7fb27df8987fb51d5" + sha256: "7068716403343f6ba4969b4173cbf3b84fc768042124bc2c011e5d782b24fe89" url: "https://pub.dev" source: hosted - version: "6.2.5" + version: "6.3.0" url_launcher_linux: dependency: transitive description: @@ -1423,10 +1423,10 @@ packages: dependency: transitive description: name: url_launcher_macos - sha256: b7244901ea3cf489c5335bdacda07264a6e960b1c1b1a9f91e4bc371d9e68234 + sha256: "9a1a42d5d2d95400c795b2914c36fdcb525870c752569438e4ebb09a2b5d90de" url: "https://pub.dev" source: hosted - version: "3.1.0" + version: "3.2.0" url_launcher_platform_interface: dependency: transitive description: @@ -1541,10 +1541,10 @@ packages: dependency: transitive description: name: win32 - sha256: "0a989dc7ca2bb51eac91e8fd00851297cfffd641aa7538b165c62637ca0eaa4a" + sha256: "0eaf06e3446824099858367950a813472af675116bf63f008a4c2a75ae13e9cb" url: "https://pub.dev" source: hosted - version: "5.4.0" + version: "5.5.0" window_manager: dependency: "direct main" description: diff --git a/pubspec.yaml b/pubspec.yaml index 821c214..d3e5a50 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -10,7 +10,7 @@ environment: dependencies: animated_theme_switcher: ^2.0.10 ansicolor: ^2.0.2 - archive: ^3.4.10 + archive: ^3.5.1 async_tools: ^0.1.1 awesome_extensions: ^2.0.14 badges: ^3.1.2 @@ -44,11 +44,11 @@ dependencies: flutter_translate: ^4.0.4 form_builder_validators: ^9.1.0 freezed_annotation: ^2.4.1 - go_router: ^13.2.4 + go_router: ^13.2.5 hydrated_bloc: ^9.1.5 image: ^4.1.7 intl: ^0.18.1 - json_annotation: ^4.8.1 + json_annotation: ^4.9.0 loggy: ^2.0.3 meta: ^1.11.0 mobile_scanner: ^4.0.1 @@ -65,7 +65,7 @@ dependencies: quickalert: ^1.1.0 radix_colors: ^1.0.4 reorderable_grid: ^1.0.10 - searchable_listview: ^2.11.2 + searchable_listview: ^2.12.0 share_plus: ^8.0.3 shared_preferences: ^2.2.3 signal_strength_indicator: ^0.4.1 @@ -93,7 +93,7 @@ dev_dependencies: build_runner: ^2.4.9 freezed: ^2.5.2 icons_launcher: ^2.1.7 - json_serializable: ^6.7.1 + json_serializable: ^6.8.0 lint_hard: ^4.0.0 flutter_native_splash: