Merge branch 'dht-optimization' into 'main'

DHT optimization and bugfixes

See merge request veilid/veilid!270
This commit is contained in:
Christien Rioux 2024-04-20 15:11:30 +00:00
commit 18050a5f86
13 changed files with 159 additions and 26 deletions

View File

@ -284,14 +284,13 @@ where
};
// Initialize closest nodes list
if init_fanout_queue.is_empty() {
if let Err(e) = self.clone().init_closest_nodes() {
return TimeoutOr::value(Err(e));
}
} else {
self.clone().add_to_fanout_queue(&init_fanout_queue);
if let Err(e) = self.clone().init_closest_nodes() {
return TimeoutOr::value(Err(e));
}
// Ensure we include the most recent nodes
self.clone().add_to_fanout_queue(&init_fanout_queue);
// Do a quick check to see if we're already done
{
let mut ctx = self.context.lock();

View File

@ -43,6 +43,12 @@ impl StorageManager {
)
};
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
let inner = self.inner.lock().await;
inner.get_value_nodes(key)?.unwrap_or_default()
};
// Make do-get-value answer context
let schema = if let Some(d) = &last_get_result.opt_descriptor {
Some(d.schema()?)
@ -179,7 +185,7 @@ impl StorageManager {
check_done,
);
let kind = match fanout_call.run(vec![]).await {
let kind = match fanout_call.run(init_fanout_queue).await {
// If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => FanoutResultKind::Timeout,
// If we finished with or without consensus (enough nodes returning the same value)

View File

@ -82,6 +82,12 @@ impl StorageManager {
}
};
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
let inner = self.inner.lock().await;
inner.get_value_nodes(key)?.unwrap_or_default()
};
// Make do-inspect-value answer context
let opt_descriptor_info = if let Some(descriptor) = &local_inspect_result.opt_descriptor {
// Get the descriptor info. This also truncates the subkeys list to what can be returned from the network.
@ -253,7 +259,7 @@ impl StorageManager {
check_done,
);
let kind = match fanout_call.run(vec![]).await {
let kind = match fanout_call.run(init_fanout_queue).await {
// If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => FanoutResultKind::Timeout,
// If we finished with or without consensus (enough nodes returning the same value)

View File

@ -795,10 +795,19 @@ impl StorageManager {
"more subkeys returned locally than requested"
);
// Get the offline subkeys for this record still only returning the ones we're inspecting
let offline_subkey_writes = inner
.offline_subkey_writes
.get(&key)
.map(|o| o.subkeys.clone())
.unwrap_or_default()
.intersect(&subkeys);
// If this is the maximum scope we're interested in, return the report
if matches!(scope, DHTReportScope::Local) {
return Ok(DHTRecordReport::new(
local_inspect_result.subkeys,
offline_subkey_writes,
local_inspect_result.seqs,
vec![],
));
@ -864,6 +873,7 @@ impl StorageManager {
Ok(DHTRecordReport::new(
result.inspect_result.subkeys,
offline_subkey_writes,
local_inspect_result.seqs,
result.inspect_result.seqs,
))

View File

@ -44,6 +44,12 @@ impl StorageManager {
)
};
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
let inner = self.inner.lock().await;
inner.get_value_nodes(key)?.unwrap_or_default()
};
// Make do-set-value answer context
let schema = descriptor.schema()?;
let context = Arc::new(Mutex::new(OutboundSetValueContext {
@ -170,7 +176,7 @@ impl StorageManager {
check_done,
);
let kind = match fanout_call.run(vec![]).await {
let kind = match fanout_call.run(init_fanout_queue).await {
// If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => FanoutResultKind::Timeout,
// If we finished with or without consensus (enough nodes returning the same value)

View File

@ -10,13 +10,17 @@ impl StorageManager {
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let offline_subkey_writes = {
let inner = self.lock().await?;
inner.offline_subkey_writes.clone()
let (mut offline_subkey_writes, opt_update_callback) = {
let mut inner = self.lock().await?;
let out = (
inner.offline_subkey_writes.clone(),
inner.update_callback.clone(),
);
inner.offline_subkey_writes.clear();
out
};
// make a safety selection that is conservative
for (key, osw) in offline_subkey_writes {
for (key, osw) in offline_subkey_writes.iter_mut() {
if poll!(stop_token.clone()).is_ready() {
log_stor!(debug "Offline subkey writes cancelled.");
break;
@ -25,10 +29,12 @@ impl StorageManager {
log_stor!(debug "Offline subkey writes stopped for network.");
break;
};
let mut written_subkeys = ValueSubkeyRangeSet::new();
for subkey in osw.subkeys.iter() {
let get_result = {
let mut inner = self.lock().await?;
inner.handle_get_local_value(key, subkey, true).await
inner.handle_get_local_value(*key, subkey, true).await
};
let Ok(get_result) = get_result else {
log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey);
@ -43,22 +49,52 @@ impl StorageManager {
continue;
};
log_stor!(debug "Offline subkey write: {}:{} len={}", key, subkey, value.value_data().data().len());
if let Err(e) = self
let osvres = self
.outbound_set_value(
rpc_processor.clone(),
key,
*key,
subkey,
osw.safety_selection,
value,
descriptor,
)
.await
{
log_stor!(debug "failed to write offline subkey: {}", e);
.await;
match osvres {
Ok(osv) => {
if let Some(update_callback) = opt_update_callback.clone() {
// Send valuechange with dead count and no subkeys
update_callback(VeilidUpdate::ValueChange(Box::new(
VeilidValueChange {
key: *key,
subkeys: ValueSubkeyRangeSet::single(subkey),
count: u32::MAX,
value: Some(osv.signed_value_data.value_data().clone()),
},
)));
}
written_subkeys.insert(subkey);
}
Err(e) => {
log_stor!(debug "failed to write offline subkey: {}", e);
}
}
}
let mut inner = self.lock().await?;
inner.offline_subkey_writes.remove(&key);
osw.subkeys = osw.subkeys.difference(&written_subkeys);
}
// Add any subkeys back in that were not successfully written
let mut inner = self.lock().await?;
for (key, osw) in offline_subkey_writes {
if !osw.subkeys.is_empty() {
inner
.offline_subkey_writes
.entry(key)
.and_modify(|x| {
x.subkeys = x.subkeys.union(&osw.subkeys);
})
.or_insert(osw);
}
}
Ok(())

View File

@ -12,6 +12,8 @@ pub struct DHTRecordReport {
/// This may be a subset of the requested range if it exceeds the schema limits
/// or has more than 512 subkeys
subkeys: ValueSubkeyRangeSet,
/// The subkeys that have been writen offline that still need to be flushed
offline_subkeys: ValueSubkeyRangeSet,
/// The sequence numbers of each subkey requested from a locally stored DHT Record
local_seqs: Vec<ValueSeqNum>,
/// The sequence numbers of each subkey requested from the DHT over the network
@ -22,11 +24,13 @@ from_impl_to_jsvalue!(DHTRecordReport);
impl DHTRecordReport {
pub fn new(
subkeys: ValueSubkeyRangeSet,
offline_subkeys: ValueSubkeyRangeSet,
local_seqs: Vec<ValueSeqNum>,
network_seqs: Vec<ValueSeqNum>,
) -> Self {
Self {
subkeys,
offline_subkeys,
local_seqs,
network_seqs,
}
@ -35,6 +39,9 @@ impl DHTRecordReport {
pub fn subkeys(&self) -> &ValueSubkeyRangeSet {
&self.subkeys
}
pub fn offline_subkeys(&self) -> &ValueSubkeyRangeSet {
&self.offline_subkeys
}
pub fn local_seqs(&self) -> &[ValueSeqNum] {
&self.local_seqs
}
@ -47,8 +54,9 @@ impl fmt::Debug for DHTRecordReport {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"DHTRecordReport {{\n subkeys: {:?}\n local_seqs:\n{}\n remote_seqs:\n{}\n}}\n",
"DHTRecordReport {{\n subkeys: {:?}\n offline_subkeys: {:?}\n local_seqs:\n{}\n remote_seqs:\n{}\n}}\n",
&self.subkeys,
&self.offline_subkeys,
&debug_seqs(&self.local_seqs),
&debug_seqs(&self.network_seqs)
)

View File

@ -55,7 +55,7 @@ class HistoryWrapper {
}
});
},
focusNode: FocusNode(onKey: (FocusNode node, RawKeyEvent event) {
focusNode: FocusNode(onKeyEvent: (FocusNode node, KeyEvent event) {
if (event.logicalKey == LogicalKeyboardKey.arrowDown ||
event.logicalKey == LogicalKeyboardKey.arrowUp) {
return KeyEventResult.handled;

View File

@ -246,6 +246,7 @@ class RouteBlob with _$RouteBlob {
class DHTRecordReport with _$DHTRecordReport {
const factory DHTRecordReport({
required List<ValueSubkeyRange> subkeys,
required List<ValueSubkeyRange> offlineSubkeys,
required List<int> localSeqs,
required List<int> networkSeqs,
}) = _DHTRecordReport;

View File

@ -1363,6 +1363,8 @@ DHTRecordReport _$DHTRecordReportFromJson(Map<String, dynamic> json) {
/// @nodoc
mixin _$DHTRecordReport {
List<ValueSubkeyRange> get subkeys => throw _privateConstructorUsedError;
List<ValueSubkeyRange> get offlineSubkeys =>
throw _privateConstructorUsedError;
List<int> get localSeqs => throw _privateConstructorUsedError;
List<int> get networkSeqs => throw _privateConstructorUsedError;
@ -1380,6 +1382,7 @@ abstract class $DHTRecordReportCopyWith<$Res> {
@useResult
$Res call(
{List<ValueSubkeyRange> subkeys,
List<ValueSubkeyRange> offlineSubkeys,
List<int> localSeqs,
List<int> networkSeqs});
}
@ -1398,6 +1401,7 @@ class _$DHTRecordReportCopyWithImpl<$Res, $Val extends DHTRecordReport>
@override
$Res call({
Object? subkeys = null,
Object? offlineSubkeys = null,
Object? localSeqs = null,
Object? networkSeqs = null,
}) {
@ -1406,6 +1410,10 @@ class _$DHTRecordReportCopyWithImpl<$Res, $Val extends DHTRecordReport>
? _value.subkeys
: subkeys // ignore: cast_nullable_to_non_nullable
as List<ValueSubkeyRange>,
offlineSubkeys: null == offlineSubkeys
? _value.offlineSubkeys
: offlineSubkeys // ignore: cast_nullable_to_non_nullable
as List<ValueSubkeyRange>,
localSeqs: null == localSeqs
? _value.localSeqs
: localSeqs // ignore: cast_nullable_to_non_nullable
@ -1428,6 +1436,7 @@ abstract class _$$DHTRecordReportImplCopyWith<$Res>
@useResult
$Res call(
{List<ValueSubkeyRange> subkeys,
List<ValueSubkeyRange> offlineSubkeys,
List<int> localSeqs,
List<int> networkSeqs});
}
@ -1444,6 +1453,7 @@ class __$$DHTRecordReportImplCopyWithImpl<$Res>
@override
$Res call({
Object? subkeys = null,
Object? offlineSubkeys = null,
Object? localSeqs = null,
Object? networkSeqs = null,
}) {
@ -1452,6 +1462,10 @@ class __$$DHTRecordReportImplCopyWithImpl<$Res>
? _value._subkeys
: subkeys // ignore: cast_nullable_to_non_nullable
as List<ValueSubkeyRange>,
offlineSubkeys: null == offlineSubkeys
? _value._offlineSubkeys
: offlineSubkeys // ignore: cast_nullable_to_non_nullable
as List<ValueSubkeyRange>,
localSeqs: null == localSeqs
? _value._localSeqs
: localSeqs // ignore: cast_nullable_to_non_nullable
@ -1469,9 +1483,11 @@ class __$$DHTRecordReportImplCopyWithImpl<$Res>
class _$DHTRecordReportImpl implements _DHTRecordReport {
const _$DHTRecordReportImpl(
{required final List<ValueSubkeyRange> subkeys,
required final List<ValueSubkeyRange> offlineSubkeys,
required final List<int> localSeqs,
required final List<int> networkSeqs})
: _subkeys = subkeys,
_offlineSubkeys = offlineSubkeys,
_localSeqs = localSeqs,
_networkSeqs = networkSeqs;
@ -1486,6 +1502,14 @@ class _$DHTRecordReportImpl implements _DHTRecordReport {
return EqualUnmodifiableListView(_subkeys);
}
final List<ValueSubkeyRange> _offlineSubkeys;
@override
List<ValueSubkeyRange> get offlineSubkeys {
if (_offlineSubkeys is EqualUnmodifiableListView) return _offlineSubkeys;
// ignore: implicit_dynamic_type
return EqualUnmodifiableListView(_offlineSubkeys);
}
final List<int> _localSeqs;
@override
List<int> get localSeqs {
@ -1504,7 +1528,7 @@ class _$DHTRecordReportImpl implements _DHTRecordReport {
@override
String toString() {
return 'DHTRecordReport(subkeys: $subkeys, localSeqs: $localSeqs, networkSeqs: $networkSeqs)';
return 'DHTRecordReport(subkeys: $subkeys, offlineSubkeys: $offlineSubkeys, localSeqs: $localSeqs, networkSeqs: $networkSeqs)';
}
@override
@ -1513,6 +1537,8 @@ class _$DHTRecordReportImpl implements _DHTRecordReport {
(other.runtimeType == runtimeType &&
other is _$DHTRecordReportImpl &&
const DeepCollectionEquality().equals(other._subkeys, _subkeys) &&
const DeepCollectionEquality()
.equals(other._offlineSubkeys, _offlineSubkeys) &&
const DeepCollectionEquality()
.equals(other._localSeqs, _localSeqs) &&
const DeepCollectionEquality()
@ -1524,6 +1550,7 @@ class _$DHTRecordReportImpl implements _DHTRecordReport {
int get hashCode => Object.hash(
runtimeType,
const DeepCollectionEquality().hash(_subkeys),
const DeepCollectionEquality().hash(_offlineSubkeys),
const DeepCollectionEquality().hash(_localSeqs),
const DeepCollectionEquality().hash(_networkSeqs));
@ -1545,6 +1572,7 @@ class _$DHTRecordReportImpl implements _DHTRecordReport {
abstract class _DHTRecordReport implements DHTRecordReport {
const factory _DHTRecordReport(
{required final List<ValueSubkeyRange> subkeys,
required final List<ValueSubkeyRange> offlineSubkeys,
required final List<int> localSeqs,
required final List<int> networkSeqs}) = _$DHTRecordReportImpl;
@ -1554,6 +1582,8 @@ abstract class _DHTRecordReport implements DHTRecordReport {
@override
List<ValueSubkeyRange> get subkeys;
@override
List<ValueSubkeyRange> get offlineSubkeys;
@override
List<int> get localSeqs;
@override
List<int> get networkSeqs;

View File

@ -116,6 +116,9 @@ _$DHTRecordReportImpl _$$DHTRecordReportImplFromJson(
subkeys: (json['subkeys'] as List<dynamic>)
.map(ValueSubkeyRange.fromJson)
.toList(),
offlineSubkeys: (json['offline_subkeys'] as List<dynamic>)
.map(ValueSubkeyRange.fromJson)
.toList(),
localSeqs:
(json['local_seqs'] as List<dynamic>).map((e) => e as int).toList(),
networkSeqs:
@ -126,6 +129,8 @@ Map<String, dynamic> _$$DHTRecordReportImplToJson(
_$DHTRecordReportImpl instance) =>
<String, dynamic>{
'subkeys': instance.subkeys.map((e) => e.toJson()).toList(),
'offline_subkeys':
instance.offlineSubkeys.map((e) => e.toJson()).toList(),
'local_seqs': instance.localSeqs,
'network_seqs': instance.networkSeqs,
};

View File

@ -2805,6 +2805,7 @@
"required": [
"local_seqs",
"network_seqs",
"offline_subkeys",
"subkeys"
],
"properties": {
@ -2826,6 +2827,27 @@
"minimum": 0.0
}
},
"offline_subkeys": {
"description": "The subkeys that have been writen offline that still need to be flushed",
"type": "array",
"items": {
"type": "array",
"items": [
{
"type": "integer",
"format": "uint32",
"minimum": 0.0
},
{
"type": "integer",
"format": "uint32",
"minimum": 0.0
}
],
"maxItems": 2,
"minItems": 2
}
},
"subkeys": {
"description": "The actual subkey range within the schema being reported on This may be a subset of the requested range if it exceeds the schema limits or has more than 512 subkeys",
"type": "array",

View File

@ -382,26 +382,30 @@ class DHTRecordDescriptor:
class DHTRecordReport:
subkeys: list[tuple[ValueSubkey, ValueSubkey]]
offline_subkeys: list[tuple[ValueSubkey, ValueSubkey]]
local_seqs: list[ValueSeqNum]
network_seqs: list[ValueSeqNum]
def __init__(
self,
subkeys: list[tuple[ValueSubkey, ValueSubkey]],
offline_subkeys: list[tuple[ValueSubkey, ValueSubkey]],
local_seqs: list[ValueSeqNum],
network_seqs: list[ValueSeqNum],
):
self.subkeys = subkeys
self.offline_subkey = offline_subkeys
self.local_seqs = local_seqs
self.network_seqs = network_seqs
def __repr__(self) -> str:
return f"<{self.__class__.__name__}(subkeys={self.subkeys!r}, local_seqs={self.local_seqs!r}, network_seqs={self.network_seqs!r})>"
return f"<{self.__class__.__name__}(subkeys={self.subkeys!r}, offline_subkeys={self.offline_subkeys!r}, local_seqs={self.local_seqs!r}, network_seqs={self.network_seqs!r})>"
@classmethod
def from_json(cls, j: dict) -> Self:
return cls(
[[p[0], p[1]] for p in j["subkeys"]],
[[p[0], p[1]] for p in j["offline_subkeys"]],
[ValueSeqNum(s) for s in j["local_seqs"]],
[ValueSeqNum(s) for s in j["network_seqs"]],
)