From d6f442d431b28032d621a0ca93871617ae6083f3 Mon Sep 17 00:00:00 2001 From: John Smith Date: Thu, 15 Jun 2023 20:22:54 -0400 Subject: [PATCH] better error handling --- veilid-core/src/network_manager/mod.rs | 39 ++++--- veilid-core/src/routing_table/mod.rs | 21 ++-- veilid-core/src/routing_table/node_ref.rs | 33 +++--- veilid-core/src/routing_table/privacy.rs | 18 ++- .../route_spec_store/route_spec_store.rs | 25 +++-- .../route_spec_store_content.rs | 2 +- .../src/routing_table/routing_table_inner.rs | 105 +++++++++--------- .../src/routing_table/tasks/bootstrap.rs | 104 +++++++++-------- .../routing_table/tasks/relay_management.rs | 13 ++- veilid-core/src/rpc_processor/destination.rs | 7 +- veilid-core/src/rpc_processor/mod.rs | 19 +++- veilid-core/src/veilid_api/debug.rs | 4 +- veilid-python/pyproject.toml | 2 +- veilid-python/tests/__init__.py | 12 +- veilid-python/tests/test_basic.py | 13 ++- veilid-python/tests/test_crypto.py | 34 ++++-- veilid-python/tests/test_routing_context.py | 47 ++++++++ veilid-python/update_schema.sh | 4 +- .../{veilid_python => veilid}/__init__.py | 0 .../{veilid_python => veilid}/api.py | 0 .../{veilid_python => veilid}/config.py | 12 +- .../{veilid_python => veilid}/error.py | 0 .../{veilid_python => veilid}/json_api.py | 36 +++--- .../{veilid_python => veilid}/operations.py | 0 .../schema/RecvMessage.json | 0 .../schema/Request.json | 0 .../{veilid_python => veilid}/state.py | 15 +-- .../{veilid_python => veilid}/types.py | 7 +- veilid-server/src/client_api.rs | 30 +++-- 29 files changed, 368 insertions(+), 234 deletions(-) create mode 100644 veilid-python/tests/test_routing_context.py rename veilid-python/{veilid_python => veilid}/__init__.py (100%) rename veilid-python/{veilid_python => veilid}/api.py (100%) rename veilid-python/{veilid_python => veilid}/config.py (98%) rename veilid-python/{veilid_python => veilid}/error.py (100%) rename veilid-python/{veilid_python => veilid}/json_api.py (95%) rename veilid-python/{veilid_python => veilid}/operations.py (100%) rename veilid-python/{veilid_python => veilid}/schema/RecvMessage.json (100%) rename veilid-python/{veilid_python => veilid}/schema/Request.json (100%) rename veilid-python/{veilid_python => veilid}/state.py (96%) rename veilid-python/{veilid_python => veilid}/types.py (97%) diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index d11f9b73..e7abd622 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -685,12 +685,12 @@ impl NetworkManager { peer_info, false, ) { - None => { + Ok(nr) => nr, + Err(e) => { return Ok(NetworkResult::invalid_message( - "unable to register reverse connect peerinfo", - )) + format!("unable to register reverse connect peerinfo: {}", e) + )); } - Some(nr) => nr, }; // Make a reverse connection to the peer and send the receipt to it @@ -708,13 +708,12 @@ impl NetworkManager { peer_info, false, ) { - None => { + Ok(nr) => nr, + Err(e) => { return Ok(NetworkResult::invalid_message( - //sender_id, - "unable to register hole punch connect peerinfo", + format!("unable to register hole punch connect peerinfo: {}", e) )); } - Some(nr) => nr, }; // Get the udp direct dialinfo for the hole punch @@ -1103,7 +1102,7 @@ impl NetworkManager { ContactMethod::Direct(di) => NodeContactMethod::Direct(di), ContactMethod::SignalReverse(relay_key, target_key) => { let relay_nr = routing_table - .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter) + .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .ok_or_else(|| eyre!("couldn't look up relay"))?; if !target_node_ref.node_ids().contains(&target_key) { bail!("target noderef didn't match target key"); @@ -1112,7 +1111,7 @@ impl NetworkManager { } ContactMethod::SignalHolePunch(relay_key, target_key) => { let relay_nr = routing_table - .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter) + .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .ok_or_else(|| eyre!("couldn't look up relay"))?; if target_node_ref.node_ids().contains(&target_key) { bail!("target noderef didn't match target key"); @@ -1121,13 +1120,13 @@ impl NetworkManager { } ContactMethod::InboundRelay(relay_key) => { let relay_nr = routing_table - .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter) + .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .ok_or_else(|| eyre!("couldn't look up relay"))?; NodeContactMethod::InboundRelay(relay_nr) } ContactMethod::OutboundRelay(relay_key) => { let relay_nr = routing_table - .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter) + .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .ok_or_else(|| eyre!("couldn't look up relay"))?; NodeContactMethod::OutboundRelay(relay_nr) } @@ -1430,7 +1429,13 @@ impl NetworkManager { // We should, because relays are chosen by nodes that have established connectivity and // should be mutually in each others routing tables. The node needing the relay will be // pinging this node regularly to keep itself in the routing table - routing_table.lookup_node_ref(recipient_id) + match routing_table.lookup_node_ref(recipient_id) { + Ok(v) => v, + Err(e) => { + log_net!(debug "failed to look up recipient node for relay, dropping outbound relayed packet: {}" ,e); + return Ok(false); + } + } }; if let Some(relay_nr) = some_relay_nr { @@ -1472,12 +1477,12 @@ impl NetworkManager { connection_descriptor, ts, ) { - None => { + Ok(v) => v, + Err(e) => { // If the node couldn't be registered just skip this envelope, - // the error will have already been logged + log_net!(debug "failed to register node with existing connection: {}", e); return Ok(false); } - Some(v) => v, }; source_noderef.add_envelope_version(envelope.get_version()); @@ -1574,7 +1579,7 @@ impl NetworkManager { peers: { let mut out = Vec::new(); for (k, v) in routing_table.get_recent_peers() { - if let Some(nr) = routing_table.lookup_node_ref(k) { + if let Ok(Some(nr)) = routing_table.lookup_node_ref(k) { let peer_stats = nr.peer_stats(); let peer = PeerTableData { node_ids: nr.node_ids().iter().copied().collect(), diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 1d63fcb5..2223d189 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -628,14 +628,14 @@ impl RoutingTable { } /// Resolve an existing routing table entry using any crypto kind and return a reference to it - pub fn lookup_any_node_ref(&self, node_id_key: PublicKey) -> Option { + pub fn lookup_any_node_ref(&self, node_id_key: PublicKey) -> EyreResult> { self.inner .read() .lookup_any_node_ref(self.clone(), node_id_key) } /// Resolve an existing routing table entry and return a reference to it - pub fn lookup_node_ref(&self, node_id: TypedKey) -> Option { + pub fn lookup_node_ref(&self, node_id: TypedKey) -> EyreResult> { self.inner.read().lookup_node_ref(self.clone(), node_id) } @@ -645,7 +645,7 @@ impl RoutingTable { node_id: TypedKey, routing_domain_set: RoutingDomainSet, dial_info_filter: DialInfoFilter, - ) -> Option { + ) -> EyreResult> { self.inner.read().lookup_and_filter_noderef( self.clone(), node_id, @@ -662,7 +662,7 @@ impl RoutingTable { routing_domain: RoutingDomain, peer_info: PeerInfo, allow_invalid: bool, - ) -> Option { + ) -> EyreResult { self.inner.write().register_node_with_peer_info( self.clone(), routing_domain, @@ -678,7 +678,7 @@ impl RoutingTable { node_id: TypedKey, descriptor: ConnectionDescriptor, timestamp: Timestamp, - ) -> Option { + ) -> EyreResult { self.inner.write().register_node_with_existing_connection( self.clone(), node_id, @@ -711,7 +711,7 @@ impl RoutingTable { // (uses same logic as send_data, ensuring last_connection works for UDP) for e in &recent_peers { let mut dead = true; - if let Some(nr) = self.lookup_node_ref(*e) { + if let Ok(Some(nr)) = self.lookup_node_ref(*e) { if let Some(last_connection) = nr.last_connection() { out.push((*e, RecentPeersEntry { last_connection })); dead = false; @@ -1017,10 +1017,11 @@ impl RoutingTable { } // Register the node if it's new - if let Some(nr) = - self.register_node_with_peer_info(RoutingDomain::PublicInternet, p, false) - { - out.push(nr); + match self.register_node_with_peer_info(RoutingDomain::PublicInternet, p, false) { + Ok(nr) => out.push(nr), + Err(e) => { + log_rtab!(debug "failed to register node with peer info from find node answer: {}", e); + } } } out diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 1f637801..5cb53c64 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -192,25 +192,24 @@ pub trait NodeRefBase: Sized { } dif } - fn relay(&self, routing_domain: RoutingDomain) -> Option { + fn relay(&self, routing_domain: RoutingDomain) -> EyreResult> { self.operate_mut(|rti, e| { - e.signed_node_info(routing_domain) - .and_then(|n| n.relay_peer_info()) - .and_then(|rpi| { - // If relay is ourselves, then return None, because we can't relay through ourselves - // and to contact this node we should have had an existing inbound connection - if rti.unlocked_inner.matches_own_node_id(rpi.node_ids()) { - return None; - } + let Some(sni) = e.signed_node_info(routing_domain) else { + return Ok(None); + }; + let Some(rpi) = sni.relay_peer_info() else { + return Ok(None); + }; + // If relay is ourselves, then return None, because we can't relay through ourselves + // and to contact this node we should have had an existing inbound connection + if rti.unlocked_inner.matches_own_node_id(rpi.node_ids()) { + bail!("Can't relay though ourselves"); + } - // Register relay node and return noderef - rti.register_node_with_peer_info( - self.routing_table(), - routing_domain, - rpi, - false, - ) - }) + // Register relay node and return noderef + let nr = + rti.register_node_with_peer_info(self.routing_table(), routing_domain, rpi, false)?; + Ok(Some(nr)) }) } diff --git a/veilid-core/src/routing_table/privacy.rs b/veilid-core/src/routing_table/privacy.rs index fc670375..66f08c57 100644 --- a/veilid-core/src/routing_table/privacy.rs +++ b/veilid-core/src/routing_table/privacy.rs @@ -37,15 +37,27 @@ impl RouteNode { match self { RouteNode::NodeId(id) => { // - routing_table.lookup_node_ref(TypedKey::new(crypto_kind, *id)) + match routing_table.lookup_node_ref(TypedKey::new(crypto_kind, *id)) { + Ok(nr) => nr, + Err(e) => { + log_rtab!(debug "failed to look up route node: {}", e); + return None; + } + } } RouteNode::PeerInfo(pi) => { // - routing_table.register_node_with_peer_info( + match routing_table.register_node_with_peer_info( RoutingDomain::PublicInternet, pi.clone(), false, - ) + ) { + Ok(nr) => Some(nr), + Err(e) => { + log_rtab!(debug "failed to register route node: {}", e); + return None; + } + } } } } diff --git a/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs index 3f29ba0d..043cd1e3 100644 --- a/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs @@ -378,7 +378,14 @@ impl RouteSpecStore { // Already seen this node, should not be in the route twice return None; } - if let Some(relay) = node.locked_mut(rti).relay(RoutingDomain::PublicInternet) { + let opt_relay = match node.locked_mut(rti).relay(RoutingDomain::PublicInternet) { + Ok(r) => r, + Err(e) => { + log_rtab!(error "failed to get relay for route node: {}", e); + return None; + } + }; + if let Some(relay) = opt_relay { let relay_id = relay.locked(rti).best_node_id(); if !seen_nodes.insert(relay_id) { // Already seen this node, should not be in the route twice @@ -869,13 +876,15 @@ impl RouteSpecStore { }; let opt_first_hop = match pr_first_hop_node { - RouteNode::NodeId(id) => rti.lookup_node_ref(routing_table.clone(), TypedKey::new(crypto_kind, id)), - RouteNode::PeerInfo(pi) => rti.register_node_with_peer_info( - routing_table.clone(), - RoutingDomain::PublicInternet, - pi, - false, - ), + RouteNode::NodeId(id) => rti.lookup_node_ref(routing_table.clone(), TypedKey::new(crypto_kind, id))?, + RouteNode::PeerInfo(pi) => { + Some(rti.register_node_with_peer_info( + routing_table.clone(), + RoutingDomain::PublicInternet, + pi, + false, + )?) + } }; if opt_first_hop.is_none() { // Can't reach this private route any more diff --git a/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs b/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs index b193c398..9c5fbafe 100644 --- a/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs +++ b/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs @@ -40,7 +40,7 @@ impl RouteSpecStoreContent { // Go through best route and resolve noderefs let mut hop_node_refs = Vec::with_capacity(rsd.hops.len()); for h in &rsd.hops { - let Some(nr) = routing_table.lookup_node_ref(TypedKey::new(rsd.crypto_kind, *h)) else { + let Ok(Some(nr)) = routing_table.lookup_node_ref(TypedKey::new(rsd.crypto_kind, *h)) else { dead_ids.push(rsid.clone()); break; }; diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index eea23a83..e7b8cd05 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -651,14 +651,13 @@ impl RoutingTableInner { outer_self: RoutingTable, node_ids: &TypedKeySet, update_func: F, - ) -> Option + ) -> EyreResult where F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner), { // Ensure someone isn't trying register this node itself if self.unlocked_inner.matches_own_node_id(node_ids) { - log_rtab!(debug "can't register own node"); - return None; + bail!("can't register own node"); } // Look up all bucket entries and make sure we only have zero or one @@ -688,8 +687,7 @@ impl RoutingTableInner { if let Some(best_entry) = best_entry { // Update the entry with all of the node ids if let Err(e) = self.update_bucket_entries(best_entry.clone(), node_ids) { - log_rtab!(debug "Not registering new ids for existing node: {}", e); - return None; + bail!("Not registering new ids for existing node: {}", e); } // Make a noderef to return @@ -699,7 +697,7 @@ impl RoutingTableInner { best_entry.with_mut_inner(|e| update_func(self, e)); // Return the noderef - return Some(nr); + return Ok(nr); } // If no entry exists yet, add the first entry to a bucket, possibly evicting a bucket member @@ -712,8 +710,7 @@ impl RoutingTableInner { // Update the other bucket entries with the remaining node ids if let Err(e) = self.update_bucket_entries(new_entry.clone(), node_ids) { - log_rtab!(debug "Not registering new node: {}", e); - return None; + bail!("Not registering new node: {}", e); } // Make node ref to return @@ -725,7 +722,7 @@ impl RoutingTableInner { // Kick the bucket log_rtab!(debug "Routing table now has {} nodes, {} live", self.bucket_entry_count(), self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable, &VALID_CRYPTO_KINDS)); - Some(nr) + Ok(nr) } /// Resolve an existing routing table entry using any crypto kind and return a reference to it @@ -733,28 +730,35 @@ impl RoutingTableInner { &self, outer_self: RoutingTable, node_id_key: PublicKey, - ) -> Option { - VALID_CRYPTO_KINDS.iter().find_map(|ck| { - self.lookup_node_ref(outer_self.clone(), TypedKey::new(*ck, node_id_key)) - }) + ) -> EyreResult> { + for ck in VALID_CRYPTO_KINDS { + if let Some(nr) = + self.lookup_node_ref(outer_self.clone(), TypedKey::new(ck, node_id_key))? + { + return Ok(Some(nr)); + } + } + Ok(None) } /// Resolve an existing routing table entry and return a reference to it - pub fn lookup_node_ref(&self, outer_self: RoutingTable, node_id: TypedKey) -> Option { + pub fn lookup_node_ref( + &self, + outer_self: RoutingTable, + node_id: TypedKey, + ) -> EyreResult> { if self.unlocked_inner.matches_own_node_id(&[node_id]) { - log_rtab!(error "can't look up own node id in routing table"); - return None; + bail!("can't look up own node id in routing table"); } if !VALID_CRYPTO_KINDS.contains(&node_id.kind) { - log_rtab!(error "can't look up node id with invalid crypto kind"); - return None; + bail!("can't look up node id with invalid crypto kind"); } let bucket_index = self.unlocked_inner.calculate_bucket_index(&node_id); let bucket = self.get_bucket(bucket_index); - bucket + Ok(bucket .entry(&node_id.value) - .map(|e| NodeRef::new(outer_self, e, None)) + .map(|e| NodeRef::new(outer_self, e, None))) } /// Resolve an existing routing table entry and return a filtered reference to it @@ -764,15 +768,15 @@ impl RoutingTableInner { node_id: TypedKey, routing_domain_set: RoutingDomainSet, dial_info_filter: DialInfoFilter, - ) -> Option { + ) -> EyreResult> { let nr = self.lookup_node_ref(outer_self, node_id)?; - Some( + Ok(nr.map(|nr| { nr.filtered_clone( NodeRefFilter::new() .with_dial_info_filter(dial_info_filter) .with_routing_domain_set(routing_domain_set), - ), - ) + ) + })) } /// Resolve an existing routing table entry and call a function on its entry without using a noderef @@ -802,50 +806,53 @@ impl RoutingTableInner { routing_domain: RoutingDomain, peer_info: PeerInfo, allow_invalid: bool, - ) -> Option { + ) -> EyreResult { // if our own node is in the list, then ignore it as we don't add ourselves to our own routing table if self .unlocked_inner .matches_own_node_id(peer_info.node_ids()) { - log_rtab!(debug "can't register own node id in routing table"); - return None; + bail!("can't register own node id in routing table"); } // node can not be its own relay let rids = peer_info.signed_node_info().relay_ids(); let nids = peer_info.node_ids(); if nids.contains_any(&rids) { - log_rtab!(debug "node can not be its own relay"); - return None; + bail!("node can not be its own relay"); } if !allow_invalid { // verify signature if !peer_info.signed_node_info().has_any_signature() { - log_rtab!(debug "signed node info for {:?} has no valid signature", peer_info.node_ids()); - return None; + bail!( + "signed node info for {:?} has no valid signature", + peer_info.node_ids() + ); } // verify signed node info is valid in this routing domain if !self.signed_node_info_is_valid_in_routing_domain( routing_domain, peer_info.signed_node_info(), ) { - log_rtab!(debug "signed node info for {:?} not valid in the {:?} routing domain", peer_info.node_ids(), routing_domain); - return None; + bail!( + "signed node info for {:?} not valid in the {:?} routing domain", + peer_info.node_ids(), + routing_domain + ); } } let (node_ids, signed_node_info) = peer_info.destructure(); - self.create_node_ref(outer_self, &node_ids, |_rti, e| { + let mut nr = self.create_node_ref(outer_self, &node_ids, |_rti, e| { e.update_signed_node_info(routing_domain, signed_node_info); - }) - .map(|mut nr| { - nr.set_filter(Some( - NodeRefFilter::new().with_routing_domain(routing_domain), - )); - nr - }) + })?; + + nr.set_filter(Some( + NodeRefFilter::new().with_routing_domain(routing_domain), + )); + + Ok(nr) } /// Shortcut function to add a node to our routing table if it doesn't exist @@ -856,17 +863,15 @@ impl RoutingTableInner { node_id: TypedKey, descriptor: ConnectionDescriptor, timestamp: Timestamp, - ) -> Option { - let out = self.create_node_ref(outer_self, &TypedKeySet::from(node_id), |_rti, e| { + ) -> EyreResult { + let nr = self.create_node_ref(outer_self, &TypedKeySet::from(node_id), |_rti, e| { // this node is live because it literally just connected to us e.touch_last_seen(timestamp); - }); - if let Some(nr) = &out { - // set the most recent node address for connection finding and udp replies - nr.locked_mut(self) - .set_last_connection(descriptor, timestamp); - } - out + })?; + // set the most recent node address for connection finding and udp replies + nr.locked_mut(self) + .set_last_connection(descriptor, timestamp); + Ok(nr) } ////////////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs index 25a51416..f811e4aa 100644 --- a/veilid-core/src/routing_table/tasks/bootstrap.rs +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -259,19 +259,27 @@ impl RoutingTable { // Got peer info, let's add it to the routing table for pi in peer_info { // Register the node - if let Some(nr) = - self.register_node_with_peer_info(RoutingDomain::PublicInternet, pi, false) - { - // Add this our futures to process in parallel - for crypto_kind in VALID_CRYPTO_KINDS { - let routing_table = self.clone(); - let nr = nr.clone(); - unord.push( - // lets ask bootstrap to find ourselves now - async move { routing_table.reverse_find_node(crypto_kind, nr, true).await } - .instrument(Span::current()), - ); + let nr = match self.register_node_with_peer_info( + RoutingDomain::PublicInternet, + pi, + false, + ) { + Ok(nr) => nr, + Err(e) => { + log_rtab!(error "failed to register direct bootstrap peer info: {}", e); + continue; } + }; + + // Add this our futures to process in parallel + for crypto_kind in VALID_CRYPTO_KINDS { + let routing_table = self.clone(); + let nr = nr.clone(); + unord.push( + // lets ask bootstrap to find ourselves now + async move { routing_table.reverse_find_node(crypto_kind, nr, true).await } + .instrument(Span::current()), + ); } } } @@ -341,44 +349,46 @@ impl RoutingTable { let pi = PeerInfo::new(bsrec.node_ids, sni); - if let Some(nr) = - self.register_node_with_peer_info(RoutingDomain::PublicInternet, pi, true) - { - // Add this our futures to process in parallel - for crypto_kind in VALID_CRYPTO_KINDS { - // Do we need to bootstrap this crypto kind? - let eckey = (RoutingDomain::PublicInternet, crypto_kind); - let cnt = entry_count.get(&eckey).copied().unwrap_or_default(); - if cnt != 0 { + let nr = + match self.register_node_with_peer_info(RoutingDomain::PublicInternet, pi, true) { + Ok(nr) => nr, + Err(e) => { + log_rtab!(error "failed to register bootstrap peer info: {}", e); continue; } - - // Bootstrap this crypto kind - let nr = nr.clone(); - let routing_table = self.clone(); - unord.push( - async move { - // Need VALID signed peer info, so ask bootstrap to find_node of itself - // which will ensure it has the bootstrap's signed peer info as part of the response - let _ = routing_table.find_target(crypto_kind, nr.clone()).await; - - // Ensure we got the signed peer info - if !nr - .signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) - { - log_rtab!(warn - "bootstrap at {:?} did not return valid signed node info", - nr - ); - // If this node info is invalid, it will time out after being unpingable - } else { - // otherwise this bootstrap is valid, lets ask it to find ourselves now - routing_table.reverse_find_node(crypto_kind, nr, true).await - } - } - .instrument(Span::current()), - ); + }; + // Add this our futures to process in parallel + for crypto_kind in VALID_CRYPTO_KINDS { + // Do we need to bootstrap this crypto kind? + let eckey = (RoutingDomain::PublicInternet, crypto_kind); + let cnt = entry_count.get(&eckey).copied().unwrap_or_default(); + if cnt != 0 { + continue; } + + // Bootstrap this crypto kind + let nr = nr.clone(); + let routing_table = self.clone(); + unord.push( + async move { + // Need VALID signed peer info, so ask bootstrap to find_node of itself + // which will ensure it has the bootstrap's signed peer info as part of the response + let _ = routing_table.find_target(crypto_kind, nr.clone()).await; + + // Ensure we got the signed peer info + if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { + log_rtab!(warn + "bootstrap at {:?} did not return valid signed node info", + nr + ); + // If this node info is invalid, it will time out after being unpingable + } else { + // otherwise this bootstrap is valid, lets ask it to find ourselves now + routing_table.reverse_find_node(crypto_kind, nr, true).await + } + } + .instrument(Span::current()), + ); } } diff --git a/veilid-core/src/routing_table/tasks/relay_management.rs b/veilid-core/src/routing_table/tasks/relay_management.rs index 3bc93145..8d980c04 100644 --- a/veilid-core/src/routing_table/tasks/relay_management.rs +++ b/veilid-core/src/routing_table/tasks/relay_management.rs @@ -51,14 +51,19 @@ impl RoutingTable { // The outbound relay is the host of the PWA if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await { // Register new outbound relay - if let Some(nr) = self.register_node_with_peer_info( + match self.register_node_with_peer_info( RoutingDomain::PublicInternet, outbound_relay_peerinfo, false, ) { - info!("Outbound relay node selected: {}", nr); - editor.set_relay_node(nr); - got_outbound_relay = true; + Ok(nr) => { + log_rtab!("Outbound relay node selected: {}", nr); + editor.set_relay_node(nr); + got_outbound_relay = true; + } + Err(e) => { + log_rtab!(error "failed to register node with peer info: {}", e); + } } } } diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index ae19a4b0..e30052c8 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -312,7 +312,12 @@ impl RPCProcessor { NetworkResult::value(Destination::direct(peer_noderef)) } else { // Look up the sender node, we should have added it via senderNodeInfo before getting here. - if let Some(sender_noderef) = self.routing_table.lookup_node_ref(sender_node_id) { + let res = match self.routing_table.lookup_node_ref(sender_node_id) { + Ok(v) => v, + Err(e) => return NetworkResult::invalid_message( + format!("failed to look up node info for respond to: {}", e) + )}; + if let Some(sender_noderef) = res { NetworkResult::value(Destination::relay(peer_noderef, sender_noderef)) } else { return NetworkResult::invalid_message( diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index daf8ff80..805b62e4 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -480,7 +480,10 @@ impl RPCProcessor { let routing_table = this.routing_table(); // First see if we have the node in our routing table already - if let Some(nr) = routing_table.lookup_node_ref(node_id) { + if let Some(nr) = routing_table + .lookup_node_ref(node_id) + .map_err(RPCError::internal)? + { // ensure we have some dial info for the entry already, // if not, we should do the find_node anyway if nr.has_any_dial_info() { @@ -1346,20 +1349,26 @@ impl RPCProcessor { // Sender PeerInfo was specified, update our routing table with it if !self.filter_node_info(routing_domain, sender_peer_info.signed_node_info()) { - return Err(RPCError::invalid_format( + return Ok(NetworkResult::invalid_message( "sender peerinfo has invalid peer scope", )); } - opt_sender_nr = self.routing_table().register_node_with_peer_info( + opt_sender_nr = match self.routing_table().register_node_with_peer_info( routing_domain, sender_peer_info.clone(), false, - ); + ) { + Ok(v) => Some(v), + Err(e) => return Ok(NetworkResult::invalid_message(e)), + } } // look up sender node, in case it's different than our peer due to relaying if opt_sender_nr.is_none() { - opt_sender_nr = self.routing_table().lookup_node_ref(sender_node_id) + opt_sender_nr = match self.routing_table().lookup_node_ref(sender_node_id) { + Ok(v) => v, + Err(e) => return Ok(NetworkResult::invalid_message(e)), + } } // Update the 'seen our node info' timestamp to determine if this node needs a diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 7151b680..7c95e0c9 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -217,9 +217,9 @@ fn get_node_ref(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option"] readme = "README.md" -packages = [{include = "veilid_python"}] +packages = [{include = "veilid"}] [tool.poetry.dependencies] python = "^3.11" diff --git a/veilid-python/tests/__init__.py b/veilid-python/tests/__init__.py index e26f2d5f..7f5c42bb 100644 --- a/veilid-python/tests/__init__.py +++ b/veilid-python/tests/__init__.py @@ -1,7 +1,10 @@ +from typing import Callable, Awaitable +import os import pytest pytest_plugins = ('pytest_asyncio',) -import os +import veilid + ################################################################## VEILID_SERVER = os.getenv("VEILID_SERVER") @@ -18,5 +21,10 @@ else: ################################################################## -async def simple_update_callback(update): +async def simple_connect_and_run(func: Callable[[veilid.VeilidAPI], Awaitable]): + api = await veilid.json_api_connect(VEILID_SERVER, VEILID_SERVER_PORT, simple_update_callback) + async with api: + await func(api) + +async def simple_update_callback(update: veilid.VeilidUpdate): print("VeilidUpdate: {}".format(update)) diff --git a/veilid-python/tests/test_basic.py b/veilid-python/tests/test_basic.py index d74b25b2..6b083390 100644 --- a/veilid-python/tests/test_basic.py +++ b/veilid-python/tests/test_basic.py @@ -1,6 +1,6 @@ -# Basic veilid_python tests +# Basic veilid tests -import veilid_python +import veilid import pytest from . import * @@ -8,19 +8,22 @@ from . import * @pytest.mark.asyncio async def test_connect(): - async with await veilid_python.json_api_connect(VEILID_SERVER, VEILID_SERVER_PORT, simple_update_callback) as api: + async def func(api: veilid.VeilidAPI): pass + await simple_connect_and_run(func) @pytest.mark.asyncio async def test_fail_connect(): with pytest.raises(Exception): - async with await veilid_python.json_api_connect("fuahwelifuh32luhwafluehawea", 1, simple_update_callback) as api: + api = await veilid.json_api_connect("fuahwelifuh32luhwafluehawea", 1, simple_update_callback) + async with api: pass @pytest.mark.asyncio async def test_version(): - async with await veilid_python.json_api_connect(VEILID_SERVER, VEILID_SERVER_PORT, simple_update_callback) as api: + async def func(api: veilid.VeilidAPI): v = await api.veilid_version() print("veilid_version: {}".format(v.__dict__)) vstr = await api.veilid_version_string() print("veilid_version_string: {}".format(vstr)) + await simple_connect_and_run(func) diff --git a/veilid-python/tests/test_crypto.py b/veilid-python/tests/test_crypto.py index 6ceb993f..489beb20 100644 --- a/veilid-python/tests/test_crypto.py +++ b/veilid-python/tests/test_crypto.py @@ -1,6 +1,6 @@ -# Crypto veilid_python tests +# Crypto veilid tests -import veilid_python +import veilid import pytest from . import * @@ -8,21 +8,35 @@ from . import * @pytest.mark.asyncio async def test_best_crypto_system(): - async with await veilid_python.json_api_connect(VEILID_SERVER, VEILID_SERVER_PORT, simple_update_callback) as api: + async def func(api: veilid.VeilidAPI): bcs = await api.best_crypto_system() - # let handle dangle for test - # del bcs + await simple_connect_and_run(func) @pytest.mark.asyncio async def test_get_crypto_system(): - async with await veilid_python.json_api_connect(VEILID_SERVER, VEILID_SERVER_PORT, simple_update_callback) as api: - cs = await api.get_crypto_system(veilid_python.CryptoKind.CRYPTO_KIND_VLD0) + async def func(api: veilid.VeilidAPI): + cs = await api.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_VLD0) # clean up handle early del cs + await simple_connect_and_run(func) @pytest.mark.asyncio async def test_get_crypto_system_invalid(): - async with await veilid_python.json_api_connect(VEILID_SERVER, VEILID_SERVER_PORT, simple_update_callback) as api: - with pytest.raises(veilid_python.VeilidAPIError): - cs = await api.get_crypto_system(veilid_python.CryptoKind.CRYPTO_KIND_NONE) + async def func(api: veilid.VeilidAPI): + with pytest.raises(veilid.VeilidAPIError): + cs = await api.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_NONE) + await simple_connect_and_run(func) +@pytest.mark.asyncio +async def test_hash_and_verify_password(): + async def func(api: veilid.VeilidAPI): + bcs = await api.best_crypto_system() + nonce = await bcs.random_nonce() + salt = nonce.to_bytes() + # Password match + phash = await bcs.hash_password(b"abc123", salt) + assert await bcs.verify_password(b"abc123", phash) + # Password mismatch + phash2 = await bcs.hash_password(b"abc1234", salt) + assert not await bcs.verify_password(b"abc12345", phash) + await simple_connect_and_run(func) diff --git a/veilid-python/tests/test_routing_context.py b/veilid-python/tests/test_routing_context.py new file mode 100644 index 00000000..4b782ecb --- /dev/null +++ b/veilid-python/tests/test_routing_context.py @@ -0,0 +1,47 @@ +# Routing context veilid tests + +import veilid +import pytest +import asyncio +import json +from . import * + +################################################################## + +@pytest.mark.asyncio +async def test_routing_contexts(): + async def func(api: veilid.VeilidAPI): + rc = await api.new_routing_context() + rcp = await rc.with_privacy() + rcps = await rcp.with_sequencing(veilid.Sequencing.ENSURE_ORDERED) + rcpsr = await rcps.with_custom_privacy(veilid.Stability.RELIABLE) + await simple_connect_and_run(func) + +@pytest.mark.asyncio +async def test_routing_context_app_message_loopback(): + + app_message_queue = asyncio.Queue() + + async def app_message_queue_update_callback(update: veilid.VeilidUpdate): + if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: + await app_message_queue.put(update) + + api = await veilid.json_api_connect(VEILID_SERVER, VEILID_SERVER_PORT, app_message_queue_update_callback) + async with api: + + # make a routing context that uses a safety route + rc = await (await api.new_routing_context()).with_privacy() + + # get our own node id + state = await api.get_state() + node_id = state.config.config.network.routing_table.node_id.pop() + + # send an app message to our node id + message = b"abcd1234" + await rc.app_message(node_id, message) + + # we should get the same message back + #update: veilid.VeilidUpdate = await asyncio.wait_for(app_message_queue.get(), timeout=10) + #appmsg: veilid.VeilidAppMessage = update.detail + #assert appmsg.message == message + diff --git a/veilid-python/update_schema.sh b/veilid-python/update_schema.sh index ad2f59df..41f2366c 100755 --- a/veilid-python/update_schema.sh +++ b/veilid-python/update_schema.sh @@ -11,7 +11,7 @@ if [ ! -f "$VEILID_SERVER" ]; then fi # Produce schema from veilid-server -$VEILID_SERVER --emit-schema Request > $SCRIPTDIR/veilid_python/schema/Request.json -$VEILID_SERVER --emit-schema RecvMessage > $SCRIPTDIR/veilid_python/schema/RecvMessage.json +$VEILID_SERVER --emit-schema Request > $SCRIPTDIR/veilid/schema/Request.json +$VEILID_SERVER --emit-schema RecvMessage > $SCRIPTDIR/veilid/schema/RecvMessage.json diff --git a/veilid-python/veilid_python/__init__.py b/veilid-python/veilid/__init__.py similarity index 100% rename from veilid-python/veilid_python/__init__.py rename to veilid-python/veilid/__init__.py diff --git a/veilid-python/veilid_python/api.py b/veilid-python/veilid/api.py similarity index 100% rename from veilid-python/veilid_python/api.py rename to veilid-python/veilid/api.py diff --git a/veilid-python/veilid_python/config.py b/veilid-python/veilid/config.py similarity index 98% rename from veilid-python/veilid_python/config.py rename to veilid-python/veilid/config.py index d44bc11a..d9394c4e 100644 --- a/veilid-python/veilid_python/config.py +++ b/veilid-python/veilid/config.py @@ -2,6 +2,8 @@ from typing import Self, Optional from enum import StrEnum from json import dumps +from .types import * + class VeilidConfigLogLevel(StrEnum): OFF = 'Off' ERROR = 'Error' @@ -96,8 +98,8 @@ class VeilidConfigBlockStore: return self.__dict__ class VeilidConfigRoutingTable: - node_id: list[str] - node_id_secret: list[str] + node_id: list[TypedKey] + node_id_secret: list[TypedSecret] bootstrap: list[str] limit_over_attached: int limit_fully_attached: int @@ -105,7 +107,7 @@ class VeilidConfigRoutingTable: limit_attached_good: int limit_attached_weak: int - def __init__(self, node_id: list[str], node_id_secret: list[str], bootstrap: list[str], limit_over_attached: int, + def __init__(self, node_id: list[TypedKey], node_id_secret: list[TypedSecret], bootstrap: list[str], limit_over_attached: int, limit_fully_attached: int, limit_attached_strong: int, limit_attached_good: int, limit_attached_weak: int): self.node_id = node_id @@ -120,8 +122,8 @@ class VeilidConfigRoutingTable: @staticmethod def from_json(j: dict) -> Self: return VeilidConfigRoutingTable( - j['node_id'], - j['node_id_secret'], + list(map(lambda x: TypedKey(x), j['node_id'])), + list(map(lambda x: TypedSecret(x), j['node_id_secret'])), j['bootstrap'], j['limit_over_attached'], j['limit_fully_attached'], diff --git a/veilid-python/veilid_python/error.py b/veilid-python/veilid/error.py similarity index 100% rename from veilid-python/veilid_python/error.py rename to veilid-python/veilid/error.py diff --git a/veilid-python/veilid_python/json_api.py b/veilid-python/veilid/json_api.py similarity index 95% rename from veilid-python/veilid_python/json_api.py rename to veilid-python/veilid/json_api.py index 7ec5fff8..5b0d0fc3 100644 --- a/veilid-python/veilid_python/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -2,7 +2,7 @@ import json import asyncio from jsonschema import validators, exceptions -from typing import Callable, Awaitable +from typing import Callable, Awaitable, Mapping from .api import * from .state import * @@ -42,7 +42,7 @@ class _JsonVeilidAPI(VeilidAPI): # Shared Mutable State lock: asyncio.Lock next_id: int - in_flight_requests: dict + in_flight_requests: Mapping[str, asyncio.Future] def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, update_callback: Callable[[VeilidUpdate], Awaitable], validate_schema: bool = True): self.reader = reader @@ -54,7 +54,7 @@ class _JsonVeilidAPI(VeilidAPI): self.next_id = 1 self.in_flight_requests = dict() - async def __aenter__(self): + async def __aenter__(self) -> Self: return self async def __aexit__(self, *excinfo): @@ -67,6 +67,10 @@ class _JsonVeilidAPI(VeilidAPI): self.writer.close() await self.writer.wait_closed() self.writer = None + + for (reqid, reqfuture) in self.in_flight_requests.items(): + reqfuture.cancel() + finally: self.lock.release() @@ -103,7 +107,9 @@ class _JsonVeilidAPI(VeilidAPI): finally: self.lock.release() # Resolve the request's future to the response json - reqfuture.set_result(j) + if reqfuture is not None: + reqfuture.set_result(j) + async def handle_recv_messages(self): # Read lines until we're done @@ -124,8 +130,6 @@ class _JsonVeilidAPI(VeilidAPI): await self.handle_recv_message_response(j) elif j['type'] == "Update": await self.update_callback(VeilidUpdate.from_json(j)) - except: - pass finally: await self._cleanup_close() @@ -263,17 +267,17 @@ class _JsonVeilidAPI(VeilidAPI): cs_id = raise_api_result(await self.send_ndjson_request(Operation.BEST_CRYPTO_SYSTEM)) return _JsonCryptoSystem(self, cs_id) async def verify_signatures(self, node_ids: list[TypedKey], data: bytes, signatures: list[TypedSignature]) -> list[TypedKey]: - return map(lambda x: TypedKey(x), raise_api_result(await self.send_ndjson_request(Operation.VERIFY_SIGNATURES, + return list(map(lambda x: TypedKey(x), raise_api_result(await self.send_ndjson_request(Operation.VERIFY_SIGNATURES, node_ids = node_ids, data = data, - signatures = signatures))) + signatures = signatures)))) async def generate_signatures(self, data: bytes, key_pairs: list[TypedKeyPair]) -> list[TypedSignature]: - return map(lambda x: TypedSignature(x), raise_api_result(await self.send_ndjson_request(Operation.GENERATE_SIGNATURES, + return list(map(lambda x: TypedSignature(x), raise_api_result(await self.send_ndjson_request(Operation.GENERATE_SIGNATURES, data = data, - key_pairs = key_pairs))) + key_pairs = key_pairs)))) async def generate_key_pair(self, kind: CryptoKind) -> list[TypedKeyPair]: - return map(lambda x: TypedKeyPair(x), raise_api_result(await self.send_ndjson_request(Operation.GENERATE_KEY_PAIR, - kind = kind))) + return list(map(lambda x: TypedKeyPair(x), raise_api_result(await self.send_ndjson_request(Operation.GENERATE_KEY_PAIR, + kind = kind)))) async def now(self) -> Timestamp: return Timestamp(raise_api_result(await self.send_ndjson_request(Operation.NOW))) async def debug(self, command: str) -> str: @@ -456,10 +460,10 @@ class _JsonTableDb(TableDb): db_id = self.db_id, db_op = TableDbOperation.GET_COLUMN_COUNT)) async def get_keys(self, col: int) -> list[bytes]: - return map(lambda x: urlsafe_b64decode_no_pad(x), raise_api_result(await self.api.send_ndjson_request(Operation.TABLE_DB, validate=validate_db_op, + return list(map(lambda x: urlsafe_b64decode_no_pad(x), raise_api_result(await self.api.send_ndjson_request(Operation.TABLE_DB, validate=validate_db_op, db_id = self.db_id, db_op = TableDbOperation.GET_KEYS, - col = col))) + col = col)))) async def transact(self) -> TableDbTransaction: tx_id = raise_api_result(await self.api.send_ndjson_request(Operation.TABLE_DB, validate=validate_db_op, db_id = self.db_id, @@ -627,6 +631,6 @@ class _JsonCryptoSystem(CryptoSystem): ###################################################### -def json_api_connect(host:str, port:int, update_callback: Callable[[VeilidUpdate], Awaitable]) -> VeilidAPI: - return _JsonVeilidAPI.connect(host, port, update_callback) +async def json_api_connect(host:str, port:int, update_callback: Callable[[VeilidUpdate], Awaitable]) -> VeilidAPI: + return await _JsonVeilidAPI.connect(host, port, update_callback) diff --git a/veilid-python/veilid_python/operations.py b/veilid-python/veilid/operations.py similarity index 100% rename from veilid-python/veilid_python/operations.py rename to veilid-python/veilid/operations.py diff --git a/veilid-python/veilid_python/schema/RecvMessage.json b/veilid-python/veilid/schema/RecvMessage.json similarity index 100% rename from veilid-python/veilid_python/schema/RecvMessage.json rename to veilid-python/veilid/schema/RecvMessage.json diff --git a/veilid-python/veilid_python/schema/Request.json b/veilid-python/veilid/schema/Request.json similarity index 100% rename from veilid-python/veilid_python/schema/Request.json rename to veilid-python/veilid/schema/Request.json diff --git a/veilid-python/veilid_python/state.py b/veilid-python/veilid/state.py similarity index 96% rename from veilid-python/veilid_python/state.py rename to veilid-python/veilid/state.py index a5fd9cd8..13404754 100644 --- a/veilid-python/veilid_python/state.py +++ b/veilid-python/veilid/state.py @@ -181,7 +181,7 @@ class VeilidStateNetwork: j['started'], ByteCount(j['bps_down']), ByteCount(j['bps_up']), - map(lambda x: PeerTableData.from_json(x), j['peers'])) + list(map(lambda x: PeerTableData.from_json(x), j['peers']))) class VeilidStateConfig: config: VeilidConfig @@ -193,7 +193,8 @@ class VeilidStateConfig: def from_json(j: dict) -> Self: '''JSON object hook''' return VeilidStateConfig( - j['config']) + VeilidConfig.from_json(j['config']) + ) class VeilidState: attachment: VeilidStateAttachment @@ -227,7 +228,7 @@ class VeilidLog: def from_json(j: dict) -> Self: '''JSON object hook''' return VeilidLog( - VeilidLogLevel(j['attachment']), + VeilidLogLevel(j['log_level']), j['message'], j['backtrace']) @@ -276,8 +277,8 @@ class VeilidRouteChange: def from_json(j: dict) -> Self: '''JSON object hook''' return VeilidRouteChange( - map(lambda x: RouteId(x), j['dead_routes']), - map(lambda x: RouteId(x), j['dead_remote_routes'])) + list(map(lambda x: RouteId(x), j['dead_routes'])), + list(map(lambda x: RouteId(x), j['dead_remote_routes']))) class VeilidValueChange: key: TypedKey @@ -296,7 +297,7 @@ class VeilidValueChange: '''JSON object hook''' return VeilidValueChange( TypedKey(j['key']), - map(lambda x: ValueSubkey(x), j['subkeys']), + list(map(lambda x: ValueSubkey(x), j['subkeys'])), j['count'], ValueData.from_json(j['value'])) @@ -346,4 +347,4 @@ class VeilidUpdate: detail = None case _: raise ValueError("Unknown VeilidUpdateKind") - + return VeilidUpdate(kind, detail) diff --git a/veilid-python/veilid_python/types.py b/veilid-python/veilid/types.py similarity index 97% rename from veilid-python/veilid_python/types.py rename to veilid-python/veilid/types.py index 0f4ced5e..b3388531 100644 --- a/veilid-python/veilid_python/types.py +++ b/veilid-python/veilid/types.py @@ -11,8 +11,7 @@ def urlsafe_b64encode_no_pad(b: bytes) -> str: """ Removes any `=` used as padding from the encoded string. """ - encoded = str(base64.urlsafe_b64encode(b)) - return encoded.rstrip("=") + return base64.urlsafe_b64encode(b).decode().rstrip("=") def urlsafe_b64decode_no_pad(s: str) -> bytes: @@ -20,7 +19,7 @@ def urlsafe_b64decode_no_pad(s: str) -> bytes: Adds back in the required padding before decoding. """ padding = 4 - (len(s) % 4) - string = string + ("=" * padding) + s = s + ("=" * padding) return base64.urlsafe_b64decode(s) class VeilidJSONEncoder(json.JSONEncoder): @@ -248,7 +247,7 @@ class DHTSchema: if DHTSchemaKind(j['kind']) == DHTSchemaKind.SMPL: return DHTSchema.smpl( j['o_cnt'], - map(lambda x: DHTSchemaSMPLMember.from_json(x), j['members'])) + list(map(lambda x: DHTSchemaSMPLMember.from_json(x), j['members']))) raise Exception("Unknown DHTSchema kind", j['kind']) def to_json(self) -> dict: diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index 6b2f9d55..d13b2a7d 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -238,18 +238,10 @@ impl ClientApi { async fn receive_requests( self, - conn_tuple: (SocketAddr, SocketAddr), mut reader: R, requests_tx: flume::Sender>, responses_tx: flume::Sender, ) -> VeilidAPIResult> { - // responses_tx becomes owned by recv_requests_future - // Start sending updates - self.inner - .lock() - .update_channels - .insert(conn_tuple, responses_tx.clone()); - let mut linebuf = String::new(); while let Ok(size) = reader.read_line(&mut linebuf).await { // Eof? @@ -277,10 +269,6 @@ impl ClientApi { } } - // Stop sending updates - // Will cause send_responses_future to stop because we drop the responses_tx - self.inner.lock().update_channels.remove(&conn_tuple); - VeilidAPIResult::Ok(None) } @@ -290,8 +278,8 @@ impl ClientApi { mut writer: W, ) -> VeilidAPIResult> { while let Ok(resp) = responses_rx.recv_async().await { - if let Err(e) = writer.write_all(resp.as_bytes()).await { - eprintln!("failed to write response: {}", e) + if let Err(_) = writer.write_all(resp.as_bytes()).await { + break; } } VeilidAPIResult::Ok(None) @@ -350,11 +338,16 @@ impl ClientApi { let (requests_tx, requests_rx) = flume::unbounded(); let (responses_tx, responses_rx) = flume::unbounded(); + // Start sending updates + self.inner + .lock() + .update_channels + .insert(conn_tuple, responses_tx.clone()); + // Request receive processor future // Receives from socket and enqueues RequestLines // Completes when the connection is closed or there is a failure unord.push(system_boxed(self.clone().receive_requests( - conn_tuple, reader, requests_tx, responses_tx, @@ -398,6 +391,9 @@ impl ClientApi { )); } + // Stop sending updates + self.inner.lock().update_channels.remove(&conn_tuple); + debug!( "Closed Client API Connection: {:?} -> {:?}", peer_addr, local_addr @@ -414,8 +410,8 @@ impl ClientApi { // Pass other updates to clients let inner = self.inner.lock(); for ch in inner.update_channels.values() { - if let Err(e) = ch.send(veilid_update.clone()) { - eprintln!("failed to send update: {}", e); + if let Err(_) = ch.send(veilid_update.clone()) { + // eprintln!("failed to send update: {}", e); } } }