1use std::ops::{Deref, DerefMut};
2use std::time::{SystemTime, UNIX_EPOCH};
3use std::sync::Arc;
4
5use anyhow::Context;
6use bdk_esplora::EsploraAsyncExt;
7use bdk_wallet::chain::{ChainPosition, CheckPoint};
8use bdk_wallet::Wallet as BdkWallet;
9use bdk_wallet::coin_selection::DefaultCoinSelectionAlgorithm;
10use bdk_wallet::{Balance, KeychainKind, LocalOutput, TxBuilder, TxOrdering};
11use bitcoin::{
12 Address, Amount, FeeRate, Network, OutPoint, Psbt, Sequence, Transaction, TxOut, Txid, Weight, bip32, psbt
13};
14use log::{debug, error, info, trace, warn};
15
16use ark::vtxo::policy::signing::VtxoSigner;
17use bitcoin_ext::{BlockHeight, BlockRef};
18use bitcoin_ext::bdk::{CpfpInternalError, WalletExt};
19use bitcoin_ext::cpfp::CpfpError;
20use bitcoin_ext::rpc::RpcApi;
21
22use crate::chain::{ChainSource, ChainSourceClient};
23use crate::exit::{ExitVtxo, ExitState};
24use crate::onchain::{
25 ChainSync, GetBalance, GetSpendingTx, GetWalletTx, LocalUtxo,
26 MakeCpfp, MakeCpfpFees, PreparePsbt, SignPsbt, Utxo
27};
28use crate::persist::BarkPersister;
29use crate::psbtext::PsbtInputExt;
30use crate::Wallet;
31
32const STOP_GAP: usize = 50;
33const PARALLEL_REQS: usize = 4;
34const GENESIS_HEIGHT: u32 = 0;
35
36impl From<LocalOutput> for LocalUtxo {
37 fn from(value: LocalOutput) -> Self {
38 LocalUtxo {
39 outpoint: value.outpoint,
40 amount: value.txout.value,
41 confirmation_height: value.chain_position.confirmation_height_upper_bound(),
42 }
43 }
44}
45
46#[async_trait]
51pub trait TxBuilderExt: Send + Sync {
52 async fn add_exit_claim_inputs(
53 &mut self,
54 wallet: &Wallet,
55 exit_outputs: &[&ExitVtxo],
56 ) -> anyhow::Result<()>;
57}
58
59#[async_trait]
60impl<Cs: Send + Sync> TxBuilderExt for TxBuilder<'_, Cs> {
61 async fn add_exit_claim_inputs(
62 &mut self,
63 wallet: &Wallet,
64 exit_outputs: &[&ExitVtxo],
65 ) -> anyhow::Result<()> {
66 self.version(2);
67
68 for input in exit_outputs {
69 if !matches!(input.state(), ExitState::Claimable(..)) {
70 bail!("VTXO exit is not spendable");
71 }
72
73 let vtxo = wallet.db.get_wallet_vtxo(input.id()).await?
74 .context(format!("Unable to load VTXO for exit: {}", input.id()))?;
75 let mut psbt_in = psbt::Input::default();
76 psbt_in.set_exit_claim_input(&vtxo);
77 psbt_in.witness_utxo = Some(TxOut {
78 script_pubkey: vtxo.output_script_pubkey(),
79 value: vtxo.amount(),
80 });
81
82 let clause = wallet.find_signable_clause(&vtxo).await
83 .context("Cannot sign vtxo")?;
84
85 let witness_weight = {
86 let witness_size = clause.witness_size(&vtxo);
87 Weight::from_witness_data_size(witness_size as u64)
88 };
89
90 self.add_foreign_utxo_with_sequence(
91 vtxo.point(),
92 psbt_in,
93 witness_weight,
94 clause.sequence().unwrap_or(Sequence::ZERO),
95 ).expect("error adding foreign utxo for claim input");
96 }
97
98 Ok(())
99 }
100}
101
102impl <W: Deref<Target = BdkWallet>> GetBalance for W {
103 fn get_balance(&self) -> Amount {
104 self.deref().balance().total()
105 }
106}
107
108#[async_trait]
109impl SignPsbt for BdkWallet {
110 async fn finish_tx(&mut self, mut psbt: Psbt) -> anyhow::Result<Transaction> {
111 #[allow(deprecated)]
112 let opts = bdk_wallet::SignOptions {
113 trust_witness_utxo: true,
114 ..Default::default()
115 };
116
117 let finalized = self.sign(&mut psbt, opts).context("signing error")?;
118 assert!(finalized);
119 let tx = psbt.extract_tx()?;
120 let unix = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
121 self.apply_unconfirmed_txs([(tx.clone(), unix)]);
122 Ok(tx)
123 }
124}
125
126impl <W: Deref<Target = BdkWallet>> GetWalletTx for W {
127 fn get_wallet_tx(&self, txid: Txid) -> Option<Arc<Transaction>> {
132 self.deref().get_tx(txid).map(|tx| tx.tx_node.tx)
133 }
134
135 fn get_wallet_tx_confirmed_block(&self, txid: Txid) -> anyhow::Result<Option<BlockRef>> {
136 match self.deref().get_tx(txid) {
137 Some(tx) => match tx.chain_position {
138 ChainPosition::Confirmed { anchor, .. } => Ok(Some(anchor.block_id.into())),
139 ChainPosition::Unconfirmed { .. } => Ok(None),
140 },
141 None => Err(anyhow!("Tx {} does not exist in the wallet", txid)),
142 }
143 }
144}
145
146impl <W: DerefMut<Target = BdkWallet>> PreparePsbt for W {
147 fn prepare_tx(
148 &mut self,
149 destinations: &[(Address, Amount)],
150 fee_rate: FeeRate,
151 ) -> anyhow::Result<Psbt> {
152 let mut b = self.deref_mut().build_tx();
153 b.ordering(TxOrdering::Untouched);
154 for (dest, amount) in destinations {
155 b.add_recipient(dest.script_pubkey(), *amount);
156 }
157 b.fee_rate(fee_rate);
158 b.finish().context("error building tx")
159 }
160
161 fn prepare_drain_tx(
162 &mut self,
163 destination: Address,
164 fee_rate: FeeRate,
165 ) -> anyhow::Result<Psbt> {
166 let mut b = self.deref_mut().build_tx();
167 b.drain_to(destination.script_pubkey());
168 b.fee_rate(fee_rate);
169 b.drain_wallet();
170 b.finish().context("error building tx")
171 }
172}
173
174impl <W: Deref<Target = BdkWallet>> GetSpendingTx for W {
175 fn get_spending_tx(&self, outpoint: OutPoint) -> Option<Arc<Transaction>> {
176 for transaction in self.deref().transactions() {
177 if transaction.tx_node.tx.input.iter().any(|i| i.previous_output == outpoint) {
178 return Some(transaction.tx_node.tx);
179 }
180 }
181 None
182 }
183}
184
185#[async_trait]
186impl MakeCpfp for BdkWallet {
187 fn make_signed_p2a_cpfp(
188 &mut self,
189 tx: &Transaction,
190 fees: MakeCpfpFees,
191 ) -> Result<Transaction, CpfpError> {
192 WalletExt::make_signed_p2a_cpfp(self, tx, fees)
193 .inspect_err(|e| error!("Error creating signed P2A CPFP: {}", e))
194 .map_err(|e| match e {
195 CpfpInternalError::General(s) => CpfpError::InternalError(s),
196 CpfpInternalError::Create(e) => CpfpError::CreateError(e.to_string()),
197 CpfpInternalError::Extract(e) => CpfpError::FinalizeError(e.to_string()),
198 CpfpInternalError::Fee() => CpfpError::InternalError(e.to_string()),
199 CpfpInternalError::FinalizeError(s) => CpfpError::FinalizeError(s),
200 CpfpInternalError::InsufficientConfirmedFunds(f) => {
201 CpfpError::InsufficientConfirmedFunds {
202 needed: f.needed, available: f.available,
203 }
204 },
205 CpfpInternalError::NoFeeAnchor(txid) => CpfpError::NoFeeAnchor(txid),
206 CpfpInternalError::Signer(e) => CpfpError::SigningError(e.to_string()),
207 })
208 }
209
210 async fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
211 let unix = SystemTime::now().duration_since(UNIX_EPOCH)
212 .map_err(|e| CpfpError::InternalError(
213 format!("Unable to calculate time since UNIX epoch: {}", e.to_string()))
214 )?.as_secs();
215 self.apply_unconfirmed_txs([(tx.clone(), unix)]);
216 trace!("Unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
217 Ok(())
218 }
219}
220
221pub struct OnchainWallet {
228 pub inner: BdkWallet,
229 db: Arc<dyn BarkPersister>,
230}
231
232impl Deref for OnchainWallet {
233 type Target = BdkWallet;
234
235 fn deref(&self) -> &Self::Target {
236 &self.inner
237 }
238}
239
240impl DerefMut for OnchainWallet {
241 fn deref_mut(&mut self) -> &mut Self::Target {
242 &mut self.inner
243 }
244}
245
246impl OnchainWallet {
247 pub async fn load_or_create(network: Network, seed: [u8; 64], db: Arc<dyn BarkPersister>) -> anyhow::Result<Self> {
248 let xpriv = bip32::Xpriv::new_master(network, &seed).expect("valid seed");
249 let desc = bdk_wallet::template::Bip86(xpriv, KeychainKind::External);
250
251 let changeset = db.initialize_bdk_wallet().await.context("error reading bdk wallet state")?;
252 let wallet_opt = bdk_wallet::Wallet::load()
253 .descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
254 .extract_keys()
255 .check_network(network)
256 .load_wallet_no_persist(changeset)?;
257
258 let wallet = match wallet_opt {
259 Some(wallet) => wallet,
260 None => bdk_wallet::Wallet::create_single(desc)
261 .network(network)
262 .create_wallet_no_persist()?,
263 };
264
265 Ok(Self { inner: wallet, db })
266 }
267}
268
269#[async_trait]
270impl MakeCpfp for OnchainWallet {
271 fn make_signed_p2a_cpfp(
272 &mut self,
273 tx: &Transaction,
274 fees: MakeCpfpFees,
275 ) -> Result<Transaction, CpfpError> {
276 MakeCpfp::make_signed_p2a_cpfp(&mut self.inner, tx, fees)
277 }
278
279 async fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
280 self.inner.store_signed_p2a_cpfp(tx).await?;
281 self.persist().await
282 .map_err(|e| CpfpError::StoreError(e.to_string()))
283 }
284}
285
286#[async_trait]
287impl SignPsbt for OnchainWallet {
288 async fn finish_tx(&mut self, psbt: Psbt) -> anyhow::Result<Transaction> {
289 let tx = self.inner.finish_tx(psbt).await?;
290 self.persist().await?;
291 Ok(tx)
292 }
293}
294
295#[async_trait]
296impl ChainSync for OnchainWallet {
297 async fn sync(&mut self, chain: &ChainSource) -> anyhow::Result<()> {
298 debug!("Starting wallet sync...");
299 debug!("Starting balance: {}", self.inner.balance());
300 trace!("Starting unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
301 let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("now").as_secs();
302
303 match chain.inner() {
304 ChainSourceClient::Bitcoind(bitcoind) => {
305 let prev_tip = self.inner.latest_checkpoint();
306 self.inner_sync_bitcoind(bitcoind, prev_tip).await?;
307 },
308 ChainSourceClient::Esplora(client) => {
309 debug!("Syncing with esplora...");
310 let request = self.inner.start_sync_with_revealed_spks()
311 .outpoints(self.list_unspent().iter().map(|o| o.outpoint).collect::<Vec<_>>())
312 .txids(self.inner.transactions().map(|tx| tx.tx_node.txid).collect::<Vec<_>>());
313
314 let update = client.sync(request, PARALLEL_REQS).await?;
315 self.inner.apply_update(update)?;
316 self.persist().await?;
317 debug!("Finished syncing with esplora");
318 },
319 }
320
321 debug!("Current balance: {}", self.inner.balance());
322 trace!("Current unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
323 self.rebroadcast_txs(chain, now).await?;
324
325 Ok(())
326 }
327}
328
329impl OnchainWallet {
330 pub fn balance(&self) -> Balance {
331 self.inner.balance()
332 }
333
334 pub fn list_unspent(&self) -> Vec<LocalOutput> {
335 self.inner.list_unspent().collect()
336 }
337
338 pub fn list_transactions(&self) -> Vec<Arc<Transaction>> {
339 self.inner.transactions().map(|tx| tx.tx_node.tx).collect()
340 }
341
342 pub async fn address(&mut self) -> anyhow::Result<Address> {
343 let ret = self.inner.reveal_next_address(bdk_wallet::KeychainKind::External).address;
344 self.persist().await?;
345 Ok(ret)
346 }
347
348 pub fn utxos(&self) -> Vec<Utxo> {
349 self.list_unspent().into_iter().map(|o| Utxo::Local(o.into())).collect()
350 }
351
352 pub async fn send(&mut self, chain: &ChainSource, dest: Address, amount: Amount, fee_rate: FeeRate
353 ) -> anyhow::Result<Txid> {
354 let psbt = self.prepare_tx(&[(dest, amount)], fee_rate)?;
355 let tx = self.finish_tx(psbt).await?;
356 chain.broadcast_tx(&tx).await?;
357 Ok(tx.compute_txid())
358 }
359
360 pub async fn send_many(
361 &mut self,
362 chain: &ChainSource,
363 destinations: &[(Address, Amount)],
364 fee_rate: FeeRate,
365 ) -> anyhow::Result<Txid> {
366 let pbst = self.prepare_tx(destinations, fee_rate)?;
367 let tx = self.finish_tx(pbst).await?;
368 chain.broadcast_tx(&tx).await?;
369 Ok(tx.compute_txid())
370 }
371
372
373 pub async fn drain(
374 &mut self,
375 chain: &ChainSource,
376 destination: Address,
377 fee_rate: FeeRate,
378 ) -> anyhow::Result<Txid> {
379 let psbt = self.prepare_drain_tx(destination, fee_rate)?;
380 let tx = self.finish_tx(psbt).await?;
381 chain.broadcast_tx(&tx).await?;
382 Ok(tx.compute_txid())
383 }
384
385 pub fn build_tx(&mut self) -> TxBuilder<'_, DefaultCoinSelectionAlgorithm> {
386 self.inner.build_tx()
387 }
388
389 async fn inner_sync_bitcoind(
390 &mut self,
391 bitcoind: &bitcoin_ext::rpc::Client,
392 prev_tip: CheckPoint,
393 ) -> anyhow::Result<()> {
394 debug!("Syncing with bitcoind, starting at block height {}...", prev_tip.height());
395 let mut emitter = bdk_bitcoind_rpc::Emitter::new(
396 bitcoind, prev_tip.clone(), prev_tip.height(), self.unconfirmed_txs()
397 );
398 let mut count = 0;
399 while let Some(em) = emitter.next_block()? {
400 self.inner.apply_block_connected_to(
401 &em.block, em.block_height(), em.connected_to(),
402 )?;
403 count += 1;
404
405 if count % 10_000 == 0 {
406 self.persist().await?;
407 info!("Synced until block height {}", em.block_height());
408 }
409 }
410
411 let mempool = emitter.mempool()?;
412 self.inner.apply_evicted_txs(mempool.evicted);
413 self.inner.apply_unconfirmed_txs(mempool.update);
414 self.persist().await?;
415 debug!("Finished syncing with bitcoind");
416
417 Ok(())
418 }
419
420 async fn rebroadcast_txs(&mut self, chain: &ChainSource, sync_start: u64) -> anyhow::Result<Amount> {
421 let balance = self.inner.balance();
422
423 let transactions = self.inner.transactions().filter(|tx| {
425 if let ChainPosition::Unconfirmed { last_seen, .. } = tx.chain_position {
426 match last_seen {
427 Some(last_seen) => last_seen < sync_start,
428 None => true,
429 }
430 } else {
431 false
432 }
433 }).collect::<Vec<_>>();
434
435 for tx in transactions {
436 if let Err(e) = chain.broadcast_tx(&tx.tx_node.tx).await {
437 warn!("Error broadcasting tx {}: {}", tx.tx_node.txid, e);
438 }
439 }
440
441 Ok(balance.total())
442 }
443
444 pub async fn initial_wallet_scan(
445 &mut self,
446 chain: &ChainSource,
447 start_height: Option<BlockHeight>,
448 ) -> anyhow::Result<Amount> {
449 info!("Starting initial wallet sync...");
450 debug!("Starting balance: {}", self.inner.balance());
451 let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("now").as_secs();
452
453 match chain.inner() {
454 ChainSourceClient::Bitcoind(bitcoind) => {
455 let height = start_height.unwrap_or(GENESIS_HEIGHT).saturating_sub(1);
457 let block_hash = bitcoind.get_block_hash(height as u64)?;
458 self.inner.set_checkpoint(height, block_hash);
459 self.inner_sync_bitcoind(bitcoind, self.inner.latest_checkpoint()).await?;
460 },
461 ChainSourceClient::Esplora(client) => {
463 debug!("Starting full scan with esplora...");
464 let request = self.inner.start_full_scan();
465 let update = client.full_scan(request, STOP_GAP, PARALLEL_REQS).await?;
466 self.inner.apply_update(update)?;
467 self.persist().await?;
468 debug!("Finished scanning with esplora");
469 },
470 }
471
472 debug!("Current balance: {}", self.inner.balance());
473 self.rebroadcast_txs(chain, now).await
474 }
475
476
477 async fn persist(&mut self) -> anyhow::Result<()> {
478 if let Some(stage) = self.inner.staged() {
479 self.db.store_bdk_wallet_changeset(&*stage).await?;
480 let _ = self.inner.take_staged();
481 }
482 Ok(())
483 }
484}