breaking the circular dependency between JobOperations and TaskOperations (#1848)

This commit is contained in:
Cheick Keita
2022-04-26 12:07:57 -07:00
committed by GitHub
parent 5f8e381fcb
commit af7d815e4f
5 changed files with 11 additions and 13 deletions

View File

@ -78,6 +78,8 @@ public class Program {
.AddScoped<IVmOperations, VmOperations>() .AddScoped<IVmOperations, VmOperations>()
.AddScoped<ISecretsOperations, SecretsOperations>() .AddScoped<ISecretsOperations, SecretsOperations>()
.AddScoped<IJobOperations, JobOperations>() .AddScoped<IJobOperations, JobOperations>()
.AddScoped<IScheduler, Scheduler>()
.AddScoped<IConfig, Config>()
//Move out expensive resources into separate class, and add those as Singleton //Move out expensive resources into separate class, and add those as Singleton
// ArmClient, Table Client(s), Queue Client(s), HttpClient, etc. // ArmClient, Table Client(s), Queue Client(s), HttpClient, etc.

View File

@ -34,7 +34,7 @@ public class TimerTasks {
await foreach (var job in expiredJobs) { await foreach (var job in expiredJobs) {
_logger.Info($"stopping expired job. job_id:{job.JobId }"); _logger.Info($"stopping expired job. job_id:{job.JobId }");
await _jobOperations.Stopping(job); await _jobOperations.Stopping(job, _taskOperations);
} }
var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork); var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork);

View File

@ -6,17 +6,15 @@ public interface IJobOperations : IStatefulOrm<Job, JobState> {
System.Threading.Tasks.Task<Job?> Get(Guid jobId); System.Threading.Tasks.Task<Job?> Get(Guid jobId);
System.Threading.Tasks.Task OnStart(Job job); System.Threading.Tasks.Task OnStart(Job job);
IAsyncEnumerable<Job> SearchExpired(); IAsyncEnumerable<Job> SearchExpired();
System.Threading.Tasks.Task Stopping(Job job); System.Threading.Tasks.Task Stopping(Job job, ITaskOperations taskOperations);
IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states); IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states);
System.Threading.Tasks.Task StopNeverStartedJobs(); System.Threading.Tasks.Task StopNeverStartedJobs();
} }
public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations { public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
private readonly ITaskOperations _taskOperations;
private readonly IEvents _events; private readonly IEvents _events;
public JobOperations(IStorage storage, ILogTracer logTracer, IServiceConfig config, ITaskOperations taskOperations, IEvents events) : base(storage, logTracer, config) { public JobOperations(IStorage storage, ILogTracer logTracer, IServiceConfig config, IEvents events) : base(storage, logTracer, config) {
_taskOperations = taskOperations;
_events = events; _events = events;
} }
@ -31,7 +29,7 @@ public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
} }
public IAsyncEnumerable<Job> SearchExpired() { public IAsyncEnumerable<Job> SearchExpired() {
return QueryAsync(filter: $"end_time lt datetime'{DateTimeOffset.UtcNow}'"); return QueryAsync(filter: $"end_time lt datetime'{DateTimeOffset.UtcNow.ToString("o")}'");
} }
public IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states) { public IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states) {
@ -46,9 +44,9 @@ public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
throw new NotImplementedException(); throw new NotImplementedException();
} }
public async System.Threading.Tasks.Task Stopping(Job job) { public async System.Threading.Tasks.Task Stopping(Job job, ITaskOperations taskOperations) {
job = job with { State = JobState.Stopping }; job = job with { State = JobState.Stopping };
var tasks = await _taskOperations.QueryAsync(filter: $"job_id eq '{job.JobId}'").ToListAsync(); var tasks = await taskOperations.QueryAsync(filter: $"job_id eq '{job.JobId}'").ToListAsync();
var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped); var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped);
var notStopped = taskNotStopped[true]; var notStopped = taskNotStopped[true];
@ -56,7 +54,7 @@ public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
if (notStopped.Any()) { if (notStopped.Any()) {
foreach (var task in notStopped) { foreach (var task in notStopped) {
await _taskOperations.MarkStopping(task); await taskOperations.MarkStopping(task);
} }
} else { } else {
job = job with { State = JobState.Stopped }; job = job with { State = JobState.Stopped };

View File

@ -9,12 +9,10 @@ public interface IPoolOperations {
public class PoolOperations : StatefulOrm<Pool, PoolState>, IPoolOperations { public class PoolOperations : StatefulOrm<Pool, PoolState>, IPoolOperations {
private IConfigOperations _configOperations; private IConfigOperations _configOperations;
private ITaskOperations _taskOperations;
public PoolOperations(IStorage storage, ILogTracer log, IServiceConfig config, IConfigOperations configOperations, ITaskOperations taskOperations) public PoolOperations(IStorage storage, ILogTracer log, IServiceConfig config, IConfigOperations configOperations)
: base(storage, log, config) { : base(storage, log, config) {
_configOperations = configOperations; _configOperations = configOperations;
_taskOperations = taskOperations;
} }
public async Async.Task<Result<Pool, Error>> GetByName(string poolName) { public async Async.Task<Result<Pool, Error>> GetByName(string poolName) {

View File

@ -67,7 +67,7 @@ public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations {
public IAsyncEnumerable<Task> SearchExpired() { public IAsyncEnumerable<Task> SearchExpired() {
var timeFilter = $"end_time lt datetime'{DateTimeOffset.UtcNow.ToString("o") }'"; var timeFilter = $"end_time lt Datetime'{DateTimeOffset.UtcNow.ToString("o") }'";
return QueryAsync(filter: timeFilter); return QueryAsync(filter: timeFilter);
} }