refactor, use external libraries, and add integration test for veilid_support

This commit is contained in:
Christien Rioux 2024-05-01 20:58:25 -04:00
parent e622b7f949
commit 25a6a00fcf
84 changed files with 626 additions and 3835 deletions

View File

@ -400,7 +400,7 @@ class AccountRepository {
// Record not yet open, do it
final pool = DHTRecordPool.instance;
final record = await pool.openOwned(
final record = await pool.openRecordOwned(
userLogin.accountRecordInfo.accountRecord,
debugName: 'AccountRepository::openAccountRecord::AccountRecord',
parent: localAccount.identityMaster.identityRecordKey);

View File

@ -1,8 +1,7 @@
import 'package:bloc_tools/bloc_tools.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:veilid_support/veilid_support.dart';
class ActiveChatCubit extends Cubit<TypedKey?> with BlocTools {
class ActiveChatCubit extends Cubit<TypedKey?> {
ActiveChatCubit(super.initialState);
void setActiveChat(TypedKey? activeChatRemoteConversationRecordKey) {

View File

@ -1,5 +1,5 @@
import 'package:async_tools/async_tools.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'package:equatable/equatable.dart';
import 'package:meta/meta.dart';
import 'package:veilid_support/veilid_support.dart';

View File

@ -1,7 +1,7 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';

View File

@ -1,6 +1,6 @@
import 'dart:async';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:veilid_support/veilid_support.dart';

View File

@ -1,6 +1,6 @@
import 'dart:async';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:fixnum/fixnum.dart';
import 'package:flutter/foundation.dart';
@ -85,7 +85,7 @@ class ContactInvitationListCubit
// to and it will be eventually encrypted with the DH of the contact's
// identity key
late final Uint8List signedContactInvitationBytes;
await (await pool.create(
await (await pool.createRecord(
debugName: 'ContactInvitationListCubit::createInvitation::'
'LocalConversation',
parent: _activeAccountInfo.accountRecordKey,
@ -114,7 +114,7 @@ class ContactInvitationListCubit
// Create DHT unicast inbox for ContactRequest
// Subkey 0 is the ContactRequest from the initiator
// Subkey 1 will contain the invitation response accept/reject eventually
await (await pool.create(
await (await pool.createRecord(
debugName: 'ContactInvitationListCubit::createInvitation::'
'ContactRequestInbox',
parent: _activeAccountInfo.accountRecordKey,
@ -198,7 +198,7 @@ class ContactInvitationListCubit
if (success && deletedItem != null) {
// Delete the contact request inbox
final contactRequestInbox = deletedItem.contactRequestInbox.toVeilid();
await (await pool.openOwned(contactRequestInbox,
await (await pool.openRecordOwned(contactRequestInbox,
debugName: 'ContactInvitationListCubit::deleteInvitation::'
'ContactRequestInbox',
parent: accountRecordKey))
@ -250,7 +250,7 @@ class ContactInvitationListCubit
contactRequestInboxKey) !=
-1;
await (await pool.openRead(contactRequestInboxKey,
await (await pool.openRecordRead(contactRequestInboxKey,
debugName: 'ContactInvitationListCubit::validateInvitation::'
'ContactRequestInbox',
parent: _activeAccountInfo.accountRecordKey))

View File

@ -34,7 +34,7 @@ class ContactRequestInboxCubit
contactInvitationRecord.contactRequestInbox.recordKey.toVeilid();
final writer = TypedKeyPair(
kind: recordKey.kind, key: writerKey, secret: writerSecret);
return pool.openRead(recordKey,
return pool.openRecordRead(recordKey,
debugName: 'ContactRequestInboxCubit::_open::'
'ContactRequestInbox',
crypto: await DHTRecordCryptoPrivate.fromTypedKeyPair(writer),

View File

@ -1,6 +1,6 @@
import 'dart:typed_data';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
class InvitationGeneratorCubit extends FutureCubit<Uint8List> {
InvitationGeneratorCubit(super.fut);

View File

@ -1,7 +1,7 @@
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'package:equatable/equatable.dart';
import 'package:meta/meta.dart';
import 'package:veilid_support/veilid_support.dart';

View File

@ -1,5 +1,5 @@
import 'package:async_tools/async_tools.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';

View File

@ -37,7 +37,7 @@ class ValidContactInvitation {
_activeAccountInfo.localAccount.identityMaster.identityPublicKey;
final accountRecordKey = _activeAccountInfo.accountRecordKey;
return (await pool.openWrite(_contactRequestInboxKey, _writer,
return (await pool.openRecordWrite(_contactRequestInboxKey, _writer,
debugName: 'ValidContactInvitation::accept::'
'ContactRequestInbox',
parent: accountRecordKey))
@ -100,7 +100,7 @@ class ValidContactInvitation {
final accountRecordKey =
_activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey;
return (await pool.openWrite(_contactRequestInboxKey, _writer,
return (await pool.openRecordWrite(_contactRequestInboxKey, _writer,
debugName: 'ValidContactInvitation::reject::'
'ContactRequestInbox',
parent: accountRecordKey))

View File

@ -48,7 +48,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
final pool = DHTRecordPool.instance;
final crypto = await _cachedConversationCrypto();
final writer = _activeAccountInfo.conversationWriter;
final record = await pool.openWrite(
final record = await pool.openRecordWrite(
_localConversationRecordKey!, writer,
debugName: 'ConversationCubit::LocalConversation',
parent: accountRecordKey,
@ -67,7 +67,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
// Open remote record key if it is specified
final pool = DHTRecordPool.instance;
final crypto = await _cachedConversationCrypto();
final record = await pool.openRead(_remoteConversationRecordKey,
final record = await pool.openRecordRead(_remoteConversationRecordKey,
debugName: 'ConversationCubit::RemoteConversation',
parent: accountRecordKey,
crypto: crypto);
@ -226,14 +226,14 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
// Open with SMPL scheme for identity writer
late final DHTRecord localConversationRecord;
if (existingConversationRecordKey != null) {
localConversationRecord = await pool.openWrite(
localConversationRecord = await pool.openRecordWrite(
existingConversationRecordKey, writer,
debugName:
'ConversationCubit::initLocalConversation::LocalConversation',
parent: accountRecordKey,
crypto: crypto);
} else {
localConversationRecord = await pool.create(
localConversationRecord = await pool.createRecord(
debugName:
'ConversationCubit::initLocalConversation::LocalConversation',
parent: accountRecordKey,

View File

@ -1,8 +1,8 @@
import 'package:async_tools/async_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:provider/provider.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../../account_manager/account_manager.dart';

View File

@ -1,4 +1,4 @@
import 'package:bloc_tools/bloc_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'settings.dart';

View File

@ -1,6 +1,6 @@
import 'package:async_tools/async_tools.dart';
import 'package:awesome_extensions/awesome_extensions.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'package:blurry_modal_progress_hud/blurry_modal_progress_hud.dart';
import 'package:flutter/material.dart';
import 'package:flutter_bloc/flutter_bloc.dart';

View File

@ -1,5 +1,6 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:flutter/material.dart';
import 'package:veilid_support/veilid_support.dart';
@ -16,15 +17,12 @@ class BackgroundTicker extends StatefulWidget {
class BackgroundTickerState extends State<BackgroundTicker> {
Timer? _tickTimer;
bool _inTick = false;
@override
void initState() {
super.initState();
_tickTimer = Timer.periodic(const Duration(seconds: 1), (timer) {
if (!_inTick) {
unawaited(_onTick());
}
singleFuture(this, _onTick);
});
}
@ -50,12 +48,7 @@ class BackgroundTickerState extends State<BackgroundTicker> {
return;
}
_inTick = true;
try {
// Tick DHT record pool
unawaited(DHTRecordPool.instance.tick());
} finally {
_inTick = false;
}
// Tick DHT record pool
await DHTRecordPool.instance.tick();
}
}

View File

@ -1,4 +1,4 @@
import 'package:bloc_tools/bloc_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import '../models/models.dart';
import '../repository/processor_repository.dart';

View File

@ -1,7 +0,0 @@
# https://dart.dev/guides/libraries/private-files
# Created by `dart pub`
.dart_tool/
# Avoid committing pubspec.lock for library packages; see
# https://dart.dev/guides/libraries/private-files#pubspeclock.
pubspec.lock

View File

@ -1,15 +0,0 @@
include: package:lint_hard/all.yaml
analyzer:
errors:
invalid_annotation_target: ignore
exclude:
- '**/*.g.dart'
- '**/*.freezed.dart'
- '**/*.pb.dart'
- '**/*.pbenum.dart'
- '**/*.pbjson.dart'
- '**/*.pbserver.dart'
linter:
rules:
unawaited_futures: true
avoid_positional_boolean_parameters: false

View File

@ -1,2 +0,0 @@
@echo off
dart run build_runner build --delete-conflicting-outputs

View File

@ -1,3 +0,0 @@
#!/bin/bash
set -e
dart run build_runner build --delete-conflicting-outputs

View File

@ -1,6 +0,0 @@
// import 'package:async_tools/async_tools.dart';
// void main() {
// var awesome = Awesome();
// print('awesome: ${awesome.isAwesome}');
// }

View File

@ -1,11 +0,0 @@
/// Async Tools
library;
export 'src/async_tag_lock.dart';
export 'src/async_value.dart';
export 'src/delayed_wait_set.dart';
export 'src/serial_future.dart';
export 'src/single_future.dart';
export 'src/single_state_processor.dart';
export 'src/single_stateless_processor.dart';
export 'src/wait_set.dart';

View File

@ -1,64 +0,0 @@
import 'package:mutex/mutex.dart';
class _AsyncTagLockEntry {
_AsyncTagLockEntry()
: mutex = Mutex.locked(),
waitingCount = 0;
//
Mutex mutex;
int waitingCount;
}
class AsyncTagLock<T> {
AsyncTagLock()
: _tableLock = Mutex(),
_locks = {};
Future<void> lockTag(T tag) async {
await _tableLock.protect(() async {
final lockEntry = _locks[tag];
if (lockEntry != null) {
lockEntry.waitingCount++;
await lockEntry.mutex.acquire();
lockEntry.waitingCount--;
} else {
_locks[tag] = _AsyncTagLockEntry();
}
});
}
bool isLocked(T tag) => _locks.containsKey(tag);
bool tryLock(T tag) {
final lockEntry = _locks[tag];
if (lockEntry != null) {
return false;
}
_locks[tag] = _AsyncTagLockEntry();
return true;
}
void unlockTag(T tag) {
final lockEntry = _locks[tag]!;
if (lockEntry.waitingCount == 0) {
// If nobody is waiting for the mutex we can just drop it
_locks.remove(tag);
} else {
// Someone's waiting for the tag lock so release the mutex for it
lockEntry.mutex.release();
}
}
Future<R> protect<R>(T tag, {required Future<R> Function() closure}) async {
await lockTag(tag);
try {
return await closure();
} finally {
unlockTag(tag);
}
}
//
final Mutex _tableLock;
final Map<T, _AsyncTagLockEntry> _locks;
}

View File

@ -1,209 +0,0 @@
// ignore_for_file: avoid_catches_without_on_clauses
import 'package:freezed_annotation/freezed_annotation.dart';
part 'async_value.freezed.dart';
/// An utility for safely manipulating asynchronous data.
///
/// By using [AsyncValue], you are guaranteed that you cannot forget to
/// handle the loading/error state of an asynchronous operation.
///
/// It also expose some utilities to nicely convert an [AsyncValue] to
/// a different object.
/// For example, a Flutter Widget may use [when] to convert an [AsyncValue]
/// into either a progress indicator, an error screen, or to show the data:
///
/// ```dart
/// /// A provider that asynchronously expose the current user
/// final userProvider = StreamProvider<User>((_) async* {
/// // fetch the user
/// });
///
/// class Example extends ConsumerWidget {
/// @override
/// Widget build(BuildContext context, ScopedReader watch) {
/// final AsyncValue<User> user = watch(userProvider);
///
/// return user.when(
/// loading: () => CircularProgressIndicator(),
/// error: (error, stack) => Text('Oops, something unexpected happened'),
/// data: (value) => Text('Hello ${user.name}'),
/// );
/// }
/// }
/// ```
///
/// If a consumer of an [AsyncValue] does not care about the loading/error
/// state, consider using [asData] to read the state:
///
/// ```dart
/// Widget build(BuildContext context, ScopedReader watch) {
/// // reads the data state directly will be null during loading/error states
/// final User user = watch(userProvider).data?.value;
///
/// return Text('Hello ${user?.name}');
/// }
/// ```
///
/// See also:
///
/// - [AsyncValue.guard], to simplify transforming a [Future] into an
/// [AsyncValue].
/// - The package Freezed (https://github.com/rrousselgit/freezed), which have
/// generated this [AsyncValue] class and explains how [map]/[when] works.
@freezed
@sealed
abstract class AsyncValue<T> with _$AsyncValue<T> {
const AsyncValue._();
/// Creates an [AsyncValue] with a data.
///
/// The data can be `null`.
const factory AsyncValue.data(T value) = AsyncData<T>;
/// Creates an [AsyncValue] in loading state.
///
/// Prefer always using this constructor with the `const` keyword.
const factory AsyncValue.loading() = AsyncLoading<T>;
/// Creates an [AsyncValue] in error state.
///
/// The parameter [error] cannot be `null`.
factory AsyncValue.error(Object error, [StackTrace? stackTrace]) =
AsyncError<T>;
/// Transforms a [Future] that may fail into something that is safe to read.
///
/// This is useful to avoid having to do a tedious `try/catch`. Instead of:
///
/// ```dart
/// class MyNotifier extends StateNotifier<AsyncValue<MyData> {
/// MyNotifier(): super(const AsyncValue.loading()) {
/// _fetchData();
/// }
///
/// Future<void> _fetchData() async {
/// state = const AsyncValue.loading();
/// try {
/// final response = await dio.get('my_api/data');
/// final data = MyData.fromJson(response);
/// state = AsyncValue.data(data);
/// } catch (err, stack) {
/// state = AsyncValue.error(err, stack);
/// }
/// }
/// }
/// ```
///
/// which is redundant as the application grows and we need more and more of
/// this pattern we can use [guard] to simplify it:
///
///
/// ```dart
/// class MyNotifier extends StateNotifier<AsyncValue<MyData>> {
/// MyNotifier(): super(const AsyncValue.loading()) {
/// _fetchData();
/// }
///
/// Future<void> _fetchData() async {
/// state = const AsyncValue.loading();
/// // does the try/catch for us like previously
/// state = await AsyncValue.guard(() async {
/// final response = await dio.get('my_api/data');
/// return Data.fromJson(response);
/// });
/// }
/// }
/// ```
static Future<AsyncValue<T>> guard<T>(Future<T> Function() future) async {
try {
return AsyncValue.data(await future());
} catch (err, stack) {
return AsyncValue.error(err, stack);
}
}
/// The current data, or null if in loading/error.
///
/// This is safe to use, as Dart (will) have non-nullable types.
/// As such reading [asData] still forces to handle the loading/error cases
/// by having to check `data != null`.
///
/// ## Why does [AsyncValue<T>.data] return [AsyncData<T>] instead of [T]?
///
/// The motivation behind this decision is to allow differentiating between:
///
/// - There is a data, and it is `null`.
/// ```dart
/// // There is a data, and it is "null"
/// AsyncValue<Configuration> configs = AsyncValue.data(null);
///
/// print(configs.data); // AsyncValue(value: null)
/// print(configs.data.value); // null
/// ```
///
/// - There is no data. [AsyncValue] is currently in loading/error state.
/// ```dart
/// // No data, currently loading
/// AsyncValue<Configuration> configs = AsyncValue.loading();
///
/// print(configs.data); // null, currently loading
/// print(configs.data.value); // throws null exception
/// ```
AsyncData<T>? get asData => map(
data: (data) => data,
loading: (_) => null,
error: (_) => null,
);
bool get isData => asData != null;
/// Check if this is loading
AsyncLoading<T>? get asLoading => map(
data: (_) => null,
loading: (loading) => loading,
error: (_) => null,
);
bool get isLoading => asLoading != null;
/// Check if this is an error
AsyncError<T>? get asError => map(
data: (_) => null,
loading: (_) => null,
error: (e) => e,
);
bool get isError => asError != null;
/// Shorthand for [when] to handle only the `data` case.
AsyncValue<R> whenData<R>(R Function(T value) cb) => when(
data: (value) {
try {
return AsyncValue.data(cb(value));
} catch (err, stack) {
return AsyncValue.error(err, stack);
}
},
loading: () => const AsyncValue.loading(),
error: AsyncValue.error,
);
/// Check two AsyncData instances for equality
bool equalsData(AsyncValue<T> other,
{required bool Function(T a, T b) equals}) =>
other.when(
data: (nd) => when(
data: (d) => equals(d, nd),
loading: () => true,
error: (_e, _st) => true),
loading: () => when(
data: (_) => true,
loading: () => false,
error: (_e, _st) => true),
error: (ne, nst) => when(
data: (_) => true,
loading: () => true,
error: (e, st) => e != ne || st != nst));
}

View File

@ -1,480 +0,0 @@
// coverage:ignore-file
// GENERATED CODE - DO NOT MODIFY BY HAND
// ignore_for_file: type=lint
// ignore_for_file: unused_element, deprecated_member_use, deprecated_member_use_from_same_package, use_function_type_syntax_for_parameters, unnecessary_const, avoid_init_to_null, invalid_override_different_default_values_named, prefer_expression_function_bodies, annotate_overrides, invalid_annotation_target, unnecessary_question_mark
part of 'async_value.dart';
// **************************************************************************
// FreezedGenerator
// **************************************************************************
T _$identity<T>(T value) => value;
final _privateConstructorUsedError = UnsupportedError(
'It seems like you constructed your class using `MyClass._()`. This constructor is only meant to be used by freezed and you are not supposed to need it nor use it.\nPlease check the documentation here for more information: https://github.com/rrousselGit/freezed#adding-getters-and-methods-to-our-models');
/// @nodoc
mixin _$AsyncValue<T> {
@optionalTypeArgs
TResult when<TResult extends Object?>({
required TResult Function(T value) data,
required TResult Function() loading,
required TResult Function(Object error, StackTrace? stackTrace) error,
}) =>
throw _privateConstructorUsedError;
@optionalTypeArgs
TResult? whenOrNull<TResult extends Object?>({
TResult? Function(T value)? data,
TResult? Function()? loading,
TResult? Function(Object error, StackTrace? stackTrace)? error,
}) =>
throw _privateConstructorUsedError;
@optionalTypeArgs
TResult maybeWhen<TResult extends Object?>({
TResult Function(T value)? data,
TResult Function()? loading,
TResult Function(Object error, StackTrace? stackTrace)? error,
required TResult orElse(),
}) =>
throw _privateConstructorUsedError;
@optionalTypeArgs
TResult map<TResult extends Object?>({
required TResult Function(AsyncData<T> value) data,
required TResult Function(AsyncLoading<T> value) loading,
required TResult Function(AsyncError<T> value) error,
}) =>
throw _privateConstructorUsedError;
@optionalTypeArgs
TResult? mapOrNull<TResult extends Object?>({
TResult? Function(AsyncData<T> value)? data,
TResult? Function(AsyncLoading<T> value)? loading,
TResult? Function(AsyncError<T> value)? error,
}) =>
throw _privateConstructorUsedError;
@optionalTypeArgs
TResult maybeMap<TResult extends Object?>({
TResult Function(AsyncData<T> value)? data,
TResult Function(AsyncLoading<T> value)? loading,
TResult Function(AsyncError<T> value)? error,
required TResult orElse(),
}) =>
throw _privateConstructorUsedError;
}
/// @nodoc
abstract class $AsyncValueCopyWith<T, $Res> {
factory $AsyncValueCopyWith(
AsyncValue<T> value, $Res Function(AsyncValue<T>) then) =
_$AsyncValueCopyWithImpl<T, $Res, AsyncValue<T>>;
}
/// @nodoc
class _$AsyncValueCopyWithImpl<T, $Res, $Val extends AsyncValue<T>>
implements $AsyncValueCopyWith<T, $Res> {
_$AsyncValueCopyWithImpl(this._value, this._then);
// ignore: unused_field
final $Val _value;
// ignore: unused_field
final $Res Function($Val) _then;
}
/// @nodoc
abstract class _$$AsyncDataImplCopyWith<T, $Res> {
factory _$$AsyncDataImplCopyWith(
_$AsyncDataImpl<T> value, $Res Function(_$AsyncDataImpl<T>) then) =
__$$AsyncDataImplCopyWithImpl<T, $Res>;
@useResult
$Res call({T value});
}
/// @nodoc
class __$$AsyncDataImplCopyWithImpl<T, $Res>
extends _$AsyncValueCopyWithImpl<T, $Res, _$AsyncDataImpl<T>>
implements _$$AsyncDataImplCopyWith<T, $Res> {
__$$AsyncDataImplCopyWithImpl(
_$AsyncDataImpl<T> _value, $Res Function(_$AsyncDataImpl<T>) _then)
: super(_value, _then);
@pragma('vm:prefer-inline')
@override
$Res call({
Object? value = freezed,
}) {
return _then(_$AsyncDataImpl<T>(
freezed == value
? _value.value
: value // ignore: cast_nullable_to_non_nullable
as T,
));
}
}
/// @nodoc
class _$AsyncDataImpl<T> extends AsyncData<T> {
const _$AsyncDataImpl(this.value) : super._();
@override
final T value;
@override
String toString() {
return 'AsyncValue<$T>.data(value: $value)';
}
@override
bool operator ==(Object other) {
return identical(this, other) ||
(other.runtimeType == runtimeType &&
other is _$AsyncDataImpl<T> &&
const DeepCollectionEquality().equals(other.value, value));
}
@override
int get hashCode =>
Object.hash(runtimeType, const DeepCollectionEquality().hash(value));
@JsonKey(ignore: true)
@override
@pragma('vm:prefer-inline')
_$$AsyncDataImplCopyWith<T, _$AsyncDataImpl<T>> get copyWith =>
__$$AsyncDataImplCopyWithImpl<T, _$AsyncDataImpl<T>>(this, _$identity);
@override
@optionalTypeArgs
TResult when<TResult extends Object?>({
required TResult Function(T value) data,
required TResult Function() loading,
required TResult Function(Object error, StackTrace? stackTrace) error,
}) {
return data(value);
}
@override
@optionalTypeArgs
TResult? whenOrNull<TResult extends Object?>({
TResult? Function(T value)? data,
TResult? Function()? loading,
TResult? Function(Object error, StackTrace? stackTrace)? error,
}) {
return data?.call(value);
}
@override
@optionalTypeArgs
TResult maybeWhen<TResult extends Object?>({
TResult Function(T value)? data,
TResult Function()? loading,
TResult Function(Object error, StackTrace? stackTrace)? error,
required TResult orElse(),
}) {
if (data != null) {
return data(value);
}
return orElse();
}
@override
@optionalTypeArgs
TResult map<TResult extends Object?>({
required TResult Function(AsyncData<T> value) data,
required TResult Function(AsyncLoading<T> value) loading,
required TResult Function(AsyncError<T> value) error,
}) {
return data(this);
}
@override
@optionalTypeArgs
TResult? mapOrNull<TResult extends Object?>({
TResult? Function(AsyncData<T> value)? data,
TResult? Function(AsyncLoading<T> value)? loading,
TResult? Function(AsyncError<T> value)? error,
}) {
return data?.call(this);
}
@override
@optionalTypeArgs
TResult maybeMap<TResult extends Object?>({
TResult Function(AsyncData<T> value)? data,
TResult Function(AsyncLoading<T> value)? loading,
TResult Function(AsyncError<T> value)? error,
required TResult orElse(),
}) {
if (data != null) {
return data(this);
}
return orElse();
}
}
abstract class AsyncData<T> extends AsyncValue<T> {
const factory AsyncData(final T value) = _$AsyncDataImpl<T>;
const AsyncData._() : super._();
T get value;
@JsonKey(ignore: true)
_$$AsyncDataImplCopyWith<T, _$AsyncDataImpl<T>> get copyWith =>
throw _privateConstructorUsedError;
}
/// @nodoc
abstract class _$$AsyncLoadingImplCopyWith<T, $Res> {
factory _$$AsyncLoadingImplCopyWith(_$AsyncLoadingImpl<T> value,
$Res Function(_$AsyncLoadingImpl<T>) then) =
__$$AsyncLoadingImplCopyWithImpl<T, $Res>;
}
/// @nodoc
class __$$AsyncLoadingImplCopyWithImpl<T, $Res>
extends _$AsyncValueCopyWithImpl<T, $Res, _$AsyncLoadingImpl<T>>
implements _$$AsyncLoadingImplCopyWith<T, $Res> {
__$$AsyncLoadingImplCopyWithImpl(
_$AsyncLoadingImpl<T> _value, $Res Function(_$AsyncLoadingImpl<T>) _then)
: super(_value, _then);
}
/// @nodoc
class _$AsyncLoadingImpl<T> extends AsyncLoading<T> {
const _$AsyncLoadingImpl() : super._();
@override
String toString() {
return 'AsyncValue<$T>.loading()';
}
@override
bool operator ==(Object other) {
return identical(this, other) ||
(other.runtimeType == runtimeType && other is _$AsyncLoadingImpl<T>);
}
@override
int get hashCode => runtimeType.hashCode;
@override
@optionalTypeArgs
TResult when<TResult extends Object?>({
required TResult Function(T value) data,
required TResult Function() loading,
required TResult Function(Object error, StackTrace? stackTrace) error,
}) {
return loading();
}
@override
@optionalTypeArgs
TResult? whenOrNull<TResult extends Object?>({
TResult? Function(T value)? data,
TResult? Function()? loading,
TResult? Function(Object error, StackTrace? stackTrace)? error,
}) {
return loading?.call();
}
@override
@optionalTypeArgs
TResult maybeWhen<TResult extends Object?>({
TResult Function(T value)? data,
TResult Function()? loading,
TResult Function(Object error, StackTrace? stackTrace)? error,
required TResult orElse(),
}) {
if (loading != null) {
return loading();
}
return orElse();
}
@override
@optionalTypeArgs
TResult map<TResult extends Object?>({
required TResult Function(AsyncData<T> value) data,
required TResult Function(AsyncLoading<T> value) loading,
required TResult Function(AsyncError<T> value) error,
}) {
return loading(this);
}
@override
@optionalTypeArgs
TResult? mapOrNull<TResult extends Object?>({
TResult? Function(AsyncData<T> value)? data,
TResult? Function(AsyncLoading<T> value)? loading,
TResult? Function(AsyncError<T> value)? error,
}) {
return loading?.call(this);
}
@override
@optionalTypeArgs
TResult maybeMap<TResult extends Object?>({
TResult Function(AsyncData<T> value)? data,
TResult Function(AsyncLoading<T> value)? loading,
TResult Function(AsyncError<T> value)? error,
required TResult orElse(),
}) {
if (loading != null) {
return loading(this);
}
return orElse();
}
}
abstract class AsyncLoading<T> extends AsyncValue<T> {
const factory AsyncLoading() = _$AsyncLoadingImpl<T>;
const AsyncLoading._() : super._();
}
/// @nodoc
abstract class _$$AsyncErrorImplCopyWith<T, $Res> {
factory _$$AsyncErrorImplCopyWith(
_$AsyncErrorImpl<T> value, $Res Function(_$AsyncErrorImpl<T>) then) =
__$$AsyncErrorImplCopyWithImpl<T, $Res>;
@useResult
$Res call({Object error, StackTrace? stackTrace});
}
/// @nodoc
class __$$AsyncErrorImplCopyWithImpl<T, $Res>
extends _$AsyncValueCopyWithImpl<T, $Res, _$AsyncErrorImpl<T>>
implements _$$AsyncErrorImplCopyWith<T, $Res> {
__$$AsyncErrorImplCopyWithImpl(
_$AsyncErrorImpl<T> _value, $Res Function(_$AsyncErrorImpl<T>) _then)
: super(_value, _then);
@pragma('vm:prefer-inline')
@override
$Res call({
Object? error = null,
Object? stackTrace = freezed,
}) {
return _then(_$AsyncErrorImpl<T>(
null == error ? _value.error : error,
freezed == stackTrace
? _value.stackTrace
: stackTrace // ignore: cast_nullable_to_non_nullable
as StackTrace?,
));
}
}
/// @nodoc
class _$AsyncErrorImpl<T> extends AsyncError<T> {
_$AsyncErrorImpl(this.error, [this.stackTrace]) : super._();
@override
final Object error;
@override
final StackTrace? stackTrace;
@override
String toString() {
return 'AsyncValue<$T>.error(error: $error, stackTrace: $stackTrace)';
}
@override
bool operator ==(Object other) {
return identical(this, other) ||
(other.runtimeType == runtimeType &&
other is _$AsyncErrorImpl<T> &&
const DeepCollectionEquality().equals(other.error, error) &&
(identical(other.stackTrace, stackTrace) ||
other.stackTrace == stackTrace));
}
@override
int get hashCode => Object.hash(
runtimeType, const DeepCollectionEquality().hash(error), stackTrace);
@JsonKey(ignore: true)
@override
@pragma('vm:prefer-inline')
_$$AsyncErrorImplCopyWith<T, _$AsyncErrorImpl<T>> get copyWith =>
__$$AsyncErrorImplCopyWithImpl<T, _$AsyncErrorImpl<T>>(this, _$identity);
@override
@optionalTypeArgs
TResult when<TResult extends Object?>({
required TResult Function(T value) data,
required TResult Function() loading,
required TResult Function(Object error, StackTrace? stackTrace) error,
}) {
return error(this.error, stackTrace);
}
@override
@optionalTypeArgs
TResult? whenOrNull<TResult extends Object?>({
TResult? Function(T value)? data,
TResult? Function()? loading,
TResult? Function(Object error, StackTrace? stackTrace)? error,
}) {
return error?.call(this.error, stackTrace);
}
@override
@optionalTypeArgs
TResult maybeWhen<TResult extends Object?>({
TResult Function(T value)? data,
TResult Function()? loading,
TResult Function(Object error, StackTrace? stackTrace)? error,
required TResult orElse(),
}) {
if (error != null) {
return error(this.error, stackTrace);
}
return orElse();
}
@override
@optionalTypeArgs
TResult map<TResult extends Object?>({
required TResult Function(AsyncData<T> value) data,
required TResult Function(AsyncLoading<T> value) loading,
required TResult Function(AsyncError<T> value) error,
}) {
return error(this);
}
@override
@optionalTypeArgs
TResult? mapOrNull<TResult extends Object?>({
TResult? Function(AsyncData<T> value)? data,
TResult? Function(AsyncLoading<T> value)? loading,
TResult? Function(AsyncError<T> value)? error,
}) {
return error?.call(this);
}
@override
@optionalTypeArgs
TResult maybeMap<TResult extends Object?>({
TResult Function(AsyncData<T> value)? data,
TResult Function(AsyncLoading<T> value)? loading,
TResult Function(AsyncError<T> value)? error,
required TResult orElse(),
}) {
if (error != null) {
return error(this);
}
return orElse();
}
}
abstract class AsyncError<T> extends AsyncValue<T> {
factory AsyncError(final Object error, [final StackTrace? stackTrace]) =
_$AsyncErrorImpl<T>;
AsyncError._() : super._();
Object get error;
StackTrace? get stackTrace;
@JsonKey(ignore: true)
_$$AsyncErrorImplCopyWith<T, _$AsyncErrorImpl<T>> get copyWith =>
throw _privateConstructorUsedError;
}

View File

@ -1,18 +0,0 @@
class DelayedWaitSet {
DelayedWaitSet();
void add(Future<void> Function() closure) {
_closures.add(closure);
}
Future<void> call() async {
final futures = _closures.map((c) => c()).toList();
_closures = [];
if (futures.isEmpty) {
return;
}
await futures.wait;
}
List<Future<void> Function()> _closures = [];
}

View File

@ -1,57 +0,0 @@
// Process a single future at a time per tag queued serially
//
// The closure function is called to produce the future that is to be executed.
// If a future with a particular tag is still executing, it is queued serially
// and executed when the previous tagged future completes.
// When a tagged serialFuture finishes executing, the onDone callback is called.
// If an unhandled exception happens in the closure future, the onError callback
// is called.
import 'dart:async';
import 'dart:collection';
import 'async_tag_lock.dart';
AsyncTagLock<Object> _keys = AsyncTagLock();
typedef SerialFutureQueueItem = Future<void> Function();
Map<Object, Queue<SerialFutureQueueItem>> _queues = {};
SerialFutureQueueItem _makeSerialFutureQueueItem<T>(
Future<T> Function() closure,
void Function(T)? onDone,
void Function(Object e, StackTrace? st)? onError) =>
() async {
try {
final out = await closure();
if (onDone != null) {
onDone(out);
}
// ignore: avoid_catches_without_on_clauses
} catch (e, sp) {
if (onError != null) {
onError(e, sp);
} else {
rethrow;
}
}
};
void serialFuture<T>(Object tag, Future<T> Function() closure,
{void Function(T)? onDone,
void Function(Object e, StackTrace? st)? onError}) {
final queueItem = _makeSerialFutureQueueItem(closure, onDone, onError);
if (!_keys.tryLock(tag)) {
final queue = _queues[tag];
queue!.add(queueItem);
return;
}
final queue = _queues[tag] = Queue.from([queueItem]);
unawaited(() async {
do {
final queueItem = queue.removeFirst();
await queueItem();
} while (queue.isNotEmpty);
_queues.remove(tag);
_keys.unlockTag(tag);
}());
}

View File

@ -1,45 +0,0 @@
import 'dart:async';
import 'async_tag_lock.dart';
AsyncTagLock<Object> _keys = AsyncTagLock();
/// Process a single future at a time per tag
///
/// The closure function is called to produce the future that is to be executed.
/// If a future with a particular tag is still executing, the onBusy callback
/// is called.
/// When a tagged singleFuture finishes executing, the onDone callback is called.
/// If an unhandled exception happens in the closure future, the onError callback
/// is called.
void singleFuture<T>(Object tag, Future<T> Function() closure,
{void Function()? onBusy,
void Function(T)? onDone,
void Function(Object e, StackTrace? st)? onError}) {
if (!_keys.tryLock(tag)) {
if (onBusy != null) {
onBusy();
}
return;
}
unawaited(() async {
try {
final out = await closure();
if (onDone != null) {
onDone(out);
}
// ignore: avoid_catches_without_on_clauses
} catch (e, sp) {
if (onError != null) {
onError(e, sp);
} else {
rethrow;
}
} finally {
_keys.unlockTag(tag);
}
}());
}
Future<void> singleFuturePause(Object tag) async => _keys.lockTag(tag);
void singleFutureResume(Object tag) => _keys.unlockTag(tag);

View File

@ -1,67 +0,0 @@
import 'dart:async';
import '../async_tools.dart';
// Process a single state update at a time ensuring the most
// recent state gets processed asynchronously, possibly skipping
// states that happen while a previous state is still being processed.
//
// Eventually this will always process the most recent state passed to
// updateState.
//
// This is useful for processing state changes asynchronously without waiting
// from a synchronous execution context
class SingleStateProcessor<State> {
SingleStateProcessor();
void updateState(State newInputState, Future<void> Function(State) closure) {
// Use a singlefuture here to ensure we get dont lose any updates
// If the input stream gives us an update while we are
// still processing the last update, the most recent input state will
// be saved and processed eventually.
singleFuture(this, () async {
var newState = newInputState;
var newClosure = closure;
var done = false;
while (!done) {
await newClosure(newState);
// See if there's another state change to process
final nextState = _nextState;
final nextClosure = _nextClosure;
_nextState = null;
_nextClosure = null;
if (nextState != null) {
newState = nextState;
newClosure = nextClosure!;
} else {
done = true;
}
}
}, onBusy: () {
// Keep this state until we process again
_nextState = newInputState;
_nextClosure = closure;
});
}
Future<void> pause() => singleFuturePause(this);
Future<void> resume() async {
// Process any next state before resuming the singlefuture
try {
final nextState = _nextState;
final nextClosure = _nextClosure;
_nextState = null;
_nextClosure = null;
if (nextState != null) {
await nextClosure!(nextState);
}
} finally {
singleFutureResume(this);
}
}
State? _nextState;
Future<void> Function(State)? _nextClosure;
}

View File

@ -1,48 +0,0 @@
import 'dart:async';
import '../async_tools.dart';
// Process a single stateless update at a time ensuring each request
// gets processed asynchronously, and continuously while update is requested.
//
// This is useful for processing updates asynchronously without waiting
// from a synchronous execution context
class SingleStatelessProcessor {
SingleStatelessProcessor();
void update(Future<void> Function() closure) {
singleFuture(this, () async {
do {
_more = false;
await closure();
// See if another update was requested
} while (_more);
}, onBusy: () {
// Keep this state until we process again
_more = true;
});
}
// Like update, but with a busy wrapper that
// clears once the updating is finished
void busyUpdate<T, S>(
Future<void> Function(Future<void> Function(void Function(S))) busy,
Future<void> Function(void Function(S)) closure) {
singleFuture(
this,
() async => busy((emit) async {
do {
_more = false;
await closure(emit);
// See if another update was requested
} while (_more);
}), onBusy: () {
// Keep this state until we process again
_more = true;
});
}
bool _more = false;
}

View File

@ -1,18 +0,0 @@
class WaitSet {
WaitSet();
void add(Future<void> Function() closure) {
_futures.add(Future.delayed(Duration.zero, closure));
}
Future<void> call() async {
final futures = _futures;
_futures = [];
if (futures.isEmpty) {
return;
}
await futures.wait;
}
List<Future<void>> _futures = [];
}

View File

@ -1,19 +0,0 @@
name: async_tools
description: Useful data structures and tools for async/Future code
version: 1.0.0
publish_to: none
environment:
sdk: '>=3.2.0 <4.0.0'
# Add regular dependencies here.
dependencies:
freezed_annotation: ^2.4.1
mutex:
path: ../mutex
dev_dependencies:
build_runner: ^2.4.8
freezed: ^2.4.7
lint_hard: ^4.0.0
test: ^1.25.2

View File

@ -1,16 +0,0 @@
// import 'package:async_tools/async_tools.dart';
// import 'package:test/test.dart';
// void main() {
// group('A group of tests', () {
// final awesome = Awesome();
// setUp(() {
// // Additional setup goes here.
// });
// test('First Test', () {
// expect(awesome.isAwesome, isTrue);
// });
// });
// }

View File

@ -1,7 +0,0 @@
# https://dart.dev/guides/libraries/private-files
# Created by `dart pub`
.dart_tool/
# Avoid committing pubspec.lock for library packages; see
# https://dart.dev/guides/libraries/private-files#pubspeclock.
pubspec.lock

View File

@ -1,15 +0,0 @@
include: package:lint_hard/all.yaml
analyzer:
errors:
invalid_annotation_target: ignore
exclude:
- '**/*.g.dart'
- '**/*.freezed.dart'
- '**/*.pb.dart'
- '**/*.pbenum.dart'
- '**/*.pbjson.dart'
- '**/*.pbserver.dart'
linter:
rules:
unawaited_futures: true
avoid_positional_boolean_parameters: false

View File

@ -1,6 +0,0 @@
// import 'package:bloc_tools/bloc_tools.dart';
// void main() {
// var awesome = Awesome();
// print('awesome: ${awesome.isAwesome}');
// }

View File

@ -1,11 +0,0 @@
/// BLoC Tools
library;
export 'src/async_transformer_cubit.dart';
export 'src/bloc_busy_wrapper.dart';
export 'src/bloc_map_cubit.dart';
export 'src/bloc_tools_extension.dart';
export 'src/future_cubit.dart';
export 'src/state_map_follower.dart';
export 'src/stream_wrapper_cubit.dart';
export 'src/transformer_cubit.dart';

View File

@ -1,48 +0,0 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
// A cubit with state T that wraps another input cubit of state S and
// produces T fro S via an asynchronous transform closure
// The input cubit becomes 'owned' by the AsyncTransformerCubit and will
// be closed when the AsyncTransformerCubit closes.
class AsyncTransformerCubit<T, S> extends Cubit<AsyncValue<T>> {
AsyncTransformerCubit(this.input, {required this.transform})
: super(const AsyncValue.loading()) {
_asyncTransform(input.state);
_subscription = input.stream.listen(_asyncTransform);
}
void _asyncTransform(AsyncValue<S> newInputState) {
_singleStateProcessor.updateState(newInputState, (newState) async {
// Emit the transformed state
try {
if (newState is AsyncLoading<S>) {
emit(const AsyncValue.loading());
} else if (newState is AsyncError<S>) {
emit(AsyncValue.error(newState.error, newState.stackTrace));
} else {
final transformedState = await transform(newState.asData!.value);
emit(transformedState);
}
} on Exception catch (e, st) {
emit(AsyncValue.error(e, st));
}
});
}
@override
Future<void> close() async {
await _subscription.cancel();
await input.close();
await super.close();
}
Cubit<AsyncValue<S>> input;
final SingleStateProcessor<AsyncValue<S>> _singleStateProcessor =
SingleStateProcessor();
Future<AsyncValue<T>> Function(S) transform;
late final StreamSubscription<AsyncValue<S>> _subscription;
}

View File

@ -1,78 +0,0 @@
import 'dart:async';
import 'package:bloc/bloc.dart';
import 'package:equatable/equatable.dart';
import 'package:meta/meta.dart';
import 'package:mutex/mutex.dart';
@immutable
class BlocBusyState<S> extends Equatable {
const BlocBusyState(this.state) : busy = false;
const BlocBusyState._busy(this.state) : busy = true;
final bool busy;
final S state;
@override
List<Object?> get props => [busy, state];
}
mixin BlocBusyWrapper<S> on BlocBase<BlocBusyState<S>> {
Future<T> busyValue<T>(Future<T> Function(void Function(S) emit) closure) =>
_mutex.protect(() async {
void busyemit(S state) {
changedState = state;
}
// Turn on busy state
emit(BlocBusyState._busy(state.state));
// Run the closure
final out = await closure(busyemit);
// If the closure did one or more 'busy emits' then
// take the most recent one and emit it for real
final finalState = changedState;
if (finalState != null && finalState != state.state) {
emit(BlocBusyState._busy(finalState));
} else {
emit(BlocBusyState._busy(state.state));
}
return out;
});
Future<void> busy(Future<void> Function(void Function(S) emit) closure) =>
_mutex.protect(() async {
void busyemit(S state) {
changedState = state;
}
// Turn on busy state
emit(BlocBusyState._busy(state.state));
// Run the closure
await closure(busyemit);
// If the closure did one or more 'busy emits' then
// take the most recent one and emit it for real and
// turn off the busy state
final finalState = changedState;
if (finalState != null && finalState != state.state) {
emit(BlocBusyState(finalState));
} else {
emit(BlocBusyState(state.state));
}
});
void changeState(S state) {
if (_mutex.isLocked) {
changedState = state;
} else {
emit(BlocBusyState(state));
}
}
bool get isBusy => _mutex.isLocked;
final Mutex _mutex = Mutex();
S? changedState;
}

View File

@ -1,127 +0,0 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';
import 'state_map_follower.dart';
typedef BlocMapState<K, S> = IMap<K, S>;
class _ItemEntry<S, B> {
_ItemEntry({required this.bloc, required this.subscription});
final B bloc;
final StreamSubscription<S> subscription;
}
// Streaming container cubit that is a map from some immutable key
// to a some other cubit's output state. Output state for this container
// cubit is an immutable map of the key to the output state of the contained
// cubits.
//
// K = Key type for the bloc map, used to look up some mapped cubit
// V = State type for the value, keys will look up values of this type
// B = Bloc/cubit type for the value, output states of type S
abstract class BlocMapCubit<K, V, B extends BlocBase<V>>
extends Cubit<BlocMapState<K, V>>
with StateMapFollowable<BlocMapState<K, V>, K, V> {
BlocMapCubit()
: _entries = {},
_tagLock = AsyncTagLock(),
super(IMap<K, V>());
@override
Future<void> close() async {
await _entries.values.map((e) => e.subscription.cancel()).wait;
await _entries.values.map((e) => e.bloc.close()).wait;
await super.close();
}
@protected
@override
// ignore: unnecessary_overrides
void emit(BlocMapState<K, V> state) {
super.emit(state);
}
Future<void> add(MapEntry<K, B> Function() create) {
// Create new element
final newElement = create();
final key = newElement.key;
final bloc = newElement.value;
return _tagLock.protect(key, closure: () async {
// Remove entry with the same key if it exists
await _internalRemove(key);
// Add entry with this key
_entries[key] = _ItemEntry(
bloc: bloc,
subscription: bloc.stream.listen((data) {
// Add sub-cubit's state to the map state
emit(state.add(key, data));
}));
emit(state.add(key, bloc.state));
});
}
Future<void> addState(K key, V value) =>
_tagLock.protect(key, closure: () async {
// Remove entry with the same key if it exists
await _internalRemove(key);
emit(state.add(key, value));
});
Future<void> _internalRemove(K key) async {
final sub = _entries.remove(key);
if (sub != null) {
await sub.subscription.cancel();
await sub.bloc.close();
}
}
Future<void> remove(K key) => _tagLock.protect(key, closure: () async {
await _internalRemove(key);
emit(state.remove(key));
});
R operate<R>(K key, {required R Function(B bloc) closure}) {
final bloc = _entries[key]!.bloc;
return closure(bloc);
}
R? tryOperate<R>(K key, {required R Function(B bloc) closure}) {
final entry = _entries[key];
if (entry == null) {
return null;
}
return closure(entry.bloc);
}
Future<R> operateAsync<R>(K key,
{required Future<R> Function(B bloc) closure}) =>
_tagLock.protect(key, closure: () async {
final bloc = _entries[key]!.bloc;
return closure(bloc);
});
Future<R?> tryOperateAsync<R>(K key,
{required Future<R> Function(B bloc) closure}) =>
_tagLock.protect(key, closure: () async {
final entry = _entries[key];
if (entry == null) {
return null;
}
return closure(entry.bloc);
});
/// StateMapFollowable /////////////////////////
@override
IMap<K, V> getStateMap(BlocMapState<K, V> s) => s;
final Map<K, _ItemEntry<V, B>> _entries;
final AsyncTagLock<K> _tagLock;
}

View File

@ -1,12 +0,0 @@
import 'package:bloc/bloc.dart';
mixin BlocTools<State> on BlocBase<State> {
void withStateListen(void Function(State event)? onData,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
if (onData != null) {
onData(state);
}
stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
}

View File

@ -1,24 +0,0 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
abstract class FutureCubit<State> extends Cubit<AsyncValue<State>> {
FutureCubit(Future<State> fut) : super(const AsyncValue.loading()) {
_initWait.add(() async => fut.then((value) {
emit(AsyncValue.data(value));
// ignore: avoid_types_on_closure_parameters
}, onError: (Object e, StackTrace stackTrace) {
emit(AsyncValue.error(e, stackTrace));
}));
}
FutureCubit.value(State state) : super(AsyncValue.data(state));
@override
Future<void> close() async {
await _initWait();
await super.close();
}
final WaitSet _initWait = WaitSet();
}

View File

@ -1,125 +0,0 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';
/// Mixin that automatically keeps two blocs/cubits in sync with each other
/// Useful for having a BlocMapCubit 'follow' the state of another input cubit.
/// As the state of the input cubit changes, the BlocMapCubit can add/remove
/// mapped Cubits that automatically process the input state reactively.
///
/// S = Input state type
/// K = Key derived from elements of input state
/// V = Value derived from elements of input state
mixin StateMapFollower<S extends Object, K, V> on Closable {
void follow(StateMapFollowable<S, K, V> followable) {
assert(_following == null, 'can only follow one followable at a time');
_following = followable;
_lastInputStateMap = IMap();
_subscription = followable.registerFollower(this);
}
Future<void> unfollow() async {
await _subscription?.cancel();
_subscription = null;
_following?.unregisterFollower(this);
_following = null;
}
@override
@mustCallSuper
Future<void> close() async {
await unfollow();
await super.close();
}
Future<void> removeFromState(K key);
Future<void> updateState(K key, V value);
void _updateFollow(IMap<K, V> newInputState) {
final following = _following;
if (following == null) {
return;
}
_singleStateProcessor.updateState(newInputState, (newStateMap) async {
for (final k in _lastInputStateMap.keys) {
if (!newStateMap.containsKey(k)) {
// deleted
await removeFromState(k);
}
}
for (final newEntry in newStateMap.entries) {
final v = _lastInputStateMap.get(newEntry.key);
if (v == null || v != newEntry.value) {
// added or changed
await updateState(newEntry.key, newEntry.value);
}
}
// Keep this state map for the next time
_lastInputStateMap = newStateMap;
});
}
StateMapFollowable<S, K, V>? _following;
late IMap<K, V> _lastInputStateMap;
late StreamSubscription<IMap<K, V>>? _subscription;
final SingleStateProcessor<IMap<K, V>> _singleStateProcessor =
SingleStateProcessor();
}
/// Interface that allows a StateMapFollower to follow some other class's
/// state changes
abstract mixin class StateMapFollowable<S extends Object, K, V>
implements StateStreamable<S> {
IMap<K, V> getStateMap(S state);
StreamSubscription<IMap<K, V>> registerFollower(
StateMapFollower<S, K, V> follower) {
final stateMapTransformer = StreamTransformer<S, IMap<K, V>>.fromHandlers(
handleData: (d, s) => s.add(getStateMap(d)));
if (_followers.isEmpty) {
// start transforming stream
_transformedStream = stream.transform(stateMapTransformer);
}
_followers.add(follower);
follower._updateFollow(getStateMap(state));
return _transformedStream!.listen((s) => follower._updateFollow(s));
}
void unregisterFollower(StateMapFollower<S, K, V> follower) {
_followers.remove(follower);
if (_followers.isEmpty) {
// stop transforming stream
_transformedStream = null;
}
}
Future<T> syncFollowers<T>(Future<T> Function() closure) async {
// pause all followers
await _followers.map((f) => f._singleStateProcessor.pause()).wait;
// run closure
final out = await closure();
// resume all followers and wait for current state map to be updated
final resumeState = getStateMap(state);
await _followers.map((f) async {
// Ensure the latest state has been updated
try {
f._updateFollow(resumeState);
} finally {
// Resume processing of the follower
await f._singleStateProcessor.resume();
}
}).wait;
return out;
}
Stream<IMap<K, V>>? _transformedStream;
final List<StateMapFollower<S, K, V>> _followers = [];
}

View File

@ -1,25 +0,0 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
abstract class StreamWrapperCubit<State> extends Cubit<AsyncValue<State>> {
StreamWrapperCubit(Stream<State> stream, {State? defaultState})
: super(defaultState != null
? AsyncValue.data(defaultState)
: const AsyncValue.loading()) {
_subscription = stream.listen((event) => emit(AsyncValue.data(event)),
// ignore: avoid_types_on_closure_parameters
onError: (Object error, StackTrace stackTrace) {
emit(AsyncValue.error(error, stackTrace));
});
}
@override
Future<void> close() async {
await _subscription.cancel();
await super.close();
}
late final StreamSubscription<State> _subscription;
}

View File

@ -1,21 +0,0 @@
import 'dart:async';
import 'package:bloc/bloc.dart';
class TransformerCubit<T, S> extends Cubit<T> {
TransformerCubit(this.input, {required this.transform})
: super(transform(input.state)) {
_subscription = input.stream.listen((event) => emit(transform(event)));
}
@override
Future<void> close() async {
await _subscription.cancel();
await input.close();
await super.close();
}
Cubit<S> input;
T Function(S) transform;
late final StreamSubscription<S> _subscription;
}

View File

@ -1,24 +0,0 @@
name: bloc_tools
description: A starting point for Dart libraries or applications.
version: 1.0.0
publish_to: none
environment:
sdk: '>=3.2.0 <4.0.0'
dependencies:
async_tools:
path: ../async_tools
bloc: ^8.1.3
equatable: ^2.0.5
fast_immutable_collections: ^10.1.1
freezed_annotation: ^2.4.1
meta: ^1.11.0
mutex:
path: ../mutex
dev_dependencies:
build_runner: ^2.4.8
freezed: ^2.4.7
lint_hard: ^4.0.0
test: ^1.25.2

View File

@ -1,16 +0,0 @@
// import 'package:bloc_tools/bloc_tools.dart';
// import 'package:test/test.dart';
// void main() {
// group('A group of tests', () {
// final awesome = Awesome();
// setUp(() {
// // Additional setup goes here.
// });
// test('First Test', () {
// expect(awesome.isAwesome, isTrue);
// });
// });
// }

View File

@ -1,16 +0,0 @@
# Files and directories created by pub
.packages
.pub/
.dart_tool/
build/
packages
pubspec.lock
# Directory created by dartdoc
doc/api/
# JetBrains IDEs
.idea/
*.iml
*.ipr
*.iws

View File

@ -1,50 +0,0 @@
## 3.1.0
- Increased minimum Dart SDK to 2.15.0 for `unawaited` function.
- Added development dependencies lints ^2.1.1 and pana: ^0.21.37.
- Fixed code to remove lint warnings.
## 3.0.1
- Fixed bug with new read mutexes preventing a write mutex from being acquired.
## 3.0.0
- BREAKING CHANGE: critical section functions must return a Future.
- This is unlikely to affect real-world code, since only functions
containing asynchronous code would be critical.
- Protect method returns Future to the value from the critical section.
## 2.0.0
- Null safety release.
## 2.0.0-nullsafety.0
- Pre-release version: updated library to null safety (Non-nullable by default).
- Removed support for Dart 1.x.
## 1.1.0
- Added protect, protectRead and protectWrite convenience methods.
- Improved tests to not depend on timing.
## 1.0.3
- Added an example.
## 1.0.2
- Code clean up to satisfy pana 0.13.2 health checks.
## 1.0.1
- Fixed dartanalyzer warnings.
## 1.0.0
- Updated the upper bound of the SDK constraint to <3.0.0.
## 0.0.1
- Initial version

View File

@ -1,24 +0,0 @@
Copyright (c) 2016, Hoylen Sue.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the <organization> nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,191 +0,0 @@
# mutex
A library for creating locks to ensure mutual exclusion when
running critical sections of code.
## Purpose
Mutexes can be used to protect critical sections of code to prevent
race conditions.
Although Dart uses a single thread of execution, race conditions
can still occur when asynchronous operations are used inside
critical sections. For example,
x = 42;
synchronousOperations(); // this does not modify x
assert(x == 42); // x will NOT have changed
y = 42; // a variable that other asynchronous code can modify
await asynchronousOperations(); // this does NOT modify y, but...
// There is NO GUARANTEE other async code didn't run and change it!
assert(y == 42 || y != 42); // WARNING: y might have changed
An example is when Dart is used to implement a server-side Web server
that updates a database (assuming database transactions are not being
used). The update involves querying the database, performing
calculations on those retrieved values, and then updating the database
with the result. You don't want the database to be changed by
"something else" while performing the calculations, since the results
you would write will not incorporate those other changes. That
"something else" could be the same Web server handling another request
in parallel.
This package provides a normal mutex and a read-write mutex.
## Mutex
A mutex guarantees at most only one lock can exist at any one time.
If the lock has already been acquired, attempts to acquire another
lock will be blocked until the lock has been released.
```dart
import 'package:mutex/mutex.dart';
...
final m = Mutex();
```
Acquiring the lock before running the critical section of code,
and then releasing the lock.
```dart
await m.acquire();
// No other lock can be acquired until the lock is released
try {
// critical section with asynchronous code
await ...
} finally {
m.release();
}
```
### protect
The following code uses the _protect_ convenience method to do the
same thing as the above code. Use the convenence method whenever
possible, since it ensures the lock will always be released.
```dart
await m.protect(() async {
// critical section
});
```
If the critial section returns a Future to a value, the _protect_
convenience method will return a Future to that value.
```dart
final result = await m.protect<int>(() async {
// critical section
return valueFromCriticalSection;
});
// result contains the valueFromCriticalSection
```
## Read-write mutex
A read-write mutex allows multiple _reads locks_ to be exist
simultaneously, but at most only one _write lock_ can exist at any one
time. A _write lock_ and any _read locks_ cannot both exist together
at the same time.
If there is one or more _read locks_, attempts to acquire a _write
lock_ will be blocked until all the _read locks_ have been
released. But attempts to acquire more _read locks_ will not be
blocked. If there is a _write lock_, attempts to acquire any lock
(read or write) will be blocked until that _write lock_ is released.
A read-write mutex can also be described as a single-writer mutex,
multiple-reader mutex, or a reentrant lock.
```dart
import 'package:mutex/mutex.dart';
...
final m = ReadWriteMutex();
```
Acquiring a write lock:
await m.acquireWrite();
// No other locks (read or write) can be acquired until released
try {
// critical write section with asynchronous code
await ...
} finally {
m.release();
}
Acquiring a read lock:
await m.acquireRead();
// No write lock can be acquired until all read locks are released,
// but additional read locks can be acquired.
try {
// critical read section with asynchronous code
await ...
} finally {
m.release();
}
### protectWrite and protectRead
The following code uses the _protectWrite_ and _protectRead_
convenience methods to do the same thing as the above code. Use the
convenence method whenever possible, since it ensures the lock will
always be released.
```dart
await m.protectWrite(() async {
// critical write section
});
await m.protectRead(() async {
// critical read section
});
```
If the critial section returns a Future to a value, these convenience
methods will return a Future to that value.
```dart
final result1 await m.protectWrite<String>(() async {
// critical write section
return valueFromCritialSection1;
});
// result1 contains the valueFromCriticalSection1
final result2 = await m.protectRead(() async {
// critical read section
return valueFromCritialSection2;
});
// result2 contains the valueFromCriticalSection2
```
## When mutual exclusion is not needed
The critical section should always contain some asynchronous code. If
the critical section only contains synchronous code, there is no need
to put it in a critical section. In Dart, synchronous code cannot be
interrupted, so there is no need to protect it using mutual exclusion.
Also, if the critical section does not involve data or shared
resources that can be accessed by other asynchronous code, it also
does not need to be protected. For example, if it only uses local
variables that other asynchronous code won't have access to: while the
other asynchronous code could run, it won't be able to make unexpected
changes to the local variables it can't access.
## Features and bugs
Please file feature requests and bugs at the [issue tracker][tracker].
[tracker]: https://github.com/hoylen/dart-mutex/issues

View File

@ -1,15 +0,0 @@
include: package:lint_hard/all.yaml
analyzer:
errors:
invalid_annotation_target: ignore
exclude:
- '**/*.g.dart'
- '**/*.freezed.dart'
- '**/*.pb.dart'
- '**/*.pbenum.dart'
- '**/*.pbjson.dart'
- '**/*.pbserver.dart'
linter:
rules:
unawaited_futures: true
avoid_positional_boolean_parameters: false

View File

@ -1,114 +0,0 @@
// Mutex example.
//
// This example demonstrates why a mutex is needed.
import 'dart:async';
import 'dart:math';
import 'package:mutex/mutex.dart';
//----------------------------------------------------------------
// Random asynchronous delays to try and simulate race conditions.
const _maxDelay = 500; // milliseconds
final _random = Random();
Future<void> randomDelay() async {
await Future<void>.delayed(
Duration(milliseconds: _random.nextInt(_maxDelay)));
}
//----------------------------------------------------------------
/// Account balance.
///
/// The classical example of a race condition is when a bank account is updated
/// by different simultaneous operations.
int balance = 0;
//----------------------------------------------------------------
/// Deposit without using mutex.
Future<void> unsafeUpdate(int id, int depositAmount) async {
// Random delay before updating starts
await randomDelay();
// Add the deposit to the balance. But this operation is not atomic if
// there are asynchronous operations in it (as simulated by the randomDelay).
final oldBalance = balance;
await randomDelay();
balance = oldBalance + depositAmount;
print(' [$id] added $depositAmount to $oldBalance -> $balance');
}
//----------------------------------------------------------------
/// Deposit using mutex.
Mutex m = Mutex();
Future<void> safeUpdate(int id, int depositAmount) async {
// Random delay before updating starts
await randomDelay();
// Acquire the mutex before running the critical section of code
await m.protect(() async {
// critical section
// This is the same as the unsafe update. But since it is performed only
// when the mutex is acquired, it is safe: no other safe update can happen
// until this mutex is released.
final oldBalance = balance;
await randomDelay();
balance = oldBalance + depositAmount;
// end of critical section
print(' [$id] added $depositAmount to $oldBalance -> $balance');
});
}
//----------------------------------------------------------------
/// Make a series of deposits and see if the final balance is correct.
Future<void> makeDeposits({bool safe = true}) async {
print(safe ? 'Using mutex:' : 'Not using mutex:');
const numberDeposits = 10;
const amount = 10;
balance = 0;
// Create a set of operations, each attempting to deposit the same amount
// into the account.
final operations = <Future<void>>[];
for (var x = 0; x < numberDeposits; x++) {
final f = (safe) ? safeUpdate(x, amount) : unsafeUpdate(x, amount);
operations.add(f);
}
// Wait for all the deposit operations to finish
await Future.wait<void>(operations);
// Check if all of the operations succeeded
final expected = numberDeposits * amount;
if (balance != expected) {
print('Error: deposits were lost (final balance $balance != $expected)');
} else {
print('Success: no deposits were lost');
}
}
//----------------------------------------------------------------
void main() async {
await makeDeposits(safe: false);
print('');
await makeDeposits(safe: true);
}

View File

@ -1,11 +0,0 @@
// Copyright (c) 2016, Hoylen Sue. All rights reserved. Use of this source code
// is governed by a BSD-style license that can be found in the LICENSE file.
/// Mutual exclusion.
///
library mutex;
import 'dart:async';
part 'src/mutex.dart';
part 'src/read_write_mutex.dart';

View File

@ -1,89 +0,0 @@
part of mutex;
/// Mutual exclusion.
///
/// The [protect] method is a convenience method for acquiring a lock before
/// running critical code, and then releasing the lock afterwards. Using this
/// convenience method will ensure the lock is always released after use.
///
/// Usage:
///
/// m = Mutex();
///
/// await m.protect(() async {
/// // critical section
/// });
///
/// Alternatively, a lock can be explicitly acquired and managed. In this
/// situation, the program is responsible for releasing the lock after it
/// have been used. Failure to release the lock will prevent other code for
/// ever acquiring the lock.
///
/// m = Mutex();
///
/// await m.acquire();
/// try {
/// // critical section
/// }
/// finally {
/// m.release();
/// }
class Mutex {
//================================================================
// Constructors
Mutex() : _rwMutex = ReadWriteMutex();
Mutex.locked() : _rwMutex = ReadWriteMutex.writeLocked();
// Implemented as a ReadWriteMutex that is used only with write locks.
final ReadWriteMutex _rwMutex;
/// Indicates if a lock has been acquired and not released.
bool get isLocked => _rwMutex.isLocked;
/// Acquire a lock
///
/// Returns a future that will be completed when the lock has been acquired.
///
/// Consider using the convenience method [protect], otherwise the caller
/// is responsible for making sure the lock is released after it is no longer
/// needed. Failure to release the lock means no other code can acquire the
/// lock.
Future<void> acquire() => _rwMutex.acquireWrite();
/// Release a lock.
///
/// Release a lock that has been acquired.
void release() => _rwMutex.release();
/// Convenience method for protecting a function with a lock.
///
/// This method guarantees a lock is always acquired before invoking the
/// [criticalSection] function. It also guarantees the lock is always
/// released.
///
/// A critical section should always contain asynchronous code, since purely
/// synchronous code does not need to be protected inside a critical section.
/// Therefore, the critical section is a function that returns a _Future_.
/// If the critical section does not need to return a value, it should be
/// defined as returning `Future<void>`.
///
/// Returns a _Future_ whose value is the value of the _Future_ returned by
/// the critical section.
///
/// An exception is thrown if the critical section throws an exception,
/// or an exception is thrown while waiting for the _Future_ returned by
/// the critical section to complete. The lock is released, when those
/// exceptions occur.
Future<T> protect<T>(Future<T> Function() criticalSection) async {
await acquire();
try {
return await criticalSection();
} finally {
release();
}
}
}

View File

@ -1,304 +0,0 @@
part of mutex;
//################################################################
/// Internal representation of a request for a lock.
///
/// This is instantiated for each acquire and, if necessary, it is added
/// to the waiting queue.
class _ReadWriteMutexRequest {
/// Internal constructor.
///
/// The [isRead] indicates if this is a request for a read lock (true) or a
/// request for a write lock (false).
_ReadWriteMutexRequest({required this.isRead});
/// Indicates if this is a read or write lock.
final bool isRead; // true = read lock requested; false = write lock requested
/// The job's completer.
///
/// This [Completer] will complete when the job has acquired the lock.
final Completer<void> completer = Completer<void>();
}
//################################################################
/// Mutual exclusion that supports read and write locks.
///
/// Multiple read locks can be simultaneously acquired, but at most only
/// one write lock can be acquired at any one time.
///
/// **Protecting critical code**
///
/// The [protectWrite] and [protectRead] are convenience methods for acquiring
/// locks and releasing them. Using them will ensure the locks are always
/// released after use.
///
/// Create the mutex:
///
/// m = ReadWriteMutex();
///
/// Code protected by a write lock:
///
/// await m.protectWrite(() {
/// // critical write section
/// });
///
/// Other code can be protected by a read lock:
///
/// await m.protectRead(() {
/// // critical read section
/// });
///
///
/// **Explicitly managing locks**
///
/// Alternatively, the locks can be explicitly acquired and managed. In this
/// situation, the program is responsible for releasing the locks after they
/// have been used. Failure to release the lock will prevent other code for
/// ever acquiring a lock.
///
/// Create the mutex:
///
/// m = ReadWriteMutex();
///
/// Some code can acquire a write lock:
///
/// await m.acquireWrite();
/// try {
/// // critical write section
/// assert(m.isWriteLocked);
/// } finally {
/// m.release();
/// }
///
/// Other code can acquire a read lock.
///
/// await m.acquireRead();
/// try {
/// // critical read section
/// assert(m.isReadLocked);
/// } finally {
/// m.release();
/// }
///
/// The current implementation lets locks be acquired in first-in-first-out
/// order. This ensures there will not be any lock starvation, which can
/// happen if some locks are prioritised over others. Submit a feature
/// request issue, if there is a need for another scheduling algorithm.
class ReadWriteMutex {
//================================================================
// Constructors
ReadWriteMutex();
ReadWriteMutex.writeLocked() : _state = -1;
ReadWriteMutex.readLocked(int? count) : _state = count ?? 1 {
assert(_state > 0, "can't have a negative read lock count");
}
//================================================================
// Members
/// List of requests waiting for a lock on this mutex.
final _waiting = <_ReadWriteMutexRequest>[];
/// State of the mutex
int _state = 0; // -1 = write lock, +ve = number of read locks; 0 = no lock
//================================================================
// Methods
/// Indicates if a lock (read or write) has been acquired and not released.
bool get isLocked => _state != 0;
/// Indicates if a write lock has been acquired and not released.
bool get isWriteLocked => _state == -1;
/// Indicates if one or more read locks has been acquired and not released.
bool get isReadLocked => 0 < _state;
/// Indicates the number of waiters on this mutex
int get waiters => _waiting.length;
/// Acquire a read lock
///
/// Returns a future that will be completed when the lock has been acquired.
///
/// A read lock can not be acquired when there is a write lock on the mutex.
/// But it can be acquired if there are other read locks.
///
/// Consider using the convenience method [protectRead], otherwise the caller
/// is responsible for making sure the lock is released after it is no longer
/// needed. Failure to release the lock means no other code can acquire a
/// write lock.
Future<void> acquireRead() => _acquire(isRead: true);
/// Acquire a write lock
///
/// Returns a future that will be completed when the lock has been acquired.
///
/// A write lock can only be acquired when there are no other locks (neither
/// read locks nor write locks) on the mutex.
///
/// Consider using the convenience method [protectWrite], otherwise the caller
/// is responsible for making sure the lock is released after it is no longer
/// needed. Failure to release the lock means no other code can acquire the
/// lock (neither a read lock or a write lock).
Future<void> acquireWrite() => _acquire(isRead: false);
/// Release a lock.
///
/// Release the lock that was previously acquired.
///
/// When the lock is released, locks waiting to be acquired can be acquired
/// depending on the type of lock waiting and if other locks have been
/// acquired.
///
/// A [StateError] is thrown if the mutex does not currently have a lock on
/// it.
void release() {
if (_state == -1) {
// Write lock released
_state = 0;
} else if (0 < _state) {
// Read lock released
_state--;
} else if (_state == 0) {
throw StateError('no lock to release');
} else {
assert(false, 'invalid state');
}
// If there are jobs waiting and the next job can acquire the mutex,
// let it acquire it and remove it from the queue.
//
// This is a while loop, because there could be multiple jobs on the
// queue waiting for a read-only mutex. So they can all be allowed to run.
while (_waiting.isNotEmpty) {
final nextJob = _waiting.first;
if (_jobAcquired(nextJob)) {
_waiting.removeAt(0);
} else {
// The next job cannot acquire the mutex. This only occurs when: the
// the currently running job has a write mutex (_state == -1); or the
// next job wants write mutex and there is a job currently running
// (regardless of what type of mutex it has acquired).
assert(_state < 0 || !nextJob.isRead,
'unexpected: next job cannot be acquired');
break; // no more can be removed from the queue
}
}
}
/// Convenience method for protecting a function with a read lock.
///
/// This method guarantees a read lock is always acquired before invoking the
/// [criticalSection] function. It also guarantees the lock is always
/// released.
///
/// A critical section should always contain asynchronous code, since purely
/// synchronous code does not need to be protected inside a critical section.
/// Therefore, the critical section is a function that returns a _Future_.
/// If the critical section does not need to return a value, it should be
/// defined as returning `Future<void>`.
///
/// Returns a _Future_ whose value is the value of the _Future_ returned by
/// the critical section.
///
/// An exception is thrown if the critical section throws an exception,
/// or an exception is thrown while waiting for the _Future_ returned by
/// the critical section to complete. The lock is released, when those
/// exceptions occur.
Future<T> protectRead<T>(Future<T> Function() criticalSection) async {
await acquireRead();
try {
return await criticalSection();
} finally {
release();
}
}
/// Convenience method for protecting a function with a write lock.
///
/// This method guarantees a write lock is always acquired before invoking the
/// [criticalSection] function. It also guarantees the lock is always
/// released.
///
/// A critical section should always contain asynchronous code, since purely
/// synchronous code does not need to be protected inside a critical section.
/// Therefore, the critical section is a function that returns a _Future_.
/// If the critical section does not need to return a value, it should be
/// defined as returning `Future<void>`.
///
/// Returns a _Future_ whose value is the value of the _Future_ returned by
/// the critical section.
///
/// An exception is thrown if the critical section throws an exception,
/// or an exception is thrown while waiting for the _Future_ returned by
/// the critical section to complete. The lock is released, when those
/// exceptions occur.
Future<T> protectWrite<T>(Future<T> Function() criticalSection) async {
await acquireWrite();
try {
return await criticalSection();
} finally {
release();
}
}
/// Internal acquire method.
///
/// Used to acquire a read lock (when [isRead] is true) or a write lock
/// (when [isRead] is false).
///
/// Returns a Future that completes when the lock has been acquired.
Future<void> _acquire({required bool isRead}) {
final newJob = _ReadWriteMutexRequest(isRead: isRead);
if (_waiting.isNotEmpty || !_jobAcquired(newJob)) {
// This new job cannot run yet. There are either other jobs already
// waiting, or there are no waiting jobs but this job cannot start
// because the mutex is currently acquired (namely, either this new job
// or the currently running job is read-write).
//
// Add the new job to the end of the queue.
_waiting.add(newJob);
}
return newJob.completer.future;
}
/// Determine if the [job] can now acquire the lock.
///
/// If it can acquire the lock, the job's completer is completed, the
/// state updated, and true is returned. If not, false is returned.
///
/// A job for a read lock can only be acquired if there are no other locks
/// or there are read lock(s). A job for a write lock can only be acquired
/// if there are no other locks.
bool _jobAcquired(_ReadWriteMutexRequest job) {
assert(-1 <= _state, 'must not be write locked');
if (_state == 0 || (0 < _state && job.isRead)) {
// Can acquire
_state = (job.isRead) ? (_state + 1) : -1;
job.completer.complete();
return true;
} else {
return false;
}
}
}

View File

@ -1,12 +0,0 @@
name: mutex
description: Mutual exclusion with implementation of normal and read-write mutex
version: 3.1.0
publish_to: none
environment:
sdk: '>=3.2.0 <4.0.0'
dev_dependencies:
lint_hard: ^4.0.0
pana: ^0.22.2
test: ^1.25.2

View File

@ -1,102 +0,0 @@
// Test contributed by "Cat-sushi"
// <https://github.com/hoylen/dart-mutex/issues/11>
import 'dart:async';
// import 'dart:io';
import 'package:mutex/mutex.dart';
import 'package:test/test.dart';
//================================================================
// For debug output
//
// Uncomment the "stdout.write" line in the [debugWrite] method to enable
// debug output.
int numReadAcquired = 0;
int numReadReleased = 0;
enum State { waitingToAcquire, acquired, released }
const stateSymbol = <State, String>{
State.waitingToAcquire: '?',
State.acquired: '+',
State.released: '-'
};
var _outputCount = 0; // to manage line breaks
void debugOutput(String id, State state) {
debugWrite('$id${stateSymbol[state]} ');
_outputCount++;
if (_outputCount % 10 == 0) {
debugWrite('\n');
}
}
void debugWrite(String str) {
// Uncomment to show what is happening
// stdout.write(str);
}
//================================================================
Future<void> mySleep([int ms = 1000]) async {
await Future<void>.delayed(Duration(milliseconds: ms));
}
Future<void> sharedLoop1(ReadWriteMutex mutex, String symbol) async {
while (true) {
debugOutput(symbol, State.waitingToAcquire);
await mutex.protectRead(() async {
numReadAcquired++;
debugOutput(symbol, State.acquired);
await mySleep(100);
});
numReadReleased++;
debugOutput(symbol, State.released);
}
}
void main() {
group('exclusive lock tests', () {
test('test1', () async {
const numReadLoops = 5;
final mutex = ReadWriteMutex();
assert(numReadLoops < 26, 'too many read loops for lowercase letters');
debugWrite('Number of read loops: $numReadLoops\n');
for (var x = 0; x < numReadLoops; x++) {
final symbol = String.fromCharCode('a'.codeUnitAt(0) + x);
unawaited(sharedLoop1(mutex, symbol));
await mySleep(10);
}
await mySleep();
debugWrite('\nAbout to acquireWrite'
' (reads: acquired=$numReadAcquired released=$numReadReleased'
' outstanding=${numReadAcquired - numReadReleased})\n');
_outputCount = 0; // reset line break
const writeSymbol = 'W';
debugOutput(writeSymbol, State.waitingToAcquire);
await mutex.acquireWrite();
debugOutput(writeSymbol, State.acquired);
mutex.release();
debugOutput(writeSymbol, State.released);
debugWrite('\nWrite mutex released\n');
_outputCount = 0; // reset line break
expect('a', 'a');
});
});
}

View File

@ -1,486 +0,0 @@
import 'dart:async';
import 'package:mutex/mutex.dart';
import 'package:test/test.dart';
//################################################################
class RWTester {
int _operation = 0;
final _operationSequences = <int>[];
/// Execution sequence of the operations done.
///
/// Each element corresponds to the position of the initial execution
/// order of the read/write operation future.
List<int> get operationSequences => _operationSequences;
ReadWriteMutex mutex = ReadWriteMutex();
/// Set to true to print out read/write to the balance during deposits
static const bool debugOutput = false;
final DateTime _startTime = DateTime.now();
void _debugPrint(String message) {
if (debugOutput) {
final t = DateTime.now().difference(_startTime).inMilliseconds;
// ignore: avoid_print
print('$t: $message');
}
}
void reset() {
_operationSequences.clear();
_debugPrint('reset');
}
/// Waits [startDelay] and then invokes critical section with mutex.
///
/// Writes to [_operationSequences]. If the readwrite locks are respected
/// then the final state of the list will be in ascending order.
Future<void> writing(int startDelay, int sequence, int endDelay) async {
await Future<void>.delayed(Duration(milliseconds: startDelay));
await mutex.protectWrite(() async {
final op = ++_operation;
_debugPrint('[$op] write start: <- $_operationSequences');
final tmp = _operationSequences;
expect(mutex.isWriteLocked, isTrue);
expect(_operationSequences, orderedEquals(tmp));
// Add the position of operation to the list of operations.
_operationSequences.add(sequence); // add position to list
expect(mutex.isWriteLocked, isTrue);
await Future<void>.delayed(Duration(milliseconds: endDelay));
_debugPrint('[$op] write finish: -> $_operationSequences');
});
}
/// Waits [startDelay] and then invokes critical section with mutex.
///
///
Future<void> reading(int startDelay, int sequence, int endDelay) async {
await Future<void>.delayed(Duration(milliseconds: startDelay));
await mutex.protectRead(() async {
final op = ++_operation;
_debugPrint('[$op] read start: <- $_operationSequences');
expect(mutex.isReadLocked, isTrue);
_operationSequences.add(sequence); // add position to list
await Future<void>.delayed(Duration(milliseconds: endDelay));
_debugPrint('[$op] read finish: <- $_operationSequences');
});
}
}
//################################################################
//----------------------------------------------------------------
void main() {
final account = RWTester();
setUp(account.reset);
test('multiple read locks', () async {
await Future.wait([
account.reading(0, 1, 1000),
account.reading(0, 2, 900),
account.reading(0, 3, 800),
account.reading(0, 4, 700),
account.reading(0, 5, 600),
account.reading(0, 6, 500),
account.reading(0, 7, 400),
account.reading(0, 8, 300),
account.reading(0, 9, 200),
account.reading(0, 10, 100),
]);
// The first future acquires the lock first and waits the longest to give it
// up. This should however not block any of the other read operations
// as such the reads should finish in ascending orders.
expect(
account.operationSequences,
orderedEquals(<int>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
);
});
test('multiple write locks', () async {
await Future.wait([
account.writing(0, 1, 100),
account.writing(0, 2, 100),
account.writing(0, 3, 100),
]);
// The first future writes first and holds the lock until 100 ms
// Even though the second future starts execution, the lock cannot be
// acquired until it is released by the first future.
// Therefore the sequence of operations will be in ascending order
// of the futures.
expect(
account.operationSequences,
orderedEquals(<int>[1, 2, 3]),
);
});
test('acquireWrite() before acquireRead()', () async {
const lockTimeout = Duration(milliseconds: 100);
final mutex = ReadWriteMutex();
await mutex.acquireWrite();
expect(mutex.isReadLocked, equals(false));
expect(mutex.isWriteLocked, equals(true));
// Since there is a write lock existing, a read lock cannot be acquired.
final readLock = mutex.acquireRead().timeout(lockTimeout);
expect(
() async => readLock,
throwsA(isA<TimeoutException>()),
);
});
test('acquireRead() before acquireWrite()', () async {
const lockTimeout = Duration(milliseconds: 100);
final mutex = ReadWriteMutex();
await mutex.acquireRead();
expect(mutex.isReadLocked, equals(true));
expect(mutex.isWriteLocked, equals(false));
// Since there is a read lock existing, a write lock cannot be acquired.
final writeLock = mutex.acquireWrite().timeout(lockTimeout);
expect(
() async => writeLock,
throwsA(isA<TimeoutException>()),
);
});
test('mixture of read write locks execution order', () async {
await Future.wait([
account.reading(0, 1, 100),
account.reading(10, 2, 100),
account.reading(20, 3, 100),
account.writing(30, 4, 100),
account.writing(40, 5, 100),
account.writing(50, 6, 100),
]);
expect(
account.operationSequences,
orderedEquals(<int>[1, 2, 3, 4, 5, 6]),
);
});
group('protectRead', () {
test('lock obtained and released on success', () async {
final m = ReadWriteMutex();
await m.protectRead(() async {
// critical section
expect(m.isLocked, isTrue);
});
expect(m.isLocked, isFalse);
});
test('value returned from critical section', () async {
// These are the normal scenario of the critical section running
// successfully. It tests different return types from the
// critical section.
final m = ReadWriteMutex();
// returns Future<void>
await m.protectRead<void>(() async {});
// returns Future<int>
final number = await m.protectRead<int>(() async => 42);
expect(number, equals(42));
// returns Future<int?> completes with value
final optionalNumber = await m.protectRead<int?>(() async => 1024);
expect(optionalNumber, equals(1024));
// returns Future<int?> completes with null
final optionalNumberNull = await m.protectRead<int?>(() async => null);
expect(optionalNumberNull, isNull);
// returns Future<String>
final word = await m.protectRead<String>(() async => 'foobar');
expect(word, equals('foobar'));
// returns Future<String?> completes with value
final optionalWord = await m.protectRead<String?>(() async => 'baz');
expect(optionalWord, equals('baz'));
// returns Future<String?> completes with null
final optionalWordNull = await m.protectRead<String?>(() async => null);
expect(optionalWordNull, isNull);
expect(m.isLocked, isFalse);
});
test('exception in synchronous code', () async {
// Tests what happens when an exception is raised in the **synchronous**
// part of the critical section.
//
// Locks are correctly managed: the lock is obtained before executing
// the critical section, and is released when the exception is thrown
// by the _protect_ method.
//
// The exception is raised when waiting for the Future returned by
// _protect_ to complete. Even though the exception is synchronously
// raised by the critical section, it won't be thrown when _protect_
// is invoked. The _protect_ method always successfully returns a
// _Future_.
Future<int> criticalSection() {
final c = Completer<int>()..complete(42);
// synchronous exception
throw const FormatException('synchronous exception');
// ignore: dead_code
return c.future;
}
// Check the criticalSection behaves as expected for the test
try {
// ignore: unused_local_variable
final resultFuture = criticalSection();
fail('critical section did not throw synchronous exception');
} on FormatException {
// expected: invoking the criticalSection results in the exception
}
final m = ReadWriteMutex();
try {
// Invoke protect to get the Future (this should succeed)
final resultFuture = m.protectRead<int>(criticalSection);
expect(resultFuture, isA<Future<int>>());
// Wait for the Future (this should fail)
final result = await resultFuture;
expect(result, isNotNull);
fail('exception not thrown');
} on FormatException catch (e) {
expect(m.isLocked, isFalse);
expect(e.message, equals('synchronous exception'));
}
expect(m.isLocked, isFalse);
});
test('exception in asynchronous code', () async {
// Tests what happens when an exception is raised in the **asynchronous**
// part of the critical section.
//
// Locks are correctly managed: the lock is obtained before executing
// the critical section, and is released when the exception is thrown
// by the _protect_ method.
//
// The exception is raised when waiting for the Future returned by
// _protect_ to complete.
Future<int> criticalSection() async {
final c = Completer<int>()..complete(42);
await Future.delayed(const Duration(seconds: 1), () {});
// asynchronous exception (since it must wait for the above line)
throw const FormatException('asynchronous exception');
// ignore: dead_code
return c.future;
}
// Check the criticalSection behaves as expected for the test
final resultFuture = criticalSection();
expect(resultFuture, isA<Future<int>>());
// invoking the criticalSection does not result in the exception
try {
await resultFuture;
fail('critical section did not throw asynchronous exception');
} on FormatException {
// expected: exception happens on the await
}
final m = ReadWriteMutex();
try {
// Invoke protect to get the Future (this should succeed)
final resultFuture = m.protectRead<int>(criticalSection);
expect(resultFuture, isA<Future<int>>());
// Even though the criticalSection throws the exception in synchronous
// code, protect causes it to become an asynchronous exception.
// Wait for the Future (this should fail)
final result = await resultFuture;
expect(result, isNotNull);
fail('exception not thrown');
} on FormatException catch (e) {
expect(m.isLocked, isFalse);
expect(e.message, equals('asynchronous exception'));
}
expect(m.isLocked, isFalse);
});
});
group('protectWrite', () {
test('lock obtained and released on success', () async {
final m = ReadWriteMutex();
await m.protectWrite(() async {
// critical section
expect(m.isLocked, isTrue);
});
expect(m.isLocked, isFalse);
});
test('value returned from critical section', () async {
// These are the normal scenario of the critical section running
// successfully. It tests different return types from the
// critical section.
final m = ReadWriteMutex();
// returns Future<void>
await m.protectWrite<void>(() async {});
// returns Future<int>
final number = await m.protectWrite<int>(() async => 42);
expect(number, equals(42));
// returns Future<int?> completes with value
final optionalNumber = await m.protectWrite<int?>(() async => 1024);
expect(optionalNumber, equals(1024));
// returns Future<int?> completes with null
final optionalNumberNull = await m.protectWrite<int?>(() async => null);
expect(optionalNumberNull, isNull);
// returns Future<String>
final word = await m.protectWrite<String>(() async => 'foobar');
expect(word, equals('foobar'));
// returns Future<String?> completes with value
final optionalWord = await m.protectWrite<String?>(() async => 'baz');
expect(optionalWord, equals('baz'));
// returns Future<String?> completes with null
final optionalWordNull = await m.protectWrite<String?>(() async => null);
expect(optionalWordNull, isNull);
expect(m.isLocked, isFalse);
});
test('exception in synchronous code', () async {
// Tests what happens when an exception is raised in the **synchronous**
// part of the critical section.
//
// Locks are correctly managed: the lock is obtained before executing
// the critical section, and is released when the exception is thrown
// by the _protect_ method.
//
// The exception is raised when waiting for the Future returned by
// _protect_ to complete. Even though the exception is synchronously
// raised by the critical section, it won't be thrown when _protect_
// is invoked. The _protect_ method always successfully returns a
// _Future_.
Future<int> criticalSection() {
final c = Completer<int>()..complete(42);
// synchronous exception
throw const FormatException('synchronous exception');
// ignore: dead_code
return c.future;
}
// Check the criticalSection behaves as expected for the test
try {
// ignore: unused_local_variable
final resultFuture = criticalSection();
fail('critical section did not throw synchronous exception');
} on FormatException {
// expected: invoking the criticalSection results in the exception
}
final m = ReadWriteMutex();
try {
// Invoke protect to get the Future (this should succeed)
final resultFuture = m.protectWrite<int>(criticalSection);
expect(resultFuture, isA<Future<int>>());
// Wait for the Future (this should fail)
final result = await resultFuture;
expect(result, isNotNull);
fail('exception not thrown');
} on FormatException catch (e) {
expect(m.isLocked, isFalse);
expect(e.message, equals('synchronous exception'));
}
expect(m.isLocked, isFalse);
});
test('exception in asynchronous code', () async {
// Tests what happens when an exception is raised in the **asynchronous**
// part of the critical section.
//
// Locks are correctly managed: the lock is obtained before executing
// the critical section, and is released when the exception is thrown
// by the _protect_ method.
//
// The exception is raised when waiting for the Future returned by
// _protect_ to complete.
Future<int> criticalSection() async {
final c = Completer<int>()..complete(42);
await Future.delayed(const Duration(seconds: 1), () {});
// asynchronous exception (since it must wait for the above line)
throw const FormatException('asynchronous exception');
// ignore: dead_code
return c.future;
}
// Check the criticalSection behaves as expected for the test
final resultFuture = criticalSection();
expect(resultFuture, isA<Future<int>>());
// invoking the criticalSection does not result in the exception
try {
await resultFuture;
fail('critical section did not throw asynchronous exception');
} on FormatException {
// expected: exception happens on the await
}
final m = ReadWriteMutex();
try {
// Invoke protect to get the Future (this should succeed)
final resultFuture = m.protectWrite<int>(criticalSection);
expect(resultFuture, isA<Future<int>>());
// Even though the criticalSection throws the exception in synchronous
// code, protect causes it to become an asynchronous exception.
// Wait for the Future (this should fail)
final result = await resultFuture;
expect(result, isNotNull);
fail('exception not thrown');
} on FormatException catch (e) {
expect(m.isLocked, isFalse);
expect(e.message, equals('asynchronous exception'));
}
expect(m.isLocked, isFalse);
});
});
}

View File

@ -1,341 +0,0 @@
import 'dart:async';
import 'package:mutex/mutex.dart';
import 'package:test/test.dart';
//################################################################
/// Account simulating the classic "simultaneous update" concurrency problem.
///
/// The deposit operation reads the balance, waits for a short time (where
/// problems can occur if the balance is changed) and then writes out the
/// new balance.
///
class Account {
int get balance => _balance;
int _balance = 0;
int _operation = 0;
Mutex mutex = Mutex();
/// Set to true to print out read/write to the balance during deposits
static const bool debugOutput = false;
/// Time used for calculating time offsets in debug messages.
final DateTime _startTime = DateTime.now();
void _debugPrint(String message) {
if (debugOutput) {
final t = DateTime.now().difference(_startTime).inMilliseconds;
// ignore: avoid_print
print('$t: $message');
}
}
void reset([int startingBalance = 0]) {
_balance = startingBalance;
_debugPrint('reset: balance = $_balance');
}
/// Waits [startDelay] and then invokes critical section without mutex.
///
Future<void> depositUnsafe(
int amount, int startDelay, int dangerWindow) async {
await Future<void>.delayed(Duration(milliseconds: startDelay));
await _depositCriticalSection(amount, dangerWindow);
}
/// Waits [startDelay] and then invokes critical section with mutex.
///
Future<void> depositWithMutex(
int amount, int startDelay, int dangerWindow) async {
await Future<void>.delayed(Duration(milliseconds: startDelay));
await mutex.acquire();
try {
expect(mutex.isLocked, isTrue);
await _depositCriticalSection(amount, dangerWindow);
expect(mutex.isLocked, isTrue);
} finally {
mutex.release();
}
}
/// Critical section of adding [amount] to the balance.
///
/// Reads the balance, then sleeps for [dangerWindow] milliseconds, before
/// saving the new balance. If not protected, another invocation of this
/// method while it is sleeping will read the balance before it is updated.
/// The one that saves its balance last will overwrite the earlier saved
/// balances (effectively those other deposits will be lost).
///
Future<void> _depositCriticalSection(int amount, int dangerWindow) async {
final op = ++_operation;
_debugPrint('[$op] read balance: $_balance');
final tmp = _balance;
await Future<void>.delayed(Duration(milliseconds: dangerWindow));
_balance = tmp + amount;
_debugPrint('[$op] write balance: $_balance (= $tmp + $amount)');
}
}
//################################################################
//----------------------------------------------------------------
void main() {
const correctBalance = 68;
final account = Account();
test('without mutex', () async {
// First demonstrate that without mutex incorrect results are produced.
// Without mutex produces incorrect result
// 000. a reads 0
// 025. b reads 0
// 050. a writes 42
// 075. b writes 26
account.reset();
await Future.wait<void>([
account.depositUnsafe(42, 0, 50),
account.depositUnsafe(26, 25, 50) // result overwrites first deposit
]);
expect(account.balance, equals(26)); // incorrect: first deposit lost
// Without mutex produces incorrect result
// 000. b reads 0
// 025. a reads 0
// 050. b writes 26
// 075. a writes 42
account.reset();
await Future.wait([
account.depositUnsafe(42, 25, 50), // result overwrites second deposit
account.depositUnsafe(26, 0, 50)
]);
expect(account.balance, equals(42)); // incorrect: second deposit lost
});
test('with mutex', () async {
// Test correct results are produced with mutex
// With mutex produces correct result
// 000. a acquires lock
// 000. a reads 0
// 025. b is blocked
// 050. a writes 42
// 050. a releases lock
// 050. b acquires lock
// 050. b reads 42
// 100. b writes 68
account.reset();
await Future.wait([
account.depositWithMutex(42, 0, 50),
account.depositWithMutex(26, 25, 50)
]);
expect(account.balance, equals(correctBalance));
// With mutex produces correct result
// 000. b acquires lock
// 000. b reads 0
// 025. a is blocked
// 050. b writes 26
// 050. b releases lock
// 050. a acquires lock
// 050. a reads 26
// 100. a writes 68
account.reset();
await Future.wait([
account.depositWithMutex(42, 25, 50),
account.depositWithMutex(26, 0, 50)
]);
expect(account.balance, equals(correctBalance));
});
test('multiple acquires are serialized', () async {
// Demonstrate that sections running in a mutex are effectively serialized
const delay = 200; // milliseconds
account.reset();
await Future.wait([
account.depositWithMutex(1, 0, delay),
account.depositWithMutex(1, 0, delay),
account.depositWithMutex(1, 0, delay),
account.depositWithMutex(1, 0, delay),
account.depositWithMutex(1, 0, delay),
account.depositWithMutex(1, 0, delay),
account.depositWithMutex(1, 0, delay),
account.depositWithMutex(1, 0, delay),
account.depositWithMutex(1, 0, delay),
account.depositWithMutex(1, 0, delay),
]);
expect(account.balance, equals(10));
});
group('protect', () {
test('lock obtained and released on success', () async {
// This is the normal scenario of the critical section running
// successfully. The lock is acquired before running the critical
// section, and it is released after it runs (and will remain
// unlocked after the _protect_ method returns).
final m = Mutex();
await m.protect(() async {
// critical section: returns Future<void>
expect(m.isLocked, isTrue);
});
expect(m.isLocked, isFalse);
});
test('value returned from critical section', () async {
// These are the normal scenario of the critical section running
// successfully. It tests different return types from the
// critical section.
final m = Mutex();
// returns Future<void>
await m.protect<void>(() async {});
// returns Future<int>
final number = await m.protect<int>(() async => 42);
expect(number, equals(42));
// returns Future<int?> completes with value
final optionalNumber = await m.protect<int?>(() async => 1024);
expect(optionalNumber, equals(1024));
// returns Future<int?> completes with null
final optionalNumberNull = await m.protect<int?>(() async => null);
expect(optionalNumberNull, isNull);
// returns Future<String>
final word = await m.protect<String>(() async => 'foobar');
expect(word, equals('foobar'));
// returns Future<String?> completes with value
final optionalWord = await m.protect<String?>(() async => 'baz');
expect(optionalWord, equals('baz'));
// returns Future<String?> completes with null
final optionalWordNull = await m.protect<String?>(() async => null);
expect(optionalWordNull, isNull);
expect(m.isLocked, isFalse);
});
test('exception in synchronous code', () async {
// Tests what happens when an exception is raised in the **synchronous**
// part of the critical section.
//
// Locks are correctly managed: the lock is obtained before executing
// the critical section, and is released when the exception is thrown
// by the _protect_ method.
//
// The exception is raised when waiting for the Future returned by
// _protect_ to complete. Even though the exception is synchronously
// raised by the critical section, it won't be thrown when _protect_
// is invoked. The _protect_ method always successfully returns a
// _Future_.
Future<int> criticalSection() {
final c = Completer<int>()..complete(42);
// synchronous exception
throw const FormatException('synchronous exception');
// ignore: dead_code
return c.future;
}
// Check the criticalSection behaves as expected for the test
try {
// ignore: unused_local_variable
final resultFuture = criticalSection();
fail('critical section did not throw synchronous exception');
} on FormatException {
// expected: invoking the criticalSection results in the exception
}
final m = Mutex();
try {
// Invoke protect to get the Future (this should succeed)
final resultFuture = m.protect<int>(criticalSection);
expect(resultFuture, isA<Future<void>>());
// Wait for the Future (this should fail)
final result = await resultFuture;
expect(result, isNotNull);
fail('exception not thrown');
} on FormatException catch (e) {
expect(m.isLocked, isFalse);
expect(e.message, equals('synchronous exception'));
}
expect(m.isLocked, isFalse);
});
test('exception in asynchronous code', () async {
// Tests what happens when an exception is raised in the **asynchronous**
// part of the critical section.
//
// Locks are correctly managed: the lock is obtained before executing
// the critical section, and is released when the exception is thrown
// by the _protect_ method.
//
// The exception is raised when waiting for the Future returned by
// _protect_ to complete.
Future<int> criticalSection() async {
final c = Completer<int>()..complete(42);
await Future.delayed(const Duration(seconds: 1), () {});
// asynchronous exception (since it must wait for the above line)
throw const FormatException('asynchronous exception');
// ignore: dead_code
return c.future;
}
// Check the criticalSection behaves as expected for the test
final resultFuture = criticalSection();
expect(resultFuture, isA<Future<int>>());
// invoking the criticalSection does not result in the exception
try {
await resultFuture;
fail('critical section did not throw asynchronous exception');
} on FormatException {
// expected: exception happens on the await
}
final m = Mutex();
try {
// Invoke protect to get the Future (this should succeed)
final resultFuture = m.protect<int>(criticalSection);
expect(resultFuture, isA<Future<int>>());
// Even though the criticalSection throws the exception in synchronous
// code, protect causes it to become an asynchronous exception.
// Wait for the Future (this should fail)
final result = await resultFuture;
expect(result, isNotNull);
fail('exception not thrown');
} on FormatException catch (e) {
expect(m.isLocked, isFalse);
expect(e.message, equals('asynchronous exception'));
}
expect(m.isLocked, isFalse);
});
});
}

View File

@ -1,28 +1,15 @@
# This file configures the analyzer, which statically analyzes Dart code to
# check for errors, warnings, and lints.
#
# The issues identified by the analyzer are surfaced in the UI of Dart-enabled
# IDEs (https://dart.dev/tools#ides-and-editors). The analyzer can also be
# invoked from the command line by running `flutter analyze`.
# The following line activates a set of recommended lints for Flutter apps,
# packages, and plugins designed to encourage good coding practices.
include: package:flutter_lints/flutter.yaml
include: package:lint_hard/all.yaml
analyzer:
errors:
invalid_annotation_target: ignore
exclude:
- '**/*.g.dart'
- '**/*.freezed.dart'
- '**/*.pb.dart'
- '**/*.pbenum.dart'
- '**/*.pbjson.dart'
- '**/*.pbserver.dart'
linter:
# The lint rules applied to this project can be customized in the
# section below to disable rules from the `package:flutter_lints/flutter.yaml`
# included above or to enable additional rules. A list of all available lints
# and their documentation is published at https://dart.dev/lints.
#
# Instead of disabling a lint rule for the entire project in the
# section below, it can also be suppressed for a single line of code
# or a specific dart file by using the `// ignore: name_of_lint` and
# `// ignore_for_file: name_of_lint` syntax on the line or in the file
# producing the lint.
rules:
# avoid_print: false # Uncomment to disable the `avoid_print` rule
# prefer_single_quotes: true # Uncomment to enable the `prefer_single_quotes` rule
# Additional information about this file can be found at
# https://dart.dev/guides/language/analysis-options
unawaited_futures: true
avoid_positional_boolean_parameters: false

View File

@ -1,19 +1,30 @@
@Timeout(Duration(seconds: 60))
@Timeout(Duration(seconds: 120))
library veilid_support_integration_test;
import 'package:flutter_test/flutter_test.dart';
import 'package:integration_test/integration_test.dart';
import 'package:veilid_test/veilid_test.dart';
import 'fixtures.dart';
import 'fixtures/fixtures.dart';
import 'test_dht_record_pool.dart';
import 'test_dht_short_array.dart';
void main() {
IntegrationTestWidgetsFlutterBinding.ensureInitialized();
final fixture = DefaultFixture();
final veilidFixture =
DefaultVeilidFixture(programName: 'veilid_support integration test');
final updateProcessorFixture =
UpdateProcessorFixture(veilidFixture: veilidFixture);
final tickerFixture =
TickerFixture(updateProcessorFixture: updateProcessorFixture);
final dhtRecordPoolFixture = DHTRecordPoolFixture(
tickerFixture: tickerFixture,
updateProcessorFixture: updateProcessorFixture);
group('Started Tests', () {
setUpAll(fixture.setUp);
tearDownAll(fixture.tearDown);
setUpAll(veilidFixture.setUp);
tearDownAll(veilidFixture.tearDown);
// group('Crypto Tests', () {
// test('best cryptosystem', testBestCryptoSystem);
@ -23,15 +34,32 @@ void main() {
// });
group('Attached Tests', () {
setUpAll(fixture.attach);
tearDownAll(fixture.detach);
setUpAll(veilidFixture.attach);
tearDownAll(veilidFixture.detach);
group('DHT Support Tests', () {
group('DHTRecordPool Tests', () {
test('create pool', testDHTRecordPoolCreate);
});
setUpAll(updateProcessorFixture.setUp);
setUpAll(tickerFixture.setUp);
tearDownAll(tickerFixture.tearDown);
tearDownAll(updateProcessorFixture.tearDown);
test('create pool', testDHTRecordPoolCreate);
// group('DHTRecordPool Tests', () {
// setUpAll(dhtRecordPoolFixture.setUp);
// tearDownAll(dhtRecordPoolFixture.tearDown);
// test('create/delete record', testDHTRecordCreateDelete);
// test('record scopes', testDHTRecordScopes);
// test('create/delete deep record', testDHTRecordDeepCreateDelete);
// });
group('DHTShortArray Tests', () {
test('create shortarray', testDHTShortArrayCreate);
setUpAll(dhtRecordPoolFixture.setUp);
tearDownAll(dhtRecordPoolFixture.tearDown);
// test('create shortarray', testDHTShortArrayCreateDelete);
test('add shortarray', testDHTShortArrayAdd);
});
});
});

View File

@ -1,152 +0,0 @@
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:mutex/mutex.dart';
import 'package:veilid/veilid.dart';
class DefaultFixture {
DefaultFixture();
StreamSubscription<VeilidUpdate>? _veilidUpdateSubscription;
Stream<VeilidUpdate>? _veilidUpdateStream;
final StreamController<VeilidUpdate> _updateStreamController =
StreamController.broadcast();
static final _fixtureMutex = Mutex();
Future<void> setUp() async {
await _fixtureMutex.acquire();
assert(_veilidUpdateStream == null, 'should not set up fixture twice');
final ignoreLogTargetsStr =
// ignore: do_not_use_environment
const String.fromEnvironment('IGNORE_LOG_TARGETS').trim();
final ignoreLogTargets = ignoreLogTargetsStr.isEmpty
? <String>[]
: ignoreLogTargetsStr.split(',').map((e) => e.trim()).toList();
final Map<String, dynamic> platformConfigJson;
if (kIsWeb) {
final platformConfig = VeilidWASMConfig(
logging: VeilidWASMConfigLogging(
performance: VeilidWASMConfigLoggingPerformance(
enabled: true,
level: VeilidConfigLogLevel.debug,
logsInTimings: true,
logsInConsole: false,
ignoreLogTargets: ignoreLogTargets,
),
api: VeilidWASMConfigLoggingApi(
enabled: true,
level: VeilidConfigLogLevel.info,
ignoreLogTargets: ignoreLogTargets,
)));
platformConfigJson = platformConfig.toJson();
} else {
final platformConfig = VeilidFFIConfig(
logging: VeilidFFIConfigLogging(
terminal: VeilidFFIConfigLoggingTerminal(
enabled: false,
level: VeilidConfigLogLevel.debug,
ignoreLogTargets: ignoreLogTargets,
),
otlp: VeilidFFIConfigLoggingOtlp(
enabled: false,
level: VeilidConfigLogLevel.trace,
grpcEndpoint: 'localhost:4317',
serviceName: 'Veilid Tests',
ignoreLogTargets: ignoreLogTargets,
),
api: VeilidFFIConfigLoggingApi(
enabled: true,
// level: VeilidConfigLogLevel.debug,
level: VeilidConfigLogLevel.info,
ignoreLogTargets: ignoreLogTargets,
)));
platformConfigJson = platformConfig.toJson();
}
Veilid.instance.initializeVeilidCore(platformConfigJson);
var config = await getDefaultVeilidConfig(
isWeb: kIsWeb,
programName: 'Veilid Tests',
// ignore: avoid_redundant_argument_values, do_not_use_environment
bootstrap: const String.fromEnvironment('BOOTSTRAP'),
// ignore: avoid_redundant_argument_values, do_not_use_environment
networkKeyPassword: const String.fromEnvironment('NETWORK_KEY'),
);
config =
config.copyWith(tableStore: config.tableStore.copyWith(delete: true));
config = config.copyWith(
protectedStore: config.protectedStore.copyWith(delete: true));
config =
config.copyWith(blockStore: config.blockStore.copyWith(delete: true));
final us =
_veilidUpdateStream = await Veilid.instance.startupVeilidCore(config);
_veilidUpdateSubscription = us.listen((update) {
if (update is VeilidLog) {
// print(update.message);
} else if (update is VeilidUpdateAttachment) {
} else if (update is VeilidUpdateConfig) {
} else if (update is VeilidUpdateNetwork) {
} else if (update is VeilidAppMessage) {
} else if (update is VeilidAppCall) {
} else if (update is VeilidUpdateValueChange) {
} else if (update is VeilidUpdateRouteChange) {
} else {
throw Exception('unexpected update: $update');
}
_updateStreamController.sink.add(update);
});
}
Stream<VeilidUpdate> get updateStream => _updateStreamController.stream;
Future<void> attach() async {
await Veilid.instance.attach();
// Wait for attached state
while (true) {
final state = await Veilid.instance.getVeilidState();
var done = false;
if (state.attachment.publicInternetReady) {
switch (state.attachment.state) {
case AttachmentState.detached:
break;
case AttachmentState.attaching:
break;
case AttachmentState.detaching:
break;
default:
done = true;
break;
}
}
if (done) {
break;
}
await Future.delayed(const Duration(seconds: 1));
}
}
Future<void> detach() async {
await Veilid.instance.detach();
}
Future<void> tearDown() async {
assert(_veilidUpdateStream != null, 'should not tearDown without setUp');
final cancelFut = _veilidUpdateSubscription?.cancel();
await Veilid.instance.shutdownVeilidCore();
await cancelFut;
_veilidUpdateSubscription = null;
_veilidUpdateStream = null;
_fixtureMutex.release();
}
}

View File

@ -0,0 +1,36 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:veilid_support/veilid_support.dart';
import 'package:veilid_test/veilid_test.dart';
class DHTRecordPoolFixture implements TickerFixtureTickable {
DHTRecordPoolFixture(
{required this.tickerFixture, required this.updateProcessorFixture});
static final _fixtureMutex = Mutex();
UpdateProcessorFixture updateProcessorFixture;
TickerFixture tickerFixture;
Future<void> setUp() async {
await _fixtureMutex.acquire();
await DHTRecordPool.init();
tickerFixture.register(this);
}
Future<void> tearDown() async {
assert(_fixtureMutex.isLocked, 'should not tearDown without setUp');
tickerFixture.unregister(this);
await DHTRecordPool.close();
_fixtureMutex.release();
}
@override
Future<void> onTick() async {
if (!updateProcessorFixture
.processorConnectionState.isPublicInternetReady) {
return;
}
await DHTRecordPool.instance.tick();
}
}

View File

@ -0,0 +1 @@
export 'dht_record_pool_fixture.dart';

View File

@ -1,9 +1,201 @@
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:flutter_test/flutter_test.dart';
import 'package:veilid_support/veilid_support.dart';
Future<void> testDHTRecordPoolCreate() async {
// final cs = await Veilid.instance.bestCryptoSystem();
// expect(await cs.defaultSaltLength(), equals(16));
await DHTRecordPool.init(logger: debugPrintSynchronously);
final pool = DHTRecordPool.instance;
await pool.tick();
await DHTRecordPool.close();
}
Future<void> testDHTRecordCreateDelete() async {
final pool = DHTRecordPool.instance;
// Close before delete
{
final rec = await pool.createRecord(debugName: 'test_create_delete 1');
expect(rec.isOpen, isTrue);
await rec.close();
expect(rec.isOpen, isFalse);
await pool.deleteRecord(rec.key);
// Set should fail
await expectLater(() async => rec.tryWriteBytes(utf8.encode('test')),
throwsA(isA<VeilidAPIException>()));
}
// Close after delete
{
final rec2 = await pool.createRecord(debugName: 'test_create_delete 2');
expect(rec2.isOpen, isTrue);
await pool.deleteRecord(rec2.key);
expect(rec2.isOpen, isTrue);
await rec2.close();
expect(rec2.isOpen, isFalse);
// Set should fail
await expectLater(() async => rec2.tryWriteBytes(utf8.encode('test')),
throwsA(isA<VeilidAPIException>()));
}
// Close after delete multiple
// Okay to request delete multiple times before close
{
final rec3 = await pool.createRecord(debugName: 'test_create_delete 3');
await pool.deleteRecord(rec3.key);
await pool.deleteRecord(rec3.key);
// Set should succeed still
await rec3.tryWriteBytes(utf8.encode('test'));
await rec3.close();
await rec3.close();
// Set should fail
await expectLater(() async => rec3.tryWriteBytes(utf8.encode('test')),
throwsA(isA<VeilidAPIException>()));
// Delete already delete should fail
await expectLater(() async => pool.deleteRecord(rec3.key),
throwsA(isA<VeilidAPIException>()));
}
}
Future<void> testDHTRecordScopes() async {
final pool = DHTRecordPool.instance;
// Delete scope with exception should propagate exception
{
final rec = await pool.createRecord(debugName: 'test_scope 1');
await expectLater(
() async => rec.deleteScope((recd) async {
throw Exception();
}),
throwsA(isA<Exception>()));
// Set should fail
await expectLater(() async => rec.tryWriteBytes(utf8.encode('test')),
throwsA(isA<VeilidAPIException>()));
}
// Delete scope without exception
{
final rec2 = await pool.createRecord(debugName: 'test_scope 2');
try {
await rec2.deleteScope((rec2d) async {
//
});
} on Exception {
assert(false, 'should not throw');
}
await rec2.close();
await pool.deleteRecord(rec2.key);
}
// Close scope without exception
{
final rec3 = await pool.createRecord(debugName: 'test_scope 3');
try {
await rec3.scope((rec3d) async {
//
});
} on Exception {
assert(false, 'should not throw');
}
// Set should fail because scope closed it
await expectLater(() async => rec3.tryWriteBytes(utf8.encode('test')),
throwsA(isA<VeilidAPIException>()));
await pool.deleteRecord(rec3.key);
}
}
Future<void> testDHTRecordGetSet() async {
final pool = DHTRecordPool.instance;
final valdata = utf8.encode('test');
// Test get without set
{
final rec = await pool.createRecord(debugName: 'test_get_set 1');
final val = await rec.get();
await pool.deleteRecord(rec.key);
expect(val, isNull);
}
// Test set then get
{
final rec2 = await pool.createRecord(debugName: 'test_get_set 2');
expect(await rec2.tryWriteBytes(valdata), isNull);
expect(await rec2.get(), equals(valdata));
// Invalid subkey should throw
await expectLater(
() async => rec2.get(subkey: 1), throwsA(isA<VeilidAPIException>()));
await pool.deleteRecord(rec2.key);
}
// Test set then delete then open then get
{
final rec3 = await pool.createRecord(debugName: 'test_get_set 3');
expect(await rec3.tryWriteBytes(valdata), isNull);
expect(await rec3.get(), equals(valdata));
await rec3.close();
await pool.deleteRecord(rec3.key);
final rec4 =
await pool.openRecordRead(rec3.key, debugName: 'test_get_set 4');
expect(await rec4.get(), equals(valdata));
await rec4.close();
await pool.deleteRecord(rec4.key);
}
}
Future<void> testDHTRecordDeepCreateDelete() async {
final pool = DHTRecordPool.instance;
const numChildren = 20;
const numIterations = 10;
// Make root record
final recroot = await pool.createRecord(debugName: 'test_deep_create_delete');
for (var d = 0; d < numIterations; d++) {
// Make child set 1
var parent = recroot;
final children = <DHTRecord>[];
for (var n = 0; n < numChildren; n++) {
final child =
await pool.createRecord(debugName: 'deep $n', parent: parent.key);
children.add(child);
parent = child;
}
// Make child set 2
final children2 = <DHTRecord>[];
parent = recroot;
for (var n = 0; n < numChildren; n++) {
final child =
await pool.createRecord(debugName: 'deep2 $n ', parent: parent.key);
children2.add(child);
parent = child;
}
// Should fail to delete root
await expectLater(
() async => pool.deleteRecord(recroot.key), throwsA(isA<StateError>()));
// Close child set 1
await children.map((c) => c.close()).wait;
// Delete child set 1 in reverse order
for (var n = numChildren - 1; n >= 0; n--) {
await pool.deleteRecord(children[n].key);
}
// Should fail to delete root
await expectLater(
() async => pool.deleteRecord(recroot.key), throwsA(isA<StateError>()));
// Close child set 1
await children2.map((c) => c.close()).wait;
// Delete child set 2 in reverse order
for (var n = numChildren - 1; n >= 0; n--) {
await pool.deleteRecord(children2[n].key);
}
}
// Should be able to delete root now
await pool.deleteRecord(recroot.key);
}

View File

@ -3,7 +3,86 @@ import 'dart:convert';
import 'package:flutter_test/flutter_test.dart';
import 'package:veilid_support/veilid_support.dart';
Future<void> testDHTShortArrayCreate() async {
// final cs = await Veilid.instance.bestCryptoSystem();
// expect(await cs.defaultSaltLength(), equals(16));
Future<void> testDHTShortArrayCreateDelete() async {
// Close before delete
{
final arr = await DHTShortArray.create(debugName: 'sa_create_delete 1');
expect(await arr.operate((r) async => r.length), isZero);
expect(arr.isOpen, isTrue);
await arr.close();
expect(arr.isOpen, isFalse);
await arr.delete();
// Operate should fail
await expectLater(() async => arr.operate((r) async => r.length),
throwsA(isA<StateError>()));
}
// Close after delete
{
final arr = await DHTShortArray.create(debugName: 'sa_create_delete 2');
await arr.delete();
// Operate should still succeed because things aren't closed
expect(await arr.operate((r) async => r.length), isZero);
await arr.close();
// Operate should fail
await expectLater(() async => arr.operate((r) async => r.length),
throwsA(isA<StateError>()));
}
// Close after delete multiple
// Okay to request delete multiple times before close
{
final arr = await DHTShortArray.create(debugName: 'sa_create_delete 3');
await arr.delete();
await arr.delete();
// Operate should still succeed because things aren't closed
expect(await arr.operate((r) async => r.length), isZero);
await arr.close();
await arr.close();
// Operate should fail
await expectLater(() async => arr.operate((r) async => r.length),
throwsA(isA<StateError>()));
}
}
Future<void> testDHTShortArrayAdd() async {
final arr = await DHTShortArray.create(debugName: 'sa_add 1');
final dataset =
Iterable<int>.generate(256).map((n) => utf8.encode('elem $n')).toList();
print('adding');
{
final (res, ok) = await arr.operateWrite((w) async {
for (var n = 0; n < dataset.length; n++) {
print('add $n');
final success = await w.tryAddItem(dataset[n]);
expect(success, isTrue);
}
});
expect(res, isNull);
expect(ok, isTrue);
}
print('get all');
{
final dataset2 = await arr.operate((r) async => r.getAllItems());
expect(dataset2, equals(dataset));
}
print('clear');
{
final (res, ok) = await arr.operateWrite((w) async => w.tryClear());
expect(res, isTrue);
expect(ok, isTrue);
}
print('get all');
{
final dataset3 = await arr.operate((r) async => r.getAllItems());
expect(dataset3, isEmpty);
}
await arr.delete();
await arr.close();
}

View File

@ -45,6 +45,7 @@
331C807B294A618700263BE5 /* RunnerTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RunnerTests.swift; sourceTree = "<group>"; };
331C8081294A63A400263BE5 /* RunnerTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = RunnerTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
3B3967151E833CAA004F5970 /* AppFrameworkInfo.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; name = AppFrameworkInfo.plist; path = Flutter/AppFrameworkInfo.plist; sourceTree = "<group>"; };
4380113E2BE01E850006987E /* libveilid_flutter.a */ = {isa = PBXFileReference; lastKnownFileType = archive.ar; name = libveilid_flutter.a; path = "../../../../../veilid/target/lipo-ios/libveilid_flutter.a"; sourceTree = "<group>"; };
74858FAD1ED2DC5600515810 /* Runner-Bridging-Header.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = "Runner-Bridging-Header.h"; sourceTree = "<group>"; };
74858FAE1ED2DC5600515810 /* AppDelegate.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AppDelegate.swift; sourceTree = "<group>"; };
7AFA3C8E1D35360C0083082E /* Release.xcconfig */ = {isa = PBXFileReference; lastKnownFileType = text.xcconfig; name = Release.xcconfig; path = Flutter/Release.xcconfig; sourceTree = "<group>"; };
@ -76,6 +77,14 @@
path = RunnerTests;
sourceTree = "<group>";
};
4380113D2BE01E850006987E /* Frameworks */ = {
isa = PBXGroup;
children = (
4380113E2BE01E850006987E /* libveilid_flutter.a */,
);
name = Frameworks;
sourceTree = "<group>";
};
9740EEB11CF90186004384FC /* Flutter */ = {
isa = PBXGroup;
children = (
@ -94,6 +103,7 @@
97C146F01CF9000F007C117D /* Runner */,
97C146EF1CF9000F007C117D /* Products */,
331C8082294A63A400263BE5 /* RunnerTests */,
4380113D2BE01E850006987E /* Frameworks */,
);
sourceTree = "<group>";
};
@ -361,16 +371,22 @@
buildSettings = {
ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon;
CLANG_ENABLE_MODULES = YES;
CODE_SIGN_IDENTITY = "Apple Development";
CODE_SIGN_STYLE = Automatic;
CURRENT_PROJECT_VERSION = "$(FLUTTER_BUILD_NUMBER)";
DEVELOPMENT_TEAM = W2TA5TB8Q5;
DEVELOPMENT_TEAM = XP5LBLT7M7;
ENABLE_BITCODE = NO;
INFOPLIST_FILE = Runner/Info.plist;
INFOPLIST_KEY_CFBundleDisplayName = "Veilid Support Tests";
INFOPLIST_KEY_LSApplicationCategoryType = "public.app-category.social-networking";
LD_RUNPATH_SEARCH_PATHS = (
"$(inherited)",
"@executable_path/Frameworks",
);
PRODUCT_BUNDLE_IDENTIFIER = com.example.example;
MARKETING_VERSION = 1.0;
PRODUCT_BUNDLE_IDENTIFIER = "org.veilid.packages.veilid-support.tests";
PRODUCT_NAME = "$(TARGET_NAME)";
PROVISIONING_PROFILE_SPECIFIER = "";
SWIFT_OBJC_BRIDGING_HEADER = "Runner/Runner-Bridging-Header.h";
SWIFT_VERSION = 5.0;
VERSIONING_SYSTEM = "apple-generic";
@ -541,16 +557,22 @@
buildSettings = {
ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon;
CLANG_ENABLE_MODULES = YES;
CODE_SIGN_IDENTITY = "Apple Development";
CODE_SIGN_STYLE = Automatic;
CURRENT_PROJECT_VERSION = "$(FLUTTER_BUILD_NUMBER)";
DEVELOPMENT_TEAM = W2TA5TB8Q5;
DEVELOPMENT_TEAM = XP5LBLT7M7;
ENABLE_BITCODE = NO;
INFOPLIST_FILE = Runner/Info.plist;
INFOPLIST_KEY_CFBundleDisplayName = "Veilid Support Tests";
INFOPLIST_KEY_LSApplicationCategoryType = "public.app-category.social-networking";
LD_RUNPATH_SEARCH_PATHS = (
"$(inherited)",
"@executable_path/Frameworks",
);
PRODUCT_BUNDLE_IDENTIFIER = com.example.example;
MARKETING_VERSION = 1.0;
PRODUCT_BUNDLE_IDENTIFIER = "org.veilid.packages.veilid-support.tests";
PRODUCT_NAME = "$(TARGET_NAME)";
PROVISIONING_PROFILE_SPECIFIER = "";
SWIFT_OBJC_BRIDGING_HEADER = "Runner/Runner-Bridging-Header.h";
SWIFT_OPTIMIZATION_LEVEL = "-Onone";
SWIFT_VERSION = 5.0;
@ -564,16 +586,22 @@
buildSettings = {
ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon;
CLANG_ENABLE_MODULES = YES;
CODE_SIGN_IDENTITY = "Apple Development";
CODE_SIGN_STYLE = Automatic;
CURRENT_PROJECT_VERSION = "$(FLUTTER_BUILD_NUMBER)";
DEVELOPMENT_TEAM = W2TA5TB8Q5;
DEVELOPMENT_TEAM = XP5LBLT7M7;
ENABLE_BITCODE = NO;
INFOPLIST_FILE = Runner/Info.plist;
INFOPLIST_KEY_CFBundleDisplayName = "Veilid Support Tests";
INFOPLIST_KEY_LSApplicationCategoryType = "public.app-category.social-networking";
LD_RUNPATH_SEARCH_PATHS = (
"$(inherited)",
"@executable_path/Frameworks",
);
PRODUCT_BUNDLE_IDENTIFIER = com.example.example;
MARKETING_VERSION = 1.0;
PRODUCT_BUNDLE_IDENTIFIER = "org.veilid.packages.veilid-support.tests";
PRODUCT_NAME = "$(TARGET_NAME)";
PROVISIONING_PROFILE_SPECIFIER = "";
SWIFT_OBJC_BRIDGING_HEADER = "Runner/Runner-Bridging-Header.h";
SWIFT_VERSION = 5.0;
VERSIONING_SYSTEM = "apple-generic";

View File

@ -2,6 +2,8 @@
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>CADisableMinimumFrameDurationOnPhone</key>
<true/>
<key>CFBundleDevelopmentRegion</key>
<string>$(DEVELOPMENT_LANGUAGE)</string>
<key>CFBundleDisplayName</key>
@ -24,6 +26,8 @@
<string>$(FLUTTER_BUILD_NUMBER)</string>
<key>LSRequiresIPhoneOS</key>
<true/>
<key>UIApplicationSupportsIndirectInputEvents</key>
<true/>
<key>UILaunchStoryboardName</key>
<string>LaunchScreen</string>
<key>UIMainStoryboardFile</key>
@ -41,9 +45,5 @@
<string>UIInterfaceOrientationLandscapeLeft</string>
<string>UIInterfaceOrientationLandscapeRight</string>
</array>
<key>CADisableMinimumFrameDurationOnPhone</key>
<true/>
<key>UIApplicationSupportsIndirectInputEvents</key>
<true/>
</dict>
</plist>

View File

@ -10,12 +10,13 @@ packages:
source: hosted
version: "2.11.0"
async_tools:
dependency: transitive
dependency: "direct dev"
description:
path: "../../async_tools"
relative: true
source: path
version: "1.0.0"
name: async_tools
sha256: "972f68ab663724d86260a31e363c1355ff493308441b872bf4e7b8adc67c832c"
url: "https://pub.dev"
source: hosted
version: "0.1.0"
bloc:
dependency: transitive
description:
@ -24,13 +25,14 @@ packages:
url: "https://pub.dev"
source: hosted
version: "8.1.4"
bloc_tools:
bloc_advanced_tools:
dependency: transitive
description:
path: "../../bloc_tools"
relative: true
source: path
version: "1.0.0"
name: bloc_advanced_tools
sha256: bc0e1d5c26ae7df011464ab6abc2134dcfb668952acc87359abc7457cab091dd
url: "https://pub.dev"
source: hosted
version: "0.1.0"
boolean_selector:
dependency: transitive
description:
@ -145,14 +147,6 @@ packages:
description: flutter
source: sdk
version: "0.0.0"
flutter_lints:
dependency: "direct dev"
description:
name: flutter_lints
sha256: "9e8c3858111da373efc5aa341de011d9bd23e2c5c5e0c62bccf32438e192d7b1"
url: "https://pub.dev"
source: hosted
version: "3.0.2"
flutter_test:
dependency: "direct dev"
description: flutter
@ -221,14 +215,14 @@ packages:
url: "https://pub.dev"
source: hosted
version: "2.0.1"
lints:
dependency: transitive
lint_hard:
dependency: "direct dev"
description:
name: lints
sha256: cbf8d4b858bb0134ef3ef87841abdf8d63bfc255c266b7bf6b39daa1085c4290
name: lint_hard
sha256: "44d15ec309b1a8e1aff99069df9dcb1597f49d5f588f32811ca28fb7b38c32fe"
url: "https://pub.dev"
source: hosted
version: "3.0.0"
version: "4.0.0"
loggy:
dependency: transitive
description:
@ -261,13 +255,6 @@ packages:
url: "https://pub.dev"
source: hosted
version: "1.11.0"
mutex:
dependency: "direct dev"
description:
path: "../../mutex"
relative: true
source: path
version: "3.1.0"
path:
dependency: transitive
description:
@ -447,7 +434,7 @@ packages:
path: "../../../../veilid/veilid-flutter"
relative: true
source: path
version: "0.3.1"
version: "0.3.2"
veilid_support:
dependency: "direct main"
description:
@ -455,6 +442,13 @@ packages:
relative: true
source: path
version: "1.0.2+0"
veilid_test:
dependency: "direct dev"
description:
path: "../../../../veilid/veilid-flutter/packages/veilid_test"
relative: true
source: path
version: "0.1.0"
vm_service:
dependency: transitive
description:

View File

@ -1,90 +1,27 @@
name: example
description: "Veilid Support Example"
# The following line prevents the package from being accidentally published to
# pub.dev using `flutter pub publish`. This is preferred for private packages.
publish_to: 'none' # Remove this line if you wish to publish to pub.dev
# The following defines the version and build number for your application.
# A version number is three numbers separated by dots, like 1.2.43
# followed by an optional build number separated by a +.
# Both the version and the builder number may be overridden in flutter
# build by specifying --build-name and --build-number, respectively.
# In Android, build-name is used as versionName while build-number used as versionCode.
# Read more about Android versioning at https://developer.android.com/studio/publish/versioning
# In iOS, build-name is used as CFBundleShortVersionString while build-number is used as CFBundleVersion.
# Read more about iOS versioning at
# https://developer.apple.com/library/archive/documentation/General/Reference/InfoPlistKeyReference/Articles/CoreFoundationKeys.html
# In Windows, build-name is used as the major, minor, and patch parts
# of the product and file versions while build-number is used as the build suffix.
version: 1.0.0+1
environment:
sdk: '>=3.3.4 <4.0.0'
# Dependencies specify other packages that your package needs in order to work.
# To automatically upgrade your package dependencies to the latest versions
# consider running `flutter pub upgrade --major-versions`. Alternatively,
# dependencies can be manually updated by changing the version numbers below to
# the latest version available on pub.dev. To see which dependencies have newer
# versions available, run `flutter pub outdated`.
dependencies:
cupertino_icons: ^1.0.6
flutter:
sdk: flutter
# The following adds the Cupertino Icons font to your application.
# Use with the CupertinoIcons class for iOS style icons.
cupertino_icons: ^1.0.6
veilid_support:
path: ../
dev_dependencies:
async_tools: ^0.1.0
flutter_test:
sdk: flutter
integration_test:
sdk: flutter
flutter_lints: ^3.0.1
mutex:
path: ../../mutex
lint_hard: ^4.0.0
veilid_test:
path: ../../../../veilid/veilid-flutter/packages/veilid_test
# For information on the generic Dart part of this file, see the
# following page: https://dart.dev/tools/pub/pubspec
# The following section is specific to Flutter packages.
flutter:
# The following line ensures that the Material Icons font is
# included with your application, so that you can use the icons in
# the material Icons class.
uses-material-design: true
# To add assets to your application, add an assets section, like this:
# assets:
# - images/a_dot_burr.jpeg
# - images/a_dot_ham.jpeg
# An image asset can refer to one or more resolution-specific "variants", see
# https://flutter.dev/assets-and-images/#resolution-aware
# For details regarding adding assets from package dependencies, see
# https://flutter.dev/assets-and-images/#from-packages
# To add custom fonts to your application, add a fonts section here,
# in this "flutter" section. Each entry in this list should have a
# "family" key with the font family name, and a "fonts" key with a
# list giving the asset and other descriptors for the font. For
# example:
# fonts:
# - family: Schyler
# fonts:
# - asset: fonts/Schyler-Regular.ttf
# - asset: fonts/Schyler-Italic.ttf
# style: italic
# - family: Trajan Pro
# fonts:
# - asset: fonts/TrajanPro.ttf
# - asset: fonts/TrajanPro_Bold.ttf
# weight: 700
#
# For details regarding fonts from package dependencies,
# see https://flutter.dev/custom-fonts/#from-packages

View File

@ -57,6 +57,7 @@ class DHTRecord {
DHTRecordCrypto get crypto => _crypto;
OwnedDHTRecordPointer get ownedDHTRecordPointer =>
OwnedDHTRecordPointer(recordKey: key, owner: ownerKeyPair!);
bool get isOpen => _open;
Future<void> close() async {
if (!_open) {

View File

@ -2,10 +2,10 @@ import 'dart:async';
import 'dart:math';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:equatable/equatable.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:freezed_annotation/freezed_annotation.dart';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart';
import '../../../../veilid_support.dart';
@ -179,6 +179,13 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
_singleton = globalPool;
}
static Future<void> close() async {
if (_singleton != null) {
_singleton!._routingContext.close();
_singleton = null;
}
}
Veilid get veilid => _veilid;
void log(String message) {
@ -191,8 +198,9 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
required DHTSchema schema,
KeyPair? writer,
TypedKey? parent}) async {
assert(_mutex.isLocked, 'should be locked here');
if (!_mutex.isLocked) {
throw StateError('should be locked here');
}
// Create the record
final recordDescriptor = await dhtctx.createDHTRecord(schema);
@ -225,8 +233,9 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
required TypedKey recordKey,
KeyPair? writer,
TypedKey? parent}) async {
assert(_mutex.isLocked, 'should be locked here');
if (!_mutex.isLocked) {
throw StateError('should be locked here');
}
log('openDHTRecord: debugName=$debugName key=$recordKey');
// If we are opening a key that already exists
@ -303,8 +312,9 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
// Collect all dependencies (including the record itself)
// in reverse (bottom-up/delete order)
List<TypedKey> _collectChildrenInner(TypedKey recordKey) {
assert(_mutex.isLocked, 'should be locked here');
if (!_mutex.isLocked) {
throw StateError('should be locked here');
}
final allDeps = <TypedKey>[];
final currentDeps = [recordKey];
while (currentDeps.isNotEmpty) {
@ -318,16 +328,18 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
return allDeps.reversedView;
}
void _debugPrintChildren(TypedKey recordKey, {List<TypedKey>? allDeps}) {
String _debugChildren(TypedKey recordKey, {List<TypedKey>? allDeps}) {
allDeps ??= _collectChildrenInner(recordKey);
// ignore: avoid_print
print('Parent: $recordKey (${_state.debugNames[recordKey.toString()]})');
var out =
'Parent: $recordKey (${_state.debugNames[recordKey.toString()]})\n';
for (final dep in allDeps) {
if (dep != recordKey) {
// ignore: avoid_print
print(' Child: $dep (${_state.debugNames[dep.toString()]})');
out += ' Child: $dep (${_state.debugNames[dep.toString()]})\n';
}
}
return out;
}
Future<void> _deleteRecordInner(TypedKey recordKey) async {
@ -343,8 +355,8 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
final allDeps = _collectChildrenInner(recordKey);
if (allDeps.singleOrNull != recordKey) {
_debugPrintChildren(recordKey, allDeps: allDeps);
assert(false, 'must delete children first');
final dbgstr = _debugChildren(recordKey, allDeps: allDeps);
throw StateError('must delete children first: $dbgstr');
}
final ori = _opened[recordKey];
@ -359,7 +371,9 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
}
void _validateParentInner(TypedKey? parent, TypedKey child) {
assert(_mutex.isLocked, 'should be locked here');
if (!_mutex.isLocked) {
throw StateError('should be locked here');
}
final childJson = child.toJson();
final existingParent = _state.parentByChild[childJson];
@ -379,7 +393,9 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
Future<void> _addDependencyInner(TypedKey? parent, TypedKey child,
{required String debugName}) async {
assert(_mutex.isLocked, 'should be locked here');
if (!_mutex.isLocked) {
throw StateError('should be locked here');
}
if (parent == null) {
if (_state.rootRecords.contains(child)) {
// Dependency already added
@ -404,8 +420,9 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
}
Future<void> _removeDependenciesInner(List<TypedKey> childList) async {
assert(_mutex.isLocked, 'should be locked here');
if (!_mutex.isLocked) {
throw StateError('should be locked here');
}
var state = _state;
for (final child in childList) {
@ -442,7 +459,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
///////////////////////////////////////////////////////////////////////
/// Create a root DHTRecord that has no dependent records
Future<DHTRecord> create({
Future<DHTRecord> createRecord({
required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
@ -479,7 +496,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
});
/// Open a DHTRecord readonly
Future<DHTRecord> openRead(TypedKey recordKey,
Future<DHTRecord> openRecordRead(TypedKey recordKey,
{required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
@ -508,7 +525,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
});
/// Open a DHTRecord writable
Future<DHTRecord> openWrite(
Future<DHTRecord> openRecordWrite(
TypedKey recordKey,
KeyPair writer, {
required String debugName,
@ -548,7 +565,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
/// This is primarily used for backing up private content on to the DHT
/// to synchronizing it between devices. Because it is 'owned', the correct
/// parent must be specified.
Future<DHTRecord> openOwned(
Future<DHTRecord> openRecordOwned(
OwnedDHTRecordPointer ownedDHTRecordPointer, {
required String debugName,
required TypedKey parent,
@ -556,7 +573,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
int defaultSubkey = 0,
DHTRecordCrypto? crypto,
}) =>
openWrite(
openRecordWrite(
ownedDHTRecordPointer.recordKey,
ownedDHTRecordPointer.owner,
debugName: debugName,

View File

@ -1,8 +1,8 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:collection/collection.dart';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart';
@ -43,7 +43,7 @@ class DHTShortArray {
final schema = DHTSchema.smpl(
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: stride + 1)]);
dhtRecord = await pool.create(
dhtRecord = await pool.createRecord(
debugName: debugName,
parent: parent,
routingContext: routingContext,
@ -52,7 +52,7 @@ class DHTShortArray {
writer: smplWriter);
} else {
final schema = DHTSchema.dflt(oCnt: stride + 1);
dhtRecord = await pool.create(
dhtRecord = await pool.createRecord(
debugName: debugName,
parent: parent,
routingContext: routingContext,
@ -80,7 +80,7 @@ class DHTShortArray {
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto}) async {
final dhtRecord = await DHTRecordPool.instance.openRead(headRecordKey,
final dhtRecord = await DHTRecordPool.instance.openRecordRead(headRecordKey,
debugName: debugName,
parent: parent,
routingContext: routingContext,
@ -103,7 +103,7 @@ class DHTShortArray {
TypedKey? parent,
DHTRecordCrypto? crypto,
}) async {
final dhtRecord = await DHTRecordPool.instance.openWrite(
final dhtRecord = await DHTRecordPool.instance.openRecordWrite(
headRecordKey, writer,
debugName: debugName,
parent: parent,
@ -144,21 +144,31 @@ class DHTShortArray {
/// Get the record pointer foir this shortarray
OwnedDHTRecordPointer get recordPointer => _head.recordPointer;
/// Check if the shortarray is open
bool get isOpen => _head.isOpen;
/// Free all resources for the DHTShortArray
Future<void> close() async {
if (!isOpen) {
return;
}
await _watchController?.close();
_watchController = null;
await _head.close();
}
/// Free all resources for the DHTShortArray and delete it from the DHT
/// Will wait until the short array is closed to delete it
Future<void> delete() async {
await close();
await DHTRecordPool.instance.deleteRecord(recordKey);
await _head.delete();
}
/// Runs a closure that guarantees the DHTShortArray
/// will be closed upon exit, even if an uncaught exception is thrown
Future<T> scope<T>(Future<T> Function(DHTShortArray) scopeFunction) async {
if (!isOpen) {
throw StateError('short array is not open"');
}
try {
return await scopeFunction(this);
} finally {
@ -171,6 +181,10 @@ class DHTShortArray {
/// uncaught exception is thrown
Future<T> deleteScope<T>(
Future<T> Function(DHTShortArray) scopeFunction) async {
if (!isOpen) {
throw StateError('short array is not open"');
}
try {
final out = await scopeFunction(this);
await close();
@ -182,11 +196,16 @@ class DHTShortArray {
}
/// Runs a closure allowing read-only access to the shortarray
Future<T?> operate<T>(Future<T?> Function(DHTShortArrayRead) closure) async =>
_head.operate((head) async {
final reader = _DHTShortArrayRead._(head);
return closure(reader);
});
Future<T?> operate<T>(Future<T?> Function(DHTShortArrayRead) closure) async {
if (!isOpen) {
throw StateError('short array is not open"');
}
return _head.operate((head) async {
final reader = _DHTShortArrayRead._(head);
return closure(reader);
});
}
/// Runs a closure allowing read-write access to the shortarray
/// Makes only one attempt to consistently write the changes to the DHT
@ -206,38 +225,48 @@ class DHTShortArray {
/// succeeded, returning false will trigger another eventual consistency
/// attempt.
Future<void> operateWriteEventual(
Future<bool> Function(DHTShortArrayWrite) closure,
{Duration? timeout}) async =>
_head.operateWriteEventual((head) async {
final writer = _DHTShortArrayWrite._(head);
return closure(writer);
}, timeout: timeout);
Future<bool> Function(DHTShortArrayWrite) closure,
{Duration? timeout}) async {
if (!isOpen) {
throw StateError('short array is not open"');
}
return _head.operateWriteEventual((head) async {
final writer = _DHTShortArrayWrite._(head);
return closure(writer);
}, timeout: timeout);
}
/// Listen to and any all changes to the structure of this short array
/// regardless of where the changes are coming from
Future<StreamSubscription<void>> listen(
void Function() onChanged,
) =>
_listenMutex.protect(() async {
// If don't have a controller yet, set it up
if (_watchController == null) {
// Set up watch requirements
_watchController = StreamController<void>.broadcast(onCancel: () {
// If there are no more listeners then we can get
// rid of the controller and drop our subscriptions
unawaited(_listenMutex.protect(() async {
// Cancel watches of head record
await _head.cancelWatch();
_watchController = null;
}));
});
) {
if (!isOpen) {
throw StateError('short array is not open"');
}
// Start watching head record
await _head.watch();
}
// Return subscription
return _watchController!.stream.listen((_) => onChanged());
});
return _listenMutex.protect(() async {
// If don't have a controller yet, set it up
if (_watchController == null) {
// Set up watch requirements
_watchController = StreamController<void>.broadcast(onCancel: () {
// If there are no more listeners then we can get
// rid of the controller and drop our subscriptions
unawaited(_listenMutex.protect(() async {
// Cancel watches of head record
await _head.cancelWatch();
_watchController = null;
}));
});
// Start watching head record
await _head.watch();
}
// Return subscription
return _watchController!.stream.listen((_) => onChanged());
});
}
////////////////////////////////////////////////////////////////
// Fields

View File

@ -2,7 +2,7 @@ import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:bloc_tools/bloc_tools.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'package:equatable/equatable.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';

View File

@ -50,13 +50,30 @@ class _DHTShortArrayHead {
TypedKey get recordKey => _headRecord.key;
OwnedDHTRecordPointer get recordPointer => _headRecord.ownedDHTRecordPointer;
int get length => _index.length;
bool get isOpen => _headRecord.isOpen;
Future<void> close() async {
final futures = <Future<void>>[_headRecord.close()];
for (final lr in _linkedRecords) {
futures.add(lr.close());
}
await Future.wait(futures);
await _headMutex.protect(() async {
if (!isOpen) {
return;
}
final futures = <Future<void>>[_headRecord.close()];
for (final lr in _linkedRecords) {
futures.add(lr.close());
}
await Future.wait(futures);
});
}
Future<void> delete() async {
await _headMutex.protect(() async {
final pool = DHTRecordPool.instance;
final futures = <Future<void>>[pool.deleteRecord(_headRecord.key)];
for (final lr in _linkedRecords) {
futures.add(pool.deleteRecord(lr.key));
}
await Future.wait(futures);
});
}
Future<T> operate<T>(Future<T> Function(_DHTShortArrayHead) closure) async =>
@ -270,7 +287,7 @@ class _DHTShortArrayHead {
final schema = DHTSchema.smpl(
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: _stride)]);
final dhtRecord = await pool.create(
final dhtRecord = await pool.createRecord(
debugName: '${_headRecord.debugName}_linked_$recordNumber',
parent: parent,
routingContext: routingContext,
@ -292,14 +309,14 @@ class _DHTShortArrayHead {
TypedKey recordKey, int recordNumber) async {
final writer = _headRecord.writer;
return (writer != null)
? await DHTRecordPool.instance.openWrite(
? await DHTRecordPool.instance.openRecordWrite(
recordKey,
writer,
debugName: '${_headRecord.debugName}_linked_$recordNumber',
parent: _headRecord.key,
routingContext: _headRecord.routingContext,
)
: await DHTRecordPool.instance.openRead(
: await DHTRecordPool.instance.openRecordRead(
recordKey,
debugName: '${_headRecord.debugName}_linked_$recordNumber',
parent: _headRecord.key,

View File

@ -3,7 +3,6 @@ import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:meta/meta.dart';
import 'package:mutex/mutex.dart';
import 'table_db.dart';

View File

@ -134,7 +134,7 @@ extension IdentityMasterExtension on IdentityMaster {
identityRecordKey.kind, identitySecret);
late final List<AccountRecordInfo> accountRecordInfo;
await (await pool.openRead(identityRecordKey,
await (await pool.openRecordRead(identityRecordKey,
debugName:
'IdentityMaster::readAccountsFromIdentity::IdentityRecord',
parent: masterRecordKey,
@ -168,14 +168,14 @@ extension IdentityMasterExtension on IdentityMaster {
// Open identity key for writing
veilidLoggy.debug('Opening identity record');
return (await pool.openWrite(
return (await pool.openRecordWrite(
identityRecordKey, identityWriter(identitySecret),
debugName: 'IdentityMaster::addAccountToIdentity::IdentityRecord',
parent: masterRecordKey))
.scope((identityRec) async {
// Create new account to insert into identity
veilidLoggy.debug('Creating new account');
return (await pool.create(
return (await pool.createRecord(
debugName: 'IdentityMaster::addAccountToIdentity::AccountRecord',
parent: identityRec.key))
.deleteScope((accountRec) async {
@ -231,14 +231,14 @@ class IdentityMasterWithSecrets {
// IdentityMaster DHT record is public/unencrypted
veilidLoggy.debug('Creating master identity record');
return (await pool.create(
return (await pool.createRecord(
debugName:
'IdentityMasterWithSecrets::create::IdentityMasterRecord',
crypto: const DHTRecordCryptoPublic()))
.deleteScope((masterRec) async {
veilidLoggy.debug('Creating identity record');
// Identity record is private
return (await pool.create(
return (await pool.createRecord(
debugName: 'IdentityMasterWithSecrets::create::IdentityRecord',
parent: masterRec.key))
.scope((identityRec) async {
@ -296,7 +296,7 @@ Future<IdentityMaster> openIdentityMaster(
final pool = DHTRecordPool.instance;
// IdentityMaster DHT record is public/unencrypted
return (await pool.openRead(identityMasterRecordKey,
return (await pool.openRecordRead(identityMasterRecordKey,
debugName:
'IdentityMaster::openIdentityMaster::IdentityMasterRecord'))
.deleteScope((masterRec) async {

View File

@ -3,7 +3,6 @@ import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart';
import 'table_db.dart';

View File

@ -36,10 +36,11 @@ packages:
async_tools:
dependency: "direct main"
description:
path: "../async_tools"
relative: true
source: path
version: "1.0.0"
name: async_tools
sha256: "972f68ab663724d86260a31e363c1355ff493308441b872bf4e7b8adc67c832c"
url: "https://pub.dev"
source: hosted
version: "0.1.0"
bloc:
dependency: "direct main"
description:
@ -48,13 +49,14 @@ packages:
url: "https://pub.dev"
source: hosted
version: "8.1.3"
bloc_tools:
bloc_advanced_tools:
dependency: "direct main"
description:
path: "../bloc_tools"
relative: true
source: path
version: "1.0.0"
name: bloc_advanced_tools
sha256: bc0e1d5c26ae7df011464ab6abc2134dcfb668952acc87359abc7457cab091dd
url: "https://pub.dev"
source: hosted
version: "0.1.0"
boolean_selector:
dependency: transitive
description:
@ -409,13 +411,6 @@ packages:
url: "https://pub.dev"
source: hosted
version: "1.0.5"
mutex:
dependency: "direct main"
description:
path: "../mutex"
relative: true
source: path
version: "3.1.0"
node_preamble:
dependency: transitive
description:

View File

@ -7,11 +7,9 @@ environment:
sdk: '>=3.2.0 <4.0.0'
dependencies:
async_tools:
path: ../async_tools
async_tools: ^0.1.0
bloc: ^8.1.3
bloc_tools:
path: ../bloc_tools
bloc_advanced_tools: ^0.1.0
collection: ^1.18.0
equatable: ^2.0.5
fast_immutable_collections: ^10.1.1
@ -19,8 +17,6 @@ dependencies:
json_annotation: ^4.8.1
loggy: ^2.0.3
meta: ^1.11.0
mutex:
path: ../mutex
protobuf: ^3.1.0
veilid:

View File

@ -60,10 +60,10 @@ packages:
async_tools:
dependency: "direct main"
description:
path: "packages/async_tools"
path: "../dart_async_tools"
relative: true
source: path
version: "1.0.0"
version: "0.1.0"
awesome_extensions:
dependency: "direct main"
description:
@ -96,13 +96,13 @@ packages:
url: "https://pub.dev"
source: hosted
version: "8.1.4"
bloc_tools:
bloc_advanced_tools:
dependency: "direct main"
description:
path: "packages/bloc_tools"
path: "../bloc_advanced_tools"
relative: true
source: path
version: "1.0.0"
version: "0.1.0"
blurry_modal_progress_hud:
dependency: "direct main"
description:
@ -830,13 +830,6 @@ packages:
url: "https://pub.dev"
source: hosted
version: "2.9.1"
mutex:
dependency: "direct main"
description:
path: "packages/mutex"
relative: true
source: path
version: "3.1.0"
nested:
dependency: transitive
description:
@ -1504,7 +1497,7 @@ packages:
path: "../veilid/veilid-flutter"
relative: true
source: path
version: "0.3.1"
version: "0.3.2"
veilid_support:
dependency: "direct main"
description:

View File

@ -11,14 +11,12 @@ dependencies:
animated_theme_switcher: ^2.0.10
ansicolor: ^2.0.2
archive: ^3.4.10
async_tools:
path: packages/async_tools
async_tools: ^0.1.0
awesome_extensions: ^2.0.14
badges: ^3.1.2
basic_utils: ^5.7.0
bloc: ^8.1.4
bloc_tools:
path: packages/bloc_tools
bloc_advanced_tools: ^0.1.0
blurry_modal_progress_hud: ^1.1.1
change_case: ^2.0.1
charcode: ^1.3.1
@ -55,8 +53,6 @@ dependencies:
meta: ^1.11.0
mobile_scanner: ^4.0.1
motion_toast: ^2.9.1
mutex:
path: packages/mutex
pasteboard: ^0.2.0
path: ^1.9.0
path_provider: ^2.1.3
@ -87,6 +83,12 @@ dependencies:
xterm: ^4.0.0
zxing2: ^0.2.3
dependency_overrides:
async_tools:
path: ../dart_async_tools
bloc_advanced_tools:
path: ../bloc_advanced_tools
dev_dependencies:
build_runner: ^2.4.9
freezed: ^2.5.2