test improvements

This commit is contained in:
Christien Rioux 2025-12-11 20:06:28 -06:00
parent 8eff68570b
commit fa8aa45527

View file

@ -637,6 +637,8 @@ async def test_dht_transaction_write_read_full_parallel_local():
COUNT = 48
# Number of subkeys per record
SUBKEY_COUNT = 32
# Number of subkeys to batch
SUBKEY_BATCH = 4
# BareNonce to encrypt test data
NONCE = Nonce.from_bytes(b"A"*await cs.nonce_length())
# Secret to encrypt test data
@ -649,6 +651,7 @@ async def test_dht_transaction_write_read_full_parallel_local():
records : list[DHTRecordDescriptor] = []
subkey_data_list : list[bytes] = []
schema = DHTSchema.dflt(SUBKEY_COUNT)
print(f'writing {COUNT} records with full subkeys')
for n in range(COUNT):
desc = await rc0.create_dht_record(kind, schema)
@ -660,36 +663,37 @@ async def test_dht_transaction_write_read_full_parallel_local():
subkey_data = await cs.crypt_no_auth(subkey_data, NONCE, SECRET)
subkey_data_list.append(subkey_data)
start = time.time()
t1start = time.time()
transaction = await api0.transact_dht_records([x.key for x in records], None)
print(f'transaction begin: {time.time()-start}')
print(f'transaction begin: {time.time()-t1start}')
init_set_futures : set[Coroutine[Any, Any, ValueData | None]] = set()
for i in range(0,SUBKEY_COUNT, SUBKEY_BATCH):
init_set_futures : set[Coroutine[Any, Any, ValueData | None]] = set()
for j in range(SUBKEY_BATCH):
subkey = ValueSubkey(i+j)
for n in range(COUNT):
key = records[n].key
subkey_data = subkey_data_list[n]
for i in range(SUBKEY_COUNT):
for n in range(COUNT):
key = records[n].key
subkey_data = subkey_data_list[n]
async def setter(key: RecordKey, subkey: ValueSubkey, data: bytes):
# start = time.time()
cnt = 0
while True:
try:
await transaction.set(key, subkey, data)
break
except veilid.VeilidAPIErrorTryAgain:
cnt += 1
print(f' retry #{cnt} setting {key} #{subkey}')
continue
# print(f'set {key} #{subkey}: {time.time()-start}')
async def setter(key: RecordKey, subkey: ValueSubkey, data: bytes):
start = time.time()
cnt = 0
while True:
try:
await transaction.set(key, subkey, data)
break
except veilid.VeilidAPIErrorTryAgain:
cnt += 1
print(f' retry #{cnt} setting {key} #{subkey}')
continue
print(f'set {key} #{subkey}: {time.time()-start}')
init_set_futures.add(setter(key, subkey, subkey_data))
init_set_futures.add(setter(key, ValueSubkey(i), subkey_data))
# Update all subkeys for all records simultaneously
start = time.time()
await asyncio.gather(*init_set_futures)
print(f'transaction set subkeys: {time.time()-start}')
# Update all subkeys for all records simultaneously
start = time.time()
await asyncio.gather(*init_set_futures)
print(f'transaction set subkeys {i}-{i+SUBKEY_BATCH-1}: {time.time()-start}')
start = time.time()
await transaction.commit()
@ -707,35 +711,37 @@ async def test_dht_transaction_write_read_full_parallel_local():
for desc in records:
await rc0.open_dht_record(desc.key)
start = time.time()
t2start = time.time()
transaction = await api0.transact_dht_records([x.key for x in records], None)
print(f'transaction begin: {time.time()-start}')
print(f'transaction begin: {time.time()-t2start}')
init_get_futures : set[Coroutine[Any, Any, tuple[RecordKey, ValueSubkey, bytes, ValueData | None]]] = set()
for i in range(SUBKEY_COUNT):
subkey = ValueSubkey(i)
for i in range(0,SUBKEY_COUNT, SUBKEY_BATCH):
init_get_futures : set[Coroutine[Any, Any, tuple[RecordKey, ValueSubkey, bytes, ValueData | None]]] = set()
for j in range(SUBKEY_BATCH):
subkey = ValueSubkey(i+j)
for n in range(COUNT):
key = records[n].key
subkey_data = subkey_data_list[n]
for n in range(COUNT):
key = records[n].key
subkey_data = subkey_data_list[n]
async def getter(key: RecordKey, subkey: ValueSubkey, check_data: bytes):
start = time.time()
out = (key, subkey, check_data, await transaction.get(key, subkey))
print(f'get {key} #{subkey}: {time.time()-start}')
return out
async def getter(key: RecordKey, subkey: ValueSubkey, check_data: bytes):
#start = time.time()
out = (key, subkey, check_data, await transaction.get(key, subkey))
# print(f'get {key} #{subkey}: {time.time()-start}')
return out
init_get_futures.add(getter(key, subkey, subkey_data))
init_get_futures.add(getter(key, subkey, subkey_data))
# Update each subkey for each record in parallel
# This ensures that each record gets its own expiration update
start = time.time()
get_results = await asyncio.gather(*init_get_futures)
for key, sk, check_data, vd in get_results:
assert vd is not None and vd.data == check_data
print(f'transaction get subkeys: {time.time()-start}')
# Update each subkey for each record in parallel
# This ensures that each record gets its own expiration update
start = time.time()
get_results = await asyncio.gather(*init_get_futures)
for key, sk, check_data, vd in get_results:
assert vd is not None and vd.data == check_data
print(f'transaction get subkeys {i}-{i+SUBKEY_BATCH-1}: {time.time()-start}')
await transaction.rollback()
print(f'done: {time.time()-t1start}')
for desc in records:
await rc0.close_dht_record(desc.key)