From 490051a650ddf08e8f2a3d783572660e013b3a50 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Thu, 30 May 2024 23:25:47 -0400 Subject: [PATCH] reconciliation work --- .../reconciliation/author_input_queue.dart | 145 ++++++++++++ .../reconciliation/author_input_source.dart | 10 + .../message_reconciliation.dart | 206 +++++++++++++++++ .../reconciliation/output_position.dart | 13 ++ .../cubits/reconciliation/reconciliation.dart | 1 + .../cubits/single_contact_messages_cubit.dart | 213 ++---------------- lib/proto/extensions.dart | 3 + .../lib/dht_support/src/dht_log/dht_log.dart | 3 +- .../src/dht_log/dht_log_cubit.dart | 4 +- .../lib/src/table_db_array.dart | 42 ++++ pubspec.lock | 9 + pubspec.yaml | 4 + 12 files changed, 457 insertions(+), 196 deletions(-) create mode 100644 lib/chat/cubits/reconciliation/author_input_queue.dart create mode 100644 lib/chat/cubits/reconciliation/author_input_source.dart create mode 100644 lib/chat/cubits/reconciliation/message_reconciliation.dart create mode 100644 lib/chat/cubits/reconciliation/output_position.dart create mode 100644 lib/chat/cubits/reconciliation/reconciliation.dart diff --git a/lib/chat/cubits/reconciliation/author_input_queue.dart b/lib/chat/cubits/reconciliation/author_input_queue.dart new file mode 100644 index 0000000..b441e75 --- /dev/null +++ b/lib/chat/cubits/reconciliation/author_input_queue.dart @@ -0,0 +1,145 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:math'; + +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:veilid_support/veilid_support.dart'; + +import '../../../proto/proto.dart' as proto; + +import 'author_input_source.dart'; +import 'output_position.dart'; + +class AuthorInputQueue { + AuthorInputQueue({ + required this.author, + required this.inputSource, + required this.lastOutputPosition, + required this.onError, + }): + assert(inputSource.messages.count>0, 'no input source window length'), + assert(inputSource.messages.elements.isNotEmpty, 'no input source elements'), + assert(inputSource.messages.tail >= inputSource.messages.elements.length, 'tail is before initial messages end'), + assert(inputSource.messages.tail > 0, 'tail is not greater than zero'), + currentPosition = inputSource.messages.tail, + currentWindow = inputSource.messages.elements, + windowLength = inputSource.messages.count, + windowFirst = inputSource.messages.tail - inputSource.messages.elements.length, + windowLast = inputSource.messages.tail - 1; + + //////////////////////////////////////////////////////////////////////////// + + bool get isEmpty => toReconcile.isEmpty; + + proto.Message? get current => toReconcile.firstOrNull; + + bool consume() { + toReconcile.removeFirst(); + return toReconcile.isNotEmpty; + } + + Future prepareInputQueue() async { + // Go through batches of the input dhtlog starting with + // the current cubit state which is at the tail of the log + // Find the last reconciled message for this author + + outer: + while (true) { + for (var rn = currentWindow.length; + rn >= 0 && currentPosition >= 0; + rn--, currentPosition--) { + final elem = currentWindow[rn]; + + // If we've found an input element that is older than our last + // reconciled message for this author, then we stop + if (lastOutputPosition != null) { + if (elem.value.timestamp < lastOutputPosition!.message.timestamp) { + break outer; + } + } + + // Drop the 'offline' elements because we don't reconcile + // anything until it has been confirmed to be committed to the DHT + if (elem.isOffline) { + continue; + } + + // Add to head of reconciliation queue + toReconcile.addFirst(elem.value); + if (toReconcile.length > _maxQueueChunk) { + toReconcile.removeLast(); + } + } + if (currentPosition < 0) { + break; + } + + xxx update window here and make this and other methods work + } + return true; + } + + // Slide the window toward the current position and load the batch around it + Future updateWindow() async { + + // Check if we are still in the window + if (currentPosition>=windowFirst && currentPosition <= windowLast) { + return true; + } + + // Get the length of the cubit + final inputLength = await inputSource.cubit.operate((r) async => r.length); + + // If not, slide the window + if (currentPosition toReconcile = ListQueue(); + final AuthorInputSource inputSource; + final OutputPosition? lastOutputPosition; + final void Function(Object, StackTrace?) onError; + + // The current position in the input log that we are looking at + int currentPosition; + // The current input window elements + IList> currentWindow; + // The first position of the sliding input window + int windowFirst; + // The last position of the sliding input window + int windowLast; + // Desired maximum window length + int windowLength; + + static const int _maxQueueChunk = 256; +} diff --git a/lib/chat/cubits/reconciliation/author_input_source.dart b/lib/chat/cubits/reconciliation/author_input_source.dart new file mode 100644 index 0000000..75f020e --- /dev/null +++ b/lib/chat/cubits/reconciliation/author_input_source.dart @@ -0,0 +1,10 @@ +import 'package:veilid_support/veilid_support.dart'; + +import '../../../proto/proto.dart' as proto; + +class AuthorInputSource { + AuthorInputSource({required this.messages, required this.cubit}); + + final DHTLogStateData messages; + final DHTLogCubit cubit; +} diff --git a/lib/chat/cubits/reconciliation/message_reconciliation.dart b/lib/chat/cubits/reconciliation/message_reconciliation.dart new file mode 100644 index 0000000..51cfe2b --- /dev/null +++ b/lib/chat/cubits/reconciliation/message_reconciliation.dart @@ -0,0 +1,206 @@ +import 'dart:async'; + +import 'package:async_tools/async_tools.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:sorted_list/sorted_list.dart'; +import 'package:veilid_support/veilid_support.dart'; + +import '../../../proto/proto.dart' as proto; +import 'author_input_queue.dart'; +import 'author_input_source.dart'; +import 'output_position.dart'; + +class MessageReconciliation { + MessageReconciliation( + {required TableDBArrayCubit output, + required void Function(Object, StackTrace?) onError}) + : _outputCubit = output, + _onError = onError; + + //////////////////////////////////////////////////////////////////////////// + + void reconcileMessages( + TypedKey author, + DHTLogStateData inputMessages, + DHTLogCubit inputMessagesCubit) { + if (inputMessages.elements.isEmpty) { + return; + } + + _inputSources[author] = + AuthorInputSource(messages: inputMessages, cubit: inputMessagesCubit); + + singleFuture(this, onError: _onError, () async { + // Take entire list of input sources we have currently and process them + final inputSources = _inputSources; + _inputSources = {}; + + final inputFuts = >[]; + for (final kv in inputSources.entries) { + final author = kv.key; + final inputSource = kv.value; + inputFuts + .add(_enqueueAuthorInput(author: author, inputSource: inputSource)); + } + final inputQueues = await inputFuts.wait; + + // Make this safe to cast by removing inputs that were rejected or empty + inputQueues.removeNulls(); + + // Process all input queues together + await _outputCubit + .operate((reconciledArray) async => _reconcileInputQueues( + reconciledArray: reconciledArray, + inputQueues: inputQueues.cast(), + )); + }); + } + + //////////////////////////////////////////////////////////////////////////// + + // Set up a single author's message reconciliation + Future _enqueueAuthorInput( + {required TypedKey author, + required AuthorInputSource inputSource}) async { + // Get the position of our most recent reconciled message from this author + final lastReconciledMessage = + await _findNewestReconciledMessage(author: author); + + // Find oldest message we have not yet reconciled + final inputQueue = await _buildAuthorInputQueue( + author: author, + inputSource: inputSource, + lastOutputPosition: lastReconciledMessage); + return inputQueue; + } + + // Get the position of our most recent + // reconciled message from this author + // XXX: For a group chat, this should find when the author + // was added to the membership so we don't just go back in time forever + Future _findNewestReconciledMessage( + {required TypedKey author}) async => + _outputCubit.operate((arr) async { + var pos = arr.length - 1; + while (pos >= 0) { + final message = await arr.getProtobuf(proto.Message.fromBuffer, pos); + if (message == null) { + throw StateError('should have gotten last message'); + } + if (message.author.toVeilid() == author) { + return OutputPosition(message, pos); + } + pos--; + } + return null; + }); + + // Find oldest message we have not yet reconciled and build a queue forward + // from that position + Future _buildAuthorInputQueue( + {required TypedKey author, + required AuthorInputSource inputSource, + required OutputPosition? lastOutputPosition}) async { + // Make an author input queue + final authorInputQueue = AuthorInputQueue( + author: author, + inputSource: inputSource, + lastOutputPosition: lastOutputPosition, + onError: _onError); + + if (!await authorInputQueue.prepareInputQueue()) { + return null; + } + + return authorInputQueue; + } + + // Process a list of author input queues and insert their messages + // into the output array, performing validation steps along the way + Future _reconcileInputQueues({ + required TableDBArray reconciledArray, + required List inputQueues, + }) async { + // Ensure queues all have something to do + inputQueues.removeWhere((q) => q.isEmpty); + if (inputQueues.isEmpty) { + return; + } + + // Sort queues from earliest to latest and then by author + // to ensure a deterministic insert order + inputQueues.sort((a, b) { + final acmp = a.lastOutputPosition?.pos ?? -1; + final bcmp = b.lastOutputPosition?.pos ?? -1; + if (acmp == bcmp) { + return a.author.toString().compareTo(b.author.toString()); + } + return acmp.compareTo(bcmp); + }); + + // Start at the earliest position we know about in all the queues + final firstOutputPos = inputQueues.first.lastOutputPosition?.pos; + // Get the timestamp for this output position + var currentOutputMessage = firstOutputPos == null + ? null + : await reconciledArray.getProtobuf( + proto.Message.fromBuffer, firstOutputPos); + + var currentOutputPos = firstOutputPos ?? 0; + + final toInsert = + SortedList(proto.MessageExt.compareTimestamp); + + while (inputQueues.isNotEmpty) { + // Get up to '_maxReconcileChunk' of the items from the queues + // that we can insert at this location + + bool added; + do { + added = false; + var someQueueEmpty = false; + for (final inputQueue in inputQueues) { + final inputCurrent = inputQueue.current!; + if (currentOutputMessage == null || + inputCurrent.timestamp <= currentOutputMessage.timestamp) { + toInsert.add(inputCurrent); + added = true; + + // Advance this queue + if (!inputQueue.consume()) { + // Queue is empty now, run a queue purge + someQueueEmpty = true; + } + } + } + // Remove empty queues now that we're done iterating + if (someQueueEmpty) { + inputQueues.removeWhere((q) => q.isEmpty); + } + + if (toInsert.length >= _maxReconcileChunk) { + break; + } + } while (added); + + // Perform insertions in bulk + if (toInsert.isNotEmpty) { + await reconciledArray.insertAllProtobuf(currentOutputPos, toInsert); + toInsert.clear(); + } else { + // If there's nothing to insert at this position move to the next one + currentOutputPos++; + currentOutputMessage = await reconciledArray.getProtobuf( + proto.Message.fromBuffer, currentOutputPos); + } + } + } + + //////////////////////////////////////////////////////////////////////////// + + Map _inputSources = {}; + final TableDBArrayCubit _outputCubit; + final void Function(Object, StackTrace?) _onError; + + static const int _maxReconcileChunk = 65536; +} diff --git a/lib/chat/cubits/reconciliation/output_position.dart b/lib/chat/cubits/reconciliation/output_position.dart new file mode 100644 index 0000000..258259e --- /dev/null +++ b/lib/chat/cubits/reconciliation/output_position.dart @@ -0,0 +1,13 @@ +import 'package:equatable/equatable.dart'; +import 'package:meta/meta.dart'; + +import '../../../proto/proto.dart' as proto; + +@immutable +class OutputPosition extends Equatable { + const OutputPosition(this.message, this.pos); + final proto.Message message; + final int pos; + @override + List get props => [message, pos]; +} diff --git a/lib/chat/cubits/reconciliation/reconciliation.dart b/lib/chat/cubits/reconciliation/reconciliation.dart new file mode 100644 index 0000000..2dc0b93 --- /dev/null +++ b/lib/chat/cubits/reconciliation/reconciliation.dart @@ -0,0 +1 @@ +export 'message_reconciliation.dart'; diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index ff21d43..6c4fd8f 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -1,28 +1,16 @@ import 'dart:async'; -import 'dart:collection'; import 'dart:convert'; import 'dart:typed_data'; import 'package:async_tools/async_tools.dart'; -import 'package:equatable/equatable.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; -import 'package:fixnum/fixnum.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; -import 'package:meta/meta.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; import '../../proto/proto.dart' as proto; import '../models/models.dart'; - -@immutable -class MessagePosition extends Equatable { - const MessagePosition(this.message, this.pos); - final proto.Message message; - final int pos; - @override - List get props => [message, pos]; -} +import 'message_reconciliation.dart'; class RenderStateElement { RenderStateElement( @@ -165,6 +153,13 @@ class SingleContactMessagesCubit extends Cubit { _reconciledMessagesCubit = TableDBArrayCubit( open: () async => TableDBArray.make(table: tableName, crypto: crypto), decodeElement: proto.ReconciledMessage.fromBuffer); + + _reconciliation = MessageReconciliation( + output: _reconciledMessagesCubit!, + onError: (e, st) { + emit(AsyncValue.error(e, st)); + }); + _reconciledSubscription = _reconciledMessagesCubit!.stream.listen(_updateReconciledMessagesState); _updateReconciledMessagesState(_reconciledMessagesCubit!.state); @@ -203,7 +198,7 @@ class SingleContactMessagesCubit extends Cubit { return; } - _reconcileMessages( + _reconciliation.reconcileMessages( _activeAccountInfo.localAccount.identityMaster.identityPublicTypedKey(), sentMessages, _sentMessagesCubit!); @@ -216,7 +211,7 @@ class SingleContactMessagesCubit extends Cubit { return; } - _reconcileMessages( + _reconciliation.reconcileMessages( _remoteIdentityPublicKey, rcvdMessages, _rcvdMessagesCubit!); } @@ -246,6 +241,12 @@ class SingleContactMessagesCubit extends Cubit { message.signature = signature.toProto(); } + Future _generateInitialId( + {required PublicKey identityPublicKey}) async => + (await _localMessagesCryptoSystem + .generateHash(identityPublicKey.decode())) + .decode(); + Future _processMessageToSend( proto.Message message, proto.Message? previousMessage) async { // Get the previous message if we don't have one @@ -257,10 +258,9 @@ class SingleContactMessagesCubit extends Cubit { if (previousMessage == null) { // If there's no last sent message, // we start at a hash of the identity public key - message.id = (await _localMessagesCryptoSystem.generateHash( - _activeAccountInfo.localAccount.identityMaster.identityPublicKey - .decode())) - .decode(); + message.id = await _generateInitialId( + identityPublicKey: + _activeAccountInfo.localAccount.identityMaster.identityPublicKey); } else { // If there is a last message, we generate the hash // of the last message's signature and use it as our next id @@ -285,177 +285,6 @@ class SingleContactMessagesCubit extends Cubit { writer.tryAddAll(messages.map((m) => m.writeToBuffer()).toList())); } - void _reconcileMessages( - TypedKey author, - DHTLogStateData inputMessages, - DHTLogCubit inputMessagesCubit) { - singleFuture(_reconciledMessagesCubit!, () async { - // Get the position of our most recent - // reconciled message from this author - // XXX: For a group chat, this should find when the author - // was added to the membership so we don't just go back in time forever - final lastReconciledMessage = - await _reconciledMessagesCubit!.operate((arr) async { - var pos = arr.length - 1; - while (pos >= 0) { - final message = await arr.getProtobuf(proto.Message.fromBuffer, pos); - if (message == null) { - throw StateError('should have gotten last message'); - } - if (message.author.toVeilid() == author) { - return MessagePosition(message, pos); - } - pos--; - } - return null; - }); - - // Find oldest message we have not yet reconciled - final toReconcile = ListQueue(); - - // Go through batches of the input dhtlog starting with - // the current cubit state which is at the tail of the log - // Find the last reconciled message for this author - var currentInputPos = inputMessages.tail; - var currentInputElements = inputMessages.elements; - final inputBatchCount = inputMessages.count; - outer: - while (true) { - for (var rn = currentInputElements.length; - rn >= 0 && currentInputPos >= 0; - rn--, currentInputPos--) { - final elem = currentInputElements[rn]; - - // If we've found an input element that is older than our last - // reconciled message for this author, then we stop - if (lastReconciledMessage != null) { - if (elem.value.timestamp < - lastReconciledMessage.message.timestamp) { - break outer; - } - } - - // Drop the 'offline' elements because we don't reconcile - // anything until it has been confirmed to be committed to the DHT - if (elem.isOffline) { - continue; - } - - // Add to head of reconciliation queue - toReconcile.addFirst(elem.value); - if (toReconcile.length > _maxReconcileChunk) { - toReconcile.removeLast(); - } - } - if (currentInputPos < 0) { - break; - } - - // Get another input batch futher back - final nextInputBatch = await inputMessagesCubit.loadElements( - currentInputPos, inputBatchCount); - final asErr = nextInputBatch.asError; - if (asErr != null) { - emit(AsyncValue.error(asErr.error, asErr.stackTrace)); - return; - } - final asLoading = nextInputBatch.asLoading; - if (asLoading != null) { - // xxx: no need to block the cubit here for this - // xxx: might want to switch to a 'busy' state though - // xxx: to let the messages view show a spinner at the bottom - // xxx: while we reconcile... - // emit(const AsyncValue.loading()); - return; - } - currentInputElements = nextInputBatch.asData!.value; - } - - // Now iterate from our current input position in batches - // and reconcile the messages in the forward direction - var insertPosition = - (lastReconciledMessage != null) ? lastReconciledMessage.pos : 0; - var lastInsertTime = (lastReconciledMessage != null) - ? lastReconciledMessage.message.timestamp - : Int64.ZERO; - - // Insert this batch - xxx expand upon 'res' and iterate batches and update insert position/time - final res = await _reconciledMessagesCubit!.operate((arr) async => - _reconcileMessagesInner( - reconciledArray: arr, - toReconcile: toReconcile, - insertPosition: insertPosition, - lastInsertTime: lastInsertTime)); - - // Update the view - _renderState(); - }); - } - - Future _reconcileMessagesInner( - {required TableDBArray reconciledArray, - required Iterable toReconcile, - required int insertPosition, - required Int64 lastInsertTime}) async { - // // Ensure remoteMessages is sorted by timestamp - // final newMessages = messages - // .sort((a, b) => a.timestamp.compareTo(b.timestamp)) - // .removeDuplicates(); - - // // Existing messages will always be sorted by timestamp so merging is easy - // final existingMessages = await reconciledMessagesWriter - // .getItemRangeProtobuf(proto.Message.fromBuffer, 0); - // if (existingMessages == null) { - // throw Exception( - // 'Could not load existing reconciled messages at this time'); - // } - - // var ePos = 0; - // var nPos = 0; - // while (ePos < existingMessages.length && nPos < newMessages.length) { - // final existingMessage = existingMessages[ePos]; - // final newMessage = newMessages[nPos]; - - // // If timestamp to insert is less than - // // the current position, insert it here - // final newTs = Timestamp.fromInt64(newMessage.timestamp); - // final existingTs = Timestamp.fromInt64(existingMessage.timestamp); - // final cmp = newTs.compareTo(existingTs); - // if (cmp < 0) { - // // New message belongs here - - // // Insert into dht backing array - // await reconciledMessagesWriter.tryInsertItem( - // ePos, newMessage.writeToBuffer()); - // // Insert into local copy as well for this operation - // existingMessages.insert(ePos, newMessage); - - // // Next message - // nPos++; - // ePos++; - // } else if (cmp == 0) { - // // Duplicate, skip - // nPos++; - // ePos++; - // } else if (cmp > 0) { - // // New message belongs later - // ePos++; - // } - // } - // // If there are any new messages left, append them all - // while (nPos < newMessages.length) { - // final newMessage = newMessages[nPos]; - - // // Append to dht backing array - // await reconciledMessagesWriter.tryAddItem(newMessage.writeToBuffer()); - // // Insert into local copy as well for this operation - // existingMessages.add(newMessage); - - // nPos++; - // } - } - // Produce a state for this cubit from the input cubits and queues void _renderState() { // Get all reconciled messages @@ -584,12 +413,12 @@ class SingleContactMessagesCubit extends Cubit { DHTLogCubit? _rcvdMessagesCubit; TableDBArrayCubit? _reconciledMessagesCubit; + late final MessageReconciliation _reconciliation; + late final PersistentQueue _sendingMessagesQueue; StreamSubscription>? _sentSubscription; StreamSubscription>? _rcvdSubscription; StreamSubscription>? _reconciledSubscription; - - static const int _maxReconcileChunk = 65536; } diff --git a/lib/proto/extensions.dart b/lib/proto/extensions.dart index e9fd9a2..1da55f9 100644 --- a/lib/proto/extensions.dart +++ b/lib/proto/extensions.dart @@ -23,4 +23,7 @@ extension MessageExt on proto.Message { } String get uniqueIdString => base64UrlNoPadEncode(uniqueIdBytes); + + static int compareTimestamp(proto.Message a, proto.Message b) => + a.timestamp.compareTo(b.timestamp); } diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart index acdc6fe..985b11f 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart @@ -209,8 +209,7 @@ class DHTLog implements DHTDeleteable { OwnedDHTRecordPointer get recordPointer => _spine.recordPointer; /// Runs a closure allowing read-only access to the log - Future operate( - Future Function(DHTLogReadOperations) closure) async { + Future operate(Future Function(DHTLogReadOperations) closure) async { if (!isOpen) { throw StateError('log is not open"'); } diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart index 010c76e..2f97b3f 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart @@ -108,6 +108,7 @@ class DHTLogCubit extends Cubit> elements: elements, tail: _tail, count: _count, follow: _follow))); } + // Tail is one past the last element to load Future>>> loadElements( int tail, int count, {bool forceRefresh = false}) async { @@ -184,8 +185,7 @@ class DHTLogCubit extends Cubit> await super.close(); } - Future operate( - Future Function(DHTLogReadOperations) closure) async { + Future operate(Future Function(DHTLogReadOperations) closure) async { await _initWait(); return _log.operate(closure); } diff --git a/packages/veilid_support/lib/src/table_db_array.dart b/packages/veilid_support/lib/src/table_db_array.dart index 4d5b9dd..8bcd146 100644 --- a/packages/veilid_support/lib/src/table_db_array.dart +++ b/packages/veilid_support/lib/src/table_db_array.dart @@ -662,4 +662,46 @@ extension TableDBArrayExt on TableDBArray { T Function(List) fromBuffer, int start, [int? end]) => getRange(start, end ?? _length) .then((out) => out.map(fromBuffer).toList()); + + /// Convenience function: + /// Like add but for a JSON value + Future addJson(T value) async => add(jsonEncodeBytes(value)); + + /// Convenience function: + /// Like add but for a Protobuf value + Future addProtobuf(T value) => + add(value.writeToBuffer()); + + /// Convenience function: + /// Like addAll but for a JSON value + Future addAllJson(List values) async => + addAll(values.map(jsonEncodeBytes).toList()); + + /// Convenience function: + /// Like addAll but for a Protobuf value + Future addAllProtobuf( + List values) async => + addAll(values.map((x) => x.writeToBuffer()).toList()); + + /// Convenience function: + /// Like insert but for a JSON value + Future insertJson(int pos, T value) async => + insert(pos, jsonEncodeBytes(value)); + + /// Convenience function: + /// Like insert but for a Protobuf value + Future insertProtobuf( + int pos, T value) async => + insert(pos, value.writeToBuffer()); + + /// Convenience function: + /// Like insertAll but for a JSON value + Future insertAllJson(int pos, List values) async => + insertAll(pos, values.map(jsonEncodeBytes).toList()); + + /// Convenience function: + /// Like insertAll but for a Protobuf value + Future insertAllProtobuf( + int pos, List values) async => + insertAll(pos, values.map((x) => x.writeToBuffer()).toList()); } diff --git a/pubspec.lock b/pubspec.lock index c6e754b..8a70f22 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -1219,6 +1219,15 @@ packages: url: "https://pub.dev" source: hosted version: "2.0.0" + sorted_list: + dependency: "direct main" + description: + path: "." + ref: main + resolved-ref: "090eb9be48ab85ff064a0a1d8175b4a72d79b139" + url: "https://gitlab.com/veilid/dart-sorted-list-improved.git" + source: git + version: "1.0.0" source_gen: dependency: transitive description: diff --git a/pubspec.yaml b/pubspec.yaml index 1cc893f..133d482 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -69,6 +69,10 @@ dependencies: share_plus: ^9.0.0 shared_preferences: ^2.2.3 signal_strength_indicator: ^0.4.1 + sorted_list: + git: + url: https://gitlab.com/veilid/dart-sorted-list-improved.git + ref: main split_view: ^3.2.1 stack_trace: ^1.11.1 stream_transform: ^2.1.0