1use bitcoin::hashes::hmac::{Hmac, HmacEngine};
14use bitcoin::hashes::sha256::Hash as Sha256;
15use bitcoin::hashes::{Hash, HashEngine};
16use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
17
18use super::async_payments::{AsyncPaymentsMessage, AsyncPaymentsMessageHandler};
19use super::dns_resolution::{DNSResolverMessage, DNSResolverMessageHandler};
20use super::offers::{OffersMessage, OffersMessageHandler};
21use super::packet::OnionMessageContents;
22use super::packet::ParsedOnionMessageContents;
23use super::packet::{
24 ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, BIG_PACKET_HOP_DATA_LEN,
25 SMALL_PACKET_HOP_DATA_LEN,
26};
27use crate::blinded_path::message::{
28 AsyncPaymentsContext, BlindedMessagePath, DNSResolverContext, ForwardTlvs, MessageContext,
29 MessageForwardNode, NextMessageHop, OffersContext, ReceiveTlvs,
30};
31use crate::blinded_path::utils;
32use crate::blinded_path::{IntroductionNode, NodeIdLookUp};
33use crate::events::{Event, EventHandler, EventsProvider, ReplayEvent};
34use crate::ln::msgs::{
35 self, BaseMessageHandler, MessageSendEvent, OnionMessage, OnionMessageHandler, SocketAddress,
36};
37use crate::ln::onion_utils;
38use crate::routing::gossip::{NetworkGraph, NodeId, ReadOnlyNetworkGraph};
39use crate::sign::{EntropySource, NodeSigner, ReceiveAuthKey, Recipient};
40use crate::types::features::{InitFeatures, NodeFeatures};
41use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
42use crate::util::logger::{Logger, WithContext};
43use crate::util::ser::Writeable;
44use crate::util::wakers::{Future, Notifier};
45
46use crate::io;
47use crate::prelude::*;
48use crate::sync::Mutex;
49use core::fmt;
50use core::ops::Deref;
51use core::sync::atomic::{AtomicBool, Ordering};
52
53#[cfg(not(c_bindings))]
54use {
55 crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager},
56 crate::ln::peer_handler::IgnoringMessageHandler,
57 crate::sign::KeysManager,
58 crate::sync::Arc,
59};
60
61pub(super) const MAX_TIMER_TICKS: usize = 2;
62
63pub trait AOnionMessenger {
68 type EntropySource: EntropySource + ?Sized;
70 type ES: Deref<Target = Self::EntropySource>;
72 type NodeSigner: NodeSigner + ?Sized;
74 type NS: Deref<Target = Self::NodeSigner>;
76 type Logger: Logger + ?Sized;
78 type L: Deref<Target = Self::Logger>;
80 type NodeIdLookUp: NodeIdLookUp + ?Sized;
82 type NL: Deref<Target = Self::NodeIdLookUp>;
84 type MessageRouter: MessageRouter + ?Sized;
86 type MR: Deref<Target = Self::MessageRouter>;
88 type OffersMessageHandler: OffersMessageHandler + ?Sized;
90 type OMH: Deref<Target = Self::OffersMessageHandler>;
92 type AsyncPaymentsMessageHandler: AsyncPaymentsMessageHandler + ?Sized;
94 type APH: Deref<Target = Self::AsyncPaymentsMessageHandler>;
96 type DNSResolverMessageHandler: DNSResolverMessageHandler + ?Sized;
98 type DRH: Deref<Target = Self::DNSResolverMessageHandler>;
100 type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized;
102 type CMH: Deref<Target = Self::CustomOnionMessageHandler>;
104 fn get_om(
106 &self,
107 ) -> &OnionMessenger<
108 Self::ES,
109 Self::NS,
110 Self::L,
111 Self::NL,
112 Self::MR,
113 Self::OMH,
114 Self::APH,
115 Self::DRH,
116 Self::CMH,
117 >;
118}
119
120impl<
121 ES: Deref,
122 NS: Deref,
123 L: Deref,
124 NL: Deref,
125 MR: Deref,
126 OMH: Deref,
127 APH: Deref,
128 DRH: Deref,
129 CMH: Deref,
130 > AOnionMessenger for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, DRH, CMH>
131where
132 ES::Target: EntropySource,
133 NS::Target: NodeSigner,
134 L::Target: Logger,
135 NL::Target: NodeIdLookUp,
136 MR::Target: MessageRouter,
137 OMH::Target: OffersMessageHandler,
138 APH::Target: AsyncPaymentsMessageHandler,
139 DRH::Target: DNSResolverMessageHandler,
140 CMH::Target: CustomOnionMessageHandler,
141{
142 type EntropySource = ES::Target;
143 type ES = ES;
144 type NodeSigner = NS::Target;
145 type NS = NS;
146 type Logger = L::Target;
147 type L = L;
148 type NodeIdLookUp = NL::Target;
149 type NL = NL;
150 type MessageRouter = MR::Target;
151 type MR = MR;
152 type OffersMessageHandler = OMH::Target;
153 type OMH = OMH;
154 type AsyncPaymentsMessageHandler = APH::Target;
155 type APH = APH;
156 type DNSResolverMessageHandler = DRH::Target;
157 type DRH = DRH;
158 type CustomOnionMessageHandler = CMH::Target;
159 type CMH = CMH;
160 fn get_om(&self) -> &OnionMessenger<ES, NS, L, NL, MR, OMH, APH, DRH, CMH> {
161 self
162 }
163}
164
165pub struct OnionMessenger<
287 ES: Deref,
288 NS: Deref,
289 L: Deref,
290 NL: Deref,
291 MR: Deref,
292 OMH: Deref,
293 APH: Deref,
294 DRH: Deref,
295 CMH: Deref,
296> where
297 ES::Target: EntropySource,
298 NS::Target: NodeSigner,
299 L::Target: Logger,
300 NL::Target: NodeIdLookUp,
301 MR::Target: MessageRouter,
302 OMH::Target: OffersMessageHandler,
303 APH::Target: AsyncPaymentsMessageHandler,
304 DRH::Target: DNSResolverMessageHandler,
305 CMH::Target: CustomOnionMessageHandler,
306{
307 entropy_source: ES,
308 #[cfg(test)]
309 pub(super) node_signer: NS,
310 #[cfg(not(test))]
311 node_signer: NS,
312 logger: L,
313 message_recipients: Mutex<HashMap<PublicKey, OnionMessageRecipient>>,
314 secp_ctx: Secp256k1<secp256k1::All>,
315 node_id_lookup: NL,
316 message_router: MR,
317 offers_handler: OMH,
318 #[allow(unused)]
319 async_payments_handler: APH,
320 dns_resolver_handler: DRH,
321 custom_handler: CMH,
322 intercept_messages_for_offline_peers: bool,
323 pending_intercepted_msgs_events: Mutex<Vec<Event>>,
324 pending_peer_connected_events: Mutex<Vec<Event>>,
325 pending_events_processor: AtomicBool,
326 event_notifier: Notifier,
329}
330
331enum OnionMessageRecipient {
333 ConnectedPeer(VecDeque<OnionMessage>),
335
336 PendingConnection(VecDeque<OnionMessage>, Option<Vec<SocketAddress>>, usize),
339}
340
341impl OnionMessageRecipient {
342 fn pending_connection(addresses: Vec<SocketAddress>) -> Self {
343 Self::PendingConnection(VecDeque::new(), Some(addresses), 0)
344 }
345
346 fn pending_messages(&self) -> &VecDeque<OnionMessage> {
347 match self {
348 OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages,
349 OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages,
350 }
351 }
352
353 fn enqueue_message(&mut self, message: OnionMessage) {
354 let pending_messages = match self {
355 OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages,
356 OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages,
357 };
358
359 pending_messages.push_back(message);
360 }
361
362 fn dequeue_message(&mut self) -> Option<OnionMessage> {
363 let pending_messages = match self {
364 OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages,
365 OnionMessageRecipient::PendingConnection(pending_messages, _, _) => {
366 debug_assert!(false);
367 pending_messages
368 },
369 };
370
371 pending_messages.pop_front()
372 }
373
374 #[cfg(test)]
375 fn release_pending_messages(&mut self) -> VecDeque<OnionMessage> {
376 let pending_messages = match self {
377 OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages,
378 OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages,
379 };
380
381 core::mem::take(pending_messages)
382 }
383
384 fn mark_connected(&mut self) {
385 if let OnionMessageRecipient::PendingConnection(pending_messages, _, _) = self {
386 let mut new_pending_messages = VecDeque::new();
387 core::mem::swap(pending_messages, &mut new_pending_messages);
388 *self = OnionMessageRecipient::ConnectedPeer(new_pending_messages);
389 }
390 }
391
392 fn is_connected(&self) -> bool {
393 match self {
394 OnionMessageRecipient::ConnectedPeer(..) => true,
395 OnionMessageRecipient::PendingConnection(..) => false,
396 }
397 }
398}
399
400#[derive(Clone, Debug, Eq, PartialEq)]
403pub struct Responder {
404 reply_path: BlindedMessagePath,
406}
407
408impl_writeable_tlv_based!(Responder, {
409 (0, reply_path, required),
410});
411
412impl Responder {
413 pub(super) fn new(reply_path: BlindedMessagePath) -> Self {
415 Responder { reply_path }
416 }
417
418 pub fn respond(self) -> ResponseInstruction {
422 ResponseInstruction {
423 destination: Destination::BlindedPath(self.reply_path),
424 context: None,
425 }
426 }
427
428 pub fn respond_with_reply_path(self, context: MessageContext) -> ResponseInstruction {
432 ResponseInstruction {
433 destination: Destination::BlindedPath(self.reply_path),
434 context: Some(context),
435 }
436 }
437
438 pub(crate) fn into_blinded_path(self) -> BlindedMessagePath {
440 self.reply_path
441 }
442}
443
444#[derive(Clone)]
446pub struct ResponseInstruction {
447 destination: Destination,
451 context: Option<MessageContext>,
452}
453
454impl ResponseInstruction {
455 pub fn into_instructions(self) -> MessageSendInstructions {
458 MessageSendInstructions::ForReply { instructions: self }
459 }
460}
461
462#[derive(Clone)]
464pub enum MessageSendInstructions {
465 WithSpecifiedReplyPath {
468 destination: Destination,
470 reply_path: BlindedMessagePath,
472 },
473 WithReplyPath {
476 destination: Destination,
478 context: MessageContext,
481 },
482 WithoutReplyPath {
485 destination: Destination,
487 },
488 ForReply {
490 instructions: ResponseInstruction,
492 },
493 ForwardedMessage {
502 destination: Destination,
504 reply_path: Option<BlindedMessagePath>,
507 },
508}
509
510pub trait MessageRouter {
512 fn find_path(
514 &self, sender: PublicKey, peers: Vec<PublicKey>, destination: Destination,
515 ) -> Result<OnionMessagePath, ()>;
516
517 fn create_blinded_paths<T: secp256k1::Signing + secp256k1::Verification>(
520 &self, recipient: PublicKey, local_node_receive_key: ReceiveAuthKey,
521 context: MessageContext, peers: Vec<MessageForwardNode>, secp_ctx: &Secp256k1<T>,
522 ) -> Result<Vec<BlindedMessagePath>, ()>;
523}
524
525pub struct DefaultMessageRouter<G: Deref<Target = NetworkGraph<L>>, L: Deref, ES: Deref>
550where
551 L::Target: Logger,
552 ES::Target: EntropySource,
553{
554 network_graph: G,
555 entropy_source: ES,
556}
557
558pub(crate) const PADDED_PATH_LENGTH: usize = 4;
565
566impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, ES: Deref> DefaultMessageRouter<G, L, ES>
567where
568 L::Target: Logger,
569 ES::Target: EntropySource,
570{
571 pub fn new(network_graph: G, entropy_source: ES) -> Self {
573 Self { network_graph, entropy_source }
574 }
575
576 pub(crate) fn create_blinded_paths_from_iter<
577 I: ExactSizeIterator<Item = MessageForwardNode>,
578 T: secp256k1::Signing + secp256k1::Verification,
579 >(
580 network_graph: &G, recipient: PublicKey, local_node_receive_key: ReceiveAuthKey,
581 context: MessageContext, peers: I, entropy_source: &ES, secp_ctx: &Secp256k1<T>,
582 compact_paths: bool,
583 ) -> Result<Vec<BlindedMessagePath>, ()> {
584 const MAX_PATHS: usize = 3;
586
587 const MIN_PEER_CHANNELS: usize = 3;
590
591 let network_graph = network_graph.deref().read_only();
592 let is_recipient_announced =
593 network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient));
594
595 let has_one_peer = peers.len() == 1;
596 let mut peer_info = peers
597 .map(|peer| MessageForwardNode {
598 short_channel_id: if compact_paths { peer.short_channel_id } else { None },
599 ..peer
600 })
601 .filter_map(|peer| {
603 network_graph
604 .node(&NodeId::from_pubkey(&peer.node_id))
605 .filter(|info| {
606 !is_recipient_announced || info.channels.len() >= MIN_PEER_CHANNELS
607 })
608 .map(|info| (peer, info.is_tor_only(), info.channels.len()))
609 .or_else(|| (!is_recipient_announced && has_one_peer).then(|| (peer, false, 0)))
611 })
612 .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced))
614 .collect::<Vec<_>>();
615
616 peer_info.sort_unstable_by(|(_, a_tor_only, a_channels), (_, b_tor_only, b_channels)| {
618 a_tor_only.cmp(b_tor_only).then(a_channels.cmp(b_channels).reverse())
619 });
620
621 let build_path = |intermediate_hops: &[MessageForwardNode]| {
622 let dummy_hops_count = if compact_paths {
623 0
624 } else {
625 PADDED_PATH_LENGTH.saturating_sub(intermediate_hops.len() + 1)
627 };
628
629 BlindedMessagePath::new_with_dummy_hops(
630 intermediate_hops,
631 recipient,
632 dummy_hops_count,
633 local_node_receive_key,
634 context.clone(),
635 &**entropy_source,
636 secp_ctx,
637 )
638 };
639
640 let mut paths = peer_info
642 .into_iter()
643 .map(|(peer, _, _)| build_path(&[peer]))
644 .take(MAX_PATHS)
645 .collect::<Vec<_>>();
646 if paths.is_empty() {
647 if is_recipient_announced {
648 paths = vec![build_path(&[])];
649 } else {
650 return Err(());
651 }
652 }
653
654 if !compact_paths {
657 debug_assert!(paths.iter().all(|path| path.blinded_hops().len() == PADDED_PATH_LENGTH));
658 }
659
660 if compact_paths {
661 for path in &mut paths {
662 path.use_compact_introduction_node(&network_graph);
663 }
664 }
665
666 Ok(paths)
667 }
668
669 pub(crate) fn find_path(
670 network_graph: &G, sender: PublicKey, peers: Vec<PublicKey>, mut destination: Destination,
671 ) -> Result<OnionMessagePath, ()> {
672 let network_graph = network_graph.deref().read_only();
673 destination.resolve(&network_graph);
674
675 let first_node = match destination.first_node() {
676 Some(first_node) => first_node,
677 None => return Err(()),
678 };
679
680 if peers.contains(&first_node) || sender == first_node {
681 Ok(OnionMessagePath {
682 intermediate_nodes: vec![],
683 destination,
684 first_node_addresses: vec![],
685 })
686 } else {
687 let node_details = network_graph
688 .node(&NodeId::from_pubkey(&first_node))
689 .and_then(|node_info| node_info.announcement_info.as_ref())
690 .map(|announcement_info| {
691 (announcement_info.features(), announcement_info.addresses())
692 });
693
694 match node_details {
695 Some((features, addresses))
696 if features.supports_onion_messages() && addresses.len() > 0 =>
697 {
698 Ok(OnionMessagePath {
699 intermediate_nodes: vec![],
700 destination,
701 first_node_addresses: addresses.to_vec(),
702 })
703 },
704 None => {
705 Ok(OnionMessagePath {
708 intermediate_nodes: vec![],
709 destination,
710 first_node_addresses: vec![],
711 })
712 },
713 _ => Err(()),
714 }
715 }
716 }
717}
718
719impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, ES: Deref> MessageRouter
720 for DefaultMessageRouter<G, L, ES>
721where
722 L::Target: Logger,
723 ES::Target: EntropySource,
724{
725 fn find_path(
726 &self, sender: PublicKey, peers: Vec<PublicKey>, destination: Destination,
727 ) -> Result<OnionMessagePath, ()> {
728 Self::find_path(&self.network_graph, sender, peers, destination)
729 }
730
731 fn create_blinded_paths<T: secp256k1::Signing + secp256k1::Verification>(
732 &self, recipient: PublicKey, local_node_receive_key: ReceiveAuthKey,
733 context: MessageContext, peers: Vec<MessageForwardNode>, secp_ctx: &Secp256k1<T>,
734 ) -> Result<Vec<BlindedMessagePath>, ()> {
735 Self::create_blinded_paths_from_iter(
736 &self.network_graph,
737 recipient,
738 local_node_receive_key,
739 context,
740 peers.into_iter(),
741 &self.entropy_source,
742 secp_ctx,
743 true,
744 )
745 }
746}
747
748pub struct NodeIdMessageRouter<G: Deref<Target = NetworkGraph<L>>, L: Deref, ES: Deref>
760where
761 L::Target: Logger,
762 ES::Target: EntropySource,
763{
764 network_graph: G,
765 entropy_source: ES,
766}
767
768impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, ES: Deref> NodeIdMessageRouter<G, L, ES>
769where
770 L::Target: Logger,
771 ES::Target: EntropySource,
772{
773 pub fn new(network_graph: G, entropy_source: ES) -> Self {
775 Self { network_graph, entropy_source }
776 }
777}
778
779impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, ES: Deref> MessageRouter
780 for NodeIdMessageRouter<G, L, ES>
781where
782 L::Target: Logger,
783 ES::Target: EntropySource,
784{
785 fn find_path(
786 &self, sender: PublicKey, peers: Vec<PublicKey>, destination: Destination,
787 ) -> Result<OnionMessagePath, ()> {
788 DefaultMessageRouter::<G, L, ES>::find_path(&self.network_graph, sender, peers, destination)
789 }
790
791 fn create_blinded_paths<T: secp256k1::Signing + secp256k1::Verification>(
792 &self, recipient: PublicKey, local_node_receive_key: ReceiveAuthKey,
793 context: MessageContext, peers: Vec<MessageForwardNode>, secp_ctx: &Secp256k1<T>,
794 ) -> Result<Vec<BlindedMessagePath>, ()> {
795 DefaultMessageRouter::create_blinded_paths_from_iter(
796 &self.network_graph,
797 recipient,
798 local_node_receive_key,
799 context,
800 peers.into_iter(),
801 &self.entropy_source,
802 secp_ctx,
803 false,
804 )
805 }
806}
807
808pub struct NullMessageRouter {}
825
826impl MessageRouter for NullMessageRouter {
827 fn find_path(
828 &self, _sender: PublicKey, _peers: Vec<PublicKey>, _destination: Destination,
829 ) -> Result<OnionMessagePath, ()> {
830 Err(())
831 }
832
833 fn create_blinded_paths<T: secp256k1::Signing + secp256k1::Verification>(
834 &self, _recipient: PublicKey, _local_node_receive_key: ReceiveAuthKey,
835 _context: MessageContext, _peers: Vec<MessageForwardNode>, _secp_ctx: &Secp256k1<T>,
836 ) -> Result<Vec<BlindedMessagePath>, ()> {
837 Ok(vec![])
838 }
839}
840
841#[derive(Clone)]
843pub struct OnionMessagePath {
844 pub intermediate_nodes: Vec<PublicKey>,
846
847 pub destination: Destination,
849
850 pub first_node_addresses: Vec<SocketAddress>,
855}
856
857impl OnionMessagePath {
858 pub fn first_node(&self) -> Option<PublicKey> {
860 self.intermediate_nodes.first().copied().or_else(|| self.destination.first_node())
861 }
862}
863
864#[derive(Clone, Hash, Debug, PartialEq, Eq)]
866pub enum Destination {
867 Node(PublicKey),
869 BlindedPath(BlindedMessagePath),
871}
872
873impl Destination {
874 pub fn resolve(&mut self, network_graph: &ReadOnlyNetworkGraph) {
878 if let Destination::BlindedPath(path) = self {
879 if let IntroductionNode::DirectedShortChannelId(..) = path.introduction_node() {
880 if let Some(pubkey) = path
881 .public_introduction_node_id(network_graph)
882 .and_then(|node_id| node_id.as_pubkey().ok())
883 {
884 *path.introduction_node_mut() = IntroductionNode::NodeId(pubkey);
885 }
886 }
887 }
888 }
889
890 pub(super) fn num_hops(&self) -> usize {
891 match self {
892 Destination::Node(_) => 1,
893 Destination::BlindedPath(path) => path.blinded_hops().len(),
894 }
895 }
896
897 fn first_node(&self) -> Option<PublicKey> {
898 match self {
899 Destination::Node(node_id) => Some(*node_id),
900 Destination::BlindedPath(path) => match path.introduction_node() {
901 IntroductionNode::NodeId(pubkey) => Some(*pubkey),
902 IntroductionNode::DirectedShortChannelId(..) => None,
903 },
904 }
905 }
906}
907
908#[derive(Clone, Hash, Debug, PartialEq, Eq)]
912pub enum SendSuccess {
913 Buffered,
916 BufferedAwaitingConnection(PublicKey),
919}
920
921#[derive(Clone, Hash, Debug, PartialEq, Eq)]
925pub enum SendError {
926 Secp256k1(secp256k1::Error),
928 TooBigPacket,
931 TooFewBlindedHops,
934 InvalidFirstHop(PublicKey),
936 PathNotFound,
942 InvalidMessage,
944 BufferFull,
946 GetNodeIdFailed,
950 UnresolvedIntroductionNode,
954 BlindedPathAdvanceFailed,
959}
960
961pub trait CustomOnionMessageHandler {
972 type CustomMessage: OnionMessageContents;
975
976 fn handle_custom_message(
983 &self, message: Self::CustomMessage, context: Option<Vec<u8>>, responder: Option<Responder>,
984 ) -> Option<(Self::CustomMessage, ResponseInstruction)>;
985
986 fn read_custom_message<R: io::Read>(
989 &self, message_type: u64, buffer: &mut R,
990 ) -> Result<Option<Self::CustomMessage>, msgs::DecodeError>;
991
992 fn release_pending_custom_messages(
997 &self,
998 ) -> Vec<(Self::CustomMessage, MessageSendInstructions)>;
999}
1000
1001#[derive(Clone, Debug)]
1004pub enum PeeledOnion<T: OnionMessageContents> {
1005 Forward(NextMessageHop, OnionMessage),
1007 Offers(OffersMessage, Option<OffersContext>, Option<BlindedMessagePath>),
1009 AsyncPayments(AsyncPaymentsMessage, AsyncPaymentsContext, Option<BlindedMessagePath>),
1011 DNSResolver(DNSResolverMessage, Option<DNSResolverContext>, Option<BlindedMessagePath>),
1013 Custom(T, Option<Vec<u8>>, Option<BlindedMessagePath>),
1015}
1016
1017pub fn create_onion_message_resolving_destination<
1024 ES: Deref,
1025 NS: Deref,
1026 NL: Deref,
1027 T: OnionMessageContents,
1028>(
1029 entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL,
1030 network_graph: &ReadOnlyNetworkGraph, secp_ctx: &Secp256k1<secp256k1::All>,
1031 mut path: OnionMessagePath, contents: T, reply_path: Option<BlindedMessagePath>,
1032) -> Result<(PublicKey, OnionMessage, Vec<SocketAddress>), SendError>
1033where
1034 ES::Target: EntropySource,
1035 NS::Target: NodeSigner,
1036 NL::Target: NodeIdLookUp,
1037{
1038 path.destination.resolve(network_graph);
1039 create_onion_message(
1040 entropy_source,
1041 node_signer,
1042 node_id_lookup,
1043 secp_ctx,
1044 path,
1045 contents,
1046 reply_path,
1047 )
1048}
1049
1050pub fn create_onion_message<ES: Deref, NS: Deref, NL: Deref, T: OnionMessageContents>(
1062 entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL,
1063 secp_ctx: &Secp256k1<secp256k1::All>, path: OnionMessagePath, contents: T,
1064 reply_path: Option<BlindedMessagePath>,
1065) -> Result<(PublicKey, OnionMessage, Vec<SocketAddress>), SendError>
1066where
1067 ES::Target: EntropySource,
1068 NS::Target: NodeSigner,
1069 NL::Target: NodeIdLookUp,
1070{
1071 let OnionMessagePath { intermediate_nodes, mut destination, first_node_addresses } = path;
1072 if let Destination::BlindedPath(ref path) = destination {
1073 if path.blinded_hops().is_empty() {
1074 return Err(SendError::TooFewBlindedHops);
1075 }
1076 }
1077
1078 if contents.tlv_type() < 64 {
1079 return Err(SendError::InvalidMessage);
1080 }
1081
1082 if intermediate_nodes.len() == 0 {
1085 if let Destination::BlindedPath(ref mut blinded_path) = destination {
1086 let our_node_id = node_signer
1087 .get_node_id(Recipient::Node)
1088 .map_err(|()| SendError::GetNodeIdFailed)?;
1089 let introduction_node_id = match blinded_path.introduction_node() {
1090 IntroductionNode::NodeId(pubkey) => *pubkey,
1091 IntroductionNode::DirectedShortChannelId(direction, scid) => {
1092 match node_id_lookup.next_node_id(*scid) {
1093 Some(next_node_id) => *direction.select_pubkey(&our_node_id, &next_node_id),
1094 None => return Err(SendError::UnresolvedIntroductionNode),
1095 }
1096 },
1097 };
1098 if introduction_node_id == our_node_id {
1099 blinded_path
1100 .advance_path_by_one(node_signer, node_id_lookup, &secp_ctx)
1101 .map_err(|()| SendError::BlindedPathAdvanceFailed)?;
1102 }
1103 }
1104 }
1105
1106 let blinding_secret_bytes = entropy_source.get_secure_random_bytes();
1107 let blinding_secret = SecretKey::from_slice(&blinding_secret_bytes[..]).expect("RNG is busted");
1108 let (first_node_id, blinding_point) = if let Some(first_node_id) = intermediate_nodes.first() {
1109 (*first_node_id, PublicKey::from_secret_key(&secp_ctx, &blinding_secret))
1110 } else {
1111 match &destination {
1112 Destination::Node(pk) => (*pk, PublicKey::from_secret_key(&secp_ctx, &blinding_secret)),
1113 Destination::BlindedPath(path) => match path.introduction_node() {
1114 IntroductionNode::NodeId(pubkey) => (*pubkey, path.blinding_point()),
1115 IntroductionNode::DirectedShortChannelId(..) => {
1116 return Err(SendError::UnresolvedIntroductionNode);
1117 },
1118 },
1119 }
1120 };
1121 let (packet_payloads, packet_keys) = packet_payloads_and_keys(
1122 &secp_ctx,
1123 intermediate_nodes,
1124 destination,
1125 contents,
1126 reply_path,
1127 &blinding_secret,
1128 )?;
1129
1130 let prng_seed = entropy_source.get_secure_random_bytes();
1131 let onion_routing_packet =
1132 construct_onion_message_packet(packet_payloads, packet_keys, prng_seed)
1133 .map_err(|()| SendError::TooBigPacket)?;
1134
1135 let message = OnionMessage { blinding_point, onion_routing_packet };
1136 Ok((first_node_id, message, first_node_addresses))
1137}
1138
1139pub fn peel_onion_message<NS: Deref, L: Deref, CMH: Deref>(
1144 msg: &OnionMessage, secp_ctx: &Secp256k1<secp256k1::All>, node_signer: NS, logger: L,
1145 custom_handler: CMH,
1146) -> Result<PeeledOnion<<<CMH>::Target as CustomOnionMessageHandler>::CustomMessage>, ()>
1147where
1148 NS::Target: NodeSigner,
1149 L::Target: Logger,
1150 CMH::Target: CustomOnionMessageHandler,
1151{
1152 let control_tlvs_ss = match node_signer.ecdh(Recipient::Node, &msg.blinding_point, None) {
1153 Ok(ss) => ss,
1154 Err(e) => {
1155 log_error!(logger, "Failed to retrieve node secret: {:?}", e);
1156 return Err(());
1157 },
1158 };
1159 let onion_decode_ss = {
1160 let blinding_factor = {
1161 let mut hmac = HmacEngine::<Sha256>::new(b"blinded_node_id");
1162 hmac.input(control_tlvs_ss.as_ref());
1163 let hmac = Hmac::from_engine(hmac).to_byte_array();
1164 Scalar::from_be_bytes(hmac).unwrap()
1165 };
1166 let packet_pubkey = &msg.onion_routing_packet.public_key;
1167 match node_signer.ecdh(Recipient::Node, packet_pubkey, Some(&blinding_factor)) {
1168 Ok(ss) => ss.secret_bytes(),
1169 Err(()) => {
1170 log_trace!(logger, "Failed to compute onion packet shared secret");
1171 return Err(());
1172 },
1173 }
1174 };
1175 let receiving_context_auth_key = node_signer.get_receive_auth_key();
1176 let next_hop = onion_utils::decode_next_untagged_hop(
1177 onion_decode_ss,
1178 &msg.onion_routing_packet.hop_data[..],
1179 msg.onion_routing_packet.hmac,
1180 (control_tlvs_ss, custom_handler.deref(), receiving_context_auth_key, logger.deref()),
1181 );
1182
1183 let build_outbound_onion_message = |packet_pubkey: PublicKey,
1185 next_hop_hmac: [u8; 32],
1186 new_packet_bytes: Vec<u8>,
1187 blinding_point_opt: Option<PublicKey>|
1188 -> Result<OnionMessage, ()> {
1189 let new_pubkey =
1190 match onion_utils::next_hop_pubkey(&secp_ctx, packet_pubkey, &onion_decode_ss) {
1191 Ok(pk) => pk,
1192 Err(e) => {
1193 log_trace!(logger, "Failed to compute next hop packet pubkey: {}", e);
1194 return Err(());
1195 },
1196 };
1197 let outgoing_packet = Packet {
1198 version: 0,
1199 public_key: new_pubkey,
1200 hop_data: new_packet_bytes,
1201 hmac: next_hop_hmac,
1202 };
1203 let blinding_point = match blinding_point_opt {
1204 Some(bp) => bp,
1205 None => match onion_utils::next_hop_pubkey(
1206 &secp_ctx,
1207 msg.blinding_point,
1208 control_tlvs_ss.as_ref(),
1209 ) {
1210 Ok(bp) => bp,
1211 Err(e) => {
1212 log_trace!(logger, "Failed to compute next blinding point: {}", e);
1213 return Err(());
1214 },
1215 },
1216 };
1217 Ok(OnionMessage { blinding_point, onion_routing_packet: outgoing_packet })
1218 };
1219
1220 match next_hop {
1221 Ok((
1222 Payload::Receive {
1223 message,
1224 control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { context }),
1225 reply_path,
1226 control_tlvs_authenticated,
1227 },
1228 None,
1229 )) => match (message, context) {
1230 (ParsedOnionMessageContents::Offers(msg), Some(MessageContext::Offers(ctx))) => {
1231 match ctx {
1232 OffersContext::InvoiceRequest { .. } => {
1233 },
1239 _ => {
1240 if !control_tlvs_authenticated {
1241 log_trace!(logger, "Received an unauthenticated offers onion message");
1242 return Err(());
1243 }
1244 },
1245 }
1246 Ok(PeeledOnion::Offers(msg, Some(ctx), reply_path))
1247 },
1248 (ParsedOnionMessageContents::Offers(msg), None) => {
1249 Ok(PeeledOnion::Offers(msg, None, reply_path))
1250 },
1251 (
1252 ParsedOnionMessageContents::AsyncPayments(msg),
1253 Some(MessageContext::AsyncPayments(ctx)),
1254 ) => {
1255 if !control_tlvs_authenticated {
1256 log_trace!(logger, "Received an unauthenticated async payments onion message");
1257 return Err(());
1258 }
1259 Ok(PeeledOnion::AsyncPayments(msg, ctx, reply_path))
1260 },
1261 (ParsedOnionMessageContents::Custom(msg), Some(MessageContext::Custom(ctx))) => {
1262 if !control_tlvs_authenticated {
1263 log_trace!(logger, "Received an unauthenticated custom onion message");
1264 return Err(());
1265 }
1266 Ok(PeeledOnion::Custom(msg, Some(ctx), reply_path))
1267 },
1268 (ParsedOnionMessageContents::Custom(msg), None) => {
1269 Ok(PeeledOnion::Custom(msg, None, reply_path))
1270 },
1271 (
1272 ParsedOnionMessageContents::DNSResolver(msg),
1273 Some(MessageContext::DNSResolver(ctx)),
1274 ) => {
1275 if !control_tlvs_authenticated {
1276 log_trace!(logger, "Received an unauthenticated DNS resolver onion message");
1277 return Err(());
1278 }
1279 Ok(PeeledOnion::DNSResolver(msg, Some(ctx), reply_path))
1280 },
1281 (ParsedOnionMessageContents::DNSResolver(msg), None) => {
1282 Ok(PeeledOnion::DNSResolver(msg, None, reply_path))
1283 },
1284 _ => {
1285 log_trace!(
1286 logger,
1287 "Received message was sent on a blinded path with wrong or missing context."
1288 );
1289 Err(())
1290 },
1291 },
1292 Ok((
1293 Payload::Dummy { control_tlvs_authenticated },
1294 Some((next_hop_hmac, new_packet_bytes)),
1295 )) => {
1296 if !control_tlvs_authenticated {
1297 log_trace!(logger, "Received an unauthenticated dummy onion message");
1298 return Err(());
1299 }
1300
1301 let onion_message = build_outbound_onion_message(
1302 msg.onion_routing_packet.public_key,
1303 next_hop_hmac,
1304 new_packet_bytes,
1305 None,
1306 )?;
1307 peel_onion_message(&onion_message, secp_ctx, node_signer, logger, custom_handler)
1308 },
1309 Ok((
1310 Payload::Forward(ForwardControlTlvs::Unblinded(ForwardTlvs {
1311 next_hop,
1312 next_blinding_override,
1313 })),
1314 Some((next_hop_hmac, new_packet_bytes)),
1315 )) => {
1316 let onion_message = build_outbound_onion_message(
1317 msg.onion_routing_packet.public_key,
1318 next_hop_hmac,
1319 new_packet_bytes,
1320 next_blinding_override,
1321 )?;
1322
1323 Ok(PeeledOnion::Forward(next_hop, onion_message))
1324 },
1325 Err(e) => {
1326 log_trace!(logger, "Errored decoding onion message packet: {:?}", e);
1327 Err(())
1328 },
1329 _ => {
1330 log_trace!(logger, "Received bogus onion message packet, either the sender encoded a final hop as a forwarding hop or vice versa");
1331 Err(())
1332 },
1333 }
1334}
1335
1336macro_rules! drop_handled_events_and_abort {
1337 ($self: expr, $res_iter: expr, $event_queue: expr) => {
1338 {
1342 let mut queue_lock = $event_queue.lock().unwrap();
1343
1344 let mut any_error = false;
1347 queue_lock.retain(|_| {
1348 $res_iter.next().map_or(true, |r| {
1349 let is_err = r.is_err();
1350 any_error |= is_err;
1351 is_err
1352 })
1353 });
1354
1355 if any_error {
1356 $self.pending_events_processor.store(false, Ordering::Release);
1358 $self.event_notifier.notify();
1359 return;
1360 }
1361 }
1362 };
1363}
1364
1365impl<
1366 ES: Deref,
1367 NS: Deref,
1368 L: Deref,
1369 NL: Deref,
1370 MR: Deref,
1371 OMH: Deref,
1372 APH: Deref,
1373 DRH: Deref,
1374 CMH: Deref,
1375 > OnionMessenger<ES, NS, L, NL, MR, OMH, APH, DRH, CMH>
1376where
1377 ES::Target: EntropySource,
1378 NS::Target: NodeSigner,
1379 L::Target: Logger,
1380 NL::Target: NodeIdLookUp,
1381 MR::Target: MessageRouter,
1382 OMH::Target: OffersMessageHandler,
1383 APH::Target: AsyncPaymentsMessageHandler,
1384 DRH::Target: DNSResolverMessageHandler,
1385 CMH::Target: CustomOnionMessageHandler,
1386{
1387 pub fn new(
1390 entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
1391 offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
1392 ) -> Self {
1393 Self::new_inner(
1394 entropy_source,
1395 node_signer,
1396 logger,
1397 node_id_lookup,
1398 message_router,
1399 offers_handler,
1400 async_payments_handler,
1401 dns_resolver,
1402 custom_handler,
1403 false,
1404 )
1405 }
1406
1407 pub fn new_with_offline_peer_interception(
1429 entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
1430 offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
1431 ) -> Self {
1432 Self::new_inner(
1433 entropy_source,
1434 node_signer,
1435 logger,
1436 node_id_lookup,
1437 message_router,
1438 offers_handler,
1439 async_payments_handler,
1440 dns_resolver,
1441 custom_handler,
1442 true,
1443 )
1444 }
1445
1446 fn new_inner(
1447 entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
1448 offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
1449 intercept_messages_for_offline_peers: bool,
1450 ) -> Self {
1451 let mut secp_ctx = Secp256k1::new();
1452 secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
1453 OnionMessenger {
1454 entropy_source,
1455 node_signer,
1456 message_recipients: Mutex::new(new_hash_map()),
1457 secp_ctx,
1458 logger,
1459 node_id_lookup,
1460 message_router,
1461 offers_handler,
1462 async_payments_handler,
1463 dns_resolver_handler: dns_resolver,
1464 custom_handler,
1465 intercept_messages_for_offline_peers,
1466 pending_intercepted_msgs_events: Mutex::new(Vec::new()),
1467 pending_peer_connected_events: Mutex::new(Vec::new()),
1468 pending_events_processor: AtomicBool::new(false),
1469 event_notifier: Notifier::new(),
1470 }
1471 }
1472
1473 #[cfg(any(test, feature = "_test_utils"))]
1474 pub fn set_offers_handler(&mut self, offers_handler: OMH) {
1475 self.offers_handler = offers_handler;
1476 }
1477
1478 #[cfg(any(test, feature = "_test_utils"))]
1479 pub fn set_async_payments_handler(&mut self, async_payments_handler: APH) {
1480 self.async_payments_handler = async_payments_handler;
1481 }
1482
1483 pub fn send_onion_message<T: OnionMessageContents>(
1485 &self, contents: T, instructions: MessageSendInstructions,
1486 ) -> Result<SendSuccess, SendError> {
1487 self.send_onion_message_internal(contents, instructions, format_args!(""))
1488 }
1489
1490 fn send_onion_message_internal<T: OnionMessageContents>(
1491 &self, contents: T, instructions: MessageSendInstructions, log_suffix: fmt::Arguments,
1492 ) -> Result<SendSuccess, SendError> {
1493 let is_forward = matches!(instructions, MessageSendInstructions::ForwardedMessage { .. });
1494 let (destination, reply_path) = match instructions {
1495 MessageSendInstructions::WithSpecifiedReplyPath { destination, reply_path } => {
1496 (destination, Some(reply_path))
1497 },
1498 MessageSendInstructions::WithReplyPath { destination, context }
1499 | MessageSendInstructions::ForReply {
1500 instructions: ResponseInstruction { destination, context: Some(context) },
1501 } => match self.create_blinded_path(context) {
1502 Ok(reply_path) => (destination, Some(reply_path)),
1503 Err(err) => {
1504 log_trace!(
1505 self.logger,
1506 "Failed to create reply path {}: {:?}",
1507 log_suffix,
1508 err
1509 );
1510 return Err(err);
1511 },
1512 },
1513 MessageSendInstructions::WithoutReplyPath { destination }
1514 | MessageSendInstructions::ForReply {
1515 instructions: ResponseInstruction { destination, context: None },
1516 } => (destination, None),
1517 MessageSendInstructions::ForwardedMessage { destination, reply_path } => {
1518 (destination, reply_path)
1519 },
1520 };
1521
1522 let path = if is_forward {
1523 OnionMessagePath {
1525 intermediate_nodes: Vec::new(),
1526 first_node_addresses: Vec::new(),
1527 destination,
1528 }
1529 } else {
1530 self.find_path(destination).map_err(|e| {
1531 log_trace!(self.logger, "Failed to find path {}", log_suffix);
1532 e
1533 })?
1534 };
1535 let first_hop = path.intermediate_nodes.get(0).map(|p| *p);
1536 let logger = WithContext::from(&self.logger, first_hop, None, None);
1537
1538 log_trace!(logger, "Constructing onion message {}: {:?}", log_suffix, contents);
1539 let (first_node_id, onion_message, addresses) = create_onion_message(
1540 &self.entropy_source,
1541 &self.node_signer,
1542 &self.node_id_lookup,
1543 &self.secp_ctx,
1544 path,
1545 contents,
1546 reply_path,
1547 )
1548 .map_err(|e| {
1549 log_warn!(logger, "Failed to create onion message with {:?} {}", e, log_suffix);
1550 e
1551 })?;
1552
1553 let result = if is_forward {
1554 self.enqueue_forwarded_onion_message(
1555 NextMessageHop::NodeId(first_node_id),
1556 onion_message,
1557 log_suffix,
1558 )
1559 .map(|()| SendSuccess::Buffered)
1560 } else {
1561 self.enqueue_outbound_onion_message(onion_message, first_node_id, addresses)
1562 };
1563
1564 match result.as_ref() {
1565 Err(SendError::GetNodeIdFailed) => {
1566 log_warn!(logger, "Unable to retrieve node id {}", log_suffix);
1567 },
1568 Err(SendError::PathNotFound) => {
1569 log_trace!(logger, "Failed to find path {}", log_suffix);
1570 },
1571 Err(e) => {
1572 log_trace!(logger, "Failed sending onion message {}: {:?}", log_suffix, e);
1573 },
1574 Ok(SendSuccess::Buffered) => {
1575 log_trace!(logger, "Buffered onion message {}", log_suffix);
1576 },
1577 Ok(SendSuccess::BufferedAwaitingConnection(node_id)) => {
1578 log_trace!(
1579 logger,
1580 "Buffered onion message waiting on peer connection {}: {}",
1581 log_suffix,
1582 node_id
1583 );
1584 },
1585 }
1586
1587 result
1588 }
1589
1590 fn find_path(&self, destination: Destination) -> Result<OnionMessagePath, SendError> {
1591 let sender = self
1592 .node_signer
1593 .get_node_id(Recipient::Node)
1594 .map_err(|_| SendError::GetNodeIdFailed)?;
1595
1596 let peers = self
1597 .message_recipients
1598 .lock()
1599 .unwrap()
1600 .iter()
1601 .filter(|(_, recipient)| matches!(recipient, OnionMessageRecipient::ConnectedPeer(_)))
1602 .map(|(node_id, _)| *node_id)
1603 .collect();
1604
1605 self.message_router
1606 .find_path(sender, peers, destination)
1607 .map_err(|_| SendError::PathNotFound)
1608 }
1609
1610 fn create_blinded_path(
1611 &self, context: MessageContext,
1612 ) -> Result<BlindedMessagePath, SendError> {
1613 let recipient = self
1614 .node_signer
1615 .get_node_id(Recipient::Node)
1616 .map_err(|_| SendError::GetNodeIdFailed)?;
1617 let secp_ctx = &self.secp_ctx;
1618
1619 let peers = {
1620 let message_recipients = self.message_recipients.lock().unwrap();
1621 message_recipients
1622 .iter()
1623 .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_)))
1624 .map(|(node_id, _)| MessageForwardNode {
1625 node_id: *node_id,
1626 short_channel_id: None,
1627 })
1628 .collect::<Vec<_>>()
1629 };
1630
1631 self.message_router
1632 .create_blinded_paths(
1633 recipient,
1634 self.node_signer.get_receive_auth_key(),
1635 context,
1636 peers,
1637 secp_ctx,
1638 )
1639 .and_then(|paths| paths.into_iter().next().ok_or(()))
1640 .map_err(|_| SendError::PathNotFound)
1641 }
1642
1643 fn enqueue_outbound_onion_message(
1644 &self, onion_message: OnionMessage, first_node_id: PublicKey, addresses: Vec<SocketAddress>,
1645 ) -> Result<SendSuccess, SendError> {
1646 let mut message_recipients = self.message_recipients.lock().unwrap();
1647 if outbound_buffer_full(&first_node_id, &message_recipients) {
1648 return Err(SendError::BufferFull);
1649 }
1650
1651 match message_recipients.entry(first_node_id) {
1652 hash_map::Entry::Vacant(e) => {
1653 e.insert(OnionMessageRecipient::pending_connection(addresses))
1654 .enqueue_message(onion_message);
1655 self.event_notifier.notify();
1656 Ok(SendSuccess::BufferedAwaitingConnection(first_node_id))
1657 },
1658 hash_map::Entry::Occupied(mut e) => {
1659 e.get_mut().enqueue_message(onion_message);
1660 if e.get().is_connected() {
1661 Ok(SendSuccess::Buffered)
1662 } else {
1663 Ok(SendSuccess::BufferedAwaitingConnection(first_node_id))
1664 }
1665 },
1666 }
1667 }
1668
1669 fn enqueue_forwarded_onion_message(
1670 &self, next_hop: NextMessageHop, onion_message: OnionMessage, log_suffix: fmt::Arguments,
1671 ) -> Result<(), SendError> {
1672 let next_node_id = match next_hop {
1673 NextMessageHop::NodeId(pubkey) => pubkey,
1674 NextMessageHop::ShortChannelId(scid) => match self.node_id_lookup.next_node_id(scid) {
1675 Some(pubkey) => pubkey,
1676 None => {
1677 log_trace!(self.logger, "Dropping forwarded onion messager: unable to resolve next hop using SCID {} {}", scid, log_suffix);
1678 return Err(SendError::GetNodeIdFailed);
1679 },
1680 },
1681 };
1682
1683 let mut message_recipients = self.message_recipients.lock().unwrap();
1684 if outbound_buffer_full(&next_node_id, &message_recipients) {
1685 log_trace!(
1686 self.logger,
1687 "Dropping forwarded onion message to peer {}: outbound buffer full {}",
1688 next_node_id,
1689 log_suffix
1690 );
1691 return Err(SendError::BufferFull);
1692 }
1693
1694 #[cfg(fuzzing)]
1695 message_recipients
1696 .entry(next_node_id)
1697 .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()));
1698
1699 match message_recipients.entry(next_node_id) {
1700 hash_map::Entry::Occupied(mut e)
1701 if matches!(e.get(), OnionMessageRecipient::ConnectedPeer(..)) =>
1702 {
1703 e.get_mut().enqueue_message(onion_message);
1704 log_trace!(
1705 self.logger,
1706 "Forwarding an onion message to peer {} {}",
1707 next_node_id,
1708 log_suffix
1709 );
1710 Ok(())
1711 },
1712 _ if self.intercept_messages_for_offline_peers => {
1713 log_trace!(
1714 self.logger,
1715 "Generating OnionMessageIntercepted event for peer {} {}",
1716 next_node_id,
1717 log_suffix
1718 );
1719 self.enqueue_intercepted_event(Event::OnionMessageIntercepted {
1720 peer_node_id: next_node_id,
1721 message: onion_message,
1722 });
1723 Ok(())
1724 },
1725 _ => {
1726 log_trace!(
1727 self.logger,
1728 "Dropping forwarded onion message to disconnected peer {} {}",
1729 next_node_id,
1730 log_suffix
1731 );
1732 Err(SendError::InvalidFirstHop(next_node_id))
1733 },
1734 }
1735 }
1736
1737 pub fn forward_onion_message(
1742 &self, message: OnionMessage, peer_node_id: &PublicKey,
1743 ) -> Result<(), SendError> {
1744 let mut message_recipients = self.message_recipients.lock().unwrap();
1745 if outbound_buffer_full(&peer_node_id, &message_recipients) {
1746 return Err(SendError::BufferFull);
1747 }
1748
1749 match message_recipients.entry(*peer_node_id) {
1750 hash_map::Entry::Occupied(mut e) if e.get().is_connected() => {
1751 e.get_mut().enqueue_message(message);
1752 Ok(())
1753 },
1754 _ => Err(SendError::InvalidFirstHop(*peer_node_id)),
1755 }
1756 }
1757
1758 #[cfg(any(test, feature = "_test_utils"))]
1759 pub fn send_onion_message_using_path<T: OnionMessageContents>(
1760 &self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedMessagePath>,
1761 ) -> Result<SendSuccess, SendError> {
1762 let (first_node_id, onion_message, addresses) = create_onion_message(
1763 &self.entropy_source,
1764 &self.node_signer,
1765 &self.node_id_lookup,
1766 &self.secp_ctx,
1767 path,
1768 contents,
1769 reply_path,
1770 )?;
1771 self.enqueue_outbound_onion_message(onion_message, first_node_id, addresses)
1772 }
1773
1774 pub(crate) fn peel_onion_message(
1775 &self, msg: &OnionMessage,
1776 ) -> Result<PeeledOnion<<<CMH>::Target as CustomOnionMessageHandler>::CustomMessage>, ()> {
1777 peel_onion_message(
1778 msg,
1779 &self.secp_ctx,
1780 &*self.node_signer,
1781 &*self.logger,
1782 &*self.custom_handler,
1783 )
1784 }
1785
1786 pub fn handle_onion_message_response<T: OnionMessageContents>(
1795 &self, response: T, instructions: ResponseInstruction,
1796 ) -> Result<SendSuccess, SendError> {
1797 let message_type = response.msg_type();
1798 self.send_onion_message_internal(
1799 response,
1800 instructions.into_instructions(),
1801 format_args!("when responding with {} to an onion message", message_type,),
1802 )
1803 }
1804
1805 #[cfg(test)]
1806 pub(crate) fn release_pending_msgs(&self) -> HashMap<PublicKey, VecDeque<OnionMessage>> {
1807 self.enqueue_messages_from_handlers();
1808
1809 let mut message_recipients = self.message_recipients.lock().unwrap();
1810 let mut msgs = new_hash_map();
1811 for (node_id, recipient) in &mut *message_recipients {
1814 msgs.insert(*node_id, recipient.release_pending_messages());
1815 }
1816 msgs
1817 }
1818
1819 fn enqueue_messages_from_handlers(&self) {
1821 for (message, instructions) in self.offers_handler.release_pending_messages() {
1823 let _ = self.send_onion_message_internal(
1824 message,
1825 instructions,
1826 format_args!("when sending OffersMessage"),
1827 );
1828 }
1829
1830 for (message, instructions) in self.async_payments_handler.release_pending_messages() {
1831 let _ = self.send_onion_message_internal(
1832 message,
1833 instructions,
1834 format_args!("when sending AsyncPaymentsMessage"),
1835 );
1836 }
1837
1838 for (message, instructions) in self.dns_resolver_handler.release_pending_messages() {
1840 let _ = self.send_onion_message_internal(
1841 message,
1842 instructions,
1843 format_args!("when sending DNSResolverMessage"),
1844 );
1845 }
1846
1847 for (message, instructions) in self.custom_handler.release_pending_custom_messages() {
1849 let _ = self.send_onion_message_internal(
1850 message,
1851 instructions,
1852 format_args!("when sending CustomMessage"),
1853 );
1854 }
1855 }
1856
1857 fn enqueue_intercepted_event(&self, event: Event) {
1858 const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
1859 let mut pending_intercepted_msgs_events =
1860 self.pending_intercepted_msgs_events.lock().unwrap();
1861 let total_buffered_bytes: usize =
1862 pending_intercepted_msgs_events.iter().map(|ev| ev.serialized_length()).sum();
1863 if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
1864 log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
1865 return;
1866 }
1867 pending_intercepted_msgs_events.push(event);
1868 self.event_notifier.notify();
1869 }
1870
1871 pub fn get_update_future(&self) -> Future {
1879 self.event_notifier.get_future()
1880 }
1881
1882 pub async fn process_pending_events_async<
1890 Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin,
1891 H: Fn(Event) -> Future,
1892 >(
1893 &self, handler: H,
1894 ) {
1895 if self
1896 .pending_events_processor
1897 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
1898 .is_err()
1899 {
1900 return;
1901 }
1902
1903 {
1904 let intercepted_msgs = self.pending_intercepted_msgs_events.lock().unwrap().clone();
1905 let mut futures = Vec::with_capacity(intercepted_msgs.len());
1906 for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
1907 if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
1908 if let Some(addresses) = addresses.take() {
1909 let event = Event::ConnectionNeeded { node_id: *node_id, addresses };
1910 log_trace!(self.logger, "Handling event {:?} async...", event);
1911 let future = ResultFuture::Pending(handler(event));
1912 futures.push(future);
1913 }
1914 }
1915 }
1916
1917 let intercepted_msgs_offset = futures.len();
1920
1921 for ev in intercepted_msgs {
1922 if let Event::OnionMessageIntercepted { .. } = ev {
1923 } else {
1924 debug_assert!(false);
1925 }
1926 log_trace!(self.logger, "Handling event {:?} async...", ev);
1927 let future = ResultFuture::Pending(handler(ev));
1928 futures.push(future);
1929 }
1930
1931 if !futures.is_empty() {
1932 let res = MultiResultFuturePoller::new(futures).await;
1934 log_trace!(self.logger, "Done handling events async, results: {:?}", res);
1935 let mut res_iter = res.iter().skip(intercepted_msgs_offset);
1936 drop_handled_events_and_abort!(
1937 self,
1938 res_iter,
1939 self.pending_intercepted_msgs_events
1940 );
1941 }
1942 }
1943
1944 {
1945 let peer_connecteds = self.pending_peer_connected_events.lock().unwrap().clone();
1946 let num_peer_connecteds = peer_connecteds.len();
1947 if num_peer_connecteds <= 1 {
1948 for event in peer_connecteds {
1949 if handler(event).await.is_ok() {
1950 let mut pending_peer_connected_events =
1951 self.pending_peer_connected_events.lock().unwrap();
1952 pending_peer_connected_events.drain(..num_peer_connecteds);
1953 } else {
1954 self.pending_events_processor.store(false, Ordering::Release);
1956 return;
1957 }
1958 }
1959 } else {
1960 let mut futures = Vec::new();
1961 for event in peer_connecteds {
1962 log_trace!(self.logger, "Handling event {:?} async...", event);
1963 let future = ResultFuture::Pending(handler(event));
1964 futures.push(future);
1965 }
1966
1967 if !futures.is_empty() {
1968 let res = MultiResultFuturePoller::new(futures).await;
1969 log_trace!(self.logger, "Done handling events async, results: {:?}", res);
1970 let mut res_iter = res.iter();
1971 drop_handled_events_and_abort!(
1972 self,
1973 res_iter,
1974 self.pending_peer_connected_events
1975 );
1976 }
1977 }
1978 }
1979 self.pending_events_processor.store(false, Ordering::Release);
1980 }
1981}
1982
1983const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128;
1984pub(super) const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256;
1985
1986fn outbound_buffer_full(
1987 peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, OnionMessageRecipient>,
1988) -> bool {
1989 let mut total_buffered_bytes = 0;
1990 let mut peer_buffered_bytes = 0;
1991 for (pk, peer_buf) in buffer {
1992 for om in peer_buf.pending_messages() {
1993 let om_len = om.serialized_length();
1994 if pk == peer_node_id {
1995 peer_buffered_bytes += om_len;
1996 }
1997 total_buffered_bytes += om_len;
1998
1999 if total_buffered_bytes >= MAX_TOTAL_BUFFER_SIZE
2000 || peer_buffered_bytes >= MAX_PER_PEER_BUFFER_SIZE
2001 {
2002 return true;
2003 }
2004 }
2005 }
2006 false
2007}
2008
2009impl<
2010 ES: Deref,
2011 NS: Deref,
2012 L: Deref,
2013 NL: Deref,
2014 MR: Deref,
2015 OMH: Deref,
2016 APH: Deref,
2017 DRH: Deref,
2018 CMH: Deref,
2019 > EventsProvider for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, DRH, CMH>
2020where
2021 ES::Target: EntropySource,
2022 NS::Target: NodeSigner,
2023 L::Target: Logger,
2024 NL::Target: NodeIdLookUp,
2025 MR::Target: MessageRouter,
2026 OMH::Target: OffersMessageHandler,
2027 APH::Target: AsyncPaymentsMessageHandler,
2028 DRH::Target: DNSResolverMessageHandler,
2029 CMH::Target: CustomOnionMessageHandler,
2030{
2031 fn process_pending_events<H: Deref>(&self, handler: H)
2032 where
2033 H::Target: EventHandler,
2034 {
2035 if self
2036 .pending_events_processor
2037 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
2038 .is_err()
2039 {
2040 return;
2041 }
2042
2043 for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
2044 if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
2045 if let Some(addresses) = addresses.take() {
2046 let event = Event::ConnectionNeeded { node_id: *node_id, addresses };
2047 log_trace!(self.logger, "Handling event {:?}...", event);
2048 let res = handler.handle_event(event);
2049 log_trace!(self.logger, "Done handling event, ignoring result: {:?}", res);
2050 }
2051 }
2052 }
2053 let intercepted_msgs;
2054 let peer_connecteds;
2055 {
2056 let pending_intercepted_msgs_events =
2057 self.pending_intercepted_msgs_events.lock().unwrap();
2058 intercepted_msgs = pending_intercepted_msgs_events.clone();
2059 let pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
2060 peer_connecteds = pending_peer_connected_events.clone();
2061 #[cfg(debug_assertions)]
2062 {
2063 for ev in pending_intercepted_msgs_events.iter() {
2064 if let Event::OnionMessageIntercepted { .. } = ev {
2065 } else {
2066 panic!();
2067 }
2068 }
2069 for ev in pending_peer_connected_events.iter() {
2070 if let Event::OnionMessagePeerConnected { .. } = ev {
2071 } else {
2072 panic!();
2073 }
2074 }
2075 }
2076 }
2077
2078 let mut handling_intercepted_msgs_failed = false;
2079 let mut num_handled_intercepted_events = 0;
2080 for ev in intercepted_msgs {
2081 log_trace!(self.logger, "Handling event {:?}...", ev);
2082 let res = handler.handle_event(ev);
2083 log_trace!(self.logger, "Done handling event, result: {:?}", res);
2084 match res {
2085 Ok(()) => num_handled_intercepted_events += 1,
2086 Err(ReplayEvent()) => {
2087 handling_intercepted_msgs_failed = true;
2088 break;
2089 },
2090 }
2091 }
2092
2093 {
2094 let mut pending_intercepted_msgs_events =
2095 self.pending_intercepted_msgs_events.lock().unwrap();
2096 pending_intercepted_msgs_events.drain(..num_handled_intercepted_events);
2097 }
2098
2099 if handling_intercepted_msgs_failed {
2100 self.pending_events_processor.store(false, Ordering::Release);
2101 self.event_notifier.notify();
2102 return;
2103 }
2104
2105 let mut num_handled_peer_connecteds = 0;
2106 for ev in peer_connecteds {
2107 log_trace!(self.logger, "Handling event {:?}...", ev);
2108 let res = handler.handle_event(ev);
2109 log_trace!(self.logger, "Done handling event, result: {:?}", res);
2110 match res {
2111 Ok(()) => num_handled_peer_connecteds += 1,
2112 Err(ReplayEvent()) => {
2113 self.event_notifier.notify();
2114 break;
2115 },
2116 }
2117 }
2118
2119 {
2120 let mut pending_peer_connected_events =
2121 self.pending_peer_connected_events.lock().unwrap();
2122 pending_peer_connected_events.drain(..num_handled_peer_connecteds);
2123 pending_peer_connected_events.shrink_to(10); }
2125
2126 self.pending_events_processor.store(false, Ordering::Release);
2127 }
2128}
2129
2130impl<
2131 ES: Deref,
2132 NS: Deref,
2133 L: Deref,
2134 NL: Deref,
2135 MR: Deref,
2136 OMH: Deref,
2137 APH: Deref,
2138 DRH: Deref,
2139 CMH: Deref,
2140 > BaseMessageHandler for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, DRH, CMH>
2141where
2142 ES::Target: EntropySource,
2143 NS::Target: NodeSigner,
2144 L::Target: Logger,
2145 NL::Target: NodeIdLookUp,
2146 MR::Target: MessageRouter,
2147 OMH::Target: OffersMessageHandler,
2148 APH::Target: AsyncPaymentsMessageHandler,
2149 DRH::Target: DNSResolverMessageHandler,
2150 CMH::Target: CustomOnionMessageHandler,
2151{
2152 fn provided_node_features(&self) -> NodeFeatures {
2153 let mut features = NodeFeatures::empty();
2154 features.set_onion_messages_optional();
2155 features | self.dns_resolver_handler.provided_node_features()
2156 }
2157
2158 fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
2159 let mut features = InitFeatures::empty();
2160 features.set_onion_messages_optional();
2161 features
2162 }
2163
2164 fn peer_connected(
2165 &self, their_node_id: PublicKey, init: &msgs::Init, _inbound: bool,
2166 ) -> Result<(), ()> {
2167 if init.features.supports_onion_messages() {
2168 {
2169 let mut message_recipients = self.message_recipients.lock().unwrap();
2170 message_recipients
2171 .entry(their_node_id)
2172 .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
2173 .mark_connected();
2174 }
2175 if self.intercept_messages_for_offline_peers {
2176 let mut pending_peer_connected_events =
2177 self.pending_peer_connected_events.lock().unwrap();
2178 pending_peer_connected_events
2179 .push(Event::OnionMessagePeerConnected { peer_node_id: their_node_id });
2180 self.event_notifier.notify();
2181 }
2182 } else {
2183 self.message_recipients.lock().unwrap().remove(&their_node_id);
2184 }
2185
2186 Ok(())
2187 }
2188
2189 fn peer_disconnected(&self, their_node_id: PublicKey) {
2190 match self.message_recipients.lock().unwrap().remove(&their_node_id) {
2191 Some(OnionMessageRecipient::ConnectedPeer(..)) => {},
2192 Some(_) => debug_assert!(false),
2193 None => {},
2194 }
2195 }
2196
2197 fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
2198 Vec::new()
2199 }
2200}
2201
2202impl<
2203 ES: Deref,
2204 NS: Deref,
2205 L: Deref,
2206 NL: Deref,
2207 MR: Deref,
2208 OMH: Deref,
2209 APH: Deref,
2210 DRH: Deref,
2211 CMH: Deref,
2212 > OnionMessageHandler for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, DRH, CMH>
2213where
2214 ES::Target: EntropySource,
2215 NS::Target: NodeSigner,
2216 L::Target: Logger,
2217 NL::Target: NodeIdLookUp,
2218 MR::Target: MessageRouter,
2219 OMH::Target: OffersMessageHandler,
2220 APH::Target: AsyncPaymentsMessageHandler,
2221 DRH::Target: DNSResolverMessageHandler,
2222 CMH::Target: CustomOnionMessageHandler,
2223{
2224 fn handle_onion_message(&self, peer_node_id: PublicKey, msg: &OnionMessage) {
2225 let logger = WithContext::from(&self.logger, Some(peer_node_id), None, None);
2226 macro_rules! log_receive {
2227 ($message: expr, $with_reply_path: expr) => {
2228 log_trace!(
2229 logger,
2230 "Received an onion message with {} reply_path: {:?}",
2231 if $with_reply_path { "a" } else { "no" },
2232 $message
2233 );
2234 };
2235 }
2236
2237 match self.peel_onion_message(msg) {
2238 Ok(PeeledOnion::Offers(message, context, reply_path)) => {
2239 log_receive!(message, reply_path.is_some());
2240 let responder = reply_path.map(Responder::new);
2241 let response_instructions =
2242 self.offers_handler.handle_message(message, context, responder);
2243 if let Some((msg, instructions)) = response_instructions {
2244 let _ = self.handle_onion_message_response(msg, instructions);
2245 }
2246 },
2247 Ok(PeeledOnion::AsyncPayments(message, context, reply_path)) => {
2248 log_receive!(message, reply_path.is_some());
2249 let responder = reply_path.map(Responder::new);
2250 match message {
2251 AsyncPaymentsMessage::OfferPathsRequest(msg) => {
2252 let response_instructions = self
2253 .async_payments_handler
2254 .handle_offer_paths_request(msg, context, responder);
2255 if let Some((msg, instructions)) = response_instructions {
2256 let _ = self.handle_onion_message_response(msg, instructions);
2257 }
2258 },
2259 AsyncPaymentsMessage::OfferPaths(msg) => {
2260 let response_instructions =
2261 self.async_payments_handler.handle_offer_paths(msg, context, responder);
2262 if let Some((msg, instructions)) = response_instructions {
2263 let _ = self.handle_onion_message_response(msg, instructions);
2264 }
2265 },
2266 AsyncPaymentsMessage::ServeStaticInvoice(msg) => {
2267 self.async_payments_handler
2268 .handle_serve_static_invoice(msg, context, responder);
2269 },
2270 AsyncPaymentsMessage::StaticInvoicePersisted(msg) => {
2271 self.async_payments_handler.handle_static_invoice_persisted(msg, context);
2272 },
2273 AsyncPaymentsMessage::HeldHtlcAvailable(msg) => {
2274 let response_instructions = self
2275 .async_payments_handler
2276 .handle_held_htlc_available(msg, context, responder);
2277 if let Some((msg, instructions)) = response_instructions {
2278 let _ = self.handle_onion_message_response(msg, instructions);
2279 }
2280 },
2281 AsyncPaymentsMessage::ReleaseHeldHtlc(msg) => {
2282 self.async_payments_handler.handle_release_held_htlc(msg, context);
2283 },
2284 }
2285 },
2286 Ok(PeeledOnion::DNSResolver(message, context, reply_path)) => {
2287 log_receive!(message, reply_path.is_some());
2288 let responder = reply_path.map(Responder::new);
2289 match message {
2290 DNSResolverMessage::DNSSECQuery(msg) => {
2291 if context.is_some() {
2292 log_trace!(
2293 logger,
2294 "Ignoring DNSSECQuery onion message with unexpected context: {:?}",
2295 context.unwrap()
2296 );
2297 return;
2298 }
2299 let response_instructions =
2300 self.dns_resolver_handler.handle_dnssec_query(msg, responder);
2301 if let Some((msg, instructions)) = response_instructions {
2302 let _ = self.handle_onion_message_response(msg, instructions);
2303 }
2304 },
2305 DNSResolverMessage::DNSSECProof(msg) => {
2306 let context = match context {
2307 Some(ctx) => ctx,
2308 None => {
2309 log_trace!(
2310 logger,
2311 "Ignoring DNSSECProof onion message due to missing context"
2312 );
2313 return;
2314 },
2315 };
2316 self.dns_resolver_handler.handle_dnssec_proof(msg, context);
2317 },
2318 }
2319 },
2320 Ok(PeeledOnion::Custom(message, context, reply_path)) => {
2321 log_receive!(message, reply_path.is_some());
2322 let responder = reply_path.map(Responder::new);
2323 let response_instructions =
2324 self.custom_handler.handle_custom_message(message, context, responder);
2325 if let Some((msg, instructions)) = response_instructions {
2326 let _ = self.handle_onion_message_response(msg, instructions);
2327 }
2328 },
2329 Ok(PeeledOnion::Forward(next_hop, onion_message)) => {
2330 let _ = self.enqueue_forwarded_onion_message(
2331 next_hop,
2332 onion_message,
2333 format_args!("when forwarding peeled onion message from {}", peer_node_id),
2334 );
2335 },
2336 Err(e) => {
2337 log_error!(logger, "Failed to process onion message {:?}", e);
2338 },
2339 }
2340 }
2341
2342 fn timer_tick_occurred(&self) {
2343 let mut message_recipients = self.message_recipients.lock().unwrap();
2344
2345 message_recipients.retain(|_, recipient| match recipient {
2348 OnionMessageRecipient::PendingConnection(_, None, ticks) => *ticks < MAX_TIMER_TICKS,
2349 OnionMessageRecipient::PendingConnection(_, Some(_), _) => true,
2350 _ => true,
2351 });
2352
2353 for recipient in message_recipients.values_mut() {
2356 if let OnionMessageRecipient::PendingConnection(_, None, ticks) = recipient {
2357 *ticks += 1;
2358 }
2359 }
2360 }
2361
2362 fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option<OnionMessage> {
2366 self.enqueue_messages_from_handlers();
2367
2368 let mut message_recipients = self.message_recipients.lock().unwrap();
2369 message_recipients.get_mut(&peer_node_id).and_then(|buffer| buffer.dequeue_message())
2370 }
2371}
2372
2373#[cfg(not(c_bindings))]
2383#[cfg(feature = "dnssec")]
2384pub type SimpleArcOnionMessenger<M, T, F, L> = OnionMessenger<
2385 Arc<KeysManager>,
2386 Arc<KeysManager>,
2387 Arc<L>,
2388 Arc<SimpleArcChannelManager<M, T, F, L>>,
2389 Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<L>>>, Arc<L>, Arc<KeysManager>>>,
2390 Arc<SimpleArcChannelManager<M, T, F, L>>,
2391 Arc<SimpleArcChannelManager<M, T, F, L>>,
2392 Arc<SimpleArcChannelManager<M, T, F, L>>,
2393 IgnoringMessageHandler,
2394>;
2395
2396#[cfg(not(c_bindings))]
2404#[cfg(not(feature = "dnssec"))]
2405pub type SimpleArcOnionMessenger<M, T, F, L> = OnionMessenger<
2406 Arc<KeysManager>,
2407 Arc<KeysManager>,
2408 Arc<L>,
2409 Arc<SimpleArcChannelManager<M, T, F, L>>,
2410 Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<L>>>, Arc<L>, Arc<KeysManager>>>,
2411 Arc<SimpleArcChannelManager<M, T, F, L>>,
2412 Arc<SimpleArcChannelManager<M, T, F, L>>,
2413 IgnoringMessageHandler,
2414 IgnoringMessageHandler,
2415>;
2416
2417#[cfg(not(c_bindings))]
2425#[cfg(feature = "dnssec")]
2426pub type SimpleRefOnionMessenger<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j, M, T, F, L> =
2427 OnionMessenger<
2428 &'a KeysManager,
2429 &'a KeysManager,
2430 &'b L,
2431 &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
2432 &'i DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>,
2433 &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
2434 &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
2435 &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
2436 IgnoringMessageHandler,
2437 >;
2438
2439#[cfg(not(c_bindings))]
2447#[cfg(not(feature = "dnssec"))]
2448pub type SimpleRefOnionMessenger<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j, M, T, F, L> =
2449 OnionMessenger<
2450 &'a KeysManager,
2451 &'a KeysManager,
2452 &'b L,
2453 &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
2454 &'i DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>,
2455 &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
2456 &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
2457 IgnoringMessageHandler,
2458 IgnoringMessageHandler,
2459 >;
2460
2461fn packet_payloads_and_keys<
2464 T: OnionMessageContents,
2465 S: secp256k1::Signing + secp256k1::Verification,
2466>(
2467 secp_ctx: &Secp256k1<S>, unblinded_path: Vec<PublicKey>, destination: Destination, message: T,
2468 mut reply_path: Option<BlindedMessagePath>, session_priv: &SecretKey,
2469) -> Result<(Vec<(Payload<T>, [u8; 32])>, Vec<onion_utils::OnionKeys>), SendError> {
2470 let num_hops = unblinded_path.len() + destination.num_hops();
2471 let mut payloads = Vec::with_capacity(num_hops);
2472 let mut onion_packet_keys = Vec::with_capacity(num_hops);
2473
2474 let (mut intro_node_id_blinding_pt, num_blinded_hops) = match &destination {
2475 Destination::Node(_) => (None, 0),
2476 Destination::BlindedPath(path) => {
2477 let introduction_node_id = match path.introduction_node() {
2478 IntroductionNode::NodeId(pubkey) => pubkey,
2479 IntroductionNode::DirectedShortChannelId(..) => {
2480 return Err(SendError::UnresolvedIntroductionNode);
2481 },
2482 };
2483 (Some((*introduction_node_id, path.blinding_point())), path.blinded_hops().len())
2484 },
2485 };
2486 let num_unblinded_hops = num_hops - num_blinded_hops;
2487
2488 let mut unblinded_path_idx = 0;
2489 let mut blinded_path_idx = 0;
2490 let mut prev_control_tlvs_ss = None;
2491 let mut final_control_tlvs = None;
2492 utils::construct_keys_for_onion_message(
2493 secp_ctx,
2494 unblinded_path.into_iter(),
2495 destination,
2496 session_priv,
2497 |onion_packet_ss, ephemeral_pubkey, control_tlvs_ss, unblinded_pk_opt, enc_payload_opt| {
2498 if num_unblinded_hops != 0 && unblinded_path_idx < num_unblinded_hops {
2499 if let Some(ss) = prev_control_tlvs_ss.take() {
2500 payloads.push((
2501 Payload::Forward(ForwardControlTlvs::Unblinded(ForwardTlvs {
2502 next_hop: NextMessageHop::NodeId(unblinded_pk_opt.unwrap()),
2503 next_blinding_override: None,
2504 })),
2505 ss,
2506 ));
2507 }
2508 prev_control_tlvs_ss = Some(control_tlvs_ss);
2509 unblinded_path_idx += 1;
2510 } else if let Some((intro_node_id, blinding_pt)) = intro_node_id_blinding_pt.take() {
2511 if let Some(control_tlvs_ss) = prev_control_tlvs_ss.take() {
2512 payloads.push((
2513 Payload::Forward(ForwardControlTlvs::Unblinded(ForwardTlvs {
2514 next_hop: NextMessageHop::NodeId(intro_node_id),
2515 next_blinding_override: Some(blinding_pt),
2516 })),
2517 control_tlvs_ss,
2518 ));
2519 }
2520 }
2521 if blinded_path_idx < num_blinded_hops.saturating_sub(1) && enc_payload_opt.is_some() {
2522 payloads.push((
2523 Payload::Forward(ForwardControlTlvs::Blinded(enc_payload_opt.unwrap())),
2524 control_tlvs_ss,
2525 ));
2526 blinded_path_idx += 1;
2527 } else if let Some(encrypted_payload) = enc_payload_opt {
2528 final_control_tlvs = Some(ReceiveControlTlvs::Blinded(encrypted_payload));
2529 prev_control_tlvs_ss = Some(control_tlvs_ss);
2530 }
2531
2532 let (rho, mu) = onion_utils::gen_rho_mu_from_shared_secret(onion_packet_ss.as_ref());
2533 onion_packet_keys.push(onion_utils::OnionKeys {
2534 #[cfg(test)]
2535 shared_secret: onion_packet_ss,
2536 #[cfg(test)]
2537 blinding_factor: [0; 32],
2538 ephemeral_pubkey,
2539 rho,
2540 mu,
2541 });
2542 },
2543 );
2544
2545 if let Some(control_tlvs) = final_control_tlvs {
2546 payloads.push((
2547 Payload::Receive {
2548 control_tlvs,
2549 reply_path: reply_path.take(),
2550 message,
2551 control_tlvs_authenticated: false,
2552 },
2553 prev_control_tlvs_ss.unwrap(),
2554 ));
2555 } else {
2556 payloads.push((
2557 Payload::Receive {
2558 control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { context: None }),
2559 reply_path: reply_path.take(),
2560 message,
2561 control_tlvs_authenticated: false,
2562 },
2563 prev_control_tlvs_ss.unwrap(),
2564 ));
2565 }
2566
2567 Ok((payloads, onion_packet_keys))
2568}
2569
2570fn construct_onion_message_packet<T: OnionMessageContents>(
2572 payloads: Vec<(Payload<T>, [u8; 32])>, onion_keys: Vec<onion_utils::OnionKeys>,
2573 prng_seed: [u8; 32],
2574) -> Result<Packet, ()> {
2575 let payloads_ser_len = onion_utils::payloads_serialized_length(&payloads);
2580 let hop_data_len = if payloads_ser_len <= SMALL_PACKET_HOP_DATA_LEN {
2581 SMALL_PACKET_HOP_DATA_LEN
2582 } else if payloads_ser_len <= BIG_PACKET_HOP_DATA_LEN {
2583 BIG_PACKET_HOP_DATA_LEN
2584 } else {
2585 return Err(());
2586 };
2587
2588 onion_utils::construct_onion_message_packet::<_, _>(
2589 payloads,
2590 onion_keys,
2591 prng_seed,
2592 hop_data_len,
2593 )
2594}