From 672bcd5c3d4dd08be5236e2d70a8afc388aa9543 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sun, 30 Jul 2023 01:11:24 -0400 Subject: [PATCH] short array work --- lib/entities/proto/veilidchat.pb.dart | 120 ++++++-- lib/entities/proto/veilidchat.pbjson.dart | 42 ++- lib/entities/veilidchat.proto | 11 +- lib/providers/contact_request_records.dart | 5 +- lib/veilid_support/dht_list.dart | 43 --- lib/veilid_support/dht_record.dart | 87 ++++-- lib/veilid_support/dht_short_array.dart | 339 +++++++++++++++++++++ 7 files changed, 543 insertions(+), 104 deletions(-) delete mode 100644 lib/veilid_support/dht_list.dart create mode 100644 lib/veilid_support/dht_short_array.dart diff --git a/lib/entities/proto/veilidchat.pb.dart b/lib/entities/proto/veilidchat.pb.dart index 61269e8..aad81a0 100644 --- a/lib/entities/proto/veilidchat.pb.dart +++ b/lib/entities/proto/veilidchat.pb.dart @@ -536,15 +536,15 @@ class DHTData extends $pb.GeneratedMessage { void clearSize() => clearField(4); } -class DHTList extends $pb.GeneratedMessage { - factory DHTList() => create(); - DHTList._() : super(); - factory DHTList.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); - factory DHTList.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); +class DHTShortArray extends $pb.GeneratedMessage { + factory DHTShortArray() => create(); + DHTShortArray._() : super(); + factory DHTShortArray.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory DHTShortArray.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); - static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'DHTList', createEmptyInstance: create) + static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'DHTShortArray', createEmptyInstance: create) ..pc(1, _omitFieldNames ? '' : 'keys', $pb.PbFieldType.PM, subBuilder: TypedKey.create) - ..p<$core.int>(2, _omitFieldNames ? '' : 'index', $pb.PbFieldType.KU3) + ..a<$core.List<$core.int>>(2, _omitFieldNames ? '' : 'index', $pb.PbFieldType.OY) ..hasRequiredFields = false ; @@ -552,28 +552,34 @@ class DHTList extends $pb.GeneratedMessage { 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' 'Will be removed in next major version') - DHTList clone() => DHTList()..mergeFromMessage(this); + DHTShortArray clone() => DHTShortArray()..mergeFromMessage(this); @$core.Deprecated( 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' 'Will be removed in next major version') - DHTList copyWith(void Function(DHTList) updates) => super.copyWith((message) => updates(message as DHTList)) as DHTList; + DHTShortArray copyWith(void Function(DHTShortArray) updates) => super.copyWith((message) => updates(message as DHTShortArray)) as DHTShortArray; $pb.BuilderInfo get info_ => _i; @$core.pragma('dart2js:noInline') - static DHTList create() => DHTList._(); - DHTList createEmptyInstance() => create(); - static $pb.PbList createRepeated() => $pb.PbList(); + static DHTShortArray create() => DHTShortArray._(); + DHTShortArray createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); @$core.pragma('dart2js:noInline') - static DHTList getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); - static DHTList? _defaultInstance; + static DHTShortArray getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static DHTShortArray? _defaultInstance; @$pb.TagNumber(1) $core.List get keys => $_getList(0); @$pb.TagNumber(2) - $core.List<$core.int> get index => $_getList(1); + $core.List<$core.int> get index => $_getN(1); + @$pb.TagNumber(2) + set index($core.List<$core.int> v) { $_setBytes(1, v); } + @$pb.TagNumber(2) + $core.bool hasIndex() => $_has(1); + @$pb.TagNumber(2) + void clearIndex() => clearField(2); } class DHTLog extends $pb.GeneratedMessage { @@ -1111,6 +1117,74 @@ class Profile extends $pb.GeneratedMessage { TypedKey ensureAvatar() => $_ensure(4); } +class OwnedDHTRecordPointer extends $pb.GeneratedMessage { + factory OwnedDHTRecordPointer() => create(); + OwnedDHTRecordPointer._() : super(); + factory OwnedDHTRecordPointer.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory OwnedDHTRecordPointer.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + + static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'OwnedDHTRecordPointer', createEmptyInstance: create) + ..aOM(1, _omitFieldNames ? '' : 'recordKey', subBuilder: TypedKey.create) + ..aOM(2, _omitFieldNames ? '' : 'ownerKey', subBuilder: CryptoKey.create) + ..aOM(3, _omitFieldNames ? '' : 'ownerSecret', subBuilder: CryptoKey.create) + ..hasRequiredFields = false + ; + + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + OwnedDHTRecordPointer clone() => OwnedDHTRecordPointer()..mergeFromMessage(this); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + OwnedDHTRecordPointer copyWith(void Function(OwnedDHTRecordPointer) updates) => super.copyWith((message) => updates(message as OwnedDHTRecordPointer)) as OwnedDHTRecordPointer; + + $pb.BuilderInfo get info_ => _i; + + @$core.pragma('dart2js:noInline') + static OwnedDHTRecordPointer create() => OwnedDHTRecordPointer._(); + OwnedDHTRecordPointer createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static OwnedDHTRecordPointer getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static OwnedDHTRecordPointer? _defaultInstance; + + @$pb.TagNumber(1) + TypedKey get recordKey => $_getN(0); + @$pb.TagNumber(1) + set recordKey(TypedKey v) { setField(1, v); } + @$pb.TagNumber(1) + $core.bool hasRecordKey() => $_has(0); + @$pb.TagNumber(1) + void clearRecordKey() => clearField(1); + @$pb.TagNumber(1) + TypedKey ensureRecordKey() => $_ensure(0); + + @$pb.TagNumber(2) + CryptoKey get ownerKey => $_getN(1); + @$pb.TagNumber(2) + set ownerKey(CryptoKey v) { setField(2, v); } + @$pb.TagNumber(2) + $core.bool hasOwnerKey() => $_has(1); + @$pb.TagNumber(2) + void clearOwnerKey() => clearField(2); + @$pb.TagNumber(2) + CryptoKey ensureOwnerKey() => $_ensure(1); + + @$pb.TagNumber(3) + CryptoKey get ownerSecret => $_getN(2); + @$pb.TagNumber(3) + set ownerSecret(CryptoKey v) { setField(3, v); } + @$pb.TagNumber(3) + $core.bool hasOwnerSecret() => $_has(2); + @$pb.TagNumber(3) + void clearOwnerSecret() => clearField(3); + @$pb.TagNumber(3) + CryptoKey ensureOwnerSecret() => $_ensure(2); +} + class Account extends $pb.GeneratedMessage { factory Account() => create(); Account._() : super(); @@ -1121,8 +1195,8 @@ class Account extends $pb.GeneratedMessage { ..aOM(1, _omitFieldNames ? '' : 'profile', subBuilder: Profile.create) ..aOB(2, _omitFieldNames ? '' : 'invisible') ..a<$core.int>(3, _omitFieldNames ? '' : 'autoAwayTimeoutSec', $pb.PbFieldType.OU3) - ..aOM(4, _omitFieldNames ? '' : 'contactList', subBuilder: TypedKey.create) - ..aOM(5, _omitFieldNames ? '' : 'contactRequests', subBuilder: TypedKey.create) + ..aOM(4, _omitFieldNames ? '' : 'contactList', subBuilder: OwnedDHTRecordPointer.create) + ..aOM(5, _omitFieldNames ? '' : 'contactRequests', subBuilder: OwnedDHTRecordPointer.create) ..hasRequiredFields = false ; @@ -1177,26 +1251,26 @@ class Account extends $pb.GeneratedMessage { void clearAutoAwayTimeoutSec() => clearField(3); @$pb.TagNumber(4) - TypedKey get contactList => $_getN(3); + OwnedDHTRecordPointer get contactList => $_getN(3); @$pb.TagNumber(4) - set contactList(TypedKey v) { setField(4, v); } + set contactList(OwnedDHTRecordPointer v) { setField(4, v); } @$pb.TagNumber(4) $core.bool hasContactList() => $_has(3); @$pb.TagNumber(4) void clearContactList() => clearField(4); @$pb.TagNumber(4) - TypedKey ensureContactList() => $_ensure(3); + OwnedDHTRecordPointer ensureContactList() => $_ensure(3); @$pb.TagNumber(5) - TypedKey get contactRequests => $_getN(4); + OwnedDHTRecordPointer get contactRequests => $_getN(4); @$pb.TagNumber(5) - set contactRequests(TypedKey v) { setField(5, v); } + set contactRequests(OwnedDHTRecordPointer v) { setField(5, v); } @$pb.TagNumber(5) $core.bool hasContactRequests() => $_has(4); @$pb.TagNumber(5) void clearContactRequests() => clearField(5); @$pb.TagNumber(5) - TypedKey ensureContactRequests() => $_ensure(4); + OwnedDHTRecordPointer ensureContactRequests() => $_ensure(4); } class ContactInvitation extends $pb.GeneratedMessage { diff --git a/lib/entities/proto/veilidchat.pbjson.dart b/lib/entities/proto/veilidchat.pbjson.dart index 44de007..4d81fa8 100644 --- a/lib/entities/proto/veilidchat.pbjson.dart +++ b/lib/entities/proto/veilidchat.pbjson.dart @@ -163,19 +163,19 @@ final $typed_data.Uint8List dHTDataDescriptor = $convert.base64Decode( 'kuVHlwZWRLZXlSBGhhc2gSFAoFY2h1bmsYAyABKA1SBWNodW5rEhIKBHNpemUYBCABKA1SBHNp' 'emU='); -@$core.Deprecated('Use dHTListDescriptor instead') -const DHTList$json = { - '1': 'DHTList', +@$core.Deprecated('Use dHTShortArrayDescriptor instead') +const DHTShortArray$json = { + '1': 'DHTShortArray', '2': [ {'1': 'keys', '3': 1, '4': 3, '5': 11, '6': '.TypedKey', '10': 'keys'}, - {'1': 'index', '3': 2, '4': 3, '5': 13, '10': 'index'}, + {'1': 'index', '3': 2, '4': 1, '5': 12, '10': 'index'}, ], }; -/// Descriptor for `DHTList`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List dHTListDescriptor = $convert.base64Decode( - 'CgdESFRMaXN0Eh0KBGtleXMYASADKAsyCS5UeXBlZEtleVIEa2V5cxIUCgVpbmRleBgCIAMoDV' - 'IFaW5kZXg='); +/// Descriptor for `DHTShortArray`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List dHTShortArrayDescriptor = $convert.base64Decode( + 'Cg1ESFRTaG9ydEFycmF5Eh0KBGtleXMYASADKAsyCS5UeXBlZEtleVIEa2V5cxIUCgVpbmRleB' + 'gCIAEoDFIFaW5kZXg='); @$core.Deprecated('Use dHTLogDescriptor instead') const DHTLog$json = { @@ -308,6 +308,22 @@ final $typed_data.Uint8List profileDescriptor = $convert.base64Decode( 'eVIMYXZhaWxhYmlsaXR5EiYKBmF2YXRhchgFIAEoCzIJLlR5cGVkS2V5SABSBmF2YXRhcogBAU' 'IJCgdfYXZhdGFy'); +@$core.Deprecated('Use ownedDHTRecordPointerDescriptor instead') +const OwnedDHTRecordPointer$json = { + '1': 'OwnedDHTRecordPointer', + '2': [ + {'1': 'record_key', '3': 1, '4': 1, '5': 11, '6': '.TypedKey', '10': 'recordKey'}, + {'1': 'owner_key', '3': 2, '4': 1, '5': 11, '6': '.CryptoKey', '10': 'ownerKey'}, + {'1': 'owner_secret', '3': 3, '4': 1, '5': 11, '6': '.CryptoKey', '10': 'ownerSecret'}, + ], +}; + +/// Descriptor for `OwnedDHTRecordPointer`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List ownedDHTRecordPointerDescriptor = $convert.base64Decode( + 'ChVPd25lZERIVFJlY29yZFBvaW50ZXISKAoKcmVjb3JkX2tleRgBIAEoCzIJLlR5cGVkS2V5Ug' + 'lyZWNvcmRLZXkSJwoJb3duZXJfa2V5GAIgASgLMgouQ3J5cHRvS2V5Ughvd25lcktleRItCgxv' + 'd25lcl9zZWNyZXQYAyABKAsyCi5DcnlwdG9LZXlSC293bmVyU2VjcmV0'); + @$core.Deprecated('Use accountDescriptor instead') const Account$json = { '1': 'Account', @@ -315,8 +331,8 @@ const Account$json = { {'1': 'profile', '3': 1, '4': 1, '5': 11, '6': '.Profile', '10': 'profile'}, {'1': 'invisible', '3': 2, '4': 1, '5': 8, '10': 'invisible'}, {'1': 'auto_away_timeout_sec', '3': 3, '4': 1, '5': 13, '10': 'autoAwayTimeoutSec'}, - {'1': 'contact_list', '3': 4, '4': 1, '5': 11, '6': '.TypedKey', '10': 'contactList'}, - {'1': 'contact_requests', '3': 5, '4': 1, '5': 11, '6': '.TypedKey', '10': 'contactRequests'}, + {'1': 'contact_list', '3': 4, '4': 1, '5': 11, '6': '.OwnedDHTRecordPointer', '10': 'contactList'}, + {'1': 'contact_requests', '3': 5, '4': 1, '5': 11, '6': '.OwnedDHTRecordPointer', '10': 'contactRequests'}, ], }; @@ -324,9 +340,9 @@ const Account$json = { final $typed_data.Uint8List accountDescriptor = $convert.base64Decode( 'CgdBY2NvdW50EiIKB3Byb2ZpbGUYASABKAsyCC5Qcm9maWxlUgdwcm9maWxlEhwKCWludmlzaW' 'JsZRgCIAEoCFIJaW52aXNpYmxlEjEKFWF1dG9fYXdheV90aW1lb3V0X3NlYxgDIAEoDVISYXV0' - 'b0F3YXlUaW1lb3V0U2VjEiwKDGNvbnRhY3RfbGlzdBgEIAEoCzIJLlR5cGVkS2V5Ugtjb250YW' - 'N0TGlzdBI0ChBjb250YWN0X3JlcXVlc3RzGAUgASgLMgkuVHlwZWRLZXlSD2NvbnRhY3RSZXF1' - 'ZXN0cw=='); + 'b0F3YXlUaW1lb3V0U2VjEjkKDGNvbnRhY3RfbGlzdBgEIAEoCzIWLk93bmVkREhUUmVjb3JkUG' + '9pbnRlclILY29udGFjdExpc3QSQQoQY29udGFjdF9yZXF1ZXN0cxgFIAEoCzIWLk93bmVkREhU' + 'UmVjb3JkUG9pbnRlclIPY29udGFjdFJlcXVlc3Rz'); @$core.Deprecated('Use contactInvitationDescriptor instead') const ContactInvitation$json = { diff --git a/lib/entities/veilidchat.proto b/lib/entities/veilidchat.proto index 2053e7c..ee5af13 100644 --- a/lib/entities/veilidchat.proto +++ b/lib/entities/veilidchat.proto @@ -71,7 +71,7 @@ message DHTData { uint32 size = 4; } -// DHTList - represents a re-orderable collection of individual elements +// DHTShortArray - represents a re-orderable collection of up to 256 individual elements // Header in subkey 0 of first key follows this structure // // stride = descriptor subkey count on first key - 1 @@ -79,16 +79,17 @@ message DHTData { // Subkeys 0..stride on the 'keys' keys are also individual elements // // Keys must use writable schema in order to make this list mutable -message DHTList { +message DHTShortArray { // Other keys to concatenate // Uses the same writer as this DHTList with SMPL schema repeated TypedKey keys = 1; - // Item position index + + // Item position index (uint8[256]) // Actual item location is: // idx = index[n] + 1 (offset for header at idx 0) // key = idx / stride // subkey = idx % stride - repeated uint32 index = 2; + bytes index = 2; // Free items are not represented in the list but can be // calculated through iteration } @@ -250,7 +251,7 @@ message Account { // The contacts DHTList for this account // DHT Private OwnedDHTRecordPointer contact_list = 4; - // The contact requests DHTList for this account + // The contact requests DHTShortArray for this account // DHT Private OwnedDHTRecordPointer contact_requests = 5; } diff --git a/lib/providers/contact_request_records.dart b/lib/providers/contact_request_records.dart index 3718e52..ee90e99 100644 --- a/lib/providers/contact_request_records.dart +++ b/lib/providers/contact_request_records.dart @@ -9,14 +9,15 @@ import 'package:veilid/veilid.dart'; import '../entities/entities.dart'; import '../entities/proto.dart' as proto; import '../tools/tools.dart'; +import '../veilid_support/dht_short_array.dart'; import '../veilid_support/veilid_support.dart'; import 'logins.dart'; part 'contact_request_records.g.dart'; // Contact invitation records stored in Account -class ContactRequestRecords extends DHTList { - // +class ContactRequestRecords { + DHTShortArray _backingArray; Future newContactRequest( proto.EncryptionKind encryptionKind, diff --git a/lib/veilid_support/dht_list.dart b/lib/veilid_support/dht_list.dart deleted file mode 100644 index 85f90b1..0000000 --- a/lib/veilid_support/dht_list.dart +++ /dev/null @@ -1,43 +0,0 @@ -import 'dart:typed_data'; - -import 'package:protobuf/protobuf.dart'; -import 'package:veilid/veilid.dart'; - -import '../tools/tools.dart'; -import 'veilid_support.dart'; - -class DHTList { - DHTList({required DHTRecord dhtRecord}) : _dhtRecord = dhtRecord; - - final DHTRecord _dhtRecord; - - static Future create(VeilidRoutingContext dhtctx, - {DHTRecordCrypto? crypto}) async { - final dhtRecord = await DHTRecord.create(dhtctx, crypto: crypto); - final dhtList = DHTList(dhtRecord: dhtRecord); - return dhtList; - } - - static Future openRead( - VeilidRoutingContext dhtctx, TypedKey dhtRecordKey, - {DHTRecordCrypto? crypto}) async { - final dhtRecord = - await DHTRecord.openRead(dhtctx, dhtRecordKey, crypto: crypto); - final dhtList = DHTList(dhtRecord: dhtRecord); - return dhtList; - } - - static Future openWrite( - VeilidRoutingContext dhtctx, - TypedKey dhtRecordKey, - KeyPair writer, { - DHTRecordCrypto? crypto, - }) async { - final dhtRecord = - await DHTRecord.openWrite(dhtctx, dhtRecordKey, writer, crypto: crypto); - final dhtList = DHTList(dhtRecord: dhtRecord); - return dhtList; - } - - //////////////////////////////////////////////////////////////// -} diff --git a/lib/veilid_support/dht_record.dart b/lib/veilid_support/dht_record.dart index 67c4259..c493635 100644 --- a/lib/veilid_support/dht_record.dart +++ b/lib/veilid_support/dht_record.dart @@ -1,9 +1,9 @@ +import 'dart:async'; import 'dart:typed_data'; import 'package:protobuf/protobuf.dart'; import 'package:veilid/veilid.dart'; -import '../entities/proto.dart' as proto; import '../tools/tools.dart'; import 'veilid_support.dart'; @@ -17,11 +17,13 @@ class DHTRecord { : _dhtctx = dhtctx, _recordDescriptor = recordDescriptor, _defaultSubkey = defaultSubkey, - _writer = writer; + _writer = writer, + _subkeySeqCache = {}; final VeilidRoutingContext _dhtctx; final DHTRecordDescriptor _recordDescriptor; final int _defaultSubkey; final KeyPair? _writer; + final Map _subkeySeqCache; DHTRecordCrypto crypto; static Future create(VeilidRoutingContext dhtctx, @@ -76,12 +78,11 @@ class DHTRecord { int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey; + VeilidRoutingContext get routingContext => _dhtctx; TypedKey get key => _recordDescriptor.key; - PublicKey get owner => _recordDescriptor.owner; - KeyPair? get ownerKeyPair => _recordDescriptor.ownerKeyPair(); - + DHTSchema get schema => _recordDescriptor.schema; KeyPair? get writer => _writer; Future close() async { @@ -111,19 +112,31 @@ class DHTRecord { } } - Future get({int subkey = -1, bool forceRefresh = false}) async { + Future get( + {int subkey = -1, + bool forceRefresh = false, + bool onlyUpdates = false}) async { subkey = subkeyOrDefault(subkey); final valueData = - await _dhtctx.getDHTValue(_recordDescriptor.key, subkey, false); + await _dhtctx.getDHTValue(_recordDescriptor.key, subkey, forceRefresh); if (valueData == null) { return null; } - return crypto.decrypt(valueData.data, subkey); + final lastSeq = _subkeySeqCache[subkey]; + if (lastSeq != null && valueData.seq <= lastSeq) { + return null; + } + final out = crypto.decrypt(valueData.data, subkey); + _subkeySeqCache[subkey] = valueData.seq; + return out; } Future getJson(T Function(dynamic) fromJson, - {int subkey = -1, bool forceRefresh = false}) async { - final data = await get(subkey: subkey, forceRefresh: forceRefresh); + {int subkey = -1, + bool forceRefresh = false, + bool onlyUpdates = false}) async { + final data = await get( + subkey: subkey, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates); if (data == null) { return null; } @@ -133,18 +146,34 @@ class DHTRecord { Future getProtobuf( T Function(List i) fromBuffer, {int subkey = -1, - bool forceRefresh = false}) async { - final data = await get(subkey: subkey, forceRefresh: forceRefresh); + bool forceRefresh = false, + bool onlyUpdates = false}) async { + final data = await get( + subkey: subkey, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates); if (data == null) { return null; } return fromBuffer(data.toList()); } + Future tryWriteBytes(Uint8List newValue, + {int subkey = -1}) async { + subkey = subkeyOrDefault(subkey); + newValue = await crypto.encrypt(newValue, subkey); + + // Set the new data if possible + final valueData = + await _dhtctx.setDHTValue(_recordDescriptor.key, subkey, newValue); + if (valueData == null) { + return null; + } + return valueData.data; + } + Future eventualWriteBytes(Uint8List newValue, {int subkey = -1}) async { subkey = subkeyOrDefault(subkey); newValue = await crypto.encrypt(newValue, subkey); - // Get existing identity key + ValueData? valueData; do { // Set the new data @@ -159,14 +188,17 @@ class DHTRecord { Future Function(Uint8List oldValue) update, {int subkey = -1}) async { subkey = subkeyOrDefault(subkey); - // Get existing identity key + // Get existing identity key, do not allow force refresh here + // because if we need a refresh the setDHTValue will fail anyway var valueData = await _dhtctx.getDHTValue(_recordDescriptor.key, subkey, false); + // Ensure it exists already + if (valueData == null) { + throw const FormatException('value does not exist'); + } do { - // Ensure it exists already - if (valueData == null) { - throw const FormatException('value does not exist'); - } + // Update cache + _subkeySeqCache[subkey] = valueData!.seq; // Update the data final oldData = await crypto.decrypt(valueData.data, subkey); @@ -181,6 +213,25 @@ class DHTRecord { } while (valueData != null); } + Future tryWriteJson(T Function(dynamic) fromJson, T newValue, + {int subkey = -1}) => + tryWriteBytes(jsonEncodeBytes(newValue), subkey: subkey).then((out) { + if (out == null) { + return null; + } + return jsonDecodeBytes(fromJson, out); + }); + + Future tryWriteProtobuf( + T Function(List) fromBuffer, T newValue, + {int subkey = -1}) => + tryWriteBytes(newValue.writeToBuffer(), subkey: subkey).then((out) { + if (out == null) { + return null; + } + return fromBuffer(out); + }); + Future eventualWriteJson(T newValue, {int subkey = -1}) => eventualWriteBytes(jsonEncodeBytes(newValue), subkey: subkey); diff --git a/lib/veilid_support/dht_short_array.dart b/lib/veilid_support/dht_short_array.dart new file mode 100644 index 0000000..a962a5f --- /dev/null +++ b/lib/veilid_support/dht_short_array.dart @@ -0,0 +1,339 @@ +import 'dart:typed_data'; + +import 'package:protobuf/protobuf.dart'; +import 'package:veilid/veilid.dart'; + +import '../entities/proto.dart' as proto; +import '../tools/tools.dart'; +import 'veilid_support.dart'; + +class _DHTShortArrayCache { + _DHTShortArrayCache() + : linkedRecords = List.empty(growable: true), + index = List.empty(growable: true), + free = List.empty(growable: true); + + final List linkedRecords; + final List index; + final List free; +} + +class DHTShortArray { + DHTShortArray({required DHTRecord dhtRecord}) + : _headRecord = dhtRecord, + _head = _DHTShortArrayCache() { + late final int stride; + switch (dhtRecord.schema) { + case DHTSchemaDFLT(oCnt: final oCnt): + stride = oCnt - 1; + if (stride <= 0) { + throw StateError('Invalid stride in DHTShortArray'); + } + case DHTSchemaSMPL(): + throw StateError('Wrote kind of DHT record for DHTShortArray'); + } + _stride = stride; + } + + // Head DHT record + final DHTRecord _headRecord; + late final int _stride; + + // Cached representation refreshed from head record + _DHTShortArrayCache _head; + + static Future create(VeilidRoutingContext dhtctx, int stride, + {DHTRecordCrypto? crypto}) async { + final dhtRecord = await DHTRecord.create(dhtctx, + schema: DHTSchema.dflt(oCnt: stride + 1), crypto: crypto); + final dhtShortArray = DHTShortArray(dhtRecord: dhtRecord); + return dhtShortArray; + } + + static Future openRead( + VeilidRoutingContext dhtctx, TypedKey dhtRecordKey, + {DHTRecordCrypto? crypto}) async { + final dhtRecord = + await DHTRecord.openRead(dhtctx, dhtRecordKey, crypto: crypto); + final dhtShortArray = DHTShortArray(dhtRecord: dhtRecord); + await dhtShortArray._refreshHead(); + return dhtShortArray; + } + + static Future openWrite( + VeilidRoutingContext dhtctx, + TypedKey dhtRecordKey, + KeyPair writer, { + DHTRecordCrypto? crypto, + }) async { + final dhtRecord = + await DHTRecord.openWrite(dhtctx, dhtRecordKey, writer, crypto: crypto); + final dhtShortArray = DHTShortArray(dhtRecord: dhtRecord); + await dhtShortArray._refreshHead(); + return dhtShortArray; + } + + //////////////////////////////////////////////////////////////// + + /// Write the current head cache out to a protobuf to be serialized + Uint8List _headToBuffer() { + final head = proto.DHTShortArray(); + head.keys.addAll(_head.linkedRecords.map((lr) => lr.key.toProto())); + head.index.addAll(_head.index); + return head.writeToBuffer(); + } + + Future _openLinkedRecord(TypedKey recordKey) async { + final writer = _headRecord.writer; + return (writer != null) + ? await DHTRecord.openWrite( + _headRecord.routingContext, recordKey, writer) + : await DHTRecord.openRead(_headRecord.routingContext, recordKey); + } + + /// Validate the head from the DHT is properly formatted + /// and calculate the free list from it while we're here + List _validateHeadCacheData( + List> linkedKeys, List index) { + // Ensure nothing is duplicated in the linked keys set + final newKeys = linkedKeys.toSet(); + assert(newKeys.length == linkedKeys.length, 'duplicated linked keys'); + final newIndex = index.toSet(); + assert(newIndex.length == newIndex.length, 'duplicated index locations'); + // Ensure all the index keys fit into the existing records + final indexCount = (linkedKeys.length + 1) * _stride; + int? maxIndex; + for (final idx in newIndex) { + assert(idx >= 0 || idx < indexCount, 'index out of range'); + if (maxIndex == null || idx > maxIndex) { + maxIndex = idx; + } + } + final free = []; + if (maxIndex != null) { + for (var i = 0; i < maxIndex; i++) { + if (!newIndex.contains(i)) { + free.add(i); + } + } + } + return free; + } + + Future _refreshHead( + {bool forceRefresh = false, bool onlyUpdates = false}) async { + // Get an updated head record copy if one exists + final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer, + forceRefresh: forceRefresh, onlyUpdates: onlyUpdates); + if (head == null) { + if (onlyUpdates) { + // No update + return false; + } + throw StateError('head missing during initial refresh'); + } + + // Get the set of new linked keys and validate it + final linkedKeys = head.keys.map(proto.TypedKeyProto.fromProto).toList(); + final index = head.index; + final free = _validateHeadCacheData(linkedKeys, index); + + // See which records are actually new + final oldRecords = Map.fromEntries( + _head.linkedRecords.map((lr) => MapEntry(lr.key, lr))); + final newRecords = {}; + final sameRecords = {}; + try { + for (var n = 0; n < linkedKeys.length; n++) { + final newKey = linkedKeys[n]; + final oldRecord = oldRecords[newKey]; + if (oldRecord == null) { + // Open the new record + final newRecord = await _openLinkedRecord(newKey); + newRecords[newKey] = newRecord; + } else { + sameRecords[newKey] = oldRecord; + } + } + } on Exception catch (_) { + // On any exception close the records we have opened + await Future.wait(newRecords.entries.map((e) => e.value.close())); + } + + // From this point forward we should not throw an exception or everything + // is possibly invalid. Just pass the exception up it happens and the caller + // will have to delete this short array and reopen it if it can + await Future.wait(oldRecords.entries + .where((e) => !sameRecords.containsKey(e.key)) + .map((e) => e.value.close())); + + // Figure out which indices are free + + // Make the new head cache + _head = _DHTShortArrayCache() + ..linkedRecords.addAll( + linkedKeys.map((key) => (sameRecords[key] ?? newRecords[key])!)) + ..index.addAll(index) + ..free.addAll(free); + return true; + } + + //////////////////////////////////////////////////////////////// + + Future close() async { + final futures = >[_headRecord.close()]; + for (final lr in _head.linkedRecords) { + futures.add(lr.close()); + } + await Future.wait(futures); + } + + Future delete() async { + final futures = >[_headRecord.close()]; + for (final lr in _head.linkedRecords) { + futures.add(lr.delete()); + } + await Future.wait(futures); + } + + Future scope(Future Function(DHTShortArray) scopeFunction) async { + try { + return await scopeFunction(this); + } finally { + await close(); + } + } + + Future deleteScope( + Future Function(DHTShortArray) scopeFunction) async { + try { + final out = await scopeFunction(this); + await close(); + return out; + } on Exception catch (_) { + await delete(); + rethrow; + } + } + + DHTRecord? _getRecord(int recordNumber) { + if (recordNumber == 0) { + return _headRecord; + } + recordNumber--; + if (recordNumber >= _head.linkedRecords.length) { + return null; + } + return _head.linkedRecords[recordNumber]; + } + + // xxx: add + // xxx: insert + // xxx: swap + // xxx: remove + // xxx: clear + + Future getItem(int index, {bool forceRefresh = false}) async { + await _refreshHead(forceRefresh: forceRefresh, onlyUpdates: true); + + if (index < 0 || index >= _head.index.length) { + throw IndexError.withLength(index, _head.index.length); + } + final recordNumber = index ~/ _stride; + final record = _getRecord(recordNumber); + assert(record != null, 'Record does not exist'); + + final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0); + return record!.get(subkey: recordSubkey, forceRefresh: forceRefresh); + } + + Future tryWriteItem(int index, Uint8List newValue) async { + if (await _refreshHead(onlyUpdates: true)) { + throw StateError('structure changed'); + } + + if (index < 0 || index >= _head.index.length) { + throw IndexError.withLength(index, _head.index.length); + } + final recordNumber = index ~/ _stride; + final record = _getRecord(recordNumber); + assert(record != null, 'Record does not exist'); + + final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0); + return record!.tryWriteBytes(newValue, subkey: recordSubkey); + } + + Future eventualWriteItem(int index, Uint8List newValue) async { + Uint8List? oldData; + do { + // Set it back + oldData = await tryWriteItem(index, newValue); + + // Repeat if newer data on the network was found + } while (oldData != null); + } + + Future eventualUpdateItem( + int index, Future Function(Uint8List oldValue) update) async { + var oldData = await getItem(index); + // Ensure it exists already + if (oldData == null) { + throw const FormatException('value does not exist'); + } + do { + // Update the data + final updatedData = await update(oldData!); + + // Set it back + oldData = await tryWriteItem(index, updatedData); + + // Repeat if newer data on the network was found + } while (oldData != null); + } + + Future tryWriteItemJson( + T Function(dynamic) fromJson, + int index, + T newValue, + ) => + tryWriteItem(index, jsonEncodeBytes(newValue)).then((out) { + if (out == null) { + return null; + } + return jsonDecodeBytes(fromJson, out); + }); + + Future tryWriteItemProtobuf( + T Function(List) fromBuffer, + int index, + T newValue, + ) => + tryWriteItem(index, newValue.writeToBuffer()).then((out) { + if (out == null) { + return null; + } + return fromBuffer(out); + }); + + Future eventualWriteItemJson(int index, T newValue) => + eventualWriteItem(index, jsonEncodeBytes(newValue)); + + Future eventualWriteItemProtobuf( + int index, T newValue, + {int subkey = -1}) => + eventualWriteItem(index, newValue.writeToBuffer()); + + Future eventualUpdateItemJson( + T Function(dynamic) fromJson, + int index, + Future Function(T) update, + ) => + eventualUpdateItem(index, jsonUpdate(fromJson, update)); + + Future eventualUpdateItemProtobuf( + T Function(List) fromBuffer, + int index, + Future Function(T) update, + ) => + eventualUpdateItem(index, protobufUpdate(fromBuffer, update)); +}