mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-17 20:38:06 +00:00
combine execs/sec and iteration counts from multiple workers (#786)
This commit is contained in:
@ -23,7 +23,7 @@ use tokio::{
|
||||
io::{AsyncBufReadExt, BufReader},
|
||||
sync::mpsc,
|
||||
task,
|
||||
time::{self, Duration},
|
||||
time::{delay_for, Duration},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
@ -96,7 +96,7 @@ impl LibFuzzerFuzzTask {
|
||||
let new_crashes = self.config.crashes.monitor_results(new_result, true);
|
||||
|
||||
let (stats_sender, stats_receiver) = mpsc::unbounded_channel();
|
||||
let report_stats = report_runtime_stats(self.workers(), stats_receiver, hb_client);
|
||||
let report_stats = report_runtime_stats(stats_receiver, hb_client);
|
||||
let fuzzers = self.run_fuzzers(Some(&stats_sender));
|
||||
futures::try_join!(resync, new_inputs, new_crashes, fuzzers, report_stats)?;
|
||||
|
||||
@ -311,7 +311,7 @@ fn try_report_iter_update(
|
||||
|
||||
async fn report_fuzzer_sys_info(worker_id: usize, run_id: Uuid, fuzzer_pid: u32) -> Result<()> {
|
||||
// Allow for sampling CPU usage.
|
||||
time::delay_for(PROC_INFO_COLLECTION_DELAY).await;
|
||||
delay_for(PROC_INFO_COLLECTION_DELAY).await;
|
||||
|
||||
loop {
|
||||
// process doesn't exist
|
||||
@ -335,13 +335,13 @@ async fn report_fuzzer_sys_info(worker_id: usize, run_id: Uuid, fuzzer_pid: u32)
|
||||
break;
|
||||
}
|
||||
|
||||
time::delay_for(PROC_INFO_PERIOD).await;
|
||||
delay_for(PROC_INFO_PERIOD).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RuntimeStats {
|
||||
worker_id: usize,
|
||||
run_id: Uuid,
|
||||
@ -349,12 +349,34 @@ pub struct RuntimeStats {
|
||||
execs_sec: f64,
|
||||
}
|
||||
|
||||
impl RuntimeStats {
|
||||
pub fn report(&self) {
|
||||
#[derive(Debug, Default)]
|
||||
pub struct TotalStats {
|
||||
worker_stats: HashMap<usize, RuntimeStats>,
|
||||
count: u64,
|
||||
execs_sec: f64,
|
||||
}
|
||||
|
||||
impl TotalStats {
|
||||
fn update(&mut self, worker_data: RuntimeStats) {
|
||||
if let Some(current) = self.worker_stats.get(&worker_data.worker_id) {
|
||||
// if it's the same run, only add the differences
|
||||
if current.run_id == worker_data.run_id {
|
||||
self.count += worker_data.count.saturating_sub(current.count);
|
||||
} else {
|
||||
self.count += worker_data.count;
|
||||
}
|
||||
} else {
|
||||
self.count += worker_data.count;
|
||||
}
|
||||
|
||||
self.worker_stats.insert(worker_data.worker_id, worker_data);
|
||||
|
||||
self.execs_sec = self.worker_stats.values().map(|x| x.execs_sec).sum();
|
||||
}
|
||||
|
||||
fn report(&self) {
|
||||
event!(
|
||||
runtime_stats;
|
||||
EventData::WorkerId = self.worker_id,
|
||||
EventData::RunId = self.run_id,
|
||||
EventData::Count = self.count,
|
||||
EventData::ExecsSecond = self.execs_sec
|
||||
);
|
||||
@ -374,7 +396,7 @@ impl Timer {
|
||||
}
|
||||
|
||||
async fn wait(&self) {
|
||||
time::delay_for(self.interval).await;
|
||||
delay_for(self.interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
@ -386,7 +408,6 @@ impl Timer {
|
||||
// running worker runs, this can result in misleading gaps and binning artifacts. In effect, we
|
||||
// are approximating nearest-neighbor interpolation on the runtime stats time series.
|
||||
async fn report_runtime_stats(
|
||||
workers: usize,
|
||||
mut stats_channel: mpsc::UnboundedReceiver<RuntimeStats>,
|
||||
heartbeat_client: impl HeartbeatSender,
|
||||
) -> Result<()> {
|
||||
@ -394,8 +415,10 @@ async fn report_runtime_stats(
|
||||
//
|
||||
// When logging stats, the most recently reported runtime stats will be used for any
|
||||
// missing data. For time-triggered logging, it will be used for all workers.
|
||||
let mut last_reported: Vec<Option<RuntimeStats>> =
|
||||
std::iter::repeat(None).take(workers).collect();
|
||||
let mut total = TotalStats::default();
|
||||
|
||||
// report all zeros to start
|
||||
total.report();
|
||||
|
||||
let timer = Timer::new(RUNTIME_STATS_PERIOD);
|
||||
|
||||
@ -403,18 +426,80 @@ async fn report_runtime_stats(
|
||||
tokio::select! {
|
||||
Some(stats) = stats_channel.next() => {
|
||||
heartbeat_client.alive();
|
||||
stats.report();
|
||||
|
||||
let idx = stats.worker_id as usize;
|
||||
last_reported[idx] = Some(stats);
|
||||
total.update(stats);
|
||||
total.report()
|
||||
}
|
||||
_ = timer.wait() => {
|
||||
for stats in &last_reported {
|
||||
if let Some(stats) = stats {
|
||||
stats.report();
|
||||
}
|
||||
}
|
||||
total.report()
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{RuntimeStats, TotalStats};
|
||||
use anyhow::Result;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[test]
|
||||
fn test_total_stats() -> Result<()> {
|
||||
let mut total = TotalStats::default();
|
||||
let mut a = RuntimeStats {
|
||||
worker_id: 0,
|
||||
run_id: Uuid::new_v4(),
|
||||
count: 0,
|
||||
execs_sec: 0.0,
|
||||
};
|
||||
|
||||
total.update(a.clone());
|
||||
assert!(total.execs_sec == 0.0);
|
||||
assert!(total.count == 0);
|
||||
|
||||
// same run of existing worker, but counts & execs_sec increased.
|
||||
a.count += 10;
|
||||
a.execs_sec = 1.0;
|
||||
total.update(a.clone());
|
||||
assert!(total.execs_sec == 1.0);
|
||||
assert!(total.count == 10);
|
||||
|
||||
// same run of existing worker. counts and execs should stay the same.
|
||||
total.update(a.clone());
|
||||
assert!(total.count == 10);
|
||||
assert!(total.execs_sec == 1.0);
|
||||
|
||||
// new run of existing worker. counts should go up, execs_sec should
|
||||
// equal new worker value.
|
||||
a.run_id = Uuid::new_v4();
|
||||
a.execs_sec = 2.0;
|
||||
total.update(a.clone());
|
||||
assert!(total.count == 20);
|
||||
assert!(total.execs_sec == 2.0);
|
||||
|
||||
// existing worker, now new data. totals should stay the same.
|
||||
total.update(a.clone());
|
||||
assert!(total.count == 20);
|
||||
assert!(total.execs_sec == 2.0);
|
||||
|
||||
// new worker, counts & execs_sec should go up by data from worker.
|
||||
let mut b = RuntimeStats {
|
||||
worker_id: 1,
|
||||
run_id: Uuid::new_v4(),
|
||||
count: 10,
|
||||
execs_sec: 2.0,
|
||||
};
|
||||
total.update(b.clone());
|
||||
assert!(total.count == 30);
|
||||
assert!(total.execs_sec == 4.0);
|
||||
|
||||
// existing run for existing worker.
|
||||
// count should go up by 1, execs_sec down by 1.
|
||||
b.count += 1;
|
||||
b.execs_sec -= 1.0;
|
||||
total.update(b.clone());
|
||||
assert!(total.count == 31);
|
||||
assert!(total.execs_sec == 3.0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user