diff --git a/lib/tools/json_tools.dart b/lib/tools/json_tools.dart index be69102..e7bfd09 100644 --- a/lib/tools/json_tools.dart +++ b/lib/tools/json_tools.dart @@ -4,6 +4,9 @@ import 'dart:typed_data'; T jsonDecodeBytes(T Function(dynamic) fromJson, Uint8List data) => fromJson(jsonDecode(utf8.decode(data))); +T? jsonDecodeOptBytes(T Function(dynamic) fromJson, Uint8List? data) => + (data == null) ? null : fromJson(jsonDecode(utf8.decode(data))); + Uint8List jsonEncodeBytes(Object? object, {Object? Function(Object?)? toEncodable}) => Uint8List.fromList( diff --git a/lib/veilid_support/dht_short_array.dart b/lib/veilid_support/dht_short_array.dart index 06eb2d2..426e850 100644 --- a/lib/veilid_support/dht_short_array.dart +++ b/lib/veilid_support/dht_short_array.dart @@ -12,6 +12,10 @@ class _DHTShortArrayCache { : linkedRecords = List.empty(growable: true), index = List.empty(growable: true), free = List.empty(growable: true); + _DHTShortArrayCache.from(_DHTShortArrayCache other) + : linkedRecords = List.of(other.linkedRecords), + index = List.of(other.index), + free = List.of(other.free); final List linkedRecords; final List index; @@ -94,20 +98,23 @@ class DHTShortArray { //////////////////////////////////////////////////////////////// - /// Write the current head cache out to a protobuf to be serialized - Uint8List _headToBuffer() { + /// Seralize and write out the current head record, possibly updating it + /// if a newer copy is available online. Returns true if the write was + /// successful + Future _tryWriteHead() async { final head = proto.DHTShortArray(); head.keys.addAll(_head.linkedRecords.map((lr) => lr.key.toProto())); head.index.addAll(_head.index); - return head.writeToBuffer(); - } + final headBuffer = head.writeToBuffer(); - Future _openLinkedRecord(TypedKey recordKey) async { - final writer = _headRecord.writer; - return (writer != null) - ? await DHTRecord.openWrite( - _headRecord.routingContext, recordKey, writer) - : await DHTRecord.openRead(_headRecord.routingContext, recordKey); + final existingData = await _headRecord.tryWriteBytes(headBuffer); + if (existingData != null) { + // Head write failed, incorporate update + await _newHead(proto.DHTShortArray.fromBuffer(existingData)); + return false; + } + + return true; } /// Validate the head from the DHT is properly formatted @@ -142,19 +149,17 @@ class DHTShortArray { return free; } - Future _refreshHead( - {bool forceRefresh = false, bool onlyUpdates = false}) async { - // Get an updated head record copy if one exists - final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer, - forceRefresh: forceRefresh, onlyUpdates: onlyUpdates); - if (head == null) { - if (onlyUpdates) { - // No update - return false; - } - throw StateError('head missing during initial refresh'); - } + /// Open a linked record for reading or writing, same as the head record + Future _openLinkedRecord(TypedKey recordKey) async { + final writer = _headRecord.writer; + return (writer != null) + ? await DHTRecord.openWrite( + _headRecord.routingContext, recordKey, writer) + : await DHTRecord.openRead(_headRecord.routingContext, recordKey); + } + /// Validate a new head record + Future _newHead(proto.DHTShortArray head) async { // Get the set of new linked keys and validate it final linkedKeys = head.keys.map(proto.TypedKeyProto.fromProto).toList(); final index = head.index; @@ -180,6 +185,7 @@ class DHTShortArray { } on Exception catch (_) { // On any exception close the records we have opened await Future.wait(newRecords.entries.map((e) => e.value.close())); + rethrow; } // From this point forward we should not throw an exception or everything @@ -197,6 +203,24 @@ class DHTShortArray { linkedKeys.map((key) => (sameRecords[key] ?? newRecords[key])!)) ..index.addAll(index) ..free.addAll(free); + } + + /// Pull the latest or updated copy of the head record from the network + Future _refreshHead( + {bool forceRefresh = false, bool onlyUpdates = false}) async { + // Get an updated head record copy if one exists + final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer, + forceRefresh: forceRefresh, onlyUpdates: onlyUpdates); + if (head == null) { + if (onlyUpdates) { + // No update + return false; + } + throw StateError('head missing during refresh'); + } + + await _newHead(head); + return true; } @@ -249,19 +273,30 @@ class DHTShortArray { return _head.linkedRecords[recordNumber]; } - // xxx: add - // xxx: insert - // xxx: swap - // xxx: remove - // xxx: clear - // xxx ensure these write the head back out because they change it + int _emptyIndex() { + if (_head.free.isNotEmpty) { + return _head.free.removeLast(); + } + if (_head.index.length == maxElements) { + throw StateError('too many elements'); + } + return _head.index.length; + } - Future getItem(int index, {bool forceRefresh = false}) async { + void _freeIndex(int idx) { + _head.free.add(idx); + // xxx: free list optimization here? + } + + int length() => _head.index.length; + + Future getItem(int pos, {bool forceRefresh = false}) async { await _refreshHead(forceRefresh: forceRefresh, onlyUpdates: true); - if (index < 0 || index >= _head.index.length) { - throw IndexError.withLength(index, _head.index.length); + if (pos < 0 || pos >= _head.index.length) { + throw IndexError.withLength(pos, _head.index.length); } + final index = _head.index[pos]; final recordNumber = index ~/ _stride; final record = _getRecord(recordNumber); assert(record != null, 'Record does not exist'); @@ -270,14 +305,162 @@ class DHTShortArray { return record!.get(subkey: recordSubkey, forceRefresh: forceRefresh); } - Future tryWriteItem(int index, Uint8List newValue) async { + Future getItemJson(T Function(dynamic) fromJson, int pos, + {bool forceRefresh = false}) => + getItem(pos, forceRefresh: forceRefresh) + .then((out) => jsonDecodeOptBytes(fromJson, out)); + + Future getItemProtobuf( + T Function(List) fromBuffer, int pos, + {bool forceRefresh = false}) => + getItem(pos, forceRefresh: forceRefresh) + .then((out) => (out == null) ? null : fromBuffer(out)); + + Future tryAddItem(Uint8List value) async { + await _refreshHead(onlyUpdates: true); + + final oldHead = _DHTShortArrayCache.from(_head); + late final int pos; + try { + // Allocate empty index + final idx = _emptyIndex(); + // Add new index + pos = _head.index.length; + _head.index.add(idx); + + // Write new head + if (!await _tryWriteHead()) { + // Failed to write head means head got overwritten + return false; + } + } on Exception catch (_) { + // Exception on write means state needs to be reverted + _head = oldHead; + return false; + } + + // Head write succeeded, now write item + await eventualWriteItem(pos, value); + return true; + } + + Future tryInsertItem(int pos, Uint8List value) async { + await _refreshHead(onlyUpdates: true); + + final oldHead = _DHTShortArrayCache.from(_head); + try { + // Allocate empty index + final idx = _emptyIndex(); + // Add new index + _head.index.insert(pos, idx); + + // Write new head + if (!await _tryWriteHead()) { + // Failed to write head means head got overwritten + return false; + } + } on Exception catch (_) { + // Exception on write means state needs to be reverted + _head = oldHead; + return false; + } + + // Head write succeeded, now write item + await eventualWriteItem(pos, value); + return true; + } + + Future trySwapItem(int aPos, int bPos) async { + await _refreshHead(onlyUpdates: true); + + final oldHead = _DHTShortArrayCache.from(_head); + try { + // Add new index + final aIdx = _head.index[aPos]; + final bIdx = _head.index[bPos]; + _head.index[aPos] = bIdx; + _head.index[bPos] = aIdx; + + // Write new head + if (!await _tryWriteHead()) { + // Failed to write head means head got overwritten + return false; + } + } on Exception catch (_) { + // Exception on write means state needs to be reverted + _head = oldHead; + return false; + } + return true; + } + + Future tryRemoveItem(int pos) async { + await _refreshHead(onlyUpdates: true); + + final oldHead = _DHTShortArrayCache.from(_head); + try { + final removedIdx = _head.index.removeAt(pos); + _freeIndex(removedIdx); + final recordNumber = removedIdx ~/ _stride; + final record = _getRecord(recordNumber); + assert(record != null, 'Record does not exist'); + final recordSubkey = + (removedIdx % _stride) + ((recordNumber == 0) ? 1 : 0); + + // Write new head + if (!await _tryWriteHead()) { + // Failed to write head means head got overwritten + return null; + } + + return record!.get(subkey: recordSubkey); + } on Exception catch (_) { + // Exception on write means state needs to be reverted + _head = oldHead; + return null; + } + } + + Future tryRemoveItemJson( + T Function(dynamic) fromJson, + int pos, + ) => + tryRemoveItem(pos).then((out) => jsonDecodeOptBytes(fromJson, out)); + + Future tryRemoveItemProtobuf( + T Function(List) fromBuffer, int pos) => + getItem(pos).then((out) => (out == null) ? null : fromBuffer(out)); + + Future tryClear() async { + await _refreshHead(onlyUpdates: true); + + final oldHead = _DHTShortArrayCache.from(_head); + try { + _head.index.clear(); + _head.free.clear(); + + // Write new head + if (!await _tryWriteHead()) { + // Failed to write head means head got overwritten + return false; + } + } on Exception catch (_) { + // Exception on write means state needs to be reverted + _head = oldHead; + return false; + } + return true; + } + + Future tryWriteItem(int pos, Uint8List newValue) async { if (await _refreshHead(onlyUpdates: true)) { throw StateError('structure changed'); } - - if (index < 0 || index >= _head.index.length) { - throw IndexError.withLength(index, _head.index.length); + if (pos < 0 || pos >= _head.index.length) { + throw IndexError.withLength(pos, _head.index.length); } + final index = _head.index[pos]; + final recordNumber = index ~/ _stride; final record = _getRecord(recordNumber); assert(record != null, 'Record does not exist'); @@ -286,19 +469,19 @@ class DHTShortArray { return record!.tryWriteBytes(newValue, subkey: recordSubkey); } - Future eventualWriteItem(int index, Uint8List newValue) async { + Future eventualWriteItem(int pos, Uint8List newValue) async { Uint8List? oldData; do { // Set it back - oldData = await tryWriteItem(index, newValue); + oldData = await tryWriteItem(pos, newValue); // Repeat if newer data on the network was found } while (oldData != null); } Future eventualUpdateItem( - int index, Future Function(Uint8List oldValue) update) async { - var oldData = await getItem(index); + int pos, Future Function(Uint8List oldValue) update) async { + var oldData = await getItem(pos); // Ensure it exists already if (oldData == null) { throw const FormatException('value does not exist'); @@ -308,7 +491,7 @@ class DHTShortArray { final updatedData = await update(oldData!); // Set it back - oldData = await tryWriteItem(index, updatedData); + oldData = await tryWriteItem(pos, updatedData); // Repeat if newer data on the network was found } while (oldData != null); @@ -316,47 +499,43 @@ class DHTShortArray { Future tryWriteItemJson( T Function(dynamic) fromJson, - int index, + int pos, T newValue, ) => - tryWriteItem(index, jsonEncodeBytes(newValue)).then((out) { - if (out == null) { - return null; - } - return jsonDecodeBytes(fromJson, out); - }); + tryWriteItem(pos, jsonEncodeBytes(newValue)) + .then((out) => jsonDecodeOptBytes(fromJson, out)); Future tryWriteItemProtobuf( T Function(List) fromBuffer, - int index, + int pos, T newValue, ) => - tryWriteItem(index, newValue.writeToBuffer()).then((out) { + tryWriteItem(pos, newValue.writeToBuffer()).then((out) { if (out == null) { return null; } return fromBuffer(out); }); - Future eventualWriteItemJson(int index, T newValue) => - eventualWriteItem(index, jsonEncodeBytes(newValue)); + Future eventualWriteItemJson(int pos, T newValue) => + eventualWriteItem(pos, jsonEncodeBytes(newValue)); Future eventualWriteItemProtobuf( - int index, T newValue, + int pos, T newValue, {int subkey = -1}) => - eventualWriteItem(index, newValue.writeToBuffer()); + eventualWriteItem(pos, newValue.writeToBuffer()); Future eventualUpdateItemJson( T Function(dynamic) fromJson, - int index, + int pos, Future Function(T) update, ) => - eventualUpdateItem(index, jsonUpdate(fromJson, update)); + eventualUpdateItem(pos, jsonUpdate(fromJson, update)); Future eventualUpdateItemProtobuf( T Function(List) fromBuffer, - int index, + int pos, Future Function(T) update, ) => - eventualUpdateItem(index, protobufUpdate(fromBuffer, update)); + eventualUpdateItem(pos, protobufUpdate(fromBuffer, update)); } diff --git a/lib/veilid_support/identity_master.dart b/lib/veilid_support/identity_master.dart index 8c64aa9..81aa11b 100644 --- a/lib/veilid_support/identity_master.dart +++ b/lib/veilid_support/identity_master.dart @@ -1,3 +1,5 @@ +// ignore_for_file: prefer_expression_function_bodies + import 'dart:typed_data'; import 'package:fast_immutable_collections/fast_immutable_collections.dart';