experimental "local fuzzing" support (#405)

This PR adds an experimental "local" mode for the agent, starting with `libfuzzer`.  For tasks that poll a queue, in local mode, they just monitor a directory for new files.

Supported commands: 
* libfuzzer-fuzz (models the `libfuzzer-fuzz` task)
* libfuzzer-coverage (models the `libfuzzer-coverage` task)
* libfuzzer-crash-report (models the `libfuzzer-crash-report` task)
* libfuzzer (models the `libfuzzer basic` job template, running the libfuzzer-fuzz and libfuzzer-crash-report tasks concurrently: any files that show up in `crashes_dir` are automatically turned into reports, and the coverage task — which exports coverage data for each file that shows up in `inputs_dir` — is optionally run as well).

Under the hood, there are a handful of changes required to the rest of the system to enable this feature.
1. `SyncedDir` URLs are now optional. In local mode, these no longer make sense. (We've discussed moving management of `SyncedDir`s to the Supervisor. This is tangential to that effort.)
2. `InputPoller` uses a `tempdir` rather than abusing `task_id` for temporary directory naming.
3. Moved the `agent` to only use a single tokio runtime, rather than one for each of the subcommands.
4. Sets the default log level to `info`.  (RUST_LOG can still be used as is).

Note, this removes the `onefuzz-agent debug` commands for the tasks that are now exposed via `onefuzz-agent local`, as these provide a more featureful version of the debug tasks.
This commit is contained in:
bmc-msft
2021-01-19 22:33:25 -05:00
committed by GitHub
parent af2ef9f4fa
commit fd956380d4
36 changed files with 1584 additions and 963 deletions

View File

@ -4,25 +4,21 @@
use anyhow::Result; use anyhow::Result;
use clap::{App, SubCommand}; use clap::{App, SubCommand};
pub fn run(args: &clap::ArgMatches) -> Result<()> { use crate::{debug::libfuzzer_merge, local::common::add_common_config};
const LIBFUZZER_MERGE: &str = "libfuzzer-merge";
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
match args.subcommand() { match args.subcommand() {
("generic-crash-report", Some(sub)) => crate::debug::generic_crash_report::run(sub)?, (LIBFUZZER_MERGE, Some(sub)) => libfuzzer_merge::run(sub).await,
("libfuzzer-coverage", Some(sub)) => crate::debug::libfuzzer_coverage::run(sub)?, _ => {
("libfuzzer-crash-report", Some(sub)) => crate::debug::libfuzzer_crash_report::run(sub)?, anyhow::bail!("missing subcommand\nUSAGE: {}", args.usage());
("libfuzzer-fuzz", Some(sub)) => crate::debug::libfuzzer_fuzz::run(sub)?, }
("libfuzzer-merge", Some(sub)) => crate::debug::libfuzzer_merge::run(sub)?,
_ => println!("missing subcommand\nUSAGE : {}", args.usage()),
} }
Ok(())
} }
pub fn args() -> App<'static, 'static> { pub fn args(name: &str) -> App<'static, 'static> {
SubCommand::with_name("debug") SubCommand::with_name(name)
.about("unsupported internal debugging commands") .about("unsupported internal debugging commands")
.subcommand(crate::debug::generic_crash_report::args()) .subcommand(add_common_config(libfuzzer_merge::args(LIBFUZZER_MERGE)))
.subcommand(crate::debug::libfuzzer_coverage::args())
.subcommand(crate::debug::libfuzzer_crash_report::args())
.subcommand(crate::debug::libfuzzer_fuzz::args())
.subcommand(crate::debug::libfuzzer_merge::args())
} }

View File

@ -1,130 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::{
config::CommonConfig,
report::generic::{Config, GenericReportProcessor},
utils::parse_key_value,
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::{blob::BlobContainerUrl, syncdir::SyncedDir};
use std::{
collections::HashMap,
path::{Path, PathBuf},
};
use tokio::runtime::Runtime;
use url::Url;
use uuid::Uuid;
/// Drive a single test input through the generic report processor and
/// pretty-print the resulting report to stdout.
async fn run_impl(input: String, config: Config) -> Result<()> {
    // Placeholder blob URL: local debug runs never upload, but the
    // processor API expects a container URL for the input.
    let test_url = Url::parse("https://contoso.com/sample-container/blob.txt")?;
    let heartbeat_client = config.common.init_heartbeat().await?;
    let processor = GenericReportProcessor::new(&config, heartbeat_client);
    let report = processor.test_input(test_url, Path::new(&input)).await?;
    println!("{:#?}", report);
    Ok(())
}
/// Entry point for the `generic-crash-report` debug subcommand.
///
/// Builds a local-only report `Config` from the CLI arguments — queues are
/// disabled and the sync URL / UUIDs are fixed placeholders since nothing is
/// uploaded — then runs the report processor once on the given input.
pub fn run(args: &clap::ArgMatches) -> Result<()> {
    let target_exe = value_t!(args, "target_exe", PathBuf)?;
    let setup_dir = value_t!(args, "setup_dir", PathBuf)?;
    let input = value_t!(args, "input", String)?;
    // Parsed leniently: any parse failure yields None (no timeout override).
    let target_timeout = value_t!(args, "target_timeout", u64).ok();
    let check_retry_count = value_t!(args, "check_retry_count", u64)?;
    let target_options = args.values_of_lossy("target_options").unwrap_or_default();
    let check_asan_log = args.is_present("check_asan_log");
    // Debugger-based checking is on by default; the flag disables it.
    let check_debugger = !args.is_present("disable_check_debugger");

    // Collect KEY=VALUE pairs for the target process environment.
    let mut target_env = HashMap::new();
    for opt in args.values_of_lossy("target_env").unwrap_or_default() {
        let (k, v) = parse_key_value(opt)?;
        target_env.insert(k, v);
    }

    let config = Config {
        target_exe,
        target_env,
        target_options,
        target_timeout,
        check_asan_log,
        check_debugger,
        check_retry_count,
        crashes: None,
        input_queue: None,
        no_repro: None,
        reports: None,
        // Placeholder container URL: local debug runs never sync to storage.
        unique_reports: SyncedDir {
            path: "unique_reports".into(),
            url: BlobContainerUrl::new(url::Url::parse("https://contoso.com/unique_reports")?)?,
        },
        common: CommonConfig {
            heartbeat_queue: None,
            instrumentation_key: None,
            telemetry_key: None,
            // Fixed placeholder IDs for local debug runs.
            job_id: Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
            task_id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
            instance_id: Uuid::parse_str("22222222-2222-2222-2222-222222222222").unwrap(),
            setup_dir,
        },
    };

    // Each debug subcommand spins up its own tokio runtime.
    let mut rt = Runtime::new()?;
    rt.block_on(async { run_impl(input, config).await })?;
    Ok(())
}
/// CLI definition for the local-only `generic-crash-report` debug subcommand.
pub fn args() -> App<'static, 'static> {
    // Positional arguments must keep this order: setup_dir, target_exe, input.
    let arguments = vec![
        Arg::with_name("setup_dir")
            .takes_value(true)
            .required(false),
        Arg::with_name("target_exe")
            .takes_value(true)
            .required(true),
        Arg::with_name("input").takes_value(true).required(true),
        Arg::with_name("disable_check_debugger")
            .takes_value(false)
            .long("disable_check_debugger"),
        Arg::with_name("check_asan_log")
            .takes_value(false)
            .long("check_asan_log"),
        Arg::with_name("check_retry_count")
            .takes_value(true)
            .long("check_retry_count")
            .default_value("0"),
        Arg::with_name("target_timeout")
            .takes_value(true)
            .long("target_timeout")
            .default_value("5"),
        Arg::with_name("target_env")
            .long("target_env")
            .takes_value(true)
            .multiple(true),
        Arg::with_name("target_options")
            .long("target_options")
            .takes_value(true)
            .multiple(true)
            .allow_hyphen_values(true)
            .default_value("{input}")
            .help("Supports hyphens. Recommendation: Set target_env first"),
    ];

    SubCommand::with_name("generic-crash-report")
        .about("execute a local-only generic crash report")
        .args(&arguments)
}

View File

@ -1,117 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::{
config::CommonConfig,
coverage::libfuzzer_coverage::{Config, CoverageProcessor},
utils::parse_key_value,
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::{blob::BlobContainerUrl, syncdir::SyncedDir};
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::runtime::Runtime;
use url::Url;
use uuid::Uuid;
/// Run the coverage processor once over a single input and print the
/// accumulated coverage summary.
async fn run_impl(input: String, config: Config) -> Result<()> {
    // Each step is re-wrapped with context so failures identify which
    // stage (setup, input processing, or summary) went wrong.
    let mut processor = CoverageProcessor::new(Arc::new(config))
        .await
        .map_err(|e| format_err!("coverage processor failed: {:?}", e))?;
    let input_path = Path::new(&input);
    processor
        .test_input(input_path)
        .await
        .map_err(|e| format_err!("test input failed {:?}", e))?;
    let info = processor
        .total
        .info()
        .await
        .map_err(|e| format_err!("coverage_info failed {:?}", e))?;
    println!("{:?}", info);
    Ok(())
}
/// Entry point for the `libfuzzer-coverage` debug subcommand: builds a
/// local-only coverage `Config` from the CLI arguments and processes one
/// input, writing results under `result_dir`.
pub fn run(args: &clap::ArgMatches) -> Result<()> {
    let target_exe = value_t!(args, "target_exe", PathBuf)?;
    let setup_dir = value_t!(args, "setup_dir", PathBuf)?;
    let input = value_t!(args, "input", String)?;
    let result_dir = value_t!(args, "result_dir", String)?;
    let target_options = args.values_of_lossy("target_options").unwrap_or_default();

    // Collect KEY=VALUE pairs for the target process environment.
    let mut target_env = HashMap::new();
    for opt in args.values_of_lossy("target_env").unwrap_or_default() {
        let (k, v) = parse_key_value(opt)?;
        target_env.insert(k, v);
    }

    // this happens during setup, not during runtime
    let check_fuzzer_help = true;

    let config = Config {
        target_exe,
        target_env,
        target_options,
        check_fuzzer_help,
        input_queue: None,
        readonly_inputs: vec![],
        // Placeholder container URL: local debug runs never sync to storage.
        coverage: SyncedDir {
            path: result_dir.into(),
            url: BlobContainerUrl::new(Url::parse("https://contoso.com/coverage")?)?,
        },
        common: CommonConfig {
            heartbeat_queue: None,
            instrumentation_key: None,
            telemetry_key: None,
            // Fixed placeholder IDs for local debug runs.
            job_id: Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
            task_id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
            instance_id: Uuid::parse_str("22222222-2222-2222-2222-222222222222").unwrap(),
            setup_dir,
        },
    };

    // Each debug subcommand spins up its own tokio runtime.
    let mut rt = Runtime::new()?;
    rt.block_on(run_impl(input, config))?;
    Ok(())
}
/// CLI definition for the local-only `libfuzzer-coverage` debug subcommand.
pub fn args() -> App<'static, 'static> {
    // Positional arguments must keep this order:
    // setup_dir, target_exe, input, result_dir.
    let arguments = vec![
        Arg::with_name("setup_dir")
            .takes_value(true)
            .required(false),
        Arg::with_name("target_exe")
            .takes_value(true)
            .required(true),
        Arg::with_name("input").takes_value(true).required(true),
        Arg::with_name("result_dir")
            .takes_value(true)
            .required(true),
        Arg::with_name("target_env")
            .long("target_env")
            .takes_value(true)
            .multiple(true),
        Arg::with_name("target_options")
            .long("target_options")
            .takes_value(true)
            .multiple(true)
            .allow_hyphen_values(true)
            .default_value("{input}")
            .help("Supports hyphens. Recommendation: Set target_env first"),
    ];

    SubCommand::with_name("libfuzzer-coverage")
        .about("execute a local-only libfuzzer coverage task")
        .args(&arguments)
}

View File

@ -1,118 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::{
config::CommonConfig,
report::libfuzzer_report::{AsanProcessor, Config},
utils::parse_key_value,
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::{blob::BlobContainerUrl, syncdir::SyncedDir};
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::runtime::Runtime;
use url::Url;
use uuid::Uuid;
/// Run the ASAN report processor once over a single input and
/// pretty-print the outcome (success or failure) to stdout.
async fn run_impl(input: String, config: Config) -> Result<()> {
    // Placeholder blob URL: local debug runs never upload, but the
    // processor API expects a container URL for the input.
    let test_url = Url::parse("https://contoso.com/sample-container/blob.txt")?;
    let processor = AsanProcessor::new(Arc::new(config)).await?;
    let outcome = processor.test_input(test_url, Path::new(&input)).await;
    println!("{:#?}", outcome);
    Ok(())
}
/// Entry point for the `libfuzzer-crash-report` debug subcommand.
///
/// Builds a local-only report `Config` from the CLI arguments — queues are
/// disabled and the sync URL / UUIDs are fixed placeholders — then runs the
/// ASAN report processor once on the given input.
pub fn run(args: &clap::ArgMatches) -> Result<()> {
    let target_exe = value_t!(args, "target_exe", PathBuf)?;
    let setup_dir = value_t!(args, "setup_dir", PathBuf)?;
    let input = value_t!(args, "input", String)?;
    let target_options = args.values_of_lossy("target_options").unwrap_or_default();

    // Collect KEY=VALUE pairs for the target process environment.
    let mut target_env = HashMap::new();
    for opt in args.values_of_lossy("target_env").unwrap_or_default() {
        let (k, v) = parse_key_value(opt)?;
        target_env.insert(k, v);
    }

    // Parsed leniently: any parse failure yields None (no timeout override).
    let target_timeout = value_t!(args, "target_timeout", u64).ok();
    let check_retry_count = value_t!(args, "check_retry_count", u64)?;

    // this happens during setup, not during runtime
    let check_fuzzer_help = true;

    let config = Config {
        target_exe,
        target_env,
        target_options,
        target_timeout,
        check_retry_count,
        check_fuzzer_help,
        input_queue: None,
        crashes: None,
        reports: None,
        no_repro: None,
        // Placeholder container URL: local debug runs never sync to storage.
        unique_reports: SyncedDir {
            path: "unique_reports".into(),
            url: BlobContainerUrl::new(Url::parse("https://contoso.com/unique_reports")?)?,
        },
        common: CommonConfig {
            heartbeat_queue: None,
            instrumentation_key: None,
            telemetry_key: None,
            // Fixed placeholder IDs for local debug runs.
            job_id: Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
            task_id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
            instance_id: Uuid::parse_str("22222222-2222-2222-2222-222222222222").unwrap(),
            setup_dir,
        },
    };

    // Each debug subcommand spins up its own tokio runtime.
    let mut rt = Runtime::new()?;
    rt.block_on(async { run_impl(input, config).await })?;
    Ok(())
}
/// CLI definition for the local-only `libfuzzer-crash-report` debug subcommand.
pub fn args() -> App<'static, 'static> {
    // Positional arguments must keep this order: setup_dir, target_exe, input.
    let arguments = vec![
        Arg::with_name("setup_dir")
            .takes_value(true)
            .required(false),
        Arg::with_name("target_exe")
            .takes_value(true)
            .required(true),
        Arg::with_name("input").takes_value(true).required(true),
        Arg::with_name("target_env")
            .long("target_env")
            .takes_value(true)
            .multiple(true),
        Arg::with_name("target_options")
            .long("target_options")
            .takes_value(true)
            .multiple(true)
            .allow_hyphen_values(true)
            .help("Supports hyphens. Recommendation: Set target_env first"),
        Arg::with_name("target_timeout")
            .takes_value(true)
            .long("target_timeout"),
        Arg::with_name("check_retry_count")
            .takes_value(true)
            .long("check_retry_count")
            .default_value("0"),
    ];

    SubCommand::with_name("libfuzzer-crash-report")
        .about("execute a local-only libfuzzer crash report task")
        .args(&arguments)
}

View File

@ -1,127 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::{
config::CommonConfig,
fuzz::libfuzzer_fuzz::{Config, LibFuzzerFuzzTask},
utils::parse_key_value,
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::{blob::BlobContainerUrl, syncdir::SyncedDir};
use std::{collections::HashMap, path::PathBuf};
use tokio::runtime::Runtime;
use url::Url;
use uuid::Uuid;
/// Launch a single fuzzer-monitor worker (worker 0, no stats sender) and
/// pretty-print its result once it completes.
async fn run_impl(config: Config) -> Result<()> {
    println!(
        "{:#?}",
        LibFuzzerFuzzTask::new(config)?
            .start_fuzzer_monitor(0, None)
            .await?
    );
    Ok(())
}
/// Entry point for the `libfuzzer-fuzz` debug subcommand.
///
/// Builds a local-only fuzz `Config` from the CLI arguments — a single
/// worker, no ensemble sync, placeholder container URLs and UUIDs — and
/// runs one fuzzer-monitor pass.
pub fn run(args: &clap::ArgMatches) -> Result<()> {
    let crashes_dir = value_t!(args, "crashes_dir", String)?;
    let inputs_dir = value_t!(args, "inputs_dir", String)?;
    let target_exe = value_t!(args, "target_exe", PathBuf)?;
    let setup_dir = value_t!(args, "setup_dir", PathBuf)?;
    let target_options = args.values_of_lossy("target_options").unwrap_or_default();

    // this happens during setup, not during runtime
    let check_fuzzer_help = true;

    let expect_crash_on_failure = args.is_present("expect_crash_on_failure");

    // Collect KEY=VALUE pairs for the target process environment.
    let mut target_env = HashMap::new();
    for opt in args.values_of_lossy("target_env").unwrap_or_default() {
        let (k, v) = parse_key_value(opt)?;
        target_env.insert(k, v);
    }

    let readonly_inputs = None;
    // Local debug runs use exactly one fuzzing worker.
    let target_workers = Some(1);

    // Placeholder container URLs: local debug runs never sync to storage.
    let inputs = SyncedDir {
        path: inputs_dir.into(),
        url: BlobContainerUrl::new(Url::parse("https://contoso.com/inputs")?)?,
    };
    let crashes = SyncedDir {
        path: crashes_dir.into(),
        url: BlobContainerUrl::new(Url::parse("https://contoso.com/crashes")?)?,
    };

    let ensemble_sync_delay = None;

    let config = Config {
        inputs,
        readonly_inputs,
        crashes,
        target_exe,
        target_env,
        target_options,
        target_workers,
        ensemble_sync_delay,
        check_fuzzer_help,
        expect_crash_on_failure,
        common: CommonConfig {
            heartbeat_queue: None,
            instrumentation_key: None,
            telemetry_key: None,
            // Fixed placeholder IDs for local debug runs.
            job_id: Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
            task_id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
            instance_id: Uuid::parse_str("22222222-2222-2222-2222-222222222222").unwrap(),
            setup_dir,
        },
    };

    // Each debug subcommand spins up its own tokio runtime.
    let mut rt = Runtime::new()?;
    rt.block_on(async { run_impl(config).await })?;
    Ok(())
}
/// CLI definition for the local-only `libfuzzer-fuzz` debug subcommand.
pub fn args() -> App<'static, 'static> {
    SubCommand::with_name("libfuzzer-fuzz")
        // Fixed: the about text previously said "crash report task" —
        // copied from the sibling subcommand; this is the fuzzing task.
        .about("execute a local-only libfuzzer fuzzing task")
        .arg(
            Arg::with_name("setup_dir")
                .takes_value(true)
                .required(false),
        )
        .arg(
            Arg::with_name("target_exe")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("target_env")
                .long("target_env")
                .takes_value(true)
                .multiple(true),
        )
        .arg(
            Arg::with_name("target_options")
                .long("target_options")
                .takes_value(true)
                .multiple(true)
                .allow_hyphen_values(true)
                .help("Supports hyphens. Recommendation: Set target_env first"),
        )
        .arg(
            Arg::with_name("inputs_dir")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("crashes_dir")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("expect_crash_on_failure")
                .takes_value(false)
                .long("expect_crash_on_failure"),
        )
}

View File

@ -1,35 +1,27 @@
// Copyright (c) Microsoft Corporation. // Copyright (c) Microsoft Corporation.
// Licensed under the MIT License. // Licensed under the MIT License.
use crate::tasks::{ use crate::{
config::CommonConfig, local::common::{
merge::libfuzzer_merge::{merge_inputs, Config}, add_cmd_options, build_common_config, get_cmd_arg, get_cmd_env, get_cmd_exe, CmdType,
utils::parse_key_value, },
tasks::merge::libfuzzer_merge::{merge_inputs, Config},
}; };
use anyhow::Result; use anyhow::Result;
use clap::{App, Arg, SubCommand}; use clap::{App, Arg, SubCommand};
use onefuzz::{blob::BlobContainerUrl, syncdir::SyncedDir}; use onefuzz::syncdir::SyncedDir;
use std::{collections::HashMap, path::PathBuf, sync::Arc}; use std::sync::Arc;
use tokio::runtime::Runtime;
use url::Url; pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
use uuid::Uuid; let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
let target_options = get_cmd_arg(CmdType::Target, args);
pub fn run(args: &clap::ArgMatches) -> Result<()> {
let target_exe = value_t!(args, "target_exe", PathBuf)?;
let setup_dir = value_t!(args, "setup_dir", PathBuf)?;
let inputs = value_t!(args, "inputs", String)?; let inputs = value_t!(args, "inputs", String)?;
let unique_inputs = value_t!(args, "unique_inputs", String)?; let unique_inputs = value_t!(args, "unique_inputs", String)?;
let target_options = args.values_of_lossy("target_options").unwrap_or_default(); let check_fuzzer_help = false;
// this happens during setup, not during runtime
let check_fuzzer_help = true;
let mut target_env = HashMap::new();
for opt in args.values_of_lossy("target_env").unwrap_or_default() {
let (k, v) = parse_key_value(opt)?;
target_env.insert(k, v);
}
let common = build_common_config(args)?;
let config = Arc::new(Config { let config = Arc::new(Config {
target_exe, target_exe,
target_env, target_env,
@ -38,56 +30,29 @@ pub fn run(args: &clap::ArgMatches) -> Result<()> {
input_queue: None, input_queue: None,
inputs: vec![SyncedDir { inputs: vec![SyncedDir {
path: inputs.into(), path: inputs.into(),
url: BlobContainerUrl::new(Url::parse("https://contoso.com/inputs")?)?, url: None,
}], }],
unique_inputs: SyncedDir { unique_inputs: SyncedDir {
path: unique_inputs.into(), path: unique_inputs.into(),
url: BlobContainerUrl::new(Url::parse("https://contoso.com/unique_inputs")?)?, url: None,
},
common: CommonConfig {
heartbeat_queue: None,
instrumentation_key: None,
telemetry_key: None,
job_id: Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
task_id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
instance_id: Uuid::parse_str("22222222-2222-2222-2222-222222222222").unwrap(),
setup_dir,
}, },
common,
preserve_existing_outputs: true, preserve_existing_outputs: true,
}); });
let mut rt = Runtime::new()?; let results = merge_inputs(config.clone(), vec![config.clone().inputs[0].path.clone()]).await?;
rt.block_on(merge_inputs( println!("{:#?}", results);
config.clone(),
vec![config.inputs[0].path.clone()],
))?;
Ok(()) Ok(())
} }
pub fn args() -> App<'static, 'static> { pub fn args(name: &'static str) -> App<'static, 'static> {
SubCommand::with_name("libfuzzer-merge") let mut app = SubCommand::with_name(name).about("execute a local-only libfuzzer merge task");
.about("execute a local-only libfuzzer merge task")
.arg( app = add_cmd_options(CmdType::Target, true, true, true, app);
Arg::with_name("setup_dir") app.arg(Arg::with_name("inputs").takes_value(true).required(true))
.takes_value(true)
.required(false),
)
.arg(
Arg::with_name("target_exe")
.takes_value(true)
.required(true),
)
.arg(Arg::with_name("inputs").takes_value(true).required(true))
.arg( .arg(
Arg::with_name("unique_inputs") Arg::with_name("unique_inputs")
.takes_value(true) .takes_value(true)
.required(true), .required(true),
) )
.arg(
Arg::with_name("target_env")
.long("target_env")
.takes_value(true)
.multiple(true),
)
} }

View File

@ -2,8 +2,4 @@
// Licensed under the MIT License. // Licensed under the MIT License.
pub mod cmd; pub mod cmd;
pub mod generic_crash_report;
pub mod libfuzzer_coverage;
pub mod libfuzzer_crash_report;
pub mod libfuzzer_fuzz;
pub mod libfuzzer_merge; pub mod libfuzzer_merge;

View File

@ -0,0 +1,53 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use anyhow::Result;
use clap::{App, SubCommand};
use crate::local::{
common::add_common_config, generic_crash_report, generic_generator, libfuzzer,
libfuzzer_coverage, libfuzzer_crash_report, libfuzzer_fuzz, radamsa,
};
const RADAMSA: &str = "radamsa";
const LIBFUZZER: &str = "libfuzzer";
const LIBFUZZER_FUZZ: &str = "libfuzzer-fuzz";
const LIBFUZZER_CRASH_REPORT: &str = "libfuzzer-crash-report";
const LIBFUZZER_COVERAGE: &str = "libfuzzer-coverage";
const GENERIC_CRASH_REPORT: &str = "generic-crash-report";
const GENERIC_GENERATOR: &str = "generic-generator";
/// Dispatch the `local` subcommand to the matching local-mode task runner.
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
    match args.subcommand() {
        (RADAMSA, Some(sub)) => radamsa::run(sub).await,
        (LIBFUZZER, Some(sub)) => libfuzzer::run(sub).await,
        (LIBFUZZER_FUZZ, Some(sub)) => libfuzzer_fuzz::run(sub).await,
        (LIBFUZZER_COVERAGE, Some(sub)) => libfuzzer_coverage::run(sub).await,
        (LIBFUZZER_CRASH_REPORT, Some(sub)) => libfuzzer_crash_report::run(sub).await,
        (GENERIC_CRASH_REPORT, Some(sub)) => generic_crash_report::run(sub).await,
        (GENERIC_GENERATOR, Some(sub)) => generic_generator::run(sub).await,
        _ => {
            // No (or unknown) subcommand: surface the usage text as the error.
            anyhow::bail!("missing subcommand\nUSAGE: {}", args.usage());
        }
    }
}
/// CLI definition for the `local` command: every local-mode task runner is
/// registered as a subcommand, each extended with the shared common options.
pub fn args(name: &str) -> App<'static, 'static> {
    let subcommands = vec![
        radamsa::args(RADAMSA),
        libfuzzer::args(LIBFUZZER),
        libfuzzer_fuzz::args(LIBFUZZER_FUZZ),
        libfuzzer_coverage::args(LIBFUZZER_COVERAGE),
        libfuzzer_crash_report::args(LIBFUZZER_CRASH_REPORT),
        generic_crash_report::args(GENERIC_CRASH_REPORT),
        generic_generator::args(GENERIC_GENERATOR),
    ];

    let mut app = SubCommand::with_name(name).about("pre-release local fuzzing");
    for subcommand in subcommands {
        app = app.subcommand(add_common_config(subcommand));
    }
    app
}

View File

@ -0,0 +1,180 @@
use crate::tasks::config::CommonConfig;
use crate::tasks::utils::parse_key_value;
use anyhow::Result;
use clap::{App, Arg, ArgMatches};
use std::{collections::HashMap, path::PathBuf};
use uuid::Uuid;
pub const SETUP_DIR: &str = "setup_dir";
pub const INPUTS_DIR: &str = "inputs_dir";
pub const CRASHES_DIR: &str = "crashes_dir";
pub const TARGET_WORKERS: &str = "target_workers";
pub const REPORTS_DIR: &str = "reports_dir";
pub const NO_REPRO_DIR: &str = "no_repro_dir";
pub const TARGET_TIMEOUT: &str = "target_timeout";
pub const CHECK_RETRY_COUNT: &str = "check_retry_count";
pub const DISABLE_CHECK_QUEUE: &str = "disable_check_queue";
pub const UNIQUE_REPORTS_DIR: &str = "unique_reports_dir";
pub const COVERAGE_DIR: &str = "coverage_dir";
pub const READONLY_INPUTS: &str = "readonly_inputs_dir";
pub const CHECK_ASAN_LOG: &str = "check_asan_log";
pub const TOOLS_DIR: &str = "tools_dir";
pub const RENAME_OUTPUT: &str = "rename_output";
pub const CHECK_FUZZER_HELP: &str = "check_fuzzer_help";
pub const TARGET_EXE: &str = "target_exe";
pub const TARGET_ENV: &str = "target_env";
pub const TARGET_OPTIONS: &str = "target_options";
pub const SUPERVISOR_EXE: &str = "supervisor_exe";
pub const SUPERVISOR_ENV: &str = "supervisor_env";
pub const SUPERVISOR_OPTIONS: &str = "supervisor_options";
pub const GENERATOR_EXE: &str = "generator_exe";
pub const GENERATOR_ENV: &str = "generator_env";
pub const GENERATOR_OPTIONS: &str = "generator_options";
/// Selects which command's exe/env/options argument triple the CLI helpers
/// (`add_cmd_options`, `get_cmd_exe`, `get_cmd_arg`, `get_cmd_env`) operate on.
pub enum CmdType {
    /// The fuzz target binary (TARGET_* arguments).
    Target,
    /// A generator tool such as radamsa (GENERATOR_* arguments).
    Generator,
    /// A supervisor process (SUPERVISOR_* arguments).
    Supervisor,
}
/// Append the exe/env/options arguments for the given command type to `app`.
///
/// Each of `exe`, `arg`, and `env` toggles whether that argument is added.
/// The exe argument is positional and required; env and options are
/// repeatable/space-delimited flags.
pub fn add_cmd_options(
    cmd_type: CmdType,
    exe: bool,
    arg: bool,
    env: bool,
    app: App<'static, 'static>,
) -> App<'static, 'static> {
    let (exe_name, env_name, arg_name) = match cmd_type {
        CmdType::Target => (TARGET_EXE, TARGET_ENV, TARGET_OPTIONS),
        CmdType::Supervisor => (SUPERVISOR_EXE, SUPERVISOR_ENV, SUPERVISOR_OPTIONS),
        CmdType::Generator => (GENERATOR_EXE, GENERATOR_ENV, GENERATOR_OPTIONS),
    };

    let app = if exe {
        app.arg(Arg::with_name(exe_name).takes_value(true).required(true))
    } else {
        app
    };

    let app = if env {
        app.arg(
            Arg::with_name(env_name)
                .long(env_name)
                .takes_value(true)
                .multiple(true),
        )
    } else {
        app
    };

    if arg {
        app.arg(
            Arg::with_name(arg_name)
                .long(arg_name)
                .takes_value(true)
                .value_delimiter(" ")
                .help("Use a quoted string with space separation to denote multiple arguments"),
        )
    } else {
        app
    }
}
/// Read the executable path argument for the given command type.
pub fn get_cmd_exe(cmd_type: CmdType, args: &clap::ArgMatches<'_>) -> Result<String> {
    let arg_name = match cmd_type {
        CmdType::Target => TARGET_EXE,
        CmdType::Supervisor => SUPERVISOR_EXE,
        CmdType::Generator => GENERATOR_EXE,
    };
    Ok(value_t!(args, arg_name, String)?)
}
/// Read the command-line options list for the given command type; absent
/// arguments yield an empty list.
pub fn get_cmd_arg(cmd_type: CmdType, args: &clap::ArgMatches<'_>) -> Vec<String> {
    let arg_name = match cmd_type {
        CmdType::Target => TARGET_OPTIONS,
        CmdType::Supervisor => SUPERVISOR_OPTIONS,
        CmdType::Generator => GENERATOR_OPTIONS,
    };
    match args.values_of_lossy(arg_name) {
        Some(values) => values,
        None => Vec::new(),
    }
}
pub fn get_cmd_env(
cmd_type: CmdType,
args: &clap::ArgMatches<'_>,
) -> Result<HashMap<String, String>> {
let env_name = match cmd_type {
CmdType::Target => TARGET_ENV,
CmdType::Supervisor => SUPERVISOR_ENV,
CmdType::Generator => GENERATOR_ENV,
};
let mut env = HashMap::new();
for opt in args.values_of_lossy(env_name).unwrap_or_default() {
let (k, v) = parse_key_value(opt)?;
env.insert(k, v);
}
Ok(env)
}
/// Add the optional flags shared by every `local` subcommand:
/// --job_id, --task_id, --instance_id, and --setup_dir.
pub fn add_common_config(app: App<'static, 'static>) -> App<'static, 'static> {
    let mut app = app;
    for &name in &["job_id", "task_id", "instance_id", "setup_dir"] {
        app = app.arg(
            Arg::with_name(name)
                .long(name)
                .takes_value(true)
                .required(false),
        );
    }
    app
}
/// Parse the named CLI argument as a UUID.
///
/// An absent argument defaults to the nil UUID (matching the placeholder IDs
/// the debug subcommands used); a present but malformed value is an error.
fn get_uuid(name: &str, args: &ArgMatches<'_>) -> Result<Uuid> {
    match value_t!(args, name, String) {
        Ok(value) => Uuid::parse_str(&value)
            // Fixed: the message previously had an unbalanced trailing ')'.
            .map_err(|err| format_err!("invalid {}. uuid expected. {}", name, err)),
        Err(_) => Ok(Uuid::nil()),
    }
}
/// Build the `CommonConfig` shared by local-mode tasks from CLI arguments.
///
/// Queue and telemetry fields are disabled: they have no meaning in local
/// mode. Missing IDs default to nil UUIDs (see `get_uuid`).
pub fn build_common_config(args: &ArgMatches<'_>) -> Result<CommonConfig> {
    let job_id = get_uuid("job_id", args)?;
    let task_id = get_uuid("task_id", args)?;
    let instance_id = get_uuid("instance_id", args)?;

    // Prefer an explicit --setup_dir; otherwise fall back to the target
    // executable's parent directory, then to an empty path.
    // (Collapsed the nested `else { if … }` into idiomatic `else if`.)
    let setup_dir = if args.is_present(SETUP_DIR) {
        value_t!(args, SETUP_DIR, PathBuf)?
    } else if args.is_present(TARGET_EXE) {
        value_t!(args, TARGET_EXE, PathBuf)?
            .parent()
            .map(|x| x.to_path_buf())
            .unwrap_or_default()
    } else {
        PathBuf::default()
    };

    let config = CommonConfig {
        heartbeat_queue: None,
        instrumentation_key: None,
        telemetry_key: None,
        job_id,
        task_id,
        instance_id,
        setup_dir,
    };
    Ok(config)
}

View File

@ -0,0 +1,124 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::{
local::common::{
build_common_config, get_cmd_arg, get_cmd_env, get_cmd_exe, CmdType, CHECK_ASAN_LOG,
CHECK_RETRY_COUNT, CRASHES_DIR, DISABLE_CHECK_QUEUE, NO_REPRO_DIR, REPORTS_DIR, TARGET_ENV,
TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, UNIQUE_REPORTS_DIR,
},
tasks::report::generic::{Config, ReportTask},
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use std::path::PathBuf;
/// Build a generic crash-report `Config` from local-mode CLI arguments.
///
/// `crashes` and `unique_reports` are required directories; `reports` and
/// `no_repro` are optional. No input queue is used in local mode.
pub fn build_report_config(args: &clap::ArgMatches<'_>) -> Result<Config> {
    let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
    let target_env = get_cmd_env(CmdType::Target, args)?;
    let target_options = get_cmd_arg(CmdType::Target, args);

    let crashes = Some(value_t!(args, CRASHES_DIR, PathBuf)?.into());

    // Optional output directories: only set when the flag was given.
    let reports = if args.is_present(REPORTS_DIR) {
        Some(value_t!(args, REPORTS_DIR, PathBuf)?).map(|x| x.into())
    } else {
        None
    };
    let no_repro = if args.is_present(NO_REPRO_DIR) {
        Some(value_t!(args, NO_REPRO_DIR, PathBuf)?).map(|x| x.into())
    } else {
        None
    };

    let unique_reports = value_t!(args, UNIQUE_REPORTS_DIR, PathBuf)?.into();

    // Parsed leniently: a parse failure yields None (no timeout override).
    let target_timeout = value_t!(args, TARGET_TIMEOUT, u64).ok();

    let check_retry_count = value_t!(args, CHECK_RETRY_COUNT, u64)?;
    // Queue checking is on by default; the flag disables it.
    let check_queue = !args.is_present(DISABLE_CHECK_QUEUE);
    let check_asan_log = args.is_present(CHECK_ASAN_LOG);
    let check_debugger = !args.is_present("disable_check_debugger");

    let common = build_common_config(args)?;
    let config = Config {
        target_exe,
        target_env,
        target_options,
        target_timeout,
        check_asan_log,
        check_debugger,
        check_retry_count,
        check_queue,
        crashes,
        input_queue: None,
        no_repro,
        reports,
        unique_reports,
        common,
    };
    Ok(config)
}
/// Entry point for the local `generic-crash-report` subcommand: build the
/// config from CLI arguments and run the report task in local mode.
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
    ReportTask::new(build_report_config(args)?).local_run().await
}
/// Argument definitions shared by the generic crash-report subcommands.
///
/// target_exe, crashes_dir, and unique_reports_dir are required;
/// reports_dir and no_repro_dir are optional outputs.
pub fn build_shared_args() -> Vec<Arg<'static, 'static>> {
    vec![
        Arg::with_name(TARGET_EXE)
            .long(TARGET_EXE)
            .takes_value(true)
            .required(true),
        Arg::with_name(TARGET_ENV)
            .long(TARGET_ENV)
            .takes_value(true)
            .multiple(true),
        // "{input}" is substituted with the crash file path at run time.
        Arg::with_name(TARGET_OPTIONS)
            .default_value("{input}")
            .long(TARGET_OPTIONS)
            .takes_value(true)
            .value_delimiter(" ")
            .help("Use a quoted string with space separation to denote multiple arguments"),
        Arg::with_name(CRASHES_DIR)
            .long(CRASHES_DIR)
            .takes_value(true)
            .required(true),
        Arg::with_name(REPORTS_DIR)
            .long(REPORTS_DIR)
            .takes_value(true)
            .required(false),
        Arg::with_name(NO_REPRO_DIR)
            .long(NO_REPRO_DIR)
            .takes_value(true)
            .required(false),
        Arg::with_name(UNIQUE_REPORTS_DIR)
            .long(UNIQUE_REPORTS_DIR)
            .takes_value(true)
            .required(true),
        Arg::with_name(TARGET_TIMEOUT)
            .takes_value(true)
            .long(TARGET_TIMEOUT)
            .default_value("30"),
        Arg::with_name(CHECK_RETRY_COUNT)
            .takes_value(true)
            .long(CHECK_RETRY_COUNT)
            .default_value("0"),
        Arg::with_name(DISABLE_CHECK_QUEUE)
            .takes_value(false)
            .long(DISABLE_CHECK_QUEUE),
        Arg::with_name(CHECK_ASAN_LOG)
            .takes_value(false)
            .long(CHECK_ASAN_LOG),
        Arg::with_name("disable_check_debugger")
            .takes_value(false)
            .long("disable_check_debugger"),
    ]
}
/// CLI definition for the local `generic-crash-report` subcommand.
pub fn args(name: &'static str) -> App<'static, 'static> {
    let shared = build_shared_args();
    SubCommand::with_name(name)
        .about("execute a local-only generic crash report")
        .args(&shared)
}

View File

@ -0,0 +1,138 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::{
local::common::{
build_common_config, get_cmd_arg, get_cmd_env, get_cmd_exe, CmdType, CHECK_ASAN_LOG,
CHECK_RETRY_COUNT, CRASHES_DIR, GENERATOR_ENV, GENERATOR_EXE, GENERATOR_OPTIONS,
READONLY_INPUTS, RENAME_OUTPUT, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT,
TOOLS_DIR,
},
tasks::fuzz::generator::{Config, GeneratorTask},
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use std::path::PathBuf;
/// Build a generator-based fuzzing `Config` from local-mode CLI arguments.
pub fn build_fuzz_config(args: &clap::ArgMatches<'_>) -> Result<Config> {
    let crashes = value_t!(args, CRASHES_DIR, PathBuf)?.into();
    let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
    let target_options = get_cmd_arg(CmdType::Target, args);
    let target_env = get_cmd_env(CmdType::Target, args)?;

    let generator_exe = get_cmd_exe(CmdType::Generator, args)?;
    let generator_options = get_cmd_arg(CmdType::Generator, args);
    let generator_env = get_cmd_env(CmdType::Generator, args)?;

    // One or more seed corpus directories (repeatable flag).
    let readonly_inputs = values_t!(args, READONLY_INPUTS, PathBuf)?
        .iter()
        .map(|x| x.to_owned().into())
        .collect();

    let rename_output = args.is_present(RENAME_OUTPUT);
    let check_asan_log = args.is_present(CHECK_ASAN_LOG);
    // Debugger-based checking is on by default; the flag disables it.
    let check_debugger = !args.is_present("disable_check_debugger");
    let check_retry_count = value_t!(args, CHECK_RETRY_COUNT, u64)?;
    // Unlike the report task, the timeout here is always set (parse errors
    // propagate rather than falling back to None).
    let target_timeout = Some(value_t!(args, TARGET_TIMEOUT, u64)?);

    // Optional directory for helper tools needed by the generator.
    let tools = if args.is_present(TOOLS_DIR) {
        Some(value_t!(args, TOOLS_DIR, PathBuf)?.into())
    } else {
        None
    };

    // No ensemble syncing in local mode.
    let ensemble_sync_delay = None;

    let common = build_common_config(args)?;
    let config = Config {
        tools,
        generator_exe,
        generator_env,
        generator_options,
        target_exe,
        target_env,
        target_options,
        target_timeout,
        readonly_inputs,
        crashes,
        ensemble_sync_delay,
        check_asan_log,
        check_debugger,
        check_retry_count,
        rename_output,
        common,
    };
    Ok(config)
}
/// Entry point for the local `generic-generator` subcommand: build the fuzz
/// config from CLI arguments and run the generator task.
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
    let config = build_fuzz_config(args)?;
    let task = GeneratorTask::new(config);
    task.run().await
}
/// CLI arguments shared by the generator fuzzing subcommands.
pub fn build_shared_args() -> Vec<Arg<'static, 'static>> {
    vec![
        Arg::with_name(TARGET_EXE)
            .long(TARGET_EXE)
            .takes_value(true)
            .required(true),
        Arg::with_name(TARGET_ENV)
            .long(TARGET_ENV)
            .takes_value(true)
            .multiple(true),
        Arg::with_name(TARGET_OPTIONS)
            .default_value("{input}")
            .long(TARGET_OPTIONS)
            .takes_value(true)
            .value_delimiter(" ")
            .help("Use a quoted string with space separation to denote multiple arguments"),
        // A default value means this argument is always present, so the
        // previous `.required(true)` was contradictory (clap ignores
        // `required` on arguments that carry a default) and was dropped.
        Arg::with_name(GENERATOR_EXE)
            .long(GENERATOR_EXE)
            .default_value("radamsa")
            .takes_value(true),
        Arg::with_name(GENERATOR_ENV)
            .long(GENERATOR_ENV)
            .takes_value(true)
            .multiple(true),
        Arg::with_name(GENERATOR_OPTIONS)
            .long(GENERATOR_OPTIONS)
            .takes_value(true)
            .value_delimiter(" ")
            .default_value("-H sha256 -o {generated_inputs}/input-%h.%s -n 100 -r {input_corpus}")
            .help("Use a quoted string with space separation to denote multiple arguments"),
        Arg::with_name(CRASHES_DIR)
            .takes_value(true)
            .required(true)
            .long(CRASHES_DIR),
        Arg::with_name(READONLY_INPUTS)
            .takes_value(true)
            .required(true)
            .multiple(true)
            .long(READONLY_INPUTS),
        Arg::with_name(TOOLS_DIR).takes_value(true).long(TOOLS_DIR),
        Arg::with_name(CHECK_RETRY_COUNT)
            .takes_value(true)
            .long(CHECK_RETRY_COUNT)
            .default_value("0"),
        Arg::with_name(CHECK_ASAN_LOG)
            .takes_value(false)
            .long(CHECK_ASAN_LOG),
        Arg::with_name(RENAME_OUTPUT)
            .takes_value(false)
            .long(RENAME_OUTPUT),
        Arg::with_name(TARGET_TIMEOUT)
            .takes_value(true)
            .long(TARGET_TIMEOUT)
            .default_value("30"),
        // Debugger checking is on by default; this flag opts out.
        Arg::with_name("disable_check_debugger")
            .takes_value(false)
            .long("disable_check_debugger"),
    ]
}
/// Build the `generic-generator` subcommand definition.
pub fn args(name: &'static str) -> App<'static, 'static> {
    let shared = build_shared_args();
    SubCommand::with_name(name)
        .about("execute a local-only generator fuzzing task")
        .args(&shared)
}

View File

@ -0,0 +1,67 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::{
local::{
common::COVERAGE_DIR,
libfuzzer_coverage::{build_coverage_config, build_shared_args as build_coverage_args},
libfuzzer_crash_report::{build_report_config, build_shared_args as build_crash_args},
libfuzzer_fuzz::{build_fuzz_config, build_shared_args as build_fuzz_args},
},
tasks::{
coverage::libfuzzer_coverage::CoverageTask, fuzz::libfuzzer_fuzz::LibFuzzerFuzzTask,
report::libfuzzer_report::ReportTask,
},
};
use anyhow::Result;
use clap::{App, SubCommand};
use std::collections::HashSet;
use tokio::task::spawn;
/// Run the combined local libfuzzer job: fuzzing plus crash reporting, with
/// coverage added when COVERAGE_DIR was supplied.
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
    // Spawn the fuzzer first; files it drops into the crash dir feed the reporter.
    let fuzzer = LibFuzzerFuzzTask::new(build_fuzz_config(args)?)?;
    let fuzz_handle = spawn(async move { fuzzer.run().await });

    let reporter = ReportTask::new(build_report_config(args)?);
    let report_handle = spawn(async move { reporter.local_run().await });

    if args.is_present(COVERAGE_DIR) {
        let coverage = CoverageTask::new(build_coverage_config(args, true)?);
        let coverage_handle = spawn(async move { coverage.local_run().await });

        // try_join! surfaces join (panic/cancel) errors; each inner Result
        // carries the task's own outcome.
        let (fuzz_res, report_res, coverage_res) =
            tokio::try_join!(fuzz_handle, report_handle, coverage_handle)?;
        fuzz_res?;
        report_res?;
        coverage_res?;
    } else {
        let (fuzz_res, report_res) = tokio::try_join!(fuzz_handle, report_handle)?;
        fuzz_res?;
        report_res?;
    }
    Ok(())
}
/// Build the combined `libfuzzer` subcommand, merging the argument sets of
/// the fuzz, crash-report, and coverage sub-tasks while dropping duplicates.
pub fn args(name: &'static str) -> App<'static, 'static> {
    let mut app = SubCommand::with_name(name).about("run a local libfuzzer & crash reporting task");
    let mut seen = HashSet::new();
    let arg_sets = [
        build_fuzz_args(),
        build_crash_args(),
        build_coverage_args(true),
    ];
    for arg_set in &arg_sets {
        for arg in arg_set {
            // `insert` returns false when the name was already registered.
            if seen.insert(arg.b.name.to_string()) {
                app = app.arg(arg);
            }
        }
    }
    app
}

View File

@ -0,0 +1,100 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::{
local::common::{
build_common_config, get_cmd_arg, get_cmd_env, get_cmd_exe, CmdType, CHECK_FUZZER_HELP,
COVERAGE_DIR, INPUTS_DIR, READONLY_INPUTS, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS,
},
tasks::coverage::libfuzzer_coverage::{Config, CoverageTask},
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use std::path::PathBuf;
/// Build a libfuzzer coverage task `Config` from parsed command line arguments.
///
/// When `local_job` is true (the combined local `libfuzzer` command), the
/// corpus is the single INPUTS_DIR; otherwise one or more READONLY_INPUTS
/// directories are used.
pub fn build_coverage_config(args: &clap::ArgMatches<'_>, local_job: bool) -> Result<Config> {
    let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
    let target_env = get_cmd_env(CmdType::Target, args)?;
    let target_options = get_cmd_arg(CmdType::Target, args);

    let readonly_inputs = if local_job {
        vec![value_t!(args, INPUTS_DIR, PathBuf)?.into()]
    } else {
        // `values_t!` yields an owned Vec<PathBuf>; `into_iter` avoids the
        // per-element clone of `iter().map(|x| x.to_owned())`.
        values_t!(args, READONLY_INPUTS, PathBuf)?
            .into_iter()
            .map(|x| x.into())
            .collect()
    };

    let coverage = value_t!(args, COVERAGE_DIR, PathBuf)?.into();
    let check_fuzzer_help = args.is_present(CHECK_FUZZER_HELP);
    let common = build_common_config(args)?;
    let config = Config {
        target_exe,
        target_env,
        target_options,
        check_fuzzer_help,
        // Local runs never poll a queue for inputs.
        input_queue: None,
        readonly_inputs,
        coverage,
        common,
        check_queue: false,
    };
    Ok(config)
}
/// Parse the CLI arguments and run a standalone local coverage task.
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
    let coverage_task = CoverageTask::new(build_coverage_config(args, false)?);
    coverage_task.local_run().await
}
/// CLI arguments shared by the libfuzzer coverage subcommands.
///
/// `local_job` selects between a single INPUTS_DIR (combined job) and one or
/// more READONLY_INPUTS directories (standalone run).
pub fn build_shared_args(local_job: bool) -> Vec<Arg<'static, 'static>> {
    let mut shared = vec![
        Arg::with_name(TARGET_EXE)
            .takes_value(true)
            .required(true)
            .long(TARGET_EXE),
        Arg::with_name(TARGET_ENV)
            .takes_value(true)
            .multiple(true)
            .long(TARGET_ENV),
        Arg::with_name(TARGET_OPTIONS)
            .takes_value(true)
            .value_delimiter(" ")
            .long(TARGET_OPTIONS)
            .help("Use a quoted string with space separation to denote multiple arguments"),
        // In the combined job the coverage dir is optional.
        Arg::with_name(COVERAGE_DIR)
            .long(COVERAGE_DIR)
            .takes_value(true)
            .required(!local_job),
        Arg::with_name(CHECK_FUZZER_HELP)
            .long(CHECK_FUZZER_HELP)
            .takes_value(false),
    ];

    let inputs_arg = if local_job {
        Arg::with_name(INPUTS_DIR)
            .takes_value(true)
            .required(true)
            .long(INPUTS_DIR)
    } else {
        Arg::with_name(READONLY_INPUTS)
            .long(READONLY_INPUTS)
            .takes_value(true)
            .required(true)
            .multiple(true)
    };
    shared.push(inputs_arg);
    shared
}
/// Build the `libfuzzer-coverage` subcommand definition.
pub fn args(name: &'static str) -> App<'static, 'static> {
    let shared = build_shared_args(false);
    SubCommand::with_name(name)
        .about("execute a local-only libfuzzer coverage task")
        .args(&shared)
}

View File

@ -0,0 +1,115 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::{
local::common::{
build_common_config, get_cmd_arg, get_cmd_env, get_cmd_exe, CmdType, CHECK_FUZZER_HELP,
CHECK_RETRY_COUNT, CRASHES_DIR, DISABLE_CHECK_QUEUE, NO_REPRO_DIR, REPORTS_DIR, TARGET_ENV,
TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, UNIQUE_REPORTS_DIR,
},
tasks::report::libfuzzer_report::{Config, ReportTask},
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use std::path::PathBuf;
/// Build a libfuzzer crash-report task `Config` from parsed command line arguments.
pub fn build_report_config(args: &clap::ArgMatches<'_>) -> Result<Config> {
    let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
    let target_env = get_cmd_env(CmdType::Target, args)?;
    let target_options = get_cmd_arg(CmdType::Target, args);
    let crashes = Some(value_t!(args, CRASHES_DIR, PathBuf)?.into());

    // Optional output dirs: only parse when the flag was supplied.
    // (`Some(x?).map(|x| x.into())` collapsed to the direct `Some(x?.into())`.)
    let reports = if args.is_present(REPORTS_DIR) {
        Some(value_t!(args, REPORTS_DIR, PathBuf)?.into())
    } else {
        None
    };
    let no_repro = if args.is_present(NO_REPRO_DIR) {
        Some(value_t!(args, NO_REPRO_DIR, PathBuf)?.into())
    } else {
        None
    };
    let unique_reports = value_t!(args, UNIQUE_REPORTS_DIR, PathBuf)?.into();

    // NOTE(review): `.ok()` treats an *unparsable* timeout the same as an
    // absent one (None) — confirm silently ignoring a bad value is intended.
    let target_timeout = value_t!(args, TARGET_TIMEOUT, u64).ok();
    let check_retry_count = value_t!(args, CHECK_RETRY_COUNT, u64)?;
    let check_queue = !args.is_present(DISABLE_CHECK_QUEUE);
    let check_fuzzer_help = args.is_present(CHECK_FUZZER_HELP);

    let common = build_common_config(args)?;
    let config = Config {
        target_exe,
        target_env,
        target_options,
        target_timeout,
        check_retry_count,
        check_fuzzer_help,
        // Local runs never poll a queue for inputs.
        input_queue: None,
        check_queue,
        crashes,
        reports,
        no_repro,
        unique_reports,
        common,
    };
    Ok(config)
}
/// Parse the CLI arguments and run a standalone local crash-report task.
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
    let report_task = ReportTask::new(build_report_config(args)?);
    report_task.local_run().await
}
/// CLI arguments shared by the libfuzzer crash-report subcommands.
pub fn build_shared_args() -> Vec<Arg<'static, 'static>> {
    vec![
        Arg::with_name(TARGET_EXE)
            .takes_value(true)
            .required(true)
            .long(TARGET_EXE),
        Arg::with_name(TARGET_ENV)
            .takes_value(true)
            .multiple(true)
            .long(TARGET_ENV),
        Arg::with_name(TARGET_OPTIONS)
            .takes_value(true)
            .value_delimiter(" ")
            .long(TARGET_OPTIONS)
            .help("Use a quoted string with space separation to denote multiple arguments"),
        Arg::with_name(CRASHES_DIR)
            .takes_value(true)
            .required(true)
            .long(CRASHES_DIR),
        // Optional report outputs.
        Arg::with_name(REPORTS_DIR)
            .takes_value(true)
            .required(false)
            .long(REPORTS_DIR),
        Arg::with_name(NO_REPRO_DIR)
            .takes_value(true)
            .required(false)
            .long(NO_REPRO_DIR),
        Arg::with_name(UNIQUE_REPORTS_DIR)
            .takes_value(true)
            .required(true)
            .long(UNIQUE_REPORTS_DIR),
        Arg::with_name(TARGET_TIMEOUT)
            .long(TARGET_TIMEOUT)
            .takes_value(true),
        Arg::with_name(CHECK_RETRY_COUNT)
            .long(CHECK_RETRY_COUNT)
            .takes_value(true)
            .default_value("0"),
        Arg::with_name(DISABLE_CHECK_QUEUE)
            .long(DISABLE_CHECK_QUEUE)
            .takes_value(false),
        Arg::with_name(CHECK_FUZZER_HELP)
            .long(CHECK_FUZZER_HELP)
            .takes_value(false),
    ]
}
/// Build the `libfuzzer-crash-report` subcommand definition.
pub fn args(name: &'static str) -> App<'static, 'static> {
    let shared = build_shared_args();
    SubCommand::with_name(name)
        .about("execute a local-only libfuzzer crash report task")
        .args(&shared)
}

View File

@ -0,0 +1,93 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::{
local::common::{
build_common_config, get_cmd_arg, get_cmd_env, get_cmd_exe, CmdType, CHECK_FUZZER_HELP,
CRASHES_DIR, INPUTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_WORKERS,
},
tasks::fuzz::libfuzzer_fuzz::{Config, LibFuzzerFuzzTask},
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use std::path::PathBuf;
const DISABLE_EXPECT_CRASH_ON_FAILURE: &str = "disable_expect_crash_on_failure";

/// Build a libfuzzer fuzzing task `Config` from parsed command line arguments.
pub fn build_fuzz_config(args: &clap::ArgMatches<'_>) -> Result<Config> {
    let crashes = value_t!(args, CRASHES_DIR, PathBuf)?.into();
    let inputs = value_t!(args, INPUTS_DIR, PathBuf)?.into();
    let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
    let target_env = get_cmd_env(CmdType::Target, args)?;
    let target_options = get_cmd_arg(CmdType::Target, args);

    // Look the value up via the shared TARGET_WORKERS constant — the key the
    // argument is registered under in build_shared_args — rather than a
    // duplicated string literal that could silently drift out of sync.
    // NOTE(review): `.unwrap_or_default()` maps an *unparsable* value to 0 as
    // well as an absent one — confirm that silent fallback is intended.
    let target_workers = value_t!(args, TARGET_WORKERS, u64).unwrap_or_default();

    let readonly_inputs = None;
    let check_fuzzer_help = args.is_present(CHECK_FUZZER_HELP);
    let expect_crash_on_failure = !args.is_present(DISABLE_EXPECT_CRASH_ON_FAILURE);

    // No ensemble syncing in local mode.
    let ensemble_sync_delay = None;
    let common = build_common_config(args)?;
    let config = Config {
        inputs,
        readonly_inputs,
        crashes,
        target_exe,
        target_env,
        target_options,
        target_workers,
        ensemble_sync_delay,
        expect_crash_on_failure,
        check_fuzzer_help,
        common,
    };
    Ok(config)
}
/// Parse the CLI arguments and run a standalone local libfuzzer fuzzing task.
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
    let fuzz_task = LibFuzzerFuzzTask::new(build_fuzz_config(args)?)?;
    fuzz_task.run().await
}
/// CLI arguments shared by the libfuzzer fuzzing subcommands.
pub fn build_shared_args() -> Vec<Arg<'static, 'static>> {
    vec![
        Arg::with_name(TARGET_EXE)
            .takes_value(true)
            .required(true)
            .long(TARGET_EXE),
        Arg::with_name(TARGET_ENV)
            .takes_value(true)
            .multiple(true)
            .long(TARGET_ENV),
        Arg::with_name(TARGET_OPTIONS)
            .takes_value(true)
            .value_delimiter(" ")
            .long(TARGET_OPTIONS)
            .help("Use a quoted string with space separation to denote multiple arguments"),
        Arg::with_name(INPUTS_DIR)
            .takes_value(true)
            .required(true)
            .long(INPUTS_DIR),
        Arg::with_name(CRASHES_DIR)
            .takes_value(true)
            .required(true)
            .long(CRASHES_DIR),
        Arg::with_name(TARGET_WORKERS)
            .takes_value(true)
            .long(TARGET_WORKERS),
        Arg::with_name(CHECK_FUZZER_HELP)
            .long(CHECK_FUZZER_HELP)
            .takes_value(false),
        Arg::with_name(DISABLE_EXPECT_CRASH_ON_FAILURE)
            .long(DISABLE_EXPECT_CRASH_ON_FAILURE)
            .takes_value(false),
    ]
}
/// Build the `libfuzzer-fuzz` subcommand definition.
pub fn args(name: &'static str) -> App<'static, 'static> {
    let shared = build_shared_args();
    SubCommand::with_name(name)
        .about("execute a local-only libfuzzer fuzzing task")
        .args(&shared)
}

View File

@ -0,0 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

// Modules implementing the local-only fuzzing subcommands: each exposes
// `args`/`run` entry points that parse CLI options and drive a task directly,
// without cloud queues or synced storage.
pub mod cmd;
pub mod common;
pub mod generic_crash_report;
pub mod generic_generator;
pub mod libfuzzer;
pub mod libfuzzer_coverage;
pub mod libfuzzer_crash_report;
pub mod libfuzzer_fuzz;
pub mod radamsa;

View File

@ -0,0 +1,47 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::{
local::{
generic_crash_report::{build_report_config, build_shared_args as build_crash_args},
generic_generator::{build_fuzz_config, build_shared_args as build_fuzz_args},
},
tasks::{fuzz::generator::GeneratorTask, report::generic::ReportTask},
};
use anyhow::Result;
use clap::{App, SubCommand};
use std::collections::HashSet;
use tokio::task::spawn;
/// Run the combined local radamsa job: generator fuzzing plus crash reporting.
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
    // Spawn the generator first; crashes it produces feed the reporter.
    let fuzzer = GeneratorTask::new(build_fuzz_config(args)?);
    let fuzz_handle = spawn(async move { fuzzer.run().await });

    let reporter = ReportTask::new(build_report_config(args)?);
    let report_handle = spawn(async move { reporter.local_run().await });

    // try_join! surfaces join (panic/cancel) errors; each inner Result
    // carries the task's own outcome.
    let (fuzz_res, report_res) = tokio::try_join!(fuzz_handle, report_handle)?;
    fuzz_res?;
    report_res?;
    Ok(())
}
/// Build the `radamsa` subcommand, merging the generator and crash-report
/// argument sets while dropping duplicates.
pub fn args(name: &'static str) -> App<'static, 'static> {
    let mut app = SubCommand::with_name(name).about("run a local generator & crash reporting job");
    let mut seen = HashSet::new();
    for arg_set in &[build_fuzz_args(), build_crash_args()] {
        for arg in arg_set {
            // `insert` returns false when the name was already registered.
            if seen.insert(arg.b.name.to_string()) {
                app = app.arg(arg);
            }
        }
    }
    app
}

View File

@ -10,19 +10,22 @@ extern crate onefuzz;
#[macro_use] #[macro_use]
extern crate clap; extern crate clap;
use std::path::PathBuf;
use anyhow::Result; use anyhow::Result;
use clap::{App, Arg, SubCommand}; use clap::{App, ArgMatches, SubCommand};
use onefuzz::telemetry::{self}; use std::io::{stdout, Write};
mod debug; mod debug;
mod local;
mod managed;
mod tasks; mod tasks;
use tasks::config::Config; const LICENSE_CMD: &str = "licenses";
const LOCAL_CMD: &str = "local";
const DEBUG_CMD: &str = "debug";
const MANAGED_CMD: &str = "managed";
fn main() -> Result<()> { fn main() -> Result<()> {
env_logger::init(); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let built_version = format!( let built_version = format!(
"{} onefuzz:{} git:{}", "{} onefuzz:{} git:{}",
@ -33,65 +36,30 @@ fn main() -> Result<()> {
let app = App::new("onefuzz-agent") let app = App::new("onefuzz-agent")
.version(built_version.as_str()) .version(built_version.as_str())
.arg( .subcommand(managed::cmd::args(MANAGED_CMD))
Arg::with_name("config") .subcommand(local::cmd::args(LOCAL_CMD))
.long("config") .subcommand(debug::cmd::args(DEBUG_CMD))
.short("c") .subcommand(SubCommand::with_name(LICENSE_CMD).about("display third-party licenses"));
.takes_value(true),
)
.arg(
Arg::with_name("setup_dir")
.long("setup_dir")
.short("s")
.takes_value(true),
)
.subcommand(debug::cmd::args())
.subcommand(SubCommand::with_name("licenses").about("display third-party licenses"));
let matches = app.get_matches(); let matches = app.get_matches();
match matches.subcommand() {
("licenses", Some(_)) => {
return licenses();
}
("debug", Some(sub)) => return crate::debug::cmd::run(sub),
_ => {} // no subcommand
}
if matches.value_of("config").is_none() {
println!("Missing '--config'\n{}", matches.usage());
return Ok(());
}
let config_path: PathBuf = matches.value_of("config").unwrap().parse()?;
let setup_dir = matches.value_of("setup_dir");
let config = Config::from_file(config_path, setup_dir)?;
init_telemetry(&config);
verbose!("config parsed");
let mut rt = tokio::runtime::Runtime::new()?; let mut rt = tokio::runtime::Runtime::new()?;
rt.block_on(run(matches))
}
let result = rt.block_on(config.run()); async fn run(args: ArgMatches<'_>) -> Result<()> {
match args.subcommand() {
if let Err(err) = &result { (LICENSE_CMD, Some(_)) => return licenses(),
error!("error running task: {}", err); (DEBUG_CMD, Some(sub)) => return debug::cmd::run(sub).await,
(LOCAL_CMD, Some(sub)) => return local::cmd::run(sub).await,
(MANAGED_CMD, Some(sub)) => return managed::cmd::run(sub).await,
_ => {
anyhow::bail!("missing subcommand\nUSAGE: {}", args.usage());
}
} }
telemetry::try_flush_and_close();
result
} }
fn licenses() -> Result<()> { fn licenses() -> Result<()> {
use std::io::{self, Write}; stdout().write_all(include_bytes!("../../data/licenses.json"))?;
io::stdout().write_all(include_bytes!("../../data/licenses.json"))?;
Ok(()) Ok(())
} }
fn init_telemetry(config: &Config) {
let inst_key = config.common().instrumentation_key;
let tele_key = config.common().telemetry_key;
telemetry::set_appinsights_clients(inst_key, tele_key);
}

View File

@ -0,0 +1,35 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::config::{CommonConfig, Config};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::telemetry;
use std::path::PathBuf;
/// Run the agent in managed mode: load the task config, wire up telemetry,
/// execute the task, and flush telemetry before returning.
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
    let config_path = value_t!(args, "config", PathBuf)?;
    let setup_dir = value_t!(args, "setup_dir", PathBuf)?;
    let config = Config::from_file(config_path, setup_dir)?;

    init_telemetry(config.common());

    let outcome = config.run().await;
    if let Err(err) = &outcome {
        error!("error running task: {}", err);
    }

    // Flush on both success and failure so no telemetry is lost on exit.
    telemetry::try_flush_and_close();
    outcome
}
/// Register the AppInsights clients using the keys from the task config.
fn init_telemetry(config: &CommonConfig) {
    let instance_key = config.instrumentation_key;
    let microsoft_key = config.telemetry_key;
    telemetry::set_appinsights_clients(instance_key, microsoft_key);
}
/// Build the `managed` subcommand definition (positional config + setup dir).
pub fn args(name: &str) -> App<'static, 'static> {
    let config_arg = Arg::with_name("config").required(true);
    let setup_dir_arg = Arg::with_name("setup_dir").required(true);
    SubCommand::with_name(name)
        .about("managed fuzzing")
        .arg(config_arg)
        .arg(setup_dir_arg)
}

View File

@ -0,0 +1 @@
// Command wiring for the managed (service-driven) agent mode.
pub mod cmd;

View File

@ -66,8 +66,9 @@ async fn run_existing(config: &Config) -> Result<()> {
async fn already_checked(config: &Config, input: &BlobUrl) -> Result<bool> { async fn already_checked(config: &Config, input: &BlobUrl) -> Result<bool> {
let result = if let Some(crashes) = &config.crashes { let result = if let Some(crashes) = &config.crashes {
crashes.url.account() == input.account() let url = crashes.try_url()?;
&& crashes.url.container() == input.container() url.account() == input.account()
&& url.container() == input.container()
&& crashes.path.join(input.name()).exists() && crashes.path.join(input.name()).exists()
} else { } else {
false false

View File

@ -10,7 +10,7 @@ use onefuzz::{
}; };
use reqwest::Url; use reqwest::Url;
use serde::{self, Deserialize}; use serde::{self, Deserialize};
use std::path::{Path, PathBuf}; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
@ -20,7 +20,7 @@ pub enum ContainerType {
Inputs, Inputs,
} }
#[derive(Debug, Deserialize, Clone)] #[derive(Debug, Deserialize, Clone, Default)]
pub struct CommonConfig { pub struct CommonConfig {
pub job_id: Uuid, pub job_id: Uuid,
@ -34,6 +34,7 @@ pub struct CommonConfig {
pub telemetry_key: Option<Uuid>, pub telemetry_key: Option<Uuid>,
#[serde(default)]
pub setup_dir: PathBuf, pub setup_dir: PathBuf,
} }
@ -68,7 +69,7 @@ pub enum Config {
GenericAnalysis(analysis::generic::Config), GenericAnalysis(analysis::generic::Config),
#[serde(alias = "generic_generator")] #[serde(alias = "generic_generator")]
GenericGenerator(fuzz::generator::GeneratorConfig), GenericGenerator(fuzz::generator::Config),
#[serde(alias = "generic_supervisor")] #[serde(alias = "generic_supervisor")]
GenericSupervisor(fuzz::supervisor::SupervisorConfig), GenericSupervisor(fuzz::supervisor::SupervisorConfig),
@ -81,16 +82,29 @@ pub enum Config {
} }
impl Config { impl Config {
pub fn from_file(path: impl AsRef<Path>, setup_dir: Option<impl AsRef<Path>>) -> Result<Self> { pub fn from_file(path: PathBuf, setup_dir: PathBuf) -> Result<Self> {
let json = std::fs::read_to_string(path)?; let json = std::fs::read_to_string(path)?;
let mut json_config: serde_json::Value = serde_json::from_str(&json)?; let json_config: serde_json::Value = serde_json::from_str(&json)?;
// override the setup_dir in the config file with the parameter value if specified
if let Some(setup_dir) = setup_dir {
json_config["setup_dir"] =
serde_json::Value::String(setup_dir.as_ref().to_string_lossy().into());
}
Ok(serde_json::from_value(json_config)?) // override the setup_dir in the config file with the parameter value if specified
let mut config: Self = serde_json::from_value(json_config)?;
config.common_mut().setup_dir = setup_dir;
Ok(config)
}
fn common_mut(&mut self) -> &mut CommonConfig {
match self {
Config::LibFuzzerFuzz(c) => &mut c.common,
Config::LibFuzzerMerge(c) => &mut c.common,
Config::LibFuzzerReport(c) => &mut c.common,
Config::LibFuzzerCoverage(c) => &mut c.common,
Config::GenericAnalysis(c) => &mut c.common,
Config::GenericMerge(c) => &mut c.common,
Config::GenericReport(c) => &mut c.common,
Config::GenericSupervisor(c) => &mut c.common,
Config::GenericGenerator(c) => &mut c.common,
}
} }
pub fn common(&self) -> &CommonConfig { pub fn common(&self) -> &CommonConfig {
@ -150,25 +164,29 @@ impl Config {
match self { match self {
Config::LibFuzzerFuzz(config) => { Config::LibFuzzerFuzz(config) => {
fuzz::libfuzzer_fuzz::LibFuzzerFuzzTask::new(config)? fuzz::libfuzzer_fuzz::LibFuzzerFuzzTask::new(config)?
.start() .run()
.await .await
} }
Config::LibFuzzerReport(config) => { Config::LibFuzzerReport(config) => {
report::libfuzzer_report::ReportTask::new(config) report::libfuzzer_report::ReportTask::new(config)
.run() .managed_run()
.await .await
} }
Config::LibFuzzerCoverage(config) => { Config::LibFuzzerCoverage(config) => {
coverage::libfuzzer_coverage::CoverageTask::new(Arc::new(config)) coverage::libfuzzer_coverage::CoverageTask::new(config)
.run() .managed_run()
.await .await
} }
Config::LibFuzzerMerge(config) => merge::libfuzzer_merge::spawn(Arc::new(config)).await, Config::LibFuzzerMerge(config) => merge::libfuzzer_merge::spawn(Arc::new(config)).await,
Config::GenericAnalysis(config) => analysis::generic::spawn(config).await, Config::GenericAnalysis(config) => analysis::generic::spawn(config).await,
Config::GenericGenerator(config) => fuzz::generator::spawn(Arc::new(config)).await, Config::GenericGenerator(config) => {
fuzz::generator::GeneratorTask::new(config).run().await
}
Config::GenericSupervisor(config) => fuzz::supervisor::spawn(config).await, Config::GenericSupervisor(config) => fuzz::supervisor::spawn(config).await,
Config::GenericMerge(config) => merge::generic::spawn(Arc::new(config)).await, Config::GenericMerge(config) => merge::generic::spawn(Arc::new(config)).await,
Config::GenericReport(config) => report::generic::ReportTask::new(&config).run().await, Config::GenericReport(config) => {
report::generic::ReportTask::new(config).managed_run().await
}
} }
} }
} }

View File

@ -65,6 +65,9 @@ pub struct Config {
pub readonly_inputs: Vec<SyncedDir>, pub readonly_inputs: Vec<SyncedDir>,
pub coverage: SyncedDir, pub coverage: SyncedDir,
#[serde(default = "default_bool_true")]
pub check_queue: bool,
#[serde(default = "default_bool_true")] #[serde(default = "default_bool_true")]
pub check_fuzzer_help: bool, pub check_fuzzer_help: bool,
@ -86,17 +89,26 @@ pub struct CoverageTask {
} }
impl CoverageTask { impl CoverageTask {
pub fn new(config: impl Into<Arc<Config>>) -> Self { pub fn new(config: Config) -> Self {
let config = config.into(); let config = Arc::new(config);
let poller = InputPoller::new();
let task_dir = PathBuf::from(config.common.task_id.to_string());
let poller_dir = task_dir.join("poller");
let poller = InputPoller::<Message>::new(poller_dir);
Self { config, poller } Self { config, poller }
} }
pub async fn run(&mut self) -> Result<()> { pub async fn local_run(&self) -> Result<()> {
let mut processor = CoverageProcessor::new(self.config.clone()).await?;
self.config.coverage.init().await?;
for synced_dir in &self.config.readonly_inputs {
synced_dir.init().await?;
self.record_corpus_coverage(&mut processor, &synced_dir)
.await?;
}
Ok(())
}
pub async fn managed_run(&mut self) -> Result<()> {
info!("starting libFuzzer coverage task"); info!("starting libFuzzer coverage task");
if self.config.check_fuzzer_help { if self.config.check_fuzzer_help {
@ -116,6 +128,7 @@ impl CoverageTask {
async fn process(&mut self) -> Result<()> { async fn process(&mut self) -> Result<()> {
let mut processor = CoverageProcessor::new(self.config.clone()).await?; let mut processor = CoverageProcessor::new(self.config.clone()).await?;
info!("processing initial dataset");
let mut seen_inputs = false; let mut seen_inputs = false;
// Update the total with the coverage from each seed corpus. // Update the total with the coverage from each seed corpus.
for dir in &self.config.readonly_inputs { for dir in &self.config.readonly_inputs {
@ -144,7 +157,7 @@ impl CoverageTask {
// If a queue has been provided, poll it for new coverage. // If a queue has been provided, poll it for new coverage.
if let Some(queue) = &self.config.input_queue { if let Some(queue) = &self.config.input_queue {
verbose!("polling queue for new coverage"); info!("polling queue for new coverage");
let callback = CallbackImpl::new(queue.clone(), processor); let callback = CallbackImpl::new(queue.clone(), processor);
self.poller.run(callback).await?; self.poller.run(callback).await?;
} }
@ -273,7 +286,7 @@ impl CoverageProcessor {
#[async_trait] #[async_trait]
impl Processor for CoverageProcessor { impl Processor for CoverageProcessor {
async fn process(&mut self, _url: Url, input: &Path) -> Result<()> { async fn process(&mut self, _url: Option<Url>, input: &Path) -> Result<()> {
self.heartbeat_client.alive(); self.heartbeat_client.alive();
self.test_input(input).await?; self.test_input(input).await?;
self.report_total().await?; self.report_total().await?;

View File

@ -6,7 +6,7 @@ use crate::tasks::{
heartbeat::*, heartbeat::*,
utils::{self, default_bool_true}, utils::{self, default_bool_true},
}; };
use anyhow::{Error, Result}; use anyhow::Result;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use onefuzz::{ use onefuzz::{
expand::Expand, expand::Expand,
@ -23,18 +23,18 @@ use std::{
ffi::OsString, ffi::OsString,
path::{Path, PathBuf}, path::{Path, PathBuf},
process::Stdio, process::Stdio,
sync::Arc,
}; };
use tempfile::tempdir;
use tokio::{fs, process::Command}; use tokio::{fs, process::Command};
#[derive(Debug, Deserialize, Clone)] #[derive(Debug, Deserialize, Clone)]
pub struct GeneratorConfig { pub struct Config {
pub generator_exe: String, pub generator_exe: String,
pub generator_env: HashMap<String, String>, pub generator_env: HashMap<String, String>,
pub generator_options: Vec<String>, pub generator_options: Vec<String>,
pub readonly_inputs: Vec<SyncedDir>, pub readonly_inputs: Vec<SyncedDir>,
pub crashes: SyncedDir, pub crashes: SyncedDir,
pub tools: SyncedDir, pub tools: Option<SyncedDir>,
pub target_exe: PathBuf, pub target_exe: PathBuf,
pub target_env: HashMap<String, String>, pub target_env: HashMap<String, String>,
@ -52,134 +52,141 @@ pub struct GeneratorConfig {
pub common: CommonConfig, pub common: CommonConfig,
} }
pub async fn spawn(config: Arc<GeneratorConfig>) -> Result<(), Error> { pub struct GeneratorTask {
config.crashes.init().await?; config: Config,
config.tools.init_pull().await?;
set_executable(&config.tools.path).await?;
let hb_client = config.common.init_heartbeat().await?;
for dir in &config.readonly_inputs {
dir.init_pull().await?;
}
let sync_task = continuous_sync(&config.readonly_inputs, Pull, config.ensemble_sync_delay);
let crash_dir_monitor = config.crashes.monitor_results(new_result);
let tester = Tester::new(
&config.common.setup_dir,
&config.target_exe,
&config.target_options,
&config.target_env,
&config.target_timeout,
config.check_asan_log,
false,
config.check_debugger,
config.check_retry_count,
);
let inputs: Vec<_> = config.readonly_inputs.iter().map(|x| &x.path).collect();
let fuzzing_monitor = start_fuzzing(&config, inputs, tester, hb_client);
futures::try_join!(fuzzing_monitor, sync_task, crash_dir_monitor)?;
Ok(())
} }
async fn generate_input( impl GeneratorTask {
generator_exe: &str, pub fn new(config: Config) -> Self {
generator_env: &HashMap<String, String>, Self { config }
generator_options: &[String],
tools_dir: impl AsRef<Path>,
corpus_dir: impl AsRef<Path>,
output_dir: impl AsRef<Path>,
) -> Result<()> {
let mut expand = Expand::new();
expand
.generated_inputs(&output_dir)
.input_corpus(&corpus_dir)
.generator_exe(&generator_exe)
.generator_options(&generator_options)
.tools_dir(&tools_dir);
utils::reset_tmp_dir(&output_dir).await?;
let generator_path = Expand::new()
.tools_dir(tools_dir.as_ref())
.evaluate_value(generator_exe)?;
let mut generator = Command::new(&generator_path);
generator
.kill_on_drop(true)
.env_remove("RUST_LOG")
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for arg in expand.evaluate(generator_options)? {
generator.arg(arg);
} }
for (k, v) in generator_env { pub async fn run(&self) -> Result<()> {
generator.env(k, expand.evaluate_value(v)?); self.config.crashes.init().await?;
if let Some(tools) = &self.config.tools {
if tools.url.is_some() {
tools.init_pull().await?;
set_executable(&tools.path).await?;
}
}
let hb_client = self.config.common.init_heartbeat().await?;
for dir in &self.config.readonly_inputs {
dir.init_pull().await?;
}
let sync_task = continuous_sync(
&self.config.readonly_inputs,
Pull,
self.config.ensemble_sync_delay,
);
let crash_dir_monitor = self.config.crashes.monitor_results(new_result);
let fuzzer = self.fuzzing_loop(hb_client);
futures::try_join!(fuzzer, sync_task, crash_dir_monitor)?;
Ok(())
} }
info!("Generating test cases with {:?}", generator); async fn fuzzing_loop(&self, heartbeat_client: Option<TaskHeartbeatClient>) -> Result<()> {
let output = generator.spawn()?; let tester = Tester::new(
monitor_process(output, "generator".to_string(), true, None).await?; &self.config.common.setup_dir,
&self.config.target_exe,
&self.config.target_options,
&self.config.target_env,
&self.config.target_timeout,
self.config.check_asan_log,
false,
self.config.check_debugger,
self.config.check_retry_count,
);
Ok(()) loop {
} for corpus_dir in &self.config.readonly_inputs {
heartbeat_client.alive();
let corpus_dir = &corpus_dir.path;
let generated_inputs = tempdir()?;
let generated_inputs_path = generated_inputs.path();
async fn start_fuzzing<'a>( self.generate_inputs(corpus_dir, &generated_inputs_path)
config: &GeneratorConfig, .await?;
corpus_dirs: Vec<impl AsRef<Path>>, self.test_inputs(&generated_inputs_path, &tester).await?;
tester: Tester<'_>, }
heartbeat_client: Option<TaskHeartbeatClient>, }
) -> Result<()> { }
let generator_tmp = "generator_tmp";
info!("Starting generator fuzzing loop"); async fn test_inputs(
&self,
generated_inputs: impl AsRef<Path>,
tester: &Tester<'_>,
) -> Result<()> {
let mut read_dir = fs::read_dir(generated_inputs).await?;
while let Some(file) = read_dir.next().await {
let file = file?;
loop { verbose!("testing input: {:?}", file);
heartbeat_client.alive();
for corpus_dir in &corpus_dirs { let destination_file = if self.config.rename_output {
let corpus_dir = corpus_dir.as_ref(); let hash = sha256::digest_file(file.path()).await?;
OsString::from(hash)
} else {
file.file_name()
};
generate_input( let destination_file = self.config.crashes.path.join(destination_file);
&config.generator_exe, if tester.is_crash(file.path()).await? {
&config.generator_env, fs::rename(file.path(), &destination_file).await?;
&config.generator_options, verbose!("crash found {}", destination_file.display());
&config.tools.path, }
corpus_dir, }
generator_tmp, Ok(())
) }
.await?;
let mut read_dir = fs::read_dir(generator_tmp).await?; async fn generate_inputs(
while let Some(file) = read_dir.next().await { &self,
verbose!("Processing file {:?}", file); corpus_dir: impl AsRef<Path>,
let file = file?; output_dir: impl AsRef<Path>,
) -> Result<()> {
utils::reset_tmp_dir(&output_dir).await?;
let mut generator = {
let mut expand = Expand::new();
expand
.generated_inputs(&output_dir)
.input_corpus(&corpus_dir)
.generator_exe(&self.config.generator_exe)
.generator_options(&self.config.generator_options);
let destination_file = if config.rename_output { if let Some(tools) = &self.config.tools {
let hash = sha256::digest_file(file.path()).await?; expand.tools_dir(&tools.path);
OsString::from(hash)
} else {
file.file_name()
};
let destination_file = config.crashes.path.join(destination_file);
if tester.is_crash(file.path()).await? {
info!("Crash found, path = {}", file.path().display());
if let Err(err) = fs::rename(file.path(), &destination_file).await {
warn!("Unable to move file {:?} : {:?}", file.path(), err);
}
}
} }
verbose!( let generator_path = expand.evaluate_value(&self.config.generator_exe)?;
"Tested generated inputs for corpus = {}",
corpus_dir.display() let mut generator = Command::new(&generator_path);
); generator
} .kill_on_drop(true)
.env_remove("RUST_LOG")
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for arg in expand.evaluate(&self.config.generator_options)? {
generator.arg(arg);
}
for (k, v) in &self.config.generator_env {
generator.env(k, expand.evaluate_value(v)?);
}
generator
};
info!("Generating test cases with {:?}", generator);
let output = generator.spawn()?;
monitor_process(output, "generator".to_string(), true, None).await?;
Ok(())
} }
} }
@ -187,16 +194,22 @@ mod tests {
#[tokio::test] #[tokio::test]
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
#[ignore] #[ignore]
async fn test_radamsa_linux() { async fn test_radamsa_linux() -> anyhow::Result<()> {
use super::*; use super::{Config, GeneratorTask};
use crate::tasks::config::CommonConfig;
use onefuzz::syncdir::SyncedDir;
use std::collections::HashMap;
use std::env; use std::env;
use std::path::Path;
use tempfile::tempdir;
let radamsa_path = env::var("ONEFUZZ_TEST_RADAMSA_LINUX").unwrap(); let crashes_temp = tempfile::tempdir()?;
let corpus_dir_temp = tempfile::tempdir().unwrap(); let crashes = crashes_temp.path();
let corpus_dir = corpus_dir_temp.into_path();
let seed_file_name = corpus_dir.clone().join("seed.txt"); let inputs_temp = tempfile::tempdir().unwrap();
let radamsa_output_temp = tempfile::tempdir().unwrap(); let inputs = inputs_temp.path();
let radamsa_output = radamsa_output_temp.into_path(); let input_file = inputs.join("seed.txt");
tokio::fs::write(input_file, "test").await?;
let generator_options: Vec<String> = vec![ let generator_options: Vec<String> = vec![
"-o", "-o",
@ -210,22 +223,45 @@ mod tests {
.map(|p| p.to_string()) .map(|p| p.to_string())
.collect(); .collect();
let radamsa_path = env::var("ONEFUZZ_TEST_RADAMSA_LINUX")?;
let radamsa_as_path = Path::new(&radamsa_path); let radamsa_as_path = Path::new(&radamsa_path);
let radamsa_dir = radamsa_as_path.parent().unwrap(); let radamsa_dir = radamsa_as_path.parent().unwrap();
let radamsa_exe = String::from("{tools_dir}/radamsa");
let radamsa_env = HashMap::new();
tokio::fs::write(seed_file_name, "test").await.unwrap(); let config = Config {
let _output = generate_input( generator_exe: String::from("{tools_dir}/radamsa"),
&radamsa_exe, generator_options,
&radamsa_env, readonly_inputs: vec![SyncedDir {
&generator_options, path: inputs.to_path_buf(),
&radamsa_dir, url: None,
corpus_dir, }],
radamsa_output.clone(), crashes: SyncedDir {
) path: crashes.to_path_buf(),
.await; url: None,
let generated_outputs = std::fs::read_dir(radamsa_output.clone()).unwrap(); },
assert_eq!(generated_outputs.count(), 100, "No crashes generated"); tools: Some(SyncedDir {
path: radamsa_dir.to_path_buf(),
url: None,
}),
target_exe: Default::default(),
target_env: Default::default(),
target_options: Default::default(),
target_timeout: None,
check_asan_log: false,
check_debugger: false,
rename_output: false,
ensemble_sync_delay: None,
generator_env: HashMap::default(),
check_retry_count: 0,
common: CommonConfig::default(),
};
let task = GeneratorTask::new(config);
let generated_inputs = tempdir()?;
task.generate_inputs(inputs.to_path_buf(), generated_inputs.path())
.await?;
let count = std::fs::read_dir(generated_inputs.path())?.count();
assert_eq!(count, 100, "No inputs generated");
Ok(())
} }
} }

View File

@ -36,6 +36,11 @@ const PROC_INFO_PERIOD: Duration = Duration::from_secs(30);
// Period of reporting fuzzer-generated runtime stats. // Period of reporting fuzzer-generated runtime stats.
const RUNTIME_STATS_PERIOD: Duration = Duration::from_secs(60); const RUNTIME_STATS_PERIOD: Duration = Duration::from_secs(60);
pub fn default_workers() -> u64 {
let cpus = num_cpus::get() as u64;
u64::max(1, cpus - 1)
}
#[derive(Debug, Deserialize, Clone)] #[derive(Debug, Deserialize, Clone)]
pub struct Config { pub struct Config {
pub inputs: SyncedDir, pub inputs: SyncedDir,
@ -44,7 +49,9 @@ pub struct Config {
pub target_exe: PathBuf, pub target_exe: PathBuf,
pub target_env: HashMap<String, String>, pub target_env: HashMap<String, String>,
pub target_options: Vec<String>, pub target_options: Vec<String>,
pub target_workers: Option<u64>,
#[serde(default = "default_workers")]
pub target_workers: u64,
pub ensemble_sync_delay: Option<u64>, pub ensemble_sync_delay: Option<u64>,
#[serde(default = "default_bool_true")] #[serde(default = "default_bool_true")]
@ -66,23 +73,16 @@ impl LibFuzzerFuzzTask {
Ok(Self { config }) Ok(Self { config })
} }
pub async fn start(&self) -> Result<()> { fn workers(&self) -> u64 {
if self.config.check_fuzzer_help { match self.config.target_workers {
let target = LibFuzzer::new( 0 => default_workers(),
&self.config.target_exe, x => x,
&self.config.target_options,
&self.config.target_env,
&self.config.common.setup_dir,
);
target.check_help().await?;
} }
}
let workers = self.config.target_workers.unwrap_or_else(|| { pub async fn run(&self) -> Result<()> {
let cpus = num_cpus::get() as u64;
u64::max(1, cpus - 1)
});
self.init_directories().await?; self.init_directories().await?;
let hb_client = self.config.common.init_heartbeat().await?; let hb_client = self.config.common.init_heartbeat().await?;
// To be scheduled. // To be scheduled.
@ -91,15 +91,19 @@ impl LibFuzzerFuzzTask {
let new_crashes = self.config.crashes.monitor_results(new_result); let new_crashes = self.config.crashes.monitor_results(new_result);
let (stats_sender, stats_receiver) = mpsc::unbounded_channel(); let (stats_sender, stats_receiver) = mpsc::unbounded_channel();
let report_stats = report_runtime_stats(workers as usize, stats_receiver, hb_client); let report_stats = report_runtime_stats(self.workers() as usize, stats_receiver, hb_client);
let fuzzers = self.run_fuzzers(Some(&stats_sender));
futures::try_join!(resync, new_inputs, new_crashes, fuzzers, report_stats)?;
let fuzzers: Vec<_> = (0..workers) Ok(())
.map(|id| self.start_fuzzer_monitor(id, Some(&stats_sender))) }
pub async fn run_fuzzers(&self, stats_sender: Option<&StatsSender>) -> Result<()> {
let fuzzers: Vec<_> = (0..self.workers())
.map(|id| self.start_fuzzer_monitor(id, stats_sender))
.collect(); .collect();
let fuzzers = try_join_all(fuzzers); try_join_all(fuzzers).await?;
futures::try_join!(resync, new_inputs, new_crashes, fuzzers, report_stats)?;
Ok(()) Ok(())
} }
@ -146,7 +150,7 @@ impl LibFuzzerFuzzTask {
let crash_dir = tempdir()?; let crash_dir = tempdir()?;
let run_id = Uuid::new_v4(); let run_id = Uuid::new_v4();
info!("starting fuzzer run, run_id = {}", run_id); verbose!("starting fuzzer run, run_id = {}", run_id);
let mut inputs = vec![&self.config.inputs.path]; let mut inputs = vec![&self.config.inputs.path];
if let Some(readonly_inputs) = &self.config.readonly_inputs { if let Some(readonly_inputs) = &self.config.readonly_inputs {

View File

@ -5,8 +5,9 @@ use std::{fmt, path::PathBuf};
use anyhow::Result; use anyhow::Result;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use onefuzz::{blob::BlobUrl, fs::OwnedDir, jitter::delay_with_jitter, syncdir::SyncedDir}; use onefuzz::{blob::BlobUrl, jitter::delay_with_jitter, syncdir::SyncedDir};
use reqwest::Url; use reqwest::Url;
use tempfile::{tempdir, TempDir};
use tokio::{fs, time::Duration}; use tokio::{fs, time::Duration};
mod callback; mod callback;
@ -17,12 +18,12 @@ const POLL_INTERVAL: Duration = Duration::from_secs(10);
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Debug)]
pub enum State<M> { pub enum State<M> {
Ready, Ready,
Polled(Option<M>), Polled(Option<M>),
Parsed(M, Url), Parsed(M, Url),
Downloaded(M, Url, PathBuf), Downloaded(M, Url, PathBuf, TempDir),
Processed(M), Processed(M),
} }
@ -78,10 +79,6 @@ impl<'a, M> fmt::Debug for Event<'a, M> {
/// application data (here, the input URL, in some encoding) and metadata for /// application data (here, the input URL, in some encoding) and metadata for
/// operations like finalizing a dequeue with a pop receipt. /// operations like finalizing a dequeue with a pop receipt.
pub struct InputPoller<M> { pub struct InputPoller<M> {
/// Agent-local directory where the poller will download inputs.
/// Will be reset for each new input.
working_dir: OwnedDir,
/// Internal automaton state. /// Internal automaton state.
/// ///
/// This is only nullable so we can internally `take()` the current state /// This is only nullable so we can internally `take()` the current state
@ -92,12 +89,10 @@ pub struct InputPoller<M> {
} }
impl<M> InputPoller<M> { impl<M> InputPoller<M> {
pub fn new(working_dir: impl Into<PathBuf>) -> Self { pub fn new() -> Self {
let working_dir = OwnedDir::new(working_dir);
let state = Some(State::Ready); let state = Some(State::Ready);
Self { Self {
state, state,
working_dir,
batch_dir: None, batch_dir: None,
} }
} }
@ -109,11 +104,14 @@ impl<M> InputPoller<M> {
to_process: &SyncedDir, to_process: &SyncedDir,
) -> Result<()> { ) -> Result<()> {
self.batch_dir = Some(to_process.clone()); self.batch_dir = Some(to_process.clone());
to_process.init_pull().await?; if to_process.url.is_some() {
to_process.init_pull().await?;
}
info!("batch processing directory: {}", to_process.path.display());
let mut read_dir = fs::read_dir(&to_process.path).await?; let mut read_dir = fs::read_dir(&to_process.path).await?;
while let Some(file) = read_dir.next().await { while let Some(file) = read_dir.next().await {
verbose!("Processing batch-downloaded input {:?}", file); info!("Processing batch-downloaded input {:?}", file);
let file = file?; let file = file?;
let path = file.path(); let path = file.path();
@ -126,7 +124,7 @@ impl<M> InputPoller<M> {
let dir_relative = input_path.strip_prefix(&dir_path)?; let dir_relative = input_path.strip_prefix(&dir_path)?;
dir_relative.display().to_string() dir_relative.display().to_string()
}; };
let url = to_process.url.blob(blob_name).url(); let url = to_process.try_url().map(|x| x.blob(blob_name).url()).ok();
processor.process(url, &path).await?; processor.process(url, &path).await?;
} }
@ -137,8 +135,8 @@ impl<M> InputPoller<M> {
pub async fn seen_in_batch(&self, url: &Url) -> Result<bool> { pub async fn seen_in_batch(&self, url: &Url) -> Result<bool> {
let result = if let Some(batch_dir) = &self.batch_dir { let result = if let Some(batch_dir) = &self.batch_dir {
if let Ok(blob) = BlobUrl::new(url.clone()) { if let Ok(blob) = BlobUrl::new(url.clone()) {
batch_dir.url.account() == blob.account() batch_dir.try_url()?.account() == blob.account()
&& batch_dir.url.container() == blob.container() && batch_dir.try_url()?.container() == blob.container()
&& batch_dir.path.join(blob.name()).exists() && batch_dir.path.join(blob.name()).exists()
} else { } else {
false false
@ -149,15 +147,6 @@ impl<M> InputPoller<M> {
Ok(result) Ok(result)
} }
/// Path to the working directory.
///
/// We will create or reset the working directory before entering the
/// `Downloaded` state, but a caller cannot otherwise assume it exists.
#[allow(unused)]
pub fn working_dir(&self) -> &OwnedDir {
&self.working_dir
}
/// Get the current automaton state, including the state data. /// Get the current automaton state, including the state data.
pub fn state(&self) -> &State<M> { pub fn state(&self) -> &State<M> {
self.state.as_ref().unwrap_or_else(|| unreachable!()) self.state.as_ref().unwrap_or_else(|| unreachable!())
@ -168,13 +157,14 @@ impl<M> InputPoller<M> {
} }
pub async fn run(&mut self, mut cb: impl Callback<M>) -> Result<()> { pub async fn run(&mut self, mut cb: impl Callback<M>) -> Result<()> {
info!("starting input queue polling");
loop { loop {
match self.state() { match self.state() {
State::Polled(None) => { State::Polled(None) => {
verbose!("Input queue empty, sleeping"); verbose!("Input queue empty, sleeping");
delay_with_jitter(POLL_INTERVAL).await; delay_with_jitter(POLL_INTERVAL).await;
} }
State::Downloaded(_msg, _url, input) => { State::Downloaded(_msg, _url, input, _tempdir) => {
info!("Processing downloaded input: {:?}", input); info!("Processing downloaded input: {:?}", input);
} }
_ => {} _ => {}
@ -249,21 +239,23 @@ impl<M> InputPoller<M> {
} }
} }
(Parsed(msg, url), Download(downloader)) => { (Parsed(msg, url), Download(downloader)) => {
self.working_dir.reset().await?; let download_dir = tempdir()?;
if self.seen_in_batch(&url).await? { if self.seen_in_batch(&url).await? {
verbose!("url was seen during batch processing: {:?}", url); verbose!("url was seen during batch processing: {:?}", url);
self.set_state(Processed(msg)); self.set_state(Processed(msg));
} else { } else {
let input = downloader let input = downloader
.download(url.clone(), self.working_dir.path()) .download(url.clone(), download_dir.path())
.await?; .await?;
self.set_state(Downloaded(msg, url, input)); self.set_state(Downloaded(msg, url, input, download_dir));
} }
} }
(Downloaded(msg, url, input), Process(processor)) => { // NOTE: _download_dir is a TempDir, which the physical path gets
processor.process(url, &input).await?; // deleted automatically upon going out of scope. Keep it in-scope until
// here.
(Downloaded(msg, url, input, _download_dir), Process(processor)) => {
processor.process(Some(url), &input).await?;
self.set_state(Processed(msg)); self.set_state(Processed(msg));
} }

View File

@ -26,7 +26,7 @@ pub trait Downloader {
#[async_trait] #[async_trait]
pub trait Processor { pub trait Processor {
async fn process(&mut self, url: Url, input: &Path) -> Result<()>; async fn process(&mut self, url: Option<Url>, input: &Path) -> Result<()>;
} }
pub trait Callback<M> { pub trait Callback<M> {

View File

@ -5,7 +5,6 @@ use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use reqwest::Url; use reqwest::Url;
use std::path::Path; use std::path::Path;
use tempfile::{tempdir, TempDir};
use super::*; use super::*;
@ -84,12 +83,12 @@ impl Downloader for TestDownloader {
#[derive(Default)] #[derive(Default)]
struct TestProcessor { struct TestProcessor {
processed: Vec<(Url, PathBuf)>, processed: Vec<(Option<Url>, PathBuf)>,
} }
#[async_trait] #[async_trait]
impl Processor for TestProcessor { impl Processor for TestProcessor {
async fn process(&mut self, url: Url, input: &Path) -> Result<()> { async fn process(&mut self, url: Option<Url>, input: &Path) -> Result<()> {
self.processed.push((url, input.to_owned())); self.processed.push((url, input.to_owned()));
Ok(()) Ok(())
@ -100,11 +99,10 @@ fn url_input_name(url: &Url) -> String {
url.path_segments().unwrap().last().unwrap().to_owned() url.path_segments().unwrap().last().unwrap().to_owned()
} }
fn fixture() -> (TempDir, InputPoller<Msg>) { fn fixture() -> InputPoller<Msg> {
let dir = tempdir().unwrap(); let task = InputPoller::new();
let task = InputPoller::new(dir.path());
(dir, task) task
} }
fn url_fixture(msg: Msg) -> Url { fn url_fixture(msg: Msg) -> Url {
@ -118,7 +116,7 @@ fn input_fixture(dir: &Path, msg: Msg) -> PathBuf {
#[tokio::test] #[tokio::test]
async fn test_ready_poll() { async fn test_ready_poll() {
let (_, mut task) = fixture(); let mut task = fixture();
let msg: Msg = 0; let msg: Msg = 0;
@ -135,7 +133,7 @@ async fn test_ready_poll() {
#[tokio::test] #[tokio::test]
async fn test_polled_some_parse() { async fn test_polled_some_parse() {
let (_, mut task) = fixture(); let mut task = fixture();
let msg: Msg = 0; let msg: Msg = 0;
let url = url_fixture(msg); let url = url_fixture(msg);
@ -153,7 +151,7 @@ async fn test_polled_some_parse() {
#[tokio::test] #[tokio::test]
async fn test_polled_none_parse() { async fn test_polled_none_parse() {
let (_, mut task) = fixture(); let mut task = fixture();
task.set_state(State::Polled(None)); task.set_state(State::Polled(None));
@ -166,11 +164,12 @@ async fn test_polled_none_parse() {
#[tokio::test] #[tokio::test]
async fn test_parsed_download() { async fn test_parsed_download() {
let (dir, mut task) = fixture(); let mut task = fixture();
let dir = Path::new("etc");
let msg: Msg = 0; let msg: Msg = 0;
let url = url_fixture(msg); let url = url_fixture(msg);
let input = input_fixture(dir.path(), msg); let input = input_fixture(&dir, msg);
task.set_state(State::Parsed(msg, url.clone())); task.set_state(State::Parsed(msg, url.clone()));
@ -180,17 +179,27 @@ async fn test_parsed_download() {
.await .await
.unwrap(); .unwrap();
assert_eq!(task.state(), &State::Downloaded(msg, url.clone(), input)); match task.state() {
assert_eq!(downloader.downloaded, vec![url]); State::Downloaded(got_msg, got_url, got_path) => {
assert_eq!(*got_msg, msg);
assert_eq!(*got_url, url);
assert_eq!(got_path.file_name(), input.file_name());
}
_ => {
panic!("unexpected state");
}
}
} }
#[tokio::test] #[tokio::test]
async fn test_downloaded_process() { async fn test_downloaded_process() {
let (dir, mut task) = fixture(); let mut task = fixture();
let dir = Path::new("etc");
let msg: Msg = 0; let msg: Msg = 0;
let url = url_fixture(msg); let url = url_fixture(msg);
let input = input_fixture(dir.path(), msg); let input = input_fixture(dir, msg);
task.set_state(State::Downloaded(msg, url.clone(), input.clone())); task.set_state(State::Downloaded(msg, url.clone(), input.clone()));
@ -199,12 +208,12 @@ async fn test_downloaded_process() {
task.trigger(Event::Process(&mut processor)).await.unwrap(); task.trigger(Event::Process(&mut processor)).await.unwrap();
assert_eq!(task.state(), &State::Processed(msg)); assert_eq!(task.state(), &State::Processed(msg));
assert_eq!(processor.processed, vec![(url, input)]); assert_eq!(processor.processed, vec![(Some(url), input)]);
} }
#[tokio::test] #[tokio::test]
async fn test_processed_finish() { async fn test_processed_finish() {
let (_, mut task) = fixture(); let mut task = fixture();
let msg: Msg = 0; let msg: Msg = 0;
@ -220,7 +229,7 @@ async fn test_processed_finish() {
#[tokio::test] #[tokio::test]
async fn test_invalid_trigger() { async fn test_invalid_trigger() {
let (_, mut task) = fixture(); let mut task = fixture();
let mut queue = TestQueue::default(); let mut queue = TestQueue::default();
@ -233,7 +242,7 @@ async fn test_invalid_trigger() {
#[tokio::test] #[tokio::test]
async fn test_valid_trigger_failed_action() { async fn test_valid_trigger_failed_action() {
let (_, mut task) = fixture(); let mut task = fixture();
let mut queue = TestQueueAlwaysFails; let mut queue = TestQueueAlwaysFails;

View File

@ -4,22 +4,27 @@
use anyhow::Result; use anyhow::Result;
use onefuzz::{ use onefuzz::{
asan::AsanLog, asan::AsanLog,
blob::{BlobClient, BlobContainerUrl, BlobUrl}, blob::{BlobClient, BlobUrl},
fs::exists,
syncdir::SyncedDir, syncdir::SyncedDir,
telemetry::Event::{new_report, new_unable_to_reproduce, new_unique_report}, telemetry::{
Event::{new_report, new_unable_to_reproduce, new_unique_report},
EventData,
},
}; };
use reqwest::{StatusCode, Url};
use reqwest::StatusCode;
use reqwest_retry::SendRetry; use reqwest_retry::SendRetry;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::PathBuf; use std::path::PathBuf;
use tokio::fs;
use uuid::Uuid; use uuid::Uuid;
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub struct CrashReport { pub struct CrashReport {
pub input_sha256: String, pub input_sha256: String,
pub input_blob: InputBlob, #[serde(skip_serializing_if = "Option::is_none")]
pub input_blob: Option<InputBlob>,
pub executable: PathBuf, pub executable: PathBuf,
@ -44,7 +49,8 @@ pub struct CrashReport {
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub struct NoCrash { pub struct NoCrash {
pub input_sha256: String, pub input_sha256: String,
pub input_blob: InputBlob, #[serde(skip_serializing_if = "Option::is_none")]
pub input_blob: Option<InputBlob>,
pub executable: PathBuf, pub executable: PathBuf,
pub task_id: Uuid, pub task_id: Uuid,
pub job_id: Uuid, pub job_id: Uuid,
@ -59,44 +65,44 @@ pub enum CrashTestResult {
} }
// Conditionally upload a report, if it would not be a duplicate. // Conditionally upload a report, if it would not be a duplicate.
// async fn upload<T: Serialize>(report: &T, url: Url) -> Result<bool> {
// Use SHA-256 of call stack as dedupe key.
async fn upload_deduped(report: &CrashReport, container: &BlobContainerUrl) -> Result<()> {
let blob = BlobClient::new(); let blob = BlobClient::new();
let deduped_name = report.unique_blob_name();
let deduped_url = container.blob(deduped_name).url();
let result = blob let result = blob
.put(deduped_url) .put(url)
.json(report) .json(report)
// Conditional PUT, only if-not-exists. // Conditional PUT, only if-not-exists.
// https://docs.microsoft.com/en-us/rest/api/storageservices/specifying-conditional-headers-for-blob-service-operations // https://docs.microsoft.com/en-us/rest/api/storageservices/specifying-conditional-headers-for-blob-service-operations
.header("If-None-Match", "*") .header("If-None-Match", "*")
.send_retry_default() .send_retry_default()
.await?; .await?;
if result.status() == StatusCode::CREATED { Ok(result.status() == StatusCode::CREATED)
event!(new_unique_report;); }
async fn upload_or_save_local<T: Serialize>(
report: &T,
dest_name: &str,
container: &SyncedDir,
) -> Result<bool> {
match &container.url {
Some(blob_url) => {
let url = blob_url.blob(dest_name).url();
upload(report, url).await
}
None => {
let path = container.path.join(dest_name);
if !exists(&path).await? {
let data = serde_json::to_vec(&report)?;
fs::write(path, data).await?;
Ok(true)
} else {
Ok(false)
}
}
} }
Ok(())
}
async fn upload_report(report: &CrashReport, container: &BlobContainerUrl) -> Result<()> {
event!(new_report;);
let blob = BlobClient::new();
let url = container.blob(report.blob_name()).url();
blob.put(url).json(report).send_retry_default().await?;
Ok(())
}
async fn upload_no_repro(report: &NoCrash, container: &BlobContainerUrl) -> Result<()> {
event!(new_unable_to_reproduce;);
let blob = BlobClient::new();
let url = container.blob(report.blob_name()).url();
blob.put(url).json(report).send_retry_default().await?;
Ok(())
} }
impl CrashTestResult { impl CrashTestResult {
pub async fn upload( pub async fn save(
&self, &self,
unique_reports: &SyncedDir, unique_reports: &SyncedDir,
reports: &Option<SyncedDir>, reports: &Option<SyncedDir>,
@ -104,14 +110,26 @@ impl CrashTestResult {
) -> Result<()> { ) -> Result<()> {
match self { match self {
Self::CrashReport(report) => { Self::CrashReport(report) => {
upload_deduped(report, &unique_reports.url).await?; // Use SHA-256 of call stack as dedupe key.
let name = report.unique_blob_name();
if upload_or_save_local(&report, &name, unique_reports).await? {
event!(new_unique_report; EventData::Path = name);
}
if let Some(reports) = reports { if let Some(reports) = reports {
upload_report(report, &reports.url).await?; let name = report.blob_name();
if upload_or_save_local(&report, &name, reports).await? {
event!(new_report; EventData::Path = name);
}
} }
} }
Self::NoRepro(report) => { Self::NoRepro(report) => {
if let Some(no_repro) = no_repro { if let Some(no_repro) = no_repro {
upload_no_repro(report, &no_repro.url).await?; let name = report.blob_name();
if upload_or_save_local(&report, &name, no_repro).await? {
event!(new_unable_to_reproduce; EventData::Path = name);
}
} }
} }
} }
@ -142,7 +160,7 @@ impl CrashReport {
task_id: Uuid, task_id: Uuid,
job_id: Uuid, job_id: Uuid,
executable: impl Into<PathBuf>, executable: impl Into<PathBuf>,
input_blob: InputBlob, input_blob: Option<InputBlob>,
input_sha256: String, input_sha256: String,
) -> Self { ) -> Self {
Self { Self {

View File

@ -10,7 +10,10 @@ use crate::tasks::{
}; };
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use onefuzz::{blob::BlobUrl, input_tester::Tester, sha256, syncdir::SyncedDir}; use futures::stream::StreamExt;
use onefuzz::{
blob::BlobUrl, input_tester::Tester, monitor::DirectoryMonitor, sha256, syncdir::SyncedDir,
};
use reqwest::Url; use reqwest::Url;
use serde::Deserialize; use serde::Deserialize;
use std::{ use std::{
@ -44,35 +47,64 @@ pub struct Config {
#[serde(default)] #[serde(default)]
pub check_retry_count: u64, pub check_retry_count: u64,
#[serde(default = "default_bool_true")]
pub check_queue: bool,
#[serde(flatten)] #[serde(flatten)]
pub common: CommonConfig, pub common: CommonConfig,
} }
pub struct ReportTask<'a> { pub struct ReportTask {
config: &'a Config, config: Config,
poller: InputPoller<Message>, poller: InputPoller<Message>,
} }
impl<'a> ReportTask<'a> { impl ReportTask {
pub fn new(config: &'a Config) -> Self { pub fn new(config: Config) -> Self {
let working_dir = config.common.task_id.to_string(); let poller = InputPoller::new();
let poller = InputPoller::new(working_dir);
Self { config, poller } Self { config, poller }
} }
pub async fn run(&mut self) -> Result<()> { pub async fn local_run(&self) -> Result<()> {
let mut processor = GenericReportProcessor::new(&self.config, None);
info!("Starting generic crash report task");
let crashes = match &self.config.crashes {
Some(x) => x,
None => bail!("missing crashes directory"),
};
let mut read_dir = tokio::fs::read_dir(&crashes.path).await?;
while let Some(crash) = read_dir.next().await {
processor.process(None, &crash?.path()).await?;
}
if self.config.check_queue {
let mut monitor = DirectoryMonitor::new(&crashes.path);
monitor.start()?;
while let Some(crash) = monitor.next().await {
processor.process(None, &crash).await?;
}
}
Ok(())
}
pub async fn managed_run(&mut self) -> Result<()> {
info!("Starting generic crash report task"); info!("Starting generic crash report task");
let heartbeat_client = self.config.common.init_heartbeat().await?; let heartbeat_client = self.config.common.init_heartbeat().await?;
let mut processor = GenericReportProcessor::new(&self.config, heartbeat_client); let mut processor = GenericReportProcessor::new(&self.config, heartbeat_client);
info!("processing existing crashes");
if let Some(crashes) = &self.config.crashes { if let Some(crashes) = &self.config.crashes {
self.poller.batch_process(&mut processor, &crashes).await?; self.poller.batch_process(&mut processor, &crashes).await?;
} }
if let Some(queue) = &self.config.input_queue { info!("processing crashes from queue");
let callback = CallbackImpl::new(queue.clone(), processor); if self.config.check_queue {
self.poller.run(callback).await?; if let Some(queue) = &self.config.input_queue {
let callback = CallbackImpl::new(queue.clone(), processor);
self.poller.run(callback).await?;
}
} }
Ok(()) Ok(())
} }
@ -105,12 +137,19 @@ impl<'a> GenericReportProcessor<'a> {
} }
} }
pub async fn test_input(&self, input_url: Url, input: &Path) -> Result<CrashTestResult> { pub async fn test_input(
&self,
input_url: Option<Url>,
input: &Path,
) -> Result<CrashTestResult> {
self.heartbeat_client.alive(); self.heartbeat_client.alive();
let input_sha256 = sha256::digest_file(input).await?; let input_sha256 = sha256::digest_file(input).await?;
let task_id = self.config.common.task_id; let task_id = self.config.common.task_id;
let job_id = self.config.common.job_id; let job_id = self.config.common.job_id;
let input_blob = InputBlob::from(BlobUrl::new(input_url)?); let input_blob = match input_url {
Some(x) => Some(InputBlob::from(BlobUrl::new(x)?)),
None => None,
};
let test_report = self.tester.test_input(input).await?; let test_report = self.tester.test_input(input).await?;
@ -160,10 +199,11 @@ impl<'a> GenericReportProcessor<'a> {
#[async_trait] #[async_trait]
impl<'a> Processor for GenericReportProcessor<'a> { impl<'a> Processor for GenericReportProcessor<'a> {
async fn process(&mut self, url: Url, input: &Path) -> Result<()> { async fn process(&mut self, url: Option<Url>, input: &Path) -> Result<()> {
verbose!("generating crash report for: {}", input.display());
let report = self.test_input(url, input).await?; let report = self.test_input(url, input).await?;
report report
.upload( .save(
&self.config.unique_reports, &self.config.unique_reports,
&self.config.reports, &self.config.reports,
&self.config.no_repro, &self.config.no_repro,

View File

@ -5,9 +5,12 @@ use super::crash_report::*;
use crate::tasks::{ use crate::tasks::{
config::CommonConfig, generic::input_poller::*, heartbeat::*, utils::default_bool_true, config::CommonConfig, generic::input_poller::*, heartbeat::*, utils::default_bool_true,
}; };
use anyhow::Result; use anyhow::{Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use onefuzz::{blob::BlobUrl, libfuzzer::LibFuzzer, sha256, syncdir::SyncedDir}; use futures::stream::StreamExt;
use onefuzz::{
blob::BlobUrl, libfuzzer::LibFuzzer, monitor::DirectoryMonitor, sha256, syncdir::SyncedDir,
};
use reqwest::Url; use reqwest::Url;
use serde::Deserialize; use serde::Deserialize;
use std::{ use std::{
@ -36,6 +39,9 @@ pub struct Config {
#[serde(default)] #[serde(default)]
pub check_retry_count: u64, pub check_retry_count: u64,
#[serde(default = "default_bool_true")]
pub check_queue: bool,
#[serde(flatten)] #[serde(flatten)]
pub common: CommonConfig, pub common: CommonConfig,
} }
@ -46,26 +52,52 @@ pub struct ReportTask {
} }
impl ReportTask { impl ReportTask {
pub fn new(config: impl Into<Arc<Config>>) -> Self { pub fn new(config: Config) -> Self {
let config = config.into(); let poller = InputPoller::new();
let config = Arc::new(config);
let working_dir = config.common.task_id.to_string();
let poller = InputPoller::new(working_dir);
Self { config, poller } Self { config, poller }
} }
pub async fn run(&mut self) -> Result<()> { pub async fn local_run(&self) -> Result<()> {
if self.config.check_fuzzer_help { let mut processor = AsanProcessor::new(self.config.clone()).await?;
let target = LibFuzzer::new( let crashes = match &self.config.crashes {
&self.config.target_exe, Some(x) => x,
&self.config.target_options, None => bail!("missing crashes directory"),
&self.config.target_env, };
&self.config.common.setup_dir, crashes.init().await?;
);
target.check_help().await?; self.config.unique_reports.init().await?;
if let Some(reports) = &self.config.reports {
reports.init().await?;
}
if let Some(no_repro) = &self.config.no_repro {
no_repro.init().await?;
} }
let mut read_dir = tokio::fs::read_dir(&crashes.path).await.with_context(|| {
format_err!(
"unable to read crashes directory {}",
crashes.path.display()
)
})?;
while let Some(crash) = read_dir.next().await {
processor.process(None, &crash?.path()).await?;
}
if self.config.check_queue {
let mut monitor = DirectoryMonitor::new(crashes.path.clone());
monitor.start()?;
while let Some(crash) = monitor.next().await {
processor.process(None, &crash).await?;
}
}
Ok(())
}
pub async fn managed_run(&mut self) -> Result<()> {
info!("Starting libFuzzer crash report task"); info!("Starting libFuzzer crash report task");
let mut processor = AsanProcessor::new(self.config.clone()).await?; let mut processor = AsanProcessor::new(self.config.clone()).await?;
@ -73,9 +105,11 @@ impl ReportTask {
self.poller.batch_process(&mut processor, crashes).await?; self.poller.batch_process(&mut processor, crashes).await?;
} }
if let Some(queue) = &self.config.input_queue { if self.config.check_queue {
let callback = CallbackImpl::new(queue.clone(), processor); if let Some(queue) = &self.config.input_queue {
self.poller.run(callback).await?; let callback = CallbackImpl::new(queue.clone(), processor);
self.poller.run(callback).await?;
}
} }
Ok(()) Ok(())
} }
@ -96,7 +130,11 @@ impl AsanProcessor {
}) })
} }
pub async fn test_input(&self, input_url: Url, input: &Path) -> Result<CrashTestResult> { pub async fn test_input(
&self,
input_url: Option<Url>,
input: &Path,
) -> Result<CrashTestResult> {
self.heartbeat_client.alive(); self.heartbeat_client.alive();
let fuzzer = LibFuzzer::new( let fuzzer = LibFuzzer::new(
&self.config.target_exe, &self.config.target_exe,
@ -107,8 +145,13 @@ impl AsanProcessor {
let task_id = self.config.common.task_id; let task_id = self.config.common.task_id;
let job_id = self.config.common.job_id; let job_id = self.config.common.job_id;
let input_blob = InputBlob::from(BlobUrl::new(input_url)?); let input_blob = match input_url {
let input_sha256 = sha256::digest_file(input).await?; Some(x) => Some(InputBlob::from(BlobUrl::new(x)?)),
None => None,
};
let input_sha256 = sha256::digest_file(input).await.with_context(|| {
format_err!("unable to sha256 digest input file: {}", input.display())
})?;
let test_report = fuzzer let test_report = fuzzer
.repro( .repro(
@ -149,10 +192,11 @@ impl AsanProcessor {
#[async_trait] #[async_trait]
impl Processor for AsanProcessor { impl Processor for AsanProcessor {
async fn process(&mut self, url: Url, input: &Path) -> Result<()> { async fn process(&mut self, url: Option<Url>, input: &Path) -> Result<()> {
verbose!("processing libfuzzer crash url:{:?} path:{:?}", url, input);
let report = self.test_input(url, input).await?; let report = self.test_input(url, input).await?;
report report
.upload( .save(
&self.config.unique_reports, &self.config.unique_reports,
&self.config.reports, &self.config.reports,
&self.config.no_repro, &self.config.no_repro,

View File

@ -225,9 +225,8 @@ impl IWorkerRunner for WorkerRunner {
let mut cmd = Command::new("onefuzz-agent"); let mut cmd = Command::new("onefuzz-agent");
cmd.current_dir(&working_dir); cmd.current_dir(&working_dir);
cmd.arg("-c"); cmd.arg("managed");
cmd.arg("config.json"); cmd.arg("config.json");
cmd.arg("-s");
cmd.arg(setup_dir); cmd.arg(setup_dir);
cmd.stderr(Stdio::piped()); cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped()); cmd.stdout(Stdio::piped());

View File

@ -16,6 +16,7 @@ use tokio::process::{Child, Command};
const DEFAULT_MAX_TOTAL_SECONDS: i32 = 10 * 60; const DEFAULT_MAX_TOTAL_SECONDS: i32 = 10 * 60;
#[derive(Debug)]
pub struct LibFuzzerMergeOutput { pub struct LibFuzzerMergeOutput {
pub added_files_count: i32, pub added_files_count: i32,
pub added_feature_count: i32, pub added_feature_count: i32,

View File

@ -26,13 +26,18 @@ const DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS: u64 = 60;
/// A local directory optionally backed by a remote blob container.
///
/// `url` is `None` when running in local mode; sync operations then become
/// no-ops (see `sync`), leaving the directory purely local.
#[derive(Debug, Deserialize, Clone, PartialEq)]
pub struct SyncedDir {
    // Local filesystem path of the directory.
    pub path: PathBuf,
    // Remote container to sync with; `None` means local-only.
    pub url: Option<BlobContainerUrl>,
}
impl SyncedDir { impl SyncedDir {
pub async fn sync(&self, operation: SyncOperation, delete_dst: bool) -> Result<()> { pub async fn sync(&self, operation: SyncOperation, delete_dst: bool) -> Result<()> {
if self.url.is_none() {
verbose!("not syncing as SyncedDir is missing remote URL");
return Ok(());
}
let dir = &self.path; let dir = &self.path;
let url = self.url.url(); let url = self.url.as_ref().unwrap().url();
let url = url.as_ref(); let url = url.as_ref();
verbose!("syncing {:?} {}", operation, dir.display()); verbose!("syncing {:?} {}", operation, dir.display());
match operation { match operation {
@ -41,6 +46,14 @@ impl SyncedDir {
} }
} }
/// Return the remote container URL, failing when this `SyncedDir` is
/// local-only (no remote URL was configured).
pub fn try_url(&self) -> Result<&BlobContainerUrl> {
    match self.url.as_ref() {
        Some(url) => Ok(url),
        None => bail!("missing URL context"),
    }
}
pub async fn init_pull(&self) -> Result<()> { pub async fn init_pull(&self) -> Result<()> {
self.init().await?; self.init().await?;
self.sync(SyncOperation::Pull, false).await self.sync(SyncOperation::Pull, false).await
@ -55,7 +68,7 @@ impl SyncedDir {
anyhow::bail!("File with name '{}' already exists", self.path.display()); anyhow::bail!("File with name '{}' already exists", self.path.display());
} }
} }
Err(_) => fs::create_dir(&self.path).await.with_context(|| { Err(_) => fs::create_dir_all(&self.path).await.with_context(|| {
format!("unable to create local SyncedDir: {}", self.path.display()) format!("unable to create local SyncedDir: {}", self.path.display())
}), }),
} }
@ -74,6 +87,11 @@ impl SyncedDir {
operation: SyncOperation, operation: SyncOperation,
delay_seconds: Option<u64>, delay_seconds: Option<u64>,
) -> Result<()> { ) -> Result<()> {
if self.url.is_none() {
verbose!("not continuously syncing, as SyncDir does not have a remote URL");
return Ok(());
}
let delay_seconds = delay_seconds.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS); let delay_seconds = delay_seconds.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS);
if delay_seconds == 0 { if delay_seconds == 0 {
return Ok(()); return Ok(());
@ -86,23 +104,24 @@ impl SyncedDir {
} }
} }
async fn file_uploader_monitor(&self, event: Event) -> Result<()> { async fn file_monitor_event(&self, event: Event) -> Result<()> {
let url = self.url.url();
verbose!("monitoring {}", self.path.display()); verbose!("monitoring {}", self.path.display());
let mut monitor = DirectoryMonitor::new(self.path.clone()); let mut monitor = DirectoryMonitor::new(self.path.clone());
monitor.start()?; monitor.start()?;
let mut uploader = BlobUploader::new(url);
let mut uploader = self.url.as_ref().map(|x| BlobUploader::new(x.url()));
while let Some(item) = monitor.next().await { while let Some(item) = monitor.next().await {
event!(event.clone(); EventData::Path = item.display().to_string()); event!(event.clone(); EventData::Path = item.display().to_string());
if let Err(err) = uploader.upload(item.clone()).await { if let Some(uploader) = &mut uploader {
bail!( if let Err(err) = uploader.upload(item.clone()).await {
"Couldn't upload file. path:{} dir:{} err:{}", bail!(
item.display(), "Couldn't upload file. path:{} dir:{} err:{}",
self.path.display(), item.display(),
err self.path.display(),
); err
);
}
} }
} }
@ -129,16 +148,34 @@ impl SyncedDir {
} }
verbose!("starting monitor for {}", self.path.display()); verbose!("starting monitor for {}", self.path.display());
self.file_uploader_monitor(event.clone()).await?; self.file_monitor_event(event.clone()).await?;
} }
} }
} }
// Build a local-only SyncedDir (no remote URL) directly from a directory path.
impl From<PathBuf> for SyncedDir {
    fn from(path: PathBuf) -> Self {
        Self { path, url: None }
    }
}
pub async fn continuous_sync( pub async fn continuous_sync(
dirs: &[SyncedDir], dirs: &[SyncedDir],
operation: SyncOperation, operation: SyncOperation,
delay_seconds: Option<u64>, delay_seconds: Option<u64>,
) -> Result<()> { ) -> Result<()> {
let mut should_loop = false;
for dir in dirs {
if dir.url.is_some() {
should_loop = true;
break;
}
}
if !should_loop {
verbose!("not syncing as SyncDirs do not have remote URLs");
return Ok(());
}
let delay_seconds = delay_seconds.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS); let delay_seconds = delay_seconds.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS);
if delay_seconds == 0 { if delay_seconds == 0 {
return Ok(()); return Ok(());

View File

@ -309,6 +309,16 @@ pub fn set_property(entry: EventData) {
} }
} }
/// Mirror a telemetry event to the local logger so events remain visible
/// when running without a telemetry backend (e.g. in local mode).
fn local_log_event(event: &Event, properties: &[EventData]) {
    // Render properties as space-separated "name:value" pairs.
    let mut as_values = String::new();
    for (name, value) in properties.iter().map(|p| p.as_values()) {
        if !as_values.is_empty() {
            as_values.push(' ');
        }
        as_values.push_str(&format!("{}:{}", name, value));
    }
    log::info!("{} {}", event.as_str(), as_values);
}
pub fn track_event(event: Event, properties: Vec<EventData>) { pub fn track_event(event: Event, properties: Vec<EventData>) {
use appinsights::telemetry::Telemetry; use appinsights::telemetry::Telemetry;
@ -334,6 +344,7 @@ pub fn track_event(event: Event, properties: Vec<EventData>) {
} }
client.track(evt); client.track(evt);
} }
local_log_event(&event, &properties);
} }
#[macro_export] #[macro_export]

View File

@ -154,7 +154,7 @@ def build_work_unit(task: Task) -> Optional[Tuple[BucketConfig, WorkUnit]]:
job_id=task_config.job_id, job_id=task_config.job_id,
task_id=task_config.task_id, task_id=task_config.task_id,
task_type=task_config.task_type, task_type=task_config.task_type,
config=task_config.json(), config=task_config.json(exclude_none=True, exclude_unset=True),
) )
bucket_config = BucketConfig( bucket_config = BucketConfig(