bark/onchain/
bdk.rs

1use std::ops::{Deref, DerefMut};
2use std::time::{SystemTime, UNIX_EPOCH};
3use std::sync::Arc;
4
5use anyhow::Context;
6use bdk_esplora::EsploraAsyncExt;
7use bdk_wallet::chain::{ChainPosition, CheckPoint};
8use bdk_wallet::Wallet as BdkWallet;
9use bdk_wallet::coin_selection::DefaultCoinSelectionAlgorithm;
10use bdk_wallet::{Balance, KeychainKind, LocalOutput, SignOptions, TxBuilder, TxOrdering};
11use bitcoin::{
12	bip32, psbt, Address, Amount, FeeRate, Network, OutPoint, Psbt, Sequence, Transaction, TxOut,
13	Txid,
14};
15use log::{debug, error, info, trace, warn};
16
17use bitcoin_ext::{BlockHeight, BlockRef};
18use bitcoin_ext::bdk::{CpfpInternalError, WalletExt};
19use bitcoin_ext::cpfp::CpfpError;
20use bitcoin_ext::rpc::RpcApi;
21
22use crate::exit::ExitVtxo;
23use crate::exit::models::ExitState;
24use crate::onchain::{
25	ChainSource, LocalUtxo, GetBalance, GetSpendingTx, GetWalletTx, MakeCpfp, MakeCpfpFees,
26	PreparePsbt, SignPsbt, Utxo,
27};
28use crate::onchain::chain::ChainSourceClient;
29use crate::persist::BarkPersister;
30use crate::psbtext::PsbtInputExt;
31
/// Stop gap handed to the esplora full scan in `initial_wallet_scan`.
const STOP_GAP: usize = 50;
/// Number of parallel requests handed to the esplora client for syncs and full scans.
const PARALLEL_REQS: usize = 4;
/// Start height used by `initial_wallet_scan` when the caller does not provide one.
const GENESIS_HEIGHT: u32 = 0;
35
36impl From<LocalOutput> for LocalUtxo {
37	fn from(value: LocalOutput) -> Self {
38		LocalUtxo {
39			outpoint: value.outpoint,
40			amount: value.txout.value,
41			confirmation_height: value.chain_position.confirmation_height_upper_bound(),
42		}
43	}
44}
45
/// Trait extension for TxBuilder to add exit outputs
///
/// When used, the resulting PSBT should be signed using
/// [crate::exit::Exit::sign_exit_claim_inputs].
pub trait TxBuilderExt {
	/// Adds the given exits as inputs to the transaction being built.
	///
	/// Returns an error if one of the exits cannot be added as a claim input.
	fn add_exit_claim_inputs(&mut self, exit_outputs: &[&ExitVtxo]) -> anyhow::Result<()>;
}
53
54impl<Cs> TxBuilderExt for TxBuilder<'_, Cs> {
55	fn add_exit_claim_inputs(&mut self, exit_outputs: &[&ExitVtxo]) -> anyhow::Result<()> {
56		self.version(2);
57
58		for input in exit_outputs {
59			if !matches!(input.state(), ExitState::Claimable(..)) {
60				bail!("VTXO exit is not spendable");
61			}
62
63			let vtxo = input.vtxo();
64
65			let mut psbt_in = psbt::Input::default();
66			psbt_in.set_exit_claim_input(&vtxo);
67			psbt_in.witness_utxo = Some(TxOut {
68				script_pubkey: vtxo.output_script_pubkey(),
69				value: vtxo.amount(),
70			});
71
72			self.add_foreign_utxo_with_sequence(
73				vtxo.point(),
74				psbt_in,
75				vtxo.claim_satisfaction_weight(),
76				Sequence::from_height(vtxo.exit_delta()),
77			).expect("error adding foreign utxo for claim input");
78		}
79
80		Ok(())
81	}
82}
83
84impl <W: Deref<Target = BdkWallet>> GetBalance for W {
85	fn get_balance(&self) -> Amount {
86		self.deref().balance().total()
87	}
88}
89
90impl SignPsbt for BdkWallet {
91	fn finish_tx(&mut self, mut psbt: Psbt) -> anyhow::Result<Transaction> {
92		let opts = SignOptions {
93			trust_witness_utxo: true,
94			..Default::default()
95		};
96
97		let finalized = self.sign(&mut psbt, opts).context("signing error")?;
98		assert!(finalized);
99		let tx = psbt.extract_tx()?;
100		let unix = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
101		self.apply_unconfirmed_txs([(tx.clone(), unix)]);
102		Ok(tx)
103	}
104}
105
106impl <W: Deref<Target = BdkWallet>> GetWalletTx for W {
107	/// Retrieves a transaction from the wallet
108	///
109	/// This method will only check the database and will not
110	/// use a chain-source to find the transaction
111	fn get_wallet_tx(&self, txid: Txid) -> Option<Arc<Transaction>> {
112		self.deref().get_tx(txid).map(|tx| tx.tx_node.tx)
113	}
114
115	fn get_wallet_tx_confirmed_block(&self, txid: Txid) -> anyhow::Result<Option<BlockRef>> {
116		match self.deref().get_tx(txid) {
117			Some(tx) => match tx.chain_position {
118				ChainPosition::Confirmed { anchor, .. } => Ok(Some(anchor.block_id.into())),
119				ChainPosition::Unconfirmed { .. } => Ok(None),
120			},
121			None => Err(anyhow!("Tx {} does not exist in the wallet", txid)),
122		}
123	}
124}
125
126impl <W: DerefMut<Target = BdkWallet>> PreparePsbt for W {
127	fn prepare_tx(
128		&mut self,
129		destinations: &[(Address, Amount)],
130		fee_rate: FeeRate,
131	) -> anyhow::Result<Psbt> {
132		let mut b = self.deref_mut().build_tx();
133		b.ordering(TxOrdering::Untouched);
134		for (dest, amount) in destinations {
135			b.add_recipient(dest.script_pubkey(), *amount);
136		}
137		b.fee_rate(fee_rate);
138		b.finish().context("error building tx")
139	}
140
141	fn prepare_drain_tx(
142		&mut self,
143		destination: Address,
144		fee_rate: FeeRate,
145	) -> anyhow::Result<Psbt> {
146		let mut b = self.deref_mut().build_tx();
147		b.drain_to(destination.script_pubkey());
148		b.fee_rate(fee_rate);
149		b.drain_wallet();
150		b.finish().context("error building tx")
151	}
152}
153
154impl <W: Deref<Target = BdkWallet>> GetSpendingTx for W {
155	fn get_spending_tx(&self, outpoint: OutPoint) -> Option<Arc<Transaction>> {
156		for transaction in self.deref().transactions() {
157			if transaction.tx_node.tx.input.iter().any(|i| i.previous_output == outpoint) {
158				return Some(transaction.tx_node.tx);
159			}
160		}
161		None
162	}
163}
164
165impl MakeCpfp for BdkWallet {
166	fn make_signed_p2a_cpfp(
167		&mut self,
168		tx: &Transaction,
169		fees: MakeCpfpFees,
170	) -> Result<Transaction, CpfpError> {
171		 WalletExt::make_signed_p2a_cpfp(self, tx, fees)
172			 .inspect_err(|e| error!("Error creating signed P2A CPFP: {}", e))
173			 .map_err(|e| match e {
174				 CpfpInternalError::General(s) => CpfpError::InternalError(s),
175				 CpfpInternalError::Create(e) => CpfpError::CreateError(e.to_string()),
176				 CpfpInternalError::Extract(e) => CpfpError::FinalizeError(e.to_string()),
177				 CpfpInternalError::Fee() => CpfpError::InternalError(e.to_string()),
178				 CpfpInternalError::FinalizeError(s) => CpfpError::FinalizeError(s),
179				 CpfpInternalError::InsufficientConfirmedFunds(f) => {
180					 CpfpError::InsufficientConfirmedFunds {
181						 needed: f.needed, available: f.available,
182					 }
183				 },
184				 CpfpInternalError::NoFeeAnchor(txid) => CpfpError::NoFeeAnchor(txid),
185				 CpfpInternalError::Signer(e) => CpfpError::SigningError(e.to_string()),
186			 })
187	}
188
189	fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
190		let unix = SystemTime::now().duration_since(UNIX_EPOCH)
191			.map_err(|e| CpfpError::InternalError(
192				format!("Unable to calculate time since UNIX epoch: {}", e.to_string()))
193			)?.as_secs();
194		self.apply_unconfirmed_txs([(tx.clone(), unix)]);
195		trace!("Unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
196		Ok(())
197	}
198}
199
/// A basic wrapper around the BDK wallet to showcase
/// how to use bark with an external onchain wallet.
///
/// Note: the BDK wallet already implements all the traits
/// needed to be used as an onboard and exit wallet, so this
/// wrapper only needs to proxy the methods.
pub struct OnchainWallet {
	/// The wrapped BDK wallet.
	pub inner: BdkWallet,
	/// Persister that receives the wallet's staged changesets.
	db: Arc<dyn BarkPersister>,
}
210
/// Gives read access to the wrapped BDK wallet.
impl Deref for OnchainWallet {
	type Target = BdkWallet;

