spine update

This commit is contained in:
Christien Rioux 2024-06-07 21:38:19 -04:00
parent ddc02f6771
commit 68aad4a94e
3 changed files with 75 additions and 57 deletions

View File

@ -61,48 +61,39 @@ Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
print('adding singles\n');
{
final res = await arr.operateWrite((w) async {
for (var n = 4; n < 8; n++) {
for (var n = 4; n < 8; n++) {
await arr.operateWriteEventual((w) async {
print('$n ');
final success = await w.tryAdd(dataset[n]);
expect(success, isTrue);
}
});
expect(res, isNull);
return w.tryAdd(dataset[n]);
});
}
}
print('adding batch\n');
{
final res = await arr.operateWrite((w) async {
await arr.operateWriteEventual((w) async {
print('${dataset.length ~/ 2}-${dataset.length}');
final success = await w
return w
.tryAddAll(dataset.sublist(dataset.length ~/ 2, dataset.length));
expect(success, isTrue);
});
expect(res, isNull);
}
print('inserting singles\n');
{
final res = await arr.operateWrite((w) async {
for (var n = 0; n < 4; n++) {
for (var n = 0; n < 4; n++) {
await arr.operateWriteEventual((w) async {
print('$n ');
final success = await w.tryInsert(n, dataset[n]);
expect(success, isTrue);
}
});
expect(res, isNull);
return w.tryInsert(n, dataset[n]);
});
}
}
print('inserting batch\n');
{
final res = await arr.operateWrite((w) async {
await arr.operateWriteEventual((w) async {
print('8-${dataset.length ~/ 2}');
final success =
await w.tryInsertAll(8, dataset.sublist(8, dataset.length ~/ 2));
expect(success, isTrue);
return w.tryInsertAll(8, dataset.sublist(8, dataset.length ~/ 2));
});
expect(res, isNull);
}
//print('get all\n');

View File

@ -234,7 +234,8 @@ class _DHTLogSpine {
final existingData = await _spineRecord.tryWriteBytes(headBuffer);
if (existingData != null) {
// Head write failed, incorporate update
await _updateHead(proto.DHTLog.fromBuffer(existingData));
final existingHead = proto.DHTLog.fromBuffer(existingData);
_updateHead(existingHead.head, existingHead.tail, old: old);
if (old != null) {
sendUpdate(old.$1, old.$2);
}
@ -258,11 +259,22 @@ class _DHTLogSpine {
}
/// Validate a new spine head subkey that has come in from the network
Future<void> _updateHead(proto.DHTLog spineHead) async {
void _updateHead(int newHead, int newTail, {(int, int)? old}) {
assert(_spineMutex.isLocked, 'should be in mutex here');
_head = spineHead.head;
_tail = spineHead.tail;
if (old != null) {
final oldHead = old.$1;
final oldTail = old.$2;
final headDelta = _ringDistance(newHead, oldHead);
final tailDelta = _ringDistance(newTail, oldTail);
if (headDelta > _positionLimit ~/ 2 || tailDelta > _positionLimit ~/ 2) {
throw DHTExceptionInvalidData();
}
}
_head = newHead;
_tail = newTail;
}
/////////////////////////////////////////////////////////////////////////////
@ -456,7 +468,8 @@ class _DHTLogSpine {
// API for public interfaces
Future<_DHTLogPosition?> lookupPositionBySegmentNumber(
int segmentNumber, int segmentPos) async =>
int segmentNumber, int segmentPos,
{bool onlyOpened = false}) async =>
_spineCacheMutex.protect(() async {
// Get the segment shortArray
final openedSegment = _openedSegments[segmentNumber];
@ -465,6 +478,10 @@ class _DHTLogSpine {
openedSegment.openCount++;
shortArray = openedSegment.shortArray;
} else {
if (onlyOpened) {
return null;
}
final newShortArray = (_spineRecord.writer == null)
? await _openSegment(segmentNumber)
: await _openOrCreateSegment(segmentNumber);
@ -665,39 +682,42 @@ class _DHTLogSpine {
final headData = proto.DHTLog.fromBuffer(data);
// Then update the head record
await _spineMutex.protect(() async {
final oldHead = _head;
final oldTail = _tail;
await _updateHead(headData);
_spineChangeProcessor.updateState(headData, (headData) async {
await _spineMutex.protect(() async {
final oldHead = _head;
final oldTail = _tail;
// Lookup tail position segments that have changed
// and force their short arrays to refresh their heads
final segmentsToRefresh = <_DHTLogPosition>[];
int? lastSegmentNumber;
for (var curTail = oldTail;
curTail != _tail;
curTail = (curTail + 1) % _positionLimit) {
final segmentNumber = curTail ~/ DHTShortArray.maxElements;
final segmentPos = curTail % DHTShortArray.maxElements;
if (segmentNumber == lastSegmentNumber) {
continue;
_updateHead(headData.head, headData.tail, old: (oldHead, oldTail));
// Lookup tail position segments that have changed
// and force their short arrays to refresh their heads if
// they are opened
final segmentsToRefresh = <_DHTLogPosition>[];
for (var curTail = oldTail;
curTail != _tail;
curTail = (curTail +
(DHTShortArray.maxElements -
(curTail % DHTShortArray.maxElements))) %
_positionLimit) {
final segmentNumber = curTail ~/ DHTShortArray.maxElements;
final segmentPos = curTail % DHTShortArray.maxElements;
final dhtLogPosition = await lookupPositionBySegmentNumber(
segmentNumber, segmentPos,
onlyOpened: true);
if (dhtLogPosition == null) {
continue;
}
segmentsToRefresh.add(dhtLogPosition);
}
lastSegmentNumber = segmentNumber;
final dhtLogPosition =
await lookupPositionBySegmentNumber(segmentNumber, segmentPos);
if (dhtLogPosition == null) {
throw Exception('missing segment in dht log');
}
segmentsToRefresh.add(dhtLogPosition);
}
// Refresh the segments that have probably changed
await segmentsToRefresh.map((p) async {
await p.shortArray.refresh();
await p.close();
}).wait;
// Refresh the segments that have probably changed
await segmentsToRefresh.map((p) async {
await p.shortArray.refresh();
await p.close();
}).wait;
sendUpdate(oldHead, oldTail);
sendUpdate(oldHead, oldTail);
});
});
}
@ -723,6 +743,8 @@ class _DHTLogSpine {
StreamSubscription<DHTRecordWatchChange>? _subscription;
// Notify closure for external spine head changes
void Function(DHTLogUpdate)? onUpdatedSpine;
// Single state processor for spine updates
final _spineChangeProcessor = SingleStateProcessor<proto.DHTLog>();
// Spine DHT record
final DHTRecord _spineRecord;

View File

@ -3,3 +3,8 @@ class DHTExceptionTryAgain implements Exception {
[this.cause = 'operation failed due to newer dht value']);
String cause;
}
class DHTExceptionInvalidData implements Exception {
DHTExceptionInvalidData([this.cause = 'dht data structure is corrupt']);
String cause;
}