bark/onchain/
bdk.rs

1use std::ops::{Deref, DerefMut};
2use std::time::{SystemTime, UNIX_EPOCH};
3use std::sync::Arc;
4
5use anyhow::Context;
6use bdk_esplora::EsploraAsyncExt;
7use bdk_wallet::chain::{ChainPosition, CheckPoint};
8use bdk_wallet::Wallet as BdkWallet;
9use bdk_wallet::coin_selection::DefaultCoinSelectionAlgorithm;
10use bdk_wallet::{Balance, KeychainKind, LocalOutput, TxBuilder, TxOrdering};
11use bitcoin::{
12	Address, Amount, FeeRate, Network, OutPoint, Psbt, Sequence, Transaction, TxOut, Txid, Weight, bip32, psbt
13};
14use log::{debug, error, info, trace, warn};
15
16use ark::vtxo::policy::signing::VtxoSigner;
17use bitcoin_ext::{BlockHeight, BlockRef};
18use bitcoin_ext::bdk::{CpfpInternalError, WalletExt};
19use bitcoin_ext::cpfp::CpfpError;
20use bitcoin_ext::rpc::RpcApi;
21
22use crate::chain::{ChainSource, ChainSourceClient};
23use crate::exit::{ExitVtxo, ExitState};
24use crate::onchain::{
25	ChainSync, GetBalance, GetSpendingTx, GetWalletTx, LocalUtxo,
26	MakeCpfp, MakeCpfpFees, PreparePsbt, SignPsbt, Utxo
27};
28use crate::persist::BarkPersister;
29use crate::psbtext::PsbtInputExt;
30use crate::Wallet;
31
/// Stop gap handed to the esplora client when running a full scan.
const STOP_GAP: usize = 50;
/// Number of parallel requests handed to the esplora client for syncs and full scans.
const PARALLEL_REQS: usize = 4;
/// Start height used for the initial bitcoind scan when the caller does not provide one.
const GENESIS_HEIGHT: u32 = 0;
35
36impl From<LocalOutput> for LocalUtxo {
37	fn from(value: LocalOutput) -> Self {
38		LocalUtxo {
39			outpoint: value.outpoint,
40			amount: value.txout.value,
41			confirmation_height: value.chain_position.confirmation_height_upper_bound(),
42		}
43	}
44}
45
/// Trait extension for [TxBuilder] to add exit outputs
///
/// When used, the resulting PSBT should be signed using
/// [crate::exit::Exit::sign_exit_claim_inputs].
#[async_trait]
pub trait TxBuilderExt: Send + Sync {
	/// Adds the exits in `exit_outputs` as inputs of the transaction being built.
	///
	/// `wallet` gives access to the stored VTXOs backing the exits. The
	/// implementation in this file returns an error when an exit is not
	/// claimable or when its VTXO cannot be loaded.
	async fn add_exit_claim_inputs(
		&mut self,
		wallet: &Wallet,
		exit_outputs: &[&ExitVtxo],
	) -> anyhow::Result<()>;
}
58
59#[async_trait]
60impl<Cs: Send + Sync> TxBuilderExt for TxBuilder<'_, Cs> {
61	async fn add_exit_claim_inputs(
62		&mut self,
63		wallet: &Wallet,
64		exit_outputs: &[&ExitVtxo],
65	) -> anyhow::Result<()> {
66		self.version(2);
67
68		for input in exit_outputs {
69			if !matches!(input.state(), ExitState::Claimable(..)) {
70				bail!("VTXO exit is not spendable");
71			}
72
73			let vtxo = wallet.db.get_wallet_vtxo(input.id()).await?
74				.context(format!("Unable to load VTXO for exit: {}", input.id()))?;
75			let mut psbt_in = psbt::Input::default();
76			psbt_in.set_exit_claim_input(&vtxo);
77			psbt_in.witness_utxo = Some(TxOut {
78				script_pubkey: vtxo.output_script_pubkey(),
79				value: vtxo.amount(),
80			});
81
82			let clause = wallet.find_signable_clause(&vtxo).await
83				.context("Cannot sign vtxo")?;
84
85			let witness_weight = {
86				let witness_size = clause.witness_size(&vtxo);
87				Weight::from_witness_data_size(witness_size as u64)
88			};
89
90			self.add_foreign_utxo_with_sequence(
91				vtxo.point(),
92				psbt_in,
93				witness_weight,
94				clause.sequence().unwrap_or(Sequence::ZERO),
95			).expect("error adding foreign utxo for claim input");
96		}
97
98		Ok(())
99	}
100}
101
102impl <W: Deref<Target = BdkWallet>> GetBalance for W {
103	fn get_balance(&self) -> Amount {
104		self.deref().balance().total()
105	}
106}
107
108#[async_trait]
109impl SignPsbt for BdkWallet {
110	async fn finish_tx(&mut self, mut psbt: Psbt) -> anyhow::Result<Transaction> {
111		#[allow(deprecated)]
112		let opts = bdk_wallet::SignOptions {
113			trust_witness_utxo: true,
114			..Default::default()
115		};
116
117		let finalized = self.sign(&mut psbt, opts).context("signing error")?;
118		assert!(finalized);
119		let tx = psbt.extract_tx()?;
120		let unix = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
121		self.apply_unconfirmed_txs([(tx.clone(), unix)]);
122		Ok(tx)
123	}
124}
125
126impl <W: Deref<Target = BdkWallet>> GetWalletTx for W {
127	/// Retrieves a transaction from the wallet
128	///
129	/// This method will only check the database and will not
130	/// use a chain-source to find the transaction
131	fn get_wallet_tx(&self, txid: Txid) -> Option<Arc<Transaction>> {
132		self.deref().get_tx(txid).map(|tx| tx.tx_node.tx)
133	}
134
135	fn get_wallet_tx_confirmed_block(&self, txid: Txid) -> anyhow::Result<Option<BlockRef>> {
136		match self.deref().get_tx(txid) {
137			Some(tx) => match tx.chain_position {
138				ChainPosition::Confirmed { anchor, .. } => Ok(Some(anchor.block_id.into())),
139				ChainPosition::Unconfirmed { .. } => Ok(None),
140			},
141			None => Err(anyhow!("Tx {} does not exist in the wallet", txid)),
142		}
143	}
144}
145
146impl <W: DerefMut<Target = BdkWallet>> PreparePsbt for W {
147	fn prepare_tx(
148		&mut self,
149		destinations: &[(Address, Amount)],
150		fee_rate: FeeRate,
151	) -> anyhow::Result<Psbt> {
152		let mut b = self.deref_mut().build_tx();
153		b.ordering(TxOrdering::Untouched);
154		for (dest, amount) in destinations {
155			b.add_recipient(dest.script_pubkey(), *amount);
156		}
157		b.fee_rate(fee_rate);
158		b.finish().context("error building tx")
159	}
160
161	fn prepare_drain_tx(
162		&mut self,
163		destination: Address,
164		fee_rate: FeeRate,
165	) -> anyhow::Result<Psbt> {
166		let mut b = self.deref_mut().build_tx();
167		b.drain_to(destination.script_pubkey());
168		b.fee_rate(fee_rate);
169		b.drain_wallet();
170		b.finish().context("error building tx")
171	}
172}
173
174impl <W: Deref<Target = BdkWallet>> GetSpendingTx for W {
175	fn get_spending_tx(&self, outpoint: OutPoint) -> Option<Arc<Transaction>> {
176		for transaction in self.deref().transactions() {
177			if transaction.tx_node.tx.input.iter().any(|i| i.previous_output == outpoint) {
178				return Some(transaction.tx_node.tx);
179			}
180		}
181		None
182	}
183}
184
185#[async_trait]
186impl MakeCpfp for BdkWallet {
187	fn make_signed_p2a_cpfp(
188		&mut self,
189		tx: &Transaction,
190		fees: MakeCpfpFees,
191	) -> Result<Transaction, CpfpError> {
192		 WalletExt::make_signed_p2a_cpfp(self, tx, fees)
193			 .inspect_err(|e| error!("Error creating signed P2A CPFP: {}", e))
194			 .map_err(|e| match e {
195				 CpfpInternalError::General(s) => CpfpError::InternalError(s),
196				 CpfpInternalError::Create(e) => CpfpError::CreateError(e.to_string()),
197				 CpfpInternalError::Extract(e) => CpfpError::FinalizeError(e.to_string()),
198				 CpfpInternalError::Fee() => CpfpError::InternalError(e.to_string()),
199				 CpfpInternalError::FinalizeError(s) => CpfpError::FinalizeError(s),
200				 CpfpInternalError::InsufficientConfirmedFunds(f) => {
201					 CpfpError::InsufficientConfirmedFunds {
202						 needed: f.needed, available: f.available,
203					 }
204				 },
205				 CpfpInternalError::NoFeeAnchor(txid) => CpfpError::NoFeeAnchor(txid),
206				 CpfpInternalError::Signer(e) => CpfpError::SigningError(e.to_string()),
207			 })
208	}
209
210	async fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
211		let unix = SystemTime::now().duration_since(UNIX_EPOCH)
212			.map_err(|e| CpfpError::InternalError(
213				format!("Unable to calculate time since UNIX epoch: {}", e.to_string()))
214			)?.as_secs();
215		self.apply_unconfirmed_txs([(tx.clone(), unix)]);
216		trace!("Unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
217		Ok(())
218	}
219}
220
/// A basic wrapper around the BDK wallet to showcase
/// how to use bark with an external onchain wallet.
///
/// Note: the BDK wallet already implements all the traits
/// needed to be used as an onboard and exit wallet, so this
/// wrapper only needs to proxy the methods.
pub struct OnchainWallet {
	/// The wrapped BDK wallet.
	pub inner: BdkWallet,
	/// Persister that receives the wallet's staged changesets.
	db: Arc<dyn BarkPersister>,
}
231
/// Gives read access to the wrapped BDK wallet through the wrapper.
impl Deref for OnchainWallet {
	type Target = BdkWallet;

