1use alloc::sync::Arc;
17use core::mem;
18use crate::sync::Mutex;
19
20#[allow(unused_imports)]
21use crate::prelude::*;
22
23#[cfg(feature = "std")]
24use crate::sync::Condvar;
25#[cfg(feature = "std")]
26use std::time::Duration;
27
28use core::future::Future as StdFuture;
29use core::task::{Context, Poll};
30use core::pin::Pin;
31
32
33pub(crate) struct Notifier {
35 notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
36}
37
38impl Notifier {
39 pub(crate) fn new() -> Self {
40 Self {
41 notify_pending: Mutex::new((false, None)),
42 }
43 }
44
45 pub(crate) fn notify(&self) {
47 let mut lock = self.notify_pending.lock().unwrap();
48 if let Some(future_state) = &lock.1 {
49 if complete_future(future_state) {
50 lock.1 = None;
51 return;
52 }
53 }
54 lock.0 = true;
55 }
56
57 pub(crate) fn get_future(&self) -> Future {
59 let mut lock = self.notify_pending.lock().unwrap();
60 let mut self_idx = 0;
61 if let Some(existing_state) = &lock.1 {
62 let mut locked = existing_state.lock().unwrap();
63 if locked.callbacks_made {
64 mem::drop(locked);
67 lock.1.take();
68 lock.0 = false;
69 } else {
70 self_idx = locked.next_idx;
71 locked.next_idx += 1;
72 }
73 }
74 if let Some(existing_state) = &lock.1 {
75 Future { state: Arc::clone(&existing_state), self_idx }
76 } else {
77 let state = Arc::new(Mutex::new(FutureState {
78 callbacks: Vec::new(),
79 std_future_callbacks: Vec::new(),
80 callbacks_with_state: Vec::new(),
81 complete: lock.0,
82 callbacks_made: false,
83 next_idx: 1,
84 }));
85 lock.1 = Some(Arc::clone(&state));
86 Future { state, self_idx: 0 }
87 }
88 }
89
90 #[cfg(any(test, feature = "_test_utils"))]
91 pub fn notify_pending(&self) -> bool {
92 self.notify_pending.lock().unwrap().0
93 }
94}
95
// Declares the `FutureCallback` trait with the given supertrait bounds, plus a blanket
// implementation for every `Fn()` closure satisfying those bounds.
macro_rules! define_callback { ($($bound: path),*) => {
/// A callback which is invoked when a [`Future`] completes.
///
#[cfg_attr(feature = "std", doc = "Rust users should use the [`std::future::Future`] implementation for [`Future`] instead.")]
#[cfg_attr(feature = "std", doc = "")]
#[cfg_attr(feature = "std", doc = "Note that the [`std::future::Future`] implementation may only work for runtimes which schedule futures when they receive a wake, rather than immediately executing them.")]
pub trait FutureCallback : $($bound +)* {
    /// Runs the callback.
    fn call(&self);
}

impl<F> FutureCallback for F
where
    F: Fn() $(+ $bound)*,
{
    fn call(&self) { (*self)(); }
}
} }

