pagination work

This commit is contained in:
Christien Rioux 2024-06-03 21:20:00 -04:00
parent 4082d1dd76
commit 5473bd2ee4
11 changed files with 469 additions and 24 deletions

View file

@ -2,11 +2,13 @@ import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';
import '../../proto/proto.dart' as proto;
import '../../tools/tools.dart';
import '../models/models.dart';
import 'reconciliation/reconciliation.dart';
@ -42,7 +44,7 @@ class RenderStateElement {
bool sentOffline;
}
typedef SingleContactMessagesState = AsyncValue<IList<MessageState>>;
typedef SingleContactMessagesState = AsyncValue<MessagesState>;
// Cubit that processes single-contact chats
// Builds the reconciled chat record from the local and remote conversation keys
@ -60,6 +62,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_localMessagesRecordKey = localMessagesRecordKey,
_remoteConversationRecordKey = remoteConversationRecordKey,
_remoteMessagesRecordKey = remoteMessagesRecordKey,
_commandController = StreamController(),
super(const AsyncValue.loading()) {
// Async Init
_initWait.add(_init);
@ -69,6 +72,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
Future<void> close() async {
await _initWait();
await _commandController.close();
await _commandRunnerFut;
await _unsentMessagesQueue.close();
await _sentSubscription?.cancel();
await _rcvdSubscription?.cancel();
@ -99,6 +104,9 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// Remote messages key
await _initRcvdMessagesCubit();
// Command execution background process
_commandRunnerFut = Future.delayed(Duration.zero, _commandRunner);
}
// Make crypto
@ -191,6 +199,34 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_sendMessage(message: message);
}
// Run a chat command
void runCommand(String command) {
final (cmd, rest) = command.splitOnce(' ');
if (kDebugMode) {
if (cmd == '/repeat' && rest != null) {
final (countStr, text) = rest.splitOnce(' ');
final count = int.tryParse(countStr);
if (count != null) {
runCommandRepeat(count, text ?? '');
}
}
}
}
// Run a repeat command
void runCommandRepeat(int count, String text) {
_commandController.sink.add(() async {
for (var i = 0; i < count; i++) {
final protoMessageText = proto.Message_Text()
..text = text.replaceAll(RegExp(r'\$n\b'), i.toString());
final message = proto.Message()..text = protoMessageText;
_sendMessage(message: message);
await Future<void>.delayed(const Duration(milliseconds: 50));
}
});
}
////////////////////////////////////////////////////////////////////////////
// Internal implementation
@ -220,9 +256,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_reconciliation.reconcileMessages(
_remoteIdentityPublicKey, rcvdMessages, _rcvdMessagesCubit!);
// Update the view
_renderState();
}
// Called when the reconciled messages window gets a change
@ -296,7 +329,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
final renderedElements = <RenderStateElement>[];
for (final m in reconciledMessages.elements) {
for (final m in reconciledMessages.windowElements) {
final isLocal = m.content.author.toVeilid() ==
_activeAccountInfo.localAccount.identityMaster
.identityPublicTypedKey();
@ -316,7 +349,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
}
// Render the state
final renderedState = renderedElements
final messages = renderedElements
.map((x) => MessageState(
content: x.message,
sentTimestamp: Timestamp.fromInt64(x.message.timestamp),
@ -325,7 +358,12 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
.toIList();
// Emit the rendered state
emit(AsyncValue.data(renderedState));
emit(AsyncValue.data(MessagesState(
windowMessages: messages,
length: reconciledMessages.length,
windowTail: reconciledMessages.windowTail,
windowCount: reconciledMessages.windowCount,
follow: reconciledMessages.follow)));
}
void _sendMessage({required proto.Message message}) {
@ -344,6 +382,12 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_renderState();
}
Future<void> _commandRunner() async {
await for (final command in _commandController.stream) {
await command();
}
}
/////////////////////////////////////////////////////////////////////////
// Static utility functions
@ -383,4 +427,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
StreamSubscription<DHTLogBusyState<proto.Message>>? _rcvdSubscription;
StreamSubscription<TableDBArrayProtobufBusyState<proto.ReconciledMessage>>?
_reconciledSubscription;
final StreamController<Future<void> Function()> _commandController;
late final Future<void> _commandRunnerFut;
}