Remove Stream impl for DirectoryMonitor (#1078)

This commit is contained in:
Joe Ranweiler
2021-07-16 10:10:53 -07:00
committed by GitHub
parent db66a1d3c5
commit 517aa54a59
4 changed files with 69 additions and 63 deletions

View File

@ -4,11 +4,9 @@ use anyhow::Result;
use backoff::{future::retry, Error as BackoffError, ExponentialBackoff};
use clap::{App, Arg, ArgMatches};
use flume::Sender;
use onefuzz::jitter::delay_with_jitter;
use onefuzz::{blob::url::BlobContainerUrl, monitor::DirectoryMonitor, syncdir::SyncedDir};
use path_absolutize::Absolutize;
use reqwest::Url;
use std::task::Poll;
use std::{
collections::HashMap,
env::current_dir,
@ -275,17 +273,13 @@ impl DirectoryMonitorQueue {
let handle: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
let mut monitor = DirectoryMonitor::new(directory_path_clone.clone());
monitor.start()?;
loop {
match monitor.poll_file() {
Poll::Ready(Some(file_path)) => {
let file_url = Url::from_file_path(file_path)
.map_err(|_| anyhow!("invalid file path"))?;
queue.enqueue(file_url).await?;
}
Poll::Ready(None) => break,
Poll::Pending => delay_with_jitter(Duration::from_secs(1)).await,
}
while let Some(file_path) = monitor.next_file().await {
let file_url =
Url::from_file_path(file_path).map_err(|_| anyhow!("invalid file path"))?;
queue.enqueue(file_url).await?;
}
Ok(())
});

View File

@ -2,7 +2,6 @@
// Licensed under the MIT License.
use anyhow::{Context, Result};
use futures::StreamExt;
use onefuzz::{blob::BlobUrl, monitor::DirectoryMonitor, syncdir::SyncedDir};
use onefuzz_telemetry::{
Event::{
@ -276,7 +275,7 @@ pub async fn monitor_reports(
let mut monitor = DirectoryMonitor::new(base_dir);
monitor.start()?;
while let Some(file) = monitor.next().await {
while let Some(file) = monitor.next_file().await {
let result = parse_report_file(file).await?;
result.save(unique_reports, reports, no_crash).await?;
}

View File

@ -1,19 +1,18 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use std::{path::PathBuf, pin::Pin, sync::mpsc, time::Duration};
use std::path::PathBuf;
use std::sync::{self, mpsc::Receiver as SyncReceiver};
use std::time::Duration;
use anyhow::Result;
use futures::{
stream::{FusedStream, Stream},
task::{self, Poll},
};
use notify::{DebouncedEvent, Watcher};
use std::sync::mpsc::TryRecvError;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::task::{self, JoinHandle};
pub struct DirectoryMonitor {
dir: PathBuf,
rx: mpsc::Receiver<DebouncedEvent>,
notify_events: UnboundedReceiver<DebouncedEvent>,
watcher: notify::RecommendedWatcher,
terminated: bool,
}
@ -21,13 +20,17 @@ pub struct DirectoryMonitor {
impl DirectoryMonitor {
pub fn new(dir: impl Into<PathBuf>) -> Self {
let dir = dir.into();
let (tx, rx) = mpsc::channel();
let (notify_sender, notify_receiver) = sync::mpsc::channel();
let delay = Duration::from_millis(100);
let watcher = notify::watcher(tx, delay).unwrap();
let watcher = notify::watcher(notify_sender, delay).unwrap();
// We can drop the thread handle, and it will continue to run until it
// errors or we drop the async receiver.
let (notify_events, _handle) = into_async(notify_receiver);
Self {
dir,
rx,
notify_events,
watcher,
terminated: false,
}
@ -49,48 +52,58 @@ impl DirectoryMonitor {
Ok(())
}
pub fn poll_file(&mut self) -> Poll<Option<PathBuf>> {
match self.rx.try_recv() {
Ok(DebouncedEvent::Create(path)) => Poll::Ready(Some(path)),
Ok(DebouncedEvent::Remove(path)) => {
if path == self.dir {
// The directory we were watching was removed; we're done.
self.stop().ok();
Poll::Ready(None)
} else {
// Some file _inside_ the watched directory was removed.
Poll::Pending
pub async fn next_file(&mut self) -> Option<PathBuf> {
loop {
let event = self.notify_events.recv().await;
if event.is_none() {
// Make sure we stop our `Watcher` if we return early.
let _ = self.stop();
}
match event? {
DebouncedEvent::Create(path) => {
return Some(path);
}
DebouncedEvent::Remove(path) => {
if path == self.dir {
// The directory we were watching was removed; we're done.
let _ = self.stop();
return None;
} else {
// Some file _inside_ the watched directory was removed. Ignore.
}
}
_event => {
// Other filesystem event. Ignore.
}
}
Ok(_evt) => {
// Filesystem event we can ignore.
Poll::Pending
}
Err(TryRecvError::Empty) => {
// Nothing to read, but sender still connected.
Poll::Pending
}
Err(TryRecvError::Disconnected) => {
// We'll never receive any more events; whatever happened, we're done.
self.stop().ok();
Poll::Ready(None)
}
}
}
}
impl Stream for DirectoryMonitor {
type Item = PathBuf;
/// Convert a `Receiver` from a `std::sync::mpsc` channel into an async receiver.
///
/// The returned `JoinHandle` does _not_ need to be held by callers. The associated task
/// will continue to run (detached) if dropped.
fn into_async<T: Send + 'static>(
sync_receiver: SyncReceiver<T>,
) -> (UnboundedReceiver<T>, JoinHandle<()>) {
let (sender, receiver) = unbounded_channel();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
let poll = self.poll_file();
cx.waker().wake_by_ref();
poll
}
}
let handle = task::spawn_blocking(move || {
while let Ok(msg) = sync_receiver.recv() {
if sender.send(msg).is_err() {
// The async receiver is closed. We can't do anything else, so
// drop this message (and the sync receiver).
break;
}
}
impl FusedStream for DirectoryMonitor {
fn is_terminated(&self) -> bool {
self.terminated
}
// We'll never receive any more events.
//
// Drop our `Receiver` and hang up.
});
(receiver, handle)
}

View File

@ -11,7 +11,6 @@ use crate::{
};
use anyhow::{Context, Result};
use dunce::canonicalize;
use futures::stream::StreamExt;
use onefuzz_telemetry::{Event, EventData};
use reqwest::{StatusCode, Url};
use reqwest_retry::{RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS};
@ -219,13 +218,14 @@ impl SyncedDir {
ignore_dotfiles: bool,
) -> Result<()> {
debug!("monitoring {}", path.display());
let mut monitor = DirectoryMonitor::new(path.clone());
monitor.start()?;
if let Some(path) = url.as_file_path() {
fs::create_dir_all(&path).await?;
while let Some(item) = monitor.next().await {
while let Some(item) = monitor.next_file().await {
let file_name = item
.file_name()
.ok_or_else(|| anyhow!("invalid file path"))?;
@ -261,7 +261,7 @@ impl SyncedDir {
} else {
let mut uploader = BlobUploader::new(url.url()?);
while let Some(item) = monitor.next().await {
while let Some(item) = monitor.next_file().await {
let file_name = item
.file_name()
.ok_or_else(|| anyhow!("invalid file path"))?;