use crate::proc_vtable::ProcVTable; use crate::state::*; use crossbeam_utils::Backoff; use std::cell::Cell; use std::fmt::{self, Debug, Formatter}; use std::sync::atomic::Ordering; use std::task::Waker; /// The pdata of a proc. /// /// This pdata is stored right at the beginning of every heap-allocated proc. pub(crate) struct ProcData { /// Current state of the proc. /// /// Contains flags representing the current state and the reference count. pub(crate) state: AtomicState, /// The proc that is blocked on the `ProcHandle`. /// /// This waker needs to be woken once the proc completes or is closed. pub(crate) awaiter: Cell>, /// The virtual table. /// /// In addition to the actual waker virtual table, it also contains pointers to several other /// methods necessary for bookkeeping the heap-allocated proc. pub(crate) vtable: &'static ProcVTable, } impl ProcData { /// Cancels the proc. /// /// This method will only mark the proc as closed and will notify the awaiter, but it won't /// reschedule the proc if it's not completed. pub(crate) fn cancel(&self) { let mut state = self.state.load(Ordering::Acquire); loop { // If the proc has been completed or closed, it can't be cancelled. if state.get_flags().intersects(COMPLETED | CLOSED) { break; } let (flags, references) = state.parts(); let new = State::new(flags | CLOSED, references); // Mark the proc as closed. match self .state .compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire) { Ok(_) => { // Notify the awaiter that the proc has been closed. if state.is_awaiter() { self.notify(); } break; } Err(s) => state = s, } } } /// Notifies the proc blocked on the proc. /// /// If there is a registered waker, it will be removed from the pdata and woken. #[inline] pub(crate) fn notify(&self) { if let Some(waker) = self.swap_awaiter(None) { // We need a safeguard against panics because waking can panic. waker.wake(); } } /// Notifies the proc blocked on the proc unless its waker matches `current`. /// /// If there is a registered waker, it will be removed from the pdata. #[inline] pub(crate) fn notify_unless(&self, current: &Waker) { if let Some(waker) = self.swap_awaiter(None) { if !waker.will_wake(current) { // We need a safeguard against panics because waking can panic. waker.wake(); } } } /// Swaps the awaiter and returns the previous value. #[inline] pub(crate) fn swap_awaiter(&self, new: Option) -> Option { let new_is_none = new.is_none(); // We're about to try acquiring the lock in a loop. If it's already being held by another // thread, we'll have to spin for a while so it's best to employ a backoff strategy. let backoff = Backoff::new(); loop { // Acquire the lock. If we're storing an awaiter, then also set the awaiter flag. let state = if new_is_none { self.state.fetch_or(LOCKED, Ordering::Acquire) } else { self.state.fetch_or(LOCKED | AWAITER, Ordering::Acquire) }; // If the lock was acquired, break from the loop. if state.is_locked() { break; } // Snooze for a little while because the lock is held by another thread. backoff.snooze(); } // Replace the awaiter. let old = self.awaiter.replace(new); // Release the lock. If we've cleared the awaiter, then also unset the awaiter flag. if new_is_none { self.state .fetch_and((!LOCKED & !AWAITER).into(), Ordering::Release); } else { self.state.fetch_and((!LOCKED).into(), Ordering::Release); } old } } impl Debug for ProcData { fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { let state = self.state.load(Ordering::SeqCst); if fmt.alternate() { fmt.debug_struct("ProcData") .field("scheduled", &state.is_scheduled()) .field("running", &state.is_running()) .field("completed", &state.is_completed()) .field("closed", &state.is_closed()) .field("handle", &state.is_handle()) .field("awaiter", &state.is_awaiter()) .field("locked", &state.is_locked()) .field("ref_count", &state.get_refcount()) .finish() } else { fmt.debug_struct("ProcData").field("state", &state).finish() } } }