diff --git a/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs b/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs index 8fe16e67c..b3bb86488 100644 --- a/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs +++ b/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs @@ -17,11 +17,12 @@ use onefuzz_telemetry::{ EventData, }; use serde::Deserialize; -use std::{collections::HashMap, path::PathBuf}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; use tempfile::{tempdir_in, TempDir}; use tokio::{ io::{AsyncBufReadExt, BufReader}, - sync::mpsc, + select, + sync::{mpsc, Notify}, task, time::{sleep, Duration}, }; @@ -212,11 +213,12 @@ impl LibFuzzerFuzzTask { ); let mut running = fuzzer.fuzz(crash_dir.path(), local_inputs, &inputs)?; let running_id = running.id(); - + let notify = Arc::new(Notify::new()); let sys_info = task::spawn(report_fuzzer_sys_info( worker_id, run_id, running_id.unwrap_or_default(), + notify.clone(), )); // Splitting borrow. @@ -227,7 +229,6 @@ impl LibFuzzerFuzzTask { let mut stderr = BufReader::new(stderr); let mut libfuzzer_output: ArrayDeque<[_; LOGS_BUFFER_SIZE], Wrapping> = ArrayDeque::new(); - loop { let mut buf = vec![]; let bytes_read = stderr.read_until(b'\n', &mut buf).await?; @@ -243,7 +244,10 @@ impl LibFuzzerFuzzTask { libfuzzer_output.push_back(line); } - let (exit_status, _) = tokio::join!(running.wait(), sys_info); + let exit_status = running.wait().await; + notify.notify_one(); + let _ = sys_info.await; + let exit_status: ExitStatus = exit_status?.into(); let files = list_files(crash_dir.path()).await?; @@ -323,11 +327,23 @@ fn try_report_iter_update( Ok(()) } -async fn report_fuzzer_sys_info(worker_id: usize, run_id: Uuid, fuzzer_pid: u32) -> Result<()> { +async fn report_fuzzer_sys_info( + worker_id: usize, + run_id: Uuid, + fuzzer_pid: u32, + cancellation: Arc, +) -> Result<()> { // Allow for sampling CPU usage. - sleep(PROC_INFO_COLLECTION_DELAY).await; - + let mut period = tokio::time::interval_at( + tokio::time::Instant::now() + PROC_INFO_COLLECTION_DELAY, + PROC_INFO_PERIOD, + ); loop { + select! { + () = cancellation.notified() => break, + _ = period.tick() => (), + } + // process doesn't exist if !system::refresh_process(fuzzer_pid)? { break; @@ -348,8 +364,6 @@ async fn report_fuzzer_sys_info(worker_id: usize, run_id: Uuid, fuzzer_pid: u32) // The process no longer exists. break; } - - sleep(PROC_INFO_PERIOD).await; } Ok(()) diff --git a/src/agent/onefuzz/src/libfuzzer.rs b/src/agent/onefuzz/src/libfuzzer.rs index 42e3fe6a6..80fb781b3 100644 --- a/src/agent/onefuzz/src/libfuzzer.rs +++ b/src/agent/onefuzz/src/libfuzzer.rs @@ -21,6 +21,13 @@ use tokio::process::{Child, Command}; const DEFAULT_MAX_TOTAL_SECONDS: i32 = 10 * 60; +use lazy_static::lazy_static; + +lazy_static! { + static ref LIBFUZZERLINEREGEX: regex::Regex = + regex::Regex::new(r"#(\d+)\s*(?:pulse|INITED|NEW|REDUCE).*exec/s: (\d+)").unwrap(); +} + #[derive(Debug)] pub struct LibFuzzerMergeOutput { pub added_files_count: i32, @@ -304,9 +311,7 @@ impl LibFuzzerLine { } pub fn parse(line: &str) -> Result> { - let re = regex::Regex::new(r"#(\d+)\s*(?:pulse|INITED|NEW|REDUCE).*exec/s: (\d+)")?; - - let caps = match re.captures(line) { + let caps = match LIBFUZZERLINEREGEX.captures(line) { Some(caps) => caps, None => return Ok(None), };