optimizations

This commit is contained in:
Christien Rioux 2024-04-07 16:19:45 -04:00
parent 8335e36876
commit b7f7258c70
7 changed files with 93 additions and 51 deletions

View File

@ -10,8 +10,7 @@ import '../../account_manager/account_manager.dart';
import '../../proto/proto.dart' as proto; import '../../proto/proto.dart' as proto;
class _SingleContactMessageQueueEntry { class _SingleContactMessageQueueEntry {
_SingleContactMessageQueueEntry({this.localMessages, this.remoteMessages}); _SingleContactMessageQueueEntry({this.remoteMessages});
IList<proto.Message>? localMessages;
IList<proto.Message>? remoteMessages; IList<proto.Message>? remoteMessages;
} }
@ -96,9 +95,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
parent: _localConversationRecordKey, parent: _localConversationRecordKey,
crypto: _messagesCrypto), crypto: _messagesCrypto),
decodeElement: proto.Message.fromBuffer); decodeElement: proto.Message.fromBuffer);
_localSubscription =
_localMessagesCubit!.stream.listen(_updateLocalMessagesState);
_updateLocalMessagesState(_localMessagesCubit!.state);
} }
// Open remote messages key // Open remote messages key
@ -132,18 +128,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_updateReconciledChatState(_reconciledChatMessagesCubit!.state); _updateReconciledChatState(_reconciledChatMessagesCubit!.state);
} }
// Called when the local messages list gets a change
void _updateLocalMessagesState(
BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) {
final localMessages = avmessages.state.asData?.value;
if (localMessages == null) {
return;
}
// Add local messages updates to queue to process asynchronously
_messagesUpdateQueue
.add(_SingleContactMessageQueueEntry(localMessages: localMessages));
}
// Called when the remote messages list gets a change // Called when the remote messages list gets a change
void _updateRemoteMessagesState( void _updateRemoteMessagesState(
BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) { BlocBusyState<AsyncValue<IList<proto.Message>>> avmessages) {
@ -232,12 +216,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// Merge remote and local messages into the reconciled chat log // Merge remote and local messages into the reconciled chat log
await reconciledChatMessagesCubit await reconciledChatMessagesCubit
.operateWrite((reconciledMessagesWriter) async { .operateWrite((reconciledMessagesWriter) async {
// xxx for now, keep two lists, but can probable simplify this out soon
if (entry.localMessages != null) {
await _mergeMessagesInner(
reconciledMessagesWriter: reconciledMessagesWriter,
messages: entry.localMessages!);
}
if (entry.remoteMessages != null) { if (entry.remoteMessages != null) {
await _mergeMessagesInner( await _mergeMessagesInner(
reconciledMessagesWriter: reconciledMessagesWriter, reconciledMessagesWriter: reconciledMessagesWriter,
@ -246,24 +224,12 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
}); });
} }
// Force refresh of messages
Future<void> refresh() async {
await _initWait();
final lcc = _localMessagesCubit;
final rcc = _remoteMessagesCubit;
if (lcc != null) {
await lcc.refresh();
}
if (rcc != null) {
await rcc.refresh();
}
}
Future<void> addMessage({required proto.Message message}) async { Future<void> addMessage({required proto.Message message}) async {
await _initWait(); await _initWait();
await _reconciledChatMessagesCubit!.operateWrite((writer) =>
_mergeMessagesInner(
reconciledMessagesWriter: writer, messages: [message].toIList()));
await _localMessagesCubit! await _localMessagesCubit!
.operateWrite((writer) => writer.tryAddItem(message.writeToBuffer())); .operateWrite((writer) => writer.tryAddItem(message.writeToBuffer()));
} }

View File

@ -528,6 +528,12 @@ ChatTheme makeChatTheme(ScaleScheme scale, TextTheme textTheme) =>
inputPadding: const EdgeInsets.all(9), inputPadding: const EdgeInsets.all(9),
inputTextColor: scale.primaryScale.text, inputTextColor: scale.primaryScale.text,
attachmentButtonIcon: const Icon(Icons.attach_file), attachmentButtonIcon: const Icon(Icons.attach_file),
receivedMessageBodyTextStyle: const TextStyle(
color: neutral0,
fontSize: 16,
fontWeight: FontWeight.w500,
height: 1.5,
),
); );
ThemeData radixGenerator(Brightness brightness, RadixThemeColor themeColor) { ThemeData radixGenerator(Brightness brightness, RadixThemeColor themeColor) {

View File

@ -69,12 +69,23 @@ class _DeveloperPageState extends State<DeveloperPage> {
} }
Future<void> _sendDebugCommand(String debugCommand) async { Future<void> _sendDebugCommand(String debugCommand) async {
if (debugCommand == 'pool allocations') {
DHTRecordPool.instance.debugPrintAllocations();
return;
}
if (debugCommand == 'pool opened') {
DHTRecordPool.instance.debugPrintOpened();
return;
}
if (debugCommand == 'ellet') { if (debugCommand == 'ellet') {
setState(() { setState(() {
_showEllet = !_showEllet; _showEllet = !_showEllet;
}); });
return; return;
} }
_debugOut('DEBUG >>>\n$debugCommand\n'); _debugOut('DEBUG >>>\n$debugCommand\n');
try { try {
final out = await Veilid.instance.debug(debugCommand); final out = await Veilid.instance.debug(debugCommand);

View File

@ -98,6 +98,15 @@ class OpenedRecordInfo {
..sort((a, b) => a.key.toString().compareTo(b.key.toString())); ..sort((a, b) => a.key.toString().compareTo(b.key.toString()));
return '[${r.map((x) => x.debugName).join(',')}]'; return '[${r.map((x) => x.debugName).join(',')}]';
} }
String get details {
final r = records.toList()
..sort((a, b) => a.key.toString().compareTo(b.key.toString()));
return '[${r.map((x) => "writer=${x._writer} "
"defaultSubkey=${x._defaultSubkey}").join(',')}]';
}
String get sharedDetails => shared.toString();
} }
class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> { class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
@ -768,4 +777,29 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
_inTick = false; _inTick = false;
} }
} }
void debugPrintAllocations() {
final sortedAllocations = _state.debugNames.entries.asList()
..sort((a, b) => a.key.compareTo(b.key));
log('DHTRecordPool Allocations: (count=${sortedAllocations.length})');
for (final entry in sortedAllocations) {
log(' ${entry.key}: ${entry.value}');
}
}
void debugPrintOpened() {
final sortedOpened = _opened.entries.asList()
..sort((a, b) => a.key.toString().compareTo(b.key.toString()));
log('DHTRecordPool Opened Records: (count=${sortedOpened.length})');
for (final entry in sortedOpened) {
log(' ${entry.key}: \n'
' debugNames=${entry.value.debugNames}\n'
' details=${entry.value.details}\n'
' sharedDetails=${entry.value.sharedDetails}\n');
}
}
} }

