Implement Jobs function for C# (#2157)

#2156 - convert the `jobs` function from Python to C#
This commit is contained in:
George Pollard
2022-07-25 15:17:46 +12:00
committed by GitHub
parent 23a7dae606
commit bae35dec97
10 changed files with 354 additions and 16 deletions

View File

@ -0,0 +1,122 @@
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
namespace Microsoft.OneFuzz.Service.Functions;
public class Jobs {
private readonly IOnefuzzContext _context;
private readonly IEndpointAuthorization _auth;
public Jobs(IEndpointAuthorization auth, IOnefuzzContext context) {
_context = context;
_auth = auth;
}
[Function("Jobs")]
public Async.Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymous, "GET", "POST", "DELETE")] HttpRequestData req)
=> _auth.CallIfUser(req, r => r.Method switch {
"GET" => Get(r),
"DELETE" => Delete(r),
"POST" => Post(r),
var m => throw new NotSupportedException($"Unsupported HTTP method {m}"),
});
private async Task<HttpResponseData> Post(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<JobConfig>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "jobs create");
}
var userInfo = await _context.UserCredentials.ParseJwtToken(req);
if (!userInfo.IsOk) {
return await _context.RequestHandling.NotOk(req, userInfo.ErrorV, "jobs create");
}
var job = new Job(
JobId: Guid.NewGuid(),
State: JobState.Init,
Config: request.OkV) {
UserInfo = userInfo.OkV,
};
await _context.JobOperations.Insert(job);
// create the job logs container
var metadata = new Dictionary<string, string>{
{ "container_type", "logs" }, // TODO: use ContainerType.Logs enum somehow; needs snake case name
};
var containerName = new Container($"logs-{job.JobId}");
var containerSas = await _context.Containers.CreateContainer(containerName, StorageType.Corpus, metadata);
if (containerSas is null) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.UNABLE_TO_CREATE_CONTAINER,
Errors: new string[] { "unable to create logs container " }),
"logs");
}
// log container must not have the SAS included
var logContainerUri = new UriBuilder(containerSas) { Query = "" }.Uri;
job = job with { Config = job.Config with { Logs = logContainerUri.ToString() } };
await _context.JobOperations.Update(job);
return await RequestHandling.Ok(req, JobResponse.ForJob(job));
}
private async Task<HttpResponseData> Delete(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<JobGet>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "jobs delete");
}
var jobId = request.OkV.JobId;
var job = await _context.JobOperations.Get(jobId);
if (job is null) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.INVALID_JOB,
Errors: new string[] { "no such job" }),
context: jobId.ToString());
}
if (job.State != JobState.Stopped && job.State != JobState.Stopping) {
job = job with { State = JobState.Stopping };
await _context.JobOperations.Replace(job);
}
return await RequestHandling.Ok(req, JobResponse.ForJob(job));
}
private async Task<HttpResponseData> Get(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<JobSearch>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "jobs");
}
var search = request.OkV;
if (search.JobId is Guid jobId) {
var job = await _context.JobOperations.Get(jobId);
if (job is null) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.INVALID_JOB,
Errors: new string[] { "no such job" }),
context: jobId.ToString());
}
static JobTaskInfo TaskToJobTaskInfo(Task t) => new(t.TaskId, t.Config.Task.Type, t.State);
// TODO: search.WithTasks is not checked in Python code?
var taskInfo = await _context.TaskOperations.SearchStates(jobId).Select(TaskToJobTaskInfo).ToListAsync();
job = job with { TaskInfo = taskInfo };
return await RequestHandling.Ok(req, JobResponse.ForJob(job));
}
var jobs = await _context.JobOperations.SearchState(states: search.State ?? Enumerable.Empty<JobState>()).ToListAsync();
return await RequestHandling.Ok(req, jobs.Select(j => JobResponse.ForJob(j)));
}
}

View File

@ -28,6 +28,7 @@ public enum ErrorCode {
UNABLE_TO_UPDATE = 471, UNABLE_TO_UPDATE = 471,
PROXY_FAILED = 472, PROXY_FAILED = 472,
INVALID_CONFIGURATION = 473, INVALID_CONFIGURATION = 473,
UNABLE_TO_CREATE_CONTAINER = 474,
} }
public enum VmState { public enum VmState {

View File

@ -592,8 +592,8 @@ public record Job(
[PartitionKey][RowKey] Guid JobId, [PartitionKey][RowKey] Guid JobId,
JobState State, JobState State,
JobConfig Config, JobConfig Config,
string? Error, string? Error = null,
DateTimeOffset? EndTime DateTimeOffset? EndTime = null
) : StatefulEntityBase<JobState>(State) { ) : StatefulEntityBase<JobState>(State) {
public List<JobTaskInfo>? TaskInfo { get; set; } public List<JobTaskInfo>? TaskInfo { get; set; }
public UserInfo? UserInfo { get; set; } public UserInfo? UserInfo { get; set; }

View File

@ -112,3 +112,14 @@ public record ContainerDelete(
Container Name, Container Name,
IDictionary<string, string>? Metadata = null IDictionary<string, string>? Metadata = null
) : BaseRequest; ) : BaseRequest;
public record JobGet(
Guid JobId
);
public record JobSearch(
Guid? JobId = null,
List<JobState>? State = null,
List<TaskState>? TaskState = null,
bool? WithTasks = null
);

View File

@ -71,6 +71,26 @@ public record ContainerInfo(
Uri SasUrl Uri SasUrl
) : BaseResponse(); ) : BaseResponse();
public record JobResponse(
Guid JobId,
JobState State,
JobConfig Config,
string? Error,
DateTimeOffset? EndTime,
List<JobTaskInfo>? TaskInfo
// not including UserInfo from Job model
) : BaseResponse() {
public static JobResponse ForJob(Job j)
=> new(
JobId: j.JobId,
State: j.State,
Config: j.Config,
Error: j.Error,
EndTime: j.EndTime,
TaskInfo: j.TaskInfo
);
}
public class BaseResponseConverter : JsonConverter<BaseResponse> { public class BaseResponseConverter : JsonConverter<BaseResponse> {
public override BaseResponse? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) { public override BaseResponse? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) {
return null; return null;

View File

@ -33,6 +33,10 @@ namespace Microsoft.OneFuzz.Service {
public T_Ok? OkV { get; } public T_Ok? OkV { get; }
} }
public static class OneFuzzResult {
public static OneFuzzResult<T> Ok<T>(T val) => OneFuzzResult<T>.Ok(val);
}
public struct OneFuzzResult<T_Ok> { public struct OneFuzzResult<T_Ok> {
static Error NoError = new(0); static Error NoError = new(0);

View File

@ -22,8 +22,7 @@ public class UserCredentials : IUserCredentials {
} }
public string? GetBearerToken(HttpRequestData req) { public string? GetBearerToken(HttpRequestData req) {
var authHeader = req.Headers.GetValues("Authorization"); if (!req.Headers.TryGetValues("Authorization", out var authHeader) || authHeader.IsNullOrEmpty()) {
if (authHeader.IsNullOrEmpty()) {
return null; return null;
} else { } else {
var auth = AuthenticationHeaderValue.Parse(authHeader.First()); var auth = AuthenticationHeaderValue.Parse(authHeader.First());
@ -39,8 +38,7 @@ public class UserCredentials : IUserCredentials {
if (token is not null) { if (token is not null) {
return token; return token;
} else { } else {
var tokenHeader = req.Headers.GetValues("x-ms-token-aad-id-token"); if (!req.Headers.TryGetValues("x-ms-token-aad-id-token", out var tokenHeader) || tokenHeader.IsNullOrEmpty()) {
if (tokenHeader.IsNullOrEmpty()) {
return null; return null;
} else { } else {
return tokenHeader.First(); return tokenHeader.First();

View File

@ -9,10 +9,11 @@ namespace ApiService.OneFuzzLib.Orm {
public interface IOrm<T> where T : EntityBase { public interface IOrm<T> where T : EntityBase {
Task<TableClient> GetTableClient(string table, string? accountId = null); Task<TableClient> GetTableClient(string table, string? accountId = null);
IAsyncEnumerable<T> QueryAsync(string? filter = null); IAsyncEnumerable<T> QueryAsync(string? filter = null);
Task<ResultVoid<(int, string)>> Replace(T entity);
Task<T> GetEntityAsync(string partitionKey, string rowKey); Task<T> GetEntityAsync(string partitionKey, string rowKey);
Task<ResultVoid<(int, string)>> Insert(T entity); Task<ResultVoid<(int, string)>> Insert(T entity);
Task<ResultVoid<(int, string)>> Replace(T entity);
Task<ResultVoid<(int, string)>> Update(T entity);
Task<ResultVoid<(int, string)>> Delete(T entity); Task<ResultVoid<(int, string)>> Delete(T entity);
IAsyncEnumerable<T> SearchAll(); IAsyncEnumerable<T> SearchAll();
@ -48,6 +49,8 @@ namespace ApiService.OneFuzzLib.Orm {
} }
} }
/// Inserts the entity into table storage.
/// If successful, updates the ETag of the passed-in entity.
public async Task<ResultVoid<(int, string)>> Insert(T entity) { public async Task<ResultVoid<(int, string)>> Insert(T entity) {
var tableClient = await GetTableClient(typeof(T).Name); var tableClient = await GetTableClient(typeof(T).Name);
var tableEntity = _entityConverter.ToTableEntity(entity); var tableEntity = _entityConverter.ToTableEntity(entity);
@ -56,6 +59,9 @@ namespace ApiService.OneFuzzLib.Orm {
if (response.IsError) { if (response.IsError) {
return ResultVoid<(int, string)>.Error((response.Status, response.ReasonPhrase)); return ResultVoid<(int, string)>.Error((response.Status, response.ReasonPhrase));
} else { } else {
// update ETag
entity.ETag = response.Headers.ETag;
return ResultVoid<(int, string)>.Ok(); return ResultVoid<(int, string)>.Ok();
} }
} }
@ -72,18 +78,18 @@ namespace ApiService.OneFuzzLib.Orm {
} }
public async Task<ResultVoid<(int, string)>> Update(T entity) { public async Task<ResultVoid<(int, string)>> Update(T entity) {
if (entity.ETag is null) {
throw new ArgumentException("ETag must be set when updating an entity", nameof(entity));
}
var tableClient = await GetTableClient(typeof(T).Name); var tableClient = await GetTableClient(typeof(T).Name);
var tableEntity = _entityConverter.ToTableEntity(entity); var tableEntity = _entityConverter.ToTableEntity(entity);
if (entity.ETag is null) { var response = await tableClient.UpdateEntityAsync(tableEntity, entity.ETag.Value);
return ResultVoid<(int, string)>.Error((0, "ETag must be set when updating an entity")); if (response.IsError) {
return ResultVoid<(int, string)>.Error((response.Status, response.ReasonPhrase));
} else { } else {
var response = await tableClient.UpdateEntityAsync(tableEntity, entity.ETag.Value); return ResultVoid<(int, string)>.Ok();
if (response.IsError) {
return ResultVoid<(int, string)>.Error((response.Status, response.ReasonPhrase));
} else {
return ResultVoid<(int, string)>.Ok();
}
} }
} }

View File

@ -58,7 +58,7 @@ sealed class TestHttpRequestData : HttpRequestData {
public override Stream Body => _body.ToStream(); public override Stream Body => _body.ToStream();
public override HttpHeadersCollection Headers => throw new NotImplementedException(); public override HttpHeadersCollection Headers { get; } = new HttpHeadersCollection();
public override IReadOnlyCollection<IHttpCookie> Cookies => throw new NotImplementedException(); public override IReadOnlyCollection<IHttpCookie> Cookies => throw new NotImplementedException();

View File

@ -0,0 +1,176 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using IntegrationTests.Fakes;
using Microsoft.OneFuzz.Service;
using Microsoft.OneFuzz.Service.Functions;
using Xunit;
using Xunit.Abstractions;
using Async = System.Threading.Tasks;
namespace IntegrationTests;
[Trait("Category", "Live")]
public class AzureStorageJobsTest : JobsTestBase {
public AzureStorageJobsTest(ITestOutputHelper output)
: base(output, Integration.AzureStorage.FromEnvironment()) { }
}
public class AzuriteJobsTest : JobsTestBase {
public AzuriteJobsTest(ITestOutputHelper output)
: base(output, new Integration.AzuriteStorage()) { }
}
public abstract class JobsTestBase : FunctionTestBase {
public JobsTestBase(ITestOutputHelper output, IStorage storage)
: base(output, storage) { }
private readonly Guid _jobId = Guid.NewGuid();
private readonly JobConfig _config = new("project", "name", "build", 1000, null);
[Theory]
[InlineData("POST")]
[InlineData("GET")]
[InlineData("DELETE")]
public async Async.Task Access_WithoutAuthorization_IsRejected(string method) {
var auth = new TestEndpointAuthorization(RequestType.NoAuthorization, Logger, Context);
var func = new Jobs(auth, Context);
var result = await func.Run(TestHttpRequestData.Empty(method));
Assert.Equal(HttpStatusCode.Unauthorized, result.StatusCode);
var err = BodyAs<Error>(result);
Assert.Equal(ErrorCode.UNAUTHORIZED, err.Code);
}
[Fact]
public async Async.Task Delete_NonExistentJob_Fails() {
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var result = await func.Run(TestHttpRequestData.FromJson("DELETE", new JobGet(_jobId)));
Assert.Equal(HttpStatusCode.BadRequest, result.StatusCode);
var err = BodyAs<Error>(result);
Assert.Equal(ErrorCode.INVALID_JOB, err.Code);
}
[Fact]
public async Async.Task Delete_ExistingJob_SetsStoppingState() {
await Context.InsertAll(
new Job(_jobId, JobState.Enabled, _config));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var result = await func.Run(TestHttpRequestData.FromJson("DELETE", new JobGet(_jobId)));
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
var response = BodyAs<JobResponse>(result);
Assert.Equal(_jobId, response.JobId);
Assert.Equal(JobState.Stopping, response.State);
var job = await Context.JobOperations.Get(_jobId);
Assert.Equal(JobState.Stopping, job?.State);
}
[Fact]
public async Async.Task Delete_ExistingStoppedJob_DoesNotSetStoppingState() {
await Context.InsertAll(
new Job(_jobId, JobState.Stopped, _config));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var result = await func.Run(TestHttpRequestData.FromJson("DELETE", new JobGet(_jobId)));
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
var response = BodyAs<JobResponse>(result);
Assert.Equal(_jobId, response.JobId);
Assert.Equal(JobState.Stopped, response.State);
var job = await Context.JobOperations.Get(_jobId);
Assert.Equal(JobState.Stopped, job?.State);
}
[Fact]
public async Async.Task Get_CanFindSpecificJob() {
await Context.InsertAll(
new Job(_jobId, JobState.Stopped, _config));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var result = await func.Run(TestHttpRequestData.FromJson("GET", new JobSearch(JobId: _jobId)));
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
var response = BodyAs<JobResponse>(result);
Assert.Equal(_jobId, response.JobId);
Assert.Equal(JobState.Stopped, response.State);
}
[Fact]
public async Async.Task Get_CanFindJobsInState() {
await Context.InsertAll(
new Job(Guid.NewGuid(), JobState.Init, _config),
new Job(Guid.NewGuid(), JobState.Stopping, _config),
new Job(Guid.NewGuid(), JobState.Enabled, _config),
new Job(Guid.NewGuid(), JobState.Stopped, _config));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var req = new JobSearch(State: new List<JobState> { JobState.Enabled });
var result = await func.Run(TestHttpRequestData.FromJson("GET", req));
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
var response = BodyAs<JobResponse>(result);
Assert.Equal(JobState.Enabled, response.State);
}
[Fact]
public async Async.Task Get_CanFindMultipleJobsInState() {
await Context.InsertAll(
new Job(Guid.NewGuid(), JobState.Init, _config),
new Job(Guid.NewGuid(), JobState.Stopping, _config),
new Job(Guid.NewGuid(), JobState.Enabled, _config),
new Job(Guid.NewGuid(), JobState.Stopped, _config));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
var req = new JobSearch(State: new List<JobState> { JobState.Enabled, JobState.Stopping });
var result = await func.Run(TestHttpRequestData.FromJson("GET", req));
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
var response = BodyAs<JobResponse[]>(result);
Assert.Equal(2, response.Length);
Assert.Contains(response, j => j.State == JobState.Stopping);
Assert.Contains(response, j => j.State == JobState.Enabled);
}
[Fact]
public async Async.Task Post_CreatesJob_AndContainer() {
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Jobs(auth, Context);
// need user credentials to put into the job object
var userInfo = new UserInfo(Guid.NewGuid(), Guid.NewGuid(), "upn");
Context.UserCredentials = new TestUserCredentials(Logger, Context.ConfigOperations, OneFuzzResult.Ok(userInfo));
var result = await func.Run(TestHttpRequestData.FromJson("POST", _config));
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
var job = Assert.Single(await Context.JobOperations.SearchAll().ToListAsync());
var response = BodyAs<JobResponse>(result);
Assert.Equal(job.JobId, response.JobId);
Assert.NotNull(job.Config.Logs);
Assert.Empty(new Uri(job.Config.Logs!).Query);
var container = Assert.Single(await Context.Containers.GetContainers(StorageType.Corpus), c => c.Key.Contains(job.JobId.ToString()));
var metadata = Assert.Single(container.Value);
Assert.Equal(new KeyValuePair<string, string>("container_type", "logs"), metadata);
}
}