1use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
12use crate::chain::channelmonitor::{ANTI_REORG_DELAY, ARCHIVAL_DELAY_BLOCKS};
13use crate::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput};
14use crate::io;
15use crate::ln::msgs::DecodeError;
16use crate::ln::types::ChannelId;
17use crate::prelude::*;
18use crate::sign::{
19 ChangeDestinationSource, ChangeDestinationSourceSync, ChangeDestinationSourceSyncWrapper,
20 OutputSpender, SpendableOutputDescriptor,
21};
22use crate::sync::Mutex;
23use crate::util::logger::Logger;
24use crate::util::persist::{
25 KVStore, KVStoreSync, KVStoreSyncWrapper, OUTPUT_SWEEPER_PERSISTENCE_KEY,
26 OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
27};
28use crate::util::ser::{Readable, ReadableArgs, Writeable};
29use crate::{impl_writeable_tlv_based, log_debug, log_error};
30
31use bitcoin::block::Header;
32use bitcoin::locktime::absolute::LockTime;
33use bitcoin::secp256k1::Secp256k1;
34use bitcoin::{BlockHash, ScriptBuf, Transaction, Txid};
35
36use core::future::Future;
37use core::ops::Deref;
38use core::sync::atomic::{AtomicBool, Ordering};
39use core::task;
40
41use super::async_poll::{dummy_waker, AsyncResult};
42
/// The number of blocks used as the pruning threshold for swept outputs: the sum of
/// [`ARCHIVAL_DELAY_BLOCKS`] and [`ANTI_REORG_DELAY`].
///
/// A tracked output whose spend confirmed at height `h` is dropped once the best block height
/// reaches `h + PRUNE_DELAY_BLOCKS - 1`.
pub const PRUNE_DELAY_BLOCKS: u32 = ARCHIVAL_DELAY_BLOCKS + ANTI_REORG_DELAY;
45
/// A [`SpendableOutputDescriptor`] tracked by the sweeper, together with the progress of the
/// transaction spending it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TrackedSpendableOutput {
 /// The descriptor of the output being tracked.
 pub descriptor: SpendableOutputDescriptor,
 /// The channel ID that was supplied when the output was handed over for tracking, if any.
 pub channel_id: Option<ChannelId>,
 /// How far the spend of this output has progressed.
 pub status: OutputSpendStatus,
}
58
59impl TrackedSpendableOutput {
60 fn to_watched_output(&self, cur_hash: BlockHash) -> WatchedOutput {
61 let block_hash = self.status.first_broadcast_hash().or(Some(cur_hash));
62 match &self.descriptor {
63 SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => {
64 WatchedOutput {
65 block_hash,
66 outpoint: *outpoint,
67 script_pubkey: output.script_pubkey.clone(),
68 }
69 },
70 SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput {
71 block_hash,
72 outpoint: output.outpoint,
73 script_pubkey: output.output.script_pubkey.clone(),
74 },
75 SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput {
76 block_hash,
77 outpoint: output.outpoint,
78 script_pubkey: output.output.script_pubkey.clone(),
79 },
80 }
81 }
82
83 pub fn is_spent_in(&self, tx: &Transaction) -> bool {
85 let prev_outpoint = self.descriptor.spendable_outpoint().into_bitcoin_outpoint();
86 tx.input.iter().any(|input| input.previous_output == prev_outpoint)
87 }
88}
89
// Serialization for `TrackedSpendableOutput`, generated by `impl_writeable_tlv_based!`. Each
// entry reads `(type number, field, kind)`.
// NOTE(review): the type numbers are presumably part of the persisted encoding and must stay
// stable across releases — confirm against the macro's documentation before changing them.
impl_writeable_tlv_based!(TrackedSpendableOutput, {
 (0, descriptor, required),
 (2, channel_id, option),
 (4, status, required),
});
95
/// The current status of the spend of a tracked output.
///
/// The status moves forward via `broadcast` and `confirmed`, and falls back from
/// `PendingThresholdConfirmations` to `PendingFirstConfirmation` via `unconfirmed`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputSpendStatus {
 /// No spending transaction has been broadcast for the output yet.
 PendingInitialBroadcast {
 /// If set, the output counts as delayed (and is not swept) while the best block height
 /// is below this value.
 delayed_until_height: Option<u32>,
 },
 /// A spending transaction has been broadcast but is not (or no longer) confirmed.
 PendingFirstConfirmation {
 /// The best block hash at the time a spend was first broadcast.
 first_broadcast_hash: BlockHash,
 /// The best block height at the time of the most recent broadcast.
 latest_broadcast_height: u32,
 /// The most recently recorded spending transaction.
 latest_spending_tx: Transaction,
 },
 /// A transaction spending the output has been confirmed; the entry is kept until the
 /// confirmation is deep enough for it to be pruned.
 PendingThresholdConfirmations {
 /// The best block hash at the time a spend was first broadcast.
 first_broadcast_hash: BlockHash,
 /// The best block height at the time of the most recent broadcast.
 latest_broadcast_height: u32,
 /// The transaction that was seen confirming the spend.
 latest_spending_tx: Transaction,
 /// The height at which the spending transaction confirmed.
 confirmation_height: u32,
 /// The hash of the block in which the spending transaction confirmed.
 confirmation_hash: BlockHash,
 },
}
133
134impl OutputSpendStatus {
135 fn broadcast(&mut self, cur_hash: BlockHash, cur_height: u32, latest_spending_tx: Transaction) {
136 match self {
137 Self::PendingInitialBroadcast { delayed_until_height } => {
138 if let Some(delayed_until_height) = delayed_until_height {
139 debug_assert!(
140 cur_height >= *delayed_until_height,
141 "We should never broadcast before the required height is reached."
142 );
143 }
144 *self = Self::PendingFirstConfirmation {
145 first_broadcast_hash: cur_hash,
146 latest_broadcast_height: cur_height,
147 latest_spending_tx,
148 };
149 },
150 Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
151 *self = Self::PendingFirstConfirmation {
152 first_broadcast_hash: *first_broadcast_hash,
153 latest_broadcast_height: cur_height,
154 latest_spending_tx,
155 };
156 },
157 Self::PendingThresholdConfirmations { .. } => {
158 debug_assert!(false, "We should never rebroadcast confirmed transactions.");
159 },
160 }
161 }
162
163 fn confirmed(
164 &mut self, confirmation_hash: BlockHash, confirmation_height: u32,
165 latest_spending_tx: Transaction,
166 ) {
167 match self {
168 Self::PendingInitialBroadcast { .. } => {
169 debug_assert!(false, "We should never confirm when we haven't broadcasted. This a bug and should never happen, please report.");
172 *self = Self::PendingThresholdConfirmations {
173 first_broadcast_hash: confirmation_hash,
174 latest_broadcast_height: confirmation_height,
175 latest_spending_tx,
176 confirmation_height,
177 confirmation_hash,
178 };
179 },
180 Self::PendingFirstConfirmation {
181 first_broadcast_hash,
182 latest_broadcast_height,
183 ..
184 } => {
185 *self = Self::PendingThresholdConfirmations {
186 first_broadcast_hash: *first_broadcast_hash,
187 latest_broadcast_height: *latest_broadcast_height,
188 latest_spending_tx,
189 confirmation_height,
190 confirmation_hash,
191 };
192 },
193 Self::PendingThresholdConfirmations {
194 first_broadcast_hash,
195 latest_broadcast_height,
196 ..
197 } => {
198 *self = Self::PendingThresholdConfirmations {
199 first_broadcast_hash: *first_broadcast_hash,
200 latest_broadcast_height: *latest_broadcast_height,
201 latest_spending_tx,
202 confirmation_height,
203 confirmation_hash,
204 };
205 },
206 }
207 }
208
209 fn unconfirmed(&mut self) {
210 match self {
211 Self::PendingInitialBroadcast { .. } => {
212 debug_assert!(
213 false,
214 "We should only mark a spend as unconfirmed if it used to be confirmed."
215 );
216 },
217 Self::PendingFirstConfirmation { .. } => {
218 debug_assert!(
219 false,
220 "We should only mark a spend as unconfirmed if it used to be confirmed."
221 );
222 },
223 Self::PendingThresholdConfirmations {
224 first_broadcast_hash,
225 latest_broadcast_height,
226 latest_spending_tx,
227 ..
228 } => {
229 *self = Self::PendingFirstConfirmation {
230 first_broadcast_hash: *first_broadcast_hash,
231 latest_broadcast_height: *latest_broadcast_height,
232 latest_spending_tx: latest_spending_tx.clone(),
233 };
234 },
235 }
236 }
237
238 fn is_delayed(&self, cur_height: u32) -> bool {
239 match self {
240 Self::PendingInitialBroadcast { delayed_until_height } => {
241 delayed_until_height.map_or(false, |req_height| cur_height < req_height)
242 },
243 Self::PendingFirstConfirmation { .. } => false,
244 Self::PendingThresholdConfirmations { .. } => false,
245 }
246 }
247
248 fn first_broadcast_hash(&self) -> Option<BlockHash> {
249 match self {
250 Self::PendingInitialBroadcast { .. } => None,
251 Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
252 Some(*first_broadcast_hash)
253 },
254 Self::PendingThresholdConfirmations { first_broadcast_hash, .. } => {
255 Some(*first_broadcast_hash)
256 },
257 }
258 }
259
260 fn latest_broadcast_height(&self) -> Option<u32> {
261 match self {
262 Self::PendingInitialBroadcast { .. } => None,
263 Self::PendingFirstConfirmation { latest_broadcast_height, .. } => {
264 Some(*latest_broadcast_height)
265 },
266 Self::PendingThresholdConfirmations { latest_broadcast_height, .. } => {
267 Some(*latest_broadcast_height)
268 },
269 }
270 }
271
272 fn confirmation_height(&self) -> Option<u32> {
273 match self {
274 Self::PendingInitialBroadcast { .. } => None,
275 Self::PendingFirstConfirmation { .. } => None,
276 Self::PendingThresholdConfirmations { confirmation_height, .. } => {
277 Some(*confirmation_height)
278 },
279 }
280 }
281
282 fn latest_spending_tx(&self) -> Option<&Transaction> {
283 match self {
284 Self::PendingInitialBroadcast { .. } => None,
285 Self::PendingFirstConfirmation { latest_spending_tx, .. } => Some(latest_spending_tx),
286 Self::PendingThresholdConfirmations { latest_spending_tx, .. } => {
287 Some(latest_spending_tx)
288 },
289 }
290 }
291
292 fn is_confirmed(&self) -> bool {
293 match self {
294 Self::PendingInitialBroadcast { .. } => false,
295 Self::PendingFirstConfirmation { .. } => false,
296 Self::PendingThresholdConfirmations { .. } => true,
297 }
298 }
299}
300
// Serialization for `OutputSpendStatus`. The outer `(n, Variant)` numbers identify the variant
// and the inner `(type number, field, kind)` entries describe its fields.
// NOTE(review): both sets of numbers are presumably part of the persisted encoding and must
// stay stable — confirm against the macro's documentation before changing them.
impl_writeable_tlv_based_enum!(OutputSpendStatus,
 (0, PendingInitialBroadcast) => {
 (0, delayed_until_height, option),
 },
 (2, PendingFirstConfirmation) => {
 (0, first_broadcast_hash, required),
 (2, latest_broadcast_height, required),
 (4, latest_spending_tx, required),
 },
 (4, PendingThresholdConfirmations) => {
 (0, first_broadcast_hash, required),
 (2, latest_broadcast_height, required),
 (4, latest_spending_tx, required),
 (6, confirmation_height, required),
 (8, confirmation_hash, required),
 },
);
318
/// Tracks spendable outputs and takes care of sweeping them: it generates and broadcasts
/// spending transactions, follows their confirmation status via the [`Listen`] / [`Confirm`]
/// interfaces, and persists its state through the given [`KVStore`].
pub struct OutputSweeper<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
where
 B::Target: BroadcasterInterface,
 D::Target: ChangeDestinationSource,
 E::Target: FeeEstimator,
 F::Target: Filter,
 K::Target: KVStore,
 L::Target: Logger,
 O::Target: OutputSpender,
{
 /// The tracked outputs, the best block and the "needs persisting" flag.
 sweeper_state: Mutex<SweeperState>,
 /// Set while a sweep is running so that concurrent sweep requests return early.
 pending_sweep: AtomicBool,
 /// Used to broadcast the generated sweeping transactions.
 broadcaster: B,
 /// Queried for the fee rate of the sweeping transactions.
 fee_estimator: E,
 /// Optional chain filter with which tracked outputs are registered.
 chain_data_source: Option<F>,
 /// Builds the transactions spending the tracked outputs.
 output_spender: O,
 /// Provides the script the swept funds are sent to.
 change_destination_source: D,
 /// Store to which the sweeper state is written.
 kv_store: K,
 /// Logger for debug and error messages.
 logger: L,
}
359
360impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
361 OutputSweeper<B, D, E, F, K, L, O>
362where
363 B::Target: BroadcasterInterface,
364 D::Target: ChangeDestinationSource,
365 E::Target: FeeEstimator,
366 F::Target: Filter,
367 K::Target: KVStore,
368 L::Target: Logger,
369 O::Target: OutputSpender,
370{
371 pub fn new(
376 best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
377 output_spender: O, change_destination_source: D, kv_store: K, logger: L,
378 ) -> Self {
379 let outputs = Vec::new();
380 let sweeper_state = Mutex::new(SweeperState { outputs, best_block, dirty: false });
381 Self {
382 sweeper_state,
383 pending_sweep: AtomicBool::new(false),
384 broadcaster,
385 fee_estimator,
386 chain_data_source,
387 output_spender,
388 change_destination_source,
389 kv_store,
390 logger,
391 }
392 }
393
394 pub async fn track_spendable_outputs(
410 &self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
411 exclude_static_outputs: bool, delay_until_height: Option<u32>,
412 ) -> Result<(), ()> {
413 let mut relevant_descriptors = output_descriptors
414 .into_iter()
415 .filter(|desc| {
416 !(exclude_static_outputs
417 && matches!(desc, SpendableOutputDescriptor::StaticOutput { .. }))
418 })
419 .peekable();
420
421 if relevant_descriptors.peek().is_none() {
422 return Ok(());
423 }
424
425 self.update_state(|state_lock| -> Result<((), bool), ()> {
426 for descriptor in relevant_descriptors {
427 let output_info = TrackedSpendableOutput {
428 descriptor,
429 channel_id,
430 status: OutputSpendStatus::PendingInitialBroadcast {
431 delayed_until_height: delay_until_height,
432 },
433 };
434
435 if state_lock
436 .outputs
437 .iter()
438 .find(|o| o.descriptor == output_info.descriptor)
439 .is_some()
440 {
441 continue;
442 }
443
444 state_lock.outputs.push(output_info);
445 state_lock.dirty = true;
446 }
447
448 Ok(((), false))
449 })
450 .await
451 }
452
453 pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
455 self.sweeper_state.lock().unwrap().outputs.clone()
456 }
457
458 pub fn current_best_block(&self) -> BestBlock {
461 self.sweeper_state.lock().unwrap().best_block
462 }
463
464 pub async fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> {
467 if self
469 .pending_sweep
470 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
471 .is_err()
472 {
473 return Ok(());
474 }
475
476 let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await;
477
478 self.pending_sweep.store(false, Ordering::Release);
480
481 result
482 }
483
484 async fn regenerate_and_broadcast_spend_if_necessary_internal(&self) -> Result<(), ()> {
486 let filter_fn = |o: &TrackedSpendableOutput, cur_height: u32| {
487 if o.status.is_confirmed() {
488 return false;
490 }
491
492 if o.status.is_delayed(cur_height) {
493 return false;
495 }
496
497 if o.status.latest_broadcast_height() >= Some(cur_height) {
498 return false;
500 }
501
502 true
503 };
504
505 let has_respends = self
507 .update_state(|sweeper_state| -> Result<(bool, bool), ()> {
508 let cur_height = sweeper_state.best_block.height;
509 let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
510
511 Ok((has_respends, has_respends))
513 })
514 .await?;
515
516 if !has_respends {
517 return Ok(());
518 }
519
520 let change_destination_script =
522 self.change_destination_source.get_change_destination_script().await?;
523
524 let spending_tx = self
526 .update_state(|sweeper_state| -> Result<(Option<Transaction>, bool), ()> {
527 let cur_height = sweeper_state.best_block.height;
528 let cur_hash = sweeper_state.best_block.block_hash;
529
530 let respend_descriptors_set: HashSet<&SpendableOutputDescriptor> = sweeper_state
531 .outputs
532 .iter()
533 .filter(|o| filter_fn(*o, cur_height))
534 .map(|o| &o.descriptor)
535 .collect();
536
537 let respend_descriptors: Vec<&SpendableOutputDescriptor> =
541 respend_descriptors_set.into_iter().collect();
542
543 if !respend_descriptors.is_empty() {
545 let spending_tx = self
546 .spend_outputs(
547 &sweeper_state,
548 &respend_descriptors,
549 change_destination_script,
550 )
551 .map_err(|e| {
552 log_error!(self.logger, "Error spending outputs: {:?}", e);
553 })?;
554
555 log_debug!(
556 self.logger,
557 "Generating and broadcasting sweeping transaction {}",
558 spending_tx.compute_txid()
559 );
560
561 let respend_outputs =
564 sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height));
565 for output_info in respend_outputs {
566 if let Some(filter) = self.chain_data_source.as_ref() {
567 let watched_output = output_info.to_watched_output(cur_hash);
568 filter.register_output(watched_output);
569 }
570
571 output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
572 sweeper_state.dirty = true;
573 }
574
575 Ok((Some(spending_tx), false))
576 } else {
577 Ok((None, false))
578 }
579 })
580 .await?;
581
582 if let Some(spending_tx) = spending_tx {
584 self.broadcaster.broadcast_transactions(&[&spending_tx]);
585 }
586
587 Ok(())
588 }
589
590 fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
591 let cur_height = sweeper_state.best_block.height;
592
593 sweeper_state.outputs.retain(|o| {
595 if let Some(confirmation_height) = o.status.confirmation_height() {
596 if cur_height >= confirmation_height + PRUNE_DELAY_BLOCKS - 1 {
599 log_debug!(self.logger,
600 "Pruning swept output as sufficiently confirmed via spend in transaction {:?}. Pruned descriptor: {:?}",
601 o.status.latest_spending_tx().map(|t| t.compute_txid()), o.descriptor
602 );
603 return false;
604 }
605 }
606 true
607 });
608
609 sweeper_state.dirty = true;
610 }
611
612 fn persist_state<'a>(&self, sweeper_state: &SweeperState) -> AsyncResult<'a, (), io::Error> {
613 let encoded = sweeper_state.encode();
614
615 self.kv_store.write(
616 OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
617 OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
618 OUTPUT_SWEEPER_PERSISTENCE_KEY,
619 encoded,
620 )
621 }
622
623 async fn update_state<X>(
627 &self, callback: impl FnOnce(&mut SweeperState) -> Result<(X, bool), ()>,
628 ) -> Result<X, ()> {
629 let (fut, res) = {
630 let mut state_lock = self.sweeper_state.lock().unwrap();
631
632 let (res, skip_persist) = callback(&mut state_lock)?;
633 if !state_lock.dirty || skip_persist {
634 return Ok(res);
635 }
636
637 state_lock.dirty = false;
638
639 (self.persist_state(&state_lock), res)
640 };
641
642 fut.await.map_err(|e| {
643 self.sweeper_state.lock().unwrap().dirty = true;
644
645 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
646 })?;
647
648 Ok(res)
649 }
650
651 fn spend_outputs(
652 &self, sweeper_state: &SweeperState, descriptors: &[&SpendableOutputDescriptor],
653 change_destination_script: ScriptBuf,
654 ) -> Result<Transaction, ()> {
655 let tx_feerate =
656 self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee);
657 let cur_height = sweeper_state.best_block.height;
658 let locktime = Some(LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO));
659 self.output_spender.spend_spendable_outputs(
660 descriptors,
661 Vec::new(),
662 change_destination_script,
663 tx_feerate,
664 locktime,
665 &Secp256k1::new(),
666 )
667 }
668
669 fn transactions_confirmed_internal(
670 &self, sweeper_state: &mut SweeperState, header: &Header,
671 txdata: &chain::transaction::TransactionData, height: u32,
672 ) {
673 let confirmation_hash = header.block_hash();
674 for (_, tx) in txdata {
675 for output_info in sweeper_state.outputs.iter_mut() {
676 if output_info.is_spent_in(*tx) {
677 output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
678 }
679 }
680 }
681
682 sweeper_state.dirty = true;
683 }
684
685 fn best_block_updated_internal(
686 &self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
687 ) {
688 sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
689 self.prune_confirmed_outputs(sweeper_state);
690
691 sweeper_state.dirty = true;
692 }
693}
694
695impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
696 for OutputSweeper<B, D, E, F, K, L, O>
697where
698 B::Target: BroadcasterInterface,
699 D::Target: ChangeDestinationSource,
700 E::Target: FeeEstimator,
701 F::Target: Filter + Sync + Send,
702 K::Target: KVStore,
703 L::Target: Logger,
704 O::Target: OutputSpender,
705{
706 fn filtered_block_connected(
707 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
708 ) {
709 let mut state_lock = self.sweeper_state.lock().unwrap();
710 assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
711 "Blocks must be connected in chain-order - the connected header must build on the last connected header");
712 assert_eq!(state_lock.best_block.height, height - 1,
713 "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
714
715 self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
716 self.best_block_updated_internal(&mut state_lock, header, height);
717 }
718
719 fn blocks_disconnected(&self, fork_point: BestBlock) {
720 let mut state_lock = self.sweeper_state.lock().unwrap();
721
722 assert!(state_lock.best_block.height > fork_point.height,
723 "Blocks disconnected must indicate disconnection from the current best height, i.e. the new chain tip must be lower than the previous best height");
724 state_lock.best_block = fork_point;
725
726 for output_info in state_lock.outputs.iter_mut() {
727 if output_info.status.confirmation_height() > Some(fork_point.height) {
728 output_info.status.unconfirmed();
729 }
730 }
731
732 state_lock.dirty = true;
733 }
734}
735
736impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
737 for OutputSweeper<B, D, E, F, K, L, O>
738where
739 B::Target: BroadcasterInterface,
740 D::Target: ChangeDestinationSource,
741 E::Target: FeeEstimator,
742 F::Target: Filter + Sync + Send,
743 K::Target: KVStore,
744 L::Target: Logger,
745 O::Target: OutputSpender,
746{
747 fn transactions_confirmed(
748 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
749 ) {
750 let mut state_lock = self.sweeper_state.lock().unwrap();
751 self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
752 }
753
754 fn transaction_unconfirmed(&self, txid: &Txid) {
755 let mut state_lock = self.sweeper_state.lock().unwrap();
756
757 let unconf_height = state_lock
759 .outputs
760 .iter()
761 .find(|o| o.status.latest_spending_tx().map(|tx| tx.compute_txid()) == Some(*txid))
762 .and_then(|o| o.status.confirmation_height());
763
764 if let Some(unconf_height) = unconf_height {
765 state_lock
767 .outputs
768 .iter_mut()
769 .filter(|o| o.status.confirmation_height() >= Some(unconf_height))
770 .for_each(|o| o.status.unconfirmed());
771
772 state_lock.dirty = true;
773 }
774 }
775
776 fn best_block_updated(&self, header: &Header, height: u32) {
777 let mut state_lock = self.sweeper_state.lock().unwrap();
778 self.best_block_updated_internal(&mut state_lock, header, height);
779 }
780
781 fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
782 let state_lock = self.sweeper_state.lock().unwrap();
783 state_lock
784 .outputs
785 .iter()
786 .filter_map(|o| match o.status {
787 OutputSpendStatus::PendingThresholdConfirmations {
788 ref latest_spending_tx,
789 confirmation_height,
790 confirmation_hash,
791 ..
792 } => Some((
793 latest_spending_tx.compute_txid(),
794 confirmation_height,
795 Some(confirmation_hash),
796 )),
797 _ => None,
798 })
799 .collect::<Vec<_>>()
800 }
801}
802
/// The mutable state of the sweeper, kept behind a mutex.
#[derive(Debug, Clone)]
struct SweeperState {
 /// All outputs currently being tracked.
 outputs: Vec<TrackedSpendableOutput>,
 /// The best block the sweeper has been told about.
 best_block: BestBlock,
 /// Whether the state has changes that still need to be persisted.
 dirty: bool,
}
809
// Serialization for `SweeperState`, generated by `impl_writeable_tlv_based!`.
// NOTE(review): the `(static_value, false)` entry presumably means `dirty` is not written and is
// always read back as `false` — confirm against the macro's documentation.
impl_writeable_tlv_based!(SweeperState, {
 (0, outputs, required_vec),
 (2, best_block, required),
 (_unused, dirty, (static_value, false)),
});
815
/// A delay before an output should be spent, given either as a number of blocks or as an
/// absolute block height.
///
/// NOTE(review): this type is not referenced in the visible part of the file; how it is turned
/// into the `delay_until_height` accepted by `track_spendable_outputs` should be confirmed at
/// the call sites.
#[derive(Debug, Clone)]
pub enum SpendingDelay {
 /// A delay expressed as a number of blocks.
 Relative {
 /// The number of blocks to wait. Presumably counted from the current best block —
 /// TODO confirm.
 num_blocks: u32,
 },
 /// A delay expressed as an absolute block height.
 Absolute {
 /// The block height to wait for.
 height: u32,
 },
}
832
833impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
834 ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeper<B, D, E, F, K, L, O>)
835where
836 B::Target: BroadcasterInterface,
837 D::Target: ChangeDestinationSource,
838 E::Target: FeeEstimator,
839 F::Target: Filter + Sync + Send,
840 K::Target: KVStore,
841 L::Target: Logger,
842 O::Target: OutputSpender,
843{
844 #[inline]
845 fn read<R: io::Read>(
846 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
847 ) -> Result<Self, DecodeError> {
848 let (
849 broadcaster,
850 fee_estimator,
851 chain_data_source,
852 output_spender,
853 change_destination_source,
854 kv_store,
855 logger,
856 ) = args;
857 let state = SweeperState::read(reader)?;
858 let best_block = state.best_block;
859
860 if let Some(filter) = chain_data_source.as_ref() {
861 for output_info in &state.outputs {
862 let watched_output = output_info.to_watched_output(best_block.block_hash);
863 filter.register_output(watched_output);
864 }
865 }
866
867 let sweeper_state = Mutex::new(state);
868 Ok((
869 best_block,
870 OutputSweeper {
871 sweeper_state,
872 pending_sweep: AtomicBool::new(false),
873 broadcaster,
874 fee_estimator,
875 chain_data_source,
876 output_spender,
877 change_destination_source,
878 kv_store,
879 logger,
880 },
881 ))
882 }
883}
884
/// A synchronous variant of [`OutputSweeper`].
///
/// It wraps an [`OutputSweeper`] whose change destination source and key-value store are the
/// synchronous implementations behind [`ChangeDestinationSourceSyncWrapper`] and
/// [`KVStoreSyncWrapper`].
pub struct OutputSweeperSync<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
where
 B::Target: BroadcasterInterface,
 D::Target: ChangeDestinationSourceSync,
 E::Target: FeeEstimator,
 F::Target: Filter,
 K::Target: KVStoreSync,
 L::Target: Logger,
 O::Target: OutputSpender,
{
 /// The wrapped asynchronous sweeper that does the actual work.
 sweeper:
 OutputSweeper<B, ChangeDestinationSourceSyncWrapper<D>, E, F, KVStoreSyncWrapper<K>, L, O>,
}
916
917impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
918 OutputSweeperSync<B, D, E, F, K, L, O>
919where
920 B::Target: BroadcasterInterface,
921 D::Target: ChangeDestinationSourceSync,
922 E::Target: FeeEstimator,
923 F::Target: Filter,
924 K::Target: KVStoreSync,
925 L::Target: Logger,
926 O::Target: OutputSpender,
927{
928 pub fn new(
933 best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
934 output_spender: O, change_destination_source: D, kv_store: K, logger: L,
935 ) -> Self {
936 let change_destination_source =
937 ChangeDestinationSourceSyncWrapper::new(change_destination_source);
938
939 let kv_store = KVStoreSyncWrapper(kv_store);
940
941 let sweeper = OutputSweeper::new(
942 best_block,
943 broadcaster,
944 fee_estimator,
945 chain_data_source,
946 output_spender,
947 change_destination_source,
948 kv_store,
949 logger,
950 );
951 Self { sweeper }
952 }
953
954 pub fn track_spendable_outputs(
970 &self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
971 exclude_static_outputs: bool, delay_until_height: Option<u32>,
972 ) -> Result<(), ()> {
973 let mut fut = Box::pin(self.sweeper.track_spendable_outputs(
974 output_descriptors,
975 channel_id,
976 exclude_static_outputs,
977 delay_until_height,
978 ));
979 let mut waker = dummy_waker();
980 let mut ctx = task::Context::from_waker(&mut waker);
981 match fut.as_mut().poll(&mut ctx) {
982 task::Poll::Ready(result) => result,
983 task::Poll::Pending => {
984 unreachable!("OutputSweeper::track_spendable_outputs should not be pending in a sync context");
986 },
987 }
988 }
989
990 pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
994 self.sweeper.tracked_spendable_outputs()
995 }
996
997 pub fn current_best_block(&self) -> BestBlock {
1000 self.sweeper.current_best_block()
1001 }
1002
1003 pub fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> {
1008 let mut fut = Box::pin(self.sweeper.regenerate_and_broadcast_spend_if_necessary());
1009 let mut waker = dummy_waker();
1010 let mut ctx = task::Context::from_waker(&mut waker);
1011 match fut.as_mut().poll(&mut ctx) {
1012 task::Poll::Ready(result) => result,
1013 task::Poll::Pending => {
1014 unreachable!("OutputSweeper::regenerate_and_broadcast_spend_if_necessary should not be pending in a sync context");
1016 },
1017 }
1018 }
1019
1020 #[doc(hidden)]
1029 pub fn sweeper_async(
1030 &self,
1031 ) -> &OutputSweeper<B, ChangeDestinationSourceSyncWrapper<D>, E, F, KVStoreSyncWrapper<K>, L, O>
1032 {
1033 &self.sweeper
1034 }
1035}
1036
// `Listen` for the synchronous sweeper: every call is forwarded unchanged to the wrapped
// `OutputSweeper`.
impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
 for OutputSweeperSync<B, D, E, F, K, L, O>
