diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index dc0e2d4..7ab3401 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -1,4 +1,6 @@ import 'dart:async'; +import 'dart:convert'; +import 'dart:typed_data'; import 'package:async_tools/async_tools.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; @@ -15,7 +17,6 @@ class RenderStateElement { {required this.message, required this.isLocal, this.reconciled = false, - this.reconciledOffline = false, this.sent = false, this.sentOffline = false}); @@ -27,7 +28,7 @@ class RenderStateElement { if (sent && !sentOffline) { return MessageSendState.delivered; } - if (reconciled && !reconciledOffline) { + if (reconciled) { return MessageSendState.sent; } return MessageSendState.sending; @@ -36,7 +37,6 @@ class RenderStateElement { proto.Message message; bool isLocal; bool reconciled; - bool reconciledOffline; bool sent; bool sentOffline; } @@ -96,7 +96,7 @@ class SingleContactMessagesCubit extends Cubit { ); // Make crypto - await _initMessagesCrypto(); + await _initCrypto(); // Reconciled messages key await _initReconciledMessagesCubit(); @@ -109,9 +109,13 @@ class SingleContactMessagesCubit extends Cubit { } // Make crypto - Future _initMessagesCrypto() async { + Future _initCrypto() async { _messagesCrypto = await _activeAccountInfo .makeConversationCrypto(_remoteIdentityPublicKey); + _localMessagesCryptoSystem = + await Veilid.instance.getCryptoSystem(_localMessagesRecordKey.kind); + _identityCryptoSystem = + await _activeAccountInfo.localAccount.identityMaster.identityCrypto; } // Open local messages key @@ -144,13 +148,16 @@ class SingleContactMessagesCubit extends Cubit { _updateRcvdMessagesState(_rcvdMessagesCubit!.state); } + Future _makeLocalMessagesCrypto() async => + VeilidCryptoPrivate.fromTypedKey( + _activeAccountInfo.userLogin.identitySecret, 'tabledb'); + // Open reconciled chat record key Future _initReconciledMessagesCubit() async { final tableName = _localConversationRecordKey.toString(); - xxx whats the right encryption for reconciled messages cubit? + final crypto = await _makeLocalMessagesCrypto(); - final crypto = VeilidCryptoPrivate.fromTypedKey(kind, secretKey); _reconciledMessagesCubit = TableDBArrayCubit( open: () async => TableDBArray.make(table: tableName, crypto: crypto), decodeElement: proto.Message.fromBuffer); @@ -183,8 +190,8 @@ class SingleContactMessagesCubit extends Cubit { if (sentMessages == null) { return; } - // Don't reconcile, the sending machine will have already added - // to the reconciliation queue on that machine + + await _reconcileMessages(sentMessages, _sentMessagesCubit); // Update the view _renderState(); @@ -197,16 +204,18 @@ class SingleContactMessagesCubit extends Cubit { return; } + await _reconcileMessages(rcvdMessages, _rcvdMessagesCubit); + singleFuture(_rcvdMessagesCubit!, () async { // Get the timestamp of our most recent reconciled message final lastReconciledMessageTs = - await _reconciledMessagesCubit!.operate((r) async { - final len = r.length; + await _reconciledMessagesCubit!.operate((arr) async { + final len = arr.length; if (len == 0) { return null; } else { final lastMessage = - await r.getItemProtobuf(proto.Message.fromBuffer, len - 1); + await arr.getProtobuf(proto.Message.fromBuffer, len - 1); if (lastMessage == null) { throw StateError('should have gotten last message'); } @@ -232,11 +241,9 @@ class SingleContactMessagesCubit extends Cubit { }); } - // Called when the reconciled messages list gets a change - // This can happen when multiple clients for the same identity are - // reading and reconciling the same remote chat + // Called when the reconciled messages window gets a change void _updateReconciledMessagesState( - DHTLogBusyState avmessages) { + TableDBArrayBusyState avmessages) { // Update the view _renderState(); } @@ -252,10 +259,62 @@ class SingleContactMessagesCubit extends Cubit { // }); } + Future _hashSignature(proto.Signature signature) async => + (await _localMessagesCryptoSystem + .generateHash(signature.toVeilid().decode())) + .decode(); + + Future _signMessage(proto.Message message) async { + // Generate data to sign + final data = Uint8List.fromList(utf8.encode(message.writeToJson())); + + // Sign with our identity + final signature = await _identityCryptoSystem.sign( + _activeAccountInfo.localAccount.identityMaster.identityPublicKey, + _activeAccountInfo.userLogin.identitySecret.value, + data); + + // Add to the message + message.signature = signature.toProto(); + } + + Future _processMessageToSend( + proto.Message message, proto.Message? previousMessage) async { + // Get the previous message if we don't have one + previousMessage ??= await _sentMessagesCubit!.operate((r) async => + r.length == 0 + ? null + : await r.getProtobuf(proto.Message.fromBuffer, r.length - 1)); + + if (previousMessage == null) { + // If there's no last sent message, + // we start at a hash of the identity public key + message.id = (await _localMessagesCryptoSystem.generateHash( + _activeAccountInfo.localAccount.identityMaster.identityPublicKey + .decode())) + .decode(); + } else { + // If there is a last message, we generate the hash + // of the last message's signature and use it as our next id + message.id = await _hashSignature(previousMessage.signature); + } + + // Now sign it + await _signMessage(message); + } + // Async process to send messages in the background Future _processSendingMessages(IList messages) async { + // Go through and assign ids to all the messages in order + proto.Message? previousMessage; + final processedMessages = messages.toList(); + for (final message in processedMessages) { + await _processMessageToSend(message, previousMessage); + previousMessage = message; + } + await _sentMessagesCubit!.operateAppendEventual((writer) => - writer.tryAddItems(messages.map((m) => m.writeToBuffer()).toList())); + writer.tryAddAll(messages.map((m) => m.writeToBuffer()).toList())); } Future _reconcileMessagesInner( @@ -345,9 +404,8 @@ class SingleContactMessagesCubit extends Cubit { keyMapper: (x) => x.value.timestamp, values: sentMessages.elements, ); - final reconciledMessagesMap = - IMap>.fromValues( - keyMapper: (x) => x.value.timestamp, + final reconciledMessagesMap = IMap.fromValues( + keyMapper: (x) => x.timestamp, values: reconciledMessages.elements, ); final sendingMessagesMap = IMap.fromValues( @@ -416,7 +474,7 @@ class SingleContactMessagesCubit extends Cubit { emit(AsyncValue.data(renderedState)); } - void addTextMessage({required proto.Message_Text messageText}) { + void sendTextMessage({required proto.Message_Text messageText}) { final message = proto.Message() ..id = generateNextId() ..author = _activeAccountInfo.localAccount.identityMaster @@ -425,7 +483,6 @@ class SingleContactMessagesCubit extends Cubit { ..timestamp = Veilid.instance.now().toInt64() ..text = messageText; - _unreconciledMessagesQueue.addSync(message); _sendingMessagesQueue.addSync(message); // Update the view @@ -456,15 +513,18 @@ class SingleContactMessagesCubit extends Cubit { final TypedKey _remoteMessagesRecordKey; late final VeilidCrypto _messagesCrypto; + late final VeilidCryptoSystem _localMessagesCryptoSystem; + late final VeilidCryptoSystem _identityCryptoSystem; DHTLogCubit? _sentMessagesCubit; DHTLogCubit? _rcvdMessagesCubit; TableDBArrayCubit? _reconciledMessagesCubit; - late final PersistentQueue _unreconciledMessagesQueue; + late final PersistentQueue _unreconciledMessagesQueue; xxx can we eliminate this? and make rcvd messages cubit listener work like sent? late final PersistentQueue _sendingMessagesQueue; StreamSubscription>? _sentSubscription; StreamSubscription>? _rcvdSubscription; - StreamSubscription>? _reconciledSubscription; + StreamSubscription>? + _reconciledSubscription; } diff --git a/lib/chat/views/chat_component.dart b/lib/chat/views/chat_component.dart index 5aa70b7..06d4312 100644 --- a/lib/chat/views/chat_component.dart +++ b/lib/chat/views/chat_component.dart @@ -162,7 +162,7 @@ class ChatComponent extends StatelessWidget { ..viewLimit = viewLimit ?? 0; protoMessageText.attachments.addAll(attachments); - _messagesCubit.addTextMessage(messageText: protoMessageText); + _messagesCubit.sendTextMessage(messageText: protoMessageText); } void _handleSendPressed(types.PartialText message) { diff --git a/lib/chat_list/cubits/chat_list_cubit.dart b/lib/chat_list/cubits/chat_list_cubit.dart index ff1d5d0..2ab2993 100644 --- a/lib/chat_list/cubits/chat_list_cubit.dart +++ b/lib/chat_list/cubits/chat_list_cubit.dart @@ -65,7 +65,7 @@ class ChatListCubit extends DHTShortArrayCubit await operateWrite((writer) async { // See if we have added this chat already for (var i = 0; i < writer.length; i++) { - final cbuf = await writer.getItem(i); + final cbuf = await writer.get(i); if (cbuf == null) { throw Exception('Failed to get chat'); } @@ -84,7 +84,7 @@ class ChatListCubit extends DHTShortArrayCubit ..remoteConversationRecordKey = remoteConversationRecordKey.toProto(); // Add chat - final added = await writer.tryAddItem(chat.writeToBuffer()); + final added = await writer.tryAdd(chat.writeToBuffer()); if (!added) { throw Exception('Failed to add chat'); } @@ -106,15 +106,14 @@ class ChatListCubit extends DHTShortArrayCubit activeChatCubit.setActiveChat(null); } for (var i = 0; i < writer.length; i++) { - final c = - await writer.getItemProtobuf(proto.Chat.fromBuffer, i); + final c = await writer.getProtobuf(proto.Chat.fromBuffer, i); if (c == null) { throw Exception('Failed to get chat'); } if (c.localConversationRecordKey == localConversationRecordKeyProto) { // Found the right chat - await writer.removeItem(i); + await writer.remove(i); return c; } } diff --git a/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart b/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart index da1f6e3..640d416 100644 --- a/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart +++ b/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart @@ -159,7 +159,7 @@ class ContactInvitationListCubit // Add ContactInvitationRecord to account's list // if this fails, don't keep retrying, user can try again later await operateWrite((writer) async { - if (await writer.tryAddItem(cinvrec.writeToBuffer()) == false) { + if (await writer.tryAdd(cinvrec.writeToBuffer()) == false) { throw Exception('Failed to add contact invitation record'); } }); @@ -179,14 +179,14 @@ class ContactInvitationListCubit // Remove ContactInvitationRecord from account's list final deletedItem = await operateWrite((writer) async { for (var i = 0; i < writer.length; i++) { - final item = await writer.getItemProtobuf( + final item = await writer.getProtobuf( proto.ContactInvitationRecord.fromBuffer, i); if (item == null) { throw Exception('Failed to get contact invitation record'); } if (item.contactRequestInbox.recordKey.toVeilid() == contactRequestInboxRecordKey) { - await writer.removeItem(i); + await writer.remove(i); return item; } } diff --git a/lib/contacts/cubits/contact_list_cubit.dart b/lib/contacts/cubits/contact_list_cubit.dart index 2eb8a08..71669fc 100644 --- a/lib/contacts/cubits/contact_list_cubit.dart +++ b/lib/contacts/cubits/contact_list_cubit.dart @@ -56,7 +56,7 @@ class ContactListCubit extends DHTShortArrayCubit { // Add Contact to account's list // if this fails, don't keep retrying, user can try again later await operateWrite((writer) async { - if (!await writer.tryAddItem(contact.writeToBuffer())) { + if (!await writer.tryAdd(contact.writeToBuffer())) { throw Exception('Failed to add contact record'); } }); @@ -72,13 +72,13 @@ class ContactListCubit extends DHTShortArrayCubit { // Remove Contact from account's list final deletedItem = await operateWrite((writer) async { for (var i = 0; i < writer.length; i++) { - final item = await writer.getItemProtobuf(proto.Contact.fromBuffer, i); + final item = await writer.getProtobuf(proto.Contact.fromBuffer, i); if (item == null) { throw Exception('Failed to get contact'); } if (item.localConversationRecordKey == contact.localConversationRecordKey) { - await writer.removeItem(i); + await writer.remove(i); return item; } } diff --git a/lib/proto/veilidchat.pb.dart b/lib/proto/veilidchat.pb.dart index 3544f00..7770d73 100644 --- a/lib/proto/veilidchat.pb.dart +++ b/lib/proto/veilidchat.pb.dart @@ -486,7 +486,7 @@ class Message_ControlDelete extends $pb.GeneratedMessage { factory Message_ControlDelete.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'Message.ControlDelete', package: const $pb.PackageName(_omitMessageNames ? '' : 'veilidchat'), createEmptyInstance: create) - ..pc<$0.TypedKey>(1, _omitFieldNames ? '' : 'ids', $pb.PbFieldType.PM, subBuilder: $0.TypedKey.create) + ..p<$core.List<$core.int>>(1, _omitFieldNames ? '' : 'ids', $pb.PbFieldType.PY) ..hasRequiredFields = false ; @@ -512,7 +512,7 @@ class Message_ControlDelete extends $pb.GeneratedMessage { static Message_ControlDelete? _defaultInstance; @$pb.TagNumber(1) - $core.List<$0.TypedKey> get ids => $_getList(0); + $core.List<$core.List<$core.int>> get ids => $_getList(0); } class Message_ControlErase extends $pb.GeneratedMessage { @@ -696,8 +696,8 @@ class Message_ControlModeration extends $pb.GeneratedMessage { factory Message_ControlModeration.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'Message.ControlModeration', package: const $pb.PackageName(_omitMessageNames ? '' : 'veilidchat'), createEmptyInstance: create) - ..pc<$0.TypedKey>(1, _omitFieldNames ? '' : 'acceptedIds', $pb.PbFieldType.PM, subBuilder: $0.TypedKey.create) - ..pc<$0.TypedKey>(2, _omitFieldNames ? '' : 'rejectedIds', $pb.PbFieldType.PM, subBuilder: $0.TypedKey.create) + ..p<$core.List<$core.int>>(1, _omitFieldNames ? '' : 'acceptedIds', $pb.PbFieldType.PY) + ..p<$core.List<$core.int>>(2, _omitFieldNames ? '' : 'rejectedIds', $pb.PbFieldType.PY) ..hasRequiredFields = false ; @@ -723,10 +723,46 @@ class Message_ControlModeration extends $pb.GeneratedMessage { static Message_ControlModeration? _defaultInstance; @$pb.TagNumber(1) - $core.List<$0.TypedKey> get acceptedIds => $_getList(0); + $core.List<$core.List<$core.int>> get acceptedIds => $_getList(0); @$pb.TagNumber(2) - $core.List<$0.TypedKey> get rejectedIds => $_getList(1); + $core.List<$core.List<$core.int>> get rejectedIds => $_getList(1); +} + +class Message_ControlReadReceipt extends $pb.GeneratedMessage { + factory Message_ControlReadReceipt() => create(); + Message_ControlReadReceipt._() : super(); + factory Message_ControlReadReceipt.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory Message_ControlReadReceipt.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + + static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'Message.ControlReadReceipt', package: const $pb.PackageName(_omitMessageNames ? '' : 'veilidchat'), createEmptyInstance: create) + ..p<$core.List<$core.int>>(1, _omitFieldNames ? '' : 'readIds', $pb.PbFieldType.PY) + ..hasRequiredFields = false + ; + + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + Message_ControlReadReceipt clone() => Message_ControlReadReceipt()..mergeFromMessage(this); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + Message_ControlReadReceipt copyWith(void Function(Message_ControlReadReceipt) updates) => super.copyWith((message) => updates(message as Message_ControlReadReceipt)) as Message_ControlReadReceipt; + + $pb.BuilderInfo get info_ => _i; + + @$core.pragma('dart2js:noInline') + static Message_ControlReadReceipt create() => Message_ControlReadReceipt._(); + Message_ControlReadReceipt createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static Message_ControlReadReceipt getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static Message_ControlReadReceipt? _defaultInstance; + + @$pb.TagNumber(1) + $core.List<$core.List<$core.int>> get readIds => $_getList(0); } enum Message_Kind { diff --git a/lib/proto/veilidchat.pbjson.dart b/lib/proto/veilidchat.pbjson.dart index 1470054..56ebbe6 100644 --- a/lib/proto/veilidchat.pbjson.dart +++ b/lib/proto/veilidchat.pbjson.dart @@ -172,7 +172,7 @@ const Message$json = { {'1': 'moderation', '3': 11, '4': 1, '5': 11, '6': '.veilidchat.Message.ControlModeration', '9': 0, '10': 'moderation'}, {'1': 'signature', '3': 12, '4': 1, '5': 11, '6': '.veilid.Signature', '10': 'signature'}, ], - '3': [Message_Text$json, Message_Secret$json, Message_ControlDelete$json, Message_ControlErase$json, Message_ControlSettings$json, Message_ControlPermissions$json, Message_ControlMembership$json, Message_ControlModeration$json], + '3': [Message_Text$json, Message_Secret$json, Message_ControlDelete$json, Message_ControlErase$json, Message_ControlSettings$json, Message_ControlPermissions$json, Message_ControlMembership$json, Message_ControlModeration$json, Message_ControlReadReceipt$json], '8': [ {'1': 'kind'}, ], @@ -208,7 +208,7 @@ const Message_Secret$json = { const Message_ControlDelete$json = { '1': 'ControlDelete', '2': [ - {'1': 'ids', '3': 1, '4': 3, '5': 11, '6': '.veilid.TypedKey', '10': 'ids'}, + {'1': 'ids', '3': 1, '4': 3, '5': 12, '10': 'ids'}, ], }; @@ -248,8 +248,16 @@ const Message_ControlMembership$json = { const Message_ControlModeration$json = { '1': 'ControlModeration', '2': [ - {'1': 'accepted_ids', '3': 1, '4': 3, '5': 11, '6': '.veilid.TypedKey', '10': 'acceptedIds'}, - {'1': 'rejected_ids', '3': 2, '4': 3, '5': 11, '6': '.veilid.TypedKey', '10': 'rejectedIds'}, + {'1': 'accepted_ids', '3': 1, '4': 3, '5': 12, '10': 'acceptedIds'}, + {'1': 'rejected_ids', '3': 2, '4': 3, '5': 12, '10': 'rejectedIds'}, + ], +}; + +@$core.Deprecated('Use messageDescriptor instead') +const Message_ControlReadReceipt$json = { + '1': 'ControlReadReceipt', + '2': [ + {'1': 'read_ids', '3': 1, '4': 3, '5': 12, '10': 'readIds'}, ], }; @@ -272,15 +280,15 @@ final $typed_data.Uint8List messageDescriptor = $convert.base64Decode( 'HQoKdmlld19saW1pdBgFIAEoDVIJdmlld0xpbWl0EjgKC2F0dGFjaG1lbnRzGAYgAygLMhYudm' 'VpbGlkY2hhdC5BdHRhY2htZW50UgthdHRhY2htZW50c0IICgZfdG9waWNCCwoJX3JlcGx5X2lk' 'GkgKBlNlY3JldBIeCgpjaXBoZXJ0ZXh0GAEgASgMUgpjaXBoZXJ0ZXh0Eh4KCmV4cGlyYXRpb2' - '4YAiABKARSCmV4cGlyYXRpb24aMwoNQ29udHJvbERlbGV0ZRIiCgNpZHMYASADKAsyEC52ZWls' - 'aWQuVHlwZWRLZXlSA2lkcxosCgxDb250cm9sRXJhc2USHAoJdGltZXN0YW1wGAEgASgEUgl0aW' - '1lc3RhbXAaRwoPQ29udHJvbFNldHRpbmdzEjQKCHNldHRpbmdzGAEgASgLMhgudmVpbGlkY2hh' - 'dC5DaGF0U2V0dGluZ3NSCHNldHRpbmdzGk8KEkNvbnRyb2xQZXJtaXNzaW9ucxI5CgtwZXJtaX' - 'NzaW9ucxgBIAEoCzIXLnZlaWxpZGNoYXQuUGVybWlzc2lvbnNSC3Blcm1pc3Npb25zGksKEUNv' - 'bnRyb2xNZW1iZXJzaGlwEjYKCm1lbWJlcnNoaXAYASABKAsyFi52ZWlsaWRjaGF0Lk1lbWJlcn' - 'NoaXBSCm1lbWJlcnNoaXAafQoRQ29udHJvbE1vZGVyYXRpb24SMwoMYWNjZXB0ZWRfaWRzGAEg' - 'AygLMhAudmVpbGlkLlR5cGVkS2V5UgthY2NlcHRlZElkcxIzCgxyZWplY3RlZF9pZHMYAiADKA' - 'syEC52ZWlsaWQuVHlwZWRLZXlSC3JlamVjdGVkSWRzQgYKBGtpbmQ='); + '4YAiABKARSCmV4cGlyYXRpb24aIQoNQ29udHJvbERlbGV0ZRIQCgNpZHMYASADKAxSA2lkcxos' + 'CgxDb250cm9sRXJhc2USHAoJdGltZXN0YW1wGAEgASgEUgl0aW1lc3RhbXAaRwoPQ29udHJvbF' + 'NldHRpbmdzEjQKCHNldHRpbmdzGAEgASgLMhgudmVpbGlkY2hhdC5DaGF0U2V0dGluZ3NSCHNl' + 'dHRpbmdzGk8KEkNvbnRyb2xQZXJtaXNzaW9ucxI5CgtwZXJtaXNzaW9ucxgBIAEoCzIXLnZlaW' + 'xpZGNoYXQuUGVybWlzc2lvbnNSC3Blcm1pc3Npb25zGksKEUNvbnRyb2xNZW1iZXJzaGlwEjYK' + 'Cm1lbWJlcnNoaXAYASABKAsyFi52ZWlsaWRjaGF0Lk1lbWJlcnNoaXBSCm1lbWJlcnNoaXAaWQ' + 'oRQ29udHJvbE1vZGVyYXRpb24SIQoMYWNjZXB0ZWRfaWRzGAEgAygMUgthY2NlcHRlZElkcxIh' + 'CgxyZWplY3RlZF9pZHMYAiADKAxSC3JlamVjdGVkSWRzGi8KEkNvbnRyb2xSZWFkUmVjZWlwdB' + 'IZCghyZWFkX2lkcxgBIAMoDFIHcmVhZElkc0IGCgRraW5k'); @$core.Deprecated('Use reconciledMessageDescriptor instead') const ReconciledMessage$json = { diff --git a/lib/proto/veilidchat.proto b/lib/proto/veilidchat.proto index 81c963c..fa701fb 100644 --- a/lib/proto/veilidchat.proto +++ b/lib/proto/veilidchat.proto @@ -146,7 +146,7 @@ message Message { // A 'delete' control message // Deletes a set of messages by their ids message ControlDelete { - repeated veilid.TypedKey ids = 1; + repeated bytes ids = 1; } // An 'erase' control message // Deletes a set of messages from before some timestamp @@ -175,8 +175,13 @@ message Message { // A 'moderation' control message // Accepts or rejects a set of messages message ControlModeration { - repeated veilid.TypedKey accepted_ids = 1; - repeated veilid.TypedKey rejected_ids = 2; + repeated bytes accepted_ids = 1; + repeated bytes rejected_ids = 2; + } + + // A 'read receipt' control message + message ControlReadReceipt { + repeated bytes read_ids = 1; } ////////////////////////////////////////////////////////////////////////// diff --git a/packages/veilid_support/example/integration_test/test_dht_log.dart b/packages/veilid_support/example/integration_test/test_dht_log.dart index 0c06c87..0ebdd55 100644 --- a/packages/veilid_support/example/integration_test/test_dht_log.dart +++ b/packages/veilid_support/example/integration_test/test_dht_log.dart @@ -64,7 +64,7 @@ Future Function() makeTestDHTLogAddTruncate({required int stride}) => const chunk = 25; for (var n = 0; n < dataset.length; n += chunk) { print('$n-${n + chunk - 1} '); - final success = await w.tryAddItems(dataset.sublist(n, n + chunk)); + final success = await w.tryAddAll(dataset.sublist(n, n + chunk)); expect(success, isTrue); } }); @@ -73,22 +73,22 @@ Future Function() makeTestDHTLogAddTruncate({required int stride}) => print('get all\n'); { - final dataset2 = await dlog.operate((r) async => r.getItemRange(0)); + final dataset2 = await dlog.operate((r) async => r.getRange(0)); expect(dataset2, equals(dataset)); } { final dataset3 = - await dlog.operate((r) async => r.getItemRange(64, length: 128)); + await dlog.operate((r) async => r.getRange(64, length: 128)); expect(dataset3, equals(dataset.sublist(64, 64 + 128))); } { final dataset4 = - await dlog.operate((r) async => r.getItemRange(0, length: 1000)); + await dlog.operate((r) async => r.getRange(0, length: 1000)); expect(dataset4, equals(dataset.sublist(0, 1000))); } { final dataset5 = - await dlog.operate((r) async => r.getItemRange(500, length: 499)); + await dlog.operate((r) async => r.getRange(500, length: 499)); expect(dataset5, equals(dataset.sublist(500, 999))); } print('truncate\n'); @@ -96,8 +96,8 @@ Future Function() makeTestDHTLogAddTruncate({required int stride}) => await dlog.operateAppend((w) async => w.truncate(w.length - 5)); } { - final dataset6 = await dlog - .operate((r) async => r.getItemRange(500 - 5, length: 499)); + final dataset6 = + await dlog.operate((r) async => r.getRange(500 - 5, length: 499)); expect(dataset6, equals(dataset.sublist(500, 999))); } print('truncate 2\n'); @@ -105,8 +105,8 @@ Future Function() makeTestDHTLogAddTruncate({required int stride}) => await dlog.operateAppend((w) async => w.truncate(w.length - 251)); } { - final dataset7 = await dlog - .operate((r) async => r.getItemRange(500 - 256, length: 499)); + final dataset7 = + await dlog.operate((r) async => r.getRange(500 - 256, length: 499)); expect(dataset7, equals(dataset.sublist(500, 999))); } print('clear\n'); @@ -115,7 +115,7 @@ Future Function() makeTestDHTLogAddTruncate({required int stride}) => } print('get all\n'); { - final dataset8 = await dlog.operate((r) async => r.getItemRange(0)); + final dataset8 = await dlog.operate((r) async => r.getRange(0)); expect(dataset8, isEmpty); } print('delete and close\n'); diff --git a/packages/veilid_support/example/integration_test/test_dht_short_array.dart b/packages/veilid_support/example/integration_test/test_dht_short_array.dart index 637afe0..244b3d5 100644 --- a/packages/veilid_support/example/integration_test/test_dht_short_array.dart +++ b/packages/veilid_support/example/integration_test/test_dht_short_array.dart @@ -64,7 +64,7 @@ Future Function() makeTestDHTShortArrayAdd({required int stride}) => final res = await arr.operateWrite((w) async { for (var n = 4; n < 8; n++) { print('$n '); - final success = await w.tryAddItem(dataset[n]); + final success = await w.tryAdd(dataset[n]); expect(success, isTrue); } }); @@ -75,8 +75,8 @@ Future Function() makeTestDHTShortArrayAdd({required int stride}) => { final res = await arr.operateWrite((w) async { print('${dataset.length ~/ 2}-${dataset.length}'); - final success = await w.tryAddItems( - dataset.sublist(dataset.length ~/ 2, dataset.length)); + final success = await w + .tryAddAll(dataset.sublist(dataset.length ~/ 2, dataset.length)); expect(success, isTrue); }); expect(res, isNull); @@ -87,7 +87,7 @@ Future Function() makeTestDHTShortArrayAdd({required int stride}) => final res = await arr.operateWrite((w) async { for (var n = 0; n < 4; n++) { print('$n '); - final success = await w.tryInsertItem(n, dataset[n]); + final success = await w.tryInsert(n, dataset[n]); expect(success, isTrue); } }); @@ -98,8 +98,8 @@ Future Function() makeTestDHTShortArrayAdd({required int stride}) => { final res = await arr.operateWrite((w) async { print('8-${dataset.length ~/ 2}'); - final success = await w.tryInsertItems( - 8, dataset.sublist(8, dataset.length ~/ 2)); + final success = + await w.tryInsertAll(8, dataset.sublist(8, dataset.length ~/ 2)); expect(success, isTrue); }); expect(res, isNull); @@ -107,12 +107,12 @@ Future Function() makeTestDHTShortArrayAdd({required int stride}) => //print('get all\n'); { - final dataset2 = await arr.operate((r) async => r.getItemRange(0)); + final dataset2 = await arr.operate((r) async => r.getRange(0)); expect(dataset2, equals(dataset)); } { final dataset3 = - await arr.operate((r) async => r.getItemRange(64, length: 128)); + await arr.operate((r) async => r.getRange(64, length: 128)); expect(dataset3, equals(dataset.sublist(64, 64 + 128))); } @@ -126,7 +126,7 @@ Future Function() makeTestDHTShortArrayAdd({required int stride}) => //print('get all\n'); { - final dataset4 = await arr.operate((r) async => r.getItemRange(0)); + final dataset4 = await arr.operate((r) async => r.getRange(0)); expect(dataset4, isEmpty); } diff --git a/packages/veilid_support/lib/dht_support/proto/dht.proto b/packages/veilid_support/lib/dht_support/proto/dht.proto index 6796753..c27915c 100644 --- a/packages/veilid_support/lib/dht_support/proto/dht.proto +++ b/packages/veilid_support/lib/dht_support/proto/dht.proto @@ -62,13 +62,24 @@ message DHTShortArray { // calculated through iteration } +// Reference to data on the DHT +message DHTDataReference { + veilid.TypedKey dht_data = 1; + veilid.TypedKey hash = 2; +} + +// Reference to data on the BlockStore +message BlockStoreDataReference { + veilid.TypedKey block = 1; +} + // DataReference // Pointer to data somewhere in Veilid // Abstraction over DHTData and BlockStore message DataReference { oneof kind { - veilid.TypedKey dht_data = 1; - // TypedKey block = 2; + DHTDataReference dht_data = 1; + BlockStoreDataReference block_store_data = 2; } } diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart index cba15f4..acdc6fe 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart @@ -9,7 +9,7 @@ import 'package:meta/meta.dart'; import '../../../veilid_support.dart'; import '../../proto/proto.dart' as proto; -import '../interfaces/dht_append.dart'; +import '../interfaces/dht_add.dart'; part 'dht_log_spine.dart'; part 'dht_log_read.dart'; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart index 3c054fc..010c76e 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart @@ -92,7 +92,7 @@ class DHTLogCubit extends Cubit> Future _refreshInner(void Function(AsyncValue>) emit, {bool forceRefresh = false}) async { - final avElements = await _loadElements(_tail, _count); + final avElements = await loadElements(_tail, _count); final err = avElements.asError; if (err != null) { emit(AsyncValue.error(err.error, err.stackTrace)); @@ -108,9 +108,10 @@ class DHTLogCubit extends Cubit> elements: elements, tail: _tail, count: _count, follow: _follow))); } - Future>>> _loadElements( + Future>>> loadElements( int tail, int count, {bool forceRefresh = false}) async { + await _initWait(); try { final allItems = await _log.operate((reader) async { final length = reader.length; @@ -118,7 +119,7 @@ class DHTLogCubit extends Cubit> final start = (count < end) ? end - count : 0; final offlinePositions = await reader.getOfflinePositions(); - final allItems = (await reader.getItemRange(start, + final allItems = (await reader.getRange(start, length: end - start, forceRefresh: forceRefresh)) ?.indexed .map((x) => DHTLogElementState( diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart index 0a66a01..7f397ac 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart @@ -12,7 +12,7 @@ class _DHTLogRead implements DHTLogReadOperations { int get length => _spine.length; @override - Future getItem(int pos, {bool forceRefresh = false}) async { + Future get(int pos, {bool forceRefresh = false}) async { if (pos < 0 || pos >= length) { throw IndexError.withLength(pos, length); } @@ -21,8 +21,8 @@ class _DHTLogRead implements DHTLogReadOperations { return null; } - return lookup.scope((sa) => sa.operate( - (read) => read.getItem(lookup.pos, forceRefresh: forceRefresh))); + return lookup.scope((sa) => + sa.operate((read) => read.get(lookup.pos, forceRefresh: forceRefresh))); } (int, int) _clampStartLen(int start, int? len) { @@ -40,14 +40,14 @@ class _DHTLogRead implements DHTLogReadOperations { } @override - Future?> getItemRange(int start, + Future?> getRange(int start, {int? length, bool forceRefresh = false}) async { final out = []; (start, length) = _clampStartLen(start, length); final chunks = Iterable.generate(length).slices(maxDHTConcurrency).map( - (chunk) => chunk - .map((pos) => getItem(pos + start, forceRefresh: forceRefresh))); + (chunk) => + chunk.map((pos) => get(pos + start, forceRefresh: forceRefresh))); for (final chunk in chunks) { final elems = await chunk.wait; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart index 9a8c64e..a47602a 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart @@ -3,16 +3,15 @@ part of 'dht_log.dart'; class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> { _DHTLogPosition._({ required _DHTLogSpine dhtLogSpine, - required DHTShortArray shortArray, + required this.shortArray, required this.pos, required int segmentNumber, - }) : _segmentShortArray = shortArray, - _dhtLogSpine = dhtLogSpine, + }) : _dhtLogSpine = dhtLogSpine, _segmentNumber = segmentNumber; final int pos; final _DHTLogSpine _dhtLogSpine; - final DHTShortArray _segmentShortArray; + final DHTShortArray shortArray; var _openCount = 1; final int _segmentNumber; final Mutex _mutex = Mutex(); @@ -23,7 +22,7 @@ class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> { /// The type of the openable scope @override - FutureOr scoped() => _segmentShortArray; + FutureOr scoped() => shortArray; /// Add a reference to this log @override diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart index 5503051..49acce0 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart @@ -17,7 +17,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { } final lookup = await _spine.lookupPosition(pos); if (lookup == null) { - throw StateError("can't write to dht log"); + throw StateError("can't lookup position in write to dht log"); } // Write item to the segment @@ -26,7 +26,47 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { } @override - Future tryAddItem(Uint8List value) async { + Future swap(int aPos, int bPos) async { + if (aPos < 0 || aPos >= _spine.length) { + throw IndexError.withLength(aPos, _spine.length); + } + if (bPos < 0 || bPos >= _spine.length) { + throw IndexError.withLength(bPos, _spine.length); + } + final aLookup = await _spine.lookupPosition(aPos); + if (aLookup == null) { + throw StateError("can't lookup position a in swap of dht log"); + } + final bLookup = await _spine.lookupPosition(bPos); + if (bLookup == null) { + throw StateError("can't lookup position b in swap of dht log"); + } + + // Swap items in the segments + if (aLookup.shortArray == bLookup.shortArray) { + await aLookup.scope((sa) => sa.operateWriteEventual((aWrite) async { + await aWrite.swap(aLookup.pos, bLookup.pos); + return true; + })); + } else { + final bItem = Output(); + await aLookup.scope( + (sa) => bLookup.scope((sb) => sa.operateWriteEventual((aWrite) async { + if (bItem.value == null) { + final aItem = await aWrite.get(aLookup.pos); + if (aItem == null) { + throw StateError("can't get item for position a in swap"); + } + await sb.operateWriteEventual((bWrite) async => + bWrite.tryWriteItem(bLookup.pos, aItem, output: bItem)); + } + return aWrite.tryWriteItem(aLookup.pos, bItem.value!); + }))); + } + } + + @override + Future tryAdd(Uint8List value) async { // Allocate empty index at the end of the list final insertPos = _spine.length; _spine.allocateTail(1); @@ -44,12 +84,12 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { // We should always be appending at the length throw StateError('appending should be at the end'); } - return write.tryAddItem(value); + return write.tryAdd(value); })); } @override - Future tryAddItems(List values) async { + Future tryAddAll(List values) async { // Allocate empty index at the end of the list final insertPos = _spine.length; _spine.allocateTail(values.length); @@ -79,7 +119,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { // We should always be appending at the length throw StateError('appending should be at the end'); } - return write.tryAddItems(sublistValues); + return write.tryAddAll(sublistValues); })); if (!ok) { success = false; diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart index f4b806e..90fcbad 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart @@ -54,13 +54,12 @@ class DHTShortArrayCubit extends Cubit> try { final newState = await _shortArray.operate((reader) async { final offlinePositions = await reader.getOfflinePositions(); - final allItems = - (await reader.getItemRange(0, forceRefresh: forceRefresh)) - ?.indexed - .map((x) => DHTShortArrayElementState( - value: _decodeElement(x.$2), - isOffline: offlinePositions.contains(x.$1))) - .toIList(); + final allItems = (await reader.getRange(0, forceRefresh: forceRefresh)) + ?.indexed + .map((x) => DHTShortArrayElementState( + value: _decodeElement(x.$2), + isOffline: offlinePositions.contains(x.$1))) + .toIList(); return allItems; }); if (newState != null) { diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart index 5da8cf8..abe7198 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart @@ -12,7 +12,7 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations { int get length => _head.length; @override - Future getItem(int pos, {bool forceRefresh = false}) async { + Future get(int pos, {bool forceRefresh = false}) async { if (pos < 0 || pos >= length) { throw IndexError.withLength(pos, length); } @@ -49,14 +49,14 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations { } @override - Future?> getItemRange(int start, + Future?> getRange(int start, {int? length, bool forceRefresh = false}) async { final out = []; (start, length) = _clampStartLen(start, length); final chunks = Iterable.generate(length).slices(maxDHTConcurrency).map( - (chunk) => chunk - .map((pos) => getItem(pos + start, forceRefresh: forceRefresh))); + (chunk) => + chunk.map((pos) => get(pos + start, forceRefresh: forceRefresh))); for (final chunk in chunks) { final elems = await chunk.wait; diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart index c336e47..d002e35 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart @@ -16,15 +16,14 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead _DHTShortArrayWrite._(super.head) : super._(); @override - Future tryAddItem(Uint8List value) => - tryInsertItem(_head.length, value); + Future tryAdd(Uint8List value) => tryInsert(_head.length, value); @override - Future tryAddItems(List values) => - tryInsertItems(_head.length, values); + Future tryAddAll(List values) => + tryInsertAll(_head.length, values); @override - Future tryInsertItem(int pos, Uint8List value) async { + Future tryInsert(int pos, Uint8List value) async { if (pos < 0 || pos > _head.length) { throw IndexError.withLength(pos, _head.length); } @@ -44,7 +43,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead } @override - Future tryInsertItems(int pos, List values) async { + Future tryInsertAll(int pos, List values) async { if (pos < 0 || pos > _head.length) { throw IndexError.withLength(pos, _head.length); } @@ -100,7 +99,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead } @override - Future swapItem(int aPos, int bPos) async { + Future swap(int aPos, int bPos) async { if (aPos < 0 || aPos >= _head.length) { throw IndexError.withLength(aPos, _head.length); } @@ -112,7 +111,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead } @override - Future removeItem(int pos, {Output? output}) async { + Future remove(int pos, {Output? output}) async { if (pos < 0 || pos >= _head.length) { throw IndexError.withLength(pos, _head.length); } diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_append.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_add.dart similarity index 80% rename from packages/veilid_support/lib/dht_support/src/interfaces/dht_append.dart rename to packages/veilid_support/lib/dht_support/src/interfaces/dht_add.dart index a1f47ee..e2b5ad7 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/dht_append.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_add.dart @@ -12,30 +12,30 @@ abstract class DHTAdd { /// changed before the element could be added or a newer value was found on /// the network. /// Throws a StateError if the container exceeds its maximum size. - Future tryAddItem(Uint8List value); + Future tryAdd(Uint8List value); /// Try to add a list of items to the DHT container. /// Return true if the elements were successfully added, and false if the /// state changed before the element could be added or a newer value was found /// on the network. /// Throws a StateError if the container exceeds its maximum size. - Future tryAddItems(List values); + Future tryAddAll(List values); } extension DHTAddExt on DHTAdd { /// Convenience function: /// Like tryAddItem but also encodes the input value as JSON and parses the /// returned element as JSON - Future tryAppendItemJson( + Future tryAddJson( T newValue, ) => - tryAddItem(jsonEncodeBytes(newValue)); + tryAdd(jsonEncodeBytes(newValue)); /// Convenience function: /// Like tryAddItem but also encodes the input value as a protobuf object /// and parses the returned element as a protobuf object - Future tryAddItemProtobuf( + Future tryAddProtobuf( T newValue, ) => - tryAddItem(newValue.writeToBuffer()); + tryAdd(newValue.writeToBuffer()); } diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_insert_remove.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_insert_remove.dart index 1f98a22..fe44368 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/dht_insert_remove.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_insert_remove.dart @@ -14,7 +14,7 @@ abstract class DHTInsertRemove { /// Throws an IndexError if the position removed exceeds the length of /// the container. /// Throws a StateError if the container exceeds its maximum size. - Future tryInsertItem(int pos, Uint8List value); + Future tryInsert(int pos, Uint8List value); /// Try to insert items at position 'pos' of the DHT container. /// Return true if the elements were successfully inserted, and false if the @@ -23,38 +23,33 @@ abstract class DHTInsertRemove { /// Throws an IndexError if the position removed exceeds the length of /// the container. /// Throws a StateError if the container exceeds its maximum size. - Future tryInsertItems(int pos, List values); - - /// Swap items at position 'aPos' and 'bPos' in the DHTArray. - /// Throws an IndexError if either of the positions swapped exceeds the length - /// of the container - Future swapItem(int aPos, int bPos); + Future tryInsertAll(int pos, List values); /// Remove an item at position 'pos' in the DHT container. /// If the remove was successful this returns: /// * outValue will return the prior contents of the element /// Throws an IndexError if the position removed exceeds the length of /// the container. - Future removeItem(int pos, {Output? output}); + Future remove(int pos, {Output? output}); } extension DHTInsertRemoveExt on DHTInsertRemove { /// Convenience function: - /// Like removeItem but also parses the returned element as JSON - Future removeItemJson(T Function(dynamic) fromJson, int pos, + /// Like remove but also parses the returned element as JSON + Future removeJson(T Function(dynamic) fromJson, int pos, {Output? output}) async { final outValueBytes = output == null ? null : Output(); - await removeItem(pos, output: outValueBytes); + await remove(pos, output: outValueBytes); output.mapSave(outValueBytes, (b) => jsonDecodeBytes(fromJson, b)); } /// Convenience function: - /// Like removeItem but also parses the returned element as JSON - Future removeItemProtobuf( + /// Like remove but also parses the returned element as JSON + Future removeProtobuf( T Function(List) fromBuffer, int pos, {Output? output}) async { final outValueBytes = output == null ? null : Output(); - await removeItem(pos, output: outValueBytes); + await remove(pos, output: outValueBytes); output.mapSave(outValueBytes, fromBuffer); } } diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart index 39d49e6..362d688 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart @@ -15,14 +15,14 @@ abstract class DHTRandomRead { /// rather than returning the existing locally stored copy of the elements. /// Throws an IndexError if the 'pos' is not within the length /// of the container. - Future getItem(int pos, {bool forceRefresh = false}); + Future get(int pos, {bool forceRefresh = false}); /// Return a list of a range of items in the DHTArray. If 'forceRefresh' /// is specified, the network will always be checked for newer values /// rather than returning the existing locally stored copy of the elements. /// Throws an IndexError if either 'start' or '(start+length)' is not within /// the length of the container. - Future?> getItemRange(int start, + Future?> getRange(int start, {int? length, bool forceRefresh = false}); /// Get a list of the positions that were written offline and not flushed yet @@ -31,32 +31,32 @@ abstract class DHTRandomRead { extension DHTRandomReadExt on DHTRandomRead { /// Convenience function: - /// Like getItem but also parses the returned element as JSON - Future getItemJson(T Function(dynamic) fromJson, int pos, + /// Like get but also parses the returned element as JSON + Future getJson(T Function(dynamic) fromJson, int pos, {bool forceRefresh = false}) => - getItem(pos, forceRefresh: forceRefresh) + get(pos, forceRefresh: forceRefresh) .then((out) => jsonDecodeOptBytes(fromJson, out)); /// Convenience function: - /// Like getAllItems but also parses the returned elements as JSON - Future?> getItemRangeJson(T Function(dynamic) fromJson, int start, + /// Like getRange but also parses the returned elements as JSON + Future?> getRangeJson(T Function(dynamic) fromJson, int start, {int? length, bool forceRefresh = false}) => - getItemRange(start, length: length, forceRefresh: forceRefresh) + getRange(start, length: length, forceRefresh: forceRefresh) .then((out) => out?.map(fromJson).toList()); /// Convenience function: - /// Like getItem but also parses the returned element as a protobuf object - Future getItemProtobuf( + /// Like get but also parses the returned element as a protobuf object + Future getProtobuf( T Function(List) fromBuffer, int pos, {bool forceRefresh = false}) => - getItem(pos, forceRefresh: forceRefresh) + get(pos, forceRefresh: forceRefresh) .then((out) => (out == null) ? null : fromBuffer(out)); /// Convenience function: - /// Like getAllItems but also parses the returned elements as protobuf objects - Future?> getItemRangeProtobuf( + /// Like getRange but also parses the returned elements as protobuf objects + Future?> getRangeProtobuf( T Function(List) fromBuffer, int start, {int? length, bool forceRefresh = false}) => - getItemRange(start, length: length, forceRefresh: forceRefresh) + getRange(start, length: length, forceRefresh: forceRefresh) .then((out) => out?.map(fromBuffer).toList()); } diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_write.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_write.dart index 0d8f3ac..5b3f032 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_write.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_write.dart @@ -23,6 +23,11 @@ abstract class DHTRandomWrite { /// of the container. Future tryWriteItem(int pos, Uint8List newValue, {Output? output}); + + /// Swap items at position 'aPos' and 'bPos' in the DHTArray. + /// Throws an IndexError if either of the positions swapped exceeds the length + /// of the container + Future swap(int aPos, int bPos); } extension DHTRandomWriteExt on DHTRandomWrite { diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/interfaces.dart b/packages/veilid_support/lib/dht_support/src/interfaces/interfaces.dart index dd95cac..57d0979 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/interfaces.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/interfaces.dart @@ -1,4 +1,4 @@ -export 'dht_append.dart'; +export 'dht_add.dart'; export 'dht_clear.dart'; export 'dht_closeable.dart'; export 'dht_insert_remove.dart'; diff --git a/packages/veilid_support/lib/src/table_db_array.dart b/packages/veilid_support/lib/src/table_db_array.dart index 9714770..4d5b9dd 100644 --- a/packages/veilid_support/lib/src/table_db_array.dart +++ b/packages/veilid_support/lib/src/table_db_array.dart @@ -6,6 +6,7 @@ import 'package:async_tools/async_tools.dart'; import 'package:charcode/charcode.dart'; import 'package:equatable/equatable.dart'; import 'package:meta/meta.dart'; +import 'package:protobuf/protobuf.dart'; import '../veilid_support.dart'; @@ -128,13 +129,13 @@ class TableDBArray { }); } - Future> getRange(int start, int end) async { + Future> getRange(int start, [int? end]) async { await _initWait(); return _mutex.protect(() async { if (!_open) { throw StateError('not open'); } - return _getRangeInner(start, end); + return _getRangeInner(start, end ?? _length); }); } @@ -629,3 +630,36 @@ class TableDBArray { final StreamController _changeStream = StreamController.broadcast(); } + +extension TableDBArrayExt on TableDBArray { + /// Convenience function: + /// Like get but also parses the returned element as JSON + Future getJson( + T Function(dynamic) fromJson, + int pos, + ) => + get( + pos, + ).then((out) => jsonDecodeOptBytes(fromJson, out)); + + /// Convenience function: + /// Like getRange but also parses the returned elements as JSON + Future?> getRangeJson(T Function(dynamic) fromJson, int start, + [int? end]) => + getRange(start, end ?? _length).then((out) => out.map(fromJson).toList()); + + /// Convenience function: + /// Like get but also parses the returned element as a protobuf object + Future getProtobuf( + T Function(List) fromBuffer, + int pos, + ) => + get(pos).then(fromBuffer); + + /// Convenience function: + /// Like getRange but also parses the returned elements as protobuf objects + Future?> getRangeProtobuf( + T Function(List) fromBuffer, int start, [int? end]) => + getRange(start, end ?? _length) + .then((out) => out.map(fromBuffer).toList()); +} diff --git a/packages/veilid_support/lib/src/veilid_crypto.dart b/packages/veilid_support/lib/src/veilid_crypto.dart index 75087fb..565459c 100644 --- a/packages/veilid_support/lib/src/veilid_crypto.dart +++ b/packages/veilid_support/lib/src/veilid_crypto.dart @@ -13,16 +13,16 @@ abstract class VeilidCrypto { class VeilidCryptoPrivate implements VeilidCrypto { VeilidCryptoPrivate._(VeilidCryptoSystem cryptoSystem, SharedSecret secretKey) : _cryptoSystem = cryptoSystem, - _secretKey = secretKey; + _secret = secretKey; final VeilidCryptoSystem _cryptoSystem; - final SharedSecret _secretKey; + final SharedSecret _secret; static Future fromTypedKey( - TypedKey typedKey, String domain) async { - final cryptoSystem = await Veilid.instance.getCryptoSystem(typedKey.kind); - final keyMaterial = Uint8List(0) - ..addAll(typedKey.value.decode()) - ..addAll(utf8.encode(domain)); + TypedKey typedSecret, String domain) async { + final cryptoSystem = + await Veilid.instance.getCryptoSystem(typedSecret.kind); + final keyMaterial = Uint8List.fromList( + [...typedSecret.value.decode(), ...utf8.encode(domain)]); final secretKey = await cryptoSystem.generateHash(keyMaterial); return VeilidCryptoPrivate._(cryptoSystem, secretKey); } @@ -35,18 +35,18 @@ class VeilidCryptoPrivate implements VeilidCrypto { } static Future fromSharedSecret( - CryptoKind kind, SharedSecret secretKey) async { + CryptoKind kind, SharedSecret sharedSecret) async { final cryptoSystem = await Veilid.instance.getCryptoSystem(kind); - return VeilidCryptoPrivate._(cryptoSystem, secretKey); + return VeilidCryptoPrivate._(cryptoSystem, sharedSecret); } @override Future encrypt(Uint8List data) => - _cryptoSystem.encryptNoAuthWithNonce(data, _secretKey); + _cryptoSystem.encryptNoAuthWithNonce(data, _secret); @override Future decrypt(Uint8List data) => - _cryptoSystem.decryptNoAuthWithNonce(data, _secretKey); + _cryptoSystem.decryptNoAuthWithNonce(data, _secret); } ////////////////////////////////////