Store the heartbeat data in the task and node tables (#164)

This commit is contained in:
Cheick Keita
2020-10-20 11:24:00 -07:00
committed by GitHub
parent ecd322bfdc
commit 178537df05
6 changed files with 31 additions and 134 deletions

View File

@ -9,7 +9,6 @@ from onefuzztypes.models import Error
from onefuzztypes.requests import NodeGet, NodeSearch from onefuzztypes.requests import NodeGet, NodeSearch
from onefuzztypes.responses import BoolResult from onefuzztypes.responses import BoolResult
from ..onefuzzlib.heartbeat import NodeHeartbeat
from ..onefuzzlib.pools import Node, NodeTasks from ..onefuzzlib.pools import Node, NodeTasks
from ..onefuzzlib.request import not_ok, ok, parse_request from ..onefuzzlib.request import not_ok, ok, parse_request
@ -32,7 +31,6 @@ def get(req: func.HttpRequest) -> func.HttpResponse:
node_tasks = NodeTasks.get_by_machine_id(request.machine_id) node_tasks = NodeTasks.get_by_machine_id(request.machine_id)
node.tasks = [(t.task_id, t.state) for t in node_tasks] node.tasks = [(t.task_id, t.state) for t in node_tasks]
node.heartbeats = NodeHeartbeat.get_heartbeats(request.machine_id)
return ok(node) return ok(node)

View File

@ -1,97 +0,0 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from typing import Dict, List, Tuple
from uuid import UUID
from onefuzztypes.models import NodeHeartbeat as BASE_NODE_HEARTBEAT
from onefuzztypes.models import NodeHeartbeatEntry, NodeHeartbeatSummary
from onefuzztypes.models import TaskHeartbeat as BASE_TASK_HEARTBEAT
from onefuzztypes.models import TaskHeartbeatEntry, TaskHeartbeatSummary
from pydantic import ValidationError
from .orm import ORMMixin
class TaskHeartbeat(BASE_TASK_HEARTBEAT, ORMMixin):
@classmethod
def add(cls, entry: TaskHeartbeatEntry) -> None:
for value in entry.data:
heartbeat_id = "-".join([str(entry.machine_id), value["type"].name])
heartbeat = cls(
task_id=entry.task_id,
heartbeat_id=heartbeat_id,
machine_id=entry.machine_id,
heartbeat_type=value["type"],
)
heartbeat.save()
@classmethod
def try_add(cls, raw: Dict) -> bool:
try:
entry = TaskHeartbeatEntry.parse_obj(raw)
cls.add(entry)
return True
except ValidationError:
return False
@classmethod
def get_heartbeats(cls, task_id: UUID) -> List[TaskHeartbeatSummary]:
entries = cls.search(query={"task_id": [task_id]})
result = []
for entry in entries:
result.append(
TaskHeartbeatSummary(
timestamp=entry.Timestamp,
machine_id=entry.machine_id,
type=entry.heartbeat_type,
)
)
return result
@classmethod
def key_fields(cls) -> Tuple[str, str]:
return ("task_id", "heartbeat_id")
class NodeHeartbeat(BASE_NODE_HEARTBEAT, ORMMixin):
@classmethod
def add(cls, entry: NodeHeartbeatEntry) -> None:
for value in entry.data:
heartbeat_id = "-".join([str(entry.node_id), value["type"].name])
heartbeat = cls(
heartbeat_id=heartbeat_id,
node_id=entry.node_id,
heartbeat_type=value["type"],
)
heartbeat.save()
@classmethod
def try_add(cls, raw: Dict) -> bool:
try:
entry = NodeHeartbeatEntry.parse_obj(raw)
cls.add(entry)
return True
except ValidationError:
return False
@classmethod
def get_heartbeats(cls, node_id: UUID) -> List[NodeHeartbeatSummary]:
entries = cls.search(query={"node_id": [node_id]})
result = []
for entry in entries:
result.append(
NodeHeartbeatSummary(
timestamp=entry.Timestamp,
type=entry.heartbeat_type,
)
)
return result
@classmethod
def key_fields(cls) -> Tuple[str, str]:
return ("node_id", "heartbeat_id")

View File

@ -3,22 +3,32 @@
# Copyright (c) Microsoft Corporation. # Copyright (c) Microsoft Corporation.
# Licensed under the MIT License. # Licensed under the MIT License.
import datetime
import json import json
import logging import logging
import azure.functions as func import azure.functions as func
from onefuzztypes.models import NodeHeartbeatEntry
from pydantic import ValidationError
from ..onefuzzlib.dashboard import get_event from ..onefuzzlib.dashboard import get_event
from ..onefuzzlib.heartbeat import NodeHeartbeat from ..onefuzzlib.pools import Node
def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None: def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
body = msg.get_body() body = msg.get_body()
logging.info("heartbeat: %s", body) logging.info("heartbeat: %s", body)
raw = json.loads(body) raw = json.loads(body)
try:
NodeHeartbeat.try_add(raw) entry = NodeHeartbeatEntry.parse_obj(raw)
node = Node.get_by_machine_id(entry.node_id)
if not node:
logging.error("invalid node id: %s", entry.node_id)
return
node.heartbeat = datetime.datetime.utcnow()
node.save()
except ValidationError:
logging.error("invalid node heartbeat: %s", raw)
event = get_event() event = get_event()
if event: if event:

View File

@ -5,11 +5,14 @@
import json import json
import logging import logging
from datetime import datetime
import azure.functions as func import azure.functions as func
from onefuzztypes.models import Error, TaskHeartbeatEntry
from pydantic import ValidationError
from ..onefuzzlib.dashboard import get_event from ..onefuzzlib.dashboard import get_event
from ..onefuzzlib.heartbeat import TaskHeartbeat from ..onefuzzlib.tasks.main import Task
def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None: def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
@ -17,8 +20,17 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
logging.info("heartbeat: %s", body) logging.info("heartbeat: %s", body)
raw = json.loads(body) raw = json.loads(body)
try:
TaskHeartbeat.try_add(raw) entry = TaskHeartbeatEntry.parse_obj(raw)
task = Task.get_by_task_id(entry.task_id)
if isinstance(task, Error):
logging.error(task)
return
if task:
task.heartbeat = datetime.utcnow()
task.save()
except ValidationError:
logging.error("invalid task heartbat: %s", raw)
event = get_event() event = get_event()
if event: if event:

View File

@ -10,7 +10,6 @@ from onefuzztypes.models import Error, TaskConfig
from onefuzztypes.requests import TaskGet, TaskSearch from onefuzztypes.requests import TaskGet, TaskSearch
from onefuzztypes.responses import BoolResult from onefuzztypes.responses import BoolResult
from ..onefuzzlib.heartbeat import TaskHeartbeat
from ..onefuzzlib.jobs import Job from ..onefuzzlib.jobs import Job
from ..onefuzzlib.pools import NodeTasks from ..onefuzzlib.pools import NodeTasks
from ..onefuzzlib.request import not_ok, ok, parse_request from ..onefuzzlib.request import not_ok, ok, parse_request
@ -72,7 +71,6 @@ def get(req: func.HttpRequest) -> func.HttpResponse:
task = Task.get_by_task_id(request.task_id) task = Task.get_by_task_id(request.task_id)
if isinstance(task, Error): if isinstance(task, Error):
return not_ok(task, context=request.task_id) return not_ok(task, context=request.task_id)
task.heartbeats = TaskHeartbeat.get_heartbeats(task.task_id)
task.nodes = NodeTasks.get_node_assignments(request.task_id) task.nodes = NodeTasks.get_node_assignments(request.task_id)
task.events = TaskEvent.get_summary(request.task_id) task.events = TaskEvent.get_summary(request.task_id)
return ok(task) return ok(task)

View File

@ -429,42 +429,18 @@ class TaskHeartbeatEntry(BaseModel):
data: List[Dict[str, HeartbeatType]] data: List[Dict[str, HeartbeatType]]
class TaskHeartbeatSummary(BaseModel):
machine_id: UUID
timestamp: Optional[datetime]
type: HeartbeatType
class TaskHeartbeat(BaseModel):
task_id: UUID
heartbeat_id: str
machine_id: UUID
heartbeat_type: HeartbeatType
class NodeHeartbeatEntry(BaseModel): class NodeHeartbeatEntry(BaseModel):
node_id: UUID node_id: UUID
data: List[Dict[str, HeartbeatType]] data: List[Dict[str, HeartbeatType]]
class NodeHeartbeatSummary(BaseModel):
timestamp: Optional[datetime]
type: HeartbeatType
class NodeHeartbeat(BaseModel):
heartbeat_id: str
node_id: UUID
heartbeat_type: HeartbeatType
class Node(BaseModel): class Node(BaseModel):
pool_name: PoolName pool_name: PoolName
machine_id: UUID machine_id: UUID
state: NodeState = Field(default=NodeState.init) state: NodeState = Field(default=NodeState.init)
scaleset_id: Optional[UUID] = None scaleset_id: Optional[UUID] = None
tasks: Optional[List[Tuple[UUID, NodeTaskState]]] = None tasks: Optional[List[Tuple[UUID, NodeTaskState]]] = None
heartbeats: Optional[List[NodeHeartbeatSummary]] heartbeat: Optional[datetime]
version: str = Field(default="1.0.0") version: str = Field(default="1.0.0")
reimage_requested: bool = Field(default=False) reimage_requested: bool = Field(default=False)
delete_requested: bool = Field(default=False) delete_requested: bool = Field(default=False)
@ -712,7 +688,7 @@ class Task(BaseModel):
config: TaskConfig config: TaskConfig
error: Optional[Error] error: Optional[Error]
auth: Optional[Authentication] auth: Optional[Authentication]
heartbeats: Optional[List[TaskHeartbeatSummary]] heartbeat: Optional[datetime]
end_time: Optional[datetime] end_time: Optional[datetime]
events: Optional[List[TaskEventSummary]] events: Optional[List[TaskEventSummary]]
nodes: Optional[List[NodeAssignment]] nodes: Optional[List[NodeAssignment]]