valueset bugfix

This commit is contained in:
Christien Rioux 2024-05-17 21:23:42 -04:00
parent c272c768fc
commit 8e90a83142
2 changed files with 326 additions and 329 deletions

541
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -89,12 +89,28 @@ impl StorageManager {
); );
// If the node was close enough to possibly set the value // If the node was close enough to possibly set the value
if sva.answer.set {
let mut ctx = context.lock(); let mut ctx = context.lock();
if !sva.answer.set {
ctx.missed_since_last_set += 1;
// Return peers if we have some
log_network_result!(debug "SetValue missed: {}, fanout call returned peers {}", ctx.missed_since_last_set, sva.answer.peers.len());
return Ok(NetworkResult::value(sva.answer.peers));
}
// See if we got a value back
let Some(value) = sva.answer.value else {
// No newer value was found and returned, so increase our consensus count
ctx.value_nodes.push(next_node);
ctx.missed_since_last_set = 0;
// Return peers if we have some
log_network_result!(debug "SetValue returned no value, fanout call returned peers {}", sva.answer.peers.len());
return Ok(NetworkResult::value(sva.answer.peers));
};
// Keep the value if we got one and it is newer and it passes schema validation // Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = sva.answer.value { log_dht!(debug "SetValue got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
log_dht!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
// Validate with schema // Validate with schema
if !ctx.schema.check_subkey_value_data( if !ctx.schema.check_subkey_value_data(
@ -107,40 +123,29 @@ impl StorageManager {
} }
// If we got a value back it should be different than the one we are setting // If we got a value back it should be different than the one we are setting
// But in the case of a benign bug, we can just move to the next node
if ctx.value.value_data() == value.value_data() { if ctx.value.value_data() == value.value_data() {
// Move to the next node ctx.value_nodes.push(next_node);
return Ok(NetworkResult::invalid_message("same value returned")); ctx.missed_since_last_set = 0;
return Ok(NetworkResult::value(sva.answer.peers));
} }
// We have a prior value, ensure this is a newer sequence number // We have a prior value, ensure this is a newer sequence number
let prior_seq = ctx.value.value_data().seq(); let prior_seq = ctx.value.value_data().seq();
let new_seq = value.value_data().seq(); let new_seq = value.value_data().seq();
if new_seq >= prior_seq { if new_seq < prior_seq {
// If the sequence number is older node should have not returned a value here.
// Skip this node and its closer list because it is misbehaving
// Ignore this value and pretend we never saw this node
return Ok(NetworkResult::invalid_message("Sequence number is older"));
}
// If the sequence number is greater or equal, keep it // If the sequence number is greater or equal, keep it
// even if the sequence number is the same, accept all conflicts in an attempt to resolve them
ctx.value = Arc::new(value); ctx.value = Arc::new(value);
// One node has shown us this value so far // One node has shown us this value so far
ctx.value_nodes = vec![next_node]; ctx.value_nodes = vec![next_node];
ctx.missed_since_last_set = 0; ctx.missed_since_last_set = 0;
} else {
// If the sequence number is older, or an equal sequence number,
// node should have not returned a value here.
// Skip this node and its closer list because it is misbehaving
return Ok(NetworkResult::invalid_message("Sequence number is older"));
}
} else {
// It was set on this node and no newer value was found and returned,
// so increase our consensus count
ctx.value_nodes.push(next_node);
ctx.missed_since_last_set = 0;
}
} else {
let mut ctx = context.lock();
ctx.missed_since_last_set += 1;
}
// Return peers if we have some
log_network_result!(debug "SetValue fanout call returned peers {}", sva.answer.peers.len());
Ok(NetworkResult::value(sva.answer.peers)) Ok(NetworkResult::value(sva.answer.peers))
} }
}; };
@ -232,8 +237,17 @@ impl StorageManager {
// Make sure this value would actually be newer // Make sure this value would actually be newer
if let Some(last_value) = &last_get_result.opt_value { if let Some(last_value) = &last_get_result.opt_value {
if value.value_data().seq() <= last_value.value_data().seq() { if value.value_data().seq() < last_value.value_data().seq() {
// inbound value is older than or equal to the sequence number that we have, just return the one we have // inbound value is older than the sequence number that we have, just return the one we have
return Ok(NetworkResult::value(Some(last_value.clone())));
} else if value.value_data().seq() == last_value.value_data().seq() {
// inbound value is equal to the sequence number that we have
// if the value is the same including the writer, return nothing,
// otherwise return the existing value because it was here first
if value.value_data() == last_value.value_data() {
return Ok(NetworkResult::value(None));
}
// sequence number is the same but there's a value conflict, return what we have
return Ok(NetworkResult::value(Some(last_value.clone()))); return Ok(NetworkResult::value(Some(last_value.clone())));
} }
} }