Migrate timer_task (#1846)

This commit is contained in:
Cheick Keita
2022-04-26 09:30:14 -07:00
committed by GitHub
parent c71cdb6d72
commit 8003b1d3e0
9 changed files with 483 additions and 65 deletions

View File

@ -255,8 +255,43 @@ public static class TaskStateHelper
};
});
}
internal static TaskState[] NeedsWork()
{
return
_states.GetOrAdd(nameof(TaskStateHelper.HasStarted), k =>
new[]{
TaskState.Init,
TaskState.Stopping
}
);
}
public static TaskState[] ShuttingDown()
{
return
_states.GetOrAdd(nameof(TaskStateHelper.ShuttingDown), k =>
new[]{
TaskState.Stopping,
TaskState.Stopping,
}
);
}
internal static TaskState[] HasStarted()
{
return
_states.GetOrAdd(nameof(TaskStateHelper.HasStarted), k =>
new[]{
TaskState.Running,
TaskState.Stopping,
TaskState.Stopped
}
);
}
}
public enum PoolState
{
Init,
@ -272,26 +307,22 @@ public static class PoolStateHelper
{
return
_states.GetOrAdd("NeedsWork", k =>
{
return
new[]{
PoolState.Init,
PoolState.Shutdown,
PoolState.Halt
};
});
}
);
}
public static PoolState[] Available()
{
return
_states.GetOrAdd("Available", k =>
{
return
new[]{
PoolState.Running
};
});
}
);
}
}

View File

@ -57,6 +57,9 @@ public abstract record BaseEvent()
EventCrashReported _ => EventType.CrashReported,
EventRegressionReported _ => EventType.RegressionReported,
EventFileAdded _ => EventType.FileAdded,
EventTaskFailed _ => EventType.TaskFailed,
EventTaskStopped _ => EventType.TaskStopped,
EventTaskStateUpdated _ => EventType.TaskStateUpdated,
_ => throw new NotImplementedException(),
};
@ -76,6 +79,10 @@ public abstract record BaseEvent()
EventType.CrashReported => typeof(EventCrashReported),
EventType.RegressionReported => typeof(EventRegressionReported),
EventType.FileAdded => typeof(EventFileAdded),
EventType.TaskFailed => typeof(EventTaskFailed),
EventType.TaskStopped => typeof(EventTaskStopped),
EventType.TaskStateUpdated => typeof(EventTaskStateUpdated),
_ => throw new ArgumentException($"invalid input {eventType}"),
};
@ -90,21 +97,21 @@ public class EventTypeProvider : ITypeProvider
}
}
//public record EventTaskStopped(
// Guid JobId,
// Guid TaskId,
// UserInfo? UserInfo,
// TaskConfig Config
//) : BaseEvent();
public record EventTaskStopped(
Guid JobId,
Guid TaskId,
UserInfo? UserInfo,
TaskConfig Config
) : BaseEvent();
//record EventTaskFailed(
// Guid JobId,
// Guid TaskId,
// Error Error,
// UserInfo? UserInfo,
// TaskConfig Config
// ) : BaseEvent();
record EventTaskFailed(
Guid JobId,
Guid TaskId,
Error Error,
UserInfo? UserInfo,
TaskConfig Config
) : BaseEvent();
//record EventJobCreated(
@ -114,18 +121,19 @@ public class EventTypeProvider : ITypeProvider
// ) : BaseEvent();
//record JobTaskStopped(
// Guid TaskId,
// TaskType TaskType,
// Error? Error
// ) : BaseEvent();
record JobTaskStopped(
Guid TaskId,
TaskType TaskType,
Error? Error
) : BaseEvent();
//record EventJobStopped(
// Guid JobId: UUId,
// JobConfig Config,
// UserInfo? UserInfo,
// List<JobTaskStopped> TaskInfo
//): BaseEvent();
record EventJobStopped(
Guid JobId,
JobConfig Config,
UserInfo? UserInfo,
List<JobTaskStopped> TaskInfo
) : BaseEvent();
//record EventTaskCreated(
@ -136,13 +144,13 @@ public class EventTypeProvider : ITypeProvider
// ) : BaseEvent();
//record EventTaskStateUpdated(
// Guid JobId,
// Guid TaskId,
// TaskState State,
// DateTimeOffset? EndTime,
// TaskConfig Config
// ) : BaseEvent();
record EventTaskStateUpdated(
Guid JobId,
Guid TaskId,
TaskState State,
DateTimeOffset? EndTime,
TaskConfig Config
) : BaseEvent();
public record EventTaskHeartbeat(

View File

@ -1,4 +1,4 @@
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
using System.Text.Json.Serialization;
using Region = System.String;
using PoolName = System.String;
@ -600,7 +600,9 @@ public record Job(
JobState State,
JobConfig Config,
string? Error,
DateTimeOffset? EndTime,
List<JobTaskInfo>? TaskInfo,
UserInfo UserInfo
) : StatefulEntityBase<JobState>(State);
DateTimeOffset? EndTime
) : StatefulEntityBase<JobState>(State)
{
public List<JobTaskInfo>? TaskInfo { get; set; }
public UserInfo? UserInfo { get; set; }
}

View File

@ -1,4 +1,4 @@
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker;
namespace Microsoft.OneFuzz.Service;
@ -30,7 +30,7 @@ public partial class TimerProxy
_subnet = subnet;
}
//[Function("TimerDaily")]
//[Function("TimerProxy")]
public async Async.Task Run([TimerTrigger("1.00:00:00")] TimerInfo myTimer)
{
var proxies = await _proxYOperations.QueryAsync().ToListAsync();

View File

@ -0,0 +1,66 @@
using Microsoft.Azure.Functions.Worker;
namespace Microsoft.OneFuzz.Service;
public class TimerTasks
{
private readonly ILogTracer _logger;
private readonly ITaskOperations _taskOperations;
private readonly IJobOperations _jobOperations;
private readonly IScheduler _scheduler;
public TimerTasks(ILogTracer logger, ITaskOperations taskOperations, IJobOperations jobOperations, IScheduler scheduler)
{
_logger = logger;
_taskOperations = taskOperations;
_jobOperations = jobOperations;
_scheduler = scheduler;
}
//[Function("TimerTasks")]
public async Async.Task Run([TimerTrigger("1.00:00:00")] TimerInfo myTimer)
{
var expriredTasks = _taskOperations.SearchExpired();
await foreach (var task in expriredTasks)
{
_logger.Info($"stopping expired task. job_id:{task.JobId} task_id:{task.TaskId}");
await _taskOperations.MarkStopping(task);
}
var expiredJobs = _jobOperations.SearchExpired();
await foreach (var job in expiredJobs)
{
_logger.Info($"stopping expired job. job_id:{job.JobId }");
await _jobOperations.Stopping(job);
}
var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork);
await foreach (var job in jobs)
{
_logger.Info($"update job: {job.JobId}");
await _jobOperations.ProcessStateUpdates(job);
}
var tasks = _taskOperations.SearchStates(states: TaskStateHelper.NeedsWork());
await foreach (var task in tasks)
{
_logger.Info($"update task: {task.TaskId}");
await _taskOperations.ProcessStateUpdate(task);
}
await _scheduler.ScheduleTasks();
await _jobOperations.StopNeverStartedJobs();
}
}

View File

@ -4,16 +4,81 @@ namespace Microsoft.OneFuzz.Service;
public interface IJobOperations : IStatefulOrm<Job, JobState>
{
System.Threading.Tasks.Task<Job?> Get(Guid jobId);
System.Threading.Tasks.Task OnStart(Job job);
IAsyncEnumerable<Job> SearchExpired();
System.Threading.Tasks.Task Stopping(Job job);
IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states);
System.Threading.Tasks.Task StopNeverStartedJobs();
}
public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations
{
private readonly ITaskOperations _taskOperations;
private readonly IEvents _events;
public JobOperations(IStorage storage, ILogTracer log, IServiceConfig config)
: base(storage, log, config)
public JobOperations(IStorage storage, ILogTracer logTracer, IServiceConfig config, ITaskOperations taskOperations, IEvents events) : base(storage, logTracer, config)
{
_taskOperations = taskOperations;
_events = events;
}
public async System.Threading.Tasks.Task<Job?> Get(Guid jobId)
{
return await QueryAsync($"PartitionKey eq '{jobId}'").FirstOrDefaultAsync();
}
public async System.Threading.Tasks.Task OnStart(Job job)
{
if (job.EndTime == null)
{
await Replace(job with { EndTime = DateTimeOffset.UtcNow + TimeSpan.FromHours(job.Config.Duration) });
}
}
public IAsyncEnumerable<Job> SearchExpired()
{
return QueryAsync(filter: $"end_time lt datetime'{DateTimeOffset.UtcNow}'");
}
public IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states)
{
var query =
string.Join(" or ",
states.Select(x => $"state eq '{x}'"));
return QueryAsync(filter: query);
}
public System.Threading.Tasks.Task StopNeverStartedJobs()
{
throw new NotImplementedException();
}
public async System.Threading.Tasks.Task Stopping(Job job)
{
job = job with { State = JobState.Stopping };
var tasks = await _taskOperations.QueryAsync(filter: $"job_id eq '{job.JobId}'").ToListAsync();
var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped);
var notStopped = taskNotStopped[true];
var stopped = taskNotStopped[false];
if (notStopped.Any())
{
foreach (var task in notStopped)
{
await _taskOperations.MarkStopping(task);
}
}
else
{
job = job with { State = JobState.Stopped };
var taskInfo = stopped.Select(t => new JobTaskStopped(t.TaskId, t.Config.Task.Type, t.Error)).ToList();
await _events.SendEvent(new EventJobStopped(job.JobId, job.Config, job.UserInfo, taskInfo));
}
await Replace(job);
}
}

View File

@ -0,0 +1,110 @@
namespace Microsoft.OneFuzz.Service;
public interface IScheduler
{
Async.Task ScheduleTasks();
}
public class Scheduler : IScheduler
{
private readonly ITaskOperations _taskOperations;
private readonly IConfig _config;
// TODO: eventually, this should be tied to the pool.
const int MAX_TASKS_PER_SET = 10;
public Scheduler(ITaskOperations taskOperations, IConfig config)
{
_taskOperations = taskOperations;
_config = config;
}
public async System.Threading.Tasks.Task ScheduleTasks()
{
var tasks = await _taskOperations.SearchStates(states: new[] { TaskState.Waiting }).ToDictionaryAsync(x => x.TaskId);
var seen = new HashSet<Guid>();
var buckets = BucketTasks(tasks.Values);
foreach (var bucketedTasks in buckets)
{
foreach (var chunks in bucketedTasks.Chunk(MAX_TASKS_PER_SET))
{
var result = BuildWorkSet(chunks);
}
}
throw new NotImplementedException();
}
private object BuildWorkSet(Task[] chunks)
{
throw new NotImplementedException();
}
record struct BucketId(Os os, Guid jobId, (string, string)? vm, string? pool, string setupContainer, bool? reboot, Guid? unique);
private ILookup<BucketId, Task> BucketTasks(IEnumerable<Task> tasks)
{
// buckets are hashed by:
// OS, JOB ID, vm sku & image (if available), pool name (if available),
// if the setup script requires rebooting, and a 'unique' value
//
// The unique value is set based on the following conditions:
// * if the task is set to run on more than one VM, than we assume it can't be shared
// * if the task is missing the 'colocate' flag or it's set to False
return tasks.ToLookup(task =>
{
Guid? unique = null;
// check for multiple VMs for pre-1.0.0 tasks
(string, string)? vm = task.Config.Vm != null ? (task.Config.Vm.Sku, task.Config.Vm.Image) : null;
if ((task.Config.Vm?.Count ?? 0) > 1)
{
unique = Guid.NewGuid();
}
// check for multiple VMs for 1.0.0 and later tasks
string? pool = task.Config.Pool?.PoolName;
if ((task.Config.Pool?.Count ?? 0) > 1)
{
unique = Guid.NewGuid();
}
if (!(task.Config.Colocate ?? false))
{
unique = Guid.NewGuid();
}
return new BucketId(task.Os, task.JobId, vm, pool, _config.GetSetupContainer(task.Config), task.Config.Task.RebootAfterSetup, unique);
});
}
}
public interface IConfig
{
string GetSetupContainer(TaskConfig config);
}
public class Config : IConfig
{
public string GetSetupContainer(TaskConfig config)
{
foreach (var container in config.Containers ?? throw new Exception("Missing containers"))
{
if (container.Type == ContainerType.Setup)
{
return container.Name.ContainerName;
}
}
throw new Exception($"task missing setup container: task_type = {config.Task.Type}");
}
}

View File

@ -13,20 +13,26 @@ public interface ITaskOperations : IStatefulOrm<Task, TaskState>
IEnumerable<string>? GetInputContainerQueues(TaskConfig config);
IAsyncEnumerable<Task> SearchExpired();
Async.Task MarkStopping(Task task);
Async.Task<TaskVm?> GetReproVmConfig(Task task);
}
public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations
{
private readonly IEvents _events;
private readonly IJobOperations _jobOperations;
private readonly IPoolOperations _poolOperations;
private readonly IScalesetOperations _scalesetOperations;
private IPoolOperations _poolOperations;
private IScalesetOperations _scalesetOperations;
public TaskOperations(IStorage storage, ILogTracer log, IServiceConfig config, IPoolOperations poolOperations, IScalesetOperations scalesetOperations)
public TaskOperations(IStorage storage, ILogTracer log, IServiceConfig config, IPoolOperations poolOperations, IScalesetOperations scalesetOperations, IEvents events, IJobOperations jobOperations)
: base(storage, log, config)
{
_poolOperations = poolOperations;
_scalesetOperations = scalesetOperations;
_events = events;
_jobOperations = jobOperations;
}
public async Async.Task<Task?> GetByTaskId(Guid taskId)
@ -69,6 +75,137 @@ public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations
throw new NotImplementedException();
}
public IAsyncEnumerable<Task> SearchExpired()
{
var timeFilter = $"end_time lt datetime'{DateTimeOffset.UtcNow.ToString("o") }'";
return QueryAsync(filter: timeFilter);
}
public async System.Threading.Tasks.Task MarkStopping(Task task)
{
if (TaskStateHelper.ShuttingDown().Contains(task.State))
{
_logTracer.Verbose($"ignoring post - task stop calls to stop {task.JobId}:{task.TaskId}");
return;
}
if (TaskStateHelper.HasStarted().Contains(task.State))
{
await MarkFailed(task, new Error(Code: ErrorCode.TASK_FAILED, Errors: new[] { "task never started" }));
}
}
public async Async.Task MarkFailed(Task task, Error error, List<Task>? taskInJob = null)
{
if (TaskStateHelper.ShuttingDown().Contains(task.State))
{
_logTracer.Verbose(
$"ignoring post-task stop failures for {task.JobId}:{task.TaskId}"
);
return;
}
if (task.Error != null)
{
_logTracer.Verbose(
$"ignoring additional task error {task.JobId}:{task.TaskId}"
);
return;
}
_logTracer.Error($"task failed {task.JobId}:{task.TaskId} - {error}");
task = await SetState(task with { Error = error }, TaskState.Stopping);
//self.set_state(TaskState.stopping)
await MarkDependantsFailed(task, taskInJob);
}
private async System.Threading.Tasks.Task MarkDependantsFailed(Task task, List<Task>? taskInJob = null)
{
taskInJob = taskInJob ?? await QueryAsync(filter: $"job_id eq ''{task.JobId}").ToListAsync();
foreach (var t in taskInJob)
{
if (t.Config.PrereqTasks != null)
{
if (t.Config.PrereqTasks.Contains(t.TaskId))
{
await MarkFailed(task, new Error(ErrorCode.TASK_FAILED, new[] { $"prerequisite task failed. task_id:{t.TaskId}" }), taskInJob);
}
}
}
}
private async Async.Task<Task> SetState(Task task, TaskState state)
{
if (task.State == state)
{
return task;
}
if (task.State == TaskState.Running || task.State == TaskState.SettingUp)
{
task = await OnStart(task with { State = state });
}
await this.Replace(task);
if (task.State == TaskState.Stopped)
{
if (task.Error != null)
{
await _events.SendEvent(new EventTaskFailed(
JobId: task.JobId,
TaskId: task.TaskId,
Error: task.Error,
UserInfo: task.UserInfo,
Config: task.Config)
);
}
else
{
await _events.SendEvent(new EventTaskStopped(
JobId: task.JobId,
TaskId: task.TaskId,
UserInfo: task.UserInfo,
Config: task.Config)
);
}
}
else
{
await _events.SendEvent(new EventTaskStateUpdated(
JobId: task.JobId,
TaskId: task.TaskId,
State: task.State,
EndTime: task.EndTime,
Config: task.Config)
);
}
return task;
}
private async Async.Task<Task> OnStart(Task task)
{
if (task.EndTime == null)
{
task = task with { EndTime = DateTimeOffset.UtcNow + TimeSpan.FromHours(task.Config.Task.Duration) };
Job? job = await _jobOperations.Get(task.JobId);
if (job != null)
{
await _jobOperations.OnStart(job);
}
}
return task;
}
public async Async.Task<TaskVm?> GetReproVmConfig(Task task)
{
if (task.Config.Vm != null)
@ -100,4 +237,5 @@ public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations
return new TaskVm(scaleset.Region, scaleset.VmSku, scaleset.Image, null);
}
}

View File

@ -319,9 +319,7 @@ namespace Tests
State: arg.Item2,
Config: arg.Item3,
Error: arg.Item4,
EndTime: arg.Item5,
TaskInfo: arg.Item6,
UserInfo: arg.Item7
EndTime: arg.Item5
)
);
}