lightning/util/
wakers.rs

1// This file is Copyright its original authors, visible in version control
2// history.
3//
4// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7// You may not use this file except in accordance with one or both of these
8// licenses.
9
10//! Utilities which allow users to block on some future notification from LDK. These are
11//! specifically used by [`ChannelManager`] to allow waiting until the [`ChannelManager`] needs to
12//! be re-persisted.
13//!
14//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
15
16use crate::sync::Mutex;
17use alloc::sync::Arc;
18use core::mem;
19
20#[allow(unused_imports)]
21use crate::prelude::*;
22
23#[cfg(feature = "std")]
24use crate::sync::Condvar;
25#[cfg(feature = "std")]
26use std::time::Duration;
27
28use core::future::Future as StdFuture;
29use core::pin::Pin;
30use core::task::{Context, Poll};
31
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
///
/// This is usually used by LDK objects such as [`ChannelManager`] or [`PeerManager`] to signal to
/// the background processor that it should wake up and process pending events.
///
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
/// [`PeerManager`]: crate::ln::peer_handler::PeerManager
pub struct Notifier {
	// The `bool` is set by `notify` when no callback could be invoked directly, and is read by
	// `get_future` to decide whether a freshly created `Future` starts out already complete.
	// The `Option` holds the state shared with the `Future`s most recently handed out by
	// `get_future`, if any.
	notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
}
42
43impl Notifier {
44	/// Constructs a new notifier.
45	pub fn new() -> Self {
46		Self { notify_pending: Mutex::new((false, None)) }
47	}
48
49	/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
50	///
51	/// We deem the notification successful either directly after any callbacks were made, or after
52	/// the user [`poll`]ed a previously-generated [`Future`].
53	///
54	/// [`poll`]: core::future::Future::poll
55	pub fn notify(&self) {
56		let mut lock = self.notify_pending.lock().unwrap();
57		if let Some(future_state) = &lock.1 {
58			if complete_future(future_state) {
59				lock.1 = None;
60				return;
61			}
62		}
63		lock.0 = true;
64	}
65
66	/// Gets a [`Future`] that will get woken up with any waiters
67	pub fn get_future(&self) -> Future {
68		let mut lock = self.notify_pending.lock().unwrap();
69		let mut self_idx = 0;
70		if let Some(existing_state) = &lock.1 {
71			let mut locked = existing_state.lock().unwrap();
72			if locked.callbacks_made {
73				// If the existing `FutureState` has completed and actually made callbacks,
74				// consider the notification flag to have been cleared and reset the future state.
75				mem::drop(locked);
76				lock.1.take();
77				lock.0 = false;
78			} else {
79				self_idx = locked.next_idx;
80				locked.next_idx += 1;
81			}
82		}
83		if let Some(existing_state) = &lock.1 {
84			Future { state: Arc::clone(&existing_state), self_idx }
85		} else {
86			let state = Arc::new(Mutex::new(FutureState {
87				callbacks: Vec::new(),
88				std_future_callbacks: Vec::new(),
89				callbacks_with_state: Vec::new(),
90				complete: lock.0,
91				callbacks_made: false,
92				next_idx: 1,
93			}));
94			lock.1 = Some(Arc::clone(&state));
95			Future { state, self_idx: 0 }
96		}
97	}
98
99	#[cfg(any(test, feature = "_test_utils"))]
100	pub fn notify_pending(&self) -> bool {
101		self.notify_pending.lock().unwrap().0
102	}
103}
104
// Generates the `FutureCallback` trait together with a blanket implementation of it for closures.
// Every path passed in `$bounds` is added both as a supertrait of `FutureCallback` and as a bound
// on the closure type in the blanket implementation.
macro_rules! define_callback { ($($bounds: path),*) => {
/// A callback which is called when a [`Future`] completes.
///
/// Note that this MUST NOT call back into LDK directly, it must instead schedule actions to be
/// taken later.
#[cfg_attr(feature = "std", doc = "Rust users should use the [`std::future::Future`] implementation for [`Future`] instead.")]
#[cfg_attr(feature = "std", doc = "")]
#[cfg_attr(feature = "std", doc = "Note that the [`std::future::Future`] implementation may only work for runtimes which schedule futures when they receive a wake, rather than immediately executing them.")]
pub trait FutureCallback : $($bounds +)* {
	/// The method which is called.
	fn call(&self);
}

impl<F: Fn() $(+ $bounds)*> FutureCallback for F {
	fn call(&self) { (self)(); }
}
} }

