Skip to content

Commit

Permalink
feat: NR-121867 Supervisor traits (newrelic#24)
Browse files Browse the repository at this point in the history
* feat: NR-121867 Supervisor traits, infra supervisor

* refactor: supervise just one process, wait and kill inside run

* Add Graceful shutdown to supervisor (newrelic#26)

* chore: cleanup

* test: kill process with the terminator

* test: scope process termination to unix

* feat: return another type for run

test: move to integration

* test: deactivate miri

* chore: clippy suggestions, typos

* feat: default impl for SupervisorContext

* chore: cleanup run function and supervisorrunner

* refactor: use from attribute to provide impls

* test: remove supervisor mod test (now integration)

* refactor: unify types, go full typestate

* feat: define stop and Supervisor state

* refactor: make ProcessRunner.process private

* docs: remove unneeded, change docs for stop method

---------

Co-authored-by: Alvaro Cabanas <acabanas@newrelic.com>
Co-authored-by: Roger Coll <rogercoll@protonmail.com>
  • Loading branch information
3 people authored Jun 6, 2023
1 parent 4e4743d commit 245ee7e
Show file tree
Hide file tree
Showing 14 changed files with 324 additions and 54 deletions.
2 changes: 1 addition & 1 deletion src/agent/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mod tests {

let expected = InfraAgent {
uuid_dir: "/bin/sudoo".to_string(),
value: 1 as i64,
value: 1_i64,
kind: CustomTypeTest::A,
};

Expand Down
25 changes: 3 additions & 22 deletions src/command/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,18 @@ pub enum CommandError {
StreamPipeError(String),

#[error("could not get output event")]
StreamOutputError(#[source] SendError<OutputEvent>),
StreamOutputError(#[from] SendError<OutputEvent>),

#[error("io error")]
IOError(#[source] std::io::Error),
IOError(#[from] std::io::Error),

#[cfg(target_family = "unix")]
#[error("system error")]
NixError(#[source] nix::Error),
}

impl From<std::io::Error> for CommandError {
fn from(value: std::io::Error) -> CommandError {
CommandError::IOError(value)
}
}

impl From<SendError<OutputEvent>> for CommandError {
fn from(e: SendError<OutputEvent>) -> Self {
CommandError::StreamOutputError(e)
}
NixError(#[from] nix::Error),
}

impl From<ExitStatus> for CommandError {
fn from(value: ExitStatus) -> Self {
CommandError::ProcessError(value)
}
}

#[cfg(target_family = "unix")]
impl From<nix::errno::Errno> for CommandError {
fn from(value: nix::errno::Errno) -> CommandError {
CommandError::NixError(value)
}
}
10 changes: 7 additions & 3 deletions src/command/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
mod error;
pub mod processrunner;
pub use crate::command::processrunner::ProcessRunner;
pub mod shutdown;
pub use crate::command::{
processrunner::ProcessRunner, shutdown::wait_exit_timeout, shutdown::wait_exit_timeout_default,
shutdown::ProcessTerminator,
};
pub mod stream;

use std::{process::ExitStatus, sync::mpsc::Sender};
Expand All @@ -21,8 +24,9 @@ pub trait CommandExecutor {
pub trait CommandHandle {
type Error: std::error::Error + Send + Sync;

/// The stop method will stop the command's execution
fn stop(self) -> Result<(), Self::Error>;
fn wait(self) -> Result<ExitStatus, Self::Error>;

fn get_pid(&self) -> u32;
}

/// Trait that specifies the interface for a blocking task execution
Expand Down
37 changes: 23 additions & 14 deletions src/command/processrunner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
ffi::OsStr,
io::{BufRead, BufReader},
marker::PhantomData,
process::{Child, ChildStderr, ChildStdout, Command, Stdio},
process::{Child, ChildStderr, ChildStdout, Command, ExitStatus, Stdio},
sync::mpsc::Sender,
};

Expand Down Expand Up @@ -42,7 +42,7 @@ impl ProcessRunner {
}
}

impl CommandExecutor for ProcessRunner {
impl CommandExecutor for ProcessRunner<Unstarted> {
type Error = CommandError;
type Process = ProcessRunner<Started>;
fn start(self) -> Result<Self::Process, Self::Error> {
Expand All @@ -56,11 +56,17 @@ impl CommandExecutor for ProcessRunner {

impl CommandHandle for ProcessRunner<Started> {
type Error = CommandError;
fn stop(self) -> Result<(), Self::Error> {
Ok(self
.process

fn wait(self) -> Result<ExitStatus, Self::Error> {
self.process
.ok_or(CommandError::ProcessNotStarted)?
.kill()?)
.wait()
.map_err(CommandError::from)
}

fn get_pid(&self) -> u32 {
// process should always be Some here
self.process.as_ref().unwrap().id()
}
}

Expand Down Expand Up @@ -149,14 +155,14 @@ mod tests {

// MockedCommandExecutor returns an error on start when its value is `true`.
// It can be used to mock process spawn.
type MockedCommandExector = bool;
type MockedCommandExecutor = bool;
pub struct MockedCommandHandler;

impl super::CommandExecutor for MockedCommandExector {
impl super::CommandExecutor for MockedCommandExecutor {
type Error = CommandError;
type Process = MockedCommandHandler;
fn start(self) -> Result<Self::Process, Self::Error> {
if self == true {
if self {
Err(CommandError::ProcessError(ExitStatus::from_raw(1)))
} else {
Ok(MockedCommandHandler {})
Expand All @@ -166,14 +172,18 @@ mod tests {

impl CommandHandle for MockedCommandHandler {
type Error = CommandError;
fn stop(self) -> Result<(), CommandError> {
Ok(())
fn wait(self) -> Result<ExitStatus, Self::Error> {
Ok(ExitStatus::from_raw(0))
}

fn get_pid(&self) -> u32 {
0
}
}

#[test]
fn start_stop() {
let cmds: Vec<MockedCommandExector> = vec![true, false, true, true, false];
let cmds: Vec<MockedCommandExecutor> = vec![true, false, true, true, false];

assert_eq!(
cmds.iter()
Expand Down Expand Up @@ -208,7 +218,7 @@ mod tests {
let cmd = MockedCommandHandler {};
let (tx, rx) = std::sync::mpsc::channel();

let cmd = cmd.stream(tx).unwrap();
cmd.stream(tx).unwrap();

let mut stdout_expected = Vec::new();
let mut stderr_expected = Vec::new();
Expand All @@ -226,6 +236,5 @@ mod tests {

assert_eq!(stdout_expected, stdout_result);
assert_eq!(stderr_expected, stderr_result);
assert!(cmd.stop().is_ok());
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ mod config;
pub mod command;
pub use crate::agent::Agent;
pub use crate::config::resolver::Resolver;

pub mod supervisor;
23 changes: 23 additions & 0 deletions src/supervisor/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::sync::{Arc, Condvar, Mutex, MutexGuard, PoisonError};

/// Cancellation signal that can be shared between threads.
///
/// Wraps a boolean flag guarded by a [`Mutex`], paired with a [`Condvar`], behind an [`Arc`].
/// Clones share the same flag, and the flag starts out as `false`.
#[derive(Debug, Clone, Default)]
pub struct SupervisorContext(Arc<(Mutex<bool>, Condvar)>);

impl SupervisorContext {
    /// Creates a context whose cancellation flag is not set.
    pub fn new() -> Self {
        Self(Arc::new((Mutex::new(false), Condvar::new())))
    }

    /// Sets the cancellation signal and notifies every thread waiting on the condition
    /// variable of this [`SupervisorContext`] (or of any of its clones).
    ///
    /// # Errors
    ///
    /// Returns the [`PoisonError`] produced by `lock()` when the mutex guarding the flag is
    /// poisoned. In that case the flag is left untouched and nobody is notified.
    pub fn cancel_all(&self) -> Result<(), PoisonError<MutexGuard<'_, bool>>> {
        let (flag, condvar) = self.get_lock_cvar();
        {
            // Scope the guard so the lock is released before waking the waiters.
            let mut cancelled = flag.lock()?;
            *cancelled = true;
        }
        condvar.notify_all();
        Ok(())
    }

    /// Borrows the flag mutex and its condition variable.
    pub(crate) fn get_lock_cvar(&self) -> &(Mutex<bool>, Condvar) {
        self.0.as_ref()
    }
}
21 changes: 21 additions & 0 deletions src/supervisor/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::{fmt::Debug, process::ExitStatus};
use thiserror::Error;

/// Errors associated with a supervised process.
#[derive(Error, Debug)]
pub enum ProcessError {
/// Carries the [`ExitStatus`] a process exited with.
// No `#[from]` here: thiserror treats a `#[from]` field as the error source, which must
// implement `std::error::Error`, and `ExitStatus` does not.
#[error("process exited with error: `{0}`")]
ProcessExited(ExitStatus),

/// Wraps an I/O error; `#[from]` generates the `From<std::io::Error>` conversion.
#[error("io error")]
IOError(#[from] std::io::Error),

/// Wraps a `nix` error; only compiled on Unix-family targets.
#[cfg(target_family = "unix")]
#[error("system error")]
NixError(#[from] nix::Error),
}

impl From<ExitStatus> for ProcessError {
fn from(value: ExitStatus) -> Self {
ProcessError::ProcessExited(value)
}
}
21 changes: 21 additions & 0 deletions src/supervisor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
pub mod context;
mod error;
pub mod runner;

/// The Runner trait defines the entry-point interface for a supervisor. Exposes a run method that will start the supervised process' execution.
pub trait Runner {
/// Error type associated with the runner.
/// NOTE(review): no method of this trait references it; confirm whether `run` is meant to surface it.
type E: std::error::Error + Send + Sync;
/// Type returned by [`Runner::run`]; it must implement [`Handle`].
type H: Handle;

/// The run method will execute a supervisor (non-blocking). Returns a [`Handle`] to manage the running process.
fn run(self) -> Self::H;
}

/// The Handle trait defines the interface for a supervised process' handle. Exposes a stop method that will cancel the supervised process' execution.
pub trait Handle {
/// Error type associated with the handle.
/// NOTE(review): no method of this trait references it; confirm whether `stop` is meant to surface it.
type E: std::error::Error + Send + Sync;
/// Type returned by [`Handle::stop`].
type S: Send + Sync;

/// Cancels the supervised process and returns its inner handle.
fn stop(self) -> Self::S;
}
137 changes: 137 additions & 0 deletions src/supervisor/runner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use std::{
ops::Deref,
sync::mpsc::Sender,
sync::{Arc, Condvar, Mutex},
thread::{self, JoinHandle},
};

use crate::command::{
stream::OutputEvent, wait_exit_timeout_default, CommandExecutor, CommandHandle,
CommandTerminator, OutputStreamer, ProcessRunner, ProcessTerminator,
};

use super::{context::SupervisorContext, error::ProcessError, Handle, Runner};

use log::error;

/// State of a [`SupervisorRunner`] that has not been run yet; holds what is needed to launch the process.
pub struct Stopped {
// Binary and arguments handed to `ProcessRunner::new` when the process is started.
bin: String,
args: Vec<String>,
// Cancellation signal checked by the supervising thread.
ctx: SupervisorContext,
// Sender a clone of which is passed to `stream` for every started process.
snd: Sender<OutputEvent>,
}
/// State of a [`SupervisorRunner`] after `run` has been called.
pub struct Running {
// Join handle of the thread spawned by `run_process_thread`.
handle: JoinHandle<()>,
// Clone of the context given to the supervising thread; `stop` cancels through it.
ctx: SupervisorContext,
}

/// Supervisor whose current state is encoded in the `State` type parameter
/// ([`Stopped`] by default, [`Running`] once `run` has been called).
#[derive(Debug)]
pub struct SupervisorRunner<State = Stopped> {
// State-specific data; exposed read-only through the `Deref` implementation.
state: State,
}

impl<T> Deref for SupervisorRunner<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.state
}
}

impl Runner for SupervisorRunner<Stopped> {
    type E = ProcessError;
    type H = SupervisorRunner<Running>;

    /// Moves the supervisor into the [`Running`] state: spawns the supervising thread and
    /// returns a runner holding its join handle and a clone of the context.
    fn run(self) -> Self::H {
        // The context must be cloned before `self` is moved into the supervising thread.
        let ctx = self.ctx.clone();
        let handle = run_process_thread(self);
        let state = Running { handle, ctx };
        SupervisorRunner { state }
    }
}

impl From<&SupervisorRunner<Stopped>> for ProcessRunner {
fn from(value: &SupervisorRunner<Stopped>) -> Self {
ProcessRunner::new(&value.bin, &value.args)
}
}

fn run_process_thread(runner: SupervisorRunner<Stopped>) -> JoinHandle<()> {
thread::spawn({
move || loop {
let proc_runner = ProcessRunner::from(&runner);

// Actually run the process
let started = match proc_runner.start() {
Ok(s) => s,
Err(e) => {
error!("Failed to start a supervised process: {}", e);
continue;
}
};

// Stream the output
let streaming = match started.stream(runner.snd.clone()) {
Ok(s) => s,
Err(e) => {
error!("Failed to stream the output of a supervised process: {}", e);
continue;
}
};

_ = wait_for_termination(streaming.get_pid(), runner.ctx.clone());
_ = streaming.wait().unwrap();

let (lck, _) = SupervisorContext::get_lock_cvar(&runner.ctx);
let val = lck.lock().unwrap();
if *val {
break;
}
}
})
}

/// Spawns a watcher thread that blocks on the [`SupervisorContext`] `ctx` until its
/// termination signal is activated, then sends a shutdown signal to the supervised process
/// (the one whose PID was passed as `pid`).
///
/// The shutdown runs on the watcher thread itself, so joining the returned handle also
/// waits for the shutdown attempt to finish. The result of the shutdown is discarded.
fn wait_for_termination(pid: u32, ctx: SupervisorContext) -> JoinHandle<()> {
    thread::spawn(move || {
        let (lck, cvar) = SupervisorContext::get_lock_cvar(&ctx);
        // Assigning to `_` drops the returned guard right away, so the context lock is not
        // held while the process is being shut down.
        _ = cvar.wait_while(lck.lock().unwrap(), |finish| !*finish);

        // Fresh flag/condvar pair handed to the exit-timeout helper.
        let shutdown_ctx = Arc::new((Mutex::new(false), Condvar::new()));
        _ = ProcessTerminator::new(pid).shutdown(|| wait_exit_timeout_default(shutdown_ctx));
    })
}

impl Handle for SupervisorRunner<Running> {
type E = ProcessError;
type S = JoinHandle<()>;

fn stop(self) -> Self::S {
// Stop all the supervisors
// TODO: handle PoisonErrors (log?)
self.ctx.cancel_all().unwrap();
self.state.handle
}
}

impl SupervisorRunner<Stopped> {
    /// Creates a supervisor in the [`Stopped`] state.
    ///
    /// `bin` and `args` describe the process to launch, `ctx` carries the cancellation
    /// signal, and `snd` is the sender the output of the process is streamed to.
    pub fn new(
        bin: String,
        args: Vec<String>,
        ctx: SupervisorContext,
        snd: Sender<OutputEvent>,
    ) -> Self {
        let state = Stopped {
            bin,
            args,
            ctx,
            snd,
        };
        Self { state }
    }
}
4 changes: 2 additions & 2 deletions tests/command/blocking_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ fn blocking_stop_runner() {
let mut proc: ProcessRunner = ProcessRunner::from(&agent);

// run the process with wrong parameter
assert_eq!(proc.run().unwrap().success(), false);
assert!(!proc.run().unwrap().success());

agent.agent_args = vec!["0.1".to_string()];

proc = ProcessRunner::from(&agent);

// run the process with correct parameter
assert_eq!(proc.run().unwrap().success(), true);
assert!(proc.run().unwrap().success());
}
Loading

0 comments on commit 245ee7e

Please sign in to comment.