diff --git a/lib/chat/cubits/reconciliation/author_input_queue.dart b/lib/chat/cubits/reconciliation/author_input_queue.dart
new file mode 100644
index 0000000..b441e75
--- /dev/null
+++ b/lib/chat/cubits/reconciliation/author_input_queue.dart
@@ -0,0 +1,145 @@
+import 'dart:async';
+import 'dart:collection';
+import 'dart:math';
+import 'package:fast_immutable_collections/fast_immutable_collections.dart';
+import 'package:veilid_support/veilid_support.dart';
+import '../../../proto/proto.dart' as proto;
+import 'author_input_source.dart';
+import 'output_position.dart';
+class AuthorInputQueue {
+  AuthorInputQueue({
+    required this.author,
+    required this.inputSource,
+    required this.lastOutputPosition,
+    required this.onError,
+  }):
+    assert(inputSource.messages.count>0, 'no input source window length'),
+    assert(inputSource.messages.elements.isNotEmpty, 'no input source elements'),
+    assert(inputSource.messages.tail >= inputSource.messages.elements.length, 'tail is before initial messages end'),
+    assert(inputSource.messages.tail > 0, 'tail is not greater than zero'),
+    currentPosition = inputSource.messages.tail,
+    currentWindow = inputSource.messages.elements,
+    windowLength = inputSource.messages.count,
+    windowFirst = inputSource.messages.tail - inputSource.messages.elements.length,
+    windowLast = inputSource.messages.tail - 1;
+  ////////////////////////////////////////////////////////////////////////////
+  bool get isEmpty => toReconcile.isEmpty;
+  proto.Message? get current => toReconcile.firstOrNull;
+  bool consume() {
+    toReconcile.removeFirst();
+    return toReconcile.isNotEmpty;
+  }
+  Future<bool> prepareInputQueue() async {
+    // Go through batches of the input dhtlog starting with
+    // the current cubit state which is at the tail of the log
+    // Find the last reconciled message for this author
+    outer:
+    while (true) {
+      for (var rn = currentWindow.length;
+          rn >= 0 && currentPosition >= 0;
+          rn--, currentPosition--) {
+        final elem = currentWindow[rn];
+        // If we've found an input element that is older than our last
+        // reconciled message for this author, then we stop
+        if (lastOutputPosition != null) {
+          if (elem.value.timestamp < lastOutputPosition!.message.timestamp) {
+            break outer;
+          }
+        }
+        // Drop the 'offline' elements because we don't reconcile
+        // anything until it has been confirmed to be committed to the DHT
+        if (elem.isOffline) {
+          continue;
+        }
+        // Add to head of reconciliation queue
+        toReconcile.addFirst(elem.value);
+        if (toReconcile.length > _maxQueueChunk) {
+          toReconcile.removeLast();
+        }
+      }
+      if (currentPosition < 0) {
+        break;
+      }
+    xxx update window here and make this and other methods work
+    }
+    return true;
+  }
+  // Slide the window toward the current position and load the batch around it
+  Future<bool> updateWindow() async {
+      // Check if we are still in the window
+      if (currentPosition>=windowFirst && currentPosition <= windowLast) {
+        return true;
+      }
+      // Get the length of the cubit
+      final inputLength = await inputSource.cubit.operate((r) async => r.length);
+      // If not, slide the window
+      if (currentPosition<windowFirst) {
+        // Slide it backward, current position is now windowLast
+        windowFirst = max((currentPosition - windowLength) + 1, 0);
+        windowLast = currentPosition;
+      } else {
+        // Slide it forward, current position is now windowFirst
+        windowFirst = currentPosition;
+        windowLast = min((currentPosition + windowLength) - 1, inputLength - 1);
+      }
+      // Get another input batch futher back
+      final nextWindow =
+          await inputSource.cubit.loadElements(windowLast + 1, (windowLast + 1) - windowFirst);
+      final asErr = nextWindow.asError;
+      if (asErr != null) {
+        onError(asErr.error, asErr.stackTrace);
+        return false;
+      }
+      final asLoading = nextWindow.asLoading;
+      if (asLoading != null) {
+        // xxx: no need to block the cubit here for this
+        // xxx: might want to switch to a 'busy' state though
+        // xxx: to let the messages view show a spinner at the bottom
+        // xxx: while we reconcile...
+        // emit(const AsyncValue.loading());
+        return false;
+      }
+      currentWindow = nextWindow.asData!.value;
+      return true;
+  }
+  ////////////////////////////////////////////////////////////////////////////
+  final TypedKey author;
+  final ListQueue<proto.Message> toReconcile = ListQueue<proto.Message>();
+  final AuthorInputSource inputSource;
+  final OutputPosition? lastOutputPosition;
+  final void Function(Object, StackTrace?) onError;
+  // The current position in the input log that we are looking at
+  int currentPosition;
+  // The current input window elements
+  IList<DHTLogElementState<proto.Message>> currentWindow;
+  // The first position of the sliding input window
+  int windowFirst;
+  // The last position of the sliding input window
+  int windowLast;
+  // Desired maximum window length
+  int windowLength;
+  static const int _maxQueueChunk = 256;
diff --git a/lib/chat/cubits/reconciliation/author_input_source.dart b/lib/chat/cubits/reconciliation/author_input_source.dart
new file mode 100644
index 0000000..75f020e
--- /dev/null
+++ b/lib/chat/cubits/reconciliation/author_input_source.dart
@@ -0,0 +1,10 @@
+import 'package:veilid_support/veilid_support.dart';
+import '../../../proto/proto.dart' as proto;
+class AuthorInputSource {
+  AuthorInputSource({required this.messages, required this.cubit});
+  final DHTLogStateData<proto.Message> messages;
+  final DHTLogCubit<proto.Message> cubit;
diff --git a/lib/chat/cubits/reconciliation/message_reconciliation.dart b/lib/chat/cubits/reconciliation/message_reconciliation.dart
new file mode 100644
index 0000000..51cfe2b
--- /dev/null
+++ b/lib/chat/cubits/reconciliation/message_reconciliation.dart
@@ -0,0 +1,206 @@
+import 'dart:async';
+import 'package:async_tools/async_tools.dart';
+import 'package:fast_immutable_collections/fast_immutable_collections.dart';
+import 'package:sorted_list/sorted_list.dart';
+import 'package:veilid_support/veilid_support.dart';
+import '../../../proto/proto.dart' as proto;
+import 'author_input_queue.dart';
+import 'author_input_source.dart';
+import 'output_position.dart';
+class MessageReconciliation {
+  MessageReconciliation(
+      {required TableDBArrayCubit<proto.ReconciledMessage> output,
+      required void Function(Object, StackTrace?) onError})
+      : _outputCubit = output,
+        _onError = onError;
+  ////////////////////////////////////////////////////////////////////////////
+  void reconcileMessages(
+      TypedKey author,
+      DHTLogStateData<proto.Message> inputMessages,
+      DHTLogCubit<proto.Message> inputMessagesCubit) {
+    if (inputMessages.elements.isEmpty) {
+      return;
+    }
+    _inputSources[author] =
+        AuthorInputSource(messages: inputMessages, cubit: inputMessagesCubit);
+    singleFuture(this, onError: _onError, () async {
+      // Take entire list of input sources we have currently and process them
+      final inputSources = _inputSources;
+      _inputSources = {};
+      final inputFuts = <Future<AuthorInputQueue?>>[];
+      for (final kv in inputSources.entries) {
+        final author = kv.key;
+        final inputSource = kv.value;
+        inputFuts
+            .add(_enqueueAuthorInput(author: author, inputSource: inputSource));
+      }
+      final inputQueues = await inputFuts.wait;
+      // Make this safe to cast by removing inputs that were rejected or empty
+      inputQueues.removeNulls();
+      // Process all input queues together
+      await _outputCubit
+          .operate((reconciledArray) async => _reconcileInputQueues(
+                reconciledArray: reconciledArray,
+                inputQueues: inputQueues.cast<AuthorInputQueue>(),
+              ));
+    });
+  }
+  ////////////////////////////////////////////////////////////////////////////
+  // Set up a single author's message reconciliation
+  Future<AuthorInputQueue?> _enqueueAuthorInput(
+      {required TypedKey author,
+      required AuthorInputSource inputSource}) async {
+    // Get the position of our most recent reconciled message from this author
+    final lastReconciledMessage =
+        await _findNewestReconciledMessage(author: author);
+    // Find oldest message we have not yet reconciled
+    final inputQueue = await _buildAuthorInputQueue(
+        author: author,
+        inputSource: inputSource,
+        lastOutputPosition: lastReconciledMessage);
+    return inputQueue;
+  }
+  // Get the position of our most recent
+  // reconciled message from this author
+  // XXX: For a group chat, this should find when the author
+  // was added to the membership so we don't just go back in time forever
+  Future<OutputPosition?> _findNewestReconciledMessage(
+          {required TypedKey author}) async =>
+      _outputCubit.operate((arr) async {
+        var pos = arr.length - 1;
+        while (pos >= 0) {
+          final message = await arr.getProtobuf(proto.Message.fromBuffer, pos);
+          if (message == null) {
+            throw StateError('should have gotten last message');
+          }
+          if (message.author.toVeilid() == author) {
+            return OutputPosition(message, pos);
+          }
+          pos--;
+        }
+        return null;
+      });
+  // Find oldest message we have not yet reconciled and build a queue forward
+  // from that position
+  Future<AuthorInputQueue?> _buildAuthorInputQueue(
+      {required TypedKey author,
+      required AuthorInputSource inputSource,
+      required OutputPosition? lastOutputPosition}) async {
+    // Make an author input queue
+    final authorInputQueue = AuthorInputQueue(
+        author: author,
+        inputSource: inputSource,
+        lastOutputPosition: lastOutputPosition,
+        onError: _onError);
+    if (!await authorInputQueue.prepareInputQueue()) {
+      return null;
+    }
+    return authorInputQueue;
+  }
+  // Process a list of author input queues and insert their messages
+  // into the output array, performing validation steps along the way
+  Future<void> _reconcileInputQueues({
+    required TableDBArray reconciledArray,
+    required List<AuthorInputQueue> inputQueues,
+  }) async {
+    // Ensure queues all have something to do
+    inputQueues.removeWhere((q) => q.isEmpty);
+    if (inputQueues.isEmpty) {
+      return;
+    }
+    // Sort queues from earliest to latest and then by author
+    // to ensure a deterministic insert order
+    inputQueues.sort((a, b) {
+      final acmp = a.lastOutputPosition?.pos ?? -1;
+      final bcmp = b.lastOutputPosition?.pos ?? -1;
+      if (acmp == bcmp) {
+        return a.author.toString().compareTo(b.author.toString());
+      }
+      return acmp.compareTo(bcmp);
+    });
+    // Start at the earliest position we know about in all the queues
+    final firstOutputPos = inputQueues.first.lastOutputPosition?.pos;
+    // Get the timestamp for this output position
+    var currentOutputMessage = firstOutputPos == null
+        ? null
+        : await reconciledArray.getProtobuf(
+            proto.Message.fromBuffer, firstOutputPos);
+    var currentOutputPos = firstOutputPos ?? 0;
+    final toInsert =
+        SortedList<proto.Message>(proto.MessageExt.compareTimestamp);
+    while (inputQueues.isNotEmpty) {
+      // Get up to '_maxReconcileChunk' of the items from the queues
+      // that we can insert at this location
+      bool added;
+      do {
+        added = false;
+        var someQueueEmpty = false;
+        for (final inputQueue in inputQueues) {
+          final inputCurrent = inputQueue.current!;
+          if (currentOutputMessage == null ||
+              inputCurrent.timestamp <= currentOutputMessage.timestamp) {
+            toInsert.add(inputCurrent);
+            added = true;
+            // Advance this queue
+            if (!inputQueue.consume()) {
+              // Queue is empty now, run a queue purge
+              someQueueEmpty = true;
+            }
+          }
+        }
+        // Remove empty queues now that we're done iterating
+        if (someQueueEmpty) {
+          inputQueues.removeWhere((q) => q.isEmpty);
+        }
+        if (toInsert.length >= _maxReconcileChunk) {
+          break;
+        }
+      } while (added);
+      // Perform insertions in bulk
+      if (toInsert.isNotEmpty) {
+        await reconciledArray.insertAllProtobuf(currentOutputPos, toInsert);
+        toInsert.clear();
+      } else {
+        // If there's nothing to insert at this position move to the next one
+        currentOutputPos++;
+        currentOutputMessage = await reconciledArray.getProtobuf(
+            proto.Message.fromBuffer, currentOutputPos);
+      }
+    }
+  }
+  ////////////////////////////////////////////////////////////////////////////
+  Map<TypedKey, AuthorInputSource> _inputSources = {};
+  final TableDBArrayCubit<proto.ReconciledMessage> _outputCubit;
+  final void Function(Object, StackTrace?) _onError;
+  static const int _maxReconcileChunk = 65536;
diff --git a/lib/chat/cubits/reconciliation/output_position.dart b/lib/chat/cubits/reconciliation/output_position.dart
new file mode 100644
index 0000000..258259e
--- /dev/null
+++ b/lib/chat/cubits/reconciliation/output_position.dart
@@ -0,0 +1,13 @@
+import 'package:equatable/equatable.dart';
+import 'package:meta/meta.dart';
+import '../../../proto/proto.dart' as proto;
+class OutputPosition extends Equatable {
+  const OutputPosition(this.message, this.pos);
+  final proto.Message message;
+  final int pos;
+  @override
+  List<Object?> get props => [message, pos];
diff --git a/lib/chat/cubits/reconciliation/reconciliation.dart b/lib/chat/cubits/reconciliation/reconciliation.dart
new file mode 100644
index 0000000..2dc0b93
--- /dev/null
+++ b/lib/chat/cubits/reconciliation/reconciliation.dart
@@ -0,0 +1 @@
+export 'message_reconciliation.dart';
diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart
index ff21d43..6c4fd8f 100644
--- a/lib/chat/cubits/single_contact_messages_cubit.dart
+++ b/lib/chat/cubits/single_contact_messages_cubit.dart
@@ -1,28 +1,16 @@
 import 'dart:async';
-import 'dart:collection';
 import 'dart:convert';
 import 'dart:typed_data';
 import 'package:async_tools/async_tools.dart';
-import 'package:equatable/equatable.dart';
 import 'package:fast_immutable_collections/fast_immutable_collections.dart';
-import 'package:fixnum/fixnum.dart';
 import 'package:flutter_bloc/flutter_bloc.dart';
-import 'package:meta/meta.dart';
 import 'package:veilid_support/veilid_support.dart';
 import '../../account_manager/account_manager.dart';
 import '../../proto/proto.dart' as proto;
 import '../models/models.dart';
-class MessagePosition extends Equatable {
-  const MessagePosition(this.message, this.pos);
-  final proto.Message message;
-  final int pos;
-  @override
-  List<Object?> get props => [message, pos];
+import 'message_reconciliation.dart';
 class RenderStateElement {
@@ -165,6 +153,13 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
     _reconciledMessagesCubit = TableDBArrayCubit(
         open: () async => TableDBArray.make(table: tableName, crypto: crypto),
         decodeElement: proto.ReconciledMessage.fromBuffer);
+    _reconciliation = MessageReconciliation(
+        output: _reconciledMessagesCubit!,
+        onError: (e, st) {
+          emit(AsyncValue.error(e, st));
+        });
     _reconciledSubscription =
@@ -203,7 +198,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
-    _reconcileMessages(
+    _reconciliation.reconcileMessages(
@@ -216,7 +211,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
-    _reconcileMessages(
+    _reconciliation.reconcileMessages(
         _remoteIdentityPublicKey, rcvdMessages, _rcvdMessagesCubit!);
@@ -246,6 +241,12 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
     message.signature = signature.toProto();
+  Future<Uint8List> _generateInitialId(
+          {required PublicKey identityPublicKey}) async =>
+      (await _localMessagesCryptoSystem
+              .generateHash(identityPublicKey.decode()))
+          .decode();
   Future<void> _processMessageToSend(
       proto.Message message, proto.Message? previousMessage) async {
     // Get the previous message if we don't have one
@@ -257,10 +258,9 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
     if (previousMessage == null) {
       // If there's no last sent message,
       // we start at a hash of the identity public key
-      message.id = (await _localMessagesCryptoSystem.generateHash(
-              _activeAccountInfo.localAccount.identityMaster.identityPublicKey
-                  .decode()))
-          .decode();
+      message.id = await _generateInitialId(
+          identityPublicKey:
+              _activeAccountInfo.localAccount.identityMaster.identityPublicKey);
     } else {
       // If there is a last message, we generate the hash
       // of the last message's signature and use it as our next id
@@ -285,177 +285,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
         writer.tryAddAll(messages.map((m) => m.writeToBuffer()).toList()));
-  void _reconcileMessages(
-      TypedKey author,
-      DHTLogStateData<proto.Message> inputMessages,
-      DHTLogCubit<proto.Message> inputMessagesCubit) {
-    singleFuture(_reconciledMessagesCubit!, () async {
-      // Get the position of our most recent
-      // reconciled message from this author
-      // XXX: For a group chat, this should find when the author
-      // was added to the membership so we don't just go back in time forever
-      final lastReconciledMessage =
-          await _reconciledMessagesCubit!.operate((arr) async {
-        var pos = arr.length - 1;
-        while (pos >= 0) {
-          final message = await arr.getProtobuf(proto.Message.fromBuffer, pos);
-          if (message == null) {
-            throw StateError('should have gotten last message');
-          }
-          if (message.author.toVeilid() == author) {
-            return MessagePosition(message, pos);
-          }
-          pos--;
-        }
-        return null;
-      });
-      // Find oldest message we have not yet reconciled
-      final toReconcile = ListQueue<proto.Message>();
-      // Go through batches of the input dhtlog starting with
-      // the current cubit state which is at the tail of the log
-      // Find the last reconciled message for this author
-      var currentInputPos = inputMessages.tail;
-      var currentInputElements = inputMessages.elements;
-      final inputBatchCount = inputMessages.count;
-      outer:
-      while (true) {
-        for (var rn = currentInputElements.length;
-            rn >= 0 && currentInputPos >= 0;
-            rn--, currentInputPos--) {
-          final elem = currentInputElements[rn];
-          // If we've found an input element that is older than our last
-          // reconciled message for this author, then we stop
-          if (lastReconciledMessage != null) {
-            if (elem.value.timestamp <
-                lastReconciledMessage.message.timestamp) {
-              break outer;
-            }
-          }
-          // Drop the 'offline' elements because we don't reconcile
-          // anything until it has been confirmed to be committed to the DHT
-          if (elem.isOffline) {
-            continue;
-          }
-          // Add to head of reconciliation queue
-          toReconcile.addFirst(elem.value);
-          if (toReconcile.length > _maxReconcileChunk) {
-            toReconcile.removeLast();
-          }
-        }
-        if (currentInputPos < 0) {
-          break;
-        }
-        // Get another input batch futher back
-        final nextInputBatch = await inputMessagesCubit.loadElements(
-            currentInputPos, inputBatchCount);
-        final asErr = nextInputBatch.asError;
-        if (asErr != null) {
-          emit(AsyncValue.error(asErr.error, asErr.stackTrace));
-          return;
-        }
-        final asLoading = nextInputBatch.asLoading;
-        if (asLoading != null) {
-          // xxx: no need to block the cubit here for this
-          // xxx: might want to switch to a 'busy' state though
-          // xxx: to let the messages view show a spinner at the bottom
-          // xxx: while we reconcile...
-          // emit(const AsyncValue.loading());
-          return;
-        }
-        currentInputElements = nextInputBatch.asData!.value;
-      }
-      // Now iterate from our current input position in batches
-      // and reconcile the messages in the forward direction
-      var insertPosition =
-          (lastReconciledMessage != null) ? lastReconciledMessage.pos : 0;
-      var lastInsertTime = (lastReconciledMessage != null)
-          ? lastReconciledMessage.message.timestamp
-          : Int64.ZERO;
-      // Insert this batch
-      xxx expand upon 'res' and iterate batches and update insert position/time
-      final res = await _reconciledMessagesCubit!.operate((arr) async =>
-          _reconcileMessagesInner(
-              reconciledArray: arr,
-              toReconcile: toReconcile,
-              insertPosition: insertPosition,
-              lastInsertTime: lastInsertTime));
-      // Update the view
-      _renderState();
-    });
-  }
-  Future<void> _reconcileMessagesInner(
-      {required TableDBArray reconciledArray,
-      required Iterable<proto.Message> toReconcile,
-      required int insertPosition,
-      required Int64 lastInsertTime}) async {
-    // // Ensure remoteMessages is sorted by timestamp
-    // final newMessages = messages
-    //     .sort((a, b) => a.timestamp.compareTo(b.timestamp))
-    //     .removeDuplicates();
-    // // Existing messages will always be sorted by timestamp so merging is easy
-    // final existingMessages = await reconciledMessagesWriter
-    //     .getItemRangeProtobuf(proto.Message.fromBuffer, 0);
-    // if (existingMessages == null) {
-    //   throw Exception(
-    //       'Could not load existing reconciled messages at this time');
-    // }
-    // var ePos = 0;
-    // var nPos = 0;
-    // while (ePos < existingMessages.length && nPos < newMessages.length) {
-    //   final existingMessage = existingMessages[ePos];
-    //   final newMessage = newMessages[nPos];
-    //   // If timestamp to insert is less than
-    //   // the current position, insert it here
-    //   final newTs = Timestamp.fromInt64(newMessage.timestamp);
-    //   final existingTs = Timestamp.fromInt64(existingMessage.timestamp);
-    //   final cmp = newTs.compareTo(existingTs);
-    //   if (cmp < 0) {
-    //     // New message belongs here
-    //     // Insert into dht backing array
-    //     await reconciledMessagesWriter.tryInsertItem(
-    //         ePos, newMessage.writeToBuffer());
-    //     // Insert into local copy as well for this operation
-    //     existingMessages.insert(ePos, newMessage);
-    //     // Next message
-    //     nPos++;
-    //     ePos++;
-    //   } else if (cmp == 0) {
-    //     // Duplicate, skip
-    //     nPos++;
-    //     ePos++;
-    //   } else if (cmp > 0) {
-    //     // New message belongs later
-    //     ePos++;
-    //   }
-    // }
-    // // If there are any new messages left, append them all
-    // while (nPos < newMessages.length) {
-    //   final newMessage = newMessages[nPos];
-    //   // Append to dht backing array
-    //   await reconciledMessagesWriter.tryAddItem(newMessage.writeToBuffer());
-    //   // Insert into local copy as well for this operation
-    //   existingMessages.add(newMessage);
-    //   nPos++;
-    // }
-  }
   // Produce a state for this cubit from the input cubits and queues
   void _renderState() {
     // Get all reconciled messages
@@ -584,12 +413,12 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
   DHTLogCubit<proto.Message>? _rcvdMessagesCubit;
   TableDBArrayCubit<proto.ReconciledMessage>? _reconciledMessagesCubit;
+  late final MessageReconciliation _reconciliation;
   late final PersistentQueue<proto.Message> _sendingMessagesQueue;
   StreamSubscription<DHTLogBusyState<proto.Message>>? _sentSubscription;
   StreamSubscription<DHTLogBusyState<proto.Message>>? _rcvdSubscription;
-  static const int _maxReconcileChunk = 65536;
diff --git a/lib/proto/extensions.dart b/lib/proto/extensions.dart
index e9fd9a2..1da55f9 100644
--- a/lib/proto/extensions.dart
+++ b/lib/proto/extensions.dart
@@ -23,4 +23,7 @@ extension MessageExt on proto.Message {
   String get uniqueIdString => base64UrlNoPadEncode(uniqueIdBytes);
+  static int compareTimestamp(proto.Message a, proto.Message b) =>
+      a.timestamp.compareTo(b.timestamp);
diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart
index acdc6fe..985b11f 100644
--- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart
+++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart
@@ -209,8 +209,7 @@ class DHTLog implements DHTDeleteable<DHTLog, DHTLog> {
   OwnedDHTRecordPointer get recordPointer => _spine.recordPointer;
   /// Runs a closure allowing read-only access to the log
-  Future<T?> operate<T>(
-      Future<T?> Function(DHTLogReadOperations) closure) async {
+  Future<T> operate<T>(Future<T> Function(DHTLogReadOperations) closure) async {
     if (!isOpen) {
       throw StateError('log is not open"');
diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart
index 010c76e..2f97b3f 100644
--- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart
+++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart
@@ -108,6 +108,7 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
         elements: elements, tail: _tail, count: _count, follow: _follow)));
+  // Tail is one past the last element to load
   Future<AsyncValue<IList<DHTLogElementState<T>>>> loadElements(
       int tail, int count,
       {bool forceRefresh = false}) async {
@@ -184,8 +185,7 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
     await super.close();
-  Future<R?> operate<R>(
-      Future<R?> Function(DHTLogReadOperations) closure) async {
+  Future<R> operate<R>(Future<R> Function(DHTLogReadOperations) closure) async {
     await _initWait();
     return _log.operate(closure);
diff --git a/packages/veilid_support/lib/src/table_db_array.dart b/packages/veilid_support/lib/src/table_db_array.dart
index 4d5b9dd..8bcd146 100644
--- a/packages/veilid_support/lib/src/table_db_array.dart
+++ b/packages/veilid_support/lib/src/table_db_array.dart
@@ -662,4 +662,46 @@ extension TableDBArrayExt on TableDBArray {
           T Function(List<int>) fromBuffer, int start, [int? end]) =>
       getRange(start, end ?? _length)
           .then((out) => out.map(fromBuffer).toList());
+  /// Convenience function:
+  /// Like add but for a JSON value
+  Future<void> addJson<T>(T value) async => add(jsonEncodeBytes(value));
+  /// Convenience function:
+  /// Like add but for a Protobuf value
+  Future<void> addProtobuf<T extends GeneratedMessage>(T value) =>
+      add(value.writeToBuffer());
+  /// Convenience function:
+  /// Like addAll but for a JSON value
+  Future<void> addAllJson<T>(List<T> values) async =>
+      addAll(values.map(jsonEncodeBytes).toList());
+  /// Convenience function:
+  /// Like addAll but for a Protobuf value
+  Future<void> addAllProtobuf<T extends GeneratedMessage>(
+          List<T> values) async =>
+      addAll(values.map((x) => x.writeToBuffer()).toList());
+  /// Convenience function:
+  /// Like insert but for a JSON value
+  Future<void> insertJson<T>(int pos, T value) async =>
+      insert(pos, jsonEncodeBytes(value));
+  /// Convenience function:
+  /// Like insert but for a Protobuf value
+  Future<void> insertProtobuf<T extends GeneratedMessage>(
+          int pos, T value) async =>
+      insert(pos, value.writeToBuffer());
+  /// Convenience function:
+  /// Like insertAll but for a JSON value
+  Future<void> insertAllJson<T>(int pos, List<T> values) async =>
+      insertAll(pos, values.map(jsonEncodeBytes).toList());
+  /// Convenience function:
+  /// Like insertAll but for a Protobuf value
+  Future<void> insertAllProtobuf<T extends GeneratedMessage>(
+          int pos, List<T> values) async =>
+      insertAll(pos, values.map((x) => x.writeToBuffer()).toList());
diff --git a/pubspec.lock b/pubspec.lock
index c6e754b..8a70f22 100644
--- a/pubspec.lock
+++ b/pubspec.lock
@@ -1219,6 +1219,15 @@ packages:
       url: "https://pub.dev"
     source: hosted
     version: "2.0.0"
+  sorted_list:
+    dependency: "direct main"
+    description:
+      path: "."
+      ref: main
+      resolved-ref: "090eb9be48ab85ff064a0a1d8175b4a72d79b139"
+      url: "https://gitlab.com/veilid/dart-sorted-list-improved.git"
+    source: git
+    version: "1.0.0"
     dependency: transitive
diff --git a/pubspec.yaml b/pubspec.yaml
index 1cc893f..133d482 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -69,6 +69,10 @@ dependencies:
   share_plus: ^9.0.0
   shared_preferences: ^2.2.3
   signal_strength_indicator: ^0.4.1
+  sorted_list:
+    git:
+      url: https://gitlab.com/veilid/dart-sorted-list-improved.git
+      ref: main
   split_view: ^3.2.1
   stack_trace: ^1.11.1
   stream_transform: ^2.1.0