lightning/ln/
peer_handler.rs

1// This file is Copyright its original authors, visible in version control
2// history.
3//
4// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7// You may not use this file except in accordance with one or both of these
8// licenses.
9
10//! Top level peer message handling and socket handling logic lives here.
11//!
12//! Instead of actually servicing sockets ourselves we require that you implement the
13//! SocketDescriptor interface and use that to receive actions which you should perform on the
14//! socket, and call into PeerManager with bytes read from the socket. The PeerManager will then
15//! call into the provided message handlers (probably a ChannelManager and P2PGossipSync) with
16//! messages they should handle, and encoding/sending response messages.
17
18use bitcoin::constants::ChainHash;
19use bitcoin::secp256k1::{self, PublicKey, Secp256k1, SecretKey};
20
21use crate::blinded_path::message::{AsyncPaymentsContext, DNSResolverContext, OffersContext};
22use crate::ln::msgs;
23use crate::ln::msgs::{
24	BaseMessageHandler, ChannelMessageHandler, Init, LightningError, MessageSendEvent,
25	OnionMessageHandler, RoutingMessageHandler, SendOnlyMessageHandler, SocketAddress,
26};
27use crate::ln::peer_channel_encryptor::{
28	MessageBuf, NextNoiseStep, PeerChannelEncryptor, MSG_BUF_ALLOC_SIZE,
29};
30use crate::ln::types::ChannelId;
31use crate::ln::wire;
32use crate::ln::wire::{Encode, Type};
33use crate::onion_message::async_payments::{
34	AsyncPaymentsMessageHandler, HeldHtlcAvailable, OfferPaths, OfferPathsRequest, ReleaseHeldHtlc,
35	ServeStaticInvoice, StaticInvoicePersisted,
36};
37use crate::onion_message::dns_resolution::{
38	DNSResolverMessage, DNSResolverMessageHandler, DNSSECProof, DNSSECQuery,
39};
40use crate::onion_message::messenger::{
41	CustomOnionMessageHandler, MessageSendInstructions, Responder, ResponseInstruction,
42};
43use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
44use crate::onion_message::packet::OnionMessageContents;
45use crate::routing::gossip::{NodeAlias, NodeId};
46use crate::sign::{NodeSigner, Recipient};
47use crate::types::features::{InitFeatures, NodeFeatures};
48use crate::types::string::PrintableString;
49use crate::util::atomic_counter::AtomicCounter;
50use crate::util::logger::{Level, Logger, WithContext};
51use crate::util::ser::{VecWriter, Writeable, Writer};
52
53#[allow(unused_imports)]
54use crate::prelude::*;
55
56use crate::io;
57use crate::sync::{FairRwLock, Mutex, MutexGuard};
58use core::convert::Infallible;
59use core::ops::Deref;
60use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, Ordering};
61use core::{cmp, fmt, hash, mem};
62#[cfg(not(c_bindings))]
63use {
64	crate::chain::chainmonitor::ChainMonitor,
65	crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager},
66	crate::onion_message::messenger::{SimpleArcOnionMessenger, SimpleRefOnionMessenger},
67	crate::routing::gossip::{NetworkGraph, P2PGossipSync},
68	crate::sign::{InMemorySigner, KeysManager},
69	crate::sync::Arc,
70};
71
72use bitcoin::hashes::sha256::Hash as Sha256;
73use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
74use bitcoin::hashes::{Hash, HashEngine};
75
76/// A handler provided to [`PeerManager`] for reading and handling custom messages.
77///
78/// [BOLT 1] specifies a custom message type range for use with experimental or application-specific
79/// messages. `CustomMessageHandler` allows for user-defined handling of such types. See the
80/// [`lightning_custom_message`] crate for tools useful in composing more than one custom handler.
81///
82/// [BOLT 1]: https://github.com/lightning/bolts/blob/master/01-messaging.md
83/// [`lightning_custom_message`]: https://docs.rs/lightning_custom_message/latest/lightning_custom_message
pub trait CustomMessageHandler: wire::CustomMessageReader {
	/// Handles the given message sent from `sender_node_id`, possibly producing messages for
	/// [`CustomMessageHandler::get_and_clear_pending_msg`] to return and thus for [`PeerManager`]
	/// to send.
	///
	/// NOTE(review): an `Err` here surfaces as a [`LightningError`]; confirm against `PeerManager`
	/// internals exactly how the error action is applied to the peer connection.
	fn handle_custom_message(
		&self, msg: Self::CustomMessage, sender_node_id: PublicKey,
	) -> Result<(), LightningError>;

	/// Returns the list of pending messages that were generated by the handler, clearing the list
	/// in the process. Each message is paired with the node id of the intended recipient. If no
	/// connection to the node exists, then the message is simply not sent.
	fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>;

	/// Indicates a peer disconnected.
	fn peer_disconnected(&self, their_node_id: PublicKey);

	/// Handle a peer connecting.
	///
	/// May return an `Err(())` to indicate that we should immediately disconnect from the peer
	/// (e.g. because the features they support are not sufficient to communicate with us).
	///
	/// Note, of course, that other message handlers may wish to communicate with the peer, which
	/// disconnecting them will prevent.
	///
	/// [`Self::peer_disconnected`] will not be called if `Err(())` is returned.
	fn peer_connected(&self, their_node_id: PublicKey, msg: &Init, inbound: bool)
		-> Result<(), ()>;

	/// Gets the node feature flags which this handler itself supports. All available handlers are
	/// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`]
	/// which are broadcasted in our [`NodeAnnouncement`] message.
	///
	/// [`NodeAnnouncement`]: crate::ln::msgs::NodeAnnouncement
	fn provided_node_features(&self) -> NodeFeatures;

	/// Gets the init feature flags which should be sent to the given peer. All available handlers
	/// are queried similarly and their feature flags are OR'd together to form the [`InitFeatures`]
	/// which are sent in our [`Init`] message.
	///
	/// [`Init`]: crate::ln::msgs::Init
	fn provided_init_features(&self, their_node_id: PublicKey) -> InitFeatures;
}
126
/// A dummy struct which implements `RoutingMessageHandler` without storing any routing information
/// or doing any processing. You can provide one of these as the route_handler in a MessageHandler.
///
/// It also provides no-op implementations of the onion message, offers, async payments, DNS
/// resolver, and custom message handler traits (see the `impl` blocks below), so it can be used
/// to fill any handler slot which should simply ignore messages.
pub struct IgnoringMessageHandler {}
impl BaseMessageHandler for IgnoringMessageHandler {
	// Peer lifecycle events are ignored; connections are always accepted.
	fn peer_disconnected(&self, _their_node_id: PublicKey) {}
	fn peer_connected(
		&self, _their_node_id: PublicKey, _init: &msgs::Init, _inbound: bool,
	) -> Result<(), ()> {
		Ok(())
	}
	// We advertise no features and generate no outbound message events.
	fn provided_node_features(&self) -> NodeFeatures {
		NodeFeatures::empty()
	}
	fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
		InitFeatures::empty()
	}
	fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
		Vec::new()
	}
}
impl RoutingMessageHandler for IgnoringMessageHandler {
	// All gossip messages are accepted and dropped. `Ok(false)` reports success without error;
	// presumably the `false` indicates the message need not be rebroadcast — confirm against the
	// `RoutingMessageHandler` trait docs.
	fn handle_node_announcement(
		&self, _their_node_id: Option<PublicKey>, _msg: &msgs::NodeAnnouncement,
	) -> Result<bool, LightningError> {
		Ok(false)
	}
	fn handle_channel_announcement(
		&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelAnnouncement,
	) -> Result<bool, LightningError> {
		Ok(false)
	}
	fn handle_channel_update(
		&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelUpdate,
	) -> Result<bool, LightningError> {
		Ok(false)
	}
	// We store no graph, so there is never anything to send for initial gossip sync.
	fn get_next_channel_announcement(
		&self, _starting_point: u64,
	) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)>
	{
		None
	}
	fn get_next_node_announcement(
		&self, _starting_point: Option<&NodeId>,
	) -> Option<msgs::NodeAnnouncement> {
		None
	}
	// Gossip query messages are likewise acknowledged as handled and otherwise ignored.
	fn handle_reply_channel_range(
		&self, _their_node_id: PublicKey, _msg: msgs::ReplyChannelRange,
	) -> Result<(), LightningError> {
		Ok(())
	}
	fn handle_reply_short_channel_ids_end(
		&self, _their_node_id: PublicKey, _msg: msgs::ReplyShortChannelIdsEnd,
	) -> Result<(), LightningError> {
		Ok(())
	}
	fn handle_query_channel_range(
		&self, _their_node_id: PublicKey, _msg: msgs::QueryChannelRange,
	) -> Result<(), LightningError> {
		Ok(())
	}
	fn handle_query_short_channel_ids(
		&self, _their_node_id: PublicKey, _msg: msgs::QueryShortChannelIds,
	) -> Result<(), LightningError> {
		Ok(())
	}
	// We do no processing, so our (nonexistent) queue is never backlogged.
	fn processing_queue_high(&self) -> bool {
		false
	}
}
198
impl OnionMessageHandler for IgnoringMessageHandler {
	// Onion messages are dropped on receipt and none are ever queued for sending.
	fn handle_onion_message(&self, _their_node_id: PublicKey, _msg: &msgs::OnionMessage) {}
	fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> {
		None
	}
	fn timer_tick_occurred(&self) {}
}
206
impl OffersMessageHandler for IgnoringMessageHandler {
	// BOLT 12 offers messages are dropped and never answered.
	fn handle_message(
		&self, _message: OffersMessage, _context: Option<OffersContext>,
		_responder: Option<Responder>,
	) -> Option<(OffersMessage, ResponseInstruction)> {
		None
	}
}
impl AsyncPaymentsMessageHandler for IgnoringMessageHandler {
	// Async-payments onion messages are dropped; returning `None` (or `()`) means no response is
	// ever generated for the provided responder.
	fn handle_offer_paths_request(
		&self, _message: OfferPathsRequest, _context: AsyncPaymentsContext,
		_responder: Option<Responder>,
	) -> Option<(OfferPaths, ResponseInstruction)> {
		None
	}
	fn handle_offer_paths(
		&self, _message: OfferPaths, _context: AsyncPaymentsContext, _responder: Option<Responder>,
	) -> Option<(ServeStaticInvoice, ResponseInstruction)> {
		None
	}
	fn handle_serve_static_invoice(
		&self, _message: ServeStaticInvoice, _context: AsyncPaymentsContext,
		_responder: Option<Responder>,
	) {
	}
	fn handle_static_invoice_persisted(
		&self, _message: StaticInvoicePersisted, _context: AsyncPaymentsContext,
	) {
	}
	fn handle_held_htlc_available(
		&self, _message: HeldHtlcAvailable, _context: AsyncPaymentsContext,
		_responder: Option<Responder>,
	) -> Option<(ReleaseHeldHtlc, ResponseInstruction)> {
		None
	}
	fn handle_release_held_htlc(&self, _message: ReleaseHeldHtlc, _context: AsyncPaymentsContext) {}
}
impl DNSResolverMessageHandler for IgnoringMessageHandler {
	// DNSSEC queries and proofs are dropped without response.
	fn handle_dnssec_query(
		&self, _message: DNSSECQuery, _responder: Option<Responder>,
	) -> Option<(DNSResolverMessage, ResponseInstruction)> {
		None
	}
	fn handle_dnssec_proof(&self, _message: DNSSECProof, _context: DNSResolverContext) {}
}
impl CustomOnionMessageHandler for IgnoringMessageHandler {
	// `Infallible` has no values, so no custom onion message can ever be constructed or handled.
	type CustomMessage = Infallible;
	fn handle_custom_message(
		&self, _message: Infallible, _context: Option<Vec<u8>>, _responder: Option<Responder>,
	) -> Option<(Infallible, ResponseInstruction)> {
		// Since `read_custom_message` always returns `None`, this method can never be called.
		unreachable!();
	}
	fn read_custom_message<R: io::Read>(
		&self, _msg_type: u64, _buffer: &mut R,
	) -> Result<Option<Infallible>, msgs::DecodeError>
	where
		Self: Sized,
	{
		Ok(None)
	}
	fn release_pending_custom_messages(&self) -> Vec<(Infallible, MessageSendInstructions)> {
		vec![]
	}
}
272
// An empty impl suffices here as `SendOnlyMessageHandler` has no required items for us to stub.
impl SendOnlyMessageHandler for IgnoringMessageHandler {}
274
// `Infallible` cannot be constructed, so these methods (all of which take `self`) can never
// actually run; they exist only to satisfy the trait bound.
impl OnionMessageContents for Infallible {
	fn tlv_type(&self) -> u64 {
		unreachable!();
	}
	#[cfg(c_bindings)]
	fn msg_type(&self) -> String {
		unreachable!();
	}
	#[cfg(not(c_bindings))]
	fn msg_type(&self) -> &'static str {
		unreachable!();
	}
}
288
// Self-deref lets an `IgnoringMessageHandler` be used directly wherever a `Deref`-to-handler
// generic bound is expected (e.g. the handler slots in `MessageHandler`).
impl Deref for IgnoringMessageHandler {
	type Target = IgnoringMessageHandler;
	fn deref(&self) -> &Self {
		self
	}
}
295
// Implement Type for Infallible, note that it cannot be constructed, and thus you can never call a
// method that takes self for it.
impl wire::Type for Infallible {
	fn type_id(&self) -> u16 {
		unreachable!();
	}
}
// As above, `Infallible` cannot be constructed, so `write` can never be called.
impl Writeable for Infallible {
	fn write<W: Writer>(&self, _: &mut W) -> Result<(), io::Error> {
		unreachable!();
	}
}
308
impl wire::CustomMessageReader for IgnoringMessageHandler {
	type CustomMessage = Infallible;
	// Always reports "no custom message of this type", so the custom handler below is never
	// invoked with an actual message.
	fn read<R: io::Read>(
		&self, _message_type: u16, _buffer: &mut R,
	) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
		Ok(None)
	}
}
317
impl CustomMessageHandler for IgnoringMessageHandler {
	fn handle_custom_message(
		&self, _msg: Infallible, _sender_node_id: PublicKey,
	) -> Result<(), LightningError> {
		// Since `read` always returns `None`, this method can never be called.
		unreachable!();
	}

	// No messages are ever produced, no features advertised, and peer events are ignored.
	fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> {
		Vec::new()
	}

	fn peer_disconnected(&self, _their_node_id: PublicKey) {}

	fn peer_connected(
		&self, _their_node_id: PublicKey, _msg: &Init, _inbound: bool,
	) -> Result<(), ()> {
		Ok(())
	}

	fn provided_node_features(&self) -> NodeFeatures {
		NodeFeatures::empty()
	}

	fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
		InitFeatures::empty()
	}
}
346
/// A dummy struct which implements `ChannelMessageHandler` without having any channels.
/// You can provide one of these as the route_handler in a MessageHandler.
pub struct ErroringMessageHandler {
	// Error events queued by `push_error`, drained via `get_and_clear_pending_msg_events`.
	message_queue: Mutex<Vec<MessageSendEvent>>,
}
352impl ErroringMessageHandler {
353	/// Constructs a new ErroringMessageHandler
354	pub fn new() -> Self {
355		Self { message_queue: Mutex::new(Vec::new()) }
356	}
357	fn push_error(&self, node_id: PublicKey, channel_id: ChannelId) {
358		self.message_queue.lock().unwrap().push(MessageSendEvent::HandleError {
359			action: msgs::ErrorAction::SendErrorMessage {
360				msg: msgs::ErrorMessage {
361					channel_id,
362					data: "We do not support channel messages, sorry.".to_owned(),
363				},
364			},
365			node_id,
366		});
367	}
368}
369impl BaseMessageHandler for ErroringMessageHandler {
370	fn peer_disconnected(&self, _their_node_id: PublicKey) {}
371	fn peer_connected(
372		&self, _their_node_id: PublicKey, _init: &msgs::Init, _inbound: bool,
373	) -> Result<(), ()> {
374		Ok(())
375	}
376	fn provided_node_features(&self) -> NodeFeatures {
377		NodeFeatures::empty()
378	}
379	fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
380		// Set a number of features which various nodes may require to talk to us. It's totally
381		// reasonable to indicate we "support" all kinds of channel features...we just reject all
382		// channels.
383		let mut features = InitFeatures::empty();
384		features.set_data_loss_protect_optional();
385		features.set_upfront_shutdown_script_optional();
386		features.set_variable_length_onion_optional();
387		features.set_static_remote_key_optional();
388		features.set_payment_secret_optional();
389		features.set_basic_mpp_optional();
390		features.set_wumbo_optional();
391		features.set_shutdown_any_segwit_optional();
392		features.set_dual_fund_optional();
393		features.set_channel_type_optional();
394		features.set_scid_privacy_optional();
395		features.set_zero_conf_optional();
396		features.set_route_blinding_optional();
397		#[cfg(simple_close)]
398		features.set_simple_close_optional();
399		features
400	}
401
402	fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
403		let mut res = Vec::new();
404		mem::swap(&mut res, &mut self.message_queue.lock().unwrap());
405		res
406	}
407}
408impl ChannelMessageHandler for ErroringMessageHandler {
409	// Any messages which are related to a specific channel generate an error message to let the
410	// peer know we don't care about channels.
411	fn handle_open_channel(&self, their_node_id: PublicKey, msg: &msgs::OpenChannel) {
412		ErroringMessageHandler::push_error(
413			self,
414			their_node_id,
415			msg.common_fields.temporary_channel_id,
416		);
417	}
418	fn handle_accept_channel(&self, their_node_id: PublicKey, msg: &msgs::AcceptChannel) {
419		ErroringMessageHandler::push_error(
420			self,
421			their_node_id,
422			msg.common_fields.temporary_channel_id,
423		);
424	}
425	fn handle_funding_created(&self, their_node_id: PublicKey, msg: &msgs::FundingCreated) {
426		ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
427	}
428	fn handle_funding_signed(&self, their_node_id: PublicKey, msg: &msgs::FundingSigned) {
429		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
430	}
431	fn handle_channel_ready(&self, their_node_id: PublicKey, msg: &msgs::ChannelReady) {
432		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
433	}
434	fn handle_shutdown(&self, their_node_id: PublicKey, msg: &msgs::Shutdown) {
435		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
436	}
437	fn handle_closing_signed(&self, their_node_id: PublicKey, msg: &msgs::ClosingSigned) {
438		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
439	}
440	#[cfg(simple_close)]
441	fn handle_closing_complete(&self, their_node_id: PublicKey, msg: msgs::ClosingComplete) {
442		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
443	}
444	#[cfg(simple_close)]
445	fn handle_closing_sig(&self, their_node_id: PublicKey, msg: msgs::ClosingSig) {
446		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
447	}
448	fn handle_stfu(&self, their_node_id: PublicKey, msg: &msgs::Stfu) {
449		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
450	}
451	fn handle_splice_init(&self, their_node_id: PublicKey, msg: &msgs::SpliceInit) {
452		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
453	}
454	fn handle_splice_ack(&self, their_node_id: PublicKey, msg: &msgs::SpliceAck) {
455		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
456	}
457	fn handle_splice_locked(&self, their_node_id: PublicKey, msg: &msgs::SpliceLocked) {
458		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
459	}
460	fn handle_update_add_htlc(&self, their_node_id: PublicKey, msg: &msgs::UpdateAddHTLC) {
461		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
462	}
463	fn handle_update_fulfill_htlc(&self, their_node_id: PublicKey, msg: msgs::UpdateFulfillHTLC) {
464		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
465	}
466	fn handle_update_fail_htlc(&self, their_node_id: PublicKey, msg: &msgs::UpdateFailHTLC) {
467		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
468	}
469	fn handle_update_fail_malformed_htlc(
470		&self, their_node_id: PublicKey, msg: &msgs::UpdateFailMalformedHTLC,
471	) {
472		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
473	}
474	fn handle_commitment_signed(&self, their_node_id: PublicKey, msg: &msgs::CommitmentSigned) {
475		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
476	}
477	fn handle_commitment_signed_batch(
478		&self, their_node_id: PublicKey, channel_id: ChannelId, _batch: Vec<msgs::CommitmentSigned>,
479	) {
480		ErroringMessageHandler::push_error(self, their_node_id, channel_id);
481	}
482	fn handle_revoke_and_ack(&self, their_node_id: PublicKey, msg: &msgs::RevokeAndACK) {
483		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
484	}
485	fn handle_update_fee(&self, their_node_id: PublicKey, msg: &msgs::UpdateFee) {
486		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
487	}
488	fn handle_announcement_signatures(
489		&self, their_node_id: PublicKey, msg: &msgs::AnnouncementSignatures,
490	) {
491		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
492	}
493	fn handle_channel_reestablish(&self, their_node_id: PublicKey, msg: &msgs::ChannelReestablish) {
494		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
495	}
496	// msgs::ChannelUpdate does not contain the channel_id field, so we just drop them.
497	fn handle_channel_update(&self, _their_node_id: PublicKey, _msg: &msgs::ChannelUpdate) {}
498
499	fn handle_peer_storage(&self, _their_node_id: PublicKey, _msg: msgs::PeerStorage) {}
500	fn handle_peer_storage_retrieval(
501		&self, _their_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval,
502	) {
503	}
504
505	fn handle_error(&self, _their_node_id: PublicKey, _msg: &msgs::ErrorMessage) {}
506
507	fn get_chain_hashes(&self) -> Option<Vec<ChainHash>> {
508		// We don't enforce any chains upon peer connection for `ErroringMessageHandler` and leave it up
509		// to users of `ErroringMessageHandler` to make decisions on network compatiblility.
510		// There's not really any way to pull in specific networks here, and hardcoding can cause breakages.
511		None
512	}
513
514	fn handle_open_channel_v2(&self, their_node_id: PublicKey, msg: &msgs::OpenChannelV2) {
515		ErroringMessageHandler::push_error(
516			self,
517			their_node_id,
518			msg.common_fields.temporary_channel_id,
519		);
520	}
521
522	fn handle_accept_channel_v2(&self, their_node_id: PublicKey, msg: &msgs::AcceptChannelV2) {
523		ErroringMessageHandler::push_error(
524			self,
525			their_node_id,
526			msg.common_fields.temporary_channel_id,
527		);
528	}
529
530	fn handle_tx_add_input(&self, their_node_id: PublicKey, msg: &msgs::TxAddInput) {
531		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
532	}
533
534	fn handle_tx_add_output(&self, their_node_id: PublicKey, msg: &msgs::TxAddOutput) {
535		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
536	}
537
538	fn handle_tx_remove_input(&self, their_node_id: PublicKey, msg: &msgs::TxRemoveInput) {
539		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
540	}
541
542	fn handle_tx_remove_output(&self, their_node_id: PublicKey, msg: &msgs::TxRemoveOutput) {
543		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
544	}
545
546	fn handle_tx_complete(&self, their_node_id: PublicKey, msg: &msgs::TxComplete) {
547		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
548	}
549
550	fn handle_tx_signatures(&self, their_node_id: PublicKey, msg: &msgs::TxSignatures) {
551		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
552	}
553
554	fn handle_tx_init_rbf(&self, their_node_id: PublicKey, msg: &msgs::TxInitRbf) {
555		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
556	}
557
558	fn handle_tx_ack_rbf(&self, their_node_id: PublicKey, msg: &msgs::TxAckRbf) {
559		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
560	}
561
562	fn handle_tx_abort(&self, their_node_id: PublicKey, msg: &msgs::TxAbort) {
563		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
564	}
565
566	fn message_received(&self) {}
567}
568
// Self-deref lets an `ErroringMessageHandler` be used directly wherever a `Deref`-to-handler
// generic bound is expected (e.g. the handler slots in `MessageHandler`).
impl Deref for ErroringMessageHandler {
	type Target = ErroringMessageHandler;
	fn deref(&self) -> &Self {
		self
	}
}
575
576/// Provides references to trait impls which handle different types of messages.
/// Provides references to trait impls which handle different types of messages.
///
/// Each type parameter is only required to [`Deref`] to the corresponding handler, so `Arc`s,
/// plain references, or (via their self-`Deref` impls) the LDK-provided dummy handlers may be
/// used directly.
pub struct MessageHandler<CM: Deref, RM: Deref, OM: Deref, CustomM: Deref, SM: Deref>
where
	CM::Target: ChannelMessageHandler,
	RM::Target: RoutingMessageHandler,
	OM::Target: OnionMessageHandler,
	CustomM::Target: CustomMessageHandler,
	SM::Target: SendOnlyMessageHandler,
{
	/// A message handler which handles messages specific to channels. Usually this is just a
	/// [`ChannelManager`] object or an [`ErroringMessageHandler`].
	///
	/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
	pub chan_handler: CM,
	/// A message handler which handles messages updating our knowledge of the network channel
	/// graph. Usually this is just a [`P2PGossipSync`] object or an [`IgnoringMessageHandler`].
	///
	/// [`P2PGossipSync`]: crate::routing::gossip::P2PGossipSync
	pub route_handler: RM,

	/// A message handler which handles onion messages. This should generally be an
	/// [`OnionMessenger`], but can also be an [`IgnoringMessageHandler`].
	///
	/// [`OnionMessenger`]: crate::onion_message::messenger::OnionMessenger
	pub onion_message_handler: OM,

	/// A message handler which handles custom messages. The only LDK-provided implementation is
	/// [`IgnoringMessageHandler`].
	pub custom_message_handler: CustomM,

	/// A message handler which can be used to send messages.
	///
	/// This should generally be a [`ChainMonitor`].
	///
	/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
	pub send_only_message_handler: SM,
}
613
614/// Provides an object which can be used to send data to and which uniquely identifies a connection
615/// to a remote host. You will need to be able to generate multiple of these which meet Eq and
616/// implement Hash to meet the PeerManager API.
617///
618/// For efficiency, [`Clone`] should be relatively cheap for this type.
619///
620/// Two descriptors may compare equal (by [`cmp::Eq`] and [`hash::Hash`]) as long as the original
621/// has been disconnected, the [`PeerManager`] has been informed of the disconnection (either by it
622/// having triggered the disconnection or a call to [`PeerManager::socket_disconnected`]), and no
623/// further calls to the [`PeerManager`] related to the original socket occur. This allows you to
624/// use a file descriptor for your SocketDescriptor directly, however for simplicity you may wish
625/// to simply use another value which is guaranteed to be globally unique instead.
pub trait SocketDescriptor: cmp::Eq + hash::Hash + Clone {
	/// Attempts to send some data from the given slice to the peer.
	///
	/// Returns the amount of data which was sent, possibly 0 if the socket has since disconnected.
	/// Note that in the disconnected case, [`PeerManager::socket_disconnected`] must still be
	/// called and further write attempts may occur until that time.
	///
	/// If the returned size is smaller than `data.len()`, a
	/// [`PeerManager::write_buffer_space_avail`] call must be made the next time more data can be
	/// written.
	///
	/// If `continue_read` is *not* set, further [`PeerManager::read_event`] calls should be
	/// avoided until another call is made with it set. This allows us to pause read if there are
	/// too many outgoing messages queued for a peer to avoid DoS issues where a peer fills our
	/// buffer by sending us messages that need response without reading the responses.
	///
	/// Note that calls may be made with an empty `data` to update the `continue_read` flag.
	fn send_data(&mut self, data: &[u8], continue_read: bool) -> usize;
	/// Disconnect the socket pointed to by this SocketDescriptor.
	///
	/// You do *not* need to call [`PeerManager::socket_disconnected`] with this socket after this
	/// call (doing so is a noop).
	fn disconnect_socket(&mut self);
}
650
/// Details of a connected peer as returned by [`PeerManager::list_peers`].
pub struct PeerDetails {
	/// The node id of the peer.
	///
	/// For outbound connections, this [`PublicKey`] will be the same as the `their_node_id` parameter
	/// passed in to [`PeerManager::new_outbound_connection`].
	pub counterparty_node_id: PublicKey,
	/// The socket address the peer provided in the initial handshake.
	///
	/// Will only be `Some` if an address had been previously provided to
	/// [`PeerManager::new_outbound_connection`] or [`PeerManager::new_inbound_connection`].
	pub socket_address: Option<SocketAddress>,
	/// The features the peer provided in the initial handshake.
	pub init_features: InitFeatures,
	/// Indicates the direction of the peer connection.
	///
	/// Will be `true` for inbound connections, and `false` for outbound connections.
	pub is_inbound_connection: bool,
}
670
/// Error for PeerManager errors. If you get one of these, you must disconnect the socket and
/// generate no further read_event/write_buffer_space_avail/socket_disconnected calls for the
/// descriptor.
///
/// The error carries no payload; both its `Debug` and `Display` impls render a fixed string.
#[derive(Clone)]
pub struct PeerHandleError {}
676impl fmt::Debug for PeerHandleError {
677	fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
678		formatter.write_str("Peer Sent Invalid Data")
679	}
680}
681impl fmt::Display for PeerHandleError {
682	fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
683		formatter.write_str("Peer Sent Invalid Data")
684	}
685}
686
/// Internal enum for keeping track of the gossip syncing progress with a given peer
enum InitSyncTracker {
	/// Only sync ad-hoc gossip as it comes in, do not send historical gossip.
	/// Upon receipt of a `GossipTimestampFilter` message, this is the default initial state if the
	/// contained timestamp is less than 6 hours old.
	NoSyncRequested,
	/// Send historical gossip starting at the given channel id, which gets incremented as the
	/// gossiping progresses.
	/// Upon receipt of a `GossipTimestampFilter` message, this is the default initial state if the
	/// contained timestamp is at least 6 hours old, and the initial channel id is set to 0.
	ChannelsSyncing(u64),
	/// Once the channel announcements and updates finish syncing, the node announcements are synced.
	NodesSyncing(NodeId),
}
701
/// A batch of messages initiated when receiving a `start_batch` message.
struct MessageBatch {
	/// The channel associated with all the messages in the batch.
	channel_id: ChannelId,

	/// The number of messages expected to be in the batch.
	/// NOTE(review): presumably the batch is considered complete once `messages` reaches this
	/// count — confirm against the `start_batch` handling below.
	batch_size: usize,

	/// The batch of messages, which should all be of the same type.
	messages: MessageBatchImpl,
}
713
/// The representation of the message batch, which may differ for each message type.
enum MessageBatchImpl {
	/// A batch of `commitment_signed` messages used when there are pending splices.
	CommitmentSigned(Vec<msgs::CommitmentSigned>),
}
719
720/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
721/// we have fewer than this many messages in the outbound buffer again.
722/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
723/// refilled as we send bytes.
724const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 12;
725
726/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
727/// the socket receive buffer before receiving the ping.
728///
729/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
730/// including any network delays, outbound traffic, or the same for messages from other peers.
731///
732/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
733/// per connected peer to respond to a ping, as long as they send us at least one message during
734/// each tick, ensuring we aren't actually just disconnected.
735/// With a timer tick interval of ten seconds, this translates to about 40 seconds per connected
736/// peer.
737///
738/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per
739/// two connected peers, assuming most LDK-running systems have at least two cores.
740const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
741
/// This is the minimum number of messages we expect a peer to be able to handle within one timer
/// tick. Once we have sent this many messages since the last ping, we send a ping right away to
/// ensure we don't just fill up our send buffer and leave the peer with too many messages to
/// process before the next ping.
///
/// Note that we continue responding to other messages even after we've sent this many messages, so
/// this really limits gossip broadcast, gossip backfill, and onion message relay.
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
750
/// The maximum number of bytes which we allow in a peer's outbound buffers before we start
/// dropping outbound gossip forwards.
///
/// This is currently 128KiB (64 * 1024 * 2), or two messages at the maximum message size (though
/// in practice we refuse to forward gossip messages which are substantially larger than we
/// expect, so this is closer to ~85 messages if all queued messages are maximum-sized channel
/// announcements).
///
/// Note that as we always drain the gossip forwarding queue before continuing gossip backfill,
/// the equivalent maximum buffer size for gossip backfill is zero.
const OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP: usize = 64 * 1024 * 2;
761
struct Peer {
	/// The noise encryption/decryption state for this connection, which also tracks handshake
	/// progress.
	channel_encryptor: PeerChannelEncryptor,
	/// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip
	/// messages in `PeerManager`. Use `Peer::set_their_node_id` to modify this field.
	their_node_id: Option<(PublicKey, NodeId)>,
	/// The features provided in the peer's [`msgs::Init`] message.
	///
	/// This is set only after we've processed the [`msgs::Init`] message and called relevant
	/// `peer_connected` handler methods. Thus, this field is set *iff* we've finished our
	/// handshake and can talk to this peer normally (though use [`Peer::handshake_complete`] to
	/// check this).
	their_features: Option<InitFeatures>,
	/// The remote network address provided when this connection was established, if any.
	their_socket_address: Option<SocketAddress>,

	/// Messages queued for writing to the socket, in order.
	pending_outbound_buffer: VecDeque<Vec<u8>>,
	/// The number of bytes of the first message in `pending_outbound_buffer` which have already
	/// been written to the socket.
	pending_outbound_buffer_first_msg_offset: usize,
	/// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily
	/// prioritize channel messages over them.
	///
	/// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
	gossip_broadcast_buffer: VecDeque<MessageBuf>,
	// NOTE(review): presumably set when the socket driver couldn't accept all pending bytes, so
	// we wait for a write-buffer-space-available event before writing again — confirm against
	// `do_attempt_write_data`.
	awaiting_write_event: bool,
	/// Set to true if the last call to [`SocketDescriptor::send_data`] for this peer had the
	/// `should_read` flag unset, indicating we've told the driver to stop reading from this peer.
	sent_pause_read: bool,

	/// Buffer into which we read bytes from the socket, pre-sized to the next expected chunk
	/// (50 bytes for a noise act during the handshake — see the connection constructors —
	/// presumably alternating headers and bodies afterwards, per `pending_read_is_header`).
	pending_read_buffer: Vec<u8>,
	/// How many bytes of `pending_read_buffer` have been filled so far.
	pending_read_buffer_pos: usize,
	/// Whether the chunk currently being read is a message header (as opposed to a body).
	pending_read_is_header: bool,

	/// Progress of the initial routing-table sync we're sending this peer, if any. Used by the
	/// `should_forward_*` methods to avoid sending duplicate gossip.
	sync_status: InitSyncTracker,

	/// The number of messages sent since the last pong (per the name); compared against
	/// [`BUFFER_DRAIN_MSGS_PER_TICK`] to throttle gossip/onion-message buffering.
	msgs_sent_since_pong: usize,
	// NOTE(review): presumably counts timer ticks spent awaiting a pong, compared against
	// `MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER` in the timer handler — confirm there.
	awaiting_pong_timer_tick_intervals: i64,
	/// Whether we've received any message from this peer since the last timer tick.
	received_message_since_timer_tick: bool,
	/// Whether we've seen a `gossip_timestamp_filter` exchange with this peer; until then we
	/// don't forward gossip to peers which support gossip queries (see `should_forward_*`).
	sent_gossip_timestamp_filter: bool,

	/// Indicates we've received a `channel_announcement` since the last time we had
	/// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a
	/// `channel_announcement` at all - we set this unconditionally but unset it every time we
	/// check if we're gossip-processing-backlogged).
	received_channel_announce_since_backlogged: bool,

	/// Whether the peer connected to us (`true`) or we connected to them (`false`).
	inbound_connection: bool,

	/// An in-progress [`MessageBatch`], if we're currently accumulating one (currently only
	/// `commitment_signed` batches, per [`MessageBatchImpl`]).
	message_batch: Option<MessageBatch>,
}
809
810impl Peer {
811	/// True after we've processed the [`msgs::Init`] message and called relevant `peer_connected`
812	/// handler methods. Thus, this implies we've finished our handshake and can talk to this peer
813	/// normally.
814	fn handshake_complete(&self) -> bool {
815		self.their_features.is_some()
816	}
817
818	/// Returns true if the channel announcements/updates for the given channel should be
819	/// forwarded to this peer.
820	/// If we are sending our routing table to this peer and we have not yet sent channel
821	/// announcements/updates for the given channel_id then we will send it when we get to that
822	/// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
823	/// sent the old versions, we should send the update, and so return true here.
824	fn should_forward_channel_announcement(&self, channel_id: u64) -> bool {
825		if !self.handshake_complete() {
826			return false;
827		}
828		if self.their_features.as_ref().unwrap().supports_gossip_queries()
829			&& !self.sent_gossip_timestamp_filter
830		{
831			return false;
832		}
833		match self.sync_status {
834			InitSyncTracker::NoSyncRequested => true,
835			InitSyncTracker::ChannelsSyncing(i) => channel_id < i,
836			InitSyncTracker::NodesSyncing(_) => true,
837		}
838	}
839
840	/// Similar to the above, but for node announcements indexed by node_id.
841	fn should_forward_node_announcement(&self, node_id: NodeId) -> bool {
842		if !self.handshake_complete() {
843			return false;
844		}
845		if self.their_features.as_ref().unwrap().supports_gossip_queries()
846			&& !self.sent_gossip_timestamp_filter
847		{
848			return false;
849		}
850		match self.sync_status {
851			InitSyncTracker::NoSyncRequested => true,
852			InitSyncTracker::ChannelsSyncing(_) => false,
853			InitSyncTracker::NodesSyncing(sync_node_id) => {
854				sync_node_id.as_slice() < node_id.as_slice()
855			},
856		}
857	}
858
859	/// Returns whether we should be reading bytes from this peer, based on whether its outbound
860	/// buffer still has space and we don't need to pause reads to get some writes out.
861	fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool {
862		if !gossip_processing_backlogged {
863			self.received_channel_announce_since_backlogged = false;
864		}
865		self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
866			&& (!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged)
867	}
868
869	/// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
870	/// outbound buffer. This is checked every time the peer's buffer may have been drained.
871	fn should_buffer_gossip_backfill(&self) -> bool {
872		self.pending_outbound_buffer.is_empty()
873			&& self.gossip_broadcast_buffer.is_empty()
874			&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
875			&& self.handshake_complete()
876	}
877
878	/// Determines if we should push an onion message onto a peer's outbound buffer. This is checked
879	/// every time the peer's buffer may have been drained.
880	fn should_buffer_onion_message(&self) -> bool {
881		self.pending_outbound_buffer.is_empty()
882			&& self.handshake_complete()
883			&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
884	}
885
886	/// Determines if we should push additional gossip broadcast messages onto a peer's outbound
887	/// buffer. This is checked every time the peer's buffer may have been drained.
888	fn should_buffer_gossip_broadcast(&self) -> bool {
889		self.pending_outbound_buffer.is_empty()
890			&& self.handshake_complete()
891			&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
892	}
893
894	/// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
895	fn buffer_full_drop_gossip_broadcast(&self) -> bool {
896		let total_outbound_buffered: usize =
897			self.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>()
898				+ self.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>();
899
900		total_outbound_buffered > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP
901	}
902
903	fn set_their_node_id(&mut self, node_id: PublicKey) {
904		self.their_node_id = Some((node_id, NodeId::from_pubkey(&node_id)));
905	}
906}
907
/// SimpleArcPeerManager is useful when you need a [`PeerManager`] with a static lifetime, e.g.
/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
/// [`SimpleRefPeerManager`] is the more appropriate type. Defining these type aliases prevents
/// issues such as overly long function definitions.
///
/// This is not exported to bindings users as type aliases aren't supported in most languages.
#[cfg(not(c_bindings))]
pub type SimpleArcPeerManager<SD, M, T, F, C, L, CF, S> = PeerManager<
	SD,
	Arc<SimpleArcChannelManager<M, T, F, L>>,
	Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, C, Arc<L>>>,
	Arc<SimpleArcOnionMessenger<M, T, F, L>>,
	Arc<L>,
	IgnoringMessageHandler,
	Arc<KeysManager>,
	Arc<ChainMonitor<InMemorySigner, Arc<CF>, Arc<T>, Arc<F>, Arc<L>, Arc<S>, Arc<KeysManager>>>,
>;
926
/// SimpleRefPeerManager is a type alias for a [`PeerManager`] reference, and is the reference
/// counterpart to the [`SimpleArcPeerManager`] type alias. Use this type by default when you don't
/// need a PeerManager with a static lifetime. You'll need a static lifetime in cases such as
/// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
/// But if this is not necessary, using a reference is more efficient. Defining these type aliases
/// helps with issues such as long function definitions.
///
/// This is not exported to bindings users as type aliases aren't supported in most languages.
#[cfg(not(c_bindings))]
#[rustfmt::skip]
pub type SimpleRefPeerManager<
	'a, 'b, 'c, 'd, 'e, 'f, 'logger, 'h, 'i, 'j, 'graph, 'k, 'mr, SD, M, T, F, C, L
> = PeerManager<
	SD,
	&'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'graph, 'logger, 'i, 'mr, M, T, F, L>,
	&'f P2PGossipSync<&'graph NetworkGraph<&'logger L>, C, &'logger L>,
	&'h SimpleRefOnionMessenger<'a, 'b, 'c, 'd, 'e, 'graph, 'logger, 'i, 'j, 'k, M, T, F, L>,
	&'logger L,
	IgnoringMessageHandler,
	&'c KeysManager,
	&'j ChainMonitor<&'a M, C, &'b T, &'c F, &'logger L, &'c KeysManager, &'c KeysManager>,
>;
949
/// A generic trait which is implemented for all [`PeerManager`]s. This makes bounding functions or
/// structs on any [`PeerManager`] much simpler as only this trait is needed as a bound, rather
/// than the full set of bounds on [`PeerManager`] itself.
///
/// This is not exported to bindings users as general cover traits aren't useful in other
/// languages.
#[allow(missing_docs)]
pub trait APeerManager {
	type Descriptor: SocketDescriptor;
	// Each handler is expressed as a pair of associated types: the `*T` type is the handler's
	// concrete `Deref::Target` and the non-`T` type is the `Deref` through which the
	// [`PeerManager`] accesses it.
	type CMT: ChannelMessageHandler + ?Sized;
	type CM: Deref<Target = Self::CMT>;
	type RMT: RoutingMessageHandler + ?Sized;
	type RM: Deref<Target = Self::RMT>;
	type OMT: OnionMessageHandler + ?Sized;
	type OM: Deref<Target = Self::OMT>;
	type LT: Logger + ?Sized;
	type L: Deref<Target = Self::LT>;
	type CMHT: CustomMessageHandler + ?Sized;
	type CMH: Deref<Target = Self::CMHT>;
	type NST: NodeSigner + ?Sized;
	type NS: Deref<Target = Self::NST>;
	type SMT: SendOnlyMessageHandler + ?Sized;
	type SM: Deref<Target = Self::SMT>;
	/// Gets a reference to the underlying [`PeerManager`].
	fn as_ref(
		&self,
	) -> &PeerManager<
		Self::Descriptor,
		Self::CM,
		Self::RM,
		Self::OM,
		Self::L,
		Self::CMH,
		Self::NS,
		Self::SM,
	>;
}
987
// The blanket implementation of `APeerManager`: each associated type simply forwards the
// corresponding generic parameter (or its `Deref::Target`) of the concrete `PeerManager`.
impl<
		Descriptor: SocketDescriptor,
		CM: Deref,
		RM: Deref,
		OM: Deref,
		L: Deref,
		CMH: Deref,
		NS: Deref,
		SM: Deref,
	> APeerManager for PeerManager<Descriptor, CM, RM, OM, L, CMH, NS, SM>
where
	CM::Target: ChannelMessageHandler,
	RM::Target: RoutingMessageHandler,
	OM::Target: OnionMessageHandler,
	L::Target: Logger,
	CMH::Target: CustomMessageHandler,
	NS::Target: NodeSigner,
	SM::Target: SendOnlyMessageHandler,
{
	type Descriptor = Descriptor;
	type CMT = <CM as Deref>::Target;
	type CM = CM;
	type RMT = <RM as Deref>::Target;
	type RM = RM;
	type OMT = <OM as Deref>::Target;
	type OM = OM;
	type LT = <L as Deref>::Target;
	type L = L;
	type CMHT = <CMH as Deref>::Target;
	type CMH = CMH;
	type NST = <NS as Deref>::Target;
	type NS = NS;
	type SMT = <SM as Deref>::Target;
	type SM = SM;
	fn as_ref(&self) -> &PeerManager<Descriptor, CM, RM, OM, L, CMH, NS, SM> {
		self
	}
}
1026
/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
/// socket events into messages which it passes on to its [`MessageHandler`].
///
/// Locks are taken internally, so you must never assume that reentrancy from a
/// [`SocketDescriptor`] call back into [`PeerManager`] methods will not deadlock.
///
/// Calls to [`read_event`] will decode relevant messages and pass them to the
/// [`ChannelMessageHandler`], likely doing message processing in-line. Thus, the primary form of
/// parallelism in Rust-Lightning is in calls to [`read_event`]. Note, however, that calls to any
/// [`PeerManager`] functions related to the same connection must occur only in serial, making new
/// calls only after previous ones have returned.
///
/// Rather than using a plain [`PeerManager`], it is preferable to use either a [`SimpleArcPeerManager`]
/// a [`SimpleRefPeerManager`], for conciseness. See their documentation for more details, but
/// essentially you should default to using a [`SimpleRefPeerManager`], and use a
/// [`SimpleArcPeerManager`] when you require a `PeerManager` with a static lifetime, such as when
/// you're using lightning-net-tokio.
///
/// [`read_event`]: PeerManager::read_event
pub struct PeerManager<
	Descriptor: SocketDescriptor,
	CM: Deref,
	RM: Deref,
	OM: Deref,
	L: Deref,
	CMH: Deref,
	NS: Deref,
	SM: Deref,
