From a563861487ef42f8b0667a005b1c17f9f62d97ba Mon Sep 17 00:00:00 2001 From: bmc-msft <41130664+bmc-msft@users.noreply.github.com> Date: Fri, 16 Oct 2020 17:04:46 -0400 Subject: [PATCH] fail louder when uploads fail (#166) --- .../onefuzz-agent/src/tasks/fuzz/generator.rs | 6 +- .../src/tasks/fuzz/libfuzzer_fuzz.rs | 58 +------------------ .../src/tasks/fuzz/supervisor.rs | 4 +- src/agent/onefuzz-agent/src/tasks/utils.rs | 45 +++++--------- src/agent/onefuzz/src/az_copy.rs | 8 ++- 5 files changed, 31 insertions(+), 90 deletions(-) diff --git a/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs b/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs index 1c2c9ffcd..b48667073 100644 --- a/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs +++ b/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs @@ -8,7 +8,9 @@ use crate::tasks::{ }; use anyhow::{Error, Result}; use futures::stream::StreamExt; -use onefuzz::{expand::Expand, fs::set_executable, input_tester::Tester, sha256}; +use onefuzz::{ + expand::Expand, fs::set_executable, input_tester::Tester, sha256, telemetry::Event::new_result, +}; use serde::Deserialize; use std::collections::HashMap; use std::{ @@ -63,7 +65,7 @@ pub async fn spawn(config: Arc) -> Result<(), Error> { config.readonly_inputs.clone(), std::time::Duration::from_secs(10), ); - let crash_dir_monitor = utils::monitor_result_dir(config.crashes.clone()); + let crash_dir_monitor = utils::monitor_result_dir(config.crashes.clone(), new_result); let tester = Tester::new( &config.target_exe, &config.target_options, diff --git a/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs b/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs index 33c78c5e9..19b2c4f3b 100644 --- a/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs +++ b/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs @@ -11,14 +11,12 @@ use futures::{future::try_join_all, stream::StreamExt}; use onefuzz::{ fs::list_files, libfuzzer::{LibFuzzer, LibFuzzerLine}, - monitor::DirectoryMonitor, process::ExitStatus, system, telemetry::{ Event::{new_coverage, new_result, process_stats, runtime_stats}, EventData, }, - uploader::BlobUploader, }; use serde::Deserialize; use std::{collections::HashMap, path::PathBuf}; @@ -79,8 +77,8 @@ impl LibFuzzerFuzzTask { // To be scheduled. let resync = self.resync_all_corpuses(); - let new_corpus = self.monitor_new_corpus(); - let faults = self.monitor_faults(); + let new_inputs = utils::monitor_result_dir(self.config.inputs.clone(), new_coverage); + let new_crashes = utils::monitor_result_dir(self.config.crashes.clone(), new_result); let (stats_sender, stats_receiver) = mpsc::unbounded_channel(); let report_stats = report_runtime_stats(workers as usize, stats_receiver, hb_client); @@ -91,7 +89,7 @@ impl LibFuzzerFuzzTask { let fuzzers = try_join_all(fuzzers); - futures::try_join!(resync, new_corpus, faults, fuzzers, report_stats)?; + futures::try_join!(resync, new_inputs, new_crashes, fuzzers, report_stats)?; Ok(()) } @@ -209,56 +207,6 @@ impl LibFuzzerFuzzTask { self.sync_all_corpuses().await?; } } - - async fn monitor_new_corpus(&self) -> Result<()> { - let url = self.config.inputs.url.url(); - let mut monitor = DirectoryMonitor::new(&self.config.inputs.path); - monitor.start()?; - - monitor - .for_each(move |item| { - let url = url.clone(); - - async move { - event!(new_coverage; EventData::Path = item.display().to_string()); - - let mut uploader = BlobUploader::new(url); - - if let Err(err) = uploader.upload(item.clone()).await { - error!("Couldn't upload coverage: {}", err); - } - } - }) - .await; - - Ok(()) - } - - async fn monitor_faults(&self) -> Result<()> { - let url = self.config.crashes.url.url(); - let dir = self.config.crashes.path.clone(); - - let mut monitor = DirectoryMonitor::new(dir); - monitor.start()?; - - monitor - .for_each(move |item| { - let url = url.clone(); - - async move { - event!(new_result; EventData::Path = item.display().to_string()); - - let mut uploader = BlobUploader::new(url); - - if let Err(err) = uploader.upload(item.clone()).await { - error!("Couldn't upload fault: {}", err); - } - } - }) - .await; - - Ok(()) - } } fn try_report_iter_update( diff --git a/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs b/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs index d680cff7c..d8f6d4d54 100644 --- a/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs +++ b/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. - #![allow(clippy::too_many_arguments)] use crate::tasks::{ config::{CommonConfig, ContainerType, SyncedDir}, @@ -13,6 +12,7 @@ use anyhow::{Error, Result}; use onefuzz::{ expand::Expand, fs::{has_files, set_executable, OwnedDir}, + telemetry::Event::new_result, }; use serde::Deserialize; use std::{ @@ -64,7 +64,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> { }; utils::init_dir(&crashes.path).await?; - let monitor_crashes = utils::monitor_result_dir(crashes.clone()); + let monitor_crashes = utils::monitor_result_dir(crashes.clone(), new_result); let inputs = SyncedDir { path: runtime_dir.path().join("inputs"), diff --git a/src/agent/onefuzz-agent/src/tasks/utils.rs b/src/agent/onefuzz-agent/src/tasks/utils.rs index b9ac75a3e..1948d82f4 100644 --- a/src/agent/onefuzz-agent/src/tasks/utils.rs +++ b/src/agent/onefuzz-agent/src/tasks/utils.rs @@ -6,11 +6,12 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use futures::{future::Future, stream::StreamExt}; +use futures::stream::StreamExt; use onefuzz::{ az_copy, monitor::DirectoryMonitor, - telemetry::{Event::new_result, EventData}, + telemetry::{Event, EventData}, + uploader::BlobUploader, }; use reqwest::Url; use tokio::{fs, io}; @@ -113,37 +114,23 @@ impl CheckNotify for tokio::sync::Notify { const DELAY: Duration = Duration::from_secs(10); -pub fn file_uploader_monitor(synced_dir: SyncedDir) -> Result { +async fn file_uploader_monitor(synced_dir: SyncedDir, event: Event) -> Result<()> { + let url = synced_dir.url.url(); + let dir = synced_dir.path.clone(); verbose!("monitoring {}", synced_dir.path.display()); - let dir = synced_dir.path; - let url = synced_dir.url; - - let mut monitor = DirectoryMonitor::new(&dir); + let mut monitor = DirectoryMonitor::new(dir); monitor.start()?; + let mut uploader = BlobUploader::new(url); - let monitor = monitor.for_each(move |item| { - verbose!("saw item = {}", item.display()); - - let url = url.clone(); - - async move { - event!(new_result; EventData::Path = item.display().to_string()); - - let mut uploader = onefuzz::uploader::BlobUploader::new(url.url()); - let result = uploader.upload(item.clone()).await; - - if let Err(err) = result { - error!("couldn't upload item = {}, error = {}", item.display(), err); - } else { - verbose!("uploaded item = {}", item.display()); - } + while let Some(item) = monitor.next().await { + event!(event.clone(); EventData::Path = item.display().to_string()); + if let Err(err) = uploader.upload(item.clone()).await { + bail!("Couldn't upload file: {}", err); } - }); + } - verbose!("done monitoring {}", dir.display()); - - Ok(monitor) + Ok(()) } /// Monitor a directory for results. @@ -156,7 +143,7 @@ pub fn file_uploader_monitor(synced_dir: SyncedDir) -> Result { /// The intent of this is to support use cases where we usually want a directory /// to be initialized, but a user-supplied binary, (such as AFL) logically owns /// a directory, and may reset it. -pub async fn monitor_result_dir(synced_dir: SyncedDir) -> Result<()> { +pub async fn monitor_result_dir(synced_dir: SyncedDir, event: Event) -> Result<()> { loop { verbose!("waiting to monitor {}", synced_dir.path.display()); @@ -169,7 +156,7 @@ pub async fn monitor_result_dir(synced_dir: SyncedDir) -> Result<()> { } verbose!("starting monitor for {}", synced_dir.path.display()); - file_uploader_monitor(synced_dir.clone())?.await; + file_uploader_monitor(synced_dir.clone(), event.clone()).await?; } } diff --git a/src/agent/onefuzz/src/az_copy.rs b/src/agent/onefuzz/src/az_copy.rs index f5b80a29e..fe996cdf3 100644 --- a/src/agent/onefuzz/src/az_copy.rs +++ b/src/agent/onefuzz/src/az_copy.rs @@ -19,11 +19,13 @@ pub async fn sync(src: impl AsRef, dst: impl AsRef) -> Result<()> let output = cmd.spawn()?.wait_with_output().await?; if !output.status.success() { + let stdout = String::from_utf8_lossy(&output.stdout); let stderr = String::from_utf8_lossy(&output.stderr); anyhow::bail!( - "sync failed '{:?}' ==> '{:?}' : {}", + "sync failed src:{:?} dst:{:?} stdout:{:?} stderr:{:?}", src.as_ref(), dst.as_ref(), + stdout, stderr ); } @@ -49,11 +51,13 @@ pub async fn copy(src: impl AsRef, dst: impl AsRef, recursive: boo let output = cmd.spawn()?.wait_with_output().await?; if !output.status.success() { + let stdout = String::from_utf8_lossy(&output.stdout); let stderr = String::from_utf8_lossy(&output.stderr); anyhow::bail!( - "copy failed '{:?}' ==> '{:?}' : {}", + "copy failed src:{:?} dst:{:?} stdout:{:?} stderr:{:?}", src.as_ref(), dst.as_ref(), + stdout, stderr ); }