mutex debugging

This commit is contained in:
Christien Rioux 2024-08-06 08:51:19 -07:00
parent 120a7105c8
commit 103975bb56
24 changed files with 88 additions and 65 deletions

View file

@ -305,10 +305,10 @@ class DHTLog implements DHTDeleteable<DHTLog> {
// Openable
int _openCount;
final _mutex = Mutex();
final _mutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex = Mutex();
final Mutex _listenMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
// Stream of external changes
StreamController<DHTLogUpdate>? _watchController;
}

View file

@ -713,7 +713,7 @@ class _DHTLogSpine {
DHTShortArray.maxElements;
// Spine head mutex to ensure we keep the representation valid
final Mutex _spineMutex = Mutex();
final Mutex _spineMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
// Subscription to head record internal changes
StreamSubscription<DHTRecordWatchChange>? _subscription;
// Notify closure for external spine head changes
@ -733,7 +733,8 @@ class _DHTLogSpine {
// LRU cache of DHT spine elements accessed recently
// Pair of position and associated shortarray segment
final Mutex _spineCacheMutex = Mutex();
final Mutex _spineCacheMutex =
Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
final List<int> _openCache;
final Map<int, DHTShortArray> _openedSegments;
static const int _openCacheSize = 3;

View file

@ -562,7 +562,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
final KeyPair? _writer;
final VeilidCrypto _crypto;
final String debugName;
final _mutex = Mutex();
final _mutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
int _openCount;
StreamController<DHTRecordWatchChange>? _watchController;
_WatchState? _watchState;

View file

@ -65,7 +65,7 @@ class OwnedDHTRecordPointer with _$OwnedDHTRecordPointer {
class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
DHTRecordPool._(Veilid veilid, VeilidRoutingContext routingContext)
: _state = const DHTRecordPoolAllocations(),
_mutex = Mutex(debugLockTimeout: 30),
_mutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null),
_recordTagLock = AsyncTagLock(),
_opened = <TypedKey, _OpenedRecordInfo>{},
_markedForDelete = <TypedKey>{},
@ -835,9 +835,11 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
openedRecordInfo.shared.unionWatchState = null;
openedRecordInfo.shared.needsWatchStateUpdate = false;
} on VeilidAPIExceptionTimeout {
log('Timeout in watch cancel for key=$openedRecordKey');
} on VeilidAPIException catch (e) {
// Failed to cancel DHT watch, try again next tick
log('Exception in watch cancel: $e');
log('Exception in watch cancel for key=$openedRecordKey: $e');
}
return;
}
@ -877,12 +879,22 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
openedRecordInfo.records, realExpiration, renewalTime);
openedRecordInfo.shared.needsWatchStateUpdate = false;
}
} on VeilidAPIExceptionTimeout {
log('Timeout in watch update for key=$openedRecordKey');
} on VeilidAPIException catch (e) {
// Failed to update DHT watch, try again next tick
log('Exception in watch update: $e');
log('Exception in watch update for key=$openedRecordKey: $e');
}
// If we still need a state update after this then do a poll instead
if (openedRecordInfo.shared.needsWatchStateUpdate) {
_pollWatch(openedRecordKey, openedRecordInfo, unionWatchState);
}
}
// In lieu of a completed watch, set off a polling operation
// on the first value of the watched range, which, due to current
// veilid limitations can only be one subkey at a time right now
void _pollWatch(TypedKey openedRecordKey, _OpenedRecordInfo openedRecordInfo,
_WatchState unionWatchState) {
singleFuture((this, _sfPollWatch, openedRecordKey), () async {
@ -942,18 +954,11 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
final unionWatchState =
_collectUnionWatchState(openedRecordInfo.records);
final processed = _watchStateProcessors.updateState(
_watchStateProcessors.updateState(
openedRecordKey,
unionWatchState,
(newState) =>
_watchStateChange(openedRecordKey, unionWatchState));
// In lieu of a completed watch, set off a polling operation
// on the first value of the watched range, which, due to current
// veilid limitations can only be one subkey at a time right now
if (!processed && unionWatchState != null) {
_pollWatch(openedRecordKey, openedRecordInfo, unionWatchState);
}
}
}
});

