Adding option to merge all inputs at once (#282)

This commit is contained in:
Cheick Keita
2020-11-24 05:43:08 -08:00
committed by GitHub
parent 79cc82098a
commit 33b7608aaf
14 changed files with 316 additions and 66 deletions

View File

@ -265,6 +265,10 @@ Each event will be submitted via HTTP POST to the user provided URL.
"ensemble_sync_delay": { "ensemble_sync_delay": {
"title": "Ensemble Sync Delay", "title": "Ensemble Sync Delay",
"type": "integer" "type": "integer"
},
"preserve_existing_outputs": {
"title": "Preserve Existing Outputs",
"type": "boolean"
} }
}, },
"required": [ "required": [
@ -884,6 +888,10 @@ Each event will be submitted via HTTP POST to the user provided URL.
"ensemble_sync_delay": { "ensemble_sync_delay": {
"title": "Ensemble Sync Delay", "title": "Ensemble Sync Delay",
"type": "integer" "type": "integer"
},
"preserve_existing_outputs": {
"title": "Preserve Existing Outputs",
"type": "boolean"
} }
}, },
"required": [ "required": [

View File

@ -10,6 +10,7 @@ pub fn run(args: &clap::ArgMatches) -> Result<()> {
("libfuzzer-coverage", Some(sub)) => crate::debug::libfuzzer_coverage::run(sub)?, ("libfuzzer-coverage", Some(sub)) => crate::debug::libfuzzer_coverage::run(sub)?,
("libfuzzer-crash-report", Some(sub)) => crate::debug::libfuzzer_crash_report::run(sub)?, ("libfuzzer-crash-report", Some(sub)) => crate::debug::libfuzzer_crash_report::run(sub)?,
("libfuzzer-fuzz", Some(sub)) => crate::debug::libfuzzer_fuzz::run(sub)?, ("libfuzzer-fuzz", Some(sub)) => crate::debug::libfuzzer_fuzz::run(sub)?,
("libfuzzer-merge", Some(sub)) => crate::debug::libfuzzer_merge::run(sub)?,
_ => println!("missing subcommand\nUSAGE : {}", args.usage()), _ => println!("missing subcommand\nUSAGE : {}", args.usage()),
} }
@ -23,4 +24,5 @@ pub fn args() -> App<'static, 'static> {
.subcommand(crate::debug::libfuzzer_coverage::args()) .subcommand(crate::debug::libfuzzer_coverage::args())
.subcommand(crate::debug::libfuzzer_crash_report::args()) .subcommand(crate::debug::libfuzzer_crash_report::args())
.subcommand(crate::debug::libfuzzer_fuzz::args()) .subcommand(crate::debug::libfuzzer_fuzz::args())
.subcommand(crate::debug::libfuzzer_merge::args())
} }

View File

