lightning/util/persist.rs

1// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
2// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
4// You may not use this file except in accordance with one or both of these
5// licenses.
6
7//! This module contains a simple key-value store trait [`KVStoreSync`] that
8//! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`],
9//! and [`ChannelMonitor`] all in one place.
10//!
11//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
12//! [`NetworkGraph`]: crate::routing::gossip::NetworkGraph
13
14use alloc::sync::Arc;
15
16use bitcoin::hashes::hex::FromHex;
17use bitcoin::{BlockHash, Txid};
18
19use core::future::Future;
20use core::mem;
21use core::ops::Deref;
22use core::pin::Pin;
23use core::str::FromStr;
24use core::task;
25
26use crate::prelude::*;
27use crate::{io, log_error};
28
29use crate::chain;
30use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
31use crate::chain::chainmonitor::Persist;
32use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
33use crate::chain::transaction::OutPoint;
34use crate::ln::types::ChannelId;
35use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
36use crate::sync::Mutex;
37use crate::util::async_poll::{dummy_waker, AsyncResult, MaybeSend, MaybeSync};
38use crate::util::logger::Logger;
39use crate::util::native_async::FutureSpawner;
40use crate::util::ser::{Readable, ReadableArgs, Writeable};
41use crate::util::wakers::Notifier;
42
43/// The alphabet of characters allowed for namespaces and keys.
44pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str =
45	"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
46
47/// The maximum number of characters namespaces and keys may have.
48pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120;
49
50/// The primary namespace under which the [`ChannelManager`] will be persisted.
51///
52/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
53pub const CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
54/// The secondary namespace under which the [`ChannelManager`] will be persisted.
55///
56/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
57pub const CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
58/// The key under which the [`ChannelManager`] will be persisted.
59///
60/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
61pub const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";
62
63/// The primary namespace under which [`ChannelMonitor`]s will be persisted.
64pub const CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitors";
65/// The secondary namespace under which [`ChannelMonitor`]s will be persisted.
66pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
67/// The primary namespace under which [`ChannelMonitorUpdate`]s will be persisted.
68pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";
69
70/// The primary namespace under which archived [`ChannelMonitor`]s will be persisted.
71pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
72/// The secondary namespace under which archived [`ChannelMonitor`]s will be persisted.
73pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
74
75/// The primary namespace under which the [`NetworkGraph`] will be persisted.
76///
77/// [`NetworkGraph`]: crate::routing::gossip::NetworkGraph
78pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
79/// The secondary namespace under which the [`NetworkGraph`] will be persisted.
80///
81/// [`NetworkGraph`]: crate::routing::gossip::NetworkGraph
82pub const NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
83/// The key under which the [`NetworkGraph`] will be persisted.
84///
85/// [`NetworkGraph`]: crate::routing::gossip::NetworkGraph
86pub const NETWORK_GRAPH_PERSISTENCE_KEY: &str = "network_graph";
87
88/// The primary namespace under which the [`WriteableScore`] will be persisted.
89///
90/// [`WriteableScore`]: crate::routing::scoring::WriteableScore
91pub const SCORER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
92/// The secondary namespace under which the [`WriteableScore`] will be persisted.
93///
94/// [`WriteableScore`]: crate::routing::scoring::WriteableScore
95pub const SCORER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
96/// The key under which the [`WriteableScore`] will be persisted.
97///
98/// [`WriteableScore`]: crate::routing::scoring::WriteableScore
99pub const SCORER_PERSISTENCE_KEY: &str = "scorer";
100
101/// The primary namespace under which [`OutputSweeper`] state will be persisted.
102///
103/// [`OutputSweeper`]: crate::util::sweep::OutputSweeper
104pub const OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
105/// The secondary namespace under which [`OutputSweeper`] state will be persisted.
106///
107/// [`OutputSweeper`]: crate::util::sweep::OutputSweeper
108pub const OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
110/// The key under which [`OutputSweeper`] state will be persisted.
111///
112/// [`OutputSweeper`]: crate::util::sweep::OutputSweeper
113pub const OUTPUT_SWEEPER_PERSISTENCE_KEY: &str = "output_sweeper";
114
115/// A sentinel value to be prepended to monitors persisted by the [`MonitorUpdatingPersister`].
116///
117/// This serves to prevent someone from accidentally loading such monitors (which may need
118/// updates applied to be current) with another implementation.
119pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2];
120
121/// Provides an interface that allows storage and retrieval of persisted values that are associated
122/// with given keys.
123///
124/// In order to avoid collisions the key space is segmented based on the given `primary_namespace`s
125/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different
126/// ways, as long as per-namespace key uniqueness is asserted.
127///
128/// Keys and namespaces are required to be valid ASCII strings in the range of
129/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty
/// primary namespaces and secondary namespaces (`""`) are assumed to be valid; however, if
131/// `primary_namespace` is empty, `secondary_namespace` is required to be empty, too. This means
132/// that concerns should always be separated by primary namespace first, before secondary
133/// namespaces are used. While the number of primary namespaces will be relatively small and is
134/// determined at compile time, there may be many secondary namespaces per primary namespace. Note
135/// that per-namespace uniqueness needs to also hold for keys *and* namespaces in any given
136/// namespace, i.e., conflicts between keys and equally named
137/// primary namespaces/secondary namespaces must be avoided.
138///
139/// **Note:** Users migrating custom persistence backends from the pre-v0.0.117 `KVStorePersister`
140/// interface can use a concatenation of `[{primary_namespace}/[{secondary_namespace}/]]{key}` to
141/// recover a `key` compatible with the data model previously assumed by `KVStorePersister::persist`.
142///
143/// For an asynchronous version of this trait, see [`KVStore`].
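///
/// As a rough illustration (a sketch, not part of LDK), a minimal in-memory implementation
/// backed by a `HashMap` might look like the following. The `MemoryStore` type is purely
/// hypothetical:
///
/// ```ignore
/// use std::collections::HashMap;
/// use std::sync::Mutex;
/// use lightning::io::{Error, ErrorKind};
/// use lightning::util::persist::KVStoreSync;
///
/// // Hypothetical store keyed by (primary_namespace, secondary_namespace, key).
/// struct MemoryStore(Mutex<HashMap<(String, String, String), Vec<u8>>>);
///
/// impl KVStoreSync for MemoryStore {
/// 	fn read(&self, pns: &str, sns: &str, key: &str) -> Result<Vec<u8>, Error> {
/// 		self.0.lock().unwrap()
/// 			.get(&(pns.to_owned(), sns.to_owned(), key.to_owned()))
/// 			.cloned()
/// 			// Returning `NotFound` for missing keys (and only then) matters, see the
/// 			// `MonitorUpdatingPersister` documentation.
/// 			.ok_or_else(|| Error::new(ErrorKind::NotFound, "key not found"))
/// 	}
/// 	fn write(&self, pns: &str, sns: &str, key: &str, buf: Vec<u8>) -> Result<(), Error> {
/// 		self.0.lock().unwrap()
/// 			.insert((pns.to_owned(), sns.to_owned(), key.to_owned()), buf);
/// 		Ok(())
/// 	}
/// 	fn remove(&self, pns: &str, sns: &str, key: &str, _lazy: bool) -> Result<(), Error> {
/// 		self.0.lock().unwrap()
/// 			.remove(&(pns.to_owned(), sns.to_owned(), key.to_owned()));
/// 		Ok(())
/// 	}
/// 	fn list(&self, pns: &str, sns: &str) -> Result<Vec<String>, Error> {
/// 		let map = self.0.lock().unwrap();
/// 		Ok(map
/// 			.keys()
/// 			.filter(|(p, s, _)| p == pns && s == sns)
/// 			.map(|(_, _, k)| k.clone())
/// 			.collect())
/// 	}
/// }
/// ```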
144// Note that updates to documentation on this trait should be copied to the asynchronous version.
145pub trait KVStoreSync {
146	/// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and
147	/// `key`.
148	///
149	/// Returns an [`ErrorKind::NotFound`] if the given `key` could not be found in the given
150	/// `primary_namespace` and `secondary_namespace`.
151	///
152	/// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
153	fn read(
154		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
155	) -> Result<Vec<u8>, io::Error>;
156	/// Persists the given data under the given `key`.
157	///
158	/// Will create the given `primary_namespace` and `secondary_namespace` if not already present in the store.
159	fn write(
160		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
161	) -> Result<(), io::Error>;
162	/// Removes any data that had previously been persisted under the given `key`.
163	///
164	/// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily
165	/// remove the given `key` at some point in time after the method returns, e.g., as part of an
166	/// eventual batch deletion of multiple keys. As a consequence, subsequent calls to
167	/// [`KVStoreSync::list`] might include the removed key until the changes are actually persisted.
168	///
169	/// Note that while setting the `lazy` flag reduces the I/O burden of multiple subsequent
170	/// `remove` calls, it also influences the atomicity guarantees as lazy `remove`s could
171	/// potentially get lost on crash after the method returns. Therefore, this flag should only be
172	/// set for `remove` operations that can be safely replayed at a later time.
173	///
174	/// All removal operations must complete in a consistent total order with [`Self::write`]s
175	/// to the same key. Whether a removal operation is `lazy` or not, [`Self::write`] operations
176	/// to the same key which occur before a removal completes must cancel/overwrite the pending
177	/// removal.
178	///
179	/// Returns successfully if no data will be stored for the given `primary_namespace`,
180	/// `secondary_namespace`, and `key`, independently of whether it was present before its
	/// invocation or not.
182	fn remove(
183		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
184	) -> Result<(), io::Error>;
185	/// Returns a list of keys that are stored under the given `secondary_namespace` in
186	/// `primary_namespace`.
187	///
188	/// Returns the keys in arbitrary order, so users requiring a particular order need to sort the
189	/// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown.
190	fn list(
191		&self, primary_namespace: &str, secondary_namespace: &str,
192	) -> Result<Vec<String>, io::Error>;
193}
194
195/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait. It is not necessary to use this type
196/// directly.
197#[derive(Clone)]
198pub struct KVStoreSyncWrapper<K: Deref>(pub K)
199where
200	K::Target: KVStoreSync;
201
202impl<K: Deref> Deref for KVStoreSyncWrapper<K>
203where
204	K::Target: KVStoreSync,
205{
206	type Target = Self;
207	fn deref(&self) -> &Self::Target {
208		self
209	}
210}
211
212/// This is not exported to bindings users as async is only supported in Rust.
213impl<K: Deref> KVStore for KVStoreSyncWrapper<K>
214where
215	K::Target: KVStoreSync,
216{
217	fn read(
218		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
219	) -> AsyncResult<'static, Vec<u8>, io::Error> {
220		let res = self.0.read(primary_namespace, secondary_namespace, key);
221
222		Box::pin(async move { res })
223	}
224
225	fn write(
226		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
227	) -> AsyncResult<'static, (), io::Error> {
228		let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
229
230		Box::pin(async move { res })
231	}
232
233	fn remove(
234		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
235	) -> AsyncResult<'static, (), io::Error> {
236		let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);
237
238		Box::pin(async move { res })
239	}
240
241	fn list(
242		&self, primary_namespace: &str, secondary_namespace: &str,
243	) -> AsyncResult<'static, Vec<String>, io::Error> {
244		let res = self.0.list(primary_namespace, secondary_namespace);
245
246		Box::pin(async move { res })
247	}
248}
249
250/// Provides an interface that allows storage and retrieval of persisted values that are associated
251/// with given keys.
252///
253/// In order to avoid collisions the key space is segmented based on the given `primary_namespace`s
254/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different
255/// ways, as long as per-namespace key uniqueness is asserted.
256///
257/// Keys and namespaces are required to be valid ASCII strings in the range of
258/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty
/// primary namespaces and secondary namespaces (`""`) are assumed to be valid; however, if
260/// `primary_namespace` is empty, `secondary_namespace` is required to be empty, too. This means
261/// that concerns should always be separated by primary namespace first, before secondary
262/// namespaces are used. While the number of primary namespaces will be relatively small and is
263/// determined at compile time, there may be many secondary namespaces per primary namespace. Note
264/// that per-namespace uniqueness needs to also hold for keys *and* namespaces in any given
265/// namespace, i.e., conflicts between keys and equally named
266/// primary namespaces/secondary namespaces must be avoided.
267///
268/// **Note:** Users migrating custom persistence backends from the pre-v0.0.117 `KVStorePersister`
269/// interface can use a concatenation of `[{primary_namespace}/[{secondary_namespace}/]]{key}` to
270/// recover a `key` compatible with the data model previously assumed by `KVStorePersister::persist`.
271///
272/// For a synchronous version of this trait, see [`KVStoreSync`].
273///
274/// This is not exported to bindings users as async is only supported in Rust.
275// Note that updates to documentation on this trait should be copied to the synchronous version.
276pub trait KVStore {
277	/// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and
278	/// `key`.
279	///
280	/// Returns an [`ErrorKind::NotFound`] if the given `key` could not be found in the given
281	/// `primary_namespace` and `secondary_namespace`.
282	///
283	/// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
284	fn read(
285		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
286	) -> AsyncResult<'static, Vec<u8>, io::Error>;
287	/// Persists the given data under the given `key`.
288	///
289	/// The order of multiple writes to the same key needs to be retained while persisting
290	/// asynchronously. In other words, if two writes to the same key occur, the state (as seen by
291	/// [`Self::read`]) must either see the first write then the second, or only ever the second,
292	/// no matter when the futures complete (and must always contain the second write once the
293	/// second future completes). The state should never contain the first write after the second
294	/// write's future completes, nor should it contain the second write, then contain the first
295	/// write at any point thereafter (even if the second write's future hasn't yet completed).
296	///
297	/// One way to ensure this requirement is met is by assigning a version number to each write
298	/// before returning the future, and then during asynchronous execution, ensuring that the
299	/// writes are executed in the correct order.
300	///
301	/// Note that no ordering requirements exist for writes to different keys.
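	///
	/// As a rough sketch of such a versioning scheme (the types and fields here are purely
	/// illustrative, and the namespace arguments are omitted for brevity):
	///
	/// ```ignore
	/// struct VersionedStore {
	/// 	// key -> (version of the last write applied, data)
	/// 	state: Arc<Mutex<HashMap<String, (u64, Vec<u8>)>>>,
	/// 	next_version: AtomicU64,
	/// }
	///
	/// impl VersionedStore {
	/// 	fn write(&self, key: &str, buf: Vec<u8>) -> impl Future<Output = Result<(), io::Error>> {
	/// 		// Assign the version synchronously, i.e. before returning the future, so the order
	/// 		// in which `write` was called is captured.
	/// 		let version = self.next_version.fetch_add(1, Ordering::AcqRel);
	/// 		let state = Arc::clone(&self.state);
	/// 		let key = key.to_owned();
	/// 		async move {
	/// 			// ... perform the actual backend I/O here ...
	/// 			let mut state = state.lock().unwrap();
	/// 			let entry = state.entry(key).or_insert((0, Vec::new()));
	/// 			// Only apply this write if no newer write to the same key completed first.
	/// 			if version >= entry.0 {
	/// 				*entry = (version, buf);
	/// 			}
	/// 			Ok(())
	/// 		}
	/// 	}
	/// }
	/// ```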
302	///
303	/// Will create the given `primary_namespace` and `secondary_namespace` if not already present in the store.
304	fn write(
305		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
306	) -> AsyncResult<'static, (), io::Error>;
307	/// Removes any data that had previously been persisted under the given `key`.
308	///
309	/// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily
310	/// remove the given `key` at some point in time after the method returns, e.g., as part of an
311	/// eventual batch deletion of multiple keys. As a consequence, subsequent calls to
312	/// [`KVStoreSync::list`] might include the removed key until the changes are actually persisted.
313	///
314	/// Note that while setting the `lazy` flag reduces the I/O burden of multiple subsequent
315	/// `remove` calls, it also influences the atomicity guarantees as lazy `remove`s could
316	/// potentially get lost on crash after the method returns. Therefore, this flag should only be
317	/// set for `remove` operations that can be safely replayed at a later time.
318	///
319	/// All removal operations must complete in a consistent total order with [`Self::write`]s
320	/// to the same key. Whether a removal operation is `lazy` or not, [`Self::write`] operations
321	/// to the same key which occur before a removal completes must cancel/overwrite the pending
322	/// removal.
323	///
324	/// Returns successfully if no data will be stored for the given `primary_namespace`,
325	/// `secondary_namespace`, and `key`, independently of whether it was present before its
	/// invocation or not.
327	fn remove(
328		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
329	) -> AsyncResult<'static, (), io::Error>;
330	/// Returns a list of keys that are stored under the given `secondary_namespace` in
331	/// `primary_namespace`.
332	///
333	/// Returns the keys in arbitrary order, so users requiring a particular order need to sort the
334	/// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown.
335	fn list(
336		&self, primary_namespace: &str, secondary_namespace: &str,
337	) -> AsyncResult<'static, Vec<String>, io::Error>;
338}
339
340/// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`]
341/// data migration.
342pub trait MigratableKVStore: KVStoreSync {
343	/// Returns *all* known keys as a list of `primary_namespace`, `secondary_namespace`, `key` tuples.
344	///
	/// This is useful for migrating data from one [`KVStoreSync`] implementation to another.
347	///
348	/// Must exhaustively return all entries known to the store to ensure no data is missed, but
349	/// may return the items in arbitrary order.
350	fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, io::Error>;
351}
352
353/// Migrates all data from one store to another.
354///
/// This operation assumes that `target_store` is empty, i.e., any data present under copied keys
/// might get overwritten. The user must ensure `source_store` is not modified during the operation,
/// otherwise no consistency guarantees can be given.
358///
359/// Will abort and return an error if any IO operation fails. Note that in this case the
360/// `target_store` might get left in an intermediate state.
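///
/// A minimal usage sketch (the `old_store` and `new_store` values are assumed to implement
/// [`MigratableKVStore`]):
///
/// ```ignore
/// // Copies every (primary_namespace, secondary_namespace, key) entry across.
/// migrate_kv_store_data(&mut old_store, &mut new_store)?;
/// ```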
361pub fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
362	source_store: &mut S, target_store: &mut T,
363) -> Result<(), io::Error> {
364	let keys_to_migrate = source_store.list_all_keys()?;
365
366	for (primary_namespace, secondary_namespace, key) in &keys_to_migrate {
367		let data = source_store.read(primary_namespace, secondary_namespace, key)?;
368		target_store.write(primary_namespace, secondary_namespace, key, data)?;
369	}
370
371	Ok(())
372}
373
374impl<ChannelSigner: EcdsaChannelSigner, K: KVStoreSync + ?Sized> Persist<ChannelSigner> for K {
375	// TODO: We really need a way for the persister to inform the user that its time to crash/shut
376	// down once these start returning failure.
377	// Then we should return InProgress rather than UnrecoverableError, implying we should probably
378	// just shut down the node since we're not retrying persistence!
379
380	fn persist_new_channel(
381		&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
382	) -> chain::ChannelMonitorUpdateStatus {
383		match self.write(
384			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
385			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
386			&monitor_name.to_string(),
387			monitor.encode(),
388		) {
389			Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
390			Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
391		}
392	}
393
394	fn update_persisted_channel(
395		&self, monitor_name: MonitorName, _update: Option<&ChannelMonitorUpdate>,
396		monitor: &ChannelMonitor<ChannelSigner>,
397	) -> chain::ChannelMonitorUpdateStatus {
398		match self.write(
399			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
400			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
401			&monitor_name.to_string(),
402			monitor.encode(),
403		) {
404			Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
405			Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
406		}
407	}
408
409	fn archive_persisted_channel(&self, monitor_name: MonitorName) {
410		let monitor_key = monitor_name.to_string();
411		let monitor = match self.read(
412			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
413			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
414			monitor_key.as_str(),
415		) {
416			Ok(monitor) => monitor,
417			Err(_) => return,
418		};
419		match self.write(
420			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
421			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
422			monitor_key.as_str(),
423			monitor,
424		) {
425			Ok(()) => {},
426			Err(_e) => return,
427		};
428		let _ = self.remove(
429			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
430			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
431			monitor_key.as_str(),
432			true,
433		);
434	}
435}
436
437/// Read previously persisted [`ChannelMonitor`]s from the store.
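///
/// A usage sketch for node startup; `kv_store` is assumed to implement [`KVStoreSync`] and
/// `keys_manager` to implement both [`EntropySource`] and [`SignerProvider`]:
///
/// ```ignore
/// let monitors = read_channel_monitors(&kv_store, &keys_manager, &keys_manager)?;
/// for (best_block_hash, monitor) in monitors {
/// 	// Hand each monitor back to the ChainMonitor when (re)starting the node.
/// }
/// ```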
438pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
439	kv_store: K, entropy_source: ES, signer_provider: SP,
440) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
441where
442	K::Target: KVStoreSync,
443	ES::Target: EntropySource + Sized,
444	SP::Target: SignerProvider + Sized,
445{
446	let mut res = Vec::new();
447
448	for stored_key in kv_store.list(
449		CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
450		CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
451	)? {
452		match <Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>>::read(
453			&mut io::Cursor::new(kv_store.read(
454				CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
455				CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
456				&stored_key,
457			)?),
458			(&*entropy_source, &*signer_provider),
459		) {
460			Ok(Some((block_hash, channel_monitor))) => {
461				let monitor_name = MonitorName::from_str(&stored_key)?;
462				if channel_monitor.persistence_key() != monitor_name {
463					return Err(io::Error::new(
464						io::ErrorKind::InvalidData,
465						"ChannelMonitor was stored under the wrong key",
466					));
467				}
468
469				res.push((block_hash, channel_monitor));
470			},
471			Ok(None) => {},
472			Err(_) => {
473				return Err(io::Error::new(
474					io::ErrorKind::InvalidData,
475					"Failed to read ChannelMonitor",
476				))
477			},
478		}
479	}
480	Ok(res)
481}
482
483struct PanicingSpawner;
484impl FutureSpawner for PanicingSpawner {
485	fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, _: T) {
486		unreachable!();
487	}
488}
489
490fn poll_sync_future<F: Future>(future: F) -> F::Output {
491	let mut waker = dummy_waker();
492	let mut ctx = task::Context::from_waker(&mut waker);
493	// TODO A future MSRV bump to 1.68 should allow for the pin macro
494	match Pin::new(&mut Box::pin(future)).poll(&mut ctx) {
495		task::Poll::Ready(result) => result,
496		task::Poll::Pending => {
497			// In a sync context, we can't wait for the future to complete.
498			unreachable!("Sync KVStore-derived futures can not be pending in a sync context");
499		},
500	}
501}
502
503/// Implements [`Persist`] in a way that writes and reads both [`ChannelMonitor`]s and
504/// [`ChannelMonitorUpdate`]s.
505///
506/// # Overview
507///
/// The main benefit this provides over the blanket [`Persist`] implementation for all [`KVStoreSync`]s is decreased
509/// I/O bandwidth and storage churn, at the expense of more IOPS (including listing, reading, and
510/// deleting) and complexity. This is because it writes channel monitor differential updates,
511/// whereas the other (default) implementation rewrites the entire monitor on each update. For
512/// routing nodes, updates can happen many times per second to a channel, and monitors can be tens
513/// of megabytes (or more). Updates can be as small as a few hundred bytes.
514///
515/// Note that monitors written with `MonitorUpdatingPersister` are _not_ backward-compatible with
/// the blanket [`Persist`] implementation for all [`KVStoreSync`]s. They have a prepended byte sequence,
517/// [`MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL`], applied to prevent deserialization with other
518/// persisters. This is because monitors written by this struct _may_ have unapplied updates. In
519/// order to downgrade, you must ensure that all updates are applied to the monitor, and remove the
520/// sentinel bytes.
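///
/// As a rough sketch of that downgrade step (assuming a `kv_store` implementing
/// [`KVStoreSync`] and a fully consolidated monitor stored under `monitor_key`):
///
/// ```ignore
/// let mut bytes = kv_store.read(
/// 	CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
/// 	CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
/// 	&monitor_key,
/// )?;
/// if bytes.starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) {
/// 	// Drop the sentinel so the default `Persist` implementation can read the monitor again.
/// 	bytes.drain(..MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len());
/// 	kv_store.write(
/// 		CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
/// 		CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
/// 		&monitor_key,
/// 		bytes,
/// 	)?;
/// }
/// ```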
521///
522/// # Storing monitors
523///
524/// Monitors are stored by implementing the [`Persist`] trait, which has two functions:
525///
526///   - [`Persist::persist_new_channel`], which persists whole [`ChannelMonitor`]s.
527///   - [`Persist::update_persisted_channel`], which persists only a [`ChannelMonitorUpdate`]
528///
529/// Whole [`ChannelMonitor`]s are stored in the [`CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE`],
530/// using the familiar encoding of an [`OutPoint`] (e.g., `[SOME-64-CHAR-HEX-STRING]_1`) for v1
531/// channels or a [`ChannelId`] (e.g., `[SOME-64-CHAR-HEX-STRING]`) for v2 channels.
532///
533/// Each [`ChannelMonitorUpdate`] is stored in a dynamic secondary namespace, as follows:
534///
535///   - primary namespace: [`CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE`]
536///   - secondary namespace: [the monitor's encoded outpoint or channel id name]
537///
538/// Under that secondary namespace, each update is stored with a number string, like `21`, which
539/// represents its `update_id` value.
540///
541/// For example, consider this channel, named for its transaction ID and index, or [`OutPoint`]:
542///
543///   - Transaction ID: `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef`
544///   - Index: `1`
545///
546/// Full channel monitors would be stored at a single key:
547///
548/// `[CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1`
549///
550/// Updates would be stored as follows (with `/` delimiting primary_namespace/secondary_namespace/key):
551///
552/// ```text
553/// [CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1/1
554/// [CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1/2
555/// [CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1/3
556/// ```
557/// ... and so on.
558///
559/// # Reading channel state from storage
560///
561/// Channel state can be reconstructed by calling
562/// [`MonitorUpdatingPersister::read_all_channel_monitors_with_updates`]. Alternatively, users can
563/// list channel monitors themselves and load channels individually using
564/// [`MonitorUpdatingPersister::read_channel_monitor_with_updates`].
565///
566/// ## EXTREMELY IMPORTANT
567///
568/// It is extremely important that your [`KVStoreSync::read`] implementation uses the
569/// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_ in
570/// that circumstance (not when there is really a permissions error, for example). This is because
/// neither channel monitor reading function lists updates. Instead, each reads the monitor and,
/// using its stored `update_id`, synthesizes update storage keys, trying them in sequence until
/// one is not found. All _other_ errors will be bubbled up in the function's [`Result`].
574///
575/// # Pruning stale channel updates
576///
577/// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
578/// Monitor updates in the range between the latest `update_id` and `update_id - maximum_pending_updates`
579/// are deleted.
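/// For example, with `maximum_pending_updates` set to `100`, the full monitor write at
/// `update_id` `300` lazily removes the stored updates with ids `200` through `300`.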
580/// The `lazy` flag is used on the [`KVStoreSync::remove`] method, so there are no guarantees that the deletions
/// will complete. However, stale updates are not a problem for data integrity, since only updates
/// with an `update_id` higher than the stored [`ChannelMonitor`]'s are ever read.
583///
584/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
585/// would like to get rid of them, consider using the
586/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
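///
/// # Example
///
/// A construction and startup-read sketch; `kv_store`, `logger`, `keys_manager`, `broadcaster`,
/// and `fee_estimator` are assumed to implement the corresponding LDK traits:
///
/// ```ignore
/// let persister = MonitorUpdatingPersister::new(
/// 	&kv_store,
/// 	&logger,
/// 	100, // maximum_pending_updates
/// 	&keys_manager, // EntropySource
/// 	&keys_manager, // SignerProvider
/// 	&broadcaster,
/// 	&fee_estimator,
/// );
/// let monitors = persister.read_all_channel_monitors_with_updates()?;
/// ```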
587pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>(
588	MonitorUpdatingPersisterAsync<KVStoreSyncWrapper<K>, PanicingSpawner, L, ES, SP, BI, FE>,
589)
590where
591	K::Target: KVStoreSync,
592	L::Target: Logger,
593	ES::Target: EntropySource + Sized,
594	SP::Target: SignerProvider + Sized,
595	BI::Target: BroadcasterInterface,
596	FE::Target: FeeEstimator;
597
598impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
599	MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
600where
601	K::Target: KVStoreSync,
602	L::Target: Logger,
603	ES::Target: EntropySource + Sized,
604	SP::Target: SignerProvider + Sized,
605	BI::Target: BroadcasterInterface,
606	FE::Target: FeeEstimator,
607{
608	/// Constructs a new [`MonitorUpdatingPersister`].
609	///
610	/// The `maximum_pending_updates` parameter controls how many updates may be stored before a
611	/// [`MonitorUpdatingPersister`] consolidates updates by writing a full monitor. Note that
612	/// consolidation will frequently occur with fewer updates than what you set here; this number
613	/// is merely the maximum that may be stored. When setting this value, consider that for higher
614	/// values of `maximum_pending_updates`:
615	///
616	///   - [`MonitorUpdatingPersister`] will tend to write more [`ChannelMonitorUpdate`]s than
617	/// [`ChannelMonitor`]s, approaching one [`ChannelMonitor`] write for every
618	/// `maximum_pending_updates` [`ChannelMonitorUpdate`]s.
619	///   - [`MonitorUpdatingPersister`] will issue deletes differently. Lazy deletes will come in
620	/// "waves" for each [`ChannelMonitor`] write. A larger `maximum_pending_updates` means bigger,
621	/// less frequent "waves."
622	///   - [`MonitorUpdatingPersister`] will potentially have more listing to do if you need to run
623	/// [`MonitorUpdatingPersister::cleanup_stale_updates`].
624	///
625	/// Note that you can disable the update-writing entirely by setting `maximum_pending_updates`
626	/// to zero, causing this [`Persist`] implementation to behave like the blanket [`Persist`]
627	/// implementation for all [`KVStoreSync`]s.
628	pub fn new(
629		kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
630		signer_provider: SP, broadcaster: BI, fee_estimator: FE,
631	) -> Self {
632		// Note that calling the spawner only happens in the `pub(crate)` `spawn_*` methods defined
		// with additional bounds on `MonitorUpdatingPersisterAsync`. Thus it's safe to provide a
634		// dummy always-panic implementation here.
635		MonitorUpdatingPersister(MonitorUpdatingPersisterAsync::new(
636			KVStoreSyncWrapper(kv_store),
637			PanicingSpawner,
638			logger,
639			maximum_pending_updates,
640			entropy_source,
641			signer_provider,
642			broadcaster,
643			fee_estimator,
644		))
645	}
646
647	/// Reads all stored channel monitors, along with any stored updates for them.
648	///
649	/// It is extremely important that your [`KVStoreSync::read`] implementation uses the
650	/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
651	/// documentation for [`MonitorUpdatingPersister`].
652	pub fn read_all_channel_monitors_with_updates(
653		&self,
654	) -> Result<
655		Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
656		io::Error,
657	> {
658		poll_sync_future(self.0.read_all_channel_monitors_with_updates())
659	}
660
661	/// Read a single channel monitor, along with any stored updates for it.
662	///
663	/// It is extremely important that your [`KVStoreSync::read`] implementation uses the
664	/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
665	/// documentation for [`MonitorUpdatingPersister`].
666	///
667	/// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
668	/// underscore `_` between txid and index for v1 channels. For example, given:
669	///
670	///   - Transaction ID: `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef`
671	///   - Index: `1`
672	///
673	/// The correct `monitor_key` would be:
674	/// `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1`
675	///
676	/// For v2 channels, the hex-encoded [`ChannelId`] is used directly for `monitor_key` instead.
677	///
678	/// Loading a large number of monitors will be faster if done in parallel. You can use this
679	/// function to accomplish this. Take care to limit the number of parallel readers.
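	///
	/// For instance, one way to obtain the key is via [`ChannelMonitor::persistence_key`]
	/// (a sketch, assuming an existing `monitor` and `persister`):
	///
	/// ```ignore
	/// let monitor_key = monitor.persistence_key().to_string();
	/// let (block_hash, monitor) = persister.read_channel_monitor_with_updates(&monitor_key)?;
	/// ```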
680	pub fn read_channel_monitor_with_updates(
681		&self, monitor_key: &str,
682	) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
683	{
684		poll_sync_future(self.0.read_channel_monitor_with_updates(monitor_key))
685	}
686
687	/// Cleans up stale updates for all monitors.
688	///
689	/// This function works by first listing all monitors, and then for each of them, listing all
	/// updates. Updates that have an `update_id` less than or equal to that of the stored monitor
691	/// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will
692	/// be passed to [`KVStoreSync::remove`].
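	///
	/// For example, to lazily clean up after an unclean shutdown left stale updates behind
	/// (a sketch, assuming an existing `persister`):
	///
	/// ```ignore
	/// persister.cleanup_stale_updates(true)?;
	/// ```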
693	pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
694		poll_sync_future(self.0.cleanup_stale_updates(lazy))
695	}
696}
697
698impl<
699		ChannelSigner: EcdsaChannelSigner,
700		K: Deref,
701		L: Deref,
702		ES: Deref,
703		SP: Deref,
704		BI: Deref,
705		FE: Deref,
706	> Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
707where
708	K::Target: KVStoreSync,
709	L::Target: Logger,
710	ES::Target: EntropySource + Sized,
711	SP::Target: SignerProvider + Sized,
712	BI::Target: BroadcasterInterface,
713	FE::Target: FeeEstimator,
714{
715	/// Persists a new channel. This means writing the entire monitor to the
716	/// parametrized [`KVStoreSync`].
717	fn persist_new_channel(
718		&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
719	) -> chain::ChannelMonitorUpdateStatus {
720		let res = poll_sync_future(self.0 .0.persist_new_channel(monitor_name, monitor));
721		match res {
722			Ok(_) => chain::ChannelMonitorUpdateStatus::Completed,
723			Err(e) => {
724				log_error!(
725					self.0 .0.logger,
726					"Failed to write ChannelMonitor {}/{}/{} reason: {}",
727					CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
728					CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
729					monitor_name,
730					e
731				);
732				chain::ChannelMonitorUpdateStatus::UnrecoverableError
733			},
734		}
735	}
736
737	/// Persists a channel update, writing only the update to the parameterized [`KVStoreSync`] if possible.
738	///
739	/// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]:
740	///
741	///   - No full monitor is found in [`KVStoreSync`]
742	///   - The number of pending updates exceeds `maximum_pending_updates` as given to [`Self::new`]
743	///   - LDK commands re-persisting the entire monitor through this function, specifically when
744	///	    `update` is `None`.
745	///   - The update is at [`u64::MAX`], indicating an update generated by pre-0.1 LDK.
746	fn update_persisted_channel(
747		&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
748		monitor: &ChannelMonitor<ChannelSigner>,
749	) -> chain::ChannelMonitorUpdateStatus {
750		let inner = Arc::clone(&self.0 .0);
751		let res = poll_sync_future(inner.update_persisted_channel(monitor_name, update, monitor));
752		match res {
753			Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
754			Err(e) => {
755				log_error!(
756					self.0 .0.logger,
757					"Failed to write ChannelMonitorUpdate {} id {} reason: {}",
758					monitor_name,
759					update.as_ref().map(|upd| upd.update_id).unwrap_or(0),
760					e
761				);
762				chain::ChannelMonitorUpdateStatus::UnrecoverableError
763			},
764		}
765	}
766
767	fn archive_persisted_channel(&self, monitor_name: MonitorName) {
768		poll_sync_future(self.0 .0.archive_persisted_channel(monitor_name));
769	}
770}
771
772/// A variant of the [`MonitorUpdatingPersister`] which utilizes the async [`KVStore`] and offers
773/// async versions of the public accessors.
774///
775/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
776///
777/// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used
778/// directly by the [`ChainMonitor`] via [`ChainMonitor::new_async_beta`].
779///
780/// This is not exported to bindings users as async is only supported in Rust.
781///
782/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
783/// [`ChainMonitor::new_async_beta`]: crate::chain::chainmonitor::ChainMonitor::new_async_beta
784pub struct MonitorUpdatingPersisterAsync<
785	K: Deref,
786	S: FutureSpawner,
787	L: Deref,
788	ES: Deref,
789	SP: Deref,
790	BI: Deref,
791	FE: Deref,
792>(Arc<MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>>)
793where
794	K::Target: KVStore,
795	L::Target: Logger,
796	ES::Target: EntropySource + Sized,
797	SP::Target: SignerProvider + Sized,
798	BI::Target: BroadcasterInterface,
799	FE::Target: FeeEstimator;
800
801struct MonitorUpdatingPersisterAsyncInner<
802	K: Deref,
803	S: FutureSpawner,
804	L: Deref,
805	ES: Deref,
806	SP: Deref,
807	BI: Deref,
808	FE: Deref,
809> where
810	K::Target: KVStore,
811	L::Target: Logger,
812	ES::Target: EntropySource + Sized,
813	SP::Target: SignerProvider + Sized,
814	BI::Target: BroadcasterInterface,
815	FE::Target: FeeEstimator,
816{
817	kv_store: K,
818	async_completed_updates: Mutex<Vec<(ChannelId, u64)>>,
819	future_spawner: S,
820	logger: L,
821	maximum_pending_updates: u64,
822	entropy_source: ES,
823	signer_provider: SP,
824	broadcaster: BI,
825	fee_estimator: FE,
826}
827
828impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
829	MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
830where
831	K::Target: KVStore,
832	L::Target: Logger,
833	ES::Target: EntropySource + Sized,
834	SP::Target: SignerProvider + Sized,
835	BI::Target: BroadcasterInterface,
836	FE::Target: FeeEstimator,
837{
838	/// Constructs a new [`MonitorUpdatingPersisterAsync`].
839	///
840	/// See [`MonitorUpdatingPersister::new`] for more info.
841	pub fn new(
842		kv_store: K, future_spawner: S, logger: L, maximum_pending_updates: u64,
843		entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE,
844	) -> Self {
845		MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner {
846			kv_store,
847			async_completed_updates: Mutex::new(Vec::new()),
848			future_spawner,
849			logger,
850			maximum_pending_updates,
851			entropy_source,
852			signer_provider,
853			broadcaster,
854			fee_estimator,
855		}))
856	}
857
858	/// Reads all stored channel monitors, along with any stored updates for them.
859	///
860	/// It is extremely important that your [`KVStore::read`] implementation uses the
861	/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
862	/// documentation for [`MonitorUpdatingPersister`].
863	pub async fn read_all_channel_monitors_with_updates(
864		&self,
865	) -> Result<
866		Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
867		io::Error,
868	> {
869		let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
870		let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
871		let monitor_list = self.0.kv_store.list(primary, secondary).await?;
872		let mut res = Vec::with_capacity(monitor_list.len());
873		for monitor_key in monitor_list {
874			let result =
875				self.0.maybe_read_channel_monitor_with_updates(monitor_key.as_str()).await?;
876			if let Some(read_res) = result {
877				res.push(read_res);
878			}
879		}
880		Ok(res)
881	}
882
883	/// Read a single channel monitor, along with any stored updates for it.
884	///
	/// It is extremely important that your [`KVStore::read`] implementation uses the
886	/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
887	/// documentation for [`MonitorUpdatingPersister`].
888	///
889	/// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
890	/// underscore `_` between txid and index for v1 channels. For example, given:
891	///
892	///   - Transaction ID: `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef`
893	///   - Index: `1`
894	///
895	/// The correct `monitor_key` would be:
896	/// `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1`
897	///
898	/// For v2 channels, the hex-encoded [`ChannelId`] is used directly for `monitor_key` instead.
899	///
900	/// Loading a large number of monitors will be faster if done in parallel. You can use this
901	/// function to accomplish this. Take care to limit the number of parallel readers.
902	pub async fn read_channel_monitor_with_updates(
903		&self, monitor_key: &str,
904	) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
905	{
906		self.0.read_channel_monitor_with_updates(monitor_key).await
907	}
908
909	/// Cleans up stale updates for all monitors.
910	///
911	/// This function works by first listing all monitors, and then for each of them, listing all
	/// updates. Updates that have an `update_id` less than or equal to that of the stored monitor
	/// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will
	/// be passed to [`KVStore::remove`].
915	pub async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
916		self.0.cleanup_stale_updates(lazy).await
917	}
918}
919
920impl<
921		K: Deref + MaybeSend + MaybeSync + 'static,
922		S: FutureSpawner,
923		L: Deref + MaybeSend + MaybeSync + 'static,
924		ES: Deref + MaybeSend + MaybeSync + 'static,
925		SP: Deref + MaybeSend + MaybeSync + 'static,
926		BI: Deref + MaybeSend + MaybeSync + 'static,
927		FE: Deref + MaybeSend + MaybeSync + 'static,
928	> MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
929where
930	K::Target: KVStore + MaybeSync,
931	L::Target: Logger,
932	ES::Target: EntropySource + Sized,
933	SP::Target: SignerProvider + Sized,
934	BI::Target: BroadcasterInterface,
935	FE::Target: FeeEstimator,
936	<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
937{
938	pub(crate) fn spawn_async_persist_new_channel(
939		&self, monitor_name: MonitorName,
940		monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
941		notifier: Arc<Notifier>,
942	) {
943		let inner = Arc::clone(&self.0);
944		// Note that `persist_new_channel` is a sync method which calls all the way through to the
945		// sync KVStore::write method (which returns a future) to ensure writes are well-ordered.
946		let future = inner.persist_new_channel(monitor_name, monitor);
947		let channel_id = monitor.channel_id();
948		let completion = (monitor.channel_id(), monitor.get_latest_update_id());
949		self.0.future_spawner.spawn(async move {
950			match future.await {
951				Ok(()) => {
952					inner.async_completed_updates.lock().unwrap().push(completion);
953					notifier.notify();
954				},
955				Err(e) => {
956					log_error!(
957						inner.logger,
958						"Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
959					);
960				},
961			}
962		});
963	}
964
965	pub(crate) fn spawn_async_update_channel(
966		&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
967		monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
968		notifier: Arc<Notifier>,
969	) {
970		let inner = Arc::clone(&self.0);
971		// Note that `update_persisted_channel` is a sync method which calls all the way through to
972		// the sync KVStore::write method (which returns a future) to ensure writes are well-ordered
973		let future = inner.update_persisted_channel(monitor_name, update, monitor);
974		let channel_id = monitor.channel_id();
975		let completion = if let Some(update) = update {
976			Some((monitor.channel_id(), update.update_id))
977		} else {
978			None
979		};
980		let inner = Arc::clone(&self.0);
981		self.0.future_spawner.spawn(async move {
982			match future.await {
983				Ok(()) => if let Some(completion) = completion {
984					inner.async_completed_updates.lock().unwrap().push(completion);
985					notifier.notify();
986				},
987				Err(e) => {
988					log_error!(
989						inner.logger,
						"Failed to persist ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
991					);
992				},
993			}
994		});
995	}
996
997	pub(crate) fn spawn_async_archive_persisted_channel(&self, monitor_name: MonitorName) {
998		let inner = Arc::clone(&self.0);
999		self.0.future_spawner.spawn(async move {
1000			inner.archive_persisted_channel(monitor_name).await;
1001		});
1002	}
1003
1004	pub(crate) fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
1005		mem::take(&mut *self.0.async_completed_updates.lock().unwrap())
1006	}
1007}
1008
1009impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
1010	MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>
1011where
1012	K::Target: KVStore,
1013	L::Target: Logger,
1014	ES::Target: EntropySource + Sized,
1015	SP::Target: SignerProvider + Sized,
1016	BI::Target: BroadcasterInterface,
1017	FE::Target: FeeEstimator,
1018{
1019	pub async fn read_channel_monitor_with_updates(
1020		&self, monitor_key: &str,
1021	) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
1022	{
1023		match self.maybe_read_channel_monitor_with_updates(monitor_key).await? {
1024			Some(res) => Ok(res),
1025			None => Err(io::Error::new(
1026				io::ErrorKind::InvalidData,
1027				format!(
1028					"ChannelMonitor {} was stale, with no updates since LDK 0.0.118. \
1029						It cannot be read by modern versions of LDK, though also does not contain any funds left to sweep. \
1030						You should manually delete it instead",
1031					monitor_key,
1032				),
1033			)),
1034		}
1035	}
1036
1037	async fn maybe_read_channel_monitor_with_updates(
1038		&self, monitor_key: &str,
1039	) -> Result<
1040		Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
1041		io::Error,
1042	> {
1043		let monitor_name = MonitorName::from_str(monitor_key)?;
1044		let read_res = self.maybe_read_monitor(&monitor_name, monitor_key).await?;
1045		let (block_hash, monitor) = match read_res {
1046			Some(res) => res,
1047			None => return Ok(None),
1048		};
1049		let mut current_update_id = monitor.get_latest_update_id();
1050		// TODO: Parallelize this loop by speculatively reading a batch of updates
1051		loop {
1052			current_update_id = match current_update_id.checked_add(1) {
1053				Some(next_update_id) => next_update_id,
1054				None => break,
1055			};
1056			let update_name = UpdateName::from(current_update_id);
1057			let update = match self.read_monitor_update(monitor_key, &update_name).await {
1058				Ok(update) => update,
1059				Err(err) if err.kind() == io::ErrorKind::NotFound => {
1060					// We can't find any more updates, so we are done.
1061					break;
1062				},
1063				Err(err) => return Err(err),
1064			};
1065
1066			monitor
1067				.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1068				.map_err(|e| {
1069				log_error!(
1070					self.logger,
1071					"Monitor update failed. monitor: {} update: {} reason: {:?}",
1072					monitor_key,
1073					update_name.as_str(),
1074					e
1075				);
1076				io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1077			})?;
1078		}
1079		Ok(Some((block_hash, monitor)))
1080	}
1081
1082	/// Read a channel monitor.
1083	async fn maybe_read_monitor(
1084		&self, monitor_name: &MonitorName, monitor_key: &str,
1085	) -> Result<
1086		Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
1087		io::Error,
1088	> {
1089		let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
1090		let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
1091		let monitor_bytes = self.kv_store.read(primary, secondary, monitor_key).await?;
1092		let mut monitor_cursor = io::Cursor::new(monitor_bytes);
1093		// Discard the sentinel bytes if found.
1094		if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) {
1095			monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64);
1096		}
1097		match <Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>>::read(
1098			&mut monitor_cursor,
1099			(&*self.entropy_source, &*self.signer_provider),
1100		) {
1101			Ok(None) => Ok(None),
1102			Ok(Some((blockhash, channel_monitor))) => {
1103				if channel_monitor.persistence_key() != *monitor_name {
1104					log_error!(
1105						self.logger,
1106						"ChannelMonitor {} was stored under the wrong key!",
1107						monitor_key,
1108					);
1109					Err(io::Error::new(
1110						io::ErrorKind::InvalidData,
1111						"ChannelMonitor was stored under the wrong key",
1112					))
1113				} else {
1114					Ok(Some((blockhash, channel_monitor)))
1115				}
1116			},
1117			Err(e) => {
1118				log_error!(
1119					self.logger,
1120					"Failed to read ChannelMonitor {}, reason: {}",
1121					monitor_key,
1122					e,
1123				);
1124				Err(io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitor"))
1125			},
1126		}
1127	}
1128
1129	/// Read a channel monitor update.
1130	async fn read_monitor_update(
1131		&self, monitor_key: &str, update_name: &UpdateName,
1132	) -> Result<ChannelMonitorUpdate, io::Error> {
1133		let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
1134		let update_bytes = self.kv_store.read(primary, monitor_key, update_name.as_str()).await?;
1135		ChannelMonitorUpdate::read(&mut &update_bytes[..]).map_err(|e| {
1136			log_error!(
1137				self.logger,
1138				"Failed to read ChannelMonitorUpdate {}/{}/{}, reason: {}",
1139				CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
1140				monitor_key,
1141				update_name.as_str(),
1142				e,
1143			);
1144			io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitorUpdate")
1145		})
1146	}
1147
1148	async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
1149		let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
1150		let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
1151		let monitor_keys = self.kv_store.list(primary, secondary).await?;
1152		for monitor_key in monitor_keys {
1153			let monitor_name = MonitorName::from_str(&monitor_key)?;
1154			let maybe_monitor = self.maybe_read_monitor(&monitor_name, &monitor_key).await?;
1155			if let Some((_, current_monitor)) = maybe_monitor {
1156				let latest_update_id = current_monitor.get_latest_update_id();
1157				self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy)
1158					.await?;
1159			} else {
1160				// TODO: Also clean up super stale monitors (created pre-0.0.110 and last updated
1161				// pre-0.0.119).
1162			}
1163		}
1164		Ok(())
1165	}
1166
1167	async fn cleanup_stale_updates_for_monitor_to(
1168		&self, monitor_key: &str, latest_update_id: u64, lazy: bool,
1169	) -> Result<(), io::Error> {
1170		let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
1171		let updates = self.kv_store.list(primary, monitor_key).await?;
1172		for update in updates {
1173			let update_name = UpdateName::new(update)?;
			// If the update_id is less than or equal to the stored monitor's latest, delete it.
1175			if update_name.0 <= latest_update_id {
1176				self.kv_store.remove(primary, monitor_key, update_name.as_str(), lazy).await?;
1177			}
1178		}
1179		Ok(())
1180	}
1181
1182	fn persist_new_channel<ChannelSigner: EcdsaChannelSigner>(
1183		&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
1184	) -> impl Future<Output = Result<(), io::Error>> {
1185		// Determine the proper key for this monitor
1186		let monitor_key = monitor_name.to_string();
1187		// Serialize and write the new monitor
1188		let mut monitor_bytes = Vec::with_capacity(
1189			MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
1190		);
1191		// If `maximum_pending_updates` is zero, we aren't actually writing monitor updates at all.
1192		// Thus, there's no need to add the sentinel prefix as the monitor can be read directly
1193		// from disk without issue.
1194		if self.maximum_pending_updates != 0 {
1195			monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL);
1196		}
1197		monitor.write(&mut monitor_bytes).unwrap();
1198		// Note that this is NOT an async function, but rather calls the *sync* KVStore write
1199		// method, allowing it to do its queueing immediately, and then return a future for the
1200		// completion of the write. This ensures monitor persistence ordering is preserved.
1201		let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
1202		let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
1203		self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes)
1204	}
1205
1206	fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>(
1207		self: Arc<Self>, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
1208		monitor: &ChannelMonitor<ChannelSigner>,
1209	) -> impl Future<Output = Result<(), io::Error>> + 'a
1210	where
1211		Self: 'a,
1212	{
1213		const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;
1214		let mut res_a = None;
1215		let mut res_b = None;
1216		let mut res_c = None;
1217		if let Some(update) = update {
1218			let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
1219				&& self.maximum_pending_updates != 0
1220				&& update.update_id % self.maximum_pending_updates != 0;
1221			if persist_update {
1222				let monitor_key = monitor_name.to_string();
1223				let update_name = UpdateName::from(update.update_id);
1224				let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
1225				// Note that this is NOT an async function, but rather calls the *sync* KVStore
1226				// write method, allowing it to do its queueing immediately, and then return a
1227				// future for the completion of the write. This ensures monitor persistence
1228				// ordering is preserved.
1229				res_a = Some(self.kv_store.write(
1230					primary,
1231					&monitor_key,
1232					update_name.as_str(),
1233					update.encode(),
1234				));
1235			} else {
				// We could write this update, but it meets the criteria of our design that calls for a full monitor write.
1237				// Note that this is NOT an async function, but rather calls the *sync* KVStore
1238				// write method, allowing it to do its queueing immediately, and then return a
1239				// future for the completion of the write. This ensures monitor persistence
1240				// ordering is preserved. This, thus, must happen before any await we do below.
1241				let write_fut = self.persist_new_channel(monitor_name, monitor);
1242				let latest_update_id = monitor.get_latest_update_id();
1243
1244				res_b = Some(async move {
1245					let write_status = write_fut.await;
1246					if let Ok(()) = write_status {
1247						if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID {
1248							let monitor_key = monitor_name.to_string();
1249							self.cleanup_stale_updates_for_monitor_to(
1250								&monitor_key,
1251								latest_update_id,
1252								true,
1253							)
1254							.await?;
1255						} else {
1256							let end = latest_update_id;
1257							let start = end.saturating_sub(self.maximum_pending_updates);
1258							self.cleanup_in_range(monitor_name, start, end).await;
1259						}
1260					}
1261
1262					write_status
1263				});
1264			}
1265		} else {
1266			// There is no update given, so we must persist a new monitor.
1267			// Note that this is NOT an async function, but rather calls the *sync* KVStore write
1268			// method, allowing it to do its queueing immediately, and then return a future for the
1269			// completion of the write. This ensures monitor persistence ordering is preserved.
1270			res_c = Some(self.persist_new_channel(monitor_name, monitor));
1271		}
1272		async move {
1273			// Complete any pending future(s). Note that to keep one return type we have to end
1274			// with a single async move block that we return, rather than trying to return the
1275			// individual futures themselves.
1276			if let Some(a) = res_a {
1277				a.await?;
1278			}
1279			if let Some(b) = res_b {
1280				b.await?;
1281			}
1282			if let Some(c) = res_c {
1283				c.await?;
1284			}
1285			Ok(())
1286		}
1287	}
1288
1289	async fn archive_persisted_channel(&self, monitor_name: MonitorName) {
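		// Rebuild the monitor with any pending updates applied, copy it into the archive
		// namespace, and only then remove it from the live monitor namespace. If the read or
		// the archive write fails, return early and leave the live monitor untouched.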
1290		let monitor_key = monitor_name.to_string();
1291		let monitor = match self.read_channel_monitor_with_updates(&monitor_key).await {
1292			Ok((_block_hash, monitor)) => monitor,
1293			Err(_) => return,
1294		};
1295		let primary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
1296		let secondary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
1297		match self.kv_store.write(primary, secondary, &monitor_key, monitor.encode()).await {
1298			Ok(()) => {},
1299			Err(_e) => return,
1300		};
1301		let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
1302		let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
1303		let _ = self.kv_store.remove(primary, secondary, &monitor_key, true).await;
1304	}
1305
1306	// Cleans up monitor updates for the given monitor in the range `start..=end` (inclusive).
1307	async fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
1308		let monitor_key = monitor_name.to_string();
1309		for update_id in start..=end {
1310			let update_name = UpdateName::from(update_id);
1311			let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
1312			let res = self.kv_store.remove(primary, &monitor_key, update_name.as_str(), true).await;
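			// A failed removal is only logged; keep iterating so the remaining updates in the
			// range are still attempted.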
1313			if let Err(e) = res {
1314				log_error!(
1315					self.logger,
1316					"Failed to clean up channel monitor updates for monitor {}, reason: {}",
1317					monitor_key.as_str(),
1318					e
1319				);
1320			};
1321		}
1322	}
1323}
1324
1325/// A name for a channel monitor.
1326///
1327/// `MonitorName` is primarily used within the [`MonitorUpdatingPersister`]
1328/// in functions that store or retrieve [`ChannelMonitor`] snapshots.
1329/// It provides a consistent way to generate a unique key for channel
1330/// monitors based on the channel's funding [`OutPoint`] for v1 channels or
1331/// [`ChannelId`] for v2 channels. Use [`ChannelMonitor::persistence_key`] to
1332/// obtain the correct `MonitorName`.
1333///
1334/// While users of the Lightning Dev Kit library generally won't need
1335/// to interact with [`MonitorName`] directly, it can be useful for:
1336/// - Custom persistence implementations
1337/// - Debugging or logging channel monitor operations
1338/// - Extending the functionality of the `MonitorUpdatingPersister`
1339///
1340/// # Examples
1341///
1342/// ```
1343/// use std::str::FromStr;
1344///
1345/// use bitcoin::Txid;
1346/// use bitcoin::hashes::hex::FromHex;
1347///
1348/// use lightning::util::persist::MonitorName;
1349/// use lightning::chain::transaction::OutPoint;
1350/// use lightning::ln::types::ChannelId;
1351///
1352/// // v1 channel
1353/// let outpoint = OutPoint {
1354///	 txid: Txid::from_str("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
1355///	 index: 1,
1356/// };
1357/// let monitor_name = MonitorName::V1Channel(outpoint);
1358/// assert_eq!(&monitor_name.to_string(), "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1");
1359///
1360/// // v2 channel
1361/// let channel_id = ChannelId(<[u8; 32]>::from_hex("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap());
1362/// let monitor_name = MonitorName::V2Channel(channel_id);
1363/// assert_eq!(&monitor_name.to_string(), "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef");
1364///
1365/// // Using MonitorName to generate a storage key
1366/// let storage_key = format!("channel_monitors/{}", monitor_name);
1367/// ```
1368#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
1369pub enum MonitorName {
1370	/// The outpoint of the channel's funding transaction.
1371	V1Channel(OutPoint),
1372
1373	/// The id of the channel produced by [`ChannelId::v2_from_revocation_basepoints`].
1374	V2Channel(ChannelId),
1375}
1376
1377impl MonitorName {
1378	/// Attempts to construct a `MonitorName` from a storage key returned by [`KVStoreSync::list`].
1379	///
1380	/// This is useful when you need to recover the funding outpoint or channel ID that the key encodes.
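	///
	/// A key of the form `"<funding txid>_<funding output index>"` is parsed as a
	/// [`MonitorName::V1Channel`], while a bare hex-encoded 32-byte channel ID is parsed as a
	/// [`MonitorName::V2Channel`], mirroring the `Display` implementation below.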
1381	fn from_str(monitor_key: &str) -> Result<Self, io::Error> {
1382		let mut parts = monitor_key.splitn(2, '_');
1383		let id = parts
1384			.next()
1385			.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Empty stored key"))?;
1386
1387		if let Some(part) = parts.next() {
1388			let txid = Txid::from_str(id).map_err(|_| {
1389				io::Error::new(io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
1390			})?;
1391			let index: u16 = part.parse().map_err(|_| {
1392				io::Error::new(io::ErrorKind::InvalidData, "Invalid tx index in stored key")
1393			})?;
1394			let outpoint = OutPoint { txid, index };
1395			Ok(MonitorName::V1Channel(outpoint))
1396		} else {
1397			let bytes = <[u8; 32]>::from_hex(id).map_err(|_| {
1398				io::Error::new(io::ErrorKind::InvalidData, "Invalid channel ID in stored key")
1399			})?;
1400			Ok(MonitorName::V2Channel(ChannelId(bytes)))
1401		}
1402	}
1403}
1404
1405impl core::fmt::Display for MonitorName {
1406	fn fmt(&self, f: &mut core::fmt::Formatter) -> Result<(), core::fmt::Error> {
1407		match self {
1408			MonitorName::V1Channel(outpoint) => {
1409				write!(f, "{}_{}", outpoint.txid, outpoint.index)
1410			},
1411			MonitorName::V2Channel(channel_id) => {
1412				write!(f, "{}", channel_id)
1413			},
1414		}
1415	}
1416}
1417
1418/// A struct representing a name for a channel monitor update.
1419///
1420/// [`UpdateName`] is primarily used within the [`MonitorUpdatingPersister`] in
1421/// functions that store or retrieve partial updates to channel monitors. It
1422/// provides a consistent way to generate and parse unique identifiers for
1423/// monitor updates based on their sequence number.
1424///
1425/// The name is derived from the update's sequence ID, which is a monotonically
1426/// increasing u64 value. This format allows for easy ordering of updates and
1427/// efficient storage and retrieval in key-value stores.
1428///
1429/// # Usage
1430///
1431/// While users of the Lightning Dev Kit library generally won't need to
1432/// interact with `UpdateName` directly, it can still be useful for custom
1433/// persistence implementations. The `u64` value is the `update_id` that can be
1434/// compared with [`ChannelMonitor::get_latest_update_id`] to check whether this
1435/// update has been applied to the channel monitor yet, which is useful for pruning
1436/// stale channel monitor updates from persistence.
1437///
1438/// # Examples
1439///
1440/// ```
1441/// use lightning::util::persist::UpdateName;
1442///
1443/// let update_id: u64 = 42;
1444/// let update_name = UpdateName::from(update_id);
1445/// assert_eq!(update_name.as_str(), "42");
1446///
1447/// // Using UpdateName to generate a storage key
1448/// let monitor_name = "some_monitor_name";
1449/// let storage_key = format!("channel_monitor_updates/{}/{}", monitor_name, update_name.as_str());
1450/// ```
1451#[derive(Debug)]
1452pub struct UpdateName(pub u64, String);
1453
1454impl UpdateName {
1455	/// Constructs an [`UpdateName`], after verifying that an update sequence ID
1456	/// can be derived from the given `name`.
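	///
	/// # Examples
	///
	/// ```
	/// use lightning::util::persist::UpdateName;
	///
	/// // A numeric name parses back into the corresponding update sequence ID.
	/// assert_eq!(UpdateName::new("42".to_string()).unwrap().0, 42);
	///
	/// // Anything that doesn't parse as a `u64` is rejected.
	/// assert!(UpdateName::new("deadbeef".to_string()).is_err());
	/// ```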
1457	pub fn new(name: String) -> Result<Self, io::Error> {
1458		match name.parse::<u64>() {
1459			Ok(u) => Ok(u.into()),
1460			Err(_) => {
1461				Err(io::Error::new(io::ErrorKind::InvalidData, "cannot parse u64 from update name"))
1462			},
1463		}
1464	}
1465
1466	/// Convert this update name to a string slice.
1467	///
1468	/// This method is particularly useful when you need to use the update name
1469	/// as part of a key in a key-value store or when logging.
1470	///
1471	/// # Examples
1472	///
1473	/// ```
1474	/// use lightning::util::persist::UpdateName;
1475	///
1476	/// let update_name = UpdateName::from(42);
1477	/// assert_eq!(update_name.as_str(), "42");
1478	/// ```
1479	pub fn as_str(&self) -> &str {
1480		&self.1
1481	}
1482}
1483
1484impl From<u64> for UpdateName {
1485	/// Creates an `UpdateName` from a `u64`.
1486	///
1487	/// This is typically used when you need to generate a storage key or
1488	/// identifier for a new channel monitor update.
1490	///
1491	/// # Examples
1492	///
1493	/// ```
1494	/// use lightning::util::persist::UpdateName;
1495	///
1496	/// let update_id: u64 = 42;
1497	/// let update_name = UpdateName::from(update_id);
1498	/// assert_eq!(update_name.as_str(), "42");
1499	/// ```
1500	fn from(value: u64) -> Self {
1501		Self(value, value.to_string())
1502	}
1503}
1504
1505#[cfg(test)]
1506mod tests {
1507	use super::*;
1508	use crate::chain::ChannelMonitorUpdateStatus;
1509	use crate::events::ClosureReason;
1510	use crate::ln::functional_test_utils::*;
1511	use crate::ln::msgs::BaseMessageHandler;
1512	use crate::sync::Arc;
1513	use crate::util::test_channel_signer::TestChannelSigner;
1514	use crate::util::test_utils::{self, TestStore};
1515	use crate::{check_added_monitors, check_closed_broadcast};
1516	use bitcoin::hashes::hex::FromHex;
1517	use core::cmp;
1518
1519	const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
1520
1521	#[test]
1522	fn converts_u64_to_update_name() {
1523		assert_eq!(UpdateName::from(0).as_str(), "0");
1524		assert_eq!(UpdateName::from(21).as_str(), "21");
1525		assert_eq!(UpdateName::from(u64::MAX).as_str(), "18446744073709551615");
1526	}
1527
1528	#[test]
1529	fn bad_update_name_fails() {
1530		assert!(UpdateName::new("deadbeef".to_string()).is_err());
1531		assert!(UpdateName::new("-1".to_string()).is_err());
1532	}
1533
1534	#[test]
1535	fn creates_monitor_from_outpoint() {
1536		let monitor_name = MonitorName::V1Channel(OutPoint {
1537			txid: Txid::from_str(
1538				"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
1539			)
1540			.unwrap(),
1541			index: 1,
1542		});
1543		assert_eq!(
1544			&monitor_name.to_string(),
1545			"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1"
1546		);
1547
1548		let monitor_name = MonitorName::V1Channel(OutPoint {
1549			txid: Txid::from_str(
1550				"f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef",
1551			)
1552			.unwrap(),
1553			index: u16::MAX,
1554		});
1555		assert_eq!(
1556			&monitor_name.to_string(),
1557			"f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535"
1558		);
1559	}
1560
1561	#[test]
1562	fn creates_monitor_from_channel_id() {
1563		let monitor_name = MonitorName::V2Channel(ChannelId(
1564			<[u8; 32]>::from_hex(
1565				"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
1566			)
1567			.unwrap(),
1568		));
1569		assert_eq!(
1570			&monitor_name.to_string(),
1571			"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
1572		);
1573	}
1574
1575	#[test]
1576	fn fails_parsing_monitor_name() {
1577		assert!(MonitorName::from_str(
1578			"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_"
1579		)
1580		.is_err());
1581		assert!(MonitorName::from_str(
1582			"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536"
1583		)
1584		.is_err());
1585		assert!(MonitorName::from_str(
1586			"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21"
1587		)
1588		.is_err());
1589	}
1590
1591	// Exercise the `MonitorUpdatingPersister` with real channels and payments.
1592	fn do_persister_with_real_monitors(max_pending_updates_0: u64, max_pending_updates_1: u64) {
1593		let chanmon_cfgs = create_chanmon_cfgs(4);
1594		let kv_store_0 = TestStore::new(false);
1595		let persister_0 = MonitorUpdatingPersister::new(
1596			&kv_store_0,
1597			&chanmon_cfgs[0].logger,
1598			max_pending_updates_0,
1599			&chanmon_cfgs[0].keys_manager,
1600			&chanmon_cfgs[0].keys_manager,
1601			&chanmon_cfgs[0].tx_broadcaster,
1602			&chanmon_cfgs[0].fee_estimator,
1603		);
1604		let kv_store_1 = TestStore::new(false);
1605		let persister_1 = MonitorUpdatingPersister::new(
1606			&kv_store_1,
1607			&chanmon_cfgs[1].logger,
1608			max_pending_updates_1,
1609			&chanmon_cfgs[1].keys_manager,
1610			&chanmon_cfgs[1].keys_manager,
1611			&chanmon_cfgs[1].tx_broadcaster,
1612			&chanmon_cfgs[1].fee_estimator,
1613		);
1614		let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
1615		let chain_mon_0 = test_utils::TestChainMonitor::new(
1616			Some(&chanmon_cfgs[0].chain_source),
1617			&chanmon_cfgs[0].tx_broadcaster,
1618			&chanmon_cfgs[0].logger,
1619			&chanmon_cfgs[0].fee_estimator,
1620			&persister_0,
1621			&chanmon_cfgs[0].keys_manager,
1622		);
1623		let chain_mon_1 = test_utils::TestChainMonitor::new(
1624			Some(&chanmon_cfgs[1].chain_source),
1625			&chanmon_cfgs[1].tx_broadcaster,
1626			&chanmon_cfgs[1].logger,
1627			&chanmon_cfgs[1].fee_estimator,
1628			&persister_1,
1629			&chanmon_cfgs[1].keys_manager,
1630		);
1631		node_cfgs[0].chain_monitor = chain_mon_0;
1632		node_cfgs[1].chain_monitor = chain_mon_1;
1633		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
1634		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
1635
1636		// Check that the persisted channel data is empty before any channels are
1637		// open.
1638		let mut persisted_chan_data_0 =
1639			persister_0.read_all_channel_monitors_with_updates().unwrap();
1640		assert_eq!(persisted_chan_data_0.len(), 0);
1641		let mut persisted_chan_data_1 =
1642			persister_1.read_all_channel_monitors_with_updates().unwrap();
1643		assert_eq!(persisted_chan_data_1.len(), 0);
1644
1645		// Helper to make sure the channel is on the expected update ID.
1646		macro_rules! check_persisted_data {
1647			($expected_update_id: expr) => {
1648				persisted_chan_data_0 =
1649					persister_0.read_all_channel_monitors_with_updates().unwrap();
1650				// check that we stored only one monitor
1651				assert_eq!(persisted_chan_data_0.len(), 1);
1652				for (_, mon) in persisted_chan_data_0.iter() {
1653					// check that when we read it, we got the right update id
1654					assert_eq!(mon.get_latest_update_id(), $expected_update_id);
1655
1656					let monitor_name = mon.persistence_key();
1657					let expected_updates = if max_pending_updates_0 == 0 {
1658						0
1659					} else {
1660						mon.get_latest_update_id() % max_pending_updates_0
1661					};
1662					let update_list = KVStoreSync::list(
1663						&kv_store_0,
1664						CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
1665						&monitor_name.to_string(),
1666					);
1667					assert_eq!(update_list.unwrap().len() as u64, expected_updates, "persister 0");
1668				}
1669				persisted_chan_data_1 =
1670					persister_1.read_all_channel_monitors_with_updates().unwrap();
1671				assert_eq!(persisted_chan_data_1.len(), 1);
1672				for (_, mon) in persisted_chan_data_1.iter() {
1673					assert_eq!(mon.get_latest_update_id(), $expected_update_id);
1674					let monitor_name = mon.persistence_key();
1675					let expected_updates = if max_pending_updates_1 == 0 {
1676						0
1677					} else {
1678						mon.get_latest_update_id() % max_pending_updates_1
1679					};
1680					let update_list = KVStoreSync::list(
1681						&kv_store_1,
1682						CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
1683						&monitor_name.to_string(),
1684					);
1685					assert_eq!(update_list.unwrap().len() as u64, expected_updates, "persister 1");
1686				}
1687			};
1688		}
1689
1690		// Create an initial channel and check that its monitor was persisted.
1691		let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
1692		check_persisted_data!(0);
1693
1694		// Send a few payments and make sure the monitors are updated to the latest.
1695		send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
1696		check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
1697		send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
1698		check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);
1699
1700		// Send a few more payments to try all the alignments of max pending updates with
1701		// updates for a payment sent and received.
1702		let mut sender = 0;
1703		for i in 3..=max_pending_updates_0 * 2 {
1704			let receiver;
1705			if sender == 0 {
1706				sender = 1;
1707				receiver = 0;
1708			} else {
1709				sender = 0;
1710				receiver = 1;
1711			}
1712			send_payment(&nodes[sender], &vec![&nodes[receiver]][..], 21_000);
1713			check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
1714		}
1715
1716		// Force close because cooperative close doesn't result in any persisted
1717		// updates.
1718
1719		let node_id_1 = nodes[1].node.get_our_node_id();
1720		let chan_id = nodes[0].node.list_channels()[0].channel_id;
1721		let message = "Channel force-closed".to_owned();
1722		nodes[0]
1723			.node
1724			.force_close_broadcasting_latest_txn(&chan_id, &node_id_1, message.clone())
1725			.unwrap();
1726
1727		let reason =
1728			ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message };
1729		check_closed_event(&nodes[0], 1, reason, false, &[node_id_1], 100000);
1730		check_closed_broadcast!(nodes[0], true);
1731		check_added_monitors!(nodes[0], 1);
1732
1733		let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
1734		assert_eq!(node_txn.len(), 1);
1735		let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
1736		let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
1737		connect_block(&nodes[1], &dummy_block);
1738
1739		check_closed_broadcast!(nodes[1], true);
1740		let reason = ClosureReason::CommitmentTxConfirmed;
1741		let node_id_0 = nodes[0].node.get_our_node_id();
1742		check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000);
1743		check_added_monitors!(nodes[1], 1);
1744
1745		// Make sure everything is persisted as expected after close.
1746		// We always send at least two payments, and loop up to max_pending_updates_0 * 2.
1747		check_persisted_data!(
1748			cmp::max(2, max_pending_updates_0 * 2) * EXPECTED_UPDATES_PER_PAYMENT + 1
1749		);
1750	}
1751
1752	#[test]
1753	fn persister_with_real_monitors() {
1754		do_persister_with_real_monitors(7, 3);
1755		do_persister_with_real_monitors(0, 1);
1756		do_persister_with_real_monitors(4, 2);
1757	}
1758
1759	// Test that if the `MonitorUpdatingPersister`'s underlying store can't actually write, trying
1760	// to persist a monitor or update with it results in an `UnrecoverableError` status.
1761	#[test]
1762	fn unrecoverable_error_on_write_failure() {
1763		// Set up a dummy channel and force close. This will produce a monitor
1764		// that we can then use to test persistence.
1765		let chanmon_cfgs = create_chanmon_cfgs(2);
1766		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
1767		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
1768		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
1769		let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
1770
1771		let message = "Channel force-closed".to_owned();
1772		let node_id_0 = nodes[0].node.get_our_node_id();
1773		nodes[1]
1774			.node
1775			.force_close_broadcasting_latest_txn(&chan.2, &node_id_0, message.clone())
1776			.unwrap();
1777		let reason =
1778			ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message };
1779		check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000);
1780
1781		{
1782			let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
1783			let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
1784			let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0];
1785
1786			let store = TestStore::new(true);
1787			let ro_persister = MonitorUpdatingPersister::new(
1788				&store,
1789				node_cfgs[0].logger,
1790				11,
1791				node_cfgs[0].keys_manager,
1792				node_cfgs[0].keys_manager,
1793				node_cfgs[0].tx_broadcaster,
1794				node_cfgs[0].fee_estimator,
1795			);
1796			let monitor_name = added_monitors[0].1.persistence_key();
1797			match ro_persister.persist_new_channel(monitor_name, &added_monitors[0].1) {
1798				ChannelMonitorUpdateStatus::UnrecoverableError => {
1799					// correct result
1800				},
1801				ChannelMonitorUpdateStatus::Completed => {
1802					panic!("Completed persisting new channel when shouldn't have")
1803				},
1804				ChannelMonitorUpdateStatus::InProgress => {
1805					panic!("Returned InProgress when shouldn't have")
1806				},
1807			}
1808			match ro_persister.update_persisted_channel(
1809				monitor_name,
1810				Some(cmu),
1811				&added_monitors[0].1,
1812			) {
1813				ChannelMonitorUpdateStatus::UnrecoverableError => {
1814					// correct result
1815				},
1816				ChannelMonitorUpdateStatus::Completed => {
1817					panic!("Completed persisting new channel when shouldn't have")
1818				},
1819				ChannelMonitorUpdateStatus::InProgress => {
1820					panic!("Returned InProgress when shouldn't have")
1821				},
1822			}
1823			added_monitors.clear();
1824		}
1825		nodes[1].node.get_and_clear_pending_msg_events();
1826	}
1827
1828	// Confirm that the `clean_stale_updates` function finds and deletes stale updates.
1829	#[test]
1830	fn clean_stale_updates_works() {
1831		let test_max_pending_updates = 7;
1832		let chanmon_cfgs = create_chanmon_cfgs(3);
1833		let kv_store_0 = TestStore::new(false);
1834		let persister_0 = MonitorUpdatingPersister::new(
1835			&kv_store_0,
1836			&chanmon_cfgs[0].logger,
1837			test_max_pending_updates,
1838			&chanmon_cfgs[0].keys_manager,
1839			&chanmon_cfgs[0].keys_manager,
1840			&chanmon_cfgs[0].tx_broadcaster,
1841			&chanmon_cfgs[0].fee_estimator,
1842		);
1843		let kv_store_1 = TestStore::new(false);
1844		let persister_1 = MonitorUpdatingPersister::new(
1845			&kv_store_1,
1846			&chanmon_cfgs[1].logger,
1847			test_max_pending_updates,
1848			&chanmon_cfgs[1].keys_manager,
1849			&chanmon_cfgs[1].keys_manager,
1850			&chanmon_cfgs[1].tx_broadcaster,
1851			&chanmon_cfgs[1].fee_estimator,
1852		);
1853		let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
1854		let chain_mon_0 = test_utils::TestChainMonitor::new(
1855			Some(&chanmon_cfgs[0].chain_source),
1856			&chanmon_cfgs[0].tx_broadcaster,
1857			&chanmon_cfgs[0].logger,
1858			&chanmon_cfgs[0].fee_estimator,
1859			&persister_0,
1860			&chanmon_cfgs[0].keys_manager,
1861		);
1862		let chain_mon_1 = test_utils::TestChainMonitor::new(
1863			Some(&chanmon_cfgs[1].chain_source),
1864			&chanmon_cfgs[1].tx_broadcaster,
1865			&chanmon_cfgs[1].logger,
1866			&chanmon_cfgs[1].fee_estimator,
1867			&persister_1,
1868			&chanmon_cfgs[1].keys_manager,
1869		);
1870		node_cfgs[0].chain_monitor = chain_mon_0;
1871		node_cfgs[1].chain_monitor = chain_mon_1;
1872		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
1873		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
1874
1875		// Check that the persisted channel data is empty before any channels are
1876		// open.
1877		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
1878		assert_eq!(persisted_chan_data.len(), 0);
1879
1880		// Create an initial channel
1881		let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
1882
1883		// Send a few payments to advance the updates a bit
1884		send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
1885		send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
1886
1887		// Get the monitor and make a fake stale update at update_id=1 (the lowest possible update ID)
1888		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
1889		let (_, monitor) = &persisted_chan_data[0];
1890		let monitor_name = monitor.persistence_key();
1891		KVStoreSync::write(
1892			&kv_store_0,
1893			CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
1894			&monitor_name.to_string(),
1895			UpdateName::from(1).as_str(),
1896			vec![0u8; 1],
1897		)
1898		.unwrap();
1899
1900		// Do the stale update cleanup
1901		persister_0.cleanup_stale_updates(false).unwrap();
1902
1903		// Confirm the stale update is unreadable/gone
1904		assert!(KVStoreSync::read(
1905			&kv_store_0,
1906			CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
1907			&monitor_name.to_string(),
1908			UpdateName::from(1).as_str()
1909		)
1910		.is_err());
1911	}
1912
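	// Compiles only if `P` dereferences to a `Persist` implementation; used by the test below
	// to confirm that an `Arc<dyn KVStoreSync>` trait object can be used as a persister.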
1913	fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool
1914	where
1915		P::Target: Persist<ChannelSigner>,
1916	{
1917		true
1918	}
1919
1920	#[test]
1921	fn kvstore_trait_object_usage() {
1922		let store: Arc<dyn KVStoreSync + Send + Sync> = Arc::new(TestStore::new(false));
1923		assert!(persist_fn::<_, TestChannelSigner>(Arc::clone(&store)));
1924	}
1925}