conversation cubit work

This commit is contained in:
Christien Rioux 2024-02-07 19:13:32 -05:00
parent 4fe8a07d45
commit f2caa7a0b3
2 changed files with 117 additions and 130 deletions

View file

@ -74,61 +74,69 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
await super.close(); await super.close();
} }
void updateLocalConversationState(AsyncValue<proto.Conversation> avconv) {
final newState = avconv.when(
data: (conv) {
_incrementalState = ConversationState(
localConversation: conv,
remoteConversation: _incrementalState.remoteConversation);
// return loading still if state isn't complete
if ((_localConversationRecordKey != null &&
_incrementalState.localConversation == null) ||
(_remoteConversationRecordKey != null &&
_incrementalState.remoteConversation == null)) {
return const AsyncValue<ConversationState>.loading();
}
// state is complete, all required keys are open
return AsyncValue.data(_incrementalState);
},
loading: AsyncValue<ConversationState>.loading,
error: AsyncValue<ConversationState>.error,
);
emit(newState);
}
void updateRemoteConversationState(AsyncValue<proto.Conversation> avconv) {
final newState = avconv.when(
data: (conv) {
_incrementalState = ConversationState(
localConversation: _incrementalState.localConversation,
remoteConversation: conv);
// return loading still if state isn't complete
if ((_localConversationRecordKey != null &&
_incrementalState.localConversation == null) ||
(_remoteConversationRecordKey != null &&
_incrementalState.remoteConversation == null)) {
return const AsyncValue<ConversationState>.loading();
}
// state is complete, all required keys are open
return AsyncValue.data(_incrementalState);
},
loading: AsyncValue<ConversationState>.loading,
error: AsyncValue<ConversationState>.error,
);
emit(newState);
}
// Open local converation key // Open local converation key
Future<void> _setLocalConversation(DHTRecord localConversationRecord) async { Future<void> _setLocalConversation(DHTRecord localConversationRecord) async {
assert(_localConversationCubit == null,
'shoud not set local conversation twice');
_localConversationCubit = DefaultDHTRecordCubit.value( _localConversationCubit = DefaultDHTRecordCubit.value(
record: localConversationRecord, record: localConversationRecord,
decodeState: proto.Conversation.fromBuffer); decodeState: proto.Conversation.fromBuffer);
_localConversationCubit!.stream.listen((avconv) { _localConversationCubit!.stream.listen(updateLocalConversationState);
final newState = avconv.when(
data: (conv) {
_incrementalState = ConversationState(
localConversation: conv,
remoteConversation: _incrementalState.remoteConversation);
// return loading still if state isn't complete
if ((_localConversationRecordKey != null &&
_incrementalState.localConversation == null) ||
(_remoteConversationRecordKey != null &&
_incrementalState.remoteConversation == null)) {
return const AsyncValue<ConversationState>.loading();
}
// state is complete, all required keys are open
return AsyncValue.data(_incrementalState);
},
loading: AsyncValue<ConversationState>.loading,
error: AsyncValue<ConversationState>.error,
);
emit(newState);
});
} }
// Open remote converation key // Open remote converation key
Future<void> _setRemoteConversation( Future<void> _setRemoteConversation(
DHTRecord remoteConversationRecord) async { DHTRecord remoteConversationRecord) async {
assert(_remoteConversationCubit == null,
'shoud not set remote conversation twice');
_remoteConversationCubit = DefaultDHTRecordCubit.value( _remoteConversationCubit = DefaultDHTRecordCubit.value(
record: remoteConversationRecord, record: remoteConversationRecord,
decodeState: proto.Conversation.fromBuffer); decodeState: proto.Conversation.fromBuffer);
_remoteConversationCubit!.stream.listen((avconv) { _remoteConversationCubit!.stream.listen(updateRemoteConversationState);
final newState = avconv.when(
data: (conv) {
_incrementalState = ConversationState(
localConversation: _incrementalState.localConversation,
remoteConversation: conv);
// return loading still if state isn't complete
if ((_localConversationRecordKey != null &&
_incrementalState.localConversation == null) ||
(_remoteConversationRecordKey != null &&
_incrementalState.remoteConversation == null)) {
return const AsyncValue<ConversationState>.loading();
}
// state is complete, all required keys are open
return AsyncValue.data(_incrementalState);
},
loading: AsyncValue<ConversationState>.loading,
error: AsyncValue<ConversationState>.error,
);
emit(newState);
});
} }
// Initialize a local conversation // Initialize a local conversation
@ -136,6 +144,8 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
// incomplete 'existingConversationRecord' that we need to fill // incomplete 'existingConversationRecord' that we need to fill
// in now that we have the remote identity key // in now that we have the remote identity key
// The ConversationCubit must not already have a local conversation // The ConversationCubit must not already have a local conversation
// The callback allows for more initialization to occur and for
// cleanup to delete records upon failure of the callback
Future<T> initLocalConversation<T>( Future<T> initLocalConversation<T>(
{required proto.Profile profile, {required proto.Profile profile,
required FutureOr<T> Function(DHTRecord) callback, required FutureOr<T> Function(DHTRecord) callback,
@ -176,20 +186,25 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
crypto: crypto, crypto: crypto,
smplWriter: writer)) smplWriter: writer))
.deleteScope((messages) async { .deleteScope((messages) async {
// Write local conversation key // Create initial local conversation key contents
final conversation = proto.Conversation() final conversation = proto.Conversation()
..profile = profile ..profile = profile
..identityMasterJson = jsonEncode( ..identityMasterJson = jsonEncode(
_activeAccountInfo.localAccount.identityMaster.toJson()) _activeAccountInfo.localAccount.identityMaster.toJson())
..messages = messages.record.key.toProto(); ..messages = messages.record.key.toProto();
// // Write initial conversation to record
final update = await localConversation.tryWriteProtobuf( final update = await localConversation.tryWriteProtobuf(
proto.Conversation.fromBuffer, conversation); proto.Conversation.fromBuffer, conversation);
if (update != null) { if (update != null) {
throw Exception('Failed to write local conversation'); throw Exception('Failed to write local conversation');
} }
return await callback(localConversation); final out = await callback(localConversation);
// Upon success emit the local conversation record to the state
updateLocalConversationState(AsyncValue.data(conversation));
return out;
}); });
}); });
@ -202,70 +217,29 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
// Force refresh of conversation keys // Force refresh of conversation keys
Future<void> refresh() async { Future<void> refresh() async {
if (_localConversationCubit != null) { final lcc = _localConversationCubit;
xxx use defaultdhtrecordcubit refresh mechanism final rcc = _remoteConversationCubit;
if (lcc != null) {
await lcc.refreshDefault();
}
if (rcc != null) {
await rcc.refreshDefault();
} }
} }
// Future<proto.Conversation?> readRemoteConversation() async { Future<proto.Conversation?> writeLocalConversation({
// final accountRecordKey = required proto.Conversation conversation,
// _activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey; }) async {
// final pool = DHTRecordPool.instance; final update = await _localConversationCubit!.record
.tryWriteProtobuf(proto.Conversation.fromBuffer, conversation);
// final crypto = await getConversationCrypto(); if (update != null) {
// return (await pool.openRead(_remoteConversationRecordKey!, updateLocalConversationState(AsyncValue.data(conversation));
// parent: accountRecordKey, crypto: crypto)) }
// .scope((remoteConversation) async {
// //
// final conversation =
// await remoteConversation.getProtobuf(proto.Conversation.fromBuffer);
// return conversation;
// });
// }
// Future<proto.Conversation?> readLocalConversation() async { return update;
// final accountRecordKey = }
// _activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey;
// final pool = DHTRecordPool.instance;
// final crypto = await getConversationCrypto();
// return (await pool.openRead(_localConversationRecordKey!,
// parent: accountRecordKey, crypto: crypto))
// .scope((localConversation) async {
// //
// final update =
// await localConversation.getProtobuf(proto.Conversation.fromBuffer);
// if (update != null) {
// return update;
// }
// return null;
// });
// }
// Future<proto.Conversation?> writeLocalConversation({
// required proto.Conversation conversation,
// }) async {
// final accountRecordKey =
// _activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey;
// final pool = DHTRecordPool.instance;
// final crypto = await getConversationCrypto();
// final writer = _activeAccountInfo.conversationWriter;
// return (await pool.openWrite(_localConversationRecordKey!, writer,
// parent: accountRecordKey, crypto: crypto))
// .scope((localConversation) async {
// //
// final update = await localConversation.tryWriteProtobuf(
// proto.Conversation.fromBuffer, conversation);
// if (update != null) {
// return update;
// }
// return null;
// });
// }
//
Future<DHTRecordCrypto> getConversationCrypto() async { Future<DHTRecordCrypto> getConversationCrypto() async {
var conversationCrypto = _conversationCrypto; var conversationCrypto = _conversationCrypto;
@ -286,7 +260,7 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
final ActiveAccountInfo _activeAccountInfo; final ActiveAccountInfo _activeAccountInfo;
final TypedKey _remoteIdentityPublicKey; final TypedKey _remoteIdentityPublicKey;
TypedKey? _localConversationRecordKey; TypedKey? _localConversationRecordKey;
TypedKey? _remoteConversationRecordKey; final TypedKey? _remoteConversationRecordKey;
DefaultDHTRecordCubit<proto.Conversation>? _localConversationCubit; DefaultDHTRecordCubit<proto.Conversation>? _localConversationCubit;
DefaultDHTRecordCubit<proto.Conversation>? _remoteConversationCubit; DefaultDHTRecordCubit<proto.Conversation>? _remoteConversationCubit;
ConversationState _incrementalState; ConversationState _incrementalState;

View file

@ -78,18 +78,29 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
} }
Future<void> refresh(List<ValueSubkeyRange> subkeys) async { Future<void> refresh(List<ValueSubkeyRange> subkeys) async {
var updateSubkeys = [...subkeys];
for (final skr in subkeys) { for (final skr in subkeys) {
for (var sk = skr.low; sk <= skr.high; sk++) { for (var sk = skr.low; sk <= skr.high; sk++) {
final data = await _record.get( final data = await _record.get(
subkey: sk, forceRefresh: true, onlyUpdates: true); subkey: sk, forceRefresh: true, onlyUpdates: true);
if (data != null) { if (data != null) {
final newState = await _stateFunction(_record, subkeys, data); final newState = await _stateFunction(_record, updateSubkeys, data);
xxx remove sk from update if (newState != null) {
// Emit the new state
emit(AsyncValue.data(newState));
}
return;
} }
// remove sk from update list
// if we did not get an update for that subkey
updateSubkeys = updateSubkeys.removeSubkey(sk);
} }
} }
} }
DHTRecord get record => _record;
StreamSubscription<VeilidUpdateValueChange>? _subscription; StreamSubscription<VeilidUpdateValueChange>? _subscription;
late DHTRecord _record; late DHTRecord _record;
bool _wantsCloseRecord; bool _wantsCloseRecord;
@ -113,7 +124,7 @@ class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
stateFunction: _makeStateFunction(decodeState), stateFunction: _makeStateFunction(decodeState),
); );
static Future<T?> Function(DHTRecord) _makeInitialStateFunction<T>( static InitialStateFunction<T> _makeInitialStateFunction<T>(
T Function(List<int> data) decodeState) => T Function(List<int> data) decodeState) =>
(record) async { (record) async {
final initialData = await record.get(); final initialData = await record.get();
@ -123,28 +134,30 @@ class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
return decodeState(initialData); return decodeState(initialData);
}; };
static Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, Uint8List) static StateFunction<T> _makeStateFunction<T>(
_makeStateFunction<T>(T Function(List<int> data) decodeState) => T Function(List<int> data) decodeState) =>
(record, subkeys, updatedata) async { (record, subkeys, updatedata) async {
final defaultSubkey = record.subkeyOrDefault(-1); final defaultSubkey = record.subkeyOrDefault(-1);
if (subkeys.containsSubkey(defaultSubkey)) { if (subkeys.containsSubkey(defaultSubkey)) {
final Uint8List data; final Uint8List data;
final firstSubkey = subkeys.firstOrNull!.low; final firstSubkey = subkeys.firstOrNull!.low;
if (firstSubkey != defaultSubkey) { if (firstSubkey != defaultSubkey) {
final maybeData = await record.get(forceRefresh: true); final maybeData = await record.get(forceRefresh: true);
if (maybeData == null) { if (maybeData == null) {
return null; return null;
}
data = maybeData;
} else {
data = updatedata;
}
final newState = decodeState(data);
return newState;
} }
return null; data = maybeData;
}; } else {
data = updatedata;
}
final newState = decodeState(data);
return newState;
}
return null;
};
// xxx add refresh/get mechanism to DHTRecordCubit and here too, then propagage to conversation_cubit Future<void> refreshDefault() async {
// xxx should just be a 'get' like in dht_short_array_cubit final defaultSubkey = _record.subkeyOrDefault(-1);
await refresh([ValueSubkeyRange(low: defaultSubkey, high: defaultSubkey)]);
}
} }