// With the `std` feature enabled, callbacks are required to be `Send`.
#[cfg(feature = "std")]
define_callback!(Send);
// Without the `std` feature, no additional bounds are placed on callbacks.
#[cfg(not(feature = "std"))]
define_callback!();
127
// The completion state shared between a `Notifier`, the `Future`s it has handed out, and any
// `Sleeper`s built from those `Future`s.
pub(crate) struct FutureState {
	// `callbacks` count as having woken the users' code (as they go direct to the user), but
	// `std_future_callbacks` and `callbacks_with_state` do not (as the first just wakes a future,
	// we only count it after another `poll()` and the second wakes a `Sleeper` which handles
	// setting `callbacks_made` itself).
	callbacks: Vec<Box<dyn FutureCallback>>,
	// Wakers stored by `Future::poll`, each tagged with the `self_idx` of the `Future` which stored
	// it so that a later poll or drop of that `Future` can remove exactly its own entry.
	std_future_callbacks: Vec<(usize, StdWaker)>,
	// Callbacks which are handed a reference to this state when it completes.
	callbacks_with_state: Vec<Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>>,
	// Set once `complete_future` has run on this state (or at construction, if the `Notifier`
	// already had a notification pending).
	complete: bool,
	// Set once the completion has actually been delivered to the user; `Notifier::get_future`
	// replaces the state with a fresh one when it finds this set.
	callbacks_made: bool,
	// The `self_idx` which will be assigned to the next `Future` sharing this state.
	next_idx: usize,
}
140
141fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
142	let mut state_lock = this.lock().unwrap();
143	let state = &mut *state_lock;
144	for callback in state.callbacks.drain(..) {
145		callback.call();
146		state.callbacks_made = true;
147	}
148	for (_, waker) in state.std_future_callbacks.drain(..) {
149		waker.0.wake();
150	}
151	for callback in state.callbacks_with_state.drain(..) {
152		(callback)(this);
153	}
154	state.complete = true;
155	state.callbacks_made
156}
157
/// A simple future which can complete once, and calls some callback(s) when it does so.
pub struct Future {
	// The completion state. `Notifier::get_future` may hand the same state to several `Future`s.
	state: Arc<Mutex<FutureState>>,
	// Tags the `Waker` this `Future` stores in `state` when polled, so that it can later be
	// replaced or removed without touching entries stored by other `Future`s.
	self_idx: usize,
}
163
164impl Future {
165	/// Registers a callback to be called upon completion of this future. If the future has already
166	/// completed, the callback will be called immediately.
167	///
168	/// This is not exported to bindings users, use the bindings-only `register_callback_fn` instead
169	pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
170		let mut state = self.state.lock().unwrap();
171		if state.complete {
172			state.callbacks_made = true;
173			mem::drop(state);
174			callback.call();
175		} else {
176			state.callbacks.push(callback);
177		}
178	}
179
180	// C bindings don't (currently) know how to map `Box<dyn Trait>`, and while it could add the
181	// following wrapper, doing it in the bindings is currently much more work than simply doing it
182	// here.
183	/// Registers a callback to be called upon completion of this future. If the future has already
184	/// completed, the callback will be called immediately.
185	#[cfg(c_bindings)]
186	pub fn register_callback_fn<F: 'static + FutureCallback>(&self, callback: F) {
187		self.register_callback(Box::new(callback));
188	}
189
190	/// Waits until this [`Future`] completes.
191	#[cfg(feature = "std")]
192	pub fn wait(&self) {
193		Sleeper::from_single_future(&self).wait();
194	}
195
196	/// Waits until this [`Future`] completes or the given amount of time has elapsed.
197	///
198	/// Returns true if the [`Future`] completed, false if the time elapsed.
199	#[cfg(feature = "std")]
200	pub fn wait_timeout(&self, max_wait: Duration) -> bool {
201		Sleeper::from_single_future(&self).wait_timeout(max_wait)
202	}
203
204	#[cfg(test)]
205	pub fn poll_is_complete(&self) -> bool {
206		let mut state = self.state.lock().unwrap();
207		if state.complete {
208			state.callbacks_made = true;
209			true
210		} else {
211			false
212		}
213	}
214}
215
216impl Drop for Future {
217	fn drop(&mut self) {
218		self.state.lock().unwrap().std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
219	}
220}
221
222use core::task::Waker;
// Wrapper around the `Waker` which `Future::poll` stores while the future is still pending.
struct StdWaker(pub Waker);
224
225/// This is not exported to bindings users as Rust Futures aren't usable in language bindings.
226impl<'a> StdFuture for Future {
227	type Output = ();
228
229	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230		let mut state = self.state.lock().unwrap();
231		if state.complete {
232			state.callbacks_made = true;
233			Poll::Ready(())
234		} else {
235			let waker = cx.waker().clone();
236			state.std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
237			state.std_future_callbacks.push((self.self_idx, StdWaker(waker)));
238			Poll::Pending
239		}
240	}
241}
242
/// A struct which can be used to select across many [`Future`]s at once without relying on a full
/// async context.
#[cfg(feature = "std")]
pub struct Sleeper {
	// The shared states of every `Future` this `Sleeper` was built from; a wait returns once any
	// one of them has completed.
	notifiers: Vec<Arc<Mutex<FutureState>>>,
}
249
#[cfg(feature = "std")]
impl Sleeper {
	/// Constructs a new sleeper from one future, allowing blocking on it.
	pub fn from_single_future(future: &Future) -> Self {
		let only_state = Arc::clone(&future.state);
		Self { notifiers: vec![only_state] }
	}
	/// Constructs a new sleeper from two futures, allowing blocking on both at once.
	pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self {
		let notifiers: Vec<_> = [fut_a, fut_b].iter().map(|fut| Arc::clone(&fut.state)).collect();
		Self { notifiers }
	}
	/// Constructs a new sleeper from three futures, allowing blocking on all three at once.
	///
	// Note that this is the common case - a ChannelManager, a ChainMonitor, and an
	// OnionMessenger.
	pub fn from_three_futures(fut_a: &Future, fut_b: &Future, fut_c: &Future) -> Self {
		let notifiers: Vec<_> =
			[fut_a, fut_b, fut_c].iter().map(|fut| Arc::clone(&fut.state)).collect();
		Self { notifiers }
	}
	/// Constructs a new sleeper from four futures, allowing blocking on all four at once.
	///
	// Note that this is another common case - a ChannelManager, a ChainMonitor, an
	// OnionMessenger, and a LiquidityManager.
	pub fn from_four_futures(
		fut_a: &Future, fut_b: &Future, fut_c: &Future, fut_d: &Future,
	) -> Self {
		let notifiers: Vec<_> =
			[fut_a, fut_b, fut_c, fut_d].iter().map(|fut| Arc::clone(&fut.state)).collect();
		Self { notifiers }
	}
	/// Constructs a new sleeper on many futures, allowing blocking on all at once.
	pub fn new(futures: Vec<Future>) -> Self {
		let mut notifiers = Vec::with_capacity(futures.len());
		for future in futures {
			notifiers.push(Arc::clone(&future.state));
		}
		Self { notifiers }
	}
	/// Registers a wake callback on each future's state and returns the pieces needed to block:
	/// a condition variable, and an `Arc<Mutex<Option<_>>>` slot which is filled with the waking
	/// `Future`'s state before the condition variable is notified.
	///
	/// If a future is found to be complete already, the slot is filled immediately and no further
	/// callbacks are registered.
	//
	// NOTE(review): the callbacks pushed here are only removed when `complete_future` drains
	// them, so repeated waits which time out on the same state appear to accumulate callbacks
	// until the next notification.
	fn setup_wait(&self) -> (Arc<Condvar>, Arc<Mutex<Option<Arc<Mutex<FutureState>>>>>) {
		let condvar = Arc::new(Condvar::new());
		let woken_slot = Arc::new(Mutex::new(None));
		for state_mtx in &self.notifiers {
			let mut state = state_mtx.lock().unwrap();
			if state.complete {
				*woken_slot.lock().unwrap() = Some(Arc::clone(state_mtx));
				break;
			}
			let condvar_for_cb = Arc::clone(&condvar);
			let slot_for_cb = Arc::clone(&woken_slot);
			state.callbacks_with_state.push(Box::new(move |completed_state| {
				*slot_for_cb.lock().unwrap() = Some(Arc::clone(completed_state));
				condvar_for_cb.notify_all();
			}));
		}
		(condvar, woken_slot)
	}

