short array operations

This commit is contained in:
Christien Rioux 2023-07-30 13:57:00 -04:00
parent 1907a15b0b
commit 57c366ef91
3 changed files with 241 additions and 57 deletions

View File

@ -4,6 +4,9 @@ import 'dart:typed_data';
T jsonDecodeBytes<T>(T Function(dynamic) fromJson, Uint8List data) =>
fromJson(jsonDecode(utf8.decode(data)));
T? jsonDecodeOptBytes<T>(T Function(dynamic) fromJson, Uint8List? data) =>
(data == null) ? null : fromJson(jsonDecode(utf8.decode(data)));
Uint8List jsonEncodeBytes(Object? object,
{Object? Function(Object?)? toEncodable}) =>
Uint8List.fromList(

View File

@ -12,6 +12,10 @@ class _DHTShortArrayCache {
: linkedRecords = List<DHTRecord>.empty(growable: true),
index = List<int>.empty(growable: true),
free = List<int>.empty(growable: true);
_DHTShortArrayCache.from(_DHTShortArrayCache other)
: linkedRecords = List.of(other.linkedRecords),
index = List.of(other.index),
free = List.of(other.free);
final List<DHTRecord> linkedRecords;
final List<int> index;
@ -94,20 +98,23 @@ class DHTShortArray {
////////////////////////////////////////////////////////////////
/// Write the current head cache out to a protobuf to be serialized
Uint8List _headToBuffer() {
/// Seralize and write out the current head record, possibly updating it
/// if a newer copy is available online. Returns true if the write was
/// successful
Future<bool> _tryWriteHead() async {
final head = proto.DHTShortArray();
head.keys.addAll(_head.linkedRecords.map((lr) => lr.key.toProto()));
head.index.addAll(_head.index);
return head.writeToBuffer();
}
final headBuffer = head.writeToBuffer();
Future<DHTRecord> _openLinkedRecord(TypedKey recordKey) async {
final writer = _headRecord.writer;
return (writer != null)
? await DHTRecord.openWrite(
_headRecord.routingContext, recordKey, writer)
: await DHTRecord.openRead(_headRecord.routingContext, recordKey);
final existingData = await _headRecord.tryWriteBytes(headBuffer);
if (existingData != null) {
// Head write failed, incorporate update
await _newHead(proto.DHTShortArray.fromBuffer(existingData));
return false;
}
return true;
}
/// Validate the head from the DHT is properly formatted
@ -142,19 +149,17 @@ class DHTShortArray {
return free;
}
Future<bool> _refreshHead(
{bool forceRefresh = false, bool onlyUpdates = false}) async {
// Get an updated head record copy if one exists
final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer,
forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
if (head == null) {
if (onlyUpdates) {
// No update
return false;
}
throw StateError('head missing during initial refresh');
}
/// Open a linked record for reading or writing, same as the head record
Future<DHTRecord> _openLinkedRecord(TypedKey recordKey) async {
final writer = _headRecord.writer;
return (writer != null)
? await DHTRecord.openWrite(
_headRecord.routingContext, recordKey, writer)
: await DHTRecord.openRead(_headRecord.routingContext, recordKey);
}
/// Validate a new head record
Future<void> _newHead(proto.DHTShortArray head) async {
// Get the set of new linked keys and validate it
final linkedKeys = head.keys.map(proto.TypedKeyProto.fromProto).toList();
final index = head.index;
@ -180,6 +185,7 @@ class DHTShortArray {
} on Exception catch (_) {
// On any exception close the records we have opened
await Future.wait(newRecords.entries.map((e) => e.value.close()));
rethrow;
}
// From this point forward we should not throw an exception or everything
@ -197,6 +203,24 @@ class DHTShortArray {
linkedKeys.map((key) => (sameRecords[key] ?? newRecords[key])!))
..index.addAll(index)
..free.addAll(free);
}
/// Pull the latest or updated copy of the head record from the network
Future<bool> _refreshHead(
{bool forceRefresh = false, bool onlyUpdates = false}) async {
// Get an updated head record copy if one exists
final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer,
forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
if (head == null) {
if (onlyUpdates) {
// No update
return false;
}
throw StateError('head missing during refresh');
}
await _newHead(head);
return true;
}
@ -249,19 +273,30 @@ class DHTShortArray {
return _head.linkedRecords[recordNumber];
}
// xxx: add
// xxx: insert
// xxx: swap
// xxx: remove
// xxx: clear
// xxx ensure these write the head back out because they change it
int _emptyIndex() {
if (_head.free.isNotEmpty) {
return _head.free.removeLast();
}
if (_head.index.length == maxElements) {
throw StateError('too many elements');
}
return _head.index.length;
}
Future<Uint8List?> getItem(int index, {bool forceRefresh = false}) async {
void _freeIndex(int idx) {
_head.free.add(idx);
// xxx: free list optimization here?
}
int length() => _head.index.length;
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async {
await _refreshHead(forceRefresh: forceRefresh, onlyUpdates: true);
if (index < 0 || index >= _head.index.length) {
throw IndexError.withLength(index, _head.index.length);
if (pos < 0 || pos >= _head.index.length) {
throw IndexError.withLength(pos, _head.index.length);
}
final index = _head.index[pos];
final recordNumber = index ~/ _stride;
final record = _getRecord(recordNumber);
assert(record != null, 'Record does not exist');
@ -270,14 +305,162 @@ class DHTShortArray {
return record!.get(subkey: recordSubkey, forceRefresh: forceRefresh);
}
Future<Uint8List?> tryWriteItem(int index, Uint8List newValue) async {
Future<T?> getItemJson<T>(T Function(dynamic) fromJson, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => jsonDecodeOptBytes(fromJson, out));
Future<T?> getItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos,
{bool forceRefresh = false}) =>
getItem(pos, forceRefresh: forceRefresh)
.then((out) => (out == null) ? null : fromBuffer(out));
Future<bool> tryAddItem(Uint8List value) async {
await _refreshHead(onlyUpdates: true);
final oldHead = _DHTShortArrayCache.from(_head);
late final int pos;
try {
// Allocate empty index
final idx = _emptyIndex();
// Add new index
pos = _head.index.length;
_head.index.add(idx);
// Write new head
if (!await _tryWriteHead()) {
// Failed to write head means head got overwritten
return false;
}
} on Exception catch (_) {
// Exception on write means state needs to be reverted
_head = oldHead;
return false;
}
// Head write succeeded, now write item
await eventualWriteItem(pos, value);
return true;
}
Future<bool> tryInsertItem(int pos, Uint8List value) async {
await _refreshHead(onlyUpdates: true);
final oldHead = _DHTShortArrayCache.from(_head);
try {
// Allocate empty index
final idx = _emptyIndex();
// Add new index
_head.index.insert(pos, idx);
// Write new head
if (!await _tryWriteHead()) {
// Failed to write head means head got overwritten
return false;
}
} on Exception catch (_) {
// Exception on write means state needs to be reverted
_head = oldHead;
return false;
}
// Head write succeeded, now write item
await eventualWriteItem(pos, value);
return true;
}
Future<bool> trySwapItem(int aPos, int bPos) async {
await _refreshHead(onlyUpdates: true);
final oldHead = _DHTShortArrayCache.from(_head);
try {
// Add new index
final aIdx = _head.index[aPos];
final bIdx = _head.index[bPos];
_head.index[aPos] = bIdx;
_head.index[bPos] = aIdx;
// Write new head
if (!await _tryWriteHead()) {
// Failed to write head means head got overwritten
return false;
}
} on Exception catch (_) {
// Exception on write means state needs to be reverted
_head = oldHead;
return false;
}
return true;
}
Future<Uint8List?> tryRemoveItem(int pos) async {
await _refreshHead(onlyUpdates: true);
final oldHead = _DHTShortArrayCache.from(_head);
try {
final removedIdx = _head.index.removeAt(pos);
_freeIndex(removedIdx);
final recordNumber = removedIdx ~/ _stride;
final record = _getRecord(recordNumber);
assert(record != null, 'Record does not exist');
final recordSubkey =
(removedIdx % _stride) + ((recordNumber == 0) ? 1 : 0);
// Write new head
if (!await _tryWriteHead()) {
// Failed to write head means head got overwritten
return null;
}
return record!.get(subkey: recordSubkey);
} on Exception catch (_) {
// Exception on write means state needs to be reverted
_head = oldHead;
return null;
}
}
Future<T?> tryRemoveItemJson<T>(
T Function(dynamic) fromJson,
int pos,
) =>
tryRemoveItem(pos).then((out) => jsonDecodeOptBytes(fromJson, out));
Future<T?> tryRemoveItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos) =>
getItem(pos).then((out) => (out == null) ? null : fromBuffer(out));
Future<bool> tryClear() async {
await _refreshHead(onlyUpdates: true);
final oldHead = _DHTShortArrayCache.from(_head);
try {
_head.index.clear();
_head.free.clear();
// Write new head
if (!await _tryWriteHead()) {
// Failed to write head means head got overwritten
return false;
}
} on Exception catch (_) {
// Exception on write means state needs to be reverted
_head = oldHead;
return false;
}
return true;
}
Future<Uint8List?> tryWriteItem(int pos, Uint8List newValue) async {
if (await _refreshHead(onlyUpdates: true)) {
throw StateError('structure changed');
}
if (index < 0 || index >= _head.index.length) {
throw IndexError.withLength(index, _head.index.length);
if (pos < 0 || pos >= _head.index.length) {
throw IndexError.withLength(pos, _head.index.length);
}
final index = _head.index[pos];
final recordNumber = index ~/ _stride;
final record = _getRecord(recordNumber);
assert(record != null, 'Record does not exist');
@ -286,19 +469,19 @@ class DHTShortArray {
return record!.tryWriteBytes(newValue, subkey: recordSubkey);
}
Future<void> eventualWriteItem(int index, Uint8List newValue) async {
Future<void> eventualWriteItem(int pos, Uint8List newValue) async {
Uint8List? oldData;
do {
// Set it back
oldData = await tryWriteItem(index, newValue);
oldData = await tryWriteItem(pos, newValue);
// Repeat if newer data on the network was found
} while (oldData != null);
}
Future<void> eventualUpdateItem(
int index, Future<Uint8List> Function(Uint8List oldValue) update) async {
var oldData = await getItem(index);
int pos, Future<Uint8List> Function(Uint8List oldValue) update) async {
var oldData = await getItem(pos);
// Ensure it exists already
if (oldData == null) {
throw const FormatException('value does not exist');
@ -308,7 +491,7 @@ class DHTShortArray {
final updatedData = await update(oldData!);
// Set it back
oldData = await tryWriteItem(index, updatedData);
oldData = await tryWriteItem(pos, updatedData);
// Repeat if newer data on the network was found
} while (oldData != null);
@ -316,47 +499,43 @@ class DHTShortArray {
Future<T?> tryWriteItemJson<T>(
T Function(dynamic) fromJson,
int index,
int pos,
T newValue,
) =>
tryWriteItem(index, jsonEncodeBytes(newValue)).then((out) {
if (out == null) {
return null;
}
return jsonDecodeBytes(fromJson, out);
});
tryWriteItem(pos, jsonEncodeBytes(newValue))
.then((out) => jsonDecodeOptBytes(fromJson, out));
Future<T?> tryWriteItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int index,
int pos,
T newValue,
) =>
tryWriteItem(index, newValue.writeToBuffer()).then((out) {
tryWriteItem(pos, newValue.writeToBuffer()).then((out) {
if (out == null) {
return null;
}
return fromBuffer(out);
});
Future<void> eventualWriteItemJson<T>(int index, T newValue) =>
eventualWriteItem(index, jsonEncodeBytes(newValue));
Future<void> eventualWriteItemJson<T>(int pos, T newValue) =>
eventualWriteItem(pos, jsonEncodeBytes(newValue));
Future<void> eventualWriteItemProtobuf<T extends GeneratedMessage>(
int index, T newValue,
int pos, T newValue,
{int subkey = -1}) =>
eventualWriteItem(index, newValue.writeToBuffer());
eventualWriteItem(pos, newValue.writeToBuffer());
Future<void> eventualUpdateItemJson<T>(
T Function(dynamic) fromJson,
int index,
int pos,
Future<T> Function(T) update,
) =>
eventualUpdateItem(index, jsonUpdate(fromJson, update));
eventualUpdateItem(pos, jsonUpdate(fromJson, update));
Future<void> eventualUpdateItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int index,
int pos,
Future<T> Function(T) update,
) =>
eventualUpdateItem(index, protobufUpdate(fromBuffer, update));
eventualUpdateItem(pos, protobufUpdate(fromBuffer, update));
}

View File

@ -1,3 +1,5 @@
// ignore_for_file: prefer_expression_function_bodies
import 'dart:typed_data';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';