View File

@ -1,5 +1,13 @@
part of 'dht_short_array.dart'; part of 'dht_short_array.dart';
class DHTShortArrayHeadLookup {
DHTShortArrayHeadLookup(
{required this.record, required this.recordSubkey, required this.seq});
final DHTRecord record;
final int recordSubkey;
final int seq;
}
class _DHTShortArrayHead { class _DHTShortArrayHead {
_DHTShortArrayHead({required DHTRecord headRecord}) _DHTShortArrayHead({required DHTRecord headRecord})
: _headRecord = headRecord, : _headRecord = headRecord,
@ -299,16 +307,18 @@ class _DHTShortArrayHead {
); );
} }
Future<(DHTRecord, int)> lookupPosition(int pos) async { Future<DHTShortArrayHeadLookup> lookupPosition(int pos) async {
final idx = _index[pos]; final idx = _index[pos];
return lookupIndex(idx); return lookupIndex(idx);
} }
Future<(DHTRecord, int)> lookupIndex(int idx) async { Future<DHTShortArrayHeadLookup> lookupIndex(int idx) async {
final seq = idx < _seqs.length ? _seqs[idx] : 0xFFFFFFFF;
final recordNumber = idx ~/ _stride; final recordNumber = idx ~/ _stride;
final record = await _getOrCreateLinkedRecord(recordNumber); final record = await _getOrCreateLinkedRecord(recordNumber);
final recordSubkey = (idx % _stride) + ((recordNumber == 0) ? 1 : 0); final recordSubkey = (idx % _stride) + ((recordNumber == 0) ? 1 : 0);
return (record, recordSubkey); return DHTShortArrayHeadLookup(
record: record, recordSubkey: recordSubkey, seq: seq);
} }
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
@ -416,9 +426,9 @@ class _DHTShortArrayHead {
/// If a write is happening, update the network copy as well. /// If a write is happening, update the network copy as well.
Future<void> updatePositionSeq(int pos, bool write) async { Future<void> updatePositionSeq(int pos, bool write) async {
final idx = _index[pos]; final idx = _index[pos];
final (record, recordSubkey) = await lookupIndex(idx); final lookup = await lookupIndex(idx);
final report = final report = await lookup.record
await record.inspect(subkeys: [ValueSubkeyRange.single(recordSubkey)]); .inspect(subkeys: [ValueSubkeyRange.single(lookup.recordSubkey)]);
while (_localSeqs.length <= idx) { while (_localSeqs.length <= idx) {
_localSeqs.add(0xFFFFFFFF); _localSeqs.add(0xFFFFFFFF);

View File

@ -68,10 +68,11 @@ class _DHTShortArrayRead implements DHTShortArrayRead {
throw IndexError.withLength(pos, length); throw IndexError.withLength(pos, length);
} }
final (record, recordSubkey) = await _head.lookupPosition(pos); final lookup = await _head.lookupPosition(pos);
final refresh = forceRefresh || _head.positionNeedsRefresh(pos); final refresh = forceRefresh || _head.positionNeedsRefresh(pos);
final out = record.get(subkey: recordSubkey, forceRefresh: refresh); final out =
lookup.record.get(subkey: lookup.recordSubkey, forceRefresh: refresh);
await _head.updatePositionSeq(pos, false); await _head.updatePositionSeq(pos, false);
return out; return out;

View File

@ -136,6 +136,12 @@ class _DHTShortArrayWrite implements DHTShortArrayWrite {
@override @override
Future<bool> trySwapItem(int aPos, int bPos) async { Future<bool> trySwapItem(int aPos, int bPos) async {
if (aPos < 0 || aPos >= _head.length) {
throw IndexError.withLength(aPos, _head.length);
}
if (bPos < 0 || bPos >= _head.length) {
throw IndexError.withLength(bPos, _head.length);
}
// Swap indices // Swap indices
_head.swapIndex(aPos, bPos); _head.swapIndex(aPos, bPos);
@ -144,8 +150,13 @@ class _DHTShortArrayWrite implements DHTShortArrayWrite {
@override @override
Future<Uint8List> tryRemoveItem(int pos) async { Future<Uint8List> tryRemoveItem(int pos) async {
final (record, recordSubkey) = await _head.lookupPosition(pos); if (pos < 0 || pos >= _head.length) {
final result = await record.get(subkey: recordSubkey); throw IndexError.withLength(pos, _head.length);
}
final lookup = await _head.lookupPosition(pos);
final result = lookup.seq == 0xFFFFFFFF
? null
: await lookup.record.get(subkey: lookup.recordSubkey);
if (result == null) { if (result == null) {
throw StateError('Element does not exist'); throw StateError('Element does not exist');
} }
@ -164,9 +175,12 @@ class _DHTShortArrayWrite implements DHTShortArrayWrite {
if (pos < 0 || pos >= _head.length) { if (pos < 0 || pos >= _head.length) {
throw IndexError.withLength(pos, _head.length); throw IndexError.withLength(pos, _head.length);
} }
final (record, recordSubkey) = await _head.lookupPosition(pos); final lookup = await _head.lookupPosition(pos);
final oldValue = await record.get(subkey: recordSubkey); final oldValue = lookup.seq == 0xFFFFFFFF
final result = await record.tryWriteBytes(newValue, subkey: recordSubkey); ? null
: await lookup.record.get(subkey: lookup.recordSubkey);
final result = await lookup.record
.tryWriteBytes(newValue, subkey: lookup.recordSubkey);
if (result != null) { if (result != null) {
// A result coming back means the element was overwritten already // A result coming back means the element was overwritten already
return (result, false); return (result, false);