// With `std`, callbacks must be `Send`.
#[cfg(feature = "std")]
define_callback!(Send);
// Without `std`, no extra bound is required.
#[cfg(not(feature = "std"))]
define_callback!();
118
119pub(crate) struct FutureState {
120 callbacks: Vec<Box<dyn FutureCallback>>,
125 std_future_callbacks: Vec<(usize, StdWaker)>,
126 callbacks_with_state: Vec<Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>>,
127 complete: bool,
128 callbacks_made: bool,
129 next_idx: usize,
130}
131
132fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
133 let mut state_lock = this.lock().unwrap();
134 let state = &mut *state_lock;
135 for callback in state.callbacks.drain(..) {
136 callback.call();
137 state.callbacks_made = true;
138 }
139 for (_, waker) in state.std_future_callbacks.drain(..) {
140 waker.0.wake_by_ref();
141 }
142 for callback in state.callbacks_with_state.drain(..) {
143 (callback)(this);
144 }
145 state.complete = true;
146 state.callbacks_made
147}
148
149pub struct Future {
151 state: Arc<Mutex<FutureState>>,
152 self_idx: usize,
153}
154
155impl Future {
156 pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
161 let mut state = self.state.lock().unwrap();
162 if state.complete {
163 state.callbacks_made = true;
164 mem::drop(state);
165 callback.call();
166 } else {
167 state.callbacks.push(callback);
168 }
169 }
170
171 #[cfg(c_bindings)]
177 pub fn register_callback_fn<F: 'static + FutureCallback>(&self, callback: F) {
178 self.register_callback(Box::new(callback));
179 }
180
181 #[cfg(feature = "std")]
183 pub fn wait(&self) {
184 Sleeper::from_single_future(&self).wait();
185 }
186
187 #[cfg(feature = "std")]
191 pub fn wait_timeout(&self, max_wait: Duration) -> bool {
192 Sleeper::from_single_future(&self).wait_timeout(max_wait)
193 }
194
195 #[cfg(test)]
196 pub fn poll_is_complete(&self) -> bool {
197 let mut state = self.state.lock().unwrap();
198 if state.complete {
199 state.callbacks_made = true;
200 true
201 } else { false }
202 }
203}
204
205impl Drop for Future {
206 fn drop(&mut self) {
207 self.state.lock().unwrap().std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
208 }
209}
210
211use core::task::Waker;
212struct StdWaker(pub Waker);
213
214impl<'a> StdFuture for Future {
216 type Output = ();
217
218 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
219 let mut state = self.state.lock().unwrap();
220 if state.complete {
221 state.callbacks_made = true;
222 Poll::Ready(())
223 } else {
224 let waker = cx.waker().clone();
225 state.std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
226 state.std_future_callbacks.push((self.self_idx, StdWaker(waker)));
227 Poll::Pending
228 }
229 }
230}
231
/// Blocks the current thread until any one of a set of [`Future`]s completes.
#[cfg(feature = "std")]
pub struct Sleeper {
    /// Shared states of the futures being waited on.
    notifiers: Vec<Arc<Mutex<FutureState>>>,
}

#[cfg(feature = "std")]
impl Sleeper {
    /// Builds a sleeper which waits on a single future.
    pub fn from_single_future(future: &Future) -> Self {
        Self { notifiers: vec![Arc::clone(&future.state)] }
    }

    /// Builds a sleeper which waits on either of two futures.
    pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self {
        Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] }
    }

    /// Builds a sleeper which waits on any of three futures.
    pub fn from_three_futures(fut_a: &Future, fut_b: &Future, fut_c: &Future) -> Self {
        let notifiers = vec![
            Arc::clone(&fut_a.state),
            Arc::clone(&fut_b.state),
            Arc::clone(&fut_c.state)
        ];
        Self { notifiers }
    }

    /// Builds a sleeper which waits on any of the given futures.
    ///
    /// Only the shared states are retained; the `Future` values themselves are dropped.
    pub fn new(futures: Vec<Future>) -> Self {
        Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() }
    }

    /// Prepares a condition variable and a slot which will hold the state that completed.
    ///
    /// If a state is already complete, it is stored in the slot right away and no callbacks
    /// are registered on the remaining states. Otherwise a callback is registered on each
    /// state which fills the slot and notifies the condition variable.
    fn setup_wait(&self) -> (Arc<Condvar>, Arc<Mutex<Option<Arc<Mutex<FutureState>>>>>) {
        let cv = Arc::new(Condvar::new());
        let notified_fut_mtx = Arc::new(Mutex::new(None));
        {
            for notifier_mtx in self.notifiers.iter() {
                let cv_ref = Arc::clone(&cv);
                let notified_fut_ref = Arc::clone(&notified_fut_mtx);
                let mut notifier = notifier_mtx.lock().unwrap();
                if notifier.complete {
                    *notified_fut_mtx.lock().unwrap() = Some(Arc::clone(&notifier_mtx));
                    break;
                }
                notifier.callbacks_with_state.push(Box::new(move |notifier_ref| {
                    *notified_fut_ref.lock().unwrap() = Some(Arc::clone(notifier_ref));
                    cv_ref.notify_all();
                }));
            }
        }
        (cv, notified_fut_mtx)
    }

    /// Blocks until one of the futures completes, then records that completion as delivered.
    pub fn wait(&self) {
        let (cv, notified_fut_mtx) = self.setup_wait();
        let notified_fut = cv.wait_while(notified_fut_mtx.lock().unwrap(), |fut_opt| fut_opt.is_none())
            .unwrap().take().expect("CV wait shouldn't have returned until the notifying future was set");
        notified_fut.lock().unwrap().callbacks_made = true;
    }

    /// Blocks until one of the futures completes or `max_wait` elapses.
    ///
    /// Returns `true` if a future completed (recording that completion as delivered) and
    /// `false` on timeout.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock was poisoned.
    pub fn wait_timeout(&self, max_wait: Duration) -> bool {
        let (cv, notified_fut_mtx) = self.setup_wait();
        let notified_fut =
            match cv.wait_timeout_while(notified_fut_mtx.lock().unwrap(), max_wait, |fut_opt| fut_opt.is_none()) {
                Ok((_, e)) if e.timed_out() => return false,
                Ok((mut notified_fut, _)) =>
                    notified_fut.take().expect("CV wait shouldn't have returned until the notifying future was set"),
                Err(_) => panic!("Previous panic while a lock was held led to a lock panic"),
            };
        notified_fut.lock().unwrap().callbacks_made = true;
        true
    }
}
313
#[cfg(test)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicBool, Ordering};
    use core::future::Future as FutureTrait;
    use core::task::{RawWaker, RawWakerVTable};

    #[test]
    fn notifier_pre_notified_future() {
        // A notification sent before any future exists must complete the next future.
        let notifier = Notifier::new();
        notifier.notify();

        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
        assert!(callback.load(Ordering::SeqCst));
    }

    #[test]
    fn notifier_future_completes_wake() {
        // Each notification must fire the callback registered before it exactly once.
        let notifier = Notifier::new();

        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
        assert!(!callback.load(Ordering::SeqCst));

        notifier.notify();
        assert!(callback.load(Ordering::SeqCst));

        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
        assert!(!callback.load(Ordering::SeqCst));

        notifier.notify();
        assert!(callback.load(Ordering::SeqCst));

        // A callback registered after the notification runs immediately.
        let future = notifier.get_future();
        notifier.notify();

        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
        assert!(callback.load(Ordering::SeqCst));

        // The notification was consumed above, so a new future starts out incomplete.
        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
        assert!(!callback.load(Ordering::SeqCst));
    }

    #[test]
    fn new_future_wipes_notify_bit() {
        // Once a callback was made for a notification, later futures must not see it again.
        let notifier = Notifier::new();
        notifier.notify();

        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
        assert!(callback.load(Ordering::SeqCst));

        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
        assert!(!callback.load(Ordering::SeqCst));

        notifier.notify();
        assert!(callback.load(Ordering::SeqCst));
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_wait_timeout() {
        use crate::sync::Arc;
        use std::thread;

        let persistence_notifier = Arc::new(Notifier::new());
        let thread_notifier = Arc::clone(&persistence_notifier);

        let exit_thread = Arc::new(AtomicBool::new(false));
        let exit_thread_clone = exit_thread.clone();
        // Background thread which notifies continuously until told to exit.
        thread::spawn(move || {
            loop {
                thread_notifier.notify();
                if exit_thread_clone.load(Ordering::SeqCst) {
                    break
                }
            }
        });

        // Blocks until the background thread has notified at least once.
        let _ = persistence_notifier.get_future().wait();

        // While the thread is notifying, a timed wait must eventually succeed.
        loop {
            if persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
                break
            }
        }

        exit_thread.store(true, Ordering::SeqCst);

        // After the thread has been told to stop, a timed wait must eventually time out.
        loop {
            if !persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
                break
            }
        }
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_state_drops() {
        // The shared states must be freed once their notifiers and futures are gone, even
        // though the sleeper registered callbacks on both of them.
        use crate::sync::Arc;
        use std::thread;

        let notifier_a = Arc::new(Notifier::new());
        let notifier_b = Arc::new(Notifier::new());

        let thread_notifier_a = Arc::clone(&notifier_a);

        let future_a = notifier_a.get_future();
        let future_state_a = Arc::downgrade(&future_a.state);

        let future_b = notifier_b.get_future();
        let future_state_b = Arc::downgrade(&future_b.state);

        let join_handle = thread::spawn(move || {
            // Give the main thread time to start sleeping before notifying.
            std::thread::sleep(Duration::from_millis(50));
            thread_notifier_a.notify();
        });

        Sleeper::from_two_futures(&future_a, &future_b).wait();

        join_handle.join().unwrap();

        mem::drop(notifier_a);
        mem::drop(notifier_b);
        mem::drop(future_a);
        mem::drop(future_b);

        assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none());
    }

    #[test]
    fn test_future_callbacks() {
        let future = Future {
            state: Arc::new(Mutex::new(FutureState {
                callbacks: Vec::new(),
                std_future_callbacks: Vec::new(),
                callbacks_with_state: Vec::new(),
                complete: false,
                callbacks_made: false,
                next_idx: 1,
            })),
            self_idx: 0,
        };
        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));

        assert!(!callback.load(Ordering::SeqCst));
        complete_future(&future.state);
        assert!(callback.load(Ordering::SeqCst));
        // Completing again must not re-run the callback (its inner assert would fail).
        complete_future(&future.state);
    }

    #[test]
    fn test_pre_completed_future_callbacks() {
        let future = Future {
            state: Arc::new(Mutex::new(FutureState {
                callbacks: Vec::new(),
                std_future_callbacks: Vec::new(),
                callbacks_with_state: Vec::new(),
                complete: false,
                callbacks_made: false,
                next_idx: 1,
            })),
            self_idx: 0,
        };
        complete_future(&future.state);

        let callback = Arc::new(AtomicBool::new(false));
        let callback_ref = Arc::clone(&callback);
        future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));

        // The callback ran immediately and was never queued.
        assert!(callback.load(Ordering::SeqCst));
        assert!(future.state.lock().unwrap().callbacks.is_empty());
    }

    // Raw waker whose data pointer is a heap-allocated `Arc<AtomicBool>`; waking sets the
    // flag and asserts that it had not been set before.
    const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop);

    /// # Safety
    ///
    /// `ptr` must point at a live `Arc<AtomicBool>`.
    unsafe fn wake_by_ref(ptr: *const ()) { let p = ptr as *const Arc<AtomicBool>; assert!(!(*p).fetch_or(true, Ordering::SeqCst)); }

    /// # Safety
    ///
    /// `ptr` must come from `Box::into_raw` on a `Box<Arc<AtomicBool>>` (as produced by
    /// `waker_clone`) and must not be used afterwards.
    unsafe fn drop(ptr: *const ()) { let p = ptr as *mut Arc<AtomicBool>; let _freed = Box::from_raw(p); }

    /// # Safety
    ///
    /// Same requirements as [`drop`]; the pointer is consumed.
    unsafe fn wake(ptr: *const ()) { wake_by_ref(ptr); drop(ptr); }

    /// # Safety
    ///
    /// `ptr` must point at a live `Arc<AtomicBool>`.
    unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
        let p = ptr as *const Arc<AtomicBool>;
        RawWaker::new(Box::into_raw(Box::new(Arc::clone(&*p))) as *const (), &WAKER_V_TABLE)
    }

    /// Returns a flag together with a waker which sets that flag when woken.
    fn create_waker() -> (Arc<AtomicBool>, Waker) {
        let a = Arc::new(AtomicBool::new(false));
        // SAFETY: `&a` points at a live `Arc<AtomicBool>` for the duration of the call, and
        // `waker_clone` hands the waker its own boxed clone matching `WAKER_V_TABLE`.
        let waker = unsafe { Waker::from_raw(waker_clone((&a as *const Arc<AtomicBool>) as *const ())) };
        (a, waker)
    }

    #[test]
    fn test_future() {
        let mut future = Future {
            state: Arc::new(Mutex::new(FutureState {
                callbacks: Vec::new(),
                std_future_callbacks: Vec::new(),
                callbacks_with_state: Vec::new(),
                complete: false,
                callbacks_made: false,
                next_idx: 2,
            })),
            self_idx: 0,
        };
        let mut second_future = Future { state: Arc::clone(&future.state), self_idx: 1 };

        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
        assert!(!woken.load(Ordering::SeqCst));

        let (second_woken, second_waker) = create_waker();
        assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Pending);
        assert!(!second_woken.load(Ordering::SeqCst));

        // Completing the shared state must wake both futures.
        complete_future(&future.state);
        assert!(woken.load(Ordering::SeqCst));
        assert!(second_woken.load(Ordering::SeqCst));
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(()));
    }

    #[test]
    #[cfg(feature = "std")]
    fn test_dropped_future_doesnt_count() {
        // A future which was merely created (or only woken) must not consume the
        // notification; one which was waited on or polled to completion must.
        let notifier = Notifier::new();
        notifier.notify();

        // Fetched and dropped without being used: the notification must remain available.
        notifier.get_future();
        assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));
        assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));

        // Woken but never polled to completion: the notification must remain available.
        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

        notifier.notify();
        assert!(woken.load(Ordering::SeqCst));
        assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));

        // Woken and then polled to completion: the notification is consumed.
        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

        notifier.notify();
        assert!(woken.load(Ordering::SeqCst));
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));
    }

    #[test]
    fn test_poll_post_notify_completes() {
        let notifier = Notifier::new();

        // Notified before the future is fetched: the first poll is ready without a wake.
        notifier.notify();
        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        assert!(!woken.load(Ordering::SeqCst));

        notifier.notify();
        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        assert!(!woken.load(Ordering::SeqCst));

        // Without a fresh notification the next future is pending until notified.
        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
        assert!(!woken.load(Ordering::SeqCst));

        notifier.notify();
        assert!(woken.load(Ordering::SeqCst));
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
    }

    #[test]
    fn test_poll_post_notify_completes_initial_notified() {
        // Same sequence as `test_poll_post_notify_completes`, but starting with a future
        // which is polled before the first notification.
        let notifier = Notifier::new();

        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

        notifier.notify();
        assert!(woken.load(Ordering::SeqCst));
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));

        notifier.notify();
        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        assert!(!woken.load(Ordering::SeqCst));

        let mut future = notifier.get_future();
        let (woken, waker) = create_waker();
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
        assert!(!woken.load(Ordering::SeqCst));

        notifier.notify();
        assert!(woken.load(Ordering::SeqCst));
        assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
    }

    #[test]
    #[cfg(feature = "std")]
    fn test_multi_future_sleep() {
        let notifier_a = Notifier::new();
        let notifier_b = Notifier::new();

        // With both notifiers notified, two waits must return: one per notification.
        notifier_a.notify();
        notifier_b.notify();
        Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();

        Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();

        // Both notifications are consumed now, so a further wait must time out.
        assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
            .wait_timeout(Duration::from_millis(10)));

        // A single fresh notification is enough to wake the sleeper again.
        notifier_a.notify();
        Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
    }

    #[test]
    #[cfg(feature = "std")]
    fn sleeper_with_pending_callbacks() {
        let notifier_a = Notifier::new();
        let notifier_b = Notifier::new();

        notifier_a.notify();
        notifier_b.notify();

        // This wait consumes exactly one of the two pending notifications.
        Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();

        // Hence exactly one of the two callbacks registered next runs immediately.
        let callback_a = Arc::new(AtomicBool::new(false));
        let callback_b = Arc::new(AtomicBool::new(false));
        let callback_a_ref = Arc::clone(&callback_a);
        let callback_b_ref = Arc::clone(&callback_b);
        notifier_a.get_future().register_callback(Box::new(move || assert!(!callback_a_ref.fetch_or(true, Ordering::SeqCst))));
        notifier_b.get_future().register_callback(Box::new(move || assert!(!callback_b_ref.fetch_or(true, Ordering::SeqCst))));
        assert!(callback_a.load(Ordering::SeqCst) ^ callback_b.load(Ordering::SeqCst));

        notifier_a.notify();
        notifier_b.notify();

        // Both callbacks have run by now; one more wait succeeds and the one after times out.
        assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
        Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
        assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
            .wait_timeout(Duration::from_millis(10)));
    }

    #[test]
    fn multi_poll_stores_single_waker() {
        // Polling one future repeatedly must store a single waker for it, and dropping the
        // future must remove that waker again.
        let notifier = Notifier::new();
        let future_state = Arc::clone(&notifier.get_future().state);
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);

        let mut future_a = notifier.get_future();
        assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
        assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);

        // A second future sharing the state gets its own waker slot.
        let mut future_b = notifier.get_future();
        assert_eq!(Pin::new(&mut future_b).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 2);

        mem::drop(future_a);
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
        mem::drop(future_b);
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);

        // Notifying drains the stored wakers.
        let mut future_a = notifier.get_future();
        assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
        assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
        notifier.notify();
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
        assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Ready(()));
        assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
    }
}