Fix state management in the scheduler (#337)

This commit is contained in:
Cheick Keita
2020-11-24 04:43:51 -08:00
committed by GitHub
parent 32ba86be9d
commit d47124fe8c
5 changed files with 121 additions and 43 deletions

View File

@ -86,21 +86,17 @@ impl Agent {
async fn update(&mut self) -> Result<bool> {
let last = self.scheduler.take().ok_or_else(scheduler_error)?;
let next = match last {
Scheduler::Free(s) => self.free(s).await?,
Scheduler::SettingUp(s) => self.setting_up(s).await?,
Scheduler::PendingReboot(s) => self.pending_reboot(s).await?,
Scheduler::Ready(s) => self.ready(s).await?,
Scheduler::Busy(s) => self.busy(s).await?,
Scheduler::Done(s) => self.done(s).await?,
let previous_state = NodeState::from(&last);
let (next, done) = match last {
Scheduler::Free(s) => (self.free(s).await?, false),
Scheduler::SettingUp(s) => (self.setting_up(s).await?, false),
Scheduler::PendingReboot(s) => (self.pending_reboot(s).await?, false),
Scheduler::Ready(s) => (self.ready(s).await?, false),
Scheduler::Busy(s) => (self.busy(s).await?, false),
Scheduler::Done(s) => (self.done(s).await?, true),
};
self.previous_state = NodeState::from(&next);
let done = matches!(next, Scheduler::Done(..));
self.previous_state = previous_state;
self.scheduler = Some(next);
Ok(done)
}

View File

@ -11,6 +11,8 @@ use crate::setup::double::*;
use crate::work::double::*;
use crate::work::*;
use crate::worker::double::*;
use crate::worker::WorkerEvent;
use onefuzz::process::ExitStatus;
use super::*;
@ -119,3 +121,101 @@ async fn test_update_free_has_work() {
let double: &WorkQueueDouble = agent.work_queue.downcast_ref().unwrap();
assert_eq!(double.claimed, &[Fixture.receipt()]);
}
#[tokio::test]
async fn test_emitted_state() {
let mut agent = Agent {
worker_runner: Box::new(WorkerRunnerDouble {
child: ChildDouble {
exit_status: Some(ExitStatus {
code: Some(0),
signal: None,
success: true,
}),
..ChildDouble::default()
},
}),
..Fixture.agent()
};
agent
.work_queue
.downcast_mut::<WorkQueueDouble>()
.unwrap()
.available
.push(Fixture.message());
for _i in 0..10 {
if agent.update().await.unwrap() {
break;
}
}
let expected_events: Vec<NodeEvent> = vec![
NodeEvent::StateUpdate(StateUpdateEvent::Free),
NodeEvent::StateUpdate(StateUpdateEvent::SettingUp {
tasks: vec![Fixture.task_id()],
}),
NodeEvent::StateUpdate(StateUpdateEvent::Ready),
NodeEvent::StateUpdate(StateUpdateEvent::Busy),
NodeEvent::WorkerEvent(WorkerEvent::Running {
task_id: Fixture.task_id(),
}),
NodeEvent::WorkerEvent(WorkerEvent::Done {
task_id: Fixture.task_id(),
exit_status: ExitStatus {
code: Some(0),
signal: None,
success: true,
},
stderr: String::default(),
stdout: String::default(),
}),
NodeEvent::StateUpdate(StateUpdateEvent::Done {
error: None,
script_output: None,
}),
];
let coordinator: &CoordinatorDouble = agent.coordinator.downcast_ref().unwrap();
let events = &coordinator.events;
assert_eq!(events, &expected_events);
}
#[tokio::test]
async fn test_emitted_state_failed_setup() {
let error_message = "Failed setup";
let mut agent = Agent {
setup_runner: Box::new(SetupRunnerDouble {
error_message: Some(String::from(error_message)),
..SetupRunnerDouble::default()
}),
..Fixture.agent()
};
agent
.work_queue
.downcast_mut::<WorkQueueDouble>()
.unwrap()
.available
.push(Fixture.message());
for _i in 0..10 {
if agent.update().await.unwrap() {
break;
}
}
let expected_events: Vec<NodeEvent> = vec![
NodeEvent::StateUpdate(StateUpdateEvent::Free),
NodeEvent::StateUpdate(StateUpdateEvent::SettingUp {
tasks: vec![Fixture.task_id()],
}),
NodeEvent::StateUpdate(StateUpdateEvent::Done {
error: Some(String::from(error_message)),
script_output: None,
}),
];
let coordinator: &CoordinatorDouble = agent.coordinator.downcast_ref().unwrap();
let events = &coordinator.events;
assert_eq!(events, &expected_events);
}

View File

@ -7,12 +7,16 @@ use super::*;
pub struct SetupRunnerDouble {
pub ran: Vec<WorkSet>,
pub script: SetupOutput,
pub error_message: Option<String>,
}
#[async_trait]
impl ISetupRunner for SetupRunnerDouble {
async fn run(&mut self, work_set: &WorkSet) -> Result<SetupOutput> {
self.ran.push(work_set.clone());
if let Some(error) = self.error_message.clone() {
anyhow::bail!(error);
}
Ok(self.script.clone())
}
}

View File

@ -2,6 +2,7 @@
// Licensed under the MIT License.
use crate::work::WorkUnit;
use crate::worker::double::ChildDouble;
use super::*;
@ -58,36 +59,6 @@ impl IWorkerRunner for RunnerDouble {
}
}
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ChildDouble {
id: u64,
exit_status: Option<ExitStatus>,
stderr: String,
stdout: String,
killed: bool,
}
impl IWorkerChild for ChildDouble {
fn try_wait(&mut self) -> Result<Option<Output>> {
let output = if let Some(exit_status) = self.exit_status {
Some(Output {
exit_status,
stderr: self.stderr.clone(),
stdout: self.stdout.clone(),
})
} else {
None
};
Ok(output)
}
fn kill(&mut self) -> Result<()> {
self.killed = true;
Ok(())
}
}
#[tokio::test]
async fn test_ready_run() {
let mut runner = Fixture.runner(Fixture.child_running());

View File

@ -190,6 +190,13 @@ def on_worker_event_running(
# (as happens in 1.0.0 agents)
task.on_start()
task_event = TaskEvent(
task_id=task.task_id,
machine_id=machine_id,
event_data=WorkerEvent(running=event),
)
task_event.save()
return None