223 lines
5.9 KiB
Raw Normal View History

2021-11-14 17:51:48 +01:00
//! Pool of threads to run lightweight processes
//! We spawn futures onto the pool with [`spawn`] method of global run queue or
//! with corresponding [`Worker`]'s spawn method.
//! [`spawn`]: crate::pool::spawn
//! [`Worker`]: crate::run_queue::Worker
use std::cell::Cell;
use crate::thread_manager::{ThreadManager, DynamicRunner};
2021-11-14 17:51:48 +01:00
use lightproc::lightproc::LightProc;
use lightproc::recoverable_handle::RecoverableHandle;
use std::future::Future;
use std::iter::Iterator;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::sync::Arc;
2021-11-14 17:51:48 +01:00
use std::time::Duration;
use crossbeam_deque::{Injector, Stealer};
use crate::run::block;
use crate::worker::{Sleeper, WorkerThread};
2021-11-14 17:51:48 +01:00
struct Spooler<'a> {
pub spool: Arc<Injector<LightProc>>,
threads: &'a ThreadManager<AsyncRunner>,
_marker: PhantomData<&'a ()>,
2021-11-14 17:51:48 +01:00
impl Spooler<'_> {
pub fn new() -> Self {
let spool = Arc::new(Injector::new());
let threads = Box::leak(Box::new(
ThreadManager::new(2, AsyncRunner, spool.clone())));
Self { spool, threads, _marker: PhantomData }
2021-11-14 17:51:48 +01:00
#[derive(Clone, Debug)]
pub struct Executor<'a> {
spooler: Arc<Spooler<'a>>,
2021-11-14 17:51:48 +01:00
impl<'a, 'executor: 'a> Executor<'executor> {
pub fn new() -> Self {
Executor {
spooler: Arc::new(Spooler::new()),
fn schedule(&self) -> impl Fn(LightProc) + 'a {
let task_queue = self.spooler.spool.clone();
move |lightproc: LightProc| {
/// Spawn a process (which contains future + process stack) onto the executor from the global level.
/// # Example
/// ```rust
/// use executor::prelude::*;
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # start();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # start();
/// # }
/// #
/// # fn start() {
/// let executor = Spooler::new();
2021-11-14 17:51:48 +01:00
/// let handle = executor.spawn(
/// async {
/// panic!("test");
/// },
/// );
/// executor.run(
/// async {
/// handle.await;
/// }
/// );
/// # }
/// ```
2021-11-14 17:51:48 +01:00
pub fn spawn<F, R>(&self, future: F) -> RecoverableHandle<R>
F: Future<Output = R> + Send + 'a,
R: Send + 'a,
2021-11-14 17:51:48 +01:00
let (task, handle) =
LightProc::recoverable(future, self.schedule());
2021-11-14 17:51:48 +01:00
pub fn spawn_local<F, R>(&self, future: F) -> RecoverableHandle<R>
F: Future<Output = R> + 'a,
R: Send + 'a,
let (task, handle) =
LightProc::recoverable(future, schedule_local());
2021-11-14 17:51:48 +01:00
/// Block the calling thread until the given future completes.
/// # Example
/// ```rust
/// use executor::prelude::*;
/// use lightproc::prelude::*;
/// let executor = Spooler::new();
/// let mut sum = 0;
/// executor.run(
/// async {
/// (0..10_000_000).for_each(|_| {
/// sum += 1;
/// });
/// }
/// );
/// ```
pub fn run<F, R>(&self, future: F) -> R
F: Future<Output = R>,
unsafe {
// An explicitly uninitialized `R`. Until `assume_init` is called this will not call any
// drop code for R
let mut out = MaybeUninit::uninit();
2021-11-14 17:51:48 +01:00
// Wrap the future into one that stores the result into `out`.
let future = {
let out = out.as_mut_ptr();
2021-11-14 17:51:48 +01:00
async move {
*out = future.await;
2021-11-14 17:51:48 +01:00
// Pin the future onto the stack.
2021-11-14 17:51:48 +01:00
// Block on the future and and wait for it to complete.
2021-11-14 17:51:48 +01:00
// Assume that if the future completed and didn't panic it fully initialized its output
2021-11-14 17:51:48 +01:00
struct AsyncRunner;
2021-11-14 17:51:48 +01:00
impl DynamicRunner for AsyncRunner {
fn setup(task_queue: Arc<Injector<LightProc>>) -> Sleeper<LightProc> {
let (worker, sleeper) = WorkerThread::new(task_queue);
2021-11-14 17:51:48 +01:00
fn run_static<'b>(fences: impl Iterator<Item=&'b Stealer<LightProc>>, park_timeout: Duration) -> ! {
let worker = get_worker();
worker.run_timeout(fences, park_timeout)
2021-11-14 17:51:48 +01:00
fn run_dynamic<'b>(fences: impl Iterator<Item=&'b Stealer<LightProc>>) -> ! {
let worker = get_worker();
2021-11-14 17:51:48 +01:00
fn run_standalone<'b>(fences: impl Iterator<Item=&'b Stealer<LightProc>>) {
let worker = get_worker();
2021-11-14 17:51:48 +01:00
thread_local! {
static WORKER: Cell<Option<WorkerThread<'static, LightProc>>> = Cell::new(None);
2021-11-14 17:51:48 +01:00
fn get_worker() -> &'static WorkerThread<'static, LightProc> {
WORKER.with(|cell| {
let worker = unsafe {
&*cell.as_ptr() as &'static Option<WorkerThread<_>>
2021-11-14 17:51:48 +01:00
.expect("AsyncRunner running outside Executor context")
2021-11-14 17:51:48 +01:00
fn install_worker(worker_thread: WorkerThread<'static, LightProc>) {
WORKER.with(|cell| {
2021-11-14 17:51:48 +01:00
fn schedule_local() -> impl Fn(LightProc) {
let worker = get_worker();
let unparker = worker.unparker().clone();
move |lightproc| {
// This is safe because we never replace the value in that Cell and thus never drop the
// SharedWorker pointed to.
// We have to unpark the worker thread for our task to be run.
2021-11-14 17:51:48 +01:00