	/// Blocks until one of the [`Future`]s registered with this [`Sleeper`] has completed.
	pub fn wait(&self) {
		let (condvar, woken_slot) = self.setup_wait();
		let mut slot = condvar.wait_while(woken_slot.lock().unwrap(), |slot| slot.is_none()).unwrap();
		let woken_state = slot
			.take()
			.expect("CV wait shouldn't have returned until the notifying future was set");
		// Release the slot before locking the future's state: the wake callback runs with the
		// state locked and then locks the slot, so the two must never be held in this order.
		mem::drop(slot);
		woken_state.lock().unwrap().callbacks_made = true;
	}

	/// Blocks until one of the [`Future`]s registered with this [`Sleeper`] has completed or
	/// `max_wait` has elapsed. Returns true if a [`Future`] completed, false if the time
	/// elapsed.
	pub fn wait_timeout(&self, max_wait: Duration) -> bool {
		let (condvar, woken_slot) = self.setup_wait();
		let woken_state = match condvar
			.wait_timeout_while(woken_slot.lock().unwrap(), max_wait, |slot| slot.is_none())
		{
			Err(_) => panic!("Previous panic while a lock was held led to a lock panic"),
			Ok((_, outcome)) if outcome.timed_out() => return false,
			Ok((mut slot, _)) => slot
				.take()
				.expect("CV wait shouldn't have returned until the notifying future was set"),
		};
		woken_state.lock().unwrap().callbacks_made = true;
		true
	}
}
342
343#[cfg(test)]
344mod tests {
345	use super::*;
346	use core::future::Future as FutureTrait;
347	use core::sync::atomic::{AtomicBool, Ordering};
348	use core::task::{RawWaker, RawWakerVTable};
349
350	#[test]
351	fn notifier_pre_notified_future() {
352		// Previously, if we generated a future after a `Notifier` had been notified, the future
353		// would never complete. This tests this behavior, ensuring the future instead completes
354		// immediately.
355		let notifier = Notifier::new();
356		notifier.notify();
357
358		let callback = Arc::new(AtomicBool::new(false));
359		let callback_ref = Arc::clone(&callback);
360		notifier.get_future().register_callback(Box::new(move || {
361			assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
362		}));
363		assert!(callback.load(Ordering::SeqCst));
364	}
365
366	#[test]
367	fn notifier_future_completes_wake() {
368		// Previously, if we were only using the `Future` interface to learn when a `Notifier` has
369		// been notified, we'd never mark the notifier as not-awaiting-notify. This caused the
370		// `lightning-background-processor` to persist in a tight loop.
371		let notifier = Notifier::new();
372
373		// First check the simple case, ensuring if we get notified a new future isn't woken until
374		// a second `notify`.
375		let callback = Arc::new(AtomicBool::new(false));
376		let callback_ref = Arc::clone(&callback);
377		notifier.get_future().register_callback(Box::new(move || {
378			assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
379		}));
380		assert!(!callback.load(Ordering::SeqCst));
381
382		notifier.notify();
383		assert!(callback.load(Ordering::SeqCst));
384
385		let callback = Arc::new(AtomicBool::new(false));
386		let callback_ref = Arc::clone(&callback);
387		notifier.get_future().register_callback(Box::new(move || {
388			assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
389		}));
390		assert!(!callback.load(Ordering::SeqCst));
391
392		notifier.notify();
393		assert!(callback.load(Ordering::SeqCst));
394
395		// Then check the case where the future is fetched before the notification, but a callback
396		// is only registered after the `notify`, ensuring that it is still sufficient to ensure we
397		// don't get an instant-wake when we get a new future.
398		let future = notifier.get_future();
399		notifier.notify();
400
401		let callback = Arc::new(AtomicBool::new(false));
402		let callback_ref = Arc::clone(&callback);
403		future.register_callback(Box::new(move || {
404			assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
405		}));
406		assert!(callback.load(Ordering::SeqCst));
407
408		let callback = Arc::new(AtomicBool::new(false));
409		let callback_ref = Arc::clone(&callback);
410		notifier.get_future().register_callback(Box::new(move || {
411			assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
412		}));
413		assert!(!callback.load(Ordering::SeqCst));
414	}
415
416	#[test]
417	fn new_future_wipes_notify_bit() {
418		// Previously, if we were only using the `Future` interface to learn when a `Notifier` has
419		// been notified, we'd never mark the notifier as not-awaiting-notify if a `Future` is
420		// fetched after the notify bit has been set.
421		let notifier = Notifier::new();
422		notifier.notify();
423
424		let callback = Arc::new(AtomicBool::new(false));
425		let callback_ref = Arc::clone(&callback);
426		notifier.get_future().register_callback(Box::new(move || {
427			assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
428		}));
429		assert!(callback.load(Ordering::SeqCst));
430
431		let callback = Arc::new(AtomicBool::new(false));
432		let callback_ref = Arc::clone(&callback);
433		notifier.get_future().register_callback(Box::new(move || {
434			assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
435		}));
436		assert!(!callback.load(Ordering::SeqCst));
437
438		notifier.notify();
439		assert!(callback.load(Ordering::SeqCst));
440	}
441
	// Exercises the blocking `wait` and `wait_timeout` paths against a background thread which
	// notifies continuously until told to exit.
	#[cfg(feature = "std")]
	#[test]
	fn test_wait_timeout() {
		use crate::sync::Arc;
		use std::thread;

		let persistence_notifier = Arc::new(Notifier::new());
		let thread_notifier = Arc::clone(&persistence_notifier);

		let exit_thread = Arc::new(AtomicBool::new(false));
		let exit_thread_clone = Arc::clone(&exit_thread);
		// The thread notifies at least once, then keeps notifying until `exit_thread` is set. It
		// is never joined.
		thread::spawn(move || loop {
			thread_notifier.notify();
			if exit_thread_clone.load(Ordering::SeqCst) {
				break;
			}
		});

		// Check that we can block indefinitely until updates are available.
		persistence_notifier.get_future().wait();

		// Check that the Notifier will return after the given duration if updates are
		// available.
		loop {
			if persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
				break;
			}
		}

		exit_thread.store(true, Ordering::SeqCst);

		// Check that the Notifier will return after the given duration even if no updates
		// are available.
		// (Loops because notifications made before the thread observed the exit flag may still be
		// pending.)
		loop {
			if !persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
				break;
			}
		}
	}
