use bitcoin::block::Header;
use bitcoin::hash_types::{BlockHash, Txid};

use bitcoin::secp256k1::PublicKey;

use crate::chain;
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
#[cfg(peer_storage)]
use crate::chain::channelmonitor::write_chanmon_internal;
use crate::chain::channelmonitor::{
    Balance, ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, TransactionOutputs,
    WithChannelMonitor,
};
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput};
use crate::events::{self, Event, EventHandler, ReplayEvent};
use crate::ln::channel_state::ChannelDetails;
#[cfg(peer_storage)]
use crate::ln::msgs::PeerStorage;
use crate::ln::msgs::{BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler};
#[cfg(peer_storage)]
use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHolder};
use crate::ln::types::ChannelId;
use crate::prelude::*;
use crate::sign::ecdsa::EcdsaChannelSigner;
use crate::sign::{EntropySource, PeerStorageKey, SignerProvider};
use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};
use crate::types::features::{InitFeatures, NodeFeatures};
use crate::util::async_poll::{MaybeSend, MaybeSync};
use crate::util::errors::APIError;
use crate::util::logger::{Logger, WithContext};
use crate::util::native_async::FutureSpawner;
use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync};
#[cfg(peer_storage)]
use crate::util::ser::{VecWriter, Writeable};
use crate::util::wakers::{Future, Notifier};

use alloc::sync::Arc;
#[cfg(peer_storage)]
use core::iter::Cycle;
use core::ops::Deref;
use core::sync::atomic::{AtomicUsize, Ordering};

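/// `Persist` defines how [`ChannelMonitor`]s and their updates are written to durable storage.
///
/// Each method returns a [`ChannelMonitorUpdateStatus`]: `Completed` for a synchronous write,
/// `InProgress` if the write will finish later (in which case
/// [`ChainMonitor::channel_monitor_updated`] must be called once it does), or
/// `UnrecoverableError` to indicate the node can no longer operate safely.
///
/// A minimal synchronous sketch, assuming a hypothetical `MyStore` type with a fallible
/// `write` method (illustrative only, not an API of this crate):
///
/// ```ignore
/// impl<S: EcdsaChannelSigner> Persist<S> for MyStore {
///     fn persist_new_channel(
///         &self, monitor_name: MonitorName, monitor: &ChannelMonitor<S>,
///     ) -> ChannelMonitorUpdateStatus {
///         // Key the full serialized monitor by its `MonitorName`. Treating any write
///         // failure as fatal is a simplification; a real implementation might instead
///         // return `InProgress` and retry in the background.
///         match self.write(&monitor_name, &monitor.encode()) {
///             Ok(()) => ChannelMonitorUpdateStatus::Completed,
///             Err(_) => ChannelMonitorUpdateStatus::UnrecoverableError,
///         }
///     }
///     // `update_persisted_channel` and `archive_persisted_channel` elided...
/// }
/// ```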
pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
    fn persist_new_channel(
        &self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
    ) -> ChannelMonitorUpdateStatus;

    fn update_persisted_channel(
        &self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
        monitor: &ChannelMonitor<ChannelSigner>,
    ) -> ChannelMonitorUpdateStatus;

    fn archive_persisted_channel(&self, monitor_name: MonitorName);

    #[doc(hidden)]
    fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
        Vec::new()
    }
}

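/// Pairs a [`ChannelMonitor`] with the IDs of any monitor updates whose persistence is still
/// in flight, i.e. for which the persister returned
/// [`ChannelMonitorUpdateStatus::InProgress`] and has not yet signaled completion.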
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
    monitor: ChannelMonitor<ChannelSigner>,
    pending_monitor_updates: Mutex<Vec<u64>>,
}

impl<ChannelSigner: EcdsaChannelSigner> MonitorHolder<ChannelSigner> {
    fn has_pending_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<u64>>) -> bool {
        !pending_monitor_updates_lock.is_empty()
    }
}

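/// A read-only reference to a [`ChannelMonitor`] which holds the [`ChainMonitor`]'s monitor
/// map read lock for its lifetime, ensuring the monitor cannot be removed while borrowed.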
pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
    lock: RwLockReadGuard<'a, HashMap<ChannelId, MonitorHolder<ChannelSigner>>>,
    channel_id: ChannelId,
}

impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
    type Target = ChannelMonitor<ChannelSigner>;
    fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
        &self.lock.get(&self.channel_id).expect("Checked at construction").monitor
    }
}

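/// A [`Persist`] implementation backed by a [`MonitorUpdatingPersisterAsync`].
///
/// Persistence calls are spawned as background futures and always return
/// [`ChannelMonitorUpdateStatus::InProgress`] immediately; completions are later drained via
/// [`Persist::get_and_clear_completed_updates`] and signaled on the shared event notifier.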
pub struct AsyncPersister<
    K: Deref + MaybeSend + MaybeSync + 'static,
    S: FutureSpawner,
    L: Deref + MaybeSend + MaybeSync + 'static,
    ES: Deref + MaybeSend + MaybeSync + 'static,
    SP: Deref + MaybeSend + MaybeSync + 'static,
    BI: Deref + MaybeSend + MaybeSync + 'static,
    FE: Deref + MaybeSend + MaybeSync + 'static,
> where
    K::Target: KVStore + MaybeSync,
    L::Target: Logger,
    ES::Target: EntropySource + Sized,
    SP::Target: SignerProvider + Sized,
    BI::Target: BroadcasterInterface,
    FE::Target: FeeEstimator,
{
    persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
    event_notifier: Arc<Notifier>,
}

impl<
        K: Deref + MaybeSend + MaybeSync + 'static,
        S: FutureSpawner,
        L: Deref + MaybeSend + MaybeSync + 'static,
        ES: Deref + MaybeSend + MaybeSync + 'static,
        SP: Deref + MaybeSend + MaybeSync + 'static,
        BI: Deref + MaybeSend + MaybeSync + 'static,
        FE: Deref + MaybeSend + MaybeSync + 'static,
    > Deref for AsyncPersister<K, S, L, ES, SP, BI, FE>
where
    K::Target: KVStore + MaybeSync,
    L::Target: Logger,
    ES::Target: EntropySource + Sized,
    SP::Target: SignerProvider + Sized,
    BI::Target: BroadcasterInterface,
    FE::Target: FeeEstimator,
{
    type Target = Self;
    fn deref(&self) -> &Self {
        self
    }
}

impl<
        K: Deref + MaybeSend + MaybeSync + 'static,
        S: FutureSpawner,
        L: Deref + MaybeSend + MaybeSync + 'static,
        ES: Deref + MaybeSend + MaybeSync + 'static,
        SP: Deref + MaybeSend + MaybeSync + 'static,
        BI: Deref + MaybeSend + MaybeSync + 'static,
        FE: Deref + MaybeSend + MaybeSync + 'static,
    > Persist<<SP::Target as SignerProvider>::EcdsaSigner> for AsyncPersister<K, S, L, ES, SP, BI, FE>
where
    K::Target: KVStore + MaybeSync,
    L::Target: Logger,
    ES::Target: EntropySource + Sized,
    SP::Target: SignerProvider + Sized,
    BI::Target: BroadcasterInterface,
    FE::Target: FeeEstimator,
    <SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
{
    fn persist_new_channel(
        &self, monitor_name: MonitorName,
        monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
    ) -> ChannelMonitorUpdateStatus {
        let notifier = Arc::clone(&self.event_notifier);
        self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier);
        ChannelMonitorUpdateStatus::InProgress
    }

    fn update_persisted_channel(
        &self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
        monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
    ) -> ChannelMonitorUpdateStatus {
        let notifier = Arc::clone(&self.event_notifier);
        self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier);
        ChannelMonitorUpdateStatus::InProgress
    }

    fn archive_persisted_channel(&self, monitor_name: MonitorName) {
        self.persister.spawn_async_archive_persisted_channel(monitor_name);
    }

    fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
        self.persister.get_and_clear_completed_updates()
    }
}

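/// An implementation of [`chain::Watch`] which feeds chain data to [`ChannelMonitor`]s,
/// persists them (and their updates) via a [`Persist`]er, and surfaces [`MonitorEvent`]s for
/// the `ChannelManager` to consume.
///
/// A rough wiring sketch, assuming the synchronous [`ChainMonitor::new`] constructor and
/// pre-built broadcaster/logger/fee-estimator/persister values (illustrative, not a complete
/// setup):
///
/// ```ignore
/// let chain_monitor = ChainMonitor::new(
///     None, // no chain::Filter; full blocks are scanned
///     &broadcaster,
///     &logger,
///     &fee_estimator,
///     &persister,
///     &entropy_source,
///     peer_storage_key,
/// );
/// // Hand the ChainMonitor to the ChannelManager as its `chain::Watch`, then drive it with
/// // block data through the `chain::Listen`/`chain::Confirm` implementations below.
/// ```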
pub struct ChainMonitor<
    ChannelSigner: EcdsaChannelSigner,
    C: Deref,
    T: Deref,
    F: Deref,
    L: Deref,
    P: Deref,
    ES: Deref,
> where
    C::Target: chain::Filter,
    T::Target: BroadcasterInterface,
    F::Target: FeeEstimator,
    L::Target: Logger,
    P::Target: Persist<ChannelSigner>,
    ES::Target: EntropySource,
{
    monitors: RwLock<HashMap<ChannelId, MonitorHolder<ChannelSigner>>>,
    chain_source: Option<C>,
    broadcaster: T,
    logger: L,
    fee_estimator: F,
    persister: P,
    _entropy_source: ES,
    pending_monitor_events: Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>>,
    highest_chain_height: AtomicUsize,

    event_notifier: Arc<Notifier>,

    pending_send_only_events: Mutex<Vec<MessageSendEvent>>,

    #[cfg(peer_storage)]
    our_peerstorage_encryption_key: PeerStorageKey,
}

impl<
        K: Deref + MaybeSend + MaybeSync + 'static,
        S: FutureSpawner,
        SP: Deref + MaybeSend + MaybeSync + 'static,
        C: Deref,
        T: Deref + MaybeSend + MaybeSync + 'static,
        F: Deref + MaybeSend + MaybeSync + 'static,
        L: Deref + MaybeSend + MaybeSync + 'static,
        ES: Deref + MaybeSend + MaybeSync + 'static,
    >
    ChainMonitor<
        <SP::Target as SignerProvider>::EcdsaSigner,
        C,
        T,
        F,
        L,
        AsyncPersister<K, S, L, ES, SP, T, F>,
        ES,
    > where
    K::Target: KVStore + MaybeSync,
    SP::Target: SignerProvider + Sized,
    C::Target: chain::Filter,
    T::Target: BroadcasterInterface,
    F::Target: FeeEstimator,
    L::Target: Logger,
    ES::Target: EntropySource + Sized,
    <SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
{
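    /// Creates a new `ChainMonitor` which persists monitors asynchronously via the given
    /// [`MonitorUpdatingPersisterAsync`]. All persistence calls return
    /// [`ChannelMonitorUpdateStatus::InProgress`] and complete in the background; the `_beta`
    /// suffix reflects that this async-persistence path is newer than the synchronous
    /// [`ChainMonitor::new`].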
    pub fn new_async_beta(
        chain_source: Option<C>, broadcaster: T, logger: L, feeest: F,
        persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
        _our_peerstorage_encryption_key: PeerStorageKey,
    ) -> Self {
        let event_notifier = Arc::new(Notifier::new());
        Self {
            monitors: RwLock::new(new_hash_map()),
            chain_source,
            broadcaster,
            logger,
            fee_estimator: feeest,
            _entropy_source,
            pending_monitor_events: Mutex::new(Vec::new()),
            highest_chain_height: AtomicUsize::new(0),
            event_notifier: Arc::clone(&event_notifier),
            persister: AsyncPersister { persister, event_notifier },
            pending_send_only_events: Mutex::new(Vec::new()),
            #[cfg(peer_storage)]
            our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
        }
    }
}

impl<
        ChannelSigner: EcdsaChannelSigner,
        C: Deref,
        T: Deref,
        F: Deref,
        L: Deref,
        P: Deref,
        ES: Deref,
    > ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
    C::Target: chain::Filter,
    T::Target: BroadcasterInterface,
    F::Target: FeeEstimator,
    L::Target: Logger,
    P::Target: Persist<ChannelSigner>,
    ES::Target: EntropySource,
{
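    /// Applies `process` to each monitor for the given chain data, persisting a rotating
    /// subset of monitors per call (see `update_monitor_with_chain_data`). Monitors are first
    /// walked under short-lived read locks so new monitors can be registered concurrently; a
    /// second pass under the write lock catches any added in the meantime. If persistence
    /// fails unrecoverably, the monitor map lock is poisoned before panicking so other
    /// threads cannot keep operating on stale monitor state.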
    fn process_chain_data<FN>(
        &self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN,
    ) where
        FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
    {
        let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
        let channel_ids = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
        let channel_count = channel_ids.len();
        for channel_id in channel_ids.iter() {
            let monitor_lock = self.monitors.read().unwrap();
            if let Some(monitor_state) = monitor_lock.get(channel_id) {
                let update_res = self.update_monitor_with_chain_data(
                    header,
                    best_height,
                    txdata,
                    &process,
                    channel_id,
                    &monitor_state,
                    channel_count,
                );
                if update_res.is_err() {
                    core::mem::drop(monitor_lock);
                    let _poison = self.monitors.write().unwrap();
                    log_error!(self.logger, "{}", err_str);
                    panic!("{}", err_str);
                }
            }
        }

        let monitor_states = self.monitors.write().unwrap();
        for (channel_id, monitor_state) in monitor_states.iter() {
            if !channel_ids.contains(channel_id) {
                let update_res = self.update_monitor_with_chain_data(
                    header,
                    best_height,
                    txdata,
                    &process,
                    channel_id,
                    &monitor_state,
                    channel_count,
                );
                if update_res.is_err() {
                    log_error!(self.logger, "{}", err_str);
                    panic!("{}", err_str);
                }
            }
        }

        if let Some(height) = best_height {
            let old_height = self.highest_chain_height.load(Ordering::Acquire);
            let new_height = height as usize;
            if new_height > old_height {
                self.highest_chain_height.store(new_height, Ordering::Release);
            }
        }
    }

    fn update_monitor_with_chain_data<FN>(
        &self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN,
        channel_id: &ChannelId, monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
    ) -> Result<(), ()>
    where
        FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
    {
        let monitor = &monitor_state.monitor;
        let logger = WithChannelMonitor::from(&self.logger, &monitor, None);

        let mut txn_outputs = process(monitor, txdata);

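        // Only persist a deterministic, rotating subset of monitors on each round of block
        // data: a monitor is synced when (the first four bytes of its channel ID, read as a
        // big-endian u32, plus the block height) is divisible by the partition factor, i.e.
        // roughly once every `partition_factor` blocks. Monitors with pending claims are
        // always synced. The factor is 5 for small nodes (< 15 channels) and 50 otherwise.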
        let get_partition_key = |channel_id: &ChannelId| {
            let channel_id_bytes = channel_id.0;
            let channel_id_u32 = u32::from_be_bytes([
                channel_id_bytes[0],
                channel_id_bytes[1],
                channel_id_bytes[2],
                channel_id_bytes[3],
            ]);
            channel_id_u32.wrapping_add(best_height.unwrap_or_default())
        };

        let partition_factor = if channel_count < 15 { 5 } else { 50 };

        let has_pending_claims = monitor_state.monitor.has_pending_claims();
        if has_pending_claims || get_partition_key(channel_id) % partition_factor == 0 {
            log_trace!(
                logger,
                "Syncing Channel Monitor for channel {}",
                log_funding_info!(monitor)
            );
            let _pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
            match self.persister.update_persisted_channel(monitor.persistence_key(), None, monitor)
            {
                ChannelMonitorUpdateStatus::Completed => log_trace!(
                    logger,
                    "Finished syncing Channel Monitor for channel {} for block-data",
                    log_funding_info!(monitor)
                ),
                ChannelMonitorUpdateStatus::InProgress => {
                    log_trace!(
                        logger,
                        "Channel Monitor sync for channel {} in progress.",
                        log_funding_info!(monitor)
                    );
                },
                ChannelMonitorUpdateStatus::UnrecoverableError => {
                    return Err(());
                },
            }
        }

        if let Some(ref chain_source) = self.chain_source {
            let block_hash = header.block_hash();
            for (txid, mut outputs) in txn_outputs.drain(..) {
                for (idx, output) in outputs.drain(..) {
                    let output = WatchedOutput {
                        block_hash: Some(block_hash),
                        outpoint: OutPoint { txid, index: idx as u16 },
                        script_pubkey: output.script_pubkey,
                    };
                    log_trace!(
                        logger,
                        "Adding monitoring for spends of outpoint {} to the filter",
                        output.outpoint
                    );
                    chain_source.register_output(output);
                }
            }
        }
        Ok(())
    }

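    /// Creates a new `ChainMonitor` which persists monitors synchronously via the given
    /// [`Persist`] implementation.
    ///
    /// New channels should be registered via [`chain::Watch::watch_channel`]; monitors read
    /// back from disk at startup can instead go through [`Self::load_existing_monitor`].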
    pub fn new(
        chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P,
        _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey,
    ) -> Self {
        Self {
            monitors: RwLock::new(new_hash_map()),
            chain_source,
            broadcaster,
            logger,
            fee_estimator: feeest,
            persister,
            _entropy_source,
            pending_monitor_events: Mutex::new(Vec::new()),
            highest_chain_height: AtomicUsize::new(0),
            event_notifier: Arc::new(Notifier::new()),
            pending_send_only_events: Mutex::new(Vec::new()),
            #[cfg(peer_storage)]
            our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
        }
    }

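    /// Gets the balances in all [`ChannelMonitor`]s which are claimable on-chain, skipping
    /// any channels in `ignored_channels` (generally channels which are still open, whose
    /// balances are better queried via the `ChannelManager`).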
    pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec<Balance> {
        let mut ret = Vec::new();
        let monitor_states = self.monitors.read().unwrap();
        for (_, monitor_state) in monitor_states.iter().filter(|(channel_id, _)| {
            for chan in ignored_channels {
                if chan.channel_id == **channel_id {
                    return false;
                }
            }
            true
        }) {
            ret.append(&mut monitor_state.monitor.get_claimable_balances());
        }
        ret
    }

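    /// Gets the [`LockedChannelMonitor`] for the given channel ID, or `Err(())` if no such
    /// monitor is currently being watched.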
    pub fn get_monitor(
        &self, channel_id: ChannelId,
    ) -> Result<LockedChannelMonitor<'_, ChannelSigner>, ()> {
        let lock = self.monitors.read().unwrap();
        if lock.get(&channel_id).is_some() {
            Ok(LockedChannelMonitor { lock, channel_id })
        } else {
            Err(())
        }
    }

    pub fn list_monitors(&self) -> Vec<ChannelId> {
        self.monitors.read().unwrap().keys().copied().collect()
    }

    #[cfg(not(c_bindings))]
    pub fn list_pending_monitor_updates(&self) -> HashMap<ChannelId, Vec<u64>> {
        hash_map_from_iter(self.monitors.read().unwrap().iter().map(|(channel_id, holder)| {
            (*channel_id, holder.pending_monitor_updates.lock().unwrap().clone())
        }))
    }

    #[cfg(c_bindings)]
    pub fn list_pending_monitor_updates(&self) -> Vec<(ChannelId, Vec<u64>)> {
        let monitors = self.monitors.read().unwrap();
        monitors
            .iter()
            .map(|(channel_id, holder)| {
                (*channel_id, holder.pending_monitor_updates.lock().unwrap().clone())
            })
            .collect()
    }

    #[cfg(any(test, feature = "_test_utils"))]
    pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor<ChannelSigner> {
        self.monitors.write().unwrap().remove(channel_id).unwrap().monitor
    }

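    /// Indicates that the persistence of a [`ChannelMonitor`] or update which previously
    /// returned [`ChannelMonitorUpdateStatus::InProgress`] has now completed. Once no updates
    /// remain pending for the channel, a [`MonitorEvent::Completed`] is queued so the
    /// `ChannelManager` can resume operating on the channel.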
    pub fn channel_monitor_updated(
        &self, channel_id: ChannelId, completed_update_id: u64,
    ) -> Result<(), APIError> {
        let monitors = self.monitors.read().unwrap();
        let monitor_data = if let Some(mon) = monitors.get(&channel_id) {
            mon
        } else {
            return Err(APIError::APIMisuseError {
                err: format!("No ChannelMonitor matching channel ID {} found", channel_id),
            });
        };
        let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap();
        pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);

        let monitor_is_pending_updates = monitor_data.has_pending_updates(&pending_monitor_updates);
        log_debug!(
            self.logger,
            "Completed off-chain monitor update {} for channel with channel ID {}, {}",
            completed_update_id,
            channel_id,
            if monitor_is_pending_updates {
                "still have pending off-chain updates"
            } else {
                "all off-chain updates complete, returning a MonitorEvent"
            }
        );
        if monitor_is_pending_updates {
            return Ok(());
        }
        let funding_txo = monitor_data.monitor.get_funding_txo();
        self.pending_monitor_events.lock().unwrap().push((
            funding_txo,
            channel_id,
            vec![MonitorEvent::Completed {
                funding_txo,
                channel_id,
                monitor_update_id: monitor_data.monitor.get_latest_update_id(),
            }],
            monitor_data.monitor.get_counterparty_node_id(),
        ));

        self.event_notifier.notify();
        Ok(())
    }

    #[cfg(any(test, fuzzing))]
    pub fn force_channel_monitor_updated(&self, channel_id: ChannelId, monitor_update_id: u64) {
        let monitors = self.monitors.read().unwrap();
        let monitor = &monitors.get(&channel_id).unwrap().monitor;
        let counterparty_node_id = monitor.get_counterparty_node_id();
        let funding_txo = monitor.get_funding_txo();
        self.pending_monitor_events.lock().unwrap().push((
            funding_txo,
            channel_id,
            vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id }],
            counterparty_node_id,
        ));
        self.event_notifier.notify();
    }

    #[cfg(any(test, feature = "_test_utils"))]
    pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
        use crate::events::EventsProvider;
        let events = core::cell::RefCell::new(Vec::new());
        let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
        self.process_pending_events(&event_handler);
        events.into_inner()
    }

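    /// Processes any events generated since the last call asynchronously using the given
    /// event handler. Events for which the handler returns [`ReplayEvent`] are left queued
    /// for replay and the update future is re-notified.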
    pub async fn process_pending_events_async<
        Future: core::future::Future<Output = Result<(), ReplayEvent>>,
        H: Fn(Event) -> Future,
    >(
        &self, handler: H,
    ) {
        let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
        for channel_id in mons_to_process {
            let mut ev;
            match super::channelmonitor::process_events_body!(
                self.monitors.read().unwrap().get(&channel_id).map(|m| &m.monitor),
                self.logger,
                ev,
                handler(ev).await
            ) {
                Ok(()) => {},
                Err(ReplayEvent()) => {
                    self.event_notifier.notify();
                },
            }
        }
    }

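    /// Gets a [`Future`] which completes when an event is available via
    /// [`chain::Watch::release_pending_monitor_events`] or
    /// [`events::EventsProvider::process_pending_events`].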
    pub fn get_update_future(&self) -> Future {
        self.event_notifier.get_future()
    }

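    /// Triggers rebroadcasts (and fee-bumps, where justified at the current fee rate) of any
    /// pending claim transactions from force-closed channels.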
    pub fn rebroadcast_pending_claims(&self) {
        let monitors = self.monitors.read().unwrap();
        for (_, monitor_holder) in &*monitors {
            monitor_holder.monitor.rebroadcast_pending_claims(
                &*self.broadcaster,
                &*self.fee_estimator,
                &self.logger,
            )
        }
    }

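    /// Retries any pending signing and broadcast operations which previously failed or were
    /// deferred by an asynchronous signer, for the given channel only or, if `monitor_opt` is
    /// `None`, across all monitors.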
    pub fn signer_unblocked(&self, monitor_opt: Option<ChannelId>) {
        let monitors = self.monitors.read().unwrap();
        if let Some(channel_id) = monitor_opt {
            if let Some(monitor_holder) = monitors.get(&channel_id) {
                monitor_holder.monitor.signer_unblocked(
                    &*self.broadcaster,
                    &*self.fee_estimator,
                    &self.logger,
                )
            }
        } else {
            for (_, monitor_holder) in &*monitors {
                monitor_holder.monitor.signer_unblocked(
                    &*self.broadcaster,
                    &*self.fee_estimator,
                    &self.logger,
                )
            }
        }
    }

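    /// Archives fully resolved channel monitors by calling
    /// [`Persist::archive_persisted_channel`] and removing them from the set of watched
    /// monitors. A monitor is only archived once it reports itself fully resolved (no
    /// claimable balances remain) via `check_and_update_full_resolution_status`.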
    pub fn archive_fully_resolved_channel_monitors(&self) {
        let mut have_monitors_to_prune = false;
        for monitor_holder in self.monitors.read().unwrap().values() {
            let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor, None);
            let (is_fully_resolved, needs_persistence) =
                monitor_holder.monitor.check_and_update_full_resolution_status(&logger);
            if is_fully_resolved {
                have_monitors_to_prune = true;
            }
            if needs_persistence {
                self.persister.update_persisted_channel(
                    monitor_holder.monitor.persistence_key(),
                    None,
                    &monitor_holder.monitor,
                );
            }
        }
        if have_monitors_to_prune {
            let mut monitors = self.monitors.write().unwrap();
            monitors.retain(|channel_id, monitor_holder| {
                let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor, None);
                let (is_fully_resolved, _) =
                    monitor_holder.monitor.check_and_update_full_resolution_status(&logger);
                if is_fully_resolved {
                    log_info!(
                        logger,
                        "Archiving fully resolved ChannelMonitor for channel ID {}",
                        channel_id
                    );
                    self.persister
                        .archive_persisted_channel(monitor_holder.monitor.persistence_key());
                    false
                } else {
                    true
                }
            });
        }
    }

    #[cfg(peer_storage)]
    fn all_counterparty_node_ids(&self) -> HashSet<PublicKey> {
        let mon = self.monitors.read().unwrap();
        mon.values().map(|monitor| monitor.monitor.get_counterparty_node_id()).collect()
    }

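    // Serializes a random subset of our ChannelMonitors (bounded by the maximum peer storage
    // payload size), encrypts the result with our peer-storage key, and queues a
    // SendPeerStorage message so `their_node_id` can store the backup for us.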
    #[cfg(peer_storage)]
    fn send_peer_storage(&self, their_node_id: PublicKey) {
        let mut monitors_list: Vec<PeerStorageMonitorHolder> = Vec::new();
        let random_bytes = self._entropy_source.get_secure_random_bytes();

        const MAX_PEER_STORAGE_SIZE: usize = 65531;
        const USIZE_LEN: usize = core::mem::size_of::<usize>();
        let mut random_bytes_cycle_iter = random_bytes.iter().cycle();

        let mut current_size = 0;
        let monitors_lock = self.monitors.read().unwrap();
        let mut channel_ids = monitors_lock.keys().copied().collect();

        fn next_random_id(
            channel_ids: &mut Vec<ChannelId>,
            random_bytes_cycle_iter: &mut Cycle<core::slice::Iter<u8>>,
        ) -> Option<ChannelId> {
            if channel_ids.is_empty() {
                return None;
            }
            let random_idx = {
                let mut usize_bytes = [0u8; USIZE_LEN];
                usize_bytes.iter_mut().for_each(|b| {
                    *b = *random_bytes_cycle_iter.next().expect("A cycle never ends")
                });
                random_bytes_cycle_iter.next().expect("A cycle never ends");
                usize::from_le_bytes(usize_bytes) % channel_ids.len()
            };
            Some(channel_ids.swap_remove(random_idx))
        }

        while let Some(channel_id) = next_random_id(&mut channel_ids, &mut random_bytes_cycle_iter)
        {
            let monitor_holder = if let Some(monitor_holder) = monitors_lock.get(&channel_id) {
                monitor_holder
            } else {
                debug_assert!(
                    false,
                    "Tried to access non-existing monitor, this should never happen"
                );
                break;
            };

            let mut serialized_channel = VecWriter(Vec::new());
            let min_seen_secret = monitor_holder.monitor.get_min_seen_secret();
            let counterparty_node_id = monitor_holder.monitor.get_counterparty_node_id();
            {
                let inner_lock = monitor_holder.monitor.inner.lock().unwrap();

                write_chanmon_internal(&inner_lock, true, &mut serialized_channel)
                    .expect("cannot write ChannelMonitor for peer storage message");
            }
            let peer_storage_monitor = PeerStorageMonitorHolder {
                channel_id,
                min_seen_secret,
                counterparty_node_id,
                monitor_bytes: serialized_channel.0,
            };

            let serialized_length = peer_storage_monitor.serialized_length();

            if current_size + serialized_length > MAX_PEER_STORAGE_SIZE {
                continue;
            } else {
                current_size += serialized_length;
                monitors_list.push(peer_storage_monitor);
            }
        }

        let serialised_channels = monitors_list.encode();
        let our_peer_storage = DecryptedOurPeerStorage::new(serialised_channels);
        let cipher = our_peer_storage.encrypt(&self.our_peerstorage_encryption_key, &random_bytes);

        log_debug!(self.logger, "Sending Peer Storage to {}", log_pubkey!(their_node_id));
        let send_peer_storage_event = MessageSendEvent::SendPeerStorage {
            node_id: their_node_id,
            msg: PeerStorage { data: cipher.into_vec() },
        };

        self.pending_send_only_events.lock().unwrap().push(send_peer_storage_event)
    }

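    /// Loads a [`ChannelMonitor`] which already exists on disk (e.g. was read back during
    /// startup) without persisting it again. Monitors written by LDK versions prior to 0.1
    /// are instead routed through [`chain::Watch::watch_channel`], which re-persists them.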
    pub fn load_existing_monitor(
        &self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
    ) -> Result<ChannelMonitorUpdateStatus, ()> {
        if !monitor.written_by_0_1_or_later() {
            return chain::Watch::watch_channel(self, channel_id, monitor);
        }

        let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
        let mut monitors = self.monitors.write().unwrap();
        let entry = match monitors.entry(channel_id) {
            hash_map::Entry::Occupied(_) => {
                log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
                return Err(());
            },
            hash_map::Entry::Vacant(e) => e,
        };
        log_trace!(
            logger,
            "Loaded existing ChannelMonitor for channel {}",
            log_funding_info!(monitor)
        );
        if let Some(ref chain_source) = self.chain_source {
            monitor.load_outputs_to_watch(chain_source, &self.logger);
        }
        entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(Vec::new()) });

        Ok(ChannelMonitorUpdateStatus::Completed)
    }
}

impl<
        ChannelSigner: EcdsaChannelSigner,
        C: Deref,
        T: Deref,
        F: Deref,
        L: Deref,
        P: Deref,
        ES: Deref,
    > BaseMessageHandler for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
    C::Target: chain::Filter,
    T::Target: BroadcasterInterface,
    F::Target: FeeEstimator,
    L::Target: Logger,
    P::Target: Persist<ChannelSigner>,
    ES::Target: EntropySource,
{
    fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
        let mut pending_events = self.pending_send_only_events.lock().unwrap();
        core::mem::take(&mut *pending_events)
    }

    fn peer_disconnected(&self, _their_node_id: PublicKey) {}

    fn provided_node_features(&self) -> NodeFeatures {
        NodeFeatures::empty()
    }

    fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
        InitFeatures::empty()
    }

    fn peer_connected(
        &self, _their_node_id: PublicKey, _msg: &Init, _inbound: bool,
    ) -> Result<(), ()> {
        Ok(())
    }
}

impl<
        ChannelSigner: EcdsaChannelSigner,
        C: Deref,
        T: Deref,
        F: Deref,
        L: Deref,
        P: Deref,
        ES: Deref,
    > SendOnlyMessageHandler for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
    C::Target: chain::Filter,
    T::Target: BroadcasterInterface,
    F::Target: FeeEstimator,
    L::Target: Logger,
    P::Target: Persist<ChannelSigner>,
    ES::Target: EntropySource,
{
}

impl<
        ChannelSigner: EcdsaChannelSigner,
        C: Deref,
        T: Deref,
        F: Deref,
        L: Deref,
        P: Deref,
        ES: Deref,
    > chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
    C::Target: chain::Filter,
    T::Target: BroadcasterInterface,
    F::Target: FeeEstimator,
    L::Target: Logger,
    P::Target: Persist<ChannelSigner>,
    ES::Target: EntropySource,
{
    fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) {
        log_debug!(
            self.logger,
            "New best block {} at height {} provided via block_connected",
            header.block_hash(),
            height
        );
        self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
            monitor.block_connected(
                header,
                txdata,
                height,
                &*self.broadcaster,
                &*self.fee_estimator,
                &self.logger,
            )
        });

        #[cfg(peer_storage)]
        for node_id in self.all_counterparty_node_ids() {
            self.send_peer_storage(node_id);
        }

        self.event_notifier.notify();
    }

    fn blocks_disconnected(&self, fork_point: BestBlock) {
        let monitor_states = self.monitors.read().unwrap();
        log_debug!(
            self.logger,
            "Block(s) removed to height {} via blocks_disconnected. New best block is {}",
            fork_point.height,
            fork_point.block_hash,
        );
        for monitor_state in monitor_states.values() {
            monitor_state.monitor.blocks_disconnected(
                fork_point,
                &*self.broadcaster,
                &*self.fee_estimator,
                &self.logger,
            );
        }
    }
}

impl<
        ChannelSigner: EcdsaChannelSigner,
        C: Deref,
        T: Deref,
        F: Deref,
        L: Deref,
        P: Deref,
        ES: Deref,
    > chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
    C::Target: chain::Filter,
    T::Target: BroadcasterInterface,
    F::Target: FeeEstimator,
    L::Target: Logger,
    P::Target: Persist<ChannelSigner>,
    ES::Target: EntropySource,
{
    fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) {
        log_debug!(
            self.logger,
            "{} provided transactions confirmed at height {} in block {}",
            txdata.len(),
            height,
            header.block_hash()
        );
        self.process_chain_data(header, None, txdata, |monitor, txdata| {
            monitor.transactions_confirmed(
                header,
                txdata,
                height,
                &*self.broadcaster,
                &*self.fee_estimator,
                &self.logger,
            )
        });
        self.event_notifier.notify();
    }

    fn transaction_unconfirmed(&self, txid: &Txid) {
        log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
        let monitor_states = self.monitors.read().unwrap();
        for monitor_state in monitor_states.values() {
            monitor_state.monitor.transaction_unconfirmed(
                txid,
                &*self.broadcaster,
                &*self.fee_estimator,
                &self.logger,
            );
        }
    }

    fn best_block_updated(&self, header: &Header, height: u32) {
        log_debug!(
            self.logger,
            "New best block {} at height {} provided via best_block_updated",
            header.block_hash(),
            height
        );
        self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
            debug_assert!(txdata.is_empty());
            monitor.best_block_updated(
                header,
                height,
                &*self.broadcaster,
                &*self.fee_estimator,
                &self.logger,
            )
        });

        #[cfg(peer_storage)]
        for node_id in self.all_counterparty_node_ids() {
            self.send_peer_storage(node_id);
        }

        self.event_notifier.notify();
    }

    fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
        let mut txids = Vec::new();
        let monitor_states = self.monitors.read().unwrap();
        for monitor_state in monitor_states.values() {
            txids.append(&mut monitor_state.monitor.get_relevant_txids());
        }

        txids.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
        txids.dedup_by_key(|(txid, _, _)| *txid);
        txids
    }
}

impl<
        ChannelSigner: EcdsaChannelSigner,
        C: Deref,
        T: Deref,
        F: Deref,
        L: Deref,
        P: Deref,
        ES: Deref,
    > chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
    C::Target: chain::Filter,
    T::Target: BroadcasterInterface,
    F::Target: FeeEstimator,
    L::Target: Logger,
    P::Target: Persist<ChannelSigner>,
    ES::Target: EntropySource,
{
    fn watch_channel(
        &self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
    ) -> Result<ChannelMonitorUpdateStatus, ()> {
        let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
        let mut monitors = self.monitors.write().unwrap();
        let entry = match monitors.entry(channel_id) {
            hash_map::Entry::Occupied(_) => {
                log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
                return Err(());
            },
            hash_map::Entry::Vacant(e) => e,
        };
        log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
        let update_id = monitor.get_latest_update_id();
        let mut pending_monitor_updates = Vec::new();
        let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
        match persist_res {
            ChannelMonitorUpdateStatus::InProgress => {
                log_info!(
                    logger,
                    "Persistence of new ChannelMonitor for channel {} in progress",
                    log_funding_info!(monitor)
                );
                pending_monitor_updates.push(update_id);
            },
            ChannelMonitorUpdateStatus::Completed => {
                log_info!(
                    logger,
                    "Persistence of new ChannelMonitor for channel {} completed",
                    log_funding_info!(monitor)
                );
            },
            ChannelMonitorUpdateStatus::UnrecoverableError => {
                let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
                log_error!(logger, "{}", err_str);
                panic!("{}", err_str);
            },
        }
        if let Some(ref chain_source) = self.chain_source {
            monitor.load_outputs_to_watch(chain_source, &self.logger);
        }
        entry.insert(MonitorHolder {
            monitor,
            pending_monitor_updates: Mutex::new(pending_monitor_updates),
        });
        Ok(persist_res)
    }

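    /// Applies `update` to the channel's monitor, then persists either the update alone or,
    /// if applying it failed, the entire monitor. Note that when applying the update itself
    /// fails, [`ChannelMonitorUpdateStatus::InProgress`] is returned regardless of the
    /// persistence outcome, as the monitor's state has still changed and must reach disk.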
    fn update_channel(
        &self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
    ) -> ChannelMonitorUpdateStatus {
        debug_assert_eq!(update.channel_id.unwrap(), channel_id);
        let monitors = self.monitors.read().unwrap();
        match monitors.get(&channel_id) {
            None => {
                let logger = WithContext::from(&self.logger, None, Some(channel_id), None);
                log_error!(logger, "Failed to update channel monitor: no such monitor registered");

                #[cfg(debug_assertions)]
                panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
                #[cfg(not(debug_assertions))]
                ChannelMonitorUpdateStatus::InProgress
            },
            Some(monitor_state) => {
                let monitor = &monitor_state.monitor;
                let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
                log_trace!(
                    logger,
                    "Updating ChannelMonitor to id {} for channel {}",
                    update.update_id,
                    log_funding_info!(monitor)
                );

                let mut pending_monitor_updates =
                    monitor_state.pending_monitor_updates.lock().unwrap();
                let update_res = monitor.update_monitor(
                    update,
                    &self.broadcaster,
                    &self.fee_estimator,
                    &self.logger,
                );

                let update_id = update.update_id;
                let persist_res = if update_res.is_err() {
                    log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
                    self.persister.update_persisted_channel(
                        monitor.persistence_key(),
                        None,
                        monitor,
                    )
                } else {
                    self.persister.update_persisted_channel(
                        monitor.persistence_key(),
                        Some(update),
                        monitor,
                    )
                };
                match persist_res {
                    ChannelMonitorUpdateStatus::InProgress => {
                        pending_monitor_updates.push(update_id);
                        log_debug!(
                            logger,
                            "Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress",
                            update_id,
                            log_funding_info!(monitor)
                        );
                    },
                    ChannelMonitorUpdateStatus::Completed => {
                        log_debug!(
                            logger,
                            "Persistence of ChannelMonitorUpdate id {:?} for channel {} completed",
                            update_id,
                            log_funding_info!(monitor)
                        );
                    },
                    ChannelMonitorUpdateStatus::UnrecoverableError => {
                        core::mem::drop(pending_monitor_updates);
                        core::mem::drop(monitors);
                        let _poison = self.monitors.write().unwrap();
                        let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
                        log_error!(logger, "{}", err_str);
                        panic!("{}", err_str);
                    },
                }

                if let Some(ref chain_source) = self.chain_source {
                    for (funding_outpoint, funding_script) in
                        update.internal_renegotiated_funding_data()
                    {
                        log_trace!(
                            logger,
                            "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends",
                            funding_outpoint
                        );
                        chain_source.register_tx(&funding_outpoint.txid, &funding_script);
                        chain_source.register_output(WatchedOutput {
                            block_hash: None,
                            outpoint: funding_outpoint,
                            script_pubkey: funding_script,
                        });
                    }
                }

                if update_res.is_err() {
                    ChannelMonitorUpdateStatus::InProgress
                } else {
                    persist_res
                }
            },
        }
    }

    fn release_pending_monitor_events(
        &self,
    ) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
        for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
            let _ = self.channel_monitor_updated(channel_id, update_id);
        }
        let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
        for monitor_state in self.monitors.read().unwrap().values() {
            let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
            if monitor_events.len() > 0 {
                let monitor_funding_txo = monitor_state.monitor.get_funding_txo();
                let monitor_channel_id = monitor_state.monitor.channel_id();
                let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
                pending_monitor_events.push((
                    monitor_funding_txo,
                    monitor_channel_id,
                    monitor_events,
                    counterparty_node_id,
                ));
            }
        }
        pending_monitor_events
    }
}

impl<
        ChannelSigner: EcdsaChannelSigner,
        C: Deref,
        T: Deref,
        F: Deref,
        L: Deref,
        P: Deref,
        ES: Deref,
    > events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
    C::Target: chain::Filter,
    T::Target: BroadcasterInterface,
    F::Target: FeeEstimator,
    L::Target: Logger,
    P::Target: Persist<ChannelSigner>,
    ES::Target: EntropySource,
{
    fn process_pending_events<H: Deref>(&self, handler: H)
    where
        H::Target: EventHandler,
    {
        for monitor_state in self.monitors.read().unwrap().values() {
            match monitor_state.monitor.process_pending_events(&handler, &self.logger) {
                Ok(()) => {},
                Err(ReplayEvent()) => {
                    self.event_notifier.notify();
                },
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::chain::channelmonitor::ANTI_REORG_DELAY;
    use crate::chain::{ChannelMonitorUpdateStatus, Watch};
    use crate::events::{ClosureReason, Event};
    use crate::ln::functional_test_utils::*;
    use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, MessageSendEvent};
    use crate::{check_added_monitors, check_closed_event};
    use crate::{expect_payment_path_successful, get_event_msg};
    use crate::{get_htlc_update_msgs, get_revoke_commit_msgs};

    const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5;

    #[test]
    fn test_async_ooo_offchain_updates() {
        let chanmon_cfgs = create_chanmon_cfgs(2);
        let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
        let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
        let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
        let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;

        let node_a_id = nodes[0].node.get_our_node_id();
        let node_b_id = nodes[1].node.get_our_node_id();

        let (payment_preimage_1, payment_hash_1, ..) =
            route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
        let (payment_preimage_2, payment_hash_2, ..) =
            route_payment(&nodes[0], &[&nodes[1]], 1_000_000);

        chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
        chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
        chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);

        nodes[1].node.claim_funds(payment_preimage_1);
        check_added_monitors!(nodes[1], 1);
        nodes[1].node.claim_funds(payment_preimage_2);
        check_added_monitors!(nodes[1], 1);

        let persistences =
            chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
        assert_eq!(persistences.len(), 1);
        let (_, updates) = persistences.iter().next().unwrap();
        assert_eq!(updates.len(), 2);

        let mut update_iter = updates.iter();
        let next_update = update_iter.next().unwrap().clone();
        let node_b_mon = &nodes[1].chain_monitor.chain_monitor;

        let pending_updates = node_b_mon.list_pending_monitor_updates();
        #[cfg(not(c_bindings))]
        let pending_chan_updates = pending_updates.get(&channel_id).unwrap();
        #[cfg(c_bindings)]
        let pending_chan_updates =
            &pending_updates.iter().find(|(chan_id, _)| *chan_id == channel_id).unwrap().1;
        assert!(pending_chan_updates.contains(&next_update));

        node_b_mon.channel_monitor_updated(channel_id, next_update.clone()).unwrap();

        let pending_updates = node_b_mon.list_pending_monitor_updates();
        #[cfg(not(c_bindings))]
        let pending_chan_updates = pending_updates.get(&channel_id).unwrap();
        #[cfg(c_bindings)]
        let pending_chan_updates =
            &pending_updates.iter().find(|(chan_id, _)| *chan_id == channel_id).unwrap().1;
        assert!(!pending_chan_updates.contains(&next_update));

        assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
        assert!(nodes[1].node.get_and_clear_pending_events().is_empty());

        let next_update = update_iter.next().unwrap().clone();
        node_b_mon.channel_monitor_updated(channel_id, next_update).unwrap();

        let claim_events = nodes[1].node.get_and_clear_pending_events();
        assert_eq!(claim_events.len(), 2);
        match claim_events[0] {
            Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
                assert_eq!(payment_hash_1, *payment_hash);
            },
            _ => panic!("Unexpected event"),
        }
        match claim_events[1] {
            Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
                assert_eq!(payment_hash_2, *payment_hash);
            },
            _ => panic!("Unexpected event"),
        }

        let mut updates = get_htlc_update_msgs!(nodes[1], node_a_id);
        nodes[0].node.handle_update_fulfill_htlc(node_b_id, updates.update_fulfill_htlcs.remove(0));
        expect_payment_sent(&nodes[0], payment_preimage_1, None, false, false);
        nodes[0].node.handle_commitment_signed_batch_test(node_b_id, &updates.commitment_signed);
        check_added_monitors!(nodes[0], 1);
        let (as_first_raa, as_first_update) = get_revoke_commit_msgs!(nodes[0], node_b_id);

        nodes[1].node.handle_revoke_and_ack(node_a_id, &as_first_raa);
        check_added_monitors!(nodes[1], 1);
        let mut bs_2nd_updates = get_htlc_update_msgs!(nodes[1], node_a_id);
        nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_first_update);
        check_added_monitors!(nodes[1], 1);
        let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id);

        nodes[0]
            .node
            .handle_update_fulfill_htlc(node_b_id, bs_2nd_updates.update_fulfill_htlcs.remove(0));
        expect_payment_sent(&nodes[0], payment_preimage_2, None, false, false);
        nodes[0]
            .node
            .handle_commitment_signed_batch_test(node_b_id, &bs_2nd_updates.commitment_signed);
        check_added_monitors!(nodes[0], 1);
        nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_first_raa);
        expect_payment_path_successful!(nodes[0]);
        check_added_monitors!(nodes[0], 1);
        let (as_second_raa, as_second_update) = get_revoke_commit_msgs!(nodes[0], node_b_id);

        nodes[1].node.handle_revoke_and_ack(node_a_id, &as_second_raa);
        check_added_monitors!(nodes[1], 1);
        nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_second_update);
        check_added_monitors!(nodes[1], 1);
        let bs_second_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id);

        nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_second_raa);
        expect_payment_path_successful!(nodes[0]);
        check_added_monitors!(nodes[0], 1);
    }

    #[test]
    fn test_chainsync_triggers_distributed_monitor_persistence() {
        let chanmon_cfgs = create_chanmon_cfgs(3);
        let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
        let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
        let nodes = create_network(3, &node_cfgs, &node_chanmgrs);

        let node_a_id = nodes[0].node.get_our_node_id();
        let node_c_id = nodes[2].node.get_our_node_id();

        *nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
        *nodes[1].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
        *nodes[2].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;

        let _channel_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2;
        let channel_2 =
            create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 1_000_000, 0).2;

        chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
        chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
        chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

        connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
        connect_blocks(&nodes[1], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
        connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
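        // With 2 * CHAINSYNC_MONITOR_PARTITION_FACTOR blocks connected, each monitor lands on
        // exactly two partition-matching heights and is persisted twice. Node 0 watches two
        // channels (4 persists); nodes 1 and 2 watch one each (2 persists).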
        assert_eq!(
            2 * 2,
            chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len()
        );
        assert_eq!(
            2,
            chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().len()
        );
        assert_eq!(
            2,
            chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len()
        );

        let message = "Channel force-closed".to_owned();
        nodes[0]
            .node
            .force_close_broadcasting_latest_txn(&channel_2, &node_c_id, message.clone())
            .unwrap();
        let closure_reason =
            ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message };
        check_closed_event!(&nodes[0], 1, closure_reason, false, [node_c_id], 1000000);
        check_closed_broadcast(&nodes[0], 1, true);
        let close_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
        assert_eq!(close_tx.len(), 1);

        mine_transaction(&nodes[2], &close_tx[0]);
        check_closed_broadcast(&nodes[2], 1, true);
        check_added_monitors(&nodes[2], 1);
        let closure_reason = ClosureReason::CommitmentTxConfirmed;
        check_closed_event!(&nodes[2], 1, closure_reason, false, [node_a_id], 1000000);

        chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
        chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

        connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
        connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR);

        assert_eq!(
            (CHAINSYNC_MONITOR_PARTITION_FACTOR + 1) as usize,
            chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len()
        );
        assert_eq!(
            1,
            chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len()
        );

        mine_transaction(&nodes[0], &close_tx[0]);
        connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
        check_added_monitors(&nodes[0], 1);
        chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

        connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
        assert_eq!(
            2,
            chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len()
        );
    }

    #[test]
    #[cfg(feature = "std")]
    fn update_during_chainsync_poisons_channel() {
        let chanmon_cfgs = create_chanmon_cfgs(2);
        let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
        let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
        let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
        create_announced_chan_between_nodes(&nodes, 0, 1);
        *nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;

        chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::UnrecoverableError);

        assert!(std::panic::catch_unwind(|| {
            // Connecting blocks should panic once persistence returns UnrecoverableError.
            connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
        })
        .is_err());
        assert!(std::panic::catch_unwind(|| {
            // Dropping the nodes should also panic, as the monitors RwLock is now poisoned.
            core::mem::drop(nodes);
        })
        .is_err());
    }
}