mirror of
synced 2025-03-23 15:46:33 -04:00
chat work
This commit is contained in:
@ -101,6 +101,8 @@ class ContactInvitationListCubit
..private = encryptedContactRequestPrivate;
// Create DHT unicast inbox for ContactRequest
// Subkey 0 is the ContactRequest from the initiator
// Subkey 1 will contain the invitation response accept/reject eventually
await (await pool.create(
parent: _activeAccountInfo.accountRecordKey,
schema: DHTSchema.smpl(oCnt: 1, members: [
@ -262,110 +264,6 @@ class ContactInvitationListCubit
return out;
// Future<InvitationStatus?> checkInvitationStatus(
// {required proto.ContactInvitationRecord contactInvitationRecord}) async {
// // Open the contact request inbox
// try {
// final pool = DHTRecordPool.instance;
// final accountRecordKey = _activeAccountInfo
// .userLogin.accountRecordInfo.accountRecord.recordKey;
// final writerKey = contactInvitationRecord.writerKey.toVeilid();
// final writerSecret = contactInvitationRecord.writerSecret.toVeilid();
// final recordKey =
// contactInvitationRecord.contactRequestInbox.recordKey.toVeilid();
// final writer = TypedKeyPair(
// kind: recordKey.kind, key: writerKey, secret: writerSecret);
// final acceptReject = await (await pool.openRead(recordKey,
// crypto: await DHTRecordCryptoPrivate.fromTypedKeyPair(writer),
// parent: accountRecordKey,
// defaultSubkey: 1))
// .scope((contactRequestInbox) async {
// //
// final signedContactResponse = await contactRequestInbox.getProtobuf(
// proto.SignedContactResponse.fromBuffer,
// forceRefresh: true);
// if (signedContactResponse == null) {
// return null;
// }
// final contactResponseBytes =
// Uint8List.fromList(signedContactResponse.contactResponse);
// final contactResponse =
// proto.ContactResponse.fromBuffer(contactResponseBytes);
// final contactIdentityMasterRecordKey =
// contactResponse.identityMasterRecordKey.toVeilid();
// final cs = await pool.veilid.getCryptoSystem(recordKey.kind);
// // Fetch the remote contact's account master
// final contactIdentityMaster = await openIdentityMaster(
// identityMasterRecordKey: contactIdentityMasterRecordKey);
// // Verify
// final signature = signedContactResponse.identitySignature.toVeilid();
// await cs.verify(contactIdentityMaster.identityPublicKey,
// contactResponseBytes, signature);
// // Check for rejection
// if (!contactResponse.accept) {
// return const InvitationStatus(acceptedContact: null);
// }
// // Pull profile from remote conversation key
// final remoteConversationRecordKey =
// contactResponse.remoteConversationRecordKey.toVeilid();
// final conversation = ConversationCubit(
// activeAccountInfo: _activeAccountInfo,
// remoteIdentityPublicKey:
// contactIdentityMaster.identityPublicTypedKey(),
// remoteConversationRecordKey: remoteConversationRecordKey);
// await conversation.refresh();
// final remoteConversation =
// conversation.state.data?.value.remoteConversation;
// if (remoteConversation == null) {
// log.info('Remote conversation could not be read. Waiting...');
// return null;
// }
// // Complete the local conversation now that we have the remote profile
// final localConversationRecordKey =
// contactInvitationRecord.localConversationRecordKey.toVeilid();
// return conversation.initLocalConversation(
// existingConversationRecordKey: localConversationRecordKey,
// profile: _account.profile,
// // ignore: prefer_expression_function_bodies
// callback: (localConversation) async {
// return InvitationStatus(
// acceptedContact: AcceptedContact(
// remoteProfile: remoteConversation.profile,
// remoteIdentity: contactIdentityMaster,
// remoteConversationRecordKey: remoteConversationRecordKey,
// localConversationRecordKey: localConversationRecordKey));
// });
// });
// if (acceptReject == null) {
// return null;
// }
// // Delete invitation and return the accepted or rejected contact
// await deleteInvitation(
// accepted: acceptReject.acceptedContact != null,
// contactInvitationRecord: contactInvitationRecord);
// return acceptReject;
// } on Exception catch (e) {
// log.error('Exception in checkInvitationStatus: $e', e);
// // Attempt to clean up. All this needs better lifetime management
// await deleteInvitation(
// accepted: false, contactInvitationRecord: contactInvitationRecord);
// rethrow;
// }
// }
final ActiveAccountInfo _activeAccountInfo;
final proto.Account _account;
@ -3,6 +3,7 @@ import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';
import '../../proto/proto.dart' as proto;
// Watch subkey #1 of the ContactRequest record for accept/reject
class ContactRequestInboxCubit
extends DefaultDHTRecordCubit<proto.SignedContactResponse> {
@ -40,112 +41,3 @@ class ContactRequestInboxCubit
final ActiveAccountInfo activeAccountInfo;
final proto.ContactInvitationRecord contactInvitationRecord;
// Future<InvitationStatus?> checkInvitationStatus(
// {}) async {
// // Open the contact request inbox
// try {
// final pool = DHTRecordPool.instance;
// final accountRecordKey = _activeAccountInfo
// .userLogin.accountRecordInfo.accountRecord.recordKey;
// final writerKey = contactInvitationRecord.writerKey.toVeilid();
// final writerSecret = contactInvitationRecord.writerSecret.toVeilid();
// final recordKey =
// contactInvitationRecord.contactRequestInbox.recordKey.toVeilid();
// final writer = TypedKeyPair(
// kind: recordKey.kind, key: writerKey, secret: writerSecret);
// final acceptReject = await (await pool.openRead(recordKey,
// crypto: await DHTRecordCryptoPrivate.fromTypedKeyPair(writer),
// parent: accountRecordKey,
// defaultSubkey: 1))
// .scope((contactRequestInbox) async {
// //
// final signedContactResponse = await contactRequestInbox.getProtobuf(
// proto.SignedContactResponse.fromBuffer,
// forceRefresh: true);
// if (signedContactResponse == null) {
// return null;
// }
// final contactResponseBytes =
// Uint8List.fromList(signedContactResponse.contactResponse);
// final contactResponse =
// proto.ContactResponse.fromBuffer(contactResponseBytes);
// final contactIdentityMasterRecordKey =
// contactResponse.identityMasterRecordKey.toVeilid();
// final cs = await pool.veilid.getCryptoSystem(recordKey.kind);
// // Fetch the remote contact's account master
// final contactIdentityMaster = await openIdentityMaster(
// identityMasterRecordKey: contactIdentityMasterRecordKey);
// // Verify
// final signature = signedContactResponse.identitySignature.toVeilid();
// await cs.verify(contactIdentityMaster.identityPublicKey,
// contactResponseBytes, signature);
// // Check for rejection
// if (!contactResponse.accept) {
// return const InvitationStatus(acceptedContact: null);
// }
// // Pull profile from remote conversation key
// final remoteConversationRecordKey =
// contactResponse.remoteConversationRecordKey.toVeilid();
// final conversation = ConversationCubit(
// activeAccountInfo: _activeAccountInfo,
// remoteIdentityPublicKey:
// contactIdentityMaster.identityPublicTypedKey(),
// remoteConversationRecordKey: remoteConversationRecordKey);
// await conversation.refresh();
// final remoteConversation =
// conversation.state.data?.value.remoteConversation;
// if (remoteConversation == null) {
// log.info('Remote conversation could not be read. Waiting...');
// return null;
// }
// // Complete the local conversation now that we have the remote profile
// final localConversationRecordKey =
// contactInvitationRecord.localConversationRecordKey.toVeilid();
// return conversation.initLocalConversation(
// existingConversationRecordKey: localConversationRecordKey,
// profile: _account.profile,
// // ignore: prefer_expression_function_bodies
// callback: (localConversation) async {
// return InvitationStatus(
// acceptedContact: AcceptedContact(
// remoteProfile: remoteConversation.profile,
// remoteIdentity: contactIdentityMaster,
// remoteConversationRecordKey: remoteConversationRecordKey,
// localConversationRecordKey: localConversationRecordKey));
// });
// });
// if (acceptReject == null) {
// return null;
// }
// // Delete invitation and return the accepted or rejected contact
// await deleteInvitation(
// accepted: acceptReject.acceptedContact != null,
// contactInvitationRecord: contactInvitationRecord);
// return acceptReject;
// } on Exception catch (e) {
// log.error('Exception in checkInvitationStatus: $e', e);
// // Attempt to clean up. All this needs better lifetime management
// await deleteInvitation(
// accepted: false, contactInvitationRecord: contactInvitationRecord);
// rethrow;
// }
@ -108,114 +108,3 @@ class WaitingInvitationCubit extends AsyncTransformerCubit<InvitationStatus,
// Future<InvitationStatus?> checkInvitationStatus(
// {}) async {
// // Open the contact request inbox
// try {
// final pool = DHTRecordPool.instance;
// final accountRecordKey = _activeAccountInfo
// .userLogin.accountRecordInfo.accountRecord.recordKey;
// final writerKey = contactInvitationRecord.writerKey.toVeilid();
// final writerSecret = contactInvitationRecord.writerSecret.toVeilid();
// final recordKey =
// contactInvitationRecord.contactRequestInbox.recordKey.toVeilid();
// final writer = TypedKeyPair(
// kind: recordKey.kind, key: writerKey, secret: writerSecret);
// final acceptReject = await (await pool.openRead(recordKey,
// crypto: await DHTRecordCryptoPrivate.fromTypedKeyPair(writer),
// parent: accountRecordKey,
// defaultSubkey: 1))
// .scope((contactRequestInbox) async {
// //
// final signedContactResponse = await contactRequestInbox.getProtobuf(
// proto.SignedContactResponse.fromBuffer,
// forceRefresh: true);
// if (signedContactResponse == null) {
// return null;
// }
// final contactResponseBytes =
// Uint8List.fromList(signedContactResponse.contactResponse);
// final contactResponse =
// proto.ContactResponse.fromBuffer(contactResponseBytes);
// final contactIdentityMasterRecordKey =
// contactResponse.identityMasterRecordKey.toVeilid();
// final cs = await pool.veilid.getCryptoSystem(recordKey.kind);
// // Fetch the remote contact's account master
// final contactIdentityMaster = await openIdentityMaster(
// identityMasterRecordKey: contactIdentityMasterRecordKey);
// // Verify
// final signature = signedContactResponse.identitySignature.toVeilid();
// await cs.verify(contactIdentityMaster.identityPublicKey,
// contactResponseBytes, signature);
// // Check for rejection
// if (!contactResponse.accept) {
// return const InvitationStatus(acceptedContact: null);
// }
// // Pull profile from remote conversation key
// final remoteConversationRecordKey =
// contactResponse.remoteConversationRecordKey.toVeilid();
// final conversation = ConversationCubit(
// activeAccountInfo: _activeAccountInfo,
// remoteIdentityPublicKey:
// contactIdentityMaster.identityPublicTypedKey(),
// remoteConversationRecordKey: remoteConversationRecordKey);
// await conversation.refresh();
// final remoteConversation =
// conversation.state.data?.value.remoteConversation;
// if (remoteConversation == null) {
// log.info('Remote conversation could not be read. Waiting...');
// return null;
// }
// // Complete the local conversation now that we have the remote profile
// final localConversationRecordKey =
// contactInvitationRecord.localConversationRecordKey.toVeilid();
// return conversation.initLocalConversation(
// existingConversationRecordKey: localConversationRecordKey,
// profile: _account.profile,
// // ignore: prefer_expression_function_bodies
// callback: (localConversation) async {
// return InvitationStatus(
// acceptedContact: AcceptedContact(
// remoteProfile: remoteConversation.profile,
// remoteIdentity: contactIdentityMaster,
// remoteConversationRecordKey: remoteConversationRecordKey,
// localConversationRecordKey: localConversationRecordKey));
// });
// });
// if (acceptReject == null) {
// return null;
// }
// // Delete invitation and return the accepted or rejected contact
// await deleteInvitation(
// accepted: acceptReject.acceptedContact != null,
// contactInvitationRecord: contactInvitationRecord);
// return acceptReject;
// } on Exception catch (e) {
// log.error('Exception in checkInvitationStatus: $e', e);
// // Attempt to clean up. All this needs better lifetime management
// await deleteInvitation(
// accepted: false, contactInvitationRecord: contactInvitationRecord);
// rethrow;
// }
@ -32,6 +32,7 @@ class ValidContactInvitation {
final pool = DHTRecordPool.instance;
try {
// Ensure we don't delete this if we're trying to chat to self
// The initiating side will delete the records in deleteInvitation()
final isSelf = _contactIdentityMaster.identityPublicKey ==
final accountRecordKey = _activeAccountInfo.accountRecordKey;
@ -120,7 +120,7 @@ class ProcessorRepository {
void processUpdateValueChange(VeilidUpdateValueChange updateValueChange) {
// Send value updates to DHTRecordPool
@ -161,8 +161,8 @@ class DHTRecord {
final encryptedNewValue = await _crypto.encrypt(newValue, subkey);
// Set the new data if possible
var newValueData =
await _routingContext.setDHTValue(key, subkey, encryptedNewValue);
var newValueData = await _routingContext
.setDHTValue(key, subkey, encryptedNewValue, writer: _writer);
if (newValueData == null) {
// A newer value wasn't found on the set, but
// we may get a newer value when getting the value for the sequence number
@ -181,7 +181,7 @@ class DHTRecord {
// if so, shortcut and don't bother decrypting it
if (newValueData.data.equals(encryptedNewValue)) {
if (isUpdated) {
_addLocalValueChange(newValue, subkey);
DHTRecordPool.instance.processLocalValueChange(key, newValue, subkey);
return null;
@ -189,7 +189,8 @@ class DHTRecord {
// Decrypt value to return it
final decryptedNewValue = await _crypto.decrypt(newValueData.data, subkey);
if (isUpdated) {
_addLocalValueChange(decryptedNewValue, subkey);
.processLocalValueChange(key, decryptedNewValue, subkey);
return decryptedNewValue;
@ -203,8 +204,8 @@ class DHTRecord {
do {
do {
// Set the new data
newValueData =
await _routingContext.setDHTValue(key, subkey, encryptedNewValue);
newValueData = await _routingContext
.setDHTValue(key, subkey, encryptedNewValue, writer: _writer);
// Repeat if newer data on the network was found
} while (newValueData != null);
@ -226,7 +227,7 @@ class DHTRecord {
final isUpdated = newValueData.seq != lastSeq;
if (isUpdated) {
_addLocalValueChange(newValue, subkey);
DHTRecordPool.instance.processLocalValueChange(key, newValue, subkey);
@ -356,7 +357,7 @@ class DHTRecord {
if (watchedSubkeys == null) {
// Report all subkeys
DHTRecordWatchChange(local: false, data: data, subkeys: subkeys));
DHTRecordWatchChange(local: local, data: data, subkeys: subkeys));
} else {
// Only some subkeys are being watched, see if the reported update
// overlaps the subkeys being watched
@ -382,7 +383,7 @@ class DHTRecord {
local: true, data: data, subkeys: [ValueSubkeyRange.single(subkey)]);
void addRemoteValueChange(VeilidUpdateValueChange update) {
void _addRemoteValueChange(VeilidUpdateValueChange update) {
local: false, data: update.valueData.data, subkeys: update.subkeys);
@ -9,12 +9,14 @@ import '../../veilid_support.dart';
typedef InitialStateFunction<T> = Future<T?> Function(DHTRecord);
typedef StateFunction<T> = Future<T?> Function(
DHTRecord, List<ValueSubkeyRange>, Uint8List?);
typedef WatchFunction = Future<void> Function(DHTRecord);
class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
required Future<DHTRecord> Function() open,
required InitialStateFunction<T> initialStateFunction,
required StateFunction<T> stateFunction,
required WatchFunction watchFunction,
}) : _wantsCloseRecord = false,
_stateFunction = stateFunction,
super(const AsyncValue.loading()) {
@ -22,7 +24,7 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
// Do record open/create
_record = await open();
_wantsCloseRecord = true;
await _init(initialStateFunction, stateFunction);
await _init(initialStateFunction, stateFunction, watchFunction);
@ -30,18 +32,20 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
required DHTRecord record,
required InitialStateFunction<T> initialStateFunction,
required StateFunction<T> stateFunction,
required WatchFunction watchFunction,
}) : _record = record,
_stateFunction = stateFunction,
_wantsCloseRecord = false,
super(const AsyncValue.loading()) {
Future.delayed(Duration.zero, () async {
await _init(initialStateFunction, stateFunction);
await _init(initialStateFunction, stateFunction, watchFunction);
Future<void> _init(
InitialStateFunction<T> initialStateFunction,
StateFunction<T> stateFunction,
WatchFunction watchFunction,
) async {
// Make initial state update
try {
@ -63,10 +67,13 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
await watchFunction(_record);
Future<void> close() async {
await _record.cancelWatch();
await _subscription?.cancel();
_subscription = null;
if (_wantsCloseRecord) {
@ -113,15 +120,16 @@ class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
required T Function(List<int> data) decodeState,
}) : super(
initialStateFunction: _makeInitialStateFunction(decodeState),
stateFunction: _makeStateFunction(decodeState));
stateFunction: _makeStateFunction(decodeState),
watchFunction: _makeWatchFunction());
required super.record,
required T Function(List<int> data) decodeState,
}) : super.value(
initialStateFunction: _makeInitialStateFunction(decodeState),
stateFunction: _makeStateFunction(decodeState),
initialStateFunction: _makeInitialStateFunction(decodeState),
stateFunction: _makeStateFunction(decodeState),
watchFunction: _makeWatchFunction());
static InitialStateFunction<T> _makeInitialStateFunction<T>(
T Function(List<int> data) decodeState) =>
@ -155,6 +163,11 @@ class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
return null;
static WatchFunction _makeWatchFunction() => (record) async {
final defaultSubkey = record.subkeyOrDefault(-1);
await record.watch(subkeys: [ValueSubkeyRange.single(defaultSubkey)]);
Future<void> refreshDefault() async {
final defaultSubkey = _record.subkeyOrDefault(-1);
await refresh([ValueSubkeyRange(low: defaultSubkey, high: defaultSubkey)]);
@ -70,7 +70,6 @@ class SharedDHTRecordData {
KeyPair? defaultWriter;
VeilidRoutingContext defaultRoutingContext;
Map<int, int> subkeySeqCache = {};
bool inWatchStateUpdate = false;
bool needsWatchStateUpdate = false;
@ -233,42 +232,45 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
Future<void> delete(TypedKey recordKey) async {
// Collect all dependencies (including the record itself)
final allDeps = <TypedKey>[];
final currentDeps = [recordKey];
while (currentDeps.isNotEmpty) {
final nextDep = currentDeps.removeLast();
// Remove this child from its parent
await _removeDependencyInner(nextDep);
final childDeps =
_state.childrenByParent[nextDep.toJson()]?.toList() ?? [];
// Delete all dependent records in parallel (including the record itself)
final allDeleteFutures = <Future<void>>[];
final allCloseFutures = <Future<void>>[];
final allDeletedRecords = <DHTRecord>{};
for (final dep in allDeps) {
// If record is opened, close it first
final openinfo = _opened[dep];
if (openinfo != null) {
for (final rec in openinfo.records) {
final allDeletedRecordKeys = <TypedKey>[];
await _mutex.protect(() async {
// Collect all dependencies (including the record itself)
final allDeps = <TypedKey>[];
final currentDeps = [recordKey];
while (currentDeps.isNotEmpty) {
final nextDep = currentDeps.removeLast();
// Remove this child from its parent
await _removeDependencyInner(nextDep);
final childDeps =
_state.childrenByParent[nextDep.toJson()]?.toList() ?? [];
// Then delete
await Future.wait(allCloseFutures);
await Future.wait(allDeleteFutures);
// Delete all dependent records in parallel (including the record itself)
for (final dep in allDeps) {
// If record is opened, close it first
final openinfo = _opened[dep];
if (openinfo != null) {
for (final rec in openinfo.records) {
// Then delete
await Future.wait(allDeletedRecords.map((r) => r.close()));
for (final deletedRecord in allDeletedRecords) {
await Future.wait(
void _validateParent(TypedKey? parent, TypedKey child) {
@ -454,14 +456,27 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
return _state.parentByChild[childJson];
/// Handle the DHT record updates coming from internal to this app
void processLocalValueChange(TypedKey key, Uint8List data, int subkey) {
// Change
for (final kv in _opened.entries) {
if (kv.key == key) {
for (final rec in kv.value.records) {
rec._addLocalValueChange(data, subkey);
/// Handle the DHT record updates coming from Veilid
void processUpdateValueChange(VeilidUpdateValueChange updateValueChange) {
void processRemoteValueChange(VeilidUpdateValueChange updateValueChange) {
if (updateValueChange.subkeys.isNotEmpty) {
// Change
for (final kv in _opened.entries) {
if (kv.key == updateValueChange.key) {
for (final rec in kv.value.records) {
@ -569,61 +584,56 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
inTick = true;
try {
// See if any opened records need watch state changes
final unord = <Future<void>>[];
final unord = <Future<void> Function()>[];
for (final kv in _opened.entries) {
final openedRecordKey = kv.key;
final openedRecordInfo = kv.value;
final dhtctx = openedRecordInfo.shared.defaultRoutingContext;
await _mutex.protect(() async {
for (final kv in _opened.entries) {
final openedRecordKey = kv.key;
final openedRecordInfo = kv.value;
final dhtctx = openedRecordInfo.shared.defaultRoutingContext;
// Check if already updating
if (openedRecordInfo.shared.inWatchStateUpdate) {
if (openedRecordInfo.shared.needsWatchStateUpdate) {
final watchState =
if (openedRecordInfo.shared.needsWatchStateUpdate) {
openedRecordInfo.shared.inWatchStateUpdate = true;
// Apply watch changes for record
if (watchState == null) {
unord.add(() async {
// Record needs watch cancel
try {
await dhtctx.cancelDHTWatch(openedRecordKey);
openedRecordInfo.shared.needsWatchStateUpdate = false;
} on VeilidAPIException {
// Failed to cancel DHT watch, try again next tick
} else {
unord.add(() async {
// Record needs new watch
try {
final realExpiration = await dhtctx.watchDHTValues(
subkeys: watchState.subkeys?.toList(),
count: watchState.count,
expiration: watchState.expiration);
final watchState = _collectUnionWatchState(openedRecordInfo.records);
// Apply watch changes for record
if (watchState == null) {
unord.add(() async {
// Record needs watch cancel
try {
final done = await dhtctx.cancelDHTWatch(openedRecordKey);
'should always be done when cancelling whole subkey range');
openedRecordInfo.shared.needsWatchStateUpdate = false;
} on VeilidAPIException {
// Failed to cancel DHT watch, try again next tick
openedRecordInfo.shared.inWatchStateUpdate = false;
} else {
unord.add(() async {
// Record needs new watch
try {
final realExpiration = await dhtctx.watchDHTValues(
subkeys: watchState.subkeys?.toList(),
count: watchState.count,
expiration: watchState.expiration);
openedRecordInfo.shared.needsWatchStateUpdate = false;
// Update watch states with real expiration
openedRecordInfo.records, realExpiration);
} on VeilidAPIException {
// Failed to cancel DHT watch, try again next tick
openedRecordInfo.shared.inWatchStateUpdate = false;
// Update watch states with real expiration
if (realExpiration.value != BigInt.zero) {
openedRecordInfo.shared.needsWatchStateUpdate = false;
openedRecordInfo.records, realExpiration);
} on VeilidAPIException {
// Failed to cancel DHT watch, try again next tick
await unord.wait;
// Process all watch changes
await unord.map((f) => f()).wait;
} finally {
inTick = false;
@ -30,7 +30,12 @@ class _DHTShortArrayCache {
class DHTShortArray {
// Constructors
DHTShortArray._({required DHTRecord headRecord})
: _headRecord = headRecord,
_head = _DHTShortArrayCache(),
@ -53,22 +58,6 @@ class DHTShortArray {
_stride = stride;
static const maxElements = 256;
// Head DHT record
final DHTRecord _headRecord;
late final int _stride;
// Cached representation refreshed from head record
_DHTShortArrayCache _head;
// Subscription to head and linked record internal changes
final Map<TypedKey, StreamSubscription<DHTRecordWatchChange>> _subscriptions;
// Stream of external changes
StreamController<void>? _watchController;
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex;
// Create a DHTShortArray
// if smplWriter is specified, uses a SMPL schema with a single writer
// rather than the key owner
@ -163,148 +152,11 @@ class DHTShortArray {
crypto: crypto,
// Public API
DHTRecord get record => _headRecord;
/// Serialize and write out the current head record, possibly updating it
/// if a newer copy is available online. Returns true if the write was
/// successful
Future<bool> _tryWriteHead() async {
final head = _head.toProto();
final headBuffer = head.writeToBuffer();
final existingData = await _headRecord.tryWriteBytes(headBuffer);
if (existingData != null) {
// Head write failed, incorporate update
await _newHead(proto.DHTShortArray.fromBuffer(existingData));
return false;
return true;
/// Validate the head from the DHT is properly formatted
/// and calculate the free list from it while we're here
List<int> _validateHeadCacheData(
List<Typed<FixedEncodedString43>> linkedKeys, List<int> index) {
// Ensure nothing is duplicated in the linked keys set
final newKeys = linkedKeys.toSet();
assert(newKeys.length <= (maxElements + (_stride - 1)) ~/ _stride,
'too many keys');
assert(newKeys.length == linkedKeys.length, 'duplicated linked keys');
final newIndex = index.toSet();
assert(newIndex.length <= maxElements, 'too many indexes');
assert(newIndex.length == index.length, 'duplicated index locations');
// Ensure all the index keys fit into the existing records
final indexCapacity = (linkedKeys.length + 1) * _stride;
int? maxIndex;
for (final idx in newIndex) {
assert(idx >= 0 || idx < indexCapacity, 'index out of range');
if (maxIndex == null || idx > maxIndex) {
maxIndex = idx;
final free = <int>[];
if (maxIndex != null) {
for (var i = 0; i < maxIndex; i++) {
if (!newIndex.contains(i)) {
return free;
/// Open a linked record for reading or writing, same as the head record
Future<DHTRecord> _openLinkedRecord(TypedKey recordKey) async {
final writer = _headRecord.writer;
return (writer != null)
? await DHTRecordPool.instance.openWrite(
parent: _headRecord.key,
routingContext: _headRecord.routingContext,
: await DHTRecordPool.instance.openRead(
parent: _headRecord.key,
routingContext: _headRecord.routingContext,
/// Validate a new head record
Future<void> _newHead(proto.DHTShortArray head) async {
// Get the set of new linked keys and validate it
final linkedKeys = head.keys.map((p) => p.toVeilid()).toList();
final index = head.index;
final free = _validateHeadCacheData(linkedKeys, index);
// See which records are actually new
final oldRecords = Map<TypedKey, DHTRecord>.fromEntries(
_head.linkedRecords.map((lr) => MapEntry(lr.key, lr)));
final newRecords = <TypedKey, DHTRecord>{};
final sameRecords = <TypedKey, DHTRecord>{};
try {
for (var n = 0; n < linkedKeys.length; n++) {
final newKey = linkedKeys[n];
final oldRecord = oldRecords[newKey];
if (oldRecord == null) {
// Open the new record
final newRecord = await _openLinkedRecord(newKey);
newRecords[newKey] = newRecord;
} else {
sameRecords[newKey] = oldRecord;
} on Exception catch (_) {
// On any exception close the records we have opened
await Future.wait(newRecords.entries.map((e) => e.value.close()));
// From this point forward we should not throw an exception or everything
// is possibly invalid. Just pass the exception up it happens and the caller
// will have to delete this short array and reopen it if it can
await Future.wait(oldRecords.entries
.where((e) => !sameRecords.containsKey(e.key))
.map((e) => e.value.close()));
// Figure out which indices are free
// Make the new head cache
_head = _DHTShortArrayCache()
linkedKeys.map((key) => (sameRecords[key] ?? newRecords[key])!))
// Update watch if we have one in case linked records have been added
if (_watchController != null) {
await _watchAllRecords();
/// Pull the latest or updated copy of the head record from the network
Future<bool> _refreshHead(
{bool forceRefresh = true, bool onlyUpdates = false}) async {
// Get an updated head record copy if one exists
final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer,
subkey: 0, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
if (head == null) {
if (onlyUpdates) {
// No update
return false;
throw StateError('head missing during refresh');
await _newHead(head);
return true;
int get length => _head.index.length;
Future<void> close() async {
await _watchController?.close();
@ -345,71 +197,6 @@ class DHTShortArray {
DHTRecord? _getLinkedRecord(int recordNumber) {
if (recordNumber == 0) {
return _headRecord;
if (recordNumber >= _head.linkedRecords.length) {
return null;
return _head.linkedRecords[recordNumber];
Future<DHTRecord> _getOrCreateLinkedRecord(int recordNumber) async {
if (recordNumber == 0) {
return _headRecord;
final pool = DHTRecordPool.instance;
while (recordNumber >= _head.linkedRecords.length) {
// Linked records must use SMPL schema so writer can be specified
// Use the same writer as the head record
final smplWriter = _headRecord.writer!;
final parent = pool.getParentRecordKey(_headRecord.key);
final routingContext = _headRecord.routingContext;
final crypto = _headRecord.crypto;
final schema = DHTSchema.smpl(
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: _stride)]);
final dhtCreateRecord = await pool.create(
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto,
writer: smplWriter);
// Reopen with SMPL writer
await dhtCreateRecord.close();
final dhtRecord = await pool.openWrite(dhtCreateRecord.key, smplWriter,
parent: parent, routingContext: routingContext, crypto: crypto);
// Add to linked records
if (!await _tryWriteHead()) {
await _refreshHead();
return _head.linkedRecords[recordNumber];
int _emptyIndex() {
if (_head.free.isNotEmpty) {
return _head.free.removeLast();
if (_head.index.length == maxElements) {
throw StateError('too many elements');
return _head.index.length;
void _freeIndex(int idx) {
// xxx: free list optimization here?
int get length => _head.index.length;
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async {
await _refreshHead(forceRefresh: forceRefresh, onlyUpdates: true);
@ -679,6 +466,209 @@ class DHTShortArray {
) =>
eventualUpdateItem(pos, protobufUpdate(fromBuffer, update));
// Internal Operations
DHTRecord? _getLinkedRecord(int recordNumber) {
if (recordNumber == 0) {
return _headRecord;
if (recordNumber >= _head.linkedRecords.length) {
return null;
return _head.linkedRecords[recordNumber];
Future<DHTRecord> _getOrCreateLinkedRecord(int recordNumber) async {
if (recordNumber == 0) {
return _headRecord;
final pool = DHTRecordPool.instance;
while (recordNumber >= _head.linkedRecords.length) {
// Linked records must use SMPL schema so writer can be specified
// Use the same writer as the head record
final smplWriter = _headRecord.writer!;
final parent = pool.getParentRecordKey(_headRecord.key);
final routingContext = _headRecord.routingContext;
final crypto = _headRecord.crypto;
final schema = DHTSchema.smpl(
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: _stride)]);
final dhtCreateRecord = await pool.create(
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto,
writer: smplWriter);
// Reopen with SMPL writer
await dhtCreateRecord.close();
final dhtRecord = await pool.openWrite(dhtCreateRecord.key, smplWriter,
parent: parent, routingContext: routingContext, crypto: crypto);
// Add to linked records
if (!await _tryWriteHead()) {
await _refreshHead();
return _head.linkedRecords[recordNumber];
int _emptyIndex() {
if (_head.free.isNotEmpty) {
return _head.free.removeLast();
if (_head.index.length == maxElements) {
throw StateError('too many elements');
return _head.index.length;
void _freeIndex(int idx) {
// xxx: free list optimization here?
/// Serialize and write out the current head record, possibly updating it
/// if a newer copy is available online. Returns true if the write was
/// successful
Future<bool> _tryWriteHead() async {
final head = _head.toProto();
final headBuffer = head.writeToBuffer();
final existingData = await _headRecord.tryWriteBytes(headBuffer);
if (existingData != null) {
// Head write failed, incorporate update
await _newHead(proto.DHTShortArray.fromBuffer(existingData));
return false;
return true;
/// Validate the head from the DHT is properly formatted
/// and calculate the free list from it while we're here
List<int> _validateHeadCacheData(
List<Typed<FixedEncodedString43>> linkedKeys, List<int> index) {
// Ensure nothing is duplicated in the linked keys set
final newKeys = linkedKeys.toSet();
assert(newKeys.length <= (maxElements + (_stride - 1)) ~/ _stride,
'too many keys');
assert(newKeys.length == linkedKeys.length, 'duplicated linked keys');
final newIndex = index.toSet();
assert(newIndex.length <= maxElements, 'too many indexes');
assert(newIndex.length == index.length, 'duplicated index locations');
// Ensure all the index keys fit into the existing records
final indexCapacity = (linkedKeys.length + 1) * _stride;
int? maxIndex;
for (final idx in newIndex) {
assert(idx >= 0 || idx < indexCapacity, 'index out of range');
if (maxIndex == null || idx > maxIndex) {
maxIndex = idx;
final free = <int>[];
if (maxIndex != null) {
for (var i = 0; i < maxIndex; i++) {
if (!newIndex.contains(i)) {
return free;
/// Open a linked record for reading or writing, same as the head record
Future<DHTRecord> _openLinkedRecord(TypedKey recordKey) async {
final writer = _headRecord.writer;
return (writer != null)
? await DHTRecordPool.instance.openWrite(
parent: _headRecord.key,
routingContext: _headRecord.routingContext,
: await DHTRecordPool.instance.openRead(
parent: _headRecord.key,
routingContext: _headRecord.routingContext,
/// Validate a new head record
Future<void> _newHead(proto.DHTShortArray head) async {
// Get the set of new linked keys and validate it
final linkedKeys = head.keys.map((p) => p.toVeilid()).toList();
final index = head.index;
final free = _validateHeadCacheData(linkedKeys, index);
// See which records are actually new
final oldRecords = Map<TypedKey, DHTRecord>.fromEntries(
_head.linkedRecords.map((lr) => MapEntry(lr.key, lr)));
final newRecords = <TypedKey, DHTRecord>{};
final sameRecords = <TypedKey, DHTRecord>{};
try {
for (var n = 0; n < linkedKeys.length; n++) {
final newKey = linkedKeys[n];
final oldRecord = oldRecords[newKey];
if (oldRecord == null) {
// Open the new record
final newRecord = await _openLinkedRecord(newKey);
newRecords[newKey] = newRecord;
} else {
sameRecords[newKey] = oldRecord;
} on Exception catch (_) {
// On any exception close the records we have opened
await Future.wait(newRecords.entries.map((e) => e.value.close()));
// From this point forward we should not throw an exception or everything
// is possibly invalid. Just pass the exception up it happens and the caller
// will have to delete this short array and reopen it if it can
await Future.wait(oldRecords.entries
.where((e) => !sameRecords.containsKey(e.key))
.map((e) => e.value.close()));
// Figure out which indices are free
// Make the new head cache
_head = _DHTShortArrayCache()
linkedKeys.map((key) => (sameRecords[key] ?? newRecords[key])!))
// Update watch if we have one in case linked records have been added
if (_watchController != null) {
await _watchAllRecords();
/// Pull the latest or updated copy of the head record from the network
Future<bool> _refreshHead(
{bool forceRefresh = true, bool onlyUpdates = false}) async {
// Get an updated head record copy if one exists
final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer,
subkey: 0, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
if (head == null) {
if (onlyUpdates) {
// No update
return false;
throw StateError('head missing during refresh');
await _newHead(head);
return true;
// Watch head and all linked records
Future<void> _watchAllRecords() async {
// This will update any existing watches if necessary
@ -719,7 +709,7 @@ class DHTShortArray {
// Called when a head or linked record changes
Future<void> _onUpdateRecord(
DHTRecord record, Uint8List data, List<ValueSubkeyRange> subkeys) async {
DHTRecord record, Uint8List? data, List<ValueSubkeyRange> subkeys) async {
// If head record subkey zero changes, then the layout
// of the dhtshortarray has changed
var updateHead = false;
@ -772,4 +762,23 @@ class DHTShortArray {
// Return subscription
return _watchController!.stream.listen((_) => onChanged());
// Fields
static const maxElements = 256;
// Head DHT record
final DHTRecord _headRecord;
late final int _stride;
// Cached representation refreshed from head record
_DHTShortArrayCache _head;
// Subscription to head and linked record internal changes
final Map<TypedKey, StreamSubscription<DHTRecordWatchChange>> _subscriptions;
// Stream of external changes
StreamController<void>? _watchController;
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex;
@ -66,18 +66,20 @@ class DHTShortArrayCubit<T> extends Cubit<AsyncValue<IList<T>>> {
// Keep updating until we don't want to update any more
// Because this is async, we could get an update while we're
// still processing the last one
do {
_wantsUpdate = false;
try {
final initialState = await _getElements();
} on Exception catch (e) {
} while (_wantsUpdate);
// Note that this update future has finished
_isUpdating = false;
try {
do {
_wantsUpdate = false;
try {
final initialState = await _getElements();
} on Exception catch (e) {
} while (_wantsUpdate);
} finally {
// Note that this update future has finished
_isUpdating = false;
Reference in New Issue
Block a user