> where
	CM::Target: ChannelMessageHandler,
	RM::Target: RoutingMessageHandler,
	OM::Target: OnionMessageHandler,
	L::Target: Logger,
	CMH::Target: CustomMessageHandler,
	NS::Target: NodeSigner,
	SM::Target: SendOnlyMessageHandler,
{
	/// The handlers to which incoming messages are dispatched.
	message_handler: MessageHandler<CM, RM, OM, CMH, SM>,
	/// Connection state for each connected peer - we have an outer read-write lock which is taken
	/// as read while we're doing processing for a peer and taken write when a peer is being added
	/// or removed.
	///
	/// The inner Peer lock is held for sending and receiving bytes, but note that we do *not* hold
	/// it while we're processing a message. This is fine as [`PeerManager::read_event`] requires
	/// that there be no parallel calls for a given peer, so mutual exclusion of messages handed to
	/// the `MessageHandler`s for a given peer is already guaranteed.
	peers: FairRwLock<HashMap<Descriptor, Mutex<Peer>>>,
	/// Only add to this set when noise completes.
	/// Locked *after* peers. When an item is removed, it must be removed with the `peers` write
	/// lock held. Entries may be added with only the `peers` read lock held (though the
	/// `Descriptor` value must already exist in `peers`).
	node_id_to_descriptor: Mutex<HashMap<PublicKey, Descriptor>>,
	/// We can only have one thread processing events at once, but if a second call to
	/// `process_events` happens while a first call is in progress, one of the two calls needs to
	/// start from the top to ensure any new messages are also handled.
	///
	/// Because the event handler calls into user code which may block, we don't want to block a
	/// second thread waiting for another thread to handle events which is then blocked on user
	/// code, so we store an atomic counter here:
	///  * 0 indicates no event processor is running
	///  * 1 indicates an event processor is running
	///  * > 1 indicates an event processor is running but needs to start again from the top once
	///        it finishes as another thread tried to start processing events but returned early.
	event_processing_state: AtomicI32,

	/// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
	/// value increases strictly since we don't assume access to a time source.
	last_node_announcement_serial: AtomicU32,

	/// SHA-256 midstate seeded with the `ephemeral_random_data` provided at construction; cloned
	/// and extended with `peer_counter` to derive per-connection ephemeral keys (see
	/// `get_ephemeral_key`).
	ephemeral_key_midstate: Sha256Engine,

	/// Monotonically-increasing counter mixed into `ephemeral_key_midstate` so each connection
	/// gets a unique ephemeral key.
	peer_counter: AtomicCounter,

	/// Set when the route handler reports its processing queue is high, unset when it recovers
	/// (see `update_gossip_backlogged` and `Peer::should_read`).
	gossip_processing_backlogged: AtomicBool,
	/// Set on the falling edge of `gossip_processing_backlogged`, i.e. when the backlog lifts, so
	/// reads paused on the backlog can be resumed.
	gossip_processing_backlog_lifted: AtomicBool,

	/// Signer used for the noise handshake on inbound connections (see
	/// [`PeerChannelEncryptor::new_inbound`]).
	node_signer: NS,

	logger: L,
	/// Sign-only secp256k1 context, re-randomized at construction from `ephemeral_random_data`.
	secp_ctx: Secp256k1<secp256k1::SignOnly>,
}
1108
/// A message as handled internally: either a single message straight off the wire, or a group of
/// wire messages which are handled as one logical unit.
enum LogicalMessage<T: core::fmt::Debug + wire::Type + wire::TestEq> {
	/// A single message read directly from the wire.
	FromWire(wire::Message<T>),
	/// A completed batch of `commitment_signed` messages for the given channel.
	CommitmentSignedBatch(ChannelId, Vec<msgs::CommitmentSigned>),
}
1113
/// An error arising while handling a single incoming message, at either the connection level or
/// the protocol level.
enum MessageHandlingError {
	/// A connection-level [`PeerHandleError`].
	PeerHandleError(PeerHandleError),
	/// A protocol-level [`LightningError`] as returned by one of the message handlers.
	LightningError(LightningError),
}
1118
1119impl From<PeerHandleError> for MessageHandlingError {
1120	fn from(error: PeerHandleError) -> Self {
1121		MessageHandlingError::PeerHandleError(error)
1122	}
1123}
1124
1125impl From<LightningError> for MessageHandlingError {
1126	fn from(error: LightningError) -> Self {
1127		MessageHandlingError::LightningError(error)
1128	}
1129}
1130
// Serializes a wire message into a freshly-allocated buffer, evaluating to the raw bytes.
// Writing into a `Vec` is infallible, hence the `unwrap()`.
macro_rules! encode_msg {
	($msg: expr) => {{
		let mut buffer = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE));
		wire::write($msg, &mut buffer).unwrap();
		buffer.0
	}};
}
1138
1139impl<Descriptor: SocketDescriptor, CM: Deref, OM: Deref, L: Deref, NS: Deref, SM: Deref>
1140	PeerManager<Descriptor, CM, IgnoringMessageHandler, OM, L, IgnoringMessageHandler, NS, SM>
1141where
1142	CM::Target: ChannelMessageHandler,
1143	OM::Target: OnionMessageHandler,
1144	L::Target: Logger,
1145	NS::Target: NodeSigner,
1146	SM::Target: SendOnlyMessageHandler,
1147{
1148	/// Constructs a new `PeerManager` with the given `ChannelMessageHandler` and
1149	/// `OnionMessageHandler`. No routing message handler is used and network graph messages are
1150	/// ignored.
1151	///
1152	/// `ephemeral_random_data` is used to derive per-connection ephemeral keys and must be
1153	/// cryptographically secure random bytes.
1154	///
1155	/// `current_time` is used as an always-increasing counter that survives across restarts and is
1156	/// incremented irregularly internally. In general it is best to simply use the current UNIX
1157	/// timestamp, however if it is not available a persistent counter that increases once per
1158	/// minute should suffice.
1159	///
1160	/// This is not exported to bindings users as we can't export a PeerManager with a dummy route handler
1161	pub fn new_channel_only(
1162		channel_message_handler: CM, onion_message_handler: OM, current_time: u32,
1163		ephemeral_random_data: &[u8; 32], logger: L, node_signer: NS,
1164		send_only_message_handler: SM,
1165	) -> Self {
1166		Self::new(
1167			MessageHandler {
1168				chan_handler: channel_message_handler,
1169				route_handler: IgnoringMessageHandler {},
1170				onion_message_handler,
1171				custom_message_handler: IgnoringMessageHandler {},
1172				send_only_message_handler,
1173			},
1174			current_time,
1175			ephemeral_random_data,
1176			logger,
1177			node_signer,
1178		)
1179	}
1180}
1181
1182impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref, NS: Deref>
1183	PeerManager<
1184		Descriptor,
1185		ErroringMessageHandler,
1186		RM,
1187		IgnoringMessageHandler,
1188		L,
1189		IgnoringMessageHandler,
1190		NS,
1191		IgnoringMessageHandler,
1192	> where
1193	RM::Target: RoutingMessageHandler,
1194	L::Target: Logger,
1195	NS::Target: NodeSigner,
1196{
1197	/// Constructs a new `PeerManager` with the given `RoutingMessageHandler`. No channel message
1198	/// handler or onion message handler is used and onion and channel messages will be ignored (or
1199	/// generate error messages). Note that some other lightning implementations time-out connections
1200	/// after some time if no channel is built with the peer.
1201	///
1202	/// `current_time` is used as an always-increasing counter that survives across restarts and is
1203	/// incremented irregularly internally. In general it is best to simply use the current UNIX
1204	/// timestamp, however if it is not available a persistent counter that increases once per
1205	/// minute should suffice.
1206	///
1207	/// `ephemeral_random_data` is used to derive per-connection ephemeral keys and must be
1208	/// cryptographically secure random bytes.
1209	///
1210	/// This is not exported to bindings users as we can't export a PeerManager with a dummy channel handler
1211	pub fn new_routing_only(
1212		routing_message_handler: RM, current_time: u32, ephemeral_random_data: &[u8; 32],
1213		logger: L, node_signer: NS,
1214	) -> Self {
1215		Self::new(
1216			MessageHandler {
1217				chan_handler: ErroringMessageHandler::new(),
1218				route_handler: routing_message_handler,
1219				onion_message_handler: IgnoringMessageHandler {},
1220				custom_message_handler: IgnoringMessageHandler {},
1221				send_only_message_handler: IgnoringMessageHandler {},
1222			},
1223			current_time,
1224			ephemeral_random_data,
1225			logger,
1226			node_signer,
1227		)
1228	}
1229}
1230
1231/// A simple wrapper that optionally prints ` from <pubkey>` for an optional pubkey.
1232/// This works around `format!()` taking a reference to each argument, preventing
1233/// `if let Some(node_id) = peer.their_node_id { format!(.., node_id) } else { .. }` from compiling
1234/// due to lifetime errors.
1235struct OptionalFromDebugger<'a>(&'a Option<(PublicKey, NodeId)>);
1236impl core::fmt::Display for OptionalFromDebugger<'_> {
1237	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
1238		if let Some((node_id, _)) = self.0 {
1239			write!(f, " from {}", node_id)
1240		} else {
1241			Ok(())
1242		}
1243	}
1244}
1245
1246/// A function used to filter out local or private addresses
1247/// <https://www.iana.org./assignments/ipv4-address-space/ipv4-address-space.xhtml>
1248/// <https://www.iana.org/assignments/ipv6-address-space/ipv6-address-space.xhtml>
1249fn filter_addresses(ip_address: Option<SocketAddress>) -> Option<SocketAddress> {
1250	match ip_address {
1251		// For IPv4 range 10.0.0.0 - 10.255.255.255 (10/8)
1252		Some(SocketAddress::TcpIpV4 { addr: [10, _, _, _], port: _ }) => None,
1253		// For IPv4 range 0.0.0.0 - 0.255.255.255 (0/8)
1254		Some(SocketAddress::TcpIpV4 { addr: [0, _, _, _], port: _ }) => None,
1255		// For IPv4 range 100.64.0.0 - 100.127.255.255 (100.64/10)
1256		Some(SocketAddress::TcpIpV4 { addr: [100, 64..=127, _, _], port: _ }) => None,
1257		// For IPv4 range  	127.0.0.0 - 127.255.255.255 (127/8)
1258		Some(SocketAddress::TcpIpV4 { addr: [127, _, _, _], port: _ }) => None,
1259		// For IPv4 range  	169.254.0.0 - 169.254.255.255 (169.254/16)
1260		Some(SocketAddress::TcpIpV4 { addr: [169, 254, _, _], port: _ }) => None,
1261		// For IPv4 range 172.16.0.0 - 172.31.255.255 (172.16/12)
1262		Some(SocketAddress::TcpIpV4 { addr: [172, 16..=31, _, _], port: _ }) => None,
1263		// For IPv4 range 192.168.0.0 - 192.168.255.255 (192.168/16)
1264		Some(SocketAddress::TcpIpV4 { addr: [192, 168, _, _], port: _ }) => None,
1265		// For IPv4 range 192.88.99.0 - 192.88.99.255  (192.88.99/24)
1266		Some(SocketAddress::TcpIpV4 { addr: [192, 88, 99, _], port: _ }) => None,
1267		// For IPv6 range 2000:0000:0000:0000:0000:0000:0000:0000 - 3fff:ffff:ffff:ffff:ffff:ffff:ffff:ffff (2000::/3)
1268		Some(SocketAddress::TcpIpV6 {
1269			addr: [0x20..=0x3F, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _],
1270			port: _,
1271		}) => ip_address,
1272		// For remaining addresses
1273		Some(SocketAddress::TcpIpV6 { addr: _, port: _ }) => None,
1274		Some(..) => ip_address,
1275		None => None,
1276	}
1277}
1278
1279impl<
1280		Descriptor: SocketDescriptor,
1281		CM: Deref,
1282		RM: Deref,
1283		OM: Deref,
1284		L: Deref,
1285		CMH: Deref,
1286		NS: Deref,
1287		SM: Deref,
1288	> PeerManager<Descriptor, CM, RM, OM, L, CMH, NS, SM>
1289where
1290	CM::Target: ChannelMessageHandler,
1291	RM::Target: RoutingMessageHandler,
1292	OM::Target: OnionMessageHandler,
1293	L::Target: Logger,
1294	CMH::Target: CustomMessageHandler,
1295	NS::Target: NodeSigner,
1296	SM::Target: SendOnlyMessageHandler,
1297{
1298	/// Constructs a new `PeerManager` with the given message handlers.
1299	///
1300	/// `ephemeral_random_data` is used to derive per-connection ephemeral keys and must be
1301	/// cryptographically secure random bytes.
1302	///
1303	/// `current_time` is used as an always-increasing counter that survives across restarts and is
1304	/// incremented irregularly internally. In general it is best to simply use the current UNIX
1305	/// timestamp, however if it is not available a persistent counter that increases once per
1306	/// minute should suffice.
1307	pub fn new(
1308		message_handler: MessageHandler<CM, RM, OM, CMH, SM>, current_time: u32,
1309		ephemeral_random_data: &[u8; 32], logger: L, node_signer: NS,
1310	) -> Self {
1311		let mut ephemeral_key_midstate = Sha256::engine();
1312		ephemeral_key_midstate.input(ephemeral_random_data);
1313
1314		let mut secp_ctx = Secp256k1::signing_only();
1315		let ephemeral_hash = Sha256::from_engine(ephemeral_key_midstate.clone()).to_byte_array();
1316		secp_ctx.seeded_randomize(&ephemeral_hash);
1317
1318		PeerManager {
1319			message_handler,
1320			peers: FairRwLock::new(new_hash_map()),
1321			node_id_to_descriptor: Mutex::new(new_hash_map()),
1322			event_processing_state: AtomicI32::new(0),
1323			ephemeral_key_midstate,
1324			peer_counter: AtomicCounter::new(),
1325			gossip_processing_backlogged: AtomicBool::new(false),
1326			gossip_processing_backlog_lifted: AtomicBool::new(false),
1327			last_node_announcement_serial: AtomicU32::new(current_time),
1328			logger,
1329			node_signer,
1330			secp_ctx,
1331		}
1332	}
1333
1334	/// Returns a list of [`PeerDetails`] for connected peers that have completed the initial
1335	/// handshake.
1336	pub fn list_peers(&self) -> Vec<PeerDetails> {
1337		let peers = self.peers.read().unwrap();
1338		let filter_fn = |peer_mutex: &Mutex<Peer>| {
1339			let p = peer_mutex.lock().unwrap();
1340			if !p.handshake_complete() {
1341				return None;
1342			}
1343			let details = PeerDetails {
1344				// unwrap safety: their_node_id is guaranteed to be `Some` after the handshake
1345				// completed.
1346				counterparty_node_id: p.their_node_id.unwrap().0,
1347				socket_address: p.their_socket_address.clone(),
1348				// unwrap safety: their_features is guaranteed to be `Some` after the handshake
1349				// completed.
1350				init_features: p.their_features.clone().unwrap(),
1351				is_inbound_connection: p.inbound_connection,
1352			};
1353			Some(details)
1354		};
1355		peers.values().filter_map(filter_fn).collect()
1356	}
1357
1358	/// Returns the [`PeerDetails`] of a connected peer that has completed the initial handshake.
1359	///
1360	/// Will return `None` if the peer is unknown or it hasn't completed the initial handshake.
1361	pub fn peer_by_node_id(&self, their_node_id: &PublicKey) -> Option<PeerDetails> {
1362		let peers = self.peers.read().unwrap();
1363		peers.values().find_map(|peer_mutex| {
1364			let p = peer_mutex.lock().unwrap();
1365			if !p.handshake_complete() {
1366				return None;
1367			}
1368
1369			// unwrap safety: their_node_id is guaranteed to be `Some` after the handshake
1370			// completed.
1371			let counterparty_node_id = p.their_node_id.unwrap().0;
1372
1373			if counterparty_node_id != *their_node_id {
1374				return None;
1375			}
1376
1377			let details = PeerDetails {
1378				counterparty_node_id,
1379				socket_address: p.their_socket_address.clone(),
1380				// unwrap safety: their_features is guaranteed to be `Some` after the handshake
1381				// completed.
1382				init_features: p.their_features.clone().unwrap(),
1383				is_inbound_connection: p.inbound_connection,
1384			};
1385			Some(details)
1386		})
1387	}
1388
1389	fn get_ephemeral_key(&self) -> SecretKey {
1390		let mut ephemeral_hash = self.ephemeral_key_midstate.clone();
1391		let counter = self.peer_counter.next();
1392		ephemeral_hash.input(&counter.to_le_bytes());
1393		SecretKey::from_slice(&Sha256::from_engine(ephemeral_hash).to_byte_array())
1394			.expect("You broke SHA-256!")
1395	}
1396
1397	fn init_features(&self, their_node_id: PublicKey) -> InitFeatures {
1398		self.message_handler.chan_handler.provided_init_features(their_node_id)
1399			| self.message_handler.route_handler.provided_init_features(their_node_id)
1400			| self.message_handler.onion_message_handler.provided_init_features(their_node_id)
1401			| self.message_handler.custom_message_handler.provided_init_features(their_node_id)
1402			| self.message_handler.send_only_message_handler.provided_init_features(their_node_id)
1403	}
1404
1405	/// Indicates a new outbound connection has been established to a node with the given `node_id`
1406	/// and an optional remote network address.
1407	///
1408	/// The remote network address adds the option to report a remote IP address back to a connecting
1409	/// peer using the init message.
1410	/// The user should pass the remote network address of the host they are connected to.
1411	///
1412	/// If an `Err` is returned here you must disconnect the connection immediately.
1413	///
1414	/// Returns a small number of bytes to send to the remote node (currently always 50).
1415	///
1416	/// Panics if descriptor is duplicative with some other descriptor which has not yet been
1417	/// [`socket_disconnected`].
1418	///
1419	/// [`socket_disconnected`]: PeerManager::socket_disconnected
1420	pub fn new_outbound_connection(
1421		&self, their_node_id: PublicKey, descriptor: Descriptor,
1422		remote_network_address: Option<SocketAddress>,
1423	) -> Result<Vec<u8>, PeerHandleError> {
1424		let mut peer_encryptor =
1425			PeerChannelEncryptor::new_outbound(their_node_id.clone(), self.get_ephemeral_key());
1426		let res = peer_encryptor.get_act_one(&self.secp_ctx).to_vec();
1427		let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes
1428
1429		let mut peers = self.peers.write().unwrap();
1430		match peers.entry(descriptor) {
1431			hash_map::Entry::Occupied(_) => {
1432				debug_assert!(false, "PeerManager driver duplicated descriptors!");
1433				Err(PeerHandleError {})
1434			},
1435			hash_map::Entry::Vacant(e) => {
1436				e.insert(Mutex::new(Peer {
1437					channel_encryptor: peer_encryptor,
1438					their_node_id: None,
1439					their_features: None,
1440					their_socket_address: remote_network_address,
1441
1442					pending_outbound_buffer: VecDeque::new(),
1443					pending_outbound_buffer_first_msg_offset: 0,
1444					gossip_broadcast_buffer: VecDeque::new(),
1445					awaiting_write_event: false,
1446					sent_pause_read: false,
1447
1448					pending_read_buffer,
1449					pending_read_buffer_pos: 0,
1450					pending_read_is_header: false,
1451
1452					sync_status: InitSyncTracker::NoSyncRequested,
1453
1454					msgs_sent_since_pong: 0,
1455					awaiting_pong_timer_tick_intervals: 0,
1456					received_message_since_timer_tick: false,
1457					sent_gossip_timestamp_filter: false,
1458
1459					received_channel_announce_since_backlogged: false,
1460					inbound_connection: false,
1461
1462					message_batch: None,
1463				}));
1464				Ok(res)
1465			},
1466		}
1467	}
1468
1469	/// Indicates a new inbound connection has been established to a node with an optional remote
1470	/// network address.
1471	///
1472	/// The remote network address adds the option to report a remote IP address back to a connecting
1473	/// peer using the init message.
1474	/// The user should pass the remote network address of the host they are connected to.
1475	///
1476	/// May refuse the connection by returning an Err, but will never write bytes to the remote end
1477	/// (outbound connector always speaks first). If an `Err` is returned here you must disconnect
1478	/// the connection immediately.
1479	///
1480	/// Panics if descriptor is duplicative with some other descriptor which has not yet been
1481	/// [`socket_disconnected`].
1482	///
1483	/// [`socket_disconnected`]: PeerManager::socket_disconnected
1484	pub fn new_inbound_connection(
1485		&self, descriptor: Descriptor, remote_network_address: Option<SocketAddress>,
1486	) -> Result<(), PeerHandleError> {
1487		let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.node_signer);
1488		let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes
1489
1490		let mut peers = self.peers.write().unwrap();
1491		match peers.entry(descriptor) {
1492			hash_map::Entry::Occupied(_) => {
1493				debug_assert!(false, "PeerManager driver duplicated descriptors!");
1494				Err(PeerHandleError {})
1495			},
1496			hash_map::Entry::Vacant(e) => {
1497				e.insert(Mutex::new(Peer {
1498					channel_encryptor: peer_encryptor,
1499					their_node_id: None,
1500					their_features: None,
1501					their_socket_address: remote_network_address,
1502
1503					pending_outbound_buffer: VecDeque::new(),
1504					pending_outbound_buffer_first_msg_offset: 0,
1505					gossip_broadcast_buffer: VecDeque::new(),
1506					awaiting_write_event: false,
1507					sent_pause_read: false,
1508
1509					pending_read_buffer,
1510					pending_read_buffer_pos: 0,
1511					pending_read_is_header: false,
1512
1513					sync_status: InitSyncTracker::NoSyncRequested,
1514
1515					msgs_sent_since_pong: 0,
1516					awaiting_pong_timer_tick_intervals: 0,
1517					received_message_since_timer_tick: false,
1518					sent_gossip_timestamp_filter: false,
1519
1520					received_channel_announce_since_backlogged: false,
1521					inbound_connection: true,
1522
1523					message_batch: None,
1524				}));
1525				Ok(())
1526			},
1527		}
1528	}
1529
1530	fn should_read_from(&self, peer: &mut Peer) -> bool {
1531		peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed))
1532	}
1533
1534	fn update_gossip_backlogged(&self) {
1535		let new_state = self.message_handler.route_handler.processing_queue_high();
1536		let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed);
1537		if prev_state && !new_state {
1538			self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed);
1539		}
1540	}
1541
	/// Drains as much pending outbound data for `peer` to the socket as the driver will accept,
	/// refilling the outbound buffer before each write from (in priority order) queued onion
	/// messages, queued gossip broadcasts, and any in-progress initial gossip sync.
	///
	/// If `force_one_write` is set (or our desired read-paused state disagrees with what we last
	/// told the driver), at least one `send_data` call is made even with no data pending so the
	/// driver's read-pause flag is brought in sync via the `continue_read` argument.
	fn do_attempt_write_data(
		&self, descriptor: &mut Descriptor, peer: &mut Peer, mut force_one_write: bool,
	) {
		// If we detect that we should be reading from the peer but reads are currently paused, or
		// vice versa, then we need to tell the socket driver to update their internal flag
		// indicating whether or not reads are paused. Do this by forcing a write with the desired
		// `continue_read` flag set, even if no outbound messages are currently queued.
		force_one_write |= self.should_read_from(peer) == peer.sent_pause_read;
		while force_one_write || !peer.awaiting_write_event {
			// Onion messages are pulled lazily from the handler whenever there's room.
			if peer.should_buffer_onion_message() {
				if let Some((peer_node_id, _)) = peer.their_node_id {
					let handler = &self.message_handler.onion_message_handler;
					if let Some(next_onion_message) =
						handler.next_onion_message_for_peer(peer_node_id)
					{
						self.enqueue_message(peer, &next_onion_message);
					}
				}
			}
			// Gossip broadcasts were pre-encoded; they only need per-peer encryption here.
			if peer.should_buffer_gossip_broadcast() {
				if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
					peer.msgs_sent_since_pong += 1;
					peer.pending_outbound_buffer
						.push_back(peer.channel_encryptor.encrypt_buffer(msg));
				}
			}
			// Initial gossip sync: walk channel announcements (by SCID), then node
			// announcements (by node_id), advancing `sync_status` as a cursor.
			if peer.should_buffer_gossip_backfill() {
				match peer.sync_status {
					InitSyncTracker::NoSyncRequested => {},
					InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
						if let Some((announce, update_a_option, update_b_option)) =
							self.message_handler.route_handler.get_next_channel_announcement(c)
						{
							self.enqueue_message(peer, &announce);
							if let Some(update_a) = update_a_option {
								self.enqueue_message(peer, &update_a);
							}
							if let Some(update_b) = update_b_option {
								self.enqueue_message(peer, &update_b);
							}
							peer.sync_status = InitSyncTracker::ChannelsSyncing(
								announce.contents.short_channel_id + 1,
							);
						} else {
							// No more channels; u64::MAX marks "channels done, nodes next".
							peer.sync_status =
								InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
						}
					},
					InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
						let handler = &self.message_handler.route_handler;
						if let Some(msg) = handler.get_next_node_announcement(None) {
							self.enqueue_message(peer, &msg);
							peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
						} else {
							peer.sync_status = InitSyncTracker::NoSyncRequested;
						}
					},
					InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
					InitSyncTracker::NodesSyncing(sync_node_id) => {
						let handler = &self.message_handler.route_handler;
						if let Some(msg) = handler.get_next_node_announcement(Some(&sync_node_id)) {
							self.enqueue_message(peer, &msg);
							peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
						} else {
							peer.sync_status = InitSyncTracker::NoSyncRequested;
						}
					},
				}
			}
			if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
				self.maybe_send_extra_ping(peer);
			}

			let should_read = self.should_read_from(peer);
			let next_buff = match peer.pending_outbound_buffer.front() {
				None => {
					if force_one_write {
						// Nothing to send, but we still need one call to sync the driver's
						// read-pause flag with `should_read`.
						let data_sent = descriptor.send_data(&[], should_read);
						debug_assert_eq!(data_sent, 0, "Can't write more than no data");
						peer.sent_pause_read = !should_read;
					}
					return;
				},
				Some(buff) => buff,
			};
			force_one_write = false;

			// Resume from wherever the last partial write left off.
			let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
			let data_sent = descriptor.send_data(pending, should_read);
			peer.sent_pause_read = !should_read;
			peer.pending_outbound_buffer_first_msg_offset += data_sent;
			if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
				peer.pending_outbound_buffer_first_msg_offset = 0;
				peer.pending_outbound_buffer.pop_front();
				// Shrink the queue's backing allocation once it's mostly empty so a burst of
				// traffic doesn't pin memory forever.
				const VEC_SIZE: usize = ::core::mem::size_of::<Vec<u8>>();
				let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE;
				let lots_of_slack = peer.pending_outbound_buffer.len()
					< peer.pending_outbound_buffer.capacity() / 2;
				if large_capacity && lots_of_slack {
					peer.pending_outbound_buffer.shrink_to_fit();
				}
			} else {
				// Partial write: wait for the driver to tell us there's room again.
				peer.awaiting_write_event = true;
			}
		}
	}
