mirror of
https://gitlab.com/veilid/veilidchat.git
synced 2025-08-05 20:54:27 -04:00
Merge branch 'dht-perf' into 'main'
Dht perf See merge request veilid/veilidchat!50
This commit is contained in:
commit
0ea8fe7f28
10 changed files with 340 additions and 289 deletions
|
@ -1,3 +1,11 @@
|
||||||
|
## UNRELEASED ##
|
||||||
|
|
||||||
|
- Fix reconciliation `advance()`
|
||||||
|
- Add `pool stats` command
|
||||||
|
- Fixed issue with Android 'back' button exiting the app (#331)
|
||||||
|
- Deprecated accounts no longer crash application at startup
|
||||||
|
- Simplify SingleContactMessagesCubit and MessageReconciliation
|
||||||
|
|
||||||
## v0.4.7 ##
|
## v0.4.7 ##
|
||||||
- *Community Contributions*
|
- *Community Contributions*
|
||||||
- Fix getting stuck on splash screen when veilid is already started @bmv437 / @bgrift
|
- Fix getting stuck on splash screen when veilid is already started @bmv437 / @bgrift
|
||||||
|
|
|
@ -83,16 +83,13 @@ class AuthorInputQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove a reconciled message and move to the next message
|
/// Move the reconciliation cursor (_inputPosition) forward on the input
|
||||||
|
/// queue and tees up the next message for processing
|
||||||
/// Returns true if there is more work to do
|
/// Returns true if there is more work to do
|
||||||
|
/// Returns false if there are no more messages to reconcile in this queue
|
||||||
Future<bool> advance() async {
|
Future<bool> advance() async {
|
||||||
final currentMessage = await getCurrentMessage();
|
|
||||||
if (currentMessage == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Move current message to previous
|
// Move current message to previous
|
||||||
_previousMessage = _currentMessage;
|
_previousMessage = await getCurrentMessage();
|
||||||
_currentMessage = null;
|
_currentMessage = null;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -178,7 +175,7 @@ class AuthorInputQueue {
|
||||||
|
|
||||||
// _inputPosition points to either before the input source starts
|
// _inputPosition points to either before the input source starts
|
||||||
// or the position of the previous element. We still need to set the
|
// or the position of the previous element. We still need to set the
|
||||||
// _currentMessage to the previous element so consume() can compare
|
// _currentMessage to the previous element so advance() can compare
|
||||||
// against it if we can.
|
// against it if we can.
|
||||||
if (_inputPosition >= 0) {
|
if (_inputPosition >= 0) {
|
||||||
_currentMessage = currentWindow
|
_currentMessage = currentWindow
|
||||||
|
|
|
@ -98,6 +98,16 @@ class _DeveloperPageState extends State<DeveloperPage> {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (debugCommand == 'pool stats') {
|
||||||
|
try {
|
||||||
|
DHTRecordPool.instance.debugPrintStats();
|
||||||
|
} on Exception catch (e, st) {
|
||||||
|
_debugOut('<<< ERROR\n$e\n<<< STACK\n$st');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
if (debugCommand.startsWith('change_log_ignore ')) {
|
if (debugCommand.startsWith('change_log_ignore ')) {
|
||||||
final args = debugCommand.split(' ');
|
final args = debugCommand.split(' ');
|
||||||
if (args.length < 3) {
|
if (args.length < 3) {
|
||||||
|
@ -129,9 +139,10 @@ class _DeveloperPageState extends State<DeveloperPage> {
|
||||||
|
|
||||||
if (debugCommand == 'help') {
|
if (debugCommand == 'help') {
|
||||||
out = 'VeilidChat Commands:\n'
|
out = 'VeilidChat Commands:\n'
|
||||||
' pool allocations - List DHTRecordPool allocations\n'
|
' pool <allocations|opened|stats>\n'
|
||||||
' pool opened - List opened DHTRecord instances'
|
' allocations - List DHTRecordPool allocations\n'
|
||||||
' from the pool\n'
|
' opened - List opened DHTRecord instances\n'
|
||||||
|
' stats - Dump DHTRecordPool statistics\n'
|
||||||
' change_log_ignore <layer> <changes> change the log'
|
' change_log_ignore <layer> <changes> change the log'
|
||||||
' target ignore list for a tracing layer\n'
|
' target ignore list for a tracing layer\n'
|
||||||
' targets to add to the ignore list can be separated by'
|
' targets to add to the ignore list can be separated by'
|
||||||
|
|
|
@ -258,6 +258,14 @@ packages:
|
||||||
url: "https://pub.dev"
|
url: "https://pub.dev"
|
||||||
source: hosted
|
source: hosted
|
||||||
version: "4.1.2"
|
version: "4.1.2"
|
||||||
|
indent:
|
||||||
|
dependency: transitive
|
||||||
|
description:
|
||||||
|
name: indent
|
||||||
|
sha256: "819319a5c185f7fe412750c798953378b37a0d0d32564ce33e7c5acfd1372d2a"
|
||||||
|
url: "https://pub.dev"
|
||||||
|
source: hosted
|
||||||
|
version: "2.0.0"
|
||||||
integration_test:
|
integration_test:
|
||||||
dependency: "direct dev"
|
dependency: "direct dev"
|
||||||
description: flutter
|
description: flutter
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
export 'default_dht_record_cubit.dart';
|
export 'default_dht_record_cubit.dart';
|
||||||
export 'dht_record_cubit.dart';
|
export 'dht_record_cubit.dart';
|
||||||
export 'dht_record_pool.dart';
|
export 'dht_record_pool.dart';
|
||||||
|
export 'stats.dart';
|
||||||
|
|
|
@ -119,55 +119,56 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
|
||||||
/// * 'outSeqNum' optionally returns the sequence number of the value being
|
/// * 'outSeqNum' optionally returns the sequence number of the value being
|
||||||
/// returned if one was returned.
|
/// returned if one was returned.
|
||||||
Future<Uint8List?> get(
|
Future<Uint8List?> get(
|
||||||
{int subkey = -1,
|
{int subkey = -1,
|
||||||
VeilidCrypto? crypto,
|
VeilidCrypto? crypto,
|
||||||
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached,
|
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached,
|
||||||
Output<int>? outSeqNum}) async {
|
Output<int>? outSeqNum}) async =>
|
||||||
subkey = subkeyOrDefault(subkey);
|
_wrapStats('get', () async {
|
||||||
|
subkey = subkeyOrDefault(subkey);
|
||||||
|
|
||||||
// Get the last sequence number if we need it
|
// Get the last sequence number if we need it
|
||||||
final lastSeq =
|
final lastSeq =
|
||||||
refreshMode._inspectLocal ? await _localSubkeySeq(subkey) : null;
|
refreshMode._inspectLocal ? await _localSubkeySeq(subkey) : null;
|
||||||
|
|
||||||
// See if we only ever want the locally stored value
|
// See if we only ever want the locally stored value
|
||||||
if (refreshMode == DHTRecordRefreshMode.local && lastSeq == null) {
|
if (refreshMode == DHTRecordRefreshMode.local && lastSeq == null) {
|
||||||
// If it's not available locally already just return null now
|
// If it's not available locally already just return null now
|
||||||
return null;
|
return null;
|
||||||
}
|
|
||||||
|
|
||||||
var retry = kDHTTryAgainTries;
|
|
||||||
ValueData? valueData;
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
valueData = await _routingContext.getDHTValue(key, subkey,
|
|
||||||
forceRefresh: refreshMode._forceRefresh);
|
|
||||||
break;
|
|
||||||
} on VeilidAPIExceptionTryAgain {
|
|
||||||
retry--;
|
|
||||||
if (retry == 0) {
|
|
||||||
throw const DHTExceptionNotAvailable();
|
|
||||||
}
|
}
|
||||||
await asyncSleep();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (valueData == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// See if this get resulted in a newer sequence number
|
var retry = kDHTTryAgainTries;
|
||||||
if (refreshMode == DHTRecordRefreshMode.update &&
|
ValueData? valueData;
|
||||||
lastSeq != null &&
|
while (true) {
|
||||||
valueData.seq <= lastSeq) {
|
try {
|
||||||
// If we're only returning updates then punt now
|
valueData = await _routingContext.getDHTValue(key, subkey,
|
||||||
return null;
|
forceRefresh: refreshMode._forceRefresh);
|
||||||
}
|
break;
|
||||||
// If we're returning a value, decrypt it
|
} on VeilidAPIExceptionTryAgain {
|
||||||
final out = (crypto ?? _crypto).decrypt(valueData.data);
|
retry--;
|
||||||
if (outSeqNum != null) {
|
if (retry == 0) {
|
||||||
outSeqNum.save(valueData.seq);
|
throw const DHTExceptionNotAvailable();
|
||||||
}
|
}
|
||||||
return out;
|
await asyncSleep();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if (valueData == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// See if this get resulted in a newer sequence number
|
||||||
|
if (refreshMode == DHTRecordRefreshMode.update &&
|
||||||
|
lastSeq != null &&
|
||||||
|
valueData.seq <= lastSeq) {
|
||||||
|
// If we're only returning updates then punt now
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
// If we're returning a value, decrypt it
|
||||||
|
final out = (crypto ?? _crypto).decrypt(valueData.data);
|
||||||
|
if (outSeqNum != null) {
|
||||||
|
outSeqNum.save(valueData.seq);
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
});
|
||||||
|
|
||||||
/// Get a subkey value from this record.
|
/// Get a subkey value from this record.
|
||||||
/// Process the record returned with a JSON unmarshal function 'fromJson'.
|
/// Process the record returned with a JSON unmarshal function 'fromJson'.
|
||||||
|
@ -223,97 +224,102 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
|
||||||
/// If a newer value was found on the network, it is returned
|
/// If a newer value was found on the network, it is returned
|
||||||
/// If the value was succesfully written, null is returned
|
/// If the value was succesfully written, null is returned
|
||||||
Future<Uint8List?> tryWriteBytes(Uint8List newValue,
|
Future<Uint8List?> tryWriteBytes(Uint8List newValue,
|
||||||
{int subkey = -1,
|
{int subkey = -1,
|
||||||
VeilidCrypto? crypto,
|
VeilidCrypto? crypto,
|
||||||
KeyPair? writer,
|
KeyPair? writer,
|
||||||
Output<int>? outSeqNum}) async {
|
Output<int>? outSeqNum}) async =>
|
||||||
subkey = subkeyOrDefault(subkey);
|
_wrapStats('tryWriteBytes', () async {
|
||||||
final lastSeq = await _localSubkeySeq(subkey);
|
subkey = subkeyOrDefault(subkey);
|
||||||
final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue);
|
final lastSeq = await _localSubkeySeq(subkey);
|
||||||
|
final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue);
|
||||||
|
|
||||||
// Set the new data if possible
|
// Set the new data if possible
|
||||||
var newValueData = await _routingContext
|
var newValueData = await _routingContext.setDHTValue(
|
||||||
.setDHTValue(key, subkey, encryptedNewValue, writer: writer ?? _writer);
|
key, subkey, encryptedNewValue,
|
||||||
if (newValueData == null) {
|
writer: writer ?? _writer);
|
||||||
// A newer value wasn't found on the set, but
|
if (newValueData == null) {
|
||||||
// we may get a newer value when getting the value for the sequence number
|
// A newer value wasn't found on the set, but
|
||||||
newValueData = await _routingContext.getDHTValue(key, subkey);
|
// we may get a newer value when getting the value for the sequence number
|
||||||
if (newValueData == null) {
|
newValueData = await _routingContext.getDHTValue(key, subkey);
|
||||||
assert(newValueData != null, "can't get value that was just set");
|
if (newValueData == null) {
|
||||||
return null;
|
assert(newValueData != null, "can't get value that was just set");
|
||||||
}
|
return null;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Record new sequence number
|
// Record new sequence number
|
||||||
final isUpdated = newValueData.seq != lastSeq;
|
final isUpdated = newValueData.seq != lastSeq;
|
||||||
if (isUpdated && outSeqNum != null) {
|
if (isUpdated && outSeqNum != null) {
|
||||||
outSeqNum.save(newValueData.seq);
|
outSeqNum.save(newValueData.seq);
|
||||||
}
|
}
|
||||||
|
|
||||||
// See if the encrypted data returned is exactly the same
|
// See if the encrypted data returned is exactly the same
|
||||||
// if so, shortcut and don't bother decrypting it
|
// if so, shortcut and don't bother decrypting it
|
||||||
if (newValueData.data.equals(encryptedNewValue)) {
|
if (newValueData.data.equals(encryptedNewValue)) {
|
||||||
if (isUpdated) {
|
if (isUpdated) {
|
||||||
DHTRecordPool.instance._processLocalValueChange(key, newValue, subkey);
|
DHTRecordPool.instance
|
||||||
}
|
._processLocalValueChange(key, newValue, subkey);
|
||||||
return null;
|
}
|
||||||
}
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
// Decrypt value to return it
|
// Decrypt value to return it
|
||||||
final decryptedNewValue =
|
final decryptedNewValue =
|
||||||
await (crypto ?? _crypto).decrypt(newValueData.data);
|
await (crypto ?? _crypto).decrypt(newValueData.data);
|
||||||
if (isUpdated) {
|
if (isUpdated) {
|
||||||
DHTRecordPool.instance
|
DHTRecordPool.instance
|
||||||
._processLocalValueChange(key, decryptedNewValue, subkey);
|
._processLocalValueChange(key, decryptedNewValue, subkey);
|
||||||
}
|
}
|
||||||
return decryptedNewValue;
|
return decryptedNewValue;
|
||||||
}
|
});
|
||||||
|
|
||||||
/// Attempt to write a byte buffer to a DHTRecord subkey
|
/// Attempt to write a byte buffer to a DHTRecord subkey
|
||||||
/// If a newer value was found on the network, another attempt
|
/// If a newer value was found on the network, another attempt
|
||||||
/// will be made to write the subkey until this succeeds
|
/// will be made to write the subkey until this succeeds
|
||||||
Future<void> eventualWriteBytes(Uint8List newValue,
|
Future<void> eventualWriteBytes(Uint8List newValue,
|
||||||
{int subkey = -1,
|
{int subkey = -1,
|
||||||
VeilidCrypto? crypto,
|
VeilidCrypto? crypto,
|
||||||
KeyPair? writer,
|
KeyPair? writer,
|
||||||
Output<int>? outSeqNum}) async {
|
Output<int>? outSeqNum}) async =>
|
||||||
subkey = subkeyOrDefault(subkey);
|
_wrapStats('eventualWriteBytes', () async {
|
||||||
final lastSeq = await _localSubkeySeq(subkey);
|
subkey = subkeyOrDefault(subkey);
|
||||||
final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue);
|
final lastSeq = await _localSubkeySeq(subkey);
|
||||||
|
final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue);
|
||||||
|
|
||||||
ValueData? newValueData;
|
ValueData? newValueData;
|
||||||
do {
|
do {
|
||||||
do {
|
do {
|
||||||
// Set the new data
|
// Set the new data
|
||||||
newValueData = await _routingContext.setDHTValue(
|
newValueData = await _routingContext.setDHTValue(
|
||||||
key, subkey, encryptedNewValue,
|
key, subkey, encryptedNewValue,
|
||||||
writer: writer ?? _writer);
|
writer: writer ?? _writer);
|
||||||
|
|
||||||
// Repeat if newer data on the network was found
|
// Repeat if newer data on the network was found
|
||||||
} while (newValueData != null);
|
} while (newValueData != null);
|
||||||
|
|
||||||
// Get the data to check its sequence number
|
// Get the data to check its sequence number
|
||||||
newValueData = await _routingContext.getDHTValue(key, subkey);
|
newValueData = await _routingContext.getDHTValue(key, subkey);
|
||||||
if (newValueData == null) {
|
if (newValueData == null) {
|
||||||
assert(newValueData != null, "can't get value that was just set");
|
assert(newValueData != null, "can't get value that was just set");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record new sequence number
|
// Record new sequence number
|
||||||
if (outSeqNum != null) {
|
if (outSeqNum != null) {
|
||||||
outSeqNum.save(newValueData.seq);
|
outSeqNum.save(newValueData.seq);
|
||||||
}
|
}
|
||||||
|
|
||||||
// The encrypted data returned should be exactly the same
|
// The encrypted data returned should be exactly the same
|
||||||
// as what we are trying to set,
|
// as what we are trying to set,
|
||||||
// otherwise we still need to keep trying to set the value
|
// otherwise we still need to keep trying to set the value
|
||||||
} while (!newValueData.data.equals(encryptedNewValue));
|
} while (!newValueData.data.equals(encryptedNewValue));
|
||||||
|
|
||||||
final isUpdated = newValueData.seq != lastSeq;
|
final isUpdated = newValueData.seq != lastSeq;
|
||||||
if (isUpdated) {
|
if (isUpdated) {
|
||||||
DHTRecordPool.instance._processLocalValueChange(key, newValue, subkey);
|
DHTRecordPool.instance
|
||||||
}
|
._processLocalValueChange(key, newValue, subkey);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
/// Attempt to write a byte buffer to a DHTRecord subkey
|
/// Attempt to write a byte buffer to a DHTRecord subkey
|
||||||
/// If a newer value was found on the network, another attempt
|
/// If a newer value was found on the network, another attempt
|
||||||
|
@ -321,32 +327,36 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
|
||||||
/// Each attempt to write the value calls an update function with the
|
/// Each attempt to write the value calls an update function with the
|
||||||
/// old value to determine what new value should be attempted for that write.
|
/// old value to determine what new value should be attempted for that write.
|
||||||
Future<void> eventualUpdateBytes(
|
Future<void> eventualUpdateBytes(
|
||||||
Future<Uint8List?> Function(Uint8List? oldValue) update,
|
Future<Uint8List?> Function(Uint8List? oldValue) update,
|
||||||
{int subkey = -1,
|
{int subkey = -1,
|
||||||
VeilidCrypto? crypto,
|
VeilidCrypto? crypto,
|
||||||
KeyPair? writer,
|
KeyPair? writer,
|
||||||
Output<int>? outSeqNum}) async {
|
Output<int>? outSeqNum}) async =>
|
||||||
subkey = subkeyOrDefault(subkey);
|
_wrapStats('eventualUpdateBytes', () async {
|
||||||
|
subkey = subkeyOrDefault(subkey);
|
||||||
|
|
||||||
// Get the existing data, do not allow force refresh here
|
// Get the existing data, do not allow force refresh here
|
||||||
// because if we need a refresh the setDHTValue will fail anyway
|
// because if we need a refresh the setDHTValue will fail anyway
|
||||||
var oldValue =
|
var oldValue =
|
||||||
await get(subkey: subkey, crypto: crypto, outSeqNum: outSeqNum);
|
await get(subkey: subkey, crypto: crypto, outSeqNum: outSeqNum);
|
||||||
|
|
||||||
do {
|
do {
|
||||||
// Update the data
|
// Update the data
|
||||||
final updatedValue = await update(oldValue);
|
final updatedValue = await update(oldValue);
|
||||||
if (updatedValue == null) {
|
if (updatedValue == null) {
|
||||||
// If null is returned from the update, stop trying to do the update
|
// If null is returned from the update, stop trying to do the update
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// Try to write it back to the network
|
// Try to write it back to the network
|
||||||
oldValue = await tryWriteBytes(updatedValue,
|
oldValue = await tryWriteBytes(updatedValue,
|
||||||
subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum);
|
subkey: subkey,
|
||||||
|
crypto: crypto,
|
||||||
|
writer: writer,
|
||||||
|
outSeqNum: outSeqNum);
|
||||||
|
|
||||||
// Repeat update if newer data on the network was found
|
// Repeat update if newer data on the network was found
|
||||||
} while (oldValue != null);
|
} while (oldValue != null);
|
||||||
}
|
});
|
||||||
|
|
||||||
/// Like 'tryWriteBytes' but with JSON marshal/unmarshal of the value
|
/// Like 'tryWriteBytes' but with JSON marshal/unmarshal of the value
|
||||||
Future<T?> tryWriteJson<T>(T Function(dynamic) fromJson, T newValue,
|
Future<T?> tryWriteJson<T>(T Function(dynamic) fromJson, T newValue,
|
||||||
|
@ -555,6 +565,9 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
|
||||||
local: false, data: update.value?.data, subkeys: update.subkeys);
|
local: false, data: update.value?.data, subkeys: update.subkeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<T> _wrapStats<T>(String func, Future<T> Function() closure) =>
|
||||||
|
DHTRecordPool.instance._stats.measure(key, debugName, func, closure);
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
final _SharedDHTRecordData _sharedDHTRecordData;
|
final _SharedDHTRecordData _sharedDHTRecordData;
|
||||||
|
|
|
@ -273,24 +273,6 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// else {
|
|
||||||
|
|
||||||
// XXX: should no longer be necessary
|
|
||||||
// // Remove watch state
|
|
||||||
//
|
|
||||||
// for (final entry in _opened.entries) {
|
|
||||||
// final openedKey = entry.key;
|
|
||||||
// final openedRecordInfo = entry.value;
|
|
||||||
|
|
||||||
// if (openedKey == updateValueChange.key) {
|
|
||||||
// for (final rec in openedRecordInfo.records) {
|
|
||||||
// rec._watchState = null;
|
|
||||||
// }
|
|
||||||
// openedRecordInfo.shared.needsWatchStateUpdate = true;
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Log the current record allocations
|
/// Log the current record allocations
|
||||||
|
@ -320,6 +302,11 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Log the performance stats
|
||||||
|
void debugPrintStats() {
|
||||||
|
log('DHTRecordPool Stats:\n${_stats.debugString()}');
|
||||||
|
}
|
||||||
|
|
||||||
/// Public interface to DHTRecordPool logger
|
/// Public interface to DHTRecordPool logger
|
||||||
void log(String message) {
|
void log(String message) {
|
||||||
_logger?.call(message);
|
_logger?.call(message);
|
||||||
|
@ -369,109 +356,110 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<DHTRecord> _recordOpenCommon(
|
Future<DHTRecord> _recordOpenCommon(
|
||||||
{required String debugName,
|
{required String debugName,
|
||||||
required VeilidRoutingContext dhtctx,
|
required VeilidRoutingContext dhtctx,
|
||||||
required TypedKey recordKey,
|
required TypedKey recordKey,
|
||||||
required VeilidCrypto crypto,
|
required VeilidCrypto crypto,
|
||||||
required KeyPair? writer,
|
required KeyPair? writer,
|
||||||
required TypedKey? parent,
|
required TypedKey? parent,
|
||||||
required int defaultSubkey}) async {
|
required int defaultSubkey}) async =>
|
||||||
log('openDHTRecord: debugName=$debugName key=$recordKey');
|
_stats.measure(recordKey, debugName, '_recordOpenCommon', () async {
|
||||||
|
log('openDHTRecord: debugName=$debugName key=$recordKey');
|
||||||
|
|
||||||
// See if this has been opened yet
|
// See if this has been opened yet
|
||||||
final openedRecordInfo = await _mutex.protect(() async {
|
final openedRecordInfo = await _mutex.protect(() async {
|
||||||
// If we are opening a key that already exists
|
// If we are opening a key that already exists
|
||||||
// make sure we are using the same parent if one was specified
|
// make sure we are using the same parent if one was specified
|
||||||
_validateParentInner(parent, recordKey);
|
_validateParentInner(parent, recordKey);
|
||||||
|
|
||||||
return _opened[recordKey];
|
return _opened[recordKey];
|
||||||
});
|
});
|
||||||
|
|
||||||
if (openedRecordInfo == null) {
|
if (openedRecordInfo == null) {
|
||||||
// Fresh open, just open the record
|
// Fresh open, just open the record
|
||||||
var retry = kDHTKeyNotFoundTries;
|
var retry = kDHTKeyNotFoundTries;
|
||||||
late final DHTRecordDescriptor recordDescriptor;
|
late final DHTRecordDescriptor recordDescriptor;
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
recordDescriptor =
|
recordDescriptor =
|
||||||
await dhtctx.openDHTRecord(recordKey, writer: writer);
|
await dhtctx.openDHTRecord(recordKey, writer: writer);
|
||||||
break;
|
break;
|
||||||
} on VeilidAPIExceptionTryAgain {
|
} on VeilidAPIExceptionTryAgain {
|
||||||
throw const DHTExceptionNotAvailable();
|
throw const DHTExceptionNotAvailable();
|
||||||
} on VeilidAPIExceptionKeyNotFound {
|
} on VeilidAPIExceptionKeyNotFound {
|
||||||
await asyncSleep();
|
await asyncSleep();
|
||||||
retry--;
|
retry--;
|
||||||
if (retry == 0) {
|
if (retry == 0) {
|
||||||
throw const DHTExceptionNotAvailable();
|
throw const DHTExceptionNotAvailable();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final newOpenedRecordInfo = _OpenedRecordInfo(
|
||||||
|
recordDescriptor: recordDescriptor,
|
||||||
|
defaultWriter: writer,
|
||||||
|
defaultRoutingContext: dhtctx);
|
||||||
|
|
||||||
|
final rec = DHTRecord._(
|
||||||
|
debugName: debugName,
|
||||||
|
routingContext: dhtctx,
|
||||||
|
defaultSubkey: defaultSubkey,
|
||||||
|
sharedDHTRecordData: newOpenedRecordInfo.shared,
|
||||||
|
writer: writer,
|
||||||
|
crypto: crypto);
|
||||||
|
|
||||||
|
await _mutex.protect(() async {
|
||||||
|
// Register the opened record
|
||||||
|
_opened[recordDescriptor.key] = newOpenedRecordInfo;
|
||||||
|
|
||||||
|
// Register the dependency
|
||||||
|
await _addDependencyInner(
|
||||||
|
parent,
|
||||||
|
recordKey,
|
||||||
|
debugName: debugName,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Register the newly opened record
|
||||||
|
newOpenedRecordInfo.records.add(rec);
|
||||||
|
});
|
||||||
|
|
||||||
|
return rec;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
final newOpenedRecordInfo = _OpenedRecordInfo(
|
// Already opened
|
||||||
recordDescriptor: recordDescriptor,
|
|
||||||
defaultWriter: writer,
|
|
||||||
defaultRoutingContext: dhtctx);
|
|
||||||
|
|
||||||
final rec = DHTRecord._(
|
// See if we need to reopen the record with a default writer and possibly
|
||||||
debugName: debugName,
|
// a different routing context
|
||||||
routingContext: dhtctx,
|
if (writer != null && openedRecordInfo.shared.defaultWriter == null) {
|
||||||
defaultSubkey: defaultSubkey,
|
await dhtctx.openDHTRecord(recordKey, writer: writer);
|
||||||
sharedDHTRecordData: newOpenedRecordInfo.shared,
|
// New writer if we didn't specify one before
|
||||||
writer: writer,
|
openedRecordInfo.shared.defaultWriter = writer;
|
||||||
crypto: crypto);
|
// New default routing context if we opened it again
|
||||||
|
openedRecordInfo.shared.defaultRoutingContext = dhtctx;
|
||||||
|
}
|
||||||
|
|
||||||
await _mutex.protect(() async {
|
final rec = DHTRecord._(
|
||||||
// Register the opened record
|
debugName: debugName,
|
||||||
_opened[recordDescriptor.key] = newOpenedRecordInfo;
|
routingContext: dhtctx,
|
||||||
|
defaultSubkey: defaultSubkey,
|
||||||
|
sharedDHTRecordData: openedRecordInfo.shared,
|
||||||
|
writer: writer,
|
||||||
|
crypto: crypto);
|
||||||
|
|
||||||
// Register the dependency
|
await _mutex.protect(() async {
|
||||||
await _addDependencyInner(
|
// Register the dependency
|
||||||
parent,
|
await _addDependencyInner(
|
||||||
recordKey,
|
parent,
|
||||||
debugName: debugName,
|
recordKey,
|
||||||
);
|
debugName: debugName,
|
||||||
|
);
|
||||||
|
|
||||||
// Register the newly opened record
|
openedRecordInfo.records.add(rec);
|
||||||
newOpenedRecordInfo.records.add(rec);
|
});
|
||||||
|
|
||||||
|
return rec;
|
||||||
});
|
});
|
||||||
|
|
||||||
return rec;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Already opened
|
|
||||||
|
|
||||||
// See if we need to reopen the record with a default writer and possibly
|
|
||||||
// a different routing context
|
|
||||||
if (writer != null && openedRecordInfo.shared.defaultWriter == null) {
|
|
||||||
await dhtctx.openDHTRecord(recordKey, writer: writer);
|
|
||||||
// New writer if we didn't specify one before
|
|
||||||
openedRecordInfo.shared.defaultWriter = writer;
|
|
||||||
// New default routing context if we opened it again
|
|
||||||
openedRecordInfo.shared.defaultRoutingContext = dhtctx;
|
|
||||||
}
|
|
||||||
|
|
||||||
final rec = DHTRecord._(
|
|
||||||
debugName: debugName,
|
|
||||||
routingContext: dhtctx,
|
|
||||||
defaultSubkey: defaultSubkey,
|
|
||||||
sharedDHTRecordData: openedRecordInfo.shared,
|
|
||||||
writer: writer,
|
|
||||||
crypto: crypto);
|
|
||||||
|
|
||||||
await _mutex.protect(() async {
|
|
||||||
// Register the dependency
|
|
||||||
await _addDependencyInner(
|
|
||||||
parent,
|
|
||||||
recordKey,
|
|
||||||
debugName: debugName,
|
|
||||||
);
|
|
||||||
|
|
||||||
openedRecordInfo.records.add(rec);
|
|
||||||
});
|
|
||||||
|
|
||||||
return rec;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Called when a DHTRecord is closed
|
// Called when a DHTRecord is closed
|
||||||
// Cleans up the opened record housekeeping and processes any late deletions
|
// Cleans up the opened record housekeeping and processes any late deletions
|
||||||
Future<void> _recordClosed(DHTRecord record) async {
|
Future<void> _recordClosed(DHTRecord record) async {
|
||||||
|
@ -866,34 +854,37 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
|
||||||
void _pollWatch(TypedKey openedRecordKey, _OpenedRecordInfo openedRecordInfo,
|
void _pollWatch(TypedKey openedRecordKey, _OpenedRecordInfo openedRecordInfo,
|
||||||
_WatchState unionWatchState) {
|
_WatchState unionWatchState) {
|
||||||
singleFuture((this, _sfPollWatch, openedRecordKey), () async {
|
singleFuture((this, _sfPollWatch, openedRecordKey), () async {
|
||||||
final dhtctx = openedRecordInfo.shared.defaultRoutingContext;
|
await _stats.measure(
|
||||||
|
openedRecordKey, openedRecordInfo.debugNames, '_pollWatch', () async {
|
||||||
|
final dhtctx = openedRecordInfo.shared.defaultRoutingContext;
|
||||||
|
|
||||||
final currentReport = await dhtctx.inspectDHTRecord(openedRecordKey,
|
final currentReport = await dhtctx.inspectDHTRecord(openedRecordKey,
|
||||||
subkeys: unionWatchState.subkeys, scope: DHTReportScope.syncGet);
|
subkeys: unionWatchState.subkeys, scope: DHTReportScope.syncGet);
|
||||||
|
|
||||||
final fsc = currentReport.firstSeqChange;
|
final fsc = currentReport.firstSeqChange;
|
||||||
if (fsc == null) {
|
if (fsc == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
final newerSubkeys = currentReport.newerOnlineSubkeys;
|
final newerSubkeys = currentReport.newerOnlineSubkeys;
|
||||||
|
|
||||||
final valueData = await dhtctx.getDHTValue(openedRecordKey, fsc.subkey,
|
final valueData = await dhtctx.getDHTValue(openedRecordKey, fsc.subkey,
|
||||||
forceRefresh: true);
|
forceRefresh: true);
|
||||||
if (valueData == null) {
|
if (valueData == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (valueData.seq < fsc.newSeq) {
|
if (valueData.seq < fsc.newSeq) {
|
||||||
log('inspect returned a newer seq than get: ${valueData.seq} < $fsc');
|
log('inspect returned a newer seq than get: ${valueData.seq} < $fsc');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fsc.oldSeq == null || valueData.seq > fsc.oldSeq!) {
|
if (fsc.oldSeq == null || valueData.seq > fsc.oldSeq!) {
|
||||||
processRemoteValueChange(VeilidUpdateValueChange(
|
processRemoteValueChange(VeilidUpdateValueChange(
|
||||||
key: openedRecordKey,
|
key: openedRecordKey,
|
||||||
subkeys: newerSubkeys,
|
subkeys: newerSubkeys,
|
||||||
count: 0xFFFFFFFF,
|
count: 0xFFFFFFFF,
|
||||||
value: valueData));
|
value: valueData));
|
||||||
}
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -915,8 +906,11 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
|
||||||
_watchStateProcessors.updateState(
|
_watchStateProcessors.updateState(
|
||||||
openedRecordKey,
|
openedRecordKey,
|
||||||
unionWatchState,
|
unionWatchState,
|
||||||
(newState) =>
|
(newState) => _stats.measure(
|
||||||
_watchStateChange(openedRecordKey, unionWatchState));
|
openedRecordKey,
|
||||||
|
openedRecordInfo.debugNames,
|
||||||
|
'_watchStateChange',
|
||||||
|
() => _watchStateChange(openedRecordKey, unionWatchState)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -958,6 +952,8 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
|
||||||
// Watch state processors
|
// Watch state processors
|
||||||
final _watchStateProcessors =
|
final _watchStateProcessors =
|
||||||
SingleStateProcessorMap<TypedKey, _WatchState?>();
|
SingleStateProcessorMap<TypedKey, _WatchState?>();
|
||||||
|
// Statistics
|
||||||
|
final _stats = DHTStats();
|
||||||
|
|
||||||
static DHTRecordPool? _singleton;
|
static DHTRecordPool? _singleton;
|
||||||
}
|
}
|
||||||
|
|
|
@ -331,6 +331,14 @@ packages:
|
||||||
url: "https://pub.dev"
|
url: "https://pub.dev"
|
||||||
source: hosted
|
source: hosted
|
||||||
version: "4.1.2"
|
version: "4.1.2"
|
||||||
|
indent:
|
||||||
|
dependency: "direct main"
|
||||||
|
description:
|
||||||
|
name: indent
|
||||||
|
sha256: "819319a5c185f7fe412750c798953378b37a0d0d32564ce33e7c5acfd1372d2a"
|
||||||
|
url: "https://pub.dev"
|
||||||
|
source: hosted
|
||||||
|
version: "2.0.0"
|
||||||
io:
|
io:
|
||||||
dependency: transitive
|
dependency: transitive
|
||||||
description:
|
description:
|
||||||
|
|
|
@ -16,6 +16,7 @@ dependencies:
|
||||||
equatable: ^2.0.7
|
equatable: ^2.0.7
|
||||||
fast_immutable_collections: ^11.0.3
|
fast_immutable_collections: ^11.0.3
|
||||||
freezed_annotation: ^3.0.0
|
freezed_annotation: ^3.0.0
|
||||||
|
indent: ^2.0.0
|
||||||
json_annotation: ^4.9.0
|
json_annotation: ^4.9.0
|
||||||
loggy: ^2.0.3
|
loggy: ^2.0.3
|
||||||
meta: ^1.16.0
|
meta: ^1.16.0
|
||||||
|
|
|
@ -809,6 +809,14 @@ packages:
|
||||||
url: "https://pub.dev"
|
url: "https://pub.dev"
|
||||||
source: hosted
|
source: hosted
|
||||||
version: "4.5.3"
|
version: "4.5.3"
|
||||||
|
indent:
|
||||||
|
dependency: transitive
|
||||||
|
description:
|
||||||
|
name: indent
|
||||||
|
sha256: "819319a5c185f7fe412750c798953378b37a0d0d32564ce33e7c5acfd1372d2a"
|
||||||
|
url: "https://pub.dev"
|
||||||
|
source: hosted
|
||||||
|
version: "2.0.0"
|
||||||
intl:
|
intl:
|
||||||
dependency: "direct main"
|
dependency: "direct main"
|
||||||
description:
|
description:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue