mirror of
https://gitlab.com/veilid/veilidchat.git
synced 2024-10-01 06:55:46 -04:00
more shortarray work
This commit is contained in:
parent
5a8b1caf93
commit
7b64307987
@ -1,6 +1,7 @@
|
|||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
import 'dart:typed_data';
|
import 'dart:typed_data';
|
||||||
|
|
||||||
|
import 'package:async_tools/async_tools.dart';
|
||||||
import 'package:mutex/mutex.dart';
|
import 'package:mutex/mutex.dart';
|
||||||
import 'package:protobuf/protobuf.dart';
|
import 'package:protobuf/protobuf.dart';
|
||||||
|
|
||||||
@ -8,6 +9,8 @@ import '../../../veilid_support.dart';
|
|||||||
import '../../proto/proto.dart' as proto;
|
import '../../proto/proto.dart' as proto;
|
||||||
|
|
||||||
part 'dht_short_array_head.dart';
|
part 'dht_short_array_head.dart';
|
||||||
|
part 'dht_short_array_read.dart';
|
||||||
|
part 'dht_short_array_write.dart';
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
@ -128,9 +131,6 @@ class DHTShortArray {
|
|||||||
/// Get the record pointer foir this shortarray
|
/// Get the record pointer foir this shortarray
|
||||||
OwnedDHTRecordPointer get recordPointer => _head.recordPointer;
|
OwnedDHTRecordPointer get recordPointer => _head.recordPointer;
|
||||||
|
|
||||||
/// Returns the number of elements in the DHTShortArray
|
|
||||||
int get length => _head.length;
|
|
||||||
|
|
||||||
/// Free all resources for the DHTShortArray
|
/// Free all resources for the DHTShortArray
|
||||||
Future<void> close() async {
|
Future<void> close() async {
|
||||||
await _watchController?.close();
|
await _watchController?.close();
|
||||||
@ -168,250 +168,22 @@ class DHTShortArray {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the item at position 'pos' in the DHTShortArray. If 'forceRefresh'
|
/// Runs a closure allowing read-only access to the shortarray
|
||||||
/// is specified, the network will always be checked for newer values
|
Future<T> operate<T>(Future<T> Function(DHTShortArrayRead) closure) async =>
|
||||||
/// rather than returning the existing locally stored copy of the elements.
|
|
||||||
Future<Uint8List?> getItem(int pos, {bool forceRefresh = false}) async =>
|
|
||||||
_head.operate(
|
|
||||||
(head) async => _getItemInner(head, pos, forceRefresh: forceRefresh));
|
|
||||||
|
|
||||||
Future<Uint8List?> _getItemInner(_DHTShortArrayHead head, int pos,
|
|
||||||
{bool forceRefresh = false}) async {
|
|
||||||
if (pos < 0 || pos >= length) {
|
|
||||||
throw IndexError.withLength(pos, length);
|
|
||||||
}
|
|
||||||
|
|
||||||
final (record, recordSubkey) = await head.lookupPosition(pos);
|
|
||||||
|
|
||||||
final refresh = forceRefresh || head.positionNeedsRefresh(pos);
|
|
||||||
final out = record.get(subkey: recordSubkey, forceRefresh: refresh);
|
|
||||||
await head.updatePositionSeq(pos, false);
|
|
||||||
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return a list of all of the items in the DHTShortArray. If 'forceRefresh'
|
|
||||||
/// is specified, the network will always be checked for newer values
|
|
||||||
/// rather than returning the existing locally stored copy of the elements.
|
|
||||||
Future<List<Uint8List>?> getAllItems({bool forceRefresh = false}) async =>
|
|
||||||
_head.operate((head) async {
|
_head.operate((head) async {
|
||||||
final out = <Uint8List>[];
|
final reader = _DHTShortArrayRead._(head);
|
||||||
|
return closure(reader);
|
||||||
for (var pos = 0; pos < head.length; pos++) {
|
|
||||||
final elem =
|
|
||||||
await _getItemInner(head, pos, forceRefresh: forceRefresh);
|
|
||||||
if (elem == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
out.add(elem);
|
|
||||||
}
|
|
||||||
|
|
||||||
return out;
|
|
||||||
});
|
});
|
||||||
|
|
||||||
/// Convenience function:
|
/// Runs a closure allowing read-write access to the shortarray
|
||||||
/// Like getItem but also parses the returned element as JSON
|
/// Returns (result, true) of the closure if the write could be performed
|
||||||
Future<T?> getItemJson<T>(T Function(dynamic) fromJson, int pos,
|
/// Returns (null, false) if the write could not be performed at this time
|
||||||
{bool forceRefresh = false}) =>
|
Future<(T?, bool)> operateWrite<T>(
|
||||||
getItem(pos, forceRefresh: forceRefresh)
|
Future<T> Function(DHTShortArrayWrite) closure) async =>
|
||||||
.then((out) => jsonDecodeOptBytes(fromJson, out));
|
_head.operateWrite((head) async {
|
||||||
|
final writer = _DHTShortArrayWrite._(head);
|
||||||
/// Convenience function:
|
return closure(writer);
|
||||||
/// Like getAllItems but also parses the returned elements as JSON
|
});
|
||||||
Future<List<T>?> getAllItemsJson<T>(T Function(dynamic) fromJson,
|
|
||||||
{bool forceRefresh = false}) =>
|
|
||||||
getAllItems(forceRefresh: forceRefresh)
|
|
||||||
.then((out) => out?.map(fromJson).toList());
|
|
||||||
|
|
||||||
/// Convenience function:
|
|
||||||
/// Like getItem but also parses the returned element as a protobuf object
|
|
||||||
Future<T?> getItemProtobuf<T extends GeneratedMessage>(
|
|
||||||
T Function(List<int>) fromBuffer, int pos,
|
|
||||||
{bool forceRefresh = false}) =>
|
|
||||||
getItem(pos, forceRefresh: forceRefresh)
|
|
||||||
.then((out) => (out == null) ? null : fromBuffer(out));
|
|
||||||
|
|
||||||
/// Convenience function:
|
|
||||||
/// Like getAllItems but also parses the returned elements as protobuf objects
|
|
||||||
Future<List<T>?> getAllItemsProtobuf<T extends GeneratedMessage>(
|
|
||||||
T Function(List<int>) fromBuffer,
|
|
||||||
{bool forceRefresh = false}) =>
|
|
||||||
getAllItems(forceRefresh: forceRefresh)
|
|
||||||
.then((out) => out?.map(fromBuffer).toList());
|
|
||||||
|
|
||||||
/// Try to add an item to the end of the DHTShortArray. Return true if the
|
|
||||||
/// element was successfully added, and false if the state changed before
|
|
||||||
/// the element could be added or a newer value was found on the network.
|
|
||||||
/// This may throw an exception if the number elements added exceeds the
|
|
||||||
/// built-in limit of 'maxElements = 256' entries.
|
|
||||||
Future<bool> tryAddItem(Uint8List value) async {
|
|
||||||
final out = await _head
|
|
||||||
.operateWrite((head) async => _tryAddItemInner(head, value)) ??
|
|
||||||
false;
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<bool> _tryAddItemInner(
|
|
||||||
_DHTShortArrayHead head, Uint8List value) async {
|
|
||||||
// Allocate empty index at the end of the list
|
|
||||||
final pos = head.length;
|
|
||||||
head.allocateIndex(pos);
|
|
||||||
|
|
||||||
// Write item
|
|
||||||
final (_, wasSet) = await _tryWriteItemInner(head, pos, value);
|
|
||||||
if (!wasSet) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get sequence number written
|
|
||||||
await head.updatePositionSeq(pos, true);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to insert an item as position 'pos' of the DHTShortArray.
|
|
||||||
/// Return true if the element was successfully inserted, and false if the
|
|
||||||
/// state changed before the element could be inserted or a newer value was
|
|
||||||
/// found on the network.
|
|
||||||
/// This may throw an exception if the number elements added exceeds the
|
|
||||||
/// built-in limit of 'maxElements = 256' entries.
|
|
||||||
Future<bool> tryInsertItem(int pos, Uint8List value) async {
|
|
||||||
final out = await _head.operateWrite(
|
|
||||||
(head) async => _tryInsertItemInner(head, pos, value)) ??
|
|
||||||
false;
|
|
||||||
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<bool> _tryInsertItemInner(
|
|
||||||
_DHTShortArrayHead head, int pos, Uint8List value) async {
|
|
||||||
// Allocate empty index at position
|
|
||||||
head.allocateIndex(pos);
|
|
||||||
|
|
||||||
// Write item
|
|
||||||
final (_, wasSet) = await _tryWriteItemInner(head, pos, value);
|
|
||||||
if (!wasSet) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get sequence number written
|
|
||||||
await head.updatePositionSeq(pos, true);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to swap items at position 'aPos' and 'bPos' in the DHTShortArray.
|
|
||||||
/// Return true if the elements were successfully swapped, and false if the
|
|
||||||
/// state changed before the elements could be swapped or newer values were
|
|
||||||
/// found on the network.
|
|
||||||
/// This may throw an exception if either of the positions swapped exceed
|
|
||||||
/// the length of the list
|
|
||||||
|
|
||||||
Future<bool> trySwapItem(int aPos, int bPos) async {
|
|
||||||
final out = await _head.operateWrite(
|
|
||||||
(head) async => _trySwapItemInner(head, aPos, bPos)) ??
|
|
||||||
false;
|
|
||||||
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<bool> _trySwapItemInner(
|
|
||||||
_DHTShortArrayHead head, int aPos, int bPos) async {
|
|
||||||
// Swap indices
|
|
||||||
head.swapIndex(aPos, bPos);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to remove an item at position 'pos' in the DHTShortArray.
|
|
||||||
/// Return the element if it was successfully removed, and null if the
|
|
||||||
/// state changed before the elements could be removed or newer values were
|
|
||||||
/// found on the network.
|
|
||||||
/// This may throw an exception if the position removed exceeeds the length of
|
|
||||||
/// the list.
|
|
||||||
|
|
||||||
Future<Uint8List?> tryRemoveItem(int pos) async {
|
|
||||||
final out =
|
|
||||||
_head.operateWrite((head) async => _tryRemoveItemInner(head, pos));
|
|
||||||
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<Uint8List> _tryRemoveItemInner(
|
|
||||||
_DHTShortArrayHead head, int pos) async {
|
|
||||||
final (record, recordSubkey) = await head.lookupPosition(pos);
|
|
||||||
final result = await record.get(subkey: recordSubkey);
|
|
||||||
if (result == null) {
|
|
||||||
throw StateError('Element does not exist');
|
|
||||||
}
|
|
||||||
head.freeIndex(pos);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Convenience function:
|
|
||||||
/// Like removeItem but also parses the returned element as JSON
|
|
||||||
Future<T?> tryRemoveItemJson<T>(
|
|
||||||
T Function(dynamic) fromJson,
|
|
||||||
int pos,
|
|
||||||
) =>
|
|
||||||
tryRemoveItem(pos).then((out) => jsonDecodeOptBytes(fromJson, out));
|
|
||||||
|
|
||||||
/// Convenience function:
|
|
||||||
/// Like removeItem but also parses the returned element as JSON
|
|
||||||
Future<T?> tryRemoveItemProtobuf<T extends GeneratedMessage>(
|
|
||||||
T Function(List<int>) fromBuffer, int pos) =>
|
|
||||||
getItem(pos).then((out) => (out == null) ? null : fromBuffer(out));
|
|
||||||
|
|
||||||
/// Try to remove all items in the DHTShortArray.
|
|
||||||
/// Return true if it was successfully cleared, and false if the
|
|
||||||
/// state changed before the elements could be cleared or newer values were
|
|
||||||
/// found on the network.
|
|
||||||
Future<bool> tryClear() async {
|
|
||||||
final out =
|
|
||||||
await _head.operateWrite((head) async => _tryClearInner(head)) ?? false;
|
|
||||||
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<bool> _tryClearInner(_DHTShortArrayHead head) async {
|
|
||||||
head.clearIndex();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to set an item at position 'pos' of the DHTShortArray.
|
|
||||||
/// If the set was successful this returns:
|
|
||||||
/// * The prior contents of the element, or null if there was no value yet
|
|
||||||
/// * A boolean true
|
|
||||||
/// If the set was found a newer value on the network:
|
|
||||||
/// * The newer value of the element, or null if the head record
|
|
||||||
/// changed.
|
|
||||||
/// * A boolean false
|
|
||||||
/// This may throw an exception if the position exceeds the built-in limit of
|
|
||||||
/// 'maxElements = 256' entries.
|
|
||||||
Future<(Uint8List?, bool)> tryWriteItem(int pos, Uint8List newValue) async {
|
|
||||||
final out = await _head
|
|
||||||
.operateWrite((head) async => _tryWriteItemInner(head, pos, newValue));
|
|
||||||
if (out == null) {
|
|
||||||
return (null, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<(Uint8List?, bool)> _tryWriteItemInner(
|
|
||||||
_DHTShortArrayHead head, int pos, Uint8List newValue) async {
|
|
||||||
if (pos < 0 || pos >= head.length) {
|
|
||||||
throw IndexError.withLength(pos, head.length);
|
|
||||||
}
|
|
||||||
final (record, recordSubkey) = await head.lookupPosition(pos);
|
|
||||||
final oldValue = await record.get(subkey: recordSubkey);
|
|
||||||
final result = await record.tryWriteBytes(newValue, subkey: recordSubkey);
|
|
||||||
if (result != null) {
|
|
||||||
// A result coming back means the element was overwritten already
|
|
||||||
return (result, false);
|
|
||||||
}
|
|
||||||
return (oldValue, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set an item at position 'pos' of the DHTShortArray. Retries until the
|
/// Set an item at position 'pos' of the DHTShortArray. Retries until the
|
||||||
/// value being written is successfully made the newest value of the element.
|
/// value being written is successfully made the newest value of the element.
|
||||||
@ -449,28 +221,6 @@ class DHTShortArray {
|
|||||||
}, timeout: timeout);
|
}, timeout: timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Convenience function:
|
|
||||||
/// Like tryWriteItem but also encodes the input value as JSON and parses the
|
|
||||||
/// returned element as JSON
|
|
||||||
Future<(T?, bool)> tryWriteItemJson<T>(
|
|
||||||
T Function(dynamic) fromJson,
|
|
||||||
int pos,
|
|
||||||
T newValue,
|
|
||||||
) =>
|
|
||||||
tryWriteItem(pos, jsonEncodeBytes(newValue))
|
|
||||||
.then((out) => (jsonDecodeOptBytes(fromJson, out.$1), out.$2));
|
|
||||||
|
|
||||||
/// Convenience function:
|
|
||||||
/// Like tryWriteItem but also encodes the input value as a protobuf object
|
|
||||||
/// and parses the returned element as a protobuf object
|
|
||||||
Future<(T?, bool)> tryWriteItemProtobuf<T extends GeneratedMessage>(
|
|
||||||
T Function(List<int>) fromBuffer,
|
|
||||||
int pos,
|
|
||||||
T newValue,
|
|
||||||
) =>
|
|
||||||
tryWriteItem(pos, newValue.writeToBuffer()).then(
|
|
||||||
(out) => ((out.$1 == null ? null : fromBuffer(out.$1!)), out.$2));
|
|
||||||
|
|
||||||
/// Convenience function:
|
/// Convenience function:
|
||||||
/// Like eventualWriteItem but also encodes the input value as JSON and parses
|
/// Like eventualWriteItem but also encodes the input value as JSON and parses
|
||||||
/// the returned element as JSON
|
/// the returned element as JSON
|
||||||
|
@ -57,7 +57,7 @@ class _DHTShortArrayHead {
|
|||||||
return closure(this);
|
return closure(this);
|
||||||
});
|
});
|
||||||
|
|
||||||
Future<T?> operateWrite<T>(
|
Future<(T?, bool)> operateWrite<T>(
|
||||||
Future<T> Function(_DHTShortArrayHead) closure) async =>
|
Future<T> Function(_DHTShortArrayHead) closure) async =>
|
||||||
_headMutex.protect(() async {
|
_headMutex.protect(() async {
|
||||||
final oldLinkedRecords = List.of(_linkedRecords);
|
final oldLinkedRecords = List.of(_linkedRecords);
|
||||||
@ -70,11 +70,11 @@ class _DHTShortArrayHead {
|
|||||||
if (!await _writeHead()) {
|
if (!await _writeHead()) {
|
||||||
// Failed to write head means head got overwritten so write should
|
// Failed to write head means head got overwritten so write should
|
||||||
// be considered failed
|
// be considered failed
|
||||||
return null;
|
return (null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
onUpdatedHead?.call();
|
onUpdatedHead?.call();
|
||||||
return out;
|
return (out, true);
|
||||||
} on Exception {
|
} on Exception {
|
||||||
// Exception means state needs to be reverted
|
// Exception means state needs to be reverted
|
||||||
_linkedRecords = oldLinkedRecords;
|
_linkedRecords = oldLinkedRecords;
|
||||||
|
Loading…
Reference in New Issue
Block a user