Mirror of https://github.com/microsoft/onefuzz.git, synced 2025-06-18 04:38:09 +00:00
Allow the local run to work without copying the synced dirs (#794)

Added an option to the local run that skips creating a job directory.
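The heart of the change is that a `SyncedDir`'s remote side becomes optional: `url: Option<BlobContainerUrl>`, where `None` means "operate on `path` in place, with no job directory and no syncing". A minimal standalone sketch of the two resulting shapes (illustrative only: `url::Url` stands in for the onefuzz-internal `BlobContainerUrl`, and the paths are made up):

```rust
use std::path::PathBuf;
use url::Url;

// Illustrative stand-in for onefuzz's SyncedDir after this change.
struct SyncedDir {
    path: PathBuf,
    url: Option<Url>,
}

fn main() {
    // Unix-style absolute path, purely for illustration.
    let user_dir = PathBuf::from("/tmp/crashes");

    // With --create_job_dir: the user directory is the "remote", and the
    // task works on a job-local mirror under ./{job_id}/{task_id}/...
    let mirrored = SyncedDir {
        url: Some(Url::from_file_path(&user_dir).expect("invalid file path")),
        path: PathBuf::from("job-id/task-id/crashes"),
    };

    // Default (no flag): no job directory is created; the task reads and
    // writes the user-supplied directory directly.
    let in_place = SyncedDir {
        url: None,
        path: user_dir,
    };

    let _ = (mirrored, in_place);
}
```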
src/agent/Cargo.lock (generated): 16 changed lines
@@ -1764,7 +1764,6 @@ dependencies = [
  "onefuzz",
  "onefuzz-telemetry",
  "path-absolutize",
- "remove_dir_all 0.7.0",
  "reqwest",
  "reqwest-retry",
  "serde",
@@ -2317,19 +2316,6 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
-[[package]]
-name = "remove_dir_all"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "882f368737489ea543bc5c340e6f3d34a28c39980bd9a979e47322b26f60ac40"
-dependencies = [
- "libc",
- "log",
- "num_cpus",
- "rayon",
- "winapi 0.3.9",
-]
-
 [[package]]
 name = "reqwest"
 version = "0.10.10"
@@ -2835,7 +2821,7 @@ dependencies = [
  "libc",
  "rand 0.8.3",
  "redox_syscall",
- "remove_dir_all 0.5.3",
+ "remove_dir_all",
  "winapi 0.3.9",
 ]
 
@@ -31,7 +31,6 @@ onefuzz = { path = "../onefuzz" }
 onefuzz-telemetry = { path = "../onefuzz-telemetry" }
 path-absolutize = "3.0.6"
 reqwest-retry = { path = "../reqwest-retry" }
-remove_dir_all = "0.7"
 stacktrace-parser = { path = "../stacktrace-parser" }
 storage-queue = { path = "../storage-queue" }
 tempfile = "3.2"
@@ -4,9 +4,8 @@ use anyhow::Result;
 use backoff::{future::retry, Error as BackoffError, ExponentialBackoff};
 use clap::{App, Arg, ArgMatches};
 use onefuzz::jitter::delay_with_jitter;
-use onefuzz::{blob::BlobContainerUrl, monitor::DirectoryMonitor, syncdir::SyncedDir};
+use onefuzz::{blob::url::BlobContainerUrl, monitor::DirectoryMonitor, syncdir::SyncedDir};
 use path_absolutize::Absolutize;
-use remove_dir_all::remove_dir_all;
 use reqwest::Url;
 use std::task::Poll;
 use std::{
@@ -55,6 +54,8 @@ pub const ANALYSIS_INPUTS: &str = "analysis_inputs";
 pub const ANALYSIS_UNIQUE_INPUTS: &str = "analysis_unique_inputs";
 pub const PRESERVE_EXISTING_OUTPUTS: &str = "preserve_existing_outputs";
 
+pub const CREATE_JOB_DIR: &str = "create_job_dir";
+
 const WAIT_FOR_MAX_WAIT: Duration = Duration::from_secs(10);
 const WAIT_FOR_DIR_DELAY: Duration = Duration::from_secs(1);
 
@@ -139,10 +140,10 @@ pub fn add_common_config(app: App<'static, 'static>) -> App<'static, 'static> {
             .required(false),
     )
     .arg(
-        Arg::with_name("keep_job_dir")
-            .long("keep_job_dir")
+        Arg::with_name(CREATE_JOB_DIR)
+            .long(CREATE_JOB_DIR)
             .required(false)
-            .help("keep the local directory created for running the task"),
+            .help("create a local job directory to sync the files"),
     )
 }
 
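The flag itself is an ordinary clap 2.x boolean switch, so local runs now opt in with `--create_job_dir`. A self-contained sketch of how such a flag is declared and read back (the app name and `main` scaffolding are invented for illustration; the `Arg` lines mirror the diff):

```rust
use clap::{App, Arg};

const CREATE_JOB_DIR: &str = "create_job_dir";

fn main() {
    let matches = App::new("local-run")
        .arg(
            Arg::with_name(CREATE_JOB_DIR)
                .long(CREATE_JOB_DIR)
                .required(false)
                .help("create a local job directory to sync the files"),
        )
        .get_matches();

    // Downstream code (get_synced_dirs / get_synced_dir) branches on this.
    println!("create_job_dir = {}", matches.is_present(CREATE_JOB_DIR));
}
```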
@@ -158,40 +159,29 @@ pub fn get_synced_dirs(
     task_id: Uuid,
     args: &ArgMatches<'_>,
 ) -> Result<Vec<SyncedDir>> {
+    let create_job_dir = args.is_present(CREATE_JOB_DIR);
     let current_dir = std::env::current_dir()?;
     args.values_of_os(name)
         .ok_or_else(|| anyhow!("argument '{}' not specified", name))?
         .enumerate()
         .map(|(index, remote_path)| {
             let path = PathBuf::from(remote_path);
-            let remote_path = path.absolutize()?;
-            let remote_url = Url::from_file_path(remote_path).expect("invalid file path");
-            let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url");
-            let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index));
-            Ok(SyncedDir {
-                url: remote_blob_url,
-                path,
-            })
+            if create_job_dir {
+                let remote_path = path.absolutize()?;
+                let remote_url = Url::from_file_path(remote_path).expect("invalid file path");
+                let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url");
+                let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index));
+                Ok(SyncedDir {
+                    url: Some(remote_blob_url),
+                    path,
+                })
+            } else {
+                Ok(SyncedDir { url: None, path })
+            }
         })
         .collect()
 }
 
-fn register_cleanup(job_id: Uuid) -> Result<()> {
-    let path = std::env::current_dir()?.join(job_id.to_string());
-    atexit::register(move || {
-        // only cleaning up if the path exists upon exit
-        if std::fs::metadata(&path).is_ok() {
-            let result = remove_dir_all(&path);
-
-            // don't panic if the remove failed but the path is gone
-            if result.is_err() && std::fs::metadata(&path).is_ok() {
-                result.expect("cleanup failed");
-            }
-        }
-    });
-    Ok(())
-}
-
 pub fn get_synced_dir(
     name: &str,
     job_id: Uuid,
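When the flag is set, each input directory is mirrored under `./{job_id}/{task_id}/{name}_{index}` relative to the current directory. A toy illustration of just the path construction (IDs are freshly generated here, so the output is made up; assumes the `uuid` crate with the `v4` feature):

```rust
use std::path::PathBuf;
use uuid::Uuid;

fn main() {
    // Made-up identifiers, only to show the layout get_synced_dirs produces.
    let job_id = Uuid::new_v4();
    let task_id = Uuid::new_v4();
    let (name, index) = ("crashes", 0);

    let current_dir = PathBuf::from(".");
    let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index));
    println!("job-local mirror: {}", path.display());
}
```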
@@ -199,13 +189,21 @@ pub fn get_synced_dir(
     args: &ArgMatches<'_>,
 ) -> Result<SyncedDir> {
     let remote_path = value_t!(args, name, PathBuf)?.absolutize()?.into_owned();
-    let remote_url = Url::from_file_path(remote_path).map_err(|_| anyhow!("invalid file path"))?;
-    let remote_blob_url = BlobContainerUrl::new(remote_url)?;
-    let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name));
-    Ok(SyncedDir {
-        url: remote_blob_url,
-        path,
-    })
+    if args.is_present(CREATE_JOB_DIR) {
+        let remote_url =
+            Url::from_file_path(remote_path).map_err(|_| anyhow!("invalid file path"))?;
+        let remote_blob_url = BlobContainerUrl::new(remote_url)?;
+        let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name));
+        Ok(SyncedDir {
+            url: Some(remote_blob_url),
+            path,
+        })
+    } else {
+        Ok(SyncedDir {
+            url: None,
+            path: remote_path,
+        })
+    }
 }
 
 // NOTE: generate_task_id is intended to change the default behavior for local
@@ -238,10 +236,6 @@ pub fn build_local_context(
         PathBuf::default()
     };
 
-    if !args.is_present("keep_job_dir") {
-        register_cleanup(job_id)?;
-    }
-
     let common_config = CommonConfig {
         job_id,
         task_id,
@@ -333,7 +327,7 @@ pub trait SyncCountDirMonitor<T: Sized> {
 
 impl SyncCountDirMonitor<SyncedDir> for SyncedDir {
     fn monitor_count(self, event_sender: &Option<UnboundedSender<UiEvent>>) -> Result<Self> {
-        if let (Some(event_sender), Some(p)) = (event_sender, self.url.as_file_path()) {
+        if let (Some(event_sender), Some(p)) = (event_sender, self.remote_url()?.as_file_path()) {
             event_sender.send(UiEvent::MonitorDir(p))?;
         }
         Ok(self)
@@ -36,7 +36,7 @@ pub async fn run(
     let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?;
     let crash_dir = fuzz_config
         .crashes
-        .url
+        .remote_url()?
         .as_file_path()
         .expect("invalid crash dir remote location");
 
@@ -25,7 +25,7 @@ pub async fn run(
     let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?;
     let crash_dir = fuzz_config
         .crashes
-        .url
+        .remote_url()?
        .as_file_path()
         .expect("invalid crash dir remote location");
 
@@ -130,8 +130,8 @@ async fn run_existing(config: &Config, reports_dir: &Option<PathBuf>) -> Result<
 
 async fn already_checked(config: &Config, input: &BlobUrl) -> Result<bool> {
     let result = if let Some(crashes) = &config.crashes {
-        crashes.url.account() == input.account()
-            && crashes.url.container() == input.container()
+        crashes.url.clone().and_then(|u| u.account()) == input.account()
+            && crashes.url.clone().and_then(|u| u.container()) == input.container()
             && crashes.path.join(input.name()).exists()
     } else {
         false
@@ -211,12 +211,14 @@ pub async fn run_tool(
         })
         .set_optional_ref(&config.crashes, |tester, crashes| {
             tester
-                .set_optional_ref(&crashes.url.account(), |tester, account| {
-                    tester.crashes_account(account)
-                })
-                .set_optional_ref(&crashes.url.container(), |tester, container| {
-                    tester.crashes_container(container)
-                })
+                .set_optional_ref(
+                    &crashes.url.clone().and_then(|u| u.account()),
+                    |tester, account| tester.crashes_account(account),
+                )
+                .set_optional_ref(
+                    &crashes.url.clone().and_then(|u| u.container()),
+                    |tester, container| tester.crashes_container(container),
+                )
         });
 
     let analyzer_path = expand.evaluate_value(&config.analyzer_exe)?;
@@ -244,15 +244,21 @@ mod tests {
             generator_options,
             readonly_inputs: vec![SyncedDir {
                 path: readonly_inputs_local,
-                url: BlobContainerUrl::parse(Url::from_directory_path(inputs).unwrap())?,
+                url: Some(BlobContainerUrl::parse(
+                    Url::from_directory_path(inputs).unwrap(),
+                )?),
             }],
             crashes: SyncedDir {
                 path: crashes_local,
-                url: BlobContainerUrl::parse(Url::from_directory_path(crashes).unwrap())?,
+                url: Some(BlobContainerUrl::parse(
+                    Url::from_directory_path(crashes).unwrap(),
+                )?),
             },
             tools: Some(SyncedDir {
                 path: tools_local,
-                url: BlobContainerUrl::parse(Url::from_directory_path(radamsa_dir).unwrap())?,
+                url: Some(BlobContainerUrl::parse(
+                    Url::from_directory_path(radamsa_dir).unwrap(),
+                )?),
             }),
             target_exe: Default::default(),
             target_env: Default::default(),
@@ -199,12 +199,14 @@ async fn start_supervisor(
         .set_optional_ref(&config.common.instance_telemetry_key, |tester, key| {
             tester.instance_telemetry_key(&key)
         })
-        .set_optional_ref(&config.crashes.url.account(), |tester, account| {
-            tester.crashes_account(account)
-        })
-        .set_optional_ref(&config.crashes.url.container(), |tester, container| {
-            tester.crashes_container(container)
-        });
+        .set_optional_ref(
+            &config.crashes.url.clone().and_then(|u| u.account()),
+            |tester, account| tester.crashes_account(account),
+        )
+        .set_optional_ref(
+            &config.crashes.url.clone().and_then(|u| u.container()),
+            |tester, container| tester.crashes_container(container),
+        );
 
     let supervisor_path = expand.evaluate_value(&config.supervisor_exe)?;
     let mut cmd = Command::new(supervisor_path);
@@ -285,15 +287,18 @@ mod tests {
         let corpus_dir_local = tempfile::tempdir().unwrap().path().into();
         let crashes = SyncedDir {
             path: crashes_local,
-            url: BlobContainerUrl::parse(Url::from_directory_path(fault_dir_temp).unwrap())
-                .unwrap(),
+            url: Some(
+                BlobContainerUrl::parse(Url::from_directory_path(fault_dir_temp).unwrap()).unwrap(),
+            ),
         };
 
         let corpus_dir_temp = tempfile::tempdir().unwrap();
         let corpus_dir = SyncedDir {
             path: corpus_dir_local,
-            url: BlobContainerUrl::parse(Url::from_directory_path(corpus_dir_temp).unwrap())
-                .unwrap(),
+            url: Some(
+                BlobContainerUrl::parse(Url::from_directory_path(corpus_dir_temp).unwrap())
+                    .unwrap(),
+            ),
         };
         let seed_file_name = corpus_dir.path.join("seed.txt");
         tokio::fs::write(seed_file_name, "xyz").await.unwrap();
@@ -149,7 +149,7 @@ impl<M> InputPoller<M> {
                 let dir_relative = input_path.strip_prefix(&dir_path)?;
                 dir_relative.display().to_string()
             };
-            let url = to_process.try_url().map(|x| x.blob(blob_name).url()).ok();
+            let url = to_process.try_url().map(|x| x.blob(blob_name).url());
 
             processor.process(url, &path).await?;
         }
@@ -160,8 +160,8 @@ impl<M> InputPoller<M> {
     pub async fn seen_in_batch(&self, url: &Url) -> Result<bool> {
         let result = if let Some(batch_dir) = &self.batch_dir {
             if let Ok(blob) = BlobUrl::new(url.clone()) {
-                batch_dir.try_url()?.account() == blob.account()
-                    && batch_dir.try_url()?.container() == blob.container()
+                batch_dir.try_url().and_then(|u| u.account()) == blob.account()
+                    && batch_dir.try_url().and_then(|u| u.container()) == blob.container()
                     && batch_dir.path.join(blob.name()).exists()
             } else {
                 false
@@ -88,7 +88,7 @@ pub async fn handle_inputs(
             .to_string_lossy()
             .to_string();
 
-        let input_url = readonly_inputs.url.url().join(&file_name)?;
+        let input_url = readonly_inputs.remote_url()?.url().join(&file_name)?;
 
         let crash_test_result = handler.get_crash_result(file_path, input_url).await?;
         RegressionReport {
@@ -150,7 +150,7 @@ pub async fn handle_crash_reports(
         }
         .ok_or_else(|| format_err!("crash report is missing input blob: {}", file_name))?;
 
-        let input_url = crashes.url.blob(&input_blob.name).url();
+        let input_url = crashes.remote_url()?.url().clone();
         let input = crashes.path.join(&input_blob.name);
         let crash_test_result = handler.get_crash_result(input, input_url).await?;
 
@@ -12,7 +12,7 @@ use crate::{
 use anyhow::{Context, Result};
 use futures::stream::StreamExt;
 use onefuzz_telemetry::{Event, EventData};
-use reqwest::StatusCode;
+use reqwest::{StatusCode, Url};
 use reqwest_retry::{RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS};
 use serde::{Deserialize, Serialize};
 use std::{path::PathBuf, str, time::Duration};
@@ -30,13 +30,21 @@ const DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS: u64 = 60;
 #[derive(Debug, Deserialize, Clone, PartialEq)]
 pub struct SyncedDir {
     pub path: PathBuf,
-    pub url: BlobContainerUrl,
+    pub url: Option<BlobContainerUrl>,
 }
 
 impl SyncedDir {
+    pub fn remote_url(&self) -> Result<BlobContainerUrl> {
+        let url = self.url.clone().unwrap_or(BlobContainerUrl::new(
+            Url::from_file_path(self.path.clone()).map_err(|_| anyhow!("invalid path"))?,
+        )?);
+        Ok(url)
+    }
+
     pub async fn sync(&self, operation: SyncOperation, delete_dst: bool) -> Result<()> {
         let dir = &self.path.join("");
-        if let Some(dest) = self.url.as_file_path() {
+
+        if let Some(dest) = self.url.clone().and_then(|u| u.as_file_path()) {
             debug!("syncing {:?} {}", operation, dest.display());
             match operation {
                 SyncOperation::Push => {
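The new `remote_url` helper returns the configured container URL, or falls back to treating the local path itself as a `file://` container when `url` is `None`. A rough standalone equivalent (`url::Url` again stands in for `BlobContainerUrl`):

```rust
use anyhow::{anyhow, Result};
use std::path::PathBuf;
use url::Url;

struct SyncedDir {
    path: PathBuf,
    url: Option<Url>,
}

impl SyncedDir {
    // Approximation of SyncedDir::remote_url from the diff.
    fn remote_url(&self) -> Result<Url> {
        match &self.url {
            Some(url) => Ok(url.clone()),
            // No remote configured: the local directory itself is the remote.
            None => Url::from_file_path(&self.path).map_err(|_| anyhow!("invalid path")),
        }
    }
}

fn main() -> Result<()> {
    let dir = SyncedDir {
        path: PathBuf::from("/tmp/corpus"),
        url: None,
    };
    // Prints file:///tmp/corpus because no remote was configured.
    println!("{}", dir.remote_url()?);
    Ok(())
}
```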
@@ -56,19 +64,20 @@ impl SyncedDir {
                 .await
             }
         }
-        } else {
-            let url = self.url.url();
+        } else if let Some(url) = self.url.clone().map(|u| u.url().clone()) {
             let url = url.as_ref();
             debug!("syncing {:?} {}", operation, dir.display());
             match operation {
                 SyncOperation::Push => az_copy::sync(dir, url, delete_dst).await,
                 SyncOperation::Pull => az_copy::sync(url, dir, delete_dst).await,
             }
+        } else {
+            Ok(())
         }
     }
 
-    pub fn try_url(&self) -> Result<&BlobContainerUrl> {
-        Ok(&self.url)
+    pub fn try_url(&self) -> Option<BlobContainerUrl> {
+        self.url.clone()
     }
 
     pub async fn init_pull(&self) -> Result<()> {
@@ -77,7 +86,7 @@ impl SyncedDir {
     }
 
     pub async fn init(&self) -> Result<()> {
-        if let Some(remote_path) = self.url.as_file_path() {
+        if let Some(remote_path) = self.url.clone().and_then(|u| u.as_file_path()) {
             fs::create_dir_all(remote_path).await?;
         }
 
@@ -122,7 +131,8 @@ impl SyncedDir {
 
     // Conditionally upload a report, if it would not be a duplicate.
     pub async fn upload<T: Serialize>(&self, name: &str, data: &T) -> Result<bool> {
-        match self.url.as_file_path() {
+        if let Some(url) = self.url.clone() {
+            match url.as_file_path() {
             Some(path) => {
                 let path = path.join(name);
                 if !exists(&path).await? {
@@ -134,7 +144,7 @@ impl SyncedDir {
                 }
             }
             None => {
-                let url = self.url.blob(name).url();
+                let url = url.blob(name).url();
                 let blob = BlobClient::new();
                 let result = blob
                     .put(url.clone())
@@ -156,14 +166,29 @@ impl SyncedDir {
                 Ok(result.status() == StatusCode::CREATED)
             }
         }
+        } else {
+            let path = self.path.join(name);
+            if !exists(&path).await? {
+                let data = serde_json::to_vec(&data)?;
+                fs::write(path, data).await?;
+                Ok(true)
+            } else {
+                Ok(false)
+            }
+        }
     }
 
-    async fn file_monitor_event(&self, event: Event, ignore_dotfiles: bool) -> Result<()> {
-        debug!("monitoring {}", self.path.display());
-        let mut monitor = DirectoryMonitor::new(self.path.clone());
+    async fn file_monitor_event(
+        path: PathBuf,
+        url: BlobContainerUrl,
+        event: Event,
+        ignore_dotfiles: bool,
+    ) -> Result<()> {
+        debug!("monitoring {}", path.display());
+        let mut monitor = DirectoryMonitor::new(path.clone());
         monitor.start()?;
 
-        if let Some(path) = self.url.as_file_path() {
+        if let Some(path) = url.as_file_path() {
             fs::create_dir_all(&path).await?;
 
             while let Some(item) = monitor.next().await {
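With no remote URL, `upload` now degrades to a plain local write guarded by the same duplicate check. A standalone approximation of that new branch (`exists()` in the real code is an onefuzz helper; `tokio::fs::metadata` stands in for it here):

```rust
use anyhow::Result;
use serde::Serialize;
use std::path::Path;

// Sketch of the local-only branch of SyncedDir::upload.
async fn upload_local<T: Serialize>(dir: &Path, name: &str, data: &T) -> Result<bool> {
    let path = dir.join(name);
    if tokio::fs::metadata(&path).await.is_ok() {
        // A report with this name already exists: treat it as a duplicate.
        return Ok(false);
    }
    tokio::fs::write(&path, serde_json::to_vec(data)?).await?;
    Ok(true)
}

#[tokio::main]
async fn main() -> Result<()> {
    let dir = std::env::temp_dir();
    let wrote = upload_local(&dir, "report.json", &serde_json::json!({ "ok": true })).await?;
    println!("wrote new report: {}", wrote);
    Ok(())
}
```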
@@ -192,7 +217,7 @@ impl SyncedDir {
                 }
             }
         } else {
-            let mut uploader = BlobUploader::new(self.url.url().clone());
+            let mut uploader = BlobUploader::new(url.url().clone());
 
             while let Some(item) = monitor.next().await {
                 let file_name = item
@@ -207,7 +232,7 @@ impl SyncedDir {
                         let error_message = format!(
                             "Couldn't upload file. path:{} dir:{} err:{}",
                             item.display(),
-                            self.path.display(),
+                            path.display(),
                             err
                         );
 
@@ -235,6 +260,7 @@ impl SyncedDir {
     /// to be initialized, but a user-supplied binary, (such as AFL) logically owns
     /// a directory, and may reset it.
     pub async fn monitor_results(&self, event: Event, ignore_dotfiles: bool) -> Result<()> {
+        if let Some(url) = self.url.clone() {
         loop {
             debug!("waiting to monitor {}", self.path.display());
 
@@ -244,9 +270,17 @@ impl SyncedDir {
             }
 
             debug!("starting monitor for {}", self.path.display());
-            self.file_monitor_event(event.clone(), ignore_dotfiles)
-                .await?;
+            Self::file_monitor_event(
+                self.path.clone(),
+                url.clone(),
+                event.clone(),
+                ignore_dotfiles,
+            )
+            .await?;
         }
+        } else {
+            Ok(())
+        }
     }
 }
 