checkpoint

This commit is contained in:
Christien Rioux 2024-02-15 11:05:59 -07:00
parent 031d7aea82
commit fcccacfafa
8 changed files with 152 additions and 87 deletions

View File

@ -1,12 +1,28 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:equatable/equatable.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';
import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart';
@immutable
class DHTRecordWatchChange extends Equatable {
const DHTRecordWatchChange(
{required this.local, required this.data, required this.subkeys});
final bool local;
final Uint8List data;
final List<ValueSubkeyRange> subkeys;
@override
List<Object?> get props => [local, data, subkeys];
}
/////////////////////////////////////////////////
class DHTRecord {
DHTRecord(
{required VeilidRoutingContext routingContext,
@ -34,7 +50,7 @@ class DHTRecord {
bool _open;
bool _valid;
@internal
StreamController<VeilidUpdateValueChange>? watchController;
StreamController<DHTRecordWatchChange>? watchController;
@internal
bool needsWatchStateUpdate;
@internal
@ -160,76 +176,100 @@ class DHTRecord {
Future<Uint8List?> tryWriteBytes(Uint8List newValue,
{int subkey = -1}) async {
subkey = subkeyOrDefault(subkey);
newValue = await _crypto.encrypt(newValue, subkey);
final lastSeq = _subkeySeqCache[subkey];
final encryptedNewValue = await _crypto.encrypt(newValue, subkey);
// Set the new data if possible
var valueData = await _routingContext.setDHTValue(
_recordDescriptor.key, subkey, newValue);
if (valueData == null) {
// Get the data to check its sequence number
valueData = await _routingContext.getDHTValue(
var newValueData = await _routingContext.setDHTValue(
_recordDescriptor.key, subkey, encryptedNewValue);
if (newValueData == null) {
// A newer value wasn't found on the set, but
// we may get a newer value when getting the value for the sequence number
newValueData = await _routingContext.getDHTValue(
_recordDescriptor.key, subkey, false);
assert(valueData != null, "can't get value that was just set");
_subkeySeqCache[subkey] = valueData!.seq;
if (newValueData == null) {
assert(newValueData != null, "can't get value that was just set");
return null;
}
_subkeySeqCache[subkey] = valueData.seq;
return valueData.data;
}
// Record new sequence number
final isUpdated = newValueData.seq != lastSeq;
_subkeySeqCache[subkey] = newValueData.seq;
// See if the encrypted data returned is exactly the same
// if so, shortcut and don't bother decrypting it
if (newValueData.data == encryptedNewValue) {
if (isUpdated) {
addLocalValueChange(newValue, subkey);
}
return null;
}
// Decrypt value to return it
final decryptedNewValue = await _crypto.decrypt(newValueData.data, subkey);
if (isUpdated) {
addLocalValueChange(decryptedNewValue, subkey);
}
return decryptedNewValue;
}
Future<void> eventualWriteBytes(Uint8List newValue, {int subkey = -1}) async {
subkey = subkeyOrDefault(subkey);
newValue = await _crypto.encrypt(newValue, subkey);
final lastSeq = _subkeySeqCache[subkey];
final encryptedNewValue = await _crypto.encrypt(newValue, subkey);
ValueData? valueData;
ValueData? newValueData;
do {
do {
// Set the new data
valueData = await _routingContext.setDHTValue(
_recordDescriptor.key, subkey, newValue);
newValueData = await _routingContext.setDHTValue(
_recordDescriptor.key, subkey, encryptedNewValue);
// Repeat if newer data on the network was found
} while (valueData != null);
} while (newValueData != null);
// Get the data to check its sequence number
valueData =
await _routingContext.getDHTValue(_recordDescriptor.key, subkey, false);
assert(valueData != null, "can't get value that was just set");
_subkeySeqCache[subkey] = valueData!.seq;
newValueData = await _routingContext.getDHTValue(
_recordDescriptor.key, subkey, false);
if (newValueData == null) {
assert(newValueData != null, "can't get value that was just set");
return;
}
// Record new sequence number
_subkeySeqCache[subkey] = newValueData.seq;
// The encrypted data returned should be exactly the same
// as what we are trying to set,
// otherwise we still need to keep trying to set the value
} while (newValueData.data != encryptedNewValue);
final isUpdated = newValueData.seq != lastSeq;
if (isUpdated) {
addLocalValueChange(newValue, subkey);
}
}
Future<void> eventualUpdateBytes(
Future<Uint8List> Function(Uint8List oldValue) update,
Future<Uint8List> Function(Uint8List? oldValue) update,
{int subkey = -1}) async {
subkey = subkeyOrDefault(subkey);
// Get existing identity key, do not allow force refresh here
// Get the existing data, do not allow force refresh here
// because if we need a refresh the setDHTValue will fail anyway
var valueData =
await _routingContext.getDHTValue(_recordDescriptor.key, subkey, false);
// Ensure it exists already
if (valueData == null) {
throw const FormatException('value does not exist');
}
var oldValue =
await get(subkey: subkey, forceRefresh: false, onlyUpdates: false);
do {
// Update cache
_subkeySeqCache[subkey] = valueData!.seq;
// Update the data
final oldData = await _crypto.decrypt(valueData.data, subkey);
final updatedData = await update(oldData);
final newData = await _crypto.encrypt(updatedData, subkey);
final updatedValue = await update(oldValue);
// Set it back
valueData = await _routingContext.setDHTValue(
_recordDescriptor.key, subkey, newData);
// Try to write it back to the network
oldValue = await tryWriteBytes(updatedValue, subkey: subkey);
// Repeat if newer data on the network was found
} while (valueData != null);
// Get the data to check its sequence number
valueData =
await _routingContext.getDHTValue(_recordDescriptor.key, subkey, false);
assert(valueData != null, "can't get value that was just set");
_subkeySeqCache[subkey] = valueData!.seq;
// Repeat update if newer data on the network was found
} while (oldValue != null);
}
Future<T?> tryWriteJson<T>(T Function(dynamic) fromJson, T newValue,
@ -259,12 +299,12 @@ class DHTRecord {
eventualWriteBytes(newValue.writeToBuffer(), subkey: subkey);
Future<void> eventualUpdateJson<T>(
T Function(dynamic) fromJson, Future<T> Function(T) update,
T Function(dynamic) fromJson, Future<T> Function(T?) update,
{int subkey = -1}) =>
eventualUpdateBytes(jsonUpdate(fromJson, update), subkey: subkey);
Future<void> eventualUpdateProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, Future<T> Function(T) update,
T Function(List<int>) fromBuffer, Future<T> Function(T?) update,
{int subkey = -1}) =>
eventualUpdateBytes(protobufUpdate(fromBuffer, update), subkey: subkey);
@ -281,25 +321,34 @@ class DHTRecord {
}
}
Future<StreamSubscription<VeilidUpdateValueChange>> listen(
Future<StreamSubscription<DHTRecordWatchChange>> listen(
Future<void> Function(
DHTRecord record, Uint8List data, List<ValueSubkeyRange> subkeys)
onUpdate,
) async {
{bool localChanges = true}) async {
// Set up watch requirements
watchController ??=
StreamController<VeilidUpdateValueChange>.broadcast(onCancel: () {
StreamController<DHTRecordWatchChange>.broadcast(onCancel: () {
// If there are no more listeners then we can get rid of the controller
watchController = null;
});
return watchController!.stream.listen(
(update) {
(change) {
if (change.local && !localChanges) {
return;
}
Future.delayed(Duration.zero, () async {
final out = await _crypto.decrypt(
update.valueData.data, update.subkeys.first.low);
await onUpdate(this, out, update.subkeys);
final Uint8List data;
if (change.local) {
// local changes are not encrypted
data = change.data;
} else {
// incoming/remote changes are encrypted
data =
await _crypto.decrypt(change.data, change.subkeys.first.low);
}
await onUpdate(this, data, change.subkeys);
});
},
cancelOnError: true,
@ -316,4 +365,14 @@ class DHTRecord {
needsWatchStateUpdate = true;
}
}
void addLocalValueChange(Uint8List data, int subkey) {
watchController?.add(DHTRecordWatchChange(
local: true, data: data, subkeys: [ValueSubkeyRange.single(subkey)]));
}
void addRemoteValueChange(VeilidUpdateValueChange update) {
watchController?.add(DHTRecordWatchChange(
local: false, data: update.valueData.data, subkeys: update.subkeys));
}
}

View File

@ -102,7 +102,7 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
DHTRecord get record => _record;
StreamSubscription<VeilidUpdateValueChange>? _subscription;
StreamSubscription<DHTRecordWatchChange>? _subscription;
late DHTRecord _record;
bool _wantsCloseRecord;
final StateFunction<T> _stateFunction;

View File

@ -351,7 +351,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
// Change
for (final kv in _opened.entries) {
if (kv.key == updateValueChange.key) {
kv.value.watchController?.add(updateValueChange);
kv.value.addRemoteValueChange(updateValueChange);
break;
}
}

View File

@ -63,8 +63,7 @@ class DHTShortArray {
_DHTShortArrayCache _head;
// Subscription to head and linked record internal changes
final Map<TypedKey, StreamSubscription<VeilidUpdateValueChange>>
_subscriptions;
final Map<TypedKey, StreamSubscription<DHTRecordWatchChange>> _subscriptions;
// Stream of external changes
StreamController<void>? _watchController;
// Watch mutex to ensure we keep the representation valid
@ -545,10 +544,10 @@ class DHTShortArray {
}
final result = await record!.get(subkey: recordSubkey);
if (result != null) {
// A change happened, notify any listeners
_watchController?.sink.add(null);
}
return result;
} on Exception catch (_) {
// Exception on write means state needs to be reverted
@ -607,8 +606,8 @@ class DHTShortArray {
final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0);
final result = await record.tryWriteBytes(newValue, subkey: recordSubkey);
if (result != null) {
// A change happened, notify any listeners
if (result == null) {
// A newer value was not found, so the change took
_watchController?.sink.add(null);
}
return result;
@ -625,7 +624,7 @@ class DHTShortArray {
}
Future<void> eventualUpdateItem(
int pos, Future<Uint8List> Function(Uint8List oldValue) update) async {
int pos, Future<Uint8List> Function(Uint8List? oldValue) update) async {
var oldData = await getItem(pos);
// Ensure it exists already
if (oldData == null) {
@ -633,7 +632,7 @@ class DHTShortArray {
}
do {
// Update the data
final updatedData = await update(oldData!);
final updatedData = await update(oldData);
// Set it back
oldData = await tryWriteItem(pos, updatedData);
@ -673,14 +672,14 @@ class DHTShortArray {
Future<void> eventualUpdateItemJson<T>(
T Function(dynamic) fromJson,
int pos,
Future<T> Function(T) update,
Future<T> Function(T?) update,
) =>
eventualUpdateItem(pos, jsonUpdate(fromJson, update));
Future<void> eventualUpdateItemProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
int pos,
Future<T> Function(T) update,
Future<T> Function(T?) update,
) =>
eventualUpdateItem(pos, protobufUpdate(fromBuffer, update));
@ -692,14 +691,17 @@ class DHTShortArray {
.wait;
// Update changes to the head record
// Don't watch for local changes because this class already handles
// notifying listeners and knows when it makes local changes
if (!_subscriptions.containsKey(_headRecord.key)) {
_subscriptions[_headRecord.key] =
await _headRecord.listen(_onUpdateRecord);
await _headRecord.listen(localChanges: false, _onUpdateRecord);
}
// Update changes to any linked records
for (final lr in _head.linkedRecords) {
if (!_subscriptions.containsKey(lr.key)) {
_subscriptions[lr.key] = await lr.listen(_onUpdateRecord);
_subscriptions[lr.key] =
await lr.listen(localChanges: false, _onUpdateRecord);
}
}
} on Exception {

View File

@ -13,14 +13,15 @@ Uint8List jsonEncodeBytes(Object? object,
utf8.encode(jsonEncode(object, toEncodable: toEncodable)));
Future<Uint8List> jsonUpdateBytes<T>(T Function(dynamic) fromJson,
Uint8List oldBytes, Future<T> Function(T) update) async {
final oldObj = fromJson(jsonDecode(utf8.decode(oldBytes)));
Uint8List? oldBytes, Future<T> Function(T?) update) async {
final oldObj =
oldBytes == null ? null : fromJson(jsonDecode(utf8.decode(oldBytes)));
final newObj = await update(oldObj);
return jsonEncodeBytes(newObj);
}
Future<Uint8List> Function(Uint8List) jsonUpdate<T>(
T Function(dynamic) fromJson, Future<T> Function(T) update) =>
Future<Uint8List> Function(Uint8List?) jsonUpdate<T>(
T Function(dynamic) fromJson, Future<T> Function(T?) update) =>
(oldBytes) => jsonUpdateBytes(fromJson, oldBytes, update);
T Function(Object?) genericFromJson<T>(

View File

@ -4,14 +4,14 @@ import 'package:protobuf/protobuf.dart';
Future<Uint8List> protobufUpdateBytes<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer,
Uint8List oldBytes,
Future<T> Function(T) update) async {
final oldObj = fromBuffer(oldBytes);
Uint8List? oldBytes,
Future<T> Function(T?) update) async {
final oldObj = oldBytes == null ? null : fromBuffer(oldBytes);
final newObj = await update(oldObj);
return Uint8List.fromList(newObj.writeToBuffer());
}
Future<Uint8List> Function(Uint8List)
Future<Uint8List> Function(Uint8List?)
protobufUpdate<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, Future<T> Function(T) update) =>
T Function(List<int>) fromBuffer, Future<T> Function(T?) update) =>
(oldBytes) => protobufUpdateBytes(fromBuffer, oldBytes, update);

View File

@ -411,7 +411,7 @@ packages:
source: hosted
version: "1.0.4"
mutex:
dependency: transitive
dependency: "direct main"
description:
path: "../mutex"
relative: true

View File

@ -16,6 +16,9 @@ dependencies:
json_annotation: ^4.8.1
loggy: ^2.0.3
meta: ^1.10.0
mutex:
path: ../mutex
protobuf: ^3.0.0
veilid:
# veilid: ^0.0.1