1
2
3use std::borrow::Borrow;
4use std::collections::{HashMap, HashSet};
5use std::str::FromStr as _;
6
7use anyhow::Context;
8use bdk_core::{BlockId, CheckPoint};
9use bdk_esplora::esplora_client;
10use bitcoin::constants::genesis_block;
11use bitcoin::{
12 Amount, Block, BlockHash, FeeRate, Network, OutPoint, Transaction, Txid, Weight, Wtxid,
13};
14use log::{debug, info, warn};
15use tokio::sync::RwLock;
16
17use bitcoin_ext::{BlockHeight, BlockRef, FeeRateExt, TxStatus};
18use bitcoin_ext::rpc::{self, BitcoinRpcExt, BitcoinRpcErrorExt, RpcApi};
19use bitcoin_ext::esplora::EsploraClientExt;
20
21const FEE_RATE_TARGET_CONF_FAST: u16 = 1;
22const FEE_RATE_TARGET_CONF_REGULAR: u16 = 3;
23const FEE_RATE_TARGET_CONF_SLOW: u16 = 6;
24
25const TX_ALREADY_IN_CHAIN_ERROR: i32 = -27;
26const MIN_BITCOIND_VERSION: usize = 290000;
27
28#[derive(Clone, Debug)]
40pub enum ChainSourceSpec {
41 Bitcoind {
42 url: String,
44 auth: rpc::Auth,
46 },
47 Esplora {
48 url: String,
50 },
51}
52
53pub enum ChainSourceClient {
54 Bitcoind(rpc::Client),
55 Esplora(esplora_client::AsyncClient),
56}
57
58impl ChainSourceClient {
59 async fn check_network(&self, expected: Network) -> anyhow::Result<()> {
60 match self {
61 ChainSourceClient::Bitcoind(bitcoind) => {
62 let network = bitcoind.get_blockchain_info()?;
63 if expected != network.chain {
64 bail!("Network mismatch: expected {:?}, got {:?}", expected, network.chain);
65 }
66 },
67 ChainSourceClient::Esplora(client) => {
68 let res = client.client().get(format!("{}/block-height/0", client.url()))
69 .send().await?.text().await?;
70 let genesis_hash = BlockHash::from_str(&res)
71 .context("bad response from server (not a blockhash). Esplora client possibly misconfigured")?;
72 if genesis_hash != genesis_block(expected).block_hash() {
73 bail!("Network mismatch: expected {:?}, got {:?}", expected, genesis_hash);
74 }
75 },
76 };
77
78 Ok(())
79 }
80}
81
82pub struct ChainSource {
114 inner: ChainSourceClient,
115 network: Network,
116 fee_rates: RwLock<FeeRates>,
117}
118
119impl ChainSource {
120 pub fn require_version(&self) -> anyhow::Result<()> {
125 if let ChainSourceClient::Bitcoind(bitcoind) = self.inner() {
126 if bitcoind.version()? < MIN_BITCOIND_VERSION {
127 bail!("Bitcoin Core version is too old, you can participate in rounds but won't be able to unilaterally exit. Please upgrade to 29.0 or higher.");
128 }
129 }
130
131 Ok(())
132 }
133
134 pub(crate) fn inner(&self) -> &ChainSourceClient {
135 &self.inner
136 }
137
138 pub async fn fee_rates(&self) -> FeeRates {
140 self.fee_rates.read().await.clone()
141 }
142
143 pub fn network(&self) -> Network {
145 self.network
146 }
147
148 pub async fn new(spec: ChainSourceSpec, network: Network, fallback_fee: Option<FeeRate>) -> anyhow::Result<Self> {
181 let inner = match spec {
182 ChainSourceSpec::Bitcoind { url, auth } => ChainSourceClient::Bitcoind(
183 rpc::Client::new(&url, auth)
184 .context("failed to create bitcoind rpc client")?
185 ),
186 ChainSourceSpec::Esplora { url } => ChainSourceClient::Esplora({
187 let url = url.strip_suffix("/").unwrap_or(&url);
189 esplora_client::Builder::new(url).build_async()
190 .with_context(|| format!("failed to create esplora client for url {}", url))?
191 }),
192 };
193
194 inner.check_network(network).await?;
195
196 let fee = fallback_fee.unwrap_or(FeeRate::BROADCAST_MIN);
197 let fee_rates = RwLock::new(FeeRates { fast: fee, regular: fee, slow: fee });
198
199 Ok(Self { inner, network, fee_rates })
200 }
201
202 async fn fetch_fee_rates(&self) -> anyhow::Result<FeeRates> {
203 match self.inner() {
204 ChainSourceClient::Bitcoind(bitcoind) => {
205 let get_fee_rate = |target| {
206 let fee = bitcoind.estimate_smart_fee(
207 target, Some(rpc::json::EstimateMode::Economical),
208 )?;
209 if let Some(fee_rate) = fee.fee_rate {
210 Ok(FeeRate::from_amount_per_kvb_ceil(fee_rate))
211 } else {
212 Err(anyhow!("No rate returned from estimate_smart_fee for a {} confirmation target", target))
213 }
214 };
215 Ok(FeeRates {
216 fast: get_fee_rate(FEE_RATE_TARGET_CONF_FAST)?,
217 regular: get_fee_rate(FEE_RATE_TARGET_CONF_REGULAR).expect("should exist"),
218 slow: get_fee_rate(FEE_RATE_TARGET_CONF_SLOW).expect("should exist"),
219 })
220 },
221 ChainSourceClient::Esplora(client) => {
222 let estimates = client.get_fee_estimates().await?;
224 let get_fee_rate = |target| {
225 let fee = estimates.get(&target).with_context(||
226 format!("No rate returned from get_fee_estimates for a {} confirmation target", target)
227 )?;
228 FeeRate::from_sat_per_vb_decimal_checked_ceil(*fee).with_context(||
229 format!("Invalid rate returned from get_fee_estimates {} for a {} confirmation target", fee, target)
230 )
231 };
232 Ok(FeeRates {
233 fast: get_fee_rate(FEE_RATE_TARGET_CONF_FAST)?,
234 regular: get_fee_rate(FEE_RATE_TARGET_CONF_REGULAR)?,
235 slow: get_fee_rate(FEE_RATE_TARGET_CONF_SLOW)?,
236 })
237 }
238 }
239 }
240
241 pub async fn tip(&self) -> anyhow::Result<BlockHeight> {
242 match self.inner() {
243 ChainSourceClient::Bitcoind(bitcoind) => {
244 Ok(bitcoind.get_block_count()? as BlockHeight)
245 },
246 ChainSourceClient::Esplora(client) => {
247 Ok(client.get_height().await?)
248 },
249 }
250 }
251
252 pub async fn tip_ref(&self) -> anyhow::Result<BlockRef> {
253 self.block_ref(self.tip().await?).await
254 }
255
256 pub async fn block_ref(&self, height: BlockHeight) -> anyhow::Result<BlockRef> {
257 match self.inner() {
258 ChainSourceClient::Bitcoind(bitcoind) => {
259 let hash = bitcoind.get_block_hash(height as u64)?;
260 Ok(BlockRef { height, hash })
261 },
262 ChainSourceClient::Esplora(client) => {
263 let hash = client.get_block_hash(height).await?;
264 Ok(BlockRef { height, hash })
265 },
266 }
267 }
268
269 pub async fn block(&self, hash: BlockHash) -> anyhow::Result<Option<Block>> {
270 match self.inner() {
271 ChainSourceClient::Bitcoind(bitcoind) => {
272 match bitcoind.get_block(&hash) {
273 Ok(b) => Ok(Some(b)),
274 Err(e) if e.is_not_found() => Ok(None),
275 Err(e) => Err(e.into()),
276 }
277 },
278 ChainSourceClient::Esplora(client) => {
279 Ok(client.get_block_by_hash(&hash).await?)
280 },
281 }
282 }
283
284 pub async fn mempool_ancestor_info(&self, txid: Txid) -> anyhow::Result<MempoolAncestorInfo> {
287 let mut result = MempoolAncestorInfo::new(txid);
288
289 match self.inner() {
292 ChainSourceClient::Bitcoind(bitcoind) => {
293 let entry = bitcoind.get_mempool_entry(&txid)?;
294 let err = || anyhow!("missing weight parameter from getmempoolentry");
295
296 result.total_fee = entry.fees.ancestor;
297 result.total_weight = Weight::from_wu(entry.weight.ok_or_else(err)?) +
298 Weight::from_vb(entry.ancestor_size).ok_or_else(err)?;
299 },
300 ChainSourceClient::Esplora(client) => {
301 let status = self.tx_status(txid).await?;
304 if !matches!(status, TxStatus::Mempool) {
305 return Err(anyhow!("{} is not in the mempool, status is {:?}", txid, status));
306 }
307
308 let mut info_map: HashMap<Txid, esplora_client::Tx> = HashMap::new();
309 let mut set = HashSet::from([txid]);
310 while !set.is_empty() {
311 let requests = set.iter().filter_map(|txid| if info_map.contains_key(txid) {
313 None
314 } else {
315 Some((txid, client.get_tx_info(&txid)))
316 }).collect::<Vec<_>>();
317
318 let mut next_set = HashSet::new();
320
321 for (txid, request) in requests {
323 let info = request.await?
324 .ok_or_else(|| anyhow!("unable to retrieve tx info for {}", txid))?;
325 if !info.status.confirmed {
326 for vin in info.vin.iter() {
327 next_set.insert(vin.txid);
328 }
329 }
330 info_map.insert(*txid, info);
331 }
332 set = next_set;
333 }
334 for info in info_map.into_values().filter(|info| !info.status.confirmed) {
336 result.total_fee += info.fee();
337 result.total_weight += info.weight();
338 }
339 },
340 }
341 Ok(result)
343 }
344
345 pub async fn txs_spending_inputs<T: IntoIterator<Item = OutPoint>>(
348 &self,
349 outpoints: T,
350 block_scan_start: BlockHeight,
351 ) -> anyhow::Result<TxsSpendingInputsResult> {
352 let mut res = TxsSpendingInputsResult::new();
353 match self.inner() {
354 ChainSourceClient::Bitcoind(bitcoind) => {
355 let start = block_scan_start.saturating_sub(1);
357 let block_ref = self.block_ref(start).await?;
358 let cp = CheckPoint::new(BlockId {
359 height: block_ref.height,
360 hash: block_ref.hash,
361 });
362
363 let mut emitter = bdk_bitcoind_rpc::Emitter::new(
364 bitcoind, cp.clone(), cp.height(), bdk_bitcoind_rpc::NO_EXPECTED_MEMPOOL_TXS,
365 );
366
367 debug!("Scanning blocks for spent outpoints with bitcoind, starting at block height {}...", block_scan_start);
368 let outpoint_set = outpoints.into_iter().collect::<HashSet<_>>();
369 while let Some(em) = emitter.next_block()? {
370 if em.block_height() % 1000 == 0 {
372 info!("Scanned for spent outpoints until block height {}", em.block_height());
373 }
374 for tx in &em.block.txdata {
375 for txin in tx.input.iter() {
376 if outpoint_set.contains(&txin.previous_output) {
377 res.add(
378 txin.previous_output.clone(),
379 tx.compute_txid(),
380 TxStatus::Confirmed(BlockRef {
381 height: em.block_height(), hash: em.block.block_hash().clone()
382 })
383 );
384 if res.map.len() == outpoint_set.len() {
386 return Ok(res);
387 }
388 }
389 }
390 }
391 }
392
393 debug!("Finished scanning blocks for spent outpoints, now checking the mempool...");
394 let mempool = emitter.mempool()?;
395 for (tx, _last_seen) in &mempool.update {
396 for txin in tx.input.iter() {
397 if outpoint_set.contains(&txin.previous_output) {
398 res.add(
399 txin.previous_output.clone(),
400 tx.compute_txid(),
401 TxStatus::Mempool,
402 );
403
404 if res.map.len() == outpoint_set.len() {
406 return Ok(res);
407 }
408 }
409 }
410 }
411 debug!("Finished checking the mempool for spent outpoints");
412 },
413 ChainSourceClient::Esplora(client) => {
414 for outpoint in outpoints {
415 let output_status = client.get_output_status(&outpoint.txid, outpoint.vout.into()).await?;
416
417 if let Some(output_status) = output_status {
418 if output_status.spent {
419 let tx_status = {
420 let status = output_status.status.expect("Status should be valid if an outpoint is spent");
421 if status.confirmed {
422 TxStatus::Confirmed(BlockRef {
423 height: status.block_height.expect("Confirmed transaction missing block_height"),
424 hash: status.block_hash.expect("Confirmed transaction missing block_hash"),
425 })
426 } else {
427 TxStatus::Mempool
428 }
429 };
430 let txid = output_status.txid.expect("Txid should be valid if an outpoint is spent");
431 res.add(outpoint, txid, tx_status);
432 }
433 }
434 }
435 },
436 }
437
438 Ok(res)
439 }
440
441 pub async fn broadcast_tx(&self, tx: &Transaction) -> anyhow::Result<()> {
442 match self.inner() {
443 ChainSourceClient::Bitcoind(bitcoind) => {
444 match bitcoind.send_raw_transaction(tx) {
445 Ok(_) => Ok(()),
446 Err(rpc::Error::JsonRpc(
447 rpc::jsonrpc::Error::Rpc(e))
448 ) if e.code == TX_ALREADY_IN_CHAIN_ERROR => Ok(()),
449 Err(e) => Err(e.into()),
450 }
451 },
452 ChainSourceClient::Esplora(client) => {
453 client.broadcast(tx).await?;
454 Ok(())
455 },
456 }
457 }
458
459 pub async fn broadcast_package(&self, txs: &[impl Borrow<Transaction>]) -> anyhow::Result<()> {
460 #[derive(Debug, Deserialize)]
461 struct PackageTxInfo {
462 txid: Txid,
463 error: Option<String>,
464 }
465 #[derive(Debug, Deserialize)]
466 struct SubmitPackageResponse {
467 #[serde(rename = "tx-results")]
468 tx_results: HashMap<Wtxid, PackageTxInfo>,
469 package_msg: String,
470 }
471
472 match self.inner() {
473 ChainSourceClient::Bitcoind(bitcoind) => {
474 let hexes = txs.iter()
475 .map(|t| bitcoin::consensus::encode::serialize_hex(t.borrow()))
476 .collect::<Vec<_>>();
477 let res = bitcoind.call::<SubmitPackageResponse>("submitpackage", &[hexes.into()])?;
478 if res.package_msg != "success" {
479 let errors = res.tx_results.values()
480 .map(|t| format!("tx {}: {}",
481 t.txid, t.error.as_ref().map(|s| s.as_str()).unwrap_or("(no error)"),
482 ))
483 .collect::<Vec<_>>();
484 bail!("msg: '{}', errors: {:?}", res.package_msg, errors);
485 }
486 Ok(())
487 },
488 ChainSourceClient::Esplora(client) => {
489 let txs = txs.iter().map(|t| t.borrow().clone()).collect::<Vec<_>>();
490 let res = client.submit_package(&txs, None, None).await?;
491 if res.package_msg != "success" {
492 let errors = res.tx_results.values()
493 .map(|t| format!("tx {}: {}",
494 t.txid, t.error.as_ref().map(|s| s.as_str()).unwrap_or("(no error)"),
495 ))
496 .collect::<Vec<_>>();
497 bail!("msg: '{}', errors: {:?}", res.package_msg, errors);
498 }
499
500 Ok(())
501 },
502 }
503 }
504
505 pub async fn get_tx(&self, txid: &Txid) -> anyhow::Result<Option<Transaction>> {
506 match self.inner() {
507 ChainSourceClient::Bitcoind(bitcoind) => {
508 match bitcoind.get_raw_transaction(txid, None) {
509 Ok(tx) => Ok(Some(tx)),
510 Err(e) if e.is_not_found() => Ok(None),
511 Err(e) => Err(e.into()),
512 }
513 },
514 ChainSourceClient::Esplora(client) => {
515 Ok(client.get_tx(txid).await?)
516 },
517 }
518 }
519
520 pub async fn tx_confirmed(&self, txid: Txid) -> anyhow::Result<Option<BlockHeight>> {
522 Ok(self.tx_status(txid).await?.confirmed_height())
523 }
524
525 pub async fn tx_status(&self, txid: Txid) -> anyhow::Result<TxStatus> {
527 match self.inner() {
528 ChainSourceClient::Bitcoind(bitcoind) => Ok(bitcoind.tx_status(&txid)?),
529 ChainSourceClient::Esplora(esplora) => {
530 match esplora.get_tx_info(&txid).await? {
531 Some(info) => match (info.status.block_height, info.status.block_hash) {
532 (Some(block_height), Some(block_hash)) => Ok(TxStatus::Confirmed(BlockRef {
533 height: block_height,
534 hash: block_hash,
535 } )),
536 _ => Ok(TxStatus::Mempool),
537 },
538 None => Ok(TxStatus::NotFound),
539 }
540 },
541 }
542 }
543
544 #[allow(unused)]
545 pub async fn txout_value(&self, outpoint: &OutPoint) -> anyhow::Result<Amount> {
546 let tx = match self.inner() {
547 ChainSourceClient::Bitcoind(bitcoind) => {
548 bitcoind.get_raw_transaction(&outpoint.txid, None)
549 .with_context(|| format!("tx {} unknown", outpoint.txid))?
550 },
551 ChainSourceClient::Esplora(client) => {
552 client.get_tx(&outpoint.txid).await?
553 .with_context(|| format!("tx {} unknown", outpoint.txid))?
554 },
555 };
556 Ok(tx.output.get(outpoint.vout as usize).context("outpoint vout out of range")?.value)
557 }
558
559 pub async fn update_fee_rates(&self, fallback_fee: Option<FeeRate>) -> anyhow::Result<()> {
562 let fee_rates = match (self.fetch_fee_rates().await, fallback_fee) {
563 (Ok(fee_rates), _) => Ok(fee_rates),
564 (Err(e), None) => Err(e),
565 (Err(e), Some(fallback)) => {
566 warn!("Error getting fee rates, falling back to {} sat/kvB: {}",
567 fallback.to_btc_per_kvb(), e,
568 );
569 Ok(FeeRates { fast: fallback, regular: fallback, slow: fallback })
570 }
571 }?;
572
573 *self.fee_rates.write().await = fee_rates;
574 Ok(())
575 }
576}
577
578#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
580pub struct FeeRates {
581 pub fast: FeeRate,
583 pub regular: FeeRate,
585 pub slow: FeeRate,
587}
588
589#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
591pub struct MempoolAncestorInfo {
592 pub txid: Txid,
594 pub total_fee: Amount,
597 pub total_weight: Weight,
599}
600
601impl MempoolAncestorInfo {
602 pub fn new(txid: Txid) -> Self {
603 Self {
604 txid,
605 total_fee: Amount::ZERO,
606 total_weight: Weight::ZERO,
607 }
608 }
609
610 pub fn effective_fee_rate(&self) -> Option<FeeRate> {
611 FeeRate::from_amount_and_weight_ceil(self.total_fee, self.total_weight)
612 }
613}
614
615#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
616pub struct TxsSpendingInputsResult {
617 pub map: HashMap<OutPoint, (Txid, TxStatus)>,
618}
619
620impl TxsSpendingInputsResult {
621 pub fn new() -> Self {
622 Self { map: HashMap::new() }
623 }
624
625 pub fn add(&mut self, outpoint: OutPoint, txid: Txid, status: TxStatus) {
626 self.map.insert(outpoint, (txid, status));
627 }
628
629 pub fn get(&self, outpoint: &OutPoint) -> Option<&(Txid, TxStatus)> {
630 self.map.get(outpoint)
631 }
632
633 pub fn confirmed_txids(&self) -> impl Iterator<Item = (Txid, BlockRef)> + '_ {
634 self.map
635 .iter()
636 .filter_map(|(_, (txid, status))| {
637 match status {
638 TxStatus::Confirmed(block) => Some((*txid, *block)),
639 _ => None,
640 }
641 })
642 }
643
644 pub fn mempool_txids(&self) -> impl Iterator<Item = Txid> + '_ {
645 self.map
646 .iter()
647 .filter(|(_, (_, status))| matches!(status, TxStatus::Mempool))
648 .map(|(_, (txid, _))| *txid)
649 }
650}