* bug fix

* rename anyNotStoppedJobs  to anyNotStoppedTasks

Co-authored-by: stas <statis@microsoft.com>
This commit is contained in:
Stas 2022-09-14 10:07:15 -07:00 committed by GitHub
parent ca7b6be43b
commit 2fe73ab79c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 48 additions and 24 deletions

View File

@ -200,7 +200,10 @@ public class AgentEvents {
MachineId: machineId,
TaskId: running.TaskId,
State: NodeTaskState.Running);
await _context.NodeTasksOperations.Replace(nodeTask);
var r = await _context.NodeTasksOperations.Replace(nodeTask);
if (!r.IsOk) {
_log.Error($"failed to replace node task {nodeTask.TaskId} due to {r.ErrorV}");
}
if (task.State.ShuttingDown()) {
_log.Info($"ignoring task start from node. machine_id:{machineId} job_id:{task.JobId} task_id:{task.TaskId} (state: {task.State})");
@ -214,7 +217,10 @@ public class AgentEvents {
TaskId: task.TaskId,
MachineId: machineId,
EventData: new WorkerEvent(Running: running));
await _context.TaskEventOperations.Replace(taskEvent);
r = await _context.TaskEventOperations.Replace(taskEvent);
if (!r.IsOk) {
_log.Error($"failed to replace taskEvent for task with id {taskEvent.TaskId} due to {r.ErrorV}");
}
return null;
}

View File

