Christien Rioux 2025-09-04 17:18:01 -04:00
parent 34c82ec128
commit 0ca19d6207
22 changed files with 650 additions and 367 deletions

View file

@@ -38,6 +38,7 @@
 - `RecordKey`s are now validated on both server side and client side of DHT RPC operations, closes [#299](https://gitlab.com/veilid/veilid/-/issues/299)
 - Revert punishment for FailedToVerifySenderPeerInfo, with a better peer info filter, fixes [#470](https://gitlab.com/veilid/veilid/-/issues/470)
 - Update keyring-manager to eliminate licensing issue
+- Added 'tick lag' detection to check for missed watch updates
 - veilid-python:
   - Correction of type hints
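
The 'tick lag' entry refers to the new once-per-second tick in AttachmentManager shown later in this commit: each tick records its timestamp, compares the gap to the previous tick against twice the tick interval, and logs the lag; StorageManager receives the same lag value and re-inspects its watches when the gap exceeds CHANGE_INSPECT_LAG_SECS. A minimal, self-contained sketch of the idea, using std::time::Instant and a hypothetical TickLagDetector type rather than the veilid-core implementation:

```rust
use std::time::{Duration, Instant};

/// Illustrative only: detects missed ticks by comparing the gap between
/// consecutive ticks against a multiple of the expected interval.
struct TickLagDetector {
    interval: Duration,
    last_tick: Option<Instant>,
}

impl TickLagDetector {
    fn new(interval: Duration) -> Self {
        Self { interval, last_tick: None }
    }

    /// Call once per tick; returns the lag if ticks appear to have been missed.
    fn tick(&mut self) -> Option<Duration> {
        let now = Instant::now();
        let lag = self.last_tick.map(|last| now.duration_since(last));
        self.last_tick = Some(now);
        // More than 2x the interval between ticks means at least one tick was missed.
        lag.filter(|lag| *lag > self.interval * 2)
    }
}

fn main() {
    let mut detector = TickLagDetector::new(Duration::from_secs(1));
    detector.tick(); // first tick, nothing to compare against yet
    std::thread::sleep(Duration::from_millis(10));
    if let Some(lag) = detector.tick() {
        // In veilid-core this is where watches would be re-inspected for missed updates.
        println!("tick lag detected: {:?}", lag);
    }
}
```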

View file

@@ -28,11 +28,11 @@ fn format_ts(ts: &json::JsonValue) -> String {
         return "---".to_owned();
     }
     let ts = json_str_u64(ts);
-    let secs = timestamp_to_secs(ts);
+    let secs = timestamp_duration_to_secs(ts);
     if secs >= 1.0 {
-        format!("{:.2}s", timestamp_to_secs(ts))
+        format!("{:.2}s", secs)
     } else {
-        format!("{:.2}ms", timestamp_to_secs(ts) * 1000.0)
+        format!("{:.2}ms", secs * 1000.0)
     }
 }

View file

@@ -3,6 +3,10 @@ use routing_table::RoutingTableHealth;
 impl_veilid_log_facility!("attach");
 
+const TICK_INTERVAL_MSEC: u32 = 1000;
+const ATTACHMENT_MAINTAINER_INTERVAL_MSEC: u32 = 1000;
+const BIND_WAIT_DELAY_MSEC: u32 = 10000;
+
 #[derive(Debug, Clone)]
 pub struct AttachmentManagerStartupContext {
     pub startup_lock: Arc<StartupLock>,
@@ -20,21 +24,56 @@ impl Default for AttachmentManagerStartupContext {
     }
 }
 
-#[derive(Debug)]
-struct AttachmentManagerInner {
-    last_attachment_state: AttachmentState,
-    last_routing_table_health: Option<Arc<RoutingTableHealth>>,
-    maintain_peers: bool,
-    started_ts: Timestamp,
-    attach_ts: Option<Timestamp>,
-    attachment_maintainer_jh: Option<MustJoinHandle<()>>,
+/// Event sent every second while veilid-core is initialized
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+pub struct TickEvent {
+    pub last_tick_ts: Option<Timestamp>,
+    pub cur_tick_ts: Timestamp,
+}
+
+struct AttachmentManagerInner {
+    attachment_state: AttachmentState,
+    last_routing_table_health: Option<Arc<RoutingTableHealth>>,
+    maintain_peers: bool,
+    attach_enabled: bool,
+    started_ts: Timestamp,
+    attach_ts: Option<Timestamp>,
+    last_tick_ts: Option<Timestamp>,
+    tick_future: Option<PinBoxFutureStatic<()>>,
+    eventual_termination: Option<EventualValue<()>>,
+}
+
+impl fmt::Debug for AttachmentManagerInner {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("AttachmentManagerInner")
+            .field("attachment_state", &self.attachment_state)
+            .field("last_routing_table_health", &self.last_routing_table_health)
+            .field("maintain_peers", &self.maintain_peers)
+            .field("attach_enabled", &self.attach_enabled)
+            .field("started_ts", &self.started_ts)
+            .field("attach_ts", &self.attach_ts)
+            .field("last_tick_ts", &self.last_tick_ts)
+            .field("eventual_termination", &self.eventual_termination)
+            .finish()
+    }
 }
 
-#[derive(Debug)]
 pub struct AttachmentManager {
     registry: VeilidComponentRegistry,
     inner: Mutex<AttachmentManagerInner>,
     startup_context: AttachmentManagerStartupContext,
+    attachment_maintainer_task: TickTask<EyreReport>,
+}
+
+impl fmt::Debug for AttachmentManager {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("AttachmentManager")
+            // .field("registry", &self.registry)
+            .field("inner", &self.inner)
+            .field("startup_context", &self.startup_context)
+            // .field("attachment_maintainer_task", &self.attachment_maintainer_task)
+            .finish()
+    }
 }
 
 impl_veilid_component!(AttachmentManager);
@@ -42,12 +81,15 @@ impl_veilid_component!(AttachmentManager);
 impl AttachmentManager {
     fn new_inner() -> AttachmentManagerInner {
         AttachmentManagerInner {
-            last_attachment_state: AttachmentState::Detached,
+            attachment_state: AttachmentState::Detached,
             last_routing_table_health: None,
             maintain_peers: false,
+            attach_enabled: false,
             started_ts: Timestamp::now(),
             attach_ts: None,
-            attachment_maintainer_jh: None,
+            last_tick_ts: None,
+            tick_future: None,
+            eventual_termination: None,
         }
     }
 
     pub fn new(
@@ -58,18 +100,20 @@ impl AttachmentManager {
             registry,
             inner: Mutex::new(Self::new_inner()),
             startup_context,
+            attachment_maintainer_task: TickTask::new_ms(
+                "attachment_maintainer_task",
+                ATTACHMENT_MAINTAINER_INTERVAL_MSEC,
+            ),
         }
     }
 
     pub fn is_attached(&self) -> bool {
-        let s = self.inner.lock().last_attachment_state;
-        !matches!(s, AttachmentState::Detached | AttachmentState::Detaching)
+        self.inner.lock().attachment_state.is_attached()
     }
 
     #[allow(dead_code)]
     pub fn is_detached(&self) -> bool {
-        let s = self.inner.lock().last_attachment_state;
-        matches!(s, AttachmentState::Detached)
+        self.inner.lock().attachment_state.is_detached()
     }
 
     #[allow(dead_code)]
@@ -77,128 +121,242 @@ impl AttachmentManager {
         self.inner.lock().attach_ts
     }
 
-    fn translate_routing_table_health(
-        health: &RoutingTableHealth,
-        config: &VeilidConfigRoutingTable,
-    ) -> AttachmentState {
-        if health.reliable_entry_count
-            >= TryInto::<usize>::try_into(config.limit_over_attached).unwrap()
-        {
-            return AttachmentState::OverAttached;
-        }
-        if health.reliable_entry_count
-            >= TryInto::<usize>::try_into(config.limit_fully_attached).unwrap()
-        {
-            return AttachmentState::FullyAttached;
-        }
-        if health.reliable_entry_count
-            >= TryInto::<usize>::try_into(config.limit_attached_strong).unwrap()
-        {
-            return AttachmentState::AttachedStrong;
-        }
-        if health.reliable_entry_count
-            >= TryInto::<usize>::try_into(config.limit_attached_good).unwrap()
-        {
-            return AttachmentState::AttachedGood;
-        }
-        if health.reliable_entry_count
-            >= TryInto::<usize>::try_into(config.limit_attached_weak).unwrap()
-            || health.unreliable_entry_count
-                >= TryInto::<usize>::try_into(config.limit_attached_weak).unwrap()
-        {
-            return AttachmentState::AttachedWeak;
-        }
-        AttachmentState::Attaching
+    #[instrument(level = "debug", skip_all, err)]
+    pub async fn init_async(&self) -> EyreResult<()> {
+        let guard = self.startup_context.startup_lock.startup()?;
+        guard.success();
+        Ok(())
     }
-    /// Update attachment and network readiness state
-    /// and possibly send a VeilidUpdate::Attachment.
-    fn update_attachment(&self) {
-        // update the routing table health
-        let routing_table = self.network_manager().routing_table();
-        let health = routing_table.get_routing_table_health();
-        let opt_update = {
-            let mut inner = self.inner.lock();
-            // Check if the routing table health is different
-            if let Some(last_routing_table_health) = &inner.last_routing_table_health {
-                // If things are the same, just return
-                if last_routing_table_health.as_ref() == &health {
-                    return;
+    #[instrument(level = "debug", skip_all, err)]
+    pub async fn post_init_async(&self) -> EyreResult<()> {
+        let registry = self.registry();
+
+        veilid_log!(self debug "starting attachment maintainer task");
+        impl_setup_task!(
+            self,
+            Self,
+            attachment_maintainer_task,
+            attachment_maintainer_task_routine
+        );
+
+        // Create top level tick interval
+        let tick_future = interval(
+            "attachment maintainer tick",
+            TICK_INTERVAL_MSEC,
+            move || {
+                let registry = registry.clone();
+                async move {
+                    let this = registry.attachment_manager();
+                    if let Err(e) = this.tick().await {
+                        veilid_log!(this warn "attachment maintainer tick failed: {}", e);
+                    }
+                }
+            },
+        );
+        {
+            let mut inner = self.inner.lock();
+            inner.tick_future = Some(tick_future);
+
+            // Enable attachment now
+            inner.attach_enabled = true;
+        }
+
+        Ok(())
+    }
+
+    #[instrument(level = "debug", skip_all)]
+    pub async fn pre_terminate_async(&self) {
+        {
+            let mut inner = self.inner.lock();
+
+            // Disable attachment now
+            // Will cause attachment maintainer to drive the state toward 'Detached'
+            inner.attach_enabled = false;
+        }
+
+        // Wait for detached state
+        while !matches!(
+            self.inner.lock().attachment_state,
+            AttachmentState::Detached
+        ) {
+            sleep(500).await;
+        }
+
+        // Stop ticker
+        let tick_future = self.inner.lock().tick_future.take();
+        if let Some(tick_future) = tick_future {
+            tick_future.await;
+        }
+
+        // Stop background operations
+        veilid_log!(self debug "stopping attachment maintainer task");
+        if let Err(e) = self.attachment_maintainer_task.stop().await {
+            veilid_log!(self warn "attachment_maintainer not stopped: {}", e);
+        }
+    }
+
+    #[instrument(level = "debug", skip_all)]
+    pub async fn terminate_async(&self) {
+        let guard = self
+            .startup_context
+            .startup_lock
+            .shutdown()
+            .await
+            .expect("should be initialized");
+
+        // Shutdown successful
+        guard.success();
+    }
+
+    #[instrument(level = "trace", skip_all)]
+    pub async fn attach(&self) -> bool {
+        let Ok(_guard) = self.startup_context.startup_lock.enter() else {
+            return false;
+        };
+        let mut inner = self.inner.lock();
+
+        // If attaching is disabled (because we are terminating)
+        // then just return now
+        if !inner.attach_enabled {
+            return false;
+        }
+
+        let previous = inner.maintain_peers;
+        inner.maintain_peers = true;
+        previous != inner.maintain_peers
+    }
+
+    #[instrument(level = "trace", skip_all)]
+    pub async fn detach(&self) -> bool {
+        let Ok(_guard) = self.startup_context.startup_lock.enter() else {
+            return false;
+        };
+        {
+            let mut inner = self.inner.lock();
+            let previous = inner.maintain_peers;
+            if !previous {
+                // Already detached or detaching
+                return false;
+            }
+
+            // Wants to be detached
+            inner.maintain_peers = false;
+        }
+        true
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+
+    async fn tick(&self) -> EyreResult<()> {
+        let cur_tick_ts = Timestamp::now();
+        let last_tick_ts = {
+            let mut inner = self.inner.lock();
+            let last_tick_ts = inner.last_tick_ts;
+            inner.last_tick_ts = Some(cur_tick_ts);
+            last_tick_ts
+        };
+
+        // Log if we're seeing missed ticks
+        if let Some(lag) = last_tick_ts.map(|x| cur_tick_ts.saturating_sub(x)) {
+            if lag > TimestampDuration::new_ms(2 * (TICK_INTERVAL_MSEC as u64)) {
+                veilid_log!(self debug "tick lag: {}", lag);
+            }
+        }
+
+        // Tick our own ticktask for the attachment maintainer state machine
+        self.attachment_maintainer_task.tick().await?;
+
+        // Send a 'tick' event for the rest of the system to get ticks
+        let event_bus = self.event_bus();
+        event_bus.post(TickEvent {
+            last_tick_ts,
+            cur_tick_ts,
+        })?;
+
+        Ok(())
+    }
+
+    // Manage attachment state
+    #[instrument(level = "trace", target = "stor", skip_all, err)]
+    async fn attachment_maintainer_task_routine(
+        &self,
+        _stop_token: StopToken,
+        _last_ts: Timestamp,
+        _cur_ts: Timestamp,
+    ) -> EyreResult<()> {
+        let (state, maintain_peers, attach_enabled) = {
+            let inner = self.inner.lock();
+            (
+                inner.attachment_state,
+                inner.maintain_peers,
+                inner.attach_enabled,
+            )
+        };
+
+        let next_state = match state {
+            AttachmentState::Detached => {
+                if maintain_peers && attach_enabled {
+                    veilid_log!(self debug "attachment starting");
+                    match self.startup().await {
+                        Err(err) => {
+                            error!("attachment startup failed: {}", err);
+                            None
+                        }
+                        Ok(StartupDisposition::BindRetry) => {
+                            veilid_log!(self info "waiting for network to bind...");
+                            sleep(BIND_WAIT_DELAY_MSEC).await;
+                            None
+                        }
+                        Ok(StartupDisposition::Success) => {
+                            veilid_log!(self debug "started maintaining peers");
+                            self.update_non_attached_state(AttachmentState::Attaching);
+                            Some(AttachmentState::Attaching)
+                        }
+                    }
+                } else {
+                    None
                 }
             }
+            AttachmentState::Attaching
+            | AttachmentState::AttachedWeak
+            | AttachmentState::AttachedGood
+            | AttachmentState::AttachedStrong
+            | AttachmentState::FullyAttached
+            | AttachmentState::OverAttached => {
+                if maintain_peers && attach_enabled {
+                    let network_manager = self.network_manager();
+                    if network_manager.network_needs_restart() {
+                        veilid_log!(self info "Restarting network");
+                        self.update_non_attached_state(AttachmentState::Detaching);
+                        Some(AttachmentState::Detaching)
+                    } else {
+                        self.update_attached_state(state)
+                    }
+                } else {
+                    veilid_log!(self debug "stopped maintaining peers");
+                    Some(AttachmentState::Detaching)
+                }
+            }
+            AttachmentState::Detaching => {
+                veilid_log!(self debug "shutting down attachment");
+                self.shutdown().await;
+                self.update_non_attached_state(AttachmentState::Detached);
+                Some(AttachmentState::Detached)
-            // Swap in new health numbers
-            let opt_previous_health = inner.last_routing_table_health.take();
-            inner.last_routing_table_health = Some(Arc::new(health.clone()));
-            // Calculate new attachment state
-            let config = self.config();
-            let routing_table_config = &config.get().network.routing_table;
-            let previous_attachment_state = inner.last_attachment_state;
-            inner.last_attachment_state =
-                AttachmentManager::translate_routing_table_health(&health, routing_table_config);
-            // Send update if one of:
-            // * the attachment state has changed
-            // * routing domain readiness has changed
-            // * this is our first routing table health check
-            let send_update = previous_attachment_state != inner.last_attachment_state
-                || opt_previous_health
-                    .map(|x| {
-                        x.public_internet_ready != health.public_internet_ready
-                            || x.local_network_ready != health.local_network_ready
-                    })
-                    .unwrap_or(true);
-            if send_update {
-                Some(Self::get_veilid_state_inner(&inner))
-            } else {
-                None
             }
         };
 
-        // Send the update outside of the lock
-        if let Some(update) = opt_update {
-            (self.update_callback())(VeilidUpdate::Attachment(update));
-        }
-    }
-
-    fn update_attaching_detaching_state(&self, state: AttachmentState) {
-        let uptime;
-        let attached_uptime;
-        {
+        // Transition to next state
+        if let Some(next_state) = next_state {
             let mut inner = self.inner.lock();
+            inner.attachment_state = next_state;
+        }
+        Ok(())
-            // Clear routing table health so when we start measuring it we start from scratch
-            inner.last_routing_table_health = None;
-            // Set attachment state directly
-            inner.last_attachment_state = state;
-            // Set timestamps
-            if state == AttachmentState::Attaching {
-                inner.attach_ts = Some(Timestamp::now());
-            } else if state == AttachmentState::Detached {
-                inner.attach_ts = None;
-            } else if state == AttachmentState::Detaching {
-                // ok
-            } else {
-                unreachable!("don't use this for attached states, use update_attachment()");
-            }
-            let now = Timestamp::now();
-            uptime = now - inner.started_ts;
-            attached_uptime = inner.attach_ts.map(|ts| now - ts);
-        };
-        // Send update
-        (self.update_callback())(VeilidUpdate::Attachment(Box::new(VeilidStateAttachment {
-            state,
-            public_internet_ready: false,
-            local_network_ready: false,
-            uptime,
-            attached_uptime,
-        })))
     }
 
     async fn startup(&self) -> EyreResult<StartupDisposition> {
@@ -257,174 +415,136 @@ impl AttachmentManager {
         network_manager.send_network_update();
     }
 
-    async fn tick(&self) -> EyreResult<()> {
-        // Run the network manager tick
-        let network_manager = self.network_manager();
-        network_manager.tick().await?;
-        // Run the routing table tick
-        let routing_table = self.routing_table();
-        routing_table.tick().await?;
-        Ok(())
-    }
-
-    #[instrument(parent = None, level = "debug", skip_all)]
-    async fn attachment_maintainer(&self) {
-        veilid_log!(self debug "attachment starting");
-        self.update_attaching_detaching_state(AttachmentState::Attaching);
-        let network_manager = self.network_manager();
-        let mut restart;
-        let mut restart_delay;
-        while self.inner.lock().maintain_peers {
-            restart = false;
-            restart_delay = 1;
-            match self.startup().await {
-                Err(err) => {
-                    error!("attachment startup failed: {}", err);
-                    restart = true;
-                }
-                Ok(StartupDisposition::BindRetry) => {
-                    veilid_log!(self info "waiting for network to bind...");
-                    restart = true;
-                    restart_delay = 10;
-                }
-                Ok(StartupDisposition::Success) => {
-                    veilid_log!(self debug "started maintaining peers");
-                    while self.inner.lock().maintain_peers {
-                        // tick network manager
-                        let next_tick_ts = get_timestamp() + 1_000_000u64;
-                        if let Err(err) = self.tick().await {
-                            error!("Error in attachment tick: {}", err);
-                            self.inner.lock().maintain_peers = false;
-                            restart = true;
-                            break;
-                        }
-                        // see if we need to restart the network
-                        if network_manager.network_needs_restart() {
-                            veilid_log!(self info "Restarting network");
-                            restart = true;
-                            break;
-                        }
-                        // Update attachment and network readiness state
-                        // and possibly send a VeilidUpdate::Attachment
-                        self.update_attachment();
-                        // sleep should be at the end in case maintain_peers changes state
-                        let wait_duration = next_tick_ts
-                            .saturating_sub(get_timestamp())
-                            .clamp(0, 1_000_000u64);
-                        sleep((wait_duration / 1_000) as u32).await;
-                    }
-                    veilid_log!(self debug "stopped maintaining peers");
-                    if !restart {
-                        self.update_attaching_detaching_state(AttachmentState::Detaching);
-                        veilid_log!(self debug "attachment stopping");
-                    }
-                    veilid_log!(self debug "shutting down attachment");
-                    self.shutdown().await;
-                }
-            }
-            if !restart {
-                break;
-            }
-            veilid_log!(self debug "completely restarting attachment");
-            // chill out for a second first, give network stack time to settle out
-            for _ in 0..restart_delay {
-                if !self.inner.lock().maintain_peers {
-                    break;
-                }
-                sleep(1000).await;
-            }
+    fn translate_routing_table_health(
+        health: &RoutingTableHealth,
+        config: &VeilidConfigRoutingTable,
+    ) -> AttachmentState {
+        if health.reliable_entry_count
+            >= TryInto::<usize>::try_into(config.limit_over_attached).unwrap()
+        {
+            return AttachmentState::OverAttached;
         }
-        self.update_attaching_detaching_state(AttachmentState::Detached);
-        veilid_log!(self debug "attachment stopped");
-    }
-
-    #[instrument(level = "debug", skip_all, err)]
-    pub async fn init_async(&self) -> EyreResult<()> {
-        let guard = self.startup_context.startup_lock.startup()?;
-        guard.success();
-        Ok(())
-    }
-
-    #[instrument(level = "debug", skip_all, err)]
-    pub async fn post_init_async(&self) -> EyreResult<()> {
-        Ok(())
-    }
-
-    #[instrument(level = "debug", skip_all)]
-    pub async fn pre_terminate_async(&self) {
-        // Ensure we detached
-        self.detach().await;
-    }
-
-    #[instrument(level = "debug", skip_all)]
-    pub async fn terminate_async(&self) {
-        let guard = self
-            .startup_context
-            .startup_lock
-            .shutdown()
-            .await
-            .expect("should be initialized");
-        // Shutdown successful
-        guard.success();
-    }
-
-    #[instrument(level = "trace", skip_all)]
-    pub async fn attach(&self) -> bool {
-        let Ok(_guard) = self.startup_context.startup_lock.enter() else {
-            return false;
-        };
-        // Create long-running connection maintenance routine
-        let mut inner = self.inner.lock();
-        if inner.attachment_maintainer_jh.is_some() {
-            return false;
+        if health.reliable_entry_count
+            >= TryInto::<usize>::try_into(config.limit_fully_attached).unwrap()
+        {
+            return AttachmentState::FullyAttached;
         }
-        inner.maintain_peers = true;
-        let registry = self.registry();
-        inner.attachment_maintainer_jh = Some(spawn("attachment maintainer", async move {
-            let this = registry.attachment_manager();
-            this.attachment_maintainer().await;
-        }));
-        true
+        if health.reliable_entry_count
+            >= TryInto::<usize>::try_into(config.limit_attached_strong).unwrap()
+        {
+            return AttachmentState::AttachedStrong;
+        }
+        if health.reliable_entry_count
+            >= TryInto::<usize>::try_into(config.limit_attached_good).unwrap()
+        {
+            return AttachmentState::AttachedGood;
+        }
+        if health.reliable_entry_count
+            >= TryInto::<usize>::try_into(config.limit_attached_weak).unwrap()
+            || health.unreliable_entry_count
+                >= TryInto::<usize>::try_into(config.limit_attached_weak).unwrap()
+        {
+            return AttachmentState::AttachedWeak;
+        }
+        AttachmentState::Attaching
     }
 
-    #[instrument(level = "trace", skip_all)]
-    pub async fn detach(&self) -> bool {
-        let Ok(_guard) = self.startup_context.startup_lock.enter() else {
-            return false;
-        };
-        let attachment_maintainer_jh = {
+    /// Update attachment and network readiness state
+    /// and possibly send a VeilidUpdate::Attachment.
+    fn update_attached_state(
+        &self,
+        current_attachment_state: AttachmentState,
+    ) -> Option<AttachmentState> {
+        // update the routing table health
+        let routing_table = self.network_manager().routing_table();
+        let health = routing_table.get_routing_table_health();
+
+        let (opt_update, opt_next_attachment_state) = {
             let mut inner = self.inner.lock();
-            let attachment_maintainer_jh = inner.attachment_maintainer_jh.take();
-            if attachment_maintainer_jh.is_some() {
-                // Terminate long-running connection maintenance routine
-                inner.maintain_peers = false;
+
+            // Check if the routing table health is different
+            if let Some(last_routing_table_health) = &inner.last_routing_table_health {
+                // If things are the same, just return
+                if last_routing_table_health.as_ref() == &health {
+                    return None;
+                }
             }
-            attachment_maintainer_jh
+
+            // Swap in new health numbers
+            let opt_previous_health = inner.last_routing_table_health.take();
+            inner.last_routing_table_health = Some(Arc::new(health.clone()));
+
+            // Calculate new attachment state
+            let config = self.config();
+            let routing_table_config = &config.get().network.routing_table;
+            let next_attachment_state =
+                AttachmentManager::translate_routing_table_health(&health, routing_table_config);
+
+            // Send update if one of:
+            // * the attachment state has changed
+            // * routing domain readiness has changed
+            // * this is our first routing table health check
+            let send_update = current_attachment_state != next_attachment_state
+                || opt_previous_health
+                    .map(|x| {
+                        x.public_internet_ready != health.public_internet_ready
+                            || x.local_network_ready != health.local_network_ready
+                    })
+                    .unwrap_or(true);
+
+            let opt_update = if send_update {
+                Some(Self::get_veilid_state_inner(&inner))
+            } else {
+                None
+            };
+
+            let opt_next_attachment_state = if current_attachment_state != next_attachment_state {
+                Some(next_attachment_state)
+            } else {
+                None
+            };
+
+            (opt_update, opt_next_attachment_state)
         };
 
-        if let Some(jh) = attachment_maintainer_jh {
-            jh.await;
-            true
-        } else {
-            false
+        // Send the update outside of the lock
+        if let Some(update) = opt_update {
+            (self.update_callback())(VeilidUpdate::Attachment(update));
        }
+
+        opt_next_attachment_state
+    }
+
+    fn update_non_attached_state(&self, current_attachment_state: AttachmentState) {
+        let uptime;
+        let attached_uptime;
+        {
+            let mut inner = self.inner.lock();
+
+            // Clear routing table health so when we start measuring it we start from scratch
+            inner.last_routing_table_health = None;
+
+            // Set timestamps
+            if current_attachment_state == AttachmentState::Attaching {
+                inner.attach_ts = Some(Timestamp::now());
+            } else if current_attachment_state == AttachmentState::Detached {
+                inner.attach_ts = None;
+            } else if current_attachment_state == AttachmentState::Detaching {
+                // ok
+            } else {
+                unreachable!("don't use this for attached states, use update_attached_state()");
+            }
+            let now = Timestamp::now();
+            uptime = now - inner.started_ts;
+            attached_uptime = inner.attach_ts.map(|ts| now - ts);
+        };
+
+        // Send update
+        (self.update_callback())(VeilidUpdate::Attachment(Box::new(VeilidStateAttachment {
+            state: current_attachment_state,
+            public_internet_ready: false,
+            local_network_ready: false,
+            uptime,
+            attached_uptime,
+        })))
     }
 
     fn get_veilid_state_inner(inner: &AttachmentManagerInner) -> Box<VeilidStateAttachment> {
@@ -433,7 +553,7 @@ impl AttachmentManager {
         let attached_uptime = inner.attach_ts.map(|ts| now - ts);
 
         Box::new(VeilidStateAttachment {
-            state: inner.last_attachment_state,
+            state: inner.attachment_state,
             public_internet_ready: inner
                 .last_routing_table_health
                 .as_ref()
@@ -456,7 +576,7 @@ impl AttachmentManager {
     #[expect(dead_code)]
     pub fn get_attachment_state(&self) -> AttachmentState {
-        self.inner.lock().last_attachment_state
+        self.inner.lock().attachment_state
     }
 
     #[expect(dead_code)]
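
The TickEvent introduced above replaces the old arrangement in which AttachmentManager called NetworkManager and RoutingTable ticks directly: the attachment manager now posts a TickEvent on the component event bus once per second, and each interested component (NetworkManager, RoutingTable, StorageManager in the later hunks) subscribes its own handler. A minimal, self-contained sketch of that publish/subscribe shape, using a hypothetical synchronous EventBus rather than the veilid-core one:

```rust
use std::sync::{Arc, Mutex};

/// Illustrative stand-in for the tick event posted by the attachment manager.
#[derive(Debug, Copy, Clone)]
struct TickEvent {
    last_tick_ms: Option<u64>,
    cur_tick_ms: u64,
}

/// Hypothetical in-process event bus: subscribers are just boxed callbacks.
#[derive(Default, Clone)]
struct EventBus {
    subscribers: Arc<Mutex<Vec<Box<dyn Fn(Arc<TickEvent>) + Send>>>>,
}

impl EventBus {
    fn subscribe(&self, handler: impl Fn(Arc<TickEvent>) + Send + 'static) {
        self.subscribers.lock().unwrap().push(Box::new(handler));
    }

    fn post(&self, evt: TickEvent) {
        let evt = Arc::new(evt);
        for handler in self.subscribers.lock().unwrap().iter() {
            handler(evt.clone());
        }
    }
}

fn main() {
    let bus = EventBus::default();

    // Each component registers its own tick handler, as NetworkManager,
    // RoutingTable, and StorageManager do in this commit.
    bus.subscribe(|evt| println!("network manager tick: {:?}", evt));
    bus.subscribe(|evt| {
        let lag_ms = evt.last_tick_ms.map(|last| evt.cur_tick_ms.saturating_sub(last));
        println!("storage manager tick, lag: {:?}", lag_ms);
    });

    // The ticker posts one event per interval; subscribers decide what to do with it.
    bus.post(TickEvent { last_tick_ms: None, cur_tick_ms: 1000 });
    bus.post(TickEvent { last_tick_ms: Some(1000), cur_tick_ms: 2000 });
}
```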

View file

@@ -2,7 +2,7 @@ use crate::attachment_manager::{AttachmentManager, AttachmentManagerStartupConte
 use crate::crypto::Crypto;
 use crate::logging::*;
 use crate::network_manager::{NetworkManager, NetworkManagerStartupContext};
-use crate::routing_table::RoutingTable;
+use crate::routing_table::{RoutingTable, RoutingTableStartupContext};
 use crate::rpc_processor::{RPCProcessor, RPCProcessorStartupContext};
 use crate::storage_manager::StorageManager;
 use crate::veilid_api::*;
@@ -81,8 +81,8 @@ impl VeilidCoreContext {
         registry.register(TableStore::new);
         #[cfg(feature = "unstable-blockstore")]
         registry.register(BlockStore::new);
+        registry.register_with_context(RoutingTable::new, RoutingTableStartupContext::default());
         registry.register(StorageManager::new);
-        registry.register(RoutingTable::new);
         registry
             .register_with_context(NetworkManager::new, NetworkManagerStartupContext::default());
         registry.register_with_context(RPCProcessor::new, RPCProcessorStartupContext::default());

View file

@@ -110,13 +110,13 @@ pub fn veilid_version() -> (u32, u32, u32) {
     )
 }
 
-#[cfg(all(not(docsrs), not(doc)))]
+#[cfg(not(docsrs))]
 include!(env!("BOSION_PATH"));
 
 /// Return the features that were enabled when veilid-core was built.
 #[must_use]
 pub fn veilid_features() -> Vec<String> {
-    if cfg!(any(docsrs, doc)) {
+    if cfg!(docsrs) {
         vec!["default".to_string()]
     } else {
         let features = Bosion::CRATE_FEATURES.to_vec();

View file

@@ -196,6 +196,7 @@ struct NetworkManagerInner {
     client_allowlist: LruCache<NodeId, ClientAllowlistEntry>,
     node_contact_method_cache: NodeContactMethodCache,
     address_check: Option<AddressCheck>,
+    tick_subscription: Option<EventBusSubscription>,
     peer_info_change_subscription: Option<EventBusSubscription>,
     socket_address_change_subscription: Option<EventBusSubscription>,
@@ -256,6 +257,7 @@ impl NetworkManager {
             client_allowlist: LruCache::new_unbounded(),
             node_contact_method_cache: NodeContactMethodCache::new(),
             address_check: None,
+            tick_subscription: None,
             peer_info_change_subscription: None,
             socket_address_change_subscription: None,
             txt_lookup_cache: LruCache::new(TXT_LOOKUP_CACHE_SIZE),
@@ -394,7 +396,13 @@ impl NetworkManager {
     }
 
     #[expect(clippy::unused_async)]
-    async fn pre_terminate_async(&self) {}
+    async fn pre_terminate_async(&self) {
+        // Ensure things have shut down
+        assert!(
+            self.startup_context.startup_lock.is_shut_down(),
+            "should have shut down by now"
+        );
+    }
 
     #[instrument(level = "debug", skip_all)]
     async fn terminate_async(&self) {}
@@ -423,19 +431,11 @@ impl NetworkManager {
         // Startup relay workers
         self.startup_relay_workers()?;
 
-        // Register event handlers
-        let peer_info_change_subscription =
-            impl_subscribe_event_bus!(self, Self, peer_info_change_event_handler);
-        let socket_address_change_subscription =
-            impl_subscribe_event_bus!(self, Self, socket_address_change_event_handler);
+        // Set up address filter
         {
             let mut inner = self.inner.lock();
             let address_check = AddressCheck::new(net.clone());
             inner.address_check = Some(address_check);
-            inner.peer_info_change_subscription = Some(peer_info_change_subscription);
-            inner.socket_address_change_subscription = Some(socket_address_change_subscription);
         }
 
         // Start network components
@@ -449,6 +449,22 @@ impl NetworkManager {
         receipt_manager.startup()?;
 
+        // Register event handlers
+        let tick_subscription = impl_subscribe_event_bus_async!(self, Self, tick_event_handler);
+        let peer_info_change_subscription =
+            impl_subscribe_event_bus!(self, Self, peer_info_change_event_handler);
+        let socket_address_change_subscription =
+            impl_subscribe_event_bus!(self, Self, socket_address_change_event_handler);
+        {
+            let mut inner = self.inner.lock();
+            inner.tick_subscription = Some(tick_subscription);
+            inner.peer_info_change_subscription = Some(peer_info_change_subscription);
+            inner.socket_address_change_subscription = Some(socket_address_change_subscription);
+        }
+
         veilid_log!(self trace "NetworkManager::internal_startup end");
 
         Ok(StartupDisposition::Success)
@@ -479,13 +495,15 @@ impl NetworkManager {
         // Shutdown event bus subscriptions and address check
         {
             let mut inner = self.inner.lock();
+            if let Some(sub) = inner.tick_subscription.take() {
+                self.event_bus().unsubscribe(sub);
+            }
             if let Some(sub) = inner.socket_address_change_subscription.take() {
                 self.event_bus().unsubscribe(sub);
             }
             if let Some(sub) = inner.peer_info_change_subscription.take() {
                 self.event_bus().unsubscribe(sub);
            }
-            inner.address_check = None;
         }
 
         // Shutdown relay workers
@@ -498,6 +516,12 @@ impl NetworkManager {
         let components = self.components.read().clone();
         if let Some(components) = components {
             components.net.shutdown().await;
+
+            {
+                let mut inner = self.inner.lock();
+                inner.address_check = None;
+            }
+
             components.receipt_manager.shutdown().await;
             components.connection_manager.shutdown().await;
         }
@@ -1093,7 +1117,7 @@ impl NetworkManager {
         if tsbehind.as_u64() != 0 && (ts > ets && ts.saturating_sub(ets) > tsbehind) {
             veilid_log!(self debug
                 "Timestamp behind: {}ms ({})",
-                timestamp_to_secs(ts.saturating_sub(ets).as_u64()) * 1000f64,
+                timestamp_duration_to_secs(ts.saturating_sub(ets).as_u64()) * 1000f64,
                 flow.remote()
             );
             return Ok(false);
@@ -1103,7 +1127,7 @@ impl NetworkManager {
         if tsahead.as_u64() != 0 && (ts < ets && ets.saturating_sub(ts) > tsahead) {
             veilid_log!(self debug
                 "Timestamp ahead: {}ms ({})",
-                timestamp_to_secs(ets.saturating_sub(ts).as_u64()) * 1000f64,
+                timestamp_duration_to_secs(ets.saturating_sub(ts).as_u64()) * 1000f64,
                 flow.remote()
             );
             return Ok(false);

View file

@@ -139,12 +139,12 @@ impl Network {
         if let Err(e) = set_tcp_stream_linger(&tcp_stream, Some(core::time::Duration::from_secs(0)))
         {
-            veilid_log!(self debug "Couldn't set TCP linger: {}", e);
+            veilid_log!(self debug "Couldn't set TCP linger: {} on {:?} -> {:?}", e, peer_addr, local_addr);
             return;
         }
 
         if let Err(e) = tcp_stream.set_nodelay(true) {
-            veilid_log!(self debug "Couldn't set TCP nodelay: {}", e);
+            veilid_log!(self debug "Couldn't set TCP nodelay: {} on {:?} -> {:?}", e, peer_addr, local_addr);
             return;
         }

View file

@@ -1,6 +1,6 @@
 pub mod rolling_transfers;
 
 use super::*;
+use crate::attachment_manager::TickEvent;
 
 impl NetworkManager {
     pub fn setup_tasks(&self) {
@@ -28,8 +28,14 @@ impl NetworkManager {
         }
     }
 
+    pub async fn tick_event_handler(&self, _evt: Arc<TickEvent>) {
+        if let Err(e) = self.tick().await {
+            error!("Error in network manager tick: {}", e);
+        }
+    }
+
     #[instrument(level = "trace", name = "NetworkManager::tick", skip_all, err)]
-    pub async fn tick(&self) -> EyreResult<()> {
+    async fn tick(&self) -> EyreResult<()> {
         let net = self.net();
         let receipt_manager = self.receipt_manager();

View file

@@ -1094,7 +1094,9 @@ impl BucketEntryInner {
         {
             format!(
                 "{}s ago",
-                timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_seen_ts).as_u64())
+                timestamp_duration_to_secs(
+                    cur_ts.saturating_sub(first_consecutive_seen_ts).as_u64()
+                )
             )
         } else {
             "never".to_owned()
@@ -1102,7 +1104,7 @@ impl BucketEntryInner {
         let last_seen_ts_str = if let Some(last_seen_ts) = self.peer_stats.rpc_stats.last_seen_ts {
             format!(
                 "{}s ago",
-                timestamp_to_secs(cur_ts.saturating_sub(last_seen_ts).as_u64())
+                timestamp_duration_to_secs(cur_ts.saturating_sub(last_seen_ts).as_u64())
             )
         } else {
             "never".to_owned()

View file

@@ -102,11 +102,31 @@ pub struct RecentPeersEntry {
     pub last_connection: Flow,
 }
 
+#[derive(Debug, Clone)]
+pub struct RoutingTableStartupContext {
+    pub startup_lock: Arc<StartupLock>,
+}
+
+impl RoutingTableStartupContext {
+    pub fn new() -> Self {
+        Self {
+            startup_lock: Arc::new(StartupLock::new()),
+        }
+    }
+}
+
+impl Default for RoutingTableStartupContext {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 #[must_use]
 pub(crate) struct RoutingTable {
     registry: VeilidComponentRegistry,
     inner: RwLock<RoutingTableInner>,
 
+    // Startup context
+    startup_context: RoutingTableStartupContext,
+
     /// Node Ids
     node_ids: RwLock<NodeIdGroup>,
     /// Route spec store
@@ -141,6 +161,8 @@ pub(crate) struct RoutingTable {
     relay_management_task: TickTask<EyreReport>,
     /// Background process to keep private routes up
     private_route_management_task: TickTask<EyreReport>,
+    /// Tick subscription
+    tick_subscription: Mutex<Option<EventBusSubscription>>,
 }
 
 impl fmt::Debug for RoutingTable {
@@ -155,7 +177,10 @@ impl fmt::Debug for RoutingTable {
 impl_veilid_component!(RoutingTable);
 
 impl RoutingTable {
-    pub fn new(registry: VeilidComponentRegistry) -> Self {
+    pub fn new(
+        registry: VeilidComponentRegistry,
+        startup_context: RoutingTableStartupContext,
+    ) -> Self {
         let config = registry.config();
         let c = config.get();
         let inner = RwLock::new(RoutingTableInner::new(registry.clone()));
@@ -163,6 +188,7 @@ impl RoutingTable {
         let this = Self {
             registry,
             inner,
+            startup_context,
             node_ids: RwLock::new(NodeIdGroup::new()),
             route_spec_store,
             kick_queue: Mutex::new(BTreeSet::default()),
@@ -207,6 +233,7 @@ impl RoutingTable {
                 "private_route_management_task",
                 PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS,
             ),
+            tick_subscription: Mutex::new(None),
         };
 
         this.setup_tasks();
@@ -267,6 +294,14 @@ impl RoutingTable {
     #[expect(clippy::unused_async)]
     pub(crate) async fn startup(&self) -> EyreResult<()> {
+        let guard = self.startup_context.startup_lock.startup()?;
+
+        // Register event handlers
+        let tick_subscription = impl_subscribe_event_bus_async!(self, Self, tick_event_handler);
+        *self.tick_subscription.lock() = Some(tick_subscription);
+
+        guard.success();
+
         Ok(())
     }
 
@@ -275,6 +310,17 @@ impl RoutingTable {
         veilid_log!(self debug "stopping routing table tasks");
         self.cancel_tasks().await;
 
+        let guard = self
+            .startup_context
+            .startup_lock
+            .shutdown()
+            .await
+            .expect("should be started up");
+
+        if let Some(sub) = self.tick_subscription.lock().take() {
+            self.event_bus().unsubscribe(sub);
+        }
+
         // Unpublish peer info
         veilid_log!(self debug "unpublishing peer info");
         {
@@ -283,10 +329,18 @@ impl RoutingTable {
                 inner.unpublish_peer_info(routing_domain);
             }
         }
+
+        guard.success();
     }
 
     #[expect(clippy::unused_async)]
-    async fn pre_terminate_async(&self) {}
+    async fn pre_terminate_async(&self) {
+        // Ensure things have shut down
+        assert!(
+            self.startup_context.startup_lock.is_shut_down(),
+            "should have shut down by now"
+        );
+    }
 
     /// Called to shut down the routing table
     async fn terminate_async(&self) {

View file

@@ -8,6 +8,8 @@ pub mod private_route_management;
 pub mod relay_management;
 pub mod update_statistics;
 
+use crate::attachment_manager::TickEvent;
+
 use super::*;
 
 impl_veilid_log_facility!("rtab");
@@ -112,6 +114,12 @@ impl RoutingTable {
         );
     }
 
+    pub async fn tick_event_handler(&self, _evt: Arc<TickEvent>) {
+        if let Err(e) = self.tick().await {
+            error!("Error in routing table tick: {}", e);
+        }
+    }
+
     /// Ticks about once per second
     /// to run tick tasks which may run at slower tick rates as configured
     #[instrument(level = "trace", name = "RoutingTable::tick", skip_all, err)]

View file

@@ -18,9 +18,9 @@ pub mod mock_registry {
         registry.register(TableStore::new);
         registry.register(Crypto::new);
         registry.register(StorageManager::new);
-        registry.register(RoutingTable::new);
-        let startup_context = NetworkManagerStartupContext::default();
-        registry.register_with_context(NetworkManager::new, startup_context);
+        registry.register_with_context(RoutingTable::new, RoutingTableStartupContext::default());
+        registry
+            .register_with_context(NetworkManager::new, NetworkManagerStartupContext::default());
         registry.init().await.expect("should init");
         registry.post_init().await.expect("should post init");

View file

@@ -12,6 +12,8 @@ mod tasks;
 mod types;
 mod watch_value;
 
+use crate::attachment_manager::TickEvent;
+
 use super::*;
 use hashlink::LinkedHashMap;
@@ -59,6 +61,8 @@ const CHECK_WATCHED_RECORDS_INTERVAL_SECS: u32 = 1;
 const REHYDRATE_RECORDS_INTERVAL_SECS: u32 = 1;
 /// Number of rehydration requests to process in parallel
 const REHYDRATE_BATCH_SIZE: usize = 16;
+/// Maximum 'offline lag' before we decide to poll for changed watches
+const CHANGE_INSPECT_LAG_SECS: u32 = 2;
 /// Table store table for storage manager metadata
 const STORAGE_MANAGER_METADATA: &str = "storage_manager_metadata";
 /// Storage manager metadata key name for offline subkey write persistence
@@ -99,10 +103,10 @@ struct StorageManagerInner {
     pub outbound_watch_manager: OutboundWatchManager,
     /// Storage manager metadata that is persistent, including copy of offline subkey writes
     pub metadata_db: Option<TableDB>,
-    /// Background processing task (not part of attachment manager tick tree so it happens when detached too)
-    pub tick_future: Option<PinBoxFutureStatic<()>>,
-    /// PeerInfo subscription
-    peer_info_change_subscription: Option<EventBusSubscription>,
+    /// Peer info change subscription
+    pub peer_info_change_subscription: Option<EventBusSubscription>,
+    /// Tick subscription
+    pub tick_subscription: Option<EventBusSubscription>,
 }
 
 impl fmt::Debug for StorageManagerInner {
@@ -121,7 +125,7 @@ impl fmt::Debug for StorageManagerInner {
                 &self.peer_info_change_subscription,
             )
             //.field("metadata_db", &self.metadata_db)
-            //.field("tick_future", &self.tick_future)
+            //.field("tick_subscription", &self.tick_subscription)
             .finish()
     }
 }
@@ -322,25 +326,16 @@ impl StorageManager {
         // Register event handlers
         let peer_info_change_subscription =
             impl_subscribe_event_bus_async!(self, Self, peer_info_change_event_handler);
+        let tick_subscription = impl_subscribe_event_bus_async!(self, Self, tick_event_handler);
 
         let mut inner = self.inner.lock().await;
 
         // Resolve outbound watch manager noderefs
-        inner.outbound_watch_manager.prepare(self.routing_table());
+        inner.outbound_watch_manager.prepare(&self.routing_table());
 
         // Schedule tick
-        let registry = self.registry();
-        let tick_future = interval("storage manager tick", 1000, move || {
-            let registry = registry.clone();
-            async move {
-                let this = registry.storage_manager();
-                if let Err(e) = this.tick().await {
-                    veilid_log!(this warn "storage manager tick failed: {}", e);
-                }
-            }
-        });
-        inner.tick_future = Some(tick_future);
         inner.peer_info_change_subscription = Some(peer_info_change_subscription);
+        inner.tick_subscription = Some(tick_subscription);
 
         Ok(())
     }
@@ -350,14 +345,12 @@ impl StorageManager {
         // Stop background operations
         {
             let mut inner = self.inner.lock().await;
-            // Stop ticker
-            let tick_future = inner.tick_future.take();
-            if let Some(f) = tick_future {
-                f.await;
-            }
             if let Some(sub) = inner.peer_info_change_subscription.take() {
                 self.event_bus().unsubscribe(sub);
             }
+            if let Some(sub) = inner.tick_subscription.take() {
+                self.event_bus().unsubscribe(sub);
+            }
         }
 
         // Cancel all tasks associated with the tick future
@@ -2249,6 +2242,13 @@ impl StorageManager {
         }
     }
 
+    async fn tick_event_handler(&self, evt: Arc<TickEvent>) {
+        let lag = evt.last_tick_ts.map(|x| evt.cur_tick_ts.saturating_sub(x));
+        if let Err(e) = self.tick(lag).await {
+            error!("Error in storage manager tick: {}", e);
+        }
+    }
+
     pub async fn get_encryption_key_for_opaque_record_key(
         &self,
         opaque_record_key: &OpaqueRecordKey,

View file

@@ -90,7 +90,7 @@ impl OutboundWatchManager {
         }
     }
 
-    pub fn prepare(&mut self, routing_table: VeilidComponentGuard<'_, RoutingTable>) {
+    pub fn prepare(&mut self, routing_table: &RoutingTable) {
         for (pnk, pns) in &mut self.per_node_states {
             pns.watch_node_ref = match routing_table.lookup_node_ref(pnk.node_id.clone()) {
                 Ok(v) => v,
@@ -204,7 +204,13 @@ impl OutboundWatchManager {
     }
 
     /// Set a record up to be inspected for changed subkeys
-    pub fn enqueue_change_inspect(&mut self, record_key: RecordKey, subkeys: ValueSubkeyRangeSet) {
+    pub fn enqueue_change_inspect(
+        &mut self,
+        storage_manager: &StorageManager,
+        record_key: RecordKey,
+        subkeys: ValueSubkeyRangeSet,
+    ) {
+        veilid_log!(storage_manager debug "change inspect: record_key={} subkeys={}", record_key, subkeys);
         self.needs_change_inspection
             .entry(record_key)
             .and_modify(|x| *x = x.union(&subkeys))

View file

@@ -18,9 +18,11 @@ impl StorageManager {
             flush_record_stores_task,
             flush_record_stores_task_routine
         );
+
         // Set save metadata task
         veilid_log!(self debug "starting save metadata task");
         impl_setup_task!(self, Self, save_metadata_task, save_metadata_task_routine);
+
         // Set offline subkey writes tick task
         veilid_log!(self debug "starting offline subkey writes task");
         impl_setup_task!(
@@ -68,7 +70,7 @@ impl StorageManager {
     }
 
     #[instrument(parent = None, level = "trace", target = "stor", name = "StorageManager::tick", skip_all, err)]
-    pub async fn tick(&self) -> EyreResult<()> {
+    pub async fn tick(&self, lag: Option<TimestampDuration>) -> EyreResult<()> {
         // Run the flush stores task
         self.flush_record_stores_task.tick().await?;
@@ -96,6 +98,14 @@ impl StorageManager {
             // Send value changed notifications
             self.send_value_changes_task.tick().await?;
         }
 
+        // Change inspection
+        if let Some(lag) = lag {
+            if lag > TimestampDuration::new_secs(CHANGE_INSPECT_LAG_SECS) {
+                self.change_inspect_all_watches().await;
+            }
+        }
+
         Ok(())
     }
@@ -121,6 +131,10 @@ impl StorageManager {
         if let Err(e) = self.offline_subkey_writes_task.stop().await {
             veilid_log!(self warn "offline_subkey_writes_task not stopped: {}", e);
         }
+        veilid_log!(self debug "stopping save metadata task");
+        if let Err(e) = self.save_metadata_task.stop().await {
+            veilid_log!(self warn "save_metadata_task not stopped: {}", e);
+        }
         veilid_log!(self debug "stopping record rehydration task");
         if let Err(e) = self.rehydrate_records_task.stop().await {
             veilid_log!(self warn "rehydrate_records_task not stopped: {}", e);

View file

@@ -813,6 +813,8 @@ impl StorageManager {
         }
 
         // Update watch state
+        let did_add_nodes = !added_nodes.is_empty();
+
         state.edit(&inner.outbound_watch_manager.per_node_states, |editor| {
             editor.set_params(desired.clone());
             editor.retain_nodes(|x| !remove_nodes.contains(x));
@@ -821,10 +823,12 @@ impl StorageManager {
 
         // Watch was reconciled, now kick off an inspect to
         // ensure that any changes online are immediately reported to the app
-        if opt_old_state_params != Some(desired) {
+        // If the watch parameters changed, or we added new nodes to the watch state
+        // then we should inspect and see if anything changed
+        if opt_old_state_params != Some(desired) || did_add_nodes {
             inner
                 .outbound_watch_manager
-                .enqueue_change_inspect(record_key, watch_subkeys);
+                .enqueue_change_inspect(self, record_key, watch_subkeys);
         }
     }
@@ -1057,9 +1061,13 @@ impl StorageManager {
         inbound_node_id: NodeId,
         watch_id: u64,
     ) -> VeilidAPIResult<NetworkResult<()>> {
-        let encryption_key = self
+        let Ok(encryption_key) = self
             .get_encryption_key_for_opaque_record_key(&opaque_record_key)
-            .await?;
+            .await
+        else {
+            // value change received for unopened key
+            return Ok(NetworkResult::value(()));
+        };
         let record_key = RecordKey::new(
             opaque_record_key.kind(),
             BareRecordKey::new(opaque_record_key.value(), encryption_key),
@@ -1299,9 +1307,11 @@ impl StorageManager {
             // inspect the range to see what changed
 
             // Queue this up for inspection
-            inner
-                .outbound_watch_manager
-                .enqueue_change_inspect(record_key, reportable_subkeys);
+            inner.outbound_watch_manager.enqueue_change_inspect(
+                self,
+                record_key,
+                reportable_subkeys,
+            );
         }
 
         Ok(NetworkResult::value(()))
@@ -1329,9 +1339,11 @@ impl StorageManager {
             veilid_log!(self debug "change inspecting {} watches", change_inspects.len());
             for change_inspect in change_inspects {
-                inner
-                    .outbound_watch_manager
-                    .enqueue_change_inspect(change_inspect.0, change_inspect.1);
+                inner.outbound_watch_manager.enqueue_change_inspect(
+                    self,
+                    change_inspect.0,
+                    change_inspect.1,
+                );
             }
         }
     }

View file

@@ -22,12 +22,11 @@ pub fn format_opt_ts(ts: Option<TimestampDuration>) -> String {
     let Some(ts) = ts else {
         return "---".to_owned();
     };
-    let ts = ts.as_u64();
-    let secs = timestamp_to_secs(ts);
+    let secs = ts.seconds_f64();
     if secs >= 1.0 {
-        format!("{:.2}s", timestamp_to_secs(ts))
+        format!("{:.2}s", secs)
     } else {
-        format!("{:.2}ms", timestamp_to_secs(ts) * 1000.0)
+        format!("{:.2}ms", secs * 1000.0)
     }
 }

View file

@@ -54,7 +54,7 @@ macro_rules! aligned_u64_type {
                 Self(v)
             }
 
             #[must_use]
-            pub fn as_u64(self) -> u64 {
+            pub const fn as_u64(self) -> u64 {
                 self.0
             }
         }

View file

@@ -18,4 +18,26 @@ impl TimestampDuration {
     pub const fn new_ms(ms: u64) -> Self {
         TimestampDuration::new(ms * 1_000u64)
     }
+
+    pub fn seconds_u32(&self) -> Result<u32, String> {
+        u32::try_from(self.as_u64() / 1_000_000u64)
+            .map_err(|e| format!("could not convert to seconds: {}", e))
+    }
+
+    pub fn millis_u32(&self) -> Result<u32, String> {
+        u32::try_from(self.as_u64() / 1_000u64)
+            .map_err(|e| format!("could not convert to milliseconds: {}", e))
+    }
+
+    #[must_use]
+    pub fn seconds_f64(&self) -> f64 {
+        // Downshift precision until it fits, lose least significant bits
+        let mut mul: f64 = 1.0f64 / 1_000_000.0f64;
+        let mut usec = self.0;
+        while usec > (u32::MAX as u64) {
+            usec >>= 1;
+            mul *= 2.0f64;
+        }
+        f64::from(usec as u32) * mul
+    }
 }
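
The seconds_f64 conversion above avoids a direct u64-to-f64 cast by shifting the microsecond count down until it fits in a u32 and doubling the multiplier once per shift, trading least-significant bits for an exact f64::from(u32) conversion. A standalone sketch of the same arithmetic with a quick check of both regimes (the usec_to_secs name here is ours, for illustration only):

```rust
/// Convert a duration in microseconds to seconds, downshifting precision
/// until the value fits in a u32 (same approach as seconds_f64 above).
fn usec_to_secs(dur_us: u64) -> f64 {
    let mut mul: f64 = 1.0 / 1_000_000.0;
    let mut usec = dur_us;
    while usec > (u32::MAX as u64) {
        usec >>= 1; // drop one least-significant bit...
        mul *= 2.0; // ...and compensate in the multiplier
    }
    f64::from(usec as u32) * mul
}

fn main() {
    // Small values convert essentially exactly: 2_500_000 us == 2.5 s.
    assert!((usec_to_secs(2_500_000) - 2.5).abs() < 1e-9);

    // Above u32::MAX microseconds (about 71.6 minutes) low-order bits are dropped:
    // two hours plus one microsecond still reports approximately 7200 s.
    let approx = usec_to_secs(7_200_000_001);
    assert!((approx - 7200.0).abs() < 0.001);

    println!("small: {}, large: {}", usec_to_secs(2_500_000), approx);
}
```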

View file

@@ -534,7 +534,7 @@ pub fn test_tools() {
     for x in 0..1024 {
         let cur_us = x as u64 * 1000000u64;
         if retry_falloff_log(last_us, cur_us, 10_000_000u64, 6_000_000_000u64, 2.0f64) {
-            info!(" retry at {} secs", timestamp_to_secs(cur_us));
+            info!(" retry at {} secs", timestamp_duration_to_secs(cur_us));
             last_us = cur_us;
         }
     }

View file

@@ -19,7 +19,7 @@ pub struct TickTask<E: Send + 'static> {
     running: Arc<AtomicBool>,
 }
 
-impl<E: Send + 'static> TickTask<E> {
+impl<E: Send + fmt::Debug + 'static> TickTask<E> {
     #[must_use]
     pub fn new_us(name: &str, tick_period_us: u64) -> Self {
         Self {
@@ -179,9 +179,14 @@ impl<E: Send + 'static> TickTask<E> {
                 *stop_source_guard = Some(stop_source);
                 Ok(true)
             }
+            Err(()) => {
+                // If we get this, it's because we are joining the singlefuture already
+                // Don't bother running but this is not an error in this case
+                Ok(false)
+            }
             // All other conditions should not be reachable
-            _ => {
-                unreachable!();
+            x => {
+                panic!("should not have gotten this result: {:?}", x);
            }
        }
    }

View file

@@ -160,12 +160,19 @@ pub fn prepend_slash(s: String) -> String {
 }
 
 #[must_use]
-pub fn timestamp_to_secs(ts: u64) -> f64 {
-    ts as f64 / 1000000.0f64
+pub fn timestamp_duration_to_secs(dur: u64) -> f64 {
+    // Downshift precision until it fits, lose least significant bits
+    let mut mul: f64 = 1.0f64 / 1_000_000.0f64;
+    let mut usec = dur;
+    while usec > (u32::MAX as u64) {
+        usec >>= 1;
+        mul *= 2.0f64;
+    }
+    f64::from(usec as u32) * mul
 }
 
 #[must_use]
-pub fn secs_to_timestamp(secs: f64) -> u64 {
+pub fn secs_to_timestamp_duration(secs: f64) -> u64 {
     (secs * 1000000.0f64) as u64
 }
 
@@ -196,7 +203,10 @@ pub fn retry_falloff_log(
         true
     } else {
         // Exponential falloff between 'interval_start_us' and 'interval_max_us' microseconds
-        last_us <= secs_to_timestamp(timestamp_to_secs(cur_us) / interval_multiplier_us)
+        last_us
+            <= secs_to_timestamp_duration(
+                timestamp_duration_to_secs(cur_us) / interval_multiplier_us,
+            )
     }
 }
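
A short sketch exercising the two renamed helpers above (bodies copied from this diff): the round trip through seconds and back stays within a microsecond or two below u32::MAX microseconds and loses only least-significant bits above that, which is the precision trade-off the downshift loop makes.

```rust
#[must_use]
pub fn timestamp_duration_to_secs(dur: u64) -> f64 {
    // Downshift precision until it fits, lose least significant bits
    let mut mul: f64 = 1.0f64 / 1_000_000.0f64;
    let mut usec = dur;
    while usec > (u32::MAX as u64) {
        usec >>= 1;
        mul *= 2.0f64;
    }
    f64::from(usec as u32) * mul
}

#[must_use]
pub fn secs_to_timestamp_duration(secs: f64) -> u64 {
    (secs * 1000000.0f64) as u64
}

fn main() {
    // Below u32::MAX microseconds the round trip is within a microsecond or two.
    let small = 90_000_000u64; // 90 seconds
    let small_rt = secs_to_timestamp_duration(timestamp_duration_to_secs(small));
    assert!(small_rt.abs_diff(small) <= 2);

    // Above u32::MAX microseconds (~71.6 minutes) low-order bits are dropped.
    let large = 10_000_000_123u64;
    let large_rt = secs_to_timestamp_duration(timestamp_duration_to_secs(large));
    assert!(large_rt.abs_diff(large) < 1_000);

    println!("90s -> {}us, large -> {}us", small_rt, large_rt);
}
```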