481
	#[cfg(feature = "std")]
	#[test]
	fn test_state_drops() {
		// Previously, there was a leak if a `Notifier` was `drop`ed without ever being notified
		// but after having been slept-on. This tests for that leak.
		use crate::sync::Arc;
		use std::thread;

		let notifier_a = Arc::new(Notifier::new());
		let notifier_b = Arc::new(Notifier::new());

		let thread_notifier_a = Arc::clone(&notifier_a);

		// Keep only weak references to the future states, so that we can later check whether
		// they have been freed without keeping them alive ourselves.
		let future_a = notifier_a.get_future();
		let future_state_a = Arc::downgrade(&future_a.state);

		let future_b = notifier_b.get_future();
		let future_state_b = Arc::downgrade(&future_b.state);

		// Only `notifier_a` is ever notified; `notifier_b` is slept-on but never woken.
		let join_handle = thread::spawn(move || {
			// Let the other thread get to the wait point, then notify it.
			std::thread::sleep(Duration::from_millis(50));
			thread_notifier_a.notify();
		});

		// Wait on the other thread to finish its sleep, note that the leak only happened if we
		// actually have to sleep here, not if we immediately return.
		Sleeper::from_two_futures(&future_a, &future_b).wait();

		join_handle.join().unwrap();

		// then drop the notifiers and make sure the future states are gone.
		mem::drop(notifier_a);
		mem::drop(notifier_b);
		mem::drop(future_a);
		mem::drop(future_b);

		// If nothing else still holds a strong reference, neither weak pointer can be upgraded.
		assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none());
	}
