From fd63a0d5e01dc48bffec66fb67670afd84a40aee Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 31 May 2024 18:27:50 -0400 Subject: [PATCH] message integrity --- .../reconciliation/author_input_queue.dart | 268 ++++++++++-------- .../reconciliation/author_input_source.dart | 76 ++++- .../reconciliation/message_integrity.dart | 74 +++++ .../message_reconciliation.dart | 51 ++-- .../cubits/reconciliation/reconciliation.dart | 1 + .../cubits/single_contact_messages_cubit.dart | 71 ++--- lib/chat/views/chat_component.dart | 2 +- lib/proto/extensions.dart | 6 +- .../src/dht_log/dht_log_cubit.dart | 45 ++- .../lib/src/online_element_state.dart | 12 + .../veilid_support/lib/veilid_support.dart | 1 + 11 files changed, 370 insertions(+), 237 deletions(-) create mode 100644 lib/chat/cubits/reconciliation/message_integrity.dart create mode 100644 packages/veilid_support/lib/src/online_element_state.dart diff --git a/lib/chat/cubits/reconciliation/author_input_queue.dart b/lib/chat/cubits/reconciliation/author_input_queue.dart index b441e75..b9fd7d7 100644 --- a/lib/chat/cubits/reconciliation/author_input_queue.dart +++ b/lib/chat/cubits/reconciliation/author_input_queue.dart @@ -1,145 +1,191 @@ import 'dart:async'; -import 'dart:collection'; -import 'dart:math'; - -import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../../proto/proto.dart' as proto; import 'author_input_source.dart'; +import 'message_integrity.dart'; import 'output_position.dart'; class AuthorInputQueue { - AuthorInputQueue({ - required this.author, - required this.inputSource, - required this.lastOutputPosition, - required this.onError, - }): - assert(inputSource.messages.count>0, 'no input source window length'), - assert(inputSource.messages.elements.isNotEmpty, 'no input source elements'), - assert(inputSource.messages.tail >= inputSource.messages.elements.length, 'tail is before initial messages end'), - assert(inputSource.messages.tail > 0, 'tail is not greater than zero'), - currentPosition = inputSource.messages.tail, - currentWindow = inputSource.messages.elements, - windowLength = inputSource.messages.count, - windowFirst = inputSource.messages.tail - inputSource.messages.elements.length, - windowLast = inputSource.messages.tail - 1; + AuthorInputQueue._({ + required TypedKey author, + required AuthorInputSource inputSource, + required OutputPosition? outputPosition, + required void Function(Object, StackTrace?) onError, + required MessageIntegrity messageIntegrity, + }) : _author = author, + _onError = onError, + _inputSource = inputSource, + _outputPosition = outputPosition, + _lastMessage = outputPosition?.message, + _messageIntegrity = messageIntegrity, + _currentPosition = inputSource.currentWindow.last; - //////////////////////////////////////////////////////////////////////////// - - bool get isEmpty => toReconcile.isEmpty; - - proto.Message? get current => toReconcile.firstOrNull; - - bool consume() { - toReconcile.removeFirst(); - return toReconcile.isNotEmpty; + static Future create({ + required TypedKey author, + required AuthorInputSource inputSource, + required OutputPosition? outputPosition, + required void Function(Object, StackTrace?) onError, + }) async { + final queue = AuthorInputQueue._( + author: author, + inputSource: inputSource, + outputPosition: outputPosition, + onError: onError, + messageIntegrity: await MessageIntegrity.create(author: author)); + if (!await queue._findStartOfWork()) { + return null; + } + return queue; } - Future prepareInputQueue() async { - // Go through batches of the input dhtlog starting with - // the current cubit state which is at the tail of the log - // Find the last reconciled message for this author + //////////////////////////////////////////////////////////////////////////// + // Public interface - outer: + // Check if there are no messages in this queue to reconcile + bool get isEmpty => _currentMessage == null; + + // Get the current message that needs reconciliation + proto.Message? get current => _currentMessage; + + // Get the earliest output position to start inserting + OutputPosition? get outputPosition => _outputPosition; + + // Get the author of this queue + TypedKey get author => _author; + + // Remove a reconciled message and move to the next message + // Returns true if there is more work to do + Future consume() async { while (true) { - for (var rn = currentWindow.length; - rn >= 0 && currentPosition >= 0; - rn--, currentPosition--) { - final elem = currentWindow[rn]; + _lastMessage = _currentMessage; - // If we've found an input element that is older than our last - // reconciled message for this author, then we stop - if (lastOutputPosition != null) { - if (elem.value.timestamp < lastOutputPosition!.message.timestamp) { - break outer; - } - } + _currentPosition++; - // Drop the 'offline' elements because we don't reconcile - // anything until it has been confirmed to be committed to the DHT - if (elem.isOffline) { + // Get more window if we need to + if (!await _updateWindow()) { + // Window is not available so this queue can't work right now + return false; + } + final nextMessage = _inputSource.currentWindow + .elements[_currentPosition - _inputSource.currentWindow.first]; + + // Drop the 'offline' elements because we don't reconcile + // anything until it has been confirmed to be committed to the DHT + if (nextMessage.isOffline) { + continue; + } + + if (_lastMessage != null) { + // Ensure the timestamp is not moving backward + if (nextMessage.value.timestamp < _lastMessage!.timestamp) { continue; } - - // Add to head of reconciliation queue - toReconcile.addFirst(elem.value); - if (toReconcile.length > _maxQueueChunk) { - toReconcile.removeLast(); - } - } - if (currentPosition < 0) { - break; } - xxx update window here and make this and other methods work + // Verify the id chain for the message + final matchId = await _messageIntegrity.generateMessageId(_lastMessage); + if (matchId.compare(nextMessage.value.idBytes) != 0) { + continue; + } + + // Verify the signature for the message + if (!await _messageIntegrity.verifyMessage(nextMessage.value)) { + continue; + } + + _currentMessage = nextMessage.value; + break; } return true; } + //////////////////////////////////////////////////////////////////////////// + // Internal implementation + + // Walk backward from the tail of the input queue to find the first + // message newer than our last reconcicled message from this author + // Returns false if no work is needed + Future _findStartOfWork() async { + // Iterate windows over the inputSource + outer: + while (true) { + // Iterate through current window backward + for (var i = _inputSource.currentWindow.elements.length; + i >= 0 && _currentPosition >= 0; + i--, _currentPosition--) { + final elem = _inputSource.currentWindow.elements[i]; + + // If we've found an input element that is older than our last + // reconciled message for this author, then we stop + if (_lastMessage != null) { + if (elem.value.timestamp < _lastMessage!.timestamp) { + break outer; + } + } + } + // If we're at the beginning of the inputSource then we stop + if (_currentPosition < 0) { + break; + } + + // Get more window if we need to + if (!await _updateWindow()) { + // Window is not available or things are empty so this + // queue can't work right now + return false; + } + } + + // The current position should be equal to the first message to process + // and the current window to process should not be empty + return _inputSource.currentWindow.elements.isNotEmpty; + } + // Slide the window toward the current position and load the batch around it - Future updateWindow() async { - - // Check if we are still in the window - if (currentPosition>=windowFirst && currentPosition <= windowLast) { - return true; - } - - // Get the length of the cubit - final inputLength = await inputSource.cubit.operate((r) async => r.length); - - // If not, slide the window - if (currentPosition _updateWindow() async { + // Check if we are still in the window + if (_currentPosition >= _inputSource.currentWindow.first && + _currentPosition <= _inputSource.currentWindow.last) { return true; + } + + // Get another input batch futher back + final avOk = + await _inputSource.updateWindow(_currentPosition, _maxWindowLength); + + final asErr = avOk.asError; + if (asErr != null) { + _onError(asErr.error, asErr.stackTrace); + return false; + } + final asLoading = avOk.asLoading; + if (asLoading != null) { + // xxx: no need to block the cubit here for this + // xxx: might want to switch to a 'busy' state though + // xxx: to let the messages view show a spinner at the bottom + // xxx: while we reconcile... + // emit(const AsyncValue.loading()); + return false; + } + return avOk.asData!.value; } //////////////////////////////////////////////////////////////////////////// - final TypedKey author; - final ListQueue toReconcile = ListQueue(); - final AuthorInputSource inputSource; - final OutputPosition? lastOutputPosition; - final void Function(Object, StackTrace?) onError; + final TypedKey _author; + final AuthorInputSource _inputSource; + final OutputPosition? _outputPosition; + final void Function(Object, StackTrace?) _onError; + final MessageIntegrity _messageIntegrity; + // The last message we've consumed + proto.Message? _lastMessage; // The current position in the input log that we are looking at - int currentPosition; - // The current input window elements - IList> currentWindow; - // The first position of the sliding input window - int windowFirst; - // The last position of the sliding input window - int windowLast; + int _currentPosition; + // The current message we're looking at + proto.Message? _currentMessage; // Desired maximum window length - int windowLength; - - static const int _maxQueueChunk = 256; + static const int _maxWindowLength = 256; } diff --git a/lib/chat/cubits/reconciliation/author_input_source.dart b/lib/chat/cubits/reconciliation/author_input_source.dart index 75f020e..1f67264 100644 --- a/lib/chat/cubits/reconciliation/author_input_source.dart +++ b/lib/chat/cubits/reconciliation/author_input_source.dart @@ -1,10 +1,76 @@ +import 'dart:math'; + +import 'package:async_tools/async_tools.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:meta/meta.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../../proto/proto.dart' as proto; -class AuthorInputSource { - AuthorInputSource({required this.messages, required this.cubit}); - - final DHTLogStateData messages; - final DHTLogCubit cubit; +@immutable +class InputWindow { + const InputWindow( + {required this.elements, required this.first, required this.last}); + final IList> elements; + final int first; + final int last; +} + +class AuthorInputSource { + AuthorInputSource.fromCubit( + {required DHTLogStateData cubitState, + required this.cubit}) { + _currentWindow = InputWindow( + elements: cubitState.elements, + first: cubitState.tail - cubitState.elements.length, + last: cubitState.tail - 1); + } + + //////////////////////////////////////////////////////////////////////////// + + InputWindow get currentWindow => _currentWindow; + + Future> updateWindow( + int currentPosition, int windowLength) async => + cubit.operate((reader) async { + // See if we're beyond the input source + if (currentPosition < 0 || currentPosition >= reader.length) { + return const AsyncValue.data(false); + } + + // Slide the window if we need to + var first = _currentWindow.first; + var last = _currentWindow.last; + if (currentPosition < first) { + // Slide it backward, current position is now last + first = max((currentPosition - windowLength) + 1, 0); + last = currentPosition; + } else if (currentPosition > last) { + // Slide it forward, current position is now first + first = currentPosition; + last = min((currentPosition + windowLength) - 1, reader.length - 1); + } else { + return const AsyncValue.data(true); + } + + // Get another input batch futher back + final nextWindow = await cubit.loadElementsFromReader( + reader, last + 1, (last + 1) - first); + final asErr = nextWindow.asError; + if (asErr != null) { + return AsyncValue.error(asErr.error, asErr.stackTrace); + } + final asLoading = nextWindow.asLoading; + if (asLoading != null) { + return const AsyncValue.loading(); + } + _currentWindow = InputWindow( + elements: nextWindow.asData!.value, first: first, last: last); + return const AsyncValue.data(true); + }); + + //////////////////////////////////////////////////////////////////////////// + final DHTLogCubit cubit; + + late InputWindow _currentWindow; } diff --git a/lib/chat/cubits/reconciliation/message_integrity.dart b/lib/chat/cubits/reconciliation/message_integrity.dart new file mode 100644 index 0000000..2fd1956 --- /dev/null +++ b/lib/chat/cubits/reconciliation/message_integrity.dart @@ -0,0 +1,74 @@ +import 'dart:convert'; +import 'dart:typed_data'; + +import 'package:protobuf/protobuf.dart'; +import 'package:veilid_support/veilid_support.dart'; +import '../../../proto/proto.dart' as proto; + +class MessageIntegrity { + MessageIntegrity._({ + required TypedKey author, + required VeilidCryptoSystem crypto, + }) : _author = author, + _crypto = crypto; + static Future create({required TypedKey author}) async { + final crypto = await Veilid.instance.getCryptoSystem(author.kind); + return MessageIntegrity._(author: author, crypto: crypto); + } + + //////////////////////////////////////////////////////////////////////////// + // Public interface + + Future generateMessageId(proto.Message? previous) async { + if (previous == null) { + // If there's no last sent message, + // we start at a hash of the identity public key + return _generateInitialId(); + } else { + // If there is a last message, we generate the hash + // of the last message's signature and use it as our next id + return _hashSignature(previous.signature); + } + } + + Future signMessage( + proto.Message message, + SecretKey authorSecret, + ) async { + // Ensure this message is not already signed + assert(!message.hasSignature(), 'should not sign message twice'); + // Generate data to sign + final data = Uint8List.fromList(utf8.encode(message.writeToJson())); + + // Sign with our identity + final signature = await _crypto.sign(_author.value, authorSecret, data); + + // Add to the message + message.signature = signature.toProto(); + } + + Future verifyMessage(proto.Message message) async { + // Ensure the message is signed + assert(message.hasSignature(), 'should not verify unsigned message'); + final signature = message.signature.toVeilid(); + + // Generate data to sign + final messageNoSig = message.deepCopy()..clearSignature(); + final data = Uint8List.fromList(utf8.encode(messageNoSig.writeToJson())); + + // Verify signature + return _crypto.verify(_author.value, data, signature); + } + + //////////////////////////////////////////////////////////////////////////// + // Private implementation + + Future _generateInitialId() async => + (await _crypto.generateHash(_author.decode())).decode(); + + Future _hashSignature(proto.Signature signature) async => + (await _crypto.generateHash(signature.toVeilid().decode())).decode(); + //////////////////////////////////////////////////////////////////////////// + final TypedKey _author; + final VeilidCryptoSystem _crypto; +} diff --git a/lib/chat/cubits/reconciliation/message_reconciliation.dart b/lib/chat/cubits/reconciliation/message_reconciliation.dart index 51cfe2b..1687f4d 100644 --- a/lib/chat/cubits/reconciliation/message_reconciliation.dart +++ b/lib/chat/cubits/reconciliation/message_reconciliation.dart @@ -21,14 +21,14 @@ class MessageReconciliation { void reconcileMessages( TypedKey author, - DHTLogStateData inputMessages, + DHTLogStateData inputMessagesCubitState, DHTLogCubit inputMessagesCubit) { - if (inputMessages.elements.isEmpty) { + if (inputMessagesCubitState.elements.isEmpty) { return; } - _inputSources[author] = - AuthorInputSource(messages: inputMessages, cubit: inputMessagesCubit); + _inputSources[author] = AuthorInputSource.fromCubit( + cubitState: inputMessagesCubitState, cubit: inputMessagesCubit); singleFuture(this, onError: _onError, () async { // Take entire list of input sources we have currently and process them @@ -63,14 +63,15 @@ class MessageReconciliation { {required TypedKey author, required AuthorInputSource inputSource}) async { // Get the position of our most recent reconciled message from this author - final lastReconciledMessage = - await _findNewestReconciledMessage(author: author); + final outputPosition = await _findLastOutputPosition(author: author); // Find oldest message we have not yet reconciled - final inputQueue = await _buildAuthorInputQueue( - author: author, - inputSource: inputSource, - lastOutputPosition: lastReconciledMessage); + final inputQueue = await AuthorInputQueue.create( + author: author, + inputSource: inputSource, + outputPosition: outputPosition, + onError: _onError, + ); return inputQueue; } @@ -78,7 +79,7 @@ class MessageReconciliation { // reconciled message from this author // XXX: For a group chat, this should find when the author // was added to the membership so we don't just go back in time forever - Future _findNewestReconciledMessage( + Future _findLastOutputPosition( {required TypedKey author}) async => _outputCubit.operate((arr) async { var pos = arr.length - 1; @@ -95,26 +96,6 @@ class MessageReconciliation { return null; }); - // Find oldest message we have not yet reconciled and build a queue forward - // from that position - Future _buildAuthorInputQueue( - {required TypedKey author, - required AuthorInputSource inputSource, - required OutputPosition? lastOutputPosition}) async { - // Make an author input queue - final authorInputQueue = AuthorInputQueue( - author: author, - inputSource: inputSource, - lastOutputPosition: lastOutputPosition, - onError: _onError); - - if (!await authorInputQueue.prepareInputQueue()) { - return null; - } - - return authorInputQueue; - } - // Process a list of author input queues and insert their messages // into the output array, performing validation steps along the way Future _reconcileInputQueues({ @@ -130,8 +111,8 @@ class MessageReconciliation { // Sort queues from earliest to latest and then by author // to ensure a deterministic insert order inputQueues.sort((a, b) { - final acmp = a.lastOutputPosition?.pos ?? -1; - final bcmp = b.lastOutputPosition?.pos ?? -1; + final acmp = a.outputPosition?.pos ?? -1; + final bcmp = b.outputPosition?.pos ?? -1; if (acmp == bcmp) { return a.author.toString().compareTo(b.author.toString()); } @@ -139,7 +120,7 @@ class MessageReconciliation { }); // Start at the earliest position we know about in all the queues - final firstOutputPos = inputQueues.first.lastOutputPosition?.pos; + final firstOutputPos = inputQueues.first.outputPosition?.pos; // Get the timestamp for this output position var currentOutputMessage = firstOutputPos == null ? null @@ -167,7 +148,7 @@ class MessageReconciliation { added = true; // Advance this queue - if (!inputQueue.consume()) { + if (!await inputQueue.consume()) { // Queue is empty now, run a queue purge someQueueEmpty = true; } diff --git a/lib/chat/cubits/reconciliation/reconciliation.dart b/lib/chat/cubits/reconciliation/reconciliation.dart index 2dc0b93..a8187cf 100644 --- a/lib/chat/cubits/reconciliation/reconciliation.dart +++ b/lib/chat/cubits/reconciliation/reconciliation.dart @@ -1 +1,2 @@ +export 'message_integrity.dart'; export 'message_reconciliation.dart'; diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index 6c4fd8f..c444134 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -1,6 +1,4 @@ import 'dart:async'; -import 'dart:convert'; -import 'dart:typed_data'; import 'package:async_tools/async_tools.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; @@ -10,7 +8,7 @@ import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; import '../../proto/proto.dart' as proto; import '../models/models.dart'; -import 'message_reconciliation.dart'; +import 'reconciliation/reconciliation.dart'; class RenderStateElement { RenderStateElement( @@ -102,12 +100,11 @@ class SingleContactMessagesCubit extends Cubit { // Make crypto Future _initCrypto() async { - _messagesCrypto = await _activeAccountInfo + _conversationCrypto = await _activeAccountInfo .makeConversationCrypto(_remoteIdentityPublicKey); - _localMessagesCryptoSystem = - await Veilid.instance.getCryptoSystem(_localMessagesRecordKey.kind); - _identityCryptoSystem = - await _activeAccountInfo.localAccount.identityMaster.identityCrypto; + _senderMessageIntegrity = await MessageIntegrity.create( + author: _activeAccountInfo.localAccount.identityMaster + .identityPublicTypedKey()); } // Open local messages key @@ -119,7 +116,7 @@ class SingleContactMessagesCubit extends Cubit { debugName: 'SingleContactMessagesCubit::_initSentMessagesCubit::' 'SentMessages', parent: _localConversationRecordKey, - crypto: _messagesCrypto), + crypto: _conversationCrypto), decodeElement: proto.Message.fromBuffer); _sentSubscription = _sentMessagesCubit!.stream.listen(_updateSentMessagesState); @@ -133,7 +130,7 @@ class SingleContactMessagesCubit extends Cubit { debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::' 'RcvdMessages', parent: _remoteConversationRecordKey, - crypto: _messagesCrypto), + crypto: _conversationCrypto), decodeElement: proto.Message.fromBuffer); _rcvdSubscription = _rcvdMessagesCubit!.stream.listen(_updateRcvdMessagesState); @@ -222,31 +219,6 @@ class SingleContactMessagesCubit extends Cubit { _renderState(); } - Future _hashSignature(proto.Signature signature) async => - (await _localMessagesCryptoSystem - .generateHash(signature.toVeilid().decode())) - .decode(); - - Future _signMessage(proto.Message message) async { - // Generate data to sign - final data = Uint8List.fromList(utf8.encode(message.writeToJson())); - - // Sign with our identity - final signature = await _identityCryptoSystem.sign( - _activeAccountInfo.localAccount.identityMaster.identityPublicKey, - _activeAccountInfo.userLogin.identitySecret.value, - data); - - // Add to the message - message.signature = signature.toProto(); - } - - Future _generateInitialId( - {required PublicKey identityPublicKey}) async => - (await _localMessagesCryptoSystem - .generateHash(identityPublicKey.decode())) - .decode(); - Future _processMessageToSend( proto.Message message, proto.Message? previousMessage) async { // Get the previous message if we don't have one @@ -255,20 +227,12 @@ class SingleContactMessagesCubit extends Cubit { ? null : await r.getProtobuf(proto.Message.fromBuffer, r.length - 1)); - if (previousMessage == null) { - // If there's no last sent message, - // we start at a hash of the identity public key - message.id = await _generateInitialId( - identityPublicKey: - _activeAccountInfo.localAccount.identityMaster.identityPublicKey); - } else { - // If there is a last message, we generate the hash - // of the last message's signature and use it as our next id - message.id = await _hashSignature(previousMessage.signature); - } + message.id = + await _senderMessageIntegrity.generateMessageId(previousMessage); // Now sign it - await _signMessage(message); + await _senderMessageIntegrity.signMessage( + message, _activeAccountInfo.userLogin.identitySecret.value); } // Async process to send messages in the background @@ -303,17 +267,17 @@ class SingleContactMessagesCubit extends Cubit { // Generate state for each message final sentMessagesMap = - IMap>.fromValues( - keyMapper: (x) => x.value.uniqueIdString, + IMap>.fromValues( + keyMapper: (x) => x.value.authorUniqueIdString, values: sentMessages.elements, ); final reconciledMessagesMap = IMap.fromValues( - keyMapper: (x) => x.content.uniqueIdString, + keyMapper: (x) => x.content.authorUniqueIdString, values: reconciledMessages.elements, ); final sendingMessagesMap = IMap.fromValues( - keyMapper: (x) => x.uniqueIdString, + keyMapper: (x) => x.authorUniqueIdString, values: sendingMessages, ); @@ -405,9 +369,8 @@ class SingleContactMessagesCubit extends Cubit { final TypedKey _remoteConversationRecordKey; final TypedKey _remoteMessagesRecordKey; - late final VeilidCrypto _messagesCrypto; - late final VeilidCryptoSystem _localMessagesCryptoSystem; - late final VeilidCryptoSystem _identityCryptoSystem; + late final VeilidCrypto _conversationCrypto; + late final MessageIntegrity _senderMessageIntegrity; DHTLogCubit? _sentMessagesCubit; DHTLogCubit? _rcvdMessagesCubit; diff --git a/lib/chat/views/chat_component.dart b/lib/chat/views/chat_component.dart index 7f549cb..1e296e9 100644 --- a/lib/chat/views/chat_component.dart +++ b/lib/chat/views/chat_component.dart @@ -127,7 +127,7 @@ class ChatComponent extends StatelessWidget { author: isLocal ? _localUser : _remoteUser, createdAt: (message.sentTimestamp.value ~/ BigInt.from(1000)).toInt(), - id: message.content.uniqueIdString, + id: message.content.authorUniqueIdString, text: contextText.text, showStatus: status != null, status: status); diff --git a/lib/proto/extensions.dart b/lib/proto/extensions.dart index 1da55f9..25b8558 100644 --- a/lib/proto/extensions.dart +++ b/lib/proto/extensions.dart @@ -16,13 +16,15 @@ Map reconciledMessageToJson(proto.ReconciledMessage m) => m.writeToJsonMap(); extension MessageExt on proto.Message { - Uint8List get uniqueIdBytes { + Uint8List get idBytes => Uint8List.fromList(id); + + Uint8List get authorUniqueIdBytes { final author = this.author.toVeilid().decode(); final id = this.id; return Uint8List.fromList([...author, ...id]); } - String get uniqueIdString => base64UrlNoPadEncode(uniqueIdBytes); + String get authorUniqueIdString => base64UrlNoPadEncode(authorUniqueIdBytes); static int compareTimestamp(proto.Message a, proto.Message b) => a.timestamp.compareTo(b.timestamp); diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart index 2f97b3f..f70a34c 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart @@ -9,16 +9,6 @@ import 'package:meta/meta.dart'; import '../../../veilid_support.dart'; -@immutable -class DHTLogElementState extends Equatable { - const DHTLogElementState({required this.value, required this.isOffline}); - final T value; - final bool isOffline; - - @override - List get props => [value, isOffline]; -} - @immutable class DHTLogStateData extends Equatable { const DHTLogStateData( @@ -28,7 +18,7 @@ class DHTLogStateData extends Equatable { required this.follow}); // The view of the elements in the dhtlog // Span is from [tail-length, tail) - final IList> elements; + final IList> elements; // One past the end of the last element final int tail; // The total number of elements to try to keep in 'elements' @@ -92,7 +82,8 @@ class DHTLogCubit extends Cubit> Future _refreshInner(void Function(AsyncValue>) emit, {bool forceRefresh = false}) async { - final avElements = await loadElements(_tail, _count); + final avElements = await operate( + (reader) => loadElementsFromReader(reader, _tail, _count)); final err = avElements.asError; if (err != null) { emit(AsyncValue.error(err.error, err.stackTrace)); @@ -109,26 +100,22 @@ class DHTLogCubit extends Cubit> } // Tail is one past the last element to load - Future>>> loadElements( - int tail, int count, + Future>>> loadElementsFromReader( + DHTLogReadOperations reader, int tail, int count, {bool forceRefresh = false}) async { - await _initWait(); try { - final allItems = await _log.operate((reader) async { - final length = reader.length; - final end = ((tail - 1) % length) + 1; - final start = (count < end) ? end - count : 0; + final length = reader.length; + final end = ((tail - 1) % length) + 1; + final start = (count < end) ? end - count : 0; - final offlinePositions = await reader.getOfflinePositions(); - final allItems = (await reader.getRange(start, - length: end - start, forceRefresh: forceRefresh)) - ?.indexed - .map((x) => DHTLogElementState( - value: _decodeElement(x.$2), - isOffline: offlinePositions.contains(x.$1))) - .toIList(); - return allItems; - }); + final offlinePositions = await reader.getOfflinePositions(); + final allItems = (await reader.getRange(start, + length: end - start, forceRefresh: forceRefresh)) + ?.indexed + .map((x) => OnlineElementState( + value: _decodeElement(x.$2), + isOffline: offlinePositions.contains(x.$1))) + .toIList(); if (allItems == null) { return const AsyncValue.loading(); } diff --git a/packages/veilid_support/lib/src/online_element_state.dart b/packages/veilid_support/lib/src/online_element_state.dart new file mode 100644 index 0000000..8cbd38b --- /dev/null +++ b/packages/veilid_support/lib/src/online_element_state.dart @@ -0,0 +1,12 @@ +import 'package:equatable/equatable.dart'; +import 'package:meta/meta.dart'; + +@immutable +class OnlineElementState extends Equatable { + const OnlineElementState({required this.value, required this.isOffline}); + final T value; + final bool isOffline; + + @override + List get props => [value, isOffline]; +} diff --git a/packages/veilid_support/lib/veilid_support.dart b/packages/veilid_support/lib/veilid_support.dart index 42aa839..0f19bf3 100644 --- a/packages/veilid_support/lib/veilid_support.dart +++ b/packages/veilid_support/lib/veilid_support.dart @@ -10,6 +10,7 @@ export 'src/config.dart'; export 'src/identity.dart'; export 'src/json_tools.dart'; export 'src/memory_tools.dart'; +export 'src/online_element_state.dart'; export 'src/output.dart'; export 'src/persistent_queue.dart'; export 'src/protobuf_tools.dart';