diff --git a/src/api-service/__app__/onefuzzlib/agent_events.py b/src/api-service/__app__/onefuzzlib/agent_events.py index 0f5fdab1b..011fc204c 100644 --- a/src/api-service/__app__/onefuzzlib/agent_events.py +++ b/src/api-service/__app__/onefuzzlib/agent_events.py @@ -46,6 +46,12 @@ def on_state_update( state = state_update.state node = get_node(machine_id) if isinstance(node, Error): + if state == NodeState.done: + logging.warning( + "unable to process state update event. machine_id:" + f"{machine_id} state event:{state_update} error:{node}" + ) + return None return node if state == NodeState.free: diff --git a/src/api-service/__app__/onefuzzlib/workers/nodes.py b/src/api-service/__app__/onefuzzlib/workers/nodes.py index 5a3a76dc5..bc8809fd7 100644 --- a/src/api-service/__app__/onefuzzlib/workers/nodes.py +++ b/src/api-service/__app__/onefuzzlib/workers/nodes.py @@ -151,7 +151,7 @@ class Node(BASE_NODE, ORMMixin): # are made concurrently. By performing this check regularly, any nodes # that hit this race condition will get cleaned up. for node in cls.search_states(states=[NodeState.busy]): - node.stop_if_complete() + node.stop_if_complete(True) @classmethod def get_by_machine_id(cls, machine_id: UUID) -> Optional["Node"]: @@ -209,7 +209,7 @@ class Node(BASE_NODE, ORMMixin): node.machine_id, ) - def stop_if_complete(self) -> bool: + def stop_if_complete(self, done: bool = False) -> bool: # returns True on stopping the node and False if this doesn't stop the node from ..tasks.main import Task @@ -228,7 +228,7 @@ class Node(BASE_NODE, ORMMixin): "node: stopping busy node with all tasks complete: %s", self.machine_id, ) - self.stop(done=True) + self.stop(done=done) return True def mark_tasks_stopped_early(self, error: Optional[Error] = None) -> None: diff --git a/src/api-service/__app__/onefuzzlib/workers/scalesets.py b/src/api-service/__app__/onefuzzlib/workers/scalesets.py index 2e1921bce..31b651a50 100644 --- a/src/api-service/__app__/onefuzzlib/workers/scalesets.py +++ b/src/api-service/__app__/onefuzzlib/workers/scalesets.py @@ -607,6 +607,9 @@ class Scaleset(BASE_SCALESET, ORMMixin): machine_ids = set() for node in nodes: + if node.state != NodeState.done: + continue + if node.debug_keep_node: logging.warning( SCALESET_LOG_PREFIX + "not reimaging manually overridden node. " diff --git a/src/api-service/__app__/queue_node_heartbeat/__init__.py b/src/api-service/__app__/queue_node_heartbeat/__init__.py index 76fe8b2fc..fab7a5eff 100644 --- a/src/api-service/__app__/queue_node_heartbeat/__init__.py +++ b/src/api-service/__app__/queue_node_heartbeat/__init__.py @@ -24,7 +24,7 @@ def main(msg: func.QueueMessage) -> None: entry = NodeHeartbeatEntry.parse_obj(raw) node = Node.get_by_machine_id(entry.node_id) if not node: - logging.error("invalid node id: %s", entry.node_id) + logging.warning("invalid node id: %s", entry.node_id) return node.heartbeat = datetime.datetime.utcnow() node.save()