fail louder when uploads fail (#166)

This commit is contained in:
bmc-msft
2020-10-16 17:04:46 -04:00
committed by GitHub
parent 8ff54396a6
commit a563861487
5 changed files with 31 additions and 90 deletions

View File

@ -8,7 +8,9 @@ use crate::tasks::{
};
use anyhow::{Error, Result};
use futures::stream::StreamExt;
use onefuzz::{expand::Expand, fs::set_executable, input_tester::Tester, sha256};
use onefuzz::{
expand::Expand, fs::set_executable, input_tester::Tester, sha256, telemetry::Event::new_result,
};
use serde::Deserialize;
use std::collections::HashMap;
use std::{
@ -63,7 +65,7 @@ pub async fn spawn(config: Arc<GeneratorConfig>) -> Result<(), Error> {
config.readonly_inputs.clone(),
std::time::Duration::from_secs(10),
);
let crash_dir_monitor = utils::monitor_result_dir(config.crashes.clone());
let crash_dir_monitor = utils::monitor_result_dir(config.crashes.clone(), new_result);
let tester = Tester::new(
&config.target_exe,
&config.target_options,

View File

@ -11,14 +11,12 @@ use futures::{future::try_join_all, stream::StreamExt};
use onefuzz::{
fs::list_files,
libfuzzer::{LibFuzzer, LibFuzzerLine},
monitor::DirectoryMonitor,
process::ExitStatus,
system,
telemetry::{
Event::{new_coverage, new_result, process_stats, runtime_stats},
EventData,
},
uploader::BlobUploader,
};
use serde::Deserialize;
use std::{collections::HashMap, path::PathBuf};
@ -79,8 +77,8 @@ impl LibFuzzerFuzzTask {
// To be scheduled.
let resync = self.resync_all_corpuses();
let new_corpus = self.monitor_new_corpus();
let faults = self.monitor_faults();
let new_inputs = utils::monitor_result_dir(self.config.inputs.clone(), new_coverage);
let new_crashes = utils::monitor_result_dir(self.config.crashes.clone(), new_result);
let (stats_sender, stats_receiver) = mpsc::unbounded_channel();
let report_stats = report_runtime_stats(workers as usize, stats_receiver, hb_client);
@ -91,7 +89,7 @@ impl LibFuzzerFuzzTask {
let fuzzers = try_join_all(fuzzers);
futures::try_join!(resync, new_corpus, faults, fuzzers, report_stats)?;
futures::try_join!(resync, new_inputs, new_crashes, fuzzers, report_stats)?;
Ok(())
}
@ -209,56 +207,6 @@ impl LibFuzzerFuzzTask {
self.sync_all_corpuses().await?;
}
}
async fn monitor_new_corpus(&self) -> Result<()> {
let url = self.config.inputs.url.url();
let mut monitor = DirectoryMonitor::new(&self.config.inputs.path);
monitor.start()?;
monitor
.for_each(move |item| {
let url = url.clone();
async move {
event!(new_coverage; EventData::Path = item.display().to_string());
let mut uploader = BlobUploader::new(url);
if let Err(err) = uploader.upload(item.clone()).await {
error!("Couldn't upload coverage: {}", err);
}
}
})
.await;
Ok(())
}
async fn monitor_faults(&self) -> Result<()> {
let url = self.config.crashes.url.url();
let dir = self.config.crashes.path.clone();
let mut monitor = DirectoryMonitor::new(dir);
monitor.start()?;
monitor
.for_each(move |item| {
let url = url.clone();
async move {
event!(new_result; EventData::Path = item.display().to_string());
let mut uploader = BlobUploader::new(url);
if let Err(err) = uploader.upload(item.clone()).await {
error!("Couldn't upload fault: {}", err);
}
}
})
.await;
Ok(())
}
}
fn try_report_iter_update(

View File

@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#![allow(clippy::too_many_arguments)]
use crate::tasks::{
config::{CommonConfig, ContainerType, SyncedDir},
@ -13,6 +12,7 @@ use anyhow::{Error, Result};
use onefuzz::{
expand::Expand,
fs::{has_files, set_executable, OwnedDir},
telemetry::Event::new_result,
};
use serde::Deserialize;
use std::{
@ -64,7 +64,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
};
utils::init_dir(&crashes.path).await?;
let monitor_crashes = utils::monitor_result_dir(crashes.clone());
let monitor_crashes = utils::monitor_result_dir(crashes.clone(), new_result);
let inputs = SyncedDir {
path: runtime_dir.path().join("inputs"),

View File

@ -6,11 +6,12 @@ use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use futures::{future::Future, stream::StreamExt};
use futures::stream::StreamExt;
use onefuzz::{
az_copy,
monitor::DirectoryMonitor,
telemetry::{Event::new_result, EventData},
telemetry::{Event, EventData},
uploader::BlobUploader,
};
use reqwest::Url;
use tokio::{fs, io};
@ -113,37 +114,23 @@ impl CheckNotify for tokio::sync::Notify {
const DELAY: Duration = Duration::from_secs(10);
pub fn file_uploader_monitor(synced_dir: SyncedDir) -> Result<impl Future> {
async fn file_uploader_monitor(synced_dir: SyncedDir, event: Event) -> Result<()> {
let url = synced_dir.url.url();
let dir = synced_dir.path.clone();
verbose!("monitoring {}", synced_dir.path.display());
let dir = synced_dir.path;
let url = synced_dir.url;
let mut monitor = DirectoryMonitor::new(&dir);
let mut monitor = DirectoryMonitor::new(dir);
monitor.start()?;
let mut uploader = BlobUploader::new(url);
let monitor = monitor.for_each(move |item| {
verbose!("saw item = {}", item.display());
let url = url.clone();
async move {
event!(new_result; EventData::Path = item.display().to_string());
let mut uploader = onefuzz::uploader::BlobUploader::new(url.url());
let result = uploader.upload(item.clone()).await;
if let Err(err) = result {
error!("couldn't upload item = {}, error = {}", item.display(), err);
} else {
verbose!("uploaded item = {}", item.display());
}
while let Some(item) = monitor.next().await {
event!(event.clone(); EventData::Path = item.display().to_string());
if let Err(err) = uploader.upload(item.clone()).await {
bail!("Couldn't upload file: {}", err);
}
});
}
verbose!("done monitoring {}", dir.display());
Ok(monitor)
Ok(())
}
/// Monitor a directory for results.
@ -156,7 +143,7 @@ pub fn file_uploader_monitor(synced_dir: SyncedDir) -> Result<impl Future> {
/// The intent of this is to support use cases where we usually want a directory
/// to be initialized, but a user-supplied binary, (such as AFL) logically owns
/// a directory, and may reset it.
pub async fn monitor_result_dir(synced_dir: SyncedDir) -> Result<()> {
pub async fn monitor_result_dir(synced_dir: SyncedDir, event: Event) -> Result<()> {
loop {
verbose!("waiting to monitor {}", synced_dir.path.display());
@ -169,7 +156,7 @@ pub async fn monitor_result_dir(synced_dir: SyncedDir) -> Result<()> {
}
verbose!("starting monitor for {}", synced_dir.path.display());
file_uploader_monitor(synced_dir.clone())?.await;
file_uploader_monitor(synced_dir.clone(), event.clone()).await?;
}
}