	/// Returns a shared reference to the inner BDK wallet.
	fn deref(&self) -> &Self::Target {
		&self.inner
	}
}
239
/// Gives mutable access to the wrapped BDK wallet through the wrapper.
impl DerefMut for OnchainWallet {
	/// Returns a mutable reference to the inner BDK wallet.
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.inner
	}
}
245
246impl OnchainWallet {
247	pub async fn load_or_create(network: Network, seed: [u8; 64], db: Arc<dyn BarkPersister>) -> anyhow::Result<Self> {
248		let xpriv = bip32::Xpriv::new_master(network, &seed).expect("valid seed");
249		let desc = bdk_wallet::template::Bip86(xpriv, KeychainKind::External);
250
251		let changeset = db.initialize_bdk_wallet().await.context("error reading bdk wallet state")?;
252		let wallet_opt = bdk_wallet::Wallet::load()
253			.descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
254			.extract_keys()
255			.check_network(network)
256			.load_wallet_no_persist(changeset)?;
257
258		let wallet = match wallet_opt {
259			Some(wallet) => wallet,
260			None => bdk_wallet::Wallet::create_single(desc)
261				.network(network)
262				.create_wallet_no_persist()?,
263		};
264
265		Ok(Self { inner: wallet, db })
266	}
267}
268
269#[async_trait]
270impl MakeCpfp for OnchainWallet {
271	fn make_signed_p2a_cpfp(
272		&mut self,
273		tx: &Transaction,
274		fees: MakeCpfpFees,
275	) -> Result<Transaction, CpfpError> {
276		MakeCpfp::make_signed_p2a_cpfp(&mut self.inner, tx, fees)
277	}
278
279	async fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
280		self.inner.store_signed_p2a_cpfp(tx).await?;
281		self.persist().await
282			.map_err(|e| CpfpError::StoreError(e.to_string()))
283	}
284}
285
286#[async_trait]
287impl SignPsbt for OnchainWallet {
288	async fn finish_tx(&mut self, psbt: Psbt) -> anyhow::Result<Transaction> {
289		let tx = self.inner.finish_tx(psbt).await?;
290		self.persist().await?;
291		Ok(tx)
292	}
293}
294
295#[async_trait]
296impl ChainSync for OnchainWallet {
297	async fn sync(&mut self, chain: &ChainSource) -> anyhow::Result<()> {
298		debug!("Starting wallet sync...");
299		debug!("Starting balance: {}", self.inner.balance());
300		trace!("Starting unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
301		let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("now").as_secs();
302
303		match chain.inner() {
304			ChainSourceClient::Bitcoind(bitcoind) => {
305				let prev_tip = self.inner.latest_checkpoint();
306				self.inner_sync_bitcoind(bitcoind, prev_tip).await?;
307			},
308			ChainSourceClient::Esplora(client) => {
309				debug!("Syncing with esplora...");
310				let request = self.inner.start_sync_with_revealed_spks()
311					.outpoints(self.list_unspent().iter().map(|o| o.outpoint).collect::<Vec<_>>())
312					.txids(self.inner.transactions().map(|tx| tx.tx_node.txid).collect::<Vec<_>>());
313
314				let update = client.sync(request, PARALLEL_REQS).await?;
315				self.inner.apply_update(update)?;
316				self.persist().await?;
317				debug!("Finished syncing with esplora");
318			},
319		}
320
321		debug!("Current balance: {}", self.inner.balance());
322		trace!("Current unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
323		self.rebroadcast_txs(chain, now).await?;
324
325		Ok(())
326	}
327}
328
329impl OnchainWallet {
330	pub fn balance(&self) -> Balance {
331		self.inner.balance()
332	}
333
334	pub fn list_unspent(&self) -> Vec<LocalOutput> {
335		self.inner.list_unspent().collect()
336	}
337
338	pub fn list_transactions(&self) -> Vec<Arc<Transaction>> {
339		self.inner.transactions().map(|tx| tx.tx_node.tx).collect()
340	}
341
342	pub async fn address(&mut self) -> anyhow::Result<Address> {
343		let ret = self.inner.reveal_next_address(bdk_wallet::KeychainKind::External).address;
344		self.persist().await?;
345		Ok(ret)
346	}
347
348	pub fn utxos(&self) -> Vec<Utxo> {
349		self.list_unspent().into_iter().map(|o| Utxo::Local(o.into())).collect()
350	}
351
352	pub async fn send(&mut self, chain: &ChainSource, dest: Address, amount: Amount, fee_rate: FeeRate
353	)	-> anyhow::Result<Txid> {
354		let psbt = self.prepare_tx(&[(dest, amount)], fee_rate)?;
355		let tx = self.finish_tx(psbt).await?;
356		chain.broadcast_tx(&tx).await?;
357		Ok(tx.compute_txid())
358	}
359
360	pub async fn send_many(
361		&mut self,
362		chain: &ChainSource,
363		destinations: &[(Address, Amount)],
364		fee_rate: FeeRate,
365	) -> anyhow::Result<Txid> {
366		let pbst = self.prepare_tx(destinations, fee_rate)?;
367		let tx = self.finish_tx(pbst).await?;
368		chain.broadcast_tx(&tx).await?;
369		Ok(tx.compute_txid())
370	}
371
372
373	pub async fn drain(
374		&mut self,
375		chain: &ChainSource,
376		destination: Address,
377		fee_rate: FeeRate,
378	) -> anyhow::Result<Txid> {
379		let psbt = self.prepare_drain_tx(destination, fee_rate)?;
380		let tx = self.finish_tx(psbt).await?;
381		chain.broadcast_tx(&tx).await?;
382		Ok(tx.compute_txid())
383	}
384
385	pub fn build_tx(&mut self) -> TxBuilder<'_, DefaultCoinSelectionAlgorithm> {
386		self.inner.build_tx()
387	}
388
389	async fn inner_sync_bitcoind(
390		&mut self,
391		bitcoind: &bitcoin_ext::rpc::Client,
392		prev_tip: CheckPoint,
393	) -> anyhow::Result<()> {
394		debug!("Syncing with bitcoind, starting at block height {}...", prev_tip.height());
395		let mut emitter = bdk_bitcoind_rpc::Emitter::new(
396			bitcoind, prev_tip.clone(), prev_tip.height(), self.unconfirmed_txs()
397		);
398		let mut count = 0;
399		while let Some(em) = emitter.next_block()? {
400			self.inner.apply_block_connected_to(
401				&em.block, em.block_height(), em.connected_to(),
402			)?;
403			count += 1;
404
405			if count % 10_000 == 0 {
406				self.persist().await?;
407				info!("Synced until block height {}", em.block_height());
408			}
409		}
410
411		let mempool = emitter.mempool()?;
412		self.inner.apply_evicted_txs(mempool.evicted);
413		self.inner.apply_unconfirmed_txs(mempool.update);
414		self.persist().await?;
415		debug!("Finished syncing with bitcoind");
416
417		Ok(())
418	}
419
420	async fn rebroadcast_txs(&mut self, chain: &ChainSource, sync_start: u64) -> anyhow::Result<Amount> {
421		let balance = self.inner.balance();
422
423		// Ultimately, let's try to rebroadcast all our unconfirmed txs.
424		let transactions = self.inner.transactions().filter(|tx| {
425			if let ChainPosition::Unconfirmed { last_seen, .. } = tx.chain_position {
426				match last_seen {
427					Some(last_seen) => last_seen < sync_start,
428					None => true,
429				}
430			} else {
431				false
432			}
433		}).collect::<Vec<_>>();
434
435		for tx in transactions {
436			if let Err(e) = chain.broadcast_tx(&tx.tx_node.tx).await {
437				warn!("Error broadcasting tx {}: {}", tx.tx_node.txid, e);
438			}
439		}
440
441		Ok(balance.total())
442	}
443
444	pub async fn initial_wallet_scan(
445		&mut self,
446		chain: &ChainSource,
447		start_height: Option<BlockHeight>,
448	) -> anyhow::Result<Amount> {
449		info!("Starting initial wallet sync...");
450		debug!("Starting balance: {}", self.inner.balance());
451		let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("now").as_secs();
452
453		match chain.inner() {
454			ChainSourceClient::Bitcoind(bitcoind) => {
455				// Make sure we include the given start_height in the scan
456				let height = start_height.unwrap_or(GENESIS_HEIGHT).saturating_sub(1);
457				let block_hash = bitcoind.get_block_hash(height as u64)?;
458				self.inner.set_checkpoint(height, block_hash);
459				self.inner_sync_bitcoind(bitcoind, self.inner.latest_checkpoint()).await?;
460			},
461			// Esplora can't do a full scan from a given block height, so we can ignore start_height
462			ChainSourceClient::Esplora(client) => {
463				debug!("Starting full scan with esplora...");
464				let request = self.inner.start_full_scan();
465				let update = client.full_scan(request, STOP_GAP, PARALLEL_REQS).await?;
466				self.inner.apply_update(update)?;
467				self.persist().await?;
468				debug!("Finished scanning with esplora");
469			},
470		}
471
472		debug!("Current balance: {}", self.inner.balance());
473		self.rebroadcast_txs(chain, now).await
474	}
475
476
477	async fn persist(&mut self) -> anyhow::Result<()> {
478		if let Some(stage) = self.inner.staged() {
479			self.db.store_bdk_wallet_changeset(&*stage).await?;
480			let _ = self.inner.take_staged();
481		}
482		Ok(())
483	}
484}