// neon/sys/async_work.rs
//! Rust wrappers for Node-API simple asynchronous operations
//!
//! Unlike `napi_async_work` which threads a single mutable pointer to a data
//! struct to both the `execute` and `complete` callbacks, the wrapper follows
//! a more idiomatic Rust ownership pattern by passing the output of `execute`
//! into the input of `complete`.
//!
//! See: [Async operations in Node-API](https://nodejs.org/api/n-api.html#n_api_simple_asynchronous_operations)
use std::{
ffi::c_void,
mem,
panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
ptr, thread,
};
use super::{
bindings as napi, debug_send_wrapper::DebugSendWrapper, no_panic::FailureBoundary, raw::Env,
};
/// Failure messages for `neon::event::TaskBuilder` tasks.
///
/// `call_complete` runs the user's completion callback through
/// `BOUNDARY.catch_failure`; each field holds the message for the failure
/// kind it is named after (`both` a panic and an exception, only an
/// `exception`, or only a `panic`).
const BOUNDARY: FailureBoundary = FailureBoundary {
both: "A panic and exception occurred while executing a `neon::event::TaskBuilder` task",
exception: "An exception occurred while executing a `neon::event::TaskBuilder` task",
panic: "A panic occurred while executing a `neon::event::TaskBuilder` task",
};
/// Function pointer that consumes the task input `I` and produces the output
/// `O`. It is invoked by `call_execute`.
type Execute<I, O> = fn(input: I) -> O;
/// Function pointer invoked by `call_complete` with the `Env`, the result of
/// the `Execute` function (`Err` carries the payload of a caught panic), and
/// the extra `data` value that was passed to `schedule`.
type Complete<O, D> = fn(env: Env, output: thread::Result<O>, data: D);
/// Schedule work to execute on the libuv thread pool
///
/// # Safety
/// * `env` must be a valid `napi_env` for the current thread
/// * The `thread::Result::Err` must only be used for resuming unwind if
/// `execute` is not unwind safe
pub unsafe fn schedule<I, O, D>(
env: Env,
input: I,
execute: Execute<I, O>,
complete: Complete<O, D>,
data: D,
) where
I: Send + 'static,
O: Send + 'static,
D: 'static,
{
let mut data = Box::new(Data {
state: State::Input(input),
execute,
complete,
data: DebugSendWrapper::new(data),
// Work is initialized as a null pointer, but set by `create_async_work`
// `data` must not be used until this value has been set.
work: ptr::null_mut(),
});
// Store a pointer to `work` before ownership is transferred to `Box::into_raw`
let work = &mut data.work as *mut _;
// Create the `async_work`
napi::create_async_work(
env,
ptr::null_mut(),
super::string(env, "neon_async_work"),
Some(call_execute::<I, O, D>),
Some(call_complete::<I, O, D>),
Box::into_raw(data).cast(),
work,
)
.unwrap();
// Queue the work
match napi::queue_async_work(env, *work) {
Ok(()) => {}
status => {
// If queueing failed, delete the work to prevent a leak
let _ = napi::delete_async_work(env, *work);
status.unwrap()
}
}
}
/// A pointer to data is passed to the `execute` and `complete` callbacks
///
/// The record is boxed by `schedule`, handed to Node-API as a raw pointer,
/// mutated in place by `call_execute` and finally turned back into a `Box`
/// (and consumed) by `call_complete`.
struct Data<I, O, D> {
// Current stage of the task; starts as `State::Input` and is replaced with
// `State::Output` by `call_execute`.
state: State<I, O>,
// Function run by `call_execute` with the stored input.
execute: Execute<I, O>,
// Function run by `call_complete` with the stored output.
complete: Complete<O, D>,
// Extra value supplied to `schedule` and forwarded to `complete`.
data: DebugSendWrapper<D>,
// Async work handle; null until `create_async_work` writes it, and passed
// to `delete_async_work` once the task has completed.
work: napi::AsyncWork,
}
/// Stage of a task; advanced by `execute` and consumed by `complete`
enum State<I, O> {
    /// The task has not run yet; holds the value that will be given to `execute`
    Input(I),
    /// Placeholder left behind once the input has been taken
    Executing,
    /// `execute` has finished; holds its return value or the caught panic
    Output(thread::Result<O>),
}

impl<I, O> State<I, O> {
    /// Move the input out of `State::Input`.
    ///
    /// The state is unconditionally overwritten with `State::Executing`;
    /// `None` is returned when the previous state was not `State::Input`.
    fn take_execute_input(&mut self) -> Option<I> {
        let previous = mem::replace(self, Self::Executing);

        if let Self::Input(input) = previous {
            Some(input)
        } else {
            None
        }
    }

    /// Consume the state, yielding the output if it was `State::Output` and
    /// `None` for any other state.
    fn into_output(self) -> Option<thread::Result<O>> {
        if let Self::Output(output) = self {
            Some(output)
        } else {
            None
        }
    }
}
/// Node-API `execute` callback; runs on the libuv thread pool
///
/// Takes the input out of the task record, runs the stored `execute`
/// function with it and stores the result (or the caught panic) back into
/// the record as `State::Output`.
///
/// # Safety
/// * `Env` should not be used because it could attempt to call JavaScript
/// * `data` is expected to be a pointer to `Data<I, O, D>`
unsafe extern "C" fn call_execute<I, O, D>(_: Env, data: *mut c_void) {
    let task = &mut *data.cast::<Data<I, O, D>>();

    // Asserting unwind safety is fine here: a caught panic is carried over to
    // the completion callback, where unwinding is resumed
    let run = AssertUnwindSafe(|| {
        // `call_execute` should be called exactly once after initialization,
        // so the input is expected to still be present and `unwrap` is ok
        let input = task.state.take_execute_input().unwrap();

        (task.execute)(input)
    });
    let outcome = catch_unwind(run);

    task.state = State::Output(outcome);
}
/// Callback executed on the JavaScript main thread
///
/// Reclaims ownership of the task record, deletes the async work handle and
/// then, inside the `BOUNDARY` failure boundary, forwards the output of
/// `execute` to the stored `complete` function.
///
/// * If the boundary does not provide an `Env`, `complete` is not called; a
///   panic caught in `execute` is resumed instead.
/// * If `status` is `Cancelled`, `complete` is not called.
/// * Any other non-`Ok` status triggers an assertion failure.
///
/// # Safety
/// * `data` is expected to be a pointer to `Data<I, O, D>`
unsafe extern "C" fn call_complete<I, O, D>(env: Env, status: napi::Status, data: *mut c_void) {
    let Data {
        state,
        complete,
        data,
        work,
        ..
    } = *Box::<Data<I, O, D>>::from_raw(data.cast());

    // The call must be made outside of `debug_assert_eq!`: the macro does not
    // evaluate its arguments when debug assertions are disabled, which would
    // skip the deletion (and leak the work) in release builds.
    let deleted = napi::delete_async_work(env, work);
    debug_assert_eq!(deleted, Ok(()));

    BOUNDARY.catch_failure(env, None, move |env| {
        // `unwrap` is okay because `call_complete` should be called exactly once
        // if and only if `call_execute` has completed successfully
        let output = state.into_output().unwrap();

        // The event loop has stopped if we do not have an Env
        let env = if let Some(env) = env {
            env
        } else {
            // Resume panicking if necessary
            if let Err(panic) = output {
                resume_unwind(panic);
            }

            return ptr::null_mut();
        };

        match status {
            napi::Status::Ok => complete(env, output, data.take()),
            napi::Status::Cancelled => {}
            _ => assert_eq!(status, napi::Status::Ok),
        }

        ptr::null_mut()
    });
}