progress on dht_log

This commit is contained in:
Christien Rioux 2024-05-13 10:03:25 -04:00
parent ab4f05a347
commit c4d25fecb0
36 changed files with 1754 additions and 502 deletions

View File

@ -227,7 +227,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
} }
Future<void> _reconcileMessagesInner( Future<void> _reconcileMessagesInner(
{required DHTShortArrayWrite reconciledMessagesWriter, {required DHTRandomReadWrite reconciledMessagesWriter,
required IList<proto.Message> messages}) async { required IList<proto.Message> messages}) async {
// Ensure remoteMessages is sorted by timestamp // Ensure remoteMessages is sorted by timestamp
final newMessages = messages final newMessages = messages
@ -236,7 +236,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// Existing messages will always be sorted by timestamp so merging is easy // Existing messages will always be sorted by timestamp so merging is easy
final existingMessages = await reconciledMessagesWriter final existingMessages = await reconciledMessagesWriter
.getAllItemsProtobuf(proto.Message.fromBuffer); .getItemRangeProtobuf(proto.Message.fromBuffer, 0);
if (existingMessages == null) { if (existingMessages == null) {
throw Exception( throw Exception(
'Could not load existing reconciled messages at this time'); 'Could not load existing reconciled messages at this time');

View File

@ -92,31 +92,29 @@ class ChatListCubit extends DHTShortArrayCubit<proto.Chat>
// Remove Chat from account's list // Remove Chat from account's list
// if this fails, don't keep retrying, user can try again later // if this fails, don't keep retrying, user can try again later
final (deletedItem, success) = final deletedItem =
// Ensure followers get their changes before we return // Ensure followers get their changes before we return
await syncFollowers(() => operateWrite((writer) async { await syncFollowers(() => operateWrite((writer) async {
if (activeChatCubit.state == remoteConversationRecordKey) { if (activeChatCubit.state == remoteConversationRecordKey) {
activeChatCubit.setActiveChat(null); activeChatCubit.setActiveChat(null);
} }
for (var i = 0; i < writer.length; i++) { for (var i = 0; i < writer.length; i++) {
final cbuf = await writer.getItem(i); final c =
if (cbuf == null) { await writer.getItemProtobuf(proto.Chat.fromBuffer, i);
if (c == null) {
throw Exception('Failed to get chat'); throw Exception('Failed to get chat');
} }
final c = proto.Chat.fromBuffer(cbuf);
if (c.remoteConversationRecordKey == remoteConversationKey) { if (c.remoteConversationRecordKey == remoteConversationKey) {
// Found the right chat // Found the right chat
if (await writer.tryRemoveItem(i) != null) { await writer.removeItem(i);
return c; return c;
}
return null;
} }
} }
return null; return null;
})); }));
// Since followers are synced, we can safetly remove the reconciled // Since followers are synced, we can safetly remove the reconciled
// chat record now // chat record now
if (success && deletedItem != null) { if (deletedItem != null) {
try { try {
await DHTRecordPool.instance.deleteRecord( await DHTRecordPool.instance.deleteRecord(
deletedItem.reconciledChatRecord.toVeilid().recordKey); deletedItem.reconciledChatRecord.toVeilid().recordKey);

View File

@ -177,7 +177,7 @@ class ContactInvitationListCubit
_activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey; _activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey;
// Remove ContactInvitationRecord from account's list // Remove ContactInvitationRecord from account's list
final (deletedItem, success) = await operateWrite((writer) async { final deletedItem = await operateWrite((writer) async {
for (var i = 0; i < writer.length; i++) { for (var i = 0; i < writer.length; i++) {
final item = await writer.getItemProtobuf( final item = await writer.getItemProtobuf(
proto.ContactInvitationRecord.fromBuffer, i); proto.ContactInvitationRecord.fromBuffer, i);
@ -186,16 +186,14 @@ class ContactInvitationListCubit
} }
if (item.contactRequestInbox.recordKey.toVeilid() == if (item.contactRequestInbox.recordKey.toVeilid() ==
contactRequestInboxRecordKey) { contactRequestInboxRecordKey) {
if (await writer.tryRemoveItem(i) != null) { await writer.removeItem(i);
return item; return item;
}
return null;
} }
} }
return null; return null;
}); });
if (success && deletedItem != null) { if (deletedItem != null) {
// Delete the contact request inbox // Delete the contact request inbox
final contactRequestInbox = deletedItem.contactRequestInbox.toVeilid(); final contactRequestInbox = deletedItem.contactRequestInbox.toVeilid();
await (await pool.openRecordOwned(contactRequestInbox, await (await pool.openRecordOwned(contactRequestInbox,

View File

@ -70,7 +70,7 @@ class ContactListCubit extends DHTShortArrayCubit<proto.Contact> {
contact.remoteConversationRecordKey.toVeilid(); contact.remoteConversationRecordKey.toVeilid();
// Remove Contact from account's list // Remove Contact from account's list
final (deletedItem, success) = await operateWrite((writer) async { final deletedItem = await operateWrite((writer) async {
for (var i = 0; i < writer.length; i++) { for (var i = 0; i < writer.length; i++) {
final item = await writer.getItemProtobuf(proto.Contact.fromBuffer, i); final item = await writer.getItemProtobuf(proto.Contact.fromBuffer, i);
if (item == null) { if (item == null) {
@ -78,16 +78,14 @@ class ContactListCubit extends DHTShortArrayCubit<proto.Contact> {
} }
if (item.remoteConversationRecordKey == if (item.remoteConversationRecordKey ==
contact.remoteConversationRecordKey) { contact.remoteConversationRecordKey) {
if (await writer.tryRemoveItem(i) != null) { await writer.removeItem(i);
return item; return item;
}
return null;
} }
} }
return null; return null;
}); });
if (success && deletedItem != null) { if (deletedItem != null) {
try { try {
// Make a conversation cubit to manipulate the conversation // Make a conversation cubit to manipulate the conversation
final conversationCubit = ConversationCubit( final conversationCubit = ConversationCubit(

View File

@ -295,7 +295,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
debugName: 'ConversationCubit::initLocalMessages::LocalMessages', debugName: 'ConversationCubit::initLocalMessages::LocalMessages',
parent: localConversationKey, parent: localConversationKey,
crypto: crypto, crypto: crypto,
smplWriter: writer)) writer: writer))
.deleteScope((messages) async => await callback(messages)); .deleteScope((messages) async => await callback(messages));
} }

View File

@ -0,0 +1,10 @@
targets:
$default:
sources:
exclude:
- example/**
builders:
json_serializable:
options:
explicit_to_json: true
field_rename: snake

View File

@ -63,7 +63,7 @@ Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
print('adding\n'); print('adding\n');
{ {
final (res, ok) = await arr.operateWrite((w) async { final res = await arr.operateWrite((w) async {
for (var n = 0; n < dataset.length; n++) { for (var n = 0; n < dataset.length; n++) {
print('$n '); print('$n ');
final success = await w.tryAddItem(dataset[n]); final success = await w.tryAddItem(dataset[n]);
@ -71,26 +71,28 @@ Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
} }
}); });
expect(res, isNull); expect(res, isNull);
expect(ok, isTrue);
} }
//print('get all\n'); //print('get all\n');
{ {
final dataset2 = await arr.operate((r) async => r.getAllItems()); final dataset2 = await arr.operate((r) async => r.getItemRange(0));
expect(dataset2, equals(dataset)); expect(dataset2, equals(dataset));
} }
{
final dataset3 =
await arr.operate((r) async => r.getItemRange(64, length: 128));
expect(dataset3, equals(dataset.sublist(64, 64 + 128)));
}
//print('clear\n'); //print('clear\n');
{ {
final (res, ok) = await arr.operateWrite((w) async => w.tryClear()); await arr.operateWrite((w) async => w.clear());
expect(res, isTrue);
expect(ok, isTrue);
} }
//print('get all\n'); //print('get all\n');
{ {
final dataset3 = await arr.operate((r) async => r.getAllItems()); final dataset4 = await arr.operate((r) async => r.getItemRange(0));
expect(dataset3, isEmpty); expect(dataset4, isEmpty);
} }
await arr.delete(); await arr.delete();

View File

@ -2,5 +2,7 @@
library dht_support; library dht_support;
export 'src/dht_log/barrel.dart';
export 'src/dht_record/barrel.dart'; export 'src/dht_record/barrel.dart';
export 'src/dht_short_array/barrel.dart'; export 'src/dht_short_array/barrel.dart';
export 'src/interfaces/interfaces.dart';

View File

@ -23,6 +23,18 @@ message DHTData {
uint32 size = 4; uint32 size = 4;
} }
// DHTLog - represents a ring buffer of many elements with append/truncate semantics
// Header in subkey 0 of first key follows this structure
message DHTLog {
// Position of the start of the log (oldest items)
uint32 head = 1;
// Position of the end of the log (newest items)
uint32 tail = 2;
// Stride of each segment of the dhtlog
uint32 stride = 3;
}
// DHTShortArray - represents a re-orderable collection of up to 256 individual elements // DHTShortArray - represents a re-orderable collection of up to 256 individual elements
// Header in subkey 0 of first key follows this structure // Header in subkey 0 of first key follows this structure
// //
@ -50,20 +62,6 @@ message DHTShortArray {
// calculated through iteration // calculated through iteration
} }
// DHTLog - represents a long ring buffer of elements utilizing a multi-level
// indirection table of DHTShortArrays.
message DHTLog {
// Keys to concatenate
repeated veilid.TypedKey keys = 1;
// Back link to another DHTLog further back
veilid.TypedKey back = 2;
// Count of subkeys in all keys in this DHTLog
repeated uint32 subkey_counts = 3;
// Total count of subkeys in all keys in this DHTLog including all backlogs
uint32 total_subkeys = 4;
}
// DataReference // DataReference
// Pointer to data somewhere in Veilid // Pointer to data somewhere in Veilid
// Abstraction over DHTData and BlockStore // Abstraction over DHTData and BlockStore

View File

@ -0,0 +1,2 @@
export 'dht_array.dart';
export 'dht_array_cubit.dart';

View File

@ -0,0 +1,273 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:collection/collection.dart';
import 'package:equatable/equatable.dart';
import '../../../veilid_support.dart';
import '../../proto/proto.dart' as proto;
import '../interfaces/dht_append_truncate.dart';
part 'dht_log_spine.dart';
part 'dht_log_read.dart';
part 'dht_log_append.dart';
///////////////////////////////////////////////////////////////////////
/// DHTLog is a ring-buffer queue like data structure with the following
/// operations:
/// * Add elements to the tail
/// * Remove elements from the head
/// The structure has a 'spine' record that acts as an indirection table of
/// DHTShortArray record pointers spread over its subkeys.
/// Subkey 0 of the DHTLog is a head subkey that contains housekeeping data:
/// * The head and tail position of the log
/// - subkeyIdx = pos / recordsPerSubkey
/// - recordIdx = pos % recordsPerSubkey
class DHTLog implements DHTOpenable {
////////////////////////////////////////////////////////////////
// Constructors
DHTLog._({required _DHTLogSpine spine}) : _spine = spine {
_spine.onUpdatedSpine = () {
_watchController?.sink.add(null);
};
}
/// Create a DHTLog
static Future<DHTLog> create(
{required String debugName,
int stride = DHTShortArray.maxElements,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
KeyPair? writer}) async {
assert(stride <= DHTShortArray.maxElements, 'stride too long');
final pool = DHTRecordPool.instance;
late final DHTRecord spineRecord;
if (writer != null) {
final schema = DHTSchema.smpl(
oCnt: 0,
members: [DHTSchemaMember(mKey: writer.key, mCnt: spineSubkeys + 1)]);
spineRecord = await pool.createRecord(
debugName: debugName,
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto,
writer: writer);
} else {
const schema = DHTSchema.dflt(oCnt: spineSubkeys + 1);
spineRecord = await pool.createRecord(
debugName: debugName,
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto);
}
try {
final spine = await _DHTLogSpine.create(
spineRecord: spineRecord, segmentStride: stride);
return DHTLog._(spine: spine);
} on Exception catch (_) {
await spineRecord.close();
await spineRecord.delete();
rethrow;
}
}
static Future<DHTLog> openRead(TypedKey logRecordKey,
{required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto}) async {
final spineRecord = await DHTRecordPool.instance.openRecordRead(
logRecordKey,
debugName: debugName,
parent: parent,
routingContext: routingContext,
crypto: crypto);
try {
final spine = await _DHTLogSpine.load(spineRecord: spineRecord);
final dhtLog = DHTLog._(spine: spine);
return dhtLog;
} on Exception catch (_) {
await spineRecord.close();
rethrow;
}
}
static Future<DHTLog> openWrite(
TypedKey logRecordKey,
KeyPair writer, {
required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
}) async {
final spineRecord = await DHTRecordPool.instance.openRecordWrite(
logRecordKey, writer,
debugName: debugName,
parent: parent,
routingContext: routingContext,
crypto: crypto);
try {
final spine = await _DHTLogSpine.load(spineRecord: spineRecord);
final dhtLog = DHTLog._(spine: spine);
return dhtLog;
} on Exception catch (_) {
await spineRecord.close();
rethrow;
}
}
static Future<DHTLog> openOwned(
OwnedDHTRecordPointer ownedLogRecordPointer, {
required String debugName,
required TypedKey parent,
VeilidRoutingContext? routingContext,
DHTRecordCrypto? crypto,
}) =>
openWrite(
ownedLogRecordPointer.recordKey,
ownedLogRecordPointer.owner,
debugName: debugName,
routingContext: routingContext,
parent: parent,
crypto: crypto,
);
////////////////////////////////////////////////////////////////////////////
// DHTOpenable
/// Check if the DHTLog is open
@override
bool get isOpen => _spine.isOpen;
/// Free all resources for the DHTLog
@override
Future<void> close() async {
if (!isOpen) {
return;
}
await _watchController?.close();
_watchController = null;
await _spine.close();
}
/// Free all resources for the DHTLog and delete it from the DHT
/// Will wait until the short array is closed to delete it
@override
Future<void> delete() async {
await _spine.delete();
}
////////////////////////////////////////////////////////////////////////////
// Public API
/// Get the record key for this log
TypedKey get recordKey => _spine.recordKey;
/// Get the record pointer foir this log
OwnedDHTRecordPointer get recordPointer => _spine.recordPointer;
/// Runs a closure allowing read-only access to the log
Future<T?> operate<T>(Future<T?> Function(DHTRandomRead) closure) async {
if (!isOpen) {
throw StateError('log is not open"');
}
return _spine.operate((spine) async {
final reader = _DHTLogRead._(spine);
return closure(reader);
});
}
/// Runs a closure allowing append/truncate access to the log
/// Makes only one attempt to consistently write the changes to the DHT
/// Returns result of the closure if the write could be performed
/// Throws DHTOperateException if the write could not be performed
/// at this time
Future<T> operateAppend<T>(
Future<T> Function(DHTAppendTruncateRandomRead) closure) async {
if (!isOpen) {
throw StateError('log is not open"');
}
return _spine.operateAppend((spine) async {
final writer = _DHTLogAppend._(spine);
return closure(writer);
});
}
/// Runs a closure allowing append/truncate access to the log
/// Will execute the closure multiple times if a consistent write to the DHT
/// is not achieved. Timeout if specified will be thrown as a
/// TimeoutException. The closure should return true if its changes also
/// succeeded, returning false will trigger another eventual consistency
/// attempt.
Future<void> operateAppendEventual(
Future<bool> Function(DHTAppendTruncateRandomRead) closure,
{Duration? timeout}) async {
if (!isOpen) {
throw StateError('log is not open"');
}
return _spine.operateAppendEventual((spine) async {
final writer = _DHTLogAppend._(spine);
return closure(writer);
}, timeout: timeout);
}
/// Listen to and any all changes to the structure of this log
/// regardless of where the changes are coming from
Future<StreamSubscription<void>> listen(
void Function() onChanged,
) {
if (!isOpen) {
throw StateError('log is not open"');
}
return _listenMutex.protect(() async {
// If don't have a controller yet, set it up
if (_watchController == null) {
// Set up watch requirements
_watchController = StreamController<void>.broadcast(onCancel: () {
// If there are no more listeners then we can get
// rid of the controller and drop our subscriptions
unawaited(_listenMutex.protect(() async {
// Cancel watches of head record
await _spine.cancelWatch();
_watchController = null;
}));
});
// Start watching head subkey of the spine
await _spine.watch();
}
// Return subscription
return _watchController!.stream.listen((_) => onChanged());
});
}
////////////////////////////////////////////////////////////////
// Fields
// 56 subkeys * 512 segments * 36 bytes per typedkey =
// 1032192 bytes per record
// 512*36 = 18432 bytes per subkey
// 28672 shortarrays * 256 elements = 7340032 elements
static const spineSubkeys = 56;
static const segmentsPerSubkey = 512;
// Internal representation refreshed from spine record
final _DHTLogSpine _spine;
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex = Mutex();
// Stream of external changes
StreamController<void>? _watchController;
}

View File

@ -0,0 +1,42 @@
part of 'dht_log.dart';
////////////////////////////////////////////////////////////////////////////
// Append/truncate implementation
class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead {
_DHTLogAppend._(super.spine) : super._();
@override
Future<bool> tryAppendItem(Uint8List value) async {
// Allocate empty index at the end of the list
final endPos = _spine.length;
_spine.allocateTail(1);
final lookup = await _spine.lookupPosition(endPos);
if (lookup == null) {
throw StateError("can't write to dht log");
}
// Write item to the segment
return lookup.shortArray
.operateWrite((write) async => write.tryWriteItem(lookup.pos, value));
}
@override
Future<void> truncate(int count) async {
final len = _spine.length;
if (count > len) {
count = len;
}
if (count == 0) {
return;
}
if (count < 0) {
throw StateError('can not remove negative items');
}
_spine.releaseHead(count);
}
@override
Future<void> clear() async {
_spine.releaseHead(_spine.length);
}
}

View File

@ -0,0 +1,119 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'package:equatable/equatable.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';
import '../../../veilid_support.dart';
// xxx paginate and remember to paginate watches (could use short array cubit as a subcubit here?)
// @immutable
// class DHTArrayElementState<T> extends Equatable {
// const DHTArrayElementState(
// {required this.value, required this.isOffline});
// final T value;
// final bool isOffline;
// @override
// List<Object?> get props => [value, isOffline];
// }
// typedef DHTArrayState<T> = AsyncValue<IList<DHTArrayElementState<T>>>;
// typedef DHTArrayBusyState<T> = BlocBusyState<DHTArrayState<T>>;
// class DHTArrayCubit<T> extends Cubit<DHTArrayBusyState<T>>
// with BlocBusyWrapper<DHTArrayState<T>> {
// DHTArrayCubit({
// required Future<DHTArray> Function() open,
// required T Function(List<int> data) decodeElement,
// }) : _decodeElement = decodeElement,
// super(const BlocBusyState(AsyncValue.loading())) {
// _initWait.add(() async {
// // Open DHT record
// _array = await open();
// _wantsCloseRecord = true;
// // Make initial state update
// await _refreshNoWait();
// _subscription = await _array.listen(_update);
// });
// }
// Future<void> refresh({bool forceRefresh = false}) async {
// await _initWait();
// await _refreshNoWait(forceRefresh: forceRefresh);
// }
// Future<void> _refreshNoWait({bool forceRefresh = false}) async =>
// busy((emit) async => _refreshInner(emit, forceRefresh: forceRefresh));
// Future<void> _refreshInner(void Function(DHTShortArrayState<T>) emit,
// {bool forceRefresh = false}) async {
// try {
// final newState = await _shortArray.operate((reader) async {
// final offlinePositions = await reader.getOfflinePositions();
// final allItems = (await reader.getAllItems(forceRefresh: forceRefresh))
// ?.indexed
// .map((x) => DHTShortArrayElementState(
// value: _decodeElement(x.$2),
// isOffline: offlinePositions.contains(x.$1)))
// .toIList();
// return allItems;
// });
// if (newState != null) {
// emit(AsyncValue.data(newState));
// }
// } on Exception catch (e) {
// emit(AsyncValue.error(e));
// }
// }
// void _update() {
// // Run at most one background update process
// // Because this is async, we could get an update while we're
// // still processing the last one. Only called after init future has run
// // so we dont have to wait for that here.
// _sspUpdate.busyUpdate<T, DHTShortArrayState<T>>(
// busy, (emit) async => _refreshInner(emit));
// }
// @override
// Future<void> close() async {
// await _initWait();
// await _subscription?.cancel();
// _subscription = null;
// if (_wantsCloseRecord) {
// await _shortArray.close();
// }
// await super.close();
// }
// Future<R?> operate<R>(Future<R?> Function(DHTShortArrayRead) closure) async {
// await _initWait();
// return _shortArray.operate(closure);
// }
// Future<(R?, bool)> operateWrite<R>(
// Future<R?> Function(DHTShortArrayWrite) closure) async {
// await _initWait();
// return _shortArray.operateWrite(closure);
// }
// Future<void> operateWriteEventual(
// Future<bool> Function(DHTShortArrayWrite) closure,
// {Duration? timeout}) async {
// await _initWait();
// return _shortArray.operateWriteEventual(closure, timeout: timeout);
// }
// final WaitSet<void> _initWait = WaitSet();
// late final DHTShortArray _shortArray;
// final T Function(List<int> data) _decodeElement;
// StreamSubscription<void>? _subscription;
// bool _wantsCloseRecord = false;
// final _sspUpdate = SingleStatelessProcessor();
// }

View File

@ -0,0 +1,103 @@
part of 'dht_log.dart';
////////////////////////////////////////////////////////////////////////////
// Reader-only implementation
class _DHTLogRead implements DHTRandomRead {
_DHTLogRead._(_DHTLogSpine spine) : _spine = spine;
@override
int get length => _spine.length;
@override
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async {
if (pos < 0 || pos >= length) {
throw IndexError.withLength(pos, length);
}
final lookup = await _spine.lookupPosition(pos);
if (lookup == null) {
return null;
}
return lookup.shortArray.operate(
(read) => read.getItem(lookup.pos, forceRefresh: forceRefresh));
}
(int, int) _clampStartLen(int start, int? len) {
len ??= _spine.length;
if (start < 0) {
throw IndexError.withLength(start, _spine.length);
}
if (start > _spine.length) {
throw IndexError.withLength(start, _spine.length);
}
if ((len + start) > _spine.length) {
len = _spine.length - start;
}
return (start, len);
}
@override
Future<List<Uint8List>?> getItemRange(int start,
{int? length, bool forceRefresh = false}) async {
final out = <Uint8List>[];
(start, length) = _clampStartLen(start, length);
final chunks = Iterable<int>.generate(length).slices(maxDHTConcurrency).map(
(chunk) => chunk
.map((pos) => getItem(pos + start, forceRefresh: forceRefresh)));
for (final chunk in chunks) {
final elems = await chunk.wait;
if (elems.contains(null)) {
return null;
}
out.addAll(elems.cast<Uint8List>());
}
return out;
}
@override
Future<Set<int>> getOfflinePositions() async {
final positionOffline = <int>{};
// Iterate positions backward from most recent
for (var pos = _spine.length - 1; pos >= 0; pos--) {
final lookup = await _spine.lookupPosition(pos);
if (lookup == null) {
throw StateError('Unable to look up position');
}
// Check each segment for offline positions
var foundOffline = false;
await lookup.shortArray.operate((read) async {
final segmentOffline = await read.getOfflinePositions();
// For each shortarray segment go through their segment positions
// in reverse order and see if they are offline
for (var segmentPos = lookup.pos;
segmentPos >= 0 && pos >= 0;
segmentPos--, pos--) {
// If the position in the segment is offline, then
// mark the position in the log as offline
if (segmentOffline.contains(segmentPos)) {
positionOffline.add(pos);
foundOffline = true;
}
}
});
// If we found nothing offline in this segment then we can stop
if (!foundOffline) {
break;
}
}
return positionOffline;
}
////////////////////////////////////////////////////////////////////////////
// Fields
final _DHTLogSpine _spine;
}

View File

@ -0,0 +1,527 @@
part of 'dht_log.dart';
class DHTLogPositionLookup {
const DHTLogPositionLookup({required this.shortArray, required this.pos});
final DHTShortArray shortArray;
final int pos;
}
class _DHTLogSegmentLookup extends Equatable {
const _DHTLogSegmentLookup({required this.subkey, required this.segment});
final int subkey;
final int segment;
@override
List<Object?> get props => [subkey, segment];
}
class _DHTLogSpine {
_DHTLogSpine._(
{required DHTRecord spineRecord,
required int head,
required int tail,
required int stride})
: _spineRecord = spineRecord,
_head = head,
_tail = tail,
_segmentStride = stride,
_spineCache = [];
// Create a new spine record and push it to the network
static Future<_DHTLogSpine> create(
{required DHTRecord spineRecord, required int segmentStride}) async {
// Construct new spinehead
final spine = _DHTLogSpine._(
spineRecord: spineRecord, head: 0, tail: 0, stride: segmentStride);
// Write new spine head record to the network
await spine.operate((spine) async {
final success = await spine.writeSpineHead();
assert(success, 'false return should never happen on create');
});
return spine;
}
// Pull the latest or updated copy of the spine head record from the network
static Future<_DHTLogSpine> load({required DHTRecord spineRecord}) async {
// Get an updated spine head record copy if one exists
final spineHead = await spineRecord.getProtobuf(proto.DHTLog.fromBuffer,
subkey: 0, refreshMode: DHTRecordRefreshMode.refresh);
if (spineHead == null) {
throw StateError('spine head missing during refresh');
}
return _DHTLogSpine._(
spineRecord: spineRecord,
head: spineHead.head,
tail: spineHead.tail,
stride: spineHead.stride);
}
proto.DHTLog _toProto() {
assert(_spineMutex.isLocked, 'should be in mutex here');
final logHead = proto.DHTLog()
..head = _head
..tail = _tail
..stride = _segmentStride;
return logHead;
}
Future<void> close() async {
await _spineMutex.protect(() async {
if (!isOpen) {
return;
}
final futures = <Future<void>>[_spineRecord.close()];
for (final (_, sc) in _spineCache) {
futures.add(sc.close());
}
await Future.wait(futures);
});
}
Future<void> delete() async {
await _spineMutex.protect(() async {
final pool = DHTRecordPool.instance;
final futures = <Future<void>>[pool.deleteRecord(_spineRecord.key)];
for (final (_, sc) in _spineCache) {
futures.add(sc.delete());
}
await Future.wait(futures);
});
}
Future<T> operate<T>(Future<T> Function(_DHTLogSpine) closure) async =>
// ignore: prefer_expression_function_bodies
_spineMutex.protect(() async {
return closure(this);
});
Future<T> operateAppend<T>(Future<T> Function(_DHTLogSpine) closure) async =>
_spineMutex.protect(() async {
final oldHead = _head;
final oldTail = _tail;
try {
final out = await closure(this);
// Write head assuming it has been changed
if (!await writeSpineHead()) {
// Failed to write head means head got overwritten so write should
// be considered failed
throw DHTExceptionTryAgain();
}
onUpdatedSpine?.call();
return out;
} on Exception {
// Exception means state needs to be reverted
_head = oldHead;
_tail = oldTail;
rethrow;
}
});
Future<void> operateAppendEventual(
Future<bool> Function(_DHTLogSpine) closure,
{Duration? timeout}) async {
final timeoutTs = timeout == null
? null
: Veilid.instance.now().offset(TimestampDuration.fromDuration(timeout));
await _spineMutex.protect(() async {
late int oldHead;
late int oldTail;
try {
// Iterate until we have a successful element and head write
do {
// Save off old values each pass of writeSpineHead because the head
// will have changed
oldHead = _head;
oldTail = _tail;
// Try to do the element write
while (true) {
if (timeoutTs != null) {
final now = Veilid.instance.now();
if (now >= timeoutTs) {
throw TimeoutException('timeout reached');
}
}
if (await closure(this)) {
break;
}
// Failed to write in closure resets state
_head = oldHead;
_tail = oldTail;
}
// Try to do the head write
} while (!await writeSpineHead());
onUpdatedSpine?.call();
} on Exception {
// Exception means state needs to be reverted
_head = oldHead;
_tail = oldTail;
rethrow;
}
});
}
/// Serialize and write out the current spine head subkey, possibly updating
/// it if a newer copy is available online. Returns true if the write was
/// successful
Future<bool> writeSpineHead() async {
assert(_spineMutex.isLocked, 'should be in mutex here');
final headBuffer = _toProto().writeToBuffer();
final existingData = await _spineRecord.tryWriteBytes(headBuffer);
if (existingData != null) {
// Head write failed, incorporate update
await _updateHead(proto.DHTLog.fromBuffer(existingData));
return false;
}
return true;
}
/// Validate a new spine head subkey that has come in from the network
Future<void> _updateHead(proto.DHTLog spineHead) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
_head = spineHead.head;
_tail = spineHead.tail;
}
/////////////////////////////////////////////////////////////////////////////
// Spine element management
static final Uint8List _emptySegmentKey =
Uint8List.fromList(List.filled(TypedKey.decodedLength<TypedKey>(), 0));
static Uint8List _makeEmptySubkey() => Uint8List.fromList(List.filled(
DHTLog.segmentsPerSubkey * TypedKey.decodedLength<TypedKey>(), 0));
static TypedKey? _getSegmentKey(Uint8List subkeyData, int segment) {
final decodedLength = TypedKey.decodedLength<TypedKey>();
final segmentKeyBytes = subkeyData.sublist(
decodedLength * segment, (decodedLength + 1) * segment);
if (segmentKeyBytes.equals(_emptySegmentKey)) {
return null;
}
return TypedKey.fromBytes(segmentKeyBytes);
}
static void _setSegmentKey(
Uint8List subkeyData, int segment, TypedKey? segmentKey) {
final decodedLength = TypedKey.decodedLength<TypedKey>();
late final Uint8List segmentKeyBytes;
if (segmentKey == null) {
segmentKeyBytes = _emptySegmentKey;
} else {
segmentKeyBytes = segmentKey.decode();
}
subkeyData.setRange(decodedLength * segment, (decodedLength + 1) * segment,
segmentKeyBytes);
}
Future<DHTShortArray> _getOrCreateSegmentInner(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
assert(_spineRecord.writer != null, 'should be writable');
// Lookup what subkey and segment subrange has this position's segment
// shortarray
final l = lookupSegment(segmentNumber);
final subkey = l.subkey;
final segment = l.segment;
var subkeyData = await _spineRecord.get(subkey: subkey);
subkeyData ??= _makeEmptySubkey();
while (true) {
final segmentKey = _getSegmentKey(subkeyData!, segment);
if (segmentKey == null) {
// Create a shortarray segment
final segmentRec = await DHTShortArray.create(
debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment',
stride: _segmentStride,
crypto: _spineRecord.crypto,
parent: _spineRecord.key,
routingContext: _spineRecord.routingContext,
writer: _spineRecord.writer,
);
var success = false;
try {
// Write it back to the spine record
_setSegmentKey(subkeyData, segment, segmentRec.recordKey);
subkeyData =
await _spineRecord.tryWriteBytes(subkeyData, subkey: subkey);
// If the write was successful then we're done
if (subkeyData == null) {
// Return it
success = true;
return segmentRec;
}
} finally {
if (!success) {
await segmentRec.close();
await segmentRec.delete();
}
}
} else {
// Open a shortarray segment
final segmentRec = await DHTShortArray.openWrite(
segmentKey,
_spineRecord.writer!,
debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment',
crypto: _spineRecord.crypto,
parent: _spineRecord.key,
routingContext: _spineRecord.routingContext,
);
return segmentRec;
}
// Loop if we need to try again with the new data from the network
}
}
Future<DHTShortArray?> _getSegmentInner(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
// Lookup what subkey and segment subrange has this position's segment
// shortarray
final l = lookupSegment(segmentNumber);
final subkey = l.subkey;
final segment = l.segment;
final subkeyData = await _spineRecord.get(subkey: subkey);
if (subkeyData == null) {
return null;
}
final segmentKey = _getSegmentKey(subkeyData, segment);
if (segmentKey == null) {
return null;
}
// Open a shortarray segment
final segmentRec = await DHTShortArray.openRead(
segmentKey,
debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment',
crypto: _spineRecord.crypto,
parent: _spineRecord.key,
routingContext: _spineRecord.routingContext,
);
return segmentRec;
}
Future<DHTShortArray> getOrCreateSegment(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
// See if we already have this in the cache
for (var i = 0; i < _spineCache.length; i++) {
if (_spineCache[i].$1 == segmentNumber) {
// Touch the element
final x = _spineCache.removeAt(i);
_spineCache.add(x);
// Return the shortarray for this position
return x.$2;
}
}
// If we don't have it in the cache, get/create it and then cache it
final segment = await _getOrCreateSegmentInner(segmentNumber);
_spineCache.add((segmentNumber, segment));
if (_spineCache.length > _spineCacheLength) {
// Trim the LRU cache
_spineCache.removeAt(0);
}
return segment;
}
Future<DHTShortArray?> getSegment(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
// See if we already have this in the cache
for (var i = 0; i < _spineCache.length; i++) {
if (_spineCache[i].$1 == segmentNumber) {
// Touch the element
final x = _spineCache.removeAt(i);
_spineCache.add(x);
// Return the shortarray for this position
return x.$2;
}
}
// If we don't have it in the cache, get it and then cache it
final segment = await _getSegmentInner(segmentNumber);
if (segment == null) {
return null;
}
_spineCache.add((segmentNumber, segment));
if (_spineCache.length > _spineCacheLength) {
// Trim the LRU cache
_spineCache.removeAt(0);
}
return segment;
}
_DHTLogSegmentLookup lookupSegment(int segmentNumber) {
assert(_spineMutex.isLocked, 'should be in mutex here');
if (segmentNumber < 0) {
throw IndexError.withLength(
segmentNumber, DHTLog.spineSubkeys * DHTLog.segmentsPerSubkey);
}
final subkey = segmentNumber ~/ DHTLog.segmentsPerSubkey;
if (subkey >= DHTLog.spineSubkeys) {
throw IndexError.withLength(
segmentNumber, DHTLog.spineSubkeys * DHTLog.segmentsPerSubkey);
}
final segment = segmentNumber % DHTLog.segmentsPerSubkey;
return _DHTLogSegmentLookup(subkey: subkey + 1, segment: segment);
}
///////////////////////////////////////////
// API for public interfaces
Future<DHTLogPositionLookup?> lookupPosition(int pos) async {
assert(_spineMutex.isLocked, 'should be locked');
// Check if our position is in bounds
final endPos = length;
if (pos < 0 || pos >= endPos) {
throw IndexError.withLength(pos, endPos);
}
// Calculate absolute position, ring-buffer style
final absolutePosition = (_head + pos) % _positionLimit;
// Determine the segment number and position within the segment
final segmentNumber = absolutePosition ~/ DHTShortArray.maxElements;
final segmentPos = absolutePosition % DHTShortArray.maxElements;
// Get the segment shortArray
final shortArray = (_spineRecord.writer == null)
? await getSegment(segmentNumber)
: await getOrCreateSegment(segmentNumber);
if (shortArray == null) {
return null;
}
return DHTLogPositionLookup(shortArray: shortArray, pos: segmentPos);
}
void allocateTail(int count) {
assert(_spineMutex.isLocked, 'should be locked');
final currentLength = length;
if (count <= 0) {
throw StateError('count should be > 0');
}
if (currentLength + count >= _positionLimit) {
throw StateError('ring buffer overflow');
}
_tail = (_tail + count) % _positionLimit;
}
void releaseHead(int count) {
assert(_spineMutex.isLocked, 'should be locked');
final currentLength = length;
if (count <= 0) {
throw StateError('count should be > 0');
}
if (count > currentLength) {
throw StateError('ring buffer underflow');
}
_head = (_head + count) % _positionLimit;
}
/////////////////////////////////////////////////////////////////////////////
// Watch For Updates
// Watch head for changes
Future<void> watch() async {
// This will update any existing watches if necessary
try {
await _spineRecord.watch(subkeys: [ValueSubkeyRange.single(0)]);
// Update changes to the head record
// Don't watch for local changes because this class already handles
// notifying listeners and knows when it makes local changes
_subscription ??=
await _spineRecord.listen(localChanges: false, _onSpineChanged);
} on Exception {
// If anything fails, try to cancel the watches
await cancelWatch();
rethrow;
}
}
// Stop watching for changes to head and linked records
Future<void> cancelWatch() async {
await _spineRecord.cancelWatch();
await _subscription?.cancel();
_subscription = null;
}
// Called when the log changes online and we find out from a watch
// but not when we make a change locally
Future<void> _onSpineChanged(
DHTRecord record, Uint8List? data, List<ValueSubkeyRange> subkeys) async {
// If head record subkey zero changes, then the layout
// of the dhtshortarray has changed
if (data == null) {
throw StateError('spine head changed without data');
}
if (record.key != _spineRecord.key ||
subkeys.length != 1 ||
subkeys[0] != ValueSubkeyRange.single(0)) {
throw StateError('watch returning wrong subkey range');
}
// Decode updated head
final headData = proto.DHTLog.fromBuffer(data);
// Then update the head record
await _spineMutex.protect(() async {
await _updateHead(headData);
onUpdatedSpine?.call();
});
}
////////////////////////////////////////////////////////////////////////////
TypedKey get recordKey => _spineRecord.key;
OwnedDHTRecordPointer get recordPointer => _spineRecord.ownedDHTRecordPointer;
int get length =>
(_tail < _head) ? (_positionLimit - _head) + _tail : _tail - _head;
bool get isOpen => _spineRecord.isOpen;
static const _positionLimit = DHTLog.segmentsPerSubkey *
DHTLog.spineSubkeys *
DHTShortArray.maxElements;
// Spine head mutex to ensure we keep the representation valid
final Mutex _spineMutex = Mutex();
// Subscription to head record internal changes
StreamSubscription<DHTRecordWatchChange>? _subscription;
// Notify closure for external spine head changes
void Function()? onUpdatedSpine;
// Spine DHT record
final DHTRecord _spineRecord;
// Position of the start of the log (oldest items)
int _head;
// Position of the end of the log (newest items)
int _tail;
// LRU cache of DHT spine elements accessed recently
// Pair of position and associated shortarray segment
final List<(int, DHTShortArray)> _spineCache;
static const int _spineCacheLength = 3;
// Segment stride to use for spine elements
final int _segmentStride;
}

View File

@ -38,7 +38,8 @@ class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
final Uint8List data; final Uint8List data;
final firstSubkey = subkeys.firstOrNull!.low; final firstSubkey = subkeys.firstOrNull!.low;
if (firstSubkey != defaultSubkey || updatedata == null) { if (firstSubkey != defaultSubkey || updatedata == null) {
final maybeData = await record.get(forceRefresh: true); final maybeData =
await record.get(refreshMode: DHTRecordRefreshMode.refresh);
if (maybeData == null) { if (maybeData == null) {
return null; return null;
} }

View File

@ -13,9 +13,22 @@ class DHTRecordWatchChange extends Equatable {
List<Object?> get props => [local, data, subkeys]; List<Object?> get props => [local, data, subkeys];
} }
/// Refresh mode for DHT record 'get'
enum DHTRecordRefreshMode {
/// Return existing subkey values if they exist locally already
existing,
/// Always check the network for a newer subkey value
refresh,
/// Always check the network for a newer subkey value but only
/// return that value if its sequence number is newer than the local value
refreshOnlyUpdates,
}
///////////////////////////////////////////////// /////////////////////////////////////////////////
class DHTRecord { class DHTRecord implements DHTOpenable {
DHTRecord._( DHTRecord._(
{required VeilidRoutingContext routingContext, {required VeilidRoutingContext routingContext,
required SharedDHTRecordData sharedDHTRecordData, required SharedDHTRecordData sharedDHTRecordData,
@ -30,20 +43,33 @@ class DHTRecord {
_open = true, _open = true,
_sharedDHTRecordData = sharedDHTRecordData; _sharedDHTRecordData = sharedDHTRecordData;
final SharedDHTRecordData _sharedDHTRecordData; ////////////////////////////////////////////////////////////////////////////
final VeilidRoutingContext _routingContext; // DHTOpenable
final int _defaultSubkey;
final KeyPair? _writer;
final DHTRecordCrypto _crypto;
final String debugName;
bool _open; /// Check if the DHTRecord is open
@internal @override
StreamController<DHTRecordWatchChange>? watchController; bool get isOpen => _open;
@internal
WatchState? watchState;
int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey; /// Free all resources for the DHTRecord
@override
Future<void> close() async {
if (!_open) {
return;
}
await watchController?.close();
await DHTRecordPool.instance._recordClosed(this);
_open = false;
}
/// Free all resources for the DHTRecord and delete it from the DHT
/// Will wait until the record is closed to delete it
@override
Future<void> delete() async {
await DHTRecordPool.instance.deleteRecord(key);
}
////////////////////////////////////////////////////////////////////////////
// Public API
VeilidRoutingContext get routingContext => _routingContext; VeilidRoutingContext get routingContext => _routingContext;
TypedKey get key => _sharedDHTRecordData.recordDescriptor.key; TypedKey get key => _sharedDHTRecordData.recordDescriptor.key;
@ -57,64 +83,30 @@ class DHTRecord {
DHTRecordCrypto get crypto => _crypto; DHTRecordCrypto get crypto => _crypto;
OwnedDHTRecordPointer get ownedDHTRecordPointer => OwnedDHTRecordPointer get ownedDHTRecordPointer =>
OwnedDHTRecordPointer(recordKey: key, owner: ownerKeyPair!); OwnedDHTRecordPointer(recordKey: key, owner: ownerKeyPair!);
bool get isOpen => _open; int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey;
Future<void> close() async {
if (!_open) {
return;
}
await watchController?.close();
await DHTRecordPool.instance._recordClosed(this);
_open = false;
}
Future<T> scope<T>(Future<T> Function(DHTRecord) scopeFunction) async {
try {
return await scopeFunction(this);
} finally {
await close();
}
}
Future<T> deleteScope<T>(Future<T> Function(DHTRecord) scopeFunction) async {
try {
final out = await scopeFunction(this);
if (_open) {
await close();
}
return out;
} on Exception catch (_) {
if (_open) {
await close();
}
await DHTRecordPool.instance.deleteRecord(key);
rethrow;
}
}
Future<T> maybeDeleteScope<T>(
bool delete, Future<T> Function(DHTRecord) scopeFunction) async {
if (delete) {
return deleteScope(scopeFunction);
} else {
return scope(scopeFunction);
}
}
/// Get a subkey value from this record.
/// Returns the most recent value data for this subkey or null if this subkey
/// has not yet been written to.
/// * 'refreshMode' determines whether or not to return a locally existing
/// value or always check the network
/// * 'outSeqNum' optionally returns the sequence number of the value being
/// returned if one was returned.
Future<Uint8List?> get( Future<Uint8List?> get(
{int subkey = -1, {int subkey = -1,
DHTRecordCrypto? crypto, DHTRecordCrypto? crypto,
bool forceRefresh = false, DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing,
bool onlyUpdates = false,
Output<int>? outSeqNum}) async { Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey); subkey = subkeyOrDefault(subkey);
final valueData = await _routingContext.getDHTValue(key, subkey, final valueData = await _routingContext.getDHTValue(key, subkey,
forceRefresh: forceRefresh); forceRefresh: refreshMode != DHTRecordRefreshMode.existing);
if (valueData == null) { if (valueData == null) {
return null; return null;
} }
final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey]; final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey];
if (onlyUpdates && lastSeq != null && valueData.seq <= lastSeq) { if (refreshMode == DHTRecordRefreshMode.refreshOnlyUpdates &&
lastSeq != null &&
valueData.seq <= lastSeq) {
return null; return null;
} }
final out = (crypto ?? _crypto).decrypt(valueData.data, subkey); final out = (crypto ?? _crypto).decrypt(valueData.data, subkey);
@ -125,17 +117,23 @@ class DHTRecord {
return out; return out;
} }
/// Get a subkey value from this record.
/// Process the record returned with a JSON unmarshal function 'fromJson'.
/// Returns the most recent value data for this subkey or null if this subkey
/// has not yet been written to.
/// * 'refreshMode' determines whether or not to return a locally existing
/// value or always check the network
/// * 'outSeqNum' optionally returns the sequence number of the value being
/// returned if one was returned.
Future<T?> getJson<T>(T Function(dynamic) fromJson, Future<T?> getJson<T>(T Function(dynamic) fromJson,
{int subkey = -1, {int subkey = -1,
DHTRecordCrypto? crypto, DHTRecordCrypto? crypto,
bool forceRefresh = false, DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing,
bool onlyUpdates = false,
Output<int>? outSeqNum}) async { Output<int>? outSeqNum}) async {
final data = await get( final data = await get(
subkey: subkey, subkey: subkey,
crypto: crypto, crypto: crypto,
forceRefresh: forceRefresh, refreshMode: refreshMode,
onlyUpdates: onlyUpdates,
outSeqNum: outSeqNum); outSeqNum: outSeqNum);
if (data == null) { if (data == null) {
return null; return null;
@ -143,18 +141,25 @@ class DHTRecord {
return jsonDecodeBytes(fromJson, data); return jsonDecodeBytes(fromJson, data);
} }
/// Get a subkey value from this record.
/// Process the record returned with a protobuf unmarshal
/// function 'fromBuffer'.
/// Returns the most recent value data for this subkey or null if this subkey
/// has not yet been written to.
/// * 'refreshMode' determines whether or not to return a locally existing
/// value or always check the network
/// * 'outSeqNum' optionally returns the sequence number of the value being
/// returned if one was returned.
Future<T?> getProtobuf<T extends GeneratedMessage>( Future<T?> getProtobuf<T extends GeneratedMessage>(
T Function(List<int> i) fromBuffer, T Function(List<int> i) fromBuffer,
{int subkey = -1, {int subkey = -1,
DHTRecordCrypto? crypto, DHTRecordCrypto? crypto,
bool forceRefresh = false, DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing,
bool onlyUpdates = false,
Output<int>? outSeqNum}) async { Output<int>? outSeqNum}) async {
final data = await get( final data = await get(
subkey: subkey, subkey: subkey,
crypto: crypto, crypto: crypto,
forceRefresh: forceRefresh, refreshMode: refreshMode,
onlyUpdates: onlyUpdates,
outSeqNum: outSeqNum); outSeqNum: outSeqNum);
if (data == null) { if (data == null) {
return null; return null;
@ -162,6 +167,9 @@ class DHTRecord {
return fromBuffer(data.toList()); return fromBuffer(data.toList());
} }
/// Attempt to write a byte buffer to a DHTRecord subkey
/// If a newer value was found on the network, it is returned
/// If the value was succesfully written, null is returned
Future<Uint8List?> tryWriteBytes(Uint8List newValue, Future<Uint8List?> tryWriteBytes(Uint8List newValue,
{int subkey = -1, {int subkey = -1,
DHTRecordCrypto? crypto, DHTRecordCrypto? crypto,
@ -211,6 +219,9 @@ class DHTRecord {
return decryptedNewValue; return decryptedNewValue;
} }
/// Attempt to write a byte buffer to a DHTRecord subkey
/// If a newer value was found on the network, another attempt
/// will be made to write the subkey until this succeeds
Future<void> eventualWriteBytes(Uint8List newValue, Future<void> eventualWriteBytes(Uint8List newValue,
{int subkey = -1, {int subkey = -1,
DHTRecordCrypto? crypto, DHTRecordCrypto? crypto,
@ -256,6 +267,11 @@ class DHTRecord {
} }
} }
/// Attempt to write a byte buffer to a DHTRecord subkey
/// If a newer value was found on the network, another attempt
/// will be made to write the subkey until this succeeds
/// Each attempt to write the value calls an update function with the
/// old value to determine what new value should be attempted for that write.
Future<void> eventualUpdateBytes( Future<void> eventualUpdateBytes(
Future<Uint8List> Function(Uint8List? oldValue) update, Future<Uint8List> Function(Uint8List? oldValue) update,
{int subkey = -1, {int subkey = -1,
@ -281,6 +297,7 @@ class DHTRecord {
} while (oldValue != null); } while (oldValue != null);
} }
/// Like 'tryWriteBytes' but with JSON marshal/unmarshal of the value
Future<T?> tryWriteJson<T>(T Function(dynamic) fromJson, T newValue, Future<T?> tryWriteJson<T>(T Function(dynamic) fromJson, T newValue,
{int subkey = -1, {int subkey = -1,
DHTRecordCrypto? crypto, DHTRecordCrypto? crypto,
@ -298,6 +315,7 @@ class DHTRecord {
return jsonDecodeBytes(fromJson, out); return jsonDecodeBytes(fromJson, out);
}); });
/// Like 'tryWriteBytes' but with protobuf marshal/unmarshal of the value
Future<T?> tryWriteProtobuf<T extends GeneratedMessage>( Future<T?> tryWriteProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, T newValue, T Function(List<int>) fromBuffer, T newValue,
{int subkey = -1, {int subkey = -1,
@ -316,6 +334,7 @@ class DHTRecord {
return fromBuffer(out); return fromBuffer(out);
}); });
/// Like 'eventualWriteBytes' but with JSON marshal/unmarshal of the value
Future<void> eventualWriteJson<T>(T newValue, Future<void> eventualWriteJson<T>(T newValue,
{int subkey = -1, {int subkey = -1,
DHTRecordCrypto? crypto, DHTRecordCrypto? crypto,
@ -324,6 +343,7 @@ class DHTRecord {
eventualWriteBytes(jsonEncodeBytes(newValue), eventualWriteBytes(jsonEncodeBytes(newValue),
subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum);
/// Like 'eventualWriteBytes' but with protobuf marshal/unmarshal of the value
Future<void> eventualWriteProtobuf<T extends GeneratedMessage>(T newValue, Future<void> eventualWriteProtobuf<T extends GeneratedMessage>(T newValue,
{int subkey = -1, {int subkey = -1,
DHTRecordCrypto? crypto, DHTRecordCrypto? crypto,
@ -332,6 +352,7 @@ class DHTRecord {
eventualWriteBytes(newValue.writeToBuffer(), eventualWriteBytes(newValue.writeToBuffer(),
subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum);
/// Like 'eventualUpdateBytes' but with JSON marshal/unmarshal of the value
Future<void> eventualUpdateJson<T>( Future<void> eventualUpdateJson<T>(
T Function(dynamic) fromJson, Future<T> Function(T?) update, T Function(dynamic) fromJson, Future<T> Function(T?) update,
{int subkey = -1, {int subkey = -1,
@ -341,6 +362,7 @@ class DHTRecord {
eventualUpdateBytes(jsonUpdate(fromJson, update), eventualUpdateBytes(jsonUpdate(fromJson, update),
subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum);
/// Like 'eventualUpdateBytes' but with protobuf marshal/unmarshal of the value
Future<void> eventualUpdateProtobuf<T extends GeneratedMessage>( Future<void> eventualUpdateProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, Future<T> Function(T?) update, T Function(List<int>) fromBuffer, Future<T> Function(T?) update,
{int subkey = -1, {int subkey = -1,
@ -350,6 +372,8 @@ class DHTRecord {
eventualUpdateBytes(protobufUpdate(fromBuffer, update), eventualUpdateBytes(protobufUpdate(fromBuffer, update),
subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum);
/// Watch a subkey range of this DHT record for changes
/// Takes effect on the next DHTRecordPool tick
Future<void> watch( Future<void> watch(
{List<ValueSubkeyRange>? subkeys, {List<ValueSubkeyRange>? subkeys,
Timestamp? expiration, Timestamp? expiration,
@ -363,6 +387,13 @@ class DHTRecord {
} }
} }
/// Register a callback for changes made on this this DHT record.
/// You must 'watch' the record as well as listen to it in order for this
/// call back to be called.
/// * 'localChanges' also enables calling the callback if changed are made
/// locally, otherwise only changes seen from the network itself are
/// reported
///
Future<StreamSubscription<DHTRecordWatchChange>> listen( Future<StreamSubscription<DHTRecordWatchChange>> listen(
Future<void> Function( Future<void> Function(
DHTRecord record, Uint8List? data, List<ValueSubkeyRange> subkeys) DHTRecord record, Uint8List? data, List<ValueSubkeyRange> subkeys)
@ -405,6 +436,8 @@ class DHTRecord {
}); });
} }
/// Stop watching this record for changes
/// Takes effect on the next DHTRecordPool tick
Future<void> cancelWatch() async { Future<void> cancelWatch() async {
// Tear down watch requirements // Tear down watch requirements
if (watchState != null) { if (watchState != null) {
@ -413,11 +446,15 @@ class DHTRecord {
} }
} }
/// Return the inspection state of a set of subkeys of the DHTRecord
/// See Veilid's 'inspectDHTRecord' call for details on how this works
Future<DHTRecordReport> inspect( Future<DHTRecordReport> inspect(
{List<ValueSubkeyRange>? subkeys, {List<ValueSubkeyRange>? subkeys,
DHTReportScope scope = DHTReportScope.local}) => DHTReportScope scope = DHTReportScope.local}) =>
_routingContext.inspectDHTRecord(key, subkeys: subkeys, scope: scope); _routingContext.inspectDHTRecord(key, subkeys: subkeys, scope: scope);
//////////////////////////////////////////////////////////////////////////
void _addValueChange( void _addValueChange(
{required bool local, {required bool local,
required Uint8List? data, required Uint8List? data,
@ -458,4 +495,19 @@ class DHTRecord {
_addValueChange( _addValueChange(
local: false, data: update.value?.data, subkeys: update.subkeys); local: false, data: update.value?.data, subkeys: update.subkeys);
} }
//////////////////////////////////////////////////////////////
final SharedDHTRecordData _sharedDHTRecordData;
final VeilidRoutingContext _routingContext;
final int _defaultSubkey;
final KeyPair? _writer;
final DHTRecordCrypto _crypto;
final String debugName;
bool _open;
@internal
StreamController<DHTRecordWatchChange>? watchController;
@internal
WatchState? watchState;
} }

View File

@ -93,7 +93,7 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
for (final skr in subkeys) { for (final skr in subkeys) {
for (var sk = skr.low; sk <= skr.high; sk++) { for (var sk = skr.low; sk <= skr.high; sk++) {
final data = await _record.get( final data = await _record.get(
subkey: sk, forceRefresh: true, onlyUpdates: true); subkey: sk, refreshMode: DHTRecordRefreshMode.refreshOnlyUpdates);
if (data != null) { if (data != null) {
final newState = await _stateFunction(_record, updateSubkeys, data); final newState = await _stateFunction(_record, updateSubkeys, data);
if (newState != null) { if (newState != null) {

View File

@ -3,7 +3,6 @@ import 'dart:typed_data';
import 'package:async_tools/async_tools.dart'; import 'package:async_tools/async_tools.dart';
import 'package:collection/collection.dart'; import 'package:collection/collection.dart';
import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart'; import '../../../veilid_support.dart';
import '../../proto/proto.dart' as proto; import '../../proto/proto.dart' as proto;
@ -14,7 +13,7 @@ part 'dht_short_array_write.dart';
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
class DHTShortArray { class DHTShortArray implements DHTOpenable {
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
// Constructors // Constructors
@ -34,22 +33,22 @@ class DHTShortArray {
VeilidRoutingContext? routingContext, VeilidRoutingContext? routingContext,
TypedKey? parent, TypedKey? parent,
DHTRecordCrypto? crypto, DHTRecordCrypto? crypto,
KeyPair? smplWriter}) async { KeyPair? writer}) async {
assert(stride <= maxElements, 'stride too long'); assert(stride <= maxElements, 'stride too long');
final pool = DHTRecordPool.instance; final pool = DHTRecordPool.instance;
late final DHTRecord dhtRecord; late final DHTRecord dhtRecord;
if (smplWriter != null) { if (writer != null) {
final schema = DHTSchema.smpl( final schema = DHTSchema.smpl(
oCnt: 0, oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: stride + 1)]); members: [DHTSchemaMember(mKey: writer.key, mCnt: stride + 1)]);
dhtRecord = await pool.createRecord( dhtRecord = await pool.createRecord(
debugName: debugName, debugName: debugName,
parent: parent, parent: parent,
routingContext: routingContext, routingContext: routingContext,
schema: schema, schema: schema,
crypto: crypto, crypto: crypto,
writer: smplWriter); writer: writer);
} else { } else {
final schema = DHTSchema.dflt(oCnt: stride + 1); final schema = DHTSchema.dflt(oCnt: stride + 1);
dhtRecord = await pool.createRecord( dhtRecord = await pool.createRecord(
@ -120,15 +119,15 @@ class DHTShortArray {
} }
static Future<DHTShortArray> openOwned( static Future<DHTShortArray> openOwned(
OwnedDHTRecordPointer ownedDHTRecordPointer, { OwnedDHTRecordPointer ownedShortArrayRecordPointer, {
required String debugName, required String debugName,
required TypedKey parent, required TypedKey parent,
VeilidRoutingContext? routingContext, VeilidRoutingContext? routingContext,
DHTRecordCrypto? crypto, DHTRecordCrypto? crypto,
}) => }) =>
openWrite( openWrite(
ownedDHTRecordPointer.recordKey, ownedShortArrayRecordPointer.recordKey,
ownedDHTRecordPointer.owner, ownedShortArrayRecordPointer.owner,
debugName: debugName, debugName: debugName,
routingContext: routingContext, routingContext: routingContext,
parent: parent, parent: parent,
@ -136,18 +135,14 @@ class DHTShortArray {
); );
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
// Public API // DHTOpenable
/// Get the record key for this shortarray
TypedKey get recordKey => _head.recordKey;
/// Get the record pointer foir this shortarray
OwnedDHTRecordPointer get recordPointer => _head.recordPointer;
/// Check if the shortarray is open /// Check if the shortarray is open
@override
bool get isOpen => _head.isOpen; bool get isOpen => _head.isOpen;
/// Free all resources for the DHTShortArray /// Free all resources for the DHTShortArray
@override
Future<void> close() async { Future<void> close() async {
if (!isOpen) { if (!isOpen) {
return; return;
@ -159,44 +154,22 @@ class DHTShortArray {
/// Free all resources for the DHTShortArray and delete it from the DHT /// Free all resources for the DHTShortArray and delete it from the DHT
/// Will wait until the short array is closed to delete it /// Will wait until the short array is closed to delete it
@override
Future<void> delete() async { Future<void> delete() async {
await _head.delete(); await _head.delete();
} }
/// Runs a closure that guarantees the DHTShortArray ////////////////////////////////////////////////////////////////////////////
/// will be closed upon exit, even if an uncaught exception is thrown // Public API
Future<T> scope<T>(Future<T> Function(DHTShortArray) scopeFunction) async {
if (!isOpen) {
throw StateError('short array is not open"');
}
try {
return await scopeFunction(this);
} finally {
await close();
}
}
/// Runs a closure that guarantees the DHTShortArray /// Get the record key for this shortarray
/// will be closed upon exit, and deleted if an an TypedKey get recordKey => _head.recordKey;
/// uncaught exception is thrown
Future<T> deleteScope<T>(
Future<T> Function(DHTShortArray) scopeFunction) async {
if (!isOpen) {
throw StateError('short array is not open"');
}
try { /// Get the record pointer foir this shortarray
final out = await scopeFunction(this); OwnedDHTRecordPointer get recordPointer => _head.recordPointer;
await close();
return out;
} on Exception catch (_) {
await delete();
rethrow;
}
}
/// Runs a closure allowing read-only access to the shortarray /// Runs a closure allowing read-only access to the shortarray
Future<T?> operate<T>(Future<T?> Function(DHTShortArrayRead) closure) async { Future<T> operate<T>(Future<T> Function(DHTRandomRead) closure) async {
if (!isOpen) { if (!isOpen) {
throw StateError('short array is not open"'); throw StateError('short array is not open"');
} }
@ -209,14 +182,19 @@ class DHTShortArray {
/// Runs a closure allowing read-write access to the shortarray /// Runs a closure allowing read-write access to the shortarray
/// Makes only one attempt to consistently write the changes to the DHT /// Makes only one attempt to consistently write the changes to the DHT
/// Returns (result, true) of the closure if the write could be performed /// Returns result of the closure if the write could be performed
/// Returns (null, false) if the write could not be performed at this time /// Throws DHTOperateException if the write could not be performed at this time
Future<(T?, bool)> operateWrite<T>( Future<T> operateWrite<T>(
Future<T?> Function(DHTShortArrayWrite) closure) async => Future<T> Function(DHTRandomReadWrite) closure) async {
_head.operateWrite((head) async { if (!isOpen) {
final writer = _DHTShortArrayWrite._(head); throw StateError('short array is not open"');
return closure(writer); }
});
return _head.operateWrite((head) async {
final writer = _DHTShortArrayWrite._(head);
return closure(writer);
});
}
/// Runs a closure allowing read-write access to the shortarray /// Runs a closure allowing read-write access to the shortarray
/// Will execute the closure multiple times if a consistent write to the DHT /// Will execute the closure multiple times if a consistent write to the DHT
@ -225,7 +203,7 @@ class DHTShortArray {
/// succeeded, returning false will trigger another eventual consistency /// succeeded, returning false will trigger another eventual consistency
/// attempt. /// attempt.
Future<void> operateWriteEventual( Future<void> operateWriteEventual(
Future<bool> Function(DHTShortArrayWrite) closure, Future<bool> Function(DHTRandomReadWrite) closure,
{Duration? timeout}) async { {Duration? timeout}) async {
if (!isOpen) { if (!isOpen) {
throw StateError('short array is not open"'); throw StateError('short array is not open"');

View File

@ -41,19 +41,6 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
}); });
} }
// DHTShortArrayCubit.value({
// required DHTShortArray shortArray,
// required T Function(List<int> data) decodeElement,
// }) : _shortArray = shortArray,
// _decodeElement = decodeElement,
// super(const BlocBusyState(AsyncValue.loading())) {
// _initFuture = Future(() async {
// // Make initial state update
// unawaited(_refreshNoWait());
// _subscription = await shortArray.listen(_update);
// });
// }
Future<void> refresh({bool forceRefresh = false}) async { Future<void> refresh({bool forceRefresh = false}) async {
await _initWait(); await _initWait();
await _refreshNoWait(forceRefresh: forceRefresh); await _refreshNoWait(forceRefresh: forceRefresh);
@ -67,12 +54,13 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
try { try {
final newState = await _shortArray.operate((reader) async { final newState = await _shortArray.operate((reader) async {
final offlinePositions = await reader.getOfflinePositions(); final offlinePositions = await reader.getOfflinePositions();
final allItems = (await reader.getAllItems(forceRefresh: forceRefresh)) final allItems =
?.indexed (await reader.getItemRange(0, forceRefresh: forceRefresh))
.map((x) => DHTShortArrayElementState( ?.indexed
value: _decodeElement(x.$2), .map((x) => DHTShortArrayElementState(
isOffline: offlinePositions.contains(x.$1))) value: _decodeElement(x.$2),
.toIList(); isOffline: offlinePositions.contains(x.$1)))
.toIList();
return allItems; return allItems;
}); });
if (newState != null) { if (newState != null) {
@ -103,19 +91,19 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
await super.close(); await super.close();
} }
Future<R?> operate<R>(Future<R?> Function(DHTShortArrayRead) closure) async { Future<R> operate<R>(Future<R> Function(DHTRandomRead) closure) async {
await _initWait(); await _initWait();
return _shortArray.operate(closure); return _shortArray.operate(closure);
} }
Future<(R?, bool)> operateWrite<R>( Future<R> operateWrite<R>(
Future<R?> Function(DHTShortArrayWrite) closure) async { Future<R> Function(DHTRandomReadWrite) closure) async {
await _initWait(); await _initWait();
return _shortArray.operateWrite(closure); return _shortArray.operateWrite(closure);
} }
Future<void> operateWriteEventual( Future<void> operateWriteEventual(
Future<bool> Function(DHTShortArrayWrite) closure, Future<bool> Function(DHTRandomReadWrite) closure,
{Duration? timeout}) async { {Duration? timeout}) async {
await _initWait(); await _initWait();
return _shortArray.operateWriteEventual(closure, timeout: timeout); return _shortArray.operateWriteEventual(closure, timeout: timeout);

View File

@ -82,8 +82,8 @@ class _DHTShortArrayHead {
return closure(this); return closure(this);
}); });
Future<(T?, bool)> operateWrite<T>( Future<T> operateWrite<T>(
Future<T?> Function(_DHTShortArrayHead) closure) async => Future<T> Function(_DHTShortArrayHead) closure) async =>
_headMutex.protect(() async { _headMutex.protect(() async {
final oldLinkedRecords = List.of(_linkedRecords); final oldLinkedRecords = List.of(_linkedRecords);
final oldIndex = List.of(_index); final oldIndex = List.of(_index);
@ -95,11 +95,11 @@ class _DHTShortArrayHead {
if (!await _writeHead()) { if (!await _writeHead()) {
// Failed to write head means head got overwritten so write should // Failed to write head means head got overwritten so write should
// be considered failed // be considered failed
return (null, false); throw DHTExceptionTryAgain();
} }
onUpdatedHead?.call(); onUpdatedHead?.call();
return (out, true); return out;
} on Exception { } on Exception {
// Exception means state needs to be reverted // Exception means state needs to be reverted
_linkedRecords = oldLinkedRecords; _linkedRecords = oldLinkedRecords;
@ -249,22 +249,15 @@ class _DHTShortArrayHead {
} }
// Pull the latest or updated copy of the head record from the network // Pull the latest or updated copy of the head record from the network
Future<bool> _loadHead( Future<void> _loadHead() async {
{bool forceRefresh = true, bool onlyUpdates = false}) async {
// Get an updated head record copy if one exists // Get an updated head record copy if one exists
final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer, final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer,
subkey: 0, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates); subkey: 0, refreshMode: DHTRecordRefreshMode.refresh);
if (head == null) { if (head == null) {
if (onlyUpdates) { throw StateError('shortarray head missing during refresh');
// No update
return false;
}
throw StateError('head missing during refresh');
} }
await _updateHead(head); await _updateHead(head);
return true;
} }
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////

View File

@ -1,70 +1,14 @@
part of 'dht_short_array.dart'; part of 'dht_short_array.dart';
////////////////////////////////////////////////////////////////////////////
// Reader interface
abstract class DHTShortArrayRead {
/// Returns the number of elements in the DHTShortArray
int get length;
/// Return the item at position 'pos' in the DHTShortArray. If 'forceRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false});
/// Return a list of all of the items in the DHTShortArray. If 'forceRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
Future<List<Uint8List>?> getAllItems({bool forceRefresh = false});
/// Get a list of the positions that were written offline and not flushed yet
Future<Set<int>> getOfflinePositions();
}
extension DHTShortArrayReadExt on DHTShortArrayRead {
/// Convenience function:
/// Like getItem but also parses the returned element as JSON
Future<T?> getItemJson<T>(T Function(dynamic) fromJson, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like getAllItems but also parses the returned elements as JSON
Future<List<T>?> getAllItemsJson<T>(T Function(dynamic) fromJson,
{bool forceRefresh = false}) =>
getAllItems(forceRefresh: forceRefresh)
.then((out) => out?.map(fromJson).toList());
/// Convenience function:
/// Like getItem but also parses the returned element as a protobuf object
Future<T?> getItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => (out == null) ? null : fromBuffer(out));
/// Convenience function:
/// Like getAllItems but also parses the returned elements as protobuf objects
Future<List<T>?> getAllItemsProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
{bool forceRefresh = false}) =>
getAllItems(forceRefresh: forceRefresh)
.then((out) => out?.map(fromBuffer).toList());
}
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
// Reader-only implementation // Reader-only implementation
class _DHTShortArrayRead implements DHTShortArrayRead { class _DHTShortArrayRead implements DHTRandomRead {
_DHTShortArrayRead._(_DHTShortArrayHead head) : _head = head; _DHTShortArrayRead._(_DHTShortArrayHead head) : _head = head;
/// Returns the number of elements in the DHTShortArray
@override @override
int get length => _head.length; int get length => _head.length;
/// Return the item at position 'pos' in the DHTShortArray. If 'forceRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
@override @override
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async { Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async {
if (pos < 0 || pos >= length) { if (pos < 0 || pos >= length) {
@ -77,7 +21,9 @@ class _DHTShortArrayRead implements DHTShortArrayRead {
final outSeqNum = Output<int>(); final outSeqNum = Output<int>();
final out = lookup.record.get( final out = lookup.record.get(
subkey: lookup.recordSubkey, subkey: lookup.recordSubkey,
forceRefresh: refresh, refreshMode: refresh
? DHTRecordRefreshMode.refresh
: DHTRecordRefreshMode.existing,
outSeqNum: outSeqNum); outSeqNum: outSeqNum);
if (outSeqNum.value != null) { if (outSeqNum.value != null) {
_head.updatePositionSeq(pos, false, outSeqNum.value!); _head.updatePositionSeq(pos, false, outSeqNum.value!);
@ -86,17 +32,29 @@ class _DHTShortArrayRead implements DHTShortArrayRead {
return out; return out;
} }
/// Return a list of all of the items in the DHTShortArray. If 'forceRefresh' (int, int) _clampStartLen(int start, int? len) {
/// is specified, the network will always be checked for newer values len ??= _head.length;
/// rather than returning the existing locally stored copy of the elements. if (start < 0) {
@override throw IndexError.withLength(start, _head.length);
Future<List<Uint8List>?> getAllItems({bool forceRefresh = false}) async { }
final out = <Uint8List>[]; if (start > _head.length) {
throw IndexError.withLength(start, _head.length);
}
if ((len + start) > _head.length) {
len = _head.length - start;
}
return (start, len);
}
final chunks = Iterable<int>.generate(_head.length) @override
.slices(maxDHTConcurrency) Future<List<Uint8List>?> getItemRange(int start,
.map((chunk) => {int? length, bool forceRefresh = false}) async {
chunk.map((pos) => getItem(pos, forceRefresh: forceRefresh))); final out = <Uint8List>[];
(start, length) = _clampStartLen(start, length);
final chunks = Iterable<int>.generate(length).slices(maxDHTConcurrency).map(
(chunk) => chunk
.map((pos) => getItem(pos + start, forceRefresh: forceRefresh)));
for (final chunk in chunks) { for (final chunk in chunks) {
final elems = await chunk.wait; final elems = await chunk.wait;
@ -109,9 +67,10 @@ class _DHTShortArrayRead implements DHTShortArrayRead {
return out; return out;
} }
/// Get a list of the positions that were written offline and not flushed yet
@override @override
Future<Set<int>> getOfflinePositions() async { Future<Set<int>> getOfflinePositions() async {
final (start, length) = _clampStartLen(0, DHTShortArray.maxElements);
final indexOffline = <int>{}; final indexOffline = <int>{};
final inspects = await [ final inspects = await [
_head._headRecord.inspect(), _head._headRecord.inspect(),
@ -134,7 +93,7 @@ class _DHTShortArrayRead implements DHTShortArrayRead {
// See which positions map to offline indexes // See which positions map to offline indexes
final positionOffline = <int>{}; final positionOffline = <int>{};
for (var i = 0; i < _head._index.length; i++) { for (var i = start; i < (start + length); i++) {
final idx = _head._index[i]; final idx = _head._index[i];
if (indexOffline.contains(idx)) { if (indexOffline.contains(idx)) {
positionOffline.add(i); positionOffline.add(i);

View File

@ -1,101 +1,10 @@
part of 'dht_short_array.dart'; part of 'dht_short_array.dart';
////////////////////////////////////////////////////////////////////////////
// Writer interface
abstract class DHTShortArrayWrite implements DHTShortArrayRead {
/// Try to add an item to the end of the DHTShortArray. Return true if the
/// element was successfully added, and false if the state changed before
/// the element could be added or a newer value was found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryAddItem(Uint8List value);
/// Try to insert an item as position 'pos' of the DHTShortArray.
/// Return true if the element was successfully inserted, and false if the
/// state changed before the element could be inserted or a newer value was
/// found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryInsertItem(int pos, Uint8List value);
/// Try to swap items at position 'aPos' and 'bPos' in the DHTShortArray.
/// Return true if the elements were successfully swapped, and false if the
/// state changed before the elements could be swapped or newer values were
/// found on the network.
/// This may throw an exception if either of the positions swapped exceed
/// the length of the list
Future<bool> trySwapItem(int aPos, int bPos);
/// Try to remove an item at position 'pos' in the DHTShortArray.
/// Return the element if it was successfully removed, and null if the
/// state changed before the elements could be removed or newer values were
/// found on the network.
/// This may throw an exception if the position removed exceeeds the length of
/// the list.
Future<Uint8List?> tryRemoveItem(int pos);
/// Try to remove all items in the DHTShortArray.
/// Return true if it was successfully cleared, and false if the
/// state changed before the elements could be cleared or newer values were
/// found on the network.
Future<bool> tryClear();
/// Try to set an item at position 'pos' of the DHTShortArray.
/// If the set was successful this returns:
/// * The prior contents of the element, or null if there was no value yet
/// * A boolean true
/// If the set was found a newer value on the network:
/// * The newer value of the element, or null if the head record
/// changed.
/// * A boolean false
/// This may throw an exception if the position exceeds the built-in limit of
/// 'maxElements = 256' entries.
Future<(Uint8List?, bool)> tryWriteItem(int pos, Uint8List newValue);
}
extension DHTShortArrayWriteExt on DHTShortArrayWrite {
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<T?> tryRemoveItemJson<T>(
T Function(dynamic) fromJson,
int pos,
) =>
tryRemoveItem(pos).then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<T?> tryRemoveItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos) =>
getItem(pos).then((out) => (out == null) ? null : fromBuffer(out));
/// Convenience function:
/// Like tryWriteItem but also encodes the input value as JSON and parses the
/// returned element as JSON
Future<(T?, bool)> tryWriteItemJson<T>(
T Function(dynamic) fromJson,
int pos,
T newValue,
) =>
tryWriteItem(pos, jsonEncodeBytes(newValue))
.then((out) => (jsonDecodeOptBytes(fromJson, out.$1), out.$2));
/// Convenience function:
/// Like tryWriteItem but also encodes the input value as a protobuf object
/// and parses the returned element as a protobuf object
Future<(T?, bool)> tryWriteItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int pos,
T newValue,
) =>
tryWriteItem(pos, newValue.writeToBuffer()).then(
(out) => ((out.$1 == null ? null : fromBuffer(out.$1!)), out.$2));
}
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
// Writer implementation // Writer implementation
class _DHTShortArrayWrite extends _DHTShortArrayRead class _DHTShortArrayWrite extends _DHTShortArrayRead
implements DHTShortArrayWrite { implements DHTRandomReadWrite {
_DHTShortArrayWrite._(super.head) : super._(); _DHTShortArrayWrite._(super.head) : super._();
@override @override
@ -105,12 +14,12 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
_head.allocateIndex(pos); _head.allocateIndex(pos);
// Write item // Write item
final (_, wasSet) = await tryWriteItem(pos, value); final ok = await tryWriteItem(pos, value);
if (!wasSet) { if (!ok) {
return false; _head.freeIndex(pos);
} }
return true; return ok;
} }
@override @override
@ -119,16 +28,15 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
_head.allocateIndex(pos); _head.allocateIndex(pos);
// Write item // Write item
final (_, wasSet) = await tryWriteItem(pos, value); final ok = await tryWriteItem(pos, value);
if (!wasSet) { if (!ok) {
return false; _head.freeIndex(pos);
} }
return true; return true;
} }
@override @override
Future<bool> trySwapItem(int aPos, int bPos) async { Future<void> swapItem(int aPos, int bPos) async {
if (aPos < 0 || aPos >= _head.length) { if (aPos < 0 || aPos >= _head.length) {
throw IndexError.withLength(aPos, _head.length); throw IndexError.withLength(aPos, _head.length);
} }
@ -137,12 +45,10 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
} }
// Swap indices // Swap indices
_head.swapIndex(aPos, bPos); _head.swapIndex(aPos, bPos);
return true;
} }
@override @override
Future<Uint8List> tryRemoveItem(int pos) async { Future<void> removeItem(int pos, {Output<Uint8List>? output}) async {
if (pos < 0 || pos >= _head.length) { if (pos < 0 || pos >= _head.length) {
throw IndexError.withLength(pos, _head.length); throw IndexError.withLength(pos, _head.length);
} }
@ -162,17 +68,17 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
throw StateError('Element does not exist'); throw StateError('Element does not exist');
} }
_head.freeIndex(pos); _head.freeIndex(pos);
return result; output?.save(result);
} }
@override @override
Future<bool> tryClear() async { Future<void> clear() async {
_head.clearIndex(); _head.clearIndex();
return true;
} }
@override @override
Future<(Uint8List?, bool)> tryWriteItem(int pos, Uint8List newValue) async { Future<bool> tryWriteItem(int pos, Uint8List newValue,
{Output<Uint8List>? output}) async {
if (pos < 0 || pos >= _head.length) { if (pos < 0 || pos >= _head.length) {
throw IndexError.withLength(pos, _head.length); throw IndexError.withLength(pos, _head.length);
} }
@ -198,8 +104,10 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
if (result != null) { if (result != null) {
// A result coming back means the element was overwritten already // A result coming back means the element was overwritten already
return (result, false); output?.save(result);
return false;
} }
return (oldValue, true); output?.save(oldValue);
return true;
} }
} }

View File

@ -0,0 +1,44 @@
import 'dart:typed_data';
import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart';
////////////////////////////////////////////////////////////////////////////
// Append/truncate interface
abstract class DHTAppendTruncate {
/// Try to add an item to the end of the DHT data structure.
/// Return true if the element was successfully added, and false if the state
/// changed before the element could be added or a newer value was found on
/// the network.
/// This may throw an exception if the number elements added exceeds limits.
Future<bool> tryAppendItem(Uint8List value);
/// Try to remove a number of items from the head of the DHT data structure.
/// Throws StateError if count < 0
Future<void> truncate(int count);
/// Remove all items in the DHT data structure.
Future<void> clear();
}
abstract class DHTAppendTruncateRandomRead
implements DHTAppendTruncate, DHTRandomRead {}
extension DHTAppendTruncateExt on DHTAppendTruncate {
/// Convenience function:
/// Like tryAppendItem but also encodes the input value as JSON and parses the
/// returned element as JSON
Future<bool> tryAppendItemJson<T>(
T newValue,
) =>
tryAppendItem(jsonEncodeBytes(newValue));
/// Convenience function:
/// Like tryAppendItem but also encodes the input value as a protobuf object
/// and parses the returned element as a protobuf object
Future<bool> tryAppendItemProtobuf<T extends GeneratedMessage>(
T newValue,
) =>
tryAppendItem(newValue.writeToBuffer());
}

View File

@ -0,0 +1,49 @@
import 'dart:async';
abstract class DHTOpenable {
bool get isOpen;
Future<void> close();
Future<void> delete();
}
extension DHTOpenableExt<D extends DHTOpenable> on D {
/// Runs a closure that guarantees the DHTOpenable
/// will be closed upon exit, even if an uncaught exception is thrown
Future<T> scope<T>(Future<T> Function(D) scopeFunction) async {
if (!isOpen) {
throw StateError('not open in scope');
}
try {
return await scopeFunction(this);
} finally {
await close();
}
}
/// Runs a closure that guarantees the DHTOpenable
/// will be closed upon exit, and deleted if an an
/// uncaught exception is thrown
Future<T> deleteScope<T>(Future<T> Function(D) scopeFunction) async {
if (!isOpen) {
throw StateError('not open in deleteScope');
}
try {
final out = await scopeFunction(this);
await close();
return out;
} on Exception catch (_) {
await delete();
rethrow;
}
}
/// Scopes a closure that conditionally deletes the DHTOpenable on exit
Future<T> maybeDeleteScope<T>(
bool delete, Future<T> Function(D) scopeFunction) async {
if (delete) {
return deleteScope(scopeFunction);
}
return scope(scopeFunction);
}
}

View File

@ -0,0 +1,63 @@
import 'dart:typed_data';
import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart';
////////////////////////////////////////////////////////////////////////////
// Reader interface
abstract class DHTRandomRead {
/// Returns the number of elements in the DHTArray
/// This number will be >= 0 and <= DHTShortArray.maxElements (256)
int get length;
/// Return the item at position 'pos' in the DHTArray. If 'forceRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
/// * 'pos' must be >= 0 and < 'length'
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false});
/// Return a list of a range of items in the DHTArray. If 'forceRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
/// * 'start' must be >= 0
/// * 'len' must be >= 0 and <= DHTShortArray.maxElements (256) and defaults
/// to the maximum length
Future<List<Uint8List>?> getItemRange(int start,
{int? length, bool forceRefresh = false});
/// Get a list of the positions that were written offline and not flushed yet
Future<Set<int>> getOfflinePositions();
}
extension DHTRandomReadExt on DHTRandomRead {
/// Convenience function:
/// Like getItem but also parses the returned element as JSON
Future<T?> getItemJson<T>(T Function(dynamic) fromJson, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => jsonDecodeOptBytes(fromJson, out));
/// Convenience function:
/// Like getAllItems but also parses the returned elements as JSON
Future<List<T>?> getItemRangeJson<T>(T Function(dynamic) fromJson, int start,
{int? length, bool forceRefresh = false}) =>
getItemRange(start, length: length, forceRefresh: forceRefresh)
.then((out) => out?.map(fromJson).toList());
/// Convenience function:
/// Like getItem but also parses the returned element as a protobuf object
Future<T?> getItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => (out == null) ? null : fromBuffer(out));
/// Convenience function:
/// Like getAllItems but also parses the returned elements as protobuf objects
Future<List<T>?> getItemRangeProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int start,
{int? length, bool forceRefresh = false}) =>
getItemRange(start, length: length, forceRefresh: forceRefresh)
.then((out) => out?.map(fromBuffer).toList());
}

View File

@ -0,0 +1,104 @@
import 'dart:typed_data';
import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart';
////////////////////////////////////////////////////////////////////////////
// Writer interface
abstract class DHTRandomWrite {
/// Try to set an item at position 'pos' of the DHTArray.
/// If the set was successful this returns:
/// * A boolean true
/// * outValue will return the prior contents of the element,
/// or null if there was no value yet
///
/// If the set was found a newer value on the network this returns:
/// * A boolean false
/// * outValue will return the newer value of the element,
/// or null if the head record changed.
///
/// This may throw an exception if the position exceeds the built-in limit of
/// 'maxElements = 256' entries.
Future<bool> tryWriteItem(int pos, Uint8List newValue,
{Output<Uint8List>? output});
/// Try to add an item to the end of the DHTArray. Return true if the
/// element was successfully added, and false if the state changed before
/// the element could be added or a newer value was found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryAddItem(Uint8List value);
/// Try to insert an item as position 'pos' of the DHTArray.
/// Return true if the element was successfully inserted, and false if the
/// state changed before the element could be inserted or a newer value was
/// found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryInsertItem(int pos, Uint8List value);
/// Swap items at position 'aPos' and 'bPos' in the DHTArray.
/// Throws IndexError if either of the positions swapped exceed
/// the length of the list
Future<void> swapItem(int aPos, int bPos);
/// Remove an item at position 'pos' in the DHTArray.
/// If the remove was successful this returns:
/// * outValue will return the prior contents of the element
/// Throws IndexError if the position removed exceeds the length of
/// the list.
Future<void> removeItem(int pos, {Output<Uint8List>? output});
/// Remove all items in the DHTShortArray.
Future<void> clear();
}
extension DHTRandomWriteExt on DHTRandomWrite {
/// Convenience function:
/// Like tryWriteItem but also encodes the input value as JSON and parses the
/// returned element as JSON
Future<bool> tryWriteItemJson<T>(
T Function(dynamic) fromJson, int pos, T newValue,
{Output<T>? output}) async {
final outValueBytes = output == null ? null : Output<Uint8List>();
final out = await tryWriteItem(pos, jsonEncodeBytes(newValue),
output: outValueBytes);
output.mapSave(outValueBytes, (b) => jsonDecodeBytes(fromJson, b));
return out;
}
/// Convenience function:
/// Like tryWriteItem but also encodes the input value as a protobuf object
/// and parses the returned element as a protobuf object
Future<bool> tryWriteItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos, T newValue,
{Output<T>? output}) async {
final outValueBytes = output == null ? null : Output<Uint8List>();
final out = await tryWriteItem(pos, newValue.writeToBuffer(),
output: outValueBytes);
output.mapSave(outValueBytes, fromBuffer);
return out;
}
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<void> removeItemJson<T>(T Function(dynamic) fromJson, int pos,
{Output<T>? output}) async {
final outValueBytes = output == null ? null : Output<Uint8List>();
await removeItem(pos, output: outValueBytes);
output.mapSave(outValueBytes, (b) => jsonDecodeBytes(fromJson, b));
}
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<void> removeItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos,
{Output<T>? output}) async {
final outValueBytes = output == null ? null : Output<Uint8List>();
await removeItem(pos, output: outValueBytes);
output.mapSave(outValueBytes, fromBuffer);
}
}
abstract class DHTRandomReadWrite implements DHTRandomRead, DHTRandomWrite {}

View File

@ -0,0 +1,5 @@
class DHTExceptionTryAgain implements Exception {
DHTExceptionTryAgain(
[this.cause = 'operation failed due to newer dht value']);
String cause;
}

View File

@ -0,0 +1,4 @@
export 'dht_openable.dart';
export 'dht_random_read.dart';
export 'dht_random_write.dart';
export 'exceptions.dart';

View File

@ -83,6 +83,68 @@ class DHTData extends $pb.GeneratedMessage {
void clearSize() => clearField(4); void clearSize() => clearField(4);
} }
class DHTLog extends $pb.GeneratedMessage {
factory DHTLog() => create();
DHTLog._() : super();
factory DHTLog.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory DHTLog.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'DHTLog', package: const $pb.PackageName(_omitMessageNames ? '' : 'dht'), createEmptyInstance: create)
..a<$core.int>(1, _omitFieldNames ? '' : 'head', $pb.PbFieldType.OU3)
..a<$core.int>(2, _omitFieldNames ? '' : 'tail', $pb.PbFieldType.OU3)
..a<$core.int>(3, _omitFieldNames ? '' : 'stride', $pb.PbFieldType.OU3)
..hasRequiredFields = false
;
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
'Will be removed in next major version')
DHTLog clone() => DHTLog()..mergeFromMessage(this);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
'Will be removed in next major version')
DHTLog copyWith(void Function(DHTLog) updates) => super.copyWith((message) => updates(message as DHTLog)) as DHTLog;
$pb.BuilderInfo get info_ => _i;
@$core.pragma('dart2js:noInline')
static DHTLog create() => DHTLog._();
DHTLog createEmptyInstance() => create();
static $pb.PbList<DHTLog> createRepeated() => $pb.PbList<DHTLog>();
@$core.pragma('dart2js:noInline')
static DHTLog getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<DHTLog>(create);
static DHTLog? _defaultInstance;
@$pb.TagNumber(1)
$core.int get head => $_getIZ(0);
@$pb.TagNumber(1)
set head($core.int v) { $_setUnsignedInt32(0, v); }
@$pb.TagNumber(1)
$core.bool hasHead() => $_has(0);
@$pb.TagNumber(1)
void clearHead() => clearField(1);
@$pb.TagNumber(2)
$core.int get tail => $_getIZ(1);
@$pb.TagNumber(2)
set tail($core.int v) { $_setUnsignedInt32(1, v); }
@$pb.TagNumber(2)
$core.bool hasTail() => $_has(1);
@$pb.TagNumber(2)
void clearTail() => clearField(2);
@$pb.TagNumber(3)
$core.int get stride => $_getIZ(2);
@$pb.TagNumber(3)
set stride($core.int v) { $_setUnsignedInt32(2, v); }
@$pb.TagNumber(3)
$core.bool hasStride() => $_has(2);
@$pb.TagNumber(3)
void clearStride() => clearField(3);
}
class DHTShortArray extends $pb.GeneratedMessage { class DHTShortArray extends $pb.GeneratedMessage {
factory DHTShortArray() => create(); factory DHTShortArray() => create();
DHTShortArray._() : super(); DHTShortArray._() : super();
@ -133,68 +195,6 @@ class DHTShortArray extends $pb.GeneratedMessage {
$core.List<$core.int> get seqs => $_getList(2); $core.List<$core.int> get seqs => $_getList(2);
} }
class DHTLog extends $pb.GeneratedMessage {
factory DHTLog() => create();
DHTLog._() : super();
factory DHTLog.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory DHTLog.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'DHTLog', package: const $pb.PackageName(_omitMessageNames ? '' : 'dht'), createEmptyInstance: create)
..pc<$0.TypedKey>(1, _omitFieldNames ? '' : 'keys', $pb.PbFieldType.PM, subBuilder: $0.TypedKey.create)
..aOM<$0.TypedKey>(2, _omitFieldNames ? '' : 'back', subBuilder: $0.TypedKey.create)
..p<$core.int>(3, _omitFieldNames ? '' : 'subkeyCounts', $pb.PbFieldType.KU3)
..a<$core.int>(4, _omitFieldNames ? '' : 'totalSubkeys', $pb.PbFieldType.OU3)
..hasRequiredFields = false
;
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
'Will be removed in next major version')
DHTLog clone() => DHTLog()..mergeFromMessage(this);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
'Will be removed in next major version')
DHTLog copyWith(void Function(DHTLog) updates) => super.copyWith((message) => updates(message as DHTLog)) as DHTLog;
$pb.BuilderInfo get info_ => _i;
@$core.pragma('dart2js:noInline')
static DHTLog create() => DHTLog._();
DHTLog createEmptyInstance() => create();
static $pb.PbList<DHTLog> createRepeated() => $pb.PbList<DHTLog>();
@$core.pragma('dart2js:noInline')
static DHTLog getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<DHTLog>(create);
static DHTLog? _defaultInstance;
@$pb.TagNumber(1)
$core.List<$0.TypedKey> get keys => $_getList(0);
@$pb.TagNumber(2)
$0.TypedKey get back => $_getN(1);
@$pb.TagNumber(2)
set back($0.TypedKey v) { setField(2, v); }
@$pb.TagNumber(2)
$core.bool hasBack() => $_has(1);
@$pb.TagNumber(2)
void clearBack() => clearField(2);
@$pb.TagNumber(2)
$0.TypedKey ensureBack() => $_ensure(1);
@$pb.TagNumber(3)
$core.List<$core.int> get subkeyCounts => $_getList(2);
@$pb.TagNumber(4)
$core.int get totalSubkeys => $_getIZ(3);
@$pb.TagNumber(4)
set totalSubkeys($core.int v) { $_setUnsignedInt32(3, v); }
@$pb.TagNumber(4)
$core.bool hasTotalSubkeys() => $_has(3);
@$pb.TagNumber(4)
void clearTotalSubkeys() => clearField(4);
}
enum DataReference_Kind { enum DataReference_Kind {
dhtData, dhtData,
notSet notSet

View File

@ -30,6 +30,21 @@ final $typed_data.Uint8List dHTDataDescriptor = $convert.base64Decode(
'gCIAEoCzIQLnZlaWxpZC5UeXBlZEtleVIEaGFzaBIUCgVjaHVuaxgDIAEoDVIFY2h1bmsSEgoE' 'gCIAEoCzIQLnZlaWxpZC5UeXBlZEtleVIEaGFzaBIUCgVjaHVuaxgDIAEoDVIFY2h1bmsSEgoE'
'c2l6ZRgEIAEoDVIEc2l6ZQ=='); 'c2l6ZRgEIAEoDVIEc2l6ZQ==');
@$core.Deprecated('Use dHTLogDescriptor instead')
const DHTLog$json = {
'1': 'DHTLog',
'2': [
{'1': 'head', '3': 1, '4': 1, '5': 13, '10': 'head'},
{'1': 'tail', '3': 2, '4': 1, '5': 13, '10': 'tail'},
{'1': 'stride', '3': 3, '4': 1, '5': 13, '10': 'stride'},
],
};
/// Descriptor for `DHTLog`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List dHTLogDescriptor = $convert.base64Decode(
'CgZESFRMb2cSEgoEaGVhZBgBIAEoDVIEaGVhZBISCgR0YWlsGAIgASgNUgR0YWlsEhYKBnN0cm'
'lkZRgDIAEoDVIGc3RyaWRl');
@$core.Deprecated('Use dHTShortArrayDescriptor instead') @$core.Deprecated('Use dHTShortArrayDescriptor instead')
const DHTShortArray$json = { const DHTShortArray$json = {
'1': 'DHTShortArray', '1': 'DHTShortArray',
@ -45,23 +60,6 @@ final $typed_data.Uint8List dHTShortArrayDescriptor = $convert.base64Decode(
'Cg1ESFRTaG9ydEFycmF5EiQKBGtleXMYASADKAsyEC52ZWlsaWQuVHlwZWRLZXlSBGtleXMSFA' 'Cg1ESFRTaG9ydEFycmF5EiQKBGtleXMYASADKAsyEC52ZWlsaWQuVHlwZWRLZXlSBGtleXMSFA'
'oFaW5kZXgYAiABKAxSBWluZGV4EhIKBHNlcXMYAyADKA1SBHNlcXM='); 'oFaW5kZXgYAiABKAxSBWluZGV4EhIKBHNlcXMYAyADKA1SBHNlcXM=');
@$core.Deprecated('Use dHTLogDescriptor instead')
const DHTLog$json = {
'1': 'DHTLog',
'2': [
{'1': 'keys', '3': 1, '4': 3, '5': 11, '6': '.veilid.TypedKey', '10': 'keys'},
{'1': 'back', '3': 2, '4': 1, '5': 11, '6': '.veilid.TypedKey', '10': 'back'},
{'1': 'subkey_counts', '3': 3, '4': 3, '5': 13, '10': 'subkeyCounts'},
{'1': 'total_subkeys', '3': 4, '4': 1, '5': 13, '10': 'totalSubkeys'},
],
};
/// Descriptor for `DHTLog`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List dHTLogDescriptor = $convert.base64Decode(
'CgZESFRMb2cSJAoEa2V5cxgBIAMoCzIQLnZlaWxpZC5UeXBlZEtleVIEa2V5cxIkCgRiYWNrGA'
'IgASgLMhAudmVpbGlkLlR5cGVkS2V5UgRiYWNrEiMKDXN1YmtleV9jb3VudHMYAyADKA1SDHN1'
'YmtleUNvdW50cxIjCg10b3RhbF9zdWJrZXlzGAQgASgNUgx0b3RhbFN1YmtleXM=');
@$core.Deprecated('Use dataReferenceDescriptor instead') @$core.Deprecated('Use dataReferenceDescriptor instead')
const DataReference$json = { const DataReference$json = {
'1': 'DataReference', '1': 'DataReference',

View File

@ -300,8 +300,8 @@ Future<IdentityMaster> openIdentityMaster(
debugName: debugName:
'IdentityMaster::openIdentityMaster::IdentityMasterRecord')) 'IdentityMaster::openIdentityMaster::IdentityMasterRecord'))
.deleteScope((masterRec) async { .deleteScope((masterRec) async {
final identityMaster = final identityMaster = (await masterRec.getJson(IdentityMaster.fromJson,
(await masterRec.getJson(IdentityMaster.fromJson, forceRefresh: true))!; refreshMode: DHTRecordRefreshMode.refresh))!;
// Validate IdentityMaster // Validate IdentityMaster
final masterRecordKey = masterRec.key; final masterRecordKey = masterRec.key;

View File

@ -0,0 +1,33 @@
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
export 'package:fast_immutable_collections/fast_immutable_collections.dart'
show Output;
extension OutputNullExt<T> on Output<T>? {
void mapSave<S>(Output<S>? other, T Function(S output) closure) {
if (this == null) {
return;
}
if (other == null) {
return;
}
final v = other.value;
if (v == null) {
return;
}
return this!.save(closure(v));
}
}
extension OutputExt<T> on Output<T> {
void mapSave<S>(Output<S>? other, T Function(S output) closure) {
if (other == null) {
return;
}
final v = other.value;
if (v == null) {
return;
}
return save(closure(v));
}
}

View File

@ -10,6 +10,7 @@ export 'src/config.dart';
export 'src/identity.dart'; export 'src/identity.dart';
export 'src/json_tools.dart'; export 'src/json_tools.dart';
export 'src/memory_tools.dart'; export 'src/memory_tools.dart';
export 'src/output.dart';
export 'src/persistent_queue.dart'; export 'src/persistent_queue.dart';
export 'src/protobuf_tools.dart'; export 'src/protobuf_tools.dart';
export 'src/table_db.dart'; export 'src/table_db.dart';

View File

@ -37,10 +37,10 @@ packages:
dependency: "direct main" dependency: "direct main"
description: description:
name: archive name: archive
sha256: "22600aa1e926be775fa5fe7e6894e7fb3df9efda8891c73f70fb3262399a432d" sha256: ecf4273855368121b1caed0d10d4513c7241dfc813f7d3c8933b36622ae9b265
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "3.4.10" version: "3.5.1"
args: args:
dependency: transitive dependency: transitive
description: description:
@ -203,18 +203,18 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: cached_network_image_web name: cached_network_image_web
sha256: "42a835caa27c220d1294311ac409a43361088625a4f23c820b006dd9bffb3316" sha256: "205d6a9f1862de34b93184f22b9d2d94586b2f05c581d546695e3d8f6a805cd7"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "1.1.1" version: "1.2.0"
camera: camera:
dependency: transitive dependency: transitive
description: description:
name: camera name: camera
sha256: "9499cbc2e51d8eb0beadc158b288380037618ce4e30c9acbc4fae1ac3ecb5797" sha256: dfa8fc5a1adaeb95e7a54d86a5bd56f4bb0e035515354c8ac6d262e35cec2ec8
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "0.10.5+9" version: "0.10.6"
camera_android: camera_android:
dependency: transitive dependency: transitive
description: description:
@ -227,10 +227,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: camera_avfoundation name: camera_avfoundation
sha256: "9dbbb253aaf201a69c40cf95571f366ca936305d2de012684e21f6f1b1433d31" sha256: "7d021e8cd30d9b71b8b92b4ad669e80af432d722d18d6aac338572754a786c15"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "0.9.15+4" version: "0.9.16"
camera_platform_interface: camera_platform_interface:
dependency: transitive dependency: transitive
description: description:
@ -456,10 +456,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: flutter_cache_manager name: flutter_cache_manager
sha256: "8207f27539deb83732fdda03e259349046a39a4c767269285f449ade355d54ba" sha256: "395d6b7831f21f3b989ebedbb785545932adb9afe2622c1ffacf7f4b53a7e544"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "3.3.1" version: "3.3.2"
flutter_chat_types: flutter_chat_types:
dependency: "direct main" dependency: "direct main"
description: description:
@ -634,10 +634,10 @@ packages:
dependency: "direct main" dependency: "direct main"
description: description:
name: go_router name: go_router
sha256: "771c8feb40ad0ef639973d7ecf1b43d55ffcedb2207fd43fab030f5639e40446" sha256: b465e99ce64ba75e61c8c0ce3d87b66d8ac07f0b35d0a7e0263fcfc10f99e836
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "13.2.4" version: "13.2.5"
graphs: graphs:
dependency: transitive dependency: transitive
description: description:
@ -898,10 +898,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: path_provider_foundation name: path_provider_foundation
sha256: "5a7999be66e000916500be4f15a3633ebceb8302719b47b9cc49ce924125350f" sha256: f234384a3fdd67f989b4d54a5d73ca2a6c422fa55ae694381ae0f4375cd1ea16
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "2.3.2" version: "2.4.0"
path_provider_linux: path_provider_linux:
dependency: transitive dependency: transitive
description: description:
@ -970,10 +970,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: pointycastle name: pointycastle
sha256: "79fbafed02cfdbe85ef3fd06c7f4bc2cbcba0177e61b765264853d4253b21744" sha256: "4be0097fcf3fd3e8449e53730c631200ebc7b88016acecab2b0da2f0149222fe"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "3.9.0" version: "3.9.1"
pool: pool:
dependency: transitive dependency: transitive
description: description:
@ -1106,10 +1106,10 @@ packages:
dependency: "direct main" dependency: "direct main"
description: description:
name: searchable_listview name: searchable_listview
sha256: d8513a968bdd540cb011220a5670b23b346e04a7bcb99690a859ed58092f72a4 sha256: f9bc1a57dfcba49ce2d190d642567fb82309dd23849b3b0a328266e3f90054db
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "2.11.2" version: "2.12.0"
share_plus: share_plus:
dependency: "direct main" dependency: "direct main"
description: description:
@ -1146,10 +1146,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: shared_preferences_foundation name: shared_preferences_foundation
sha256: "7708d83064f38060c7b39db12aefe449cb8cdc031d6062280087bc4cdb988f5c" sha256: "0a8a893bf4fd1152f93fec03a415d11c27c74454d96e2318a7ac38dd18683ab7"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "2.3.5" version: "2.4.0"
shared_preferences_linux: shared_preferences_linux:
dependency: transitive dependency: transitive
description: description:
@ -1263,10 +1263,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: sqflite name: sqflite
sha256: "5ce2e1a15e822c3b4bfb5400455775e421da7098eed8adc8f26298ada7c9308c" sha256: a43e5a27235518c03ca238e7b4732cf35eabe863a369ceba6cbefa537a66f16d
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "2.3.3" version: "2.3.3+1"
sqflite_common: sqflite_common:
dependency: transitive dependency: transitive
description: description:
@ -1407,10 +1407,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: url_launcher_ios name: url_launcher_ios
sha256: "9149d493b075ed740901f3ee844a38a00b33116c7c5c10d7fb27df8987fb51d5" sha256: "7068716403343f6ba4969b4173cbf3b84fc768042124bc2c011e5d782b24fe89"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "6.2.5" version: "6.3.0"
url_launcher_linux: url_launcher_linux:
dependency: transitive dependency: transitive
description: description:
@ -1423,10 +1423,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: url_launcher_macos name: url_launcher_macos
sha256: b7244901ea3cf489c5335bdacda07264a6e960b1c1b1a9f91e4bc371d9e68234 sha256: "9a1a42d5d2d95400c795b2914c36fdcb525870c752569438e4ebb09a2b5d90de"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "3.1.0" version: "3.2.0"
url_launcher_platform_interface: url_launcher_platform_interface:
dependency: transitive dependency: transitive
description: description:
@ -1541,10 +1541,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: win32 name: win32
sha256: "0a989dc7ca2bb51eac91e8fd00851297cfffd641aa7538b165c62637ca0eaa4a" sha256: "0eaf06e3446824099858367950a813472af675116bf63f008a4c2a75ae13e9cb"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "5.4.0" version: "5.5.0"
window_manager: window_manager:
dependency: "direct main" dependency: "direct main"
description: description:

View File

@ -10,7 +10,7 @@ environment:
dependencies: dependencies:
animated_theme_switcher: ^2.0.10 animated_theme_switcher: ^2.0.10
ansicolor: ^2.0.2 ansicolor: ^2.0.2
archive: ^3.4.10 archive: ^3.5.1
async_tools: ^0.1.1 async_tools: ^0.1.1
awesome_extensions: ^2.0.14 awesome_extensions: ^2.0.14
badges: ^3.1.2 badges: ^3.1.2
@ -44,11 +44,11 @@ dependencies:
flutter_translate: ^4.0.4 flutter_translate: ^4.0.4
form_builder_validators: ^9.1.0 form_builder_validators: ^9.1.0
freezed_annotation: ^2.4.1 freezed_annotation: ^2.4.1
go_router: ^13.2.4 go_router: ^13.2.5
hydrated_bloc: ^9.1.5 hydrated_bloc: ^9.1.5
image: ^4.1.7 image: ^4.1.7
intl: ^0.18.1 intl: ^0.18.1
json_annotation: ^4.8.1 json_annotation: ^4.9.0
loggy: ^2.0.3 loggy: ^2.0.3
meta: ^1.11.0 meta: ^1.11.0
mobile_scanner: ^4.0.1 mobile_scanner: ^4.0.1
@ -65,7 +65,7 @@ dependencies:
quickalert: ^1.1.0 quickalert: ^1.1.0
radix_colors: ^1.0.4 radix_colors: ^1.0.4
reorderable_grid: ^1.0.10 reorderable_grid: ^1.0.10
searchable_listview: ^2.11.2 searchable_listview: ^2.12.0
share_plus: ^8.0.3 share_plus: ^8.0.3
shared_preferences: ^2.2.3 shared_preferences: ^2.2.3
signal_strength_indicator: ^0.4.1 signal_strength_indicator: ^0.4.1
@ -93,7 +93,7 @@ dev_dependencies:
build_runner: ^2.4.9 build_runner: ^2.4.9
freezed: ^2.5.2 freezed: ^2.5.2
icons_launcher: ^2.1.7 icons_launcher: ^2.1.7
json_serializable: ^6.7.1 json_serializable: ^6.8.0
lint_hard: ^4.0.0 lint_hard: ^4.0.0
flutter_native_splash: flutter_native_splash: