From 5de4d801514c545d90ed270d5e8821acd53ffd6b Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 12 Apr 2025 13:01:51 -0400 Subject: [PATCH] fix fanout --- veilid-core/src/rpc_processor/fanout/fanout_call.rs | 2 +- veilid-core/src/rpc_processor/fanout/fanout_queue.rs | 5 ++++- veilid-core/src/veilid_api/debug.rs | 2 +- veilid-python/tests/test_dht.py | 4 ++-- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/veilid-core/src/rpc_processor/fanout/fanout_call.rs b/veilid-core/src/rpc_processor/fanout/fanout_call.rs index b242d5e0..069a464b 100644 --- a/veilid-core/src/rpc_processor/fanout/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout/fanout_call.rs @@ -260,7 +260,7 @@ impl<'a> FanoutCall<'a> { loop { // Put in a work request { - let context_locked = context.lock(); + let mut context_locked = context.lock(); context_locked .fanout_queue .request_work(work_sender.clone()); diff --git a/veilid-core/src/rpc_processor/fanout/fanout_queue.rs b/veilid-core/src/rpc_processor/fanout/fanout_queue.rs index ee5628fd..d82b63d1 100644 --- a/veilid-core/src/rpc_processor/fanout/fanout_queue.rs +++ b/veilid-core/src/rpc_processor/fanout/fanout_queue.rs @@ -72,8 +72,11 @@ impl<'a> FanoutQueue<'a> { /// Ask for more work when some is ready /// When work is ready it will be sent to work_sender so it can be received /// by the worker - pub fn request_work(&self, work_sender: flume::Sender) { + pub fn request_work(&mut self, work_sender: flume::Sender) { let _ = self.sender.send(work_sender); + + // Send whatever work is available immediately + self.send_more_work(); } /// Add new nodes to a filtered and sorted list of fanout candidates diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index c6df3800..fdc46c63 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -2120,7 +2120,7 @@ RPC Operations: appreply [#id] - Reply to an 'App Call' RPC received by this node DHT Operations: - record list - display the dht records in the store + record list - display the dht records in the store purge [bytes] - clear all dht records optionally down to some total size create [ []] - create a new dht record open [+] [] - open an existing dht record diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index 493e8350..b62197cb 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -86,8 +86,8 @@ async def test_set_get_dht_value(api_connection: veilid.VeilidAPI): vd4 = await rc.get_dht_value(rec.key, ValueSubkey(1), False) assert vd4 is None - print("vd2: {}", vd2.__dict__) - print("vd3: {}", vd3.__dict__) + #print("vd2: {}", vd2.__dict__) + #print("vd3: {}", vd3.__dict__) assert vd2 == vd3