where
 B::Target: BroadcasterInterface,
 D::Target: ChangeDestinationSourceSync,
 E::Target: FeeEstimator,
 F::Target: Filter + Sync + Send,
 K::Target: KVStoreSync,
 L::Target: Logger,
 O::Target: OutputSpender,
{
 /// Forwards the connected block to the wrapped sweeper.
 fn filtered_block_connected(
 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
 ) {
 self.sweeper.filtered_block_connected(header, txdata, height);
 }

 /// Forwards the disconnection down to `fork_point` to the wrapped sweeper.
 fn blocks_disconnected(&self, fork_point: BestBlock) {
 self.sweeper.blocks_disconnected(fork_point);
 }
}
1058
// `Confirm` for the synchronous sweeper: every call is forwarded unchanged to the wrapped
// `OutputSweeper`.
impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
 for OutputSweeperSync<B, D, E, F, K, L, O>
where
 B::Target: BroadcasterInterface,
 D::Target: ChangeDestinationSourceSync,
 E::Target: FeeEstimator,
 F::Target: Filter + Sync + Send,
 K::Target: KVStoreSync,
 L::Target: Logger,
 O::Target: OutputSpender,
{
 /// Forwards the confirmed transactions to the wrapped sweeper.
 fn transactions_confirmed(
 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
 ) {
 self.sweeper.transactions_confirmed(header, txdata, height)
 }

 /// Forwards the un-confirmation of `txid` to the wrapped sweeper.
 fn transaction_unconfirmed(&self, txid: &Txid) {
 self.sweeper.transaction_unconfirmed(txid)
 }

 /// Forwards the new best block to the wrapped sweeper.
 fn best_block_updated(&self, header: &Header, height: u32) {
 self.sweeper.best_block_updated(header, height);
 }

 /// Returns the relevant txids reported by the wrapped sweeper.
 fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
 self.sweeper.get_relevant_txids()
 }
}
1088
1089impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
1090 ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeperSync<B, D, E, F, K, L, O>)
1091where
1092 B::Target: BroadcasterInterface,
1093 D::Target: ChangeDestinationSourceSync,
1094 E::Target: FeeEstimator,
1095 F::Target: Filter + Sync + Send,
1096 K::Target: KVStoreSync,
1097 L::Target: Logger,
1098 O::Target: OutputSpender,
1099{
1100 #[inline]
1101 fn read<R: io::Read>(
1102 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
1103 ) -> Result<Self, DecodeError> {
1104 let (a, b, c, d, change_destination_source, kv_store, e) = args;
1105 let change_destination_source =
1106 ChangeDestinationSourceSyncWrapper::new(change_destination_source);
1107 let kv_store = KVStoreSyncWrapper(kv_store);
1108 let args = (a, b, c, d, change_destination_source, kv_store, e);
1109 let (best_block, sweeper) =
1110 <(BestBlock, OutputSweeper<_, _, _, _, _, _, _>)>::read(reader, args)?;
1111 Ok((best_block, OutputSweeperSync { sweeper }))
1112 }
1113}