more refactor and dhtrecord multiple-open support

This commit is contained in:
Christien Rioux 2024-02-24 22:27:59 -05:00
parent c4c7b264aa
commit e262b0f777
19 changed files with 782 additions and 419 deletions

View File

@ -147,7 +147,7 @@ class ContactInvitationListCubit
Future<void> deleteInvitation(
{required bool accepted,
required proto.ContactInvitationRecord contactInvitationRecord}) async {
required TypedKey contactRequestInboxRecordKey}) async {
final pool = DHTRecordPool.instance;
final accountRecordKey =
_activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey;
@ -159,14 +159,11 @@ class ContactInvitationListCubit
if (item == null) {
throw Exception('Failed to get contact invitation record');
}
if (item.contactRequestInbox.recordKey ==
contactInvitationRecord.contactRequestInbox.recordKey) {
if (item.contactRequestInbox.recordKey.toVeilid() ==
contactRequestInboxRecordKey) {
await shortArray.tryRemoveItem(i);
break;
}
}
await (await pool.openOwned(
contactInvitationRecord.contactRequestInbox.toVeilid(),
await (await pool.openOwned(item.contactRequestInbox.toVeilid(),
parent: accountRecordKey))
.scope((contactRequestInbox) async {
// Wipe out old invitation so it shows up as invalid
@ -174,11 +171,13 @@ class ContactInvitationListCubit
await contactRequestInbox.delete();
});
if (!accepted) {
await (await pool.openRead(
contactInvitationRecord.localConversationRecordKey.toVeilid(),
await (await pool.openRead(item.localConversationRecordKey.toVeilid(),
parent: accountRecordKey))
.delete();
}
return;
}
}
}
Future<ValidContactInvitation?> validateInvitation(

View File

@ -10,7 +10,7 @@ import 'cubits.dart';
typedef WaitingInvitationsBlocMapState
= BlocMapState<TypedKey, AsyncValue<InvitationStatus>>;
// Map of contactInvitationListRecordKey to WaitingInvitationCubit
// Map of contactRequestInboxRecordKey to WaitingInvitationCubit
// Wraps a contact invitation cubit to watch for accept/reject
// Automatically follows the state of a ContactInvitationListCubit.
class WaitingInvitationsBlocMapCubit extends BlocMapCubit<TypedKey,
@ -20,6 +20,7 @@ class WaitingInvitationsBlocMapCubit extends BlocMapCubit<TypedKey,
TypedKey, proto.ContactInvitationRecord> {
WaitingInvitationsBlocMapCubit(
{required this.activeAccountInfo, required this.account});
Future<void> addWaitingInvitation(
{required proto.ContactInvitationRecord
contactInvitationRecord}) async =>

View File

@ -53,7 +53,9 @@ class ContactInvitationItemWidget extends StatelessWidget {
context.read<ContactInvitationListCubit>();
await contactInvitationListCubit.deleteInvitation(
accepted: false,
contactInvitationRecord: contactInvitationRecord);
contactRequestInboxRecordKey: contactInvitationRecord
.contactRequestInbox.recordKey
.toVeilid());
},
backgroundColor: scale.tertiaryScale.background,
foregroundColor: scale.tertiaryScale.text,

View File

@ -1,8 +1,10 @@
import 'package:async_tools/async_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:provider/provider.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../../account_manager/account_manager.dart';
import '../../../chat/chat.dart';
@ -13,65 +15,143 @@ import '../../../router/router.dart';
import '../../../tools/tools.dart';
class HomeAccountReadyShell extends StatefulWidget {
const HomeAccountReadyShell({required this.child, super.key});
@override
HomeAccountReadyShellState createState() => HomeAccountReadyShellState();
final Widget child;
}
class HomeAccountReadyShellState extends State<HomeAccountReadyShell> {
//
@override
void initState() {
super.initState();
}
@override
Widget build(BuildContext context) {
// These must be valid already before making this widget,
// per the ShellRoute above it
factory HomeAccountReadyShell(
{required BuildContext context, required Widget child, Key? key}) {
// These must exist in order for the account to
// be considered 'ready' for this widget subtree
final activeLocalAccount = context.read<ActiveLocalAccountCubit>().state!;
final accountInfo =
AccountRepository.instance.getAccountInfo(activeLocalAccount);
final activeAccountInfo = accountInfo.activeAccountInfo!;
final routerCubit = context.read<RouterCubit>();
return Provider<ActiveAccountInfo>.value(
value: activeAccountInfo,
return HomeAccountReadyShell._(
activeLocalAccount: activeLocalAccount,
accountInfo: accountInfo,
activeAccountInfo: activeAccountInfo,
routerCubit: routerCubit,
key: key,
child: child);
}
const HomeAccountReadyShell._(
{required this.activeLocalAccount,
required this.accountInfo,
required this.activeAccountInfo,
required this.routerCubit,
required this.child,
super.key});
@override
HomeAccountReadyShellState createState() => HomeAccountReadyShellState();
final Widget child;
final TypedKey activeLocalAccount;
final AccountInfo accountInfo;
final ActiveAccountInfo activeAccountInfo;
final RouterCubit routerCubit;
@override
void debugFillProperties(DiagnosticPropertiesBuilder properties) {
super.debugFillProperties(properties);
properties
..add(DiagnosticsProperty<TypedKey>(
'activeLocalAccount', activeLocalAccount))
..add(DiagnosticsProperty<AccountInfo>('accountInfo', accountInfo))
..add(DiagnosticsProperty<ActiveAccountInfo>(
'activeAccountInfo', activeAccountInfo))
..add(DiagnosticsProperty<RouterCubit>('routerCubit', routerCubit));
}
}
class HomeAccountReadyShellState extends State<HomeAccountReadyShell> {
final SingleStateProcessor<WaitingInvitationsBlocMapState>
_singleInvitationStatusProcessor = SingleStateProcessor();
@override
void initState() {
super.initState();
}
// Process all accepted or rejected invitations
void _invitationStatusListener(
BuildContext context, WaitingInvitationsBlocMapState state) {
_singleInvitationStatusProcessor.updateState(state,
closure: (newState) async {
final contactListCubit = context.read<ContactListCubit>();
final contactInvitationListCubit =
context.read<ContactInvitationListCubit>();
for (final entry in newState.entries) {
final contactRequestInboxRecordKey = entry.key;
final invStatus = entry.value.data?.value;
// Skip invitations that have not yet been accepted or rejected
if (invStatus == null) {
continue;
}
// Delete invitation and process the accepted or rejected contact
final acceptedContact = invStatus.acceptedContact;
if (acceptedContact != null) {
await contactInvitationListCubit.deleteInvitation(
accepted: true,
contactRequestInboxRecordKey: contactRequestInboxRecordKey);
// Accept
await contactListCubit.createContact(
remoteProfile: acceptedContact.remoteProfile,
remoteIdentity: acceptedContact.remoteIdentity,
remoteConversationRecordKey:
acceptedContact.remoteConversationRecordKey,
localConversationRecordKey:
acceptedContact.localConversationRecordKey,
);
} else {
// Reject
await contactInvitationListCubit.deleteInvitation(
accepted: false,
contactRequestInboxRecordKey: contactRequestInboxRecordKey);
}
}
});
}
@override
Widget build(BuildContext context) => Provider<ActiveAccountInfo>.value(
value: widget.activeAccountInfo,
child: BlocProvider(
create: (context) =>
AccountRecordCubit(record: activeAccountInfo.accountRecord),
create: (context) => AccountRecordCubit(
record: widget.activeAccountInfo.accountRecord),
child: Builder(builder: (context) {
final account =
context.watch<AccountRecordCubit>().state.data?.value;
if (account == null) {
return waitingPage();
}
return MultiBlocProvider(providers: [
return MultiBlocProvider(
providers: [
BlocProvider(
create: (context) => ContactInvitationListCubit(
activeAccountInfo: activeAccountInfo,
activeAccountInfo: widget.activeAccountInfo,
account: account)),
BlocProvider(
create: (context) => ContactListCubit(
activeAccountInfo: activeAccountInfo,
activeAccountInfo: widget.activeAccountInfo,
account: account)),
BlocProvider(
create: (context) => ChatListCubit(
activeAccountInfo: activeAccountInfo,
activeAccountInfo: widget.activeAccountInfo,
account: account)),
BlocProvider(
create: (context) => ActiveConversationsBlocMapCubit(
activeAccountInfo: activeAccountInfo,
activeAccountInfo: widget.activeAccountInfo,
contactListCubit: context.read<ContactListCubit>())
..follow(
initialInputState: const AsyncValue.loading(),
stream: context.read<ChatListCubit>().stream)),
BlocProvider(
create: (context) => ActiveConversationMessagesBlocMapCubit(
activeAccountInfo: activeAccountInfo,
create: (context) =>
ActiveConversationMessagesBlocMapCubit(
activeAccountInfo: widget.activeAccountInfo,
)..follow(
initialInputState: IMap(),
stream: context
@ -80,17 +160,23 @@ class HomeAccountReadyShellState extends State<HomeAccountReadyShell> {
BlocProvider(
create: (context) => ActiveChatCubit(null)
..withStateListen((event) {
routerCubit.setHasActiveChat(event != null);
widget.routerCubit.setHasActiveChat(event != null);
})),
BlocProvider(
create: (context) => WaitingInvitationsBlocMapCubit(
activeAccountInfo: activeAccountInfo, account: account)
activeAccountInfo: widget.activeAccountInfo,
account: account)
..follow(
initialInputState: const AsyncValue.loading(),
stream: context
.read<ContactInvitationListCubit>()
.stream))
], child: widget.child);
],
child: MultiBlocListener(listeners: [
BlocListener<WaitingInvitationsBlocMapCubit,
WaitingInvitationsBlocMapState>(
listener: _invitationStatusListener,
)
], child: widget.child));
})));
}
}

View File

@ -10,12 +10,12 @@ import 'home_account_missing.dart';
import 'home_no_active.dart';
class HomeShell extends StatefulWidget {
const HomeShell({required this.child, super.key});
const HomeShell({required this.accountReadyBuilder, super.key});
@override
HomeShellState createState() => HomeShellState();
final Widget child;
final Builder accountReadyBuilder;
}
class HomeShellState extends State<HomeShell> {
@ -32,7 +32,7 @@ class HomeShellState extends State<HomeShell> {
super.dispose();
}
Widget buildWithLogin(BuildContext context, Widget child) {
Widget buildWithLogin(BuildContext context) {
final activeLocalAccount = context.watch<ActiveLocalAccountCubit>().state;
if (activeLocalAccount == null) {
@ -56,7 +56,7 @@ class HomeShellState extends State<HomeShell> {
child: BlocProvider(
create: (context) => AccountRecordCubit(
record: accountInfo.activeAccountInfo!.accountRecord),
child: child));
child: widget.accountReadyBuilder));
}
}
@ -72,6 +72,6 @@ class HomeShellState extends State<HomeShell> {
child: DecoratedBox(
decoration: BoxDecoration(
color: scale.primaryScale.activeElementBackground),
child: buildWithLogin(context, widget.child))));
child: buildWithLogin(context))));
}
}

View File

@ -3,6 +3,7 @@ import 'dart:async';
import 'package:bloc/bloc.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/widgets.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:freezed_annotation/freezed_annotation.dart';
import 'package:go_router/go_router.dart';
import 'package:stream_transform/stream_transform.dart';
@ -68,8 +69,10 @@ class RouterCubit extends Cubit<RouterState> {
),
ShellRoute(
navigatorKey: _homeNavKey,
builder: (context, state, child) =>
HomeShell(child: HomeAccountReadyShell(child: child)),
builder: (context, state, child) => HomeShell(
accountReadyBuilder: Builder(
builder: (context) =>
HomeAccountReadyShell(context: context, child: child))),
routes: [
GoRoute(
path: '/home',

View File

@ -10,41 +10,20 @@ class AsyncTransformerCubit<T, S> extends Cubit<AsyncValue<T>> {
_subscription = input.stream.listen(_asyncTransform);
}
void _asyncTransform(AsyncValue<S> newInputState) {
// Use a singlefuture here to ensure we get dont lose any updates
// If the input stream gives us an update while we are
// still processing the last update, the most recent input state will
// be saved and processed eventually.
singleFuture(this, () async {
var newState = newInputState;
var done = false;
while (!done) {
_singleStateProcessor.updateState(newInputState, closure: (newState) async {
// Emit the transformed state
try {
if (newState is AsyncLoading) {
return AsyncValue<T>.loading();
}
if (newState is AsyncError) {
final newStateError = newState as AsyncError<S>;
return AsyncValue<T>.error(
newStateError.error, newStateError.stackTrace);
}
if (newState is AsyncLoading<S>) {
emit(const AsyncValue.loading());
} else if (newState is AsyncError<S>) {
emit(AsyncValue.error(newState.error, newState.stackTrace));
} else {
final transformedState = await transform(newState.data!.value);
emit(transformedState);
}
} on Exception catch (e, st) {
emit(AsyncValue.error(e, st));
}
// See if there's another state change to process
final next = _nextInputState;
_nextInputState = null;
if (next != null) {
newState = next;
} else {
done = true;
}
}
}, onBusy: () {
// Keep this state until we process again
_nextInputState = newInputState;
});
}
@ -56,7 +35,8 @@ class AsyncTransformerCubit<T, S> extends Cubit<AsyncValue<T>> {
}
Cubit<AsyncValue<S>> input;
AsyncValue<S>? _nextInputState;
final SingleStateProcessor<AsyncValue<S>> _singleStateProcessor =
SingleStateProcessor();
Future<AsyncValue<T>> Function(S) transform;
late final StreamSubscription<AsyncValue<S>> _subscription;
}

View File

@ -30,16 +30,8 @@ abstract mixin class StateFollower<S extends Object, K, V> {
Future<void> updateState(K key, V value);
void _updateFollow(S newInputState) {
// Use a singlefuture here to ensure we get dont lose any updates
// If the input stream gives us an update while we are
// still processing the last update, the most recent input state will
// be saved and processed eventually.
final newInputStateMap = getStateMap(newInputState);
singleFuture(this, () async {
var newStateMap = newInputStateMap;
var done = false;
while (!done) {
_singleStateProcessor.updateState(getStateMap(newInputState),
closure: (newStateMap) async {
for (final k in _lastInputStateMap.keys) {
if (!newStateMap.containsKey(k)) {
// deleted
@ -56,23 +48,11 @@ abstract mixin class StateFollower<S extends Object, K, V> {
// Keep this state map for the next time
_lastInputStateMap = newStateMap;
// See if there's another state change to process
final next = _nextInputStateMap;
_nextInputStateMap = null;
if (next != null) {
newStateMap = next;
} else {
done = true;
}
}
}, onBusy: () {
// Keep this state until we process again
_nextInputStateMap = newInputStateMap;
});
}
late IMap<K, V> _lastInputStateMap;
IMap<K, V>? _nextInputStateMap;
final SingleStateProcessor<IMap<K, V>> _singleStateProcessor =
SingleStateProcessor();
late final StreamSubscription<S> _subscription;
}

View File

@ -3,4 +3,6 @@ library;
export 'src/async_tag_lock.dart';
export 'src/async_value.dart';
export 'src/single_async.dart';
export 'src/serial_future.dart';
export 'src/single_future.dart';
export 'src/single_state_processor.dart';

View File

@ -0,0 +1,57 @@
// Process a single future at a time per tag queued serially
//
// The closure function is called to produce the future that is to be executed.
// If a future with a particular tag is still executing, it is queued serially
// and executed when the previous tagged future completes.
// When a tagged serialFuture finishes executing, the onDone callback is called.
// If an unhandled exception happens in the closure future, the onError callback
// is called.
import 'dart:async';
import 'dart:collection';
import 'async_tag_lock.dart';
AsyncTagLock<Object> _keys = AsyncTagLock();
typedef SerialFutureQueueItem = Future<void> Function();
Map<Object, Queue<SerialFutureQueueItem>> _queues = {};
SerialFutureQueueItem _makeSerialFutureQueueItem<T>(
Future<T> Function() closure,
void Function(T)? onDone,
void Function(Object e, StackTrace? st)? onError) =>
() async {
try {
final out = await closure();
if (onDone != null) {
onDone(out);
}
// ignore: avoid_catches_without_on_clauses
} catch (e, sp) {
if (onError != null) {
onError(e, sp);
} else {
rethrow;
}
}
};
void serialFuture<T>(Object tag, Future<T> Function() closure,
{void Function(T)? onDone,
void Function(Object e, StackTrace? st)? onError}) {
final queueItem = _makeSerialFutureQueueItem(closure, onDone, onError);
if (!_keys.tryLock(tag)) {
final queue = _queues[tag];
queue!.add(queueItem);
return;
}
final queue = _queues[tag] = Queue.from([queueItem]);
unawaited(() async {
do {
final queueItem = queue.removeFirst();
await queueItem();
} while (queue.isNotEmpty);
_queues.remove(tag);
_keys.unlockTag(tag);
}());
}

View File

@ -1,25 +0,0 @@
import 'dart:async';
import 'async_tag_lock.dart';
AsyncTagLock<Object> _keys = AsyncTagLock();
void singleFuture<T>(Object tag, Future<T> Function() closure,
{void Function()? onBusy, void Function(T)? onDone}) {
if (!_keys.tryLock(tag)) {
if (onBusy != null) {
onBusy();
}
return;
}
unawaited(() async {
try {
final out = await closure();
if (onDone != null) {
onDone(out);
}
} finally {
_keys.unlockTag(tag);
}
}());
}

View File

@ -0,0 +1,42 @@
import 'dart:async';
import 'async_tag_lock.dart';
AsyncTagLock<Object> _keys = AsyncTagLock();
// Process a single future at a time per tag
//
// The closure function is called to produce the future that is to be executed.
// If a future with a particular tag is still executing, the onBusy callback
// is called.
// When a tagged singleFuture finishes executing, the onDone callback is called.
// If an unhandled exception happens in the closure future, the onError callback
// is called.
void singleFuture<T>(Object tag, Future<T> Function() closure,
{void Function()? onBusy,
void Function(T)? onDone,
void Function(Object e, StackTrace? st)? onError}) {
if (!_keys.tryLock(tag)) {
if (onBusy != null) {
onBusy();
}
return;
}
unawaited(() async {
try {
final out = await closure();
if (onDone != null) {
onDone(out);
}
// ignore: avoid_catches_without_on_clauses
} catch (e, sp) {
if (onError != null) {
onError(e, sp);
} else {
rethrow;
}
} finally {
_keys.unlockTag(tag);
}
}());
}

View File

@ -0,0 +1,46 @@
import 'dart:async';
import '../async_tools.dart';
// Process a single state update at a time ensuring the most
// recent state gets processed asynchronously, possibly skipping
// states that happen while a previous state is still being processed.
//
// Eventually this will always process the most recent state passed to
// updateState.
//
// This is useful for processing state changes asynchronously without waiting
// from a synchronous execution context
class SingleStateProcessor<State> {
SingleStateProcessor();
void updateState(State newInputState,
{required Future<void> Function(State) closure}) {
// Use a singlefuture here to ensure we get dont lose any updates
// If the input stream gives us an update while we are
// still processing the last update, the most recent input state will
// be saved and processed eventually.
singleFuture(this, () async {
var newState = newInputState;
var done = false;
while (!done) {
await closure(newState);
// See if there's another state change to process
final next = _nextState;
_nextState = null;
if (next != null) {
newState = next;
} else {
done = true;
}
}
}, onBusy: () {
// Keep this state until we process again
_nextState = newInputState;
});
}
State? _nextState;
}

View File

@ -2,7 +2,6 @@
library dht_support;
export 'src/dht_record.dart';
export 'src/dht_record_crypto.dart';
export 'src/dht_record_cubit.dart';
export 'src/dht_record_pool.dart';

View File

@ -1,12 +1,4 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:equatable/equatable.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';
import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart';
part of 'dht_record_pool.dart';
@immutable
class DHTRecordWatchChange extends Equatable {
@ -14,7 +6,7 @@ class DHTRecordWatchChange extends Equatable {
{required this.local, required this.data, required this.subkeys});
final bool local;
final Uint8List data;
final Uint8List? data;
final List<ValueSubkeyRange> subkeys;
@override
@ -26,46 +18,41 @@ class DHTRecordWatchChange extends Equatable {
class DHTRecord {
DHTRecord(
{required VeilidRoutingContext routingContext,
required DHTRecordDescriptor recordDescriptor,
int defaultSubkey = 0,
KeyPair? writer,
DHTRecordCrypto crypto = const DHTRecordCryptoPublic()})
required SharedDHTRecordData sharedDHTRecordData,
required int defaultSubkey,
required KeyPair? writer,
required DHTRecordCrypto crypto})
: _crypto = crypto,
_routingContext = routingContext,
_recordDescriptor = recordDescriptor,
_defaultSubkey = defaultSubkey,
_writer = writer,
_open = true,
_valid = true,
_subkeySeqCache = {},
needsWatchStateUpdate = false,
inWatchStateUpdate = false;
_sharedDHTRecordData = sharedDHTRecordData;
final SharedDHTRecordData _sharedDHTRecordData;
final VeilidRoutingContext _routingContext;
final DHTRecordDescriptor _recordDescriptor;
final int _defaultSubkey;
final KeyPair? _writer;
final Map<int, int> _subkeySeqCache;
final DHTRecordCrypto _crypto;
bool _open;
bool _valid;
@internal
StreamController<DHTRecordWatchChange>? watchController;
@internal
bool needsWatchStateUpdate;
@internal
bool inWatchStateUpdate;
@internal
WatchState? watchState;
int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey;
VeilidRoutingContext get routingContext => _routingContext;
TypedKey get key => _recordDescriptor.key;
PublicKey get owner => _recordDescriptor.owner;
KeyPair? get ownerKeyPair => _recordDescriptor.ownerKeyPair();
DHTSchema get schema => _recordDescriptor.schema;
int get subkeyCount => _recordDescriptor.schema.subkeyCount();
TypedKey get key => _sharedDHTRecordData.recordDescriptor.key;
PublicKey get owner => _sharedDHTRecordData.recordDescriptor.owner;
KeyPair? get ownerKeyPair =>
_sharedDHTRecordData.recordDescriptor.ownerKeyPair();
DHTSchema get schema => _sharedDHTRecordData.recordDescriptor.schema;
int get subkeyCount =>
_sharedDHTRecordData.recordDescriptor.schema.subkeyCount();
KeyPair? get writer => _writer;
DHTRecordCrypto get crypto => _crypto;
OwnedDHTRecordPointer get ownedDHTRecordPointer =>
@ -79,22 +66,16 @@ class DHTRecord {
return;
}
await watchController?.close();
await _routingContext.closeDHTRecord(_recordDescriptor.key);
DHTRecordPool.instance.recordClosed(_recordDescriptor.key);
await DHTRecordPool.instance._recordClosed(this);
_open = false;
}
Future<void> delete() async {
if (!_valid) {
throw StateError('already deleted');
}
if (_open) {
await close();
}
await DHTRecordPool.instance.deleteDeep(key);
void _markDeleted() {
_valid = false;
}
Future<void> delete() => DHTRecordPool.instance.delete(key);
Future<T> scope<T>(Future<T> Function(DHTRecord) scopeFunction) async {
try {
return await scopeFunction(this);
@ -134,17 +115,17 @@ class DHTRecord {
bool forceRefresh = false,
bool onlyUpdates = false}) async {
subkey = subkeyOrDefault(subkey);
final valueData = await _routingContext.getDHTValue(
_recordDescriptor.key, subkey, forceRefresh);
final valueData = await _routingContext.getDHTValue(key, subkey,
forceRefresh: forceRefresh);
if (valueData == null) {
return null;
}
final lastSeq = _subkeySeqCache[subkey];
final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey];
if (onlyUpdates && lastSeq != null && valueData.seq <= lastSeq) {
return null;
}
final out = _crypto.decrypt(valueData.data, subkey);
_subkeySeqCache[subkey] = valueData.seq;
_sharedDHTRecordData.subkeySeqCache[subkey] = valueData.seq;
return out;
}
@ -176,17 +157,16 @@ class DHTRecord {
Future<Uint8List?> tryWriteBytes(Uint8List newValue,
{int subkey = -1}) async {
subkey = subkeyOrDefault(subkey);
final lastSeq = _subkeySeqCache[subkey];
final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey];
final encryptedNewValue = await _crypto.encrypt(newValue, subkey);
// Set the new data if possible
var newValueData = await _routingContext.setDHTValue(
_recordDescriptor.key, subkey, encryptedNewValue);
var newValueData =
await _routingContext.setDHTValue(key, subkey, encryptedNewValue);
if (newValueData == null) {
// A newer value wasn't found on the set, but
// we may get a newer value when getting the value for the sequence number
newValueData = await _routingContext.getDHTValue(
_recordDescriptor.key, subkey, false);
newValueData = await _routingContext.getDHTValue(key, subkey);
if (newValueData == null) {
assert(newValueData != null, "can't get value that was just set");
return null;
@ -195,13 +175,13 @@ class DHTRecord {
// Record new sequence number
final isUpdated = newValueData.seq != lastSeq;
_subkeySeqCache[subkey] = newValueData.seq;
_sharedDHTRecordData.subkeySeqCache[subkey] = newValueData.seq;
// See if the encrypted data returned is exactly the same
// if so, shortcut and don't bother decrypting it
if (newValueData.data.equals(encryptedNewValue)) {
if (isUpdated) {
addLocalValueChange(newValue, subkey);
_addLocalValueChange(newValue, subkey);
}
return null;
}
@ -209,36 +189,35 @@ class DHTRecord {
// Decrypt value to return it
final decryptedNewValue = await _crypto.decrypt(newValueData.data, subkey);
if (isUpdated) {
addLocalValueChange(decryptedNewValue, subkey);
_addLocalValueChange(decryptedNewValue, subkey);
}
return decryptedNewValue;
}
Future<void> eventualWriteBytes(Uint8List newValue, {int subkey = -1}) async {
subkey = subkeyOrDefault(subkey);
final lastSeq = _subkeySeqCache[subkey];
final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey];
final encryptedNewValue = await _crypto.encrypt(newValue, subkey);
ValueData? newValueData;
do {
do {
// Set the new data
newValueData = await _routingContext.setDHTValue(
_recordDescriptor.key, subkey, encryptedNewValue);
newValueData =
await _routingContext.setDHTValue(key, subkey, encryptedNewValue);
// Repeat if newer data on the network was found
} while (newValueData != null);
// Get the data to check its sequence number
newValueData = await _routingContext.getDHTValue(
_recordDescriptor.key, subkey, false);
newValueData = await _routingContext.getDHTValue(key, subkey);
if (newValueData == null) {
assert(newValueData != null, "can't get value that was just set");
return;
}
// Record new sequence number
_subkeySeqCache[subkey] = newValueData.seq;
_sharedDHTRecordData.subkeySeqCache[subkey] = newValueData.seq;
// The encrypted data returned should be exactly the same
// as what we are trying to set,
@ -247,7 +226,7 @@ class DHTRecord {
final isUpdated = newValueData.seq != lastSeq;
if (isUpdated) {
addLocalValueChange(newValue, subkey);
_addLocalValueChange(newValue, subkey);
}
}
@ -258,8 +237,7 @@ class DHTRecord {
// Get the existing data, do not allow force refresh here
// because if we need a refresh the setDHTValue will fail anyway
var oldValue =
await get(subkey: subkey, forceRefresh: false, onlyUpdates: false);
var oldValue = await get(subkey: subkey);
do {
// Update the data
@ -314,16 +292,16 @@ class DHTRecord {
int? count}) async {
// Set up watch requirements which will get picked up by the next tick
final oldWatchState = watchState;
watchState = WatchState(
subkeys: subkeys?.lock, expiration: expiration, count: count);
watchState =
WatchState(subkeys: subkeys, expiration: expiration, count: count);
if (oldWatchState != watchState) {
needsWatchStateUpdate = true;
_sharedDHTRecordData.needsWatchStateUpdate = true;
}
}
Future<StreamSubscription<DHTRecordWatchChange>> listen(
Future<void> Function(
DHTRecord record, Uint8List data, List<ValueSubkeyRange> subkeys)
DHTRecord record, Uint8List? data, List<ValueSubkeyRange> subkeys)
onUpdate,
{bool localChanges = true}) async {
// Set up watch requirements
@ -339,14 +317,16 @@ class DHTRecord {
return;
}
Future.delayed(Duration.zero, () async {
final Uint8List data;
final Uint8List? data;
if (change.local) {
// local changes are not encrypted
data = change.data;
} else {
// incoming/remote changes are encrypted
data =
await _crypto.decrypt(change.data, change.subkeys.first.low);
final changeData = change.data;
data = changeData == null
? null
: await _crypto.decrypt(changeData, change.subkeys.first.low);
}
await onUpdate(this, data, change.subkeys);
});
@ -362,17 +342,48 @@ class DHTRecord {
// Tear down watch requirements
if (watchState != null) {
watchState = null;
needsWatchStateUpdate = true;
_sharedDHTRecordData.needsWatchStateUpdate = true;
}
}
void addLocalValueChange(Uint8List data, int subkey) {
void _addValueChange(
{required bool local,
required Uint8List data,
required List<ValueSubkeyRange> subkeys}) {
final ws = watchState;
if (ws != null) {
final watchedSubkeys = ws.subkeys;
if (watchedSubkeys == null) {
// Report all subkeys
watchController?.add(
DHTRecordWatchChange(local: false, data: data, subkeys: subkeys));
} else {
// Only some subkeys are being watched, see if the reported update
// overlaps the subkeys being watched
final overlappedSubkeys = watchedSubkeys.intersectSubkeys(subkeys);
// If the reported data isn't within the
// range we care about, don't pass it through
final overlappedFirstSubkey = overlappedSubkeys.firstSubkey;
final updateFirstSubkey = subkeys.firstSubkey;
final updatedData = (overlappedFirstSubkey != null &&
updateFirstSubkey != null &&
overlappedFirstSubkey == updateFirstSubkey)
? data
: null;
// Report only wathced subkeys
watchController?.add(DHTRecordWatchChange(
local: true, data: data, subkeys: [ValueSubkeyRange.single(subkey)]));
local: local, data: updatedData, subkeys: overlappedSubkeys));
}
}
}
void _addLocalValueChange(Uint8List data, int subkey) {
_addValueChange(
local: true, data: data, subkeys: [ValueSubkeyRange.single(subkey)]);
}
void addRemoteValueChange(VeilidUpdateValueChange update) {
watchController?.add(DHTRecordWatchChange(
local: false, data: update.valueData.data, subkeys: update.subkeys));
_addValueChange(
local: false, data: update.valueData.data, subkeys: update.subkeys);
}
}

View File

@ -3,8 +3,8 @@ import 'dart:typed_data';
import '../../../../veilid_support.dart';
abstract class DHTRecordCrypto {
FutureOr<Uint8List> encrypt(Uint8List data, int subkey);
FutureOr<Uint8List> decrypt(Uint8List data, int subkey);
Future<Uint8List> encrypt(Uint8List data, int subkey);
Future<Uint8List> decrypt(Uint8List data, int subkey);
}
////////////////////////////////////
@ -32,11 +32,11 @@ class DHTRecordCryptoPrivate implements DHTRecordCrypto {
}
@override
FutureOr<Uint8List> encrypt(Uint8List data, int subkey) =>
Future<Uint8List> encrypt(Uint8List data, int subkey) =>
_cryptoSystem.encryptNoAuthWithNonce(data, _secretKey);
@override
FutureOr<Uint8List> decrypt(Uint8List data, int subkey) =>
Future<Uint8List> decrypt(Uint8List data, int subkey) =>
_cryptoSystem.decryptNoAuthWithNonce(data, _secretKey);
}
@ -46,8 +46,8 @@ class DHTRecordCryptoPublic implements DHTRecordCrypto {
const DHTRecordCryptoPublic();
@override
FutureOr<Uint8List> encrypt(Uint8List data, int subkey) => data;
Future<Uint8List> encrypt(Uint8List data, int subkey) async => data;
@override
FutureOr<Uint8List> decrypt(Uint8List data, int subkey) => data;
Future<Uint8List> decrypt(Uint8List data, int subkey) async => data;
}

View File

@ -8,7 +8,7 @@ import '../../veilid_support.dart';
typedef InitialStateFunction<T> = Future<T?> Function(DHTRecord);
typedef StateFunction<T> = Future<T?> Function(
DHTRecord, List<ValueSubkeyRange>, Uint8List);
DHTRecord, List<ValueSubkeyRange>, Uint8List?);
class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
DHTRecordCubit({
@ -28,9 +28,8 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
DHTRecordCubit.value({
required DHTRecord record,
required Future<T?> Function(DHTRecord) initialStateFunction,
required Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, Uint8List)
stateFunction,
required InitialStateFunction<T> initialStateFunction,
required StateFunction<T> stateFunction,
}) : _record = record,
_stateFunction = stateFunction,
_wantsCloseRecord = false,
@ -41,9 +40,8 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
}
Future<void> _init(
Future<T?> Function(DHTRecord) initialStateFunction,
Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, Uint8List)
stateFunction,
InitialStateFunction<T> initialStateFunction,
StateFunction<T> stateFunction,
) async {
// Make initial state update
try {
@ -142,7 +140,7 @@ class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
if (subkeys.containsSubkey(defaultSubkey)) {
final Uint8List data;
final firstSubkey = subkeys.firstOrNull!.low;
if (firstSubkey != defaultSubkey) {
if (firstSubkey != defaultSubkey || updatedata == null) {
final maybeData = await record.get(forceRefresh: true);
if (maybeData == null) {
return null;

View File

@ -1,15 +1,21 @@
import 'dart:async';
import 'dart:math';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:equatable/equatable.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:freezed_annotation/freezed_annotation.dart';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart';
import '../../../../veilid_support.dart';
part 'dht_record_pool.freezed.dart';
part 'dht_record_pool.g.dart';
part 'dht_record.dart';
/// Record pool that managed DHTRecords and allows for tagged deletion
@freezed
class DHTRecordPoolAllocations with _$DHTRecordPoolAllocations {
@ -39,13 +45,14 @@ class OwnedDHTRecordPointer with _$OwnedDHTRecordPointer {
}
/// Watch state
@immutable
class WatchState extends Equatable {
const WatchState(
{required this.subkeys,
required this.expiration,
required this.count,
this.realExpiration});
final IList<ValueSubkeyRange>? subkeys;
final List<ValueSubkeyRange>? subkeys;
final Timestamp? expiration;
final int? count;
final Timestamp? realExpiration;
@ -54,23 +61,51 @@ class WatchState extends Equatable {
List<Object?> get props => [subkeys, expiration, count, realExpiration];
}
/// Data shared amongst all DHTRecord instances
class SharedDHTRecordData {
SharedDHTRecordData(
{required this.recordDescriptor,
required this.defaultWriter,
required this.defaultRoutingContext});
DHTRecordDescriptor recordDescriptor;
KeyPair? defaultWriter;
VeilidRoutingContext defaultRoutingContext;
Map<int, int> subkeySeqCache = {};
bool inWatchStateUpdate = false;
bool needsWatchStateUpdate = false;
}
// Per opened record data
class OpenedRecordInfo {
OpenedRecordInfo(
{required DHTRecordDescriptor recordDescriptor,
required KeyPair? defaultWriter,
required VeilidRoutingContext defaultRoutingContext})
: shared = SharedDHTRecordData(
recordDescriptor: recordDescriptor,
defaultWriter: defaultWriter,
defaultRoutingContext: defaultRoutingContext);
SharedDHTRecordData shared;
Set<DHTRecord> records = {};
}
class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
DHTRecordPool._(Veilid veilid, VeilidRoutingContext routingContext)
: _state = DHTRecordPoolAllocations(
childrenByParent: IMap(),
parentByChild: IMap(),
rootRecords: ISet()),
_opened = <TypedKey, DHTRecord>{},
_locks = AsyncTagLock(),
_mutex = Mutex(),
_opened = <TypedKey, OpenedRecordInfo>{},
_routingContext = routingContext,
_veilid = veilid;
// Persistent DHT record list
DHTRecordPoolAllocations _state;
// Lock table to ensure we don't open the same record more than once
final AsyncTagLock<TypedKey> _locks;
// Create/open Mutex
final Mutex _mutex;
// Which DHT records are currently open
final Map<TypedKey, DHTRecord> _opened;
final Map<TypedKey, OpenedRecordInfo> _opened;
// Default routing context to use for new keys
final VeilidRoutingContext _routingContext;
// Convenience accessor
@ -107,30 +142,106 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
Veilid get veilid => _veilid;
void _recordOpened(DHTRecord record) {
if (_opened.containsKey(record.key)) {
throw StateError('record already opened');
Future<OpenedRecordInfo> _recordCreateInner(
{required VeilidRoutingContext dhtctx,
required DHTSchema schema,
KeyPair? writer,
TypedKey? parent}) async {
assert(_mutex.isLocked, 'should be locked here');
// Create the record
final recordDescriptor = await dhtctx.createDHTRecord(schema);
// Reopen if a writer is specified to ensure
// we switch the default writer
if (writer != null) {
await dhtctx.openDHTRecord(recordDescriptor.key, writer: writer);
}
_opened[record.key] = record;
final openedRecordInfo = OpenedRecordInfo(
recordDescriptor: recordDescriptor,
defaultWriter: writer ?? recordDescriptor.ownerKeyPair(),
defaultRoutingContext: dhtctx);
_opened[recordDescriptor.key] = openedRecordInfo;
// Register the dependency
await _addDependencyInner(parent, recordDescriptor.key);
return openedRecordInfo;
}
void recordClosed(TypedKey key) {
final rec = _opened.remove(key);
if (rec == null) {
Future<OpenedRecordInfo> _recordOpenInner(
{required VeilidRoutingContext dhtctx,
required TypedKey recordKey,
KeyPair? writer,
TypedKey? parent}) async {
assert(_mutex.isLocked, 'should be locked here');
// If we are opening a key that already exists
// make sure we are using the same parent if one was specified
_validateParent(parent, recordKey);
// See if this has been opened yet
final openedRecordInfo = _opened[recordKey];
if (openedRecordInfo == null) {
// Fresh open, just open the record
final recordDescriptor =
await dhtctx.openDHTRecord(recordKey, writer: writer);
final newOpenedRecordInfo = OpenedRecordInfo(
recordDescriptor: recordDescriptor,
defaultWriter: writer,
defaultRoutingContext: dhtctx);
_opened[recordDescriptor.key] = newOpenedRecordInfo;
// Register the dependency
await _addDependencyInner(parent, recordKey);
return newOpenedRecordInfo;
}
// Already opened
// See if we need to reopen the record with a default writer and possibly
// a different routing context
if (writer != null && openedRecordInfo.shared.defaultWriter == null) {
final newRecordDescriptor =
await dhtctx.openDHTRecord(recordKey, writer: writer);
openedRecordInfo.shared.defaultWriter = writer;
openedRecordInfo.shared.defaultRoutingContext = dhtctx;
if (openedRecordInfo.shared.recordDescriptor.ownerSecret == null) {
openedRecordInfo.shared.recordDescriptor = newRecordDescriptor;
}
}
// Register the dependency
await _addDependencyInner(parent, recordKey);
return openedRecordInfo;
}
Future<void> _recordClosed(DHTRecord record) async {
await _mutex.protect(() async {
final key = record.key;
final openedRecordInfo = _opened[key];
if (openedRecordInfo == null ||
!openedRecordInfo.records.remove(record)) {
throw StateError('record already closed');
}
_locks.unlockTag(key);
if (openedRecordInfo.records.isEmpty) {
await _routingContext.closeDHTRecord(key);
_opened.remove(key);
}
});
}
Future<void> deleteDeep(TypedKey parent) async {
// Collect all dependencies
Future<void> delete(TypedKey recordKey) async {
// Collect all dependencies (including the record itself)
final allDeps = <TypedKey>[];
final currentDeps = [parent];
final currentDeps = [recordKey];
while (currentDeps.isNotEmpty) {
final nextDep = currentDeps.removeLast();
// Remove this child from its parent
await _removeDependency(nextDep);
await _removeDependencyInner(nextDep);
allDeps.add(nextDep);
final childDeps =
@ -138,18 +249,27 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
currentDeps.addAll(childDeps);
}
// Delete all dependent records in parallel
final allFutures = <Future<void>>[];
// Delete all dependent records in parallel (including the record itself)
final allDeleteFutures = <Future<void>>[];
final allCloseFutures = <Future<void>>[];
final allDeletedRecords = <DHTRecord>{};
for (final dep in allDeps) {
// If record is opened, close it first
final rec = _opened[dep];
if (rec != null) {
await rec.close();
final openinfo = _opened[dep];
if (openinfo != null) {
for (final rec in openinfo.records) {
allCloseFutures.add(rec.close());
allDeletedRecords.add(rec);
}
}
// Then delete
allFutures.add(_routingContext.deleteDHTRecord(dep));
allDeleteFutures.add(_routingContext.deleteDHTRecord(dep));
}
await Future.wait(allCloseFutures);
await Future.wait(allDeleteFutures);
for (final deletedRecord in allDeletedRecords) {
deletedRecord._markDeleted();
}
await Future.wait(allFutures);
}
void _validateParent(TypedKey? parent, TypedKey child) {
@ -169,7 +289,8 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
}
}
Future<void> _addDependency(TypedKey? parent, TypedKey child) async {
Future<void> _addDependencyInner(TypedKey? parent, TypedKey child) async {
assert(_mutex.isLocked, 'should be locked here');
if (parent == null) {
if (_state.rootRecords.contains(child)) {
// Dependency already added
@ -191,7 +312,8 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
}
}
Future<void> _removeDependency(TypedKey child) async {
Future<void> _removeDependencyInner(TypedKey child) async {
assert(_mutex.isLocked, 'should be locked here');
if (_state.rootRecords.contains(child)) {
_state = await store(
_state.copyWith(rootRecords: _state.rootRecords.remove(child)));
@ -226,57 +348,52 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
int defaultSubkey = 0,
DHTRecordCrypto? crypto,
KeyPair? writer,
}) async {
}) async =>
_mutex.protect(() async {
final dhtctx = routingContext ?? _routingContext;
final recordDescriptor = await dhtctx.createDHTRecord(schema);
await _locks.lockTag(recordDescriptor.key);
final openedRecordInfo = await _recordCreateInner(
dhtctx: dhtctx, schema: schema, writer: writer, parent: parent);
final rec = DHTRecord(
routingContext: dhtctx,
recordDescriptor: recordDescriptor,
defaultSubkey: defaultSubkey,
writer: writer ?? recordDescriptor.ownerKeyPair(),
sharedDHTRecordData: openedRecordInfo.shared,
writer: writer ??
openedRecordInfo.shared.recordDescriptor.ownerKeyPair(),
crypto: crypto ??
await DHTRecordCryptoPrivate.fromTypedKeyPair(
recordDescriptor.ownerTypedKeyPair()!));
await DHTRecordCryptoPrivate.fromTypedKeyPair(openedRecordInfo
.shared.recordDescriptor
.ownerTypedKeyPair()!));
await _addDependency(parent, rec.key);
_recordOpened(rec);
openedRecordInfo.records.add(rec);
return rec;
}
});
/// Open a DHTRecord readonly
Future<DHTRecord> openRead(TypedKey recordKey,
{VeilidRoutingContext? routingContext,
TypedKey? parent,
int defaultSubkey = 0,
DHTRecordCrypto? crypto}) async {
await _locks.lockTag(recordKey);
DHTRecordCrypto? crypto}) async =>
_mutex.protect(() async {
final dhtctx = routingContext ?? _routingContext;
late final DHTRecord rec;
// If we are opening a key that already exists
// make sure we are using the same parent if one was specified
_validateParent(parent, recordKey);
final openedRecordInfo = await _recordOpenInner(
dhtctx: dhtctx, recordKey: recordKey, parent: parent);
// Open from the veilid api
final recordDescriptor = await dhtctx.openDHTRecord(recordKey, null);
rec = DHTRecord(
final rec = DHTRecord(
routingContext: dhtctx,
recordDescriptor: recordDescriptor,
defaultSubkey: defaultSubkey,
sharedDHTRecordData: openedRecordInfo.shared,
writer: null,
crypto: crypto ?? const DHTRecordCryptoPublic());
// Register the dependency
await _addDependency(parent, rec.key);
_recordOpened(rec);
openedRecordInfo.records.add(rec);
return rec;
}
});
/// Open a DHTRecord writable
Future<DHTRecord> openWrite(
@ -286,33 +403,29 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
TypedKey? parent,
int defaultSubkey = 0,
DHTRecordCrypto? crypto,
}) async {
await _locks.lockTag(recordKey);
}) async =>
_mutex.protect(() async {
final dhtctx = routingContext ?? _routingContext;
late final DHTRecord rec;
// If we are opening a key that already exists
// make sure we are using the same parent if one was specified
_validateParent(parent, recordKey);
final openedRecordInfo = await _recordOpenInner(
dhtctx: dhtctx,
recordKey: recordKey,
parent: parent,
writer: writer);
// Open from the veilid api
final recordDescriptor = await dhtctx.openDHTRecord(recordKey, writer);
rec = DHTRecord(
final rec = DHTRecord(
routingContext: dhtctx,
recordDescriptor: recordDescriptor,
defaultSubkey: defaultSubkey,
writer: writer,
sharedDHTRecordData: openedRecordInfo.shared,
crypto: crypto ??
await DHTRecordCryptoPrivate.fromTypedKeyPair(
TypedKeyPair.fromKeyPair(recordKey.kind, writer)));
// Register the dependency if specified
await _addDependency(parent, rec.key);
_recordOpened(rec);
openedRecordInfo.records.add(rec);
return rec;
}
});
/// Open a DHTRecord owned
/// This is the same as writable but uses an OwnedDHTRecordPointer
@ -336,9 +449,6 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
crypto: crypto,
);
/// Look up an opened DHTRecord
DHTRecord? getOpenedRecord(TypedKey recordKey) => _opened[recordKey];
/// Get the parent of a DHTRecord key if it exists
TypedKey? getParentRecordKey(TypedKey child) {
final childJson = child.toJson();
@ -351,33 +461,107 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
// Change
for (final kv in _opened.entries) {
if (kv.key == updateValueChange.key) {
kv.value.addRemoteValueChange(updateValueChange);
for (final rec in kv.value.records) {
rec.addRemoteValueChange(updateValueChange);
}
break;
}
}
} else {
final now = Veilid.instance.now().value;
// Expired, process renewal if desired
for (final kv in _opened.entries) {
if (kv.key == updateValueChange.key) {
// Renew watch state
kv.value.needsWatchStateUpdate = true;
for (final entry in _opened.entries) {
final openedKey = entry.key;
final openedRecordInfo = entry.value;
if (openedKey == updateValueChange.key) {
// Renew watch state for each opened recrod
for (final rec in openedRecordInfo.records) {
// See if the watch had an expiration and if it has expired
// otherwise the renewal will keep the same parameters
final watchState = kv.value.watchState;
final watchState = rec.watchState;
if (watchState != null) {
final exp = watchState.expiration;
if (exp != null && exp.value < Veilid.instance.now().value) {
if (exp != null && exp.value < now) {
// Has expiration, and it has expired, clear watch state
kv.value.watchState = null;
rec.watchState = null;
}
}
}
openedRecordInfo.shared.needsWatchStateUpdate = true;
break;
}
}
}
}
WatchState? _collectUnionWatchState(Iterable<DHTRecord> records) {
// Collect union of opened record watch states
int? totalCount;
Timestamp? maxExpiration;
List<ValueSubkeyRange>? allSubkeys;
var noExpiration = false;
var everySubkey = false;
var cancelWatch = true;
for (final rec in records) {
final ws = rec.watchState;
if (ws != null) {
cancelWatch = false;
final wsCount = ws.count;
if (wsCount != null) {
totalCount = totalCount ?? 0 + min(wsCount, 0x7FFFFFFF);
totalCount = min(totalCount, 0x7FFFFFFF);
}
final wsExp = ws.expiration;
if (wsExp != null && !noExpiration) {
maxExpiration = maxExpiration == null
? wsExp
: wsExp.value > maxExpiration.value
? wsExp
: maxExpiration;
} else {
noExpiration = true;
}
final wsSubkeys = ws.subkeys;
if (wsSubkeys != null && !everySubkey) {
allSubkeys = allSubkeys == null
? wsSubkeys
: allSubkeys.unionSubkeys(wsSubkeys);
} else {
everySubkey = true;
}
}
}
if (noExpiration) {
maxExpiration = null;
}
if (everySubkey) {
allSubkeys = null;
}
if (cancelWatch) {
return null;
}
return WatchState(
subkeys: allSubkeys, expiration: maxExpiration, count: totalCount);
}
void _updateWatchExpirations(
Iterable<DHTRecord> records, Timestamp realExpiration) {
for (final rec in records) {
final ws = rec.watchState;
if (ws != null) {
rec.watchState = WatchState(
subkeys: ws.subkeys,
expiration: ws.expiration,
count: ws.count,
realExpiration: realExpiration);
}
}
}
/// Ticker to check watch state change requests
Future<void> tick() async {
if (inTick) {
@ -386,53 +570,55 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
inTick = true;
try {
// See if any opened records need watch state changes
final unord = List<Future<void>>.empty(growable: true);
final unord = <Future<void>>[];
for (final kv in _opened.entries) {
final openedRecordKey = kv.key;
final openedRecordInfo = kv.value;
final dhtctx = openedRecordInfo.shared.defaultRoutingContext;
// Check if already updating
if (kv.value.inWatchStateUpdate) {
if (openedRecordInfo.shared.inWatchStateUpdate) {
continue;
}
if (kv.value.needsWatchStateUpdate) {
kv.value.inWatchStateUpdate = true;
if (openedRecordInfo.shared.needsWatchStateUpdate) {
openedRecordInfo.shared.inWatchStateUpdate = true;
final ws = kv.value.watchState;
if (ws == null) {
final watchState = _collectUnionWatchState(openedRecordInfo.records);
// Apply watch changes for record
if (watchState == null) {
unord.add(() async {
// Record needs watch cancel
try {
final done =
await kv.value.routingContext.cancelDHTWatch(kv.key);
final done = await dhtctx.cancelDHTWatch(openedRecordKey);
assert(done,
'should always be done when cancelling whole subkey range');
kv.value.needsWatchStateUpdate = false;
openedRecordInfo.shared.needsWatchStateUpdate = false;
} on VeilidAPIException {
// Failed to cancel DHT watch, try again next tick
}
kv.value.inWatchStateUpdate = false;
openedRecordInfo.shared.inWatchStateUpdate = false;
}());
} else {
unord.add(() async {
// Record needs new watch
try {
final realExpiration = await kv.value.routingContext
.watchDHTValues(kv.key,
subkeys: ws.subkeys?.toList(),
count: ws.count,
expiration: ws.expiration);
kv.value.needsWatchStateUpdate = false;
final realExpiration = await dhtctx.watchDHTValues(
openedRecordKey,
subkeys: watchState.subkeys?.toList(),
count: watchState.count,
expiration: watchState.expiration);
openedRecordInfo.shared.needsWatchStateUpdate = false;
// Update watch state with real expiration
kv.value.watchState = WatchState(
subkeys: ws.subkeys,
expiration: ws.expiration,
count: ws.count,
realExpiration: realExpiration);
// Update watch states with real expiration
_updateWatchExpirations(
openedRecordInfo.records, realExpiration);
} on VeilidAPIException {
// Failed to cancel DHT watch, try again next tick
}
kv.value.inWatchStateUpdate = false;
openedRecordInfo.shared.inWatchStateUpdate = false;
}());
}
}

View File

@ -86,16 +86,12 @@ class DHTShortArray {
final schema = DHTSchema.smpl(
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: stride + 1)]);
final dhtCreateRecord = await pool.create(
dhtRecord = await pool.create(
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto,
writer: smplWriter);
// Reopen with SMPL writer
await dhtCreateRecord.close();
dhtRecord = await pool.openWrite(dhtCreateRecord.key, smplWriter,
parent: parent, routingContext: routingContext, crypto: crypto);
} else {
final schema = DHTSchema.dflt(oCnt: stride + 1);
dhtRecord = await pool.create(