1#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
11#![warn(missing_docs)]
12
13#[allow(unused_imports)]
14#[macro_use]
15extern crate alloc;
16
17use alloc::sync::Arc;
18use bdk_core::collections::{HashMap, HashSet};
19use bdk_core::{BlockId, CheckPoint};
20use bitcoin::{Block, BlockHash, Transaction, Txid};
21use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi};
22use core::ops::Deref;
23
24pub mod bip158;
25
26pub use bitcoincore_rpc;
27
28pub struct Emitter<C> {
34 client: C,
35 start_height: u32,
36
37 last_cp: CheckPoint,
40
41 last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
46
47 mempool_snapshot: HashMap<Txid, Arc<Transaction>>,
57}
58
59pub const NO_EXPECTED_MEMPOOL_TXS: core::iter::Empty<Arc<Transaction>> = core::iter::empty();
64
65impl<C> Emitter<C>
66where
67 C: Deref,
68 C::Target: RpcApi,
69{
70 pub fn new(
82 client: C,
83 last_cp: CheckPoint,
84 start_height: u32,
85 expected_mempool_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
86 ) -> Self {
87 Self {
88 client,
89 start_height,
90 last_cp,
91 last_block: None,
92 mempool_snapshot: expected_mempool_txs
93 .into_iter()
94 .map(|tx| {
95 let tx: Arc<Transaction> = tx.into();
96 (tx.compute_txid(), tx)
97 })
98 .collect(),
99 }
100 }
101
102 #[cfg(feature = "std")]
111 pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
112 let sync_time = std::time::UNIX_EPOCH
113 .elapsed()
114 .expect("must get current time")
115 .as_secs();
116 self.mempool_at(sync_time)
117 }
118
119 pub fn mempool_at(&mut self, sync_time: u64) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
125 let client = &*self.client;
126
127 let mut rpc_tip_height;
128 let mut rpc_tip_hash;
129 let mut rpc_mempool;
130 let mut rpc_mempool_txids;
131
132 loop {
134 rpc_tip_height = client.get_block_count()?;
135 rpc_tip_hash = client.get_block_hash(rpc_tip_height)?;
136 rpc_mempool = client.get_raw_mempool()?;
137 rpc_mempool_txids = rpc_mempool.iter().copied().collect::<HashSet<Txid>>();
138 let is_still_at_tip = rpc_tip_hash == client.get_block_hash(rpc_tip_height)?
139 && rpc_tip_height == client.get_block_count()?;
140 if is_still_at_tip {
141 break;
142 }
143 }
144
145 let mut mempool_event = MempoolEvent {
146 update: rpc_mempool
147 .into_iter()
148 .filter_map(|txid| -> Option<Result<_, bitcoincore_rpc::Error>> {
149 let tx = match self.mempool_snapshot.get(&txid) {
150 Some(tx) => tx.clone(),
151 None => match client.get_raw_transaction(&txid, None) {
152 Ok(tx) => {
153 let tx = Arc::new(tx);
154 self.mempool_snapshot.insert(txid, tx.clone());
155 tx
156 }
157 Err(err) if err.is_not_found_error() => return None,
158 Err(err) => return Some(Err(err)),
159 },
160 };
161 Some(Ok((tx, sync_time)))
162 })
163 .collect::<Result<Vec<_>, _>>()?,
164 ..Default::default()
165 };
166
167 let at_tip =
168 rpc_tip_height == self.last_cp.height() as u64 && rpc_tip_hash == self.last_cp.hash();
169
170 if at_tip {
171 mempool_event.evicted = self
176 .mempool_snapshot
177 .keys()
178 .filter(|&txid| !rpc_mempool_txids.contains(txid))
179 .map(|&txid| (txid, sync_time))
180 .collect();
181 self.mempool_snapshot = mempool_event
182 .update
183 .iter()
184 .map(|(tx, _)| (tx.compute_txid(), tx.clone()))
185 .collect();
186 } else {
187 self.mempool_snapshot.extend(
191 mempool_event
192 .update
193 .iter()
194 .map(|(tx, _)| (tx.compute_txid(), tx.clone())),
195 );
196 };
197
198 Ok(mempool_event)
199 }
200
201 pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
203 if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? {
204 for tx in &block.txdata {
206 self.mempool_snapshot.remove(&tx.compute_txid());
207 }
208 return Ok(Some(BlockEvent { block, checkpoint }));
209 }
210 Ok(None)
211 }
212}
213
214#[derive(Debug, Default)]
216pub struct MempoolEvent {
217 pub update: Vec<(Arc<Transaction>, u64)>,
219
220 pub evicted: Vec<(Txid, u64)>,
222}
223
224#[derive(Debug)]
226pub struct BlockEvent<B> {
227 pub block: B,
229
230 pub checkpoint: CheckPoint,
239}
240
241impl<B> BlockEvent<B> {
242 pub fn block_height(&self) -> u32 {
244 self.checkpoint.height()
245 }
246
247 pub fn block_hash(&self) -> BlockHash {
249 self.checkpoint.hash()
250 }
251
252 pub fn connected_to(&self) -> BlockId {
259 match self.checkpoint.prev() {
260 Some(prev_cp) => prev_cp.block_id(),
261 None => self.checkpoint.block_id(),
263 }
264 }
265}
266
267enum PollResponse {
268 Block(bitcoincore_rpc_json::GetBlockResult),
269 NoMoreBlocks,
270 BlockNotInBestChain,
272 AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint),
273 AgreementPointNotFound(BlockHash),
275}
276
277fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
278where
279 C: Deref,
280 C::Target: RpcApi,
281{
282 let client = &*emitter.client;
283
284 if let Some(last_res) = &emitter.last_block {
285 let next_hash = if last_res.height < emitter.start_height as _ {
286 let next_hash = client.get_block_hash(emitter.start_height as _)?;
288 if client.get_block_hash(last_res.height as _)? != last_res.hash {
290 return Ok(PollResponse::BlockNotInBestChain);
291 }
292 next_hash
293 } else {
294 match last_res.nextblockhash {
295 None => return Ok(PollResponse::NoMoreBlocks),
296 Some(next_hash) => next_hash,
297 }
298 };
299
300 let res = client.get_block_info(&next_hash)?;
301 if res.confirmations < 0 {
302 return Ok(PollResponse::BlockNotInBestChain);
303 }
304
305 return Ok(PollResponse::Block(res));
306 }
307
308 for cp in emitter.last_cp.iter() {
309 let res = match client.get_block_info(&cp.hash()) {
310 Ok(res) if res.confirmations < 0 => continue,
312 Ok(res) => res,
313 Err(e) if e.is_not_found_error() => {
314 if cp.height() > 0 {
315 continue;
316 }
317 break;
319 }
320 Err(e) => return Err(e),
321 };
322
323 return Ok(PollResponse::AgreementFound(res, cp));
325 }
326
327 let genesis_hash = client.get_block_hash(0)?;
328 Ok(PollResponse::AgreementPointNotFound(genesis_hash))
329}
330
331fn poll<C, V, F>(
332 emitter: &mut Emitter<C>,
333 get_item: F,
334) -> Result<Option<(CheckPoint, V)>, bitcoincore_rpc::Error>
335where
336 C: Deref,
337 C::Target: RpcApi,
338 F: Fn(&BlockHash, &C::Target) -> Result<V, bitcoincore_rpc::Error>,
339{
340 loop {
341 match poll_once(emitter)? {
342 PollResponse::Block(res) => {
343 let height = res.height as u32;
344 let hash = res.hash;
345 let item = get_item(&hash, &emitter.client)?;
346
347 let new_cp = emitter
348 .last_cp
349 .clone()
350 .push(BlockId { height, hash })
351 .expect("must push");
352 emitter.last_cp = new_cp.clone();
353 emitter.last_block = Some(res);
354 return Ok(Some((new_cp, item)));
355 }
356 PollResponse::NoMoreBlocks => {
357 emitter.last_block = None;
358 return Ok(None);
359 }
360 PollResponse::BlockNotInBestChain => {
361 emitter.last_block = None;
362 continue;
363 }
364 PollResponse::AgreementFound(res, cp) => {
365 emitter.last_cp = cp;
367 emitter.last_block = Some(res);
368 continue;
369 }
370 PollResponse::AgreementPointNotFound(genesis_hash) => {
371 emitter.last_cp = CheckPoint::new(BlockId {
372 height: 0,
373 hash: genesis_hash,
374 });
375 emitter.last_block = None;
376 continue;
377 }
378 }
379 }
380}
381
/// Extension methods for inspecting RPC errors; implemented for [`bitcoincore_rpc::Error`].
pub trait BitcoindRpcErrorExt {
    /// Returns `true` when the error represents a "not found" response from the node.
    ///
    /// Within this crate it is used to skip mempool transactions and checkpoint blocks that the
    /// node does not know about, instead of treating them as hard failures.
    fn is_not_found_error(&self) -> bool;
}
390
391impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
392 fn is_not_found_error(&self) -> bool {
393 if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self
394 {
395 rpc_err.code == -5
396 } else {
397 false
398 }
399 }
400}
401
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod test {
    use crate::{bitcoincore_rpc::RpcApi, Emitter, NO_EXPECTED_MEMPOOL_TXS};
    use bdk_chain::local_chain::LocalChain;
    use bdk_testenv::{anyhow, TestEnv};
    use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, Txid, WScriptHash};
    use std::collections::HashSet;

    /// Checks that the mempool snapshot keeps accumulating transactions while the emitter lags
    /// behind the node's tip, and that each transaction is dropped from the snapshot once the
    /// block confirming it has been emitted.
    #[test]
    fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> {
        let env = TestEnv::new()?;
        let genesis_hash = env.rpc_client().get_block_hash(0)?;
        let (chain, _) = LocalChain::from_genesis_hash(genesis_hash);
        let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1, NO_EXPECTED_MEMPOOL_TXS);

        // Bring the emitter up to the node's tip.
        env.mine_blocks(100, None)?;
        while emitter.next_block()?.is_some() {}

        let tracked_spk = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros());
        let tracked_addr = Address::from_script(&tracked_spk, bitcoin::Network::Regtest)?;
        let mut pending_txids = HashSet::new();

        // Send one transaction per height without emitting the new blocks; every transaction
        // sent so far must remain in the snapshot.
        for _ in 0..10 {
            pending_txids.insert(env.send(&tracked_addr, Amount::from_sat(1_000))?);
            emitter.mempool()?;
            env.mine_blocks(1, None)?;

            for txid in &pending_txids {
                assert!(
                    emitter.mempool_snapshot.contains_key(txid),
                    "Expected txid {txid:?} missing"
                );
            }
        }

        // Emit the outstanding blocks; each one must remove exactly the transactions it
        // confirms from the snapshot.
        while let Some(block_event) = emitter.next_block()? {
            let confirmed_txids = block_event
                .block
                .txdata
                .iter()
                .map(|tx| tx.compute_txid())
                .collect::<HashSet<Txid>>();
            pending_txids.retain(|txid| !confirmed_txids.contains(txid));

            for txid in &confirmed_txids {
                assert!(
                    !emitter.mempool_snapshot.contains_key(txid),
                    "Expected txid {txid:?} should have been removed"
                );
            }
            for txid in &pending_txids {
                assert!(
                    emitter.mempool_snapshot.contains_key(txid),
                    "Expected txid {txid:?} missing"
                );
            }
        }

        assert!(emitter.mempool_snapshot.is_empty());

        Ok(())
    }
}