use alloc::sync::Arc;

use bitcoin::hashes::hex::FromHex;
use bitcoin::{BlockHash, Txid};

use core::future::Future;
use core::mem;
use core::ops::Deref;
use core::pin::Pin;
use core::str::FromStr;
use core::task;

use crate::prelude::*;
use crate::{io, log_error};

use crate::chain;
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use crate::chain::chainmonitor::Persist;
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
use crate::chain::transaction::OutPoint;
use crate::ln::types::ChannelId;
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
use crate::sync::Mutex;
use crate::util::async_poll::{dummy_waker, AsyncResult, MaybeSend, MaybeSync};
use crate::util::logger::Logger;
use crate::util::native_async::FutureSpawner;
use crate::util::ser::{Readable, ReadableArgs, Writeable};
use crate::util::wakers::Notifier;

/// The alphabet of characters allowed for namespaces and keys.
pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str =
	"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";

/// The maximum number of characters namespaces and keys may have.
pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120;

/// The primary namespace under which the `ChannelManager` will be persisted.
pub const CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the `ChannelManager` will be persisted.
pub const CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The key under which the `ChannelManager` will be persisted.
pub const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";

/// The primary namespace under which `ChannelMonitor`s will be persisted.
pub const CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitors";
/// The secondary namespace under which `ChannelMonitor`s will be persisted.
pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The primary namespace under which `ChannelMonitorUpdate`s will be persisted.
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";

/// The primary namespace under which archived `ChannelMonitor`s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
/// The secondary namespace under which archived `ChannelMonitor`s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// The primary namespace under which the `NetworkGraph` will be persisted.
pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the `NetworkGraph` will be persisted.
pub const NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The key under which the `NetworkGraph` will be persisted.
pub const NETWORK_GRAPH_PERSISTENCE_KEY: &str = "network_graph";

/// The primary namespace under which the scorer will be persisted.
pub const SCORER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the scorer will be persisted.
pub const SCORER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The key under which the scorer will be persisted.
pub const SCORER_PERSISTENCE_KEY: &str = "scorer";

/// The primary namespace under which the `OutputSweeper` will be persisted.
pub const OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the `OutputSweeper` will be persisted.
pub const OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The key under which the `OutputSweeper` will be persisted.
pub const OUTPUT_SWEEPER_PERSISTENCE_KEY: &str = "output_sweeper";

/// A sentinel value prepended to full `ChannelMonitor`s persisted by the
/// [`MonitorUpdatingPersister`], ensuring a stored monitor cannot accidentally be deserialized
/// without first applying any `ChannelMonitorUpdate`s persisted alongside it.
pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2];

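/// A synchronous key-value store interface, used by the persistence logic in this module to
/// store and retrieve serialized objects.
///
/// Stored objects are addressed by a `primary_namespace`/`secondary_namespace`/`key` triple.
/// Namespaces may be empty (top-level) strings; otherwise namespaces and keys are expected to
/// consist only of characters in [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and to be at most
/// [`KVSTORE_NAMESPACE_KEY_MAX_LEN`] characters long.
///
/// A minimal in-memory implementation sketch (the `MemoryStore` type is purely illustrative and
/// not part of LDK):
///
/// ```ignore
/// use std::collections::HashMap;
/// use std::sync::Mutex;
///
/// #[derive(Default)]
/// struct MemoryStore {
///     // Maps (primary_namespace, secondary_namespace, key) to the stored bytes.
///     entries: Mutex<HashMap<(String, String, String), Vec<u8>>>,
/// }
///
/// impl KVStoreSync for MemoryStore {
///     fn read(&self, pn: &str, sn: &str, key: &str) -> Result<Vec<u8>, io::Error> {
///         self.entries.lock().unwrap()
///             .get(&(pn.to_owned(), sn.to_owned(), key.to_owned()))
///             .cloned()
///             // Readers in this module rely on `NotFound` to detect missing keys.
///             .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no such key"))
///     }
///     fn write(&self, pn: &str, sn: &str, key: &str, buf: Vec<u8>) -> Result<(), io::Error> {
///         self.entries.lock().unwrap()
///             .insert((pn.to_owned(), sn.to_owned(), key.to_owned()), buf);
///         Ok(())
///     }
///     fn remove(&self, pn: &str, sn: &str, key: &str, _lazy: bool) -> Result<(), io::Error> {
///         self.entries.lock().unwrap()
///             .remove(&(pn.to_owned(), sn.to_owned(), key.to_owned()));
///         Ok(())
///     }
///     fn list(&self, pn: &str, sn: &str) -> Result<Vec<String>, io::Error> {
///         Ok(self.entries.lock().unwrap().keys()
///             .filter(|(p, s, _)| p == pn && s == sn)
///             .map(|(_, _, k)| k.clone())
///             .collect())
///     }
/// }
/// ```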
pub trait KVStoreSync {
	/// Returns the data stored for the given `key` in `primary_namespace`/`secondary_namespace`.
	///
	/// Returns an `io::ErrorKind::NotFound` error if the key could not be found.
	fn read(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> Result<Vec<u8>, io::Error>;
	/// Persists `buf` under the given `key` in `primary_namespace`/`secondary_namespace`.
	fn write(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
	) -> Result<(), io::Error>;
	/// Removes any data stored under the given `key`. If `lazy` is `true`, the removal may be
	/// deferred, e.g., to a later garbage-collection pass.
	fn remove(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
	) -> Result<(), io::Error>;
	/// Lists all keys stored under the given `primary_namespace`/`secondary_namespace`.
	fn list(
		&self, primary_namespace: &str, secondary_namespace: &str,
	) -> Result<Vec<String>, io::Error>;
}

/// A wrapper around a [`KVStoreSync`] that implements the asynchronous [`KVStore`] interface by
/// running each operation synchronously and returning an already-resolved future.
#[derive(Clone)]
pub struct KVStoreSyncWrapper<K: Deref>(pub K)
where
	K::Target: KVStoreSync;

impl<K: Deref> Deref for KVStoreSyncWrapper<K>
where
	K::Target: KVStoreSync,
{
	type Target = Self;
	fn deref(&self) -> &Self::Target {
		self
	}
}

impl<K: Deref> KVStore for KVStoreSyncWrapper<K>
where
	K::Target: KVStoreSync,
{
	fn read(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> AsyncResult<'static, Vec<u8>, io::Error> {
		let res = self.0.read(primary_namespace, secondary_namespace, key);

		Box::pin(async move { res })
	}

	fn write(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
	) -> AsyncResult<'static, (), io::Error> {
		let res = self.0.write(primary_namespace, secondary_namespace, key, buf);

		Box::pin(async move { res })
	}

	fn remove(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
	) -> AsyncResult<'static, (), io::Error> {
		let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);

		Box::pin(async move { res })
	}

	fn list(
		&self, primary_namespace: &str, secondary_namespace: &str,
	) -> AsyncResult<'static, Vec<String>, io::Error> {
		let res = self.0.list(primary_namespace, secondary_namespace);

		Box::pin(async move { res })
	}
}

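/// An asynchronous version of [`KVStoreSync`], returning a future for each operation.
///
/// For stores that are naturally synchronous, wrapping them in [`KVStoreSyncWrapper`] yields a
/// `KVStore` whose futures resolve immediately, e.g.:
///
/// ```ignore
/// let sync_store = MemoryStore::default();           // any KVStoreSync (illustrative type)
/// let async_store = KVStoreSyncWrapper(&sync_store); // usable wherever a KVStore is expected
/// ```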
pub trait KVStore {
	/// Returns the data stored for the given `key` in `primary_namespace`/`secondary_namespace`.
	///
	/// The returned future resolves to an `io::ErrorKind::NotFound` error if the key could not
	/// be found.
	fn read(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> AsyncResult<'static, Vec<u8>, io::Error>;
	/// Persists `buf` under the given `key` in `primary_namespace`/`secondary_namespace`.
	fn write(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
	) -> AsyncResult<'static, (), io::Error>;
	/// Removes any data stored under the given `key`. If `lazy` is `true`, the removal may be
	/// deferred.
	fn remove(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
	) -> AsyncResult<'static, (), io::Error>;
	/// Lists all keys stored under the given `primary_namespace`/`secondary_namespace`.
	fn list(
		&self, primary_namespace: &str, secondary_namespace: &str,
	) -> AsyncResult<'static, Vec<String>, io::Error>;
}

/// A [`KVStoreSync`] that can additionally enumerate every key it holds, allowing its full
/// contents to be migrated to another store.
pub trait MigratableKVStore: KVStoreSync {
	/// Returns *all* known keys as a list of `(primary_namespace, secondary_namespace, key)`
	/// tuples.
	fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, io::Error>;
}

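/// Migrates all data from one store to another, reading every key from `source_store` and
/// writing it into `target_store`.
///
/// Aborts and returns an error if any IO operation fails; in that case `target_store` may be
/// left in an intermediate state.
///
/// A usage sketch (the store construction here is illustrative; any two [`MigratableKVStore`]s
/// work):
///
/// ```ignore
/// let mut source_store = /* open the old store */;
/// let mut target_store = /* open the new store */;
/// migrate_kv_store_data(&mut source_store, &mut target_store)?;
/// ```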
pub fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
	source_store: &mut S, target_store: &mut T,
) -> Result<(), io::Error> {
	let keys_to_migrate = source_store.list_all_keys()?;

	for (primary_namespace, secondary_namespace, key) in &keys_to_migrate {
		let data = source_store.read(primary_namespace, secondary_namespace, key)?;
		target_store.write(primary_namespace, secondary_namespace, key, data)?;
	}

	Ok(())
}

impl<ChannelSigner: EcdsaChannelSigner, K: KVStoreSync + ?Sized> Persist<ChannelSigner> for K {
	fn persist_new_channel(
		&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
	) -> chain::ChannelMonitorUpdateStatus {
		match self.write(
			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			&monitor_name.to_string(),
			monitor.encode(),
		) {
			Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
			Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
		}
	}

	fn update_persisted_channel(
		&self, monitor_name: MonitorName, _update: Option<&ChannelMonitorUpdate>,
		monitor: &ChannelMonitor<ChannelSigner>,
	) -> chain::ChannelMonitorUpdateStatus {
		// The update itself is ignored: plain `KVStoreSync`-backed persistence always rewrites
		// the full monitor rather than persisting individual updates.
		match self.write(
			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			&monitor_name.to_string(),
			monitor.encode(),
		) {
			Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
			Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
		}
	}

	fn archive_persisted_channel(&self, monitor_name: MonitorName) {
		let monitor_key = monitor_name.to_string();
		// Copy the monitor into the archive namespace, then lazily remove the original. If any
		// step fails we simply leave the monitor in place, to be archived on a future attempt.
		let monitor = match self.read(
			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			monitor_key.as_str(),
		) {
			Ok(monitor) => monitor,
			Err(_) => return,
		};
		match self.write(
			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			monitor_key.as_str(),
			monitor,
		) {
			Ok(()) => {},
			Err(_e) => return,
		};
		let _ = self.remove(
			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			monitor_key.as_str(),
			true,
		);
	}
}

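/// Reads all stored [`ChannelMonitor`]s from the given [`KVStoreSync`], returning each alongside
/// the [`BlockHash`] at which it was last persisted.
///
/// Errors if any stored monitor fails to deserialize or was stored under a key that does not
/// match its own `persistence_key`.
///
/// A usage sketch (variable names are illustrative):
///
/// ```ignore
/// let monitors = read_channel_monitors(&kv_store, &keys_manager, &keys_manager)?;
/// for (best_block_hash, monitor) in monitors {
///     // Hand each monitor off to chain sync / the ChainMonitor on startup.
/// }
/// ```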
pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
	kv_store: K, entropy_source: ES, signer_provider: SP,
) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
where
	K::Target: KVStoreSync,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
{
	let mut res = Vec::new();

	for stored_key in kv_store.list(
		CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
		CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
	)? {
		match <Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>>::read(
			&mut io::Cursor::new(kv_store.read(
				CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
				CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
				&stored_key,
			)?),
			(&*entropy_source, &*signer_provider),
		) {
			Ok(Some((block_hash, channel_monitor))) => {
				let monitor_name = MonitorName::from_str(&stored_key)?;
				if channel_monitor.persistence_key() != monitor_name {
					return Err(io::Error::new(
						io::ErrorKind::InvalidData,
						"ChannelMonitor was stored under the wrong key",
					));
				}

				res.push((block_hash, channel_monitor));
			},
			Ok(None) => {},
			Err(_) => {
				return Err(io::Error::new(
					io::ErrorKind::InvalidData,
					"Failed to read ChannelMonitor",
				))
			},
		}
	}
	Ok(res)
}

/// A [`FutureSpawner`] for use in sync contexts, where no future should ever actually need to be
/// spawned (all futures derived from a wrapped [`KVStoreSync`] resolve immediately).
struct PanicingSpawner;
impl FutureSpawner for PanicingSpawner {
	fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, _: T) {
		unreachable!();
	}
}

/// Polls a future exactly once, panicking if it is not immediately ready. Only sound for futures
/// derived from a [`KVStoreSync`], which always resolve on the first poll.
fn poll_sync_future<F: Future>(future: F) -> F::Output {
	let waker = dummy_waker();
	let mut ctx = task::Context::from_waker(&waker);
	match Pin::new(&mut Box::pin(future)).poll(&mut ctx) {
		task::Poll::Ready(result) => result,
		task::Poll::Pending => {
			unreachable!("Sync KVStore-derived futures can not be pending in a sync context");
		},
	}
}

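/// A sync [`Persist`] implementation that persists [`ChannelMonitorUpdate`]s between full
/// [`ChannelMonitor`] writes.
///
/// Full monitors are stored under [`CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE`], while the
/// updates accumulated since the last full write are stored under
/// [`CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE`], keyed by the monitor's name and the
/// update ID. Whenever an update's ID is a multiple of `maximum_pending_updates` (or the channel
/// is closed), the full monitor is re-persisted and the now-stale updates become eligible for
/// cleanup.
///
/// A construction sketch (the surrounding objects are illustrative):
///
/// ```ignore
/// let persister = MonitorUpdatingPersister::new(
///     &kv_store, &logger, 100, &keys_manager, &keys_manager, &broadcaster, &fee_estimator,
/// );
/// // On startup, read back all monitors with their pending updates applied:
/// let monitors = persister.read_all_channel_monitors_with_updates()?;
/// ```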
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>(
	MonitorUpdatingPersisterAsync<KVStoreSyncWrapper<K>, PanicingSpawner, L, ES, SP, BI, FE>,
)
where
	K::Target: KVStoreSync,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator;

impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
	MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
	K::Target: KVStoreSync,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
{
	/// Constructs a new [`MonitorUpdatingPersister`].
	///
	/// `maximum_pending_updates` bounds the number of `ChannelMonitorUpdate`s persisted between
	/// full `ChannelMonitor` writes; setting it to 0 disables update-based persistence entirely,
	/// always writing the full monitor instead.
	pub fn new(
		kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
		signer_provider: SP, broadcaster: BI, fee_estimator: FE,
	) -> Self {
		MonitorUpdatingPersister(MonitorUpdatingPersisterAsync::new(
			KVStoreSyncWrapper(kv_store),
			PanicingSpawner,
			logger,
			maximum_pending_updates,
			entropy_source,
			signer_provider,
			broadcaster,
			fee_estimator,
		))
	}

	/// Reads all stored channel monitors, along with any stored updates for them.
	pub fn read_all_channel_monitors_with_updates(
		&self,
	) -> Result<
		Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
		io::Error,
	> {
		poll_sync_future(self.0.read_all_channel_monitors_with_updates())
	}

	/// Reads a single channel monitor, along with any stored updates for it.
	pub fn read_channel_monitor_with_updates(
		&self, monitor_key: &str,
	) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
	{
		poll_sync_future(self.0.read_channel_monitor_with_updates(monitor_key))
	}

	/// Cleans up stale updates for all stored channel monitors.
	pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
		poll_sync_future(self.0.cleanup_stale_updates(lazy))
	}
}

impl<
		ChannelSigner: EcdsaChannelSigner,
		K: Deref,
		L: Deref,
		ES: Deref,
		SP: Deref,
		BI: Deref,
		FE: Deref,
	> Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
	K::Target: KVStoreSync,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
{
	fn persist_new_channel(
		&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
	) -> chain::ChannelMonitorUpdateStatus {
		let res = poll_sync_future(self.0 .0.persist_new_channel(monitor_name, monitor));
		match res {
			Ok(_) => chain::ChannelMonitorUpdateStatus::Completed,
			Err(e) => {
				log_error!(
					self.0 .0.logger,
					"Failed to write ChannelMonitor {}/{}/{} reason: {}",
					CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
					CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
					monitor_name,
					e
				);
				chain::ChannelMonitorUpdateStatus::UnrecoverableError
			},
		}
	}

	fn update_persisted_channel(
		&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
		monitor: &ChannelMonitor<ChannelSigner>,
	) -> chain::ChannelMonitorUpdateStatus {
		let inner = Arc::clone(&self.0 .0);
		let res = poll_sync_future(inner.update_persisted_channel(monitor_name, update, monitor));
		match res {
			Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
			Err(e) => {
				log_error!(
					self.0 .0.logger,
					"Failed to write ChannelMonitorUpdate {} id {} reason: {}",
					monitor_name,
					update.as_ref().map(|upd| upd.update_id).unwrap_or(0),
					e
				);
				chain::ChannelMonitorUpdateStatus::UnrecoverableError
			},
		}
	}

	fn archive_persisted_channel(&self, monitor_name: MonitorName) {
		poll_sync_future(self.0 .0.archive_persisted_channel(monitor_name));
	}
}

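/// An async variant of [`MonitorUpdatingPersister`] which relies on the asynchronous [`KVStore`]
/// interface and spawns background persistence tasks via a [`FutureSpawner`].
///
/// A spawner sketch for a multi-threaded runtime (shown with `tokio` as an assumed runtime, not
/// a dependency of this crate):
///
/// ```ignore
/// struct TokioSpawner; // illustrative
/// impl FutureSpawner for TokioSpawner {
///     fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, fut: T) {
///         tokio::spawn(fut); // requires the future to be `Send`
///     }
/// }
/// ```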
pub struct MonitorUpdatingPersisterAsync<
	K: Deref,
	S: FutureSpawner,
	L: Deref,
	ES: Deref,
	SP: Deref,
	BI: Deref,
	FE: Deref,
>(Arc<MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>>)
where
	K::Target: KVStore,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator;

struct MonitorUpdatingPersisterAsyncInner<
	K: Deref,
	S: FutureSpawner,
	L: Deref,
	ES: Deref,
	SP: Deref,
	BI: Deref,
	FE: Deref,
> where
	K::Target: KVStore,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
{
	kv_store: K,
	// Updates whose spawned persistence future has completed, awaiting collection via
	// `get_and_clear_completed_updates`.
	async_completed_updates: Mutex<Vec<(ChannelId, u64)>>,
	future_spawner: S,
	logger: L,
	maximum_pending_updates: u64,
	entropy_source: ES,
	signer_provider: SP,
	broadcaster: BI,
	fee_estimator: FE,
}

impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
	MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
where
	K::Target: KVStore,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
{
	/// Constructs a new [`MonitorUpdatingPersisterAsync`]; see [`MonitorUpdatingPersister::new`]
	/// for the meaning of the arguments.
	pub fn new(
		kv_store: K, future_spawner: S, logger: L, maximum_pending_updates: u64,
		entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE,
	) -> Self {
		MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner {
			kv_store,
			async_completed_updates: Mutex::new(Vec::new()),
			future_spawner,
			logger,
			maximum_pending_updates,
			entropy_source,
			signer_provider,
			broadcaster,
			fee_estimator,
		}))
	}

	/// Reads all stored channel monitors, along with any stored updates for them.
	pub async fn read_all_channel_monitors_with_updates(
		&self,
	) -> Result<
		Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
		io::Error,
	> {
		let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
		let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
		let monitor_list = self.0.kv_store.list(primary, secondary).await?;
		let mut res = Vec::with_capacity(monitor_list.len());
		for monitor_key in monitor_list {
			let result =
				self.0.maybe_read_channel_monitor_with_updates(monitor_key.as_str()).await?;
			if let Some(read_res) = result {
				res.push(read_res);
			}
		}
		Ok(res)
	}

	/// Reads a single channel monitor, along with any stored updates for it.
	pub async fn read_channel_monitor_with_updates(
		&self, monitor_key: &str,
	) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
	{
		self.0.read_channel_monitor_with_updates(monitor_key).await
	}

	/// Cleans up stale updates for all stored channel monitors.
	pub async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
		self.0.cleanup_stale_updates(lazy).await
	}
}

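// The spawn-based entry points below are used when running with an async `KVStore`: each persist
// call serializes the monitor (or update) synchronously, spawns the resulting write future on
// the `FutureSpawner`, and on success records the completed (channel_id, update_id) pair in
// `async_completed_updates` so the caller (e.g., the chain monitor) can later collect it and
// wake any waiters via the `Notifier`.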
impl<
		K: Deref + MaybeSend + MaybeSync + 'static,
		S: FutureSpawner,
		L: Deref + MaybeSend + MaybeSync + 'static,
		ES: Deref + MaybeSend + MaybeSync + 'static,
		SP: Deref + MaybeSend + MaybeSync + 'static,
		BI: Deref + MaybeSend + MaybeSync + 'static,
		FE: Deref + MaybeSend + MaybeSync + 'static,
	> MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
where
	K::Target: KVStore + MaybeSync,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
	<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
{
	pub(crate) fn spawn_async_persist_new_channel(
		&self, monitor_name: MonitorName,
		monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
		notifier: Arc<Notifier>,
	) {
		let inner = Arc::clone(&self.0);
		// `persist_new_channel` serializes the monitor before returning its future, so the
		// spawned task holds no reference to `monitor`.
		let future = inner.persist_new_channel(monitor_name, monitor);
		let channel_id = monitor.channel_id();
		let completion = (monitor.channel_id(), monitor.get_latest_update_id());
		self.0.future_spawner.spawn(async move {
			match future.await {
				Ok(()) => {
					inner.async_completed_updates.lock().unwrap().push(completion);
					notifier.notify();
				},
				Err(e) => {
					log_error!(
						inner.logger,
						"Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
					);
				},
			}
		});
	}

	pub(crate) fn spawn_async_update_channel(
		&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
		monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
		notifier: Arc<Notifier>,
	) {
		let inner = Arc::clone(&self.0);
		// As above, serialization happens before we spawn, keeping the future state-less.
		let future = inner.update_persisted_channel(monitor_name, update, monitor);
		let channel_id = monitor.channel_id();
		let completion = if let Some(update) = update {
			Some((monitor.channel_id(), update.update_id))
		} else {
			None
		};
		// `inner` was consumed by `update_persisted_channel` above, so clone again for the task.
		let inner = Arc::clone(&self.0);
		self.0.future_spawner.spawn(async move {
			match future.await {
				Ok(()) => if let Some(completion) = completion {
					inner.async_completed_updates.lock().unwrap().push(completion);
					notifier.notify();
				},
				Err(e) => {
					log_error!(
						inner.logger,
						"Failed to persist ChannelMonitor update for channel {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
					);
				},
			}
		});
	}

	pub(crate) fn spawn_async_archive_persisted_channel(&self, monitor_name: MonitorName) {
		let inner = Arc::clone(&self.0);
		self.0.future_spawner.spawn(async move {
			inner.archive_persisted_channel(monitor_name).await;
		});
	}

	pub(crate) fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
		mem::take(&mut *self.0.async_completed_updates.lock().unwrap())
	}
}

impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
	MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>
where
	K::Target: KVStore,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
{
	pub async fn read_channel_monitor_with_updates(
		&self, monitor_key: &str,
	) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
	{
		match self.maybe_read_channel_monitor_with_updates(monitor_key).await? {
			Some(res) => Ok(res),
			None => Err(io::Error::new(
				io::ErrorKind::InvalidData,
				format!(
					"ChannelMonitor {} was stale, with no updates since LDK 0.0.118. \
					It cannot be read by modern versions of LDK, though also does not contain any funds left to sweep. \
					You should manually delete it instead",
					monitor_key,
				),
			)),
		}
	}

	async fn maybe_read_channel_monitor_with_updates(
		&self, monitor_key: &str,
	) -> Result<
		Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
		io::Error,
	> {
		let monitor_name = MonitorName::from_str(monitor_key)?;
		let read_res = self.maybe_read_monitor(&monitor_name, monitor_key).await?;
		let (block_hash, monitor) = match read_res {
			Some(res) => res,
			None => return Ok(None),
		};
		let mut current_update_id = monitor.get_latest_update_id();
		// Apply any stored updates with IDs above the one the full monitor was persisted at,
		// stopping once the next update in sequence is not found.
		loop {
			current_update_id = match current_update_id.checked_add(1) {
				Some(next_update_id) => next_update_id,
				None => break,
			};
			let update_name = UpdateName::from(current_update_id);
			let update = match self.read_monitor_update(monitor_key, &update_name).await {
				Ok(update) => update,
				Err(err) if err.kind() == io::ErrorKind::NotFound => {
					// We've hit the end of the sequence of persisted updates.
					break;
				},
				Err(err) => return Err(err),
			};

			monitor
				.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
				.map_err(|e| {
					log_error!(
						self.logger,
						"Monitor update failed. monitor: {} update: {} reason: {:?}",
						monitor_key,
						update_name.as_str(),
						e
					);
					io::Error::new(io::ErrorKind::Other, "Monitor update failed")
				})?;
		}
		Ok(Some((block_hash, monitor)))
	}

	async fn maybe_read_monitor(
		&self, monitor_name: &MonitorName, monitor_key: &str,
	) -> Result<
		Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
		io::Error,
	> {
		let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
		let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
		let monitor_bytes = self.kv_store.read(primary, secondary, monitor_key).await?;
		let mut monitor_cursor = io::Cursor::new(monitor_bytes);
		// Discard the sentinel bytes if found.
		if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) {
			monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64);
		}
		match <Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>>::read(
			&mut monitor_cursor,
			(&*self.entropy_source, &*self.signer_provider),
		) {
			Ok(None) => Ok(None),
			Ok(Some((blockhash, channel_monitor))) => {
				if channel_monitor.persistence_key() != *monitor_name {
					log_error!(
						self.logger,
						"ChannelMonitor {} was stored under the wrong key!",
						monitor_key,
					);
					Err(io::Error::new(
						io::ErrorKind::InvalidData,
						"ChannelMonitor was stored under the wrong key",
					))
				} else {
					Ok(Some((blockhash, channel_monitor)))
				}
			},
			Err(e) => {
				log_error!(
					self.logger,
					"Failed to read ChannelMonitor {}, reason: {}",
					monitor_key,
					e,
				);
				Err(io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitor"))
			},
		}
	}

	async fn read_monitor_update(
		&self, monitor_key: &str, update_name: &UpdateName,
	) -> Result<ChannelMonitorUpdate, io::Error> {
		let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
		let update_bytes = self.kv_store.read(primary, monitor_key, update_name.as_str()).await?;
		ChannelMonitorUpdate::read(&mut &update_bytes[..]).map_err(|e| {
			log_error!(
				self.logger,
				"Failed to read ChannelMonitorUpdate {}/{}/{}, reason: {}",
				CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
				monitor_key,
				update_name.as_str(),
				e,
			);
			io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitorUpdate")
		})
	}

	async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
		let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
		let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
		let monitor_keys = self.kv_store.list(primary, secondary).await?;
		for monitor_key in monitor_keys {
			let monitor_name = MonitorName::from_str(&monitor_key)?;
			let maybe_monitor = self.maybe_read_monitor(&monitor_name, &monitor_key).await?;
			if let Some((_, current_monitor)) = maybe_monitor {
				let latest_update_id = current_monitor.get_latest_update_id();
				self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy)
					.await?;
			} else {
				// If the monitor itself could not be deserialized, we don't know which of its
				// updates are stale, so leave them in place.
			}
		}
		Ok(())
	}

	async fn cleanup_stale_updates_for_monitor_to(
		&self, monitor_key: &str, latest_update_id: u64, lazy: bool,
	) -> Result<(), io::Error> {
		let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
		let updates = self.kv_store.list(primary, monitor_key).await?;
		for update in updates {
			let update_name = UpdateName::new(update)?;
			// The persisted monitor already includes all updates up to its latest update ID, so
			// any update at or below that ID is stale.
			if update_name.0 <= latest_update_id {
				self.kv_store.remove(primary, monitor_key, update_name.as_str(), lazy).await?;
			}
		}
		Ok(())
	}

	fn persist_new_channel<ChannelSigner: EcdsaChannelSigner>(
		&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
	) -> impl Future<Output = Result<(), io::Error>> {
		// Serialize up front so the returned future holds no reference to `monitor`.
		let monitor_key = monitor_name.to_string();
		let mut monitor_bytes = Vec::with_capacity(
			MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
		);
		// When update-based persistence is in use, prepend the sentinel so the stored monitor
		// cannot be deserialized without first applying any pending updates.
		if self.maximum_pending_updates != 0 {
			monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL);
		}
		monitor.write(&mut monitor_bytes).unwrap();
		let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
		let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
		self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes)
	}

	fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>(
		self: Arc<Self>, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
		monitor: &ChannelMonitor<ChannelSigner>,
	) -> impl Future<Output = Result<(), io::Error>> + 'a
	where
		Self: 'a,
	{
		const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;
		// The three branches below produce differently-typed futures, so each is stored in its
		// own `Option` and awaited in a single combining `async` block at the end.
		let mut res_a = None;
		let mut res_b = None;
		let mut res_c = None;
		if let Some(update) = update {
			// Persist the update itself unless it carries the legacy closed-channel marker
			// (`u64::MAX`) or its ID is a multiple of `maximum_pending_updates`, in which case
			// we consolidate by persisting the full monitor instead.
			let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
				&& self.maximum_pending_updates != 0
				&& update.update_id % self.maximum_pending_updates != 0;
			if persist_update {
				let monitor_key = monitor_name.to_string();
				let update_name = UpdateName::from(update.update_id);
				let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
				res_a = Some(self.kv_store.write(
					primary,
					&monitor_key,
					update_name.as_str(),
					update.encode(),
				));
			} else {
				// Consolidation point: persist the full monitor, then clean up the updates it
				// subsumes.
				let write_fut = self.persist_new_channel(monitor_name, monitor);
				let latest_update_id = monitor.get_latest_update_id();

				res_b = Some(async move {
					let write_status = write_fut.await;
					if let Ok(()) = write_status {
						if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID {
							// The channel is closed; lazily remove every remaining update.
							let monitor_key = monitor_name.to_string();
							self.cleanup_stale_updates_for_monitor_to(
								&monitor_key,
								latest_update_id,
								true,
							)
							.await?;
						} else {
							// Remove the window of updates the just-written monitor subsumes.
							let end = latest_update_id;
							let start = end.saturating_sub(self.maximum_pending_updates);
							self.cleanup_in_range(monitor_name, start, end).await;
						}
					}

					write_status
				});
			}
		} else {
			// There is no update to persist (e.g., this is a block-driven refresh), so persist
			// the full monitor.
			res_c = Some(self.persist_new_channel(monitor_name, monitor));
		}
		async move {
			// Exactly one of the three options was set above; await it and propagate the result.
			if let Some(a) = res_a {
				a.await?;
			}
			if let Some(b) = res_b {
				b.await?;
			}
			if let Some(c) = res_c {
				c.await?;
			}
			Ok(())
		}
	}

	async fn archive_persisted_channel(&self, monitor_name: MonitorName) {
		let monitor_key = monitor_name.to_string();
		// Re-read the monitor with all updates applied so the archived copy is complete, write it
		// to the archive namespace, then lazily remove the original.
		let monitor = match self.read_channel_monitor_with_updates(&monitor_key).await {
			Ok((_block_hash, monitor)) => monitor,
			Err(_) => return,
		};
		let primary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
		let secondary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
		match self.kv_store.write(primary, secondary, &monitor_key, monitor.encode()).await {
			Ok(()) => {},
			Err(_e) => return,
		};
		let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
		let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
		let _ = self.kv_store.remove(primary, secondary, &monitor_key, true).await;
	}

	/// Cleans up monitor updates for the given monitor in the ID range `start..=end`.
	async fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
		let monitor_key = monitor_name.to_string();
		for update_id in start..=end {
			let update_name = UpdateName::from(update_id);
			let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
			let res = self.kv_store.remove(primary, &monitor_key, update_name.as_str(), true).await;
			if let Err(e) = res {
				log_error!(
					self.logger,
					"Failed to clean up channel monitor updates for monitor {}, reason: {}",
					monitor_key.as_str(),
					e
				);
			};
		}
	}
}

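/// A name under which a [`ChannelMonitor`] is stored.
///
/// The two variants correspond to the two key formats accepted by `MonitorName::from_str`
/// (these example strings mirror the unit tests below):
///
/// ```ignore
/// // V1 channels are keyed by funding outpoint, "<funding_txid>_<output_index>":
/// let v1 = MonitorName::from_str(
///     "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1",
/// )?;
/// // V2 channels are keyed by the channel ID's 64-character hex encoding, with no suffix:
/// let v2 = MonitorName::from_str(
///     "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
/// )?;
/// ```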
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum MonitorName {
	/// A monitor for a channel identified by its funding outpoint, stored as
	/// `"<funding_txid>_<output_index>"`.
	V1Channel(OutPoint),
	/// A monitor for a channel identified by its [`ChannelId`], stored as its hex encoding.
	V2Channel(ChannelId),
}

impl MonitorName {
	/// Attempts to construct a [`MonitorName`] from a storage key, expecting either a
	/// `"<txid>_<index>"` funding outpoint or a bare hex channel ID.
	fn from_str(monitor_key: &str) -> Result<Self, io::Error> {
		let mut parts = monitor_key.splitn(2, '_');
		let id = parts
			.next()
			.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Empty stored key"))?;

		if let Some(part) = parts.next() {
			let txid = Txid::from_str(id).map_err(|_| {
				io::Error::new(io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
			})?;
			let index: u16 = part.parse().map_err(|_| {
				io::Error::new(io::ErrorKind::InvalidData, "Invalid tx index in stored key")
			})?;
			let outpoint = OutPoint { txid, index };
			Ok(MonitorName::V1Channel(outpoint))
		} else {
			let bytes = <[u8; 32]>::from_hex(id).map_err(|_| {
				io::Error::new(io::ErrorKind::InvalidData, "Invalid channel ID in stored key")
			})?;
			Ok(MonitorName::V2Channel(ChannelId(bytes)))
		}
	}
}

impl core::fmt::Display for MonitorName {
	fn fmt(&self, f: &mut core::fmt::Formatter) -> Result<(), core::fmt::Error> {
		match self {
			MonitorName::V1Channel(outpoint) => {
				write!(f, "{}_{}", outpoint.txid, outpoint.index)
			},
			MonitorName::V2Channel(channel_id) => {
				write!(f, "{}", channel_id)
			},
		}
	}
}

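/// A name under which a [`ChannelMonitorUpdate`] is stored, consisting of the update's sequence
/// ID and its decimal string form, which is used as the storage key.
///
/// Behavior mirrored by the unit tests below:
///
/// ```ignore
/// assert_eq!(UpdateName::from(21).as_str(), "21");
/// assert!(UpdateName::new("deadbeef".to_string()).is_err()); // non-numeric names are rejected
/// ```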
#[derive(Debug)]
pub struct UpdateName(pub u64, String);

impl UpdateName {
	/// Constructs an [`UpdateName`], after verifying that an update sequence ID can be derived
	/// from the given `name`.
	pub fn new(name: String) -> Result<Self, io::Error> {
		match name.parse::<u64>() {
			Ok(u) => Ok(u.into()),
			Err(_) => {
				Err(io::Error::new(io::ErrorKind::InvalidData, "cannot parse u64 from update name"))
			},
		}
	}

	/// Returns the string representation of this [`UpdateName`], used as the storage key.
	pub fn as_str(&self) -> &str {
		&self.1
	}
}

impl From<u64> for UpdateName {
	fn from(value: u64) -> Self {
		Self(value, value.to_string())
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use crate::chain::ChannelMonitorUpdateStatus;
	use crate::events::ClosureReason;
	use crate::ln::functional_test_utils::*;
	use crate::ln::msgs::BaseMessageHandler;
	use crate::sync::Arc;
	use crate::util::test_channel_signer::TestChannelSigner;
	use crate::util::test_utils::{self, TestStore};
	use crate::{check_added_monitors, check_closed_broadcast};
	use bitcoin::hashes::hex::FromHex;
	use core::cmp;

	const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;

	#[test]
	fn converts_u64_to_update_name() {
		assert_eq!(UpdateName::from(0).as_str(), "0");
		assert_eq!(UpdateName::from(21).as_str(), "21");
		assert_eq!(UpdateName::from(u64::MAX).as_str(), "18446744073709551615");
	}

	#[test]
	fn bad_update_name_fails() {
		assert!(UpdateName::new("deadbeef".to_string()).is_err());
		assert!(UpdateName::new("-1".to_string()).is_err());
	}

	#[test]
	fn creates_monitor_from_outpoint() {
		let monitor_name = MonitorName::V1Channel(OutPoint {
			txid: Txid::from_str(
				"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
			)
			.unwrap(),
			index: 1,
		});
		assert_eq!(
			&monitor_name.to_string(),
			"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1"
		);

		let monitor_name = MonitorName::V1Channel(OutPoint {
			txid: Txid::from_str(
				"f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef",
			)
			.unwrap(),
			index: u16::MAX,
		});
		assert_eq!(
			&monitor_name.to_string(),
			"f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535"
		);
	}

	#[test]
	fn creates_monitor_from_channel_id() {
		let monitor_name = MonitorName::V2Channel(ChannelId(
			<[u8; 32]>::from_hex(
				"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
			)
			.unwrap(),
		));
		assert_eq!(
			&monitor_name.to_string(),
			"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
		);
	}

	#[test]
	fn fails_parsing_monitor_name() {
		assert!(MonitorName::from_str(
			"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_"
		)
		.is_err());
		assert!(MonitorName::from_str(
			"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536"
		)
		.is_err());
		assert!(MonitorName::from_str(
			"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21"
		)
		.is_err());
	}

	fn do_persister_with_real_monitors(max_pending_updates_0: u64, max_pending_updates_1: u64) {
		// Create persisters for the two nodes, each backed by its own `TestStore`.
		let chanmon_cfgs = create_chanmon_cfgs(4);
		let kv_store_0 = TestStore::new(false);
		let persister_0 = MonitorUpdatingPersister::new(
			&kv_store_0,
			&chanmon_cfgs[0].logger,
			max_pending_updates_0,
			&chanmon_cfgs[0].keys_manager,
			&chanmon_cfgs[0].keys_manager,
			&chanmon_cfgs[0].tx_broadcaster,
			&chanmon_cfgs[0].fee_estimator,
		);
		let kv_store_1 = TestStore::new(false);
		let persister_1 = MonitorUpdatingPersister::new(
			&kv_store_1,
			&chanmon_cfgs[1].logger,
			max_pending_updates_1,
			&chanmon_cfgs[1].keys_manager,
			&chanmon_cfgs[1].keys_manager,
			&chanmon_cfgs[1].tx_broadcaster,
			&chanmon_cfgs[1].fee_estimator,
		);
		let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
		let chain_mon_0 = test_utils::TestChainMonitor::new(
			Some(&chanmon_cfgs[0].chain_source),
			&chanmon_cfgs[0].tx_broadcaster,
			&chanmon_cfgs[0].logger,
			&chanmon_cfgs[0].fee_estimator,
			&persister_0,
			&chanmon_cfgs[0].keys_manager,
		);
		let chain_mon_1 = test_utils::TestChainMonitor::new(
			Some(&chanmon_cfgs[1].chain_source),
			&chanmon_cfgs[1].tx_broadcaster,
			&chanmon_cfgs[1].logger,
			&chanmon_cfgs[1].fee_estimator,
			&persister_1,
			&chanmon_cfgs[1].keys_manager,
		);
		node_cfgs[0].chain_monitor = chain_mon_0;
		node_cfgs[1].chain_monitor = chain_mon_1;
		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

		// Before any channels exist, no monitors should be persisted.
		let mut persisted_chan_data_0 =
			persister_0.read_all_channel_monitors_with_updates().unwrap();
		assert_eq!(persisted_chan_data_0.len(), 0);
		let mut persisted_chan_data_1 =
			persister_1.read_all_channel_monitors_with_updates().unwrap();
		assert_eq!(persisted_chan_data_1.len(), 0);

		// Checks that each node's persisted monitor is at the expected update ID and that the
		// expected number of pending updates is stored alongside it.
		macro_rules! check_persisted_data {
			($expected_update_id: expr) => {
				persisted_chan_data_0 =
					persister_0.read_all_channel_monitors_with_updates().unwrap();
				assert_eq!(persisted_chan_data_0.len(), 1);
				for (_, mon) in persisted_chan_data_0.iter() {
					assert_eq!(mon.get_latest_update_id(), $expected_update_id);

					let monitor_name = mon.persistence_key();
					let expected_updates = if max_pending_updates_0 == 0 {
						0
					} else {
						mon.get_latest_update_id() % max_pending_updates_0
					};
					let update_list = KVStoreSync::list(
						&kv_store_0,
						CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
						&monitor_name.to_string(),
					);
					assert_eq!(update_list.unwrap().len() as u64, expected_updates, "persister 0");
				}
				persisted_chan_data_1 =
					persister_1.read_all_channel_monitors_with_updates().unwrap();
				assert_eq!(persisted_chan_data_1.len(), 1);
				for (_, mon) in persisted_chan_data_1.iter() {
					assert_eq!(mon.get_latest_update_id(), $expected_update_id);
					let monitor_name = mon.persistence_key();
					let expected_updates = if max_pending_updates_1 == 0 {
						0
					} else {
						mon.get_latest_update_id() % max_pending_updates_1
					};
					let update_list = KVStoreSync::list(
						&kv_store_1,
						CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
						&monitor_name.to_string(),
					);
					assert_eq!(update_list.unwrap().len() as u64, expected_updates, "persister 1");
				}
			};
		}

		// Create a channel and check that the monitors are persisted at update ID 0.
		let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
		check_persisted_data!(0);

		// Send a few payments and make sure the monitors are updated to the latest.
		send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
		check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
		send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
		check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);

		// Send a bunch more payments, alternating direction, so the stored update count crosses
		// the `maximum_pending_updates` consolidation boundary.
		let mut sender = 0;
		for i in 3..=max_pending_updates_0 * 2 {
			let receiver;
			if sender == 0 {
				sender = 1;
				receiver = 0;
			} else {
				sender = 0;
				receiver = 1;
			}
			send_payment(&nodes[sender], &vec![&nodes[receiver]][..], 21_000);
			check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
		}

		// Force close the channel and check that the monitors are persisted with the final,
		// closed-channel state.
		let node_id_1 = nodes[1].node.get_our_node_id();
		let chan_id = nodes[0].node.list_channels()[0].channel_id;
		let message = "Channel force-closed".to_owned();
		nodes[0]
			.node
			.force_close_broadcasting_latest_txn(&chan_id, &node_id_1, message.clone())
			.unwrap();

		let reason =
			ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message };
		check_closed_event(&nodes[0], 1, reason, false, &[node_id_1], 100000);
		check_closed_broadcast!(nodes[0], true);
		check_added_monitors!(nodes[0], 1);

		let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
		assert_eq!(node_txn.len(), 1);
		let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
		let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
		connect_block(&nodes[1], &dummy_block);

		check_closed_broadcast!(nodes[1], true);
		let reason = ClosureReason::CommitmentTxConfirmed;
		let node_id_0 = nodes[0].node.get_our_node_id();
		check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000);
		check_added_monitors!(nodes[1], 1);

		// Make sure everything is persisted as expected after close, including the one extra
		// update generated by the force-closure itself.
		check_persisted_data!(
			cmp::max(2, max_pending_updates_0 * 2) * EXPECTED_UPDATES_PER_PAYMENT + 1
		);
	}

	#[test]
	fn persister_with_real_monitors() {
		do_persister_with_real_monitors(7, 3);
		do_persister_with_real_monitors(0, 1);
		do_persister_with_real_monitors(4, 2);
	}

	// Writing to a failing store should result in the persister returning UnrecoverableError.
	#[test]
	fn unrecoverable_error_on_write_failure() {
		// Set up a dummy channel and force close, producing a monitor and an update we can then
		// attempt to persist.
		let chanmon_cfgs = create_chanmon_cfgs(2);
		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
		let chan = create_announced_chan_between_nodes(&nodes, 0, 1);

		let message = "Channel force-closed".to_owned();
		let node_id_0 = nodes[0].node.get_our_node_id();
		nodes[1]
			.node
			.force_close_broadcasting_latest_txn(&chan.2, &node_id_0, message.clone())
			.unwrap();
		let reason =
			ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message };
		check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000);

		{
			let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
			let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
			let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0];

			// A `TestStore` constructed with `true` fails writes, so every persist attempt below
			// should report `UnrecoverableError`.
			let store = TestStore::new(true);
			let ro_persister = MonitorUpdatingPersister::new(
				&store,
				node_cfgs[0].logger,
				11,
				node_cfgs[0].keys_manager,
				node_cfgs[0].keys_manager,
				node_cfgs[0].tx_broadcaster,
				node_cfgs[0].fee_estimator,
			);
			let monitor_name = added_monitors[0].1.persistence_key();
			match ro_persister.persist_new_channel(monitor_name, &added_monitors[0].1) {
				ChannelMonitorUpdateStatus::UnrecoverableError => {
					// correct result
				},
				ChannelMonitorUpdateStatus::Completed => {
					panic!("Completed persisting new channel when shouldn't have")
				},
				ChannelMonitorUpdateStatus::InProgress => {
					panic!("Returned InProgress when shouldn't have")
				},
			}
			match ro_persister.update_persisted_channel(
				monitor_name,
				Some(cmu),
				&added_monitors[0].1,
			) {
				ChannelMonitorUpdateStatus::UnrecoverableError => {
					// correct result
				},
				ChannelMonitorUpdateStatus::Completed => {
					panic!("Completed persisting new channel when shouldn't have")
				},
				ChannelMonitorUpdateStatus::InProgress => {
					panic!("Returned InProgress when shouldn't have")
				},
			}
			added_monitors.clear();
		}
		nodes[1].node.get_and_clear_pending_msg_events();
	}

	// Updates that the persisted monitor already includes should be pruned by
	// `cleanup_stale_updates`.
	#[test]
	fn clean_stale_updates_works() {
		let test_max_pending_updates = 7;
		let chanmon_cfgs = create_chanmon_cfgs(3);
		let kv_store_0 = TestStore::new(false);
		let persister_0 = MonitorUpdatingPersister::new(
			&kv_store_0,
			&chanmon_cfgs[0].logger,
			test_max_pending_updates,
			&chanmon_cfgs[0].keys_manager,
			&chanmon_cfgs[0].keys_manager,
			&chanmon_cfgs[0].tx_broadcaster,
			&chanmon_cfgs[0].fee_estimator,
		);
		let kv_store_1 = TestStore::new(false);
		let persister_1 = MonitorUpdatingPersister::new(
			&kv_store_1,
			&chanmon_cfgs[1].logger,
			test_max_pending_updates,
			&chanmon_cfgs[1].keys_manager,
			&chanmon_cfgs[1].keys_manager,
			&chanmon_cfgs[1].tx_broadcaster,
			&chanmon_cfgs[1].fee_estimator,
		);
		let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
		let chain_mon_0 = test_utils::TestChainMonitor::new(
			Some(&chanmon_cfgs[0].chain_source),
			&chanmon_cfgs[0].tx_broadcaster,
			&chanmon_cfgs[0].logger,
			&chanmon_cfgs[0].fee_estimator,
			&persister_0,
			&chanmon_cfgs[0].keys_manager,
		);
		let chain_mon_1 = test_utils::TestChainMonitor::new(
			Some(&chanmon_cfgs[1].chain_source),
			&chanmon_cfgs[1].tx_broadcaster,
			&chanmon_cfgs[1].logger,
			&chanmon_cfgs[1].fee_estimator,
			&persister_1,
			&chanmon_cfgs[1].keys_manager,
		);
		node_cfgs[0].chain_monitor = chain_mon_0;
		node_cfgs[1].chain_monitor = chain_mon_1;
		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

		// Before any channels exist, no monitors should be persisted.
		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
		assert_eq!(persisted_chan_data.len(), 0);

		// Create a channel and send payments in both directions.
		let _ = create_announced_chan_between_nodes(&nodes, 0, 1);

		send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
		send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);

		// Plant a stale update (ID 1) directly in the store.
		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
		let (_, monitor) = &persisted_chan_data[0];
		let monitor_name = monitor.persistence_key();
		KVStoreSync::write(
			&kv_store_0,
			CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
			&monitor_name.to_string(),
			UpdateName::from(1).as_str(),
			vec![0u8; 1],
		)
		.unwrap();

		// Do the stale update cleanup.
		persister_0.cleanup_stale_updates(false).unwrap();

		// Confirm the stale update is gone.
		assert!(KVStoreSync::read(
			&kv_store_0,
			CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
			&monitor_name.to_string(),
			UpdateName::from(1).as_str()
		)
		.is_err());
	}

	fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool
	where
		P::Target: Persist<ChannelSigner>,
	{
		true
	}

	// `dyn KVStoreSync` trait objects should be usable as a `Persist` via the blanket impl.
	#[test]
	fn kvstore_trait_object_usage() {
		let store: Arc<dyn KVStoreSync + Send + Sync> = Arc::new(TestStore::new(false));
		assert!(persist_fn::<_, TestChannelSigner>(Arc::clone(&store)));
	}
}