Add extra_output container, rename extra container (#3064)

## Summary of the Pull Request

- **Breaking** (though, as far as I know, this feature is not yet in use): rename the `extra_container` to `extra_setup_container`.
- **Added**: the `extra_output_container`, which continually pushes its contents back to storage while the task runs (a sketch of this behavior follows the list).
  - We may also want a container type which both pushes & pulls? See discussion below.
- **Improved**: if `onefuzz-task` fails upon launch, we now log its output for diagnosis (might close #3113).

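With the new `extra_output` container, the agent initializes the local directory, pushes it to the container on an interval while the task runs, and pushes once more after the task finishes (see the `config.rs` diff below). A minimal sketch of that shape, assuming `tokio`/`tokio-util`; `sync_push` is a hypothetical stand-in for the agent's azcopy-based sync, and the interval is illustrative:

```rust
use std::{path::Path, time::Duration};
use tokio_util::sync::CancellationToken;

// Hypothetical stand-in for the agent's azcopy-based `SyncedDir::sync_push`.
async fn sync_push(_dir: &Path) -> anyhow::Result<()> {
    Ok(())
}

// Push `dir` on an interval until `cancel` fires, then push one final time so
// the container ends up with the task's final outputs.
async fn continuous_push(dir: &Path, cancel: &CancellationToken) -> anyhow::Result<()> {
    while !cancel.is_cancelled() {
        sync_push(dir).await?;
        tokio::select! {
            _ = cancel.cancelled() => break,
            _ = tokio::time::sleep(Duration::from_secs(60)) => {} // interval is illustrative
        }
    }
    sync_push(dir).await // final push after cancellation
}
```

In the actual change, the main task and the sync loop run under `tokio::try_join!`, and finishing the task cancels the token, which triggers the final push.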
---

Some thoughts for the future:

We might want to redesign the containers so that something like the following is passed to the agent; the agent would then not need to know the specifics of the containers supplied:

```jsonc
{
    // ...
    "containers": {
        "extra_setup_dir": {
            "mode": "pull",
            "container_name": "yyy",
        },
        "extra_output_dir": {
            "mode": "push",
            "continuous": true, // keep pushing while job is running
            "container_name": "xxx"
        }
    }
}
```

At the moment the agent needs to know what each container is for, for each task type; a more generic and flexible method might be simpler overall.
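For illustration, this shape would be straightforward to deserialize on the agent side. A minimal serde sketch (type and field names are illustrative, not part of this PR):

```rust
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
enum Mode {
    Pull,
    Push,
}

#[derive(Debug, Deserialize)]
struct ContainerSpec {
    mode: Mode,
    // keep pushing while the job is running
    #[serde(default)]
    continuous: bool,
    container_name: String,
}

#[derive(Debug, Deserialize)]
struct AgentConfig {
    // keyed by the local directory name, e.g. "extra_setup_dir"
    containers: HashMap<String, ContainerSpec>,
}
```

The agent would then dispatch generically on `mode` and `continuous` instead of matching on a container type per task.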
George Pollard authored on 2023-06-15 14:48:27 +12:00 (committed by GitHub).
Commit aa54a15427, parent 630b083f64. 50 changed files with 1541 additions and 1405 deletions.

```diff
@@ -26,7 +26,8 @@ The following values are replaced with the specific values at runtime.
 * `{crashes_container}`: Container name for the `crashes` container
 * `{microsoft_telemetry_key}`: Application Insights key used for collecting [non-attributable telemetry](telemetry.md) to improve OneFuzz.
 * `{instance_telemetry_key}`: Application Insights key used for private, instance-owned telemetry and logging (See [OneFuzz Telemetry](telemetry.md).
-* `{extra_dir}`: Path to the optionally provided `extra` directory
+* `{extra_setup_dir}`: Path to the optionally provided `extra_setup` directory
+* `{extra_output_dir}`: Path to the optionally provided `extra_output` directory

 ## Example
```

```diff
@@ -152,7 +152,8 @@ If webhook is set to have Event Grid message format then the payload will look a
       "unique_reports",
       "regression_reports",
       "logs",
-      "extra"
+      "extra_setup",
+      "extra_output"
     ],
     "title": "ContainerType"
   },
@@ -1957,7 +1958,8 @@ If webhook is set to have Event Grid message format then the payload will look a
       "unique_reports",
       "regression_reports",
       "logs",
-      "extra"
+      "extra_setup",
+      "extra_output"
     ],
     "title": "ContainerType"
   },
@@ -2860,7 +2862,8 @@ If webhook is set to have Event Grid message format then the payload will look a
       "unique_reports",
       "regression_reports",
       "logs",
-      "extra"
+      "extra_setup",
+      "extra_output"
     ],
     "title": "ContainerType"
   },
@@ -3343,7 +3346,8 @@ If webhook is set to have Event Grid message format then the payload will look a
       "unique_reports",
       "regression_reports",
       "logs",
-      "extra"
+      "extra_setup",
+      "extra_output"
     ],
     "title": "ContainerType"
  },
@@ -3844,7 +3848,8 @@ If webhook is set to have Event Grid message format then the payload will look a
       "unique_reports",
       "regression_reports",
       "logs",
-      "extra"
+      "extra_setup",
+      "extra_output"
     ],
     "title": "ContainerType"
   },
@@ -4293,7 +4298,8 @@ If webhook is set to have Event Grid message format then the payload will look a
       "unique_reports",
       "regression_reports",
       "logs",
-      "extra"
+      "extra_setup",
+      "extra_output"
     ],
     "title": "ContainerType"
   },
@@ -4769,7 +4775,8 @@ If webhook is set to have Event Grid message format then the payload will look a
       "unique_reports",
       "regression_reports",
       "logs",
-      "extra"
+      "extra_setup",
+      "extra_output"
     ],
     "title": "ContainerType"
   },
@@ -5375,7 +5382,8 @@ If webhook is set to have Event Grid message format then the payload will look a
       "unique_reports",
       "regression_reports",
       "logs",
-      "extra"
+      "extra_setup",
+      "extra_output"
     ],
     "title": "ContainerType"
   },
```

```diff
@@ -112,7 +112,8 @@ public enum ContainerType {
     UniqueReports,
     RegressionReports,
     Logs,
-    Extra
+    ExtraSetup,
+    ExtraOutput,
 }
```

```diff
@@ -963,42 +963,36 @@ public record TaskDefinition(
 public record WorkSet(
     bool Reboot,
     Uri SetupUrl,
-    Uri? ExtraUrl,
+    Uri? ExtraSetupUrl,
     bool Script,
     List<WorkUnit> WorkUnits
 );

-public record ContainerDefinition(
+public readonly record struct ContainerDefinition(
     ContainerType Type,
     Compare Compare,
     long Value,
     ContainerPermission Permissions);

 // TODO: service shouldn't pass SyncedDir, but just the url and let the agent
 // come up with paths
-public record SyncedDir(string Path, Uri Url);
+public readonly record struct SyncedDir(string Path, Uri Url);

 [JsonConverter(typeof(ContainerDefConverter))]
 public interface IContainerDef { }
 public record SingleContainer(SyncedDir SyncedDir) : IContainerDef;
-public record MultipleContainer(List<SyncedDir> SyncedDirs) : IContainerDef;
+public record MultipleContainer(IReadOnlyList<SyncedDir> SyncedDirs) : IContainerDef;

 public class ContainerDefConverter : JsonConverter<IContainerDef> {
     public override IContainerDef? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) {
         if (reader.TokenType == JsonTokenType.StartObject) {
             var result = (SyncedDir?)JsonSerializer.Deserialize(ref reader, typeof(SyncedDir), options);
-            if (result is null) {
-                return null;
+            if (result is SyncedDir sd) {
+                return new SingleContainer(sd);
             }
-            return new SingleContainer(result);
+            return null;
         }

         if (reader.TokenType == JsonTokenType.StartArray) {
@@ -1093,8 +1087,8 @@ public record TaskUnitConfig(
     public IContainerDef? UniqueInputs { get; set; }
     public IContainerDef? UniqueReports { get; set; }
     public IContainerDef? RegressionReports { get; set; }
-    public IContainerDef? Extra { get; set; }
+    public IContainerDef? ExtraSetup { get; set; }
+    public IContainerDef? ExtraOutput { get; set; }
 }

 public record NodeCommandEnvelope(
```

```diff
@@ -75,75 +75,82 @@ public class Config : IConfig {
         );

         if (definition.MonitorQueue != null) {
-            config.inputQueue = await _queue.GetQueueSas(task.TaskId.ToString(), StorageType.Corpus, QueueSasPermissions.Add | QueueSasPermissions.Read | QueueSasPermissions.Update | QueueSasPermissions.Process);
+            config.inputQueue = await _queue.GetQueueSas(task.TaskId.ToString(), StorageType.Corpus, QueueSasPermissions.All);
         }

-        var containersByType = definition.Containers.Where(c => c.Type != ContainerType.Setup && task.Config.Containers != null)
-            .ToAsyncEnumerable()
-            .SelectAwait(async countainerDef => {
-                var containers = await
-                    task.Config.Containers!
-                    .Where(c => c.Type == countainerDef.Type).Select(container => (countainerDef, container))
-                    .Where(x => x.container != null)
-                    .ToAsyncEnumerable()
-                    .SelectAwait(async (x, i) =>
-                        new SyncedDir(
-                            string.Join("_", "task", x.Item1.Type.ToString().ToLower(), i),
-                            await _containers.GetContainerSasUrl(x.Item2.Name, StorageType.Corpus, ConvertPermissions(x.Item1.Permissions)))
-                    ).ToListAsync();
-                return (countainerDef, containers);
-            }
-        );
-
-        await foreach (var data in containersByType) {
-            if (!data.containers.Any()) {
-                continue;
-            }
-
-            IContainerDef def = data.countainerDef switch {
-                ContainerDefinition { Compare: Compare.Equal, Value: 1 } or
-                ContainerDefinition { Compare: Compare.AtMost, Value: 1 } when data.containers.Count == 1 => new SingleContainer(data.containers[0]),
-                _ => new MultipleContainer(data.containers)
-            };
-
-            switch (data.countainerDef.Type) {
+        if (task.Config.Containers is not null) {
+            var containersByType =
+                await Async.Task.WhenAll(
+                    definition.Containers
+                    .Where(c => c.Type is not ContainerType.Setup)
+                    .Select(async countainerDef => {
+                        var syncedDirs =
+                            await Async.Task.WhenAll(
+                                task.Config.Containers
+                                .Where(c => c.Type == countainerDef.Type)
+                                .Select(async (container, i) =>
+                                    new SyncedDir(
+                                        string.Join("_", "task", countainerDef.Type.ToString().ToLower(), i),
+                                        await _containers.GetContainerSasUrl(container.Name, StorageType.Corpus, ConvertPermissions(countainerDef.Permissions)))
+                                ));
+
+                        return (countainerDef, syncedDirs);
+                    }));
+
+            foreach (var (containerDef, syncedDirs) in containersByType) {
+                if (!syncedDirs.Any()) {
+                    continue;
+                }
+
+                IContainerDef def = containerDef switch {
+                    ContainerDefinition { Compare: Compare.Equal or Compare.AtMost, Value: 1 }
+                        when syncedDirs is [var syncedDir] => new SingleContainer(syncedDir),
+                    _ => new MultipleContainer(syncedDirs)
+                };
+
+                switch (containerDef.Type) {
                 case ContainerType.Analysis:
                     config.Analysis = def;
                     break;
                 case ContainerType.Coverage:
                     config.Coverage = def;
                     break;
                 case ContainerType.Crashes:
                     config.Crashes = def;
                     break;
                 case ContainerType.Inputs:
                     config.Inputs = def;
                     break;
                 case ContainerType.NoRepro:
                     config.NoRepro = def;
                     break;
                 case ContainerType.ReadonlyInputs:
                     config.ReadonlyInputs = def;
                     break;
                 case ContainerType.Reports:
                     config.Reports = def;
                     break;
                 case ContainerType.Tools:
                     config.Tools = def;
                     break;
                 case ContainerType.UniqueInputs:
                     config.UniqueInputs = def;
                     break;
                 case ContainerType.UniqueReports:
                     config.UniqueReports = def;
                     break;
                 case ContainerType.RegressionReports:
                     config.RegressionReports = def;
                     break;
-                case ContainerType.Extra:
-                    config.Extra = def;
+                case ContainerType.ExtraSetup:
+                    config.ExtraSetup = def;
                     break;
+                case ContainerType.ExtraOutput:
+                    config.ExtraOutput = def;
+                    break;
+                default:
+                    throw new InvalidDataException($"unknown container type: {containerDef.Type}");
+                }
             }
         }
```

(File diff suppressed because it is too large.)

```diff
@@ -102,25 +102,35 @@ public class Scheduler : IScheduler {
             }
         }

-        if (bucketConfig is not null) {
-            var setupUrl = await _containers.GetContainerSasUrl(bucketConfig.setupContainer, StorageType.Corpus, BlobContainerSasPermissions.Read | BlobContainerSasPermissions.List);
-            var extraUrl = bucketConfig.extraContainer != null ? await _containers.GetContainerSasUrl(bucketConfig.extraContainer, StorageType.Corpus, BlobContainerSasPermissions.Read | BlobContainerSasPermissions.List) : null;
+        if (bucketConfig is BucketConfig c) {
+            var readOnlyPermissions = BlobContainerSasPermissions.Read | BlobContainerSasPermissions.List;
+            var setupUrl = await _containers.GetContainerSasUrl(c.setupContainer, StorageType.Corpus, readOnlyPermissions);
+            var extraSetupUrl = c.extraSetupContainer is not null
+                ? await _containers.GetContainerSasUrl(c.extraSetupContainer, StorageType.Corpus, readOnlyPermissions)
+                : null;

             var workSet = new WorkSet(
-                Reboot: bucketConfig.reboot,
-                Script: bucketConfig.setupScript is not null,
+                Reboot: c.reboot,
+                Script: c.setupScript is not null,
                 SetupUrl: setupUrl,
-                ExtraUrl: extraUrl,
+                ExtraSetupUrl: extraSetupUrl,
                 WorkUnits: workUnits
             );

-            return (bucketConfig, workSet);
+            return (c, workSet);
         }

         return null;
     }

-    sealed record BucketConfig(long count, bool reboot, Container setupContainer, Container? extraContainer, string? setupScript, Pool pool);
+    readonly record struct BucketConfig(
+        long count,
+        bool reboot,
+        Container setupContainer,
+        Container? extraSetupContainer,
+        string? setupScript,
+        Pool pool);

     sealed record PoolKey(
         PoolName? poolName = null,
@@ -174,7 +184,7 @@ public class Scheduler : IScheduler {
         }

         var setupContainer = task.Config.Containers?.FirstOrDefault(c => c.Type == ContainerType.Setup) ?? throw new Exception($"task missing setup container: task_type = {task.Config.Task.Type}");
-        var extraContainer = task.Config.Containers?.FirstOrDefault(c => c.Type == ContainerType.Extra);
+        var extraSetupContainer = task.Config.Containers?.FirstOrDefault(c => c is { Type: ContainerType.ExtraSetup });

         string? setupScript = null;
         if (task.Os == Os.Windows) {
@@ -213,7 +223,7 @@
             count,
             reboot,
             setupContainer.Name,
-            extraContainer?.Name,
+            extraSetupContainer?.Name,
             setupScript,
             pool with { ETag = default, TimeStamp = default });
```

```diff
@@ -60,7 +60,7 @@ impl Fixture {
         WorkSet {
             reboot: false,
             setup_url: self.setup_url(),
-            extra_url: None,
+            extra_setup_url: None,
             script: false,
             work_units: vec![self.work_unit()],
         }
```

```diff
@@ -167,7 +167,7 @@ fn debug_run_worker(opt: RunWorkerOpt) -> Result<()> {
     let work_set = WorkSet {
         reboot: false,
         setup_url: BlobContainerUrl::new(opt.setup_url)?,
-        extra_url: opt.extra_url.map(BlobContainerUrl::new).transpose()?,
+        extra_setup_url: opt.extra_url.map(BlobContainerUrl::new).transpose()?,
         script: opt.script,
         work_units: vec![work_unit],
     };
@@ -192,10 +192,10 @@ async fn run_worker(mut work_set: WorkSet) -> Result<Vec<WorkerEvent>> {
     let mut events = vec![];
     let work_unit = work_set.work_units.pop().unwrap();
     let setup_dir = work_set.setup_dir()?;
-    let extra_dir = work_set.extra_dir()?;
+    let extra_setup_dir = work_set.extra_setup_dir()?;
     let work_dir = work_unit.working_dir(setup_runner.machine_id)?;
-    let mut worker = Worker::new(work_dir, &setup_dir, extra_dir, work_unit);
+    let mut worker = Worker::new(work_dir, setup_dir, extra_setup_dir, work_unit);
     while !worker.is_done() {
         worker = worker
             .update(
```

```diff
@@ -250,12 +250,17 @@ impl State<PendingReboot> {
 impl State<Ready> {
     pub async fn run(self, machine_id: uuid::Uuid) -> Result<State<Busy>> {
         let mut workers = vec![];
-        let setup_dir = &self.ctx.work_set.setup_dir()?;
-        let extra_dir = self.ctx.work_set.extra_dir()?;
+        let setup_dir = self.ctx.work_set.setup_dir()?;
+        let extra_setup_dir = self.ctx.work_set.extra_setup_dir()?;

         for work in self.ctx.work_set.work_units {
             let work_dir = work.working_dir(machine_id)?;
-            let worker = Some(Worker::new(work_dir, setup_dir, extra_dir.clone(), work));
+            let worker = Some(Worker::new(
+                work_dir,
+                setup_dir.clone(),
+                extra_setup_dir.clone(),
+                work,
+            ));
             workers.push(worker);
         }
```

```diff
@@ -43,20 +43,26 @@ pub struct SetupRunner {
 impl SetupRunner {
     pub async fn run(&self, work_set: &WorkSet) -> Result<SetupOutput> {
-        if let (Some(extra_container), Some(extra_dir)) =
-            (&work_set.extra_url, work_set.extra_dir()?)
+        if let (Some(extra_setup_container), Some(extra_setup_dir)) =
+            (&work_set.extra_setup_url, work_set.extra_setup_dir()?)
         {
-            info!("downloading extra container");
+            info!("downloading extra setup container");
             // `azcopy sync` requires the local dir to exist.
-            fs::create_dir_all(&extra_dir).await.with_context(|| {
-                format!("unable to create extra container: {}", extra_dir.display())
-            })?;
-            let extra_url = extra_container.url()?;
-            az_copy::sync(extra_url.to_string(), &extra_dir, false).await?;
+            fs::create_dir_all(&extra_setup_dir)
+                .await
+                .with_context(|| {
+                    format!(
+                        "unable to create extra setup container: {}",
+                        extra_setup_dir.display()
+                    )
+                })?;
+            let extra_url = extra_setup_container.url()?;
+            az_copy::sync(extra_url.to_string(), &extra_setup_dir, false).await?;
             debug!(
-                "synced extra container from {} to {}",
+                "synced extra setup container from {} to {}",
                 extra_url,
-                extra_dir.display(),
+                extra_setup_dir.display(),
             );
         }
```

```diff
@@ -62,7 +62,7 @@ pub async fn validate(command: ValidationCommand) -> Result<()> {
 async fn validate_libfuzzer(config: ValidationConfig) -> Result<()> {
     let libfuzzer = LibFuzzer::new(
-        &config.target_exe,
+        config.target_exe.clone(),
         config.target_options.clone(),
         config.target_env.iter().cloned().collect(),
         config
@@ -70,7 +70,8 @@ async fn validate_libfuzzer(config: ValidationConfig) -> Result<()> {
             .clone()
             .or_else(|| config.target_exe.parent().map(|p| p.to_path_buf()))
             .expect("invalid target_exe"),
-        None::<&PathBuf>,
+        None,
+        None,
         MachineIdentity {
             machine_id: Uuid::nil(),
             machine_name: "".to_string(),
@@ -102,22 +103,26 @@ async fn run_setup(setup_folder: impl AsRef<Path>) -> Result<()> {
 }

 async fn get_logs(config: ValidationConfig) -> Result<()> {
+    let setup_folder = config
+        .setup_folder
+        .clone()
+        .or_else(|| config.target_exe.parent().map(|p| p.to_path_buf()))
+        .expect("invalid setup_folder");
+
     let libfuzzer = LibFuzzer::new(
-        &config.target_exe,
+        config.target_exe,
         config.target_options.clone(),
         config.target_env.iter().cloned().collect(),
-        config
-            .setup_folder
-            .clone()
-            .or_else(|| config.target_exe.parent().map(|p| p.to_path_buf()))
-            .expect("invalid setup_folder"),
-        None::<&PathBuf>,
+        setup_folder,
+        None,
+        None,
         MachineIdentity {
             machine_id: Uuid::nil(),
             machine_name: String::new(),
             scaleset_name: None,
         },
     );

     let cmd = libfuzzer.build_std_command(None, None, None, None, None)?;
     print_logs(cmd)?;
     Ok(())
```

```diff
@@ -22,7 +22,7 @@ pub type TaskId = Uuid;
 pub struct WorkSet {
     pub reboot: bool,
     pub setup_url: BlobContainerUrl,
-    pub extra_url: Option<BlobContainerUrl>,
+    pub extra_setup_url: Option<BlobContainerUrl>,
     pub script: bool,
     pub work_units: Vec<WorkUnit>,
 }
@@ -93,9 +93,9 @@ impl WorkSet {
         self.setup_url.as_path(root)
     }

-    pub fn extra_dir(&self) -> Result<Option<PathBuf>> {
+    pub fn extra_setup_dir(&self) -> Result<Option<PathBuf>> {
         let root = self.get_root_folder()?;
-        self.extra_url
+        self.extra_setup_url
             .as_ref()
             .map(|url| url.as_path(root))
             .transpose()
```

```diff
@@ -16,7 +16,10 @@ use onefuzz::{
     machine_id::MachineIdentity,
     process::{ExitStatus, Output},
 };
-use tokio::{fs, task, time::timeout};
+use tokio::{
+    fs, task,
+    time::{error::Elapsed, timeout},
+};
 use url::Url;
 use uuid::Uuid;
@@ -52,16 +55,17 @@ pub enum Worker {
 impl Worker {
     pub fn new(
-        work_dir: impl AsRef<Path>,
-        setup_dir: impl AsRef<Path>,
-        extra_dir: Option<impl AsRef<Path>>,
+        work_dir: PathBuf,
+        setup_dir: PathBuf,
+        extra_setup_dir: Option<PathBuf>,
         work: WorkUnit,
     ) -> Self {
         let ctx = Ready {
-            work_dir: PathBuf::from(work_dir.as_ref()),
-            setup_dir: PathBuf::from(setup_dir.as_ref()),
-            extra_dir: extra_dir.map(|dir| PathBuf::from(dir.as_ref())),
+            work_dir,
+            setup_dir,
+            extra_setup_dir,
         };
+
         let state = State { ctx, work };
         state.into()
     }
@@ -116,7 +120,7 @@ impl Worker {
 pub struct Ready {
     work_dir: PathBuf,
     setup_dir: PathBuf,
-    extra_dir: Option<PathBuf>,
+    extra_setup_dir: Option<PathBuf>,
 }

 #[derive(Debug)]
@@ -168,10 +172,10 @@ impl State<Ready> {
         // Create and pass the server here
         let (from_agent_to_task_server, from_agent_to_task_endpoint) = IpcOneShotServer::new()?;
         let (from_task_to_agent_server, from_task_to_agent_endpoint) = IpcOneShotServer::new()?;
-        let child = runner
+        let mut child = runner
             .run(
                 &self.ctx.setup_dir,
-                self.ctx.extra_dir,
+                self.ctx.extra_setup_dir,
                 &self.work,
                 from_agent_to_task_endpoint,
                 from_task_to_agent_endpoint,
@@ -193,10 +197,24 @@
             .await
         {
             Err(e) => {
-                error!("timeout waiting for client_sender_server.accept(): {:?}", e);
+                let _: Elapsed = e; // error here is always Elapsed and has no further info
+
+                // see if child exited with any useful information:
+                let child_output = match child.try_wait() {
+                    Ok(None) => "still running".to_string(),
+                    Ok(Some(output)) => {
+                        format!("{:?}", output)
+                    }
+                    Err(e) => format!("{}", e),
+                };
+
+                error!(
+                    "timeout waiting for client_sender_server.accept(): child status: {}",
+                    child_output,
+                );
+
                 return Err(format_err!(
-                    "timeout waiting for client_sender_server.accept(): {:?}",
-                    e
+                    "timeout waiting for client_sender_server.accept()"
                 ));
             }
             Ok(res) => res??,
@@ -211,13 +229,24 @@
             .await
         {
             Err(e) => {
+                let _: Elapsed = e; // error here is always Elapsed and has no further info
+
+                // see if child exited with any useful information:
+                let child_output = match child.try_wait() {
+                    Ok(None) => "still running".to_string(),
+                    Ok(Some(output)) => {
+                        format!("{:?}", output)
+                    }
+                    Err(e) => format!("{}", e),
+                };
+
                 error!(
-                    "timeout waiting for server_receiver_server.accept(): {:?}",
-                    e
+                    "timeout waiting for server_receiver_server.accept(): child status: {}",
+                    child_output
                 );
                 return Err(format_err!(
-                    "timeout waiting for server_receiver_server.accept(): {:?}",
-                    e
+                    "timeout waiting for server_receiver_server.accept()",
                 ));
             }
             Ok(res) => res??,
@@ -378,7 +407,7 @@ pub trait IWorkerRunner: Downcast {
     async fn run(
         &self,
         setup_dir: &Path,
-        extra_dir: Option<PathBuf>,
+        extra_setup_dir: Option<PathBuf>,
         work: &WorkUnit,
         from_agent_to_task_endpoint: String,
         from_task_to_agent_endpoint: String,
@@ -410,7 +439,7 @@ impl IWorkerRunner for WorkerRunner {
     async fn run(
         &self,
         setup_dir: &Path,
-        extra_dir: Option<PathBuf>,
+        extra_setup_dir: Option<PathBuf>,
         work: &WorkUnit,
         from_agent_to_task_endpoint: String,
         from_task_to_agent_endpoint: String,
@@ -430,21 +459,21 @@ impl IWorkerRunner for WorkerRunner {
         // inject the machine_identity in the config file
         let work_config = work.config.expose_ref();
-        let mut config: HashMap<String, Value> = serde_json::from_str(work_config.as_str())?;
+        let mut config: HashMap<&str, Value> = serde_json::from_str(work_config.as_str())?;

         config.insert(
-            "machine_identity".to_string(),
+            "machine_identity",
             serde_json::to_value(&self.machine_identity)?,
         );

         config.insert(
-            "from_agent_to_task_endpoint".to_string(),
-            serde_json::to_value(&from_agent_to_task_endpoint)?,
+            "from_agent_to_task_endpoint",
+            from_agent_to_task_endpoint.into(),
         );

         config.insert(
-            "from_task_to_agent_endpoint".to_string(),
-            serde_json::to_value(&from_task_to_agent_endpoint)?,
+            "from_task_to_agent_endpoint",
+            from_task_to_agent_endpoint.into(),
         );

         let config_path = work.config_path(self.machine_identity.machine_id)?;
@@ -468,10 +497,11 @@ impl IWorkerRunner for WorkerRunner {
         let mut cmd = Command::new("onefuzz-task");
         cmd.current_dir(&working_dir);
         cmd.arg("managed");
-        cmd.arg("config.json");
+        cmd.arg(config_path);
         cmd.arg(setup_dir);
-        if let Some(extra_dir) = extra_dir {
-            cmd.arg(extra_dir);
+
+        if let Some(extra_setup_dir) = extra_setup_dir {
+            cmd.arg(extra_setup_dir);
         }
+
         cmd.stderr(Stdio::piped());
```

```diff
@@ -15,7 +15,7 @@ impl IWorkerRunner for WorkerRunnerDouble {
     async fn run(
         &self,
         _setup_dir: &Path,
-        _extra_dir: Option<PathBuf>,
+        _extra_setup_dir: Option<PathBuf>,
         _work: &WorkUnit,
         from_agent_to_task_endpoint: String,
         from_task_to_agent_endpoint: String,
```

```diff
@@ -60,7 +60,7 @@ impl IWorkerRunner for RunnerDouble {
     async fn run(
         &self,
         _setup_dir: &Path,
-        _extra_dir: Option<PathBuf>,
+        _extra_setup_dir: Option<PathBuf>,
         _work: &WorkUnit,
         from_agent_to_task_endpoint: String,
         from_task_to_agent_endpoint: String,
@@ -95,7 +95,7 @@ async fn test_ready_run() {
         ctx: Ready {
             work_dir: PathBuf::default(),
             setup_dir: PathBuf::default(),
-            extra_dir: None,
+            extra_setup_dir: None,
         },
         work: Fixture.work(),
     };
@@ -197,7 +197,7 @@ async fn test_worker_ready_update() {
         ctx: Ready {
             work_dir: PathBuf::default(),
             setup_dir: PathBuf::default(),
-            extra_dir: None,
+            extra_setup_dir: None,
         },
         work: Fixture.work(),
     };
```

```diff
@@ -228,6 +228,7 @@ pub async fn build_local_context(
     event_sender: Option<Sender<UiEvent>>,
 ) -> Result<LocalContext> {
     let job_id = get_uuid("job_id", args).unwrap_or_default();
+
     let task_id = get_uuid("task_id", args).unwrap_or_else(|_| {
         if generate_task_id {
             Uuid::new_v4()
@@ -235,9 +236,9 @@ pub async fn build_local_context(
             Uuid::nil()
         }
     });
+
     let instance_id = get_uuid("instance_id", args).unwrap_or_default();
-    // let extra_dir = args.get_one::<PathBuf>(EXTRA_DIR).cloned();
     let setup_dir = if let Some(setup_dir) = args.get_one::<PathBuf>(SETUP_DIR) {
         setup_dir.clone()
     } else if let Some(target_exe) = args.get_one::<String>(TARGET_EXE) {
@@ -254,7 +255,8 @@ pub async fn build_local_context(
         task_id,
         instance_id,
         setup_dir,
-        extra_dir: None,
+        extra_setup_dir: None,
+        extra_output: None,
         machine_identity: MachineIdentity {
             machine_id: Uuid::nil(),
             machine_name: "local".to_string(),
@@ -269,6 +271,7 @@ pub async fn build_local_context(
         from_agent_to_task_endpoint: "/".to_string(),
         from_task_to_agent_endpoint: "/".to_string(),
     };
+
     let current_dir = current_dir()?;
     let job_path = current_dir.join(format!("{job_id}"));
     Ok(LocalContext {
```

```diff
@@ -60,7 +60,7 @@ pub fn build_merge_config(
 pub async fn run(args: &clap::ArgMatches, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
     let context = build_local_context(args, true, event_sender.clone()).await?;
     let config = build_merge_config(args, None, context.common_config.clone(), event_sender)?;
-    spawn(std::sync::Arc::new(config)).await
+    spawn(config).await
 }

 pub fn build_shared_args() -> Vec<Arg> {
```

```diff
@@ -29,7 +29,13 @@ pub async fn run(args: &clap::ArgMatches, event_sender: Option<Sender<UiEvent>>)
         .get_one::<u64>(CHECK_RETRY_COUNT)
         .copied()
         .expect("has a default value");
-    let extra_dir = context.common_config.extra_dir.as_deref();
+
+    let extra_setup_dir = context.common_config.extra_setup_dir.as_deref();
+    let extra_output_dir = context
+        .common_config
+        .extra_output
+        .as_ref()
+        .map(|x| x.local_path.as_path());

     let config = TestInputArgs {
         target_exe: target_exe.as_path(),
@@ -42,7 +48,8 @@ pub async fn run(args: &clap::ArgMatches, event_sender: Option<Sender<UiEvent>>)
         target_timeout,
         check_retry_count,
         setup_dir: &context.common_config.setup_dir,
-        extra_dir,
+        extra_setup_dir,
+        extra_output_dir,
         minimized_stack_depth: None,
         machine_identity: context.common_config.machine_identity,
     };
```

```diff
@@ -44,7 +44,7 @@ pub async fn run(args: &clap::ArgMatches, event_sender: Option<Sender<UiEvent>>)
         target_timeout,
         check_retry_count,
         setup_dir: &context.common_config.setup_dir,
-        extra_dir: context.common_config.extra_dir.as_deref(),
+        extra_setup_dir: context.common_config.extra_setup_dir.as_deref(),
         minimized_stack_depth: None,
         check_asan_log,
         check_debugger,
```

```diff
@@ -38,15 +38,18 @@ pub async fn run(args: &clap::ArgMatches) -> Result<()> {
         .start()?;

     let config_path = args
-        .get_one::<PathBuf>("config")
+        .get_one::<PathBuf>(CONFIG_ARG)
         .expect("marked as required");
     let setup_dir = args
-        .get_one::<PathBuf>("setup_dir")
+        .get_one::<PathBuf>(SETUP_DIR_ARG)
         .expect("marked as required");
-    let extra_dir = args.get_one::<PathBuf>("extra_dir").map(|f| f.as_path());
-    let config = Config::from_file(config_path, setup_dir, extra_dir)?;
+    let extra_setup_dir = args
+        .get_one::<PathBuf>(EXTRA_SETUP_DIR_ARG)
+        .map(ToOwned::to_owned);
+
+    let config = Config::from_file(config_path, setup_dir.clone(), extra_setup_dir)?;

     info!("Creating channel from agent to task");
     let (agent_sender, receive_from_agent): (
@@ -174,21 +177,25 @@ async fn init_telemetry(config: &CommonConfig) {
         .await;
 }

+const CONFIG_ARG: &str = "config";
+const SETUP_DIR_ARG: &str = "setup_dir";
+const EXTRA_SETUP_DIR_ARG: &str = "extra_setup_dir";
+
 pub fn args(name: &'static str) -> Command {
     Command::new(name)
         .about("managed fuzzing")
         .arg(
-            Arg::new("config")
+            Arg::new(CONFIG_ARG)
                 .required(true)
                 .value_parser(value_parser!(PathBuf)),
         )
         .arg(
-            Arg::new("setup_dir")
+            Arg::new(SETUP_DIR_ARG)
                 .required(true)
                 .value_parser(value_parser!(PathBuf)),
         )
         .arg(
-            Arg::new("extra_dir")
+            Arg::new(EXTRA_SETUP_DIR_ARG)
                 .required(false)
                 .value_parser(value_parser!(PathBuf)),
         )
```

```diff
@@ -209,8 +209,9 @@ pub async fn run_tool(
         .output_dir(&config.analysis.local_path)
         .tools_dir(&config.tools.local_path)
         .setup_dir(&config.common.setup_dir)
-        .set_optional_ref(&config.common.extra_dir, |expand, extra_dir| {
-            expand.extra_dir(extra_dir)
+        .set_optional_ref(&config.common.extra_setup_dir, Expand::extra_setup_dir)
+        .set_optional_ref(&config.common.extra_output, |expand, value| {
+            expand.extra_output_dir(value.local_path.as_path())
         })
         .job_id(&config.common.job_id)
         .task_id(&config.common.task_id)
```

```diff
@@ -9,8 +9,11 @@ use crate::tasks::{
     heartbeat::{init_task_heartbeat, TaskHeartbeatClient},
     merge, regression, report,
 };
-use anyhow::Result;
-use onefuzz::machine_id::MachineIdentity;
+use anyhow::{Context, Result};
+use onefuzz::{
+    machine_id::MachineIdentity,
+    syncdir::{SyncOperation, SyncedDir},
+};
 use onefuzz_telemetry::{
     self as telemetry, Event::task_start, EventData, InstanceTelemetryKey, MicrosoftTelemetryKey,
     Role,
@@ -20,9 +23,9 @@ use serde::{self, Deserialize};
 use std::{
     collections::HashMap,
     path::{Path, PathBuf},
-    sync::Arc,
     time::Duration,
 };
+use tokio_util::sync::CancellationToken;
 use uuid::Uuid;

 const DEFAULT_MIN_AVAILABLE_MEMORY_MB: u64 = 100;
@@ -57,7 +60,10 @@ pub struct CommonConfig {
     pub setup_dir: PathBuf,

     #[serde(default)]
-    pub extra_dir: Option<PathBuf>,
+    pub extra_setup_dir: Option<PathBuf>,
+
+    #[serde(default)]
+    pub extra_output: Option<SyncedDir>,

     /// Lower bound on available system memory. If the available memory drops
     /// below the limit, the task will exit with an error. This is a fail-fast
@@ -102,18 +108,15 @@ impl CommonConfig {
 #[derive(Debug, Deserialize)]
 #[serde(tag = "task_type")]
 pub enum Config {
-    #[cfg(any(target_os = "linux", target_os = "windows"))]
     #[serde(alias = "coverage")]
     Coverage(coverage::generic::Config),

-    #[cfg(any(target_os = "linux", target_os = "windows"))]
     #[serde(alias = "dotnet_coverage")]
     DotnetCoverage(coverage::dotnet::Config),

     #[serde(alias = "dotnet_crash_report")]
     DotnetCrashReport(report::dotnet::generic::Config),

-    #[cfg(any(target_os = "linux", target_os = "windows"))]
     #[serde(alias = "libfuzzer_dotnet_fuzz")]
     LibFuzzerDotnetFuzz(fuzz::libfuzzer::dotnet::Config),
@@ -149,24 +152,26 @@ pub enum Config {
 }

 impl Config {
-    pub fn from_file(path: &Path, setup_dir: &Path, extra_dir: Option<&Path>) -> Result<Self> {
-        let json = std::fs::read_to_string(path)?;
-        let json_config: serde_json::Value = serde_json::from_str(&json)?;
+    pub fn from_file(
+        path: &Path,
+        setup_dir: PathBuf,
+        extra_setup_dir: Option<PathBuf>,
+    ) -> Result<Self> {
+        let json = std::fs::read_to_string(path)
+            .with_context(|| format!("loading config from {}", path.display()))?;
+        let mut config = serde_json::from_str::<Self>(&json).context("deserializing Config")?;

         // override the setup_dir in the config file with the parameter value if specified
-        let mut config: Self = serde_json::from_value(json_config)?;
-        config.common_mut().setup_dir = setup_dir.to_owned();
-        config.common_mut().extra_dir = extra_dir.map(|x| x.to_owned());
+        config.common_mut().setup_dir = setup_dir;
+        config.common_mut().extra_setup_dir = extra_setup_dir;

         Ok(config)
     }

     fn common_mut(&mut self) -> &mut CommonConfig {
         match self {
-            #[cfg(any(target_os = "linux", target_os = "windows"))]
             Config::Coverage(c) => &mut c.common,
-            #[cfg(any(target_os = "linux", target_os = "windows"))]
             Config::DotnetCoverage(c) => &mut c.common,
             Config::DotnetCrashReport(c) => &mut c.common,
             Config::LibFuzzerDotnetFuzz(c) => &mut c.common,
@@ -185,9 +190,7 @@ impl Config {
     pub fn common(&self) -> &CommonConfig {
         match self {
-            #[cfg(any(target_os = "linux", target_os = "windows"))]
             Config::Coverage(c) => &c.common,
-            #[cfg(any(target_os = "linux", target_os = "windows"))]
             Config::DotnetCoverage(c) => &c.common,
             Config::DotnetCrashReport(c) => &c.common,
             Config::LibFuzzerDotnetFuzz(c) => &c.common,
@@ -206,9 +209,7 @@ impl Config {
     pub fn report_event(&self) {
         let event_type = match self {
-            #[cfg(any(target_os = "linux", target_os = "windows"))]
             Config::Coverage(_) => "coverage",
-            #[cfg(any(target_os = "linux", target_os = "windows"))]
             Config::DotnetCoverage(_) => "dotnet_coverage",
             Config::DotnetCrashReport(_) => "dotnet_crash_report",
             Config::LibFuzzerDotnetFuzz(_) => "libfuzzer_fuzz",
@@ -254,56 +255,89 @@ impl Config {
         info!("agent ready, dispatching task");
         self.report_event();

-        match self {
-            #[cfg(any(target_os = "linux", target_os = "windows"))]
-            Config::Coverage(config) => coverage::generic::CoverageTask::new(config).run().await,
-            #[cfg(any(target_os = "linux", target_os = "windows"))]
-            Config::DotnetCoverage(config) => {
-                coverage::dotnet::DotnetCoverageTask::new(config)
-                    .run()
-                    .await
-            }
-            Config::DotnetCrashReport(config) => {
-                report::dotnet::generic::DotnetCrashReportTask::new(config)
-                    .run()
-                    .await
-            }
-            Config::LibFuzzerDotnetFuzz(config) => {
-                fuzz::libfuzzer::dotnet::LibFuzzerDotnetFuzzTask::new(config)?
-                    .run()
-                    .await
-            }
-            Config::LibFuzzerFuzz(config) => {
-                fuzz::libfuzzer::generic::LibFuzzerFuzzTask::new(config)?
-                    .run()
-                    .await
-            }
-            Config::LibFuzzerReport(config) => {
-                report::libfuzzer_report::ReportTask::new(config)
-                    .managed_run()
-                    .await
-            }
-            Config::LibFuzzerMerge(config) => merge::libfuzzer_merge::spawn(Arc::new(config)).await,
-            Config::GenericAnalysis(config) => analysis::generic::run(config).await,
-            Config::GenericGenerator(config) => {
-                fuzz::generator::GeneratorTask::new(config).run().await
-            }
-            Config::GenericSupervisor(config) => fuzz::supervisor::spawn(config).await,
-            Config::GenericMerge(config) => merge::generic::spawn(Arc::new(config)).await,
-            Config::GenericReport(config) => {
-                report::generic::ReportTask::new(config).managed_run().await
-            }
-            Config::GenericRegression(config) => {
-                regression::generic::GenericRegressionTask::new(config)
-                    .run()
-                    .await
-            }
-            Config::LibFuzzerRegression(config) => {
-                regression::libfuzzer::LibFuzzerRegressionTask::new(config)
-                    .run()
-                    .await
-            }
-        }
+        let extra_output_dir = self.common().extra_output.clone();
+        if let Some(dir) = &extra_output_dir {
+            // setup the directory
+            dir.init().await.context("initing extra_output_dir")?;
+        }
+
+        let sync_cancellation = CancellationToken::new();
+        let background_sync_task = async {
+            if let Some(dir) = extra_output_dir {
+                // push it continually
+                dir.continuous_sync(SyncOperation::Push, None, &sync_cancellation)
+                    .await?;
+
+                // when we are cancelled, do one more sync, to ensure
+                // everything is up-to-date
+                dir.sync_push().await?;
+                Ok(())
+            } else {
+                Ok(())
+            }
+        };
+
+        let run_task = async {
+            let result = match self {
+                Config::Coverage(config) => {
+                    coverage::generic::CoverageTask::new(config).run().await
+                }
+                Config::DotnetCoverage(config) => {
+                    coverage::dotnet::DotnetCoverageTask::new(config)
+                        .run()
+                        .await
+                }
+                Config::DotnetCrashReport(config) => {
+                    report::dotnet::generic::DotnetCrashReportTask::new(config)
+                        .run()
+                        .await
+                }
+                Config::LibFuzzerDotnetFuzz(config) => {
+                    fuzz::libfuzzer::dotnet::LibFuzzerDotnetFuzzTask::new(config)?
+                        .run()
+                        .await
+                }
+                Config::LibFuzzerFuzz(config) => {
+                    fuzz::libfuzzer::generic::LibFuzzerFuzzTask::new(config)?
+                        .run()
+                        .await
+                }
+                Config::LibFuzzerReport(config) => {
+                    report::libfuzzer_report::ReportTask::new(config)
+                        .managed_run()
+                        .await
+                }
+                Config::LibFuzzerMerge(config) => merge::libfuzzer_merge::spawn(config).await,
+                Config::GenericAnalysis(config) => analysis::generic::run(config).await,
+                Config::GenericGenerator(config) => {
+                    fuzz::generator::GeneratorTask::new(config).run().await
+                }
+                Config::GenericSupervisor(config) => fuzz::supervisor::spawn(config).await,
+                Config::GenericMerge(config) => merge::generic::spawn(&config).await,
+                Config::GenericReport(config) => {
+                    report::generic::ReportTask::new(config).managed_run().await
+                }
+                Config::GenericRegression(config) => {
+                    regression::generic::GenericRegressionTask::new(config)
+                        .run()
+                        .await
+                }
+                Config::LibFuzzerRegression(config) => {
+                    regression::libfuzzer::LibFuzzerRegressionTask::new(config)
+                        .run()
+                        .await
+                }
+            };

+            // once main task is complete, cancel sync;
+            // this will stop continuous sync and perform one final sync
+            sync_cancellation.cancel();
+
+            result
+        };
+
+        tokio::try_join!(run_task, background_sync_task)?;
+
+        Ok(())
     }
 }
```

```diff
@@ -298,8 +298,9 @@ impl<'a> TaskContext<'a> {
             .input_path(input)
             .job_id(&self.config.common.job_id)
             .setup_dir(&self.config.common.setup_dir)
-            .set_optional_ref(&self.config.common.extra_dir, |expand, extra_dir| {
-                expand.extra_dir(extra_dir)
+            .set_optional_ref(&self.config.common.extra_setup_dir, Expand::extra_setup_dir)
+            .set_optional_ref(&self.config.common.extra_output, |expand, value| {
+                expand.extra_output_dir(value.local_path.as_path())
             })
             .target_exe(&target_exe)
             .target_options(&self.config.target_options)
```

```diff
@@ -298,8 +298,9 @@ impl<'a> TaskContext<'a> {
             .input_path(input)
             .job_id(&self.config.common.job_id)
             .setup_dir(&self.config.common.setup_dir)
-            .set_optional_ref(&self.config.common.extra_dir, |expand, extra_dir| {
-                expand.extra_dir(extra_dir)
+            .set_optional_ref(&self.config.common.extra_setup_dir, Expand::extra_setup_dir)
+            .set_optional_ref(&self.config.common.extra_output, |expand, value| {
+                expand.extra_output_dir(value.local_path.as_path())
             })
             .target_exe(&target_exe)
             .target_options(&self.config.target_options)
```

```diff
@@ -99,7 +99,7 @@ impl GeneratorTask {
         let tester = Tester::new(
             &self.config.common.setup_dir,
-            self.config.common.extra_dir.as_deref(),
+            self.config.common.extra_setup_dir.as_deref(),
             &target_exe,
             &self.config.target_options,
             &self.config.target_env,
@@ -168,8 +168,9 @@ impl GeneratorTask {
         let expand = Expand::new(&self.config.common.machine_identity)
             .machine_id()
             .setup_dir(&self.config.common.setup_dir)
-            .set_optional_ref(&self.config.common.extra_dir, |expand, extra_dir| {
-                expand.extra_dir(extra_dir)
+            .set_optional_ref(&self.config.common.extra_setup_dir, Expand::extra_setup_dir)
+            .set_optional_ref(&self.config.common.extra_output, |expand, value| {
+                expand.extra_output_dir(value.local_path.as_path())
             })
             .generated_inputs(&output_dir)
             .input_corpus(&corpus_dir)
@@ -301,7 +302,8 @@ mod tests {
             microsoft_telemetry_key: Default::default(),
             logs: Default::default(),
             setup_dir: Default::default(),
-            extra_dir: Default::default(),
+            extra_setup_dir: Default::default(),
+            extra_output: Default::default(),
             min_available_memory_mb: Default::default(),
             machine_identity: onefuzz::machine_id::MachineIdentity {
                 machine_id: uuid::Uuid::new_v4(),
```

```diff
@@ -85,8 +85,13 @@ impl common::LibFuzzerType for LibFuzzerDotnet {
             config.extra.libfuzzer_dotnet_path(),
             options,
             env,
-            &config.common.setup_dir,
-            config.common.extra_dir.as_ref(),
+            config.common.setup_dir.clone(),
+            config.common.extra_setup_dir.clone(),
+            config
+                .common
+                .extra_output
+                .as_ref()
+                .map(|x| x.local_path.clone()),
             config.common.machine_identity.clone(),
         ))
     }
```

```diff
@@ -27,8 +27,13 @@ impl common::LibFuzzerType for GenericLibFuzzer {
             target_exe,
             config.target_options.clone(),
             config.target_env.clone(),
-            &config.common.setup_dir,
-            config.common.extra_dir.as_ref(),
+            config.common.setup_dir.clone(),
+            config.common.extra_setup_dir.clone(),
+            config
+                .common
+                .extra_output
+                .as_ref()
+                .map(|x| x.local_path.clone()),
             config.common.machine_identity.clone(),
         ))
     }
```

```diff
@@ -33,6 +33,7 @@ use tokio::{
     process::{Child, Command},
     sync::Notify,
 };
+use tokio_util::sync::CancellationToken;

 use futures::TryFutureExt;
@@ -83,7 +84,13 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
     if let Some(coverage) = &config.coverage {
         coverage.init_pull().await?;
     }
-    let monitor_coverage_future = monitor_coverage(&config.coverage, config.ensemble_sync_delay);
+
+    let monitor_coverage_cancellation = CancellationToken::new(); // never actually cancelled, yet
+    let monitor_coverage_future = monitor_coverage(
+        &config.coverage,
+        config.ensemble_sync_delay,
+        &monitor_coverage_cancellation,
+    );

     // setup reports
     let reports_dir = tempdir()?;
@@ -124,7 +131,9 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
         }
     }
     let monitor_inputs = inputs.monitor_results(new_coverage, false);
-    let continuous_sync_task = inputs.continuous_sync(Pull, config.ensemble_sync_delay);
+    let inputs_sync_cancellation = CancellationToken::new(); // never actually cancelled
+    let inputs_sync_task =
+        inputs.continuous_sync(Pull, config.ensemble_sync_delay, &inputs_sync_cancellation);

     let process = start_supervisor(
         &runtime_dir.path(),
@@ -162,7 +171,7 @@
         monitor_stats.map_err(|e| e.context("Failure in monitor_stats")),
         monitor_crashes.map_err(|e| e.context("Failure in monitor_crashes")),
         monitor_inputs.map_err(|e| e.context("Failure in monitor_inputs")),
-        continuous_sync_task.map_err(|e| e.context("Failure in continuous_sync_task")),
+        inputs_sync_task.map_err(|e| e.context("Failure in continuous_sync_task")),
         monitor_reports_future.map_err(|e| e.context("Failure in monitor_reports_future")),
         monitor_coverage_future.map_err(|e| e.context("Failure in monitor_coverage_future")),
     )?;
@@ -173,9 +182,12 @@
 async fn monitor_coverage(
     coverage: &Option<SyncedDir>,
     ensemble_sync_delay: Option<u64>,
+    cancellation_token: &CancellationToken,
 ) -> Result<()> {
     if let Some(coverage) = coverage {
-        coverage.continuous_sync(Push, ensemble_sync_delay).await?;
+        coverage
+            .continuous_sync(Push, ensemble_sync_delay, cancellation_token)
+            .await?;
     }
     Ok(())
 }
@@ -212,8 +224,9 @@ async fn start_supervisor(
         .input_corpus(&inputs.local_path)
         .reports_dir(reports_dir)
         .setup_dir(&config.common.setup_dir)
-        .set_optional_ref(&config.common.extra_dir, |expand, extra_dir| {
-            expand.extra_dir(extra_dir)
+        .set_optional_ref(&config.common.extra_setup_dir, Expand::extra_setup_dir)
+        .set_optional_ref(&config.common.extra_output, |expand, value| {
+            expand.extra_output_dir(value.local_path.as_path())
         })
         .job_id(&config.common.job_id)
         .task_id(&config.common.task_id)
@@ -392,7 +405,8 @@ mod tests {
             microsoft_telemetry_key: Default::default(),
             logs: Default::default(),
             setup_dir: Default::default(),
-            extra_dir: Default::default(),
+            extra_setup_dir: Default::default(),
+            extra_output: Default::default(),
             min_available_memory_mb: Default::default(),
             machine_identity: MachineIdentity {
                 machine_id: uuid::Uuid::new_v4(),
```

```diff
@@ -18,7 +18,6 @@ use std::{
     collections::HashMap,
     path::{Path, PathBuf},
     process::Stdio,
-    sync::Arc,
 };
 use storage_queue::{QueueClient, EMPTY_QUEUE_DELAY};
 use tokio::process::Command;
@@ -41,7 +40,7 @@ pub struct Config {
     pub common: CommonConfig,
 }

-pub async fn spawn(config: Arc<Config>) -> Result<()> {
+pub async fn spawn(config: &Config) -> Result<()> {
     config.tools.init_pull().await?;
     set_executable(&config.tools.local_path).await?;
@@ -64,7 +63,7 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
             }
         };

-        if let Err(error) = process_message(config.clone(), &input_url, &tmp_dir).await {
+        if let Err(error) = process_message(config, &input_url, &tmp_dir).await {
             error!(
                 "failed to process latest message from notification queue: {}",
                 error
@@ -90,13 +89,13 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
     }
 }

-async fn process_message(config: Arc<Config>, input_url: &Url, tmp_dir: &Path) -> Result<()> {
+async fn process_message(config: &Config, input_url: &Url, tmp_dir: &Path) -> Result<()> {
     let input_path =
         utils::download_input(input_url.clone(), &config.unique_inputs.local_path).await?;
     info!("downloaded input to {}", input_path.display());

     info!("Merging corpus");
-    match merge(&config, tmp_dir).await {
+    match merge(config, tmp_dir).await {
         Ok(_) => {
             // remove the 'queue' folder
             let mut queue_dir = tmp_dir.to_path_buf();
@@ -140,8 +139,9 @@ async fn merge(config: &Config, output_dir: impl AsRef<Path>) -> Result<()> {
         .generated_inputs(output_dir)
         .target_exe(&target_exe)
         .setup_dir(&config.common.setup_dir)
-        .set_optional_ref(&config.common.extra_dir, |expand, extra_dir| {
-            expand.extra_dir(extra_dir)
+        .set_optional_ref(&config.common.extra_setup_dir, Expand::extra_setup_dir)
+        .set_optional_ref(&config.common.extra_output, |expand, value| {
+            expand.extra_output_dir(value.local_path.as_path())
         })
         .tools_dir(&config.tools.local_path)
         .job_id(&config.common.job_id)
```

View File

@ -19,7 +19,6 @@ use serde::Deserialize;
use std::{ use std::{
collections::HashMap, collections::HashMap,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc,
}; };
use storage_queue::{QueueClient, EMPTY_QUEUE_DELAY}; use storage_queue::{QueueClient, EMPTY_QUEUE_DELAY};
@ -40,13 +39,18 @@ pub struct Config {
pub common: CommonConfig, pub common: CommonConfig,
} }
pub async fn spawn(config: Arc<Config>) -> Result<()> { pub async fn spawn(config: Config) -> Result<()> {
let fuzzer = LibFuzzer::new( let fuzzer = LibFuzzer::new(
&config.target_exe, config.target_exe.clone(),
config.target_options.clone(), config.target_options.clone(),
config.target_env.clone(), config.target_env.clone(),
&config.common.setup_dir, config.common.setup_dir.clone(),
config.common.extra_dir.clone(), config.common.extra_setup_dir.clone(),
config
.common
.extra_output
.as_ref()
.map(|x| x.local_path.clone()),
config.common.machine_identity.clone(), config.common.machine_identity.clone(),
); );
fuzzer.verify(config.check_fuzzer_help, None).await?; fuzzer.verify(config.check_fuzzer_help, None).await?;
@ -54,7 +58,7 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
config.unique_inputs.init().await?; config.unique_inputs.init().await?;
if let Some(queue) = config.input_queue.clone() { if let Some(queue) = config.input_queue.clone() {
loop { loop {
if let Err(error) = process_message(config.clone(), queue.clone()).await { if let Err(error) = process_message(&config, queue.clone()).await {
error!( error!(
"failed to process latest message from notification queue: {}", "failed to process latest message from notification queue: {}",
error error
@ -68,7 +72,7 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
} }
let input_paths = config.inputs.iter().map(|i| &i.local_path).collect(); let input_paths = config.inputs.iter().map(|i| &i.local_path).collect();
sync_and_merge( sync_and_merge(
config.clone(), &config,
input_paths, input_paths,
false, false,
config.preserve_existing_outputs, config.preserve_existing_outputs,
@ -78,7 +82,7 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
} }
} }
async fn process_message(config: Arc<Config>, input_queue: QueueClient) -> Result<()> { async fn process_message(config: &Config, input_queue: QueueClient) -> Result<()> {
let hb_client = config.common.init_heartbeat(None).await?; let hb_client = config.common.init_heartbeat(None).await?;
hb_client.alive(); hb_client.alive();
let tmp_dir = "./tmp"; let tmp_dir = "./tmp";
@ -100,7 +104,7 @@ async fn process_message(config: Arc<Config>, input_queue: QueueClient) -> Result<()>
let input_path = utils::download_input(input_url.clone(), tmp_dir).await?; let input_path = utils::download_input(input_url.clone(), tmp_dir).await?;
info!("downloaded input to {}", input_path.display()); info!("downloaded input to {}", input_path.display());
sync_and_merge(config.clone(), vec![tmp_dir], true, true).await?; sync_and_merge(config, vec![tmp_dir], true, true).await?;
debug!("will delete popped message with id = {}", msg.id()); debug!("will delete popped message with id = {}", msg.id());
@ -123,7 +127,7 @@ async fn process_message(config: Arc<Config>, input_queue: QueueClient) -> Result<()>
} }
async fn sync_and_merge( async fn sync_and_merge(
config: Arc<Config>, config: &Config,
input_dirs: Vec<impl AsRef<Path>>, input_dirs: Vec<impl AsRef<Path>>,
pull_inputs: bool, pull_inputs: bool,
preserve_existing_outputs: bool, preserve_existing_outputs: bool,
@ -131,7 +135,7 @@ async fn sync_and_merge(
if pull_inputs { if pull_inputs {
config.unique_inputs.sync_pull().await?; config.unique_inputs.sync_pull().await?;
} }
match merge_inputs(config.clone(), input_dirs).await { match merge_inputs(config, input_dirs).await {
Ok(result) => { Ok(result) => {
if result.added_files_count > 0 { if result.added_files_count > 0 {
info!("Added {} new files to the corpus", result.added_files_count); info!("Added {} new files to the corpus", result.added_files_count);
@ -152,16 +156,21 @@ async fn sync_and_merge(
} }
pub async fn merge_inputs( pub async fn merge_inputs(
config: Arc<Config>, config: &Config,
candidates: Vec<impl AsRef<Path>>, candidates: Vec<impl AsRef<Path>>,
) -> Result<LibFuzzerMergeOutput> { ) -> Result<LibFuzzerMergeOutput> {
info!("Merging corpus"); info!("Merging corpus");
let merger = LibFuzzer::new( let merger = LibFuzzer::new(
&config.target_exe, config.target_exe.clone(),
config.target_options.clone(), config.target_options.clone(),
config.target_env.clone(), config.target_env.clone(),
&config.common.setup_dir, config.common.setup_dir.clone(),
config.common.extra_dir.clone(), config.common.extra_setup_dir.clone(),
config
.common
.extra_output
.as_ref()
.map(|x| x.local_path.clone()),
config.common.machine_identity.clone(), config.common.machine_identity.clone(),
); );
merger merger


@ -60,7 +60,7 @@ impl RegressionHandler for GenericRegressionTask {
try_resolve_setup_relative_path(&self.config.common.setup_dir, &self.config.target_exe) try_resolve_setup_relative_path(&self.config.common.setup_dir, &self.config.target_exe)
.await?; .await?;
let extra_dir = self.config.common.extra_dir.as_deref(); let extra_setup_dir = self.config.common.extra_setup_dir.as_deref();
let args = generic::TestInputArgs { let args = generic::TestInputArgs {
input_url: Some(input_url), input_url: Some(input_url),
input: &input, input: &input,
@ -68,7 +68,7 @@ impl RegressionHandler for GenericRegressionTask {
target_options: &self.config.target_options, target_options: &self.config.target_options,
target_env: &self.config.target_env, target_env: &self.config.target_env,
setup_dir: &self.config.common.setup_dir, setup_dir: &self.config.common.setup_dir,
extra_dir, extra_setup_dir,
task_id: self.config.common.task_id, task_id: self.config.common.task_id,
job_id: self.config.common.job_id, job_id: self.config.common.job_id,
target_timeout: self.config.target_timeout, target_timeout: self.config.target_timeout,


@ -66,7 +66,13 @@ impl RegressionHandler for LibFuzzerRegressionTask {
target_options: &self.config.target_options, target_options: &self.config.target_options,
target_env: &self.config.target_env, target_env: &self.config.target_env,
setup_dir: &self.config.common.setup_dir, setup_dir: &self.config.common.setup_dir,
extra_dir: self.config.common.extra_dir.as_deref(), extra_setup_dir: self.config.common.extra_setup_dir.as_deref(),
extra_output_dir: self
.config
.common
.extra_output
.as_ref()
.map(|x| x.local_path.as_path()),
task_id: self.config.common.task_id, task_id: self.config.common.task_id,
job_id: self.config.common.job_id, job_id: self.config.common.job_id,
target_timeout: self.config.target_timeout, target_timeout: self.config.target_timeout,
@ -74,6 +80,7 @@ impl RegressionHandler for LibFuzzerRegressionTask {
minimized_stack_depth: self.config.minimized_stack_depth, minimized_stack_depth: self.config.minimized_stack_depth,
machine_identity: self.config.common.machine_identity.clone(), machine_identity: self.config.common.machine_identity.clone(),
}; };
libfuzzer_report::test_input(args).await libfuzzer_report::test_input(args).await
} }
} }


@ -183,9 +183,11 @@ impl AsanProcessor {
let expand = Expand::new(&self.config.common.machine_identity) let expand = Expand::new(&self.config.common.machine_identity)
.input_path(input) .input_path(input)
.setup_dir(&self.config.common.setup_dir) .setup_dir(&self.config.common.setup_dir)
.set_optional_ref(&self.config.common.extra_dir, |expand, extra_dir| { .set_optional_ref(&self.config.common.extra_setup_dir, Expand::extra_setup_dir)
expand.extra_dir(extra_dir) .set_optional_ref(&self.config.common.extra_output, |expand, value| {
expand.extra_output_dir(value.local_path.as_path())
}); });
let expanded_args = expand.evaluate(&args)?; let expanded_args = expand.evaluate(&args)?;
let env = { let env = {


@ -113,7 +113,7 @@ pub struct TestInputArgs<'a> {
pub target_options: &'a [String], pub target_options: &'a [String],
pub target_env: &'a HashMap<String, String>, pub target_env: &'a HashMap<String, String>,
pub setup_dir: &'a Path, pub setup_dir: &'a Path,
pub extra_dir: Option<&'a Path>, pub extra_setup_dir: Option<&'a Path>,
pub task_id: Uuid, pub task_id: Uuid,
pub job_id: Uuid, pub job_id: Uuid,
pub target_timeout: Option<u64>, pub target_timeout: Option<u64>,
@ -125,10 +125,10 @@ pub struct TestInputArgs<'a> {
} }
pub async fn test_input(args: TestInputArgs<'_>) -> Result<CrashTestResult> { pub async fn test_input(args: TestInputArgs<'_>) -> Result<CrashTestResult> {
let extra_dir = args.extra_dir; let extra_setup_dir = args.extra_setup_dir;
let tester = Tester::new( let tester = Tester::new(
args.setup_dir, args.setup_dir,
extra_dir, extra_setup_dir,
args.target_exe, args.target_exe,
args.target_options, args.target_options,
args.target_env, args.target_env,
@ -204,7 +204,7 @@ impl<'a> GenericReportProcessor<'a> {
try_resolve_setup_relative_path(&self.config.common.setup_dir, &self.config.target_exe) try_resolve_setup_relative_path(&self.config.common.setup_dir, &self.config.target_exe)
.await?; .await?;
let extra_dir = self.config.common.extra_dir.as_deref(); let extra_setup_dir = self.config.common.extra_setup_dir.as_deref();
let args = TestInputArgs { let args = TestInputArgs {
input_url, input_url,
input, input,
@ -212,7 +212,7 @@ impl<'a> GenericReportProcessor<'a> {
target_options: &self.config.target_options, target_options: &self.config.target_options,
target_env: &self.config.target_env, target_env: &self.config.target_env,
setup_dir: &self.config.common.setup_dir, setup_dir: &self.config.common.setup_dir,
extra_dir, extra_setup_dir,
task_id: self.config.common.task_id, task_id: self.config.common.task_id,
job_id: self.config.common.job_id, job_id: self.config.common.job_id,
target_timeout: self.config.target_timeout, target_timeout: self.config.target_timeout,


@ -72,13 +72,19 @@ impl ReportTask {
.await?; .await?;
let fuzzer = LibFuzzer::new( let fuzzer = LibFuzzer::new(
&target_exe, target_exe,
self.config.target_options.clone(), self.config.target_options.clone(),
self.config.target_env.clone(), self.config.target_env.clone(),
&self.config.common.setup_dir, self.config.common.setup_dir.clone(),
self.config.common.extra_dir.clone(), self.config.common.extra_setup_dir.clone(),
self.config
.common
.extra_output
.as_ref()
.map(|x| x.local_path.clone()),
self.config.common.machine_identity.clone(), self.config.common.machine_identity.clone(),
); );
fuzzer.verify(self.config.check_fuzzer_help, None).await fuzzer.verify(self.config.check_fuzzer_help, None).await
} }
@ -119,7 +125,8 @@ pub struct TestInputArgs<'a> {
pub target_options: &'a [String], pub target_options: &'a [String],
pub target_env: &'a HashMap<String, String>, pub target_env: &'a HashMap<String, String>,
pub setup_dir: &'a Path, pub setup_dir: &'a Path,
pub extra_dir: Option<&'a Path>, pub extra_setup_dir: Option<&'a Path>,
pub extra_output_dir: Option<&'a Path>,
pub task_id: uuid::Uuid, pub task_id: uuid::Uuid,
pub job_id: uuid::Uuid, pub job_id: uuid::Uuid,
pub target_timeout: Option<u64>, pub target_timeout: Option<u64>,
@ -130,11 +137,12 @@ pub struct TestInputArgs<'a> {
pub async fn test_input(args: TestInputArgs<'_>) -> Result<CrashTestResult> { pub async fn test_input(args: TestInputArgs<'_>) -> Result<CrashTestResult> {
let fuzzer = LibFuzzer::new( let fuzzer = LibFuzzer::new(
args.target_exe, args.target_exe.to_owned(),
args.target_options.to_vec(), args.target_options.to_vec(),
args.target_env.clone(), args.target_env.clone(),
args.setup_dir, args.setup_dir.to_owned(),
args.extra_dir.map(PathBuf::from), args.extra_setup_dir.map(PathBuf::from),
args.extra_output_dir.map(PathBuf::from),
args.machine_identity, args.machine_identity,
); );
@ -218,7 +226,13 @@ impl AsanProcessor {
target_options: &self.config.target_options, target_options: &self.config.target_options,
target_env: &self.config.target_env, target_env: &self.config.target_env,
setup_dir: &self.config.common.setup_dir, setup_dir: &self.config.common.setup_dir,
extra_dir: self.config.common.extra_dir.as_deref(), extra_setup_dir: self.config.common.extra_setup_dir.as_deref(),
extra_output_dir: self
.config
.common
.extra_output
.as_ref()
.map(|x| x.local_path.as_path()),
task_id: self.config.common.task_id, task_id: self.config.common.task_id,
job_id: self.config.common.job_id, job_id: self.config.common.job_id,
target_timeout: self.config.target_timeout, target_timeout: self.config.target_timeout,
@ -226,6 +240,7 @@ impl AsanProcessor {
minimized_stack_depth: self.config.minimized_stack_depth, minimized_stack_depth: self.config.minimized_stack_depth,
machine_identity: self.config.common.machine_identity.clone(), machine_identity: self.config.common.machine_identity.clone(),
}; };
let result = test_input(args).await?; let result = test_input(args).await?;
Ok(result) Ok(result)


@ -41,7 +41,8 @@ pub enum PlaceHolder {
SupervisorExe, SupervisorExe,
SupervisorOptions, SupervisorOptions,
SetupDir, SetupDir,
ExtraDir, ExtraSetupDir,
ExtraOutputDir,
ReportsDir, ReportsDir,
JobId, JobId,
TaskId, TaskId,
@ -75,7 +76,8 @@ impl PlaceHolder {
Self::SupervisorExe => "{supervisor_exe}", Self::SupervisorExe => "{supervisor_exe}",
Self::SupervisorOptions => "{supervisor_options}", Self::SupervisorOptions => "{supervisor_options}",
Self::SetupDir => "{setup_dir}", Self::SetupDir => "{setup_dir}",
Self::ExtraDir => "{extra_dir}", Self::ExtraSetupDir => "{extra_setup_dir}",
Self::ExtraOutputDir => "{extra_output_dir}",
Self::ReportsDir => "{reports_dir}", Self::ReportsDir => "{reports_dir}",
Self::JobId => "{job_id}", Self::JobId => "{job_id}",
Self::TaskId => "{task_id}", Self::TaskId => "{task_id}",
@ -318,10 +320,16 @@ impl<'a> Expand<'a> {
self.set_value(PlaceHolder::SetupDir, ExpandedValue::Path(path)) self.set_value(PlaceHolder::SetupDir, ExpandedValue::Path(path))
} }
pub fn extra_dir(self, arg: impl AsRef<Path>) -> Self { pub fn extra_setup_dir(self, arg: impl AsRef<Path>) -> Self {
let arg = arg.as_ref(); let arg = arg.as_ref();
let path = String::from(arg.to_string_lossy()); let path = String::from(arg.to_string_lossy());
self.set_value(PlaceHolder::ExtraDir, ExpandedValue::Path(path)) self.set_value(PlaceHolder::ExtraSetupDir, ExpandedValue::Path(path))
}
pub fn extra_output_dir(self, arg: impl AsRef<Path>) -> Self {
let arg = arg.as_ref();
let path = String::from(arg.to_string_lossy());
self.set_value(PlaceHolder::ExtraOutputDir, ExpandedValue::Path(path))
} }
pub fn coverage_dir(self, arg: impl AsRef<Path>) -> Self { pub fn coverage_dir(self, arg: impl AsRef<Path>) -> Self {

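The two new placeholders behave like the existing ones: at expansion time each `{name}` token is replaced with the corresponding directory path. A minimal conceptual sketch of that substitution (not the real `Expand` implementation; the `/onefuzz/...` paths are made up for illustration):

```rust
use std::collections::HashMap;

// Conceptual stand-in for `Expand`: replace `{placeholder}` tokens with paths.
fn expand(template: &str, values: &HashMap<&str, &str>) -> String {
    let mut out = template.to_string();
    for (name, value) in values {
        out = out.replace(&format!("{{{name}}}"), value);
    }
    out
}

fn main() {
    let mut values = HashMap::new();
    values.insert("extra_setup_dir", "/onefuzz/extra_setup");
    values.insert("extra_output_dir", "/onefuzz/extra_output");

    // The integration tests below use exactly this kind of target option.
    let arg = "--write_test_file={extra_output_dir}/test.txt";
    assert_eq!(
        expand(arg, &values),
        "--write_test_file=/onefuzz/extra_output/test.txt"
    );
}
```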

@ -26,7 +26,7 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
pub struct Tester<'a> { pub struct Tester<'a> {
setup_dir: &'a Path, setup_dir: &'a Path,
extra_dir: Option<&'a Path>, extra_setup_dir: Option<&'a Path>,
exe_path: &'a Path, exe_path: &'a Path,
arguments: &'a [String], arguments: &'a [String],
environ: &'a HashMap<String, String>, environ: &'a HashMap<String, String>,
@ -56,7 +56,7 @@ pub struct TestResult {
impl<'a> Tester<'a> { impl<'a> Tester<'a> {
pub fn new( pub fn new(
setup_dir: &'a Path, setup_dir: &'a Path,
extra_dir: Option<&'a Path>, extra_setup_dir: Option<&'a Path>,
exe_path: &'a Path, exe_path: &'a Path,
arguments: &'a [String], arguments: &'a [String],
environ: &'a HashMap<String, String>, environ: &'a HashMap<String, String>,
@ -64,7 +64,7 @@ impl<'a> Tester<'a> {
) -> Self { ) -> Self {
Self { Self {
setup_dir, setup_dir,
extra_dir, extra_setup_dir,
exe_path, exe_path,
arguments, arguments,
environ, environ,
@ -292,7 +292,7 @@ impl<'a> Tester<'a> {
.target_exe(self.exe_path) .target_exe(self.exe_path)
.target_options(self.arguments) .target_options(self.arguments)
.setup_dir(self.setup_dir) .setup_dir(self.setup_dir)
.set_optional(self.extra_dir, Expand::extra_dir); .set_optional(self.extra_setup_dir, Expand::extra_setup_dir);
let argv = expand.evaluate(self.arguments)?; let argv = expand.evaluate(self.arguments)?;
let mut env: HashMap<String, String> = HashMap::new(); let mut env: HashMap<String, String> = HashMap::new();


@ -38,7 +38,8 @@ pub struct LibFuzzerMergeOutput {
pub struct LibFuzzer { pub struct LibFuzzer {
setup_dir: PathBuf, setup_dir: PathBuf,
extra_dir: Option<PathBuf>, extra_setup_dir: Option<PathBuf>,
extra_output_dir: Option<PathBuf>,
exe: PathBuf, exe: PathBuf,
options: Vec<String>, options: Vec<String>,
env: HashMap<String, String>, env: HashMap<String, String>,
@ -47,19 +48,21 @@ pub struct LibFuzzer {
impl LibFuzzer { impl LibFuzzer {
pub fn new( pub fn new(
exe: impl Into<PathBuf>, exe: PathBuf,
options: Vec<String>, options: Vec<String>,
env: HashMap<String, String>, env: HashMap<String, String>,
setup_dir: impl Into<PathBuf>, setup_dir: PathBuf,
extra_dir: Option<impl Into<PathBuf>>, extra_setup_dir: Option<PathBuf>,
extra_output_dir: Option<PathBuf>,
machine_identity: MachineIdentity, machine_identity: MachineIdentity,
) -> Self { ) -> Self {
Self { Self {
exe: exe.into(), exe,
options, options,
env, env,
setup_dir: setup_dir.into(), setup_dir,
extra_dir: extra_dir.map(|x| x.into()), extra_setup_dir,
extra_output_dir,
machine_identity, machine_identity,
} }
} }
@ -119,7 +122,8 @@ impl LibFuzzer {
.target_exe(&self.exe) .target_exe(&self.exe)
.target_options(&self.options) .target_options(&self.options)
.setup_dir(&self.setup_dir) .setup_dir(&self.setup_dir)
.set_optional(self.extra_dir.as_ref(), Expand::extra_dir) .set_optional_ref(&self.extra_setup_dir, Expand::extra_setup_dir)
.set_optional_ref(&self.extra_output_dir, Expand::extra_output_dir)
.set_optional(corpus_dir, Expand::input_corpus) .set_optional(corpus_dir, Expand::input_corpus)
.set_optional(fault_dir, Expand::crashes); .set_optional(fault_dir, Expand::crashes);
@ -371,7 +375,7 @@ impl LibFuzzer {
let mut tester = Tester::new( let mut tester = Tester::new(
&self.setup_dir, &self.setup_dir,
self.extra_dir.as_deref(), self.extra_setup_dir.as_deref(),
&self.exe, &self.exe,
&options, &options,
&self.env, &self.env,
@ -503,8 +507,9 @@ mod tests {
bad_bin, bad_bin,
options.clone(), options.clone(),
env.clone(), env.clone(),
temp_setup_dir.path(), temp_setup_dir.path().to_owned(),
Option::<PathBuf>::None, None,
None,
MachineIdentity { MachineIdentity {
machine_id: uuid::Uuid::new_v4(), machine_id: uuid::Uuid::new_v4(),
machine_name: "test-input".into(), machine_name: "test-input".into(),
@ -537,8 +542,9 @@ mod tests {
good_bin, good_bin,
options.clone(), options.clone(),
env.clone(), env.clone(),
temp_setup_dir.path(), temp_setup_dir.path().to_owned(),
Option::<PathBuf>::None, None,
None,
MachineIdentity { MachineIdentity {
machine_id: uuid::Uuid::new_v4(), machine_id: uuid::Uuid::new_v4(),
machine_name: "test-input".into(), machine_name: "test-input".into(),

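Aside from the renames, `LibFuzzer::new` now takes owned `PathBuf`/`Option<PathBuf>` arguments rather than `impl Into<PathBuf>` generics, so any cloning happens visibly at the call site. A small sketch of that constructor shape (stub type, not the real `LibFuzzer`):

```rust
use std::path::PathBuf;

// Stub mirroring the new constructor shape: owned values in, so any cloning
// is explicit at the call site. This is not the real `LibFuzzer` type.
struct Fuzzer {
    exe: PathBuf,
    setup_dir: PathBuf,
    extra_setup_dir: Option<PathBuf>,
    extra_output_dir: Option<PathBuf>,
}

impl Fuzzer {
    fn new(
        exe: PathBuf,
        setup_dir: PathBuf,
        extra_setup_dir: Option<PathBuf>,
        extra_output_dir: Option<PathBuf>,
    ) -> Self {
        Self {
            exe,
            setup_dir,
            extra_setup_dir,
            extra_output_dir,
        }
    }
}

fn main() {
    let setup_dir = PathBuf::from("setup");
    // A caller that keeps using its config clones explicitly, as the task code now does.
    let fuzzer = Fuzzer::new(
        PathBuf::from("fuzz.exe"),
        setup_dir.clone(),
        Some(PathBuf::from("extra_setup")),
        None,
    );
    println!(
        "{} {} {:?} {:?}",
        fuzzer.exe.display(),
        fuzzer.setup_dir.display(),
        fuzzer.extra_setup_dir,
        fuzzer.extra_output_dir,
    );
}
```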

@ -16,7 +16,8 @@ use reqwest::{StatusCode, Url};
use reqwest_retry::{RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS}; use reqwest_retry::{RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{env::current_dir, path::PathBuf, str, time::Duration}; use std::{env::current_dir, path::PathBuf, str, time::Duration};
use tokio::fs; use tokio::{fs, select};
use tokio_util::sync::CancellationToken;
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub enum SyncOperation { pub enum SyncOperation {
@ -163,6 +164,7 @@ impl SyncedDir {
&self, &self,
operation: SyncOperation, operation: SyncOperation,
delay_seconds: Option<u64>, delay_seconds: Option<u64>,
cancellation_token: &CancellationToken,
) -> Result<()> { ) -> Result<()> {
let delay_seconds = delay_seconds.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS); let delay_seconds = delay_seconds.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS);
if delay_seconds == 0 { if delay_seconds == 0 {
@ -172,8 +174,17 @@ impl SyncedDir {
loop { loop {
self.sync(operation, false).await?; self.sync(operation, false).await?;
delay_with_jitter(delay).await; select! {
_ = cancellation_token.cancelled() => {
break;
}
_ = delay_with_jitter(delay) => {
continue;
}
}
} }
Ok(())
} }
// Conditionally upload a report, if it would not be a duplicate. // Conditionally upload a report, if it would not be a duplicate.

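The reworked `continuous_sync` loop exits cleanly once the task is cancelled rather than looping forever. A self-contained sketch of the same `select!` pattern (fixed delay instead of the jittered delay used above; requires the `tokio` and `tokio-util` crates):

```rust
use std::time::Duration;
use tokio::{select, time::sleep};
use tokio_util::sync::CancellationToken;

// Run a sync pass, then wait; stop as soon as the token is cancelled.
async fn continuous_sync(token: CancellationToken) {
    loop {
        println!("sync pass");
        select! {
            _ = token.cancelled() => break,
            _ = sleep(Duration::from_secs(1)) => continue,
        }
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let worker = tokio::spawn(continuous_sync(token.child_token()));

    sleep(Duration::from_millis(2500)).await;
    token.cancel(); // the worker observes this and breaks out of its loop
    worker.await.unwrap();
}
```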

@ -53,7 +53,7 @@ class AFL(Command):
notification_config: Optional[NotificationConfig] = None, notification_config: Optional[NotificationConfig] = None,
debug: Optional[List[TaskDebugFlag]] = None, debug: Optional[List[TaskDebugFlag]] = None,
ensemble_sync_delay: Optional[int] = None, ensemble_sync_delay: Optional[int] = None,
extra_container: Optional[Container] = None, extra_setup_container: Optional[Container] = None,
) -> Optional[Job]: ) -> Optional[Job]:
""" """
Basic AFL job Basic AFL job
@ -135,9 +135,12 @@ class AFL(Command):
(ContainerType.inputs, helper.containers[ContainerType.inputs]), (ContainerType.inputs, helper.containers[ContainerType.inputs]),
] ]
if extra_container is not None: if extra_setup_container is not None:
containers.append( containers.append(
(ContainerType.extra, helper.containers[ContainerType.extra]) (
ContainerType.extra_setup,
helper.containers[ContainerType.extra_setup],
)
) )
self.logger.info("creating afl fuzz task") self.logger.info("creating afl fuzz task")
@ -173,9 +176,12 @@ class AFL(Command):
), ),
] ]
if extra_container is not None: if extra_setup_container is not None:
report_containers.append( report_containers.append(
(ContainerType.extra, helper.containers[ContainerType.extra]) (
ContainerType.extra_setup,
helper.containers[ContainerType.extra_setup],
)
) )
self.logger.info("creating generic_crash_report task") self.logger.info("creating generic_crash_report task")


@ -6,7 +6,7 @@
import os import os
import tempfile import tempfile
from enum import Enum from enum import Enum
from typing import Dict, List, Optional from typing import Dict, List, Optional, Tuple
from onefuzztypes.enums import OS, ContainerType, TaskDebugFlag, TaskType from onefuzztypes.enums import OS, ContainerType, TaskDebugFlag, TaskType
from onefuzztypes.models import Job, NotificationConfig from onefuzztypes.models import Job, NotificationConfig
@ -43,6 +43,14 @@ class Libfuzzer(Command):
if LIBFUZZER_MAGIC_STRING not in data: if LIBFUZZER_MAGIC_STRING not in data:
raise Exception("not a libfuzzer binary: %s" % target_exe) raise Exception("not a libfuzzer binary: %s" % target_exe)
def _add_optional_containers(
self,
dest: List[Tuple[ContainerType, Container]],
source: Dict[ContainerType, Container],
types: List[ContainerType],
) -> None:
dest.extend([(c, source[c]) for c in types if c in source])
def _create_tasks( def _create_tasks(
self, self,
*, *,
@ -86,10 +94,11 @@ class Libfuzzer(Command):
), ),
] ]
if ContainerType.extra in containers: self._add_optional_containers(
regression_containers.append( regression_containers,
(ContainerType.extra, containers[ContainerType.extra]) containers,
) [ContainerType.extra_setup, ContainerType.extra_output],
)
# We don't really need a separate timeout for crash reporting, and we could just # We don't really need a separate timeout for crash reporting, and we could just
# use `target_timeout`. But `crash_report_timeout` was introduced first, so we # use `target_timeout`. But `crash_report_timeout` was introduced first, so we
@ -124,18 +133,15 @@ class Libfuzzer(Command):
(ContainerType.inputs, containers[ContainerType.inputs]), (ContainerType.inputs, containers[ContainerType.inputs]),
] ]
if ContainerType.readonly_inputs in containers: self._add_optional_containers(
fuzzer_containers.append( fuzzer_containers,
( containers,
ContainerType.readonly_inputs, [
containers[ContainerType.readonly_inputs], ContainerType.extra_setup,
) ContainerType.extra_output,
) ContainerType.readonly_inputs,
],
if ContainerType.extra in containers: )
fuzzer_containers.append(
(ContainerType.extra, containers[ContainerType.extra])
)
self.logger.info("creating libfuzzer task") self.logger.info("creating libfuzzer task")
@ -180,18 +186,15 @@ class Libfuzzer(Command):
(ContainerType.readonly_inputs, containers[ContainerType.inputs]), (ContainerType.readonly_inputs, containers[ContainerType.inputs]),
] ]
if ContainerType.extra in containers: self._add_optional_containers(
coverage_containers.append( coverage_containers,
(ContainerType.extra, containers[ContainerType.extra]) containers,
) [
ContainerType.extra_setup,
if ContainerType.readonly_inputs in containers: ContainerType.extra_output,
coverage_containers.append( ContainerType.readonly_inputs,
( ],
ContainerType.readonly_inputs, )
containers[ContainerType.readonly_inputs],
)
)
self.logger.info("creating coverage task") self.logger.info("creating coverage task")
@ -245,10 +248,11 @@ class Libfuzzer(Command):
(ContainerType.no_repro, containers[ContainerType.no_repro]), (ContainerType.no_repro, containers[ContainerType.no_repro]),
] ]
if ContainerType.extra in containers: self._add_optional_containers(
report_containers.append( report_containers,
(ContainerType.extra, containers[ContainerType.extra]) containers,
) [ContainerType.extra_setup, ContainerType.extra_output],
)
self.logger.info("creating libfuzzer_crash_report task") self.logger.info("creating libfuzzer_crash_report task")
self.onefuzz.tasks.create( self.onefuzz.tasks.create(
@ -288,10 +292,11 @@ class Libfuzzer(Command):
(ContainerType.crashes, containers[ContainerType.crashes]), (ContainerType.crashes, containers[ContainerType.crashes]),
] ]
if ContainerType.extra in containers: self._add_optional_containers(
analysis_containers.append( analysis_containers,
(ContainerType.extra, containers[ContainerType.extra]) containers,
) [ContainerType.extra_setup, ContainerType.extra_output],
)
self.onefuzz.tasks.create( self.onefuzz.tasks.create(
job.job_id, job.job_id,
@ -357,7 +362,8 @@ class Libfuzzer(Command):
analyzer_options: Optional[List[str]] = None, analyzer_options: Optional[List[str]] = None,
analyzer_env: Optional[Dict[str, str]] = None, analyzer_env: Optional[Dict[str, str]] = None,
tools: Optional[Container] = None, tools: Optional[Container] = None,
extra_container: Optional[Container] = None, extra_setup_container: Optional[Container] = None,
extra_output_container: Optional[Container] = None,
crashes: Optional[Container] = None, crashes: Optional[Container] = None,
) -> Optional[Job]: ) -> Optional[Job]:
""" """
@ -456,8 +462,11 @@ class Libfuzzer(Command):
containers = helper.containers containers = helper.containers
if extra_container is not None: if extra_setup_container is not None:
containers[ContainerType.extra] = extra_container containers[ContainerType.extra_setup] = extra_setup_container
if extra_output_container is not None:
containers[ContainerType.extra_output] = extra_output_container
self._create_tasks( self._create_tasks(
job=helper.job, job=helper.job,
@ -521,7 +530,7 @@ class Libfuzzer(Command):
preserve_existing_outputs: bool = False, preserve_existing_outputs: bool = False,
check_fuzzer_help: bool = False, check_fuzzer_help: bool = False,
no_check_fuzzer_help: bool = False, no_check_fuzzer_help: bool = False,
extra_container: Optional[Container] = None, extra_setup_container: Optional[Container] = None,
) -> Optional[Job]: ) -> Optional[Job]:
""" """
libfuzzer merge task libfuzzer merge task
@ -590,8 +599,8 @@ class Libfuzzer(Command):
), ),
] ]
if extra_container is not None: if extra_setup_container is not None:
merge_containers.append((ContainerType.extra, extra_container)) merge_containers.append((ContainerType.extra_setup, extra_setup_container))
if inputs: if inputs:
merge_containers.append( merge_containers.append(
@ -657,7 +666,7 @@ class Libfuzzer(Command):
colocate_secondary_tasks: bool = True, colocate_secondary_tasks: bool = True,
expect_crash_on_failure: bool = False, expect_crash_on_failure: bool = False,
notification_config: Optional[NotificationConfig] = None, notification_config: Optional[NotificationConfig] = None,
extra_container: Optional[Container] = None, extra_setup_container: Optional[Container] = None,
crashes: Optional[Container] = None, crashes: Optional[Container] = None,
) -> Optional[Job]: ) -> Optional[Job]:
pool = self.onefuzz.pools.get(pool_name) pool = self.onefuzz.pools.get(pool_name)
@ -740,8 +749,8 @@ class Libfuzzer(Command):
(ContainerType.tools, fuzzer_tools_container), (ContainerType.tools, fuzzer_tools_container),
] ]
if extra_container is not None: if extra_setup_container is not None:
fuzzer_containers.append((ContainerType.extra, extra_container)) fuzzer_containers.append((ContainerType.extra_setup, extra_setup_container))
helper.create_containers() helper.create_containers()
helper.setup_notifications(notification_config) helper.setup_notifications(notification_config)
@ -798,8 +807,10 @@ class Libfuzzer(Command):
(ContainerType.tools, fuzzer_tools_container), (ContainerType.tools, fuzzer_tools_container),
] ]
if extra_container is not None: if extra_setup_container is not None:
coverage_containers.append((ContainerType.extra, extra_container)) coverage_containers.append(
(ContainerType.extra_setup, extra_setup_container)
)
self.logger.info("creating `dotnet_coverage` task") self.logger.info("creating `dotnet_coverage` task")
self.onefuzz.tasks.create( self.onefuzz.tasks.create(
@ -829,8 +840,8 @@ class Libfuzzer(Command):
(ContainerType.tools, fuzzer_tools_container), (ContainerType.tools, fuzzer_tools_container),
] ]
if extra_container is not None: if extra_setup_container is not None:
report_containers.append((ContainerType.extra, extra_container)) report_containers.append((ContainerType.extra_setup, extra_setup_container))
self.logger.info("creating `dotnet_crash_report` task") self.logger.info("creating `dotnet_crash_report` task")
self.onefuzz.tasks.create( self.onefuzz.tasks.create(
@ -886,7 +897,7 @@ class Libfuzzer(Command):
check_retry_count: Optional[int] = 300, check_retry_count: Optional[int] = 300,
check_fuzzer_help: bool = False, check_fuzzer_help: bool = False,
no_check_fuzzer_help: bool = False, no_check_fuzzer_help: bool = False,
extra_container: Optional[Container] = None, extra_setup_container: Optional[Container] = None,
crashes: Optional[Container] = None, crashes: Optional[Container] = None,
readonly_inputs: Optional[Container] = None, readonly_inputs: Optional[Container] = None,
) -> Optional[Job]: ) -> Optional[Job]:
@ -957,8 +968,8 @@ class Libfuzzer(Command):
(ContainerType.inputs, helper.containers[ContainerType.inputs]), (ContainerType.inputs, helper.containers[ContainerType.inputs]),
] ]
if extra_container is not None: if extra_setup_container is not None:
fuzzer_containers.append((ContainerType.extra, extra_container)) fuzzer_containers.append((ContainerType.extra_setup, extra_setup_container))
if readonly_inputs is not None: if readonly_inputs is not None:
self.onefuzz.containers.get(readonly_inputs) # ensure it exists self.onefuzz.containers.get(readonly_inputs) # ensure it exists
@ -1057,8 +1068,8 @@ class Libfuzzer(Command):
(ContainerType.no_repro, helper.containers[ContainerType.no_repro]), (ContainerType.no_repro, helper.containers[ContainerType.no_repro]),
] ]
if extra_container is not None: if extra_setup_container is not None:
report_containers.append((ContainerType.extra, extra_container)) report_containers.append((ContainerType.extra_setup, extra_setup_container))
self.logger.info("creating libfuzzer_crash_report task") self.logger.info("creating libfuzzer_crash_report task")
self.onefuzz.tasks.create( self.onefuzz.tasks.create(

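The repeated `if ContainerType.extra in containers:` blocks above are folded into the `_add_optional_containers` helper, which copies only the container types actually present. A Rust analogue of that helper, for illustration (plain strings stand in for the real container types):

```rust
use std::collections::HashMap;

// Rust analogue of the new `_add_optional_containers` helper: copy only the
// container types that are actually configured in `source` into `dest`.
fn add_optional_containers(
    dest: &mut Vec<(String, String)>,
    source: &HashMap<String, String>,
    types: &[&str],
) {
    for ty in types {
        if let Some(name) = source.get(*ty) {
            dest.push((ty.to_string(), name.clone()));
        }
    }
}

fn main() {
    let mut source = HashMap::new();
    source.insert("extra_setup".to_string(), "my-extra-setup".to_string());

    let mut containers = Vec::new();
    add_optional_containers(&mut containers, &source, &["extra_setup", "extra_output"]);
    assert_eq!(containers.len(), 1); // extra_output wasn't configured, so it's skipped
}
```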

@ -119,7 +119,7 @@ class OssFuzz(Command):
notification_config: Optional[NotificationConfig] = None, notification_config: Optional[NotificationConfig] = None,
debug: Optional[List[TaskDebugFlag]] = None, debug: Optional[List[TaskDebugFlag]] = None,
ensemble_sync_delay: Optional[int] = None, ensemble_sync_delay: Optional[int] = None,
extra_container: Optional[Container] = None, extra_setup_container: Optional[Container] = None,
) -> None: ) -> None:
""" """
OssFuzz style libfuzzer jobs OssFuzz style libfuzzer jobs
@ -214,8 +214,8 @@ class OssFuzz(Command):
ContainerType.coverage, ContainerType.coverage,
) )
if extra_container is not None: if extra_setup_container is not None:
helper.containers[ContainerType.extra] = extra_container helper.containers[ContainerType.extra_setup] = extra_setup_container
helper.create_containers() helper.create_containers()
helper.setup_notifications(notification_config) helper.setup_notifications(notification_config)


@ -50,7 +50,7 @@ class Radamsa(Command):
debug: Optional[List[TaskDebugFlag]] = None, debug: Optional[List[TaskDebugFlag]] = None,
ensemble_sync_delay: Optional[int] = None, ensemble_sync_delay: Optional[int] = None,
target_timeout: Optional[int] = None, target_timeout: Optional[int] = None,
extra_container: Optional[Container] = None, extra_setup_container: Optional[Container] = None,
) -> Optional[Job]: ) -> Optional[Job]:
""" """
Basic radamsa job Basic radamsa job
@ -157,8 +157,8 @@ class Radamsa(Command):
), ),
] ]
if extra_container is not None: if extra_setup_container is not None:
containers.append((ContainerType.extra, extra_container)) containers.append((ContainerType.extra_setup, extra_setup_container))
fuzzer_task = self.onefuzz.tasks.create( fuzzer_task = self.onefuzz.tasks.create(
helper.job.job_id, helper.job.job_id,
@ -193,8 +193,8 @@ class Radamsa(Command):
(ContainerType.no_repro, helper.containers[ContainerType.no_repro]), (ContainerType.no_repro, helper.containers[ContainerType.no_repro]),
] ]
if extra_container is not None: if extra_setup_container is not None:
report_containers.append((ContainerType.extra, extra_container)) report_containers.append((ContainerType.extra_setup, extra_setup_container))
self.logger.info("creating generic_crash_report task") self.logger.info("creating generic_crash_report task")
self.onefuzz.tasks.create( self.onefuzz.tasks.create(
@ -239,8 +239,10 @@ class Radamsa(Command):
(ContainerType.crashes, helper.containers[ContainerType.crashes]), (ContainerType.crashes, helper.containers[ContainerType.crashes]),
] ]
if extra_container is not None: if extra_setup_container is not None:
analysis_containers.append((ContainerType.extra, extra_container)) analysis_containers.append(
(ContainerType.extra_setup, extra_setup_container)
)
self.onefuzz.tasks.create( self.onefuzz.tasks.create(
helper.job.job_id, helper.job.job_id,


@ -56,7 +56,7 @@ class Regression(Command):
check_fuzzer_help: bool = True, check_fuzzer_help: bool = True,
delete_input_container: bool = True, delete_input_container: bool = True,
check_regressions: bool = False, check_regressions: bool = False,
extra_container: Optional[Container] = None, extra_setup_container: Optional[Container] = None,
) -> None: ) -> None:
""" """
generic regression task generic regression task
@ -90,7 +90,7 @@ class Regression(Command):
check_fuzzer_help=check_fuzzer_help, check_fuzzer_help=check_fuzzer_help,
delete_input_container=delete_input_container, delete_input_container=delete_input_container,
check_regressions=check_regressions, check_regressions=check_regressions,
extra_container=extra_container, extra_setup_container=extra_setup_container,
) )
def libfuzzer( def libfuzzer(
@ -117,7 +117,7 @@ class Regression(Command):
check_fuzzer_help: bool = True, check_fuzzer_help: bool = True,
delete_input_container: bool = True, delete_input_container: bool = True,
check_regressions: bool = False, check_regressions: bool = False,
extra_container: Optional[Container] = None, extra_setup_container: Optional[Container] = None,
) -> None: ) -> None:
""" """
libfuzzer regression task libfuzzer regression task
@ -151,7 +151,7 @@ class Regression(Command):
check_fuzzer_help=check_fuzzer_help, check_fuzzer_help=check_fuzzer_help,
delete_input_container=delete_input_container, delete_input_container=delete_input_container,
check_regressions=check_regressions, check_regressions=check_regressions,
extra_container=extra_container, extra_setup_container=extra_setup_container,
) )
def _create_job( def _create_job(
@ -179,7 +179,7 @@ class Regression(Command):
check_fuzzer_help: bool = True, check_fuzzer_help: bool = True,
delete_input_container: bool = True, delete_input_container: bool = True,
check_regressions: bool = False, check_regressions: bool = False,
extra_container: Optional[Container] = None, extra_setup_container: Optional[Container] = None,
) -> None: ) -> None:
if dryrun: if dryrun:
return None return None
@ -221,8 +221,8 @@ class Regression(Command):
), ),
] ]
if extra_container: if extra_setup_container:
containers.append((ContainerType.extra, extra_container)) containers.append((ContainerType.extra_setup, extra_setup_container))
if crashes: if crashes:
helper.containers[ helper.containers[


@ -106,10 +106,14 @@ TARGETS: Dict[str, Integration] = {
ContainerType.unique_reports: 1, ContainerType.unique_reports: 1,
ContainerType.coverage: 1, ContainerType.coverage: 1,
ContainerType.inputs: 2, ContainerType.inputs: 2,
ContainerType.extra_output: 1,
}, },
reboot_after_setup=True, reboot_after_setup=True,
inject_fake_regression=True, inject_fake_regression=True,
fuzzing_target_options=["--test:{extra_dir}"], fuzzing_target_options=[
"--test:{extra_setup_dir}",
"--write_test_file={extra_output_dir}/test.txt",
],
), ),
"linux-libfuzzer-with-options": Integration( "linux-libfuzzer-with-options": Integration(
template=TemplateType.libfuzzer, template=TemplateType.libfuzzer,
@ -181,7 +185,7 @@ TARGETS: Dict[str, Integration] = {
os=OS.linux, os=OS.linux,
target_exe="fuzz_target_1", target_exe="fuzz_target_1",
wait_for_files={ContainerType.unique_reports: 1, ContainerType.coverage: 1}, wait_for_files={ContainerType.unique_reports: 1, ContainerType.coverage: 1},
fuzzing_target_options=["--test:{extra_dir}"], fuzzing_target_options=["--test:{extra_setup_dir}"],
), ),
"linux-trivial-crash": Integration( "linux-trivial-crash": Integration(
template=TemplateType.radamsa, template=TemplateType.radamsa,
@ -209,9 +213,13 @@ TARGETS: Dict[str, Integration] = {
ContainerType.inputs: 2, ContainerType.inputs: 2,
ContainerType.unique_reports: 1, ContainerType.unique_reports: 1,
ContainerType.coverage: 1, ContainerType.coverage: 1,
ContainerType.extra_output: 1,
}, },
inject_fake_regression=True, inject_fake_regression=True,
fuzzing_target_options=["--test:{extra_dir}"], fuzzing_target_options=[
"--test:{extra_setup_dir}",
"--write_test_file={extra_output_dir}/test.txt",
],
), ),
"windows-libfuzzer-linked-library": Integration( "windows-libfuzzer-linked-library": Integration(
template=TemplateType.libfuzzer, template=TemplateType.libfuzzer,
@ -538,7 +546,7 @@ class TestOnefuzz:
) -> List[UUID]: ) -> List[UUID]:
"""Launch all of the fuzzing templates""" """Launch all of the fuzzing templates"""
pools = {} pools: Dict[OS, Pool] = {}
if unmanaged_pool is not None: if unmanaged_pool is not None:
pools[unmanaged_pool.the_os] = self.of.pools.get(unmanaged_pool.pool_name) pools[unmanaged_pool.the_os] = self.of.pools.get(unmanaged_pool.pool_name)
else: else:
@ -559,12 +567,14 @@ class TestOnefuzz:
self.logger.info("launching: %s", target) self.logger.info("launching: %s", target)
setup: Directory | str | None
if config.setup_dir is None: if config.setup_dir is None:
setup = ( if config.use_setup:
Directory(os.path.join(path, target)) if config.use_setup else None setup = Directory(os.path.join(path, target))
) else:
setup = None
else: else:
setup = config.setup_dir setup = Directory(config.setup_dir)
target_exe = File(os.path.join(path, target, config.target_exe)) target_exe = File(os.path.join(path, target, config.target_exe))
inputs = ( inputs = (
@ -577,87 +587,9 @@ class TestOnefuzz:
setup = Directory(os.path.join(setup, config.nested_setup_dir)) setup = Directory(os.path.join(setup, config.nested_setup_dir))
job: Optional[Job] = None job: Optional[Job] = None
if config.template == TemplateType.libfuzzer: job = self.build_job(
# building the extra container to test this variable substitution duration, pools, target, config, setup, target_exe, inputs
extra = self.of.containers.create("extra") )
job = self.of.template.libfuzzer.basic(
self.project,
target,
BUILD,
pools[config.os].name,
target_exe=target_exe,
inputs=inputs,
setup_dir=setup,
duration=duration,
vm_count=1,
reboot_after_setup=config.reboot_after_setup or False,
target_options=config.target_options,
fuzzing_target_options=config.fuzzing_target_options,
extra_container=Container(extra.name),
)
elif config.template == TemplateType.libfuzzer_dotnet:
if setup is None:
raise Exception("setup required for libfuzzer_dotnet")
if config.target_class is None:
raise Exception("target_class required for libfuzzer_dotnet")
if config.target_method is None:
raise Exception("target_method required for libfuzzer_dotnet")
job = self.of.template.libfuzzer.dotnet(
self.project,
target,
BUILD,
pools[config.os].name,
target_dll=config.target_exe,
inputs=inputs,
setup_dir=setup,
duration=duration,
vm_count=1,
fuzzing_target_options=config.target_options,
target_class=config.target_class,
target_method=config.target_method,
)
elif config.template == TemplateType.libfuzzer_qemu_user:
job = self.of.template.libfuzzer.qemu_user(
self.project,
target,
BUILD,
pools[config.os].name,
inputs=inputs,
target_exe=target_exe,
duration=duration,
vm_count=1,
target_options=config.target_options,
)
elif config.template == TemplateType.radamsa:
job = self.of.template.radamsa.basic(
self.project,
target,
BUILD,
pool_name=pools[config.os].name,
target_exe=target_exe,
inputs=inputs,
setup_dir=setup,
check_asan_log=config.check_asan_log or False,
disable_check_debugger=config.disable_check_debugger or False,
duration=duration,
vm_count=1,
)
elif config.template == TemplateType.afl:
job = self.of.template.afl.basic(
self.project,
target,
BUILD,
pool_name=pools[config.os].name,
target_exe=target_exe,
inputs=inputs,
setup_dir=setup,
duration=duration,
vm_count=1,
target_options=config.target_options,
)
else:
raise NotImplementedError
if config.inject_fake_regression and job is not None: if config.inject_fake_regression and job is not None:
self.of.debug.notification.job(job.job_id) self.of.debug.notification.job(job.job_id)
@ -669,6 +601,101 @@ class TestOnefuzz:
return job_ids return job_ids
def build_job(
self,
duration: int,
pools: Dict[OS, Pool],
target: str,
config: Integration,
setup: Optional[Directory],
target_exe: File,
inputs: Optional[Directory],
) -> Optional[Job]:
if config.template == TemplateType.libfuzzer:
# building the extra_setup & extra_output containers to test variable substitution
# and upload of files (in the case of extra_output)
extra_setup_container = self.of.containers.create("extra-setup")
extra_output_container = self.of.containers.create("extra-output")
return self.of.template.libfuzzer.basic(
self.project,
target,
BUILD,
pools[config.os].name,
target_exe=target_exe,
inputs=inputs,
setup_dir=setup,
duration=duration,
vm_count=1,
reboot_after_setup=config.reboot_after_setup or False,
target_options=config.target_options,
fuzzing_target_options=config.fuzzing_target_options,
extra_setup_container=Container(extra_setup_container.name),
extra_output_container=Container(extra_output_container.name),
)
elif config.template == TemplateType.libfuzzer_dotnet:
if setup is None:
raise Exception("setup required for libfuzzer_dotnet")
if config.target_class is None:
raise Exception("target_class required for libfuzzer_dotnet")
if config.target_method is None:
raise Exception("target_method required for libfuzzer_dotnet")
return self.of.template.libfuzzer.dotnet(
self.project,
target,
BUILD,
pools[config.os].name,
target_dll=File(config.target_exe),
inputs=inputs,
setup_dir=setup,
duration=duration,
vm_count=1,
fuzzing_target_options=config.target_options,
target_class=config.target_class,
target_method=config.target_method,
)
elif config.template == TemplateType.libfuzzer_qemu_user:
return self.of.template.libfuzzer.qemu_user(
self.project,
target,
BUILD,
pools[config.os].name,
inputs=inputs,
target_exe=target_exe,
duration=duration,
vm_count=1,
target_options=config.target_options,
)
elif config.template == TemplateType.radamsa:
return self.of.template.radamsa.basic(
self.project,
target,
BUILD,
pool_name=pools[config.os].name,
target_exe=target_exe,
inputs=inputs,
setup_dir=setup,
check_asan_log=config.check_asan_log or False,
disable_check_debugger=config.disable_check_debugger or False,
duration=duration,
vm_count=1,
)
elif config.template == TemplateType.afl:
return self.of.template.afl.basic(
self.project,
target,
BUILD,
pool_name=pools[config.os].name,
target_exe=target_exe,
inputs=inputs,
setup_dir=setup,
duration=duration,
vm_count=1,
target_options=config.target_options,
)
else:
raise NotImplementedError
def check_task( def check_task(
self, job: Job, task: Task, scalesets: List[Scaleset] self, job: Job, task: Task, scalesets: List[Scaleset]
) -> TaskTestState: ) -> TaskTestState:
@ -736,15 +763,18 @@ class TestOnefuzz:
job_tasks[job.job_id] = tasks job_tasks[job.job_id] = tasks
check_containers[job.job_id] = {} check_containers[job.job_id] = {}
for task in tasks: for task in tasks:
for container in task.config.containers: if task.config.containers:
if container.type in TARGETS[job.config.name].wait_for_files: for container in task.config.containers:
count = TARGETS[job.config.name].wait_for_files[container.type] if container.type in TARGETS[job.config.name].wait_for_files:
check_containers[job.job_id][container.name] = ( count = TARGETS[job.config.name].wait_for_files[
ContainerWrapper( container.type
self.of.containers.get(container.name).sas_url ]
), check_containers[job.job_id][container.name] = (
count, ContainerWrapper(
) self.of.containers.get(container.name).sas_url
),
count,
)
self.success = True self.success = True
self.logger.info("checking %d jobs", len(jobs)) self.logger.info("checking %d jobs", len(jobs))
@ -861,16 +891,17 @@ class TestOnefuzz:
def get_job_crash_report(self, job_id: UUID) -> Optional[Tuple[Container, str]]: def get_job_crash_report(self, job_id: UUID) -> Optional[Tuple[Container, str]]:
for task in self.of.tasks.list(job_id=job_id, state=None): for task in self.of.tasks.list(job_id=job_id, state=None):
for container in task.config.containers: if task.config.containers:
if container.type not in [ for container in task.config.containers:
ContainerType.unique_reports, if container.type not in [
ContainerType.reports, ContainerType.unique_reports,
]: ContainerType.reports,
continue ]:
continue
files = self.of.containers.files.list(container.name) files = self.of.containers.files.list(container.name)
if len(files.files) > 0: if len(files.files) > 0:
return (container.name, files.files[0]) return (container.name, files.files[0])
return None return None
def launch_repro( def launch_repro(
@ -1044,12 +1075,13 @@ class TestOnefuzz:
container_names = set() container_names = set()
for job in jobs: for job in jobs:
for task in self.of.tasks.list(job_id=job.job_id, state=None): for task in self.of.tasks.list(job_id=job.job_id, state=None):
for container in task.config.containers: if task.config.containers:
if container.type in [ for container in task.config.containers:
ContainerType.reports, if container.type in [
ContainerType.unique_reports, ContainerType.reports,
]: ContainerType.unique_reports,
container_names.add(container.name) ]:
container_names.add(container.name)
for repro in self.of.repro.list(): for repro in self.of.repro.list():
if repro.config.container in container_names: if repro.config.container in container_names:


@ -3,7 +3,37 @@
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <string.h>
// allow an argument --write_test_file=xxx.txt to be set
// which is useful for exercising some OneFuzz features in integration tests
int LLVMFuzzerInitialize(int *argc, char ***argv) {
const int num_args = *argc;
char** args = *argv;
for (int i = 0; i < num_args; ++i) {
// look for argument starting with --write_test_file=
const char* arg_name = "--write_test_file=";
if (strncmp(args[i], arg_name, strlen(arg_name)) == 0) {
// extract filename part
const char* file_name = args[i] + strlen(arg_name);
// write file
FILE* output = fopen(file_name, "a");
if (!output) {
perror("failed to open file");
return -1;
}
fputs("Hello from simple fuzzer\n", output);
fclose(output);
break;
}
}
return 0;
}
int LLVMFuzzerTestOneInput(const uint8_t *data, size_t len) { int LLVMFuzzerTestOneInput(const uint8_t *data, size_t len) {
int cnt = 0; int cnt = 0;

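For reference, a hedged Rust rendering of the same argument scan the new `LLVMFuzzerInitialize` performs; the `--write_test_file=` flag and the appended text come from the diff, everything else is illustrative:

```rust
use std::fs::OpenOptions;
use std::io::Write;

// Mirror of the fuzzer's new init logic: find `--write_test_file=<path>`
// among the arguments and append a marker line to that file.
fn handle_write_test_file(args: &[String]) -> std::io::Result<()> {
    const FLAG: &str = "--write_test_file=";
    for arg in args {
        if let Some(file_name) = arg.strip_prefix(FLAG) {
            // fopen(file_name, "a") in the C version: create if missing, append.
            let mut output = OpenOptions::new()
                .append(true)
                .create(true)
                .open(file_name)?;
            writeln!(output, "Hello from simple fuzzer")?;
            break;
        }
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let args: Vec<String> = std::env::args().collect();
    handle_write_test_file(&args)
}
```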

@ -228,7 +228,8 @@ class ContainerType(Enum):
unique_reports = "unique_reports" unique_reports = "unique_reports"
regression_reports = "regression_reports" regression_reports = "regression_reports"
logs = "logs" logs = "logs"
extra = "extra" extra_setup = "extra_setup"
extra_output = "extra_output"
@classmethod @classmethod
def reset_defaults(cls) -> List["ContainerType"]: def reset_defaults(cls) -> List["ContainerType"]:


@ -352,69 +352,6 @@ class AgentConfig(BaseModel):
managed: Optional[bool] = Field(default=True) managed: Optional[bool] = Field(default=True)
class TaskUnitConfig(BaseModel):
instance_id: UUID
logs: Optional[str]
job_id: UUID
task_id: UUID
task_type: TaskType
instance_telemetry_key: Optional[str]
microsoft_telemetry_key: Optional[str]
heartbeat_queue: str
# command_queue: str
input_queue: Optional[str]
supervisor_exe: Optional[str]
supervisor_env: Optional[Dict[str, str]]
supervisor_options: Optional[List[str]]
supervisor_input_marker: Optional[str]
target_exe: Optional[str]
target_env: Optional[Dict[str, str]]
target_options: Optional[List[str]]
target_timeout: Optional[int]
target_options_merge: Optional[bool]
target_workers: Optional[int]
check_asan_log: Optional[bool]
check_debugger: Optional[bool]
check_retry_count: Optional[int]
check_fuzzer_help: Optional[bool]
expect_crash_on_failure: Optional[bool]
rename_output: Optional[bool]
generator_exe: Optional[str]
generator_env: Optional[Dict[str, str]]
generator_options: Optional[List[str]]
wait_for_files: Optional[str]
analyzer_exe: Optional[str]
analyzer_env: Optional[Dict[str, str]]
analyzer_options: Optional[List[str]]
stats_file: Optional[str]
stats_format: Optional[StatsFormat]
ensemble_sync_delay: Optional[int]
report_list: Optional[List[str]]
minimized_stack_depth: Optional[int]
coverage_filter: Optional[str]
function_allowlist: Optional[str]
module_allowlist: Optional[str]
source_allowlist: Optional[str]
target_assembly: Optional[str]
target_class: Optional[str]
target_method: Optional[str]
# from here forwards are Container definitions. These need to be inline
# with TaskDefinitions and ContainerTypes
analysis: CONTAINER_DEF
coverage: CONTAINER_DEF
crashes: CONTAINER_DEF
inputs: CONTAINER_DEF
no_repro: CONTAINER_DEF
readonly_inputs: CONTAINER_DEF
reports: CONTAINER_DEF
tools: CONTAINER_DEF
unique_inputs: CONTAINER_DEF
unique_reports: CONTAINER_DEF
regression_reports: CONTAINER_DEF
extra: CONTAINER_DEF
class Forward(BaseModel): class Forward(BaseModel):
src_port: int src_port: int
dst_ip: str dst_ip: str
@ -443,22 +380,6 @@ class Files(BaseModel):
files: List[str] files: List[str]
class WorkUnit(BaseModel):
job_id: UUID
task_id: UUID
task_type: TaskType
# JSON-serialized `TaskUnitConfig`.
config: str
class WorkSet(BaseModel):
reboot: bool
setup_url: str
script: bool
work_units: List[WorkUnit]
class WorkUnitSummary(BaseModel): class WorkUnitSummary(BaseModel):
job_id: UUID job_id: UUID
task_id: UUID task_id: UUID


@ -1,23 +0,0 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import unittest
from onefuzztypes.enums import ContainerType
from onefuzztypes.models import TaskUnitConfig
class TestModelsVerify(unittest.TestCase):
def test_container_type_setup(self) -> None:
for item in ContainerType:
# setup container is explicitly ignored for now
if item == ContainerType.setup:
continue
self.assertIn(item.name, TaskUnitConfig.__fields__)
if __name__ == "__main__":
unittest.main()