1use std::ops::{Deref, DerefMut};
2use std::time::{SystemTime, UNIX_EPOCH};
3use std::sync::Arc;
4
5use anyhow::Context;
6use bdk_esplora::EsploraAsyncExt;
7use bdk_wallet::chain::{ChainPosition, CheckPoint};
8use bdk_wallet::Wallet as BdkWallet;
9use bdk_wallet::coin_selection::DefaultCoinSelectionAlgorithm;
10use bdk_wallet::{Balance, KeychainKind, LocalOutput, SignOptions, TxBuilder, TxOrdering};
11use bitcoin::{
12 bip32, psbt, Address, Amount, FeeRate, Network, OutPoint, Psbt, Sequence, Transaction, TxOut,
13 Txid,
14};
15use log::{debug, error, info, trace, warn};
16
17use bitcoin_ext::{BlockHeight, BlockRef};
18use bitcoin_ext::bdk::{CpfpInternalError, WalletExt};
19use bitcoin_ext::cpfp::CpfpError;
20use bitcoin_ext::rpc::RpcApi;
21
22use crate::exit::ExitVtxo;
23use crate::exit::models::ExitState;
24use crate::onchain::{
25 ChainSource, LocalUtxo, GetBalance, GetSpendingTx, GetWalletTx, MakeCpfp, MakeCpfpFees,
26 PreparePsbt, SignPsbt, Utxo,
27};
28use crate::onchain::chain::ChainSourceClient;
29use crate::persist::BarkPersister;
30use crate::psbtext::PsbtInputExt;
31
32const STOP_GAP: usize = 50;
33const PARALLEL_REQS: usize = 4;
34const GENESIS_HEIGHT: u32 = 0;
35
36impl From<LocalOutput> for LocalUtxo {
37 fn from(value: LocalOutput) -> Self {
38 LocalUtxo {
39 outpoint: value.outpoint,
40 amount: value.txout.value,
41 confirmation_height: value.chain_position.confirmation_height_upper_bound(),
42 }
43 }
44}
45
46pub trait TxBuilderExt {
51 fn add_exit_claim_inputs(&mut self, exit_outputs: &[&ExitVtxo]) -> anyhow::Result<()>;
52}
53
54impl<Cs> TxBuilderExt for TxBuilder<'_, Cs> {
55 fn add_exit_claim_inputs(&mut self, exit_outputs: &[&ExitVtxo]) -> anyhow::Result<()> {
56 self.version(2);
57
58 for input in exit_outputs {
59 if !matches!(input.state(), ExitState::Claimable(..)) {
60 bail!("VTXO exit is not spendable");
61 }
62
63 let vtxo = input.vtxo();
64
65 let mut psbt_in = psbt::Input::default();
66 psbt_in.set_exit_claim_input(&vtxo);
67 psbt_in.witness_utxo = Some(TxOut {
68 script_pubkey: vtxo.output_script_pubkey(),
69 value: vtxo.amount(),
70 });
71
72 self.add_foreign_utxo_with_sequence(
73 vtxo.point(),
74 psbt_in,
75 vtxo.claim_satisfaction_weight(),
76 Sequence::from_height(vtxo.exit_delta()),
77 ).expect("error adding foreign utxo for claim input");
78 }
79
80 Ok(())
81 }
82}
83
84impl <W: Deref<Target = BdkWallet>> GetBalance for W {
85 fn get_balance(&self) -> Amount {
86 self.deref().balance().total()
87 }
88}
89
90impl SignPsbt for BdkWallet {
91 fn finish_tx(&mut self, mut psbt: Psbt) -> anyhow::Result<Transaction> {
92 let opts = SignOptions {
93 trust_witness_utxo: true,
94 ..Default::default()
95 };
96
97 let finalized = self.sign(&mut psbt, opts).context("signing error")?;
98 assert!(finalized);
99 let tx = psbt.extract_tx()?;
100 let unix = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
101 self.apply_unconfirmed_txs([(tx.clone(), unix)]);
102 Ok(tx)
103 }
104}
105
106impl <W: Deref<Target = BdkWallet>> GetWalletTx for W {
107 fn get_wallet_tx(&self, txid: Txid) -> Option<Arc<Transaction>> {
112 self.deref().get_tx(txid).map(|tx| tx.tx_node.tx)
113 }
114
115 fn get_wallet_tx_confirmed_block(&self, txid: Txid) -> anyhow::Result<Option<BlockRef>> {
116 match self.deref().get_tx(txid) {
117 Some(tx) => match tx.chain_position {
118 ChainPosition::Confirmed { anchor, .. } => Ok(Some(anchor.block_id.into())),
119 ChainPosition::Unconfirmed { .. } => Ok(None),
120 },
121 None => Err(anyhow!("Tx {} does not exist in the wallet", txid)),
122 }
123 }
124}
125
126impl <W: DerefMut<Target = BdkWallet>> PreparePsbt for W {
127 fn prepare_tx(
128 &mut self,
129 destinations: &[(Address, Amount)],
130 fee_rate: FeeRate,
131 ) -> anyhow::Result<Psbt> {
132 let mut b = self.deref_mut().build_tx();
133 b.ordering(TxOrdering::Untouched);
134 for (dest, amount) in destinations {
135 b.add_recipient(dest.script_pubkey(), *amount);
136 }
137 b.fee_rate(fee_rate);
138 b.finish().context("error building tx")
139 }
140
141 fn prepare_drain_tx(
142 &mut self,
143 destination: Address,
144 fee_rate: FeeRate,
145 ) -> anyhow::Result<Psbt> {
146 let mut b = self.deref_mut().build_tx();
147 b.drain_to(destination.script_pubkey());
148 b.fee_rate(fee_rate);
149 b.drain_wallet();
150 b.finish().context("error building tx")
151 }
152}
153
154impl <W: Deref<Target = BdkWallet>> GetSpendingTx for W {
155 fn get_spending_tx(&self, outpoint: OutPoint) -> Option<Arc<Transaction>> {
156 for transaction in self.deref().transactions() {
157 if transaction.tx_node.tx.input.iter().any(|i| i.previous_output == outpoint) {
158 return Some(transaction.tx_node.tx);
159 }
160 }
161 None
162 }
163}
164
165impl MakeCpfp for BdkWallet {
166 fn make_signed_p2a_cpfp(
167 &mut self,
168 tx: &Transaction,
169 fees: MakeCpfpFees,
170 ) -> Result<Transaction, CpfpError> {
171 WalletExt::make_signed_p2a_cpfp(self, tx, fees)
172 .inspect_err(|e| error!("Error creating signed P2A CPFP: {}", e))
173 .map_err(|e| match e {
174 CpfpInternalError::General(s) => CpfpError::InternalError(s),
175 CpfpInternalError::Create(e) => CpfpError::CreateError(e.to_string()),
176 CpfpInternalError::Extract(e) => CpfpError::FinalizeError(e.to_string()),
177 CpfpInternalError::Fee() => CpfpError::InternalError(e.to_string()),
178 CpfpInternalError::FinalizeError(s) => CpfpError::FinalizeError(s),
179 CpfpInternalError::InsufficientConfirmedFunds(f) => {
180 CpfpError::InsufficientConfirmedFunds {
181 needed: f.needed, available: f.available,
182 }
183 },
184 CpfpInternalError::NoFeeAnchor(txid) => CpfpError::NoFeeAnchor(txid),
185 CpfpInternalError::Signer(e) => CpfpError::SigningError(e.to_string()),
186 })
187 }
188
189 fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
190 let unix = SystemTime::now().duration_since(UNIX_EPOCH)
191 .map_err(|e| CpfpError::InternalError(
192 format!("Unable to calculate time since UNIX epoch: {}", e.to_string()))
193 )?.as_secs();
194 self.apply_unconfirmed_txs([(tx.clone(), unix)]);
195 trace!("Unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
196 Ok(())
197 }
198}
199
200pub struct OnchainWallet {
207 pub inner: BdkWallet,
208 db: Arc<dyn BarkPersister>,
209}
210
211impl Deref for OnchainWallet {
212 type Target = BdkWallet;
213
214 fn deref(&self) -> &Self::Target {
215 &self.inner
216 }
217}
218
219impl DerefMut for OnchainWallet {
220 fn deref_mut(&mut self) -> &mut Self::Target {
221 &mut self.inner
222 }
223}
224
225impl OnchainWallet {
226 pub fn load_or_create(network: Network, seed: [u8; 64], db: Arc<dyn BarkPersister>) -> anyhow::Result<Self> {
227 let xpriv = bip32::Xpriv::new_master(network, &seed).expect("valid seed");
228 let desc = bdk_wallet::template::Bip86(xpriv, KeychainKind::External);
229
230 let changeset = db.initialize_bdk_wallet().context("error reading bdk wallet state")?;
231 let wallet_opt = bdk_wallet::Wallet::load()
232 .descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
233 .extract_keys()
234 .check_network(network)
235 .load_wallet_no_persist(changeset)?;
236
237 let wallet = match wallet_opt {
238 Some(wallet) => wallet,
239 None => bdk_wallet::Wallet::create_single(desc)
240 .network(network)
241 .create_wallet_no_persist()?,
242 };
243
244 Ok(Self { inner: wallet, db })
245 }
246}
247
248impl MakeCpfp for OnchainWallet {
249 fn make_signed_p2a_cpfp(
250 &mut self,
251 tx: &Transaction,
252 fees: MakeCpfpFees,
253 ) -> Result<Transaction, CpfpError> {
254 MakeCpfp::make_signed_p2a_cpfp(&mut self.inner, tx, fees)
255 }
256
257 fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
258 self.inner.store_signed_p2a_cpfp(tx)?;
259 self.persist()
260 .map_err(|e| CpfpError::StoreError(e.to_string()))
261 }
262}
263
264impl SignPsbt for OnchainWallet {
265 fn finish_tx(&mut self, psbt: Psbt) -> anyhow::Result<Transaction> {
266 let tx = self.inner.finish_tx(psbt)?;
267 self.persist()?;
268 Ok(tx)
269 }
270}
271
272impl OnchainWallet {
273 pub fn balance(&self) -> Balance {
274 self.inner.balance()
275 }
276
277 pub fn list_unspent(&self) -> Vec<LocalOutput> {
278 self.inner.list_unspent().collect()
279 }
280
281 pub fn list_transactions(&self) -> Vec<Arc<Transaction>> {
282 self.inner.transactions().map(|tx| tx.tx_node.tx).collect()
283 }
284
285 pub fn address(&mut self) -> anyhow::Result<Address> {
286 let ret = self.inner.reveal_next_address(bdk_wallet::KeychainKind::External).address;
287 self.persist()?;
288 Ok(ret)
289 }
290
291 pub fn utxos(&self) -> Vec<Utxo> {
292 self.list_unspent().into_iter().map(|o| Utxo::Local(o.into())).collect()
293 }
294
295 pub async fn send(&mut self, chain: &ChainSource, dest: Address, amount: Amount, fee_rate: FeeRate
296 ) -> anyhow::Result<Txid> {
297 let psbt = self.prepare_tx(&[(dest, amount)], fee_rate)?;
298 let tx = self.finish_tx(psbt)?;
299 chain.broadcast_tx(&tx).await?;
300 Ok(tx.compute_txid())
301 }
302
303 pub async fn send_many(
304 &mut self,
305 chain: &ChainSource,
306 destinations: &[(Address, Amount)],
307 fee_rate: FeeRate,
308 ) -> anyhow::Result<Txid> {
309 let pbst = self.prepare_tx(destinations, fee_rate)?;
310 let tx = self.finish_tx(pbst)?;
311 chain.broadcast_tx(&tx).await?;
312 Ok(tx.compute_txid())
313 }
314
315
316 pub async fn drain(
317 &mut self,
318 chain: &ChainSource,
319 destination: Address,
320 fee_rate: FeeRate,
321 ) -> anyhow::Result<Txid> {
322 let psbt = self.prepare_drain_tx(destination, fee_rate)?;
323 let tx = self.finish_tx(psbt)?;
324 chain.broadcast_tx(&tx).await?;
325 Ok(tx.compute_txid())
326 }
327
328 pub fn build_tx(&mut self) -> TxBuilder<'_, DefaultCoinSelectionAlgorithm> {
329 self.inner.build_tx()
330 }
331
332 async fn inner_sync_bitcoind(
333 &mut self,
334 bitcoind: &bitcoin_ext::rpc::Client,
335 prev_tip: CheckPoint,
336 ) -> anyhow::Result<()> {
337 debug!("Syncing with bitcoind, starting at block height {}...", prev_tip.height());
338 let mut emitter = bdk_bitcoind_rpc::Emitter::new(
339 bitcoind, prev_tip.clone(), prev_tip.height(), self.unconfirmed_txs()
340 );
341 let mut count = 0;
342 while let Some(em) = emitter.next_block()? {
343 self.inner.apply_block_connected_to(
344 &em.block, em.block_height(), em.connected_to(),
345 )?;
346 count += 1;
347
348 if count % 10_000 == 0 {
349 self.persist()?;
350 info!("Synced until block height {}", em.block_height());
351 }
352 }
353
354 let mempool = emitter.mempool()?;
355 self.inner.apply_evicted_txs(mempool.evicted);
356 self.inner.apply_unconfirmed_txs(mempool.update);
357 self.persist()?;
358 debug!("Finished syncing with bitcoind");
359
360 Ok(())
361 }
362
363 async fn rebroadcast_txs(&mut self, chain: &ChainSource, sync_start: u64) -> anyhow::Result<Amount> {
364 let balance = self.inner.balance();
365
366 let transactions = self.inner.transactions().filter(|tx| {
368 if let ChainPosition::Unconfirmed { last_seen, .. } = tx.chain_position {
369 match last_seen {
370 Some(last_seen) => last_seen < sync_start,
371 None => true,
372 }
373 } else {
374 false
375 }
376 }).collect::<Vec<_>>();
377
378 for tx in transactions {
379 if let Err(e) = chain.broadcast_tx(&tx.tx_node.tx).await {
380 warn!("Error broadcasting tx {}: {}", tx.tx_node.txid, e);
381 }
382 }
383
384 Ok(balance.total())
385 }
386
387 pub async fn sync(&mut self, chain: &ChainSource) -> anyhow::Result<Amount> {
388 debug!("Starting wallet sync...");
389 debug!("Starting balance: {}", self.inner.balance());
390 trace!("Starting unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
391 let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("now").as_secs();
392
393 match chain.inner() {
394 ChainSourceClient::Bitcoind(bitcoind) => {
395 let prev_tip = self.inner.latest_checkpoint();
396 self.inner_sync_bitcoind(bitcoind, prev_tip).await?;
397 },
398 ChainSourceClient::Esplora(client) => {
399 debug!("Syncing with esplora...");
400 let request = self.inner.start_sync_with_revealed_spks()
401 .outpoints(self.list_unspent().iter().map(|o| o.outpoint).collect::<Vec<_>>())
402 .txids(self.inner.transactions().map(|tx| tx.tx_node.txid).collect::<Vec<_>>());
403
404 let update = client.sync(request, PARALLEL_REQS).await?;
405 self.inner.apply_update(update)?;
406 self.persist()?;
407 debug!("Finished syncing with esplora");
408 },
409 }
410
411 debug!("Current balance: {}", self.inner.balance());
412 trace!("Current unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
413 self.rebroadcast_txs(chain, now).await
414 }
415
416 pub async fn initial_wallet_scan(
417 &mut self,
418 chain: &ChainSource,
419 start_height: Option<BlockHeight>,
420 ) -> anyhow::Result<Amount> {
421 info!("Starting initial wallet sync...");
422 debug!("Starting balance: {}", self.inner.balance());
423 let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("now").as_secs();
424
425 match chain.inner() {
426 ChainSourceClient::Bitcoind(bitcoind) => {
427 let height = start_height.unwrap_or(GENESIS_HEIGHT).saturating_sub(1);
429 let block_hash = bitcoind.get_block_hash(height as u64)?;
430 self.inner.set_checkpoint(height, block_hash);
431 self.inner_sync_bitcoind(bitcoind, self.inner.latest_checkpoint()).await?;
432 },
433 ChainSourceClient::Esplora(client) => {
435 debug!("Starting full scan with esplora...");
436 let request = self.inner.start_full_scan();
437 let update = client.full_scan(request, STOP_GAP, PARALLEL_REQS).await?;
438 self.inner.apply_update(update)?;
439 self.persist()?;
440 debug!("Finished scanning with esplora");
441 },
442 }
443
444 debug!("Current balance: {}", self.inner.balance());
445 self.rebroadcast_txs(chain, now).await
446 }
447
448
449 fn persist(&mut self) -> anyhow::Result<()> {
450 if let Some(stage) = self.inner.staged() {
451 self.db.store_bdk_wallet_changeset(&*stage)?;
452 let _ = self.inner.take_staged();
453 }
454 Ok(())
455 }
456}