tabledb array work

This commit is contained in:
Christien Rioux 2024-05-25 22:46:43 -04:00
parent 83c8715742
commit 5d89de9bfe
45 changed files with 3022 additions and 1035 deletions

View file

@ -9,11 +9,11 @@ import 'package:meta/meta.dart';
import '../../../veilid_support.dart';
import '../../proto/proto.dart' as proto;
import '../interfaces/dht_append_truncate.dart';
import '../interfaces/dht_append.dart';
part 'dht_log_spine.dart';
part 'dht_log_read.dart';
part 'dht_log_append.dart';
part 'dht_log_write.dart';
///////////////////////////////////////////////////////////////////////
@ -60,7 +60,7 @@ class DHTLog implements DHTDeleteable<DHTLog, DHTLog> {
int stride = DHTShortArray.maxElements,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer}) async {
assert(stride <= DHTShortArray.maxElements, 'stride too long');
final pool = DHTRecordPool.instance;
@ -102,7 +102,7 @@ class DHTLog implements DHTDeleteable<DHTLog, DHTLog> {
{required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto}) async {
VeilidCrypto? crypto}) async {
final spineRecord = await DHTRecordPool.instance.openRecordRead(
logRecordKey,
debugName: debugName,
@ -125,7 +125,7 @@ class DHTLog implements DHTDeleteable<DHTLog, DHTLog> {
required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
}) async {
final spineRecord = await DHTRecordPool.instance.openRecordWrite(
logRecordKey, writer,
@ -148,7 +148,7 @@ class DHTLog implements DHTDeleteable<DHTLog, DHTLog> {
required String debugName,
required TypedKey parent,
VeilidRoutingContext? routingContext,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
}) =>
openWrite(
ownedLogRecordPointer.recordKey,
@ -209,7 +209,8 @@ class DHTLog implements DHTDeleteable<DHTLog, DHTLog> {
OwnedDHTRecordPointer get recordPointer => _spine.recordPointer;
/// Runs a closure allowing read-only access to the log
Future<T?> operate<T>(Future<T?> Function(DHTRandomRead) closure) async {
Future<T?> operate<T>(
Future<T?> Function(DHTLogReadOperations) closure) async {
if (!isOpen) {
throw StateError('log is not open"');
}
@ -226,13 +227,13 @@ class DHTLog implements DHTDeleteable<DHTLog, DHTLog> {
/// Throws DHTOperateException if the write could not be performed
/// at this time
Future<T> operateAppend<T>(
Future<T> Function(DHTAppendTruncateRandomRead) closure) async {
Future<T> Function(DHTLogWriteOperations) closure) async {
if (!isOpen) {
throw StateError('log is not open"');
}
return _spine.operateAppend((spine) async {
final writer = _DHTLogAppend._(spine);
final writer = _DHTLogWrite._(spine);
return closure(writer);
});
}
@ -244,14 +245,14 @@ class DHTLog implements DHTDeleteable<DHTLog, DHTLog> {
/// succeeded, returning false will trigger another eventual consistency
/// attempt.
Future<void> operateAppendEventual(
Future<bool> Function(DHTAppendTruncateRandomRead) closure,
Future<bool> Function(DHTLogWriteOperations) closure,
{Duration? timeout}) async {
if (!isOpen) {
throw StateError('log is not open"');
}
return _spine.operateAppendEventual((spine) async {
final writer = _DHTLogAppend._(spine);
final writer = _DHTLogWrite._(spine);
return closure(writer);
}, timeout: timeout);
}

View file

@ -8,7 +8,6 @@ import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';
import '../../../veilid_support.dart';
import '../interfaces/dht_append_truncate.dart';
@immutable
class DHTLogElementState<T> extends Equatable {
@ -184,19 +183,20 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
await super.close();
}
Future<R?> operate<R>(Future<R?> Function(DHTRandomRead) closure) async {
Future<R?> operate<R>(
Future<R?> Function(DHTLogReadOperations) closure) async {
await _initWait();
return _log.operate(closure);
}
Future<R> operateAppend<R>(
Future<R> Function(DHTAppendTruncateRandomRead) closure) async {
Future<R> Function(DHTLogWriteOperations) closure) async {
await _initWait();
return _log.operateAppend(closure);
}
Future<void> operateAppendEventual(
Future<bool> Function(DHTAppendTruncateRandomRead) closure,
Future<bool> Function(DHTLogWriteOperations) closure,
{Duration? timeout}) async {
await _initWait();
return _log.operateAppendEventual(closure, timeout: timeout);

View file

@ -3,7 +3,9 @@ part of 'dht_log.dart';
////////////////////////////////////////////////////////////////////////////
// Reader-only implementation
class _DHTLogRead implements DHTRandomRead {
abstract class DHTLogReadOperations implements DHTRandomRead {}
class _DHTLogRead implements DHTLogReadOperations {
_DHTLogRead._(_DHTLogSpine spine) : _spine = spine;
@override

View file

@ -1,13 +1,32 @@
part of 'dht_log.dart';
////////////////////////////////////////////////////////////////////////////
// Append/truncate implementation
// Writer implementation
class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead {
_DHTLogAppend._(super.spine) : super._();
abstract class DHTLogWriteOperations
implements DHTRandomRead, DHTRandomWrite, DHTAdd, DHTTruncate, DHTClear {}
class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
_DHTLogWrite._(super.spine) : super._();
@override
Future<bool> tryAppendItem(Uint8List value) async {
Future<bool> tryWriteItem(int pos, Uint8List newValue,
{Output<Uint8List>? output}) async {
if (pos < 0 || pos >= _spine.length) {
throw IndexError.withLength(pos, _spine.length);
}
final lookup = await _spine.lookupPosition(pos);
if (lookup == null) {
throw StateError("can't write to dht log");
}
// Write item to the segment
return lookup.scope((sa) => sa.operateWrite((write) async =>
write.tryWriteItem(lookup.pos, newValue, output: output)));
}
@override
Future<bool> tryAddItem(Uint8List value) async {
// Allocate empty index at the end of the list
final insertPos = _spine.length;
_spine.allocateTail(1);
@ -30,7 +49,7 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead {
}
@override
Future<bool> tryAppendItems(List<Uint8List> values) async {
Future<bool> tryAddItems(List<Uint8List> values) async {
// Allocate empty index at the end of the list
final insertPos = _spine.length;
_spine.allocateTail(values.length);
@ -76,15 +95,14 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead {
}
@override
Future<void> truncate(int count) async {
count = min(count, _spine.length);
if (count == 0) {
Future<void> truncate(int newLength) async {
if (newLength < 0) {
throw StateError('can not truncate to negative length');
}
if (newLength >= _spine.length) {
return;
}
if (count < 0) {
throw StateError('can not remove negative items');
}
await _spine.releaseHead(count);
await _spine.releaseHead(_spine.length - newLength);
}
@override

View file

@ -1,4 +1,3 @@
export 'default_dht_record_cubit.dart';
export 'dht_record_crypto.dart';
export 'dht_record_cubit.dart';
export 'dht_record_pool.dart';

View file

@ -42,7 +42,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
required SharedDHTRecordData sharedDHTRecordData,
required int defaultSubkey,
required KeyPair? writer,
required DHTRecordCrypto crypto,
required VeilidCrypto crypto,
required this.debugName})
: _crypto = crypto,
_routingContext = routingContext,
@ -104,7 +104,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
int get subkeyCount =>
_sharedDHTRecordData.recordDescriptor.schema.subkeyCount();
KeyPair? get writer => _writer;
DHTRecordCrypto get crypto => _crypto;
VeilidCrypto get crypto => _crypto;
OwnedDHTRecordPointer get ownedDHTRecordPointer =>
OwnedDHTRecordPointer(recordKey: key, owner: ownerKeyPair!);
int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey;
@ -118,7 +118,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
/// returned if one was returned.
Future<Uint8List?> get(
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached,
Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey);
@ -146,7 +146,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
return null;
}
// If we're returning a value, decrypt it
final out = (crypto ?? _crypto).decrypt(valueData.data, subkey);
final out = (crypto ?? _crypto).decrypt(valueData.data);
if (outSeqNum != null) {
outSeqNum.save(valueData.seq);
}
@ -163,7 +163,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
/// returned if one was returned.
Future<T?> getJson<T>(T Function(dynamic) fromJson,
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached,
Output<int>? outSeqNum}) async {
final data = await get(
@ -189,7 +189,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
Future<T?> getProtobuf<T extends GeneratedMessage>(
T Function(List<int> i) fromBuffer,
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached,
Output<int>? outSeqNum}) async {
final data = await get(
@ -208,13 +208,12 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
/// If the value was succesfully written, null is returned
Future<Uint8List?> tryWriteBytes(Uint8List newValue,
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey);
final lastSeq = await _localSubkeySeq(subkey);
final encryptedNewValue =
await (crypto ?? _crypto).encrypt(newValue, subkey);
final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue);
// Set the new data if possible
var newValueData = await _routingContext
@ -246,7 +245,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
// Decrypt value to return it
final decryptedNewValue =
await (crypto ?? _crypto).decrypt(newValueData.data, subkey);
await (crypto ?? _crypto).decrypt(newValueData.data);
if (isUpdated) {
DHTRecordPool.instance
.processLocalValueChange(key, decryptedNewValue, subkey);
@ -259,13 +258,12 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
/// will be made to write the subkey until this succeeds
Future<void> eventualWriteBytes(Uint8List newValue,
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey);
final lastSeq = await _localSubkeySeq(subkey);
final encryptedNewValue =
await (crypto ?? _crypto).encrypt(newValue, subkey);
final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue);
ValueData? newValueData;
do {
@ -309,7 +307,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
Future<void> eventualUpdateBytes(
Future<Uint8List> Function(Uint8List? oldValue) update,
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey);
@ -334,7 +332,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
/// Like 'tryWriteBytes' but with JSON marshal/unmarshal of the value
Future<T?> tryWriteJson<T>(T Function(dynamic) fromJson, T newValue,
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
tryWriteBytes(jsonEncodeBytes(newValue),
@ -353,7 +351,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
Future<T?> tryWriteProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, T newValue,
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
tryWriteBytes(newValue.writeToBuffer(),
@ -371,7 +369,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
/// Like 'eventualWriteBytes' but with JSON marshal/unmarshal of the value
Future<void> eventualWriteJson<T>(T newValue,
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
eventualWriteBytes(jsonEncodeBytes(newValue),
@ -380,7 +378,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
/// Like 'eventualWriteBytes' but with protobuf marshal/unmarshal of the value
Future<void> eventualWriteProtobuf<T extends GeneratedMessage>(T newValue,
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
eventualWriteBytes(newValue.writeToBuffer(),
@ -390,7 +388,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
Future<void> eventualUpdateJson<T>(
T Function(dynamic) fromJson, Future<T> Function(T?) update,
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
eventualUpdateBytes(jsonUpdate(fromJson, update),
@ -400,7 +398,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
Future<void> eventualUpdateProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, Future<T> Function(T?) update,
{int subkey = -1,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
eventualUpdateBytes(protobufUpdate(fromBuffer, update),
@ -433,7 +431,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
DHTRecord record, Uint8List? data, List<ValueSubkeyRange> subkeys)
onUpdate, {
bool localChanges = true,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
}) async {
// Set up watch requirements
_watchController ??=
@ -457,8 +455,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
final changeData = change.data;
data = changeData == null
? null
: await (crypto ?? _crypto)
.decrypt(changeData, change.subkeys.first.low);
: await (crypto ?? _crypto).decrypt(changeData);
}
await onUpdate(this, data, change.subkeys);
});
@ -544,7 +541,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
final VeilidRoutingContext _routingContext;
final int _defaultSubkey;
final KeyPair? _writer;
final DHTRecordCrypto _crypto;
final VeilidCrypto _crypto;
final String debugName;
final _mutex = Mutex();
int _openCount;

View file

@ -1,53 +0,0 @@
import 'dart:async';
import 'dart:typed_data';
import '../../../../../veilid_support.dart';
abstract class DHTRecordCrypto {
Future<Uint8List> encrypt(Uint8List data, int subkey);
Future<Uint8List> decrypt(Uint8List data, int subkey);
}
////////////////////////////////////
/// Private DHT Record: Encrypted for a specific symmetric key
class DHTRecordCryptoPrivate implements DHTRecordCrypto {
DHTRecordCryptoPrivate._(
VeilidCryptoSystem cryptoSystem, SharedSecret secretKey)
: _cryptoSystem = cryptoSystem,
_secretKey = secretKey;
final VeilidCryptoSystem _cryptoSystem;
final SharedSecret _secretKey;
static Future<DHTRecordCryptoPrivate> fromTypedKeyPair(
TypedKeyPair typedKeyPair) async {
final cryptoSystem =
await Veilid.instance.getCryptoSystem(typedKeyPair.kind);
final secretKey = typedKeyPair.secret;
return DHTRecordCryptoPrivate._(cryptoSystem, secretKey);
}
static Future<DHTRecordCryptoPrivate> fromSecret(
CryptoKind kind, SharedSecret secretKey) async {
final cryptoSystem = await Veilid.instance.getCryptoSystem(kind);
return DHTRecordCryptoPrivate._(cryptoSystem, secretKey);
}
@override
Future<Uint8List> encrypt(Uint8List data, int subkey) =>
_cryptoSystem.encryptNoAuthWithNonce(data, _secretKey);
@override
Future<Uint8List> decrypt(Uint8List data, int subkey) =>
_cryptoSystem.decryptNoAuthWithNonce(data, _secretKey);
}
////////////////////////////////////
/// Public DHT Record: No encryption
class DHTRecordCryptoPublic implements DHTRecordCrypto {
const DHTRecordCryptoPublic();
@override
Future<Uint8List> encrypt(Uint8List data, int subkey) async => data;
@override
Future<Uint8List> decrypt(Uint8List data, int subkey) async => data;
}

View file

@ -526,7 +526,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
TypedKey? parent,
DHTSchema schema = const DHTSchema.dflt(oCnt: 1),
int defaultSubkey = 0,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer,
}) async =>
_mutex.protect(() async {
@ -547,7 +547,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
writer: writer ??
openedRecordInfo.shared.recordDescriptor.ownerKeyPair(),
crypto: crypto ??
await DHTRecordCryptoPrivate.fromTypedKeyPair(openedRecordInfo
await VeilidCryptoPrivate.fromTypedKeyPair(openedRecordInfo
.shared.recordDescriptor
.ownerTypedKeyPair()!));
@ -562,7 +562,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
VeilidRoutingContext? routingContext,
TypedKey? parent,
int defaultSubkey = 0,
DHTRecordCrypto? crypto}) async =>
VeilidCrypto? crypto}) async =>
_mutex.protect(() async {
final dhtctx = routingContext ?? _routingContext;
@ -578,7 +578,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
defaultSubkey: defaultSubkey,
sharedDHTRecordData: openedRecordInfo.shared,
writer: null,
crypto: crypto ?? const DHTRecordCryptoPublic());
crypto: crypto ?? const VeilidCryptoPublic());
openedRecordInfo.records.add(rec);
@ -593,7 +593,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
VeilidRoutingContext? routingContext,
TypedKey? parent,
int defaultSubkey = 0,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
}) async =>
_mutex.protect(() async {
final dhtctx = routingContext ?? _routingContext;
@ -612,7 +612,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
writer: writer,
sharedDHTRecordData: openedRecordInfo.shared,
crypto: crypto ??
await DHTRecordCryptoPrivate.fromTypedKeyPair(
await VeilidCryptoPrivate.fromTypedKeyPair(
TypedKeyPair.fromKeyPair(recordKey.kind, writer)));
openedRecordInfo.records.add(rec);
@ -632,7 +632,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
required TypedKey parent,
VeilidRoutingContext? routingContext,
int defaultSubkey = 0,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
}) =>
openRecordWrite(
ownedDHTRecordPointer.recordKey,

View file

@ -33,7 +33,7 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray, DHTShortArray> {
int stride = maxElements,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
KeyPair? writer}) async {
assert(stride <= maxElements, 'stride too long');
final pool = DHTRecordPool.instance;
@ -79,7 +79,7 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray, DHTShortArray> {
{required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto}) async {
VeilidCrypto? crypto}) async {
final dhtRecord = await DHTRecordPool.instance.openRecordRead(headRecordKey,
debugName: debugName,
parent: parent,
@ -101,7 +101,7 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray, DHTShortArray> {
required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
}) async {
final dhtRecord = await DHTRecordPool.instance.openRecordWrite(
headRecordKey, writer,
@ -124,7 +124,7 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray, DHTShortArray> {
required String debugName,
required TypedKey parent,
VeilidRoutingContext? routingContext,
DHTRecordCrypto? crypto,
VeilidCrypto? crypto,
}) =>
openWrite(
ownedShortArrayRecordPointer.recordKey,
@ -186,7 +186,8 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray, DHTShortArray> {
OwnedDHTRecordPointer get recordPointer => _head.recordPointer;
/// Runs a closure allowing read-only access to the shortarray
Future<T> operate<T>(Future<T> Function(DHTRandomRead) closure) async {
Future<T> operate<T>(
Future<T> Function(DHTShortArrayReadOperations) closure) async {
if (!isOpen) {
throw StateError('short array is not open"');
}
@ -203,7 +204,7 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray, DHTShortArray> {
/// Throws DHTOperateException if the write could not be performed
/// at this time
Future<T> operateWrite<T>(
Future<T> Function(DHTRandomReadWrite) closure) async {
Future<T> Function(DHTShortArrayWriteOperations) closure) async {
if (!isOpen) {
throw StateError('short array is not open"');
}
@ -221,7 +222,7 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray, DHTShortArray> {
/// succeeded, returning false will trigger another eventual consistency
/// attempt.
Future<void> operateWriteEventual(
Future<bool> Function(DHTRandomReadWrite) closure,
Future<bool> Function(DHTShortArrayWriteOperations) closure,
{Duration? timeout}) async {
if (!isOpen) {
throw StateError('short array is not open"');

View file

@ -91,19 +91,20 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
await super.close();
}
Future<R> operate<R>(Future<R> Function(DHTRandomRead) closure) async {
Future<R> operate<R>(
Future<R> Function(DHTShortArrayReadOperations) closure) async {
await _initWait();
return _shortArray.operate(closure);
}
Future<R> operateWrite<R>(
Future<R> Function(DHTRandomReadWrite) closure) async {
Future<R> Function(DHTShortArrayWriteOperations) closure) async {
await _initWait();
return _shortArray.operateWrite(closure);
}
Future<void> operateWriteEventual(
Future<bool> Function(DHTRandomReadWrite) closure,
Future<bool> Function(DHTShortArrayWriteOperations) closure,
{Duration? timeout}) async {
await _initWait();
return _shortArray.operateWriteEventual(closure, timeout: timeout);

View file

@ -3,7 +3,9 @@ part of 'dht_short_array.dart';
////////////////////////////////////////////////////////////////////////////
// Reader-only implementation
class _DHTShortArrayRead implements DHTRandomRead {
abstract class DHTShortArrayReadOperations implements DHTRandomRead {}
class _DHTShortArrayRead implements DHTShortArrayReadOperations {
_DHTShortArrayRead._(_DHTShortArrayHead head) : _head = head;
@override

View file

@ -3,8 +3,16 @@ part of 'dht_short_array.dart';
////////////////////////////////////////////////////////////////////////////
// Writer implementation
abstract class DHTShortArrayWriteOperations
implements
DHTRandomRead,
DHTRandomWrite,
DHTInsertRemove,
DHTAdd,
DHTClear {}
class _DHTShortArrayWrite extends _DHTShortArrayRead
implements DHTRandomReadWrite {
implements DHTShortArrayWriteOperations {
_DHTShortArrayWrite._(super.head) : super._();
@override

View file

@ -0,0 +1,41 @@
import 'dart:typed_data';
import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart';
////////////////////////////////////////////////////////////////////////////
// Add
abstract class DHTAdd {
/// Try to add an item to the DHT container.
/// Return true if the element was successfully added, and false if the state
/// changed before the element could be added or a newer value was found on
/// the network.
/// Throws a StateError if the container exceeds its maximum size.
Future<bool> tryAddItem(Uint8List value);
/// Try to add a list of items to the DHT container.
/// Return true if the elements were successfully added, and false if the
/// state changed before the element could be added or a newer value was found
/// on the network.
/// Throws a StateError if the container exceeds its maximum size.
Future<bool> tryAddItems(List<Uint8List> values);
}
extension DHTAddExt on DHTAdd {
/// Convenience function:
/// Like tryAddItem but also encodes the input value as JSON and parses the
/// returned element as JSON
Future<bool> tryAppendItemJson<T>(
T newValue,
) =>
tryAddItem(jsonEncodeBytes(newValue));
/// Convenience function:
/// Like tryAddItem but also encodes the input value as a protobuf object
/// and parses the returned element as a protobuf object
Future<bool> tryAddItemProtobuf<T extends GeneratedMessage>(
T newValue,
) =>
tryAddItem(newValue.writeToBuffer());
}

View file

@ -1,51 +0,0 @@
import 'dart:typed_data';
import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart';
////////////////////////////////////////////////////////////////////////////
// Append/truncate interface
abstract class DHTAppendTruncate {
/// Try to add an item to the end of the DHT data structure.
/// Return true if the element was successfully added, and false if the state
/// changed before the element could be added or a newer value was found on
/// the network.
/// This may throw an exception if the number elements added exceeds limits.
Future<bool> tryAppendItem(Uint8List value);
/// Try to add a list of items to the end of the DHT data structure.
/// Return true if the elements were successfully added, and false if the
/// state changed before the element could be added or a newer value was found
/// on the network.
/// This may throw an exception if the number elements added exceeds limits.
Future<bool> tryAppendItems(List<Uint8List> values);
/// Try to remove a number of items from the head of the DHT data structure.
/// Throws StateError if count < 0
Future<void> truncate(int count);
/// Remove all items in the DHT data structure.
Future<void> clear();
}
abstract class DHTAppendTruncateRandomRead
implements DHTAppendTruncate, DHTRandomRead {}
extension DHTAppendTruncateExt on DHTAppendTruncate {
/// Convenience function:
/// Like tryAppendItem but also encodes the input value as JSON and parses the
/// returned element as JSON
Future<bool> tryAppendItemJson<T>(
T newValue,
) =>
tryAppendItem(jsonEncodeBytes(newValue));
/// Convenience function:
/// Like tryAppendItem but also encodes the input value as a protobuf object
/// and parses the returned element as a protobuf object
Future<bool> tryAppendItemProtobuf<T extends GeneratedMessage>(
T newValue,
) =>
tryAppendItem(newValue.writeToBuffer());
}

View file

@ -0,0 +1,7 @@
////////////////////////////////////////////////////////////////////////////
// Clear interface
// ignore: one_member_abstracts
abstract class DHTClear {
/// Remove all items in the DHT container.
Future<void> clear();
}

View file

@ -0,0 +1,60 @@
import 'dart:typed_data';
import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart';
////////////////////////////////////////////////////////////////////////////
// Insert/Remove interface
abstract class DHTInsertRemove {
/// Try to insert an item as position 'pos' of the DHT container.
/// Return true if the element was successfully inserted, and false if the
/// state changed before the element could be inserted or a newer value was
/// found on the network.
/// Throws an IndexError if the position removed exceeds the length of
/// the container.
/// Throws a StateError if the container exceeds its maximum size.
Future<bool> tryInsertItem(int pos, Uint8List value);
/// Try to insert items at position 'pos' of the DHT container.
/// Return true if the elements were successfully inserted, and false if the
/// state changed before the elements could be inserted or a newer value was
/// found on the network.
/// Throws an IndexError if the position removed exceeds the length of
/// the container.
/// Throws a StateError if the container exceeds its maximum size.
Future<bool> tryInsertItems(int pos, List<Uint8List> values);
/// Swap items at position 'aPos' and 'bPos' in the DHTArray.
/// Throws an IndexError if either of the positions swapped exceeds the length
/// of the container
Future<void> swapItem(int aPos, int bPos);
/// Remove an item at position 'pos' in the DHT container.
/// If the remove was successful this returns:
/// * outValue will return the prior contents of the element
/// Throws an IndexError if the position removed exceeds the length of
/// the container.
Future<void> removeItem(int pos, {Output<Uint8List>? output});
}
extension DHTInsertRemoveExt on DHTInsertRemove {
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<void> removeItemJson<T>(T Function(dynamic) fromJson, int pos,
{Output<T>? output}) async {
final outValueBytes = output == null ? null : Output<Uint8List>();
await removeItem(pos, output: outValueBytes);
output.mapSave(outValueBytes, (b) => jsonDecodeBytes(fromJson, b));
}
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<void> removeItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos,
{Output<T>? output}) async {
final outValueBytes = output == null ? null : Output<Uint8List>();
await removeItem(pos, output: outValueBytes);
output.mapSave(outValueBytes, fromBuffer);
}
}

View file

@ -7,22 +7,21 @@ import '../../../veilid_support.dart';
////////////////////////////////////////////////////////////////////////////
// Reader interface
abstract class DHTRandomRead {
/// Returns the number of elements in the DHTArray
/// This number will be >= 0 and <= DHTShortArray.maxElements (256)
/// Returns the number of elements in the DHT container
int get length;
/// Return the item at position 'pos' in the DHTArray. If 'forceRefresh'
/// Return the item at position 'pos' in the DHT container. If 'forceRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
/// * 'pos' must be >= 0 and < 'length'
/// Throws an IndexError if the 'pos' is not within the length
/// of the container.
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false});
/// Return a list of a range of items in the DHTArray. If 'forceRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
/// * 'start' must be >= 0
/// * 'len' must be >= 0 and <= DHTShortArray.maxElements (256) and defaults
/// to the maximum length
/// Throws an IndexError if either 'start' or '(start+length)' is not within
/// the length of the container.
Future<List<Uint8List>?> getItemRange(int start,
{int? length, bool forceRefresh = false});

View file

@ -6,8 +6,9 @@ import '../../../veilid_support.dart';
////////////////////////////////////////////////////////////////////////////
// Writer interface
// ignore: one_member_abstracts
abstract class DHTRandomWrite {
/// Try to set an item at position 'pos' of the DHTArray.
/// Try to set an item at position 'pos' of the DHT container.
/// If the set was successful this returns:
/// * A boolean true
/// * outValue will return the prior contents of the element,
@ -18,55 +19,10 @@ abstract class DHTRandomWrite {
/// * outValue will return the newer value of the element,
/// or null if the head record changed.
///
/// This may throw an exception if the position exceeds the built-in limit of
/// 'maxElements = 256' entries.
/// Throws an IndexError if the position is not within the length
/// of the container.
Future<bool> tryWriteItem(int pos, Uint8List newValue,
{Output<Uint8List>? output});
/// Try to add an item to the end of the DHTArray. Return true if the
/// element was successfully added, and false if the state changed before
/// the element could be added or a newer value was found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryAddItem(Uint8List value);
/// Try to add a list of items to the end of the DHTArray. Return true if the
/// elements were successfully added, and false if the state changed before
/// the elements could be added or a newer value was found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryAddItems(List<Uint8List> values);
/// Try to insert an item as position 'pos' of the DHTArray.
/// Return true if the element was successfully inserted, and false if the
/// state changed before the element could be inserted or a newer value was
/// found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryInsertItem(int pos, Uint8List value);
/// Try to insert items at position 'pos' of the DHTArray.
/// Return true if the elements were successfully inserted, and false if the
/// state changed before the elements could be inserted or a newer value was
/// found on the network.
/// This may throw an exception if the number elements added exceeds the
/// built-in limit of 'maxElements = 256' entries.
Future<bool> tryInsertItems(int pos, List<Uint8List> values);
/// Swap items at position 'aPos' and 'bPos' in the DHTArray.
/// Throws IndexError if either of the positions swapped exceed
/// the length of the list
Future<void> swapItem(int aPos, int bPos);
/// Remove an item at position 'pos' in the DHTArray.
/// If the remove was successful this returns:
/// * outValue will return the prior contents of the element
/// Throws IndexError if the position removed exceeds the length of
/// the list.
Future<void> removeItem(int pos, {Output<Uint8List>? output});
/// Remove all items in the DHTShortArray.
Future<void> clear();
}
extension DHTRandomWriteExt on DHTRandomWrite {
@ -95,25 +51,4 @@ extension DHTRandomWriteExt on DHTRandomWrite {
output.mapSave(outValueBytes, fromBuffer);
return out;
}
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<void> removeItemJson<T>(T Function(dynamic) fromJson, int pos,
{Output<T>? output}) async {
final outValueBytes = output == null ? null : Output<Uint8List>();
await removeItem(pos, output: outValueBytes);
output.mapSave(outValueBytes, (b) => jsonDecodeBytes(fromJson, b));
}
/// Convenience function:
/// Like removeItem but also parses the returned element as JSON
Future<void> removeItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, int pos,
{Output<T>? output}) async {
final outValueBytes = output == null ? null : Output<Uint8List>();
await removeItem(pos, output: outValueBytes);
output.mapSave(outValueBytes, fromBuffer);
}
}
abstract class DHTRandomReadWrite implements DHTRandomRead, DHTRandomWrite {}

View file

@ -0,0 +1,8 @@
////////////////////////////////////////////////////////////////////////////
// Truncate interface
// ignore: one_member_abstracts
abstract class DHTTruncate {
/// Remove items from the DHT container to shrink its size to 'newLength'
/// Throws StateError if newLength < 0
Future<void> truncate(int newLength);
}

View file

@ -1,4 +1,8 @@
export 'dht_append.dart';
export 'dht_clear.dart';
export 'dht_closeable.dart';
export 'dht_insert_remove.dart';
export 'dht_random_read.dart';
export 'dht_random_write.dart';
export 'dht_truncate.dart';
export 'exceptions.dart';