Refactor internal node event schemas (#29)

This commit is contained in:
Joe Ranweiler
2020-09-29 13:30:33 -07:00
committed by GitHub
parent 9dee8f12f0
commit dbb83e1496
6 changed files with 104 additions and 50 deletions

View File

@ -65,21 +65,49 @@ pub struct NodeEventEnvelope {
} }
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", untagged)] #[serde(rename_all = "snake_case")]
pub enum NodeEvent { pub enum NodeEvent {
StateUpdate { state: NodeState }, StateUpdate(StateUpdateEvent),
WorkerEvent { event: WorkerEvent }, WorkerEvent(WorkerEvent),
}
impl From<NodeState> for NodeEvent {
fn from(state: NodeState) -> Self {
NodeEvent::StateUpdate { state }
}
} }
impl From<WorkerEvent> for NodeEvent { impl From<WorkerEvent> for NodeEvent {
fn from(event: WorkerEvent) -> Self { fn from(event: WorkerEvent) -> Self {
NodeEvent::WorkerEvent { event } NodeEvent::WorkerEvent(event)
}
}
/// Payload of a node state-update event.
///
/// The variants mirror the `NodeState` values handled by
/// `From<NodeState> for NodeEvent`. The serde attributes select an
/// internally-tagged representation with `state` as the tag field and
/// snake_case variant names, so `SettingUp` serializes as
/// `{"state": "setting_up"}`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", tag = "state")]
pub enum StateUpdateEvent {
Init,
Free,
SettingUp,
Rebooting,
Ready,
Busy,
Done,
}
impl From<StateUpdateEvent> for NodeEvent {
fn from(event: StateUpdateEvent) -> Self {
NodeEvent::StateUpdate(event)
}
}
/// Converts a `NodeState` into a `NodeEvent` by mapping it to the
/// `StateUpdateEvent` variant of the same name and wrapping the result.
impl From<NodeState> for NodeEvent {
fn from(state: NodeState) -> Self {
// No wildcard arm: each `NodeState` variant is mapped explicitly.
let event = match state {
NodeState::Init => StateUpdateEvent::Init,
NodeState::Free => StateUpdateEvent::Free,
NodeState::SettingUp => StateUpdateEvent::SettingUp,
NodeState::Rebooting => StateUpdateEvent::Rebooting,
NodeState::Ready => StateUpdateEvent::Ready,
NodeState::Busy => StateUpdateEvent::Busy,
NodeState::Done => StateUpdateEvent::Done,
};
// Relies on `From<StateUpdateEvent> for NodeEvent` to produce the
// `NodeEvent::StateUpdate` variant.
event.into()
} }
} }

View File

@ -50,7 +50,7 @@ fn debug_node_event(opt: NodeEventOpt) -> Result<()> {
} }
fn debug_node_event_state_update(state: NodeState) -> Result<()> { fn debug_node_event_state_update(state: NodeState) -> Result<()> {
let event = NodeEvent::StateUpdate { state }; let event = state.into();
print_json(into_envelope(event)) print_json(into_envelope(event))
} }
@ -95,7 +95,7 @@ fn debug_node_event_worker_event(opt: WorkerEventOpt) -> Result<()> {
} }
} }
}; };
let event = NodeEvent::WorkerEvent { event }; let event = NodeEvent::WorkerEvent(event);
print_json(into_envelope(event)) print_json(into_envelope(event))
} }

View File

@ -11,7 +11,7 @@ use crate::process::*;
use crate::work::*; use crate::work::*;
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", untagged)] #[serde(rename_all = "snake_case")]
pub enum WorkerEvent { pub enum WorkerEvent {
Running { Running {
task_id: TaskId, task_id: TaskId,

View File

@ -10,11 +10,10 @@ import azure.functions as func
from onefuzztypes.enums import ErrorCode, NodeState, NodeTaskState, TaskState from onefuzztypes.enums import ErrorCode, NodeState, NodeTaskState, TaskState
from onefuzztypes.models import ( from onefuzztypes.models import (
Error, Error,
NodeEvent,
NodeEventEnvelope, NodeEventEnvelope,
NodeStateUpdate, NodeStateUpdate,
WorkerDoneEvent,
WorkerEvent, WorkerEvent,
WorkerRunningEvent,
) )
from onefuzztypes.responses import BoolResult from onefuzztypes.responses import BoolResult
@ -55,28 +54,33 @@ def on_state_update(machine_id: UUID, state: NodeState) -> func.HttpResponse:
return ok(BoolResult(result=True)) return ok(BoolResult(result=True))
def on_worker_event(machine_id: UUID, worker_event: WorkerEvent) -> func.HttpResponse: def on_worker_event(machine_id: UUID, event: WorkerEvent) -> func.HttpResponse:
task_id = worker_event.event.task_id if event.running:
task_id = event.running.task_id
elif event.done:
task_id = event.done.task_id
task = get_task_checked(task_id) task = get_task_checked(task_id)
node = get_node_checked(machine_id) node = get_node_checked(machine_id)
node_task = NodeTasks( node_task = NodeTasks(
machine_id=machine_id, task_id=task_id, state=NodeTaskState.running machine_id=machine_id, task_id=task_id, state=NodeTaskState.running
) )
if isinstance(worker_event.event, WorkerRunningEvent): if event.running:
if task.state not in TaskState.shutting_down(): if task.state not in TaskState.shutting_down():
task.state = TaskState.running task.state = TaskState.running
if node.state not in NodeState.ready_for_reset(): if node.state not in NodeState.ready_for_reset():
node.state = NodeState.busy node.state = NodeState.busy
node_task.save() node_task.save()
task.on_start() task.on_start()
elif isinstance(worker_event.event, WorkerDoneEvent): elif event.done:
# only record exit status if the task isn't already shutting down. # Only record exit status if the task isn't already shutting down.
# #
# the agent failing because resources vanish out from underneath it during # It's ok for the agent to fail because resources vanish out from underneath
# deletion is OK # it during deletion.
if task.state not in TaskState.shutting_down(): if task.state not in TaskState.shutting_down():
exit_status = worker_event.event.exit_status exit_status = event.done.exit_status
if not exit_status.success: if not exit_status.success:
logging.error("task failed: status = %s", exit_status) logging.error("task failed: status = %s", exit_status)
@ -84,8 +88,8 @@ def on_worker_event(machine_id: UUID, worker_event: WorkerEvent) -> func.HttpRes
code=ErrorCode.TASK_FAILED, code=ErrorCode.TASK_FAILED,
errors=[ errors=[
"task failed. exit_status = %s" % exit_status, "task failed. exit_status = %s" % exit_status,
worker_event.event.stdout, event.done.stdout,
worker_event.event.stderr, event.done.stderr,
], ],
) )
@ -103,7 +107,7 @@ def on_worker_event(machine_id: UUID, worker_event: WorkerEvent) -> func.HttpRes
task.save() task.save()
node.save() node.save()
task_event = TaskEvent( task_event = TaskEvent(
task_id=task_id, machine_id=machine_id, event_data=worker_event task_id=task_id, machine_id=machine_id, event_data=event
) )
task_event.save() task_event.save()
return ok(BoolResult(result=True)) return ok(BoolResult(result=True))
@ -120,10 +124,20 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
envelope.event, envelope.event,
) )
if isinstance(envelope.event, NodeStateUpdate): if isinstance(envelope.event, NodeEvent):
return on_state_update(envelope.machine_id, envelope.event.state) event = envelope.event
elif isinstance(envelope.event, NodeStateUpdate):
event = NodeEvent(state_update=envelope.event)
elif isinstance(envelope.event, WorkerEvent): elif isinstance(envelope.event, WorkerEvent):
return on_worker_event(envelope.machine_id, envelope.event) event = NodeEvent(worker_event=envelope.event)
else:
err = Error(code=ErrorCode.INVALID_REQUEST, errors=["invalid node event"])
return not_ok(err, context=ERROR_CONTEXT)
if event.state_update:
return on_state_update(envelope.machine_id, event.state_update.state)
elif event.worker_event:
return on_worker_event(envelope.machine_id, event.worker_event)
else: else:
err = Error(code=ErrorCode.INVALID_REQUEST, errors=["invalid node event"]) err = Error(code=ErrorCode.INVALID_REQUEST, errors=["invalid node event"])
return not_ok(err, context=ERROR_CONTEXT) return not_ok(err, context=ERROR_CONTEXT)

