From 634543910b4919ebdf69c9c2d44494f98d9d7e46 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sun, 11 Feb 2024 00:29:58 -0500 Subject: [PATCH] messages work --- lib/chat/cubits/messages_cubit.dart | 1 + lib/chat/views/chat_component.dart | 167 +++--- .../active_conversation_messages_cubit.dart | 108 ++++ .../cubits/active_conversations_cubit.dart | 57 +- lib/chat_list/cubits/cubits.dart | 1 + .../chat_single_contact_item_widget.dart | 15 +- .../chat_single_contact_list_widget.dart | 2 - lib/contacts/cubits/conversation_cubit.dart | 1 + lib/tools/bloc_map_cubit.dart | 10 +- .../lib/src => lib/tools}/future_cubit.dart | 3 +- lib/tools/stream_wrapper_cubit.dart | 2 +- lib/tools/tools.dart | 4 +- lib/tools/transformer_cubit.dart | 21 + lib/tools/widget_helpers.dart | 2 +- .../views/signal_strength_meter.dart | 1 + packages/async_tools/.gitignore | 7 + packages/async_tools/analysis_options.yaml | 15 + .../example/async_tools_example.dart | 6 + packages/async_tools/lib/async_tools.dart | 6 + .../lib/src/async_tag_lock.dart | 24 +- .../lib/src/async_value.dart | 17 + .../lib/src/async_value.freezed.dart | 0 .../async_tools/lib/src/single_async.dart | 19 + packages/async_tools/pubspec.yaml | 18 + .../async_tools/test/async_tools_test.dart | 16 + packages/mutex/.gitignore | 16 + packages/mutex/CHANGELOG.md | 50 ++ packages/mutex/LICENSE | 24 + packages/mutex/README.md | 191 +++++++ packages/mutex/analysis_options.yaml | 15 + packages/mutex/example/example.dart | 114 ++++ packages/mutex/lib/mutex.dart | 11 + packages/mutex/lib/src/mutex.dart | 89 ++++ packages/mutex/lib/src/read_write_mutex.dart | 304 +++++++++++ packages/mutex/pubspec.yaml | 12 + .../mutex/test/mutex_multiple_read_test.dart | 102 ++++ packages/mutex/test/mutex_readwrite_test.dart | 486 ++++++++++++++++++ packages/mutex/test/mutex_test.dart | 341 ++++++++++++ .../lib/dht_support/src/dht_record_cubit.dart | 1 + .../lib/dht_support/src/dht_record_pool.dart | 1 + .../src/dht_short_array_cubit.dart | 1 + .../lib/src/async_table_db_backed_cubit.dart | 3 +- .../veilid_support/lib/veilid_support.dart | 3 - packages/veilid_support/pubspec.lock | 18 +- packages/veilid_support/pubspec.yaml | 3 +- pubspec.lock | 16 +- pubspec.yaml | 5 +- 47 files changed, 2206 insertions(+), 123 deletions(-) create mode 100644 lib/chat_list/cubits/active_conversation_messages_cubit.dart rename {packages/veilid_support/lib/src => lib/tools}/future_cubit.dart (90%) create mode 100644 lib/tools/transformer_cubit.dart create mode 100644 packages/async_tools/.gitignore create mode 100644 packages/async_tools/analysis_options.yaml create mode 100644 packages/async_tools/example/async_tools_example.dart create mode 100644 packages/async_tools/lib/async_tools.dart rename packages/{veilid_support => async_tools}/lib/src/async_tag_lock.dart (69%) rename packages/{veilid_support => async_tools}/lib/src/async_value.dart (90%) rename packages/{veilid_support => async_tools}/lib/src/async_value.freezed.dart (100%) create mode 100644 packages/async_tools/lib/src/single_async.dart create mode 100644 packages/async_tools/pubspec.yaml create mode 100644 packages/async_tools/test/async_tools_test.dart create mode 100644 packages/mutex/.gitignore create mode 100644 packages/mutex/CHANGELOG.md create mode 100644 packages/mutex/LICENSE create mode 100644 packages/mutex/README.md create mode 100644 packages/mutex/analysis_options.yaml create mode 100644 packages/mutex/example/example.dart create mode 100644 packages/mutex/lib/mutex.dart create mode 100644 packages/mutex/lib/src/mutex.dart create mode 100644 packages/mutex/lib/src/read_write_mutex.dart create mode 100644 packages/mutex/pubspec.yaml create mode 100644 packages/mutex/test/mutex_multiple_read_test.dart create mode 100644 packages/mutex/test/mutex_readwrite_test.dart create mode 100644 packages/mutex/test/mutex_test.dart diff --git a/lib/chat/cubits/messages_cubit.dart b/lib/chat/cubits/messages_cubit.dart index 76acfdc..b1846cf 100644 --- a/lib/chat/cubits/messages_cubit.dart +++ b/lib/chat/cubits/messages_cubit.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:async_tools/async_tools.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:veilid_support/veilid_support.dart'; diff --git a/lib/chat/views/chat_component.dart b/lib/chat/views/chat_component.dart index 71cec22..9076a49 100644 --- a/lib/chat/views/chat_component.dart +++ b/lib/chat/views/chat_component.dart @@ -1,12 +1,17 @@ import 'dart:async'; +import 'package:async_tools/async_tools.dart'; import 'package:awesome_extensions/awesome_extensions.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:flutter/foundation.dart'; import 'package:flutter/material.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:flutter_chat_types/flutter_chat_types.dart' as types; import 'package:flutter_chat_ui/flutter_chat_ui.dart'; +import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; +import '../../chat_list/chat_list.dart'; import '../../contacts/contacts.dart'; import '../../proto/proto.dart' as proto; import '../../theme/theme.dart'; @@ -14,10 +19,19 @@ import '../../tools/tools.dart'; import '../chat.dart'; class ChatComponent extends StatefulWidget { - const ChatComponent({super.key}); + const ChatComponent({required this.remoteConversationRecordKey, super.key}); @override ChatComponentState createState() => ChatComponentState(); + + final TypedKey remoteConversationRecordKey; + + @override + void debugFillProperties(DiagnosticPropertiesBuilder properties) { + super.debugFillProperties(properties); + properties.add(DiagnosticsProperty( + 'chatRemoteConversationKey', remoteConversationRecordKey)); + } } class ChatComponentState extends State { @@ -113,99 +127,92 @@ class ChatComponentState extends State { final textTheme = Theme.of(context).textTheme; final chatTheme = makeChatTheme(scale, textTheme); - final activeChatCubit = context.watch(); final contactListCubit = context.watch(); - final activeAccountInfo = context.watch(); - final activeChatContactKey = activeChatCubit.state; - if (activeChatContactKey == null) { - return const NoConversationWidget(); - } return contactListCubit.state.builder((context, contactList) { // Get active chat contact profile - final activeChatContactIdx = contactList.indexWhere( - (c) => activeChatContactKey == c.remoteConversationRecordKey); + final activeChatContactIdx = contactList.indexWhere((c) => + widget.remoteConversationRecordKey == c.remoteConversationRecordKey); late final proto.Contact activeChatContact; if (activeChatContactIdx == -1) { + // xxx: error, no contact for conversation... return const NoConversationWidget(); } else { activeChatContact = contactList[activeChatContactIdx]; } final contactName = activeChatContact.editedProfile.name; - // final protoMessages = - // ref.watch(activeConversationMessagesProvider).asData?.value; - // if (protoMessages == null) { - // return waitingPage(context); - // } - // final messages = []; - // for (final protoMessage in protoMessages) { - // final message = protoMessageToMessage(protoMessage); - // messages.insert(0, message); - // } + final messages = context.select>?>( + (x) => x.state[widget.remoteConversationRecordKey]); + if (messages == null) { + // xxx: error, no messages for conversation... + return const NoConversationWidget(); + } + return messages.builder((context, protoMessages) { + final messages = []; + for (final protoMessage in protoMessages) { + final message = protoMessageToMessage(protoMessage); + messages.insert(0, message); + } + return DefaultTextStyle( + style: textTheme.bodySmall!, + child: Align( + alignment: AlignmentDirectional.centerEnd, + child: Stack( + children: [ + Column( + children: [ + Container( + height: 48, + decoration: BoxDecoration( + color: scale.primaryScale.subtleBorder, + ), + child: Row(children: [ + Align( + alignment: AlignmentDirectional.centerStart, + child: Padding( + padding: const EdgeInsetsDirectional.fromSTEB( + 16, 0, 16, 0), + child: Text(contactName, + textAlign: TextAlign.start, + style: textTheme.titleMedium), + )), + const Spacer(), + IconButton( + icon: const Icon(Icons.close), + onPressed: () async { + context + .read() + .setActiveChat(null); + }).paddingLTRB(16, 0, 16, 0) + ]), + ), + Expanded( + child: DecoratedBox( + decoration: const BoxDecoration(), + child: Chat( + theme: chatTheme, + messages: messages, + //onAttachmentPressed: _handleAttachmentPressed, + //onMessageTap: _handleMessageTap, + //onPreviewDataFetched: _handlePreviewDataFetched, - return BlocProvider( - create: (context) => MessagesCubit( - activeAccountInfo: activeAccountInfo, - remoteIdentityPublicKey: activeChatContact.identityPublicKey, localConversationRecordKey: activeChatContact.localConversationRecordKey, localMessagesRecordKey: activeChatContact., + onSendPressed: (message) { + unawaited(_handleSendPressed(message)); + }, + //showUserAvatars: false, + //showUserNames: true, + user: _localUser, + ), + ), + ), + ], + ), + ], ), - child: DefaultTextStyle( - style: textTheme.bodySmall!, - child: Align( - alignment: AlignmentDirectional.centerEnd, - child: Stack( - children: [ - Column( - children: [ - Container( - height: 48, - decoration: BoxDecoration( - color: scale.primaryScale.subtleBorder, - ), - child: Row(children: [ - Align( - alignment: AlignmentDirectional.centerStart, - child: Padding( - padding: const EdgeInsetsDirectional.fromSTEB( - 16, 0, 16, 0), - child: Text(contactName, - textAlign: TextAlign.start, - style: textTheme.titleMedium), - )), - const Spacer(), - IconButton( - icon: const Icon(Icons.close), - onPressed: () async { - context - .read() - .setActiveChat(null); - }).paddingLTRB(16, 0, 16, 0) - ]), - ), - Expanded( - child: DecoratedBox( - decoration: const BoxDecoration(), - child: Chat( - theme: chatTheme, - messages: messages, - //onAttachmentPressed: _handleAttachmentPressed, - //onMessageTap: _handleMessageTap, - //onPreviewDataFetched: _handlePreviewDataFetched, - - onSendPressed: (message) { - unawaited(_handleSendPressed(message)); - }, - //showUserAvatars: false, - //showUserNames: true, - user: _localUser, - ), - ), - ), - ], - ), - ], - ), - ))); + )); + }); }); } } diff --git a/lib/chat_list/cubits/active_conversation_messages_cubit.dart b/lib/chat_list/cubits/active_conversation_messages_cubit.dart new file mode 100644 index 0000000..d7db7a5 --- /dev/null +++ b/lib/chat_list/cubits/active_conversation_messages_cubit.dart @@ -0,0 +1,108 @@ +import 'dart:async'; + +import 'package:async_tools/async_tools.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:veilid_support/veilid_support.dart'; + +import '../../account_manager/account_manager.dart'; +import '../../chat/chat.dart'; +import '../../proto/proto.dart' as proto; +import '../../tools/tools.dart'; +import 'active_conversations_cubit.dart'; + +class ActiveConversationMessagesCubit extends BlocMapCubit>, MessagesCubit> { + ActiveConversationMessagesCubit({ + required ActiveAccountInfo activeAccountInfo, + required Stream stream, + }) : _activeAccountInfo = activeAccountInfo { + // + _subscription = stream.listen(updateMessageCubits); + } + + @override + Future close() async { + await _subscription.cancel(); + await super.close(); + } + + // Determine which conversations have been added, deleted, or changed + // and update this cubit's state appropriately + void updateMessageCubits(ActiveConversationsBlocMapState newInputState) { + // Use a singlefuture here to ensure we get dont lose any updates + // If the ActiveConversations stream gives us an update while we are + // still processing the last update, the most recent input state will + // be saved and processed eventually. + singleFuture(this, () async { + var newActiveConversationsState = newInputState; + var done = false; + while (!done) { + // Build lists of changes to conversations + final deleted = _lastActiveConversationsState.keys + .where((k) => !newActiveConversationsState.containsKey(k)); + final added = newActiveConversationsState.keys + .where((k) => !_lastActiveConversationsState.containsKey(k)); + final changed = _lastActiveConversationsState.where((k, v) { + final nv = newActiveConversationsState[k]; + if (nv == null) { + return false; + } + return nv != v; + }).keys; + + // Process all deleted conversations + for (final d in deleted) { + await remove(d); + } + + // Process all added and changed conversations + for (final a in [...added, ...changed]) { + final av = newActiveConversationsState[a]!; + await av.when( + data: (state) => _addConversationMessages( + contact: state.contact, + localConversation: state.localConversation, + remoteConversation: state.remoteConversation), + loading: () => addState(a, const AsyncValue.loading()), + error: (error, stackTrace) => + addState(a, AsyncValue.error(error, stackTrace))); + } + + // Keep this state for the next time + _lastActiveConversationsState = newActiveConversationsState; + + // See if there's another state change to process + final next = _nextActiveConversationsState; + _nextActiveConversationsState = null; + if (next != null) { + newActiveConversationsState = next; + } else { + done = true; + } + } + }, onBusy: () { + // Keep this state until we process again + _nextActiveConversationsState = newInputState; + }); + } + + Future _addConversationMessages( + {required proto.Contact contact, + required proto.Conversation localConversation, + required proto.Conversation remoteConversation}) async => + add(() => MapEntry( + contact.remoteConversationRecordKey, + MessagesCubit( + activeAccountInfo: _activeAccountInfo, + remoteIdentityPublicKey: contact.identityPublicKey, + localConversationRecordKey: contact.localConversationRecordKey, + remoteConversationRecordKey: contact.remoteConversationRecordKey, + localMessagesRecordKey: localConversation.messages, + remoteMessagesRecordKey: remoteConversation.messages))); + + final ActiveAccountInfo _activeAccountInfo; + ActiveConversationsBlocMapState _lastActiveConversationsState = + ActiveConversationsBlocMapState(); + ActiveConversationsBlocMapState? _nextActiveConversationsState; + late final StreamSubscription _subscription; +} diff --git a/lib/chat_list/cubits/active_conversations_cubit.dart b/lib/chat_list/cubits/active_conversations_cubit.dart index 3df703f..0045f42 100644 --- a/lib/chat_list/cubits/active_conversations_cubit.dart +++ b/lib/chat_list/cubits/active_conversations_cubit.dart @@ -1,3 +1,6 @@ +import 'package:async_tools/async_tools.dart'; +import 'package:equatable/equatable.dart'; +import 'package:meta/meta.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; @@ -5,20 +8,60 @@ import '../../contacts/contacts.dart'; import '../../proto/proto.dart' as proto; import '../../tools/tools.dart'; +@immutable +class ActiveConversationState extends Equatable { + const ActiveConversationState({ + required this.contact, + required this.localConversation, + required this.remoteConversation, + }); + + final proto.Contact contact; + final proto.Conversation localConversation; + final proto.Conversation remoteConversation; + + @override + List get props => [contact, localConversation, remoteConversation]; +} + +typedef ActiveConversationCubit = TransformerCubit< + AsyncValue, AsyncValue>; + +typedef ActiveConversationsBlocMapState + = BlocMapState>; + +// Map of remoteConversationRecordKey to ActiveConversationCubit +// Wraps a conversation cubit to only expose completely built conversations class ActiveConversationsCubit extends BlocMapCubit, ConversationCubit> { + AsyncValue, ActiveConversationCubit> { ActiveConversationsCubit({required ActiveAccountInfo activeAccountInfo}) : _activeAccountInfo = activeAccountInfo; + // Add an active conversation to be tracked for changes Future addConversation({required proto.Contact contact}) async => add(() => MapEntry( contact.remoteConversationRecordKey, - ConversationCubit( - activeAccountInfo: _activeAccountInfo, - remoteIdentityPublicKey: contact.identityPublicKey, - localConversationRecordKey: contact.localConversationRecordKey, - remoteConversationRecordKey: contact.remoteConversationRecordKey, - ))); + TransformerCubit( + ConversationCubit( + activeAccountInfo: _activeAccountInfo, + remoteIdentityPublicKey: contact.identityPublicKey, + localConversationRecordKey: contact.localConversationRecordKey, + remoteConversationRecordKey: + contact.remoteConversationRecordKey, + ), + // Transformer that only passes through completed conversations + // along with the contact that corresponds to the completed + // conversation + transform: (avstate) => avstate.when( + data: (data) => (data.localConversation == null || + data.remoteConversation == null) + ? const AsyncValue.loading() + : AsyncValue.data(ActiveConversationState( + contact: contact, + localConversation: data.localConversation!, + remoteConversation: data.remoteConversation!)), + loading: AsyncValue.loading, + error: AsyncValue.error)))); final ActiveAccountInfo _activeAccountInfo; } diff --git a/lib/chat_list/cubits/cubits.dart b/lib/chat_list/cubits/cubits.dart index 1b77c00..474f5cb 100644 --- a/lib/chat_list/cubits/cubits.dart +++ b/lib/chat_list/cubits/cubits.dart @@ -1,2 +1,3 @@ +export 'active_conversation_messages_cubit.dart'; export 'active_conversations_cubit.dart'; export 'chat_list_cubit.dart'; diff --git a/lib/chat_list/views/chat_single_contact_item_widget.dart b/lib/chat_list/views/chat_single_contact_item_widget.dart index 37d1e33..8a3fda7 100644 --- a/lib/chat_list/views/chat_single_contact_item_widget.dart +++ b/lib/chat_list/views/chat_single_contact_item_widget.dart @@ -1,3 +1,4 @@ +import 'package:async_tools/async_tools.dart'; import 'package:flutter/foundation.dart'; import 'package:flutter/material.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; @@ -24,7 +25,9 @@ class ChatSingleContactItemWidget extends StatelessWidget { final scale = theme.extension()!; final activeChatCubit = context.watch(); - final activeConversationsCubit = context.watch(); + // final activeConversation = context.select(); + // final activeConversationMessagesCubit = + // context.watch(); xxx does this need to be here? final remoteConversationRecordKey = proto.TypedKeyProto.fromProto(_contact.remoteConversationRecordKey); @@ -69,9 +72,13 @@ class ChatSingleContactItemWidget extends StatelessWidget { // component is not dragged. child: ListTile( onTap: () { - xxx deal with async - activeConversationsCubit.addConversation(contact: _contact); - activeChatCubit.setActiveChat(remoteConversationRecordKey); + final activeConversationsCubit = + context.read(); + singleFuture(activeChatCubit, () async { + await activeConversationsCubit.addConversation( + contact: _contact); + activeChatCubit.setActiveChat(remoteConversationRecordKey); + }); }, title: Text(_contact.editedProfile.name), diff --git a/lib/chat_list/views/chat_single_contact_list_widget.dart b/lib/chat_list/views/chat_single_contact_list_widget.dart index 3d3f3eb..4a31e2d 100644 --- a/lib/chat_list/views/chat_single_contact_list_widget.dart +++ b/lib/chat_list/views/chat_single_contact_list_widget.dart @@ -10,8 +10,6 @@ import '../../proto/proto.dart' as proto; import '../../theme/theme.dart'; import '../../tools/tools.dart'; import '../chat_list.dart'; -import 'chat_single_contact_item_widget.dart'; -import 'empty_chat_list_widget.dart'; class ChatSingleContactListWidget extends StatelessWidget { const ChatSingleContactListWidget({super.key}); diff --git a/lib/contacts/cubits/conversation_cubit.dart b/lib/contacts/cubits/conversation_cubit.dart index 9590a08..c927a1b 100644 --- a/lib/contacts/cubits/conversation_cubit.dart +++ b/lib/contacts/cubits/conversation_cubit.dart @@ -5,6 +5,7 @@ import 'dart:async'; import 'dart:convert'; +import 'package:async_tools/async_tools.dart'; import 'package:equatable/equatable.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:meta/meta.dart'; diff --git a/lib/tools/bloc_map_cubit.dart b/lib/tools/bloc_map_cubit.dart index 1604807..04d7693 100644 --- a/lib/tools/bloc_map_cubit.dart +++ b/lib/tools/bloc_map_cubit.dart @@ -1,8 +1,8 @@ import 'dart:async'; +import 'package:async_tools/async_tools.dart'; import 'package:bloc/bloc.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; -import 'package:veilid_support/veilid_support.dart'; typedef BlocMapState = IMap; @@ -48,6 +48,14 @@ abstract class BlocMapCubit> }); } + Future addState(K key, S value) => + _tagLock.protect(key, closure: () async { + // Remove entry with the same key if it exists + await _internalRemove(key); + + emit(state.add(key, value)); + }); + Future _internalRemove(K key) async { final sub = _entries.remove(key); if (sub != null) { diff --git a/packages/veilid_support/lib/src/future_cubit.dart b/lib/tools/future_cubit.dart similarity index 90% rename from packages/veilid_support/lib/src/future_cubit.dart rename to lib/tools/future_cubit.dart index 851c422..77d8387 100644 --- a/packages/veilid_support/lib/src/future_cubit.dart +++ b/lib/tools/future_cubit.dart @@ -1,9 +1,8 @@ import 'dart:async'; +import 'package:async_tools/async_tools.dart'; import 'package:bloc/bloc.dart'; -import '../veilid_support.dart'; - abstract class FutureCubit extends Cubit> { FutureCubit(Future fut) : super(const AsyncValue.loading()) { unawaited(fut.then((value) { diff --git a/lib/tools/stream_wrapper_cubit.dart b/lib/tools/stream_wrapper_cubit.dart index 569304d..732695b 100644 --- a/lib/tools/stream_wrapper_cubit.dart +++ b/lib/tools/stream_wrapper_cubit.dart @@ -1,7 +1,7 @@ import 'dart:async'; +import 'package:async_tools/async_tools.dart'; import 'package:bloc/bloc.dart'; -import 'package:veilid_support/veilid_support.dart'; abstract class StreamWrapperCubit extends Cubit> { StreamWrapperCubit(Stream stream, {State? defaultState}) diff --git a/lib/tools/tools.dart b/lib/tools/tools.dart index fd18fd9..35792b8 100644 --- a/lib/tools/tools.dart +++ b/lib/tools/tools.dart @@ -1,7 +1,8 @@ export 'animations.dart'; -export 'cubit_map.dart'; +export 'bloc_map_cubit.dart'; export 'enter_password.dart'; export 'enter_pin.dart'; +export 'future_cubit.dart'; export 'loggy.dart'; export 'phono_byte.dart'; export 'responsive.dart'; @@ -10,5 +11,6 @@ export 'shared_preferences.dart'; export 'state_logger.dart'; export 'stream_listenable.dart'; export 'stream_wrapper_cubit.dart'; +export 'transformer_cubit.dart'; export 'widget_helpers.dart'; export 'window_control.dart'; diff --git a/lib/tools/transformer_cubit.dart b/lib/tools/transformer_cubit.dart new file mode 100644 index 0000000..e9aa9b6 --- /dev/null +++ b/lib/tools/transformer_cubit.dart @@ -0,0 +1,21 @@ +import 'dart:async'; + +import 'package:bloc/bloc.dart'; + +class TransformerCubit extends Cubit { + TransformerCubit(this.input, {required this.transform}) + : super(transform(input.state)) { + _subscription = input.stream.listen((event) => emit(transform(event))); + } + + @override + Future close() async { + await _subscription.cancel(); + await input.close(); + await super.close(); + } + + Cubit input; + T Function(S) transform; + late final StreamSubscription _subscription; +} diff --git a/lib/tools/widget_helpers.dart b/lib/tools/widget_helpers.dart index 7812c5e..9eddc83 100644 --- a/lib/tools/widget_helpers.dart +++ b/lib/tools/widget_helpers.dart @@ -1,3 +1,4 @@ +import 'package:async_tools/async_tools.dart'; import 'package:awesome_extensions/awesome_extensions.dart'; import 'package:blurry_modal_progress_hud/blurry_modal_progress_hud.dart'; import 'package:flutter/material.dart'; @@ -6,7 +7,6 @@ import 'package:flutter_spinkit/flutter_spinkit.dart'; import 'package:flutter_translate/flutter_translate.dart'; import 'package:motion_toast/motion_toast.dart'; import 'package:quickalert/quickalert.dart'; -import 'package:veilid_support/veilid_support.dart'; import '../theme/theme.dart'; diff --git a/lib/veilid_processor/views/signal_strength_meter.dart b/lib/veilid_processor/views/signal_strength_meter.dart index 9b189f4..1b94f78 100644 --- a/lib/veilid_processor/views/signal_strength_meter.dart +++ b/lib/veilid_processor/views/signal_strength_meter.dart @@ -1,3 +1,4 @@ +import 'package:async_tools/async_tools.dart'; import 'package:awesome_extensions/awesome_extensions.dart'; import 'package:flutter/material.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; diff --git a/packages/async_tools/.gitignore b/packages/async_tools/.gitignore new file mode 100644 index 0000000..3cceda5 --- /dev/null +++ b/packages/async_tools/.gitignore @@ -0,0 +1,7 @@ +# https://dart.dev/guides/libraries/private-files +# Created by `dart pub` +.dart_tool/ + +# Avoid committing pubspec.lock for library packages; see +# https://dart.dev/guides/libraries/private-files#pubspeclock. +pubspec.lock diff --git a/packages/async_tools/analysis_options.yaml b/packages/async_tools/analysis_options.yaml new file mode 100644 index 0000000..e1620f7 --- /dev/null +++ b/packages/async_tools/analysis_options.yaml @@ -0,0 +1,15 @@ +include: package:lint_hard/all.yaml +analyzer: + errors: + invalid_annotation_target: ignore + exclude: + - '**/*.g.dart' + - '**/*.freezed.dart' + - '**/*.pb.dart' + - '**/*.pbenum.dart' + - '**/*.pbjson.dart' + - '**/*.pbserver.dart' +linter: + rules: + unawaited_futures: true + avoid_positional_boolean_parameters: false \ No newline at end of file diff --git a/packages/async_tools/example/async_tools_example.dart b/packages/async_tools/example/async_tools_example.dart new file mode 100644 index 0000000..33a41ab --- /dev/null +++ b/packages/async_tools/example/async_tools_example.dart @@ -0,0 +1,6 @@ +// import 'package:async_tools/async_tools.dart'; + +// void main() { +// var awesome = Awesome(); +// print('awesome: ${awesome.isAwesome}'); +// } diff --git a/packages/async_tools/lib/async_tools.dart b/packages/async_tools/lib/async_tools.dart new file mode 100644 index 0000000..4dbe72e --- /dev/null +++ b/packages/async_tools/lib/async_tools.dart @@ -0,0 +1,6 @@ +/// Async Tools +library; + +export 'src/async_tag_lock.dart'; +export 'src/async_value.dart'; +export 'src/single_async.dart'; diff --git a/packages/veilid_support/lib/src/async_tag_lock.dart b/packages/async_tools/lib/src/async_tag_lock.dart similarity index 69% rename from packages/veilid_support/lib/src/async_tag_lock.dart rename to packages/async_tools/lib/src/async_tag_lock.dart index 0187827..a0f6117 100644 --- a/packages/veilid_support/lib/src/async_tag_lock.dart +++ b/packages/async_tools/lib/src/async_tag_lock.dart @@ -2,8 +2,8 @@ import 'package:mutex/mutex.dart'; class _AsyncTagLockEntry { _AsyncTagLockEntry() - : mutex = Mutex(), - waitingCount = 1; + : mutex = Mutex.locked(), + waitingCount = 0; // Mutex mutex; int waitingCount; @@ -16,18 +16,28 @@ class AsyncTagLock { Future lockTag(T tag) async { await _tableLock.protect(() async { - var lockEntry = _locks[tag]; + final lockEntry = _locks[tag]; if (lockEntry != null) { lockEntry.waitingCount++; + await lockEntry.mutex.acquire(); + lockEntry.waitingCount--; } else { - lockEntry = _locks[tag] = _AsyncTagLockEntry(); + _locks[tag] = _AsyncTagLockEntry(); } - - await lockEntry.mutex.acquire(); - lockEntry.waitingCount--; }); } + bool isLocked(T tag) => _locks.containsKey(tag); + + bool tryLock(T tag) { + final lockEntry = _locks[tag]; + if (lockEntry != null) { + return false; + } + _locks[tag] = _AsyncTagLockEntry(); + return true; + } + void unlockTag(T tag) { final lockEntry = _locks[tag]!; if (lockEntry.waitingCount == 0) { diff --git a/packages/veilid_support/lib/src/async_value.dart b/packages/async_tools/lib/src/async_value.dart similarity index 90% rename from packages/veilid_support/lib/src/async_value.dart rename to packages/async_tools/lib/src/async_value.dart index 8f02478..aee070d 100644 --- a/packages/veilid_support/lib/src/async_value.dart +++ b/packages/async_tools/lib/src/async_value.dart @@ -169,4 +169,21 @@ abstract class AsyncValue with _$AsyncValue { loading: () => const AsyncValue.loading(), error: AsyncValue.error, ); + + /// Check two AsyncData instances for equality + bool equalsData(AsyncValue other, + {required bool Function(T a, T b) equals}) => + other.when( + data: (nd) => when( + data: (d) => equals(d, nd), + loading: () => true, + error: (_e, _st) => true), + loading: () => when( + data: (_) => true, + loading: () => false, + error: (_e, _st) => true), + error: (ne, nst) => when( + data: (_) => true, + loading: () => true, + error: (e, st) => e != ne || st != nst)); } diff --git a/packages/veilid_support/lib/src/async_value.freezed.dart b/packages/async_tools/lib/src/async_value.freezed.dart similarity index 100% rename from packages/veilid_support/lib/src/async_value.freezed.dart rename to packages/async_tools/lib/src/async_value.freezed.dart diff --git a/packages/async_tools/lib/src/single_async.dart b/packages/async_tools/lib/src/single_async.dart new file mode 100644 index 0000000..aee9bc2 --- /dev/null +++ b/packages/async_tools/lib/src/single_async.dart @@ -0,0 +1,19 @@ +import 'dart:async'; + +import 'async_tag_lock.dart'; + +AsyncTagLock _keys = AsyncTagLock(); + +void singleFuture(Object tag, Future Function() closure, + {void Function()? onBusy}) { + if (!_keys.tryLock(tag)) { + if (onBusy != null) { + onBusy(); + } + return; + } + unawaited(() async { + await closure(); + _keys.unlockTag(tag); + }()); +} diff --git a/packages/async_tools/pubspec.yaml b/packages/async_tools/pubspec.yaml new file mode 100644 index 0000000..a495170 --- /dev/null +++ b/packages/async_tools/pubspec.yaml @@ -0,0 +1,18 @@ +name: async_tools +description: Useful data structures and tools for async/Future code +version: 1.0.0 +publish_to: none + +environment: + sdk: ^3.2.6 + +# Add regular dependencies here. +dependencies: + freezed_annotation: ^2.2.0 + mutex: + path: ../mutex + +dev_dependencies: + freezed: ^2.3.5 + lint_hard: ^4.0.0 + test: ^1.24.0 diff --git a/packages/async_tools/test/async_tools_test.dart b/packages/async_tools/test/async_tools_test.dart new file mode 100644 index 0000000..0d54797 --- /dev/null +++ b/packages/async_tools/test/async_tools_test.dart @@ -0,0 +1,16 @@ +// import 'package:async_tools/async_tools.dart'; +// import 'package:test/test.dart'; + +// void main() { +// group('A group of tests', () { +// final awesome = Awesome(); + +// setUp(() { +// // Additional setup goes here. +// }); + +// test('First Test', () { +// expect(awesome.isAwesome, isTrue); +// }); +// }); +// } diff --git a/packages/mutex/.gitignore b/packages/mutex/.gitignore new file mode 100644 index 0000000..2ca4cae --- /dev/null +++ b/packages/mutex/.gitignore @@ -0,0 +1,16 @@ +# Files and directories created by pub +.packages +.pub/ +.dart_tool/ +build/ +packages +pubspec.lock + +# Directory created by dartdoc +doc/api/ + +# JetBrains IDEs +.idea/ +*.iml +*.ipr +*.iws diff --git a/packages/mutex/CHANGELOG.md b/packages/mutex/CHANGELOG.md new file mode 100644 index 0000000..9115310 --- /dev/null +++ b/packages/mutex/CHANGELOG.md @@ -0,0 +1,50 @@ +## 3.1.0 + +- Increased minimum Dart SDK to 2.15.0 for `unawaited` function. +- Added development dependencies lints ^2.1.1 and pana: ^0.21.37. +- Fixed code to remove lint warnings. + +## 3.0.1 + +- Fixed bug with new read mutexes preventing a write mutex from being acquired. + +## 3.0.0 + +- BREAKING CHANGE: critical section functions must return a Future. + - This is unlikely to affect real-world code, since only functions + containing asynchronous code would be critical. +- Protect method returns Future to the value from the critical section. + +## 2.0.0 + +- Null safety release. + +## 2.0.0-nullsafety.0 + +- Pre-release version: updated library to null safety (Non-nullable by default). +- Removed support for Dart 1.x. + +## 1.1.0 + +- Added protect, protectRead and protectWrite convenience methods. +- Improved tests to not depend on timing. + +## 1.0.3 + +- Added an example. + +## 1.0.2 + +- Code clean up to satisfy pana 0.13.2 health checks. + +## 1.0.1 + +- Fixed dartanalyzer warnings. + +## 1.0.0 + +- Updated the upper bound of the SDK constraint to <3.0.0. + +## 0.0.1 + +- Initial version diff --git a/packages/mutex/LICENSE b/packages/mutex/LICENSE new file mode 100644 index 0000000..eb40cc8 --- /dev/null +++ b/packages/mutex/LICENSE @@ -0,0 +1,24 @@ +Copyright (c) 2016, Hoylen Sue. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of the nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/packages/mutex/README.md b/packages/mutex/README.md new file mode 100644 index 0000000..1df6b41 --- /dev/null +++ b/packages/mutex/README.md @@ -0,0 +1,191 @@ +# mutex + +A library for creating locks to ensure mutual exclusion when +running critical sections of code. + +## Purpose + +Mutexes can be used to protect critical sections of code to prevent +race conditions. + +Although Dart uses a single thread of execution, race conditions +can still occur when asynchronous operations are used inside +critical sections. For example, + + x = 42; + synchronousOperations(); // this does not modify x + assert(x == 42); // x will NOT have changed + + y = 42; // a variable that other asynchronous code can modify + await asynchronousOperations(); // this does NOT modify y, but... + // There is NO GUARANTEE other async code didn't run and change it! + assert(y == 42 || y != 42); // WARNING: y might have changed + +An example is when Dart is used to implement a server-side Web server +that updates a database (assuming database transactions are not being +used). The update involves querying the database, performing +calculations on those retrieved values, and then updating the database +with the result. You don't want the database to be changed by +"something else" while performing the calculations, since the results +you would write will not incorporate those other changes. That +"something else" could be the same Web server handling another request +in parallel. + +This package provides a normal mutex and a read-write mutex. + +## Mutex + +A mutex guarantees at most only one lock can exist at any one time. + +If the lock has already been acquired, attempts to acquire another +lock will be blocked until the lock has been released. + +```dart +import 'package:mutex/mutex.dart'; + +... + +final m = Mutex(); +``` + +Acquiring the lock before running the critical section of code, +and then releasing the lock. + +```dart +await m.acquire(); +// No other lock can be acquired until the lock is released + +try { + // critical section with asynchronous code + await ... +} finally { + m.release(); +} +``` + +### protect + +The following code uses the _protect_ convenience method to do the +same thing as the above code. Use the convenence method whenever +possible, since it ensures the lock will always be released. + +```dart +await m.protect(() async { + // critical section +}); +``` + +If the critial section returns a Future to a value, the _protect_ +convenience method will return a Future to that value. + +```dart +final result = await m.protect(() async { + // critical section + return valueFromCriticalSection; +}); +// result contains the valueFromCriticalSection +``` + + +## Read-write mutex + +A read-write mutex allows multiple _reads locks_ to be exist +simultaneously, but at most only one _write lock_ can exist at any one +time. A _write lock_ and any _read locks_ cannot both exist together +at the same time. + +If there is one or more _read locks_, attempts to acquire a _write +lock_ will be blocked until all the _read locks_ have been +released. But attempts to acquire more _read locks_ will not be +blocked. If there is a _write lock_, attempts to acquire any lock +(read or write) will be blocked until that _write lock_ is released. + +A read-write mutex can also be described as a single-writer mutex, +multiple-reader mutex, or a reentrant lock. + +```dart +import 'package:mutex/mutex.dart'; + +... + +final m = ReadWriteMutex(); +``` + +Acquiring a write lock: + + await m.acquireWrite(); + // No other locks (read or write) can be acquired until released + + try { + // critical write section with asynchronous code + await ... + } finally { + m.release(); + } + +Acquiring a read lock: + + await m.acquireRead(); + // No write lock can be acquired until all read locks are released, + // but additional read locks can be acquired. + + try { + // critical read section with asynchronous code + await ... + } finally { + m.release(); + } + +### protectWrite and protectRead + +The following code uses the _protectWrite_ and _protectRead_ +convenience methods to do the same thing as the above code. Use the +convenence method whenever possible, since it ensures the lock will +always be released. + +```dart +await m.protectWrite(() async { + // critical write section +}); + +await m.protectRead(() async { + // critical read section +}); +``` + +If the critial section returns a Future to a value, these convenience +methods will return a Future to that value. + +```dart +final result1 await m.protectWrite(() async { + // critical write section + return valueFromCritialSection1; +}); +// result1 contains the valueFromCriticalSection1 + +final result2 = await m.protectRead(() async { + // critical read section + return valueFromCritialSection2; +}); +// result2 contains the valueFromCriticalSection2 +``` + +## When mutual exclusion is not needed + +The critical section should always contain some asynchronous code. If +the critical section only contains synchronous code, there is no need +to put it in a critical section. In Dart, synchronous code cannot be +interrupted, so there is no need to protect it using mutual exclusion. + +Also, if the critical section does not involve data or shared +resources that can be accessed by other asynchronous code, it also +does not need to be protected. For example, if it only uses local +variables that other asynchronous code won't have access to: while the +other asynchronous code could run, it won't be able to make unexpected +changes to the local variables it can't access. + +## Features and bugs + +Please file feature requests and bugs at the [issue tracker][tracker]. + +[tracker]: https://github.com/hoylen/dart-mutex/issues diff --git a/packages/mutex/analysis_options.yaml b/packages/mutex/analysis_options.yaml new file mode 100644 index 0000000..e1620f7 --- /dev/null +++ b/packages/mutex/analysis_options.yaml @@ -0,0 +1,15 @@ +include: package:lint_hard/all.yaml +analyzer: + errors: + invalid_annotation_target: ignore + exclude: + - '**/*.g.dart' + - '**/*.freezed.dart' + - '**/*.pb.dart' + - '**/*.pbenum.dart' + - '**/*.pbjson.dart' + - '**/*.pbserver.dart' +linter: + rules: + unawaited_futures: true + avoid_positional_boolean_parameters: false \ No newline at end of file diff --git a/packages/mutex/example/example.dart b/packages/mutex/example/example.dart new file mode 100644 index 0000000..c13b007 --- /dev/null +++ b/packages/mutex/example/example.dart @@ -0,0 +1,114 @@ +// Mutex example. +// +// This example demonstrates why a mutex is needed. + +import 'dart:async'; +import 'dart:math'; +import 'package:mutex/mutex.dart'; + +//---------------------------------------------------------------- +// Random asynchronous delays to try and simulate race conditions. + +const _maxDelay = 500; // milliseconds + +final _random = Random(); + +Future randomDelay() async { + await Future.delayed( + Duration(milliseconds: _random.nextInt(_maxDelay))); +} + +//---------------------------------------------------------------- +/// Account balance. +/// +/// The classical example of a race condition is when a bank account is updated +/// by different simultaneous operations. + +int balance = 0; + +//---------------------------------------------------------------- +/// Deposit without using mutex. + +Future unsafeUpdate(int id, int depositAmount) async { + // Random delay before updating starts + await randomDelay(); + + // Add the deposit to the balance. But this operation is not atomic if + // there are asynchronous operations in it (as simulated by the randomDelay). + + final oldBalance = balance; + await randomDelay(); + balance = oldBalance + depositAmount; + + print(' [$id] added $depositAmount to $oldBalance -> $balance'); +} + +//---------------------------------------------------------------- +/// Deposit using mutex. + +Mutex m = Mutex(); + +Future safeUpdate(int id, int depositAmount) async { + // Random delay before updating starts + await randomDelay(); + + // Acquire the mutex before running the critical section of code + + await m.protect(() async { + // critical section + + // This is the same as the unsafe update. But since it is performed only + // when the mutex is acquired, it is safe: no other safe update can happen + // until this mutex is released. + + final oldBalance = balance; + await randomDelay(); + balance = oldBalance + depositAmount; + + // end of critical section + + print(' [$id] added $depositAmount to $oldBalance -> $balance'); + }); +} + +//---------------------------------------------------------------- +/// Make a series of deposits and see if the final balance is correct. + +Future makeDeposits({bool safe = true}) async { + print(safe ? 'Using mutex:' : 'Not using mutex:'); + + const numberDeposits = 10; + const amount = 10; + + balance = 0; + + // Create a set of operations, each attempting to deposit the same amount + // into the account. + + final operations = >[]; + for (var x = 0; x < numberDeposits; x++) { + final f = (safe) ? safeUpdate(x, amount) : unsafeUpdate(x, amount); + operations.add(f); + } + + // Wait for all the deposit operations to finish + + await Future.wait(operations); + + // Check if all of the operations succeeded + + final expected = numberDeposits * amount; + if (balance != expected) { + print('Error: deposits were lost (final balance $balance != $expected)'); + } else { + print('Success: no deposits were lost'); + } +} + +//---------------------------------------------------------------- + +void main() async { + await makeDeposits(safe: false); + print(''); + await makeDeposits(safe: true); +} diff --git a/packages/mutex/lib/mutex.dart b/packages/mutex/lib/mutex.dart new file mode 100644 index 0000000..ba224d1 --- /dev/null +++ b/packages/mutex/lib/mutex.dart @@ -0,0 +1,11 @@ +// Copyright (c) 2016, Hoylen Sue. All rights reserved. Use of this source code +// is governed by a BSD-style license that can be found in the LICENSE file. + +/// Mutual exclusion. +/// +library mutex; + +import 'dart:async'; + +part 'src/mutex.dart'; +part 'src/read_write_mutex.dart'; diff --git a/packages/mutex/lib/src/mutex.dart b/packages/mutex/lib/src/mutex.dart new file mode 100644 index 0000000..1c9e9ec --- /dev/null +++ b/packages/mutex/lib/src/mutex.dart @@ -0,0 +1,89 @@ +part of mutex; + +/// Mutual exclusion. +/// +/// The [protect] method is a convenience method for acquiring a lock before +/// running critical code, and then releasing the lock afterwards. Using this +/// convenience method will ensure the lock is always released after use. +/// +/// Usage: +/// +/// m = Mutex(); +/// +/// await m.protect(() async { +/// // critical section +/// }); +/// +/// Alternatively, a lock can be explicitly acquired and managed. In this +/// situation, the program is responsible for releasing the lock after it +/// have been used. Failure to release the lock will prevent other code for +/// ever acquiring the lock. +/// +/// m = Mutex(); +/// +/// await m.acquire(); +/// try { +/// // critical section +/// } +/// finally { +/// m.release(); +/// } + +class Mutex { + //================================================================ + // Constructors + Mutex() : _rwMutex = ReadWriteMutex(); + Mutex.locked() : _rwMutex = ReadWriteMutex.writeLocked(); + + // Implemented as a ReadWriteMutex that is used only with write locks. + final ReadWriteMutex _rwMutex; + + /// Indicates if a lock has been acquired and not released. + bool get isLocked => _rwMutex.isLocked; + + /// Acquire a lock + /// + /// Returns a future that will be completed when the lock has been acquired. + /// + /// Consider using the convenience method [protect], otherwise the caller + /// is responsible for making sure the lock is released after it is no longer + /// needed. Failure to release the lock means no other code can acquire the + /// lock. + + Future acquire() => _rwMutex.acquireWrite(); + + /// Release a lock. + /// + /// Release a lock that has been acquired. + + void release() => _rwMutex.release(); + + /// Convenience method for protecting a function with a lock. + /// + /// This method guarantees a lock is always acquired before invoking the + /// [criticalSection] function. It also guarantees the lock is always + /// released. + /// + /// A critical section should always contain asynchronous code, since purely + /// synchronous code does not need to be protected inside a critical section. + /// Therefore, the critical section is a function that returns a _Future_. + /// If the critical section does not need to return a value, it should be + /// defined as returning `Future`. + /// + /// Returns a _Future_ whose value is the value of the _Future_ returned by + /// the critical section. + /// + /// An exception is thrown if the critical section throws an exception, + /// or an exception is thrown while waiting for the _Future_ returned by + /// the critical section to complete. The lock is released, when those + /// exceptions occur. + + Future protect(Future Function() criticalSection) async { + await acquire(); + try { + return await criticalSection(); + } finally { + release(); + } + } +} diff --git a/packages/mutex/lib/src/read_write_mutex.dart b/packages/mutex/lib/src/read_write_mutex.dart new file mode 100644 index 0000000..8a5c12e --- /dev/null +++ b/packages/mutex/lib/src/read_write_mutex.dart @@ -0,0 +1,304 @@ +part of mutex; + +//################################################################ +/// Internal representation of a request for a lock. +/// +/// This is instantiated for each acquire and, if necessary, it is added +/// to the waiting queue. + +class _ReadWriteMutexRequest { + /// Internal constructor. + /// + /// The [isRead] indicates if this is a request for a read lock (true) or a + /// request for a write lock (false). + + _ReadWriteMutexRequest({required this.isRead}); + + /// Indicates if this is a read or write lock. + + final bool isRead; // true = read lock requested; false = write lock requested + + /// The job's completer. + /// + /// This [Completer] will complete when the job has acquired the lock. + + final Completer completer = Completer(); +} + +//################################################################ +/// Mutual exclusion that supports read and write locks. +/// +/// Multiple read locks can be simultaneously acquired, but at most only +/// one write lock can be acquired at any one time. +/// +/// **Protecting critical code** +/// +/// The [protectWrite] and [protectRead] are convenience methods for acquiring +/// locks and releasing them. Using them will ensure the locks are always +/// released after use. +/// +/// Create the mutex: +/// +/// m = ReadWriteMutex(); +/// +/// Code protected by a write lock: +/// +/// await m.protectWrite(() { +/// // critical write section +/// }); +/// +/// Other code can be protected by a read lock: +/// +/// await m.protectRead(() { +/// // critical read section +/// }); +/// +/// +/// **Explicitly managing locks** +/// +/// Alternatively, the locks can be explicitly acquired and managed. In this +/// situation, the program is responsible for releasing the locks after they +/// have been used. Failure to release the lock will prevent other code for +/// ever acquiring a lock. +/// +/// Create the mutex: +/// +/// m = ReadWriteMutex(); +/// +/// Some code can acquire a write lock: +/// +/// await m.acquireWrite(); +/// try { +/// // critical write section +/// assert(m.isWriteLocked); +/// } finally { +/// m.release(); +/// } +/// +/// Other code can acquire a read lock. +/// +/// await m.acquireRead(); +/// try { +/// // critical read section +/// assert(m.isReadLocked); +/// } finally { +/// m.release(); +/// } +/// +/// The current implementation lets locks be acquired in first-in-first-out +/// order. This ensures there will not be any lock starvation, which can +/// happen if some locks are prioritised over others. Submit a feature +/// request issue, if there is a need for another scheduling algorithm. + +class ReadWriteMutex { + //================================================================ + // Constructors + ReadWriteMutex(); + ReadWriteMutex.writeLocked() : _state = -1; + ReadWriteMutex.readLocked(int? count) : _state = count ?? 1 { + assert(_state > 0, "can't have a negative read lock count"); + } + + //================================================================ + // Members + + /// List of requests waiting for a lock on this mutex. + + final _waiting = <_ReadWriteMutexRequest>[]; + + /// State of the mutex + + int _state = 0; // -1 = write lock, +ve = number of read locks; 0 = no lock + + //================================================================ + // Methods + + /// Indicates if a lock (read or write) has been acquired and not released. + bool get isLocked => _state != 0; + + /// Indicates if a write lock has been acquired and not released. + bool get isWriteLocked => _state == -1; + + /// Indicates if one or more read locks has been acquired and not released. + bool get isReadLocked => 0 < _state; + + /// Indicates the number of waiters on this mutex + int get waiters => _waiting.length; + + /// Acquire a read lock + /// + /// Returns a future that will be completed when the lock has been acquired. + /// + /// A read lock can not be acquired when there is a write lock on the mutex. + /// But it can be acquired if there are other read locks. + /// + /// Consider using the convenience method [protectRead], otherwise the caller + /// is responsible for making sure the lock is released after it is no longer + /// needed. Failure to release the lock means no other code can acquire a + /// write lock. + + Future acquireRead() => _acquire(isRead: true); + + /// Acquire a write lock + /// + /// Returns a future that will be completed when the lock has been acquired. + /// + /// A write lock can only be acquired when there are no other locks (neither + /// read locks nor write locks) on the mutex. + /// + /// Consider using the convenience method [protectWrite], otherwise the caller + /// is responsible for making sure the lock is released after it is no longer + /// needed. Failure to release the lock means no other code can acquire the + /// lock (neither a read lock or a write lock). + + Future acquireWrite() => _acquire(isRead: false); + + /// Release a lock. + /// + /// Release the lock that was previously acquired. + /// + /// When the lock is released, locks waiting to be acquired can be acquired + /// depending on the type of lock waiting and if other locks have been + /// acquired. + /// + /// A [StateError] is thrown if the mutex does not currently have a lock on + /// it. + + void release() { + if (_state == -1) { + // Write lock released + _state = 0; + } else if (0 < _state) { + // Read lock released + _state--; + } else if (_state == 0) { + throw StateError('no lock to release'); + } else { + assert(false, 'invalid state'); + } + + // If there are jobs waiting and the next job can acquire the mutex, + // let it acquire it and remove it from the queue. + // + // This is a while loop, because there could be multiple jobs on the + // queue waiting for a read-only mutex. So they can all be allowed to run. + + while (_waiting.isNotEmpty) { + final nextJob = _waiting.first; + if (_jobAcquired(nextJob)) { + _waiting.removeAt(0); + } else { + // The next job cannot acquire the mutex. This only occurs when: the + // the currently running job has a write mutex (_state == -1); or the + // next job wants write mutex and there is a job currently running + // (regardless of what type of mutex it has acquired). + assert(_state < 0 || !nextJob.isRead, + 'unexpected: next job cannot be acquired'); + break; // no more can be removed from the queue + } + } + } + + /// Convenience method for protecting a function with a read lock. + /// + /// This method guarantees a read lock is always acquired before invoking the + /// [criticalSection] function. It also guarantees the lock is always + /// released. + /// + /// A critical section should always contain asynchronous code, since purely + /// synchronous code does not need to be protected inside a critical section. + /// Therefore, the critical section is a function that returns a _Future_. + /// If the critical section does not need to return a value, it should be + /// defined as returning `Future`. + /// + /// Returns a _Future_ whose value is the value of the _Future_ returned by + /// the critical section. + /// + /// An exception is thrown if the critical section throws an exception, + /// or an exception is thrown while waiting for the _Future_ returned by + /// the critical section to complete. The lock is released, when those + /// exceptions occur. + + Future protectRead(Future Function() criticalSection) async { + await acquireRead(); + try { + return await criticalSection(); + } finally { + release(); + } + } + + /// Convenience method for protecting a function with a write lock. + /// + /// This method guarantees a write lock is always acquired before invoking the + /// [criticalSection] function. It also guarantees the lock is always + /// released. + /// + /// A critical section should always contain asynchronous code, since purely + /// synchronous code does not need to be protected inside a critical section. + /// Therefore, the critical section is a function that returns a _Future_. + /// If the critical section does not need to return a value, it should be + /// defined as returning `Future`. + /// + /// Returns a _Future_ whose value is the value of the _Future_ returned by + /// the critical section. + /// + /// An exception is thrown if the critical section throws an exception, + /// or an exception is thrown while waiting for the _Future_ returned by + /// the critical section to complete. The lock is released, when those + /// exceptions occur. + + Future protectWrite(Future Function() criticalSection) async { + await acquireWrite(); + try { + return await criticalSection(); + } finally { + release(); + } + } + + /// Internal acquire method. + /// + /// Used to acquire a read lock (when [isRead] is true) or a write lock + /// (when [isRead] is false). + /// + /// Returns a Future that completes when the lock has been acquired. + + Future _acquire({required bool isRead}) { + final newJob = _ReadWriteMutexRequest(isRead: isRead); + + if (_waiting.isNotEmpty || !_jobAcquired(newJob)) { + // This new job cannot run yet. There are either other jobs already + // waiting, or there are no waiting jobs but this job cannot start + // because the mutex is currently acquired (namely, either this new job + // or the currently running job is read-write). + // + // Add the new job to the end of the queue. + + _waiting.add(newJob); + } + + return newJob.completer.future; + } + + /// Determine if the [job] can now acquire the lock. + /// + /// If it can acquire the lock, the job's completer is completed, the + /// state updated, and true is returned. If not, false is returned. + /// + /// A job for a read lock can only be acquired if there are no other locks + /// or there are read lock(s). A job for a write lock can only be acquired + /// if there are no other locks. + + bool _jobAcquired(_ReadWriteMutexRequest job) { + assert(-1 <= _state, 'must not be write locked'); + if (_state == 0 || (0 < _state && job.isRead)) { + // Can acquire + _state = (job.isRead) ? (_state + 1) : -1; + job.completer.complete(); + return true; + } else { + return false; + } + } +} diff --git a/packages/mutex/pubspec.yaml b/packages/mutex/pubspec.yaml new file mode 100644 index 0000000..52d86a1 --- /dev/null +++ b/packages/mutex/pubspec.yaml @@ -0,0 +1,12 @@ +name: mutex +description: Mutual exclusion with implementation of normal and read-write mutex +version: 3.1.0 +publish_to: none + +environment: + sdk: '>=2.15.0 <4.0.0' + +dev_dependencies: + lint_hard: ^4.0.0 + pana: ^0.21.37 + test: ^1.16.3 diff --git a/packages/mutex/test/mutex_multiple_read_test.dart b/packages/mutex/test/mutex_multiple_read_test.dart new file mode 100644 index 0000000..5ed8345 --- /dev/null +++ b/packages/mutex/test/mutex_multiple_read_test.dart @@ -0,0 +1,102 @@ +// Test contributed by "Cat-sushi" +// + +import 'dart:async'; +// import 'dart:io'; + +import 'package:mutex/mutex.dart'; +import 'package:test/test.dart'; + +//================================================================ +// For debug output +// +// Uncomment the "stdout.write" line in the [debugWrite] method to enable +// debug output. + +int numReadAcquired = 0; +int numReadReleased = 0; + +enum State { waitingToAcquire, acquired, released } + +const stateSymbol = { + State.waitingToAcquire: '?', + State.acquired: '+', + State.released: '-' +}; + +var _outputCount = 0; // to manage line breaks + +void debugOutput(String id, State state) { + debugWrite('$id${stateSymbol[state]} '); + + _outputCount++; + if (_outputCount % 10 == 0) { + debugWrite('\n'); + } +} + +void debugWrite(String str) { + // Uncomment to show what is happening + // stdout.write(str); +} + +//================================================================ + +Future mySleep([int ms = 1000]) async { + await Future.delayed(Duration(milliseconds: ms)); +} + +Future sharedLoop1(ReadWriteMutex mutex, String symbol) async { + while (true) { + debugOutput(symbol, State.waitingToAcquire); + + await mutex.protectRead(() async { + numReadAcquired++; + debugOutput(symbol, State.acquired); + + await mySleep(100); + }); + numReadReleased++; + + debugOutput(symbol, State.released); + } +} + +void main() { + group('exclusive lock tests', () { + test('test1', () async { + const numReadLoops = 5; + + final mutex = ReadWriteMutex(); + + assert(numReadLoops < 26, 'too many read loops for lowercase letters'); + debugWrite('Number of read loops: $numReadLoops\n'); + + for (var x = 0; x < numReadLoops; x++) { + final symbol = String.fromCharCode('a'.codeUnitAt(0) + x); + unawaited(sharedLoop1(mutex, symbol)); + await mySleep(10); + } + + await mySleep(); + + debugWrite('\nAbout to acquireWrite' + ' (reads: acquired=$numReadAcquired released=$numReadReleased' + ' outstanding=${numReadAcquired - numReadReleased})\n'); + _outputCount = 0; // reset line break + + const writeSymbol = 'W'; + + debugOutput(writeSymbol, State.waitingToAcquire); + await mutex.acquireWrite(); + debugOutput(writeSymbol, State.acquired); + mutex.release(); + debugOutput(writeSymbol, State.released); + + debugWrite('\nWrite mutex released\n'); + _outputCount = 0; // reset line break + + expect('a', 'a'); + }); + }); +} diff --git a/packages/mutex/test/mutex_readwrite_test.dart b/packages/mutex/test/mutex_readwrite_test.dart new file mode 100644 index 0000000..310caa1 --- /dev/null +++ b/packages/mutex/test/mutex_readwrite_test.dart @@ -0,0 +1,486 @@ +import 'dart:async'; +import 'package:mutex/mutex.dart'; +import 'package:test/test.dart'; + +//################################################################ + +class RWTester { + int _operation = 0; + final _operationSequences = []; + + /// Execution sequence of the operations done. + /// + /// Each element corresponds to the position of the initial execution + /// order of the read/write operation future. + List get operationSequences => _operationSequences; + + ReadWriteMutex mutex = ReadWriteMutex(); + + /// Set to true to print out read/write to the balance during deposits + static const bool debugOutput = false; + + final DateTime _startTime = DateTime.now(); + + void _debugPrint(String message) { + if (debugOutput) { + final t = DateTime.now().difference(_startTime).inMilliseconds; + // ignore: avoid_print + print('$t: $message'); + } + } + + void reset() { + _operationSequences.clear(); + _debugPrint('reset'); + } + + /// Waits [startDelay] and then invokes critical section with mutex. + /// + /// Writes to [_operationSequences]. If the readwrite locks are respected + /// then the final state of the list will be in ascending order. + Future writing(int startDelay, int sequence, int endDelay) async { + await Future.delayed(Duration(milliseconds: startDelay)); + + await mutex.protectWrite(() async { + final op = ++_operation; + _debugPrint('[$op] write start: <- $_operationSequences'); + final tmp = _operationSequences; + expect(mutex.isWriteLocked, isTrue); + expect(_operationSequences, orderedEquals(tmp)); + // Add the position of operation to the list of operations. + _operationSequences.add(sequence); // add position to list + expect(mutex.isWriteLocked, isTrue); + await Future.delayed(Duration(milliseconds: endDelay)); + _debugPrint('[$op] write finish: -> $_operationSequences'); + }); + } + + /// Waits [startDelay] and then invokes critical section with mutex. + /// + /// + Future reading(int startDelay, int sequence, int endDelay) async { + await Future.delayed(Duration(milliseconds: startDelay)); + + await mutex.protectRead(() async { + final op = ++_operation; + _debugPrint('[$op] read start: <- $_operationSequences'); + expect(mutex.isReadLocked, isTrue); + _operationSequences.add(sequence); // add position to list + await Future.delayed(Duration(milliseconds: endDelay)); + _debugPrint('[$op] read finish: <- $_operationSequences'); + }); + } +} + +//################################################################ + +//---------------------------------------------------------------- + +void main() { + final account = RWTester(); + + setUp(account.reset); + + test('multiple read locks', () async { + await Future.wait([ + account.reading(0, 1, 1000), + account.reading(0, 2, 900), + account.reading(0, 3, 800), + account.reading(0, 4, 700), + account.reading(0, 5, 600), + account.reading(0, 6, 500), + account.reading(0, 7, 400), + account.reading(0, 8, 300), + account.reading(0, 9, 200), + account.reading(0, 10, 100), + ]); + // The first future acquires the lock first and waits the longest to give it + // up. This should however not block any of the other read operations + // as such the reads should finish in ascending orders. + expect( + account.operationSequences, + orderedEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), + ); + }); + + test('multiple write locks', () async { + await Future.wait([ + account.writing(0, 1, 100), + account.writing(0, 2, 100), + account.writing(0, 3, 100), + ]); + // The first future writes first and holds the lock until 100 ms + // Even though the second future starts execution, the lock cannot be + // acquired until it is released by the first future. + // Therefore the sequence of operations will be in ascending order + // of the futures. + expect( + account.operationSequences, + orderedEquals([1, 2, 3]), + ); + }); + + test('acquireWrite() before acquireRead()', () async { + const lockTimeout = Duration(milliseconds: 100); + + final mutex = ReadWriteMutex(); + + await mutex.acquireWrite(); + expect(mutex.isReadLocked, equals(false)); + expect(mutex.isWriteLocked, equals(true)); + + // Since there is a write lock existing, a read lock cannot be acquired. + final readLock = mutex.acquireRead().timeout(lockTimeout); + expect( + () async => readLock, + throwsA(isA()), + ); + }); + + test('acquireRead() before acquireWrite()', () async { + const lockTimeout = Duration(milliseconds: 100); + + final mutex = ReadWriteMutex(); + + await mutex.acquireRead(); + expect(mutex.isReadLocked, equals(true)); + expect(mutex.isWriteLocked, equals(false)); + + // Since there is a read lock existing, a write lock cannot be acquired. + final writeLock = mutex.acquireWrite().timeout(lockTimeout); + expect( + () async => writeLock, + throwsA(isA()), + ); + }); + + test('mixture of read write locks execution order', () async { + await Future.wait([ + account.reading(0, 1, 100), + account.reading(10, 2, 100), + account.reading(20, 3, 100), + account.writing(30, 4, 100), + account.writing(40, 5, 100), + account.writing(50, 6, 100), + ]); + + expect( + account.operationSequences, + orderedEquals([1, 2, 3, 4, 5, 6]), + ); + }); + + group('protectRead', () { + test('lock obtained and released on success', () async { + final m = ReadWriteMutex(); + + await m.protectRead(() async { + // critical section + expect(m.isLocked, isTrue); + }); + expect(m.isLocked, isFalse); + }); + + test('value returned from critical section', () async { + // These are the normal scenario of the critical section running + // successfully. It tests different return types from the + // critical section. + + final m = ReadWriteMutex(); + + // returns Future + await m.protectRead(() async {}); + + // returns Future + final number = await m.protectRead(() async => 42); + expect(number, equals(42)); + + // returns Future completes with value + final optionalNumber = await m.protectRead(() async => 1024); + expect(optionalNumber, equals(1024)); + + // returns Future completes with null + final optionalNumberNull = await m.protectRead(() async => null); + expect(optionalNumberNull, isNull); + + // returns Future + final word = await m.protectRead(() async => 'foobar'); + expect(word, equals('foobar')); + + // returns Future completes with value + final optionalWord = await m.protectRead(() async => 'baz'); + expect(optionalWord, equals('baz')); + + // returns Future completes with null + final optionalWordNull = await m.protectRead(() async => null); + expect(optionalWordNull, isNull); + + expect(m.isLocked, isFalse); + }); + + test('exception in synchronous code', () async { + // Tests what happens when an exception is raised in the **synchronous** + // part of the critical section. + // + // Locks are correctly managed: the lock is obtained before executing + // the critical section, and is released when the exception is thrown + // by the _protect_ method. + // + // The exception is raised when waiting for the Future returned by + // _protect_ to complete. Even though the exception is synchronously + // raised by the critical section, it won't be thrown when _protect_ + // is invoked. The _protect_ method always successfully returns a + // _Future_. + + Future criticalSection() { + final c = Completer()..complete(42); + + // synchronous exception + throw const FormatException('synchronous exception'); + // ignore: dead_code + return c.future; + } + + // Check the criticalSection behaves as expected for the test + + try { + // ignore: unused_local_variable + final resultFuture = criticalSection(); + fail('critical section did not throw synchronous exception'); + } on FormatException { + // expected: invoking the criticalSection results in the exception + } + + final m = ReadWriteMutex(); + + try { + // Invoke protect to get the Future (this should succeed) + final resultFuture = m.protectRead(criticalSection); + expect(resultFuture, isA>()); + + // Wait for the Future (this should fail) + final result = await resultFuture; + expect(result, isNotNull); + fail('exception not thrown'); + } on FormatException catch (e) { + expect(m.isLocked, isFalse); + expect(e.message, equals('synchronous exception')); + } + + expect(m.isLocked, isFalse); + }); + + test('exception in asynchronous code', () async { + // Tests what happens when an exception is raised in the **asynchronous** + // part of the critical section. + // + // Locks are correctly managed: the lock is obtained before executing + // the critical section, and is released when the exception is thrown + // by the _protect_ method. + // + // The exception is raised when waiting for the Future returned by + // _protect_ to complete. + + Future criticalSection() async { + final c = Completer()..complete(42); + + await Future.delayed(const Duration(seconds: 1), () {}); + + // asynchronous exception (since it must wait for the above line) + throw const FormatException('asynchronous exception'); + // ignore: dead_code + return c.future; + } + + // Check the criticalSection behaves as expected for the test + + final resultFuture = criticalSection(); + expect(resultFuture, isA>()); + // invoking the criticalSection does not result in the exception + try { + await resultFuture; + fail('critical section did not throw asynchronous exception'); + } on FormatException { + // expected: exception happens on the await + } + + final m = ReadWriteMutex(); + + try { + // Invoke protect to get the Future (this should succeed) + final resultFuture = m.protectRead(criticalSection); + expect(resultFuture, isA>()); + + // Even though the criticalSection throws the exception in synchronous + // code, protect causes it to become an asynchronous exception. + + // Wait for the Future (this should fail) + final result = await resultFuture; + expect(result, isNotNull); + fail('exception not thrown'); + } on FormatException catch (e) { + expect(m.isLocked, isFalse); + expect(e.message, equals('asynchronous exception')); + } + + expect(m.isLocked, isFalse); + }); + }); + + group('protectWrite', () { + test('lock obtained and released on success', () async { + final m = ReadWriteMutex(); + + await m.protectWrite(() async { + // critical section + expect(m.isLocked, isTrue); + }); + expect(m.isLocked, isFalse); + }); + + test('value returned from critical section', () async { + // These are the normal scenario of the critical section running + // successfully. It tests different return types from the + // critical section. + + final m = ReadWriteMutex(); + + // returns Future + await m.protectWrite(() async {}); + + // returns Future + final number = await m.protectWrite(() async => 42); + expect(number, equals(42)); + + // returns Future completes with value + final optionalNumber = await m.protectWrite(() async => 1024); + expect(optionalNumber, equals(1024)); + + // returns Future completes with null + final optionalNumberNull = await m.protectWrite(() async => null); + expect(optionalNumberNull, isNull); + + // returns Future + final word = await m.protectWrite(() async => 'foobar'); + expect(word, equals('foobar')); + + // returns Future completes with value + final optionalWord = await m.protectWrite(() async => 'baz'); + expect(optionalWord, equals('baz')); + + // returns Future completes with null + final optionalWordNull = await m.protectWrite(() async => null); + expect(optionalWordNull, isNull); + + expect(m.isLocked, isFalse); + }); + + test('exception in synchronous code', () async { + // Tests what happens when an exception is raised in the **synchronous** + // part of the critical section. + // + // Locks are correctly managed: the lock is obtained before executing + // the critical section, and is released when the exception is thrown + // by the _protect_ method. + // + // The exception is raised when waiting for the Future returned by + // _protect_ to complete. Even though the exception is synchronously + // raised by the critical section, it won't be thrown when _protect_ + // is invoked. The _protect_ method always successfully returns a + // _Future_. + + Future criticalSection() { + final c = Completer()..complete(42); + + // synchronous exception + throw const FormatException('synchronous exception'); + // ignore: dead_code + return c.future; + } + + // Check the criticalSection behaves as expected for the test + + try { + // ignore: unused_local_variable + final resultFuture = criticalSection(); + fail('critical section did not throw synchronous exception'); + } on FormatException { + // expected: invoking the criticalSection results in the exception + } + + final m = ReadWriteMutex(); + + try { + // Invoke protect to get the Future (this should succeed) + final resultFuture = m.protectWrite(criticalSection); + expect(resultFuture, isA>()); + + // Wait for the Future (this should fail) + final result = await resultFuture; + expect(result, isNotNull); + fail('exception not thrown'); + } on FormatException catch (e) { + expect(m.isLocked, isFalse); + expect(e.message, equals('synchronous exception')); + } + + expect(m.isLocked, isFalse); + }); + + test('exception in asynchronous code', () async { + // Tests what happens when an exception is raised in the **asynchronous** + // part of the critical section. + // + // Locks are correctly managed: the lock is obtained before executing + // the critical section, and is released when the exception is thrown + // by the _protect_ method. + // + // The exception is raised when waiting for the Future returned by + // _protect_ to complete. + + Future criticalSection() async { + final c = Completer()..complete(42); + + await Future.delayed(const Duration(seconds: 1), () {}); + + // asynchronous exception (since it must wait for the above line) + throw const FormatException('asynchronous exception'); + // ignore: dead_code + return c.future; + } + + // Check the criticalSection behaves as expected for the test + + final resultFuture = criticalSection(); + expect(resultFuture, isA>()); + // invoking the criticalSection does not result in the exception + try { + await resultFuture; + fail('critical section did not throw asynchronous exception'); + } on FormatException { + // expected: exception happens on the await + } + + final m = ReadWriteMutex(); + + try { + // Invoke protect to get the Future (this should succeed) + final resultFuture = m.protectWrite(criticalSection); + expect(resultFuture, isA>()); + + // Even though the criticalSection throws the exception in synchronous + // code, protect causes it to become an asynchronous exception. + + // Wait for the Future (this should fail) + final result = await resultFuture; + expect(result, isNotNull); + fail('exception not thrown'); + } on FormatException catch (e) { + expect(m.isLocked, isFalse); + expect(e.message, equals('asynchronous exception')); + } + + expect(m.isLocked, isFalse); + }); + }); +} diff --git a/packages/mutex/test/mutex_test.dart b/packages/mutex/test/mutex_test.dart new file mode 100644 index 0000000..0db5f52 --- /dev/null +++ b/packages/mutex/test/mutex_test.dart @@ -0,0 +1,341 @@ +import 'dart:async'; +import 'package:mutex/mutex.dart'; +import 'package:test/test.dart'; + +//################################################################ +/// Account simulating the classic "simultaneous update" concurrency problem. +/// +/// The deposit operation reads the balance, waits for a short time (where +/// problems can occur if the balance is changed) and then writes out the +/// new balance. +/// +class Account { + int get balance => _balance; + int _balance = 0; + + int _operation = 0; + + Mutex mutex = Mutex(); + + /// Set to true to print out read/write to the balance during deposits + static const bool debugOutput = false; + + /// Time used for calculating time offsets in debug messages. + final DateTime _startTime = DateTime.now(); + + void _debugPrint(String message) { + if (debugOutput) { + final t = DateTime.now().difference(_startTime).inMilliseconds; + // ignore: avoid_print + print('$t: $message'); + } + } + + void reset([int startingBalance = 0]) { + _balance = startingBalance; + _debugPrint('reset: balance = $_balance'); + } + + /// Waits [startDelay] and then invokes critical section without mutex. + /// + Future depositUnsafe( + int amount, int startDelay, int dangerWindow) async { + await Future.delayed(Duration(milliseconds: startDelay)); + + await _depositCriticalSection(amount, dangerWindow); + } + + /// Waits [startDelay] and then invokes critical section with mutex. + /// + Future depositWithMutex( + int amount, int startDelay, int dangerWindow) async { + await Future.delayed(Duration(milliseconds: startDelay)); + + await mutex.acquire(); + try { + expect(mutex.isLocked, isTrue); + await _depositCriticalSection(amount, dangerWindow); + expect(mutex.isLocked, isTrue); + } finally { + mutex.release(); + } + } + + /// Critical section of adding [amount] to the balance. + /// + /// Reads the balance, then sleeps for [dangerWindow] milliseconds, before + /// saving the new balance. If not protected, another invocation of this + /// method while it is sleeping will read the balance before it is updated. + /// The one that saves its balance last will overwrite the earlier saved + /// balances (effectively those other deposits will be lost). + /// + Future _depositCriticalSection(int amount, int dangerWindow) async { + final op = ++_operation; + + _debugPrint('[$op] read balance: $_balance'); + + final tmp = _balance; + + await Future.delayed(Duration(milliseconds: dangerWindow)); + + _balance = tmp + amount; + + _debugPrint('[$op] write balance: $_balance (= $tmp + $amount)'); + } +} + +//################################################################ + +//---------------------------------------------------------------- + +void main() { + const correctBalance = 68; + + final account = Account(); + + test('without mutex', () async { + // First demonstrate that without mutex incorrect results are produced. + + // Without mutex produces incorrect result + // 000. a reads 0 + // 025. b reads 0 + // 050. a writes 42 + // 075. b writes 26 + account.reset(); + await Future.wait([ + account.depositUnsafe(42, 0, 50), + account.depositUnsafe(26, 25, 50) // result overwrites first deposit + ]); + expect(account.balance, equals(26)); // incorrect: first deposit lost + + // Without mutex produces incorrect result + // 000. b reads 0 + // 025. a reads 0 + // 050. b writes 26 + // 075. a writes 42 + account.reset(); + await Future.wait([ + account.depositUnsafe(42, 25, 50), // result overwrites second deposit + account.depositUnsafe(26, 0, 50) + ]); + expect(account.balance, equals(42)); // incorrect: second deposit lost + }); + + test('with mutex', () async { +// Test correct results are produced with mutex + + // With mutex produces correct result + // 000. a acquires lock + // 000. a reads 0 + // 025. b is blocked + // 050. a writes 42 + // 050. a releases lock + // 050. b acquires lock + // 050. b reads 42 + // 100. b writes 68 + account.reset(); + await Future.wait([ + account.depositWithMutex(42, 0, 50), + account.depositWithMutex(26, 25, 50) + ]); + expect(account.balance, equals(correctBalance)); + + // With mutex produces correct result + // 000. b acquires lock + // 000. b reads 0 + // 025. a is blocked + // 050. b writes 26 + // 050. b releases lock + // 050. a acquires lock + // 050. a reads 26 + // 100. a writes 68 + account.reset(); + await Future.wait([ + account.depositWithMutex(42, 25, 50), + account.depositWithMutex(26, 0, 50) + ]); + expect(account.balance, equals(correctBalance)); + }); + + test('multiple acquires are serialized', () async { + // Demonstrate that sections running in a mutex are effectively serialized + const delay = 200; // milliseconds + account.reset(); + await Future.wait([ + account.depositWithMutex(1, 0, delay), + account.depositWithMutex(1, 0, delay), + account.depositWithMutex(1, 0, delay), + account.depositWithMutex(1, 0, delay), + account.depositWithMutex(1, 0, delay), + account.depositWithMutex(1, 0, delay), + account.depositWithMutex(1, 0, delay), + account.depositWithMutex(1, 0, delay), + account.depositWithMutex(1, 0, delay), + account.depositWithMutex(1, 0, delay), + ]); + expect(account.balance, equals(10)); + }); + + group('protect', () { + test('lock obtained and released on success', () async { + // This is the normal scenario of the critical section running + // successfully. The lock is acquired before running the critical + // section, and it is released after it runs (and will remain + // unlocked after the _protect_ method returns). + + final m = Mutex(); + + await m.protect(() async { + // critical section: returns Future + expect(m.isLocked, isTrue); + }); + + expect(m.isLocked, isFalse); + }); + + test('value returned from critical section', () async { + // These are the normal scenario of the critical section running + // successfully. It tests different return types from the + // critical section. + + final m = Mutex(); + + // returns Future + await m.protect(() async {}); + + // returns Future + final number = await m.protect(() async => 42); + expect(number, equals(42)); + + // returns Future completes with value + final optionalNumber = await m.protect(() async => 1024); + expect(optionalNumber, equals(1024)); + + // returns Future completes with null + final optionalNumberNull = await m.protect(() async => null); + expect(optionalNumberNull, isNull); + + // returns Future + final word = await m.protect(() async => 'foobar'); + expect(word, equals('foobar')); + + // returns Future completes with value + final optionalWord = await m.protect(() async => 'baz'); + expect(optionalWord, equals('baz')); + + // returns Future completes with null + final optionalWordNull = await m.protect(() async => null); + expect(optionalWordNull, isNull); + + expect(m.isLocked, isFalse); + }); + + test('exception in synchronous code', () async { + // Tests what happens when an exception is raised in the **synchronous** + // part of the critical section. + // + // Locks are correctly managed: the lock is obtained before executing + // the critical section, and is released when the exception is thrown + // by the _protect_ method. + // + // The exception is raised when waiting for the Future returned by + // _protect_ to complete. Even though the exception is synchronously + // raised by the critical section, it won't be thrown when _protect_ + // is invoked. The _protect_ method always successfully returns a + // _Future_. + + Future criticalSection() { + final c = Completer()..complete(42); + + // synchronous exception + throw const FormatException('synchronous exception'); + // ignore: dead_code + return c.future; + } + + // Check the criticalSection behaves as expected for the test + + try { + // ignore: unused_local_variable + final resultFuture = criticalSection(); + fail('critical section did not throw synchronous exception'); + } on FormatException { + // expected: invoking the criticalSection results in the exception + } + + final m = Mutex(); + + try { + // Invoke protect to get the Future (this should succeed) + final resultFuture = m.protect(criticalSection); + expect(resultFuture, isA>()); + + // Wait for the Future (this should fail) + final result = await resultFuture; + expect(result, isNotNull); + fail('exception not thrown'); + } on FormatException catch (e) { + expect(m.isLocked, isFalse); + expect(e.message, equals('synchronous exception')); + } + + expect(m.isLocked, isFalse); + }); + + test('exception in asynchronous code', () async { + // Tests what happens when an exception is raised in the **asynchronous** + // part of the critical section. + // + // Locks are correctly managed: the lock is obtained before executing + // the critical section, and is released when the exception is thrown + // by the _protect_ method. + // + // The exception is raised when waiting for the Future returned by + // _protect_ to complete. + + Future criticalSection() async { + final c = Completer()..complete(42); + + await Future.delayed(const Duration(seconds: 1), () {}); + + // asynchronous exception (since it must wait for the above line) + throw const FormatException('asynchronous exception'); + // ignore: dead_code + return c.future; + } + + // Check the criticalSection behaves as expected for the test + + final resultFuture = criticalSection(); + expect(resultFuture, isA>()); + // invoking the criticalSection does not result in the exception + try { + await resultFuture; + fail('critical section did not throw asynchronous exception'); + } on FormatException { + // expected: exception happens on the await + } + + final m = Mutex(); + + try { + // Invoke protect to get the Future (this should succeed) + final resultFuture = m.protect(criticalSection); + expect(resultFuture, isA>()); + + // Even though the criticalSection throws the exception in synchronous + // code, protect causes it to become an asynchronous exception. + + // Wait for the Future (this should fail) + final result = await resultFuture; + expect(result, isNotNull); + fail('exception not thrown'); + } on FormatException catch (e) { + expect(m.isLocked, isFalse); + expect(e.message, equals('asynchronous exception')); + } + + expect(m.isLocked, isFalse); + }); + }); +} diff --git a/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart index 20fdf08..86806cf 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record_cubit.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:typed_data'; +import 'package:async_tools/async_tools.dart'; import 'package:bloc/bloc.dart'; import '../../veilid_support.dart'; diff --git a/packages/veilid_support/lib/dht_support/src/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record_pool.dart index fd47da4..ee4b426 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record_pool.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:async_tools/async_tools.dart'; import 'package:equatable/equatable.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:freezed_annotation/freezed_annotation.dart'; diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart index 71d1d38..afccbde 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array_cubit.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:async_tools/async_tools.dart'; import 'package:bloc/bloc.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; diff --git a/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart b/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart index ab1c3a8..a50d893 100644 --- a/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart +++ b/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart @@ -1,8 +1,9 @@ import 'dart:async'; +import 'package:async_tools/async_tools.dart'; import 'package:bloc/bloc.dart'; -import '../veilid_support.dart'; +import 'table_db.dart'; abstract class AsyncTableDBBackedCubit extends Cubit> with TableDBBacked { diff --git a/packages/veilid_support/lib/veilid_support.dart b/packages/veilid_support/lib/veilid_support.dart index 309f118..f873397 100644 --- a/packages/veilid_support/lib/veilid_support.dart +++ b/packages/veilid_support/lib/veilid_support.dart @@ -6,10 +6,7 @@ library veilid_support; export 'package:veilid/veilid.dart'; export 'dht_support/dht_support.dart'; -export 'src/async_tag_lock.dart'; -export 'src/async_value.dart'; export 'src/config.dart'; -export 'src/future_cubit.dart'; export 'src/identity.dart'; export 'src/json_tools.dart'; export 'src/protobuf_tools.dart'; diff --git a/packages/veilid_support/pubspec.lock b/packages/veilid_support/pubspec.lock index ec62d29..54c4755 100644 --- a/packages/veilid_support/pubspec.lock +++ b/packages/veilid_support/pubspec.lock @@ -33,6 +33,13 @@ packages: url: "https://pub.dev" source: hosted version: "2.11.0" + async_tools: + dependency: "direct main" + description: + path: "../async_tools" + relative: true + source: path + version: "1.0.0" bloc: dependency: "direct main" description: @@ -404,12 +411,11 @@ packages: source: hosted version: "1.0.4" mutex: - dependency: "direct main" + dependency: transitive description: - name: mutex - sha256: "8827da25de792088eb33e572115a5eb0d61d61a3c01acbc8bcbe76ed78f1a1f2" - url: "https://pub.dev" - source: hosted + path: "../mutex" + relative: true + source: path version: "3.1.0" node_preamble: dependency: transitive @@ -784,5 +790,5 @@ packages: source: hosted version: "3.1.2" sdks: - dart: ">=3.2.0-194.0.dev <4.0.0" + dart: ">=3.2.6 <4.0.0" flutter: ">=3.10.6" diff --git a/packages/veilid_support/pubspec.yaml b/packages/veilid_support/pubspec.yaml index 471d00b..b1d21c1 100644 --- a/packages/veilid_support/pubspec.yaml +++ b/packages/veilid_support/pubspec.yaml @@ -7,6 +7,8 @@ environment: sdk: '>=3.0.5 <4.0.0' dependencies: + async_tools: + path: ../async_tools bloc: ^8.1.2 equatable: ^2.0.5 fast_immutable_collections: ^9.1.5 @@ -14,7 +16,6 @@ dependencies: json_annotation: ^4.8.1 loggy: ^2.0.3 meta: ^1.10.0 - mutex: ^3.1.0 protobuf: ^3.0.0 veilid: # veilid: ^0.0.1 diff --git a/pubspec.lock b/pubspec.lock index 648e896..1d4800b 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -57,6 +57,13 @@ packages: url: "https://pub.dev" source: hosted version: "2.11.0" + async_tools: + dependency: "direct main" + description: + path: "packages/async_tools" + relative: true + source: path + version: "1.0.0" awesome_extensions: dependency: "direct main" description: @@ -832,10 +839,9 @@ packages: mutex: dependency: "direct main" description: - name: mutex - sha256: "8827da25de792088eb33e572115a5eb0d61d61a3c01acbc8bcbe76ed78f1a1f2" - url: "https://pub.dev" - source: hosted + path: "packages/mutex" + relative: true + source: path version: "3.1.0" nested: dependency: transitive @@ -1609,5 +1615,5 @@ packages: source: hosted version: "0.9.0" sdks: - dart: ">=3.2.3 <4.0.0" + dart: ">=3.2.6 <4.0.0" flutter: ">=3.16.6" diff --git a/pubspec.yaml b/pubspec.yaml index a5f22bc..f2ed0f5 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -11,6 +11,8 @@ dependencies: animated_theme_switcher: ^2.0.10 ansicolor: ^2.0.2 archive: ^3.4.10 + async_tools: + path: packages/async_tools awesome_extensions: ^2.0.12 badges: ^3.1.2 basic_utils: ^5.7.0 @@ -51,7 +53,8 @@ dependencies: meta: ^1.10.0 mobile_scanner: ^3.5.7 motion_toast: ^2.8.0 - mutex: ^3.1.0 + mutex: + path: packages/mutex pasteboard: ^0.2.0 path: ^1.8.3 path_provider: ^2.1.2