1648
1649	/// Indicates that there is room to write data to the given socket descriptor.
1650	///
1651	/// May return an Err to indicate that the connection should be closed.
1652	///
1653	/// May call [`send_data`] on the descriptor passed in (or an equal descriptor) before
1654	/// returning. Thus, be very careful with reentrancy issues! The invariants around calling
1655	/// [`write_buffer_space_avail`] in case a write did not fully complete must still hold - be
1656	/// ready to call [`write_buffer_space_avail`] again if a write call generated here isn't
1657	/// sufficient!
1658	///
1659	/// [`send_data`]: SocketDescriptor::send_data
1660	/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
1661	pub fn write_buffer_space_avail(
1662		&self, descriptor: &mut Descriptor,
1663	) -> Result<(), PeerHandleError> {
1664		let peers = self.peers.read().unwrap();
1665		match peers.get(descriptor) {
1666			None => {
1667				// This is most likely a simple race condition where the user found that the socket
1668				// was writeable, then we told the user to `disconnect_socket()`, then they called
1669				// this method. Return an error to make sure we get disconnected.
1670				return Err(PeerHandleError {});
1671			},
1672			Some(peer_mutex) => {
1673				let mut peer = peer_mutex.lock().unwrap();
1674				peer.awaiting_write_event = false;
1675				// We go ahead and force at least one write here, because if we don't have any
1676				// messages to send and the net driver thought we did that's weird, so they might
1677				// also have a confused read-paused state that we should go ahead and clear.
1678				self.do_attempt_write_data(descriptor, &mut peer, true);
1679			},
1680		};
1681		Ok(())
1682	}
1683
1684	/// Indicates that data was read from the given socket descriptor.
1685	///
1686	/// May return an Err to indicate that the connection should be closed.
1687	///
1688	/// Will *not* call back into [`send_data`] on any descriptors to avoid reentrancy complexity.
1689	/// Thus, however, you should call [`process_events`] after any `read_event` to generate
1690	/// [`send_data`] calls to handle responses. This is also important to give [`send_data`] calls
1691	/// a chance to pause reads if too many messages have been queued in response allowing a peer
1692	/// to bloat our memory.
1693	///
1694	/// In order to avoid processing too many messages at once per peer, `data` should be on the
1695	/// order of 4KiB.
1696	///
1697	/// [`send_data`]: SocketDescriptor::send_data
1698	/// [`process_events`]: PeerManager::process_events
1699	pub fn read_event(
1700		&self, peer_descriptor: &mut Descriptor, data: &[u8],
1701	) -> Result<(), PeerHandleError> {
1702		match self.do_read_event(peer_descriptor, data) {
1703			Ok(res) => Ok(res),
1704			Err(e) => {
1705				self.disconnect_event_internal(peer_descriptor, "of a protocol error");
1706				Err(e)
1707			},
1708		}
1709	}
1710
1711	/// Append a message to a peer's pending outbound/write buffer
1712	fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
1713		let their_node_id = peer.their_node_id.map(|p| p.0);
1714		if let Some(node_id) = their_node_id {
1715			let logger = WithContext::from(&self.logger, their_node_id, None, None);
1716			if is_gossip_msg(message.type_id()) {
1717				log_gossip!(logger, "Enqueueing message {:?} to {}", message, node_id);
1718			} else {
1719				log_trace!(logger, "Enqueueing message {:?} to {}", message, node_id);
1720			}
1721		} else {
1722			debug_assert!(false, "node_id should be set by the time we send a message");
1723		}
1724		peer.msgs_sent_since_pong += 1;
1725		peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message));
1726	}
1727
	/// Consumes `data` freshly read from `peer_descriptor`'s socket.
	///
	/// While the noise handshake is incomplete this advances it (queueing act two/three and our
	/// `Init` message as appropriate). Once complete, bytes are consumed as alternating encrypted
	/// 18-byte length headers and encrypted message bodies; each fully-received message is decoded
	/// and passed to `handle_message`. Messages which `handle_message` returns for broadcast are
	/// forwarded to other peers after all of `data` has been processed.
	///
	/// Returns an `Err` if the peer should be disconnected (unknown descriptor, handshake or
	/// decode failure, or a fatal message-handling error).
	fn do_read_event(
		&self, peer_descriptor: &mut Descriptor, data: &[u8],
	) -> Result<(), PeerHandleError> {
		let peers = self.peers.read().unwrap();
		let mut msgs_to_forward = Vec::new();
		let mut peer_node_id = None;

		if let Some(peer_mutex) = peers.get(peer_descriptor) {
			let mut read_pos = 0;
			while read_pos < data.len() {
				// Evaluates `$thing`, yielding its `Ok` value or translating a `LightningError`
				// into the appropriate action: disconnect (return `Err`), ignore (optionally
				// logging), or enqueue an error/warning message and `continue` to the next
				// message in the outer `while` loop.
				macro_rules! try_potential_handleerror {
					($peer: expr, $thing: expr) => {{
						let res = $thing;
						let logger = WithContext::from(&self.logger, peer_node_id.map(|(id, _)| id), None, None);
						match res {
							Ok(x) => x,
							Err(e) => {
								match e.action {
									msgs::ErrorAction::DisconnectPeer { .. } => {
										// We may have an `ErrorMessage` to send to the peer,
										// but writing to the socket while reading can lead to
										// re-entrant code and possibly unexpected behavior. The
										// message send is optimistic anyway, and in this case
										// we immediately disconnect the peer.
										log_debug!(logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err);
										return Err(PeerHandleError { });
									},
									msgs::ErrorAction::DisconnectPeerWithWarning { .. } => {
										// We have a `WarningMessage` to send to the peer, but
										// writing to the socket while reading can lead to
										// re-entrant code and possibly unexpected behavior. The
										// message send is optimistic anyway, and in this case
										// we immediately disconnect the peer.
										log_debug!(logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err);
										return Err(PeerHandleError { });
									},
									msgs::ErrorAction::IgnoreAndLog(level) => {
										log_given_level!(logger, level, "Error handling {}message{}; ignoring: {}",
											 if level == Level::Gossip { "gossip " } else { "" },
											 OptionalFromDebugger(&peer_node_id), e.err);
										continue
									},
									msgs::ErrorAction::IgnoreDuplicateGossip => continue, // Don't even bother logging these
									msgs::ErrorAction::IgnoreError => {
										log_debug!(logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err);
										continue;
									},
									msgs::ErrorAction::SendErrorMessage { msg } => {
										log_debug!(logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer_node_id), e.err);
										self.enqueue_message($peer, &msg);
										continue;
									},
									msgs::ErrorAction::SendWarningMessage { msg, log_level } => {
										log_given_level!(logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer_node_id), e.err);
										self.enqueue_message($peer, &msg);
										continue;
									},
								}
							}
						}
					}}
				}

				let mut peer_lock = peer_mutex.lock().unwrap();
				let peer = &mut *peer_lock;
				let mut msg_to_handle = None;
				if peer_node_id.is_none() {
					peer_node_id.clone_from(&peer.their_node_id);
				}

				assert!(peer.pending_read_buffer.len() > 0);
				assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos);

				{
					// Copy as many bytes as will fit into the current read target (a length
					// header or a full encrypted message).
					let data_to_copy = cmp::min(
						peer.pending_read_buffer.len() - peer.pending_read_buffer_pos,
						data.len() - read_pos,
					);
					peer.pending_read_buffer
						[peer.pending_read_buffer_pos..peer.pending_read_buffer_pos + data_to_copy]
						.copy_from_slice(&data[read_pos..read_pos + data_to_copy]);
					read_pos += data_to_copy;
					peer.pending_read_buffer_pos += data_to_copy;
				}

				// Only act once the current read target has been completely filled.
				if peer.pending_read_buffer_pos == peer.pending_read_buffer.len() {
					peer.pending_read_buffer_pos = 0;

					// Records the just-learned node_id -> descriptor mapping, failing the
					// connection if we already have one for this peer.
					macro_rules! insert_node_id {
						() => {
							let their_node_id = if let Some((node_id, _)) = peer.their_node_id {
								node_id
							} else {
								debug_assert!(false, "Should have a node_id to insert");
								return Err(PeerHandleError {});
							};
							let logger = WithContext::from(&self.logger, Some(their_node_id), None, None);
							match self.node_id_to_descriptor.lock().unwrap().entry(their_node_id) {
								hash_map::Entry::Occupied(e) => {
									log_trace!(logger, "Got second connection with {}, closing", their_node_id);
									// Unset `their_node_id` so that we don't generate a peer_disconnected event
									peer.their_node_id = None;
									// Check that the peers map is consistent with the
									// node_id_to_descriptor map, as this has been broken
									// before.
									debug_assert!(peers.get(e.get()).is_some());
									return Err(PeerHandleError { })
								},
								hash_map::Entry::Vacant(entry) => {
									log_debug!(logger, "Finished noise handshake for connection with {}", their_node_id);
									entry.insert(peer_descriptor.clone())
								},
							};
						}
					}

					let next_step = peer.channel_encryptor.get_noise_step();
					match next_step {
						NextNoiseStep::ActOne => {
							let res = peer.channel_encryptor.process_act_one_with_keys(
								&peer.pending_read_buffer[..],
								&self.node_signer,
								self.get_ephemeral_key(),
								&self.secp_ctx,
							);
							let act_two = try_potential_handleerror!(peer, res).to_vec();
							peer.pending_outbound_buffer.push_back(act_two);
							peer.pending_read_buffer = [0; 66].to_vec(); // act three is 66 bytes long
						},
						NextNoiseStep::ActTwo => {
							let res = peer
								.channel_encryptor
								.process_act_two(&peer.pending_read_buffer[..], &self.node_signer);
							let (act_three, their_node_id) = try_potential_handleerror!(peer, res);
							peer.pending_outbound_buffer.push_back(act_three.to_vec());
							peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
							peer.pending_read_is_header = true;

							peer.set_their_node_id(their_node_id);
							insert_node_id!();
							// Handshake complete (we were the initiator) - queue our Init.
							let features = self.init_features(their_node_id);
							let networks = self.message_handler.chan_handler.get_chain_hashes();
							let resp = msgs::Init {
								features,
								networks,
								remote_network_address: filter_addresses(
									peer.their_socket_address.clone(),
								),
							};
							self.enqueue_message(peer, &resp);
						},
						NextNoiseStep::ActThree => {
							let res = peer
								.channel_encryptor
								.process_act_three(&peer.pending_read_buffer[..]);
							let their_node_id = try_potential_handleerror!(peer, res);
							peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
							peer.pending_read_is_header = true;
							peer.set_their_node_id(their_node_id);
							insert_node_id!();
							// Handshake complete (we were the responder) - queue our Init.
							let features = self.init_features(their_node_id);
							let networks = self.message_handler.chan_handler.get_chain_hashes();
							let resp = msgs::Init {
								features,
								networks,
								remote_network_address: filter_addresses(
									peer.their_socket_address.clone(),
								),
							};
							self.enqueue_message(peer, &resp);
						},
						NextNoiseStep::NoiseComplete => {
							if peer.pending_read_is_header {
								let res = peer
									.channel_encryptor
									.decrypt_length_header(&peer.pending_read_buffer[..]);
								let msg_len = try_potential_handleerror!(peer, res);
								// Don't hold onto an oversized buffer from a previous large
								// message.
								if peer.pending_read_buffer.capacity() > 8192 {
									peer.pending_read_buffer = Vec::new();
								}
								peer.pending_read_buffer.resize(msg_len as usize + 16, 0);
								if msg_len < 2 {
									// Need at least the message type tag
									return Err(PeerHandleError {});
								}
								peer.pending_read_is_header = false;
							} else {
								debug_assert!(peer.pending_read_buffer.len() >= 2 + 16);
								let res = peer
									.channel_encryptor
									.decrypt_message(&mut peer.pending_read_buffer[..]);
								try_potential_handleerror!(peer, res);

								// Decode the plaintext, excluding the trailing 16 bytes (which
								// are not part of the message payload).
								let message_result = wire::read(
									&mut &peer.pending_read_buffer
										[..peer.pending_read_buffer.len() - 16],
									&*self.message_handler.custom_message_handler,
								);

								// Reset read buffer
								if peer.pending_read_buffer.capacity() > 8192 {
									peer.pending_read_buffer = Vec::new();
								}
								peer.pending_read_buffer.resize(18, 0);
								peer.pending_read_is_header = true;

								let their_node_id = peer.their_node_id.map(|p| p.0);
								let logger =
									WithContext::from(&self.logger, their_node_id, None, None);
								let message = match message_result {
									Ok(x) => x,
									Err(e) => {
										match e {
											// Note that to avoid re-entrancy we never call
											// `do_attempt_write_data` from here, causing
											// the messages enqueued here to not actually
											// be sent before the peer is disconnected.
											(
												msgs::DecodeError::UnknownRequiredFeature,
												Some(ty),
											) if is_gossip_msg(ty) => {
												log_gossip!(logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!");
												continue;
											},
											(msgs::DecodeError::UnsupportedCompression, _) => {
												log_gossip!(logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message");
												let channel_id = ChannelId::new_zero();
												let data = "Unsupported message compression: zlib"
													.to_owned();
												let msg = msgs::WarningMessage { channel_id, data };
												self.enqueue_message(peer, &msg);
												continue;
											},
											(_, Some(ty)) if is_gossip_msg(ty) => {
												log_gossip!(logger, "Got an invalid value while deserializing a gossip message");
												let channel_id = ChannelId::new_zero();
												let data = format!(
													"Unreadable/bogus gossip message of type {}",
													ty
												);
												let msg = msgs::WarningMessage { channel_id, data };
												self.enqueue_message(peer, &msg);
												continue;
											},
											(msgs::DecodeError::UnknownRequiredFeature, _) => {
												log_debug!(logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
												return Err(PeerHandleError {});
											},
											(msgs::DecodeError::UnknownVersion, _) => {
												return Err(PeerHandleError {})
											},
											(msgs::DecodeError::InvalidValue, _) => {
												log_debug!(logger, "Got an invalid value while deserializing message");
												return Err(PeerHandleError {});
											},
											(msgs::DecodeError::ShortRead, _) => {
												log_debug!(logger, "Deserialization failed due to shortness of message");
												return Err(PeerHandleError {});
											},
											(msgs::DecodeError::BadLengthDescriptor, _) => {
												return Err(PeerHandleError {})
											},
											(msgs::DecodeError::Io(_), _) => {
												return Err(PeerHandleError {})
											},
											(msgs::DecodeError::DangerousValue, _) => {
												return Err(PeerHandleError {})
											},
										}
									},
								};

								msg_to_handle = Some(message);
							}
						},
					}
				}

				// Hand off a fully-decoded message, consuming `peer_lock` (handle_message takes
				// the guard by value).
				if let Some(message) = msg_to_handle {
					match self.handle_message(&peer_mutex, peer_lock, message) {
						Err(handling_error) => match handling_error {
							MessageHandlingError::PeerHandleError(e) => return Err(e),
							MessageHandlingError::LightningError(e) => {
								try_potential_handleerror!(&mut peer_mutex.lock().unwrap(), Err(e));
							},
						},
						Ok(Some(msg)) => {
							msgs_to_forward.push(msg);
						},
						Ok(None) => {},
					}
				}
			}
		} else {
			// This is most likely a simple race condition where the user read some bytes
			// from the socket, then we told the user to `disconnect_socket()`, then they
			// called this method. Return an error to make sure we get disconnected.
			return Err(PeerHandleError {});
		}

		// Relay any broadcastable gossip we accepted above to our other peers.
		for msg in msgs_to_forward.drain(..) {
			self.forward_broadcast_msg(
				&*peers,
				&msg,
				peer_node_id.as_ref().map(|(pk, _)| pk),
				false,
			);
		}

		Ok(())
	}
