optimization work and crypto overrides for dht operations

This commit is contained in:
Christien Rioux 2024-04-04 22:31:09 -04:00
parent 9bb20f4dd2
commit 5da68b2d94
4 changed files with 77 additions and 36 deletions

View File

@ -126,6 +126,13 @@ class ContactInvitationListCubit
.deleteScope((contactRequestInbox) async {
// Store ContactRequest in owner subkey
await contactRequestInbox.eventualWriteProtobuf(creq);
// Store an empty invitation response
await contactRequestInbox.eventualWriteBytes(Uint8List(0),
subkey: 1,
writer: contactRequestWriter,
crypto: await DHTRecordCryptoPrivate.fromTypedKeyPair(
TypedKeyPair.fromKeyPair(
contactRequestInbox.key.kind, contactRequestWriter)));
// Create ContactInvitation and SignedContactInvitation
final cinv = proto.ContactInvitation()

View File

@ -5,14 +5,16 @@ import '../../proto/proto.dart' as proto;
// Watch subkey #1 of the ContactRequest record for accept/reject
class ContactRequestInboxCubit
extends DefaultDHTRecordCubit<proto.SignedContactResponse> {
extends DefaultDHTRecordCubit<proto.SignedContactResponse?> {
ContactRequestInboxCubit(
{required this.activeAccountInfo, required this.contactInvitationRecord})
: super(
open: () => _open(
activeAccountInfo: activeAccountInfo,
contactInvitationRecord: contactInvitationRecord),
decodeState: proto.SignedContactResponse.fromBuffer);
decodeState: (buf) => buf.isEmpty
? null
: proto.SignedContactResponse.fromBuffer(buf));
// ContactRequestInboxCubit.value(
// {required super.record,

View File

@ -23,7 +23,7 @@ class InvitationStatus extends Equatable {
}
class WaitingInvitationCubit extends AsyncTransformerCubit<InvitationStatus,
proto.SignedContactResponse> {
proto.SignedContactResponse?> {
WaitingInvitationCubit(ContactRequestInboxCubit super.input,
{required ActiveAccountInfo activeAccountInfo,
required proto.Account account,
@ -36,10 +36,13 @@ class WaitingInvitationCubit extends AsyncTransformerCubit<InvitationStatus,
contactInvitationRecord: contactInvitationRecord));
static Future<AsyncValue<InvitationStatus>> _transform(
proto.SignedContactResponse signedContactResponse,
proto.SignedContactResponse? signedContactResponse,
{required ActiveAccountInfo activeAccountInfo,
required proto.Account account,
required proto.ContactInvitationRecord contactInvitationRecord}) async {
if (signedContactResponse == null) {
return const AsyncValue.loading();
}
final pool = DHTRecordPool.instance;
final contactResponseBytes =
Uint8List.fromList(signedContactResponse.contactResponse);

View File

@ -102,6 +102,7 @@ class DHTRecord {
Future<Uint8List?> get(
{int subkey = -1,
DHTRecordCrypto? crypto,
bool forceRefresh = false,
bool onlyUpdates = false}) async {
subkey = subkeyOrDefault(subkey);
@ -114,17 +115,21 @@ class DHTRecord {
if (onlyUpdates && lastSeq != null && valueData.seq <= lastSeq) {
return null;
}
final out = _crypto.decrypt(valueData.data, subkey);
final out = (crypto ?? _crypto).decrypt(valueData.data, subkey);
_sharedDHTRecordData.subkeySeqCache[subkey] = valueData.seq;
return out;
}
Future<T?> getJson<T>(T Function(dynamic) fromJson,
{int subkey = -1,
DHTRecordCrypto? crypto,
bool forceRefresh = false,
bool onlyUpdates = false}) async {
final data = await get(
subkey: subkey, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
subkey: subkey,
crypto: crypto,
forceRefresh: forceRefresh,
onlyUpdates: onlyUpdates);
if (data == null) {
return null;
}
@ -134,10 +139,14 @@ class DHTRecord {
Future<T?> getProtobuf<T extends GeneratedMessage>(
T Function(List<int> i) fromBuffer,
{int subkey = -1,
DHTRecordCrypto? crypto,
bool forceRefresh = false,
bool onlyUpdates = false}) async {
final data = await get(
subkey: subkey, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
subkey: subkey,
crypto: crypto,
forceRefresh: forceRefresh,
onlyUpdates: onlyUpdates);
if (data == null) {
return null;
}
@ -145,14 +154,15 @@ class DHTRecord {
}
Future<Uint8List?> tryWriteBytes(Uint8List newValue,
{int subkey = -1}) async {
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) async {
subkey = subkeyOrDefault(subkey);
final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey];
final encryptedNewValue = await _crypto.encrypt(newValue, subkey);
final encryptedNewValue =
await (crypto ?? _crypto).encrypt(newValue, subkey);
// Set the new data if possible
var newValueData = await _routingContext
.setDHTValue(key, subkey, encryptedNewValue, writer: _writer);
.setDHTValue(key, subkey, encryptedNewValue, writer: writer ?? _writer);
if (newValueData == null) {
// A newer value wasn't found on the set, but
// we may get a newer value when getting the value for the sequence number
@ -177,7 +187,8 @@ class DHTRecord {
}
// Decrypt value to return it
final decryptedNewValue = await _crypto.decrypt(newValueData.data, subkey);
final decryptedNewValue =
await (crypto ?? _crypto).decrypt(newValueData.data, subkey);
if (isUpdated) {
DHTRecordPool.instance
.processLocalValueChange(key, decryptedNewValue, subkey);
@ -185,17 +196,20 @@ class DHTRecord {
return decryptedNewValue;
}
Future<void> eventualWriteBytes(Uint8List newValue, {int subkey = -1}) async {
Future<void> eventualWriteBytes(Uint8List newValue,
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) async {
subkey = subkeyOrDefault(subkey);
final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey];
final encryptedNewValue = await _crypto.encrypt(newValue, subkey);
final encryptedNewValue =
await (crypto ?? _crypto).encrypt(newValue, subkey);
ValueData? newValueData;
do {
do {
// Set the new data
newValueData = await _routingContext
.setDHTValue(key, subkey, encryptedNewValue, writer: _writer);
newValueData = await _routingContext.setDHTValue(
key, subkey, encryptedNewValue,
writer: writer ?? _writer);
// Repeat if newer data on the network was found
} while (newValueData != null);
@ -223,27 +237,32 @@ class DHTRecord {
Future<void> eventualUpdateBytes(
Future<Uint8List> Function(Uint8List? oldValue) update,
{int subkey = -1}) async {
{int subkey = -1,
DHTRecordCrypto? crypto,
KeyPair? writer}) async {
subkey = subkeyOrDefault(subkey);
// Get the existing data, do not allow force refresh here
// because if we need a refresh the setDHTValue will fail anyway
var oldValue = await get(subkey: subkey);
var oldValue = await get(subkey: subkey, crypto: crypto);
do {
// Update the data
final updatedValue = await update(oldValue);
// Try to write it back to the network
oldValue = await tryWriteBytes(updatedValue, subkey: subkey);
oldValue = await tryWriteBytes(updatedValue,
subkey: subkey, crypto: crypto, writer: writer);
// Repeat update if newer data on the network was found
} while (oldValue != null);
}
Future<T?> tryWriteJson<T>(T Function(dynamic) fromJson, T newValue,
{int subkey = -1}) =>
tryWriteBytes(jsonEncodeBytes(newValue), subkey: subkey).then((out) {
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
tryWriteBytes(jsonEncodeBytes(newValue),
subkey: subkey, crypto: crypto, writer: writer)
.then((out) {
if (out == null) {
return null;
}
@ -252,30 +271,37 @@ class DHTRecord {
Future<T?> tryWriteProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, T newValue,
{int subkey = -1}) =>
tryWriteBytes(newValue.writeToBuffer(), subkey: subkey).then((out) {
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
tryWriteBytes(newValue.writeToBuffer(),
subkey: subkey, crypto: crypto, writer: writer)
.then((out) {
if (out == null) {
return null;
}
return fromBuffer(out);
});
Future<void> eventualWriteJson<T>(T newValue, {int subkey = -1}) =>
eventualWriteBytes(jsonEncodeBytes(newValue), subkey: subkey);
Future<void> eventualWriteJson<T>(T newValue,
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
eventualWriteBytes(jsonEncodeBytes(newValue),
subkey: subkey, crypto: crypto, writer: writer);
Future<void> eventualWriteProtobuf<T extends GeneratedMessage>(T newValue,
{int subkey = -1}) =>
eventualWriteBytes(newValue.writeToBuffer(), subkey: subkey);
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
eventualWriteBytes(newValue.writeToBuffer(),
subkey: subkey, crypto: crypto, writer: writer);
Future<void> eventualUpdateJson<T>(
T Function(dynamic) fromJson, Future<T> Function(T?) update,
{int subkey = -1}) =>
eventualUpdateBytes(jsonUpdate(fromJson, update), subkey: subkey);
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
eventualUpdateBytes(jsonUpdate(fromJson, update),
subkey: subkey, crypto: crypto, writer: writer);
Future<void> eventualUpdateProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, Future<T> Function(T?) update,
{int subkey = -1}) =>
eventualUpdateBytes(protobufUpdate(fromBuffer, update), subkey: subkey);
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
eventualUpdateBytes(protobufUpdate(fromBuffer, update),
subkey: subkey, crypto: crypto, writer: writer);
Future<void> watch(
{List<ValueSubkeyRange>? subkeys,
@ -291,10 +317,12 @@ class DHTRecord {
}
Future<StreamSubscription<DHTRecordWatchChange>> listen(
Future<void> Function(
DHTRecord record, Uint8List? data, List<ValueSubkeyRange> subkeys)
onUpdate,
{bool localChanges = true}) async {
Future<void> Function(
DHTRecord record, Uint8List? data, List<ValueSubkeyRange> subkeys)
onUpdate, {
bool localChanges = true,
DHTRecordCrypto? crypto,
}) async {
// Set up watch requirements
watchController ??=
StreamController<DHTRecordWatchChange>.broadcast(onCancel: () {
@ -317,7 +345,8 @@ class DHTRecord {
final changeData = change.data;
data = changeData == null
? null
: await _crypto.decrypt(changeData, change.subkeys.first.low);
: await (crypto ?? _crypto)
.decrypt(changeData, change.subkeys.first.low);
}
await onUpdate(this, data, change.subkeys);
});
@ -366,7 +395,7 @@ class DHTRecord {
overlappedFirstSubkey == updateFirstSubkey)
? data
: null;
// Report only wathced subkeys
// Report only watched subkeys
watchController?.add(DHTRecordWatchChange(
local: local, data: updatedData, subkeys: overlappedSubkeys));
}