mirror of
https://gitlab.com/veilid/veilidchat.git
synced 2025-08-02 11:16:10 -04:00
checkpoint
This commit is contained in:
parent
56d65442f4
commit
751022e743
26 changed files with 482 additions and 303 deletions
|
@ -9,12 +9,15 @@ import 'package:async_tools/async_tools.dart';
|
|||
import 'package:equatable/equatable.dart';
|
||||
import 'package:flutter_bloc/flutter_bloc.dart';
|
||||
import 'package:meta/meta.dart';
|
||||
import 'package:protobuf/protobuf.dart';
|
||||
import 'package:veilid_support/veilid_support.dart';
|
||||
|
||||
import '../../account_manager/account_manager.dart';
|
||||
import '../../proto/proto.dart' as proto;
|
||||
import '../../tools/tools.dart';
|
||||
|
||||
const _sfUpdateAccountChange = 'updateAccountChange';
|
||||
|
||||
@immutable
|
||||
class ConversationState extends Equatable {
|
||||
const ConversationState(
|
||||
|
@ -27,6 +30,9 @@ class ConversationState extends Equatable {
|
|||
List<Object?> get props => [localConversation, remoteConversation];
|
||||
}
|
||||
|
||||
/// Represents the control channel between two contacts
|
||||
/// Used to pass profile, identity and status changes, and the messages key for
|
||||
/// 1-1 chats
|
||||
class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
|
||||
ConversationCubit(
|
||||
{required UnlockedAccountInfo activeAccountInfo,
|
||||
|
@ -53,6 +59,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
|
|||
debugName: 'ConversationCubit::LocalConversation',
|
||||
parent: accountRecordKey,
|
||||
crypto: crypto);
|
||||
|
||||
return record;
|
||||
});
|
||||
});
|
||||
|
@ -80,6 +87,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
|
|||
@override
|
||||
Future<void> close() async {
|
||||
await _initWait();
|
||||
await _accountSubscription?.cancel();
|
||||
await _localSubscription?.cancel();
|
||||
await _remoteSubscription?.cancel();
|
||||
await _localConversationCubit?.close();
|
||||
|
@ -88,127 +96,16 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
|
|||
await super.close();
|
||||
}
|
||||
|
||||
void _updateLocalConversationState(AsyncValue<proto.Conversation> avconv) {
|
||||
final newState = avconv.when(
|
||||
data: (conv) {
|
||||
_incrementalState = ConversationState(
|
||||
localConversation: conv,
|
||||
remoteConversation: _incrementalState.remoteConversation);
|
||||
// return loading still if state isn't complete
|
||||
if ((_localConversationRecordKey != null &&
|
||||
_incrementalState.localConversation == null) ||
|
||||
(_remoteConversationRecordKey != null &&
|
||||
_incrementalState.remoteConversation == null)) {
|
||||
return const AsyncValue<ConversationState>.loading();
|
||||
}
|
||||
// state is complete, all required keys are open
|
||||
return AsyncValue.data(_incrementalState);
|
||||
},
|
||||
loading: AsyncValue<ConversationState>.loading,
|
||||
error: AsyncValue<ConversationState>.error,
|
||||
);
|
||||
emit(newState);
|
||||
}
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Public Interface
|
||||
|
||||
void _updateRemoteConversationState(AsyncValue<proto.Conversation> avconv) {
|
||||
final newState = avconv.when(
|
||||
data: (conv) {
|
||||
_incrementalState = ConversationState(
|
||||
localConversation: _incrementalState.localConversation,
|
||||
remoteConversation: conv);
|
||||
// return loading still if state isn't complete
|
||||
if ((_localConversationRecordKey != null &&
|
||||
_incrementalState.localConversation == null) ||
|
||||
(_remoteConversationRecordKey != null &&
|
||||
_incrementalState.remoteConversation == null)) {
|
||||
return const AsyncValue<ConversationState>.loading();
|
||||
}
|
||||
// state is complete, all required keys are open
|
||||
return AsyncValue.data(_incrementalState);
|
||||
},
|
||||
loading: AsyncValue<ConversationState>.loading,
|
||||
error: AsyncValue<ConversationState>.error,
|
||||
);
|
||||
emit(newState);
|
||||
}
|
||||
|
||||
// Open local converation key
|
||||
Future<void> _setLocalConversation(Future<DHTRecord> Function() open) async {
|
||||
assert(_localConversationCubit == null,
|
||||
'shoud not set local conversation twice');
|
||||
_localConversationCubit = DefaultDHTRecordCubit(
|
||||
open: open, decodeState: proto.Conversation.fromBuffer);
|
||||
_localSubscription =
|
||||
_localConversationCubit!.stream.listen(_updateLocalConversationState);
|
||||
}
|
||||
|
||||
// Open remote converation key
|
||||
Future<void> _setRemoteConversation(Future<DHTRecord> Function() open) async {
|
||||
assert(_remoteConversationCubit == null,
|
||||
'shoud not set remote conversation twice');
|
||||
_remoteConversationCubit = DefaultDHTRecordCubit(
|
||||
open: open, decodeState: proto.Conversation.fromBuffer);
|
||||
_remoteSubscription =
|
||||
_remoteConversationCubit!.stream.listen(_updateRemoteConversationState);
|
||||
}
|
||||
|
||||
Future<bool> delete() async {
|
||||
final pool = DHTRecordPool.instance;
|
||||
|
||||
await _initWait();
|
||||
final localConversationCubit = _localConversationCubit;
|
||||
final remoteConversationCubit = _remoteConversationCubit;
|
||||
|
||||
final deleteSet = DelayedWaitSet<void>();
|
||||
|
||||
if (localConversationCubit != null) {
|
||||
final data = localConversationCubit.state.asData;
|
||||
if (data == null) {
|
||||
log.warning('could not delete local conversation');
|
||||
return false;
|
||||
}
|
||||
|
||||
deleteSet.add(() async {
|
||||
_localConversationCubit = null;
|
||||
await localConversationCubit.close();
|
||||
final conversation = data.value;
|
||||
final messagesKey = conversation.messages.toVeilid();
|
||||
await pool.deleteRecord(messagesKey);
|
||||
await pool.deleteRecord(_localConversationRecordKey!);
|
||||
_localConversationRecordKey = null;
|
||||
});
|
||||
}
|
||||
|
||||
if (remoteConversationCubit != null) {
|
||||
final data = remoteConversationCubit.state.asData;
|
||||
if (data == null) {
|
||||
log.warning('could not delete remote conversation');
|
||||
return false;
|
||||
}
|
||||
|
||||
deleteSet.add(() async {
|
||||
_remoteConversationCubit = null;
|
||||
await remoteConversationCubit.close();
|
||||
final conversation = data.value;
|
||||
final messagesKey = conversation.messages.toVeilid();
|
||||
await pool.deleteRecord(messagesKey);
|
||||
await pool.deleteRecord(_remoteConversationRecordKey!);
|
||||
});
|
||||
}
|
||||
|
||||
// Commit the delete futures
|
||||
await deleteSet();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Initialize a local conversation
|
||||
// If we were the initiator of the conversation there may be an
|
||||
// incomplete 'existingConversationRecord' that we need to fill
|
||||
// in now that we have the remote identity key
|
||||
// The ConversationCubit must not already have a local conversation
|
||||
// The callback allows for more initialization to occur and for
|
||||
// cleanup to delete records upon failure of the callback
|
||||
/// Initialize a local conversation
|
||||
/// If we were the initiator of the conversation there may be an
|
||||
/// incomplete 'existingConversationRecord' that we need to fill
|
||||
/// in now that we have the remote identity key
|
||||
/// The ConversationCubit must not already have a local conversation
|
||||
/// The callback allows for more initialization to occur and for
|
||||
/// cleanup to delete records upon failure of the callback
|
||||
Future<T> initLocalConversation<T>(
|
||||
{required proto.Profile profile,
|
||||
required FutureOr<T> Function(DHTRecord) callback,
|
||||
|
@ -280,6 +177,167 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
|
|||
return out;
|
||||
}
|
||||
|
||||
/// Delete the conversation keys associated with this conversation
|
||||
Future<bool> delete() async {
|
||||
final pool = DHTRecordPool.instance;
|
||||
|
||||
await _initWait();
|
||||
final localConversationCubit = _localConversationCubit;
|
||||
final remoteConversationCubit = _remoteConversationCubit;
|
||||
|
||||
final deleteSet = DelayedWaitSet<void>();
|
||||
|
||||
if (localConversationCubit != null) {
|
||||
final data = localConversationCubit.state.asData;
|
||||
if (data == null) {
|
||||
log.warning('could not delete local conversation');
|
||||
return false;
|
||||
}
|
||||
|
||||
deleteSet.add(() async {
|
||||
_localConversationCubit = null;
|
||||
await localConversationCubit.close();
|
||||
final conversation = data.value;
|
||||
final messagesKey = conversation.messages.toVeilid();
|
||||
await pool.deleteRecord(messagesKey);
|
||||
await pool.deleteRecord(_localConversationRecordKey!);
|
||||
_localConversationRecordKey = null;
|
||||
});
|
||||
}
|
||||
|
||||
if (remoteConversationCubit != null) {
|
||||
final data = remoteConversationCubit.state.asData;
|
||||
if (data == null) {
|
||||
log.warning('could not delete remote conversation');
|
||||
return false;
|
||||
}
|
||||
|
||||
deleteSet.add(() async {
|
||||
_remoteConversationCubit = null;
|
||||
await remoteConversationCubit.close();
|
||||
final conversation = data.value;
|
||||
final messagesKey = conversation.messages.toVeilid();
|
||||
await pool.deleteRecord(messagesKey);
|
||||
await pool.deleteRecord(_remoteConversationRecordKey!);
|
||||
});
|
||||
}
|
||||
|
||||
// Commit the delete futures
|
||||
await deleteSet();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Force refresh of conversation keys
|
||||
Future<void> refresh() async {
|
||||
await _initWait();
|
||||
|
||||
final lcc = _localConversationCubit;
|
||||
final rcc = _remoteConversationCubit;
|
||||
|
||||
if (lcc != null) {
|
||||
await lcc.refreshDefault();
|
||||
}
|
||||
if (rcc != null) {
|
||||
await rcc.refreshDefault();
|
||||
}
|
||||
}
|
||||
|
||||
/// Watch for account record changes and update the conversation
|
||||
void watchAccountChanges(Stream<AsyncValue<proto.Account>> accountStream,
|
||||
AsyncValue<proto.Account> currentState) {
|
||||
assert(_accountSubscription == null, 'only watch account once');
|
||||
_accountSubscription = accountStream.listen(_updateAccountChange);
|
||||
_updateAccountChange(currentState);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Private Implementation
|
||||
|
||||
void _updateAccountChange(AsyncValue<proto.Account> avaccount) {
|
||||
final account = avaccount.asData?.value;
|
||||
if (account == null) {
|
||||
return;
|
||||
}
|
||||
final cubit = _localConversationCubit;
|
||||
if (cubit == null) {
|
||||
return;
|
||||
}
|
||||
serialFuture((this, _sfUpdateAccountChange), () async {
|
||||
await cubit.record.eventualUpdateProtobuf(proto.Conversation.fromBuffer,
|
||||
(old) async {
|
||||
if (old == null || old.profile == account.profile) {
|
||||
return null;
|
||||
}
|
||||
return old.deepCopy()..profile = account.profile;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void _updateLocalConversationState(AsyncValue<proto.Conversation> avconv) {
|
||||
final newState = avconv.when(
|
||||
data: (conv) {
|
||||
_incrementalState = ConversationState(
|
||||
localConversation: conv,
|
||||
remoteConversation: _incrementalState.remoteConversation);
|
||||
// return loading still if state isn't complete
|
||||
if ((_localConversationRecordKey != null &&
|
||||
_incrementalState.localConversation == null) ||
|
||||
(_remoteConversationRecordKey != null &&
|
||||
_incrementalState.remoteConversation == null)) {
|
||||
return const AsyncValue<ConversationState>.loading();
|
||||
}
|
||||
// state is complete, all required keys are open
|
||||
return AsyncValue.data(_incrementalState);
|
||||
},
|
||||
loading: AsyncValue<ConversationState>.loading,
|
||||
error: AsyncValue<ConversationState>.error,
|
||||
);
|
||||
emit(newState);
|
||||
}
|
||||
|
||||
void _updateRemoteConversationState(AsyncValue<proto.Conversation> avconv) {
|
||||
final newState = avconv.when(
|
||||
data: (conv) {
|
||||
_incrementalState = ConversationState(
|
||||
localConversation: _incrementalState.localConversation,
|
||||
remoteConversation: conv);
|
||||
// return loading still if state isn't complete
|
||||
if ((_localConversationRecordKey != null &&
|
||||
_incrementalState.localConversation == null) ||
|
||||
(_remoteConversationRecordKey != null &&
|
||||
_incrementalState.remoteConversation == null)) {
|
||||
return const AsyncValue<ConversationState>.loading();
|
||||
}
|
||||
// state is complete, all required keys are open
|
||||
return AsyncValue.data(_incrementalState);
|
||||
},
|
||||
loading: AsyncValue<ConversationState>.loading,
|
||||
error: AsyncValue<ConversationState>.error,
|
||||
);
|
||||
emit(newState);
|
||||
}
|
||||
|
||||
// Open local converation key
|
||||
Future<void> _setLocalConversation(Future<DHTRecord> Function() open) async {
|
||||
assert(_localConversationCubit == null,
|
||||
'shoud not set local conversation twice');
|
||||
_localConversationCubit = DefaultDHTRecordCubit(
|
||||
open: open, decodeState: proto.Conversation.fromBuffer);
|
||||
_localSubscription =
|
||||
_localConversationCubit!.stream.listen(_updateLocalConversationState);
|
||||
}
|
||||
|
||||
// Open remote converation key
|
||||
Future<void> _setRemoteConversation(Future<DHTRecord> Function() open) async {
|
||||
assert(_remoteConversationCubit == null,
|
||||
'shoud not set remote conversation twice');
|
||||
_remoteConversationCubit = DefaultDHTRecordCubit(
|
||||
open: open, decodeState: proto.Conversation.fromBuffer);
|
||||
_remoteSubscription =
|
||||
_remoteConversationCubit!.stream.listen(_updateRemoteConversationState);
|
||||
}
|
||||
|
||||
// Initialize local messages
|
||||
Future<T> _initLocalMessages<T>({
|
||||
required UnlockedAccountInfo activeAccountInfo,
|
||||
|
@ -299,34 +357,6 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
|
|||
.deleteScope((messages) async => await callback(messages));
|
||||
}
|
||||
|
||||
// Force refresh of conversation keys
|
||||
Future<void> refresh() async {
|
||||
await _initWait();
|
||||
|
||||
final lcc = _localConversationCubit;
|
||||
final rcc = _remoteConversationCubit;
|
||||
|
||||
if (lcc != null) {
|
||||
await lcc.refreshDefault();
|
||||
}
|
||||
if (rcc != null) {
|
||||
await rcc.refreshDefault();
|
||||
}
|
||||
}
|
||||
|
||||
Future<proto.Conversation?> writeLocalConversation({
|
||||
required proto.Conversation conversation,
|
||||
}) async {
|
||||
final update = await _localConversationCubit!.record
|
||||
.tryWriteProtobuf(proto.Conversation.fromBuffer, conversation);
|
||||
|
||||
if (update != null) {
|
||||
_updateLocalConversationState(AsyncValue.data(conversation));
|
||||
}
|
||||
|
||||
return update;
|
||||
}
|
||||
|
||||
Future<VeilidCrypto> _cachedConversationCrypto() async {
|
||||
var conversationCrypto = _conversationCrypto;
|
||||
if (conversationCrypto != null) {
|
||||
|
@ -339,6 +369,9 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
|
|||
return conversationCrypto;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Fields
|
||||
|
||||
final UnlockedAccountInfo _unlockedAccountInfo;
|
||||
final TypedKey _remoteIdentityPublicKey;
|
||||
TypedKey? _localConversationRecordKey;
|
||||
|
@ -347,9 +380,9 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
|
|||
DefaultDHTRecordCubit<proto.Conversation>? _remoteConversationCubit;
|
||||
StreamSubscription<AsyncValue<proto.Conversation>>? _localSubscription;
|
||||
StreamSubscription<AsyncValue<proto.Conversation>>? _remoteSubscription;
|
||||
StreamSubscription<AsyncValue<proto.Account>>? _accountSubscription;
|
||||
ConversationState _incrementalState = const ConversationState(
|
||||
localConversation: null, remoteConversation: null);
|
||||
//
|
||||
VeilidCrypto? _conversationCrypto;
|
||||
final WaitSet<void> _initWait = WaitSet();
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue