use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use crossbeam_queue::SegQueue; use crossbeam_utils::sync::{Parker, Unparker}; use lightproc::prelude::LightProc; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; pub trait Runnable { fn run(self); } impl Runnable for LightProc { fn run(self) { LightProc::run(self) } } #[derive(Debug)] /// A thread worker pulling tasks from a shared injector queue and executing them pub(crate) struct WorkerThread<'a, Task> { /// Shared task queue task_queue: Arc>, /// This threads task queue. For efficiency reasons worker threads pull a batch of tasks /// from the injector queue and work on them instead of pulling them one by one. Should the /// global queue become empty worker threads can steal tasks from each other. tasks: Worker, /// Queue of `!Send` tasks that have to be entirely ran on this thread and must not be moved /// or stolen to other threads. local_tasks: SegQueue, /// Thread parker. /// /// A worker thread will park when there is no more work it can do. Work threads can be /// unparked by either a local task being woken up or by the Executor owning the Injector queue. parker: Parker, _marker: PhantomData<&'a ()>, } #[derive(Debug)] pub struct Sleeper { stealer: Stealer, unparker: Unparker, } impl Sleeper { pub fn wakeup(&self) { self.unparker.unpark(); } } impl<'a, T: Runnable + 'a> WorkerThread<'a, T> { pub fn new(task_queue: Arc>) -> (WorkerThread<'a, T>, Sleeper) { let tasks: Worker = Worker::new_fifo(); let stealer = tasks.stealer(); let local_tasks: SegQueue = SegQueue::new(); let parker = Parker::new(); let _marker = PhantomData; let unparker = parker.unparker().clone(); ( Self { task_queue, tasks, local_tasks, parker, _marker, }, Sleeper { stealer, unparker }, ) } pub fn unparker(&self) -> &Unparker { self.parker.unparker() } /// Run this worker thread "forever" (i.e. until the thread panics or is otherwise killed) pub fn run(&self, fences: impl Iterator>) -> ! { let fences: Vec> = fences.map(|stealer| stealer.clone()).collect(); loop { self.run_inner(&fences); self.parker.park(); } } pub fn run_timeout( &self, fences: impl Iterator>, timeout: Duration, ) -> ! { let fences: Vec> = fences.map(|stealer| stealer.clone()).collect(); loop { self.run_inner(&fences); self.parker.park_timeout(timeout); } } pub fn run_once(&self, fences: impl Iterator>) { let fences: Vec> = fences.map(|stealer| stealer.clone()).collect(); self.run_inner(fences); } fn run_inner]>>(&self, fences: F) { // Continue working until there is no work to do. 'work: while { // Always run local tasks first since they can't be done by anybody else. if let Some(task) = self.local_tasks.pop() { task.run(); continue 'work; } else if let Some(task) = self.tasks.pop() { task.run(); continue 'work; } else { // If we were woken up by the global scheduler `should_steal` is set to true, // so we now try to clean out. // First try to take work from the global queue. let mut i = 0; loop { match self.task_queue.steal_batch_and_pop(&self.tasks) { // If we could steal from the global queue do more work. Steal::Success(task) => { task.run(); continue 'work; } // If there is no more work to steal from the global queue, try other // workers next Steal::Empty => break, // If a race condition occurred try again with backoff Steal::Retry => { for _ in 0..(1 << i) { core::hint::spin_loop(); i += 1; } } } } // If the global queue is empty too, steal from the thread with the most work. // This is only None when there are no stealers installed which, given that we // exist, *should* never be the case. while let Some(fence) = select_fence(fences.as_ref().iter()) { match fence.steal_batch_and_pop(&self.tasks) { Steal::Success(task) => { task.run(); continue 'work; } // If no other worker has work to do we're done once again. Steal::Empty => break, // If another worker is currently stealing chances are that the // current `stealer` will not have the most task afterwards so we do // want to do the maths regarding that again. Steal::Retry => core::hint::spin_loop(), } } } // If we get here we're done and need to park. false } {} } pub fn schedule_local(&self, task: T) { self.local_tasks.push(task); } } #[inline(always)] fn select_fence<'a, T>(fences: impl Iterator>) -> Option<&'a Stealer> { fences.max_by_key(|fence| fence.len()) }