Poll block headers for latest block on each iteration

The Electrum block-header subscription did not provide us with block headers, because upon the connection being closed by a node the subscription would end.
Re-newing the the subscription upon re-connect is not easily achievable, that's why we opted for a polling mode for now, where we start a block header subscription on every update iteration, that is only used once (when the subscription is made).
This commit is contained in:
Daniel Karzel 2021-05-17 18:40:10 +10:00
parent f2e43ea565
commit efb51820b1
No known key found for this signature in database
GPG Key ID: 30C3FC2E438ADB6E

View File

@ -527,7 +527,7 @@ impl Watchable for (Txid, Script) {
pub struct Client {
electrum: bdk::electrum_client::Client,
latest_block: BlockHeight,
latest_block_height: BlockHeight,
last_ping: Instant,
interval: Duration,
script_history: BTreeMap<Script, Vec<GetHistoryRes>>,
@ -536,13 +536,15 @@ pub struct Client {
impl Client {
fn new(electrum: bdk::electrum_client::Client, interval: Duration) -> Result<Self> {
// Initially fetch the latest block for storing the height.
// We do not act on this subscription after this call.
let latest_block = electrum
.block_headers_subscribe()
.context("Failed to subscribe to header notifications")?;
Ok(Self {
electrum,
latest_block: BlockHeight::try_from(latest_block)?,
latest_block_height: BlockHeight::try_from(latest_block)?,
last_ping: Instant::now(),
interval,
script_history: Default::default(),
@ -572,14 +574,14 @@ impl Client {
}
}
fn drain_notifications(&mut self) -> Result<()> {
fn update_state(&mut self) -> Result<()> {
let pinged = self.ping();
if !pinged {
return Ok(());
}
self.drain_blockheight_notifications()?;
self.update_latest_block()?;
self.update_script_histories()?;
Ok(())
@ -596,7 +598,7 @@ impl Client {
self.script_history.insert(script.clone(), vec![]);
}
self.drain_notifications()?;
self.update_state()?;
let history = self.script_history.entry(script).or_default();
@ -618,7 +620,7 @@ impl Client {
Ok(ScriptStatus::Confirmed(
Confirmed::from_inclusion_and_latest_block(
u32::try_from(last.height)?,
u32::from(self.latest_block),
u32::from(self.latest_block_height),
),
))
}
@ -626,18 +628,24 @@ impl Client {
}
}
fn drain_blockheight_notifications(&mut self) -> Result<()> {
let latest_block = std::iter::from_fn(|| self.electrum.block_headers_pop().transpose())
.last()
.transpose()
.context("Failed to pop header notification")?;
fn update_latest_block(&mut self) -> Result<()> {
// Fetch the latest block for storing the height.
// We do not act on this subscription after this call, as we cannot rely on
// subscription push notifications because eventually the Electrum server will
// close the connection and subscriptions are not automatically renewed
// upon renewing the connection.
let latest_block = self
.electrum
.block_headers_subscribe()
.context("Failed to subscribe to header notifications")?;
let latest_block_height = BlockHeight::try_from(latest_block)?;
if let Some(new_block) = latest_block {
if latest_block_height > self.latest_block_height {
tracing::debug!(
block_height = new_block.height,
block_height = u32::from(latest_block_height),
"Got notification for new block"
);
self.latest_block = BlockHeight::try_from(new_block)?;
self.latest_block_height = latest_block_height;
}
Ok(())