1use std::ops::{Deref, DerefMut};
2use std::sync::Arc;
3
4use anyhow::Context;
5use ark::time::timestamp_secs;
6use bdk_esplora::EsploraAsyncExt;
7use bdk_wallet::chain::{ChainPosition, CheckPoint};
8use bdk_wallet::Wallet as BdkWallet;
9use bdk_wallet::coin_selection::DefaultCoinSelectionAlgorithm;
10use bdk_wallet::{Balance, KeychainKind, LocalOutput, TxBuilder, TxOrdering};
11use bitcoin::{
12 Address, Amount, FeeRate, Network, OutPoint, Psbt, Sequence, Transaction, TxOut, Txid, Weight, bip32, psbt
13};
14use log::{debug, error, info, trace, warn};
15
16use ark::vtxo::policy::signing::VtxoSigner;
17use bitcoin_ext::{BlockHeight, BlockRef};
18use bitcoin_ext::bdk::{CpfpInternalError, WalletExt};
19use bitcoin_ext::cpfp::CpfpError;
20use bitcoin_ext::rpc::RpcApi;
21
22use crate::chain::{ChainSource, ChainSourceClient};
23use crate::exit::{ExitVtxo, ExitState};
24use crate::onchain::{
25 ChainSync, GetBalance, GetSpendingTx, GetWalletTx, LocalUtxo,
26 MakeCpfp, MakeCpfpFees, PreparePsbt, SignPsbt, Utxo
27};
28use crate::persist::BarkPersister;
29use crate::psbtext::PsbtInputExt;
30use crate::Wallet;
31
/// Stop gap handed to the Esplora client when performing a full scan.
const STOP_GAP: usize = 50;
/// Number of parallel requests handed to the Esplora client for syncs and full scans.
const PARALLEL_REQS: usize = 4;
/// Start height used for the initial wallet scan when the caller does not provide one.
const GENESIS_HEIGHT: u32 = 0;
35
36impl From<LocalOutput> for LocalUtxo {
37 fn from(value: LocalOutput) -> Self {
38 LocalUtxo {
39 outpoint: value.outpoint,
40 amount: value.txout.value,
41 confirmation_height: value.chain_position.confirmation_height_upper_bound(),
42 }
43 }
44}
45
/// Extension trait for transaction builders that adds exit-claim inputs.
///
/// On `wasm32` targets the `async_trait` attribute is applied with `?Send`.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait TxBuilderExt: Send + Sync {
    /// Adds the given exit outputs as inputs to the transaction being built.
    ///
    /// `wallet` is the wallet that owns the exits; `exit_outputs` are the
    /// exits to add.
    async fn add_exit_claim_inputs(
        &mut self,
        wallet: &Wallet,
        exit_outputs: &[&ExitVtxo],
    ) -> anyhow::Result<()>;
}
59
60#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
61#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
62impl<Cs: Send + Sync> TxBuilderExt for TxBuilder<'_, Cs> {
63 async fn add_exit_claim_inputs(
64 &mut self,
65 wallet: &Wallet,
66 exit_outputs: &[&ExitVtxo],
67 ) -> anyhow::Result<()> {
68 self.version(2);
69
70 for input in exit_outputs {
71 if !matches!(input.state(), ExitState::Claimable(..)) {
72 bail!("VTXO exit is not spendable");
73 }
74
75 let vtxo = wallet.db.get_wallet_vtxo(input.id()).await?
76 .context(format!("Unable to load VTXO for exit: {}", input.id()))?;
77 let mut psbt_in = psbt::Input::default();
78 psbt_in.set_exit_claim_input(&vtxo);
79 psbt_in.witness_utxo = Some(TxOut {
80 script_pubkey: vtxo.output_script_pubkey(),
81 value: vtxo.amount(),
82 });
83
84 let clause = wallet.find_signable_clause(&vtxo).await
85 .context("Cannot sign vtxo")?;
86
87 let witness_weight = {
88 let witness_size = clause.witness_size(&vtxo);
89 Weight::from_witness_data_size(witness_size as u64)
90 };
91
92 self.add_foreign_utxo_with_sequence(
93 vtxo.point(),
94 psbt_in,
95 witness_weight,
96 clause.sequence().unwrap_or(Sequence::ZERO),
97 ).expect("error adding foreign utxo for claim input");
98 }
99
100 Ok(())
101 }
102}
103
104impl <W: Deref<Target = BdkWallet>> GetBalance for W {
105 fn get_balance(&self) -> Amount {
106 self.deref().balance().total()
107 }
108}
109
110#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
111#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
112impl SignPsbt for BdkWallet {
113 async fn finish_tx(&mut self, mut psbt: Psbt) -> anyhow::Result<Transaction> {
114 #[allow(deprecated)]
115 let opts = bdk_wallet::SignOptions {
116 trust_witness_utxo: true,
117 ..Default::default()
118 };
119
120 let finalized = self.sign(&mut psbt, opts).context("signing error")?;
121 assert!(finalized);
122 let tx = psbt.extract_tx()?;
123 self.apply_unconfirmed_txs([(tx.clone(), timestamp_secs())]);
124 Ok(tx)
125 }
126}
127
128impl <W: Deref<Target = BdkWallet>> GetWalletTx for W {
129 fn get_wallet_tx(&self, txid: Txid) -> Option<Arc<Transaction>> {
134 self.deref().get_tx(txid).map(|tx| tx.tx_node.tx)
135 }
136
137 fn get_wallet_tx_confirmed_block(&self, txid: Txid) -> anyhow::Result<Option<BlockRef>> {
138 match self.deref().get_tx(txid) {
139 Some(tx) => match tx.chain_position {
140 ChainPosition::Confirmed { anchor, .. } => Ok(Some(anchor.block_id.into())),
141 ChainPosition::Unconfirmed { .. } => Ok(None),
142 },
143 None => Err(anyhow!("Tx {} does not exist in the wallet", txid)),
144 }
145 }
146}
147
148impl <W: DerefMut<Target = BdkWallet>> PreparePsbt for W {
149 fn prepare_tx(
150 &mut self,
151 destinations: &[(Address, Amount)],
152 fee_rate: FeeRate,
153 ) -> anyhow::Result<Psbt> {
154 let mut b = self.deref_mut().build_tx();
155 b.ordering(TxOrdering::Untouched);
156 for (dest, amount) in destinations {
157 b.add_recipient(dest.script_pubkey(), *amount);
158 }
159 b.fee_rate(fee_rate);
160 b.finish().context("error building tx")
161 }
162
163 fn prepare_drain_tx(
164 &mut self,
165 destination: Address,
166 fee_rate: FeeRate,
167 ) -> anyhow::Result<Psbt> {
168 let mut b = self.deref_mut().build_tx();
169 b.drain_to(destination.script_pubkey());
170 b.fee_rate(fee_rate);
171 b.drain_wallet();
172 b.finish().context("error building tx")
173 }
174}
175
176impl <W: Deref<Target = BdkWallet>> GetSpendingTx for W {
177 fn get_spending_tx(&self, outpoint: OutPoint) -> Option<Arc<Transaction>> {
178 for transaction in self.deref().transactions() {
179 if transaction.tx_node.tx.input.iter().any(|i| i.previous_output == outpoint) {
180 return Some(transaction.tx_node.tx);
181 }
182 }
183 None
184 }
185}
186
187#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
188#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
189impl MakeCpfp for BdkWallet {
190 fn make_signed_p2a_cpfp(
191 &mut self,
192 tx: &Transaction,
193 fees: MakeCpfpFees,
194 ) -> Result<Transaction, CpfpError> {
195 WalletExt::make_signed_p2a_cpfp(self, tx, fees)
196 .inspect_err(|e| error!("Error creating signed P2A CPFP: {}", e))
197 .map_err(|e| match e {
198 CpfpInternalError::General(s) => CpfpError::InternalError(s),
199 CpfpInternalError::Create(e) => CpfpError::CreateError(e.to_string()),
200 CpfpInternalError::Extract(e) => CpfpError::FinalizeError(e.to_string()),
201 CpfpInternalError::Fee() => CpfpError::InternalError(e.to_string()),
202 CpfpInternalError::FinalizeError(s) => CpfpError::FinalizeError(s),
203 CpfpInternalError::InsufficientConfirmedFunds(f) => {
204 CpfpError::InsufficientConfirmedFunds {
205 needed: f.needed, available: f.available,
206 }
207 },
208 CpfpInternalError::NoFeeAnchor(txid) => CpfpError::NoFeeAnchor(txid),
209 CpfpInternalError::Signer(e) => CpfpError::SigningError(e.to_string()),
210 })
211 }
212
213 async fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
214 self.apply_unconfirmed_txs([(tx.clone(), timestamp_secs())]);
215 trace!("Unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
216 Ok(())
217 }
218}
219
/// On-chain wallet that pairs a BDK wallet with the persister used to store
/// the wallet's staged changes.
pub struct OnchainWallet {
    /// The wrapped BDK wallet.
    pub inner: BdkWallet,
    /// Persister that staged BDK wallet changesets are written to.
    db: Arc<dyn BarkPersister>,
}
230
231impl Deref for OnchainWallet {
232 type Target = BdkWallet;
233
234 fn deref(&self) -> &Self::Target {
235 &self.inner
236 }
237}
238
239impl DerefMut for OnchainWallet {
240 fn deref_mut(&mut self) -> &mut Self::Target {
241 &mut self.inner
242 }
243}
244
245impl OnchainWallet {
246 pub async fn load_or_create(network: Network, seed: [u8; 64], db: Arc<dyn BarkPersister>) -> anyhow::Result<Self> {
247 let xpriv = bip32::Xpriv::new_master(network, &seed).expect("valid seed");
248 let desc = bdk_wallet::template::Bip86(xpriv, KeychainKind::External);
249
250 let changeset = db.initialize_bdk_wallet().await.context("error reading bdk wallet state")?;
251 let wallet_opt = bdk_wallet::Wallet::load()
252 .descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
253 .extract_keys()
254 .check_network(network)
255 .load_wallet_no_persist(changeset)?;
256
257 let wallet = match wallet_opt {
258 Some(wallet) => wallet,
259 None => bdk_wallet::Wallet::create_single(desc)
260 .network(network)
261 .create_wallet_no_persist()?,
262 };
263
264 Ok(Self { inner: wallet, db })
265 }
266}
267
268#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
269#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
270impl MakeCpfp for OnchainWallet {
271 fn make_signed_p2a_cpfp(
272 &mut self,
273 tx: &Transaction,
274 fees: MakeCpfpFees,
275 ) -> Result<Transaction, CpfpError> {
276 MakeCpfp::make_signed_p2a_cpfp(&mut self.inner, tx, fees)
277 }
278
279 async fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
280 self.inner.store_signed_p2a_cpfp(tx).await?;
281 self.persist().await
282 .map_err(|e| CpfpError::StoreError(e.to_string()))
283 }
284}
285
286#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
287#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
288impl SignPsbt for OnchainWallet {
289 async fn finish_tx(&mut self, psbt: Psbt) -> anyhow::Result<Transaction> {
290 let tx = self.inner.finish_tx(psbt).await?;
291 self.persist().await?;
292 Ok(tx)
293 }
294}
295
296#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
297#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
298impl ChainSync for OnchainWallet {
299 async fn sync(&mut self, chain: &ChainSource) -> anyhow::Result<()> {
300 debug!("Starting wallet sync...");
301 debug!("Starting balance: {}", self.inner.balance());
302 trace!("Starting unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
303
304 match chain.inner() {
305 ChainSourceClient::Bitcoind(bitcoind) => {
306 let prev_tip = self.inner.latest_checkpoint();
307 self.inner_sync_bitcoind(bitcoind, prev_tip).await?;
308 },
309 ChainSourceClient::Esplora(client) => {
310 debug!("Syncing with esplora...");
311 let request = self.inner.start_sync_with_revealed_spks()
312 .outpoints(self.list_unspent().iter().map(|o| o.outpoint).collect::<Vec<_>>())
313 .txids(self.inner.transactions().map(|tx| tx.tx_node.txid).collect::<Vec<_>>());
314
315 let update = client.sync(request, PARALLEL_REQS).await?;
316 self.inner.apply_update(update)?;
317 self.persist().await?;
318 debug!("Finished syncing with esplora");
319 },
320 }
321
322 debug!("Current balance: {}", self.inner.balance());
323 trace!("Current unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
324 self.rebroadcast_txs(chain, timestamp_secs()).await?;
325
326 Ok(())
327 }
328}
329
330impl OnchainWallet {
331 pub fn balance(&self) -> Balance {
332 self.inner.balance()
333 }
334
335 pub fn list_unspent(&self) -> Vec<LocalOutput> {
336 self.inner.list_unspent().collect()
337 }
338
339 pub fn list_transactions(&self) -> Vec<Arc<Transaction>> {
340 self.inner.transactions().map(|tx| tx.tx_node.tx).collect()
341 }
342
343 pub async fn address(&mut self) -> anyhow::Result<Address> {
344 let ret = self.inner.reveal_next_address(bdk_wallet::KeychainKind::External).address;
345 self.persist().await?;
346 Ok(ret)
347 }
348
349 pub fn utxos(&self) -> Vec<Utxo> {
350 self.list_unspent().into_iter().map(|o| Utxo::Local(o.into())).collect()
351 }
352
353 pub async fn send(&mut self, chain: &ChainSource, dest: Address, amount: Amount, fee_rate: FeeRate
354 ) -> anyhow::Result<Txid> {
355 let psbt = self.prepare_tx(&[(dest, amount)], fee_rate)?;
356 let tx = self.finish_tx(psbt).await?;
357 chain.broadcast_tx(&tx).await?;
358 Ok(tx.compute_txid())
359 }
360
361 pub async fn send_many(
362 &mut self,
363 chain: &ChainSource,
364 destinations: &[(Address, Amount)],
365 fee_rate: FeeRate,
366 ) -> anyhow::Result<Txid> {
367 let pbst = self.prepare_tx(destinations, fee_rate)?;
368 let tx = self.finish_tx(pbst).await?;
369 chain.broadcast_tx(&tx).await?;
370 Ok(tx.compute_txid())
371 }
372
373
374 pub async fn drain(
375 &mut self,
376 chain: &ChainSource,
377 destination: Address,
378 fee_rate: FeeRate,
379 ) -> anyhow::Result<Txid> {
380 let psbt = self.prepare_drain_tx(destination, fee_rate)?;
381 let tx = self.finish_tx(psbt).await?;
382 chain.broadcast_tx(&tx).await?;
383 Ok(tx.compute_txid())
384 }
385
386 pub fn build_tx(&mut self) -> TxBuilder<'_, DefaultCoinSelectionAlgorithm> {
387 self.inner.build_tx()
388 }
389
390 async fn inner_sync_bitcoind(
391 &mut self,
392 bitcoind: &bitcoin_ext::rpc::Client,
393 prev_tip: CheckPoint,
394 ) -> anyhow::Result<()> {
395 debug!("Syncing with bitcoind, starting at block height {}...", prev_tip.height());
396 let mut emitter = bdk_bitcoind_rpc::Emitter::new(
397 bitcoind, prev_tip.clone(), prev_tip.height(), self.unconfirmed_txs()
398 );
399 let mut count = 0;
400 while let Some(em) = emitter.next_block()? {
401 self.inner.apply_block_connected_to(
402 &em.block, em.block_height(), em.connected_to(),
403 )?;
404 count += 1;
405
406 if count % 10_000 == 0 {
407 self.persist().await?;
408 info!("Synced until block height {}", em.block_height());
409 }
410 }
411
412 let mempool = emitter.mempool()?;
413 self.inner.apply_evicted_txs(mempool.evicted);
414 self.inner.apply_unconfirmed_txs(mempool.update);
415 self.persist().await?;
416 debug!("Finished syncing with bitcoind");
417
418 Ok(())
419 }
420
421 async fn rebroadcast_txs(&mut self, chain: &ChainSource, sync_start: u64) -> anyhow::Result<Amount> {
422 let balance = self.inner.balance();
423
424 let transactions = self.inner.transactions().filter(|tx| {
426 if let ChainPosition::Unconfirmed { last_seen, .. } = tx.chain_position {
427 match last_seen {
428 Some(last_seen) => last_seen < sync_start,
429 None => true,
430 }
431 } else {
432 false
433 }
434 }).collect::<Vec<_>>();
435
436 for tx in transactions {
437 if let Err(e) = chain.broadcast_tx(&tx.tx_node.tx).await {
438 warn!("Error broadcasting tx {}: {}", tx.tx_node.txid, e);
439 }
440 }
441
442 Ok(balance.total())
443 }
444
445 pub async fn initial_wallet_scan(
446 &mut self,
447 chain: &ChainSource,
448 start_height: Option<BlockHeight>,
449 ) -> anyhow::Result<Amount> {
450 info!("Starting initial wallet sync...");
451 debug!("Starting balance: {}", self.inner.balance());
452
453 match chain.inner() {
454 ChainSourceClient::Bitcoind(bitcoind) => {
455 let height = start_height.unwrap_or(GENESIS_HEIGHT).saturating_sub(1);
457 let block_hash = bitcoind.get_block_hash(height as u64)?;
458 self.inner.set_checkpoint(height, block_hash);
459 self.inner_sync_bitcoind(bitcoind, self.inner.latest_checkpoint()).await?;
460 },
461 ChainSourceClient::Esplora(client) => {
463 debug!("Starting full scan with esplora...");
464 let request = self.inner.start_full_scan();
465 let update = client.full_scan(request, STOP_GAP, PARALLEL_REQS).await?;
466 self.inner.apply_update(update)?;
467 self.persist().await?;
468 debug!("Finished scanning with esplora");
469 },
470 }
471
472 debug!("Current balance: {}", self.inner.balance());
473 self.rebroadcast_txs(chain, timestamp_secs()).await
474 }
475
476
477 async fn persist(&mut self) -> anyhow::Result<()> {
478 if let Some(stage) = self.inner.staged() {
479 self.db.store_bdk_wallet_changeset(&*stage).await?;
480 let _ = self.inner.take_staged();
481 }
482 Ok(())
483 }
484}