Timer workers (part 1) (#1863)

Co-authored-by: stas <statis@microsoft.com>
This commit is contained in:
Stas
2022-04-30 09:04:45 -07:00
committed by GitHub
parent fed6069f12
commit 732400be1d
16 changed files with 593 additions and 63 deletions

View File

@ -8,7 +8,7 @@ namespace Microsoft.OneFuzz.Service;
public interface ILog {
void Log(Guid correlationId, String message, SeverityLevel level, IReadOnlyDictionary<string, string> tags, string? caller);
void LogEvent(Guid correlationId, String evt, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller);
void LogException(Guid correlationId, Exception ex, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller);
void LogException(Guid correlationId, Exception ex, string message, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller);
void Flush();
}
@ -37,7 +37,7 @@ class AppInsights : ILog {
_telemetryClient.TrackEvent(evt, properties: copyTags, metrics: copyMetrics);
}
public void LogException(Guid correlationId, Exception ex, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller) {
public void LogException(Guid correlationId, Exception ex, string message, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller) {
Dictionary<string, string> copyTags = new(tags);
copyTags["Correlation ID"] = correlationId.ToString();
if (caller is not null) copyTags["CalledBy"] = caller;
@ -47,6 +47,8 @@ class AppInsights : ILog {
copyMetrics = new(metrics);
}
_telemetryClient.TrackException(ex, copyTags, copyMetrics);
Log(correlationId, $"{message} : {ex.Message}", SeverityLevel.Error, tags, caller);
}
public void Flush() {
@ -89,8 +91,8 @@ class Console : ILog {
LogTags(correlationId, tags);
LogMetrics(correlationId, metrics);
}
public void LogException(Guid correlationId, Exception ex, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller) {
System.Console.Out.WriteLine($"[{correlationId}][Exception] {ex}");
public void LogException(Guid correlationId, Exception ex, string message, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller) {
System.Console.Out.WriteLine($"[{correlationId}][Exception] {message}:{ex}");
LogTags(correlationId, tags);
LogMetrics(correlationId, metrics);
}
@ -105,7 +107,7 @@ public interface ILogTracer {
void Critical(string message);
void Error(string message);
void Event(string evt, IReadOnlyDictionary<string, double>? metrics);
void Exception(Exception ex, IReadOnlyDictionary<string, double>? metrics = null);
void Exception(Exception ex, string message = "", IReadOnlyDictionary<string, double>? metrics = null);
void ForceFlush();
void Info(string message);
void Warning(string message);
@ -238,10 +240,10 @@ public class LogTracer : ILogTracerInternal {
}
}
public void Exception(Exception ex, IReadOnlyDictionary<string, double>? metrics) {
public void Exception(Exception ex, string message, IReadOnlyDictionary<string, double>? metrics) {
var caller = GetCaller();
foreach (var logger in _loggers) {
logger.LogException(CorrelationId, ex, Tags, metrics, caller);
logger.LogException(CorrelationId, ex, message, Tags, metrics, caller);
}
}

View File

@ -121,9 +121,9 @@ public enum JobState {
}
public static class JobStateHelper {
private static readonly HashSet<JobState> _shuttingDown = new HashSet<JobState>(new[] { JobState.Stopping, JobState.Stopped });
private static readonly HashSet<JobState> _avaiable = new HashSet<JobState>(new[] { JobState.Init, JobState.Enabled });
private static readonly HashSet<JobState> _needsWork = new HashSet<JobState>(new[] { JobState.Init, JobState.Stopping });
private static readonly IReadOnlySet<JobState> _shuttingDown = new HashSet<JobState>(new[] { JobState.Stopping, JobState.Stopped });
private static readonly IReadOnlySet<JobState> _avaiable = new HashSet<JobState>(new[] { JobState.Init, JobState.Enabled });
private static readonly IReadOnlySet<JobState> _needsWork = new HashSet<JobState>(new[] { JobState.Init, JobState.Stopping });
public static IReadOnlySet<JobState> Available => _avaiable;
public static IReadOnlySet<JobState> NeedsWork => _needsWork;
@ -354,3 +354,33 @@ public enum AgentMode {
Repro,
Proxy
}
/// <summary>
/// States a node can be in.
/// </summary>
public enum NodeState {
    Init,
    Free,
    SettingUp,
    Rebooting,
    Ready,
    Busy,
    Done,
    Shutdown,
    Halt,
}

/// <summary>
/// Named, read-only groups of <see cref="NodeState"/> values.
/// Each group is built once and the same instance is returned on every access.
/// </summary>
public static class NodeStateHelper {
    /// <summary>The states Done, Shutdown and Halt.</summary>
    public static IReadOnlySet<NodeState> NeedsWork { get; } =
        new HashSet<NodeState> { NodeState.Done, NodeState.Shutdown, NodeState.Halt };

    /// <summary>If Node is in one of these states, ignore updates from the agent.</summary>
    public static IReadOnlySet<NodeState> ReadyForReset { get; } =
        new HashSet<NodeState> { NodeState.Done, NodeState.Shutdown, NodeState.Halt };

    /// <summary>The single state Free.</summary>
    public static IReadOnlySet<NodeState> CanProcessNewWork { get; } =
        new HashSet<NodeState> { NodeState.Free };
}

View File

@ -60,6 +60,7 @@ public abstract record BaseEvent() {
EventScalesetFailed _ => EventType.ScalesetFailed,
EventScalesetResizeScheduled _ => EventType.ScalesetResizeScheduled,
EventScalesetStateUpdated _ => EventType.ScalesetStateUpdated,
EventNodeDeleted _ => EventType.NodeDeleted,
_ => throw new NotImplementedException(),
};
@ -81,7 +82,10 @@ public abstract record BaseEvent() {
EventType.TaskFailed => typeof(EventTaskFailed),
EventType.TaskStopped => typeof(EventTaskStopped),
EventType.TaskStateUpdated => typeof(EventTaskStateUpdated),
EventType.ScalesetFailed => typeof(EventScalesetFailed),
EventType.ScalesetResizeScheduled => typeof(EventScalesetResizeScheduled),
EventType.ScalesetStateUpdated => typeof(EventScalesetStateUpdated),
EventType.NodeDeleted => typeof(EventNodeDeleted),
_ => throw new ArgumentException($"invalid input {eventType}"),
};
@ -102,7 +106,7 @@ public record EventTaskStopped(
) : BaseEvent();
record EventTaskFailed(
public record EventTaskFailed(
Guid JobId,
Guid TaskId,
Error Error,
@ -118,14 +122,14 @@ record EventTaskFailed(
// ) : BaseEvent();
record JobTaskStopped(
public record JobTaskStopped(
Guid TaskId,
TaskType TaskType,
Error? Error
) : BaseEvent();
record EventJobStopped(
public record EventJobStopped(
Guid JobId,
JobConfig Config,
UserInfo? UserInfo,
@ -141,7 +145,7 @@ record EventJobStopped(
// ) : BaseEvent();
record EventTaskStateUpdated(
public record EventTaskStateUpdated(
Guid JobId,
Guid TaskId,
TaskState State,
@ -245,11 +249,11 @@ public record EventNodeHeartbeat(
) : BaseEvent();
// record EventNodeDeleted(
// Guid MachineId,
// Guid ScalesetId,
// PoolName PoolName
// ) : BaseEvent();
/// <summary>
/// Event describing a deleted node: its machine id, its scaleset id
/// (nullable) and the name of its pool.
/// </summary>
public record EventNodeDeleted(
Guid MachineId,
Guid? ScalesetId,
PoolName PoolName
) : BaseEvent();
public record EventScalesetStateUpdated(

View File

@ -70,19 +70,8 @@ public record NodeTasks
Guid MachineId,
Guid TaskId,
NodeTaskState State = NodeTaskState.Init
);
) : StatefulEntityBase<NodeTaskState>(State);
public enum NodeState {
Init,
Free,
SettingUp,
Rebooting,
Ready,
Busy,
Done,
Shutdown,
Halt,
}
public record ProxyHeartbeat
(

View File

@ -88,6 +88,9 @@ public class Program {
.AddScoped<ILogAnalytics, LogAnalytics>()
.AddScoped<IExtensions, Extensions>()
.AddScoped<IVmssOperations, VmssOperations>()
.AddScoped<INodeTasksOperations, NodeTasksOperations>()
.AddScoped<INodeMessageOperations, NodeMessageOperations>()
.AddSingleton<ICreds, Creds>()
.AddSingleton<IServiceConfig, ServiceConfiguration>()
.AddHttpClient()

View File

@ -0,0 +1,40 @@
namespace Microsoft.OneFuzz.Service;
/// <summary>
/// Holds the per-scaleset work of the timer worker. The timer-triggered entry
/// point itself is still commented out at the bottom of the class.
/// </summary>
public class TimerWorkers {
    private readonly ILogTracer _log;
    private readonly IScalesetOperations _scaleSetOps;

    public TimerWorkers(ILogTracer log, IScalesetOperations scaleSetOps) {
        _scaleSetOps = scaleSetOps;
        _log = log;
    }

    // Logs which scaleset is being checked, then asks the scaleset operations
    // to update its configs.
    // NOTE(review): the result of UpdateConfigs is discarded here; if it
    // returns a task it is never awaited - confirm against IScalesetOperations.
    private void ProcessScaleSets(Scaleset scaleset) {
        var scalesetId = scaleset.ScalesetId;
        _log.Verbose($"checking scaleset for updates: {scalesetId}");
        _scaleSetOps.UpdateConfigs(scaleset);
    }

    //public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) {
    // NOTE: Update pools first, so that scalesets impacted by pool updates
    // (such as shutdown or resize) are handled during this iteration of
    // `timer_worker` rather than the following one.

    // NOTE: Nodes and Scalesets should be processed in a consistent order
    // during 'pool scale down' operations. This means that pools that are
    // scaling down will more likely remove from the same scalesets over time.
    // By more likely removing from the same scalesets, we are more likely to
    // get to empty scalesets, which can safely be deleted.
    //}
}

View File

@ -1,5 +1,4 @@
using System.Text;
using System.Text.Json;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
@ -32,8 +31,7 @@ namespace Microsoft.OneFuzz.Service {
public async Async.Task QueueSignalrEvent(EventMessage eventMessage) {
var message = new SignalREvent("events", new List<EventMessage>() { eventMessage });
var encodedMessage = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));
await _queue.SendMessage("signalr-events", encodedMessage, StorageType.Config);
await _queue.SendMessage("signalr-events", JsonSerializer.Serialize(message), StorageType.Config);
}
public async Async.Task SendEvent(BaseEvent anEvent) {

View File

@ -1,17 +1,45 @@
using System.Threading.Tasks;
using System.Text.Json;
using System.Threading.Tasks;
using ApiService.OneFuzzLib.Orm;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service;
public interface INodeOperations : IStatefulOrm<Node, NodeState> {
Task<Node?> GetByMachineId(Guid machineId);
IAsyncEnumerable<Node> SearchStates(Guid? poolId = default,
Guid? scaleSetId = default,
IList<NodeState>? states = default,
string? poolName = default,
bool excludeUpdateScheduled = false,
int? numResults = default);
new Async.Task Delete(Node node);
}
public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations {
public NodeOperations(IStorage storage, ILogTracer log, IServiceConfig config)
private readonly INodeTasksOperations _nodeTasksOps;
private readonly ITaskOperations _taskOps;
private readonly INodeMessageOperations _nodeMessageOps;
private readonly IEvents _events;
public NodeOperations(
IStorage storage,
ILogTracer log,
IServiceConfig config,
ITaskOperations taskOps,
INodeTasksOperations nodeTasksOps,
INodeMessageOperations nodeMessageOps,
IEvents events
)
: base(storage, log, config) {
_taskOps = taskOps;
_nodeTasksOps = nodeTasksOps;
_nodeMessageOps = nodeMessageOps;
_events = events;
}
public async Task<Node?> GetByMachineId(Guid machineId) {
@ -20,4 +48,174 @@ public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations {
return await data.FirstOrDefaultAsync();
}
/// <summary>
/// Builds the table filter string used to search for nodes.
/// </summary>
/// <param name="oneFuzzVersion">Version string; the filter always selects nodes whose version is NOT equal to it.</param>
/// <param name="poolId">When set, restricts the filter to this pool id.</param>
/// <param name="scaleSetId">When set, restricts the filter to this scaleset id.</param>
/// <param name="states">When set and non-empty, restricts the filter to any of these states.</param>
/// <param name="poolName">When set, restricts the filter to this pool name.</param>
/// <param name="excludeUpdateScheduled">When true, excludes nodes with a reimage or delete request.</param>
/// <param name="numResults">Currently not used when building the filter.</param>
/// <returns>All filter parts joined with "and".</returns>
public static string SearchStatesQuery(
    string oneFuzzVersion,
    Guid? poolId = default,
    Guid? scaleSetId = default,
    IEnumerable<NodeState>? states = default,
    string? poolName = default,
    bool excludeUpdateScheduled = false,
    int? numResults = default) {

    List<string> queryParts = new();

    if (poolId is not null) {
        queryParts.Add($"(pool_id eq '{poolId}')");
    }

    if (poolName is not null) {
        // Previously this argument was silently ignored. Single quotes are
        // doubled so a quote in the name cannot terminate the string literal.
        var escapedPoolName = poolName.Replace("'", "''");
        queryParts.Add($"(pool_name eq '{escapedPoolName}')");
    }

    if (scaleSetId is not null) {
        queryParts.Add($"(scaleset_id eq '{scaleSetId}')");
    }

    if (states is not null) {
        // Materialize once: the sequence is both counted and enumerated.
        List<string> convertedStates = states
            .Select(x => JsonSerializer.Serialize(x, EntityConverter.GetJsonSerializerOptions()).Trim('"'))
            .ToList();

        // An empty list would otherwise produce the invalid fragment "(())".
        if (convertedStates.Count > 0) {
            var q = Query.EqualAny("state", convertedStates);
            queryParts.Add($"({q})");
        }
    }

    if (excludeUpdateScheduled) {
        queryParts.Add($"reimage_requested eq false");
        queryParts.Add($"delete_requested eq false");
    }

    // An Azure table query always returns false when the column does not exist.
    // The query is written this way so that we get the nodes where the
    // version is not defined as well as the nodes with a mismatched version.
    var versionQuery = $"not (version eq '{oneFuzzVersion}')";
    queryParts.Add(versionQuery);

    return Query.And(queryParts);
}
/// <summary>
/// Builds the filter with <see cref="SearchStatesQuery"/> for the configured
/// OneFuzz version and runs it against the node table.
/// </summary>
public IAsyncEnumerable<Node> SearchStates(
    Guid? poolId = default,
    Guid? scaleSetId = default,
    IList<NodeState>? states = default,
    string? poolName = default,
    bool excludeUpdateScheduled = false,
    int? numResults = default) {

    var filter = SearchStatesQuery(
        oneFuzzVersion: _config.OneFuzzVersion,
        poolId: poolId,
        scaleSetId: scaleSetId,
        states: states,
        poolName: poolName,
        excludeUpdateScheduled: excludeUpdateScheduled,
        numResults: numResults);

    return QueryAsync(filter);
}
/// <summary>
/// Marks every task recorded for the node as failed and, unless the node is
/// flagged with DebugKeepNode, removes the corresponding node-task entry.
/// </summary>
/// <param name="node">Node whose task entries are processed.</param>
/// <param name="error">Error to record on the tasks; a default TASK_FAILED error naming the machine id is used when null.</param>
public async Async.Task MarkTasksStoppedEarly(Node node, Error? error = null) {
    error ??= new Error(ErrorCode.TASK_FAILED, new[] { $"node reimaged during task execution. machine_id: {node.MachineId}" });

    await foreach (var entry in _nodeTasksOps.GetByMachineId(node.MachineId)) {
        var task = await _taskOps.GetByTaskId(entry.TaskId);
        if (task is not null) {
            await _taskOps.MarkFailed(task, error);
        }

        if (!node.DebugKeepNode) {
            // Delete the node-task entry, not the node. Deleting the node here
            // re-entered Delete(node), which calls this method again and
            // recursed without end for any node that still had task entries.
            await _nodeTasksOps.Delete(entry);
        }
    }
}
/// <summary>
/// Deletes a node: fails its tasks, clears its task entries and queued
/// messages, removes the node entity, then emits an EventNodeDeleted event.
/// </summary>
public new async Async.Task Delete(Node node) {
    var machineId = node.MachineId;

    // Tear down everything that refers to the node before removing it.
    await MarkTasksStoppedEarly(node);
    await _nodeTasksOps.ClearByMachineId(machineId);
    await _nodeMessageOps.ClearMessages(machineId);

    await base.Delete(node);

    var deleted = new EventNodeDeleted(machineId, node.ScalesetId, node.PoolName);
    await _events.SendEvent(deleted);
}
}
/// <summary>
/// Operations on the node-task entries that link a machine id to a task id.
/// </summary>
public interface INodeTasksOperations : IStatefulOrm<NodeTasks, NodeTaskState> {
/// <summary>Yields the nodes, looked up through <paramref name="nodeOps"/>, that have an entry for the given task.</summary>
IAsyncEnumerable<Node> GetNodesByTaskId(Guid taskId, INodeOperations nodeOps);
/// <summary>Yields one assignment (machine id, scaleset id, entry state) per node that has an entry for the given task.</summary>
IAsyncEnumerable<NodeAssignment> GetNodeAssignments(Guid taskId, INodeOperations nodeOps);
/// <summary>Yields the entries recorded for the given machine.</summary>
IAsyncEnumerable<NodeTasks> GetByMachineId(Guid machineId);
/// <summary>Yields the entries recorded for the given task.</summary>
IAsyncEnumerable<NodeTasks> GetByTaskId(Guid taskId);
/// <summary>Deletes every entry recorded for the given machine.</summary>
Async.Task ClearByMachineId(Guid machineId);
}
/// <summary>
/// Table-backed implementation of <see cref="INodeTasksOperations"/>.
/// </summary>
public class NodeTasksOperations : StatefulOrm<NodeTasks, NodeTaskState>, INodeTasksOperations {

    private readonly ILogTracer _log;

    public NodeTasksOperations(IStorage storage, ILogTracer log, IServiceConfig config)
        : base(storage, log, config) {
        _log = log;
    }

    //TODO: suggested by Cheick: this can probably be optimized by querying all NodeTasks, then querying all machines in a single request
    /// <summary>
    /// Yields every node that has an entry for <paramref name="taskId"/>;
    /// entries whose machine cannot be found through <paramref name="nodeOps"/> are skipped.
    /// </summary>
    public async IAsyncEnumerable<Node> GetNodesByTaskId(Guid taskId, INodeOperations nodeOps) {
        await foreach (var entry in QueryAsync($"task_id eq '{taskId}'")) {
            var node = await nodeOps.GetByMachineId(entry.MachineId);
            if (node is not null) {
                yield return node;
            }
        }
    }

    /// <summary>
    /// Yields a <see cref="NodeAssignment"/> (machine id, scaleset id, entry state)
    /// for every entry of <paramref name="taskId"/> whose node can be found.
    /// </summary>
    public async IAsyncEnumerable<NodeAssignment> GetNodeAssignments(Guid taskId, INodeOperations nodeOps) {
        await foreach (var entry in QueryAsync($"task_id eq '{taskId}'")) {
            var node = await nodeOps.GetByMachineId(entry.MachineId);
            if (node is not null) {
                var nodeAssignment = new NodeAssignment(node.MachineId, node.ScalesetId, entry.State);
                yield return nodeAssignment;
            }
        }
    }

    /// <summary>Yields the entries recorded for the given machine.</summary>
    public IAsyncEnumerable<NodeTasks> GetByMachineId(Guid machineId) {
        // Column name fixed: it was misspelled "macine_id", so the filter
        // could never match and ClearByMachineId never deleted anything.
        return QueryAsync($"machine_id eq '{machineId}'");
    }

    /// <summary>Yields the entries recorded for the given task.</summary>
    public IAsyncEnumerable<NodeTasks> GetByTaskId(Guid taskId) {
        return QueryAsync($"task_id eq '{taskId}'");
    }

    /// <summary>
    /// Deletes every entry recorded for the given machine. A failed deletion
    /// is logged and does not stop the remaining deletions.
    /// </summary>
    public async Async.Task ClearByMachineId(Guid machineId) {
        _log.Info($"clearing tasks for node {machineId}");

        await foreach (var entry in GetByMachineId(machineId)) {
            var res = await Delete(entry);
            if (!res.IsOk) {
                _log.Error($"failed to delete node task entry for machine_id: {entry.MachineId} due to [{res.ErrorV.Item1}] {res.ErrorV.Item2}");
            }
        }
    }
}
// This type isn't anticipated to be needed by the client, which is why it is
// not part of onefuzztypes.
/// <summary>
/// A command stored for a node. The machine id is the partition key and the
/// message id is the row key.
/// </summary>
public record NodeMessage(
[PartitionKey] Guid MachineId,
[RowKey] string MessageId,
NodeCommand Message
) : EntityBase;
/// <summary>
/// Operations on the messages stored per node.
/// </summary>
public interface INodeMessageOperations : IOrm<NodeMessage> {
/// <summary>Yields the messages stored for the given machine.</summary>
IAsyncEnumerable<NodeMessage> GetMessage(Guid machineId);
/// <summary>Deletes the messages stored for the given machine.</summary>
Async.Task ClearMessages(Guid machineId);
}
/// <summary>
/// Table-backed implementation of <see cref="INodeMessageOperations"/>.
/// </summary>
public class NodeMessageOperations : Orm<NodeMessage>, INodeMessageOperations {

    private readonly ILogTracer _log;

    public NodeMessageOperations(IStorage storage, ILogTracer log, IServiceConfig config)
        : base(storage, log, config) {
        _log = log;
    }

    /// <summary>Queries the messages whose machine id equals <paramref name="machineId"/>.</summary>
    public IAsyncEnumerable<NodeMessage> GetMessage(Guid machineId) =>
        QueryAsync($"machine_id eq '{machineId}'");

    /// <summary>
    /// Deletes every message of the machine one by one. A failed deletion is
    /// logged and the loop carries on with the next message.
    /// </summary>
    public async Async.Task ClearMessages(Guid machineId) {
        _log.Info($"clearing messages for node {machineId}");

        await foreach (var nodeMessage in GetMessage(machineId)) {
            var outcome = await Delete(nodeMessage);
            if (outcome.IsOk) {
                continue;
            }
            _log.Error($"failed to delete message for node {machineId} due to [{outcome.ErrorV.Item1}] {outcome.ErrorV.Item2}");
        }
    }
}

View File

@ -65,7 +65,7 @@ public class NotificationOperations : Orm<Notification>, INotificationOperations
if (containers.Contains(container.ContainerName)) {
_logTracer.Info($"queuing input {container.ContainerName} {filename} {task.TaskId}");
var url = _containers.GetFileSasUrl(container, filename, StorageType.Corpus, BlobSasPermissions.Read | BlobSasPermissions.Delete);
await _queue.SendMessage(task.TaskId.ToString(), System.Text.Encoding.UTF8.GetBytes(url?.ToString() ?? ""), StorageType.Corpus);
await _queue.SendMessage(task.TaskId.ToString(), url?.ToString() ?? "", StorageType.Corpus);
}
}

View File

@ -1,5 +1,6 @@
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Storage;
using Azure.Storage.Queues;
using Azure.Storage.Sas;
@ -7,9 +8,15 @@ using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service;
public interface IQueue {
Async.Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null);
Async.Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null);
Async.Task SendMessage(string name, string message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null);
Async.Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null);
Async.Task<Uri?> GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration = null);
ResourceIdentifier GeResourceId(string queueName, StorageType storageType);
Task<IList<T>> PeekQueue<T>(string name, StorageType storageType);
Async.Task<bool> RemoveFirstMessage(string name, StorageType storageType);
Async.Task ClearQueue(string name, StorageType storageType);
Async.Task DeleteQueue(string name, StorageType storageType);
Async.Task CreateQueue(string name, StorageType storageType);
}
@ -25,41 +32,123 @@ public class Queue : IQueue {
}
public async Async.Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) {
var queue = await GetQueue(name, storageType);
if (queue != null) {
await queue.SendMessageAsync(Convert.ToBase64String(message), visibilityTimeout: visibilityTimeout, timeToLive: timeToLive);
public async Async.Task SendMessage(string name, string message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) {
var queue = await GetQueueClient(name, storageType);
try {
await queue.SendMessageAsync(message, visibilityTimeout: visibilityTimeout, timeToLive: timeToLive);
} catch (Exception ex) {
_log.Exception(ex, $"Failed to send message {message}");
throw;
}
}
public async Task<QueueClient?> GetQueue(string name, StorageType storageType) {
var client = await GetQueueClient(storageType);
public async Task<QueueClient> GetQueueClient(string name, StorageType storageType) {
var client = await GetQueueClientService(storageType);
return client.GetQueueClient(name);
}
public async Task<QueueServiceClient> GetQueueClient(StorageType storageType) {
public async Task<QueueServiceClient> GetQueueClientService(StorageType storageType) {
var accountId = _storage.GetPrimaryAccount(storageType);
//_logger.LogDEbug("getting blob container (account_id: %s)", account_id)
_log.Verbose($"getting blob container (account_id: {accountId})");
var (name, key) = await _storage.GetStorageAccountNameAndKey(accountId);
var accountUrl = new Uri($"https://{name}.queue.core.windows.net");
var client = new QueueServiceClient(accountUrl, new StorageSharedKeyCredential(name, key));
return client;
var options = new QueueClientOptions { MessageEncoding = QueueMessageEncoding.Base64 };
return new QueueServiceClient(accountUrl, new StorageSharedKeyCredential(name, key), options);
}
public async Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout) {
var queue = await GetQueue(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}");
public async Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) {
var queueClient = await GetQueueClient(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}");
try {
var serialized = JsonSerializer.Serialize(obj, EntityConverter.GetJsonSerializerOptions());
//var encoded = Encoding.UTF8.GetBytes(serialized);
var response = await queue.SendMessageAsync(serialized, visibilityTimeout: visibilityTimeout);
return !response.GetRawResponse().IsError;
var res = await queueClient.SendMessageAsync(serialized, visibilityTimeout: visibilityTimeout, timeToLive);
if (res.GetRawResponse().IsError) {
_log.Error($"Failed to send message {serialized} in queue {name} due to {res.GetRawResponse().ReasonPhrase}");
return false;
} else {
return true;
}
} catch (Exception ex) {
_log.Exception(ex, $"Failed to queue message in queue {name}");
return false;
}
}
public async Task<Uri?> GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration) {
var queue = await GetQueue(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}");
var queue = await GetQueueClient(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}");
var sasaBuilder = new QueueSasBuilder(permissions, DateTimeOffset.UtcNow + (duration ?? DEFAULT_DURATION));
var url = queue.GenerateSasUri(sasaBuilder);
return url;
}
/// <summary>
/// Creates the named queue in the given storage type if it does not exist yet.
/// An error response is logged, not thrown.
/// </summary>
public async Async.Task CreateQueue(string name, StorageType storageType) {
    var client = await GetQueueClient(name, storageType);
    var resp = await client.CreateIfNotExistsAsync();

    // CreateIfNotExistsAsync returns null when the queue already exists, so
    // the response must be null-checked before it is inspected.
    if (resp is not null && resp.IsError) {
        _log.Error($"failed to create queue {name} due to {resp.ReasonPhrase}");
    }
}
/// <summary>
/// Deletes the named queue in the given storage type if it exists.
/// An error response is logged, not thrown.
/// </summary>
public async Async.Task DeleteQueue(string name, StorageType storageType) {
    var client = await GetQueueClient(name, storageType);
    var resp = await client.DeleteIfExistsAsync();

    // When the queue did not exist there may be no underlying HTTP response
    // attached to the result, so guard against a null raw response.
    var raw = resp.GetRawResponse();
    if (raw is not null && raw.IsError) {
        _log.Error($"failed to delete queue {name} due to {raw.ReasonPhrase}");
    }
}
/// <summary>
/// Removes all messages from the named queue; an error response is logged.
/// </summary>
public async Async.Task ClearQueue(string name, StorageType storageType) {
    var queueClient = await GetQueueClient(name, storageType);
    var response = await queueClient.ClearMessagesAsync();

    if (!response.IsError) {
        return;
    }

    _log.Error($"failed to clear the queue {name} due to {response.ReasonPhrase}");
}
/// <summary>
/// Receives messages from the named queue and deletes the first one received.
/// </summary>
/// <returns>
/// true when a message was received and deleted; false when nothing was
/// received or the delete reported an error (which is logged).
/// </returns>
public async Async.Task<bool> RemoveFirstMessage(string name, StorageType storageType) {
    var queueClient = await GetQueueClient(name, storageType);
    var received = await queueClient.ReceiveMessagesAsync();

    // Only the first received message is ever handled.
    foreach (var first in received.Value) {
        var deletion = await queueClient.DeleteMessageAsync(first.MessageId, first.PopReceipt);
        if (!deletion.IsError) {
            return true;
        }

        _log.Error($"failed to delete message from the queue {name} due to {deletion.ReasonPhrase}");
        return false;
    }

    return false;
}
/// <summary>
/// Peeks up to the client's maximum number of peekable messages from the named
/// queue and deserializes each body as JSON into <typeparamref name="T"/>.
/// </summary>
/// <returns>
/// The deserialized messages; bodies that deserialize to null are left out.
/// The list is empty when there is no response or the response is an error
/// (the error is logged).
/// </returns>
public async Task<IList<T>> PeekQueue<T>(string name, StorageType storageType) {
    var queueClient = await GetQueueClient(name, storageType);
    var items = new List<T>();

    var peeked = await queueClient.PeekMessagesAsync(queueClient.MaxPeekableMessages);
    if (peeked is null) {
        return items;
    }

    if (peeked.GetRawResponse().IsError) {
        _log.Error($"failed to peek messages due to {peeked.GetRawResponse().ReasonPhrase}");
        return items;
    }

    foreach (var peekedMessage in peeked.Value) {
        var item = JsonSerializer.Deserialize<T>(peekedMessage.Body.ToString(), EntityConverter.GetJsonSerializerOptions());
        if (item is null) {
            continue;
        }
        items.Add(item);
    }

    return items;
}
/// <summary>
/// Builds the resource identifier of a queue by appending
/// "/services/queue/queues/{queueName}" to the primary account of the storage type.
/// </summary>
public ResourceIdentifier GeResourceId(string queueName, StorageType storageType) {
    var primaryAccount = _storage.GetPrimaryAccount(storageType);
    var resourcePath = $"{primaryAccount}/services/queue/queues/{queueName}";
    return new ResourceIdentifier(resourcePath);
}
}

View File

@ -17,14 +17,18 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState>, IScalese
IEvents _events;
IExtensions _extensions;
IVmssOperations _vmssOps;
IQueue _queue;
INodeOperations _nodeOps;
public ScalesetOperations(IStorage storage, ILogTracer log, IServiceConfig config, IPoolOperations poolOps, IEvents events, IExtensions extensions, IVmssOperations vmssOps)
public ScalesetOperations(IStorage storage, ILogTracer log, IServiceConfig config, IPoolOperations poolOps, IEvents events, IExtensions extensions, IVmssOperations vmssOps, IQueue queue, INodeOperations nodeOps)
: base(storage, log, config) {
_log = log;
_poolOps = poolOps;
_events = events;
_extensions = extensions;
_vmssOps = vmssOps;
_queue = queue;
_nodeOps = nodeOps;
}
public IAsyncEnumerable<Scaleset> Search() {
@ -87,7 +91,7 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState>, IScalese
var pool = await _poolOps.GetByName(scaleSet.PoolName);
if (!pool.IsOk || pool.OkV is null) {
_log.Error($"{SCALESET_LOG_PREFIX}: unable to find pool during config update. pool:{scaleSet.PoolName}, scaleset_id:{scaleSet.ScalesetId}");
_log.Error($"{SCALESET_LOG_PREFIX} unable to find pool during config update. pool:{scaleSet.PoolName}, scaleset_id:{scaleSet.ScalesetId}");
await SetFailed(scaleSet, pool.ErrorV!);
return;
}
@ -97,7 +101,44 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState>, IScalese
var res = await _vmssOps.UpdateExtensions(scaleSet.ScalesetId, extensions);
if (!res.IsOk) {
_log.Info($"{SCALESET_LOG_PREFIX}: unable to update configs {string.Join(',', res.ErrorV.Errors!)}");
_log.Info($"{SCALESET_LOG_PREFIX} unable to update configs {string.Join(',', res.ErrorV.Errors!)}");
}
}
/// <summary>
/// Halts a scaleset: deletes its shrink queue, then walks the nodes returned
/// by the node search for this scaleset and logs each one.
/// </summary>
public async Async.Task Halt(Scaleset scaleset) {
    var scalesetId = scaleset.ScalesetId;

    await new ShrinkQueue(scalesetId, _queue, _log).Delete();

    await foreach (var node in _nodeOps.SearchStates(scaleSetId: scalesetId)) {
        // NOTE(review): the node is only logged here; no delete call is made
        // yet even though the message says "deleting" - confirm intent.
        _log.Info($"{SCALESET_LOG_PREFIX} deleting node scaleset_id {scaleset.ScalesetId} machine_id {node.MachineId}");
    }
    //_nodeOps.
}
/// <summary>
/// Cleans up the nodes of a scaleset. Only the Halt state is handled so far;
/// any other state throws <see cref="NotImplementedException"/>.
/// </summary>
/// <param name="scaleSet">Scaleset whose nodes are cleaned up.</param>
/// <returns>true if scaleset got modified</returns>
public async Async.Task<bool> CleanupNodes(Scaleset scaleSet) {
    _log.Info($"{SCALESET_LOG_PREFIX} cleaning up nodes. scaleset_id {scaleSet.ScalesetId}");

    if (scaleSet.State != ScalesetState.Halt) {
        throw new NotImplementedException();
    }

    _log.Info($"{SCALESET_LOG_PREFIX} halting scaleset scaleset_id {scaleSet.ScalesetId}");
    await Halt(scaleSet);
    return true;
}
}

View File

@ -0,0 +1,58 @@
namespace Microsoft.OneFuzz.Service;
/// <summary>Payload of one shrink-queue message.</summary>
public record ShrinkEntry(Guid ShrinkId);

/// <summary>
/// A queue in Config storage named "to-shrink-{baseId}", with the id written
/// in "N" format.
/// </summary>
public class ShrinkQueue {
    private readonly Guid _baseId;
    private readonly IQueue _queueOps;
    private readonly ILogTracer _log;

    public ShrinkQueue(Guid baseId, IQueue queueOps, ILogTracer log) {
        _baseId = baseId;
        _queueOps = queueOps;
        _log = log;
    }

    /// <summary>Returns the queue name.</summary>
    public override string ToString() => $"to-shrink-{_baseId:N}";

    /// <summary>Name of the underlying queue; same value as <see cref="ToString"/>.</summary>
    public string QueueName => ToString();

    /// <summary>Removes all entries from the queue.</summary>
    public async Async.Task Clear() =>
        await _queueOps.ClearQueue(QueueName, StorageType.Config);

    /// <summary>Creates the queue.</summary>
    public async Async.Task Create() =>
        await _queueOps.CreateQueue(QueueName, StorageType.Config);

    /// <summary>Deletes the queue.</summary>
    public async Async.Task Delete() =>
        await _queueOps.DeleteQueue(QueueName, StorageType.Config);

    /// <summary>Queues one new entry with a fresh id.</summary>
    /// <returns>The result reported by the queue operation.</returns>
    public async Async.Task<bool> AddEntry() {
        var entry = new ShrinkEntry(Guid.NewGuid());
        return await _queueOps.QueueObject<ShrinkEntry>(QueueName, entry, StorageType.Config);
    }

    /// <summary>
    /// Clears the queue and then makes <paramref name="size"/> attempts to add
    /// an entry. A failed attempt is logged and still counts, so the queue can
    /// end up with fewer than <paramref name="size"/> entries.
    /// </summary>
    public async Async.Task SetSize(int size) {
        await Clear();

        for (var attempt = 0; attempt < size; attempt++) {
            if (await AddEntry()) {
                continue;
            }

            //TODO: retry after a delay ? I guess make a decision on this
            //if we hit this error message... For now just log and move on to
            //make it behave same as Python code.
            _log.Error($"failed to add entry to shrink queue");
        }
    }
}

View File

@ -14,6 +14,8 @@ public interface ITaskOperations : IStatefulOrm<Task, TaskState> {
IAsyncEnumerable<Task> SearchExpired();
Async.Task MarkStopping(Task task);
Async.Task MarkFailed(Task task, Error error, List<Task>? taskInJob = null);
Async.Task<TaskVm?> GetReproVmConfig(Task task);
Async.Task<bool> CheckPrereqTasks(Task task);
System.Threading.Tasks.Task<Pool?> GetPool(Task task);

View File

@ -0,0 +1,17 @@
namespace ApiService.OneFuzzLib.Orm {
    /// <summary>
    /// Small helpers for composing table filter strings.
    /// </summary>
    public static class Query {
        /// <summary>
        /// Wraps each query in parentheses and joins them with " or ".
        /// An empty sequence yields an empty string.
        /// </summary>
        public static string Or(IEnumerable<string> queries) {
            return string.Join(" or ", queries.Select(x => $"({x})"));
        }

        /// <summary>
        /// Wraps each query in parentheses and joins them with " and ".
        /// An empty sequence yields an empty string.
        /// </summary>
        public static string And(IEnumerable<string> queries) {
            return string.Join(" and ", queries.Select(x => $"({x})"));
        }

        /// <summary>
        /// Builds a filter matching entities whose <paramref name="property"/>
        /// equals any of <paramref name="values"/>. Values are emitted as
        /// single-quoted string literals.
        /// </summary>
        public static string EqualAny(string property, IEnumerable<string> values) {
            return Or(values.Select(x => $"{property} eq '{EscapeLiteral(x)}'"));
        }

        // Doubles single quotes so a quote inside a value cannot terminate the
        // surrounding string literal and corrupt the filter.
        private static string EscapeLiteral(string value) {
            return value.Replace("'", "''");
        }
    }
}

View File

@ -24,6 +24,13 @@ namespace Tests {
Arb.Generate<EventCrashReported>().Select(e => e as BaseEvent),
Arb.Generate<EventRegressionReported>().Select(e => e as BaseEvent),
Arb.Generate<EventFileAdded>().Select(e => e as BaseEvent),
Arb.Generate<EventTaskFailed>().Select(e => e as BaseEvent),
Arb.Generate<EventTaskStopped>().Select(e => e as BaseEvent),
Arb.Generate<EventTaskStateUpdated>().Select(e => e as BaseEvent),
Arb.Generate<EventScalesetFailed>().Select(e => e as BaseEvent),
Arb.Generate<EventScalesetResizeScheduled>().Select(e => e as BaseEvent),
Arb.Generate<EventScalesetStateUpdated>().Select(e => e as BaseEvent),
Arb.Generate<EventNodeDeleted>().Select(e => e as BaseEvent),
});
}
@ -47,6 +54,17 @@ namespace Tests {
));
}
/// <summary>
/// Generator for NodeTasks values built from a generated
/// (machine id, task id, state) tuple.
/// </summary>
public static Gen<NodeTasks> NodeTasks() {
    var tuples = Arb.Generate<Tuple<Guid, Guid, NodeTaskState>>();
    return tuples.Select(t => new NodeTasks(MachineId: t.Item1, TaskId: t.Item2, State: t.Item3));
}
public static Gen<Node> Node() {
return Arb.Generate<Tuple<Tuple<DateTimeOffset?, string, Guid?, Guid, NodeState>, Tuple<Guid?, DateTimeOffset, string, bool, bool, bool>>>().Select(
arg => new Node(
@ -311,6 +329,10 @@ namespace Tests {
return Arb.From(OrmGenerators.BaseEvent());
}
/// <summary>Arbitrary instance backed by the NodeTasks generator in OrmGenerators.</summary>
public static Arbitrary<NodeTasks> NodeTasks() {
    var generator = OrmGenerators.NodeTasks();
    return Arb.From(generator);
}
public static Arbitrary<Node> Node() {
return Arb.From(OrmGenerators.Node());
}

View File

@ -0,0 +1,37 @@
using System;
using Microsoft.OneFuzz.Service;
using Xunit;
namespace Tests {
/// <summary>
/// Checks the exact filter strings produced by NodeOperations.SearchStatesQuery.
/// </summary>
public class QueryTests {

    [Fact]
    public void NodeOperationsSearchStatesQuery() {
        const string version = "1.2.3";
        var poolId = Guid.Parse("3b0426d3-9bde-4ae8-89ac-4edf0d3b3618");
        var scaleSetId = Guid.Parse("4c96dd6b-9bdb-4758-9720-1010c244fa4b");
        var states = new[] { NodeState.Free, NodeState.Done, NodeState.Ready };

        // Version only.
        var versionOnly = NodeOperations.SearchStatesQuery(version);
        Assert.Equal("(not (version eq '1.2.3'))", versionOnly);

        // Pool id filter.
        var byPool = NodeOperations.SearchStatesQuery(version, poolId: poolId);
        Assert.Equal("((pool_id eq '3b0426d3-9bde-4ae8-89ac-4edf0d3b3618')) and (not (version eq '1.2.3'))", byPool);

        // Scaleset id filter.
        var byScaleset = NodeOperations.SearchStatesQuery(version, scaleSetId: scaleSetId);
        Assert.Equal("((scaleset_id eq '4c96dd6b-9bdb-4758-9720-1010c244fa4b')) and (not (version eq '1.2.3'))", byScaleset);

        // State filter.
        var byStates = NodeOperations.SearchStatesQuery(version, states: states);
        Assert.Equal("(((state eq 'free') or (state eq 'done') or (state eq 'ready'))) and (not (version eq '1.2.3'))", byStates);

        // Excluding nodes with a scheduled update.
        var notScheduled = NodeOperations.SearchStatesQuery(version, excludeUpdateScheduled: true);
        Assert.Equal("(reimage_requested eq false) and (delete_requested eq false) and (not (version eq '1.2.3'))", notScheduled);

        // All filters combined.
        var combined = NodeOperations.SearchStatesQuery(
            version,
            poolId: poolId,
            scaleSetId: scaleSetId,
            states: states,
            excludeUpdateScheduled: true);
        Assert.Equal("((pool_id eq '3b0426d3-9bde-4ae8-89ac-4edf0d3b3618')) and ((scaleset_id eq '4c96dd6b-9bdb-4758-9720-1010c244fa4b')) and (((state eq 'free') or (state eq 'done') or (state eq 'ready'))) and (reimage_requested eq false) and (delete_requested eq false) and (not (version eq '1.2.3'))", combined);
    }
}
}