2039
2040	/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
2041	///
2042	/// Returns the message back if it needs to be broadcasted to all other peers.
2043	fn handle_message(
2044		&self, peer_mutex: &Mutex<Peer>, peer_lock: MutexGuard<Peer>,
2045		message: wire::Message<
2046			<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage,
2047		>,
2048	) -> Result<
2049		Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>,
2050		MessageHandlingError,
2051	> {
2052		let their_node_id = peer_lock
2053			.their_node_id
2054			.expect("We know the peer's public key by the time we receive messages")
2055			.0;
2056		let logger = WithContext::from(&self.logger, Some(their_node_id), None, None);
2057
2058		let unprocessed_message =
2059			self.do_handle_message_holding_peer_lock(peer_lock, message, their_node_id, &logger)?;
2060
2061		self.message_handler.chan_handler.message_received();
2062
2063		match unprocessed_message {
2064			Some(LogicalMessage::FromWire(message)) => self.do_handle_message_without_peer_lock(
2065				peer_mutex,
2066				message,
2067				their_node_id,
2068				&logger,
2069			),
2070			Some(LogicalMessage::CommitmentSignedBatch(channel_id, batch)) => {
2071				log_trace!(
2072					logger,
2073					"Received commitment_signed batch {:?} from {}",
2074					batch,
2075					their_node_id,
2076				);
2077				let chan_handler = &self.message_handler.chan_handler;
2078				chan_handler.handle_commitment_signed_batch(their_node_id, channel_id, batch);
2079				return Ok(None);
2080			},
2081			None => Ok(None),
2082		}
2083	}
2084
2085	// Conducts all message processing that requires us to hold the `peer_lock`.
2086	//
2087	// Returns `None` if the message was fully processed and otherwise returns the message back to
2088	// allow it to be subsequently processed by `do_handle_message_without_peer_lock`.
2089	fn do_handle_message_holding_peer_lock<'a>(
2090		&self, mut peer_lock: MutexGuard<Peer>,
2091		message: wire::Message<
2092			<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage,
2093		>,
2094		their_node_id: PublicKey, logger: &WithContext<'a, L>,
2095	) -> Result<
2096		Option<
2097			LogicalMessage<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>,
2098		>,
2099		MessageHandlingError,
2100	> {
2101		peer_lock.received_message_since_timer_tick = true;
2102
2103		// Need an Init as first message
2104		if let wire::Message::Init(msg) = message {
2105			// Check if we have any compatible chains if the `networks` field is specified.
2106			if let Some(networks) = &msg.networks {
2107				let chan_handler = &self.message_handler.chan_handler;
2108				if let Some(our_chains) = chan_handler.get_chain_hashes() {
2109					let mut have_compatible_chains = false;
2110					'our_chains: for our_chain in our_chains.iter() {
2111						for their_chain in networks {
2112							if our_chain == their_chain {
2113								have_compatible_chains = true;
2114								break 'our_chains;
2115							}
2116						}
2117					}
2118					if !have_compatible_chains {
2119						log_debug!(logger, "Peer does not support any of our supported chains");
2120						return Err(PeerHandleError {}.into());
2121					}
2122				}
2123			}
2124
2125			let our_features = self.init_features(their_node_id);
2126			if msg.features.requires_unknown_bits_from(&our_features) {
2127				log_debug!(
2128					logger,
2129					"Peer {} requires features unknown to us: {:?}",
2130					their_node_id,
2131					msg.features.required_unknown_bits_from(&our_features)
2132				);
2133				return Err(PeerHandleError {}.into());
2134			}
2135
2136			if our_features.requires_unknown_bits_from(&msg.features) {
2137				log_debug!(
2138					logger,
2139					"We require features unknown to our peer {}: {:?}",
2140					their_node_id,
2141					our_features.required_unknown_bits_from(&msg.features)
2142				);
2143				return Err(PeerHandleError {}.into());
2144			}
2145
2146			if peer_lock.their_features.is_some() {
2147				return Err(PeerHandleError {}.into());
2148			}
2149
2150			log_info!(
2151				logger,
2152				"Received peer Init message from {}: {}",
2153				their_node_id,
2154				msg.features
2155			);
2156
2157			// For peers not supporting gossip queries start sync now, otherwise wait until we receive a filter.
2158			if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() {
2159				peer_lock.sync_status = InitSyncTracker::ChannelsSyncing(0);
2160			}
2161
2162			let inbound = peer_lock.inbound_connection;
2163			let route_handler = &self.message_handler.route_handler;
2164			if let Err(()) = route_handler.peer_connected(their_node_id, &msg, inbound) {
2165				log_debug!(
2166					logger,
2167					"Route Handler decided we couldn't communicate with peer {}",
2168					their_node_id,
2169				);
2170				return Err(PeerHandleError {}.into());
2171			}
2172			let chan_handler = &self.message_handler.chan_handler;
2173			if let Err(()) = chan_handler.peer_connected(their_node_id, &msg, inbound) {
2174				log_debug!(
2175					logger,
2176					"Channel Handler decided we couldn't communicate with peer {}",
2177					their_node_id,
2178				);
2179				self.message_handler.route_handler.peer_disconnected(their_node_id);
2180				return Err(PeerHandleError {}.into());
2181			}
2182			let onion_message_handler = &self.message_handler.onion_message_handler;
2183			if let Err(()) = onion_message_handler.peer_connected(their_node_id, &msg, inbound) {
2184				log_debug!(
2185					logger,
2186					"Onion Message Handler decided we couldn't communicate with peer {}",
2187					their_node_id,
2188				);
2189				self.message_handler.route_handler.peer_disconnected(their_node_id);
2190				self.message_handler.chan_handler.peer_disconnected(their_node_id);
2191				return Err(PeerHandleError {}.into());
2192			}
2193			let custom_handler = &self.message_handler.custom_message_handler;
2194			if let Err(()) = custom_handler.peer_connected(their_node_id, &msg, inbound) {
2195				log_debug!(
2196					logger,
2197					"Custom Message Handler decided we couldn't communicate with peer {}",
2198					their_node_id,
2199				);
2200				self.message_handler.route_handler.peer_disconnected(their_node_id);
2201				self.message_handler.chan_handler.peer_disconnected(their_node_id);
2202				self.message_handler.onion_message_handler.peer_disconnected(their_node_id);
2203				return Err(PeerHandleError {}.into());
2204			}
2205			let sends_handler = &self.message_handler.send_only_message_handler;
2206			if let Err(()) = sends_handler.peer_connected(their_node_id, &msg, inbound) {
2207				log_debug!(
2208					logger,
2209					"Sending-Only Message Handler decided we couldn't communicate with peer {}",
2210					their_node_id,
2211				);
2212				self.message_handler.route_handler.peer_disconnected(their_node_id);
2213				self.message_handler.chan_handler.peer_disconnected(their_node_id);
2214				self.message_handler.onion_message_handler.peer_disconnected(their_node_id);
2215				self.message_handler.custom_message_handler.peer_disconnected(their_node_id);
2216				return Err(PeerHandleError {}.into());
2217			}
2218
2219			peer_lock.awaiting_pong_timer_tick_intervals = 0;
2220			peer_lock.their_features = Some(msg.features);
2221			return Ok(None);
2222		} else if peer_lock.their_features.is_none() {
2223			log_debug!(logger, "Peer {} sent non-Init first message", their_node_id);
2224			return Err(PeerHandleError {}.into());
2225		}
2226
2227		// During splicing, commitment_signed messages need to be collected into a single batch
2228		// before they are handled.
2229		if let wire::Message::StartBatch(msg) = message {
2230			if peer_lock.message_batch.is_some() {
2231				let error = format!(
2232					"Peer {} sent start_batch for channel {} before previous batch completed",
2233					their_node_id, &msg.channel_id
2234				);
2235				log_debug!(logger, "{}", error);
2236				return Err(LightningError {
2237					err: error.clone(),
2238					action: msgs::ErrorAction::DisconnectPeerWithWarning {
2239						msg: msgs::WarningMessage { channel_id: msg.channel_id, data: error },
2240					},
2241				}
2242				.into());
2243			}
2244
2245			let batch_size = msg.batch_size as usize;
2246			if batch_size <= 1 {
2247				let error = format!(
2248					"Peer {} sent start_batch for channel {} not strictly greater than 1",
2249					their_node_id, &msg.channel_id
2250				);
2251				log_debug!(logger, "{}", error);
2252				return Err(LightningError {
2253					err: error.clone(),
2254					action: msgs::ErrorAction::SendWarningMessage {
2255						msg: msgs::WarningMessage { channel_id: msg.channel_id, data: error },
2256						log_level: Level::Debug,
2257					},
2258				}
2259				.into());
2260			}
2261
2262			const BATCH_SIZE_LIMIT: usize = 20;
2263			if batch_size > BATCH_SIZE_LIMIT {
2264				let error = format!(
2265					"Peer {} sent start_batch for channel {} exceeding the limit",
2266					their_node_id, &msg.channel_id
2267				);
2268				log_debug!(logger, "{}", error);
2269				return Err(LightningError {
2270					err: error.clone(),
2271					action: msgs::ErrorAction::DisconnectPeerWithWarning {
2272						msg: msgs::WarningMessage { channel_id: msg.channel_id, data: error },
2273					},
2274				}
2275				.into());
2276			}
2277
2278			let messages = match msg.message_type {
2279				Some(message_type) if message_type == msgs::CommitmentSigned::TYPE => {
2280					let messages = Vec::with_capacity(batch_size);
2281					MessageBatchImpl::CommitmentSigned(messages)
2282				},
2283				_ => {
2284					log_debug!(
2285						logger,
2286						"Peer {} sent start_batch for channel {} without a known message type; ignoring",
2287						their_node_id,
2288						&msg.channel_id,
2289					);
2290					return Ok(None);
2291				},
2292			};
2293
2294			let message_batch = MessageBatch { channel_id: msg.channel_id, batch_size, messages };
2295			peer_lock.message_batch = Some(message_batch);
2296
2297			return Ok(None);
2298		}
2299
2300		if let wire::Message::CommitmentSigned(msg) = message {
2301			if let Some(message_batch) = &mut peer_lock.message_batch {
2302				let MessageBatchImpl::CommitmentSigned(ref mut messages) =
2303					&mut message_batch.messages;
2304
2305				if msg.channel_id != message_batch.channel_id {
2306					let error = format!("Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", their_node_id, message_batch.channel_id, &msg.channel_id);
2307					log_debug!(logger, "{}", error);
2308					return Err(LightningError {
2309						err: error.clone(),
2310						action: msgs::ErrorAction::DisconnectPeerWithWarning {
2311							msg: msgs::WarningMessage { channel_id: msg.channel_id, data: error },
2312						},
2313					}
2314					.into());
2315				}
2316
2317				messages.push(msg);
2318
2319				if messages.len() == message_batch.batch_size {
2320					let MessageBatch { channel_id, batch_size: _, messages } =
2321						peer_lock.message_batch.take().expect("batch should have been inserted");
2322					let MessageBatchImpl::CommitmentSigned(batch) = messages;
2323
2324					return Ok(Some(LogicalMessage::CommitmentSignedBatch(channel_id, batch)));
2325				} else {
2326					return Ok(None);
2327				}
2328			} else {
2329				return Ok(Some(LogicalMessage::FromWire(wire::Message::CommitmentSigned(msg))));
2330			}
2331		} else if let Some(message_batch) = &peer_lock.message_batch {
2332			match message_batch.messages {
2333				MessageBatchImpl::CommitmentSigned(_) => {
2334					log_debug!(
2335						logger,
2336						"Peer {} sent an unexpected message for a commitment_signed batch",
2337						their_node_id,
2338					);
2339				},
2340			}
2341
2342			return Err(PeerHandleError {}.into());
2343		}
2344
2345		if let wire::Message::GossipTimestampFilter(_msg) = message {
2346			// When supporting gossip messages, start initial gossip sync only after we receive
2347			// a GossipTimestampFilter
2348			if peer_lock.their_features.as_ref().unwrap().supports_gossip_queries()
2349				&& !peer_lock.sent_gossip_timestamp_filter
2350			{
2351				peer_lock.sent_gossip_timestamp_filter = true;
2352
2353				#[allow(unused_mut)]
2354				let mut should_do_full_sync = true;
2355				#[cfg(feature = "std")]
2356				{
2357					// Forward ad-hoc gossip if the timestamp range is less than six hours ago.
2358					// Otherwise, do a full sync.
2359					use std::time::{SystemTime, UNIX_EPOCH};
2360					let full_sync_threshold = SystemTime::now()
2361						.duration_since(UNIX_EPOCH)
2362						.expect("Time must be > 1970")
2363						.as_secs() - 6 * 3600;
2364					if (_msg.first_timestamp as u64) > full_sync_threshold {
2365						should_do_full_sync = false;
2366					}
2367				}
2368				if should_do_full_sync {
2369					peer_lock.sync_status = InitSyncTracker::ChannelsSyncing(0);
2370				} else {
2371					peer_lock.sync_status = InitSyncTracker::NoSyncRequested;
2372				}
2373			}
2374			return Ok(None);
2375		}
2376
2377		if let wire::Message::ChannelAnnouncement(ref _msg) = message {
2378			peer_lock.received_channel_announce_since_backlogged = true;
2379		}
2380
2381		Ok(Some(LogicalMessage::FromWire(message)))
2382	}
2383
2384	// Conducts all message processing that doesn't require us to hold the `peer_lock`.
2385	//
2386	// Returns the message back if it needs to be broadcasted to all other peers.
2387	fn do_handle_message_without_peer_lock<'a>(
2388		&self, peer_mutex: &Mutex<Peer>,
2389		message: wire::Message<
2390			<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage,
2391		>,
2392		their_node_id: PublicKey, logger: &WithContext<'a, L>,
2393	) -> Result<
2394		Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>,
2395		MessageHandlingError,
2396	> {
2397		if is_gossip_msg(message.type_id()) {
2398			log_gossip!(logger, "Received message {:?} from {}", message, their_node_id);
2399		} else {
2400			log_trace!(logger, "Received message {:?} from {}", message, their_node_id);
2401		}
2402
2403		let mut should_forward = None;
2404
2405		match message {
2406			// Setup and Control messages:
2407			wire::Message::Init(_) => {
2408				// Handled above
2409			},
2410			wire::Message::GossipTimestampFilter(_) => {
2411				// Handled above
2412			},
2413			wire::Message::Error(msg) => {
2414				log_debug!(
2415					logger,
2416					"Got Err message from {}: {}",
2417					their_node_id,
2418					PrintableString(&msg.data)
2419				);
2420				self.message_handler.chan_handler.handle_error(their_node_id, &msg);
2421				if msg.channel_id.is_zero() {
2422					return Err(PeerHandleError {}.into());
2423				}
2424			},
2425			wire::Message::Warning(msg) => {
2426				log_debug!(
2427					logger,
2428					"Got warning message from {}: {}",
2429					their_node_id,
2430					PrintableString(&msg.data)
2431				);
2432			},
2433
2434			wire::Message::Ping(msg) => {
2435				if msg.ponglen < 65532 {
2436					let resp = msgs::Pong { byteslen: msg.ponglen };
2437					self.enqueue_message(&mut *peer_mutex.lock().unwrap(), &resp);
2438				}
2439			},
2440			wire::Message::Pong(_msg) => {
2441				let mut peer_lock = peer_mutex.lock().unwrap();
2442				peer_lock.awaiting_pong_timer_tick_intervals = 0;
2443				peer_lock.msgs_sent_since_pong = 0;
2444			},
2445
2446			// Channel messages:
2447			wire::Message::StartBatch(_msg) => {
2448				debug_assert!(false);
2449			},
2450			wire::Message::OpenChannel(msg) => {
2451				self.message_handler.chan_handler.handle_open_channel(their_node_id, &msg);
2452			},
2453			wire::Message::OpenChannelV2(_msg) => {
2454				self.message_handler.chan_handler.handle_open_channel_v2(their_node_id, &_msg);
2455			},
2456			wire::Message::AcceptChannel(msg) => {
2457				self.message_handler.chan_handler.handle_accept_channel(their_node_id, &msg);
2458			},
2459			wire::Message::AcceptChannelV2(msg) => {
2460				self.message_handler.chan_handler.handle_accept_channel_v2(their_node_id, &msg);
2461			},
2462
2463			wire::Message::FundingCreated(msg) => {
2464				self.message_handler.chan_handler.handle_funding_created(their_node_id, &msg);
2465			},
2466			wire::Message::FundingSigned(msg) => {
2467				self.message_handler.chan_handler.handle_funding_signed(their_node_id, &msg);
2468			},
2469			wire::Message::ChannelReady(msg) => {
2470				self.message_handler.chan_handler.handle_channel_ready(their_node_id, &msg);
2471			},
2472			wire::Message::PeerStorage(msg) => {
2473				self.message_handler.chan_handler.handle_peer_storage(their_node_id, msg);
2474			},
2475			wire::Message::PeerStorageRetrieval(msg) => {
2476				self.message_handler.chan_handler.handle_peer_storage_retrieval(their_node_id, msg);
2477			},
2478
2479			// Quiescence messages:
2480			wire::Message::Stfu(msg) => {
2481				self.message_handler.chan_handler.handle_stfu(their_node_id, &msg);
2482			},
2483
2484			// Splicing messages:
2485			wire::Message::SpliceInit(msg) => {
2486				self.message_handler.chan_handler.handle_splice_init(their_node_id, &msg);
2487			},
2488			wire::Message::SpliceAck(msg) => {
2489				self.message_handler.chan_handler.handle_splice_ack(their_node_id, &msg);
2490			},
2491			wire::Message::SpliceLocked(msg) => {
2492				self.message_handler.chan_handler.handle_splice_locked(their_node_id, &msg);
2493			},
2494
2495			// Interactive transaction construction messages:
2496			wire::Message::TxAddInput(msg) => {
2497				self.message_handler.chan_handler.handle_tx_add_input(their_node_id, &msg);
2498			},
2499			wire::Message::TxAddOutput(msg) => {
2500				self.message_handler.chan_handler.handle_tx_add_output(their_node_id, &msg);
2501			},
2502			wire::Message::TxRemoveInput(msg) => {
2503				self.message_handler.chan_handler.handle_tx_remove_input(their_node_id, &msg);
2504			},
2505			wire::Message::TxRemoveOutput(msg) => {
2506				self.message_handler.chan_handler.handle_tx_remove_output(their_node_id, &msg);
2507			},
2508			wire::Message::TxComplete(msg) => {
2509				self.message_handler.chan_handler.handle_tx_complete(their_node_id, &msg);
2510			},
2511			wire::Message::TxSignatures(msg) => {
2512				self.message_handler.chan_handler.handle_tx_signatures(their_node_id, &msg);
2513			},
2514			wire::Message::TxInitRbf(msg) => {
2515				self.message_handler.chan_handler.handle_tx_init_rbf(their_node_id, &msg);
2516			},
2517			wire::Message::TxAckRbf(msg) => {
2518				self.message_handler.chan_handler.handle_tx_ack_rbf(their_node_id, &msg);
2519			},
2520			wire::Message::TxAbort(msg) => {
2521				self.message_handler.chan_handler.handle_tx_abort(their_node_id, &msg);
2522			},
2523
2524			wire::Message::Shutdown(msg) => {
2525				self.message_handler.chan_handler.handle_shutdown(their_node_id, &msg);
2526			},
2527			wire::Message::ClosingSigned(msg) => {
2528				self.message_handler.chan_handler.handle_closing_signed(their_node_id, &msg);
2529			},
2530			#[cfg(simple_close)]
2531			wire::Message::ClosingComplete(msg) => {
2532				self.message_handler.chan_handler.handle_closing_complete(their_node_id, msg);
2533			},
2534			#[cfg(simple_close)]
2535			wire::Message::ClosingSig(msg) => {
2536				self.message_handler.chan_handler.handle_closing_sig(their_node_id, msg);
2537			},
2538
2539			// Commitment messages:
2540			wire::Message::UpdateAddHTLC(msg) => {
2541				self.message_handler.chan_handler.handle_update_add_htlc(their_node_id, &msg);
2542			},
2543			wire::Message::UpdateFulfillHTLC(msg) => {
2544				self.message_handler.chan_handler.handle_update_fulfill_htlc(their_node_id, msg);
2545			},
2546			wire::Message::UpdateFailHTLC(msg) => {
2547				self.message_handler.chan_handler.handle_update_fail_htlc(their_node_id, &msg);
2548			},
2549			wire::Message::UpdateFailMalformedHTLC(msg) => {
2550				let chan_handler = &self.message_handler.chan_handler;
2551				chan_handler.handle_update_fail_malformed_htlc(their_node_id, &msg);
2552			},
2553
2554			wire::Message::CommitmentSigned(msg) => {
2555				self.message_handler.chan_handler.handle_commitment_signed(their_node_id, &msg);
2556			},
2557			wire::Message::RevokeAndACK(msg) => {
2558				self.message_handler.chan_handler.handle_revoke_and_ack(their_node_id, &msg);
2559			},
2560			wire::Message::UpdateFee(msg) => {
2561				self.message_handler.chan_handler.handle_update_fee(their_node_id, &msg);
2562			},
2563			wire::Message::ChannelReestablish(msg) => {
2564				self.message_handler.chan_handler.handle_channel_reestablish(their_node_id, &msg);
2565			},
2566
2567			// Routing messages:
2568			wire::Message::AnnouncementSignatures(msg) => {
2569				let chan_handler = &self.message_handler.chan_handler;
2570				chan_handler.handle_announcement_signatures(their_node_id, &msg);
2571			},
2572			wire::Message::ChannelAnnouncement(msg) => {
2573				let route_handler = &self.message_handler.route_handler;
2574				if route_handler
2575					.handle_channel_announcement(Some(their_node_id), &msg)
2576					.map_err(|e| -> MessageHandlingError { e.into() })?
2577				{
2578					should_forward = Some(wire::Message::ChannelAnnouncement(msg));
2579				}
2580				self.update_gossip_backlogged();
2581			},
2582			wire::Message::NodeAnnouncement(msg) => {
2583				let route_handler = &self.message_handler.route_handler;
2584				if route_handler
2585					.handle_node_announcement(Some(their_node_id), &msg)
2586					.map_err(|e| -> MessageHandlingError { e.into() })?
2587				{
2588					should_forward = Some(wire::Message::NodeAnnouncement(msg));
2589				}
2590				self.update_gossip_backlogged();
2591			},
2592			wire::Message::ChannelUpdate(msg) => {
2593				let chan_handler = &self.message_handler.chan_handler;
2594				chan_handler.handle_channel_update(their_node_id, &msg);
2595
2596				let route_handler = &self.message_handler.route_handler;
2597				if route_handler
2598					.handle_channel_update(Some(their_node_id), &msg)
2599					.map_err(|e| -> MessageHandlingError { e.into() })?
2600				{
2601					should_forward = Some(wire::Message::ChannelUpdate(msg));
2602				}
2603				self.update_gossip_backlogged();
2604			},
2605			wire::Message::QueryShortChannelIds(msg) => {
2606				let route_handler = &self.message_handler.route_handler;
2607				route_handler.handle_query_short_channel_ids(their_node_id, msg)?;
2608			},
2609			wire::Message::ReplyShortChannelIdsEnd(msg) => {
2610				let route_handler = &self.message_handler.route_handler;
2611				route_handler.handle_reply_short_channel_ids_end(their_node_id, msg)?;
2612			},
2613			wire::Message::QueryChannelRange(msg) => {
2614				let route_handler = &self.message_handler.route_handler;
2615				route_handler.handle_query_channel_range(their_node_id, msg)?;
2616			},
2617			wire::Message::ReplyChannelRange(msg) => {
2618				let route_handler = &self.message_handler.route_handler;
2619				route_handler.handle_reply_channel_range(their_node_id, msg)?;
2620			},
2621
2622			// Onion message:
2623			wire::Message::OnionMessage(msg) => {
2624				let onion_message_handler = &self.message_handler.onion_message_handler;
2625				onion_message_handler.handle_onion_message(their_node_id, &msg);
2626			},
2627
2628			// Unknown messages:
2629			wire::Message::Unknown(type_id) if message.is_even() => {
2630				log_debug!(
2631					logger,
2632					"Received unknown even message of type {}, disconnecting peer!",
2633					type_id
2634				);
2635				return Err(PeerHandleError {}.into());
2636			},
2637			wire::Message::Unknown(type_id) => {
2638				log_trace!(logger, "Received unknown odd message of type {}, ignoring", type_id);
2639			},
2640			wire::Message::Custom(custom) => {
2641				let custom_message_handler = &self.message_handler.custom_message_handler;
2642				custom_message_handler.handle_custom_message(custom, their_node_id)?;
2643			},
2644		};
2645		Ok(should_forward)
2646	}
2647
2648	/// Forwards a gossip `msg` to `peers` excluding node(s) that generated the gossip message and
2649	/// excluding `except_node`.
2650	///
2651	/// If the message queue for a peer is somewhat full, the message will not be forwarded to them
2652	/// unless `allow_large_buffer` is set, in which case the message will be treated as critical
2653	/// and delivered no matter the available buffer space.
2654	fn forward_broadcast_msg(
2655		&self, peers: &HashMap<Descriptor, Mutex<Peer>>,
2656		msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>,
2657		except_node: Option<&PublicKey>, allow_large_buffer: bool,
2658	) {
2659		match msg {
2660			wire::Message::ChannelAnnouncement(ref msg) => {
2661				log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
2662				let encoded_msg = encode_msg!(msg);
2663
2664				for (_, peer_mutex) in peers.iter() {
2665					let mut peer = peer_mutex.lock().unwrap();
2666					if !peer.handshake_complete()
2667						|| !peer.should_forward_channel_announcement(msg.contents.short_channel_id)
2668					{
2669						continue;
2670					}
2671					debug_assert!(peer.their_node_id.is_some());
2672					debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
2673					let their_node_id = peer.their_node_id.map(|p| p.0);
2674					let logger = WithContext::from(&self.logger, their_node_id, None, None);
2675					if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
2676						log_gossip!(
2677							logger,
2678							"Skipping broadcast message to {:?} as its outbound buffer is full",
2679							peer.their_node_id
2680						);
2681						continue;
2682					}
2683					if let Some((_, their_node_id)) = peer.their_node_id {
2684						if their_node_id == msg.contents.node_id_1
2685							|| their_node_id == msg.contents.node_id_2
2686						{
2687							continue;
2688						}
2689					}
2690					if except_node.is_some()
2691						&& peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node
2692					{
2693						continue;
2694					}
2695					let encoded_message = MessageBuf::from_encoded(&encoded_msg);
2696					peer.gossip_broadcast_buffer.push_back(encoded_message);
2697				}
2698			},
2699			wire::Message::NodeAnnouncement(ref msg) => {
2700				log_gossip!(
2701					self.logger,
2702					"Sending message to all peers except {:?} or the announced node: {:?}",
2703					except_node,
2704					msg
2705				);
2706				let encoded_msg = encode_msg!(msg);
2707
2708				for (_, peer_mutex) in peers.iter() {
2709					let mut peer = peer_mutex.lock().unwrap();
2710					if !peer.handshake_complete()
2711						|| !peer.should_forward_node_announcement(msg.contents.node_id)
2712					{
2713						continue;
2714					}
2715					debug_assert!(peer.their_node_id.is_some());
2716					debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
2717					let their_node_id = peer.their_node_id.map(|p| p.0);
2718					let logger = WithContext::from(&self.logger, their_node_id, None, None);
2719					if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
2720						log_gossip!(
2721							logger,
2722							"Skipping broadcast message to {:?} as its outbound buffer is full",
2723							peer.their_node_id
2724						);
2725						continue;
2726					}
2727					if let Some((_, their_node_id)) = peer.their_node_id {
2728						if their_node_id == msg.contents.node_id {
2729							continue;
2730						}
2731					}
2732					if except_node.is_some()
2733						&& peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node
2734					{
2735						continue;
2736					}
2737					let encoded_message = MessageBuf::from_encoded(&encoded_msg);
2738					peer.gossip_broadcast_buffer.push_back(encoded_message);
2739				}
2740			},
2741			wire::Message::ChannelUpdate(ref msg) => {
2742				log_gossip!(
2743					self.logger,
2744					"Sending message to all peers except {:?}: {:?}",
2745					except_node,
2746					msg
2747				);
2748				let encoded_msg = encode_msg!(msg);
2749
2750				for (_, peer_mutex) in peers.iter() {
2751					let mut peer = peer_mutex.lock().unwrap();
2752					if !peer.handshake_complete()
2753						|| !peer.should_forward_channel_announcement(msg.contents.short_channel_id)
2754					{
2755						continue;
2756					}
2757					debug_assert!(peer.their_node_id.is_some());
2758					debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
2759					let their_node_id = peer.their_node_id.map(|p| p.0);
2760					let logger = WithContext::from(&self.logger, their_node_id, None, None);
2761					if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
2762						log_gossip!(
2763							logger,
2764							"Skipping broadcast message to {:?} as its outbound buffer is full",
2765							peer.their_node_id
2766						);
2767						continue;
2768					}
2769					if except_node.is_some()
2770						&& peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node
2771					{
2772						continue;
2773					}
2774					let encoded_message = MessageBuf::from_encoded(&encoded_msg);
2775					peer.gossip_broadcast_buffer.push_back(encoded_message);
2776				}
2777			},
2778			_ => {
2779				debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages")
2780			},
2781		}
2782	}
2783
2784	/// Checks for any events generated by our handlers and processes them. Includes sending most
2785	/// response messages as well as messages generated by calls to handler functions directly (eg
2786	/// functions like [`ChannelManager::process_pending_htlc_forwards`] or [`send_payment`]).
2787	///
2788	/// May call [`send_data`] on [`SocketDescriptor`]s. Thus, be very careful with reentrancy
2789	/// issues!
2790	///
2791	/// This should be called any time we may have messages to send. It is automatically called by
2792	/// [`lightning-net-tokio`] after processing incoming messages, and by
2793	/// [`lightning-background-processor`] when channel state has changed. Therefore, If you are not
2794	/// using both [`lightning-net-tokio`] and [`lightning-background-processor`], you may need to call
2795	/// this function manually to prevent messages from being delayed.
2796	///
2797	/// Note that if there are any other calls to this function waiting on lock(s) this may return
2798	/// without doing any work. All available events that need handling will be handled before the
2799	/// other calls return.
2800	///
2801	/// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
2802	/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
2803	/// [`send_data`]: SocketDescriptor::send_data
2804	/// [`lightning-net-tokio`]: https://docs.rs/lightning-net-tokio/latest/lightning_net_tokio
2805	/// [`lightning-background-processor`]: https://docs.rs/lightning-background-processor/latest/lightning_background_processor
2806	pub fn process_events(&self) {
2807		if self.event_processing_state.fetch_add(1, Ordering::AcqRel) > 0 {
2808			// If we're not the first event processor to get here, just return early, the increment
2809			// we just did will be treated as "go around again" at the end.
2810			return;
2811		}
2812
2813		loop {
2814			self.update_gossip_backlogged();
2815			let flush_read_disabled =
2816				self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
2817
2818			let mut peers_to_disconnect = new_hash_map();
2819
2820			{
2821				let peers_lock = self.peers.read().unwrap();
2822
2823				let peers = &*peers_lock;
2824				macro_rules! get_peer_for_forwarding {
2825					($node_id: expr) => {{
2826						if peers_to_disconnect.get($node_id).is_some() {
2827							// If we've "disconnected" this peer, do not send to it.
2828							None
2829						} else {
2830							let descriptor_opt =
2831								self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
2832							match descriptor_opt {
2833								Some(descriptor) => match peers.get(&descriptor) {
2834									Some(peer_mutex) => {
2835										let peer_lock = peer_mutex.lock().unwrap();
2836										if !peer_lock.handshake_complete() {
2837											None
2838										} else {
2839											Some(peer_lock)
2840										}
2841									},
2842									None => {
2843										debug_assert!(false, "Inconsistent peers set state!");
2844										None
2845									},
2846								},
2847								None => None,
2848							}
2849						}
2850					}};
2851				}
2852
2853				let route_handler = &self.message_handler.route_handler;
2854				let chan_handler = &self.message_handler.chan_handler;
2855				let onion_message_handler = &self.message_handler.onion_message_handler;
2856				let custom_message_handler = &self.message_handler.custom_message_handler;
2857				let send_only_message_handler = &self.message_handler.send_only_message_handler;
2858
2859				// Handles a `MessageSendEvent`, using `from_chan_handler` to decide if we should
2860				// robustly gossip broadcast events even if a peer's message buffer is full.
2861				let mut handle_event = |event, from_chan_handler| {
2862					match event {
2863						MessageSendEvent::SendPeerStorage { ref node_id, ref msg } => {
2864							log_debug!(
2865								WithContext::from(&self.logger, Some(*node_id), None, None),
2866								"Handling SendPeerStorage event in peer_handler for {}",
2867								node_id,
2868							);
2869							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2870						},
2871						MessageSendEvent::SendPeerStorageRetrieval { ref node_id, ref msg } => {
2872							log_debug!(
2873								WithContext::from(&self.logger, Some(*node_id), None, None),
2874								"Handling SendPeerStorageRetrieval event in peer_handler for {}",
2875								node_id,
2876							);
2877							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2878						},
2879						MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
2880							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
2881									node_id,
2882									&msg.common_fields.temporary_channel_id);
2883							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2884						},
2885						MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
2886							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
2887									node_id,
2888									&msg.common_fields.temporary_channel_id);
2889							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2890						},
2891						MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
2892							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
2893									node_id,
2894									&msg.common_fields.temporary_channel_id);
2895							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2896						},
2897						MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
2898							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
2899									node_id,
2900									&msg.common_fields.temporary_channel_id);
2901							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2902						},
2903						MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
2904							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
2905									node_id,
2906									&msg.temporary_channel_id,
2907									ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index));
2908							// TODO: If the peer is gone we should generate a DiscardFunding event
2909							// indicating to the wallet that they should just throw away this funding transaction
2910							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2911						},
2912						MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
2913							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
2914									node_id,
2915									&msg.channel_id);
2916							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2917						},
2918						MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
2919							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
2920									node_id,
2921									&msg.channel_id);
2922							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2923						},
2924						MessageSendEvent::SendStfu { ref node_id, ref msg } => {
2925							let logger = WithContext::from(
2926								&self.logger,
2927								Some(*node_id),
2928								Some(msg.channel_id),
2929								None,
2930							);
2931							log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
2932									node_id,
2933									&msg.channel_id);
2934							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2935						},
2936						MessageSendEvent::SendSpliceInit { ref node_id, ref msg } => {
2937							let logger = WithContext::from(
2938								&self.logger,
2939								Some(*node_id),
2940								Some(msg.channel_id),
2941								None,
2942							);
2943							log_debug!(logger, "Handling SendSpliceInit event in peer_handler for node {} for channel {}",
2944									node_id,
2945									&msg.channel_id);
2946							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2947						},
2948						MessageSendEvent::SendSpliceAck { ref node_id, ref msg } => {
2949							let logger = WithContext::from(
2950								&self.logger,
2951								Some(*node_id),
2952								Some(msg.channel_id),
2953								None,
2954							);
2955							log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
2956									node_id,
2957									&msg.channel_id);
2958							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2959						},
2960						MessageSendEvent::SendSpliceLocked { ref node_id, ref msg } => {
2961							let logger = WithContext::from(
2962								&self.logger,
2963								Some(*node_id),
2964								Some(msg.channel_id),
2965								None,
2966							);
2967							log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
2968									node_id,
2969									&msg.channel_id);
2970							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2971						},
2972						MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
2973							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
2974									node_id,
2975									&msg.channel_id);
2976							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2977						},
2978						MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
2979							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
2980									node_id,
2981									&msg.channel_id);
2982							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2983						},
2984						MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
2985							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
2986									node_id,
2987									&msg.channel_id);
2988							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2989						},
2990						MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
2991							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
2992									node_id,
2993									&msg.channel_id);
2994							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2995						},
2996						MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
2997							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
2998									node_id,
2999									&msg.channel_id);
3000							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3001						},
3002						MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
3003							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
3004									node_id,
3005									&msg.channel_id);
3006							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3007						},
3008						MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
3009							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
3010									node_id,
3011									&msg.channel_id);
3012							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3013						},
3014						MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
3015							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
3016									node_id,
3017									&msg.channel_id);
3018							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3019						},
3020						MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
3021							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
3022									node_id,
3023									&msg.channel_id);
3024							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3025						},
3026						MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
3027							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
3028									node_id,
3029									&msg.channel_id);
3030							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3031						},
3032						MessageSendEvent::UpdateHTLCs {
3033							ref node_id,
3034							ref channel_id,
3035							updates:
3036								msgs::CommitmentUpdate {
3037									ref update_add_htlcs,
3038									ref update_fulfill_htlcs,
3039									ref update_fail_htlcs,
3040									ref update_fail_malformed_htlcs,
3041									ref update_fee,
3042									ref commitment_signed,
3043								},
3044						} => {
3045							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(*channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails, {} commits for channel {}",
3046									node_id,
3047									update_add_htlcs.len(),
3048									update_fulfill_htlcs.len(),
3049									update_fail_htlcs.len(),
3050									commitment_signed.len(),
3051									channel_id);
3052							let mut peer = get_peer_for_forwarding!(node_id)?;
3053							for msg in update_fulfill_htlcs {
3054								self.enqueue_message(&mut *peer, msg);
3055							}
3056							for msg in update_fail_htlcs {
3057								self.enqueue_message(&mut *peer, msg);
3058							}
3059							for msg in update_fail_malformed_htlcs {
3060								self.enqueue_message(&mut *peer, msg);
3061							}
3062							for msg in update_add_htlcs {
3063								self.enqueue_message(&mut *peer, msg);
3064							}
3065							if let &Some(ref msg) = update_fee {
3066								self.enqueue_message(&mut *peer, msg);
3067							}
3068							if commitment_signed.len() > 1 {
3069								let msg = msgs::StartBatch {
3070									channel_id: *channel_id,
3071									batch_size: commitment_signed.len() as u16,
3072									message_type: Some(msgs::CommitmentSigned::TYPE),
3073								};
3074								self.enqueue_message(&mut *peer, &msg);
3075							}
3076							for msg in commitment_signed {
3077								self.enqueue_message(&mut *peer, msg);
3078							}
3079						},
3080						MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
3081							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
3082									node_id,
3083									&msg.channel_id);
3084							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3085						},
3086						MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
3087							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
3088									node_id,
3089									&msg.channel_id);
3090							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3091						},
3092						MessageSendEvent::SendClosingComplete { ref node_id, ref msg } => {
3093							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingComplete event in peer_handler for node {} for channel {}",
3094									node_id,
3095									&msg.channel_id);
3096							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3097						},
3098						MessageSendEvent::SendClosingSig { ref node_id, ref msg } => {
3099							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSig event in peer_handler for node {} for channel {}",
3100									node_id,
3101									&msg.channel_id);
3102							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3103						},
3104						MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
3105							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}",
3106									node_id,
3107									&msg.channel_id);
3108							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3109						},
3110						MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
3111							log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
3112									node_id,
3113									&msg.channel_id);
3114							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3115						},
3116						MessageSendEvent::SendChannelAnnouncement {
3117							ref node_id,
3118							ref msg,
3119							ref update_msg,
3120						} => {
3121							log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
3122									node_id,
3123									msg.contents.short_channel_id);
3124							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3125							self.enqueue_message(
3126								&mut *get_peer_for_forwarding!(node_id)?,
3127								update_msg,
3128							);
3129						},
3130						MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
3131							log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
3132							match route_handler.handle_channel_announcement(None, &msg) {
3133								Ok(_)
3134								| Err(LightningError {
3135									action: msgs::ErrorAction::IgnoreDuplicateGossip,
3136									..
3137								}) => {
3138									let forward = wire::Message::ChannelAnnouncement(msg);
3139									self.forward_broadcast_msg(
3140										peers,
3141										&forward,
3142										None,
3143										from_chan_handler,
3144									);
3145								},
3146								_ => {},
3147							}
3148							if let Some(msg) = update_msg {
3149								match route_handler.handle_channel_update(None, &msg) {
3150									Ok(_)
3151									| Err(LightningError {
3152										action: msgs::ErrorAction::IgnoreDuplicateGossip,
3153										..
3154									}) => {
3155										let forward = wire::Message::ChannelUpdate(msg);
3156										self.forward_broadcast_msg(
3157											peers,
3158											&forward,
3159											None,
3160											from_chan_handler,
3161										);
3162									},
3163									_ => {},
3164								}
3165							}
3166						},
3167						MessageSendEvent::BroadcastChannelUpdate { msg } => {
3168							log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for contents {:?}", msg.contents);
3169							match route_handler.handle_channel_update(None, &msg) {
3170								Ok(_)
3171								| Err(LightningError {
3172									action: msgs::ErrorAction::IgnoreDuplicateGossip,
3173									..
3174								}) => {
3175									let forward = wire::Message::ChannelUpdate(msg);
3176									self.forward_broadcast_msg(
3177										peers,
3178										&forward,
3179										None,
3180										from_chan_handler,
3181									);
3182								},
3183								_ => {},
3184							}
3185						},
3186						MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
3187							log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
3188							match route_handler.handle_node_announcement(None, &msg) {
3189								Ok(_)
3190								| Err(LightningError {
3191									action: msgs::ErrorAction::IgnoreDuplicateGossip,
3192									..
3193								}) => {
3194									let forward = wire::Message::NodeAnnouncement(msg);
3195									self.forward_broadcast_msg(
3196										peers,
3197										&forward,
3198										None,
3199										from_chan_handler,
3200									);
3201								},
3202								_ => {},
3203							}
3204						},
3205						MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
3206							log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
3207									node_id, msg.contents.short_channel_id);
3208							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3209						},
3210						MessageSendEvent::HandleError { node_id, action } => {
3211							let logger = WithContext::from(&self.logger, Some(node_id), None, None);
3212							match action {
3213								msgs::ErrorAction::DisconnectPeer { msg } => {
3214									if let Some(msg) = msg.as_ref() {
3215										log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
3216											node_id, msg.data);
3217									} else {
3218										log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}",
3219											node_id);
3220									}
3221									// We do not have the peers write lock, so we just store that we're
3222									// about to disconnect the peer and do it after we finish
3223									// processing most messages.
3224									let msg = msg.map(|msg| {
3225										wire::Message::<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)
3226									});
3227									peers_to_disconnect.insert(node_id, msg);
3228								},
3229								msgs::ErrorAction::DisconnectPeerWithWarning { msg } => {
3230									log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
3231										node_id, msg.data);
3232									// We do not have the peers write lock, so we just store that we're
3233									// about to disconnect the peer and do it after we finish
3234									// processing most messages.
3235									peers_to_disconnect
3236										.insert(node_id, Some(wire::Message::Warning(msg)));
3237								},
3238								msgs::ErrorAction::IgnoreAndLog(level) => {
3239									log_given_level!(
3240										logger,
3241										level,
3242										"Received a HandleError event to be ignored for node {}",
3243										node_id,
3244									);
3245								},
3246								msgs::ErrorAction::IgnoreDuplicateGossip => {},
3247								msgs::ErrorAction::IgnoreError => {
3248									log_debug!(
3249										logger,
3250										"Received a HandleError event to be ignored for node {}",
3251										node_id,
3252									);
3253								},
3254								msgs::ErrorAction::SendErrorMessage { ref msg } => {
3255									log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
3256											node_id,
3257											msg.data);
3258									self.enqueue_message(
3259										&mut *get_peer_for_forwarding!(&node_id)?,
3260										msg,
3261									);
3262								},
3263								msgs::ErrorAction::SendWarningMessage {
3264									ref msg,
3265									ref log_level,
3266								} => {
3267									log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
3268											node_id,
3269											msg.data);
3270									self.enqueue_message(
3271										&mut *get_peer_for_forwarding!(&node_id)?,
3272										msg,
3273									);
3274								},
3275							}
3276						},
3277						MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
3278							log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelRangeQuery event in peer_handler for node {} with first_blocknum={}, number_of_blocks={}",
3279								node_id,
3280								msg.first_blocknum,
3281								msg.number_of_blocks);
3282							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3283						},
3284						MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
3285							log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendShortIdsQuery event in peer_handler for node {} with num_scids={}",
3286								node_id,
3287								msg.short_channel_ids.len());
3288							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3289						},
3290						MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
3291							log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
3292								node_id,
3293								msg.short_channel_ids.len(),
3294								msg.first_blocknum,
3295								msg.number_of_blocks,
3296								msg.sync_complete);
3297							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3298						},
3299						MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
3300							log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendGossipTimestampFilter event in peer_handler for node {} with first_timestamp={}, timestamp_range={}",
3301								node_id,
3302								msg.first_timestamp,
3303								msg.timestamp_range);
3304							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
3305						},
3306					}
3307					Some(())
3308				};
3309
3310				let chan_events = chan_handler.get_and_clear_pending_msg_events();
3311				for event in chan_events {
3312					handle_event(event, true);
3313				}
3314
3315				let route_events = route_handler.get_and_clear_pending_msg_events();
3316				for event in route_events {
3317					handle_event(event, false);
3318				}
3319
3320				let send_only_events = send_only_message_handler.get_and_clear_pending_msg_events();
3321				for event in send_only_events {
3322					handle_event(event, false);
3323				}
3324
3325				let onion_msg_events = onion_message_handler.get_and_clear_pending_msg_events();
3326				for event in onion_msg_events {
3327					handle_event(event, false);
3328				}
3329
3330				for (node_id, msg) in custom_message_handler.get_and_clear_pending_msg() {
3331					if peers_to_disconnect.get(&node_id).is_some() {
3332						continue;
3333					}
3334					let mut peer = if let Some(peer) = get_peer_for_forwarding!(&node_id) {
3335						peer
3336					} else {
3337						continue;
3338					};
3339					self.enqueue_message(&mut peer, &msg);
3340				}
3341
3342				for (descriptor, peer_mutex) in peers.iter() {
3343					let mut peer = peer_mutex.lock().unwrap();
3344					if flush_read_disabled {
3345						peer.received_channel_announce_since_backlogged = false;
3346					}
3347					self.do_attempt_write_data(
3348						&mut (*descriptor).clone(),
3349						&mut *peer,
3350						flush_read_disabled,
3351					);
3352				}
3353			}
3354			if !peers_to_disconnect.is_empty() {
3355				let mut peers_lock = self.peers.write().unwrap();
3356				let peers = &mut *peers_lock;
3357				for (node_id, msg) in peers_to_disconnect.drain() {
3358					// Note that since we are holding the peers *write* lock we can
3359					// remove from node_id_to_descriptor immediately (as no other
3360					// thread can be holding the peer lock if we have the global write
3361					// lock).
3362
3363					let descriptor_opt =
3364						self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
3365					if let Some(mut descriptor) = descriptor_opt {
3366						if let Some(peer_mutex) = peers.remove(&descriptor) {
3367							let mut peer = peer_mutex.lock().unwrap();
3368							if let Some(msg) = msg {
3369								self.enqueue_message(&mut *peer, &msg);
3370								// This isn't guaranteed to work, but if there is enough free
3371								// room in the send buffer, put the error message there...
3372								self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
3373							}
3374							self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError");
3375						} else {
3376							debug_assert!(false, "Missing connection for peer");
3377						}
3378					}
3379				}
3380			}
3381
3382			if self.event_processing_state.fetch_sub(1, Ordering::AcqRel) != 1 {
3383				// If another thread incremented the state while we were running we should go
3384				// around again, but only once.
3385				self.event_processing_state.store(1, Ordering::Release);
3386				continue;
3387			}
3388			break;
3389		}
3390	}
3391
	/// Indicates that the given socket descriptor's connection is now closed.
	///
	/// Removes our internal per-peer state for the descriptor and, if the noise handshake had
	/// completed, notifies each message handler that the peer disconnected (see
	/// `disconnect_event_internal`). Call this from the socket-handling layer once the
	/// underlying transport has been torn down.
	pub fn socket_disconnected(&self, descriptor: &Descriptor) {
		self.disconnect_event_internal(descriptor, "the socket was disconnected");
	}
3396
3397	fn do_disconnect(&self, mut descriptor: Descriptor, peer: &Peer, reason: &'static str) {
3398		if !peer.handshake_complete() {
3399			log_trace!(
3400				self.logger,
3401				"Disconnecting peer which hasn't completed handshake due to {}",
3402				reason
3403			);
3404			descriptor.disconnect_socket();
3405			return;
3406		}
3407
3408		debug_assert!(peer.their_node_id.is_some());
3409		if let Some((node_id, _)) = peer.their_node_id {
3410			log_trace!(
3411				WithContext::from(&self.logger, Some(node_id), None, None),
3412				"Disconnecting peer with id {} due to {}",
3413				node_id,
3414				reason
3415			);
3416			self.message_handler.route_handler.peer_disconnected(node_id);
3417			self.message_handler.chan_handler.peer_disconnected(node_id);
3418			self.message_handler.onion_message_handler.peer_disconnected(node_id);
3419			self.message_handler.custom_message_handler.peer_disconnected(node_id);
3420			self.message_handler.send_only_message_handler.peer_disconnected(node_id);
3421		}
3422		descriptor.disconnect_socket();
3423	}
3424
3425	fn disconnect_event_internal(&self, descriptor: &Descriptor, reason: &'static str) {
3426		let mut peers = self.peers.write().unwrap();
3427		let peer_option = peers.remove(descriptor);
3428		match peer_option {
3429			None => {
3430				// This is most likely a simple race condition where the user found that the socket
3431				// was disconnected, then we told the user to `disconnect_socket()`, then they
3432				// called this method. Either way we're disconnected, return.
3433			},
3434			Some(peer_lock) => {
3435				let peer = peer_lock.lock().unwrap();
3436				if let Some((node_id, _)) = peer.their_node_id {
3437					let logger = WithContext::from(&self.logger, Some(node_id), None, None);
3438					log_trace!(
3439						logger,
3440						"Handling disconnection of peer {} because {}",
3441						node_id,
3442						reason
3443					);
3444					let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
3445					debug_assert!(removed.is_some(), "descriptor maps should be consistent");
3446					if !peer.handshake_complete() {
3447						return;
3448					}
3449					self.message_handler.route_handler.peer_disconnected(node_id);
3450					self.message_handler.chan_handler.peer_disconnected(node_id);
3451					self.message_handler.onion_message_handler.peer_disconnected(node_id);
3452					self.message_handler.custom_message_handler.peer_disconnected(node_id);
3453					self.message_handler.send_only_message_handler.peer_disconnected(node_id);
3454				}
3455			},
3456		};
3457	}
3458
3459	/// Disconnect a peer given its node id.
3460	///
3461	/// If a peer is connected, this will call [`disconnect_socket`] on the descriptor for the
3462	/// peer. Thus, be very careful about reentrancy issues.
3463	///
3464	/// [`disconnect_socket`]: SocketDescriptor::disconnect_socket
3465	pub fn disconnect_by_node_id(&self, node_id: PublicKey) {
3466		let mut peers_lock = self.peers.write().unwrap();
3467		if let Some(descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
3468			let peer_opt = peers_lock.remove(&descriptor);
3469			if let Some(peer_mutex) = peer_opt {
3470				self.do_disconnect(descriptor, &*peer_mutex.lock().unwrap(), "client request");
3471			} else {
3472				debug_assert!(false, "node_id_to_descriptor thought we had a peer");
3473			}
3474		}
3475	}
3476
3477	/// Disconnects all currently-connected peers. This is useful on platforms where there may be
3478	/// an indication that TCP sockets have stalled even if we weren't around to time them out
3479	/// using regular ping/pongs.
3480	pub fn disconnect_all_peers(&self) {
3481		let mut peers_lock = self.peers.write().unwrap();
3482		self.node_id_to_descriptor.lock().unwrap().clear();
3483		let peers = &mut *peers_lock;
3484		for (descriptor, peer_mutex) in peers.drain() {
3485			self.do_disconnect(
3486				descriptor,
3487				&*peer_mutex.lock().unwrap(),
3488				"client request to disconnect all peers",
3489			);
3490		}
3491	}
3492
3493	/// This is called when we're blocked on sending additional gossip messages until we receive a
3494	/// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting
3495	/// `awaiting_pong_timer_tick_intervals` to a special flag value to indicate this).
3496	fn maybe_send_extra_ping(&self, peer: &mut Peer) {
3497		if peer.awaiting_pong_timer_tick_intervals == 0 {
3498			peer.awaiting_pong_timer_tick_intervals = -1;
3499			let ping = msgs::Ping { ponglen: 0, byteslen: 64 };
3500			self.enqueue_message(peer, &ping);
3501		}
3502	}
3503
	/// Send pings to each peer and disconnect those which did not respond to the last round of
	/// pings.
	///
	/// This may be called on any timescale you want, however, roughly once every ten seconds is
	/// preferred. The call rate determines both how often we send a ping to our peers and how much
	/// time they have to respond before we disconnect them.
	///
	/// May call [`send_data`] on all [`SocketDescriptor`]s. Thus, be very careful with reentrancy
	/// issues!
	///
	/// [`send_data`]: SocketDescriptor::send_data
	pub fn timer_tick_occurred(&self) {
		// Peers to drop are collected under the read lock and disconnected afterwards under the
		// write lock, since removal from the peer map requires the write lock.
		let mut descriptors_needing_disconnect = Vec::new();
		{
			let peers_lock = self.peers.read().unwrap();

			self.update_gossip_backlogged();
			// True exactly once after the gossip-processing backlog clears; we pass it through to
			// `do_attempt_write_data` and reset per-peer backlog-tracking state below.
			let flush_read_disabled =
				self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);

			for (descriptor, peer_mutex) in peers_lock.iter() {
				let mut peer = peer_mutex.lock().unwrap();
				if flush_read_disabled {
					peer.received_channel_announce_since_backlogged = false;
				}

				if !peer.handshake_complete() {
					// The peer needs to complete its handshake before we can exchange messages. We
					// give peers one timer tick to complete handshake, reusing
					// `awaiting_pong_timer_tick_intervals` to track number of timer ticks taken
					// for handshake completion.
					if peer.awaiting_pong_timer_tick_intervals != 0 {
						descriptors_needing_disconnect.push(descriptor.clone());
					} else {
						peer.awaiting_pong_timer_tick_intervals = 1;
					}
					continue;
				}
				debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
				debug_assert!(peer.their_node_id.is_some());

				// We use a loop as a `goto` to skip writing the Ping message:
				loop {
					if peer.awaiting_pong_timer_tick_intervals == -1 {
						// Magic value set in `maybe_send_extra_ping`.
						// That ping was sent between ticks, so start counting from one full
						// interval now rather than sending another ping on top of it.
						peer.awaiting_pong_timer_tick_intervals = 1;
						peer.received_message_since_timer_tick = false;
						break;
					}
					// Disconnect if we pinged on a previous tick and have heard nothing since...
					let not_recently_active = peer.awaiting_pong_timer_tick_intervals > 0
						&& !peer.received_message_since_timer_tick;
					// ...or if the peer has been unresponsive for too many ticks even while
					// messages trickled in. The threshold scales with the peer count - NOTE:
					// presumably to allow slow buffer draining across many peers; confirm against
					// `MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER`'s definition.
					let reached_threshold_intervals = peer.awaiting_pong_timer_tick_intervals
						as u64
						> MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64;
					if not_recently_active || reached_threshold_intervals {
						descriptors_needing_disconnect.push(descriptor.clone());
						break;
					}
					peer.received_message_since_timer_tick = false;

					// Still waiting on a pong from an earlier ping: bump the interval count and
					// skip sending another ping.
					if peer.awaiting_pong_timer_tick_intervals > 0 {
						peer.awaiting_pong_timer_tick_intervals += 1;
						break;
					}

					// Not currently awaiting a pong - send a fresh ping this tick.
					peer.awaiting_pong_timer_tick_intervals = 1;
					let ping = msgs::Ping { ponglen: 0, byteslen: 64 };
					self.enqueue_message(&mut *peer, &ping);
					break;
				}
				self.do_attempt_write_data(
					&mut (descriptor.clone()),
					&mut *peer,
					flush_read_disabled,
				);
			}
		}

		if !descriptors_needing_disconnect.is_empty() {
			{
				// Now take the write lock (the read lock above has been released) so the stale
				// peers can actually be removed from the map before disconnecting them.
				let mut peers_lock = self.peers.write().unwrap();
				for descriptor in descriptors_needing_disconnect {
					if let Some(peer_mutex) = peers_lock.remove(&descriptor) {
						let peer = peer_mutex.lock().unwrap();
						if let Some((node_id, _)) = peer.their_node_id {
							self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
						}
						self.do_disconnect(descriptor, &*peer, "ping/handshake timeout");
					}
				}
			}
		}
	}
