1use bitcoin::constants::ChainHash;
19use bitcoin::secp256k1::{self, PublicKey, Secp256k1, SecretKey};
20
21use crate::blinded_path::message::{AsyncPaymentsContext, DNSResolverContext, OffersContext};
22use crate::ln::msgs;
23use crate::ln::msgs::{
24 BaseMessageHandler, ChannelMessageHandler, Init, LightningError, MessageSendEvent,
25 OnionMessageHandler, RoutingMessageHandler, SendOnlyMessageHandler, SocketAddress,
26};
27use crate::ln::peer_channel_encryptor::{
28 MessageBuf, NextNoiseStep, PeerChannelEncryptor, MSG_BUF_ALLOC_SIZE,
29};
30use crate::ln::types::ChannelId;
31use crate::ln::wire;
32use crate::ln::wire::{Encode, Type};
33use crate::onion_message::async_payments::{
34 AsyncPaymentsMessageHandler, HeldHtlcAvailable, OfferPaths, OfferPathsRequest, ReleaseHeldHtlc,
35 ServeStaticInvoice, StaticInvoicePersisted,
36};
37use crate::onion_message::dns_resolution::{
38 DNSResolverMessage, DNSResolverMessageHandler, DNSSECProof, DNSSECQuery,
39};
40use crate::onion_message::messenger::{
41 CustomOnionMessageHandler, MessageSendInstructions, Responder, ResponseInstruction,
42};
43use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
44use crate::onion_message::packet::OnionMessageContents;
45use crate::routing::gossip::{NodeAlias, NodeId};
46use crate::sign::{NodeSigner, Recipient};
47use crate::types::features::{InitFeatures, NodeFeatures};
48use crate::types::string::PrintableString;
49use crate::util::atomic_counter::AtomicCounter;
50use crate::util::logger::{Level, Logger, WithContext};
51use crate::util::ser::{VecWriter, Writeable, Writer};
52
53#[allow(unused_imports)]
54use crate::prelude::*;
55
56use crate::io;
57use crate::sync::{FairRwLock, Mutex, MutexGuard};
58use core::convert::Infallible;
59use core::ops::Deref;
60use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, Ordering};
61use core::{cmp, fmt, hash, mem};
62#[cfg(not(c_bindings))]
63use {
64 crate::chain::chainmonitor::ChainMonitor,
65 crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager},
66 crate::onion_message::messenger::{SimpleArcOnionMessenger, SimpleRefOnionMessenger},
67 crate::routing::gossip::{NetworkGraph, P2PGossipSync},
68 crate::sign::{InMemorySigner, KeysManager},
69 crate::sync::Arc,
70};
71
72use bitcoin::hashes::sha256::Hash as Sha256;
73use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
74use bitcoin::hashes::{Hash, HashEngine};
75
76pub trait CustomMessageHandler: wire::CustomMessageReader {
85 fn handle_custom_message(
89 &self, msg: Self::CustomMessage, sender_node_id: PublicKey,
90 ) -> Result<(), LightningError>;
91
92 fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>;
96
97 fn peer_disconnected(&self, their_node_id: PublicKey);
99
100 fn peer_connected(&self, their_node_id: PublicKey, msg: &Init, inbound: bool)
110 -> Result<(), ()>;
111
112 fn provided_node_features(&self) -> NodeFeatures;
118
119 fn provided_init_features(&self, their_node_id: PublicKey) -> InitFeatures;
125}
126
127pub struct IgnoringMessageHandler {}
130impl BaseMessageHandler for IgnoringMessageHandler {
131 fn peer_disconnected(&self, _their_node_id: PublicKey) {}
132 fn peer_connected(
133 &self, _their_node_id: PublicKey, _init: &msgs::Init, _inbound: bool,
134 ) -> Result<(), ()> {
135 Ok(())
136 }
137 fn provided_node_features(&self) -> NodeFeatures {
138 NodeFeatures::empty()
139 }
140 fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
141 InitFeatures::empty()
142 }
143 fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
144 Vec::new()
145 }
146}
147impl RoutingMessageHandler for IgnoringMessageHandler {
148 fn handle_node_announcement(
149 &self, _their_node_id: Option<PublicKey>, _msg: &msgs::NodeAnnouncement,
150 ) -> Result<bool, LightningError> {
151 Ok(false)
152 }
153 fn handle_channel_announcement(
154 &self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelAnnouncement,
155 ) -> Result<bool, LightningError> {
156 Ok(false)
157 }
158 fn handle_channel_update(
159 &self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelUpdate,
160 ) -> Result<bool, LightningError> {
161 Ok(false)
162 }
163 fn get_next_channel_announcement(
164 &self, _starting_point: u64,
165 ) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)>
166 {
167 None
168 }
169 fn get_next_node_announcement(
170 &self, _starting_point: Option<&NodeId>,
171 ) -> Option<msgs::NodeAnnouncement> {
172 None
173 }
174 fn handle_reply_channel_range(
175 &self, _their_node_id: PublicKey, _msg: msgs::ReplyChannelRange,
176 ) -> Result<(), LightningError> {
177 Ok(())
178 }
179 fn handle_reply_short_channel_ids_end(
180 &self, _their_node_id: PublicKey, _msg: msgs::ReplyShortChannelIdsEnd,
181 ) -> Result<(), LightningError> {
182 Ok(())
183 }
184 fn handle_query_channel_range(
185 &self, _their_node_id: PublicKey, _msg: msgs::QueryChannelRange,
186 ) -> Result<(), LightningError> {
187 Ok(())
188 }
189 fn handle_query_short_channel_ids(
190 &self, _their_node_id: PublicKey, _msg: msgs::QueryShortChannelIds,
191 ) -> Result<(), LightningError> {
192 Ok(())
193 }
194 fn processing_queue_high(&self) -> bool {
195 false
196 }
197}
198
199impl OnionMessageHandler for IgnoringMessageHandler {
200 fn handle_onion_message(&self, _their_node_id: PublicKey, _msg: &msgs::OnionMessage) {}
201 fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> {
202 None
203 }
204 fn timer_tick_occurred(&self) {}
205}
206
207impl OffersMessageHandler for IgnoringMessageHandler {
208 fn handle_message(
209 &self, _message: OffersMessage, _context: Option<OffersContext>,
210 _responder: Option<Responder>,
211 ) -> Option<(OffersMessage, ResponseInstruction)> {
212 None
213 }
214}
215impl AsyncPaymentsMessageHandler for IgnoringMessageHandler {
216 fn handle_offer_paths_request(
217 &self, _message: OfferPathsRequest, _context: AsyncPaymentsContext,
218 _responder: Option<Responder>,
219 ) -> Option<(OfferPaths, ResponseInstruction)> {
220 None
221 }
222 fn handle_offer_paths(
223 &self, _message: OfferPaths, _context: AsyncPaymentsContext, _responder: Option<Responder>,
224 ) -> Option<(ServeStaticInvoice, ResponseInstruction)> {
225 None
226 }
227 fn handle_serve_static_invoice(
228 &self, _message: ServeStaticInvoice, _context: AsyncPaymentsContext,
229 _responder: Option<Responder>,
230 ) {
231 }
232 fn handle_static_invoice_persisted(
233 &self, _message: StaticInvoicePersisted, _context: AsyncPaymentsContext,
234 ) {
235 }
236 fn handle_held_htlc_available(
237 &self, _message: HeldHtlcAvailable, _context: AsyncPaymentsContext,
238 _responder: Option<Responder>,
239 ) -> Option<(ReleaseHeldHtlc, ResponseInstruction)> {
240 None
241 }
242 fn handle_release_held_htlc(&self, _message: ReleaseHeldHtlc, _context: AsyncPaymentsContext) {}
243}
244impl DNSResolverMessageHandler for IgnoringMessageHandler {
245 fn handle_dnssec_query(
246 &self, _message: DNSSECQuery, _responder: Option<Responder>,
247 ) -> Option<(DNSResolverMessage, ResponseInstruction)> {
248 None
249 }
250 fn handle_dnssec_proof(&self, _message: DNSSECProof, _context: DNSResolverContext) {}
251}
252impl CustomOnionMessageHandler for IgnoringMessageHandler {
253 type CustomMessage = Infallible;
254 fn handle_custom_message(
255 &self, _message: Infallible, _context: Option<Vec<u8>>, _responder: Option<Responder>,
256 ) -> Option<(Infallible, ResponseInstruction)> {
257 unreachable!();
259 }
260 fn read_custom_message<R: io::Read>(
261 &self, _msg_type: u64, _buffer: &mut R,
262 ) -> Result<Option<Infallible>, msgs::DecodeError>
263 where
264 Self: Sized,
265 {
266 Ok(None)
267 }
268 fn release_pending_custom_messages(&self) -> Vec<(Infallible, MessageSendInstructions)> {
269 vec![]
270 }
271}
272
273impl SendOnlyMessageHandler for IgnoringMessageHandler {}
274
275impl OnionMessageContents for Infallible {
276 fn tlv_type(&self) -> u64 {
277 unreachable!();
278 }
279 #[cfg(c_bindings)]
280 fn msg_type(&self) -> String {
281 unreachable!();
282 }
283 #[cfg(not(c_bindings))]
284 fn msg_type(&self) -> &'static str {
285 unreachable!();
286 }
287}
288
289impl Deref for IgnoringMessageHandler {
290 type Target = IgnoringMessageHandler;
291 fn deref(&self) -> &Self {
292 self
293 }
294}
295
296impl wire::Type for Infallible {
299 fn type_id(&self) -> u16 {
300 unreachable!();
301 }
302}
303impl Writeable for Infallible {
304 fn write<W: Writer>(&self, _: &mut W) -> Result<(), io::Error> {
305 unreachable!();
306 }
307}
308
309impl wire::CustomMessageReader for IgnoringMessageHandler {
310 type CustomMessage = Infallible;
311 fn read<R: io::Read>(
312 &self, _message_type: u16, _buffer: &mut R,
313 ) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
314 Ok(None)
315 }
316}
317
318impl CustomMessageHandler for IgnoringMessageHandler {
319 fn handle_custom_message(
320 &self, _msg: Infallible, _sender_node_id: PublicKey,
321 ) -> Result<(), LightningError> {
322 unreachable!();
324 }
325
326 fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> {
327 Vec::new()
328 }
329
330 fn peer_disconnected(&self, _their_node_id: PublicKey) {}
331
332 fn peer_connected(
333 &self, _their_node_id: PublicKey, _msg: &Init, _inbound: bool,
334 ) -> Result<(), ()> {
335 Ok(())
336 }
337
338 fn provided_node_features(&self) -> NodeFeatures {
339 NodeFeatures::empty()
340 }
341
342 fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
343 InitFeatures::empty()
344 }
345}
346
347pub struct ErroringMessageHandler {
350 message_queue: Mutex<Vec<MessageSendEvent>>,
351}
352impl ErroringMessageHandler {
353 pub fn new() -> Self {
355 Self { message_queue: Mutex::new(Vec::new()) }
356 }
357 fn push_error(&self, node_id: PublicKey, channel_id: ChannelId) {
358 self.message_queue.lock().unwrap().push(MessageSendEvent::HandleError {
359 action: msgs::ErrorAction::SendErrorMessage {
360 msg: msgs::ErrorMessage {
361 channel_id,
362 data: "We do not support channel messages, sorry.".to_owned(),
363 },
364 },
365 node_id,
366 });
367 }
368}
369impl BaseMessageHandler for ErroringMessageHandler {
370 fn peer_disconnected(&self, _their_node_id: PublicKey) {}
371 fn peer_connected(
372 &self, _their_node_id: PublicKey, _init: &msgs::Init, _inbound: bool,
373 ) -> Result<(), ()> {
374 Ok(())
375 }
376 fn provided_node_features(&self) -> NodeFeatures {
377 NodeFeatures::empty()
378 }
379 fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
380 let mut features = InitFeatures::empty();
384 features.set_data_loss_protect_optional();
385 features.set_upfront_shutdown_script_optional();
386 features.set_variable_length_onion_optional();
387 features.set_static_remote_key_optional();
388 features.set_payment_secret_optional();
389 features.set_basic_mpp_optional();
390 features.set_wumbo_optional();
391 features.set_shutdown_any_segwit_optional();
392 features.set_dual_fund_optional();
393 features.set_channel_type_optional();
394 features.set_scid_privacy_optional();
395 features.set_zero_conf_optional();
396 features.set_route_blinding_optional();
397 #[cfg(simple_close)]
398 features.set_simple_close_optional();
399 features
400 }
401
402 fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
403 let mut res = Vec::new();
404 mem::swap(&mut res, &mut self.message_queue.lock().unwrap());
405 res
406 }
407}
408impl ChannelMessageHandler for ErroringMessageHandler {
409 fn handle_open_channel(&self, their_node_id: PublicKey, msg: &msgs::OpenChannel) {
412 ErroringMessageHandler::push_error(
413 self,
414 their_node_id,
415 msg.common_fields.temporary_channel_id,
416 );
417 }
418 fn handle_accept_channel(&self, their_node_id: PublicKey, msg: &msgs::AcceptChannel) {
419 ErroringMessageHandler::push_error(
420 self,
421 their_node_id,
422 msg.common_fields.temporary_channel_id,
423 );
424 }
425 fn handle_funding_created(&self, their_node_id: PublicKey, msg: &msgs::FundingCreated) {
426 ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
427 }
428 fn handle_funding_signed(&self, their_node_id: PublicKey, msg: &msgs::FundingSigned) {
429 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
430 }
431 fn handle_channel_ready(&self, their_node_id: PublicKey, msg: &msgs::ChannelReady) {
432 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
433 }
434 fn handle_shutdown(&self, their_node_id: PublicKey, msg: &msgs::Shutdown) {
435 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
436 }
437 fn handle_closing_signed(&self, their_node_id: PublicKey, msg: &msgs::ClosingSigned) {
438 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
439 }
440 #[cfg(simple_close)]
441 fn handle_closing_complete(&self, their_node_id: PublicKey, msg: msgs::ClosingComplete) {
442 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
443 }
444 #[cfg(simple_close)]
445 fn handle_closing_sig(&self, their_node_id: PublicKey, msg: msgs::ClosingSig) {
446 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
447 }
448 fn handle_stfu(&self, their_node_id: PublicKey, msg: &msgs::Stfu) {
449 ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
450 }
451 fn handle_splice_init(&self, their_node_id: PublicKey, msg: &msgs::SpliceInit) {
452 ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
453 }
454 fn handle_splice_ack(&self, their_node_id: PublicKey, msg: &msgs::SpliceAck) {
455 ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
456 }
457 fn handle_splice_locked(&self, their_node_id: PublicKey, msg: &msgs::SpliceLocked) {
458 ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
459 }
460 fn handle_update_add_htlc(&self, their_node_id: PublicKey, msg: &msgs::UpdateAddHTLC) {
461 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
462 }
463 fn handle_update_fulfill_htlc(&self, their_node_id: PublicKey, msg: msgs::UpdateFulfillHTLC) {
464 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
465 }
466 fn handle_update_fail_htlc(&self, their_node_id: PublicKey, msg: &msgs::UpdateFailHTLC) {
467 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
468 }
469 fn handle_update_fail_malformed_htlc(
470 &self, their_node_id: PublicKey, msg: &msgs::UpdateFailMalformedHTLC,
471 ) {
472 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
473 }
474 fn handle_commitment_signed(&self, their_node_id: PublicKey, msg: &msgs::CommitmentSigned) {
475 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
476 }
477 fn handle_commitment_signed_batch(
478 &self, their_node_id: PublicKey, channel_id: ChannelId, _batch: Vec<msgs::CommitmentSigned>,
479 ) {
480 ErroringMessageHandler::push_error(self, their_node_id, channel_id);
481 }
482 fn handle_revoke_and_ack(&self, their_node_id: PublicKey, msg: &msgs::RevokeAndACK) {
483 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
484 }
485 fn handle_update_fee(&self, their_node_id: PublicKey, msg: &msgs::UpdateFee) {
486 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
487 }
488 fn handle_announcement_signatures(
489 &self, their_node_id: PublicKey, msg: &msgs::AnnouncementSignatures,
490 ) {
491 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
492 }
493 fn handle_channel_reestablish(&self, their_node_id: PublicKey, msg: &msgs::ChannelReestablish) {
494 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
495 }
496 fn handle_channel_update(&self, _their_node_id: PublicKey, _msg: &msgs::ChannelUpdate) {}
498
499 fn handle_peer_storage(&self, _their_node_id: PublicKey, _msg: msgs::PeerStorage) {}
500 fn handle_peer_storage_retrieval(
501 &self, _their_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval,
502 ) {
503 }
504
505 fn handle_error(&self, _their_node_id: PublicKey, _msg: &msgs::ErrorMessage) {}
506
507 fn get_chain_hashes(&self) -> Option<Vec<ChainHash>> {
508 None
512 }
513
514 fn handle_open_channel_v2(&self, their_node_id: PublicKey, msg: &msgs::OpenChannelV2) {
515 ErroringMessageHandler::push_error(
516 self,
517 their_node_id,
518 msg.common_fields.temporary_channel_id,
519 );
520 }
521
522 fn handle_accept_channel_v2(&self, their_node_id: PublicKey, msg: &msgs::AcceptChannelV2) {
523 ErroringMessageHandler::push_error(
524 self,
525 their_node_id,
526 msg.common_fields.temporary_channel_id,
527 );
528 }
529
530 fn handle_tx_add_input(&self, their_node_id: PublicKey, msg: &msgs::TxAddInput) {
531 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
532 }
533
534 fn handle_tx_add_output(&self, their_node_id: PublicKey, msg: &msgs::TxAddOutput) {
535 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
536 }
537
538 fn handle_tx_remove_input(&self, their_node_id: PublicKey, msg: &msgs::TxRemoveInput) {
539 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
540 }
541
542 fn handle_tx_remove_output(&self, their_node_id: PublicKey, msg: &msgs::TxRemoveOutput) {
543 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
544 }
545
546 fn handle_tx_complete(&self, their_node_id: PublicKey, msg: &msgs::TxComplete) {
547 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
548 }
549
550 fn handle_tx_signatures(&self, their_node_id: PublicKey, msg: &msgs::TxSignatures) {
551 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
552 }
553
554 fn handle_tx_init_rbf(&self, their_node_id: PublicKey, msg: &msgs::TxInitRbf) {
555 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
556 }
557
558 fn handle_tx_ack_rbf(&self, their_node_id: PublicKey, msg: &msgs::TxAckRbf) {
559 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
560 }
561
562 fn handle_tx_abort(&self, their_node_id: PublicKey, msg: &msgs::TxAbort) {
563 ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
564 }
565
566 fn message_received(&self) {}
567}
568
569impl Deref for ErroringMessageHandler {
570 type Target = ErroringMessageHandler;
571 fn deref(&self) -> &Self {
572 self
573 }
574}
575
576pub struct MessageHandler<CM: Deref, RM: Deref, OM: Deref, CustomM: Deref, SM: Deref>
578where
579 CM::Target: ChannelMessageHandler,
580 RM::Target: RoutingMessageHandler,
581 OM::Target: OnionMessageHandler,
582 CustomM::Target: CustomMessageHandler,
583 SM::Target: SendOnlyMessageHandler,
584{
585 pub chan_handler: CM,
590 pub route_handler: RM,
595
596 pub onion_message_handler: OM,
601
602 pub custom_message_handler: CustomM,
605
606 pub send_only_message_handler: SM,
612}
613
614pub trait SocketDescriptor: cmp::Eq + hash::Hash + Clone {
627 fn send_data(&mut self, data: &[u8], continue_read: bool) -> usize;
644 fn disconnect_socket(&mut self);
649}
650
651pub struct PeerDetails {
653 pub counterparty_node_id: PublicKey,
658 pub socket_address: Option<SocketAddress>,
663 pub init_features: InitFeatures,
665 pub is_inbound_connection: bool,
669}
670
671#[derive(Clone)]
675pub struct PeerHandleError {}
676impl fmt::Debug for PeerHandleError {
677 fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
678 formatter.write_str("Peer Sent Invalid Data")
679 }
680}
681impl fmt::Display for PeerHandleError {
682 fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
683 formatter.write_str("Peer Sent Invalid Data")
684 }
685}
686
687enum InitSyncTracker {
689 NoSyncRequested,
693 ChannelsSyncing(u64),
698 NodesSyncing(NodeId),
700}
701
702struct MessageBatch {
704 channel_id: ChannelId,
706
707 batch_size: usize,
709
710 messages: MessageBatchImpl,
712}
713
714enum MessageBatchImpl {
716 CommitmentSigned(Vec<msgs::CommitmentSigned>),
718}
719
720const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 12;
725
726const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
741
742const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
750
751const OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP: usize = 64 * 1024 * 2;
761
762struct Peer {
763 channel_encryptor: PeerChannelEncryptor,
764 their_node_id: Option<(PublicKey, NodeId)>,
767 their_features: Option<InitFeatures>,
774 their_socket_address: Option<SocketAddress>,
775
776 pending_outbound_buffer: VecDeque<Vec<u8>>,
777 pending_outbound_buffer_first_msg_offset: usize,
778 gossip_broadcast_buffer: VecDeque<MessageBuf>,
783 awaiting_write_event: bool,
784 sent_pause_read: bool,
787
788 pending_read_buffer: Vec<u8>,
789 pending_read_buffer_pos: usize,
790 pending_read_is_header: bool,
791
792 sync_status: InitSyncTracker,
793
794 msgs_sent_since_pong: usize,
795 awaiting_pong_timer_tick_intervals: i64,
796 received_message_since_timer_tick: bool,
797 sent_gossip_timestamp_filter: bool,
798
799 received_channel_announce_since_backlogged: bool,
804
805 inbound_connection: bool,
806
807 message_batch: Option<MessageBatch>,
808}
809
810impl Peer {
811 fn handshake_complete(&self) -> bool {
815 self.their_features.is_some()
816 }
817
818 fn should_forward_channel_announcement(&self, channel_id: u64) -> bool {
825 if !self.handshake_complete() {
826 return false;
827 }
828 if self.their_features.as_ref().unwrap().supports_gossip_queries()
829 && !self.sent_gossip_timestamp_filter
830 {
831 return false;
832 }
833 match self.sync_status {
834 InitSyncTracker::NoSyncRequested => true,
835 InitSyncTracker::ChannelsSyncing(i) => channel_id < i,
836 InitSyncTracker::NodesSyncing(_) => true,
837 }
838 }
839
840 fn should_forward_node_announcement(&self, node_id: NodeId) -> bool {
842 if !self.handshake_complete() {
843 return false;
844 }
845 if self.their_features.as_ref().unwrap().supports_gossip_queries()
846 && !self.sent_gossip_timestamp_filter
847 {
848 return false;
849 }
850 match self.sync_status {
851 InitSyncTracker::NoSyncRequested => true,
852 InitSyncTracker::ChannelsSyncing(_) => false,
853 InitSyncTracker::NodesSyncing(sync_node_id) => {
854 sync_node_id.as_slice() < node_id.as_slice()
855 },
856 }
857 }
858
859 fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool {
862 if !gossip_processing_backlogged {
863 self.received_channel_announce_since_backlogged = false;
864 }
865 self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
866 && (!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged)
867 }
868
869 fn should_buffer_gossip_backfill(&self) -> bool {
872 self.pending_outbound_buffer.is_empty()
873 && self.gossip_broadcast_buffer.is_empty()
874 && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
875 && self.handshake_complete()
876 }
877
878 fn should_buffer_onion_message(&self) -> bool {
881 self.pending_outbound_buffer.is_empty()
882 && self.handshake_complete()
883 && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
884 }
885
886 fn should_buffer_gossip_broadcast(&self) -> bool {
889 self.pending_outbound_buffer.is_empty()
890 && self.handshake_complete()
891 && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
892 }
893
894 fn buffer_full_drop_gossip_broadcast(&self) -> bool {
896 let total_outbound_buffered: usize =
897 self.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>()
898 + self.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>();
899
900 total_outbound_buffered > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP
901 }
902
903 fn set_their_node_id(&mut self, node_id: PublicKey) {
904 self.their_node_id = Some((node_id, NodeId::from_pubkey(&node_id)));
905 }
906}
907
908#[cfg(not(c_bindings))]
916pub type SimpleArcPeerManager<SD, M, T, F, C, L, CF, S> = PeerManager<
917 SD,
918 Arc<SimpleArcChannelManager<M, T, F, L>>,
919 Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, C, Arc<L>>>,
920 Arc<SimpleArcOnionMessenger<M, T, F, L>>,
921 Arc<L>,
922 IgnoringMessageHandler,
923 Arc<KeysManager>,
924 Arc<ChainMonitor<InMemorySigner, Arc<CF>, Arc<T>, Arc<F>, Arc<L>, Arc<S>, Arc<KeysManager>>>,
925>;
926
927#[cfg(not(c_bindings))]
936#[rustfmt::skip]
937pub type SimpleRefPeerManager<
938 'a, 'b, 'c, 'd, 'e, 'f, 'logger, 'h, 'i, 'j, 'graph, 'k, 'mr, SD, M, T, F, C, L
939> = PeerManager<
940 SD,
941 &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'graph, 'logger, 'i, 'mr, M, T, F, L>,
942 &'f P2PGossipSync<&'graph NetworkGraph<&'logger L>, C, &'logger L>,
943 &'h SimpleRefOnionMessenger<'a, 'b, 'c, 'd, 'e, 'graph, 'logger, 'i, 'j, 'k, M, T, F, L>,
944 &'logger L,
945 IgnoringMessageHandler,
946 &'c KeysManager,
947 &'j ChainMonitor<&'a M, C, &'b T, &'c F, &'logger L, &'c KeysManager, &'c KeysManager>,
948>;
949
950#[allow(missing_docs)]
957pub trait APeerManager {
958 type Descriptor: SocketDescriptor;
959 type CMT: ChannelMessageHandler + ?Sized;
960 type CM: Deref<Target = Self::CMT>;
961 type RMT: RoutingMessageHandler + ?Sized;
962 type RM: Deref<Target = Self::RMT>;
963 type OMT: OnionMessageHandler + ?Sized;
964 type OM: Deref<Target = Self::OMT>;
965 type LT: Logger + ?Sized;
966 type L: Deref<Target = Self::LT>;
967 type CMHT: CustomMessageHandler + ?Sized;
968 type CMH: Deref<Target = Self::CMHT>;
969 type NST: NodeSigner + ?Sized;
970 type NS: Deref<Target = Self::NST>;
971 type SMT: SendOnlyMessageHandler + ?Sized;
972 type SM: Deref<Target = Self::SMT>;
973 fn as_ref(
975 &self,
976 ) -> &PeerManager<
977 Self::Descriptor,
978 Self::CM,
979 Self::RM,
980 Self::OM,
981 Self::L,
982 Self::CMH,
983 Self::NS,
984 Self::SM,
985 >;
986}
987
988impl<
989 Descriptor: SocketDescriptor,
990 CM: Deref,
991 RM: Deref,
992 OM: Deref,
993 L: Deref,
994 CMH: Deref,
995 NS: Deref,
996 SM: Deref,
997 > APeerManager for PeerManager<Descriptor, CM, RM, OM, L, CMH, NS, SM>
998where
999 CM::Target: ChannelMessageHandler,
1000 RM::Target: RoutingMessageHandler,
1001 OM::Target: OnionMessageHandler,
1002 L::Target: Logger,
1003 CMH::Target: CustomMessageHandler,
1004 NS::Target: NodeSigner,
1005 SM::Target: SendOnlyMessageHandler,
1006{
1007 type Descriptor = Descriptor;
1008 type CMT = <CM as Deref>::Target;
1009 type CM = CM;
1010 type RMT = <RM as Deref>::Target;
1011 type RM = RM;
1012 type OMT = <OM as Deref>::Target;
1013 type OM = OM;
1014 type LT = <L as Deref>::Target;
1015 type L = L;
1016 type CMHT = <CMH as Deref>::Target;
1017 type CMH = CMH;
1018 type NST = <NS as Deref>::Target;
1019 type NS = NS;
1020 type SMT = <SM as Deref>::Target;
1021 type SM = SM;
1022 fn as_ref(&self) -> &PeerManager<Descriptor, CM, RM, OM, L, CMH, NS, SM> {
1023 self
1024 }
1025}
1026
1027pub struct PeerManager<
1047 Descriptor: SocketDescriptor,
1048 CM: Deref,
1049 RM: Deref,
1050 OM: Deref,
1051 L: Deref,
1052 CMH: Deref,
1053 NS: Deref,
1054 SM: Deref,
1055> where
1056 CM::Target: ChannelMessageHandler,
1057 RM::Target: RoutingMessageHandler,
1058 OM::Target: OnionMessageHandler,
1059 L::Target: Logger,
1060 CMH::Target: CustomMessageHandler,
1061 NS::Target: NodeSigner,
1062 SM::Target: SendOnlyMessageHandler,
1063{
1064 message_handler: MessageHandler<CM, RM, OM, CMH, SM>,
1065 peers: FairRwLock<HashMap<Descriptor, Mutex<Peer>>>,
1074 node_id_to_descriptor: Mutex<HashMap<PublicKey, Descriptor>>,
1079 event_processing_state: AtomicI32,
1091
1092 last_node_announcement_serial: AtomicU32,
1095
1096 ephemeral_key_midstate: Sha256Engine,
1097
1098 peer_counter: AtomicCounter,
1099
1100 gossip_processing_backlogged: AtomicBool,
1101 gossip_processing_backlog_lifted: AtomicBool,
1102
1103 node_signer: NS,
1104
1105 logger: L,
1106 secp_ctx: Secp256k1<secp256k1::SignOnly>,
1107}
1108
1109enum LogicalMessage<T: core::fmt::Debug + wire::Type + wire::TestEq> {
1110 FromWire(wire::Message<T>),
1111 CommitmentSignedBatch(ChannelId, Vec<msgs::CommitmentSigned>),
1112}
1113
1114enum MessageHandlingError {
1115 PeerHandleError(PeerHandleError),
1116 LightningError(LightningError),
1117}
1118
1119impl From<PeerHandleError> for MessageHandlingError {
1120 fn from(error: PeerHandleError) -> Self {
1121 MessageHandlingError::PeerHandleError(error)
1122 }
1123}
1124
1125impl From<LightningError> for MessageHandlingError {
1126 fn from(error: LightningError) -> Self {
1127 MessageHandlingError::LightningError(error)
1128 }
1129}
1130
1131macro_rules! encode_msg {
1132 ($msg: expr) => {{
1133 let mut buffer = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE));
1134 wire::write($msg, &mut buffer).unwrap();
1135 buffer.0
1136 }};
1137}
1138
1139impl<Descriptor: SocketDescriptor, CM: Deref, OM: Deref, L: Deref, NS: Deref, SM: Deref>
1140 PeerManager<Descriptor, CM, IgnoringMessageHandler, OM, L, IgnoringMessageHandler, NS, SM>
1141where
1142 CM::Target: ChannelMessageHandler,
1143 OM::Target: OnionMessageHandler,
1144 L::Target: Logger,
1145 NS::Target: NodeSigner,
1146 SM::Target: SendOnlyMessageHandler,
1147{
1148 pub fn new_channel_only(
1162 channel_message_handler: CM, onion_message_handler: OM, current_time: u32,
1163 ephemeral_random_data: &[u8; 32], logger: L, node_signer: NS,
1164 send_only_message_handler: SM,
1165 ) -> Self {
1166 Self::new(
1167 MessageHandler {
1168 chan_handler: channel_message_handler,
1169 route_handler: IgnoringMessageHandler {},
1170 onion_message_handler,
1171 custom_message_handler: IgnoringMessageHandler {},
1172 send_only_message_handler,
1173 },
1174 current_time,
1175 ephemeral_random_data,
1176 logger,
1177 node_signer,
1178 )
1179 }
1180}
1181
1182impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref, NS: Deref>
1183 PeerManager<
1184 Descriptor,
1185 ErroringMessageHandler,
1186 RM,
1187 IgnoringMessageHandler,
1188 L,
1189 IgnoringMessageHandler,
1190 NS,
1191 IgnoringMessageHandler,
1192 > where
1193 RM::Target: RoutingMessageHandler,
1194 L::Target: Logger,
1195 NS::Target: NodeSigner,
1196{
1197 pub fn new_routing_only(
1212 routing_message_handler: RM, current_time: u32, ephemeral_random_data: &[u8; 32],
1213 logger: L, node_signer: NS,
1214 ) -> Self {
1215 Self::new(
1216 MessageHandler {
1217 chan_handler: ErroringMessageHandler::new(),
1218 route_handler: routing_message_handler,
1219 onion_message_handler: IgnoringMessageHandler {},
1220 custom_message_handler: IgnoringMessageHandler {},
1221 send_only_message_handler: IgnoringMessageHandler {},
1222 },
1223 current_time,
1224 ephemeral_random_data,
1225 logger,
1226 node_signer,
1227 )
1228 }
1229}
1230
1231struct OptionalFromDebugger<'a>(&'a Option<(PublicKey, NodeId)>);
1236impl core::fmt::Display for OptionalFromDebugger<'_> {
1237 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
1238 if let Some((node_id, _)) = self.0 {
1239 write!(f, " from {}", node_id)
1240 } else {
1241 Ok(())
1242 }
1243 }
1244}
1245
1246fn filter_addresses(ip_address: Option<SocketAddress>) -> Option<SocketAddress> {
1250 match ip_address {
1251 Some(SocketAddress::TcpIpV4 { addr: [10, _, _, _], port: _ }) => None,
1253 Some(SocketAddress::TcpIpV4 { addr: [0, _, _, _], port: _ }) => None,
1255 Some(SocketAddress::TcpIpV4 { addr: [100, 64..=127, _, _], port: _ }) => None,
1257 Some(SocketAddress::TcpIpV4 { addr: [127, _, _, _], port: _ }) => None,
1259 Some(SocketAddress::TcpIpV4 { addr: [169, 254, _, _], port: _ }) => None,
1261 Some(SocketAddress::TcpIpV4 { addr: [172, 16..=31, _, _], port: _ }) => None,
1263 Some(SocketAddress::TcpIpV4 { addr: [192, 168, _, _], port: _ }) => None,
1265 Some(SocketAddress::TcpIpV4 { addr: [192, 88, 99, _], port: _ }) => None,
1267 Some(SocketAddress::TcpIpV6 {
1269 addr: [0x20..=0x3F, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _],
1270 port: _,
1271 }) => ip_address,
1272 Some(SocketAddress::TcpIpV6 { addr: _, port: _ }) => None,
1274 Some(..) => ip_address,
1275 None => None,
1276 }
1277}
1278
1279impl<
1280 Descriptor: SocketDescriptor,
1281 CM: Deref,
1282 RM: Deref,
1283 OM: Deref,
1284 L: Deref,
1285 CMH: Deref,
1286 NS: Deref,
1287 SM: Deref,
1288 > PeerManager<Descriptor, CM, RM, OM, L, CMH, NS, SM>
1289where
1290 CM::Target: ChannelMessageHandler,
1291 RM::Target: RoutingMessageHandler,
1292 OM::Target: OnionMessageHandler,
1293 L::Target: Logger,
1294 CMH::Target: CustomMessageHandler,
1295 NS::Target: NodeSigner,
1296 SM::Target: SendOnlyMessageHandler,
1297{
1298 pub fn new(
1308 message_handler: MessageHandler<CM, RM, OM, CMH, SM>, current_time: u32,
1309 ephemeral_random_data: &[u8; 32], logger: L, node_signer: NS,
1310 ) -> Self {
1311 let mut ephemeral_key_midstate = Sha256::engine();
1312 ephemeral_key_midstate.input(ephemeral_random_data);
1313
1314 let mut secp_ctx = Secp256k1::signing_only();
1315 let ephemeral_hash = Sha256::from_engine(ephemeral_key_midstate.clone()).to_byte_array();
1316 secp_ctx.seeded_randomize(&ephemeral_hash);
1317
1318 PeerManager {
1319 message_handler,
1320 peers: FairRwLock::new(new_hash_map()),
1321 node_id_to_descriptor: Mutex::new(new_hash_map()),
1322 event_processing_state: AtomicI32::new(0),
1323 ephemeral_key_midstate,
1324 peer_counter: AtomicCounter::new(),
1325 gossip_processing_backlogged: AtomicBool::new(false),
1326 gossip_processing_backlog_lifted: AtomicBool::new(false),
1327 last_node_announcement_serial: AtomicU32::new(current_time),
1328 logger,
1329 node_signer,
1330 secp_ctx,
1331 }
1332 }
1333
1334 pub fn list_peers(&self) -> Vec<PeerDetails> {
1337 let peers = self.peers.read().unwrap();
1338 let filter_fn = |peer_mutex: &Mutex<Peer>| {
1339 let p = peer_mutex.lock().unwrap();
1340 if !p.handshake_complete() {
1341 return None;
1342 }
1343 let details = PeerDetails {
1344 counterparty_node_id: p.their_node_id.unwrap().0,
1347 socket_address: p.their_socket_address.clone(),
1348 init_features: p.their_features.clone().unwrap(),
1351 is_inbound_connection: p.inbound_connection,
1352 };
1353 Some(details)
1354 };
1355 peers.values().filter_map(filter_fn).collect()
1356 }
1357
1358 pub fn peer_by_node_id(&self, their_node_id: &PublicKey) -> Option<PeerDetails> {
1362 let peers = self.peers.read().unwrap();
1363 peers.values().find_map(|peer_mutex| {
1364 let p = peer_mutex.lock().unwrap();
1365 if !p.handshake_complete() {
1366 return None;
1367 }
1368
1369 let counterparty_node_id = p.their_node_id.unwrap().0;
1372
1373 if counterparty_node_id != *their_node_id {
1374 return None;
1375 }
1376
1377 let details = PeerDetails {
1378 counterparty_node_id,
1379 socket_address: p.their_socket_address.clone(),
1380 init_features: p.their_features.clone().unwrap(),
1383 is_inbound_connection: p.inbound_connection,
1384 };
1385 Some(details)
1386 })
1387 }
1388
1389 fn get_ephemeral_key(&self) -> SecretKey {
1390 let mut ephemeral_hash = self.ephemeral_key_midstate.clone();
1391 let counter = self.peer_counter.next();
1392 ephemeral_hash.input(&counter.to_le_bytes());
1393 SecretKey::from_slice(&Sha256::from_engine(ephemeral_hash).to_byte_array())
1394 .expect("You broke SHA-256!")
1395 }
1396
1397 fn init_features(&self, their_node_id: PublicKey) -> InitFeatures {
1398 self.message_handler.chan_handler.provided_init_features(their_node_id)
1399 | self.message_handler.route_handler.provided_init_features(their_node_id)
1400 | self.message_handler.onion_message_handler.provided_init_features(their_node_id)
1401 | self.message_handler.custom_message_handler.provided_init_features(their_node_id)
1402 | self.message_handler.send_only_message_handler.provided_init_features(their_node_id)
1403 }
1404
1405 pub fn new_outbound_connection(
1421 &self, their_node_id: PublicKey, descriptor: Descriptor,
1422 remote_network_address: Option<SocketAddress>,
1423 ) -> Result<Vec<u8>, PeerHandleError> {
1424 let mut peer_encryptor =
1425 PeerChannelEncryptor::new_outbound(their_node_id.clone(), self.get_ephemeral_key());
1426 let res = peer_encryptor.get_act_one(&self.secp_ctx).to_vec();
1427 let pending_read_buffer = [0; 50].to_vec(); let mut peers = self.peers.write().unwrap();
1430 match peers.entry(descriptor) {
1431 hash_map::Entry::Occupied(_) => {
1432 debug_assert!(false, "PeerManager driver duplicated descriptors!");
1433 Err(PeerHandleError {})
1434 },
1435 hash_map::Entry::Vacant(e) => {
1436 e.insert(Mutex::new(Peer {
1437 channel_encryptor: peer_encryptor,
1438 their_node_id: None,
1439 their_features: None,
1440 their_socket_address: remote_network_address,
1441
1442 pending_outbound_buffer: VecDeque::new(),
1443 pending_outbound_buffer_first_msg_offset: 0,
1444 gossip_broadcast_buffer: VecDeque::new(),
1445 awaiting_write_event: false,
1446 sent_pause_read: false,
1447
1448 pending_read_buffer,
1449 pending_read_buffer_pos: 0,
1450 pending_read_is_header: false,
1451
1452 sync_status: InitSyncTracker::NoSyncRequested,
1453
1454 msgs_sent_since_pong: 0,
1455 awaiting_pong_timer_tick_intervals: 0,
1456 received_message_since_timer_tick: false,
1457 sent_gossip_timestamp_filter: false,
1458
1459 received_channel_announce_since_backlogged: false,
1460 inbound_connection: false,
1461
1462 message_batch: None,
1463 }));
1464 Ok(res)
1465 },
1466 }
1467 }
1468
1469 pub fn new_inbound_connection(
1485 &self, descriptor: Descriptor, remote_network_address: Option<SocketAddress>,
1486 ) -> Result<(), PeerHandleError> {
1487 let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.node_signer);
1488 let pending_read_buffer = [0; 50].to_vec(); let mut peers = self.peers.write().unwrap();
1491 match peers.entry(descriptor) {
1492 hash_map::Entry::Occupied(_) => {
1493 debug_assert!(false, "PeerManager driver duplicated descriptors!");
1494 Err(PeerHandleError {})
1495 },
1496 hash_map::Entry::Vacant(e) => {
1497 e.insert(Mutex::new(Peer {
1498 channel_encryptor: peer_encryptor,
1499 their_node_id: None,
1500 their_features: None,
1501 their_socket_address: remote_network_address,
1502
1503 pending_outbound_buffer: VecDeque::new(),
1504 pending_outbound_buffer_first_msg_offset: 0,
1505 gossip_broadcast_buffer: VecDeque::new(),
1506 awaiting_write_event: false,
1507 sent_pause_read: false,
1508
1509 pending_read_buffer,
1510 pending_read_buffer_pos: 0,
1511 pending_read_is_header: false,
1512
1513 sync_status: InitSyncTracker::NoSyncRequested,
1514
1515 msgs_sent_since_pong: 0,
1516 awaiting_pong_timer_tick_intervals: 0,
1517 received_message_since_timer_tick: false,
1518 sent_gossip_timestamp_filter: false,
1519
1520 received_channel_announce_since_backlogged: false,
1521 inbound_connection: true,
1522
1523 message_batch: None,
1524 }));
1525 Ok(())
1526 },
1527 }
1528 }
1529
1530 fn should_read_from(&self, peer: &mut Peer) -> bool {
1531 peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed))
1532 }
1533
1534 fn update_gossip_backlogged(&self) {
1535 let new_state = self.message_handler.route_handler.processing_queue_high();
1536 let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed);
1537 if prev_state && !new_state {
1538 self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed);
1539 }
1540 }
1541
1542 fn do_attempt_write_data(
1543 &self, descriptor: &mut Descriptor, peer: &mut Peer, mut force_one_write: bool,
1544 ) {
1545 force_one_write |= self.should_read_from(peer) == peer.sent_pause_read;
1550 while force_one_write || !peer.awaiting_write_event {
1551 if peer.should_buffer_onion_message() {
1552 if let Some((peer_node_id, _)) = peer.their_node_id {
1553 let handler = &self.message_handler.onion_message_handler;
1554 if let Some(next_onion_message) =
1555 handler.next_onion_message_for_peer(peer_node_id)
1556 {
1557 self.enqueue_message(peer, &next_onion_message);
1558 }
1559 }
1560 }
1561 if peer.should_buffer_gossip_broadcast() {
1562 if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
1563 peer.msgs_sent_since_pong += 1;
1564 peer.pending_outbound_buffer
1565 .push_back(peer.channel_encryptor.encrypt_buffer(msg));
1566 }
1567 }
1568 if peer.should_buffer_gossip_backfill() {
1569 match peer.sync_status {
1570 InitSyncTracker::NoSyncRequested => {},
1571 InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
1572 if let Some((announce, update_a_option, update_b_option)) =
1573 self.message_handler.route_handler.get_next_channel_announcement(c)
1574 {
1575 self.enqueue_message(peer, &announce);
1576 if let Some(update_a) = update_a_option {
1577 self.enqueue_message(peer, &update_a);
1578 }
1579 if let Some(update_b) = update_b_option {
1580 self.enqueue_message(peer, &update_b);
1581 }
1582 peer.sync_status = InitSyncTracker::ChannelsSyncing(
1583 announce.contents.short_channel_id + 1,
1584 );
1585 } else {
1586 peer.sync_status =
1587 InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
1588 }
1589 },
1590 InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
1591 let handler = &self.message_handler.route_handler;
1592 if let Some(msg) = handler.get_next_node_announcement(None) {
1593 self.enqueue_message(peer, &msg);
1594 peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
1595 } else {
1596 peer.sync_status = InitSyncTracker::NoSyncRequested;
1597 }
1598 },
1599 InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
1600 InitSyncTracker::NodesSyncing(sync_node_id) => {
1601 let handler = &self.message_handler.route_handler;
1602 if let Some(msg) = handler.get_next_node_announcement(Some(&sync_node_id)) {
1603 self.enqueue_message(peer, &msg);
1604 peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
1605 } else {
1606 peer.sync_status = InitSyncTracker::NoSyncRequested;
1607 }
1608 },
1609 }
1610 }
1611 if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
1612 self.maybe_send_extra_ping(peer);
1613 }
1614
1615 let should_read = self.should_read_from(peer);
1616 let next_buff = match peer.pending_outbound_buffer.front() {
1617 None => {
1618 if force_one_write {
1619 let data_sent = descriptor.send_data(&[], should_read);
1620 debug_assert_eq!(data_sent, 0, "Can't write more than no data");
1621 peer.sent_pause_read = !should_read;
1622 }
1623 return;
1624 },
1625 Some(buff) => buff,
1626 };
1627 force_one_write = false;
1628
1629 let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
1630 let data_sent = descriptor.send_data(pending, should_read);
1631 peer.sent_pause_read = !should_read;
1632 peer.pending_outbound_buffer_first_msg_offset += data_sent;
1633 if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
1634 peer.pending_outbound_buffer_first_msg_offset = 0;
1635 peer.pending_outbound_buffer.pop_front();
1636 const VEC_SIZE: usize = ::core::mem::size_of::<Vec<u8>>();
1637 let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE;
1638 let lots_of_slack = peer.pending_outbound_buffer.len()
1639 < peer.pending_outbound_buffer.capacity() / 2;
1640 if large_capacity && lots_of_slack {
1641 peer.pending_outbound_buffer.shrink_to_fit();
1642 }
1643 } else {
1644 peer.awaiting_write_event = true;
1645 }
1646 }
1647 }
1648
1649 pub fn write_buffer_space_avail(
1662 &self, descriptor: &mut Descriptor,
1663 ) -> Result<(), PeerHandleError> {
1664 let peers = self.peers.read().unwrap();
1665 match peers.get(descriptor) {
1666 None => {
1667 return Err(PeerHandleError {});
1671 },
1672 Some(peer_mutex) => {
1673 let mut peer = peer_mutex.lock().unwrap();
1674 peer.awaiting_write_event = false;
1675 self.do_attempt_write_data(descriptor, &mut peer, true);
1679 },
1680 };
1681 Ok(())
1682 }
1683
1684 pub fn read_event(
1700 &self, peer_descriptor: &mut Descriptor, data: &[u8],
1701 ) -> Result<(), PeerHandleError> {
1702 match self.do_read_event(peer_descriptor, data) {
1703 Ok(res) => Ok(res),
1704 Err(e) => {
1705 self.disconnect_event_internal(peer_descriptor, "of a protocol error");
1706 Err(e)
1707 },
1708 }
1709 }
1710
1711 fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
1713 let their_node_id = peer.their_node_id.map(|p| p.0);
1714 if let Some(node_id) = their_node_id {
1715 let logger = WithContext::from(&self.logger, their_node_id, None, None);
1716 if is_gossip_msg(message.type_id()) {
1717 log_gossip!(logger, "Enqueueing message {:?} to {}", message, node_id);
1718 } else {
1719 log_trace!(logger, "Enqueueing message {:?} to {}", message, node_id);
1720 }
1721 } else {
1722 debug_assert!(false, "node_id should be set by the time we send a message");
1723 }
1724 peer.msgs_sent_since_pong += 1;
1725 peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message));
1726 }
1727
1728 fn do_read_event(
1729 &self, peer_descriptor: &mut Descriptor, data: &[u8],
1730 ) -> Result<(), PeerHandleError> {
1731 let peers = self.peers.read().unwrap();
1732 let mut msgs_to_forward = Vec::new();
1733 let mut peer_node_id = None;
1734
1735 if let Some(peer_mutex) = peers.get(peer_descriptor) {
1736 let mut read_pos = 0;
1737 while read_pos < data.len() {
1738 macro_rules! try_potential_handleerror {
1739 ($peer: expr, $thing: expr) => {{
1740 let res = $thing;
1741 let logger = WithContext::from(&self.logger, peer_node_id.map(|(id, _)| id), None, None);
1742 match res {
1743 Ok(x) => x,
1744 Err(e) => {
1745 match e.action {
1746 msgs::ErrorAction::DisconnectPeer { .. } => {
1747 log_debug!(logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err);
1753 return Err(PeerHandleError { });
1754 },
1755 msgs::ErrorAction::DisconnectPeerWithWarning { .. } => {
1756 log_debug!(logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err);
1762 return Err(PeerHandleError { });
1763 },
1764 msgs::ErrorAction::IgnoreAndLog(level) => {
1765 log_given_level!(logger, level, "Error handling {}message{}; ignoring: {}",
1766 if level == Level::Gossip { "gossip " } else { "" },
1767 OptionalFromDebugger(&peer_node_id), e.err);
1768 continue
1769 },
1770 msgs::ErrorAction::IgnoreDuplicateGossip => continue, msgs::ErrorAction::IgnoreError => {
1772 log_debug!(logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err);
1773 continue;
1774 },
1775 msgs::ErrorAction::SendErrorMessage { msg } => {
1776 log_debug!(logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer_node_id), e.err);
1777 self.enqueue_message($peer, &msg);
1778 continue;
1779 },
1780 msgs::ErrorAction::SendWarningMessage { msg, log_level } => {
1781 log_given_level!(logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer_node_id), e.err);
1782 self.enqueue_message($peer, &msg);
1783 continue;
1784 },
1785 }
1786 }
1787 }
1788 }}
1789 }
1790
1791 let mut peer_lock = peer_mutex.lock().unwrap();
1792 let peer = &mut *peer_lock;
1793 let mut msg_to_handle = None;
1794 if peer_node_id.is_none() {
1795 peer_node_id.clone_from(&peer.their_node_id);
1796 }
1797
1798 assert!(peer.pending_read_buffer.len() > 0);
1799 assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos);
1800
1801 {
1802 let data_to_copy = cmp::min(
1803 peer.pending_read_buffer.len() - peer.pending_read_buffer_pos,
1804 data.len() - read_pos,
1805 );
1806 peer.pending_read_buffer
1807 [peer.pending_read_buffer_pos..peer.pending_read_buffer_pos + data_to_copy]
1808 .copy_from_slice(&data[read_pos..read_pos + data_to_copy]);
1809 read_pos += data_to_copy;
1810 peer.pending_read_buffer_pos += data_to_copy;
1811 }
1812
1813 if peer.pending_read_buffer_pos == peer.pending_read_buffer.len() {
1814 peer.pending_read_buffer_pos = 0;
1815
1816 macro_rules! insert_node_id {
1817 () => {
1818 let their_node_id = if let Some((node_id, _)) = peer.their_node_id {
1819 node_id
1820 } else {
1821 debug_assert!(false, "Should have a node_id to insert");
1822 return Err(PeerHandleError {});
1823 };
1824 let logger = WithContext::from(&self.logger, Some(their_node_id), None, None);
1825 match self.node_id_to_descriptor.lock().unwrap().entry(their_node_id) {
1826 hash_map::Entry::Occupied(e) => {
1827 log_trace!(logger, "Got second connection with {}, closing", their_node_id);
1828 peer.their_node_id = None;
1830 debug_assert!(peers.get(e.get()).is_some());
1834 return Err(PeerHandleError { })
1835 },
1836 hash_map::Entry::Vacant(entry) => {
1837 log_debug!(logger, "Finished noise handshake for connection with {}", their_node_id);
1838 entry.insert(peer_descriptor.clone())
1839 },
1840 };
1841 }
1842 }
1843
1844 let next_step = peer.channel_encryptor.get_noise_step();
1845 match next_step {
1846 NextNoiseStep::ActOne => {
1847 let res = peer.channel_encryptor.process_act_one_with_keys(
1848 &peer.pending_read_buffer[..],
1849 &self.node_signer,
1850 self.get_ephemeral_key(),
1851 &self.secp_ctx,
1852 );
1853 let act_two = try_potential_handleerror!(peer, res).to_vec();
1854 peer.pending_outbound_buffer.push_back(act_two);
1855 peer.pending_read_buffer = [0; 66].to_vec(); },
1857 NextNoiseStep::ActTwo => {
1858 let res = peer
1859 .channel_encryptor
1860 .process_act_two(&peer.pending_read_buffer[..], &self.node_signer);
1861 let (act_three, their_node_id) = try_potential_handleerror!(peer, res);
1862 peer.pending_outbound_buffer.push_back(act_three.to_vec());
1863 peer.pending_read_buffer = [0; 18].to_vec(); peer.pending_read_is_header = true;
1865
1866 peer.set_their_node_id(their_node_id);
1867 insert_node_id!();
1868 let features = self.init_features(their_node_id);
1869 let networks = self.message_handler.chan_handler.get_chain_hashes();
1870 let resp = msgs::Init {
1871 features,
1872 networks,
1873 remote_network_address: filter_addresses(
1874 peer.their_socket_address.clone(),
1875 ),
1876 };
1877 self.enqueue_message(peer, &resp);
1878 },
1879 NextNoiseStep::ActThree => {
1880 let res = peer
1881 .channel_encryptor
1882 .process_act_three(&peer.pending_read_buffer[..]);
1883 let their_node_id = try_potential_handleerror!(peer, res);
1884 peer.pending_read_buffer = [0; 18].to_vec(); peer.pending_read_is_header = true;
1886 peer.set_their_node_id(their_node_id);
1887 insert_node_id!();
1888 let features = self.init_features(their_node_id);
1889 let networks = self.message_handler.chan_handler.get_chain_hashes();
1890 let resp = msgs::Init {
1891 features,
1892 networks,
1893 remote_network_address: filter_addresses(
1894 peer.their_socket_address.clone(),
1895 ),
1896 };
1897 self.enqueue_message(peer, &resp);
1898 },
1899 NextNoiseStep::NoiseComplete => {
1900 if peer.pending_read_is_header {
1901 let res = peer
1902 .channel_encryptor
1903 .decrypt_length_header(&peer.pending_read_buffer[..]);
1904 let msg_len = try_potential_handleerror!(peer, res);
1905 if peer.pending_read_buffer.capacity() > 8192 {
1906 peer.pending_read_buffer = Vec::new();
1907 }
1908 peer.pending_read_buffer.resize(msg_len as usize + 16, 0);
1909 if msg_len < 2 {
1910 return Err(PeerHandleError {});
1912 }
1913 peer.pending_read_is_header = false;
1914 } else {
1915 debug_assert!(peer.pending_read_buffer.len() >= 2 + 16);
1916 let res = peer
1917 .channel_encryptor
1918 .decrypt_message(&mut peer.pending_read_buffer[..]);
1919 try_potential_handleerror!(peer, res);
1920
1921 let message_result = wire::read(
1922 &mut &peer.pending_read_buffer
1923 [..peer.pending_read_buffer.len() - 16],
1924 &*self.message_handler.custom_message_handler,
1925 );
1926
1927 if peer.pending_read_buffer.capacity() > 8192 {
1929 peer.pending_read_buffer = Vec::new();
1930 }
1931 peer.pending_read_buffer.resize(18, 0);
1932 peer.pending_read_is_header = true;
1933
1934 let their_node_id = peer.their_node_id.map(|p| p.0);
1935 let logger =
1936 WithContext::from(&self.logger, their_node_id, None, None);
1937 let message = match message_result {
1938 Ok(x) => x,
1939 Err(e) => {
1940 match e {
1941 (
1946 msgs::DecodeError::UnknownRequiredFeature,
1947 Some(ty),
1948 ) if is_gossip_msg(ty) => {
1949 log_gossip!(logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!");
1950 continue;
1951 },
1952 (msgs::DecodeError::UnsupportedCompression, _) => {
1953 log_gossip!(logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message");
1954 let channel_id = ChannelId::new_zero();
1955 let data = "Unsupported message compression: zlib"
1956 .to_owned();
1957 let msg = msgs::WarningMessage { channel_id, data };
1958 self.enqueue_message(peer, &msg);
1959 continue;
1960 },
1961 (_, Some(ty)) if is_gossip_msg(ty) => {
1962 log_gossip!(logger, "Got an invalid value while deserializing a gossip message");
1963 let channel_id = ChannelId::new_zero();
1964 let data = format!(
1965 "Unreadable/bogus gossip message of type {}",
1966 ty
1967 );
1968 let msg = msgs::WarningMessage { channel_id, data };
1969 self.enqueue_message(peer, &msg);
1970 continue;
1971 },
1972 (msgs::DecodeError::UnknownRequiredFeature, _) => {
1973 log_debug!(logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
1974 return Err(PeerHandleError {});
1975 },
1976 (msgs::DecodeError::UnknownVersion, _) => {
1977 return Err(PeerHandleError {})
1978 },
1979 (msgs::DecodeError::InvalidValue, _) => {
1980 log_debug!(logger, "Got an invalid value while deserializing message");
1981 return Err(PeerHandleError {});
1982 },
1983 (msgs::DecodeError::ShortRead, _) => {
1984 log_debug!(logger, "Deserialization failed due to shortness of message");
1985 return Err(PeerHandleError {});
1986 },
1987 (msgs::DecodeError::BadLengthDescriptor, _) => {
1988 return Err(PeerHandleError {})
1989 },
1990 (msgs::DecodeError::Io(_), _) => {
1991 return Err(PeerHandleError {})
1992 },
1993 (msgs::DecodeError::DangerousValue, _) => {
1994 return Err(PeerHandleError {})
1995 },
1996 }
1997 },
1998 };
1999
2000 msg_to_handle = Some(message);
2001 }
2002 },
2003 }
2004 }
2005
2006 if let Some(message) = msg_to_handle {
2007 match self.handle_message(&peer_mutex, peer_lock, message) {
2008 Err(handling_error) => match handling_error {
2009 MessageHandlingError::PeerHandleError(e) => return Err(e),
2010 MessageHandlingError::LightningError(e) => {
2011 try_potential_handleerror!(&mut peer_mutex.lock().unwrap(), Err(e));
2012 },
2013 },
2014 Ok(Some(msg)) => {
2015 msgs_to_forward.push(msg);
2016 },
2017 Ok(None) => {},
2018 }
2019 }
2020 }
2021 } else {
2022 return Err(PeerHandleError {});
2026 }
2027
2028 for msg in msgs_to_forward.drain(..) {
2029 self.forward_broadcast_msg(
2030 &*peers,
2031 &msg,
2032 peer_node_id.as_ref().map(|(pk, _)| pk),
2033 false,
2034 );
2035 }
2036
2037 Ok(())
2038 }
2039
2040 fn handle_message(
2044 &self, peer_mutex: &Mutex<Peer>, peer_lock: MutexGuard<Peer>,
2045 message: wire::Message<
2046 <<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage,
2047 >,
2048 ) -> Result<
2049 Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>,
2050 MessageHandlingError,
2051 > {
2052 let their_node_id = peer_lock
2053 .their_node_id
2054 .expect("We know the peer's public key by the time we receive messages")
2055 .0;
2056 let logger = WithContext::from(&self.logger, Some(their_node_id), None, None);
2057
2058 let unprocessed_message =
2059 self.do_handle_message_holding_peer_lock(peer_lock, message, their_node_id, &logger)?;
2060
2061 self.message_handler.chan_handler.message_received();
2062
2063 match unprocessed_message {
2064 Some(LogicalMessage::FromWire(message)) => self.do_handle_message_without_peer_lock(
2065 peer_mutex,
2066 message,
2067 their_node_id,
2068 &logger,
2069 ),
2070 Some(LogicalMessage::CommitmentSignedBatch(channel_id, batch)) => {
2071 log_trace!(
2072 logger,
2073 "Received commitment_signed batch {:?} from {}",
2074 batch,
2075 their_node_id,
2076 );
2077 let chan_handler = &self.message_handler.chan_handler;
2078 chan_handler.handle_commitment_signed_batch(their_node_id, channel_id, batch);
2079 return Ok(None);
2080 },
2081 None => Ok(None),
2082 }
2083 }
2084
2085 fn do_handle_message_holding_peer_lock<'a>(
2090 &self, mut peer_lock: MutexGuard<Peer>,
2091 message: wire::Message<
2092 <<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage,
2093 >,
2094 their_node_id: PublicKey, logger: &WithContext<'a, L>,
2095 ) -> Result<
2096 Option<
2097 LogicalMessage<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>,
2098 >,
2099 MessageHandlingError,
2100 > {
2101 peer_lock.received_message_since_timer_tick = true;
2102
2103 if let wire::Message::Init(msg) = message {
2105 if let Some(networks) = &msg.networks {
2107 let chan_handler = &self.message_handler.chan_handler;
2108 if let Some(our_chains) = chan_handler.get_chain_hashes() {
2109 let mut have_compatible_chains = false;
2110 'our_chains: for our_chain in our_chains.iter() {
2111 for their_chain in networks {
2112 if our_chain == their_chain {
2113 have_compatible_chains = true;
2114 break 'our_chains;
2115 }
2116 }
2117 }
2118 if !have_compatible_chains {
2119 log_debug!(logger, "Peer does not support any of our supported chains");
2120 return Err(PeerHandleError {}.into());
2121 }
2122 }
2123 }
2124
2125 let our_features = self.init_features(their_node_id);
2126 if msg.features.requires_unknown_bits_from(&our_features) {
2127 log_debug!(
2128 logger,
2129 "Peer {} requires features unknown to us: {:?}",
2130 their_node_id,
2131 msg.features.required_unknown_bits_from(&our_features)
2132 );
2133 return Err(PeerHandleError {}.into());
2134 }
2135
2136 if our_features.requires_unknown_bits_from(&msg.features) {
2137 log_debug!(
2138 logger,
2139 "We require features unknown to our peer {}: {:?}",
2140 their_node_id,
2141 our_features.required_unknown_bits_from(&msg.features)
2142 );
2143 return Err(PeerHandleError {}.into());
2144 }
2145
2146 if peer_lock.their_features.is_some() {
2147 return Err(PeerHandleError {}.into());
2148 }
2149
2150 log_info!(
2151 logger,
2152 "Received peer Init message from {}: {}",
2153 their_node_id,
2154 msg.features
2155 );
2156
2157 if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() {
2159 peer_lock.sync_status = InitSyncTracker::ChannelsSyncing(0);
2160 }
2161
2162 let inbound = peer_lock.inbound_connection;
2163 let route_handler = &self.message_handler.route_handler;
2164 if let Err(()) = route_handler.peer_connected(their_node_id, &msg, inbound) {
2165 log_debug!(
2166 logger,
2167 "Route Handler decided we couldn't communicate with peer {}",
2168 their_node_id,
2169 );
2170 return Err(PeerHandleError {}.into());
2171 }
2172 let chan_handler = &self.message_handler.chan_handler;
2173 if let Err(()) = chan_handler.peer_connected(their_node_id, &msg, inbound) {
2174 log_debug!(
2175 logger,
2176 "Channel Handler decided we couldn't communicate with peer {}",
2177 their_node_id,
2178 );
2179 self.message_handler.route_handler.peer_disconnected(their_node_id);
2180 return Err(PeerHandleError {}.into());
2181 }
2182 let onion_message_handler = &self.message_handler.onion_message_handler;
2183 if let Err(()) = onion_message_handler.peer_connected(their_node_id, &msg, inbound) {
2184 log_debug!(
2185 logger,
2186 "Onion Message Handler decided we couldn't communicate with peer {}",
2187 their_node_id,
2188 );
2189 self.message_handler.route_handler.peer_disconnected(their_node_id);
2190 self.message_handler.chan_handler.peer_disconnected(their_node_id);
2191 return Err(PeerHandleError {}.into());
2192 }
2193 let custom_handler = &self.message_handler.custom_message_handler;
2194 if let Err(()) = custom_handler.peer_connected(their_node_id, &msg, inbound) {
2195 log_debug!(
2196 logger,
2197 "Custom Message Handler decided we couldn't communicate with peer {}",
2198 their_node_id,
2199 );
2200 self.message_handler.route_handler.peer_disconnected(their_node_id);
2201 self.message_handler.chan_handler.peer_disconnected(their_node_id);
2202 self.message_handler.onion_message_handler.peer_disconnected(their_node_id);
2203 return Err(PeerHandleError {}.into());
2204 }
2205 let sends_handler = &self.message_handler.send_only_message_handler;
2206 if let Err(()) = sends_handler.peer_connected(their_node_id, &msg, inbound) {
2207 log_debug!(
2208 logger,
2209 "Sending-Only Message Handler decided we couldn't communicate with peer {}",
2210 their_node_id,
2211 );
2212 self.message_handler.route_handler.peer_disconnected(their_node_id);
2213 self.message_handler.chan_handler.peer_disconnected(their_node_id);
2214 self.message_handler.onion_message_handler.peer_disconnected(their_node_id);
2215 self.message_handler.custom_message_handler.peer_disconnected(their_node_id);
2216 return Err(PeerHandleError {}.into());
2217 }
2218
2219 peer_lock.awaiting_pong_timer_tick_intervals = 0;
2220 peer_lock.their_features = Some(msg.features);
2221 return Ok(None);
2222 } else if peer_lock.their_features.is_none() {
2223 log_debug!(logger, "Peer {} sent non-Init first message", their_node_id);
2224 return Err(PeerHandleError {}.into());
2225 }
2226
2227 if let wire::Message::StartBatch(msg) = message {
2230 if peer_lock.message_batch.is_some() {
2231 let error = format!(
2232 "Peer {} sent start_batch for channel {} before previous batch completed",
2233 their_node_id, &msg.channel_id
2234 );
2235 log_debug!(logger, "{}", error);
2236 return Err(LightningError {
2237 err: error.clone(),
2238 action: msgs::ErrorAction::DisconnectPeerWithWarning {
2239 msg: msgs::WarningMessage { channel_id: msg.channel_id, data: error },
2240 },
2241 }
2242 .into());
2243 }
2244
2245 let batch_size = msg.batch_size as usize;
2246 if batch_size <= 1 {
2247 let error = format!(
2248 "Peer {} sent start_batch for channel {} not strictly greater than 1",
2249 their_node_id, &msg.channel_id
2250 );
2251 log_debug!(logger, "{}", error);
2252 return Err(LightningError {
2253 err: error.clone(),
2254 action: msgs::ErrorAction::SendWarningMessage {
2255 msg: msgs::WarningMessage { channel_id: msg.channel_id, data: error },
2256 log_level: Level::Debug,
2257 },
2258 }
2259 .into());
2260 }
2261
2262 const BATCH_SIZE_LIMIT: usize = 20;
2263 if batch_size > BATCH_SIZE_LIMIT {
2264 let error = format!(
2265 "Peer {} sent start_batch for channel {} exceeding the limit",
2266 their_node_id, &msg.channel_id
2267 );
2268 log_debug!(logger, "{}", error);
2269 return Err(LightningError {
2270 err: error.clone(),
2271 action: msgs::ErrorAction::DisconnectPeerWithWarning {
2272 msg: msgs::WarningMessage { channel_id: msg.channel_id, data: error },
2273 },
2274 }
2275 .into());
2276 }
2277
2278 let messages = match msg.message_type {
2279 Some(message_type) if message_type == msgs::CommitmentSigned::TYPE => {
2280 let messages = Vec::with_capacity(batch_size);
2281 MessageBatchImpl::CommitmentSigned(messages)
2282 },
2283 _ => {
2284 log_debug!(
2285 logger,
2286 "Peer {} sent start_batch for channel {} without a known message type; ignoring",
2287 their_node_id,
2288 &msg.channel_id,
2289 );
2290 return Ok(None);
2291 },
2292 };
2293
2294 let message_batch = MessageBatch { channel_id: msg.channel_id, batch_size, messages };
2295 peer_lock.message_batch = Some(message_batch);
2296
2297 return Ok(None);
2298 }
2299
2300 if let wire::Message::CommitmentSigned(msg) = message {
2301 if let Some(message_batch) = &mut peer_lock.message_batch {
2302 let MessageBatchImpl::CommitmentSigned(ref mut messages) =
2303 &mut message_batch.messages;
2304
2305 if msg.channel_id != message_batch.channel_id {
2306 let error = format!("Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", their_node_id, message_batch.channel_id, &msg.channel_id);
2307 log_debug!(logger, "{}", error);
2308 return Err(LightningError {
2309 err: error.clone(),
2310 action: msgs::ErrorAction::DisconnectPeerWithWarning {
2311 msg: msgs::WarningMessage { channel_id: msg.channel_id, data: error },
2312 },
2313 }
2314 .into());
2315 }
2316
2317 messages.push(msg);
2318
2319 if messages.len() == message_batch.batch_size {
2320 let MessageBatch { channel_id, batch_size: _, messages } =
2321 peer_lock.message_batch.take().expect("batch should have been inserted");
2322 let MessageBatchImpl::CommitmentSigned(batch) = messages;
2323
2324 return Ok(Some(LogicalMessage::CommitmentSignedBatch(channel_id, batch)));
2325 } else {
2326 return Ok(None);
2327 }
2328 } else {
2329 return Ok(Some(LogicalMessage::FromWire(wire::Message::CommitmentSigned(msg))));
2330 }
2331 } else if let Some(message_batch) = &peer_lock.message_batch {
2332 match message_batch.messages {
2333 MessageBatchImpl::CommitmentSigned(_) => {
2334 log_debug!(
2335 logger,
2336 "Peer {} sent an unexpected message for a commitment_signed batch",
2337 their_node_id,
2338 );
2339 },
2340 }
2341
2342 return Err(PeerHandleError {}.into());
2343 }
2344
2345 if let wire::Message::GossipTimestampFilter(_msg) = message {
2346 if peer_lock.their_features.as_ref().unwrap().supports_gossip_queries()
2349 && !peer_lock.sent_gossip_timestamp_filter
2350 {
2351 peer_lock.sent_gossip_timestamp_filter = true;
2352
2353 #[allow(unused_mut)]
2354 let mut should_do_full_sync = true;
2355 #[cfg(feature = "std")]
2356 {
2357 use std::time::{SystemTime, UNIX_EPOCH};
2360 let full_sync_threshold = SystemTime::now()
2361 .duration_since(UNIX_EPOCH)
2362 .expect("Time must be > 1970")
2363 .as_secs() - 6 * 3600;
2364 if (_msg.first_timestamp as u64) > full_sync_threshold {
2365 should_do_full_sync = false;
2366 }
2367 }
2368 if should_do_full_sync {
2369 peer_lock.sync_status = InitSyncTracker::ChannelsSyncing(0);
2370 } else {
2371 peer_lock.sync_status = InitSyncTracker::NoSyncRequested;
2372 }
2373 }
2374 return Ok(None);
2375 }
2376
2377 if let wire::Message::ChannelAnnouncement(ref _msg) = message {
2378 peer_lock.received_channel_announce_since_backlogged = true;
2379 }
2380
2381 Ok(Some(LogicalMessage::FromWire(message)))
2382 }
2383
2384 fn do_handle_message_without_peer_lock<'a>(
2388 &self, peer_mutex: &Mutex<Peer>,
2389 message: wire::Message<
2390 <<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage,
2391 >,
2392 their_node_id: PublicKey, logger: &WithContext<'a, L>,
2393 ) -> Result<
2394 Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>,
2395 MessageHandlingError,
2396 > {
2397 if is_gossip_msg(message.type_id()) {
2398 log_gossip!(logger, "Received message {:?} from {}", message, their_node_id);
2399 } else {
2400 log_trace!(logger, "Received message {:?} from {}", message, their_node_id);
2401 }
2402
2403 let mut should_forward = None;
2404
2405 match message {
2406 wire::Message::Init(_) => {
2408 },
2410 wire::Message::GossipTimestampFilter(_) => {
2411 },
2413 wire::Message::Error(msg) => {
2414 log_debug!(
2415 logger,
2416 "Got Err message from {}: {}",
2417 their_node_id,
2418 PrintableString(&msg.data)
2419 );
2420 self.message_handler.chan_handler.handle_error(their_node_id, &msg);
2421 if msg.channel_id.is_zero() {
2422 return Err(PeerHandleError {}.into());
2423 }
2424 },
2425 wire::Message::Warning(msg) => {
2426 log_debug!(
2427 logger,
2428 "Got warning message from {}: {}",
2429 their_node_id,
2430 PrintableString(&msg.data)
2431 );
2432 },
2433
2434 wire::Message::Ping(msg) => {
2435 if msg.ponglen < 65532 {
2436 let resp = msgs::Pong { byteslen: msg.ponglen };
2437 self.enqueue_message(&mut *peer_mutex.lock().unwrap(), &resp);
2438 }
2439 },
2440 wire::Message::Pong(_msg) => {
2441 let mut peer_lock = peer_mutex.lock().unwrap();
2442 peer_lock.awaiting_pong_timer_tick_intervals = 0;
2443 peer_lock.msgs_sent_since_pong = 0;
2444 },
2445
2446 wire::Message::StartBatch(_msg) => {
2448 debug_assert!(false);
2449 },
2450 wire::Message::OpenChannel(msg) => {
2451 self.message_handler.chan_handler.handle_open_channel(their_node_id, &msg);
2452 },
2453 wire::Message::OpenChannelV2(_msg) => {
2454 self.message_handler.chan_handler.handle_open_channel_v2(their_node_id, &_msg);
2455 },
2456 wire::Message::AcceptChannel(msg) => {
2457 self.message_handler.chan_handler.handle_accept_channel(their_node_id, &msg);
2458 },
2459 wire::Message::AcceptChannelV2(msg) => {
2460 self.message_handler.chan_handler.handle_accept_channel_v2(their_node_id, &msg);
2461 },
2462
2463 wire::Message::FundingCreated(msg) => {
2464 self.message_handler.chan_handler.handle_funding_created(their_node_id, &msg);
2465 },
2466 wire::Message::FundingSigned(msg) => {
2467 self.message_handler.chan_handler.handle_funding_signed(their_node_id, &msg);
2468 },
2469 wire::Message::ChannelReady(msg) => {
2470 self.message_handler.chan_handler.handle_channel_ready(their_node_id, &msg);
2471 },
2472 wire::Message::PeerStorage(msg) => {
2473 self.message_handler.chan_handler.handle_peer_storage(their_node_id, msg);
2474 },
2475 wire::Message::PeerStorageRetrieval(msg) => {
2476 self.message_handler.chan_handler.handle_peer_storage_retrieval(their_node_id, msg);
2477 },
2478
2479 wire::Message::Stfu(msg) => {
2481 self.message_handler.chan_handler.handle_stfu(their_node_id, &msg);
2482 },
2483
2484 wire::Message::SpliceInit(msg) => {
2486 self.message_handler.chan_handler.handle_splice_init(their_node_id, &msg);
2487 },
2488 wire::Message::SpliceAck(msg) => {
2489 self.message_handler.chan_handler.handle_splice_ack(their_node_id, &msg);
2490 },
2491 wire::Message::SpliceLocked(msg) => {
2492 self.message_handler.chan_handler.handle_splice_locked(their_node_id, &msg);
2493 },
2494
2495 wire::Message::TxAddInput(msg) => {
2497 self.message_handler.chan_handler.handle_tx_add_input(their_node_id, &msg);
2498 },
2499 wire::Message::TxAddOutput(msg) => {
2500 self.message_handler.chan_handler.handle_tx_add_output(their_node_id, &msg);
2501 },
2502 wire::Message::TxRemoveInput(msg) => {
2503 self.message_handler.chan_handler.handle_tx_remove_input(their_node_id, &msg);
2504 },
2505 wire::Message::TxRemoveOutput(msg) => {
2506 self.message_handler.chan_handler.handle_tx_remove_output(their_node_id, &msg);
2507 },
2508 wire::Message::TxComplete(msg) => {
2509 self.message_handler.chan_handler.handle_tx_complete(their_node_id, &msg);
2510 },
2511 wire::Message::TxSignatures(msg) => {
2512 self.message_handler.chan_handler.handle_tx_signatures(their_node_id, &msg);
2513 },
2514 wire::Message::TxInitRbf(msg) => {
2515 self.message_handler.chan_handler.handle_tx_init_rbf(their_node_id, &msg);
2516 },
2517 wire::Message::TxAckRbf(msg) => {
2518 self.message_handler.chan_handler.handle_tx_ack_rbf(their_node_id, &msg);
2519 },
2520 wire::Message::TxAbort(msg) => {
2521 self.message_handler.chan_handler.handle_tx_abort(their_node_id, &msg);
2522 },
2523
2524 wire::Message::Shutdown(msg) => {
2525 self.message_handler.chan_handler.handle_shutdown(their_node_id, &msg);
2526 },
2527 wire::Message::ClosingSigned(msg) => {
2528 self.message_handler.chan_handler.handle_closing_signed(their_node_id, &msg);
2529 },
2530 #[cfg(simple_close)]
2531 wire::Message::ClosingComplete(msg) => {
2532 self.message_handler.chan_handler.handle_closing_complete(their_node_id, msg);
2533 },
2534 #[cfg(simple_close)]
2535 wire::Message::ClosingSig(msg) => {
2536 self.message_handler.chan_handler.handle_closing_sig(their_node_id, msg);
2537 },
2538
2539 wire::Message::UpdateAddHTLC(msg) => {
2541 self.message_handler.chan_handler.handle_update_add_htlc(their_node_id, &msg);
2542 },
2543 wire::Message::UpdateFulfillHTLC(msg) => {
2544 self.message_handler.chan_handler.handle_update_fulfill_htlc(their_node_id, msg);
2545 },
2546 wire::Message::UpdateFailHTLC(msg) => {
2547 self.message_handler.chan_handler.handle_update_fail_htlc(their_node_id, &msg);
2548 },
2549 wire::Message::UpdateFailMalformedHTLC(msg) => {
2550 let chan_handler = &self.message_handler.chan_handler;
2551 chan_handler.handle_update_fail_malformed_htlc(their_node_id, &msg);
2552 },
2553
2554 wire::Message::CommitmentSigned(msg) => {
2555 self.message_handler.chan_handler.handle_commitment_signed(their_node_id, &msg);
2556 },
2557 wire::Message::RevokeAndACK(msg) => {
2558 self.message_handler.chan_handler.handle_revoke_and_ack(their_node_id, &msg);
2559 },
2560 wire::Message::UpdateFee(msg) => {
2561 self.message_handler.chan_handler.handle_update_fee(their_node_id, &msg);
2562 },
2563 wire::Message::ChannelReestablish(msg) => {
2564 self.message_handler.chan_handler.handle_channel_reestablish(their_node_id, &msg);
2565 },
2566
2567 wire::Message::AnnouncementSignatures(msg) => {
2569 let chan_handler = &self.message_handler.chan_handler;
2570 chan_handler.handle_announcement_signatures(their_node_id, &msg);
2571 },
2572 wire::Message::ChannelAnnouncement(msg) => {
2573 let route_handler = &self.message_handler.route_handler;
2574 if route_handler
2575 .handle_channel_announcement(Some(their_node_id), &msg)
2576 .map_err(|e| -> MessageHandlingError { e.into() })?
2577 {
2578 should_forward = Some(wire::Message::ChannelAnnouncement(msg));
2579 }
2580 self.update_gossip_backlogged();
2581 },
2582 wire::Message::NodeAnnouncement(msg) => {
2583 let route_handler = &self.message_handler.route_handler;
2584 if route_handler
2585 .handle_node_announcement(Some(their_node_id), &msg)
2586 .map_err(|e| -> MessageHandlingError { e.into() })?
2587 {
2588 should_forward = Some(wire::Message::NodeAnnouncement(msg));
2589 }
2590 self.update_gossip_backlogged();
2591 },
2592 wire::Message::ChannelUpdate(msg) => {
2593 let chan_handler = &self.message_handler.chan_handler;
2594 chan_handler.handle_channel_update(their_node_id, &msg);
2595
2596 let route_handler = &self.message_handler.route_handler;
2597 if route_handler
2598 .handle_channel_update(Some(their_node_id), &msg)
2599 .map_err(|e| -> MessageHandlingError { e.into() })?
2600 {
2601 should_forward = Some(wire::Message::ChannelUpdate(msg));
2602 }
2603 self.update_gossip_backlogged();
2604 },
2605 wire::Message::QueryShortChannelIds(msg) => {
2606 let route_handler = &self.message_handler.route_handler;
2607 route_handler.handle_query_short_channel_ids(their_node_id, msg)?;
2608 },
2609 wire::Message::ReplyShortChannelIdsEnd(msg) => {
2610 let route_handler = &self.message_handler.route_handler;
2611 route_handler.handle_reply_short_channel_ids_end(their_node_id, msg)?;
2612 },
2613 wire::Message::QueryChannelRange(msg) => {
2614 let route_handler = &self.message_handler.route_handler;
2615 route_handler.handle_query_channel_range(their_node_id, msg)?;
2616 },
2617 wire::Message::ReplyChannelRange(msg) => {
2618 let route_handler = &self.message_handler.route_handler;
2619 route_handler.handle_reply_channel_range(their_node_id, msg)?;
2620 },
2621
2622 wire::Message::OnionMessage(msg) => {
2624 let onion_message_handler = &self.message_handler.onion_message_handler;
2625 onion_message_handler.handle_onion_message(their_node_id, &msg);
2626 },
2627
2628 wire::Message::Unknown(type_id) if message.is_even() => {
2630 log_debug!(
2631 logger,
2632 "Received unknown even message of type {}, disconnecting peer!",
2633 type_id
2634 );
2635 return Err(PeerHandleError {}.into());
2636 },
2637 wire::Message::Unknown(type_id) => {
2638 log_trace!(logger, "Received unknown odd message of type {}, ignoring", type_id);
2639 },
2640 wire::Message::Custom(custom) => {
2641 let custom_message_handler = &self.message_handler.custom_message_handler;
2642 custom_message_handler.handle_custom_message(custom, their_node_id)?;
2643 },
2644 };
2645 Ok(should_forward)
2646 }
2647
2648 fn forward_broadcast_msg(
2655 &self, peers: &HashMap<Descriptor, Mutex<Peer>>,
2656 msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>,
2657 except_node: Option<&PublicKey>, allow_large_buffer: bool,
2658 ) {
2659 match msg {
2660 wire::Message::ChannelAnnouncement(ref msg) => {
2661 log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
2662 let encoded_msg = encode_msg!(msg);
2663
2664 for (_, peer_mutex) in peers.iter() {
2665 let mut peer = peer_mutex.lock().unwrap();
2666 if !peer.handshake_complete()
2667 || !peer.should_forward_channel_announcement(msg.contents.short_channel_id)
2668 {
2669 continue;
2670 }
2671 debug_assert!(peer.their_node_id.is_some());
2672 debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
2673 let their_node_id = peer.their_node_id.map(|p| p.0);
2674 let logger = WithContext::from(&self.logger, their_node_id, None, None);
2675 if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
2676 log_gossip!(
2677 logger,
2678 "Skipping broadcast message to {:?} as its outbound buffer is full",
2679 peer.their_node_id
2680 );
2681 continue;
2682 }
2683 if let Some((_, their_node_id)) = peer.their_node_id {
2684 if their_node_id == msg.contents.node_id_1
2685 || their_node_id == msg.contents.node_id_2
2686 {
2687 continue;
2688 }
2689 }
2690 if except_node.is_some()
2691 && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node
2692 {
2693 continue;
2694 }
2695 let encoded_message = MessageBuf::from_encoded(&encoded_msg);
2696 peer.gossip_broadcast_buffer.push_back(encoded_message);
2697 }
2698 },
2699 wire::Message::NodeAnnouncement(ref msg) => {
2700 log_gossip!(
2701 self.logger,
2702 "Sending message to all peers except {:?} or the announced node: {:?}",
2703 except_node,
2704 msg
2705 );
2706 let encoded_msg = encode_msg!(msg);
2707
2708 for (_, peer_mutex) in peers.iter() {
2709 let mut peer = peer_mutex.lock().unwrap();
2710 if !peer.handshake_complete()
2711 || !peer.should_forward_node_announcement(msg.contents.node_id)
2712 {
2713 continue;
2714 }
2715 debug_assert!(peer.their_node_id.is_some());
2716 debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
2717 let their_node_id = peer.their_node_id.map(|p| p.0);
2718 let logger = WithContext::from(&self.logger, their_node_id, None, None);
2719 if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
2720 log_gossip!(
2721 logger,
2722 "Skipping broadcast message to {:?} as its outbound buffer is full",
2723 peer.their_node_id
2724 );
2725 continue;
2726 }
2727 if let Some((_, their_node_id)) = peer.their_node_id {
2728 if their_node_id == msg.contents.node_id {
2729 continue;
2730 }
2731 }
2732 if except_node.is_some()
2733 && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node
2734 {
2735 continue;
2736 }
2737 let encoded_message = MessageBuf::from_encoded(&encoded_msg);
2738 peer.gossip_broadcast_buffer.push_back(encoded_message);
2739 }
2740 },
2741 wire::Message::ChannelUpdate(ref msg) => {
2742 log_gossip!(
2743 self.logger,
2744 "Sending message to all peers except {:?}: {:?}",
2745 except_node,
2746 msg
2747 );
2748 let encoded_msg = encode_msg!(msg);
2749
2750 for (_, peer_mutex) in peers.iter() {
2751 let mut peer = peer_mutex.lock().unwrap();
2752 if !peer.handshake_complete()
2753 || !peer.should_forward_channel_announcement(msg.contents.short_channel_id)
2754 {
2755 continue;
2756 }
2757 debug_assert!(peer.their_node_id.is_some());
2758 debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
2759 let their_node_id = peer.their_node_id.map(|p| p.0);
2760 let logger = WithContext::from(&self.logger, their_node_id, None, None);
2761 if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
2762 log_gossip!(
2763 logger,
2764 "Skipping broadcast message to {:?} as its outbound buffer is full",
2765 peer.their_node_id
2766 );
2767 continue;
2768 }
2769 if except_node.is_some()
2770 && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node
2771 {
2772 continue;
2773 }
2774 let encoded_message = MessageBuf::from_encoded(&encoded_msg);
2775 peer.gossip_broadcast_buffer.push_back(encoded_message);
2776 }
2777 },
2778 _ => {
2779 debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages")
2780 },
2781 }
2782 }
2783
2784 pub fn process_events(&self) {
2807 if self.event_processing_state.fetch_add(1, Ordering::AcqRel) > 0 {
2808 return;
2811 }
2812
2813 loop {
2814 self.update_gossip_backlogged();
2815 let flush_read_disabled =
2816 self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
2817
2818 let mut peers_to_disconnect = new_hash_map();
2819
2820 {
2821 let peers_lock = self.peers.read().unwrap();
2822
2823 let peers = &*peers_lock;
2824 macro_rules! get_peer_for_forwarding {
2825 ($node_id: expr) => {{
2826 if peers_to_disconnect.get($node_id).is_some() {
2827 None
2829 } else {
2830 let descriptor_opt =
2831 self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
2832 match descriptor_opt {
2833 Some(descriptor) => match peers.get(&descriptor) {
2834 Some(peer_mutex) => {
2835 let peer_lock = peer_mutex.lock().unwrap();
2836 if !peer_lock.handshake_complete() {
2837 None
2838 } else {
2839 Some(peer_lock)
2840 }
2841 },
2842 None => {
2843 debug_assert!(false, "Inconsistent peers set state!");
2844 None
2845 },
2846 },
2847 None => None,
2848 }
2849 }
2850 }};
2851 }
2852
2853 let route_handler = &self.message_handler.route_handler;
2854 let chan_handler = &self.message_handler.chan_handler;
2855 let onion_message_handler = &self.message_handler.onion_message_handler;
2856 let custom_message_handler = &self.message_handler.custom_message_handler;
2857 let send_only_message_handler = &self.message_handler.send_only_message_handler;
2858
2859 let mut handle_event = |event, from_chan_handler| {
2862 match event {
2863 MessageSendEvent::SendPeerStorage { ref node_id, ref msg } => {
2864 log_debug!(
2865 WithContext::from(&self.logger, Some(*node_id), None, None),
2866 "Handling SendPeerStorage event in peer_handler for {}",
2867 node_id,
2868 );
2869 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2870 },
2871 MessageSendEvent::SendPeerStorageRetrieval { ref node_id, ref msg } => {
2872 log_debug!(
2873 WithContext::from(&self.logger, Some(*node_id), None, None),
2874 "Handling SendPeerStorageRetrieval event in peer_handler for {}",
2875 node_id,
2876 );
2877 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2878 },
2879 MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
2880 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
2881 node_id,
2882 &msg.common_fields.temporary_channel_id);
2883 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2884 },
2885 MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
2886 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
2887 node_id,
2888 &msg.common_fields.temporary_channel_id);
2889 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2890 },
2891 MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
2892 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
2893 node_id,
2894 &msg.common_fields.temporary_channel_id);
2895 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2896 },
2897 MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
2898 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
2899 node_id,
2900 &msg.common_fields.temporary_channel_id);
2901 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2902 },
2903 MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
2904 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
2905 node_id,
2906 &msg.temporary_channel_id,
2907 ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index));
2908 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2911 },
2912 MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
2913 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
2914 node_id,
2915 &msg.channel_id);
2916 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2917 },
2918 MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
2919 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
2920 node_id,
2921 &msg.channel_id);
2922 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2923 },
2924 MessageSendEvent::SendStfu { ref node_id, ref msg } => {
2925 let logger = WithContext::from(
2926 &self.logger,
2927 Some(*node_id),
2928 Some(msg.channel_id),
2929 None,
2930 );
2931 log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
2932 node_id,
2933 &msg.channel_id);
2934 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2935 },
2936 MessageSendEvent::SendSpliceInit { ref node_id, ref msg } => {
2937 let logger = WithContext::from(
2938 &self.logger,
2939 Some(*node_id),
2940 Some(msg.channel_id),
2941 None,
2942 );
2943 log_debug!(logger, "Handling SendSpliceInit event in peer_handler for node {} for channel {}",
2944 node_id,
2945 &msg.channel_id);
2946 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2947 },
2948 MessageSendEvent::SendSpliceAck { ref node_id, ref msg } => {
2949 let logger = WithContext::from(
2950 &self.logger,
2951 Some(*node_id),
2952 Some(msg.channel_id),
2953 None,
2954 );
2955 log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
2956 node_id,
2957 &msg.channel_id);
2958 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2959 },
2960 MessageSendEvent::SendSpliceLocked { ref node_id, ref msg } => {
2961 let logger = WithContext::from(
2962 &self.logger,
2963 Some(*node_id),
2964 Some(msg.channel_id),
2965 None,
2966 );
2967 log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
2968 node_id,
2969 &msg.channel_id);
2970 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2971 },
2972 MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
2973 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
2974 node_id,
2975 &msg.channel_id);
2976 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2977 },
2978 MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
2979 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
2980 node_id,
2981 &msg.channel_id);
2982 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2983 },
2984 MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
2985 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
2986 node_id,
2987 &msg.channel_id);
2988 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2989 },
2990 MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
2991 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
2992 node_id,
2993 &msg.channel_id);
2994 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2995 },
2996 MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
2997 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
2998 node_id,
2999 &msg.channel_id);
3000 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3001 },
3002 MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
3003 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
3004 node_id,
3005 &msg.channel_id);
3006 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3007 },
3008 MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
3009 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
3010 node_id,
3011 &msg.channel_id);
3012 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3013 },
3014 MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
3015 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
3016 node_id,
3017 &msg.channel_id);
3018 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3019 },
3020 MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
3021 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
3022 node_id,
3023 &msg.channel_id);
3024 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3025 },
3026 MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
3027 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
3028 node_id,
3029 &msg.channel_id);
3030 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3031 },
3032 MessageSendEvent::UpdateHTLCs {
3033 ref node_id,
3034 ref channel_id,
3035 updates:
3036 msgs::CommitmentUpdate {
3037 ref update_add_htlcs,
3038 ref update_fulfill_htlcs,
3039 ref update_fail_htlcs,
3040 ref update_fail_malformed_htlcs,
3041 ref update_fee,
3042 ref commitment_signed,
3043 },
3044 } => {
3045 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(*channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails, {} commits for channel {}",
3046 node_id,
3047 update_add_htlcs.len(),
3048 update_fulfill_htlcs.len(),
3049 update_fail_htlcs.len(),
3050 commitment_signed.len(),
3051 channel_id);
3052 let mut peer = get_peer_for_forwarding!(node_id)?;
3053 for msg in update_fulfill_htlcs {
3054 self.enqueue_message(&mut *peer, msg);
3055 }
3056 for msg in update_fail_htlcs {
3057 self.enqueue_message(&mut *peer, msg);
3058 }
3059 for msg in update_fail_malformed_htlcs {
3060 self.enqueue_message(&mut *peer, msg);
3061 }
3062 for msg in update_add_htlcs {
3063 self.enqueue_message(&mut *peer, msg);
3064 }
3065 if let &Some(ref msg) = update_fee {
3066 self.enqueue_message(&mut *peer, msg);
3067 }
3068 if commitment_signed.len() > 1 {
3069 let msg = msgs::StartBatch {
3070 channel_id: *channel_id,
3071 batch_size: commitment_signed.len() as u16,
3072 message_type: Some(msgs::CommitmentSigned::TYPE),
3073 };
3074 self.enqueue_message(&mut *peer, &msg);
3075 }
3076 for msg in commitment_signed {
3077 self.enqueue_message(&mut *peer, msg);
3078 }
3079 },
3080 MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
3081 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
3082 node_id,
3083 &msg.channel_id);
3084 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3085 },
3086 MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
3087 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
3088 node_id,
3089 &msg.channel_id);
3090 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3091 },
3092 MessageSendEvent::SendClosingComplete { ref node_id, ref msg } => {
3093 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingComplete event in peer_handler for node {} for channel {}",
3094 node_id,
3095 &msg.channel_id);
3096 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3097 },
3098 MessageSendEvent::SendClosingSig { ref node_id, ref msg } => {
3099 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSig event in peer_handler for node {} for channel {}",
3100 node_id,
3101 &msg.channel_id);
3102 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3103 },
3104 MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
3105 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}",
3106 node_id,
3107 &msg.channel_id);
3108 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3109 },
3110 MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
3111 log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
3112 node_id,
3113 &msg.channel_id);
3114 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3115 },
3116 MessageSendEvent::SendChannelAnnouncement {
3117 ref node_id,
3118 ref msg,
3119 ref update_msg,
3120 } => {
3121 log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
3122 node_id,
3123 msg.contents.short_channel_id);
3124 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3125 self.enqueue_message(
3126 &mut *get_peer_for_forwarding!(node_id)?,
3127 update_msg,
3128 );
3129 },
3130 MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
3131 log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
3132 match route_handler.handle_channel_announcement(None, &msg) {
3133 Ok(_)
3134 | Err(LightningError {
3135 action: msgs::ErrorAction::IgnoreDuplicateGossip,
3136 ..
3137 }) => {
3138 let forward = wire::Message::ChannelAnnouncement(msg);
3139 self.forward_broadcast_msg(
3140 peers,
3141 &forward,
3142 None,
3143 from_chan_handler,
3144 );
3145 },
3146 _ => {},
3147 }
3148 if let Some(msg) = update_msg {
3149 match route_handler.handle_channel_update(None, &msg) {
3150 Ok(_)
3151 | Err(LightningError {
3152 action: msgs::ErrorAction::IgnoreDuplicateGossip,
3153 ..
3154 }) => {
3155 let forward = wire::Message::ChannelUpdate(msg);
3156 self.forward_broadcast_msg(
3157 peers,
3158 &forward,
3159 None,
3160 from_chan_handler,
3161 );
3162 },
3163 _ => {},
3164 }
3165 }
3166 },
3167 MessageSendEvent::BroadcastChannelUpdate { msg } => {
3168 log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for contents {:?}", msg.contents);
3169 match route_handler.handle_channel_update(None, &msg) {
3170 Ok(_)
3171 | Err(LightningError {
3172 action: msgs::ErrorAction::IgnoreDuplicateGossip,
3173 ..
3174 }) => {
3175 let forward = wire::Message::ChannelUpdate(msg);
3176 self.forward_broadcast_msg(
3177 peers,
3178 &forward,
3179 None,
3180 from_chan_handler,
3181 );
3182 },
3183 _ => {},
3184 }
3185 },
3186 MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
3187 log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
3188 match route_handler.handle_node_announcement(None, &msg) {
3189 Ok(_)
3190 | Err(LightningError {
3191 action: msgs::ErrorAction::IgnoreDuplicateGossip,
3192 ..
3193 }) => {
3194 let forward = wire::Message::NodeAnnouncement(msg);
3195 self.forward_broadcast_msg(
3196 peers,
3197 &forward,
3198 None,
3199 from_chan_handler,
3200 );
3201 },
3202 _ => {},
3203 }
3204 },
3205 MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
3206 log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
3207 node_id, msg.contents.short_channel_id);
3208 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3209 },
3210 MessageSendEvent::HandleError { node_id, action } => {
3211 let logger = WithContext::from(&self.logger, Some(node_id), None, None);
3212 match action {
3213 msgs::ErrorAction::DisconnectPeer { msg } => {
3214 if let Some(msg) = msg.as_ref() {
3215 log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
3216 node_id, msg.data);
3217 } else {
3218 log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}",
3219 node_id);
3220 }
3221 let msg = msg.map(|msg| {
3225 wire::Message::<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)
3226 });
3227 peers_to_disconnect.insert(node_id, msg);
3228 },
3229 msgs::ErrorAction::DisconnectPeerWithWarning { msg } => {
3230 log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
3231 node_id, msg.data);
3232 peers_to_disconnect
3236 .insert(node_id, Some(wire::Message::Warning(msg)));
3237 },
3238 msgs::ErrorAction::IgnoreAndLog(level) => {
3239 log_given_level!(
3240 logger,
3241 level,
3242 "Received a HandleError event to be ignored for node {}",
3243 node_id,
3244 );
3245 },
3246 msgs::ErrorAction::IgnoreDuplicateGossip => {},
3247 msgs::ErrorAction::IgnoreError => {
3248 log_debug!(
3249 logger,
3250 "Received a HandleError event to be ignored for node {}",
3251 node_id,
3252 );
3253 },
3254 msgs::ErrorAction::SendErrorMessage { ref msg } => {
3255 log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
3256 node_id,
3257 msg.data);
3258 self.enqueue_message(
3259 &mut *get_peer_for_forwarding!(&node_id)?,
3260 msg,
3261 );
3262 },
3263 msgs::ErrorAction::SendWarningMessage {
3264 ref msg,
3265 ref log_level,
3266 } => {
3267 log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
3268 node_id,
3269 msg.data);
3270 self.enqueue_message(
3271 &mut *get_peer_for_forwarding!(&node_id)?,
3272 msg,
3273 );
3274 },
3275 }
3276 },
3277 MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
3278 log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelRangeQuery event in peer_handler for node {} with first_blocknum={}, number_of_blocks={}",
3279 node_id,
3280 msg.first_blocknum,
3281 msg.number_of_blocks);
3282 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3283 },
3284 MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
3285 log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendShortIdsQuery event in peer_handler for node {} with num_scids={}",
3286 node_id,
3287 msg.short_channel_ids.len());
3288 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3289 },
3290 MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
3291 log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
3292 node_id,
3293 msg.short_channel_ids.len(),
3294 msg.first_blocknum,
3295 msg.number_of_blocks,
3296 msg.sync_complete);
3297 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3298 },
3299 MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
3300 log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendGossipTimestampFilter event in peer_handler for node {} with first_timestamp={}, timestamp_range={}",
3301 node_id,
3302 msg.first_timestamp,
3303 msg.timestamp_range);
3304 self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3305 },
3306 }
3307 Some(())
3308 };
3309
3310 let chan_events = chan_handler.get_and_clear_pending_msg_events();
3311 for event in chan_events {
3312 handle_event(event, true);
3313 }
3314
3315 let route_events = route_handler.get_and_clear_pending_msg_events();
3316 for event in route_events {
3317 handle_event(event, false);
3318 }
3319
3320 let send_only_events = send_only_message_handler.get_and_clear_pending_msg_events();
3321 for event in send_only_events {
3322 handle_event(event, false);
3323 }
3324
3325 let onion_msg_events = onion_message_handler.get_and_clear_pending_msg_events();
3326 for event in onion_msg_events {
3327 handle_event(event, false);
3328 }
3329
3330 for (node_id, msg) in custom_message_handler.get_and_clear_pending_msg() {
3331 if peers_to_disconnect.get(&node_id).is_some() {
3332 continue;
3333 }
3334 let mut peer = if let Some(peer) = get_peer_for_forwarding!(&node_id) {
3335 peer
3336 } else {
3337 continue;
3338 };
3339 self.enqueue_message(&mut peer, &msg);
3340 }
3341
3342 for (descriptor, peer_mutex) in peers.iter() {
3343 let mut peer = peer_mutex.lock().unwrap();
3344 if flush_read_disabled {
3345 peer.received_channel_announce_since_backlogged = false;
3346 }
3347 self.do_attempt_write_data(
3348 &mut (*descriptor).clone(),
3349 &mut *peer,
3350 flush_read_disabled,
3351 );
3352 }
3353 }
3354 if !peers_to_disconnect.is_empty() {
3355 let mut peers_lock = self.peers.write().unwrap();
3356 let peers = &mut *peers_lock;
3357 for (node_id, msg) in peers_to_disconnect.drain() {
3358 let descriptor_opt =
3364 self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
3365 if let Some(mut descriptor) = descriptor_opt {
3366 if let Some(peer_mutex) = peers.remove(&descriptor) {
3367 let mut peer = peer_mutex.lock().unwrap();
3368 if let Some(msg) = msg {
3369 self.enqueue_message(&mut *peer, &msg);
3370 self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
3373 }
3374 self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError");
3375 } else {
3376 debug_assert!(false, "Missing connection for peer");
3377 }
3378 }
3379 }
3380 }
3381
3382 if self.event_processing_state.fetch_sub(1, Ordering::AcqRel) != 1 {
3383 self.event_processing_state.store(1, Ordering::Release);
3386 continue;
3387 }
3388 break;
3389 }
3390 }
3391
3392 pub fn socket_disconnected(&self, descriptor: &Descriptor) {
3394 self.disconnect_event_internal(descriptor, "the socket was disconnected");
3395 }
3396
3397 fn do_disconnect(&self, mut descriptor: Descriptor, peer: &Peer, reason: &'static str) {
3398 if !peer.handshake_complete() {
3399 log_trace!(
3400 self.logger,
3401 "Disconnecting peer which hasn't completed handshake due to {}",
3402 reason
3403 );
3404 descriptor.disconnect_socket();
3405 return;
3406 }
3407
3408 debug_assert!(peer.their_node_id.is_some());
3409 if let Some((node_id, _)) = peer.their_node_id {
3410 log_trace!(
3411 WithContext::from(&self.logger, Some(node_id), None, None),
3412 "Disconnecting peer with id {} due to {}",
3413 node_id,
3414 reason
3415 );
3416 self.message_handler.route_handler.peer_disconnected(node_id);
3417 self.message_handler.chan_handler.peer_disconnected(node_id);
3418 self.message_handler.onion_message_handler.peer_disconnected(node_id);
3419 self.message_handler.custom_message_handler.peer_disconnected(node_id);
3420 self.message_handler.send_only_message_handler.peer_disconnected(node_id);
3421 }
3422 descriptor.disconnect_socket();
3423 }
3424
3425 fn disconnect_event_internal(&self, descriptor: &Descriptor, reason: &'static str) {
3426 let mut peers = self.peers.write().unwrap();
3427 let peer_option = peers.remove(descriptor);
3428 match peer_option {
3429 None => {
3430 },
3434 Some(peer_lock) => {
3435 let peer = peer_lock.lock().unwrap();
3436 if let Some((node_id, _)) = peer.their_node_id {
3437 let logger = WithContext::from(&self.logger, Some(node_id), None, None);
3438 log_trace!(
3439 logger,
3440 "Handling disconnection of peer {} because {}",
3441 node_id,
3442 reason
3443 );
3444 let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
3445 debug_assert!(removed.is_some(), "descriptor maps should be consistent");
3446 if !peer.handshake_complete() {
3447 return;
3448 }
3449 self.message_handler.route_handler.peer_disconnected(node_id);
3450 self.message_handler.chan_handler.peer_disconnected(node_id);
3451 self.message_handler.onion_message_handler.peer_disconnected(node_id);
3452 self.message_handler.custom_message_handler.peer_disconnected(node_id);
3453 self.message_handler.send_only_message_handler.peer_disconnected(node_id);
3454 }
3455 },
3456 };
3457 }
3458
3459 pub fn disconnect_by_node_id(&self, node_id: PublicKey) {
3466 let mut peers_lock = self.peers.write().unwrap();
3467 if let Some(descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
3468 let peer_opt = peers_lock.remove(&descriptor);
3469 if let Some(peer_mutex) = peer_opt {
3470 self.do_disconnect(descriptor, &*peer_mutex.lock().unwrap(), "client request");
3471 } else {
3472 debug_assert!(false, "node_id_to_descriptor thought we had a peer");
3473 }
3474 }
3475 }
3476
3477 pub fn disconnect_all_peers(&self) {
3481 let mut peers_lock = self.peers.write().unwrap();
3482 self.node_id_to_descriptor.lock().unwrap().clear();
3483 let peers = &mut *peers_lock;
3484 for (descriptor, peer_mutex) in peers.drain() {
3485 self.do_disconnect(
3486 descriptor,
3487 &*peer_mutex.lock().unwrap(),
3488 "client request to disconnect all peers",
3489 );
3490 }
3491 }
3492
3493 fn maybe_send_extra_ping(&self, peer: &mut Peer) {
3497 if peer.awaiting_pong_timer_tick_intervals == 0 {
3498 peer.awaiting_pong_timer_tick_intervals = -1;
3499 let ping = msgs::Ping { ponglen: 0, byteslen: 64 };
3500 self.enqueue_message(peer, &ping);
3501 }
3502 }
3503
3504 pub fn timer_tick_occurred(&self) {
3516 let mut descriptors_needing_disconnect = Vec::new();
3517 {
3518 let peers_lock = self.peers.read().unwrap();
3519
3520 self.update_gossip_backlogged();
3521 let flush_read_disabled =
3522 self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
3523
3524 for (descriptor, peer_mutex) in peers_lock.iter() {
3525 let mut peer = peer_mutex.lock().unwrap();
3526 if flush_read_disabled {
3527 peer.received_channel_announce_since_backlogged = false;
3528 }
3529
3530 if !peer.handshake_complete() {
3531 if peer.awaiting_pong_timer_tick_intervals != 0 {
3536 descriptors_needing_disconnect.push(descriptor.clone());
3537 } else {
3538 peer.awaiting_pong_timer_tick_intervals = 1;
3539 }
3540 continue;
3541 }
3542 debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
3543 debug_assert!(peer.their_node_id.is_some());
3544
3545 loop {
3547 if peer.awaiting_pong_timer_tick_intervals == -1 {
3548 peer.awaiting_pong_timer_tick_intervals = 1;
3550 peer.received_message_since_timer_tick = false;
3551 break;
3552 }
3553 let not_recently_active = peer.awaiting_pong_timer_tick_intervals > 0
3554 && !peer.received_message_since_timer_tick;
3555 let reached_threshold_intervals = peer.awaiting_pong_timer_tick_intervals
3556 as u64
3557 > MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64;
3558 if not_recently_active || reached_threshold_intervals {
3559 descriptors_needing_disconnect.push(descriptor.clone());
3560 break;
3561 }
3562 peer.received_message_since_timer_tick = false;
3563
3564 if peer.awaiting_pong_timer_tick_intervals > 0 {
3565 peer.awaiting_pong_timer_tick_intervals += 1;
3566 break;
3567 }
3568
3569 peer.awaiting_pong_timer_tick_intervals = 1;
3570 let ping = msgs::Ping { ponglen: 0, byteslen: 64 };
3571 self.enqueue_message(&mut *peer, &ping);
3572 break;
3573 }
3574 self.do_attempt_write_data(
3575 &mut (descriptor.clone()),
3576 &mut *peer,
3577 flush_read_disabled,
3578 );
3579 }
3580 }
3581
3582 if !descriptors_needing_disconnect.is_empty() {
3583 {
3584 let mut peers_lock = self.peers.write().unwrap();
3585 for descriptor in descriptors_needing_disconnect {
3586 if let Some(peer_mutex) = peers_lock.remove(&descriptor) {
3587 let peer = peer_mutex.lock().unwrap();
3588 if let Some((node_id, _)) = peer.their_node_id {
3589 self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
3590 }
3591 self.do_disconnect(descriptor, &*peer, "ping/handshake timeout");
3592 }
3593 }
3594 }
3595 }
3596 }
3597
3598 #[allow(dead_code)]
3599 const HALF_MESSAGE_IS_ADDRS: u32 =
3604 ::core::u16::MAX as u32 / (SocketAddress::MAX_LEN as u32 + 1) / 2;
3605 #[allow(dead_code)]
3606 const STATIC_ASSERT: u32 = Self::HALF_MESSAGE_IS_ADDRS - 100;
3609
3610 pub fn broadcast_node_announcement(
3626 &self, rgb: [u8; 3], alias: [u8; 32], mut addresses: Vec<SocketAddress>,
3627 ) {
3628 if addresses.len() > 100 {
3629 panic!("More than half the message size was taken up by public addresses!");
3630 }
3631
3632 addresses.sort_by_key(|addr| addr.get_id());
3635
3636 let features = self.message_handler.chan_handler.provided_node_features()
3637 | self.message_handler.route_handler.provided_node_features()
3638 | self.message_handler.onion_message_handler.provided_node_features()
3639 | self.message_handler.custom_message_handler.provided_node_features()
3640 | self.message_handler.send_only_message_handler.provided_node_features();
3641 let announcement = msgs::UnsignedNodeAnnouncement {
3642 features,
3643 timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel),
3644 node_id: NodeId::from_pubkey(&self.node_signer.get_node_id(Recipient::Node).unwrap()),
3645 rgb,
3646 alias: NodeAlias(alias),
3647 addresses,
3648 excess_address_data: Vec::new(),
3649 excess_data: Vec::new(),
3650 };
3651 let node_announce_sig = match self
3652 .node_signer
3653 .sign_gossip_message(msgs::UnsignedGossipMessage::NodeAnnouncement(&announcement))
3654 {
3655 Ok(sig) => sig,
3656 Err(_) => {
3657 log_error!(self.logger, "Failed to generate signature for node_announcement");
3658 return;
3659 },
3660 };
3661
3662 let msg = msgs::NodeAnnouncement { signature: node_announce_sig, contents: announcement };
3663
3664 log_debug!(
3665 self.logger,
3666 "Broadcasting NodeAnnouncement after passing it to our own RoutingMessageHandler."
3667 );
3668 let _ = self.message_handler.route_handler.handle_node_announcement(None, &msg);
3669 self.forward_broadcast_msg(
3670 &*self.peers.read().unwrap(),
3671 &wire::Message::NodeAnnouncement(msg),
3672 None,
3673 true,
3674 );
3675 }
3676}
3677
3678fn is_gossip_msg(type_id: u16) -> bool {
3679 match type_id {
3680 msgs::ChannelAnnouncement::TYPE
3681 | msgs::ChannelUpdate::TYPE
3682 | msgs::NodeAnnouncement::TYPE
3683 | msgs::QueryChannelRange::TYPE
3684 | msgs::ReplyChannelRange::TYPE
3685 | msgs::QueryShortChannelIds::TYPE
3686 | msgs::ReplyShortChannelIdsEnd::TYPE => true,
3687 _ => false,
3688 }
3689}
3690
3691#[cfg(test)]
3692mod tests {
3693 use super::*;
3694
3695 use crate::io;
3696 use crate::ln::msgs::{Init, LightningError, SocketAddress};
3697 use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
3698 use crate::ln::types::ChannelId;
3699 use crate::ln::{msgs, wire};
3700 use crate::sign::{NodeSigner, Recipient};
3701 use crate::types::features::{InitFeatures, NodeFeatures};
3702 use crate::util::test_utils;
3703
3704 use bitcoin::constants::ChainHash;
3705 use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
3706 use bitcoin::Network;
3707
3708 use crate::sync::{Arc, Mutex};
3709 use core::convert::Infallible;
3710 use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3711
3712 #[allow(unused_imports)]
3713 use crate::prelude::*;
3714
3715 #[derive(Clone)]
3716 struct FileDescriptor {
3717 fd: u16,
3718 hang_writes: Arc<AtomicBool>,
3719 outbound_data: Arc<Mutex<Vec<u8>>>,
3720 disconnect: Arc<AtomicBool>,
3721 }
3722 impl PartialEq for FileDescriptor {
3723 fn eq(&self, other: &Self) -> bool {
3724 self.fd == other.fd
3725 }
3726 }
3727 impl Eq for FileDescriptor {}
3728 impl core::hash::Hash for FileDescriptor {
3729 fn hash<H: core::hash::Hasher>(&self, hasher: &mut H) {
3730 self.fd.hash(hasher)
3731 }
3732 }
3733
3734 impl SocketDescriptor for FileDescriptor {
3735 fn send_data(&mut self, data: &[u8], _continue_read: bool) -> usize {
3736 if self.hang_writes.load(Ordering::Acquire) {
3737 0
3738 } else {
3739 self.outbound_data.lock().unwrap().extend_from_slice(data);
3740 data.len()
3741 }
3742 }
3743
3744 fn disconnect_socket(&mut self) {
3745 self.disconnect.store(true, Ordering::Release);
3746 }
3747 }
3748
3749 impl FileDescriptor {
3750 fn new(fd: u16) -> Self {
3751 Self {
3752 fd,
3753 hang_writes: Arc::new(AtomicBool::new(false)),
3754 outbound_data: Arc::new(Mutex::new(Vec::new())),
3755 disconnect: Arc::new(AtomicBool::new(false)),
3756 }
3757 }
3758 }
3759
3760 struct PeerManagerCfg {
3761 chan_handler: test_utils::TestChannelMessageHandler,
3762 routing_handler: test_utils::TestRoutingMessageHandler,
3763 custom_handler: TestCustomMessageHandler,
3764 send_only_handler: TestBaseMsgHandler,
3765 logger: test_utils::TestLogger,
3766 node_signer: test_utils::TestNodeSigner,
3767 }
3768
3769 struct TestCustomMessageHandler {
3770 features: InitFeatures,
3771 conn_tracker: test_utils::ConnectionTracker,
3772 }
3773
3774 impl TestCustomMessageHandler {
3775 fn new(features: InitFeatures) -> Self {
3776 Self { features, conn_tracker: test_utils::ConnectionTracker::new() }
3777 }
3778 }
3779
3780 impl wire::CustomMessageReader for TestCustomMessageHandler {
3781 type CustomMessage = Infallible;
3782 fn read<R: io::Read>(
3783 &self, _: u16, _: &mut R,
3784 ) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
3785 Ok(None)
3786 }
3787 }
3788
3789 impl CustomMessageHandler for TestCustomMessageHandler {
3790 fn handle_custom_message(&self, _: Infallible, _: PublicKey) -> Result<(), LightningError> {
3791 unreachable!();
3792 }
3793
3794 fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> {
3795 Vec::new()
3796 }
3797
3798 fn peer_disconnected(&self, their_node_id: PublicKey) {
3799 self.conn_tracker.peer_disconnected(their_node_id);
3800 }
3801
3802 fn peer_connected(
3803 &self, their_node_id: PublicKey, _msg: &Init, _inbound: bool,
3804 ) -> Result<(), ()> {
3805 self.conn_tracker.peer_connected(their_node_id)
3806 }
3807
3808 fn provided_node_features(&self) -> NodeFeatures {
3809 NodeFeatures::empty()
3810 }
3811
3812 fn provided_init_features(&self, _: PublicKey) -> InitFeatures {
3813 self.features.clone()
3814 }
3815 }
3816
3817 struct TestBaseMsgHandler(test_utils::ConnectionTracker);
3818
3819 impl BaseMessageHandler for TestBaseMsgHandler {
3820 fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
3821 Vec::new()
3822 }
3823
3824 fn peer_disconnected(&self, their_node_id: PublicKey) {
3825 self.0.peer_disconnected(their_node_id);
3826 }
3827
3828 fn peer_connected(
3829 &self, their_node_id: PublicKey, _msg: &Init, _inbound: bool,
3830 ) -> Result<(), ()> {
3831 self.0.peer_connected(their_node_id)
3832 }
3833
3834 fn provided_node_features(&self) -> NodeFeatures {
3835 NodeFeatures::empty()
3836 }
3837
3838 fn provided_init_features(&self, _: PublicKey) -> InitFeatures {
3839 InitFeatures::empty()
3840 }
3841 }
3842
3843 impl SendOnlyMessageHandler for TestBaseMsgHandler {}
3844
3845 fn create_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
3846 let mut cfgs = Vec::new();
3847 for i in 0..peer_count {
3848 let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
3849 let features = {
3850 let mut feature_bits = vec![0u8; 33];
3851 feature_bits[32] = 0b00000001;
3852 InitFeatures::from_le_bytes(feature_bits)
3853 };
3854 cfgs.push(PeerManagerCfg {
3855 chan_handler: test_utils::TestChannelMessageHandler::new(
3856 ChainHash::using_genesis_block(Network::Testnet),
3857 ),
3858 logger: test_utils::TestLogger::with_id(i.to_string()),
3859 routing_handler: test_utils::TestRoutingMessageHandler::new(),
3860 custom_handler: TestCustomMessageHandler::new(features),
3861 send_only_handler: TestBaseMsgHandler(test_utils::ConnectionTracker::new()),
3862 node_signer: test_utils::TestNodeSigner::new(node_secret),
3863 });
3864 }
3865
3866 cfgs
3867 }
3868
3869 fn create_feature_incompatible_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
3870 let mut cfgs = Vec::new();
3871 for i in 0..peer_count {
3872 let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
3873 let features = {
3874 let mut feature_bits = vec![0u8; 33 + i + 1];
3875 feature_bits[33 + i] = 0b00000001;
3876 InitFeatures::from_le_bytes(feature_bits)
3877 };
3878 cfgs.push(PeerManagerCfg {
3879 chan_handler: test_utils::TestChannelMessageHandler::new(
3880 ChainHash::using_genesis_block(Network::Testnet),
3881 ),
3882 logger: test_utils::TestLogger::new(),
3883 routing_handler: test_utils::TestRoutingMessageHandler::new(),
3884 custom_handler: TestCustomMessageHandler::new(features),
3885 send_only_handler: TestBaseMsgHandler(test_utils::ConnectionTracker::new()),
3886 node_signer: test_utils::TestNodeSigner::new(node_secret),
3887 });
3888 }
3889
3890 cfgs
3891 }
3892
3893 fn create_chain_incompatible_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
3894 let mut cfgs = Vec::new();
3895 for i in 0..peer_count {
3896 let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
3897 let features = InitFeatures::from_le_bytes(vec![0u8; 33]);
3898 let network = ChainHash::from(&[i as u8; 32]);
3899 cfgs.push(PeerManagerCfg {
3900 chan_handler: test_utils::TestChannelMessageHandler::new(network),
3901 logger: test_utils::TestLogger::new(),
3902 routing_handler: test_utils::TestRoutingMessageHandler::new(),
3903 custom_handler: TestCustomMessageHandler::new(features),
3904 send_only_handler: TestBaseMsgHandler(test_utils::ConnectionTracker::new()),
3905 node_signer: test_utils::TestNodeSigner::new(node_secret),
3906 });
3907 }
3908
3909 cfgs
3910 }
3911
3912 fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<TestPeer<'a>> {
3913 let mut peers = Vec::new();
3914 for i in 0..peer_count {
3915 let ephemeral_bytes = [i as u8; 32];
3916 let msg_handler = MessageHandler {
3917 chan_handler: &cfgs[i].chan_handler,
3918 route_handler: &cfgs[i].routing_handler,
3919 onion_message_handler: IgnoringMessageHandler {},
3920 custom_message_handler: &cfgs[i].custom_handler,
3921 send_only_message_handler: &cfgs[i].send_only_handler,
3922 };
3923 let peer = PeerManager::new(
3924 msg_handler,
3925 0,
3926 &ephemeral_bytes,
3927 &cfgs[i].logger,
3928 &cfgs[i].node_signer,
3929 );
3930 peers.push(peer);
3931 }
3932
3933 peers
3934 }
3935
3936 type TestPeer<'a> = PeerManager<
3937 FileDescriptor,
3938 &'a test_utils::TestChannelMessageHandler,
3939 &'a test_utils::TestRoutingMessageHandler,
3940 IgnoringMessageHandler,
3941 &'a test_utils::TestLogger,
3942 &'a TestCustomMessageHandler,
3943 &'a test_utils::TestNodeSigner,
3944 &'a TestBaseMsgHandler,
3945 >;
3946
3947 fn try_establish_connection<'a>(
3948 peer_a: &TestPeer<'a>, peer_b: &TestPeer<'a>,
3949 ) -> (FileDescriptor, FileDescriptor, Result<(), PeerHandleError>, Result<(), PeerHandleError>)
3950 {
3951 let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
3952 let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };
3953
3954 static FD_COUNTER: AtomicUsize = AtomicUsize::new(0);
3955 let fd = FD_COUNTER.fetch_add(1, Ordering::Relaxed) as u16;
3956
3957 let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
3958 let mut fd_a = FileDescriptor::new(fd);
3959 let mut fd_b = FileDescriptor::new(fd);
3960
3961 let initial_data =
3962 peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
3963 peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
3964 peer_a.read_event(&mut fd_a, &initial_data).unwrap();
3965 peer_a.process_events();
3966
3967 let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
3968 peer_b.read_event(&mut fd_b, &a_data).unwrap();
3969
3970 peer_b.process_events();
3971 let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
3972 let a_refused = peer_a.read_event(&mut fd_a, &b_data);
3973
3974 peer_a.process_events();
3975 let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
3976 let b_refused = peer_b.read_event(&mut fd_b, &a_data);
3977
3978 (fd_a, fd_b, a_refused, b_refused)
3979 }
3980
3981 fn establish_connection<'a>(
3982 peer_a: &TestPeer<'a>, peer_b: &TestPeer<'a>,
3983 ) -> (FileDescriptor, FileDescriptor) {
3984 let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
3985 let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };
3986
3987 let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
3988 let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap();
3989
3990 let features_a = peer_a.init_features(id_b);
3991 let features_b = peer_b.init_features(id_a);
3992
3993 let (fd_a, fd_b, a_refused, b_refused) = try_establish_connection(peer_a, peer_b);
3994
3995 a_refused.unwrap();
3996 b_refused.unwrap();
3997
3998 assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().counterparty_node_id, id_b);
3999 assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().socket_address, Some(addr_b));
4000 assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().init_features, features_b);
4001 assert_eq!(peer_b.peer_by_node_id(&id_a).unwrap().counterparty_node_id, id_a);
4002 assert_eq!(peer_b.peer_by_node_id(&id_a).unwrap().socket_address, Some(addr_a));
4003 assert_eq!(peer_b.peer_by_node_id(&id_a).unwrap().init_features, features_a);
4004 (fd_a.clone(), fd_b.clone())
4005 }
4006
4007 #[test]
4008 #[cfg(feature = "std")]
4009 fn fuzz_threaded_connections() {
4010 let cfgs = Arc::new(create_peermgr_cfgs(2));
4014 let peers = Arc::new(create_network(2, unsafe { &*(&*cfgs as *const _) as &'static _ }));
4016
4017 let start_time = std::time::Instant::now();
4018 macro_rules! spawn_thread {
4019 ($id: expr) => {{
4020 let peers = Arc::clone(&peers);
4021 let cfgs = Arc::clone(&cfgs);
4022 std::thread::spawn(move || {
4023 let mut ctr = 0;
4024 while start_time.elapsed() < std::time::Duration::from_secs(1) {
4025 let id_a = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
4026 let mut fd_a = FileDescriptor::new($id + ctr * 3);
4027 let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
4028 let mut fd_b = FileDescriptor::new($id + ctr * 3);
4029 let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };
4030 let initial_data = peers[1]
4031 .new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone()))
4032 .unwrap();
4033 peers[0]
4034 .new_inbound_connection(fd_a.clone(), Some(addr_b.clone()))
4035 .unwrap();
4036 if peers[0].read_event(&mut fd_a, &initial_data).is_err() {
4037 break;
4038 }
4039
4040 while start_time.elapsed() < std::time::Duration::from_secs(1) {
4041 peers[0].process_events();
4042 if fd_a.disconnect.load(Ordering::Acquire) {
4043 break;
4044 }
4045 let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
4046 if peers[1].read_event(&mut fd_b, &a_data).is_err() {
4047 break;
4048 }
4049
4050 peers[1].process_events();
4051 if fd_b.disconnect.load(Ordering::Acquire) {
4052 break;
4053 }
4054 let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
4055 if peers[0].read_event(&mut fd_a, &b_data).is_err() {
4056 break;
4057 }
4058
4059 let node_id_1 =
4060 peers[1].node_signer.get_node_id(Recipient::Node).unwrap();
4061 let msg_event_1 = MessageSendEvent::SendShutdown {
4062 node_id: node_id_1,
4063 msg: msgs::Shutdown {
4064 channel_id: ChannelId::new_zero(),
4065 scriptpubkey: bitcoin::ScriptBuf::new(),
4066 },
4067 };
4068 cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_event_1);
4069
4070 let node_id_0 =
4071 peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
4072 let msg_event_0 = MessageSendEvent::SendShutdown {
4073 node_id: node_id_0,
4074 msg: msgs::Shutdown {
4075 channel_id: ChannelId::new_zero(),
4076 scriptpubkey: bitcoin::ScriptBuf::new(),
4077 },
4078 };
4079 cfgs[1].chan_handler.pending_events.lock().unwrap().push(msg_event_0);
4080
4081 if ctr % 2 == 0 {
4082 peers[0].timer_tick_occurred();
4083 peers[1].timer_tick_occurred();
4084 }
4085 }
4086
4087 peers[0].socket_disconnected(&fd_a);
4088 peers[1].socket_disconnected(&fd_b);
4089 ctr += 1;
4090 std::thread::sleep(std::time::Duration::from_micros(1));
4091 }
4092 })
4093 }};
4094 }
4095 let thrd_a = spawn_thread!(1);
4096 let thrd_b = spawn_thread!(2);
4097
4098 thrd_a.join().unwrap();
4099 thrd_b.join().unwrap();
4100 }
4101
4102 #[test]
4103 fn test_feature_incompatible_peers() {
4104 let cfgs = create_peermgr_cfgs(2);
4105 let incompatible_cfgs = create_feature_incompatible_peermgr_cfgs(2);
4106
4107 let peers = create_network(2, &cfgs);
4108 let incompatible_peers = create_network(2, &incompatible_cfgs);
4109 let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])];
4110 for (peer_a, peer_b) in peer_pairs.iter() {
4111 let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
4112 let mut fd_a = FileDescriptor::new(1);
4113 let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
4114 let mut fd_b = FileDescriptor::new(1);
4115 let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };
4116 let initial_data =
4117 peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
4118 peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
4119 peer_a.read_event(&mut fd_a, &initial_data).unwrap();
4120 peer_a.process_events();
4121
4122 let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
4123 peer_b.read_event(&mut fd_b, &a_data).unwrap();
4124
4125 peer_b.process_events();
4126 let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
4127
4128 assert!(peer_a.read_event(&mut fd_a, &b_data).is_err());
4130 }
4131 }
4132
4133 #[test]
4134 fn test_chain_incompatible_peers() {
4135 let cfgs = create_peermgr_cfgs(2);
4136 let incompatible_cfgs = create_chain_incompatible_peermgr_cfgs(2);
4137
4138 let peers = create_network(2, &cfgs);
4139 let incompatible_peers = create_network(2, &incompatible_cfgs);
4140 let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])];
4141 for (peer_a, peer_b) in peer_pairs.iter() {
4142 let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
4143 let mut fd_a = FileDescriptor::new(1);
4144 let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
4145 let mut fd_b = FileDescriptor::new(1);
4146 let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };
4147 let initial_data =
4148 peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
4149 peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
4150 peer_a.read_event(&mut fd_a, &initial_data).unwrap();
4151 peer_a.process_events();
4152
4153 let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
4154 peer_b.read_event(&mut fd_b, &a_data).unwrap();
4155
4156 peer_b.process_events();
4157 let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
4158
4159 assert!(peer_a.read_event(&mut fd_a, &b_data).is_err());
4161 }
4162 }
4163
4164 #[test]
4165 fn test_disconnect_peer() {
4166 let cfgs = create_peermgr_cfgs(2);
4169 let peers = create_network(2, &cfgs);
4170 establish_connection(&peers[0], &peers[1]);
4171 {
4172 let peers_len = peers[0].peers.read().unwrap().len();
4173 assert_eq!(peers_len, 1);
4174 }
4175
4176 let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap();
4177 cfgs[0].chan_handler.pending_events.lock().unwrap().push(MessageSendEvent::HandleError {
4178 node_id: their_id,
4179 action: msgs::ErrorAction::DisconnectPeer { msg: None },
4180 });
4181
4182 peers[0].process_events();
4183 {
4184 let peers_len = peers[0].peers.read().unwrap().len();
4185 assert_eq!(peers_len, 0);
4186 }
4187 }
4188
4189 #[test]
4190 fn test_send_simple_msg() {
4191 let cfgs = create_peermgr_cfgs(2);
4194 let a_chan_handler = test_utils::TestChannelMessageHandler::new(
4195 ChainHash::using_genesis_block(Network::Testnet),
4196 );
4197 let b_chan_handler = test_utils::TestChannelMessageHandler::new(
4198 ChainHash::using_genesis_block(Network::Testnet),
4199 );
4200 let mut peers = create_network(2, &cfgs);
4201 let (fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
4202 {
4203 let peers_len = peers[0].peers.read().unwrap().len();
4204 assert_eq!(peers_len, 1);
4205 }
4206
4207 let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap();
4208
4209 let msg = msgs::Shutdown {
4210 channel_id: ChannelId::from_bytes([42; 32]),
4211 scriptpubkey: bitcoin::ScriptBuf::new(),
4212 };
4213 a_chan_handler
4214 .pending_events
4215 .lock()
4216 .unwrap()
4217 .push(MessageSendEvent::SendShutdown { node_id: their_id, msg: msg.clone() });
4218 peers[0].message_handler.chan_handler = &a_chan_handler;
4219
4220 b_chan_handler.expect_receive_msg(wire::Message::Shutdown(msg));
4221 peers[1].message_handler.chan_handler = &b_chan_handler;
4222
4223 peers[0].process_events();
4224
4225 let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
4226 peers[1].read_event(&mut fd_b, &a_data).unwrap();
4227 }
4228
4229 #[test]
4230 fn test_non_init_first_msg() {
4231 let cfgs = create_peermgr_cfgs(2);
4236 let peers = create_network(2, &cfgs);
4237
4238 let mut fd_dup = FileDescriptor::new(3);
4239 let addr_dup = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1003 };
4240 let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap();
4241 peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap();
4242
4243 let mut dup_encryptor =
4244 PeerChannelEncryptor::new_outbound(id_a, SecretKey::from_slice(&[42; 32]).unwrap());
4245 let initial_data = dup_encryptor.get_act_one(&peers[1].secp_ctx);
4246 peers[0].read_event(&mut fd_dup, &initial_data).unwrap();
4247 peers[0].process_events();
4248
4249 let a_data = fd_dup.outbound_data.lock().unwrap().split_off(0);
4250 let (act_three, _) =
4251 dup_encryptor.process_act_two(&a_data[..], &&cfgs[1].node_signer).unwrap();
4252 peers[0].read_event(&mut fd_dup, &act_three).unwrap();
4253
4254 let not_init_msg = msgs::Ping { ponglen: 4, byteslen: 0 };
4255 let msg_bytes = dup_encryptor.encrypt_message(¬_init_msg);
4256 assert!(peers[0].read_event(&mut fd_dup, &msg_bytes).is_err());
4257 }
4258
4259 #[test]
4260 fn test_disconnect_all_peer() {
4261 let cfgs = create_peermgr_cfgs(2);
4264 let peers = create_network(2, &cfgs);
4265 establish_connection(&peers[0], &peers[1]);
4266 {
4267 let peers_len = peers[0].peers.read().unwrap().len();
4268 assert_eq!(peers_len, 1);
4269 }
4270
4271 peers[0].disconnect_all_peers();
4272 {
4273 let peers_len = peers[0].peers.read().unwrap().len();
4274 assert_eq!(peers_len, 0);
4275 }
4276 }
4277
4278 #[test]
4279 fn test_timer_tick_occurred() {
4280 let cfgs = create_peermgr_cfgs(2);
4282 let peers = create_network(2, &cfgs);
4283 establish_connection(&peers[0], &peers[1]);
4284 {
4285 let peers_len = peers[0].peers.read().unwrap().len();
4286 assert_eq!(peers_len, 1);
4287 }
4288
4289 peers[0].timer_tick_occurred();
4291 peers[0].process_events();
4292 {
4293 let peers_len = peers[0].peers.read().unwrap().len();
4294 assert_eq!(peers_len, 1);
4295 }
4296
4297 peers[0].timer_tick_occurred();
4299 peers[0].process_events();
4300 {
4301 let peers_len = peers[0].peers.read().unwrap().len();
4302 assert_eq!(peers_len, 0);
4303 }
4304 }
4305
4306 fn do_test_peer_connected_error_disconnects(handler: usize) {
4307 let cfgs = create_peermgr_cfgs(2);
4311 let peers = create_network(2, &cfgs);
4312
4313 let chan_handler = peers[handler & 1].message_handler.chan_handler;
4314 let route_handler = peers[handler & 1].message_handler.route_handler;
4315 let custom_message_handler = peers[handler & 1].message_handler.custom_message_handler;
4316 let send_only_msg_handler = peers[handler & 1].message_handler.send_only_message_handler;
4317
4318 match handler & !1 {
4319 0 => {
4320 chan_handler.conn_tracker.fail_connections.store(true, Ordering::Release);
4321 },
4322 2 => {
4323 route_handler.conn_tracker.fail_connections.store(true, Ordering::Release);
4324 },
4325 4 => {
4326 custom_message_handler.conn_tracker.fail_connections.store(true, Ordering::Release);
4327 },
4328 6 => {
4329 send_only_msg_handler.0.fail_connections.store(true, Ordering::Release);
4330 },
4331 _ => panic!(),
4332 }
4333 let (_sd1, _sd2, a_refused, b_refused) = try_establish_connection(&peers[0], &peers[1]);
4334 if handler & 1 == 0 {
4335 assert!(a_refused.is_err());
4336 assert!(peers[0].list_peers().is_empty());
4337 } else {
4338 assert!(b_refused.is_err());
4339 assert!(peers[1].list_peers().is_empty());
4340 }
4341 assert!(
4343 chan_handler.conn_tracker.had_peers.load(Ordering::Acquire)
4344 || route_handler.conn_tracker.had_peers.load(Ordering::Acquire)
4345 || custom_message_handler.conn_tracker.had_peers.load(Ordering::Acquire)
4346 || send_only_msg_handler.0.had_peers.load(Ordering::Acquire)
4347 );
4348 assert!(chan_handler.conn_tracker.connected_peers.lock().unwrap().is_empty());
4350 assert!(route_handler.conn_tracker.connected_peers.lock().unwrap().is_empty());
4351 assert!(custom_message_handler.conn_tracker.connected_peers.lock().unwrap().is_empty());
4352 assert!(send_only_msg_handler.0.connected_peers.lock().unwrap().is_empty());
4353 }
4354
4355 #[test]
4356 fn test_peer_connected_error_disconnects() {
4357 for i in 0..8 {
4358 do_test_peer_connected_error_disconnects(i);
4359 }
4360 }
4361
4362 #[test]
4363 fn test_do_attempt_write_data() {
4364 let cfgs = create_peermgr_cfgs(2);
4366 cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
4367 cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
4368 cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
4369 cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
4370 let peers = create_network(2, &cfgs);
4371
4372 let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
4379
4380 for _ in 0..150 / super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
4383 peers[1].process_events();
4384 let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
4385 assert!(!a_read_data.is_empty());
4386
4387 peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
4388 peers[0].process_events();
4389
4390 let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
4391 assert!(!b_read_data.is_empty());
4392 peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
4393
4394 peers[0].process_events();
4395 assert_eq!(
4396 fd_a.outbound_data.lock().unwrap().len(),
4397 0,
4398 "Until A receives data, it shouldn't send more messages"
4399 );
4400 }
4401
4402 assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
4405 assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
4406 assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
4407 assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
4408 }
4409
4410 #[test]
4411 fn test_forward_while_syncing() {
4412 use crate::ln::peer_handler::tests::test_utils::get_dummy_channel_update;
4413
4414 let cfgs = create_peermgr_cfgs(2);
4416 cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
4417 cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
4418 cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
4419 cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
4420 let peers = create_network(2, &cfgs);
4421
4422 let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
4423
4424 for _ in 0..150 {
4426 peers[1].process_events();
4427 let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
4428 assert!(!a_read_data.is_empty());
4429
4430 peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
4431 peers[0].process_events();
4432
4433 let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
4434 assert!(!b_read_data.is_empty());
4435 peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
4436
4437 peers[0].process_events();
4438 assert_eq!(
4439 fd_a.outbound_data.lock().unwrap().len(),
4440 0,
4441 "Until A receives data, it shouldn't send more messages"
4442 );
4443 }
4444
4445 fd_b.hang_writes.store(true, Ordering::Relaxed);
4448 peers[1].process_events();
4449
4450 {
4451 let peer_lock = peers[1].peers.read().unwrap();
4452 let peer = peer_lock.get(&fd_b).unwrap().lock().unwrap();
4453 assert_eq!(peer.pending_outbound_buffer.len(), 1);
4454 assert_eq!(peer.gossip_broadcast_buffer.len(), 0);
4455 }
4456
4457 let msg_100 = get_dummy_channel_update(100);
4461 let msg_ev_100 = MessageSendEvent::BroadcastChannelUpdate { msg: msg_100.clone() };
4462
4463 let msg_5000 = get_dummy_channel_update(5000);
4464 let msg_ev_5000 = MessageSendEvent::BroadcastChannelUpdate { msg: msg_5000 };
4465
4466 fd_a.hang_writes.store(true, Ordering::Relaxed);
4467
4468 cfgs[1].routing_handler.pending_events.lock().unwrap().push(msg_ev_100);
4469 cfgs[1].routing_handler.pending_events.lock().unwrap().push(msg_ev_5000);
4470 peers[1].process_events();
4471
4472 {
4473 let peer_lock = peers[1].peers.read().unwrap();
4474 let peer = peer_lock.get(&fd_b).unwrap().lock().unwrap();
4475 assert_eq!(peer.pending_outbound_buffer.len(), 1);
4476 assert_eq!(peer.gossip_broadcast_buffer.len(), 1);
4477
4478 let pending_msg = &peer.gossip_broadcast_buffer[0];
4479 let expected = encode_msg!(&msg_100);
4480 assert_eq!(expected, pending_msg.fetch_encoded_msg_with_type_pfx());
4481 }
4482 }
4483
4484 #[test]
4485 fn test_handshake_timeout() {
4486 let cfgs = create_peermgr_cfgs(2);
4489 cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
4490 cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
4491 let peers = create_network(2, &cfgs);
4492
4493 let a_id = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
4494 let mut fd_a = FileDescriptor::new(1);
4495 let mut fd_b = FileDescriptor::new(1);
4496 let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
4497 peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();
4498
4499 {
4501 let peers_len = peers[0].peers.read().unwrap().len();
4502 assert_eq!(peers_len, 1);
4503 }
4504 peers[0].timer_tick_occurred();
4505 {
4506 let peers_len = peers[0].peers.read().unwrap().len();
4507 assert_eq!(peers_len, 1);
4508 }
4509
4510 peers[0].read_event(&mut fd_a, &initial_data).unwrap();
4511 peers[0].process_events();
4512 let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
4513 peers[1].read_event(&mut fd_b, &a_data).unwrap();
4514 peers[1].process_events();
4515
4516 peers[0].timer_tick_occurred();
4518 {
4519 let peers_len = peers[0].peers.read().unwrap().len();
4520 assert_eq!(peers_len, 0);
4521 }
4522
4523 let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
4524 assert!(peers[0].read_event(&mut fd_a, &b_data).is_err());
4525 }
4526
4527 #[test]
4528 fn test_inbound_conn_handshake_complete_awaiting_pong() {
4529 let logger = test_utils::TestLogger::new();
4534 let node_signer_a =
4535 test_utils::TestNodeSigner::new(SecretKey::from_slice(&[42; 32]).unwrap());
4536 let node_signer_b =
4537 test_utils::TestNodeSigner::new(SecretKey::from_slice(&[43; 32]).unwrap());
4538 let message_handler_a = MessageHandler {
4539 chan_handler: ErroringMessageHandler::new(),
4540 route_handler: IgnoringMessageHandler {},
4541 onion_message_handler: IgnoringMessageHandler {},
4542 custom_message_handler: IgnoringMessageHandler {},
4543 send_only_message_handler: IgnoringMessageHandler {},
4544 };
4545 let message_handler_b = MessageHandler {
4546 chan_handler: ErroringMessageHandler::new(),
4547 route_handler: IgnoringMessageHandler {},
4548 onion_message_handler: IgnoringMessageHandler {},
4549 custom_message_handler: IgnoringMessageHandler {},
4550 send_only_message_handler: IgnoringMessageHandler {},
4551 };
4552 let peer_a = PeerManager::new(message_handler_a, 0, &[0; 32], &logger, &node_signer_a);
4553 let peer_b = PeerManager::new(message_handler_b, 0, &[1; 32], &logger, &node_signer_b);
4554
4555 let a_id = node_signer_a.get_node_id(Recipient::Node).unwrap();
4556 let mut fd_a = FileDescriptor::new(1);
4557 let mut fd_b = FileDescriptor::new(1);
4558
4559 let act_one = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
4561 peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();
4562
4563 peer_a.read_event(&mut fd_a, &act_one).unwrap();
4564 peer_a.process_events();
4565
4566 let act_two = fd_a.outbound_data.lock().unwrap().split_off(0);
4567 peer_b.read_event(&mut fd_b, &act_two).unwrap();
4568 peer_b.process_events();
4569
4570 peer_b.timer_tick_occurred();
4572
4573 let act_three_with_init_b = fd_b.outbound_data.lock().unwrap().split_off(0);
4574 {
4575 let peer_a_lock = peer_a.peers.read().unwrap();
4576 let handshake_complete =
4577 peer_a_lock.get(&fd_a).unwrap().lock().unwrap().handshake_complete();
4578 assert!(!handshake_complete);
4579 }
4580
4581 peer_a.read_event(&mut fd_a, &act_three_with_init_b).unwrap();
4582 peer_a.process_events();
4583
4584 {
4585 let peer_a_lock = peer_a.peers.read().unwrap();
4586 let handshake_complete =
4587 peer_a_lock.get(&fd_a).unwrap().lock().unwrap().handshake_complete();
4588 assert!(handshake_complete);
4589 }
4590
4591 let init_a = fd_a.outbound_data.lock().unwrap().split_off(0);
4592 assert!(!init_a.is_empty());
4593
4594 {
4595 let peer_b_lock = peer_b.peers.read().unwrap();
4596 let handshake_complete =
4597 peer_b_lock.get(&fd_b).unwrap().lock().unwrap().handshake_complete();
4598 assert!(!handshake_complete);
4599 }
4600
4601 peer_b.read_event(&mut fd_b, &init_a).unwrap();
4602 peer_b.process_events();
4603
4604 {
4605 let peer_b_lock = peer_b.peers.read().unwrap();
4606 let handshake_complete =
4607 peer_b_lock.get(&fd_b).unwrap().lock().unwrap().handshake_complete();
4608 assert!(handshake_complete);
4609 }
4610
4611 {
4613 let peers_len = peer_b.peers.read().unwrap().len();
4614 assert_eq!(peers_len, 1);
4615 }
4616
4617 assert!(fd_b.outbound_data.lock().unwrap().split_off(0).is_empty());
4619 peer_b.timer_tick_occurred();
4620 peer_b.process_events();
4621 assert!(!fd_b.outbound_data.lock().unwrap().split_off(0).is_empty());
4622
4623 let mut send_warning = || {
4624 {
4625 let peers = peer_a.peers.read().unwrap();
4626 let mut peer_b = peers.get(&fd_a).unwrap().lock().unwrap();
4627 peer_a.enqueue_message(
4628 &mut peer_b,
4629 &msgs::WarningMessage {
4630 channel_id: ChannelId([0; 32]),
4631 data: "no disconnect plz".to_string(),
4632 },
4633 );
4634 }
4635 peer_a.process_events();
4636 let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
4637 assert!(!msg.is_empty());
4638 peer_b.read_event(&mut fd_b, &msg).unwrap();
4639 peer_b.process_events();
4640 };
4641
4642 send_warning();
4645 for _ in 0..MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER {
4646 peer_b.timer_tick_occurred();
4647 send_warning();
4648 }
4649 {
4650 let peers_len = peer_b.peers.read().unwrap().len();
4651 assert_eq!(peers_len, 1);
4652 }
4653
4654 peer_b.timer_tick_occurred();
4656 {
4657 let peers_len = peer_b.peers.read().unwrap().len();
4658 assert_eq!(peers_len, 0);
4659 }
4660 }
4661
4662 #[test]
4663 fn test_gossip_flood_pause() {
4664 use crate::routing::test_utils::channel_announcement;
4665 use lightning_types::features::ChannelFeatures;
4666
4667 let cfgs = create_peermgr_cfgs(2);
4670 let peers = create_network(2, &cfgs);
4671 let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
4672
4673 macro_rules! drain_queues {
4674 () => {
4675 loop {
4676 peers[0].process_events();
4677 peers[1].process_events();
4678
4679 let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
4680 if !msg.is_empty() {
4681 peers[1].read_event(&mut fd_b, &msg).unwrap();
4682 continue;
4683 }
4684 let msg = fd_b.outbound_data.lock().unwrap().split_off(0);
4685 if !msg.is_empty() {
4686 peers[0].read_event(&mut fd_a, &msg).unwrap();
4687 continue;
4688 }
4689 break;
4690 }
4691 };
4692 }
4693
4694 drain_queues!();
4696
4697 let secp_ctx = Secp256k1::new();
4698 let key = SecretKey::from_slice(&[1; 32]).unwrap();
4699 let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx);
4700 let encoded_size = msg.serialized_length() + 16 * 2 + 2 + 2;
4703 let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None };
4704
4705 fd_a.hang_writes.store(true, Ordering::Relaxed);
4706
4707 for _ in 0..OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size {
4710 cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone());
4711 peers[0].process_events();
4712 }
4713
4714 {
4715 let peer_a_lock = peers[0].peers.read().unwrap();
4716 let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap();
4717 let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>()
4718 + peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>();
4719 assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP - encoded_size);
4720 assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP);
4721 }
4722
4723 cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_ev);
4726 peers[0].process_events();
4727
4728 {
4729 let peer_a_lock = peers[0].peers.read().unwrap();
4730 let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap();
4731 let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>()
4732 + peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>();
4733 assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP);
4734 assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + encoded_size);
4735 }
4736
4737 fd_a.hang_writes.store(false, Ordering::Relaxed);
4742 cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed);
4743 peers[0].write_buffer_space_avail(&mut fd_a).unwrap();
4744
4745 drain_queues!();
4746 {
4747 let peer_a_lock = peers[0].peers.read().unwrap();
4748 let empty =
4749 peer_a_lock.get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty();
4750 assert!(empty);
4751 }
4752
4753 assert_eq!(
4754 cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed),
4755 OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size + 1
4756 );
4757 }
4758
4759 #[test]
4760 fn test_filter_addresses() {
4761 let ip_address = SocketAddress::TcpIpV4 { addr: [10, 0, 0, 0], port: 1000 };
4765 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4766 let ip_address = SocketAddress::TcpIpV4 { addr: [10, 0, 255, 201], port: 1000 };
4767 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4768 let ip_address = SocketAddress::TcpIpV4 { addr: [10, 255, 255, 255], port: 1000 };
4769 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4770
4771 let ip_address = SocketAddress::TcpIpV4 { addr: [0, 0, 0, 0], port: 1000 };
4773 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4774 let ip_address = SocketAddress::TcpIpV4 { addr: [0, 0, 255, 187], port: 1000 };
4775 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4776 let ip_address = SocketAddress::TcpIpV4 { addr: [0, 255, 255, 255], port: 1000 };
4777 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4778
4779 let ip_address = SocketAddress::TcpIpV4 { addr: [100, 64, 0, 0], port: 1000 };
4781 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4782 let ip_address = SocketAddress::TcpIpV4 { addr: [100, 78, 255, 0], port: 1000 };
4783 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4784 let ip_address = SocketAddress::TcpIpV4 { addr: [100, 127, 255, 255], port: 1000 };
4785 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4786
4787 let ip_address = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 0], port: 1000 };
4789 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4790 let ip_address = SocketAddress::TcpIpV4 { addr: [127, 65, 73, 0], port: 1000 };
4791 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4792 let ip_address = SocketAddress::TcpIpV4 { addr: [127, 255, 255, 255], port: 1000 };
4793 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4794
4795 let ip_address = SocketAddress::TcpIpV4 { addr: [169, 254, 0, 0], port: 1000 };
4797 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4798 let ip_address = SocketAddress::TcpIpV4 { addr: [169, 254, 221, 101], port: 1000 };
4799 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4800 let ip_address = SocketAddress::TcpIpV4 { addr: [169, 254, 255, 255], port: 1000 };
4801 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4802
4803 let ip_address = SocketAddress::TcpIpV4 { addr: [172, 16, 0, 0], port: 1000 };
4805 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4806 let ip_address = SocketAddress::TcpIpV4 { addr: [172, 27, 101, 23], port: 1000 };
4807 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4808 let ip_address = SocketAddress::TcpIpV4 { addr: [172, 31, 255, 255], port: 1000 };
4809 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4810
4811 let ip_address = SocketAddress::TcpIpV4 { addr: [192, 168, 0, 0], port: 1000 };
4813 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4814 let ip_address = SocketAddress::TcpIpV4 { addr: [192, 168, 205, 159], port: 1000 };
4815 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4816 let ip_address = SocketAddress::TcpIpV4 { addr: [192, 168, 255, 255], port: 1000 };
4817 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4818
4819 let ip_address = SocketAddress::TcpIpV4 { addr: [192, 88, 99, 0], port: 1000 };
4821 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4822 let ip_address = SocketAddress::TcpIpV4 { addr: [192, 88, 99, 140], port: 1000 };
4823 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4824 let ip_address = SocketAddress::TcpIpV4 { addr: [192, 88, 99, 255], port: 1000 };
4825 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4826
4827 let ip_address = SocketAddress::TcpIpV4 { addr: [188, 255, 99, 0], port: 1000 };
4829 assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4830 let ip_address = SocketAddress::TcpIpV4 { addr: [123, 8, 129, 14], port: 1000 };
4831 assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4832 let ip_address = SocketAddress::TcpIpV4 { addr: [2, 88, 9, 255], port: 1000 };
4833 assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4834
4835 let ip_address = SocketAddress::TcpIpV6 {
4837 addr: [32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
4838 port: 1000,
4839 };
4840 assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4841 let ip_address = SocketAddress::TcpIpV6 {
4842 addr: [45, 34, 209, 190, 0, 123, 55, 34, 0, 0, 3, 27, 201, 0, 0, 0],
4843 port: 1000,
4844 };
4845 assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4846 let ip_address = SocketAddress::TcpIpV6 {
4847 addr: [63, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255],
4848 port: 1000,
4849 };
4850 assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4851
4852 let ip_address = SocketAddress::TcpIpV6 {
4854 addr: [24, 240, 12, 32, 0, 0, 0, 0, 20, 97, 0, 32, 121, 254, 0, 0],
4855 port: 1000,
4856 };
4857 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4858 let ip_address = SocketAddress::TcpIpV6 {
4859 addr: [68, 23, 56, 63, 0, 0, 2, 7, 75, 109, 0, 39, 0, 0, 0, 0],
4860 port: 1000,
4861 };
4862 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4863 let ip_address = SocketAddress::TcpIpV6 {
4864 addr: [101, 38, 140, 230, 100, 0, 30, 98, 0, 26, 0, 0, 57, 96, 0, 0],
4865 port: 1000,
4866 };
4867 assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4868
4869 assert_eq!(filter_addresses(None), None);
4871 }
4872
4873 #[test]
4874 #[cfg(feature = "std")]
4875 fn test_process_events_multithreaded() {
4876 use std::time::{Duration, Instant};
4877 let cfg = Arc::new(create_peermgr_cfgs(1));
4887 let peer = Arc::new(
4889 create_network(1, unsafe { &*(&*cfg as *const _) as &'static _ }).pop().unwrap(),
4890 );
4891
4892 let end_time = Instant::now() + Duration::from_millis(100);
4893 let observed_loop = Arc::new(AtomicBool::new(false));
4894 let thread_fn = || {
4895 let thread_peer = Arc::clone(&peer);
4896 let thread_observed_loop = Arc::clone(&observed_loop);
4897 move || {
4898 while Instant::now() < end_time || !thread_observed_loop.load(Ordering::Acquire) {
4899 test_utils::TestChannelMessageHandler::MESSAGE_FETCH_COUNTER
4900 .with(|val| val.store(0, Ordering::Relaxed));
4901 thread_peer.process_events();
4902 if test_utils::TestChannelMessageHandler::MESSAGE_FETCH_COUNTER
4903 .with(|val| val.load(Ordering::Relaxed))
4904 > 1
4905 {
4906 thread_observed_loop.store(true, Ordering::Release);
4907 return;
4908 }
4909 std::thread::sleep(Duration::from_micros(1));
4910 }
4911 }
4912 };
4913
4914 let thread_a = std::thread::spawn(thread_fn());
4915 let thread_b = std::thread::spawn(thread_fn());
4916 let thread_c = std::thread::spawn(thread_fn());
4917 thread_fn()();
4918 thread_a.join().unwrap();
4919 thread_b.join().unwrap();
4920 thread_c.join().unwrap();
4921 assert!(observed_loop.load(Ordering::Acquire));
4922 }
4923}