bark/onchain/
bdk.rs

1use std::ops::{Deref, DerefMut};
2use std::sync::Arc;
3
4use anyhow::Context;
5use ark::time::timestamp_secs;
6use bdk_esplora::EsploraAsyncExt;
7use bdk_wallet::chain::{ChainPosition, CheckPoint};
8use bdk_wallet::Wallet as BdkWallet;
9use bdk_wallet::coin_selection::DefaultCoinSelectionAlgorithm;
10use bdk_wallet::{Balance, KeychainKind, LocalOutput, TxBuilder, TxOrdering};
11use bitcoin::{
12	Address, Amount, FeeRate, Network, OutPoint, Psbt, Sequence, Transaction, TxOut, Txid, Weight, bip32, psbt
13};
14use log::{debug, error, info, trace, warn};
15
16use ark::vtxo::policy::signing::VtxoSigner;
17use bitcoin_ext::{BlockHeight, BlockRef};
18use bitcoin_ext::bdk::{CpfpInternalError, WalletExt};
19use bitcoin_ext::cpfp::CpfpError;
20use bitcoin_ext::rpc::RpcApi;
21
22use crate::chain::{ChainSource, ChainSourceClient};
23use crate::exit::{ExitVtxo, ExitState};
24use crate::onchain::{
25	ChainSync, GetBalance, GetSpendingTx, GetWalletTx, LocalUtxo,
26	MakeCpfp, MakeCpfpFees, PreparePsbt, SignPsbt, Utxo
27};
28use crate::persist::BarkPersister;
29use crate::psbtext::PsbtInputExt;
30use crate::Wallet;
31
/// Stop gap handed to the esplora client's `full_scan` during the initial wallet scan.
const STOP_GAP: usize = 50;
/// Number of parallel requests handed to the esplora client for both `sync` and `full_scan`.
const PARALLEL_REQS: usize = 4;
/// Start height used for the initial bitcoind scan when the caller does not provide one.
const GENESIS_HEIGHT: u32 = 0;
35
36impl From<LocalOutput> for LocalUtxo {
37	fn from(value: LocalOutput) -> Self {
38		LocalUtxo {
39			outpoint: value.outpoint,
40			amount: value.txout.value,
41			confirmation_height: value.chain_position.confirmation_height_upper_bound(),
42		}
43	}
44}
45
46/// Trait extension for TxBuilder to add exit outputs
47///
48/// When used, the resulting PSBT should be signed using
49/// [crate::exit::Exit::sign_exit_claim_inputs].
50#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
51#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
52pub trait TxBuilderExt: Send + Sync {
53	async fn add_exit_claim_inputs(
54		&mut self,
55		wallet: &Wallet,
56		exit_outputs: &[&ExitVtxo],
57	) -> anyhow::Result<()>;
58}
59
60#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
61#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
62impl<Cs: Send + Sync> TxBuilderExt for TxBuilder<'_, Cs> {
63	async fn add_exit_claim_inputs(
64		&mut self,
65		wallet: &Wallet,
66		exit_outputs: &[&ExitVtxo],
67	) -> anyhow::Result<()> {
68		self.version(2);
69
70		for input in exit_outputs {
71			if !matches!(input.state(), ExitState::Claimable(..)) {
72				bail!("VTXO exit is not spendable");
73			}
74
75			let vtxo = wallet.db.get_wallet_vtxo(input.id()).await?
76				.context(format!("Unable to load VTXO for exit: {}", input.id()))?;
77			let mut psbt_in = psbt::Input::default();
78			psbt_in.set_exit_claim_input(&vtxo);
79			psbt_in.witness_utxo = Some(TxOut {
80				script_pubkey: vtxo.output_script_pubkey(),
81				value: vtxo.amount(),
82			});
83
84			let clause = wallet.find_signable_clause(&vtxo).await
85				.context("Cannot sign vtxo")?;
86
87			let witness_weight = {
88				let witness_size = clause.witness_size(&vtxo);
89				Weight::from_witness_data_size(witness_size as u64)
90			};
91
92			self.add_foreign_utxo_with_sequence(
93				vtxo.point(),
94				psbt_in,
95				witness_weight,
96				clause.sequence().unwrap_or(Sequence::ZERO),
97			).expect("error adding foreign utxo for claim input");
98		}
99
100		Ok(())
101	}
102}
103
104impl <W: Deref<Target = BdkWallet>> GetBalance for W {
105	fn get_balance(&self) -> Amount {
106		self.deref().balance().total()
107	}
108}
109
110#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
111#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
112impl SignPsbt for BdkWallet {
113	async fn finish_tx(&mut self, mut psbt: Psbt) -> anyhow::Result<Transaction> {
114		#[allow(deprecated)]
115		let opts = bdk_wallet::SignOptions {
116			trust_witness_utxo: true,
117			..Default::default()
118		};
119
120		let finalized = self.sign(&mut psbt, opts).context("signing error")?;
121		assert!(finalized);
122		let tx = psbt.extract_tx()?;
123		self.apply_unconfirmed_txs([(tx.clone(), timestamp_secs())]);
124		Ok(tx)
125	}
126}
127
128impl <W: Deref<Target = BdkWallet>> GetWalletTx for W {
129	/// Retrieves a transaction from the wallet
130	///
131	/// This method will only check the database and will not
132	/// use a chain-source to find the transaction
133	fn get_wallet_tx(&self, txid: Txid) -> Option<Arc<Transaction>> {
134		self.deref().get_tx(txid).map(|tx| tx.tx_node.tx)
135	}
136
137	fn get_wallet_tx_confirmed_block(&self, txid: Txid) -> anyhow::Result<Option<BlockRef>> {
138		match self.deref().get_tx(txid) {
139			Some(tx) => match tx.chain_position {
140				ChainPosition::Confirmed { anchor, .. } => Ok(Some(anchor.block_id.into())),
141				ChainPosition::Unconfirmed { .. } => Ok(None),
142			},
143			None => Err(anyhow!("Tx {} does not exist in the wallet", txid)),
144		}
145	}
146}
147
148impl <W: DerefMut<Target = BdkWallet>> PreparePsbt for W {
149	fn prepare_tx(
150		&mut self,
151		destinations: &[(Address, Amount)],
152		fee_rate: FeeRate,
153	) -> anyhow::Result<Psbt> {
154		let mut b = self.deref_mut().build_tx();
155		b.ordering(TxOrdering::Untouched);
156		for (dest, amount) in destinations {
157			b.add_recipient(dest.script_pubkey(), *amount);
158		}
159		b.fee_rate(fee_rate);
160		b.finish().context("error building tx")
161	}
162
163	fn prepare_drain_tx(
164		&mut self,
165		destination: Address,
166		fee_rate: FeeRate,
167	) -> anyhow::Result<Psbt> {
168		let mut b = self.deref_mut().build_tx();
169		b.drain_to(destination.script_pubkey());
170		b.fee_rate(fee_rate);
171		b.drain_wallet();
172		b.finish().context("error building tx")
173	}
174}
175
176impl <W: Deref<Target = BdkWallet>> GetSpendingTx for W {
177	fn get_spending_tx(&self, outpoint: OutPoint) -> Option<Arc<Transaction>> {
178		for transaction in self.deref().transactions() {
179			if transaction.tx_node.tx.input.iter().any(|i| i.previous_output == outpoint) {
180				return Some(transaction.tx_node.tx);
181			}
182		}
183		None
184	}
185}
186
187#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
188#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
189impl MakeCpfp for BdkWallet {
190	fn make_signed_p2a_cpfp(
191		&mut self,
192		tx: &Transaction,
193		fees: MakeCpfpFees,
194	) -> Result<Transaction, CpfpError> {
195		 WalletExt::make_signed_p2a_cpfp(self, tx, fees)
196			 .inspect_err(|e| error!("Error creating signed P2A CPFP: {}", e))
197			 .map_err(|e| match e {
198				 CpfpInternalError::General(s) => CpfpError::InternalError(s),
199				 CpfpInternalError::Create(e) => CpfpError::CreateError(e.to_string()),
200				 CpfpInternalError::Extract(e) => CpfpError::FinalizeError(e.to_string()),
201				 CpfpInternalError::Fee() => CpfpError::InternalError(e.to_string()),
202				 CpfpInternalError::FinalizeError(s) => CpfpError::FinalizeError(s),
203				 CpfpInternalError::InsufficientConfirmedFunds(f) => {
204					 CpfpError::InsufficientConfirmedFunds {
205						 needed: f.needed, available: f.available,
206					 }
207				 },
208				 CpfpInternalError::NoFeeAnchor(txid) => CpfpError::NoFeeAnchor(txid),
209				 CpfpInternalError::Signer(e) => CpfpError::SigningError(e.to_string()),
210			 })
211	}
212
213	async fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
214		self.apply_unconfirmed_txs([(tx.clone(), timestamp_secs())]);
215		trace!("Unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
216		Ok(())
217	}
218}
219
220/// A basic wrapper around the bdk wallet to showcase
221/// how to use bark with an external onchain wallet.
222///
223/// Note: BDK wallet already implements all the traits
224/// to be used as an onboard and exit wallet, so that
225/// wrapper only needs to proxy the methods.
226pub struct OnchainWallet {
227	pub inner: BdkWallet,
228	db: Arc<dyn BarkPersister>,
229}
230
231impl Deref for OnchainWallet {
232	type Target = BdkWallet;
233
234	fn deref(&self) -> &Self::Target {
235		&self.inner
236	}
237}
238
239impl DerefMut for OnchainWallet {
240	fn deref_mut(&mut self) -> &mut Self::Target {
241		&mut self.inner
242	}
243}
244
245impl OnchainWallet {
246	pub async fn load_or_create(network: Network, seed: [u8; 64], db: Arc<dyn BarkPersister>) -> anyhow::Result<Self> {
247		let xpriv = bip32::Xpriv::new_master(network, &seed).expect("valid seed");
248		let desc = bdk_wallet::template::Bip86(xpriv, KeychainKind::External);
249
250		let changeset = db.initialize_bdk_wallet().await.context("error reading bdk wallet state")?;
251		let wallet_opt = bdk_wallet::Wallet::load()
252			.descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
253			.extract_keys()
254			.check_network(network)
255			.load_wallet_no_persist(changeset)?;
256
257		let wallet = match wallet_opt {
258			Some(wallet) => wallet,
259			None => bdk_wallet::Wallet::create_single(desc)
260				.network(network)
261				.create_wallet_no_persist()?,
262		};
263
264		Ok(Self { inner: wallet, db })
265	}
266}
267
268#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
269#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
270impl MakeCpfp for OnchainWallet {
271	fn make_signed_p2a_cpfp(
272		&mut self,
273		tx: &Transaction,
274		fees: MakeCpfpFees,
275	) -> Result<Transaction, CpfpError> {
276		MakeCpfp::make_signed_p2a_cpfp(&mut self.inner, tx, fees)
277	}
278
279	async fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
280		self.inner.store_signed_p2a_cpfp(tx).await?;
281		self.persist().await
282			.map_err(|e| CpfpError::StoreError(e.to_string()))
283	}
284}
285
286#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
287#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
288impl SignPsbt for OnchainWallet {
289	async fn finish_tx(&mut self, psbt: Psbt) -> anyhow::Result<Transaction> {
290		let tx = self.inner.finish_tx(psbt).await?;
291		self.persist().await?;
292		Ok(tx)
293	}
294}
295
296#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
297#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
298impl ChainSync for OnchainWallet {
299	async fn sync(&mut self, chain: &ChainSource) -> anyhow::Result<()> {
300		debug!("Starting wallet sync...");
301		debug!("Starting balance: {}", self.inner.balance());
302		trace!("Starting unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
303
304		match chain.inner() {
305			ChainSourceClient::Bitcoind(bitcoind) => {
306				let prev_tip = self.inner.latest_checkpoint();
307				self.inner_sync_bitcoind(bitcoind, prev_tip).await?;
308			},
309			ChainSourceClient::Esplora(client) => {
310				debug!("Syncing with esplora...");
311				let request = self.inner.start_sync_with_revealed_spks()
312					.outpoints(self.list_unspent().iter().map(|o| o.outpoint).collect::<Vec<_>>())
313					.txids(self.inner.transactions().map(|tx| tx.tx_node.txid).collect::<Vec<_>>());
314
315				let update = client.sync(request, PARALLEL_REQS).await?;
316				self.inner.apply_update(update)?;
317				self.persist().await?;
318				debug!("Finished syncing with esplora");
319			},
320		}
321
322		debug!("Current balance: {}", self.inner.balance());
323		trace!("Current unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
324		self.rebroadcast_txs(chain, timestamp_secs()).await?;
325
326		Ok(())
327	}
328}
329
330impl OnchainWallet {
331	pub fn balance(&self) -> Balance {
332		self.inner.balance()
333	}
334
335	pub fn list_unspent(&self) -> Vec<LocalOutput> {
336		self.inner.list_unspent().collect()
337	}
338
339	pub fn list_transactions(&self) -> Vec<Arc<Transaction>> {
340		self.inner.transactions().map(|tx| tx.tx_node.tx).collect()
341	}
342
343	pub async fn address(&mut self) -> anyhow::Result<Address> {
344		let ret = self.inner.reveal_next_address(bdk_wallet::KeychainKind::External).address;
345		self.persist().await?;
346		Ok(ret)
347	}
348
349	pub fn utxos(&self) -> Vec<Utxo> {
350		self.list_unspent().into_iter().map(|o| Utxo::Local(o.into())).collect()
351	}
352
353	pub async fn send(&mut self, chain: &ChainSource, dest: Address, amount: Amount, fee_rate: FeeRate
354	)	-> anyhow::Result<Txid> {
355		let psbt = self.prepare_tx(&[(dest, amount)], fee_rate)?;
356		let tx = self.finish_tx(psbt).await?;
357		chain.broadcast_tx(&tx).await?;
358		Ok(tx.compute_txid())
359	}
360
361	pub async fn send_many(
362		&mut self,
363		chain: &ChainSource,
364		destinations: &[(Address, Amount)],
365		fee_rate: FeeRate,
366	) -> anyhow::Result<Txid> {
367		let pbst = self.prepare_tx(destinations, fee_rate)?;
368		let tx = self.finish_tx(pbst).await?;
369		chain.broadcast_tx(&tx).await?;
370		Ok(tx.compute_txid())
371	}
372
373
374	pub async fn drain(
375		&mut self,
376		chain: &ChainSource,
377		destination: Address,
378		fee_rate: FeeRate,
379	) -> anyhow::Result<Txid> {
380		let psbt = self.prepare_drain_tx(destination, fee_rate)?;
381		let tx = self.finish_tx(psbt).await?;
382		chain.broadcast_tx(&tx).await?;
383		Ok(tx.compute_txid())
384	}
385
386	pub fn build_tx(&mut self) -> TxBuilder<'_, DefaultCoinSelectionAlgorithm> {
387		self.inner.build_tx()
388	}
389
390	async fn inner_sync_bitcoind(
391		&mut self,
392		bitcoind: &bitcoin_ext::rpc::Client,
393		prev_tip: CheckPoint,
394	) -> anyhow::Result<()> {
395		debug!("Syncing with bitcoind, starting at block height {}...", prev_tip.height());
396		let mut emitter = bdk_bitcoind_rpc::Emitter::new(
397			bitcoind, prev_tip.clone(), prev_tip.height(), self.unconfirmed_txs()
398		);
399		let mut count = 0;
400		while let Some(em) = emitter.next_block()? {
401			self.inner.apply_block_connected_to(
402				&em.block, em.block_height(), em.connected_to(),
403			)?;
404			count += 1;
405
406			if count % 10_000 == 0 {
407				self.persist().await?;
408				info!("Synced until block height {}", em.block_height());
409			}
410		}
411
412		let mempool = emitter.mempool()?;
413		self.inner.apply_evicted_txs(mempool.evicted);
414		self.inner.apply_unconfirmed_txs(mempool.update);
415		self.persist().await?;
416		debug!("Finished syncing with bitcoind");
417
418		Ok(())
419	}
420
421	async fn rebroadcast_txs(&mut self, chain: &ChainSource, sync_start: u64) -> anyhow::Result<Amount> {
422		let balance = self.inner.balance();
423
424		// Ultimately, let's try to rebroadcast all our unconfirmed txs.
425		let transactions = self.inner.transactions().filter(|tx| {
426			if let ChainPosition::Unconfirmed { last_seen, .. } = tx.chain_position {
427				match last_seen {
428					Some(last_seen) => last_seen < sync_start,
429					None => true,
430				}
431			} else {
432				false
433			}
434		}).collect::<Vec<_>>();
435
436		for tx in transactions {
437			if let Err(e) = chain.broadcast_tx(&tx.tx_node.tx).await {
438				warn!("Error broadcasting tx {}: {}", tx.tx_node.txid, e);
439			}
440		}
441
442		Ok(balance.total())
443	}
444
445	pub async fn initial_wallet_scan(
446		&mut self,
447		chain: &ChainSource,
448		start_height: Option<BlockHeight>,
449	) -> anyhow::Result<Amount> {
450		info!("Starting initial wallet sync...");
451		debug!("Starting balance: {}", self.inner.balance());
452
453		match chain.inner() {
454			ChainSourceClient::Bitcoind(bitcoind) => {
455				// Make sure we include the given start_height in the scan
456				let height = start_height.unwrap_or(GENESIS_HEIGHT).saturating_sub(1);
457				let block_hash = bitcoind.get_block_hash(height as u64)?;
458				self.inner.set_checkpoint(height, block_hash);
459				self.inner_sync_bitcoind(bitcoind, self.inner.latest_checkpoint()).await?;
460			},
461			// Esplora can't do a full scan from a given block height, so we can ignore start_height
462			ChainSourceClient::Esplora(client) => {
463				debug!("Starting full scan with esplora...");
464				let request = self.inner.start_full_scan();
465				let update = client.full_scan(request, STOP_GAP, PARALLEL_REQS).await?;
466				self.inner.apply_update(update)?;
467				self.persist().await?;
468				debug!("Finished scanning with esplora");
469			},
470		}
471
472		debug!("Current balance: {}", self.inner.balance());
473		self.rebroadcast_txs(chain, timestamp_secs()).await
474	}
475
476
477	async fn persist(&mut self) -> anyhow::Result<()> {
478		if let Some(stage) = self.inner.staged() {
479			self.db.store_bdk_wallet_changeset(&*stage).await?;
480			let _ = self.inner.take_staged();
481		}
482		Ok(())
483	}
484}