waiting invitation state machine

This commit is contained in:
Christien Rioux 2025-05-22 19:58:32 -04:00
parent fde8ff2f35
commit 5f68539738
12 changed files with 144 additions and 121 deletions

View file

@ -214,7 +214,8 @@
"waiting_invitation": {
"accepted": "Contact invitation accepted from {name}",
"rejected": "Contact invitation was rejected",
"invalid": "Contact invitation was not valid"
"invalid": "Contact invitation was not valid",
"init_failed": "Contact initialization failed"
},
"paste_invitation_dialog": {
"title": "Paste Contact Invite",

View file

@ -40,6 +40,19 @@ class WaitingInvitationStateInvalidSignature
final WaitingInvitationStateGlobal global;
}
/// State of WaitingInvitationCubit:
/// Failed to initialize
class WaitingInvitationStateInitFailed
with StateMachineEndState<WaitingInvitationState>
implements WaitingInvitationState {
const WaitingInvitationStateInitFailed(
{required this.global, required this.exception});
@override
final WaitingInvitationStateGlobal global;
final Exception exception;
}
/// State of WaitingInvitationCubit:
/// Finished normally with an invitation status
class WaitingInvitationStateInvitationStatus
@ -70,8 +83,8 @@ class WaitingInvitationStateWaitForContactResponse
ContactRequestInboxCubit> implements WaitingInvitationState {
WaitingInvitationStateWaitForContactResponse(super.create,
{required this.global})
: super(onState: (_, state) async {
final signedContactResponse = state.asData?.value;
: super(onState: (ctx) async {
final signedContactResponse = ctx.state.asData?.value;
if (signedContactResponse == null) {
return null;
}
@ -100,8 +113,8 @@ class WaitingInvitationStateWaitForContactSuperIdentity
WaitingInvitationStateWaitForContactSuperIdentity(super.create,
{required this.global,
required proto.SignedContactResponse signedContactResponse})
: super(onState: (_, state) async {
final contactSuperIdentity = state.asData?.value;
: super(onState: (ctx) async {
final contactSuperIdentity = ctx.state.asData?.value;
if (contactSuperIdentity == null) {
return null;
}
@ -162,31 +175,39 @@ class WaitingInvitationStateWaitForConversation extends AsyncCubitReactorState<
{required this.global,
required TypedKey remoteConversationRecordKey,
required SuperIdentity contactSuperIdentity})
: super(onState: (conversation, state) async {
final remoteConversation = state.asData?.value.remoteConversation;
if (remoteConversation == null) {
: super(onState: (ctx) async {
final remoteConversation = ctx.state.asData?.value.remoteConversation;
final localConversation = ctx.state.asData?.value.localConversation;
if (remoteConversation == null || localConversation != null) {
return null;
}
// Stop reacting to the conversation cubit
ctx.stop();
// Complete the local conversation now that we have the remote profile
final remoteProfile = remoteConversation.profile;
final localConversationRecordKey = global
.contactInvitationRecord.localConversationRecordKey
.toVeilid();
return conversation.initLocalConversation(
profile: global.accountRecordCubit.state.asData!.value.profile,
existingConversationRecordKey: localConversationRecordKey,
callback: (localConversation) =>
WaitingInvitationStateInvitationStatus(
global: global,
status: InvitationStatus(
acceptedContact: AcceptedContact(
remoteProfile: remoteProfile,
remoteIdentity: contactSuperIdentity,
remoteConversationRecordKey:
remoteConversationRecordKey,
localConversationRecordKey:
localConversationRecordKey))));
try {
await ctx.cubit.initLocalConversation(
profile: global.accountRecordCubit.state.asData!.value.profile,
existingConversationRecordKey: localConversationRecordKey);
} on Exception catch (e) {
return WaitingInvitationStateInitFailed(
global: global, exception: e);
}
return WaitingInvitationStateInvitationStatus(
global: global,
status: InvitationStatus(
acceptedContact: AcceptedContact(
remoteProfile: remoteProfile,
remoteIdentity: contactSuperIdentity,
remoteConversationRecordKey: remoteConversationRecordKey,
localConversationRecordKey: localConversationRecordKey)));
});
@override

View file

@ -65,16 +65,25 @@ class WaitingInvitationsBlocMapCubit extends BlocMapCubit<TypedKey,
switch (entry.value) {
case WaitingInvitationStateInvalidSignature():
// Signature was invalid, throw an error and treat like a rejection
// Signature was invalid, display an error and treat like a rejection
await _contactInvitationListCubit.deleteInvitation(
accepted: false,
contactRequestInboxRecordKey: contactRequestInboxRecordKey);
// Notify about rejection
// Notify about error state
_notificationsCubit.error(
text: translate(
'waiting_invitation.invalid',
));
text: translate('waiting_invitation.invalid'));
case final WaitingInvitationStateInitFailed st:
// Initialization error, display an error and treat like a rejection
await _contactInvitationListCubit.deleteInvitation(
accepted: false,
contactRequestInboxRecordKey: contactRequestInboxRecordKey);
// Notify about error state
_notificationsCubit.error(
text: '${translate('waiting_invitation.init_failed')}\n'
'${st.exception}');
case final WaitingInvitationStateInvitationStatus st:
final invStatus = st.status;

View file

@ -47,36 +47,35 @@ class ValidContactInvitation {
accountInfo: _accountInfo,
remoteIdentityPublicKey:
_contactSuperIdentity.currentInstance.typedPublicKey);
return conversation.initLocalConversation(
profile: profile,
callback: (localConversation) async {
final contactResponse = proto.ContactResponse()
..accept = true
..remoteConversationRecordKey = localConversation.key.toProto()
..superIdentityRecordKey =
_accountInfo.superIdentityRecordKey.toProto();
final contactResponseBytes = contactResponse.writeToBuffer();
final localConversationRecordKey =
await conversation.initLocalConversation(profile: profile);
final cs = await _accountInfo.identityCryptoSystem;
final identitySignature = await cs.signWithKeyPair(
_accountInfo.identityWriter, contactResponseBytes);
final contactResponse = proto.ContactResponse()
..accept = true
..remoteConversationRecordKey = localConversationRecordKey.toProto()
..superIdentityRecordKey =
_accountInfo.superIdentityRecordKey.toProto();
final contactResponseBytes = contactResponse.writeToBuffer();
final signedContactResponse = proto.SignedContactResponse()
..contactResponse = contactResponseBytes
..identitySignature = identitySignature.toProto();
final cs = await _accountInfo.identityCryptoSystem;
final identitySignature = await cs.signWithKeyPair(
_accountInfo.identityWriter, contactResponseBytes);
// Write the acceptance to the inbox
await contactRequestInbox
.eventualWriteProtobuf(signedContactResponse, subkey: 1);
final signedContactResponse = proto.SignedContactResponse()
..contactResponse = contactResponseBytes
..identitySignature = identitySignature.toProto();
return AcceptedContact(
remoteProfile: _contactRequestPrivate.profile,
remoteIdentity: _contactSuperIdentity,
remoteConversationRecordKey:
_contactRequestPrivate.chatRecordKey.toVeilid(),
localConversationRecordKey: localConversation.key,
);
});
// Write the acceptance to the inbox
await contactRequestInbox.eventualWriteProtobuf(signedContactResponse,
subkey: 1);
return AcceptedContact(
remoteProfile: _contactRequestPrivate.profile,
remoteIdentity: _contactSuperIdentity,
remoteConversationRecordKey:
_contactRequestPrivate.chatRecordKey.toVeilid(),
localConversationRecordKey: localConversationRecordKey,
);
});
} on Exception catch (e) {
log.debug('exception: $e', e);

View file

@ -27,6 +27,12 @@ class ConversationState extends Equatable {
@override
List<Object?> get props => [localConversation, remoteConversation];
@override
String toString() => 'ConversationState('
'localConversation: ${DynamicDebug.toDebug(localConversation)}, '
'remoteConversation: ${DynamicDebug.toDebug(remoteConversation)}'
')';
}
/// Represents the control channel between two contacts
@ -39,13 +45,11 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
TypedKey? localConversationRecordKey,
TypedKey? remoteConversationRecordKey})
: _accountInfo = accountInfo,
_localConversationRecordKey = localConversationRecordKey,
_remoteIdentityPublicKey = remoteIdentityPublicKey,
_remoteConversationRecordKey = remoteConversationRecordKey,
super(const AsyncValue.loading()) {
_identityWriter = _accountInfo.identityWriter;
if (_localConversationRecordKey != null) {
if (localConversationRecordKey != null) {
_initWait.add((_) async {
await _setLocalConversation(() async {
// Open local record key if it is specified
@ -54,7 +58,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
final writer = _identityWriter;
final record = await pool.openRecordWrite(
_localConversationRecordKey!, writer,
localConversationRecordKey, writer,
debugName: 'ConversationCubit::LocalConversation',
parent: accountInfo.accountRecordKey,
crypto: crypto);
@ -64,17 +68,17 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
});
}
if (_remoteConversationRecordKey != null) {
if (remoteConversationRecordKey != null) {
_initWait.add((cancel) async {
await _setRemoteConversation(() async {
// Open remote record key if it is specified
final pool = DHTRecordPool.instance;
final crypto = await _cachedConversationCrypto();
final record = await pool.openRecordRead(_remoteConversationRecordKey,
final record = await pool.openRecordRead(remoteConversationRecordKey,
debugName: 'ConversationCubit::RemoteConversation',
parent:
await pool.getParentRecordKey(_remoteConversationRecordKey) ??
await pool.getParentRecordKey(remoteConversationRecordKey) ??
accountInfo.accountRecordKey,
crypto: crypto);
@ -104,13 +108,11 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
/// incomplete 'existingConversationRecord' that we need to fill
/// in now that we have the remote identity key
/// The ConversationCubit must not already have a local conversation
/// The callback allows for more initialization to occur and for
/// cleanup to delete records upon failure of the callback
Future<T> initLocalConversation<T>(
/// Returns the local conversation record key that was initialized
Future<TypedKey> initLocalConversation(
{required proto.Profile profile,
required FutureOr<T> Function(DHTRecord) callback,
TypedKey? existingConversationRecordKey}) async {
assert(_localConversationRecordKey == null,
assert(_localConversationCubit == null,
'must not have a local conversation yet');
final pool = DHTRecordPool.instance;
@ -138,36 +140,32 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
schema: DHTSchema.smpl(
oCnt: 0, members: [DHTSchemaMember(mKey: writer.key, mCnt: 1)]));
}
final out = localConversationRecord
.deleteScope((localConversation) => _initLocalMessages(
localConversationKey: localConversation.key,
callback: (messages) async {
// Create initial local conversation key contents
final conversation = proto.Conversation()
..profile = profile
..superIdentityJson =
jsonEncode(_accountInfo.localAccount.superIdentity.toJson())
..messages = messages.recordKey.toProto();
await localConversationRecord.deleteScope((localConversation) async {
await _initLocalMessages(
localConversationKey: localConversation.key,
callback: (messages) async {
// Create initial local conversation key contents
final conversation = proto.Conversation()
..profile = profile
..superIdentityJson =
jsonEncode(_accountInfo.localAccount.superIdentity.toJson())
..messages = messages.recordKey.toProto();
// Write initial conversation to record
final update = await localConversation.tryWriteProtobuf(
proto.Conversation.fromBuffer, conversation);
if (update != null) {
throw Exception('Failed to write local conversation');
}
final out = await callback(localConversation);
// Write initial conversation to record
final update = await localConversation.tryWriteProtobuf(
proto.Conversation.fromBuffer, conversation);
if (update != null) {
throw Exception('Failed to write local conversation');
}
// Upon success emit the local conversation record to the state
_updateLocalConversationState(AsyncValue.data(conversation));
// If success, save the new local conversation
// record key in this object
localConversation.ref();
await _setLocalConversation(() async => localConversation);
});
});
return out;
}));
// If success, save the new local conversation record key in this object
_localConversationRecordKey = localConversationRecord.key;
await _setLocalConversation(() async => localConversationRecord);
return out;
return localConversationRecord.key;
}
/// Force refresh of conversation keys
@ -222,12 +220,6 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
_incrementalState = ConversationState(
localConversation: conv,
remoteConversation: _incrementalState.remoteConversation);
// return loading still if state isn't complete
if (_localConversationRecordKey != null &&
_incrementalState.localConversation == null) {
return const AsyncValue<ConversationState>.loading();
}
// local state is complete, all remote state is emitted incrementally
return AsyncValue.data(_incrementalState);
},
loading: AsyncValue<ConversationState>.loading,
@ -242,12 +234,6 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
_incrementalState = ConversationState(
localConversation: _incrementalState.localConversation,
remoteConversation: conv);
// return loading still if the local state isn't complete
if (_localConversationRecordKey != null &&
_incrementalState.localConversation == null) {
return const AsyncValue<ConversationState>.loading();
}
// local state is complete, all remote state is emitted incrementally
return AsyncValue.data(_incrementalState);
},
loading: AsyncValue<ConversationState>.loading,
@ -259,9 +245,12 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
// Open local converation key
Future<void> _setLocalConversation(Future<DHTRecord> Function() open) async {
assert(_localConversationCubit == null,
'shoud not set local conversation twice');
'should not set local conversation twice');
_localConversationCubit = DefaultDHTRecordCubit(
open: open, decodeState: proto.Conversation.fromBuffer);
await _localConversationCubit!.ready();
_localSubscription =
_localConversationCubit!.stream.listen(_updateLocalConversationState);
_updateLocalConversationState(_localConversationCubit!.state);
@ -270,9 +259,12 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
// Open remote converation key
Future<void> _setRemoteConversation(Future<DHTRecord> Function() open) async {
assert(_remoteConversationCubit == null,
'shoud not set remote conversation twice');
'should not set remote conversation twice');
_remoteConversationCubit = DefaultDHTRecordCubit(
open: open, decodeState: proto.Conversation.fromBuffer);
await _remoteConversationCubit!.ready();
_remoteSubscription =
_remoteConversationCubit!.stream.listen(_updateRemoteConversationState);
_updateRemoteConversationState(_remoteConversationCubit!.state);
@ -312,8 +304,6 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
final AccountInfo _accountInfo;
late final KeyPair _identityWriter;
final TypedKey _remoteIdentityPublicKey;
TypedKey? _localConversationRecordKey;
final TypedKey? _remoteConversationRecordKey;
DefaultDHTRecordCubit<proto.Conversation>? _localConversationCubit;
DefaultDHTRecordCubit<proto.Conversation>? _remoteConversationCubit;
StreamSubscription<AsyncValue<proto.Conversation>>? _localSubscription;

View file

@ -17,6 +17,7 @@ const Map<String, LogLevel> _blocChangeLogLevels = {
'PreferencesCubit': LogLevel.debug,
'ConversationCubit': LogLevel.debug,
'DefaultDHTRecordCubit<Conversation>': LogLevel.debug,
'WaitingInvitationCubit': LogLevel.debug,
};
const Map<String, LogLevel> _blocCreateCloseLogLevels = {};

View file

@ -613,7 +613,7 @@ class _DHTLogSpine {
// xxx: Don't watch for local changes because this class already handles
// xxx: notifying listeners and knows when it makes local changes
_subscription ??=
await _spineRecord.listen(localChanges: true, _onSpineChanged);
await _spineRecord.listen(localChanges: false, _onSpineChanged);
await _spineRecord.watch(subkeys: [ValueSubkeyRange.single(0)]);
} on Exception {
// If anything fails, try to cancel the watches

View file

@ -87,6 +87,10 @@ abstract class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
await super.close();
}
Future<void> ready() async {
await initWait();
}
Future<void> refresh(List<ValueSubkeyRange> subkeys) async {
await initWait();

View file

@ -36,10 +36,9 @@ packages:
async_tools:
dependency: "direct main"
description:
name: async_tools
sha256: afd5426e76631172f8ce6a6359b264b092fa9d2a52cd2528100115be9525e067
url: "https://pub.dev"
source: hosted
path: "../../../dart_async_tools"
relative: true
source: path
version: "0.1.9"
bloc:
dependency: "direct main"

View file

@ -29,8 +29,8 @@ dependencies:
path: ../../../veilid/veilid-flutter
dependency_overrides:
# async_tools:
# path: ../../../dart_async_tools
async_tools:
path: ../../../dart_async_tools
bloc_advanced_tools:
path: ../../../bloc_advanced_tools

View file

@ -92,10 +92,9 @@ packages:
async_tools:
dependency: "direct main"
description:
name: async_tools
sha256: afd5426e76631172f8ce6a6359b264b092fa9d2a52cd2528100115be9525e067
url: "https://pub.dev"
source: hosted
path: "../dart_async_tools"
relative: true
source: path
version: "0.1.9"
auto_size_text:
dependency: "direct main"

View file

@ -109,8 +109,8 @@ dependencies:
zxing2: ^0.2.3
dependency_overrides:
# async_tools:
# path: ../dart_async_tools
async_tools:
path: ../dart_async_tools
bloc_advanced_tools:
path: ../bloc_advanced_tools
# searchable_listview: