1use crate::sync::Mutex;
17use alloc::sync::Arc;
18use core::mem;
19
20#[allow(unused_imports)]
21use crate::prelude::*;
22
23#[cfg(feature = "std")]
24use crate::sync::Condvar;
25#[cfg(feature = "std")]
26use std::time::Duration;
27
28use core::future::Future as StdFuture;
29use core::pin::Pin;
30use core::task::{Context, Poll};
31
/// Hands out [`Future`]s through [`Notifier::get_future`] and completes them when
/// [`Notifier::notify`] is called.
pub struct Notifier {
    /// `.0` is set by `notify` when the notification was not consumed by a callback, and is used
    /// as the initial `complete` value of the next freshly-created [`FutureState`].
    /// `.1` is the state shared by the [`Future`]s currently handed out, if any.
    notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
}
42
43impl Notifier {
44 pub fn new() -> Self {
46 Self { notify_pending: Mutex::new((false, None)) }
47 }
48
49 pub fn notify(&self) {
56 let mut lock = self.notify_pending.lock().unwrap();
57 if let Some(future_state) = &lock.1 {
58 if complete_future(future_state) {
59 lock.1 = None;
60 return;
61 }
62 }
63 lock.0 = true;
64 }
65
66 pub fn get_future(&self) -> Future {
68 let mut lock = self.notify_pending.lock().unwrap();
69 let mut self_idx = 0;
70 if let Some(existing_state) = &lock.1 {
71 let mut locked = existing_state.lock().unwrap();
72 if locked.callbacks_made {
73 mem::drop(locked);
76 lock.1.take();
77 lock.0 = false;
78 } else {
79 self_idx = locked.next_idx;
80 locked.next_idx += 1;
81 }
82 }
83 if let Some(existing_state) = &lock.1 {
84 Future { state: Arc::clone(&existing_state), self_idx }
85 } else {
86 let state = Arc::new(Mutex::new(FutureState {
87 callbacks: Vec::new(),
88 std_future_callbacks: Vec::new(),
89 callbacks_with_state: Vec::new(),
90 complete: lock.0,
91 callbacks_made: false,
92 next_idx: 1,
93 }));
94 lock.1 = Some(Arc::clone(&state));
95 Future { state, self_idx: 0 }
96 }
97 }
98
99 #[cfg(any(test, feature = "_test_utils"))]
100 pub fn notify_pending(&self) -> bool {
101 self.notify_pending.lock().unwrap().0
102 }
103}
104
// Generates the `FutureCallback` trait together with a blanket impl for closures. `$bounds`
// lists extra supertraits required of every callback; the invocations below pass `Send` when the
// `std` feature is enabled and nothing otherwise.
macro_rules! define_callback { ($($bounds: path),*) => {
/// A callback which is called when a [`Future`] completes, or immediately on registration if the
/// [`Future`] had already completed.
///
#[cfg_attr(feature = "std", doc = "Rust users should use the [`std::future::Future`] implementation for [`Future`] instead.")]
#[cfg_attr(feature = "std", doc = "")]
#[cfg_attr(feature = "std", doc = "Note that the [`std::future::Future`] implementation may only work for runtimes which schedule futures when they receive a wake, rather than immediately executing them.")]
pub trait FutureCallback : $($bounds +)* {
    /// The method which is invoked to run the callback.
    fn call(&self);
}

// Any zero-argument closure satisfying the bounds can be used as a callback directly.
impl<F: Fn() $(+ $bounds)*> FutureCallback for F {
    fn call(&self) { (self)(); }
}
} }