View File

@ -7,12 +7,7 @@ from typing import List, Optional, Tuple
from uuid import UUID from uuid import UUID
from onefuzztypes.models import TaskEvent as BASE_TASK_EVENT from onefuzztypes.models import TaskEvent as BASE_TASK_EVENT
from onefuzztypes.models import ( from onefuzztypes.models import TaskEventSummary, WorkerEvent
TaskEventSummary,
WorkerDoneEvent,
WorkerEvent,
WorkerRunningEvent,
)
from .orm import ORMMixin from .orm import ORMMixin
@ -26,8 +21,8 @@ class TaskEvent(BASE_TASK_EVENT, ORMMixin):
return [ return [
TaskEventSummary( TaskEventSummary(
timestamp=e.Timestamp, timestamp=e.Timestamp,
event_data=cls.get_event_data(e.event_data), event_data=get_event_data(e.event_data),
event_type=type(e.event_data.event).__name__, event_type=get_event_type(e.event_data),
) )
for e in events for e in events
] ]
@ -36,12 +31,20 @@ class TaskEvent(BASE_TASK_EVENT, ORMMixin):
def key_fields(cls) -> Tuple[str, Optional[str]]: def key_fields(cls) -> Tuple[str, Optional[str]]:
return ("task_id", None) return ("task_id", None)
@classmethod
def get_event_data(cls, worker_event: WorkerEvent) -> str: def get_event_data(event: WorkerEvent) -> str:
event = worker_event.event if event.done:
if isinstance(event, WorkerDoneEvent): return "exit status: %s" % event.done.exit_status
return "exit status: %s" % event.exit_status elif event.running:
elif isinstance(event, WorkerRunningEvent): return ""
return "" else:
else: return "Unrecognized event: %s" % event
return "Unrecognized event: %s" % event
def get_event_type(event: WorkerEvent) -> str:
    """Return the class name of the payload carried by ``event``.

    ``done`` is checked before ``running``. When neither is set, an
    "Unrecognized event" message that includes the event is returned.
    """
    done = event.done
    if done:
        return type(done).__name__

    running = event.running
    if running:
        return type(running).__name__

    return "Unrecognized event: %s" % event

View File

@ -495,20 +495,29 @@ class WorkerDoneEvent(BaseModel):
stdout: str stdout: str
class WorkerEvent(BaseModel): class WorkerEvent(EnumModel):
event: Union[WorkerDoneEvent, WorkerRunningEvent] done: Optional[WorkerDoneEvent]
running: Optional[WorkerRunningEvent]
class NodeStateUpdate(BaseModel): class NodeStateUpdate(BaseModel):
state: NodeState state: NodeState
NodeEvent = Union[WorkerEvent, NodeStateUpdate] class NodeEvent(EnumModel):
worker_event: Optional[WorkerEvent]
state_update: Optional[NodeStateUpdate]
# Temporary shim type to support hot upgrade of 1.0.0 nodes.
#
# We want future variants to use an externally-tagged repr.
NodeEventShim = Union[NodeEvent, WorkerEvent, NodeStateUpdate]
class NodeEventEnvelope(BaseModel): class NodeEventEnvelope(BaseModel):
machine_id: UUID machine_id: UUID
event: NodeEvent event: NodeEventShim
class StopNodeCommand(BaseModel): class StopNodeCommand(BaseModel):