Two fixes to C# scheduling (#2390)

Two fixes to scheduling code:

- `GetPool` was not correct for the VM case (this code is possibly legacy and not used any more)
- `BuildWorkunit` could fetch the same pool multiple times, after which `BuildWorkSet` would fail due to a `BucketConfig` mismatch (the pool copies differed on `TimeStamp`)
  - add a cache to the loop so that we only fetch each pool once
This commit is contained in:
George Pollard
2022-09-14 14:06:01 +12:00
committed by GitHub
parent 3b8cbc3f1e
commit f375ee719e
3 changed files with 84 additions and 59 deletions

View File

@ -71,7 +71,7 @@ public class JobOperations : StatefulOrm<Job, JobState, JobOperations>, IJobOper
var jobs = this.QueryAsync(filter);
await foreach (var job in jobs) {
await foreach (var task in _context.TaskOperations.QueryAsync($"PartitionKey eq '{job.JobId}'")) {
await foreach (var task in _context.TaskOperations.QueryAsync(Query.PartitionKey(job.JobId.ToString()))) {
await _context.TaskOperations.MarkFailed(task, new Error(ErrorCode.TASK_FAILED, new[] { "job never not start" }));
}
_logTracer.Info($"stopping job that never started: {job.JobId}");

View File

@ -33,19 +33,19 @@ public class Scheduler : IScheduler {
var buckets = BucketTasks(tasks.Values);
// only fetch pools once from storage; see explanation in BuildWorkUnit for more
var poolCache = new Dictionary<PoolKey, Pool>();
foreach (var bucketedTasks in buckets) {
foreach (var chunks in bucketedTasks.Chunk(MAX_TASKS_PER_SET)) {
var result = await BuildWorkSet(chunks);
if (result == null) {
continue;
}
var (bucketConfig, workSet) = result.Value;
if (await ScheduleWorkset(workSet, bucketConfig.pool, bucketConfig.count)) {
foreach (var workUnit in workSet.WorkUnits) {
var task1 = tasks[workUnit.TaskId];
Task task = await _taskOperations.SetState(task1, TaskState.Scheduled);
seen.Add(task.TaskId);
var result = await BuildWorkSet(chunks, poolCache);
if (result is var (bucketConfig, workSet)) {
if (await ScheduleWorkset(workSet, bucketConfig.pool, bucketConfig.count)) {
foreach (var workUnit in workSet.WorkUnits) {
var task1 = tasks[workUnit.TaskId];
Task task = await _taskOperations.SetState(task1, TaskState.Scheduled);
seen.Add(task.TaskId);
}
}
}
}
@ -72,41 +72,39 @@ public class Scheduler : IScheduler {
return true;
}
private async Async.Task<(BucketConfig, WorkSet)?> BuildWorkSet(Task[] tasks) {
private async Async.Task<(BucketConfig, WorkSet)?> BuildWorkSet(Task[] tasks, Dictionary<PoolKey, Pool> poolCache) {
var taskIds = tasks.Select(x => x.TaskId).ToHashSet();
var workUnits = new List<WorkUnit>();
BucketConfig? bucketConfig = null;
foreach (var task in tasks) {
if ((task.Config.PrereqTasks?.Count ?? 0) > 0) {
if (task.Config.PrereqTasks is List<Guid> prereqTasks && prereqTasks.Any()) {
// if all of the prereqs are in this bucket, they will be
// scheduled together
if (!taskIds.IsSupersetOf(task.Config.PrereqTasks!)) {
if (!(await _taskOperations.CheckPrereqTasks(task))) {
if (!taskIds.IsSupersetOf(prereqTasks)) {
if (!await _taskOperations.CheckPrereqTasks(task)) {
continue;
}
}
}
var result = await BuildWorkunit(task);
if (result == null) {
continue;
}
var result = await BuildWorkunit(task, poolCache);
if (result is var (newBucketConfig, workUnit)) {
if (bucketConfig is null) {
bucketConfig = newBucketConfig;
} else if (bucketConfig != newBucketConfig) {
throw new Exception($"bucket configs differ: {bucketConfig} VS {newBucketConfig}");
}
if (bucketConfig == null) {
bucketConfig = result.Value.Item1;
} else if (bucketConfig != result.Value.Item1) {
throw new Exception($"bucket configs differ: {bucketConfig} VS {result.Value.Item1}");
workUnits.Add(workUnit);
}
workUnits.Add(result.Value.Item2);
}
if (bucketConfig != null) {
var setupUrl = await _containers.GetContainerSasUrl(bucketConfig.setupContainer, StorageType.Corpus, BlobContainerSasPermissions.Read | BlobContainerSasPermissions.List) ?? throw new Exception("container not found");
if (bucketConfig is not null) {
var setupUrl = await _containers.GetContainerSasUrl(bucketConfig.setupContainer, StorageType.Corpus, BlobContainerSasPermissions.Read | BlobContainerSasPermissions.List);
var workSet = new WorkSet(
Reboot: bucketConfig.reboot,
Script: bucketConfig.setupScript != null,
Script: bucketConfig.setupScript is not null,
SetupUrl: setupUrl,
WorkUnits: workUnits
);
@ -120,23 +118,52 @@ public class Scheduler : IScheduler {
record BucketConfig(long count, bool reboot, Container setupContainer, string? setupScript, Pool pool);
private async Async.Task<(BucketConfig, WorkUnit)?> BuildWorkunit(Task task) {
Pool? pool = await _taskOperations.GetPool(task);
if (pool == null) {
_logTracer.Info($"unable to find pool for task: {task.TaskId}");
// Key under which fetched pools are cached (Dictionary<PoolKey, Pool>), so that
// each pool is only fetched once. Being a record, it compares by value, which
// is what makes it usable as a dictionary key.
// GetPoolKey fills in exactly one of the two fields:
//   poolName: set when the task's config names a pool directly.
//   vm: the (sku, image) pair, set when the task's config specifies a VM instead.
record PoolKey(
PoolName? poolName = null,
(string sku, string image)? vm = null);
// Derives the pool-cache key for a task, or returns null when the task's
// config has neither a pool nor a VM set.
// The behaviour of this key should match the behaviour of
// TaskOperations.GetPool: the pool is considered first, then the VM.
private static PoolKey? GetPoolKey(Task task) {
    var config = task.Config;
    return config.Pool is TaskPool pool
        ? new PoolKey(poolName: pool.PoolName)
        : config.Vm is TaskVm taskVm
            ? new PoolKey(vm: (taskVm.Sku, taskVm.Image))
            : null;
}
private async Async.Task<(BucketConfig, WorkUnit)?> BuildWorkunit(Task task, Dictionary<PoolKey, Pool> poolCache) {
var poolKey = GetPoolKey(task);
if (poolKey is null) {
return null;
}
// we cache the pools by key so that we only fetch each pool once
// this reduces load on storage and also ensures that we don't
// have multiple copies of the same pool entity with differing values
if (!poolCache.TryGetValue(poolKey, out var pool)) {
var foundPool = await _taskOperations.GetPool(task);
if (foundPool is null) {
_logTracer.Info($"unable to find pool for task: {task.TaskId}");
return null;
}
pool = poolCache[poolKey] = foundPool;
}
_logTracer.Info($"scheduling task: {task.TaskId}");
var job = await _jobOperations.Get(task.JobId);
if (job == null) {
if (job is null) {
throw new Exception($"invalid job_id {task.JobId} for task {task.TaskId}");
}
var taskConfig = await _config.BuildTaskConfig(job, task);
if (taskConfig == null) {
if (taskConfig is null) {
_logTracer.Info($"unable to build task config for task: {task.TaskId}");
return null;
}
@ -156,14 +183,14 @@ public class Scheduler : IScheduler {
var reboot = false;
var count = 1L;
if (task.Config.Pool != null) {
count = task.Config.Pool.Count;
if (task.Config.Pool is TaskPool p) {
count = p.Count;
reboot = task.Config.Task.RebootAfterSetup ?? false;
} else if (task.Config.Vm != null) {
count = task.Config.Vm.Count;
reboot = (task.Config.Vm.RebootAfterSetup ?? false) || (task.Config.Task.RebootAfterSetup ?? false);
} else if (task.Config.Vm is TaskVm vm) {
count = vm.Count;
reboot = (vm.RebootAfterSetup ?? false) || (task.Config.Task.RebootAfterSetup ?? false);
} else {
throw new Exception();
throw new Exception("Either Pool or VM should be set");
}
var workUnit = new WorkUnit(
@ -179,7 +206,7 @@ public class Scheduler : IScheduler {
reboot,
setupContainer.Name,
setupScript,
pool with { ETag = null });
pool with { ETag = default, TimeStamp = default });
return (bucketConfig, workUnit);
}

View File

@ -274,36 +274,34 @@ public class TaskOperations : StatefulOrm<Task, TaskState, TaskOperations>, ITas
}
public async Async.Task<Pool?> GetPool(Task task) {
if (task.Config.Pool != null) {
var pool = await _context.PoolOperations.GetByName(task.Config.Pool.PoolName);
// Note: if the behaviour of this method changes,
// then Scheduler.GetPoolKey will probably also need to change
if (task.Config.Pool is TaskPool p) {
var pool = await _context.PoolOperations.GetByName(p.PoolName);
if (!pool.IsOk) {
_logTracer.Info(
$"unable to schedule task to pool: {task.TaskId} - {pool.ErrorV}"
);
_logTracer.Info($"unable to schedule task to pool: {task.TaskId} - {pool.ErrorV}");
return null;
}
return pool.OkV;
} else if (task.Config.Vm != null) {
var scalesets = _context.ScalesetOperations.Search().Where(s => s.VmSku == task.Config.Vm.Sku && s.Image == task.Config.Vm.Image);
return pool.OkV;
}
if (task.Config.Vm is TaskVm taskVm) {
var scalesets = _context.ScalesetOperations.Search().Where(s => s.VmSku == taskVm.Sku && s.Image == taskVm.Image);
await foreach (var scaleset in scalesets) {
if (task.Config.Pool == null) {
continue;
}
var pool = await _context.PoolOperations.GetByName(task.Config.Pool.PoolName);
var pool = await _context.PoolOperations.GetByName(scaleset.PoolName);
if (!pool.IsOk) {
_logTracer.Info(
$"unable to schedule task to pool: {task.TaskId} - {pool.ErrorV}"
);
_logTracer.Info($"unable to schedule task to pool: {task.TaskId} - {pool.ErrorV}");
return null;
}
return pool.OkV;
}
}
_logTracer.Warning($"unable to find a scaleset that matches the task prereqs: {task.TaskId}");
return null;
}
public async Async.Task<Task> Init(Task task) {