Upgrade to tokio 1.5.0 (#800)

Upgrade to tokio 1.5.0
replaced tokio::sync::mpsc:: with [flume ](https://crates.io/crates/flume) because try_recv was removed
This commit is contained in:
Cheick Keita
2021-04-14 03:07:28 -07:00
committed by GitHub
parent 3f52f34b6a
commit e413aec03d
44 changed files with 683 additions and 356 deletions

View File

@ -19,12 +19,13 @@ backoff = { version = "0.3", features = ["async-std"] }
clap = "2.33"
crossterm = "0.18"
env_logger = "0.8"
futures = "0.3"
flume = "0.10.4"
futures = "0.3.14"
hex = "0.4"
lazy_static = "1.4"
log = "0.4"
num_cpus = "1.13"
reqwest = { version = "0.10", features = ["json", "stream"] }
reqwest = { version = "0.11.3", features = ["json", "stream"] }
serde = "1.0"
serde_json = "1.0"
onefuzz = { path = "../onefuzz" }
@ -35,9 +36,9 @@ stacktrace-parser = { path = "../stacktrace-parser" }
storage-queue = { path = "../storage-queue" }
tempfile = "3.2"
thiserror = "1.0"
tokio = { version = "0.2", features = ["full"] }
tokio-util = { version = "0.3", features = ["full"] }
tokio-stream = "0.1.3"
tokio = { version = "1.5.0", features = ["full"] }
tokio-util = { version = "0.6.6", features = ["full"] }
tokio-stream = "0.1.5"
tui = { version = "0.14", default-features = false, features = ['crossterm'] }
url = { version = "2.2", features = ["serde"] }
uuid = { version = "0.8", features = ["serde", "v4"] }

View File

@ -3,6 +3,7 @@ use crate::tasks::utils::parse_key_value;
use anyhow::Result;
use backoff::{future::retry, Error as BackoffError, ExponentialBackoff};
use clap::{App, Arg, ArgMatches};
use flume::Sender;
use onefuzz::jitter::delay_with_jitter;
use onefuzz::{blob::url::BlobContainerUrl, monitor::DirectoryMonitor, syncdir::SyncedDir};
use path_absolutize::Absolutize;
@ -14,7 +15,6 @@ use std::{
path::{Path, PathBuf},
time::Duration,
};
use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;
pub const SETUP_DIR: &str = "setup_dir";
@ -69,7 +69,7 @@ pub enum CmdType {
pub struct LocalContext {
pub job_path: PathBuf,
pub common_config: CommonConfig,
pub event_sender: Option<UnboundedSender<UiEvent>>,
pub event_sender: Option<Sender<UiEvent>>,
}
pub fn get_hash_map(args: &clap::ArgMatches<'_>, name: &str) -> Result<HashMap<String, String>> {
@ -213,7 +213,7 @@ pub fn get_synced_dir(
pub fn build_local_context(
args: &ArgMatches<'_>,
generate_task_id: bool,
event_sender: Option<UnboundedSender<UiEvent>>,
event_sender: Option<Sender<UiEvent>>,
) -> Result<LocalContext> {
let job_id = get_uuid("job_id", args).unwrap_or_else(|_| Uuid::nil());
let task_id = get_uuid("task_id", args).unwrap_or_else(|_| {
@ -322,11 +322,11 @@ pub enum UiEvent {
}
pub trait SyncCountDirMonitor<T: Sized> {
fn monitor_count(self, event_sender: &Option<UnboundedSender<UiEvent>>) -> Result<T>;
fn monitor_count(self, event_sender: &Option<Sender<UiEvent>>) -> Result<T>;
}
impl SyncCountDirMonitor<SyncedDir> for SyncedDir {
fn monitor_count(self, event_sender: &Option<UnboundedSender<UiEvent>>) -> Result<Self> {
fn monitor_count(self, event_sender: &Option<Sender<UiEvent>>) -> Result<Self> {
if let (Some(event_sender), Some(p)) = (event_sender, self.remote_url()?.as_file_path()) {
event_sender.send(UiEvent::MonitorDir(p))?;
}
@ -335,7 +335,7 @@ impl SyncCountDirMonitor<SyncedDir> for SyncedDir {
}
impl SyncCountDirMonitor<Option<SyncedDir>> for Option<SyncedDir> {
fn monitor_count(self, event_sender: &Option<UnboundedSender<UiEvent>>) -> Result<Self> {
fn monitor_count(self, event_sender: &Option<Sender<UiEvent>>) -> Result<Self> {
if let Some(sd) = self {
let sd = sd.monitor_count(event_sender)?;
Ok(Some(sd))

View File

@ -15,14 +15,14 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use flume::Sender;
use storage_queue::QueueClient;
use tokio::sync::mpsc::UnboundedSender;
pub fn build_analysis_config(
args: &clap::ArgMatches<'_>,
input_queue: Option<QueueClient>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
event_sender: Option<Sender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_options = get_cmd_arg(CmdType::Target, args);
@ -69,10 +69,7 @@ pub fn build_analysis_config(
Ok(config)
}
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_analysis_config(args, None, context.common_config.clone(), event_sender)?;
run_analysis(config).await

View File

@ -15,14 +15,14 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use flume::Sender;
use storage_queue::QueueClient;
use tokio::sync::mpsc::UnboundedSender;
pub fn build_report_config(
args: &clap::ArgMatches<'_>,
input_queue: Option<QueueClient>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
event_sender: Option<Sender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
@ -78,10 +78,7 @@ pub fn build_report_config(
Ok(config)
}
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_report_config(args, None, context.common_config.clone(), event_sender)?;
ReportTask::new(config).managed_run().await

View File

@ -16,12 +16,12 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use tokio::sync::mpsc::UnboundedSender;
use flume::Sender;
pub fn build_fuzz_config(
args: &clap::ArgMatches<'_>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
event_sender: Option<Sender<UiEvent>>,
) -> Result<Config> {
let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)?
.monitor_count(&event_sender)?;
@ -71,10 +71,7 @@ pub fn build_fuzz_config(
Ok(config)
}
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_fuzz_config(args, context.common_config.clone(), event_sender)?;
GeneratorTask::new(config).run().await

View File

@ -23,15 +23,13 @@ use crate::{
};
use anyhow::Result;
use clap::{App, SubCommand};
use flume::Sender;
use onefuzz::utils::try_wait_all_join_handles;
use std::collections::HashSet;
use tokio::{sync::mpsc::UnboundedSender, task::spawn};
use tokio::task::spawn;
use uuid::Uuid;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?;
let crash_dir = fuzz_config

View File

@ -14,8 +14,8 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use flume::Sender;
use storage_queue::QueueClient;
use tokio::sync::mpsc::UnboundedSender;
use super::common::{SyncCountDirMonitor, UiEvent};
@ -24,7 +24,7 @@ pub fn build_coverage_config(
local_job: bool,
input_queue: Option<QueueClient>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
event_sender: Option<Sender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
@ -61,10 +61,7 @@ pub fn build_coverage_config(
Ok(config)
}
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_coverage_config(
args,

View File

@ -15,14 +15,14 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use flume::Sender;
use storage_queue::QueueClient;
use tokio::sync::mpsc::UnboundedSender;
pub fn build_report_config(
args: &clap::ArgMatches<'_>,
input_queue: Option<QueueClient>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
event_sender: Option<Sender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
@ -73,10 +73,7 @@ pub fn build_report_config(
Ok(config)
}
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_report_config(args, None, context.common_config.clone(), event_sender)?;
ReportTask::new(config).managed_run().await

View File

@ -14,14 +14,14 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use tokio::sync::mpsc::UnboundedSender;
use flume::Sender;
const EXPECT_CRASH_ON_FAILURE: &str = "expect_crash_on_failure";
pub fn build_fuzz_config(
args: &clap::ArgMatches<'_>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
event_sender: Option<Sender<UiEvent>>,
) -> Result<Config> {
let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)?
.monitor_count(&event_sender)?;
@ -56,10 +56,7 @@ pub fn build_fuzz_config(
Ok(config)
}
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_fuzz_config(args, context.common_config.clone(), event_sender)?;
LibFuzzerFuzzTask::new(config)?.run().await

View File

@ -15,14 +15,14 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use flume::Sender;
use storage_queue::QueueClient;
use tokio::sync::mpsc::UnboundedSender;
pub fn build_merge_config(
args: &clap::ArgMatches<'_>,
input_queue: Option<QueueClient>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
event_sender: Option<Sender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
@ -52,10 +52,7 @@ pub fn build_merge_config(
Ok(config)
}
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_merge_config(args, None, context.common_config.clone(), event_sender)?;
spawn(std::sync::Arc::new(config)).await

View File

@ -15,14 +15,14 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use tokio::sync::mpsc::UnboundedSender;
use flume::Sender;
const REPORT_NAMES: &str = "report_names";
pub fn build_regression_config(
args: &clap::ArgMatches<'_>,
common: CommonConfig,
event_sender: Option<UnboundedSender<UiEvent>>,
event_sender: Option<Sender<UiEvent>>,
) -> Result<Config> {
let target_exe = get_cmd_exe(CmdType::Target, args)?.into();
let target_env = get_cmd_env(CmdType::Target, args)?;
@ -73,10 +73,7 @@ pub fn build_regression_config(
Ok(config)
}
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let config = build_regression_config(args, context.common_config.clone(), event_sender)?;
LibFuzzerRegressionTask::new(config).run().await

View File

@ -10,13 +10,10 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use flume::Sender;
use std::path::PathBuf;
use tokio::sync::mpsc::UnboundedSender;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, true, event_sender)?;
let target_exe = value_t!(args, TARGET_EXE, PathBuf)?;

View File

@ -11,16 +11,13 @@ use crate::{
};
use anyhow::Result;
use clap::{App, SubCommand};
use flume::Sender;
use onefuzz::utils::try_wait_all_join_handles;
use std::collections::HashSet;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::spawn;
use uuid::Uuid;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, true, event_sender.clone())?;
let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?;
let crash_dir = fuzz_config

View File

@ -11,13 +11,10 @@ use crate::{
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use flume::Sender;
use std::path::PathBuf;
use tokio::sync::mpsc::UnboundedSender;
pub async fn run(
args: &clap::ArgMatches<'_>,
event_sender: Option<UnboundedSender<UiEvent>>,
) -> Result<()> {
pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEvent>>) -> Result<()> {
let context = build_local_context(args, false, event_sender)?;
let target_exe = value_t!(args, TARGET_EXE, PathBuf)?;

View File

@ -21,12 +21,11 @@ use std::{
thread::{self, JoinHandle},
time::Duration,
};
use flume::{Receiver, Sender};
use tokio::{
sync::{
broadcast::{self, TryRecvError},
mpsc::{self, UnboundedSender},
},
time::delay_for,
sync::broadcast::{self, error::TryRecvError},
time::sleep,
};
use tui::{
backend::CrosstermBackend,
@ -91,7 +90,7 @@ struct UiLoopState {
pub file_count: HashMap<PathBuf, usize>,
pub file_count_state: ListState,
pub file_monitors: HashMap<PathBuf, tokio::task::JoinHandle<Result<()>>>,
pub log_event_receiver: mpsc::UnboundedReceiver<(Level, String)>,
pub log_event_receiver: Receiver<(Level, String)>,
pub terminal: Terminal<CrosstermBackend<Stdout>>,
pub cancellation_tx: broadcast::Sender<()>,
pub events: HashMap<Discriminant<EventData>, EventData>,
@ -100,7 +99,7 @@ struct UiLoopState {
impl UiLoopState {
fn new(
terminal: Terminal<CrosstermBackend<Stdout>>,
log_event_receiver: mpsc::UnboundedReceiver<(Level, String)>,
log_event_receiver: Receiver<(Level, String)>,
) -> Self {
let (cancellation_tx, _) = broadcast::channel(1);
let events = HashMap::new();
@ -118,16 +117,16 @@ impl UiLoopState {
}
pub struct TerminalUi {
pub task_events: mpsc::UnboundedSender<UiEvent>,
task_event_receiver: mpsc::UnboundedReceiver<UiEvent>,
ui_event_tx: mpsc::UnboundedSender<TerminalEvent>,
ui_event_rx: mpsc::UnboundedReceiver<TerminalEvent>,
pub task_events: Sender<UiEvent>,
task_event_receiver: Receiver<UiEvent>,
ui_event_tx: Sender<TerminalEvent>,
ui_event_rx: Receiver<TerminalEvent>,
}
impl TerminalUi {
pub fn init() -> Result<Self> {
let (task_event_sender, task_event_receiver) = mpsc::unbounded_channel();
let (ui_event_tx, ui_event_rx) = mpsc::unbounded_channel();
let (task_event_sender, task_event_receiver) = flume::unbounded();
let (ui_event_tx, ui_event_rx) = flume::unbounded();
Ok(Self {
task_events: task_event_sender,
task_event_receiver,
@ -144,7 +143,7 @@ impl TerminalUi {
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
terminal.clear()?;
let (log_event_sender, log_event_receiver) = mpsc::unbounded_channel();
let (log_event_sender, log_event_receiver) = flume::unbounded();
let initial_state = UiLoopState::new(terminal, log_event_receiver);
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
@ -196,7 +195,7 @@ impl TerminalUi {
if let Some(timeout) = timeout {
let ui_event_tx = self.ui_event_tx.clone();
tokio::spawn(async move {
tokio::time::delay_for(timeout).await;
tokio::time::sleep(timeout).await;
let _ = ui_event_tx.send(TerminalEvent::Quit);
});
}
@ -226,12 +225,12 @@ impl TerminalUi {
}
async fn listen_telemetry_event(
ui_event_tx: UnboundedSender<TerminalEvent>,
ui_event_tx: Sender<TerminalEvent>,
mut cancellation_rx: broadcast::Receiver<()>,
) -> Result<()> {
let mut rx = onefuzz_telemetry::subscribe_to_events();
while cancellation_rx.try_recv() == Err(broadcast::TryRecvError::Empty) {
while cancellation_rx.try_recv() == Err(broadcast::error::TryRecvError::Empty) {
match rx.try_recv() {
Ok((_event, data)) => {
let data = data
@ -240,7 +239,7 @@ impl TerminalUi {
.collect::<Vec<_>>();
let _ = ui_event_tx.send(TerminalEvent::Telemetry(data));
}
Err(TryRecvError::Empty) => delay_for(EVENT_POLLING_PERIOD).await,
Err(TryRecvError::Empty) => sleep(EVENT_POLLING_PERIOD).await,
Err(TryRecvError::Lagged(_)) => continue,
Err(TryRecvError::Closed) => break,
}
@ -249,11 +248,11 @@ impl TerminalUi {
}
async fn ticking(
ui_event_tx: mpsc::UnboundedSender<TerminalEvent>,
ui_event_tx: Sender<TerminalEvent>,
mut cancellation_rx: broadcast::Receiver<()>,
) -> Result<()> {
let mut interval = tokio::time::interval(TICK_RATE);
while Err(broadcast::TryRecvError::Empty) == cancellation_rx.try_recv() {
while Err(broadcast::error::TryRecvError::Empty) == cancellation_rx.try_recv() {
interval.tick().await;
if let Err(_err) = ui_event_tx.send(TerminalEvent::Tick) {
break;
@ -263,11 +262,11 @@ impl TerminalUi {
}
fn read_keyboard_events(
ui_event_tx: mpsc::UnboundedSender<TerminalEvent>,
ui_event_tx: Sender<TerminalEvent>,
mut cancellation_rx: broadcast::Receiver<()>,
) -> JoinHandle<Result<()>> {
thread::spawn(move || {
while Err(broadcast::TryRecvError::Empty) == cancellation_rx.try_recv() {
while Err(broadcast::error::TryRecvError::Empty) == cancellation_rx.try_recv() {
if event::poll(EVENT_POLLING_PERIOD)? {
let event = event::read()?;
if let Err(_err) = ui_event_tx.send(TerminalEvent::Input(event)) {
@ -280,26 +279,26 @@ impl TerminalUi {
}
async fn read_commands(
ui_event_tx: mpsc::UnboundedSender<TerminalEvent>,
mut external_event_rx: mpsc::UnboundedReceiver<UiEvent>,
ui_event_tx: Sender<TerminalEvent>,
external_event_rx: Receiver<UiEvent>,
mut cancellation_rx: broadcast::Receiver<()>,
) -> Result<()> {
while Err(broadcast::TryRecvError::Empty) == cancellation_rx.try_recv() {
while Err(broadcast::error::TryRecvError::Empty) == cancellation_rx.try_recv() {
match external_event_rx.try_recv() {
Ok(UiEvent::MonitorDir(dir)) => {
if ui_event_tx.send(TerminalEvent::MonitorDir(dir)).is_err() {
break;
}
}
Err(mpsc::error::TryRecvError::Empty) => delay_for(EVENT_POLLING_PERIOD).await,
Err(mpsc::error::TryRecvError::Closed) => break,
Err(flume::TryRecvError::Empty) => sleep(EVENT_POLLING_PERIOD).await,
Err(flume::TryRecvError::Disconnected) => break,
}
}
Ok(())
}
fn take_available_logs<T>(
receiver: &mut mpsc::UnboundedReceiver<T>,
receiver: &mut Receiver<T>,
size: usize,
buffer: &mut ArrayDeque<[T; LOGS_BUFFER_SIZE], Wrapping>,
) {
@ -565,7 +564,7 @@ impl TerminalUi {
async fn on_monitor_dir(
ui_state: UiLoopState,
path: PathBuf,
ui_event_tx: mpsc::UnboundedSender<TerminalEvent>,
ui_event_tx: Sender<TerminalEvent>,
cancellation_rx: broadcast::Receiver<()>,
) -> Result<UiLoopState, UiLoopError> {
let mut file_monitors = ui_state.file_monitors;
@ -582,10 +581,11 @@ impl TerminalUi {
async fn ui_loop(
initial_state: UiLoopState,
ui_event_rx: mpsc::UnboundedReceiver<TerminalEvent>,
ui_event_tx: mpsc::UnboundedSender<TerminalEvent>,
ui_event_rx: Receiver<TerminalEvent>,
ui_event_tx: Sender<TerminalEvent>,
) -> Result<()> {
let loop_result = ui_event_rx
.stream()
.map(Ok)
.try_fold(initial_state, |ui_state, event| async {
let ui_event_tx = ui_event_tx.clone();
@ -632,16 +632,16 @@ impl TerminalUi {
fn spawn_file_count_monitor(
dir: PathBuf,
sender: mpsc::UnboundedSender<TerminalEvent>,
sender: Sender<TerminalEvent>,
mut cancellation_rx: broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<Result<()>> {
tokio::spawn(async move {
wait_for_dir(&dir).await?;
while cancellation_rx.try_recv() == Err(broadcast::TryRecvError::Empty) {
while cancellation_rx.try_recv() == Err(broadcast::error::TryRecvError::Empty) {
let mut rd = tokio::fs::read_dir(&dir).await?;
let mut count: usize = 0;
while let Some(Ok(entry)) = rd.next().await {
while let Ok(Some(entry)) = rd.next_entry().await {
if entry.path().is_file() {
count += 1;
}
@ -657,7 +657,7 @@ impl TerminalUi {
break;
}
delay_for(FILE_MONITOR_POLLING_PERIOD).await;
sleep(FILE_MONITOR_POLLING_PERIOD).await;
}
Ok(())
})

View File

@ -37,7 +37,7 @@ fn main() -> Result<()> {
let matches = app.get_matches();
let mut rt = tokio::runtime::Runtime::new()?;
let rt = tokio::runtime::Runtime::new()?;
let result = rt.block_on(run(matches));
atexit::execute();
result

View File

@ -5,7 +5,6 @@ use crate::tasks::{
config::CommonConfig, heartbeat::HeartbeatSender, report::crash_report::monitor_reports,
};
use anyhow::{Context, Result};
use futures::stream::StreamExt;
use onefuzz::{az_copy, blob::url::BlobUrl};
use onefuzz::{
expand::Expand,
@ -116,9 +115,8 @@ async fn run_existing(config: &Config, reports_dir: &Option<PathBuf>) -> Result<
crashes.init_pull().await?;
let mut count: u64 = 0;
let mut read_dir = fs::read_dir(&crashes.path).await?;
while let Some(file) = read_dir.next().await {
while let Some(file) = read_dir.next_entry().await? {
debug!("Processing file {:?}", file);
let file = file?;
run_tool(file.path(), &config, &reports_dir).await?;
count += 1;
}

View File

@ -38,7 +38,6 @@ use crate::tasks::{
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::stream::StreamExt;
use onefuzz::{fs::list_files, libfuzzer::LibFuzzer, syncdir::SyncedDir};
use onefuzz_telemetry::{Event::coverage_data, EventData};
use reqwest::Url;
@ -159,9 +158,10 @@ impl CoverageTask {
})?;
let mut seen_inputs = false;
while let Some(input) = corpus.next().await {
let input = match input {
Ok(input) => input,
loop {
let input = match corpus.next_entry().await {
Ok(Some(input)) => input,
Ok(None) => break,
Err(err) => {
error!("{}", err);
continue;

View File

@ -7,7 +7,6 @@ use crate::tasks::{
utils::{self, default_bool_true},
};
use anyhow::{Context, Result};
use futures::stream::StreamExt;
use onefuzz::{
expand::Expand,
fs::set_executable,
@ -122,9 +121,7 @@ impl GeneratorTask {
tester: &Tester<'_>,
) -> Result<()> {
let mut read_dir = fs::read_dir(generated_inputs).await?;
while let Some(file) = read_dir.next().await {
let file = file?;
while let Some(file) = read_dir.next_entry().await? {
debug!("testing input: {:?}", file);
let destination_file = if self.config.rename_output {

View File

@ -4,7 +4,7 @@
use crate::tasks::{config::CommonConfig, heartbeat::HeartbeatSender, utils::default_bool_true};
use anyhow::{Context, Result};
use arraydeque::{ArrayDeque, Wrapping};
use futures::{future::try_join_all, stream::StreamExt};
use futures::future::try_join_all;
use onefuzz::{
fs::list_files,
libfuzzer::{LibFuzzer, LibFuzzerLine},
@ -23,7 +23,7 @@ use tokio::{
io::{AsyncBufReadExt, BufReader},
sync::mpsc,
task,
time::{delay_for, Duration},
time::{sleep, Duration},
};
use uuid::Uuid;
@ -160,7 +160,7 @@ impl LibFuzzerFuzzTask {
.await?;
let mut entries = tokio::fs::read_dir(local_input_dir.path()).await?;
while let Some(Ok(entry)) = entries.next().await {
while let Ok(Some(entry)) = entries.next_entry().await {
let destination_path = self.config.inputs.path.clone().join(entry.file_name());
tokio::fs::rename(&entry.path(), &destination_path)
.await
@ -203,7 +203,11 @@ impl LibFuzzerFuzzTask {
let mut running = fuzzer.fuzz(crash_dir.path(), local_inputs, &inputs)?;
let running_id = running.id();
let sys_info = task::spawn(report_fuzzer_sys_info(worker_id, run_id, running_id));
let sys_info = task::spawn(report_fuzzer_sys_info(
worker_id,
run_id,
running_id.unwrap_or_default(),
));
// Splitting borrow.
let stderr = running
@ -229,7 +233,7 @@ impl LibFuzzerFuzzTask {
libfuzzer_output.push_back(line);
}
let (exit_status, _) = tokio::join!(running, sys_info);
let (exit_status, _) = tokio::join!(running.wait(), sys_info);
let exit_status: ExitStatus = exit_status?.into();
let files = list_files(crash_dir.path()).await?;
@ -311,7 +315,7 @@ fn try_report_iter_update(
async fn report_fuzzer_sys_info(worker_id: usize, run_id: Uuid, fuzzer_pid: u32) -> Result<()> {
// Allow for sampling CPU usage.
delay_for(PROC_INFO_COLLECTION_DELAY).await;
sleep(PROC_INFO_COLLECTION_DELAY).await;
loop {
// process doesn't exist
@ -335,7 +339,7 @@ async fn report_fuzzer_sys_info(worker_id: usize, run_id: Uuid, fuzzer_pid: u32)
break;
}
delay_for(PROC_INFO_PERIOD).await;
sleep(PROC_INFO_PERIOD).await;
}
Ok(())
@ -396,7 +400,7 @@ impl Timer {
}
async fn wait(&self) {
delay_for(self.interval).await;
sleep(self.interval).await;
}
}
@ -424,7 +428,7 @@ async fn report_runtime_stats(
loop {
tokio::select! {
Some(stats) = stats_channel.next() => {
Some(stats) = stats_channel.recv() => {
heartbeat_client.alive();
total.update(stats);
total.report()

View File

@ -362,7 +362,7 @@ mod tests {
MAX_FUZZ_TIME_SECONDS
);
}
tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
}

View File

@ -4,7 +4,6 @@
use std::{fmt, path::PathBuf};
use anyhow::Result;
use futures::stream::StreamExt;
use onefuzz::{blob::BlobUrl, jitter::delay_with_jitter, syncdir::SyncedDir};
use reqwest::Url;
use tempfile::{tempdir, TempDir};
@ -133,8 +132,8 @@ impl<M> InputPoller<M> {
);
let mut read_dir = fs::read_dir(&to_process.path).await?;
while let Some(file) = read_dir.next().await {
let path = file?.path();
while let Some(file) = read_dir.next_entry().await? {
let path = file.path();
info!(
"processing batch-downloaded input: {} - {}",
self.name,