mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-23 14:57:06 +00:00
Update `can_schedule` check to support node reimaging (#35)
- Include version in `can_schedule` check, expect rich response
- Check if work can be scheduled before claiming
- If work is stopped, claim and drop it
- If node is outdated, don't claim work
This commit is contained in:
@ -101,29 +101,58 @@ impl Agent {
|
||||
let next = if let Some(msg) = msg {
|
||||
verbose!("received work set message: {:?}", msg);
|
||||
|
||||
let claim = self.work_queue.claim(msg.receipt).await;
|
||||
let can_schedule = self.coordinator.can_schedule(&msg.work_set).await?;
|
||||
|
||||
if let Err(err) = claim {
|
||||
error!("unable to claim work set: {}", err);
|
||||
if can_schedule.allowed {
|
||||
info!("claiming work set: {:?}", msg.work_set);
|
||||
|
||||
// Stay in `Free` state.
|
||||
state.into()
|
||||
} else {
|
||||
info!("claimed work set: {:?}", msg.work_set);
|
||||
let claim = self.work_queue.claim(msg.receipt).await;
|
||||
|
||||
if self.coordinator.can_schedule(&msg.work_set).await? {
|
||||
info!("scheduling work set: {:?}", msg.work_set);
|
||||
if let Err(err) = claim {
|
||||
error!("unable to claim work set: {}", err);
|
||||
|
||||
// We were unable to claim the work set, so it will reappear in the pool's
|
||||
// work queue when the visibility timeout expires. Don't execute the work,
|
||||
// or else another node will pick it up, and it will be double-scheduled.
|
||||
//
|
||||
// Stay in the `Free` state.
|
||||
state.into()
|
||||
} else {
|
||||
info!("claimed work set: {:?}", msg.work_set);
|
||||
|
||||
// We are allowed to schedule this work, and we have claimed it, so no other
|
||||
// node will see it.
|
||||
//
|
||||
// Transition to `SettingUp` state.
|
||||
let state = state.schedule(msg.work_set.clone());
|
||||
state.into()
|
||||
} else {
|
||||
// We have claimed the work set, so it is no longer in the work queue.
|
||||
// But since the work has been stopped, we will not execute it. Drop the
|
||||
// work set message and stay in the `Free` state.
|
||||
warn!("unable to schedule work set: {:?}", msg.work_set);
|
||||
state.into()
|
||||
}
|
||||
} else {
|
||||
// We cannot schedule the work set. Depending on why, we want to either drop the work
|
||||
// (because it is no longer valid for anyone) or do nothing (because our version is out
|
||||
// of date, and we want another node to pick it up).
|
||||
warn!("unable to schedule work set: {:?}", msg.work_set);
|
||||
|
||||
// If `work_stopped`, the work set is not valid for any node, and we should drop it for the
|
||||
// entire pool by claiming but not executing it.
|
||||
if can_schedule.work_stopped {
|
||||
if let Err(err) = self.work_queue.claim(msg.receipt).await {
|
||||
error!("unable to drop stopped work: {}", err);
|
||||
} else {
|
||||
info!("dropped stopped work set: {:?}", msg.work_set);
|
||||
}
|
||||
} else {
|
||||
// Otherwise, the work was not stopped, but we still should not execute it. This is likely
|
||||
// because our agent version is out of date. Do nothing, so another node can see the work.
|
||||
// The service will eventually send us a stop command and reimage our node, if appropriate.
|
||||
verbose!(
|
||||
"not scheduling active work set, not dropping: {:?}",
|
||||
msg.work_set
|
||||
);
|
||||
}
|
||||
|
||||
// Stay in `Free` state.
|
||||
state.into()
|
||||
}
|
||||
} else {
|
||||
self.sleep().await;
|
||||
|
@ -95,11 +95,26 @@ pub enum TaskState {
|
||||
WaitJob,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
pub struct TaskSearch {
|
||||
/// JSON body of the `can_schedule` check that the agent POSTs to the service
/// at `/api/agents/can_schedule`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
pub struct CanScheduleRequest {
|
||||
/// ID of the requesting machine, taken from the agent's registration.
machine_id: Uuid,
|
||||
/// ID of the task to check, taken from the first work unit of the work set.
task_id: Uuid,
|
||||
}
|
||||
|
||||
/// Answer to a `can_schedule` check, deserialized from the service's JSON
/// response. Tells the agent whether it may run a work set and, if not,
/// whether the work set should be dropped for the whole pool.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
pub struct CanSchedule {
|
||||
/// If `true`, then the receiving node can schedule the work.
|
||||
/// Otherwise, the receiver should inspect `work_stopped`.
|
||||
pub allowed: bool,
|
||||
|
||||
/// If `true`, then the work was stopped after being scheduled to the pool's
|
||||
/// work queue, but before being claimed by a node.
|
||||
///
|
||||
/// No node in the pool may schedule the work, so the receiving node should
|
||||
/// claim (delete) and drop the work set.
|
||||
pub work_stopped: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
pub struct TaskInfo {
|
||||
job_id: Uuid,
|
||||
@ -113,7 +128,7 @@ pub trait ICoordinator: Downcast {
|
||||
|
||||
async fn emit_event(&mut self, event: NodeEvent) -> Result<()>;
|
||||
|
||||
async fn can_schedule(&mut self, work: &WorkSet) -> Result<bool>;
|
||||
async fn can_schedule(&mut self, work: &WorkSet) -> Result<CanSchedule>;
|
||||
}
|
||||
|
||||
impl_downcast!(ICoordinator);
|
||||
@ -128,7 +143,7 @@ impl ICoordinator for Coordinator {
|
||||
self.emit_event(event).await
|
||||
}
|
||||
|
||||
async fn can_schedule(&mut self, work_set: &WorkSet) -> Result<bool> {
|
||||
async fn can_schedule(&mut self, work_set: &WorkSet) -> Result<CanSchedule> {
|
||||
self.can_schedule(work_set).await
|
||||
}
|
||||
}
|
||||
@ -181,15 +196,11 @@ impl Coordinator {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn can_schedule(&mut self, work_set: &WorkSet) -> Result<bool> {
|
||||
async fn can_schedule(&mut self, work_set: &WorkSet) -> Result<CanSchedule> {
|
||||
let request = RequestType::CanSchedule(work_set);
|
||||
let response = self.send_with_auth_retry(request).await?;
|
||||
|
||||
let task_info: TaskInfo = response.json().await?;
|
||||
|
||||
verbose!("task_info = {:?}", task_info);
|
||||
|
||||
let can_schedule = task_info.state == TaskState::Scheduled;
|
||||
let can_schedule: CanSchedule = response.json().await?;
|
||||
|
||||
Ok(can_schedule)
|
||||
}
|
||||
@ -285,17 +296,20 @@ impl Coordinator {
|
||||
// need to make sure that the other work units in the set have their states
|
||||
// updated if necessary.
|
||||
let task_id = work_set.work_units[0].task_id;
|
||||
let task_search = TaskSearch { task_id };
|
||||
let can_schedule = CanScheduleRequest {
|
||||
machine_id: self.registration.machine_id,
|
||||
task_id,
|
||||
};
|
||||
|
||||
verbose!("getting task info for task ID = {}", task_id);
|
||||
verbose!("checking if able to schedule task ID = {}", task_id);
|
||||
|
||||
let mut url = self.registration.config.onefuzz_url.clone();
|
||||
url.set_path("/api/tasks");
|
||||
url.set_path("/api/agents/can_schedule");
|
||||
let request = self
|
||||
.client
|
||||
.get(url)
|
||||
.post(url)
|
||||
.bearer_auth(self.token.secret().expose_ref())
|
||||
.json(&task_search)
|
||||
.json(&can_schedule)
|
||||
.build()?;
|
||||
|
||||
Ok(request)
|
||||
|
@ -20,7 +20,10 @@ impl ICoordinator for CoordinatorDouble {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn can_schedule(&mut self, _work: &WorkSet) -> Result<bool> {
|
||||
Ok(true)
|
||||
/// Test double for the `can_schedule` check: ignores the work set and always
/// reports that the work may be scheduled.
async fn can_schedule(&mut self, _work: &WorkSet) -> Result<CanSchedule> {
|
||||
Ok(CanSchedule {
|
||||
allowed: true,
|
||||
// Allowed work is, by definition, not stopped. Keep the two flags
// consistent so the double never returns a pair that contradicts the
// documented meaning of `work_stopped`.
work_stopped: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -13,12 +13,12 @@ from onefuzztypes.responses import CanSchedule
|
||||
|
||||
from ..onefuzzlib.agent_authorization import verify_token
|
||||
from ..onefuzzlib.pools import Node, NodeMessage
|
||||
from ..onefuzzlib.request import not_ok, ok, parse_uri
|
||||
from ..onefuzzlib.request import not_ok, ok, parse_request
|
||||
from ..onefuzzlib.tasks.main import Task
|
||||
|
||||
|
||||
def post(req: func.HttpRequest) -> func.HttpResponse:
|
||||
request = parse_uri(CanScheduleRequest, req)
|
||||
request = parse_request(CanScheduleRequest, req)
|
||||
if isinstance(request, Error):
|
||||
return not_ok(request, context="CanScheduleRequest")
|
||||
|
||||
@ -31,7 +31,7 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
|
||||
|
||||
allowed = True
|
||||
work_stopped = False
|
||||
if node.is_outdated:
|
||||
if node.is_outdated():
|
||||
logging.info(
|
||||
"received can_schedule request from outdated node '%s' version '%s'",
|
||||
node.machine_id,
|
||||
@ -46,6 +46,10 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
|
||||
task = Task.get_by_task_id(request.task_id)
|
||||
|
||||
work_stopped = isinstance(task, Error) or (task.state != TaskState.scheduled)
|
||||
|
||||
if work_stopped:
|
||||
allowed = False
|
||||
|
||||
return ok(CanSchedule(allowed=allowed, work_stopped=work_stopped))
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user