521
522	#[test]
523	fn test_future_callbacks() {
524		let future = Future {
525			state: Arc::new(Mutex::new(FutureState {
526				callbacks: Vec::new(),
527				std_future_callbacks: Vec::new(),
528				callbacks_with_state: Vec::new(),
529				complete: false,
530				callbacks_made: false,
531				next_idx: 1,
532			})),
533			self_idx: 0,
534		};
535		let callback = Arc::new(AtomicBool::new(false));
536		let callback_ref = Arc::clone(&callback);
537		future.register_callback(Box::new(move || {
538			assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
539		}));
540
541		assert!(!callback.load(Ordering::SeqCst));
542		complete_future(&future.state);
543		assert!(callback.load(Ordering::SeqCst));
544		complete_future(&future.state);
545	}
546
547	#[test]
548	fn test_pre_completed_future_callbacks() {
549		let future = Future {
550			state: Arc::new(Mutex::new(FutureState {
551				callbacks: Vec::new(),
552				std_future_callbacks: Vec::new(),
553				callbacks_with_state: Vec::new(),
554				complete: false,
555				callbacks_made: false,
556				next_idx: 1,
557			})),
558			self_idx: 0,
559		};
560		complete_future(&future.state);
561
562		let callback = Arc::new(AtomicBool::new(false));
563		let callback_ref = Arc::clone(&callback);
564		future.register_callback(Box::new(move || {
565			assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
566		}));
567
568		assert!(callback.load(Ordering::SeqCst));
569		assert!(future.state.lock().unwrap().callbacks.is_empty());
570	}
571
	// Rather annoyingly, there's no safe way in Rust std to construct a Waker despite it being
	// totally possible to construct from a trait implementation (though somewhat less efficient
	// compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
	// waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
	//
	// Every function below treats the data pointer as pointing at an `Arc<AtomicBool>`.
	const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop);
	// Sets the woken flag, asserting that it was not already set (i.e. that the waker is woken
	// at most once).
	// SAFETY: callers must pass a pointer to a live `Arc<AtomicBool>`.
	unsafe fn wake_by_ref(ptr: *const ()) {
		let p = ptr as *const Arc<AtomicBool>;
		assert!(!(*p).fetch_or(true, Ordering::SeqCst));
	}
	// Frees the `Box<Arc<AtomicBool>>` behind the data pointer. Note that this shadows the
	// prelude's `drop` within this module.
	// SAFETY: callers must pass a pointer obtained from `Box::into_raw` (as `waker_clone` does)
	// which has not already been freed.
	unsafe fn drop(ptr: *const ()) {
		let p = ptr as *mut Arc<AtomicBool>;
		let _freed = Box::from_raw(p);
	}
	// Wakes and then consumes the waker: sets the flag and frees the boxed `Arc`.
	// SAFETY: same requirements as `drop`.
	unsafe fn wake(ptr: *const ()) {
		wake_by_ref(ptr);
		drop(ptr);
	}
	// Builds a new `RawWaker` whose data pointer is a freshly boxed clone of the `Arc`.
	// SAFETY: callers must pass a pointer to a live `Arc<AtomicBool>`; it is only read.
	unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
		let p = ptr as *const Arc<AtomicBool>;
		RawWaker::new(Box::into_raw(Box::new(Arc::clone(&*p))) as *const (), &WAKER_V_TABLE)
	}
