mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-12 01:58:18 +00:00
Link VMSS nodes and tasks when setting up (#43)
Add a (backwards-compatible) data field to node state update events. Use this to link nodes and tasks as soon as the tasks have been claimed. Add a new task-level `setting_up` state to encode this.
This commit is contained in:
@ -54,7 +54,8 @@ impl Agent {
|
||||
// `Free`. If it has started up after a work set-requested reboot, the
|
||||
// state will be `Ready`.
|
||||
if let Some(Scheduler::Free(..)) = &self.scheduler {
|
||||
self.coordinator.emit_event(NodeState::Init.into()).await?;
|
||||
let event = StateUpdateEvent::Init.into();
|
||||
self.coordinator.emit_event(event).await?;
|
||||
}
|
||||
|
||||
loop {
|
||||
@ -94,7 +95,8 @@ impl Agent {
|
||||
}
|
||||
|
||||
async fn free(&mut self, state: State<Free>) -> Result<Scheduler> {
|
||||
self.coordinator.emit_event(NodeState::Free.into()).await?;
|
||||
let event = StateUpdateEvent::Free.into();
|
||||
self.coordinator.emit_event(event).await?;
|
||||
|
||||
let msg = self.work_queue.poll().await?;
|
||||
|
||||
@ -165,9 +167,9 @@ impl Agent {
|
||||
async fn setting_up(&mut self, state: State<SettingUp>) -> Result<Scheduler> {
|
||||
verbose!("agent setting up");
|
||||
|
||||
self.coordinator
|
||||
.emit_event(NodeState::SettingUp.into())
|
||||
.await?;
|
||||
let tasks = state.work_set().task_ids();
|
||||
let event = StateUpdateEvent::SettingUp { tasks };
|
||||
self.coordinator.emit_event(event.into()).await?;
|
||||
|
||||
let scheduler = match state.finish(self.setup_runner.as_mut()).await? {
|
||||
SetupDone::Ready(s) => s.into(),
|
||||
@ -180,9 +182,8 @@ impl Agent {
|
||||
async fn pending_reboot(&mut self, state: State<PendingReboot>) -> Result<Scheduler> {
|
||||
verbose!("agent pending reboot");
|
||||
|
||||
self.coordinator
|
||||
.emit_event(NodeState::Rebooting.into())
|
||||
.await?;
|
||||
let event = StateUpdateEvent::Rebooting.into();
|
||||
self.coordinator.emit_event(event).await?;
|
||||
|
||||
let ctx = state.reboot_context();
|
||||
self.reboot.save_context(ctx).await?;
|
||||
@ -194,13 +195,15 @@ impl Agent {
|
||||
async fn ready(&mut self, state: State<Ready>) -> Result<Scheduler> {
|
||||
verbose!("agent ready");
|
||||
|
||||
self.coordinator.emit_event(NodeState::Ready.into()).await?;
|
||||
let event = StateUpdateEvent::Ready.into();
|
||||
self.coordinator.emit_event(event).await?;
|
||||
|
||||
Ok(state.run().await?.into())
|
||||
}
|
||||
|
||||
async fn busy(&mut self, state: State<Busy>) -> Result<Scheduler> {
|
||||
self.coordinator.emit_event(NodeState::Busy.into()).await?;
|
||||
let event = StateUpdateEvent::Busy.into();
|
||||
self.coordinator.emit_event(event).await?;
|
||||
|
||||
let mut events = vec![];
|
||||
let updated = state
|
||||
@ -217,7 +220,8 @@ impl Agent {
|
||||
async fn done(&mut self, state: State<Done>) -> Result<Scheduler> {
|
||||
verbose!("agent done");
|
||||
|
||||
self.coordinator.emit_event(NodeState::Done.into()).await?;
|
||||
let event = StateUpdateEvent::Done.into();
|
||||
self.coordinator.emit_event(event).await?;
|
||||
|
||||
// `Done` is a final state.
|
||||
Ok(state.into())
|
||||
|
@ -78,11 +78,11 @@ impl From<WorkerEvent> for NodeEvent {
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case", tag = "state")]
|
||||
#[serde(rename_all = "snake_case", tag = "state", content = "data")]
|
||||
pub enum StateUpdateEvent {
|
||||
Init,
|
||||
Free,
|
||||
SettingUp,
|
||||
SettingUp { tasks: Vec<TaskId> },
|
||||
Rebooting,
|
||||
Ready,
|
||||
Busy,
|
||||
@ -95,28 +95,13 @@ impl From<StateUpdateEvent> for NodeEvent {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NodeState> for NodeEvent {
|
||||
fn from(state: NodeState) -> Self {
|
||||
let event = match state {
|
||||
NodeState::Init => StateUpdateEvent::Init,
|
||||
NodeState::Free => StateUpdateEvent::Free,
|
||||
NodeState::SettingUp => StateUpdateEvent::SettingUp,
|
||||
NodeState::Rebooting => StateUpdateEvent::Rebooting,
|
||||
NodeState::Ready => StateUpdateEvent::Ready,
|
||||
NodeState::Busy => StateUpdateEvent::Busy,
|
||||
NodeState::Done => StateUpdateEvent::Done,
|
||||
};
|
||||
|
||||
event.into()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TaskState {
|
||||
Init,
|
||||
Waiting,
|
||||
Scheduled,
|
||||
SettingUp,
|
||||
Running,
|
||||
Stopping,
|
||||
Stopped,
|
||||
|
@ -50,7 +50,19 @@ fn debug_node_event(opt: NodeEventOpt) -> Result<()> {
|
||||
}
|
||||
|
||||
fn debug_node_event_state_update(state: NodeState) -> Result<()> {
|
||||
let event = state.into();
|
||||
let event = match state {
|
||||
NodeState::Init => StateUpdateEvent::Init,
|
||||
NodeState::Free => StateUpdateEvent::Free,
|
||||
NodeState::SettingUp => {
|
||||
let tasks = vec![Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()];
|
||||
StateUpdateEvent::SettingUp { tasks }
|
||||
}
|
||||
NodeState::Rebooting => StateUpdateEvent::Rebooting,
|
||||
NodeState::Ready => StateUpdateEvent::Ready,
|
||||
NodeState::Busy => StateUpdateEvent::Busy,
|
||||
NodeState::Done => StateUpdateEvent::Done,
|
||||
};
|
||||
let event = event.into();
|
||||
print_json(into_envelope(event))
|
||||
}
|
||||
|
||||
|
@ -153,6 +153,10 @@ impl State<SettingUp> {
|
||||
|
||||
Ok(done)
|
||||
}
|
||||
|
||||
pub fn work_set(&self) -> &WorkSet {
|
||||
&self.ctx.work_set
|
||||
}
|
||||
}
|
||||
|
||||
impl State<PendingReboot> {
|
||||
|
@ -24,6 +24,12 @@ pub struct WorkSet {
|
||||
pub work_units: Vec<WorkUnit>,
|
||||
}
|
||||
|
||||
impl WorkSet {
|
||||
pub fn task_ids(&self) -> Vec<TaskId> {
|
||||
self.work_units.iter().map(|w| w.task_id).collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
pub struct WorkUnit {
|
||||
/// Job that the work is part of.
|
||||
|
@ -41,13 +41,47 @@ def get_node_checked(machine_id: UUID) -> Node:
|
||||
return node
|
||||
|
||||
|
||||
def on_state_update(machine_id: UUID, state: NodeState) -> func.HttpResponse:
|
||||
def on_state_update(
|
||||
machine_id: UUID,
|
||||
state_update: NodeStateUpdate,
|
||||
) -> func.HttpResponse:
|
||||
state = state_update.state
|
||||
node = get_node_checked(machine_id)
|
||||
|
||||
if state == NodeState.init or node.state not in NodeState.ready_for_reset():
|
||||
if node.state != state:
|
||||
node.state = state
|
||||
node.save()
|
||||
|
||||
if state == NodeState.setting_up:
|
||||
# This field will be required in the future.
|
||||
# For now, it is optional for back compat.
|
||||
if state_update.data:
|
||||
for task_id in state_update.data.tasks:
|
||||
task = get_task_checked(task_id)
|
||||
|
||||
# The task state may be `running` if it has `vm_count` > 1, and
|
||||
# another node is concurrently executing the task. If so, leave
|
||||
# the state as-is, to represent the max progress made.
|
||||
#
|
||||
# Other states we would want to preserve are excluded by the
|
||||
# outermost conditional check.
|
||||
if task.state != TaskState.running:
|
||||
task.state = TaskState.setting_up
|
||||
|
||||
# We don't yet call `on_start()` for the task.
|
||||
# This will happen once we see a worker event that
|
||||
# reports it as `running`.
|
||||
task.save()
|
||||
|
||||
# Note: we set the node task state to `setting_up`, even though
|
||||
# the task itself may be `running`.
|
||||
node_task = NodeTasks(
|
||||
machine_id=machine_id,
|
||||
task_id=task_id,
|
||||
state=NodeTaskState.setting_up,
|
||||
)
|
||||
node_task.save()
|
||||
else:
|
||||
logging.info("ignoring state updates from the node: %s: %s", machine_id, state)
|
||||
|
||||
@ -133,7 +167,7 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
|
||||
return not_ok(err, context=ERROR_CONTEXT)
|
||||
|
||||
if event.state_update:
|
||||
return on_state_update(envelope.machine_id, event.state_update.state)
|
||||
return on_state_update(envelope.machine_id, event.state_update)
|
||||
elif event.worker_event:
|
||||
return on_worker_event(envelope.machine_id, event.worker_event)
|
||||
else:
|
||||
|
@ -110,6 +110,7 @@ class TaskState(Enum):
|
||||
init = "init"
|
||||
waiting = "waiting"
|
||||
scheduled = "scheduled"
|
||||
setting_up = "setting_up"
|
||||
running = "running"
|
||||
stopping = "stopping"
|
||||
stopped = "stopped"
|
||||
@ -286,6 +287,7 @@ class Architecture(Enum):
|
||||
|
||||
class NodeTaskState(Enum):
|
||||
init = "init"
|
||||
setting_up = "setting_up"
|
||||
running = "running"
|
||||
|
||||
|
||||
|
@ -500,13 +500,33 @@ class WorkerEvent(EnumModel):
|
||||
running: Optional[WorkerRunningEvent]
|
||||
|
||||
|
||||
class SettingUpEventData(BaseModel):
|
||||
tasks: List[UUID]
|
||||
|
||||
|
||||
class NodeStateUpdate(BaseModel):
|
||||
state: NodeState
|
||||
data: Optional[SettingUpEventData]
|
||||
|
||||
@validator("data")
|
||||
def check_data(
|
||||
cls,
|
||||
data: Optional[SettingUpEventData],
|
||||
values: Any,
|
||||
) -> Optional[SettingUpEventData]:
|
||||
if data:
|
||||
state = values.get("state")
|
||||
if state and state != NodeState.setting_up:
|
||||
raise ValueError(
|
||||
"data for node state update event does not match state = %s" % state
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
class NodeEvent(EnumModel):
|
||||
worker_event: Optional[WorkerEvent]
|
||||
state_update: Optional[NodeStateUpdate]
|
||||
worker_event: Optional[WorkerEvent]
|
||||
|
||||
|
||||
# Temporary shim type to support hot upgrade of 1.0.0 nodes.
|
||||
|
Reference in New Issue
Block a user