3597
3598	#[allow(dead_code)]
3599	// Messages of up to 64KB should never end up more than half full with addresses, as that would
3600	// be absurd. We ensure this by checking that at least 100 (our stated public contract on when
3601	// broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB
3602	// message...
3603	const HALF_MESSAGE_IS_ADDRS: u32 =
3604		::core::u16::MAX as u32 / (SocketAddress::MAX_LEN as u32 + 1) / 2;
3605	#[allow(dead_code)]
3606	// ...by failing to compile if the number of addresses that would be half of a message is
3607	// smaller than 100:
3608	const STATIC_ASSERT: u32 = Self::HALF_MESSAGE_IS_ADDRS - 100;
3609
3610	/// Generates a signed node_announcement from the given arguments, sending it to all connected
3611	/// peers. Note that peers will likely ignore this message unless we have at least one public
3612	/// channel which has at least six confirmations on-chain.
3613	///
3614	/// `rgb` is a node "color" and `alias` is a printable human-readable string to describe this
3615	/// node to humans. They carry no in-protocol meaning.
3616	///
3617	/// `addresses` represent the set (possibly empty) of socket addresses on which this node
3618	/// accepts incoming connections. These will be included in the node_announcement, publicly
3619	/// tying these addresses together and to this node. If you wish to preserve user privacy,
3620	/// addresses should likely contain only Tor Onion addresses.
3621	///
3622	/// Panics if `addresses` is absurdly large (more than 100).
3623	///
3624	/// [`get_and_clear_pending_msg_events`]: BaseMessageHandler::get_and_clear_pending_msg_events
3625	pub fn broadcast_node_announcement(
3626		&self, rgb: [u8; 3], alias: [u8; 32], mut addresses: Vec<SocketAddress>,
3627	) {
3628		if addresses.len() > 100 {
3629			panic!("More than half the message size was taken up by public addresses!");
3630		}
3631
3632		// While all existing nodes handle unsorted addresses just fine, the spec requires that
3633		// addresses be sorted for future compatibility.
3634		addresses.sort_by_key(|addr| addr.get_id());
3635
3636		let features = self.message_handler.chan_handler.provided_node_features()
3637			| self.message_handler.route_handler.provided_node_features()
3638			| self.message_handler.onion_message_handler.provided_node_features()
3639			| self.message_handler.custom_message_handler.provided_node_features()
3640			| self.message_handler.send_only_message_handler.provided_node_features();
3641		let announcement = msgs::UnsignedNodeAnnouncement {
3642			features,
3643			timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel),
3644			node_id: NodeId::from_pubkey(&self.node_signer.get_node_id(Recipient::Node).unwrap()),
3645			rgb,
3646			alias: NodeAlias(alias),
3647			addresses,
3648			excess_address_data: Vec::new(),
3649			excess_data: Vec::new(),
3650		};
3651		let node_announce_sig = match self
3652			.node_signer
3653			.sign_gossip_message(msgs::UnsignedGossipMessage::NodeAnnouncement(&announcement))
3654		{
3655			Ok(sig) => sig,
3656			Err(_) => {
3657				log_error!(self.logger, "Failed to generate signature for node_announcement");
3658				return;
3659			},
3660		};
3661
3662		let msg = msgs::NodeAnnouncement { signature: node_announce_sig, contents: announcement };
3663
3664		log_debug!(
3665			self.logger,
3666			"Broadcasting NodeAnnouncement after passing it to our own RoutingMessageHandler."
3667		);
3668		let _ = self.message_handler.route_handler.handle_node_announcement(None, &msg);
3669		self.forward_broadcast_msg(
3670			&*self.peers.read().unwrap(),
3671			&wire::Message::NodeAnnouncement(msg),
3672			None,
3673			true,
3674		);
3675	}
3676}
3677
3678fn is_gossip_msg(type_id: u16) -> bool {
3679	match type_id {
3680		msgs::ChannelAnnouncement::TYPE
3681		| msgs::ChannelUpdate::TYPE
3682		| msgs::NodeAnnouncement::TYPE
3683		| msgs::QueryChannelRange::TYPE
3684		| msgs::ReplyChannelRange::TYPE
3685		| msgs::QueryShortChannelIds::TYPE
3686		| msgs::ReplyShortChannelIdsEnd::TYPE => true,
3687		_ => false,
3688	}
3689}
3690
3691#[cfg(test)]
3692mod tests {
3693	use super::*;
3694
3695	use crate::io;
3696	use crate::ln::msgs::{Init, LightningError, SocketAddress};
3697	use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
3698	use crate::ln::types::ChannelId;
3699	use crate::ln::{msgs, wire};
3700	use crate::sign::{NodeSigner, Recipient};
3701	use crate::types::features::{InitFeatures, NodeFeatures};
3702	use crate::util::test_utils;
3703
3704	use bitcoin::constants::ChainHash;
3705	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
3706	use bitcoin::Network;
3707
3708	use crate::sync::{Arc, Mutex};
3709	use core::convert::Infallible;
3710	use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3711
3712	#[allow(unused_imports)]
3713	use crate::prelude::*;
3714
	// In-memory stand-in for a network socket: writes are captured into a buffer
	// for the test to inspect, and disconnect requests are recorded in a flag.
	#[derive(Clone)]
	struct FileDescriptor {
		// Identifier used for equality/hashing; clones share the same fd.
		fd: u16,
		// When set, send_data accepts no bytes (simulates a full write buffer).
		hang_writes: Arc<AtomicBool>,
		// Everything the peer manager has "sent" on this socket.
		outbound_data: Arc<Mutex<Vec<u8>>>,
		// Set when the peer manager asks to close the socket.
		disconnect: Arc<AtomicBool>,
	}
	// Equality and hashing are based solely on `fd` so a clone of a descriptor
	// keys to the same peer-map entry as the original.
	impl PartialEq for FileDescriptor {
		fn eq(&self, other: &Self) -> bool {
			self.fd == other.fd
		}
	}
	impl Eq for FileDescriptor {}
	impl core::hash::Hash for FileDescriptor {
		fn hash<H: core::hash::Hasher>(&self, hasher: &mut H) {
			self.fd.hash(hasher)
		}
	}
3733
3734	impl SocketDescriptor for FileDescriptor {
3735		fn send_data(&mut self, data: &[u8], _continue_read: bool) -> usize {
3736			if self.hang_writes.load(Ordering::Acquire) {
3737				0
3738			} else {
3739				self.outbound_data.lock().unwrap().extend_from_slice(data);
3740				data.len()
3741			}
3742		}
3743
3744		fn disconnect_socket(&mut self) {
3745			self.disconnect.store(true, Ordering::Release);
3746		}
3747	}
3748
3749	impl FileDescriptor {
3750		fn new(fd: u16) -> Self {
3751			Self {
3752				fd,
3753				hang_writes: Arc::new(AtomicBool::new(false)),
3754				outbound_data: Arc::new(Mutex::new(Vec::new())),
3755				disconnect: Arc::new(AtomicBool::new(false)),
3756			}
3757		}
3758	}
3759
	// Bundle of message handlers, logger, and signer backing one test PeerManager.
	struct PeerManagerCfg {
		chan_handler: test_utils::TestChannelMessageHandler,
		routing_handler: test_utils::TestRoutingMessageHandler,
		custom_handler: TestCustomMessageHandler,
		send_only_handler: TestBaseMsgHandler,
		logger: test_utils::TestLogger,
		node_signer: test_utils::TestNodeSigner,
	}

	// Custom-message handler that never produces messages but lets tests pick the
	// init features a peer advertises and track connect/disconnect events.
	struct TestCustomMessageHandler {
		// Init features returned from provided_init_features.
		features: InitFeatures,
		// Records peer_connected/peer_disconnected calls for later assertions.
		conn_tracker: test_utils::ConnectionTracker,
	}
3773
3774	impl TestCustomMessageHandler {
3775		fn new(features: InitFeatures) -> Self {
3776			Self { features, conn_tracker: test_utils::ConnectionTracker::new() }
3777		}
3778	}
3779
	impl wire::CustomMessageReader for TestCustomMessageHandler {
		type CustomMessage = Infallible;
		// Always returns Ok(None), i.e. "not a custom message type we handle".
		fn read<R: io::Read>(
			&self, _: u16, _: &mut R,
		) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
			Ok(None)
		}
	}

	impl CustomMessageHandler for TestCustomMessageHandler {
		// With CustomMessage = Infallible no value can ever be constructed, so
		// this can never actually be called.
		fn handle_custom_message(&self, _: Infallible, _: PublicKey) -> Result<(), LightningError> {
			unreachable!();
		}

		// Never queues any outbound custom messages.
		fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> {
			Vec::new()
		}

		// Connection lifecycle is forwarded to the tracker so tests can assert
		// that every peer_connected got a matching peer_disconnected.
		fn peer_disconnected(&self, their_node_id: PublicKey) {
			self.conn_tracker.peer_disconnected(their_node_id);
		}

		fn peer_connected(
			&self, their_node_id: PublicKey, _msg: &Init, _inbound: bool,
		) -> Result<(), ()> {
			self.conn_tracker.peer_connected(their_node_id)
		}

		fn provided_node_features(&self) -> NodeFeatures {
			NodeFeatures::empty()
		}

		// Advertises whatever features this handler was constructed with.
		fn provided_init_features(&self, _: PublicKey) -> InitFeatures {
			self.features.clone()
		}
	}
3816
	// Minimal send-only message handler that produces no events and merely tracks
	// peer connect/disconnect calls via the wrapped ConnectionTracker.
	struct TestBaseMsgHandler(test_utils::ConnectionTracker);

	impl BaseMessageHandler for TestBaseMsgHandler {
		// Never has anything to send.
		fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
			Vec::new()
		}

		fn peer_disconnected(&self, their_node_id: PublicKey) {
			self.0.peer_disconnected(their_node_id);
		}

		fn peer_connected(
			&self, their_node_id: PublicKey, _msg: &Init, _inbound: bool,
		) -> Result<(), ()> {
			self.0.peer_connected(their_node_id)
		}

		fn provided_node_features(&self) -> NodeFeatures {
			NodeFeatures::empty()
		}

		fn provided_init_features(&self, _: PublicKey) -> InitFeatures {
			InitFeatures::empty()
		}
	}

	// No methods beyond BaseMessageHandler are needed for this marker trait here.
	impl SendOnlyMessageHandler for TestBaseMsgHandler {}
3844
3845	fn create_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
3846		let mut cfgs = Vec::new();
3847		for i in 0..peer_count {
3848			let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
3849			let features = {
3850				let mut feature_bits = vec![0u8; 33];
3851				feature_bits[32] = 0b00000001;
3852				InitFeatures::from_le_bytes(feature_bits)
3853			};
3854			cfgs.push(PeerManagerCfg {
3855				chan_handler: test_utils::TestChannelMessageHandler::new(
3856					ChainHash::using_genesis_block(Network::Testnet),
3857				),
3858				logger: test_utils::TestLogger::with_id(i.to_string()),
3859				routing_handler: test_utils::TestRoutingMessageHandler::new(),
3860				custom_handler: TestCustomMessageHandler::new(features),
3861				send_only_handler: TestBaseMsgHandler(test_utils::ConnectionTracker::new()),
3862				node_signer: test_utils::TestNodeSigner::new(node_secret),
3863			});
3864		}
3865
3866		cfgs
3867	}
3868
3869	fn create_feature_incompatible_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
3870		let mut cfgs = Vec::new();
3871		for i in 0..peer_count {
3872			let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
3873			let features = {
3874				let mut feature_bits = vec![0u8; 33 + i + 1];
3875				feature_bits[33 + i] = 0b00000001;
3876				InitFeatures::from_le_bytes(feature_bits)
3877			};
3878			cfgs.push(PeerManagerCfg {
3879				chan_handler: test_utils::TestChannelMessageHandler::new(
3880					ChainHash::using_genesis_block(Network::Testnet),
3881				),
3882				logger: test_utils::TestLogger::new(),
3883				routing_handler: test_utils::TestRoutingMessageHandler::new(),
3884				custom_handler: TestCustomMessageHandler::new(features),
3885				send_only_handler: TestBaseMsgHandler(test_utils::ConnectionTracker::new()),
3886				node_signer: test_utils::TestNodeSigner::new(node_secret),
3887			});
3888		}
3889
3890		cfgs
3891	}
3892
3893	fn create_chain_incompatible_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
3894		let mut cfgs = Vec::new();
3895		for i in 0..peer_count {
3896			let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
3897			let features = InitFeatures::from_le_bytes(vec![0u8; 33]);
3898			let network = ChainHash::from(&[i as u8; 32]);
3899			cfgs.push(PeerManagerCfg {
3900				chan_handler: test_utils::TestChannelMessageHandler::new(network),
3901				logger: test_utils::TestLogger::new(),
3902				routing_handler: test_utils::TestRoutingMessageHandler::new(),
3903				custom_handler: TestCustomMessageHandler::new(features),
3904				send_only_handler: TestBaseMsgHandler(test_utils::ConnectionTracker::new()),
3905				node_signer: test_utils::TestNodeSigner::new(node_secret),
3906			});
3907		}
3908
3909		cfgs
3910	}
3911
3912	fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<TestPeer<'a>> {
3913		let mut peers = Vec::new();
3914		for i in 0..peer_count {
3915			let ephemeral_bytes = [i as u8; 32];
3916			let msg_handler = MessageHandler {
3917				chan_handler: &cfgs[i].chan_handler,
3918				route_handler: &cfgs[i].routing_handler,
3919				onion_message_handler: IgnoringMessageHandler {},
3920				custom_message_handler: &cfgs[i].custom_handler,
3921				send_only_message_handler: &cfgs[i].send_only_handler,
3922			};
3923			let peer = PeerManager::new(
3924				msg_handler,
3925				0,
3926				&ephemeral_bytes,
3927				&cfgs[i].logger,
3928				&cfgs[i].node_signer,
3929			);
3930			peers.push(peer);
3931		}
3932
3933		peers
3934	}
3935
	// Concrete PeerManager type used throughout these tests: FileDescriptor
	// sockets plus references into a PeerManagerCfg for all handlers and signer.
	type TestPeer<'a> = PeerManager<
		FileDescriptor,
		&'a test_utils::TestChannelMessageHandler,
		&'a test_utils::TestRoutingMessageHandler,
		IgnoringMessageHandler,
		&'a test_utils::TestLogger,
		&'a TestCustomMessageHandler,
		&'a test_utils::TestNodeSigner,
		&'a TestBaseMsgHandler,
	>;
3946
	/// Runs the connection setup between two peer managers (B dials A) by
	/// shuttling each side's buffered output into the other, and returns the two
	/// socket descriptors plus each side's final read result — an `Err` here
	/// indicates that side refused the connection.
	fn try_establish_connection<'a>(
		peer_a: &TestPeer<'a>, peer_b: &TestPeer<'a>,
	) -> (FileDescriptor, FileDescriptor, Result<(), PeerHandleError>, Result<(), PeerHandleError>)
	{
		let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
		let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };

		// A process-wide counter keeps fd numbers from different invocations
		// distinct; both ends of one connection share the same fd number.
		static FD_COUNTER: AtomicUsize = AtomicUsize::new(0);
		let fd = FD_COUNTER.fetch_add(1, Ordering::Relaxed) as u16;

		let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
		let mut fd_a = FileDescriptor::new(fd);
		let mut fd_b = FileDescriptor::new(fd);

		// B initiates the connection; its first flight is fed into A.
		let initial_data =
			peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
		peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
		peer_a.read_event(&mut fd_a, &initial_data).unwrap();
		peer_a.process_events();

		// A's buffered response goes back to B...
		let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
		peer_b.read_event(&mut fd_b, &a_data).unwrap();

		// ...then B's response to A; this read may fail if A refuses the peer.
		peer_b.process_events();
		let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
		let a_refused = peer_a.read_event(&mut fd_a, &b_data);

		// One final flight from A to B completes the exchange on B's side.
		peer_a.process_events();
		let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
		let b_refused = peer_b.read_event(&mut fd_b, &a_data);

		(fd_a, fd_b, a_refused, b_refused)
	}
3980
3981	fn establish_connection<'a>(
3982		peer_a: &TestPeer<'a>, peer_b: &TestPeer<'a>,
3983	) -> (FileDescriptor, FileDescriptor) {
3984		let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
3985		let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };
3986
3987		let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
3988		let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap();
3989
3990		let features_a = peer_a.init_features(id_b);
3991		let features_b = peer_b.init_features(id_a);
3992
3993		let (fd_a, fd_b, a_refused, b_refused) = try_establish_connection(peer_a, peer_b);
3994
3995		a_refused.unwrap();
3996		b_refused.unwrap();
3997
3998		assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().counterparty_node_id, id_b);
3999		assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().socket_address, Some(addr_b));
4000		assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().init_features, features_b);
4001		assert_eq!(peer_b.peer_by_node_id(&id_a).unwrap().counterparty_node_id, id_a);
4002		assert_eq!(peer_b.peer_by_node_id(&id_a).unwrap().socket_address, Some(addr_a));
4003		assert_eq!(peer_b.peer_by_node_id(&id_a).unwrap().init_features, features_a);
4004		(fd_a.clone(), fd_b.clone())
4005	}
4006
	#[test]
	#[cfg(feature = "std")]
	fn fuzz_threaded_connections() {
		// Spawn two threads which repeatedly connect two peers together, leading to "got second
		// connection with peer" disconnections and rapid reconnect. This previously found an issue
		// with our internal map consistency, and is a generally good smoke test of disconnection.
		let cfgs = Arc::new(create_peermgr_cfgs(2));
		// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
		// SAFETY-NOTE(review): the 'static reference into `cfgs` is sound only because both
		// spawned threads are joined before `cfgs` is dropped at the end of this test.
		let peers = Arc::new(create_network(2, unsafe { &*(&*cfgs as *const _) as &'static _ }));

		let start_time = std::time::Instant::now();
		macro_rules! spawn_thread {
			($id: expr) => {{
				let peers = Arc::clone(&peers);
				let cfgs = Arc::clone(&cfgs);
				std::thread::spawn(move || {
					let mut ctr = 0;
					// Outer loop: keep tearing down and re-establishing connections
					// for about one second of wall-clock time.
					while start_time.elapsed() < std::time::Duration::from_secs(1) {
						let id_a = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
						// Both ends of this thread's connection share an fd number;
						// $id offsets keep the two threads' fds distinct per round.
						let mut fd_a = FileDescriptor::new($id + ctr * 3);
						let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
						let mut fd_b = FileDescriptor::new($id + ctr * 3);
						let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };
						let initial_data = peers[1]
							.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone()))
							.unwrap();
						peers[0]
							.new_inbound_connection(fd_a.clone(), Some(addr_b.clone()))
							.unwrap();
						// Reads may legitimately fail once the other thread's
						// duplicate connection wins; just restart the cycle.
						if peers[0].read_event(&mut fd_a, &initial_data).is_err() {
							break;
						}

						// Inner loop: pump data back and forth until either side
						// disconnects or errors, while queueing Shutdown messages
						// and occasionally firing timer ticks to add churn.
						while start_time.elapsed() < std::time::Duration::from_secs(1) {
							peers[0].process_events();
							if fd_a.disconnect.load(Ordering::Acquire) {
								break;
							}
							let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
							if peers[1].read_event(&mut fd_b, &a_data).is_err() {
								break;
							}

							peers[1].process_events();
							if fd_b.disconnect.load(Ordering::Acquire) {
								break;
							}
							let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
							if peers[0].read_event(&mut fd_a, &b_data).is_err() {
								break;
							}

							let node_id_1 =
								peers[1].node_signer.get_node_id(Recipient::Node).unwrap();
							let msg_event_1 = MessageSendEvent::SendShutdown {
								node_id: node_id_1,
								msg: msgs::Shutdown {
									channel_id: ChannelId::new_zero(),
									scriptpubkey: bitcoin::ScriptBuf::new(),
								},
							};
							cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_event_1);

							let node_id_0 =
								peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
							let msg_event_0 = MessageSendEvent::SendShutdown {
								node_id: node_id_0,
								msg: msgs::Shutdown {
									channel_id: ChannelId::new_zero(),
									scriptpubkey: bitcoin::ScriptBuf::new(),
								},
							};
							cfgs[1].chan_handler.pending_events.lock().unwrap().push(msg_event_0);

							if ctr % 2 == 0 {
								peers[0].timer_tick_occurred();
								peers[1].timer_tick_occurred();
							}
						}

						// Tear down both ends before the next reconnect cycle.
						peers[0].socket_disconnected(&fd_a);
						peers[1].socket_disconnected(&fd_b);
						ctr += 1;
						std::thread::sleep(std::time::Duration::from_micros(1));
					}
				})
			}};
		}
		let thrd_a = spawn_thread!(1);
		let thrd_b = spawn_thread!(2);

		thrd_a.join().unwrap();
		thrd_b.join().unwrap();
	}
4101
4102	#[test]
4103	fn test_feature_incompatible_peers() {
4104		let cfgs = create_peermgr_cfgs(2);
4105		let incompatible_cfgs = create_feature_incompatible_peermgr_cfgs(2);
4106
4107		let peers = create_network(2, &cfgs);
4108		let incompatible_peers = create_network(2, &incompatible_cfgs);
4109		let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])];
4110		for (peer_a, peer_b) in peer_pairs.iter() {
4111			let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
4112			let mut fd_a = FileDescriptor::new(1);
4113			let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
4114			let mut fd_b = FileDescriptor::new(1);
4115			let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };
4116			let initial_data =
4117				peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
4118			peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
4119			peer_a.read_event(&mut fd_a, &initial_data).unwrap();
4120			peer_a.process_events();
4121
4122			let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
4123			peer_b.read_event(&mut fd_b, &a_data).unwrap();
4124
4125			peer_b.process_events();
4126			let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
4127
4128			// Should fail because of unknown required features
4129			assert!(peer_a.read_event(&mut fd_a, &b_data).is_err());
4130		}
4131	}
4132
4133	#[test]
4134	fn test_chain_incompatible_peers() {
4135		let cfgs = create_peermgr_cfgs(2);
4136		let incompatible_cfgs = create_chain_incompatible_peermgr_cfgs(2);
4137
4138		let peers = create_network(2, &cfgs);
4139		let incompatible_peers = create_network(2, &incompatible_cfgs);
4140		let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])];
4141		for (peer_a, peer_b) in peer_pairs.iter() {
4142			let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
4143			let mut fd_a = FileDescriptor::new(1);
4144			let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
4145			let mut fd_b = FileDescriptor::new(1);
4146			let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };
4147			let initial_data =
4148				peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
4149			peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
4150			peer_a.read_event(&mut fd_a, &initial_data).unwrap();
4151			peer_a.process_events();
4152
4153			let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
4154			peer_b.read_event(&mut fd_b, &a_data).unwrap();
4155
4156			peer_b.process_events();
4157			let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
4158
4159			// Should fail because of incompatible chains
4160			assert!(peer_a.read_event(&mut fd_a, &b_data).is_err());
4161		}
4162	}
4163
4164	#[test]
4165	fn test_disconnect_peer() {
4166		// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
4167		// push a DisconnectPeer event to remove the node flagged by id
4168		let cfgs = create_peermgr_cfgs(2);
4169		let peers = create_network(2, &cfgs);
4170		establish_connection(&peers[0], &peers[1]);
4171		{
4172			let peers_len = peers[0].peers.read().unwrap().len();
4173			assert_eq!(peers_len, 1);
4174		}
4175
4176		let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap();
4177		cfgs[0].chan_handler.pending_events.lock().unwrap().push(MessageSendEvent::HandleError {
4178			node_id: their_id,
4179			action: msgs::ErrorAction::DisconnectPeer { msg: None },
4180		});
4181
4182		peers[0].process_events();
4183		{
4184			let peers_len = peers[0].peers.read().unwrap().len();
4185			assert_eq!(peers_len, 0);
4186		}
4187	}
4188
4189	#[test]
4190	fn test_send_simple_msg() {
4191		// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
4192		// push a message from one peer to another.
4193		let cfgs = create_peermgr_cfgs(2);
4194		let a_chan_handler = test_utils::TestChannelMessageHandler::new(
4195			ChainHash::using_genesis_block(Network::Testnet),
4196		);
4197		let b_chan_handler = test_utils::TestChannelMessageHandler::new(
4198			ChainHash::using_genesis_block(Network::Testnet),
4199		);
4200		let mut peers = create_network(2, &cfgs);
4201		let (fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
4202		{
4203			let peers_len = peers[0].peers.read().unwrap().len();
4204			assert_eq!(peers_len, 1);
4205		}
4206
4207		let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap();
4208
4209		let msg = msgs::Shutdown {
4210			channel_id: ChannelId::from_bytes([42; 32]),
4211			scriptpubkey: bitcoin::ScriptBuf::new(),
4212		};
4213		a_chan_handler
4214			.pending_events
4215			.lock()
4216			.unwrap()
4217			.push(MessageSendEvent::SendShutdown { node_id: their_id, msg: msg.clone() });
4218		peers[0].message_handler.chan_handler = &a_chan_handler;
4219
4220		b_chan_handler.expect_receive_msg(wire::Message::Shutdown(msg));
4221		peers[1].message_handler.chan_handler = &b_chan_handler;
4222
4223		peers[0].process_events();
4224
4225		let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
4226		peers[1].read_event(&mut fd_b, &a_data).unwrap();
4227	}
4228
	#[test]
	fn test_non_init_first_msg() {
		// Simple test of the first message received over a connection being something other than
		// Init. This results in an immediate disconnection, which previously included a spurious
		// peer_disconnected event handed to event handlers (which would panic in
		// `TestChannelMessageHandler` here).
		let cfgs = create_peermgr_cfgs(2);
		let peers = create_network(2, &cfgs);

		let mut fd_dup = FileDescriptor::new(3);
		let addr_dup = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1003 };
		let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap();
		peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap();

		// Drive the noise handshake by hand so we control the first
		// post-handshake message rather than letting a PeerManager send Init.
		let mut dup_encryptor =
			PeerChannelEncryptor::new_outbound(id_a, SecretKey::from_slice(&[42; 32]).unwrap());
		let initial_data = dup_encryptor.get_act_one(&peers[1].secp_ctx);
		peers[0].read_event(&mut fd_dup, &initial_data).unwrap();
		peers[0].process_events();

		// Feed peer 0's act-two response into our manual encryptor to finish
		// the handshake with act three.
		let a_data = fd_dup.outbound_data.lock().unwrap().split_off(0);
		let (act_three, _) =
			dup_encryptor.process_act_two(&a_data[..], &&cfgs[1].node_signer).unwrap();
		peers[0].read_event(&mut fd_dup, &act_three).unwrap();

		// Send a Ping where Init is expected; the read must fail, disconnecting.
		let not_init_msg = msgs::Ping { ponglen: 4, byteslen: 0 };
		let msg_bytes = dup_encryptor.encrypt_message(&not_init_msg);
		assert!(peers[0].read_event(&mut fd_dup, &msg_bytes).is_err());
	}
4258
4259	#[test]
4260	fn test_disconnect_all_peer() {
4261		// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
4262		// then calls disconnect_all_peers
4263		let cfgs = create_peermgr_cfgs(2);
4264		let peers = create_network(2, &cfgs);
4265		establish_connection(&peers[0], &peers[1]);
4266		{
4267			let peers_len = peers[0].peers.read().unwrap().len();
4268			assert_eq!(peers_len, 1);
4269		}
4270
4271		peers[0].disconnect_all_peers();
4272		{
4273			let peers_len = peers[0].peers.read().unwrap().len();
4274			assert_eq!(peers_len, 0);
4275		}
4276	}
4277
4278	#[test]
4279	fn test_timer_tick_occurred() {
4280		// Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
4281		let cfgs = create_peermgr_cfgs(2);
4282		let peers = create_network(2, &cfgs);
4283		establish_connection(&peers[0], &peers[1]);
4284		{
4285			let peers_len = peers[0].peers.read().unwrap().len();
4286			assert_eq!(peers_len, 1);
4287		}
4288
4289		// peers[0] awaiting_pong is set to true, but the Peer is still connected
4290		peers[0].timer_tick_occurred();
4291		peers[0].process_events();
4292		{
4293			let peers_len = peers[0].peers.read().unwrap().len();
4294			assert_eq!(peers_len, 1);
4295		}
4296
4297		// Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected
4298		peers[0].timer_tick_occurred();
4299		peers[0].process_events();
4300		{
4301			let peers_len = peers[0].peers.read().unwrap().len();
4302			assert_eq!(peers_len, 0);
4303		}
4304	}
4305
	fn do_test_peer_connected_error_disconnects(handler: usize) {
		// Test that if a message handler fails a connection in `peer_connected` we reliably
		// produce `peer_disconnected` events for all other message handlers (that saw a
		// corresponding `peer_connected`).
		//
		// `handler` encodes two choices: the low bit selects which peer's handlers
		// fail (0 => peers[0], 1 => peers[1]); the remaining bits select which of
		// the four message handlers is made to fail the connection.
		let cfgs = create_peermgr_cfgs(2);
		let peers = create_network(2, &cfgs);

		let chan_handler = peers[handler & 1].message_handler.chan_handler;
		let route_handler = peers[handler & 1].message_handler.route_handler;
		let custom_message_handler = peers[handler & 1].message_handler.custom_message_handler;
		let send_only_msg_handler = peers[handler & 1].message_handler.send_only_message_handler;

		// Flip the fail flag on exactly one handler, chosen by the high bits.
		match handler & !1 {
			0 => {
				chan_handler.conn_tracker.fail_connections.store(true, Ordering::Release);
			},
			2 => {
				route_handler.conn_tracker.fail_connections.store(true, Ordering::Release);
			},
			4 => {
				custom_message_handler.conn_tracker.fail_connections.store(true, Ordering::Release);
			},
			6 => {
				send_only_msg_handler.0.fail_connections.store(true, Ordering::Release);
			},
			_ => panic!(),
		}
		// Whichever side's handler failed should have refused the connection and
		// ended up with no peers.
		let (_sd1, _sd2, a_refused, b_refused) = try_establish_connection(&peers[0], &peers[1]);
		if handler & 1 == 0 {
			assert!(a_refused.is_err());
			assert!(peers[0].list_peers().is_empty());
		} else {
			assert!(b_refused.is_err());
			assert!(peers[1].list_peers().is_empty());
		}
		// At least one message handler should have seen the connection.
		assert!(
			chan_handler.conn_tracker.had_peers.load(Ordering::Acquire)
				|| route_handler.conn_tracker.had_peers.load(Ordering::Acquire)
				|| custom_message_handler.conn_tracker.had_peers.load(Ordering::Acquire)
				|| send_only_msg_handler.0.had_peers.load(Ordering::Acquire)
		);
		// And both message handlers doing tracking should see the disconnection
		assert!(chan_handler.conn_tracker.connected_peers.lock().unwrap().is_empty());
		assert!(route_handler.conn_tracker.connected_peers.lock().unwrap().is_empty());
		assert!(custom_message_handler.conn_tracker.connected_peers.lock().unwrap().is_empty());
		assert!(send_only_msg_handler.0.connected_peers.lock().unwrap().is_empty());
	}
