add node & task heartbeat events (#621)

This adds node and task heartbeat events and makes the event data available as structured data in the logs.
This commit is contained in:
bmc-msft
2021-03-02 17:04:39 -05:00
committed by GitHub
parent 7f66eeee0d
commit 4489036d9f
5 changed files with 538 additions and 3 deletions

View File

@ -29,6 +29,7 @@ Each event will be submitted via HTTP POST to the user provided URL.
* [job_stopped](#job_stopped) * [job_stopped](#job_stopped)
* [node_created](#node_created) * [node_created](#node_created)
* [node_deleted](#node_deleted) * [node_deleted](#node_deleted)
* [node_heartbeat](#node_heartbeat)
* [node_state_updated](#node_state_updated) * [node_state_updated](#node_state_updated)
* [ping](#ping) * [ping](#ping)
* [pool_created](#pool_created) * [pool_created](#pool_created)
@ -41,6 +42,7 @@ Each event will be submitted via HTTP POST to the user provided URL.
* [scaleset_failed](#scaleset_failed) * [scaleset_failed](#scaleset_failed)
* [task_created](#task_created) * [task_created](#task_created)
* [task_failed](#task_failed) * [task_failed](#task_failed)
* [task_heartbeat](#task_heartbeat)
* [task_state_updated](#task_state_updated) * [task_state_updated](#task_state_updated)
* [task_stopped](#task_stopped) * [task_stopped](#task_stopped)
@ -506,6 +508,47 @@ Each event will be submitted via HTTP POST to the user provided URL.
} }
``` ```
### node_heartbeat
#### Example
```json
{
"machine_id": "00000000-0000-0000-0000-000000000000",
"pool_name": "example"
}
```
#### Schema
```json
{
"additionalProperties": false,
"properties": {
"machine_id": {
"format": "uuid",
"title": "Machine Id",
"type": "string"
},
"pool_name": {
"title": "Pool Name",
"type": "string"
},
"scaleset_id": {
"format": "uuid",
"title": "Scaleset Id",
"type": "string"
}
},
"required": [
"machine_id",
"pool_name"
],
"title": "EventNodeHeartbeat",
"type": "object"
}
```
### node_state_updated ### node_state_updated
#### Example #### Example
@ -1982,6 +2025,404 @@ Each event will be submitted via HTTP POST to the user provided URL.
} }
``` ```
### task_heartbeat
#### Example
```json
{
"config": {
"containers": [
{
"name": "my-setup",
"type": "setup"
},
{
"name": "my-inputs",
"type": "inputs"
},
{
"name": "my-crashes",
"type": "crashes"
}
],
"job_id": "00000000-0000-0000-0000-000000000000",
"tags": {},
"task": {
"check_debugger": true,
"duration": 1,
"target_env": {},
"target_exe": "fuzz.exe",
"target_options": [],
"type": "libfuzzer_fuzz"
}
},
"job_id": "00000000-0000-0000-0000-000000000000",
"task_id": "00000000-0000-0000-0000-000000000000"
}
```
#### Schema
```json
{
"additionalProperties": false,
"definitions": {
"ContainerType": {
"description": "An enumeration.",
"enum": [
"analysis",
"coverage",
"crashes",
"inputs",
"no_repro",
"readonly_inputs",
"reports",
"setup",
"tools",
"unique_inputs",
"unique_reports"
],
"title": "ContainerType"
},
"StatsFormat": {
"description": "An enumeration.",
"enum": [
"AFL"
],
"title": "StatsFormat"
},
"TaskConfig": {
"properties": {
"colocate": {
"title": "Colocate",
"type": "boolean"
},
"containers": {
"items": {
"$ref": "#/definitions/TaskContainers"
},
"title": "Containers",
"type": "array"
},
"debug": {
"items": {
"$ref": "#/definitions/TaskDebugFlag"
},
"type": "array"
},
"job_id": {
"format": "uuid",
"title": "Job Id",
"type": "string"
},
"pool": {
"$ref": "#/definitions/TaskPool"
},
"prereq_tasks": {
"items": {
"format": "uuid",
"type": "string"
},
"title": "Prereq Tasks",
"type": "array"
},
"tags": {
"additionalProperties": {
"type": "string"
},
"title": "Tags",
"type": "object"
},
"task": {
"$ref": "#/definitions/TaskDetails"
},
"vm": {
"$ref": "#/definitions/TaskVm"
}
},
"required": [
"job_id",
"task",
"containers",
"tags"
],
"title": "TaskConfig",
"type": "object"
},
"TaskContainers": {
"properties": {
"name": {
"title": "Name",
"type": "string"
},
"type": {
"$ref": "#/definitions/ContainerType"
}
},
"required": [
"type",
"name"
],
"title": "TaskContainers",
"type": "object"
},
"TaskDebugFlag": {
"description": "An enumeration.",
"enum": [
"keep_node_on_failure",
"keep_node_on_completion"
],
"title": "TaskDebugFlag"
},
"TaskDetails": {
"properties": {
"analyzer_env": {
"additionalProperties": {
"type": "string"
},
"title": "Analyzer Env",
"type": "object"
},
"analyzer_exe": {
"title": "Analyzer Exe",
"type": "string"
},
"analyzer_options": {
"items": {
"type": "string"
},
"title": "Analyzer Options",
"type": "array"
},
"check_asan_log": {
"title": "Check Asan Log",
"type": "boolean"
},
"check_debugger": {
"default": true,
"title": "Check Debugger",
"type": "boolean"
},
"check_fuzzer_help": {
"title": "Check Fuzzer Help",
"type": "boolean"
},
"check_retry_count": {
"title": "Check Retry Count",
"type": "integer"
},
"duration": {
"title": "Duration",
"type": "integer"
},
"ensemble_sync_delay": {
"title": "Ensemble Sync Delay",
"type": "integer"
},
"expect_crash_on_failure": {
"title": "Expect Crash On Failure",
"type": "boolean"
},
"generator_env": {
"additionalProperties": {
"type": "string"
},
"title": "Generator Env",
"type": "object"
},
"generator_exe": {
"title": "Generator Exe",
"type": "string"
},
"generator_options": {
"items": {
"type": "string"
},
"title": "Generator Options",
"type": "array"
},
"preserve_existing_outputs": {
"title": "Preserve Existing Outputs",
"type": "boolean"
},
"reboot_after_setup": {
"title": "Reboot After Setup",
"type": "boolean"
},
"rename_output": {
"title": "Rename Output",
"type": "boolean"
},
"stats_file": {
"title": "Stats File",
"type": "string"
},
"stats_format": {
"$ref": "#/definitions/StatsFormat"
},
"supervisor_env": {
"additionalProperties": {
"type": "string"
},
"title": "Supervisor Env",
"type": "object"
},
"supervisor_exe": {
"title": "Supervisor Exe",
"type": "string"
},
"supervisor_input_marker": {
"title": "Supervisor Input Marker",
"type": "string"
},
"supervisor_options": {
"items": {
"type": "string"
},
"title": "Supervisor Options",
"type": "array"
},
"target_env": {
"additionalProperties": {
"type": "string"
},
"title": "Target Env",
"type": "object"
},
"target_exe": {
"title": "Target Exe",
"type": "string"
},
"target_options": {
"items": {
"type": "string"
},
"title": "Target Options",
"type": "array"
},
"target_options_merge": {
"title": "Target Options Merge",
"type": "boolean"
},
"target_timeout": {
"title": "Target Timeout",
"type": "integer"
},
"target_workers": {
"title": "Target Workers",
"type": "integer"
},
"type": {
"$ref": "#/definitions/TaskType"
},
"wait_for_files": {
"$ref": "#/definitions/ContainerType"
}
},
"required": [
"type",
"duration"
],
"title": "TaskDetails",
"type": "object"
},
"TaskPool": {
"properties": {
"count": {
"title": "Count",
"type": "integer"
},
"pool_name": {
"title": "Pool Name",
"type": "string"
}
},
"required": [
"count",
"pool_name"
],
"title": "TaskPool",
"type": "object"
},
"TaskType": {
"description": "An enumeration.",
"enum": [
"libfuzzer_fuzz",
"libfuzzer_coverage",
"libfuzzer_crash_report",
"libfuzzer_merge",
"generic_analysis",
"generic_supervisor",
"generic_merge",
"generic_generator",
"generic_crash_report"
],
"title": "TaskType"
},
"TaskVm": {
"properties": {
"count": {
"default": 1,
"title": "Count",
"type": "integer"
},
"image": {
"title": "Image",
"type": "string"
},
"reboot_after_setup": {
"title": "Reboot After Setup",
"type": "boolean"
},
"region": {
"title": "Region",
"type": "string"
},
"sku": {
"title": "Sku",
"type": "string"
},
"spot_instances": {
"default": false,
"title": "Spot Instances",
"type": "boolean"
}
},
"required": [
"region",
"sku",
"image"
],
"title": "TaskVm",
"type": "object"
}
},
"properties": {
"config": {
"$ref": "#/definitions/TaskConfig"
},
"job_id": {
"format": "uuid",
"title": "Job Id",
"type": "string"
},
"task_id": {
"format": "uuid",
"title": "Task Id",
"type": "string"
}
},
"required": [
"job_id",
"task_id",
"config"
],
"title": "EventTaskHeartbeat",
"type": "object"
}
```
### task_state_updated ### task_state_updated
#### Example #### Example
@ -3108,6 +3549,31 @@ Each event will be submitted via HTTP POST to the user provided URL.
"title": "EventNodeDeleted", "title": "EventNodeDeleted",
"type": "object" "type": "object"
}, },
"EventNodeHeartbeat": {
"additionalProperties": false,
"properties": {
"machine_id": {
"format": "uuid",
"title": "Machine Id",
"type": "string"
},
"pool_name": {
"title": "Pool Name",
"type": "string"
},
"scaleset_id": {
"format": "uuid",
"title": "Scaleset Id",
"type": "string"
}
},
"required": [
"machine_id",
"pool_name"
],
"title": "EventNodeHeartbeat",
"type": "object"
},
"EventNodeStateUpdated": { "EventNodeStateUpdated": {
"additionalProperties": false, "additionalProperties": false,
"properties": { "properties": {
@ -3385,6 +3851,31 @@ Each event will be submitted via HTTP POST to the user provided URL.
"title": "EventTaskFailed", "title": "EventTaskFailed",
"type": "object" "type": "object"
}, },
"EventTaskHeartbeat": {
"additionalProperties": false,
"properties": {
"config": {
"$ref": "#/definitions/TaskConfig"
},
"job_id": {
"format": "uuid",
"title": "Job Id",
"type": "string"
},
"task_id": {
"format": "uuid",
"title": "Task Id",
"type": "string"
}
},
"required": [
"job_id",
"task_id",
"config"
],
"title": "EventTaskHeartbeat",
"type": "object"
},
"EventTaskStateUpdated": { "EventTaskStateUpdated": {
"additionalProperties": false, "additionalProperties": false,
"properties": { "properties": {
@ -3469,7 +3960,9 @@ Each event will be submitted via HTTP POST to the user provided URL.
"task_state_updated", "task_state_updated",
"task_stopped", "task_stopped",
"crash_reported", "crash_reported",
"file_added" "file_added",
"task_heartbeat",
"node_heartbeat"
], ],
"title": "EventType" "title": "EventType"
}, },
@ -3962,6 +4455,9 @@ Each event will be submitted via HTTP POST to the user provided URL.
{ {
"$ref": "#/definitions/EventNodeDeleted" "$ref": "#/definitions/EventNodeDeleted"
}, },
{
"$ref": "#/definitions/EventNodeHeartbeat"
},
{ {
"$ref": "#/definitions/EventPing" "$ref": "#/definitions/EventPing"
}, },
@ -4001,6 +4497,9 @@ Each event will be submitted via HTTP POST to the user provided URL.
{ {
"$ref": "#/definitions/EventTaskStopped" "$ref": "#/definitions/EventTaskStopped"
}, },
{
"$ref": "#/definitions/EventTaskHeartbeat"
},
{ {
"$ref": "#/definitions/EventCrashReported" "$ref": "#/definitions/EventCrashReported"
}, },

View File

@ -8,10 +8,11 @@ import json
import logging import logging
import azure.functions as func import azure.functions as func
from onefuzztypes.events import EventNodeHeartbeat
from onefuzztypes.models import NodeHeartbeatEntry from onefuzztypes.models import NodeHeartbeatEntry
from pydantic import ValidationError from pydantic import ValidationError
from ..onefuzzlib.events import get_events from ..onefuzzlib.events import get_events, send_event
from ..onefuzzlib.workers.nodes import Node from ..onefuzzlib.workers.nodes import Node
@ -27,6 +28,13 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
return return
node.heartbeat = datetime.datetime.utcnow() node.heartbeat = datetime.datetime.utcnow()
node.save() node.save()
send_event(
EventNodeHeartbeat(
machine_id=node.machine_id,
scaleset_id=node.scaleset_id,
pool_name=node.pool_name,
)
)
except ValidationError: except ValidationError:
logging.error("invalid node heartbeat: %s", raw) logging.error("invalid node heartbeat: %s", raw)

View File

@ -8,10 +8,11 @@ import logging
from datetime import datetime from datetime import datetime
import azure.functions as func import azure.functions as func
from onefuzztypes.events import EventTaskHeartbeat
from onefuzztypes.models import Error, TaskHeartbeatEntry from onefuzztypes.models import Error, TaskHeartbeatEntry
from pydantic import ValidationError from pydantic import ValidationError
from ..onefuzzlib.events import get_events from ..onefuzzlib.events import get_events, send_event
from ..onefuzzlib.tasks.main import Task from ..onefuzzlib.tasks.main import Task
@ -29,6 +30,11 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
if task: if task:
task.heartbeat = datetime.utcnow() task.heartbeat = datetime.utcnow()
task.save() task.save()
send_event(
EventTaskHeartbeat(
job_id=task.job_id, task_id=task.task_id, config=task.config
)
)
except ValidationError: except ValidationError:
logging.error("invalid task heartbeat: %s", raw) logging.error("invalid task heartbeat: %s", raw)

View File

@ -47,6 +47,8 @@ from onefuzztypes.events import (
EventNodeStateUpdated, EventNodeStateUpdated,
EventNodeCreated, EventNodeCreated,
EventNodeDeleted, EventNodeDeleted,
EventNodeHeartbeat,
EventTaskHeartbeat,
get_event_type, get_event_type,
EventType, EventType,
) )
@ -194,6 +196,8 @@ def main():
), ),
), ),
EventFileAdded(container=Container("container-name"), filename="example.txt"), EventFileAdded(container=Container("container-name"), filename="example.txt"),
EventNodeHeartbeat(machine_id=UUID(int=0), pool_name="example"),
EventTaskHeartbeat(task_id=UUID(int=0), job_id=UUID(int=0), config=task_config),
] ]
for event in Event.__args__: for event in Event.__args__:

View File

@ -63,6 +63,12 @@ class EventTaskStateUpdated(BaseEvent):
config: TaskConfig config: TaskConfig
# Event emitted when a task heartbeat is processed: the task heartbeat queue
# handler sends it right after updating and saving the task's heartbeat time.
# Carries the task's job_id, its task_id, and its TaskConfig; all three fields
# are required.
class EventTaskHeartbeat(BaseEvent):
job_id: UUID
task_id: UUID
config: TaskConfig
class EventPing(BaseResponse): class EventPing(BaseResponse):
ping_id: UUID ping_id: UUID
@ -118,6 +124,12 @@ class EventNodeCreated(BaseEvent):
pool_name: PoolName pool_name: PoolName
# Event emitted when a node heartbeat is processed: the node heartbeat queue
# handler sends it after updating and saving the node's heartbeat timestamp.
# machine_id and pool_name are required; scaleset_id is optional (the
# documented example event carries only machine_id and pool_name).
class EventNodeHeartbeat(BaseEvent):
machine_id: UUID
scaleset_id: Optional[UUID]
pool_name: PoolName
class EventNodeDeleted(BaseEvent): class EventNodeDeleted(BaseEvent):
machine_id: UUID machine_id: UUID
scaleset_id: Optional[UUID] scaleset_id: Optional[UUID]
@ -148,6 +160,7 @@ Event = Union[
EventNodeStateUpdated, EventNodeStateUpdated,
EventNodeCreated, EventNodeCreated,
EventNodeDeleted, EventNodeDeleted,
EventNodeHeartbeat,
EventPing, EventPing,
EventPoolCreated, EventPoolCreated,
EventPoolDeleted, EventPoolDeleted,
@ -161,6 +174,7 @@ Event = Union[
EventTaskStateUpdated, EventTaskStateUpdated,
EventTaskCreated, EventTaskCreated,
EventTaskStopped, EventTaskStopped,
EventTaskHeartbeat,
EventCrashReported, EventCrashReported,
EventFileAdded, EventFileAdded,
] ]
@ -187,6 +201,8 @@ class EventType(Enum):
task_stopped = "task_stopped" task_stopped = "task_stopped"
crash_reported = "crash_reported" crash_reported = "crash_reported"
file_added = "file_added" file_added = "file_added"
task_heartbeat = "task_heartbeat"
node_heartbeat = "node_heartbeat"
EventTypeMap = { EventTypeMap = {
@ -195,6 +211,7 @@ EventTypeMap = {
EventType.node_created: EventNodeCreated, EventType.node_created: EventNodeCreated,
EventType.node_deleted: EventNodeDeleted, EventType.node_deleted: EventNodeDeleted,
EventType.node_state_updated: EventNodeStateUpdated, EventType.node_state_updated: EventNodeStateUpdated,
EventType.node_heartbeat: EventNodeHeartbeat,
EventType.ping: EventPing, EventType.ping: EventPing,
EventType.pool_created: EventPoolCreated, EventType.pool_created: EventPoolCreated,
EventType.pool_deleted: EventPoolDeleted, EventType.pool_deleted: EventPoolDeleted,
@ -207,6 +224,7 @@ EventTypeMap = {
EventType.task_created: EventTaskCreated, EventType.task_created: EventTaskCreated,
EventType.task_failed: EventTaskFailed, EventType.task_failed: EventTaskFailed,
EventType.task_state_updated: EventTaskStateUpdated, EventType.task_state_updated: EventTaskStateUpdated,
EventType.task_heartbeat: EventTaskHeartbeat,
EventType.task_stopped: EventTaskStopped, EventType.task_stopped: EventTaskStopped,
EventType.crash_reported: EventCrashReported, EventType.crash_reported: EventCrashReported,
EventType.file_added: EventFileAdded, EventType.file_added: EventFileAdded,