use bitcoin::{BlockHash, Txid};

use core::cmp;
use core::ops::Deref;
use core::str::FromStr;

use crate::prelude::*;
use crate::{io, log_error};

use crate::chain;
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use crate::chain::chainmonitor::Persist;
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
use crate::chain::transaction::OutPoint;
use crate::ln::channelmanager::AChannelManager;
use crate::routing::gossip::NetworkGraph;
use crate::routing::scoring::WriteableScore;
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
use crate::util::logger::Logger;
use crate::util::ser::{Readable, ReadableArgs, Writeable};

/// The alphabet of characters allowed for namespaces and keys.
pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str =
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";

/// The maximum number of characters namespaces and keys may have.
pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120;

/// The primary namespace under which the channel manager will be persisted.
pub const CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the channel manager will be persisted.
pub const CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The key under which the channel manager will be persisted.
pub const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";

/// The primary namespace under which `ChannelMonitor`s will be persisted.
pub const CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitors";
/// The secondary namespace under which `ChannelMonitor`s will be persisted.
pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The primary namespace under which `ChannelMonitorUpdate`s will be persisted.
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";

/// The primary namespace under which archived `ChannelMonitor`s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
/// The secondary namespace under which archived `ChannelMonitor`s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// The primary namespace under which the `NetworkGraph` will be persisted.
pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the `NetworkGraph` will be persisted.
pub const NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The key under which the `NetworkGraph` will be persisted.
pub const NETWORK_GRAPH_PERSISTENCE_KEY: &str = "network_graph";

/// The primary namespace under which the scorer will be persisted.
pub const SCORER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the scorer will be persisted.
pub const SCORER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The key under which the scorer will be persisted.
pub const SCORER_PERSISTENCE_KEY: &str = "scorer";

/// The primary namespace under which the output sweeper state will be persisted.
pub const OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the output sweeper state will be persisted.
pub const OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The key under which the output sweeper state will be persisted.
pub const OUTPUT_SWEEPER_PERSISTENCE_KEY: &str = "output_sweeper";

/// A sentinel value prepended to each full `ChannelMonitor` written by the
/// `MonitorUpdatingPersister`. Its presence marks the blob as one that may have pending
/// `ChannelMonitorUpdate`s stored alongside it, preventing it from being accidentally
/// deserialized by a reader that would not apply those updates.
pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2];

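/// Provides an interface that allows storage and retrieval of persisted values that are
/// associated with given keys.
///
/// In order to avoid collisions, the key space is segmented by the given `primary_namespace`s
/// and `secondary_namespace`s. Namespaces and keys may only contain characters from
/// `KVSTORE_NAMESPACE_KEY_ALPHABET` and be at most `KVSTORE_NAMESPACE_KEY_MAX_LEN` long.
///
/// A minimal in-memory sketch, assuming a hypothetical `MemoryStore` type (a real
/// implementation would write to durable storage before returning):
///
/// ```ignore
/// use std::collections::HashMap;
/// use std::sync::Mutex;
///
/// #[derive(Default)]
/// struct MemoryStore {
///     // Entries keyed by (primary_namespace, secondary_namespace, key).
///     entries: Mutex<HashMap<(String, String, String), Vec<u8>>>,
/// }
///
/// impl KVStore for MemoryStore {
///     fn read(&self, pns: &str, sns: &str, key: &str) -> Result<Vec<u8>, io::Error> {
///         self.entries
///             .lock()
///             .unwrap()
///             .get(&(pns.to_string(), sns.to_string(), key.to_string()))
///             .cloned()
///             .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found"))
///     }
///     fn write(&self, pns: &str, sns: &str, key: &str, buf: &[u8]) -> Result<(), io::Error> {
///         self.entries
///             .lock()
///             .unwrap()
///             .insert((pns.to_string(), sns.to_string(), key.to_string()), buf.to_vec());
///         Ok(())
///     }
///     fn remove(&self, pns: &str, sns: &str, key: &str, _lazy: bool) -> Result<(), io::Error> {
///         self.entries
///             .lock()
///             .unwrap()
///             .remove(&(pns.to_string(), sns.to_string(), key.to_string()));
///         Ok(())
///     }
///     fn list(&self, pns: &str, sns: &str) -> Result<Vec<String>, io::Error> {
///         let entries = self.entries.lock().unwrap();
///         Ok(entries
///             .keys()
///             .filter(|(p, s, _)| p.as_str() == pns && s.as_str() == sns)
///             .map(|(_, _, k)| k.clone())
///             .collect())
///     }
/// }
/// ```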
pub trait KVStore {
    /// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and
    /// `key`, or an `io::ErrorKind::NotFound` error if the key is unknown.
    fn read(
        &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
    ) -> Result<Vec<u8>, io::Error>;
    /// Persists the given data under the given `key`, overwriting any previously stored value.
    fn write(
        &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
    ) -> Result<(), io::Error>;
    /// Removes any data that had previously been persisted under the given `key`. If `lazy` is
    /// set, the store may defer the actual deletion, in which case the entry may remain visible
    /// (and could even reappear after a crash) until it is eventually removed.
    fn remove(
        &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
    ) -> Result<(), io::Error>;
    /// Returns a list of keys that are stored under the given `secondary_namespace` in
    /// `primary_namespace`.
    fn list(
        &self, primary_namespace: &str, secondary_namespace: &str,
    ) -> Result<Vec<String>, io::Error>;
}

/// An extension of `KVStore` for stores whose entire contents can be enumerated, allowing all
/// data to be migrated from one store to another via `migrate_kv_store_data`.
pub trait MigratableKVStore: KVStore {
    /// Returns *all* known keys as a list of `(primary_namespace, secondary_namespace, key)`
    /// tuples.
    fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, io::Error>;
}

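/// Migrates all data from one `MigratableKVStore` to another, copying every entry from
/// `source_store` into `target_store`. Note this is not atomic: if it fails part-way through,
/// the target may hold a partial copy, so it is best run while both stores are otherwise idle.
///
/// A usage sketch, assuming hypothetical `OldStore` and `NewStore` types that both implement
/// `MigratableKVStore`:
///
/// ```ignore
/// let mut source = OldStore::open("old-data")?;
/// let mut target = NewStore::open("new-data")?;
/// migrate_kv_store_data(&mut source, &mut target)?;
/// // `target` now holds a copy of every (namespace, key, value) entry of `source`.
/// ```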
pub fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
    source_store: &mut S, target_store: &mut T,
) -> Result<(), io::Error> {
    let keys_to_migrate = source_store.list_all_keys()?;

    for (primary_namespace, secondary_namespace, key) in &keys_to_migrate {
        let data = source_store.read(primary_namespace, secondary_namespace, key)?;
        target_store.write(primary_namespace, secondary_namespace, key, &data)?;
    }

    Ok(())
}

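/// Trait that handles persisting a `ChannelManager`, `NetworkGraph`, and `WriteableScore`. A
/// blanket implementation for any `KVStore` follows, using the namespace and key constants
/// defined above.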
pub trait Persister<'a, CM: Deref, L: Deref, S: Deref>
where
    CM::Target: 'static + AChannelManager,
    L::Target: 'static + Logger,
    S::Target: WriteableScore<'a>,
{
    /// Persists the given `ChannelManager`.
    fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error>;

    /// Persists the given `NetworkGraph`.
    fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error>;

    /// Persists the given `WriteableScore`.
    fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
}

impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref> Persister<'a, CM, L, S> for A
where
    CM::Target: 'static + AChannelManager,
    L::Target: 'static + Logger,
    S::Target: WriteableScore<'a>,
{
    fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> {
        self.write(
            CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
            CHANNEL_MANAGER_PERSISTENCE_KEY,
            &channel_manager.get_cm().encode(),
        )
    }

    fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
        self.write(
            NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
            NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
            NETWORK_GRAPH_PERSISTENCE_KEY,
            &network_graph.encode(),
        )
    }

    fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
        self.write(
            SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
            SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
            SCORER_PERSISTENCE_KEY,
            &scorer.encode(),
        )
    }
}

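/// A blanket `Persist` implementation for any `KVStore`: every call writes the full
/// `ChannelMonitor` under the key `<funding_txid>_<funding_output_index>` in the `monitors`
/// namespace. A write failure is reported as `UnrecoverableError`, since continuing with an
/// unpersisted monitor would be unsafe.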
impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSigner> for K {
    fn persist_new_channel(
        &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
    ) -> chain::ChannelMonitorUpdateStatus {
        let key = format!("{}_{}", funding_txo.txid, funding_txo.index);
        match self.write(
            CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
            &key,
            &monitor.encode(),
        ) {
            Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
            Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
        }
    }

    fn update_persisted_channel(
        &self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>,
        monitor: &ChannelMonitor<ChannelSigner>,
    ) -> chain::ChannelMonitorUpdateStatus {
        let key = format!("{}_{}", funding_txo.txid, funding_txo.index);
        match self.write(
            CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
            &key,
            &monitor.encode(),
        ) {
            Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
            Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
        }
    }

    fn archive_persisted_channel(&self, funding_txo: OutPoint) {
        let monitor_name = MonitorName::from(funding_txo);
        let monitor = match self.read(
            CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
            monitor_name.as_str(),
        ) {
            Ok(monitor) => monitor,
            Err(_) => return,
        };
        match self.write(
            ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
            ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
            monitor_name.as_str(),
            &monitor,
        ) {
            Ok(()) => {},
            Err(_e) => return,
        };
        let _ = self.remove(
            CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
            monitor_name.as_str(),
            true,
        );
    }
}

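/// Reads all `ChannelMonitor`s stored by the blanket `Persist` implementation above, verifying
/// that each monitor was stored under the key matching its funding outpoint.
///
/// A startup usage sketch, assuming hypothetical `store` and `keys_manager` values (the keys
/// manager commonly serves as both `EntropySource` and `SignerProvider`):
///
/// ```ignore
/// let monitors = read_channel_monitors(&store, &keys_manager, &keys_manager)?;
/// for (best_block_hash, monitor) in &monitors {
///     // Hand each monitor to the ChainMonitor / ChannelManager on startup.
/// }
/// ```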
pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
    kv_store: K, entropy_source: ES, signer_provider: SP,
) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
where
    K::Target: KVStore,
    ES::Target: EntropySource + Sized,
    SP::Target: SignerProvider + Sized,
{
    let mut res = Vec::new();

    for stored_key in kv_store.list(
        CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
        CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
    )? {
        // Stored keys have the form `<64-hex-char txid>_<funding output index>`, so anything
        // shorter than 66 characters cannot be valid.
        if stored_key.len() < 66 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Stored key has invalid length",
            ));
        }

        let txid = Txid::from_str(stored_key.split_at(64).0).map_err(|_| {
            io::Error::new(io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
        })?;

        let index: u16 = stored_key.split_at(65).1.parse().map_err(|_| {
            io::Error::new(io::ErrorKind::InvalidData, "Invalid tx index in stored key")
        })?;

        match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>::read(
            &mut io::Cursor::new(kv_store.read(
                CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
                CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
                &stored_key,
            )?),
            (&*entropy_source, &*signer_provider),
        ) {
            Ok((block_hash, channel_monitor)) => {
                if channel_monitor.get_funding_txo().0.txid != txid
                    || channel_monitor.get_funding_txo().0.index != index
                {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "ChannelMonitor was stored under the wrong key",
                    ));
                }
                res.push((block_hash, channel_monitor));
            },
            Err(_) => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Failed to read ChannelMonitor",
                ))
            },
        }
    }
    Ok(res)
}

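/// A `Persist` implementation that persists each full `ChannelMonitor` only periodically and
/// otherwise writes individual `ChannelMonitorUpdate`s, making steady-state persistence calls
/// much cheaper at the cost of a slightly more involved startup read.
///
/// Full monitors are stored under `monitors/<funding_txid>_<funding_output_index>`, and updates
/// under `monitor_updates/<funding_txid>_<funding_output_index>/<update_id>`. Whenever an
/// update's `update_id` is a multiple of `maximum_pending_updates`, the full monitor is
/// re-persisted instead and the updates it subsumes are cleaned up (lazily). For example, with
/// `maximum_pending_updates = 5`, updates 1 through 4 are written individually, while update 5
/// triggers a full monitor write followed by removal of the stored updates up to id 5.
///
/// Because full monitors are written with a sentinel prefix, data persisted this way must be
/// read back via `read_all_channel_monitors_with_updates` or
/// `read_channel_monitor_with_updates`, not via the plain `read_channel_monitors` above.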
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
where
    K::Target: KVStore,
    L::Target: Logger,
    ES::Target: EntropySource + Sized,
    SP::Target: SignerProvider + Sized,
    BI::Target: BroadcasterInterface,
    FE::Target: FeeEstimator,
{
    kv_store: K,
    logger: L,
    maximum_pending_updates: u64,
    entropy_source: ES,
    signer_provider: SP,
    broadcaster: BI,
    fee_estimator: FE,
}

#[allow(dead_code)]
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
    MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
    K::Target: KVStore,
    L::Target: Logger,
    ES::Target: EntropySource + Sized,
    SP::Target: SignerProvider + Sized,
    BI::Target: BroadcasterInterface,
    FE::Target: FeeEstimator,
{
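    /// Constructs a new `MonitorUpdatingPersister`.
    ///
    /// `maximum_pending_updates` controls how many updates may be written between full monitor
    /// persists: higher values mean cheaper steady-state writes but more update reads (and more
    /// stored update entries) before compaction. Reducing it between runs may leave stale
    /// updates behind, which `cleanup_stale_updates` can remove.
    ///
    /// A construction sketch, assuming hypothetical `store`, `logger`, `keys_manager`,
    /// `broadcaster`, and `fee_estimator` values of suitable types:
    ///
    /// ```ignore
    /// let persister = MonitorUpdatingPersister::new(
    ///     &store,
    ///     &logger,
    ///     100, // maximum_pending_updates
    ///     &keys_manager, // as EntropySource
    ///     &keys_manager, // as SignerProvider
    ///     &broadcaster,
    ///     &fee_estimator,
    /// );
    /// ```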
    pub fn new(
        kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
        signer_provider: SP, broadcaster: BI, fee_estimator: FE,
    ) -> Self {
        MonitorUpdatingPersister {
            kv_store,
            logger,
            maximum_pending_updates,
            entropy_source,
            signer_provider,
            broadcaster,
            fee_estimator,
        }
    }

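    /// Reads all stored channel monitors, along with any stored updates for them, applying the
    /// updates before returning each monitor.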
    pub fn read_all_channel_monitors_with_updates(
        &self,
    ) -> Result<
        Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
        io::Error,
    > {
        let monitor_list = self.kv_store.list(
            CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
        )?;
        let mut res = Vec::with_capacity(monitor_list.len());
        for monitor_key in monitor_list {
            res.push(self.read_channel_monitor_with_updates(monitor_key)?)
        }
        Ok(res)
    }

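    /// Reads a single channel monitor along with any of its stored updates that are newer than
    /// the monitor itself, applying them in order before returning.
    ///
    /// `monitor_key` must have the form `<funding_txid>_<funding_output_index>`, i.e. the
    /// string produced by `MonitorName::as_str`. A usage sketch, assuming a `persister` and a
    /// channel's `funding_txo`:
    ///
    /// ```ignore
    /// let monitor_key = MonitorName::from(funding_txo).as_str().to_string();
    /// let (block_hash, monitor) = persister.read_channel_monitor_with_updates(monitor_key)?;
    /// ```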
    pub fn read_channel_monitor_with_updates(
        &self, monitor_key: String,
    ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
    {
        let monitor_name = MonitorName::new(monitor_key)?;
        let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
        let mut current_update_id = monitor.get_latest_update_id();
        loop {
            current_update_id = match current_update_id.checked_add(1) {
                Some(next_update_id) => next_update_id,
                None => break,
            };
            let update_name = UpdateName::from(current_update_id);
            let update = match self.read_monitor_update(&monitor_name, &update_name) {
                Ok(update) => update,
                Err(err) if err.kind() == io::ErrorKind::NotFound => {
                    // We can't find any more updates, so we are done.
                    break;
                },
                Err(err) => return Err(err),
            };

            monitor
                .update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
                .map_err(|e| {
                    log_error!(
                        self.logger,
                        "Monitor update failed. monitor: {} update: {} reason: {:?}",
                        monitor_name.as_str(),
                        update_name.as_str(),
                        e
                    );
                    io::Error::new(io::ErrorKind::Other, "Monitor update failed")
                })?;
        }
        Ok((block_hash, monitor))
    }

    /// Reads a stored channel monitor, verifying it was persisted under the expected name.
    fn read_monitor(
        &self, monitor_name: &MonitorName,
    ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
    {
        let outpoint: OutPoint = monitor_name.try_into()?;
        let mut monitor_cursor = io::Cursor::new(self.kv_store.read(
            CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
            monitor_name.as_str(),
        )?);
        // Discard the sentinel bytes if found.
        if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) {
            monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64);
        }
        match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>::read(
            &mut monitor_cursor,
            (&*self.entropy_source, &*self.signer_provider),
        ) {
            Ok((blockhash, channel_monitor)) => {
                if channel_monitor.get_funding_txo().0.txid != outpoint.txid
                    || channel_monitor.get_funding_txo().0.index != outpoint.index
                {
                    log_error!(
                        self.logger,
                        "ChannelMonitor {} was stored under the wrong key!",
                        monitor_name.as_str()
                    );
                    Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "ChannelMonitor was stored under the wrong key",
                    ))
                } else {
                    Ok((blockhash, channel_monitor))
                }
            },
            Err(e) => {
                log_error!(
                    self.logger,
                    "Failed to read ChannelMonitor {}, reason: {}",
                    monitor_name.as_str(),
                    e,
                );
                Err(io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitor"))
            },
        }
    }

    /// Reads a stored channel monitor update.
    fn read_monitor_update(
        &self, monitor_name: &MonitorName, update_name: &UpdateName,
    ) -> Result<ChannelMonitorUpdate, io::Error> {
        let update_bytes = self.kv_store.read(
            CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
            monitor_name.as_str(),
            update_name.as_str(),
        )?;
        ChannelMonitorUpdate::read(&mut io::Cursor::new(update_bytes)).map_err(|e| {
            log_error!(
                self.logger,
                "Failed to read ChannelMonitorUpdate {}/{}/{}, reason: {}",
                CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                monitor_name.as_str(),
                update_name.as_str(),
                e,
            );
            io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitorUpdate")
        })
    }

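    /// Removes stored updates whose `update_id` is at or below the latest update id of the
    /// corresponding persisted monitor, i.e. updates the monitor already incorporates. This is
    /// mainly useful after a crash, when updates that were only removed lazily may still be
    /// present. The `lazy` flag is passed through to `KVStore::remove`. For example:
    ///
    /// ```ignore
    /// persister.cleanup_stale_updates(true)?; // lazy removal is fine for stale updates
    /// ```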
    pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
        let monitor_keys = self.kv_store.list(
            CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
        )?;
        for monitor_key in monitor_keys {
            let monitor_name = MonitorName::new(monitor_key)?;
            let (_, current_monitor) = self.read_monitor(&monitor_name)?;
            let updates = self.kv_store.list(
                CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                monitor_name.as_str(),
            )?;
            for update in updates {
                let update_name = UpdateName::new(update)?;
                // The persisted monitor is at least as new as this update, so it is stale.
                if update_name.0 <= current_monitor.get_latest_update_id() {
                    self.kv_store.remove(
                        CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                        monitor_name.as_str(),
                        update_name.as_str(),
                        lazy,
                    )?;
                }
            }
        }
        Ok(())
    }
}

impl<
        ChannelSigner: EcdsaChannelSigner,
        K: Deref,
        L: Deref,
        ES: Deref,
        SP: Deref,
        BI: Deref,
        FE: Deref,
    > Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
    K::Target: KVStore,
    L::Target: Logger,
    ES::Target: EntropySource + Sized,
    SP::Target: SignerProvider + Sized,
    BI::Target: BroadcasterInterface,
    FE::Target: FeeEstimator,
{
    /// Persists a new channel by writing the entire monitor, prepended with the sentinel bytes,
    /// to the `monitors` namespace.
    fn persist_new_channel(
        &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
    ) -> chain::ChannelMonitorUpdateStatus {
        let monitor_name = MonitorName::from(funding_txo);
        // Serialize the monitor with the sentinel prepended so plain readers reject it.
        let mut monitor_bytes = Vec::with_capacity(
            MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
        );
        monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL);
        monitor.write(&mut monitor_bytes).unwrap();
        match self.kv_store.write(
            CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
            monitor_name.as_str(),
            &monitor_bytes,
        ) {
            Ok(_) => chain::ChannelMonitorUpdateStatus::Completed,
            Err(e) => {
                log_error!(
                    self.logger,
                    "Failed to write ChannelMonitor {}/{}/{} reason: {}",
                    CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
                    CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
                    monitor_name.as_str(),
                    e
                );
                chain::ChannelMonitorUpdateStatus::UnrecoverableError
            },
        }
    }

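    /// Persists an update to a channel monitor, choosing between a differential and a full
    /// write.
    ///
    /// If an update is provided and its `update_id` is neither the legacy closed-channel
    /// sentinel (`u64::MAX`) nor a multiple of `maximum_pending_updates`, only the
    /// `ChannelMonitorUpdate` itself is written. Otherwise the full monitor is re-persisted and
    /// the updates it subsumes are cleaned up. For instance, with `maximum_pending_updates = 3`,
    /// update ids 1 and 2 are written as individual updates, while id 3 triggers a full monitor
    /// write plus cleanup of ids up to 3.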
    fn update_persisted_channel(
        &self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>,
        monitor: &ChannelMonitor<ChannelSigner>,
    ) -> chain::ChannelMonitorUpdateStatus {
        const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;
        if let Some(update) = update {
            let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
                && update.update_id % self.maximum_pending_updates != 0;
            if persist_update {
                let monitor_name = MonitorName::from(funding_txo);
                let update_name = UpdateName::from(update.update_id);
                match self.kv_store.write(
                    CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                    monitor_name.as_str(),
                    update_name.as_str(),
                    &update.encode(),
                ) {
                    Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
                    Err(e) => {
                        log_error!(
                            self.logger,
                            "Failed to write ChannelMonitorUpdate {}/{}/{} reason: {}",
                            CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                            monitor_name.as_str(),
                            update_name.as_str(),
                            e
                        );
                        chain::ChannelMonitorUpdateStatus::UnrecoverableError
                    },
                }
            } else {
                let monitor_name = MonitorName::from(funding_txo);
                // In case of a legacy channel-close update, read the old monitor before
                // persisting the new one, so the cleanup range can be determined afterwards.
                let maybe_old_monitor = match monitor.get_latest_update_id() {
                    LEGACY_CLOSED_CHANNEL_UPDATE_ID => self.read_monitor(&monitor_name).ok(),
                    _ => None,
                };

                // This update meets the criteria of our design that call for a full monitor
                // write, rather than writing the update alone.
                let monitor_update_status = self.persist_new_channel(funding_txo, monitor);

                if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status {
                    let channel_closed_legacy =
                        monitor.get_latest_update_id() == LEGACY_CLOSED_CHANNEL_UPDATE_ID;
                    let cleanup_range = if channel_closed_legacy {
                        // If there is an old monitor, clean up any updates it may have left
                        // behind; otherwise there is nothing to clean up.
                        maybe_old_monitor.map(|(_, ref old_monitor)| {
                            let start = old_monitor.get_latest_update_id();
                            let end = cmp::min(
                                // We never persist an update with the legacy closed-channel
                                // update_id, so the range can stop just short of it.
                                start.saturating_add(self.maximum_pending_updates),
                                LEGACY_CLOSED_CHANNEL_UPDATE_ID - 1,
                            );
                            (start, end)
                        })
                    } else {
                        let end = monitor.get_latest_update_id();
                        let start = end.saturating_sub(self.maximum_pending_updates);
                        Some((start, end))
                    };

                    if let Some((start, end)) = cleanup_range {
                        self.cleanup_in_range(monitor_name, start, end);
                    }
                }

                monitor_update_status
            }
        } else {
            // There is no update given, so we must persist a new monitor.
            self.persist_new_channel(funding_txo, monitor)
        }
    }

    fn archive_persisted_channel(&self, funding_txo: OutPoint) {
        let monitor_name = MonitorName::from(funding_txo);
        let monitor_key = monitor_name.as_str().to_string();
        let monitor = match self.read_channel_monitor_with_updates(monitor_key) {
            Ok((_block_hash, monitor)) => monitor,
            Err(_) => return,
        };
        match self.kv_store.write(
            ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
            ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
            monitor_name.as_str(),
            &monitor.encode(),
        ) {
            Ok(()) => {},
            Err(_e) => return,
        };
        let _ = self.kv_store.remove(
            CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
            monitor_name.as_str(),
            true,
        );
    }
}

impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
    MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
    ES::Target: EntropySource + Sized,
    K::Target: KVStore,
    L::Target: Logger,
    SP::Target: SignerProvider + Sized,
    BI::Target: BroadcasterInterface,
    FE::Target: FeeEstimator,
{
    /// Lazily removes persisted updates with ids in `start..=end` for the given monitor,
    /// logging (but otherwise ignoring) any removal failures.
    fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
        for update_id in start..=end {
            let update_name = UpdateName::from(update_id);
            if let Err(e) = self.kv_store.remove(
                CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                monitor_name.as_str(),
                update_name.as_str(),
                true,
            ) {
                log_error!(
                    self.logger,
                    "Failed to clean up channel monitor updates for monitor {}, reason: {}",
                    monitor_name.as_str(),
                    e
                );
            };
        }
    }
}

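/// A name for a `ChannelMonitor`, derived from its funding outpoint and used as its storage
/// key, of the form `<funding_txid>_<funding_output_index>`.
///
/// For example:
///
/// ```ignore
/// let name = MonitorName::from(OutPoint { txid, index: 1 });
/// assert_eq!(name.as_str(), format!("{}_1", txid));
/// ```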
#[derive(Debug)]
pub struct MonitorName(String);

impl MonitorName {
    /// Constructs a `MonitorName`, validating that the string parses as an outpoint.
    pub fn new(name: String) -> Result<Self, io::Error> {
        MonitorName::do_try_into_outpoint(&name)?;
        Ok(Self(name))
    }

    /// Converts this monitor name to a str.
    pub fn as_str(&self) -> &str {
        &self.0
    }

    /// Attempts to form a valid `OutPoint` from the given name string.
    fn do_try_into_outpoint(name: &str) -> Result<OutPoint, io::Error> {
        let mut parts = name.splitn(2, '_');
        let txid = if let Some(part) = parts.next() {
            Txid::from_str(part).map_err(|_| {
                io::Error::new(io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
            })?
        } else {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Stored monitor key is not a splittable string",
            ));
        };
        let index = if let Some(part) = parts.next() {
            part.parse().map_err(|_| {
                io::Error::new(io::ErrorKind::InvalidData, "Invalid tx index in stored key")
            })?
        } else {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "No tx index value found after underscore in stored key",
            ));
        };
        Ok(OutPoint { txid, index })
    }
}

impl TryFrom<&MonitorName> for OutPoint {
    type Error = io::Error;

    fn try_from(value: &MonitorName) -> Result<Self, io::Error> {
        MonitorName::do_try_into_outpoint(&value.0)
    }
}

impl From<OutPoint> for MonitorName {
    fn from(value: OutPoint) -> Self {
        MonitorName(format!("{}_{}", value.txid, value.index))
    }
}

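/// A name for a `ChannelMonitorUpdate`, which is simply the decimal string form of its
/// `update_id`, with the parsed value cached alongside it.
///
/// For example:
///
/// ```ignore
/// assert_eq!(UpdateName::from(21).as_str(), "21");
/// assert_eq!(UpdateName::new("21".to_string()).unwrap().0, 21);
/// ```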
#[derive(Debug)]
pub struct UpdateName(pub u64, String);

impl UpdateName {
    /// Constructs an `UpdateName`, failing if the given string does not parse as a `u64`.
    pub fn new(name: String) -> Result<Self, io::Error> {
        match name.parse::<u64>() {
            Ok(u) => Ok(u.into()),
            Err(_) => {
                Err(io::Error::new(io::ErrorKind::InvalidData, "cannot parse u64 from update name"))
            },
        }
    }

    /// Converts this update name to a str.
    pub fn as_str(&self) -> &str {
        &self.1
    }
}

impl From<u64> for UpdateName {
    fn from(value: u64) -> Self {
        Self(value, value.to_string())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::chain::ChannelMonitorUpdateStatus;
    use crate::events::{ClosureReason, MessageSendEventsProvider};
    use crate::ln::functional_test_utils::*;
    use crate::sync::Arc;
    use crate::util::test_channel_signer::TestChannelSigner;
    use crate::util::test_utils::{self, TestLogger, TestStore};
    use crate::{check_added_monitors, check_closed_broadcast};

    const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;

    #[test]
    fn converts_u64_to_update_name() {
        assert_eq!(UpdateName::from(0).as_str(), "0");
        assert_eq!(UpdateName::from(21).as_str(), "21");
        assert_eq!(UpdateName::from(u64::MAX).as_str(), "18446744073709551615");
    }

    #[test]
    fn bad_update_name_fails() {
        assert!(UpdateName::new("deadbeef".to_string()).is_err());
        assert!(UpdateName::new("-1".to_string()).is_err());
    }

    #[test]
    fn monitor_from_outpoint_works() {
        let monitor_name1 = MonitorName::from(OutPoint {
            txid: Txid::from_str(
                "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
            )
            .unwrap(),
            index: 1,
        });
        assert_eq!(
            monitor_name1.as_str(),
            "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1"
        );

        let monitor_name2 = MonitorName::from(OutPoint {
            txid: Txid::from_str(
                "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef",
            )
            .unwrap(),
            index: u16::MAX,
        });
        assert_eq!(
            monitor_name2.as_str(),
            "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535"
        );
    }

    #[test]
    fn bad_monitor_string_fails() {
        assert!(MonitorName::new(
            "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string()
        )
        .is_err());
        assert!(MonitorName::new(
            "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536".to_string()
        )
        .is_err());
        assert!(MonitorName::new(
            "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21".to_string()
        )
        .is_err());
    }

    #[test]
    // Exercise the `MonitorUpdatingPersister` against real channel monitors.
    fn persister_with_real_monitors() {
        let persister_0_max_pending_updates = 7;
        // Intentionally set this to a smaller value, to test a different alignment.
        let persister_1_max_pending_updates = 3;
        let chanmon_cfgs = create_chanmon_cfgs(4);
        let persister_0 = MonitorUpdatingPersister {
            kv_store: &TestStore::new(false),
            logger: &TestLogger::new(),
            maximum_pending_updates: persister_0_max_pending_updates,
            entropy_source: &chanmon_cfgs[0].keys_manager,
            signer_provider: &chanmon_cfgs[0].keys_manager,
            broadcaster: &chanmon_cfgs[0].tx_broadcaster,
            fee_estimator: &chanmon_cfgs[0].fee_estimator,
        };
        let persister_1 = MonitorUpdatingPersister {
            kv_store: &TestStore::new(false),
            logger: &TestLogger::new(),
            maximum_pending_updates: persister_1_max_pending_updates,
            entropy_source: &chanmon_cfgs[1].keys_manager,
            signer_provider: &chanmon_cfgs[1].keys_manager,
            broadcaster: &chanmon_cfgs[1].tx_broadcaster,
            fee_estimator: &chanmon_cfgs[1].fee_estimator,
        };
        let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
        let chain_mon_0 = test_utils::TestChainMonitor::new(
            Some(&chanmon_cfgs[0].chain_source),
            &chanmon_cfgs[0].tx_broadcaster,
            &chanmon_cfgs[0].logger,
            &chanmon_cfgs[0].fee_estimator,
            &persister_0,
            &chanmon_cfgs[0].keys_manager,
        );
        let chain_mon_1 = test_utils::TestChainMonitor::new(
            Some(&chanmon_cfgs[1].chain_source),
            &chanmon_cfgs[1].tx_broadcaster,
            &chanmon_cfgs[1].logger,
            &chanmon_cfgs[1].fee_estimator,
            &persister_1,
            &chanmon_cfgs[1].keys_manager,
        );
        node_cfgs[0].chain_monitor = chain_mon_0;
        node_cfgs[1].chain_monitor = chain_mon_1;
        let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
        let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

        // Check that the persisted channel data is empty before any channels are open.
        let mut persisted_chan_data_0 =
            persister_0.read_all_channel_monitors_with_updates().unwrap();
        assert_eq!(persisted_chan_data_0.len(), 0);
        let mut persisted_chan_data_1 =
            persister_1.read_all_channel_monitors_with_updates().unwrap();
        assert_eq!(persisted_chan_data_1.len(), 0);

        // Helper to check that the monitors are at the expected update id and that the expected
        // number of pending updates is stored alongside them.
        macro_rules! check_persisted_data {
            ($expected_update_id: expr) => {
                persisted_chan_data_0 =
                    persister_0.read_all_channel_monitors_with_updates().unwrap();
                assert_eq!(persisted_chan_data_0.len(), 1);
                for (_, mon) in persisted_chan_data_0.iter() {
                    assert_eq!(mon.get_latest_update_id(), $expected_update_id);

                    let monitor_name = MonitorName::from(mon.get_funding_txo().0);
                    assert_eq!(
                        persister_0
                            .kv_store
                            .list(
                                CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                                monitor_name.as_str()
                            )
                            .unwrap()
                            .len() as u64,
                        mon.get_latest_update_id() % persister_0_max_pending_updates,
                        "Wrong number of updates stored in persister 0",
                    );
                }
                persisted_chan_data_1 =
                    persister_1.read_all_channel_monitors_with_updates().unwrap();
                assert_eq!(persisted_chan_data_1.len(), 1);
                for (_, mon) in persisted_chan_data_1.iter() {
                    assert_eq!(mon.get_latest_update_id(), $expected_update_id);
                    let monitor_name = MonitorName::from(mon.get_funding_txo().0);
                    assert_eq!(
                        persister_1
                            .kv_store
                            .list(
                                CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                                monitor_name.as_str()
                            )
                            .unwrap()
                            .len() as u64,
                        mon.get_latest_update_id() % persister_1_max_pending_updates,
                        "Wrong number of updates stored in persister 1",
                    );
                }
            };
        }

        // Create an initial channel and check that a monitor was persisted.
        let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
        check_persisted_data!(0);

        // Send a few payments and make sure the monitors are updated to the latest.
        send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
        check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
        send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
        check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);

        // Send a few more payments to try all the alignments of max pending updates with
        // updates for a payment sent and received.
        let mut sender = 0;
        for i in 3..=persister_0_max_pending_updates * 2 {
            let receiver;
            if sender == 0 {
                sender = 1;
                receiver = 0;
            } else {
                sender = 0;
                receiver = 1;
            }
            send_payment(&nodes[sender], &vec![&nodes[receiver]][..], 21_000);
            check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
        }

        // Force close, since cooperative close does not result in any persisted updates.
        let node_id_1 = nodes[1].node.get_our_node_id();
        let chan_id = nodes[0].node.list_channels()[0].channel_id;
        let err_msg = "Channel force-closed".to_string();
        nodes[0].node.force_close_broadcasting_latest_txn(&chan_id, &node_id_1, err_msg).unwrap();

        let reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) };
        check_closed_event(&nodes[0], 1, reason, false, &[node_id_1], 100000);
        check_closed_broadcast!(nodes[0], true);
        check_added_monitors!(nodes[0], 1);

        let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
        assert_eq!(node_txn.len(), 1);
        let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
        let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
        connect_block(&nodes[1], &dummy_block);

        check_closed_broadcast!(nodes[1], true);
        let reason = ClosureReason::CommitmentTxConfirmed;
        let node_id_0 = nodes[0].node.get_our_node_id();
        check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000);
        check_added_monitors!(nodes[1], 1);

        check_persisted_data!(
            // All the payment updates, plus one extra update persisted at force-close.
            persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1
        );
    }

    #[test]
    // Use a store that fails all writes and check that persisting either a new channel or an
    // update reports `UnrecoverableError`.
    fn unrecoverable_error_on_write_failure() {
        let chanmon_cfgs = create_chanmon_cfgs(2);
        let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
        let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
        let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
        let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
        let err_msg = "Channel force-closed".to_string();
        let node_id_0 = nodes[0].node.get_our_node_id();
        nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &node_id_0, err_msg).unwrap();
        let reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) };
        check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000);
        {
            let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
            let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
            let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0];
            let txid =
                Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be")
                    .unwrap();
            let test_txo = OutPoint { txid, index: 0 };

            let ro_persister = MonitorUpdatingPersister {
                kv_store: &TestStore::new(true),
                logger: &TestLogger::new(),
                maximum_pending_updates: 11,
                entropy_source: node_cfgs[0].keys_manager,
                signer_provider: node_cfgs[0].keys_manager,
                broadcaster: node_cfgs[0].tx_broadcaster,
                fee_estimator: node_cfgs[0].fee_estimator,
            };
            match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) {
                ChannelMonitorUpdateStatus::UnrecoverableError => {
                    // This is the correct result, since the store rejects all writes.
                },
                ChannelMonitorUpdateStatus::Completed => {
                    panic!("Completed persisting new channel when shouldn't have")
                },
                ChannelMonitorUpdateStatus::InProgress => {
                    panic!("Returned InProgress when shouldn't have")
                },
            }
            match ro_persister.update_persisted_channel(test_txo, Some(cmu), &added_monitors[0].1) {
                ChannelMonitorUpdateStatus::UnrecoverableError => {
                    // This is the correct result, since the store rejects all writes.
                },
                ChannelMonitorUpdateStatus::Completed => {
                    panic!("Completed persisting new channel when shouldn't have")
                },
                ChannelMonitorUpdateStatus::InProgress => {
                    panic!("Returned InProgress when shouldn't have")
                },
            }
            added_monitors.clear();
        }
        nodes[1].node.get_and_clear_pending_msg_events();
    }

    #[test]
    // Test that `cleanup_stale_updates` removes stale updates from the store.
    fn clean_stale_updates_works() {
        let test_max_pending_updates = 7;
        let chanmon_cfgs = create_chanmon_cfgs(3);
        let persister_0 = MonitorUpdatingPersister {
            kv_store: &TestStore::new(false),
            logger: &TestLogger::new(),
            maximum_pending_updates: test_max_pending_updates,
            entropy_source: &chanmon_cfgs[0].keys_manager,
            signer_provider: &chanmon_cfgs[0].keys_manager,
            broadcaster: &chanmon_cfgs[0].tx_broadcaster,
            fee_estimator: &chanmon_cfgs[0].fee_estimator,
        };
        let persister_1 = MonitorUpdatingPersister {
            kv_store: &TestStore::new(false),
            logger: &TestLogger::new(),
            maximum_pending_updates: test_max_pending_updates,
            entropy_source: &chanmon_cfgs[1].keys_manager,
            signer_provider: &chanmon_cfgs[1].keys_manager,
            broadcaster: &chanmon_cfgs[1].tx_broadcaster,
            fee_estimator: &chanmon_cfgs[1].fee_estimator,
        };
        let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
        let chain_mon_0 = test_utils::TestChainMonitor::new(
            Some(&chanmon_cfgs[0].chain_source),
            &chanmon_cfgs[0].tx_broadcaster,
            &chanmon_cfgs[0].logger,
            &chanmon_cfgs[0].fee_estimator,
            &persister_0,
            &chanmon_cfgs[0].keys_manager,
        );
        let chain_mon_1 = test_utils::TestChainMonitor::new(
            Some(&chanmon_cfgs[1].chain_source),
            &chanmon_cfgs[1].tx_broadcaster,
            &chanmon_cfgs[1].logger,
            &chanmon_cfgs[1].fee_estimator,
            &persister_1,
            &chanmon_cfgs[1].keys_manager,
        );
        node_cfgs[0].chain_monitor = chain_mon_0;
        node_cfgs[1].chain_monitor = chain_mon_1;
        let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
        let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

        // Check that the persisted channel data is empty before any channels are open.
        let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
        assert_eq!(persisted_chan_data.len(), 0);

        // Create a channel and send a couple of payments to produce monitor updates.
        let _ = create_announced_chan_between_nodes(&nodes, 0, 1);

        send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
        send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);

        // Plant a stale update by writing it directly to the store under an id the monitor has
        // already surpassed.
        let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
        let (_, monitor) = &persisted_chan_data[0];
        let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
        persister_0
            .kv_store
            .write(
                CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                monitor_name.as_str(),
                UpdateName::from(1).as_str(),
                &[0u8; 1],
            )
            .unwrap();

        // Do the stale update cleanup.
        persister_0.cleanup_stale_updates(false).unwrap();

        // Confirm the stale update is gone.
        assert!(persister_0
            .kv_store
            .read(
                CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                monitor_name.as_str(),
                UpdateName::from(1).as_str()
            )
            .is_err());
    }

    fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool
    where
        P::Target: Persist<ChannelSigner>,
    {
        true
    }

    #[test]
    fn kvstore_trait_object_usage() {
        let store: Arc<dyn KVStore + Send + Sync> = Arc::new(TestStore::new(false));
        assert!(persist_fn::<_, TestChannelSigner>(store.clone()));
    }
}