diff --git a/src/ApiService/ApiService/Functions/QueueJobResult.cs b/src/ApiService/ApiService/Functions/QueueJobResult.cs new file mode 100644 index 000000000..d781a4d1e --- /dev/null +++ b/src/ApiService/ApiService/Functions/QueueJobResult.cs @@ -0,0 +1,60 @@ +using System.Text.Json; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Extensions.Logging; +using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; +namespace Microsoft.OneFuzz.Service.Functions; + + +public class QueueJobResult { + private readonly ILogger _log; + private readonly IOnefuzzContext _context; + + public QueueJobResult(ILogger logTracer, IOnefuzzContext context) { + _log = logTracer; + _context = context; + } + + [Function("QueueJobResult")] + public async Async.Task Run([QueueTrigger("job-result", Connection = "AzureWebJobsStorage")] string msg) { + + var _tasks = _context.TaskOperations; + var _jobs = _context.JobOperations; + + _log.LogInformation("job result: {msg}", msg); + var jr = JsonSerializer.Deserialize(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}"); + + var task = await _tasks.GetByTaskId(jr.TaskId); + if (task == null) { + _log.LogWarning("invalid {TaskId}", jr.TaskId); + return; + } + + var job = await _jobs.Get(task.JobId); + if (job == null) { + _log.LogWarning("invalid {JobId}", task.JobId); + return; + } + + JobResultData? data = jr.Data; + if (data == null) { + _log.LogWarning($"job result data is empty, throwing out: {jr}"); + return; + } + + var jobResultType = data.Type; + _log.LogInformation($"job result data type: {jobResultType}"); + + Dictionary value; + if (jr.Value.Count > 0) { + value = jr.Value; + } else { + _log.LogWarning($"job result data is empty, throwing out: {jr}"); + return; + } + + var jobResult = await _context.JobResultOperations.CreateOrUpdate(job.JobId, jobResultType, value); + if (!jobResult.IsOk) { + _log.LogError("failed to create or update with job result {JobId}", job.JobId); + } + } +} diff --git a/src/ApiService/ApiService/OneFuzzTypes/Model.cs b/src/ApiService/ApiService/OneFuzzTypes/Model.cs index e430c1448..b839f52dd 100644 --- a/src/ApiService/ApiService/OneFuzzTypes/Model.cs +++ b/src/ApiService/ApiService/OneFuzzTypes/Model.cs @@ -33,6 +33,19 @@ public enum HeartbeatType { TaskAlive, } +[SkipRename] +public enum JobResultType { + NewCrashingInput, + NoReproCrashingInput, + NewReport, + NewUniqueReport, + NewRegressionReport, + NewCoverage, + NewCrashDump, + CoverageData, + RuntimeStats, +} + public record HeartbeatData(HeartbeatType Type); public record TaskHeartbeatEntry( @@ -41,6 +54,16 @@ public record TaskHeartbeatEntry( Guid MachineId, HeartbeatData[] Data); +public record JobResultData(JobResultType Type); + +public record TaskJobResultEntry( + Guid TaskId, + Guid? JobId, + Guid MachineId, + JobResultData Data, + Dictionary Value + ); + public record NodeHeartbeatEntry(Guid NodeId, HeartbeatData[] Data); public record NodeCommandStopIfFree(); @@ -892,6 +915,27 @@ public record SecretAddress(Uri Url) : ISecret { public record SecretData(ISecret Secret) { } +public record JobResult( + [PartitionKey][RowKey] Guid JobId, + string Project, + string Name, + double NewCrashingInput = 0, + double NoReproCrashingInput = 0, + double NewReport = 0, + double NewUniqueReport = 0, + double NewRegressionReport = 0, + double NewCrashDump = 0, + double InstructionsCovered = 0, + double TotalInstructions = 0, + double CoverageRate = 0, + double IterationCount = 0 +) : EntityBase() { + public JobResult(Guid JobId, string Project, string Name) : this( + JobId: JobId, + Project: Project, + Name: Name, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) { } +} + public record JobConfig( string Project, string Name, @@ -1056,6 +1100,7 @@ public record TaskUnitConfig( string? InstanceTelemetryKey, string? MicrosoftTelemetryKey, Uri HeartbeatQueue, + Uri JobResultQueue, Dictionary Tags ) { public Uri? inputQueue { get; set; } diff --git a/src/ApiService/ApiService/Program.cs b/src/ApiService/ApiService/Program.cs index f425c0080..d5ee30b45 100644 --- a/src/ApiService/ApiService/Program.cs +++ b/src/ApiService/ApiService/Program.cs @@ -118,6 +118,7 @@ public class Program { .AddScoped() .AddScoped() .AddScoped() + .AddScoped() .AddScoped() .AddScoped() .AddScoped() diff --git a/src/ApiService/ApiService/onefuzzlib/Config.cs b/src/ApiService/ApiService/onefuzzlib/Config.cs index 71af31734..872cedbc0 100644 --- a/src/ApiService/ApiService/onefuzzlib/Config.cs +++ b/src/ApiService/ApiService/onefuzzlib/Config.cs @@ -71,6 +71,7 @@ public class Config : IConfig { InstanceTelemetryKey: _serviceConfig.ApplicationInsightsInstrumentationKey, MicrosoftTelemetryKey: _serviceConfig.OneFuzzTelemetry, HeartbeatQueue: await _queue.GetQueueSas("task-heartbeat", StorageType.Config, QueueSasPermissions.Add) ?? throw new Exception("unable to get heartbeat queue sas"), + JobResultQueue: await _queue.GetQueueSas("job-result", StorageType.Config, QueueSasPermissions.Add) ?? throw new Exception("unable to get heartbeat queue sas"), Tags: task.Config.Tags ?? new Dictionary() ); diff --git a/src/ApiService/ApiService/onefuzzlib/JobResultOperations.cs b/src/ApiService/ApiService/onefuzzlib/JobResultOperations.cs new file mode 100644 index 000000000..6e3389050 --- /dev/null +++ b/src/ApiService/ApiService/onefuzzlib/JobResultOperations.cs @@ -0,0 +1,125 @@ +using ApiService.OneFuzzLib.Orm; +using Microsoft.Extensions.Logging; +using Polly; +namespace Microsoft.OneFuzz.Service; + +public interface IJobResultOperations : IOrm { + + Async.Task GetJobResult(Guid jobId); + Async.Task CreateOrUpdate(Guid jobId, JobResultType resultType, Dictionary resultValue); + +} +public class JobResultOperations : Orm, IJobResultOperations { + + public JobResultOperations(ILogger log, IOnefuzzContext context) + : base(log, context) { + } + + public async Async.Task GetJobResult(Guid jobId) { + return await SearchByPartitionKeys(new[] { jobId.ToString() }).SingleOrDefaultAsync(); + } + + private JobResult UpdateResult(JobResult result, JobResultType type, Dictionary resultValue) { + + var newResult = result; + double newValue; + switch (type) { + case JobResultType.NewCrashingInput: + newValue = result.NewCrashingInput + resultValue["count"]; + newResult = result with { NewCrashingInput = newValue }; + break; + case JobResultType.NewReport: + newValue = result.NewReport + resultValue["count"]; + newResult = result with { NewReport = newValue }; + break; + case JobResultType.NewUniqueReport: + newValue = result.NewUniqueReport + resultValue["count"]; + newResult = result with { NewUniqueReport = newValue }; + break; + case JobResultType.NewRegressionReport: + newValue = result.NewRegressionReport + resultValue["count"]; + newResult = result with { NewRegressionReport = newValue }; + break; + case JobResultType.NewCrashDump: + newValue = result.NewCrashDump + resultValue["count"]; + newResult = result with { NewCrashDump = newValue }; + break; + case JobResultType.CoverageData: + double newCovered = resultValue["covered"]; + double newTotalCovered = resultValue["features"]; + double newCoverageRate = resultValue["rate"]; + newResult = result with { InstructionsCovered = newCovered, TotalInstructions = newTotalCovered, CoverageRate = newCoverageRate }; + break; + case JobResultType.RuntimeStats: + double newTotalIterations = resultValue["total_count"]; + newResult = result with { IterationCount = newTotalIterations }; + break; + default: + _logTracer.LogWarning($"Invalid Field {type}."); + break; + } + _logTracer.LogInformation($"Attempting to log new result: {newResult}"); + return newResult; + } + + private async Async.Task TryUpdate(Job job, JobResultType resultType, Dictionary resultValue) { + var jobId = job.JobId; + + var jobResult = await GetJobResult(jobId); + + if (jobResult == null) { + _logTracer.LogInformation("Creating new JobResult for Job {JobId}", jobId); + + var entry = new JobResult(JobId: jobId, Project: job.Config.Project, Name: job.Config.Name); + + jobResult = UpdateResult(entry, resultType, resultValue); + + var r = await Insert(jobResult); + if (!r.IsOk) { + _logTracer.AddHttpStatus(r.ErrorV); + _logTracer.LogError("failed to insert job result {JobId}", jobResult.JobId); + throw new InvalidOperationException($"failed to insert job result {jobResult.JobId}"); + } + _logTracer.LogInformation("created job result {JobId}", jobResult.JobId); + } else { + _logTracer.LogInformation("Updating existing JobResult entry for Job {JobId}", jobId); + + jobResult = UpdateResult(jobResult, resultType, resultValue); + + var r = await Update(jobResult); + if (!r.IsOk) { + _logTracer.AddHttpStatus(r.ErrorV); + _logTracer.LogError("failed to update job result {JobId}", jobResult.JobId); + throw new InvalidOperationException($"failed to insert job result {jobResult.JobId}"); + } + _logTracer.LogInformation("updated job result {JobId}", jobResult.JobId); + } + + return true; + } + + public async Async.Task CreateOrUpdate(Guid jobId, JobResultType resultType, Dictionary resultValue) { + + var job = await _context.JobOperations.Get(jobId); + if (job == null) { + return OneFuzzResultVoid.Error(ErrorCode.INVALID_REQUEST, "invalid job"); + } + + var success = false; + try { + _logTracer.LogInformation("attempt to update job result {JobId}", job.JobId); + var policy = Policy.Handle().WaitAndRetryAsync(50, _ => new TimeSpan(0, 0, 5)); + await policy.ExecuteAsync(async () => { + success = await TryUpdate(job, resultType, resultValue); + _logTracer.LogInformation("attempt {success}", success); + }); + return OneFuzzResultVoid.Ok; + } catch (Exception e) { + return OneFuzzResultVoid.Error(ErrorCode.UNABLE_TO_UPDATE, new string[] { + $"Unexpected failure when attempting to update job result for {job.JobId}", + $"Exception: {e}" + }); + } + } +} + diff --git a/src/ApiService/ApiService/onefuzzlib/OnefuzzContext.cs b/src/ApiService/ApiService/onefuzzlib/OnefuzzContext.cs index d877bfddb..03c632266 100644 --- a/src/ApiService/ApiService/onefuzzlib/OnefuzzContext.cs +++ b/src/ApiService/ApiService/onefuzzlib/OnefuzzContext.cs @@ -19,6 +19,7 @@ public interface IOnefuzzContext { IExtensions Extensions { get; } IIpOperations IpOperations { get; } IJobOperations JobOperations { get; } + IJobResultOperations JobResultOperations { get; } ILogAnalytics LogAnalytics { get; } INodeMessageOperations NodeMessageOperations { get; } INodeOperations NodeOperations { get; } @@ -83,6 +84,7 @@ public class OnefuzzContext : IOnefuzzContext { public IVmOperations VmOperations => _serviceProvider.GetRequiredService(); public ISecretsOperations SecretsOperations => _serviceProvider.GetRequiredService(); public IJobOperations JobOperations => _serviceProvider.GetRequiredService(); + public IJobResultOperations JobResultOperations => _serviceProvider.GetRequiredService(); public IScheduler Scheduler => _serviceProvider.GetRequiredService(); public IConfig Config => _serviceProvider.GetRequiredService(); public ILogAnalytics LogAnalytics => _serviceProvider.GetRequiredService(); diff --git a/src/ApiService/IntegrationTests/Fakes/TestContext.cs b/src/ApiService/IntegrationTests/Fakes/TestContext.cs index c46ff5fce..66d121e74 100644 --- a/src/ApiService/IntegrationTests/Fakes/TestContext.cs +++ b/src/ApiService/IntegrationTests/Fakes/TestContext.cs @@ -32,6 +32,7 @@ public sealed class TestContext : IOnefuzzContext { TaskOperations = new TaskOperations(provider.CreateLogger(), Cache, this); NodeOperations = new NodeOperations(provider.CreateLogger(), this); JobOperations = new JobOperations(provider.CreateLogger(), this); + JobResultOperations = new JobResultOperations(provider.CreateLogger(), this); NodeTasksOperations = new NodeTasksOperations(provider.CreateLogger(), this); TaskEventOperations = new TaskEventOperations(provider.CreateLogger(), this); NodeMessageOperations = new NodeMessageOperations(provider.CreateLogger(), this); @@ -57,6 +58,7 @@ public sealed class TestContext : IOnefuzzContext { Node n => NodeOperations.Insert(n), Pool p => PoolOperations.Insert(p), Job j => JobOperations.Insert(j), + JobResult jr => JobResultOperations.Insert(jr), Repro r => ReproOperations.Insert(r), Scaleset ss => ScalesetOperations.Insert(ss), NodeTasks nt => NodeTasksOperations.Insert(nt), @@ -84,6 +86,7 @@ public sealed class TestContext : IOnefuzzContext { public ITaskOperations TaskOperations { get; } public IJobOperations JobOperations { get; } + public IJobResultOperations JobResultOperations { get; } public INodeOperations NodeOperations { get; } public INodeTasksOperations NodeTasksOperations { get; } public ITaskEventOperations TaskEventOperations { get; } diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index a1d86e7d2..254684be9 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -2123,6 +2123,7 @@ dependencies = [ "log", "nix", "notify", + "onefuzz-result", "onefuzz-telemetry", "pete", "pretty_assertions", @@ -2197,6 +2198,20 @@ dependencies = [ "serde_json", ] +[[package]] +name = "onefuzz-result" +version = "0.2.0" +dependencies = [ + "anyhow", + "async-trait", + "log", + "onefuzz-telemetry", + "reqwest", + "serde", + "storage-queue", + "uuid", +] + [[package]] name = "onefuzz-task" version = "0.2.0" @@ -2226,6 +2241,7 @@ dependencies = [ "num_cpus", "onefuzz", "onefuzz-file-format", + "onefuzz-result", "onefuzz-telemetry", "path-absolutize", "pretty_assertions", diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index 2f4cea41a..ce01ae880 100644 --- a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -10,6 +10,7 @@ members = [ "onefuzz", "onefuzz-task", "onefuzz-agent", + "onefuzz-result", "onefuzz-file-format", "onefuzz-telemetry", "reqwest-retry", diff --git a/src/agent/onefuzz-agent/src/config.rs b/src/agent/onefuzz-agent/src/config.rs index 87edfb2c1..fc623e72a 100644 --- a/src/agent/onefuzz-agent/src/config.rs +++ b/src/agent/onefuzz-agent/src/config.rs @@ -34,6 +34,8 @@ pub struct StaticConfig { pub heartbeat_queue: Option, + pub job_result_queue: Option, + pub instance_id: Uuid, #[serde(default = "default_as_true")] @@ -71,6 +73,8 @@ struct RawStaticConfig { pub heartbeat_queue: Option, + pub job_result_queue: Option, + pub instance_id: Uuid, #[serde(default = "default_as_true")] @@ -117,6 +121,7 @@ impl StaticConfig { microsoft_telemetry_key: config.microsoft_telemetry_key, instance_telemetry_key: config.instance_telemetry_key, heartbeat_queue: config.heartbeat_queue, + job_result_queue: config.job_result_queue, instance_id: config.instance_id, managed: config.managed, machine_identity, @@ -152,6 +157,12 @@ impl StaticConfig { None }; + let job_result_queue = if let Ok(key) = std::env::var("ONEFUZZ_JOB_RESULT") { + Some(Url::parse(&key)?) + } else { + None + }; + let instance_telemetry_key = if let Ok(key) = std::env::var("ONEFUZZ_INSTANCE_TELEMETRY_KEY") { Some(InstanceTelemetryKey::new(Uuid::parse_str(&key)?)) @@ -183,6 +194,7 @@ impl StaticConfig { instance_telemetry_key, microsoft_telemetry_key, heartbeat_queue, + job_result_queue, instance_id, managed: !is_unmanaged, machine_identity, diff --git a/src/agent/onefuzz-result/Cargo.toml b/src/agent/onefuzz-result/Cargo.toml new file mode 100644 index 000000000..7c7de6615 --- /dev/null +++ b/src/agent/onefuzz-result/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "onefuzz-result" +version = "0.2.0" +authors = ["fuzzing@microsoft.com"] +edition = "2021" +publish = false +license = "MIT" + +[dependencies] +anyhow = { version = "1.0", features = ["backtrace"] } +async-trait = "0.1" +reqwest = "0.11" +serde = "1.0" +storage-queue = { path = "../storage-queue" } +uuid = { version = "1.4", features = ["serde", "v4"] } +onefuzz-telemetry = { path = "../onefuzz-telemetry" } +log = "0.4" + diff --git a/src/agent/onefuzz-result/src/job_result.rs b/src/agent/onefuzz-result/src/job_result.rs new file mode 100644 index 000000000..b305eca2c --- /dev/null +++ b/src/agent/onefuzz-result/src/job_result.rs @@ -0,0 +1,129 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +use anyhow::Result; +use async_trait::async_trait; +use onefuzz_telemetry::warn; +use reqwest::Url; +use serde::{self, Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use storage_queue::QueueClient; +use uuid::Uuid; + +#[derive(Debug, Deserialize, Serialize, Hash, Eq, PartialEq, Clone)] +#[serde(tag = "type")] +pub enum JobResultData { + NewCrashingInput, + NoReproCrashingInput, + NewReport, + NewUniqueReport, + NewRegressionReport, + NewCoverage, + NewCrashDump, + CoverageData, + RuntimeStats, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +struct JobResult { + task_id: Uuid, + job_id: Uuid, + machine_id: Uuid, + machine_name: String, + data: JobResultData, + value: HashMap, +} + +#[derive(Clone)] +pub struct TaskContext { + task_id: Uuid, + job_id: Uuid, + machine_id: Uuid, + machine_name: String, +} + +pub struct JobResultContext { + pub state: TaskContext, + pub queue_client: QueueClient, +} + +pub struct JobResultClient { + pub context: Arc>, +} + +impl JobResultClient { + pub fn init_job_result( + context: TaskContext, + queue_url: Url, + ) -> Result> + where + TaskContext: Send + Sync + 'static, + { + let context = Arc::new(JobResultContext { + state: context, + queue_client: QueueClient::new(queue_url)?, + }); + + Ok(JobResultClient { context }) + } +} + +pub type TaskJobResultClient = JobResultClient; + +pub async fn init_job_result( + queue_url: Url, + task_id: Uuid, + job_id: Uuid, + machine_id: Uuid, + machine_name: String, +) -> Result { + let hb = JobResultClient::init_job_result( + TaskContext { + task_id, + job_id, + machine_id, + machine_name, + }, + queue_url, + )?; + Ok(hb) +} + +#[async_trait] +pub trait JobResultSender { + async fn send_direct(&self, data: JobResultData, value: HashMap); +} + +#[async_trait] +impl JobResultSender for TaskJobResultClient { + async fn send_direct(&self, data: JobResultData, value: HashMap) { + let task_id = self.context.state.task_id; + let job_id = self.context.state.job_id; + let machine_id = self.context.state.machine_id; + let machine_name = self.context.state.machine_name.clone(); + + let _ = self + .context + .queue_client + .enqueue(JobResult { + task_id, + job_id, + machine_id, + machine_name, + data, + value, + }) + .await; + } +} + +#[async_trait] +impl JobResultSender for Option { + async fn send_direct(&self, data: JobResultData, value: HashMap) { + match self { + Some(client) => client.send_direct(data, value).await, + None => warn!("Failed to send Job Result message data from agent."), + } + } +} diff --git a/src/agent/onefuzz-result/src/lib.rs b/src/agent/onefuzz-result/src/lib.rs new file mode 100644 index 000000000..dae666ca9 --- /dev/null +++ b/src/agent/onefuzz-result/src/lib.rs @@ -0,0 +1,4 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +pub mod job_result; diff --git a/src/agent/onefuzz-task/Cargo.toml b/src/agent/onefuzz-task/Cargo.toml index 0ad2f9aa4..4e0bd381b 100644 --- a/src/agent/onefuzz-task/Cargo.toml +++ b/src/agent/onefuzz-task/Cargo.toml @@ -39,6 +39,7 @@ serde_json = "1.0" serde_yaml = "0.9.21" onefuzz = { path = "../onefuzz" } onefuzz-telemetry = { path = "../onefuzz-telemetry" } +onefuzz-result = { path = "../onefuzz-result" } path-absolutize = "3.1" reqwest-retry = { path = "../reqwest-retry" } strum = "0.25" diff --git a/src/agent/onefuzz-task/src/local/common.rs b/src/agent/onefuzz-task/src/local/common.rs index f8d7949e8..ce31c7401 100644 --- a/src/agent/onefuzz-task/src/local/common.rs +++ b/src/agent/onefuzz-task/src/local/common.rs @@ -265,6 +265,7 @@ pub async fn build_local_context( }, instance_telemetry_key: None, heartbeat_queue: None, + job_result_queue: None, microsoft_telemetry_key: None, logs: None, min_available_memory_mb: 0, diff --git a/src/agent/onefuzz-task/src/local/template.rs b/src/agent/onefuzz-task/src/local/template.rs index b2e0c425f..f85bcf627 100644 --- a/src/agent/onefuzz-task/src/local/template.rs +++ b/src/agent/onefuzz-task/src/local/template.rs @@ -196,6 +196,7 @@ pub async fn launch( job_id: Uuid::new_v4(), instance_id: Uuid::new_v4(), heartbeat_queue: None, + job_result_queue: None, instance_telemetry_key: None, microsoft_telemetry_key: None, logs: None, diff --git a/src/agent/onefuzz-task/src/tasks/analysis/generic.rs b/src/agent/onefuzz-task/src/tasks/analysis/generic.rs index 3ba068a61..d89ab30bd 100644 --- a/src/agent/onefuzz-task/src/tasks/analysis/generic.rs +++ b/src/agent/onefuzz-task/src/tasks/analysis/generic.rs @@ -65,6 +65,8 @@ pub async fn run(config: Config) -> Result<()> { tools.init_pull().await?; } + let job_result_client = config.common.init_job_result().await?; + // the tempdir is always created, however, the reports_path and // reports_monitor_future are only created if we have one of the three // report SyncedDir. The idea is that the option for where to write reports @@ -88,6 +90,7 @@ pub async fn run(config: Config) -> Result<()> { &config.unique_reports, &config.reports, &config.no_repro, + &job_result_client, ); ( Some(reports_dir.path().to_path_buf()), diff --git a/src/agent/onefuzz-task/src/tasks/config.rs b/src/agent/onefuzz-task/src/tasks/config.rs index 0848379d7..e29e0fd60 100644 --- a/src/agent/onefuzz-task/src/tasks/config.rs +++ b/src/agent/onefuzz-task/src/tasks/config.rs @@ -14,6 +14,7 @@ use onefuzz::{ machine_id::MachineIdentity, syncdir::{SyncOperation, SyncedDir}, }; +use onefuzz_result::job_result::{init_job_result, TaskJobResultClient}; use onefuzz_telemetry::{ self as telemetry, Event::task_start, EventData, InstanceTelemetryKey, MicrosoftTelemetryKey, Role, @@ -50,6 +51,8 @@ pub struct CommonConfig { pub heartbeat_queue: Option, + pub job_result_queue: Option, + pub instance_telemetry_key: Option, pub microsoft_telemetry_key: Option, @@ -103,6 +106,23 @@ impl CommonConfig { None => Ok(None), } } + + pub async fn init_job_result(&self) -> Result> { + match &self.job_result_queue { + Some(url) => { + let result = init_job_result( + url.clone(), + self.task_id, + self.job_id, + self.machine_identity.machine_id, + self.machine_identity.machine_name.clone(), + ) + .await?; + Ok(Some(result)) + } + None => Ok(None), + } + } } #[derive(Debug, Deserialize)] diff --git a/src/agent/onefuzz-task/src/tasks/coverage/generic.rs b/src/agent/onefuzz-task/src/tasks/coverage/generic.rs index b112cfefb..4fde9efb3 100644 --- a/src/agent/onefuzz-task/src/tasks/coverage/generic.rs +++ b/src/agent/onefuzz-task/src/tasks/coverage/generic.rs @@ -26,6 +26,8 @@ use onefuzz_file_format::coverage::{ binary::{v1::BinaryCoverageJson as BinaryCoverageJsonV1, BinaryCoverageJson}, source::{v1::SourceCoverageJson as SourceCoverageJsonV1, SourceCoverageJson}, }; +use onefuzz_result::job_result::JobResultData; +use onefuzz_result::job_result::{JobResultSender, TaskJobResultClient}; use onefuzz_telemetry::{event, warn, Event::coverage_data, Event::coverage_failed, EventData}; use storage_queue::{Message, QueueClient}; use tokio::fs; @@ -114,7 +116,7 @@ impl CoverageTask { let allowlist = self.load_target_allowlist().await?; let heartbeat = self.config.common.init_heartbeat(None).await?; - + let job_result = self.config.common.init_job_result().await?; let mut seen_inputs = false; let target_exe_path = @@ -129,6 +131,7 @@ impl CoverageTask { coverage, allowlist, heartbeat, + job_result, target_exe.to_string(), )?; @@ -219,6 +222,7 @@ struct TaskContext<'a> { module_allowlist: AllowList, source_allowlist: Arc, heartbeat: Option, + job_result: Option, cache: Arc, } @@ -228,6 +232,7 @@ impl<'a> TaskContext<'a> { coverage: BinaryCoverage, allowlist: TargetAllowList, heartbeat: Option, + job_result: Option, target_exe: String, ) -> Result { let cache = DebugInfoCache::new(allowlist.source_files.clone()); @@ -247,6 +252,7 @@ impl<'a> TaskContext<'a> { module_allowlist: allowlist.modules, source_allowlist: Arc::new(allowlist.source_files), heartbeat, + job_result, cache: Arc::new(cache), }) } @@ -455,7 +461,16 @@ impl<'a> TaskContext<'a> { let s = CoverageStats::new(&coverage); event!(coverage_data; Covered = s.covered, Features = s.features, Rate = s.rate); metric!(coverage_data; 1.0; Covered = s.covered, Features = s.features, Rate = s.rate); - + self.job_result + .send_direct( + JobResultData::CoverageData, + HashMap::from([ + ("covered".to_string(), s.covered as f64), + ("features".to_string(), s.features as f64), + ("rate".to_string(), s.rate), + ]), + ) + .await; Ok(()) } diff --git a/src/agent/onefuzz-task/src/tasks/fuzz/generator.rs b/src/agent/onefuzz-task/src/tasks/fuzz/generator.rs index d9116a1ed..bd7511cac 100644 --- a/src/agent/onefuzz-task/src/tasks/fuzz/generator.rs +++ b/src/agent/onefuzz-task/src/tasks/fuzz/generator.rs @@ -73,6 +73,7 @@ impl GeneratorTask { } let hb_client = self.config.common.init_heartbeat(None).await?; + let jr_client = self.config.common.init_job_result().await?; for dir in &self.config.readonly_inputs { dir.init_pull().await?; @@ -84,7 +85,10 @@ impl GeneratorTask { self.config.ensemble_sync_delay, ); - let crash_dir_monitor = self.config.crashes.monitor_results(new_result, false); + let crash_dir_monitor = self + .config + .crashes + .monitor_results(new_result, false, &jr_client); let fuzzer = self.fuzzing_loop(hb_client); @@ -298,6 +302,7 @@ mod tests { task_id: Default::default(), instance_id: Default::default(), heartbeat_queue: Default::default(), + job_result_queue: Default::default(), instance_telemetry_key: Default::default(), microsoft_telemetry_key: Default::default(), logs: Default::default(), diff --git a/src/agent/onefuzz-task/src/tasks/fuzz/libfuzzer/common.rs b/src/agent/onefuzz-task/src/tasks/fuzz/libfuzzer/common.rs index 4f8c67ae8..bfd9f3f5c 100644 --- a/src/agent/onefuzz-task/src/tasks/fuzz/libfuzzer/common.rs +++ b/src/agent/onefuzz-task/src/tasks/fuzz/libfuzzer/common.rs @@ -1,7 +1,11 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -use crate::tasks::{config::CommonConfig, heartbeat::HeartbeatSender, utils::default_bool_true}; +use crate::tasks::{ + config::CommonConfig, + heartbeat::{HeartbeatSender, TaskHeartbeatClient}, + utils::default_bool_true, +}; use anyhow::{Context, Result}; use arraydeque::{ArrayDeque, Wrapping}; use async_trait::async_trait; @@ -12,6 +16,7 @@ use onefuzz::{ process::ExitStatus, syncdir::{continuous_sync, SyncOperation::Pull, SyncedDir}, }; +use onefuzz_result::job_result::{JobResultData, JobResultSender, TaskJobResultClient}; use onefuzz_telemetry::{ Event::{new_coverage, new_crashdump, new_result, runtime_stats}, EventData, @@ -126,21 +131,31 @@ where self.verify().await?; let hb_client = self.config.common.init_heartbeat(None).await?; + let jr_client = self.config.common.init_job_result().await?; // To be scheduled. let resync = self.continuous_sync_inputs(); - let new_inputs = self.config.inputs.monitor_results(new_coverage, true); - let new_crashes = self.config.crashes.monitor_results(new_result, true); + + let new_inputs = self + .config + .inputs + .monitor_results(new_coverage, true, &jr_client); + let new_crashes = self + .config + .crashes + .monitor_results(new_result, true, &jr_client); let new_crashdumps = async { if let Some(crashdumps) = &self.config.crashdumps { - crashdumps.monitor_results(new_crashdump, true).await + crashdumps + .monitor_results(new_crashdump, true, &jr_client) + .await } else { Ok(()) } }; let (stats_sender, stats_receiver) = mpsc::unbounded_channel(); - let report_stats = report_runtime_stats(stats_receiver, hb_client); + let report_stats = report_runtime_stats(stats_receiver, &hb_client, &jr_client); let fuzzers = self.run_fuzzers(Some(&stats_sender)); futures::try_join!( resync, @@ -183,7 +198,7 @@ where .inputs .local_path .parent() - .ok_or_else(|| anyhow!("Invalid input path"))?; + .ok_or_else(|| anyhow!("invalid input path"))?; let temp_path = task_dir.join(".temp"); tokio::fs::create_dir_all(&temp_path).await?; let temp_dir = tempdir_in(temp_path)?; @@ -501,7 +516,7 @@ impl TotalStats { self.execs_sec = self.worker_stats.values().map(|x| x.execs_sec).sum(); } - fn report(&self) { + async fn report(&self, jr_client: &Option) { event!( runtime_stats; EventData::Count = self.count, @@ -513,6 +528,17 @@ impl TotalStats { EventData::Count = self.count, EventData::ExecsSecond = self.execs_sec ); + if let Some(jr_client) = jr_client { + let _ = jr_client + .send_direct( + JobResultData::RuntimeStats, + HashMap::from([ + ("total_count".to_string(), self.count as f64), + ("execs_sec".to_string(), self.execs_sec), + ]), + ) + .await; + } } } @@ -542,7 +568,8 @@ impl Timer { // are approximating nearest-neighbor interpolation on the runtime stats time series. async fn report_runtime_stats( mut stats_channel: mpsc::UnboundedReceiver, - heartbeat_client: impl HeartbeatSender, + heartbeat_client: &Option, + jr_client: &Option, ) -> Result<()> { // Cache the last-reported stats for a given worker. // @@ -551,7 +578,7 @@ async fn report_runtime_stats( let mut total = TotalStats::default(); // report all zeros to start - total.report(); + total.report(jr_client).await; let timer = Timer::new(RUNTIME_STATS_PERIOD); @@ -560,10 +587,10 @@ async fn report_runtime_stats( Some(stats) = stats_channel.recv() => { heartbeat_client.alive(); total.update(stats); - total.report() + total.report(jr_client).await } _ = timer.wait() => { - total.report() + total.report(jr_client).await } } } diff --git a/src/agent/onefuzz-task/src/tasks/fuzz/supervisor.rs b/src/agent/onefuzz-task/src/tasks/fuzz/supervisor.rs index de1e1106b..3f00e20b8 100644 --- a/src/agent/onefuzz-task/src/tasks/fuzz/supervisor.rs +++ b/src/agent/onefuzz-task/src/tasks/fuzz/supervisor.rs @@ -79,7 +79,10 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> { remote_path: config.crashes.remote_path.clone(), }; crashes.init().await?; - let monitor_crashes = crashes.monitor_results(new_result, false); + + let jr_client = config.common.init_job_result().await?; + + let monitor_crashes = crashes.monitor_results(new_result, false, &jr_client); // setup crashdumps let (crashdump_dir, monitor_crashdumps) = { @@ -95,9 +98,12 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> { }; let monitor_dir = crashdump_dir.clone(); + let monitor_jr_client = config.common.init_job_result().await?; let monitor_crashdumps = async move { if let Some(crashdumps) = monitor_dir { - crashdumps.monitor_results(new_crashdump, false).await + crashdumps + .monitor_results(new_crashdump, false, &monitor_jr_client) + .await } else { Ok(()) } @@ -129,11 +135,13 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> { if let Some(no_repro) = &config.no_repro { no_repro.init().await?; } + let monitor_reports_future = monitor_reports( reports_dir.path(), &config.unique_reports, &config.reports, &config.no_repro, + &jr_client, ); let inputs = SyncedDir { @@ -156,7 +164,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> { delay_with_jitter(delay).await; } } - let monitor_inputs = inputs.monitor_results(new_coverage, false); + let monitor_inputs = inputs.monitor_results(new_coverage, false, &jr_client); let inputs_sync_cancellation = CancellationToken::new(); // never actually cancelled let inputs_sync_task = inputs.continuous_sync(Pull, config.ensemble_sync_delay, &inputs_sync_cancellation); @@ -444,6 +452,7 @@ mod tests { task_id: Default::default(), instance_id: Default::default(), heartbeat_queue: Default::default(), + job_result_queue: Default::default(), instance_telemetry_key: Default::default(), microsoft_telemetry_key: Default::default(), logs: Default::default(), diff --git a/src/agent/onefuzz-task/src/tasks/heartbeat.rs b/src/agent/onefuzz-task/src/tasks/heartbeat.rs index 515fa39d0..e13b66190 100644 --- a/src/agent/onefuzz-task/src/tasks/heartbeat.rs +++ b/src/agent/onefuzz-task/src/tasks/heartbeat.rs @@ -1,8 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -use crate::onefuzz::heartbeat::HeartbeatClient; use anyhow::Result; +use onefuzz::heartbeat::HeartbeatClient; use reqwest::Url; use serde::{self, Deserialize, Serialize}; use std::time::Duration; diff --git a/src/agent/onefuzz-task/src/tasks/regression/common.rs b/src/agent/onefuzz-task/src/tasks/regression/common.rs index 60023cfa6..b61a97df4 100644 --- a/src/agent/onefuzz-task/src/tasks/regression/common.rs +++ b/src/agent/onefuzz-task/src/tasks/regression/common.rs @@ -2,12 +2,14 @@ // Licensed under the MIT License. use crate::tasks::{ + config::CommonConfig, heartbeat::{HeartbeatSender, TaskHeartbeatClient}, report::crash_report::{parse_report_file, CrashTestResult, RegressionReport}, }; use anyhow::{Context, Result}; use async_trait::async_trait; use onefuzz::syncdir::SyncedDir; +use onefuzz_result::job_result::TaskJobResultClient; use reqwest::Url; use std::path::PathBuf; @@ -24,7 +26,7 @@ pub trait RegressionHandler { /// Runs the regression task pub async fn run( - heartbeat_client: Option, + common_config: &CommonConfig, regression_reports: &SyncedDir, crashes: &SyncedDir, report_dirs: &[&SyncedDir], @@ -35,6 +37,9 @@ pub async fn run( info!("starting regression task"); regression_reports.init().await?; + let heartbeat_client = common_config.init_heartbeat(None).await?; + let job_result_client = common_config.init_job_result().await?; + handle_crash_reports( handler, crashes, @@ -42,6 +47,7 @@ pub async fn run( report_list, regression_reports, &heartbeat_client, + &job_result_client, ) .await .context("handling crash reports")?; @@ -52,6 +58,7 @@ pub async fn run( readonly_inputs, regression_reports, &heartbeat_client, + &job_result_client, ) .await .context("handling inputs")?; @@ -71,6 +78,7 @@ pub async fn handle_inputs( readonly_inputs: &SyncedDir, regression_reports: &SyncedDir, heartbeat_client: &Option, + job_result_client: &Option, ) -> Result<()> { readonly_inputs.init_pull().await?; let mut input_files = tokio::fs::read_dir(&readonly_inputs.local_path).await?; @@ -95,7 +103,7 @@ pub async fn handle_inputs( crash_test_result, original_crash_test_result: None, } - .save(None, regression_reports) + .save(None, regression_reports, job_result_client) .await? } @@ -109,6 +117,7 @@ pub async fn handle_crash_reports( report_list: &Option>, regression_reports: &SyncedDir, heartbeat_client: &Option, + job_result_client: &Option, ) -> Result<()> { // without crash report containers, skip this method if report_dirs.is_empty() { @@ -158,7 +167,7 @@ pub async fn handle_crash_reports( crash_test_result, original_crash_test_result: Some(original_crash_test_result), } - .save(Some(file_name), regression_reports) + .save(Some(file_name), regression_reports, job_result_client) .await? } } diff --git a/src/agent/onefuzz-task/src/tasks/regression/generic.rs b/src/agent/onefuzz-task/src/tasks/regression/generic.rs index 640e80db9..8570208d5 100644 --- a/src/agent/onefuzz-task/src/tasks/regression/generic.rs +++ b/src/agent/onefuzz-task/src/tasks/regression/generic.rs @@ -89,7 +89,6 @@ impl GenericRegressionTask { pub async fn run(&self) -> Result<()> { info!("Starting generic regression task"); - let heartbeat_client = self.config.common.init_heartbeat(None).await?; let mut report_dirs = vec![]; for dir in vec![ @@ -103,7 +102,7 @@ impl GenericRegressionTask { report_dirs.push(dir); } common::run( - heartbeat_client, + &self.config.common, &self.config.regression_reports, &self.config.crashes, &report_dirs, diff --git a/src/agent/onefuzz-task/src/tasks/regression/libfuzzer.rs b/src/agent/onefuzz-task/src/tasks/regression/libfuzzer.rs index 06dd7c00d..e65f46bb6 100644 --- a/src/agent/onefuzz-task/src/tasks/regression/libfuzzer.rs +++ b/src/agent/onefuzz-task/src/tasks/regression/libfuzzer.rs @@ -103,9 +103,8 @@ impl LibFuzzerRegressionTask { report_dirs.push(dir); } - let heartbeat_client = self.config.common.init_heartbeat(None).await?; common::run( - heartbeat_client, + &self.config.common, &self.config.regression_reports, &self.config.crashes, &report_dirs, diff --git a/src/agent/onefuzz-task/src/tasks/report/crash_report.rs b/src/agent/onefuzz-task/src/tasks/report/crash_report.rs index 23171bc43..290b98ccd 100644 --- a/src/agent/onefuzz-task/src/tasks/report/crash_report.rs +++ b/src/agent/onefuzz-task/src/tasks/report/crash_report.rs @@ -3,6 +3,7 @@ use anyhow::{Context, Result}; use onefuzz::{blob::BlobUrl, monitor::DirectoryMonitor, syncdir::SyncedDir}; +use onefuzz_result::job_result::{JobResultData, JobResultSender, TaskJobResultClient}; use onefuzz_telemetry::{ Event::{ new_report, new_unable_to_reproduce, new_unique_report, regression_report, @@ -12,6 +13,7 @@ use onefuzz_telemetry::{ }; use serde::{Deserialize, Serialize}; use stacktrace_parser::CrashLog; +use std::collections::HashMap; use std::path::{Path, PathBuf}; use uuid::Uuid; @@ -111,6 +113,7 @@ impl RegressionReport { self, report_name: Option, regression_reports: &SyncedDir, + jr_client: &Option, ) -> Result<()> { let (event, name) = match &self.crash_test_result { CrashTestResult::CrashReport(report) => { @@ -126,6 +129,15 @@ impl RegressionReport { if upload_or_save_local(&self, &name, regression_reports).await? { event!(event; EventData::Path = name.clone()); metric!(event; 1.0; EventData::Path = name.clone()); + + if let Some(jr_client) = jr_client { + let _ = jr_client + .send_direct( + JobResultData::NewRegressionReport, + HashMap::from([("count".to_string(), 1.0)]), + ) + .await; + } } Ok(()) } @@ -149,6 +161,7 @@ impl CrashTestResult { unique_reports: &Option, reports: &Option, no_repro: &Option, + jr_client: &Option, ) -> Result<()> { match self { Self::CrashReport(report) => { @@ -158,6 +171,15 @@ impl CrashTestResult { if upload_or_save_local(&report, &name, unique_reports).await? { event!(new_unique_report; EventData::Path = report.unique_blob_name()); metric!(new_unique_report; 1.0; EventData::Path = report.unique_blob_name()); + + if let Some(jr_client) = jr_client { + let _ = jr_client + .send_direct( + JobResultData::NewUniqueReport, + HashMap::from([("count".to_string(), 1.0)]), + ) + .await; + } } } @@ -166,6 +188,15 @@ impl CrashTestResult { if upload_or_save_local(&report, &name, reports).await? { event!(new_report; EventData::Path = report.blob_name()); metric!(new_report; 1.0; EventData::Path = report.blob_name()); + + if let Some(jr_client) = jr_client { + let _ = jr_client + .send_direct( + JobResultData::NewReport, + HashMap::from([("count".to_string(), 1.0)]), + ) + .await; + } } } } @@ -176,6 +207,15 @@ impl CrashTestResult { if upload_or_save_local(&report, &name, no_repro).await? { event!(new_unable_to_reproduce; EventData::Path = report.blob_name()); metric!(new_unable_to_reproduce; 1.0; EventData::Path = report.blob_name()); + + if let Some(jr_client) = jr_client { + let _ = jr_client + .send_direct( + JobResultData::NoReproCrashingInput, + HashMap::from([("count".to_string(), 1.0)]), + ) + .await; + } } } } @@ -324,6 +364,7 @@ pub async fn monitor_reports( unique_reports: &Option, reports: &Option, no_crash: &Option, + jr_client: &Option, ) -> Result<()> { if unique_reports.is_none() && reports.is_none() && no_crash.is_none() { debug!("no report directories configured"); @@ -334,7 +375,9 @@ pub async fn monitor_reports( while let Some(file) = monitor.next_file().await? { let result = parse_report_file(file).await?; - result.save(unique_reports, reports, no_crash).await?; + result + .save(unique_reports, reports, no_crash, jr_client) + .await?; } Ok(()) diff --git a/src/agent/onefuzz-task/src/tasks/report/dotnet/generic.rs b/src/agent/onefuzz-task/src/tasks/report/dotnet/generic.rs index 9b626a7d8..b8659845d 100644 --- a/src/agent/onefuzz-task/src/tasks/report/dotnet/generic.rs +++ b/src/agent/onefuzz-task/src/tasks/report/dotnet/generic.rs @@ -8,16 +8,6 @@ use std::{ sync::Arc, }; -use anyhow::{Context, Result}; -use async_trait::async_trait; -use onefuzz::expand::Expand; -use onefuzz::fs::set_executable; -use onefuzz::{blob::BlobUrl, sha256, syncdir::SyncedDir}; -use reqwest::Url; -use serde::Deserialize; -use storage_queue::{Message, QueueClient}; -use tokio::fs; - use crate::tasks::report::crash_report::*; use crate::tasks::report::dotnet::common::collect_exception_info; use crate::tasks::{ @@ -26,6 +16,16 @@ use crate::tasks::{ heartbeat::{HeartbeatSender, TaskHeartbeatClient}, utils::{default_bool_true, try_resolve_setup_relative_path}, }; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use onefuzz::expand::Expand; +use onefuzz::fs::set_executable; +use onefuzz::{blob::BlobUrl, sha256, syncdir::SyncedDir}; +use onefuzz_result::job_result::TaskJobResultClient; +use reqwest::Url; +use serde::Deserialize; +use storage_queue::{Message, QueueClient}; +use tokio::fs; const DOTNET_DUMP_TOOL_NAME: &str = "dotnet-dump"; @@ -114,15 +114,18 @@ impl DotnetCrashReportTask { pub struct AsanProcessor { config: Arc, heartbeat_client: Option, + job_result_client: Option, } impl AsanProcessor { pub async fn new(config: Arc) -> Result { let heartbeat_client = config.common.init_heartbeat(None).await?; + let job_result_client = config.common.init_job_result().await?; Ok(Self { config, heartbeat_client, + job_result_client, }) } @@ -260,6 +263,7 @@ impl Processor for AsanProcessor { &self.config.unique_reports, &self.config.reports, &self.config.no_repro, + &self.job_result_client, ) .await; diff --git a/src/agent/onefuzz-task/src/tasks/report/generic.rs b/src/agent/onefuzz-task/src/tasks/report/generic.rs index 9088f98ac..8ad259f0a 100644 --- a/src/agent/onefuzz-task/src/tasks/report/generic.rs +++ b/src/agent/onefuzz-task/src/tasks/report/generic.rs @@ -13,6 +13,7 @@ use async_trait::async_trait; use onefuzz::{ blob::BlobUrl, input_tester::Tester, machine_id::MachineIdentity, sha256, syncdir::SyncedDir, }; +use onefuzz_result::job_result::TaskJobResultClient; use reqwest::Url; use serde::Deserialize; use std::{ @@ -73,7 +74,9 @@ impl ReportTask { pub async fn managed_run(&mut self) -> Result<()> { info!("Starting generic crash report task"); let heartbeat_client = self.config.common.init_heartbeat(None).await?; - let mut processor = GenericReportProcessor::new(&self.config, heartbeat_client); + let job_result_client = self.config.common.init_job_result().await?; + let mut processor = + GenericReportProcessor::new(&self.config, heartbeat_client, job_result_client); #[allow(clippy::manual_flatten)] for entry in [ @@ -183,13 +186,19 @@ pub async fn test_input(args: TestInputArgs<'_>) -> Result { pub struct GenericReportProcessor<'a> { config: &'a Config, heartbeat_client: Option, + job_result_client: Option, } impl<'a> GenericReportProcessor<'a> { - pub fn new(config: &'a Config, heartbeat_client: Option) -> Self { + pub fn new( + config: &'a Config, + heartbeat_client: Option, + job_result_client: Option, + ) -> Self { Self { config, heartbeat_client, + job_result_client, } } @@ -239,6 +248,7 @@ impl<'a> Processor for GenericReportProcessor<'a> { &self.config.unique_reports, &self.config.reports, &self.config.no_repro, + &self.job_result_client, ) .await .context("saving report failed") diff --git a/src/agent/onefuzz-task/src/tasks/report/libfuzzer_report.rs b/src/agent/onefuzz-task/src/tasks/report/libfuzzer_report.rs index f18f638fa..587ed2e3d 100644 --- a/src/agent/onefuzz-task/src/tasks/report/libfuzzer_report.rs +++ b/src/agent/onefuzz-task/src/tasks/report/libfuzzer_report.rs @@ -13,6 +13,7 @@ use async_trait::async_trait; use onefuzz::{ blob::BlobUrl, libfuzzer::LibFuzzer, machine_id::MachineIdentity, sha256, syncdir::SyncedDir, }; +use onefuzz_result::job_result::TaskJobResultClient; use reqwest::Url; use serde::Deserialize; use std::{ @@ -196,15 +197,18 @@ pub async fn test_input(args: TestInputArgs<'_>) -> Result { pub struct AsanProcessor { config: Arc, heartbeat_client: Option, + job_result_client: Option, } impl AsanProcessor { pub async fn new(config: Arc) -> Result { let heartbeat_client = config.common.init_heartbeat(None).await?; + let job_result_client = config.common.init_job_result().await?; Ok(Self { config, heartbeat_client, + job_result_client, }) } @@ -257,6 +261,7 @@ impl Processor for AsanProcessor { &self.config.unique_reports, &self.config.reports, &self.config.no_repro, + &self.job_result_client, ) .await } diff --git a/src/agent/onefuzz/Cargo.toml b/src/agent/onefuzz/Cargo.toml index c096c8ddf..1f3c27985 100644 --- a/src/agent/onefuzz/Cargo.toml +++ b/src/agent/onefuzz/Cargo.toml @@ -44,6 +44,7 @@ tempfile = "3.7.0" process_control = "4.0" reqwest-retry = { path = "../reqwest-retry" } onefuzz-telemetry = { path = "../onefuzz-telemetry" } +onefuzz-result = { path = "../onefuzz-result" } stacktrace-parser = { path = "../stacktrace-parser" } backoff = { version = "0.4", features = ["tokio"] } diff --git a/src/agent/onefuzz/src/syncdir.rs b/src/agent/onefuzz/src/syncdir.rs index 025209956..2e73b7a69 100644 --- a/src/agent/onefuzz/src/syncdir.rs +++ b/src/agent/onefuzz/src/syncdir.rs @@ -11,10 +11,12 @@ use crate::{ }; use anyhow::{Context, Result}; use dunce::canonicalize; +use onefuzz_result::job_result::{JobResultData, JobResultSender, TaskJobResultClient}; use onefuzz_telemetry::{Event, EventData}; use reqwest::{StatusCode, Url}; use reqwest_retry::{RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::{env::current_dir, path::PathBuf, str, time::Duration}; use tokio::{fs, select}; use tokio_util::sync::CancellationToken; @@ -241,6 +243,7 @@ impl SyncedDir { url: BlobContainerUrl, event: Event, ignore_dotfiles: bool, + jr_client: &Option, ) -> Result<()> { debug!("monitoring {}", path.display()); @@ -265,9 +268,39 @@ impl SyncedDir { if ignore_dotfiles && file_name_event_str.starts_with('.') { continue; } - event!(event.clone(); EventData::Path = file_name_event_str); metric!(event.clone(); 1.0; EventData::Path = file_name_str_metric_str); + if let Some(jr_client) = jr_client { + match event { + Event::new_result => { + jr_client + .send_direct( + JobResultData::NewCrashingInput, + HashMap::from([("count".to_string(), 1.0)]), + ) + .await; + } + Event::new_coverage => { + jr_client + .send_direct( + JobResultData::CoverageData, + HashMap::from([("count".to_string(), 1.0)]), + ) + .await; + } + Event::new_crashdump => { + jr_client + .send_direct( + JobResultData::NewCrashDump, + HashMap::from([("count".to_string(), 1.0)]), + ) + .await; + } + _ => { + warn!("Unhandled job result!"); + } + } + } let destination = path.join(file_name); if let Err(err) = fs::copy(&item, &destination).await { let error_message = format!( @@ -305,6 +338,29 @@ impl SyncedDir { event!(event.clone(); EventData::Path = file_name_event_str); metric!(event.clone(); 1.0; EventData::Path = file_name_str_metric_str); + if let Some(jr_client) = jr_client { + match event { + Event::new_result => { + jr_client + .send_direct( + JobResultData::NewCrashingInput, + HashMap::from([("count".to_string(), 1.0)]), + ) + .await; + } + Event::new_coverage => { + jr_client + .send_direct( + JobResultData::CoverageData, + HashMap::from([("count".to_string(), 1.0)]), + ) + .await; + } + _ => { + warn!("Unhandled job result!"); + } + } + } if let Err(err) = uploader.upload(item.clone()).await { let error_message = format!( "Couldn't upload file. path:{} dir:{} err:{:?}", @@ -336,7 +392,12 @@ impl SyncedDir { /// The intent of this is to support use cases where we usually want a directory /// to be initialized, but a user-supplied binary, (such as AFL) logically owns /// a directory, and may reset it. - pub async fn monitor_results(&self, event: Event, ignore_dotfiles: bool) -> Result<()> { + pub async fn monitor_results( + &self, + event: Event, + ignore_dotfiles: bool, + job_result_client: &Option, + ) -> Result<()> { if let Some(url) = self.remote_path.clone() { loop { debug!("waiting to monitor {}", self.local_path.display()); @@ -355,6 +416,7 @@ impl SyncedDir { url.clone(), event.clone(), ignore_dotfiles, + job_result_client, ) .await?; } diff --git a/src/deployment/bicep-templates/storageAccounts.bicep b/src/deployment/bicep-templates/storageAccounts.bicep index 6a96cea6a..27f2da21d 100644 --- a/src/deployment/bicep-templates/storageAccounts.bicep +++ b/src/deployment/bicep-templates/storageAccounts.bicep @@ -33,7 +33,7 @@ var storageAccountFuncQueuesParams = [ 'update-queue' 'webhooks' 'signalr-events' - 'custom-metrics' + 'job-result' ] var fileChangesQueueIndex = 0