@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::{
config::CommonConfig,
merge::libfuzzer_merge::{merge_inputs, Config},
utils::parse_key_value,
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::{blob::BlobContainerUrl, syncdir::SyncedDir};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::runtime::Runtime;
use url::Url;
use uuid::Uuid;
/// Executes a local-only libfuzzer merge task from CLI arguments parsed by `args()`.
///
/// Builds a `Config` with placeholder storage URLs and nil telemetry IDs (local
/// runs never sync to Azure storage or emit telemetry), then blocks on
/// `merge_inputs` over every configured input directory.
pub fn run(args: &clap::ArgMatches) -> Result<()> {
    let target_exe = value_t!(args, "target_exe", PathBuf)?;
    let inputs = value_t!(args, "inputs", String)?;
    let unique_inputs = value_t!(args, "unique_inputs", String)?;
    let target_options = args.values_of_lossy("target_options").unwrap_or_default();

    // Collect repeated KEY=VALUE pairs into the target process environment.
    let mut target_env = HashMap::new();
    for opt in args.values_of_lossy("target_env").unwrap_or_default() {
        let (k, v) = parse_key_value(opt)?;
        target_env.insert(k, v);
    }

    let config = Arc::new(Config {
        target_exe,
        target_env,
        target_options,
        // No input queue in local mode; the merge is driven directly below.
        input_queue: None,
        inputs: vec![SyncedDir {
            path: inputs.into(),
            // Placeholder container URL — never contacted during a local run.
            url: BlobContainerUrl::new(Url::parse("https://contoso.com/inputs")?)?,
        }],
        unique_inputs: SyncedDir {
            path: unique_inputs.into(),
            url: BlobContainerUrl::new(Url::parse("https://contoso.com/unique_inputs")?)?,
        },
        common: CommonConfig {
            heartbeat_queue: None,
            instrumentation_key: None,
            telemetry_key: None,
            // Fixed IDs: telemetry is keyed off these but unused locally.
            job_id: Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
            task_id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
            instance_id: Uuid::parse_str("22222222-2222-2222-2222-222222222222").unwrap(),
        },
        preserve_existing_outputs: true,
    });

    // Merge every configured input directory, not just inputs[0]; this also
    // drops the redundant `config.clone()` the original used only to index.
    let input_paths: Vec<_> = config.inputs.iter().map(|i| i.path.clone()).collect();

    let mut rt = Runtime::new()?;
    rt.block_on(merge_inputs(config.clone(), input_paths))?;
    Ok(())
}
/// Declares the `libfuzzer-merge` debug subcommand and its CLI arguments.
///
/// Positional: `target_exe`, `inputs`, `unique_inputs` (all required).
/// Options: `--target_options` and `--target_env`, both repeatable.
pub fn args() -> App<'static, 'static> {
    SubCommand::with_name("libfuzzer-merge")
        .about("execute a local-only libfuzzer merge task")
        .arg(
            Arg::with_name("target_exe")
                .takes_value(true)
                .required(true),
        )
        .arg(Arg::with_name("inputs").takes_value(true).required(true))
        .arg(
            Arg::with_name("unique_inputs")
                .takes_value(true)
                .required(true),
        )
        .arg(
            // `run()` reads "target_options" via values_of_lossy, but this Arg
            // was never declared, so clap could not accept it; declare it here.
            Arg::with_name("target_options")
                .long("target_options")
                .takes_value(true)
                .multiple(true),
        )
        .arg(
            Arg::with_name("target_env")
                .long("target_env")
                .takes_value(true)
                .multiple(true),
        )
}

View File

@ -6,3 +6,4 @@ pub mod generic_crash_report;
pub mod libfuzzer_coverage; pub mod libfuzzer_coverage;
pub mod libfuzzer_crash_report; pub mod libfuzzer_crash_report;
pub mod libfuzzer_fuzz; pub mod libfuzzer_fuzz;
pub mod libfuzzer_merge;

View File

@ -7,7 +7,7 @@ use onefuzz::{
http::ResponseExt, http::ResponseExt,
jitter::delay_with_jitter, jitter::delay_with_jitter,
libfuzzer::{LibFuzzer, LibFuzzerMergeOutput}, libfuzzer::{LibFuzzer, LibFuzzerMergeOutput},
syncdir::SyncedDir, syncdir::{SyncOperation, SyncedDir},
}; };
use reqwest::Url; use reqwest::Url;
use reqwest_retry::SendRetry; use reqwest_retry::SendRetry;
@ -22,7 +22,6 @@ use storage_queue::{QueueClient, EMPTY_QUEUE_DELAY};
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct QueueMessage { struct QueueMessage {
content_length: u32, content_length: u32,
url: Url, url: Url,
} }
@ -31,39 +30,52 @@ pub struct Config {
pub target_exe: PathBuf, pub target_exe: PathBuf,
pub target_env: HashMap<String, String>, pub target_env: HashMap<String, String>,
pub target_options: Vec<String>, pub target_options: Vec<String>,
pub input_queue: Url, pub input_queue: Option<Url>,
pub inputs: SyncedDir, pub inputs: Vec<SyncedDir>,
pub unique_inputs: SyncedDir, pub unique_inputs: SyncedDir,
pub preserve_existing_outputs: bool,
#[serde(flatten)] #[serde(flatten)]
pub common: CommonConfig, pub common: CommonConfig,
} }
pub async fn spawn(config: Arc<Config>) -> Result<()> { pub async fn spawn(config: Arc<Config>) -> Result<()> {
let hb_client = config.common.init_heartbeat().await?;
config.unique_inputs.init().await?; config.unique_inputs.init().await?;
loop { if let Some(url) = config.input_queue.clone() {
hb_client.alive(); loop {
if let Err(error) = process_message(config.clone()).await { let queue = QueueClient::new(url.clone());
error!( if let Err(error) = process_message(config.clone(), queue).await {
"failed to process latest message from notification queue: {}", error!(
error "failed to process latest message from notification queue: {}",
); error
);
}
} }
} else {
for input in config.inputs.iter() {
input.init().await?;
input.sync_pull().await?;
}
let input_paths = config.inputs.iter().map(|i| &i.path).collect();
sync_and_merge(
config.clone(),
input_paths,
false,
config.preserve_existing_outputs,
)
.await?;
Ok(())
} }
} }
async fn process_message(config: Arc<Config>) -> Result<()> { async fn process_message(config: Arc<Config>, mut input_queue: QueueClient) -> Result<()> {
let hb_client = config.common.init_heartbeat().await?;
hb_client.alive();
let tmp_dir = "./tmp"; let tmp_dir = "./tmp";
verbose!("tmp dir reset"); verbose!("tmp dir reset");
utils::reset_tmp_dir(tmp_dir).await?; utils::reset_tmp_dir(tmp_dir).await?;
config.unique_inputs.sync_pull().await?;
let mut queue = QueueClient::new(config.input_queue.clone()); if let Some(msg) = input_queue.pop().await? {
if let Some(msg) = queue.pop().await? {
let input_url = match utils::parse_url_data(msg.data()) { let input_url = match utils::parse_url_data(msg.data()) {
Ok(url) => url, Ok(url) => url,
Err(err) => { Err(err) => {
@ -74,28 +86,11 @@ async fn process_message(config: Arc<Config>) -> Result<()> {
let input_path = utils::download_input(input_url.clone(), tmp_dir).await?; let input_path = utils::download_input(input_url.clone(), tmp_dir).await?;
info!("downloaded input to {}", input_path.display()); info!("downloaded input to {}", input_path.display());
sync_and_merge(config.clone(), vec![tmp_dir], true, true).await?;
info!("Merging corpus");
match merge(
&config.target_exe,
&config.target_options,
&config.target_env,
&config.unique_inputs.path,
&tmp_dir,
)
.await
{
Ok(result) if result.added_files_count > 0 => {
info!("Added {} new files to the corpus", result.added_files_count);
config.unique_inputs.sync_push().await?;
}
Ok(_) => info!("No new files added by the merge"),
Err(e) => error!("Merge failed : {}", e),
}
verbose!("will delete popped message with id = {}", msg.id()); verbose!("will delete popped message with id = {}", msg.id());
queue.delete(msg).await?; input_queue.delete(msg).await?;
verbose!( verbose!(
"Attempting to delete {} from the candidate container", "Attempting to delete {} from the candidate container",
@ -113,6 +108,48 @@ async fn process_message(config: Arc<Config>) -> Result<()> {
} }
} }
/// Optionally pulls the shared unique corpus, merges `input_dirs` into it, and
/// pushes the result back when the merge added new files.
///
/// `pull_inputs`: refresh `unique_inputs` from storage before merging (used by
/// the queue-driven path; the batch path has already pulled).
/// `preserve_existing_outputs`: when false, the push deletes remote entries
/// that no longer exist locally (azcopy `--delete-destination`).
async fn sync_and_merge(
    config: Arc<Config>,
    input_dirs: Vec<impl AsRef<Path>>,
    pull_inputs: bool,
    preserve_existing_outputs: bool,
) -> Result<LibFuzzerMergeOutput> {
    // Pull first so the merge runs against the current shared corpus.
    if pull_inputs {
        config.unique_inputs.sync_pull().await?;
    }
    match merge_inputs(config.clone(), input_dirs).await {
        Ok(result) => {
            if result.added_files_count > 0 {
                info!("Added {} new files to the corpus", result.added_files_count);
                // Only push when the merge produced something new; the delete
                // flag is the inverse of preserve_existing_outputs.
                config
                    .unique_inputs
                    .sync(SyncOperation::Push, !preserve_existing_outputs)
                    .await?;
            } else {
                info!("No new files added by the merge")
            }
            Ok(result)
        }
        Err(e) => {
            // Log and propagate; the caller decides whether the task continues.
            error!("Merge failed : {}", e);
            Err(e)
        }
    }
}
/// Runs a libfuzzer corpus merge, folding every directory in `candidates`
/// into the unique-inputs corpus at `config.unique_inputs.path`.
pub async fn merge_inputs(
    config: Arc<Config>,
    candidates: Vec<impl AsRef<Path>>,
) -> Result<LibFuzzerMergeOutput> {
    info!("Merging corpus");
    let fuzzer = LibFuzzer::new(&config.target_exe, &config.target_options, &config.target_env);
    fuzzer.merge(&config.unique_inputs.path, &candidates).await
}
async fn try_delete_blob(input_url: Url) -> Result<()> { async fn try_delete_blob(input_url: Url) -> Result<()> {
let http_client = reqwest::Client::new(); let http_client = reqwest::Client::new();
match http_client match http_client
@ -126,15 +163,3 @@ async fn try_delete_blob(input_url: Url) -> Result<()> {
Err(err) => Err(err.into()), Err(err) => Err(err.into()),
} }
} }
async fn merge(
target_exe: &Path,
target_options: &[String],
target_env: &HashMap<String, String>,
corpus_dir: &Path,
candidate_dir: impl AsRef<Path>,
) -> Result<LibFuzzerMergeOutput> {
let merger = LibFuzzer::new(target_exe, target_options, target_env);
let candidates = vec![candidate_dir];
merger.merge(&corpus_dir, &candidates).await
}

View File

@ -170,8 +170,10 @@ impl State<SettingUp> {
match output { match output {
Ok(Some(output)) => { Ok(Some(output)) => {
if !output.exit_status.success { if !output.exit_status.success {
let error = "error running target setup script".to_owned();
warn!("{}", error);
let cause = DoneCause::SetupError { let cause = DoneCause::SetupError {
error: "error running target setup script".to_owned(), error,
script_output: Some(output), script_output: Some(output),
}; };
let ctx = Done { cause }; let ctx = Done { cause };
@ -182,8 +184,10 @@ impl State<SettingUp> {
// No script was executed. // No script was executed.
} }
Err(err) => { Err(err) => {
let error = err.to_string();
warn!("{}", error);
let cause = DoneCause::SetupError { let cause = DoneCause::SetupError {
error: err.to_string(), error,
script_output: None, script_output: None,
}; };
let ctx = Done { cause }; let ctx = Done { cause };

View File

@ -47,7 +47,7 @@ impl SetupRunner {
// `azcopy sync` requires the local dir to exist. // `azcopy sync` requires the local dir to exist.
fs::create_dir_all(&setup_dir).await?; fs::create_dir_all(&setup_dir).await?;
az_copy::sync(setup_url.to_string(), &setup_dir).await?; az_copy::sync(setup_url.to_string(), &setup_dir, false).await?;
verbose!( verbose!(
"synced setup container from {} to {}", "synced setup container from {} to {}",

View File

@ -5,7 +5,7 @@ use anyhow::Result;
use std::ffi::OsStr; use std::ffi::OsStr;
use tokio::process::Command; use tokio::process::Command;
pub async fn sync(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>) -> Result<()> { pub async fn sync(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>, delete_dst: bool) -> Result<()> {
use std::process::Stdio; use std::process::Stdio;
let mut cmd = Command::new("azcopy"); let mut cmd = Command::new("azcopy");
@ -17,6 +17,10 @@ pub async fn sync(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>) -> Result<()>
.arg(&src) .arg(&src)
.arg(&dst); .arg(&dst);
if delete_dst {
cmd.arg("--delete-destination");
}
let output = cmd.spawn()?.wait_with_output().await?; let output = cmd.spawn()?.wait_with_output().await?;
if !output.status.success() { if !output.status.success() {
let stdout = String::from_utf8_lossy(&output.stdout); let stdout = String::from_utf8_lossy(&output.stdout);

View File

@ -30,20 +30,20 @@ pub struct SyncedDir {
} }
impl SyncedDir { impl SyncedDir {
pub async fn sync(&self, operation: SyncOperation) -> Result<()> { pub async fn sync(&self, operation: SyncOperation, delete_dst: bool) -> Result<()> {
let dir = &self.path; let dir = &self.path;
let url = self.url.url(); let url = self.url.url();
let url = url.as_ref(); let url = url.as_ref();
verbose!("syncing {:?} {}", operation, dir.display()); verbose!("syncing {:?} {}", operation, dir.display());
match operation { match operation {
SyncOperation::Push => az_copy::sync(dir, url).await, SyncOperation::Push => az_copy::sync(dir, url, delete_dst).await,
SyncOperation::Pull => az_copy::sync(url, dir).await, SyncOperation::Pull => az_copy::sync(url, dir, delete_dst).await,
} }
} }
pub async fn init_pull(&self) -> Result<()> { pub async fn init_pull(&self) -> Result<()> {
self.init().await?; self.init().await?;
self.sync(SyncOperation::Pull).await self.sync(SyncOperation::Pull, false).await
} }
pub async fn init(&self) -> Result<()> { pub async fn init(&self) -> Result<()> {
@ -60,11 +60,11 @@ impl SyncedDir {
} }
pub async fn sync_pull(&self) -> Result<()> { pub async fn sync_pull(&self) -> Result<()> {
self.sync(SyncOperation::Pull).await self.sync(SyncOperation::Pull, false).await
} }
pub async fn sync_push(&self) -> Result<()> { pub async fn sync_push(&self) -> Result<()> {
self.sync(SyncOperation::Push).await self.sync(SyncOperation::Push, false).await
} }
pub async fn continuous_sync( pub async fn continuous_sync(
@ -79,7 +79,7 @@ impl SyncedDir {
let delay = Duration::from_secs(delay_seconds); let delay = Duration::from_secs(delay_seconds);
loop { loop {
self.sync(operation).await?; self.sync(operation, false).await?;
delay_with_jitter(delay).await; delay_with_jitter(delay).await;
} }
} }
@ -146,7 +146,7 @@ pub async fn continuous_sync(
loop { loop {
for dir in dirs { for dir in dirs {
dir.sync(operation).await?; dir.sync(operation, false).await?;
} }
delay_with_jitter(delay).await; delay_with_jitter(delay).await;
} }

View File

@ -202,12 +202,12 @@ TASK_DEFINITIONS = {
), ),
ContainerDefinition( ContainerDefinition(
type=ContainerType.inputs, type=ContainerType.inputs,
compare=Compare.Equal, compare=Compare.AtLeast,
value=1, value=0,
permissions=[ContainerPermission.Create, ContainerPermission.List], permissions=[ContainerPermission.Read, ContainerPermission.List],
), ),
], ],
monitor_queue=ContainerType.inputs, monitor_queue=None,
), ),
TaskType.generic_supervisor: TaskDefinition( TaskType.generic_supervisor: TaskDefinition(
features=[ features=[
@ -263,6 +263,7 @@ TASK_DEFINITIONS = {
TaskFeature.supervisor_options, TaskFeature.supervisor_options,
TaskFeature.supervisor_input_marker, TaskFeature.supervisor_input_marker,
TaskFeature.stats_file, TaskFeature.stats_file,
TaskFeature.preserve_existing_outputs,
], ],
vm=VmDefinition(compare=Compare.AtLeast, value=1), vm=VmDefinition(compare=Compare.AtLeast, value=1),
containers=[ containers=[

View File

@ -800,6 +800,7 @@ class Tasks(Endpoint):
target_timeout: Optional[int] = None, target_timeout: Optional[int] = None,
target_workers: Optional[int] = None, target_workers: Optional[int] = None,
vm_count: int = 1, vm_count: int = 1,
preserve_existing_outputs: bool = False,
) -> models.Task: ) -> models.Task:
""" """
Create a task Create a task

View File

@ -191,6 +191,7 @@ class Libfuzzer(Command):
ContainerType.crashes, ContainerType.crashes,
ContainerType.reports, ContainerType.reports,
ContainerType.unique_reports, ContainerType.unique_reports,
ContainerType.unique_inputs,
ContainerType.no_repro, ContainerType.no_repro,
ContainerType.coverage, ContainerType.coverage,
ContainerType.unique_inputs, ContainerType.unique_inputs,
@ -233,3 +234,122 @@ class Libfuzzer(Command):
self.logger.info("done creating tasks") self.logger.info("done creating tasks")
helper.wait() helper.wait()
return helper.job return helper.job
def merge(
self,
project: str,
name: str,
build: str,
pool_name: str,
*,
target_exe: File = File("fuzz.exe"),
setup_dir: Optional[Directory] = None,
inputs: Optional[Directory] = None,
output_container: Optional[Container] = None,
reboot_after_setup: bool = False,
duration: int = 24,
target_options: Optional[List[str]] = None,
target_env: Optional[Dict[str, str]] = None,
check_retry_count: Optional[int] = None,
crash_report_timeout: Optional[int] = None,
tags: Optional[Dict[str, str]] = None,
wait_for_running: bool = False,
wait_for_files: Optional[List[ContainerType]] = None,
extra_files: Optional[List[File]] = None,
existing_inputs: Optional[List[Container]] = None,
dryrun: bool = False,
notification_config: Optional[NotificationConfig] = None,
debug: Optional[List[TaskDebugFlag]] = None,
preserve_existing_outputs: bool = False,
) -> Optional[Job]:
"""
libfuzzer merge task
"""
# verify containers exist
if existing_inputs:
for existing_container in existing_inputs:
self.onefuzz.containers.get(existing_container)
elif not inputs:
self.logger.error(
"please specify either an input folder or at least one existing inputs container"
)
return None
if dryrun:
return None
self.logger.info("creating libfuzzer merge from template")
self._check_is_libfuzzer(target_exe)
helper = JobHelper(
self.onefuzz,
self.logger,
project,
name,
build,
duration,
pool_name=pool_name,
target_exe=target_exe,
)
helper.add_tags(tags)
helper.define_containers(
ContainerType.setup,
)
if inputs:
helper.define_containers(ContainerType.inputs)
if output_container:
if self.onefuzz.containers.get(output_container):
helper.define_containers(ContainerType.unique_inputs)
helper.create_containers()
helper.setup_notifications(notification_config)
helper.upload_setup(setup_dir, target_exe, extra_files)
if inputs:
helper.upload_inputs(inputs)
helper.wait_on(wait_for_files, wait_for_running)
target_exe_blob_name = helper.target_exe_blob_name(target_exe, setup_dir)
merge_containers = [
(ContainerType.setup, helper.containers[ContainerType.setup]),
(
ContainerType.unique_inputs,
output_container or helper.containers[ContainerType.unique_inputs],
),
]
if inputs:
merge_containers.append(
(ContainerType.inputs, helper.containers[ContainerType.inputs])
)
if existing_inputs:
for existing_container in existing_inputs:
merge_containers.append((ContainerType.inputs, existing_container))
self.logger.info("creating libfuzzer_merge task")
self.onefuzz.tasks.create(
helper.job.job_id,
TaskType.libfuzzer_merge,
target_exe_blob_name,
merge_containers,
pool_name=pool_name,
duration=duration,
vm_count=1,
reboot_after_setup=reboot_after_setup,
target_options=target_options,
target_env=target_env,
tags=tags,
target_timeout=crash_report_timeout,
check_retry_count=check_retry_count,
debug=debug,
preserve_existing_outputs=preserve_existing_outputs,
)
self.logger.info("done creating tasks")
helper.wait()
return helper.job

View File

@ -74,6 +74,7 @@ class TaskFeature(Enum):
check_debugger = "check_debugger" check_debugger = "check_debugger"
check_retry_count = "check_retry_count" check_retry_count = "check_retry_count"
ensemble_sync_delay = "ensemble_sync_delay" ensemble_sync_delay = "ensemble_sync_delay"
preserve_existing_outputs = "preserve_existing_outputs"
# Permissions for an Azure Blob Storage Container. # Permissions for an Azure Blob Storage Container.

View File

@ -133,6 +133,7 @@ class TaskDetails(BaseModel):
reboot_after_setup: Optional[bool] reboot_after_setup: Optional[bool]
target_timeout: Optional[int] target_timeout: Optional[int]
ensemble_sync_delay: Optional[int] ensemble_sync_delay: Optional[int]
preserve_existing_outputs: Optional[bool]
@validator("check_retry_count", allow_reuse=True) @validator("check_retry_count", allow_reuse=True)
def validate_check_retry_count(cls, value: int) -> int: def validate_check_retry_count(cls, value: int) -> int: