This commit is contained in:
Christien Rioux 2024-03-24 12:13:27 -04:00
parent 64d4d0cefb
commit 41bb198d92
40 changed files with 1623 additions and 1272 deletions

View File

@ -1,3 +1,5 @@
import 'dart:convert';
import 'package:meta/meta.dart';
import 'package:veilid_support/veilid_support.dart';
@ -22,6 +24,20 @@ class ActiveAccountInfo {
return KeyPair(key: identityKey, secret: identitySecret.value);
}
Future<DHTRecordCrypto> makeConversationCrypto(
TypedKey remoteIdentityPublicKey) async {
final identitySecret = userLogin.identitySecret;
final cs = await Veilid.instance.getCryptoSystem(identitySecret.kind);
final sharedSecret = await cs.generateSharedSecret(
remoteIdentityPublicKey.value,
identitySecret.value,
utf8.encode('VeilidChat Conversation'));
final messagesCrypto = await DHTRecordCryptoPrivate.fromSecret(
identitySecret.kind, sharedSecret);
return messagesCrypto;
}
//
final LocalAccount localAccount;
final UserLogin userLogin;

View File

@ -218,18 +218,18 @@ class AccountRepository {
// Make empty contact list
log.debug('Creating contacts list');
final contactList = await (await DHTShortArray.create(parent: parent))
.scope((r) async => r.record.ownedDHTRecordPointer);
.scope((r) async => r.recordPointer);
// Make empty contact invitation record list
log.debug('Creating contact invitation records list');
final contactInvitationRecords =
await (await DHTShortArray.create(parent: parent))
.scope((r) async => r.record.ownedDHTRecordPointer);
.scope((r) async => r.recordPointer);
// Make empty chat record list
log.debug('Creating chat records list');
final chatRecords = await (await DHTShortArray.create(parent: parent))
.scope((r) async => r.record.ownedDHTRecordPointer);
.scope((r) async => r.recordPointer);
// Make account object
final account = proto.Account()

View File

@ -1,2 +1,2 @@
export 'active_chat_cubit.dart';
export 'messages_cubit.dart';
export 'single_contact_messages_cubit.dart';

View File

@ -1,225 +0,0 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';
import '../../proto/proto.dart' as proto;
class _MessageQueueEntry {
_MessageQueueEntry({required this.remoteMessages});
IList<proto.Message> remoteMessages;
}
typedef MessagesState = AsyncValue<IList<proto.Message>>;
class MessagesCubit extends Cubit<MessagesState> {
MessagesCubit(
{required ActiveAccountInfo activeAccountInfo,
required TypedKey remoteIdentityPublicKey,
required TypedKey localConversationRecordKey,
required TypedKey localMessagesRecordKey,
required TypedKey remoteConversationRecordKey,
required TypedKey remoteMessagesRecordKey})
: _activeAccountInfo = activeAccountInfo,
_remoteIdentityPublicKey = remoteIdentityPublicKey,
_remoteMessagesQueue = StreamController(),
super(const AsyncValue.loading()) {
// Local messages key
Future.delayed(
Duration.zero,
() async => _initLocalMessages(
localConversationRecordKey, localMessagesRecordKey));
// Remote messages key
Future.delayed(
Duration.zero,
() async => _initRemoteMessages(
remoteConversationRecordKey, remoteMessagesRecordKey));
// Remote messages listener
Future.delayed(Duration.zero, () async {
await for (final entry in _remoteMessagesQueue.stream) {
await _updateRemoteMessagesStateAsync(entry);
}
});
}
@override
Future<void> close() async {
await _remoteMessagesQueue.close();
await _localSubscription?.cancel();
await _remoteSubscription?.cancel();
await _localMessagesCubit?.close();
await _remoteMessagesCubit?.close();
await super.close();
}
// Open local messages key
Future<void> _initLocalMessages(TypedKey localConversationRecordKey,
TypedKey localMessagesRecordKey) async {
final crypto = await _getMessagesCrypto();
final writer = _activeAccountInfo.conversationWriter;
_localMessagesCubit = DHTShortArrayCubit(
open: () async => DHTShortArray.openWrite(
localMessagesRecordKey, writer,
parent: localConversationRecordKey, crypto: crypto),
decodeElement: proto.Message.fromBuffer);
_localSubscription =
_localMessagesCubit!.stream.listen(_updateLocalMessagesState);
_updateLocalMessagesState(_localMessagesCubit!.state);
}
// Open remote messages key
Future<void> _initRemoteMessages(TypedKey remoteConversationRecordKey,
TypedKey remoteMessagesRecordKey) async {
// Open remote record key if it is specified
final crypto = await _getMessagesCrypto();
_remoteMessagesCubit = DHTShortArrayCubit(
open: () async => DHTShortArray.openRead(remoteMessagesRecordKey,
parent: remoteConversationRecordKey, crypto: crypto),
decodeElement: proto.Message.fromBuffer);
_remoteSubscription =
_remoteMessagesCubit!.stream.listen(_updateRemoteMessagesState);
_updateRemoteMessagesState(_remoteMessagesCubit!.state);
}
// Called when the local messages list gets a change
void _updateLocalMessagesState(
BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) {
// When local messages are updated, pass this
// directly to the messages cubit state
emit(avmessages.state);
}
// Called when the remote messages list gets a change
void _updateRemoteMessagesState(
BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) {
final remoteMessages = avmessages.state.data?.value;
if (remoteMessages == null) {
return;
}
// Add remote messages updates to queue to process asynchronously
_remoteMessagesQueue
.add(_MessageQueueEntry(remoteMessages: remoteMessages));
}
Future<void> _updateRemoteMessagesStateAsync(_MessageQueueEntry entry) async {
final localMessagesCubit = _localMessagesCubit!;
// Updated remote messages need to be merged with the local messages state
await localMessagesCubit.operate((shortArray) async {
// Ensure remoteMessages is sorted by timestamp
final remoteMessages = entry.remoteMessages
.sort((a, b) => a.timestamp.compareTo(b.timestamp));
// dedup? build local timestamp set?
// Existing messages will always be sorted by timestamp so merging is easy
var localMessages = localMessagesCubit.state.state.data!.value;
var pos = 0;
for (final newMessage in remoteMessages) {
var skip = false;
while (pos < localMessages.length) {
final m = localMessages[pos];
pos++;
// If timestamp to insert is less than
// the current position, insert it here
final newTs = Timestamp.fromInt64(newMessage.timestamp);
final curTs = Timestamp.fromInt64(m.timestamp);
final cmp = newTs.compareTo(curTs);
if (cmp < 0) {
break;
} else if (cmp == 0) {
skip = true;
break;
}
}
// Insert at this position
if (!skip) {
// Insert into dht backing array
await shortArray.tryInsertItem(pos, newMessage.writeToBuffer());
// Insert into local copy as well for this operation
localMessages = localMessages.insert(pos, newMessage);
}
}
});
}
// Initialize local messages
static Future<T> initLocalMessages<T>({
required ActiveAccountInfo activeAccountInfo,
required TypedKey remoteIdentityPublicKey,
required TypedKey localConversationKey,
required FutureOr<T> Function(DHTShortArray) callback,
}) async {
final crypto =
await _makeMessagesCrypto(activeAccountInfo, remoteIdentityPublicKey);
final writer = activeAccountInfo.conversationWriter;
return (await DHTShortArray.create(
parent: localConversationKey, crypto: crypto, smplWriter: writer))
.deleteScope((messages) async => await callback(messages));
}
// Force refresh of messages
Future<void> refresh() async {
final lcc = _localMessagesCubit;
final rcc = _remoteMessagesCubit;
if (lcc != null) {
await lcc.refresh();
}
if (rcc != null) {
await rcc.refresh();
}
}
Future<void> addMessage({required proto.Message message}) async {
await _localMessagesCubit!.operate(
(shortArray) => shortArray.tryAddItem(message.writeToBuffer()));
}
Future<DHTRecordCrypto> _getMessagesCrypto() async {
var messagesCrypto = _messagesCrypto;
if (messagesCrypto != null) {
return messagesCrypto;
}
messagesCrypto =
await _makeMessagesCrypto(_activeAccountInfo, _remoteIdentityPublicKey);
_messagesCrypto = messagesCrypto;
return messagesCrypto;
}
static Future<DHTRecordCrypto> _makeMessagesCrypto(
ActiveAccountInfo activeAccountInfo,
TypedKey remoteIdentityPublicKey) async {
final identitySecret = activeAccountInfo.userLogin.identitySecret;
final cs = await Veilid.instance.getCryptoSystem(identitySecret.kind);
final sharedSecret =
await cs.cachedDH(remoteIdentityPublicKey.value, identitySecret.value);
final messagesCrypto = await DHTRecordCryptoPrivate.fromSecret(
identitySecret.kind, sharedSecret);
return messagesCrypto;
}
final ActiveAccountInfo _activeAccountInfo;
final TypedKey _remoteIdentityPublicKey;
DHTShortArrayCubit<proto.Message>? _localMessagesCubit;
DHTShortArrayCubit<proto.Message>? _remoteMessagesCubit;
final StreamController<_MessageQueueEntry> _remoteMessagesQueue;
StreamSubscription<BlocBusyState<AsyncValue<IList<proto.Message>>>>?
_localSubscription;
StreamSubscription<BlocBusyState<AsyncValue<IList<proto.Message>>>>?
_remoteSubscription;
//
DHTRecordCrypto? _messagesCrypto;
}

View File

@ -0,0 +1,273 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';
import '../../proto/proto.dart' as proto;
class _SingleContactMessageQueueEntry {
_SingleContactMessageQueueEntry({this.localMessages, this.remoteMessages});
IList<proto.Message>? localMessages;
IList<proto.Message>? remoteMessages;
}
typedef SingleContactMessagesState = AsyncValue<IList<proto.Message>>;
// Cubit that processes single-contact chats
// Builds the reconciled chat record from the local and remote conversation keys
class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
SingleContactMessagesCubit({
required ActiveAccountInfo activeAccountInfo,
required TypedKey remoteIdentityPublicKey,
required TypedKey localConversationRecordKey,
required TypedKey localMessagesRecordKey,
required TypedKey remoteConversationRecordKey,
required TypedKey remoteMessagesRecordKey,
required OwnedDHTRecordPointer reconciledChatRecord,
}) : _activeAccountInfo = activeAccountInfo,
_remoteIdentityPublicKey = remoteIdentityPublicKey,
_localConversationRecordKey = localConversationRecordKey,
_localMessagesRecordKey = localMessagesRecordKey,
_remoteConversationRecordKey = remoteConversationRecordKey,
_remoteMessagesRecordKey = remoteMessagesRecordKey,
_reconciledChatRecord = reconciledChatRecord,
_messagesUpdateQueue = StreamController(),
super(const AsyncValue.loading()) {
// Async Init
Future.delayed(Duration.zero, _init);
}
@override
Future<void> close() async {
await _messagesUpdateQueue.close();
await _localSubscription?.cancel();
await _remoteSubscription?.cancel();
await _reconciledChatSubscription?.cancel();
await _localMessagesCubit?.close();
await _remoteMessagesCubit?.close();
await _reconciledChatMessagesCubit?.close();
await super.close();
}
// Initialize everything
Future<void> _init() async {
// Make crypto
await _initMessagesCrypto();
// Reconciled messages key
await _initReconciledChatMessages();
// Local messages key
await _initLocalMessages();
// Remote messages key
await _initRemoteMessages();
// Messages listener
Future.delayed(Duration.zero, () async {
await for (final entry in _messagesUpdateQueue.stream) {
await _updateMessagesStateAsync(entry);
}
});
}
// Make crypto
Future<void> _initMessagesCrypto() async {
_messagesCrypto = await _activeAccountInfo
.makeConversationCrypto(_remoteIdentityPublicKey);
}
// Open local messages key
Future<void> _initLocalMessages() async {
final writer = _activeAccountInfo.conversationWriter;
_localMessagesCubit = DHTShortArrayCubit(
open: () async => DHTShortArray.openWrite(
_localMessagesRecordKey, writer,
parent: _localConversationRecordKey, crypto: _messagesCrypto),
decodeElement: proto.Message.fromBuffer);
_localSubscription =
_localMessagesCubit!.stream.listen(_updateLocalMessagesState);
_updateLocalMessagesState(_localMessagesCubit!.state);
}
// Open remote messages key
Future<void> _initRemoteMessages() async {
_remoteMessagesCubit = DHTShortArrayCubit(
open: () async => DHTShortArray.openRead(_remoteMessagesRecordKey,
parent: _remoteConversationRecordKey, crypto: _messagesCrypto),
decodeElement: proto.Message.fromBuffer);
_remoteSubscription =
_remoteMessagesCubit!.stream.listen(_updateRemoteMessagesState);
_updateRemoteMessagesState(_remoteMessagesCubit!.state);
}
// Open reconciled chat record key
Future<void> _initReconciledChatMessages() async {
final accountRecordKey =
_activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey;
_reconciledChatMessagesCubit = DHTShortArrayCubit(
open: () async => DHTShortArray.openOwned(_reconciledChatRecord,
parent: accountRecordKey),
decodeElement: proto.Message.fromBuffer);
_reconciledChatSubscription =
_reconciledChatMessagesCubit!.stream.listen(_updateReconciledChatState);
_updateReconciledChatState(_reconciledChatMessagesCubit!.state);
}
// Called when the local messages list gets a change
void _updateLocalMessagesState(
BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) {
final localMessages = avmessages.state.data?.value;
if (localMessages == null) {
return;
}
// Add local messages updates to queue to process asynchronously
_messagesUpdateQueue
.add(_SingleContactMessageQueueEntry(localMessages: localMessages));
}
// Called when the remote messages list gets a change
void _updateRemoteMessagesState(
BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) {
final remoteMessages = avmessages.state.data?.value;
if (remoteMessages == null) {
return;
}
// Add remote messages updates to queue to process asynchronously
_messagesUpdateQueue
.add(_SingleContactMessageQueueEntry(remoteMessages: remoteMessages));
}
// Called when the reconciled messages list gets a change
void _updateReconciledChatState(
BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) {
// When reconciled messages are updated, pass this
// directly to the messages cubit state
emit(avmessages.state);
}
Future<void> _mergeMessagesInner(
{required DHTShortArray reconciledMessages,
required IList<proto.Message> messages}) async {
// Ensure remoteMessages is sorted by timestamp
final newMessages = messages
.sort((a, b) => a.timestamp.compareTo(b.timestamp))
.removeDuplicates();
// Existing messages will always be sorted by timestamp so merging is easy
final existingMessages =
_reconciledChatMessagesCubit!.state.state.data!.value.toList();
var ePos = 0;
var nPos = 0;
while (ePos < existingMessages.length && nPos < newMessages.length) {
final existingMessage = existingMessages[ePos];
final newMessage = newMessages[nPos];
// If timestamp to insert is less than
// the current position, insert it here
final newTs = Timestamp.fromInt64(newMessage.timestamp);
final existingTs = Timestamp.fromInt64(existingMessage.timestamp);
final cmp = newTs.compareTo(existingTs);
if (cmp < 0) {
// New message belongs here
// Insert into dht backing array
await reconciledMessages.tryInsertItem(
ePos, newMessage.writeToBuffer());
// Insert into local copy as well for this operation
existingMessages.insert(ePos, newMessage);
// Next message
nPos++;
ePos++;
} else if (cmp == 0) {
// Duplicate, skip
nPos++;
ePos++;
} else if (cmp > 0) {
// New message belongs later
ePos++;
}
}
// If there are any new messages left, append them all
while (nPos < newMessages.length) {
final newMessage = newMessages[nPos];
// Append to dht backing array
await reconciledMessages.tryAddItem(newMessage.writeToBuffer());
// Insert into local copy as well for this operation
existingMessages.add(newMessage);
nPos++;
}
}
Future<void> _updateMessagesStateAsync(
_SingleContactMessageQueueEntry entry) async {
final reconciledChatMessagesCubit = _reconciledChatMessagesCubit!;
// Merge remote and local messages into the reconciled chat log
await reconciledChatMessagesCubit.operate((reconciledMessages) async {
// xxx for now, keep two lists, but can probable simplify this out soon
if (entry.localMessages != null) {
await _mergeMessagesInner(
reconciledMessages: reconciledMessages,
messages: entry.localMessages!);
}
if (entry.remoteMessages != null) {
await _mergeMessagesInner(
reconciledMessages: reconciledMessages,
messages: entry.remoteMessages!);
}
});
}
// Force refresh of messages
Future<void> refresh() async {
final lcc = _localMessagesCubit;
final rcc = _remoteMessagesCubit;
if (lcc != null) {
await lcc.refresh();
}
if (rcc != null) {
await rcc.refresh();
}
}
Future<void> addMessage({required proto.Message message}) async {
await _localMessagesCubit!.operate(
(shortArray) => shortArray.tryAddItem(message.writeToBuffer()));
}
final ActiveAccountInfo _activeAccountInfo;
final TypedKey _remoteIdentityPublicKey;
final TypedKey _localConversationRecordKey;
final TypedKey _localMessagesRecordKey;
final TypedKey _remoteConversationRecordKey;
final TypedKey _remoteMessagesRecordKey;
final OwnedDHTRecordPointer _reconciledChatRecord;
late final DHTRecordCrypto _messagesCrypto;
DHTShortArrayCubit<proto.Message>? _localMessagesCubit;
DHTShortArrayCubit<proto.Message>? _remoteMessagesCubit;
DHTShortArrayCubit<proto.Message>? _reconciledChatMessagesCubit;
final StreamController<_SingleContactMessageQueueEntry> _messagesUpdateQueue;
StreamSubscription<BlocBusyState<AsyncValue<IList<proto.Message>>>>?
_localSubscription;
StreamSubscription<BlocBusyState<AsyncValue<IList<proto.Message>>>>?
_remoteSubscription;
StreamSubscription<BlocBusyState<AsyncValue<IList<proto.Message>>>>?
_reconciledChatSubscription;
}

View File

@ -19,8 +19,8 @@ import '../chat.dart';
class ChatComponent extends StatelessWidget {
const ChatComponent._(
{required TypedKey localUserIdentityKey,
required MessagesCubit messagesCubit,
required MessagesState messagesState,
required SingleContactMessagesCubit messagesCubit,
required SingleContactMessagesState messagesState,
required types.User localUser,
required types.User remoteUser,
super.key})
@ -31,8 +31,8 @@ class ChatComponent extends StatelessWidget {
_remoteUser = remoteUser;
final TypedKey _localUserIdentityKey;
final MessagesCubit _messagesCubit;
final MessagesState _messagesState;
final SingleContactMessagesCubit _messagesCubit;
final SingleContactMessagesState _messagesState;
final types.User _localUser;
final types.User _remoteUser;
@ -78,8 +78,8 @@ class ChatComponent extends StatelessWidget {
firstName: editedName);
// Get the messages cubit
final messages = context.select<ActiveConversationMessagesBlocMapCubit,
(MessagesCubit, MessagesState)?>(
final messages = context.select<ActiveSingleContactChatBlocMapCubit,
(SingleContactMessagesCubit, SingleContactMessagesState)?>(
(x) => x.tryOperate(remoteConversationRecordKey,
closure: (cubit) => (cubit, cubit.state)));

View File

@ -1,68 +0,0 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';
import '../../chat/chat.dart';
import '../../proto/proto.dart' as proto;
import 'active_conversations_bloc_map_cubit.dart';
// Map of remoteConversationRecordKey to MessagesCubit
// Wraps a MessagesCubit to stream the latest messages to the state
// Automatically follows the state of a ActiveConversationsBlocMapCubit.
class ActiveConversationMessagesBlocMapCubit extends BlocMapCubit<TypedKey,
AsyncValue<IList<proto.Message>>, MessagesCubit>
with
StateFollower<ActiveConversationsBlocMapState, TypedKey,
AsyncValue<ActiveConversationState>> {
ActiveConversationMessagesBlocMapCubit({
required ActiveAccountInfo activeAccountInfo,
}) : _activeAccountInfo = activeAccountInfo;
Future<void> _addConversationMessages(
{required proto.Contact contact,
required proto.Conversation localConversation,
required proto.Conversation remoteConversation}) async =>
add(() => MapEntry(
contact.remoteConversationRecordKey.toVeilid(),
MessagesCubit(
activeAccountInfo: _activeAccountInfo,
remoteIdentityPublicKey: contact.identityPublicKey.toVeilid(),
localConversationRecordKey:
contact.localConversationRecordKey.toVeilid(),
remoteConversationRecordKey:
contact.remoteConversationRecordKey.toVeilid(),
localMessagesRecordKey: localConversation.messages.toVeilid(),
remoteMessagesRecordKey:
remoteConversation.messages.toVeilid())));
/// StateFollower /////////////////////////
@override
IMap<TypedKey, AsyncValue<ActiveConversationState>> getStateMap(
ActiveConversationsBlocMapState state) =>
state;
@override
Future<void> removeFromState(TypedKey key) => remove(key);
@override
Future<void> updateState(
TypedKey key, AsyncValue<ActiveConversationState> value) async {
await value.when(
data: (state) => _addConversationMessages(
contact: state.contact,
localConversation: state.localConversation,
remoteConversation: state.remoteConversation),
loading: () => addState(key, const AsyncValue.loading()),
error: (error, stackTrace) =>
addState(key, AsyncValue.error(error, stackTrace)));
}
////
final ActiveAccountInfo _activeAccountInfo;
}

View File

@ -34,6 +34,9 @@ typedef ActiveConversationsBlocMapState
// Map of remoteConversationRecordKey to ActiveConversationCubit
// Wraps a conversation cubit to only expose completely built conversations
// Automatically follows the state of a ChatListCubit.
// Even though 'conversations' are per-contact and not per-chat
// We currently only build the cubits for the chats that are active, not
// archived chats or contacts that are not actively in a chat.
class ActiveConversationsBlocMapCubit extends BlocMapCubit<TypedKey,
AsyncValue<ActiveConversationState>, ActiveConversationCubit>
with
@ -82,7 +85,7 @@ class ActiveConversationsBlocMapCubit extends BlocMapCubit<TypedKey,
return IMap();
}
return IMap.fromIterable(stateValue,
keyMapper: (e) => e.remoteConversationKey.toVeilid(),
keyMapper: (e) => e.remoteConversationRecordKey.toVeilid(),
valueMapper: (e) => e);
}
@ -99,7 +102,7 @@ class ActiveConversationsBlocMapCubit extends BlocMapCubit<TypedKey,
final contactIndex = contactList
.indexWhere((c) => c.remoteConversationRecordKey.toVeilid() == key);
if (contactIndex == -1) {
await addState(key, AsyncValue.error('Contact not found for chat'));
await addState(key, AsyncValue.error('Contact not found'));
return;
}
final contact = contactList[contactIndex];

View File

@ -0,0 +1,108 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';
import '../../chat/chat.dart';
import '../../contacts/contacts.dart';
import '../../proto/proto.dart' as proto;
import 'active_conversations_bloc_map_cubit.dart';
import 'chat_list_cubit.dart';
// Map of remoteConversationRecordKey to MessagesCubit
// Wraps a MessagesCubit to stream the latest messages to the state
// Automatically follows the state of a ActiveConversationsBlocMapCubit.
class ActiveSingleContactChatBlocMapCubit extends BlocMapCubit<TypedKey,
AsyncValue<IList<proto.Message>>, SingleContactMessagesCubit>
with
StateFollower<ActiveConversationsBlocMapState, TypedKey,
AsyncValue<ActiveConversationState>> {
ActiveSingleContactChatBlocMapCubit(
{required ActiveAccountInfo activeAccountInfo,
required ContactListCubit contactListCubit,
required ChatListCubit chatListCubit})
: _activeAccountInfo = activeAccountInfo,
_contactListCubit = contactListCubit,
_chatListCubit = chatListCubit;
Future<void> _addConversationMessages(
{required proto.Contact contact,
required proto.Chat chat,
required proto.Conversation localConversation,
required proto.Conversation remoteConversation}) async =>
add(() => MapEntry(
contact.remoteConversationRecordKey.toVeilid(),
SingleContactMessagesCubit(
activeAccountInfo: _activeAccountInfo,
remoteIdentityPublicKey: contact.identityPublicKey.toVeilid(),
localConversationRecordKey:
contact.localConversationRecordKey.toVeilid(),
remoteConversationRecordKey:
contact.remoteConversationRecordKey.toVeilid(),
localMessagesRecordKey: localConversation.messages.toVeilid(),
remoteMessagesRecordKey: remoteConversation.messages.toVeilid(),
reconciledChatRecord: chat.reconciledChatRecord.toVeilid(),
)));
/// StateFollower /////////////////////////
@override
IMap<TypedKey, AsyncValue<ActiveConversationState>> getStateMap(
ActiveConversationsBlocMapState state) =>
state;
@override
Future<void> removeFromState(TypedKey key) => remove(key);
@override
Future<void> updateState(
TypedKey key, AsyncValue<ActiveConversationState> value) async {
// Get the contact object for this single contact chat
final contactList = _contactListCubit.state.state.data?.value;
if (contactList == null) {
await addState(key, const AsyncValue.loading());
return;
}
final contactIndex = contactList
.indexWhere((c) => c.remoteConversationRecordKey.toVeilid() == key);
if (contactIndex == -1) {
await addState(
key, AsyncValue.error('Contact not found for conversation'));
return;
}
final contact = contactList[contactIndex];
// Get the chat object for this single contact chat
final chatList = _chatListCubit.state.state.data?.value;
if (chatList == null) {
await addState(key, const AsyncValue.loading());
return;
}
final chatIndex = chatList
.indexWhere((c) => c.remoteConversationRecordKey.toVeilid() == key);
if (contactIndex == -1) {
await addState(key, AsyncValue.error('Chat not found for conversation'));
return;
}
final chat = chatList[chatIndex];
await value.when(
data: (state) => _addConversationMessages(
contact: contact,
chat: chat,
localConversation: state.localConversation,
remoteConversation: state.remoteConversation),
loading: () => addState(key, const AsyncValue.loading()),
error: (error, stackTrace) =>
addState(key, AsyncValue.error(error, stackTrace)));
}
////
final ActiveAccountInfo _activeAccountInfo;
final ContactListCubit _contactListCubit;
final ChatListCubit _chatListCubit;
}

View File

@ -5,6 +5,7 @@ import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';
import '../../chat/chat.dart';
import '../../proto/proto.dart' as proto;
import '../../tools/tools.dart';
//////////////////////////////////////////////////
@ -16,7 +17,8 @@ class ChatListCubit extends DHTShortArrayCubit<proto.Chat> {
required ActiveAccountInfo activeAccountInfo,
required proto.Account account,
required this.activeChatCubit,
}) : super(
}) : _activeAccountInfo = activeAccountInfo,
super(
open: () => _open(activeAccountInfo, account),
decodeElement: proto.Chat.fromBuffer);
@ -50,15 +52,24 @@ class ChatListCubit extends DHTShortArrayCubit<proto.Chat> {
throw Exception('Failed to get chat');
}
final c = proto.Chat.fromBuffer(cbuf);
if (c.remoteConversationKey == remoteConversationRecordKeyProto) {
if (c.remoteConversationRecordKey == remoteConversationRecordKeyProto) {
// Nothing to do here
return;
}
}
final accountRecordKey = _activeAccountInfo
.userLogin.accountRecordInfo.accountRecord.recordKey;
// Make a record that can store the reconciled version of the chat
final reconciledChatRecord =
await (await DHTShortArray.create(parent: accountRecordKey))
.scope((r) async => r.recordPointer);
// Create conversation type Chat
final chat = proto.Chat()
..type = proto.ChatType.SINGLE_CONTACT
..remoteConversationKey = remoteConversationRecordKeyProto;
..remoteConversationRecordKey = remoteConversationRecordKeyProto
..reconciledChatRecord = reconciledChatRecord.toProto();
// Add chat
final added = await shortArray.tryAddItem(chat.writeToBuffer());
@ -71,8 +82,9 @@ class ChatListCubit extends DHTShortArrayCubit<proto.Chat> {
/// Delete a chat
Future<void> deleteChat(
{required TypedKey remoteConversationRecordKey}) async {
// Create conversation type Chat
final remoteConversationKey = remoteConversationRecordKey.toProto();
final accountRecordKey =
_activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey;
// Remove Chat from account's list
// if this fails, don't keep retrying, user can try again later
@ -86,8 +98,18 @@ class ChatListCubit extends DHTShortArrayCubit<proto.Chat> {
throw Exception('Failed to get chat');
}
final c = proto.Chat.fromBuffer(cbuf);
if (c.remoteConversationKey == remoteConversationKey) {
await shortArray.tryRemoveItem(i);
if (c.remoteConversationRecordKey == remoteConversationKey) {
// Found the right chat
if (await shortArray.tryRemoveItem(i) != null) {
try {
await (await DHTShortArray.openOwned(
c.reconciledChatRecord.toVeilid(),
parent: accountRecordKey))
.delete();
} on Exception catch (e) {
log.debug('error removing reconciled chat record: $e', e);
}
}
return;
}
}
@ -95,4 +117,5 @@ class ChatListCubit extends DHTShortArrayCubit<proto.Chat> {
}
final ActiveChatCubit activeChatCubit;
final ActiveAccountInfo _activeAccountInfo;
}

View File

@ -1,3 +1,3 @@
export 'active_conversation_messages_bloc_map_cubit.dart';
export 'active_single_contact_chat_bloc_map_cubit.dart';
export 'active_conversations_bloc_map_cubit.dart';
export 'chat_list_cubit.dart';

View File

@ -40,7 +40,7 @@ class ChatSingleContactListWidget extends StatelessWidget {
initialList: chatList.toList(),
builder: (l, i, c) {
final contact =
contactMap[c.remoteConversationKey];
contactMap[c.remoteConversationRecordKey];
if (contact == null) {
return const Text('...');
}
@ -52,7 +52,7 @@ class ChatSingleContactListWidget extends StatelessWidget {
final lowerValue = value.toLowerCase();
return chatList.where((c) {
final contact =
contactMap[c.remoteConversationKey];
contactMap[c.remoteConversationRecordKey];
if (contact == null) {
return false;
}

View File

@ -1,4 +1,5 @@
export 'contact_invitation_list_cubit.dart';
export 'contact_request_inbox_cubit.dart';
export 'invitation_generator_cubit.dart';
export 'waiting_invitation_cubit.dart';
export 'waiting_invitations_bloc_map_cubit.dart';

View File

@ -0,0 +1,8 @@
import 'dart:typed_data';
import 'package:bloc_tools/bloc_tools.dart';
class InvitationGeneratorCubit extends FutureCubit<Uint8List> {
InvitationGeneratorCubit(super.fut);
InvitationGeneratorCubit.value(super.v) : super.value();
}

View File

@ -2,7 +2,6 @@ import 'dart:math';
import 'package:awesome_extensions/awesome_extensions.dart';
import 'package:basic_utils/basic_utils.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'package:flutter/services.dart';
@ -12,10 +11,7 @@ import 'package:qr_flutter/qr_flutter.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../tools/tools.dart';
class InvitationGeneratorCubit extends FutureCubit<Uint8List> {
InvitationGeneratorCubit(super.fut);
}
import '../contact_invitation.dart';
class ContactInvitationDisplayDialog extends StatefulWidget {
const ContactInvitationDisplayDialog({

View File

@ -108,9 +108,9 @@ class ContactInvitationItemWidget extends StatelessWidget {
await showDialog<void>(
context: context,
builder: (context) => BlocProvider(
create: (context) => InvitationGeneratorCubit(
Future.value(Uint8List.fromList(
contactInvitationRecord.invitation))),
create: (context) => InvitationGeneratorCubit
.value(Uint8List.fromList(
contactInvitationRecord.invitation)),
child: ContactInvitationDisplayDialog(
message: contactInvitationRecord.message,
)));

View File

@ -12,7 +12,6 @@ import 'package:meta/meta.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';
import '../../chat/chat.dart';
import '../../proto/proto.dart' as proto;
@immutable
@ -47,7 +46,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
// Open local record key if it is specified
final pool = DHTRecordPool.instance;
final crypto = await getConversationCrypto();
final crypto = await _cachedConversationCrypto();
final writer = _activeAccountInfo.conversationWriter;
final record = await pool.openWrite(
_localConversationRecordKey!, writer,
@ -63,7 +62,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
// Open remote record key if it is specified
final pool = DHTRecordPool.instance;
final crypto = await getConversationCrypto();
final crypto = await _cachedConversationCrypto();
final record = await pool.openRead(_remoteConversationRecordKey,
parent: accountRecordKey, crypto: crypto);
await _setRemoteConversation(record);
@ -163,7 +162,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
final accountRecordKey =
_activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey;
final crypto = await getConversationCrypto();
final crypto = await _cachedConversationCrypto();
final writer = _activeAccountInfo.conversationWriter;
// Open with SMPL scheme for identity writer
@ -187,7 +186,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
// ignore: prefer_expression_function_bodies
.deleteScope((localConversation) async {
// Make messages log
return MessagesCubit.initLocalMessages(
return initLocalMessages(
activeAccountInfo: _activeAccountInfo,
remoteIdentityPublicKey: _remoteIdentityPublicKey,
localConversationKey: localConversation.key,
@ -197,7 +196,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
..profile = profile
..identityMasterJson = jsonEncode(
_activeAccountInfo.localAccount.identityMaster.toJson())
..messages = messages.record.key.toProto();
..messages = messages.recordKey.toProto();
// Write initial conversation to record
final update = await localConversation.tryWriteProtobuf(
@ -221,6 +220,22 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
return out;
}
// Initialize local messages
Future<T> initLocalMessages<T>({
required ActiveAccountInfo activeAccountInfo,
required TypedKey remoteIdentityPublicKey,
required TypedKey localConversationKey,
required FutureOr<T> Function(DHTShortArray) callback,
}) async {
final crypto =
await activeAccountInfo.makeConversationCrypto(remoteIdentityPublicKey);
final writer = activeAccountInfo.conversationWriter;
return (await DHTShortArray.create(
parent: localConversationKey, crypto: crypto, smplWriter: writer))
.deleteScope((messages) async => await callback(messages));
}
// Force refresh of conversation keys
Future<void> refresh() async {
final lcc = _localConversationCubit;
@ -247,18 +262,14 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
return update;
}
Future<DHTRecordCrypto> getConversationCrypto() async {
Future<DHTRecordCrypto> _cachedConversationCrypto() async {
var conversationCrypto = _conversationCrypto;
if (conversationCrypto != null) {
return conversationCrypto;
}
final identitySecret = _activeAccountInfo.userLogin.identitySecret;
final cs = await Veilid.instance.getCryptoSystem(identitySecret.kind);
final sharedSecret =
await cs.cachedDH(_remoteIdentityPublicKey.value, identitySecret.value);
conversationCrypto = await _activeAccountInfo
.makeConversationCrypto(_remoteIdentityPublicKey);
conversationCrypto = await DHTRecordCryptoPrivate.fromSecret(
identitySecret.kind, sharedSecret);
_conversationCrypto = conversationCrypto;
return conversationCrypto;
}

View File

@ -151,11 +151,12 @@ class HomeAccountReadyShellState extends State<HomeAccountReadyShell> {
contactListCubit: context.read<ContactListCubit>())
..followBloc(context.read<ChatListCubit>())),
BlocProvider(
create: (context) =>
ActiveConversationMessagesBlocMapCubit(
activeAccountInfo: widget.activeAccountInfo,
)..followBloc(
context.read<ActiveConversationsBlocMapCubit>())),
create: (context) => ActiveSingleContactChatBlocMapCubit(
activeAccountInfo: widget.activeAccountInfo,
contactListCubit: context.read<ContactListCubit>(),
chatListCubit: context.read<ChatListCubit>())
..followBloc(
context.read<ActiveConversationsBlocMapCubit>())),
BlocProvider(
create: (context) => WaitingInvitationsBlocMapCubit(
activeAccountInfo: widget.activeAccountInfo,

View File

@ -456,7 +456,8 @@ class Chat extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'Chat', package: const $pb.PackageName(_omitMessageNames ? '' : 'veilidchat'), createEmptyInstance: create)
..e<ChatType>(1, _omitFieldNames ? '' : 'type', $pb.PbFieldType.OE, defaultOrMaker: ChatType.CHAT_TYPE_UNSPECIFIED, valueOf: ChatType.valueOf, enumValues: ChatType.values)
..aOM<$1.TypedKey>(2, _omitFieldNames ? '' : 'remoteConversationKey', subBuilder: $1.TypedKey.create)
..aOM<$1.TypedKey>(2, _omitFieldNames ? '' : 'remoteConversationRecordKey', subBuilder: $1.TypedKey.create)
..aOM<$0.OwnedDHTRecordPointer>(3, _omitFieldNames ? '' : 'reconciledChatRecord', subBuilder: $0.OwnedDHTRecordPointer.create)
..hasRequiredFields = false
;
@ -491,15 +492,26 @@ class Chat extends $pb.GeneratedMessage {
void clearType() => clearField(1);
@$pb.TagNumber(2)
$1.TypedKey get remoteConversationKey => $_getN(1);
$1.TypedKey get remoteConversationRecordKey => $_getN(1);
@$pb.TagNumber(2)
set remoteConversationKey($1.TypedKey v) { setField(2, v); }
set remoteConversationRecordKey($1.TypedKey v) { setField(2, v); }
@$pb.TagNumber(2)
$core.bool hasRemoteConversationKey() => $_has(1);
$core.bool hasRemoteConversationRecordKey() => $_has(1);
@$pb.TagNumber(2)
void clearRemoteConversationKey() => clearField(2);
void clearRemoteConversationRecordKey() => clearField(2);
@$pb.TagNumber(2)
$1.TypedKey ensureRemoteConversationKey() => $_ensure(1);
$1.TypedKey ensureRemoteConversationRecordKey() => $_ensure(1);
@$pb.TagNumber(3)
$0.OwnedDHTRecordPointer get reconciledChatRecord => $_getN(2);
@$pb.TagNumber(3)
set reconciledChatRecord($0.OwnedDHTRecordPointer v) { setField(3, v); }
@$pb.TagNumber(3)
$core.bool hasReconciledChatRecord() => $_has(2);
@$pb.TagNumber(3)
void clearReconciledChatRecord() => clearField(3);
@$pb.TagNumber(3)
$0.OwnedDHTRecordPointer ensureReconciledChatRecord() => $_ensure(2);
}
class Account extends $pb.GeneratedMessage {

View File

@ -185,15 +185,17 @@ const Chat$json = {
'1': 'Chat',
'2': [
{'1': 'type', '3': 1, '4': 1, '5': 14, '6': '.veilidchat.ChatType', '10': 'type'},
{'1': 'remote_conversation_key', '3': 2, '4': 1, '5': 11, '6': '.veilid.TypedKey', '10': 'remoteConversationKey'},
{'1': 'remote_conversation_record_key', '3': 2, '4': 1, '5': 11, '6': '.veilid.TypedKey', '10': 'remoteConversationRecordKey'},
{'1': 'reconciled_chat_record', '3': 3, '4': 1, '5': 11, '6': '.dht.OwnedDHTRecordPointer', '10': 'reconciledChatRecord'},
],
};
/// Descriptor for `Chat`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List chatDescriptor = $convert.base64Decode(
'CgRDaGF0EigKBHR5cGUYASABKA4yFC52ZWlsaWRjaGF0LkNoYXRUeXBlUgR0eXBlEkgKF3JlbW'
'90ZV9jb252ZXJzYXRpb25fa2V5GAIgASgLMhAudmVpbGlkLlR5cGVkS2V5UhVyZW1vdGVDb252'
'ZXJzYXRpb25LZXk=');
'CgRDaGF0EigKBHR5cGUYASABKA4yFC52ZWlsaWRjaGF0LkNoYXRUeXBlUgR0eXBlElUKHnJlbW'
'90ZV9jb252ZXJzYXRpb25fcmVjb3JkX2tleRgCIAEoCzIQLnZlaWxpZC5UeXBlZEtleVIbcmVt'
'b3RlQ29udmVyc2F0aW9uUmVjb3JkS2V5ElAKFnJlY29uY2lsZWRfY2hhdF9yZWNvcmQYAyABKA'
'syGi5kaHQuT3duZWRESFRSZWNvcmRQb2ludGVyUhRyZWNvbmNpbGVkQ2hhdFJlY29yZA==');
@$core.Deprecated('Use accountDescriptor instead')
const Account$json = {

View File

@ -42,8 +42,12 @@ message Message {
repeated Attachment attachments = 5;
}
// A record of a 1-1 chat that is synchronized between
// two users. Visible and encrypted for the other party
// The means of direct communications that is synchronized between
// two users. Visible and encrypted for the other party.
// Includes communications for:
// * Profile changes
// * Identity changes
// * 1-1 chat messages
//
// DHT Schema: SMPL(0,1,[identityPublicKey])
// DHT Key (UnicastOutbox): localConversation
@ -117,12 +121,15 @@ enum ChatType {
GROUP = 2;
}
// Either a 1-1 converation or a group chat (eventually)
// Either a 1-1 conversation or a group chat (eventually)
// Privately encrypted, this is the local user's copy of the chat
message Chat {
// What kind of chat is this
ChatType type = 1;
// 1-1 Chat key
veilid.TypedKey remote_conversation_key = 2;
// Conversation key for the other party
veilid.TypedKey remote_conversation_record_key = 2;
// Reconciled chat record DHTLog (xxx for now DHTShortArray)
dht.OwnedDHTRecordPointer reconciled_chat_record = 3;
}
// A record of an individual account

View File

@ -12,4 +12,5 @@ abstract class FutureCubit<State> extends Cubit<AsyncValue<State>> {
emit(AsyncValue.error(e, stackTrace));
}));
}
FutureCubit.value(State state) : super(AsyncValue.data(state));
}

View File

@ -2,8 +2,5 @@
library dht_support;
export 'src/dht_record_crypto.dart';
export 'src/dht_record_cubit.dart';
export 'src/dht_record_pool.dart';
export 'src/dht_short_array.dart';
export 'src/dht_short_array_cubit.dart';
export 'src/dht_record/barrel.dart';
export 'src/dht_short_array/barrel.dart';

View File

@ -42,6 +42,10 @@ message DHTShortArray {
// key = idx / stride
// subkey = idx % stride
bytes index = 2;
// Most recent sequence numbers for elements
repeated uint32 seqs = 3;
// Free items are not represented in the list but can be
// calculated through iteration
}

View File

@ -0,0 +1,3 @@
export 'dht_record_crypto.dart';
export 'dht_record_cubit.dart';
export 'dht_record_pool.dart';

View File

@ -347,6 +347,11 @@ class DHTRecord {
}
}
Future<DHTRecordReport> inspect(
{List<ValueSubkeyRange>? subkeys,
DHTReportScope scope = DHTReportScope.local}) =>
_routingContext.inspectDHTRecord(key, subkeys: subkeys, scope: scope);
void _addValueChange(
{required bool local,
required Uint8List data,

View File

@ -1,6 +1,6 @@
import 'dart:async';
import 'dart:typed_data';
import '../../../../veilid_support.dart';
import '../../../../../veilid_support.dart';
abstract class DHTRecordCrypto {
Future<Uint8List> encrypt(Uint8List data, int subkey);

View File

@ -4,7 +4,7 @@ import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import '../../veilid_support.dart';
import '../../../veilid_support.dart';
typedef InitialStateFunction<T> = Future<T?> Function(DHTRecord);
typedef StateFunction<T> = Future<T?> Function(

View File

@ -12,7 +12,6 @@ import '../../../../veilid_support.dart';
part 'dht_record_pool.freezed.dart';
part 'dht_record_pool.g.dart';
part 'dht_record.dart';
const int watchBackoffMultiplier = 2;

View File

@ -12,7 +12,7 @@ part of 'dht_record_pool.dart';
T _$identity<T>(T value) => value;
final _privateConstructorUsedError = UnsupportedError(
'It seems like you constructed your class using `MyClass._()`. This constructor is only meant to be used by freezed and you are not supposed to need it nor use it.\nPlease check the documentation here for more information: https://github.com/rrousselGit/freezed#custom-getters-and-methods');
'It seems like you constructed your class using `MyClass._()`. This constructor is only meant to be used by freezed and you are not supposed to need it nor use it.\nPlease check the documentation here for more information: https://github.com/rrousselGit/freezed#adding-getters-and-methods-to-our-models');
DHTRecordPoolAllocations _$DHTRecordPoolAllocationsFromJson(
Map<String, dynamic> json) {

View File

@ -1,904 +0,0 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart';
import '../../../../veilid_support.dart';
import '../proto/proto.dart' as proto;
class _DHTShortArrayCache {
_DHTShortArrayCache()
: linkedRecords = List<DHTRecord>.empty(growable: true),
index = List<int>.empty(growable: true),
free = List<int>.empty(growable: true);
_DHTShortArrayCache.from(_DHTShortArrayCache other)
: linkedRecords = List.of(other.linkedRecords),
index = List.of(other.index),
free = List.of(other.free);
final List<DHTRecord> linkedRecords;
final List<int> index;
final List<int> free;
proto.DHTShortArray toProto() {
final head = proto.DHTShortArray();
head.keys.addAll(linkedRecords.map((lr) => lr.key.toProto()));
head.index = head.index..addAll(index);
// Do not serialize free list, it gets recreated
return head;
}
}
///////////////////////////////////////////////////////////////////////
class DHTShortArray {
////////////////////////////////////////////////////////////////
// Constructors
DHTShortArray._({required DHTRecord headRecord}) : _headRecord = headRecord {
late final int stride;
switch (headRecord.schema) {
case DHTSchemaDFLT(oCnt: final oCnt):
if (oCnt <= 1) {
throw StateError('Invalid DFLT schema in DHTShortArray');
}
stride = oCnt - 1;
case DHTSchemaSMPL(oCnt: final oCnt, members: final members):
if (oCnt != 0 || members.length != 1 || members[0].mCnt <= 1) {
throw StateError('Invalid SMPL schema in DHTShortArray');
}
stride = members[0].mCnt - 1;
}
assert(stride <= maxElements, 'stride too long');
_stride = stride;
}
// Create a DHTShortArray
// if smplWriter is specified, uses a SMPL schema with a single writer
// rather than the key owner
static Future<DHTShortArray> create(
{int stride = maxElements,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
KeyPair? smplWriter}) async {
assert(stride <= maxElements, 'stride too long');
final pool = DHTRecordPool.instance;
late final DHTRecord dhtRecord;
if (smplWriter != null) {
final schema = DHTSchema.smpl(
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: stride + 1)]);
dhtRecord = await pool.create(
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto,
writer: smplWriter);
} else {
final schema = DHTSchema.dflt(oCnt: stride + 1);
dhtRecord = await pool.create(
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto);
}
try {
final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
if (!await dhtShortArray._tryWriteHead()) {
throw StateError('Failed to write head at this time');
}
return dhtShortArray;
} on Exception catch (_) {
await dhtRecord.delete();
rethrow;
}
}
static Future<DHTShortArray> openRead(TypedKey headRecordKey,
{VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto}) async {
final dhtRecord = await DHTRecordPool.instance.openRead(headRecordKey,
parent: parent, routingContext: routingContext, crypto: crypto);
try {
final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
await dhtShortArray._refreshHead();
return dhtShortArray;
} on Exception catch (_) {
await dhtRecord.close();
rethrow;
}
}
static Future<DHTShortArray> openWrite(
TypedKey headRecordKey,
KeyPair writer, {
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
}) async {
final dhtRecord = await DHTRecordPool.instance.openWrite(
headRecordKey, writer,
parent: parent, routingContext: routingContext, crypto: crypto);
try {
final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
await dhtShortArray._refreshHead();
return dhtShortArray;
} on Exception catch (_) {
await dhtRecord.close();
rethrow;
}
}
static Future<DHTShortArray> openOwned(
OwnedDHTRecordPointer ownedDHTRecordPointer, {
required TypedKey parent,
VeilidRoutingContext? routingContext,
DHTRecordCrypto? crypto,
}) =>
openWrite(
ownedDHTRecordPointer.recordKey,
ownedDHTRecordPointer.owner,
routingContext: routingContext,
parent: parent,
crypto: crypto,
);
////////////////////////////////////////////////////////////////////////////
// Public API
/// Returns the first record in the DHTShortArray which contains its control
/// information.
DHTRecord get record => _headRecord;
/// Returns the number of elements in the DHTShortArray
int get length => _head.index.length;
/// Free all resources for the DHTShortArray
Future<void> close() async {
await _watchController?.close();
final futures = <Future<void>>[_headRecord.close()];
for (final lr in _head.linkedRecords) {
futures.add(lr.close());
}
await Future.wait(futures);
}
/// Free all resources for the DHTShortArray and delete it from the DHT
Future<void> delete() async {
await _watchController?.close();
final futures = <Future<void>>[_headRecord.delete()];
for (final lr in _head.linkedRecords) {
futures.add(lr.delete());
}
await Future.wait(futures);
}
/// Runs a closure that guarantees the DHTShortArray
/// will be closed upon exit, even if an uncaught exception is thrown
Future<T> scope<T>(Future<T> Function(DHTShortArray) scopeFunction) async {
try {
return await scopeFunction(this);
} finally {
await close();
}
}
/// Runs a closure that guarantees the DHTShortArray
/// will be closed upon exit, and deleted if an an
/// uncaught exception is thrown
Future<T> deleteScope<T>(
Future<T> Function(DHTShortArray) scopeFunction) async {
try {
final out = await scopeFunction(this);
await close();
return out;
} on Exception catch (_) {
await delete();
rethrow;
}
}
/// Return the item at position 'pos' in the DHTShortArray. If 'foreRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async {
await _refreshHead(forceRefresh: forceRefresh, onlyUpdates: true);
if (pos < 0 || pos >= _head.index.length) {
throw IndexError.withLength(pos, _head.index.length);
}
final index = _head.index[pos];
final recordNumber = index ~/ _stride;
final record = _getLinkedRecord(recordNumber);
assert(record != null, 'Record does not exist');
final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0);
return record!.get(subkey: recordSubkey, forceRefresh: forceRefresh);
}
/// Return a list of all of the items in the DHTShortArray. If 'foreRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
Future<List<Uint8List>?> getAllItems({bool forceRefresh = false}) async {
await _refreshHead(forceRefresh: forceRefresh, onlyUpdates: true);
final out = <Uint8List>[];
for (var pos = 0; pos < _head.index.length; pos++) {
final index = _head.index[pos];
final recordNumber = index ~/ _stride;
final record = _getLinkedRecord(recordNumber);
if (record == null) {
assert(record != null, 'Record does not exist');
return null;
}
final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0);
final elem =
await record.get(subkey: recordSubkey, forceRefresh: forceRefresh);
if (elem == null) {
return null;
}
out.add(elem);
}
return out;
}
/// Convenience function:
/// Like getItem but also parses the returned element as JSON
Future<T?> getItemJson<T>(T Function(dynamic) fromJson, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like getAllItems but also parses the returned elements as JSON
Future<List<T>?> getAllItemsJson<T>(T Function(dynamic) fromJson,
{bool forceRefresh = false}) =>
getAllItems(forceRefresh: forceRefresh)
.then((out) => out?.map(fromJson).toList());
/// Convenience function:
/// Like getItem but also parses the returned element as a protobuf object
Future<T?> getItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => (out == null) ? null : fromBuffer(out));
/// Convenience function:
/// Like getAllItems but also parses the returned elements as protobuf objects
Future<List<T>?> getAllItemsProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
{bool forceRefresh = false}) =>
getAllItems(forceRefresh: forceRefresh)
.then((out) => out?.map(fromBuffer).toList());
/// Try to add an item to the end of the DHTShortArray. Return true if the
/// element was successfully added, and false if the state changed before
/// the element could be added or a newer value was found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryAddItem(Uint8List value) async {
await _refreshHead(onlyUpdates: true);
final oldHead = _DHTShortArrayCache.from(_head);
late final int pos;
try {
// Allocate empty index
final idx = _emptyIndex();
// Add new index
pos = _head.index.length;
_head.index.add(idx);
// Write new head
if (!await _tryWriteHead()) {
// Failed to write head means head got overwritten
return false;
}
} on Exception catch (_) {
// Exception on write means state needs to be reverted
_head = oldHead;
return false;
}
// Head write succeeded, now write item
await eventualWriteItem(pos, value);
return true;
}
/// Try to insert an item as position 'pos' of the DHTShortArray.
/// Return true if the element was successfully inserted, and false if the
/// state changed before the element could be inserted or a newer value was
/// found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryInsertItem(int pos, Uint8List value) async {
await _refreshHead(onlyUpdates: true);
final oldHead = _DHTShortArrayCache.from(_head);
try {
// Allocate empty index
final idx = _emptyIndex();
// Add new index
_head.index.insert(pos, idx);
// Write new head
if (!await _tryWriteHead()) {
// Failed to write head means head got overwritten
return false;
}
} on Exception catch (_) {
// Exception on write means state needs to be reverted
_head = oldHead;
return false;
}
// Head write succeeded, now write item
await eventualWriteItem(pos, value);
return true;
}
/// Try to swap items at position 'aPos' and 'bPos' in the DHTShortArray.
/// Return true if the elements were successfully swapped, and false if the
/// state changed before the elements could be swapped or newer values were
/// found on the network.
Future<bool> trySwapItem(int aPos, int bPos) async {
if (aPos == bPos) {
return false;
}
await _refreshHead(onlyUpdates: true);
final oldHead = _DHTShortArrayCache.from(_head);
try {
// Add new index
final aIdx = _head.index[aPos];
final bIdx = _head.index[bPos];
_head.index[aPos] = bIdx;
_head.index[bPos] = aIdx;
// Write new head
if (!await _tryWriteHead()) {
// Failed to write head means head got overwritten
return false;
}
} on Exception catch (_) {
// Exception on write means state needs to be reverted
_head = oldHead;
return false;
}
// A change happened, notify any listeners
_watchController?.sink.add(null);
return true;
}
/// Try to remove an item at position 'pos' in the DHTShortArray.
/// Return the element if it was successfully removed, and null if the
/// state changed before the elements could be removed or newer values were
/// found on the network.
Future<Uint8List?> tryRemoveItem(int pos) async {
await _refreshHead(onlyUpdates: true);
final oldHead = _DHTShortArrayCache.from(_head);
try {
final removedIdx = _head.index.removeAt(pos);
_freeIndex(removedIdx);
final recordNumber = removedIdx ~/ _stride;
final record = _getLinkedRecord(recordNumber);
assert(record != null, 'Record does not exist');
final recordSubkey =
(removedIdx % _stride) + ((recordNumber == 0) ? 1 : 0);
// Write new head
if (!await _tryWriteHead()) {
// Failed to write head means head got overwritten
return null;
}
final result = await record!.get(subkey: recordSubkey);
// A change happened, notify any listeners
_watchController?.sink.add(null);
return result;
} on Exception catch (_) {
// Exception on write means state needs to be reverted
_head = oldHead;
return null;
}
}
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<T?> tryRemoveItemJson<T>(
T Function(dynamic) fromJson,
int pos,
) =>
tryRemoveItem(pos).then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<T?> tryRemoveItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos) =>
getItem(pos).then((out) => (out == null) ? null : fromBuffer(out));
/// Try to remove all items in the DHTShortArray.
/// Return true if it was successfully cleared, and false if the
/// state changed before the elements could be cleared or newer values were
/// found on the network.
Future<bool> tryClear() async {
await _refreshHead(onlyUpdates: true);
final oldHead = _DHTShortArrayCache.from(_head);
try {
_head.index.clear();
_head.free.clear();
// Write new head
if (!await _tryWriteHead()) {
// Failed to write head means head got overwritten
return false;
}
} on Exception catch (_) {
// Exception on write means state needs to be reverted
_head = oldHead;
return false;
}
// A change happened, notify any listeners
_watchController?.sink.add(null);
return true;
}
/// Try to set an item at position 'pos' of the DHTShortArray.
/// Return true if the element was successfully set, and false if the
/// state changed before the element could be set or a newer value was
/// found on the network.
/// This may throw an exception if the position elements the built-in limit of
/// 'maxElements = 256' entries.
Future<Uint8List?> tryWriteItem(int pos, Uint8List newValue) async {
if (await _refreshHead(onlyUpdates: true)) {
throw StateError('structure changed');
}
if (pos < 0 || pos >= _head.index.length) {
throw IndexError.withLength(pos, _head.index.length);
}
final index = _head.index[pos];
final recordNumber = index ~/ _stride;
final record = await _getOrCreateLinkedRecord(recordNumber);
final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0);
final result = await record.tryWriteBytes(newValue, subkey: recordSubkey);
if (result == null) {
// A newer value was not found, so the change took
_watchController?.sink.add(null);
}
return result;
}
/// Set an item at position 'pos' of the DHTShortArray. Retries until the
/// value being written is successfully made the newest value of the element.
/// This may throw an exception if the position elements the built-in limit of
/// 'maxElements = 256' entries.
Future<void> eventualWriteItem(int pos, Uint8List newValue) async {
Uint8List? oldData;
do {
// Set it back
oldData = await tryWriteItem(pos, newValue);
// Repeat if newer data on the network was found
} while (oldData != null);
}
/// Change an item at position 'pos' of the DHTShortArray.
/// Runs with the value of the old element at that position such that it can
/// be changed to the returned value from tha closure. Retries until the
/// value being written is successfully made the newest value of the element.
/// This may throw an exception if the position elements the built-in limit of
/// 'maxElements = 256' entries.
Future<void> eventualUpdateItem(
int pos, Future<Uint8List> Function(Uint8List? oldValue) update) async {
var oldData = await getItem(pos);
// Ensure it exists already
if (oldData == null) {
throw const FormatException('value does not exist');
}
do {
// Update the data
final updatedData = await update(oldData);
// Set it back
oldData = await tryWriteItem(pos, updatedData);
// Repeat if newer data on the network was found
} while (oldData != null);
}
/// Convenience function:
/// Like tryWriteItem but also encodes the input value as JSON and parses the
/// returned element as JSON
Future<T?> tryWriteItemJson<T>(
T Function(dynamic) fromJson,
int pos,
T newValue,
) =>
tryWriteItem(pos, jsonEncodeBytes(newValue))
.then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like tryWriteItem but also encodes the input value as a protobuf object
/// and parses the returned element as a protobuf object
Future<T?> tryWriteItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int pos,
T newValue,
) =>
tryWriteItem(pos, newValue.writeToBuffer()).then((out) {
if (out == null) {
return null;
}
return fromBuffer(out);
});
/// Convenience function:
/// Like eventualWriteItem but also encodes the input value as JSON and parses
/// the returned element as JSON
Future<void> eventualWriteItemJson<T>(int pos, T newValue) =>
eventualWriteItem(pos, jsonEncodeBytes(newValue));
/// Convenience function:
/// Like eventualWriteItem but also encodes the input value as a protobuf
/// object and parses the returned element as a protobuf object
Future<void> eventualWriteItemProtobuf<T extends GeneratedMessage>(
int pos, T newValue,
{int subkey = -1}) =>
eventualWriteItem(pos, newValue.writeToBuffer());
/// Convenience function:
/// Like eventualUpdateItem but also encodes the input value as JSON
Future<void> eventualUpdateItemJson<T>(
T Function(dynamic) fromJson,
int pos,
Future<T> Function(T?) update,
) =>
eventualUpdateItem(pos, jsonUpdate(fromJson, update));
/// Convenience function:
/// Like eventualUpdateItem but also encodes the input value as a protobuf
/// object
Future<void> eventualUpdateItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int pos,
Future<T> Function(T?) update,
) =>
eventualUpdateItem(pos, protobufUpdate(fromBuffer, update));
////////////////////////////////////////////////////////////////
// Internal Operations
DHTRecord? _getLinkedRecord(int recordNumber) {
if (recordNumber == 0) {
return _headRecord;
}
recordNumber--;
if (recordNumber >= _head.linkedRecords.length) {
return null;
}
return _head.linkedRecords[recordNumber];
}
Future<DHTRecord> _getOrCreateLinkedRecord(int recordNumber) async {
if (recordNumber == 0) {
return _headRecord;
}
final pool = DHTRecordPool.instance;
recordNumber--;
while (recordNumber >= _head.linkedRecords.length) {
// Linked records must use SMPL schema so writer can be specified
// Use the same writer as the head record
final smplWriter = _headRecord.writer!;
final parent = pool.getParentRecordKey(_headRecord.key);
final routingContext = _headRecord.routingContext;
final crypto = _headRecord.crypto;
final schema = DHTSchema.smpl(
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: _stride)]);
final dhtCreateRecord = await pool.create(
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto,
writer: smplWriter);
// Reopen with SMPL writer
await dhtCreateRecord.close();
final dhtRecord = await pool.openWrite(dhtCreateRecord.key, smplWriter,
parent: parent, routingContext: routingContext, crypto: crypto);
// Add to linked records
_head.linkedRecords.add(dhtRecord);
if (!await _tryWriteHead()) {
await _refreshHead();
}
}
return _head.linkedRecords[recordNumber];
}
int _emptyIndex() {
if (_head.free.isNotEmpty) {
return _head.free.removeLast();
}
if (_head.index.length == maxElements) {
throw StateError('too many elements');
}
return _head.index.length;
}
void _freeIndex(int idx) {
_head.free.add(idx);
// xxx: free list optimization here?
}
/// Serialize and write out the current head record, possibly updating it
/// if a newer copy is available online. Returns true if the write was
/// successful
Future<bool> _tryWriteHead() async {
final head = _head.toProto();
final headBuffer = head.writeToBuffer();
final existingData = await _headRecord.tryWriteBytes(headBuffer);
if (existingData != null) {
// Head write failed, incorporate update
await _newHead(proto.DHTShortArray.fromBuffer(existingData));
return false;
}
return true;
}
/// Validate the head from the DHT is properly formatted
/// and calculate the free list from it while we're here
List<int> _validateHeadCacheData(
List<Typed<FixedEncodedString43>> linkedKeys, List<int> index) {
// Ensure nothing is duplicated in the linked keys set
final newKeys = linkedKeys.toSet();
assert(newKeys.length <= (maxElements + (_stride - 1)) ~/ _stride,
'too many keys');
assert(newKeys.length == linkedKeys.length, 'duplicated linked keys');
final newIndex = index.toSet();
assert(newIndex.length <= maxElements, 'too many indexes');
assert(newIndex.length == index.length, 'duplicated index locations');
// Ensure all the index keys fit into the existing records
final indexCapacity = (linkedKeys.length + 1) * _stride;
int? maxIndex;
for (final idx in newIndex) {
assert(idx >= 0 || idx < indexCapacity, 'index out of range');
if (maxIndex == null || idx > maxIndex) {
maxIndex = idx;
}
}
final free = <int>[];
if (maxIndex != null) {
for (var i = 0; i < maxIndex; i++) {
if (!newIndex.contains(i)) {
free.add(i);
}
}
}
return free;
}
/// Open a linked record for reading or writing, same as the head record
Future<DHTRecord> _openLinkedRecord(TypedKey recordKey) async {
final writer = _headRecord.writer;
return (writer != null)
? await DHTRecordPool.instance.openWrite(
recordKey,
writer,
parent: _headRecord.key,
routingContext: _headRecord.routingContext,
)
: await DHTRecordPool.instance.openRead(
recordKey,
parent: _headRecord.key,
routingContext: _headRecord.routingContext,
);
}
/// Validate a new head record
Future<void> _newHead(proto.DHTShortArray head) async {
// Get the set of new linked keys and validate it
final linkedKeys = head.keys.map((p) => p.toVeilid()).toList();
final index = head.index;
final free = _validateHeadCacheData(linkedKeys, index);
// See which records are actually new
final oldRecords = Map<TypedKey, DHTRecord>.fromEntries(
_head.linkedRecords.map((lr) => MapEntry(lr.key, lr)));
final newRecords = <TypedKey, DHTRecord>{};
final sameRecords = <TypedKey, DHTRecord>{};
try {
for (var n = 0; n < linkedKeys.length; n++) {
final newKey = linkedKeys[n];
final oldRecord = oldRecords[newKey];
if (oldRecord == null) {
// Open the new record
final newRecord = await _openLinkedRecord(newKey);
newRecords[newKey] = newRecord;
} else {
sameRecords[newKey] = oldRecord;
}
}
} on Exception catch (_) {
// On any exception close the records we have opened
await Future.wait(newRecords.entries.map((e) => e.value.close()));
rethrow;
}
// From this point forward we should not throw an exception or everything
// is possibly invalid. Just pass the exception up it happens and the caller
// will have to delete this short array and reopen it if it can
await Future.wait(oldRecords.entries
.where((e) => !sameRecords.containsKey(e.key))
.map((e) => e.value.close()));
// Figure out which indices are free
// Make the new head cache
_head = _DHTShortArrayCache()
..linkedRecords.addAll(
linkedKeys.map((key) => (sameRecords[key] ?? newRecords[key])!))
..index.addAll(index)
..free.addAll(free);
// Update watch if we have one in case linked records have been added
if (_watchController != null) {
await _watchAllRecords();
}
}
/// Pull the latest or updated copy of the head record from the network
Future<bool> _refreshHead(
{bool forceRefresh = true, bool onlyUpdates = false}) async {
// Get an updated head record copy if one exists
final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer,
subkey: 0, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
if (head == null) {
if (onlyUpdates) {
// No update
return false;
}
throw StateError('head missing during refresh');
}
await _newHead(head);
return true;
}
// Watch head and all linked records
Future<void> _watchAllRecords() async {
// This will update any existing watches if necessary
try {
await [_headRecord.watch(), ..._head.linkedRecords.map((r) => r.watch())]
.wait;
// Update changes to the head record
// Don't watch for local changes because this class already handles
// notifying listeners and knows when it makes local changes
if (!_subscriptions.containsKey(_headRecord.key)) {
_subscriptions[_headRecord.key] =
await _headRecord.listen(localChanges: false, _onUpdateRecord);
}
// Update changes to any linked records
for (final lr in _head.linkedRecords) {
if (!_subscriptions.containsKey(lr.key)) {
_subscriptions[lr.key] =
await lr.listen(localChanges: false, _onUpdateRecord);
}
}
} on Exception {
// If anything fails, try to cancel the watches
await _cancelRecordWatches();
rethrow;
}
}
// Stop watching for changes to head and linked records
Future<void> _cancelRecordWatches() async {
await _headRecord.cancelWatch();
for (final lr in _head.linkedRecords) {
await lr.cancelWatch();
}
await _subscriptions.values.map((s) => s.cancel()).wait;
_subscriptions.clear();
}
// Called when a head or linked record changes
Future<void> _onUpdateRecord(
DHTRecord record, Uint8List? data, List<ValueSubkeyRange> subkeys) async {
// If head record subkey zero changes, then the layout
// of the dhtshortarray has changed
var updateHead = false;
if (record == _headRecord && subkeys.containsSubkey(0)) {
updateHead = true;
}
// If we have any other subkeys to update, do them first
final unord = List<Future<Uint8List?>>.empty(growable: true);
for (final skr in subkeys) {
for (var subkey = skr.low; subkey <= skr.high; subkey++) {
// Skip head subkey
if (updateHead && subkey == 0) {
continue;
}
// Get the subkey, which caches the result in the local record store
unord.add(record.get(subkey: subkey, forceRefresh: true));
}
}
await unord.wait;
// Then update the head record
if (updateHead) {
await _refreshHead(forceRefresh: false);
}
// Then commit the change to any listeners
_watchController?.sink.add(null);
}
Future<StreamSubscription<void>> listen(
void Function() onChanged,
) =>
_listenMutex.protect(() async {
// If don't have a controller yet, set it up
if (_watchController == null) {
// Set up watch requirements
_watchController = StreamController<void>.broadcast(onCancel: () {
// If there are no more listeners then we can get
// rid of the controller and drop our subscriptions
unawaited(_listenMutex.protect(() async {
// Cancel watches of head and linked records
await _cancelRecordWatches();
_watchController = null;
}));
});
// Start watching head and linked records
await _watchAllRecords();
}
// Return subscription
return _watchController!.stream.listen((_) => onChanged());
});
////////////////////////////////////////////////////////////////
// Fields
static const maxElements = 256;
// Head DHT record
final DHTRecord _headRecord;
// How many elements per linked record
late final int _stride;
// Cached representation refreshed from head record
_DHTShortArrayCache _head = _DHTShortArrayCache();
// Subscription to head and linked record internal changes
final Map<TypedKey, StreamSubscription<DHTRecordWatchChange>> _subscriptions =
{};
// Stream of external changes
StreamController<void>? _watchController;
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex = Mutex();
// Head/element mutex to ensure we keep the representation valid
final Mutex _headMutex = Mutex();
}

View File

@ -0,0 +1,2 @@
export 'dht_short_array.dart';
export 'dht_short_array_cubit.dart';

View File

@ -0,0 +1,596 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart';
import '../../proto/proto.dart' as proto;
part 'dht_short_array_head.dart';
///////////////////////////////////////////////////////////////////////
class DHTShortArray {
////////////////////////////////////////////////////////////////
// Constructors
DHTShortArray._({required DHTRecord headRecord})
: _head = _DHTShortArrayHead(headRecord: headRecord) {}
// Create a DHTShortArray
// if smplWriter is specified, uses a SMPL schema with a single writer
// rather than the key owner
static Future<DHTShortArray> create(
{int stride = maxElements,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
KeyPair? smplWriter}) async {
assert(stride <= maxElements, 'stride too long');
final pool = DHTRecordPool.instance;
late final DHTRecord dhtRecord;
if (smplWriter != null) {
final schema = DHTSchema.smpl(
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: stride + 1)]);
dhtRecord = await pool.create(
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto,
writer: smplWriter);
} else {
final schema = DHTSchema.dflt(oCnt: stride + 1);
dhtRecord = await pool.create(
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto);
}
try {
final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
if (!await dhtShortArray._head._tryWriteHead()) {
throw StateError('Failed to write head at this time');
}
return dhtShortArray;
} on Exception catch (_) {
await dhtRecord.delete();
rethrow;
}
}
static Future<DHTShortArray> openRead(TypedKey headRecordKey,
{VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto}) async {
final dhtRecord = await DHTRecordPool.instance.openRead(headRecordKey,
parent: parent, routingContext: routingContext, crypto: crypto);
try {
final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
await dhtShortArray._head._refreshInner();
return dhtShortArray;
} on Exception catch (_) {
await dhtRecord.close();
rethrow;
}
}
static Future<DHTShortArray> openWrite(
TypedKey headRecordKey,
KeyPair writer, {
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
}) async {
final dhtRecord = await DHTRecordPool.instance.openWrite(
headRecordKey, writer,
parent: parent, routingContext: routingContext, crypto: crypto);
try {
final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
await dhtShortArray._head._refreshInner();
return dhtShortArray;
} on Exception catch (_) {
await dhtRecord.close();
rethrow;
}
}
static Future<DHTShortArray> openOwned(
OwnedDHTRecordPointer ownedDHTRecordPointer, {
required TypedKey parent,
VeilidRoutingContext? routingContext,
DHTRecordCrypto? crypto,
}) =>
openWrite(
ownedDHTRecordPointer.recordKey,
ownedDHTRecordPointer.owner,
routingContext: routingContext,
parent: parent,
crypto: crypto,
);
////////////////////////////////////////////////////////////////////////////
// Public API
// External references for the shortarray
TypedKey get recordKey => _head.headRecord.key;
OwnedDHTRecordPointer get recordPointer =>
_head.headRecord.ownedDHTRecordPointer;
/// Returns the number of elements in the DHTShortArray
int get length => _head.index.length;
/// Free all resources for the DHTShortArray
Future<void> close() async {
await _watchController?.close();
await _head.close();
}
/// Free all resources for the DHTShortArray and delete it from the DHT
Future<void> delete() async {
await _watchController?.close();
await _head.delete();
}
/// Runs a closure that guarantees the DHTShortArray
/// will be closed upon exit, even if an uncaught exception is thrown
Future<T> scope<T>(Future<T> Function(DHTShortArray) scopeFunction) async {
try {
return await scopeFunction(this);
} finally {
await close();
}
}
/// Runs a closure that guarantees the DHTShortArray
/// will be closed upon exit, and deleted if an an
/// uncaught exception is thrown
Future<T> deleteScope<T>(
Future<T> Function(DHTShortArray) scopeFunction) async {
try {
final out = await scopeFunction(this);
await close();
return out;
} on Exception catch (_) {
await delete();
rethrow;
}
}
/// Return the item at position 'pos' in the DHTShortArray. If 'forceRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async =>
_head.operate(
(head) async => _getItemInner(head, pos, forceRefresh: forceRefresh));
Future<Uint8List?> _getItemInner(_DHTShortArrayHead head, int pos,
{bool forceRefresh = false}) async {
if (pos < 0 || pos >= head.index.length) {
throw IndexError.withLength(pos, head.index.length);
}
final index = head.index[pos];
final recordNumber = index ~/ head.stride;
final record = head.getLinkedRecord(recordNumber);
if (record == null) {
throw StateError('Record does not exist');
}
final recordSubkey = (index % head.stride) + ((recordNumber == 0) ? 1 : 0);
final refresh = forceRefresh || head.indexNeedsRefresh(index);
final out = record.get(subkey: recordSubkey, forceRefresh: refresh);
await head.updateIndexSeq(index, false);
return out;
}
/// Return a list of all of the items in the DHTShortArray. If 'forceRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
Future<List<Uint8List>?> getAllItems({bool forceRefresh = false}) async =>
_head.operate((head) async {
final out = <Uint8List>[];
for (var pos = 0; pos < head.index.length; pos++) {
final elem =
await _getItemInner(head, pos, forceRefresh: forceRefresh);
if (elem == null) {
return null;
}
out.add(elem);
}
return out;
});
/// Convenience function:
/// Like getItem but also parses the returned element as JSON
Future<T?> getItemJson<T>(T Function(dynamic) fromJson, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like getAllItems but also parses the returned elements as JSON
Future<List<T>?> getAllItemsJson<T>(T Function(dynamic) fromJson,
{bool forceRefresh = false}) =>
getAllItems(forceRefresh: forceRefresh)
.then((out) => out?.map(fromJson).toList());
/// Convenience function:
/// Like getItem but also parses the returned element as a protobuf object
Future<T?> getItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => (out == null) ? null : fromBuffer(out));
/// Convenience function:
/// Like getAllItems but also parses the returned elements as protobuf objects
Future<List<T>?> getAllItemsProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
{bool forceRefresh = false}) =>
getAllItems(forceRefresh: forceRefresh)
.then((out) => out?.map(fromBuffer).toList());
/// Try to add an item to the end of the DHTShortArray. Return true if the
/// element was successfully added, and false if the state changed before
/// the element could be added or a newer value was found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryAddItem(Uint8List value) async {
final out = await _head
.operateWrite((head) async => _tryAddItemInner(head, value)) ??
false;
// Send update
_watchController?.sink.add(null);
return out;
}
Future<bool> _tryAddItemInner(
_DHTShortArrayHead head, Uint8List value) async {
// Allocate empty index
final index = head.emptyIndex();
// Add new index
final pos = head.index.length;
head.index.add(index);
// Write item
final (_, wasSet) = await _tryWriteItemInner(head, pos, value);
if (!wasSet) {
return false;
}
// Get sequence number written
await head.updateIndexSeq(index, true);
return true;
}
/// Try to insert an item as position 'pos' of the DHTShortArray.
/// Return true if the element was successfully inserted, and false if the
/// state changed before the element could be inserted or a newer value was
/// found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryInsertItem(int pos, Uint8List value) async {
final out = await _head.operateWrite(
(head) async => _tryInsertItemInner(head, pos, value)) ??
false;
// Send update
_watchController?.sink.add(null);
return out;
}
Future<bool> _tryInsertItemInner(
_DHTShortArrayHead head, int pos, Uint8List value) async {
// Allocate empty index
final index = head.emptyIndex();
// Add new index
_head.index.insert(pos, index);
// Write item
final (_, wasSet) = await _tryWriteItemInner(head, pos, value);
if (!wasSet) {
return false;
}
// Get sequence number written
await head.updateIndexSeq(index, true);
return true;
}
/// Try to swap items at position 'aPos' and 'bPos' in the DHTShortArray.
/// Return true if the elements were successfully swapped, and false if the
/// state changed before the elements could be swapped or newer values were
/// found on the network.
/// This may throw an exception if either of the positions swapped exceed
/// the length of the list
Future<bool> trySwapItem(int aPos, int bPos) async {
final out = await _head.operateWrite(
(head) async => _trySwapItemInner(head, aPos, bPos)) ??
false;
// Send update
_watchController?.sink.add(null);
return out;
}
Future<bool> _trySwapItemInner(
_DHTShortArrayHead head, int aPos, int bPos) async {
// No-op case
if (aPos == bPos) {
return true;
}
// Swap indices
final aIdx = _head.index[aPos];
final bIdx = _head.index[bPos];
_head.index[aPos] = bIdx;
_head.index[bPos] = aIdx;
return true;
}
/// Try to remove an item at position 'pos' in the DHTShortArray.
/// Return the element if it was successfully removed, and null if the
/// state changed before the elements could be removed or newer values were
/// found on the network.
/// This may throw an exception if the position removed exceeeds the length of
/// the list.
Future<Uint8List?> tryRemoveItem(int pos) async {
final out =
_head.operateWrite((head) async => _tryRemoveItemInner(head, pos));
// Send update
_watchController?.sink.add(null);
return out;
}
Future<Uint8List> _tryRemoveItemInner(
_DHTShortArrayHead head, int pos) async {
final index = _head.index.removeAt(pos);
final recordNumber = index ~/ head.stride;
final record = head.getLinkedRecord(recordNumber);
if (record == null) {
throw StateError('Record does not exist');
}
final recordSubkey = (index % head.stride) + ((recordNumber == 0) ? 1 : 0);
final result = await record.get(subkey: recordSubkey);
if (result == null) {
throw StateError('Element does not exist');
}
head.freeIndex(index);
return result;
}
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<T?> tryRemoveItemJson<T>(
T Function(dynamic) fromJson,
int pos,
) =>
tryRemoveItem(pos).then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<T?> tryRemoveItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos) =>
getItem(pos).then((out) => (out == null) ? null : fromBuffer(out));
/// Try to remove all items in the DHTShortArray.
/// Return true if it was successfully cleared, and false if the
/// state changed before the elements could be cleared or newer values were
/// found on the network.
Future<bool> tryClear() async {
final out =
await _head.operateWrite((head) async => _tryClearInner(head)) ?? false;
// Send update
_watchController?.sink.add(null);
return out;
}
Future<bool> _tryClearInner(_DHTShortArrayHead head) async {
head.index.clear();
head.free.clear();
return true;
}
/// Try to set an item at position 'pos' of the DHTShortArray.
/// If the set was successful this returns:
/// * The prior contents of the element, or null if there was no value yet
/// * A boolean true
/// If the set was found a newer value on the network:
/// * The newer value of the element, or null if the head record
/// changed.
/// * A boolean false
/// This may throw an exception if the position exceeds the built-in limit of
/// 'maxElements = 256' entries.
Future<(Uint8List?, bool)> tryWriteItem(int pos, Uint8List newValue) async {
final out = await _head
.operateWrite((head) async => _tryWriteItemInner(head, pos, newValue));
if (out == null) {
return (null, false);
}
// Send update
_watchController?.sink.add(null);
return out;
}
Future<(Uint8List?, bool)> _tryWriteItemInner(
_DHTShortArrayHead head, int pos, Uint8List newValue) async {
if (pos < 0 || pos >= head.index.length) {
throw IndexError.withLength(pos, _head.index.length);
}
final index = head.index[pos];
final recordNumber = index ~/ head.stride;
final record = await head.getOrCreateLinkedRecord(recordNumber);
final recordSubkey = (index % head.stride) + ((recordNumber == 0) ? 1 : 0);
final oldValue = await record.get(subkey: recordSubkey);
final result = await record.tryWriteBytes(newValue, subkey: recordSubkey);
if (result != null) {
// A result coming back means the element was overwritten already
return (result, false);
}
return (oldValue, true);
}
/// Set an item at position 'pos' of the DHTShortArray. Retries until the
/// value being written is successfully made the newest value of the element.
/// This may throw an exception if the position elements the built-in limit of
/// 'maxElements = 256' entries.
Future<void> eventualWriteItem(int pos, Uint8List newValue,
{Duration? timeout}) async {
await _head.operateWriteEventual((head) async {
bool wasSet;
(_, wasSet) = await _tryWriteItemInner(head, pos, newValue);
return wasSet;
}, timeout: timeout);
// Send update
_watchController?.sink.add(null);
}
/// Change an item at position 'pos' of the DHTShortArray.
/// Runs with the value of the old element at that position such that it can
/// be changed to the returned value from tha closure. Retries until the
/// value being written is successfully made the newest value of the element.
/// This may throw an exception if the position elements the built-in limit of
/// 'maxElements = 256' entries.
Future<void> eventualUpdateItem(
int pos, Future<Uint8List> Function(Uint8List? oldValue) update,
{Duration? timeout}) async {
await _head.operateWriteEventual((head) async {
final oldData = await getItem(pos);
// Update the data
final updatedData = await update(oldData);
// Set it back
bool wasSet;
(_, wasSet) = await _tryWriteItemInner(head, pos, updatedData);
return wasSet;
}, timeout: timeout);
// Send update
_watchController?.sink.add(null);
}
/// Convenience function:
/// Like tryWriteItem but also encodes the input value as JSON and parses the
/// returned element as JSON
Future<(T?, bool)> tryWriteItemJson<T>(
T Function(dynamic) fromJson,
int pos,
T newValue,
) =>
tryWriteItem(pos, jsonEncodeBytes(newValue))
.then((out) => (jsonDecodeOptBytes(fromJson, out.$1), out.$2));
/// Convenience function:
/// Like tryWriteItem but also encodes the input value as a protobuf object
/// and parses the returned element as a protobuf object
Future<(T?, bool)> tryWriteItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int pos,
T newValue,
) =>
tryWriteItem(pos, newValue.writeToBuffer()).then(
(out) => ((out.$1 == null ? null : fromBuffer(out.$1!)), out.$2));
/// Convenience function:
/// Like eventualWriteItem but also encodes the input value as JSON and parses
/// the returned element as JSON
Future<void> eventualWriteItemJson<T>(int pos, T newValue,
{Duration? timeout}) =>
eventualWriteItem(pos, jsonEncodeBytes(newValue), timeout: timeout);
/// Convenience function:
/// Like eventualWriteItem but also encodes the input value as a protobuf
/// object and parses the returned element as a protobuf object
Future<void> eventualWriteItemProtobuf<T extends GeneratedMessage>(
int pos, T newValue,
{int subkey = -1, Duration? timeout}) =>
eventualWriteItem(pos, newValue.writeToBuffer(), timeout: timeout);
/// Convenience function:
/// Like eventualUpdateItem but also encodes the input value as JSON
Future<void> eventualUpdateItemJson<T>(
T Function(dynamic) fromJson, int pos, Future<T> Function(T?) update,
{Duration? timeout}) =>
eventualUpdateItem(pos, jsonUpdate(fromJson, update), timeout: timeout);
/// Convenience function:
/// Like eventualUpdateItem but also encodes the input value as a protobuf
/// object
Future<void> eventualUpdateItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int pos,
Future<T> Function(T?) update,
{Duration? timeout}) =>
eventualUpdateItem(pos, protobufUpdate(fromBuffer, update),
timeout: timeout);
Future<StreamSubscription<void>> listen(
void Function() onChanged,
) =>
_listenMutex.protect(() async {
// If don't have a controller yet, set it up
if (_watchController == null) {
// Set up watch requirements
_watchController = StreamController<void>.broadcast(onCancel: () {
// If there are no more listeners then we can get
// rid of the controller and drop our subscriptions
unawaited(_listenMutex.protect(() async {
// Cancel watches of head record
await _head._cancelWatch();
_watchController = null;
}));
});
// Start watching head record
await _head._watch();
}
// Return subscription
return _watchController!.stream.listen((_) => onChanged());
});
////////////////////////////////////////////////////////////////
// Fields
static const maxElements = 256;
// Internal representation refreshed from head record
final _DHTShortArrayHead _head;
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex = Mutex();
// Stream of external changes
StreamController<void>? _watchController;
}

View File

@ -6,7 +6,7 @@ import 'package:bloc_tools/bloc_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:mutex/mutex.dart';
import '../../veilid_support.dart';
import '../../../veilid_support.dart';
typedef DHTShortArrayState<T> = AsyncValue<IList<T>>;
typedef DHTShortArrayBusyState<T> = BlocBusyState<DHTShortArrayState<T>>;
@ -29,7 +29,7 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
});
}
DHTShortArrayCubit.value({
DHTShortArrayCubit.value({
required DHTShortArray shortArray,
required T Function(List<int> data) decodeElement,
}) : _shortArray = shortArray,

View File

@ -0,0 +1,471 @@
part of 'dht_short_array.dart';
////////////////////////////////////////////////////////////////
// Internal Operations
class _DHTShortArrayHead {
_DHTShortArrayHead({required this.headRecord})
: linkedRecords = [],
index = [],
free = [],
seqs = [],
localSeqs = [] {
_calculateStride();
}
proto.DHTShortArray toProto() {
final head = proto.DHTShortArray();
head.keys.addAll(linkedRecords.map((lr) => lr.key.toProto()));
head.index.addAll(index);
head.seqs.addAll(seqs);
// Do not serialize free list, it gets recreated
// Do not serialize local seqs, they are only locally relevant
return head;
}
Future<void> close() async {
final futures = <Future<void>>[headRecord.close()];
for (final lr in linkedRecords) {
futures.add(lr.close());
}
await Future.wait(futures);
}
Future<void> delete() async {
final futures = <Future<void>>[headRecord.delete()];
for (final lr in linkedRecords) {
futures.add(lr.delete());
}
await Future.wait(futures);
}
Future<T> operate<T>(Future<T> Function(_DHTShortArrayHead) closure) async =>
// ignore: prefer_expression_function_bodies
_headMutex.protect(() async {
return closure(this);
});
Future<T?> operateWrite<T>(
Future<T> Function(_DHTShortArrayHead) closure) async {
final oldLinkedRecords = List.of(linkedRecords);
final oldIndex = List.of(index);
final oldFree = List.of(free);
final oldSeqs = List.of(seqs);
try {
final out = await _headMutex.protect(() async {
final out = await closure(this);
// Write head assuming it has been changed
if (!await _tryWriteHead()) {
// Failed to write head means head got overwritten so write should
// be considered failed
return null;
}
return out;
});
return out;
} on Exception {
// Exception means state needs to be reverted
linkedRecords = oldLinkedRecords;
index = oldIndex;
free = oldFree;
seqs = oldSeqs;
rethrow;
}
}
Future<void> operateWriteEventual(
Future<bool> Function(_DHTShortArrayHead) closure,
{Duration? timeout}) async {
late List<DHTRecord> oldLinkedRecords;
late List<int> oldIndex;
late List<int> oldFree;
late List<int> oldSeqs;
final timeoutTs = timeout == null
? null
: Veilid.instance.now().offset(TimestampDuration.fromDuration(timeout));
try {
await _headMutex.protect(() async {
// Iterate until we have a successful element and head write
do {
// Save off old values each pass of tryWriteHead because the head
// will have changed
oldLinkedRecords = List.of(linkedRecords);
oldIndex = List.of(index);
oldFree = List.of(free);
oldSeqs = List.of(seqs);
// Try to do the element write
do {
if (timeoutTs != null) {
final now = Veilid.instance.now();
if (now >= timeoutTs) {
throw TimeoutException('timeout reached');
}
}
} while (!await closure(this));
// Try to do the head write
} while (!await _tryWriteHead());
});
} on Exception {
// Exception means state needs to be reverted
linkedRecords = oldLinkedRecords;
index = oldIndex;
free = oldFree;
seqs = oldSeqs;
rethrow;
}
}
/// Serialize and write out the current head record, possibly updating it
/// if a newer copy is available online. Returns true if the write was
/// successful
Future<bool> _tryWriteHead() async {
final headBuffer = toProto().writeToBuffer();
final existingData = await headRecord.tryWriteBytes(headBuffer);
if (existingData != null) {
// Head write failed, incorporate update
await _updateHead(proto.DHTShortArray.fromBuffer(existingData));
return false;
}
return true;
}
/// Validate a new head record that has come in from the network
Future<void> _updateHead(proto.DHTShortArray head) async {
// Get the set of new linked keys and validate it
final updatedLinkedKeys = head.keys.map((p) => p.toVeilid()).toList();
final updatedIndex = List.of(head.index);
final updatedSeqs = List.of(head.seqs);
final updatedFree = _makeFreeList(updatedLinkedKeys, updatedIndex);
// See which records are actually new
final oldRecords = Map<TypedKey, DHTRecord>.fromEntries(
linkedRecords.map((lr) => MapEntry(lr.key, lr)));
final newRecords = <TypedKey, DHTRecord>{};
final sameRecords = <TypedKey, DHTRecord>{};
final updatedLinkedRecords = <DHTRecord>[];
try {
for (var n = 0; n < updatedLinkedKeys.length; n++) {
final newKey = updatedLinkedKeys[n];
final oldRecord = oldRecords[newKey];
if (oldRecord == null) {
// Open the new record
final newRecord = await _openLinkedRecord(newKey);
newRecords[newKey] = newRecord;
updatedLinkedRecords.add(newRecord);
} else {
sameRecords[newKey] = oldRecord;
updatedLinkedRecords.add(oldRecord);
}
}
} on Exception catch (_) {
// On any exception close the records we have opened
await Future.wait(newRecords.entries.map((e) => e.value.close()));
rethrow;
}
// From this point forward we should not throw an exception or everything
// is possibly invalid. Just pass the exception up it happens and the caller
// will have to delete this short array and reopen it if it can
await Future.wait(oldRecords.entries
.where((e) => !sameRecords.containsKey(e.key))
.map((e) => e.value.close()));
// Get the localseqs list from inspect results
final localReports = await [headRecord, ...updatedLinkedRecords].map((r) {
final start = (r.key == headRecord.key) ? 1 : 0;
return r
.inspect(subkeys: [ValueSubkeyRange.make(start, start + stride - 1)]);
}).wait;
final updatedLocalSeqs =
localReports.map((l) => l.localSeqs).expand((e) => e).toList();
// Make the new head cache
linkedRecords = updatedLinkedRecords;
index = updatedIndex;
free = updatedFree;
seqs = updatedSeqs;
localSeqs = updatedLocalSeqs;
}
/// Pull the latest or updated copy of the head record from the network
Future<bool> _refreshInner(
{bool forceRefresh = true, bool onlyUpdates = false}) async {
// Get an updated head record copy if one exists
final head = await headRecord.getProtobuf(proto.DHTShortArray.fromBuffer,
subkey: 0, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
if (head == null) {
if (onlyUpdates) {
// No update
return false;
}
throw StateError('head missing during refresh');
}
await _updateHead(head);
return true;
}
void _calculateStride() {
switch (headRecord.schema) {
case DHTSchemaDFLT(oCnt: final oCnt):
if (oCnt <= 1) {
throw StateError('Invalid DFLT schema in DHTShortArray');
}
stride = oCnt - 1;
case DHTSchemaSMPL(oCnt: final oCnt, members: final members):
if (oCnt != 0 || members.length != 1 || members[0].mCnt <= 1) {
throw StateError('Invalid SMPL schema in DHTShortArray');
}
stride = members[0].mCnt - 1;
}
assert(stride <= DHTShortArray.maxElements, 'stride too long');
}
DHTRecord? getLinkedRecord(int recordNumber) {
if (recordNumber == 0) {
return headRecord;
}
recordNumber--;
if (recordNumber >= linkedRecords.length) {
return null;
}
return linkedRecords[recordNumber];
}
Future<DHTRecord> getOrCreateLinkedRecord(int recordNumber) async {
if (recordNumber == 0) {
return headRecord;
}
final pool = DHTRecordPool.instance;
recordNumber--;
while (recordNumber >= linkedRecords.length) {
// Linked records must use SMPL schema so writer can be specified
// Use the same writer as the head record
final smplWriter = headRecord.writer!;
final parent = pool.getParentRecordKey(headRecord.key);
final routingContext = headRecord.routingContext;
final crypto = headRecord.crypto;
final schema = DHTSchema.smpl(
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: stride)]);
final dhtCreateRecord = await pool.create(
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto,
writer: smplWriter);
// Reopen with SMPL writer
await dhtCreateRecord.close();
final dhtRecord = await pool.openWrite(dhtCreateRecord.key, smplWriter,
parent: parent, routingContext: routingContext, crypto: crypto);
// Add to linked records
linkedRecords.add(dhtRecord);
if (!await _tryWriteHead()) {
await _refreshInner();
}
}
return linkedRecords[recordNumber];
}
int emptyIndex() {
if (free.isNotEmpty) {
return free.removeLast();
}
if (index.length == DHTShortArray.maxElements) {
throw StateError('too many elements');
}
return index.length;
}
void freeIndex(int idx) {
free.add(idx);
// xxx: free list optimization here?
}
/// Validate the head from the DHT is properly formatted
/// and calculate the free list from it while we're here
List<int> _makeFreeList(
List<Typed<FixedEncodedString43>> linkedKeys, List<int> index) {
// Ensure nothing is duplicated in the linked keys set
final newKeys = linkedKeys.toSet();
assert(
newKeys.length <= (DHTShortArray.maxElements + (stride - 1)) ~/ stride,
'too many keys');
assert(newKeys.length == linkedKeys.length, 'duplicated linked keys');
final newIndex = index.toSet();
assert(newIndex.length <= DHTShortArray.maxElements, 'too many indexes');
assert(newIndex.length == index.length, 'duplicated index locations');
// Ensure all the index keys fit into the existing records
final indexCapacity = (linkedKeys.length + 1) * stride;
int? maxIndex;
for (final idx in newIndex) {
assert(idx >= 0 || idx < indexCapacity, 'index out of range');
if (maxIndex == null || idx > maxIndex) {
maxIndex = idx;
}
}
// Figure out which indices are free
final free = <int>[];
if (maxIndex != null) {
for (var i = 0; i < maxIndex; i++) {
if (!newIndex.contains(i)) {
free.add(i);
}
}
}
return free;
}
/// Open a linked record for reading or writing, same as the head record
Future<DHTRecord> _openLinkedRecord(TypedKey recordKey) async {
final writer = headRecord.writer;
return (writer != null)
? await DHTRecordPool.instance.openWrite(
recordKey,
writer,
parent: headRecord.key,
routingContext: headRecord.routingContext,
)
: await DHTRecordPool.instance.openRead(
recordKey,
parent: headRecord.key,
routingContext: headRecord.routingContext,
);
}
/// Check if we know that the network has a copy of an index that is newer
/// than our local copy from looking at the seqs list in the head
bool indexNeedsRefresh(int index) {
// If our local sequence number is unknown or hasnt been written yet
// then a normal DHT operation is going to pull from the network anyway
if (localSeqs.length < index || localSeqs[index] == 0xFFFFFFFF) {
return false;
}
// If the remote sequence number record is unknown or hasnt been written
// at this index yet, then we also do not refresh at this time as it
// is the first time the index is being written to
if (seqs.length < index || seqs[index] == 0xFFFFFFFF) {
return false;
}
return localSeqs[index] < seqs[index];
}
/// Update the sequence number for a particular index in
/// our local sequence number list.
/// If a write is happening, update the network copy as well.
Future<void> updateIndexSeq(int index, bool write) async {
final recordNumber = index ~/ stride;
final record = await getOrCreateLinkedRecord(recordNumber);
final recordSubkey = (index % stride) + ((recordNumber == 0) ? 1 : 0);
final report =
await record.inspect(subkeys: [ValueSubkeyRange.single(recordSubkey)]);
while (localSeqs.length <= index) {
localSeqs.add(0xFFFFFFFF);
}
localSeqs[index] = report.localSeqs[0];
if (write) {
while (seqs.length <= index) {
seqs.add(0xFFFFFFFF);
}
seqs[index] = report.localSeqs[0];
}
}
// Watch head for changes
Future<void> _watch() async {
// This will update any existing watches if necessary
try {
await headRecord.watch(subkeys: [ValueSubkeyRange.single(0)]);
// Update changes to the head record
// Don't watch for local changes because this class already handles
// notifying listeners and knows when it makes local changes
_subscription ??=
await headRecord.listen(localChanges: false, _onUpdateHead);
} on Exception {
// If anything fails, try to cancel the watches
await _cancelWatch();
rethrow;
}
}
// Stop watching for changes to head and linked records
Future<void> _cancelWatch() async {
await headRecord.cancelWatch();
await _subscription?.cancel();
_subscription = null;
}
// Called when a head or linked record changes
Future<void> _onUpdateHead(
DHTRecord record, Uint8List? data, List<ValueSubkeyRange> subkeys) async {
// If head record subkey zero changes, then the layout
// of the dhtshortarray has changed
var updateHead = false;
if (record == headRecord && subkeys.containsSubkey(0)) {
updateHead = true;
}
// If we have any other subkeys to update, do them first
final unord = List<Future<Uint8List?>>.empty(growable: true);
for (final skr in subkeys) {
for (var subkey = skr.low; subkey <= skr.high; subkey++) {
// Skip head subkey
if (updateHead && subkey == 0) {
continue;
}
// Get the subkey, which caches the result in the local record store
unord.add(record.get(subkey: subkey, forceRefresh: true));
}
}
await unord.wait;
// Then update the head record
if (updateHead) {
await _refreshInner(forceRefresh: false);
}
}
////////////////////////////////////////////////////////////////////////////
// Head/element mutex to ensure we keep the representation valid
final Mutex _headMutex = Mutex();
// Subscription to head record internal changes
StreamSubscription<DHTRecordWatchChange>? _subscription;
// Head DHT record
final DHTRecord headRecord;
// How many elements per linked record
late final int stride;
// List of additional records after the head record used for element data
List<DHTRecord> linkedRecords;
// Ordering of the subkey indices.
// Elements are subkey numbers. Represents the element order.
List<int> index;
// List of free subkeys for elements that have been removed.
// Used to optimize allocations.
List<int> free;
// The sequence numbers of each subkey.
// Index is by subkey number not by element index.
// (n-1 for head record and then the next n for linked records)
List<int> seqs;
// The local sequence numbers for each subkey.
List<int> localSeqs;
}

View File

@ -92,6 +92,7 @@ class DHTShortArray extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'DHTShortArray', package: const $pb.PackageName(_omitMessageNames ? '' : 'dht'), createEmptyInstance: create)
..pc<$0.TypedKey>(1, _omitFieldNames ? '' : 'keys', $pb.PbFieldType.PM, subBuilder: $0.TypedKey.create)
..a<$core.List<$core.int>>(2, _omitFieldNames ? '' : 'index', $pb.PbFieldType.OY)
..p<$core.int>(3, _omitFieldNames ? '' : 'seqs', $pb.PbFieldType.KU3)
..hasRequiredFields = false
;
@ -127,6 +128,9 @@ class DHTShortArray extends $pb.GeneratedMessage {
$core.bool hasIndex() => $_has(1);
@$pb.TagNumber(2)
void clearIndex() => clearField(2);
@$pb.TagNumber(3)
$core.List<$core.int> get seqs => $_getList(2);
}
class DHTLog extends $pb.GeneratedMessage {

View File

@ -36,13 +36,14 @@ const DHTShortArray$json = {
'2': [
{'1': 'keys', '3': 1, '4': 3, '5': 11, '6': '.veilid.TypedKey', '10': 'keys'},
{'1': 'index', '3': 2, '4': 1, '5': 12, '10': 'index'},
{'1': 'seqs', '3': 3, '4': 3, '5': 13, '10': 'seqs'},
],
};
/// Descriptor for `DHTShortArray`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List dHTShortArrayDescriptor = $convert.base64Decode(
'Cg1ESFRTaG9ydEFycmF5EiQKBGtleXMYASADKAsyEC52ZWlsaWQuVHlwZWRLZXlSBGtleXMSFA'
'oFaW5kZXgYAiABKAxSBWluZGV4');
'oFaW5kZXgYAiABKAxSBWluZGV4EhIKBHNlcXMYAyADKA1SBHNlcXM=');
@$core.Deprecated('Use dHTLogDescriptor instead')
const DHTLog$json = {

View File

@ -12,7 +12,7 @@ part of 'identity.dart';
T _$identity<T>(T value) => value;
final _privateConstructorUsedError = UnsupportedError(
'It seems like you constructed your class using `MyClass._()`. This constructor is only meant to be used by freezed and you are not supposed to need it nor use it.\nPlease check the documentation here for more information: https://github.com/rrousselGit/freezed#custom-getters-and-methods');
'It seems like you constructed your class using `MyClass._()`. This constructor is only meant to be used by freezed and you are not supposed to need it nor use it.\nPlease check the documentation here for more information: https://github.com/rrousselGit/freezed#adding-getters-and-methods-to-our-models');
AccountRecordInfo _$AccountRecordInfoFromJson(Map<String, dynamic> json) {
return _AccountRecordInfo.fromJson(json);

View File

@ -33,6 +33,10 @@ void setVeilidLogLevel(LogLevel? level) {
Veilid.instance.changeLogLevel('all', convertToVeilidConfigLogLevel(level));
}
void changeVeilidLogIgnore(String change) {
Veilid.instance.changeLogIgnore('all', change.split(','));
}
class VeilidLoggy implements LoggyType {
@override
Loggy<VeilidLoggy> get loggy => Loggy<VeilidLoggy>('Veilid');