// With `std`, callbacks must be `Send`.
#[cfg(feature = "std")]
define_callback!(Send);
// Without `std`, no extra bound is placed on callbacks.
#[cfg(not(feature = "std"))]
define_callback!();
127
128pub(crate) struct FutureState {
129 callbacks: Vec<Box<dyn FutureCallback>>,
134 std_future_callbacks: Vec<(usize, StdWaker)>,
135 callbacks_with_state: Vec<Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>>,
136 complete: bool,
137 callbacks_made: bool,
138 next_idx: usize,
139}
140
141fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
142 let mut state_lock = this.lock().unwrap();
143 let state = &mut *state_lock;
144 for callback in state.callbacks.drain(..) {
145 callback.call();
146 state.callbacks_made = true;
147 }
148 for (_, waker) in state.std_future_callbacks.drain(..) {
149 waker.0.wake();
150 }
151 for callback in state.callbacks_with_state.drain(..) {
152 (callback)(this);
153 }
154 state.complete = true;
155 state.callbacks_made
156}
157
/// A simple future which completes once its shared state is marked complete, and which can call
/// registered callbacks, wake stored wakers, or unblock a [`Sleeper`] when it does so.
pub struct Future {
    /// State shared with every other `Future` handed out for the same notification.
    state: Arc<Mutex<FutureState>>,
    /// Index identifying this instance's entry in `FutureState::std_future_callbacks`.
    self_idx: usize,
}
163
164impl Future {
165 pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
170 let mut state = self.state.lock().unwrap();
171 if state.complete {
172 state.callbacks_made = true;
173 mem::drop(state);
174 callback.call();
175 } else {
176 state.callbacks.push(callback);
177 }
178 }
179
180 #[cfg(c_bindings)]
186 pub fn register_callback_fn<F: 'static + FutureCallback>(&self, callback: F) {
187 self.register_callback(Box::new(callback));
188 }
189
190 #[cfg(feature = "std")]
192 pub fn wait(&self) {
193 Sleeper::from_single_future(&self).wait();
194 }
195
196 #[cfg(feature = "std")]
200 pub fn wait_timeout(&self, max_wait: Duration) -> bool {
201 Sleeper::from_single_future(&self).wait_timeout(max_wait)
202 }
203
204 #[cfg(test)]
205 pub fn poll_is_complete(&self) -> bool {
206 let mut state = self.state.lock().unwrap();
207 if state.complete {
208 state.callbacks_made = true;
209 true
210 } else {
211 false
212 }
213 }
214}
215
216impl Drop for Future {
217 fn drop(&mut self) {
218 self.state.lock().unwrap().std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
219 }
220}
221
222use core::task::Waker;
/// Newtype around a [`Waker`], as stored in `FutureState::std_future_callbacks`.
struct StdWaker(pub Waker);
224
225impl<'a> StdFuture for Future {
227 type Output = ();
228
229 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230 let mut state = self.state.lock().unwrap();
231 if state.complete {
232 state.callbacks_made = true;
233 Poll::Ready(())
234 } else {
235 let waker = cx.waker().clone();
236 state.std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
237 state.std_future_callbacks.push((self.self_idx, StdWaker(waker)));
238 Poll::Pending
239 }
240 }
241}
242
/// Blocks the current thread until any one of a set of [`Future`]s completes.
#[cfg(feature = "std")]
pub struct Sleeper {
    /// The shared states of the futures being waited on.
    notifiers: Vec<Arc<Mutex<FutureState>>>,
}
249
#[cfg(feature = "std")]
impl Sleeper {
    /// Constructs a new sleeper from one future, allowing blocking on it.
    pub fn from_single_future(future: &Future) -> Self {
        Self { notifiers: vec![Arc::clone(&future.state)] }
    }

    /// Constructs a new sleeper from two futures, allowing blocking on both at once.
    pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self {
        Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] }
    }

    /// Constructs a new sleeper from three futures, allowing blocking on all three at once.
    pub fn from_three_futures(fut_a: &Future, fut_b: &Future, fut_c: &Future) -> Self {
        let notifiers =
            vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state), Arc::clone(&fut_c.state)];
        Self { notifiers }
    }

    /// Constructs a new sleeper from four futures, allowing blocking on all four at once.
    pub fn from_four_futures(
        fut_a: &Future, fut_b: &Future, fut_c: &Future, fut_d: &Future,
    ) -> Self {
        let notifiers = vec![
            Arc::clone(&fut_a.state),
            Arc::clone(&fut_b.state),
            Arc::clone(&fut_c.state),
            Arc::clone(&fut_d.state),
        ];
        Self { notifiers }
    }

    /// Constructs a new sleeper on many futures, allowing blocking on all at once.
    pub fn new(futures: Vec<Future>) -> Self {
        Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() }
    }

    /// Prepares to block on the tracked futures.
    ///
    /// Returns a condition variable plus the mutex-protected slot that will hold the state of
    /// whichever future completed. If a future is already complete the slot is filled right away
    /// and no further callbacks are registered; otherwise a state-aware callback is registered on
    /// each future which fills the slot and notifies the condition variable.
    fn setup_wait(&self) -> (Arc<Condvar>, Arc<Mutex<Option<Arc<Mutex<FutureState>>>>>) {
        let cv = Arc::new(Condvar::new());
        let notified_fut_mtx = Arc::new(Mutex::new(None));
        for notifier_mtx in self.notifiers.iter() {
            let cv_ref = Arc::clone(&cv);
            let notified_fut_ref = Arc::clone(&notified_fut_mtx);
            let mut notifier = notifier_mtx.lock().unwrap();
            if notifier.complete {
                *notified_fut_mtx.lock().unwrap() = Some(Arc::clone(notifier_mtx));
                break;
            }
            notifier.callbacks_with_state.push(Box::new(move |notifier_ref| {
                *notified_fut_ref.lock().unwrap() = Some(Arc::clone(notifier_ref));
                cv_ref.notify_all();
            }));
        }
        (cv, notified_fut_mtx)
    }

    /// Waits until one of the [`Future`]s registered with this sleeper has completed, then marks
    /// that future's completion as observed.
    pub fn wait(&self) {
        let (cv, notified_fut_mtx) = self.setup_wait();
        // The guard returned by `wait_while` is a temporary that is released at the end of this
        // statement, i.e. before the completed future's own state is locked below.
        let notified_fut = cv
            .wait_while(notified_fut_mtx.lock().unwrap(), |fut_opt| fut_opt.is_none())
            .unwrap()
            .take()
            .expect("CV wait shouldn't have returned until the notifying future was set");
        notified_fut.lock().unwrap().callbacks_made = true;
    }

    /// Waits until one of the [`Future`]s registered with this sleeper has completed or until
    /// `max_wait` has elapsed.
    ///
    /// Returns `true` if a future completed (and marks that completion as observed) and `false`
    /// on timeout.
    pub fn wait_timeout(&self, max_wait: Duration) -> bool {
        let (cv, notified_fut_mtx) = self.setup_wait();
        let notified_fut =
            match cv.wait_timeout_while(notified_fut_mtx.lock().unwrap(), max_wait, |fut_opt| {
                fut_opt.is_none()
            }) {
                Ok((_, e)) if e.timed_out() => return false,
                Ok((mut notified_fut, _)) => notified_fut
                    .take()
                    .expect("CV wait shouldn't have returned until the notifying future was set"),
                Err(_) => panic!("Previous panic while a lock was held led to a lock panic"),
            };
        notified_fut.lock().unwrap().callbacks_made = true;
        true
    }
}
342
#[cfg(test)]
mod tests {
    use super::*;
    use core::future::Future as FutureTrait;
    use core::sync::atomic::{AtomicBool, Ordering};
    use core::task::{RawWaker, RawWakerVTable};

    #[test]
    fn notifier_pre_notified_future() {
        // A notify() which happens before the future is fetched must leave that future complete,
        // so a callback registered on it runs synchronously.
        let notifier = Notifier::new();
        notifier.notify();

        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || {
            assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
        }));
        assert!(callback.load(Ordering::SeqCst));
    }

    #[test]
    fn notifier_future_completes_wake() {
        let notifier = Notifier::new();

        // A callback registered before any notification only fires once notify() is called.
        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || {
            assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
        }));
        assert!(!callback.load(Ordering::SeqCst));

        notifier.notify();
        assert!(callback.load(Ordering::SeqCst));

        // The same holds for a second future fetched after the first one completed.
        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || {
            assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
        }));
        assert!(!callback.load(Ordering::SeqCst));

        notifier.notify();
        assert!(callback.load(Ordering::SeqCst));

        // If the notification arrives while a future is held but before a callback has been
        // registered on it, the callback fires as soon as it is registered...
        let future = notifier.get_future();
        notifier.notify();

        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        future.register_callback(Box::new(move || {
            assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
        }));
        assert!(callback.load(Ordering::SeqCst));

        // ...and that consumes the notification: the next future starts out pending.
        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || {
            assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
        }));
        assert!(!callback.load(Ordering::SeqCst));
    }

    #[test]
    fn new_future_wipes_notify_bit() {
        // Once a pre-notified future has had its callback called, fetching a new future must
        // clear the pending notification so that the new future is not complete.
        let notifier = Notifier::new();
        notifier.notify();

        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || {
            assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
        }));
        assert!(callback.load(Ordering::SeqCst));

        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || {
            assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
        }));
        assert!(!callback.load(Ordering::SeqCst));

        notifier.notify();
        assert!(callback.load(Ordering::SeqCst));
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_wait_timeout() {
        use crate::sync::Arc;
        use std::thread;

        let persistence_notifier = Arc::new(Notifier::new());
        let thread_notifier = Arc::clone(&persistence_notifier);

        // Background thread which notifies in a tight loop until asked to exit.
        let exit_thread = Arc::new(AtomicBool::new(false));
        let exit_thread_clone = Arc::clone(&exit_thread);
        thread::spawn(move || loop {
            thread_notifier.notify();
            if exit_thread_clone.load(Ordering::SeqCst) {
                break;
            }
        });

        // Check that we can block indefinitely until notified.
        persistence_notifier.get_future().wait();

        // Check that a bounded wait returns `true` while notifications keep arriving.
        loop {
            if persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
                break;
            }
        }

        exit_thread.store(true, Ordering::SeqCst);

        // Once the notifying thread has stopped, a bounded wait must eventually time out and
        // return `false`.
        loop {
            if !persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
                break;
            }
        }
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_state_drops() {
        // Checks that no reference to a future state is left behind after a `Sleeper` has waited
        // on two futures of which only one completed.
        use crate::sync::Arc;
        use std::thread;

        let notifier_a = Arc::new(Notifier::new());
        let notifier_b = Arc::new(Notifier::new());

        let thread_notifier_a = Arc::clone(&notifier_a);

        let future_a = notifier_a.get_future();
        let future_state_a = Arc::downgrade(&future_a.state);

        let future_b = notifier_b.get_future();
        let future_state_b = Arc::downgrade(&future_b.state);

        let join_handle = thread::spawn(move || {
            // Give the main thread time to start waiting before notifying.
            std::thread::sleep(Duration::from_millis(50));
            thread_notifier_a.notify();
        });

        // Wait on both futures; only `future_a` is ever completed.
        Sleeper::from_two_futures(&future_a, &future_b).wait();

        join_handle.join().unwrap();

        // Drop every owner we hold...
        mem::drop(notifier_a);
        mem::drop(notifier_b);
        mem::drop(future_a);
        mem::drop(future_b);

        // ...after which neither state may still be alive.
        assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none());
    }

    #[test]
    fn test_future_callbacks() {
        let future = Future {
            state: Arc::new(Mutex::new(FutureState {
                callbacks: Vec::new(),
                std_future_callbacks: Vec::new(),
                callbacks_with_state: Vec::new(),
                complete: false,
                callbacks_made: false,
                next_idx: 1,
            })),
            self_idx: 0,
        };
        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        future.register_callback(Box::new(move || {
            assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
        }));

        assert!(!callback.load(Ordering::SeqCst));
        complete_future(&future.state);
        assert!(callback.load(Ordering::SeqCst));
        // Completing again must not call the callback a second time (it asserts on re-entry).
        complete_future(&future.state);
    }

    #[test]
    fn test_pre_completed_future_callbacks() {
        let future = Future {
            state: Arc::new(Mutex::new(FutureState {
                callbacks: Vec::new(),
                std_future_callbacks: Vec::new(),
                callbacks_with_state: Vec::new(),
                complete: false,
                callbacks_made: false,
                next_idx: 1,
            })),
            self_idx: 0,
        };
        complete_future(&future.state);

        // A callback registered after completion runs immediately and is not stored.
        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        future.register_callback(Box::new(move || {
            assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))
        }));

        assert!(callback.load(Ordering::SeqCst));
        assert!(future.state.lock().unwrap().callbacks.is_empty());
    }

    // A minimal waker for tests: its data pointer is a leaked `Box<Arc<AtomicBool>>`, and waking
    // it sets the flag (asserting that it had not been set before).
    const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop);
    // SAFETY (caller contract): `ptr` must point to a live `Arc<AtomicBool>`.
    unsafe fn wake_by_ref(ptr: *const ()) {
        let p = ptr as *const Arc<AtomicBool>;
        assert!(!(*p).fetch_or(true, Ordering::SeqCst));
    }
    // SAFETY (caller contract): `ptr` must have come from `Box::into_raw` in `waker_clone` and
    // must not be used again afterwards.
    unsafe fn drop(ptr: *const ()) {
        let p = ptr as *mut Arc<AtomicBool>;
        let _freed = Box::from_raw(p);
    }
    // SAFETY (caller contract): same as `drop`; waking by value consumes the waker.
    unsafe fn wake(ptr: *const ()) {
        wake_by_ref(ptr);
        drop(ptr);
    }
    // SAFETY (caller contract): `ptr` must point to a live `Arc<AtomicBool>`. The clone gets its
    // own boxed `Arc`, which is freed by `drop`.
    unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
        let p = ptr as *const Arc<AtomicBool>;
        RawWaker::new(Box::into_raw(Box::new(Arc::clone(&*p))) as *const (), &WAKER_V_TABLE)
    }

    /// Returns a "woken" flag together with a `Waker` which sets it.
    fn create_waker() -> (Arc<AtomicBool>, Waker) {
        let a = Arc::new(AtomicBool::new(false));
        // SAFETY: `a` is alive for the duration of `waker_clone`, which boxes its own clone of the
        // `Arc`, so the resulting waker never refers to the local `a`.
        let waker =
            unsafe { Waker::from_raw(waker_clone((&a as *const Arc<AtomicBool>) as *const ())) };
        (a, waker)
    }

    #[test]
    fn test_future() {
        // Two futures sharing one state must both be woken on completion.
        let mut future = Future {
            state: Arc::new(Mutex::new(FutureState {
                callbacks: Vec::new(),
                std_future_callbacks: Vec::new(),
                callbacks_with_state: Vec::new(),
                complete: false,
                callbacks_made: false,
                next_idx: 2,
            })),
            self_idx: 0,
        };
        let mut second_future = Future { state: Arc::clone(&future.state), self_idx: 1 };

        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
        assert!(!woken.load(Ordering::SeqCst));

        let (second_woken, second_waker) = create_waker();
        assert_eq!(
            Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)),
            Poll::Pending
        );
        assert!(!second_woken.load(Ordering::SeqCst));

        complete_future(&future.state);
        assert!(woken.load(Ordering::SeqCst));
        assert!(second_woken.load(Ordering::SeqCst));
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        assert_eq!(
            Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)),
            Poll::Ready(())
        );
    }

    #[test]
    #[cfg(feature = "std")]
    fn test_dropped_future_doesnt_count() {
        // A future which is dropped before it observed completion must not consume the pending
        // notification.
        let notifier = Notifier::new();
        notifier.notify();

        // Fetching a future and dropping it untouched leaves the notification pending, so the
        // next wait succeeds and only the one after that times out.
        notifier.get_future();
        assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));
        assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));

        // Polling once without observing `Ready` and then being woken does not consume the
        // notification either.
        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

        notifier.notify();
        assert!(woken.load(Ordering::SeqCst));
        assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));

        // Once a poll has returned `Ready`, however, the notification is consumed.
        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

        notifier.notify();
        assert!(woken.load(Ordering::SeqCst));
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));
    }

    #[test]
    fn test_poll_post_notify_completes() {
        // A future fetched after notify() is immediately `Ready`, and polling it to `Ready`
        // consumes the notification so that later futures wait for a new one.
        let notifier = Notifier::new();

        notifier.notify();
        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        assert!(!woken.load(Ordering::SeqCst));

        notifier.notify();
        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        assert!(!woken.load(Ordering::SeqCst));

        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
        assert!(!woken.load(Ordering::SeqCst));

        notifier.notify();
        assert!(woken.load(Ordering::SeqCst));
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
    }

    #[test]
    fn test_poll_post_notify_completes_initial_notified() {
        // Same as `test_poll_post_notify_completes`, but the first future is polled `Pending`
        // before the first notification arrives.
        let notifier = Notifier::new();

        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

        notifier.notify();
        assert!(woken.load(Ordering::SeqCst));
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));

        notifier.notify();
        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        assert!(!woken.load(Ordering::SeqCst));

        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
        assert!(!woken.load(Ordering::SeqCst));

        notifier.notify();
        assert!(woken.load(Ordering::SeqCst));
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
    }

    #[test]
    #[cfg(feature = "std")]
    fn test_multi_future_sleep() {
        // Tests the `Sleeper` with multiple futures.
        let notifier_a = Notifier::new();
        let notifier_b = Notifier::new();

        // Notify both before anything is sleeping.
        notifier_a.notify();
        notifier_b.notify();
        Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();

        // The first wait consumed only one of the two notifications, so a second wait still
        // returns.
        Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();

        // After two waits neither notifier has a pending notification left.
        assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
            .wait_timeout(Duration::from_millis(10)));

        // A single fresh notification is enough to wake the sleeper again.
        notifier_a.notify();
        Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
    }

    #[test]
    #[cfg(feature = "std")]
    fn sleeper_with_pending_callbacks() {
        // Checks how a `Sleeper` interacts with callbacks registered on the same notifiers.
        let notifier_a = Notifier::new();
        let notifier_b = Notifier::new();

        // Notify both before anything is sleeping.
        notifier_a.notify();
        notifier_b.notify();

        // This wait consumes exactly one of the two pending notifications.
        Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();

        // Hence exactly one of the two callbacks registered now fires immediately.
        let callback_a = Arc::new(AtomicBool::new(false));
        let callback_b = Arc::new(AtomicBool::new(false));
        let callback_a_ref = Arc::clone(&callback_a);
        let callback_b_ref = Arc::clone(&callback_b);
        notifier_a.get_future().register_callback(Box::new(move || {
            assert!(!callback_a_ref.fetch_or(true, Ordering::SeqCst))
        }));
        notifier_b.get_future().register_callback(Box::new(move || {
            assert!(!callback_b_ref.fetch_or(true, Ordering::SeqCst))
        }));
        assert!(callback_a.load(Ordering::SeqCst) ^ callback_b.load(Ordering::SeqCst));

        // After notifying both again, both callbacks have fired (each asserts that it runs only
        // once).
        notifier_a.notify();
        notifier_b.notify();

        assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
        // One more wait still returns, after which nothing is left pending.
        Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
        assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
            .wait_timeout(Duration::from_millis(10)));
    }

    #[test]
    fn multi_poll_stores_single_waker() {
        // Polling the same `Future` repeatedly must store only one waker for it, and dropping
        // the `Future` must remove that waker again.
        let notifier = Notifier::new();
        let future_state = Arc::clone(&notifier.get_future().state);
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);

        // Polling `future_a` twice leaves a single stored waker.
        let mut future_a = notifier.get_future();
        assert_eq!(
            Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)),
            Poll::Pending
        );
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
        assert_eq!(
            Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)),
            Poll::Pending
        );
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);

        // A second, distinct future adds its own waker.
        let mut future_b = notifier.get_future();
        assert_eq!(
            Pin::new(&mut future_b).poll(&mut Context::from_waker(&create_waker().1)),
            Poll::Pending
        );
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 2);

        // Dropping each future removes exactly its own waker.
        mem::drop(future_a);
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
        mem::drop(future_b);
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);

        // A new future behaves the same way, and notify() drains the stored waker.
        let mut future_a = notifier.get_future();
        assert_eq!(
            Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)),
            Poll::Pending
        );
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
        assert_eq!(
            Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)),
            Poll::Pending
        );
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
        notifier.notify();
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
        assert_eq!(
            Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)),
            Poll::Ready(())
        );
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
    }
}