mirror of https://gitlab.com/veilid/veilidchat.git synced 2025-03-13 19:06:32 -04:00

dht log implementation

This commit is contained in:
Christien Rioux 2024-05-14 10:06:43 -04:00
parent c4d25fecb0
commit 3315644ba8
5 changed files with 252 additions and 116 deletions

@ -1,2 +1,2 @@
export 'dht_array.dart';
export 'dht_array_cubit.dart';
export 'dht_log.dart';
export 'dht_log_cubit.dart';

@ -4,6 +4,7 @@ import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:collection/collection.dart';
import 'package:equatable/equatable.dart';
import 'package:meta/meta.dart';
import '../../../veilid_support.dart';
import '../../proto/proto.dart' as proto;
@ -15,6 +16,21 @@ part 'dht_log_append.dart';
class DHTLogUpdate extends Equatable {
const DHTLogUpdate(
{required this.headDelta, required this.tailDelta, required this.length})
: assert(headDelta >= 0, 'should never have negative head delta'),
assert(tailDelta >= 0, 'should never have negative tail delta'),
assert(length >= 0, 'should never have negative length');
final int headDelta;
final int tailDelta;
final int length;
List<Object?> get props => [headDelta, tailDelta, length];
/// DHTLog is a ring-buffer queue like data structure with the following
/// operations:
/// * Add elements to the tail
@ -30,8 +46,8 @@ class DHTLog implements DHTOpenable {
// Constructors
DHTLog._({required _DHTLogSpine spine}) : _spine = spine {
_spine.onUpdatedSpine = () {
_spine.onUpdatedSpine = (update) {
@ -225,7 +241,7 @@ class DHTLog implements DHTOpenable {
/// Listen to and any all changes to the structure of this log
/// regardless of where the changes are coming from
Future<StreamSubscription<void>> listen(
void Function() onChanged,
void Function(DHTLogUpdate) onChanged,
) {
if (!isOpen) {
throw StateError('log is not open"');
@ -235,7 +251,8 @@ class DHTLog implements DHTOpenable {
// If don't have a controller yet, set it up
if (_watchController == null) {
// Set up watch requirements
_watchController = StreamController<void>.broadcast(onCancel: () {
_watchController =
StreamController<DHTLogUpdate>.broadcast(onCancel: () {
// If there are no more listeners then we can get
// rid of the controller and drop our subscriptions
unawaited(_listenMutex.protect(() async {
@ -249,7 +266,7 @@ class DHTLog implements DHTOpenable {
await _spine.watch();
// Return subscription
return _watchController!.stream.listen((_) => onChanged());
return _watchController!.stream.listen((upd) => onChanged(upd));
@ -269,5 +286,5 @@ class DHTLog implements DHTOpenable {
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex = Mutex();
// Stream of external changes
StreamController<void>? _watchController;
StreamController<DHTLogUpdate>? _watchController;

@ -8,112 +8,213 @@ import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';
import '../../../veilid_support.dart';
import '../interfaces/dht_append_truncate.dart';
// xxx paginate and remember to paginate watches (could use short array cubit as a subcubit here?)
class DHTLogElementState<T> extends Equatable {
const DHTLogElementState({required this.value, required this.isOffline});
final T value;
final bool isOffline;
// @immutable
// class DHTArrayElementState<T> extends Equatable {
// const DHTArrayElementState(
// {required this.value, required this.isOffline});
// final T value;
// final bool isOffline;
List<Object?> get props => [value, isOffline];
// @override
// List<Object?> get props => [value, isOffline];
// }
class DHTLogStateData<T> extends Equatable {
const DHTLogStateData(
{required this.elements,
required this.tail,
required this.count,
required this.follow});
// The view of the elements in the dhtlog
// Span is from [tail-length, tail)
final IList<DHTLogElementState<T>> elements;
// One past the end of the last element
final int tail;
// The total number of elements to try to keep in 'elements'
final int count;
// If we should have the tail following the log
final bool follow;
// typedef DHTArrayState<T> = AsyncValue<IList<DHTArrayElementState<T>>>;
// typedef DHTArrayBusyState<T> = BlocBusyState<DHTArrayState<T>>;
List<Object?> get props => [elements, tail, count, follow];
// class DHTArrayCubit<T> extends Cubit<DHTArrayBusyState<T>>
// with BlocBusyWrapper<DHTArrayState<T>> {
// DHTArrayCubit({
// required Future<DHTArray> Function() open,
// required T Function(List<int> data) decodeElement,
// }) : _decodeElement = decodeElement,
// super(const BlocBusyState(AsyncValue.loading())) {
// _initWait.add(() async {
// // Open DHT record
// _array = await open();
// _wantsCloseRecord = true;
typedef DHTLogState<T> = AsyncValue<DHTLogStateData<T>>;
typedef DHTLogBusyState<T> = BlocBusyState<DHTLogState<T>>;
// // Make initial state update
// await _refreshNoWait();
// _subscription = await _array.listen(_update);
// });
// }
class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
with BlocBusyWrapper<DHTLogState<T>> {
required Future<DHTLog> Function() open,
required T Function(List<int> data) decodeElement,
}) : _decodeElement = decodeElement,
super(const BlocBusyState(AsyncValue.loading())) {
_initWait.add(() async {
// Open DHT record
_log = await open();
_wantsCloseRecord = true;
// Future<void> refresh({bool forceRefresh = false}) async {
// await _initWait();
// await _refreshNoWait(forceRefresh: forceRefresh);
// }
// Make initial state update
await _refreshNoWait();
_subscription = await _log.listen(_update);
// Future<void> _refreshNoWait({bool forceRefresh = false}) async =>
// busy((emit) async => _refreshInner(emit, forceRefresh: forceRefresh));
// Set the tail position of the log for pagination.
// If tail is 0, the end of the log is used.
// If tail is negative, the position is subtracted from the current log
// length.
// If tail is positive, the position is absolute from the head of the log
// If follow is enabled, the tail offset will update when the log changes
Future<void> setWindow(
{int? tail, int? count, bool? follow, bool forceRefresh = false}) async {
await _initWait();
if (tail != null) {
_tail = tail;
if (count != null) {
_count = count;
if (follow != null) {
_follow = follow;
await _refreshNoWait(forceRefresh: forceRefresh);
// Future<void> _refreshInner(void Function(DHTShortArrayState<T>) emit,
// {bool forceRefresh = false}) async {
// try {
// final newState = await _shortArray.operate((reader) async {
// final offlinePositions = await reader.getOfflinePositions();
// final allItems = (await reader.getAllItems(forceRefresh: forceRefresh))
// ?.indexed
// .map((x) => DHTShortArrayElementState(
// value: _decodeElement(x.$2),
// isOffline: offlinePositions.contains(x.$1)))
// .toIList();
// return allItems;
// });
// if (newState != null) {
// emit(AsyncValue.data(newState));
// }
// } on Exception catch (e) {
// emit(AsyncValue.error(e));
// }
// }
Future<void> refresh({bool forceRefresh = false}) async {
await _initWait();
await _refreshNoWait(forceRefresh: forceRefresh);
// void _update() {
// // Run at most one background update process
// // Because this is async, we could get an update while we're
// // still processing the last one. Only called after init future has run
// // so we dont have to wait for that here.
// _sspUpdate.busyUpdate<T, DHTShortArrayState<T>>(
// busy, (emit) async => _refreshInner(emit));
// }
Future<void> _refreshNoWait({bool forceRefresh = false}) async =>
busy((emit) async => _refreshInner(emit, forceRefresh: forceRefresh));
// @override
// Future<void> close() async {
// await _initWait();
// await _subscription?.cancel();
// _subscription = null;
// if (_wantsCloseRecord) {
// await _shortArray.close();
// }
// await super.close();
// }
Future<void> _refreshInner(void Function(AsyncValue<DHTLogStateData<T>>) emit,
{bool forceRefresh = false}) async {
final avElements = await _loadElements(_tail, _count);
final err = avElements.asError;
if (err != null) {
emit(AsyncValue.error(err.error, err.stackTrace));
final loading = avElements.asLoading;
if (loading != null) {
emit(const AsyncValue.loading());
final elements = avElements.asData!.value;
elements: elements, tail: _tail, count: _count, follow: _follow)));
// Future<R?> operate<R>(Future<R?> Function(DHTShortArrayRead) closure) async {
// await _initWait();
// return _shortArray.operate(closure);
// }
Future<AsyncValue<IList<DHTLogElementState<T>>>> _loadElements(
int tail, int count,
{bool forceRefresh = false}) async {
try {
final allItems = await _log.operate((reader) async {
final length = reader.length;
final end = ((tail - 1) % length) + 1;
final start = (count < end) ? end - count : 0;
// Future<(R?, bool)> operateWrite<R>(
// Future<R?> Function(DHTShortArrayWrite) closure) async {
// await _initWait();
// return _shortArray.operateWrite(closure);
// }
final offlinePositions = await reader.getOfflinePositions();
final allItems = (await reader.getItemRange(start,
length: end - start, forceRefresh: forceRefresh))
.map((x) => DHTLogElementState(
value: _decodeElement(x.$2),
isOffline: offlinePositions.contains(x.$1)))
return allItems;
if (allItems == null) {
return const AsyncValue.loading();
return AsyncValue.data(allItems);
} on Exception catch (e, st) {
return AsyncValue.error(e, st);
// Future<void> operateWriteEventual(
// Future<bool> Function(DHTShortArrayWrite) closure,
// {Duration? timeout}) async {
// await _initWait();
// return _shortArray.operateWriteEventual(closure, timeout: timeout);
// }
void _update(DHTLogUpdate upd) {
// Run at most one background update process
// Because this is async, we could get an update while we're
// still processing the last one. Only called after init future has run
// so we dont have to wait for that here.
// final WaitSet<void> _initWait = WaitSet();
// late final DHTShortArray _shortArray;
// final T Function(List<int> data) _decodeElement;
// StreamSubscription<void>? _subscription;
// bool _wantsCloseRecord = false;
// final _sspUpdate = SingleStatelessProcessor();
// }
// Accumulate head and tail deltas
_headDelta += upd.headDelta;
_tailDelta += upd.tailDelta;
_sspUpdate.busyUpdate<T, DHTLogState<T>>(busy, (emit) async {
// apply follow
if (_follow) {
if (_tail <= 0) {
// Negative tail is already following tail changes
} else {
// Positive tail is measured from the head, so apply deltas
_tail = (_tail + _tailDelta - _headDelta) % upd.length;
} else {
if (_tail <= 0) {
// Negative tail is following tail changes so apply deltas
var posTail = _tail + upd.length;
posTail = (posTail + _tailDelta - _headDelta) % upd.length;
_tail = posTail - upd.length;
} else {
// Positive tail is measured from head so not following tail
_headDelta = 0;
_tailDelta = 0;
await _refreshInner(emit);
Future<void> close() async {
await _initWait();
await _subscription?.cancel();
_subscription = null;
if (_wantsCloseRecord) {
await _log.close();
await super.close();
Future<R?> operate<R>(Future<R?> Function(DHTRandomRead) closure) async {
await _initWait();
return _log.operate(closure);
Future<R> operateAppend<R>(
Future<R> Function(DHTAppendTruncateRandomRead) closure) async {
await _initWait();
return _log.operateAppend(closure);
Future<void> operateAppendEventual(
Future<bool> Function(DHTAppendTruncateRandomRead) closure,
{Duration? timeout}) async {
await _initWait();
return _log.operateAppendEventual(closure, timeout: timeout);
final WaitSet<void> _initWait = WaitSet();
late final DHTLog _log;
final T Function(List<int> data) _decodeElement;
StreamSubscription<void>? _subscription;
bool _wantsCloseRecord = false;
final _sspUpdate = SingleStatelessProcessor();
// Accumulated deltas since last update
var _headDelta = 0;
var _tailDelta = 0;
// Cubit window into the DHTLog
var _tail = 0;
var _count = DHTShortArray.maxElements;
var _follow = true;

@ -105,13 +105,11 @@ class _DHTLogSpine {
try {
final out = await closure(this);
// Write head assuming it has been changed
if (!await writeSpineHead()) {
if (!await writeSpineHead(old: (oldHead, oldTail))) {
// Failed to write head means head got overwritten so write should
// be considered failed
throw DHTExceptionTryAgain();
return out;
} on Exception {
// Exception means state needs to be reverted
@ -134,7 +132,6 @@ class _DHTLogSpine {
try {
// Iterate until we have a successful element and head write
do {
// Save off old values each pass of writeSpineHead because the head
// will have changed
@ -158,9 +155,7 @@ class _DHTLogSpine {
// Try to do the head write
} while (!await writeSpineHead());
} while (!await writeSpineHead(old: (oldHead, oldTail)));
} on Exception {
// Exception means state needs to be reverted
_head = oldHead;
@ -173,7 +168,7 @@ class _DHTLogSpine {
/// Serialize and write out the current spine head subkey, possibly updating
/// it if a newer copy is available online. Returns true if the write was
/// successful
Future<bool> writeSpineHead() async {
Future<bool> writeSpineHead({(int, int)? old}) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
final headBuffer = _toProto().writeToBuffer();
@ -182,12 +177,28 @@ class _DHTLogSpine {
if (existingData != null) {
// Head write failed, incorporate update
await _updateHead(proto.DHTLog.fromBuffer(existingData));
if (old != null) {
sendUpdate(old.$1, old.$2);
return false;
if (old != null) {
sendUpdate(old.$1, old.$2);
return true;
/// Send a spine update callback
void sendUpdate(int oldHead, int oldTail) {
final oldLength = _ringDistance(oldTail, oldHead);
if (oldHead != _head || oldTail != _tail || oldLength != length) {
headDelta: _ringDistance(_head, oldHead),
tailDelta: _ringDistance(_tail, oldTail),
length: length));
/// Validate a new spine head subkey that has come in from the network
Future<void> _updateHead(proto.DHTLog spineHead) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
@ -486,8 +497,10 @@ class _DHTLogSpine {
// Then update the head record
await _spineMutex.protect(() async {
final oldHead = _head;
final oldTail = _tail;
await _updateHead(headData);
sendUpdate(oldHead, oldTail);
@ -495,10 +508,14 @@ class _DHTLogSpine {
TypedKey get recordKey => _spineRecord.key;
OwnedDHTRecordPointer get recordPointer => _spineRecord.ownedDHTRecordPointer;
int get length =>
(_tail < _head) ? (_positionLimit - _head) + _tail : _tail - _head;
int get length => _ringDistance(_tail, _head);
bool get isOpen => _spineRecord.isOpen;
// Ring buffer distance from old to new
static int _ringDistance(int n, int o) =>
(n < o) ? (_positionLimit - o) + n : n - o;
static const _positionLimit = DHTLog.segmentsPerSubkey *
DHTLog.spineSubkeys *
@ -508,7 +525,7 @@ class _DHTLogSpine {
// Subscription to head record internal changes
StreamSubscription<DHTRecordWatchChange>? _subscription;
// Notify closure for external spine head changes
void Function()? onUpdatedSpine;
void Function(DHTLogUpdate)? onUpdatedSpine;
// Spine DHT record
final DHTRecord _spineRecord;

@ -183,7 +183,8 @@ class DHTShortArray implements DHTOpenable {
/// Runs a closure allowing read-write access to the shortarray
/// Makes only one attempt to consistently write the changes to the DHT
/// Returns result of the closure if the write could be performed
/// Throws DHTOperateException if the write could not be performed at this time
/// Throws DHTOperateException if the write could not be performed
/// at this time
Future<T> operateWrite<T>(
Future<T> Function(DHTRandomReadWrite) closure) async {
if (!isOpen) {