send EventTaskFailed and EventTaskStopped once the task is stopped (#651)

As is, these events are sent once the task enters the state `stopping`.
However, the tasks can still be running on the VMs which can be
confusing.
This commit is contained in:
bmc-msft
2021-03-11 20:48:28 -05:00
committed by GitHub
parent 07f7f6fa48
commit 6888fc8fb8

View File

@ -132,7 +132,7 @@ class Task(BASE_TASK, ORMMixin):
ProxyForward.remove_forward(self.task_id)
delete_queue(str(self.task_id), StorageType.corpus)
Node.stop_task(self.task_id)
self.set_state(TaskState.stopped, send=False)
self.set_state(TaskState.stopped)
job = Job.get(self.job_id)
if job:
@ -188,43 +188,25 @@ class Task(BASE_TASK, ORMMixin):
return pool_tasks
def mark_stopping(self) -> None:
if self.state in [TaskState.stopped, TaskState.stopping]:
if self.state in TaskState.shutting_down():
logging.debug(
"ignoring post-task stop calls to stop %s:%s", self.job_id, self.task_id
)
return
self.set_state(TaskState.stopping, send=False)
send_event(
EventTaskStopped(
job_id=self.job_id,
task_id=self.task_id,
user_info=self.user_info,
config=self.config,
)
)
self.set_state(TaskState.stopping)
def mark_failed(
self, error: Error, tasks_in_job: Optional[List["Task"]] = None
) -> None:
if self.state in [TaskState.stopped, TaskState.stopping]:
if self.state in TaskState.shutting_down():
logging.debug(
"ignoring post-task stop failures for %s:%s", self.job_id, self.task_id
)
return
self.error = error
self.set_state(TaskState.stopping, send=False)
send_event(
EventTaskFailed(
job_id=self.job_id,
task_id=self.task_id,
error=error,
user_info=self.user_info,
config=self.config,
)
)
self.set_state(TaskState.stopping)
self.mark_dependants_failed(tasks_in_job=tasks_in_job)
@ -320,7 +302,7 @@ class Task(BASE_TASK, ORMMixin):
def key_fields(cls) -> Tuple[str, str]:
return ("job_id", "task_id")
def set_state(self, state: TaskState, send: bool = True) -> None:
def set_state(self, state: TaskState) -> None:
if self.state == state:
return
@ -330,6 +312,27 @@ class Task(BASE_TASK, ORMMixin):
self.save()
if self.state == TaskState.stopped:
if self.error:
send_event(
EventTaskFailed(
job_id=self.job_id,
task_id=self.task_id,
error=self.error,
user_info=self.user_info,
config=self.config,
)
)
else:
send_event(
EventTaskStopped(
job_id=self.job_id,
task_id=self.task_id,
user_info=self.user_info,
config=self.config,
)
)
else:
send_event(
EventTaskStateUpdated(
job_id=self.job_id,