diff --git a/src/api-service/__app__/onefuzzlib/jobs.py b/src/api-service/__app__/onefuzzlib/jobs.py index 04a8f9b20..db09dbad0 100644 --- a/src/api-service/__app__/onefuzzlib/jobs.py +++ b/src/api-service/__app__/onefuzzlib/jobs.py @@ -7,14 +7,18 @@ import logging from datetime import datetime, timedelta from typing import List, Optional, Tuple -from onefuzztypes.enums import JobState, TaskState +from onefuzztypes.enums import ErrorCode, JobState, TaskState from onefuzztypes.events import EventJobCreated, EventJobStopped +from onefuzztypes.models import Error from onefuzztypes.models import Job as BASE_JOB from .events import send_event from .orm import MappingIntStrAny, ORMMixin, QueryFilter from .tasks.main import Task +JOB_LOG_PREFIX = "jobs: " +JOB_NEVER_STARTED_DURATION: timedelta = timedelta(days=30) + class Job(BASE_JOB, ORMMixin): @classmethod @@ -36,6 +40,36 @@ class Job(BASE_JOB, ORMMixin): query={"state": JobState.available()}, raw_unchecked_filter=time_filter ) + @classmethod + def stop_never_started_jobs(cls) -> None: + # Note, the "not(end_time...)" with end_time set long before the use of + # OneFuzz enables identifying those without end_time being set. + last_timestamp = (datetime.utcnow() - JOB_NEVER_STARTED_DURATION).isoformat() + + time_filter = ( + f"Timestamp lt datetime'{last_timestamp}' and " + "not(end_time ge datetime'2000-01-11T00:00:00.0Z')" + ) + + for job in cls.search( + query={ + "state": [JobState.enabled], + }, + raw_unchecked_filter=time_filter, + ): + for task in Task.search(query={"job_id": [job.job_id]}): + task.mark_failed( + Error( + code=ErrorCode.TASK_FAILED, + errors=["job never not start"], + ) + ) + + logging.info( + JOB_LOG_PREFIX + "stopping job that never started: %s", job.job_id + ) + job.stopping() + def save_exclude(self) -> Optional[MappingIntStrAny]: return {"task_info": ...} @@ -47,13 +81,13 @@ class Job(BASE_JOB, ORMMixin): } def init(self) -> None: - logging.info("init job: %s", self.job_id) + logging.info(JOB_LOG_PREFIX + "init: %s", self.job_id) self.state = JobState.enabled self.save() def stopping(self) -> None: self.state = JobState.stopping - logging.info("stopping job: %s", self.job_id) + logging.info(JOB_LOG_PREFIX + "stopping: %s", self.job_id) not_stopped = [ task for task in Task.search(query={"job_id": [self.job_id]}) diff --git a/src/api-service/__app__/timer_tasks/__init__.py b/src/api-service/__app__/timer_tasks/__init__.py index 97a2f4fd1..92452a2d1 100644 --- a/src/api-service/__app__/timer_tasks/__init__.py +++ b/src/api-service/__app__/timer_tasks/__init__.py @@ -38,6 +38,8 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: schedule_tasks() + Job.stop_never_started_jobs() + events = get_events() if events: dashboard.set(events)