View file

@ -289,10 +289,10 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray> {
// Openable
int _openCount;
final _mutex = Mutex();
final _mutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex = Mutex();
final Mutex _listenMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
// Stream of external changes
StreamController<void>? _watchController;
}

View file

@ -518,7 +518,7 @@ class _DHTShortArrayHead {
////////////////////////////////////////////////////////////////////////////
// Head/element mutex to ensure we keep the representation valid
final Mutex _headMutex = Mutex();
final Mutex _headMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
// Subscription to head record internal changes
StreamSubscription<DHTRecordWatchChange>? _subscription;
// Notify closure for external head changes

View file

@ -4,6 +4,7 @@ import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:meta/meta.dart';
import 'config.dart';
import 'table_db.dart';
abstract class AsyncTableDBBackedCubit<T> extends Cubit<AsyncValue<T?>>
@ -45,5 +46,5 @@ abstract class AsyncTableDBBackedCubit<T> extends Cubit<AsyncValue<T?>>
}
final WaitSet<void, void> _initWait = WaitSet();
final Mutex _mutex = Mutex();
final Mutex _mutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
}

View file

@ -5,10 +5,10 @@ import 'package:path_provider/path_provider.dart';
import 'package:veilid/veilid.dart';
// ignore: do_not_use_environment
const bool _kReleaseMode = bool.fromEnvironment('dart.vm.product');
const bool kIsReleaseMode = bool.fromEnvironment('dart.vm.product');
// ignore: do_not_use_environment
const bool _kProfileMode = bool.fromEnvironment('dart.vm.profile');
const bool _kDebugMode = !_kReleaseMode && !_kProfileMode;
const bool kIsProfileMode = bool.fromEnvironment('dart.vm.profile');
const bool kIsDebugMode = !kIsReleaseMode && !kIsProfileMode;
Future<Map<String, dynamic>> getDefaultVeilidPlatformConfig(
bool isWeb, String appName) async {
@ -34,7 +34,7 @@ Future<Map<String, dynamic>> getDefaultVeilidPlatformConfig(
logging: VeilidWASMConfigLogging(
performance: VeilidWASMConfigLoggingPerformance(
enabled: true,
level: _kDebugMode
level: kIsDebugMode
? VeilidConfigLogLevel.debug
: VeilidConfigLogLevel.info,
logsInTimings: true,
@ -50,8 +50,8 @@ Future<Map<String, dynamic>> getDefaultVeilidPlatformConfig(
logging: VeilidFFIConfigLogging(
terminal: VeilidFFIConfigLoggingTerminal(
enabled:
_kDebugMode && (Platform.isIOS || Platform.isAndroid),
level: _kDebugMode
kIsDebugMode && (Platform.isIOS || Platform.isAndroid),
level: kIsDebugMode
? VeilidConfigLogLevel.debug
: VeilidConfigLogLevel.info,
ignoreLogTargets: ignoreLogTargets),

View file

@ -5,6 +5,7 @@ import 'package:async_tools/async_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:protobuf/protobuf.dart';
import 'config.dart';
import 'table_db.dart';
class PersistentQueue<T extends GeneratedMessage>
@ -203,7 +204,7 @@ class PersistentQueue<T extends GeneratedMessage>
final T Function(Uint8List) _fromBuffer;
final bool _deleteOnClose;
final WaitSet<void, void> _initWait = WaitSet();
final Mutex _queueMutex = Mutex();
final Mutex _queueMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
IList<T> _queue = IList<T>.empty();
final StreamController<Iterable<T>> _syncAddController = StreamController();
final StreamController<void> _queueReady = StreamController();

View file

@ -614,7 +614,7 @@ class _TableDBArrayBase {
var _initDone = false;
final VeilidCrypto _crypto;
final WaitSet<void, void> _initWait = WaitSet();
final Mutex _mutex = Mutex();
final Mutex _mutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
// Change tracking
int _headDelta = 0;