593
	// Creates a `Waker` together with the flag which gets set when that `Waker` is woken.
	fn create_waker() -> (Arc<AtomicBool>, Waker) {
		let a = Arc::new(AtomicBool::new(false));
		// SAFETY: `waker_clone` only reads through the pointer to the local `a` for the duration
		// of the call, and the `RawWaker` it returns owns its own boxed clone of the `Arc`, which
		// the functions in `WAKER_V_TABLE` know how to clone and free.
		let waker =
			unsafe { Waker::from_raw(waker_clone((&a as *const Arc<AtomicBool>) as *const ())) };
		(a, waker)
	}
600
	// Checks that two `Future`s sharing one state are both woken by a single completion, and that
	// both then poll `Ready`.
	#[test]
	fn test_future() {
		let mut future = Future {
			state: Arc::new(Mutex::new(FutureState {
				callbacks: Vec::new(),
				std_future_callbacks: Vec::new(),
				callbacks_with_state: Vec::new(),
				complete: false,
				callbacks_made: false,
				next_idx: 2,
			})),
			self_idx: 0,
		};
		// A second future on the same state, using a distinct `self_idx` so that its waker is
		// stored alongside (rather than instead of) the first one's.
		let mut second_future = Future { state: Arc::clone(&future.state), self_idx: 1 };

		// Polling before completion is `Pending` and must not wake anything.
		let (woken, waker) = create_waker();
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
		assert!(!woken.load(Ordering::SeqCst));

		let (second_woken, second_waker) = create_waker();
		assert_eq!(
			Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)),
			Poll::Pending
		);
		assert!(!second_woken.load(Ordering::SeqCst));

		// Completing the shared state wakes both stored wakers, after which both futures are
		// `Ready`.
		complete_future(&future.state);
		assert!(woken.load(Ordering::SeqCst));
		assert!(second_woken.load(Ordering::SeqCst));
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
		assert_eq!(
			Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)),
			Poll::Ready(())
		);
	}
