checkpoint

This commit is contained in:
Christien Rioux 2024-05-28 22:01:50 -04:00
parent 8a5af51ec7
commit 37f6ca19f7
26 changed files with 357 additions and 166 deletions

View file

@ -1,4 +1,6 @@
import 'dart:async'; import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart'; import 'package:async_tools/async_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart';
@ -15,7 +17,6 @@ class RenderStateElement {
{required this.message, {required this.message,
required this.isLocal, required this.isLocal,
this.reconciled = false, this.reconciled = false,
this.reconciledOffline = false,
this.sent = false, this.sent = false,
this.sentOffline = false}); this.sentOffline = false});
@ -27,7 +28,7 @@ class RenderStateElement {
if (sent && !sentOffline) { if (sent && !sentOffline) {
return MessageSendState.delivered; return MessageSendState.delivered;
} }
if (reconciled && !reconciledOffline) { if (reconciled) {
return MessageSendState.sent; return MessageSendState.sent;
} }
return MessageSendState.sending; return MessageSendState.sending;
@ -36,7 +37,6 @@ class RenderStateElement {
proto.Message message; proto.Message message;
bool isLocal; bool isLocal;
bool reconciled; bool reconciled;
bool reconciledOffline;
bool sent; bool sent;
bool sentOffline; bool sentOffline;
} }
@ -96,7 +96,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
); );
// Make crypto // Make crypto
await _initMessagesCrypto(); await _initCrypto();
// Reconciled messages key // Reconciled messages key
await _initReconciledMessagesCubit(); await _initReconciledMessagesCubit();
@ -109,9 +109,13 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
} }
// Make crypto // Make crypto
Future<void> _initMessagesCrypto() async { Future<void> _initCrypto() async {
_messagesCrypto = await _activeAccountInfo _messagesCrypto = await _activeAccountInfo
.makeConversationCrypto(_remoteIdentityPublicKey); .makeConversationCrypto(_remoteIdentityPublicKey);
_localMessagesCryptoSystem =
await Veilid.instance.getCryptoSystem(_localMessagesRecordKey.kind);
_identityCryptoSystem =
await _activeAccountInfo.localAccount.identityMaster.identityCrypto;
} }
// Open local messages key // Open local messages key
@ -144,13 +148,16 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_updateRcvdMessagesState(_rcvdMessagesCubit!.state); _updateRcvdMessagesState(_rcvdMessagesCubit!.state);
} }
Future<VeilidCrypto> _makeLocalMessagesCrypto() async =>
VeilidCryptoPrivate.fromTypedKey(
_activeAccountInfo.userLogin.identitySecret, 'tabledb');
// Open reconciled chat record key // Open reconciled chat record key
Future<void> _initReconciledMessagesCubit() async { Future<void> _initReconciledMessagesCubit() async {
final tableName = _localConversationRecordKey.toString(); final tableName = _localConversationRecordKey.toString();
xxx whats the right encryption for reconciled messages cubit? final crypto = await _makeLocalMessagesCrypto();
final crypto = VeilidCryptoPrivate.fromTypedKey(kind, secretKey);
_reconciledMessagesCubit = TableDBArrayCubit( _reconciledMessagesCubit = TableDBArrayCubit(
open: () async => TableDBArray.make(table: tableName, crypto: crypto), open: () async => TableDBArray.make(table: tableName, crypto: crypto),
decodeElement: proto.Message.fromBuffer); decodeElement: proto.Message.fromBuffer);
@ -183,8 +190,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
if (sentMessages == null) { if (sentMessages == null) {
return; return;
} }
// Don't reconcile, the sending machine will have already added
// to the reconciliation queue on that machine await _reconcileMessages(sentMessages, _sentMessagesCubit);
// Update the view // Update the view
_renderState(); _renderState();
@ -197,16 +204,18 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
return; return;
} }
await _reconcileMessages(rcvdMessages, _rcvdMessagesCubit);
singleFuture(_rcvdMessagesCubit!, () async { singleFuture(_rcvdMessagesCubit!, () async {
// Get the timestamp of our most recent reconciled message // Get the timestamp of our most recent reconciled message
final lastReconciledMessageTs = final lastReconciledMessageTs =
await _reconciledMessagesCubit!.operate((r) async { await _reconciledMessagesCubit!.operate((arr) async {
final len = r.length; final len = arr.length;
if (len == 0) { if (len == 0) {
return null; return null;
} else { } else {
final lastMessage = final lastMessage =
await r.getItemProtobuf(proto.Message.fromBuffer, len - 1); await arr.getProtobuf(proto.Message.fromBuffer, len - 1);
if (lastMessage == null) { if (lastMessage == null) {
throw StateError('should have gotten last message'); throw StateError('should have gotten last message');
} }
@ -232,11 +241,9 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
}); });
} }
// Called when the reconciled messages list gets a change // Called when the reconciled messages window gets a change
// This can happen when multiple clients for the same identity are
// reading and reconciling the same remote chat
void _updateReconciledMessagesState( void _updateReconciledMessagesState(
DHTLogBusyState<proto.Message> avmessages) { TableDBArrayBusyState<proto.Message> avmessages) {
// Update the view // Update the view
_renderState(); _renderState();
} }
@ -252,10 +259,62 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// }); // });
} }
Future<Uint8List> _hashSignature(proto.Signature signature) async =>
(await _localMessagesCryptoSystem
.generateHash(signature.toVeilid().decode()))
.decode();
Future<void> _signMessage(proto.Message message) async {
// Generate data to sign
final data = Uint8List.fromList(utf8.encode(message.writeToJson()));
// Sign with our identity
final signature = await _identityCryptoSystem.sign(
_activeAccountInfo.localAccount.identityMaster.identityPublicKey,
_activeAccountInfo.userLogin.identitySecret.value,
data);
// Add to the message
message.signature = signature.toProto();
}
Future<void> _processMessageToSend(
proto.Message message, proto.Message? previousMessage) async {
// Get the previous message if we don't have one
previousMessage ??= await _sentMessagesCubit!.operate((r) async =>
r.length == 0
? null
: await r.getProtobuf(proto.Message.fromBuffer, r.length - 1));
if (previousMessage == null) {
// If there's no last sent message,
// we start at a hash of the identity public key
message.id = (await _localMessagesCryptoSystem.generateHash(
_activeAccountInfo.localAccount.identityMaster.identityPublicKey
.decode()))
.decode();
} else {
// If there is a last message, we generate the hash
// of the last message's signature and use it as our next id
message.id = await _hashSignature(previousMessage.signature);
}
// Now sign it
await _signMessage(message);
}
// Async process to send messages in the background // Async process to send messages in the background
Future<void> _processSendingMessages(IList<proto.Message> messages) async { Future<void> _processSendingMessages(IList<proto.Message> messages) async {
// Go through and assign ids to all the messages in order
proto.Message? previousMessage;
final processedMessages = messages.toList();
for (final message in processedMessages) {
await _processMessageToSend(message, previousMessage);
previousMessage = message;
}
await _sentMessagesCubit!.operateAppendEventual((writer) => await _sentMessagesCubit!.operateAppendEventual((writer) =>
writer.tryAddItems(messages.map((m) => m.writeToBuffer()).toList())); writer.tryAddAll(messages.map((m) => m.writeToBuffer()).toList()));
} }
Future<void> _reconcileMessagesInner( Future<void> _reconcileMessagesInner(
@ -345,9 +404,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
keyMapper: (x) => x.value.timestamp, keyMapper: (x) => x.value.timestamp,
values: sentMessages.elements, values: sentMessages.elements,
); );
final reconciledMessagesMap = final reconciledMessagesMap = IMap<Int64, proto.Message>.fromValues(
IMap<Int64, DHTLogElementState<proto.Message>>.fromValues( keyMapper: (x) => x.timestamp,
keyMapper: (x) => x.value.timestamp,
values: reconciledMessages.elements, values: reconciledMessages.elements,
); );
final sendingMessagesMap = IMap<Int64, proto.Message>.fromValues( final sendingMessagesMap = IMap<Int64, proto.Message>.fromValues(
@ -416,7 +474,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
emit(AsyncValue.data(renderedState)); emit(AsyncValue.data(renderedState));
} }
void addTextMessage({required proto.Message_Text messageText}) { void sendTextMessage({required proto.Message_Text messageText}) {
final message = proto.Message() final message = proto.Message()
..id = generateNextId() ..id = generateNextId()
..author = _activeAccountInfo.localAccount.identityMaster ..author = _activeAccountInfo.localAccount.identityMaster
@ -425,7 +483,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
..timestamp = Veilid.instance.now().toInt64() ..timestamp = Veilid.instance.now().toInt64()
..text = messageText; ..text = messageText;
_unreconciledMessagesQueue.addSync(message);
_sendingMessagesQueue.addSync(message); _sendingMessagesQueue.addSync(message);
// Update the view // Update the view
@ -456,15 +513,18 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
final TypedKey _remoteMessagesRecordKey; final TypedKey _remoteMessagesRecordKey;
late final VeilidCrypto _messagesCrypto; late final VeilidCrypto _messagesCrypto;
late final VeilidCryptoSystem _localMessagesCryptoSystem;
late final VeilidCryptoSystem _identityCryptoSystem;
DHTLogCubit<proto.Message>? _sentMessagesCubit; DHTLogCubit<proto.Message>? _sentMessagesCubit;
DHTLogCubit<proto.Message>? _rcvdMessagesCubit; DHTLogCubit<proto.Message>? _rcvdMessagesCubit;
TableDBArrayCubit<proto.Message>? _reconciledMessagesCubit; TableDBArrayCubit<proto.Message>? _reconciledMessagesCubit;
late final PersistentQueue<proto.Message> _unreconciledMessagesQueue; late final PersistentQueue<proto.Message> _unreconciledMessagesQueue; xxx can we eliminate this? and make rcvd messages cubit listener work like sent?
late final PersistentQueue<proto.Message> _sendingMessagesQueue; late final PersistentQueue<proto.Message> _sendingMessagesQueue;
StreamSubscription<DHTLogBusyState<proto.Message>>? _sentSubscription; StreamSubscription<DHTLogBusyState<proto.Message>>? _sentSubscription;
StreamSubscription<DHTLogBusyState<proto.Message>>? _rcvdSubscription; StreamSubscription<DHTLogBusyState<proto.Message>>? _rcvdSubscription;
StreamSubscription<DHTLogBusyState<proto.Message>>? _reconciledSubscription; StreamSubscription<TableDBArrayBusyState<proto.Message>>?
_reconciledSubscription;
} }

View file

@ -162,7 +162,7 @@ class ChatComponent extends StatelessWidget {
..viewLimit = viewLimit ?? 0; ..viewLimit = viewLimit ?? 0;
protoMessageText.attachments.addAll(attachments); protoMessageText.attachments.addAll(attachments);
_messagesCubit.addTextMessage(messageText: protoMessageText); _messagesCubit.sendTextMessage(messageText: protoMessageText);
} }
void _handleSendPressed(types.PartialText message) { void _handleSendPressed(types.PartialText message) {

View file

@ -65,7 +65,7 @@ class ChatListCubit extends DHTShortArrayCubit<proto.Chat>
await operateWrite((writer) async { await operateWrite((writer) async {
// See if we have added this chat already // See if we have added this chat already
for (var i = 0; i < writer.length; i++) { for (var i = 0; i < writer.length; i++) {
final cbuf = await writer.getItem(i); final cbuf = await writer.get(i);
if (cbuf == null) { if (cbuf == null) {
throw Exception('Failed to get chat'); throw Exception('Failed to get chat');
} }
@ -84,7 +84,7 @@ class ChatListCubit extends DHTShortArrayCubit<proto.Chat>
..remoteConversationRecordKey = remoteConversationRecordKey.toProto(); ..remoteConversationRecordKey = remoteConversationRecordKey.toProto();
// Add chat // Add chat
final added = await writer.tryAddItem(chat.writeToBuffer()); final added = await writer.tryAdd(chat.writeToBuffer());
if (!added) { if (!added) {
throw Exception('Failed to add chat'); throw Exception('Failed to add chat');
} }
@ -106,15 +106,14 @@ class ChatListCubit extends DHTShortArrayCubit<proto.Chat>
activeChatCubit.setActiveChat(null); activeChatCubit.setActiveChat(null);
} }
for (var i = 0; i < writer.length; i++) { for (var i = 0; i < writer.length; i++) {
final c = final c = await writer.getProtobuf(proto.Chat.fromBuffer, i);
await writer.getItemProtobuf(proto.Chat.fromBuffer, i);
if (c == null) { if (c == null) {
throw Exception('Failed to get chat'); throw Exception('Failed to get chat');
} }
if (c.localConversationRecordKey == if (c.localConversationRecordKey ==
localConversationRecordKeyProto) { localConversationRecordKeyProto) {
// Found the right chat // Found the right chat
await writer.removeItem(i); await writer.remove(i);
return c; return c;
} }
} }

View file

@ -159,7 +159,7 @@ class ContactInvitationListCubit
// Add ContactInvitationRecord to account's list // Add ContactInvitationRecord to account's list
// if this fails, don't keep retrying, user can try again later // if this fails, don't keep retrying, user can try again later
await operateWrite((writer) async { await operateWrite((writer) async {
if (await writer.tryAddItem(cinvrec.writeToBuffer()) == false) { if (await writer.tryAdd(cinvrec.writeToBuffer()) == false) {
throw Exception('Failed to add contact invitation record'); throw Exception('Failed to add contact invitation record');
} }
}); });
@ -179,14 +179,14 @@ class ContactInvitationListCubit
// Remove ContactInvitationRecord from account's list // Remove ContactInvitationRecord from account's list
final deletedItem = await operateWrite((writer) async { final deletedItem = await operateWrite((writer) async {
for (var i = 0; i < writer.length; i++) { for (var i = 0; i < writer.length; i++) {
final item = await writer.getItemProtobuf( final item = await writer.getProtobuf(
proto.ContactInvitationRecord.fromBuffer, i); proto.ContactInvitationRecord.fromBuffer, i);
if (item == null) { if (item == null) {
throw Exception('Failed to get contact invitation record'); throw Exception('Failed to get contact invitation record');
} }
if (item.contactRequestInbox.recordKey.toVeilid() == if (item.contactRequestInbox.recordKey.toVeilid() ==
contactRequestInboxRecordKey) { contactRequestInboxRecordKey) {
await writer.removeItem(i); await writer.remove(i);
return item; return item;
} }
} }

View file

@ -56,7 +56,7 @@ class ContactListCubit extends DHTShortArrayCubit<proto.Contact> {
// Add Contact to account's list // Add Contact to account's list
// if this fails, don't keep retrying, user can try again later // if this fails, don't keep retrying, user can try again later
await operateWrite((writer) async { await operateWrite((writer) async {
if (!await writer.tryAddItem(contact.writeToBuffer())) { if (!await writer.tryAdd(contact.writeToBuffer())) {
throw Exception('Failed to add contact record'); throw Exception('Failed to add contact record');
} }
}); });
@ -72,13 +72,13 @@ class ContactListCubit extends DHTShortArrayCubit<proto.Contact> {
// Remove Contact from account's list // Remove Contact from account's list
final deletedItem = await operateWrite((writer) async { final deletedItem = await operateWrite((writer) async {
for (var i = 0; i < writer.length; i++) { for (var i = 0; i < writer.length; i++) {
final item = await writer.getItemProtobuf(proto.Contact.fromBuffer, i); final item = await writer.getProtobuf(proto.Contact.fromBuffer, i);
if (item == null) { if (item == null) {
throw Exception('Failed to get contact'); throw Exception('Failed to get contact');
} }
if (item.localConversationRecordKey == if (item.localConversationRecordKey ==
contact.localConversationRecordKey) { contact.localConversationRecordKey) {
await writer.removeItem(i); await writer.remove(i);
return item; return item;
} }
} }

View file

@ -486,7 +486,7 @@ class Message_ControlDelete extends $pb.GeneratedMessage {
factory Message_ControlDelete.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); factory Message_ControlDelete.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'Message.ControlDelete', package: const $pb.PackageName(_omitMessageNames ? '' : 'veilidchat'), createEmptyInstance: create) static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'Message.ControlDelete', package: const $pb.PackageName(_omitMessageNames ? '' : 'veilidchat'), createEmptyInstance: create)
..pc<$0.TypedKey>(1, _omitFieldNames ? '' : 'ids', $pb.PbFieldType.PM, subBuilder: $0.TypedKey.create) ..p<$core.List<$core.int>>(1, _omitFieldNames ? '' : 'ids', $pb.PbFieldType.PY)
..hasRequiredFields = false ..hasRequiredFields = false
; ;
@ -512,7 +512,7 @@ class Message_ControlDelete extends $pb.GeneratedMessage {
static Message_ControlDelete? _defaultInstance; static Message_ControlDelete? _defaultInstance;
@$pb.TagNumber(1) @$pb.TagNumber(1)
$core.List<$0.TypedKey> get ids => $_getList(0); $core.List<$core.List<$core.int>> get ids => $_getList(0);
} }
class Message_ControlErase extends $pb.GeneratedMessage { class Message_ControlErase extends $pb.GeneratedMessage {
@ -696,8 +696,8 @@ class Message_ControlModeration extends $pb.GeneratedMessage {
factory Message_ControlModeration.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); factory Message_ControlModeration.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'Message.ControlModeration', package: const $pb.PackageName(_omitMessageNames ? '' : 'veilidchat'), createEmptyInstance: create) static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'Message.ControlModeration', package: const $pb.PackageName(_omitMessageNames ? '' : 'veilidchat'), createEmptyInstance: create)
..pc<$0.TypedKey>(1, _omitFieldNames ? '' : 'acceptedIds', $pb.PbFieldType.PM, subBuilder: $0.TypedKey.create) ..p<$core.List<$core.int>>(1, _omitFieldNames ? '' : 'acceptedIds', $pb.PbFieldType.PY)
..pc<$0.TypedKey>(2, _omitFieldNames ? '' : 'rejectedIds', $pb.PbFieldType.PM, subBuilder: $0.TypedKey.create) ..p<$core.List<$core.int>>(2, _omitFieldNames ? '' : 'rejectedIds', $pb.PbFieldType.PY)
..hasRequiredFields = false ..hasRequiredFields = false
; ;
@ -723,10 +723,46 @@ class Message_ControlModeration extends $pb.GeneratedMessage {
static Message_ControlModeration? _defaultInstance; static Message_ControlModeration? _defaultInstance;
@$pb.TagNumber(1) @$pb.TagNumber(1)
$core.List<$0.TypedKey> get acceptedIds => $_getList(0); $core.List<$core.List<$core.int>> get acceptedIds => $_getList(0);
@$pb.TagNumber(2) @$pb.TagNumber(2)
$core.List<$0.TypedKey> get rejectedIds => $_getList(1); $core.List<$core.List<$core.int>> get rejectedIds => $_getList(1);
}
class Message_ControlReadReceipt extends $pb.GeneratedMessage {
factory Message_ControlReadReceipt() => create();
Message_ControlReadReceipt._() : super();
factory Message_ControlReadReceipt.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory Message_ControlReadReceipt.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'Message.ControlReadReceipt', package: const $pb.PackageName(_omitMessageNames ? '' : 'veilidchat'), createEmptyInstance: create)
..p<$core.List<$core.int>>(1, _omitFieldNames ? '' : 'readIds', $pb.PbFieldType.PY)
..hasRequiredFields = false
;
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
'Will be removed in next major version')
Message_ControlReadReceipt clone() => Message_ControlReadReceipt()..mergeFromMessage(this);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
'Will be removed in next major version')
Message_ControlReadReceipt copyWith(void Function(Message_ControlReadReceipt) updates) => super.copyWith((message) => updates(message as Message_ControlReadReceipt)) as Message_ControlReadReceipt;
$pb.BuilderInfo get info_ => _i;
@$core.pragma('dart2js:noInline')
static Message_ControlReadReceipt create() => Message_ControlReadReceipt._();
Message_ControlReadReceipt createEmptyInstance() => create();
static $pb.PbList<Message_ControlReadReceipt> createRepeated() => $pb.PbList<Message_ControlReadReceipt>();
@$core.pragma('dart2js:noInline')
static Message_ControlReadReceipt getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<Message_ControlReadReceipt>(create);
static Message_ControlReadReceipt? _defaultInstance;
@$pb.TagNumber(1)
$core.List<$core.List<$core.int>> get readIds => $_getList(0);
} }
enum Message_Kind { enum Message_Kind {

View file

@ -172,7 +172,7 @@ const Message$json = {
{'1': 'moderation', '3': 11, '4': 1, '5': 11, '6': '.veilidchat.Message.ControlModeration', '9': 0, '10': 'moderation'}, {'1': 'moderation', '3': 11, '4': 1, '5': 11, '6': '.veilidchat.Message.ControlModeration', '9': 0, '10': 'moderation'},
{'1': 'signature', '3': 12, '4': 1, '5': 11, '6': '.veilid.Signature', '10': 'signature'}, {'1': 'signature', '3': 12, '4': 1, '5': 11, '6': '.veilid.Signature', '10': 'signature'},
], ],
'3': [Message_Text$json, Message_Secret$json, Message_ControlDelete$json, Message_ControlErase$json, Message_ControlSettings$json, Message_ControlPermissions$json, Message_ControlMembership$json, Message_ControlModeration$json], '3': [Message_Text$json, Message_Secret$json, Message_ControlDelete$json, Message_ControlErase$json, Message_ControlSettings$json, Message_ControlPermissions$json, Message_ControlMembership$json, Message_ControlModeration$json, Message_ControlReadReceipt$json],
'8': [ '8': [
{'1': 'kind'}, {'1': 'kind'},
], ],
@ -208,7 +208,7 @@ const Message_Secret$json = {
const Message_ControlDelete$json = { const Message_ControlDelete$json = {
'1': 'ControlDelete', '1': 'ControlDelete',
'2': [ '2': [
{'1': 'ids', '3': 1, '4': 3, '5': 11, '6': '.veilid.TypedKey', '10': 'ids'}, {'1': 'ids', '3': 1, '4': 3, '5': 12, '10': 'ids'},
], ],
}; };
@ -248,8 +248,16 @@ const Message_ControlMembership$json = {
const Message_ControlModeration$json = { const Message_ControlModeration$json = {
'1': 'ControlModeration', '1': 'ControlModeration',
'2': [ '2': [
{'1': 'accepted_ids', '3': 1, '4': 3, '5': 11, '6': '.veilid.TypedKey', '10': 'acceptedIds'}, {'1': 'accepted_ids', '3': 1, '4': 3, '5': 12, '10': 'acceptedIds'},
{'1': 'rejected_ids', '3': 2, '4': 3, '5': 11, '6': '.veilid.TypedKey', '10': 'rejectedIds'}, {'1': 'rejected_ids', '3': 2, '4': 3, '5': 12, '10': 'rejectedIds'},
],
};
@$core.Deprecated('Use messageDescriptor instead')
const Message_ControlReadReceipt$json = {
'1': 'ControlReadReceipt',
'2': [
{'1': 'read_ids', '3': 1, '4': 3, '5': 12, '10': 'readIds'},
], ],
}; };
@ -272,15 +280,15 @@ final $typed_data.Uint8List messageDescriptor = $convert.base64Decode(
'HQoKdmlld19saW1pdBgFIAEoDVIJdmlld0xpbWl0EjgKC2F0dGFjaG1lbnRzGAYgAygLMhYudm' 'HQoKdmlld19saW1pdBgFIAEoDVIJdmlld0xpbWl0EjgKC2F0dGFjaG1lbnRzGAYgAygLMhYudm'
'VpbGlkY2hhdC5BdHRhY2htZW50UgthdHRhY2htZW50c0IICgZfdG9waWNCCwoJX3JlcGx5X2lk' 'VpbGlkY2hhdC5BdHRhY2htZW50UgthdHRhY2htZW50c0IICgZfdG9waWNCCwoJX3JlcGx5X2lk'
'GkgKBlNlY3JldBIeCgpjaXBoZXJ0ZXh0GAEgASgMUgpjaXBoZXJ0ZXh0Eh4KCmV4cGlyYXRpb2' 'GkgKBlNlY3JldBIeCgpjaXBoZXJ0ZXh0GAEgASgMUgpjaXBoZXJ0ZXh0Eh4KCmV4cGlyYXRpb2'
'4YAiABKARSCmV4cGlyYXRpb24aMwoNQ29udHJvbERlbGV0ZRIiCgNpZHMYASADKAsyEC52ZWls' '4YAiABKARSCmV4cGlyYXRpb24aIQoNQ29udHJvbERlbGV0ZRIQCgNpZHMYASADKAxSA2lkcxos'
'aWQuVHlwZWRLZXlSA2lkcxosCgxDb250cm9sRXJhc2USHAoJdGltZXN0YW1wGAEgASgEUgl0aW' 'CgxDb250cm9sRXJhc2USHAoJdGltZXN0YW1wGAEgASgEUgl0aW1lc3RhbXAaRwoPQ29udHJvbF'
'1lc3RhbXAaRwoPQ29udHJvbFNldHRpbmdzEjQKCHNldHRpbmdzGAEgASgLMhgudmVpbGlkY2hh' 'NldHRpbmdzEjQKCHNldHRpbmdzGAEgASgLMhgudmVpbGlkY2hhdC5DaGF0U2V0dGluZ3NSCHNl'
'dC5DaGF0U2V0dGluZ3NSCHNldHRpbmdzGk8KEkNvbnRyb2xQZXJtaXNzaW9ucxI5CgtwZXJtaX' 'dHRpbmdzGk8KEkNvbnRyb2xQZXJtaXNzaW9ucxI5CgtwZXJtaXNzaW9ucxgBIAEoCzIXLnZlaW'
'NzaW9ucxgBIAEoCzIXLnZlaWxpZGNoYXQuUGVybWlzc2lvbnNSC3Blcm1pc3Npb25zGksKEUNv' 'xpZGNoYXQuUGVybWlzc2lvbnNSC3Blcm1pc3Npb25zGksKEUNvbnRyb2xNZW1iZXJzaGlwEjYK'
'bnRyb2xNZW1iZXJzaGlwEjYKCm1lbWJlcnNoaXAYASABKAsyFi52ZWlsaWRjaGF0Lk1lbWJlcn' 'Cm1lbWJlcnNoaXAYASABKAsyFi52ZWlsaWRjaGF0Lk1lbWJlcnNoaXBSCm1lbWJlcnNoaXAaWQ'
'NoaXBSCm1lbWJlcnNoaXAafQoRQ29udHJvbE1vZGVyYXRpb24SMwoMYWNjZXB0ZWRfaWRzGAEg' 'oRQ29udHJvbE1vZGVyYXRpb24SIQoMYWNjZXB0ZWRfaWRzGAEgAygMUgthY2NlcHRlZElkcxIh'
'AygLMhAudmVpbGlkLlR5cGVkS2V5UgthY2NlcHRlZElkcxIzCgxyZWplY3RlZF9pZHMYAiADKA' 'CgxyZWplY3RlZF9pZHMYAiADKAxSC3JlamVjdGVkSWRzGi8KEkNvbnRyb2xSZWFkUmVjZWlwdB'
'syEC52ZWlsaWQuVHlwZWRLZXlSC3JlamVjdGVkSWRzQgYKBGtpbmQ='); 'IZCghyZWFkX2lkcxgBIAMoDFIHcmVhZElkc0IGCgRraW5k');
@$core.Deprecated('Use reconciledMessageDescriptor instead') @$core.Deprecated('Use reconciledMessageDescriptor instead')
const ReconciledMessage$json = { const ReconciledMessage$json = {

View file

@ -146,7 +146,7 @@ message Message {
// A 'delete' control message // A 'delete' control message
// Deletes a set of messages by their ids // Deletes a set of messages by their ids
message ControlDelete { message ControlDelete {
repeated veilid.TypedKey ids = 1; repeated bytes ids = 1;
} }
// An 'erase' control message // An 'erase' control message
// Deletes a set of messages from before some timestamp // Deletes a set of messages from before some timestamp
@ -175,8 +175,13 @@ message Message {
// A 'moderation' control message // A 'moderation' control message
// Accepts or rejects a set of messages // Accepts or rejects a set of messages
message ControlModeration { message ControlModeration {
repeated veilid.TypedKey accepted_ids = 1; repeated bytes accepted_ids = 1;
repeated veilid.TypedKey rejected_ids = 2; repeated bytes rejected_ids = 2;
}
// A 'read receipt' control message
message ControlReadReceipt {
repeated bytes read_ids = 1;
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////

View file

@ -64,7 +64,7 @@ Future<void> Function() makeTestDHTLogAddTruncate({required int stride}) =>
const chunk = 25; const chunk = 25;
for (var n = 0; n < dataset.length; n += chunk) { for (var n = 0; n < dataset.length; n += chunk) {
print('$n-${n + chunk - 1} '); print('$n-${n + chunk - 1} ');
final success = await w.tryAddItems(dataset.sublist(n, n + chunk)); final success = await w.tryAddAll(dataset.sublist(n, n + chunk));
expect(success, isTrue); expect(success, isTrue);
} }
}); });
@ -73,22 +73,22 @@ Future<void> Function() makeTestDHTLogAddTruncate({required int stride}) =>
print('get all\n'); print('get all\n');
{ {
final dataset2 = await dlog.operate((r) async => r.getItemRange(0)); final dataset2 = await dlog.operate((r) async => r.getRange(0));
expect(dataset2, equals(dataset)); expect(dataset2, equals(dataset));
} }
{ {
final dataset3 = final dataset3 =
await dlog.operate((r) async => r.getItemRange(64, length: 128)); await dlog.operate((r) async => r.getRange(64, length: 128));
expect(dataset3, equals(dataset.sublist(64, 64 + 128))); expect(dataset3, equals(dataset.sublist(64, 64 + 128)));
} }
{ {
final dataset4 = final dataset4 =
await dlog.operate((r) async => r.getItemRange(0, length: 1000)); await dlog.operate((r) async => r.getRange(0, length: 1000));
expect(dataset4, equals(dataset.sublist(0, 1000))); expect(dataset4, equals(dataset.sublist(0, 1000)));
} }
{ {
final dataset5 = final dataset5 =
await dlog.operate((r) async => r.getItemRange(500, length: 499)); await dlog.operate((r) async => r.getRange(500, length: 499));
expect(dataset5, equals(dataset.sublist(500, 999))); expect(dataset5, equals(dataset.sublist(500, 999)));
} }
print('truncate\n'); print('truncate\n');
@ -96,8 +96,8 @@ Future<void> Function() makeTestDHTLogAddTruncate({required int stride}) =>
await dlog.operateAppend((w) async => w.truncate(w.length - 5)); await dlog.operateAppend((w) async => w.truncate(w.length - 5));
} }
{ {
final dataset6 = await dlog final dataset6 =
.operate((r) async => r.getItemRange(500 - 5, length: 499)); await dlog.operate((r) async => r.getRange(500 - 5, length: 499));
expect(dataset6, equals(dataset.sublist(500, 999))); expect(dataset6, equals(dataset.sublist(500, 999)));
} }
print('truncate 2\n'); print('truncate 2\n');
@ -105,8 +105,8 @@ Future<void> Function() makeTestDHTLogAddTruncate({required int stride}) =>
await dlog.operateAppend((w) async => w.truncate(w.length - 251)); await dlog.operateAppend((w) async => w.truncate(w.length - 251));
} }
{ {
final dataset7 = await dlog final dataset7 =
.operate((r) async => r.getItemRange(500 - 256, length: 499)); await dlog.operate((r) async => r.getRange(500 - 256, length: 499));
expect(dataset7, equals(dataset.sublist(500, 999))); expect(dataset7, equals(dataset.sublist(500, 999)));
} }
print('clear\n'); print('clear\n');
@ -115,7 +115,7 @@ Future<void> Function() makeTestDHTLogAddTruncate({required int stride}) =>
} }
print('get all\n'); print('get all\n');
{ {
final dataset8 = await dlog.operate((r) async => r.getItemRange(0)); final dataset8 = await dlog.operate((r) async => r.getRange(0));
expect(dataset8, isEmpty); expect(dataset8, isEmpty);
} }
print('delete and close\n'); print('delete and close\n');

View file

@ -64,7 +64,7 @@ Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
final res = await arr.operateWrite((w) async { final res = await arr.operateWrite((w) async {
for (var n = 4; n < 8; n++) { for (var n = 4; n < 8; n++) {
print('$n '); print('$n ');
final success = await w.tryAddItem(dataset[n]); final success = await w.tryAdd(dataset[n]);
expect(success, isTrue); expect(success, isTrue);
} }
}); });
@ -75,8 +75,8 @@ Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
{ {
final res = await arr.operateWrite((w) async { final res = await arr.operateWrite((w) async {
print('${dataset.length ~/ 2}-${dataset.length}'); print('${dataset.length ~/ 2}-${dataset.length}');
final success = await w.tryAddItems( final success = await w
dataset.sublist(dataset.length ~/ 2, dataset.length)); .tryAddAll(dataset.sublist(dataset.length ~/ 2, dataset.length));
expect(success, isTrue); expect(success, isTrue);
}); });
expect(res, isNull); expect(res, isNull);
@ -87,7 +87,7 @@ Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
final res = await arr.operateWrite((w) async { final res = await arr.operateWrite((w) async {
for (var n = 0; n < 4; n++) { for (var n = 0; n < 4; n++) {
print('$n '); print('$n ');
final success = await w.tryInsertItem(n, dataset[n]); final success = await w.tryInsert(n, dataset[n]);
expect(success, isTrue); expect(success, isTrue);
} }
}); });
@ -98,8 +98,8 @@ Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
{ {
final res = await arr.operateWrite((w) async { final res = await arr.operateWrite((w) async {
print('8-${dataset.length ~/ 2}'); print('8-${dataset.length ~/ 2}');
final success = await w.tryInsertItems( final success =
8, dataset.sublist(8, dataset.length ~/ 2)); await w.tryInsertAll(8, dataset.sublist(8, dataset.length ~/ 2));
expect(success, isTrue); expect(success, isTrue);
}); });
expect(res, isNull); expect(res, isNull);
@ -107,12 +107,12 @@ Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
//print('get all\n'); //print('get all\n');
{ {
final dataset2 = await arr.operate((r) async => r.getItemRange(0)); final dataset2 = await arr.operate((r) async => r.getRange(0));
expect(dataset2, equals(dataset)); expect(dataset2, equals(dataset));
} }
{ {
final dataset3 = final dataset3 =
await arr.operate((r) async => r.getItemRange(64, length: 128)); await arr.operate((r) async => r.getRange(64, length: 128));
expect(dataset3, equals(dataset.sublist(64, 64 + 128))); expect(dataset3, equals(dataset.sublist(64, 64 + 128)));
} }
@ -126,7 +126,7 @@ Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
//print('get all\n'); //print('get all\n');
{ {
final dataset4 = await arr.operate((r) async => r.getItemRange(0)); final dataset4 = await arr.operate((r) async => r.getRange(0));
expect(dataset4, isEmpty); expect(dataset4, isEmpty);
} }

View file

@ -62,13 +62,24 @@ message DHTShortArray {
// calculated through iteration // calculated through iteration
} }
// Reference to data on the DHT
message DHTDataReference {
veilid.TypedKey dht_data = 1;
veilid.TypedKey hash = 2;
}
// Reference to data on the BlockStore
message BlockStoreDataReference {
veilid.TypedKey block = 1;
}
// DataReference // DataReference
// Pointer to data somewhere in Veilid // Pointer to data somewhere in Veilid
// Abstraction over DHTData and BlockStore // Abstraction over DHTData and BlockStore
message DataReference { message DataReference {
oneof kind { oneof kind {
veilid.TypedKey dht_data = 1; DHTDataReference dht_data = 1;
// TypedKey block = 2; BlockStoreDataReference block_store_data = 2;
} }
} }

View file

@ -9,7 +9,7 @@ import 'package:meta/meta.dart';
import '../../../veilid_support.dart'; import '../../../veilid_support.dart';
import '../../proto/proto.dart' as proto; import '../../proto/proto.dart' as proto;
import '../interfaces/dht_append.dart'; import '../interfaces/dht_add.dart';
part 'dht_log_spine.dart'; part 'dht_log_spine.dart';
part 'dht_log_read.dart'; part 'dht_log_read.dart';

View file

@ -92,7 +92,7 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
Future<void> _refreshInner(void Function(AsyncValue<DHTLogStateData<T>>) emit, Future<void> _refreshInner(void Function(AsyncValue<DHTLogStateData<T>>) emit,
{bool forceRefresh = false}) async { {bool forceRefresh = false}) async {
final avElements = await _loadElements(_tail, _count); final avElements = await loadElements(_tail, _count);
final err = avElements.asError; final err = avElements.asError;
if (err != null) { if (err != null) {
emit(AsyncValue.error(err.error, err.stackTrace)); emit(AsyncValue.error(err.error, err.stackTrace));
@ -108,9 +108,10 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
elements: elements, tail: _tail, count: _count, follow: _follow))); elements: elements, tail: _tail, count: _count, follow: _follow)));
} }
Future<AsyncValue<IList<DHTLogElementState<T>>>> _loadElements( Future<AsyncValue<IList<DHTLogElementState<T>>>> loadElements(
int tail, int count, int tail, int count,
{bool forceRefresh = false}) async { {bool forceRefresh = false}) async {
await _initWait();
try { try {
final allItems = await _log.operate((reader) async { final allItems = await _log.operate((reader) async {
final length = reader.length; final length = reader.length;
@ -118,7 +119,7 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
final start = (count < end) ? end - count : 0; final start = (count < end) ? end - count : 0;
final offlinePositions = await reader.getOfflinePositions(); final offlinePositions = await reader.getOfflinePositions();
final allItems = (await reader.getItemRange(start, final allItems = (await reader.getRange(start,
length: end - start, forceRefresh: forceRefresh)) length: end - start, forceRefresh: forceRefresh))
?.indexed ?.indexed
.map((x) => DHTLogElementState( .map((x) => DHTLogElementState(

View file

@ -12,7 +12,7 @@ class _DHTLogRead implements DHTLogReadOperations {
int get length => _spine.length; int get length => _spine.length;
@override @override
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async { Future<Uint8List?> get(int pos, {bool forceRefresh = false}) async {
if (pos < 0 || pos >= length) { if (pos < 0 || pos >= length) {
throw IndexError.withLength(pos, length); throw IndexError.withLength(pos, length);
} }
@ -21,8 +21,8 @@ class _DHTLogRead implements DHTLogReadOperations {
return null; return null;
} }
return lookup.scope((sa) => sa.operate( return lookup.scope((sa) =>
(read) => read.getItem(lookup.pos, forceRefresh: forceRefresh))); sa.operate((read) => read.get(lookup.pos, forceRefresh: forceRefresh)));
} }
(int, int) _clampStartLen(int start, int? len) { (int, int) _clampStartLen(int start, int? len) {
@ -40,14 +40,14 @@ class _DHTLogRead implements DHTLogReadOperations {
} }
@override @override
Future<List<Uint8List>?> getItemRange(int start, Future<List<Uint8List>?> getRange(int start,
{int? length, bool forceRefresh = false}) async { {int? length, bool forceRefresh = false}) async {
final out = <Uint8List>[]; final out = <Uint8List>[];
(start, length) = _clampStartLen(start, length); (start, length) = _clampStartLen(start, length);
final chunks = Iterable<int>.generate(length).slices(maxDHTConcurrency).map( final chunks = Iterable<int>.generate(length).slices(maxDHTConcurrency).map(
(chunk) => chunk (chunk) =>
.map((pos) => getItem(pos + start, forceRefresh: forceRefresh))); chunk.map((pos) => get(pos + start, forceRefresh: forceRefresh)));
for (final chunk in chunks) { for (final chunk in chunks) {
final elems = await chunk.wait; final elems = await chunk.wait;

View file

@ -3,16 +3,15 @@ part of 'dht_log.dart';
class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> { class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> {
_DHTLogPosition._({ _DHTLogPosition._({
required _DHTLogSpine dhtLogSpine, required _DHTLogSpine dhtLogSpine,
required DHTShortArray shortArray, required this.shortArray,
required this.pos, required this.pos,
required int segmentNumber, required int segmentNumber,
}) : _segmentShortArray = shortArray, }) : _dhtLogSpine = dhtLogSpine,
_dhtLogSpine = dhtLogSpine,
_segmentNumber = segmentNumber; _segmentNumber = segmentNumber;
final int pos; final int pos;
final _DHTLogSpine _dhtLogSpine; final _DHTLogSpine _dhtLogSpine;
final DHTShortArray _segmentShortArray; final DHTShortArray shortArray;
var _openCount = 1; var _openCount = 1;
final int _segmentNumber; final int _segmentNumber;
final Mutex _mutex = Mutex(); final Mutex _mutex = Mutex();
@ -23,7 +22,7 @@ class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> {
/// The type of the openable scope /// The type of the openable scope
@override @override
FutureOr<DHTShortArray> scoped() => _segmentShortArray; FutureOr<DHTShortArray> scoped() => shortArray;
/// Add a reference to this log /// Add a reference to this log
@override @override

View file

@ -17,7 +17,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
} }
final lookup = await _spine.lookupPosition(pos); final lookup = await _spine.lookupPosition(pos);
if (lookup == null) { if (lookup == null) {
throw StateError("can't write to dht log"); throw StateError("can't lookup position in write to dht log");
} }
// Write item to the segment // Write item to the segment
@ -26,7 +26,47 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
} }
@override @override
Future<bool> tryAddItem(Uint8List value) async { Future<void> swap(int aPos, int bPos) async {
if (aPos < 0 || aPos >= _spine.length) {
throw IndexError.withLength(aPos, _spine.length);
}
if (bPos < 0 || bPos >= _spine.length) {
throw IndexError.withLength(bPos, _spine.length);
}
final aLookup = await _spine.lookupPosition(aPos);
if (aLookup == null) {
throw StateError("can't lookup position a in swap of dht log");
}
final bLookup = await _spine.lookupPosition(bPos);
if (bLookup == null) {
throw StateError("can't lookup position b in swap of dht log");
}
// Swap items in the segments
if (aLookup.shortArray == bLookup.shortArray) {
await aLookup.scope((sa) => sa.operateWriteEventual((aWrite) async {
await aWrite.swap(aLookup.pos, bLookup.pos);
return true;
}));
} else {
final bItem = Output<Uint8List>();
await aLookup.scope(
(sa) => bLookup.scope((sb) => sa.operateWriteEventual((aWrite) async {
if (bItem.value == null) {
final aItem = await aWrite.get(aLookup.pos);
if (aItem == null) {
throw StateError("can't get item for position a in swap");
}
await sb.operateWriteEventual((bWrite) async =>
bWrite.tryWriteItem(bLookup.pos, aItem, output: bItem));
}
return aWrite.tryWriteItem(aLookup.pos, bItem.value!);
})));
}
}
@override
Future<bool> tryAdd(Uint8List value) async {
// Allocate empty index at the end of the list // Allocate empty index at the end of the list
final insertPos = _spine.length; final insertPos = _spine.length;
_spine.allocateTail(1); _spine.allocateTail(1);
@ -44,12 +84,12 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
// We should always be appending at the length // We should always be appending at the length
throw StateError('appending should be at the end'); throw StateError('appending should be at the end');
} }
return write.tryAddItem(value); return write.tryAdd(value);
})); }));
} }
@override @override
Future<bool> tryAddItems(List<Uint8List> values) async { Future<bool> tryAddAll(List<Uint8List> values) async {
// Allocate empty index at the end of the list // Allocate empty index at the end of the list
final insertPos = _spine.length; final insertPos = _spine.length;
_spine.allocateTail(values.length); _spine.allocateTail(values.length);
@ -79,7 +119,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
// We should always be appending at the length // We should always be appending at the length
throw StateError('appending should be at the end'); throw StateError('appending should be at the end');
} }
return write.tryAddItems(sublistValues); return write.tryAddAll(sublistValues);
})); }));
if (!ok) { if (!ok) {
success = false; success = false;

View file

@ -54,13 +54,12 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
try { try {
final newState = await _shortArray.operate((reader) async { final newState = await _shortArray.operate((reader) async {
final offlinePositions = await reader.getOfflinePositions(); final offlinePositions = await reader.getOfflinePositions();
final allItems = final allItems = (await reader.getRange(0, forceRefresh: forceRefresh))
(await reader.getItemRange(0, forceRefresh: forceRefresh)) ?.indexed
?.indexed .map((x) => DHTShortArrayElementState(
.map((x) => DHTShortArrayElementState( value: _decodeElement(x.$2),
value: _decodeElement(x.$2), isOffline: offlinePositions.contains(x.$1)))
isOffline: offlinePositions.contains(x.$1))) .toIList();
.toIList();
return allItems; return allItems;
}); });
if (newState != null) { if (newState != null) {

View file

@ -12,7 +12,7 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations {
int get length => _head.length; int get length => _head.length;
@override @override
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async { Future<Uint8List?> get(int pos, {bool forceRefresh = false}) async {
if (pos < 0 || pos >= length) { if (pos < 0 || pos >= length) {
throw IndexError.withLength(pos, length); throw IndexError.withLength(pos, length);
} }
@ -49,14 +49,14 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations {
} }
@override @override
Future<List<Uint8List>?> getItemRange(int start, Future<List<Uint8List>?> getRange(int start,
{int? length, bool forceRefresh = false}) async { {int? length, bool forceRefresh = false}) async {
final out = <Uint8List>[]; final out = <Uint8List>[];
(start, length) = _clampStartLen(start, length); (start, length) = _clampStartLen(start, length);
final chunks = Iterable<int>.generate(length).slices(maxDHTConcurrency).map( final chunks = Iterable<int>.generate(length).slices(maxDHTConcurrency).map(
(chunk) => chunk (chunk) =>
.map((pos) => getItem(pos + start, forceRefresh: forceRefresh))); chunk.map((pos) => get(pos + start, forceRefresh: forceRefresh)));
for (final chunk in chunks) { for (final chunk in chunks) {
final elems = await chunk.wait; final elems = await chunk.wait;

View file

@ -16,15 +16,14 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
_DHTShortArrayWrite._(super.head) : super._(); _DHTShortArrayWrite._(super.head) : super._();
@override @override
Future<bool> tryAddItem(Uint8List value) => Future<bool> tryAdd(Uint8List value) => tryInsert(_head.length, value);
tryInsertItem(_head.length, value);
@override @override
Future<bool> tryAddItems(List<Uint8List> values) => Future<bool> tryAddAll(List<Uint8List> values) =>
tryInsertItems(_head.length, values); tryInsertAll(_head.length, values);
@override @override
Future<bool> tryInsertItem(int pos, Uint8List value) async { Future<bool> tryInsert(int pos, Uint8List value) async {
if (pos < 0 || pos > _head.length) { if (pos < 0 || pos > _head.length) {
throw IndexError.withLength(pos, _head.length); throw IndexError.withLength(pos, _head.length);
} }
@ -44,7 +43,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
} }
@override @override
Future<bool> tryInsertItems(int pos, List<Uint8List> values) async { Future<bool> tryInsertAll(int pos, List<Uint8List> values) async {
if (pos < 0 || pos > _head.length) { if (pos < 0 || pos > _head.length) {
throw IndexError.withLength(pos, _head.length); throw IndexError.withLength(pos, _head.length);
} }
@ -100,7 +99,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
} }
@override @override
Future<void> swapItem(int aPos, int bPos) async { Future<void> swap(int aPos, int bPos) async {
if (aPos < 0 || aPos >= _head.length) { if (aPos < 0 || aPos >= _head.length) {
throw IndexError.withLength(aPos, _head.length); throw IndexError.withLength(aPos, _head.length);
} }
@ -112,7 +111,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
} }
@override @override
Future<void> removeItem(int pos, {Output<Uint8List>? output}) async { Future<void> remove(int pos, {Output<Uint8List>? output}) async {
if (pos < 0 || pos >= _head.length) { if (pos < 0 || pos >= _head.length) {
throw IndexError.withLength(pos, _head.length); throw IndexError.withLength(pos, _head.length);
} }

View file

@ -12,30 +12,30 @@ abstract class DHTAdd {
/// changed before the element could be added or a newer value was found on /// changed before the element could be added or a newer value was found on
/// the network. /// the network.
/// Throws a StateError if the container exceeds its maximum size. /// Throws a StateError if the container exceeds its maximum size.
Future<bool> tryAddItem(Uint8List value); Future<bool> tryAdd(Uint8List value);
/// Try to add a list of items to the DHT container. /// Try to add a list of items to the DHT container.
/// Return true if the elements were successfully added, and false if the /// Return true if the elements were successfully added, and false if the
/// state changed before the element could be added or a newer value was found /// state changed before the element could be added or a newer value was found
/// on the network. /// on the network.
/// Throws a StateError if the container exceeds its maximum size. /// Throws a StateError if the container exceeds its maximum size.
Future<bool> tryAddItems(List<Uint8List> values); Future<bool> tryAddAll(List<Uint8List> values);
} }
extension DHTAddExt on DHTAdd { extension DHTAddExt on DHTAdd {
/// Convenience function: /// Convenience function:
/// Like tryAddItem but also encodes the input value as JSON and parses the /// Like tryAddItem but also encodes the input value as JSON and parses the
/// returned element as JSON /// returned element as JSON
Future<bool> tryAppendItemJson<T>( Future<bool> tryAddJson<T>(
T newValue, T newValue,
) => ) =>
tryAddItem(jsonEncodeBytes(newValue)); tryAdd(jsonEncodeBytes(newValue));
/// Convenience function: /// Convenience function:
/// Like tryAddItem but also encodes the input value as a protobuf object /// Like tryAddItem but also encodes the input value as a protobuf object
/// and parses the returned element as a protobuf object /// and parses the returned element as a protobuf object
Future<bool> tryAddItemProtobuf<T extends GeneratedMessage>( Future<bool> tryAddProtobuf<T extends GeneratedMessage>(
T newValue, T newValue,
) => ) =>
tryAddItem(newValue.writeToBuffer()); tryAdd(newValue.writeToBuffer());
} }

View file

@ -14,7 +14,7 @@ abstract class DHTInsertRemove {
/// Throws an IndexError if the position removed exceeds the length of /// Throws an IndexError if the position removed exceeds the length of
/// the container. /// the container.
/// Throws a StateError if the container exceeds its maximum size. /// Throws a StateError if the container exceeds its maximum size.
Future<bool> tryInsertItem(int pos, Uint8List value); Future<bool> tryInsert(int pos, Uint8List value);
/// Try to insert items at position 'pos' of the DHT container. /// Try to insert items at position 'pos' of the DHT container.
/// Return true if the elements were successfully inserted, and false if the /// Return true if the elements were successfully inserted, and false if the
@ -23,38 +23,33 @@ abstract class DHTInsertRemove {
/// Throws an IndexError if the position removed exceeds the length of /// Throws an IndexError if the position removed exceeds the length of
/// the container. /// the container.
/// Throws a StateError if the container exceeds its maximum size. /// Throws a StateError if the container exceeds its maximum size.
Future<bool> tryInsertItems(int pos, List<Uint8List> values); Future<bool> tryInsertAll(int pos, List<Uint8List> values);
/// Swap items at position 'aPos' and 'bPos' in the DHTArray.
/// Throws an IndexError if either of the positions swapped exceeds the length
/// of the container
Future<void> swapItem(int aPos, int bPos);
/// Remove an item at position 'pos' in the DHT container. /// Remove an item at position 'pos' in the DHT container.
/// If the remove was successful this returns: /// If the remove was successful this returns:
/// * outValue will return the prior contents of the element /// * outValue will return the prior contents of the element
/// Throws an IndexError if the position removed exceeds the length of /// Throws an IndexError if the position removed exceeds the length of
/// the container. /// the container.
Future<void> removeItem(int pos, {Output<Uint8List>? output}); Future<void> remove(int pos, {Output<Uint8List>? output});
} }
extension DHTInsertRemoveExt on DHTInsertRemove { extension DHTInsertRemoveExt on DHTInsertRemove {
/// Convenience function: /// Convenience function:
/// Like removeItem but also parses the returned element as JSON /// Like remove but also parses the returned element as JSON
Future<void> removeItemJson<T>(T Function(dynamic) fromJson, int pos, Future<void> removeJson<T>(T Function(dynamic) fromJson, int pos,
{Output<T>? output}) async { {Output<T>? output}) async {
final outValueBytes = output == null ? null : Output<Uint8List>(); final outValueBytes = output == null ? null : Output<Uint8List>();
await removeItem(pos, output: outValueBytes); await remove(pos, output: outValueBytes);
output.mapSave(outValueBytes, (b) => jsonDecodeBytes(fromJson, b)); output.mapSave(outValueBytes, (b) => jsonDecodeBytes(fromJson, b));
} }
/// Convenience function: /// Convenience function:
/// Like removeItem but also parses the returned element as JSON /// Like remove but also parses the returned element as JSON
Future<void> removeItemProtobuf<T extends GeneratedMessage>( Future<void> removeProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos, T Function(List<int>) fromBuffer, int pos,
{Output<T>? output}) async { {Output<T>? output}) async {
final outValueBytes = output == null ? null : Output<Uint8List>(); final outValueBytes = output == null ? null : Output<Uint8List>();
await removeItem(pos, output: outValueBytes); await remove(pos, output: outValueBytes);
output.mapSave(outValueBytes, fromBuffer); output.mapSave(outValueBytes, fromBuffer);
} }
} }

View file

@ -15,14 +15,14 @@ abstract class DHTRandomRead {
/// rather than returning the existing locally stored copy of the elements. /// rather than returning the existing locally stored copy of the elements.
/// Throws an IndexError if the 'pos' is not within the length /// Throws an IndexError if the 'pos' is not within the length
/// of the container. /// of the container.
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}); Future<Uint8List?> get(int pos, {bool forceRefresh = false});
/// Return a list of a range of items in the DHTArray. If 'forceRefresh' /// Return a list of a range of items in the DHTArray. If 'forceRefresh'
/// is specified, the network will always be checked for newer values /// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements. /// rather than returning the existing locally stored copy of the elements.
/// Throws an IndexError if either 'start' or '(start+length)' is not within /// Throws an IndexError if either 'start' or '(start+length)' is not within
/// the length of the container. /// the length of the container.
Future<List<Uint8List>?> getItemRange(int start, Future<List<Uint8List>?> getRange(int start,
{int? length, bool forceRefresh = false}); {int? length, bool forceRefresh = false});
/// Get a list of the positions that were written offline and not flushed yet /// Get a list of the positions that were written offline and not flushed yet
@ -31,32 +31,32 @@ abstract class DHTRandomRead {
extension DHTRandomReadExt on DHTRandomRead { extension DHTRandomReadExt on DHTRandomRead {
/// Convenience function: /// Convenience function:
/// Like getItem but also parses the returned element as JSON /// Like get but also parses the returned element as JSON
Future<T?> getItemJson<T>(T Function(dynamic) fromJson, int pos, Future<T?> getJson<T>(T Function(dynamic) fromJson, int pos,
{bool forceRefresh = false}) => {bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh) get(pos, forceRefresh: forceRefresh)
.then((out) => jsonDecodeOptBytes(fromJson, out)); .then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function: /// Convenience function:
/// Like getAllItems but also parses the returned elements as JSON /// Like getRange but also parses the returned elements as JSON
Future<List<T>?> getItemRangeJson<T>(T Function(dynamic) fromJson, int start, Future<List<T>?> getRangeJson<T>(T Function(dynamic) fromJson, int start,
{int? length, bool forceRefresh = false}) => {int? length, bool forceRefresh = false}) =>
getItemRange(start, length: length, forceRefresh: forceRefresh) getRange(start, length: length, forceRefresh: forceRefresh)
.then((out) => out?.map(fromJson).toList()); .then((out) => out?.map(fromJson).toList());
/// Convenience function: /// Convenience function:
/// Like getItem but also parses the returned element as a protobuf object /// Like get but also parses the returned element as a protobuf object
Future<T?> getItemProtobuf<T extends GeneratedMessage>( Future<T?> getProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos, T Function(List<int>) fromBuffer, int pos,
{bool forceRefresh = false}) => {bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh) get(pos, forceRefresh: forceRefresh)
.then((out) => (out == null) ? null : fromBuffer(out)); .then((out) => (out == null) ? null : fromBuffer(out));
/// Convenience function: /// Convenience function:
/// Like getAllItems but also parses the returned elements as protobuf objects /// Like getRange but also parses the returned elements as protobuf objects
Future<List<T>?> getItemRangeProtobuf<T extends GeneratedMessage>( Future<List<T>?> getRangeProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int start, T Function(List<int>) fromBuffer, int start,
{int? length, bool forceRefresh = false}) => {int? length, bool forceRefresh = false}) =>
getItemRange(start, length: length, forceRefresh: forceRefresh) getRange(start, length: length, forceRefresh: forceRefresh)
.then((out) => out?.map(fromBuffer).toList()); .then((out) => out?.map(fromBuffer).toList());
} }

View file

@ -23,6 +23,11 @@ abstract class DHTRandomWrite {
/// of the container. /// of the container.
Future<bool> tryWriteItem(int pos, Uint8List newValue, Future<bool> tryWriteItem(int pos, Uint8List newValue,
{Output<Uint8List>? output}); {Output<Uint8List>? output});
/// Swap items at position 'aPos' and 'bPos' in the DHTArray.
/// Throws an IndexError if either of the positions swapped exceeds the length
/// of the container
Future<void> swap(int aPos, int bPos);
} }
extension DHTRandomWriteExt on DHTRandomWrite { extension DHTRandomWriteExt on DHTRandomWrite {

View file

@ -1,4 +1,4 @@
export 'dht_append.dart'; export 'dht_add.dart';
export 'dht_clear.dart'; export 'dht_clear.dart';
export 'dht_closeable.dart'; export 'dht_closeable.dart';
export 'dht_insert_remove.dart'; export 'dht_insert_remove.dart';

View file

@ -6,6 +6,7 @@ import 'package:async_tools/async_tools.dart';
import 'package:charcode/charcode.dart'; import 'package:charcode/charcode.dart';
import 'package:equatable/equatable.dart'; import 'package:equatable/equatable.dart';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:protobuf/protobuf.dart';
import '../veilid_support.dart'; import '../veilid_support.dart';
@ -128,13 +129,13 @@ class TableDBArray {
}); });
} }
Future<List<Uint8List>> getRange(int start, int end) async { Future<List<Uint8List>> getRange(int start, [int? end]) async {
await _initWait(); await _initWait();
return _mutex.protect(() async { return _mutex.protect(() async {
if (!_open) { if (!_open) {
throw StateError('not open'); throw StateError('not open');
} }
return _getRangeInner(start, end); return _getRangeInner(start, end ?? _length);
}); });
} }
@ -629,3 +630,36 @@ class TableDBArray {
final StreamController<TableDBArrayUpdate> _changeStream = final StreamController<TableDBArrayUpdate> _changeStream =
StreamController.broadcast(); StreamController.broadcast();
} }
extension TableDBArrayExt on TableDBArray {
/// Convenience function:
/// Like get but also parses the returned element as JSON
Future<T?> getJson<T>(
T Function(dynamic) fromJson,
int pos,
) =>
get(
pos,
).then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like getRange but also parses the returned elements as JSON
Future<List<T>?> getRangeJson<T>(T Function(dynamic) fromJson, int start,
[int? end]) =>
getRange(start, end ?? _length).then((out) => out.map(fromJson).toList());
/// Convenience function:
/// Like get but also parses the returned element as a protobuf object
Future<T?> getProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int pos,
) =>
get(pos).then(fromBuffer);
/// Convenience function:
/// Like getRange but also parses the returned elements as protobuf objects
Future<List<T>?> getRangeProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int start, [int? end]) =>
getRange(start, end ?? _length)
.then((out) => out.map(fromBuffer).toList());
}

View file

@ -13,16 +13,16 @@ abstract class VeilidCrypto {
class VeilidCryptoPrivate implements VeilidCrypto { class VeilidCryptoPrivate implements VeilidCrypto {
VeilidCryptoPrivate._(VeilidCryptoSystem cryptoSystem, SharedSecret secretKey) VeilidCryptoPrivate._(VeilidCryptoSystem cryptoSystem, SharedSecret secretKey)
: _cryptoSystem = cryptoSystem, : _cryptoSystem = cryptoSystem,
_secretKey = secretKey; _secret = secretKey;
final VeilidCryptoSystem _cryptoSystem; final VeilidCryptoSystem _cryptoSystem;
final SharedSecret _secretKey; final SharedSecret _secret;
static Future<VeilidCryptoPrivate> fromTypedKey( static Future<VeilidCryptoPrivate> fromTypedKey(
TypedKey typedKey, String domain) async { TypedKey typedSecret, String domain) async {
final cryptoSystem = await Veilid.instance.getCryptoSystem(typedKey.kind); final cryptoSystem =
final keyMaterial = Uint8List(0) await Veilid.instance.getCryptoSystem(typedSecret.kind);
..addAll(typedKey.value.decode()) final keyMaterial = Uint8List.fromList(
..addAll(utf8.encode(domain)); [...typedSecret.value.decode(), ...utf8.encode(domain)]);
final secretKey = await cryptoSystem.generateHash(keyMaterial); final secretKey = await cryptoSystem.generateHash(keyMaterial);
return VeilidCryptoPrivate._(cryptoSystem, secretKey); return VeilidCryptoPrivate._(cryptoSystem, secretKey);
} }
@ -35,18 +35,18 @@ class VeilidCryptoPrivate implements VeilidCrypto {
} }
static Future<VeilidCryptoPrivate> fromSharedSecret( static Future<VeilidCryptoPrivate> fromSharedSecret(
CryptoKind kind, SharedSecret secretKey) async { CryptoKind kind, SharedSecret sharedSecret) async {
final cryptoSystem = await Veilid.instance.getCryptoSystem(kind); final cryptoSystem = await Veilid.instance.getCryptoSystem(kind);
return VeilidCryptoPrivate._(cryptoSystem, secretKey); return VeilidCryptoPrivate._(cryptoSystem, sharedSecret);
} }
@override @override
Future<Uint8List> encrypt(Uint8List data) => Future<Uint8List> encrypt(Uint8List data) =>
_cryptoSystem.encryptNoAuthWithNonce(data, _secretKey); _cryptoSystem.encryptNoAuthWithNonce(data, _secret);
@override @override
Future<Uint8List> decrypt(Uint8List data) => Future<Uint8List> decrypt(Uint8List data) =>
_cryptoSystem.decryptNoAuthWithNonce(data, _secretKey); _cryptoSystem.decryptNoAuthWithNonce(data, _secret);
} }
//////////////////////////////////// ////////////////////////////////////