	fn deref(&self) -> &Self::Target {
		&self.inner
	}
}
218
/// Gives mutable access to the wrapped BDK wallet.
///
/// Changes made through this access are not persisted by the wrapper itself.
impl DerefMut for OnchainWallet {
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.inner
	}
}
224
225impl OnchainWallet {
226	pub fn load_or_create(network: Network, seed: [u8; 64], db: Arc<dyn BarkPersister>) -> anyhow::Result<Self> {
227		let xpriv = bip32::Xpriv::new_master(network, &seed).expect("valid seed");
228		let desc = bdk_wallet::template::Bip86(xpriv, KeychainKind::External);
229
230		let changeset = db.initialize_bdk_wallet().context("error reading bdk wallet state")?;
231		let wallet_opt = bdk_wallet::Wallet::load()
232			.descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
233			.extract_keys()
234			.check_network(network)
235			.load_wallet_no_persist(changeset)?;
236
237		let wallet = match wallet_opt {
238			Some(wallet) => wallet,
239			None => bdk_wallet::Wallet::create_single(desc)
240				.network(network)
241				.create_wallet_no_persist()?,
242		};
243
244		Ok(Self { inner: wallet, db })
245	}
246}
247
248impl MakeCpfp for OnchainWallet {
249	fn make_signed_p2a_cpfp(
250		&mut self,
251		tx: &Transaction,
252		fees: MakeCpfpFees,
253	) -> Result<Transaction, CpfpError> {
254		MakeCpfp::make_signed_p2a_cpfp(&mut self.inner, tx, fees)
255	}
256
257	fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
258		self.inner.store_signed_p2a_cpfp(tx)?;
259		self.persist()
260			.map_err(|e| CpfpError::StoreError(e.to_string()))
261	}
262}
263
264impl SignPsbt for OnchainWallet {
265	fn finish_tx(&mut self, psbt: Psbt) -> anyhow::Result<Transaction> {
266		let tx = self.inner.finish_tx(psbt)?;
267		self.persist()?;
268		Ok(tx)
269	}
270}
271
272impl OnchainWallet {
	/// Returns the balance reported by the wrapped BDK wallet.
	pub fn balance(&self) -> Balance {
		self.inner.balance()
	}
276
	/// Collects the unspent outputs listed by the wrapped BDK wallet.
	pub fn list_unspent(&self) -> Vec<LocalOutput> {
		self.inner.list_unspent().collect()
	}
280
	/// Collects the transactions yielded by the wrapped wallet's `transactions()` iterator.
	pub fn list_transactions(&self) -> Vec<Arc<Transaction>> {
		self.inner.transactions().map(|tx| tx.tx_node.tx).collect()
	}
284
285	pub fn address(&mut self) -> anyhow::Result<Address> {
286		let ret = self.inner.reveal_next_address(bdk_wallet::KeychainKind::External).address;
287		self.persist()?;
288		Ok(ret)
289	}
290
291	pub fn utxos(&self) -> Vec<Utxo> {
292		self.list_unspent().into_iter().map(|o| Utxo::Local(o.into())).collect()
293	}
294
295	pub async fn send(&mut self, chain: &ChainSource, dest: Address, amount: Amount, fee_rate: FeeRate
296	)	-> anyhow::Result<Txid> {
297		let psbt = self.prepare_tx(&[(dest, amount)], fee_rate)?;
298		let tx = self.finish_tx(psbt)?;
299		chain.broadcast_tx(&tx).await?;
300		Ok(tx.compute_txid())
301	}
302
303	pub async fn send_many(
304		&mut self,
305		chain: &ChainSource,
306		destinations: &[(Address, Amount)],
307		fee_rate: FeeRate,
308	) -> anyhow::Result<Txid> {
309		let pbst = self.prepare_tx(destinations, fee_rate)?;
310		let tx = self.finish_tx(pbst)?;
311		chain.broadcast_tx(&tx).await?;
312		Ok(tx.compute_txid())
313	}
314
315
316	pub async fn drain(
317		&mut self,
318		chain: &ChainSource,
319		destination: Address,
320		fee_rate: FeeRate,
321	) -> anyhow::Result<Txid> {
322		let psbt = self.prepare_drain_tx(destination, fee_rate)?;
323		let tx = self.finish_tx(psbt)?;
324		chain.broadcast_tx(&tx).await?;
325		Ok(tx.compute_txid())
326	}
327
	/// Starts a new transaction builder on the wrapped BDK wallet.
	pub fn build_tx(&mut self) -> TxBuilder<'_, DefaultCoinSelectionAlgorithm> {
		self.inner.build_tx()
	}
