table db change tracking

This commit is contained in:
Christien Rioux 2024-05-27 20:34:54 -04:00
parent 9c5feed732
commit e04fd7ee77
7 changed files with 237 additions and 20 deletions

View File

@ -420,6 +420,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
void addTextMessage({required proto.Message_Text messageText}) {
final message = proto.Message()
..id = generateNextId()
..author = _activeAccountInfo.localAccount.identityMaster
.identityPublicTypedKey()
.toProto()

View File

@ -39,19 +39,9 @@ class MessageState with _$MessageState {
}
extension MessageStateExt on MessageState {
String get uniqueId {
final author = content.author.toVeilid().toString();
final id = base64UrlNoPadEncode(content.id);
return '$author|$id';
}
static (proto.TypedKey, Uint8List) splitUniqueId(String uniqueId) {
final parts = uniqueId.split('|');
if (parts.length != 2) {
throw Exception('invalid unique id');
}
final author = TypedKey.fromString(parts[0]).toProto();
final id = base64UrlNoPadDecode(parts[1]);
return (author, id);
Uint8List get uniqueId {
final author = content.author.toVeilid().decode();
final id = content.id;
return author..addAll(id);
}
}

View File

@ -126,7 +126,7 @@ class ChatComponent extends StatelessWidget {
final textMessage = types.TextMessage(
author: isLocal ? _localUser : _remoteUser,
createdAt: (message.timestamp.value ~/ BigInt.from(1000)).toInt(),
id: message.uniqueId,
id: base64UrlNoPadEncode(message.uniqueId),
text: contextText.text,
showStatus: status != null,
status: status);
@ -168,7 +168,7 @@ class ChatComponent extends StatelessWidget {
void _handleSendPressed(types.PartialText message) {
final text = message.text;
final replyId = (message.repliedMessage != null)
? MessageStateExt.splitUniqueId(message.repliedMessage!.id).$2
? base64UrlNoPadDecode(message.repliedMessage!.id)
: null;
Timestamp? expiration;
int? viewLimit;

View File

@ -124,7 +124,7 @@ message Message {
string text = 1;
// Topic of the message / Content warning
optional string topic = 2;
// Message id replied to
// Message id replied to (author id + message id)
optional bytes reply_id = 3;
// Message expiration timestamp
uint64 expiration = 4;

View File

@ -4,9 +4,24 @@ import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:charcode/charcode.dart';
import 'package:equatable/equatable.dart';
import 'package:meta/meta.dart';
import '../veilid_support.dart';
@immutable
class TableDBArrayUpdate extends Equatable {
const TableDBArrayUpdate(
{required this.headDelta, required this.tailDelta, required this.length})
: assert(length >= 0, 'should never have negative length');
final int headDelta;
final int tailDelta;
final int length;
@override
List<Object?> get props => [headDelta, tailDelta, length];
}
class TableDBArray {
TableDBArray({
required String table,
@ -63,8 +78,9 @@ class TableDBArray {
await Veilid.instance.deleteTableDB(_table);
}
Future<StreamSubscription<void>> listen(void Function() onChanged) async =>
_changeStream.stream.listen((_) => onChanged());
Future<StreamSubscription<void>> listen(
void Function(TableDBArrayUpdate) onChanged) async =>
_changeStream.stream.listen(onChanged);
////////////////////////////////////////////////////////////
// Public interface
@ -160,6 +176,7 @@ class TableDBArray {
// Put the entry in the index
final pos = _length;
_length++;
_tailDelta++;
await _setIndexEntry(pos, entry);
}
@ -167,6 +184,7 @@ class TableDBArray {
VeilidTableDBTransaction t, List<Uint8List> values) async {
var pos = _length;
_length += values.length;
_tailDelta += values.length;
for (final value in values) {
// Allocate an entry to store the value
final entry = await _allocateEntry();
@ -318,11 +336,18 @@ class TableDBArray {
final _oldLength = _length;
final _oldNextFree = _nextFree;
final _oldMaxEntry = _maxEntry;
final _oldHeadDelta = _headDelta;
final _oldTailDelta = _tailDelta;
try {
final out = await transactionScope(_tableDB, (t) async {
final out = await closure(t);
await _saveHead(t);
await _flushDirtyChunks(t);
// Send change
_changeStream.add(TableDBArrayUpdate(
headDelta: _headDelta, tailDelta: _tailDelta, length: _length));
_headDelta = 0;
_tailDelta = 0;
return out;
});
@ -332,6 +357,8 @@ class TableDBArray {
_length = _oldLength;
_nextFree = _oldNextFree;
_maxEntry = _oldMaxEntry;
_headDelta = _oldHeadDelta;
_tailDelta = _oldTailDelta;
// invalidate caches because they could have been written to
_chunkCache.clear();
_dirtyChunks.clear();
@ -428,6 +455,10 @@ class TableDBArray {
// Then add to length
_length += length;
if (start == 0) {
_headDelta += length;
}
_tailDelta += length;
}
Future<void> _removeIndexEntry(int pos) async => _removeIndexEntries(pos, 1);
@ -485,6 +516,10 @@ class TableDBArray {
// Then truncate
_length -= length;
if (start == 0) {
_headDelta -= length;
}
_tailDelta -= length;
}
Future<Uint8List> _loadIndexChunk(int chunkNumber) async {
@ -578,6 +613,10 @@ class TableDBArray {
final WaitSet<void> _initWait = WaitSet();
final Mutex _mutex = Mutex();
// Change tracking
int _headDelta = 0;
int _tailDelta = 0;
// Head state
int _length = 0;
int _nextFree = 0;
@ -587,5 +626,6 @@ class TableDBArray {
final Map<int, Uint8List> _dirtyChunks = {};
static const int _chunkCacheLength = 3;
final StreamController<void> _changeStream = StreamController.broadcast();
final StreamController<TableDBArrayUpdate> _changeStream =
StreamController.broadcast();
}

View File

@ -0,0 +1,185 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:bloc_advanced_tools/bloc_advanced_tools.dart';
import 'package:equatable/equatable.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';
import '../../../veilid_support.dart';
@immutable
class TableDBArrayStateData<T> extends Equatable {
const TableDBArrayStateData(
{required this.elements,
required this.tail,
required this.count,
required this.follow});
// The view of the elements in the dhtlog
// Span is from [tail-length, tail)
final IList<T> elements;
// One past the end of the last element
final int tail;
// The total number of elements to try to keep in 'elements'
final int count;
// If we should have the tail following the array
final bool follow;
@override
List<Object?> get props => [elements, tail, count, follow];
}
typedef TableDBArrayState<T> = AsyncValue<TableDBArrayStateData<T>>;
typedef TableDBArrayBusyState<T> = BlocBusyState<TableDBArrayState<T>>;
class TableDBArrayCubit<T> extends Cubit<TableDBArrayBusyState<T>>
with BlocBusyWrapper<TableDBArrayState<T>> {
TableDBArrayCubit({
required Future<TableDBArray> Function() open,
required T Function(List<int> data) decodeElement,
}) : _decodeElement = decodeElement,
super(const BlocBusyState(AsyncValue.loading())) {
_initWait.add(() async {
// Open table db array
_array = await open();
_wantsCloseArray = true;
// Make initial state update
await _refreshNoWait();
_subscription = await _array.listen(_update);
});
}
// Set the tail position of the array for pagination.
// If tail is 0, the end of the array is used.
// If tail is negative, the position is subtracted from the current array
// length.
// If tail is positive, the position is absolute from the head of the array
// If follow is enabled, the tail offset will update when the array changes
Future<void> setWindow(
{int? tail, int? count, bool? follow, bool forceRefresh = false}) async {
await _initWait();
if (tail != null) {
_tail = tail;
}
if (count != null) {
_count = count;
}
if (follow != null) {
_follow = follow;
}
await _refreshNoWait(forceRefresh: forceRefresh);
}
Future<void> refresh({bool forceRefresh = false}) async {
await _initWait();
await _refreshNoWait(forceRefresh: forceRefresh);
}
Future<void> _refreshNoWait({bool forceRefresh = false}) async =>
busy((emit) async => _refreshInner(emit, forceRefresh: forceRefresh));
Future<void> _refreshInner(
void Function(AsyncValue<TableDBArrayStateData<T>>) emit,
{bool forceRefresh = false}) async {
final avElements = await _loadElements(_tail, _count);
final err = avElements.asError;
if (err != null) {
emit(AsyncValue.error(err.error, err.stackTrace));
return;
}
final loading = avElements.asLoading;
if (loading != null) {
emit(const AsyncValue.loading());
return;
}
final elements = avElements.asData!.value;
emit(AsyncValue.data(TableDBArrayStateData(
elements: elements, tail: _tail, count: _count, follow: _follow)));
}
Future<AsyncValue<IList<T>>> _loadElements(
int tail,
int count,
) async {
try {
final length = _array.length;
final end = ((tail - 1) % length) + 1;
final start = (count < end) ? end - count : 0;
final allItems =
(await _array.getRange(start, end)).map(_decodeElement).toIList();
return AsyncValue.data(allItems);
} on Exception catch (e, st) {
return AsyncValue.error(e, st);
}
}
void _update(TableDBArrayUpdate upd) {
// Run at most one background update process
// Because this is async, we could get an update while we're
// still processing the last one. Only called after init future has run
// so we dont have to wait for that here.
// Accumulate head and tail deltas
_headDelta += upd.headDelta;
_tailDelta += upd.tailDelta;
_sspUpdate.busyUpdate<T, TableDBArrayState<T>>(busy, (emit) async {
// apply follow
if (_follow) {
if (_tail <= 0) {
// Negative tail is already following tail changes
} else {
// Positive tail is measured from the head, so apply deltas
_tail = (_tail + _tailDelta - _headDelta) % upd.length;
}
} else {
if (_tail <= 0) {
// Negative tail is following tail changes so apply deltas
var posTail = _tail + upd.length;
posTail = (posTail + _tailDelta - _headDelta) % upd.length;
_tail = posTail - upd.length;
} else {
// Positive tail is measured from head so not following tail
}
}
_headDelta = 0;
_tailDelta = 0;
await _refreshInner(emit);
});
}
@override
Future<void> close() async {
await _initWait();
await _subscription?.cancel();
_subscription = null;
if (_wantsCloseArray) {
await _array.close();
}
await super.close();
}
Future<R?> operate<R>(Future<R?> Function(TableDBArray) closure) async {
await _initWait();
return closure(_array);
}
final WaitSet<void> _initWait = WaitSet();
late final TableDBArray _array;
final T Function(List<int> data) _decodeElement;
StreamSubscription<void>? _subscription;
bool _wantsCloseArray = false;
final _sspUpdate = SingleStatelessProcessor();
// Accumulated deltas since last update
var _headDelta = 0;
var _tailDelta = 0;
// Cubit window into the TableDBArray
var _tail = 0;
var _count = DHTShortArray.maxElements;
var _follow = true;
}

View File

@ -15,5 +15,6 @@ export 'src/persistent_queue.dart';
export 'src/protobuf_tools.dart';
export 'src/table_db.dart';
export 'src/table_db_array.dart';
export 'src/table_db_array_cubit.dart';
export 'src/veilid_crypto.dart';
export 'src/veilid_log.dart' hide veilidLoggy;