make timestamp / timestampduration their own newtypes

This commit is contained in:
Christien Rioux 2024-08-01 20:02:27 -05:00
parent ff2f2125b0
commit 1e83cd1349
36 changed files with 293 additions and 250 deletions

View File

@ -188,7 +188,7 @@ impl AttachmentManager {
// Set timestamps // Set timestamps
if state == AttachmentState::Attaching { if state == AttachmentState::Attaching {
inner.attach_ts = Some(get_aligned_timestamp()); inner.attach_ts = Some(Timestamp::now());
} else if state == AttachmentState::Detached { } else if state == AttachmentState::Detached {
inner.attach_ts = None; inner.attach_ts = None;
} else if state == AttachmentState::Detaching { } else if state == AttachmentState::Detaching {

View File

@ -133,7 +133,7 @@ impl<S: Subscriber + for<'a> registry::LookupSpan<'a>> Layer<S> for ApiTracingLa
span_ref span_ref
.extensions_mut() .extensions_mut()
.insert::<SpanDuration>(SpanDuration { .insert::<SpanDuration>(SpanDuration {
start: get_aligned_timestamp(), start: Timestamp::now(),
end: Timestamp::default(), end: Timestamp::default(),
}); });
} }
@ -145,7 +145,7 @@ impl<S: Subscriber + for<'a> registry::LookupSpan<'a>> Layer<S> for ApiTracingLa
if let Some(inner) = &mut *self.inner.lock() { if let Some(inner) = &mut *self.inner.lock() {
if let Some(span_ref) = ctx.span(&id) { if let Some(span_ref) = ctx.span(&id) {
if let Some(span_duration) = span_ref.extensions_mut().get_mut::<SpanDuration>() { if let Some(span_duration) = span_ref.extensions_mut().get_mut::<SpanDuration>() {
span_duration.end = get_aligned_timestamp(); span_duration.end = Timestamp::now();
let duration = span_duration.end.saturating_sub(span_duration.start); let duration = span_duration.end.saturating_sub(span_duration.start);
let meta = span_ref.metadata(); let meta = span_ref.metadata();
self.emit_log( self.emit_log(

View File

@ -255,7 +255,7 @@ impl AddressFilter {
} }
pub fn set_dial_info_failed(&self, dial_info: DialInfo) { pub fn set_dial_info_failed(&self, dial_info: DialInfo) {
let ts = get_aligned_timestamp(); let ts = Timestamp::now();
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
if inner.dial_info_failures.len() >= MAX_DIAL_INFO_FAILURES { if inner.dial_info_failures.len() >= MAX_DIAL_INFO_FAILURES {
@ -280,7 +280,7 @@ impl AddressFilter {
pub fn punish_ip_addr(&self, addr: IpAddr, reason: PunishmentReason) { pub fn punish_ip_addr(&self, addr: IpAddr, reason: PunishmentReason) {
log_net!(debug ">>> PUNISHED: {} for {:?}", addr, reason); log_net!(debug ">>> PUNISHED: {} for {:?}", addr, reason);
let timestamp = get_aligned_timestamp(); let timestamp = Timestamp::now();
let punishment = Punishment { reason, timestamp }; let punishment = Punishment { reason, timestamp };
let ipblock = ip_to_ipblock( let ipblock = ip_to_ipblock(
@ -321,7 +321,7 @@ impl AddressFilter {
nr.operate_mut(|_rti, e| e.set_punished(Some(reason))); nr.operate_mut(|_rti, e| e.set_punished(Some(reason)));
} }
let timestamp = get_aligned_timestamp(); let timestamp = Timestamp::now();
let punishment = Punishment { reason, timestamp }; let punishment = Punishment { reason, timestamp };
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -363,7 +363,7 @@ impl AddressFilter {
return Err(AddressFilterError::Punished); return Err(AddressFilterError::Punished);
} }
let ts = get_aligned_timestamp(); let ts = Timestamp::now();
self.purge_old_timestamps(inner, ts); self.purge_old_timestamps(inner, ts);
match ipblock { match ipblock {
@ -423,7 +423,7 @@ impl AddressFilter {
addr, addr,
); );
let ts = get_aligned_timestamp(); let ts = Timestamp::now();
self.purge_old_timestamps(&mut inner, ts); self.purge_old_timestamps(&mut inner, ts);
match ipblock { match ipblock {

View File

@ -232,7 +232,7 @@ impl ConnectionManager {
// See if this should be a protected connection // See if this should be a protected connection
if let Some(protect_nr) = self.should_protect_connection(&conn) { if let Some(protect_nr) = self.should_protect_connection(&conn) {
log_net!(debug "== PROTECTING connection: {} -> {} for node {}", id, conn.debug_print(get_aligned_timestamp()), protect_nr); log_net!(debug "== PROTECTING connection: {} -> {} for node {}", id, conn.debug_print(Timestamp::now()), protect_nr);
conn.protect(protect_nr); conn.protect(protect_nr);
} }
@ -244,7 +244,7 @@ impl ConnectionManager {
Ok(Some(conn)) => { Ok(Some(conn)) => {
// Connection added and a different one LRU'd out // Connection added and a different one LRU'd out
// Send it to be terminated // Send it to be terminated
log_net!(debug "== LRU kill connection due to limit: {:?}", conn.debug_print(get_aligned_timestamp())); log_net!(debug "== LRU kill connection due to limit: {:?}", conn.debug_print(Timestamp::now()));
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
} }
Err(ConnectionTableAddError::AddressFilter(conn, e)) => { Err(ConnectionTableAddError::AddressFilter(conn, e)) => {
@ -259,7 +259,7 @@ impl ConnectionManager {
Err(ConnectionTableAddError::AlreadyExists(conn)) => { Err(ConnectionTableAddError::AlreadyExists(conn)) => {
// Connection already exists // Connection already exists
let desc = conn.flow(); let desc = conn.flow();
log_net!(debug "== Connection already exists: {:?}", conn.debug_print(get_aligned_timestamp())); log_net!(debug "== Connection already exists: {:?}", conn.debug_print(Timestamp::now()));
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Ok(NetworkResult::no_connection_other(format!( return Ok(NetworkResult::no_connection_other(format!(
"connection already exists: {:?}", "connection already exists: {:?}",
@ -269,7 +269,7 @@ impl ConnectionManager {
Err(ConnectionTableAddError::TableFull(conn)) => { Err(ConnectionTableAddError::TableFull(conn)) => {
// Connection table is full // Connection table is full
let desc = conn.flow(); let desc = conn.flow();
log_net!(debug "== Connection table full: {:?}", conn.debug_print(get_aligned_timestamp())); log_net!(debug "== Connection table full: {:?}", conn.debug_print(Timestamp::now()));
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Ok(NetworkResult::no_connection_other(format!( return Ok(NetworkResult::no_connection_other(format!(
"connection table is full: {:?}", "connection table is full: {:?}",

View File

@ -388,7 +388,7 @@ impl ConnectionTable {
pub fn debug_print_table(&self) -> String { pub fn debug_print_table(&self) -> String {
let mut out = String::new(); let mut out = String::new();
let inner = self.inner.lock(); let inner = self.inner.lock();
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
for t in 0..inner.conn_by_id.len() { for t in 0..inner.conn_by_id.len() {
out += &format!( out += &format!(
" {} Connections: ({}/{})\n", " {} Connections: ({}/{})\n",

View File

@ -529,11 +529,11 @@ impl NetworkManager {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
match inner.client_allowlist.entry(client) { match inner.client_allowlist.entry(client) {
hashlink::lru_cache::Entry::Occupied(mut entry) => { hashlink::lru_cache::Entry::Occupied(mut entry) => {
entry.get_mut().last_seen_ts = get_aligned_timestamp() entry.get_mut().last_seen_ts = Timestamp::now()
} }
hashlink::lru_cache::Entry::Vacant(entry) => { hashlink::lru_cache::Entry::Vacant(entry) => {
entry.insert(ClientAllowlistEntry { entry.insert(ClientAllowlistEntry {
last_seen_ts: get_aligned_timestamp(), last_seen_ts: Timestamp::now(),
}); });
} }
} }
@ -545,7 +545,7 @@ impl NetworkManager {
match inner.client_allowlist.entry(client) { match inner.client_allowlist.entry(client) {
hashlink::lru_cache::Entry::Occupied(mut entry) => { hashlink::lru_cache::Entry::Occupied(mut entry) => {
entry.get_mut().last_seen_ts = get_aligned_timestamp(); entry.get_mut().last_seen_ts = Timestamp::now();
true true
} }
hashlink::lru_cache::Entry::Vacant(_) => false, hashlink::lru_cache::Entry::Vacant(_) => false,
@ -556,7 +556,7 @@ impl NetworkManager {
let timeout_ms = self.with_config(|c| c.network.client_allowlist_timeout_ms); let timeout_ms = self.with_config(|c| c.network.client_allowlist_timeout_ms);
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let cutoff_timestamp = let cutoff_timestamp =
get_aligned_timestamp() - TimestampDuration::new((timeout_ms as u64) * 1000u64); Timestamp::now() - TimestampDuration::new((timeout_ms as u64) * 1000u64);
// Remove clients from the allowlist that haven't been since since our allowlist timeout // Remove clients from the allowlist that haven't been since since our allowlist timeout
while inner while inner
.client_allowlist .client_allowlist
@ -587,7 +587,7 @@ impl NetworkManager {
#[instrument(level = "trace", skip(self, extra_data, callback))] #[instrument(level = "trace", skip(self, extra_data, callback))]
pub fn generate_receipt<D: AsRef<[u8]>>( pub fn generate_receipt<D: AsRef<[u8]>>(
&self, &self,
expiration_us: u64, expiration_us: TimestampDuration,
expected_returns: u32, expected_returns: u32,
extra_data: D, extra_data: D,
callback: impl ReceiptCallback, callback: impl ReceiptCallback,
@ -617,7 +617,7 @@ impl NetworkManager {
.wrap_err("failed to generate signed receipt")?; .wrap_err("failed to generate signed receipt")?;
// Record the receipt for later // Record the receipt for later
let exp_ts = get_aligned_timestamp() + expiration_us; let exp_ts = Timestamp::now() + expiration_us;
receipt_manager.record_receipt(receipt, exp_ts, expected_returns, callback); receipt_manager.record_receipt(receipt, exp_ts, expected_returns, callback);
Ok(out) Ok(out)
@ -627,7 +627,7 @@ impl NetworkManager {
#[instrument(level = "trace", skip(self, extra_data))] #[instrument(level = "trace", skip(self, extra_data))]
pub fn generate_single_shot_receipt<D: AsRef<[u8]>>( pub fn generate_single_shot_receipt<D: AsRef<[u8]>>(
&self, &self,
expiration_us: u64, expiration_us: TimestampDuration,
extra_data: D, extra_data: D,
) -> EyreResult<(Vec<u8>, EventualValueFuture<ReceiptEvent>)> { ) -> EyreResult<(Vec<u8>, EventualValueFuture<ReceiptEvent>)> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
@ -656,7 +656,7 @@ impl NetworkManager {
.wrap_err("failed to generate signed receipt")?; .wrap_err("failed to generate signed receipt")?;
// Record the receipt for later // Record the receipt for later
let exp_ts = get_aligned_timestamp() + expiration_us; let exp_ts = Timestamp::now() + expiration_us;
let eventual = SingleShotEventual::new(Some(ReceiptEvent::Cancelled)); let eventual = SingleShotEventual::new(Some(ReceiptEvent::Cancelled));
let instance = eventual.instance(); let instance = eventual.instance();
receipt_manager.record_single_shot_receipt(receipt, exp_ts, eventual); receipt_manager.record_single_shot_receipt(receipt, exp_ts, eventual);
@ -855,7 +855,7 @@ impl NetworkManager {
// XXX: do we need a delay here? or another hole punch packet? // XXX: do we need a delay here? or another hole punch packet?
// Set the hole punch as our 'last connection' to ensure we return the receipt over the direct hole punch // Set the hole punch as our 'last connection' to ensure we return the receipt over the direct hole punch
peer_nr.set_last_flow(unique_flow.flow, get_aligned_timestamp()); peer_nr.set_last_flow(unique_flow.flow, Timestamp::now());
// Return the receipt using the same dial info send the receipt to it // Return the receipt using the same dial info send the receipt to it
rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt) rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
@ -883,7 +883,7 @@ impl NetworkManager {
let node_id_secret = routing_table.node_id_secret_key(vcrypto.kind()); let node_id_secret = routing_table.node_id_secret_key(vcrypto.kind());
// Get timestamp, nonce // Get timestamp, nonce
let ts = get_aligned_timestamp(); let ts = Timestamp::now();
let nonce = vcrypto.random_nonce(); let nonce = vcrypto.random_nonce();
// Encode envelope // Encode envelope
@ -1064,7 +1064,7 @@ impl NetworkManager {
}); });
// Validate timestamp isn't too old // Validate timestamp isn't too old
let ts = get_aligned_timestamp(); let ts = Timestamp::now();
let ets = envelope.get_timestamp(); let ets = envelope.get_timestamp();
if let Some(tsbehind) = tsbehind { if let Some(tsbehind) = tsbehind {
if tsbehind.as_u64() != 0 && (ts > ets && ts.saturating_sub(ets) > tsbehind) { if tsbehind.as_u64() != 0 && (ts > ets && ts.saturating_sub(ets) > tsbehind) {

View File

@ -291,7 +291,7 @@ impl IGDManager {
}; };
// Add to mapping list to keep alive // Add to mapping list to keep alive
let timestamp = get_aligned_timestamp(); let timestamp = Timestamp::now();
inner.port_maps.insert(PortMapKey { inner.port_maps.insert(PortMapKey {
llpt, llpt,
at, at,
@ -318,7 +318,7 @@ impl IGDManager {
let mut renews: Vec<(PortMapKey, PortMapValue)> = Vec::new(); let mut renews: Vec<(PortMapKey, PortMapValue)> = Vec::new();
{ {
let inner = self.inner.lock(); let inner = self.inner.lock();
let now = get_aligned_timestamp(); let now = Timestamp::now();
for (k, v) in &inner.port_maps { for (k, v) in &inner.port_maps {
let mapping_lifetime = now.saturating_sub(v.timestamp); let mapping_lifetime = now.saturating_sub(v.timestamp);
@ -373,7 +373,7 @@ impl IGDManager {
inner.port_maps.insert(k, PortMapValue { inner.port_maps.insert(k, PortMapValue {
ext_ip: v.ext_ip, ext_ip: v.ext_ip,
mapped_port, mapped_port,
timestamp: get_aligned_timestamp(), timestamp: Timestamp::now(),
renewal_lifetime: TimestampDuration::new((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64), renewal_lifetime: TimestampDuration::new((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64),
renewal_attempts: 0, renewal_attempts: 0,
}); });
@ -414,7 +414,7 @@ impl IGDManager {
inner.port_maps.insert(k, PortMapValue { inner.port_maps.insert(k, PortMapValue {
ext_ip: v.ext_ip, ext_ip: v.ext_ip,
mapped_port: v.mapped_port, mapped_port: v.mapped_port,
timestamp: get_aligned_timestamp(), timestamp: Timestamp::now(),
renewal_lifetime: ((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64).into(), renewal_lifetime: ((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64).into(),
renewal_attempts: 0, renewal_attempts: 0,
}); });

View File

@ -114,7 +114,7 @@ impl NetworkConnection {
connection_id: id, connection_id: id,
flow, flow,
processor: None, processor: None,
established_time: get_aligned_timestamp(), established_time: Timestamp::now(),
stats: Arc::new(Mutex::new(NetworkConnectionStats { stats: Arc::new(Mutex::new(NetworkConnectionStats {
last_message_sent_time: None, last_message_sent_time: None,
last_message_recv_time: None, last_message_recv_time: None,
@ -165,7 +165,7 @@ impl NetworkConnection {
connection_id, connection_id,
flow, flow,
processor: Some(processor), processor: Some(processor),
established_time: get_aligned_timestamp(), established_time: Timestamp::now(),
stats, stats,
sender, sender,
stop_source: Some(stop_source), stop_source: Some(stop_source),
@ -227,7 +227,7 @@ impl NetworkConnection {
stats: Arc<Mutex<NetworkConnectionStats>>, stats: Arc<Mutex<NetworkConnectionStats>>,
message: Vec<u8>, message: Vec<u8>,
) -> io::Result<NetworkResult<()>> { ) -> io::Result<NetworkResult<()>> {
let ts = get_aligned_timestamp(); let ts = Timestamp::now();
network_result_try!(protocol_connection.send(message).await?); network_result_try!(protocol_connection.send(message).await?);
let mut stats = stats.lock(); let mut stats = stats.lock();
@ -241,7 +241,7 @@ impl NetworkConnection {
protocol_connection: &ProtocolNetworkConnection, protocol_connection: &ProtocolNetworkConnection,
stats: Arc<Mutex<NetworkConnectionStats>>, stats: Arc<Mutex<NetworkConnectionStats>>,
) -> io::Result<NetworkResult<Vec<u8>>> { ) -> io::Result<NetworkResult<Vec<u8>>> {
let ts = get_aligned_timestamp(); let ts = Timestamp::now();
let out = network_result_try!(protocol_connection.recv().await?); let out = network_result_try!(protocol_connection.recv().await?);
let mut stats = stats.lock(); let mut stats = stats.lock();

View File

@ -304,7 +304,7 @@ impl ReceiptManager {
}; };
(inner.next_oldest_ts, inner.timeout_task.clone(), stop_token) (inner.next_oldest_ts, inner.timeout_task.clone(), stop_token)
}; };
let now = get_aligned_timestamp(); let now = Timestamp::now();
// If we have at least one timestamp to expire, lets do it // If we have at least one timestamp to expire, lets do it
if let Some(next_oldest_ts) = next_oldest_ts { if let Some(next_oldest_ts) = next_oldest_ts {
if now >= next_oldest_ts { if now >= next_oldest_ts {

View File

@ -28,7 +28,7 @@ impl NetworkManager {
SendDataToExistingFlowResult::Sent(unique_flow) => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last flow since we just sent to it // Update timestamp for this last flow since we just sent to it
destination_node_ref destination_node_ref
.set_last_flow(unique_flow.flow, get_aligned_timestamp()); .set_last_flow(unique_flow.flow, Timestamp::now());
return Ok(NetworkResult::value(SendDataMethod { return Ok(NetworkResult::value(SendDataMethod {
opt_relayed_contact_method: None, opt_relayed_contact_method: None,
@ -181,7 +181,7 @@ impl NetworkManager {
}; };
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref.set_last_flow(flow, get_aligned_timestamp()); target_node_ref.set_last_flow(flow, Timestamp::now());
Ok(NetworkResult::value(SendDataMethod{ Ok(NetworkResult::value(SendDataMethod{
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
@ -218,7 +218,7 @@ impl NetworkManager {
}; };
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref.set_last_flow(flow, get_aligned_timestamp()); target_node_ref.set_last_flow(flow, Timestamp::now());
Ok(NetworkResult::value(SendDataMethod { Ok(NetworkResult::value(SendDataMethod {
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
@ -245,7 +245,7 @@ impl NetworkManager {
SendDataToExistingFlowResult::Sent(unique_flow) => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref target_node_ref
.set_last_flow(flow, get_aligned_timestamp()); .set_last_flow(flow, Timestamp::now());
return Ok(NetworkResult::value(SendDataMethod{ return Ok(NetworkResult::value(SendDataMethod{
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
@ -293,7 +293,7 @@ impl NetworkManager {
SendDataToExistingFlowResult::Sent(unique_flow) => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref target_node_ref
.set_last_flow(flow, get_aligned_timestamp()); .set_last_flow(flow, Timestamp::now());
return Ok(NetworkResult::value(SendDataMethod{ return Ok(NetworkResult::value(SendDataMethod{
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
@ -347,7 +347,7 @@ impl NetworkManager {
{ {
SendDataToExistingFlowResult::Sent(unique_flow) => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
node_ref.set_last_flow(flow, get_aligned_timestamp()); node_ref.set_last_flow(flow, Timestamp::now());
return Ok(NetworkResult::value(SendDataMethod{ return Ok(NetworkResult::value(SendDataMethod{
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
@ -370,7 +370,7 @@ impl NetworkManager {
network_result_try!(self.net().send_data_to_dial_info(dial_info.clone(), data).await?); network_result_try!(self.net().send_data_to_dial_info(dial_info.clone(), data).await?);
// If we connected to this node directly, save off the last connection so we can use it again // If we connected to this node directly, save off the last connection so we can use it again
node_ref.set_last_flow(unique_flow.flow, get_aligned_timestamp()); node_ref.set_last_flow(unique_flow.flow, Timestamp::now());
Ok(NetworkResult::value(SendDataMethod { Ok(NetworkResult::value(SendDataMethod {
contact_method: NodeContactMethod::Direct(dial_info), contact_method: NodeContactMethod::Direct(dial_info),
@ -569,12 +569,12 @@ impl NetworkManager {
}; };
// Build a return receipt for the signal // Build a return receipt for the signal
let receipt_timeout = ms_to_us( let receipt_timeout = TimestampDuration::new_ms(
self.unlocked_inner self.unlocked_inner
.config .config
.get() .get()
.network .network
.reverse_connection_receipt_time_ms, .reverse_connection_receipt_time_ms as u64,
); );
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;
@ -680,12 +680,12 @@ impl NetworkManager {
.unwrap_or_default()); .unwrap_or_default());
// Build a return receipt for the signal // Build a return receipt for the signal
let receipt_timeout = ms_to_us( let receipt_timeout = TimestampDuration::new_ms(
self.unlocked_inner self.unlocked_inner
.config .config
.get() .get()
.network .network
.hole_punch_receipt_time_ms, .hole_punch_receipt_time_ms as u64,
); );
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;

View File

@ -196,7 +196,7 @@ impl NetworkManager {
// add them to our denylist (throttling) and go ahead and check for new // add them to our denylist (throttling) and go ahead and check for new
// public dialinfo // public dialinfo
let inconsistent = if inconsistencies.len() >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT { let inconsistent = if inconsistencies.len() >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT {
let exp_ts = get_aligned_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US; let exp_ts = Timestamp::now() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US;
let pait = inner let pait = inner
.public_address_inconsistencies_table .public_address_inconsistencies_table
.entry(addr_proto_type_key) .entry(addr_proto_type_key)
@ -213,8 +213,8 @@ impl NetworkManager {
.public_address_inconsistencies_table .public_address_inconsistencies_table
.entry(addr_proto_type_key) .entry(addr_proto_type_key)
.or_default(); .or_default();
let exp_ts = get_aligned_timestamp() let exp_ts =
+ PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US; Timestamp::now() + PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US;
for i in inconsistencies { for i in inconsistencies {
pait.insert(i, exp_ts); pait.insert(i, exp_ts);
} }

View File

@ -127,7 +127,7 @@ impl Bucket {
// Get the sorted list of entries by their kick order // Get the sorted list of entries by their kick order
let mut sorted_entries: Vec<(PublicKey, Arc<BucketEntry>)> = let mut sorted_entries: Vec<(PublicKey, Arc<BucketEntry>)> =
self.entries.iter().map(|(k, v)| (*k, v.clone())).collect(); self.entries.iter().map(|(k, v)| (*k, v.clone())).collect();
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
sorted_entries.sort_by(|a, b| -> core::cmp::Ordering { sorted_entries.sort_by(|a, b| -> core::cmp::Ordering {
if a.0 == b.0 { if a.0 == b.0 {
return core::cmp::Ordering::Equal; return core::cmp::Ordering::Equal;

View File

@ -341,7 +341,7 @@ impl BucketEntryInner {
// No need to update the signednodeinfo though since the timestamp is the same // No need to update the signednodeinfo though since the timestamp is the same
// Let the node try to live again but don't mark it as seen yet // Let the node try to live again but don't mark it as seen yet
self.updated_since_last_network_change = true; self.updated_since_last_network_change = true;
self.make_not_dead(get_aligned_timestamp()); self.make_not_dead(Timestamp::now());
} }
return; return;
} }
@ -361,7 +361,7 @@ impl BucketEntryInner {
*opt_current_sni = Some(Box::new(signed_node_info)); *opt_current_sni = Some(Box::new(signed_node_info));
self.set_envelope_support(envelope_support); self.set_envelope_support(envelope_support);
self.updated_since_last_network_change = true; self.updated_since_last_network_change = true;
self.make_not_dead(get_aligned_timestamp()); self.make_not_dead(Timestamp::now());
// If we're updating an entry's node info, purge all // If we're updating an entry's node info, purge all
// but the last connection in our last connections list // but the last connection in our last connections list
@ -576,7 +576,7 @@ impl BucketEntryInner {
} else { } else {
// If this is not connection oriented, then we check our last seen time // If this is not connection oriented, then we check our last seen time
// to see if this mapping has expired (beyond our timeout) // to see if this mapping has expired (beyond our timeout)
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
(v.1 + TimestampDuration::new(CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) >= cur_ts (v.1 + TimestampDuration::new(CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) >= cur_ts
}; };
@ -809,8 +809,7 @@ impl BucketEntryInner {
let first_consecutive_seen_ts = let first_consecutive_seen_ts =
self.peer_stats.rpc_stats.first_consecutive_seen_ts.unwrap(); self.peer_stats.rpc_stats.first_consecutive_seen_ts.unwrap();
let start_of_reliable_time = first_consecutive_seen_ts let start_of_reliable_time = first_consecutive_seen_ts
+ ((UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS) as u64 + TimestampDuration::new_secs(UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS);
* 1_000_000u64);
let reliable_cur = cur_ts.saturating_sub(start_of_reliable_time); let reliable_cur = cur_ts.saturating_sub(start_of_reliable_time);
let reliable_last = let reliable_last =
latest_contact_time.saturating_sub(start_of_reliable_time); latest_contact_time.saturating_sub(start_of_reliable_time);
@ -946,7 +945,7 @@ impl BucketEntry {
// First node id should always be one we support since TypedKeySets are sorted and we must have at least one supported key // First node id should always be one we support since TypedKeySets are sorted and we must have at least one supported key
assert!(VALID_CRYPTO_KINDS.contains(&first_node_id.kind)); assert!(VALID_CRYPTO_KINDS.contains(&first_node_id.kind));
let now = get_aligned_timestamp(); let now = Timestamp::now();
let inner = BucketEntryInner { let inner = BucketEntryInner {
validated_node_ids: TypedKeyGroup::from(first_node_id), validated_node_ids: TypedKeyGroup::from(first_node_id),
unsupported_node_ids: TypedKeyGroup::new(), unsupported_node_ids: TypedKeyGroup::new(),

View File

@ -140,7 +140,7 @@ impl RoutingTable {
) -> String { ) -> String {
let inner = self.inner.read(); let inner = self.inner.read();
let inner = &*inner; let inner = &*inner;
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
let mut out = String::new(); let mut out = String::new();
@ -211,15 +211,24 @@ impl RoutingTable {
} }
pub(crate) fn debug_info_entry(&self, node_ref: NodeRef) -> String { pub(crate) fn debug_info_entry(&self, node_ref: NodeRef) -> String {
let cur_ts = Timestamp::now();
let mut out = String::new(); let mut out = String::new();
out += &node_ref.operate(|_rt, e| format!("{:#?}\n", e)); out += &node_ref.operate(|_rti, e| {
let state_reason = e.state_reason(cur_ts);
format!(
"state: {}\n{:#?}\n",
Self::format_state_reason(state_reason),
e
)
});
out out
} }
pub(crate) fn debug_info_buckets(&self, min_state: BucketEntryState) -> String { pub(crate) fn debug_info_buckets(&self, min_state: BucketEntryState) -> String {
let inner = self.inner.read(); let inner = self.inner.read();
let inner = &*inner; let inner = &*inner;
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
let mut out = String::new(); let mut out = String::new();
const COLS: usize = 16; const COLS: usize = 16;

View File

@ -763,7 +763,7 @@ impl RoutingTable {
} }
pub fn clear_punishments(&self) { pub fn clear_punishments(&self) {
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
self.inner self.inner
.write() .write()
.with_entries_mut(cur_ts, BucketEntryState::Punished, |rti, e| { .with_entries_mut(cur_ts, BucketEntryState::Punished, |rti, e| {

View File

@ -325,14 +325,14 @@ pub(crate) trait NodeRefBase: Sized {
} }
fn report_protected_connection_dropped(&self) { fn report_protected_connection_dropped(&self) {
self.stats_failed_to_send(get_aligned_timestamp(), false); self.stats_failed_to_send(Timestamp::now(), false);
} }
fn report_failed_route_test(&self) { fn report_failed_route_test(&self) {
self.stats_failed_to_send(get_aligned_timestamp(), false); self.stats_failed_to_send(Timestamp::now(), false);
} }
fn stats_question_sent(&self, ts: Timestamp, bytes: Timestamp, expects_answer: bool) { fn stats_question_sent(&self, ts: Timestamp, bytes: ByteCount, expects_answer: bool) {
self.operate_mut(|rti, e| { self.operate_mut(|rti, e| {
rti.transfer_stats_accounting().add_up(bytes); rti.transfer_stats_accounting().add_up(bytes);
e.question_sent(ts, bytes, expects_answer); e.question_sent(ts, bytes, expects_answer);

View File

@ -254,7 +254,7 @@ impl RouteSpecStore {
.map(|nr| nr.locked(rti)); .map(|nr| nr.locked(rti));
// Get list of all nodes, and sort them for selection // Get list of all nodes, and sort them for selection
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
let filter = Box::new( let filter = Box::new(
|_rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| -> bool { |_rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| -> bool {
// Exclude our own node from routes // Exclude our own node from routes
@ -838,7 +838,7 @@ impl RouteSpecStore {
/// Check if a route id is remote or not /// Check if a route id is remote or not
pub fn is_route_id_remote(&self, id: &RouteId) -> bool { pub fn is_route_id_remote(&self, id: &RouteId) -> bool {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
inner inner
.cache .cache
.peek_remote_private_route_mut(cur_ts, id) .peek_remote_private_route_mut(cur_ts, id)
@ -881,7 +881,7 @@ impl RouteSpecStore {
directions: DirectionSet, directions: DirectionSet,
avoid_nodes: &[TypedKey], avoid_nodes: &[TypedKey],
) -> Option<RouteId> { ) -> Option<RouteId> {
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
let mut routes = Vec::new(); let mut routes = Vec::new();
@ -954,7 +954,7 @@ impl RouteSpecStore {
F: FnMut(&RouteId, &RemotePrivateRouteInfo) -> Option<R>, F: FnMut(&RouteId, &RemotePrivateRouteInfo) -> Option<R>,
{ {
let inner = self.inner.lock(); let inner = self.inner.lock();
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
let remote_route_ids = inner.cache.get_remote_private_route_ids(cur_ts); let remote_route_ids = inner.cache.get_remote_private_route_ids(cur_ts);
let mut out = Vec::with_capacity(remote_route_ids.len()); let mut out = Vec::with_capacity(remote_route_ids.len());
for id in remote_route_ids { for id in remote_route_ids {
@ -970,7 +970,7 @@ impl RouteSpecStore {
/// Get the debug description of a route /// Get the debug description of a route
pub fn debug_route(&self, id: &RouteId) -> Option<String> { pub fn debug_route(&self, id: &RouteId) -> Option<String> {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
if let Some(rpri) = inner.cache.peek_remote_private_route(cur_ts, id) { if let Some(rpri) = inner.cache.peek_remote_private_route(cur_ts, id) {
return Some(format!("{:#?}", rpri)); return Some(format!("{:#?}", rpri));
} }
@ -985,7 +985,7 @@ impl RouteSpecStore {
/// Choose the best private route from a private route set to communicate with /// Choose the best private route from a private route set to communicate with
pub fn best_remote_private_route(&self, id: &RouteId) -> Option<PrivateRoute> { pub fn best_remote_private_route(&self, id: &RouteId) -> Option<PrivateRoute> {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
let rpri = inner.cache.get_remote_private_route(cur_ts, id)?; let rpri = inner.cache.get_remote_private_route(cur_ts, id)?;
rpri.best_private_route() rpri.best_private_route()
} }
@ -1525,7 +1525,7 @@ impl RouteSpecStore {
/// Returns a route set id /// Returns a route set id
#[instrument(level = "trace", target = "route", skip_all)] #[instrument(level = "trace", target = "route", skip_all)]
pub fn import_remote_private_route_blob(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> { pub fn import_remote_private_route_blob(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> {
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
// decode the pr blob // decode the pr blob
let private_routes = RouteSpecStore::blob_to_private_routes( let private_routes = RouteSpecStore::blob_to_private_routes(
@ -1565,7 +1565,7 @@ impl RouteSpecStore {
&self, &self,
private_route: PrivateRoute, private_route: PrivateRoute,
) -> VeilidAPIResult<RouteId> { ) -> VeilidAPIResult<RouteId> {
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
// Make a single route set // Make a single route set
let private_routes = vec![private_route]; let private_routes = vec![private_route];
@ -1630,7 +1630,7 @@ impl RouteSpecStore {
} }
if let Some(rrid) = inner.cache.get_remote_private_route_id_by_key(key) { if let Some(rrid) = inner.cache.get_remote_private_route_id_by_key(key) {
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
if let Some(rpri) = inner.cache.peek_remote_private_route(cur_ts, &rrid) { if let Some(rpri) = inner.cache.peek_remote_private_route(cur_ts, &rrid) {
let our_node_info_ts = self let our_node_info_ts = self
.unlocked_inner .unlocked_inner

View File

@ -235,7 +235,7 @@ impl RoutingTableInner {
} }
pub fn reset_all_updated_since_last_network_change(&mut self) { pub fn reset_all_updated_since_last_network_change(&mut self) {
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, v| { self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, v| {
v.with_mut(rti, |_rti, e| { v.with_mut(rti, |_rti, e| {
e.reset_updated_since_last_network_change(); e.reset_updated_since_last_network_change();
@ -340,7 +340,7 @@ impl RoutingTableInner {
// If the local network topology has changed, nuke the existing local node info and let new local discovery happen // If the local network topology has changed, nuke the existing local node info and let new local discovery happen
if changed { if changed {
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, e| { self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, e| {
e.with_mut(rti, |_rti, e| { e.with_mut(rti, |_rti, e| {
e.clear_signed_node_info(RoutingDomain::LocalNetwork); e.clear_signed_node_info(RoutingDomain::LocalNetwork);
@ -420,7 +420,7 @@ impl RoutingTableInner {
/// Only considers entries that have valid signed node info /// Only considers entries that have valid signed node info
pub fn refresh_cached_entry_counts(&mut self) -> EntryCounts { pub fn refresh_cached_entry_counts(&mut self) -> EntryCounts {
self.live_entry_count.clear(); self.live_entry_count.clear();
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
self.with_entries_mut(cur_ts, BucketEntryState::Unreliable, |rti, entry| { self.with_entries_mut(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
entry.with_inner(|e| { entry.with_inner(|e| {
// Tally per routing domain and crypto kind // Tally per routing domain and crypto kind
@ -458,7 +458,7 @@ impl RoutingTableInner {
crypto_kinds: &[CryptoKind], crypto_kinds: &[CryptoKind],
) -> usize { ) -> usize {
let mut count = 0usize; let mut count = 0usize;
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
self.with_entries(cur_ts, min_state, |rti, e| { self.with_entries(cur_ts, min_state, |rti, e| {
if e.with_inner(|e| { if e.with_inner(|e| {
e.best_routing_domain(rti, routing_domain_set).is_some() e.best_routing_domain(rti, routing_domain_set).is_some()
@ -884,7 +884,7 @@ impl RoutingTableInner {
let mut unreliable_entry_count: usize = 0; let mut unreliable_entry_count: usize = 0;
let mut dead_entry_count: usize = 0; let mut dead_entry_count: usize = 0;
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
for entry in self.all_entries.iter() { for entry in self.all_entries.iter() {
match entry.with_inner(|e| e.state(cur_ts)) { match entry.with_inner(|e| e.state(cur_ts)) {
BucketEntryState::Reliable => { BucketEntryState::Reliable => {
@ -1078,7 +1078,7 @@ impl RoutingTableInner {
where where
T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O, T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O,
{ {
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
// always filter out self peer, as it is irrelevant to the 'fastest nodes' search // always filter out self peer, as it is irrelevant to the 'fastest nodes' search
let filter_self = let filter_self =
@ -1158,7 +1158,7 @@ impl RoutingTableInner {
where where
T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O, T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O,
{ {
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
// Get the crypto kind // Get the crypto kind
let crypto_kind = node_id.kind; let crypto_kind = node_id.kind;

View File

@ -44,7 +44,7 @@ impl SignedDirectNodeInfo {
typed_key_pairs: Vec<TypedKeyPair>, typed_key_pairs: Vec<TypedKeyPair>,
node_info: NodeInfo, node_info: NodeInfo,
) -> VeilidAPIResult<Self> { ) -> VeilidAPIResult<Self> {
let timestamp = get_aligned_timestamp(); let timestamp = Timestamp::now();
let node_info_bytes = Self::make_signature_bytes(&node_info, timestamp)?; let node_info_bytes = Self::make_signature_bytes(&node_info, timestamp)?;
let typed_signatures = let typed_signatures =
crypto.generate_signatures(&node_info_bytes, &typed_key_pairs, |kp, s| { crypto.generate_signatures(&node_info_bytes, &typed_key_pairs, |kp, s| {
@ -78,7 +78,7 @@ impl SignedDirectNodeInfo {
pub fn with_no_signature(node_info: NodeInfo) -> Self { pub fn with_no_signature(node_info: NodeInfo) -> Self {
Self { Self {
node_info, node_info,
timestamp: get_aligned_timestamp(), timestamp: Timestamp::now(),
signatures: Vec::new(), signatures: Vec::new(),
} }
} }

View File

@ -71,7 +71,7 @@ impl SignedRelayedNodeInfo {
relay_ids: TypedKeyGroup, relay_ids: TypedKeyGroup,
relay_info: SignedDirectNodeInfo, relay_info: SignedDirectNodeInfo,
) -> VeilidAPIResult<Self> { ) -> VeilidAPIResult<Self> {
let timestamp = get_aligned_timestamp(); let timestamp = Timestamp::now();
let node_info_bytes = let node_info_bytes =
Self::make_signature_bytes(&node_info, &relay_ids, &relay_info, timestamp)?; Self::make_signature_bytes(&node_info, &relay_ids, &relay_info, timestamp)?;
let typed_signatures = let typed_signatures =

View File

@ -515,7 +515,7 @@ impl RPCProcessor {
// ensure we have some dial info for the entry already, // ensure we have some dial info for the entry already,
// and that the node is still alive // and that the node is still alive
// if not, we should keep looking for better info // if not, we should keep looking for better info
if nr.state(get_aligned_timestamp()).is_alive() && if nr.state(Timestamp::now()).is_alive() &&
nr.has_any_dial_info() { nr.has_any_dial_info() {
return Some(nr); return Some(nr);
} }
@ -560,7 +560,7 @@ impl RPCProcessor {
// ensure we have some dial info for the entry already, // ensure we have some dial info for the entry already,
// and that the node is still alive // and that the node is still alive
// if not, we should do the find_node anyway // if not, we should do the find_node anyway
if nr.state(get_aligned_timestamp()).is_alive() && if nr.state(Timestamp::now()).is_alive() &&
nr.has_any_dial_info() { nr.has_any_dial_info() {
return Ok(Some(nr)); return Ok(Some(nr));
} }
@ -627,7 +627,7 @@ impl RPCProcessor {
} }
Ok(TimeoutOr::Value((rpcreader, _))) => { Ok(TimeoutOr::Value((rpcreader, _))) => {
// Reply received // Reply received
let recv_ts = get_aligned_timestamp(); let recv_ts = Timestamp::now();
// Record answer received // Record answer received
self.record_answer_received( self.record_answer_received(
@ -1208,7 +1208,7 @@ impl RPCProcessor {
// Send question // Send question
let bytes: ByteCount = (message.len() as u64).into(); let bytes: ByteCount = (message.len() as u64).into();
let send_ts = get_aligned_timestamp(); let send_ts = Timestamp::now();
#[allow(unused_variables)] #[allow(unused_variables)]
let message_len = message.len(); let message_len = message.len();
let res = self let res = self
@ -1298,7 +1298,7 @@ impl RPCProcessor {
// Send statement // Send statement
let bytes: ByteCount = (message.len() as u64).into(); let bytes: ByteCount = (message.len() as u64).into();
let send_ts = get_aligned_timestamp(); let send_ts = Timestamp::now();
#[allow(unused_variables)] #[allow(unused_variables)]
let message_len = message.len(); let message_len = message.len();
let res = self let res = self
@ -1374,7 +1374,7 @@ impl RPCProcessor {
// Send the reply // Send the reply
let bytes: ByteCount = (message.len() as u64).into(); let bytes: ByteCount = (message.len() as u64).into();
let send_ts = get_aligned_timestamp(); let send_ts = Timestamp::now();
#[allow(unused_variables)] #[allow(unused_variables)]
let message_len = message.len(); let message_len = message.len();
let res = self let res = self
@ -1719,7 +1719,7 @@ impl RPCProcessor {
flow, flow,
routing_domain, routing_domain,
}), }),
timestamp: get_aligned_timestamp(), timestamp: Timestamp::now(),
body_len: ByteCount::new(body.len() as u64), body_len: ByteCount::new(body.len() as u64),
}; };
@ -1755,7 +1755,7 @@ impl RPCProcessor {
remote_safety_route, remote_safety_route,
sequencing, sequencing,
}), }),
timestamp: get_aligned_timestamp(), timestamp: Timestamp::now(),
body_len: (body.len() as u64).into(), body_len: (body.len() as u64).into(),
}; };
@ -1792,7 +1792,7 @@ impl RPCProcessor {
private_route, private_route,
safety_spec, safety_spec,
}), }),
timestamp: get_aligned_timestamp(), timestamp: Timestamp::now(),
body_len: (body.len() as u64).into(), body_len: (body.len() as u64).into(),
}; };

View File

@ -83,7 +83,7 @@ where
let (result_sender, result_receiver) = flume::bounded(1); let (result_sender, result_receiver) = flume::bounded(1);
let waiting_op = OperationWaitingOp { let waiting_op = OperationWaitingOp {
context, context,
timestamp: get_aligned_timestamp(), timestamp: Timestamp::now(),
result_sender, result_sender,
}; };
if inner.waiting_op_table.insert(op_id, waiting_op).is_some() { if inner.waiting_op_table.insert(op_id, waiting_op).is_some() {
@ -166,7 +166,7 @@ where
let result_fut = result_receiver.recv_async().in_current_span(); let result_fut = result_receiver.recv_async().in_current_span();
// wait for eventualvalue // wait for eventualvalue
let start_ts = get_aligned_timestamp(); let start_ts = Timestamp::now();
let res = timeout(timeout_ms, result_fut).await.into_timeout_or(); let res = timeout(timeout_ms, result_fut).await.into_timeout_or();
match res { match res {
@ -175,7 +175,7 @@ where
Ok(TimeoutOr::Timeout) Ok(TimeoutOr::Timeout)
} }
TimeoutOr::Value(Ok((_span_id, ret))) => { TimeoutOr::Value(Ok((_span_id, ret))) => {
let end_ts = get_aligned_timestamp(); let end_ts = Timestamp::now();
//xxx: causes crash (Missing otel data span extensions) //xxx: causes crash (Missing otel data span extensions)
// Span::current().follows_from(span_id); // Span::current().follows_from(span_id);

View File

@ -38,7 +38,7 @@ impl RPCProcessor {
// Because this exits before calling 'question()', // Because this exits before calling 'question()',
// a failure to find a routing domain constitutes a send failure // a failure to find a routing domain constitutes a send failure
// Record the send failure on both the node and its relay // Record the send failure on both the node and its relay
let send_ts = get_aligned_timestamp(); let send_ts = Timestamp::now();
if let Some(node) = &opt_node { if let Some(node) = &opt_node {
self.record_send_failure(RPCKind::Question, send_ts, node.clone(), None, None); self.record_send_failure(RPCKind::Question, send_ts, node.clone(), None, None);
} }

View File

@ -22,7 +22,9 @@ impl RPCProcessor {
.ok_or(RPCError::try_again("not started up"))?; .ok_or(RPCError::try_again("not started up"))?;
let network_manager = self.network_manager(); let network_manager = self.network_manager();
let receipt_time = ms_to_us(self.unlocked_inner.validate_dial_info_receipt_time_ms); let receipt_time = TimestampDuration::new_ms(
self.unlocked_inner.validate_dial_info_receipt_time_ms as u64,
);
// Generate receipt and waitable eventual so we can see if we get the receipt back // Generate receipt and waitable eventual so we can see if we get the receipt back
let (receipt, eventual_value) = network_manager let (receipt, eventual_value) = network_manager

View File

@ -464,7 +464,7 @@ where
out = Some(f(record)); out = Some(f(record));
// Touch // Touch
record.touch(get_aligned_timestamp()); record.touch(Timestamp::now());
} }
if out.is_some() { if out.is_some() {
// Marks as changed because the record was touched and we want to keep the // Marks as changed because the record was touched and we want to keep the
@ -503,7 +503,7 @@ where
out = Some(f(record)); out = Some(f(record));
// Touch // Touch
record.touch(get_aligned_timestamp()); record.touch(Timestamp::now());
} }
if out.is_some() { if out.is_some() {
// Marks as changed because the record was touched and we want to keep the // Marks as changed because the record was touched and we want to keep the
@ -1133,7 +1133,7 @@ where
/// See if any watched records have expired and clear them out /// See if any watched records have expired and clear them out
#[instrument(level = "trace", target = "stor", skip_all)] #[instrument(level = "trace", target = "stor", skip_all)]
pub fn check_watched_records(&mut self) { pub fn check_watched_records(&mut self) {
let now = get_aligned_timestamp(); let now = Timestamp::now();
self.watched_records.retain(|key, watch_list| { self.watched_records.retain(|key, watch_list| {
watch_list.watches.retain(|w| { watch_list.watches.retain(|w| {
w.params.count != 0 && w.params.expiration > now && !w.params.subkeys.is_empty() w.params.count != 0 && w.params.expiration > now && !w.params.subkeys.is_empty()

View File

@ -256,7 +256,7 @@ impl StorageManagerInner {
)?); )?);
// Add new local value record // Add new local value record
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
let local_record_detail = LocalRecordDetail::new(safety_selection); let local_record_detail = LocalRecordDetail::new(safety_selection);
let record = let record =
Record::<LocalRecordDetail>::new(cur_ts, signed_value_descriptor, local_record_detail)?; Record::<LocalRecordDetail>::new(cur_ts, signed_value_descriptor, local_record_detail)?;
@ -293,7 +293,7 @@ impl StorageManagerInner {
}; };
// Make local record // Make local record
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
let local_record = Record::new( let local_record = Record::new(
cur_ts, cur_ts,
remote_record.descriptor().clone(), remote_record.descriptor().clone(),
@ -435,7 +435,7 @@ impl StorageManagerInner {
// Make and store a new record for this descriptor // Make and store a new record for this descriptor
let record = Record::<LocalRecordDetail>::new( let record = Record::<LocalRecordDetail>::new(
get_aligned_timestamp(), Timestamp::now(),
signed_value_descriptor, signed_value_descriptor,
LocalRecordDetail::new(safety_selection), LocalRecordDetail::new(safety_selection),
)?; )?;
@ -500,7 +500,7 @@ impl StorageManagerInner {
// Get local record store // Get local record store
let local_record_store = self.local_record_store.as_mut().unwrap(); let local_record_store = self.local_record_store.as_mut().unwrap();
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
local_record_store.with_record_mut(key, |r| { local_record_store.with_record_mut(key, |r| {
let d = r.detail_mut(); let d = r.detail_mut();
@ -655,7 +655,7 @@ impl StorageManagerInner {
// See if we have a remote record already or not // See if we have a remote record already or not
if remote_record_store.with_record(key, |_| {}).is_none() { if remote_record_store.with_record(key, |_| {}).is_none() {
// record didn't exist, make it // record didn't exist, make it
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
let remote_record_detail = RemoteRecordDetail {}; let remote_record_detail = RemoteRecordDetail {};
let record = Record::<RemoteRecordDetail>::new( let record = Record::<RemoteRecordDetail>::new(
cur_ts, cur_ts,

View File

@ -18,7 +18,7 @@ impl StorageManager {
let opt_update_callback = inner.update_callback.clone(); let opt_update_callback = inner.update_callback.clone();
let cur_ts = get_aligned_timestamp(); let cur_ts = Timestamp::now();
for (k, v) in inner.opened_records.iter_mut() { for (k, v) in inner.opened_records.iter_mut() {
// If no active watch, then skip this // If no active watch, then skip this
let Some(active_watch) = v.active_watch() else { let Some(active_watch) = v.active_watch() else {

View File

@ -38,11 +38,11 @@ pub struct IOStatsInfo {
/// Number of write operations. /// Number of write operations.
pub writes: AlignedU64, pub writes: AlignedU64,
/// Number of bytes read /// Number of bytes read
pub bytes_read: AlignedU64, pub bytes_read: ByteCount,
/// Number of bytes read from cache /// Number of bytes read from cache
pub cache_read_bytes: AlignedU64, pub cache_read_bytes: ByteCount,
/// Number of bytes write /// Number of bytes write
pub bytes_written: AlignedU64, pub bytes_written: ByteCount,
/// Start of the statistic period. /// Start of the statistic period.
pub started: Timestamp, pub started: Timestamp,
/// Total duration of the statistic period. /// Total duration of the statistic period.
@ -672,9 +672,9 @@ impl TableStore {
reads: AlignedU64::new(io_stats_since_previous.reads), reads: AlignedU64::new(io_stats_since_previous.reads),
cache_reads: AlignedU64::new(io_stats_since_previous.cache_reads), cache_reads: AlignedU64::new(io_stats_since_previous.cache_reads),
writes: AlignedU64::new(io_stats_since_previous.writes), writes: AlignedU64::new(io_stats_since_previous.writes),
bytes_read: AlignedU64::new(io_stats_since_previous.bytes_read), bytes_read: ByteCount::new(io_stats_since_previous.bytes_read),
cache_read_bytes: AlignedU64::new(io_stats_since_previous.cache_read_bytes), cache_read_bytes: ByteCount::new(io_stats_since_previous.cache_read_bytes),
bytes_written: AlignedU64::new(io_stats_since_previous.bytes_written), bytes_written: ByteCount::new(io_stats_since_previous.bytes_written),
started: Timestamp::new( started: Timestamp::new(
io_stats_since_previous io_stats_since_previous
.started .started
@ -689,9 +689,9 @@ impl TableStore {
reads: AlignedU64::new(io_stats_overall.reads), reads: AlignedU64::new(io_stats_overall.reads),
cache_reads: AlignedU64::new(io_stats_overall.cache_reads), cache_reads: AlignedU64::new(io_stats_overall.cache_reads),
writes: AlignedU64::new(io_stats_overall.writes), writes: AlignedU64::new(io_stats_overall.writes),
bytes_read: AlignedU64::new(io_stats_overall.bytes_read), bytes_read: ByteCount::new(io_stats_overall.bytes_read),
cache_read_bytes: AlignedU64::new(io_stats_overall.cache_read_bytes), cache_read_bytes: ByteCount::new(io_stats_overall.cache_read_bytes),
bytes_written: AlignedU64::new(io_stats_overall.bytes_written), bytes_written: ByteCount::new(io_stats_overall.bytes_written),
started: Timestamp::new( started: Timestamp::new(
io_stats_overall io_stats_overall
.started .started

View File

@ -816,7 +816,7 @@ impl JsonRequestProcessor {
result: to_json_api_result_with_string(Crypto::generate_keypair(kind)), result: to_json_api_result_with_string(Crypto::generate_keypair(kind)),
}, },
RequestOp::Now => ResponseOp::Now { RequestOp::Now => ResponseOp::Now {
value: get_aligned_timestamp(), value: Timestamp::now(),
}, },
RequestOp::Debug { command } => ResponseOp::Debug { RequestOp::Debug { command } => ResponseOp::Debug {
result: to_json_api_result(self.api.debug(command).await), result: to_json_api_result(self.api.debug(command).await),

View File

@ -4,18 +4,18 @@ use crate::*;
pub fn fix_latencystats() -> LatencyStats { pub fn fix_latencystats() -> LatencyStats {
LatencyStats { LatencyStats {
fastest: AlignedU64::from(1234), fastest: TimestampDuration::from(1234),
average: AlignedU64::from(2345), average: TimestampDuration::from(2345),
slowest: AlignedU64::from(3456), slowest: TimestampDuration::from(3456),
} }
} }
pub fn fix_transferstats() -> TransferStats { pub fn fix_transferstats() -> TransferStats {
TransferStats { TransferStats {
total: AlignedU64::from(1_000_000), total: ByteCount::from(1_000_000),
maximum: AlignedU64::from(3456), maximum: ByteCount::from(3456),
average: AlignedU64::from(2345), average: ByteCount::from(2345),
minimum: AlignedU64::from(1234), minimum: ByteCount::from(1234),
} }
} }
@ -31,9 +31,9 @@ pub fn fix_rpcstats() -> RPCStats {
messages_sent: 1_000_000, messages_sent: 1_000_000,
messages_rcvd: 2_000_000, messages_rcvd: 2_000_000,
questions_in_flight: 42, questions_in_flight: 42,
last_question_ts: Some(AlignedU64::from(1685569084280)), last_question_ts: Some(Timestamp::from(1685569084280)),
last_seen_ts: Some(AlignedU64::from(1685569101256)), last_seen_ts: Some(Timestamp::from(1685569101256)),
first_consecutive_seen_ts: Some(AlignedU64::from(1685569111851)), first_consecutive_seen_ts: Some(Timestamp::from(1685569111851)),
recent_lost_answers: 5, recent_lost_answers: 5,
failed_to_send: 3, failed_to_send: 3,
} }
@ -41,7 +41,7 @@ pub fn fix_rpcstats() -> RPCStats {
pub fn fix_peerstats() -> PeerStats { pub fn fix_peerstats() -> PeerStats {
PeerStats { PeerStats {
time_added: AlignedU64::from(1685569176894), time_added: Timestamp::from(1685569176894),
rpc_stats: fix_rpcstats(), rpc_stats: fix_rpcstats(),
latency: Some(fix_latencystats()), latency: Some(fix_latencystats()),
transfer: fix_transferstatsdownup(), transfer: fix_transferstatsdownup(),

View File

@ -28,7 +28,7 @@ pub async fn test_veilidappcall() {
Some(fix_typedkey()), Some(fix_typedkey()),
Some(fix_cryptokey()), Some(fix_cryptokey()),
b"Well, hello!".to_vec(), b"Well, hello!".to_vec(),
AlignedU64::from(123), OperationId::from(123),
); );
let copy = deserialize_json(&serialize_json(&orig)).unwrap(); let copy = deserialize_json(&serialize_json(&orig)).unwrap();
@ -229,8 +229,8 @@ pub async fn test_peertabledata() {
pub async fn test_veilidstatenetwork() { pub async fn test_veilidstatenetwork() {
let orig = VeilidStateNetwork { let orig = VeilidStateNetwork {
started: true, started: true,
bps_down: AlignedU64::from(14_400), bps_down: ByteCount::from(14_400),
bps_up: AlignedU64::from(1200), bps_up: ByteCount::from(1200),
peers: vec![fix_peertabledata()], peers: vec![fix_peertabledata()],
}; };
let copy = deserialize_json(&serialize_json(&orig)).unwrap(); let copy = deserialize_json(&serialize_json(&orig)).unwrap();
@ -280,8 +280,8 @@ pub async fn test_veilidstate() {
}), }),
network: Box::new(VeilidStateNetwork { network: Box::new(VeilidStateNetwork {
started: true, started: true,
bps_down: AlignedU64::from(14_400), bps_down: ByteCount::from(14_400),
bps_up: AlignedU64::from(1200), bps_up: ByteCount::from(1200),
peers: vec![fix_peertabledata()], peers: vec![fix_peertabledata()],
}), }),
config: Box::new(VeilidStateConfig { config: Box::new(VeilidStateConfig {

View File

@ -1,135 +1,163 @@
use super::*; use super::*;
/// Aligned u64. /// Aligned u64 type generator
///
/// Required on 32-bit platforms for serialization because Rust aligns u64 on 4 byte boundaries. /// Required on 32-bit platforms for serialization because Rust aligns u64 on 4 byte boundaries.
/// Some zero-copy serialization frameworks also want 8-byte alignment. /// Some zero-copy serialization frameworks also want 8-byte alignment.
/// Supports serializing to string for JSON as well, since JSON can't handle 64-bit numbers to Javascript. /// Supports serializing to string for JSON as well, since JSON can't handle 64-bit numbers to Javascript.
#[derive( macro_rules! aligned_u64_type {
Clone, Default, PartialEq, Eq, PartialOrd, Ord, Copy, Hash, Serialize, Deserialize, JsonSchema, ($name:ident) => {
)] #[derive(
#[cfg_attr(target_arch = "wasm32", derive(Tsify))] Clone,
#[repr(C, align(8))] Default,
#[serde(transparent)] PartialEq,
pub struct AlignedU64( Eq,
#[serde(with = "as_human_string")] PartialOrd,
#[schemars(with = "String")] Ord,
#[cfg_attr(target_arch = "wasm32", tsify(type = "string"))] Copy,
u64, Hash,
); Serialize,
Deserialize,
JsonSchema,
)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
#[repr(C, align(8))]
#[serde(transparent)]
pub struct $name(
#[serde(with = "as_human_string")]
#[schemars(with = "String")]
#[cfg_attr(target_arch = "wasm32", tsify(type = "string"))]
u64,
);
impl From<u64> for AlignedU64 { impl From<u64> for $name {
fn from(v: u64) -> Self { fn from(v: u64) -> Self {
AlignedU64(v) $name(v)
} }
} }
impl From<AlignedU64> for u64 { impl From<$name> for u64 {
fn from(v: AlignedU64) -> Self { fn from(v: $name) -> Self {
v.0 v.0
} }
}
impl $name {
pub const fn new(v: u64) -> Self {
Self(v)
}
pub fn as_u64(self) -> u64 {
self.0
}
}
};
} }
impl fmt::Display for AlignedU64 { macro_rules! aligned_u64_type_default_math_impl {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { ($name:ident) => {
(&self.0 as &dyn fmt::Display).fmt(f) impl<Rhs: Into<u64>> core::ops::Add<Rhs> for $name {
} type Output = Self;
fn add(self, rhs: Rhs) -> Self {
Self(self.0 + rhs.into())
}
}
impl<Rhs: Into<u64>> core::ops::AddAssign<Rhs> for $name {
fn add_assign(&mut self, rhs: Rhs) {
self.0 += rhs.into();
}
}
impl<Rhs: Into<u64>> core::ops::Sub<Rhs> for $name {
type Output = Self;
fn sub(self, rhs: Rhs) -> Self {
Self(self.0 - rhs.into())
}
}
impl<Rhs: Into<u64>> core::ops::SubAssign<Rhs> for $name {
fn sub_assign(&mut self, rhs: Rhs) {
self.0 -= rhs.into();
}
}
impl<Rhs: Into<u64>> core::ops::Mul<Rhs> for $name {
type Output = Self;
fn mul(self, rhs: Rhs) -> Self {
Self(self.0 * rhs.into())
}
}
impl<Rhs: Into<u64>> core::ops::MulAssign<Rhs> for $name {
fn mul_assign(&mut self, rhs: Rhs) {
self.0 *= rhs.into();
}
}
impl<Rhs: Into<u64>> core::ops::Div<Rhs> for $name {
type Output = Self;
fn div(self, rhs: Rhs) -> Self {
Self(self.0 / rhs.into())
}
}
impl<Rhs: Into<u64>> core::ops::DivAssign<Rhs> for $name {
fn div_assign(&mut self, rhs: Rhs) {
self.0 /= rhs.into();
}
}
impl $name {
pub fn saturating_sub(self, rhs: Self) -> Self {
Self(self.0.saturating_sub(rhs.0))
}
}
};
} }
impl fmt::Debug for AlignedU64 { macro_rules! aligned_u64_type_default_display_impl {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { ($name:ident) => {
(&self.0 as &dyn fmt::Debug).fmt(f) impl fmt::Display for $name {
} fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
(&self.0 as &dyn fmt::Display).fmt(f)
}
}
impl FromStr for $name {
type Err = <u64 as FromStr>::Err;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok($name(u64::from_str(s)?))
}
}
};
} }
impl FromStr for AlignedU64 { macro_rules! aligned_u64_type_default_debug_impl {
type Err = <u64 as FromStr>::Err; ($name:ident) => {
fn from_str(s: &str) -> Result<Self, Self::Err> { impl fmt::Debug for $name {
Ok(AlignedU64(u64::from_str(s)?)) fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
} (&self.0 as &dyn fmt::Debug).fmt(f)
} }
}
impl<Rhs: Into<u64>> core::ops::Add<Rhs> for AlignedU64 { };
type Output = Self;
fn add(self, rhs: Rhs) -> Self {
Self(self.0 + rhs.into())
}
}
impl<Rhs: Into<u64>> core::ops::AddAssign<Rhs> for AlignedU64 {
fn add_assign(&mut self, rhs: Rhs) {
self.0 += rhs.into();
}
}
impl<Rhs: Into<u64>> core::ops::Sub<Rhs> for AlignedU64 {
type Output = Self;
fn sub(self, rhs: Rhs) -> Self {
Self(self.0 - rhs.into())
}
}
impl<Rhs: Into<u64>> core::ops::SubAssign<Rhs> for AlignedU64 {
fn sub_assign(&mut self, rhs: Rhs) {
self.0 -= rhs.into();
}
}
impl<Rhs: Into<u64>> core::ops::Mul<Rhs> for AlignedU64 {
type Output = Self;
fn mul(self, rhs: Rhs) -> Self {
Self(self.0 * rhs.into())
}
}
impl<Rhs: Into<u64>> core::ops::MulAssign<Rhs> for AlignedU64 {
fn mul_assign(&mut self, rhs: Rhs) {
self.0 *= rhs.into();
}
}
impl<Rhs: Into<u64>> core::ops::Div<Rhs> for AlignedU64 {
type Output = Self;
fn div(self, rhs: Rhs) -> Self {
Self(self.0 / rhs.into())
}
}
impl<Rhs: Into<u64>> core::ops::DivAssign<Rhs> for AlignedU64 {
fn div_assign(&mut self, rhs: Rhs) {
self.0 /= rhs.into();
}
}
impl AlignedU64 {
pub const fn new(v: u64) -> Self {
Self(v)
}
pub fn as_u64(self) -> u64 {
self.0
}
pub fn saturating_sub(self, rhs: Self) -> Self {
Self(self.0.saturating_sub(rhs.0))
}
} }
///////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////
/// Microseconds since epoch aligned_u64_type!(OperationId);
#[cfg_attr(target_arch = "wasm32", declare)] aligned_u64_type_default_display_impl!(OperationId);
pub type Timestamp = AlignedU64; aligned_u64_type_default_debug_impl!(OperationId);
pub fn get_aligned_timestamp() -> Timestamp {
get_timestamp().into() aligned_u64_type!(ByteCount);
} aligned_u64_type_default_display_impl!(ByteCount);
/// Microseconds duration aligned_u64_type_default_debug_impl!(ByteCount);
#[cfg_attr(target_arch = "wasm32", declare)] aligned_u64_type_default_math_impl!(ByteCount);
pub type TimestampDuration = AlignedU64;
/// Request/Response matching id aligned_u64_type!(AlignedU64);
#[cfg_attr(target_arch = "wasm32", declare)] aligned_u64_type_default_display_impl!(AlignedU64);
pub type OperationId = AlignedU64; aligned_u64_type_default_debug_impl!(AlignedU64);
/// Number of bytes aligned_u64_type_default_math_impl!(AlignedU64);
#[cfg_attr(target_arch = "wasm32", declare)]
pub type ByteCount = AlignedU64;

View File

@ -1,9 +1,12 @@
#[macro_use]
mod aligned_u64; mod aligned_u64;
mod app_message_call; mod app_message_call;
mod dht; mod dht;
mod fourcc; mod fourcc;
mod safety; mod safety;
mod stats; mod stats;
mod timestamp;
mod timestamp_duration;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
mod tunnel; mod tunnel;
mod veilid_log; mod veilid_log;
@ -17,6 +20,8 @@ pub use dht::*;
pub use fourcc::*; pub use fourcc::*;
pub use safety::*; pub use safety::*;
pub use stats::*; pub use stats::*;
pub use timestamp::*;
pub use timestamp_duration::*;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
pub use tunnel::*; pub use tunnel::*;
pub use veilid_log::*; pub use veilid_log::*;

View File

@ -1969,7 +1969,7 @@ pub extern "C" fn crypto_crypt_no_auth(
#[no_mangle] #[no_mangle]
#[instrument(level = "trace", target = "ffi", skip_all)] #[instrument(level = "trace", target = "ffi", skip_all)]
pub extern "C" fn now() -> u64 { pub extern "C" fn now() -> u64 {
veilid_core::get_aligned_timestamp().as_u64() veilid_core::Timestamp::now().as_u64()
} }
#[no_mangle] #[no_mangle]

View File

@ -1517,7 +1517,7 @@ pub fn crypto_crypt_no_auth(
#[wasm_bindgen()] #[wasm_bindgen()]
pub fn now() -> String { pub fn now() -> String {
veilid_core::get_aligned_timestamp().as_u64().to_string() veilid_core::Timestamp::now().as_u64().to_string()
} }
#[wasm_bindgen()] #[wasm_bindgen()]

View File

@ -178,7 +178,7 @@ impl VeilidClient {
/// Get the current timestamp, in string format /// Get the current timestamp, in string format
pub fn now() -> String { pub fn now() -> String {
veilid_core::get_aligned_timestamp().as_u64().to_string() veilid_core::Timestamp::now().as_u64().to_string()
} }
/// Execute an 'internal debug command'. /// Execute an 'internal debug command'.