add watch capability

This commit is contained in:
Christien Rioux 2023-12-17 14:03:56 -05:00
parent 70ef992714
commit 92cb5a07cf
9 changed files with 28 additions and 16 deletions

View File

@ -31,11 +31,11 @@ pub const PEEK_DETECT_LEN: usize = 64;
cfg_if! { cfg_if! {
if #[cfg(all(feature = "unstable-blockstore", feature="unstable-tunnels"))] { if #[cfg(all(feature = "unstable-blockstore", feature="unstable-tunnels"))] {
const PUBLIC_INTERNET_CAPABILITIES_LEN: usize = 8; const PUBLIC_INTERNET_CAPABILITIES_LEN: usize = 9;
} else if #[cfg(any(feature = "unstable-blockstore", feature="unstable-tunnels"))] { } else if #[cfg(any(feature = "unstable-blockstore", feature="unstable-tunnels"))] {
const PUBLIC_INTERNET_CAPABILITIES_LEN: usize = 7; const PUBLIC_INTERNET_CAPABILITIES_LEN: usize = 8;
} else { } else {
const PUBLIC_INTERNET_CAPABILITIES_LEN: usize = 6; const PUBLIC_INTERNET_CAPABILITIES_LEN: usize = 7;
} }
} }
pub const PUBLIC_INTERNET_CAPABILITIES: [Capability; PUBLIC_INTERNET_CAPABILITIES_LEN] = [ pub const PUBLIC_INTERNET_CAPABILITIES: [Capability; PUBLIC_INTERNET_CAPABILITIES_LEN] = [
@ -46,19 +46,21 @@ pub const PUBLIC_INTERNET_CAPABILITIES: [Capability; PUBLIC_INTERNET_CAPABILITIE
CAP_RELAY, CAP_RELAY,
CAP_VALIDATE_DIAL_INFO, CAP_VALIDATE_DIAL_INFO,
CAP_DHT, CAP_DHT,
CAP_DHT_WATCH,
CAP_APPMESSAGE, CAP_APPMESSAGE,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
CAP_BLOCKSTORE, CAP_BLOCKSTORE,
]; ];
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
const LOCAL_NETWORK_CAPABILITIES_LEN: usize = 4; const LOCAL_NETWORK_CAPABILITIES_LEN: usize = 5;
#[cfg(not(feature = "unstable-blockstore"))] #[cfg(not(feature = "unstable-blockstore"))]
const LOCAL_NETWORK_CAPABILITIES_LEN: usize = 3; const LOCAL_NETWORK_CAPABILITIES_LEN: usize = 4;
pub const LOCAL_NETWORK_CAPABILITIES: [Capability; LOCAL_NETWORK_CAPABILITIES_LEN] = [ pub const LOCAL_NETWORK_CAPABILITIES: [Capability; LOCAL_NETWORK_CAPABILITIES_LEN] = [
CAP_RELAY, CAP_RELAY,
CAP_DHT, CAP_DHT,
CAP_DHT_WATCH,
CAP_APPMESSAGE, CAP_APPMESSAGE,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
CAP_BLOCKSTORE, CAP_BLOCKSTORE,
@ -551,10 +553,7 @@ impl Network {
.wrap_err("connect failure")? .wrap_err("connect failure")?
} }
ProtocolType::WS | ProtocolType::WSS => { ProtocolType::WS | ProtocolType::WSS => {
WebsocketProtocolHandler::connect( WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms)
None,
&dial_info,
connect_timeout_ms)
.await .await
.wrap_err("connect failure")? .wrap_err("connect failure")?
} }

View File

@ -27,6 +27,7 @@ pub const PUBLIC_INTERNET_CAPABILITIES: [Capability; PUBLIC_INTERNET_CAPABILITIE
//CAP_RELAY, //CAP_RELAY,
//CAP_VALIDATE_DIAL_INFO, //CAP_VALIDATE_DIAL_INFO,
CAP_DHT, CAP_DHT,
CAP_DHT_WATCH,
CAP_APPMESSAGE, CAP_APPMESSAGE,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
CAP_BLOCKSTORE, CAP_BLOCKSTORE,
@ -40,6 +41,7 @@ pub const PUBLIC_INTERNET_CAPABILITIES: [Capability; PUBLIC_INTERNET_CAPABILITIE
// pub const LOCAL_NETWORK_CAPABILITIES: [Capability; LOCAL_NETWORK_CAPABILITIES_LEN] = [ // pub const LOCAL_NETWORK_CAPABILITIES: [Capability; LOCAL_NETWORK_CAPABILITIES_LEN] = [
// //CAP_RELAY, // //CAP_RELAY,
// CAP_DHT, // CAP_DHT,
// CAP_DHT_WATCH,
// CAP_APPMESSAGE, // CAP_APPMESSAGE,
// #[cfg(feature = "unstable-blockstore")] // #[cfg(feature = "unstable-blockstore")]
// CAP_BLOCKSTORE, // CAP_BLOCKSTORE,

View File

@ -8,6 +8,7 @@ pub const CAP_SIGNAL: Capability = FourCC(*b"SGNL");
pub const CAP_RELAY: Capability = FourCC(*b"RLAY"); pub const CAP_RELAY: Capability = FourCC(*b"RLAY");
pub const CAP_VALIDATE_DIAL_INFO: Capability = FourCC(*b"DIAL"); pub const CAP_VALIDATE_DIAL_INFO: Capability = FourCC(*b"DIAL");
pub const CAP_DHT: Capability = FourCC(*b"DHTV"); pub const CAP_DHT: Capability = FourCC(*b"DHTV");
pub const CAP_DHT_WATCH: Capability = FourCC(*b"DHTW");
pub const CAP_APPMESSAGE: Capability = FourCC(*b"APPM"); pub const CAP_APPMESSAGE: Capability = FourCC(*b"APPM");
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
pub const CAP_BLOCKSTORE: Capability = FourCC(*b"BLOC"); pub const CAP_BLOCKSTORE: Capability = FourCC(*b"BLOC");

View File

@ -211,7 +211,7 @@ impl RPCProcessor {
// Get the nodes that we know about that are closer to the the key than our own node // Get the nodes that we know about that are closer to the the key than our own node
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])); let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT, CAP_DHT_WATCH]));
#[cfg(feature="debug-dht")] #[cfg(feature="debug-dht")]
{ {

View File

@ -164,6 +164,15 @@ impl RPCProcessor {
if !opi.signed_node_info().node_info().has_capability(CAP_DHT) { if !opi.signed_node_info().node_info().has_capability(CAP_DHT) {
return Ok(NetworkResult::service_unavailable("dht is not available")); return Ok(NetworkResult::service_unavailable("dht is not available"));
} }
if !opi
.signed_node_info()
.node_info()
.has_capability(CAP_DHT_WATCH)
{
return Ok(NetworkResult::service_unavailable(
"dht watch is not available",
));
}
// Get the question // Get the question
let kind = msg.operation.kind().clone(); let kind = msg.operation.kind().clone();
@ -199,7 +208,7 @@ impl RPCProcessor {
// Get the nodes that we know about that are closer to the the key than our own node // Get the nodes that we know about that are closer to the the key than our own node
let closer_to_key_peers = network_result_try!( let closer_to_key_peers = network_result_try!(
routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]) routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT, CAP_DHT_WATCH])
); );
// See if we would have accepted this as a set // See if we would have accepted this as a set

View File

@ -169,7 +169,7 @@ impl StorageManager {
key_count, key_count,
fanout, fanout,
timeout_us, timeout_us,
capability_fanout_node_info_filter(vec![CAP_DHT]), capability_fanout_node_info_filter(vec![CAP_DHT, CAP_DHT_WATCH]),
call_routine, call_routine,
check_done, check_done,
); );

View File

@ -160,7 +160,7 @@ impl StorageManager {
key_count, key_count,
fanout, fanout,
timeout_us, timeout_us,
capability_fanout_node_info_filter(vec![CAP_DHT]), capability_fanout_node_info_filter(vec![CAP_DHT, CAP_DHT_WATCH]),
call_routine, call_routine,
check_done, check_done,
); );

View File

@ -127,7 +127,7 @@ impl StorageManager {
key_count, key_count,
1, 1,
timeout_us, timeout_us,
capability_fanout_node_info_filter(vec![CAP_DHT]), capability_fanout_node_info_filter(vec![CAP_DHT, CAP_DHT_WATCH]),
call_routine, call_routine,
check_done, check_done,
); );

View File

@ -59,6 +59,7 @@ class Capability(StrEnum):
CAP_RELAY = "RLAY" CAP_RELAY = "RLAY"
CAP_VALIDATE_DIAL_INFO = "DIAL" CAP_VALIDATE_DIAL_INFO = "DIAL"
CAP_DHT = "DHTV" CAP_DHT = "DHTV"
CAP_DHT_WATCH = "DHTW"
CAP_APPMESSAGE = "APPM" CAP_APPMESSAGE = "APPM"
CAP_BLOCKSTORE = "BLOC" CAP_BLOCKSTORE = "BLOC"