@ -7,10 +7,12 @@ namespace Microsoft.OneFuzz.Service.Functions;
public class Jobs {
private readonly IOnefuzzContext _context;
private readonly IEndpointAuthorization _auth;
private readonly ILogTracer _logTracer;
public Jobs(IEndpointAuthorization auth, IOnefuzzContext context) {
public Jobs(IEndpointAuthorization auth, IOnefuzzContext context, ILogTracer logTracer) {
_context = context;
_auth = auth;
_logTracer = logTracer;
}
[Function("Jobs")]
@ -89,7 +91,10 @@ public class Jobs {
if (job.State != JobState.Stopped && job.State != JobState.Stopping) {
job = job with { State = JobState.Stopping };
await _context.JobOperations.Replace(job);
var r = await _context.JobOperations.Replace(job);
if (!r.IsOk) {
_logTracer.Error($"Failed to replace job {job.JobId} due to {r.ErrorV}");
}
}
return await RequestHandling.Ok(req, JobResponse.ForJob(job));

View File

@ -47,9 +47,16 @@ public class JobOperations : StatefulOrm<Job, JobState, JobOperations>, IJobOper
}
public async Async.Task StopIfAllDone(Job job) {
var anyNotStoppedJobs = await _context.TaskOperations.GetByJobId(job.JobId).AnyAsync(task => task.State != TaskState.Stopped);
if (anyNotStoppedJobs) {
var jobs = _context.TaskOperations.GetByJobId(job.JobId);
if (!await jobs.AnyAsync()) {
_logTracer.Warning($"StopIfAllDone could not find any tasks for job with id {job.JobId}");
}
var anyNotStoppedTasks = await jobs.AnyAsync(task => task.State != TaskState.Stopped);
if (anyNotStoppedTasks) {
return;
}

View File

@ -53,11 +53,11 @@ public class PoolOperations : StatefulOrm<Pool, PoolState, PoolOperations>, IPoo
var result = await pools.ToListAsync();
if (result.Count == 0) {
return OneFuzzResult<Pool>.Error(ErrorCode.INVALID_REQUEST, "unable to find pool");
return OneFuzzResult<Pool>.Error(ErrorCode.INVALID_REQUEST, $"unable to find pool with name {poolName.String}");
}
if (result.Count != 1) {
return OneFuzzResult<Pool>.Error(ErrorCode.INVALID_REQUEST, "error identifying pool");
return OneFuzzResult<Pool>.Error(ErrorCode.INVALID_REQUEST, $"error identifying pool with name {poolName.String}");
}
return OneFuzzResult<Pool>.Ok(result.Single());
@ -68,11 +68,11 @@ public class PoolOperations : StatefulOrm<Pool, PoolState, PoolOperations>, IPoo
var result = await pools.ToListAsync();
if (result.Count == 0) {
return OneFuzzResult<Pool>.Error(ErrorCode.INVALID_REQUEST, "unable to find pool");
return OneFuzzResult<Pool>.Error(ErrorCode.INVALID_REQUEST, $"unable to find pool with id {poolId}");
}
if (result.Count != 1) {
return OneFuzzResult<Pool>.Error(ErrorCode.INVALID_REQUEST, "error identifying pool");
return OneFuzzResult<Pool>.Error(ErrorCode.INVALID_REQUEST, $"error identifying pool with id {poolId}");
}
return OneFuzzResult<Pool>.Ok(result.Single());

View File

@ -55,6 +55,7 @@ public class TaskOperations : StatefulOrm<Task, TaskState, TaskOperations>, ITas
}
public IAsyncEnumerable<Task> GetByJobId(Guid jobId) {
return QueryAsync(filter: $"PartitionKey eq '{jobId}'");
}
@ -124,7 +125,6 @@ public class TaskOperations : StatefulOrm<Task, TaskState, TaskOperations>, ITas
_logTracer.Error($"task failed {task.JobId}:{task.TaskId} - {error}");
task = await SetState(task with { Error = error }, TaskState.Stopping);
//self.set_state(TaskState.stopping)
await MarkDependantsFailed(task, taskInJob);
}
@ -151,7 +151,11 @@ public class TaskOperations : StatefulOrm<Task, TaskState, TaskOperations>, ITas
task = task with { State = state };
}
await this.Replace(task);
var r = await Replace(task);
if (!r.IsOk) {
_logTracer.Error($"Failed to replace task with jobid: {task.JobId} and taskid: {task.TaskId} due to {r.ErrorV}");
}
var _events = _context.Events;
if (task.State == TaskState.Stopped) {
if (task.Error != null) {
@ -221,9 +225,7 @@ public class TaskOperations : StatefulOrm<Task, TaskState, TaskOperations>, ITas
if (job != null) {
await jobOperations.OnStart(job);
}
}
return task;
}
@ -280,7 +282,9 @@ public class TaskOperations : StatefulOrm<Task, TaskState, TaskOperations>, ITas
if (task.Config.Pool is TaskPool p) {
var pool = await _context.PoolOperations.GetByName(p.PoolName);
if (!pool.IsOk) {
_logTracer.Info($"unable to schedule task to pool: {task.TaskId} - {pool.ErrorV}");
_logTracer.Info(
$"unable to schedule task to pool [{task.Config.Pool.PoolName}]: {task.TaskId} - {pool.ErrorV}"
);
return null;
}
@ -292,7 +296,9 @@ public class TaskOperations : StatefulOrm<Task, TaskState, TaskOperations>, ITas
await foreach (var scaleset in scalesets) {
var pool = await _context.PoolOperations.GetByName(scaleset.PoolName);
if (!pool.IsOk) {
_logTracer.Info($"unable to schedule task to pool: {task.TaskId} - {pool.ErrorV}");
_logTracer.Info(
$"unable to schedule task to pool [{scaleset.PoolName}]: {task.TaskId} - {pool.ErrorV}"
);
return null;
}

View File

@ -35,7 +35,7 @@ public abstract class JobsTestBase : FunctionTestBase {
[InlineData("DELETE")]
public async Async.Task Access_WithoutAuthorization_IsRejected(string method) {
var auth = new TestEndpointAuthorization(RequestType.NoAuthorization, Logger, Context);
var func = new Jobs(auth, Context);
var func = new Jobs(auth, Context, Logger);
var result = await func.Run(TestHttpRequestData.Empty(method));
Assert.Equal(HttpStatusCode.Unauthorized, result.StatusCode);
@ -47,7 +47,7 @@ public abstract class JobsTestBase : FunctionTestBase {
[Fact]
public async Async.Task Delete_NonExistentJob_Fails() {
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var func = new Jobs(auth, Context, Logger);
var result = await func.Run(TestHttpRequestData.FromJson("DELETE", new JobGet(_jobId)));
Assert.Equal(HttpStatusCode.BadRequest, result.StatusCode);
@ -62,7 +62,7 @@ public abstract class JobsTestBase : FunctionTestBase {
new Job(_jobId, JobState.Enabled, _config));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var func = new Jobs(auth, Context, Logger);
var result = await func.Run(TestHttpRequestData.FromJson("DELETE", new JobGet(_jobId)));
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
@ -81,7 +81,7 @@ public abstract class JobsTestBase : FunctionTestBase {
new Job(_jobId, JobState.Stopped, _config));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var func = new Jobs(auth, Context, Logger);
var result = await func.Run(TestHttpRequestData.FromJson("DELETE", new JobGet(_jobId)));
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
@ -101,7 +101,7 @@ public abstract class JobsTestBase : FunctionTestBase {
new Job(_jobId, JobState.Stopped, _config));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var func = new Jobs(auth, Context, Logger);
var result = await func.Run(TestHttpRequestData.FromJson("GET", new JobSearch(JobId: _jobId)));
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
@ -120,7 +120,7 @@ public abstract class JobsTestBase : FunctionTestBase {
new Job(Guid.NewGuid(), JobState.Stopped, _config));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var func = new Jobs(auth, Context, Logger);
var req = new JobSearch(State: new List<JobState> { JobState.Enabled });
var result = await func.Run(TestHttpRequestData.FromJson("GET", req));
@ -139,7 +139,7 @@ public abstract class JobsTestBase : FunctionTestBase {
new Job(Guid.NewGuid(), JobState.Stopped, _config));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var func = new Jobs(auth, Context, Logger);
var req = new JobSearch(State: new List<JobState> { JobState.Enabled, JobState.Stopping });
var result = await func.Run(TestHttpRequestData.FromJson("GET", req));
@ -154,7 +154,7 @@ public abstract class JobsTestBase : FunctionTestBase {
[Fact]
public async Async.Task Post_CreatesJob_AndContainer() {
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var func = new Jobs(auth, Context, Logger);
// need user credentials to put into the job object
var userInfo = new UserInfo(Guid.NewGuid(), Guid.NewGuid(), "upn");