fix address in use error and don't send valuechanged to node that sets value

This commit is contained in:
Christien Rioux 2023-12-06 17:32:11 -05:00
parent 05f89e070c
commit 96ead7dba5
7 changed files with 83 additions and 16 deletions

View File

@ -331,7 +331,7 @@ impl ConnectionManager {
}
// Attempt new connection
let mut retry_count = 0; // Someday, if we need this
let mut retry_count = 1;
let prot_conn = network_result_try!(loop {
let result_net_res = ProtocolNetworkConnection::connect(
@ -350,12 +350,18 @@ impl ConnectionManager {
}
Err(e) => {
if retry_count == 0 {
return Err(e).wrap_err("failed to connect");
return Err(e).wrap_err(format!(
"failed to connect: {:?} -> {:?}",
preferred_local_address, dial_info
));
}
}
};
log_net!(debug "get_or_create_connection retries left: {}", retry_count);
retry_count -= 1;
// Release the preferred local address if things can't connect due to a low-level collision we dont have a record of
preferred_local_address = None;
sleep(500).await;
});

View File

@ -222,6 +222,10 @@ impl RPCProcessor {
// Destructure
let (key, subkey, value, descriptor) = set_value_q.destructure();
// Get target for ValueChanged notifications
let dest = network_result_try!(self.get_respond_to_destination(&msg));
let target = dest.get_target();
// Get the nodes that we know about that are closer to the the key than our own node
let routing_table = self.routing_table();
let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]));
@ -257,7 +261,7 @@ impl RPCProcessor {
// Save the subkey, creating a new record if necessary
let storage_manager = self.storage_manager();
let new_value = network_result_try!(storage_manager
.inbound_set_value(key, subkey, Arc::new(value), descriptor.map(Arc::new))
.inbound_set_value(key, subkey, Arc::new(value), descriptor.map(Arc::new), target)
.await
.map_err(RPCError::internal)?);

View File

@ -416,7 +416,12 @@ impl StorageManager {
// If we got a new value back then write it to the opened record
if Some(subkey_result_value.value_data().seq()) != opt_last_seq {
inner
.handle_set_local_value(key, subkey, subkey_result_value.clone())
.handle_set_local_value(
key,
subkey,
subkey_result_value.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
}
Ok(Some(subkey_result_value.value_data().clone()))
@ -496,7 +501,12 @@ impl StorageManager {
// Offline, just write it locally and return immediately
inner
.handle_set_local_value(key, subkey, signed_value_data.clone())
.handle_set_local_value(
key,
subkey,
signed_value_data.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
@ -535,7 +545,12 @@ impl StorageManager {
// Whatever record we got back, store it locally, might be newer than the one we asked to save
inner
.handle_set_local_value(key, subkey, result.signed_value_data.clone())
.handle_set_local_value(
key,
subkey,
result.signed_value_data.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
// Return the new value if it differs from what was asked to set

View File

@ -40,6 +40,12 @@ struct WatchedRecord {
watchers: Vec<WatchedRecordWatch>,
}
pub(super) enum WatchUpdateMode {
NoUpdate,
UpdateAll,
ExcludeTarget(Target),
}
pub(super) struct RecordStore<D>
where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
@ -599,7 +605,12 @@ where
}))
}
async fn update_watched_value(&mut self, key: TypedKey, subkey: ValueSubkey) {
async fn update_watched_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
opt_ignore_target: Option<Target>,
) {
let rtk = RecordTableKey { key };
let Some(wr) = self.watched_records.get_mut(&rtk) else {
return;
@ -608,7 +619,12 @@ where
let mut changed = false;
for w in &mut wr.watchers {
// If this watcher is watching the changed subkey then add to the watcher's changed list
if w.subkeys.contains(subkey) && w.changed.insert(subkey) {
// Don't bother marking changes for value sets coming from the same watching node/target because they
// are already going to be aware of the changes in that case
if Some(&w.target) != opt_ignore_target.as_ref()
&& w.subkeys.contains(subkey)
&& w.changed.insert(subkey)
{
changed = true;
}
}
@ -622,6 +638,7 @@ where
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
watch_update_mode: WatchUpdateMode,
) -> VeilidAPIResult<()> {
// Check size limit for data
if signed_value_data.value_data().data().len() > self.limits.max_subkey_size {
@ -710,7 +727,16 @@ where
self.total_storage_space.commit().unwrap();
// Update watched value
self.update_watched_value(key, subkey).await;
let (do_update, opt_ignore_target) = match watch_update_mode {
WatchUpdateMode::NoUpdate => (false, None),
WatchUpdateMode::UpdateAll => (true, None),
WatchUpdateMode::ExcludeTarget(target) => (true, Some(target)),
};
if do_update {
self.update_watched_value(key, subkey, opt_ignore_target)
.await;
}
Ok(())
}

View File

@ -227,6 +227,7 @@ impl StorageManager {
subkey: ValueSubkey,
value: Arc<SignedValueData>,
descriptor: Option<Arc<SignedValueDescriptor>>,
target: Target,
) -> VeilidAPIResult<NetworkResult<Option<Arc<SignedValueData>>>> {
let mut inner = self.lock().await?;
@ -290,10 +291,18 @@ impl StorageManager {
// Do the set and return no new value
let res = if is_local {
inner.handle_set_local_value(key, subkey, value).await
inner
.handle_set_local_value(key, subkey, value, WatchUpdateMode::ExcludeTarget(target))
.await
} else {
inner
.handle_set_remote_value(key, subkey, value, actual_descriptor)
.handle_set_remote_value(
key,
subkey,
value,
actual_descriptor,
WatchUpdateMode::ExcludeTarget(target),
)
.await
};
match res {

View File

@ -291,7 +291,7 @@ impl StorageManagerInner {
continue;
};
local_record_store
.set_subkey(key, subkey, subkey_data)
.set_subkey(key, subkey, subkey_data, WatchUpdateMode::NoUpdate)
.await?;
}
@ -420,7 +420,7 @@ impl StorageManagerInner {
if let Some(signed_value_data) = subkey_result.value {
// Write subkey to local store
local_record_store
.set_subkey(key, subkey, signed_value_data)
.set_subkey(key, subkey, signed_value_data, WatchUpdateMode::NoUpdate)
.await?;
}
@ -519,6 +519,7 @@ impl StorageManagerInner {
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
watch_update_mode: WatchUpdateMode,
) -> VeilidAPIResult<()> {
// See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
@ -527,7 +528,7 @@ impl StorageManagerInner {
// Write subkey to local store
local_record_store
.set_subkey(key, subkey, signed_value_data)
.set_subkey(key, subkey, signed_value_data, watch_update_mode)
.await?;
Ok(())
@ -580,6 +581,7 @@ impl StorageManagerInner {
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
signed_value_descriptor: Arc<SignedValueDescriptor>,
watch_update_mode: WatchUpdateMode,
) -> VeilidAPIResult<()> {
// See if it's in the remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
@ -601,7 +603,7 @@ impl StorageManagerInner {
// Write subkey to remote store
remote_record_store
.set_subkey(key, subkey, signed_value_data)
.set_subkey(key, subkey, signed_value_data, watch_update_mode)
.await?;
Ok(())

View File

@ -228,7 +228,12 @@ impl StorageManager {
let res = if let Some(first_subkey) = subkeys.first() {
inner
.handle_set_local_value(key, first_subkey, value.clone())
.handle_set_local_value(
key,
first_subkey,
value.clone(),
WatchUpdateMode::NoUpdate,
)
.await
} else {
VeilidAPIResult::Ok(())