Replace queue_task_hearbeat (#1899)

* Replace queue_task_hearbeat

* don't rename StatsFormat

* use HashSets for the state helper classes
This commit is contained in:
Cheick Keita
2022-05-05 11:02:36 -07:00
committed by GitHub
parent 28f0cfbf9f
commit 5393dbab65
12 changed files with 51 additions and 146 deletions

View File

@ -54,7 +54,7 @@ public class AgentCanSchedule {
} }
var task = await _taskOperations.GetByTaskId(canScheduleRequest.TaskId); var task = await _taskOperations.GetByTaskId(canScheduleRequest.TaskId);
workStopped = task == null || TaskStateHelper.ShuttingDown().Contains(task.State); workStopped = task == null || TaskStateHelper.ShuttingDown.Contains(task.State);
if (allowed) { if (allowed) {
allowed = (await _nodeOperations.AcquireScaleInProtection(node)).IsOk; allowed = (await _nodeOperations.AcquireScaleInProtection(node)).IsOk;

View File

@ -1,5 +1,4 @@
using System.Collections.Concurrent; using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service; namespace Microsoft.OneFuzz.Service;
@ -96,6 +95,7 @@ public enum ContainerType {
} }
[SkipRename]
public enum StatsFormat { public enum StatsFormat {
AFL AFL
} }
@ -136,133 +136,55 @@ public static class JobStateHelper {
public static class ScalesetStateHelper { public static class ScalesetStateHelper {
static ConcurrentDictionary<string, ScalesetState[]> _states = new ConcurrentDictionary<string, ScalesetState[]>(); private static readonly IReadOnlySet<ScalesetState> _canUpdate = new HashSet<ScalesetState> { ScalesetState.Init, ScalesetState.Resize };
private static readonly IReadOnlySet<ScalesetState> _needsWork =
/// set of states that indicate the scaleset can be updated new HashSet<ScalesetState>{
public static ScalesetState[] CanUpdate() {
return
_states.GetOrAdd(nameof(CanUpdate), k => new[]{
ScalesetState.Running,
ScalesetState.Resize
});
}
/// set of states that indicate work is needed during eventing
public static ScalesetState[] NeedsWork() {
return
_states.GetOrAdd(nameof(NeedsWork), k => new[]{
ScalesetState.Init, ScalesetState.Init,
ScalesetState.Setup, ScalesetState.Setup,
ScalesetState.Resize, ScalesetState.Resize,
ScalesetState.Shutdown, ScalesetState.Shutdown,
ScalesetState.Halt, ScalesetState.Halt
}); };
} private static readonly IReadOnlySet<ScalesetState> _available = new HashSet<ScalesetState> { ScalesetState.Resize, ScalesetState.Running };
private static readonly IReadOnlySet<ScalesetState> _resizing = new HashSet<ScalesetState> { ScalesetState.Halt, ScalesetState.Init, ScalesetState.Setup };
/// set of states that indicate the scaleset can be updated
public static IReadOnlySet<ScalesetState> CanUpdate => _canUpdate;
/// set of states that indicate work is needed during eventing
public static IReadOnlySet<ScalesetState> NeedsWork => _needsWork;
/// set of states that indicate if it's available for work /// set of states that indicate if it's available for work
public static ScalesetState[] Available() { public static IReadOnlySet<ScalesetState> Available => _available;
return
_states.GetOrAdd(nameof(Available), k => {
return
new[]{
ScalesetState.Resize,
ScalesetState.Running,
};
});
}
/// set of states that indicate scaleset is resizing /// set of states that indicate scaleset is resizing
public static ScalesetState[] Resizing() { public static IReadOnlySet<ScalesetState> Resizing => _resizing;
return
_states.GetOrAdd(nameof(Resizing), k => {
return
new[]{
ScalesetState.Halt,
ScalesetState.Init,
ScalesetState.Setup,
};
});
}
} }
public static class VmStateHelper { public static class VmStateHelper {
static ConcurrentDictionary<string, VmState[]> _states = new ConcurrentDictionary<string, VmState[]>(); private static readonly IReadOnlySet<VmState> _needsWork = new HashSet<VmState> { VmState.Init, VmState.Init, VmState.ExtensionsLaunch, VmState.Stopping };
public static VmState[] NeedsWork() { private static readonly IReadOnlySet<VmState> _available = new HashSet<VmState> { VmState.Init, VmState.ExtensionsLaunch, VmState.ExtensionsFailed, VmState.VmAllocationFailed, VmState.Running, };
return
_states.GetOrAdd(nameof(VmStateHelper.NeedsWork), k => {
return
new[]{
VmState.Init,
VmState.ExtensionsLaunch,
VmState.Stopping
};
});
}
public static VmState[] Available() { public static IReadOnlySet<VmState> NeedsWork => _needsWork;
return public static IReadOnlySet<VmState> Available => _available;
_states.GetOrAdd(nameof(VmStateHelper.Available), k => {
return
new[]{
VmState.Init,
VmState.ExtensionsLaunch,
VmState.ExtensionsFailed,
VmState.VmAllocationFailed,
VmState.Running,
};
});
}
} }
public static class TaskStateHelper { public static class TaskStateHelper {
static ConcurrentDictionary<string, TaskState[]> _states = new ConcurrentDictionary<string, TaskState[]>();
public static TaskState[] Available() {
return
_states.GetOrAdd(nameof(Available), k => {
return
new[]{
TaskState.Waiting,
TaskState.Scheduled,
TaskState.SettingUp,
TaskState.Running,
TaskState.WaitJob
};
});
}
internal static TaskState[] NeedsWork() { private static readonly IReadOnlySet<TaskState> _available = new HashSet<TaskState> { TaskState.Waiting, TaskState.Scheduled, TaskState.SettingUp, TaskState.Running, TaskState.WaitJob };
return private static readonly IReadOnlySet<TaskState> _needsWork = new HashSet<TaskState> { TaskState.Init, TaskState.Stopping };
_states.GetOrAdd(nameof(TaskStateHelper.HasStarted), k => private static readonly IReadOnlySet<TaskState> _shuttingDown = new HashSet<TaskState> { TaskState.Stopping, TaskState.Stopped };
new[]{ private static readonly IReadOnlySet<TaskState> _hasStarted = new HashSet<TaskState> { TaskState.Running, TaskState.Stopping, TaskState.Stopped };
TaskState.Init,
TaskState.Stopping
}
);
}
public static IReadOnlySet<TaskState> Available => _available;
public static TaskState[] ShuttingDown() { public static IReadOnlySet<TaskState> NeedsWork => _needsWork;
return
_states.GetOrAdd(nameof(TaskStateHelper.ShuttingDown), k =>
new[]{
TaskState.Stopping,
TaskState.Stopping,
}
);
}
internal static TaskState[] HasStarted() { public static IReadOnlySet<TaskState> ShuttingDown => _shuttingDown;
return
_states.GetOrAdd(nameof(TaskStateHelper.HasStarted), k => public static IReadOnlySet<TaskState> HasStarted => _hasStarted;
new[]{
TaskState.Running,
TaskState.Stopping,
TaskState.Stopped
}
);
}
} }
public enum PoolState { public enum PoolState {
@ -273,26 +195,11 @@ public enum PoolState {
} }
public static class PoolStateHelper { public static class PoolStateHelper {
static ConcurrentDictionary<string, PoolState[]> _states = new ConcurrentDictionary<string, PoolState[]>(); private static readonly IReadOnlySet<PoolState> _needsWork = new HashSet<PoolState> { PoolState.Init, PoolState.Shutdown, PoolState.Halt };
public static PoolState[] NeedsWork() { private static readonly IReadOnlySet<PoolState> _available = new HashSet<PoolState> { PoolState.Running };
return
_states.GetOrAdd("NeedsWork", k =>
new[]{
PoolState.Init,
PoolState.Shutdown,
PoolState.Halt
}
);
}
public static PoolState[] Available() { public static IReadOnlySet<PoolState> NeedsWork => _needsWork;
return public static IReadOnlySet<PoolState> Available => _available;
_states.GetOrAdd("Available", k =>
new[]{
PoolState.Running
}
);
}
} }
public enum Architecture { public enum Architecture {
@ -345,8 +252,6 @@ public enum ContainerPermission {
} }
public enum Compare { public enum Compare {
Equal, Equal,
AtLeast, AtLeast,

View File

@ -18,8 +18,8 @@ public class QueueTaskHearbeat {
_events = events; _events = events;
} }
//[Function("QueueTaskHearbeat")] [Function("QueueTaskHearbeat")]
public async Async.Task Run([QueueTrigger("myqueue-items2", Connection = "AzureWebJobsStorage")] string msg) { public async Async.Task Run([QueueTrigger("task-heartbeat", Connection = "AzureWebJobsStorage")] string msg) {
_logger.LogInformation($"heartbeat: {msg}"); _logger.LogInformation($"heartbeat: {msg}");
var hb = JsonSerializer.Deserialize<TaskHeartbeatEntry>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}"); var hb = JsonSerializer.Deserialize<TaskHeartbeatEntry>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}");

View File

@ -33,7 +33,7 @@ public partial class TimerProxy {
var proxies = await _proxYOperations.QueryAsync().ToListAsync(); var proxies = await _proxYOperations.QueryAsync().ToListAsync();
foreach (var proxy in proxies) { foreach (var proxy in proxies) {
if (VmStateHelper.Available().Contains(proxy.State)) { if (VmStateHelper.Available.Contains(proxy.State)) {
// Note, outdated checked at the start, but set at the end of this loop. // Note, outdated checked at the start, but set at the end of this loop.
// As this function is called via a timer, this works around a user // As this function is called via a timer, this works around a user
// requesting to use the proxy while this function is checking if it's // requesting to use the proxy while this function is checking if it's
@ -49,7 +49,7 @@ public partial class TimerProxy {
} }
} }
if (VmStateHelper.NeedsWork().Contains(proxy.State)) { if (VmStateHelper.NeedsWork.Contains(proxy.State)) {
_logger.Error($"scaleset-proxy: update state. proxy:{proxy.Region} state:{proxy.State}"); _logger.Error($"scaleset-proxy: update state. proxy:{proxy.Region} state:{proxy.State}");
await _proxYOperations.ProcessStateUpdate(proxy); await _proxYOperations.ProcessStateUpdate(proxy);
} }

View File

@ -25,7 +25,7 @@ public class TimerRepro {
var expiredVmIds = expired.Select(repro => repro?.VmId); var expiredVmIds = expired.Select(repro => repro?.VmId);
await foreach (var repro in _reproOperations.SearchStates(VmStateHelper.NeedsWork())) { await foreach (var repro in _reproOperations.SearchStates(VmStateHelper.NeedsWork)) {
if (await expiredVmIds.ContainsAsync(repro.VmId)) { if (await expiredVmIds.ContainsAsync(repro.VmId)) {
// this VM already got processed during the expired phase // this VM already got processed during the expired phase
continue; continue;

View File

@ -44,7 +44,7 @@ public class TimerTasks {
await _jobOperations.ProcessStateUpdates(job); await _jobOperations.ProcessStateUpdates(job);
} }
var tasks = _taskOperations.SearchStates(states: TaskStateHelper.NeedsWork()); var tasks = _taskOperations.SearchStates(states: TaskStateHelper.NeedsWork);
await foreach (var task in tasks) { await foreach (var task in tasks) {
_logger.Info($"update task: {task.TaskId}"); _logger.Info($"update task: {task.TaskId}");
await _taskOperations.ProcessStateUpdate(task); await _taskOperations.ProcessStateUpdate(task);

View File

@ -134,7 +134,7 @@ public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations {
} }
var scaleset = scalesetResult.OkV!; var scaleset = scalesetResult.OkV!;
if (!ScalesetStateHelper.Available().Contains(scaleset.State)) { if (!ScalesetStateHelper.Available.Contains(scaleset.State)) {
_logTracer.Info($"can_process_new_work scaleset not available for work. scaleset_id:{node.ScalesetId} machine_id:{node.MachineId}"); _logTracer.Info($"can_process_new_work scaleset not available for work. scaleset_id:{node.ScalesetId} machine_id:{node.MachineId}");
return false; return false;
} }
@ -147,7 +147,7 @@ public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations {
} }
var pool = poolResult.OkV!; var pool = poolResult.OkV!;
if (!PoolStateHelper.Available().Contains(pool.State)) { if (!PoolStateHelper.Available.Contains(pool.State)) {
_logTracer.Info($"can_schedule - pool is not available for work. pool_name:{node.PoolName} machine_id:{node.MachineId}"); _logTracer.Info($"can_schedule - pool is not available for work. pool_name:{node.PoolName} machine_id:{node.MachineId}");
return false; return false;
} }

View File

@ -78,7 +78,7 @@ public class NotificationOperations : Orm<Notification>, INotificationOperations
public IAsyncEnumerable<(Task, IEnumerable<string>)> GetQueueTasks() { public IAsyncEnumerable<(Task, IEnumerable<string>)> GetQueueTasks() {
// Nullability mismatch: We filter tuples where the containers are null // Nullability mismatch: We filter tuples where the containers are null
return _context.TaskOperations.SearchStates(states: TaskStateHelper.Available()) return _context.TaskOperations.SearchStates(states: TaskStateHelper.Available)
.Select(task => (task, _context.TaskOperations.GetInputContainerQueues(task.Config))) .Select(task => (task, _context.TaskOperations.GetInputContainerQueues(task.Config)))
.Where(taskTuple => taskTuple.Item2 != null)!; .Where(taskTuple => taskTuple.Item2 != null)!;
} }

View File

@ -40,7 +40,7 @@ public class ProxyOperations : StatefulOrm<Proxy, VmState>, IProxyOperations {
continue; continue;
} }
if (!VmStateHelper.Available().Contains(proxy.State)) { if (!VmStateHelper.Available.Contains(proxy.State)) {
continue; continue;
} }
return proxy; return proxy;
@ -71,7 +71,7 @@ public class ProxyOperations : StatefulOrm<Proxy, VmState>, IProxyOperations {
} }
public bool IsOutdated(Proxy proxy) { public bool IsOutdated(Proxy proxy) {
if (!VmStateHelper.Available().Contains(proxy.State)) { if (!VmStateHelper.Available.Contains(proxy.State)) {
return false; return false;
} }

View File

@ -58,7 +58,7 @@ public class Scheduler : IScheduler {
} }
private async Async.Task<bool> ScheduleWorkset(WorkSet workSet, Pool pool, int count) { private async Async.Task<bool> ScheduleWorkset(WorkSet workSet, Pool pool, int count) {
if (!PoolStateHelper.Available().Contains(pool.State)) { if (!PoolStateHelper.Available.Contains(pool.State)) {
_logTracer.Info($"pool not available for work: {pool.Name} state: {pool.State}"); _logTracer.Info($"pool not available for work: {pool.Name} state: {pool.State}");
return false; return false;
} }

View File

@ -71,19 +71,19 @@ public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations {
} }
public async Async.Task MarkStopping(Task task) { public async Async.Task MarkStopping(Task task) {
if (TaskStateHelper.ShuttingDown().Contains(task.State)) { if (TaskStateHelper.ShuttingDown.Contains(task.State)) {
_logTracer.Verbose($"ignoring post - task stop calls to stop {task.JobId}:{task.TaskId}"); _logTracer.Verbose($"ignoring post - task stop calls to stop {task.JobId}:{task.TaskId}");
return; return;
} }
if (TaskStateHelper.HasStarted().Contains(task.State)) { if (TaskStateHelper.HasStarted.Contains(task.State)) {
await MarkFailed(task, new Error(Code: ErrorCode.TASK_FAILED, Errors: new[] { "task never started" })); await MarkFailed(task, new Error(Code: ErrorCode.TASK_FAILED, Errors: new[] { "task never started" }));
} }
} }
public async Async.Task MarkFailed(Task task, Error error, List<Task>? taskInJob = null) { public async Async.Task MarkFailed(Task task, Error error, List<Task>? taskInJob = null) {
if (TaskStateHelper.ShuttingDown().Contains(task.State)) { if (TaskStateHelper.ShuttingDown.Contains(task.State)) {
_logTracer.Verbose( _logTracer.Verbose(
$"ignoring post-task stop failures for {task.JobId}:{task.TaskId}" $"ignoring post-task stop failures for {task.JobId}:{task.TaskId}"
); );
@ -210,7 +210,7 @@ public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations {
return false; return false;
} }
if (!TaskStateHelper.HasStarted().Contains(t.State)) { if (!TaskStateHelper.HasStarted.Contains(t.State)) {
return false; return false;
} }
} }