mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-10-01 01:26:08 -04:00
Merge branch 'newtype-timestamp' into 'main'
Make newtypes for AlignedU64 types See merge request veilid/veilid!305
This commit is contained in:
commit
a1dcd28e36
@ -188,7 +188,7 @@ impl AttachmentManager {
|
||||
|
||||
// Set timestamps
|
||||
if state == AttachmentState::Attaching {
|
||||
inner.attach_ts = Some(get_aligned_timestamp());
|
||||
inner.attach_ts = Some(Timestamp::now());
|
||||
} else if state == AttachmentState::Detached {
|
||||
inner.attach_ts = None;
|
||||
} else if state == AttachmentState::Detaching {
|
||||
|
@ -133,7 +133,7 @@ impl<S: Subscriber + for<'a> registry::LookupSpan<'a>> Layer<S> for ApiTracingLa
|
||||
span_ref
|
||||
.extensions_mut()
|
||||
.insert::<SpanDuration>(SpanDuration {
|
||||
start: get_aligned_timestamp(),
|
||||
start: Timestamp::now(),
|
||||
end: Timestamp::default(),
|
||||
});
|
||||
}
|
||||
@ -145,7 +145,7 @@ impl<S: Subscriber + for<'a> registry::LookupSpan<'a>> Layer<S> for ApiTracingLa
|
||||
if let Some(inner) = &mut *self.inner.lock() {
|
||||
if let Some(span_ref) = ctx.span(&id) {
|
||||
if let Some(span_duration) = span_ref.extensions_mut().get_mut::<SpanDuration>() {
|
||||
span_duration.end = get_aligned_timestamp();
|
||||
span_duration.end = Timestamp::now();
|
||||
let duration = span_duration.end.saturating_sub(span_duration.start);
|
||||
let meta = span_ref.metadata();
|
||||
self.emit_log(
|
||||
|
@ -255,7 +255,7 @@ impl AddressFilter {
|
||||
}
|
||||
|
||||
pub fn set_dial_info_failed(&self, dial_info: DialInfo) {
|
||||
let ts = get_aligned_timestamp();
|
||||
let ts = Timestamp::now();
|
||||
|
||||
let mut inner = self.inner.lock();
|
||||
if inner.dial_info_failures.len() >= MAX_DIAL_INFO_FAILURES {
|
||||
@ -280,7 +280,7 @@ impl AddressFilter {
|
||||
|
||||
pub fn punish_ip_addr(&self, addr: IpAddr, reason: PunishmentReason) {
|
||||
log_net!(debug ">>> PUNISHED: {} for {:?}", addr, reason);
|
||||
let timestamp = get_aligned_timestamp();
|
||||
let timestamp = Timestamp::now();
|
||||
let punishment = Punishment { reason, timestamp };
|
||||
|
||||
let ipblock = ip_to_ipblock(
|
||||
@ -321,7 +321,7 @@ impl AddressFilter {
|
||||
nr.operate_mut(|_rti, e| e.set_punished(Some(reason)));
|
||||
}
|
||||
|
||||
let timestamp = get_aligned_timestamp();
|
||||
let timestamp = Timestamp::now();
|
||||
let punishment = Punishment { reason, timestamp };
|
||||
|
||||
let mut inner = self.inner.lock();
|
||||
@ -363,7 +363,7 @@ impl AddressFilter {
|
||||
return Err(AddressFilterError::Punished);
|
||||
}
|
||||
|
||||
let ts = get_aligned_timestamp();
|
||||
let ts = Timestamp::now();
|
||||
self.purge_old_timestamps(inner, ts);
|
||||
|
||||
match ipblock {
|
||||
@ -423,7 +423,7 @@ impl AddressFilter {
|
||||
addr,
|
||||
);
|
||||
|
||||
let ts = get_aligned_timestamp();
|
||||
let ts = Timestamp::now();
|
||||
self.purge_old_timestamps(&mut inner, ts);
|
||||
|
||||
match ipblock {
|
||||
|
@ -232,7 +232,7 @@ impl ConnectionManager {
|
||||
|
||||
// See if this should be a protected connection
|
||||
if let Some(protect_nr) = self.should_protect_connection(&conn) {
|
||||
log_net!(debug "== PROTECTING connection: {} -> {} for node {}", id, conn.debug_print(get_aligned_timestamp()), protect_nr);
|
||||
log_net!(debug "== PROTECTING connection: {} -> {} for node {}", id, conn.debug_print(Timestamp::now()), protect_nr);
|
||||
conn.protect(protect_nr);
|
||||
}
|
||||
|
||||
@ -244,7 +244,7 @@ impl ConnectionManager {
|
||||
Ok(Some(conn)) => {
|
||||
// Connection added and a different one LRU'd out
|
||||
// Send it to be terminated
|
||||
log_net!(debug "== LRU kill connection due to limit: {:?}", conn.debug_print(get_aligned_timestamp()));
|
||||
log_net!(debug "== LRU kill connection due to limit: {:?}", conn.debug_print(Timestamp::now()));
|
||||
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
|
||||
}
|
||||
Err(ConnectionTableAddError::AddressFilter(conn, e)) => {
|
||||
@ -259,7 +259,7 @@ impl ConnectionManager {
|
||||
Err(ConnectionTableAddError::AlreadyExists(conn)) => {
|
||||
// Connection already exists
|
||||
let desc = conn.flow();
|
||||
log_net!(debug "== Connection already exists: {:?}", conn.debug_print(get_aligned_timestamp()));
|
||||
log_net!(debug "== Connection already exists: {:?}", conn.debug_print(Timestamp::now()));
|
||||
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
|
||||
return Ok(NetworkResult::no_connection_other(format!(
|
||||
"connection already exists: {:?}",
|
||||
@ -269,7 +269,7 @@ impl ConnectionManager {
|
||||
Err(ConnectionTableAddError::TableFull(conn)) => {
|
||||
// Connection table is full
|
||||
let desc = conn.flow();
|
||||
log_net!(debug "== Connection table full: {:?}", conn.debug_print(get_aligned_timestamp()));
|
||||
log_net!(debug "== Connection table full: {:?}", conn.debug_print(Timestamp::now()));
|
||||
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
|
||||
return Ok(NetworkResult::no_connection_other(format!(
|
||||
"connection table is full: {:?}",
|
||||
|
@ -388,7 +388,7 @@ impl ConnectionTable {
|
||||
pub fn debug_print_table(&self) -> String {
|
||||
let mut out = String::new();
|
||||
let inner = self.inner.lock();
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
for t in 0..inner.conn_by_id.len() {
|
||||
out += &format!(
|
||||
" {} Connections: ({}/{})\n",
|
||||
|
@ -529,11 +529,11 @@ impl NetworkManager {
|
||||
let mut inner = self.inner.lock();
|
||||
match inner.client_allowlist.entry(client) {
|
||||
hashlink::lru_cache::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().last_seen_ts = get_aligned_timestamp()
|
||||
entry.get_mut().last_seen_ts = Timestamp::now()
|
||||
}
|
||||
hashlink::lru_cache::Entry::Vacant(entry) => {
|
||||
entry.insert(ClientAllowlistEntry {
|
||||
last_seen_ts: get_aligned_timestamp(),
|
||||
last_seen_ts: Timestamp::now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -545,7 +545,7 @@ impl NetworkManager {
|
||||
|
||||
match inner.client_allowlist.entry(client) {
|
||||
hashlink::lru_cache::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().last_seen_ts = get_aligned_timestamp();
|
||||
entry.get_mut().last_seen_ts = Timestamp::now();
|
||||
true
|
||||
}
|
||||
hashlink::lru_cache::Entry::Vacant(_) => false,
|
||||
@ -556,7 +556,7 @@ impl NetworkManager {
|
||||
let timeout_ms = self.with_config(|c| c.network.client_allowlist_timeout_ms);
|
||||
let mut inner = self.inner.lock();
|
||||
let cutoff_timestamp =
|
||||
get_aligned_timestamp() - TimestampDuration::new((timeout_ms as u64) * 1000u64);
|
||||
Timestamp::now() - TimestampDuration::new((timeout_ms as u64) * 1000u64);
|
||||
// Remove clients from the allowlist that haven't been since since our allowlist timeout
|
||||
while inner
|
||||
.client_allowlist
|
||||
@ -587,7 +587,7 @@ impl NetworkManager {
|
||||
#[instrument(level = "trace", skip(self, extra_data, callback))]
|
||||
pub fn generate_receipt<D: AsRef<[u8]>>(
|
||||
&self,
|
||||
expiration_us: u64,
|
||||
expiration_us: TimestampDuration,
|
||||
expected_returns: u32,
|
||||
extra_data: D,
|
||||
callback: impl ReceiptCallback,
|
||||
@ -617,7 +617,7 @@ impl NetworkManager {
|
||||
.wrap_err("failed to generate signed receipt")?;
|
||||
|
||||
// Record the receipt for later
|
||||
let exp_ts = get_aligned_timestamp() + expiration_us;
|
||||
let exp_ts = Timestamp::now() + expiration_us;
|
||||
receipt_manager.record_receipt(receipt, exp_ts, expected_returns, callback);
|
||||
|
||||
Ok(out)
|
||||
@ -627,7 +627,7 @@ impl NetworkManager {
|
||||
#[instrument(level = "trace", skip(self, extra_data))]
|
||||
pub fn generate_single_shot_receipt<D: AsRef<[u8]>>(
|
||||
&self,
|
||||
expiration_us: u64,
|
||||
expiration_us: TimestampDuration,
|
||||
extra_data: D,
|
||||
) -> EyreResult<(Vec<u8>, EventualValueFuture<ReceiptEvent>)> {
|
||||
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
|
||||
@ -656,7 +656,7 @@ impl NetworkManager {
|
||||
.wrap_err("failed to generate signed receipt")?;
|
||||
|
||||
// Record the receipt for later
|
||||
let exp_ts = get_aligned_timestamp() + expiration_us;
|
||||
let exp_ts = Timestamp::now() + expiration_us;
|
||||
let eventual = SingleShotEventual::new(Some(ReceiptEvent::Cancelled));
|
||||
let instance = eventual.instance();
|
||||
receipt_manager.record_single_shot_receipt(receipt, exp_ts, eventual);
|
||||
@ -855,7 +855,7 @@ impl NetworkManager {
|
||||
// XXX: do we need a delay here? or another hole punch packet?
|
||||
|
||||
// Set the hole punch as our 'last connection' to ensure we return the receipt over the direct hole punch
|
||||
peer_nr.set_last_flow(unique_flow.flow, get_aligned_timestamp());
|
||||
peer_nr.set_last_flow(unique_flow.flow, Timestamp::now());
|
||||
|
||||
// Return the receipt using the same dial info send the receipt to it
|
||||
rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
|
||||
@ -883,7 +883,7 @@ impl NetworkManager {
|
||||
let node_id_secret = routing_table.node_id_secret_key(vcrypto.kind());
|
||||
|
||||
// Get timestamp, nonce
|
||||
let ts = get_aligned_timestamp();
|
||||
let ts = Timestamp::now();
|
||||
let nonce = vcrypto.random_nonce();
|
||||
|
||||
// Encode envelope
|
||||
@ -1064,7 +1064,7 @@ impl NetworkManager {
|
||||
});
|
||||
|
||||
// Validate timestamp isn't too old
|
||||
let ts = get_aligned_timestamp();
|
||||
let ts = Timestamp::now();
|
||||
let ets = envelope.get_timestamp();
|
||||
if let Some(tsbehind) = tsbehind {
|
||||
if tsbehind.as_u64() != 0 && (ts > ets && ts.saturating_sub(ets) > tsbehind) {
|
||||
|
@ -291,7 +291,7 @@ impl IGDManager {
|
||||
};
|
||||
|
||||
// Add to mapping list to keep alive
|
||||
let timestamp = get_aligned_timestamp();
|
||||
let timestamp = Timestamp::now();
|
||||
inner.port_maps.insert(PortMapKey {
|
||||
llpt,
|
||||
at,
|
||||
@ -318,7 +318,7 @@ impl IGDManager {
|
||||
let mut renews: Vec<(PortMapKey, PortMapValue)> = Vec::new();
|
||||
{
|
||||
let inner = self.inner.lock();
|
||||
let now = get_aligned_timestamp();
|
||||
let now = Timestamp::now();
|
||||
|
||||
for (k, v) in &inner.port_maps {
|
||||
let mapping_lifetime = now.saturating_sub(v.timestamp);
|
||||
@ -373,7 +373,7 @@ impl IGDManager {
|
||||
inner.port_maps.insert(k, PortMapValue {
|
||||
ext_ip: v.ext_ip,
|
||||
mapped_port,
|
||||
timestamp: get_aligned_timestamp(),
|
||||
timestamp: Timestamp::now(),
|
||||
renewal_lifetime: TimestampDuration::new((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64),
|
||||
renewal_attempts: 0,
|
||||
});
|
||||
@ -414,7 +414,7 @@ impl IGDManager {
|
||||
inner.port_maps.insert(k, PortMapValue {
|
||||
ext_ip: v.ext_ip,
|
||||
mapped_port: v.mapped_port,
|
||||
timestamp: get_aligned_timestamp(),
|
||||
timestamp: Timestamp::now(),
|
||||
renewal_lifetime: ((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64).into(),
|
||||
renewal_attempts: 0,
|
||||
});
|
||||
|
@ -114,7 +114,7 @@ impl NetworkConnection {
|
||||
connection_id: id,
|
||||
flow,
|
||||
processor: None,
|
||||
established_time: get_aligned_timestamp(),
|
||||
established_time: Timestamp::now(),
|
||||
stats: Arc::new(Mutex::new(NetworkConnectionStats {
|
||||
last_message_sent_time: None,
|
||||
last_message_recv_time: None,
|
||||
@ -165,7 +165,7 @@ impl NetworkConnection {
|
||||
connection_id,
|
||||
flow,
|
||||
processor: Some(processor),
|
||||
established_time: get_aligned_timestamp(),
|
||||
established_time: Timestamp::now(),
|
||||
stats,
|
||||
sender,
|
||||
stop_source: Some(stop_source),
|
||||
@ -227,7 +227,7 @@ impl NetworkConnection {
|
||||
stats: Arc<Mutex<NetworkConnectionStats>>,
|
||||
message: Vec<u8>,
|
||||
) -> io::Result<NetworkResult<()>> {
|
||||
let ts = get_aligned_timestamp();
|
||||
let ts = Timestamp::now();
|
||||
network_result_try!(protocol_connection.send(message).await?);
|
||||
|
||||
let mut stats = stats.lock();
|
||||
@ -241,7 +241,7 @@ impl NetworkConnection {
|
||||
protocol_connection: &ProtocolNetworkConnection,
|
||||
stats: Arc<Mutex<NetworkConnectionStats>>,
|
||||
) -> io::Result<NetworkResult<Vec<u8>>> {
|
||||
let ts = get_aligned_timestamp();
|
||||
let ts = Timestamp::now();
|
||||
let out = network_result_try!(protocol_connection.recv().await?);
|
||||
|
||||
let mut stats = stats.lock();
|
||||
|
@ -304,7 +304,7 @@ impl ReceiptManager {
|
||||
};
|
||||
(inner.next_oldest_ts, inner.timeout_task.clone(), stop_token)
|
||||
};
|
||||
let now = get_aligned_timestamp();
|
||||
let now = Timestamp::now();
|
||||
// If we have at least one timestamp to expire, lets do it
|
||||
if let Some(next_oldest_ts) = next_oldest_ts {
|
||||
if now >= next_oldest_ts {
|
||||
|
@ -28,7 +28,7 @@ impl NetworkManager {
|
||||
SendDataToExistingFlowResult::Sent(unique_flow) => {
|
||||
// Update timestamp for this last flow since we just sent to it
|
||||
destination_node_ref
|
||||
.set_last_flow(unique_flow.flow, get_aligned_timestamp());
|
||||
.set_last_flow(unique_flow.flow, Timestamp::now());
|
||||
|
||||
return Ok(NetworkResult::value(SendDataMethod {
|
||||
opt_relayed_contact_method: None,
|
||||
@ -181,7 +181,7 @@ impl NetworkManager {
|
||||
};
|
||||
|
||||
// Update timestamp for this last connection since we just sent to it
|
||||
target_node_ref.set_last_flow(flow, get_aligned_timestamp());
|
||||
target_node_ref.set_last_flow(flow, Timestamp::now());
|
||||
|
||||
Ok(NetworkResult::value(SendDataMethod{
|
||||
contact_method: NodeContactMethod::Existing,
|
||||
@ -218,7 +218,7 @@ impl NetworkManager {
|
||||
};
|
||||
|
||||
// Update timestamp for this last connection since we just sent to it
|
||||
target_node_ref.set_last_flow(flow, get_aligned_timestamp());
|
||||
target_node_ref.set_last_flow(flow, Timestamp::now());
|
||||
|
||||
Ok(NetworkResult::value(SendDataMethod {
|
||||
contact_method: NodeContactMethod::Existing,
|
||||
@ -245,7 +245,7 @@ impl NetworkManager {
|
||||
SendDataToExistingFlowResult::Sent(unique_flow) => {
|
||||
// Update timestamp for this last connection since we just sent to it
|
||||
target_node_ref
|
||||
.set_last_flow(flow, get_aligned_timestamp());
|
||||
.set_last_flow(flow, Timestamp::now());
|
||||
|
||||
return Ok(NetworkResult::value(SendDataMethod{
|
||||
contact_method: NodeContactMethod::Existing,
|
||||
@ -293,7 +293,7 @@ impl NetworkManager {
|
||||
SendDataToExistingFlowResult::Sent(unique_flow) => {
|
||||
// Update timestamp for this last connection since we just sent to it
|
||||
target_node_ref
|
||||
.set_last_flow(flow, get_aligned_timestamp());
|
||||
.set_last_flow(flow, Timestamp::now());
|
||||
|
||||
return Ok(NetworkResult::value(SendDataMethod{
|
||||
contact_method: NodeContactMethod::Existing,
|
||||
@ -347,7 +347,7 @@ impl NetworkManager {
|
||||
{
|
||||
SendDataToExistingFlowResult::Sent(unique_flow) => {
|
||||
// Update timestamp for this last connection since we just sent to it
|
||||
node_ref.set_last_flow(flow, get_aligned_timestamp());
|
||||
node_ref.set_last_flow(flow, Timestamp::now());
|
||||
|
||||
return Ok(NetworkResult::value(SendDataMethod{
|
||||
contact_method: NodeContactMethod::Existing,
|
||||
@ -370,7 +370,7 @@ impl NetworkManager {
|
||||
network_result_try!(self.net().send_data_to_dial_info(dial_info.clone(), data).await?);
|
||||
|
||||
// If we connected to this node directly, save off the last connection so we can use it again
|
||||
node_ref.set_last_flow(unique_flow.flow, get_aligned_timestamp());
|
||||
node_ref.set_last_flow(unique_flow.flow, Timestamp::now());
|
||||
|
||||
Ok(NetworkResult::value(SendDataMethod {
|
||||
contact_method: NodeContactMethod::Direct(dial_info),
|
||||
@ -569,12 +569,12 @@ impl NetworkManager {
|
||||
};
|
||||
|
||||
// Build a return receipt for the signal
|
||||
let receipt_timeout = ms_to_us(
|
||||
let receipt_timeout = TimestampDuration::new_ms(
|
||||
self.unlocked_inner
|
||||
.config
|
||||
.get()
|
||||
.network
|
||||
.reverse_connection_receipt_time_ms,
|
||||
.reverse_connection_receipt_time_ms as u64,
|
||||
);
|
||||
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;
|
||||
|
||||
@ -680,12 +680,12 @@ impl NetworkManager {
|
||||
.unwrap_or_default());
|
||||
|
||||
// Build a return receipt for the signal
|
||||
let receipt_timeout = ms_to_us(
|
||||
let receipt_timeout = TimestampDuration::new_ms(
|
||||
self.unlocked_inner
|
||||
.config
|
||||
.get()
|
||||
.network
|
||||
.hole_punch_receipt_time_ms,
|
||||
.hole_punch_receipt_time_ms as u64,
|
||||
);
|
||||
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;
|
||||
|
||||
|
@ -196,7 +196,7 @@ impl NetworkManager {
|
||||
// add them to our denylist (throttling) and go ahead and check for new
|
||||
// public dialinfo
|
||||
let inconsistent = if inconsistencies.len() >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT {
|
||||
let exp_ts = get_aligned_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US;
|
||||
let exp_ts = Timestamp::now() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US;
|
||||
let pait = inner
|
||||
.public_address_inconsistencies_table
|
||||
.entry(addr_proto_type_key)
|
||||
@ -213,8 +213,8 @@ impl NetworkManager {
|
||||
.public_address_inconsistencies_table
|
||||
.entry(addr_proto_type_key)
|
||||
.or_default();
|
||||
let exp_ts = get_aligned_timestamp()
|
||||
+ PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US;
|
||||
let exp_ts =
|
||||
Timestamp::now() + PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US;
|
||||
for i in inconsistencies {
|
||||
pait.insert(i, exp_ts);
|
||||
}
|
||||
|
@ -127,7 +127,7 @@ impl Bucket {
|
||||
// Get the sorted list of entries by their kick order
|
||||
let mut sorted_entries: Vec<(PublicKey, Arc<BucketEntry>)> =
|
||||
self.entries.iter().map(|(k, v)| (*k, v.clone())).collect();
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
sorted_entries.sort_by(|a, b| -> core::cmp::Ordering {
|
||||
if a.0 == b.0 {
|
||||
return core::cmp::Ordering::Equal;
|
||||
|
@ -341,7 +341,7 @@ impl BucketEntryInner {
|
||||
// No need to update the signednodeinfo though since the timestamp is the same
|
||||
// Let the node try to live again but don't mark it as seen yet
|
||||
self.updated_since_last_network_change = true;
|
||||
self.make_not_dead(get_aligned_timestamp());
|
||||
self.make_not_dead(Timestamp::now());
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -361,7 +361,7 @@ impl BucketEntryInner {
|
||||
*opt_current_sni = Some(Box::new(signed_node_info));
|
||||
self.set_envelope_support(envelope_support);
|
||||
self.updated_since_last_network_change = true;
|
||||
self.make_not_dead(get_aligned_timestamp());
|
||||
self.make_not_dead(Timestamp::now());
|
||||
|
||||
// If we're updating an entry's node info, purge all
|
||||
// but the last connection in our last connections list
|
||||
@ -576,7 +576,7 @@ impl BucketEntryInner {
|
||||
} else {
|
||||
// If this is not connection oriented, then we check our last seen time
|
||||
// to see if this mapping has expired (beyond our timeout)
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
(v.1 + TimestampDuration::new(CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) >= cur_ts
|
||||
};
|
||||
|
||||
@ -809,8 +809,7 @@ impl BucketEntryInner {
|
||||
let first_consecutive_seen_ts =
|
||||
self.peer_stats.rpc_stats.first_consecutive_seen_ts.unwrap();
|
||||
let start_of_reliable_time = first_consecutive_seen_ts
|
||||
+ ((UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS) as u64
|
||||
* 1_000_000u64);
|
||||
+ TimestampDuration::new_secs(UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS);
|
||||
let reliable_cur = cur_ts.saturating_sub(start_of_reliable_time);
|
||||
let reliable_last =
|
||||
latest_contact_time.saturating_sub(start_of_reliable_time);
|
||||
@ -946,7 +945,7 @@ impl BucketEntry {
|
||||
// First node id should always be one we support since TypedKeySets are sorted and we must have at least one supported key
|
||||
assert!(VALID_CRYPTO_KINDS.contains(&first_node_id.kind));
|
||||
|
||||
let now = get_aligned_timestamp();
|
||||
let now = Timestamp::now();
|
||||
let inner = BucketEntryInner {
|
||||
validated_node_ids: TypedKeyGroup::from(first_node_id),
|
||||
unsupported_node_ids: TypedKeyGroup::new(),
|
||||
|
@ -140,7 +140,7 @@ impl RoutingTable {
|
||||
) -> String {
|
||||
let inner = self.inner.read();
|
||||
let inner = &*inner;
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
|
||||
let mut out = String::new();
|
||||
|
||||
@ -211,15 +211,24 @@ impl RoutingTable {
|
||||
}
|
||||
|
||||
pub(crate) fn debug_info_entry(&self, node_ref: NodeRef) -> String {
|
||||
let cur_ts = Timestamp::now();
|
||||
|
||||
let mut out = String::new();
|
||||
out += &node_ref.operate(|_rt, e| format!("{:#?}\n", e));
|
||||
out += &node_ref.operate(|_rti, e| {
|
||||
let state_reason = e.state_reason(cur_ts);
|
||||
format!(
|
||||
"state: {}\n{:#?}\n",
|
||||
Self::format_state_reason(state_reason),
|
||||
e
|
||||
)
|
||||
});
|
||||
out
|
||||
}
|
||||
|
||||
pub(crate) fn debug_info_buckets(&self, min_state: BucketEntryState) -> String {
|
||||
let inner = self.inner.read();
|
||||
let inner = &*inner;
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
|
||||
let mut out = String::new();
|
||||
const COLS: usize = 16;
|
||||
|
@ -763,7 +763,7 @@ impl RoutingTable {
|
||||
}
|
||||
|
||||
pub fn clear_punishments(&self) {
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
self.inner
|
||||
.write()
|
||||
.with_entries_mut(cur_ts, BucketEntryState::Punished, |rti, e| {
|
||||
|
@ -325,14 +325,14 @@ pub(crate) trait NodeRefBase: Sized {
|
||||
}
|
||||
|
||||
fn report_protected_connection_dropped(&self) {
|
||||
self.stats_failed_to_send(get_aligned_timestamp(), false);
|
||||
self.stats_failed_to_send(Timestamp::now(), false);
|
||||
}
|
||||
|
||||
fn report_failed_route_test(&self) {
|
||||
self.stats_failed_to_send(get_aligned_timestamp(), false);
|
||||
self.stats_failed_to_send(Timestamp::now(), false);
|
||||
}
|
||||
|
||||
fn stats_question_sent(&self, ts: Timestamp, bytes: Timestamp, expects_answer: bool) {
|
||||
fn stats_question_sent(&self, ts: Timestamp, bytes: ByteCount, expects_answer: bool) {
|
||||
self.operate_mut(|rti, e| {
|
||||
rti.transfer_stats_accounting().add_up(bytes);
|
||||
e.question_sent(ts, bytes, expects_answer);
|
||||
|
@ -254,7 +254,7 @@ impl RouteSpecStore {
|
||||
.map(|nr| nr.locked(rti));
|
||||
|
||||
// Get list of all nodes, and sort them for selection
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
let filter = Box::new(
|
||||
|_rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| -> bool {
|
||||
// Exclude our own node from routes
|
||||
@ -838,7 +838,7 @@ impl RouteSpecStore {
|
||||
/// Check if a route id is remote or not
|
||||
pub fn is_route_id_remote(&self, id: &RouteId) -> bool {
|
||||
let inner = &mut *self.inner.lock();
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
inner
|
||||
.cache
|
||||
.peek_remote_private_route_mut(cur_ts, id)
|
||||
@ -881,7 +881,7 @@ impl RouteSpecStore {
|
||||
directions: DirectionSet,
|
||||
avoid_nodes: &[TypedKey],
|
||||
) -> Option<RouteId> {
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
|
||||
let mut routes = Vec::new();
|
||||
|
||||
@ -954,7 +954,7 @@ impl RouteSpecStore {
|
||||
F: FnMut(&RouteId, &RemotePrivateRouteInfo) -> Option<R>,
|
||||
{
|
||||
let inner = self.inner.lock();
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
let remote_route_ids = inner.cache.get_remote_private_route_ids(cur_ts);
|
||||
let mut out = Vec::with_capacity(remote_route_ids.len());
|
||||
for id in remote_route_ids {
|
||||
@ -970,7 +970,7 @@ impl RouteSpecStore {
|
||||
/// Get the debug description of a route
|
||||
pub fn debug_route(&self, id: &RouteId) -> Option<String> {
|
||||
let inner = &mut *self.inner.lock();
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
if let Some(rpri) = inner.cache.peek_remote_private_route(cur_ts, id) {
|
||||
return Some(format!("{:#?}", rpri));
|
||||
}
|
||||
@ -985,7 +985,7 @@ impl RouteSpecStore {
|
||||
/// Choose the best private route from a private route set to communicate with
|
||||
pub fn best_remote_private_route(&self, id: &RouteId) -> Option<PrivateRoute> {
|
||||
let inner = &mut *self.inner.lock();
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
let rpri = inner.cache.get_remote_private_route(cur_ts, id)?;
|
||||
rpri.best_private_route()
|
||||
}
|
||||
@ -1525,7 +1525,7 @@ impl RouteSpecStore {
|
||||
/// Returns a route set id
|
||||
#[instrument(level = "trace", target = "route", skip_all)]
|
||||
pub fn import_remote_private_route_blob(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> {
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
|
||||
// decode the pr blob
|
||||
let private_routes = RouteSpecStore::blob_to_private_routes(
|
||||
@ -1565,7 +1565,7 @@ impl RouteSpecStore {
|
||||
&self,
|
||||
private_route: PrivateRoute,
|
||||
) -> VeilidAPIResult<RouteId> {
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
|
||||
// Make a single route set
|
||||
let private_routes = vec![private_route];
|
||||
@ -1630,7 +1630,7 @@ impl RouteSpecStore {
|
||||
}
|
||||
|
||||
if let Some(rrid) = inner.cache.get_remote_private_route_id_by_key(key) {
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
if let Some(rpri) = inner.cache.peek_remote_private_route(cur_ts, &rrid) {
|
||||
let our_node_info_ts = self
|
||||
.unlocked_inner
|
||||
|
@ -235,7 +235,7 @@ impl RoutingTableInner {
|
||||
}
|
||||
|
||||
pub fn reset_all_updated_since_last_network_change(&mut self) {
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, v| {
|
||||
v.with_mut(rti, |_rti, e| {
|
||||
e.reset_updated_since_last_network_change();
|
||||
@ -340,7 +340,7 @@ impl RoutingTableInner {
|
||||
|
||||
// If the local network topology has changed, nuke the existing local node info and let new local discovery happen
|
||||
if changed {
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, e| {
|
||||
e.with_mut(rti, |_rti, e| {
|
||||
e.clear_signed_node_info(RoutingDomain::LocalNetwork);
|
||||
@ -420,7 +420,7 @@ impl RoutingTableInner {
|
||||
/// Only considers entries that have valid signed node info
|
||||
pub fn refresh_cached_entry_counts(&mut self) -> EntryCounts {
|
||||
self.live_entry_count.clear();
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
self.with_entries_mut(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
|
||||
entry.with_inner(|e| {
|
||||
// Tally per routing domain and crypto kind
|
||||
@ -458,7 +458,7 @@ impl RoutingTableInner {
|
||||
crypto_kinds: &[CryptoKind],
|
||||
) -> usize {
|
||||
let mut count = 0usize;
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
self.with_entries(cur_ts, min_state, |rti, e| {
|
||||
if e.with_inner(|e| {
|
||||
e.best_routing_domain(rti, routing_domain_set).is_some()
|
||||
@ -884,7 +884,7 @@ impl RoutingTableInner {
|
||||
let mut unreliable_entry_count: usize = 0;
|
||||
let mut dead_entry_count: usize = 0;
|
||||
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
for entry in self.all_entries.iter() {
|
||||
match entry.with_inner(|e| e.state(cur_ts)) {
|
||||
BucketEntryState::Reliable => {
|
||||
@ -1078,7 +1078,7 @@ impl RoutingTableInner {
|
||||
where
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O,
|
||||
{
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
|
||||
// always filter out self peer, as it is irrelevant to the 'fastest nodes' search
|
||||
let filter_self =
|
||||
@ -1158,7 +1158,7 @@ impl RoutingTableInner {
|
||||
where
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O,
|
||||
{
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
|
||||
// Get the crypto kind
|
||||
let crypto_kind = node_id.kind;
|
||||
|
@ -44,7 +44,7 @@ impl SignedDirectNodeInfo {
|
||||
typed_key_pairs: Vec<TypedKeyPair>,
|
||||
node_info: NodeInfo,
|
||||
) -> VeilidAPIResult<Self> {
|
||||
let timestamp = get_aligned_timestamp();
|
||||
let timestamp = Timestamp::now();
|
||||
let node_info_bytes = Self::make_signature_bytes(&node_info, timestamp)?;
|
||||
let typed_signatures =
|
||||
crypto.generate_signatures(&node_info_bytes, &typed_key_pairs, |kp, s| {
|
||||
@ -78,7 +78,7 @@ impl SignedDirectNodeInfo {
|
||||
pub fn with_no_signature(node_info: NodeInfo) -> Self {
|
||||
Self {
|
||||
node_info,
|
||||
timestamp: get_aligned_timestamp(),
|
||||
timestamp: Timestamp::now(),
|
||||
signatures: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ impl SignedRelayedNodeInfo {
|
||||
relay_ids: TypedKeyGroup,
|
||||
relay_info: SignedDirectNodeInfo,
|
||||
) -> VeilidAPIResult<Self> {
|
||||
let timestamp = get_aligned_timestamp();
|
||||
let timestamp = Timestamp::now();
|
||||
let node_info_bytes =
|
||||
Self::make_signature_bytes(&node_info, &relay_ids, &relay_info, timestamp)?;
|
||||
let typed_signatures =
|
||||
|
@ -515,7 +515,7 @@ impl RPCProcessor {
|
||||
// ensure we have some dial info for the entry already,
|
||||
// and that the node is still alive
|
||||
// if not, we should keep looking for better info
|
||||
if nr.state(get_aligned_timestamp()).is_alive() &&
|
||||
if nr.state(Timestamp::now()).is_alive() &&
|
||||
nr.has_any_dial_info() {
|
||||
return Some(nr);
|
||||
}
|
||||
@ -560,7 +560,7 @@ impl RPCProcessor {
|
||||
// ensure we have some dial info for the entry already,
|
||||
// and that the node is still alive
|
||||
// if not, we should do the find_node anyway
|
||||
if nr.state(get_aligned_timestamp()).is_alive() &&
|
||||
if nr.state(Timestamp::now()).is_alive() &&
|
||||
nr.has_any_dial_info() {
|
||||
return Ok(Some(nr));
|
||||
}
|
||||
@ -627,7 +627,7 @@ impl RPCProcessor {
|
||||
}
|
||||
Ok(TimeoutOr::Value((rpcreader, _))) => {
|
||||
// Reply received
|
||||
let recv_ts = get_aligned_timestamp();
|
||||
let recv_ts = Timestamp::now();
|
||||
|
||||
// Record answer received
|
||||
self.record_answer_received(
|
||||
@ -1208,7 +1208,7 @@ impl RPCProcessor {
|
||||
|
||||
// Send question
|
||||
let bytes: ByteCount = (message.len() as u64).into();
|
||||
let send_ts = get_aligned_timestamp();
|
||||
let send_ts = Timestamp::now();
|
||||
#[allow(unused_variables)]
|
||||
let message_len = message.len();
|
||||
let res = self
|
||||
@ -1298,7 +1298,7 @@ impl RPCProcessor {
|
||||
|
||||
// Send statement
|
||||
let bytes: ByteCount = (message.len() as u64).into();
|
||||
let send_ts = get_aligned_timestamp();
|
||||
let send_ts = Timestamp::now();
|
||||
#[allow(unused_variables)]
|
||||
let message_len = message.len();
|
||||
let res = self
|
||||
@ -1374,7 +1374,7 @@ impl RPCProcessor {
|
||||
|
||||
// Send the reply
|
||||
let bytes: ByteCount = (message.len() as u64).into();
|
||||
let send_ts = get_aligned_timestamp();
|
||||
let send_ts = Timestamp::now();
|
||||
#[allow(unused_variables)]
|
||||
let message_len = message.len();
|
||||
let res = self
|
||||
@ -1719,7 +1719,7 @@ impl RPCProcessor {
|
||||
flow,
|
||||
routing_domain,
|
||||
}),
|
||||
timestamp: get_aligned_timestamp(),
|
||||
timestamp: Timestamp::now(),
|
||||
body_len: ByteCount::new(body.len() as u64),
|
||||
};
|
||||
|
||||
@ -1755,7 +1755,7 @@ impl RPCProcessor {
|
||||
remote_safety_route,
|
||||
sequencing,
|
||||
}),
|
||||
timestamp: get_aligned_timestamp(),
|
||||
timestamp: Timestamp::now(),
|
||||
body_len: (body.len() as u64).into(),
|
||||
};
|
||||
|
||||
@ -1792,7 +1792,7 @@ impl RPCProcessor {
|
||||
private_route,
|
||||
safety_spec,
|
||||
}),
|
||||
timestamp: get_aligned_timestamp(),
|
||||
timestamp: Timestamp::now(),
|
||||
body_len: (body.len() as u64).into(),
|
||||
};
|
||||
|
||||
|
@ -83,7 +83,7 @@ where
|
||||
let (result_sender, result_receiver) = flume::bounded(1);
|
||||
let waiting_op = OperationWaitingOp {
|
||||
context,
|
||||
timestamp: get_aligned_timestamp(),
|
||||
timestamp: Timestamp::now(),
|
||||
result_sender,
|
||||
};
|
||||
if inner.waiting_op_table.insert(op_id, waiting_op).is_some() {
|
||||
@ -166,7 +166,7 @@ where
|
||||
let result_fut = result_receiver.recv_async().in_current_span();
|
||||
|
||||
// wait for eventualvalue
|
||||
let start_ts = get_aligned_timestamp();
|
||||
let start_ts = Timestamp::now();
|
||||
let res = timeout(timeout_ms, result_fut).await.into_timeout_or();
|
||||
|
||||
match res {
|
||||
@ -175,7 +175,7 @@ where
|
||||
Ok(TimeoutOr::Timeout)
|
||||
}
|
||||
TimeoutOr::Value(Ok((_span_id, ret))) => {
|
||||
let end_ts = get_aligned_timestamp();
|
||||
let end_ts = Timestamp::now();
|
||||
|
||||
//xxx: causes crash (Missing otel data span extensions)
|
||||
// Span::current().follows_from(span_id);
|
||||
|
@ -38,7 +38,7 @@ impl RPCProcessor {
|
||||
// Because this exits before calling 'question()',
|
||||
// a failure to find a routing domain constitutes a send failure
|
||||
// Record the send failure on both the node and its relay
|
||||
let send_ts = get_aligned_timestamp();
|
||||
let send_ts = Timestamp::now();
|
||||
if let Some(node) = &opt_node {
|
||||
self.record_send_failure(RPCKind::Question, send_ts, node.clone(), None, None);
|
||||
}
|
||||
|
@ -22,7 +22,9 @@ impl RPCProcessor {
|
||||
.ok_or(RPCError::try_again("not started up"))?;
|
||||
|
||||
let network_manager = self.network_manager();
|
||||
let receipt_time = ms_to_us(self.unlocked_inner.validate_dial_info_receipt_time_ms);
|
||||
let receipt_time = TimestampDuration::new_ms(
|
||||
self.unlocked_inner.validate_dial_info_receipt_time_ms as u64,
|
||||
);
|
||||
|
||||
// Generate receipt and waitable eventual so we can see if we get the receipt back
|
||||
let (receipt, eventual_value) = network_manager
|
||||
|
@ -464,7 +464,7 @@ where
|
||||
out = Some(f(record));
|
||||
|
||||
// Touch
|
||||
record.touch(get_aligned_timestamp());
|
||||
record.touch(Timestamp::now());
|
||||
}
|
||||
if out.is_some() {
|
||||
// Marks as changed because the record was touched and we want to keep the
|
||||
@ -503,7 +503,7 @@ where
|
||||
out = Some(f(record));
|
||||
|
||||
// Touch
|
||||
record.touch(get_aligned_timestamp());
|
||||
record.touch(Timestamp::now());
|
||||
}
|
||||
if out.is_some() {
|
||||
// Marks as changed because the record was touched and we want to keep the
|
||||
@ -1133,7 +1133,7 @@ where
|
||||
/// See if any watched records have expired and clear them out
|
||||
#[instrument(level = "trace", target = "stor", skip_all)]
|
||||
pub fn check_watched_records(&mut self) {
|
||||
let now = get_aligned_timestamp();
|
||||
let now = Timestamp::now();
|
||||
self.watched_records.retain(|key, watch_list| {
|
||||
watch_list.watches.retain(|w| {
|
||||
w.params.count != 0 && w.params.expiration > now && !w.params.subkeys.is_empty()
|
||||
|
@ -256,7 +256,7 @@ impl StorageManagerInner {
|
||||
)?);
|
||||
|
||||
// Add new local value record
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
let local_record_detail = LocalRecordDetail::new(safety_selection);
|
||||
let record =
|
||||
Record::<LocalRecordDetail>::new(cur_ts, signed_value_descriptor, local_record_detail)?;
|
||||
@ -293,7 +293,7 @@ impl StorageManagerInner {
|
||||
};
|
||||
|
||||
// Make local record
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
let local_record = Record::new(
|
||||
cur_ts,
|
||||
remote_record.descriptor().clone(),
|
||||
@ -435,7 +435,7 @@ impl StorageManagerInner {
|
||||
|
||||
// Make and store a new record for this descriptor
|
||||
let record = Record::<LocalRecordDetail>::new(
|
||||
get_aligned_timestamp(),
|
||||
Timestamp::now(),
|
||||
signed_value_descriptor,
|
||||
LocalRecordDetail::new(safety_selection),
|
||||
)?;
|
||||
@ -500,7 +500,7 @@ impl StorageManagerInner {
|
||||
// Get local record store
|
||||
let local_record_store = self.local_record_store.as_mut().unwrap();
|
||||
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
local_record_store.with_record_mut(key, |r| {
|
||||
let d = r.detail_mut();
|
||||
|
||||
@ -655,7 +655,7 @@ impl StorageManagerInner {
|
||||
// See if we have a remote record already or not
|
||||
if remote_record_store.with_record(key, |_| {}).is_none() {
|
||||
// record didn't exist, make it
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
let remote_record_detail = RemoteRecordDetail {};
|
||||
let record = Record::<RemoteRecordDetail>::new(
|
||||
cur_ts,
|
||||
|
@ -18,7 +18,7 @@ impl StorageManager {
|
||||
|
||||
let opt_update_callback = inner.update_callback.clone();
|
||||
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let cur_ts = Timestamp::now();
|
||||
for (k, v) in inner.opened_records.iter_mut() {
|
||||
// If no active watch, then skip this
|
||||
let Some(active_watch) = v.active_watch() else {
|
||||
|
@ -38,11 +38,11 @@ pub struct IOStatsInfo {
|
||||
/// Number of write operations.
|
||||
pub writes: AlignedU64,
|
||||
/// Number of bytes read
|
||||
pub bytes_read: AlignedU64,
|
||||
pub bytes_read: ByteCount,
|
||||
/// Number of bytes read from cache
|
||||
pub cache_read_bytes: AlignedU64,
|
||||
pub cache_read_bytes: ByteCount,
|
||||
/// Number of bytes write
|
||||
pub bytes_written: AlignedU64,
|
||||
pub bytes_written: ByteCount,
|
||||
/// Start of the statistic period.
|
||||
pub started: Timestamp,
|
||||
/// Total duration of the statistic period.
|
||||
@ -672,9 +672,9 @@ impl TableStore {
|
||||
reads: AlignedU64::new(io_stats_since_previous.reads),
|
||||
cache_reads: AlignedU64::new(io_stats_since_previous.cache_reads),
|
||||
writes: AlignedU64::new(io_stats_since_previous.writes),
|
||||
bytes_read: AlignedU64::new(io_stats_since_previous.bytes_read),
|
||||
cache_read_bytes: AlignedU64::new(io_stats_since_previous.cache_read_bytes),
|
||||
bytes_written: AlignedU64::new(io_stats_since_previous.bytes_written),
|
||||
bytes_read: ByteCount::new(io_stats_since_previous.bytes_read),
|
||||
cache_read_bytes: ByteCount::new(io_stats_since_previous.cache_read_bytes),
|
||||
bytes_written: ByteCount::new(io_stats_since_previous.bytes_written),
|
||||
started: Timestamp::new(
|
||||
io_stats_since_previous
|
||||
.started
|
||||
@ -689,9 +689,9 @@ impl TableStore {
|
||||
reads: AlignedU64::new(io_stats_overall.reads),
|
||||
cache_reads: AlignedU64::new(io_stats_overall.cache_reads),
|
||||
writes: AlignedU64::new(io_stats_overall.writes),
|
||||
bytes_read: AlignedU64::new(io_stats_overall.bytes_read),
|
||||
cache_read_bytes: AlignedU64::new(io_stats_overall.cache_read_bytes),
|
||||
bytes_written: AlignedU64::new(io_stats_overall.bytes_written),
|
||||
bytes_read: ByteCount::new(io_stats_overall.bytes_read),
|
||||
cache_read_bytes: ByteCount::new(io_stats_overall.cache_read_bytes),
|
||||
bytes_written: ByteCount::new(io_stats_overall.bytes_written),
|
||||
started: Timestamp::new(
|
||||
io_stats_overall
|
||||
.started
|
||||
|
@ -816,7 +816,7 @@ impl JsonRequestProcessor {
|
||||
result: to_json_api_result_with_string(Crypto::generate_keypair(kind)),
|
||||
},
|
||||
RequestOp::Now => ResponseOp::Now {
|
||||
value: get_aligned_timestamp(),
|
||||
value: Timestamp::now(),
|
||||
},
|
||||
RequestOp::Debug { command } => ResponseOp::Debug {
|
||||
result: to_json_api_result(self.api.debug(command).await),
|
||||
|
@ -4,18 +4,18 @@ use crate::*;
|
||||
|
||||
pub fn fix_latencystats() -> LatencyStats {
|
||||
LatencyStats {
|
||||
fastest: AlignedU64::from(1234),
|
||||
average: AlignedU64::from(2345),
|
||||
slowest: AlignedU64::from(3456),
|
||||
fastest: TimestampDuration::from(1234),
|
||||
average: TimestampDuration::from(2345),
|
||||
slowest: TimestampDuration::from(3456),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fix_transferstats() -> TransferStats {
|
||||
TransferStats {
|
||||
total: AlignedU64::from(1_000_000),
|
||||
maximum: AlignedU64::from(3456),
|
||||
average: AlignedU64::from(2345),
|
||||
minimum: AlignedU64::from(1234),
|
||||
total: ByteCount::from(1_000_000),
|
||||
maximum: ByteCount::from(3456),
|
||||
average: ByteCount::from(2345),
|
||||
minimum: ByteCount::from(1234),
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,9 +31,9 @@ pub fn fix_rpcstats() -> RPCStats {
|
||||
messages_sent: 1_000_000,
|
||||
messages_rcvd: 2_000_000,
|
||||
questions_in_flight: 42,
|
||||
last_question_ts: Some(AlignedU64::from(1685569084280)),
|
||||
last_seen_ts: Some(AlignedU64::from(1685569101256)),
|
||||
first_consecutive_seen_ts: Some(AlignedU64::from(1685569111851)),
|
||||
last_question_ts: Some(Timestamp::from(1685569084280)),
|
||||
last_seen_ts: Some(Timestamp::from(1685569101256)),
|
||||
first_consecutive_seen_ts: Some(Timestamp::from(1685569111851)),
|
||||
recent_lost_answers: 5,
|
||||
failed_to_send: 3,
|
||||
}
|
||||
@ -41,7 +41,7 @@ pub fn fix_rpcstats() -> RPCStats {
|
||||
|
||||
pub fn fix_peerstats() -> PeerStats {
|
||||
PeerStats {
|
||||
time_added: AlignedU64::from(1685569176894),
|
||||
time_added: Timestamp::from(1685569176894),
|
||||
rpc_stats: fix_rpcstats(),
|
||||
latency: Some(fix_latencystats()),
|
||||
transfer: fix_transferstatsdownup(),
|
||||
|
@ -28,7 +28,7 @@ pub async fn test_veilidappcall() {
|
||||
Some(fix_typedkey()),
|
||||
Some(fix_cryptokey()),
|
||||
b"Well, hello!".to_vec(),
|
||||
AlignedU64::from(123),
|
||||
OperationId::from(123),
|
||||
);
|
||||
let copy = deserialize_json(&serialize_json(&orig)).unwrap();
|
||||
|
||||
@ -229,8 +229,8 @@ pub async fn test_peertabledata() {
|
||||
pub async fn test_veilidstatenetwork() {
|
||||
let orig = VeilidStateNetwork {
|
||||
started: true,
|
||||
bps_down: AlignedU64::from(14_400),
|
||||
bps_up: AlignedU64::from(1200),
|
||||
bps_down: ByteCount::from(14_400),
|
||||
bps_up: ByteCount::from(1200),
|
||||
peers: vec![fix_peertabledata()],
|
||||
};
|
||||
let copy = deserialize_json(&serialize_json(&orig)).unwrap();
|
||||
@ -280,8 +280,8 @@ pub async fn test_veilidstate() {
|
||||
}),
|
||||
network: Box::new(VeilidStateNetwork {
|
||||
started: true,
|
||||
bps_down: AlignedU64::from(14_400),
|
||||
bps_up: AlignedU64::from(1200),
|
||||
bps_down: ByteCount::from(14_400),
|
||||
bps_up: ByteCount::from(1200),
|
||||
peers: vec![fix_peertabledata()],
|
||||
}),
|
||||
config: Box::new(VeilidStateConfig {
|
||||
|
@ -1,135 +1,163 @@
|
||||
use super::*;
|
||||
|
||||
/// Aligned u64.
|
||||
/// Aligned u64 type generator
|
||||
///
|
||||
/// Required on 32-bit platforms for serialization because Rust aligns u64 on 4 byte boundaries.
|
||||
/// Some zero-copy serialization frameworks also want 8-byte alignment.
|
||||
/// Supports serializing to string for JSON as well, since JSON can't handle 64-bit numbers to Javascript.
|
||||
|
||||
#[derive(
|
||||
Clone, Default, PartialEq, Eq, PartialOrd, Ord, Copy, Hash, Serialize, Deserialize, JsonSchema,
|
||||
)]
|
||||
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
|
||||
#[repr(C, align(8))]
|
||||
#[serde(transparent)]
|
||||
pub struct AlignedU64(
|
||||
macro_rules! aligned_u64_type {
|
||||
($name:ident) => {
|
||||
#[derive(
|
||||
Clone,
|
||||
Default,
|
||||
PartialEq,
|
||||
Eq,
|
||||
PartialOrd,
|
||||
Ord,
|
||||
Copy,
|
||||
Hash,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
JsonSchema,
|
||||
)]
|
||||
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
|
||||
#[repr(C, align(8))]
|
||||
#[serde(transparent)]
|
||||
pub struct $name(
|
||||
#[serde(with = "as_human_string")]
|
||||
#[schemars(with = "String")]
|
||||
#[cfg_attr(target_arch = "wasm32", tsify(type = "string"))]
|
||||
u64,
|
||||
);
|
||||
);
|
||||
|
||||
impl From<u64> for AlignedU64 {
|
||||
impl From<u64> for $name {
|
||||
fn from(v: u64) -> Self {
|
||||
AlignedU64(v)
|
||||
$name(v)
|
||||
}
|
||||
}
|
||||
impl From<AlignedU64> for u64 {
|
||||
fn from(v: AlignedU64) -> Self {
|
||||
}
|
||||
impl From<$name> for u64 {
|
||||
fn from(v: $name) -> Self {
|
||||
v.0
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for AlignedU64 {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
(&self.0 as &dyn fmt::Display).fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for AlignedU64 {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
(&self.0 as &dyn fmt::Debug).fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for AlignedU64 {
|
||||
type Err = <u64 as FromStr>::Err;
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
Ok(AlignedU64(u64::from_str(s)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::Add<Rhs> for AlignedU64 {
|
||||
type Output = Self;
|
||||
|
||||
fn add(self, rhs: Rhs) -> Self {
|
||||
Self(self.0 + rhs.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::AddAssign<Rhs> for AlignedU64 {
|
||||
fn add_assign(&mut self, rhs: Rhs) {
|
||||
self.0 += rhs.into();
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::Sub<Rhs> for AlignedU64 {
|
||||
type Output = Self;
|
||||
|
||||
fn sub(self, rhs: Rhs) -> Self {
|
||||
Self(self.0 - rhs.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::SubAssign<Rhs> for AlignedU64 {
|
||||
fn sub_assign(&mut self, rhs: Rhs) {
|
||||
self.0 -= rhs.into();
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::Mul<Rhs> for AlignedU64 {
|
||||
type Output = Self;
|
||||
|
||||
fn mul(self, rhs: Rhs) -> Self {
|
||||
Self(self.0 * rhs.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::MulAssign<Rhs> for AlignedU64 {
|
||||
fn mul_assign(&mut self, rhs: Rhs) {
|
||||
self.0 *= rhs.into();
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::Div<Rhs> for AlignedU64 {
|
||||
type Output = Self;
|
||||
|
||||
fn div(self, rhs: Rhs) -> Self {
|
||||
Self(self.0 / rhs.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::DivAssign<Rhs> for AlignedU64 {
|
||||
fn div_assign(&mut self, rhs: Rhs) {
|
||||
self.0 /= rhs.into();
|
||||
}
|
||||
}
|
||||
|
||||
impl AlignedU64 {
|
||||
impl $name {
|
||||
pub const fn new(v: u64) -> Self {
|
||||
Self(v)
|
||||
}
|
||||
pub fn as_u64(self) -> u64 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! aligned_u64_type_default_math_impl {
|
||||
($name:ident) => {
|
||||
impl<Rhs: Into<u64>> core::ops::Add<Rhs> for $name {
|
||||
type Output = Self;
|
||||
|
||||
fn add(self, rhs: Rhs) -> Self {
|
||||
Self(self.0 + rhs.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::AddAssign<Rhs> for $name {
|
||||
fn add_assign(&mut self, rhs: Rhs) {
|
||||
self.0 += rhs.into();
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::Sub<Rhs> for $name {
|
||||
type Output = Self;
|
||||
|
||||
fn sub(self, rhs: Rhs) -> Self {
|
||||
Self(self.0 - rhs.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::SubAssign<Rhs> for $name {
|
||||
fn sub_assign(&mut self, rhs: Rhs) {
|
||||
self.0 -= rhs.into();
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::Mul<Rhs> for $name {
|
||||
type Output = Self;
|
||||
|
||||
fn mul(self, rhs: Rhs) -> Self {
|
||||
Self(self.0 * rhs.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::MulAssign<Rhs> for $name {
|
||||
fn mul_assign(&mut self, rhs: Rhs) {
|
||||
self.0 *= rhs.into();
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::Div<Rhs> for $name {
|
||||
type Output = Self;
|
||||
|
||||
fn div(self, rhs: Rhs) -> Self {
|
||||
Self(self.0 / rhs.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Rhs: Into<u64>> core::ops::DivAssign<Rhs> for $name {
|
||||
fn div_assign(&mut self, rhs: Rhs) {
|
||||
self.0 /= rhs.into();
|
||||
}
|
||||
}
|
||||
|
||||
impl $name {
|
||||
pub fn saturating_sub(self, rhs: Self) -> Self {
|
||||
Self(self.0.saturating_sub(rhs.0))
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! aligned_u64_type_default_display_impl {
|
||||
($name:ident) => {
|
||||
impl fmt::Display for $name {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
(&self.0 as &dyn fmt::Display).fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for $name {
|
||||
type Err = <u64 as FromStr>::Err;
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
Ok($name(u64::from_str(s)?))
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! aligned_u64_type_default_debug_impl {
|
||||
($name:ident) => {
|
||||
impl fmt::Debug for $name {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
(&self.0 as &dyn fmt::Debug).fmt(f)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/// Microseconds since epoch
|
||||
#[cfg_attr(target_arch = "wasm32", declare)]
|
||||
pub type Timestamp = AlignedU64;
|
||||
pub fn get_aligned_timestamp() -> Timestamp {
|
||||
get_timestamp().into()
|
||||
}
|
||||
/// Microseconds duration
|
||||
#[cfg_attr(target_arch = "wasm32", declare)]
|
||||
pub type TimestampDuration = AlignedU64;
|
||||
/// Request/Response matching id
|
||||
#[cfg_attr(target_arch = "wasm32", declare)]
|
||||
pub type OperationId = AlignedU64;
|
||||
/// Number of bytes
|
||||
#[cfg_attr(target_arch = "wasm32", declare)]
|
||||
pub type ByteCount = AlignedU64;
|
||||
aligned_u64_type!(OperationId);
|
||||
aligned_u64_type_default_display_impl!(OperationId);
|
||||
aligned_u64_type_default_debug_impl!(OperationId);
|
||||
|
||||
aligned_u64_type!(ByteCount);
|
||||
aligned_u64_type_default_display_impl!(ByteCount);
|
||||
aligned_u64_type_default_debug_impl!(ByteCount);
|
||||
aligned_u64_type_default_math_impl!(ByteCount);
|
||||
|
||||
aligned_u64_type!(AlignedU64);
|
||||
aligned_u64_type_default_display_impl!(AlignedU64);
|
||||
aligned_u64_type_default_debug_impl!(AlignedU64);
|
||||
aligned_u64_type_default_math_impl!(AlignedU64);
|
||||
|
@ -1,9 +1,12 @@
|
||||
#[macro_use]
|
||||
mod aligned_u64;
|
||||
mod app_message_call;
|
||||
mod dht;
|
||||
mod fourcc;
|
||||
mod safety;
|
||||
mod stats;
|
||||
mod timestamp;
|
||||
mod timestamp_duration;
|
||||
#[cfg(feature = "unstable-tunnels")]
|
||||
mod tunnel;
|
||||
mod veilid_log;
|
||||
@ -17,6 +20,8 @@ pub use dht::*;
|
||||
pub use fourcc::*;
|
||||
pub use safety::*;
|
||||
pub use stats::*;
|
||||
pub use timestamp::*;
|
||||
pub use timestamp_duration::*;
|
||||
#[cfg(feature = "unstable-tunnels")]
|
||||
pub use tunnel::*;
|
||||
pub use veilid_log::*;
|
||||
|
57
veilid-core/src/veilid_api/types/timestamp.rs
Normal file
57
veilid-core/src/veilid_api/types/timestamp.rs
Normal file
@ -0,0 +1,57 @@
|
||||
/// Microseconds since epoch
|
||||
use super::*;
|
||||
|
||||
aligned_u64_type!(Timestamp);
|
||||
aligned_u64_type_default_display_impl!(Timestamp);
|
||||
|
||||
impl fmt::Debug for Timestamp {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", debug_ts(self.as_u64()))
|
||||
}
|
||||
}
|
||||
|
||||
impl core::ops::Add<TimestampDuration> for Timestamp {
|
||||
type Output = Self;
|
||||
|
||||
fn add(self, rhs: TimestampDuration) -> Self {
|
||||
Self(self.0 + rhs.as_u64())
|
||||
}
|
||||
}
|
||||
|
||||
impl core::ops::AddAssign<TimestampDuration> for Timestamp {
|
||||
fn add_assign(&mut self, rhs: TimestampDuration) {
|
||||
self.0 += rhs.as_u64();
|
||||
}
|
||||
}
|
||||
|
||||
impl core::ops::Sub<Timestamp> for Timestamp {
|
||||
type Output = TimestampDuration;
|
||||
|
||||
fn sub(self, rhs: Timestamp) -> TimestampDuration {
|
||||
TimestampDuration::new(self.0 - rhs.as_u64())
|
||||
}
|
||||
}
|
||||
|
||||
impl core::ops::Sub<TimestampDuration> for Timestamp {
|
||||
type Output = Timestamp;
|
||||
|
||||
fn sub(self, rhs: TimestampDuration) -> Timestamp {
|
||||
Timestamp(self.0 - rhs.as_u64())
|
||||
}
|
||||
}
|
||||
|
||||
impl core::ops::SubAssign<TimestampDuration> for Timestamp {
|
||||
fn sub_assign(&mut self, rhs: TimestampDuration) {
|
||||
self.0 -= rhs.as_u64();
|
||||
}
|
||||
}
|
||||
|
||||
impl Timestamp {
|
||||
pub fn now() -> Timestamp {
|
||||
Timestamp::new(get_timestamp())
|
||||
}
|
||||
|
||||
pub fn saturating_sub(self, rhs: Self) -> TimestampDuration {
|
||||
TimestampDuration::new(self.0.saturating_sub(rhs.0))
|
||||
}
|
||||
}
|
21
veilid-core/src/veilid_api/types/timestamp_duration.rs
Normal file
21
veilid-core/src/veilid_api/types/timestamp_duration.rs
Normal file
@ -0,0 +1,21 @@
|
||||
/// Microseconds since epoch
|
||||
use super::*;
|
||||
|
||||
aligned_u64_type!(TimestampDuration);
|
||||
aligned_u64_type_default_display_impl!(TimestampDuration);
|
||||
aligned_u64_type_default_math_impl!(TimestampDuration);
|
||||
|
||||
impl fmt::Debug for TimestampDuration {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", debug_duration(self.as_u64()))
|
||||
}
|
||||
}
|
||||
|
||||
impl TimestampDuration {
|
||||
pub fn new_secs<N: num_traits::Unsigned + num_traits::ToPrimitive>(secs: N) -> Self {
|
||||
TimestampDuration::new(secs.to_u64().unwrap() * 1_000_000u64)
|
||||
}
|
||||
pub fn new_ms<N: num_traits::Unsigned + num_traits::ToPrimitive>(ms: N) -> Self {
|
||||
TimestampDuration::new(ms.to_u64().unwrap() * 1_000u64)
|
||||
}
|
||||
}
|
@ -1969,7 +1969,7 @@ pub extern "C" fn crypto_crypt_no_auth(
|
||||
#[no_mangle]
|
||||
#[instrument(level = "trace", target = "ffi", skip_all)]
|
||||
pub extern "C" fn now() -> u64 {
|
||||
veilid_core::get_aligned_timestamp().as_u64()
|
||||
veilid_core::Timestamp::now().as_u64()
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
|
@ -1517,7 +1517,7 @@ pub fn crypto_crypt_no_auth(
|
||||
|
||||
#[wasm_bindgen()]
|
||||
pub fn now() -> String {
|
||||
veilid_core::get_aligned_timestamp().as_u64().to_string()
|
||||
veilid_core::Timestamp::now().as_u64().to_string()
|
||||
}
|
||||
|
||||
#[wasm_bindgen()]
|
||||
|
@ -178,7 +178,7 @@ impl VeilidClient {
|
||||
|
||||
/// Get the current timestamp, in string format
|
||||
pub fn now() -> String {
|
||||
veilid_core::get_aligned_timestamp().as_u64().to_string()
|
||||
veilid_core::Timestamp::now().as_u64().to_string()
|
||||
}
|
||||
|
||||
/// Execute an 'internal debug command'.
|
||||
|
Loading…
Reference in New Issue
Block a user