short array work and doc

This commit is contained in:
Christien Rioux 2024-03-07 08:28:05 -05:00
parent d3cdca17c5
commit 64d4d0cefb
6 changed files with 178 additions and 94 deletions

View File

@ -10,9 +10,7 @@ import '../../account_manager/account_manager.dart';
import '../../proto/proto.dart' as proto;
class _MessageQueueEntry {
_MessageQueueEntry(
{required this.localMessages, required this.remoteMessages});
IList<proto.Message> localMessages;
_MessageQueueEntry({required this.remoteMessages});
IList<proto.Message> remoteMessages;
}
@ -60,74 +58,10 @@ class MessagesCubit extends Cubit<MessagesState> {
await super.close();
}
void updateLocalMessagesState(
BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) {
// Updated local messages from online just update the state immediately
emit(avmessages.state);
}
Future<void> _updateRemoteMessagesStateAsync(_MessageQueueEntry entry) async {
// Updated remote messages need to be merged with the local messages state
// Ensure remoteMessages is sorted by timestamp
final remoteMessages =
entry.remoteMessages.sort((a, b) => a.timestamp.compareTo(b.timestamp));
// Existing messages will always be sorted by timestamp so merging is easy
var localMessages = entry.localMessages;
var pos = 0;
for (final newMessage in remoteMessages) {
var skip = false;
while (pos < localMessages.length) {
final m = localMessages[pos];
// If timestamp to insert is less than
// the current position, insert it here
final newTs = Timestamp.fromInt64(newMessage.timestamp);
final curTs = Timestamp.fromInt64(m.timestamp);
final cmp = newTs.compareTo(curTs);
if (cmp < 0) {
break;
} else if (cmp == 0) {
skip = true;
break;
}
pos++;
}
// Insert at this position
if (!skip) {
// Insert into dht backing array
await _localMessagesCubit!.operate((shortArray) =>
shortArray.tryInsertItem(pos, newMessage.writeToBuffer()));
// Insert into local copy as well for this operation
localMessages = localMessages.insert(pos, newMessage);
}
}
}
void updateRemoteMessagesState(
BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) {
final remoteMessages = avmessages.state.data?.value;
if (remoteMessages == null) {
return;
}
final localMessages = state.data?.value;
if (localMessages == null) {
// No local messages means remote messages
// are all we have so merging is easy
emit(AsyncValue.data(remoteMessages));
return;
}
_remoteMessagesQueue.add(_MessageQueueEntry(
localMessages: localMessages, remoteMessages: remoteMessages));
}
// Open local messages key
Future<void> _initLocalMessages(TypedKey localConversationRecordKey,
TypedKey localMessagesRecordKey) async {
final crypto = await getMessagesCrypto();
final crypto = await _getMessagesCrypto();
final writer = _activeAccountInfo.conversationWriter;
_localMessagesCubit = DHTShortArrayCubit(
@ -136,21 +70,87 @@ class MessagesCubit extends Cubit<MessagesState> {
parent: localConversationRecordKey, crypto: crypto),
decodeElement: proto.Message.fromBuffer);
_localSubscription =
_localMessagesCubit!.stream.listen(updateLocalMessagesState);
_localMessagesCubit!.stream.listen(_updateLocalMessagesState);
_updateLocalMessagesState(_localMessagesCubit!.state);
}
// Open remote messages key
Future<void> _initRemoteMessages(TypedKey remoteConversationRecordKey,
TypedKey remoteMessagesRecordKey) async {
// Open remote record key if it is specified
final crypto = await getMessagesCrypto();
final crypto = await _getMessagesCrypto();
_remoteMessagesCubit = DHTShortArrayCubit(
open: () async => DHTShortArray.openRead(remoteMessagesRecordKey,
parent: remoteConversationRecordKey, crypto: crypto),
decodeElement: proto.Message.fromBuffer);
_remoteSubscription =
_remoteMessagesCubit!.stream.listen(updateRemoteMessagesState);
_remoteMessagesCubit!.stream.listen(_updateRemoteMessagesState);
_updateRemoteMessagesState(_remoteMessagesCubit!.state);
}
// Called when the local messages list gets a change
void _updateLocalMessagesState(
BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) {
// When local messages are updated, pass this
// directly to the messages cubit state
emit(avmessages.state);
}
// Called when the remote messages list gets a change
void _updateRemoteMessagesState(
BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) {
final remoteMessages = avmessages.state.data?.value;
if (remoteMessages == null) {
return;
}
// Add remote messages updates to queue to process asynchronously
_remoteMessagesQueue
.add(_MessageQueueEntry(remoteMessages: remoteMessages));
}
Future<void> _updateRemoteMessagesStateAsync(_MessageQueueEntry entry) async {
final localMessagesCubit = _localMessagesCubit!;
// Updated remote messages need to be merged with the local messages state
await localMessagesCubit.operate((shortArray) async {
// Ensure remoteMessages is sorted by timestamp
final remoteMessages = entry.remoteMessages
.sort((a, b) => a.timestamp.compareTo(b.timestamp));
// dedup? build local timestamp set?
// Existing messages will always be sorted by timestamp so merging is easy
var localMessages = localMessagesCubit.state.state.data!.value;
var pos = 0;
for (final newMessage in remoteMessages) {
var skip = false;
while (pos < localMessages.length) {
final m = localMessages[pos];
pos++;
// If timestamp to insert is less than
// the current position, insert it here
final newTs = Timestamp.fromInt64(newMessage.timestamp);
final curTs = Timestamp.fromInt64(m.timestamp);
final cmp = newTs.compareTo(curTs);
if (cmp < 0) {
break;
} else if (cmp == 0) {
skip = true;
break;
}
}
// Insert at this position
if (!skip) {
// Insert into dht backing array
await shortArray.tryInsertItem(pos, newMessage.writeToBuffer());
// Insert into local copy as well for this operation
localMessages = localMessages.insert(pos, newMessage);
}
}
});
}
// Initialize local messages
@ -187,7 +187,7 @@ class MessagesCubit extends Cubit<MessagesState> {
(shortArray) => shortArray.tryAddItem(message.writeToBuffer()));
}
Future<DHTRecordCrypto> getMessagesCrypto() async {
Future<DHTRecordCrypto> _getMessagesCrypto() async {
var messagesCrypto = _messagesCrypto;
if (messagesCrypto != null) {
return messagesCrypto;

View File

@ -20,9 +20,11 @@ class PreferencesRepository {
Future<void> init() async {
final sharedPreferences = await SharedPreferences.getInstance();
// ignore: do_not_use_environment
const namespace = String.fromEnvironment('NAMESPACE');
_data = SharedPreferencesValue<Preferences>(
sharedPreferences: sharedPreferences,
keyName: 'preferences',
keyName: namespace.isEmpty ? 'preferences' : 'preferences_$namespace',
valueFromJson: (obj) =>
obj != null ? Preferences.fromJson(obj) : Preferences.defaults,
valueToJson: (val) => val.toJson());

View File

@ -3,7 +3,8 @@ import 'package:loggy/loggy.dart';
import 'loggy.dart';
const Map<String, LogLevel> _blocChangeLogLevels = {
'ConnectionStateCubit': LogLevel.off
'ConnectionStateCubit': LogLevel.off,
'ActiveConversationMessagesBlocMapCubit': LogLevel.off
};
const Map<String, LogLevel> _blocCreateCloseLogLevels = {};
const Map<String, LogLevel> _blocErrorLogLevels = {};

View File

@ -119,6 +119,8 @@ class ProcessorRepository {
}
void processUpdateValueChange(VeilidUpdateValueChange updateValueChange) {
log.debug('UpdateValueChange: ${updateValueChange.toJson()}');
// Send value updates to DHTRecordPool
DHTRecordPool.instance.processRemoteValueChange(updateValueChange);
}

View File

@ -36,11 +36,7 @@ class DHTShortArray {
////////////////////////////////////////////////////////////////
// Constructors
DHTShortArray._({required DHTRecord headRecord})
: _headRecord = headRecord,
_head = _DHTShortArrayCache(),
_subscriptions = {},
_listenMutex = Mutex() {
DHTShortArray._({required DHTRecord headRecord}) : _headRecord = headRecord {
late final int stride;
switch (headRecord.schema) {
case DHTSchemaDFLT(oCnt: final oCnt):
@ -155,9 +151,14 @@ class DHTShortArray {
////////////////////////////////////////////////////////////////////////////
// Public API
/// Returns the first record in the DHTShortArray which contains its control
/// information.
DHTRecord get record => _headRecord;
/// Returns the number of elements in the DHTShortArray
int get length => _head.index.length;
/// Free all resources for the DHTShortArray
Future<void> close() async {
await _watchController?.close();
final futures = <Future<void>>[_headRecord.close()];
@ -167,6 +168,7 @@ class DHTShortArray {
await Future.wait(futures);
}
/// Free all resources for the DHTShortArray and delete it from the DHT
Future<void> delete() async {
await _watchController?.close();
@ -177,6 +179,8 @@ class DHTShortArray {
await Future.wait(futures);
}
/// Runs a closure that guarantees the DHTShortArray
/// will be closed upon exit, even if an uncaught exception is thrown
Future<T> scope<T>(Future<T> Function(DHTShortArray) scopeFunction) async {
try {
return await scopeFunction(this);
@ -185,6 +189,9 @@ class DHTShortArray {
}
}
/// Runs a closure that guarantees the DHTShortArray
/// will be closed upon exit, and deleted if an an
/// uncaught exception is thrown
Future<T> deleteScope<T>(
Future<T> Function(DHTShortArray) scopeFunction) async {
try {
@ -197,6 +204,9 @@ class DHTShortArray {
}
}
/// Return the item at position 'pos' in the DHTShortArray. If 'foreRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async {
await _refreshHead(forceRefresh: forceRefresh, onlyUpdates: true);
@ -212,6 +222,9 @@ class DHTShortArray {
return record!.get(subkey: recordSubkey, forceRefresh: forceRefresh);
}
/// Return a list of all of the items in the DHTShortArray. If 'foreRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
Future<List<Uint8List>?> getAllItems({bool forceRefresh = false}) async {
await _refreshHead(forceRefresh: forceRefresh, onlyUpdates: true);
@ -238,28 +251,41 @@ class DHTShortArray {
return out;
}
/// Convenience function:
/// Like getItem but also parses the returned element as JSON
Future<T?> getItemJson<T>(T Function(dynamic) fromJson, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like getAllItems but also parses the returned elements as JSON
Future<List<T>?> getAllItemsJson<T>(T Function(dynamic) fromJson,
{bool forceRefresh = false}) =>
getAllItems(forceRefresh: forceRefresh)
.then((out) => out?.map(fromJson).toList());
/// Convenience function:
/// Like getItem but also parses the returned element as a protobuf object
Future<T?> getItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => (out == null) ? null : fromBuffer(out));
/// Convenience function:
/// Like getAllItems but also parses the returned elements as protobuf objects
Future<List<T>?> getAllItemsProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
{bool forceRefresh = false}) =>
getAllItems(forceRefresh: forceRefresh)
.then((out) => out?.map(fromBuffer).toList());
/// Try to add an item to the end of the DHTShortArray. Return true if the
/// element was successfully added, and false if the state changed before
/// the element could be added or a newer value was found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryAddItem(Uint8List value) async {
await _refreshHead(onlyUpdates: true);
@ -288,6 +314,12 @@ class DHTShortArray {
return true;
}
/// Try to insert an item as position 'pos' of the DHTShortArray.
/// Return true if the element was successfully inserted, and false if the
/// state changed before the element could be inserted or a newer value was
/// found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryInsertItem(int pos, Uint8List value) async {
await _refreshHead(onlyUpdates: true);
@ -314,6 +346,10 @@ class DHTShortArray {
return true;
}
/// Try to swap items at position 'aPos' and 'bPos' in the DHTShortArray.
/// Return true if the elements were successfully swapped, and false if the
/// state changed before the elements could be swapped or newer values were
/// found on the network.
Future<bool> trySwapItem(int aPos, int bPos) async {
if (aPos == bPos) {
return false;
@ -344,6 +380,10 @@ class DHTShortArray {
return true;
}
/// Try to remove an item at position 'pos' in the DHTShortArray.
/// Return the element if it was successfully removed, and null if the
/// state changed before the elements could be removed or newer values were
/// found on the network.
Future<Uint8List?> tryRemoveItem(int pos) async {
await _refreshHead(onlyUpdates: true);
@ -376,16 +416,24 @@ class DHTShortArray {
}
}
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<T?> tryRemoveItemJson<T>(
T Function(dynamic) fromJson,
int pos,
) =>
tryRemoveItem(pos).then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<T?> tryRemoveItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos) =>
getItem(pos).then((out) => (out == null) ? null : fromBuffer(out));
/// Try to remove all items in the DHTShortArray.
/// Return true if it was successfully cleared, and false if the
/// state changed before the elements could be cleared or newer values were
/// found on the network.
Future<bool> tryClear() async {
await _refreshHead(onlyUpdates: true);
@ -411,6 +459,12 @@ class DHTShortArray {
return true;
}
/// Try to set an item at position 'pos' of the DHTShortArray.
/// Return true if the element was successfully set, and false if the
/// state changed before the element could be set or a newer value was
/// found on the network.
/// This may throw an exception if the position elements the built-in limit of
/// 'maxElements = 256' entries.
Future<Uint8List?> tryWriteItem(int pos, Uint8List newValue) async {
if (await _refreshHead(onlyUpdates: true)) {
throw StateError('structure changed');
@ -433,6 +487,10 @@ class DHTShortArray {
return result;
}
/// Set an item at position 'pos' of the DHTShortArray. Retries until the
/// value being written is successfully made the newest value of the element.
/// This may throw an exception if the position elements the built-in limit of
/// 'maxElements = 256' entries.
Future<void> eventualWriteItem(int pos, Uint8List newValue) async {
Uint8List? oldData;
do {
@ -443,6 +501,12 @@ class DHTShortArray {
} while (oldData != null);
}
/// Change an item at position 'pos' of the DHTShortArray.
/// Runs with the value of the old element at that position such that it can
/// be changed to the returned value from tha closure. Retries until the
/// value being written is successfully made the newest value of the element.
/// This may throw an exception if the position elements the built-in limit of
/// 'maxElements = 256' entries.
Future<void> eventualUpdateItem(
int pos, Future<Uint8List> Function(Uint8List? oldValue) update) async {
var oldData = await getItem(pos);
@ -461,6 +525,9 @@ class DHTShortArray {
} while (oldData != null);
}
/// Convenience function:
/// Like tryWriteItem but also encodes the input value as JSON and parses the
/// returned element as JSON
Future<T?> tryWriteItemJson<T>(
T Function(dynamic) fromJson,
int pos,
@ -469,6 +536,9 @@ class DHTShortArray {
tryWriteItem(pos, jsonEncodeBytes(newValue))
.then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like tryWriteItem but also encodes the input value as a protobuf object
/// and parses the returned element as a protobuf object
Future<T?> tryWriteItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int pos,
@ -481,14 +551,22 @@ class DHTShortArray {
return fromBuffer(out);
});
/// Convenience function:
/// Like eventualWriteItem but also encodes the input value as JSON and parses
/// the returned element as JSON
Future<void> eventualWriteItemJson<T>(int pos, T newValue) =>
eventualWriteItem(pos, jsonEncodeBytes(newValue));
/// Convenience function:
/// Like eventualWriteItem but also encodes the input value as a protobuf
/// object and parses the returned element as a protobuf object
Future<void> eventualWriteItemProtobuf<T extends GeneratedMessage>(
int pos, T newValue,
{int subkey = -1}) =>
eventualWriteItem(pos, newValue.writeToBuffer());
/// Convenience function:
/// Like eventualUpdateItem but also encodes the input value as JSON
Future<void> eventualUpdateItemJson<T>(
T Function(dynamic) fromJson,
int pos,
@ -496,6 +574,9 @@ class DHTShortArray {
) =>
eventualUpdateItem(pos, jsonUpdate(fromJson, update));
/// Convenience function:
/// Like eventualUpdateItem but also encodes the input value as a protobuf
/// object
Future<void> eventualUpdateItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int pos,
@ -807,15 +888,17 @@ class DHTShortArray {
// Head DHT record
final DHTRecord _headRecord;
// How many elements per linked record
late final int _stride;
// Cached representation refreshed from head record
_DHTShortArrayCache _head;
_DHTShortArrayCache _head = _DHTShortArrayCache();
// Subscription to head and linked record internal changes
final Map<TypedKey, StreamSubscription<DHTRecordWatchChange>> _subscriptions;
final Map<TypedKey, StreamSubscription<DHTRecordWatchChange>> _subscriptions =
{};
// Stream of external changes
StreamController<void>? _watchController;
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex;
final Mutex _listenMutex = Mutex();
// Head/element mutex to ensure we keep the representation valid
final Mutex _headMutex = Mutex();
}

View File

@ -48,9 +48,8 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
}
Future<void> _refreshNoWait({bool forceRefresh = false}) async =>
busy((emit) async {
await _refreshInner(emit, forceRefresh: forceRefresh);
});
busy((emit) async => _operateMutex.protect(
() async => _refreshInner(emit, forceRefresh: forceRefresh)));
Future<void> _refreshInner(void Function(AsyncValue<IList<T>>) emit,
{bool forceRefresh = false}) async {
@ -59,9 +58,7 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
(await _shortArray.getAllItems(forceRefresh: forceRefresh))
?.map(_decodeElement)
.toIList();
if (newState == null) {
emit(const AsyncValue.loading());
} else {
if (newState != null) {
emit(AsyncValue.data(newState));
}
} on Exception catch (e) {
@ -74,9 +71,8 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
// Because this is async, we could get an update while we're
// still processing the last one. Only called after init future has run
// so we dont have to wait for that here.
_sspUpdate.busyUpdate<T, AsyncValue<IList<T>>>(busy, (emit) async {
await _refreshInner(emit);
});
_sspUpdate.busyUpdate<T, AsyncValue<IList<T>>>(busy,
(emit) async => _operateMutex.protect(() async => _refreshInner(emit)));
}
@override
@ -90,7 +86,7 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
await super.close();
}
Future<R> operate<R>(Future<R> Function(DHTShortArray) closure) async {
Future<R?> operate<R>(Future<R?> Function(DHTShortArray) closure) async {
await _initFuture;
return _operateMutex.protect(() async => closure(_shortArray));
}