mirror of
https://gitlab.com/veilid/veilidchat.git
synced 2025-06-05 05:12:23 -04:00
beta warning dialog
This commit is contained in:
parent
ba191d3903
commit
6080c2f0c6
26 changed files with 445 additions and 339 deletions
|
@ -37,7 +37,7 @@ typedef DHTLogState<T> = AsyncValue<DHTLogStateData<T>>;
|
|||
typedef DHTLogBusyState<T> = BlocBusyState<DHTLogState<T>>;
|
||||
|
||||
class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
|
||||
with BlocBusyWrapper<DHTLogState<T>> {
|
||||
with BlocBusyWrapper<DHTLogState<T>>, RefreshableCubit {
|
||||
DHTLogCubit({
|
||||
required Future<DHTLog> Function() open,
|
||||
required T Function(List<int> data) decodeElement,
|
||||
|
@ -52,7 +52,7 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
|
|||
_log = await open();
|
||||
_wantsCloseRecord = true;
|
||||
break;
|
||||
} on VeilidAPIExceptionTryAgain {
|
||||
} on DHTExceptionNotAvailable {
|
||||
// Wait for a bit
|
||||
await asyncSleep();
|
||||
}
|
||||
|
@ -91,6 +91,7 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
|
|||
await _refreshNoWait(forceRefresh: forceRefresh);
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> refresh({bool forceRefresh = false}) async {
|
||||
await _initWait();
|
||||
await _refreshNoWait(forceRefresh: forceRefresh);
|
||||
|
@ -101,68 +102,51 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
|
|||
|
||||
Future<void> _refreshInner(void Function(AsyncValue<DHTLogStateData<T>>) emit,
|
||||
{bool forceRefresh = false}) async {
|
||||
late final AsyncValue<IList<OnlineElementState<T>>> avElements;
|
||||
late final int length;
|
||||
await _log.operate((reader) async {
|
||||
final window = await _log.operate((reader) async {
|
||||
length = reader.length;
|
||||
avElements =
|
||||
await loadElementsFromReader(reader, _windowTail, _windowSize);
|
||||
return loadElementsFromReader(reader, _windowTail, _windowSize);
|
||||
});
|
||||
final err = avElements.asError;
|
||||
if (err != null) {
|
||||
emit(AsyncValue.error(err.error, err.stackTrace));
|
||||
if (window == null) {
|
||||
setWantsRefresh();
|
||||
return;
|
||||
}
|
||||
final loading = avElements.asLoading;
|
||||
if (loading != null) {
|
||||
emit(const AsyncValue.loading());
|
||||
return;
|
||||
}
|
||||
final window = avElements.asData!.value;
|
||||
emit(AsyncValue.data(DHTLogStateData(
|
||||
length: length,
|
||||
window: window,
|
||||
windowTail: _windowTail,
|
||||
windowSize: _windowSize,
|
||||
follow: _follow)));
|
||||
setRefreshed();
|
||||
}
|
||||
|
||||
// Tail is one past the last element to load
|
||||
Future<AsyncValue<IList<OnlineElementState<T>>>> loadElementsFromReader(
|
||||
Future<IList<OnlineElementState<T>>?> loadElementsFromReader(
|
||||
DHTLogReadOperations reader, int tail, int count,
|
||||
{bool forceRefresh = false}) async {
|
||||
try {
|
||||
final length = reader.length;
|
||||
if (length == 0) {
|
||||
return const AsyncValue.data(IList.empty());
|
||||
}
|
||||
final end = ((tail - 1) % length) + 1;
|
||||
final start = (count < end) ? end - count : 0;
|
||||
|
||||
// If this is writeable get the offline positions
|
||||
Set<int>? offlinePositions;
|
||||
if (_log.writer != null) {
|
||||
offlinePositions = await reader.getOfflinePositions();
|
||||
if (offlinePositions == null) {
|
||||
return const AsyncValue.loading();
|
||||
}
|
||||
}
|
||||
|
||||
// Get the items
|
||||
final allItems = (await reader.getRange(start,
|
||||
length: end - start, forceRefresh: forceRefresh))
|
||||
?.indexed
|
||||
.map((x) => OnlineElementState(
|
||||
value: _decodeElement(x.$2),
|
||||
isOffline: offlinePositions?.contains(x.$1) ?? false))
|
||||
.toIList();
|
||||
if (allItems == null) {
|
||||
return const AsyncValue.loading();
|
||||
}
|
||||
return AsyncValue.data(allItems);
|
||||
} on Exception catch (e, st) {
|
||||
return AsyncValue.error(e, st);
|
||||
final length = reader.length;
|
||||
if (length == 0) {
|
||||
return const IList.empty();
|
||||
}
|
||||
final end = ((tail - 1) % length) + 1;
|
||||
final start = (count < end) ? end - count : 0;
|
||||
|
||||
// If this is writeable get the offline positions
|
||||
Set<int>? offlinePositions;
|
||||
if (_log.writer != null) {
|
||||
offlinePositions = await reader.getOfflinePositions();
|
||||
}
|
||||
|
||||
// Get the items
|
||||
final allItems = (await reader.getRange(start,
|
||||
length: end - start, forceRefresh: forceRefresh))
|
||||
?.indexed
|
||||
.map((x) => OnlineElementState(
|
||||
value: _decodeElement(x.$2),
|
||||
isOffline: offlinePositions?.contains(x.$1) ?? false))
|
||||
.toIList();
|
||||
|
||||
return allItems;
|
||||
}
|
||||
|
||||
void _update(DHTLogUpdate upd) {
|
||||
|
|
|
@ -47,11 +47,13 @@ class _DHTLogRead implements DHTLogReadOperations {
|
|||
|
||||
final chunks = Iterable<int>.generate(length)
|
||||
.slices(kMaxDHTConcurrency)
|
||||
.map((chunk) =>
|
||||
chunk.map((pos) => get(pos + start, forceRefresh: forceRefresh)));
|
||||
.map((chunk) => chunk
|
||||
.map((pos) async => get(pos + start, forceRefresh: forceRefresh)));
|
||||
|
||||
for (final chunk in chunks) {
|
||||
final elems = await chunk.wait;
|
||||
|
||||
// If any element was unavailable, return null
|
||||
if (elems.contains(null)) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -296,7 +296,7 @@ class _DHTLogSpine {
|
|||
segmentKeyBytes);
|
||||
}
|
||||
|
||||
Future<DHTShortArray> _openOrCreateSegment(int segmentNumber) async {
|
||||
Future<DHTShortArray?> _openOrCreateSegment(int segmentNumber) async {
|
||||
assert(_spineMutex.isLocked, 'should be in mutex here');
|
||||
assert(_spineRecord.writer != null, 'should be writable');
|
||||
|
||||
|
@ -306,51 +306,56 @@ class _DHTLogSpine {
|
|||
final subkey = l.subkey;
|
||||
final segment = l.segment;
|
||||
|
||||
var subkeyData = await _spineRecord.get(subkey: subkey);
|
||||
subkeyData ??= _makeEmptySubkey();
|
||||
while (true) {
|
||||
final segmentKey = _getSegmentKey(subkeyData!, segment);
|
||||
if (segmentKey == null) {
|
||||
// Create a shortarray segment
|
||||
final segmentRec = await DHTShortArray.create(
|
||||
debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment',
|
||||
stride: _segmentStride,
|
||||
crypto: _spineRecord.crypto,
|
||||
parent: _spineRecord.key,
|
||||
routingContext: _spineRecord.routingContext,
|
||||
writer: _spineRecord.writer,
|
||||
);
|
||||
var success = false;
|
||||
try {
|
||||
// Write it back to the spine record
|
||||
_setSegmentKey(subkeyData, segment, segmentRec.recordKey);
|
||||
subkeyData =
|
||||
await _spineRecord.tryWriteBytes(subkeyData, subkey: subkey);
|
||||
// If the write was successful then we're done
|
||||
if (subkeyData == null) {
|
||||
// Return it
|
||||
success = true;
|
||||
return segmentRec;
|
||||
}
|
||||
} finally {
|
||||
if (!success) {
|
||||
await segmentRec.close();
|
||||
await segmentRec.delete();
|
||||
try {
|
||||
var subkeyData = await _spineRecord.get(subkey: subkey);
|
||||
subkeyData ??= _makeEmptySubkey();
|
||||
|
||||
while (true) {
|
||||
final segmentKey = _getSegmentKey(subkeyData!, segment);
|
||||
if (segmentKey == null) {
|
||||
// Create a shortarray segment
|
||||
final segmentRec = await DHTShortArray.create(
|
||||
debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment',
|
||||
stride: _segmentStride,
|
||||
crypto: _spineRecord.crypto,
|
||||
parent: _spineRecord.key,
|
||||
routingContext: _spineRecord.routingContext,
|
||||
writer: _spineRecord.writer,
|
||||
);
|
||||
var success = false;
|
||||
try {
|
||||
// Write it back to the spine record
|
||||
_setSegmentKey(subkeyData, segment, segmentRec.recordKey);
|
||||
subkeyData =
|
||||
await _spineRecord.tryWriteBytes(subkeyData, subkey: subkey);
|
||||
// If the write was successful then we're done
|
||||
if (subkeyData == null) {
|
||||
// Return it
|
||||
success = true;
|
||||
return segmentRec;
|
||||
}
|
||||
} finally {
|
||||
if (!success) {
|
||||
await segmentRec.close();
|
||||
await segmentRec.delete();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Open a shortarray segment
|
||||
final segmentRec = await DHTShortArray.openWrite(
|
||||
segmentKey,
|
||||
_spineRecord.writer!,
|
||||
debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment',
|
||||
crypto: _spineRecord.crypto,
|
||||
parent: _spineRecord.key,
|
||||
routingContext: _spineRecord.routingContext,
|
||||
);
|
||||
return segmentRec;
|
||||
}
|
||||
} else {
|
||||
// Open a shortarray segment
|
||||
final segmentRec = await DHTShortArray.openWrite(
|
||||
segmentKey,
|
||||
_spineRecord.writer!,
|
||||
debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment',
|
||||
crypto: _spineRecord.crypto,
|
||||
parent: _spineRecord.key,
|
||||
routingContext: _spineRecord.routingContext,
|
||||
);
|
||||
return segmentRec;
|
||||
// Loop if we need to try again with the new data from the network
|
||||
}
|
||||
// Loop if we need to try again with the new data from the network
|
||||
} on DHTExceptionNotAvailable {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -364,34 +369,38 @@ class _DHTLogSpine {
|
|||
final segment = l.segment;
|
||||
|
||||
// See if we have the segment key locally
|
||||
TypedKey? segmentKey;
|
||||
var subkeyData = await _spineRecord.get(
|
||||
subkey: subkey, refreshMode: DHTRecordRefreshMode.local);
|
||||
if (subkeyData != null) {
|
||||
segmentKey = _getSegmentKey(subkeyData, segment);
|
||||
}
|
||||
if (segmentKey == null) {
|
||||
// If not, try from the network
|
||||
subkeyData = await _spineRecord.get(
|
||||
subkey: subkey, refreshMode: DHTRecordRefreshMode.network);
|
||||
if (subkeyData == null) {
|
||||
return null;
|
||||
try {
|
||||
TypedKey? segmentKey;
|
||||
var subkeyData = await _spineRecord.get(
|
||||
subkey: subkey, refreshMode: DHTRecordRefreshMode.local);
|
||||
if (subkeyData != null) {
|
||||
segmentKey = _getSegmentKey(subkeyData, segment);
|
||||
}
|
||||
segmentKey = _getSegmentKey(subkeyData, segment);
|
||||
if (segmentKey == null) {
|
||||
return null;
|
||||
// If not, try from the network
|
||||
subkeyData = await _spineRecord.get(
|
||||
subkey: subkey, refreshMode: DHTRecordRefreshMode.network);
|
||||
if (subkeyData == null) {
|
||||
return null;
|
||||
}
|
||||
segmentKey = _getSegmentKey(subkeyData, segment);
|
||||
if (segmentKey == null) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Open a shortarray segment
|
||||
final segmentRec = await DHTShortArray.openRead(
|
||||
segmentKey,
|
||||
debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment',
|
||||
crypto: _spineRecord.crypto,
|
||||
parent: _spineRecord.key,
|
||||
routingContext: _spineRecord.routingContext,
|
||||
);
|
||||
return segmentRec;
|
||||
// Open a shortarray segment
|
||||
final segmentRec = await DHTShortArray.openRead(
|
||||
segmentKey,
|
||||
debugName: '${_spineRecord.debugName}_spine_${subkey}_$segment',
|
||||
crypto: _spineRecord.crypto,
|
||||
parent: _spineRecord.key,
|
||||
routingContext: _spineRecord.routingContext,
|
||||
);
|
||||
return segmentRec;
|
||||
} on DHTExceptionNotAvailable {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
_DHTLogSegmentLookup _lookupSegment(int segmentNumber) {
|
||||
|
|
|
@ -17,7 +17,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
|
|||
}
|
||||
final lookup = await _spine.lookupPosition(pos);
|
||||
if (lookup == null) {
|
||||
throw DHTExceptionInvalidData();
|
||||
throw const DHTExceptionInvalidData();
|
||||
}
|
||||
|
||||
// Write item to the segment
|
||||
|
@ -26,7 +26,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
|
|||
final success =
|
||||
await write.tryWriteItem(lookup.pos, newValue, output: output);
|
||||
if (!success) {
|
||||
throw DHTExceptionOutdated();
|
||||
throw const DHTExceptionOutdated();
|
||||
}
|
||||
}));
|
||||
} on DHTExceptionOutdated {
|
||||
|
@ -45,12 +45,12 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
|
|||
}
|
||||
final aLookup = await _spine.lookupPosition(aPos);
|
||||
if (aLookup == null) {
|
||||
throw DHTExceptionInvalidData();
|
||||
throw const DHTExceptionInvalidData();
|
||||
}
|
||||
final bLookup = await _spine.lookupPosition(bPos);
|
||||
if (bLookup == null) {
|
||||
await aLookup.close();
|
||||
throw DHTExceptionInvalidData();
|
||||
throw const DHTExceptionInvalidData();
|
||||
}
|
||||
|
||||
// Swap items in the segments
|
||||
|
@ -65,20 +65,20 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
|
|||
if (bItem.value == null) {
|
||||
final aItem = await aWrite.get(aLookup.pos);
|
||||
if (aItem == null) {
|
||||
throw DHTExceptionInvalidData();
|
||||
throw const DHTExceptionInvalidData();
|
||||
}
|
||||
await sb.operateWriteEventual((bWrite) async {
|
||||
final success = await bWrite
|
||||
.tryWriteItem(bLookup.pos, aItem, output: bItem);
|
||||
if (!success) {
|
||||
throw DHTExceptionOutdated();
|
||||
throw const DHTExceptionOutdated();
|
||||
}
|
||||
});
|
||||
}
|
||||
final success =
|
||||
await aWrite.tryWriteItem(aLookup.pos, bItem.value!);
|
||||
if (!success) {
|
||||
throw DHTExceptionOutdated();
|
||||
throw const DHTExceptionOutdated();
|
||||
}
|
||||
})));
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
|
|||
await write.clear();
|
||||
} else if (lookup.pos != write.length) {
|
||||
// We should always be appending at the length
|
||||
throw DHTExceptionInvalidData();
|
||||
throw const DHTExceptionInvalidData();
|
||||
}
|
||||
return write.add(value);
|
||||
}));
|
||||
|
@ -122,7 +122,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
|
|||
|
||||
final lookup = await _spine.lookupPosition(insertPos + valueIdx);
|
||||
if (lookup == null) {
|
||||
throw DHTExceptionInvalidData();
|
||||
throw const DHTExceptionInvalidData();
|
||||
}
|
||||
|
||||
final sacount = min(remaining, DHTShortArray.maxElements - lookup.pos);
|
||||
|
@ -137,7 +137,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
|
|||
await write.clear();
|
||||
} else if (lookup.pos != write.length) {
|
||||
// We should always be appending at the length
|
||||
throw DHTExceptionInvalidData();
|
||||
throw const DHTExceptionInvalidData();
|
||||
}
|
||||
return write.addAll(sublistValues);
|
||||
}));
|
||||
|
@ -152,7 +152,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
|
|||
await dws();
|
||||
|
||||
if (!success) {
|
||||
throw DHTExceptionOutdated();
|
||||
throw const DHTExceptionOutdated();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -134,11 +134,25 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
|
|||
return null;
|
||||
}
|
||||
|
||||
final valueData = await _routingContext.getDHTValue(key, subkey,
|
||||
forceRefresh: refreshMode._forceRefresh);
|
||||
var retry = kDHTTryAgainTries;
|
||||
ValueData? valueData;
|
||||
while (true) {
|
||||
try {
|
||||
valueData = await _routingContext.getDHTValue(key, subkey,
|
||||
forceRefresh: refreshMode._forceRefresh);
|
||||
break;
|
||||
} on VeilidAPIExceptionTryAgain {
|
||||
retry--;
|
||||
if (retry == 0) {
|
||||
throw const DHTExceptionNotAvailable();
|
||||
}
|
||||
await asyncSleep();
|
||||
}
|
||||
}
|
||||
if (valueData == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// See if this get resulted in a newer sequence number
|
||||
if (refreshMode == DHTRecordRefreshMode.update &&
|
||||
lastSeq != null &&
|
||||
|
@ -415,10 +429,10 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
|
|||
Timestamp? expiration,
|
||||
int? count}) async {
|
||||
// Set up watch requirements which will get picked up by the next tick
|
||||
final oldWatchState = watchState;
|
||||
watchState =
|
||||
final oldWatchState = _watchState;
|
||||
_watchState =
|
||||
_WatchState(subkeys: subkeys, expiration: expiration, count: count);
|
||||
if (oldWatchState != watchState) {
|
||||
if (oldWatchState != _watchState) {
|
||||
_sharedDHTRecordData.needsWatchStateUpdate = true;
|
||||
}
|
||||
}
|
||||
|
@ -476,8 +490,8 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
|
|||
/// Takes effect on the next DHTRecordPool tick
|
||||
Future<void> cancelWatch() async {
|
||||
// Tear down watch requirements
|
||||
if (watchState != null) {
|
||||
watchState = null;
|
||||
if (_watchState != null) {
|
||||
_watchState = null;
|
||||
_sharedDHTRecordData.needsWatchStateUpdate = true;
|
||||
}
|
||||
}
|
||||
|
@ -503,7 +517,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
|
|||
{required bool local,
|
||||
required Uint8List? data,
|
||||
required List<ValueSubkeyRange> subkeys}) {
|
||||
final ws = watchState;
|
||||
final ws = _watchState;
|
||||
if (ws != null) {
|
||||
final watchedSubkeys = ws.subkeys;
|
||||
if (watchedSubkeys == null) {
|
||||
|
@ -551,6 +565,5 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
|
|||
final _mutex = Mutex();
|
||||
int _openCount;
|
||||
StreamController<DHTRecordWatchChange>? _watchController;
|
||||
@internal
|
||||
_WatchState? watchState;
|
||||
_WatchState? _watchState;
|
||||
}
|
||||
|
|
|
@ -29,8 +29,7 @@ abstract class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
|
|||
_record = await open();
|
||||
_wantsCloseRecord = true;
|
||||
break;
|
||||
} on VeilidAPIExceptionKeyNotFound {
|
||||
} on VeilidAPIExceptionTryAgain {
|
||||
} on DHTExceptionNotAvailable {
|
||||
// Wait for a bit
|
||||
await asyncSleep();
|
||||
}
|
||||
|
|
|
@ -21,8 +21,11 @@ part 'dht_record_pool_private.dart';
|
|||
/// Maximum number of concurrent DHT operations to perform on the network
|
||||
const int kMaxDHTConcurrency = 8;
|
||||
|
||||
/// Number of times to retry a 'key not found'
|
||||
const int kDHTKeyNotFoundRetry = 3;
|
||||
/// Total number of times to try in a 'VeilidAPIExceptionKeyNotFound' loop
|
||||
const int kDHTKeyNotFoundTries = 3;
|
||||
|
||||
/// Total number of times to try in a 'VeilidAPIExceptionTryAgain' loop
|
||||
const int kDHTTryAgainTries = 3;
|
||||
|
||||
typedef DHTRecordPoolLogger = void Function(String message);
|
||||
|
||||
|
@ -280,12 +283,12 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
|
|||
for (final rec in openedRecordInfo.records) {
|
||||
// See if the watch had an expiration and if it has expired
|
||||
// otherwise the renewal will keep the same parameters
|
||||
final watchState = rec.watchState;
|
||||
final watchState = rec._watchState;
|
||||
if (watchState != null) {
|
||||
final exp = watchState.expiration;
|
||||
if (exp != null && exp.value < now) {
|
||||
// Has expiration, and it has expired, clear watch state
|
||||
rec.watchState = null;
|
||||
rec._watchState = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -392,7 +395,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
|
|||
|
||||
if (openedRecordInfo == null) {
|
||||
// Fresh open, just open the record
|
||||
var retry = kDHTKeyNotFoundRetry;
|
||||
var retry = kDHTKeyNotFoundTries;
|
||||
late final DHTRecordDescriptor recordDescriptor;
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -403,7 +406,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
|
|||
await asyncSleep();
|
||||
retry--;
|
||||
if (retry == 0) {
|
||||
rethrow;
|
||||
throw DHTExceptionNotAvailable();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -705,7 +708,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
|
|||
var cancelWatch = true;
|
||||
|
||||
for (final rec in records) {
|
||||
final ws = rec.watchState;
|
||||
final ws = rec._watchState;
|
||||
if (ws != null) {
|
||||
cancelWatch = false;
|
||||
final wsCount = ws.count;
|
||||
|
@ -762,9 +765,9 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
|
|||
static void _updateWatchRealExpirations(Iterable<DHTRecord> records,
|
||||
Timestamp realExpiration, Timestamp renewalTime) {
|
||||
for (final rec in records) {
|
||||
final ws = rec.watchState;
|
||||
final ws = rec._watchState;
|
||||
if (ws != null) {
|
||||
rec.watchState = _WatchState(
|
||||
rec._watchState = _WatchState(
|
||||
subkeys: ws.subkeys,
|
||||
expiration: ws.expiration,
|
||||
count: ws.count,
|
||||
|
|
|
@ -68,7 +68,7 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray> {
|
|||
}
|
||||
});
|
||||
return dhtShortArray;
|
||||
} on Exception catch (_) {
|
||||
} on Exception {
|
||||
await dhtRecord.close();
|
||||
await pool.deleteRecord(dhtRecord.key);
|
||||
rethrow;
|
||||
|
@ -89,7 +89,7 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray> {
|
|||
final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
|
||||
await dhtShortArray._head.operate((head) => head._loadHead());
|
||||
return dhtShortArray;
|
||||
} on Exception catch (_) {
|
||||
} on Exception {
|
||||
await dhtRecord.close();
|
||||
rethrow;
|
||||
}
|
||||
|
@ -113,7 +113,7 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray> {
|
|||
final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
|
||||
await dhtShortArray._head.operate((head) => head._loadHead());
|
||||
return dhtShortArray;
|
||||
} on Exception catch (_) {
|
||||
} on Exception {
|
||||
await dhtRecord.close();
|
||||
rethrow;
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import 'package:fast_immutable_collections/fast_immutable_collections.dart';
|
|||
import 'package:meta/meta.dart';
|
||||
|
||||
import '../../../veilid_support.dart';
|
||||
import '../interfaces/refreshable_cubit.dart';
|
||||
|
||||
@immutable
|
||||
class DHTShortArrayElementState<T> extends Equatable {
|
||||
|
@ -24,7 +25,7 @@ typedef DHTShortArrayState<T> = AsyncValue<IList<DHTShortArrayElementState<T>>>;
|
|||
typedef DHTShortArrayBusyState<T> = BlocBusyState<DHTShortArrayState<T>>;
|
||||
|
||||
class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
|
||||
with BlocBusyWrapper<DHTShortArrayState<T>> {
|
||||
with BlocBusyWrapper<DHTShortArrayState<T>>, RefreshableCubit {
|
||||
DHTShortArrayCubit({
|
||||
required Future<DHTShortArray> Function() open,
|
||||
required T Function(List<int> data) decodeElement,
|
||||
|
@ -39,7 +40,7 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
|
|||
_shortArray = await open();
|
||||
_wantsCloseRecord = true;
|
||||
break;
|
||||
} on VeilidAPIExceptionTryAgain {
|
||||
} on DHTExceptionNotAvailable {
|
||||
// Wait for a bit
|
||||
await asyncSleep();
|
||||
}
|
||||
|
@ -57,6 +58,7 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
|
|||
});
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> refresh({bool forceRefresh = false}) async {
|
||||
await _initWait();
|
||||
await _refreshNoWait(forceRefresh: forceRefresh);
|
||||
|
@ -87,9 +89,13 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
|
|||
.toIList();
|
||||
return allItems;
|
||||
});
|
||||
if (newState != null) {
|
||||
emit(AsyncValue.data(newState));
|
||||
if (newState == null) {
|
||||
// Mark us as needing refresh
|
||||
setWantsRefresh();
|
||||
return;
|
||||
}
|
||||
emit(AsyncValue.data(newState));
|
||||
setRefreshed();
|
||||
} on Exception catch (e) {
|
||||
emit(AsyncValue.error(e));
|
||||
}
|
||||
|
|
|
@ -91,7 +91,7 @@ class _DHTShortArrayHead {
|
|||
if (!await _writeHead()) {
|
||||
// Failed to write head means head got overwritten so write should
|
||||
// be considered failed
|
||||
throw DHTExceptionOutdated();
|
||||
throw const DHTExceptionOutdated();
|
||||
}
|
||||
|
||||
onUpdatedHead?.call();
|
||||
|
|
|
@ -17,21 +17,25 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations {
|
|||
throw IndexError.withLength(pos, length);
|
||||
}
|
||||
|
||||
final lookup = await _head.lookupPosition(pos, false);
|
||||
try {
|
||||
final lookup = await _head.lookupPosition(pos, false);
|
||||
|
||||
final refresh = forceRefresh || _head.positionNeedsRefresh(pos);
|
||||
final outSeqNum = Output<int>();
|
||||
final out = lookup.record.get(
|
||||
subkey: lookup.recordSubkey,
|
||||
refreshMode: refresh
|
||||
? DHTRecordRefreshMode.network
|
||||
: DHTRecordRefreshMode.cached,
|
||||
outSeqNum: outSeqNum);
|
||||
if (outSeqNum.value != null) {
|
||||
_head.updatePositionSeq(pos, false, outSeqNum.value!);
|
||||
final refresh = forceRefresh || _head.positionNeedsRefresh(pos);
|
||||
final outSeqNum = Output<int>();
|
||||
final out = await lookup.record.get(
|
||||
subkey: lookup.recordSubkey,
|
||||
refreshMode: refresh
|
||||
? DHTRecordRefreshMode.network
|
||||
: DHTRecordRefreshMode.cached,
|
||||
outSeqNum: outSeqNum);
|
||||
if (outSeqNum.value != null) {
|
||||
_head.updatePositionSeq(pos, false, outSeqNum.value!);
|
||||
}
|
||||
return out;
|
||||
} on DHTExceptionNotAvailable {
|
||||
// If any element is not available, return null
|
||||
return null;
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
(int, int) _clampStartLen(int start, int? len) {
|
||||
|
@ -56,11 +60,13 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations {
|
|||
|
||||
final chunks = Iterable<int>.generate(length)
|
||||
.slices(kMaxDHTConcurrency)
|
||||
.map((chunk) =>
|
||||
chunk.map((pos) => get(pos + start, forceRefresh: forceRefresh)));
|
||||
.map((chunk) => chunk
|
||||
.map((pos) async => get(pos + start, forceRefresh: forceRefresh)));
|
||||
|
||||
for (final chunk in chunks) {
|
||||
final elems = await chunk.wait;
|
||||
|
||||
// If any element was unavailable, return null
|
||||
if (elems.contains(null)) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -1,15 +1,21 @@
|
|||
class DHTExceptionOutdated implements Exception {
|
||||
DHTExceptionOutdated(
|
||||
const DHTExceptionOutdated(
|
||||
[this.cause = 'operation failed due to newer dht value']);
|
||||
String cause;
|
||||
final String cause;
|
||||
}
|
||||
|
||||
class DHTExceptionInvalidData implements Exception {
|
||||
DHTExceptionInvalidData([this.cause = 'dht data structure is corrupt']);
|
||||
String cause;
|
||||
const DHTExceptionInvalidData([this.cause = 'dht data structure is corrupt']);
|
||||
final String cause;
|
||||
}
|
||||
|
||||
class DHTExceptionCancelled implements Exception {
|
||||
DHTExceptionCancelled([this.cause = 'operation was cancelled']);
|
||||
String cause;
|
||||
const DHTExceptionCancelled([this.cause = 'operation was cancelled']);
|
||||
final String cause;
|
||||
}
|
||||
|
||||
class DHTExceptionNotAvailable implements Exception {
|
||||
const DHTExceptionNotAvailable(
|
||||
[this.cause = 'request could not be completed at this time']);
|
||||
final String cause;
|
||||
}
|
||||
|
|
|
@ -6,3 +6,4 @@ export 'dht_random_read.dart';
|
|||
export 'dht_random_write.dart';
|
||||
export 'dht_truncate.dart';
|
||||
export 'exceptions.dart';
|
||||
export 'refreshable_cubit.dart';
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
abstract mixin class RefreshableCubit {
|
||||
Future<void> refresh({bool forceRefresh = false});
|
||||
|
||||
void setWantsRefresh() {
|
||||
_wantsRefresh = true;
|
||||
}
|
||||
|
||||
void setRefreshed() {
|
||||
_wantsRefresh = false;
|
||||
}
|
||||
|
||||
bool get wantsRefresh => _wantsRefresh;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
bool _wantsRefresh = false;
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue