Integration tests reliability fixes (#1505)

* only reimage nodes that are in the done state

* ignore done message when the node is deleted

* log warning instead of error when receiving a heartbeat from a deleted node
This commit is contained in:
Cheick Keita
2021-12-03 10:08:30 -08:00
committed by GitHub
parent d8fd5d5ce5
commit 08691c007f
4 changed files with 13 additions and 4 deletions

View File

@ -46,6 +46,12 @@ def on_state_update(
state = state_update.state state = state_update.state
node = get_node(machine_id) node = get_node(machine_id)
if isinstance(node, Error): if isinstance(node, Error):
if state == NodeState.done:
logging.warning(
"unable to process state update event. machine_id:"
f"{machine_id} state event:{state_update} error:{node}"
)
return None
return node return node
if state == NodeState.free: if state == NodeState.free:

View File

@ -151,7 +151,7 @@ class Node(BASE_NODE, ORMMixin):
# are made concurrently. By performing this check regularly, any nodes # are made concurrently. By performing this check regularly, any nodes
# that hit this race condition will get cleaned up. # that hit this race condition will get cleaned up.
for node in cls.search_states(states=[NodeState.busy]): for node in cls.search_states(states=[NodeState.busy]):
node.stop_if_complete() node.stop_if_complete(True)
@classmethod @classmethod
def get_by_machine_id(cls, machine_id: UUID) -> Optional["Node"]: def get_by_machine_id(cls, machine_id: UUID) -> Optional["Node"]:
@ -209,7 +209,7 @@ class Node(BASE_NODE, ORMMixin):
node.machine_id, node.machine_id,
) )
def stop_if_complete(self) -> bool: def stop_if_complete(self, done: bool = False) -> bool:
# returns True on stopping the node and False if this doesn't stop the node # returns True on stopping the node and False if this doesn't stop the node
from ..tasks.main import Task from ..tasks.main import Task
@ -228,7 +228,7 @@ class Node(BASE_NODE, ORMMixin):
"node: stopping busy node with all tasks complete: %s", "node: stopping busy node with all tasks complete: %s",
self.machine_id, self.machine_id,
) )
self.stop(done=True) self.stop(done=done)
return True return True
def mark_tasks_stopped_early(self, error: Optional[Error] = None) -> None: def mark_tasks_stopped_early(self, error: Optional[Error] = None) -> None:

View File

@ -607,6 +607,9 @@ class Scaleset(BASE_SCALESET, ORMMixin):
machine_ids = set() machine_ids = set()
for node in nodes: for node in nodes:
if node.state != NodeState.done:
continue
if node.debug_keep_node: if node.debug_keep_node:
logging.warning( logging.warning(
SCALESET_LOG_PREFIX + "not reimaging manually overridden node. " SCALESET_LOG_PREFIX + "not reimaging manually overridden node. "

View File

@ -24,7 +24,7 @@ def main(msg: func.QueueMessage) -> None:
entry = NodeHeartbeatEntry.parse_obj(raw) entry = NodeHeartbeatEntry.parse_obj(raw)
node = Node.get_by_machine_id(entry.node_id) node = Node.get_by_machine_id(entry.node_id)
if not node: if not node:
logging.error("invalid node id: %s", entry.node_id) logging.warning("invalid node id: %s", entry.node_id)
return return
node.heartbeat = datetime.datetime.utcnow() node.heartbeat = datetime.datetime.utcnow()
node.save() node.save()