lightning/routing/utxo.rs

// This file is Copyright its original authors, visible in version control
// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.

//! This module contains the traits which LDK uses to access UTXOs in order to check that
//! gossip data is correct.
//!
//! When lightning nodes gossip channel information, they resist DoS attacks by checking that each
//! channel matches a UTXO on-chain, requiring at least some marginal on-chain transacting in
//! order to announce a channel. This module handles that checking.
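//!
//! As a rough sketch only (not a compiled example), an implementation of [`UtxoLookup`] is
//! plugged into gossip validation by handing it to [`P2PGossipSync`]. The `MyUtxoSource` type
//! and `logger` below are hypothetical stand-ins for whatever chain backend and [`Logger`] you
//! use:
//!
//! ```ignore
//! // `MyUtxoSource` is assumed to implement `UtxoLookup`, e.g. by querying a local bitcoind
//! // or Electrum server for the funding output of each announced channel.
//! let utxo_source = MyUtxoSource::new();
//! let network_graph = NetworkGraph::new(bitcoin::Network::Bitcoin, &logger);
//! // Gossip handled by `gossip_sync` will now have its funding output checked against
//! // `utxo_source` before the channel is added to `network_graph`.
//! let gossip_sync = P2PGossipSync::new(&network_graph, Some(&utxo_source), &logger);
//! ```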

use bitcoin::TxOut;
use bitcoin::amount::Amount;
use bitcoin::constants::ChainHash;

use bitcoin::hex::DisplayHex;

use crate::events::MessageSendEvent;
use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
use crate::ln::msgs::{self, LightningError, ErrorAction};
use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
use crate::util::logger::{Level, Logger};

use crate::prelude::*;

use alloc::sync::{Arc, Weak};
use crate::sync::{Mutex, LockTestExt};
use core::ops::Deref;

/// An error when accessing the chain via [`UtxoLookup`].
#[derive(Clone, Debug)]
pub enum UtxoLookupError {
	/// The requested chain is unknown.
	UnknownChain,

	/// The requested transaction doesn't exist or hasn't confirmed.
	UnknownTx,
}

/// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously,
/// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async`
/// variant.
#[derive(Clone)]
pub enum UtxoResult {
	/// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
	/// requested or a [`UtxoLookupError`].
	Sync(Result<TxOut, UtxoLookupError>),
	/// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of
	/// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes.
	///
	/// Note that in order to avoid runaway memory usage, the number of parallel checks is limited,
	/// but only fairly loosely. Because pending checks block all message processing, leaving
	/// checks pending for an extended time may cause DoS of other functions. It is recommended you
	/// keep a tight timeout on lookups, on the order of a few seconds.
	Async(UtxoFuture),
}

/// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs.
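///
/// For example, a minimal synchronous implementation might decode the short channel ID into the
/// funding output's block height, transaction index and output index (per BOLT 7) and look that
/// output up in a local chain index. This is only an illustrative sketch; `MyChainIndex` and its
/// `get_output` method are hypothetical stand-ins for your chain backend:
///
/// ```ignore
/// struct MyChainLookup {
///     expected_chain_hash: ChainHash,
///     chain_index: MyChainIndex,
/// }
///
/// impl UtxoLookup for MyChainLookup {
///     fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult {
///         if *chain_hash != self.expected_chain_hash {
///             return UtxoResult::Sync(Err(UtxoLookupError::UnknownChain));
///         }
///         // Per BOLT 7, a short_channel_id packs the funding output's on-chain location:
///         // the upper three bytes are the block height, the next three the transaction
///         // index within that block, and the lower two the output index.
///         let block_height = (short_channel_id >> 40) as u32;
///         let tx_index = ((short_channel_id >> 16) & 0xff_ffff) as u32;
///         let output_index = (short_channel_id & 0xffff) as u16;
///         match self.chain_index.get_output(block_height, tx_index, output_index) {
///             Some(txout) => UtxoResult::Sync(Ok(txout)),
///             None => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)),
///         }
///     }
/// }
/// ```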
pub trait UtxoLookup {
	/// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
	/// Returns an error if `chain_hash` is for a different chain or if such a transaction output is
	/// unknown.
	///
	/// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
	fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult;
}

enum ChannelAnnouncement {
	Full(msgs::ChannelAnnouncement),
	Unsigned(msgs::UnsignedChannelAnnouncement),
}
impl ChannelAnnouncement {
	fn node_id_1(&self) -> &NodeId {
		match self {
			ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
			ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
		}
	}
}

enum NodeAnnouncement {
	Full(msgs::NodeAnnouncement),
	Unsigned(msgs::UnsignedNodeAnnouncement),
}
impl NodeAnnouncement {
	fn timestamp(&self) -> u32 {
		match self {
			NodeAnnouncement::Full(msg) => msg.contents.timestamp,
			NodeAnnouncement::Unsigned(msg) => msg.timestamp,
		}
	}
}

enum ChannelUpdate {
	Full(msgs::ChannelUpdate),
	Unsigned(msgs::UnsignedChannelUpdate),
}
impl ChannelUpdate {
	fn timestamp(&self) -> u32 {
		match self {
			ChannelUpdate::Full(msg) => msg.contents.timestamp,
			ChannelUpdate::Unsigned(msg) => msg.timestamp,
		}
	}
}

struct UtxoMessages {
	complete: Option<Result<TxOut, UtxoLookupError>>,
	channel_announce: Option<ChannelAnnouncement>,
	latest_node_announce_a: Option<NodeAnnouncement>,
	latest_node_announce_b: Option<NodeAnnouncement>,
	latest_channel_update_a: Option<ChannelUpdate>,
	latest_channel_update_b: Option<ChannelUpdate>,
}

/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
///
/// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info.
#[derive(Clone)]
pub struct UtxoFuture {
	state: Arc<Mutex<UtxoMessages>>,
}

/// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph
/// once we have a concrete resolution of a request.
pub(crate) struct UtxoResolver(Result<TxOut, UtxoLookupError>);
impl UtxoLookup for UtxoResolver {
	fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult {
		UtxoResult::Sync(self.0.clone())
	}
}

impl UtxoFuture {
	/// Builds a new future for later resolution.
	pub fn new() -> Self {
		Self { state: Arc::new(Mutex::new(UtxoMessages {
			complete: None,
			channel_announce: None,
			latest_node_announce_a: None,
			latest_node_announce_b: None,
			latest_channel_update_a: None,
			latest_channel_update_b: None,
		}))}
	}

	/// Resolves this future against the given `graph` and with the given `result`.
	///
	/// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling
	/// forwarding the validated gossip message onwards to peers.
	///
	/// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
	/// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
	/// after this.
	///
	/// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
	/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
	pub fn resolve_without_forwarding<L: Deref>(&self,
		graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
	where L::Target: Logger {
		self.do_resolve(graph, result);
	}

	/// Resolves this future against the given `graph` and with the given `result`.
	///
	/// The given `gossip` is used to broadcast any validated messages onwards to all peers which
	/// have available buffer space.
	///
	/// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
	/// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
	/// after this.
	///
	/// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
	/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
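	///
	/// As a rough sketch only: an asynchronous [`UtxoLookup`] might hand out a future from
	/// [`UtxoLookup::get_utxo`] and resolve it from a background task. Here `spawn`,
	/// `lookup_utxo_via_backend`, and the `self.network_graph`/`self.gossip_sync`/
	/// `self.peer_manager` fields are hypothetical stand-ins for your async runtime, chain
	/// backend, and LDK handles:
	///
	/// ```ignore
	/// fn get_utxo(&self, _chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult {
	///     let future = UtxoFuture::new();
	///     let future_clone = future.clone();
	///     let graph = Arc::clone(&self.network_graph);
	///     let gossip = Arc::clone(&self.gossip_sync);
	///     let peer_manager = Arc::clone(&self.peer_manager);
	///     // Kick off the lookup in the background, keeping it on a tight timeout (a few
	///     // seconds) so that gossip processing isn't stalled for long.
	///     spawn(async move {
	///         let res = lookup_utxo_via_backend(short_channel_id).await;
	///         future_clone.resolve(&graph, gossip, res);
	///         // Wake the `PeerManager` so that messages held pending this check are processed
	///         // and the now-validated gossip is forwarded to our peers.
	///         peer_manager.process_events();
	///     });
	///     UtxoResult::Async(future)
	/// }
	/// ```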
	pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
		graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
	) where L::Target: Logger, U::Target: UtxoLookup {
		let mut res = self.do_resolve(graph, result);
		for msg_opt in res.iter_mut() {
			if let Some(msg) = msg_opt.take() {
				gossip.forward_gossip_msg(msg);
			}
		}
	}

	fn do_resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
	-> [Option<MessageSendEvent>; 5] where L::Target: Logger {
		let (announcement, node_a, node_b, update_a, update_b) = {
			let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
			let mut async_messages = self.state.lock().unwrap();

			if async_messages.channel_announce.is_none() {
				// We raced returning to `check_channel_announcement` which hasn't updated
				// `channel_announce` yet. That's okay, we can set the `complete` field which it will
				// check once it gets control again.
				async_messages.complete = Some(result);
				return [None, None, None, None, None];
			}

			let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
				ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
				ChannelAnnouncement::Unsigned(msg) => &msg,
			};

			pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));

			(async_messages.channel_announce.take().unwrap(),
				async_messages.latest_node_announce_a.take(),
				async_messages.latest_node_announce_b.take(),
				async_messages.latest_channel_update_a.take(),
				async_messages.latest_channel_update_b.take())
		};

		let mut res = [None, None, None, None, None];
		let mut res_idx = 0;

		// Now that we've updated our internal state, pass the pending messages back through the
		// network graph with a different `UtxoLookup` which will resolve immediately.
		// Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
		// with them.
		let resolver = UtxoResolver(result);
		match announcement {
			ChannelAnnouncement::Full(signed_msg) => {
				if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() {
					res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement {
						msg: signed_msg, update_msg: None,
					});
					res_idx += 1;
				}
			},
			ChannelAnnouncement::Unsigned(msg) => {
				let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
			},
		}

		for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
			match announce {
				Some(NodeAnnouncement::Full(signed_msg)) => {
					if graph.update_node_from_announcement(&signed_msg).is_ok() {
						res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement {
							msg: signed_msg,
						});
						res_idx += 1;
					}
				},
				Some(NodeAnnouncement::Unsigned(msg)) => {
					let _ = graph.update_node_from_unsigned_announcement(&msg);
				},
				None => {},
			}
		}

		for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
			match update {
				Some(ChannelUpdate::Full(signed_msg)) => {
					if graph.update_channel(&signed_msg).is_ok() {
						res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate {
							msg: signed_msg,
						});
						res_idx += 1;
					}
				},
				Some(ChannelUpdate::Unsigned(msg)) => {
					let _ = graph.update_channel_unsigned(&msg);
				},
				None => {},
			}
		}

		res
	}
}

struct PendingChecksContext {
	channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
	nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
}

impl PendingChecksContext {
	fn lookup_completed(&mut self,
		msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
	) {
		if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) {
			if Weak::ptr_eq(e.get(), &completed_state) {
				e.remove();
			}
		}

		if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
			e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
			if e.get().is_empty() { e.remove(); }
		}
		if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
			e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
			if e.get().is_empty() { e.remove(); }
		}
	}
}

/// A set of messages which are pending UTXO lookups for processing.
pub(super) struct PendingChecks {
	internal: Mutex<PendingChecksContext>,
}

impl PendingChecks {
	pub(super) fn new() -> Self {
		PendingChecks { internal: Mutex::new(PendingChecksContext {
			channels: new_hash_map(), nodes: new_hash_map(),
		}) }
	}

	/// Checks if there is a pending `channel_update` UTXO validation for the given channel,
	/// and, if so, stores the channel message for handling later and returns an `Err`.
	pub(super) fn check_hold_pending_channel_update(
		&self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
	) -> Result<(), LightningError> {
		let mut pending_checks = self.internal.lock().unwrap();
		if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
			let is_from_a = (msg.channel_flags & 1) == 1;
			match Weak::upgrade(e.get()) {
				Some(msgs_ref) => {
					let mut messages = msgs_ref.lock().unwrap();
					let latest_update = if is_from_a {
							&mut messages.latest_channel_update_a
						} else {
							&mut messages.latest_channel_update_b
						};
					if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
						// If the message we got has a higher timestamp, just blindly assume the
						// signatures on the new message are correct and drop the old message. This
						// may cause us to end up dropping valid `channel_update`s if a peer is
						// malicious, but we should get the correct ones when the node updates them.
						*latest_update = Some(
							if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
							else { ChannelUpdate::Unsigned(msg.clone()) });
					}
					return Err(LightningError {
						err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
						action: ErrorAction::IgnoreAndLog(Level::Gossip),
					});
				},
				None => { e.remove(); },
			}
		}
		Ok(())
	}

	/// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
	/// given node and, if so, stores the channel message for handling later and returns an `Err`.
	pub(super) fn check_hold_pending_node_announcement(
		&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
	) -> Result<(), LightningError> {
		let mut pending_checks = self.internal.lock().unwrap();
		if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
			let mut found_at_least_one_chan = false;
			e.get_mut().retain(|node_msgs| {
				match Weak::upgrade(&node_msgs) {
					Some(chan_mtx) => {
						let mut chan_msgs = chan_mtx.lock().unwrap();
						if let Some(chan_announce) = &chan_msgs.channel_announce {
							let latest_announce =
								if *chan_announce.node_id_1() == msg.node_id {
									&mut chan_msgs.latest_node_announce_a
								} else {
									&mut chan_msgs.latest_node_announce_b
								};
							if latest_announce.is_none() ||
								latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
							{
								*latest_announce = Some(
									if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
									else { NodeAnnouncement::Unsigned(msg.clone()) });
							}
							found_at_least_one_chan = true;
							true
						} else {
							debug_assert!(false, "channel_announce is set before struct is added to node map");
							false
						}
					},
					None => false,
				}
			});
			if e.get().is_empty() { e.remove(); }
			if found_at_least_one_chan {
				return Err(LightningError {
					err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
					action: ErrorAction::IgnoreAndLog(Level::Gossip),
				});
			}
		}
		Ok(())
	}

	fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
		full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
		pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
	) -> Result<(), msgs::LightningError> {
		match pending_channels.entry(msg.short_channel_id) {
			hash_map::Entry::Occupied(mut e) => {
				// There's already a pending lookup for the given SCID. Check if the messages
				// are the same and, if so, return immediately (don't bother spawning another
				// lookup if we haven't gotten that far yet).
				match Weak::upgrade(&e.get()) {
					Some(pending_msgs) => {
						// This may be called with the mutex held on a different UtxoMessages
						// struct, however in that case we have a global lockorder of new messages
						// -> old messages, which makes this safe.
						let pending_matches = match &pending_msgs.unsafe_well_ordered_double_lock_self().channel_announce {
							Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
							Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
							None => {
								// This shouldn't actually be reachable. We set the
								// `channel_announce` field under the same lock as setting the
								// channel map entry. Still, we can just treat it as
								// non-matching and let the new request fly.
								debug_assert!(false);
								false
							},
						};
						if pending_matches {
							return Err(LightningError {
								err: "Channel announcement is already being checked".to_owned(),
								action: ErrorAction::IgnoreDuplicateGossip,
							});
						} else {
							// The earlier lookup is a different message. If we have another
							// request in-flight now, replace the original.
							// Note that in the replace case whether to replace is somewhat
							// arbitrary - both results will be handled, we're just updating the
							// value that will be compared to future lookups with the same SCID.
							if let Some(item) = replacement {
								*e.get_mut() = item;
							}
						}
					},
					None => {
						// The earlier lookup already resolved. We can't be sure it's the same
						// so just remove/replace it and move on.
						if let Some(item) = replacement {
							*e.get_mut() = item;
						} else { e.remove(); }
					},
				}
			},
			hash_map::Entry::Vacant(v) => {
				if let Some(item) = replacement { v.insert(item); }
			},
		}
		Ok(())
	}

	pub(super) fn check_channel_announcement<U: Deref>(&self,
		utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
		full_msg: Option<&msgs::ChannelAnnouncement>
	) -> Result<Option<Amount>, msgs::LightningError> where U::Target: UtxoLookup {
		let handle_result = |res| {
			match res {
				Ok(TxOut { value, script_pubkey }) => {
					let expected_script =
						make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_array(), msg.bitcoin_key_2.as_array()).to_p2wsh();
					if script_pubkey != expected_script {
						return Err(LightningError{
							err: format!("Channel announcement key ({}) didn't match on-chain script ({})",
								expected_script.to_hex_string(), script_pubkey.to_hex_string()),
							action: ErrorAction::IgnoreError
						});
					}
					Ok(Some(value))
				},
				Err(UtxoLookupError::UnknownChain) => {
					Err(LightningError {
						err: format!("Channel announced on an unknown chain ({})",
							msg.chain_hash.to_bytes().as_hex()),
						action: ErrorAction::IgnoreError
					})
				},
				Err(UtxoLookupError::UnknownTx) => {
					Err(LightningError {
						err: "Channel announced without corresponding UTXO entry".to_owned(),
						action: ErrorAction::IgnoreError
					})
				},
			}
		};

		Self::check_replace_previous_entry(msg, full_msg, None,
			&mut self.internal.lock().unwrap().channels)?;

		match utxo_lookup {
			&None => {
				// Tentatively accept, potentially exposing us to DoS attacks
				Ok(None)
			},
			&Some(ref utxo_lookup) => {
				match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
					UtxoResult::Sync(res) => handle_result(res),
					UtxoResult::Async(future) => {
						let mut pending_checks = self.internal.lock().unwrap();
						let mut async_messages = future.state.lock().unwrap();
						if let Some(res) = async_messages.complete.take() {
							// In the unlikely event the future resolved before we managed to get it,
							// handle the result in-line.
							handle_result(res)
						} else {
							Self::check_replace_previous_entry(msg, full_msg,
								Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
							async_messages.channel_announce = Some(
								if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
								else { ChannelAnnouncement::Unsigned(msg.clone()) });
							pending_checks.nodes.entry(msg.node_id_1)
								.or_default().push(Arc::downgrade(&future.state));
							pending_checks.nodes.entry(msg.node_id_2)
								.or_default().push(Arc::downgrade(&future.state));
							Err(LightningError {
								err: "Channel being checked async".to_owned(),
								action: ErrorAction::IgnoreAndLog(Level::Gossip),
							})
						}
					},
				}
			}
		}
	}

	/// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`]
	/// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending -
	/// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s
	/// which we'll have to process. With a socket buffer of 4KB and a minimum
	/// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer
	/// count` messages to process beyond this limit. Because we'll probably have a few peers,
	/// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight
	/// checks should be more than enough for decent parallelism.
	const MAX_PENDING_LOOKUPS: usize = 32;

	/// Returns true if there are a large number of async checks pending and future
	/// `channel_announcement` messages should be delayed. Note that this is only a hint and
	/// messages already in-flight may still have to be handled for various reasons.
	pub(super) fn too_many_checks_pending(&self) -> bool {
		let mut pending_checks = self.internal.lock().unwrap();
		if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
			// If we have many channel checks pending, ensure we don't have any dangling checks
			// (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture`
			// instead) before we commit to applying backpressure.
			pending_checks.channels.retain(|_, chan| {
				Weak::upgrade(&chan).is_some()
			});
			pending_checks.nodes.retain(|_, channels| {
				channels.retain(|chan| Weak::upgrade(&chan).is_some());
				!channels.is_empty()
			});
			pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
		} else {
			false
		}
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use crate::routing::gossip::tests::*;
	use crate::util::test_utils::{TestChainSource, TestLogger};

	use bitcoin::amount::Amount;
	use bitcoin::secp256k1::{Secp256k1, SecretKey};

	use core::sync::atomic::Ordering;

	fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
		let logger = Box::new(TestLogger::new());
		let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
		let network_graph = NetworkGraph::new(bitcoin::Network::Testnet, logger);

		(chain_source, network_graph)
	}

	fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
		NetworkGraph<Box<TestLogger>>, bitcoin::ScriptBuf, msgs::NodeAnnouncement,
		msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
	{
		let secp_ctx = Secp256k1::new();

		let (chain_source, network_graph) = get_network();

		let good_script = get_channel_script(&secp_ctx);
		let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
		let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
		let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);

		let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
		let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx);

		// Note that we have to set the "direction" flag correctly on both messages
		let chan_update_a = get_signed_channel_update(|msg| msg.channel_flags = 0, node_1_privkey, &secp_ctx);
		let chan_update_b = get_signed_channel_update(|msg| msg.channel_flags = 1, node_2_privkey, &secp_ctx);
		let chan_update_c = get_signed_channel_update(|msg| {
			msg.channel_flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx);

		(valid_announcement, chain_source, network_graph, good_script, node_a_announce,
			node_b_announce, chan_update_a, chan_update_b, chan_update_c)
	}

	#[test]
	fn test_fast_async_lookup() {
		// Check that async lookups which resolve before the caller of `get_utxo` can even read
		// the returned future still resolve properly.
		let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();

		let future = UtxoFuture::new();
		future.resolve_without_forwarding(&network_graph,
			Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

		network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some());
	}

	#[test]
	fn test_async_lookup() {
		// Test a simple async lookup
		let (valid_announcement, chain_source, network_graph, good_script,
			node_a_announce, node_b_announce, ..) = get_test_objects();

		let future = UtxoFuture::new();
		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

		assert_eq!(
			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
			"Channel being checked async");
		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());

		future.resolve_without_forwarding(&network_graph,
			Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script }));
		network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
		network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();

		assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
			.unwrap().announcement_info.is_none());

		network_graph.update_node_from_announcement(&node_a_announce).unwrap();
		network_graph.update_node_from_announcement(&node_b_announce).unwrap();

		assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
			.unwrap().announcement_info.is_some());
	}

	#[test]
	fn test_invalid_async_lookup() {
		// Test an async lookup which returns an incorrect script
		let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();

		let future = UtxoFuture::new();
		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

		assert_eq!(
			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
			"Channel being checked async");
		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());

		future.resolve_without_forwarding(&network_graph,
			Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: bitcoin::ScriptBuf::new() }));
		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
	}

	#[test]
	fn test_failing_async_lookup() {
		// Test an async lookup which returns an error
		let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();

		let future = UtxoFuture::new();
		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

		assert_eq!(
			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
			"Channel being checked async");
		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());

		future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
	}

	#[test]
	fn test_updates_async_lookup() {
		// Test async lookups will process pending channel_update/node_announcements once they
		// complete.
		let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
			node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();

		let future = UtxoFuture::new();
		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

		assert_eq!(
			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
			"Channel being checked async");
		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());

		assert_eq!(
			network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err,
			"Awaiting channel_announcement validation to accept node_announcement");
		assert_eq!(
			network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err,
			"Awaiting channel_announcement validation to accept node_announcement");

		assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
			"Awaiting channel_announcement validation to accept channel_update");
		assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
			"Awaiting channel_announcement validation to accept channel_update");

		future.resolve_without_forwarding(&network_graph,
			Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));

		assert!(network_graph.read_only().channels()
			.get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some());
		assert!(network_graph.read_only().channels()
			.get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some());

		assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
			.unwrap().announcement_info.is_some());
		assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
			.unwrap().announcement_info.is_some());
	}

	#[test]
	fn test_latest_update_async_lookup() {
		// Test async lookups will process the latest channel_update if two are received while
		// awaiting an async UTXO lookup.
		let (valid_announcement, chain_source, network_graph, good_script, _,
			_, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();

		let future = UtxoFuture::new();
		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

		assert_eq!(
			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
			"Channel being checked async");
		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());

		assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
			"Awaiting channel_announcement validation to accept channel_update");
		assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
			"Awaiting channel_announcement validation to accept channel_update");
		assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err,
			"Awaiting channel_announcement validation to accept channel_update");

		future.resolve_without_forwarding(&network_graph,
			Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));

		assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
		let graph_lock = network_graph.read_only();
		assert!(graph_lock.channels()
				.get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
				.one_to_two.as_ref().unwrap().last_update !=
			graph_lock.channels()
				.get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
				.two_to_one.as_ref().unwrap().last_update);
	}

	#[test]
	fn test_no_double_lookups() {
		// Test that a pending async lookup will prevent a second async lookup from flying, but
		// only if the channel_announcement message is identical.
		let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();

		let future = UtxoFuture::new();
		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

		assert_eq!(
			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
			"Channel being checked async");
		assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);

		// If we make a second request with the same message, the call count doesn't increase...
		let future_b = UtxoFuture::new();
		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
		assert_eq!(
			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
			"Channel announcement is already being checked");
		assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);

		// But if we make a third request with a tweaked message, we should get a second call
		// against our new future...
		let secp_ctx = Secp256k1::new();
		let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap();
		let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap();
		let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx);
		assert_eq!(
			network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err,
			"Channel being checked async");
		assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2);

		// Still, if we resolve the original future, the original channel will be accepted.
		future.resolve_without_forwarding(&network_graph,
			Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
		assert!(!network_graph.read_only().channels()
			.get(&valid_announcement.contents.short_channel_id).unwrap()
			.announcement_message.as_ref().unwrap()
			.contents.features.supports_unknown_test_feature());
	}

	#[test]
	fn test_checks_backpressure() {
		// Test that too_many_checks_pending returns true when there are many checks pending, and
		// returns false once they complete.
		let secp_ctx = Secp256k1::new();
		let (chain_source, network_graph) = get_network();

		// We cheat and use a single future for all the lookups to complete them all at once.
		let future = UtxoFuture::new();
		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

		let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
		let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();

		for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
			let valid_announcement = get_signed_channel_announcement(
				|msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
			assert!(!network_graph.pending_checks.too_many_checks_pending());
		}

		let valid_announcement = get_signed_channel_announcement(
			|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
		network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
		assert!(network_graph.pending_checks.too_many_checks_pending());

		// Once the future completes the "too many checks" flag should reset.
		future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
		assert!(!network_graph.pending_checks.too_many_checks_pending());
	}

	#[test]
	fn test_checks_backpressure_drop() {
		// Test that too_many_checks_pending returns true when there are many checks pending, and
		// returns false if we drop some of the futures without completion.
		let secp_ctx = Secp256k1::new();
		let (chain_source, network_graph) = get_network();

		// We cheat and use a single future for all the lookups so that we can drop them all at once.
		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());

		let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
		let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();

		for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
			let valid_announcement = get_signed_channel_announcement(
				|msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
			assert!(!network_graph.pending_checks.too_many_checks_pending());
		}

		let valid_announcement = get_signed_channel_announcement(
			|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
		network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
		assert!(network_graph.pending_checks.too_many_checks_pending());

		// Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
		// should reset to false.
		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
		assert!(!network_graph.pending_checks.too_many_checks_pending());
	}
}