unify syncdir management & add jitter (#180)

bmc-msft
2020-10-20 17:17:45 -04:00
committed by GitHub
parent 041c6ae130
commit 3dd0f136b8
25 changed files with 273 additions and 280 deletions
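In short: the per-task utils::init_dir / utils::sync_remote_dir helpers and the ad-hoc resync loops are folded into a shared onefuzz::syncdir::SyncedDir type, and fixed tokio::time::delay_for sleeps become jittered delays so a fleet of agents stops hitting storage in lock-step. A sketch of the before/after pattern that repeats throughout the hunks below (EMPTY_QUEUE_DELAY stands in for whichever delay constant a task uses):

    // before: free functions scattered across tasks, fixed sleeps
    utils::init_dir(&config.tools.path).await?;
    utils::sync_remote_dir(&config.tools, utils::SyncOperation::Pull).await?;
    tokio::time::delay_for(EMPTY_QUEUE_DELAY).await;

    // after: methods on the shared SyncedDir type, jittered sleeps
    config.tools.init_pull().await?;
    delay_with_jitter(EMPTY_QUEUE_DELAY).await;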

src/agent/Cargo.lock (generated, 1 addition)
View File

@ -1254,6 +1254,7 @@ dependencies = [
"pete",
"proc-maps",
"process_control",
"rand",
"regex",
"reqwest",
"ring",

View File

@ -2,13 +2,13 @@
// Licensed under the MIT License.
use crate::tasks::{
config::{CommonConfig, SyncedDir},
config::CommonConfig,
report::generic::{Config, GenericReportProcessor},
utils::parse_key_value,
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::blob::BlobContainerUrl;
use onefuzz::{blob::BlobContainerUrl, syncdir::SyncedDir};
use std::{
collections::HashMap,
path::{Path, PathBuf},

View File

@ -2,13 +2,13 @@
// Licensed under the MIT License.
use crate::tasks::{
config::{CommonConfig, SyncedDir},
config::CommonConfig,
coverage::libfuzzer_coverage::{Config, CoverageProcessor},
utils::parse_key_value,
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::blob::BlobContainerUrl;
use onefuzz::{blob::BlobContainerUrl, syncdir::SyncedDir};
use std::{
collections::HashMap,
path::{Path, PathBuf},

View File

@ -2,13 +2,13 @@
// Licensed under the MIT License.
use crate::tasks::{
config::{CommonConfig, SyncedDir},
config::CommonConfig,
report::libfuzzer_report::{AsanProcessor, Config},
utils::parse_key_value,
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::blob::BlobContainerUrl;
use onefuzz::{blob::BlobContainerUrl, syncdir::SyncedDir};
use std::{
collections::HashMap,
path::{Path, PathBuf},

View File

@ -2,13 +2,13 @@
// Licensed under the MIT License.
use crate::tasks::{
config::{CommonConfig, SyncedDir},
config::CommonConfig,
fuzz::libfuzzer_fuzz::{Config, LibFuzzerFuzzTask},
utils::parse_key_value,
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::blob::BlobContainerUrl;
use onefuzz::{blob::BlobContainerUrl, syncdir::SyncedDir};
use std::{collections::HashMap, path::PathBuf};
use tokio::runtime::Runtime;
use url::Url;

View File

@ -1,15 +1,13 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::{
config::{CommonConfig, SyncedDir},
heartbeat::HeartbeatSender,
utils,
};
use crate::tasks::{config::CommonConfig, heartbeat::HeartbeatSender};
use anyhow::Result;
use futures::stream::StreamExt;
use onefuzz::{az_copy, blob::url::BlobUrl};
use onefuzz::{expand::Expand, fs::set_executable, fs::OwnedDir};
use onefuzz::{
expand::Expand, fs::set_executable, fs::OwnedDir, jitter::delay_with_jitter, syncdir::SyncedDir,
};
use reqwest::Url;
use serde::Deserialize;
use std::{
@ -43,9 +41,9 @@ pub async fn spawn(config: Config) -> Result<()> {
let tmp = OwnedDir::new(tmp_dir);
tmp.reset().await?;
utils::init_dir(&config.analysis.path).await?;
utils::init_dir(&config.tools.path).await?;
utils::sync_remote_dir(&config.tools, utils::SyncOperation::Pull).await?;
config.analysis.init().await?;
config.tools.init_pull().await?;
set_executable(&config.tools.path).await?;
run_existing(&config).await?;
poll_inputs(&config, tmp).await?;
@ -54,16 +52,14 @@ pub async fn spawn(config: Config) -> Result<()> {
async fn run_existing(config: &Config) -> Result<()> {
if let Some(crashes) = &config.crashes {
utils::init_dir(&crashes.path).await?;
utils::sync_remote_dir(&crashes, utils::SyncOperation::Pull).await?;
crashes.init_pull().await?;
let mut read_dir = fs::read_dir(&crashes.path).await?;
while let Some(file) = read_dir.next().await {
verbose!("Processing file {:?}", file);
let file = file?;
run_tool(file.path(), &config).await?;
}
utils::sync_remote_dir(&config.analysis, utils::SyncOperation::Push).await?;
config.analysis.sync_push().await?;
}
Ok(())
}
@ -103,12 +99,12 @@ async fn poll_inputs(config: &Config, tmp_dir: OwnedDir) -> Result<()> {
az_copy::copy(input_url.url().as_ref(), &destination_path, false).await?;
run_tool(destination_path, &config).await?;
utils::sync_remote_dir(&config.analysis, utils::SyncOperation::Push).await?;
config.analysis.sync_push().await?
}
input_queue.delete(message).await?;
} else {
warn!("no new candidate inputs found, sleeping");
tokio::time::delay_for(EMPTY_QUEUE_DELAY).await;
delay_with_jitter(EMPTY_QUEUE_DELAY).await;
}
}
}

View File

@ -5,13 +5,12 @@
use crate::tasks::{analysis, coverage, fuzz, heartbeat::*, merge, report};
use anyhow::Result;
use onefuzz::{
blob::BlobContainerUrl,
machine_id::{get_machine_id, get_scaleset_name},
telemetry::{self, Event::task_start, EventData},
};
use reqwest::Url;
use serde::{self, Deserialize};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::Arc;
use uuid::Uuid;
@ -150,9 +149,3 @@ impl Config {
}
}
}
#[derive(Debug, Deserialize, Clone, PartialEq)]
pub struct SyncedDir {
pub path: PathBuf,
pub url: BlobContainerUrl,
}

View File

@ -30,17 +30,15 @@
//!
//! Versions in parentheses have been tested.
use crate::tasks::config::SyncedDir;
use crate::tasks::coverage::{recorder::CoverageRecorder, total::TotalCoverage};
use crate::tasks::heartbeat::*;
use crate::tasks::utils::{init_dir, sync_remote_dir, SyncOperation};
use crate::tasks::{config::CommonConfig, generic::input_poller::*};
use anyhow::Result;
use async_trait::async_trait;
use futures::stream::StreamExt;
use onefuzz::fs::list_files;
use onefuzz::telemetry::Event::coverage_data;
use onefuzz::telemetry::EventData;
use onefuzz::{
fs::list_files, syncdir::SyncedDir, telemetry::Event::coverage_data, telemetry::EventData,
};
use reqwest::Url;
use serde::Deserialize;
use std::collections::HashMap;
@ -93,19 +91,7 @@ impl CoverageTask {
pub async fn run(&mut self) -> Result<()> {
info!("starting libFuzzer coverage task");
init_dir(&self.config.coverage.path).await?;
verbose!(
"initialized coverage dir, path = {}",
self.config.coverage.path.display()
);
sync_remote_dir(&self.config.coverage, SyncOperation::Pull).await?;
verbose!(
"synced coverage dir, path = {}",
self.config.coverage.path.display()
);
self.config.coverage.init_pull().await?;
self.process().await
}
@ -115,12 +101,11 @@ impl CoverageTask {
// Update the total with the coverage from each seed corpus.
for dir in &self.config.readonly_inputs {
verbose!("recording coverage for {}", dir.path.display());
init_dir(&dir.path).await?;
sync_remote_dir(&dir, SyncOperation::Pull).await?;
dir.init_pull().await?;
self.record_corpus_coverage(&mut processor, dir).await?;
fs::remove_dir_all(&dir.path).await?;
sync_remote_dir(&self.config.coverage, SyncOperation::Push).await?;
}
self.config.coverage.sync_push().await?;
info!(
"recorded coverage for {} containers in `readonly_inputs`",
@ -246,7 +231,7 @@ impl Processor for CoverageProcessor {
self.heartbeat_client.alive();
self.test_input(input).await?;
self.report_total().await?;
sync_remote_dir(&self.config.coverage, SyncOperation::Push).await?;
self.config.coverage.sync_push().await?;
Ok(())
}
}

View File

@ -1,15 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::{
config::{CommonConfig, SyncedDir},
heartbeat::*,
utils,
};
use crate::tasks::{config::CommonConfig, heartbeat::*, utils};
use anyhow::{Error, Result};
use futures::stream::StreamExt;
use onefuzz::{
expand::Expand, fs::set_executable, input_tester::Tester, sha256, telemetry::Event::new_result,
expand::Expand,
fs::set_executable,
input_tester::Tester,
sha256,
syncdir::{continuous_sync, SyncOperation::Pull, SyncedDir},
telemetry::Event::new_result,
};
use serde::Deserialize;
use std::collections::HashMap;
@ -50,22 +51,18 @@ pub struct GeneratorConfig {
}
pub async fn spawn(config: Arc<GeneratorConfig>) -> Result<(), Error> {
utils::init_dir(&config.crashes.path).await?;
utils::init_dir(&config.tools.path).await?;
utils::sync_remote_dir(&config.tools, utils::SyncOperation::Pull).await?;
config.crashes.init().await?;
config.tools.init_pull().await?;
set_executable(&config.tools.path).await?;
let hb_client = config.common.init_heartbeat().await?;
for sync_dir in &config.readonly_inputs {
utils::init_dir(&sync_dir.path).await?;
utils::sync_remote_dir(&sync_dir, utils::SyncOperation::Pull).await?;
for dir in &config.readonly_inputs {
dir.init_pull().await?;
}
let resync = resync_corpuses(
config.readonly_inputs.clone(),
std::time::Duration::from_secs(10),
);
let crash_dir_monitor = utils::monitor_result_dir(config.crashes.clone(), new_result);
let sync_task = continuous_sync(&config.readonly_inputs, Pull, None);
let crash_dir_monitor = config.crashes.monitor_results(new_result);
let tester = Tester::new(
&config.target_exe,
&config.target_options,
@ -78,7 +75,7 @@ pub async fn spawn(config: Arc<GeneratorConfig>) -> Result<(), Error> {
);
let inputs: Vec<_> = config.readonly_inputs.iter().map(|x| &x.path).collect();
let fuzzing_monitor = start_fuzzing(&config, inputs, tester, hb_client);
futures::try_join!(fuzzing_monitor, resync, crash_dir_monitor)?;
futures::try_join!(fuzzing_monitor, sync_task, crash_dir_monitor)?;
Ok(())
}
@ -183,17 +180,6 @@ async fn start_fuzzing<'a>(
}
}
pub async fn resync_corpuses(dirs: Vec<SyncedDir>, delay: std::time::Duration) -> Result<()> {
loop {
for sync_dir in &dirs {
utils::sync_remote_dir(sync_dir, utils::SyncOperation::Pull)
.await
.ok();
}
tokio::time::delay_for(delay).await;
}
}
mod tests {
#[tokio::test]
#[cfg(target_os = "linux")]

View File

@ -1,17 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::{
config::{CommonConfig, SyncedDir},
heartbeat::HeartbeatSender,
utils,
};
use crate::tasks::{config::CommonConfig, heartbeat::HeartbeatSender};
use anyhow::Result;
use futures::{future::try_join_all, stream::StreamExt};
use onefuzz::{
fs::list_files,
libfuzzer::{LibFuzzer, LibFuzzerLine},
process::ExitStatus,
syncdir::{continuous_sync, SyncOperation::Pull, SyncedDir},
system,
telemetry::{
Event::{new_coverage, new_result, process_stats, runtime_stats},
@ -30,9 +27,6 @@ use tokio::{
};
use uuid::Uuid;
// Time between resync of all corpus container directories.
const RESYNC_PERIOD: Duration = Duration::from_secs(30);
// Delay to allow for observation of CPU usage when reporting proc info.
const PROC_INFO_COLLECTION_DELAY: Duration = Duration::from_secs(1);
@ -72,13 +66,12 @@ impl LibFuzzerFuzzTask {
});
self.init_directories().await?;
self.sync_all_corpuses().await?;
let hb_client = self.config.common.init_heartbeat().await?;
// To be scheduled.
let resync = self.resync_all_corpuses();
let new_inputs = utils::monitor_result_dir(self.config.inputs.clone(), new_coverage);
let new_crashes = utils::monitor_result_dir(self.config.crashes.clone(), new_result);
let resync = self.continuous_sync_inputs();
let new_inputs = self.config.inputs.monitor_results(new_coverage);
let new_crashes = self.config.crashes.monitor_results(new_result);
let (stats_sender, stats_receiver) = mpsc::unbounded_channel();
let report_stats = report_runtime_stats(workers as usize, stats_receiver, hb_client);
@ -178,34 +171,23 @@ impl LibFuzzerFuzzTask {
}
async fn init_directories(&self) -> Result<()> {
utils::init_dir(&self.config.inputs.path).await?;
utils::init_dir(&self.config.crashes.path).await?;
self.config.inputs.init().await?;
self.config.crashes.init().await?;
if let Some(readonly_inputs) = &self.config.readonly_inputs {
for dir in readonly_inputs {
utils::init_dir(&dir.path).await?;
dir.init().await?;
}
}
Ok(())
}
async fn sync_all_corpuses(&self) -> Result<()> {
utils::sync_remote_dir(&self.config.inputs, utils::SyncOperation::Pull).await?;
if let Some(readonly_inputs) = &self.config.readonly_inputs {
for corpus in readonly_inputs {
utils::sync_remote_dir(corpus, utils::SyncOperation::Pull).await?;
}
}
Ok(())
}
async fn resync_all_corpuses(&self) -> Result<()> {
loop {
time::delay_for(RESYNC_PERIOD).await;
self.sync_all_corpuses().await?;
async fn continuous_sync_inputs(&self) -> Result<()> {
let mut dirs = vec![self.config.inputs.clone()];
if let Some(inputs) = &self.config.readonly_inputs {
let inputs = inputs.clone();
dirs.extend(inputs);
}
continuous_sync(&dirs, Pull, None).await
}
}

View File

@ -3,15 +3,17 @@
#![allow(clippy::too_many_arguments)]
use crate::tasks::{
config::{CommonConfig, ContainerType, SyncedDir},
config::{CommonConfig, ContainerType},
heartbeat::*,
stats::common::{monitor_stats, StatsFormat},
utils::{self, CheckNotify},
utils::CheckNotify,
};
use anyhow::{Error, Result};
use onefuzz::{
expand::Expand,
fs::{has_files, set_executable, OwnedDir},
jitter::delay_with_jitter,
syncdir::{SyncOperation::Pull, SyncedDir},
telemetry::Event::new_result,
};
use serde::Deserialize;
@ -50,8 +52,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
let runtime_dir = OwnedDir::new(config.common.task_id.to_string());
runtime_dir.create_if_missing().await?;
utils::init_dir(&config.tools.path).await?;
utils::sync_remote_dir(&config.tools, utils::SyncOperation::Pull).await?;
config.tools.init_pull().await?;
set_executable(&config.tools.path).await?;
let supervisor_path = Expand::new()
@ -63,17 +64,14 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
url: config.crashes.url.clone(),
};
utils::init_dir(&crashes.path).await?;
let monitor_crashes = utils::monitor_result_dir(crashes.clone(), new_result);
crashes.init().await?;
let monitor_crashes = crashes.monitor_results(new_result);
let inputs = SyncedDir {
path: runtime_dir.path().join("inputs"),
url: config.inputs.url.clone(),
};
utils::init_dir(&inputs.path).await?;
verbose!("initialized {}", inputs.path.display());
let sync_inputs = resync_corpus(inputs.clone());
inputs.init().await?;
if let Some(context) = &config.wait_for_files {
let dir = match context {
@ -82,14 +80,16 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
let delay = std::time::Duration::from_secs(10);
loop {
utils::sync_remote_dir(dir, utils::SyncOperation::Pull).await?;
dir.sync_pull().await?;
if has_files(&dir.path).await? {
break;
}
tokio::time::delay_for(delay).await;
delay_with_jitter(delay).await;
}
}
let continuous_sync_task = inputs.continuous_sync(Pull, None);
let process = start_supervisor(
&runtime_dir.path(),
&supervisor_path,
@ -127,7 +127,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
monitor_process,
monitor_stats,
monitor_crashes,
sync_inputs,
continuous_sync_task,
)?;
Ok(())
@ -207,20 +207,6 @@ async fn start_supervisor(
Ok(child)
}
pub async fn resync_corpus(sync_dir: SyncedDir) -> Result<()> {
let delay = std::time::Duration::from_secs(10);
loop {
let result = utils::sync_remote_dir(&sync_dir, utils::SyncOperation::Pull).await;
if result.is_err() {
warn!("error syncing dir: {:?}", sync_dir);
}
tokio::time::delay_for(delay).await;
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -5,15 +5,9 @@ use std::{fmt, path::PathBuf};
use anyhow::Result;
use futures::stream::StreamExt;
use onefuzz::blob::BlobUrl;
use onefuzz::fs::OwnedDir;
use onefuzz::{blob::BlobUrl, fs::OwnedDir, jitter::delay_with_jitter, syncdir::SyncedDir};
use reqwest::Url;
use tokio::{
fs,
time::{self, Duration},
};
use crate::tasks::{config::SyncedDir, utils};
use tokio::{fs, time::Duration};
mod callback;
pub use callback::*;
@ -115,8 +109,8 @@ impl<M> InputPoller<M> {
to_process: &SyncedDir,
) -> Result<()> {
self.batch_dir = Some(to_process.clone());
utils::init_dir(&to_process.path).await?;
utils::sync_remote_dir(&to_process, utils::SyncOperation::Pull).await?;
to_process.init_pull().await?;
let mut read_dir = fs::read_dir(&to_process.path).await?;
while let Some(file) = read_dir.next().await {
verbose!("Processing batch-downloaded input {:?}", file);
@ -178,7 +172,7 @@ impl<M> InputPoller<M> {
match self.state() {
State::Polled(None) => {
verbose!("Input queue empty, sleeping");
time::delay_for(POLL_INTERVAL).await;
delay_with_jitter(POLL_INTERVAL).await;
}
State::Downloaded(_msg, _url, input) => {
info!("Processing downloaded input: {:?}", input);

View File

@ -1,13 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::{
config::{CommonConfig, SyncedDir},
heartbeat::HeartbeatSender,
utils,
};
use crate::tasks::{config::CommonConfig, heartbeat::HeartbeatSender, utils};
use anyhow::Result;
use onefuzz::{expand::Expand, fs::set_executable, http::ResponseExt};
use onefuzz::{
expand::Expand, fs::set_executable, http::ResponseExt, jitter::delay_with_jitter,
syncdir::SyncedDir,
};
use reqwest::Url;
use serde::Deserialize;
use std::{
@ -45,18 +44,17 @@ pub struct Config {
}
pub async fn spawn(config: Arc<Config>) -> Result<()> {
utils::init_dir(&config.tools.path).await?;
utils::sync_remote_dir(&config.tools, utils::SyncOperation::Pull).await?;
config.tools.init_pull().await?;
set_executable(&config.tools.path).await?;
utils::init_dir(&config.unique_inputs.path).await?;
config.unique_inputs.init().await?;
let hb_client = config.common.init_heartbeat().await?;
loop {
hb_client.alive();
let tmp_dir = PathBuf::from("./tmp");
verbose!("tmp dir reset");
utils::reset_tmp_dir(&tmp_dir).await?;
utils::sync_remote_dir(&config.unique_inputs, utils::SyncOperation::Pull).await?;
config.unique_inputs.sync_pull().await?;
let mut queue = QueueClient::new(config.input_queue.clone());
if let Some(msg) = queue.pop().await? {
let input_url = match utils::parse_url_data(msg.data()) {
@ -88,7 +86,7 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
}
} else {
warn!("no new candidate inputs found, sleeping");
tokio::time::delay_for(EMPTY_QUEUE_DELAY).await;
delay_with_jitter(EMPTY_QUEUE_DELAY).await;
}
}
}
@ -108,7 +106,7 @@ async fn process_message(config: Arc<Config>, input_url: &Url, tmp_dir: &PathBuf
path: tmp_dir.clone(),
url: config.unique_inputs.url.clone(),
};
utils::sync_remote_dir(&synced_dir, utils::SyncOperation::Push).await?;
synced_dir.sync_push().await?
}
Err(e) => error!("Merge failed : {}", e),
}

View File

@ -1,15 +1,13 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::{
config::{CommonConfig, SyncedDir},
heartbeat::*,
utils,
};
use crate::tasks::{config::CommonConfig, heartbeat::*, utils};
use anyhow::Result;
use onefuzz::{
http::ResponseExt,
jitter::delay_with_jitter,
libfuzzer::{LibFuzzer, LibFuzzerMergeOutput},
syncdir::SyncedDir,
};
use reqwest::Url;
use serde::Deserialize;
@ -42,7 +40,7 @@ pub struct Config {
pub async fn spawn(config: Arc<Config>) -> Result<()> {
let hb_client = config.common.init_heartbeat().await?;
utils::init_dir(&config.unique_inputs.path).await?;
config.unique_inputs.init().await?;
loop {
hb_client.alive();
if let Err(error) = process_message(config.clone()).await {
@ -60,7 +58,7 @@ async fn process_message(config: Arc<Config>) -> Result<()> {
verbose!("tmp dir reset");
utils::reset_tmp_dir(tmp_dir).await?;
utils::sync_remote_dir(&config.unique_inputs, utils::SyncOperation::Pull).await?;
config.unique_inputs.sync_pull().await?;
let mut queue = QueueClient::new(config.input_queue.clone());
@ -88,7 +86,7 @@ async fn process_message(config: Arc<Config>) -> Result<()> {
{
Ok(result) if result.added_files_count > 0 => {
info!("Added {} new files to the corpus", result.added_files_count);
utils::sync_remote_dir(&config.unique_inputs, utils::SyncOperation::Push).await?;
config.unique_inputs.sync_push().await?;
}
Ok(_) => info!("No new files added by the merge"),
Err(e) => error!("Merge failed : {}", e),
@ -109,7 +107,7 @@ async fn process_message(config: Arc<Config>) -> Result<()> {
Ok(())
} else {
warn!("no new candidate inputs found, sleeping");
tokio::time::delay_for(EMPTY_QUEUE_DELAY).await;
delay_with_jitter(EMPTY_QUEUE_DELAY).await;
Ok(())
}
}

View File

@ -1,11 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::tasks::config::SyncedDir;
use anyhow::Result;
use onefuzz::{
asan::AsanLog,
blob::{BlobClient, BlobContainerUrl, BlobUrl},
syncdir::SyncedDir,
};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

View File

@ -3,13 +3,13 @@
use super::crash_report::{CrashReport, CrashTestResult, InputBlob, NoCrash};
use crate::tasks::{
config::{CommonConfig, SyncedDir},
config::CommonConfig,
generic::input_poller::{CallbackImpl, InputPoller, Processor},
heartbeat::*,
};
use anyhow::Result;
use async_trait::async_trait;
use onefuzz::{blob::BlobUrl, input_tester::Tester, sha256};
use onefuzz::{blob::BlobUrl, input_tester::Tester, sha256, syncdir::SyncedDir};
use reqwest::Url;
use serde::Deserialize;
use std::{

View File

@ -2,14 +2,10 @@
// Licensed under the MIT License.
use super::crash_report::*;
use crate::tasks::{
config::{CommonConfig, SyncedDir},
generic::input_poller::*,
heartbeat::*,
};
use crate::tasks::{config::CommonConfig, generic::input_poller::*, heartbeat::*};
use anyhow::Result;
use async_trait::async_trait;
use onefuzz::{blob::BlobUrl, libfuzzer::LibFuzzer, sha256};
use onefuzz::{blob::BlobUrl, libfuzzer::LibFuzzer, sha256, syncdir::SyncedDir};
use reqwest::Url;
use serde::Deserialize;
use std::{

View File

@ -3,7 +3,10 @@
use super::afl;
use anyhow::{Error, Result};
use onefuzz::telemetry::{track_event, Event::runtime_stats};
use onefuzz::{
jitter::delay_with_jitter,
telemetry::{track_event, Event::runtime_stats},
};
use serde::Deserialize;
pub const STATS_DELAY: std::time::Duration = std::time::Duration::from_secs(30);
@ -22,7 +25,7 @@ pub async fn monitor_stats(path: Option<String>, format: Option<StatsFormat>) ->
if let Ok(stats) = stats {
track_event(runtime_stats, stats);
}
tokio::time::delay_for(STATS_DELAY).await;
delay_with_jitter(STATS_DELAY).await;
}
}
}

View File

@ -1,29 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use futures::stream::StreamExt;
use onefuzz::{
az_copy,
monitor::DirectoryMonitor,
telemetry::{Event, EventData},
uploader::BlobUploader,
};
use onefuzz::jitter::delay_with_jitter;
use reqwest::Url;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::{fs, io};
use crate::tasks::config::SyncedDir;
#[derive(Debug)]
pub enum SyncOperation {
Push,
Pull,
}
pub async fn download_input(input_url: Url, dst: impl AsRef<Path>) -> Result<PathBuf> {
let file_name = input_url.path_segments().unwrap().last().unwrap();
let file_path = dst.as_ref().join(file_name);
@ -63,32 +48,6 @@ pub async fn reset_tmp_dir(tmp_dir: impl AsRef<Path>) -> Result<()> {
Ok(())
}
pub async fn sync_remote_dir(sync_dir: &SyncedDir, sync_operation: SyncOperation) -> Result<()> {
let dir = &sync_dir.path;
let url = sync_dir.url.url();
let url = url.as_ref();
info!("syncing {:?} {:?}", sync_operation, sync_dir.path);
match sync_operation {
SyncOperation::Push => az_copy::sync(dir, url).await,
SyncOperation::Pull => az_copy::sync(url, dir).await,
}
}
pub async fn init_dir(path: impl AsRef<Path>) -> Result<()> {
let path = path.as_ref();
match fs::metadata(path).await {
Ok(m) => {
if m.is_dir() {
Ok(())
} else {
anyhow::bail!("File with name '{}' already exists", path.display());
}
}
Err(_) => fs::create_dir(path).await.map_err(|e| e.into()),
}
}
pub fn parse_url_data(data: &[u8]) -> Result<Url> {
let text = std::str::from_utf8(data)?;
let url = Url::parse(text)?;
@ -106,60 +65,12 @@ impl CheckNotify for tokio::sync::Notify {
async fn is_notified(&self, delay: Duration) -> bool {
let notify = self;
tokio::select! {
() = tokio::time::delay_for(delay) => false,
() = delay_with_jitter(delay) => false,
() = notify.notified() => true,
}
}
}
const DELAY: Duration = Duration::from_secs(10);
async fn file_uploader_monitor(synced_dir: SyncedDir, event: Event) -> Result<()> {
let url = synced_dir.url.url();
let dir = synced_dir.path.clone();
verbose!("monitoring {}", synced_dir.path.display());
let mut monitor = DirectoryMonitor::new(dir);
monitor.start()?;
let mut uploader = BlobUploader::new(url);
while let Some(item) = monitor.next().await {
event!(event.clone(); EventData::Path = item.display().to_string());
if let Err(err) = uploader.upload(item.clone()).await {
bail!("Couldn't upload file: {}", err);
}
}
Ok(())
}
/// Monitor a directory for results.
///
/// This function does not require the directory to exist before it is called.
/// If the directory is reset (unlinked and recreated), this function will stop
/// listening to the original filesystem node, and begin watching the new one
/// once it has been created.
///
/// The intent of this is to support use cases where we usually want a directory
/// to be initialized, but a user-supplied binary (such as AFL) logically owns
/// the directory and may reset it.
pub async fn monitor_result_dir(synced_dir: SyncedDir, event: Event) -> Result<()> {
loop {
verbose!("waiting to monitor {}", synced_dir.path.display());
while fs::metadata(&synced_dir.path).await.is_err() {
verbose!(
"dir {} not ready to monitor, delaying",
synced_dir.path.display()
);
tokio::time::delay_for(DELAY).await;
}
verbose!("starting monitor for {}", synced_dir.path.display());
file_uploader_monitor(synced_dir.clone(), event.clone()).await?;
}
}
pub fn parse_key_value(value: String) -> Result<(String, String)> {
let offset = value
.find('=')

View File

@ -2,7 +2,7 @@
// Licensed under the MIT License.
use anyhow::Result;
use onefuzz::http::ResponseExt;
use onefuzz::{http::ResponseExt, jitter::delay_with_jitter};
use reqwest::StatusCode;
use std::{
path::{Path, PathBuf},
@ -179,7 +179,7 @@ impl Registration {
err,
REGISTRATION_RETRY_PERIOD.as_secs()
);
tokio::time::delay_for(REGISTRATION_RETRY_PERIOD).await;
delay_with_jitter(REGISTRATION_RETRY_PERIOD).await;
}
Err(err) => return Err(err.into()),
}

View File

@ -24,6 +24,7 @@ sha2 = "0.9"
url = { version = "2.1.1", features = ["serde"] }
serde = "1.0"
serde_json = "1.0"
rand = "0.7.3"
serde_derive = "1.0"
sysinfo = "0.14"
tokio = { version = "0.2", features = ["full"] }

View File

@ -0,0 +1,15 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use rand::prelude::*;
use std::time::Duration;
use tokio::time::delay_for;
pub fn jitter(value: Duration) -> Duration {
    // add 0-9 whole seconds of jitter (gen_range's upper bound is exclusive)
    let random: u64 = thread_rng().gen_range(0, 10);
    Duration::from_secs(random) + value
}
pub async fn delay_with_jitter(value: Duration) {
delay_for(jitter(value)).await
}
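The helper sleeps for the requested duration plus up to nine extra seconds. A minimal usage sketch (assuming tokio 0.2, with an illustrative ten-second base delay; wait_for_more_work is hypothetical):

    use onefuzz::jitter::delay_with_jitter;
    use std::time::Duration;

    async fn wait_for_more_work() {
        // sleeps 10-19 seconds, de-synchronizing agents that would
        // otherwise all wake and poll the queue at the same instant
        delay_with_jitter(Duration::from_secs(10)).await;
    }

The diffs above apply the same call wherever a task previously used tokio::time::delay_for.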

View File

@ -21,11 +21,13 @@ pub mod fs;
pub mod heartbeat;
pub mod http;
pub mod input_tester;
pub mod jitter;
pub mod libfuzzer;
pub mod machine_id;
pub mod monitor;
pub mod process;
pub mod sha256;
pub mod syncdir;
pub mod system;
pub mod utils;

View File

@ -0,0 +1,142 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::{
az_copy,
blob::BlobContainerUrl,
jitter::delay_with_jitter,
monitor::DirectoryMonitor,
telemetry::{Event, EventData},
uploader::BlobUploader,
};
use anyhow::Result;
use futures::stream::StreamExt;
use serde::Deserialize;
use std::{path::PathBuf, str, time::Duration};
use tokio::fs;
#[derive(Debug, Clone, Copy)]
pub enum SyncOperation {
Push,
Pull,
}
const DELAY: Duration = Duration::from_secs(10);
const DEFAULT_CONTINUOUS_SYNC_DELAY: Duration = Duration::from_secs(60);
#[derive(Debug, Deserialize, Clone, PartialEq)]
pub struct SyncedDir {
pub path: PathBuf,
pub url: BlobContainerUrl,
}
impl SyncedDir {
pub async fn sync(&self, operation: SyncOperation) -> Result<()> {
let dir = &self.path;
let url = self.url.url();
let url = url.as_ref();
verbose!("syncing {:?} {}", operation, dir.display());
match operation {
SyncOperation::Push => az_copy::sync(dir, url).await,
SyncOperation::Pull => az_copy::sync(url, dir).await,
}
}
pub async fn init_pull(&self) -> Result<()> {
self.init().await?;
self.sync(SyncOperation::Pull).await
}
pub async fn init(&self) -> Result<()> {
match fs::metadata(&self.path).await {
Ok(m) => {
if m.is_dir() {
Ok(())
} else {
anyhow::bail!("File with name '{}' already exists", self.path.display());
}
}
Err(_) => fs::create_dir(&self.path).await.map_err(|e| e.into()),
}
}
pub async fn sync_pull(&self) -> Result<()> {
self.sync(SyncOperation::Pull).await
}
pub async fn sync_push(&self) -> Result<()> {
self.sync(SyncOperation::Push).await
}
pub async fn continuous_sync(
&self,
operation: SyncOperation,
delay: Option<Duration>,
) -> Result<()> {
let delay = delay.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY);
loop {
self.sync(operation).await?;
delay_with_jitter(delay).await;
}
}
async fn file_uploader_monitor(&self, event: Event) -> Result<()> {
let url = self.url.url();
verbose!("monitoring {}", self.path.display());
let mut monitor = DirectoryMonitor::new(self.path.clone());
monitor.start()?;
let mut uploader = BlobUploader::new(url);
while let Some(item) = monitor.next().await {
event!(event.clone(); EventData::Path = item.display().to_string());
if let Err(err) = uploader.upload(item.clone()).await {
bail!(
"Couldn't upload file. path:{} dir:{} err:{}",
item.display(),
self.path.display(),
err
);
}
}
Ok(())
}
/// Monitor a directory for results.
///
/// This function does not require the directory to exist before it is called.
/// If the directory is reset (unlinked and recreated), this function will stop
/// listening to the original filesystem node, and begin watching the new one
/// once it has been created.
///
/// The intent of this is to support use cases where we usually want a directory
/// to be initialized, but a user-supplied binary (such as AFL) logically owns
/// the directory and may reset it.
pub async fn monitor_results(&self, event: Event) -> Result<()> {
loop {
verbose!("waiting to monitor {}", self.path.display());
while fs::metadata(&self.path).await.is_err() {
verbose!("dir {} not ready to monitor, delaying", self.path.display());
delay_with_jitter(DELAY).await;
}
verbose!("starting monitor for {}", self.path.display());
self.file_uploader_monitor(event.clone()).await?;
}
}
}
pub async fn continuous_sync(
dirs: &[SyncedDir],
operation: SyncOperation,
delay: Option<Duration>,
) -> Result<()> {
let delay = delay.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY);
loop {
for dir in dirs {
dir.sync(operation).await?;
}
delay_with_jitter(delay).await;
}
}
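Taken together, a hedged sketch of how a task now wires these pieces (the run wrapper and its arguments are illustrative; the pattern mirrors the generator and supervisor hunks above):

    use onefuzz::syncdir::{SyncOperation::Pull, SyncedDir};
    use onefuzz::telemetry::Event::new_result;

    async fn run(crashes: SyncedDir, inputs: SyncedDir) -> anyhow::Result<()> {
        crashes.init().await?;     // create the local dir if missing
        inputs.init_pull().await?; // create it, then pull remote contents

        // upload new crash files as they appear...
        let monitor = crashes.monitor_results(new_result);
        // ...while re-pulling inputs every 60s (the default) plus jitter
        let sync = inputs.continuous_sync(Pull, None);

        futures::try_join!(monitor, sync)?;
        Ok(())
    }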

View File

@ -1,3 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use crate::jitter::delay_with_jitter;
use async_trait::async_trait;
use std::time::Duration;
@ -11,7 +15,7 @@ impl CheckNotify for tokio::sync::Notify {
async fn is_notified(&self, delay: Duration) -> bool {
let notify = self;
tokio::select! {
() = tokio::time::delay_for(delay) => false,
() = delay_with_jitter(delay) => false,
() = notify.notified() => true,
}
}