331
332	async fn inner_sync_bitcoind(
333		&mut self,
334		bitcoind: &bitcoin_ext::rpc::Client,
335		prev_tip: CheckPoint,
336	) -> anyhow::Result<()> {
337		debug!("Syncing with bitcoind, starting at block height {}...", prev_tip.height());
338		let mut emitter = bdk_bitcoind_rpc::Emitter::new(
339			bitcoind, prev_tip.clone(), prev_tip.height(), self.unconfirmed_txs()
340		);
341		let mut count = 0;
342		while let Some(em) = emitter.next_block()? {
343			self.inner.apply_block_connected_to(
344				&em.block, em.block_height(), em.connected_to(),
345			)?;
346			count += 1;
347
348			if count % 10_000 == 0 {
349				self.persist()?;
350				info!("Synced until block height {}", em.block_height());
351			}
352		}
353
354		let mempool = emitter.mempool()?;
355		self.inner.apply_evicted_txs(mempool.evicted);
356		self.inner.apply_unconfirmed_txs(mempool.update);
357		self.persist()?;
358		debug!("Finished syncing with bitcoind");
359
360		Ok(())
361	}
362
363	async fn rebroadcast_txs(&mut self, chain: &ChainSource, sync_start: u64) -> anyhow::Result<Amount> {
364		let balance = self.inner.balance();
365
366		// Ultimately, let's try to rebroadcast all our unconfirmed txs.
367		let transactions = self.inner.transactions().filter(|tx| {
368			if let ChainPosition::Unconfirmed { last_seen, .. } = tx.chain_position {
369				match last_seen {
370					Some(last_seen) => last_seen < sync_start,
371					None => true,
372				}
373			} else {
374				false
375			}
376		}).collect::<Vec<_>>();
377
378		for tx in transactions {
379			if let Err(e) = chain.broadcast_tx(&tx.tx_node.tx).await {
380				warn!("Error broadcasting tx {}: {}", tx.tx_node.txid, e);
381			}
382		}
383
384		Ok(balance.total())
385	}
386
387	pub async fn sync(&mut self, chain: &ChainSource) -> anyhow::Result<Amount> {
388		debug!("Starting wallet sync...");
389		debug!("Starting balance: {}", self.inner.balance());
390		trace!("Starting unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
391		let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("now").as_secs();
392
393		match chain.inner() {
394			ChainSourceClient::Bitcoind(bitcoind) => {
395				let prev_tip = self.inner.latest_checkpoint();
396				self.inner_sync_bitcoind(bitcoind, prev_tip).await?;
397			},
398			ChainSourceClient::Esplora(client) => {
399				debug!("Syncing with esplora...");
400				let request = self.inner.start_sync_with_revealed_spks()
401					.outpoints(self.list_unspent().iter().map(|o| o.outpoint).collect::<Vec<_>>())
402					.txids(self.inner.transactions().map(|tx| tx.tx_node.txid).collect::<Vec<_>>());
403
404				let update = client.sync(request, PARALLEL_REQS).await?;
405				self.inner.apply_update(update)?;
406				self.persist()?;
407				debug!("Finished syncing with esplora");
408			},
409		}
410
411		debug!("Current balance: {}", self.inner.balance());
412		trace!("Current unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
413		self.rebroadcast_txs(chain, now).await
414	}
415
416	pub async fn initial_wallet_scan(
417		&mut self,
418		chain: &ChainSource,
419		start_height: Option<BlockHeight>,
420	) -> anyhow::Result<Amount> {
421		info!("Starting initial wallet sync...");
422		debug!("Starting balance: {}", self.inner.balance());
423		let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("now").as_secs();
424
425		match chain.inner() {
426			ChainSourceClient::Bitcoind(bitcoind) => {
427				// Make sure we include the given start_height in the scan
428				let height = start_height.unwrap_or(GENESIS_HEIGHT).saturating_sub(1);
429				let block_hash = bitcoind.get_block_hash(height as u64)?;
430				self.inner.set_checkpoint(height, block_hash);
431				self.inner_sync_bitcoind(bitcoind, self.inner.latest_checkpoint()).await?;
432			},
433			// Esplora can't do a full scan from a given block height, so we can ignore start_height
434			ChainSourceClient::Esplora(client) => {
435				debug!("Starting full scan with esplora...");
436				let request = self.inner.start_full_scan();
437				let update = client.full_scan(request, STOP_GAP, PARALLEL_REQS).await?;
438				self.inner.apply_update(update)?;
439				self.persist()?;
440				debug!("Finished scanning with esplora");
441			},
442		}
443
444		debug!("Current balance: {}", self.inner.balance());
445		self.rebroadcast_txs(chain, now).await
446	}
447
448
449	fn persist(&mut self) -> anyhow::Result<()> {
450		if let Some(stage) = self.inner.staged() {
451			self.db.store_bdk_wallet_changeset(&*stage)?;
452			let _ = self.inner.take_staged();
453		}
454		Ok(())
455	}
456}