integration test for dht records

This commit is contained in:
Christien Rioux 2024-04-28 12:42:13 -04:00
parent 612921a34d
commit eba349d22c
6 changed files with 92 additions and 23 deletions

View File

@ -50,6 +50,7 @@ impl StorageManager {
let reclaimed = local_record_store let reclaimed = local_record_store
.reclaim_space(reclaim.unwrap_or(usize::MAX)) .reclaim_space(reclaim.unwrap_or(usize::MAX))
.await; .await;
inner.offline_subkey_writes.clear();
format!("Local records purged: reclaimed {} bytes", reclaimed) format!("Local records purged: reclaimed {} bytes", reclaimed)
} }
pub(crate) async fn purge_remote_records(&self, reclaim: Option<usize>) -> String { pub(crate) async fn purge_remote_records(&self, reclaim: Option<usize>) -> String {

View File

@ -40,14 +40,20 @@ impl StorageManager {
}; };
let Ok(get_result) = get_result else { let Ok(get_result) = get_result else {
log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey); log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey);
// drop this one
written_subkeys.insert(subkey);
continue; continue;
}; };
let Some(value) = get_result.opt_value else { let Some(value) = get_result.opt_value else {
log_stor!(debug "Offline subkey write had no subkey value: {}:{}", key, subkey); log_stor!(debug "Offline subkey write had no subkey value: {}:{}", key, subkey);
// drop this one
written_subkeys.insert(subkey);
continue; continue;
}; };
let Some(descriptor) = get_result.opt_descriptor else { let Some(descriptor) = get_result.opt_descriptor else {
log_stor!(debug "Offline subkey write had no descriptor: {}:{}", key, subkey); log_stor!(debug "Offline subkey write had no descriptor: {}:{}", key, subkey);
// drop this one
written_subkeys.insert(subkey);
continue; continue;
}; };
log_stor!(debug "Offline subkey write: {}:{} len={}", key, subkey, value.value_data().data().len()); log_stor!(debug "Offline subkey write: {}:{} len={}", key, subkey, value.value_data().data().len());

View File

@ -21,16 +21,16 @@ class VeilidTestConnectionError(Exception):
@cache @cache
def server_info() -> tuple[str, int]: def server_info(subindex: int = 0) -> tuple[str, int]:
"""Return the hostname and port of the test server.""" """Return the hostname and port of the test server."""
VEILID_SERVER_NETWORK = os.getenv("VEILID_SERVER_NETWORK") VEILID_SERVER_NETWORK = os.getenv("VEILID_SERVER_NETWORK")
if VEILID_SERVER_NETWORK is None: if VEILID_SERVER_NETWORK is None:
return "localhost", 5959 return "localhost", 5959 + subindex
hostname, *rest = VEILID_SERVER_NETWORK.split(":") hostname, *rest = VEILID_SERVER_NETWORK.split(":")
if rest: if rest:
return hostname, int(rest[0]) return hostname, int(rest[0]) + subindex
return hostname, 5959 return hostname, 5959 + subindex
def ipc_path_exists(path: str) -> bool: def ipc_path_exists(path: str) -> bool:
"""Determine if an IPC socket exists in a platform independent way.""" """Determine if an IPC socket exists in a platform independent way."""
@ -42,42 +42,42 @@ def ipc_path_exists(path: str) -> bool:
return os.path.exists(path) return os.path.exists(path)
@cache @cache
def ipc_info() -> str: def ipc_info(subindex: int = 0) -> str:
"""Return the path of the ipc socket of the test server.""" """Return the path of the ipc socket of the test server."""
VEILID_SERVER_IPC = os.getenv("VEILID_SERVER_IPC") VEILID_SERVER_IPC = os.getenv("VEILID_SERVER_IPC")
if VEILID_SERVER_IPC is not None: if VEILID_SERVER_IPC is not None:
return VEILID_SERVER_IPC return VEILID_SERVER_IPC
if os.name == 'nt': if os.name == 'nt':
return '\\\\.\\PIPE\\veilid-server\\0' return f'\\\\.\\PIPE\\veilid-server\\{subindex}'
ipc_0_path = "/var/db/veilid-server/ipc/0" ipc_path = f"/var/db/veilid-server/ipc/{subindex}"
if os.path.exists(ipc_0_path): if os.path.exists(ipc_path):
return ipc_0_path return ipc_path
# hack to deal with rust's 'directories' crate case-inconsistency # hack to deal with rust's 'directories' crate case-inconsistency
if sys.platform.startswith('darwin'): if sys.platform.startswith('darwin'):
data_dir = appdirs.user_data_dir("org.Veilid.Veilid") data_dir = appdirs.user_data_dir("org.Veilid.Veilid")
else: else:
data_dir = appdirs.user_data_dir("veilid","veilid") data_dir = appdirs.user_data_dir("veilid","veilid")
ipc_0_path = os.path.join(data_dir, "ipc", "0") ipc_path = os.path.join(data_dir, "ipc", str(subindex))
return ipc_0_path return ipc_path
async def api_connector(callback: Callable) -> _JsonVeilidAPI: async def api_connector(callback: Callable, subindex: int = 0) -> _JsonVeilidAPI:
"""Return an API connection if possible. """Return an API connection if possible.
If the connection fails due to an inability to connect to the If the connection fails due to an inability to connect to the
server's socket, raise an easy-to-catch VeilidTestConnectionError. server's socket, raise an easy-to-catch VeilidTestConnectionError.
""" """
ipc_path = ipc_info() ipc_path = ipc_info(subindex)
hostname, port = server_info()
try: try:
if ipc_path_exists(ipc_path): if ipc_path_exists(ipc_path):
return await veilid.json_api_connect_ipc(ipc_path, callback) return await veilid.json_api_connect_ipc(ipc_path, callback)
else: else:
hostname, port = server_info(subindex)
return await veilid.json_api_connect(hostname, port, callback) return await veilid.json_api_connect(hostname, port, callback)
except OSError as exc: except OSError as exc:
# This is a little goofy. The underlying Python library handles # This is a little goofy. The underlying Python library handles

