checkpoint

Christien Rioux 2024-05-15 22:45:50 -04:00
parent 3315644ba8
commit 8cd73b2844
14 changed files with 513 additions and 114 deletions

View file

@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:math';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';

View file

@@ -9,34 +9,74 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead {
@override
Future<bool> tryAppendItem(Uint8List value) async {
// Allocate an empty position at the end of the log
final endPos = _spine.length;
final insertPos = _spine.length;
_spine.allocateTail(1);
final lookup = await _spine.lookupPosition(endPos);
final lookup = await _spine.lookupPosition(insertPos);
if (lookup == null) {
throw StateError("can't write to dht log");
}
// Write item to the segment
return lookup.shortArray
.operateWrite((write) async => write.tryWriteItem(lookup.pos, value));
return lookup.shortArray.operateWrite((write) async {
// If this is a new segment, then clear it in case we have wrapped around
if (lookup.pos == 0) {
await write.clear();
} else if (lookup.pos != write.length) {
// We should always be appending at the length
throw StateError('appending should be at the end');
}
return write.tryAddItem(value);
});
}
@override
Future<bool> tryAppendItems(List<Uint8List> values) async {
// Allocate empty positions at the end of the log
final insertPos = _spine.length;
_spine.allocateTail(values.length);
// Look up the shortarray segment for each chunk of values
for (var valueIdx = 0; valueIdx < values.length;) {
final remaining = values.length - valueIdx;
final lookup = await _spine.lookupPosition(insertPos + valueIdx);
if (lookup == null) {
throw StateError("can't write to dht log");
}
final sacount = min(remaining, DHTShortArray.maxElements - lookup.pos);
final success = await lookup.shortArray.operateWrite((write) async {
// If this is a new segment, then clear it in case we have wrapped around
if (lookup.pos == 0) {
await write.clear();
} else if (lookup.pos != write.length) {
// We should always be appending at the length
throw StateError('appending should be at the end');
}
return write.tryAddItems(values.sublist(valueIdx, valueIdx + sacount));
});
if (!success) {
return false;
}
valueIdx += sacount;
}
return true;
}
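
The chunking arithmetic above is worth a concrete walk-through: each pass writes as many values as fit in the current shortarray segment, then advances to the next. A standalone sketch of the per-pass math, assuming 256-element segments (the constants and positions here are hypothetical):

    import 'dart:math';

    void main() {
      const maxElements = 256; // stands in for DHTShortArray.maxElements
      const insertPos = 250; // first free position in the log
      const valueCount = 20; // number of values being appended

      var valueIdx = 0;
      while (valueIdx < valueCount) {
        final segmentPos = (insertPos + valueIdx) % maxElements;
        // never write past the end of the current segment
        final chunk = min(valueCount - valueIdx, maxElements - segmentPos);
        print('write $chunk item(s) at segment offset $segmentPos');
        valueIdx += chunk;
      }
      // write 6 item(s) at segment offset 250
      // write 14 item(s) at segment offset 0
    }

The second pass lands at segment offset 0, which is exactly the case the 'lookup.pos == 0' branch handles: a wrapped-around segment may still hold stale elements, so it is cleared before the append continues.
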
@override
Future<void> truncate(int count) async {
final len = _spine.length;
if (count > len) {
count = len;
}
count = min(count, _spine.length);
if (count == 0) {
return;
}
if (count < 0) {
throw StateError('can not remove negative items');
}
_spine.releaseHead(count);
await _spine.releaseHead(count);
}
@override
Future<void> clear() async {
_spine.releaseHead(_spine.length);
await _spine.releaseHead(_spine.length);
}
}

View file

@@ -83,12 +83,8 @@ class _DHTLogSpine {
Future<void> delete() async {
await _spineMutex.protect(() async {
final pool = DHTRecordPool.instance;
final futures = <Future<void>>[pool.deleteRecord(_spineRecord.key)];
for (final (_, sc) in _spineCache) {
futures.add(sc.delete());
}
await Future.wait(futures);
// Will deep delete all segment records as they are children
await _spineRecord.delete();
});
}
@@ -218,7 +214,7 @@ class _DHTLogSpine {
static TypedKey? _getSegmentKey(Uint8List subkeyData, int segment) {
final decodedLength = TypedKey.decodedLength<TypedKey>();
final segmentKeyBytes = subkeyData.sublist(
decodedLength * segment, (decodedLength + 1) * segment);
decodedLength * segment, decodedLength * (segment + 1));
if (segmentKeyBytes.equals(_emptySegmentKey)) {
return null;
}
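
The corrected slice bounds fix a real decoding bug: segment keys are packed at a fixed width, so segment n occupies bytes decodedLength * n through decodedLength * (n + 1). The old expression (decodedLength + 1) * segment yields an empty slice for segment 0 and, for a hypothetical 36-byte decoded key, a 2-byte slice [72, 74) for segment 2 instead of the full [72, 108). A minimal sketch of the intended fixed-width packing, with the 36-byte width as an assumption:

    import 'dart:typed_data';

    const decodedLength = 36; // hypothetical decoded key width

    // Segment n's key lives at [36 * n, 36 * (n + 1))
    Uint8List segmentKeyBytes(Uint8List subkeyData, int segment) =>
        subkeyData.sublist(
            decodedLength * segment, decodedLength * (segment + 1));
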
@@ -234,7 +230,7 @@ class _DHTLogSpine {
} else {
segmentKeyBytes = segmentKey.decode();
}
subkeyData.setRange(decodedLength * segment, (decodedLength + 1) * segment,
subkeyData.setRange(decodedLength * segment, decodedLength * (segment + 1),
segmentKeyBytes);
}
@@ -435,7 +431,7 @@ class _DHTLogSpine {
_tail = (_tail + count) % _positionLimit;
}
void releaseHead(int count) {
Future<void> releaseHead(int count) async {
assert(_spineMutex.isLocked, 'should be locked');
final currentLength = length;
@@ -447,6 +443,73 @@ class _DHTLogSpine {
}
_head = (_head + count) % _positionLimit;
await _purgeUnusedSegments();
}
Future<void> _deleteSegmentsContiguous(int start, int end) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
final startSegmentNumber = start ~/ DHTShortArray.maxElements;
final startSegmentPos = start % DHTShortArray.maxElements;
final endSegmentNumber = end ~/ DHTShortArray.maxElements;
final firstDeleteSegment =
(startSegmentPos == 0) ? startSegmentNumber : startSegmentNumber + 1;
// 'end' is exclusive, so the last segment wholly inside [start, end) is
// always endSegmentNumber - 1, whether or not end falls on a boundary
final lastDeleteSegment = endSegmentNumber - 1;
int? lastSubkey;
Uint8List? subkeyData;
for (var segmentNumber = firstDeleteSegment;
segmentNumber <= lastDeleteSegment;
segmentNumber++) {
// Look up which subkey and per-subkey segment index hold this
// segment's shortarray record key
final l = lookupSegment(segmentNumber);
final subkey = l.subkey;
final segment = l.segment;
if (lastSubkey != subkey) {
// Flush subkey writes
if (lastSubkey != null) {
await _spineRecord.eventualWriteBytes(subkeyData!,
subkey: lastSubkey);
}
// TODO: debug this, it takes forever
// Get next subkey
subkeyData = await _spineRecord.get(subkey: subkey);
if (subkeyData != null) {
lastSubkey = subkey;
} else {
lastSubkey = null;
}
}
if (subkeyData != null) {
final segmentKey = _getSegmentKey(subkeyData, segment);
if (segmentKey != null) {
await DHTRecordPool.instance.deleteRecord(segmentKey);
_setSegmentKey(subkeyData, segment, null);
}
}
}
// Flush subkey writes
if (lastSubkey != null) {
await _spineRecord.eventualWriteBytes(subkeyData!, subkey: lastSubkey);
}
}
Future<void> _purgeUnusedSegments() async {
assert(_spineMutex.isLocked, 'should be in mutex here');
if (_head < _tail) {
await _deleteSegmentsContiguous(0, _head);
await _deleteSegmentsContiguous(_tail, _positionLimit);
} else if (_head > _tail) {
await _deleteSegmentsContiguous(_tail, _head);
}
}
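
Because 'end' is exclusive, a segment may only be deleted when it lies wholly inside the unused range [start, end). A standalone check of the boundary math, assuming 256-element segments (the inputs are hypothetical):

    // Returns the inclusive range of deletable segment numbers;
    // the range is empty when first > last.
    (int, int) deletableSegments(int start, int end) {
      const maxElements = 256; // stands in for DHTShortArray.maxElements
      final firstDelete = (start % maxElements == 0)
          ? start ~/ maxElements
          : start ~/ maxElements + 1;
      final lastDelete = end ~/ maxElements - 1; // 'end' is exclusive
      return (firstDelete, lastDelete);
    }

    void main() {
      print(deletableSegments(0, 300)); // (0, 0): segment 0 is fully unused
      print(deletableSegments(10, 512)); // (1, 1): segment 0 is partly in use
      print(deletableSegments(256, 256)); // (1, 0): empty range, no deletes
    }

Note that end ~/ maxElements - 1 is correct whether or not 'end' falls on a segment boundary, which is why lastDeleteSegment needs no boundary conditional.
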
/////////////////////////////////////////////////////////////////////////////
@@ -532,7 +595,7 @@ class _DHTLogSpine {
// Position of the start of the log (oldest items)
int _head;
// Position of the end of the log (newest items)
// Position of the end of the log (newest items) (exclusive)
int _tail;
// LRU cache of DHT spine elements accessed recently

View file

@@ -91,7 +91,6 @@ class SharedDHTRecordData {
Map<int, int> subkeySeqCache = {};
bool needsWatchStateUpdate = false;
WatchState? unionWatchState;
bool deleteOnClose = false;
}
// Per opened record data
@@ -128,6 +127,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
: _state = const DHTRecordPoolAllocations(),
_mutex = Mutex(),
_opened = <TypedKey, OpenedRecordInfo>{},
_markedForDelete = <TypedKey>{},
_routingContext = routingContext,
_veilid = veilid;
@@ -140,6 +140,8 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
final Mutex _mutex;
// Which DHT records are currently open
final Map<TypedKey, OpenedRecordInfo> _opened;
// Which DHT records are marked for deletion
final Set<TypedKey> _markedForDelete;
// Default routing context to use for new keys
final VeilidRoutingContext _routingContext;
// Convenience accessor
@@ -288,6 +290,8 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
return openedRecordInfo;
}
// Called when a DHTRecord is closed
// Cleans up the opened record housekeeping and processes any late deletions
Future<void> _recordClosed(DHTRecord record) async {
await _mutex.protect(() async {
final key = record.key;
@@ -301,14 +305,37 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
}
if (openedRecordInfo.records.isEmpty) {
await _routingContext.closeDHTRecord(key);
if (openedRecordInfo.shared.deleteOnClose) {
await _deleteRecordInner(key);
}
_opened.remove(key);
await _checkForLateDeletesInner(key);
}
});
}
// Check to see if this key can finally be deleted
// If any parents are marked for deletion, try them first
Future<void> _checkForLateDeletesInner(TypedKey key) async {
// Get parent list in bottom up order including our own key
final parents = <TypedKey>[];
TypedKey? nextParent = key;
while (nextParent != null) {
parents.add(nextParent);
nextParent = getParentRecordKey(nextParent);
}
// If any parent is ready to delete all its children do it
for (final parent in parents) {
if (_markedForDelete.contains(parent)) {
final deleted = await _deleteRecordInner(parent);
if (!deleted) {
// If we couldn't delete a child then no 'marked for delete' parents
// above us will be ready to delete either
break;
}
}
}
}
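
Taken together with _recordClosed, this gives deleteRecord mark-or-delete semantics: deleting a record that is still open, or that has open children, only marks it, and the final close walks up the parent chain and finishes the job. A hypothetical flow (the pool, keys, and records here are stand-ins):

    // parentKey owns an open child record held in childRecord
    Future<void> lateDeleteDemo(
        DHTRecordPool pool, TypedKey parentKey, DHTRecord childRecord) async {
      final deletedNow = await pool.deleteRecord(parentKey);
      // the open child blocks the deep delete; the parent is only marked
      assert(!deletedNow);

      // the last close triggers _checkForLateDeletesInner, which finds the
      // marked parent and finalizes the deep delete of parent and child
      await childRecord.close();
    }
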
// Collect all dependencies (including the record itself)
// in reverse (bottom-up/delete order)
List<TypedKey> _collectChildrenInner(TypedKey recordKey) {
@@ -328,7 +355,13 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
return allDeps.reversedView;
}
String _debugChildren(TypedKey recordKey, {List<TypedKey>? allDeps}) {
/// Collect all dependencies (including the record itself)
/// in reverse (bottom-up/delete order)
Future<List<TypedKey>> collectChildren(TypedKey recordKey) =>
_mutex.protect(() async => _collectChildrenInner(recordKey));
/// Print children
String debugChildren(TypedKey recordKey, {List<TypedKey>? allDeps}) {
allDeps ??= _collectChildrenInner(recordKey);
// ignore: avoid_print
var out =
@@ -342,32 +375,48 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
return out;
}
Future<void> _deleteRecordInner(TypedKey recordKey) async {
log('deleteDHTRecord: key=$recordKey');
// Actual delete function
Future<void> _finalizeDeleteRecordInner(TypedKey recordKey) async {
log('_finalizeDeleteRecordInner: key=$recordKey');
// Remove this child from parents
await _removeDependenciesInner([recordKey]);
await _routingContext.deleteDHTRecord(recordKey);
_markedForDelete.remove(recordKey);
}
Future<void> deleteRecord(TypedKey recordKey) async {
await _mutex.protect(() async {
final allDeps = _collectChildrenInner(recordKey);
if (allDeps.singleOrNull != recordKey) {
final dbgstr = _debugChildren(recordKey, allDeps: allDeps);
throw StateError('must delete children first: $dbgstr');
// Deep delete mechanism inside mutex
Future<bool> _deleteRecordInner(TypedKey recordKey) async {
final toDelete = _readyForDeleteInner(recordKey);
if (toDelete.isNotEmpty) {
// delete now
for (final deleteKey in toDelete) {
await _finalizeDeleteRecordInner(deleteKey);
}
return true;
}
// mark for deletion
_markedForDelete.add(recordKey);
return false;
}
final ori = _opened[recordKey];
if (ori != null) {
// delete after close
ori.shared.deleteOnClose = true;
} else {
// delete now
await _deleteRecordInner(recordKey);
/// Delete a record and its children if they are all closed,
/// otherwise mark the record for eventual deletion.
/// Returns true if the deletion was processed immediately,
/// and false if it was marked for later.
Future<bool> deleteRecord(TypedKey recordKey) async =>
_mutex.protect(() async => _deleteRecordInner(recordKey));
// If the record and everything underneath it are closed, return the
// list of children (and the record itself) in delete order
List<TypedKey> _readyForDeleteInner(TypedKey recordKey) {
final allDeps = _collectChildrenInner(recordKey);
for (final dep in allDeps) {
if (_opened.containsKey(dep)) {
return [];
}
});
}
return allDeps;
}
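
_readyForDeleteInner makes the deep delete all-or-nothing: a single open record anywhere in the dependency tree blocks everything. A minimal sketch of the rule, with plain collections standing in for the pool's real bookkeeping (all names hypothetical):

    List<String> readyForDelete(String key,
        Map<String, List<String>> children, Set<String> opened) {
      // collect the record and all transitive children, top-down
      final deps = <String>[key];
      for (var i = 0; i < deps.length; i++) {
        deps.addAll(children[deps[i]] ?? const []);
      }
      // one open dependent blocks the whole deep delete
      if (deps.any(opened.contains)) {
        return [];
      }
      return deps.reversed.toList(); // delete children before parents
    }
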
void _validateParentInner(TypedKey? parent, TypedKey child) {
@@ -456,6 +505,19 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
}
}
bool _isValidRecordKeyInner(TypedKey key) {
if (_state.rootRecords.contains(key)) {
return true;
}
if (_state.childrenByParent.containsKey(key.toJson())) {
return true;
}
return false;
}
Future<bool> isValidRecordKey(TypedKey key) =>
_mutex.protect(() async => _isValidRecordKeyInner(key));
///////////////////////////////////////////////////////////////////////
/// Create a root DHTRecord that has no dependent records

View file

@@ -67,12 +67,8 @@ class _DHTShortArrayHead {
Future<void> delete() async {
await _headMutex.protect(() async {
final pool = DHTRecordPool.instance;
final futures = <Future<void>>[pool.deleteRecord(_headRecord.key)];
for (final lr in _linkedRecords) {
futures.add(pool.deleteRecord(lr.key));
}
await Future.wait(futures);
// Will deep delete all linked records as they are children
await _headRecord.delete();
});
}

View file

@@ -8,19 +8,12 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
_DHTShortArrayWrite._(super.head) : super._();
@override
Future<bool> tryAddItem(Uint8List value) async {
// Allocate empty index at the end of the list
final pos = _head.length;
_head.allocateIndex(pos);
Future<bool> tryAddItem(Uint8List value) =>
tryInsertItem(_head.length, value);
// Write item
final ok = await tryWriteItem(pos, value);
if (!ok) {
_head.freeIndex(pos);
}
return ok;
}
@override
Future<bool> tryAddItems(List<Uint8List> values) =>
tryInsertItems(_head.length, values);
@override
Future<bool> tryInsertItem(int pos, Uint8List value) async {
@@ -35,6 +28,29 @@
return true;
}
@override
Future<bool> tryInsertItems(int pos, List<Uint8List> values) async {
// Allocate empty indices at the end of the list
for (var i = 0; i < values.length; i++) {
_head.allocateIndex(pos + i);
}
// Write items
var success = true;
final dws = DelayedWaitSet<void>();
for (var i = 0; i < values.length; i++) {
dws.add(() async {
final ok = await tryWriteItem(pos + i, values[i]);
if (!ok) {
_head.freeIndex(pos + i);
success = false;
}
});
}
await dws(chunkSize: maxDHTConcurrency, onChunkDone: (_) => success);
return success;
}
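
DelayedWaitSet (from the async_tools package) batches the queued closures into chunks of maxDHTConcurrency parallel writes; as used above, the onChunkDone callback appears to act as a continue predicate, so once a write fails no further chunks are scheduled. A standalone sketch of the same pattern, assuming that API shape:

    import 'package:async_tools/async_tools.dart';

    Future<bool> writeAll(
        List<int> items, Future<bool> Function(int) write) async {
      var success = true;
      final dws = DelayedWaitSet<void>();
      for (final item in items) {
        dws.add(() async {
          if (!await write(item)) {
            success = false;
          }
        });
      }
      // run 4 closures at a time; stop scheduling new chunks once
      // success has flipped to false (assumed onChunkDone semantics)
      await dws(chunkSize: 4, onChunkDone: (_) => success);
      return success;
    }
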
@override
Future<void> swapItem(int aPos, int bPos) async {
if (aPos < 0 || aPos >= _head.length) {

View file

@@ -14,6 +14,13 @@ abstract class DHTAppendTruncate {
/// This may throw an exception if the number of elements added exceeds limits.
Future<bool> tryAppendItem(Uint8List value);
/// Try to add a list of items to the end of the DHT data structure.
/// Return true if the elements were successfully added, and false if the
/// state changed before the elements could be added or a newer value was
/// found on the network.
/// This may throw an exception if the number of elements added exceeds limits.
Future<bool> tryAppendItems(List<Uint8List> values);
/// Try to remove a number of items from the head of the DHT data structure.
/// Throws StateError if count < 0
Future<void> truncate(int count);
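
A usage sketch against this interface; the writer instance and payloads are hypothetical, and only tryAppendItems and truncate come from the declarations above:

    import 'dart:convert';
    import 'dart:typed_data';

    Future<void> appendAndTrim(DHTAppendTruncate writer) async {
      final items = ['one', 'two', 'three']
          .map((s) => Uint8List.fromList(utf8.encode(s)))
          .toList();
      if (!await writer.tryAppendItems(items)) {
        // state changed underneath us; the caller decides whether to retry
        throw StateError('append lost a consistency race');
      }
      await writer.truncate(1); // drop the oldest item from the head
    }
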

View file

@@ -29,12 +29,12 @@ extension DHTOpenableExt<D extends DHTOpenable> on D {
}
try {
final out = await scopeFunction(this);
await close();
return out;
} on Exception catch (_) {
return await scopeFunction(this);
} on Exception {
await delete();
rethrow;
} finally {
await close();
}
}
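
The new shape is the standard acquire/try/finally discipline: the old version skipped close() entirely on exception and deleted a still-open record, while now close() always runs, and the delete() in the exception path is safe because the pool defers deletion of open records until their final close. A generic sketch of the pattern (Resource is a hypothetical stand-in for DHTOpenable):

    abstract class Resource {
      Future<void> close();
      Future<void> delete();
    }

    Future<T> scope<T>(Resource r, Future<T> Function(Resource) body) async {
      try {
        return await body(r);
      } on Exception {
        // a failed scope tears the resource down; with deferred deletion
        // this only marks it, and the close() below finalizes it
        await r.delete();
        rethrow;
      } finally {
        await r.close(); // always runs, success or failure
      }
    }
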

View file

@@ -30,6 +30,13 @@ abstract class DHTRandomWrite {
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryAddItem(Uint8List value);
/// Try to add a list of items to the end of the DHTArray. Return true if the
/// elements were successfully added, and false if the state changed before
/// the elements could be added or a newer value was found on the network.
/// This may throw an exception if the number of elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryAddItems(List<Uint8List> values);
/// Try to insert an item at position 'pos' of the DHTArray.
/// Return true if the element was successfully inserted, and false if the
/// state changed before the element could be inserted or a newer value was
@@ -38,6 +45,14 @@ abstract class DHTRandomWrite {
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryInsertItem(int pos, Uint8List value);
/// Try to insert items at position 'pos' of the DHTArray.
/// Return true if the elements were successfully inserted, and false if the
/// state changed before the elements could be inserted or a newer value was
/// found on the network.
/// This may throw an exception if the number of elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryInsertItems(int pos, List<Uint8List> values);
/// Swap items at position 'aPos' and 'bPos' in the DHTArray.
/// Throws IndexError if either of the positions swapped exceed
/// the length of the list
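
A usage sketch for the batched insert, with a hypothetical DHTRandomWrite instance and payloads:

    import 'dart:typed_data';

    Future<void> insertPair(DHTRandomWrite arr) async {
      final a = Uint8List.fromList([1]);
      final b = Uint8List.fromList([2]);
      if (!await arr.tryInsertItems(0, [a, b])) {
        return; // a concurrent writer changed the state first
      }
      await arr.swapItem(0, 1); // throws IndexError on bad positions
    }
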