diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 6e02c6f1..b57b7a4d 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -582,13 +582,6 @@ impl StorageManager { ) -> VeilidAPIResult { let inner = self.lock().await?; - // Rewrite subkey range if empty to full - let subkeys = if subkeys.is_empty() { - ValueSubkeyRangeSet::full() - } else { - subkeys - }; - // Get the safety selection and the writer we opened this record // and whatever active watch id and watch node we may have in case this is a watch update let (safety_selection, opt_writer, opt_watch_id, opt_watch_node) = { @@ -603,6 +596,24 @@ impl StorageManager { ) }; + // Rewrite subkey range if empty to full + let subkeys = if subkeys.is_empty() { + ValueSubkeyRangeSet::full() + } else { + subkeys + }; + + // Get the schema so we can truncate the watch to the number of subkeys + let schema = if let Some(lrs) = inner.local_record_store.as_ref() { + let Some(schema) = lrs.peek_record(key, |r| r.schema()) else { + apibail_generic!("no local record found"); + }; + schema + } else { + apibail_not_initialized!(); + }; + let subkeys = schema.truncate_subkeys(&subkeys, None); + // Get rpc processor and drop mutex so we don't block while requesting the watch from the network let Some(rpc_processor) = Self::online_ready_inner(&inner) else { apibail_try_again!("offline, try again later"); diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index 426367e8..4f98d959 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -203,6 +203,35 @@ async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI): await rc.close_dht_record(key) await rc.delete_dht_record(key) +@pytest.mark.asyncio +async def test_watch_dht_values(api_connection: veilid.VeilidAPI): + rc = await api_connection.new_routing_context() + async with rc: + rec = await rc.create_dht_record(veilid.DHTSchema.dflt(10)) + + vd = await rc.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH") + assert vd == None + + ts = await rc.watch_dht_values(rec.key, [], 0, 0xFFFFFFFF) + assert ts != 0 + + vd = await rc.set_dht_value(rec.key, 3, b"BLAH") + assert vd == None + + all_gone = await rc.cancel_dht_watch(rec.key, [(0, 2)]) + assert all_gone == True + + vd = await rc.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH") + assert vd == None + + all_gone = await rc.cancel_dht_watch(rec.key, [(3, 9)]) + assert all_gone == False + + vd = await rc.set_dht_value(rec.key, 3, b"BLAH") + assert vd == None + + await rc.close_dht_record(rec.key) + await rc.delete_dht_record(rec.key) @pytest.mark.asyncio async def test_inspect_dht_record(api_connection: veilid.VeilidAPI): diff --git a/veilid-python/veilid/json_api.py b/veilid-python/veilid/json_api.py index 300f2286..ee29f592 100644 --- a/veilid-python/veilid/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -663,7 +663,7 @@ class _JsonRoutingContext(RoutingContext): rc_op=RoutingContextOperation.WATCH_DHT_VALUES, key=key, subkeys=subkeys, - expiration=expiration, + expiration=str(expiration), count=count, ) )