This commit is contained in:
Christien Rioux 2024-02-01 09:41:58 -05:00
parent cd5d10ec1f
commit d3fb4b5b6a
3 changed files with 33 additions and 20 deletions

View file

@ -282,7 +282,9 @@ class DHTRecord {
} }
Future<StreamSubscription<VeilidUpdateValueChange>> listen( Future<StreamSubscription<VeilidUpdateValueChange>> listen(
Future<void> Function(VeilidUpdateValueChange update) onUpdate, Future<void> Function(
DHTRecord record, Uint8List data, List<ValueSubkeyRange> subkeys)
onUpdate,
) async { ) async {
// Set up watch requirements // Set up watch requirements
watchController ??= watchController ??=
@ -293,7 +295,12 @@ class DHTRecord {
return watchController!.stream.listen( return watchController!.stream.listen(
(update) { (update) {
Future.delayed(Duration.zero, () => onUpdate(update)); Future.delayed(Duration.zero, () async {
final out = await _crypto.decrypt(
update.valueData.data, update.subkeys.first.low);
await onUpdate(this, out, update.subkeys);
});
}, },
cancelOnError: true, cancelOnError: true,
onError: (e) async { onError: (e) async {

View file

@ -9,7 +9,7 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
DHTRecordCubit({ DHTRecordCubit({
required Future<DHTRecord> Function() open, required Future<DHTRecord> Function() open,
required Future<T?> Function(DHTRecord) initialStateFunction, required Future<T?> Function(DHTRecord) initialStateFunction,
required Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, ValueData) required Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, Uint8List)
stateFunction, stateFunction,
}) : _wantsCloseRecord = false, }) : _wantsCloseRecord = false,
super(const AsyncValue.loading()) { super(const AsyncValue.loading()) {
@ -24,7 +24,7 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
DHTRecordCubit.value({ DHTRecordCubit.value({
required DHTRecord record, required DHTRecord record,
required Future<T?> Function(DHTRecord) initialStateFunction, required Future<T?> Function(DHTRecord) initialStateFunction,
required Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, ValueData) required Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, Uint8List)
stateFunction, stateFunction,
}) : _record = record, }) : _record = record,
_wantsCloseRecord = false, _wantsCloseRecord = false,
@ -36,7 +36,7 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
Future<void> _init( Future<void> _init(
Future<T?> Function(DHTRecord) initialStateFunction, Future<T?> Function(DHTRecord) initialStateFunction,
Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, ValueData) Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, Uint8List)
stateFunction, stateFunction,
) async { ) async {
// Make initial state update // Make initial state update
@ -49,10 +49,9 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
emit(AsyncValue.error(e)); emit(AsyncValue.error(e));
} }
_subscription = await _record.listen((update) async { _subscription = await _record.listen((record, data, subkeys) async {
try { try {
final newState = final newState = await stateFunction(record, subkeys, data);
await stateFunction(_record, update.subkeys, update.valueData);
if (newState != null) { if (newState != null) {
emit(AsyncValue.data(newState)); emit(AsyncValue.data(newState));
} }
@ -73,6 +72,16 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
await super.close(); await super.close();
} }
Future<void> refresh(List<ValueSubkeyRange> subkeys) async {
for (final skr in subkeys) {
for (var sk = skr.low; sk <= skr.high; sk++) {
final data = await _record.get(subkey: sk, forceRefresh: true);
final newState = await stateFunction(_record, subkeys, data);
xxx continue here
}
}
}
StreamSubscription<VeilidUpdateValueChange>? _subscription; StreamSubscription<VeilidUpdateValueChange>? _subscription;
late DHTRecord _record; late DHTRecord _record;
bool _wantsCloseRecord; bool _wantsCloseRecord;
@ -105,9 +114,9 @@ class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
return decodeState(initialData); return decodeState(initialData);
}; };
static Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, ValueData) static Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, Uint8List)
_makeStateFunction<T>(T Function(List<int> data) decodeState) => _makeStateFunction<T>(T Function(List<int> data) decodeState) =>
(record, subkeys, valueData) async { (record, subkeys, updatedata) async {
final defaultSubkey = record.subkeyOrDefault(-1); final defaultSubkey = record.subkeyOrDefault(-1);
if (subkeys.containsSubkey(defaultSubkey)) { if (subkeys.containsSubkey(defaultSubkey)) {
final Uint8List data; final Uint8List data;
@ -119,7 +128,7 @@ class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
} }
data = maybeData; data = maybeData;
} else { } else {
data = valueData.data; data = updatedata;
} }
final newState = decodeState(data); final newState = decodeState(data);
return newState; return newState;
@ -127,6 +136,6 @@ class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
return null; return null;
}; };
xxx add refresh/get mechanism to DHTRecordCubit and here too, then propagage to conversation_cubit // xxx add refresh/get mechanism to DHTRecordCubit and here too, then propagage to conversation_cubit
xxx should just be a 'get' like in dht_short_array_cubit // xxx should just be a 'get' like in dht_short_array_cubit
} }

View file

@ -721,21 +721,18 @@ class DHTShortArray {
} }
// Called when a head or linked record changes // Called when a head or linked record changes
Future<void> _onUpdateRecord(VeilidUpdateValueChange update) async { Future<void> _onUpdateRecord(
final record = _head.linkedRecords.firstWhere( DHTRecord record, Uint8List data, List<ValueSubkeyRange> subkeys) async {
(element) => element.key == update.key,
orElse: () => _headRecord);
// If head record subkey zero changes, then the layout // If head record subkey zero changes, then the layout
// of the dhtshortarray has changed // of the dhtshortarray has changed
var updateHead = false; var updateHead = false;
if (record == _headRecord && update.subkeys.containsSubkey(0)) { if (record == _headRecord && subkeys.containsSubkey(0)) {
updateHead = true; updateHead = true;
} }
// If we have any other subkeys to update, do them first // If we have any other subkeys to update, do them first
final unord = List<Future<Uint8List?>>.empty(growable: true); final unord = List<Future<Uint8List?>>.empty(growable: true);
for (final skr in update.subkeys) { for (final skr in subkeys) {
for (var subkey = skr.low; subkey <= skr.high; subkey++) { for (var subkey = skr.low; subkey <= skr.high; subkey++) {
// Skip head subkey // Skip head subkey
if (subkey == 0) { if (subkey == 0) {