diff --git a/docs/webhook_events.md b/docs/webhook_events.md index 23afba598..cd33d0dbe 100644 --- a/docs/webhook_events.md +++ b/docs/webhook_events.md @@ -10,16 +10,1071 @@ Each event will be submitted via HTTP POST to the user provided URL. ```json { - "webhook_id": "00000000-0000-0000-0000-000000000000", "event_id": "00000000-0000-0000-0000-000000000000", "event_type": "ping", "event": { "ping_id": "00000000-0000-0000-0000-000000000000" + }, + "webhook_id": "00000000-0000-0000-0000-000000000000" +} +``` + +## Event Types (EventType) + +* [crash_reported](#crash_reported) +* [file_added](#file_added) +* [job_created](#job_created) +* [job_stopped](#job_stopped) +* [node_created](#node_created) +* [node_deleted](#node_deleted) +* [node_state_updated](#node_state_updated) +* [ping](#ping) +* [pool_created](#pool_created) +* [pool_deleted](#pool_deleted) +* [proxy_created](#proxy_created) +* [proxy_deleted](#proxy_deleted) +* [proxy_failed](#proxy_failed) +* [scaleset_created](#scaleset_created) +* [scaleset_deleted](#scaleset_deleted) +* [scaleset_failed](#scaleset_failed) +* [task_created](#task_created) +* [task_failed](#task_failed) +* [task_state_updated](#task_state_updated) +* [task_stopped](#task_stopped) + +### crash_reported + +#### Example + +```json +{ + "report": { + "input_blob": { + "account": "contoso-storage-account", + "container": "crashes", + "name": "input.txt" + }, + "executable": "fuzz.exe", + "crash_type": "example crash report type", + "crash_site": "example crash site", + "call_stack": [ + "#0 line", + "#1 line", + "#2 line" + ], + "call_stack_sha256": "0000000000000000000000000000000000000000000000000000000000000000", + "input_sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + "asan_log": "example asan log", + "task_id": "00000000-0000-0000-0000-000000000000", + "job_id": "00000000-0000-0000-0000-000000000000", + "scariness_score": 10, + "scariness_description": "example-scariness" + }, + "container": "container-name", + "filename": "example.json" +} +``` + +#### Schema + +```json +{ + "title": "EventCrashReported", + "type": "object", + "properties": { + "report": { + "$ref": "#/definitions/Report" + }, + "container": { + "title": "Container", + "type": "string" + }, + "filename": { + "title": "Filename", + "type": "string" + } + }, + "required": [ + "report", + "container", + "filename" + ], + "additionalProperties": false, + "definitions": { + "BlobRef": { + "title": "BlobRef", + "type": "object", + "properties": { + "account": { + "title": "Account", + "type": "string" + }, + "container": { + "title": "Container", + "type": "string" + }, + "name": { + "title": "Name", + "type": "string" + } + }, + "required": [ + "account", + "container", + "name" + ] + }, + "Report": { + "title": "Report", + "type": "object", + "properties": { + "input_url": { + "title": "Input Url", + "type": "string" + }, + "input_blob": { + "$ref": "#/definitions/BlobRef" + }, + "executable": { + "title": "Executable", + "type": "string" + }, + "crash_type": { + "title": "Crash Type", + "type": "string" + }, + "crash_site": { + "title": "Crash Site", + "type": "string" + }, + "call_stack": { + "title": "Call Stack", + "type": "array", + "items": { + "type": "string" + } + }, + "call_stack_sha256": { + "title": "Call Stack Sha256", + "type": "string" + }, + "input_sha256": { + "title": "Input Sha256", + "type": "string" + }, + "asan_log": { + "title": "Asan Log", + "type": "string" + }, + "task_id": { + "title": "Task Id", + "type": "string", + "format": "uuid" 
+ }, + "job_id": { + "title": "Job Id", + "type": "string", + "format": "uuid" + }, + "scariness_score": { + "title": "Scariness Score", + "type": "integer" + }, + "scariness_description": { + "title": "Scariness Description", + "type": "string" + } + }, + "required": [ + "input_blob", + "executable", + "crash_type", + "crash_site", + "call_stack", + "call_stack_sha256", + "input_sha256", + "task_id", + "job_id" + ] + } } } ``` -## Event Types (WebhookEventType) +### file_added + +#### Example + +```json +{ + "container": "container-name", + "filename": "example.txt" +} +``` + +#### Schema + +```json +{ + "title": "EventFileAdded", + "type": "object", + "properties": { + "container": { + "title": "Container", + "type": "string" + }, + "filename": { + "title": "Filename", + "type": "string" + } + }, + "required": [ + "container", + "filename" + ], + "additionalProperties": false +} +``` + +### job_created + +#### Example + +```json +{ + "job_id": "00000000-0000-0000-0000-000000000000", + "config": { + "project": "example project", + "name": "example name", + "build": "build 1", + "duration": 24 + } +} +``` + +#### Schema + +```json +{ + "title": "EventJobCreated", + "type": "object", + "properties": { + "job_id": { + "title": "Job Id", + "type": "string", + "format": "uuid" + }, + "config": { + "$ref": "#/definitions/JobConfig" + }, + "user_info": { + "$ref": "#/definitions/UserInfo" + } + }, + "required": [ + "job_id", + "config" + ], + "additionalProperties": false, + "definitions": { + "JobConfig": { + "title": "JobConfig", + "type": "object", + "properties": { + "project": { + "title": "Project", + "type": "string" + }, + "name": { + "title": "Name", + "type": "string" + }, + "build": { + "title": "Build", + "type": "string" + }, + "duration": { + "title": "Duration", + "type": "integer" + } + }, + "required": [ + "project", + "name", + "build", + "duration" + ] + }, + "UserInfo": { + "title": "UserInfo", + "type": "object", + "properties": { + "application_id": { + "title": "Application Id", + "type": "string", + "format": "uuid" + }, + "object_id": { + "title": "Object Id", + "type": "string", + "format": "uuid" + }, + "upn": { + "title": "Upn", + "type": "string" + } + }, + "required": [ + "application_id" + ] + } + } +} +``` + +### job_stopped + +#### Example + +```json +{ + "job_id": "00000000-0000-0000-0000-000000000000", + "config": { + "project": "example project", + "name": "example name", + "build": "build 1", + "duration": 24 + } +} +``` + +#### Schema + +```json +{ + "title": "EventJobStopped", + "type": "object", + "properties": { + "job_id": { + "title": "Job Id", + "type": "string", + "format": "uuid" + }, + "config": { + "$ref": "#/definitions/JobConfig" + }, + "user_info": { + "$ref": "#/definitions/UserInfo" + } + }, + "required": [ + "job_id", + "config" + ], + "additionalProperties": false, + "definitions": { + "JobConfig": { + "title": "JobConfig", + "type": "object", + "properties": { + "project": { + "title": "Project", + "type": "string" + }, + "name": { + "title": "Name", + "type": "string" + }, + "build": { + "title": "Build", + "type": "string" + }, + "duration": { + "title": "Duration", + "type": "integer" + } + }, + "required": [ + "project", + "name", + "build", + "duration" + ] + }, + "UserInfo": { + "title": "UserInfo", + "type": "object", + "properties": { + "application_id": { + "title": "Application Id", + "type": "string", + "format": "uuid" + }, + "object_id": { + "title": "Object Id", + "type": "string", + "format": "uuid" + }, + "upn": { + "title": 
"Upn", + "type": "string" + } + }, + "required": [ + "application_id" + ] + } + } +} +``` + +### node_created + +#### Example + +```json +{ + "machine_id": "00000000-0000-0000-0000-000000000000", + "pool_name": "example" +} +``` + +#### Schema + +```json +{ + "title": "EventNodeCreated", + "type": "object", + "properties": { + "machine_id": { + "title": "Machine Id", + "type": "string", + "format": "uuid" + }, + "scaleset_id": { + "title": "Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + } + }, + "required": [ + "machine_id", + "pool_name" + ], + "additionalProperties": false +} +``` + +### node_deleted + +#### Example + +```json +{ + "machine_id": "00000000-0000-0000-0000-000000000000", + "pool_name": "example" +} +``` + +#### Schema + +```json +{ + "title": "EventNodeDeleted", + "type": "object", + "properties": { + "machine_id": { + "title": "Machine Id", + "type": "string", + "format": "uuid" + }, + "scaleset_id": { + "title": "Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + } + }, + "required": [ + "machine_id", + "pool_name" + ], + "additionalProperties": false +} +``` + +### node_state_updated + +#### Example + +```json +{ + "machine_id": "00000000-0000-0000-0000-000000000000", + "pool_name": "example", + "state": "setting_up" +} +``` + +#### Schema + +```json +{ + "title": "EventNodeStateUpdated", + "type": "object", + "properties": { + "machine_id": { + "title": "Machine Id", + "type": "string", + "format": "uuid" + }, + "scaleset_id": { + "title": "Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + }, + "state": { + "$ref": "#/definitions/NodeState" + } + }, + "required": [ + "machine_id", + "pool_name", + "state" + ], + "additionalProperties": false, + "definitions": { + "NodeState": { + "title": "NodeState", + "description": "An enumeration.", + "enum": [ + "init", + "free", + "setting_up", + "rebooting", + "ready", + "busy", + "done", + "shutdown", + "halt" + ] + } + } +} +``` + +### ping + +#### Example + +```json +{ + "ping_id": "00000000-0000-0000-0000-000000000000" +} +``` + +#### Schema + +```json +{ + "title": "EventPing", + "type": "object", + "properties": { + "ping_id": { + "title": "Ping Id", + "type": "string", + "format": "uuid" + } + }, + "required": [ + "ping_id" + ] +} +``` + +### pool_created + +#### Example + +```json +{ + "pool_name": "example", + "os": "linux", + "arch": "x86_64", + "managed": true +} +``` + +#### Schema + +```json +{ + "title": "EventPoolCreated", + "type": "object", + "properties": { + "pool_name": { + "title": "Pool Name", + "type": "string" + }, + "os": { + "$ref": "#/definitions/OS" + }, + "arch": { + "$ref": "#/definitions/Architecture" + }, + "managed": { + "title": "Managed", + "type": "boolean" + }, + "autoscale": { + "$ref": "#/definitions/AutoScaleConfig" + } + }, + "required": [ + "pool_name", + "os", + "arch", + "managed" + ], + "additionalProperties": false, + "definitions": { + "OS": { + "title": "OS", + "description": "An enumeration.", + "enum": [ + "windows", + "linux" + ] + }, + "Architecture": { + "title": "Architecture", + "description": "An enumeration.", + "enum": [ + "x86_64" + ] + }, + "AutoScaleConfig": { + "title": "AutoScaleConfig", + "type": "object", + "properties": { + "image": { + "title": "Image", + "type": "string" + }, + "max_size": { + "title": "Max Size", + "type": "integer" + }, + "min_size": { 
+ "title": "Min Size", + "default": 0, + "type": "integer" + }, + "region": { + "title": "Region", + "type": "string" + }, + "scaleset_size": { + "title": "Scaleset Size", + "type": "integer" + }, + "spot_instances": { + "title": "Spot Instances", + "default": false, + "type": "boolean" + }, + "vm_sku": { + "title": "Vm Sku", + "type": "string" + } + }, + "required": [ + "image", + "scaleset_size", + "vm_sku" + ] + } + } +} +``` + +### pool_deleted + +#### Example + +```json +{ + "pool_name": "example" +} +``` + +#### Schema + +```json +{ + "title": "EventPoolDeleted", + "type": "object", + "properties": { + "pool_name": { + "title": "Pool Name", + "type": "string" + } + }, + "required": [ + "pool_name" + ], + "additionalProperties": false +} +``` + +### proxy_created + +#### Example + +```json +{ + "region": "eastus" +} +``` + +#### Schema + +```json +{ + "title": "EventProxyCreated", + "type": "object", + "properties": { + "region": { + "title": "Region", + "type": "string" + } + }, + "required": [ + "region" + ], + "additionalProperties": false +} +``` + +### proxy_deleted + +#### Example + +```json +{ + "region": "eastus" +} +``` + +#### Schema + +```json +{ + "title": "EventProxyDeleted", + "type": "object", + "properties": { + "region": { + "title": "Region", + "type": "string" + } + }, + "required": [ + "region" + ], + "additionalProperties": false +} +``` + +### proxy_failed + +#### Example + +```json +{ + "region": "eastus", + "error": { + "code": 472, + "errors": [ + "example error message" + ] + } +} +``` + +#### Schema + +```json +{ + "title": "EventProxyFailed", + "type": "object", + "properties": { + "region": { + "title": "Region", + "type": "string" + }, + "error": { + "$ref": "#/definitions/Error" + } + }, + "required": [ + "region", + "error" + ], + "additionalProperties": false, + "definitions": { + "ErrorCode": { + "title": "ErrorCode", + "description": "An enumeration.", + "enum": [ + 450, + 451, + 452, + 453, + 454, + 455, + 456, + 457, + 458, + 459, + 460, + 461, + 462, + 463, + 464, + 465, + 467, + 468, + 469, + 470, + 471, + 472 + ] + }, + "Error": { + "title": "Error", + "type": "object", + "properties": { + "code": { + "$ref": "#/definitions/ErrorCode" + }, + "errors": { + "title": "Errors", + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "code", + "errors" + ] + } + } +} +``` + +### scaleset_created + +#### Example + +```json +{ + "scaleset_id": "00000000-0000-0000-0000-000000000000", + "pool_name": "example", + "vm_sku": "Standard_D2s_v3", + "image": "Canonical:UbuntuServer:18.04-LTS:latest", + "region": "eastus", + "size": 10 +} +``` + +#### Schema + +```json +{ + "title": "EventScalesetCreated", + "type": "object", + "properties": { + "scaleset_id": { + "title": "Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + }, + "vm_sku": { + "title": "Vm Sku", + "type": "string" + }, + "image": { + "title": "Image", + "type": "string" + }, + "region": { + "title": "Region", + "type": "string" + }, + "size": { + "title": "Size", + "type": "integer" + } + }, + "required": [ + "scaleset_id", + "pool_name", + "vm_sku", + "image", + "region", + "size" + ], + "additionalProperties": false +} +``` + +### scaleset_deleted + +#### Example + +```json +{ + "scaleset_id": "00000000-0000-0000-0000-000000000000", + "pool_name": "example" +} +``` + +#### Schema + +```json +{ + "title": "EventScalesetDeleted", + "type": "object", + "properties": { + "scaleset_id": { + "title": 
"Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + } + }, + "required": [ + "scaleset_id", + "pool_name" + ], + "additionalProperties": false +} +``` + +### scaleset_failed + +#### Example + +```json +{ + "scaleset_id": "00000000-0000-0000-0000-000000000000", + "pool_name": "example", + "error": { + "code": 456, + "errors": [ + "example error message" + ] + } +} +``` + +#### Schema + +```json +{ + "title": "EventScalesetFailed", + "type": "object", + "properties": { + "scaleset_id": { + "title": "Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + }, + "error": { + "$ref": "#/definitions/Error" + } + }, + "required": [ + "scaleset_id", + "pool_name", + "error" + ], + "additionalProperties": false, + "definitions": { + "ErrorCode": { + "title": "ErrorCode", + "description": "An enumeration.", + "enum": [ + 450, + 451, + 452, + 453, + 454, + 455, + 456, + 457, + 458, + 459, + 460, + 461, + 462, + 463, + 464, + 465, + 467, + 468, + 469, + 470, + 471, + 472 + ] + }, + "Error": { + "title": "Error", + "type": "object", + "properties": { + "code": { + "$ref": "#/definitions/ErrorCode" + }, + "errors": { + "title": "Errors", + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "code", + "errors" + ] + } + } +} +``` ### task_created @@ -67,7 +1122,7 @@ Each event will be submitted via HTTP POST to the user provided URL. ```json { - "title": "WebhookEventTaskCreated", + "title": "EventTaskCreated", "type": "object", "properties": { "job_id": { @@ -92,6 +1147,7 @@ Each event will be submitted via HTTP POST to the user provided URL. "task_id", "config" ], + "additionalProperties": false, "definitions": { "TaskType": { "title": "TaskType", @@ -453,75 +1509,6 @@ Each event will be submitted via HTTP POST to the user provided URL. } ``` -### task_stopped - -#### Example - -```json -{ - "job_id": "00000000-0000-0000-0000-000000000000", - "task_id": "00000000-0000-0000-0000-000000000000", - "user_info": { - "application_id": "00000000-0000-0000-0000-000000000000", - "object_id": "00000000-0000-0000-0000-000000000000", - "upn": "example@contoso.com" - } -} -``` - -#### Schema - -```json -{ - "title": "WebhookEventTaskStopped", - "type": "object", - "properties": { - "job_id": { - "title": "Job Id", - "type": "string", - "format": "uuid" - }, - "task_id": { - "title": "Task Id", - "type": "string", - "format": "uuid" - }, - "user_info": { - "$ref": "#/definitions/UserInfo" - } - }, - "required": [ - "job_id", - "task_id" - ], - "definitions": { - "UserInfo": { - "title": "UserInfo", - "type": "object", - "properties": { - "application_id": { - "title": "Application Id", - "type": "string", - "format": "uuid" - }, - "object_id": { - "title": "Object Id", - "type": "string", - "format": "uuid" - }, - "upn": { - "title": "Upn", - "type": "string" - } - }, - "required": [ - "application_id" - ] - } - } -} -``` - ### task_failed #### Example @@ -548,7 +1535,7 @@ Each event will be submitted via HTTP POST to the user provided URL. ```json { - "title": "WebhookEventTaskFailed", + "title": "EventTaskFailed", "type": "object", "properties": { "job_id": { @@ -573,6 +1560,7 @@ Each event will be submitted via HTTP POST to the user provided URL. "task_id", "error" ], + "additionalProperties": false, "definitions": { "ErrorCode": { "title": "ErrorCode", @@ -598,7 +1586,8 @@ Each event will be submitted via HTTP POST to the user provided URL. 
468, 469, 470, - 471 + 471, + 472 ] }, "Error": { @@ -648,13 +1637,15 @@ Each event will be submitted via HTTP POST to the user provided URL. } ``` -### ping +### task_state_updated #### Example ```json { - "ping_id": "00000000-0000-0000-0000-000000000000" + "job_id": "00000000-0000-0000-0000-000000000000", + "task_id": "00000000-0000-0000-0000-000000000000", + "state": "init" } ``` @@ -662,13 +1653,118 @@ Each event will be submitted via HTTP POST to the user provided URL. ```json { - "title": "WebhookEventPing", + "title": "EventTaskStateUpdated", "type": "object", "properties": { - "ping_id": { - "title": "Ping Id", + "job_id": { + "title": "Job Id", "type": "string", "format": "uuid" + }, + "task_id": { + "title": "Task Id", + "type": "string", + "format": "uuid" + }, + "state": { + "$ref": "#/definitions/TaskState" + }, + "end_time": { + "title": "End Time", + "type": "string", + "format": "date-time" + } + }, + "required": [ + "job_id", + "task_id", + "state" + ], + "additionalProperties": false, + "definitions": { + "TaskState": { + "title": "TaskState", + "description": "An enumeration.", + "enum": [ + "init", + "waiting", + "scheduled", + "setting_up", + "running", + "stopping", + "stopped", + "wait_job" + ] + } + } +} +``` + +### task_stopped + +#### Example + +```json +{ + "job_id": "00000000-0000-0000-0000-000000000000", + "task_id": "00000000-0000-0000-0000-000000000000", + "user_info": { + "application_id": "00000000-0000-0000-0000-000000000000", + "object_id": "00000000-0000-0000-0000-000000000000", + "upn": "example@contoso.com" + } +} +``` + +#### Schema + +```json +{ + "title": "EventTaskStopped", + "type": "object", + "properties": { + "job_id": { + "title": "Job Id", + "type": "string", + "format": "uuid" + }, + "task_id": { + "title": "Task Id", + "type": "string", + "format": "uuid" + }, + "user_info": { + "$ref": "#/definitions/UserInfo" + } + }, + "required": [ + "job_id", + "task_id" + ], + "additionalProperties": false, + "definitions": { + "UserInfo": { + "title": "UserInfo", + "type": "object", + "properties": { + "application_id": { + "title": "Application Id", + "type": "string", + "format": "uuid" + }, + "object_id": { + "title": "Object Id", + "type": "string", + "format": "uuid" + }, + "upn": { + "title": "Upn", + "type": "string" + } + }, + "required": [ + "application_id" + ] } } } @@ -681,53 +1777,671 @@ Each event will be submitted via HTTP POST to the user provided URL. 
"title": "WebhookMessage", "type": "object", "properties": { - "webhook_id": { - "title": "Webhook Id", - "type": "string", - "format": "uuid" - }, "event_id": { "title": "Event Id", "type": "string", "format": "uuid" }, "event_type": { - "$ref": "#/definitions/WebhookEventType" + "$ref": "#/definitions/EventType" }, "event": { "title": "Event", "anyOf": [ { - "$ref": "#/definitions/WebhookEventTaskCreated" + "$ref": "#/definitions/EventJobCreated" }, { - "$ref": "#/definitions/WebhookEventTaskStopped" + "$ref": "#/definitions/EventJobStopped" }, { - "$ref": "#/definitions/WebhookEventTaskFailed" + "$ref": "#/definitions/EventNodeStateUpdated" }, { - "$ref": "#/definitions/WebhookEventPing" + "$ref": "#/definitions/EventNodeCreated" + }, + { + "$ref": "#/definitions/EventNodeDeleted" + }, + { + "$ref": "#/definitions/EventPing" + }, + { + "$ref": "#/definitions/EventPoolCreated" + }, + { + "$ref": "#/definitions/EventPoolDeleted" + }, + { + "$ref": "#/definitions/EventProxyFailed" + }, + { + "$ref": "#/definitions/EventProxyCreated" + }, + { + "$ref": "#/definitions/EventProxyDeleted" + }, + { + "$ref": "#/definitions/EventScalesetFailed" + }, + { + "$ref": "#/definitions/EventScalesetCreated" + }, + { + "$ref": "#/definitions/EventScalesetDeleted" + }, + { + "$ref": "#/definitions/EventTaskFailed" + }, + { + "$ref": "#/definitions/EventTaskStateUpdated" + }, + { + "$ref": "#/definitions/EventTaskCreated" + }, + { + "$ref": "#/definitions/EventTaskStopped" + }, + { + "$ref": "#/definitions/EventCrashReported" + }, + { + "$ref": "#/definitions/EventFileAdded" } ] + }, + "webhook_id": { + "title": "Webhook Id", + "type": "string", + "format": "uuid" } }, "required": [ - "webhook_id", "event_type", - "event" + "event", + "webhook_id" ], + "additionalProperties": false, "definitions": { - "WebhookEventType": { - "title": "WebhookEventType", + "EventType": { + "title": "EventType", "description": "An enumeration.", "enum": [ + "job_created", + "job_stopped", + "node_created", + "node_deleted", + "node_state_updated", + "ping", + "pool_created", + "pool_deleted", + "proxy_created", + "proxy_deleted", + "proxy_failed", + "scaleset_created", + "scaleset_deleted", + "scaleset_failed", "task_created", - "task_stopped", "task_failed", - "ping" + "task_state_updated", + "task_stopped", + "crash_reported", + "file_added" ] }, + "JobConfig": { + "title": "JobConfig", + "type": "object", + "properties": { + "project": { + "title": "Project", + "type": "string" + }, + "name": { + "title": "Name", + "type": "string" + }, + "build": { + "title": "Build", + "type": "string" + }, + "duration": { + "title": "Duration", + "type": "integer" + } + }, + "required": [ + "project", + "name", + "build", + "duration" + ] + }, + "UserInfo": { + "title": "UserInfo", + "type": "object", + "properties": { + "application_id": { + "title": "Application Id", + "type": "string", + "format": "uuid" + }, + "object_id": { + "title": "Object Id", + "type": "string", + "format": "uuid" + }, + "upn": { + "title": "Upn", + "type": "string" + } + }, + "required": [ + "application_id" + ] + }, + "EventJobCreated": { + "title": "EventJobCreated", + "type": "object", + "properties": { + "job_id": { + "title": "Job Id", + "type": "string", + "format": "uuid" + }, + "config": { + "$ref": "#/definitions/JobConfig" + }, + "user_info": { + "$ref": "#/definitions/UserInfo" + } + }, + "required": [ + "job_id", + "config" + ], + "additionalProperties": false + }, + "EventJobStopped": { + "title": "EventJobStopped", + "type": "object", + 
"properties": { + "job_id": { + "title": "Job Id", + "type": "string", + "format": "uuid" + }, + "config": { + "$ref": "#/definitions/JobConfig" + }, + "user_info": { + "$ref": "#/definitions/UserInfo" + } + }, + "required": [ + "job_id", + "config" + ], + "additionalProperties": false + }, + "NodeState": { + "title": "NodeState", + "description": "An enumeration.", + "enum": [ + "init", + "free", + "setting_up", + "rebooting", + "ready", + "busy", + "done", + "shutdown", + "halt" + ] + }, + "EventNodeStateUpdated": { + "title": "EventNodeStateUpdated", + "type": "object", + "properties": { + "machine_id": { + "title": "Machine Id", + "type": "string", + "format": "uuid" + }, + "scaleset_id": { + "title": "Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + }, + "state": { + "$ref": "#/definitions/NodeState" + } + }, + "required": [ + "machine_id", + "pool_name", + "state" + ], + "additionalProperties": false + }, + "EventNodeCreated": { + "title": "EventNodeCreated", + "type": "object", + "properties": { + "machine_id": { + "title": "Machine Id", + "type": "string", + "format": "uuid" + }, + "scaleset_id": { + "title": "Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + } + }, + "required": [ + "machine_id", + "pool_name" + ], + "additionalProperties": false + }, + "EventNodeDeleted": { + "title": "EventNodeDeleted", + "type": "object", + "properties": { + "machine_id": { + "title": "Machine Id", + "type": "string", + "format": "uuid" + }, + "scaleset_id": { + "title": "Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + } + }, + "required": [ + "machine_id", + "pool_name" + ], + "additionalProperties": false + }, + "EventPing": { + "title": "EventPing", + "type": "object", + "properties": { + "ping_id": { + "title": "Ping Id", + "type": "string", + "format": "uuid" + } + }, + "required": [ + "ping_id" + ] + }, + "OS": { + "title": "OS", + "description": "An enumeration.", + "enum": [ + "windows", + "linux" + ] + }, + "Architecture": { + "title": "Architecture", + "description": "An enumeration.", + "enum": [ + "x86_64" + ] + }, + "AutoScaleConfig": { + "title": "AutoScaleConfig", + "type": "object", + "properties": { + "image": { + "title": "Image", + "type": "string" + }, + "max_size": { + "title": "Max Size", + "type": "integer" + }, + "min_size": { + "title": "Min Size", + "default": 0, + "type": "integer" + }, + "region": { + "title": "Region", + "type": "string" + }, + "scaleset_size": { + "title": "Scaleset Size", + "type": "integer" + }, + "spot_instances": { + "title": "Spot Instances", + "default": false, + "type": "boolean" + }, + "vm_sku": { + "title": "Vm Sku", + "type": "string" + } + }, + "required": [ + "image", + "scaleset_size", + "vm_sku" + ] + }, + "EventPoolCreated": { + "title": "EventPoolCreated", + "type": "object", + "properties": { + "pool_name": { + "title": "Pool Name", + "type": "string" + }, + "os": { + "$ref": "#/definitions/OS" + }, + "arch": { + "$ref": "#/definitions/Architecture" + }, + "managed": { + "title": "Managed", + "type": "boolean" + }, + "autoscale": { + "$ref": "#/definitions/AutoScaleConfig" + } + }, + "required": [ + "pool_name", + "os", + "arch", + "managed" + ], + "additionalProperties": false + }, + "EventPoolDeleted": { + "title": "EventPoolDeleted", + "type": "object", + "properties": { + "pool_name": { + "title": "Pool Name", + 
"type": "string" + } + }, + "required": [ + "pool_name" + ], + "additionalProperties": false + }, + "ErrorCode": { + "title": "ErrorCode", + "description": "An enumeration.", + "enum": [ + 450, + 451, + 452, + 453, + 454, + 455, + 456, + 457, + 458, + 459, + 460, + 461, + 462, + 463, + 464, + 465, + 467, + 468, + 469, + 470, + 471, + 472 + ] + }, + "Error": { + "title": "Error", + "type": "object", + "properties": { + "code": { + "$ref": "#/definitions/ErrorCode" + }, + "errors": { + "title": "Errors", + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "code", + "errors" + ] + }, + "EventProxyFailed": { + "title": "EventProxyFailed", + "type": "object", + "properties": { + "region": { + "title": "Region", + "type": "string" + }, + "error": { + "$ref": "#/definitions/Error" + } + }, + "required": [ + "region", + "error" + ], + "additionalProperties": false + }, + "EventProxyCreated": { + "title": "EventProxyCreated", + "type": "object", + "properties": { + "region": { + "title": "Region", + "type": "string" + } + }, + "required": [ + "region" + ], + "additionalProperties": false + }, + "EventProxyDeleted": { + "title": "EventProxyDeleted", + "type": "object", + "properties": { + "region": { + "title": "Region", + "type": "string" + } + }, + "required": [ + "region" + ], + "additionalProperties": false + }, + "EventScalesetFailed": { + "title": "EventScalesetFailed", + "type": "object", + "properties": { + "scaleset_id": { + "title": "Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + }, + "error": { + "$ref": "#/definitions/Error" + } + }, + "required": [ + "scaleset_id", + "pool_name", + "error" + ], + "additionalProperties": false + }, + "EventScalesetCreated": { + "title": "EventScalesetCreated", + "type": "object", + "properties": { + "scaleset_id": { + "title": "Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + }, + "vm_sku": { + "title": "Vm Sku", + "type": "string" + }, + "image": { + "title": "Image", + "type": "string" + }, + "region": { + "title": "Region", + "type": "string" + }, + "size": { + "title": "Size", + "type": "integer" + } + }, + "required": [ + "scaleset_id", + "pool_name", + "vm_sku", + "image", + "region", + "size" + ], + "additionalProperties": false + }, + "EventScalesetDeleted": { + "title": "EventScalesetDeleted", + "type": "object", + "properties": { + "scaleset_id": { + "title": "Scaleset Id", + "type": "string", + "format": "uuid" + }, + "pool_name": { + "title": "Pool Name", + "type": "string" + } + }, + "required": [ + "scaleset_id", + "pool_name" + ], + "additionalProperties": false + }, + "EventTaskFailed": { + "title": "EventTaskFailed", + "type": "object", + "properties": { + "job_id": { + "title": "Job Id", + "type": "string", + "format": "uuid" + }, + "task_id": { + "title": "Task Id", + "type": "string", + "format": "uuid" + }, + "error": { + "$ref": "#/definitions/Error" + }, + "user_info": { + "$ref": "#/definitions/UserInfo" + } + }, + "required": [ + "job_id", + "task_id", + "error" + ], + "additionalProperties": false + }, + "TaskState": { + "title": "TaskState", + "description": "An enumeration.", + "enum": [ + "init", + "waiting", + "scheduled", + "setting_up", + "running", + "stopping", + "stopped", + "wait_job" + ] + }, + "EventTaskStateUpdated": { + "title": "EventTaskStateUpdated", + "type": "object", + "properties": { + "job_id": { + "title": "Job Id", + "type": "string", 
+ "format": "uuid" + }, + "task_id": { + "title": "Task Id", + "type": "string", + "format": "uuid" + }, + "state": { + "$ref": "#/definitions/TaskState" + }, + "end_time": { + "title": "End Time", + "type": "string", + "format": "date-time" + } + }, + "required": [ + "job_id", + "task_id", + "state" + ], + "additionalProperties": false + }, "TaskType": { "title": "TaskType", "description": "An enumeration.", @@ -1061,31 +2775,8 @@ Each event will be submitted via HTTP POST to the user provided URL. "tags" ] }, - "UserInfo": { - "title": "UserInfo", - "type": "object", - "properties": { - "application_id": { - "title": "Application Id", - "type": "string", - "format": "uuid" - }, - "object_id": { - "title": "Object Id", - "type": "string", - "format": "uuid" - }, - "upn": { - "title": "Upn", - "type": "string" - } - }, - "required": [ - "application_id" - ] - }, - "WebhookEventTaskCreated": { - "title": "WebhookEventTaskCreated", + "EventTaskCreated": { + "title": "EventTaskCreated", "type": "object", "properties": { "job_id": { @@ -1109,10 +2800,11 @@ Each event will be submitted via HTTP POST to the user provided URL. "job_id", "task_id", "config" - ] + ], + "additionalProperties": false }, - "WebhookEventTaskStopped": { - "title": "WebhookEventTaskStopped", + "EventTaskStopped": { + "title": "EventTaskStopped", "type": "object", "properties": { "job_id": { @@ -1132,92 +2824,146 @@ Each event will be submitted via HTTP POST to the user provided URL. "required": [ "job_id", "task_id" - ] + ], + "additionalProperties": false }, - "ErrorCode": { - "title": "ErrorCode", - "description": "An enumeration.", - "enum": [ - 450, - 451, - 452, - 453, - 454, - 455, - 456, - 457, - 458, - 459, - 460, - 461, - 462, - 463, - 464, - 465, - 467, - 468, - 469, - 470, - 471 - ] - }, - "Error": { - "title": "Error", + "BlobRef": { + "title": "BlobRef", "type": "object", "properties": { - "code": { - "$ref": "#/definitions/ErrorCode" + "account": { + "title": "Account", + "type": "string" }, - "errors": { - "title": "Errors", + "container": { + "title": "Container", + "type": "string" + }, + "name": { + "title": "Name", + "type": "string" + } + }, + "required": [ + "account", + "container", + "name" + ] + }, + "Report": { + "title": "Report", + "type": "object", + "properties": { + "input_url": { + "title": "Input Url", + "type": "string" + }, + "input_blob": { + "$ref": "#/definitions/BlobRef" + }, + "executable": { + "title": "Executable", + "type": "string" + }, + "crash_type": { + "title": "Crash Type", + "type": "string" + }, + "crash_site": { + "title": "Crash Site", + "type": "string" + }, + "call_stack": { + "title": "Call Stack", "type": "array", "items": { "type": "string" } - } - }, - "required": [ - "code", - "errors" - ] - }, - "WebhookEventTaskFailed": { - "title": "WebhookEventTaskFailed", - "type": "object", - "properties": { - "job_id": { - "title": "Job Id", - "type": "string", - "format": "uuid" + }, + "call_stack_sha256": { + "title": "Call Stack Sha256", + "type": "string" + }, + "input_sha256": { + "title": "Input Sha256", + "type": "string" + }, + "asan_log": { + "title": "Asan Log", + "type": "string" }, "task_id": { "title": "Task Id", "type": "string", "format": "uuid" }, - "error": { - "$ref": "#/definitions/Error" + "job_id": { + "title": "Job Id", + "type": "string", + "format": "uuid" }, - "user_info": { - "$ref": "#/definitions/UserInfo" + "scariness_score": { + "title": "Scariness Score", + "type": "integer" + }, + "scariness_description": { + "title": "Scariness 
Description", + "type": "string" } }, "required": [ - "job_id", + "input_blob", + "executable", + "crash_type", + "crash_site", + "call_stack", + "call_stack_sha256", + "input_sha256", "task_id", - "error" + "job_id" ] }, - "WebhookEventPing": { - "title": "WebhookEventPing", + "EventCrashReported": { + "title": "EventCrashReported", "type": "object", "properties": { - "ping_id": { - "title": "Ping Id", - "type": "string", - "format": "uuid" + "report": { + "$ref": "#/definitions/Report" + }, + "container": { + "title": "Container", + "type": "string" + }, + "filename": { + "title": "Filename", + "type": "string" } - } + }, + "required": [ + "report", + "container", + "filename" + ], + "additionalProperties": false + }, + "EventFileAdded": { + "title": "EventFileAdded", + "type": "object", + "properties": { + "container": { + "title": "Container", + "type": "string" + }, + "filename": { + "title": "Filename", + "type": "string" + } + }, + "required": [ + "container", + "filename" + ], + "additionalProperties": false } } } diff --git a/src/api-service/__app__/agent_registration/__init__.py b/src/api-service/__app__/agent_registration/__init__.py index 6c1b84df5..522836473 100644 --- a/src/api-service/__app__/agent_registration/__init__.py +++ b/src/api-service/__app__/agent_registration/__init__.py @@ -99,16 +99,15 @@ def post(req: func.HttpRequest) -> func.HttpResponse: NodeMessage.clear_messages(node.machine_id) node.version = registration_request.version node.reimage_requested = False - node.state = NodeState.init node.reimage_queued = False + node.set_state(NodeState.init) else: - node = Node( + node = Node.create( pool_name=registration_request.pool_name, machine_id=registration_request.machine_id, scaleset_id=registration_request.scaleset_id, version=registration_request.version, ) - node.save() # if any tasks were running during an earlier instance of this node, clear them out node.mark_tasks_stopped_early() diff --git a/src/api-service/__app__/onefuzzlib/agent_events.py b/src/api-service/__app__/onefuzzlib/agent_events.py index e67fd9134..dcddf40e2 100644 --- a/src/api-service/__app__/onefuzzlib/agent_events.py +++ b/src/api-service/__app__/onefuzzlib/agent_events.py @@ -67,13 +67,11 @@ def on_state_update( # they send 'init' with reimage_requested, it's because the node was reimaged # successfully. node.reimage_requested = False - node.state = state - node.save() + node.set_state(state) return None logging.info("node state update: %s from:%s to:%s", machine_id, node.state, state) - node.state = state - node.save() + node.set_state(state) if state == NodeState.free: logging.info("node now available for work: %s", machine_id) @@ -113,9 +111,7 @@ def on_state_update( # Other states we would want to preserve are excluded by the # outermost conditional check. if task.state not in [TaskState.running, TaskState.setting_up]: - task.state = TaskState.setting_up - task.save() - task.on_start() + task.set_state(TaskState.setting_up) # Note: we set the node task state to `setting_up`, even though # the task itself may be `running`. 
@@ -160,8 +156,7 @@ def on_worker_event_running( return node if node.state not in NodeState.ready_for_reset(): - node.state = NodeState.busy - node.save() + node.set_state(NodeState.busy) node_task = NodeTasks( machine_id=machine_id, task_id=event.task_id, state=NodeTaskState.running @@ -184,12 +179,7 @@ def on_worker_event_running( task.job_id, task.task_id, ) - task.state = TaskState.running - task.save() - - # Start the clock for the task if it wasn't started already - # (as happens in 1.0.0 agents) - task.on_start() + task.set_state(TaskState.running) task_event = TaskEvent( task_id=task.task_id, diff --git a/src/api-service/__app__/onefuzzlib/autoscale.py b/src/api-service/__app__/onefuzzlib/autoscale.py index b79a740c0..a31da3574 100644 --- a/src/api-service/__app__/onefuzzlib/autoscale.py +++ b/src/api-service/__app__/onefuzzlib/autoscale.py @@ -66,7 +66,7 @@ def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None: if not autoscale_config.region: raise Exception("Region is missing") - scaleset = Scaleset.create( + Scaleset.create( pool_name=pool.name, vm_sku=autoscale_config.vm_sku, image=autoscale_config.image, @@ -75,7 +75,6 @@ def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None: spot_instances=autoscale_config.spot_instances, tags={"pool": pool.name}, ) - scaleset.save() nodes_needed -= max_nodes_scaleset diff --git a/src/api-service/__app__/onefuzzlib/dashboard.py b/src/api-service/__app__/onefuzzlib/dashboard.py deleted file mode 100644 index 89c762168..000000000 --- a/src/api-service/__app__/onefuzzlib/dashboard.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -import json -from enum import Enum -from queue import Empty, Queue -from typing import Dict, Optional, Union -from uuid import UUID - -from onefuzztypes.primitives import Event - -EVENTS: Queue = Queue() - - -def resolve(data: Event) -> Union[str, int, Dict[str, str]]: - if isinstance(data, str): - return data - if isinstance(data, UUID): - return str(data) - elif isinstance(data, Enum): - return data.name - elif isinstance(data, int): - return data - elif isinstance(data, dict): - for x in data: - data[x] = str(data[x]) - return data - raise NotImplementedError("no conversion from %s" % type(data)) - - -def get_event() -> Optional[str]: - events = [] - - for _ in range(10): - try: - (event, data) = EVENTS.get(block=False) - events.append({"type": event, "data": data}) - EVENTS.task_done() - except Empty: - break - - if events: - return json.dumps({"target": "dashboard", "arguments": events}) - else: - return None - - -def add_event(message_type: str, data: Dict[str, Event]) -> None: - for key in data: - data[key] = resolve(data[key]) - - EVENTS.put((message_type, data)) diff --git a/src/api-service/__app__/onefuzzlib/events.py b/src/api-service/__app__/onefuzzlib/events.py new file mode 100644 index 000000000..efe3220a3 --- /dev/null +++ b/src/api-service/__app__/onefuzzlib/events.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +import json +import logging +from queue import Empty, Queue +from typing import Optional + +from onefuzztypes.events import Event, EventMessage, get_event_type + +from .webhooks import Webhook + +EVENTS: Queue = Queue() + + +def get_events() -> Optional[str]: + events = [] + + for _ in range(5): + try: + event = EVENTS.get(block=False) + events.append(json.loads(event.json(exclude_none=True))) + EVENTS.task_done() + except Empty: + break + + if events: + return json.dumps({"target": "events", "arguments": events}) + else: + return None + + +def send_event(event: Event) -> None: + event_type = get_event_type(event) + logging.info("sending event: %s - %s", event_type, event) + event_message = EventMessage(event_type=event_type, event=event) + EVENTS.put(event_message) + Webhook.send_event(event_message) diff --git a/src/api-service/__app__/onefuzzlib/jobs.py b/src/api-service/__app__/onefuzzlib/jobs.py index 75c2cb081..04a8f9b20 100644 --- a/src/api-service/__app__/onefuzzlib/jobs.py +++ b/src/api-service/__app__/onefuzzlib/jobs.py @@ -8,8 +8,10 @@ from datetime import datetime, timedelta from typing import List, Optional, Tuple from onefuzztypes.enums import JobState, TaskState +from onefuzztypes.events import EventJobCreated, EventJobStopped from onefuzztypes.models import Job as BASE_JOB +from .events import send_event from .orm import MappingIntStrAny, ORMMixin, QueryFilter from .tasks.main import Task @@ -37,13 +39,6 @@ class Job(BASE_JOB, ORMMixin): def save_exclude(self) -> Optional[MappingIntStrAny]: return {"task_info": ...} - def event_include(self) -> Optional[MappingIntStrAny]: - return { - "job_id": ..., - "state": ..., - "error": ..., - } - def telemetry_include(self) -> Optional[MappingIntStrAny]: return { "machine_id": ..., @@ -70,6 +65,11 @@ class Job(BASE_JOB, ORMMixin): task.mark_stopping() else: self.state = JobState.stopped + send_event( + EventJobStopped( + job_id=self.job_id, config=self.config, user_info=self.user_info + ) + ) self.save() def on_start(self) -> None: @@ -77,3 +77,14 @@ class Job(BASE_JOB, ORMMixin): if self.end_time is None: self.end_time = datetime.utcnow() + timedelta(hours=self.config.duration) self.save() + + def save(self, new: bool = False, require_etag: bool = False) -> None: + created = self.etag is None + super().save(new=new, require_etag=require_etag) + + if created: + send_event( + EventJobCreated( + job_id=self.job_id, config=self.config, user_info=self.user_info + ) + ) diff --git a/src/api-service/__app__/onefuzzlib/notifications/main.py b/src/api-service/__app__/onefuzzlib/notifications/main.py index ad798031f..e4512c21f 100644 --- a/src/api-service/__app__/onefuzzlib/notifications/main.py +++ b/src/api-service/__app__/onefuzzlib/notifications/main.py @@ -10,6 +10,7 @@ from uuid import UUID from memoization import cached from onefuzztypes import models from onefuzztypes.enums import ErrorCode, TaskState +from onefuzztypes.events import EventCrashReported, EventFileAdded from onefuzztypes.models import ( ADOTemplate, Error, @@ -27,7 +28,7 @@ from ..azure.containers import ( ) from ..azure.queue import send_message from ..azure.storage import StorageType -from ..dashboard import add_event +from ..events import send_event from ..orm import ORMMixin from ..reports import get_report from ..tasks.config import get_input_container_queues @@ -116,16 +117,16 @@ def new_files(container: Container, filename: str) -> None: if metadata: results["metadata"] = metadata + report = get_report(container, filename) + if report: + results["executable"] 
= report.executable + results["crash_type"] = report.crash_type + results["crash_site"] = report.crash_site + results["job_id"] = report.job_id + results["task_id"] = report.task_id + notifications = get_notifications(container) if notifications: - report = get_report(container, filename) - if report: - results["executable"] = report.executable - results["crash_type"] = report.crash_type - results["crash_site"] = report.crash_site - results["job_id"] = report.job_id - results["task_id"] = report.task_id - logging.info("notifications for %s %s %s", container, filename, notifications) done = [] for notification in notifications: @@ -154,4 +155,9 @@ def new_files(container: Container, filename: str) -> None: ) send_message(task.task_id, bytes(url, "utf-8"), StorageType.corpus) - add_event("new_file", results) + if report: + send_event( + EventCrashReported(report=report, container=container, filename=filename) + ) + else: + send_event(EventFileAdded(container=container, filename=filename)) diff --git a/src/api-service/__app__/onefuzzlib/orm.py b/src/api-service/__app__/onefuzzlib/orm.py index 177199f8b..dc82ff74e 100644 --- a/src/api-service/__app__/onefuzzlib/orm.py +++ b/src/api-service/__app__/onefuzzlib/orm.py @@ -39,7 +39,6 @@ from pydantic import BaseModel, Field from typing_extensions import Protocol from .azure.table import get_client -from .dashboard import add_event from .telemetry import track_event_filtered from .updates import queue_update @@ -255,21 +254,9 @@ class ORMMixin(ModelMixin): def telemetry_include(self) -> Optional[MappingIntStrAny]: return {} - def event_include(self) -> Optional[MappingIntStrAny]: - return {} - def telemetry(self) -> Any: return self.raw(exclude_none=True, include=self.telemetry_include()) - def _event_as_needed(self) -> None: - # Upon ORM save, if the object returns event data, we'll send it to the - # dashboard event subsystem - - data = self.raw(exclude_none=True, include=self.event_include()) - if not data: - return - add_event(self.table_name(), data) - def get_keys(self) -> Tuple[KEY, KEY]: partition_key_field, row_key_field = self.key_fields() @@ -331,13 +318,9 @@ class ORMMixin(ModelMixin): if telem: track_event_filtered(TelemetryEvent[self.table_name()], telem) - self._event_as_needed() return None def delete(self) -> None: - # fire off an event so Signalr knows it's being deleted - self._event_as_needed() - partition_key, row_key = self.get_keys() client = get_client() diff --git a/src/api-service/__app__/onefuzzlib/pools.py b/src/api-service/__app__/onefuzzlib/pools.py index a956fa82a..928851f14 100644 --- a/src/api-service/__app__/onefuzzlib/pools.py +++ b/src/api-service/__app__/onefuzzlib/pools.py @@ -16,6 +16,16 @@ from onefuzztypes.enums import ( PoolState, ScalesetState, ) +from onefuzztypes.events import ( + EventNodeCreated, + EventNodeDeleted, + EventNodeStateUpdated, + EventPoolCreated, + EventPoolDeleted, + EventScalesetCreated, + EventScalesetDeleted, + EventScalesetFailed, +) from onefuzztypes.models import AutoScaleConfig, Error from onefuzztypes.models import Node as BASE_NODE from onefuzztypes.models import NodeAssignment, NodeCommand, NodeCommandAddSshKey @@ -60,6 +70,7 @@ from .azure.vmss import ( resize_vmss, update_extensions, ) +from .events import send_event from .extension import fuzz_extensions from .orm import MappingIntStrAny, ORMMixin, QueryFilter @@ -76,6 +87,31 @@ class Node(BASE_NODE, ORMMixin): # should only be unset during agent_registration POST reimage_queued: bool = Field(default=False) + @classmethod + 
def create( + cls, + *, + pool_name: PoolName, + machine_id: UUID, + scaleset_id: Optional[UUID], + version: str, + ) -> "Node": + node = cls( + pool_name=pool_name, + machine_id=machine_id, + scaleset_id=scaleset_id, + version=version, + ) + node.save() + send_event( + EventNodeCreated( + machine_id=node.machine_id, + scaleset_id=node.scaleset_id, + pool_name=node.pool_name, + ) + ) + return node + @classmethod def search_states( cls, @@ -163,14 +199,6 @@ class Node(BASE_NODE, ORMMixin): "scaleset_id": ..., } - def event_include(self) -> Optional[MappingIntStrAny]: - return { - "pool_name": ..., - "machine_id": ..., - "state": ..., - "scaleset_id": ..., - } - def scaleset_node_exists(self) -> bool: if self.scaleset_id is None: return False @@ -302,6 +330,7 @@ class Node(BASE_NODE, ORMMixin): """ Tell the node to stop everything. """ self.set_shutdown() self.stop() + self.set_state(NodeState.halt) @classmethod def get_dead_nodes( @@ -315,8 +344,29 @@ class Node(BASE_NODE, ORMMixin): raw_unchecked_filter=time_filter, ) + def set_state(self, state: NodeState) -> None: + if self.state != state: + self.state = state + send_event( + EventNodeStateUpdated( + machine_id=self.machine_id, + pool_name=self.pool_name, + scaleset_id=self.scaleset_id, + state=state, + ) + ) + + self.save() + def delete(self) -> None: NodeTasks.clear_by_machine_id(self.machine_id) + send_event( + EventNodeDeleted( + machine_id=self.machine_id, + pool_name=self.pool_name, + scaleset_id=self.scaleset_id, + ) + ) super().delete() NodeMessage.clear_messages(self.machine_id) @@ -410,7 +460,7 @@ class Pool(BASE_POOL, ORMMixin): client_id: Optional[UUID], autoscale: Optional[AutoScaleConfig], ) -> "Pool": - return cls( + entry = cls( name=name, os=os, arch=arch, @@ -419,6 +469,17 @@ class Pool(BASE_POOL, ORMMixin): config=None, autoscale=autoscale, ) + entry.save() + send_event( + EventPoolCreated( + pool_name=name, + os=os, + arch=arch, + managed=managed, + autoscale=autoscale, + ) + ) + return entry def save_exclude(self) -> Optional[MappingIntStrAny]: return { @@ -435,15 +496,6 @@ class Pool(BASE_POOL, ORMMixin): "timestamp": ..., } - def event_include(self) -> Optional[MappingIntStrAny]: - return { - "name": ..., - "pool_id": ..., - "os": ..., - "state": ..., - "managed": ..., - } - def telemetry_include(self) -> Optional[MappingIntStrAny]: return { "pool_id": ..., @@ -533,6 +585,17 @@ class Pool(BASE_POOL, ORMMixin): query["state"] = states return cls.search(query=query) + def set_shutdown(self, now: bool) -> None: + if self.state in [PoolState.halt, PoolState.shutdown]: + return + + if now: + self.state = PoolState.halt + else: + self.state = PoolState.shutdown + + self.save() + def shutdown(self) -> None: """ shutdown allows nodes to finish current work then delete """ scalesets = Scaleset.search_by_pool(self.name) @@ -545,8 +608,7 @@ class Pool(BASE_POOL, ORMMixin): return for scaleset in scalesets: - scaleset.state = ScalesetState.shutdown - scaleset.save() + scaleset.set_shutdown(now=False) for node in nodes: node.set_shutdown() @@ -555,6 +617,7 @@ class Pool(BASE_POOL, ORMMixin): def halt(self) -> None: """ halt the pool immediately """ + scalesets = Scaleset.search_by_pool(self.name) nodes = Node.search(query={"pool_name": [self.name]}) if not scalesets and not nodes: @@ -577,21 +640,15 @@ class Pool(BASE_POOL, ORMMixin): def key_fields(cls) -> Tuple[str, str]: return ("name", "pool_id") + def delete(self) -> None: + super().delete() + send_event(EventPoolDeleted(pool_name=self.name)) + class 
Scaleset(BASE_SCALESET, ORMMixin): def save_exclude(self) -> Optional[MappingIntStrAny]: return {"nodes": ...} - def event_include(self) -> Optional[MappingIntStrAny]: - return { - "pool_name": ..., - "scaleset_id": ..., - "state": ..., - "os": ..., - "size": ..., - "error": ..., - } - def telemetry_include(self) -> Optional[MappingIntStrAny]: return { "scaleset_id": ..., @@ -615,7 +672,7 @@ class Scaleset(BASE_SCALESET, ORMMixin): client_id: Optional[UUID] = None, client_object_id: Optional[UUID] = None, ) -> "Scaleset": - return cls( + entry = cls( pool_name=pool_name, vm_sku=vm_sku, image=image, @@ -627,6 +684,18 @@ class Scaleset(BASE_SCALESET, ORMMixin): client_object_id=client_object_id, tags=tags, ) + entry.save() + send_event( + EventScalesetCreated( + scaleset_id=entry.scaleset_id, + pool_name=entry.pool_name, + vm_sku=vm_sku, + image=image, + region=region, + size=size, + ) + ) + return entry @classmethod def search_by_pool(cls, pool_name: PoolName) -> List["Scaleset"]: @@ -651,6 +720,20 @@ class Scaleset(BASE_SCALESET, ORMMixin): def get_by_object_id(cls, object_id: UUID) -> List["Scaleset"]: return cls.search(query={"client_object_id": [object_id]}) + def set_failed(self, error: Error) -> None: + if self.error is not None: + return + + self.error = error + self.state = ScalesetState.creation_failed + self.save() + + send_event( + EventScalesetFailed( + scaleset_id=self.scaleset_id, pool_name=self.pool_name, error=self.error + ) + ) + def init(self) -> None: logging.info("scaleset init: %s", self.scaleset_id) @@ -660,9 +743,7 @@ class Scaleset(BASE_SCALESET, ORMMixin): # scaleset being added to the pool. pool = Pool.get_by_name(self.pool_name) if isinstance(pool, Error): - self.error = pool - self.state = ScalesetState.halt - self.save() + self.set_failed(pool) return if pool.state == PoolState.init: @@ -672,14 +753,16 @@ class Scaleset(BASE_SCALESET, ORMMixin): elif pool.state == PoolState.running: image_os = get_os(self.region, self.image) if isinstance(image_os, Error): - self.error = image_os - self.state = ScalesetState.creation_failed + self.set_failed(image_os) + return + elif image_os != pool.os: - self.error = Error( + error = Error( code=ErrorCode.INVALID_REQUEST, errors=["invalid os (got: %s needed: %s)" % (image_os, pool.os)], ) - self.state = ScalesetState.creation_failed + self.set_failed(error) + return else: self.state = ScalesetState.setup else: @@ -698,26 +781,23 @@ class Scaleset(BASE_SCALESET, ORMMixin): logging.info("creating network: %s", self.region) result = network.create() if isinstance(result, Error): - self.error = result - self.state = ScalesetState.creation_failed + self.set_failed(result) + return self.save() return if self.auth is None: - self.error = Error( + error = Error( code=ErrorCode.UNABLE_TO_CREATE, errors=["missing required auth"] ) - self.state = ScalesetState.creation_failed - self.save() + self.set_failed(error) return vmss = get_vmss(self.scaleset_id) if vmss is None: pool = Pool.get_by_name(self.pool_name) if isinstance(pool, Error): - self.error = pool - self.state = ScalesetState.halt - self.save() + self.set_failed(pool) return logging.info("creating scaleset: %s", self.scaleset_id) @@ -736,13 +816,8 @@ class Scaleset(BASE_SCALESET, ORMMixin): self.tags, ) if isinstance(result, Error): - self.error = result - logging.error( - "stopping task because of failed vmss: %s %s", - self.scaleset_id, - result, - ) - self.state = ScalesetState.creation_failed + self.set_failed(result) + return else: logging.info("creating scaleset: %s", 
self.scaleset_id) elif vmss.provisioning_state == "Creating": @@ -750,10 +825,10 @@ class Scaleset(BASE_SCALESET, ORMMixin): self.try_set_identity(vmss) else: logging.info("scaleset running: %s", self.scaleset_id) - error = self.try_set_identity(vmss) - if error: - self.state = ScalesetState.creation_failed - self.error = error + identity_result = self.try_set_identity(vmss) + if identity_result: + self.set_failed(identity_result) + return else: self.state = ScalesetState.running self.save() @@ -843,8 +918,6 @@ class Scaleset(BASE_SCALESET, ORMMixin): self.delete_nodes(to_delete) for node in to_delete: node.set_halt() - node.state = NodeState.halt - node.save() if to_reimage: self.reimage_nodes(to_reimage) @@ -967,6 +1040,17 @@ class Scaleset(BASE_SCALESET, ORMMixin): node.reimage_queued = True node.save() + def set_shutdown(self, now: bool) -> None: + if self.state in [ScalesetState.halt, ScalesetState.shutdown]: + return + + if now: + self.state = ScalesetState.halt + else: + self.state = ScalesetState.shutdown + + self.save() + def shutdown(self) -> None: size = get_vmss_size(self.scaleset_id) logging.info("scaleset shutdown: %s (current size: %s)", self.scaleset_id, size) @@ -977,7 +1061,6 @@ class Scaleset(BASE_SCALESET, ORMMixin): self.halt() def halt(self) -> None: - self.state = ScalesetState.halt ScalesetShrinkQueue(self.scaleset_id).delete() for node in Node.search_states(scaleset_id=self.scaleset_id): @@ -1050,8 +1133,7 @@ class Scaleset(BASE_SCALESET, ORMMixin): pool = Pool.get_by_name(self.pool_name) if isinstance(pool, Error): - self.error = pool - self.halt() + self.set_failed(pool) return logging.debug("updating scaleset configs: %s", self.scaleset_id) @@ -1068,6 +1150,12 @@ class Scaleset(BASE_SCALESET, ORMMixin): def key_fields(cls) -> Tuple[str, str]: return ("pool_name", "scaleset_id") + def delete(self) -> None: + super().delete() + send_event( + EventScalesetDeleted(scaleset_id=self.scaleset_id, pool_name=self.pool_name) + ) + class ShrinkEntry(BaseModel): shrink_id: UUID = Field(default_factory=uuid4) diff --git a/src/api-service/__app__/onefuzzlib/proxy.py b/src/api-service/__app__/onefuzzlib/proxy.py index edb07c33a..791cd7c2e 100644 --- a/src/api-service/__app__/onefuzzlib/proxy.py +++ b/src/api-service/__app__/onefuzzlib/proxy.py @@ -8,7 +8,8 @@ import logging from typing import List, Optional, Tuple from azure.mgmt.compute.models import VirtualMachine -from onefuzztypes.enums import VmState +from onefuzztypes.enums import ErrorCode, VmState +from onefuzztypes.events import EventProxyCreated, EventProxyDeleted, EventProxyFailed from onefuzztypes.models import ( Authentication, Error, @@ -26,8 +27,9 @@ from .azure.ip import get_public_ip from .azure.queue import get_queue_sas from .azure.storage import StorageType from .azure.vm import VM +from .events import send_event from .extension import proxy_manager_extensions -from .orm import MappingIntStrAny, ORMMixin, QueryFilter +from .orm import ORMMixin, QueryFilter from .proxy_forward import ProxyForward PROXY_SKU = "Standard_B2s" @@ -41,7 +43,7 @@ class Proxy(ORMMixin): state: VmState = Field(default=VmState.init) auth: Authentication = Field(default_factory=build_auth) ip: Optional[str] - error: Optional[str] + error: Optional[Error] version: str = Field(default=__version__) heartbeat: Optional[ProxyHeartbeat] @@ -49,14 +51,6 @@ class Proxy(ORMMixin): def key_fields(cls) -> Tuple[str, Optional[str]]: return ("region", None) - def event_include(self) -> Optional[MappingIntStrAny]: - return { - "region": ..., - 
"state": ..., - "ip": ..., - "error": ..., - } - def get_vm(self) -> VM: vm = VM( name="proxy-%s" % self.region, @@ -72,42 +66,59 @@ class Proxy(ORMMixin): vm_data = vm.get() if vm_data: if vm_data.provisioning_state == "Failed": - self.set_failed(vm) + self.set_provision_failed(vm_data) + return else: self.save_proxy_config() self.state = VmState.extensions_launch else: result = vm.create() if isinstance(result, Error): - self.error = repr(result) - self.state = VmState.stopping + self.set_failed(result) + return self.save() - def set_failed(self, vm_data: VirtualMachine) -> None: - logging.error("vm failed to provision: %s", vm_data.name) + def set_provision_failed(self, vm_data: VirtualMachine) -> None: + errors = ["provisioning failed"] for status in vm_data.instance_view.statuses: if status.level.name.lower() == "error": - logging.error( - "vm status: %s %s %s %s", - vm_data.name, - status.code, - status.display_status, - status.message, + errors.append( + f"code:{status.code} status:{status.display_status} " + f"message:{status.message}" ) - self.state = VmState.vm_allocation_failed + + self.set_failed( + Error( + code=ErrorCode.PROXY_FAILED, + errors=errors, + ) + ) + return + + def set_failed(self, error: Error) -> None: + if self.error is not None: + return + + logging.error("proxy vm failed: %s - %s", self.region, error) + send_event(EventProxyFailed(region=self.region, error=error)) + self.error = error + self.state = VmState.stopping + self.save() def extensions_launch(self) -> None: vm = self.get_vm() vm_data = vm.get() if not vm_data: - logging.error("Azure VM does not exist: %s", vm.name) - self.state = VmState.stopping - self.save() + self.set_failed( + Error( + code=ErrorCode.PROXY_FAILED, + errors=["azure not able to find vm"], + ) + ) return if vm_data.provisioning_state == "Failed": - self.set_failed(vm_data) - self.save() + self.set_provision_failed(vm_data) return ip = get_public_ip(vm_data.network_profile.network_interfaces[0].id) @@ -119,9 +130,8 @@ class Proxy(ORMMixin): extensions = proxy_manager_extensions(self.region) result = vm.add_extensions(extensions) if isinstance(result, Error): - logging.error("vm extension failed: %s", repr(result)) - self.error = repr(result) - self.state = VmState.stopping + self.set_failed(result) + return elif result: self.state = VmState.running @@ -231,4 +241,9 @@ class Proxy(ORMMixin): proxy = Proxy(region=region) proxy.save() + send_event(EventProxyCreated(region=region)) return proxy + + def delete(self) -> None: + super().delete() + send_event(EventProxyDeleted(region=self.region)) diff --git a/src/api-service/__app__/onefuzzlib/reports.py b/src/api-service/__app__/onefuzzlib/reports.py index 36442f332..9c628b044 100644 --- a/src/api-service/__app__/onefuzzlib/reports.py +++ b/src/api-service/__app__/onefuzzlib/reports.py @@ -7,6 +7,7 @@ import json import logging from typing import Optional, Union +from memoization import cached from onefuzztypes.models import Report from onefuzztypes.primitives import Container from pydantic import ValidationError @@ -16,7 +17,7 @@ from .azure.storage import StorageType def parse_report( - content: Union[str, bytes], metadata: Optional[str] = None + content: Union[str, bytes], file_path: Optional[str] = None ) -> Optional[Report]: if isinstance(content, bytes): try: @@ -24,7 +25,7 @@ def parse_report( except UnicodeDecodeError as err: logging.error( "unable to parse report (%s): unicode decode of report failed - %s", - metadata, + file_path, err, ) return None @@ -33,28 +34,30 @@ def 
@@ -33,28 +34,30 @@ def parse_report(
         data = json.loads(content)
     except json.decoder.JSONDecodeError as err:
         logging.error(
-            "unable to parse report (%s): json decoding failed - %s", metadata, err
+            "unable to parse report (%s): json decoding failed - %s", file_path, err
         )
         return None
 
     try:
         entry = Report.parse_obj(data)
     except ValidationError as err:
-        logging.error("unable to parse report (%s): %s", metadata, err)
+        logging.error("unable to parse report (%s): %s", file_path, err)
         return None
 
     return entry
 
 
+# cache the last 1000 reports
+@cached(max_size=1000)
 def get_report(container: Container, filename: str) -> Optional[Report]:
-    metadata = "/".join([container, filename])
+    file_path = "/".join([container, filename])
 
     if not filename.endswith(".json"):
-        logging.error("get_report invalid extension: %s", metadata)
+        logging.error("get_report invalid extension: %s", file_path)
         return None
 
     blob = get_blob(container, filename, StorageType.corpus)
     if blob is None:
-        logging.error("get_report invalid blob: %s", metadata)
+        logging.error("get_report invalid blob: %s", file_path)
         return None
 
-    return parse_report(blob, metadata=metadata)
+    return parse_report(blob, file_path=file_path)
diff --git a/src/api-service/__app__/onefuzzlib/tasks/main.py b/src/api-service/__app__/onefuzzlib/tasks/main.py
index 3827989cc..03fff1501 100644
--- a/src/api-service/__app__/onefuzzlib/tasks/main.py
+++ b/src/api-service/__app__/onefuzzlib/tasks/main.py
@@ -9,22 +9,23 @@ from typing import List, Optional, Tuple, Union
 from uuid import UUID
 
 from onefuzztypes.enums import ErrorCode, TaskState
+from onefuzztypes.events import (
+    EventTaskCreated,
+    EventTaskFailed,
+    EventTaskStateUpdated,
+    EventTaskStopped,
+)
 from onefuzztypes.models import Error
 from onefuzztypes.models import Task as BASE_TASK
 from onefuzztypes.models import TaskConfig, TaskVm, UserInfo
-from onefuzztypes.webhooks import (
-    WebhookEventTaskCreated,
-    WebhookEventTaskFailed,
-    WebhookEventTaskStopped,
-)
 
 from ..azure.image import get_os
 from ..azure.queue import create_queue, delete_queue
 from ..azure.storage import StorageType
+from ..events import send_event
 from ..orm import MappingIntStrAny, ORMMixin, QueryFilter
 from ..pools import Node, Pool, Scaleset
 from ..proxy_forward import ProxyForward
-from ..webhooks import Webhook
 
 
 class Task(BASE_TASK, ORMMixin):
@@ -58,8 +59,8 @@ class Task(BASE_TASK, ORMMixin):
             raise Exception("task must have vm or pool")
         task = cls(config=config, job_id=job_id, os=os, user_info=user_info)
         task.save()
-        Webhook.send_event(
-            WebhookEventTaskCreated(
+        send_event(
+            EventTaskCreated(
                 job_id=task.job_id,
                 task_id=task.task_id,
                 config=config,
@@ -116,18 +117,9 @@ class Task(BASE_TASK, ORMMixin):
             "config": {"vm": {"count": ...}, "task": {"type": ...}},
         }
 
-    def event_include(self) -> Optional[MappingIntStrAny]:
-        return {
-            "job_id": ...,
-            "task_id": ...,
-            "state": ...,
-            "error": ...,
-        }
-
     def init(self) -> None:
         create_queue(self.task_id, StorageType.corpus)
-        self.state = TaskState.waiting
-        self.save()
+        self.set_state(TaskState.waiting)
 
     def stopping(self) -> None:
         # TODO: we need to 'unschedule' this task from the existing pools
@@ -136,8 +128,7 @@ class Task(BASE_TASK, ORMMixin):
         ProxyForward.remove_forward(self.task_id)
         delete_queue(str(self.task_id), StorageType.corpus)
         Node.stop_task(self.task_id)
-        self.state = TaskState.stopped
-        self.save()
+        self.set_state(TaskState.stopped, send=False)
 
     @classmethod
     def search_states(
@@ -195,10 +186,9 @@ class Task(BASE_TASK, ORMMixin):
             )
             return
 
-        self.state = TaskState.stopping
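
The `@cached(max_size=1000)` decorator added to `get_report` above memoizes by the `(container, filename)` arguments, so repeated crash-report lookups skip the blob fetch. A small demonstration of the same `memoization` library feature:

```python
from memoization import cached


@cached(max_size=1000)
def expensive_lookup(container: str, filename: str) -> str:
    print("cache miss:", container, filename)  # runs only on a miss
    return f"{container}/{filename}"


expensive_lookup("crashes", "input.json")  # miss: function body executes
expensive_lookup("crashes", "input.json")  # hit: served from the cache
```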
-        self.save()
-        Webhook.send_event(
-            WebhookEventTaskStopped(
+        self.set_state(TaskState.stopping, send=False)
+        send_event(
+            EventTaskStopped(
                 job_id=self.job_id, task_id=self.task_id, user_info=self.user_info
             )
         )
@@ -211,11 +201,10 @@ class Task(BASE_TASK, ORMMixin):
             return
 
         self.error = error
-        self.state = TaskState.stopping
-        self.save()
+        self.set_state(TaskState.stopping, send=False)
 
-        Webhook.send_event(
-            WebhookEventTaskFailed(
+        send_event(
+            EventTaskFailed(
                 job_id=self.job_id,
                 task_id=self.task_id,
                 error=error,
@@ -287,7 +276,6 @@ class Task(BASE_TASK, ORMMixin):
         self.end_time = datetime.utcnow() + timedelta(
             hours=self.config.task.duration
         )
-        self.save()
 
         from ..jobs import Job
 
@@ -298,3 +286,22 @@ class Task(BASE_TASK, ORMMixin):
     @classmethod
     def key_fields(cls) -> Tuple[str, str]:
         return ("job_id", "task_id")
+
+    def set_state(self, state: TaskState, send: bool = True) -> None:
+        if self.state == state:
+            return
+
+        self.state = state
+        if self.state in [TaskState.running, TaskState.setting_up]:
+            self.on_start()
+
+        self.save()
+
+        send_event(
+            EventTaskStateUpdated(
+                job_id=self.job_id,
+                task_id=self.task_id,
+                state=self.state,
+                end_time=self.end_time,
+            )
+        )
diff --git a/src/api-service/__app__/onefuzzlib/tasks/scheduler.py b/src/api-service/__app__/onefuzzlib/tasks/scheduler.py
index 98f84b8fb..f46c46493 100644
--- a/src/api-service/__app__/onefuzzlib/tasks/scheduler.py
+++ b/src/api-service/__app__/onefuzzlib/tasks/scheduler.py
@@ -235,8 +235,7 @@ def schedule_tasks() -> None:
         if schedule_workset(work_set, bucket_config.pool, bucket_config.count):
             for work_unit in work_set.work_units:
                 task = tasks_by_id[work_unit.task_id]
-                task.state = TaskState.scheduled
-                task.save()
+                task.set_state(TaskState.scheduled)
                 seen.add(task.task_id)
 
     not_ready_count = len(tasks) - len(seen)
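
Centralizing transitions in `set_state` means every caller, including the scheduler above, gets the no-op guard, the `on_start` hook, and the state-update event for free. A sketch of the shape; note that this sketch gates the event on the `send` flag, which appears to be what the `send=False` call sites above intend (the hunk as shown does not reference `send` inside the method):

```python
from enum import Enum


class TaskState(Enum):
    init = "init"
    waiting = "waiting"
    stopping = "stopping"
    stopped = "stopped"


class Task:
    def __init__(self) -> None:
        self.state = TaskState.init

    def save(self) -> None:
        ...  # persist to storage

    def set_state(self, state: TaskState, send: bool = True) -> None:
        if self.state == state:
            return  # no-op transition: no write, no event

        self.state = state
        self.save()
        if send:  # callers emitting a more specific event pass send=False
            print("send_event(EventTaskStateUpdated):", state)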
diff --git a/src/api-service/__app__/onefuzzlib/webhooks.py b/src/api-service/__app__/onefuzzlib/webhooks.py
index 02b8d62c3..5f3d99a58 100644
--- a/src/api-service/__app__/onefuzzlib/webhooks.py
+++ b/src/api-service/__app__/onefuzzlib/webhooks.py
@@ -8,21 +8,15 @@ import hmac
 import logging
 from hashlib import sha512
 from typing import List, Optional, Tuple
-from uuid import UUID
+from uuid import UUID, uuid4
 
 import requests
 from memoization import cached
-from onefuzztypes.enums import ErrorCode, WebhookEventType, WebhookMessageState
+from onefuzztypes.enums import ErrorCode, WebhookMessageState
+from onefuzztypes.events import Event, EventMessage, EventPing, EventType
 from onefuzztypes.models import Error, Result
 from onefuzztypes.webhooks import Webhook as BASE_WEBHOOK
-from onefuzztypes.webhooks import (
-    WebhookEvent,
-    WebhookEventPing,
-    WebhookEventTaskCreated,
-    WebhookEventTaskFailed,
-    WebhookEventTaskStopped,
-    WebhookMessage,
-)
+from onefuzztypes.webhooks import WebhookMessage
 from onefuzztypes.webhooks import WebhookMessageLog as BASE_WEBHOOK_MESSAGE_LOG
 from pydantic import BaseModel
 
@@ -140,34 +134,18 @@ class WebhookMessageLog(BASE_WEBHOOK_MESSAGE_LOG, ORMMixin):
     )
 
 
-def get_event_type(event: WebhookEvent) -> WebhookEventType:
-    events = {
-        WebhookEventTaskCreated: WebhookEventType.task_created,
-        WebhookEventTaskFailed: WebhookEventType.task_failed,
-        WebhookEventTaskStopped: WebhookEventType.task_stopped,
-        WebhookEventPing: WebhookEventType.ping,
-    }
-
-    for event_class in events:
-        if isinstance(event, event_class):
-            return events[event_class]
-
-    raise NotImplementedError("unsupported event type: %s" % event)
-
-
 class Webhook(BASE_WEBHOOK, ORMMixin):
     @classmethod
     def key_fields(cls) -> Tuple[str, Optional[str]]:
         return ("webhook_id", "name")
 
     @classmethod
-    def send_event(cls, event: WebhookEvent) -> None:
-        event_type = get_event_type(event)
+    def send_event(cls, event_message: EventMessage) -> None:
         for webhook in get_webhooks_cached():
-            if event_type not in webhook.event_types:
+            if event_message.event_type not in webhook.event_types:
                 continue
 
-            webhook._add_event(event_type, event)
+            webhook._add_event(event_message)
 
     @classmethod
     def get_by_id(cls, webhook_id: UUID) -> Result["Webhook"]:
@@ -185,18 +163,19 @@ class Webhook(BASE_WEBHOOK, ORMMixin):
         webhook = webhooks[0]
         return webhook
 
-    def _add_event(self, event_type: WebhookEventType, event: WebhookEvent) -> None:
+    def _add_event(self, event_message: EventMessage) -> None:
         message = WebhookMessageLog(
             webhook_id=self.webhook_id,
-            event_type=event_type,
-            event=event,
+            event_id=event_message.event_id,
+            event_type=event_message.event_type,
+            event=event_message.event,
         )
         message.save()
         message.queue_webhook()
 
-    def ping(self) -> WebhookEventPing:
-        ping = WebhookEventPing()
-        self._add_event(WebhookEventType.ping, ping)
+    def ping(self) -> EventPing:
+        ping = EventPing(ping_id=uuid4())
+        self._add_event(EventMessage(event_type=EventType.ping, event=ping))
         return ping
 
     def send(self, message_log: WebhookMessageLog) -> bool:
@@ -228,8 +207,8 @@ def build_message(
     *,
     webhook_id: UUID,
     event_id: UUID,
-    event_type: WebhookEventType,
-    event: WebhookEvent,
+    event_type: EventType,
+    event: Event,
     secret_token: Optional[str] = None,
 ) -> Tuple[bytes, Optional[str]]:
     data = (
diff --git a/src/api-service/__app__/pool/__init__.py b/src/api-service/__app__/pool/__init__.py
index 68359a8eb..b2f45f60a 100644
--- a/src/api-service/__app__/pool/__init__.py
+++ b/src/api-service/__app__/pool/__init__.py
@@ -7,7 +7,7 @@ import logging
 import os
 
 import azure.functions as func
-from onefuzztypes.enums import ErrorCode, PoolState
+from onefuzztypes.enums import ErrorCode
 from onefuzztypes.models import AgentConfig, Error
 from onefuzztypes.requests import PoolCreate, PoolSearch, PoolStop
 from onefuzztypes.responses import BoolResult
@@ -116,7 +116,6 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
         client_id=request.client_id,
         autoscale=request.autoscale,
     )
-    pool.save()
     return ok(set_config(pool))
 
 
@@ -128,11 +127,7 @@ def delete(req: func.HttpRequest) -> func.HttpResponse:
     pool = Pool.get_by_name(request.name)
     if isinstance(pool, Error):
         return not_ok(pool, context="pool stop")
-    if request.now:
-        pool.state = PoolState.halt
-    else:
-        pool.state = PoolState.shutdown
-    pool.save()
+    pool.set_shutdown(now=request.now)
     return ok(BoolResult(result=True))
diff --git a/src/api-service/__app__/proxy_notification/__init__.py b/src/api-service/__app__/proxy_notification/__init__.py
index a17abe7f5..8d4904ecc 100644
--- a/src/api-service/__app__/proxy_notification/__init__.py
+++ b/src/api-service/__app__/proxy_notification/__init__.py
@@ -9,7 +9,7 @@ import logging
 import azure.functions as func
 from onefuzztypes.models import ProxyHeartbeat
 
-from ..onefuzzlib.dashboard import get_event
+from ..onefuzzlib.events import get_events
 from ..onefuzzlib.proxy import Proxy
 
 
@@ -25,6 +25,6 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
     proxy.heartbeat = heartbeat
     proxy.save()
 
-    event = get_event()
-    if event:
-        dashboard.set(event)
+    events = get_events()
+    if events:
+        dashboard.set(events)
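
Stepping back to the `Webhook.send_event` change above: every registered webhook sees every message, but only subscribers to that message's `event_type` enqueue it. The same fan-out filter, reduced to a self-contained sketch with simplified stand-in types:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class EventMessage:
    event_type: str
    event: dict


@dataclass
class Webhook:
    name: str
    event_types: List[str]
    queued: List[EventMessage] = field(default_factory=list)

    def _add_event(self, message: EventMessage) -> None:
        self.queued.append(message)  # the real code persists and queues delivery


def send_event(webhooks: List[Webhook], message: EventMessage) -> None:
    for webhook in webhooks:
        if message.event_type not in webhook.event_types:
            continue  # not subscribed to this event type
        webhook._add_event(message)


hooks = [Webhook("a", ["task_created"]), Webhook("b", ["ping"])]
send_event(hooks, EventMessage("ping", {"ping_id": "0"}))
assert not hooks[0].queued and len(hooks[1].queued) == 1
```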
diff --git a/src/api-service/__app__/queue_file_changes/__init__.py b/src/api-service/__app__/queue_file_changes/__init__.py
index 90461fed6..c6571c448 100644
--- a/src/api-service/__app__/queue_file_changes/__init__.py
+++ b/src/api-service/__app__/queue_file_changes/__init__.py
@@ -10,7 +10,7 @@ from typing import Dict
 import azure.functions as func
 
 from ..onefuzzlib.azure.storage import corpus_accounts
-from ..onefuzzlib.dashboard import get_event
+from ..onefuzzlib.events import get_events
 from ..onefuzzlib.notifications.main import new_files
 
 
@@ -33,6 +33,6 @@
     file_added(event)
 
-    event = get_event()
-    if event:
-        dashboard.set(event)
+    events = get_events()
+    if events:
+        dashboard.set(events)
diff --git a/src/api-service/__app__/queue_node_heartbeat/__init__.py b/src/api-service/__app__/queue_node_heartbeat/__init__.py
index 4a413b48b..e43ccf37e 100644
--- a/src/api-service/__app__/queue_node_heartbeat/__init__.py
+++ b/src/api-service/__app__/queue_node_heartbeat/__init__.py
@@ -11,7 +11,7 @@ import azure.functions as func
 from onefuzztypes.models import NodeHeartbeatEntry
 from pydantic import ValidationError
 
-from ..onefuzzlib.dashboard import get_event
+from ..onefuzzlib.events import get_events
 from ..onefuzzlib.pools import Node
 
 
@@ -30,6 +30,6 @@
     except ValidationError:
         logging.error("invalid node heartbeat: %s", raw)
 
-    event = get_event()
-    if event:
-        dashboard.set(event)
+    events = get_events()
+    if events:
+        dashboard.set(events)
diff --git a/src/api-service/__app__/queue_task_heartbeat/__init__.py b/src/api-service/__app__/queue_task_heartbeat/__init__.py
index b8b43d104..b81b2880a 100644
--- a/src/api-service/__app__/queue_task_heartbeat/__init__.py
+++ b/src/api-service/__app__/queue_task_heartbeat/__init__.py
@@ -11,7 +11,7 @@ import azure.functions as func
 from onefuzztypes.models import Error, TaskHeartbeatEntry
 from pydantic import ValidationError
 
-from ..onefuzzlib.dashboard import get_event
+from ..onefuzzlib.events import get_events
 from ..onefuzzlib.tasks.main import Task
 
 
@@ -32,6 +32,6 @@
     except ValidationError:
         logging.error("invalid task heartbat: %s", raw)
 
-    event = get_event()
-    if event:
-        dashboard.set(event)
+    events = get_events()
+    if events:
+        dashboard.set(events)
diff --git a/src/api-service/__app__/queue_updates/__init__.py b/src/api-service/__app__/queue_updates/__init__.py
index 6e3ddf04f..465d4e98b 100644
--- a/src/api-service/__app__/queue_updates/__init__.py
+++ b/src/api-service/__app__/queue_updates/__init__.py
@@ -7,7 +7,7 @@ import json
 
 import azure.functions as func
 
-from ..onefuzzlib.dashboard import get_event
+from ..onefuzzlib.events import get_events
 from ..onefuzzlib.updates import Update, execute_update
 
 
@@ -16,6 +16,6 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
     update = Update.parse_obj(json.loads(body))
     execute_update(update)
 
-    event = get_event()
-    if event:
-        dashboard.set(event)
+    events = get_events()
+    if events:
+        dashboard.set(events)
diff --git a/src/api-service/__app__/scaleset/__init__.py b/src/api-service/__app__/scaleset/__init__.py
index 3f37cc444..584a1b36f 100644
--- a/src/api-service/__app__/scaleset/__init__.py
+++ b/src/api-service/__app__/scaleset/__init__.py
@@ -92,7 +92,6 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
         spot_instances=request.spot_instances,
         tags=request.tags,
     )
-    scaleset.save()
 
     # don't return auths during create, only 'get' with include_auth
     scaleset.auth = None
     return ok(scaleset)
diff --git a/src/api-service/__app__/timer_proxy/__init__.py b/src/api-service/__app__/timer_proxy/__init__.py
index 13e5c166f..aa413d416 100644
--- a/src/api-service/__app__/timer_proxy/__init__.py
+++ b/src/api-service/__app__/timer_proxy/__init__.py
@@ -8,7 +8,7 @@ import logging
 import azure.functions as func
 from onefuzztypes.enums import VmState
 
-from ..onefuzzlib.dashboard import get_event
+from ..onefuzzlib.events import get_events
 from ..onefuzzlib.orm import process_state_updates
 from ..onefuzzlib.proxy import Proxy
 
@@ -28,6 +28,6 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None:  # noqa:
         logging.info("update proxy vm: %s", proxy.region)
         process_state_updates(proxy)
 
-    event = get_event()
-    if event:
-        dashboard.set(event)
+    events = get_events()
+    if events:
+        dashboard.set(events)
diff --git a/src/api-service/__app__/timer_repro/__init__.py b/src/api-service/__app__/timer_repro/__init__.py
index c6fe21e66..100b23a65 100644
--- a/src/api-service/__app__/timer_repro/__init__.py
+++ b/src/api-service/__app__/timer_repro/__init__.py
@@ -8,7 +8,7 @@ import logging
 import azure.functions as func
 from onefuzztypes.enums import VmState
 
-from ..onefuzzlib.dashboard import get_event
+from ..onefuzzlib.events import get_events
 from ..onefuzzlib.orm import process_state_updates
 from ..onefuzzlib.repro import Repro
 
@@ -28,6 +28,6 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None:  # noqa:
         logging.info("update repro: %s", repro.vm_id)
         process_state_updates(repro)
 
-    event = get_event()
-    if event:
-        dashboard.set(event)
+    events = get_events()
+    if events:
+        dashboard.set(events)
diff --git a/src/api-service/__app__/timer_tasks/__init__.py b/src/api-service/__app__/timer_tasks/__init__.py
index 61a65f87d..97a2f4fd1 100644
--- a/src/api-service/__app__/timer_tasks/__init__.py
+++ b/src/api-service/__app__/timer_tasks/__init__.py
@@ -8,7 +8,7 @@ import logging
 import azure.functions as func
 from onefuzztypes.enums import JobState, TaskState
 
-from ..onefuzzlib.dashboard import get_event
+from ..onefuzzlib.events import get_events
 from ..onefuzzlib.jobs import Job
 from ..onefuzzlib.orm import process_state_updates
 from ..onefuzzlib.tasks.main import Task
@@ -38,6 +38,6 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None:  # noqa:
 
     schedule_tasks()
 
-    event = get_event()
-    if event:
-        dashboard.set(event)
+    events = get_events()
+    if events:
+        dashboard.set(events)
diff --git a/src/api-service/__app__/timer_workers/__init__.py b/src/api-service/__app__/timer_workers/__init__.py
index bb1390db5..bf42dae2f 100644
--- a/src/api-service/__app__/timer_workers/__init__.py
+++ b/src/api-service/__app__/timer_workers/__init__.py
@@ -9,7 +9,7 @@ import azure.functions as func
 from onefuzztypes.enums import NodeState, PoolState
 
 from ..onefuzzlib.autoscale import autoscale_pool
-from ..onefuzzlib.dashboard import get_event
+from ..onefuzzlib.events import get_events
 from ..onefuzzlib.orm import process_state_updates
 from ..onefuzzlib.pools import Node, Pool, Scaleset
 
@@ -44,6 +44,6 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None:  # noqa:
     elif pool.state in PoolState.available() and pool.autoscale:
         autoscale_pool(pool)
 
-    event = get_event()
-    if event:
-        dashboard.set(event)
+    events = get_events()
+    if events:
+        dashboard.set(events)
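
The same three-line flush closes out every function above: drain queued events with `get_events()` and push them to the SignalR `dashboard` output binding. A hedged sketch of how that could be factored, with a local stand-in for the binding and for `get_events` (the real `onefuzzlib.events.get_events` is not shown in this diff; here it is assumed to drain a queue and return a JSON string, or `None` when empty):

```python
import json
from typing import List, Optional

_QUEUE: List[dict] = []  # stand-in for the process-wide event queue


def get_events() -> Optional[str]:
    # drain and serialize; returns None when there is nothing to send
    if not _QUEUE:
        return None
    drained = list(_QUEUE)
    _QUEUE.clear()
    return json.dumps(drained)


class FakeOut:
    # minimal stand-in for azure.functions.Out[str]
    def __init__(self) -> None:
        self.value: Optional[str] = None

    def set(self, value: str) -> None:
        self.value = value


def flush_events(dashboard: FakeOut) -> None:
    events = get_events()
    if events:
        dashboard.set(events)
```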
diff --git a/src/api-service/tests/test_webhook_hmac.py b/src/api-service/tests/test_webhook_hmac.py
index b298a8e2a..ec976f39b 100644
--- a/src/api-service/tests/test_webhook_hmac.py
+++ b/src/api-service/tests/test_webhook_hmac.py
@@ -6,8 +6,7 @@
 import unittest
 from uuid import UUID
 
-from onefuzztypes.enums import WebhookEventType
-from onefuzztypes.webhooks import WebhookEventPing
+from onefuzztypes.events import EventPing, EventType
 
 from __app__.onefuzzlib.webhooks import build_message
 
@@ -16,8 +15,8 @@ class TestWebhookHmac(unittest.TestCase):
     def test_webhook_hmac(self) -> None:
         webhook_id = UUID(int=0)
         event_id = UUID(int=1)
-        event_type = WebhookEventType.ping
-        event = WebhookEventPing(ping_id=UUID(int=2))
+        event_type = EventType.ping
+        event = EventPing(ping_id=UUID(int=2))
 
         data, digest = build_message(
             webhook_id=webhook_id, event_id=event_id, event_type=event_type, event=event
diff --git a/src/cli/onefuzz/api.py b/src/cli/onefuzz/api.py
index eaef410e9..892711ed1 100644
--- a/src/cli/onefuzz/api.py
+++ b/src/cli/onefuzz/api.py
@@ -17,7 +17,15 @@ from uuid import UUID
 import pkg_resources
 import semver
 from memoization import cached
-from onefuzztypes import enums, models, primitives, requests, responses, webhooks
+from onefuzztypes import (
+    enums,
+    events,
+    models,
+    primitives,
+    requests,
+    responses,
+    webhooks,
+)
 from pydantic import BaseModel
 from six.moves import input  # workaround for static analysis
 
@@ -291,7 +299,7 @@ class Webhooks(Endpoint):
         self,
         name: str,
         url: str,
-        event_types: List[enums.WebhookEventType],
+        event_types: List[events.EventType],
         *,
         secret_token: Optional[str] = None,
     ) -> webhooks.Webhook:
@@ -311,7 +319,7 @@ class Webhooks(Endpoint):
         *,
         name: Optional[str] = None,
         url: Optional[str] = None,
-        event_types: Optional[List[enums.WebhookEventType]] = None,
+        event_types: Optional[List[events.EventType]] = None,
         secret_token: Optional[str] = None,
     ) -> webhooks.Webhook:
         """ Update a webhook """
@@ -346,7 +354,7 @@ class Webhooks(Endpoint):
             data=requests.WebhookGet(webhook_id=webhook_id_expanded),
         )
 
-    def ping(self, webhook_id: UUID_EXPANSION) -> webhooks.WebhookEventPing:
+    def ping(self, webhook_id: UUID_EXPANSION) -> events.EventPing:
         """ ping a webhook """
 
         webhook_id_expanded = self._disambiguate_uuid(
@@ -356,7 +364,7 @@ class Webhooks(Endpoint):
         self.logger.debug("pinging webhook: %s", webhook_id_expanded)
         return self._req_model(
             "POST",
-            webhooks.WebhookEventPing,
+            events.EventPing,
             data=requests.WebhookGet(webhook_id=webhook_id_expanded),
             alternate_endpoint="webhooks/ping",
         )
diff --git a/src/cli/onefuzz/status/cache.py b/src/cli/onefuzz/status/cache.py
index f68bc334e..6a0a4c4b2 100644
--- a/src/cli/onefuzz/status/cache.py
+++ b/src/cli/onefuzz/status/cache.py
@@ -4,39 +4,78 @@
 # Licensed under the MIT License.
 
 import logging
-from datetime import datetime, timedelta, timezone
+from datetime import datetime
 from enum import Enum
 from typing import Any, Dict, List, Optional, Set, Tuple, Union
 from uuid import UUID
 
-from onefuzztypes.enums import ContainerType, JobState, NodeState, PoolState, TaskState
-from onefuzztypes.models import Job, Node, Pool, Task
+from onefuzztypes.enums import ContainerType, JobState, NodeState, TaskState, TaskType
+from onefuzztypes.events import (
+    EventCrashReported,
+    EventFileAdded,
+    EventJobCreated,
+    EventJobStopped,
+    EventMessage,
+    EventNodeCreated,
+    EventNodeDeleted,
+    EventNodeStateUpdated,
+    EventPoolCreated,
+    EventPoolDeleted,
+    EventTaskCreated,
+    EventTaskFailed,
+    EventTaskStateUpdated,
+    EventTaskStopped,
+    EventType,
+)
+from onefuzztypes.models import (
+    Job,
+    JobConfig,
+    Node,
+    Pool,
+    Task,
+    TaskContainers,
+    UserInfo,
+)
+from onefuzztypes.primitives import Container
 from pydantic import BaseModel
 
-MESSAGE = Tuple[datetime, str, str]
+MESSAGE = Tuple[datetime, EventType, str]
 
 MINUTES = 60
 HOURS = 60 * MINUTES
 DAYS = 24 * HOURS
 
 
-def fmt_delta(data: timedelta) -> str:
-    result = []
+# status-top only representation of a Node
+class MiniNode(BaseModel):
+    machine_id: UUID
+    pool_name: str
+    state: NodeState
 
-    seconds = data.total_seconds()
-    for letter, size in [
-        ("d", DAYS),
-        ("h", HOURS),
-        ("m", MINUTES),
-    ]:
-        part, seconds = divmod(seconds, size)
-        if part:
-            result.append("%d%s" % (part, letter))
-    return "".join(result)
+
+# status-top only representation of a Job
+class MiniJob(BaseModel):
+    job_id: UUID
+    config: JobConfig
+    state: Optional[JobState]
+    user_info: Optional[UserInfo]
+
+
+# status-top only representation of a Task
+class MiniTask(BaseModel):
+    job_id: UUID
+    task_id: UUID
+    type: TaskType
+    target: str
+    state: TaskState
+    pool: str
+    end_time: Optional[datetime]
+    containers: List[TaskContainers]
 
 
 def fmt(data: Any) -> Any:
+    if data is None:
+        return ""
     if isinstance(data, int):
         return str(data)
     if isinstance(data, str):
@@ -47,8 +86,6 @@ def fmt(data: Any) -> Any:
         return [fmt(x) for x in data]
     if isinstance(data, datetime):
         return data.strftime("%H:%M:%S")
-    if isinstance(data, timedelta):
-        return fmt_delta(data)
     if isinstance(data, tuple):
         return tuple([fmt(x) for x in data])
     if isinstance(data, Enum):
@@ -72,19 +109,18 @@ class JobFilter(BaseModel):
 
 
 class TopCache:
-    JOB_FIELDS = ["Updated", "State", "Job", "Name", "Files"]
+    JOB_FIELDS = ["Job", "Name", "User", "Files"]
     TASK_FIELDS = [
-        "Updated",
-        "State",
         "Job",
         "Task",
+        "State",
         "Type",
-        "Name",
+        "Target",
         "Files",
         "Pool",
-        "Time left",
+        "End time",
     ]
-    POOL_FIELDS = ["Updated", "Pool", "Name", "OS", "State", "Nodes"]
+    POOL_FIELDS = ["Name", "OS", "Arch", "Nodes"]
 
     def __init__(
         self,
@@ -93,11 +129,11 @@ class TopCache:
     ):
         self.onefuzz = onefuzz
         self.job_filters = job_filters
-        self.tasks: Dict[UUID, Tuple[datetime, Task]] = {}
-        self.jobs: Dict[UUID, Tuple[datetime, Job]] = {}
-        self.files: Dict[str, Tuple[Optional[datetime], Set[str]]] = {}
-        self.pools: Dict[str, Tuple[datetime, Pool]] = {}
-        self.nodes: Dict[UUID, Tuple[datetime, Node]] = {}
+        self.tasks: Dict[UUID, MiniTask] = {}
+        self.jobs: Dict[UUID, MiniJob] = {}
+        self.files: Dict[Container, Set[str]] = {}
+        self.pools: Dict[str, EventPoolCreated] = {}
+        self.nodes: Dict[UUID, MiniNode] = {}
         self.messages: List[MESSAGE] = []
 
         endpoint = onefuzz._backend.config.endpoint
@@ -106,7 +142,7 @@ class TopCache:
         self.endpoint: str = endpoint
         self.last_update = datetime.now()
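
The `Mini*` models above replace timestamped tuples of full API models with slim pydantic projections that hold only what the TUI renders. A sketch of the projection idea with a stand-in `Node` model (field names here are illustrative):

```python
from uuid import uuid4, UUID

from pydantic import BaseModel


class Node(BaseModel):  # stand-in for the full API model, which has many more fields
    machine_id: UUID
    pool_name: str
    state: str
    version: str
    heartbeat: str


class MiniNode(BaseModel):  # keep only what the status view displays
    machine_id: UUID
    pool_name: str
    state: str


def to_mini(node: Node) -> MiniNode:
    return MiniNode(
        machine_id=node.machine_id, pool_name=node.pool_name, state=node.state
    )


node = Node(
    machine_id=uuid4(), pool_name="example", state="ready", version="1.0", heartbeat=""
)
print(to_mini(node))
```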
 
-    def add_container(self, name: str, ignore_date: bool = False) -> None:
+    def add_container(self, name: Container) -> None:
         if name in self.files:
             return
         try:
         except Exception:
             return
 
-        self.add_files(name, set(files.files), ignore_date=ignore_date)
+        self.add_files_set(name, set(files.files))
+
+    def add_message(self, message: EventMessage) -> None:
+        events = {
+            EventPoolCreated: lambda x: self.pool_created(x),
+            EventPoolDeleted: lambda x: self.pool_deleted(x),
+            EventTaskCreated: lambda x: self.task_created(x),
+            EventTaskStopped: lambda x: self.task_stopped(x),
+            EventTaskFailed: lambda x: self.task_stopped(x),
+            EventTaskStateUpdated: lambda x: self.task_state_updated(x),
+            EventJobCreated: lambda x: self.job_created(x),
+            EventJobStopped: lambda x: self.job_stopped(x),
+            EventNodeStateUpdated: lambda x: self.node_state_updated(x),
+            EventNodeCreated: lambda x: self.node_created(x),
+            EventNodeDeleted: lambda x: self.node_deleted(x),
+            EventCrashReported: lambda x: self.file_added(x),
+            EventFileAdded: lambda x: self.file_added(x),
+        }
+
+        for event_cls in events:
+            if isinstance(message.event, event_cls):
+                events[event_cls](message.event)
 
-    def add_message(self, name: str, message: Dict[str, Any]) -> None:
         self.last_update = datetime.now()
 
-        data: Dict[str, Union[int, str]] = {}
-        for (k, v) in message.items():
-            if k in ["task_id", "job_id", "pool_id", "scaleset_id", "machine_id"]:
-                k = k.replace("_id", "")
-                data[k] = str(v)[:8]
-            elif isinstance(v, (int, str)):
-                data[k] = v
-
-        as_str = fmt(data)
-        messages = [x for x in self.messages if (x[1:] != [name, as_str])][-99:]
-        messages += [(datetime.now(), name, as_str)]
-
+        messages = [x for x in self.messages][-99:]
+        messages += [
+            (datetime.now(), message.event_type, message.event.json(exclude_none=True))
+        ]
         self.messages = messages
 
-    def add_files(
-        self, container: str, new_files: Set[str], ignore_date: bool = False
-    ) -> None:
-        current_date: Optional[datetime] = None
+    def file_added(self, event: Union[EventFileAdded, EventCrashReported]) -> None:
+        if event.container in self.files:
+            files = self.files[event.container]
+        else:
+            files = set()
+        files.update(set([event.filename]))
+        self.files[event.container] = files
+
+    def add_files_set(self, container: Container, new_files: Set[str]) -> None:
         if container in self.files:
-            (current_date, files) = self.files[container]
+            files = self.files[container]
         else:
             files = set()
         files.update(new_files)
-        if not ignore_date:
-            current_date = datetime.now()
-        self.files[container] = (current_date, files)
+        self.files[container] = files
 
-    def add_node(
-        self, machine_id: UUID, state: NodeState, node: Optional[Node] = None
+    def add_node(self, node: Node) -> None:
+        self.nodes[node.machine_id] = MiniNode(
+            machine_id=node.machine_id, state=node.state, pool_name=node.pool_name
+        )
+
+    def add_job(self, job: Job) -> MiniJob:
+        mini_job = MiniJob(
+            job_id=job.job_id,
+            config=job.config,
+            state=job.state,
+            user_info=job.user_info,
+        )
+        self.jobs[job.job_id] = mini_job
+        return mini_job
+
+    def job_created(
+        self,
+        job: EventJobCreated,
     ) -> None:
-        if state in [NodeState.halt]:
-            if machine_id in self.nodes:
-                del self.nodes[machine_id]
-            return
+        self.jobs[job.job_id] = MiniJob(
+            job_id=job.job_id, config=job.config, user_info=job.user_info
+        )
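
`add_message` above routes each incoming event to a cache handler by class, checking with `isinstance` against a dispatch table. The same shape, reduced to a runnable sketch with stand-in event types:

```python
from dataclasses import dataclass


@dataclass
class PoolCreated:
    pool_name: str


@dataclass
class PoolDeleted:
    pool_name: str


def on_created(event: PoolCreated) -> None:
    print("created", event.pool_name)


def on_deleted(event: PoolDeleted) -> None:
    print("deleted", event.pool_name)


HANDLERS = {PoolCreated: on_created, PoolDeleted: on_deleted}


def dispatch(event: object) -> None:
    # isinstance (rather than type equality) mirrors the cache's dispatch
    # and tolerates subclasses of the event models
    for event_cls, handler in HANDLERS.items():
        if isinstance(event, event_cls):
            handler(event)
            return


dispatch(PoolCreated(pool_name="example"))
```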
 
-        if machine_id in self.nodes:
-            (_, node) = self.nodes[machine_id]
-            node.state = state
-            self.nodes[machine_id] = (datetime.now(), node)
-        else:
-            try:
-                if not node:
-                    node = self.onefuzz.nodes.get(machine_id)
-                    self.nodes[node.machine_id] = (datetime.now(), node)
-            except Exception:
-                logging.debug("unable to find pool: %s", machine_id)
+    def add_pool(self, pool: Pool) -> None:
+        self.pool_created(
+            EventPoolCreated(
+                pool_name=pool.name,
+                os=pool.os,
+                arch=pool.arch,
+                managed=pool.managed,
+            )
+        )
 
-    def add_pool(
-        self, pool_name: str, state: PoolState, pool: Optional[Pool] = None
+    def pool_created(
+        self,
+        pool: EventPoolCreated,
     ) -> None:
-        if state in [PoolState.halt]:
-            if pool_name in self.pools:
-                del self.pools[pool_name]
-            return
+        self.pools[pool.pool_name] = pool
 
-        if pool_name in self.pools:
-            (_, pool) = self.pools[pool_name]
-            pool.state = state
-            self.pools[pool_name] = (datetime.now(), pool)
-        else:
-            try:
-                if not pool:
-                    pool = self.onefuzz.pools.get(pool_name)
-                self.pools[pool.name] = (datetime.now(), pool)
-            except Exception:
-                logging.debug("unable to find pool: %s", pool_name)
+    def pool_deleted(self, pool: EventPoolDeleted) -> None:
+        if pool.pool_name in self.pools:
+            del self.pools[pool.pool_name]
 
     def render_pools(self) -> List:
         results = []
-
-        for (timestamp, pool) in sorted(self.pools.values(), key=lambda x: x[0]):
-            timestamps = [timestamp]
+        for pool in self.pools.values():
             nodes = {}
-            for (node_ts, node) in self.nodes.values():
-                if node.pool_name != pool.name:
+            for node in self.nodes.values():
+                if node.pool_name != pool.pool_name:
                     continue
                 if node.state not in nodes:
                     nodes[node.state] = 0
                 nodes[node.state] += 1
-                timestamps.append(node_ts)
-
-            timestamps = [timestamp]
-            entry = [
-                max(timestamps),
-                pool.pool_id,
-                pool.name,
-                pool.os,
-                pool.state,
-                nodes or "None",
-            ]
+            entry = (pool.pool_name, pool.os, pool.arch, nodes or "None")
             results.append(entry)
         return results
 
-    def add_task(
-        self,
-        task_id: UUID,
-        state: TaskState,
-        add_files: bool = True,
-        task: Optional[Task] = None,
-    ) -> None:
-        if state in [TaskState.stopping, TaskState.stopped]:
-            if task_id in self.tasks:
-                del self.tasks[task_id]
-            return
+    def node_created(self, node: EventNodeCreated) -> None:
+        self.nodes[node.machine_id] = MiniNode(
+            machine_id=node.machine_id, pool_name=node.pool_name, state=NodeState.init
+        )
 
-        if task_id in self.tasks and self.tasks[task_id][1].state != state:
-            (_, task) = self.tasks[task_id]
-            task.state = state
-            self.tasks[task_id] = (datetime.now(), task)
-        else:
-            try:
-                if task is None:
-                    task = self.onefuzz.tasks.get(task_id)
-                self.add_job_if_missing(task.job_id)
-                self.tasks[task.task_id] = (datetime.now(), task)
-                if add_files:
-                    for container in task.config.containers:
-                        self.add_container(container.name)
-            except Exception:
-                logging.debug("unable to find task: %s", task_id)
+    def node_state_updated(self, node: EventNodeStateUpdated) -> None:
+        self.nodes[node.machine_id] = MiniNode(
+            machine_id=node.machine_id, pool_name=node.pool_name, state=node.state
+        )
+
+    def node_deleted(self, node: EventNodeDeleted) -> None:
+        if node.machine_id in self.nodes:
+            del self.nodes[node.machine_id]
+
+    def add_task(self, task: Task) -> None:
+        self.tasks[task.task_id] = MiniTask(
+            job_id=task.job_id,
+            task_id=task.task_id,
+            type=task.config.task.type,
+            pool=task.config.pool.pool_name if task.config.pool else "",
+            state=task.state,
+            target=task.config.task.target_exe.replace("setup/", "", 0),
+            containers=task.config.containers,
+            end_time=task.end_time,
+        )
+
+    def task_created(self, event: EventTaskCreated) -> None:
+        self.tasks[event.task_id] = MiniTask(
+            job_id=event.job_id,
+            task_id=event.task_id,
+            type=event.config.task.type,
+            pool=event.config.pool.pool_name if event.config.pool else "",
+            target=event.config.task.target_exe.replace("setup/", "", 0),
+            containers=event.config.containers,
+            state=TaskState.init,
+        )
+
+    def task_state_updated(self, event: EventTaskStateUpdated) -> None:
+        if event.task_id in self.tasks:
+            task = self.tasks[event.task_id]
+            task.state = event.state
+            task.end_time = event.end_time
+            self.tasks[event.task_id] = task
+
+    def task_stopped(self, event: EventTaskStopped) -> None:
+        if event.task_id in self.tasks:
+            del self.tasks[event.task_id]
 
     def render_tasks(self) -> List:
         results = []
-        for (timestamp, task) in sorted(self.tasks.values(), key=lambda x: x[0]):
+        for task in self.tasks.values():
             job_entry = self.jobs.get(task.job_id)
             if job_entry:
-                (_, job) = job_entry
-                if not self.should_render_job(job):
+                if not self.should_render_job(job_entry):
                     continue
 
-            timestamps, files = self.get_file_counts([task])
-            timestamps += [timestamp]
+            files = self.get_file_counts([task])
 
-            end: Union[str, timedelta] = ""
-            if task.end_time:
-                end = task.end_time - datetime.now().astimezone(timezone.utc)
-
-            entry = [
-                max(timestamps),
-                task.state.name,
+            entry = (
                 task.job_id,
                 task.task_id,
-                task.config.task.type.name,
-                task.config.task.target_exe.replace("setup/", "", 0),
+                task.state,
+                task.type.name,
+                task.target,
                 files,
-                task.config.pool.pool_name if task.config.pool else "",
-                end,
-            ]
+                task.pool,
+                task.end_time,
+            )
             results.append(entry)
         return results
@@ -273,27 +328,9 @@ class TopCache:
         if job_id in self.jobs:
             return
         job = self.onefuzz.jobs.get(job_id)
-        self.add_job(job_id, job.state, job)
+        self.add_job(job)
 
-    def add_job(self, job_id: UUID, state: JobState, job: Optional[Job] = None) -> None:
-        if state in [JobState.stopping, JobState.stopped]:
-            if job_id in self.jobs:
-                del self.jobs[job_id]
-            return
-
-        if job_id in self.jobs:
-            (_, job) = self.jobs[job_id]
-            job.state = state
-            self.jobs[job_id] = (datetime.now(), job)
-        else:
-            try:
-                if not job:
-                    job = self.onefuzz.jobs.get(job_id)
-                self.jobs[job_id] = (datetime.now(), job)
-            except Exception:
-                logging.debug("unable to find job: %s", job_id)
-
-    def should_render_job(self, job: Job) -> bool:
+    def should_render_job(self, job: MiniJob) -> bool:
         if self.job_filters.job_id is not None:
             if job.job_id not in self.job_filters.job_id:
                 logging.info("skipping:%s", job)
@@ -311,27 +348,27 @@ class TopCache:
 
         return True
 
+    def job_stopped(self, event: EventJobStopped) -> None:
+        if event.job_id in self.jobs:
+            del self.jobs[event.job_id]
+
     def render_jobs(self) -> List[Tuple]:
         results: List[Tuple] = []
 
-        for (timestamp, job) in sorted(self.jobs.values(), key=lambda x: x[0]):
+        for job in self.jobs.values():
             if not self.should_render_job(job):
                 continue
 
-            timestamps, files = self.get_file_counts(
-                self.get_tasks(job.job_id), merge_inputs=True
-            )
-            timestamps += [timestamp]
+            files = self.get_file_counts(self.get_tasks(job.job_id), merge_inputs=True)
 
             for to_remove in [ContainerType.coverage, ContainerType.setup]:
                 if to_remove in files:
                     del files[to_remove]
 
             entry = (
-                max(timestamps),
-                job.state.name,
                 job.job_id,
                 "%s:%s:%s" % (job.config.project, job.config.name, job.config.build),
+                job.user_info.upn if job.user_info else "",
                 files,
             )
             results.append(entry)
@@ -339,12 +376,11 @@ class TopCache:
         return results
 
     def get_file_counts(
-        self, tasks: List[Task], merge_inputs: bool = False
-    ) -> Tuple[List[datetime], Dict[ContainerType, int]]:
-        timestamps = []
+        self, tasks: List[MiniTask], merge_inputs: bool = False
+    ) -> Dict[ContainerType, int]:
         results: Dict[ContainerType, Dict[str, int]] = {}
         for task in tasks:
-            for container in task.config.containers:
+            for container in task.containers:
                 if container.name not in self.files:
                     continue
                 if merge_inputs and container.type == ContainerType.readonly_inputs:
@@ -352,22 +388,20 @@ class TopCache:
                 if container.type not in results:
                     results[container.type] = {}
                 results[container.type][container.name] = len(
-                    self.files[container.name][1]
+                    self.files[container.name]
                 )
-                container_date = self.files[container.name][0]
-                if container_date is not None:
-                    timestamps.append(container_date)
+
         results_merged = {}
         for k, v in results.items():
             value = sum(v.values())
             if value:
                 results_merged[k] = value
 
-        return (timestamps, results_merged)
+        return results_merged
 
-    def get_tasks(self, job_id: UUID) -> List[Task]:
+    def get_tasks(self, job_id: UUID) -> List[MiniTask]:
         result = []
-        for (_, task) in self.tasks.values():
+        for task in self.tasks.values():
             if task.job_id == job_id:
                 result.append(task)
         return result
diff --git a/src/cli/onefuzz/status/signalr.py b/src/cli/onefuzz/status/signalr.py
index 5cf929f79..f22bcef3e 100644
--- a/src/cli/onefuzz/status/signalr.py
+++ b/src/cli/onefuzz/status/signalr.py
@@ -40,7 +40,7 @@ class Stream:
         )
         self.hub.on_open(self.on_connect)
         self.hub.on_close(self.on_close)
-        self.hub.on("dashboard", handler)
+        self.hub.on("events", handler)
 
         self.logger.info("connecting to signalr")
         self.hub.start()
diff --git a/src/cli/onefuzz/status/top.py b/src/cli/onefuzz/status/top.py
index c6297badb..06c6a7dcd 100644
--- a/src/cli/onefuzz/status/top.py
+++ b/src/cli/onefuzz/status/top.py
@@ -8,9 +8,8 @@ import time
 from queue import PriorityQueue
 from threading import Thread
 from typing import Any, Optional
-from uuid import UUID
 
-from onefuzztypes.enums import JobState, NodeState, PoolState, TaskState
+from onefuzztypes.events import EventMessage
 
 from .cache import JobFilter, TopCache
 from .signalr import Stream
@@ -49,28 +48,12 @@ class Top:
     def add_container(self, name: str) -> None:
         if name in self.cache.files:
             return
-        self.queue.put((2, (self.cache.add_container, [name, True])))
+        self.queue.put((2, (self.cache.add_container, [name])))
 
     def handler(self, message: Any) -> None:
-        handlers = {
-            "Node": lambda x: self.cache.add_node(
-                UUID(x["machine_id"]), NodeState[x["state"]]
-            ),
-            "Pool": lambda x: self.cache.add_pool(x["name"], PoolState[x["state"]]),
-            "Task": lambda x: self.cache.add_task(
-                UUID(x["task_id"]), TaskState[x["state"]]
-            ),
-            "Job": lambda x: self.cache.add_job(
-                UUID(x["job_id"]), JobState[x["state"]]
-            ),
-            "new_file": lambda x: self.cache.add_files(
-                x["container"], set([x["file"]])
-            ),
-        }
-        for event in message:
-            if event["type"] in handlers:
-                handlers[event["type"]](event["data"])
-            self.cache.add_message(event["type"], event["data"])
+        for event_raw in message:
+            message = EventMessage.parse_obj(event_raw)
+            self.cache.add_message(message)
 
     def setup(self) -> Stream:
         client = Stream(self.onefuzz, self.logger)
@@ -80,27 +63,23 @@ class Top:
 
         pools = self.onefuzz.pools.list()
         for pool in pools:
-            self.cache.add_pool(pool.name, pool.state, pool=pool)
+            self.cache.add_pool(pool)
 
-        nodes = self.onefuzz.nodes.list()
-        for node in nodes:
-            self.cache.add_node(node.machine_id, node.state, node=node)
-
-        jobs = self.onefuzz.jobs.list()
-
-        for job in jobs:
-            self.cache.add_job(job.job_id, job.state, job)
+        for job in self.onefuzz.jobs.list():
+            mini_job = self.cache.add_job(job)
 
             # don't add pre-add tasks that we're going to filter out
-            if not self.cache.should_render_job(job):
+            if not self.cache.should_render_job(mini_job):
                 continue
 
             for task in self.onefuzz.tasks.list(job_id=job.job_id):
-                self.cache.add_task(
-                    task.task_id, task.state, task=task, add_files=False
-                )
+                self.cache.add_task(task)
                 for container in task.config.containers:
                     self.add_container(container.name)
 
+        nodes = self.onefuzz.nodes.list()
+        for node in nodes:
+            self.cache.add_node(node)
+
         if client.connected is None:
             self.logger.info("waiting for signalr connection")
             while client.connected is None:
diff --git a/src/cli/onefuzz/status/top_view.py b/src/cli/onefuzz/status/top_view.py
index e5d4a5fa2..615bb883b 100644
--- a/src/cli/onefuzz/status/top_view.py
+++ b/src/cli/onefuzz/status/top_view.py
@@ -71,7 +71,7 @@ class TopView(Frame):
         self.add_layout(layout)
 
         self.onefuzz_reversed = {
-            "pools": True,
+            "pools": False,
             "jobs": True,
             "tasks": True,
             "messages": True,
@@ -163,6 +163,8 @@ class TopView(Frame):
             title.value = "%s: %s" % (name.title(), fmt(len(data)))
 
     def update(self, frame_no: int) -> Any:
+        if len(self.cache.pools) != self.pool_count:
+            raise ResizeScreenError("resizing because of a differing pool count")
         if len(self.cache.jobs) != self.job_count:
             raise ResizeScreenError("resizing because of a differing job count")
         self.render_base("status", [[now(), "| " + self.cache.endpoint]])
diff --git a/src/pytypes/extra/generate-docs.py b/src/pytypes/extra/generate-docs.py
index 2045315df..838874e3e 100755
--- a/src/pytypes/extra/generate-docs.py
+++ b/src/pytypes/extra/generate-docs.py
@@ -5,17 +5,55 @@
 from typing import Optional
 from uuid import UUID
 
-import json
-from onefuzztypes.enums import TaskType, ContainerType, ErrorCode
-from onefuzztypes.models import TaskConfig, TaskDetails, TaskContainers, Error, UserInfo
-from onefuzztypes.webhooks import (
-    WebhookMessage,
-    WebhookEventPing,
-    WebhookEventTaskCreated,
-    WebhookEventTaskStopped,
-    WebhookEventTaskFailed,
+from onefuzztypes.primitives import Region, Container
+from onefuzztypes.enums import (
+    TaskType,
+    ContainerType,
+    ErrorCode,
+    OS,
+    Architecture,
+    NodeState,
 )
-from onefuzztypes.enums import WebhookEventType
+from onefuzztypes.models import (
+    TaskConfig,
+    TaskDetails,
+    TaskContainers,
+    TaskState,
+    Error,
+    UserInfo,
+    JobConfig,
+    Report,
+    BlobRef,
+)
+from onefuzztypes.events import (
+    Event,
+    EventPing,
+    EventCrashReported,
+    EventFileAdded,
+    EventTaskCreated,
+    EventTaskStopped,
+    EventTaskFailed,
+    EventProxyCreated,
+    EventProxyDeleted,
+    EventProxyFailed,
+    EventPoolCreated,
+    EventPoolDeleted,
+    EventScalesetCreated,
+    EventScalesetFailed,
+    EventScalesetDeleted,
+    EventJobCreated,
+    EventJobStopped,
+    EventTaskStateUpdated,
+    EventNodeStateUpdated,
+    EventNodeCreated,
+    EventNodeDeleted,
+    get_event_type,
+    EventType,
+)
+from onefuzztypes.webhooks import WebhookMessage
+
+EMPTY_SHA256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
+ZERO_SHA256 = "0" * len(EMPTY_SHA256)
 
 
 def layer(depth: int, title: str, content: Optional[str] = None) -> None:
@@ -29,28 +67,9 @@ def typed(depth: int, title: str, content: str, data_type: str) -> None:
 
 
 def main():
-    examples = {
-        WebhookEventType.ping: WebhookEventPing(ping_id=UUID(int=0)),
-        WebhookEventType.task_stopped: WebhookEventTaskStopped(
-            job_id=UUID(int=0),
-            task_id=UUID(int=0),
-            user_info=UserInfo(
-                application_id=UUID(int=0),
-                object_id=UUID(int=0),
-                upn="example@contoso.com",
-            ),
-        ),
-        WebhookEventType.task_failed: WebhookEventTaskFailed(
-            job_id=UUID(int=0),
-            task_id=UUID(int=0),
-            error=Error(code=ErrorCode.TASK_FAILED, errors=["example error message"]),
-            user_info=UserInfo(
-                application_id=UUID(int=0),
-                object_id=UUID(int=0),
-                upn="example@contoso.com",
-            ),
-        ),
-        WebhookEventType.task_created: WebhookEventTaskCreated(
+    examples = [
+        EventPing(ping_id=UUID(int=0)),
+        EventTaskCreated(
             job_id=UUID(int=0),
             task_id=UUID(int=0),
             config=TaskConfig(
@@ -75,13 +94,122 @@ def main():
                 upn="example@contoso.com",
             ),
         ),
-    }
+        EventTaskStopped(
+            job_id=UUID(int=0),
+            task_id=UUID(int=0),
+            user_info=UserInfo(
+                application_id=UUID(int=0),
+                object_id=UUID(int=0),
+                upn="example@contoso.com",
+            ),
+        ),
+        EventTaskFailed(
+            job_id=UUID(int=0),
+            task_id=UUID(int=0),
+            error=Error(code=ErrorCode.TASK_FAILED, errors=["example error message"]),
+            user_info=UserInfo(
+                application_id=UUID(int=0),
+                object_id=UUID(int=0),
+                upn="example@contoso.com",
+            ),
+        ),
+        EventTaskStateUpdated(
+            job_id=UUID(int=0), task_id=UUID(int=0), state=TaskState.init
+        ),
+        EventProxyCreated(region=Region("eastus")),
+        EventProxyDeleted(region=Region("eastus")),
+        EventProxyFailed(
+            region=Region("eastus"),
+            error=Error(code=ErrorCode.PROXY_FAILED, errors=["example error message"]),
+        ),
+        EventPoolCreated(
+            pool_name="example", os=OS.linux, arch=Architecture.x86_64, managed=True
+        ),
+        EventPoolDeleted(pool_name="example"),
+        EventScalesetCreated(
+            scaleset_id=UUID(int=0),
+            pool_name="example",
+            vm_sku="Standard_D2s_v3",
+            image="Canonical:UbuntuServer:18.04-LTS:latest",
+            region=Region("eastus"),
+            size=10,
+        ),
+        EventScalesetFailed(
+            scaleset_id=UUID(int=0),
+            pool_name="example",
+            error=Error(
+                code=ErrorCode.UNABLE_TO_RESIZE, errors=["example error message"]
+            ),
+        ),
+        EventScalesetDeleted(scaleset_id=UUID(int=0), pool_name="example"),
+        EventJobCreated(
+            job_id=UUID(int=0),
+            config=JobConfig(
+                project="example project",
+                name="example name",
+                build="build 1",
+                duration=24,
+            ),
+        ),
+        EventJobStopped(
+            job_id=UUID(int=0),
+            config=JobConfig(
+                project="example project",
+                name="example name",
+                build="build 1",
+                duration=24,
+            ),
+        ),
+        EventNodeCreated(machine_id=UUID(int=0), pool_name="example"),
+        EventNodeDeleted(machine_id=UUID(int=0), pool_name="example"),
+        EventNodeStateUpdated(
+            machine_id=UUID(int=0), pool_name="example", state=NodeState.setting_up
+        ),
+        EventCrashReported(
+            container=Container("container-name"),
+            filename="example.json",
+            report=Report(
+                input_blob=BlobRef(
+                    account="contoso-storage-account",
+                    container=Container("crashes"),
+                    name="input.txt",
+                ),
+                executable="fuzz.exe",
+                crash_type="example crash report type",
+                crash_site="example crash site",
+                call_stack=["#0 line", "#1 line", "#2 line"],
+                call_stack_sha256=ZERO_SHA256,
+                input_sha256=EMPTY_SHA256,
+                asan_log="example asan log",
+                task_id=UUID(int=0),
+                job_id=UUID(int=0),
+                scariness_score=10,
+                scariness_description="example-scariness",
+            ),
+        ),
+        EventFileAdded(container=Container("container-name"), filename="example.txt"),
+    ]
+
+    for event in Event.__args__:
+        seen = False
+        for value in examples:
+            if isinstance(value, event):
+                seen = True
+                break
+        assert seen, "missing event type definition: %s" % event.__name__
+
+    event_types = [get_event_type(x) for x in examples]
+
+    for event_type in EventType:
+        assert event_type in event_types, (
+            "missing event type definition: %s" % event_type.name
+        )
 
     message = WebhookMessage(
         webhook_id=UUID(int=0),
         event_id=UUID(int=0),
-        event_type=WebhookEventType.ping,
-        event=examples[WebhookEventType.ping],
+        event_type=EventType.ping,
+        event=EventPing(ping_id=UUID(int=0)),
     )
 
     layer(
@@ -96,11 +224,18 @@ def main():
     )
     typed(3, "Example", message.json(indent=4, exclude_none=True), "json")
 
-    layer(2, "Event Types (WebhookEventType)")
+    layer(2, "Event Types (EventType)")
 
-    for webhook_type in WebhookEventType:
-        example = examples[webhook_type]
-        layer(3, webhook_type.name)
+    event_map = {get_event_type(x).name: x for x in examples}
+
+    for name in sorted(event_map.keys()):
+        print(f"* [{name}](#{name})")
+
+    print()
+
+    for name in sorted(event_map.keys()):
+        example = event_map[name]
+        layer(3, name)
         typed(4, "Example", example.json(indent=4, exclude_none=True), "json")
         typed(4, "Schema", example.schema_json(indent=4), "json")
diff --git a/src/pytypes/onefuzztypes/enums.py b/src/pytypes/onefuzztypes/enums.py
index d7d1d4e57..47c1db3f9 100644
--- a/src/pytypes/onefuzztypes/enums.py
+++ b/src/pytypes/onefuzztypes/enums.py
@@ -252,6 +252,7 @@ class ErrorCode(Enum):
     INVALID_NODE = 469
     NOTIFICATION_FAILURE = 470
     UNABLE_TO_UPDATE = 471
+    PROXY_FAILED = 472
 
 
 class HeartbeatType(Enum):
@@ -368,13 +369,6 @@ class TaskDebugFlag(Enum):
     keep_node_on_completion = "keep_node_on_completion"
 
 
-class WebhookEventType(Enum):
-    task_created = "task_created"
-    task_stopped = "task_stopped"
-    task_failed = "task_failed"
-    ping = "ping"
-
-
 class WebhookMessageState(Enum):
     queued = "queued"
     retrying = "retrying"
diff --git a/src/pytypes/onefuzztypes/events.py b/src/pytypes/onefuzztypes/events.py
new file mode 100644
index 000000000..7ee086799
--- /dev/null
+++ b/src/pytypes/onefuzztypes/events.py
@@ -0,0 +1,225 @@
+#!/usr/bin/env python
+#
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from datetime import datetime
+from enum import Enum
+from typing import Optional, Union
+from uuid import UUID, uuid4
+
+from pydantic import BaseModel, Extra, Field
+
+from .enums import OS, Architecture, NodeState, TaskState
+from .models import AutoScaleConfig, Error, JobConfig, Report, TaskConfig, UserInfo
+from .primitives import Container, Region
+from .responses import BaseResponse
+
+
+class BaseEvent(BaseModel):
+    class Config:
+        extra = Extra.forbid
+
+
+class EventTaskStopped(BaseEvent):
+    job_id: UUID
+    task_id: UUID
+    user_info: Optional[UserInfo]
+
+
+class EventTaskFailed(BaseEvent):
+    job_id: UUID
+    task_id: UUID
+    error: Error
+    user_info: Optional[UserInfo]
+
+
+class EventJobCreated(BaseEvent):
+    job_id: UUID
+    config: JobConfig
+    user_info: Optional[UserInfo]
+
+
+class EventJobStopped(BaseEvent):
+    job_id: UUID
+    config: JobConfig
+    user_info: Optional[UserInfo]
+
+
+class EventTaskCreated(BaseEvent):
+    job_id: UUID
+    task_id: UUID
+    config: TaskConfig
+    user_info: Optional[UserInfo]
+
+
+class EventTaskStateUpdated(BaseEvent):
+    job_id: UUID
+    task_id: UUID
+    state: TaskState
+    end_time: Optional[datetime]
+
+
+class EventPing(BaseResponse):
+    ping_id: UUID
+
+
+class EventScalesetCreated(BaseEvent):
+    scaleset_id: UUID
+    pool_name: str
+    vm_sku: str
+    image: str
+    region: Region
+    size: int
+
+
+class EventScalesetFailed(BaseEvent):
+    scaleset_id: UUID
+    pool_name: str
+    error: Error
+
+
+class EventScalesetDeleted(BaseEvent):
+    scaleset_id: UUID
+    pool_name: str
+
+
+class EventPoolDeleted(BaseEvent):
+    pool_name: str
+
+
+class EventPoolCreated(BaseEvent):
+    pool_name: str
+    os: OS
+    arch: Architecture
+    managed: bool
+    autoscale: Optional[AutoScaleConfig]
+
+
+class EventProxyCreated(BaseEvent):
+    region: Region
+
+
+class EventProxyDeleted(BaseEvent):
+    region: Region
+
+
+class EventProxyFailed(BaseEvent):
+    region: Region
+    error: Error
+
+
+class EventNodeCreated(BaseEvent):
+    machine_id: UUID
+    scaleset_id: Optional[UUID]
+    pool_name: str
+
+
+class EventNodeDeleted(BaseEvent):
+    machine_id: UUID
+    scaleset_id: Optional[UUID]
+    pool_name: str
+
+
+class EventNodeStateUpdated(BaseEvent):
+    machine_id: UUID
+    scaleset_id: Optional[UUID]
+    pool_name: str
+    state: NodeState
+
+
+class EventCrashReported(BaseEvent):
+    report: Report
+    container: Container
+    filename: str
+
+
+class EventFileAdded(BaseEvent):
+    container: Container
+    filename: str
+
+
+Event = Union[
+    EventJobCreated,
+    EventJobStopped,
+    EventNodeStateUpdated,
+    EventNodeCreated,
+    EventNodeDeleted,
+    EventPing,
+    EventPoolCreated,
+    EventPoolDeleted,
+    EventProxyFailed,
+    EventProxyCreated,
+    EventProxyDeleted,
+    EventScalesetFailed,
+    EventScalesetCreated,
+    EventScalesetDeleted,
+    EventTaskFailed,
+    EventTaskStateUpdated,
+    EventTaskCreated,
+    EventTaskStopped,
+    EventCrashReported,
+    EventFileAdded,
+]
+
+
+class EventType(Enum):
+    job_created = "job_created"
+    job_stopped = "job_stopped"
+    node_created = "node_created"
+    node_deleted = "node_deleted"
+    node_state_updated = "node_state_updated"
+    ping = "ping"
+    pool_created = "pool_created"
+    pool_deleted = "pool_deleted"
+    proxy_created = "proxy_created"
+    proxy_deleted = "proxy_deleted"
+    proxy_failed = "proxy_failed"
+    scaleset_created = "scaleset_created"
+    scaleset_deleted = "scaleset_deleted"
+    scaleset_failed = "scaleset_failed"
+    task_created = "task_created"
+    task_failed = "task_failed"
+    task_state_updated = "task_state_updated"
+    task_stopped = "task_stopped"
+    crash_reported = "crash_reported"
+    file_added = "file_added"
+
+
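
One detail worth noting in the new module: `BaseEvent` sets `Extra.forbid`, so an event payload carrying unexpected fields fails validation instead of being silently accepted. A quick pydantic v1 demonstration of that behavior with a stand-in event:

```python
from pydantic import BaseModel, Extra, ValidationError


class BaseEvent(BaseModel):
    class Config:
        extra = Extra.forbid  # reject unknown fields instead of ignoring them


class EventPoolDeleted(BaseEvent):
    pool_name: str


EventPoolDeleted(pool_name="example")  # ok
try:
    EventPoolDeleted(pool_name="example", unexpected_field=1)
except ValidationError as err:
    print("rejected unexpected field:", err.errors()[0]["loc"])
```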
+EventTypeMap = {
+    EventType.job_created: EventJobCreated,
+    EventType.job_stopped: EventJobStopped,
+    EventType.node_created: EventNodeCreated,
+    EventType.node_deleted: EventNodeDeleted,
+    EventType.node_state_updated: EventNodeStateUpdated,
+    EventType.ping: EventPing,
+    EventType.pool_created: EventPoolCreated,
+    EventType.pool_deleted: EventPoolDeleted,
+    EventType.proxy_created: EventProxyCreated,
+    EventType.proxy_deleted: EventProxyDeleted,
+    EventType.proxy_failed: EventProxyFailed,
+    EventType.scaleset_created: EventScalesetCreated,
+    EventType.scaleset_deleted: EventScalesetDeleted,
+    EventType.scaleset_failed: EventScalesetFailed,
+    EventType.task_created: EventTaskCreated,
+    EventType.task_failed: EventTaskFailed,
+    EventType.task_state_updated: EventTaskStateUpdated,
+    EventType.task_stopped: EventTaskStopped,
+    EventType.crash_reported: EventCrashReported,
+    EventType.file_added: EventFileAdded,
+}
+
+
+def get_event_type(event: Event) -> EventType:
+
+    for (event_type, event_class) in EventTypeMap.items():
+        if isinstance(event, event_class):
+            return event_type
+
+    raise NotImplementedError("unsupported event type: %s" % type(event))
+
+
+class EventMessage(BaseEvent):
+    event_id: UUID = Field(default_factory=uuid4)
+    event_type: EventType
+    event: Event
diff --git a/src/pytypes/onefuzztypes/requests.py b/src/pytypes/onefuzztypes/requests.py
index 2c65c5568..603367a46 100644
--- a/src/pytypes/onefuzztypes/requests.py
+++ b/src/pytypes/onefuzztypes/requests.py
@@ -17,8 +17,8 @@ from .enums import (
     PoolState,
     ScalesetState,
     TaskState,
-    WebhookEventType,
 )
+from .events import EventType
 from .models import AutoScaleConfig, NotificationConfig
 from .primitives import Container, PoolName, Region
 
@@ -215,7 +215,7 @@ class CanScheduleRequest(BaseRequest):
 class WebhookCreate(BaseRequest):
     name: str
     url: AnyHttpUrl
-    event_types: List[WebhookEventType]
+    event_types: List[EventType]
     secret_token: Optional[str]
 
 
@@ -230,7 +230,7 @@ class WebhookGet(BaseModel):
 class WebhookUpdate(BaseModel):
     webhook_id: UUID
     name: Optional[str]
-    event_types: Optional[List[WebhookEventType]]
+    event_types: Optional[List[EventType]]
     url: Optional[AnyHttpUrl]
     secret_token: Optional[str]
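
With `EventTypeMap` and `get_event_type` in place, producers only construct the inner event; the envelope derives its `event_type` and a fresh `event_id`. A short usage sketch, assuming the `onefuzztypes` package from this change is installed:

```python
from uuid import uuid4

from onefuzztypes.events import EventMessage, EventPing, get_event_type

ping = EventPing(ping_id=uuid4())
message = EventMessage(event_type=get_event_type(ping), event=ping)
# event_id is filled in by the default_factory; serialize as the service does
print(message.json(exclude_none=True))
```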
diff --git a/src/pytypes/onefuzztypes/webhooks.py b/src/pytypes/onefuzztypes/webhooks.py
index 545f1285d..1ad661bc3 100644
--- a/src/pytypes/onefuzztypes/webhooks.py
+++ b/src/pytypes/onefuzztypes/webhooks.py
@@ -3,53 +3,17 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT License.
 
-from typing import List, Optional, Union
+from typing import List, Optional
 from uuid import UUID, uuid4
 
 from pydantic import AnyHttpUrl, BaseModel, Field
 
-from .enums import WebhookEventType, WebhookMessageState
-from .models import Error, TaskConfig, UserInfo
-from .responses import BaseResponse
+from .enums import WebhookMessageState
+from .events import EventMessage, EventType
 
 
-class WebhookEventTaskStopped(BaseModel):
-    job_id: UUID
-    task_id: UUID
-    user_info: Optional[UserInfo]
-
-
-class WebhookEventTaskFailed(BaseModel):
-    job_id: UUID
-    task_id: UUID
-    error: Error
-    user_info: Optional[UserInfo]
-
-
-class WebhookEventTaskCreated(BaseModel):
-    job_id: UUID
-    task_id: UUID
-    config: TaskConfig
-    user_info: Optional[UserInfo]
-
-
-class WebhookEventPing(BaseResponse):
-    ping_id: UUID = Field(default_factory=uuid4)
-
-
-WebhookEvent = Union[
-    WebhookEventTaskCreated,
-    WebhookEventTaskStopped,
-    WebhookEventTaskFailed,
-    WebhookEventPing,
-]
-
-
-class WebhookMessage(BaseModel):
+class WebhookMessage(EventMessage):
     webhook_id: UUID
-    event_id: UUID = Field(default_factory=uuid4)
-    event_type: WebhookEventType
-    event: WebhookEvent
 
 
 class WebhookMessageLog(WebhookMessage):
@@ -61,5 +25,5 @@ class Webhook(BaseModel):
     webhook_id: UUID = Field(default_factory=uuid4)
     name: str
     url: Optional[AnyHttpUrl]
-    event_types: List[WebhookEventType]
+    event_types: List[EventType]
     secret_token: Optional[str]
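
`WebhookMessage` now simply extends `EventMessage` with a `webhook_id`, and delivery is still authenticated by the HMAC-SHA512 digest that `build_message` computes (the `hmac`/`sha512` imports and the `Tuple[bytes, Optional[str]]` return type appear in the webhooks.py hunks above; the exact payload serialization is truncated in this diff). A generic sketch of that signing step under those assumptions:

```python
import hmac
from hashlib import sha512
from typing import Optional, Tuple


def sign(data: bytes, secret_token: Optional[str]) -> Tuple[bytes, Optional[str]]:
    # HMAC-SHA512 over the serialized message body; the receiver recomputes
    # this with the shared secret to verify the sender and the payload
    if secret_token is None:
        return data, None
    digest = hmac.new(secret_token.encode(), msg=data, digestmod=sha512).hexdigest()
    return data, digest


body, digest = sign(b'{"event_type": "ping"}', "secret")
print(digest)
```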