Bug fixes in the TimerTasks function (#1961)

* Bug fixes in the TimerTasks function

* formatting
This commit is contained in:
Cheick Keita
2022-05-19 11:26:08 -07:00
committed by GitHub
parent b63de9c73d
commit e492886a85
4 changed files with 29 additions and 8 deletions

View File

@ -63,6 +63,7 @@ public abstract record BaseEvent() {
EventNodeStateUpdated _ => EventType.NodeStateUpdated,
EventNodeDeleted _ => EventType.NodeDeleted,
EventNodeCreated _ => EventType.NodeCreated,
EventJobStopped _ => EventType.JobStopped,
_ => throw new NotImplementedException(),
};
@ -90,6 +91,7 @@ public abstract record BaseEvent() {
EventType.ScalesetStateUpdated => typeof(EventScalesetStateUpdated),
EventType.NodeDeleted => typeof(EventNodeDeleted),
EventType.NodeCreated => typeof(EventNodeCreated),
EventType.JobStopped => typeof(EventJobStopped),
_ => throw new ArgumentException($"invalid input {eventType}"),
};

View File

@ -580,7 +580,7 @@ public record JobTaskInfo(
);
public record Job(
[PartitionKey] Guid JobId,
[PartitionKey][RowKey] Guid JobId,
JobState State,
JobConfig Config,
string? Error,

View File

@ -34,7 +34,7 @@ public class TimerTasks {
await foreach (var job in expiredJobs) {
_logger.Info($"stopping expired job. job_id:{job.JobId}");
await _jobOperations.Stopping(job, _taskOperations);
await _jobOperations.Stopping(job);
}
var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork);

View File

@ -6,12 +6,13 @@ public interface IJobOperations : IStatefulOrm<Job, JobState> {
Async.Task<Job?> Get(Guid jobId);
Async.Task OnStart(Job job);
IAsyncEnumerable<Job> SearchExpired();
Async.Task Stopping(Job job, ITaskOperations taskOperations);
Async.Task Stopping(Job job);
IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states);
Async.Task StopNeverStartedJobs();
}
public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
private static TimeSpan JOB_NEVER_STARTED_DURATION = TimeSpan.FromDays(30);
public JobOperations(ILogTracer logTracer, IOnefuzzContext context) : base(logTracer, context) {
}
@ -35,13 +36,31 @@ public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
return QueryAsync(filter: query);
}
public Async.Task StopNeverStartedJobs() {
throw new NotImplementedException();
/// <summary>
/// Stops jobs that are still in the <c>Enabled</c> state, have a table Timestamp older
/// than <c>JOB_NEVER_STARTED_DURATION</c>, and do not match the end_time comparison below.
/// </summary>
/// <remarks>
/// For each matching job, every task returned by the PartitionKey query on the job id is
/// marked failed with <c>ErrorCode.TASK_FAILED</c>; the job is then logged and handed to
/// <c>Stopping</c>.
/// </remarks>
public async Async.Task StopNeverStartedJobs() {
// Note: the "not(end_time ge ...)" clause compares end_time against a date long before
// OneFuzz was in use, which enables identifying jobs that have no end_time set.
// Cutoff instant, formatted with the round-trip ("o") specifier for use in the filter string.
var lastTimeStamp = (DateTimeOffset.UtcNow - JOB_NEVER_STARTED_DURATION).ToString("o");
// Both conditions are combined via Query.And: the Timestamp/end_time clause and the state clause.
var filter = Query.And(new[] {
$"Timestamp lt datetime'{lastTimeStamp}' and not(end_time ge datetime'2000-01-11T00:00:00.0Z')",
Query.EqualAnyEnum("state", new[] {JobState.Enabled})
});
var jobs = this.QueryAsync(filter);
await foreach (var job in jobs) {
// Tasks are looked up by PartitionKey == job id.
// NOTE(review): Stopping() filters tasks with "job_id eq ..." instead — presumably both
// select the same rows; confirm against the Task entity's key mapping.
await foreach (var task in _context.TaskOperations.QueryAsync($"PartitionKey eq '{job.JobId}'")) {
// NOTE(review): the message text "job never not start" looks like a typo, but it is a
// runtime string — confirm nothing matches on it before rewording.
await _context.TaskOperations.MarkFailed(task, new Error(ErrorCode.TASK_FAILED, new[] { "job never not start" }));
}
_logTracer.Info($"stopping job that never started: {job.JobId}");
// Stopping is invoked through the context's JobOperations rather than on this instance.
await _context.JobOperations.Stopping(job);
}
}
public async Async.Task Stopping(Job job, ITaskOperations taskOperations) {
public async Async.Task Stopping(Job job) {
job = job with { State = JobState.Stopping };
var tasks = await taskOperations.QueryAsync(filter: $"job_id eq '{job.JobId}'").ToListAsync();
var tasks = await _context.TaskOperations.QueryAsync(filter: $"job_id eq '{job.JobId}'").ToListAsync();
var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped);
var notStopped = taskNotStopped[true];
@ -49,7 +68,7 @@ public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
if (notStopped.Any()) {
foreach (var task in notStopped) {
await taskOperations.MarkStopping(task);
await _context.TaskOperations.MarkStopping(task);
}
} else {
job = job with { State = JobState.Stopped };