1 // compile-flags: -C opt-level=3
4 use std::future::Future;
5 use std::marker::PhantomData;
7 use std::sync::atomic::AtomicUsize;
9 use std::task::Poll::{Pending, Ready};
11 use std::task::{Context, Poll};
14 task::{RawWaker, RawWakerVTable},
17 /// Future for the [`poll_fn`] function.
18 pub struct PollFn<F> {
22 impl<F> Unpin for PollFn<F> {}
24 /// Creates a new future wrapping around a function returning [`Poll`].
25 pub fn poll_fn<T, F>(f: F) -> PollFn<F>
27 F: FnMut(&mut Context<'_>) -> Poll<T>,
32 impl<T, F> Future for PollFn<F>
34 F: FnMut(&mut Context<'_>) -> Poll<T>,
38 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
42 pub fn run<F: Future>(future: F) -> F::Output {
43 BasicScheduler.block_on(future)
46 pub(crate) struct BasicScheduler;
49 pub(crate) fn block_on<F>(&mut self, mut future: F) -> F::Output
53 let waker = unsafe { Waker::from_raw(raw_waker()) };
54 let mut cx = std::task::Context::from_waker(&waker);
56 let mut future = unsafe { Pin::new_unchecked(&mut future) };
59 if let Ready(v) = future.as_mut().poll(&mut cx) {
66 // ===== impl Spawner =====
68 fn raw_waker() -> RawWaker {
69 RawWaker::new(ptr::null(), waker_vtable())
72 fn waker_vtable() -> &'static RawWakerVTable {
81 unsafe fn clone_arc_raw(_: *const ()) -> RawWaker {
85 unsafe fn wake_arc_raw(_: *const ()) {}
87 unsafe fn wake_by_ref_arc_raw(_: *const ()) {}
89 unsafe fn drop_arc_raw(_: *const ()) {}
94 /// Create an `AtomicWaker`
95 fn new() -> AtomicWaker {
99 fn register_by_ref(&self, _waker: &Waker) {}
115 rx_waker: AtomicWaker,
119 fn channel<T>() -> (Tx<T>, Rx<T>) {
120 let chan = Arc::new(Chan {
122 semaphore: Sema(AtomicUsize::new(0)),
123 rx_waker: AtomicWaker::new(),
135 // ===== impl Rx =====
138 /// Receive the next value
139 fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
140 self.inner.rx_waker.register_by_ref(cx.waker());
142 if self.inner.rx_closed && self.inner.semaphore.is_idle() {
150 struct Sema(AtomicUsize);
153 fn is_idle(&self) -> bool {
158 pub struct UnboundedReceiver<T> {
162 pub fn unbounded_channel<T>() -> UnboundedReceiver<T> {
163 let (tx, rx) = channel();
166 let rx = UnboundedReceiver { chan: rx };
171 impl<T> UnboundedReceiver<T> {
172 pub async fn recv(&mut self) -> Option<T> {
173 poll_fn(|cx| self.chan.recv(cx)).await