mirror of https://gitlab.com/veilid/veilidchat.git
synced 2025-08-09 06:32:27 -04:00

dht work

This commit is contained in:
parent 57c366ef91
commit c35056f687

39 changed files with 1382 additions and 662 deletions
226 lib/veilid_support/dht_support/dht_record.dart Normal file

@@ -0,0 +1,226 @@
import 'dart:async';
import 'dart:typed_data';

import 'package:protobuf/protobuf.dart';
import 'package:veilid/veilid.dart';

import '../../tools/tools.dart';
import '../veilid_support.dart';

class DHTRecord {
  DHTRecord(
      {required VeilidRoutingContext routingContext,
      required DHTRecordDescriptor recordDescriptor,
      int defaultSubkey = 0,
      KeyPair? writer,
      DHTRecordCrypto crypto = const DHTRecordCryptoPublic()})
      : _crypto = crypto,
        _routingContext = routingContext,
        _recordDescriptor = recordDescriptor,
        _defaultSubkey = defaultSubkey,
        _writer = writer,
        _open = false,
        _valid = true,
        _subkeySeqCache = {};
  final VeilidRoutingContext _routingContext;
  final DHTRecordDescriptor _recordDescriptor;
  final int _defaultSubkey;
  final KeyPair? _writer;
  final Map<int, int> _subkeySeqCache;
  final DHTRecordCrypto _crypto;
  bool _open;
  bool _valid;

  int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey;

  VeilidRoutingContext get routingContext => _routingContext;
  TypedKey get key => _recordDescriptor.key;
  PublicKey get owner => _recordDescriptor.owner;
  KeyPair? get ownerKeyPair => _recordDescriptor.ownerKeyPair();
  DHTSchema get schema => _recordDescriptor.schema;
  KeyPair? get writer => _writer;
  OwnedDHTRecordPointer get ownedDHTRecordPointer =>
      OwnedDHTRecordPointer(recordKey: key, owner: ownerKeyPair!);

  Future<void> close() async {
    if (!_valid) {
      throw StateError('already deleted');
    }
    if (!_open) {
      return;
    }
    final pool = await DHTRecordPool.instance();
    await _routingContext.closeDHTRecord(_recordDescriptor.key);
    pool.recordClosed(this);
    _open = false;
  }

  Future<void> delete() async {
    if (!_valid) {
      throw StateError('already deleted');
    }
    if (_open) {
      await close();
    }
    final pool = await DHTRecordPool.instance();
    await pool.deleteDeep(key);
    _valid = false;
  }

  Future<T> scope<T>(FutureOr<T> Function(DHTRecord) scopeFunction) async {
    try {
      return await scopeFunction(this);
    } finally {
      await close();
    }
  }

  Future<T> deleteScope<T>(
      FutureOr<T> Function(DHTRecord) scopeFunction) async {
    try {
      final out = await scopeFunction(this);
      await close();
      return out;
    } on Exception catch (_) {
      await delete();
      rethrow;
    }
  }

  Future<Uint8List?> get(
      {int subkey = -1,
      bool forceRefresh = false,
      bool onlyUpdates = false}) async {
    subkey = subkeyOrDefault(subkey);
    final valueData = await _routingContext.getDHTValue(
        _recordDescriptor.key, subkey, forceRefresh);
    if (valueData == null) {
      return null;
    }
    final lastSeq = _subkeySeqCache[subkey];
    if (lastSeq != null && valueData.seq <= lastSeq) {
      return null;
    }
    final out = _crypto.decrypt(valueData.data, subkey);
    _subkeySeqCache[subkey] = valueData.seq;
    return out;
  }

  Future<T?> getJson<T>(T Function(dynamic) fromJson,
      {int subkey = -1,
      bool forceRefresh = false,
      bool onlyUpdates = false}) async {
    final data = await get(
        subkey: subkey, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
    if (data == null) {
      return null;
    }
    return jsonDecodeBytes(fromJson, data);
  }

  Future<T?> getProtobuf<T extends GeneratedMessage>(
      T Function(List<int> i) fromBuffer,
      {int subkey = -1,
      bool forceRefresh = false,
      bool onlyUpdates = false}) async {
    final data = await get(
        subkey: subkey, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
    if (data == null) {
      return null;
    }
    return fromBuffer(data.toList());
  }

  Future<Uint8List?> tryWriteBytes(Uint8List newValue,
      {int subkey = -1}) async {
    subkey = subkeyOrDefault(subkey);
    newValue = await _crypto.encrypt(newValue, subkey);

    // Set the new data if possible
    final valueData = await _routingContext.setDHTValue(
        _recordDescriptor.key, subkey, newValue);
    if (valueData == null) {
      return null;
    }
    return valueData.data;
  }

  Future<void> eventualWriteBytes(Uint8List newValue, {int subkey = -1}) async {
    subkey = subkeyOrDefault(subkey);
    newValue = await _crypto.encrypt(newValue, subkey);

    ValueData? valueData;
    do {
      // Set the new data
      valueData = await _routingContext.setDHTValue(
          _recordDescriptor.key, subkey, newValue);

      // Repeat if newer data on the network was found
    } while (valueData != null);
  }

  Future<void> eventualUpdateBytes(
      Future<Uint8List> Function(Uint8List oldValue) update,
      {int subkey = -1}) async {
    subkey = subkeyOrDefault(subkey);
    // Get existing identity key, do not allow force refresh here
    // because if we need a refresh the setDHTValue will fail anyway
    var valueData =
        await _routingContext.getDHTValue(_recordDescriptor.key, subkey, false);
    // Ensure it exists already
    if (valueData == null) {
      throw const FormatException('value does not exist');
    }
    do {
      // Update cache
      _subkeySeqCache[subkey] = valueData!.seq;

      // Update the data
      final oldData = await _crypto.decrypt(valueData.data, subkey);
      final updatedData = await update(oldData);
      final newData = await _crypto.encrypt(updatedData, subkey);

      // Set it back
      valueData = await _routingContext.setDHTValue(
          _recordDescriptor.key, subkey, newData);

      // Repeat if newer data on the network was found
    } while (valueData != null);
  }

  Future<T?> tryWriteJson<T>(T Function(dynamic) fromJson, T newValue,
          {int subkey = -1}) =>
      tryWriteBytes(jsonEncodeBytes(newValue), subkey: subkey).then((out) {
        if (out == null) {
          return null;
        }
        return jsonDecodeBytes(fromJson, out);
      });

  Future<T?> tryWriteProtobuf<T extends GeneratedMessage>(
          T Function(List<int>) fromBuffer, T newValue,
          {int subkey = -1}) =>
      tryWriteBytes(newValue.writeToBuffer(), subkey: subkey).then((out) {
        if (out == null) {
          return null;
        }
        return fromBuffer(out);
      });

  Future<void> eventualWriteJson<T>(T newValue, {int subkey = -1}) =>
      eventualWriteBytes(jsonEncodeBytes(newValue), subkey: subkey);

  Future<void> eventualWriteProtobuf<T extends GeneratedMessage>(T newValue,
          {int subkey = -1}) =>
      eventualWriteBytes(newValue.writeToBuffer(), subkey: subkey);

  Future<void> eventualUpdateJson<T>(
          T Function(dynamic) fromJson, Future<T> Function(T) update,
          {int subkey = -1}) =>
      eventualUpdateBytes(jsonUpdate(fromJson, update), subkey: subkey);

  Future<void> eventualUpdateProtobuf<T extends GeneratedMessage>(
          T Function(List<int>) fromBuffer, Future<T> Function(T) update,
          {int subkey = -1}) =>
      eventualUpdateBytes(protobufUpdate(fromBuffer, update), subkey: subkey);
}
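
Editor's note: a minimal usage sketch of the DHTRecord API above, not part of this commit. It assumes Veilid is initialized and the pool is usable; `Account` is a hypothetical json-serializable type with a `fromJson` constructor and a `messageCount` field. The `eventualUpdateJson` call shows the optimistic-concurrency loop: it retries while newer data is found on the network.

// Sketch only (not part of this commit). Needs dart:typed_data plus the
// veilid_support library exports; `Account` is a hypothetical type.
Future<void> example() async {
  final pool = await DHTRecordPool.instance();
  final record = await pool.create(); // one-subkey private record
  await record.deleteScope((r) async {
    // Seed an initial value, then apply an optimistic-concurrency update:
    // eventualUpdateJson retries until no newer value exists on the network.
    await r.eventualWriteJson(Account(messageCount: 0));
    await r.eventualUpdateJson(Account.fromJson,
        (old) async => Account(messageCount: old.messageCount + 1));
  });
}
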
73 lib/veilid_support/dht_support/dht_record_crypto.dart Normal file

@@ -0,0 +1,73 @@
import 'dart:async';
import 'dart:typed_data';

import 'package:veilid/veilid.dart';

import '../veilid_init.dart';

abstract class DHTRecordCrypto {
  FutureOr<Uint8List> encrypt(Uint8List data, int subkey);
  FutureOr<Uint8List> decrypt(Uint8List data, int subkey);
}

////////////////////////////////////
/// Private DHT Record: Encrypted for a specific symmetric key
class DHTRecordCryptoPrivate implements DHTRecordCrypto {
  DHTRecordCryptoPrivate._(
      VeilidCryptoSystem cryptoSystem, SharedSecret secretKey)
      : _cryptoSystem = cryptoSystem,
        _secretKey = secretKey;
  final VeilidCryptoSystem _cryptoSystem;
  final SharedSecret _secretKey;

  static Future<DHTRecordCryptoPrivate> fromTypedKeyPair(
      TypedKeyPair typedKeyPair) async {
    final veilid = await eventualVeilid.future;
    final cryptoSystem = await veilid.getCryptoSystem(typedKeyPair.kind);
    final secretKey = typedKeyPair.secret;
    return DHTRecordCryptoPrivate._(cryptoSystem, secretKey);
  }

  static Future<DHTRecordCryptoPrivate> fromSecret(
      CryptoKind kind, SharedSecret secretKey) async {
    final veilid = await eventualVeilid.future;
    final cryptoSystem = await veilid.getCryptoSystem(kind);
    return DHTRecordCryptoPrivate._(cryptoSystem, secretKey);
  }

  @override
  FutureOr<Uint8List> encrypt(Uint8List data, int subkey) async {
    // generate nonce
    final nonce = await _cryptoSystem.randomNonce();
    // crypt and append nonce
    final b = BytesBuilder()
      ..add(await _cryptoSystem.cryptNoAuth(data, nonce, _secretKey))
      ..add(nonce.decode());
    return b.toBytes();
  }

  @override
  FutureOr<Uint8List> decrypt(Uint8List data, int subkey) async {
    // split off nonce from end
    if (data.length <= Nonce.decodedLength()) {
      throw const FormatException('not enough data to decrypt');
    }
    final nonce =
        Nonce.fromBytes(data.sublist(data.length - Nonce.decodedLength()));
    final encryptedData = data.sublist(0, data.length - Nonce.decodedLength());
    // decrypt
    return await _cryptoSystem.cryptNoAuth(encryptedData, nonce, _secretKey);
  }
}

////////////////////////////////////
/// Public DHT Record: No encryption
class DHTRecordCryptoPublic implements DHTRecordCrypto {
  const DHTRecordCryptoPublic();

  @override
  FutureOr<Uint8List> encrypt(Uint8List data, int subkey) => data;

  @override
  FutureOr<Uint8List> decrypt(Uint8List data, int subkey) => data;
}
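
Editor's note: the private-record wire format above is simply ciphertext with the nonce appended at the end, using Veilid's unauthenticated stream-cipher primitive. A round-trip sketch, assuming Veilid is initialized and `kind`/`secret` are a valid CryptoKind and SharedSecret:

// Sketch only (not part of this commit); `kind` and `secret` are assumed
// inputs. Needs dart:typed_data plus the veilid and veilid_support imports.
Future<void> roundTrip(CryptoKind kind, SharedSecret secret) async {
  final recordCrypto = await DHTRecordCryptoPrivate.fromSecret(kind, secret);
  final plaintext = Uint8List.fromList([1, 2, 3]);
  // encrypt() appends a fresh random nonce, so ciphertexts differ per call
  final ciphertext = await recordCrypto.encrypt(plaintext, 0);
  assert(ciphertext.length == plaintext.length + Nonce.decodedLength());
  // decrypt() splits the nonce back off the end and reverses the cipher
  final decrypted = await recordCrypto.decrypt(ciphertext, 0);
  assert(decrypted.length == plaintext.length);
}
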
272 lib/veilid_support/dht_support/dht_record_pool.dart Normal file

@@ -0,0 +1,272 @@
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:freezed_annotation/freezed_annotation.dart';

import '../veilid_support.dart';

part 'dht_record_pool.freezed.dart';
part 'dht_record_pool.g.dart';

/// Record pool that manages DHTRecords and allows for tagged deletion
@freezed
class DHTRecordPoolAllocations with _$DHTRecordPoolAllocations {
  const factory DHTRecordPoolAllocations({
    required IMap<TypedKey, ISet<TypedKey>> childrenByParent,
    required IMap<TypedKey, TypedKey> parentByChild,
  }) = _DHTRecordPoolAllocations;

  factory DHTRecordPoolAllocations.fromJson(dynamic json) =>
      _$DHTRecordPoolAllocationsFromJson(json as Map<String, dynamic>);
}

/// Pointer to an owned record, with key, owner key and owner secret
/// Ensure that these are only serialized encrypted
@freezed
class OwnedDHTRecordPointer with _$OwnedDHTRecordPointer {
  const factory OwnedDHTRecordPointer({
    required TypedKey recordKey,
    required KeyPair owner,
  }) = _OwnedDHTRecordPointer;

  factory OwnedDHTRecordPointer.fromJson(dynamic json) =>
      _$OwnedDHTRecordPointerFromJson(json as Map<String, dynamic>);
}

class DHTRecordPool with AsyncTableDBBacked<DHTRecordPoolAllocations> {
  DHTRecordPool._(Veilid veilid, VeilidRoutingContext routingContext)
      : _state = DHTRecordPoolAllocations(
            childrenByParent: IMap(), parentByChild: IMap()),
        _opened = <TypedKey, DHTRecord>{},
        _routingContext = routingContext,
        _veilid = veilid;

  // Persistent DHT record list
  DHTRecordPoolAllocations _state;
  // Which DHT records are currently open
  final Map<TypedKey, DHTRecord> _opened;
  // Default routing context to use for new keys
  final VeilidRoutingContext _routingContext;
  // Convenience accessor
  final Veilid _veilid;

  static DHTRecordPool? _singleton;

  //////////////////////////////////////////////////////////////
  /// AsyncTableDBBacked
  @override
  String tableName() => 'dht_record_pool';
  @override
  String tableKeyName() => 'pool_allocations';
  @override
  DHTRecordPoolAllocations valueFromJson(Object? obj) => obj != null
      ? DHTRecordPoolAllocations.fromJson(obj)
      : DHTRecordPoolAllocations(
          childrenByParent: IMap(), parentByChild: IMap());
  @override
  Object? valueToJson(DHTRecordPoolAllocations val) => val.toJson();

  //////////////////////////////////////////////////////////////

  static Future<DHTRecordPool> instance() async {
    if (_singleton == null) {
      final veilid = await eventualVeilid.future;
      final routingContext = (await veilid.routingContext())
          .withPrivacy()
          .withSequencing(Sequencing.preferOrdered);

      final globalPool = DHTRecordPool._(veilid, routingContext);
      globalPool._state = await globalPool.load();
      _singleton = globalPool;
    }
    return _singleton!;
  }

  Veilid get veilid => _veilid;

  void _recordOpened(DHTRecord record) {
    assert(!_opened.containsKey(record.key), 'record already opened');
    _opened[record.key] = record;
  }

  void recordClosed(DHTRecord record) {
    assert(_opened.containsKey(record.key), 'record already closed');
    _opened.remove(record.key);
  }

  Future<void> deleteDeep(TypedKey parent) async {
    // Collect all dependencies
    final allDeps = <TypedKey>[];
    final currentDeps = [parent];
    while (currentDeps.isNotEmpty) {
      final nextDep = currentDeps.removeLast();

      // Remove this child from its parent
      _removeDependency(nextDep);

      // Ensure all records are closed before delete
      assert(!_opened.containsKey(nextDep), 'should not delete opened record');

      allDeps.add(nextDep);
      final childDeps = _state.childrenByParent[nextDep]?.toList() ?? [];
      currentDeps.addAll(childDeps);
    }

    // Delete all records
    final allFutures = <Future<void>>[];
    for (final dep in allDeps) {
      allFutures.add(_routingContext.deleteDHTRecord(dep));
    }
    await Future.wait(allFutures);
  }

  void _addDependency(TypedKey parent, TypedKey child) {
    final childrenOfParent =
        _state.childrenByParent[parent] ?? ISet<TypedKey>();
    if (childrenOfParent.contains(child)) {
      throw StateError('Dependency added twice: $parent -> $child');
    }
    if (_state.parentByChild.containsKey(child)) {
      throw StateError('Child has two parents: $child <- $parent');
    }
    if (_state.childrenByParent.containsKey(child)) {
      // dependencies should be opened after their parents
      throw StateError('Child is not a leaf: $child');
    }

    _state = _state.copyWith(
        childrenByParent:
            _state.childrenByParent.add(parent, childrenOfParent.add(child)),
        parentByChild: _state.parentByChild.add(child, parent));
  }

  void _removeDependency(TypedKey child) {
    final parent = _state.parentByChild[child];
    if (parent == null) {
      return;
    }
    final children = _state.childrenByParent[parent]!.remove(child);
    if (children.isEmpty) {
      _state = _state.copyWith(
          childrenByParent: _state.childrenByParent.remove(parent),
          parentByChild: _state.parentByChild.remove(child));
    } else {
      _state = _state.copyWith(
          childrenByParent: _state.childrenByParent.add(parent, children),
          parentByChild: _state.parentByChild.remove(child));
    }
  }

  ///////////////////////////////////////////////////////////////////////

  /// Create a root DHTRecord that has no dependent records
  Future<DHTRecord> create(
      {VeilidRoutingContext? routingContext,
      TypedKey? parent,
      DHTSchema schema = const DHTSchema.dflt(oCnt: 1),
      int defaultSubkey = 0,
      DHTRecordCrypto? crypto}) async {
    final dhtctx = routingContext ?? _routingContext;
    final recordDescriptor = await dhtctx.createDHTRecord(schema);

    final rec = DHTRecord(
        routingContext: dhtctx,
        recordDescriptor: recordDescriptor,
        defaultSubkey: defaultSubkey,
        writer: recordDescriptor.ownerKeyPair(),
        crypto: crypto ??
            await DHTRecordCryptoPrivate.fromTypedKeyPair(
                recordDescriptor.ownerTypedKeyPair()!));

    if (parent != null) {
      _addDependency(parent, rec.key);
    }
    _recordOpened(rec);

    return rec;
  }

  /// Open a DHTRecord readonly
  Future<DHTRecord> openRead(TypedKey recordKey,
      {VeilidRoutingContext? routingContext,
      TypedKey? parent,
      int defaultSubkey = 0,
      DHTRecordCrypto? crypto}) async {
    // If we are opening a key that already exists
    // make sure we are using the same parent if one was specified
    final existingParent = _state.parentByChild[recordKey];
    assert(existingParent == parent, 'wrong parent for opened key');

    // Open from the veilid api
    final dhtctx = routingContext ?? _routingContext;
    final recordDescriptor = await dhtctx.openDHTRecord(recordKey, null);
    final rec = DHTRecord(
        routingContext: dhtctx,
        recordDescriptor: recordDescriptor,
        defaultSubkey: defaultSubkey,
        crypto: crypto ?? const DHTRecordCryptoPublic());

    // Register the dependency if specified
    if (parent != null) {
      _addDependency(parent, rec.key);
    }
    _recordOpened(rec);

    return rec;
  }

  /// Open a DHTRecord writable
  Future<DHTRecord> openWrite(
    TypedKey recordKey,
    KeyPair writer, {
    VeilidRoutingContext? routingContext,
    TypedKey? parent,
    int defaultSubkey = 0,
    DHTRecordCrypto? crypto,
  }) async {
    // If we are opening a key that already exists
    // make sure we are using the same parent if one was specified
    final existingParent = _state.parentByChild[recordKey];
    assert(existingParent == parent, 'wrong parent for opened key');

    // Open from the veilid api
    final dhtctx = routingContext ?? _routingContext;
    final recordDescriptor = await dhtctx.openDHTRecord(recordKey, writer);
    final rec = DHTRecord(
        routingContext: dhtctx,
        recordDescriptor: recordDescriptor,
        defaultSubkey: defaultSubkey,
        writer: writer,
        crypto: crypto ??
            await DHTRecordCryptoPrivate.fromTypedKeyPair(
                TypedKeyPair.fromKeyPair(recordKey.kind, writer)));

    // Register the dependency if specified
    if (parent != null) {
      _addDependency(parent, rec.key);
    }
    _recordOpened(rec);

    return rec;
  }

  /// Open a DHTRecord owned
  /// This is the same as writable but uses an OwnedDHTRecordPointer
  /// for convenience and uses symmetric encryption on the key
  /// This is primarily used for backing up private content onto the DHT
  /// and synchronizing it between devices. Because it is 'owned', the correct
  /// parent must be specified.
  Future<DHTRecord> openOwned(
    OwnedDHTRecordPointer ownedDHTRecordPointer, {
    required TypedKey parent,
    VeilidRoutingContext? routingContext,
    int defaultSubkey = 0,
    DHTRecordCrypto? crypto,
  }) =>
      openWrite(
        ownedDHTRecordPointer.recordKey,
        ownedDHTRecordPointer.owner,
        routingContext: routingContext,
        parent: parent,
        defaultSubkey: defaultSubkey,
        crypto: crypto,
      );
}
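
Editor's note: to make the tagged-deletion bookkeeping above concrete, a small sketch (not part of this commit) of how a parent/child pair flows through the pool:

// Sketch only (not part of this commit). Assumes Veilid is initialized.
Future<void> parentChildExample() async {
  final pool = await DHTRecordPool.instance();
  final parent = await pool.create();
  // Passing `parent:` registers the child in childrenByParent/parentByChild,
  // so deleting the parent later cascades through deleteDeep().
  final child = await pool.create(parent: parent.key);
  await child.close();
  await parent.delete(); // closes parent, then deleteDeep removes both records
}
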
357 lib/veilid_support/dht_support/dht_record_pool.freezed.dart Normal file

@@ -0,0 +1,357 @@
// coverage:ignore-file
// GENERATED CODE - DO NOT MODIFY BY HAND
// ignore_for_file: type=lint
// ignore_for_file: unused_element, deprecated_member_use, deprecated_member_use_from_same_package, use_function_type_syntax_for_parameters, unnecessary_const, avoid_init_to_null, invalid_override_different_default_values_named, prefer_expression_function_bodies, annotate_overrides, invalid_annotation_target, unnecessary_question_mark

part of 'dht_record_pool.dart';

// **************************************************************************
// FreezedGenerator
// **************************************************************************

T _$identity<T>(T value) => value;

final _privateConstructorUsedError = UnsupportedError(
    'It seems like you constructed your class using `MyClass._()`. This constructor is only meant to be used by freezed and you are not supposed to need it nor use it.\nPlease check the documentation here for more information: https://github.com/rrousselGit/freezed#custom-getters-and-methods');

DHTRecordPoolAllocations _$DHTRecordPoolAllocationsFromJson(
    Map<String, dynamic> json) {
  return _DHTRecordPoolAllocations.fromJson(json);
}

/// @nodoc
mixin _$DHTRecordPoolAllocations {
  IMap<Typed<FixedEncodedString43>, ISet<Typed<FixedEncodedString43>>>
      get childrenByParent => throw _privateConstructorUsedError;
  IMap<Typed<FixedEncodedString43>, Typed<FixedEncodedString43>>
      get parentByChild => throw _privateConstructorUsedError;

  Map<String, dynamic> toJson() => throw _privateConstructorUsedError;
  @JsonKey(ignore: true)
  $DHTRecordPoolAllocationsCopyWith<DHTRecordPoolAllocations> get copyWith =>
      throw _privateConstructorUsedError;
}

/// @nodoc
abstract class $DHTRecordPoolAllocationsCopyWith<$Res> {
  factory $DHTRecordPoolAllocationsCopyWith(DHTRecordPoolAllocations value,
          $Res Function(DHTRecordPoolAllocations) then) =
      _$DHTRecordPoolAllocationsCopyWithImpl<$Res, DHTRecordPoolAllocations>;
  @useResult
  $Res call(
      {IMap<Typed<FixedEncodedString43>, ISet<Typed<FixedEncodedString43>>>
          childrenByParent,
      IMap<Typed<FixedEncodedString43>, Typed<FixedEncodedString43>>
          parentByChild});
}

/// @nodoc
class _$DHTRecordPoolAllocationsCopyWithImpl<$Res,
        $Val extends DHTRecordPoolAllocations>
    implements $DHTRecordPoolAllocationsCopyWith<$Res> {
  _$DHTRecordPoolAllocationsCopyWithImpl(this._value, this._then);

  // ignore: unused_field
  final $Val _value;
  // ignore: unused_field
  final $Res Function($Val) _then;

  @pragma('vm:prefer-inline')
  @override
  $Res call({
    Object? childrenByParent = null,
    Object? parentByChild = null,
  }) {
    return _then(_value.copyWith(
      childrenByParent: null == childrenByParent
          ? _value.childrenByParent
          : childrenByParent // ignore: cast_nullable_to_non_nullable
              as IMap<Typed<FixedEncodedString43>,
                  ISet<Typed<FixedEncodedString43>>>,
      parentByChild: null == parentByChild
          ? _value.parentByChild
          : parentByChild // ignore: cast_nullable_to_non_nullable
              as IMap<Typed<FixedEncodedString43>, Typed<FixedEncodedString43>>,
    ) as $Val);
  }
}

/// @nodoc
abstract class _$$_DHTRecordPoolAllocationsCopyWith<$Res>
    implements $DHTRecordPoolAllocationsCopyWith<$Res> {
  factory _$$_DHTRecordPoolAllocationsCopyWith(
          _$_DHTRecordPoolAllocations value,
          $Res Function(_$_DHTRecordPoolAllocations) then) =
      __$$_DHTRecordPoolAllocationsCopyWithImpl<$Res>;
  @override
  @useResult
  $Res call(
      {IMap<Typed<FixedEncodedString43>, ISet<Typed<FixedEncodedString43>>>
          childrenByParent,
      IMap<Typed<FixedEncodedString43>, Typed<FixedEncodedString43>>
          parentByChild});
}

/// @nodoc
class __$$_DHTRecordPoolAllocationsCopyWithImpl<$Res>
    extends _$DHTRecordPoolAllocationsCopyWithImpl<$Res,
        _$_DHTRecordPoolAllocations>
    implements _$$_DHTRecordPoolAllocationsCopyWith<$Res> {
  __$$_DHTRecordPoolAllocationsCopyWithImpl(_$_DHTRecordPoolAllocations _value,
      $Res Function(_$_DHTRecordPoolAllocations) _then)
      : super(_value, _then);

  @pragma('vm:prefer-inline')
  @override
  $Res call({
    Object? childrenByParent = null,
    Object? parentByChild = null,
  }) {
    return _then(_$_DHTRecordPoolAllocations(
      childrenByParent: null == childrenByParent
          ? _value.childrenByParent
          : childrenByParent // ignore: cast_nullable_to_non_nullable
              as IMap<Typed<FixedEncodedString43>,
                  ISet<Typed<FixedEncodedString43>>>,
      parentByChild: null == parentByChild
          ? _value.parentByChild
          : parentByChild // ignore: cast_nullable_to_non_nullable
              as IMap<Typed<FixedEncodedString43>, Typed<FixedEncodedString43>>,
    ));
  }
}

/// @nodoc
@JsonSerializable()
class _$_DHTRecordPoolAllocations implements _DHTRecordPoolAllocations {
  const _$_DHTRecordPoolAllocations(
      {required this.childrenByParent, required this.parentByChild});

  factory _$_DHTRecordPoolAllocations.fromJson(Map<String, dynamic> json) =>
      _$$_DHTRecordPoolAllocationsFromJson(json);

  @override
  final IMap<Typed<FixedEncodedString43>, ISet<Typed<FixedEncodedString43>>>
      childrenByParent;
  @override
  final IMap<Typed<FixedEncodedString43>, Typed<FixedEncodedString43>>
      parentByChild;

  @override
  String toString() {
    return 'DHTRecordPoolAllocations(childrenByParent: $childrenByParent, parentByChild: $parentByChild)';
  }

  @override
  bool operator ==(dynamic other) {
    return identical(this, other) ||
        (other.runtimeType == runtimeType &&
            other is _$_DHTRecordPoolAllocations &&
            (identical(other.childrenByParent, childrenByParent) ||
                other.childrenByParent == childrenByParent) &&
            (identical(other.parentByChild, parentByChild) ||
                other.parentByChild == parentByChild));
  }

  @JsonKey(ignore: true)
  @override
  int get hashCode => Object.hash(runtimeType, childrenByParent, parentByChild);

  @JsonKey(ignore: true)
  @override
  @pragma('vm:prefer-inline')
  _$$_DHTRecordPoolAllocationsCopyWith<_$_DHTRecordPoolAllocations>
      get copyWith => __$$_DHTRecordPoolAllocationsCopyWithImpl<
          _$_DHTRecordPoolAllocations>(this, _$identity);

  @override
  Map<String, dynamic> toJson() {
    return _$$_DHTRecordPoolAllocationsToJson(
      this,
    );
  }
}

abstract class _DHTRecordPoolAllocations implements DHTRecordPoolAllocations {
  const factory _DHTRecordPoolAllocations(
      {required final IMap<Typed<FixedEncodedString43>,
              ISet<Typed<FixedEncodedString43>>>
          childrenByParent,
      required final IMap<Typed<FixedEncodedString43>,
              Typed<FixedEncodedString43>>
          parentByChild}) = _$_DHTRecordPoolAllocations;

  factory _DHTRecordPoolAllocations.fromJson(Map<String, dynamic> json) =
      _$_DHTRecordPoolAllocations.fromJson;

  @override
  IMap<Typed<FixedEncodedString43>, ISet<Typed<FixedEncodedString43>>>
      get childrenByParent;
  @override
  IMap<Typed<FixedEncodedString43>, Typed<FixedEncodedString43>>
      get parentByChild;
  @override
  @JsonKey(ignore: true)
  _$$_DHTRecordPoolAllocationsCopyWith<_$_DHTRecordPoolAllocations>
      get copyWith => throw _privateConstructorUsedError;
}

OwnedDHTRecordPointer _$OwnedDHTRecordPointerFromJson(
    Map<String, dynamic> json) {
  return _OwnedDHTRecordPointer.fromJson(json);
}

/// @nodoc
mixin _$OwnedDHTRecordPointer {
  Typed<FixedEncodedString43> get recordKey =>
      throw _privateConstructorUsedError;
  KeyPair get owner => throw _privateConstructorUsedError;

  Map<String, dynamic> toJson() => throw _privateConstructorUsedError;
  @JsonKey(ignore: true)
  $OwnedDHTRecordPointerCopyWith<OwnedDHTRecordPointer> get copyWith =>
      throw _privateConstructorUsedError;
}

/// @nodoc
abstract class $OwnedDHTRecordPointerCopyWith<$Res> {
  factory $OwnedDHTRecordPointerCopyWith(OwnedDHTRecordPointer value,
          $Res Function(OwnedDHTRecordPointer) then) =
      _$OwnedDHTRecordPointerCopyWithImpl<$Res, OwnedDHTRecordPointer>;
  @useResult
  $Res call({Typed<FixedEncodedString43> recordKey, KeyPair owner});
}

/// @nodoc
class _$OwnedDHTRecordPointerCopyWithImpl<$Res,
        $Val extends OwnedDHTRecordPointer>
    implements $OwnedDHTRecordPointerCopyWith<$Res> {
  _$OwnedDHTRecordPointerCopyWithImpl(this._value, this._then);

  // ignore: unused_field
  final $Val _value;
  // ignore: unused_field
  final $Res Function($Val) _then;

  @pragma('vm:prefer-inline')
  @override
  $Res call({
    Object? recordKey = null,
    Object? owner = null,
  }) {
    return _then(_value.copyWith(
      recordKey: null == recordKey
          ? _value.recordKey
          : recordKey // ignore: cast_nullable_to_non_nullable
              as Typed<FixedEncodedString43>,
      owner: null == owner
          ? _value.owner
          : owner // ignore: cast_nullable_to_non_nullable
              as KeyPair,
    ) as $Val);
  }
}

/// @nodoc
abstract class _$$_OwnedDHTRecordPointerCopyWith<$Res>
    implements $OwnedDHTRecordPointerCopyWith<$Res> {
  factory _$$_OwnedDHTRecordPointerCopyWith(_$_OwnedDHTRecordPointer value,
          $Res Function(_$_OwnedDHTRecordPointer) then) =
      __$$_OwnedDHTRecordPointerCopyWithImpl<$Res>;
  @override
  @useResult
  $Res call({Typed<FixedEncodedString43> recordKey, KeyPair owner});
}

/// @nodoc
class __$$_OwnedDHTRecordPointerCopyWithImpl<$Res>
    extends _$OwnedDHTRecordPointerCopyWithImpl<$Res, _$_OwnedDHTRecordPointer>
    implements _$$_OwnedDHTRecordPointerCopyWith<$Res> {
  __$$_OwnedDHTRecordPointerCopyWithImpl(_$_OwnedDHTRecordPointer _value,
      $Res Function(_$_OwnedDHTRecordPointer) _then)
      : super(_value, _then);

  @pragma('vm:prefer-inline')
  @override
  $Res call({
    Object? recordKey = null,
    Object? owner = null,
  }) {
    return _then(_$_OwnedDHTRecordPointer(
      recordKey: null == recordKey
          ? _value.recordKey
          : recordKey // ignore: cast_nullable_to_non_nullable
              as Typed<FixedEncodedString43>,
      owner: null == owner
          ? _value.owner
          : owner // ignore: cast_nullable_to_non_nullable
              as KeyPair,
    ));
  }
}

/// @nodoc
@JsonSerializable()
class _$_OwnedDHTRecordPointer implements _OwnedDHTRecordPointer {
  const _$_OwnedDHTRecordPointer(
      {required this.recordKey, required this.owner});

  factory _$_OwnedDHTRecordPointer.fromJson(Map<String, dynamic> json) =>
      _$$_OwnedDHTRecordPointerFromJson(json);

  @override
  final Typed<FixedEncodedString43> recordKey;
  @override
  final KeyPair owner;

  @override
  String toString() {
    return 'OwnedDHTRecordPointer(recordKey: $recordKey, owner: $owner)';
  }

  @override
  bool operator ==(dynamic other) {
    return identical(this, other) ||
        (other.runtimeType == runtimeType &&
            other is _$_OwnedDHTRecordPointer &&
            (identical(other.recordKey, recordKey) ||
                other.recordKey == recordKey) &&
            (identical(other.owner, owner) || other.owner == owner));
  }

  @JsonKey(ignore: true)
  @override
  int get hashCode => Object.hash(runtimeType, recordKey, owner);

  @JsonKey(ignore: true)
  @override
  @pragma('vm:prefer-inline')
  _$$_OwnedDHTRecordPointerCopyWith<_$_OwnedDHTRecordPointer> get copyWith =>
      __$$_OwnedDHTRecordPointerCopyWithImpl<_$_OwnedDHTRecordPointer>(
          this, _$identity);

  @override
  Map<String, dynamic> toJson() {
    return _$$_OwnedDHTRecordPointerToJson(
      this,
    );
  }
}

abstract class _OwnedDHTRecordPointer implements OwnedDHTRecordPointer {
  const factory _OwnedDHTRecordPointer(
      {required final Typed<FixedEncodedString43> recordKey,
      required final KeyPair owner}) = _$_OwnedDHTRecordPointer;

  factory _OwnedDHTRecordPointer.fromJson(Map<String, dynamic> json) =
      _$_OwnedDHTRecordPointer.fromJson;

  @override
  Typed<FixedEncodedString43> get recordKey;
  @override
  KeyPair get owner;
  @override
  @JsonKey(ignore: true)
  _$$_OwnedDHTRecordPointerCopyWith<_$_OwnedDHTRecordPointer> get copyWith =>
      throw _privateConstructorUsedError;
}
52 lib/veilid_support/dht_support/dht_record_pool.g.dart Normal file

@@ -0,0 +1,52 @@
// GENERATED CODE - DO NOT MODIFY BY HAND

part of 'dht_record_pool.dart';

// **************************************************************************
// JsonSerializableGenerator
// **************************************************************************

_$_DHTRecordPoolAllocations _$$_DHTRecordPoolAllocationsFromJson(
        Map<String, dynamic> json) =>
    _$_DHTRecordPoolAllocations(
      childrenByParent: IMap<Typed<FixedEncodedString43>,
              ISet<Typed<FixedEncodedString43>>>.fromJson(
          json['children_by_parent'] as Map<String, dynamic>,
          (value) => Typed<FixedEncodedString43>.fromJson(value),
          (value) => ISet<Typed<FixedEncodedString43>>.fromJson(
              value, (value) => Typed<FixedEncodedString43>.fromJson(value))),
      parentByChild: IMap<Typed<FixedEncodedString43>,
              Typed<FixedEncodedString43>>.fromJson(
          json['parent_by_child'] as Map<String, dynamic>,
          (value) => Typed<FixedEncodedString43>.fromJson(value),
          (value) => Typed<FixedEncodedString43>.fromJson(value)),
    );

Map<String, dynamic> _$$_DHTRecordPoolAllocationsToJson(
        _$_DHTRecordPoolAllocations instance) =>
    <String, dynamic>{
      'children_by_parent': instance.childrenByParent.toJson(
        (value) => value.toJson(),
        (value) => value.toJson(
          (value) => value.toJson(),
        ),
      ),
      'parent_by_child': instance.parentByChild.toJson(
        (value) => value.toJson(),
        (value) => value.toJson(),
      ),
    };

_$_OwnedDHTRecordPointer _$$_OwnedDHTRecordPointerFromJson(
        Map<String, dynamic> json) =>
    _$_OwnedDHTRecordPointer(
      recordKey: Typed<FixedEncodedString43>.fromJson(json['record_key']),
      owner: KeyPair.fromJson(json['owner']),
    );

Map<String, dynamic> _$$_OwnedDHTRecordPointerToJson(
        _$_OwnedDHTRecordPointer instance) =>
    <String, dynamic>{
      'record_key': instance.recordKey.toJson(),
      'owner': instance.owner.toJson(),
    };
580 lib/veilid_support/dht_support/dht_short_array.dart Normal file

@@ -0,0 +1,580 @@
import 'dart:async';
import 'dart:typed_data';

import 'package:protobuf/protobuf.dart';

import '../../entities/proto.dart' as proto;
import '../../tools/tools.dart';
import '../veilid_support.dart';

class _DHTShortArrayCache {
  _DHTShortArrayCache()
      : linkedRecords = List<DHTRecord>.empty(growable: true),
        index = List<int>.empty(growable: true),
        free = List<int>.empty(growable: true);
  _DHTShortArrayCache.from(_DHTShortArrayCache other)
      : linkedRecords = List.of(other.linkedRecords),
        index = List.of(other.index),
        free = List.of(other.free);

  final List<DHTRecord> linkedRecords;
  final List<int> index;
  final List<int> free;
}

class DHTShortArray {
  DHTShortArray._({required DHTRecord headRecord})
      : _headRecord = headRecord,
        _head = _DHTShortArrayCache() {
    late final int stride;
    switch (headRecord.schema) {
      case DHTSchemaDFLT(oCnt: final oCnt):
        stride = oCnt - 1;
        if (stride <= 0) {
          throw StateError('Invalid stride in DHTShortArray');
        }
      case DHTSchemaSMPL():
        throw StateError('Wrong kind of DHT record for DHTShortArray');
    }
    assert(stride <= maxElements, 'stride too long');
    _stride = stride;
  }

  static const maxElements = 256;

  // Head DHT record
  final DHTRecord _headRecord;
  late final int _stride;

  // Cached representation refreshed from head record
  _DHTShortArrayCache _head;

  static Future<DHTShortArray> create(
      {int stride = maxElements,
      VeilidRoutingContext? routingContext,
      TypedKey? parent,
      DHTRecordCrypto? crypto}) async {
    assert(stride <= maxElements, 'stride too long');
    final pool = await DHTRecordPool.instance();

    final dhtRecord = await pool.create(
        parent: parent,
        routingContext: routingContext,
        schema: DHTSchema.dflt(oCnt: stride + 1),
        crypto: crypto);
    try {
      final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
      return dhtShortArray;
    } on Exception catch (_) {
      await dhtRecord.delete();
      rethrow;
    }
  }

  static Future<DHTShortArray> openRead(TypedKey headRecordKey,
      {VeilidRoutingContext? routingContext,
      TypedKey? parent,
      DHTRecordCrypto? crypto}) async {
    final pool = await DHTRecordPool.instance();

    final dhtRecord = await pool.openRead(headRecordKey,
        parent: parent, routingContext: routingContext, crypto: crypto);
    try {
      final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
      await dhtShortArray._refreshHead();
      return dhtShortArray;
    } on Exception catch (_) {
      await dhtRecord.close();
      rethrow;
    }
  }

  static Future<DHTShortArray> openWrite(
    TypedKey headRecordKey,
    KeyPair writer, {
    VeilidRoutingContext? routingContext,
    TypedKey? parent,
    DHTRecordCrypto? crypto,
  }) async {
    final pool = await DHTRecordPool.instance();
    final dhtRecord = await pool.openWrite(headRecordKey, writer,
        parent: parent, routingContext: routingContext, crypto: crypto);
    try {
      final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
      await dhtShortArray._refreshHead();
      return dhtShortArray;
    } on Exception catch (_) {
      await dhtRecord.close();
      rethrow;
    }
  }

  static Future<DHTShortArray> openOwned(
    OwnedDHTRecordPointer ownedDHTRecordPointer, {
    required TypedKey parent,
    VeilidRoutingContext? routingContext,
    DHTRecordCrypto? crypto,
  }) =>
      openWrite(
        ownedDHTRecordPointer.recordKey,
        ownedDHTRecordPointer.owner,
        routingContext: routingContext,
        parent: parent,
        crypto: crypto,
      );

  DHTRecord get record => _headRecord;

  ////////////////////////////////////////////////////////////////

  /// Serialize and write out the current head record, possibly updating it
  /// if a newer copy is available online. Returns true if the write was
  /// successful
  Future<bool> _tryWriteHead() async {
    final head = proto.DHTShortArray();
    head.keys.addAll(_head.linkedRecords.map((lr) => lr.key.toProto()));
    head.index.addAll(_head.index);
    final headBuffer = head.writeToBuffer();

    final existingData = await _headRecord.tryWriteBytes(headBuffer);
    if (existingData != null) {
      // Head write failed, incorporate update
      await _newHead(proto.DHTShortArray.fromBuffer(existingData));
      return false;
    }

    return true;
  }

  /// Validate the head from the DHT is properly formatted
  /// and calculate the free list from it while we're here
  List<int> _validateHeadCacheData(
      List<Typed<FixedEncodedString43>> linkedKeys, List<int> index) {
    // Ensure nothing is duplicated in the linked keys set
    final newKeys = linkedKeys.toSet();
    assert(newKeys.length <= (maxElements + (_stride - 1)) ~/ _stride,
        'too many keys');
    assert(newKeys.length == linkedKeys.length, 'duplicated linked keys');
    final newIndex = index.toSet();
    assert(newIndex.length <= maxElements, 'too many indexes');
    assert(newIndex.length == index.length, 'duplicated index locations');
    // Ensure all the index keys fit into the existing records
    final indexCapacity = (linkedKeys.length + 1) * _stride;
    int? maxIndex;
    for (final idx in newIndex) {
      assert(idx >= 0 && idx < indexCapacity, 'index out of range');
      if (maxIndex == null || idx > maxIndex) {
        maxIndex = idx;
      }
    }
    final free = <int>[];
    if (maxIndex != null) {
      for (var i = 0; i < maxIndex; i++) {
        if (!newIndex.contains(i)) {
          free.add(i);
        }
      }
    }
    return free;
  }

  /// Open a linked record for reading or writing, same as the head record
  Future<DHTRecord> _openLinkedRecord(TypedKey recordKey) async {
    final pool = await DHTRecordPool.instance();

    final writer = _headRecord.writer;
    return (writer != null)
        ? await pool.openWrite(
            recordKey,
            writer,
            parent: _headRecord.key,
            routingContext: _headRecord.routingContext,
          )
        : await pool.openRead(
            recordKey,
            parent: _headRecord.key,
            routingContext: _headRecord.routingContext,
          );
  }

  /// Validate a new head record
  Future<void> _newHead(proto.DHTShortArray head) async {
    // Get the set of new linked keys and validate it
    final linkedKeys = head.keys.map(proto.TypedKeyProto.fromProto).toList();
    final index = head.index;
    final free = _validateHeadCacheData(linkedKeys, index);

    // See which records are actually new
    final oldRecords = Map<TypedKey, DHTRecord>.fromEntries(
        _head.linkedRecords.map((lr) => MapEntry(lr.key, lr)));
    final newRecords = <TypedKey, DHTRecord>{};
    final sameRecords = <TypedKey, DHTRecord>{};
    try {
      for (var n = 0; n < linkedKeys.length; n++) {
        final newKey = linkedKeys[n];
        final oldRecord = oldRecords[newKey];
        if (oldRecord == null) {
          // Open the new record
          final newRecord = await _openLinkedRecord(newKey);
          newRecords[newKey] = newRecord;
        } else {
          sameRecords[newKey] = oldRecord;
        }
      }
    } on Exception catch (_) {
      // On any exception close the records we have opened
      await Future.wait(newRecords.entries.map((e) => e.value.close()));
      rethrow;
    }

    // From this point forward we should not throw an exception or everything
    // is possibly invalid. Just pass the exception up if it happens and the caller
    // will have to delete this short array and reopen it if it can
    await Future.wait(oldRecords.entries
        .where((e) => !sameRecords.containsKey(e.key))
        .map((e) => e.value.close()));

    // Figure out which indices are free

    // Make the new head cache
    _head = _DHTShortArrayCache()
      ..linkedRecords.addAll(
          linkedKeys.map((key) => (sameRecords[key] ?? newRecords[key])!))
      ..index.addAll(index)
      ..free.addAll(free);
  }

  /// Pull the latest or updated copy of the head record from the network
  Future<bool> _refreshHead(
      {bool forceRefresh = false, bool onlyUpdates = false}) async {
    // Get an updated head record copy if one exists
    final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer,
        forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
    if (head == null) {
      if (onlyUpdates) {
        // No update
        return false;
      }
      throw StateError('head missing during refresh');
    }

    await _newHead(head);

    return true;
  }

  ////////////////////////////////////////////////////////////////

  Future<void> close() async {
    final futures = <Future<void>>[_headRecord.close()];
    for (final lr in _head.linkedRecords) {
      futures.add(lr.close());
    }
    await Future.wait(futures);
  }

  Future<void> delete() async {
    final futures = <Future<void>>[_headRecord.close()];
    for (final lr in _head.linkedRecords) {
      futures.add(lr.delete());
    }
    await Future.wait(futures);
  }

  Future<T> scope<T>(FutureOr<T> Function(DHTShortArray) scopeFunction) async {
    try {
      return await scopeFunction(this);
    } finally {
      await close();
    }
  }

  Future<T> deleteScope<T>(
      FutureOr<T> Function(DHTShortArray) scopeFunction) async {
    try {
      final out = await scopeFunction(this);
      await close();
      return out;
    } on Exception catch (_) {
      await delete();
      rethrow;
    }
  }

  DHTRecord? _getRecord(int recordNumber) {
    if (recordNumber == 0) {
      return _headRecord;
    }
    recordNumber--;
    if (recordNumber >= _head.linkedRecords.length) {
      return null;
    }
    return _head.linkedRecords[recordNumber];
  }

  int _emptyIndex() {
    if (_head.free.isNotEmpty) {
      return _head.free.removeLast();
    }
    if (_head.index.length == maxElements) {
      throw StateError('too many elements');
    }
    return _head.index.length;
  }

  void _freeIndex(int idx) {
    _head.free.add(idx);
    // xxx: free list optimization here?
  }

  int length() => _head.index.length;

  Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async {
    await _refreshHead(forceRefresh: forceRefresh, onlyUpdates: true);

    if (pos < 0 || pos >= _head.index.length) {
      throw IndexError.withLength(pos, _head.index.length);
    }
    final index = _head.index[pos];
    final recordNumber = index ~/ _stride;
    final record = _getRecord(recordNumber);
    assert(record != null, 'Record does not exist');

    final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0);
    return record!.get(subkey: recordSubkey, forceRefresh: forceRefresh);
  }

  Future<T?> getItemJson<T>(T Function(dynamic) fromJson, int pos,
          {bool forceRefresh = false}) =>
      getItem(pos, forceRefresh: forceRefresh)
          .then((out) => jsonDecodeOptBytes(fromJson, out));

  Future<T?> getItemProtobuf<T extends GeneratedMessage>(
          T Function(List<int>) fromBuffer, int pos,
          {bool forceRefresh = false}) =>
      getItem(pos, forceRefresh: forceRefresh)
          .then((out) => (out == null) ? null : fromBuffer(out));

  Future<bool> tryAddItem(Uint8List value) async {
    await _refreshHead(onlyUpdates: true);

    final oldHead = _DHTShortArrayCache.from(_head);
    late final int pos;
    try {
      // Allocate empty index
      final idx = _emptyIndex();
      // Add new index
      pos = _head.index.length;
      _head.index.add(idx);

      // Write new head
      if (!await _tryWriteHead()) {
        // Failed to write head means head got overwritten
        return false;
      }
    } on Exception catch (_) {
      // Exception on write means state needs to be reverted
      _head = oldHead;
      return false;
    }

    // Head write succeeded, now write item
    await eventualWriteItem(pos, value);
    return true;
  }

  Future<bool> tryInsertItem(int pos, Uint8List value) async {
    await _refreshHead(onlyUpdates: true);

    final oldHead = _DHTShortArrayCache.from(_head);
    try {
      // Allocate empty index
      final idx = _emptyIndex();
      // Add new index
      _head.index.insert(pos, idx);

      // Write new head
      if (!await _tryWriteHead()) {
        // Failed to write head means head got overwritten
        return false;
      }
    } on Exception catch (_) {
      // Exception on write means state needs to be reverted
      _head = oldHead;
      return false;
    }

    // Head write succeeded, now write item
    await eventualWriteItem(pos, value);
    return true;
  }

  Future<bool> trySwapItem(int aPos, int bPos) async {
    await _refreshHead(onlyUpdates: true);

    final oldHead = _DHTShortArrayCache.from(_head);
    try {
      // Add new index
      final aIdx = _head.index[aPos];
      final bIdx = _head.index[bPos];
      _head.index[aPos] = bIdx;
      _head.index[bPos] = aIdx;

      // Write new head
      if (!await _tryWriteHead()) {
        // Failed to write head means head got overwritten
        return false;
      }
    } on Exception catch (_) {
      // Exception on write means state needs to be reverted
      _head = oldHead;
      return false;
    }
    return true;
  }

  Future<Uint8List?> tryRemoveItem(int pos) async {
    await _refreshHead(onlyUpdates: true);

    final oldHead = _DHTShortArrayCache.from(_head);
    try {
      final removedIdx = _head.index.removeAt(pos);
      _freeIndex(removedIdx);
      final recordNumber = removedIdx ~/ _stride;
      final record = _getRecord(recordNumber);
      assert(record != null, 'Record does not exist');
      final recordSubkey =
          (removedIdx % _stride) + ((recordNumber == 0) ? 1 : 0);

      // Write new head
      if (!await _tryWriteHead()) {
        // Failed to write head means head got overwritten
        return null;
      }

      return record!.get(subkey: recordSubkey);
    } on Exception catch (_) {
      // Exception on write means state needs to be reverted
      _head = oldHead;
      return null;
    }
  }

  Future<T?> tryRemoveItemJson<T>(
    T Function(dynamic) fromJson,
    int pos,
  ) =>
      tryRemoveItem(pos).then((out) => jsonDecodeOptBytes(fromJson, out));

  Future<T?> tryRemoveItemProtobuf<T extends GeneratedMessage>(
          T Function(List<int>) fromBuffer, int pos) =>
      tryRemoveItem(pos).then((out) => (out == null) ? null : fromBuffer(out));

  Future<bool> tryClear() async {
    await _refreshHead(onlyUpdates: true);

    final oldHead = _DHTShortArrayCache.from(_head);
    try {
      _head.index.clear();
      _head.free.clear();

      // Write new head
      if (!await _tryWriteHead()) {
        // Failed to write head means head got overwritten
        return false;
      }
    } on Exception catch (_) {
      // Exception on write means state needs to be reverted
      _head = oldHead;
      return false;
    }
    return true;
  }

  Future<Uint8List?> tryWriteItem(int pos, Uint8List newValue) async {
    if (await _refreshHead(onlyUpdates: true)) {
      throw StateError('structure changed');
    }
    if (pos < 0 || pos >= _head.index.length) {
      throw IndexError.withLength(pos, _head.index.length);
    }
    final index = _head.index[pos];

    final recordNumber = index ~/ _stride;
    final record = _getRecord(recordNumber);
    assert(record != null, 'Record does not exist');

    final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0);
    return record!.tryWriteBytes(newValue, subkey: recordSubkey);
  }

  Future<void> eventualWriteItem(int pos, Uint8List newValue) async {
    Uint8List? oldData;
    do {
      // Set it back
      oldData = await tryWriteItem(pos, newValue);

      // Repeat if newer data on the network was found
    } while (oldData != null);
  }

  Future<void> eventualUpdateItem(
      int pos, Future<Uint8List> Function(Uint8List oldValue) update) async {
    var oldData = await getItem(pos);
    // Ensure it exists already
    if (oldData == null) {
      throw const FormatException('value does not exist');
    }
    do {
      // Update the data
      final updatedData = await update(oldData!);

      // Set it back
      oldData = await tryWriteItem(pos, updatedData);

      // Repeat if newer data on the network was found
    } while (oldData != null);
  }

  Future<T?> tryWriteItemJson<T>(
    T Function(dynamic) fromJson,
    int pos,
    T newValue,
  ) =>
      tryWriteItem(pos, jsonEncodeBytes(newValue))
          .then((out) => jsonDecodeOptBytes(fromJson, out));

  Future<T?> tryWriteItemProtobuf<T extends GeneratedMessage>(
    T Function(List<int>) fromBuffer,
    int pos,
    T newValue,
  ) =>
      tryWriteItem(pos, newValue.writeToBuffer()).then((out) {
        if (out == null) {
          return null;
        }
        return fromBuffer(out);
      });

  Future<void> eventualWriteItemJson<T>(int pos, T newValue) =>
      eventualWriteItem(pos, jsonEncodeBytes(newValue));

  Future<void> eventualWriteItemProtobuf<T extends GeneratedMessage>(
          int pos, T newValue,
          {int subkey = -1}) =>
      eventualWriteItem(pos, newValue.writeToBuffer());

  Future<void> eventualUpdateItemJson<T>(
    T Function(dynamic) fromJson,
    int pos,
    Future<T> Function(T) update,
  ) =>
      eventualUpdateItem(pos, jsonUpdate(fromJson, update));

  Future<void> eventualUpdateItemProtobuf<T extends GeneratedMessage>(
    T Function(List<int>) fromBuffer,
    int pos,
    Future<T> Function(T) update,
  ) =>
      eventualUpdateItem(pos, protobufUpdate(fromBuffer, update));
}
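
Editor's note: a short usage sketch for DHTShortArray, not part of this commit. Elements are spread across the head record plus linked records, `stride` elements per record, with subkey 0 of the head reserved for the serialized index:

// Sketch only (not part of this commit). Assumes Veilid is initialized;
// needs dart:typed_data plus the veilid_support library exports.
Future<void> shortArrayExample() async {
  final arr = await DHTShortArray.create(stride: 32);
  try {
    // tryAddItem returns false if the head was concurrently overwritten
    if (await arr.tryAddItem(Uint8List.fromList([42]))) {
      final item = await arr.getItem(0);
      print('length=${arr.length()} first=$item');
    }
  } finally {
    await arr.close();
  }
}
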
4 lib/veilid_support/dht_support/dht_support.dart Normal file

@@ -0,0 +1,4 @@
export 'dht_record.dart';
export 'dht_record_crypto.dart';
export 'dht_record_pool.dart';
export 'dht_short_array.dart';