4354
4355	#[test]
4356	fn test_peer_connected_error_disconnects() {
4357		for i in 0..8 {
4358			do_test_peer_connected_error_disconnects(i);
4359		}
4360	}
4361
	#[test]
	fn test_do_attempt_write_data() {
		// Create 2 peers with custom TestRoutingMessageHandlers and connect them.
		let cfgs = create_peermgr_cfgs(2);
		cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
		cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
		cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
		cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
		let peers = create_network(2, &cfgs);

		// By calling establish_connect, we trigger do_attempt_write_data between
		// the peers. Previously this function would mistakenly enter an infinite loop
		// when there were more channel messages available than could fit into a peer's
		// buffer. This issue would now be detected by this test (because we use custom
		// RoutingMessageHandlers that intentionally return more channel messages
		// than can fit into a peer's buffer).
		let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);

		// Make each peer to read the messages that the other peer just wrote to them. Note that
		// due to the max-message-before-ping limits this may take a few iterations to complete.
		// Note the bound parses as (150 / BUFFER_DRAIN_MSGS_PER_TICK) + 1.
		for _ in 0..150 / super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
			peers[1].process_events();
			let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
			assert!(!a_read_data.is_empty());

			peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
			peers[0].process_events();

			let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
			assert!(!b_read_data.is_empty());
			peers[1].read_event(&mut fd_b, &b_read_data).unwrap();

			// A second process_events without any intervening read must not
			// produce further output — gossip backfill is flow-controlled.
			peers[0].process_events();
			assert_eq!(
				fd_a.outbound_data.lock().unwrap().len(),
				0,
				"Until A receives data, it shouldn't send more messages"
			);
		}

		// Check that each peer has received the expected number of channel updates and channel
		// announcements.
		// NOTE(review): the exact totals (108 updates / 54 announcements) come from the canned
		// gossip TestRoutingMessageHandler generates — confirm against that helper if changed.
		assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
		assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
		assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
		assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
	}
4409
	#[test]
	fn test_forward_while_syncing() {
		use crate::ln::peer_handler::tests::test_utils::get_dummy_channel_update;

		// Test forwarding new channel announcements while we're doing syncing.
		let cfgs = create_peermgr_cfgs(2);
		cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
		cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
		cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
		cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
		let peers = create_network(2, &cfgs);

		let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);

		// Iterate a handful of times to exchange some messages
		for _ in 0..150 {
			peers[1].process_events();
			let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
			assert!(!a_read_data.is_empty());

			peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
			peers[0].process_events();

			let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
			assert!(!b_read_data.is_empty());
			peers[1].read_event(&mut fd_b, &b_read_data).unwrap();

			peers[0].process_events();
			assert_eq!(
				fd_a.outbound_data.lock().unwrap().len(),
				0,
				"Until A receives data, it shouldn't send more messages"
			);
		}

		// Forward one more gossip backfill message but don't flush it so that we can examine the
		// unencrypted message for broadcasts.
		fd_b.hang_writes.store(true, Ordering::Relaxed);
		peers[1].process_events();

		// With writes hung, exactly one backfill message sits in the outbound
		// buffer and nothing is queued for broadcast yet.
		{
			let peer_lock = peers[1].peers.read().unwrap();
			let peer = peer_lock.get(&fd_b).unwrap().lock().unwrap();
			assert_eq!(peer.pending_outbound_buffer.len(), 1);
			assert_eq!(peer.gossip_broadcast_buffer.len(), 0);
		}

		// At this point we should have sent channel announcements up to roughly SCID 150. Now
		// build an updated update for SCID 100 and SCID 5000 and make sure only the one for SCID
		// 100 gets forwarded
		let msg_100 = get_dummy_channel_update(100);
		let msg_ev_100 = MessageSendEvent::BroadcastChannelUpdate { msg: msg_100.clone() };

		let msg_5000 = get_dummy_channel_update(5000);
		let msg_ev_5000 = MessageSendEvent::BroadcastChannelUpdate { msg: msg_5000 };

		fd_a.hang_writes.store(true, Ordering::Relaxed);

		cfgs[1].routing_handler.pending_events.lock().unwrap().push(msg_ev_100);
		cfgs[1].routing_handler.pending_events.lock().unwrap().push(msg_ev_5000);
		peers[1].process_events();

		// Only the SCID-100 update (behind our sync progress) should have been
		// queued for broadcast; the SCID-5000 one (ahead of it) is skipped.
		{
			let peer_lock = peers[1].peers.read().unwrap();
			let peer = peer_lock.get(&fd_b).unwrap().lock().unwrap();
			assert_eq!(peer.pending_outbound_buffer.len(), 1);
			assert_eq!(peer.gossip_broadcast_buffer.len(), 1);

			let pending_msg = &peer.gossip_broadcast_buffer[0];
			let expected = encode_msg!(&msg_100);
			assert_eq!(expected, pending_msg.fetch_encoded_msg_with_type_pfx());
		}
	}
4483
	#[test]
	fn test_handshake_timeout() {
		// Tests that we time out a peer still waiting on handshake completion after a full timer
		// tick.
		let cfgs = create_peermgr_cfgs(2);
		cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
		cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
		let peers = create_network(2, &cfgs);

		let a_id = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
		let mut fd_a = FileDescriptor::new(1);
		let mut fd_b = FileDescriptor::new(1);
		let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
		peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();

		// If we get a single timer tick before completion, that's fine
		{
			let peers_len = peers[0].peers.read().unwrap().len();
			assert_eq!(peers_len, 1);
		}
		peers[0].timer_tick_occurred();
		{
			let peers_len = peers[0].peers.read().unwrap().len();
			assert_eq!(peers_len, 1);
		}

		// Advance the handshake partway: A reads B's first flight and responds,
		// B consumes the response, but A never sees B's final flight in time.
		peers[0].read_event(&mut fd_a, &initial_data).unwrap();
		peers[0].process_events();
		let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
		peers[1].read_event(&mut fd_b, &a_data).unwrap();
		peers[1].process_events();

		// ...but if we get a second timer tick, we should disconnect the peer
		peers[0].timer_tick_occurred();
		{
			let peers_len = peers[0].peers.read().unwrap().len();
			assert_eq!(peers_len, 0);
		}

		// Data arriving for the now-dropped descriptor must be rejected.
		let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
		assert!(peers[0].read_event(&mut fd_a, &b_data).is_err());
	}
4526
	#[test]
	fn test_inbound_conn_handshake_complete_awaiting_pong() {
		// Test that we do not disconnect an outbound peer after the noise handshake completes due
		// to a pong timeout for a ping that was never sent if a timer tick fires after we send act
		// two of the noise handshake along with our init message but before we receive their init
		// message.
		let logger = test_utils::TestLogger::new();
		let node_signer_a =
			test_utils::TestNodeSigner::new(SecretKey::from_slice(&[42; 32]).unwrap());
		let node_signer_b =
			test_utils::TestNodeSigner::new(SecretKey::from_slice(&[43; 32]).unwrap());
		// Both sides use no-op/erroring handlers: only the PeerManager's own handshake and
		// ping/pong timeout state machine is under test here.
		let message_handler_a = MessageHandler {
			chan_handler: ErroringMessageHandler::new(),
			route_handler: IgnoringMessageHandler {},
			onion_message_handler: IgnoringMessageHandler {},
			custom_message_handler: IgnoringMessageHandler {},
			send_only_message_handler: IgnoringMessageHandler {},
		};
		let message_handler_b = MessageHandler {
			chan_handler: ErroringMessageHandler::new(),
			route_handler: IgnoringMessageHandler {},
			onion_message_handler: IgnoringMessageHandler {},
			custom_message_handler: IgnoringMessageHandler {},
			send_only_message_handler: IgnoringMessageHandler {},
		};
		let peer_a = PeerManager::new(message_handler_a, 0, &[0; 32], &logger, &node_signer_a);
		let peer_b = PeerManager::new(message_handler_b, 0, &[1; 32], &logger, &node_signer_b);

		let a_id = node_signer_a.get_node_id(Recipient::Node).unwrap();
		let mut fd_a = FileDescriptor::new(1);
		let mut fd_b = FileDescriptor::new(1);

		// Exchange messages with both peers until they both complete the init handshake.
		let act_one = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
		peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();

		// Deliver act one to A; A responds with act two.
		peer_a.read_event(&mut fd_a, &act_one).unwrap();
		peer_a.process_events();

		let act_two = fd_a.outbound_data.lock().unwrap().split_off(0);
		peer_b.read_event(&mut fd_b, &act_two).unwrap();
		peer_b.process_events();

		// Calling this here triggers the race on inbound connections.
		peer_b.timer_tick_occurred();

		// B flushes act three together with its init message in a single write (per the variable
		// name; A will only see the init after reading act three below).
		let act_three_with_init_b = fd_b.outbound_data.lock().unwrap().split_off(0);
		{
			// A has not yet read act three, so its handshake must still be incomplete.
			let peer_a_lock = peer_a.peers.read().unwrap();
			let handshake_complete =
				peer_a_lock.get(&fd_a).unwrap().lock().unwrap().handshake_complete();
			assert!(!handshake_complete);
		}

		peer_a.read_event(&mut fd_a, &act_three_with_init_b).unwrap();
		peer_a.process_events();

		{
			// After reading act three + B's init, A's handshake is complete.
			let peer_a_lock = peer_a.peers.read().unwrap();
			let handshake_complete =
				peer_a_lock.get(&fd_a).unwrap().lock().unwrap().handshake_complete();
			assert!(handshake_complete);
		}

		// A should now have queued its own init message for B.
		let init_a = fd_a.outbound_data.lock().unwrap().split_off(0);
		assert!(!init_a.is_empty());

		{
			// B is still waiting on A's init, so B's handshake is not yet complete.
			let peer_b_lock = peer_b.peers.read().unwrap();
			let handshake_complete =
				peer_b_lock.get(&fd_b).unwrap().lock().unwrap().handshake_complete();
			assert!(!handshake_complete);
		}

		peer_b.read_event(&mut fd_b, &init_a).unwrap();
		peer_b.process_events();

		{
			let peer_b_lock = peer_b.peers.read().unwrap();
			let handshake_complete =
				peer_b_lock.get(&fd_b).unwrap().lock().unwrap().handshake_complete();
			assert!(handshake_complete);
		}

		// Make sure we're still connected.
		{
			let peers_len = peer_b.peers.read().unwrap().len();
			assert_eq!(peers_len, 1);
		}

		// B should send a ping on the first timer tick after `handshake_complete`.
		assert!(fd_b.outbound_data.lock().unwrap().split_off(0).is_empty());
		peer_b.timer_tick_occurred();
		peer_b.process_events();
		assert!(!fd_b.outbound_data.lock().unwrap().split_off(0).is_empty());

		// Have A send B a warning message. NOTE: inside the inner scope, `peer_b` shadows the
		// outer `PeerManager` — there it is the `Peer` entry in A's map for the connection to B
		// (keyed by fd_a); the later `peer_b.read_event(...)` refers to the outer PeerManager.
		let mut send_warning = || {
			{
				let peers = peer_a.peers.read().unwrap();
				let mut peer_b = peers.get(&fd_a).unwrap().lock().unwrap();
				peer_a.enqueue_message(
					&mut peer_b,
					&msgs::WarningMessage {
						channel_id: ChannelId([0; 32]),
						data: "no disconnect plz".to_string(),
					},
				);
			}
			peer_a.process_events();
			let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
			assert!(!msg.is_empty());
			peer_b.read_event(&mut fd_b, &msg).unwrap();
			peer_b.process_events();
		};

		// Fire more ticks until we reach the pong timeout. We send any message except pong to
		// pretend the connection is still alive.
		send_warning();
		for _ in 0..MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER {
			peer_b.timer_tick_occurred();
			send_warning();
		}
		{
			// Still connected: traffic (even non-pong) kept the connection alive so far.
			let peers_len = peer_b.peers.read().unwrap().len();
			assert_eq!(peers_len, 1);
		}

		// One more tick should enforce the pong timeout.
		peer_b.timer_tick_occurred();
		{
			let peers_len = peer_b.peers.read().unwrap().len();
			assert_eq!(peers_len, 0);
		}
	}
4661
	#[test]
	fn test_gossip_flood_pause() {
		use crate::routing::test_utils::channel_announcement;
		use lightning_types::features::ChannelFeatures;

		// Simple test which connects two nodes to a PeerManager and checks that if we run out of
		// socket buffer space we'll stop forwarding gossip but still push our own gossip.
		let cfgs = create_peermgr_cfgs(2);
		let peers = create_network(2, &cfgs);
		let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);

		// Shuttle any queued bytes back and forth between the two peers until neither side has
		// anything left to send.
		macro_rules! drain_queues {
			() => {
				loop {
					peers[0].process_events();
					peers[1].process_events();

					let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
					if !msg.is_empty() {
						peers[1].read_event(&mut fd_b, &msg).unwrap();
						continue;
					}
					let msg = fd_b.outbound_data.lock().unwrap().split_off(0);
					if !msg.is_empty() {
						peers[0].read_event(&mut fd_a, &msg).unwrap();
						continue;
					}
					break;
				}
			};
		}

		// First, make sure all pending messages have been processed and queues drained.
		drain_queues!();

		let secp_ctx = Secp256k1::new();
		let key = SecretKey::from_slice(&[1; 32]).unwrap();
		let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx);
		// The message buffer size is the message length plus two 16-byte MACs plus a 2-byte length
		// and 2-byte type.
		let encoded_size = msg.serialized_length() + 16 * 2 + 2 + 2;
		let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None };

		// Simulate a full socket buffer on the A side so nothing can actually be flushed out.
		fd_a.hang_writes.store(true, Ordering::Relaxed);

		// Now push an arbitrarily large number of messages and check that only
		// `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP` message bytes end up in the queue.
		for _ in 0..OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size {
			cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone());
			peers[0].process_events();
		}

		{
			let peer_a_lock = peers[0].peers.read().unwrap();
			let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap();
			// Sum the allocated capacity of both outbound queues (regular + gossip broadcast).
			let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>()
				+ peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>();
			assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP - encoded_size);
			assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP);
		}

		// Check that if a broadcast message comes in from the channel handler (i.e. it is an
		// announcement for our own channel), it gets queued anyway.
		cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_ev);
		peers[0].process_events();

		{
			// The limit may now be exceeded by (at most) one extra message's worth of bytes.
			let peer_a_lock = peers[0].peers.read().unwrap();
			let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap();
			let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>()
				+ peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>();
			assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP);
			assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + encoded_size);
		}

		// Finally, deliver all the messages and make sure we got the right count. Note that there
		// was an extra message that had already moved from the broadcast queue to the encrypted
		// message queue so we actually receive `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + 2`
		// message bytes.
		fd_a.hang_writes.store(false, Ordering::Relaxed);
		cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed);
		peers[0].write_buffer_space_avail(&mut fd_a).unwrap();

		drain_queues!();
		{
			let peer_a_lock = peers[0].peers.read().unwrap();
			let empty =
				peer_a_lock.get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty();
			assert!(empty);
		}

		// B received everything that fit under the limit plus the one forced channel-handler
		// announcement.
		assert_eq!(
			cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed),
			OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size + 1
		);
	}
4758
4759	#[test]
4760	fn test_filter_addresses() {
4761		// Tests the filter_addresses function.
4762
4763		// For (10/8)
4764		let ip_address = SocketAddress::TcpIpV4 { addr: [10, 0, 0, 0], port: 1000 };
4765		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4766		let ip_address = SocketAddress::TcpIpV4 { addr: [10, 0, 255, 201], port: 1000 };
4767		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4768		let ip_address = SocketAddress::TcpIpV4 { addr: [10, 255, 255, 255], port: 1000 };
4769		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4770
4771		// For (0/8)
4772		let ip_address = SocketAddress::TcpIpV4 { addr: [0, 0, 0, 0], port: 1000 };
4773		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4774		let ip_address = SocketAddress::TcpIpV4 { addr: [0, 0, 255, 187], port: 1000 };
4775		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4776		let ip_address = SocketAddress::TcpIpV4 { addr: [0, 255, 255, 255], port: 1000 };
4777		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4778
4779		// For (100.64/10)
4780		let ip_address = SocketAddress::TcpIpV4 { addr: [100, 64, 0, 0], port: 1000 };
4781		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4782		let ip_address = SocketAddress::TcpIpV4 { addr: [100, 78, 255, 0], port: 1000 };
4783		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4784		let ip_address = SocketAddress::TcpIpV4 { addr: [100, 127, 255, 255], port: 1000 };
4785		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4786
4787		// For (127/8)
4788		let ip_address = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 0], port: 1000 };
4789		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4790		let ip_address = SocketAddress::TcpIpV4 { addr: [127, 65, 73, 0], port: 1000 };
4791		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4792		let ip_address = SocketAddress::TcpIpV4 { addr: [127, 255, 255, 255], port: 1000 };
4793		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4794
4795		// For (169.254/16)
4796		let ip_address = SocketAddress::TcpIpV4 { addr: [169, 254, 0, 0], port: 1000 };
4797		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4798		let ip_address = SocketAddress::TcpIpV4 { addr: [169, 254, 221, 101], port: 1000 };
4799		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4800		let ip_address = SocketAddress::TcpIpV4 { addr: [169, 254, 255, 255], port: 1000 };
4801		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4802
4803		// For (172.16/12)
4804		let ip_address = SocketAddress::TcpIpV4 { addr: [172, 16, 0, 0], port: 1000 };
4805		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4806		let ip_address = SocketAddress::TcpIpV4 { addr: [172, 27, 101, 23], port: 1000 };
4807		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4808		let ip_address = SocketAddress::TcpIpV4 { addr: [172, 31, 255, 255], port: 1000 };
4809		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4810
4811		// For (192.168/16)
4812		let ip_address = SocketAddress::TcpIpV4 { addr: [192, 168, 0, 0], port: 1000 };
4813		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4814		let ip_address = SocketAddress::TcpIpV4 { addr: [192, 168, 205, 159], port: 1000 };
4815		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4816		let ip_address = SocketAddress::TcpIpV4 { addr: [192, 168, 255, 255], port: 1000 };
4817		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4818
4819		// For (192.88.99/24)
4820		let ip_address = SocketAddress::TcpIpV4 { addr: [192, 88, 99, 0], port: 1000 };
4821		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4822		let ip_address = SocketAddress::TcpIpV4 { addr: [192, 88, 99, 140], port: 1000 };
4823		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4824		let ip_address = SocketAddress::TcpIpV4 { addr: [192, 88, 99, 255], port: 1000 };
4825		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4826
4827		// For other IPv4 addresses
4828		let ip_address = SocketAddress::TcpIpV4 { addr: [188, 255, 99, 0], port: 1000 };
4829		assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4830		let ip_address = SocketAddress::TcpIpV4 { addr: [123, 8, 129, 14], port: 1000 };
4831		assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4832		let ip_address = SocketAddress::TcpIpV4 { addr: [2, 88, 9, 255], port: 1000 };
4833		assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4834
4835		// For (2000::/3)
4836		let ip_address = SocketAddress::TcpIpV6 {
4837			addr: [32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
4838			port: 1000,
4839		};
4840		assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4841		let ip_address = SocketAddress::TcpIpV6 {
4842			addr: [45, 34, 209, 190, 0, 123, 55, 34, 0, 0, 3, 27, 201, 0, 0, 0],
4843			port: 1000,
4844		};
4845		assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4846		let ip_address = SocketAddress::TcpIpV6 {
4847			addr: [63, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255],
4848			port: 1000,
4849		};
4850		assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
4851
4852		// For other IPv6 addresses
4853		let ip_address = SocketAddress::TcpIpV6 {
4854			addr: [24, 240, 12, 32, 0, 0, 0, 0, 20, 97, 0, 32, 121, 254, 0, 0],
4855			port: 1000,
4856		};
4857		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4858		let ip_address = SocketAddress::TcpIpV6 {
4859			addr: [68, 23, 56, 63, 0, 0, 2, 7, 75, 109, 0, 39, 0, 0, 0, 0],
4860			port: 1000,
4861		};
4862		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4863		let ip_address = SocketAddress::TcpIpV6 {
4864			addr: [101, 38, 140, 230, 100, 0, 30, 98, 0, 26, 0, 0, 57, 96, 0, 0],
4865			port: 1000,
4866		};
4867		assert_eq!(filter_addresses(Some(ip_address.clone())), None);
4868
4869		// For (None)
4870		assert_eq!(filter_addresses(None), None);
4871	}
4872
	#[test]
	#[cfg(feature = "std")]
	fn test_process_events_multithreaded() {
		use std::time::{Duration, Instant};
		// `process_events` shouldn't block on another thread processing events and instead should
		// simply signal the currently processing thread to go around the loop again.
		// Here we test that this happens by spawning a few threads and checking that we see one go
		// around again at least once.
		//
		// Each time `process_events` goes around the loop we call
		// `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`. Thus,
		// to test we simply write zero to the counter before calling `process_events` and make
		// sure we observe a value greater than one at least once.
		let cfg = Arc::new(create_peermgr_cfgs(1));
		// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
		let peer = Arc::new(
			create_network(1, unsafe { &*(&*cfg as *const _) as &'static _ }).pop().unwrap(),
		);

		let end_time = Instant::now() + Duration::from_millis(100);
		let observed_loop = Arc::new(AtomicBool::new(false));
		// `thread_fn` is a closure factory: each call clones the `Arc`s and returns a fresh
		// `move` closure suitable for `std::thread::spawn` (or for running on this thread).
		let thread_fn = || {
			let thread_peer = Arc::clone(&peer);
			let thread_observed_loop = Arc::clone(&observed_loop);
			move || {
				// Loop until the 100ms deadline has passed AND some thread has set the shared
				// flag; a thread that observes the extra iteration itself returns immediately.
				while Instant::now() < end_time || !thread_observed_loop.load(Ordering::Acquire) {
					test_utils::TestChannelMessageHandler::MESSAGE_FETCH_COUNTER
						.with(|val| val.store(0, Ordering::Relaxed));
					thread_peer.process_events();
					// A fetch count > 1 after a single `process_events` call means the event loop
					// went around again (i.e. it was signaled by a concurrent caller), which is
					// exactly the behavior under test.
					if test_utils::TestChannelMessageHandler::MESSAGE_FETCH_COUNTER
						.with(|val| val.load(Ordering::Relaxed))
						> 1
					{
						thread_observed_loop.store(true, Ordering::Release);
						return;
					}
					std::thread::sleep(Duration::from_micros(1));
				}
			}
		};

		// Three spawned threads plus the test thread itself all race on `process_events`.
		let thread_a = std::thread::spawn(thread_fn());
		let thread_b = std::thread::spawn(thread_fn());
		let thread_c = std::thread::spawn(thread_fn());
		thread_fn()();
		thread_a.join().unwrap();
		thread_b.join().unwrap();
		thread_c.join().unwrap();
		assert!(observed_loop.load(Ordering::Acquire));
	}
4923}