636
	#[test]
	#[cfg(feature = "std")]
	fn test_dropped_future_doesnt_count() {
		// Tests that if a Future gets drop'd before it is poll()ed `Ready` it doesn't count as
		// having been woken, leaving the notify-required flag set.
		let notifier = Notifier::new();
		notifier.notify();

		// If we get a future and don't touch it we're definitely still notify-required.
		notifier.get_future();
		// The first wait observes the pending notification; the second finds nothing pending and
		// times out.
		assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));
		assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));

		// Even if we poll'd once but didn't observe a `Ready`, we should be notify-required.
		let mut future = notifier.get_future();
		let (woken, waker) = create_waker();
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

		// The waker is woken, but as `future` is never polled `Ready` a later wait still sees the
		// notification.
		notifier.notify();
		assert!(woken.load(Ordering::SeqCst));
		assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));

		// However, once we do poll `Ready` it should wipe the notify-required flag.
		let mut future = notifier.get_future();
		let (woken, waker) = create_waker();
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

		notifier.notify();
		assert!(woken.load(Ordering::SeqCst));
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
		assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));
	}
669
	#[test]
	fn test_poll_post_notify_completes() {
		// Tests that if we have a future state that has completed, and we haven't yet requested a
		// new future, if we get a notify prior to requesting that second future it is generated
		// pre-completed.
		let notifier = Notifier::new();

		// A future fetched after a notify is `Ready` on its first poll, without its waker ever
		// being invoked.
		notifier.notify();
		let mut future = notifier.get_future();
		let (woken, waker) = create_waker();
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
		assert!(!woken.load(Ordering::SeqCst));

		// Notifying again before fetching the next future yields another pre-completed future.
		notifier.notify();
		let mut future = notifier.get_future();
		let (woken, waker) = create_waker();
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
		assert!(!woken.load(Ordering::SeqCst));

		// With no notification outstanding, the next future is pending until `notify` is called.
		let mut future = notifier.get_future();
		let (woken, waker) = create_waker();
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
		assert!(!woken.load(Ordering::SeqCst));

		notifier.notify();
		assert!(woken.load(Ordering::SeqCst));
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
	}
698
	#[test]
	fn test_poll_post_notify_completes_initial_notified() {
		// Identical to the previous test, but the first future completes via a wake rather than an
		// immediate `Poll::Ready`.
		let notifier = Notifier::new();

		// The first future is polled while pending, so its waker is stored and then invoked by
		// the `notify` below.
		let mut future = notifier.get_future();
		let (woken, waker) = create_waker();
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

		notifier.notify();
		assert!(woken.load(Ordering::SeqCst));
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));

		// Notifying again before fetching the next future yields a pre-completed future.
		notifier.notify();
		let mut future = notifier.get_future();
		let (woken, waker) = create_waker();
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
		assert!(!woken.load(Ordering::SeqCst));

		// With no notification outstanding, the next future is pending until `notify` is called.
		let mut future = notifier.get_future();
		let (woken, waker) = create_waker();
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
		assert!(!woken.load(Ordering::SeqCst));

		notifier.notify();
		assert!(woken.load(Ordering::SeqCst));
		assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
	}
