lightning/chain/chainmonitor.rs

1// This file is Copyright its original authors, visible in version control
2// history.
3//
4// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7// You may not use this file except in accordance with one or both of these
8// licenses.
9
10//! Logic to connect off-chain channel management with on-chain transaction monitoring.
11//!
12//! [`ChainMonitor`] is an implementation of [`chain::Watch`] used both to process blocks and to
13//! update [`ChannelMonitor`]s accordingly. If any on-chain events need further processing, it will
14//! make those available as [`MonitorEvent`]s to be consumed.
15//!
16//! [`ChainMonitor`] is parameterized by an optional chain source, which must implement the
17//! [`chain::Filter`] trait. This provides a mechanism to signal new relevant outputs back to light
18//! clients, such that transactions spending those outputs are included in block data.
19//!
20//! [`ChainMonitor`] may be used directly to monitor channels locally or as a part of a distributed
21//! setup to monitor channels remotely. In the latter case, a custom [`chain::Watch`] implementation
22//! would be responsible for routing each update to a remote server and for retrieving monitor
23//! events. The remote server would make use of [`ChainMonitor`] for block processing and for
24//! servicing [`ChannelMonitor`] updates from the client.
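//!
//! For example, a node monitoring its channels locally might drive a [`ChainMonitor`] roughly as
//! sketched below. Here `chain_monitor`, `block` and `height` are hypothetical values coming from
//! the user's own setup (see [`ChainMonitor::new`]) and chain source:
//!
//! ```ignore
//! use lightning::chain::Listen;
//!
//! // Feed each connected block to the ChainMonitor so every ChannelMonitor can react to
//! // on-chain events; resulting actions surface as MonitorEvents and Events.
//! chain_monitor.block_connected(&block, height);
//!
//! // Periodically (e.g. every 30 seconds) fee-bump/rebroadcast pending claims.
//! chain_monitor.rebroadcast_pending_claims();
//! ```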
25
26use bitcoin::block::Header;
27use bitcoin::hash_types::{BlockHash, Txid};
28
29use bitcoin::secp256k1::PublicKey;
30
31use crate::chain;
32use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
33#[cfg(peer_storage)]
34use crate::chain::channelmonitor::write_chanmon_internal;
35use crate::chain::channelmonitor::{
36	Balance, ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, TransactionOutputs,
37	WithChannelMonitor,
38};
39use crate::chain::transaction::{OutPoint, TransactionData};
40use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput};
41use crate::events::{self, Event, EventHandler, ReplayEvent};
42use crate::ln::channel_state::ChannelDetails;
43#[cfg(peer_storage)]
44use crate::ln::msgs::PeerStorage;
45use crate::ln::msgs::{BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler};
46#[cfg(peer_storage)]
47use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHolder};
48use crate::ln::types::ChannelId;
49use crate::prelude::*;
50use crate::sign::ecdsa::EcdsaChannelSigner;
51use crate::sign::{EntropySource, PeerStorageKey, SignerProvider};
52use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};
53use crate::types::features::{InitFeatures, NodeFeatures};
54use crate::util::async_poll::{MaybeSend, MaybeSync};
55use crate::util::errors::APIError;
56use crate::util::logger::{Logger, WithContext};
57use crate::util::native_async::FutureSpawner;
58use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync};
59#[cfg(peer_storage)]
60use crate::util::ser::{VecWriter, Writeable};
61use crate::util::wakers::{Future, Notifier};
62
63use alloc::sync::Arc;
64#[cfg(peer_storage)]
65use core::iter::Cycle;
66use core::ops::Deref;
67use core::sync::atomic::{AtomicUsize, Ordering};
68
69/// `Persist` defines behavior for persisting channel monitors: this could mean
70/// writing once to disk, and/or uploading to one or more backup services.
71///
72/// Persistence can happen in one of two ways - synchronously completing before the trait method
73/// calls return or asynchronously in the background.
74///
75/// # For those implementing synchronous persistence
76///
77///  * If persistence completes fully (including any relevant `fsync()` calls), the implementation
78///    should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal channel operation
79///    should continue.
80///
81///  * If persistence fails for some reason, implementations should consider returning
82///    [`ChannelMonitorUpdateStatus::InProgress`] and retry all pending persistence operations in
83///    the background with [`ChainMonitor::list_pending_monitor_updates`] and
84///    [`ChainMonitor::get_monitor`].
85///
86///    Once a full [`ChannelMonitor`] has been persisted, all pending updates for that channel can
87///    be marked as complete via [`ChainMonitor::channel_monitor_updated`].
88///
89///    If at some point no further progress can be made towards persisting the pending updates, the
90///    node should simply shut down.
91///
92///  * If the persistence has failed and cannot be retried further (e.g. because of an outage),
93///    [`ChannelMonitorUpdateStatus::UnrecoverableError`] can be used, though this will result in
94///    an immediate panic and future operations in LDK generally failing.
95///
96/// # For those implementing asynchronous persistence
97///
98///  All calls should generally spawn a background task and immediately return
99///  [`ChannelMonitorUpdateStatus::InProgress`]. Once the update completes,
100///  [`ChainMonitor::channel_monitor_updated`] should be called with the corresponding
101///  [`ChannelMonitor::get_latest_update_id`] or [`ChannelMonitorUpdate::update_id`].
102///
103///  Note that unlike the direct [`chain::Watch`] interface,
104///  [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
105///
106///  If at some point no further progress can be made towards persisting a pending update, the node
107///  should simply shut down. Until then, the background task should either loop indefinitely, or
108///  persistence should be regularly retried with [`ChainMonitor::list_pending_monitor_updates`]
109///  and [`ChainMonitor::get_monitor`] (note that if a full monitor is persisted all pending
110///  monitor updates may be marked completed).
111///
112/// # Using remote watchtowers
113///
114/// Watchtowers may be updated as a part of an implementation of this trait, utilizing the async
115/// update process described above while the watchtower is being updated. The following methods are
116/// provided for building transactions for a watchtower:
117/// [`ChannelMonitor::initial_counterparty_commitment_tx`],
118/// [`ChannelMonitor::counterparty_commitment_txs_from_update`],
119/// [`ChannelMonitor::sign_to_local_justice_tx`], [`TrustedCommitmentTransaction::revokeable_output_index`],
120/// [`TrustedCommitmentTransaction::build_to_local_justice_tx`].
121///
122/// [`TrustedCommitmentTransaction::revokeable_output_index`]: crate::ln::chan_utils::TrustedCommitmentTransaction::revokeable_output_index
123/// [`TrustedCommitmentTransaction::build_to_local_justice_tx`]: crate::ln::chan_utils::TrustedCommitmentTransaction::build_to_local_justice_tx
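///
/// # Example
///
/// A minimal synchronous sketch. `BlockingStore` and its `write_fully`/`remove_entry` helpers are
/// hypothetical stand-ins for the user's own durable storage; the point is only how storage
/// outcomes map onto [`ChannelMonitorUpdateStatus`] values as described above.
///
/// ```ignore
/// use lightning::chain::chainmonitor::Persist;
/// use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
/// use lightning::chain::ChannelMonitorUpdateStatus;
/// use lightning::sign::ecdsa::EcdsaChannelSigner;
/// use lightning::util::persist::MonitorName;
/// use lightning::util::ser::Writeable;
///
/// struct BlockingStore { /* handle to durable, fsync()'d storage */ }
///
/// impl<S: EcdsaChannelSigner> Persist<S> for BlockingStore {
/// 	fn persist_new_channel(
/// 		&self, monitor_name: MonitorName, monitor: &ChannelMonitor<S>,
/// 	) -> ChannelMonitorUpdateStatus {
/// 		// Write the full serialized monitor under `monitor_name` before returning.
/// 		match self.write_fully(&monitor_name, &monitor.encode()) {
/// 			Ok(()) => ChannelMonitorUpdateStatus::Completed,
/// 			// Retry in the background and report completion later via
/// 			// `ChainMonitor::channel_monitor_updated`.
/// 			Err(_) => ChannelMonitorUpdateStatus::InProgress,
/// 		}
/// 	}
///
/// 	fn update_persisted_channel(
/// 		&self, monitor_name: MonitorName, _update: Option<&ChannelMonitorUpdate>,
/// 		monitor: &ChannelMonitor<S>,
/// 	) -> ChannelMonitorUpdateStatus {
/// 		// This sketch always re-persists the full monitor rather than individual updates.
/// 		match self.write_fully(&monitor_name, &monitor.encode()) {
/// 			Ok(()) => ChannelMonitorUpdateStatus::Completed,
/// 			Err(_) => ChannelMonitorUpdateStatus::InProgress,
/// 		}
/// 	}
///
/// 	fn archive_persisted_channel(&self, monitor_name: MonitorName) {
/// 		// Move (rather than delete) the stored monitor so it can still be recovered if needed.
/// 		let _ = self.remove_entry(&monitor_name);
/// 	}
/// }
/// ```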
124pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
125	/// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is
126	/// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup,
127	/// with the `monitor_name` returned by [`ChannelMonitor::persistence_key`].
128	///
129	/// The data can be stored any way you want, so long as `monitor_name` is used to maintain a
130	/// correct mapping with the stored channel data (i.e., calls to `update_persisted_channel` with
131	/// the same `monitor_name` must be applied to or overwrite this data). Note that you **must**
132	/// persist every new monitor to disk.
133	///
134	/// The [`ChannelMonitor::get_latest_update_id`] uniquely links this call to [`ChainMonitor::channel_monitor_updated`].
135	/// For [`Persist::persist_new_channel`], it is only necessary to call [`ChainMonitor::channel_monitor_updated`]
136	/// when you return [`ChannelMonitorUpdateStatus::InProgress`].
137	///
138	/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
139	/// and [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
140	///
141	/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
142	/// [`Writeable::write`]: crate::util::ser::Writeable::write
143	fn persist_new_channel(
144		&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
145	) -> ChannelMonitorUpdateStatus;
146
147	/// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
148	/// update.
149	///
150	/// Note that on every update, you **must** persist either the [`ChannelMonitorUpdate`] or the
151	/// updated monitor itself to disk/backups. See the [`Persist`] trait documentation for more
152	/// details.
153	///
154	/// During blockchain synchronization operations, and in some rare cases, this may be called with
155	/// no [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted.
156	/// Note that after the full [`ChannelMonitor`] is persisted any previous
157	/// [`ChannelMonitorUpdate`]s which were persisted should be discarded - they can no longer be
158	/// applied to the persisted [`ChannelMonitor`] as they were already applied.
159	///
160	/// If an implementer chooses to persist the updates only, they need to make
161	/// sure that all the updates are applied to the `ChannelMonitors` *before*
162	/// the set of channel monitors is given to the `ChannelManager`
163	/// deserialization routine. If there are any gaps in the persisted [`ChannelMonitorUpdate`]s,
164	/// the implementer can safely ignore [`ChannelMonitorUpdate`]s after the gap and load without them.
165	/// See [`ChannelMonitor::update_monitor`] for
166	/// applying a monitor update to a monitor. If full `ChannelMonitors` are
167	/// persisted, then there is no need to persist individual updates.
168	///
169	/// Note that there could be a performance tradeoff between persisting complete
170	/// channel monitors on every update vs. persisting only updates and applying
171	/// them in batches. The size of each monitor grows `O(number of state updates)`
172	/// whereas updates are small and `O(1)`.
173	///
174	/// The [`ChannelMonitorUpdate::update_id`] or [`ChannelMonitor::get_latest_update_id`] uniquely
175	/// links this call to [`ChainMonitor::channel_monitor_updated`].
176	/// For [`Persist::update_persisted_channel`], it is only necessary to call [`ChainMonitor::channel_monitor_updated`]
177	/// when a [`ChannelMonitorUpdate`] is provided and when you return [`ChannelMonitorUpdateStatus::InProgress`].
178	///
179	/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`,
180	/// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and
181	/// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
182	///
183	/// [`Writeable::write`]: crate::util::ser::Writeable::write
184	fn update_persisted_channel(
185		&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
186		monitor: &ChannelMonitor<ChannelSigner>,
187	) -> ChannelMonitorUpdateStatus;
188	/// Prevents the channel monitor from being loaded on startup.
189	///
190	/// Archiving the data in a backup location (rather than deleting it fully) is useful for
191	/// hedging against data loss in case of unexpected failure.
192	///
193	/// Note that if a crash occurs during the archiving process, and its implementation is not
194	/// atomic, a state may emerge with the archival operation only being partially complete. In
195	/// that scenario, the monitor may still be loaded on startup pending successful completion of
196	/// the archive process. Additionally, because the archive operation could be retried on
197	/// restart, this method must in that case be idempotent, ensuring it can handle scenarios where
198	/// the monitor already exists in the archive.
199	fn archive_persisted_channel(&self, monitor_name: MonitorName);
200
201	/// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
202	/// [`Self::update_persisted_channel`], which have completed.
203	///
204	/// Returning an update here is equivalent to calling
205	/// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
206	/// hidden in the docs.
207	#[doc(hidden)]
208	fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
209		Vec::new()
210	}
211}
212
213struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
214	monitor: ChannelMonitor<ChannelSigner>,
215	/// The full set of pending monitor updates for this Channel.
216	///
217	/// Note that this lock must be held from [`ChannelMonitor::update_monitor`] through to
218	/// [`Persist::update_persisted_channel`] to prevent a race where we call
219	/// [`Persist::update_persisted_channel`], the user returns a
220	/// [`ChannelMonitorUpdateStatus::InProgress`], and then calls
221	/// [`ChainMonitor::channel_monitor_updated`] immediately, racing our insertion of the pending
222	/// update into the contained Vec.
223	///
224	/// This also avoids a race where we update a [`ChannelMonitor`], then while connecting a block
225	/// persist a full [`ChannelMonitor`] prior to persisting the [`ChannelMonitorUpdate`]. This
226	/// could cause users to have a full [`ChannelMonitor`] on disk as well as a
227	/// [`ChannelMonitorUpdate`] which was already applied. While this isn't an issue for the
228	/// LDK-provided update-based [`Persist`], it is somewhat surprising for users so we avoid it.
229	pending_monitor_updates: Mutex<Vec<u64>>,
230}
231
232impl<ChannelSigner: EcdsaChannelSigner> MonitorHolder<ChannelSigner> {
233	fn has_pending_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<u64>>) -> bool {
234		!pending_monitor_updates_lock.is_empty()
235	}
236}
237
238/// A read-only reference to a current ChannelMonitor.
239///
240/// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is
241/// released.
242pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
243	lock: RwLockReadGuard<'a, HashMap<ChannelId, MonitorHolder<ChannelSigner>>>,
244	channel_id: ChannelId,
245}
246
247impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
248	type Target = ChannelMonitor<ChannelSigner>;
249	fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
250		&self.lock.get(&self.channel_id).expect("Checked at construction").monitor
251	}
252}
253
254/// An unconstructable [`Persist`]er which is used under the hood when you call
255/// [`ChainMonitor::new_async_beta`].
256///
257/// This is not exported to bindings users as async is not supported outside of Rust.
258pub struct AsyncPersister<
259	K: Deref + MaybeSend + MaybeSync + 'static,
260	S: FutureSpawner,
261	L: Deref + MaybeSend + MaybeSync + 'static,
262	ES: Deref + MaybeSend + MaybeSync + 'static,
263	SP: Deref + MaybeSend + MaybeSync + 'static,
264	BI: Deref + MaybeSend + MaybeSync + 'static,
265	FE: Deref + MaybeSend + MaybeSync + 'static,
266> where
267	K::Target: KVStore + MaybeSync,
268	L::Target: Logger,
269	ES::Target: EntropySource + Sized,
270	SP::Target: SignerProvider + Sized,
271	BI::Target: BroadcasterInterface,
272	FE::Target: FeeEstimator,
273{
274	persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
275	event_notifier: Arc<Notifier>,
276}
277
278impl<
279		K: Deref + MaybeSend + MaybeSync + 'static,
280		S: FutureSpawner,
281		L: Deref + MaybeSend + MaybeSync + 'static,
282		ES: Deref + MaybeSend + MaybeSync + 'static,
283		SP: Deref + MaybeSend + MaybeSync + 'static,
284		BI: Deref + MaybeSend + MaybeSync + 'static,
285		FE: Deref + MaybeSend + MaybeSync + 'static,
286	> Deref for AsyncPersister<K, S, L, ES, SP, BI, FE>
287where
288	K::Target: KVStore + MaybeSync,
289	L::Target: Logger,
290	ES::Target: EntropySource + Sized,
291	SP::Target: SignerProvider + Sized,
292	BI::Target: BroadcasterInterface,
293	FE::Target: FeeEstimator,
294{
295	type Target = Self;
296	fn deref(&self) -> &Self {
297		self
298	}
299}
300
301impl<
302		K: Deref + MaybeSend + MaybeSync + 'static,
303		S: FutureSpawner,
304		L: Deref + MaybeSend + MaybeSync + 'static,
305		ES: Deref + MaybeSend + MaybeSync + 'static,
306		SP: Deref + MaybeSend + MaybeSync + 'static,
307		BI: Deref + MaybeSend + MaybeSync + 'static,
308		FE: Deref + MaybeSend + MaybeSync + 'static,
309	> Persist<<SP::Target as SignerProvider>::EcdsaSigner> for AsyncPersister<K, S, L, ES, SP, BI, FE>
310where
311	K::Target: KVStore + MaybeSync,
312	L::Target: Logger,
313	ES::Target: EntropySource + Sized,
314	SP::Target: SignerProvider + Sized,
315	BI::Target: BroadcasterInterface,
316	FE::Target: FeeEstimator,
317	<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
318{
319	fn persist_new_channel(
320		&self, monitor_name: MonitorName,
321		monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
322	) -> ChannelMonitorUpdateStatus {
323		let notifier = Arc::clone(&self.event_notifier);
324		self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier);
325		ChannelMonitorUpdateStatus::InProgress
326	}
327
328	fn update_persisted_channel(
329		&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
330		monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
331	) -> ChannelMonitorUpdateStatus {
332		let notifier = Arc::clone(&self.event_notifier);
333		self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier);
334		ChannelMonitorUpdateStatus::InProgress
335	}
336
337	fn archive_persisted_channel(&self, monitor_name: MonitorName) {
338		self.persister.spawn_async_archive_persisted_channel(monitor_name);
339	}
340
341	fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
342		self.persister.get_and_clear_completed_updates()
343	}
344}
345
346/// An implementation of [`chain::Watch`] for monitoring channels.
347///
348/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
349/// [`chain::Watch`]. May be used in conjunction with [`ChannelManager`] to monitor channels locally
350/// or used independently to monitor channels remotely. See the [module-level documentation] for
351/// details.
352///
353/// Note that `ChainMonitor` should regularly trigger rebroadcasts/fee bumps of pending claims from
354/// a force-closed channel. This is crucial in preventing certain classes of pinning attacks,
355/// detecting substantial mempool feerate changes between blocks, and ensuring reliability if
356/// broadcasting fails. We recommend invoking [`rebroadcast_pending_claims`] every 30 seconds, or
357/// more frequently if running in an environment with spotty connections, like on mobile.
358///
359/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
360/// [module-level documentation]: crate::chain::chainmonitor
361/// [`rebroadcast_pending_claims`]: Self::rebroadcast_pending_claims
362pub struct ChainMonitor<
363	ChannelSigner: EcdsaChannelSigner,
364	C: Deref,
365	T: Deref,
366	F: Deref,
367	L: Deref,
368	P: Deref,
369	ES: Deref,
370> where
371	C::Target: chain::Filter,
372	T::Target: BroadcasterInterface,
373	F::Target: FeeEstimator,
374	L::Target: Logger,
375	P::Target: Persist<ChannelSigner>,
376	ES::Target: EntropySource,
377{
378	monitors: RwLock<HashMap<ChannelId, MonitorHolder<ChannelSigner>>>,
379	chain_source: Option<C>,
380	broadcaster: T,
381	logger: L,
382	fee_estimator: F,
383	persister: P,
384	_entropy_source: ES,
385	/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
386	/// from the user and not from a [`ChannelMonitor`].
387	pending_monitor_events: Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>>,
388	/// The best block height seen, used as a proxy for the passage of time.
389	highest_chain_height: AtomicUsize,
390
391	/// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
392	/// it to give to users (or [`MonitorEvent`]s for `ChannelManager` to process).
393	event_notifier: Arc<Notifier>,
394
395	/// Messages to send to the peer. This is currently used to distribute PeerStorage to channel partners.
396	pending_send_only_events: Mutex<Vec<MessageSendEvent>>,
397
398	#[cfg(peer_storage)]
399	our_peerstorage_encryption_key: PeerStorageKey,
400}
401
402impl<
403		K: Deref + MaybeSend + MaybeSync + 'static,
404		S: FutureSpawner,
405		SP: Deref + MaybeSend + MaybeSync + 'static,
406		C: Deref,
407		T: Deref + MaybeSend + MaybeSync + 'static,
408		F: Deref + MaybeSend + MaybeSync + 'static,
409		L: Deref + MaybeSend + MaybeSync + 'static,
410		ES: Deref + MaybeSend + MaybeSync + 'static,
411	>
412	ChainMonitor<
413		<SP::Target as SignerProvider>::EcdsaSigner,
414		C,
415		T,
416		F,
417		L,
418		AsyncPersister<K, S, L, ES, SP, T, F>,
419		ES,
420	> where
421	K::Target: KVStore + MaybeSync,
422	SP::Target: SignerProvider + Sized,
423	C::Target: chain::Filter,
424	T::Target: BroadcasterInterface,
425	F::Target: FeeEstimator,
426	L::Target: Logger,
427	ES::Target: EntropySource + Sized,
428	<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
429{
430	/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
431	///
432	/// This behaves the same as [`ChainMonitor::new`] except that it relies on
433	/// [`MonitorUpdatingPersisterAsync`] and thus allows persistence to be completed async.
434	///
435	/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
436	///
437	/// This is not exported to bindings users as async is not supported outside of Rust.
438	pub fn new_async_beta(
439		chain_source: Option<C>, broadcaster: T, logger: L, feeest: F,
440		persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
441		_our_peerstorage_encryption_key: PeerStorageKey,
442	) -> Self {
443		let event_notifier = Arc::new(Notifier::new());
444		Self {
445			monitors: RwLock::new(new_hash_map()),
446			chain_source,
447			broadcaster,
448			logger,
449			fee_estimator: feeest,
450			_entropy_source,
451			pending_monitor_events: Mutex::new(Vec::new()),
452			highest_chain_height: AtomicUsize::new(0),
453			event_notifier: Arc::clone(&event_notifier),
454			persister: AsyncPersister { persister, event_notifier },
455			pending_send_only_events: Mutex::new(Vec::new()),
456			#[cfg(peer_storage)]
457			our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
458		}
459	}
460}
461
462impl<
463		ChannelSigner: EcdsaChannelSigner,
464		C: Deref,
465		T: Deref,
466		F: Deref,
467		L: Deref,
468		P: Deref,
469		ES: Deref,
470	> ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
471where
472	C::Target: chain::Filter,
473	T::Target: BroadcasterInterface,
474	F::Target: FeeEstimator,
475	L::Target: Logger,
476	P::Target: Persist<ChannelSigner>,
477	ES::Target: EntropySource,
478{
479	/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
480	/// of a channel and reacting accordingly based on transactions in the given chain data. See
481	/// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
482	/// be returned by [`chain::Watch::release_pending_monitor_events`].
483	///
484	/// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch. Subsequent
485	/// calls must not exclude any transactions matching the new outputs nor any in-block
486	/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
487	/// updated `txdata`.
488	///
489	/// Calls which represent a new blockchain tip height should set `best_height`.
490	fn process_chain_data<FN>(
491		&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN,
492	) where
493		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
494	{
495		let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
496		let channel_ids = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
497		let channel_count = channel_ids.len();
498		for channel_id in channel_ids.iter() {
499			let monitor_lock = self.monitors.read().unwrap();
500			if let Some(monitor_state) = monitor_lock.get(channel_id) {
501				let update_res = self.update_monitor_with_chain_data(
502					header,
503					best_height,
504					txdata,
505					&process,
506					channel_id,
507					&monitor_state,
508					channel_count,
509				);
510				if update_res.is_err() {
511					// Take the monitors lock for writing so that we poison it and any future
512					// operations going forward fail immediately.
513					core::mem::drop(monitor_lock);
514					let _poison = self.monitors.write().unwrap();
515					log_error!(self.logger, "{}", err_str);
516					panic!("{}", err_str);
517				}
518			}
519		}
520
521		// Do another pass to handle any monitors added in between iterations.
522		let monitor_states = self.monitors.write().unwrap();
523		for (channel_id, monitor_state) in monitor_states.iter() {
524			if !channel_ids.contains(channel_id) {
525				let update_res = self.update_monitor_with_chain_data(
526					header,
527					best_height,
528					txdata,
529					&process,
530					channel_id,
531					&monitor_state,
532					channel_count,
533				);
534				if update_res.is_err() {
535					log_error!(self.logger, "{}", err_str);
536					panic!("{}", err_str);
537				}
538			}
539		}
540
541		if let Some(height) = best_height {
542			// If the best block height is being updated, update highest_chain_height under the
543			// monitors write lock.
544			let old_height = self.highest_chain_height.load(Ordering::Acquire);
545			let new_height = height as usize;
546			if new_height > old_height {
547				self.highest_chain_height.store(new_height, Ordering::Release);
548			}
549		}
550	}
551
552	fn update_monitor_with_chain_data<FN>(
553		&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN,
554		channel_id: &ChannelId, monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
555	) -> Result<(), ()>
556	where
557		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
558	{
559		let monitor = &monitor_state.monitor;
560		let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
561
562		let mut txn_outputs = process(monitor, txdata);
563
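		// To avoid re-persisting every monitor on every block, chain-sync persistence is spread
		// across blocks: a monitor is re-persisted below only if it has pending claims or if its
		// partition key (the first four bytes of the channel ID plus the current height) is
		// divisible by the partition factor chosen below.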
564		let get_partition_key = |channel_id: &ChannelId| {
565			let channel_id_bytes = channel_id.0;
566			let channel_id_u32 = u32::from_be_bytes([
567				channel_id_bytes[0],
568				channel_id_bytes[1],
569				channel_id_bytes[2],
570				channel_id_bytes[3],
571			]);
572			channel_id_u32.wrapping_add(best_height.unwrap_or_default())
573		};
574
575		let partition_factor = if channel_count < 15 {
576			5
577		} else {
578			50 // ~8 hours
579		};
580
581		let has_pending_claims = monitor_state.monitor.has_pending_claims();
582		if has_pending_claims || get_partition_key(channel_id) % partition_factor == 0 {
583			log_trace!(
584				logger,
585				"Syncing Channel Monitor for channel {}",
586				log_funding_info!(monitor)
587			);
588			// Even though we don't track monitor updates from chain-sync as pending, we still want
589			// updates per-channel to be well-ordered so that users don't see a
590			// `ChannelMonitorUpdate` after a channel persist for a channel with the same
591			// `latest_update_id`.
592			let _pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
593			match self.persister.update_persisted_channel(monitor.persistence_key(), None, monitor)
594			{
595				ChannelMonitorUpdateStatus::Completed => log_trace!(
596					logger,
597					"Finished syncing Channel Monitor for channel {} for block-data",
598					log_funding_info!(monitor)
599				),
600				ChannelMonitorUpdateStatus::InProgress => {
601					log_trace!(
602						logger,
603						"Channel Monitor sync for channel {} in progress.",
604						log_funding_info!(monitor)
605					);
606				},
607				ChannelMonitorUpdateStatus::UnrecoverableError => {
608					return Err(());
609				},
610			}
611		}
612
613		// Register any new outputs with the chain source for filtering, storing any dependent
614		// transactions from within the block that previously had not been included in txdata.
615		if let Some(ref chain_source) = self.chain_source {
616			let block_hash = header.block_hash();
617			for (txid, mut outputs) in txn_outputs.drain(..) {
618				for (idx, output) in outputs.drain(..) {
619					// Register any new outputs with the chain source for filtering
620					let output = WatchedOutput {
621						block_hash: Some(block_hash),
622						outpoint: OutPoint { txid, index: idx as u16 },
623						script_pubkey: output.script_pubkey,
624					};
625					log_trace!(
626						logger,
627						"Adding monitoring for spends of outpoint {} to the filter",
628						output.outpoint
629					);
630					chain_source.register_output(output);
631				}
632			}
633		}
634		Ok(())
635	}
636
637	/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
638	///
639	/// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor
640	/// will call back to it indicating transactions and outputs of interest. This allows clients to
641	/// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may
642	/// always need to fetch full blocks absent another means for determining which blocks contain
643	/// transactions relevant to the watched channels.
644	///
645	/// # Note
646	/// `our_peerstorage_encryption_key` must be obtained from [`NodeSigner::get_peer_storage_key`].
647	/// This key is used to encrypt peer storage backups.
648	///
649	/// **Important**: This key should not be set arbitrarily or changed after initialization. The same key
650	/// is obtained by the [`ChannelManager`] through [`NodeSigner`] to decrypt peer backups.
651	/// Using an inconsistent or incorrect key will result in the inability to decrypt previously encrypted backups.
652	///
653	/// [`NodeSigner`]: crate::sign::NodeSigner
654	/// [`NodeSigner::get_peer_storage_key`]: crate::sign::NodeSigner::get_peer_storage_key
655	/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
656	pub fn new(
657		chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P,
658		_entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey,
659	) -> Self {
660		Self {
661			monitors: RwLock::new(new_hash_map()),
662			chain_source,
663			broadcaster,
664			logger,
665			fee_estimator: feeest,
666			persister,
667			_entropy_source,
668			pending_monitor_events: Mutex::new(Vec::new()),
669			highest_chain_height: AtomicUsize::new(0),
670			event_notifier: Arc::new(Notifier::new()),
671			pending_send_only_events: Mutex::new(Vec::new()),
672			#[cfg(peer_storage)]
673			our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
674		}
675	}
676
677	/// Gets the balances in the contained [`ChannelMonitor`]s which are claimable on-chain or
678	/// claims which are awaiting confirmation.
679	///
680	/// Includes the balances from each [`ChannelMonitor`] *except* those included in
681	/// `ignored_channels`.
682	///
683	/// See [`ChannelMonitor::get_claimable_balances`] for more details on the exact criteria for
684	/// inclusion in the return value.
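	///
	/// For example, to look only at balances not already reflected in open channels, one might
	/// pass in the [`ChannelDetails`] of a (hypothetical) `channel_manager`:
	///
	/// ```ignore
	/// let channels = channel_manager.list_channels();
	/// let ignored: Vec<&ChannelDetails> = channels.iter().collect();
	/// let balances = chain_monitor.get_claimable_balances(&ignored);
	/// ```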
685	pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec<Balance> {
686		let mut ret = Vec::new();
687		let monitor_states = self.monitors.read().unwrap();
688		for (_, monitor_state) in monitor_states.iter().filter(|(channel_id, _)| {
689			for chan in ignored_channels {
690				if chan.channel_id == **channel_id {
691					return false;
692				}
693			}
694			true
695		}) {
696			ret.append(&mut monitor_state.monitor.get_claimable_balances());
697		}
698		ret
699	}
700
701	/// Gets the [`LockedChannelMonitor`] for a given channel ID, returning an `Err` if no
702	/// such [`ChannelMonitor`] is currently being monitored.
703	///
704	/// Note that the result holds a mutex over our monitor set, and should not be held
705	/// indefinitely.
706	pub fn get_monitor(
707		&self, channel_id: ChannelId,
708	) -> Result<LockedChannelMonitor<'_, ChannelSigner>, ()> {
709		let lock = self.monitors.read().unwrap();
710		if lock.get(&channel_id).is_some() {
711			Ok(LockedChannelMonitor { lock, channel_id })
712		} else {
713			Err(())
714		}
715	}
716
717	/// Lists the channel ID of each [`ChannelMonitor`] being monitored.
718	///
719	/// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always
720	/// monitoring for on-chain state resolutions.
721	pub fn list_monitors(&self) -> Vec<ChannelId> {
722		self.monitors.read().unwrap().keys().copied().collect()
723	}
724
725	#[cfg(not(c_bindings))]
726	/// Lists the pending updates for each [`ChannelMonitor`] (by `ChannelId` being monitored).
727	/// Each `Vec<u64>` contains `update_id`s from [`ChannelMonitor::get_latest_update_id`] for updates
728	/// that have not yet been fully persisted. Note that if a full monitor is persisted all the pending
729	/// monitor updates must be individually marked completed by calling [`ChainMonitor::channel_monitor_updated`].
730	pub fn list_pending_monitor_updates(&self) -> HashMap<ChannelId, Vec<u64>> {
731		hash_map_from_iter(self.monitors.read().unwrap().iter().map(|(channel_id, holder)| {
732			(*channel_id, holder.pending_monitor_updates.lock().unwrap().clone())
733		}))
734	}
735
736	#[cfg(c_bindings)]
737	/// Lists the pending updates for each [`ChannelMonitor`] (by `ChannelId` being monitored).
738	/// Each `Vec<u64>` contains `update_id`s from [`ChannelMonitor::get_latest_update_id`] for updates
739	/// that have not yet been fully persisted. Note that if a full monitor is persisted all the pending
740	/// monitor updates must be individually marked completed by calling [`ChainMonitor::channel_monitor_updated`].
741	pub fn list_pending_monitor_updates(&self) -> Vec<(ChannelId, Vec<u64>)> {
742		let monitors = self.monitors.read().unwrap();
743		monitors
744			.iter()
745			.map(|(channel_id, holder)| {
746				(*channel_id, holder.pending_monitor_updates.lock().unwrap().clone())
747			})
748			.collect()
749	}
750
751	#[cfg(any(test, feature = "_test_utils"))]
752	pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor<ChannelSigner> {
753		self.monitors.write().unwrap().remove(channel_id).unwrap().monitor
754	}
755
756	/// Indicates the persistence of a [`ChannelMonitor`] has completed after
757	/// [`ChannelMonitorUpdateStatus::InProgress`] was returned from an update operation.
758	///
759	/// Thus, the anticipated use is, at a high level:
760	///  1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
761	///     update to disk and begins updating any remote (e.g. watchtower/backup) copies,
762	///     returning [`ChannelMonitorUpdateStatus::InProgress`],
763	///  2) once all remote copies are updated, you call this function with [`ChannelMonitor::get_latest_update_id`]
764	///     or [`ChannelMonitorUpdate::update_id`] as the `completed_update_id`, and once all pending
765	///     updates have completed the channel will be re-enabled.
766	///
767	/// It is only necessary to call [`ChainMonitor::channel_monitor_updated`] when you return [`ChannelMonitorUpdateStatus::InProgress`]
768	/// from [`Persist`] and either:
769	///   1. A new [`ChannelMonitor`] was added in [`Persist::persist_new_channel`], or
770	///   2. A [`ChannelMonitorUpdate`] was provided as part of [`Persist::update_persisted_channel`].
771	/// Note that we don't care about calls to [`Persist::update_persisted_channel`] where no
772	/// [`ChannelMonitorUpdate`] was provided.
773	///
774	/// Returns an [`APIError::APIMisuseError`] if `channel_id` does not match any currently
775	/// registered [`ChannelMonitor`]s.
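	///
	/// For example, once an asynchronous [`Persist`] implementation which earlier returned
	/// [`ChannelMonitorUpdateStatus::InProgress`] has durably completed the write for a given
	/// (hypothetical) `channel_id`/`update_id` pair captured at hand-off time, it would unblock
	/// the channel with:
	///
	/// ```ignore
	/// chain_monitor.channel_monitor_updated(channel_id, update_id)?;
	/// ```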
776	pub fn channel_monitor_updated(
777		&self, channel_id: ChannelId, completed_update_id: u64,
778	) -> Result<(), APIError> {
779		let monitors = self.monitors.read().unwrap();
780		let monitor_data = if let Some(mon) = monitors.get(&channel_id) {
781			mon
782		} else {
783			return Err(APIError::APIMisuseError {
784				err: format!("No ChannelMonitor matching channel ID {} found", channel_id),
785			});
786		};
787		let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap();
788		pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);
789
790		// Note that we only check for pending non-chainsync monitor updates and we don't track monitor
791		// updates resulting from chainsync in `pending_monitor_updates`.
792		let monitor_is_pending_updates = monitor_data.has_pending_updates(&pending_monitor_updates);
793		log_debug!(
794			self.logger,
795			"Completed off-chain monitor update {} for channel with channel ID {}, {}",
796			completed_update_id,
797			channel_id,
798			if monitor_is_pending_updates {
799				"still have pending off-chain updates"
800			} else {
801				"all off-chain updates complete, returning a MonitorEvent"
802			}
803		);
804		if monitor_is_pending_updates {
805			// If there are still monitor updates pending, we cannot yet construct a
806			// Completed event.
807			return Ok(());
808		}
809		let funding_txo = monitor_data.monitor.get_funding_txo();
810		self.pending_monitor_events.lock().unwrap().push((
811			funding_txo,
812			channel_id,
813			vec![MonitorEvent::Completed {
814				funding_txo,
815				channel_id,
816				monitor_update_id: monitor_data.monitor.get_latest_update_id(),
817			}],
818			monitor_data.monitor.get_counterparty_node_id(),
819		));
820
821		self.event_notifier.notify();
822		Ok(())
823	}
824
825	/// This wrapper avoids having to update some of our tests for now as they assume the direct
826	/// chain::Watch API wherein we mark a monitor fully-updated by just calling
827	/// channel_monitor_updated once with the highest ID.
828	#[cfg(any(test, fuzzing))]
829	pub fn force_channel_monitor_updated(&self, channel_id: ChannelId, monitor_update_id: u64) {
830		let monitors = self.monitors.read().unwrap();
831		let monitor = &monitors.get(&channel_id).unwrap().monitor;
832		let counterparty_node_id = monitor.get_counterparty_node_id();
833		let funding_txo = monitor.get_funding_txo();
834		self.pending_monitor_events.lock().unwrap().push((
835			funding_txo,
836			channel_id,
837			vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id }],
838			counterparty_node_id,
839		));
840		self.event_notifier.notify();
841	}
842
843	#[cfg(any(test, feature = "_test_utils"))]
844	pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
845		use crate::events::EventsProvider;
846		let events = core::cell::RefCell::new(Vec::new());
847		let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
848		self.process_pending_events(&event_handler);
849		events.into_inner()
850	}
851
852	/// Processes any events asynchronously in the order they were generated since the last call
853	/// using the given event handler.
854	///
855	/// See the trait-level documentation of [`EventsProvider`] for requirements.
856	///
857	/// [`EventsProvider`]: crate::events::EventsProvider
858	pub async fn process_pending_events_async<
859		Future: core::future::Future<Output = Result<(), ReplayEvent>>,
860		H: Fn(Event) -> Future,
861	>(
862		&self, handler: H,
863	) {
864		// Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
865		// crazy dance to process a monitor's events then only remove them once we've done so.
866		let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
867		for channel_id in mons_to_process {
868			let mut ev;
869			match super::channelmonitor::process_events_body!(
870				self.monitors.read().unwrap().get(&channel_id).map(|m| &m.monitor),
871				self.logger,
872				ev,
873				handler(ev).await
874			) {
875				Ok(()) => {},
876				Err(ReplayEvent()) => {
877					self.event_notifier.notify();
878				},
879			}
880		}
881	}
882
883	/// Gets a [`Future`] that completes when an event is available either via
884	/// [`chain::Watch::release_pending_monitor_events`] or
885	/// [`EventsProvider::process_pending_events`].
886	///
887	/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
888	/// [`ChainMonitor`] and should instead register actions to be taken later.
889	///
890	/// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
891	pub fn get_update_future(&self) -> Future {
892		self.event_notifier.get_future()
893	}
894
895	/// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is
896	/// crucial in preventing certain classes of pinning attacks, detecting substantial mempool
897	/// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend
898	/// invoking this every 30 seconds, or more frequently if running in an environment with spotty
899	/// connections, like on mobile.
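	///
	/// For example, a simple background thread (assuming a `'static`, shared `chain_monitor`
	/// handle such as an `Arc`) could drive this as:
	///
	/// ```ignore
	/// std::thread::spawn(move || loop {
	/// 	std::thread::sleep(std::time::Duration::from_secs(30));
	/// 	chain_monitor.rebroadcast_pending_claims();
	/// });
	/// ```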
900	pub fn rebroadcast_pending_claims(&self) {
901		let monitors = self.monitors.read().unwrap();
902		for (_, monitor_holder) in &*monitors {
903			monitor_holder.monitor.rebroadcast_pending_claims(
904				&*self.broadcaster,
905				&*self.fee_estimator,
906				&self.logger,
907			)
908		}
909	}
910
911	/// Triggers rebroadcasts of pending claims from force-closed channels after a transaction
912	/// signature generation failure.
913	///
914	/// `monitor_opt` can be used as a filter to only trigger them for a specific channel monitor.
915	pub fn signer_unblocked(&self, monitor_opt: Option<ChannelId>) {
916		let monitors = self.monitors.read().unwrap();
917		if let Some(channel_id) = monitor_opt {
918			if let Some(monitor_holder) = monitors.get(&channel_id) {
919				monitor_holder.monitor.signer_unblocked(
920					&*self.broadcaster,
921					&*self.fee_estimator,
922					&self.logger,
923				)
924			}
925		} else {
926			for (_, monitor_holder) in &*monitors {
927				monitor_holder.monitor.signer_unblocked(
928					&*self.broadcaster,
929					&*self.fee_estimator,
930					&self.logger,
931				)
932			}
933		}
934	}
935
936	/// Archives fully resolved channel monitors by calling [`Persist::archive_persisted_channel`].
937	///
938	/// This is useful for pruning fully resolved monitors from the monitor set and primary
939	/// storage so they are not kept in memory and reloaded on restart.
940	///
941	/// Should be called occasionally (once every handful of blocks or on startup).
942	///
943	/// Depending on the implementation of [`Persist::archive_persisted_channel`] the monitor
944	/// data could be moved to an archive location or removed entirely.
945	pub fn archive_fully_resolved_channel_monitors(&self) {
946		let mut have_monitors_to_prune = false;
947		for monitor_holder in self.monitors.read().unwrap().values() {
948			let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor, None);
949			let (is_fully_resolved, needs_persistence) =
950				monitor_holder.monitor.check_and_update_full_resolution_status(&logger);
951			if is_fully_resolved {
952				have_monitors_to_prune = true;
953			}
954			if needs_persistence {
955				self.persister.update_persisted_channel(
956					monitor_holder.monitor.persistence_key(),
957					None,
958					&monitor_holder.monitor,
959				);
960			}
961		}
962		if have_monitors_to_prune {
963			let mut monitors = self.monitors.write().unwrap();
964			monitors.retain(|channel_id, monitor_holder| {
965				let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor, None);
966				let (is_fully_resolved, _) =
967					monitor_holder.monitor.check_and_update_full_resolution_status(&logger);
968				if is_fully_resolved {
969					log_info!(
970						logger,
971						"Archiving fully resolved ChannelMonitor for channel ID {}",
972						channel_id
973					);
974					self.persister
975						.archive_persisted_channel(monitor_holder.monitor.persistence_key());
976					false
977				} else {
978					true
979				}
980			});
981		}
982	}
983
984	/// This function collects the counterparty node IDs from all monitors into a `HashSet`,
985	/// ensuring unique IDs are returned.
986	#[cfg(peer_storage)]
987	fn all_counterparty_node_ids(&self) -> HashSet<PublicKey> {
988		let mon = self.monitors.read().unwrap();
989		mon.values().map(|monitor| monitor.monitor.get_counterparty_node_id()).collect()
990	}
991
992	#[cfg(peer_storage)]
993	fn send_peer_storage(&self, their_node_id: PublicKey) {
994		let mut monitors_list: Vec<PeerStorageMonitorHolder> = Vec::new();
995		let random_bytes = self._entropy_source.get_secure_random_bytes();
996
997		const MAX_PEER_STORAGE_SIZE: usize = 65531;
998		const USIZE_LEN: usize = core::mem::size_of::<usize>();
999		let mut random_bytes_cycle_iter = random_bytes.iter().cycle();
1000
1001		let mut current_size = 0;
1002		let monitors_lock = self.monitors.read().unwrap();
1003		let mut channel_ids = monitors_lock.keys().copied().collect();
1004
1005		fn next_random_id(
1006			channel_ids: &mut Vec<ChannelId>,
1007			random_bytes_cycle_iter: &mut Cycle<core::slice::Iter<u8>>,
1008		) -> Option<ChannelId> {
1009			if channel_ids.is_empty() {
1010				return None;
1011			}
1012			let random_idx = {
1013				let mut usize_bytes = [0u8; USIZE_LEN];
1014				usize_bytes.iter_mut().for_each(|b| {
1015					*b = *random_bytes_cycle_iter.next().expect("A cycle never ends")
1016				});
1017				// Take one more to introduce a slight misalignment.
1018				random_bytes_cycle_iter.next().expect("A cycle never ends");
1019				usize::from_le_bytes(usize_bytes) % channel_ids.len()
1020			};
1021			Some(channel_ids.swap_remove(random_idx))
1022		}
1023
1024		while let Some(channel_id) = next_random_id(&mut channel_ids, &mut random_bytes_cycle_iter)
1025		{
1026			let monitor_holder = if let Some(monitor_holder) = monitors_lock.get(&channel_id) {
1027				monitor_holder
1028			} else {
1029				debug_assert!(
1030					false,
1031					"Tried to access non-existing monitor, this should never happen"
1032				);
1033				break;
1034			};
1035
1036			let mut serialized_channel = VecWriter(Vec::new());
1037			let min_seen_secret = monitor_holder.monitor.get_min_seen_secret();
1038			let counterparty_node_id = monitor_holder.monitor.get_counterparty_node_id();
1039			{
1040				let inner_lock = monitor_holder.monitor.inner.lock().unwrap();
1041
1042				write_chanmon_internal(&inner_lock, true, &mut serialized_channel)
1043					.expect("can not write Channel Monitor for peer storage message");
1044			}
1045			let peer_storage_monitor = PeerStorageMonitorHolder {
1046				channel_id,
1047				min_seen_secret,
1048				counterparty_node_id,
1049				monitor_bytes: serialized_channel.0,
1050			};
1051
1052			let serialized_length = peer_storage_monitor.serialized_length();
1053
1054			if current_size + serialized_length > MAX_PEER_STORAGE_SIZE {
1055				continue;
1056			} else {
1057				current_size += serialized_length;
1058				monitors_list.push(peer_storage_monitor);
1059			}
1060		}
1061
1062		let serialised_channels = monitors_list.encode();
1063		let our_peer_storage = DecryptedOurPeerStorage::new(serialised_channels);
1064		let cipher = our_peer_storage.encrypt(&self.our_peerstorage_encryption_key, &random_bytes);
1065
1066		log_debug!(self.logger, "Sending Peer Storage to {}", log_pubkey!(their_node_id));
1067		let send_peer_storage_event = MessageSendEvent::SendPeerStorage {
1068			node_id: their_node_id,
1069			msg: PeerStorage { data: cipher.into_vec() },
1070		};
1071
1072		self.pending_send_only_events.lock().unwrap().push(send_peer_storage_event)
1073	}
1074
1075	/// Loads a [`ChannelMonitor`] which already exists on disk after startup.
1076	///
1077	/// Using this over [`chain::Watch::watch_channel`] avoids re-persisting a [`ChannelMonitor`]
1078	/// that hasn't changed, slowing down startup.
1079	///
1080	/// Note that this method *can* be used if additional blocks were replayed against the
1081	/// [`ChannelMonitor`] or if a [`ChannelMonitorUpdate`] loaded from disk was replayed such that
1082	/// it will be replayed on startup, and in general can only *not* be used if you directly accessed
1083	/// the [`ChannelMonitor`] and changed its state in some way that will not be replayed again on
1084	/// a restart. Such direct access should generally never occur for most LDK-based nodes.
1085	///
1086	/// For [`ChannelMonitor`]s which were last serialized by an LDK version prior to 0.1 this will
1087	/// fall back to calling [`chain::Watch::watch_channel`] and persisting the [`ChannelMonitor`].
1088	/// See the release notes for LDK 0.1 for more information on this requirement.
1089	///
1090	/// [`ChannelMonitor`]s which do not need to be persisted (i.e. were last written by LDK 0.1 or
1091	/// later) will be loaded without persistence and this method will return
1092	/// [`ChannelMonitorUpdateStatus::Completed`].
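	///
	/// For example, on startup, after reading the monitors back from the user's own storage (the
	/// `stored_monitors` iterator here is hypothetical):
	///
	/// ```ignore
	/// for (channel_id, monitor) in stored_monitors {
	/// 	chain_monitor
	/// 		.load_existing_monitor(channel_id, monitor)
	/// 		.expect("monitor for this channel already loaded");
	/// }
	/// ```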
1093	pub fn load_existing_monitor(
1094		&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
1095	) -> Result<ChannelMonitorUpdateStatus, ()> {
1096		if !monitor.written_by_0_1_or_later() {
1097			return chain::Watch::watch_channel(self, channel_id, monitor);
1098		}
1099
1100		let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
1101		let mut monitors = self.monitors.write().unwrap();
1102		let entry = match monitors.entry(channel_id) {
1103			hash_map::Entry::Occupied(_) => {
1104				log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
1105				return Err(());
1106			},
1107			hash_map::Entry::Vacant(e) => e,
1108		};
1109		log_trace!(
1110			logger,
1111			"Loaded existing ChannelMonitor for channel {}",
1112			log_funding_info!(monitor)
1113		);
1114		if let Some(ref chain_source) = self.chain_source {
1115			monitor.load_outputs_to_watch(chain_source, &self.logger);
1116		}
1117		entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(Vec::new()) });
1118
1119		Ok(ChannelMonitorUpdateStatus::Completed)
1120	}
1121}
1122
1123impl<
1124		ChannelSigner: EcdsaChannelSigner,
1125		C: Deref,
1126		T: Deref,
1127		F: Deref,
1128		L: Deref,
1129		P: Deref,
1130		ES: Deref,
1131	> BaseMessageHandler for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
1132where
1133	C::Target: chain::Filter,
1134	T::Target: BroadcasterInterface,
1135	F::Target: FeeEstimator,
1136	L::Target: Logger,
1137	P::Target: Persist<ChannelSigner>,
1138	ES::Target: EntropySource,
1139{
1140	fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
1141		let mut pending_events = self.pending_send_only_events.lock().unwrap();
1142		core::mem::take(&mut *pending_events)
1143	}
1144
1145	fn peer_disconnected(&self, _their_node_id: PublicKey) {}
1146
1147	fn provided_node_features(&self) -> NodeFeatures {
1148		NodeFeatures::empty()
1149	}
1150
1151	fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
1152		InitFeatures::empty()
1153	}
1154
1155	fn peer_connected(
1156		&self, _their_node_id: PublicKey, _msg: &Init, _inbound: bool,
1157	) -> Result<(), ()> {
1158		Ok(())
1159	}
1160}
1161
1162impl<
1163		ChannelSigner: EcdsaChannelSigner,
1164		C: Deref,
1165		T: Deref,
1166		F: Deref,
1167		L: Deref,
1168		P: Deref,
1169		ES: Deref,
1170	> SendOnlyMessageHandler for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
1171where
1172	C::Target: chain::Filter,
1173	T::Target: BroadcasterInterface,
1174	F::Target: FeeEstimator,
1175	L::Target: Logger,
1176	P::Target: Persist<ChannelSigner>,
1177	ES::Target: EntropySource,
1178{
1179}
1180
1181impl<
1182		ChannelSigner: EcdsaChannelSigner,
1183		C: Deref,
1184		T: Deref,
1185		F: Deref,
1186		L: Deref,
1187		P: Deref,
1188		ES: Deref,
1189	> chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
1190where
1191	C::Target: chain::Filter,
1192	T::Target: BroadcasterInterface,
1193	F::Target: FeeEstimator,
1194	L::Target: Logger,
1195	P::Target: Persist<ChannelSigner>,
1196	ES::Target: EntropySource,
1197{
1198	fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) {
1199		log_debug!(
1200			self.logger,
1201			"New best block {} at height {} provided via block_connected",
1202			header.block_hash(),
1203			height
1204		);
1205		self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
1206			monitor.block_connected(
1207				header,
1208				txdata,
1209				height,
1210				&*self.broadcaster,
1211				&*self.fee_estimator,
1212				&self.logger,
1213			)
1214		});
1215
1216		#[cfg(peer_storage)]
1217		// Send peer storage every time a new block arrives.
1218		for node_id in self.all_counterparty_node_ids() {
1219			self.send_peer_storage(node_id);
1220		}
1221
1222		// Assume we may have some new events and wake the event processor
1223		self.event_notifier.notify();
1224	}
1225
1226	fn blocks_disconnected(&self, fork_point: BestBlock) {
1227		let monitor_states = self.monitors.read().unwrap();
1228		log_debug!(
1229			self.logger,
1230			"Block(s) removed to height {} via blocks_disconnected. New best block is {}",
1231			fork_point.height,
1232			fork_point.block_hash,
1233		);
1234		for monitor_state in monitor_states.values() {
1235			monitor_state.monitor.blocks_disconnected(
1236				fork_point,
1237				&*self.broadcaster,
1238				&*self.fee_estimator,
1239				&self.logger,
1240			);
1241		}
1242	}
1243}
1244
1245impl<
1246		ChannelSigner: EcdsaChannelSigner,
1247		C: Deref,
1248		T: Deref,
1249		F: Deref,
1250		L: Deref,
1251		P: Deref,
1252		ES: Deref,
1253	> chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
1254where
1255	C::Target: chain::Filter,
1256	T::Target: BroadcasterInterface,
1257	F::Target: FeeEstimator,
1258	L::Target: Logger,
1259	P::Target: Persist<ChannelSigner>,
1260	ES::Target: EntropySource,
1261{
1262	fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) {
1263		log_debug!(
1264			self.logger,
1265			"{} provided transactions confirmed at height {} in block {}",
1266			txdata.len(),
1267			height,
1268			header.block_hash()
1269		);
1270		self.process_chain_data(header, None, txdata, |monitor, txdata| {
1271			monitor.transactions_confirmed(
1272				header,
1273				txdata,
1274				height,
1275				&*self.broadcaster,
1276				&*self.fee_estimator,
1277				&self.logger,
1278			)
1279		});
1280		// Assume we may have some new events and wake the event processor
1281		self.event_notifier.notify();
1282	}
1283
1284	fn transaction_unconfirmed(&self, txid: &Txid) {
1285		log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
1286		let monitor_states = self.monitors.read().unwrap();
1287		for monitor_state in monitor_states.values() {
1288			monitor_state.monitor.transaction_unconfirmed(
1289				txid,
1290				&*self.broadcaster,
1291				&*self.fee_estimator,
1292				&self.logger,
1293			);
1294		}
1295	}
1296
1297	fn best_block_updated(&self, header: &Header, height: u32) {
1298		log_debug!(
1299			self.logger,
1300			"New best block {} at height {} provided via best_block_updated",
1301			header.block_hash(),
1302			height
1303		);
1304		self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
1305			// While in practice there shouldn't be any recursive calls when given empty txdata,
1306			// it's still possible if a chain::Filter implementation returns a transaction.
1307			debug_assert!(txdata.is_empty());
1308			monitor.best_block_updated(
1309				header,
1310				height,
1311				&*self.broadcaster,
1312				&*self.fee_estimator,
1313				&self.logger,
1314			)
1315		});
1316
1317		#[cfg(peer_storage)]
1318		// Send peer storage every time a new block arrives.
1319		for node_id in self.all_counterparty_node_ids() {
1320			self.send_peer_storage(node_id);
1321		}
1322
1323		// Assume we may have some new events and wake the event processor
1324		self.event_notifier.notify();
1325	}
1326
1327	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
1328		let mut txids = Vec::new();
1329		let monitor_states = self.monitors.read().unwrap();
1330		for monitor_state in monitor_states.values() {
1331			txids.append(&mut monitor_state.monitor.get_relevant_txids());
1332		}
1333
1334		txids.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
1335		txids.dedup_by_key(|(txid, _, _)| *txid);
1336		txids
1337	}
1338}
1339
1340impl<
1341		ChannelSigner: EcdsaChannelSigner,
1342		C: Deref,
1343		T: Deref,
1344		F: Deref,
1345		L: Deref,
1346		P: Deref,
1347		ES: Deref,
1348	> chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
1349where
1350	C::Target: chain::Filter,
1351	T::Target: BroadcasterInterface,
1352	F::Target: FeeEstimator,
1353	L::Target: Logger,
1354	P::Target: Persist<ChannelSigner>,
1355	ES::Target: EntropySource,
1356{
1357	fn watch_channel(
1358		&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
1359	) -> Result<ChannelMonitorUpdateStatus, ()> {
1360		let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
1361		let mut monitors = self.monitors.write().unwrap();
1362		let entry = match monitors.entry(channel_id) {
1363			hash_map::Entry::Occupied(_) => {
1364				log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
1365				return Err(());
1366			},
1367			hash_map::Entry::Vacant(e) => e,
1368		};
1369		log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
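		// Record the monitor's current update_id so it can be tracked as pending below if
		// persistence completes asynchronously.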
1370		let update_id = monitor.get_latest_update_id();
1371		let mut pending_monitor_updates = Vec::new();
1372		let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
1373		match persist_res {
1374			ChannelMonitorUpdateStatus::InProgress => {
1375				log_info!(
1376					logger,
1377					"Persistence of new ChannelMonitor for channel {} in progress",
1378					log_funding_info!(monitor)
1379				);
1380				pending_monitor_updates.push(update_id);
1381			},
1382			ChannelMonitorUpdateStatus::Completed => {
1383				log_info!(
1384					logger,
1385					"Persistence of new ChannelMonitor for channel {} completed",
1386					log_funding_info!(monitor)
1387				);
1388			},
1389			ChannelMonitorUpdateStatus::UnrecoverableError => {
1390				let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
1391				log_error!(logger, "{}", err_str);
1392				panic!("{}", err_str);
1393			},
1394		}
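		// If we have a chain source, register the monitor's outputs with it so that spends of them
		// show up in the block data we are fed going forward.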
1395		if let Some(ref chain_source) = self.chain_source {
1396			monitor.load_outputs_to_watch(chain_source, &self.logger);
1397		}
1398		entry.insert(MonitorHolder {
1399			monitor,
1400			pending_monitor_updates: Mutex::new(pending_monitor_updates),
1401		});
1402		Ok(persist_res)
1403	}
1404
1405	fn update_channel(
1406		&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
1407	) -> ChannelMonitorUpdateStatus {
1408		// `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those
1409		// versions are V1-established. For 0.0.121+, the `channel_id` field is always `Some`.
1410		debug_assert_eq!(update.channel_id.unwrap(), channel_id);
1411		// Update the monitor that watches the channel referred to by the given outpoint.
1412		let monitors = self.monitors.read().unwrap();
1413		match monitors.get(&channel_id) {
1414			None => {
1415				let logger = WithContext::from(&self.logger, None, Some(channel_id), None);
1416				log_error!(logger, "Failed to update channel monitor: no such monitor registered");
1417
1418				// We should never trigger this from within ChannelManager. Technically a
1419				// user could use this object with some proxying in between, which makes this
1420				// possible, but in tests and fuzzing this should be a panic.
1421				#[cfg(debug_assertions)]
1422				panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
1423				#[cfg(not(debug_assertions))]
1424				ChannelMonitorUpdateStatus::InProgress
1425			},
1426			Some(monitor_state) => {
1427				let monitor = &monitor_state.monitor;
1428				let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
1429				log_trace!(
1430					logger,
1431					"Updating ChannelMonitor to id {} for channel {}",
1432					update.update_id,
1433					log_funding_info!(monitor)
1434				);
1435
1436				// We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we
1437				// have well-ordered updates from the users' point of view. See the
1438				// `pending_monitor_updates` docs for more.
1439				let mut pending_monitor_updates =
1440					monitor_state.pending_monitor_updates.lock().unwrap();
1441				let update_res = monitor.update_monitor(
1442					update,
1443					&self.broadcaster,
1444					&self.fee_estimator,
1445					&self.logger,
1446				);
1447
1448				let update_id = update.update_id;
1449				let persist_res = if update_res.is_err() {
1450					// Even if applying the update returns an error, the monitor's state will still
1451					// have changed, so we should persist the updated monitor despite the error.
1452					// We also don't want to persist a `monitor_update` which would later fail to apply
1453					// when the `channel_monitor` is read back from storage along with its updates.
1454					// Instead, we persist the entire `channel_monitor` here.
1455					log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
1456					self.persister.update_persisted_channel(
1457						monitor.persistence_key(),
1458						None,
1459						monitor,
1460					)
1461				} else {
1462					self.persister.update_persisted_channel(
1463						monitor.persistence_key(),
1464						Some(update),
1465						monitor,
1466					)
1467				};
1468				match persist_res {
1469					ChannelMonitorUpdateStatus::InProgress => {
1470						pending_monitor_updates.push(update_id);
1471						log_debug!(logger,
1472							"Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress",
1473							update_id,
1474							log_funding_info!(monitor)
1475						);
1476					},
1477					ChannelMonitorUpdateStatus::Completed => {
1478						log_debug!(
1479							logger,
1480							"Persistence of ChannelMonitorUpdate id {:?} for channel {} completed",
1481							update_id,
1482							log_funding_info!(monitor)
1483						);
1484					},
1485					ChannelMonitorUpdateStatus::UnrecoverableError => {
1486						// Take the monitors lock for writing so that we poison it and any future
1487						// operations fail immediately.
1488						core::mem::drop(pending_monitor_updates);
1489						core::mem::drop(monitors);
1490						let _poison = self.monitors.write().unwrap();
1491						let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
1492						log_error!(logger, "{}", err_str);
1493						panic!("{}", err_str);
1494					},
1495				}
1496
1497				// We may need to start monitoring for any alternative funding transactions.
1498				if let Some(ref chain_source) = self.chain_source {
1499					for (funding_outpoint, funding_script) in
1500						update.internal_renegotiated_funding_data()
1501					{
1502						log_trace!(
1503							logger,
1504							"Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends",
1505							funding_outpoint
1506						);
1507						chain_source.register_tx(&funding_outpoint.txid, &funding_script);
1508						chain_source.register_output(WatchedOutput {
1509							block_hash: None,
1510							outpoint: funding_outpoint,
1511							script_pubkey: funding_script,
1512						});
1513					}
1514				}
1515
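				// When the update failed to apply we return InProgress rather than the persistence
				// result, ensuring the channel is not resumed as if the update had been applied.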
1516				if update_res.is_err() {
1517					ChannelMonitorUpdateStatus::InProgress
1518				} else {
1519					persist_res
1520				}
1521			},
1522		}
1523	}
1524
1525	fn release_pending_monitor_events(
1526		&self,
1527	) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
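		// First complete any monitor updates whose asynchronous persistence has finished, so that
		// the resulting `MonitorEvent`s are included in the list we drain below.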
1528		for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
1529			let _ = self.channel_monitor_updated(channel_id, update_id);
1530		}
1531		let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
1532		for monitor_state in self.monitors.read().unwrap().values() {
1533			let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
1534			if !monitor_events.is_empty() {
1535				let monitor_funding_txo = monitor_state.monitor.get_funding_txo();
1536				let monitor_channel_id = monitor_state.monitor.channel_id();
1537				let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
1538				pending_monitor_events.push((
1539					monitor_funding_txo,
1540					monitor_channel_id,
1541					monitor_events,
1542					counterparty_node_id,
1543				));
1544			}
1545		}
1546		pending_monitor_events
1547	}
1548}
1549
1550impl<
1551		ChannelSigner: EcdsaChannelSigner,
1552		C: Deref,
1553		T: Deref,
1554		F: Deref,
1555		L: Deref,
1556		P: Deref,
1557		ES: Deref,
1558	> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
1559where
1560	C::Target: chain::Filter,
1561	T::Target: BroadcasterInterface,
1562	F::Target: FeeEstimator,
1563	L::Target: Logger,
1564	P::Target: Persist<ChannelSigner>,
1565	ES::Target: EntropySource,
1566{
1567	/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
1568	///
1569	/// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
1570	/// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain
1571	/// within each channel. As the confirmation of a commitment transaction may be critical to the
1572	/// safety of funds, we recommend invoking this every 30 seconds, or at a shorter interval if
1573	/// running in an environment with spotty connections, like on mobile.
1574	///
1575	/// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
1576	/// order to handle these events.
1577	///
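	/// # Example
	///
	/// A minimal sketch of polling this method on a timer, assuming a `chain_monitor` of this type
	/// and some `event_handler` implementing [`EventHandler`] exist in the surrounding code (an
	/// illustration, not a drop-in snippet):
	///
	/// ```ignore
	/// loop {
	///     // Let each ChannelMonitor surface its pending events to the handler. If the handler
	///     // requests a replay, the event will be redelivered on a later call.
	///     chain_monitor.process_pending_events(&event_handler);
	///     std::thread::sleep(std::time::Duration::from_secs(30));
	/// }
	/// ```
	///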
1578	/// [`SpendableOutputs`]: events::Event::SpendableOutputs
1579	/// [`BumpTransaction`]: events::Event::BumpTransaction
1580	fn process_pending_events<H: Deref>(&self, handler: H)
1581	where
1582		H::Target: EventHandler,
1583	{
1584		for monitor_state in self.monitors.read().unwrap().values() {
1585			match monitor_state.monitor.process_pending_events(&handler, &self.logger) {
1586				Ok(()) => {},
1587				Err(ReplayEvent()) => {
1588					self.event_notifier.notify();
1589				},
1590			}
1591		}
1592	}
1593}
1594
1595#[cfg(test)]
1596mod tests {
1597	use crate::chain::channelmonitor::ANTI_REORG_DELAY;
1598	use crate::chain::{ChannelMonitorUpdateStatus, Watch};
1599	use crate::events::{ClosureReason, Event};
1600	use crate::ln::functional_test_utils::*;
1601	use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, MessageSendEvent};
1602	use crate::{check_added_monitors, check_closed_event};
1603	use crate::{expect_payment_path_successful, get_event_msg};
1604	use crate::{get_htlc_update_msgs, get_revoke_commit_msgs};
1605
1606	const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5;
1607
1608	#[test]
1609	fn test_async_ooo_offchain_updates() {
1610		// Test that if we have multiple offchain updates being persisted and they complete
1611		// out-of-order, the ChainMonitor waits until all have completed before informing the
1612		// ChannelManager.
1613		let chanmon_cfgs = create_chanmon_cfgs(2);
1614		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
1615		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
1616		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
1617		let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
1618
1619		let node_a_id = nodes[0].node.get_our_node_id();
1620		let node_b_id = nodes[1].node.get_our_node_id();
1621
1622		// Route two payments to be claimed at the same time.
1623		let (payment_preimage_1, payment_hash_1, ..) =
1624			route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
1625		let (payment_preimage_2, payment_hash_2, ..) =
1626			route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
1627
1628		chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
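		// Have the persister report the upcoming monitor persistences as InProgress so that both
		// claim-triggered updates stay pending until we complete them manually below.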
1629		chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
1630		chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
1631
1632		nodes[1].node.claim_funds(payment_preimage_1);
1633		check_added_monitors!(nodes[1], 1);
1634		nodes[1].node.claim_funds(payment_preimage_2);
1635		check_added_monitors!(nodes[1], 1);
1636
1637		let persistences =
1638			chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
1639		assert_eq!(persistences.len(), 1);
1640		let (_, updates) = persistences.iter().next().unwrap();
1641		assert_eq!(updates.len(), 2);
1642
1643		// Note that `updates` is a HashMap, so the iteration order here is effectively random. The
1644		// test shouldn't fail either way, but intermittent failures would point to the update ordering.
1645		let mut update_iter = updates.iter();
1646		let next_update = update_iter.next().unwrap().clone();
1647		let node_b_mon = &nodes[1].chain_monitor.chain_monitor;
1648
1649		// The listed pending updates should contain next_update.
1650		let pending_updates = node_b_mon.list_pending_monitor_updates();
1651		#[cfg(not(c_bindings))]
1652		let pending_chan_updates = pending_updates.get(&channel_id).unwrap();
1653		#[cfg(c_bindings)]
1654		let pending_chan_updates =
1655			&pending_updates.iter().find(|(chan_id, _)| *chan_id == channel_id).unwrap().1;
1656		assert!(pending_chan_updates.contains(&next_update));
1657
1658		node_b_mon.channel_monitor_updated(channel_id, next_update.clone()).unwrap();
1659
1660		// The listed pending updates should no longer contain the now-completed next_update.
1661		let pending_updates = node_b_mon.list_pending_monitor_updates();
1662		#[cfg(not(c_bindings))]
1663		let pending_chan_updates = pending_updates.get(&channel_id).unwrap();
1664		#[cfg(c_bindings)]
1665		let pending_chan_updates =
1666			&pending_updates.iter().find(|(chan_id, _)| *chan_id == channel_id).unwrap().1;
1667		assert!(!pending_chan_updates.contains(&next_update));
1668
1669		assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
1670		assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
1671		assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
1672
1673		let next_update = update_iter.next().unwrap().clone();
1674		node_b_mon.channel_monitor_updated(channel_id, next_update).unwrap();
1675
1676		let claim_events = nodes[1].node.get_and_clear_pending_events();
1677		assert_eq!(claim_events.len(), 2);
1678		match claim_events[0] {
1679			Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
1680				assert_eq!(payment_hash_1, *payment_hash);
1681			},
1682			_ => panic!("Unexpected event"),
1683		}
1684		match claim_events[1] {
1685			Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
1686				assert_eq!(payment_hash_2, *payment_hash);
1687			},
1688			_ => panic!("Unexpected event"),
1689		}
1690
1691		// Now manually walk the commitment_signed dance. Because we claimed two payments
1692		// back-to-back, it doesn't fit into the neat walk that commitment_signed_dance does.
1693
1694		let mut updates = get_htlc_update_msgs!(nodes[1], node_a_id);
1695		nodes[0].node.handle_update_fulfill_htlc(node_b_id, updates.update_fulfill_htlcs.remove(0));
1696		expect_payment_sent(&nodes[0], payment_preimage_1, None, false, false);
1697		nodes[0].node.handle_commitment_signed_batch_test(node_b_id, &updates.commitment_signed);
1698		check_added_monitors!(nodes[0], 1);
1699		let (as_first_raa, as_first_update) = get_revoke_commit_msgs!(nodes[0], node_b_id);
1700
1701		nodes[1].node.handle_revoke_and_ack(node_a_id, &as_first_raa);
1702		check_added_monitors!(nodes[1], 1);
1703		let mut bs_2nd_updates = get_htlc_update_msgs!(nodes[1], node_a_id);
1704		nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_first_update);
1705		check_added_monitors!(nodes[1], 1);
1706		let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id);
1707
1708		nodes[0]
1709			.node
1710			.handle_update_fulfill_htlc(node_b_id, bs_2nd_updates.update_fulfill_htlcs.remove(0));
1711		expect_payment_sent(&nodes[0], payment_preimage_2, None, false, false);
1712		nodes[0]
1713			.node
1714			.handle_commitment_signed_batch_test(node_b_id, &bs_2nd_updates.commitment_signed);
1715		check_added_monitors!(nodes[0], 1);
1716		nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_first_raa);
1717		expect_payment_path_successful!(nodes[0]);
1718		check_added_monitors!(nodes[0], 1);
1719		let (as_second_raa, as_second_update) = get_revoke_commit_msgs!(nodes[0], node_b_id);
1720
1721		nodes[1].node.handle_revoke_and_ack(node_a_id, &as_second_raa);
1722		check_added_monitors!(nodes[1], 1);
1723		nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_second_update);
1724		check_added_monitors!(nodes[1], 1);
1725		let bs_second_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id);
1726
1727		nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_second_raa);
1728		expect_payment_path_successful!(nodes[0]);
1729		check_added_monitors!(nodes[0], 1);
1730	}
1731
1732	#[test]
1733	fn test_chainsync_triggers_distributed_monitor_persistence() {
1734		let chanmon_cfgs = create_chanmon_cfgs(3);
1735		let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
1736		let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
1737		let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
1738
1739		let node_a_id = nodes[0].node.get_our_node_id();
1740		let node_c_id = nodes[2].node.get_our_node_id();
1741
1742		// Use FullBlockViaListen to avoid the duplicate process_chain_data calls and the
1743		// block-skipping (skips_blocks()) behavior of other connect_styles.
1744		*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
1745		*nodes[1].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
1746		*nodes[2].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
1747
1748		let _channel_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2;
1749		let channel_2 =
1750			create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 1_000_000, 0).2;
1751
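		// Reset each node's chain-sync persistence counter so the assertions below only count the
		// writes triggered by the block connections that follow.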
1752		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
1753		chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
1754		chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
1755
1756		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
1757		connect_blocks(&nodes[1], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
1758		connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
1759
1760		// Connecting [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] * 2 blocks should trigger only 2 writes
1761		// per monitor/channel.
1762		assert_eq!(
1763			2 * 2,
1764			chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len()
1765		);
1766		assert_eq!(
1767			2,
1768			chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().len()
1769		);
1770		assert_eq!(
1771			2,
1772			chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len()
1773		);
1774
1775		// Test that monitors with pending claims are persisted on every block.
1776		// Now close channel_2 (between node 0 and node 2) to create a pending claim in node[0].
1777		let message = "Channel force-closed".to_owned();
1778		nodes[0]
1779			.node
1780			.force_close_broadcasting_latest_txn(&channel_2, &node_c_id, message.clone())
1781			.unwrap();
1782		let closure_reason =
1783			ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message };
1784		check_closed_event!(&nodes[0], 1, closure_reason, false, [node_c_id], 1000000);
1785		check_closed_broadcast(&nodes[0], 1, true);
1786		let close_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
1787		assert_eq!(close_tx.len(), 1);
1788
1789		mine_transaction(&nodes[2], &close_tx[0]);
1790		check_closed_broadcast(&nodes[2], 1, true);
1791		check_added_monitors(&nodes[2], 1);
1792		let closure_reason = ClosureReason::CommitmentTxConfirmed;
1793		check_closed_event!(&nodes[2], 1, closure_reason, false, [node_a_id], 1000000);
1794
1795		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
1796		chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
1797
1798		// For channel_2, there should be a monitor write for every block connection.
1799		// We connect [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] blocks since we don't know exactly when
1800		// the channel_1 monitor persistence will occur; over a full partition-factor cycle it will
1801		// be persisted exactly once.
1802		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
1803		connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR);
1804
1805		// CHAINSYNC_MONITOR_PARTITION_FACTOR writes for channel_2 due to its pending claim, plus 1
1806		// for channel_1.
1807		assert_eq!(
1808			(CHAINSYNC_MONITOR_PARTITION_FACTOR + 1) as usize,
1809			chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len()
1810		);
1811		// For node[2], there is no pending claim, so only a single write occurs.
1812		assert_eq!(
1813			1,
1814			chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len()
1815		);
1816
1817		// Confirm node[0]'s claim with ANTI_REORG_DELAY confirmations and reset the monitor write counter.
1818		mine_transaction(&nodes[0], &close_tx[0]);
1819		connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
1820		check_added_monitors(&nodes[0], 1);
1821		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
1822
1823		// Again connect one full cycle of CHAINSYNC_MONITOR_PARTITION_FACTOR blocks; it should only
1824		// result in 1 write per monitor/channel.
1825		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
1826		assert_eq!(
1827			2,
1828			chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len()
1829		);
1830	}
1831
1832	#[test]
1833	#[cfg(feature = "std")]
1834	fn update_during_chainsync_poisons_channel() {
1835		let chanmon_cfgs = create_chanmon_cfgs(2);
1836		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
1837		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
1838		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
1839		create_announced_chan_between_nodes(&nodes, 0, 1);
1840		*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
1841
1842		chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::UnrecoverableError);
1843
1844		assert!(std::panic::catch_unwind(|| {
1845			// Returning an UnrecoverableError should always panic immediately.
1846			// Connect [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] blocks so that we trigger some persistence
1847			// after accounting for block-height based partitioning/distribution.
1848			connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
1849		})
1850		.is_err());
1851		assert!(std::panic::catch_unwind(|| {
1852			// ...and also poison our locks causing later use to panic as well
1853			core::mem::drop(nodes);
1854		})
1855		.is_err());
1856	}
1857}