diff --git a/src/api-service/__app__/node/__init__.py b/src/api-service/__app__/node/__init__.py index ad68ec136..b6dd64476 100644 --- a/src/api-service/__app__/node/__init__.py +++ b/src/api-service/__app__/node/__init__.py @@ -9,7 +9,6 @@ from onefuzztypes.models import Error from onefuzztypes.requests import NodeGet, NodeSearch from onefuzztypes.responses import BoolResult -from ..onefuzzlib.heartbeat import NodeHeartbeat from ..onefuzzlib.pools import Node, NodeTasks from ..onefuzzlib.request import not_ok, ok, parse_request @@ -32,7 +31,6 @@ def get(req: func.HttpRequest) -> func.HttpResponse: node_tasks = NodeTasks.get_by_machine_id(request.machine_id) node.tasks = [(t.task_id, t.state) for t in node_tasks] - node.heartbeats = NodeHeartbeat.get_heartbeats(request.machine_id) return ok(node) diff --git a/src/api-service/__app__/onefuzzlib/heartbeat.py b/src/api-service/__app__/onefuzzlib/heartbeat.py deleted file mode 100644 index ab4b674db..000000000 --- a/src/api-service/__app__/onefuzzlib/heartbeat.py +++ /dev/null @@ -1,97 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -from typing import Dict, List, Tuple -from uuid import UUID - -from onefuzztypes.models import NodeHeartbeat as BASE_NODE_HEARTBEAT -from onefuzztypes.models import NodeHeartbeatEntry, NodeHeartbeatSummary -from onefuzztypes.models import TaskHeartbeat as BASE_TASK_HEARTBEAT -from onefuzztypes.models import TaskHeartbeatEntry, TaskHeartbeatSummary -from pydantic import ValidationError - -from .orm import ORMMixin - - -class TaskHeartbeat(BASE_TASK_HEARTBEAT, ORMMixin): - @classmethod - def add(cls, entry: TaskHeartbeatEntry) -> None: - for value in entry.data: - heartbeat_id = "-".join([str(entry.machine_id), value["type"].name]) - heartbeat = cls( - task_id=entry.task_id, - heartbeat_id=heartbeat_id, - machine_id=entry.machine_id, - heartbeat_type=value["type"], - ) - heartbeat.save() - - @classmethod - def try_add(cls, raw: Dict) -> bool: - try: - entry = TaskHeartbeatEntry.parse_obj(raw) - cls.add(entry) - return True - except ValidationError: - return False - - @classmethod - def get_heartbeats(cls, task_id: UUID) -> List[TaskHeartbeatSummary]: - entries = cls.search(query={"task_id": [task_id]}) - - result = [] - for entry in entries: - result.append( - TaskHeartbeatSummary( - timestamp=entry.Timestamp, - machine_id=entry.machine_id, - type=entry.heartbeat_type, - ) - ) - return result - - @classmethod - def key_fields(cls) -> Tuple[str, str]: - return ("task_id", "heartbeat_id") - - -class NodeHeartbeat(BASE_NODE_HEARTBEAT, ORMMixin): - @classmethod - def add(cls, entry: NodeHeartbeatEntry) -> None: - for value in entry.data: - heartbeat_id = "-".join([str(entry.node_id), value["type"].name]) - heartbeat = cls( - heartbeat_id=heartbeat_id, - node_id=entry.node_id, - heartbeat_type=value["type"], - ) - heartbeat.save() - - @classmethod - def try_add(cls, raw: Dict) -> bool: - try: - entry = NodeHeartbeatEntry.parse_obj(raw) - cls.add(entry) - return True - except ValidationError: - return False - - @classmethod - def get_heartbeats(cls, node_id: UUID) -> List[NodeHeartbeatSummary]: - entries = cls.search(query={"node_id": [node_id]}) - - result = [] - for entry in entries: - result.append( - NodeHeartbeatSummary( - timestamp=entry.Timestamp, - type=entry.heartbeat_type, - ) - ) - return result - - @classmethod - def key_fields(cls) -> Tuple[str, str]: - return ("node_id", "heartbeat_id") diff --git a/src/api-service/__app__/queue_node_heartbeat/__init__.py b/src/api-service/__app__/queue_node_heartbeat/__init__.py index 31e81d657..4a413b48b 100644 --- a/src/api-service/__app__/queue_node_heartbeat/__init__.py +++ b/src/api-service/__app__/queue_node_heartbeat/__init__.py @@ -3,22 +3,32 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +import datetime import json import logging import azure.functions as func +from onefuzztypes.models import NodeHeartbeatEntry +from pydantic import ValidationError from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.heartbeat import NodeHeartbeat +from ..onefuzzlib.pools import Node def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None: body = msg.get_body() logging.info("heartbeat: %s", body) - raw = json.loads(body) - - NodeHeartbeat.try_add(raw) + try: + entry = NodeHeartbeatEntry.parse_obj(raw) + node = Node.get_by_machine_id(entry.node_id) + if not node: + logging.error("invalid node id: %s", entry.node_id) + return + node.heartbeat = datetime.datetime.utcnow() + node.save() + except ValidationError: + logging.error("invalid node heartbeat: %s", raw) event = get_event() if event: diff --git a/src/api-service/__app__/queue_task_heartbeat/__init__.py b/src/api-service/__app__/queue_task_heartbeat/__init__.py index ecae1180a..b8b43d104 100644 --- a/src/api-service/__app__/queue_task_heartbeat/__init__.py +++ b/src/api-service/__app__/queue_task_heartbeat/__init__.py @@ -5,11 +5,14 @@ import json import logging +from datetime import datetime import azure.functions as func +from onefuzztypes.models import Error, TaskHeartbeatEntry +from pydantic import ValidationError from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.heartbeat import TaskHeartbeat +from ..onefuzzlib.tasks.main import Task def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None: @@ -17,8 +20,17 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None: logging.info("heartbeat: %s", body) raw = json.loads(body) - - TaskHeartbeat.try_add(raw) + try: + entry = TaskHeartbeatEntry.parse_obj(raw) + task = Task.get_by_task_id(entry.task_id) + if isinstance(task, Error): + logging.error(task) + return + if task: + task.heartbeat = datetime.utcnow() + task.save() + except ValidationError: + logging.error("invalid task heartbat: %s", raw) event = get_event() if event: diff --git a/src/api-service/__app__/tasks/__init__.py b/src/api-service/__app__/tasks/__init__.py index ec9bc9287..fc4a90eb0 100644 --- a/src/api-service/__app__/tasks/__init__.py +++ b/src/api-service/__app__/tasks/__init__.py @@ -10,7 +10,6 @@ from onefuzztypes.models import Error, TaskConfig from onefuzztypes.requests import TaskGet, TaskSearch from onefuzztypes.responses import BoolResult -from ..onefuzzlib.heartbeat import TaskHeartbeat from ..onefuzzlib.jobs import Job from ..onefuzzlib.pools import NodeTasks from ..onefuzzlib.request import not_ok, ok, parse_request @@ -72,7 +71,6 @@ def get(req: func.HttpRequest) -> func.HttpResponse: task = Task.get_by_task_id(request.task_id) if isinstance(task, Error): return not_ok(task, context=request.task_id) - task.heartbeats = TaskHeartbeat.get_heartbeats(task.task_id) task.nodes = NodeTasks.get_node_assignments(request.task_id) task.events = TaskEvent.get_summary(request.task_id) return ok(task) diff --git a/src/pytypes/onefuzztypes/models.py b/src/pytypes/onefuzztypes/models.py index b51051197..5475baa6f 100644 --- a/src/pytypes/onefuzztypes/models.py +++ b/src/pytypes/onefuzztypes/models.py @@ -429,42 +429,18 @@ class TaskHeartbeatEntry(BaseModel): data: List[Dict[str, HeartbeatType]] -class TaskHeartbeatSummary(BaseModel): - machine_id: UUID - timestamp: Optional[datetime] - type: HeartbeatType - - -class TaskHeartbeat(BaseModel): - task_id: UUID - heartbeat_id: str - machine_id: UUID - heartbeat_type: HeartbeatType - - class NodeHeartbeatEntry(BaseModel): node_id: UUID data: List[Dict[str, HeartbeatType]] -class NodeHeartbeatSummary(BaseModel): - timestamp: Optional[datetime] - type: HeartbeatType - - -class NodeHeartbeat(BaseModel): - heartbeat_id: str - node_id: UUID - heartbeat_type: HeartbeatType - - class Node(BaseModel): pool_name: PoolName machine_id: UUID state: NodeState = Field(default=NodeState.init) scaleset_id: Optional[UUID] = None tasks: Optional[List[Tuple[UUID, NodeTaskState]]] = None - heartbeats: Optional[List[NodeHeartbeatSummary]] + heartbeat: Optional[datetime] version: str = Field(default="1.0.0") reimage_requested: bool = Field(default=False) delete_requested: bool = Field(default=False) @@ -712,7 +688,7 @@ class Task(BaseModel): config: TaskConfig error: Optional[Error] auth: Optional[Authentication] - heartbeats: Optional[List[TaskHeartbeatSummary]] + heartbeat: Optional[datetime] end_time: Optional[datetime] events: Optional[List[TaskEventSummary]] nodes: Optional[List[NodeAssignment]]