checkpoint

John Smith 2022-10-12 15:52:19 -04:00
parent 9c59507ea0
commit a06c2fb5a3
3 changed files with 137 additions and 71 deletions

View File

@@ -224,7 +224,7 @@ impl NetworkManager {
#[instrument(level = "trace", skip(self), err)]
pub(super) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
let (bootstrap, bootstrap_nodes) = {
let c = self.config.get();
let c = self.unlocked_inner.config.get();
(
c.network.bootstrap.clone(),
c.network.bootstrap_nodes.clone(),
@@ -487,7 +487,7 @@ impl NetworkManager {
let routing_table = self.routing_table();
let mut ord = FuturesOrdered::new();
let min_peer_count = {
let c = self.config.get();
let c = self.unlocked_inner.config.get();
c.network.dht.min_peer_count as usize
};
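Both hunks above move config access to self.unlocked_inner, but keep the same read pattern: copy the needed fields out inside a narrow scope so the config read guard drops before any await. A minimal self-contained sketch of that pattern, with stand-in types (Config, connect, and the RwLock are assumptions, not the veilid-core API):

use std::sync::{Arc, RwLock};

struct Config {
    bootstrap: Vec<String>,
    min_peer_count: usize,
}

async fn bootstrap_example(config: Arc<RwLock<Config>>) {
    // Clone the fields inside a block so the read guard is dropped here...
    let (bootstrap, min_peer_count) = {
        let c = config.read().unwrap();
        (c.bootstrap.clone(), c.min_peer_count)
    };
    // ...so no guard is held across this await and the future stays Send.
    connect(bootstrap, min_peer_count).await;
}

async fn connect(_nodes: Vec<String>, _min: usize) {}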

View File

@@ -74,13 +74,13 @@ impl RoutingTable {
}
// Retrieve the fastest nodes in the routing table matching an entry filter
pub fn find_fast_public_nodes_filtered<'r, 'e, F>(
pub fn find_fast_public_nodes_filtered<'a, 'b, F>(
&self,
node_count: usize,
mut entry_filter: F,
) -> Vec<NodeRef>
where
F: FnMut(&'r RoutingTableInner, &'e BucketEntryInner) -> bool,
F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool,
{
self.find_fastest_nodes(
// count
@@ -201,7 +201,7 @@ impl RoutingTable {
}
}
pub fn find_peers_with_sort_and_filter<F, C, T, O>(
pub fn find_peers_with_sort_and_filter<'a, 'b, F, C, T, O>(
&self,
node_count: usize,
cur_ts: u64,
@@ -210,13 +210,13 @@ impl RoutingTable {
mut transform: T,
) -> Vec<O>
where
F: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
F: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
C: FnMut(
&RoutingTableInner,
&(DHTKey, Option<Arc<BucketEntry>>),
&(DHTKey, Option<Arc<BucketEntry>>),
&'a RoutingTableInner,
&'b (DHTKey, Option<Arc<BucketEntry>>),
&'b (DHTKey, Option<Arc<BucketEntry>>),
) -> core::cmp::Ordering,
T: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
T: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
{
let inner = self.inner.read();
let inner = &*inner;
@@ -259,15 +259,15 @@ impl RoutingTable {
out
}
pub fn find_fastest_nodes<T, F, O>(
pub fn find_fastest_nodes<'a, T, F, O>(
&self,
node_count: usize,
mut filter: F,
transform: T,
) -> Vec<O>
where
F: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
T: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
F: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
T: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
{
let cur_ts = intf::get_timestamp();
let out = self.find_peers_with_sort_and_filter(
@@ -341,15 +341,15 @@ impl RoutingTable {
out
}
pub fn find_closest_nodes<F, T, O>(
pub fn find_closest_nodes<'a, F, T, O>(
&self,
node_id: DHTKey,
filter: F,
mut transform: T,
) -> Vec<O>
where
F: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
T: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
F: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
T: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
{
let cur_ts = intf::get_timestamp();
let node_count = {

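Every finder above trades the elided closure lifetimes for named ones declared on the function. A self-contained sketch of the difference, using a stand-in Inner type rather than the real RoutingTableInner:

struct Inner(u32);

// Elided form: a higher-ranked bound. The closure must accept a
// reference of *any* lifetime, so it can never store what it is given.
fn visit_elided<F: FnMut(&Inner)>(mut f: F) {
    let local = Inner(1);
    f(&local); // fine: the borrow only lasts for this call
}

// Named form: the caller fixes one lifetime 'a up front, so the same 'a
// can be shared across several closure bounds (filter, compare, transform
// in the diff), but only borrows living at least as long as 'a fit.
fn visit_named<'a, F: FnMut(&'a Inner)>(inner: &'a Inner, mut f: F) {
    f(inner);
}

fn main() {
    visit_elided(|i| assert_eq!(i.0, 1));
    let outer = Inner(2);
    visit_named(&outer, |i| assert_eq!(i.0, 2));
}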
View File

@@ -37,7 +37,7 @@ struct RouteSpecDetail {
}
/// The core representation of the RouteSpecStore that can be serialized
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RouteSpecStoreContent {
/// All of the routes we have allocated so far
details: HashMap<DHTKey, RouteSpecDetail>,
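The new Default derive is what lets load() below fall back to an empty content blob when nothing has been stored yet. Spelled out, the unwrap_or_default() call in load() is equivalent to this hypothetical expansion (load_cbor's exact generics are assumed):

let content: RouteSpecStoreContent = match rsstdb.load_cbor(0, b"content").await? {
    Some(c) => c,
    None => RouteSpecStoreContent::default(), // requires the Default derive
};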
@@ -154,32 +154,95 @@ impl RouteSpecStore {
}
}
pub fn load(routing_table: RoutingTable) -> Result<RouteSpecStore, VeilidAPIError> {
pub async fn load(routing_table: RoutingTable) -> EyreResult<RouteSpecStore> {
// Get cbor blob from table store
let content: RouteSpecStoreContent = serde_cbor::from_slice(cbor)
.map_err(|e| VeilidAPIError::parse_error("invalid route spec store content", e))?;
let rss = RouteSpecStore {
let table_store = routing_table.network_manager().table_store();
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
let content = rsstdb.load_cbor(0, b"content").await?.unwrap_or_default();
let mut rss = RouteSpecStore {
content,
cache: Default::default(),
};
// Load secrets from pstore
let pstore = routing_table.network_manager().protected_store();
let mut dead_keys = Vec::new();
for (k, v) in &mut rss.content.details {
if let Some(secret_key) = pstore
.load_user_secret(&format!("RouteSpecStore_{}", k.encode()))
.await?
{
match secret_key.try_into() {
Ok(s) => {
v.secret_key = DHTKeySecret::new(s);
}
Err(_) => {
dead_keys.push(*k);
}
}
} else {
dead_keys.push(*k);
}
}
for k in dead_keys {
log_rtab!(debug "killing off private route: {}", k.encode());
rss.content.details.remove(&k);
}
// Rebuild the routespecstore cache
rss.rebuild_cache();
Ok(rss)
}
pub fn save(&self, routing_table: RoutingTable) -> Result<(), VeilidAPIError> {
pub async fn save(&self, routing_table: RoutingTable) -> EyreResult<()> {
// Save all the fields we care about to the cbor blob in table storage
let cbor = serde_cbor::to_vec(&self.content).unwrap();
let table_store = routing_table.network_manager().table_store();
table_store.open("")
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
rsstdb.store_cbor(0, b"content", &self.content).await?;
// Keep secrets in protected store as well
let pstore = routing_table.network_manager().protected_store();
for (k, v) in &self.content.details {
if pstore
.save_user_secret(
&format!("RouteSpecStore_{}", k.encode()),
&v.secret_key.bytes,
)
.await?
{
panic!("route spec should not already have secret key saved");
}
}
Ok(())
}
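Together, load and save split persistence in two: the bulk of each route goes into the table store as a single CBOR blob, while each secret key is kept separately in the protected store under a "RouteSpecStore_{key}" entry, and routes whose secrets are missing or malformed are pruned on load. A hypothetical round-trip, assuming RoutingTable is a cloneable handle:

async fn persistence_example(routing_table: RoutingTable) -> EyreResult<()> {
    // Load content from the table store and secrets from the protected store
    let rss = RouteSpecStore::load(routing_table.clone()).await?;
    // ... allocate and use routes here ...
    // Write content and secrets back out
    rss.save(routing_table).await?;
    Ok(())
}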
fn add_to_cache(&mut self, cache_key: Vec<u8>, hops: &[DHTKey]) {
if !self.cache.hop_cache.insert(cache_key) {
panic!("route should never be inserted twice");
}
for h in hops {
self.cache
.used_nodes
.entry(*h)
.and_modify(|e| *e += 1)
.or_insert(1);
}
self.cache
.used_end_nodes
.entry(*hops.last().unwrap())
.and_modify(|e| *e += 1)
.or_insert(1);
}
fn rebuild_cache(&mut self) {
// Clone the hop lists first so the shared borrow of self.content ends
// before add_to_cache takes a mutable borrow of self
let all_hops: Vec<Vec<DHTKey>> = self
.content
.details
.values()
.map(|v| v.hops.clone())
.collect();
for hops in all_hops {
let cache_key = route_hops_to_hop_cache(&hops);
self.add_to_cache(cache_key, &hops);
}
}
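add_to_cache keeps refcounts of how many allocated routes use each node (and each terminal node). This commit only ever increments them; a release path would need the inverse. A hypothetical sketch of that inverse, assuming hop_cache is a HashSet<Vec<u8>> and the used-node maps count usize references (none of this is in the diff):

fn remove_from_cache(&mut self, cache_key: &[u8], hops: &[DHTKey]) {
    if !self.cache.hop_cache.remove(cache_key) {
        panic!("route should have been in the hop cache");
    }
    for h in hops {
        // Decrement the per-node refcount and drop entries that hit zero
        if let Some(n) = self.cache.used_nodes.get_mut(h) {
            *n -= 1;
            if *n == 0 {
                self.cache.used_nodes.remove(h);
            }
        }
    }
    let end = hops.last().unwrap();
    if let Some(n) = self.cache.used_end_nodes.get_mut(end) {
        *n -= 1;
        if *n == 0 {
            self.cache.used_end_nodes.remove(end);
        }
    }
}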
fn detail_mut(&mut self, public_key: DHTKey) -> &mut RouteSpecDetail {
fn detail_mut(&mut self, public_key: &DHTKey) -> &mut RouteSpecDetail {
self.content.details.get_mut(public_key).unwrap()
}
@@ -187,13 +250,13 @@ impl RouteSpecStore {
/// Prefers nodes that are not currently in use by another route
/// The route is not yet tested for its reachability
/// Returns Ok(None) if no route could be allocated at this time
pub fn allocate_route(
pub async fn allocate_route(
&mut self,
routing_table: RoutingTable,
reliable: bool,
hop_count: usize,
directions: DirectionSet,
) -> Option<DHTKey> {
) -> EyreResult<Option<DHTKey>> {
use core::cmp::Ordering;
let max_route_hop_count = {
@@ -204,13 +267,11 @@ impl RouteSpecStore {
};
if hop_count < 2 {
log_rtab!(error "Not allocating route less than two hops in length");
return None;
bail!("Not allocating route less than two hops in length");
}
if hop_count > max_route_hop_count {
log_rtab!(error "Not allocating route longer than max route hop count");
return None;
bail!("Not allocating route longer than max route hop count");
}
// Get list of all nodes, and sort them for selection
@@ -293,8 +354,8 @@ impl RouteSpecStore {
}
// always prioritize reliable nodes, but sort by oldest or fastest
let cmpout = v1.1.unwrap().with(rti, |rti, e1| {
v2.1.unwrap().with(rti, |_rti, e2| {
let cmpout = v1.1.as_ref().unwrap().with(rti, |rti, e1| {
v2.1.as_ref().unwrap().with(rti, |_rti, e2| {
if reliable {
BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2)
} else {
@@ -321,13 +382,13 @@ impl RouteSpecStore {
RoutingDomain::PublicInternet.into(),
BucketEntryState::Unreliable,
);
let mut nodes = routing_table
let nodes = routing_table
.find_peers_with_sort_and_filter(node_count, cur_ts, filter, compare, transform);
// If we couldn't find enough nodes, wait until we have more nodes in the routing table
if nodes.len() < hop_count {
log_rtab!(debug "Not enough nodes to construct route at this time. Try again later.");
return None;
log_rtab!(debug "not enough nodes to construct route at this time");
return Ok(None);
}
// Now go through nodes and try to build a route we haven't seen yet
@@ -396,7 +457,8 @@ impl RouteSpecStore {
}
}
if route_nodes.is_empty() {
return None;
log_rtab!(debug "unable to find unique route at this time");
return Ok(None);
}
// Got a unique route, lets build the detail, register it, and return it
@@ -423,25 +485,13 @@ impl RouteSpecStore {
reliable,
};
// Add to cache
self.add_to_cache(cache_key, &rsd.hops);
// Keep route in spec store
self.content.details.insert(public_key, rsd);
if !self.cache.hop_cache.insert(cache_key) {
panic!("route should never be inserted twice");
}
for h in &hops {
self.cache
.used_nodes
.entry(*h)
.and_modify(|e| *e += 1)
.or_insert(1);
}
self.cache
.used_end_nodes
.entry(*hops.last().unwrap())
.and_modify(|e| *e += 1)
.or_insert(1);
Some(public_key)
Ok(Some(public_key))
}
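With the new signature, callers can distinguish hard failures from "try again later". A sketch of a hypothetical caller (ensure_route and DirectionSet::all() are assumptions, not part of this commit):

async fn ensure_route(
    rss: &mut RouteSpecStore,
    routing_table: RoutingTable,
) -> EyreResult<Option<DHTKey>> {
    // Err comes from bail!() on bad parameters (hop counts out of range);
    // Ok(None) means the routing table is too thin or no unique route exists yet.
    let key = rss
        .allocate_route(routing_table, /* reliable */ true, /* hop_count */ 3, DirectionSet::all())
        .await?;
    Ok(key)
}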
pub fn release_route(&mut self, public_key: DHTKey) {
@@ -482,7 +532,7 @@ impl RouteSpecStore {
}
}
pub fn best_route(
pub fn first_unpublished_route(
&mut self,
reliable: bool,
min_hop_count: usize,
@@ -494,6 +544,7 @@ impl RouteSpecStore {
&& detail.1.hops.len() >= min_hop_count
&& detail.1.hops.len() <= max_hop_count
&& detail.1.directions.is_subset(directions)
&& !detail.1.published
{
return Some(*detail.0);
}
@@ -507,31 +558,46 @@ impl RouteSpecStore {
/// Mark route as published
/// When first deserialized, routes must be re-published in order to ensure they remain
/// in the RouteSpecStore.
pub fn mark_route_published(&mut self, spec: Arc<RouteSpec>) {
self.detail_mut(spec).published = true;
pub fn mark_route_published(&mut self, key: &DHTKey) {
self.detail_mut(key).published = true;
}
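Per the comment above, deserialized routes come back unpublished, so a caller would pair the renamed finder with this setter. A hypothetical republish flow (the full signature of first_unpublished_route is inferred from the hunk above):

// After a load(): find a surviving route that has not been re-announced yet,
// publish it (e.g. embed it in a DHT record), then mark it published.
if let Some(key) = rss.first_unpublished_route(true, 2, 4, DirectionSet::all()) {
    // ... announce the route here ...
    rss.mark_route_published(&key);
}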
/// Mark route as checked
pub fn touch_route_checked(&mut self, spec: Arc<RouteSpec>, cur_ts: u64) {
self.detail_mut(spec).last_checked_ts = cur_ts;
pub fn touch_route_checked(&mut self, key: &DHTKey, cur_ts: u64) {
self.detail_mut(key).last_checked_ts = Some(cur_ts);
}
pub fn record_latency(&mut self, spec: Arc<RouteSpec>, latency: u64) {
let lsa = self.detail_mut(spec).latency_stats_accounting;
self.detail_mut(spec).latency_stats = lsa.record_latency(latency);
/// Record latency on the route
pub fn record_latency(&mut self, key: &DHTKey, latency: u64) {
let detail = self.detail_mut(key);
detail.latency_stats = detail.latency_stats_accounting.record_latency(latency);
}
pub fn latency_stats(&self, spec: Arc<RouteSpec>) -> LatencyStats {
self.detail_mut(spec).latency_stats.clone()
/// Get the calculated latency stats
pub fn latency_stats(&self, key: &DHTKey) -> LatencyStats {
self.content.details.get(key).unwrap().latency_stats.clone()
}
pub fn add_down(&mut self, spec: Arc<RouteSpec>, bytes: u64) {
self.current_transfer.down += bytes;
/// Add download transfers to route
pub fn add_down(&mut self, key: &DHTKey, bytes: u64) {
let tsa = &mut self.detail_mut(key).transfer_stats_accounting;
tsa.add_down(bytes);
}
pub fn add_up(&mut self, spec: Arc<RouteSpec>, bytes: u64) {}
/// Add upload transfers to route
pub fn add_up(&mut self, key: &DHTKey, bytes: u64) {
let tsa = &mut self.detail_mut(key).transfer_stats_accounting;
tsa.add_up(bytes);
}
pub fn roll_transfers(&mut self) {
//
/// Process transfer statistics to get averages
pub fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) {
for rsd in self.content.details.values_mut() {
rsd.transfer_stats_accounting.roll_transfers(
last_ts,
cur_ts,
&mut rsd.transfer_stats_down_up,
);
}
}
}
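The stats setters above are now all keyed by the route's public key rather than an Arc<RouteSpec>. A sketch of how a caller might compose them (update_route_stats is hypothetical; the timestamps are the same u64 values intf::get_timestamp() produces elsewhere in this diff):

fn update_route_stats(rss: &mut RouteSpecStore, key: &DHTKey, rtt: u64, sent: u64, recvd: u64) {
    rss.touch_route_checked(key, intf::get_timestamp());
    rss.record_latency(key, rtt);
    rss.add_up(key, sent);
    rss.add_down(key, recvd);
}

// And periodically, roll the accumulated transfers into averages:
// rss.roll_transfers(last_rollup_ts, intf::get_timestamp());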