Job Result Round #2 (#3380)

* Attempting new background process.

* Formatting.

* Adding JobResultOperations.

* Fixing.

* Adding storage tests.

* Still trying.

* Adding.

* Adding to rest of queue.

* Adding arg.

* Fixing.

* Updating job result.

* Adding cargo lock.

* Restoring queuetaskheartbeat.

* Fixing job format.

* Properly naming function.

* Adding to program.cs.

* Removing unnecessary code.

* More small fixes.

* Adding regular crash results.

* Resolving issues.

* More fixes.

* Debugging crashing input.

* Adding

* Remove

* Fixed.

* Handling other cases.

* Adding value to data.

* Adding values to queue messages.

* Adding values.

* Adding await.

* Adding runtimestats.

* Fixing runtime_stats.

* Updating types.

* Fixing types.

* Resolve.

* Resolve type.

* Updating onefuzz-result cargo.

* Responding to comments.

* More comments.

* Removing unused params.

* Respond to more comments.

* Updating cargo.

* Fixing return type.

* Updating keys.

* Updating return type.

* Updating JobResult with constructor.

* Remove debug logging.

* Adding warning.

* Making generic into TaskContext.

* Adding crash dump.

* Updating for crash dumps.

* Formatting.

* Updating to warn.

* Formatting.

* Formatting.

* Borrowing new client.
This commit is contained in:
Noah McGregor Harper
2023-08-18 10:01:32 -07:00
committed by GitHub
parent bdb2f1337d
commit 804b2993b7
33 changed files with 672 additions and 41 deletions

View File

@ -0,0 +1,60 @@
using System.Text.Json;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service.Functions;
/// <summary>
/// Queue-triggered function that consumes messages from the "job-result" queue and
/// folds each one into the aggregate <c>JobResult</c> record of the owning job.
/// </summary>
public class QueueJobResult {
    private readonly ILogger _log;
    private readonly IOnefuzzContext _context;

    public QueueJobResult(ILogger<QueueJobResult> logTracer, IOnefuzzContext context) {
        _log = logTracer;
        _context = context;
    }

    /// <summary>
    /// Handles a single queue message. Messages that reference an unknown task or job,
    /// or that carry no result type or no values, are logged and dropped.
    /// </summary>
    /// <param name="msg">JSON-encoded <c>TaskJobResultEntry</c>.</param>
    [Function("QueueJobResult")]
    public async Async.Task Run([QueueTrigger("job-result", Connection = "AzureWebJobsStorage")] string msg) {
        var _tasks = _context.TaskOperations;
        var _jobs = _context.JobOperations;

        _log.LogInformation("job result: {msg}", msg);

        var jr = JsonSerializer.Deserialize<TaskJobResultEntry>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}");

        var task = await _tasks.GetByTaskId(jr.TaskId);
        if (task == null) {
            _log.LogWarning("invalid {TaskId}", jr.TaskId);
            return;
        }

        // The job is resolved through the task rather than through the JobId carried by the message.
        var job = await _jobs.Get(task.JobId);
        if (job == null) {
            _log.LogWarning("invalid {JobId}", task.JobId);
            return;
        }

        JobResultData? data = jr.Data;
        if (data == null) {
            _log.LogWarning($"job result data is empty, throwing out: {jr}");
            return;
        }

        var jobResultType = data.Type;
        _log.LogInformation($"job result data type: {jobResultType}");

        // The deserializer leaves Value null when the message has no "value" property,
        // so guard against null as well as against an empty dictionary.
        Dictionary<string, double>? value = jr.Value;
        if (value == null || value.Count == 0) {
            _log.LogWarning($"job result value is empty, throwing out: {jr}");
            return;
        }

        var jobResult = await _context.JobResultOperations.CreateOrUpdate(job.JobId, jobResultType, value);
        if (!jobResult.IsOk) {
            _log.LogError("failed to create or update with job result {JobId}", job.JobId);
        }
    }
}

View File

@ -33,6 +33,19 @@ public enum HeartbeatType {
TaskAlive,
}
/// <summary>
/// Kinds of job result events reported over the "job-result" queue.
/// The member names match the variants of the agent-side Rust enum <c>JobResultData</c>.
/// </summary>
// NOTE(review): [SkipRename] presumably keeps these names unchanged when serialized — confirm against EntityConverter.
[SkipRename]
public enum JobResultType {
NewCrashingInput,
NoReproCrashingInput,
NewReport,
NewUniqueReport,
NewRegressionReport,
NewCoverage,
NewCrashDump,
CoverageData,
RuntimeStats,
}
public record HeartbeatData(HeartbeatType Type);
public record TaskHeartbeatEntry(
@ -41,6 +54,16 @@ public record TaskHeartbeatEntry(
Guid MachineId,
HeartbeatData[] Data);
/// <summary>Wrapper carrying the <see cref="JobResultType"/> of a job result message.</summary>
public record JobResultData(JobResultType Type);
/// <summary>
/// Payload of a "job-result" queue message, as deserialized by <c>QueueJobResult.Run</c>.
/// </summary>
public record TaskJobResultEntry(
// Task that produced the result; the handler resolves the owning job through this task.
Guid TaskId,
Guid? JobId,
Guid MachineId,
// Which kind of result this message reports.
JobResultData Data,
// Named metric values; JobResultOperations.UpdateResult reads the keys
// "count", "covered", "features", "rate" and "total_count" depending on the result type.
Dictionary<string, double> Value
);
public record NodeHeartbeatEntry(Guid NodeId, HeartbeatData[] Data);
public record NodeCommandStopIfFree();
@ -892,6 +915,27 @@ public record SecretAddress<T>(Uri Url) : ISecret<T> {
public record SecretData<T>(ISecret<T> Secret) {
}
/// <summary>
/// Aggregated result counters for one job. <c>JobId</c> is used as both the partition key
/// and the row key. The counters are maintained by <c>JobResultOperations.UpdateResult</c>.
/// </summary>
public record JobResult(
[PartitionKey][RowKey] Guid JobId,
string Project,
string Name,
double NewCrashingInput = 0,
double NoReproCrashingInput = 0,
double NewReport = 0,
double NewUniqueReport = 0,
double NewRegressionReport = 0,
double NewCrashDump = 0,
double InstructionsCovered = 0,
double TotalInstructions = 0,
double CoverageRate = 0,
double IterationCount = 0
) : EntityBase() {
// Convenience constructor: creates a record with every counter set to zero.
public JobResult(Guid JobId, string Project, string Name) : this(
JobId: JobId,
Project: Project,
Name: Name, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) { }
}
public record JobConfig(
string Project,
string Name,
@ -1056,6 +1100,7 @@ public record TaskUnitConfig(
string? InstanceTelemetryKey,
string? MicrosoftTelemetryKey,
Uri HeartbeatQueue,
Uri JobResultQueue,
Dictionary<string, string> Tags
) {
public Uri? inputQueue { get; set; }

View File

@ -118,6 +118,7 @@ public class Program {
.AddScoped<IVmOperations, VmOperations>()
.AddScoped<ISecretsOperations, SecretsOperations>()
.AddScoped<IJobOperations, JobOperations>()
.AddScoped<IJobResultOperations, JobResultOperations>()
.AddScoped<INsgOperations, NsgOperations>()
.AddScoped<IScheduler, Scheduler>()
.AddScoped<IConfig, Config>()

View File

@ -71,6 +71,7 @@ public class Config : IConfig {
InstanceTelemetryKey: _serviceConfig.ApplicationInsightsInstrumentationKey,
MicrosoftTelemetryKey: _serviceConfig.OneFuzzTelemetry,
HeartbeatQueue: await _queue.GetQueueSas("task-heartbeat", StorageType.Config, QueueSasPermissions.Add) ?? throw new Exception("unable to get heartbeat queue sas"),
// SAS URL (add-only) for the "job-result" queue that agents post job results to.
JobResultQueue: await _queue.GetQueueSas("job-result", StorageType.Config, QueueSasPermissions.Add) ?? throw new Exception("unable to get job result queue sas"),
Tags: task.Config.Tags ?? new Dictionary<string, string>()
);

View File

@ -0,0 +1,125 @@
using ApiService.OneFuzzLib.Orm;
using Microsoft.Extensions.Logging;
using Polly;
namespace Microsoft.OneFuzz.Service;
/// <summary>Storage operations for the per-job aggregate <see cref="JobResult"/> record.</summary>
public interface IJobResultOperations : IOrm<JobResult> {
/// <summary>Returns the <see cref="JobResult"/> stored for <paramref name="jobId"/>, or null when there is none.</summary>
Async.Task<JobResult?> GetJobResult(Guid jobId);
/// <summary>
/// Applies one result message of kind <paramref name="resultType"/> with values
/// <paramref name="resultValue"/> to the job's record, creating the record when it does not exist yet.
/// </summary>
Async.Task<OneFuzzResultVoid> CreateOrUpdate(Guid jobId, JobResultType resultType, Dictionary<string, double> resultValue);
}
/// <summary>
/// Maintains the per-job aggregate <see cref="JobResult"/> record from individual
/// job result messages.
/// </summary>
public class JobResultOperations : Orm<JobResult>, IJobResultOperations {

    public JobResultOperations(ILogger<JobResultOperations> log, IOnefuzzContext context)
        : base(log, context) {
    }

    /// <summary>Returns the record stored for <paramref name="jobId"/>, or null when none exists.</summary>
    public async Async.Task<JobResult?> GetJobResult(Guid jobId) {
        return await SearchByPartitionKeys(new[] { jobId.ToString() }).SingleOrDefaultAsync();
    }

    /// <summary>
    /// Returns a copy of <paramref name="result"/> with the message applied.
    /// Count-style types add <c>resultValue["count"]</c> to the matching counter;
    /// CoverageData and RuntimeStats overwrite their fields with the latest values.
    /// Types without a matching field are logged and leave the record unchanged.
    /// A missing dictionary key throws <see cref="KeyNotFoundException"/>.
    /// </summary>
    private JobResult UpdateResult(JobResult result, JobResultType type, Dictionary<string, double> resultValue) {
        var newResult = result;
        double newValue;
        switch (type) {
            case JobResultType.NewCrashingInput:
                newValue = result.NewCrashingInput + resultValue["count"];
                newResult = result with { NewCrashingInput = newValue };
                break;
            // Previously unhandled, so the NoReproCrashingInput counter was never incremented.
            case JobResultType.NoReproCrashingInput:
                newValue = result.NoReproCrashingInput + resultValue["count"];
                newResult = result with { NoReproCrashingInput = newValue };
                break;
            case JobResultType.NewReport:
                newValue = result.NewReport + resultValue["count"];
                newResult = result with { NewReport = newValue };
                break;
            case JobResultType.NewUniqueReport:
                newValue = result.NewUniqueReport + resultValue["count"];
                newResult = result with { NewUniqueReport = newValue };
                break;
            case JobResultType.NewRegressionReport:
                newValue = result.NewRegressionReport + resultValue["count"];
                newResult = result with { NewRegressionReport = newValue };
                break;
            case JobResultType.NewCrashDump:
                newValue = result.NewCrashDump + resultValue["count"];
                newResult = result with { NewCrashDump = newValue };
                break;
            case JobResultType.CoverageData:
                double newCovered = resultValue["covered"];
                double newTotalCovered = resultValue["features"];
                double newCoverageRate = resultValue["rate"];
                newResult = result with { InstructionsCovered = newCovered, TotalInstructions = newTotalCovered, CoverageRate = newCoverageRate };
                break;
            case JobResultType.RuntimeStats:
                double newTotalIterations = resultValue["total_count"];
                newResult = result with { IterationCount = newTotalIterations };
                break;
            default:
                _logTracer.LogWarning($"Invalid Field {type}.");
                break;
        }
        _logTracer.LogInformation($"Attempting to log new result: {newResult}");
        return newResult;
    }

    /// <summary>
    /// Performs one read-modify-write attempt: inserts a new record when the job has none,
    /// otherwise updates the existing one.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The insert or update was rejected by storage; the retry policy in
    /// <see cref="CreateOrUpdate"/> retries on this exception.
    /// </exception>
    private async Async.Task<bool> TryUpdate(Job job, JobResultType resultType, Dictionary<string, double> resultValue) {
        var jobId = job.JobId;

        var jobResult = await GetJobResult(jobId);

        if (jobResult == null) {
            _logTracer.LogInformation("Creating new JobResult for Job {JobId}", jobId);

            var entry = new JobResult(JobId: jobId, Project: job.Config.Project, Name: job.Config.Name);

            jobResult = UpdateResult(entry, resultType, resultValue);

            var r = await Insert(jobResult);
            if (!r.IsOk) {
                _logTracer.AddHttpStatus(r.ErrorV);
                _logTracer.LogError("failed to insert job result {JobId}", jobResult.JobId);
                throw new InvalidOperationException($"failed to insert job result {jobResult.JobId}");
            }
            _logTracer.LogInformation("created job result {JobId}", jobResult.JobId);
        } else {
            _logTracer.LogInformation("Updating existing JobResult entry for Job {JobId}", jobId);

            jobResult = UpdateResult(jobResult, resultType, resultValue);

            var r = await Update(jobResult);
            if (!r.IsOk) {
                _logTracer.AddHttpStatus(r.ErrorV);
                _logTracer.LogError("failed to update job result {JobId}", jobResult.JobId);
                throw new InvalidOperationException($"failed to update job result {jobResult.JobId}");
            }
            _logTracer.LogInformation("updated job result {JobId}", jobResult.JobId);
        }

        return true;
    }

    /// <summary>
    /// Applies one result message to the record of job <paramref name="jobId"/>.
    /// Storage failures are retried up to 50 times, 5 seconds apart.
    /// </summary>
    /// <returns>
    /// Ok on success; INVALID_REQUEST when the job does not exist; UNABLE_TO_UPDATE when
    /// the retries are exhausted or any other exception is thrown.
    /// </returns>
    public async Async.Task<OneFuzzResultVoid> CreateOrUpdate(Guid jobId, JobResultType resultType, Dictionary<string, double> resultValue) {

        var job = await _context.JobOperations.Get(jobId);
        if (job == null) {
            return OneFuzzResultVoid.Error(ErrorCode.INVALID_REQUEST, "invalid job");
        }

        var success = false;
        try {
            _logTracer.LogInformation("attempt to update job result {JobId}", job.JobId);
            // Only InvalidOperationException (thrown by TryUpdate on a rejected write) is retried;
            // each retry re-reads the record before modifying it.
            var policy = Policy.Handle<InvalidOperationException>().WaitAndRetryAsync(50, _ => new TimeSpan(0, 0, 5));
            await policy.ExecuteAsync(async () => {
                success = await TryUpdate(job, resultType, resultValue);
                _logTracer.LogInformation("attempt {success}", success);
            });
            return OneFuzzResultVoid.Ok;
        } catch (Exception e) {
            return OneFuzzResultVoid.Error(ErrorCode.UNABLE_TO_UPDATE, new string[] {
                    $"Unexpected failure when attempting to update job result for {job.JobId}",
                    $"Exception: {e}"
                });
        }
    }
}

View File

@ -19,6 +19,7 @@ public interface IOnefuzzContext {
IExtensions Extensions { get; }
IIpOperations IpOperations { get; }
IJobOperations JobOperations { get; }
IJobResultOperations JobResultOperations { get; }
ILogAnalytics LogAnalytics { get; }
INodeMessageOperations NodeMessageOperations { get; }
INodeOperations NodeOperations { get; }
@ -83,6 +84,7 @@ public class OnefuzzContext : IOnefuzzContext {
public IVmOperations VmOperations => _serviceProvider.GetRequiredService<IVmOperations>();
public ISecretsOperations SecretsOperations => _serviceProvider.GetRequiredService<ISecretsOperations>();
public IJobOperations JobOperations => _serviceProvider.GetRequiredService<IJobOperations>();
public IJobResultOperations JobResultOperations => _serviceProvider.GetRequiredService<IJobResultOperations>();
public IScheduler Scheduler => _serviceProvider.GetRequiredService<IScheduler>();
public IConfig Config => _serviceProvider.GetRequiredService<IConfig>();
public ILogAnalytics LogAnalytics => _serviceProvider.GetRequiredService<ILogAnalytics>();

View File

@ -32,6 +32,7 @@ public sealed class TestContext : IOnefuzzContext {
TaskOperations = new TaskOperations(provider.CreateLogger<TaskOperations>(), Cache, this);
NodeOperations = new NodeOperations(provider.CreateLogger<NodeOperations>(), this);
JobOperations = new JobOperations(provider.CreateLogger<JobOperations>(), this);
JobResultOperations = new JobResultOperations(provider.CreateLogger<JobResultOperations>(), this);
NodeTasksOperations = new NodeTasksOperations(provider.CreateLogger<NodeTasksOperations>(), this);
TaskEventOperations = new TaskEventOperations(provider.CreateLogger<TaskEventOperations>(), this);
NodeMessageOperations = new NodeMessageOperations(provider.CreateLogger<NodeMessageOperations>(), this);
@ -57,6 +58,7 @@ public sealed class TestContext : IOnefuzzContext {
Node n => NodeOperations.Insert(n),
Pool p => PoolOperations.Insert(p),
Job j => JobOperations.Insert(j),
JobResult jr => JobResultOperations.Insert(jr),
Repro r => ReproOperations.Insert(r),
Scaleset ss => ScalesetOperations.Insert(ss),
NodeTasks nt => NodeTasksOperations.Insert(nt),
@ -84,6 +86,7 @@ public sealed class TestContext : IOnefuzzContext {
public ITaskOperations TaskOperations { get; }
public IJobOperations JobOperations { get; }
public IJobResultOperations JobResultOperations { get; }
public INodeOperations NodeOperations { get; }
public INodeTasksOperations NodeTasksOperations { get; }
public ITaskEventOperations TaskEventOperations { get; }

16
src/agent/Cargo.lock generated
View File

@ -2123,6 +2123,7 @@ dependencies = [
"log",
"nix",
"notify",
"onefuzz-result",
"onefuzz-telemetry",
"pete",
"pretty_assertions",
@ -2197,6 +2198,20 @@ dependencies = [
"serde_json",
]
[[package]]
name = "onefuzz-result"
version = "0.2.0"
dependencies = [
"anyhow",
"async-trait",
"log",
"onefuzz-telemetry",
"reqwest",
"serde",
"storage-queue",
"uuid",
]
[[package]]
name = "onefuzz-task"
version = "0.2.0"
@ -2226,6 +2241,7 @@ dependencies = [
"num_cpus",
"onefuzz",
"onefuzz-file-format",
"onefuzz-result",
"onefuzz-telemetry",
"path-absolutize",
"pretty_assertions",

View File

@ -10,6 +10,7 @@ members = [
"onefuzz",
"onefuzz-task",
"onefuzz-agent",
"onefuzz-result",
"onefuzz-file-format",
"onefuzz-telemetry",
"reqwest-retry",

View File

@ -34,6 +34,8 @@ pub struct StaticConfig {
pub heartbeat_queue: Option<Url>,
pub job_result_queue: Option<Url>,
pub instance_id: Uuid,
#[serde(default = "default_as_true")]
@ -71,6 +73,8 @@ struct RawStaticConfig {
pub heartbeat_queue: Option<Url>,
pub job_result_queue: Option<Url>,
pub instance_id: Uuid,
#[serde(default = "default_as_true")]
@ -117,6 +121,7 @@ impl StaticConfig {
microsoft_telemetry_key: config.microsoft_telemetry_key,
instance_telemetry_key: config.instance_telemetry_key,
heartbeat_queue: config.heartbeat_queue,
job_result_queue: config.job_result_queue,
instance_id: config.instance_id,
managed: config.managed,
machine_identity,
@ -152,6 +157,12 @@ impl StaticConfig {
None
};
let job_result_queue = if let Ok(key) = std::env::var("ONEFUZZ_JOB_RESULT") {
Some(Url::parse(&key)?)
} else {
None
};
let instance_telemetry_key =
if let Ok(key) = std::env::var("ONEFUZZ_INSTANCE_TELEMETRY_KEY") {
Some(InstanceTelemetryKey::new(Uuid::parse_str(&key)?))
@ -183,6 +194,7 @@ impl StaticConfig {
instance_telemetry_key,
microsoft_telemetry_key,
heartbeat_queue,
job_result_queue,
instance_id,
managed: !is_unmanaged,
machine_identity,

View File

@ -0,0 +1,18 @@
[package]
name = "onefuzz-result"
version = "0.2.0"
authors = ["fuzzing@microsoft.com"]
edition = "2021"
publish = false
license = "MIT"
[dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
async-trait = "0.1"
reqwest = "0.11"
serde = "1.0"
storage-queue = { path = "../storage-queue" }
uuid = { version = "1.4", features = ["serde", "v4"] }
onefuzz-telemetry = { path = "../onefuzz-telemetry" }
log = "0.4"

View File

@ -0,0 +1,129 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use anyhow::Result;
use async_trait::async_trait;
use onefuzz_telemetry::warn;
use reqwest::Url;
use serde::{self, Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use storage_queue::QueueClient;
use uuid::Uuid;
/// Kind of result event reported for a job.
///
/// Because of `#[serde(tag = "type")]` the enum is internally tagged: a value is
/// serialized as an object whose `type` field holds the variant name.
#[derive(Debug, Deserialize, Serialize, Hash, Eq, PartialEq, Clone)]
#[serde(tag = "type")]
pub enum JobResultData {
NewCrashingInput,
NoReproCrashingInput,
NewReport,
NewUniqueReport,
NewRegressionReport,
NewCoverage,
NewCrashDump,
CoverageData,
RuntimeStats,
}
/// Message enqueued by `send_direct`: the identity of the reporting task and machine,
/// the kind of result, and its named metric values.
#[derive(Debug, Deserialize, Serialize, Clone)]
struct JobResult {
task_id: Uuid,
job_id: Uuid,
machine_id: Uuid,
machine_name: String,
data: JobResultData,
value: HashMap<String, f64>,
}
/// Identity of the task and machine a job result client reports for; these fields
/// are copied into every message the client sends.
#[derive(Clone)]
pub struct TaskContext {
task_id: Uuid,
job_id: Uuid,
machine_id: Uuid,
machine_name: String,
}
/// Shared state behind a job result client: caller-defined state `T` plus the
/// queue client used to enqueue messages.
pub struct JobResultContext<T> {
    pub state: T,
    pub queue_client: QueueClient,
}
/// Handle for sending job results; wraps the shared [`JobResultContext`] in an `Arc`.
pub struct JobResultClient<T> {
    pub context: Arc<JobResultContext<T>>,
}
impl<T> JobResultClient<T> {
    /// Creates a client that holds `context` as its state and sends to the queue at `queue_url`.
    ///
    /// # Errors
    ///
    /// Returns the error from `QueueClient::new` when the queue client cannot be created.
    pub fn init_job_result(context: T, queue_url: Url) -> Result<JobResultClient<T>>
    where
        T: Send + Sync + 'static,
    {
        let queue_client = QueueClient::new(queue_url)?;
        let shared = JobResultContext {
            state: context,
            queue_client,
        };
        Ok(JobResultClient {
            context: Arc::new(shared),
        })
    }
}
/// [`JobResultClient`] specialised to the task-level [`TaskContext`] state.
pub type TaskJobResultClient = JobResultClient<TaskContext>;
/// Builds a [`TaskJobResultClient`] for the given task and machine identity that
/// sends its messages to the queue at `queue_url`.
///
/// # Errors
///
/// Propagates the error from [`JobResultClient::init_job_result`].
pub async fn init_job_result(
    queue_url: Url,
    task_id: Uuid,
    job_id: Uuid,
    machine_id: Uuid,
    machine_name: String,
) -> Result<TaskJobResultClient> {
    let task_context = TaskContext {
        task_id,
        job_id,
        machine_id,
        machine_name,
    };
    JobResultClient::init_job_result(task_context, queue_url)
}
/// Ability to report a job result event.
#[async_trait]
pub trait JobResultSender {
/// Sends a result of kind `data` together with its named metric values `value`.
/// Nothing is returned, so callers cannot observe whether sending succeeded.
async fn send_direct(&self, data: JobResultData, value: HashMap<String, f64>);
}
#[async_trait]
impl JobResultSender for TaskJobResultClient {
async fn send_direct(&self, data: JobResultData, value: HashMap<String, f64>) {
let task_id = self.context.state.task_id;
let job_id = self.context.state.job_id;
let machine_id = self.context.state.machine_id;
let machine_name = self.context.state.machine_name.clone();
let _ = self
.context
.queue_client
.enqueue(JobResult {
task_id,
job_id,
machine_id,
machine_name,
data,
value,
})
.await;
}
}
#[async_trait]
impl JobResultSender for Option<TaskJobResultClient> {
    /// Forwards to the wrapped client when one is present; otherwise logs a warning
    /// and sends nothing.
    async fn send_direct(&self, data: JobResultData, value: HashMap<String, f64>) {
        if let Some(client) = self {
            client.send_direct(data, value).await
        } else {
            warn!("Failed to send Job Result message data from agent.")
        }
    }
}

View File

@ -0,0 +1,4 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
pub mod job_result;

View File

@ -39,6 +39,7 @@ serde_json = "1.0"
serde_yaml = "0.9.21"
onefuzz = { path = "../onefuzz" }
onefuzz-telemetry = { path = "../onefuzz-telemetry" }
onefuzz-result = { path = "../onefuzz-result" }
path-absolutize = "3.1"
reqwest-retry = { path = "../reqwest-retry" }
strum = "0.25"

View File

@ -265,6 +265,7 @@ pub async fn build_local_context(
},
instance_telemetry_key: None,
heartbeat_queue: None,
job_result_queue: None,
microsoft_telemetry_key: None,
logs: None,
min_available_memory_mb: 0,

View File

@ -196,6 +196,7 @@ pub async fn launch(
job_id: Uuid::new_v4(),
instance_id: Uuid::new_v4(),
heartbeat_queue: None,
job_result_queue: None,
instance_telemetry_key: None,
microsoft_telemetry_key: None,
logs: None,

View File

@ -65,6 +65,8 @@ pub async fn run(config: Config) -> Result<()> {
tools.init_pull().await?;
}
let job_result_client = config.common.init_job_result().await?;
// the tempdir is always created, however, the reports_path and
// reports_monitor_future are only created if we have one of the three
// report SyncedDir. The idea is that the option for where to write reports
@ -88,6 +90,7 @@ pub async fn run(config: Config) -> Result<()> {
&config.unique_reports,
&config.reports,
&config.no_repro,
&job_result_client,
);
(
Some(reports_dir.path().to_path_buf()),

View File

@ -14,6 +14,7 @@ use onefuzz::{
machine_id::MachineIdentity,
syncdir::{SyncOperation, SyncedDir},
};
use onefuzz_result::job_result::{init_job_result, TaskJobResultClient};
use onefuzz_telemetry::{
self as telemetry, Event::task_start, EventData, InstanceTelemetryKey, MicrosoftTelemetryKey,
Role,
@ -50,6 +51,8 @@ pub struct CommonConfig {
pub heartbeat_queue: Option<Url>,
pub job_result_queue: Option<Url>,
pub instance_telemetry_key: Option<InstanceTelemetryKey>,
pub microsoft_telemetry_key: Option<MicrosoftTelemetryKey>,
@ -103,6 +106,23 @@ impl CommonConfig {
None => Ok(None),
}
}
/// Creates the job result client for this task when a job result queue is configured.
///
/// Returns `Ok(None)` when `job_result_queue` is unset; otherwise builds the client
/// from this config's task, job and machine identity.
pub async fn init_job_result(&self) -> Result<Option<TaskJobResultClient>> {
    let queue_url = match self.job_result_queue.as_ref() {
        Some(url) => url.clone(),
        None => return Ok(None),
    };

    let client = init_job_result(
        queue_url,
        self.task_id,
        self.job_id,
        self.machine_identity.machine_id,
        self.machine_identity.machine_name.clone(),
    )
    .await?;

    Ok(Some(client))
}
}
#[derive(Debug, Deserialize)]

View File

@ -26,6 +26,8 @@ use onefuzz_file_format::coverage::{
binary::{v1::BinaryCoverageJson as BinaryCoverageJsonV1, BinaryCoverageJson},
source::{v1::SourceCoverageJson as SourceCoverageJsonV1, SourceCoverageJson},
};
use onefuzz_result::job_result::JobResultData;
use onefuzz_result::job_result::{JobResultSender, TaskJobResultClient};
use onefuzz_telemetry::{event, warn, Event::coverage_data, Event::coverage_failed, EventData};
use storage_queue::{Message, QueueClient};
use tokio::fs;
@ -114,7 +116,7 @@ impl CoverageTask {
let allowlist = self.load_target_allowlist().await?;
let heartbeat = self.config.common.init_heartbeat(None).await?;
let job_result = self.config.common.init_job_result().await?;
let mut seen_inputs = false;
let target_exe_path =
@ -129,6 +131,7 @@ impl CoverageTask {
coverage,
allowlist,
heartbeat,
job_result,
target_exe.to_string(),
)?;
@ -219,6 +222,7 @@ struct TaskContext<'a> {
module_allowlist: AllowList,
source_allowlist: Arc<AllowList>,
heartbeat: Option<TaskHeartbeatClient>,
job_result: Option<TaskJobResultClient>,
cache: Arc<DebugInfoCache>,
}
@ -228,6 +232,7 @@ impl<'a> TaskContext<'a> {
coverage: BinaryCoverage,
allowlist: TargetAllowList,
heartbeat: Option<TaskHeartbeatClient>,
job_result: Option<TaskJobResultClient>,
target_exe: String,
) -> Result<Self> {
let cache = DebugInfoCache::new(allowlist.source_files.clone());
@ -247,6 +252,7 @@ impl<'a> TaskContext<'a> {
module_allowlist: allowlist.modules,
source_allowlist: Arc::new(allowlist.source_files),
heartbeat,
job_result,
cache: Arc::new(cache),
})
}
@ -455,7 +461,16 @@ impl<'a> TaskContext<'a> {
let s = CoverageStats::new(&coverage);
event!(coverage_data; Covered = s.covered, Features = s.features, Rate = s.rate);
metric!(coverage_data; 1.0; Covered = s.covered, Features = s.features, Rate = s.rate);
self.job_result
.send_direct(
JobResultData::CoverageData,
HashMap::from([
("covered".to_string(), s.covered as f64),
("features".to_string(), s.features as f64),
("rate".to_string(), s.rate),
]),
)
.await;
Ok(())
}

View File

@ -73,6 +73,7 @@ impl GeneratorTask {
}
let hb_client = self.config.common.init_heartbeat(None).await?;
let jr_client = self.config.common.init_job_result().await?;
for dir in &self.config.readonly_inputs {
dir.init_pull().await?;
@ -84,7 +85,10 @@ impl GeneratorTask {
self.config.ensemble_sync_delay,
);
let crash_dir_monitor = self.config.crashes.monitor_results(new_result, false);
let crash_dir_monitor = self
.config
.crashes
.monitor_results(new_result, false, &jr_client);
let fuzzer = self.fuzzing_loop(hb_client);
@ -298,6 +302,7 @@ mod tests {
task_id: Default::default(),
instance_id: Default::default(),
heartbeat_queue: Default::default(),
job_result_queue: Default::default(),
instance_telemetry_key: Default::default(),
microsoft_telemetry_key: Default::default(),
logs: Default::default(),

View File

@ -1,7 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::{config::CommonConfig, heartbeat::HeartbeatSender, utils::default_bool_true};
use crate::tasks::{
config::CommonConfig,
heartbeat::{HeartbeatSender, TaskHeartbeatClient},
utils::default_bool_true,
};
use anyhow::{Context, Result};
use arraydeque::{ArrayDeque, Wrapping};
use async_trait::async_trait;
@ -12,6 +16,7 @@ use onefuzz::{
process::ExitStatus,
syncdir::{continuous_sync, SyncOperation::Pull, SyncedDir},
};
use onefuzz_result::job_result::{JobResultData, JobResultSender, TaskJobResultClient};
use onefuzz_telemetry::{
Event::{new_coverage, new_crashdump, new_result, runtime_stats},
EventData,
@ -126,21 +131,31 @@ where
self.verify().await?;
let hb_client = self.config.common.init_heartbeat(None).await?;
let jr_client = self.config.common.init_job_result().await?;
// To be scheduled.
let resync = self.continuous_sync_inputs();
let new_inputs = self.config.inputs.monitor_results(new_coverage, true);
let new_crashes = self.config.crashes.monitor_results(new_result, true);
let new_inputs = self
.config
.inputs
.monitor_results(new_coverage, true, &jr_client);
let new_crashes = self
.config
.crashes
.monitor_results(new_result, true, &jr_client);
let new_crashdumps = async {
if let Some(crashdumps) = &self.config.crashdumps {
crashdumps.monitor_results(new_crashdump, true).await
crashdumps
.monitor_results(new_crashdump, true, &jr_client)
.await
} else {
Ok(())
}
};
let (stats_sender, stats_receiver) = mpsc::unbounded_channel();
let report_stats = report_runtime_stats(stats_receiver, hb_client);
let report_stats = report_runtime_stats(stats_receiver, &hb_client, &jr_client);
let fuzzers = self.run_fuzzers(Some(&stats_sender));
futures::try_join!(
resync,
@ -183,7 +198,7 @@ where
.inputs
.local_path
.parent()
.ok_or_else(|| anyhow!("Invalid input path"))?;
.ok_or_else(|| anyhow!("invalid input path"))?;
let temp_path = task_dir.join(".temp");
tokio::fs::create_dir_all(&temp_path).await?;
let temp_dir = tempdir_in(temp_path)?;
@ -501,7 +516,7 @@ impl TotalStats {
self.execs_sec = self.worker_stats.values().map(|x| x.execs_sec).sum();
}
fn report(&self) {
async fn report(&self, jr_client: &Option<TaskJobResultClient>) {
event!(
runtime_stats;
EventData::Count = self.count,
@ -513,6 +528,17 @@ impl TotalStats {
EventData::Count = self.count,
EventData::ExecsSecond = self.execs_sec
);
if let Some(jr_client) = jr_client {
let _ = jr_client
.send_direct(
JobResultData::RuntimeStats,
HashMap::from([
("total_count".to_string(), self.count as f64),
("execs_sec".to_string(), self.execs_sec),
]),
)
.await;
}
}
}
@ -542,7 +568,8 @@ impl Timer {
// are approximating nearest-neighbor interpolation on the runtime stats time series.
async fn report_runtime_stats(
mut stats_channel: mpsc::UnboundedReceiver<RuntimeStats>,
heartbeat_client: impl HeartbeatSender,
heartbeat_client: &Option<TaskHeartbeatClient>,
jr_client: &Option<TaskJobResultClient>,
) -> Result<()> {
// Cache the last-reported stats for a given worker.
//
@ -551,7 +578,7 @@ async fn report_runtime_stats(
let mut total = TotalStats::default();
// report all zeros to start
total.report();
total.report(jr_client).await;
let timer = Timer::new(RUNTIME_STATS_PERIOD);
@ -560,10 +587,10 @@ async fn report_runtime_stats(
Some(stats) = stats_channel.recv() => {
heartbeat_client.alive();
total.update(stats);
total.report()
total.report(jr_client).await
}
_ = timer.wait() => {
total.report()
total.report(jr_client).await
}
}
}

View File

@ -79,7 +79,10 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
remote_path: config.crashes.remote_path.clone(),
};
crashes.init().await?;
let monitor_crashes = crashes.monitor_results(new_result, false);
let jr_client = config.common.init_job_result().await?;
let monitor_crashes = crashes.monitor_results(new_result, false, &jr_client);
// setup crashdumps
let (crashdump_dir, monitor_crashdumps) = {
@ -95,9 +98,12 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
};
let monitor_dir = crashdump_dir.clone();
let monitor_jr_client = config.common.init_job_result().await?;
let monitor_crashdumps = async move {
if let Some(crashdumps) = monitor_dir {
crashdumps.monitor_results(new_crashdump, false).await
crashdumps
.monitor_results(new_crashdump, false, &monitor_jr_client)
.await
} else {
Ok(())
}
@ -129,11 +135,13 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
if let Some(no_repro) = &config.no_repro {
no_repro.init().await?;
}
let monitor_reports_future = monitor_reports(
reports_dir.path(),
&config.unique_reports,
&config.reports,
&config.no_repro,
&jr_client,
);
let inputs = SyncedDir {
@ -156,7 +164,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
delay_with_jitter(delay).await;
}
}
let monitor_inputs = inputs.monitor_results(new_coverage, false);
let monitor_inputs = inputs.monitor_results(new_coverage, false, &jr_client);
let inputs_sync_cancellation = CancellationToken::new(); // never actually cancelled
let inputs_sync_task =
inputs.continuous_sync(Pull, config.ensemble_sync_delay, &inputs_sync_cancellation);
@ -444,6 +452,7 @@ mod tests {
task_id: Default::default(),
instance_id: Default::default(),
heartbeat_queue: Default::default(),
job_result_queue: Default::default(),
instance_telemetry_key: Default::default(),
microsoft_telemetry_key: Default::default(),
logs: Default::default(),

View File

@ -1,8 +1,8 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::onefuzz::heartbeat::HeartbeatClient;
use anyhow::Result;
use onefuzz::heartbeat::HeartbeatClient;
use reqwest::Url;
use serde::{self, Deserialize, Serialize};
use std::time::Duration;

View File

@ -2,12 +2,14 @@
// Licensed under the MIT License.
use crate::tasks::{
config::CommonConfig,
heartbeat::{HeartbeatSender, TaskHeartbeatClient},
report::crash_report::{parse_report_file, CrashTestResult, RegressionReport},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use onefuzz::syncdir::SyncedDir;
use onefuzz_result::job_result::TaskJobResultClient;
use reqwest::Url;
use std::path::PathBuf;
@ -24,7 +26,7 @@ pub trait RegressionHandler {
/// Runs the regression task
pub async fn run(
heartbeat_client: Option<TaskHeartbeatClient>,
common_config: &CommonConfig,
regression_reports: &SyncedDir,
crashes: &SyncedDir,
report_dirs: &[&SyncedDir],
@ -35,6 +37,9 @@ pub async fn run(
info!("starting regression task");
regression_reports.init().await?;
let heartbeat_client = common_config.init_heartbeat(None).await?;
let job_result_client = common_config.init_job_result().await?;
handle_crash_reports(
handler,
crashes,
@ -42,6 +47,7 @@ pub async fn run(
report_list,
regression_reports,
&heartbeat_client,
&job_result_client,
)
.await
.context("handling crash reports")?;
@ -52,6 +58,7 @@ pub async fn run(
readonly_inputs,
regression_reports,
&heartbeat_client,
&job_result_client,
)
.await
.context("handling inputs")?;
@ -71,6 +78,7 @@ pub async fn handle_inputs(
readonly_inputs: &SyncedDir,
regression_reports: &SyncedDir,
heartbeat_client: &Option<TaskHeartbeatClient>,
job_result_client: &Option<TaskJobResultClient>,
) -> Result<()> {
readonly_inputs.init_pull().await?;
let mut input_files = tokio::fs::read_dir(&readonly_inputs.local_path).await?;
@ -95,7 +103,7 @@ pub async fn handle_inputs(
crash_test_result,
original_crash_test_result: None,
}
.save(None, regression_reports)
.save(None, regression_reports, job_result_client)
.await?
}
@ -109,6 +117,7 @@ pub async fn handle_crash_reports(
report_list: &Option<Vec<String>>,
regression_reports: &SyncedDir,
heartbeat_client: &Option<TaskHeartbeatClient>,
job_result_client: &Option<TaskJobResultClient>,
) -> Result<()> {
// without crash report containers, skip this method
if report_dirs.is_empty() {
@ -158,7 +167,7 @@ pub async fn handle_crash_reports(
crash_test_result,
original_crash_test_result: Some(original_crash_test_result),
}
.save(Some(file_name), regression_reports)
.save(Some(file_name), regression_reports, job_result_client)
.await?
}
}

View File

@ -89,7 +89,6 @@ impl GenericRegressionTask {
pub async fn run(&self) -> Result<()> {
info!("Starting generic regression task");
let heartbeat_client = self.config.common.init_heartbeat(None).await?;
let mut report_dirs = vec![];
for dir in vec![
@ -103,7 +102,7 @@ impl GenericRegressionTask {
report_dirs.push(dir);
}
common::run(
heartbeat_client,
&self.config.common,
&self.config.regression_reports,
&self.config.crashes,
&report_dirs,

View File

@ -103,9 +103,8 @@ impl LibFuzzerRegressionTask {
report_dirs.push(dir);
}
let heartbeat_client = self.config.common.init_heartbeat(None).await?;
common::run(
heartbeat_client,
&self.config.common,
&self.config.regression_reports,
&self.config.crashes,
&report_dirs,

View File

@ -3,6 +3,7 @@
use anyhow::{Context, Result};
use onefuzz::{blob::BlobUrl, monitor::DirectoryMonitor, syncdir::SyncedDir};
use onefuzz_result::job_result::{JobResultData, JobResultSender, TaskJobResultClient};
use onefuzz_telemetry::{
Event::{
new_report, new_unable_to_reproduce, new_unique_report, regression_report,
@ -12,6 +13,7 @@ use onefuzz_telemetry::{
};
use serde::{Deserialize, Serialize};
use stacktrace_parser::CrashLog;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use uuid::Uuid;
@ -111,6 +113,7 @@ impl RegressionReport {
self,
report_name: Option<String>,
regression_reports: &SyncedDir,
jr_client: &Option<TaskJobResultClient>,
) -> Result<()> {
let (event, name) = match &self.crash_test_result {
CrashTestResult::CrashReport(report) => {
@ -126,6 +129,15 @@ impl RegressionReport {
if upload_or_save_local(&self, &name, regression_reports).await? {
event!(event; EventData::Path = name.clone());
metric!(event; 1.0; EventData::Path = name.clone());
if let Some(jr_client) = jr_client {
let _ = jr_client
.send_direct(
JobResultData::NewRegressionReport,
HashMap::from([("count".to_string(), 1.0)]),
)
.await;
}
}
Ok(())
}
@ -149,6 +161,7 @@ impl CrashTestResult {
unique_reports: &Option<SyncedDir>,
reports: &Option<SyncedDir>,
no_repro: &Option<SyncedDir>,
jr_client: &Option<TaskJobResultClient>,
) -> Result<()> {
match self {
Self::CrashReport(report) => {
@ -158,6 +171,15 @@ impl CrashTestResult {
if upload_or_save_local(&report, &name, unique_reports).await? {
event!(new_unique_report; EventData::Path = report.unique_blob_name());
metric!(new_unique_report; 1.0; EventData::Path = report.unique_blob_name());
if let Some(jr_client) = jr_client {
let _ = jr_client
.send_direct(
JobResultData::NewUniqueReport,
HashMap::from([("count".to_string(), 1.0)]),
)
.await;
}
}
}
@ -166,6 +188,15 @@ impl CrashTestResult {
if upload_or_save_local(&report, &name, reports).await? {
event!(new_report; EventData::Path = report.blob_name());
metric!(new_report; 1.0; EventData::Path = report.blob_name());
if let Some(jr_client) = jr_client {
let _ = jr_client
.send_direct(
JobResultData::NewReport,
HashMap::from([("count".to_string(), 1.0)]),
)
.await;
}
}
}
}
@ -176,6 +207,15 @@ impl CrashTestResult {
if upload_or_save_local(&report, &name, no_repro).await? {
event!(new_unable_to_reproduce; EventData::Path = report.blob_name());
metric!(new_unable_to_reproduce; 1.0; EventData::Path = report.blob_name());
if let Some(jr_client) = jr_client {
let _ = jr_client
.send_direct(
JobResultData::NoReproCrashingInput,
HashMap::from([("count".to_string(), 1.0)]),
)
.await;
}
}
}
}
@ -324,6 +364,7 @@ pub async fn monitor_reports(
unique_reports: &Option<SyncedDir>,
reports: &Option<SyncedDir>,
no_crash: &Option<SyncedDir>,
jr_client: &Option<TaskJobResultClient>,
) -> Result<()> {
if unique_reports.is_none() && reports.is_none() && no_crash.is_none() {
debug!("no report directories configured");
@ -334,7 +375,9 @@ pub async fn monitor_reports(
while let Some(file) = monitor.next_file().await? {
let result = parse_report_file(file).await?;
result.save(unique_reports, reports, no_crash).await?;
result
.save(unique_reports, reports, no_crash, jr_client)
.await?;
}
Ok(())

View File

@ -8,16 +8,6 @@ use std::{
sync::Arc,
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use onefuzz::expand::Expand;
use onefuzz::fs::set_executable;
use onefuzz::{blob::BlobUrl, sha256, syncdir::SyncedDir};
use reqwest::Url;
use serde::Deserialize;
use storage_queue::{Message, QueueClient};
use tokio::fs;
use crate::tasks::report::crash_report::*;
use crate::tasks::report::dotnet::common::collect_exception_info;
use crate::tasks::{
@ -26,6 +16,16 @@ use crate::tasks::{
heartbeat::{HeartbeatSender, TaskHeartbeatClient},
utils::{default_bool_true, try_resolve_setup_relative_path},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use onefuzz::expand::Expand;
use onefuzz::fs::set_executable;
use onefuzz::{blob::BlobUrl, sha256, syncdir::SyncedDir};
use onefuzz_result::job_result::TaskJobResultClient;
use reqwest::Url;
use serde::Deserialize;
use storage_queue::{Message, QueueClient};
use tokio::fs;
const DOTNET_DUMP_TOOL_NAME: &str = "dotnet-dump";
@ -114,15 +114,18 @@ impl DotnetCrashReportTask {
/// State held by the crash-report processor in this file.
///
/// Bundles the shared task configuration with the two optional reporting
/// clients that `AsanProcessor::new` creates from `config.common`.
pub struct AsanProcessor {
/// Shared, reference-counted task configuration.
config: Arc<Config>,
/// Heartbeat client from `config.common.init_heartbeat(None)`; may be `None`.
heartbeat_client: Option<TaskHeartbeatClient>,
/// Job-result client from `config.common.init_job_result()`; may be `None`.
/// NOTE(review): presumably `None` means job-result reporting is not
/// configured for the task - confirm against `init_job_result`.
job_result_client: Option<TaskJobResultClient>,
}
impl AsanProcessor {
/// Builds a processor from the shared task configuration.
///
/// Initializes the heartbeat client first (with `None` as its argument) and
/// then the job-result client, both via `config.common`.
///
/// # Errors
///
/// Propagates the error from either initialization call.
pub async fn new(config: Arc<Config>) -> Result<Self> {
    let heartbeat = config.common.init_heartbeat(None).await?;
    let job_results = config.common.init_job_result().await?;

    // `config` is moved into the struct only after both borrows above end.
    let processor = Self {
        heartbeat_client: heartbeat,
        job_result_client: job_results,
        config,
    };
    Ok(processor)
}
@ -260,6 +263,7 @@ impl Processor for AsanProcessor {
&self.config.unique_reports,
&self.config.reports,
&self.config.no_repro,
&self.job_result_client,
)
.await;

View File

@ -13,6 +13,7 @@ use async_trait::async_trait;
use onefuzz::{
blob::BlobUrl, input_tester::Tester, machine_id::MachineIdentity, sha256, syncdir::SyncedDir,
};
use onefuzz_result::job_result::TaskJobResultClient;
use reqwest::Url;
use serde::Deserialize;
use std::{
@ -73,7 +74,9 @@ impl ReportTask {
pub async fn managed_run(&mut self) -> Result<()> {
info!("Starting generic crash report task");
let heartbeat_client = self.config.common.init_heartbeat(None).await?;
let mut processor = GenericReportProcessor::new(&self.config, heartbeat_client);
let job_result_client = self.config.common.init_job_result().await?;
let mut processor =
GenericReportProcessor::new(&self.config, heartbeat_client, job_result_client);
#[allow(clippy::manual_flatten)]
for entry in [
@ -183,13 +186,19 @@ pub async fn test_input(args: TestInputArgs<'_>) -> Result<CrashTestResult> {
/// Processor state for the generic crash report task.
///
/// Borrows the task configuration for `'a` and owns the optional heartbeat
/// and job-result clients handed to `GenericReportProcessor::new`.
pub struct GenericReportProcessor<'a> {
/// Borrowed task configuration; the processor cannot outlive it.
config: &'a Config,
/// Optional heartbeat client supplied by the caller of `new`.
heartbeat_client: Option<TaskHeartbeatClient>,
/// Optional job-result client supplied by the caller of `new`.
/// NOTE(review): the `Processor` impl passes this by reference alongside
/// the report directories; the callee is not visible here - confirm.
job_result_client: Option<TaskJobResultClient>,
}
impl<'a> GenericReportProcessor<'a> {
pub fn new(config: &'a Config, heartbeat_client: Option<TaskHeartbeatClient>) -> Self {
pub fn new(
config: &'a Config,
heartbeat_client: Option<TaskHeartbeatClient>,
job_result_client: Option<TaskJobResultClient>,
) -> Self {
Self {
config,
heartbeat_client,
job_result_client,
}
}
@ -239,6 +248,7 @@ impl<'a> Processor for GenericReportProcessor<'a> {
&self.config.unique_reports,
&self.config.reports,
&self.config.no_repro,
&self.job_result_client,
)
.await
.context("saving report failed")

View File

@ -13,6 +13,7 @@ use async_trait::async_trait;
use onefuzz::{
blob::BlobUrl, libfuzzer::LibFuzzer, machine_id::MachineIdentity, sha256, syncdir::SyncedDir,
};
use onefuzz_result::job_result::TaskJobResultClient;
use reqwest::Url;
use serde::Deserialize;
use std::{
@ -196,15 +197,18 @@ pub async fn test_input(args: TestInputArgs<'_>) -> Result<CrashTestResult> {
/// State held by the crash-report processor in this file.
///
/// Holds the shared task configuration plus the optional heartbeat and
/// job-result clients that `AsanProcessor::new` initializes.
pub struct AsanProcessor {
/// Shared, reference-counted task configuration.
config: Arc<Config>,
/// Heartbeat client from `config.common.init_heartbeat(None)`; may be `None`.
heartbeat_client: Option<TaskHeartbeatClient>,
/// Job-result client from `config.common.init_job_result()`; may be `None`.
job_result_client: Option<TaskJobResultClient>,
}
impl AsanProcessor {
/// Creates the processor, initializing its reporting clients.
///
/// The heartbeat client is requested first (passing `None`), followed by
/// the job-result client; both come from `config.common`.
///
/// # Errors
///
/// Returns the first error produced by either initialization call.
pub async fn new(config: Arc<Config>) -> Result<Self> {
    let heartbeat = config.common.init_heartbeat(None).await?;
    let job_results = config.common.init_job_result().await?;

    // Move `config` in last; the clients were built from borrows of it.
    Ok(Self {
        heartbeat_client: heartbeat,
        job_result_client: job_results,
        config,
    })
}
@ -257,6 +261,7 @@ impl Processor for AsanProcessor {
&self.config.unique_reports,
&self.config.reports,
&self.config.no_repro,
&self.job_result_client,
)
.await
}

View File

@ -44,6 +44,7 @@ tempfile = "3.7.0"
process_control = "4.0"
reqwest-retry = { path = "../reqwest-retry" }
onefuzz-telemetry = { path = "../onefuzz-telemetry" }
onefuzz-result = { path = "../onefuzz-result" }
stacktrace-parser = { path = "../stacktrace-parser" }
backoff = { version = "0.4", features = ["tokio"] }

View File

@ -11,10 +11,12 @@ use crate::{
};
use anyhow::{Context, Result};
use dunce::canonicalize;
use onefuzz_result::job_result::{JobResultData, JobResultSender, TaskJobResultClient};
use onefuzz_telemetry::{Event, EventData};
use reqwest::{StatusCode, Url};
use reqwest_retry::{RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::{env::current_dir, path::PathBuf, str, time::Duration};
use tokio::{fs, select};
use tokio_util::sync::CancellationToken;
@ -241,6 +243,7 @@ impl SyncedDir {
url: BlobContainerUrl,
event: Event,
ignore_dotfiles: bool,
jr_client: &Option<TaskJobResultClient>,
) -> Result<()> {
debug!("monitoring {}", path.display());
@ -265,9 +268,39 @@ impl SyncedDir {
if ignore_dotfiles && file_name_event_str.starts_with('.') {
continue;
}
event!(event.clone(); EventData::Path = file_name_event_str);
metric!(event.clone(); 1.0; EventData::Path = file_name_str_metric_str);
if let Some(jr_client) = jr_client {
match event {
Event::new_result => {
jr_client
.send_direct(
JobResultData::NewCrashingInput,
HashMap::from([("count".to_string(), 1.0)]),
)
.await;
}
Event::new_coverage => {
jr_client
.send_direct(
JobResultData::CoverageData,
HashMap::from([("count".to_string(), 1.0)]),
)
.await;
}
Event::new_crashdump => {
jr_client
.send_direct(
JobResultData::NewCrashDump,
HashMap::from([("count".to_string(), 1.0)]),
)
.await;
}
_ => {
warn!("Unhandled job result!");
}
}
}
let destination = path.join(file_name);
if let Err(err) = fs::copy(&item, &destination).await {
let error_message = format!(
@ -305,6 +338,29 @@ impl SyncedDir {
event!(event.clone(); EventData::Path = file_name_event_str);
metric!(event.clone(); 1.0; EventData::Path = file_name_str_metric_str);
if let Some(jr_client) = jr_client {
match event {
Event::new_result => {
jr_client
.send_direct(
JobResultData::NewCrashingInput,
HashMap::from([("count".to_string(), 1.0)]),
)
.await;
}
Event::new_coverage => {
jr_client
.send_direct(
JobResultData::CoverageData,
HashMap::from([("count".to_string(), 1.0)]),
)
.await;
}
_ => {
warn!("Unhandled job result!");
}
}
}
if let Err(err) = uploader.upload(item.clone()).await {
let error_message = format!(
"Couldn't upload file. path:{} dir:{} err:{:?}",
@ -336,7 +392,12 @@ impl SyncedDir {
/// The intent of this is to support use cases where we usually want a directory
/// to be initialized, but a user-supplied binary, (such as AFL) logically owns
/// a directory, and may reset it.
pub async fn monitor_results(&self, event: Event, ignore_dotfiles: bool) -> Result<()> {
pub async fn monitor_results(
&self,
event: Event,
ignore_dotfiles: bool,
job_result_client: &Option<TaskJobResultClient>,
) -> Result<()> {
if let Some(url) = self.remote_path.clone() {
loop {
debug!("waiting to monitor {}", self.local_path.display());
@ -355,6 +416,7 @@ impl SyncedDir {
url.clone(),
event.clone(),
ignore_dotfiles,
job_result_client,
)
.await?;
}

View File

@ -33,7 +33,7 @@ var storageAccountFuncQueuesParams = [
'update-queue'
'webhooks'
'signalr-events'
'custom-metrics'
'job-result'
]
var fileChangesQueueIndex = 0