tests pass

This commit is contained in:
Christien Rioux 2024-05-21 15:19:27 -04:00
parent ed893852a2
commit ff1ea709a8
14 changed files with 178 additions and 75 deletions

View File

@ -1,10 +1,6 @@
//@Timeout(Duration(seconds: 240))
//library veilid_support_integration_test;
import 'package:flutter/foundation.dart';
import 'package:test/test.dart';
import 'package:integration_test/integration_test.dart';
import 'package:test/test.dart';
import 'package:veilid_test/veilid_test.dart';
import 'fixtures/fixtures.dart';
@ -26,7 +22,7 @@ void main() {
tickerFixture: tickerFixture,
updateProcessorFixture: updateProcessorFixture);
group('Started Tests', () {
group(timeout: const Timeout(Duration(seconds: 240)), 'Started Tests', () {
setUpAll(veilidFixture.setUp);
tearDownAll(veilidFixture.tearDown);
tearDownAll(() {
@ -74,9 +70,11 @@ void main() {
for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) {
test('create log stride=$stride',
makeTestDHTLogCreateDelete(stride: stride));
test('add/truncate log stride=$stride',
makeTestDHTLogAddTruncate(stride: stride),
timeout: const Timeout(Duration(seconds: 480)));
test(
timeout: const Timeout(Duration(seconds: 480)),
'add/truncate log stride=$stride',
makeTestDHTLogAddTruncate(stride: stride),
);
}
});
});

View File

@ -1,6 +1,6 @@
import 'dart:convert';
import 'package:flutter_test/flutter_test.dart';
import 'package:test/test.dart';
import 'package:veilid_support/veilid_support.dart';
Future<void> Function() makeTestDHTLogCreateDelete({required int stride}) =>
@ -61,7 +61,7 @@ Future<void> Function() makeTestDHTLogAddTruncate({required int stride}) =>
print('adding\n');
{
final res = await dlog.operateAppend((w) async {
const chunk = 50;
const chunk = 25;
for (var n = 0; n < dataset.length; n += chunk) {
print('$n-${n + chunk - 1} ');
final success =

View File

@ -1,7 +1,7 @@
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:flutter_test/flutter_test.dart';
import 'package:test/test.dart';
import 'package:veilid_support/veilid_support.dart';
Future<void> testDHTRecordPoolCreate() async {

View File

@ -1,6 +1,6 @@
import 'dart:convert';
import 'package:flutter_test/flutter_test.dart';
import 'package:test/test.dart';
import 'package:veilid_support/veilid_support.dart';
Future<void> Function() makeTestDHTShortArrayCreateDelete(
@ -118,7 +118,10 @@ Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
//print('clear\n');
{
await arr.operateWrite((w) async => w.clear());
await arr.operateWriteEventual((w) async {
await w.clear();
return true;
});
}
//print('get all\n');

View File

@ -196,7 +196,7 @@ packages:
source: sdk
version: "0.0.0"
flutter_test:
dependency: "direct dev"
dependency: transitive
description: flutter
source: sdk
version: "0.0.0"

View File

@ -15,8 +15,6 @@ dependencies:
dev_dependencies:
async_tools: ^0.1.1
flutter_test:
sdk: flutter
integration_test:
sdk: flutter
lint_hard: ^4.0.0

View File

@ -42,7 +42,7 @@ class DHTLogUpdate extends Equatable {
/// * The head and tail position of the log
/// - subkeyIdx = pos / recordsPerSubkey
/// - recordIdx = pos % recordsPerSubkey
class DHTLog implements DHTOpenable<DHTLog> {
class DHTLog implements DHTDeleteable<DHTLog, DHTLog> {
////////////////////////////////////////////////////////////////
// Constructors
@ -160,12 +160,16 @@ class DHTLog implements DHTOpenable<DHTLog> {
);
////////////////////////////////////////////////////////////////////////////
// DHTOpenable
// DHTCloseable
/// Check if the DHTLog is open
@override
bool get isOpen => _openCount > 0;
/// The type of the openable scope
@override
FutureOr<DHTLog> scoped() => this;
/// Add a reference to this log
@override
Future<DHTLog> ref() async => _mutex.protect(() async {

View File

@ -17,7 +17,7 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead {
}
// Write item to the segment
return lookup.shortArray.scope((sa) => sa.operateWrite((write) async {
return lookup.scope((sa) => sa.operateWrite((write) async {
// If this a new segment, then clear it in case we have wrapped around
if (lookup.pos == 0) {
await write.clear();
@ -51,18 +51,17 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead {
final sublistValues = values.sublist(valueIdx, valueIdx + sacount);
dws.add(() async {
final ok = await lookup.shortArray
.scope((sa) => sa.operateWrite((write) async {
// If this a new segment, then clear it in
// case we have wrapped around
if (lookup.pos == 0) {
await write.clear();
} else if (lookup.pos != write.length) {
// We should always be appending at the length
throw StateError('appending should be at the end');
}
return write.tryAddItems(sublistValues);
}));
final ok = await lookup.scope((sa) => sa.operateWrite((write) async {
// If this a new segment, then clear it in
// case we have wrapped around
if (lookup.pos == 0) {
await write.clear();
} else if (lookup.pos != write.length) {
// We should always be appending at the length
throw StateError('appending should be at the end');
}
return write.tryAddItems(sublistValues);
}));
if (!ok) {
success = false;
}
@ -71,7 +70,7 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead {
valueIdx += sacount;
}
await dws(chunkSize: maxDHTConcurrency);
await dws();
return success;
}

View File

@ -19,7 +19,7 @@ class _DHTLogRead implements DHTRandomRead {
return null;
}
return lookup.shortArray.scope((sa) => sa.operate(
return lookup.scope((sa) => sa.operate(
(read) => read.getItem(lookup.pos, forceRefresh: forceRefresh)));
}
@ -71,7 +71,7 @@ class _DHTLogRead implements DHTRandomRead {
// Check each segment for offline positions
var foundOffline = false;
await lookup.shortArray.scope((sa) => sa.operate((read) async {
await lookup.scope((sa) => sa.operate((read) async {
final segmentOffline = await read.getOfflinePositions();
// For each shortarray segment go through their segment positions

View File

@ -1,9 +1,58 @@
part of 'dht_log.dart';
class DHTLogPositionLookup {
const DHTLogPositionLookup({required this.shortArray, required this.pos});
final DHTShortArray shortArray;
class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> {
_DHTLogPosition._({
required _DHTLogSpine dhtLogSpine,
required DHTShortArray shortArray,
required this.pos,
required int segmentNumber,
}) : _segmentShortArray = shortArray,
_dhtLogSpine = dhtLogSpine,
_segmentNumber = segmentNumber;
final int pos;
final _DHTLogSpine _dhtLogSpine;
final DHTShortArray _segmentShortArray;
var _openCount = 1;
final int _segmentNumber;
final Mutex _mutex = Mutex();
/// Check if the DHTLogPosition is open
@override
bool get isOpen => _openCount > 0;
/// The type of the openable scope
@override
FutureOr<DHTShortArray> scoped() => _segmentShortArray;
/// Add a reference to this log
@override
Future<_DHTLogPosition> ref() async => _mutex.protect(() async {
_openCount++;
return this;
});
/// Free all resources for the DHTLogPosition
@override
Future<void> close() async => _mutex.protect(() async {
if (_openCount == 0) {
throw StateError('already closed');
}
_openCount--;
if (_openCount != 0) {
return;
}
await _dhtLogSpine._segmentClosed(_segmentNumber);
});
}
class _OpenedSegment {
_OpenedSegment._({
required this.shortArray,
});
final DHTShortArray shortArray;
int openCount = 1;
}
class _DHTLogSegmentLookup extends Equatable {
@ -32,6 +81,7 @@ class _DHTLogSpine {
_head = head,
_tail = tail,
_segmentStride = stride,
_openedSegments = {},
_spineCache = [];
// Create a new spine record and push it to the network
@ -85,6 +135,8 @@ class _DHTLogSpine {
futures.add(sc.close());
}
await Future.wait(futures);
assert(_openedSegments.isEmpty, 'should have closed all segments by now');
});
}
@ -247,7 +299,7 @@ class _DHTLogSpine {
// Lookup what subkey and segment subrange has this position's segment
// shortarray
final l = lookupSegment(segmentNumber);
final l = _lookupSegment(segmentNumber);
final subkey = l.subkey;
final segment = l.segment;
@ -304,7 +356,7 @@ class _DHTLogSpine {
// Lookup what subkey and segment subrange has this position's segment
// shortarray
final l = lookupSegment(segmentNumber);
final l = _lookupSegment(segmentNumber);
final subkey = l.subkey;
final segment = l.segment;
@ -381,7 +433,7 @@ class _DHTLogSpine {
return segment;
}
_DHTLogSegmentLookup lookupSegment(int segmentNumber) {
_DHTLogSegmentLookup _lookupSegment(int segmentNumber) {
assert(_spineMutex.isLocked, 'should be in mutex here');
if (segmentNumber < 0) {
@ -400,30 +452,60 @@ class _DHTLogSpine {
///////////////////////////////////////////
// API for public interfaces
Future<DHTLogPositionLookup?> lookupPosition(int pos) async {
Future<_DHTLogPosition?> lookupPosition(int pos) async {
assert(_spineMutex.isLocked, 'should be locked');
return _spineCacheMutex.protect(() async {
// Check if our position is in bounds
final endPos = length;
if (pos < 0 || pos >= endPos) {
throw IndexError.withLength(pos, endPos);
}
// Check if our position is in bounds
final endPos = length;
if (pos < 0 || pos >= endPos) {
throw IndexError.withLength(pos, endPos);
}
// Calculate absolute position, ring-buffer style
final absolutePosition = (_head + pos) % _positionLimit;
// Calculate absolute position, ring-buffer style
final absolutePosition = (_head + pos) % _positionLimit;
// Determine the segment number and position within the segment
final segmentNumber = absolutePosition ~/ DHTShortArray.maxElements;
final segmentPos = absolutePosition % DHTShortArray.maxElements;
// Determine the segment number and position within the segment
final segmentNumber = absolutePosition ~/ DHTShortArray.maxElements;
final segmentPos = absolutePosition % DHTShortArray.maxElements;
// Get the segment shortArray
final openedSegment = _openedSegments[segmentNumber];
late final DHTShortArray shortArray;
if (openedSegment != null) {
openedSegment.openCount++;
shortArray = openedSegment.shortArray;
} else {
final newShortArray = (_spineRecord.writer == null)
? await _openSegment(segmentNumber)
: await _openOrCreateSegment(segmentNumber);
if (newShortArray == null) {
return null;
}
// Get the segment shortArray
final shortArray = (_spineRecord.writer == null)
? await _openSegment(segmentNumber)
: await _openOrCreateSegment(segmentNumber);
if (shortArray == null) {
return null;
}
return DHTLogPositionLookup(shortArray: shortArray, pos: segmentPos);
_openedSegments[segmentNumber] =
_OpenedSegment._(shortArray: newShortArray);
shortArray = newShortArray;
}
return _DHTLogPosition._(
dhtLogSpine: this,
shortArray: shortArray,
pos: segmentPos,
segmentNumber: segmentNumber);
});
}
Future<void> _segmentClosed(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be locked');
await _spineCacheMutex.protect(() async {
final os = _openedSegments[segmentNumber]!;
os.openCount--;
if (os.openCount == 0) {
_openedSegments.remove(segmentNumber);
await os.shortArray.close();
}
});
}
void allocateTail(int count) {
@ -479,7 +561,7 @@ class _DHTLogSpine {
segmentNumber++) {
// Lookup what subkey and segment subrange has this position's segment
// shortarray
final l = lookupSegment(segmentNumber);
final l = _lookupSegment(segmentNumber);
final subkey = l.subkey;
final segment = l.segment;
@ -608,6 +690,8 @@ class _DHTLogSpine {
// Spine DHT record
final DHTRecord _spineRecord;
// Segment stride to use for spine elements
final int _segmentStride;
// Position of the start of the log (oldest items)
int _head;
@ -616,8 +700,8 @@ class _DHTLogSpine {
// LRU cache of DHT spine elements accessed recently
// Pair of position and associated shortarray segment
final Mutex _spineCacheMutex = Mutex();
final List<(int, DHTShortArray)> _spineCache;
final Map<int, _OpenedSegment> _openedSegments;
static const int _spineCacheLength = 3;
// Segment stride to use for spine elements
final int _segmentStride;
}

View File

@ -36,7 +36,7 @@ enum DHTRecordRefreshMode {
/////////////////////////////////////////////////
class DHTRecord implements DHTOpenable<DHTRecord> {
class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
DHTRecord._(
{required VeilidRoutingContext routingContext,
required SharedDHTRecordData sharedDHTRecordData,
@ -52,12 +52,16 @@ class DHTRecord implements DHTOpenable<DHTRecord> {
_sharedDHTRecordData = sharedDHTRecordData;
////////////////////////////////////////////////////////////////////////////
// DHTOpenable
// DHTCloseable
/// Check if the DHTRecord is open
@override
bool get isOpen => _openCount > 0;
/// The type of the openable scope
@override
FutureOr<DHTRecord> scoped() => this;
/// Add a reference to this DHTRecord
@override
Future<DHTRecord> ref() async => _mutex.protect(() async {

View File

@ -13,7 +13,7 @@ part 'dht_short_array_write.dart';
///////////////////////////////////////////////////////////////////////
class DHTShortArray implements DHTOpenable<DHTShortArray> {
class DHTShortArray implements DHTDeleteable<DHTShortArray, DHTShortArray> {
////////////////////////////////////////////////////////////////
// Constructors
@ -136,12 +136,16 @@ class DHTShortArray implements DHTOpenable<DHTShortArray> {
);
////////////////////////////////////////////////////////////////////////////
// DHTOpenable
// DHTCloseable
/// Check if the shortarray is open
@override
bool get isOpen => _openCount > 0;
/// The type of the openable scope
@override
FutureOr<DHTShortArray> scoped() => this;
/// Add a reference to this shortarray
@override
Future<DHTShortArray> ref() async => _mutex.protect(() async {

View File

@ -1,27 +1,36 @@
import 'dart:async';
abstract class DHTOpenable<C> {
import 'package:meta/meta.dart';
abstract class DHTCloseable<C, D> {
bool get isOpen;
@protected
FutureOr<D> scoped();
Future<C> ref();
Future<void> close();
}
abstract class DHTDeleteable<C, D> extends DHTCloseable<C, D> {
Future<void> delete();
}
extension DHTOpenableExt<D extends DHTOpenable<D>> on D {
/// Runs a closure that guarantees the DHTOpenable
extension DHTCloseableExt<C, D> on DHTCloseable<C, D> {
/// Runs a closure that guarantees the DHTCloseable
/// will be closed upon exit, even if an uncaught exception is thrown
Future<T> scope<T>(Future<T> Function(D) scopeFunction) async {
if (!isOpen) {
throw StateError('not open in scope');
}
try {
return await scopeFunction(this);
return await scopeFunction(await scoped());
} finally {
await close();
}
}
}
/// Runs a closure that guarantees the DHTOpenable
extension DHTDeletableExt<C, D> on DHTDeleteable<C, D> {
/// Runs a closure that guarantees the DHTCloseable
/// will be closed upon exit, and deleted if an an
/// uncaught exception is thrown
Future<T> deleteScope<T>(Future<T> Function(D) scopeFunction) async {
@ -30,7 +39,7 @@ extension DHTOpenableExt<D extends DHTOpenable<D>> on D {
}
try {
return await scopeFunction(this);
return await scopeFunction(await scoped());
} on Exception {
await delete();
rethrow;
@ -39,7 +48,7 @@ extension DHTOpenableExt<D extends DHTOpenable<D>> on D {
}
}
/// Scopes a closure that conditionally deletes the DHTOpenable on exit
/// Scopes a closure that conditionally deletes the DHTCloseable on exit
Future<T> maybeDeleteScope<T>(
bool delete, Future<T> Function(D) scopeFunction) async {
if (delete) {

View File

@ -1,4 +1,4 @@
export 'dht_openable.dart';
export 'dht_closeable.dart';
export 'dht_random_read.dart';
export 'dht_random_write.dart';
export 'exceptions.dart';