Wiring up the UI to the events (#776)

## Summary of the Pull Request
- The UI now receives telemetry events (see the sketch below)
- A new coverage section has been added to the UI
- All synced directories are now monitored by the UI
- The UI now exits gracefully (see the cancellation sketch below the commit details)
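
As background for the first bullet, here is a minimal, self-contained sketch of the event flow this change wires up: task code publishes telemetry onto a broadcast channel, and a small forwarder task drains it into the terminal UI's unbounded event channel. This is an illustration only, written against tokio 1.x channel APIs; the `EventData` and `TerminalEvent` types below are simplified stand-ins for the real onefuzz types.

```rust
// Illustrative sketch only; not the PR's code.
// Assumes tokio 1.x, e.g. tokio = { version = "1", features = ["full"] }.
use tokio::sync::{broadcast, mpsc};

// Hypothetical stand-ins for the onefuzz event types.
#[derive(Clone, Debug)]
enum EventData {
    Coverage(f64),
    Count(u64),
}

#[derive(Debug)]
enum TerminalEvent {
    Telemetry(Vec<EventData>),
}

#[tokio::main]
async fn main() {
    // Broadcast side: stands in for the crate-global telemetry event source.
    let (event_source, mut telemetry_rx) = broadcast::channel::<Vec<EventData>>(100);
    // UI side: stands in for the terminal UI's unbounded event channel.
    let (ui_tx, mut ui_rx) = mpsc::unbounded_channel::<TerminalEvent>();

    // Forwarder: drain the broadcast channel into the UI loop, which is roughly
    // what the new listen_telemetry_event task does in the diff below.
    tokio::spawn(async move {
        while let Ok(data) = telemetry_rx.recv().await {
            if ui_tx.send(TerminalEvent::Telemetry(data)).is_err() {
                break; // the UI loop is gone, stop forwarding
            }
        }
    });

    // A task reports progress; a send error only means nobody is listening.
    let _ = event_source.send(vec![EventData::Coverage(0.42), EventData::Count(128)]);

    if let Some(TerminalEvent::Telemetry(data)) = ui_rx.recv().await {
        println!("UI received: {:?}", data);
    }
}
```

In the actual diff, this forwarding is done by `TerminalUi::listen_telemetry_event`, which polls `onefuzz_telemetry::subscribe_to_events()` and republishes filtered entries as `TerminalEvent::Telemetry`.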


depends on #663
Cheick Keita
2021-04-09 10:05:48 -07:00
committed by GitHub
parent bf5f3d0ebb
commit 4086e7695e
19 changed files with 664 additions and 287 deletions
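
The graceful exit is built on a broadcast cancellation channel: `on_quit` sends a unit value, and every background loop (ticker, keyboard reader, telemetry listener, file-count monitors) keeps running only while its receiver still reports `Empty`. A minimal sketch of that pattern, assuming tokio 0.2 as used in this tree:

```rust
// Minimal sketch of the cancellation pattern; not the PR's exact code.
// Assumes tokio 0.2, e.g. tokio = { version = "0.2", features = ["full"] }
// (delay_for is tokio 0.2's sleep primitive).
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::delay_for;

async fn background_loop(mut cancellation_rx: broadcast::Receiver<()>) {
    // Err(Empty) means "no cancellation signal yet": keep working.
    // Ok(()) or Err(Closed) means it is time to stop.
    while cancellation_rx.try_recv() == Err(broadcast::TryRecvError::Empty) {
        // ... one unit of work here: poll a directory, emit a tick, etc. ...
        delay_for(Duration::from_millis(250)).await;
    }
}

#[tokio::main]
async fn main() {
    let (cancellation_tx, cancellation_rx) = broadcast::channel::<()>(1);
    let worker = tokio::spawn(background_loop(cancellation_rx));

    // Stand-in for the user pressing 'q': on_quit broadcasts the signal.
    delay_for(Duration::from_secs(1)).await;
    let _ = cancellation_tx.send(());

    let _ = worker.await;
}
```

Because the loops also stop on `Closed`, the workers wind down even if the sender is dropped without an explicit quit.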

src/agent/Cargo.lock (generated)
View File

@ -1812,6 +1812,7 @@ version = "0.1.0"
dependencies = [
"appinsights",
"iced-x86",
"lazy_static",
"log",
"serde",
"tokio 0.2.25",

View File

@ -39,18 +39,20 @@ pub async fn run(args: clap::ArgMatches<'static>) -> Result<()> {
let event_sender = terminal.as_ref().map(|t| t.task_events.clone());
let command_run = tokio::spawn(async move {
match args.subcommand() {
(RADAMSA, Some(sub)) => radamsa::run(sub).await,
(RADAMSA, Some(sub)) => radamsa::run(sub, event_sender).await,
(LIBFUZZER, Some(sub)) => libfuzzer::run(sub, event_sender).await,
(LIBFUZZER_FUZZ, Some(sub)) => libfuzzer_fuzz::run(sub).await,
(LIBFUZZER_COVERAGE, Some(sub)) => libfuzzer_coverage::run(sub).await,
(LIBFUZZER_CRASH_REPORT, Some(sub)) => libfuzzer_crash_report::run(sub).await,
(LIBFUZZER_MERGE, Some(sub)) => libfuzzer_merge::run(sub).await,
(GENERIC_ANALYSIS, Some(sub)) => generic_analysis::run(sub).await,
(GENERIC_CRASH_REPORT, Some(sub)) => generic_crash_report::run(sub).await,
(GENERIC_GENERATOR, Some(sub)) => generic_generator::run(sub).await,
(GENERIC_TEST_INPUT, Some(sub)) => test_input::run(sub).await,
(LIBFUZZER_TEST_INPUT, Some(sub)) => libfuzzer_test_input::run(sub).await,
(LIBFUZZER_REGRESSION, Some(sub)) => libfuzzer_regression::run(sub).await,
(LIBFUZZER_FUZZ, Some(sub)) => libfuzzer_fuzz::run(sub, event_sender).await,
(LIBFUZZER_COVERAGE, Some(sub)) => libfuzzer_coverage::run(sub, event_sender).await,
(LIBFUZZER_CRASH_REPORT, Some(sub)) => {
libfuzzer_crash_report::run(sub, event_sender).await
}
(LIBFUZZER_MERGE, Some(sub)) => libfuzzer_merge::run(sub, event_sender).await,
(GENERIC_ANALYSIS, Some(sub)) => generic_analysis::run(sub, event_sender).await,
(GENERIC_CRASH_REPORT, Some(sub)) => generic_crash_report::run(sub, event_sender).await,
(GENERIC_GENERATOR, Some(sub)) => generic_generator::run(sub, event_sender).await,
(GENERIC_TEST_INPUT, Some(sub)) => test_input::run(sub, event_sender).await,
(LIBFUZZER_TEST_INPUT, Some(sub)) => libfuzzer_test_input::run(sub, event_sender).await,
(LIBFUZZER_REGRESSION, Some(sub)) => libfuzzer_regression::run(sub, event_sender).await,
_ => {
anyhow::bail!("missing subcommand\nUSAGE: {}", args.usage());
}

View File

@ -15,8 +15,7 @@ use std::{
path::{Path, PathBuf},
time::Duration,
};
use tokio::stream::StreamExt;
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle, time::delay_for};
use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;
pub const SETUP_DIR: &str = "setup_dir";
@ -69,6 +68,7 @@ pub enum CmdType {
pub struct LocalContext {
pub job_path: PathBuf,
pub common_config: CommonConfig,
pub event_sender: Option<UnboundedSender<UiEvent>>,
}
pub fn get_hash_map(args: &clap::ArgMatches<'_>, name: &str) -> Result<HashMap<String, String>> {
@ -212,7 +212,11 @@ pub fn get_synced_dir(
// fuzzing tasks from generating random task id to using UUID::nil(). This
// enables making the one-shot crash report generation, which isn't really a task,
// consistent across multiple runs.
pub fn build_local_context(args: &ArgMatches<'_>, generate_task_id: bool) -> Result<LocalContext> {
pub fn build_local_context(
args: &ArgMatches<'_>,
generate_task_id: bool,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<LocalContext> {
let job_id = get_uuid("job_id", args).unwrap_or_else(|_| Uuid::nil());
let task_id = get_uuid("task_id", args).unwrap_or_else(|_| {
if generate_task_id {
@ -250,6 +254,7 @@ pub fn build_local_context(args: &ArgMatches<'_>, generate_task_id: bool) -> Res
Ok(LocalContext {
job_path,
common_config,
event_sender,
})
}
@ -317,48 +322,31 @@ pub async fn wait_for_dir(path: impl AsRef<Path>) -> Result<()> {
.await
}
pub fn spawn_file_count_monitor(
dir: PathBuf,
sender: UnboundedSender<UiEvent>,
) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
wait_for_dir(&dir).await?;
loop {
let mut rd = tokio::fs::read_dir(&dir).await?;
let mut count: usize = 0;
while let Some(Ok(entry)) = rd.next().await {
if entry.path().is_file() {
count += 1;
}
}
if sender
.send(UiEvent::FileCount {
dir: dir.clone(),
count,
})
.is_err()
{
return Ok(());
}
delay_for(Duration::from_secs(5)).await;
}
})
}
pub fn monitor_file_urls(
urls: &[Option<impl AsRef<Path>>],
event_sender: UnboundedSender<UiEvent>,
) -> Vec<JoinHandle<Result<()>>> {
urls.iter()
.filter_map(|x| x.as_ref())
.map(|path| spawn_file_count_monitor(path.as_ref().into(), event_sender.clone()))
.collect::<Vec<_>>()
}
#[derive(Debug)]
pub enum UiEvent {
FileCount { dir: PathBuf, count: usize },
MonitorDir(PathBuf),
}
pub trait SyncCountDirMonitor<T: Sized> {
fn monitor_count(self, event_sender: &Option<UnboundedSender<UiEvent>>) -> Result<T>;
}
impl SyncCountDirMonitor<SyncedDir> for SyncedDir {
fn monitor_count(self, event_sender: &Option<UnboundedSender<UiEvent>>) -> Result<Self> {
if let (Some(event_sender), Some(p)) = (event_sender, self.url.as_file_path()) {
event_sender.send(UiEvent::MonitorDir(p))?;
}
Ok(self)
}
}
impl SyncCountDirMonitor<Option<SyncedDir>> for Option<SyncedDir> {
fn monitor_count(self, event_sender: &Option<UnboundedSender<UiEvent>>) -> Result<Self> {
if let Some(sd) = self {
let sd = sd.monitor_count(event_sender)?;
Ok(Some(sd))
} else {
Ok(self)
}
}
}

View File

@ -4,8 +4,9 @@
use crate::{
local::common::{
build_local_context, get_cmd_arg, get_cmd_exe, get_hash_map, get_synced_dir, CmdType,
ANALYSIS_DIR, ANALYZER_ENV, ANALYZER_EXE, ANALYZER_OPTIONS, CRASHES_DIR, NO_REPRO_DIR,
REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TOOLS_DIR, UNIQUE_REPORTS_DIR,
SyncCountDirMonitor, UiEvent, ANALYSIS_DIR, ANALYZER_ENV, ANALYZER_EXE, ANALYZER_OPTIONS,
CRASHES_DIR, NO_REPRO_DIR, REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TOOLS_DIR,
UNIQUE_REPORTS_DIR,
},
tasks::{
analysis::generic::{run as run_analysis, Config},
@ -15,11 +16,13 @@ use crate::{
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use storage_queue::QueueClient;
use tokio::sync::mpsc::UnboundedSender;
pub fn build_analysis_config(
args: &clap::ArgMatches<'_>,
input_queue: Option<QueueClient>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_options = get_cmd_arg(CmdType::Target, args);
@ -27,18 +30,25 @@ pub fn build_analysis_config(
let analyzer_exe = value_t!(args, ANALYZER_EXE, String)?;
let analyzer_options = args.values_of_lossy(ANALYZER_OPTIONS).unwrap_or_default();
let analyzer_env = get_hash_map(args, ANALYZER_ENV)?;
let analysis = get_synced_dir(ANALYSIS_DIR, common.job_id, common.task_id, args)?;
let analysis = get_synced_dir(ANALYSIS_DIR, common.job_id, common.task_id, args)?
.monitor_count(&event_sender)?;
let tools = get_synced_dir(TOOLS_DIR, common.job_id, common.task_id, args)?;
let crashes = if input_queue.is_none() {
get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args).ok()
get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?
} else {
None
};
let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args).ok();
let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args).ok();
let unique_reports =
get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args).ok();
let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let unique_reports = get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let config = Config {
target_exe,
@ -59,9 +69,12 @@ pub fn build_analysis_config(
Ok(config)
}
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let context = build_local_context(args, true)?;
let config = build_analysis_config(args, None, context.common_config.clone())?;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_analysis_config(args, None, context.common_config.clone(), event_sender)?;
run_analysis(config).await
}

View File

@ -4,9 +4,9 @@
use crate::{
local::common::{
build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir, CmdType,
CHECK_ASAN_LOG, CHECK_RETRY_COUNT, CRASHES_DIR, DISABLE_CHECK_DEBUGGER,
DISABLE_CHECK_QUEUE, NO_REPRO_DIR, REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS,
TARGET_TIMEOUT, UNIQUE_REPORTS_DIR,
SyncCountDirMonitor, UiEvent, CHECK_ASAN_LOG, CHECK_RETRY_COUNT, CRASHES_DIR,
DISABLE_CHECK_DEBUGGER, DISABLE_CHECK_QUEUE, NO_REPRO_DIR, REPORTS_DIR, TARGET_ENV,
TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, UNIQUE_REPORTS_DIR,
},
tasks::{
config::CommonConfig,
@ -16,11 +16,13 @@ use crate::{
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use storage_queue::QueueClient;
use tokio::sync::mpsc::UnboundedSender;
pub fn build_report_config(
args: &clap::ArgMatches<'_>,
input_queue: Option<QueueClient>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
@ -31,16 +33,22 @@ pub fn build_report_config(
common.job_id,
common.task_id,
args,
)?);
let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args).ok();
let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args).ok();
)?)
.monitor_count(&event_sender)?;
let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let unique_reports = Some(get_synced_dir(
UNIQUE_REPORTS_DIR,
common.job_id,
common.task_id,
args,
)?);
)?)
.monitor_count(&event_sender)?;
let target_timeout = value_t!(args, TARGET_TIMEOUT, u64).ok();
@ -70,9 +78,12 @@ pub fn build_report_config(
Ok(config)
}
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let context = build_local_context(args, true)?;
let config = build_report_config(args, None, context.common_config.clone())?;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_report_config(args, None, context.common_config.clone(), event_sender)?;
ReportTask::new(config).managed_run().await
}

View File

@ -4,9 +4,10 @@
use crate::{
local::common::{
build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir,
get_synced_dirs, CmdType, CHECK_ASAN_LOG, CHECK_RETRY_COUNT, CRASHES_DIR,
DISABLE_CHECK_DEBUGGER, GENERATOR_ENV, GENERATOR_EXE, GENERATOR_OPTIONS, READONLY_INPUTS,
RENAME_OUTPUT, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, TOOLS_DIR,
get_synced_dirs, CmdType, SyncCountDirMonitor, UiEvent, CHECK_ASAN_LOG, CHECK_RETRY_COUNT,
CRASHES_DIR, DISABLE_CHECK_DEBUGGER, GENERATOR_ENV, GENERATOR_EXE, GENERATOR_OPTIONS,
READONLY_INPUTS, RENAME_OUTPUT, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT,
TOOLS_DIR,
},
tasks::{
config::CommonConfig,
@ -15,9 +16,15 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use tokio::sync::mpsc::UnboundedSender;
pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> Result<Config> {
let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)?;
pub fn build_fuzz_config(
args: &clap::ArgMatches<'_>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<Config> {
let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)?
.monitor_count(&event_sender)?;
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_options = get_cmd_arg(CmdType::Target, args);
let target_env = get_cmd_env(CmdType::Target, args)?;
@ -25,7 +32,10 @@ pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> R
let generator_exe = get_cmd_exe(CmdType::Generator, args)?;
let generator_options = get_cmd_arg(CmdType::Generator, args);
let generator_env = get_cmd_env(CmdType::Generator, args)?;
let readonly_inputs = get_synced_dirs(READONLY_INPUTS, common.job_id, common.task_id, args)?;
let readonly_inputs = get_synced_dirs(READONLY_INPUTS, common.job_id, common.task_id, args)?
.into_iter()
.map(|sd| sd.monitor_count(&event_sender))
.collect::<Result<Vec<_>>>()?;
let rename_output = args.is_present(RENAME_OUTPUT);
let check_asan_log = args.is_present(CHECK_ASAN_LOG);
@ -33,7 +43,9 @@ pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> R
let check_retry_count = value_t!(args, CHECK_RETRY_COUNT, u64)?;
let target_timeout = Some(value_t!(args, TARGET_TIMEOUT, u64)?);
let tools = get_synced_dir(TOOLS_DIR, common.job_id, common.task_id, args).ok();
let tools = get_synced_dir(TOOLS_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let ensemble_sync_delay = None;
@ -59,9 +71,12 @@ pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> R
Ok(config)
}
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let context = build_local_context(args, true)?;
let config = build_fuzz_config(args, context.common_config.clone())?;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_fuzz_config(args, context.common_config.clone(), event_sender)?;
GeneratorTask::new(config).run().await
}

View File

@ -4,8 +4,8 @@
use crate::{
local::{
common::{
build_local_context, monitor_file_urls, wait_for_dir, DirectoryMonitorQueue,
ANALYZER_EXE, COVERAGE_DIR, REGRESSION_REPORTS_DIR, UNIQUE_REPORTS_DIR,
build_local_context, wait_for_dir, DirectoryMonitorQueue, UiEvent, ANALYZER_EXE,
COVERAGE_DIR, REGRESSION_REPORTS_DIR, UNIQUE_REPORTS_DIR,
},
generic_analysis::{build_analysis_config, build_shared_args as build_analysis_args},
libfuzzer_coverage::{build_coverage_config, build_shared_args as build_coverage_args},
@ -28,24 +28,12 @@ use std::collections::HashSet;
use tokio::{sync::mpsc::UnboundedSender, task::spawn};
use uuid::Uuid;
use super::common::UiEvent;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let mut task_handles = vec![];
let context = build_local_context(args, true)?;
let fuzz_config = build_fuzz_config(args, context.common_config.clone())?;
if let Some(event_sender) = event_sender.clone() {
task_handles.append(&mut monitor_file_urls(
&[
fuzz_config.crashes.url.as_file_path(),
fuzz_config.inputs.url.as_file_path(),
],
event_sender,
));
}
let context = build_local_context(args, true, event_sender.clone())?;
let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?;
let crash_dir = fuzz_config
.crashes
.url
@ -72,26 +60,8 @@ pub async fn run(
task_id: Uuid::new_v4(),
..context.common_config.clone()
},
event_sender.clone(),
)?;
if let Some(event_sender) = event_sender.clone() {
task_handles.append(&mut monitor_file_urls(
&[
report_config
.no_repro
.clone()
.and_then(|u| u.url.as_file_path()),
report_config
.reports
.clone()
.and_then(|u| u.url.as_file_path()),
report_config
.unique_reports
.clone()
.and_then(|u| u.url.as_file_path()),
],
event_sender,
));
}
let mut report = ReportTask::new(report_config);
let report_task = spawn(async move { report.managed_run().await });
@ -111,23 +81,8 @@ pub async fn run(
task_id: Uuid::new_v4(),
..context.common_config.clone()
},
)?;
if let Some(event_sender) = event_sender {
task_handles.append(&mut monitor_file_urls(
&coverage_config
.readonly_inputs
.iter()
.cloned()
.map(|input| input.url.as_file_path())
.collect::<Vec<_>>(),
event_sender.clone(),
));
task_handles.append(&mut monitor_file_urls(
&[coverage_config.coverage.url.as_file_path()],
event_sender,
));
}
)?;
let mut coverage = CoverageTask::new(coverage_config);
let coverage_task = spawn(async move { coverage.managed_run().await });
@ -145,6 +100,7 @@ pub async fn run(
task_id: Uuid::new_v4(),
..context.common_config.clone()
},
event_sender.clone(),
)?;
let analysis_task = spawn(async move { run_analysis(analysis_config).await });
@ -159,6 +115,7 @@ pub async fn run(
task_id: Uuid::new_v4(),
..context.common_config.clone()
},
event_sender,
)?;
let regression = LibFuzzerRegressionTask::new(regression_config);
let regression_task = spawn(async move { regression.run().await });

View File

@ -15,29 +15,35 @@ use crate::{
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use storage_queue::QueueClient;
use tokio::sync::mpsc::UnboundedSender;
use super::common::{SyncCountDirMonitor, UiEvent};
pub fn build_coverage_config(
args: &clap::ArgMatches<'_>,
local_job: bool,
input_queue: Option<QueueClient>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
let target_options = get_cmd_arg(CmdType::Target, args);
let readonly_inputs = if local_job {
vec![get_synced_dir(
INPUTS_DIR,
common.job_id,
common.task_id,
args,
)?]
vec![
get_synced_dir(INPUTS_DIR, common.job_id, common.task_id, args)?
.monitor_count(&event_sender)?,
]
} else {
get_synced_dirs(READONLY_INPUTS, common.job_id, common.task_id, args)?
.into_iter()
.map(|sd| sd.monitor_count(&event_sender))
.collect::<Result<Vec<_>>>()?
};
let coverage = get_synced_dir(COVERAGE_DIR, common.job_id, common.task_id, args)?;
let coverage = get_synced_dir(COVERAGE_DIR, common.job_id, common.task_id, args)?
.monitor_count(&event_sender)?;
let check_fuzzer_help = args.is_present(CHECK_FUZZER_HELP);
let config = Config {
@ -51,12 +57,22 @@ pub fn build_coverage_config(
common,
check_queue: false,
};
Ok(config)
}
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let context = build_local_context(args, true)?;
let config = build_coverage_config(args, false, None, context.common_config.clone())?;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_coverage_config(
args,
false,
None,
context.common_config.clone(),
event_sender,
)?;
let mut task = CoverageTask::new(config);
task.managed_run().await

View File

@ -4,8 +4,9 @@
use crate::{
local::common::{
build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir, CmdType,
CHECK_FUZZER_HELP, CHECK_RETRY_COUNT, CRASHES_DIR, DISABLE_CHECK_QUEUE, NO_REPRO_DIR,
REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, UNIQUE_REPORTS_DIR,
SyncCountDirMonitor, UiEvent, CHECK_FUZZER_HELP, CHECK_RETRY_COUNT, CRASHES_DIR,
DISABLE_CHECK_QUEUE, NO_REPRO_DIR, REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS,
TARGET_TIMEOUT, UNIQUE_REPORTS_DIR,
},
tasks::{
config::CommonConfig,
@ -15,23 +16,32 @@ use crate::{
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use storage_queue::QueueClient;
use tokio::sync::mpsc::UnboundedSender;
pub fn build_report_config(
args: &clap::ArgMatches<'_>,
input_queue: Option<QueueClient>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
let target_options = get_cmd_arg(CmdType::Target, args);
let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args).ok();
let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args).ok();
let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args).ok();
let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let unique_reports =
get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args).ok();
let unique_reports = get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let target_timeout = value_t!(args, TARGET_TIMEOUT, u64).ok();
@ -59,12 +69,16 @@ pub fn build_report_config(
unique_reports,
common,
};
Ok(config)
}
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let context = build_local_context(args, true)?;
let config = build_report_config(args, None, context.common_config.clone())?;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_report_config(args, None, context.common_config.clone(), event_sender)?;
ReportTask::new(config).managed_run().await
}

View File

@ -4,8 +4,8 @@
use crate::{
local::common::{
build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir, CmdType,
CHECK_FUZZER_HELP, CRASHES_DIR, INPUTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS,
TARGET_WORKERS,
SyncCountDirMonitor, UiEvent, CHECK_FUZZER_HELP, CRASHES_DIR, INPUTS_DIR, TARGET_ENV,
TARGET_EXE, TARGET_OPTIONS, TARGET_WORKERS,
},
tasks::{
config::CommonConfig,
@ -14,12 +14,19 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use tokio::sync::mpsc::UnboundedSender;
const DISABLE_EXPECT_CRASH_ON_FAILURE: &str = "disable_expect_crash_on_failure";
pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> Result<Config> {
let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)?;
let inputs = get_synced_dir(INPUTS_DIR, common.job_id, common.task_id, args)?;
pub fn build_fuzz_config(
args: &clap::ArgMatches<'_>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<Config> {
let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)?
.monitor_count(&event_sender)?;
let inputs = get_synced_dir(INPUTS_DIR, common.job_id, common.task_id, args)?
.monitor_count(&event_sender)?;
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
@ -49,9 +56,12 @@ pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> R
Ok(config)
}
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let context = build_local_context(args, true)?;
let config = build_fuzz_config(args, context.common_config.clone())?;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_fuzz_config(args, context.common_config.clone(), event_sender)?;
LibFuzzerFuzzTask::new(config)?.run().await
}

View File

@ -4,8 +4,9 @@
use crate::{
local::common::{
build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir,
get_synced_dirs, CmdType, ANALYSIS_INPUTS, ANALYSIS_UNIQUE_INPUTS, CHECK_FUZZER_HELP,
INPUTS_DIR, PRESERVE_EXISTING_OUTPUTS, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS,
get_synced_dirs, CmdType, SyncCountDirMonitor, UiEvent, ANALYSIS_INPUTS,
ANALYSIS_UNIQUE_INPUTS, CHECK_FUZZER_HELP, INPUTS_DIR, PRESERVE_EXISTING_OUTPUTS,
TARGET_ENV, TARGET_EXE, TARGET_OPTIONS,
},
tasks::{
config::CommonConfig,
@ -15,19 +16,25 @@ use crate::{
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use storage_queue::QueueClient;
use tokio::sync::mpsc::UnboundedSender;
pub fn build_merge_config(
args: &clap::ArgMatches<'_>,
input_queue: Option<QueueClient>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
let target_options = get_cmd_arg(CmdType::Target, args);
let check_fuzzer_help = args.is_present(CHECK_FUZZER_HELP);
let inputs = get_synced_dirs(ANALYSIS_INPUTS, common.job_id, common.task_id, args)?;
let inputs = get_synced_dirs(ANALYSIS_INPUTS, common.job_id, common.task_id, args)?
.into_iter()
.map(|sd| sd.monitor_count(&event_sender))
.collect::<Result<Vec<_>>>()?;
let unique_inputs =
get_synced_dir(ANALYSIS_UNIQUE_INPUTS, common.job_id, common.task_id, args)?;
get_synced_dir(ANALYSIS_UNIQUE_INPUTS, common.job_id, common.task_id, args)?
.monitor_count(&event_sender)?;
let preserve_existing_outputs = value_t!(args, PRESERVE_EXISTING_OUTPUTS, bool)?;
let config = Config {
@ -41,12 +48,16 @@ pub fn build_merge_config(
unique_inputs,
preserve_existing_outputs,
};
Ok(config)
}
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let context = build_local_context(args, true)?;
let config = build_merge_config(args, None, context.common_config.clone())?;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_merge_config(args, None, context.common_config.clone(), event_sender)?;
spawn(std::sync::Arc::new(config)).await
}

View File

@ -4,9 +4,9 @@
use crate::{
local::common::{
build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir, CmdType,
CHECK_FUZZER_HELP, CHECK_RETRY_COUNT, COVERAGE_DIR, CRASHES_DIR, NO_REPRO_DIR,
REGRESSION_REPORTS_DIR, REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS,
TARGET_TIMEOUT, UNIQUE_REPORTS_DIR,
SyncCountDirMonitor, UiEvent, CHECK_FUZZER_HELP, CHECK_RETRY_COUNT, COVERAGE_DIR,
CRASHES_DIR, NO_REPRO_DIR, REGRESSION_REPORTS_DIR, REPORTS_DIR, TARGET_ENV, TARGET_EXE,
TARGET_OPTIONS, TARGET_TIMEOUT, UNIQUE_REPORTS_DIR,
},
tasks::{
config::CommonConfig,
@ -15,26 +15,35 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use tokio::sync::mpsc::UnboundedSender;
const REPORT_NAMES: &str = "report_names";
pub fn build_regression_config(
args: &clap::ArgMatches<'_>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
let target_options = get_cmd_arg(CmdType::Target, args);
let target_timeout = value_t!(args, TARGET_TIMEOUT, u64).ok();
let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)?;
let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)?
.monitor_count(&event_sender)?;
let regression_reports =
get_synced_dir(REGRESSION_REPORTS_DIR, common.job_id, common.task_id, args)?;
get_synced_dir(REGRESSION_REPORTS_DIR, common.job_id, common.task_id, args)?
.monitor_count(&event_sender)?;
let check_retry_count = value_t!(args, CHECK_RETRY_COUNT, u64)?;
let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args).ok();
let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args).ok();
let unique_reports =
get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args).ok();
let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let unique_reports = get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args)
.ok()
.monitor_count(&event_sender)?;
let report_list = if args.is_present(REPORT_NAMES) {
Some(values_t!(args, REPORT_NAMES, String)?)
@ -64,9 +73,12 @@ pub fn build_regression_config(
Ok(config)
}
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let context = build_local_context(args, true)?;
let config = build_regression_config(args, context.common_config.clone())?;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_regression_config(args, context.common_config.clone(), event_sender)?;
LibFuzzerRegressionTask::new(config).run().await
}

View File

@ -3,17 +3,21 @@
use crate::{
local::common::{
build_local_context, get_cmd_arg, get_cmd_env, CmdType, CHECK_RETRY_COUNT, TARGET_ENV,
TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT,
build_local_context, get_cmd_arg, get_cmd_env, CmdType, UiEvent, CHECK_RETRY_COUNT,
TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT,
},
tasks::report::libfuzzer_report::{test_input, TestInputArgs},
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use std::path::PathBuf;
use tokio::sync::mpsc::UnboundedSender;
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let context = build_local_context(args, true)?;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let context = build_local_context(args, true, event_sender)?;
let target_exe = value_t!(args, TARGET_EXE, PathBuf)?;
let target_env = get_cmd_env(CmdType::Target, args)?;

View File

@ -3,7 +3,7 @@
use crate::{
local::{
common::{build_local_context, DirectoryMonitorQueue},
common::{build_local_context, DirectoryMonitorQueue, UiEvent},
generic_crash_report::{build_report_config, build_shared_args as build_crash_args},
generic_generator::{build_fuzz_config, build_shared_args as build_fuzz_args},
},
@ -13,13 +13,16 @@ use anyhow::Result;
use clap::{App, SubCommand};
use onefuzz::utils::try_wait_all_join_handles;
use std::collections::HashSet;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::spawn;
use uuid::Uuid;
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let context = build_local_context(args, true)?;
let fuzz_config = build_fuzz_config(args, context.common_config.clone())?;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?;
let crash_dir = fuzz_config
.crashes
.url
@ -37,6 +40,7 @@ pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
task_id: Uuid::new_v4(),
..context.common_config.clone()
},
event_sender,
)?;
let report_task = spawn(async move { ReportTask::new(report_config).managed_run().await });

View File

@ -3,17 +3,22 @@
use crate::{
local::common::{
build_local_context, get_cmd_arg, get_cmd_env, CmdType, CHECK_ASAN_LOG, CHECK_RETRY_COUNT,
DISABLE_CHECK_DEBUGGER, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT,
build_local_context, get_cmd_arg, get_cmd_env, CmdType, UiEvent, CHECK_ASAN_LOG,
CHECK_RETRY_COUNT, DISABLE_CHECK_DEBUGGER, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS,
TARGET_TIMEOUT,
},
tasks::report::generic::{test_input, TestInputArgs},
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use std::path::PathBuf;
use tokio::sync::mpsc::UnboundedSender;
pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let context = build_local_context(args, false)?;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
let context = build_local_context(args, false, event_sender)?;
let target_exe = value_t!(args, TARGET_EXE, PathBuf)?;
let target_env = get_cmd_env(CmdType::Target, args)?;

View File

@ -2,7 +2,7 @@
// Licensed under the MIT License.
use crate::local::common::UiEvent;
use anyhow::{Context, Result};
use anyhow::Result;
use crossterm::{
event::{self, Event, KeyCode},
execute,
@ -11,29 +11,37 @@ use crossterm::{
use futures::{StreamExt, TryStreamExt};
use log::Level;
use onefuzz::utils::try_wait_all_join_handles;
use onefuzz_telemetry::{self, EventData};
use std::{
collections::HashMap,
io::{self, Stdout, Write},
iter::once,
mem::{discriminant, Discriminant},
path::PathBuf,
thread::{self, JoinHandle},
time::Duration,
};
use tokio::{
sync::mpsc::{self, UnboundedReceiver},
time,
sync::{
broadcast::{self, TryRecvError},
mpsc::{self, UnboundedSender},
},
time::delay_for,
};
use tui::{
backend::CrosstermBackend,
layout::{Constraint, Corner, Direction, Layout},
layout::{Alignment, Constraint, Corner, Direction, Layout},
style::{Color, Modifier, Style},
text::{Span, Spans},
widgets::{Block, Borders},
widgets::{List, ListItem, ListState},
widgets::{Gauge, List, ListItem, ListState, Paragraph, Wrap},
Terminal,
};
use arraydeque::{ArrayDeque, Wrapping};
use super::common::wait_for_dir;
#[derive(Debug, thiserror::Error)]
enum UiLoopError {
#[error("program exiting")]
@ -57,6 +65,15 @@ impl From<std::io::Error> for UiLoopError {
/// Maximum number of log message to display, arbitrarily chosen
const LOGS_BUFFER_SIZE: usize = 100;
const TICK_RATE: Duration = Duration::from_millis(250);
const FILE_MONITOR_POLLING_PERIOD: Duration = Duration::from_secs(5);
const EVENT_POLLING_PERIOD: Duration = Duration::from_secs(1);
#[derive(Debug, Default)]
struct CoverageData {
covered: Option<u64>,
features: Option<u64>,
rate: Option<f64>,
}
/// Event driving the refresh of the UI
#[derive(Debug)]
@ -65,15 +82,19 @@ enum TerminalEvent {
Tick,
FileCount { dir: PathBuf, count: usize },
Quit,
MonitorDir(PathBuf),
Telemetry(Vec<EventData>),
}
struct UiLoopState {
pub logs: ArrayDeque<[(Level, String); LOGS_BUFFER_SIZE], Wrapping>,
pub file_count: HashMap<PathBuf, usize>,
pub file_count_state: ListState,
pub file_monitors: Vec<JoinHandle<Result<()>>>,
pub file_monitors: HashMap<PathBuf, tokio::task::JoinHandle<Result<()>>>,
pub log_event_receiver: mpsc::UnboundedReceiver<(Level, String)>,
pub terminal: Terminal<CrosstermBackend<Stdout>>,
pub cancellation_tx: broadcast::Sender<()>,
pub events: HashMap<Discriminant<EventData>, EventData>,
}
impl UiLoopState {
@ -81,6 +102,8 @@ impl UiLoopState {
terminal: Terminal<CrosstermBackend<Stdout>>,
log_event_receiver: mpsc::UnboundedReceiver<(Level, String)>,
) -> Self {
let (cancellation_tx, _) = broadcast::channel(1);
let events = HashMap::new();
Self {
log_event_receiver,
logs: Default::default(),
@ -88,6 +111,8 @@ impl UiLoopState {
file_count_state: Default::default(),
file_monitors: Default::default(),
terminal,
cancellation_tx,
events,
}
}
}
@ -130,40 +155,109 @@ impl TerminalUi {
.init();
let tick_event_tx_clone = self.ui_event_tx.clone();
let tick_event_handle =
tokio::spawn(async { Self::ticking(tick_event_tx_clone).await.context("ticking") });
let tick_event_handle = tokio::spawn(Self::ticking(
tick_event_tx_clone,
initial_state.cancellation_tx.subscribe(),
));
let keyboard_ui_event_tx = self.ui_event_tx.clone();
let _keyboard_event_handle = Self::read_keyboard_events(keyboard_ui_event_tx);
let _keyboard_event_handle = Self::read_keyboard_events(
keyboard_ui_event_tx,
initial_state.cancellation_tx.subscribe(),
);
let task_event_receiver = self.task_event_receiver;
let ui_event_tx = self.ui_event_tx.clone();
let external_event_handle =
tokio::spawn(Self::read_commands(ui_event_tx, task_event_receiver));
let ui_loop = tokio::spawn(Self::ui_loop(initial_state, self.ui_event_rx));
let external_event_handle = tokio::spawn(Self::read_commands(
ui_event_tx,
task_event_receiver,
initial_state.cancellation_tx.subscribe(),
));
let mut task_handles = vec![tick_event_handle, ui_loop, external_event_handle];
let mut task_handles = vec![tick_event_handle, external_event_handle];
let ui_event_tx = self.ui_event_tx.clone();
let telemetry = tokio::spawn(Self::listen_telemetry_event(
ui_event_tx,
initial_state.cancellation_tx.subscribe(),
));
task_handles.push(telemetry);
let ui_loop = tokio::spawn(Self::ui_loop(
initial_state,
self.ui_event_rx,
self.ui_event_tx.clone(),
));
task_handles.push(ui_loop);
if let Some(timeout) = timeout {
let ui_event_tx = self.ui_event_tx.clone();
let timeout_task = tokio::spawn(async move {
time::delay_for(timeout).await;
tokio::spawn(async move {
tokio::time::delay_for(timeout).await;
let _ = ui_event_tx.send(TerminalEvent::Quit);
Ok(())
});
task_handles.push(timeout_task);
}
try_wait_all_join_handles(task_handles)
.await
.context("ui_loop")?;
try_wait_all_join_handles(task_handles).await?;
Ok(())
}
async fn ticking(ui_event_tx: mpsc::UnboundedSender<TerminalEvent>) -> Result<()> {
fn filter_event(event: &EventData) -> bool {
!matches!(
event,
EventData::WorkerId(_)
| EventData::InstanceId(_)
| EventData::JobId(_)
| EventData::TaskId(_)
| EventData::ScalesetId(_)
| EventData::MachineId(_)
| EventData::Version(_)
| EventData::CommandLine(_)
| EventData::Type(_)
| EventData::Mode(_)
| EventData::Path(_)
| EventData::Count(_)
| EventData::ExecsSecond(_)
| EventData::Pid(_)
| EventData::RunId(_)
| EventData::Name(_)
| EventData::ToolName(_)
| EventData::ProcessStatus(_)
)
}
async fn listen_telemetry_event(
ui_event_tx: UnboundedSender<TerminalEvent>,
mut cancellation_rx: broadcast::Receiver<()>,
) -> Result<()> {
let mut rx = onefuzz_telemetry::subscribe_to_events();
while cancellation_rx.try_recv() == Err(broadcast::TryRecvError::Empty) {
match rx.try_recv() {
Ok((_event, data)) => {
let data = data
.into_iter()
.filter(Self::filter_event)
.collect::<Vec<_>>();
let _ = ui_event_tx.send(TerminalEvent::Telemetry(data));
}
Err(TryRecvError::Empty) => delay_for(EVENT_POLLING_PERIOD).await,
Err(TryRecvError::Lagged(_)) => continue,
Err(TryRecvError::Closed) => break,
}
}
Ok(())
}
async fn ticking(
ui_event_tx: mpsc::UnboundedSender<TerminalEvent>,
mut cancellation_rx: broadcast::Receiver<()>,
) -> Result<()> {
let mut interval = tokio::time::interval(TICK_RATE);
loop {
while Err(broadcast::TryRecvError::Empty) == cancellation_rx.try_recv() {
interval.tick().await;
if let Err(_err) = ui_event_tx.send(TerminalEvent::Tick) {
break;
@ -174,34 +268,42 @@ impl TerminalUi {
fn read_keyboard_events(
ui_event_tx: mpsc::UnboundedSender<TerminalEvent>,
mut cancellation_rx: broadcast::Receiver<()>,
) -> JoinHandle<Result<()>> {
thread::spawn(move || loop {
if event::poll(Duration::from_secs(1))? {
thread::spawn(move || {
while Err(broadcast::TryRecvError::Empty) == cancellation_rx.try_recv() {
if event::poll(EVENT_POLLING_PERIOD)? {
let event = event::read()?;
if let Err(_err) = ui_event_tx.send(TerminalEvent::Input(event)) {
return Ok(());
}
}
}
Ok(())
})
}
async fn read_commands(
ui_event_tx: mpsc::UnboundedSender<TerminalEvent>,
mut external_event_rx: mpsc::UnboundedReceiver<UiEvent>,
mut cancellation_rx: broadcast::Receiver<()>,
) -> Result<()> {
while let Some(UiEvent::FileCount { dir, count }) = external_event_rx.recv().await {
if ui_event_tx
.send(TerminalEvent::FileCount { dir, count })
.is_err()
{
while Err(broadcast::TryRecvError::Empty) == cancellation_rx.try_recv() {
match external_event_rx.try_recv() {
Ok(UiEvent::MonitorDir(dir)) => {
if ui_event_tx.send(TerminalEvent::MonitorDir(dir)).is_err() {
break;
}
}
Err(mpsc::error::TryRecvError::Empty) => delay_for(EVENT_POLLING_PERIOD).await,
Err(mpsc::error::TryRecvError::Closed) => break,
}
}
Ok(())
}
fn take_available_logs<T>(
receiver: &mut UnboundedReceiver<T>,
receiver: &mut mpsc::UnboundedReceiver<T>,
size: usize,
buffer: &mut ArrayDeque<[T; LOGS_BUFFER_SIZE], Wrapping>,
) {
@ -215,46 +317,100 @@ impl TerminalUi {
}
}
async fn refresh_ui(ui_state: UiLoopState) -> Result<UiLoopState, UiLoopError> {
let mut logs = ui_state.logs;
let mut file_count_state = ui_state.file_count_state;
let file_count = ui_state.file_count;
let mut log_event_receiver = ui_state.log_event_receiver;
let mut terminal = ui_state.terminal;
fn create_coverage_gauge<'a>(rate: f64) -> Gauge<'a> {
let label = format!("coverage {:.2}%", rate * 100.0);
Gauge::default()
.gauge_style(
Style::default()
.fg(Color::White)
.bg(Color::Black)
.add_modifier(Modifier::ITALIC | Modifier::BOLD),
)
.label(label)
.ratio(rate)
}
Self::take_available_logs(&mut log_event_receiver, 10, &mut logs);
terminal.draw(|f| {
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Percentage(25), Constraint::Percentage(75)].as_ref())
.split(f.size());
fn create_stats_paragraph(
events: &HashMap<Discriminant<EventData>, EventData>,
) -> Paragraph<'_> {
let mut event_values = events.values().map(|v| v.as_values()).collect::<Vec<_>>();
event_values.sort_by(|(a, _), (b, _)| a.cmp(b));
let mut stats_spans = once(Span::styled(
"Stats: ",
Style::default().add_modifier(Modifier::BOLD),
))
.chain(
event_values
.into_iter()
.map(|(name, value)| {
vec![
Span::raw(name),
Span::raw(" "),
Span::styled(value, Style::default().add_modifier(Modifier::BOLD)),
Span::raw(", "),
]
})
.flatten(),
)
.collect::<Vec<_>>();
if stats_spans.len() > 1 {
// removing the last ","
stats_spans.pop();
}
Paragraph::new(Spans::from(stats_spans))
.style(Style::default())
.alignment(Alignment::Left)
.wrap(Wrap { trim: true })
}
fn create_file_count_paragraph(file_count: &HashMap<PathBuf, usize>) -> Paragraph<'_> {
let mut sorted_file_count = file_count.iter().collect::<Vec<_>>();
sorted_file_count.sort_by(|(p1, _), (p2, _)| p1.cmp(p2));
let files = sorted_file_count
let mut files_spans = once(Span::styled(
"Files: ",
Style::default().add_modifier(Modifier::BOLD),
))
.chain(
sorted_file_count
.iter()
.map(|(path, count)| {
ListItem::new(Spans::from(vec![
vec![
Span::raw(
path.file_name()
.map(|f| f.to_string_lossy())
.unwrap_or_default(),
),
Span::raw(": "),
Span::raw(format!("{}", count)),
]))
Span::raw(" "),
Span::styled(
format!("{}", count),
Style::default().add_modifier(Modifier::BOLD),
),
Span::raw(", "),
]
})
.flatten(),
)
.collect::<Vec<_>>();
let log_list = List::new(files)
.block(Block::default().borders(Borders::ALL).title("files"))
.highlight_style(Style::default().add_modifier(Modifier::BOLD))
.start_corner(Corner::TopLeft);
if files_spans.len() > 1 {
files_spans.pop();
} // removing the last ","
f.render_stateful_widget(log_list, chunks[0], &mut file_count_state);
Paragraph::new(Spans::from(files_spans))
.style(Style::default())
.alignment(Alignment::Left)
.wrap(Wrap { trim: true })
}
fn create_log_list(
logs: &ArrayDeque<[(Level, String); LOGS_BUFFER_SIZE], Wrapping>,
) -> List<'_> {
let log_items = logs
.iter()
.map(|(level, log)| {
@ -274,18 +430,71 @@ impl TerminalUi {
})
.collect::<Vec<_>>();
let log_list = List::new(log_items)
.block(Block::default().borders(Borders::ALL).title("Logs"))
.start_corner(Corner::BottomLeft);
List::new(log_items)
.block(Block::default().borders(Borders::TOP).title("Logs"))
.start_corner(Corner::BottomLeft)
}
f.render_widget(log_list, chunks[1]);
async fn refresh_ui(ui_state: UiLoopState) -> Result<UiLoopState, UiLoopError> {
let mut logs = ui_state.logs;
let file_count = ui_state.file_count;
let mut log_event_receiver = ui_state.log_event_receiver;
let mut terminal = ui_state.terminal;
let rate = ui_state
.events
.get(&discriminant(&EventData::Rate(0.0)))
.and_then(|x| {
if let EventData::Rate(r) = x {
Some(*r)
} else {
None
}
});
let events = ui_state.events;
Self::take_available_logs(&mut log_event_receiver, 10, &mut logs);
terminal.draw(|f| {
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Percentage(25), Constraint::Percentage(75)].as_ref())
.split(f.size());
let log_area = chunks[1];
let top_area = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Percentage(50), Constraint::Percentage(50)].as_ref())
.split(chunks[0]);
let file_count_area = top_area[0];
if let Some(rate) = rate {
let coverage_area = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Percentage(25), Constraint::Percentage(75)].as_ref())
.split(top_area[1]);
let gauge = Self::create_coverage_gauge(rate);
f.render_widget(gauge, coverage_area[0]);
let stats_paragraph = Self::create_stats_paragraph(&events);
f.render_widget(stats_paragraph, coverage_area[1]);
} else {
let stats_paragraph = Self::create_stats_paragraph(&events);
f.render_widget(stats_paragraph, top_area[1]);
}
let file_count_paragraph = Self::create_file_count_paragraph(&file_count);
f.render_widget(file_count_paragraph, file_count_area);
let log_list = Self::create_log_list(&logs);
f.render_widget(log_list, log_area);
})?;
Ok(UiLoopState {
logs,
file_count_state,
file_count,
terminal,
log_event_receiver,
events,
..ui_state
})
}
@ -331,11 +540,16 @@ impl TerminalUi {
})
}
async fn on_quit(ui_state: UiLoopState) -> Result<UiLoopState, UiLoopError> {
async fn on_quit(
ui_state: UiLoopState,
cancellation_tx: broadcast::Sender<()>,
) -> Result<UiLoopState, UiLoopError> {
let _ = cancellation_tx.send(());
let mut terminal = ui_state.terminal;
disable_raw_mode().map_err(|e| anyhow!("{:?}", e))?;
execute!(terminal.backend_mut(), LeaveAlternateScreen).map_err(|e| anyhow!("{:?}", e))?;
terminal.show_cursor()?;
Err(UiLoopError::Exit)
}
@ -352,17 +566,38 @@ impl TerminalUi {
})
}
async fn on_monitor_dir(
ui_state: UiLoopState,
path: PathBuf,
ui_event_tx: mpsc::UnboundedSender<TerminalEvent>,
cancellation_rx: broadcast::Receiver<()>,
) -> Result<UiLoopState, UiLoopError> {
let mut file_monitors = ui_state.file_monitors;
file_monitors.entry(path).or_insert_with_key(|path| {
Self::spawn_file_count_monitor(path.clone(), ui_event_tx, cancellation_rx)
});
Ok(UiLoopState {
file_monitors,
..ui_state
})
}
async fn ui_loop(
initial_state: UiLoopState,
ui_event_rx: mpsc::UnboundedReceiver<TerminalEvent>,
ui_event_tx: mpsc::UnboundedSender<TerminalEvent>,
) -> Result<()> {
let loop_result = ui_event_rx
.map(Ok)
.try_fold(initial_state, |ui_state, event| async {
let ui_event_tx = ui_event_tx.clone();
let cancellation_tx = ui_state.cancellation_tx.clone();
match event {
TerminalEvent::Tick => Self::refresh_ui(ui_state).await,
TerminalEvent::Input(Event::Key(k)) => match k.code {
KeyCode::Char('q') => Self::on_quit(ui_state).await,
KeyCode::Char('q') => Self::on_quit(ui_state, cancellation_tx).await,
KeyCode::Down => Self::on_key_down(ui_state).await,
KeyCode::Up => Self::on_key_up(ui_state).await,
_ => Ok(ui_state),
@ -370,7 +605,24 @@ impl TerminalUi {
TerminalEvent::FileCount { dir, count } => {
Self::on_file_count(ui_state, dir, count).await
}
TerminalEvent::Quit => Self::on_quit(ui_state).await,
TerminalEvent::Quit => Self::on_quit(ui_state, cancellation_tx).await,
TerminalEvent::MonitorDir(path) => {
Self::on_monitor_dir(
ui_state,
path,
ui_event_tx,
cancellation_tx.subscribe(),
)
.await
}
TerminalEvent::Telemetry(event_data) => {
let mut events = ui_state.events;
for e in event_data {
events.insert(discriminant(&e), e);
}
Ok(UiLoopState { events, ..ui_state })
}
_ => Ok(ui_state),
}
})
@ -381,4 +633,37 @@ impl TerminalUi {
Err(UiLoopError::Anyhow(e)) => Err(e),
}
}
fn spawn_file_count_monitor(
dir: PathBuf,
sender: mpsc::UnboundedSender<TerminalEvent>,
mut cancellation_rx: broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<Result<()>> {
tokio::spawn(async move {
wait_for_dir(&dir).await?;
while cancellation_rx.try_recv() == Err(broadcast::TryRecvError::Empty) {
let mut rd = tokio::fs::read_dir(&dir).await?;
let mut count: usize = 0;
while let Some(Ok(entry)) = rd.next().await {
if entry.path().is_file() {
count += 1;
}
}
if sender
.send(TerminalEvent::FileCount {
dir: dir.clone(),
count,
})
.is_err()
{
break;
}
delay_for(FILE_MONITOR_POLLING_PERIOD).await;
}
Ok(())
})
}
}

View File

@ -250,7 +250,11 @@ impl LibFuzzerFuzzTask {
for file in &files {
if let Some(filename) = file.file_name() {
let dest = self.config.crashes.path.join(filename);
tokio::fs::rename(file, dest).await?;
if let Err(e) = tokio::fs::rename(file.clone(), dest.clone()).await {
if !dest.exists() {
bail!(e)
}
}
}
}

View File

@ -17,7 +17,7 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
serde = { version = "1.0", features = ["derive"] }
z3-sys = { version = "0.6", optional = true}
iced-x86 = { version = "1.1", optional = true}
[dev-dependencies]
tokio = { version = "0.2" }
lazy_static = "1.4"

View File

@ -11,6 +11,9 @@ use uuid::Uuid;
use z3_sys::ErrorCode as Z3ErrorCode;
pub use appinsights::telemetry::SeverityLevel::{Critical, Error, Information, Verbose, Warning};
use tokio::sync::broadcast::{self, Receiver};
#[macro_use]
extern crate lazy_static;
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(transparent)]
@ -338,6 +341,8 @@ mod global {
RwLock,
};
use tokio::sync::broadcast::Sender;
use super::*;
#[derive(Default)]
@ -350,6 +355,14 @@ mod global {
instance: None,
microsoft: None,
};
lazy_static! {
pub static ref EVENT_SOURCE: Sender<(Event, Vec<EventData>)> = {
let (telemetry_event_source, _) = broadcast::channel::<_>(100);
telemetry_event_source
};
}
const UNSET: usize = 0;
const SETTING: usize = 1;
const SET: usize = 2;
@ -501,6 +514,17 @@ pub fn format_events(events: &[EventData]) -> String {
.join(" ")
}
fn try_broadcast_event(event: &Event, properties: &[EventData]) -> bool {
// we ignore any send error here because they indicate that
// there are no receivers on the other end
let (event, properties) = (event.clone(), properties.to_vec());
global::EVENT_SOURCE.send((event, properties)).is_ok()
}
pub fn subscribe_to_events() -> Receiver<(Event, Vec<EventData>)> {
global::EVENT_SOURCE.subscribe()
}
pub fn track_event(event: &Event, properties: &[EventData]) {
use appinsights::telemetry::Telemetry;
@ -526,6 +550,7 @@ pub fn track_event(event: &Event, properties: &[EventData]) {
}
client.track(evt);
}
try_broadcast_event(event, properties);
}
pub fn to_log_level(level: &appinsights::telemetry::SeverityLevel) -> log::Level {