diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 1cfe69df..02187f24 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1114,8 +1114,8 @@ impl NetworkManager { connection_descriptor: ConnectionDescriptor, // the connection descriptor used reporting_peer: NodeRef, // the peer's noderef reporting the socket address ) { - // debug code - //info!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer); + #[cfg(feature = "verbose-tracing")] + debug!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer); // Ignore these reports if we are currently detecting public dial info let net = self.net(); diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index 49685b12..8774f5ad 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -31,7 +31,8 @@ impl NetworkManager { cm => (cm, target_node_ref.clone(), false), }; - info!( + #[cfg(feature = "verbose-tracing")] + debug!( "ContactMethod: {:?} for {:?}", contact_method, target_node_ref ); @@ -247,10 +248,12 @@ impl NetworkManager { // First try to send data to the last socket we've seen this peer on let data = if let Some(connection_descriptor) = node_ref.last_connection() { - info!( + #[cfg(feature = "verbose-tracing")] + debug!( "ExistingConnection: {:?} for {:?}", connection_descriptor, node_ref ); + match self .net() .send_data_to_existing_connection(connection_descriptor, data) diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index d2ffaa44..eea3f982 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1134,7 +1134,8 @@ impl RPCProcessor { let op_id = operation.op_id(); // Log rpc send - trace!(target: "rpc_message", dir = "send", kind = "question", op_id = op_id.as_u64(), desc = operation.kind().desc(), ?dest); + #[cfg(feature = "verbose-tracing")] + debug!(target: "rpc_message", dir = "send", kind = "question", op_id = op_id.as_u64(), desc = operation.kind().desc(), ?dest); // Produce rendered operation let RenderedOperation { @@ -1228,7 +1229,8 @@ impl RPCProcessor { let operation = RPCOperation::new_statement(statement, spi); // Log rpc send - info!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest); + #[cfg(feature = "verbose-tracing")] + debug!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest); // Produce rendered operation let RenderedOperation { @@ -1305,7 +1307,8 @@ impl RPCProcessor { let operation = RPCOperation::new_answer(&request.operation, answer, spi); // Log rpc send - trace!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest); + #[cfg(feature = "verbose-tracing")] + debug!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest); // Produce rendered operation let RenderedOperation { diff --git a/veilid-python/tests/conftest.py b/veilid-python/tests/conftest.py index 6cabbd26..4306ed8e 100644 --- a/veilid-python/tests/conftest.py +++ b/veilid-python/tests/conftest.py @@ -22,7 +22,7 @@ def server_info() -> tuple[str, int]: return hostname, 5959 -async def simple_update_callback(update: veilid.VeilidUpdate): +async def simple_update_callback(api: veilid.VeilidAPI, update: veilid.VeilidUpdate): print(f"VeilidUpdate: {update}") diff --git a/veilid-python/tests/test_routing_context.py b/veilid-python/tests/test_routing_context.py index 00f5e714..3dd868aa 100644 --- a/veilid-python/tests/test_routing_context.py +++ b/veilid-python/tests/test_routing_context.py @@ -2,6 +2,7 @@ import asyncio import random +import sys import pytest import veilid @@ -29,7 +30,7 @@ async def test_routing_context_app_message_loopback(): # Seriously, mypy? app_message_queue: asyncio.Queue = asyncio.Queue() - async def app_message_queue_update_callback(update: veilid.VeilidUpdate): + async def app_message_queue_update_callback(api: veilid.VeilidAPI, update: veilid.VeilidUpdate): if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: await app_message_queue.put(update) @@ -68,7 +69,7 @@ async def test_routing_context_app_message_loopback(): async def test_routing_context_app_call_loopback(): app_call_queue: asyncio.Queue = asyncio.Queue() - async def app_call_queue_update_callback(update: veilid.VeilidUpdate): + async def app_call_queue_update_callback(api: veilid.VeilidAPI, update: veilid.VeilidUpdate): if update.kind == veilid.VeilidUpdateKind.APP_CALL: await app_call_queue.put(update) @@ -119,11 +120,11 @@ async def test_routing_context_app_message_loopback_big_packets(): global got_message got_message = 0 - async def app_message_queue_update_callback(update: veilid.VeilidUpdate): + async def app_message_queue_update_callback(api: veilid.VeilidAPI, update: veilid.VeilidUpdate): if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: global got_message got_message += 1 - print("got {}".format(got_message)) + sys.stdout.write("{} ".format(got_message)) await app_message_queue.put(update) sent_messages: set[bytes] = set() @@ -165,3 +166,45 @@ async def test_routing_context_app_message_loopback_big_packets(): assert isinstance(update.detail, veilid.VeilidAppMessage) assert update.detail.message in sent_messages + +@pytest.mark.asyncio +async def test_routing_context_app_call_loopback_big_packets(): + + print("") + + global got_message + got_message = 0 + async def app_message_queue_update_callback(api: veilid.VeilidAPI, update: veilid.VeilidUpdate): + if update.kind == veilid.VeilidUpdateKind.APP_CALL: + global got_message + got_message += 1 + sys.stdout.write("{} ".format(got_message)) + sys.stdout.flush() + await api.app_call_reply(update.detail.call_id, update.detail.message) + + hostname, port = server_info() + api = await veilid.json_api_connect( + hostname, port, app_message_queue_update_callback + ) + async with api: + # purge routes to ensure we start fresh + await api.debug("purge routes") + + # make a routing context that uses a safety route + rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) + async with rc: + + # make a new local private route + prl, blob = await api.new_private_route() + + # import it as a remote route as well so we can send to it + prr = await api.import_remote_private_route(blob) + + # do this test 100 times + for _ in range(100): + + # send a random sized random app message to our own private route + message = random.randbytes(random.randint(0, 32768)) + out_message = await rc.app_call(prr, message) + + assert message == out_message diff --git a/veilid-python/veilid/json_api.py b/veilid-python/veilid/json_api.py index 6881395a..710ff125 100644 --- a/veilid-python/veilid/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -51,7 +51,7 @@ _VALIDATOR_RECV_MESSAGE = _get_schema_validator( class _JsonVeilidAPI(VeilidAPI): reader: Optional[asyncio.StreamReader] writer: Optional[asyncio.StreamWriter] - update_callback: Callable[[VeilidUpdate], Awaitable] + update_callback: Callable[[VeilidAPI, VeilidUpdate], Awaitable] handle_recv_messages_task: Optional[asyncio.Task] validate_schema: bool done: bool @@ -64,7 +64,7 @@ class _JsonVeilidAPI(VeilidAPI): self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, - update_callback: Callable[[VeilidUpdate], Awaitable], + update_callback: Callable[[VeilidAPI, VeilidUpdate], Awaitable], validate_schema: bool = True, ): self.reader = reader @@ -115,7 +115,7 @@ class _JsonVeilidAPI(VeilidAPI): @classmethod async def connect( - cls, host: str, port: int, update_callback: Callable[[VeilidUpdate], Awaitable] + cls, host: str, port: int, update_callback: Callable[[VeilidAPI, VeilidUpdate], Awaitable] ) -> Self: reader, writer = await asyncio.open_connection(host, port) veilid_api = cls(reader, writer, update_callback) @@ -155,7 +155,7 @@ class _JsonVeilidAPI(VeilidAPI): if j["type"] == "Response": await self.handle_recv_message_response(j) elif j["type"] == "Update": - await self.update_callback(VeilidUpdate.from_json(j)) + await self.update_callback(self, VeilidUpdate.from_json(j)) finally: await self._cleanup_close() @@ -1162,6 +1162,6 @@ class _JsonCryptoSystem(CryptoSystem): async def json_api_connect( - host: str, port: int, update_callback: Callable[[VeilidUpdate], Awaitable] + host: str, port: int, update_callback: Callable[[VeilidAPI, VeilidUpdate], Awaitable] ) -> _JsonVeilidAPI: return await _JsonVeilidAPI.connect(host, port, update_callback)