mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-12-27 08:19:27 -05:00
Merge branch 'watchvalue-fixes' into 'main'
Fixes for watchvalue See merge request veilid/veilid!272
This commit is contained in:
commit
7daf351608
@ -39,8 +39,7 @@ while true; do
|
|||||||
[yY])
|
[yY])
|
||||||
echo Installing Android SDK...
|
echo Installing Android SDK...
|
||||||
# Install Android SDK
|
# Install Android SDK
|
||||||
mkdir $HOME/Android
|
mkdir -p $HOME/Android/Sdk
|
||||||
mkdir $HOME/Android/Sdk
|
|
||||||
curl -o $HOME/Android/cmdline-tools.zip https://dl.google.com/android/repository/commandlinetools-linux-9123335_latest.zip
|
curl -o $HOME/Android/cmdline-tools.zip https://dl.google.com/android/repository/commandlinetools-linux-9123335_latest.zip
|
||||||
cd $HOME/Android
|
cd $HOME/Android
|
||||||
unzip $HOME/Android/cmdline-tools.zip
|
unzip $HOME/Android/cmdline-tools.zip
|
||||||
|
@ -33,6 +33,8 @@ pub(crate) struct RoutingTableInner {
|
|||||||
/// Async tagged critical sections table
|
/// Async tagged critical sections table
|
||||||
/// Tag: "tick" -> in ticker
|
/// Tag: "tick" -> in ticker
|
||||||
pub(super) critical_sections: AsyncTagLockTable<&'static str>,
|
pub(super) critical_sections: AsyncTagLockTable<&'static str>,
|
||||||
|
/// Last time we pinged checked the active watches
|
||||||
|
pub(super) opt_active_watch_keepalive_ts: Option<Timestamp>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RoutingTableInner {
|
impl RoutingTableInner {
|
||||||
@ -50,6 +52,7 @@ impl RoutingTableInner {
|
|||||||
recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE),
|
recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE),
|
||||||
route_spec_store: None,
|
route_spec_store: None,
|
||||||
critical_sections: AsyncTagLockTable::new(),
|
critical_sections: AsyncTagLockTable::new(),
|
||||||
|
opt_active_watch_keepalive_ts: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,7 +2,10 @@ use super::*;
|
|||||||
|
|
||||||
/// Keepalive pings are done occasionally to ensure holepunched public dialinfo
|
/// Keepalive pings are done occasionally to ensure holepunched public dialinfo
|
||||||
/// remains valid, as well as to make sure we remain in any relay node's routing table
|
/// remains valid, as well as to make sure we remain in any relay node's routing table
|
||||||
const KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
|
const RELAY_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
|
||||||
|
|
||||||
|
/// Keepalive pings are done for active watch nodes to make sure they are still there
|
||||||
|
const ACTIVE_WATCH_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
|
||||||
|
|
||||||
/// Ping queue processing depth
|
/// Ping queue processing depth
|
||||||
const MAX_PARALLEL_PINGS: usize = 16;
|
const MAX_PARALLEL_PINGS: usize = 16;
|
||||||
@ -15,8 +18,7 @@ type PingValidatorFuture =
|
|||||||
SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>;
|
SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>;
|
||||||
|
|
||||||
impl RoutingTable {
|
impl RoutingTable {
|
||||||
// Ping each node in the routing table if they need to be pinged
|
// Ping the relay to keep it alive, over every protocol it is relaying for us
|
||||||
// to determine their reliability
|
|
||||||
#[instrument(level = "trace", skip(self, futurequeue), err)]
|
#[instrument(level = "trace", skip(self, futurequeue), err)]
|
||||||
async fn relay_keepalive_public_internet(
|
async fn relay_keepalive_public_internet(
|
||||||
&self,
|
&self,
|
||||||
@ -35,7 +37,7 @@ impl RoutingTable {
|
|||||||
let relay_needs_keepalive = opt_relay_keepalive_ts
|
let relay_needs_keepalive = opt_relay_keepalive_ts
|
||||||
.map(|kts| {
|
.map(|kts| {
|
||||||
cur_ts.saturating_sub(kts).as_u64()
|
cur_ts.saturating_sub(kts).as_u64()
|
||||||
>= (KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64)
|
>= (RELAY_KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64)
|
||||||
})
|
})
|
||||||
.unwrap_or(true);
|
.unwrap_or(true);
|
||||||
|
|
||||||
@ -118,6 +120,53 @@ impl RoutingTable {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ping the active watch nodes to ensure they are still there
|
||||||
|
#[instrument(level = "trace", skip(self, futurequeue), err)]
|
||||||
|
async fn active_watches_keepalive_public_internet(
|
||||||
|
&self,
|
||||||
|
cur_ts: Timestamp,
|
||||||
|
futurequeue: &mut VecDeque<PingValidatorFuture>,
|
||||||
|
) -> EyreResult<()> {
|
||||||
|
let rpc = self.rpc_processor();
|
||||||
|
|
||||||
|
let watches_need_keepalive = {
|
||||||
|
let mut inner = self.inner.write();
|
||||||
|
let need = inner
|
||||||
|
.opt_active_watch_keepalive_ts
|
||||||
|
.map(|kts| {
|
||||||
|
cur_ts.saturating_sub(kts).as_u64()
|
||||||
|
>= (ACTIVE_WATCH_KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64)
|
||||||
|
})
|
||||||
|
.unwrap_or(true);
|
||||||
|
if need {
|
||||||
|
inner.opt_active_watch_keepalive_ts = Some(cur_ts);
|
||||||
|
}
|
||||||
|
need
|
||||||
|
};
|
||||||
|
|
||||||
|
if !watches_need_keepalive {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get all the active watches from the storage manager
|
||||||
|
let storage_manager = self.unlocked_inner.network_manager.storage_manager();
|
||||||
|
let watch_node_refs = storage_manager.get_active_watch_nodes().await;
|
||||||
|
|
||||||
|
for watch_nr in watch_node_refs {
|
||||||
|
let rpc = rpc.clone();
|
||||||
|
|
||||||
|
log_rtab!("--> Watch ping to {:?}", watch_nr);
|
||||||
|
|
||||||
|
futurequeue.push_back(
|
||||||
|
async move { rpc.rpc_call_status(Destination::direct(watch_nr)).await }
|
||||||
|
.instrument(Span::current())
|
||||||
|
.boxed(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// Ping each node in the routing table if they need to be pinged
|
// Ping each node in the routing table if they need to be pinged
|
||||||
// to determine their reliability
|
// to determine their reliability
|
||||||
#[instrument(level = "trace", skip(self, futurequeue), err)]
|
#[instrument(level = "trace", skip(self, futurequeue), err)]
|
||||||
@ -140,6 +189,10 @@ impl RoutingTable {
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check active watch keepalives
|
||||||
|
self.active_watches_keepalive_public_internet(cur_ts, futurequeue)
|
||||||
|
.await?;
|
||||||
|
|
||||||
// Just do a single ping with the best protocol for all the other nodes to check for liveness
|
// Just do a single ping with the best protocol for all the other nodes to check for liveness
|
||||||
for nr in node_refs {
|
for nr in node_refs {
|
||||||
let rpc = rpc.clone();
|
let rpc = rpc.clone();
|
||||||
|
@ -72,7 +72,7 @@ impl FanoutQueue {
|
|||||||
|
|
||||||
// Return next fanout candidate
|
// Return next fanout candidate
|
||||||
pub fn next(&mut self) -> Option<NodeRef> {
|
pub fn next(&mut self) -> Option<NodeRef> {
|
||||||
let cn = self.current_nodes.pop_front()?;
|
let cn = self.current_nodes.pop_back()?;
|
||||||
self.current_nodes.make_contiguous();
|
self.current_nodes.make_contiguous();
|
||||||
let key = cn.node_ids().get(self.crypto_kind).unwrap();
|
let key = cn.node_ids().get(self.crypto_kind).unwrap();
|
||||||
|
|
||||||
|
@ -201,6 +201,16 @@ impl StorageManager {
|
|||||||
Ok(!inner.offline_subkey_writes.is_empty())
|
Ok(!inner.offline_subkey_writes.is_empty())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the set of nodes in our active watches
|
||||||
|
pub async fn get_active_watch_nodes(&self) -> Vec<NodeRef> {
|
||||||
|
let inner = self.inner.lock().await;
|
||||||
|
inner
|
||||||
|
.opened_records
|
||||||
|
.values()
|
||||||
|
.filter_map(|v| v.active_watch().map(|aw| aw.watch_node))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
/// Create a local record from scratch with a new owner key, open it, and return the opened descriptor
|
/// Create a local record from scratch with a new owner key, open it, and return the opened descriptor
|
||||||
pub async fn create_record(
|
pub async fn create_record(
|
||||||
&self,
|
&self,
|
||||||
@ -304,16 +314,14 @@ impl StorageManager {
|
|||||||
// Use the safety selection we opened the record with
|
// Use the safety selection we opened the record with
|
||||||
// Use the writer we opened with as the 'watcher' as well
|
// Use the writer we opened with as the 'watcher' as well
|
||||||
let opt_owvresult = self
|
let opt_owvresult = self
|
||||||
.outbound_watch_value(
|
.outbound_watch_value_cancel(
|
||||||
rpc_processor,
|
rpc_processor,
|
||||||
key,
|
key,
|
||||||
ValueSubkeyRangeSet::full(),
|
ValueSubkeyRangeSet::full(),
|
||||||
Timestamp::new(0),
|
|
||||||
0,
|
|
||||||
opened_record.safety_selection(),
|
opened_record.safety_selection(),
|
||||||
opened_record.writer().cloned(),
|
opened_record.writer().cloned(),
|
||||||
Some(active_watch.id),
|
active_watch.id,
|
||||||
Some(active_watch.watch_node),
|
active_watch.watch_node,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
if let Some(owvresult) = opt_owvresult {
|
if let Some(owvresult) = opt_owvresult {
|
||||||
|
@ -20,6 +20,113 @@ pub(super) struct OutboundWatchValueResult {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl StorageManager {
|
impl StorageManager {
|
||||||
|
/// Perform a 'watch value cancel' on the network without fanout
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
pub(super) async fn outbound_watch_value_cancel(
|
||||||
|
&self,
|
||||||
|
rpc_processor: RPCProcessor,
|
||||||
|
key: TypedKey,
|
||||||
|
subkeys: ValueSubkeyRangeSet,
|
||||||
|
safety_selection: SafetySelection,
|
||||||
|
opt_watcher: Option<KeyPair>,
|
||||||
|
watch_id: u64,
|
||||||
|
watch_node: NodeRef,
|
||||||
|
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
|
||||||
|
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
|
||||||
|
// which lives for the duration of the app's runtime
|
||||||
|
let watcher = opt_watcher.unwrap_or_else(|| {
|
||||||
|
self.unlocked_inner
|
||||||
|
.anonymous_watch_keys
|
||||||
|
.get(key.kind)
|
||||||
|
.unwrap()
|
||||||
|
.value
|
||||||
|
});
|
||||||
|
|
||||||
|
let wva = VeilidAPIError::from_network_result(
|
||||||
|
rpc_processor
|
||||||
|
.clone()
|
||||||
|
.rpc_call_watch_value(
|
||||||
|
Destination::direct(watch_node.clone()).with_safety(safety_selection),
|
||||||
|
key,
|
||||||
|
subkeys,
|
||||||
|
Timestamp::default(),
|
||||||
|
0,
|
||||||
|
watcher,
|
||||||
|
Some(watch_id),
|
||||||
|
)
|
||||||
|
.await?,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
if wva.answer.accepted {
|
||||||
|
Ok(Some(OutboundWatchValueResult {
|
||||||
|
expiration_ts: wva.answer.expiration_ts,
|
||||||
|
watch_id: wva.answer.watch_id,
|
||||||
|
watch_node,
|
||||||
|
opt_value_changed_route: wva.reply_private_route,
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform a 'watch value change' on the network without fanout
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
pub(super) async fn outbound_watch_value_change(
|
||||||
|
&self,
|
||||||
|
rpc_processor: RPCProcessor,
|
||||||
|
key: TypedKey,
|
||||||
|
subkeys: ValueSubkeyRangeSet,
|
||||||
|
expiration: Timestamp,
|
||||||
|
count: u32,
|
||||||
|
safety_selection: SafetySelection,
|
||||||
|
opt_watcher: Option<KeyPair>,
|
||||||
|
watch_id: u64,
|
||||||
|
watch_node: NodeRef,
|
||||||
|
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
|
||||||
|
if count == 0 {
|
||||||
|
apibail_internal!("cancel should be done with outbound_watch_value_cancel");
|
||||||
|
}
|
||||||
|
if watch_id == 0 {
|
||||||
|
apibail_internal!("watch id should not be zero when changing watch");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
|
||||||
|
// which lives for the duration of the app's runtime
|
||||||
|
let watcher = opt_watcher.unwrap_or_else(|| {
|
||||||
|
self.unlocked_inner
|
||||||
|
.anonymous_watch_keys
|
||||||
|
.get(key.kind)
|
||||||
|
.unwrap()
|
||||||
|
.value
|
||||||
|
});
|
||||||
|
|
||||||
|
let wva = VeilidAPIError::from_network_result(
|
||||||
|
rpc_processor
|
||||||
|
.clone()
|
||||||
|
.rpc_call_watch_value(
|
||||||
|
Destination::direct(watch_node.clone()).with_safety(safety_selection),
|
||||||
|
key,
|
||||||
|
subkeys,
|
||||||
|
expiration,
|
||||||
|
count,
|
||||||
|
watcher,
|
||||||
|
Some(watch_id),
|
||||||
|
)
|
||||||
|
.await?,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
if wva.answer.accepted {
|
||||||
|
Ok(Some(OutboundWatchValueResult {
|
||||||
|
expiration_ts: wva.answer.expiration_ts,
|
||||||
|
watch_id: wva.answer.watch_id,
|
||||||
|
watch_node,
|
||||||
|
opt_value_changed_route: wva.reply_private_route,
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Perform a 'watch value' query on the network using fanout
|
/// Perform a 'watch value' query on the network using fanout
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub(super) async fn outbound_watch_value(
|
pub(super) async fn outbound_watch_value(
|
||||||
@ -34,6 +141,56 @@ impl StorageManager {
|
|||||||
opt_watch_id: Option<u64>,
|
opt_watch_id: Option<u64>,
|
||||||
opt_watch_node: Option<NodeRef>,
|
opt_watch_node: Option<NodeRef>,
|
||||||
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
|
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
|
||||||
|
// if the count is zero, we are cancelling
|
||||||
|
if count == 0 {
|
||||||
|
// Ensure watch id is specified
|
||||||
|
let Some(watch_id) = opt_watch_id else {
|
||||||
|
apibail_internal!("Must specify a watch id in order to cancel it");
|
||||||
|
};
|
||||||
|
// Ensure watch node is specified
|
||||||
|
let Some(watch_node) = opt_watch_node else {
|
||||||
|
apibail_internal!("Must specify a watch node in order to cancel it");
|
||||||
|
};
|
||||||
|
return self
|
||||||
|
.outbound_watch_value_cancel(
|
||||||
|
rpc_processor,
|
||||||
|
key,
|
||||||
|
subkeys,
|
||||||
|
safety_selection,
|
||||||
|
opt_watcher,
|
||||||
|
watch_id,
|
||||||
|
watch_node,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the watch id and watch node are specified, then we're trying to change an existing watch
|
||||||
|
// first try to do that, then fall back to fanout for a new watch id
|
||||||
|
if let Some(watch_id) = opt_watch_id {
|
||||||
|
let Some(watch_node) = opt_watch_node else {
|
||||||
|
apibail_internal!("Must specify a watch node in order to change it");
|
||||||
|
};
|
||||||
|
if let Some(res) = self
|
||||||
|
.outbound_watch_value_change(
|
||||||
|
rpc_processor.clone(),
|
||||||
|
key,
|
||||||
|
subkeys.clone(),
|
||||||
|
expiration,
|
||||||
|
count,
|
||||||
|
safety_selection,
|
||||||
|
opt_watcher,
|
||||||
|
watch_id,
|
||||||
|
watch_node,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
// If a change was successful then return immediately
|
||||||
|
return Ok(Some(res));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, treat this like a new watch
|
||||||
|
}
|
||||||
|
|
||||||
let routing_table = rpc_processor.routing_table();
|
let routing_table = rpc_processor.routing_table();
|
||||||
|
|
||||||
// Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations
|
// Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations
|
||||||
@ -45,23 +202,6 @@ impl StorageManager {
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Get the nodes we know are caching this value to seed the fanout
|
|
||||||
let init_fanout_queue = if let Some(watch_node) = opt_watch_node {
|
|
||||||
vec![watch_node]
|
|
||||||
} else {
|
|
||||||
let inner = self.inner.lock().await;
|
|
||||||
inner
|
|
||||||
.get_value_nodes(key)?
|
|
||||||
.unwrap_or_default()
|
|
||||||
.into_iter()
|
|
||||||
.filter(|x| {
|
|
||||||
x.node_info(RoutingDomain::PublicInternet)
|
|
||||||
.map(|ni| ni.has_capability(CAP_DHT_WATCH))
|
|
||||||
.unwrap_or_default()
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
};
|
|
||||||
|
|
||||||
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
|
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
|
||||||
// which lives for the duration of the app's runtime
|
// which lives for the duration of the app's runtime
|
||||||
let watcher = opt_watcher.unwrap_or_else(|| {
|
let watcher = opt_watcher.unwrap_or_else(|| {
|
||||||
@ -72,6 +212,21 @@ impl StorageManager {
|
|||||||
.value
|
.value
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Get the nodes we know are caching this value to seed the fanout
|
||||||
|
let init_fanout_queue = {
|
||||||
|
let inner = self.inner.lock().await;
|
||||||
|
inner
|
||||||
|
.get_value_nodes(key)?
|
||||||
|
.unwrap_or_default()
|
||||||
|
.into_iter()
|
||||||
|
.filter(|x| {
|
||||||
|
x.node_info(RoutingDomain::PublicInternet)
|
||||||
|
.map(|ni| ni.has_capabilities(&[CAP_DHT, CAP_DHT_WATCH]))
|
||||||
|
.unwrap_or_default()
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
|
||||||
// Make do-watch-value answer context
|
// Make do-watch-value answer context
|
||||||
let context = Arc::new(Mutex::new(OutboundWatchValueContext {
|
let context = Arc::new(Mutex::new(OutboundWatchValueContext {
|
||||||
opt_watch_value_result: None,
|
opt_watch_value_result: None,
|
||||||
@ -82,6 +237,7 @@ impl StorageManager {
|
|||||||
let rpc_processor = rpc_processor.clone();
|
let rpc_processor = rpc_processor.clone();
|
||||||
let context = context.clone();
|
let context = context.clone();
|
||||||
let subkeys = subkeys.clone();
|
let subkeys = subkeys.clone();
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
let wva = network_result_try!(
|
let wva = network_result_try!(
|
||||||
rpc_processor
|
rpc_processor
|
||||||
@ -93,27 +249,32 @@ impl StorageManager {
|
|||||||
expiration,
|
expiration,
|
||||||
count,
|
count,
|
||||||
watcher,
|
watcher,
|
||||||
opt_watch_id
|
None
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
);
|
);
|
||||||
|
|
||||||
// Keep answer if we got one
|
// Keep answer if we got one
|
||||||
|
// (accepted means the node could provide an answer, not that the watch is active)
|
||||||
if wva.answer.accepted {
|
if wva.answer.accepted {
|
||||||
|
let mut done = false;
|
||||||
if wva.answer.expiration_ts.as_u64() > 0 {
|
if wva.answer.expiration_ts.as_u64() > 0 {
|
||||||
// If the expiration time is greater than zero this watch is active
|
// If the expiration time is greater than zero this watch is active
|
||||||
log_dht!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
|
log_dht!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
|
||||||
|
done = true;
|
||||||
} else {
|
} else {
|
||||||
// If the returned expiration time is zero, this watch was cancelled, or inactive
|
// If the returned expiration time is zero, this watch was cancelled or rejected
|
||||||
log_dht!(debug "Watch inactive: id={}", wva.answer.watch_id);
|
// If we are asking to cancel then check_done will stop after the first node
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
let mut ctx = context.lock();
|
||||||
|
ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
|
||||||
|
expiration_ts: wva.answer.expiration_ts,
|
||||||
|
watch_id: wva.answer.watch_id,
|
||||||
|
watch_node: next_node.clone(),
|
||||||
|
opt_value_changed_route: wva.reply_private_route,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
let mut ctx = context.lock();
|
|
||||||
ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
|
|
||||||
expiration_ts: wva.answer.expiration_ts,
|
|
||||||
watch_id: wva.answer.watch_id,
|
|
||||||
watch_node: next_node.clone(),
|
|
||||||
opt_value_changed_route: wva.reply_private_route,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return peers if we have some
|
// Return peers if we have some
|
||||||
|
Loading…
Reference in New Issue
Block a user