728
	#[test]
	#[cfg(feature = "std")]
	fn test_multi_future_sleep() {
		// Tests the `Sleeper` with multiple futures.
		let notifier_a = Notifier::new();
		let notifier_b = Notifier::new();

		// Set both notifiers as woken without sleeping yet.
		notifier_a.notify();
		notifier_b.notify();
		// Each `wait` below marks only one woken future's notification as delivered.
		Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();

		// One future has woken us up, but the other should still have a pending notification.
		Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();

		// However once we've slept twice, we should no longer have any pending notifications
		assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
			.wait_timeout(Duration::from_millis(10)));

		// Test ordering somewhat more.
		// A notification on only one of the two notifiers must be enough for `wait` to return.
		notifier_a.notify();
		Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
	}
752
	#[test]
	#[cfg(feature = "std")]
	fn sleeper_with_pending_callbacks() {
		// This is similar to the above `test_multi_future_sleep` test, but in addition registers
		// "normal" callbacks which will cause the futures to assume notification has occurred,
		// rather than waiting for a woken sleeper.
		let notifier_a = Notifier::new();
		let notifier_b = Notifier::new();

		// Set both notifiers as woken without sleeping yet.
		notifier_a.notify();
		notifier_b.notify();

		// After sleeping one future (not guaranteed which one, however) will have its notification
		// bit cleared.
		Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();

		// By registering a callback on the futures for both notifiers, one will complete
		// immediately, but one will remain tied to the notifier, and will complete once the
		// notifier is next woken, which will be considered the completion of the notification.
		let callback_a = Arc::new(AtomicBool::new(false));
		let callback_b = Arc::new(AtomicBool::new(false));
		let callback_a_ref = Arc::clone(&callback_a);
		let callback_b_ref = Arc::clone(&callback_b);
		notifier_a.get_future().register_callback(Box::new(move || {
			assert!(!callback_a_ref.fetch_or(true, Ordering::SeqCst))
		}));
		notifier_b.get_future().register_callback(Box::new(move || {
			assert!(!callback_b_ref.fetch_or(true, Ordering::SeqCst))
		}));
		// Exactly one of the two callbacks must have fired so far, hence the XOR.
		assert!(callback_a.load(Ordering::SeqCst) ^ callback_b.load(Ordering::SeqCst));

		// If we now notify both notifiers again, the other callback will fire, completing the
		// notification, and we'll be back to one pending notification.
		notifier_a.notify();
		notifier_b.notify();

		assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
		// The single remaining notification is consumed by the first wait; the second wait must
		// then time out.
		Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
		assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
			.wait_timeout(Duration::from_millis(10)));
	}
795
	#[test]
	fn multi_poll_stores_single_waker() {
		// When a `Future` is `poll()`ed multiple times, only the last `Waker` should be called,
		// but previously we'd store all `Waker`s until they're all woken at once. This tests a few
		// cases to ensure `Future`s avoid storing an endless set of `Waker`s.
		let notifier = Notifier::new();
		// Hold the shared state directly so that the number of stored `Waker`s can be inspected.
		// As nothing delivers a notification until the end of the test, every `get_future` call
		// below hands out this same state.
		let future_state = Arc::clone(&notifier.get_future().state);
		assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);

		// Test that simply polling a future twice doesn't result in two pending `Waker`s.
		let mut future_a = notifier.get_future();
		assert_eq!(
			Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)),
			Poll::Pending
		);
		assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
		assert_eq!(
			Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)),
			Poll::Pending
		);
		assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);

		// If we poll a second future, however, that will store a second `Waker`.
		let mut future_b = notifier.get_future();
		assert_eq!(
			Pin::new(&mut future_b).poll(&mut Context::from_waker(&create_waker().1)),
			Poll::Pending
		);
		assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 2);

		// but when we drop the `Future`s, the pending Wakers will also be dropped.
		mem::drop(future_a);
		assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
		mem::drop(future_b);
		assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);

		// Further, after polling a future twice, if the notifier is woken all Wakers are dropped.
		let mut future_a = notifier.get_future();
		assert_eq!(
			Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)),
			Poll::Pending
		);
		assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
		assert_eq!(
			Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)),
			Poll::Pending
		);
		assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
		notifier.notify();
		assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
		// Polling after completion returns `Ready` and must not store a new `Waker`.
		assert_eq!(
			Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)),
			Poll::Ready(())
		);
		assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
	}
852}