lightning/util/
sweep.rs

1// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
2// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
4// You may not use this file except in accordance with one or both of these
5// licenses.
6
7//! This module contains an [`OutputSweeper`] utility that keeps track of
8//! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStoreSync`] and regularly retries
9//! sweeping them.
10
11use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
12use crate::chain::channelmonitor::{ANTI_REORG_DELAY, ARCHIVAL_DELAY_BLOCKS};
13use crate::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput};
14use crate::io;
15use crate::ln::msgs::DecodeError;
16use crate::ln::types::ChannelId;
17use crate::prelude::*;
18use crate::sign::{
19	ChangeDestinationSource, ChangeDestinationSourceSync, ChangeDestinationSourceSyncWrapper,
20	OutputSpender, SpendableOutputDescriptor,
21};
22use crate::sync::Mutex;
23use crate::util::logger::Logger;
24use crate::util::persist::{
25	KVStore, KVStoreSync, KVStoreSyncWrapper, OUTPUT_SWEEPER_PERSISTENCE_KEY,
26	OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
27};
28use crate::util::ser::{Readable, ReadableArgs, Writeable};
29use crate::{impl_writeable_tlv_based, log_debug, log_error};
30
31use bitcoin::block::Header;
32use bitcoin::locktime::absolute::LockTime;
33use bitcoin::secp256k1::Secp256k1;
34use bitcoin::{BlockHash, ScriptBuf, Transaction, Txid};
35
36use core::future::Future;
37use core::ops::Deref;
38use core::sync::atomic::{AtomicBool, Ordering};
39use core::task;
40
41use super::async_poll::{dummy_waker, AsyncResult};
42
43/// The number of blocks we wait before we prune the tracked spendable outputs.
44pub const PRUNE_DELAY_BLOCKS: u32 = ARCHIVAL_DELAY_BLOCKS + ANTI_REORG_DELAY;
45
46/// The state of a spendable output currently tracked by an [`OutputSweeper`].
47#[derive(Clone, Debug, PartialEq, Eq)]
48pub struct TrackedSpendableOutput {
49	/// The tracked output descriptor.
50	pub descriptor: SpendableOutputDescriptor,
51	/// The channel this output belongs to.
52	///
53	/// Will be `None` if no `channel_id` was given to [`OutputSweeper::track_spendable_outputs`]
54	pub channel_id: Option<ChannelId>,
55	/// The current status of the output spend.
56	pub status: OutputSpendStatus,
57}
58
59impl TrackedSpendableOutput {
60	fn to_watched_output(&self, cur_hash: BlockHash) -> WatchedOutput {
61		let block_hash = self.status.first_broadcast_hash().or(Some(cur_hash));
62		match &self.descriptor {
63			SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => {
64				WatchedOutput {
65					block_hash,
66					outpoint: *outpoint,
67					script_pubkey: output.script_pubkey.clone(),
68				}
69			},
70			SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput {
71				block_hash,
72				outpoint: output.outpoint,
73				script_pubkey: output.output.script_pubkey.clone(),
74			},
75			SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput {
76				block_hash,
77				outpoint: output.outpoint,
78				script_pubkey: output.output.script_pubkey.clone(),
79			},
80		}
81	}
82
83	/// Returns whether the output is spent in the given transaction.
84	pub fn is_spent_in(&self, tx: &Transaction) -> bool {
85		let prev_outpoint = self.descriptor.spendable_outpoint().into_bitcoin_outpoint();
86		tx.input.iter().any(|input| input.previous_output == prev_outpoint)
87	}
88}
89
90impl_writeable_tlv_based!(TrackedSpendableOutput, {
91	(0, descriptor, required),
92	(2, channel_id, option),
93	(4, status, required),
94});
95
96/// The current status of the output spend.
97#[derive(Debug, Clone, PartialEq, Eq)]
98pub enum OutputSpendStatus {
99	/// The output is tracked but an initial spending transaction hasn't been generated and
100	/// broadcasted yet.
101	PendingInitialBroadcast {
102		/// The height at which we will first generate and broadcast a spending transaction.
103		delayed_until_height: Option<u32>,
104	},
105	/// A transaction spending the output has been broadcasted but is pending its first confirmation on-chain.
106	PendingFirstConfirmation {
107		/// The hash of the chain tip when we first broadcast a transaction spending this output.
108		first_broadcast_hash: BlockHash,
109		/// The best height when we last broadcast a transaction spending this output.
110		latest_broadcast_height: u32,
111		/// The transaction spending this output we last broadcasted.
112		latest_spending_tx: Transaction,
113	},
114	/// A transaction spending the output has been confirmed on-chain but will be tracked until it
115	/// reaches at least [`PRUNE_DELAY_BLOCKS`] confirmations to ensure [`Event::SpendableOutputs`]
116	/// stemming from lingering [`ChannelMonitor`]s can safely be replayed.
117	///
118	/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
119	/// [`ChannelMonitor`]: crate::chain::channelmonitor::ChannelMonitor
120	PendingThresholdConfirmations {
121		/// The hash of the chain tip when we first broadcast a transaction spending this output.
122		first_broadcast_hash: BlockHash,
123		/// The best height when we last broadcast a transaction spending this output.
124		latest_broadcast_height: u32,
125		/// The transaction spending this output we saw confirmed on-chain.
126		latest_spending_tx: Transaction,
127		/// The height at which the spending transaction was confirmed.
128		confirmation_height: u32,
129		/// The hash of the block in which the spending transaction was confirmed.
130		confirmation_hash: BlockHash,
131	},
132}
133
134impl OutputSpendStatus {
135	fn broadcast(&mut self, cur_hash: BlockHash, cur_height: u32, latest_spending_tx: Transaction) {
136		match self {
137			Self::PendingInitialBroadcast { delayed_until_height } => {
138				if let Some(delayed_until_height) = delayed_until_height {
139					debug_assert!(
140						cur_height >= *delayed_until_height,
141						"We should never broadcast before the required height is reached."
142					);
143				}
144				*self = Self::PendingFirstConfirmation {
145					first_broadcast_hash: cur_hash,
146					latest_broadcast_height: cur_height,
147					latest_spending_tx,
148				};
149			},
150			Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
151				*self = Self::PendingFirstConfirmation {
152					first_broadcast_hash: *first_broadcast_hash,
153					latest_broadcast_height: cur_height,
154					latest_spending_tx,
155				};
156			},
157			Self::PendingThresholdConfirmations { .. } => {
158				debug_assert!(false, "We should never rebroadcast confirmed transactions.");
159			},
160		}
161	}
162
163	fn confirmed(
164		&mut self, confirmation_hash: BlockHash, confirmation_height: u32,
165		latest_spending_tx: Transaction,
166	) {
167		match self {
168			Self::PendingInitialBroadcast { .. } => {
169				// Generally we can't see any of our transactions confirmed if they haven't been
170				// broadcasted yet, so this should never be reachable via `transactions_confirmed`.
171				debug_assert!(false, "We should never confirm when we haven't broadcasted. This a bug and should never happen, please report.");
172				*self = Self::PendingThresholdConfirmations {
173					first_broadcast_hash: confirmation_hash,
174					latest_broadcast_height: confirmation_height,
175					latest_spending_tx,
176					confirmation_height,
177					confirmation_hash,
178				};
179			},
180			Self::PendingFirstConfirmation {
181				first_broadcast_hash,
182				latest_broadcast_height,
183				..
184			} => {
185				*self = Self::PendingThresholdConfirmations {
186					first_broadcast_hash: *first_broadcast_hash,
187					latest_broadcast_height: *latest_broadcast_height,
188					latest_spending_tx,
189					confirmation_height,
190					confirmation_hash,
191				};
192			},
193			Self::PendingThresholdConfirmations {
194				first_broadcast_hash,
195				latest_broadcast_height,
196				..
197			} => {
198				*self = Self::PendingThresholdConfirmations {
199					first_broadcast_hash: *first_broadcast_hash,
200					latest_broadcast_height: *latest_broadcast_height,
201					latest_spending_tx,
202					confirmation_height,
203					confirmation_hash,
204				};
205			},
206		}
207	}
208
209	fn unconfirmed(&mut self) {
210		match self {
211			Self::PendingInitialBroadcast { .. } => {
212				debug_assert!(
213					false,
214					"We should only mark a spend as unconfirmed if it used to be confirmed."
215				);
216			},
217			Self::PendingFirstConfirmation { .. } => {
218				debug_assert!(
219					false,
220					"We should only mark a spend as unconfirmed if it used to be confirmed."
221				);
222			},
223			Self::PendingThresholdConfirmations {
224				first_broadcast_hash,
225				latest_broadcast_height,
226				latest_spending_tx,
227				..
228			} => {
229				*self = Self::PendingFirstConfirmation {
230					first_broadcast_hash: *first_broadcast_hash,
231					latest_broadcast_height: *latest_broadcast_height,
232					latest_spending_tx: latest_spending_tx.clone(),
233				};
234			},
235		}
236	}
237
238	fn is_delayed(&self, cur_height: u32) -> bool {
239		match self {
240			Self::PendingInitialBroadcast { delayed_until_height } => {
241				delayed_until_height.map_or(false, |req_height| cur_height < req_height)
242			},
243			Self::PendingFirstConfirmation { .. } => false,
244			Self::PendingThresholdConfirmations { .. } => false,
245		}
246	}
247
248	fn first_broadcast_hash(&self) -> Option<BlockHash> {
249		match self {
250			Self::PendingInitialBroadcast { .. } => None,
251			Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
252				Some(*first_broadcast_hash)
253			},
254			Self::PendingThresholdConfirmations { first_broadcast_hash, .. } => {
255				Some(*first_broadcast_hash)
256			},
257		}
258	}
259
260	fn latest_broadcast_height(&self) -> Option<u32> {
261		match self {
262			Self::PendingInitialBroadcast { .. } => None,
263			Self::PendingFirstConfirmation { latest_broadcast_height, .. } => {
264				Some(*latest_broadcast_height)
265			},
266			Self::PendingThresholdConfirmations { latest_broadcast_height, .. } => {
267				Some(*latest_broadcast_height)
268			},
269		}
270	}
271
272	fn confirmation_height(&self) -> Option<u32> {
273		match self {
274			Self::PendingInitialBroadcast { .. } => None,
275			Self::PendingFirstConfirmation { .. } => None,
276			Self::PendingThresholdConfirmations { confirmation_height, .. } => {
277				Some(*confirmation_height)
278			},
279		}
280	}
281
282	fn latest_spending_tx(&self) -> Option<&Transaction> {
283		match self {
284			Self::PendingInitialBroadcast { .. } => None,
285			Self::PendingFirstConfirmation { latest_spending_tx, .. } => Some(latest_spending_tx),
286			Self::PendingThresholdConfirmations { latest_spending_tx, .. } => {
287				Some(latest_spending_tx)
288			},
289		}
290	}
291
292	fn is_confirmed(&self) -> bool {
293		match self {
294			Self::PendingInitialBroadcast { .. } => false,
295			Self::PendingFirstConfirmation { .. } => false,
296			Self::PendingThresholdConfirmations { .. } => true,
297		}
298	}
299}
300
301impl_writeable_tlv_based_enum!(OutputSpendStatus,
302	(0, PendingInitialBroadcast) => {
303		(0, delayed_until_height, option),
304	},
305	(2, PendingFirstConfirmation) => {
306		(0, first_broadcast_hash, required),
307		(2, latest_broadcast_height, required),
308		(4, latest_spending_tx, required),
309	},
310	(4, PendingThresholdConfirmations) => {
311		(0, first_broadcast_hash, required),
312		(2, latest_broadcast_height, required),
313		(4, latest_spending_tx, required),
314		(6, confirmation_height, required),
315		(8, confirmation_hash, required),
316	},
317);
318
319/// A utility that keeps track of [`SpendableOutputDescriptor`]s, persists them in a given
320/// [`KVStore`] and regularly retries sweeping them based on a callback given to the constructor
321/// methods.
322///
323/// Users should call [`Self::track_spendable_outputs`] for any [`SpendableOutputDescriptor`]s
324/// received via [`Event::SpendableOutputs`].
325///
326/// This needs to be notified of chain state changes either via its [`Listen`] or [`Confirm`]
327/// implementation and hence has to be connected with the utilized chain data sources.
328///
329/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users are
330/// required to give their chain data sources (i.e., [`Filter`] implementation) to the respective
331/// constructor.
332///
333/// For a synchronous version of this struct, see [`OutputSweeperSync`].
334///
335/// This is not exported to bindings users as async is not supported outside of Rust.
336///
337/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
338// Note that updates to documentation on this struct should be copied to the synchronous version.
339pub struct OutputSweeper<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
340where
341	B::Target: BroadcasterInterface,
342	D::Target: ChangeDestinationSource,
343	E::Target: FeeEstimator,
344	F::Target: Filter,
345	K::Target: KVStore,
346	L::Target: Logger,
347	O::Target: OutputSpender,
348{
349	sweeper_state: Mutex<SweeperState>,
350	pending_sweep: AtomicBool,
351	broadcaster: B,
352	fee_estimator: E,
353	chain_data_source: Option<F>,
354	output_spender: O,
355	change_destination_source: D,
356	kv_store: K,
357	logger: L,
358}
359
360impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
361	OutputSweeper<B, D, E, F, K, L, O>
362where
363	B::Target: BroadcasterInterface,
364	D::Target: ChangeDestinationSource,
365	E::Target: FeeEstimator,
366	F::Target: Filter,
367	K::Target: KVStore,
368	L::Target: Logger,
369	O::Target: OutputSpender,
370{
371	/// Constructs a new [`OutputSweeper`].
372	///
373	/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users also
374	/// need to register their [`Filter`] implementation via the given `chain_data_source`.
375	pub fn new(
376		best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
377		output_spender: O, change_destination_source: D, kv_store: K, logger: L,
378	) -> Self {
379		let outputs = Vec::new();
380		let sweeper_state = Mutex::new(SweeperState { outputs, best_block, dirty: false });
381		Self {
382			sweeper_state,
383			pending_sweep: AtomicBool::new(false),
384			broadcaster,
385			fee_estimator,
386			chain_data_source,
387			output_spender,
388			change_destination_source,
389			kv_store,
390			logger,
391		}
392	}
393
394	/// Tells the sweeper to track the given outputs descriptors.
395	///
396	/// Usually, this should be called based on the values emitted by the
397	/// [`Event::SpendableOutputs`].
398	///
399	/// The given `exclude_static_outputs` flag controls whether the sweeper will filter out
400	/// [`SpendableOutputDescriptor::StaticOutput`]s, which may be handled directly by the on-chain
401	/// wallet implementation.
402	///
403	/// If `delay_until_height` is set, we will delay the spending until the respective block
404	/// height is reached. This can be used to batch spends, e.g., to reduce on-chain fees.
405	///
406	/// Returns `Err` on persistence failure, in which case the call may be safely retried.
407	///
408	/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
409	pub async fn track_spendable_outputs(
410		&self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
411		exclude_static_outputs: bool, delay_until_height: Option<u32>,
412	) -> Result<(), ()> {
413		let mut relevant_descriptors = output_descriptors
414			.into_iter()
415			.filter(|desc| {
416				!(exclude_static_outputs
417					&& matches!(desc, SpendableOutputDescriptor::StaticOutput { .. }))
418			})
419			.peekable();
420
421		if relevant_descriptors.peek().is_none() {
422			return Ok(());
423		}
424
425		self.update_state(|state_lock| -> Result<((), bool), ()> {
426			for descriptor in relevant_descriptors {
427				let output_info = TrackedSpendableOutput {
428					descriptor,
429					channel_id,
430					status: OutputSpendStatus::PendingInitialBroadcast {
431						delayed_until_height: delay_until_height,
432					},
433				};
434
435				if state_lock
436					.outputs
437					.iter()
438					.find(|o| o.descriptor == output_info.descriptor)
439					.is_some()
440				{
441					continue;
442				}
443
444				state_lock.outputs.push(output_info);
445				state_lock.dirty = true;
446			}
447
448			Ok(((), false))
449		})
450		.await
451	}
452
453	/// Returns a list of the currently tracked spendable outputs.
454	pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
455		self.sweeper_state.lock().unwrap().outputs.clone()
456	}
457
458	/// Gets the latest best block which was connected either via the [`Listen`] or
459	/// [`Confirm`] interfaces.
460	pub fn current_best_block(&self) -> BestBlock {
461		self.sweeper_state.lock().unwrap().best_block
462	}
463
464	/// Regenerates and broadcasts the spending transaction for any outputs that are pending. This method will be a
465	/// no-op if a sweep is already pending.
466	pub async fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> {
467		// Prevent concurrent sweeps.
468		if self
469			.pending_sweep
470			.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
471			.is_err()
472		{
473			return Ok(());
474		}
475
476		let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await;
477
478		// Release the pending sweep flag again, regardless of result.
479		self.pending_sweep.store(false, Ordering::Release);
480
481		result
482	}
483
484	/// Regenerates and broadcasts the spending transaction for any outputs that are pending
485	async fn regenerate_and_broadcast_spend_if_necessary_internal(&self) -> Result<(), ()> {
486		let filter_fn = |o: &TrackedSpendableOutput, cur_height: u32| {
487			if o.status.is_confirmed() {
488				// Don't rebroadcast confirmed txs.
489				return false;
490			}
491
492			if o.status.is_delayed(cur_height) {
493				// Don't generate and broadcast if still delayed
494				return false;
495			}
496
497			if o.status.latest_broadcast_height() >= Some(cur_height) {
498				// Only broadcast once per block height.
499				return false;
500			}
501
502			true
503		};
504
505		// See if there is anything to sweep before requesting a change address.
506		let has_respends = self
507			.update_state(|sweeper_state| -> Result<(bool, bool), ()> {
508				let cur_height = sweeper_state.best_block.height;
509				let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
510
511				// If there are respends, we can postpone persisting a potentially dirty state until after the sweep.
512				Ok((has_respends, has_respends))
513			})
514			.await?;
515
516		if !has_respends {
517			return Ok(());
518		}
519
520		// Request a new change address outside of the mutex to avoid the mutex crossing await.
521		let change_destination_script =
522			self.change_destination_source.get_change_destination_script().await?;
523
524		// Sweep the outputs.
525		let spending_tx = self
526			.update_state(|sweeper_state| -> Result<(Option<Transaction>, bool), ()> {
527				let cur_height = sweeper_state.best_block.height;
528				let cur_hash = sweeper_state.best_block.block_hash;
529
530				let respend_descriptors_set: HashSet<&SpendableOutputDescriptor> = sweeper_state
531					.outputs
532					.iter()
533					.filter(|o| filter_fn(*o, cur_height))
534					.map(|o| &o.descriptor)
535					.collect();
536
537				// we first collect into a set to avoid duplicates and to "randomize" the order
538				// in which outputs are spent. Then we collect into a vec as that is what
539				// `spend_outputs` requires.
540				let respend_descriptors: Vec<&SpendableOutputDescriptor> =
541					respend_descriptors_set.into_iter().collect();
542
543				// Generate the spending transaction and broadcast it.
544				if !respend_descriptors.is_empty() {
545					let spending_tx = self
546						.spend_outputs(
547							&sweeper_state,
548							&respend_descriptors,
549							change_destination_script,
550						)
551						.map_err(|e| {
552							log_error!(self.logger, "Error spending outputs: {:?}", e);
553						})?;
554
555					log_debug!(
556						self.logger,
557						"Generating and broadcasting sweeping transaction {}",
558						spending_tx.compute_txid()
559					);
560
561					// As we didn't modify the state so far, the same filter_fn yields the same elements as
562					// above.
563					let respend_outputs =
564						sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height));
565					for output_info in respend_outputs {
566						if let Some(filter) = self.chain_data_source.as_ref() {
567							let watched_output = output_info.to_watched_output(cur_hash);
568							filter.register_output(watched_output);
569						}
570
571						output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
572						sweeper_state.dirty = true;
573					}
574
575					Ok((Some(spending_tx), false))
576				} else {
577					Ok((None, false))
578				}
579			})
580			.await?;
581
582		// Persistence completely successfully. If we have a spending transaction, we broadcast it.
583		if let Some(spending_tx) = spending_tx {
584			self.broadcaster.broadcast_transactions(&[&spending_tx]);
585		}
586
587		Ok(())
588	}
589
590	fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
591		let cur_height = sweeper_state.best_block.height;
592
593		// Prune all outputs that have sufficient depth by now.
594		sweeper_state.outputs.retain(|o| {
595			if let Some(confirmation_height) = o.status.confirmation_height() {
596				// We wait at least `PRUNE_DELAY_BLOCKS` as before that
597				// `Event::SpendableOutputs` from lingering monitors might get replayed.
598				if cur_height >= confirmation_height + PRUNE_DELAY_BLOCKS - 1 {
599					log_debug!(self.logger,
600						"Pruning swept output as sufficiently confirmed via spend in transaction {:?}. Pruned descriptor: {:?}",
601						o.status.latest_spending_tx().map(|t| t.compute_txid()), o.descriptor
602					);
603					return false;
604				}
605			}
606			true
607		});
608
609		sweeper_state.dirty = true;
610	}
611
612	fn persist_state<'a>(&self, sweeper_state: &SweeperState) -> AsyncResult<'a, (), io::Error> {
613		let encoded = sweeper_state.encode();
614
615		self.kv_store.write(
616			OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
617			OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
618			OUTPUT_SWEEPER_PERSISTENCE_KEY,
619			encoded,
620		)
621	}
622
623	/// Updates the sweeper state by executing the given callback. Persists the state afterwards if it is marked dirty,
624	/// unless skip_persist is true. Returning true for skip_persist allows the callback to postpone persisting a
625	/// potentially dirty state.
626	async fn update_state<X>(
627		&self, callback: impl FnOnce(&mut SweeperState) -> Result<(X, bool), ()>,
628	) -> Result<X, ()> {
629		let (fut, res) = {
630			let mut state_lock = self.sweeper_state.lock().unwrap();
631
632			let (res, skip_persist) = callback(&mut state_lock)?;
633			if !state_lock.dirty || skip_persist {
634				return Ok(res);
635			}
636
637			state_lock.dirty = false;
638
639			(self.persist_state(&state_lock), res)
640		};
641
642		fut.await.map_err(|e| {
643			self.sweeper_state.lock().unwrap().dirty = true;
644
645			log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
646		})?;
647
648		Ok(res)
649	}
650
651	fn spend_outputs(
652		&self, sweeper_state: &SweeperState, descriptors: &[&SpendableOutputDescriptor],
653		change_destination_script: ScriptBuf,
654	) -> Result<Transaction, ()> {
655		let tx_feerate =
656			self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee);
657		let cur_height = sweeper_state.best_block.height;
658		let locktime = Some(LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO));
659		self.output_spender.spend_spendable_outputs(
660			descriptors,
661			Vec::new(),
662			change_destination_script,
663			tx_feerate,
664			locktime,
665			&Secp256k1::new(),
666		)
667	}
668
669	fn transactions_confirmed_internal(
670		&self, sweeper_state: &mut SweeperState, header: &Header,
671		txdata: &chain::transaction::TransactionData, height: u32,
672	) {
673		let confirmation_hash = header.block_hash();
674		for (_, tx) in txdata {
675			for output_info in sweeper_state.outputs.iter_mut() {
676				if output_info.is_spent_in(*tx) {
677					output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
678				}
679			}
680		}
681
682		sweeper_state.dirty = true;
683	}
684
685	fn best_block_updated_internal(
686		&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
687	) {
688		sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
689		self.prune_confirmed_outputs(sweeper_state);
690
691		sweeper_state.dirty = true;
692	}
693}
694
695impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
696	for OutputSweeper<B, D, E, F, K, L, O>
697where
698	B::Target: BroadcasterInterface,
699	D::Target: ChangeDestinationSource,
700	E::Target: FeeEstimator,
701	F::Target: Filter + Sync + Send,
702	K::Target: KVStore,
703	L::Target: Logger,
704	O::Target: OutputSpender,
705{
706	fn filtered_block_connected(
707		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
708	) {
709		let mut state_lock = self.sweeper_state.lock().unwrap();
710		assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
711			"Blocks must be connected in chain-order - the connected header must build on the last connected header");
712		assert_eq!(state_lock.best_block.height, height - 1,
713			"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
714
715		self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
716		self.best_block_updated_internal(&mut state_lock, header, height);
717	}
718
719	fn blocks_disconnected(&self, fork_point: BestBlock) {
720		let mut state_lock = self.sweeper_state.lock().unwrap();
721
722		assert!(state_lock.best_block.height > fork_point.height,
723			"Blocks disconnected must indicate disconnection from the current best height, i.e. the new chain tip must be lower than the previous best height");
724		state_lock.best_block = fork_point;
725
726		for output_info in state_lock.outputs.iter_mut() {
727			if output_info.status.confirmation_height() > Some(fork_point.height) {
728				output_info.status.unconfirmed();
729			}
730		}
731
732		state_lock.dirty = true;
733	}
734}
735
736impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
737	for OutputSweeper<B, D, E, F, K, L, O>
738where
739	B::Target: BroadcasterInterface,
740	D::Target: ChangeDestinationSource,
741	E::Target: FeeEstimator,
742	F::Target: Filter + Sync + Send,
743	K::Target: KVStore,
744	L::Target: Logger,
745	O::Target: OutputSpender,
746{
747	fn transactions_confirmed(
748		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
749	) {
750		let mut state_lock = self.sweeper_state.lock().unwrap();
751		self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
752	}
753
754	fn transaction_unconfirmed(&self, txid: &Txid) {
755		let mut state_lock = self.sweeper_state.lock().unwrap();
756
757		// Get what height was unconfirmed.
758		let unconf_height = state_lock
759			.outputs
760			.iter()
761			.find(|o| o.status.latest_spending_tx().map(|tx| tx.compute_txid()) == Some(*txid))
762			.and_then(|o| o.status.confirmation_height());
763
764		if let Some(unconf_height) = unconf_height {
765			// Unconfirm all >= this height.
766			state_lock
767				.outputs
768				.iter_mut()
769				.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
770				.for_each(|o| o.status.unconfirmed());
771
772			state_lock.dirty = true;
773		}
774	}
775
776	fn best_block_updated(&self, header: &Header, height: u32) {
777		let mut state_lock = self.sweeper_state.lock().unwrap();
778		self.best_block_updated_internal(&mut state_lock, header, height);
779	}
780
781	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
782		let state_lock = self.sweeper_state.lock().unwrap();
783		state_lock
784			.outputs
785			.iter()
786			.filter_map(|o| match o.status {
787				OutputSpendStatus::PendingThresholdConfirmations {
788					ref latest_spending_tx,
789					confirmation_height,
790					confirmation_hash,
791					..
792				} => Some((
793					latest_spending_tx.compute_txid(),
794					confirmation_height,
795					Some(confirmation_hash),
796				)),
797				_ => None,
798			})
799			.collect::<Vec<_>>()
800	}
801}
802
803#[derive(Debug, Clone)]
804struct SweeperState {
805	outputs: Vec<TrackedSpendableOutput>,
806	best_block: BestBlock,
807	dirty: bool,
808}
809
810impl_writeable_tlv_based!(SweeperState, {
811	(0, outputs, required_vec),
812	(2, best_block, required),
813	(_unused, dirty, (static_value, false)),
814});
815
816/// A `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a
817/// future block height is reached.
818#[derive(Debug, Clone)]
819pub enum SpendingDelay {
820	/// A relative delay indicating we shouldn't spend the output before `cur_height + num_blocks`
821	/// is reached.
822	Relative {
823		/// The number of blocks until we'll generate and broadcast the spending transaction.
824		num_blocks: u32,
825	},
826	/// An absolute delay indicating we shouldn't spend the output before `height` is reached.
827	Absolute {
828		/// The height at which we'll generate and broadcast the spending transaction.
829		height: u32,
830	},
831}
832
833impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
834	ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeper<B, D, E, F, K, L, O>)
835where
836	B::Target: BroadcasterInterface,
837	D::Target: ChangeDestinationSource,
838	E::Target: FeeEstimator,
839	F::Target: Filter + Sync + Send,
840	K::Target: KVStore,
841	L::Target: Logger,
842	O::Target: OutputSpender,
843{
844	#[inline]
845	fn read<R: io::Read>(
846		reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
847	) -> Result<Self, DecodeError> {
848		let (
849			broadcaster,
850			fee_estimator,
851			chain_data_source,
852			output_spender,
853			change_destination_source,
854			kv_store,
855			logger,
856		) = args;
857		let state = SweeperState::read(reader)?;
858		let best_block = state.best_block;
859
860		if let Some(filter) = chain_data_source.as_ref() {
861			for output_info in &state.outputs {
862				let watched_output = output_info.to_watched_output(best_block.block_hash);
863				filter.register_output(watched_output);
864			}
865		}
866
867		let sweeper_state = Mutex::new(state);
868		Ok((
869			best_block,
870			OutputSweeper {
871				sweeper_state,
872				pending_sweep: AtomicBool::new(false),
873				broadcaster,
874				fee_estimator,
875				chain_data_source,
876				output_spender,
877				change_destination_source,
878				kv_store,
879				logger,
880			},
881		))
882	}
883}
884
885/// A utility that keeps track of [`SpendableOutputDescriptor`]s, persists them in a given
886/// [`KVStoreSync`] and regularly retries sweeping them based on a callback given to the constructor
887/// methods.
888///
889/// Users should call [`Self::track_spendable_outputs`] for any [`SpendableOutputDescriptor`]s
890/// received via [`Event::SpendableOutputs`].
891///
892/// This needs to be notified of chain state changes either via its [`Listen`] or [`Confirm`]
893/// implementation and hence has to be connected with the utilized chain data sources.
894///
895/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users are
896/// required to give their chain data sources (i.e., [`Filter`] implementation) to the respective
897/// constructor.
898///
899/// For an asynchronous version of this struct, see [`OutputSweeper`].
900///
901/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
902// Note that updates to documentation on this struct should be copied to the asynchronous version.
903pub struct OutputSweeperSync<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
904where
905	B::Target: BroadcasterInterface,
906	D::Target: ChangeDestinationSourceSync,
907	E::Target: FeeEstimator,
908	F::Target: Filter,
909	K::Target: KVStoreSync,
910	L::Target: Logger,
911	O::Target: OutputSpender,
912{
913	sweeper:
914		OutputSweeper<B, ChangeDestinationSourceSyncWrapper<D>, E, F, KVStoreSyncWrapper<K>, L, O>,
915}
916
917impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
918	OutputSweeperSync<B, D, E, F, K, L, O>
919where
920	B::Target: BroadcasterInterface,
921	D::Target: ChangeDestinationSourceSync,
922	E::Target: FeeEstimator,
923	F::Target: Filter,
924	K::Target: KVStoreSync,
925	L::Target: Logger,
926	O::Target: OutputSpender,
927{
928	/// Constructs a new [`OutputSweeperSync`] instance.
929	///
930	/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users also
931	/// need to register their [`Filter`] implementation via the given `chain_data_source`.
932	pub fn new(
933		best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
934		output_spender: O, change_destination_source: D, kv_store: K, logger: L,
935	) -> Self {
936		let change_destination_source =
937			ChangeDestinationSourceSyncWrapper::new(change_destination_source);
938
939		let kv_store = KVStoreSyncWrapper(kv_store);
940
941		let sweeper = OutputSweeper::new(
942			best_block,
943			broadcaster,
944			fee_estimator,
945			chain_data_source,
946			output_spender,
947			change_destination_source,
948			kv_store,
949			logger,
950		);
951		Self { sweeper }
952	}
953
954	/// Tells the sweeper to track the given outputs descriptors.
955	///
956	/// Usually, this should be called based on the values emitted by the
957	/// [`Event::SpendableOutputs`].
958	///
959	/// The given `exclude_static_outputs` flag controls whether the sweeper will filter out
960	/// [`SpendableOutputDescriptor::StaticOutput`]s, which may be handled directly by the on-chain
961	/// wallet implementation.
962	///
963	/// If `delay_until_height` is set, we will delay the spending until the respective block
964	/// height is reached. This can be used to batch spends, e.g., to reduce on-chain fees.
965	///
966	/// Returns `Err` on persistence failure, in which case the call may be safely retried.
967	///
968	/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
969	pub fn track_spendable_outputs(
970		&self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
971		exclude_static_outputs: bool, delay_until_height: Option<u32>,
972	) -> Result<(), ()> {
973		let mut fut = Box::pin(self.sweeper.track_spendable_outputs(
974			output_descriptors,
975			channel_id,
976			exclude_static_outputs,
977			delay_until_height,
978		));
979		let mut waker = dummy_waker();
980		let mut ctx = task::Context::from_waker(&mut waker);
981		match fut.as_mut().poll(&mut ctx) {
982			task::Poll::Ready(result) => result,
983			task::Poll::Pending => {
984				// In a sync context, we can't wait for the future to complete.
985				unreachable!("OutputSweeper::track_spendable_outputs should not be pending in a sync context");
986			},
987		}
988	}
989
990	/// Returns a list of the currently tracked spendable outputs.
991	///
992	/// Wraps [`OutputSweeper::tracked_spendable_outputs`].
993	pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
994		self.sweeper.tracked_spendable_outputs()
995	}
996
997	/// Gets the latest best block which was connected either via [`Listen`] or [`Confirm`]
998	/// interfaces.
999	pub fn current_best_block(&self) -> BestBlock {
1000		self.sweeper.current_best_block()
1001	}
1002
1003	/// Regenerates and broadcasts the spending transaction for any outputs that are pending. This method will be a
1004	/// no-op if a sweep is already pending.
1005	///
1006	/// Wraps [`OutputSweeper::regenerate_and_broadcast_spend_if_necessary`].
1007	pub fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> {
1008		let mut fut = Box::pin(self.sweeper.regenerate_and_broadcast_spend_if_necessary());
1009		let mut waker = dummy_waker();
1010		let mut ctx = task::Context::from_waker(&mut waker);
1011		match fut.as_mut().poll(&mut ctx) {
1012			task::Poll::Ready(result) => result,
1013			task::Poll::Pending => {
1014				// In a sync context, we can't wait for the future to complete.
1015				unreachable!("OutputSweeper::regenerate_and_broadcast_spend_if_necessary should not be pending in a sync context");
1016			},
1017		}
1018	}
1019
1020	/// Fetch the inner async sweeper.
1021	///
1022	/// In general you shouldn't have much reason to use this - you have a sync [`KVStore`] backing
1023	/// this [`OutputSweeperSync`], fetching an async [`OutputSweeper`] won't accomplish much, all
1024	/// the async methods will hang waiting on your sync [`KVStore`] and likely confuse your async
1025	/// runtime. This exists primarily for LDK-internal use, including outside of this crate.
1026	///
1027	/// This is not exported to bindings users as async is not supported outside of Rust.
1028	#[doc(hidden)]
1029	pub fn sweeper_async(
1030		&self,
1031	) -> &OutputSweeper<B, ChangeDestinationSourceSyncWrapper<D>, E, F, KVStoreSyncWrapper<K>, L, O>
1032	{
1033		&self.sweeper
1034	}
1035}
1036
1037impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
1038	for OutputSweeperSync<B, D, E, F, K, L, O>
1039where
1040	B::Target: BroadcasterInterface,
1041	D::Target: ChangeDestinationSourceSync,
1042	E::Target: FeeEstimator,
1043	F::Target: Filter + Sync + Send,
1044	K::Target: KVStoreSync,
1045	L::Target: Logger,
1046	O::Target: OutputSpender,
1047{
1048	fn filtered_block_connected(
1049		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
1050	) {
1051		self.sweeper.filtered_block_connected(header, txdata, height);
1052	}
1053
1054	fn blocks_disconnected(&self, fork_point: BestBlock) {
1055		self.sweeper.blocks_disconnected(fork_point);
1056	}
1057}
1058
1059impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
1060	for OutputSweeperSync<B, D, E, F, K, L, O>
1061where
1062	B::Target: BroadcasterInterface,
1063	D::Target: ChangeDestinationSourceSync,
1064	E::Target: FeeEstimator,
1065	F::Target: Filter + Sync + Send,
1066	K::Target: KVStoreSync,
1067	L::Target: Logger,
1068	O::Target: OutputSpender,
1069{
1070	fn transactions_confirmed(
1071		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
1072	) {
1073		self.sweeper.transactions_confirmed(header, txdata, height)
1074	}
1075
1076	fn transaction_unconfirmed(&self, txid: &Txid) {
1077		self.sweeper.transaction_unconfirmed(txid)
1078	}
1079
1080	fn best_block_updated(&self, header: &Header, height: u32) {
1081		self.sweeper.best_block_updated(header, height);
1082	}
1083
1084	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
1085		self.sweeper.get_relevant_txids()
1086	}
1087}
1088
1089impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
1090	ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeperSync<B, D, E, F, K, L, O>)
1091where
1092	B::Target: BroadcasterInterface,
1093	D::Target: ChangeDestinationSourceSync,
1094	E::Target: FeeEstimator,
1095	F::Target: Filter + Sync + Send,
1096	K::Target: KVStoreSync,
1097	L::Target: Logger,
1098	O::Target: OutputSpender,
1099{
1100	#[inline]
1101	fn read<R: io::Read>(
1102		reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
1103	) -> Result<Self, DecodeError> {
1104		let (a, b, c, d, change_destination_source, kv_store, e) = args;
1105		let change_destination_source =
1106			ChangeDestinationSourceSyncWrapper::new(change_destination_source);
1107		let kv_store = KVStoreSyncWrapper(kv_store);
1108		let args = (a, b, c, d, change_destination_source, kv_store, e);
1109		let (best_block, sweeper) =
1110			<(BestBlock, OutputSweeper<_, _, _, _, _, _, _>)>::read(reader, args)?;
1111		Ok((best_block, OutputSweeperSync { sweeper }))
1112	}
1113}