From fa8aa45527f5148d4ee52bcbea6a87f881a548c6 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Thu, 11 Dec 2025 20:06:28 -0600 Subject: [PATCH] test improvements --- veilid-python/tests/test_dht_transactions.py | 98 +++++++++++--------- 1 file changed, 52 insertions(+), 46 deletions(-) diff --git a/veilid-python/tests/test_dht_transactions.py b/veilid-python/tests/test_dht_transactions.py index 6fdd0be1..97158da0 100644 --- a/veilid-python/tests/test_dht_transactions.py +++ b/veilid-python/tests/test_dht_transactions.py @@ -637,6 +637,8 @@ async def test_dht_transaction_write_read_full_parallel_local(): COUNT = 48 # Number of subkeys per record SUBKEY_COUNT = 32 + # Number of subkeys to batch + SUBKEY_BATCH = 4 # BareNonce to encrypt test data NONCE = Nonce.from_bytes(b"A"*await cs.nonce_length()) # Secret to encrypt test data @@ -649,6 +651,7 @@ async def test_dht_transaction_write_read_full_parallel_local(): records : list[DHTRecordDescriptor] = [] subkey_data_list : list[bytes] = [] schema = DHTSchema.dflt(SUBKEY_COUNT) + print(f'writing {COUNT} records with full subkeys') for n in range(COUNT): desc = await rc0.create_dht_record(kind, schema) @@ -660,36 +663,37 @@ async def test_dht_transaction_write_read_full_parallel_local(): subkey_data = await cs.crypt_no_auth(subkey_data, NONCE, SECRET) subkey_data_list.append(subkey_data) - start = time.time() + t1start = time.time() transaction = await api0.transact_dht_records([x.key for x in records], None) - print(f'transaction begin: {time.time()-start}') + print(f'transaction begin: {time.time()-t1start}') - init_set_futures : set[Coroutine[Any, Any, ValueData | None]] = set() + for i in range(0,SUBKEY_COUNT, SUBKEY_BATCH): + init_set_futures : set[Coroutine[Any, Any, ValueData | None]] = set() + for j in range(SUBKEY_BATCH): + subkey = ValueSubkey(i+j) + for n in range(COUNT): + key = records[n].key + subkey_data = subkey_data_list[n] - for i in range(SUBKEY_COUNT): - for n in range(COUNT): - key = records[n].key - subkey_data = subkey_data_list[n] + async def setter(key: RecordKey, subkey: ValueSubkey, data: bytes): + # start = time.time() + cnt = 0 + while True: + try: + await transaction.set(key, subkey, data) + break + except veilid.VeilidAPIErrorTryAgain: + cnt += 1 + print(f' retry #{cnt} setting {key} #{subkey}') + continue + # print(f'set {key} #{subkey}: {time.time()-start}') - async def setter(key: RecordKey, subkey: ValueSubkey, data: bytes): - start = time.time() - cnt = 0 - while True: - try: - await transaction.set(key, subkey, data) - break - except veilid.VeilidAPIErrorTryAgain: - cnt += 1 - print(f' retry #{cnt} setting {key} #{subkey}') - continue - print(f'set {key} #{subkey}: {time.time()-start}') + init_set_futures.add(setter(key, subkey, subkey_data)) - init_set_futures.add(setter(key, ValueSubkey(i), subkey_data)) - - # Update all subkeys for all records simultaneously - start = time.time() - await asyncio.gather(*init_set_futures) - print(f'transaction set subkeys: {time.time()-start}') + # Update all subkeys for all records simultaneously + start = time.time() + await asyncio.gather(*init_set_futures) + print(f'transaction set subkeys {i}-{i+SUBKEY_BATCH-1}: {time.time()-start}') start = time.time() await transaction.commit() @@ -707,35 +711,37 @@ async def test_dht_transaction_write_read_full_parallel_local(): for desc in records: await rc0.open_dht_record(desc.key) - start = time.time() + t2start = time.time() transaction = await api0.transact_dht_records([x.key for x in records], None) - print(f'transaction begin: {time.time()-start}') + print(f'transaction begin: {time.time()-t2start}') - init_get_futures : set[Coroutine[Any, Any, tuple[RecordKey, ValueSubkey, bytes, ValueData | None]]] = set() - for i in range(SUBKEY_COUNT): - subkey = ValueSubkey(i) + for i in range(0,SUBKEY_COUNT, SUBKEY_BATCH): + init_get_futures : set[Coroutine[Any, Any, tuple[RecordKey, ValueSubkey, bytes, ValueData | None]]] = set() + for j in range(SUBKEY_BATCH): + subkey = ValueSubkey(i+j) - for n in range(COUNT): - key = records[n].key - subkey_data = subkey_data_list[n] + for n in range(COUNT): + key = records[n].key + subkey_data = subkey_data_list[n] - async def getter(key: RecordKey, subkey: ValueSubkey, check_data: bytes): - start = time.time() - out = (key, subkey, check_data, await transaction.get(key, subkey)) - print(f'get {key} #{subkey}: {time.time()-start}') - return out + async def getter(key: RecordKey, subkey: ValueSubkey, check_data: bytes): + #start = time.time() + out = (key, subkey, check_data, await transaction.get(key, subkey)) + # print(f'get {key} #{subkey}: {time.time()-start}') + return out - init_get_futures.add(getter(key, subkey, subkey_data)) + init_get_futures.add(getter(key, subkey, subkey_data)) - # Update each subkey for each record in parallel - # This ensures that each record gets its own expiration update - start = time.time() - get_results = await asyncio.gather(*init_get_futures) - for key, sk, check_data, vd in get_results: - assert vd is not None and vd.data == check_data - print(f'transaction get subkeys: {time.time()-start}') + # Update each subkey for each record in parallel + # This ensures that each record gets its own expiration update + start = time.time() + get_results = await asyncio.gather(*init_get_futures) + for key, sk, check_data, vd in get_results: + assert vd is not None and vd.data == check_data + print(f'transaction get subkeys {i}-{i+SUBKEY_BATCH-1}: {time.time()-start}') await transaction.rollback() + print(f'done: {time.time()-t1start}') for desc in records: await rc0.close_dht_record(desc.key)