bdk_bitcoind_rpc/
lib.rs

//! This crate is used for emitting blockchain data from the `bitcoind` RPC interface. It does not
//! use the wallet RPC API, so this crate can be used with wallet-disabled Bitcoin Core nodes.
//!
//! [`Emitter`] is the main structure which sources blockchain data from
//! [`bitcoincore_rpc::Client`].
//!
//! To only get block updates (exclude mempool transactions), the caller can use
//! [`Emitter::next_block`] until it returns `Ok(None)` (which means the chain tip is reached). A
//! separate method, [`Emitter::mempool`], can be used to emit the whole mempool.
10#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
11#![warn(missing_docs)]
12
13#[allow(unused_imports)]
14#[macro_use]
15extern crate alloc;
16
17use alloc::sync::Arc;
18use bdk_core::collections::{HashMap, HashSet};
19use bdk_core::{BlockId, CheckPoint};
20use bitcoin::{Block, BlockHash, Transaction, Txid};
21use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi};
22use core::ops::Deref;
23
24pub mod bip158;
25
26pub use bitcoincore_rpc;
27
28/// The [`Emitter`] is used to emit data sourced from [`bitcoincore_rpc::Client`].
29///
30/// Refer to [module-level documentation] for more.
31///
32/// [module-level documentation]: crate
33pub struct Emitter<C> {
34    client: C,
35    start_height: u32,
36
37    /// The checkpoint of the last-emitted block that is in the best chain. If it is later found
38    /// that the block is no longer in the best chain, it will be popped off from here.
39    last_cp: CheckPoint,
40
41    /// The block result returned from rpc of the last-emitted block. As this result contains the
42    /// next block's block hash (which we use to fetch the next block), we set this to `None`
43    /// whenever there are no more blocks, or the next block is no longer in the best chain. This
44    /// gives us an opportunity to re-fetch this result.
45    last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
46
47    /// The last snapshot of mempool transactions.
48    ///
49    /// This is used to detect mempool evictions and as a cache for transactions to emit.
50    ///
51    /// For mempool evictions, the latest call to `getrawmempool` is compared against this field.
52    /// Any transaction that is missing from this field is considered evicted. The exception is if
53    /// the transaction is confirmed into a block - therefore, we only emit evictions when we are
54    /// sure the tip block is already emitted. When a block is emitted, the transactions in the
55    /// block are removed from this field.
56    mempool_snapshot: HashMap<Txid, Arc<Transaction>>,
57}
58
59/// Indicates that there are no initially-expected mempool transactions.
60///
61/// Use this as the `expected_mempool_txs` field of [`Emitter::new`] when the wallet is known
62/// to start empty (i.e. with no unconfirmed transactions).
63pub const NO_EXPECTED_MEMPOOL_TXS: core::iter::Empty<Arc<Transaction>> = core::iter::empty();
64
65impl<C> Emitter<C>
66where
67    C: Deref,
68    C::Target: RpcApi,
69{
70    /// Construct a new [`Emitter`].
71    ///
72    /// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
73    /// can start emission from a block that connects to the original chain.
74    ///
75    /// `start_height` starts emission from a given height (if there are no conflicts with the
76    /// original chain).
77    ///
78    /// `expected_mempool_txs` is the initial set of unconfirmed transactions provided by the
79    /// wallet. This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
80    /// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXS`] can be used.
81    pub fn new(
82        client: C,
83        last_cp: CheckPoint,
84        start_height: u32,
85        expected_mempool_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
86    ) -> Self {
87        Self {
88            client,
89            start_height,
90            last_cp,
91            last_block: None,
92            mempool_snapshot: expected_mempool_txs
93                .into_iter()
94                .map(|tx| {
95                    let tx: Arc<Transaction> = tx.into();
96                    (tx.compute_txid(), tx)
97                })
98                .collect(),
99        }
100    }
101
102    /// Emit mempool transactions and any evicted [`Txid`]s.
103    ///
104    /// This method returns a [`MempoolEvent`] containing the full transactions (with their
105    /// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted`] which are
106    /// any [`Txid`]s which were previously seen in the mempool and are now missing. Evicted txids
107    /// are only reported once the emitter’s checkpoint matches the RPC’s best block in both height
108    /// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always
109    /// return an empty `evicted` set.
110    #[cfg(feature = "std")]
111    pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
112        let sync_time = std::time::UNIX_EPOCH
113            .elapsed()
114            .expect("must get current time")
115            .as_secs();
116        self.mempool_at(sync_time)
117    }
118
119    /// Emit mempool transactions and any evicted [`Txid`]s at the given `sync_time`.
120    ///
121    /// `sync_time` is in unix seconds.
122    ///
123    /// This is the no-std version of [`mempool`](Self::mempool).
124    pub fn mempool_at(&mut self, sync_time: u64) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
125        let client = &*self.client;
126
127        let mut rpc_tip_height;
128        let mut rpc_tip_hash;
129        let mut rpc_mempool;
130        let mut rpc_mempool_txids;
131
132        // Ensure we get a mempool snapshot consistent with `rpc_tip_hash` as the tip.
133        loop {
134            rpc_tip_height = client.get_block_count()?;
135            rpc_tip_hash = client.get_block_hash(rpc_tip_height)?;
136            rpc_mempool = client.get_raw_mempool()?;
137            rpc_mempool_txids = rpc_mempool.iter().copied().collect::<HashSet<Txid>>();
138            let is_still_at_tip = rpc_tip_hash == client.get_block_hash(rpc_tip_height)?
139                && rpc_tip_height == client.get_block_count()?;
140            if is_still_at_tip {
141                break;
142            }
143        }
144
145        let mut mempool_event = MempoolEvent {
146            update: rpc_mempool
147                .into_iter()
148                .filter_map(|txid| -> Option<Result<_, bitcoincore_rpc::Error>> {
149                    let tx = match self.mempool_snapshot.get(&txid) {
150                        Some(tx) => tx.clone(),
151                        None => match client.get_raw_transaction(&txid, None) {
152                            Ok(tx) => {
153                                let tx = Arc::new(tx);
154                                self.mempool_snapshot.insert(txid, tx.clone());
155                                tx
156                            }
157                            Err(err) if err.is_not_found_error() => return None,
158                            Err(err) => return Some(Err(err)),
159                        },
160                    };
161                    Some(Ok((tx, sync_time)))
162                })
163                .collect::<Result<Vec<_>, _>>()?,
164            ..Default::default()
165        };
166
167        let at_tip =
168            rpc_tip_height == self.last_cp.height() as u64 && rpc_tip_hash == self.last_cp.hash();
169
170        if at_tip {
171            // We only emit evicted transactions when we have already emitted the RPC tip. This is
172            // because we cannot differentiate between transactions that are confirmed and
173            // transactions that are evicted, so we rely on emitted blocks to remove
174            // transactions from the `mempool_snapshot`.
175            mempool_event.evicted = self
176                .mempool_snapshot
177                .keys()
178                .filter(|&txid| !rpc_mempool_txids.contains(txid))
179                .map(|&txid| (txid, sync_time))
180                .collect();
181            self.mempool_snapshot = mempool_event
182                .update
183                .iter()
184                .map(|(tx, _)| (tx.compute_txid(), tx.clone()))
185                .collect();
186        } else {
187            // Since we are still catching up to the tip (a.k.a tip has not been emitted), we
188            // accumulate more transactions in `mempool_snapshot` so that we can emit evictions in
189            // a batch once we catch up.
190            self.mempool_snapshot.extend(
191                mempool_event
192                    .update
193                    .iter()
194                    .map(|(tx, _)| (tx.compute_txid(), tx.clone())),
195            );
196        };
197
198        Ok(mempool_event)
199    }
200
201    /// Emit the next block height and block (if any).
202    pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
203        if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? {
204            // Stop tracking unconfirmed transactions that have been confirmed in this block.
205            for tx in &block.txdata {
206                self.mempool_snapshot.remove(&tx.compute_txid());
207            }
208            return Ok(Some(BlockEvent { block, checkpoint }));
209        }
210        Ok(None)
211    }
212}
213
214/// A new emission from mempool.
215#[derive(Debug, Default)]
216pub struct MempoolEvent {
217    /// Transactions currently in the mempool alongside their seen-at timestamp.
218    pub update: Vec<(Arc<Transaction>, u64)>,
219
220    /// Transactions evicted from the mempool alongside their evicted-at timestamp.
221    pub evicted: Vec<(Txid, u64)>,
222}
223
224/// A newly emitted block from [`Emitter`].
225#[derive(Debug)]
226pub struct BlockEvent<B> {
227    /// The block.
228    pub block: B,
229
230    /// The checkpoint of the new block.
231    ///
232    /// A [`CheckPoint`] is a node of a linked list of [`BlockId`]s. This checkpoint is linked to
233    /// all [`BlockId`]s originally passed in [`Emitter::new`] as well as emitted blocks since
234    /// then. These blocks are guaranteed to be of the same chain.
235    ///
236    /// This is important as BDK structures require block-to-apply to be connected with another
237    /// block in the original chain.
238    pub checkpoint: CheckPoint,
239}
240
241impl<B> BlockEvent<B> {
242    /// The block height of this new block.
243    pub fn block_height(&self) -> u32 {
244        self.checkpoint.height()
245    }
246
247    /// The block hash of this new block.
248    pub fn block_hash(&self) -> BlockHash {
249        self.checkpoint.hash()
250    }
251
252    /// The [`BlockId`] of a previous block that this block connects to.
253    ///
254    /// This either returns a [`BlockId`] of a previously emitted block or from the chain we started
255    /// with (passed in as `last_cp` in [`Emitter::new`]).
256    ///
257    /// This value is derived from [`BlockEvent::checkpoint`].
258    pub fn connected_to(&self) -> BlockId {
259        match self.checkpoint.prev() {
260            Some(prev_cp) => prev_cp.block_id(),
261            // there is no previous checkpoint, so just connect with itself
262            None => self.checkpoint.block_id(),
263        }
264    }
265}
266
267enum PollResponse {
268    Block(bitcoincore_rpc_json::GetBlockResult),
269    NoMoreBlocks,
270    /// Fetched block is not in the best chain.
271    BlockNotInBestChain,
272    AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint),
273    /// Force the genesis checkpoint down the receiver's throat.
274    AgreementPointNotFound(BlockHash),
275}
276
277fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
278where
279    C: Deref,
280    C::Target: RpcApi,
281{
282    let client = &*emitter.client;
283
284    if let Some(last_res) = &emitter.last_block {
285        let next_hash = if last_res.height < emitter.start_height as _ {
286            // enforce start height
287            let next_hash = client.get_block_hash(emitter.start_height as _)?;
288            // make sure last emission is still in best chain
289            if client.get_block_hash(last_res.height as _)? != last_res.hash {
290                return Ok(PollResponse::BlockNotInBestChain);
291            }
292            next_hash
293        } else {
294            match last_res.nextblockhash {
295                None => return Ok(PollResponse::NoMoreBlocks),
296                Some(next_hash) => next_hash,
297            }
298        };
299
300        let res = client.get_block_info(&next_hash)?;
301        if res.confirmations < 0 {
302            return Ok(PollResponse::BlockNotInBestChain);
303        }
304
305        return Ok(PollResponse::Block(res));
306    }
307
308    for cp in emitter.last_cp.iter() {
309        let res = match client.get_block_info(&cp.hash()) {
310            // block not in best chain
311            Ok(res) if res.confirmations < 0 => continue,
312            Ok(res) => res,
313            Err(e) if e.is_not_found_error() => {
314                if cp.height() > 0 {
315                    continue;
316                }
317                // if we can't find genesis block, we can't create an update that connects
318                break;
319            }
320            Err(e) => return Err(e),
321        };
322
323        // agreement point found
324        return Ok(PollResponse::AgreementFound(res, cp));
325    }
326
327    let genesis_hash = client.get_block_hash(0)?;
328    Ok(PollResponse::AgreementPointNotFound(genesis_hash))
329}
330
331fn poll<C, V, F>(
332    emitter: &mut Emitter<C>,
333    get_item: F,
334) -> Result<Option<(CheckPoint, V)>, bitcoincore_rpc::Error>
335where
336    C: Deref,
337    C::Target: RpcApi,
338    F: Fn(&BlockHash, &C::Target) -> Result<V, bitcoincore_rpc::Error>,
339{
340    loop {
341        match poll_once(emitter)? {
342            PollResponse::Block(res) => {
343                let height = res.height as u32;
344                let hash = res.hash;
345                let item = get_item(&hash, &emitter.client)?;
346
347                let new_cp = emitter
348                    .last_cp
349                    .clone()
350                    .push(BlockId { height, hash })
351                    .expect("must push");
352                emitter.last_cp = new_cp.clone();
353                emitter.last_block = Some(res);
354                return Ok(Some((new_cp, item)));
355            }
356            PollResponse::NoMoreBlocks => {
357                emitter.last_block = None;
358                return Ok(None);
359            }
360            PollResponse::BlockNotInBestChain => {
361                emitter.last_block = None;
362                continue;
363            }
364            PollResponse::AgreementFound(res, cp) => {
365                // get rid of evicted blocks
366                emitter.last_cp = cp;
367                emitter.last_block = Some(res);
368                continue;
369            }
370            PollResponse::AgreementPointNotFound(genesis_hash) => {
371                emitter.last_cp = CheckPoint::new(BlockId {
372                    height: 0,
373                    hash: genesis_hash,
374                });
375                emitter.last_block = None;
376                continue;
377            }
378        }
379    }
380}
381
/// Extension methods for [`bitcoincore_rpc::Error`].
pub trait BitcoindRpcErrorExt {
    /// Returns `true` if the error is a "not found" error.
    ///
    /// This lets callers of [`Emitter`] methods, which return
    /// `Result<_, bitcoincore_rpc::Error>`, tell a missing block or transaction apart from other
    /// RPC failures.
    fn is_not_found_error(&self) -> bool;
}
390
391impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
392    fn is_not_found_error(&self) -> bool {
393        if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self
394        {
395            rpc_err.code == -5
396        } else {
397            false
398        }
399    }
400}
401
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod test {
    use crate::{bitcoincore_rpc::RpcApi, Emitter, NO_EXPECTED_MEMPOOL_TXS};
    use bdk_chain::local_chain::LocalChain;
    use bdk_testenv::{anyhow, TestEnv};
    use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, Txid, WScriptHash};
    use std::collections::HashSet;

    /// Transactions seen in the mempool must accumulate in `mempool_snapshot` while the emitter
    /// is behind the tip, and must be removed as the blocks confirming them are emitted.
    #[test]
    fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> {
        let env = TestEnv::new()?;
        let genesis_hash = env.rpc_client().get_block_hash(0)?;
        let (chain, _) = LocalChain::from_genesis_hash(genesis_hash);
        let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1, NO_EXPECTED_MEMPOOL_TXS);

        // Bring the emitter up to the tip before any transaction is sent.
        env.mine_blocks(100, None)?;
        while emitter.next_block()?.is_some() {}

        let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros());
        let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?;
        let mut mempool_txids = HashSet::new();

        // Send a tx at different heights and ensure txs are accumulating in `mempool_snapshot`.
        for _ in 0..10 {
            mempool_txids.insert(env.send(&addr_to_track, Amount::from_sat(1_000))?);
            emitter.mempool()?;
            env.mine_blocks(1, None)?;

            for txid in &mempool_txids {
                assert!(
                    emitter.mempool_snapshot.contains_key(txid),
                    "Expected txid {txid:?} missing"
                );
            }
        }

        // Process each block and check that confirmed txids are removed from
        // `mempool_snapshot`, while the still-unconfirmed ones remain.
        while let Some(block_event) = emitter.next_block()? {
            let confirmed_txids = block_event
                .block
                .txdata
                .iter()
                .map(|tx| tx.compute_txid())
                .collect::<HashSet<Txid>>();
            mempool_txids.retain(|txid| !confirmed_txids.contains(txid));

            for txid in confirmed_txids {
                assert!(
                    !emitter.mempool_snapshot.contains_key(&txid),
                    "Expected txid {txid:?} should have been removed"
                );
            }
            for txid in &mempool_txids {
                assert!(
                    emitter.mempool_snapshot.contains_key(txid),
                    "Expected txid {txid:?} missing"
                );
            }
        }

        // Every sent transaction has been confirmed by now, so nothing should be left.
        assert!(emitter.mempool_snapshot.is_empty());

        Ok(())
    }
}