convert deferred stream processor to generic streams instead of flume channel

more aggressive relay reconnection
more discovery context improvement
This commit is contained in:
Christien Rioux 2024-09-25 21:28:30 -04:00
parent 55f07d7bcc
commit 545b646d8f
17 changed files with 240 additions and 63 deletions

4
Cargo.lock generated
View File

@ -6299,9 +6299,9 @@ dependencies = [
[[package]]
name = "veilid-hashlink"
version = "0.1.0"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a3dabbda02cfe176635dcaa18a021416ff2eb4d0b47a913e3fdc7f62049d7b1"
checksum = "2070d1d09dad90091d23e49743408f82f8874994dec5ae0a8d3689b061bba426"
dependencies = [
"hashbrown 0.14.5",
"serde",

View File

@ -88,7 +88,7 @@ enumset = { version = "1.1.3", features = ["serde"] }
keyvaluedb = "0.1.2"
range-set-blaze = "0.1.16"
weak-table = "0.3.2"
hashlink = { package = "veilid-hashlink", version = "0.1.0", features = [
hashlink = { package = "veilid-hashlink", version = "0.1.1", features = [
"serde_impl",
] }

View File

@ -55,6 +55,7 @@ struct ConnectionManagerInner {
async_processor_jh: Option<MustJoinHandle<()>>,
stop_source: Option<StopSource>,
protected_addresses: HashMap<SocketAddress, ProtectedAddress>,
reconnection_processor: DeferredStreamProcessor,
}
struct ConnectionManagerArc {
@ -84,6 +85,7 @@ impl ConnectionManager {
stop_source: StopSource,
sender: flume::Sender<ConnectionManagerEvent>,
async_processor_jh: MustJoinHandle<()>,
reconnection_processor: DeferredStreamProcessor,
) -> ConnectionManagerInner {
ConnectionManagerInner {
next_id: 0.into(),
@ -91,6 +93,7 @@ impl ConnectionManager {
sender,
async_processor_jh: Some(async_processor_jh),
protected_addresses: HashMap::new(),
reconnection_processor,
}
}
fn new_arc(network_manager: NetworkManager) -> ConnectionManagerArc {
@ -133,11 +136,6 @@ impl ConnectionManager {
log_net!(debug "startup connection manager");
let mut inner = self.arc.inner.lock();
if inner.is_some() {
panic!("shouldn't start connection manager twice without shutting it down first");
}
// Create channel for async_processor to receive notifications of networking events
let (sender, receiver) = flume::unbounded();
@ -150,8 +148,21 @@ impl ConnectionManager {
self.clone().async_processor(stop_source.token(), receiver),
);
// Spawn the reconnection processor
let mut reconnection_processor = DeferredStreamProcessor::new();
reconnection_processor.init().await;
// Store in the inner object
*inner = Some(Self::new_inner(stop_source, sender, async_processor));
let mut inner = self.arc.inner.lock();
if inner.is_some() {
panic!("shouldn't start connection manager twice without shutting it down first");
}
*inner = Some(Self::new_inner(
stop_source,
sender,
async_processor,
reconnection_processor,
));
guard.success();
@ -175,7 +186,9 @@ impl ConnectionManager {
}
}
};
// Stop the reconnection processor
log_net!(debug "stopping reconnection processor task");
inner.reconnection_processor.terminate().await;
// Stop all the connections and the async processor
log_net!(debug "stopping async processor task");
drop(inner.stop_source.take());
@ -278,11 +291,12 @@ impl ConnectionManager {
&self,
inner: &mut ConnectionManagerInner,
prot_conn: ProtocolNetworkConnection,
opt_dial_info: Option<DialInfo>,
) -> EyreResult<NetworkResult<ConnectionHandle>> {
// Get next connection id to use
let id = inner.next_id;
inner.next_id += 1u64;
log_net!(debug
log_net!(
"on_new_protocol_network_connection: id={} prot_conn={:?}",
id,
prot_conn
@ -294,7 +308,13 @@ impl ConnectionManager {
None => bail!("not creating connection because we are stopping"),
};
let mut conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id);
let mut conn = NetworkConnection::from_protocol(
self.clone(),
stop_token,
prot_conn,
id,
opt_dial_info,
);
let handle = conn.get_handle();
// See if this should be a protected connection
@ -396,7 +416,7 @@ impl ConnectionManager {
// Async lock on the remote address for atomicity per remote
let _lock_guard = self.arc.address_lock_table.lock_tag(remote_addr).await;
log_net!(debug "== get_or_create_connection dial_info={:?}", dial_info);
log_net!("== get_or_create_connection dial_info={:?}", dial_info);
// If any connection to this remote exists that has the same protocol, return it
// Any connection will do, we don't have to match the local address but if we can
@ -406,7 +426,7 @@ impl ConnectionManager {
.connection_table
.get_best_connection_by_remote(best_port, peer_address)
{
log_net!(debug
log_net!(
"== Returning best existing connection {:?}",
best_existing_conn
);
@ -468,7 +488,7 @@ impl ConnectionManager {
}
};
self.on_new_protocol_network_connection(inner, prot_conn)
self.on_new_protocol_network_connection(inner, prot_conn, Some(dial_info))
}
///////////////////////////////////////////////////////////////////////////////////////////////////////
@ -502,7 +522,7 @@ impl ConnectionManager {
// We don't care if this fails, since nobody here asked for the inbound connection.
// If it does, we just drop the connection
let _ = self.on_new_protocol_network_connection(inner, prot_conn);
let _ = self.on_new_protocol_network_connection(inner, prot_conn, None);
}
None => {
// If this somehow happens, we're shutting down
@ -598,6 +618,9 @@ impl ConnectionManager {
// See if we've had more than the threshold number of drops in the last span
let cur_ts = Timestamp::now();
let duration = cur_ts.saturating_sub(pa.span_start_ts);
let mut reconnect = true;
if duration < PROTECTED_CONNECTION_DROP_SPAN {
pa.drops_in_span += 1;
log_net!(debug "== Protected connection dropped (count={}): {} -> {} for node {}", pa.drops_in_span, conn.connection_id(), conn.debug_print(Timestamp::now()), protect_nr);
@ -605,6 +628,7 @@ impl ConnectionManager {
if pa.drops_in_span >= PROTECTED_CONNECTION_DROP_COUNT {
// Consider this as a failure to send if we've dropped the connection too many times in a single timespan
protect_nr.report_protected_connection_dropped();
reconnect = false;
// Reset the drop counter
pa.drops_in_span = 0;
@ -618,6 +642,15 @@ impl ConnectionManager {
log_net!(debug "== Protected connection dropped (count={}): {} -> {} for node {}", pa.drops_in_span, conn.connection_id(), conn.debug_print(Timestamp::now()), protect_nr);
}
// Reconnect the protected connection immediately
if reconnect {
if let Some(dial_info) = conn.dial_info() {
self.spawn_reconnector_inner(inner, dial_info);
} else {
log_net!(debug "Can't reconnect to accepted protected connection: {} -> {} for node {}", conn.connection_id(), conn.debug_print(Timestamp::now()), protect_nr);
}
}
break;
}
}
@ -627,6 +660,30 @@ impl ConnectionManager {
}
}
fn spawn_reconnector_inner(&self, inner: &mut ConnectionManagerInner, dial_info: DialInfo) {
let this = self.clone();
inner.reconnection_processor.add(
Box::pin(futures_util::stream::once(async { dial_info })),
move |dial_info| {
let this = this.clone();
Box::pin(async move {
match this.get_or_create_connection(dial_info.clone()).await {
Ok(NetworkResult::Value(conn)) => {
log_net!(debug "Reconnection successful to {}: {:?}", dial_info,conn);
}
Ok(res) => {
log_net!(debug "Reconnection unsuccessful to {}: {:?}", dial_info, res);
}
Err(e) => {
log_net!(debug "Reconnection error to {}: {}", dial_info, e);
}
}
false
})
},
);
}
pub async fn debug_print(&self) -> String {
//let inner = self.arc.inner.lock();
format!(

View File

@ -1210,7 +1210,7 @@ impl NetworkManager {
}
// Report peer info changes
pub fn report_peer_info_change(&self, peer_info: Arc<PeerInfo>) {
pub fn report_peer_info_change(&mut self, peer_info: Arc<PeerInfo>) {
let mut inner = self.inner.lock();
if let Some(address_check) = inner.address_check.as_mut() {
address_check.report_peer_info_change(peer_info);

View File

@ -199,7 +199,6 @@ impl DiscoveryContext {
}
// For each peer, ask them for our public address, filtering on desired dial info
let mut unord = FuturesUnordered::new();
let get_public_address_func = |node: NodeRef| {
let this = self.clone();
@ -225,12 +224,12 @@ impl DiscoveryContext {
};
let mut external_address_infos = Vec::new();
for node in nodes.iter().take(nodes.len() - 1).cloned() {
let mut unord = FuturesUnordered::new();
for node in nodes.iter().cloned() {
let gpa_future = get_public_address_func(node);
unord.push(gpa_future);
// Always process two at a time so we get both addresses in parallel if possible
// Always process N at a time so we get all addresses in parallel if possible
if unord.len() == EXTERNAL_INFO_VALIDATIONS {
// Process one
if let Some(Some(ei)) = unord.next().in_current_span().await {
@ -280,10 +279,10 @@ impl DiscoveryContext {
{
let mut inner = self.inner.lock();
inner.external_info = external_address_infos;
log_net!(debug "external_info ({:?}:{:?})[{}]",
log_net!(debug "External Addresses: ({:?}:{:?})[{}]",
self.unlocked_inner.protocol_type,
self.unlocked_inner.address_type,
inner.external_info.iter().map(|x| format!("{}",x.address)).collect::<Vec<_>>().join(", "));
inner.external_info.iter().map(|x| format!("{} <- {}",x.address, x.node)).collect::<Vec<_>>().join(", "));
}
true

View File

@ -30,6 +30,10 @@ use std::path::{Path, PathBuf};
/////////////////////////////////////////////////////////////////
pub const UPDATE_NETWORK_CLASS_TASK_TICK_PERIOD_SECS: u32 = 1;
pub const NETWORK_INTERFACES_TASK_TICK_PERIOD_SECS: u32 = 1;
pub const UPNP_TASK_TICK_PERIOD_SECS: u32 = 1;
pub const PEEK_DETECT_LEN: usize = 64;
cfg_if! {
@ -168,9 +172,15 @@ impl Network {
routing_table,
connection_manager,
interfaces: NetworkInterfaces::new(),
update_network_class_task: TickTask::new("update_network_class_task", 1),
network_interfaces_task: TickTask::new("network_interfaces_task", 1),
upnp_task: TickTask::new("upnp_task", 1),
update_network_class_task: TickTask::new(
"update_network_class_task",
UPDATE_NETWORK_CLASS_TASK_TICK_PERIOD_SECS,
),
network_interfaces_task: TickTask::new(
"network_interfaces_task",
NETWORK_INTERFACES_TASK_TICK_PERIOD_SECS,
),
upnp_task: TickTask::new("upnp_task", UPNP_TASK_TICK_PERIOD_SECS),
network_task_lock: AsyncMutex::new(()),
igd_manager: igd_manager::IGDManager::new(config.clone()),
}

View File

@ -14,16 +14,15 @@ impl Network {
let _guard = self.unlocked_inner.network_task_lock.lock().await;
// Do the public dial info check
let out = self.do_public_dial_info_check(stop_token, l, t).await;
let finished = self.do_public_dial_info_check(stop_token, l, t).await?;
// Done with public dial info check
{
if finished {
let mut inner = self.inner.lock();
inner.needs_public_dial_info_check = false;
inner.public_dial_info_check_punishment = None;
}
out
Ok(())
}
#[instrument(level = "trace", skip(self, editor), err)]
@ -54,7 +53,7 @@ impl Network {
stop_token: StopToken,
_l: Timestamp,
_t: Timestamp,
) -> EyreResult<()> {
) -> EyreResult<bool> {
// Figure out if we can optimize TCP/WS checking since they are often on the same port
let (protocol_config, inbound_protocol_map) = {
let mut inner = self.inner.lock();
@ -105,7 +104,7 @@ impl Network {
.into_iter()
.collect();
// Set most permissive network config
// Set most permissive network config and start from scratch
let mut editor = self.routing_table().edit_public_internet_routing_domain();
editor.setup_network(
protocol_config.outbound,
@ -113,11 +112,9 @@ impl Network {
protocol_config.family_global,
protocol_config.public_internet_capabilities.clone(),
);
editor.commit(true).await;
// Start from scratch
editor.clear_dial_info_details(None, None);
editor.set_network_class(None);
editor.commit(true).await;
// Process all protocol and address combinations
let mut unord = FuturesUnordered::new();
@ -125,20 +122,15 @@ impl Network {
for ((at, _llpt, port), protocols) in &inbound_protocol_map {
let first_pt = protocols.first().unwrap();
let discovery_context = DiscoveryContext::new(
self.routing_table(),
self.clone(),
*first_pt,
*at,
*port,
// clear_network_callback.clone(),
);
let discovery_context =
DiscoveryContext::new(self.routing_table(), self.clone(), *first_pt, *at, *port);
discovery_context.discover(&mut unord).await;
}
// Wait for all discovery futures to complete and apply discoverycontexts
let mut all_address_types = AddressTypeSet::new();
let mut force_outbound_only = false;
let mut success = true;
loop {
match unord
.next()
@ -181,7 +173,8 @@ impl Network {
}
}
Ok(Some(None)) => {
// Found no new dial info for this protocol/address combination
// Found no dial info for this protocol/address combination
success = false;
}
Ok(None) => {
// All done, normally
@ -189,11 +182,16 @@ impl Network {
}
Err(_) => {
// Stop token, exit early without error propagation
return Ok(());
return Ok(true);
}
}
}
if !success {
log_net!(debug "Network class discovery failed, trying again");
return Ok(false);
}
// All done
log_net!(debug "Network class discovery finished with address_types {:?}", all_address_types);
@ -230,7 +228,7 @@ impl Network {
}
}
Ok(())
Ok(true)
}
/// Make a dialinfo from an address and protocol type

View File

@ -82,16 +82,29 @@ pub struct NetworkConnectionStats {
last_message_recv_time: Option<Timestamp>,
}
/// Represents a connection in the connection table for connection-oriented protocols
#[derive(Debug)]
pub(in crate::network_manager) struct NetworkConnection {
/// A unique id for this connection
connection_id: NetworkConnectionId,
/// The dial info used to make this connection if it was made with 'connect'
/// None if the connection was 'accepted'
opt_dial_info: Option<DialInfo>,
/// The network flow 5-tuple this connection is over
flow: Flow,
/// Each connection has a processor and this is the task we wait for to ensure it exits cleanly
processor: Option<MustJoinHandle<()>>,
/// When this connection was connected or accepted
established_time: Timestamp,
/// Statistics about network traffic
stats: Arc<Mutex<NetworkConnectionStats>>,
/// To send data out this connection, it is places in this channel
sender: flume::Sender<(Option<Id>, Vec<u8>)>,
/// Drop this when we want to drop the connection
stop_source: Option<StopSource>,
/// The node we are responsible for protecting the connection for if it is protected
protected_nr: Option<NodeRef>,
/// The number of references to the network connection that exist (handles)
ref_count: usize,
}
@ -110,6 +123,7 @@ impl NetworkConnection {
Self {
connection_id: id,
opt_dial_info: None,
flow,
processor: None,
established_time: Timestamp::now(),
@ -129,6 +143,7 @@ impl NetworkConnection {
manager_stop_token: StopToken,
protocol_connection: ProtocolNetworkConnection,
connection_id: NetworkConnectionId,
opt_dial_info: Option<DialInfo>,
) -> Self {
// Get flow
let flow = protocol_connection.flow();
@ -164,6 +179,7 @@ impl NetworkConnection {
// Return the connection
Self {
connection_id,
opt_dial_info,
flow,
processor: Some(processor),
established_time: Timestamp::now(),
@ -183,6 +199,10 @@ impl NetworkConnection {
self.flow
}
pub fn dial_info(&self) -> Option<DialInfo> {
self.opt_dial_info.clone()
}
#[expect(dead_code)]
pub fn unique_flow(&self) -> UniqueFlow {
UniqueFlow {

View File

@ -230,6 +230,64 @@ impl RoutingTable {
out
}
pub(crate) fn debug_info_entries_fastest(
&self,
min_state: BucketEntryState,
capabilities: Vec<FourCC>,
node_count: usize,
) -> String {
let cur_ts = Timestamp::now();
let mut filters = VecDeque::new();
filters.push_front(
Box::new(|rti: &RoutingTableInner, e: Option<Arc<BucketEntry>>| {
let Some(e) = e else {
return false;
};
let cap_match = e.with(rti, |_rti, e| {
e.has_all_capabilities(RoutingDomain::PublicInternet, &capabilities)
});
let state = e.with(rti, |_rti, e| e.state(cur_ts));
state >= min_state && cap_match
}) as RoutingTableEntryFilter,
);
let nodes = self.find_preferred_fastest_nodes(
node_count,
filters,
|_rti, entry: Option<Arc<BucketEntry>>| {
NodeRef::new(self.clone(), entry.unwrap().clone())
},
);
let mut out = String::new();
for node in nodes {
out += &node.operate(|_rti, e| {
let state_reason = e.state_reason(cur_ts);
format!(
" {} [{}] {} [{}]\n",
node,
Self::format_state_reason(state_reason),
e.peer_stats()
.latency
.as_ref()
.map(|l| {
format!("{:.2}ms", timestamp_to_secs(l.average.as_u64()) * 1000.0)
})
.unwrap_or_else(|| "???.??ms".to_string()),
if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) {
ni.capabilities()
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>()
.join(",")
} else {
"???".to_owned()
}
)
});
}
out
}
pub(crate) fn debug_info_entry(&self, node_ref: NodeRef) -> String {
let cur_ts = Timestamp::now();

View File

@ -2,6 +2,7 @@ use super::*;
use futures_util::stream::{FuturesUnordered, StreamExt};
use futures_util::FutureExt;
use stop_token::future::FutureExt as _;
const BACKGROUND_SAFETY_ROUTE_COUNT: usize = 2;
@ -102,10 +103,10 @@ impl RoutingTable {
}
/// Test set of routes and remove the ones that don't test clean
#[instrument(level = "trace", skip(self, _stop_token), err)]
#[instrument(level = "trace", skip(self, stop_token), err)]
async fn test_route_set(
&self,
_stop_token: StopToken,
stop_token: StopToken,
routes_needing_testing: Vec<RouteId>,
) -> EyreResult<()> {
if routes_needing_testing.is_empty() {
@ -152,7 +153,7 @@ impl RoutingTable {
}
// Wait for test_route futures to complete in parallel
while unord.next().await.is_some() {}
while let Ok(Some(())) = unord.next().timeout_at(stop_token.clone()).await {}
}
// Process failed routes

View File

@ -1,12 +1,18 @@
use super::*;
// Keep member order appropriate for sorting < preference
#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
#[derive(Clone, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub struct DialInfoDetail {
pub class: DialInfoClass,
pub dial_info: DialInfo,
}
impl fmt::Debug for DialInfoDetail {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}:{}", self.class, self.dial_info)
}
}
impl MatchesDialInfoFilter for DialInfoDetail {
fn matches_filter(&self, filter: &DialInfoFilter) -> bool {
self.dial_info.matches_filter(filter)

View File

@ -15,7 +15,7 @@ pub const CAP_BLOCKSTORE: Capability = FourCC(*b"BLOC");
pub const DISTANCE_METRIC_CAPABILITIES: &[Capability] = &[CAP_DHT, CAP_DHT_WATCH];
#[derive(Clone, Default, PartialEq, Eq, Debug, Serialize, Deserialize)]
#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeInfo {
network_class: NetworkClass,
outbound_protocols: ProtocolTypeSet,
@ -26,6 +26,24 @@ pub struct NodeInfo {
dial_info_detail_list: Vec<DialInfoDetail>,
}
impl fmt::Debug for NodeInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// xxx: use field_with once that is on stable. trying to make this structure use fewer log lines when pretty printed
f.debug_struct("NodeInfo")
.field("network_class", &self.network_class)
.field(
"outbound_protocols",
&format!("{:?}", &self.outbound_protocols),
)
.field("address_types", &format!("{:?}", &self.address_types))
.field("envelope_support", &format!("{:?}", &self.envelope_support))
.field("crypto_support", &format!("{:?}", &self.crypto_support))
.field("capabilities", &format!("{:?}", &self.capabilities))
.field("dial_info_detail_list", &self.dial_info_detail_list)
.finish()
}
}
impl NodeInfo {
pub fn new(
network_class: NetworkClass,

View File

@ -743,6 +743,7 @@ impl StorageManagerInner {
receiver: flume::Receiver<T>,
handler: impl FnMut(T) -> SendPinBoxFuture<bool> + Send + 'static,
) -> bool {
self.deferred_result_processor.add(receiver, handler)
self.deferred_result_processor
.add(receiver.into_stream(), handler)
}
}

View File

@ -204,9 +204,7 @@ impl StorageManager {
}
}
std::collections::hash_map::Entry::Vacant(_) => {
panic!(
"offline write work items should always be on offline_subkey_writes entries that exist"
)
warn!("offline write work items should always be on offline_subkey_writes entries that exist: ignoring key {}", result.key)
}
}

View File

@ -610,9 +610,12 @@ impl VeilidAPI {
let mut min_state = BucketEntryState::Unreliable;
let mut capabilities = vec![];
let mut fastest = false;
for arg in args {
if let Some(ms) = get_bucket_entry_state(&arg) {
min_state = ms;
} else if arg == "fastest" {
fastest = true;
} else {
for cap in arg.split(',') {
if let Ok(capfcc) = FourCC::from_str(cap) {
@ -626,7 +629,10 @@ impl VeilidAPI {
// Dump routing table entries
let routing_table = self.network_manager()?.routing_table();
Ok(routing_table.debug_info_entries(min_state, capabilities))
Ok(match fastest {
true => routing_table.debug_info_entries_fastest(min_state, capabilities, 100000),
false => routing_table.debug_info_entries(min_state, capabilities),
})
}
async fn debug_entry(&self, args: String) -> VeilidAPIResult<String> {

View File

@ -398,10 +398,12 @@ async def test_dht_integration_writer_reader():
for desc0 in records:
while True:
rr = await rc0.inspect_dht_record(desc0.key, [])
if len(rr.offline_subkeys) == 0:
left = len(rr.offline_subkeys)
if left == 0:
await rc0.close_dht_record(desc0.key)
break
time.sleep(1)
print(f' {left} left')
time.sleep(0.1)
# read dht records on server 1
print(f'reading {COUNT} records')
@ -455,13 +457,15 @@ async def test_dht_write_read_local():
print(f' {n}')
print(f'syncing records to the network')
print('syncing records to the network')
for desc0 in records:
while True:
rr = await rc0.inspect_dht_record(desc0.key, [])
if len(rr.offline_subkeys) == 0:
left = len(rr.offline_subkeys)
if left == 0:
await rc0.close_dht_record(desc0.key)
break
print(f' {left} left')
time.sleep(0.1)
# read dht records on server 0

View File

@ -9,6 +9,7 @@ use super::*;
/// Background processor for streams
/// Handles streams to completion, passing each item from the stream to a callback
#[derive(Debug)]
pub struct DeferredStreamProcessor {
pub opt_deferred_stream_channel: Option<flume::Sender<SendPinBoxFuture<()>>>,
pub opt_stopper: Option<StopSource>,
@ -98,9 +99,9 @@ impl DeferredStreamProcessor {
/// * 'handler' is the callback to handle each item from the stream
///
/// Returns 'true' if the stream was added for processing, and 'false' if the stream could not be added, possibly due to not being initialized.
pub fn add<T: Send + 'static>(
pub fn add<T: Send + 'static, S: futures_util::Stream<Item = T> + Unpin + Send + 'static>(
&mut self,
receiver: flume::Receiver<T>,
mut receiver: S,
mut handler: impl FnMut(T) -> SendPinBoxFuture<bool> + Send + 'static,
) -> bool {
let Some(st) = self.opt_stopper.as_ref().map(|s| s.token()) else {
@ -110,7 +111,7 @@ impl DeferredStreamProcessor {
return false;
};
let drp = Box::pin(async move {
while let Ok(Ok(res)) = receiver.recv_async().timeout_at(st.clone()).await {
while let Ok(Some(res)) = receiver.next().timeout_at(st.clone()).await {
if !handler(res).await {
break;
}