1use bitcoin::TxOut;
17use bitcoin::amount::Amount;
18use bitcoin::constants::ChainHash;
19
20use bitcoin::hex::DisplayHex;
21
22use crate::events::MessageSendEvent;
23use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
24use crate::ln::msgs::{self, LightningError, ErrorAction};
25use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
26use crate::util::logger::{Level, Logger};
27
28use crate::prelude::*;
29
30use alloc::sync::{Arc, Weak};
31use crate::sync::{Mutex, LockTestExt};
32use core::ops::Deref;
33
/// The reason a UTXO lookup failed to produce a transaction output.
///
/// `PendingChecks::check_channel_announcement` turns either variant into a `LightningError`
/// whose action is `ErrorAction::IgnoreError`.
#[derive(Clone, Debug)]
pub enum UtxoLookupError {
    /// The lookup was made against a chain the lookup source does not know. Reported to the
    /// caller as "Channel announced on an unknown chain".
    UnknownChain,

    /// No matching output was found. Reported to the caller as "Channel announced without
    /// corresponding UTXO entry".
    UnknownTx,
}
43
/// The value returned from [`UtxoLookup::get_utxo`]: either an answer that is available right
/// away, or a handle to a lookup that will be completed later.
#[derive(Clone)]
pub enum UtxoResult {
    /// The lookup completed immediately, yielding the output or the reason it is unavailable.
    Sync(Result<TxOut, UtxoLookupError>),
    /// The lookup has not completed yet. The contained [`UtxoFuture`] is completed later through
    /// [`UtxoFuture::resolve`] or [`UtxoFuture::resolve_without_forwarding`]; until then the
    /// channel announcement and related gossip are held in the future's shared state.
    Async(UtxoFuture),
}
61
/// A source of transaction outputs used to validate channel announcements.
pub trait UtxoLookup {
    /// Looks up the output identified by `short_channel_id` on the chain identified by
    /// `chain_hash`.
    ///
    /// `PendingChecks::check_channel_announcement` calls this with the `chain_hash` and
    /// `short_channel_id` of a channel announcement and compares the returned output's
    /// `script_pubkey` with the P2WSH script derived from the announcement's two bitcoin keys.
    /// Returning [`UtxoResult::Async`] defers that check until the future is resolved.
    fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult;
}
71
72enum ChannelAnnouncement {
73 Full(msgs::ChannelAnnouncement),
74 Unsigned(msgs::UnsignedChannelAnnouncement),
75}
76impl ChannelAnnouncement {
77 fn node_id_1(&self) -> &NodeId {
78 match self {
79 ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
80 ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
81 }
82 }
83}
84
85enum NodeAnnouncement {
86 Full(msgs::NodeAnnouncement),
87 Unsigned(msgs::UnsignedNodeAnnouncement),
88}
89impl NodeAnnouncement {
90 fn timestamp(&self) -> u32 {
91 match self {
92 NodeAnnouncement::Full(msg) => msg.contents.timestamp,
93 NodeAnnouncement::Unsigned(msg) => msg.timestamp,
94 }
95 }
96}
97
98enum ChannelUpdate {
99 Full(msgs::ChannelUpdate),
100 Unsigned(msgs::UnsignedChannelUpdate),
101}
102impl ChannelUpdate {
103 fn timestamp(&self) -> u32 {
104 match self {
105 ChannelUpdate::Full(msg) => msg.contents.timestamp,
106 ChannelUpdate::Unsigned(msg) => msg.timestamp,
107 }
108 }
109}
110
/// The shared state behind a [`UtxoFuture`]: the lookup result (if it arrived early) and the
/// gossip messages being held until the lookup completes.
struct UtxoMessages {
    /// The lookup result, stored by `UtxoFuture::do_resolve` when it runs before any channel
    /// announcement has been recorded here. `PendingChecks::check_channel_announcement` takes it
    /// out and validates it immediately.
    complete: Option<Result<TxOut, UtxoLookupError>>,
    /// The channel announcement waiting on this lookup, set by
    /// `PendingChecks::check_channel_announcement`.
    channel_announce: Option<ChannelAnnouncement>,
    /// Most recent (by timestamp) node announcement held for the node matching the channel
    /// announcement's `node_id_1`.
    latest_node_announce_a: Option<NodeAnnouncement>,
    /// Most recent (by timestamp) node announcement held for any other node id.
    latest_node_announce_b: Option<NodeAnnouncement>,
    /// Most recent (by timestamp) held channel update whose `channel_flags` has its lowest bit
    /// set.
    latest_channel_update_a: Option<ChannelUpdate>,
    /// Most recent (by timestamp) held channel update whose `channel_flags` has its lowest bit
    /// clear.
    latest_channel_update_b: Option<ChannelUpdate>,
}
119
/// A handle to a UTXO lookup that completes later, handed back via [`UtxoResult::Async`].
///
/// Cloning yields another handle to the same shared state. The pending-check tables only keep
/// `Weak` references to that state, so once every clone is dropped the pending entry can be
/// pruned.
#[derive(Clone)]
pub struct UtxoFuture {
    /// State shared between all clones of this future and (weakly) the pending-check tables.
    state: Arc<Mutex<UtxoMessages>>,
}
127
128pub(crate) struct UtxoResolver(Result<TxOut, UtxoLookupError>);
131impl UtxoLookup for UtxoResolver {
132 fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult {
133 UtxoResult::Sync(self.0.clone())
134 }
135}
136
137impl UtxoFuture {
138 pub fn new() -> Self {
140 Self { state: Arc::new(Mutex::new(UtxoMessages {
141 complete: None,
142 channel_announce: None,
143 latest_node_announce_a: None,
144 latest_node_announce_b: None,
145 latest_channel_update_a: None,
146 latest_channel_update_b: None,
147 }))}
148 }
149
150 pub fn resolve_without_forwarding<L: Deref>(&self,
162 graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
163 where L::Target: Logger {
164 self.do_resolve(graph, result);
165 }
166
167 pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
179 graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
180 ) where L::Target: Logger, U::Target: UtxoLookup {
181 let mut res = self.do_resolve(graph, result);
182 for msg_opt in res.iter_mut() {
183 if let Some(msg) = msg_opt.take() {
184 gossip.forward_gossip_msg(msg);
185 }
186 }
187 }
188
189 fn do_resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
190 -> [Option<MessageSendEvent>; 5] where L::Target: Logger {
191 let (announcement, node_a, node_b, update_a, update_b) = {
192 let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
193 let mut async_messages = self.state.lock().unwrap();
194
195 if async_messages.channel_announce.is_none() {
196 async_messages.complete = Some(result);
200 return [None, None, None, None, None];
201 }
202
203 let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
204 ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
205 ChannelAnnouncement::Unsigned(msg) => &msg,
206 };
207
208 pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
209
210 (async_messages.channel_announce.take().unwrap(),
211 async_messages.latest_node_announce_a.take(),
212 async_messages.latest_node_announce_b.take(),
213 async_messages.latest_channel_update_a.take(),
214 async_messages.latest_channel_update_b.take())
215 };
216
217 let mut res = [None, None, None, None, None];
218 let mut res_idx = 0;
219
220 let resolver = UtxoResolver(result);
225 match announcement {
226 ChannelAnnouncement::Full(signed_msg) => {
227 if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() {
228 res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement {
229 msg: signed_msg, update_msg: None,
230 });
231 res_idx += 1;
232 }
233 },
234 ChannelAnnouncement::Unsigned(msg) => {
235 let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
236 },
237 }
238
239 for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
240 match announce {
241 Some(NodeAnnouncement::Full(signed_msg)) => {
242 if graph.update_node_from_announcement(&signed_msg).is_ok() {
243 res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement {
244 msg: signed_msg,
245 });
246 res_idx += 1;
247 }
248 },
249 Some(NodeAnnouncement::Unsigned(msg)) => {
250 let _ = graph.update_node_from_unsigned_announcement(&msg);
251 },
252 None => {},
253 }
254 }
255
256 for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
257 match update {
258 Some(ChannelUpdate::Full(signed_msg)) => {
259 if graph.update_channel(&signed_msg).is_ok() {
260 res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate {
261 msg: signed_msg,
262 });
263 res_idx += 1;
264 }
265 },
266 Some(ChannelUpdate::Unsigned(msg)) => {
267 let _ = graph.update_channel_unsigned(&msg);
268 },
269 None => {},
270 }
271 }
272
273 res
274 }
275}
276
/// The lookup tables tracking which channels and nodes currently have an unresolved
/// asynchronous UTXO lookup.
///
/// Only `Weak` references are stored, so an entry whose `UtxoFuture` clones have all been
/// dropped can be detected (failed `upgrade`) and removed.
struct PendingChecksContext {
    /// Pending lookup state keyed by the announcement's `short_channel_id`.
    channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
    /// For each node id, the pending lookup states of channel announcements naming that node as
    /// `node_id_1` or `node_id_2`.
    nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
}
281
282impl PendingChecksContext {
283 fn lookup_completed(&mut self,
284 msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
285 ) {
286 if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) {
287 if Weak::ptr_eq(e.get(), &completed_state) {
288 e.remove();
289 }
290 }
291
292 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
293 e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
294 if e.get().is_empty() { e.remove(); }
295 }
296 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
297 e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
298 if e.get().is_empty() { e.remove(); }
299 }
300 }
301}
302
/// Tracks channel announcements whose UTXO lookup is still in flight, together with the gossip
/// that has to wait for them.
pub(super) struct PendingChecks {
    /// The pending-lookup tables. `UtxoFuture::do_resolve` takes this lock before locking the
    /// future's own state, as does the async path of `check_channel_announcement`.
    internal: Mutex<PendingChecksContext>,
}
307
308impl PendingChecks {
309 pub(super) fn new() -> Self {
310 PendingChecks { internal: Mutex::new(PendingChecksContext {
311 channels: new_hash_map(), nodes: new_hash_map(),
312 }) }
313 }
314
315 pub(super) fn check_hold_pending_channel_update(
318 &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
319 ) -> Result<(), LightningError> {
320 let mut pending_checks = self.internal.lock().unwrap();
321 if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
322 let is_from_a = (msg.channel_flags & 1) == 1;
323 match Weak::upgrade(e.get()) {
324 Some(msgs_ref) => {
325 let mut messages = msgs_ref.lock().unwrap();
326 let latest_update = if is_from_a {
327 &mut messages.latest_channel_update_a
328 } else {
329 &mut messages.latest_channel_update_b
330 };
331 if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
332 *latest_update = Some(
337 if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
338 else { ChannelUpdate::Unsigned(msg.clone()) });
339 }
340 return Err(LightningError {
341 err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
342 action: ErrorAction::IgnoreAndLog(Level::Gossip),
343 });
344 },
345 None => { e.remove(); },
346 }
347 }
348 Ok(())
349 }
350
351 pub(super) fn check_hold_pending_node_announcement(
354 &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
355 ) -> Result<(), LightningError> {
356 let mut pending_checks = self.internal.lock().unwrap();
357 if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
358 let mut found_at_least_one_chan = false;
359 e.get_mut().retain(|node_msgs| {
360 match Weak::upgrade(&node_msgs) {
361 Some(chan_mtx) => {
362 let mut chan_msgs = chan_mtx.lock().unwrap();
363 if let Some(chan_announce) = &chan_msgs.channel_announce {
364 let latest_announce =
365 if *chan_announce.node_id_1() == msg.node_id {
366 &mut chan_msgs.latest_node_announce_a
367 } else {
368 &mut chan_msgs.latest_node_announce_b
369 };
370 if latest_announce.is_none() ||
371 latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
372 {
373 *latest_announce = Some(
374 if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
375 else { NodeAnnouncement::Unsigned(msg.clone()) });
376 }
377 found_at_least_one_chan = true;
378 true
379 } else {
380 debug_assert!(false, "channel_announce is set before struct is added to node map");
381 false
382 }
383 },
384 None => false,
385 }
386 });
387 if e.get().is_empty() { e.remove(); }
388 if found_at_least_one_chan {
389 return Err(LightningError {
390 err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
391 action: ErrorAction::IgnoreAndLog(Level::Gossip),
392 });
393 }
394 }
395 Ok(())
396 }
397
398 fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
399 full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
400 pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
401 ) -> Result<(), msgs::LightningError> {
402 match pending_channels.entry(msg.short_channel_id) {
403 hash_map::Entry::Occupied(mut e) => {
404 match Weak::upgrade(&e.get()) {
408 Some(pending_msgs) => {
409 let pending_matches = match &pending_msgs.unsafe_well_ordered_double_lock_self().channel_announce {
413 Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
414 Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
415 None => {
416 debug_assert!(false);
421 false
422 },
423 };
424 if pending_matches {
425 return Err(LightningError {
426 err: "Channel announcement is already being checked".to_owned(),
427 action: ErrorAction::IgnoreDuplicateGossip,
428 });
429 } else {
430 if let Some(item) = replacement {
436 *e.get_mut() = item;
437 }
438 }
439 },
440 None => {
441 if let Some(item) = replacement {
444 *e.get_mut() = item;
445 } else { e.remove(); }
446 },
447 }
448 },
449 hash_map::Entry::Vacant(v) => {
450 if let Some(item) = replacement { v.insert(item); }
451 },
452 }
453 Ok(())
454 }
455
456 pub(super) fn check_channel_announcement<U: Deref>(&self,
457 utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
458 full_msg: Option<&msgs::ChannelAnnouncement>
459 ) -> Result<Option<Amount>, msgs::LightningError> where U::Target: UtxoLookup {
460 let handle_result = |res| {
461 match res {
462 Ok(TxOut { value, script_pubkey }) => {
463 let expected_script =
464 make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_array(), msg.bitcoin_key_2.as_array()).to_p2wsh();
465 if script_pubkey != expected_script {
466 return Err(LightningError{
467 err: format!("Channel announcement key ({}) didn't match on-chain script ({})",
468 expected_script.to_hex_string(), script_pubkey.to_hex_string()),
469 action: ErrorAction::IgnoreError
470 });
471 }
472 Ok(Some(value))
473 },
474 Err(UtxoLookupError::UnknownChain) => {
475 Err(LightningError {
476 err: format!("Channel announced on an unknown chain ({})",
477 msg.chain_hash.to_bytes().as_hex()),
478 action: ErrorAction::IgnoreError
479 })
480 },
481 Err(UtxoLookupError::UnknownTx) => {
482 Err(LightningError {
483 err: "Channel announced without corresponding UTXO entry".to_owned(),
484 action: ErrorAction::IgnoreError
485 })
486 },
487 }
488 };
489
490 Self::check_replace_previous_entry(msg, full_msg, None,
491 &mut self.internal.lock().unwrap().channels)?;
492
493 match utxo_lookup {
494 &None => {
495 Ok(None)
497 },
498 &Some(ref utxo_lookup) => {
499 match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
500 UtxoResult::Sync(res) => handle_result(res),
501 UtxoResult::Async(future) => {
502 let mut pending_checks = self.internal.lock().unwrap();
503 let mut async_messages = future.state.lock().unwrap();
504 if let Some(res) = async_messages.complete.take() {
505 handle_result(res)
508 } else {
509 Self::check_replace_previous_entry(msg, full_msg,
510 Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
511 async_messages.channel_announce = Some(
512 if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
513 else { ChannelAnnouncement::Unsigned(msg.clone()) });
514 pending_checks.nodes.entry(msg.node_id_1)
515 .or_default().push(Arc::downgrade(&future.state));
516 pending_checks.nodes.entry(msg.node_id_2)
517 .or_default().push(Arc::downgrade(&future.state));
518 Err(LightningError {
519 err: "Channel being checked async".to_owned(),
520 action: ErrorAction::IgnoreAndLog(Level::Gossip),
521 })
522 }
523 },
524 }
525 }
526 }
527 }
528
529 const MAX_PENDING_LOOKUPS: usize = 32;
538
539 pub(super) fn too_many_checks_pending(&self) -> bool {
543 let mut pending_checks = self.internal.lock().unwrap();
544 if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
545 pending_checks.channels.retain(|_, chan| {
549 Weak::upgrade(&chan).is_some()
550 });
551 pending_checks.nodes.retain(|_, channels| {
552 channels.retain(|chan| Weak::upgrade(&chan).is_some());
553 !channels.is_empty()
554 });
555 pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
556 } else {
557 false
558 }
559 }
560}
561
562#[cfg(test)]
563mod tests {
564 use super::*;
565 use crate::routing::gossip::tests::*;
566 use crate::util::test_utils::{TestChainSource, TestLogger};
567
568 use bitcoin::amount::Amount;
569 use bitcoin::secp256k1::{Secp256k1, SecretKey};
570
571 use core::sync::atomic::Ordering;
572
573 fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
574 let logger = Box::new(TestLogger::new());
575 let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
576 let network_graph = NetworkGraph::new(bitcoin::Network::Testnet, logger);
577
578 (chain_source, network_graph)
579 }
580
581 fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
582 NetworkGraph<Box<TestLogger>>, bitcoin::ScriptBuf, msgs::NodeAnnouncement,
583 msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
584 {
585 let secp_ctx = Secp256k1::new();
586
587 let (chain_source, network_graph) = get_network();
588
589 let good_script = get_channel_script(&secp_ctx);
590 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
591 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
592 let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
593
594 let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
595 let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx);
596
597 let chan_update_a = get_signed_channel_update(|msg| msg.channel_flags = 0, node_1_privkey, &secp_ctx);
599 let chan_update_b = get_signed_channel_update(|msg| msg.channel_flags = 1, node_2_privkey, &secp_ctx);
600 let chan_update_c = get_signed_channel_update(|msg| {
601 msg.channel_flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx);
602
603 (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
604 node_b_announce, chan_update_a, chan_update_b, chan_update_c)
605 }
606
607 #[test]
608 fn test_fast_async_lookup() {
609 let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
612
613 let future = UtxoFuture::new();
614 future.resolve_without_forwarding(&network_graph,
615 Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
616 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
617
618 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
619 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some());
620 }
621
622 #[test]
623 fn test_async_lookup() {
624 let (valid_announcement, chain_source, network_graph, good_script,
626 node_a_announce, node_b_announce, ..) = get_test_objects();
627
628 let future = UtxoFuture::new();
629 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
630
631 assert_eq!(
632 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
633 "Channel being checked async");
634 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
635
636 future.resolve_without_forwarding(&network_graph,
637 Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script }));
638 network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
639 network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
640
641 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
642 .unwrap().announcement_info.is_none());
643
644 network_graph.update_node_from_announcement(&node_a_announce).unwrap();
645 network_graph.update_node_from_announcement(&node_b_announce).unwrap();
646
647 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
648 .unwrap().announcement_info.is_some());
649 }
650
651 #[test]
652 fn test_invalid_async_lookup() {
653 let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
655
656 let future = UtxoFuture::new();
657 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
658
659 assert_eq!(
660 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
661 "Channel being checked async");
662 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
663
664 future.resolve_without_forwarding(&network_graph,
665 Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: bitcoin::ScriptBuf::new() }));
666 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
667 }
668
669 #[test]
670 fn test_failing_async_lookup() {
671 let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
673
674 let future = UtxoFuture::new();
675 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
676
677 assert_eq!(
678 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
679 "Channel being checked async");
680 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
681
682 future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
683 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
684 }
685
686 #[test]
687 fn test_updates_async_lookup() {
688 let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
691 node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();
692
693 let future = UtxoFuture::new();
694 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
695
696 assert_eq!(
697 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
698 "Channel being checked async");
699 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
700
701 assert_eq!(
702 network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err,
703 "Awaiting channel_announcement validation to accept node_announcement");
704 assert_eq!(
705 network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err,
706 "Awaiting channel_announcement validation to accept node_announcement");
707
708 assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
709 "Awaiting channel_announcement validation to accept channel_update");
710 assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
711 "Awaiting channel_announcement validation to accept channel_update");
712
713 future.resolve_without_forwarding(&network_graph,
714 Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
715
716 assert!(network_graph.read_only().channels()
717 .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some());
718 assert!(network_graph.read_only().channels()
719 .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some());
720
721 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
722 .unwrap().announcement_info.is_some());
723 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
724 .unwrap().announcement_info.is_some());
725 }
726
727 #[test]
728 fn test_latest_update_async_lookup() {
729 let (valid_announcement, chain_source, network_graph, good_script, _,
732 _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();
733
734 let future = UtxoFuture::new();
735 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
736
737 assert_eq!(
738 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
739 "Channel being checked async");
740 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
741
742 assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
743 "Awaiting channel_announcement validation to accept channel_update");
744 assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
745 "Awaiting channel_announcement validation to accept channel_update");
746 assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err,
747 "Awaiting channel_announcement validation to accept channel_update");
748
749 future.resolve_without_forwarding(&network_graph,
750 Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
751
752 assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
753 let graph_lock = network_graph.read_only();
754 assert!(graph_lock.channels()
755 .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
756 .one_to_two.as_ref().unwrap().last_update !=
757 graph_lock.channels()
758 .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
759 .two_to_one.as_ref().unwrap().last_update);
760 }
761
762 #[test]
763 fn test_no_double_lookups() {
764 let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
767
768 let future = UtxoFuture::new();
769 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
770
771 assert_eq!(
772 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
773 "Channel being checked async");
774 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
775
776 let future_b = UtxoFuture::new();
778 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
779 assert_eq!(
780 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
781 "Channel announcement is already being checked");
782 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
783
784 let secp_ctx = Secp256k1::new();
787 let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap();
788 let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap();
789 let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx);
790 assert_eq!(
791 network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err,
792 "Channel being checked async");
793 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2);
794
795 future.resolve_without_forwarding(&network_graph,
797 Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
798 assert!(!network_graph.read_only().channels()
799 .get(&valid_announcement.contents.short_channel_id).unwrap()
800 .announcement_message.as_ref().unwrap()
801 .contents.features.supports_unknown_test_feature());
802 }
803
804 #[test]
805 fn test_checks_backpressure() {
806 let secp_ctx = Secp256k1::new();
809 let (chain_source, network_graph) = get_network();
810
811 let future = UtxoFuture::new();
813 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
814
815 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
816 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
817
818 for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
819 let valid_announcement = get_signed_channel_announcement(
820 |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
821 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
822 assert!(!network_graph.pending_checks.too_many_checks_pending());
823 }
824
825 let valid_announcement = get_signed_channel_announcement(
826 |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
827 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
828 assert!(network_graph.pending_checks.too_many_checks_pending());
829
830 future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
832 assert!(!network_graph.pending_checks.too_many_checks_pending());
833 }
834
835 #[test]
836 fn test_checks_backpressure_drop() {
837 let secp_ctx = Secp256k1::new();
840 let (chain_source, network_graph) = get_network();
841
842 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());
844
845 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
846 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
847
848 for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
849 let valid_announcement = get_signed_channel_announcement(
850 |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
851 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
852 assert!(!network_graph.pending_checks.too_many_checks_pending());
853 }
854
855 let valid_announcement = get_signed_channel_announcement(
856 |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
857 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
858 assert!(network_graph.pending_checks.too_many_checks_pending());
859
860 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
863 assert!(!network_graph.pending_checks.too_many_checks_pending());
864 }
865}