View File

@ -4,6 +4,7 @@ import veilid
import pytest import pytest
import asyncio import asyncio
import json import json
import os
from . import * from . import *
from .api import VeilidTestConnectionError, api_connector from .api import VeilidTestConnectionError, api_connector
@ -341,3 +342,64 @@ async def test_inspect_dht_record(api_connection: veilid.VeilidAPI):
await rc.close_dht_record(rec.key) await rc.close_dht_record(rec.key)
await rc.delete_dht_record(rec.key) await rc.delete_dht_record(rec.key)
@pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running")
@pytest.mark.asyncio
async def test_dht_integration_writer_reader():
async def null_update_callback(update: veilid.VeilidUpdate):
pass
try:
api0 = await api_connector(null_update_callback, 0)
except VeilidTestConnectionError:
pytest.skip("Unable to connect to veilid-server 0.")
return
try:
api1 = await api_connector(null_update_callback, 1)
except VeilidTestConnectionError:
pytest.skip("Unable to connect to veilid-server 1.")
return
async with api0, api1:
# purge local and remote record stores to ensure we start fresh
await api0.debug("record purge local")
await api0.debug("record purge remote")
await api1.debug("record purge local")
await api1.debug("record purge remote")
# make routing contexts
rc0 = await api0.new_routing_context()
rc1 = await api1.new_routing_context()
async with rc0, rc1:
COUNT = 10
TEST_DATA = b"test data"
# write dht records on server 0
records = []
schema = veilid.DHTSchema.dflt(1)
print(f'writing {COUNT} records')
for n in range(COUNT):
desc = await rc0.create_dht_record(schema)
records.append(desc)
await rc0.set_dht_value(desc.key, 0, TEST_DATA)
await rc0.close_dht_record(desc.key)
print(f' {n}')
# read dht records on server 1
print(f'reading {COUNT} records')
n=0
for desc0 in records:
desc1 = await rc1.open_dht_record(desc0.key)
vd1 = await rc1.get_dht_value(desc1.key, 0)
assert vd1.data == TEST_DATA
await rc1.close_dht_record(desc1.key)
print(f' {n}')
n+=1

View File

@ -69,7 +69,7 @@ class RoutingContext(ABC):
@abstractmethod @abstractmethod
async def get_dht_value( async def get_dht_value(
self, key: types.TypedKey, subkey: types.ValueSubkey, force_refresh: bool self, key: types.TypedKey, subkey: types.ValueSubkey, force_refresh: bool = False
) -> Optional[types.ValueData]: ) -> Optional[types.ValueData]:
pass pass
@ -84,8 +84,8 @@ class RoutingContext(ABC):
self, self,
key: types.TypedKey, key: types.TypedKey,
subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]], subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]],
expiration: types.Timestamp, expiration: types.Timestamp = 0,
count: int, count: int = 0xFFFFFFFF,
) -> types.Timestamp: ) -> types.Timestamp:
pass pass
@ -102,7 +102,7 @@ class RoutingContext(ABC):
self, self,
key: types.TypedKey, key: types.TypedKey,
subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]], subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]],
scope: types.DHTReportScope, scope: types.DHTReportScope = types.DHTReportScope.LOCAL,
) -> types.DHTRecordReport: ) -> types.DHTRecordReport:
pass pass

View File

@ -619,7 +619,7 @@ class _JsonRoutingContext(RoutingContext):
) )
async def get_dht_value( async def get_dht_value(
self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool = False
) -> Optional[ValueData]: ) -> Optional[ValueData]:
ret = raise_api_result( ret = raise_api_result(
await self.api.send_ndjson_request( await self.api.send_ndjson_request(
@ -655,8 +655,8 @@ class _JsonRoutingContext(RoutingContext):
self, self,
key: TypedKey, key: TypedKey,
subkeys: list[tuple[ValueSubkey, ValueSubkey]], subkeys: list[tuple[ValueSubkey, ValueSubkey]],
expiration: Timestamp, expiration: Timestamp = 0,
count: int, count: int = 0xFFFFFFFF,
) -> Timestamp: ) -> Timestamp:
return Timestamp( return Timestamp(
raise_api_result( raise_api_result(
@ -691,7 +691,7 @@ class _JsonRoutingContext(RoutingContext):
self, self,
key: TypedKey, key: TypedKey,
subkeys: list[tuple[ValueSubkey, ValueSubkey]], subkeys: list[tuple[ValueSubkey, ValueSubkey]],
scope: DHTReportScope, scope: DHTReportScope = DHTReportScope.LOCAL,
) -> DHTRecordReport: ) -> DHTRecordReport:
return DHTRecordReport.from_json( return DHTRecordReport.from_json(
raise_api_result( raise_api_result(