diff --git a/lib/chat/cubits/messages_cubit.dart b/lib/chat/cubits/messages_cubit.dart index 1e04641..ee09893 100644 --- a/lib/chat/cubits/messages_cubit.dart +++ b/lib/chat/cubits/messages_cubit.dart @@ -10,9 +10,7 @@ import '../../account_manager/account_manager.dart'; import '../../proto/proto.dart' as proto; class _MessageQueueEntry { - _MessageQueueEntry( - {required this.localMessages, required this.remoteMessages}); - IList localMessages; + _MessageQueueEntry({required this.remoteMessages}); IList remoteMessages; } @@ -60,74 +58,10 @@ class MessagesCubit extends Cubit { await super.close(); } - void updateLocalMessagesState( - BlocBusyState>> avmessages) { - // Updated local messages from online just update the state immediately - emit(avmessages.state); - } - - Future _updateRemoteMessagesStateAsync(_MessageQueueEntry entry) async { - // Updated remote messages need to be merged with the local messages state - - // Ensure remoteMessages is sorted by timestamp - final remoteMessages = - entry.remoteMessages.sort((a, b) => a.timestamp.compareTo(b.timestamp)); - - // Existing messages will always be sorted by timestamp so merging is easy - var localMessages = entry.localMessages; - var pos = 0; - for (final newMessage in remoteMessages) { - var skip = false; - while (pos < localMessages.length) { - final m = localMessages[pos]; - - // If timestamp to insert is less than - // the current position, insert it here - final newTs = Timestamp.fromInt64(newMessage.timestamp); - final curTs = Timestamp.fromInt64(m.timestamp); - final cmp = newTs.compareTo(curTs); - if (cmp < 0) { - break; - } else if (cmp == 0) { - skip = true; - break; - } - pos++; - } - // Insert at this position - if (!skip) { - // Insert into dht backing array - await _localMessagesCubit!.operate((shortArray) => - shortArray.tryInsertItem(pos, newMessage.writeToBuffer())); - // Insert into local copy as well for this operation - localMessages = localMessages.insert(pos, newMessage); - } - } - } - - void updateRemoteMessagesState( - BlocBusyState>> avmessages) { - final remoteMessages = avmessages.state.data?.value; - if (remoteMessages == null) { - return; - } - - final localMessages = state.data?.value; - if (localMessages == null) { - // No local messages means remote messages - // are all we have so merging is easy - emit(AsyncValue.data(remoteMessages)); - return; - } - - _remoteMessagesQueue.add(_MessageQueueEntry( - localMessages: localMessages, remoteMessages: remoteMessages)); - } - // Open local messages key Future _initLocalMessages(TypedKey localConversationRecordKey, TypedKey localMessagesRecordKey) async { - final crypto = await getMessagesCrypto(); + final crypto = await _getMessagesCrypto(); final writer = _activeAccountInfo.conversationWriter; _localMessagesCubit = DHTShortArrayCubit( @@ -136,21 +70,87 @@ class MessagesCubit extends Cubit { parent: localConversationRecordKey, crypto: crypto), decodeElement: proto.Message.fromBuffer); _localSubscription = - _localMessagesCubit!.stream.listen(updateLocalMessagesState); + _localMessagesCubit!.stream.listen(_updateLocalMessagesState); + _updateLocalMessagesState(_localMessagesCubit!.state); } // Open remote messages key Future _initRemoteMessages(TypedKey remoteConversationRecordKey, TypedKey remoteMessagesRecordKey) async { // Open remote record key if it is specified - final crypto = await getMessagesCrypto(); + final crypto = await _getMessagesCrypto(); _remoteMessagesCubit = DHTShortArrayCubit( open: () async => DHTShortArray.openRead(remoteMessagesRecordKey, parent: remoteConversationRecordKey, crypto: crypto), decodeElement: proto.Message.fromBuffer); _remoteSubscription = - _remoteMessagesCubit!.stream.listen(updateRemoteMessagesState); + _remoteMessagesCubit!.stream.listen(_updateRemoteMessagesState); + _updateRemoteMessagesState(_remoteMessagesCubit!.state); + } + + // Called when the local messages list gets a change + void _updateLocalMessagesState( + BlocBusyState>> avmessages) { + // When local messages are updated, pass this + // directly to the messages cubit state + emit(avmessages.state); + } + + // Called when the remote messages list gets a change + void _updateRemoteMessagesState( + BlocBusyState>> avmessages) { + final remoteMessages = avmessages.state.data?.value; + if (remoteMessages == null) { + return; + } + // Add remote messages updates to queue to process asynchronously + _remoteMessagesQueue + .add(_MessageQueueEntry(remoteMessages: remoteMessages)); + } + + Future _updateRemoteMessagesStateAsync(_MessageQueueEntry entry) async { + final localMessagesCubit = _localMessagesCubit!; + + // Updated remote messages need to be merged with the local messages state + await localMessagesCubit.operate((shortArray) async { + // Ensure remoteMessages is sorted by timestamp + final remoteMessages = entry.remoteMessages + .sort((a, b) => a.timestamp.compareTo(b.timestamp)); + + // dedup? build local timestamp set? + + // Existing messages will always be sorted by timestamp so merging is easy + var localMessages = localMessagesCubit.state.state.data!.value; + + var pos = 0; + for (final newMessage in remoteMessages) { + var skip = false; + while (pos < localMessages.length) { + final m = localMessages[pos]; + pos++; + + // If timestamp to insert is less than + // the current position, insert it here + final newTs = Timestamp.fromInt64(newMessage.timestamp); + final curTs = Timestamp.fromInt64(m.timestamp); + final cmp = newTs.compareTo(curTs); + if (cmp < 0) { + break; + } else if (cmp == 0) { + skip = true; + break; + } + } + // Insert at this position + if (!skip) { + // Insert into dht backing array + await shortArray.tryInsertItem(pos, newMessage.writeToBuffer()); + // Insert into local copy as well for this operation + localMessages = localMessages.insert(pos, newMessage); + } + } + }); } // Initialize local messages @@ -187,7 +187,7 @@ class MessagesCubit extends Cubit { (shortArray) => shortArray.tryAddItem(message.writeToBuffer())); } - Future getMessagesCrypto() async { + Future _getMessagesCrypto() async { var messagesCrypto = _messagesCrypto; if (messagesCrypto != null) { return messagesCrypto; diff --git a/lib/settings/preferences_repository.dart b/lib/settings/preferences_repository.dart index d7b8f48..03f73ba 100644 --- a/lib/settings/preferences_repository.dart +++ b/lib/settings/preferences_repository.dart @@ -20,9 +20,11 @@ class PreferencesRepository { Future init() async { final sharedPreferences = await SharedPreferences.getInstance(); + // ignore: do_not_use_environment + const namespace = String.fromEnvironment('NAMESPACE'); _data = SharedPreferencesValue( sharedPreferences: sharedPreferences, - keyName: 'preferences', + keyName: namespace.isEmpty ? 'preferences' : 'preferences_$namespace', valueFromJson: (obj) => obj != null ? Preferences.fromJson(obj) : Preferences.defaults, valueToJson: (val) => val.toJson()); diff --git a/lib/tools/state_logger.dart b/lib/tools/state_logger.dart index 0f27504..60c274e 100644 --- a/lib/tools/state_logger.dart +++ b/lib/tools/state_logger.dart @@ -3,7 +3,8 @@ import 'package:loggy/loggy.dart'; import 'loggy.dart'; const Map _blocChangeLogLevels = { - 'ConnectionStateCubit': LogLevel.off + 'ConnectionStateCubit': LogLevel.off, + 'ActiveConversationMessagesBlocMapCubit': LogLevel.off }; const Map _blocCreateCloseLogLevels = {}; const Map _blocErrorLogLevels = {}; diff --git a/lib/veilid_processor/repository/processor_repository.dart b/lib/veilid_processor/repository/processor_repository.dart index 0a17fc7..e021648 100644 --- a/lib/veilid_processor/repository/processor_repository.dart +++ b/lib/veilid_processor/repository/processor_repository.dart @@ -119,6 +119,8 @@ class ProcessorRepository { } void processUpdateValueChange(VeilidUpdateValueChange updateValueChange) { + log.debug('UpdateValueChange: ${updateValueChange.toJson()}'); + // Send value updates to DHTRecordPool DHTRecordPool.instance.processRemoteValueChange(updateValueChange); } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array.dart index 3de5df5..9205fce 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array.dart @@ -36,11 +36,7 @@ class DHTShortArray { //////////////////////////////////////////////////////////////// // Constructors - DHTShortArray._({required DHTRecord headRecord}) - : _headRecord = headRecord, - _head = _DHTShortArrayCache(), - _subscriptions = {}, - _listenMutex = Mutex() { + DHTShortArray._({required DHTRecord headRecord}) : _headRecord = headRecord { late final int stride; switch (headRecord.schema) { case DHTSchemaDFLT(oCnt: final oCnt): @@ -155,9 +151,14 @@ class DHTShortArray { //////////////////////////////////////////////////////////////////////////// // Public API + /// Returns the first record in the DHTShortArray which contains its control + /// information. DHTRecord get record => _headRecord; + + /// Returns the number of elements in the DHTShortArray int get length => _head.index.length; + /// Free all resources for the DHTShortArray Future close() async { await _watchController?.close(); final futures = >[_headRecord.close()]; @@ -167,6 +168,7 @@ class DHTShortArray { await Future.wait(futures); } + /// Free all resources for the DHTShortArray and delete it from the DHT Future delete() async { await _watchController?.close(); @@ -177,6 +179,8 @@ class DHTShortArray { await Future.wait(futures); } + /// Runs a closure that guarantees the DHTShortArray + /// will be closed upon exit, even if an uncaught exception is thrown Future scope(Future Function(DHTShortArray) scopeFunction) async { try { return await scopeFunction(this); @@ -185,6 +189,9 @@ class DHTShortArray { } } + /// Runs a closure that guarantees the DHTShortArray + /// will be closed upon exit, and deleted if an an + /// uncaught exception is thrown Future deleteScope( Future Function(DHTShortArray) scopeFunction) async { try { @@ -197,6 +204,9 @@ class DHTShortArray { } } + /// Return the item at position 'pos' in the DHTShortArray. If 'foreRefresh' + /// is specified, the network will always be checked for newer values + /// rather than returning the existing locally stored copy of the elements. Future getItem(int pos, {bool forceRefresh = false}) async { await _refreshHead(forceRefresh: forceRefresh, onlyUpdates: true); @@ -212,6 +222,9 @@ class DHTShortArray { return record!.get(subkey: recordSubkey, forceRefresh: forceRefresh); } + /// Return a list of all of the items in the DHTShortArray. If 'foreRefresh' + /// is specified, the network will always be checked for newer values + /// rather than returning the existing locally stored copy of the elements. Future?> getAllItems({bool forceRefresh = false}) async { await _refreshHead(forceRefresh: forceRefresh, onlyUpdates: true); @@ -238,28 +251,41 @@ class DHTShortArray { return out; } + /// Convenience function: + /// Like getItem but also parses the returned element as JSON Future getItemJson(T Function(dynamic) fromJson, int pos, {bool forceRefresh = false}) => getItem(pos, forceRefresh: forceRefresh) .then((out) => jsonDecodeOptBytes(fromJson, out)); + /// Convenience function: + /// Like getAllItems but also parses the returned elements as JSON Future?> getAllItemsJson(T Function(dynamic) fromJson, {bool forceRefresh = false}) => getAllItems(forceRefresh: forceRefresh) .then((out) => out?.map(fromJson).toList()); + /// Convenience function: + /// Like getItem but also parses the returned element as a protobuf object Future getItemProtobuf( T Function(List) fromBuffer, int pos, {bool forceRefresh = false}) => getItem(pos, forceRefresh: forceRefresh) .then((out) => (out == null) ? null : fromBuffer(out)); + /// Convenience function: + /// Like getAllItems but also parses the returned elements as protobuf objects Future?> getAllItemsProtobuf( T Function(List) fromBuffer, {bool forceRefresh = false}) => getAllItems(forceRefresh: forceRefresh) .then((out) => out?.map(fromBuffer).toList()); + /// Try to add an item to the end of the DHTShortArray. Return true if the + /// element was successfully added, and false if the state changed before + /// the element could be added or a newer value was found on the network. + /// This may throw an exception if the number elements added exceeds the + /// built-in limit of 'maxElements = 256' entries. Future tryAddItem(Uint8List value) async { await _refreshHead(onlyUpdates: true); @@ -288,6 +314,12 @@ class DHTShortArray { return true; } + /// Try to insert an item as position 'pos' of the DHTShortArray. + /// Return true if the element was successfully inserted, and false if the + /// state changed before the element could be inserted or a newer value was + /// found on the network. + /// This may throw an exception if the number elements added exceeds the + /// built-in limit of 'maxElements = 256' entries. Future tryInsertItem(int pos, Uint8List value) async { await _refreshHead(onlyUpdates: true); @@ -314,6 +346,10 @@ class DHTShortArray { return true; } + /// Try to swap items at position 'aPos' and 'bPos' in the DHTShortArray. + /// Return true if the elements were successfully swapped, and false if the + /// state changed before the elements could be swapped or newer values were + /// found on the network. Future trySwapItem(int aPos, int bPos) async { if (aPos == bPos) { return false; @@ -344,6 +380,10 @@ class DHTShortArray { return true; } + /// Try to remove an item at position 'pos' in the DHTShortArray. + /// Return the element if it was successfully removed, and null if the + /// state changed before the elements could be removed or newer values were + /// found on the network. Future tryRemoveItem(int pos) async { await _refreshHead(onlyUpdates: true); @@ -376,16 +416,24 @@ class DHTShortArray { } } + /// Convenience function: + /// Like removeItem but also parses the returned element as JSON Future tryRemoveItemJson( T Function(dynamic) fromJson, int pos, ) => tryRemoveItem(pos).then((out) => jsonDecodeOptBytes(fromJson, out)); + /// Convenience function: + /// Like removeItem but also parses the returned element as JSON Future tryRemoveItemProtobuf( T Function(List) fromBuffer, int pos) => getItem(pos).then((out) => (out == null) ? null : fromBuffer(out)); + /// Try to remove all items in the DHTShortArray. + /// Return true if it was successfully cleared, and false if the + /// state changed before the elements could be cleared or newer values were + /// found on the network. Future tryClear() async { await _refreshHead(onlyUpdates: true); @@ -411,6 +459,12 @@ class DHTShortArray { return true; } + /// Try to set an item at position 'pos' of the DHTShortArray. + /// Return true if the element was successfully set, and false if the + /// state changed before the element could be set or a newer value was + /// found on the network. + /// This may throw an exception if the position elements the built-in limit of + /// 'maxElements = 256' entries. Future tryWriteItem(int pos, Uint8List newValue) async { if (await _refreshHead(onlyUpdates: true)) { throw StateError('structure changed'); @@ -433,6 +487,10 @@ class DHTShortArray { return result; } + /// Set an item at position 'pos' of the DHTShortArray. Retries until the + /// value being written is successfully made the newest value of the element. + /// This may throw an exception if the position elements the built-in limit of + /// 'maxElements = 256' entries. Future eventualWriteItem(int pos, Uint8List newValue) async { Uint8List? oldData; do { @@ -443,6 +501,12 @@ class DHTShortArray { } while (oldData != null); } + /// Change an item at position 'pos' of the DHTShortArray. + /// Runs with the value of the old element at that position such that it can + /// be changed to the returned value from tha closure. Retries until the + /// value being written is successfully made the newest value of the element. + /// This may throw an exception if the position elements the built-in limit of + /// 'maxElements = 256' entries. Future eventualUpdateItem( int pos, Future Function(Uint8List? oldValue) update) async { var oldData = await getItem(pos); @@ -461,6 +525,9 @@ class DHTShortArray { } while (oldData != null); } + /// Convenience function: + /// Like tryWriteItem but also encodes the input value as JSON and parses the + /// returned element as JSON Future tryWriteItemJson( T Function(dynamic) fromJson, int pos, @@ -469,6 +536,9 @@ class DHTShortArray { tryWriteItem(pos, jsonEncodeBytes(newValue)) .then((out) => jsonDecodeOptBytes(fromJson, out)); + /// Convenience function: + /// Like tryWriteItem but also encodes the input value as a protobuf object + /// and parses the returned element as a protobuf object Future tryWriteItemProtobuf( T Function(List) fromBuffer, int pos, @@ -481,14 +551,22 @@ class DHTShortArray { return fromBuffer(out); }); + /// Convenience function: + /// Like eventualWriteItem but also encodes the input value as JSON and parses + /// the returned element as JSON Future eventualWriteItemJson(int pos, T newValue) => eventualWriteItem(pos, jsonEncodeBytes(newValue)); + /// Convenience function: + /// Like eventualWriteItem but also encodes the input value as a protobuf + /// object and parses the returned element as a protobuf object Future eventualWriteItemProtobuf( int pos, T newValue, {int subkey = -1}) => eventualWriteItem(pos, newValue.writeToBuffer()); + /// Convenience function: + /// Like eventualUpdateItem but also encodes the input value as JSON Future eventualUpdateItemJson( T Function(dynamic) fromJson, int pos, @@ -496,6 +574,9 @@ class DHTShortArray { ) => eventualUpdateItem(pos, jsonUpdate(fromJson, update)); + /// Convenience function: + /// Like eventualUpdateItem but also encodes the input value as a protobuf + /// object Future eventualUpdateItemProtobuf( T Function(List) fromBuffer, int pos, @@ -807,15 +888,17 @@ class DHTShortArray { // Head DHT record final DHTRecord _headRecord; + // How many elements per linked record late final int _stride; - // Cached representation refreshed from head record - _DHTShortArrayCache _head; - + _DHTShortArrayCache _head = _DHTShortArrayCache(); // Subscription to head and linked record internal changes - final Map> _subscriptions; + final Map> _subscriptions = + {}; // Stream of external changes StreamController? _watchController; // Watch mutex to ensure we keep the representation valid - final Mutex _listenMutex; + final Mutex _listenMutex = Mutex(); + // Head/element mutex to ensure we keep the representation valid + final Mutex _headMutex = Mutex(); } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart index 7937b12..8525721 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart @@ -48,9 +48,8 @@ class DHTShortArrayCubit extends Cubit> } Future _refreshNoWait({bool forceRefresh = false}) async => - busy((emit) async { - await _refreshInner(emit, forceRefresh: forceRefresh); - }); + busy((emit) async => _operateMutex.protect( + () async => _refreshInner(emit, forceRefresh: forceRefresh))); Future _refreshInner(void Function(AsyncValue>) emit, {bool forceRefresh = false}) async { @@ -59,9 +58,7 @@ class DHTShortArrayCubit extends Cubit> (await _shortArray.getAllItems(forceRefresh: forceRefresh)) ?.map(_decodeElement) .toIList(); - if (newState == null) { - emit(const AsyncValue.loading()); - } else { + if (newState != null) { emit(AsyncValue.data(newState)); } } on Exception catch (e) { @@ -74,9 +71,8 @@ class DHTShortArrayCubit extends Cubit> // Because this is async, we could get an update while we're // still processing the last one. Only called after init future has run // so we dont have to wait for that here. - _sspUpdate.busyUpdate>>(busy, (emit) async { - await _refreshInner(emit); - }); + _sspUpdate.busyUpdate>>(busy, + (emit) async => _operateMutex.protect(() async => _refreshInner(emit))); } @override @@ -90,7 +86,7 @@ class DHTShortArrayCubit extends Cubit> await super.close(); } - Future operate(Future Function(DHTShortArray) closure) async { + Future operate(Future Function(DHTShortArray) closure) async { await _initFuture; return _operateMutex.protect(() async => closure(_shortArray)); }