From 9d8d3327d23e31c0023a42821a4768a6a612e6d7 Mon Sep 17 00:00:00 2001 From: Cheick Keita Date: Wed, 13 Apr 2022 17:19:44 -0700 Subject: [PATCH] Migrating QueueTaskHeartbeat (#1777) * Migrating QueueTaskHeartbeat * changing the name of the input queue * rename type alias Tasks to Async * Fix property casing * fixing types * Removing IStorageProvider * fix function name * address PR comments --- .../ApiService/OneFuzzTypes/Enums.cs | 63 +++++++ .../ApiService/OneFuzzTypes/Events.cs | 10 +- .../ApiService/OneFuzzTypes/Model.cs | 155 ++++++++++++++---- src/ApiService/ApiService/Program.cs | 8 +- src/ApiService/ApiService/QueueFileChanges.cs | 13 +- .../ApiService/QueueNodeHearbeat.cs | 3 +- .../ApiService/QueueProxyHeartbeat.cs | 3 +- .../ApiService/QueueTaskHearbeat.cs | 43 +++++ src/ApiService/ApiService/UserCredentials.cs | 3 +- .../ApiService/onefuzzlib/Events.cs | 9 +- src/ApiService/ApiService/onefuzzlib/Queue.cs | 6 +- .../ApiService/onefuzzlib/TaskOperations.cs | 28 ++++ .../onefuzzlib/WebhookOperations.cs | 9 +- .../onefuzzlib/orm/StorageProvider.cs | 53 ------ 14 files changed, 286 insertions(+), 120 deletions(-) create mode 100644 src/ApiService/ApiService/QueueTaskHearbeat.cs create mode 100644 src/ApiService/ApiService/onefuzzlib/TaskOperations.cs delete mode 100644 src/ApiService/ApiService/onefuzzlib/orm/StorageProvider.cs diff --git a/src/ApiService/ApiService/OneFuzzTypes/Enums.cs b/src/ApiService/ApiService/OneFuzzTypes/Enums.cs index e18a98243..45d7876a1 100644 --- a/src/ApiService/ApiService/OneFuzzTypes/Enums.cs +++ b/src/ApiService/ApiService/OneFuzzTypes/Enums.cs @@ -43,4 +43,67 @@ public enum WebhookMessageState Retrying, Succeeded, Failed +} + +public enum TaskState +{ + Init, + Waiting, + Scheduled, + Setting_up, + Running, + Stopping, + Stopped, + WaitJob +} + +public enum TaskType +{ + Coverage, + LibfuzzerFuzz, + LibfuzzerCoverage, + LibfuzzerCrashReport, + LibfuzzerMerge, + LibfuzzerRegression, + GenericAnalysis, + GenericSupervisor, + GenericMerge, + GenericGenerator, + GenericCrashReport, + GenericRegression +} + +public enum Os +{ + Windows, + Linux +} + +public enum ContainerType +{ + Analysis, + Coverage, + Crashes, + Inputs, + NoRepro, + ReadonlyInputs, + Reports, + Setup, + Tools, + UniqueInputs, + UniqueReports, + RegressionReports, + Logs +} + + +public enum StatsFormat +{ + AFL +} + +public enum TaskDebugFlag +{ + KeepNodeOnFailure, + KeepNodeOnCompletion, } \ No newline at end of file diff --git a/src/ApiService/ApiService/OneFuzzTypes/Events.cs b/src/ApiService/ApiService/OneFuzzTypes/Events.cs index 7da144eda..6f628b1b3 100644 --- a/src/ApiService/ApiService/OneFuzzTypes/Events.cs +++ b/src/ApiService/ApiService/OneFuzzTypes/Events.cs @@ -104,11 +104,11 @@ namespace Microsoft.OneFuzz.Service // ) : BaseEvent(); - //record EventTaskHeartbeat( - // JobId: Guid, - // TaskId: Guid, - // Config: TaskConfig - //): BaseEvent(); + record EventTaskHeartbeat( + Guid JobId, + Guid TaskId, + TaskConfig Config + ) : BaseEvent(); //record EventPing( diff --git a/src/ApiService/ApiService/OneFuzzTypes/Model.cs b/src/ApiService/ApiService/OneFuzzTypes/Model.cs index 43489de8e..346431a4d 100644 --- a/src/ApiService/ApiService/OneFuzzTypes/Model.cs +++ b/src/ApiService/ApiService/OneFuzzTypes/Model.cs @@ -2,6 +2,8 @@ using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; using System; using System.Collections.Generic; using PoolName = System.String; +using Region = System.String; +using Container = System.String; namespace Microsoft.OneFuzz.Service; @@ -29,9 +31,15 @@ public enum HeartbeatType TaskAlive, } -public record HeartbeatData(HeartbeatType type); +public record HeartbeatData(HeartbeatType Type); -public record NodeHeartbeatEntry(Guid NodeId, HeartbeatData[] data); +public record TaskHeartbeatEntry( + Guid TaskId, + Guid? JobId, + Guid MachineId, + HeartbeatData[] Data + ); +public record NodeHeartbeatEntry(Guid NodeId, HeartbeatData[] Data); public record NodeCommandStopIfFree(); @@ -79,7 +87,7 @@ public enum NodeState public record ProxyHeartbeat ( - string Region, + Region Region, Guid ProxyId, List Forwards, DateTimeOffset TimeStamp @@ -102,35 +110,35 @@ public partial record Node public partial record ProxyForward -( - [PartitionKey] string Region, +( + [PartitionKey] Region Region, [RowKey] int DstPort, - int SrcPort, + int SrcPort, string DstIp ) : EntityBase(); -public partial record ProxyConfig -( - Uri Url, - string Notification, - string Region, - Guid? ProxyId, - List Forwards, - string InstanceTelemetryKey, +public partial record ProxyConfig +( + Uri Url, + string Notification, + Region Region, + Guid? ProxyId, + List Forwards, + string InstanceTelemetryKey, string MicrosoftTelemetryKey ); public partial record Proxy ( - [PartitionKey] string Region, + [PartitionKey] Region Region, [RowKey] Guid ProxyId, DateTimeOffset? CreatedTimestamp, - VmState State, - Authentication Auth, - string? Ip, - Error? Error, - string Version, + VmState State, + Authentication Auth, + string? Ip, + Error? Error, + string Version, ProxyHeartbeat? heartbeat ) : EntityBase(); @@ -148,23 +156,102 @@ public record EventMessage( ) : EntityBase(); -//record AnyHttpUrl(AnyUrl): -// allowed_schemes = {'http', 'https -// +public record TaskDetails( + + TaskType Type, + int Duration, + string? TargetExe, + Dictionary? TargetEnv, + List? TargetOptions, + int? TargetWorkers, + bool? TargetOptionsMerge, + bool? CheckAsanLog, + bool? CheckDebugger, + int? CheckRetryCount, + bool? CheckFuzzerHelp, + bool? ExpectCrashOnFailure, + bool? RenameOutput, + string? SupervisorExe, + Dictionary? SupervisorEnv, + List? SupervisorOptions, + string? SupervisorInputMarker, + string? GeneratorExe, + Dictionary? GeneratorEnv, + List? GeneratorOptions, + string? AnalyzerExe, + Dictionary? AnalyzerEnv, + List AnalyzerOptions, + ContainerType? WaitForFiles, + string? StatsFile, + StatsFormat? StatsFormat, + bool? RebootAfterSetup, + int? TargetTimeout, + int? EnsembleSyncDelay, + bool? PreserveExistingOutputs, + List? ReportList, + int? MinimizedStackDepth, + string? CoverageFilter +); + +public record TaskVm( + Region Region, + string Sku, + string Image, + int Count, + bool SpotInstance, + bool? RebootAfterSetup +); + +public record TaskPool( + int Count, + PoolName PoolName +); + +public record TaskContainers( + ContainerType Type, + Container Name +); +public record TaskConfig( + Guid JobId, + List? PrereqTasks, + TaskDetails Task, + TaskVm? Vm, + TaskPool? Pool, + List? Containers, + Dictionary? Tags, + List? Debug, + bool? Colocate + ); +public record TaskEventSummary( + DateTimeOffset? Timestamp, + string EventData, + string EventType + ); +public record NodeAssignment( + Guid NodeId, + Guid? ScalesetId, + NodeTaskState State + ); -//public record TaskConfig( -// Guid jobId, -// List PrereqTasks, -// TaskDetails Task, -// TaskVm? vm, -// TaskPool pool: Optional[] -// containers: List[TaskContainers] -// tags: Dict[str, str] -// debug: Optional[List[TaskDebugFlag]] -// colocate: Optional[bool] -// ): EntityBase(); +public record Task( + // Timestamp: Optional[datetime] = Field(alias="Timestamp") + [PartitionKey] Guid JobId, + [RowKey] Guid TaskId, + TaskState State, + Os Os, + TaskConfig Config, + Error? Error, + Authentication? Auth, + DateTimeOffset? Heartbeat, + DateTimeOffset? EndTime, + UserInfo? UserInfo) : EntityBase() +{ + List Events { get; set; } = new List(); + List Nodes { get; set; } = new List(); + +} \ No newline at end of file diff --git a/src/ApiService/ApiService/Program.cs b/src/ApiService/ApiService/Program.cs index fa06b0b8f..02fd6b386 100644 --- a/src/ApiService/ApiService/Program.cs +++ b/src/ApiService/ApiService/Program.cs @@ -1,10 +1,14 @@ +// to avoid collision with Task in model.cs +global using Async = System.Threading.Tasks; + using System; using System.Collections.Generic; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.DependencyInjection; -using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; using ApiService.OneFuzzLib; + + namespace Microsoft.OneFuzz.Service; public class Program @@ -34,11 +38,11 @@ public class Program .ConfigureServices((context, services) => services .AddSingleton(_ => new LogTracerFactory(GetLoggers())) - .AddSingleton(_ => new StorageProvider(EnvironmentVariables.OneFuzz.FuncStorage ?? throw new InvalidOperationException("Missing account id"))) .AddSingleton() .AddSingleton() .AddSingleton() .AddSingleton() + .AddSingleton() .AddSingleton() .AddSingleton(_ => new Creds()) .AddSingleton() diff --git a/src/ApiService/ApiService/QueueFileChanges.cs b/src/ApiService/ApiService/QueueFileChanges.cs index 1a0b5161e..c90b3b977 100644 --- a/src/ApiService/ApiService/QueueFileChanges.cs +++ b/src/ApiService/ApiService/QueueFileChanges.cs @@ -2,7 +2,6 @@ using System; using Microsoft.Azure.Functions.Worker; using System.Collections.Generic; using System.Text.Json; -using System.Threading.Tasks; using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; using System.Linq; @@ -15,19 +14,17 @@ public class QueueFileChanges const int MAX_DEQUEUE_COUNT = 5; private readonly ILogTracerFactory _loggerFactory; - private readonly IStorageProvider _storageProvider; private readonly IStorage _storage; - public QueueFileChanges(ILogTracerFactory loggerFactory, IStorageProvider storageProvider, IStorage storage) + public QueueFileChanges(ILogTracerFactory loggerFactory, IStorage storage) { _loggerFactory = loggerFactory; - _storageProvider = storageProvider; _storage = storage; } [Function("QueueFileChanges")] - public Task Run( + public Async.Task Run( [QueueTrigger("file-changes-refactored", Connection = "AzureWebJobsStorage")] string msg, int dequeueCount) { @@ -42,18 +39,18 @@ public class QueueFileChanges if (!fileChangeEvent.ContainsKey(eventType) || fileChangeEvent[eventType] != "Microsoft.Storage.BlobCreated") { - return Task.CompletedTask; + return Async.Task.CompletedTask; } const string topic = "topic"; if (!fileChangeEvent.ContainsKey(topic) || !_storage.CorpusAccounts(log).Contains(fileChangeEvent[topic])) { - return Task.CompletedTask; + return Async.Task.CompletedTask; } file_added(log, fileChangeEvent, lastTry); - return Task.CompletedTask; + return Async.Task.CompletedTask; } private void file_added(ILogTracer log, Dictionary fileChangeEvent, bool failTaskOnTransientError) diff --git a/src/ApiService/ApiService/QueueNodeHearbeat.cs b/src/ApiService/ApiService/QueueNodeHearbeat.cs index d13fa5285..c7ea5ea8c 100644 --- a/src/ApiService/ApiService/QueueNodeHearbeat.cs +++ b/src/ApiService/ApiService/QueueNodeHearbeat.cs @@ -1,7 +1,6 @@ using System; using Microsoft.Azure.Functions.Worker; using System.Text.Json; -using System.Threading.Tasks; using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; namespace Microsoft.OneFuzz.Service; @@ -22,7 +21,7 @@ public class QueueNodeHearbeat } [Function("QueueNodeHearbeat")] - public async Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg) + public async Async.Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg) { var log = _loggerFactory.MakeLogTracer(Guid.NewGuid()); log.Info($"heartbeat: {msg}"); diff --git a/src/ApiService/ApiService/QueueProxyHeartbeat.cs b/src/ApiService/ApiService/QueueProxyHeartbeat.cs index 7e25017fc..9eb437d7b 100644 --- a/src/ApiService/ApiService/QueueProxyHeartbeat.cs +++ b/src/ApiService/ApiService/QueueProxyHeartbeat.cs @@ -1,7 +1,6 @@ using System; using Microsoft.Azure.Functions.Worker; using System.Text.Json; -using System.Threading.Tasks; using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; namespace Microsoft.OneFuzz.Service; @@ -19,7 +18,7 @@ public class QueueProxyHearbeat } [Function("QueueProxyHearbeat")] - public async Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg) + public async Async.Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg) { var log = _loggerFactory.MakeLogTracer(Guid.NewGuid()); diff --git a/src/ApiService/ApiService/QueueTaskHearbeat.cs b/src/ApiService/ApiService/QueueTaskHearbeat.cs new file mode 100644 index 000000000..a8d02ffff --- /dev/null +++ b/src/ApiService/ApiService/QueueTaskHearbeat.cs @@ -0,0 +1,43 @@ +using System; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Extensions.Logging; +using System.Text.Json; +using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; + +namespace Microsoft.OneFuzz.Service; + + +public class QueueTaskHearbeat +{ + private readonly ILogger _logger; + + private readonly IEvents _events; + private readonly ITaskOperations _tasks; + + public QueueTaskHearbeat(ILoggerFactory loggerFactory, ITaskOperations tasks, IEvents events) + { + _logger = loggerFactory.CreateLogger(); + _tasks = tasks; + _events = events; + } + + [Function("QueueTaskHearbeat")] + public async Async.Task Run([QueueTrigger("myqueue-items2", Connection = "AzureWebJobsStorage")] string msg) + { + _logger.LogInformation($"heartbeat: {msg}"); + + var hb = JsonSerializer.Deserialize(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}"); + + var task = await _tasks.GetByTaskId(hb.TaskId); + + if (task == null) + { + _logger.LogWarning($"invalid task id: {hb.TaskId}"); + return; + } + + var newTask = task with { Heartbeat = DateTimeOffset.UtcNow }; + await _tasks.Replace(newTask); + await _events.SendEvent(new EventTaskHeartbeat(newTask.JobId, newTask.TaskId, newTask.Config)); + } +} diff --git a/src/ApiService/ApiService/UserCredentials.cs b/src/ApiService/ApiService/UserCredentials.cs index 67256c409..d4b647d3d 100644 --- a/src/ApiService/ApiService/UserCredentials.cs +++ b/src/ApiService/ApiService/UserCredentials.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Microsoft.Azure.Functions.Worker.Http; using Microsoft.IdentityModel.Tokens; + namespace Microsoft.OneFuzz.Service; public class UserCredentials @@ -53,7 +54,7 @@ public class UserCredentials static Task> GetAllowedTenants() { - return Task.FromResult(OneFuzzResult.Ok(Array.Empty())); + return Async.Task.FromResult(OneFuzzResult.Ok(Array.Empty())); } /* diff --git a/src/ApiService/ApiService/onefuzzlib/Events.cs b/src/ApiService/ApiService/onefuzzlib/Events.cs index 37fced624..1efe316a6 100644 --- a/src/ApiService/ApiService/onefuzzlib/Events.cs +++ b/src/ApiService/ApiService/onefuzzlib/Events.cs @@ -5,7 +5,6 @@ using System.Collections.Generic; using System.Text; using System.Text.Json; using System.Text.Json.Serialization; -using System.Threading.Tasks; namespace Microsoft.OneFuzz.Service { @@ -20,9 +19,9 @@ namespace Microsoft.OneFuzz.Service public interface IEvents { - public Task SendEvent(BaseEvent anEvent); + public Async.Task SendEvent(BaseEvent anEvent); - public Task QueueSignalrEvent(EventMessage message); + public Async.Task QueueSignalrEvent(EventMessage message); } public class Events : IEvents @@ -38,14 +37,14 @@ namespace Microsoft.OneFuzz.Service _webhook = webhook; } - public async Task QueueSignalrEvent(EventMessage eventMessage) + public async Async.Task QueueSignalrEvent(EventMessage eventMessage) { var message = new SignalREvent("events", new List() { eventMessage }); var encodedMessage = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message)); await _queue.SendMessage("signalr-events", encodedMessage, StorageType.Config); } - public async Task SendEvent(BaseEvent anEvent) + public async Async.Task SendEvent(BaseEvent anEvent) { var log = _loggerFactory.MakeLogTracer(Guid.NewGuid()); var eventType = anEvent.GetEventType(); diff --git a/src/ApiService/ApiService/onefuzzlib/Queue.cs b/src/ApiService/ApiService/onefuzzlib/Queue.cs index d159d717f..3df5f1785 100644 --- a/src/ApiService/ApiService/onefuzzlib/Queue.cs +++ b/src/ApiService/ApiService/onefuzzlib/Queue.cs @@ -8,8 +8,8 @@ using System.Threading.Tasks; namespace Microsoft.OneFuzz.Service; public interface IQueue { - Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null); - Task QueueObject(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout); + Async.Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null); + Async.Task QueueObject(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout); } @@ -25,7 +25,7 @@ public class Queue : IQueue } - public async Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) + public async Async.Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) { var queue = GetQueue(name, storageType); if (queue != null) diff --git a/src/ApiService/ApiService/onefuzzlib/TaskOperations.cs b/src/ApiService/ApiService/onefuzzlib/TaskOperations.cs new file mode 100644 index 000000000..ee056cd4d --- /dev/null +++ b/src/ApiService/ApiService/onefuzzlib/TaskOperations.cs @@ -0,0 +1,28 @@ +using ApiService.OneFuzzLib.Orm; +using System; +using System.Linq; + +namespace Microsoft.OneFuzz.Service; + +public interface ITaskOperations : IOrm +{ + Async.Task GetByTaskId(Guid taskId); +} + +public class TaskOperations : Orm, ITaskOperations +{ + + public TaskOperations(IStorage storage) + : base(storage) + { + + } + + public async Async.Task GetByTaskId(Guid taskId) + { + var data = QueryAsync(filter: $"RowKey eq '{taskId}'"); + + return await data.FirstOrDefaultAsync(); + } + +} diff --git a/src/ApiService/ApiService/onefuzzlib/WebhookOperations.cs b/src/ApiService/ApiService/onefuzzlib/WebhookOperations.cs index ebc4bb135..c30f66348 100644 --- a/src/ApiService/ApiService/onefuzzlib/WebhookOperations.cs +++ b/src/ApiService/ApiService/onefuzzlib/WebhookOperations.cs @@ -2,7 +2,6 @@ using Microsoft.OneFuzz.Service; using System; using System.Collections.Generic; -using System.Threading.Tasks; namespace ApiService.OneFuzzLib; @@ -29,7 +28,7 @@ public class WebhookMessageLogOperations : Orm, IWebhookMessa } - public async Task QueueWebhook(WebhookMessageLog webhookLog) + public async Async.Task QueueWebhook(WebhookMessageLog webhookLog) { var log = _loggerFactory.MakeLogTracer(Guid.NewGuid()); var obj = new WebhookMessageQueueObj(webhookLog.WebhookId, webhookLog.EventId); @@ -65,7 +64,7 @@ public class WebhookMessageLogOperations : Orm, IWebhookMessa public interface IWebhookOperations { - Task SendEvent(EventMessage eventMessage); + Async.Task SendEvent(EventMessage eventMessage); } public class WebhookOperations : Orm, IWebhookOperations @@ -77,7 +76,7 @@ public class WebhookOperations : Orm, IWebhookOperations _webhookMessageLogOperations = webhookMessageLogOperations; } - async public Task SendEvent(EventMessage eventMessage) + async public Async.Task SendEvent(EventMessage eventMessage) { await foreach (var webhook in GetWebhooksCached()) { @@ -89,7 +88,7 @@ public class WebhookOperations : Orm, IWebhookOperations } } - async private Task AddEvent(Webhook webhook, EventMessage eventMessage) + async private Async.Task AddEvent(Webhook webhook, EventMessage eventMessage) { var message = new WebhookMessageLog( EventId: eventMessage.EventId, diff --git a/src/ApiService/ApiService/onefuzzlib/orm/StorageProvider.cs b/src/ApiService/ApiService/onefuzzlib/orm/StorageProvider.cs deleted file mode 100644 index 1d17ea18c..000000000 --- a/src/ApiService/ApiService/onefuzzlib/orm/StorageProvider.cs +++ /dev/null @@ -1,53 +0,0 @@ -using Azure.Data.Tables; -using System; -using System.Linq; -using System.Threading.Tasks; -using Azure.Core; -using Azure.ResourceManager.Storage; -using Azure.ResourceManager; -using Azure.Identity; - -namespace Microsoft.OneFuzz.Service.OneFuzzLib.Orm; - - -public interface IStorageProvider -{ - Task GetTableClient(string table); - //IAsyncEnumerable QueryAsync(string filter) where T : EntityBase; - //Task Replace(T entity) where T : EntityBase; - -} - -public class StorageProvider : IStorageProvider -{ - private readonly string _accountId; - private readonly EntityConverter _entityConverter; - private readonly ArmClient _armClient; - - public StorageProvider(string accountId) - { - _accountId = accountId; - _entityConverter = new EntityConverter(); - _armClient = new ArmClient(new DefaultAzureCredential()); - } - - public async Task GetTableClient(string table) - { - var (name, key) = GetStorageAccountNameAndKey(_accountId); - var identifier = new ResourceIdentifier(_accountId); - var tableClient = new TableServiceClient(new Uri($"https://{identifier.Name}.table.core.windows.net"), new TableSharedKeyCredential(name, key)); - await tableClient.CreateTableIfNotExistsAsync(table); - return tableClient.GetTableClient(table); - } - - - public (string?, string?) GetStorageAccountNameAndKey(string accountId) - { - var resourceId = new ResourceIdentifier(accountId); - var storageAccount = _armClient.GetStorageAccountResource(resourceId); - var key = storageAccount.GetKeys().Value.Keys.FirstOrDefault(); - return (resourceId.Name, key?.Value); - } - - -} \ No newline at end of file