mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-17 20:38:06 +00:00
refactor SyncDir and blob container url (#809)
This commit is contained in:
@ -172,11 +172,14 @@ pub fn get_synced_dirs(
|
|||||||
let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url");
|
let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url");
|
||||||
let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index));
|
let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index));
|
||||||
Ok(SyncedDir {
|
Ok(SyncedDir {
|
||||||
url: Some(remote_blob_url),
|
remote_path: Some(remote_blob_url),
|
||||||
path,
|
local_path: path,
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
Ok(SyncedDir { url: None, path })
|
Ok(SyncedDir {
|
||||||
|
remote_path: None,
|
||||||
|
local_path: path,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
@ -195,13 +198,13 @@ pub fn get_synced_dir(
|
|||||||
let remote_blob_url = BlobContainerUrl::new(remote_url)?;
|
let remote_blob_url = BlobContainerUrl::new(remote_url)?;
|
||||||
let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name));
|
let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name));
|
||||||
Ok(SyncedDir {
|
Ok(SyncedDir {
|
||||||
url: Some(remote_blob_url),
|
remote_path: Some(remote_blob_url),
|
||||||
path,
|
local_path: path,
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
Ok(SyncedDir {
|
Ok(SyncedDir {
|
||||||
url: None,
|
remote_path: None,
|
||||||
path: remote_path,
|
local_path: remote_path,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -49,7 +49,7 @@ pub struct Config {
|
|||||||
pub async fn run(config: Config) -> Result<()> {
|
pub async fn run(config: Config) -> Result<()> {
|
||||||
let task_dir = config
|
let task_dir = config
|
||||||
.analysis
|
.analysis
|
||||||
.path
|
.local_path
|
||||||
.parent()
|
.parent()
|
||||||
.ok_or_else(|| anyhow!("Invalid input path"))?;
|
.ok_or_else(|| anyhow!("Invalid input path"))?;
|
||||||
let temp_path = task_dir.join(".temp");
|
let temp_path = task_dir.join(".temp");
|
||||||
@ -94,7 +94,7 @@ pub async fn run(config: Config) -> Result<()> {
|
|||||||
(None, None)
|
(None, None)
|
||||||
};
|
};
|
||||||
|
|
||||||
set_executable(&config.tools.path).await?;
|
set_executable(&config.tools.local_path).await?;
|
||||||
run_existing(&config, &reports_path).await?;
|
run_existing(&config, &reports_path).await?;
|
||||||
let poller = poll_inputs(&config, tmp, &reports_path);
|
let poller = poll_inputs(&config, tmp, &reports_path);
|
||||||
|
|
||||||
@ -114,7 +114,7 @@ async fn run_existing(config: &Config, reports_dir: &Option<PathBuf>) -> Result<
|
|||||||
if let Some(crashes) = &config.crashes {
|
if let Some(crashes) = &config.crashes {
|
||||||
crashes.init_pull().await?;
|
crashes.init_pull().await?;
|
||||||
let mut count: u64 = 0;
|
let mut count: u64 = 0;
|
||||||
let mut read_dir = fs::read_dir(&crashes.path).await?;
|
let mut read_dir = fs::read_dir(&crashes.local_path).await?;
|
||||||
while let Some(file) = read_dir.next_entry().await? {
|
while let Some(file) = read_dir.next_entry().await? {
|
||||||
debug!("Processing file {:?}", file);
|
debug!("Processing file {:?}", file);
|
||||||
run_tool(file.path(), &config, &reports_dir).await?;
|
run_tool(file.path(), &config, &reports_dir).await?;
|
||||||
@ -128,9 +128,9 @@ async fn run_existing(config: &Config, reports_dir: &Option<PathBuf>) -> Result<
|
|||||||
|
|
||||||
async fn already_checked(config: &Config, input: &BlobUrl) -> Result<bool> {
|
async fn already_checked(config: &Config, input: &BlobUrl) -> Result<bool> {
|
||||||
let result = if let Some(crashes) = &config.crashes {
|
let result = if let Some(crashes) = &config.crashes {
|
||||||
crashes.url.clone().and_then(|u| u.account()) == input.account()
|
crashes.remote_path.clone().and_then(|u| u.account()) == input.account()
|
||||||
&& crashes.url.clone().and_then(|u| u.container()) == input.container()
|
&& crashes.remote_path.clone().and_then(|u| u.container()) == input.container()
|
||||||
&& crashes.path.join(input.name()).exists()
|
&& crashes.local_path.join(input.name()).exists()
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
};
|
};
|
||||||
@ -193,8 +193,8 @@ pub async fn run_tool(
|
|||||||
.target_options(&config.target_options)
|
.target_options(&config.target_options)
|
||||||
.analyzer_exe(&config.analyzer_exe)
|
.analyzer_exe(&config.analyzer_exe)
|
||||||
.analyzer_options(&config.analyzer_options)
|
.analyzer_options(&config.analyzer_options)
|
||||||
.output_dir(&config.analysis.path)
|
.output_dir(&config.analysis.local_path)
|
||||||
.tools_dir(&config.tools.path)
|
.tools_dir(&config.tools.local_path)
|
||||||
.setup_dir(&config.common.setup_dir)
|
.setup_dir(&config.common.setup_dir)
|
||||||
.job_id(&config.common.job_id)
|
.job_id(&config.common.job_id)
|
||||||
.task_id(&config.common.task_id)
|
.task_id(&config.common.task_id)
|
||||||
@ -210,11 +210,11 @@ pub async fn run_tool(
|
|||||||
.set_optional_ref(&config.crashes, |tester, crashes| {
|
.set_optional_ref(&config.crashes, |tester, crashes| {
|
||||||
tester
|
tester
|
||||||
.set_optional_ref(
|
.set_optional_ref(
|
||||||
&crashes.url.clone().and_then(|u| u.account()),
|
&crashes.remote_path.clone().and_then(|u| u.account()),
|
||||||
|tester, account| tester.crashes_account(account),
|
|tester, account| tester.crashes_account(account),
|
||||||
)
|
)
|
||||||
.set_optional_ref(
|
.set_optional_ref(
|
||||||
&crashes.url.clone().and_then(|u| u.container()),
|
&crashes.remote_path.clone().and_then(|u| u.container()),
|
||||||
|tester, container| tester.crashes_container(container),
|
|tester, container| tester.crashes_container(container),
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
@ -116,7 +116,7 @@ impl CoverageTask {
|
|||||||
let mut seen_inputs = false;
|
let mut seen_inputs = false;
|
||||||
// Update the total with the coverage from each seed corpus.
|
// Update the total with the coverage from each seed corpus.
|
||||||
for dir in &self.config.readonly_inputs {
|
for dir in &self.config.readonly_inputs {
|
||||||
debug!("recording coverage for {}", dir.path.display());
|
debug!("recording coverage for {}", dir.local_path.display());
|
||||||
dir.init_pull().await?;
|
dir.init_pull().await?;
|
||||||
if self.record_corpus_coverage(&mut processor, dir).await? {
|
if self.record_corpus_coverage(&mut processor, dir).await? {
|
||||||
seen_inputs = true;
|
seen_inputs = true;
|
||||||
@ -150,12 +150,14 @@ impl CoverageTask {
|
|||||||
processor: &mut CoverageProcessor,
|
processor: &mut CoverageProcessor,
|
||||||
corpus_dir: &SyncedDir,
|
corpus_dir: &SyncedDir,
|
||||||
) -> Result<bool> {
|
) -> Result<bool> {
|
||||||
let mut corpus = fs::read_dir(&corpus_dir.path).await.with_context(|| {
|
let mut corpus = fs::read_dir(&corpus_dir.local_path)
|
||||||
format!(
|
.await
|
||||||
"unable to read corpus coverage directory: {}",
|
.with_context(|| {
|
||||||
corpus_dir.path.display()
|
format!(
|
||||||
)
|
"unable to read corpus coverage directory: {}",
|
||||||
})?;
|
corpus_dir.local_path.display()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
let mut seen_inputs = false;
|
let mut seen_inputs = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@ -187,7 +189,7 @@ pub struct CoverageProcessor {
|
|||||||
impl CoverageProcessor {
|
impl CoverageProcessor {
|
||||||
pub async fn new(config: Arc<Config>) -> Result<Self> {
|
pub async fn new(config: Arc<Config>) -> Result<Self> {
|
||||||
let heartbeat_client = config.common.init_heartbeat().await?;
|
let heartbeat_client = config.common.init_heartbeat().await?;
|
||||||
let total = TotalCoverage::new(config.coverage.path.join(TOTAL_COVERAGE));
|
let total = TotalCoverage::new(config.coverage.local_path.join(TOTAL_COVERAGE));
|
||||||
let recorder = CoverageRecorder::new(config.clone()).await?;
|
let recorder = CoverageRecorder::new(config.clone()).await?;
|
||||||
let module_totals = BTreeMap::default();
|
let module_totals = BTreeMap::default();
|
||||||
|
|
||||||
@ -209,7 +211,7 @@ impl CoverageProcessor {
|
|||||||
debug!("updating module info {:?}", module);
|
debug!("updating module info {:?}", module);
|
||||||
|
|
||||||
if !self.module_totals.contains_key(&module) {
|
if !self.module_totals.contains_key(&module) {
|
||||||
let parent = &self.config.coverage.path.join("by-module");
|
let parent = &self.config.coverage.local_path.join("by-module");
|
||||||
fs::create_dir_all(parent).await.with_context(|| {
|
fs::create_dir_all(parent).await.with_context(|| {
|
||||||
format!(
|
format!(
|
||||||
"unable to create by-module coverage directory: {}",
|
"unable to create by-module coverage directory: {}",
|
||||||
|
@ -98,7 +98,7 @@ impl CoverageRecorder {
|
|||||||
|
|
||||||
let coverage_path = {
|
let coverage_path = {
|
||||||
let digest = digest_file(test_input).await?;
|
let digest = digest_file(test_input).await?;
|
||||||
self.config.coverage.path.join("inputs").join(digest)
|
self.config.coverage.local_path.join("inputs").join(digest)
|
||||||
};
|
};
|
||||||
|
|
||||||
fs::create_dir_all(&coverage_path).await.with_context(|| {
|
fs::create_dir_all(&coverage_path).await.with_context(|| {
|
||||||
|
@ -64,7 +64,7 @@ impl GeneratorTask {
|
|||||||
self.config.crashes.init().await?;
|
self.config.crashes.init().await?;
|
||||||
if let Some(tools) = &self.config.tools {
|
if let Some(tools) = &self.config.tools {
|
||||||
tools.init_pull().await?;
|
tools.init_pull().await?;
|
||||||
set_executable(&tools.path).await?;
|
set_executable(&tools.local_path).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let hb_client = self.config.common.init_heartbeat().await?;
|
let hb_client = self.config.common.init_heartbeat().await?;
|
||||||
@ -104,7 +104,7 @@ impl GeneratorTask {
|
|||||||
loop {
|
loop {
|
||||||
for corpus_dir in &self.config.readonly_inputs {
|
for corpus_dir in &self.config.readonly_inputs {
|
||||||
heartbeat_client.alive();
|
heartbeat_client.alive();
|
||||||
let corpus_dir = &corpus_dir.path;
|
let corpus_dir = &corpus_dir.local_path;
|
||||||
let generated_inputs = tempdir()?;
|
let generated_inputs = tempdir()?;
|
||||||
let generated_inputs_path = generated_inputs.path();
|
let generated_inputs_path = generated_inputs.path();
|
||||||
|
|
||||||
@ -131,7 +131,7 @@ impl GeneratorTask {
|
|||||||
file.file_name()
|
file.file_name()
|
||||||
};
|
};
|
||||||
|
|
||||||
let destination_file = self.config.crashes.path.join(destination_file);
|
let destination_file = self.config.crashes.local_path.join(destination_file);
|
||||||
if tester.is_crash(file.path()).await? {
|
if tester.is_crash(file.path()).await? {
|
||||||
fs::rename(file.path(), &destination_file).await?;
|
fs::rename(file.path(), &destination_file).await?;
|
||||||
debug!("crash found {}", destination_file.display());
|
debug!("crash found {}", destination_file.display());
|
||||||
@ -162,7 +162,7 @@ impl GeneratorTask {
|
|||||||
tester.instance_telemetry_key(&key)
|
tester.instance_telemetry_key(&key)
|
||||||
})
|
})
|
||||||
.set_optional_ref(&self.config.tools, |expand, tools| {
|
.set_optional_ref(&self.config.tools, |expand, tools| {
|
||||||
expand.tools_dir(&tools.path)
|
expand.tools_dir(&tools.local_path)
|
||||||
});
|
});
|
||||||
|
|
||||||
let generator_path = expand.evaluate_value(&self.config.generator_exe)?;
|
let generator_path = expand.evaluate_value(&self.config.generator_exe)?;
|
||||||
@ -240,20 +240,20 @@ mod tests {
|
|||||||
generator_exe: String::from("{tools_dir}/radamsa"),
|
generator_exe: String::from("{tools_dir}/radamsa"),
|
||||||
generator_options,
|
generator_options,
|
||||||
readonly_inputs: vec![SyncedDir {
|
readonly_inputs: vec![SyncedDir {
|
||||||
path: readonly_inputs_local,
|
local_path: readonly_inputs_local,
|
||||||
url: Some(BlobContainerUrl::parse(
|
remote_path: Some(BlobContainerUrl::parse(
|
||||||
Url::from_directory_path(inputs).unwrap(),
|
Url::from_directory_path(inputs).unwrap(),
|
||||||
)?),
|
)?),
|
||||||
}],
|
}],
|
||||||
crashes: SyncedDir {
|
crashes: SyncedDir {
|
||||||
path: crashes_local,
|
local_path: crashes_local,
|
||||||
url: Some(BlobContainerUrl::parse(
|
remote_path: Some(BlobContainerUrl::parse(
|
||||||
Url::from_directory_path(crashes).unwrap(),
|
Url::from_directory_path(crashes).unwrap(),
|
||||||
)?),
|
)?),
|
||||||
},
|
},
|
||||||
tools: Some(SyncedDir {
|
tools: Some(SyncedDir {
|
||||||
path: tools_local,
|
local_path: tools_local,
|
||||||
url: Some(BlobContainerUrl::parse(
|
remote_path: Some(BlobContainerUrl::parse(
|
||||||
Url::from_directory_path(radamsa_dir).unwrap(),
|
Url::from_directory_path(radamsa_dir).unwrap(),
|
||||||
)?),
|
)?),
|
||||||
}),
|
}),
|
||||||
|
@ -104,9 +104,12 @@ impl LibFuzzerFuzzTask {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn verify(&self) -> Result<()> {
|
pub async fn verify(&self) -> Result<()> {
|
||||||
let mut directories = vec![self.config.inputs.path.clone()];
|
let mut directories = vec![self.config.inputs.local_path.clone()];
|
||||||
if let Some(readonly_inputs) = &self.config.readonly_inputs {
|
if let Some(readonly_inputs) = &self.config.readonly_inputs {
|
||||||
let mut dirs = readonly_inputs.iter().map(|x| x.path.clone()).collect();
|
let mut dirs = readonly_inputs
|
||||||
|
.iter()
|
||||||
|
.map(|x| x.local_path.clone())
|
||||||
|
.collect();
|
||||||
directories.append(&mut dirs);
|
directories.append(&mut dirs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,7 +139,7 @@ impl LibFuzzerFuzzTask {
|
|||||||
let task_dir = self
|
let task_dir = self
|
||||||
.config
|
.config
|
||||||
.inputs
|
.inputs
|
||||||
.path
|
.local_path
|
||||||
.parent()
|
.parent()
|
||||||
.ok_or_else(|| anyhow!("Invalid input path"))?;
|
.ok_or_else(|| anyhow!("Invalid input path"))?;
|
||||||
let temp_path = task_dir.join(".temp");
|
let temp_path = task_dir.join(".temp");
|
||||||
@ -161,7 +164,12 @@ impl LibFuzzerFuzzTask {
|
|||||||
|
|
||||||
let mut entries = tokio::fs::read_dir(local_input_dir.path()).await?;
|
let mut entries = tokio::fs::read_dir(local_input_dir.path()).await?;
|
||||||
while let Ok(Some(entry)) = entries.next_entry().await {
|
while let Ok(Some(entry)) = entries.next_entry().await {
|
||||||
let destination_path = self.config.inputs.path.clone().join(entry.file_name());
|
let destination_path = self
|
||||||
|
.config
|
||||||
|
.inputs
|
||||||
|
.local_path
|
||||||
|
.clone()
|
||||||
|
.join(entry.file_name());
|
||||||
tokio::fs::rename(&entry.path(), &destination_path)
|
tokio::fs::rename(&entry.path(), &destination_path)
|
||||||
.await
|
.await
|
||||||
.with_context(|| {
|
.with_context(|| {
|
||||||
@ -189,9 +197,11 @@ impl LibFuzzerFuzzTask {
|
|||||||
|
|
||||||
debug!("starting fuzzer run, run_id = {}", run_id);
|
debug!("starting fuzzer run, run_id = {}", run_id);
|
||||||
|
|
||||||
let mut inputs = vec![&self.config.inputs.path];
|
let mut inputs = vec![&self.config.inputs.local_path];
|
||||||
if let Some(readonly_inputs) = &self.config.readonly_inputs {
|
if let Some(readonly_inputs) = &self.config.readonly_inputs {
|
||||||
readonly_inputs.iter().for_each(|d| inputs.push(&d.path));
|
readonly_inputs
|
||||||
|
.iter()
|
||||||
|
.for_each(|d| inputs.push(&d.local_path));
|
||||||
}
|
}
|
||||||
|
|
||||||
let fuzzer = LibFuzzer::new(
|
let fuzzer = LibFuzzer::new(
|
||||||
@ -262,7 +272,7 @@ impl LibFuzzerFuzzTask {
|
|||||||
|
|
||||||
for file in &files {
|
for file in &files {
|
||||||
if let Some(filename) = file.file_name() {
|
if let Some(filename) = file.file_name() {
|
||||||
let dest = self.config.crashes.path.join(filename);
|
let dest = self.config.crashes.local_path.join(filename);
|
||||||
if let Err(e) = tokio::fs::rename(file.clone(), dest.clone()).await {
|
if let Err(e) = tokio::fs::rename(file.clone(), dest.clone()).await {
|
||||||
if !dest.exists() {
|
if !dest.exists() {
|
||||||
bail!(e)
|
bail!(e)
|
||||||
|
@ -62,13 +62,13 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
|
|||||||
// setup tools
|
// setup tools
|
||||||
if let Some(tools) = &config.tools {
|
if let Some(tools) = &config.tools {
|
||||||
tools.init_pull().await?;
|
tools.init_pull().await?;
|
||||||
set_executable(&tools.path).await?;
|
set_executable(&tools.local_path).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// setup crashes
|
// setup crashes
|
||||||
let crashes = SyncedDir {
|
let crashes = SyncedDir {
|
||||||
path: runtime_dir.path().join("crashes"),
|
local_path: runtime_dir.path().join("crashes"),
|
||||||
url: config.crashes.url.clone(),
|
remote_path: config.crashes.remote_path.clone(),
|
||||||
};
|
};
|
||||||
crashes.init().await?;
|
crashes.init().await?;
|
||||||
let monitor_crashes = crashes.monitor_results(new_result, false);
|
let monitor_crashes = crashes.monitor_results(new_result, false);
|
||||||
@ -92,8 +92,8 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let inputs = SyncedDir {
|
let inputs = SyncedDir {
|
||||||
path: runtime_dir.path().join("inputs"),
|
local_path: runtime_dir.path().join("inputs"),
|
||||||
url: config.inputs.url.clone(),
|
remote_path: config.inputs.remote_path.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
inputs.init().await?;
|
inputs.init().await?;
|
||||||
@ -105,7 +105,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
|
|||||||
let delay = std::time::Duration::from_secs(10);
|
let delay = std::time::Duration::from_secs(10);
|
||||||
loop {
|
loop {
|
||||||
dir.sync_pull().await?;
|
dir.sync_pull().await?;
|
||||||
if has_files(&dir.path).await? {
|
if has_files(&dir.local_path).await? {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
delay_with_jitter(delay).await;
|
delay_with_jitter(delay).await;
|
||||||
@ -177,13 +177,15 @@ async fn start_supervisor(
|
|||||||
.supervisor_exe(&config.supervisor_exe)
|
.supervisor_exe(&config.supervisor_exe)
|
||||||
.supervisor_options(&config.supervisor_options)
|
.supervisor_options(&config.supervisor_options)
|
||||||
.runtime_dir(&runtime_dir)
|
.runtime_dir(&runtime_dir)
|
||||||
.crashes(&crashes.path)
|
.crashes(&crashes.local_path)
|
||||||
.input_corpus(&inputs.path)
|
.input_corpus(&inputs.local_path)
|
||||||
.reports_dir(&reports_dir)
|
.reports_dir(&reports_dir)
|
||||||
.setup_dir(&config.common.setup_dir)
|
.setup_dir(&config.common.setup_dir)
|
||||||
.job_id(&config.common.job_id)
|
.job_id(&config.common.job_id)
|
||||||
.task_id(&config.common.task_id)
|
.task_id(&config.common.task_id)
|
||||||
.set_optional_ref(&config.tools, |expand, tools| expand.tools_dir(&tools.path))
|
.set_optional_ref(&config.tools, |expand, tools| {
|
||||||
|
expand.tools_dir(&tools.local_path)
|
||||||
|
})
|
||||||
.set_optional_ref(&config.target_exe, |expand, target_exe| {
|
.set_optional_ref(&config.target_exe, |expand, target_exe| {
|
||||||
expand.target_exe(target_exe)
|
expand.target_exe(target_exe)
|
||||||
})
|
})
|
||||||
@ -200,11 +202,15 @@ async fn start_supervisor(
|
|||||||
tester.instance_telemetry_key(&key)
|
tester.instance_telemetry_key(&key)
|
||||||
})
|
})
|
||||||
.set_optional_ref(
|
.set_optional_ref(
|
||||||
&config.crashes.url.clone().and_then(|u| u.account()),
|
&config.crashes.remote_path.clone().and_then(|u| u.account()),
|
||||||
|tester, account| tester.crashes_account(account),
|
|tester, account| tester.crashes_account(account),
|
||||||
)
|
)
|
||||||
.set_optional_ref(
|
.set_optional_ref(
|
||||||
&config.crashes.url.clone().and_then(|u| u.container()),
|
&config
|
||||||
|
.crashes
|
||||||
|
.remote_path
|
||||||
|
.clone()
|
||||||
|
.and_then(|u| u.container()),
|
||||||
|tester, container| tester.crashes_container(container),
|
|tester, container| tester.crashes_container(container),
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -286,21 +292,21 @@ mod tests {
|
|||||||
let crashes_local = tempfile::tempdir().unwrap().path().into();
|
let crashes_local = tempfile::tempdir().unwrap().path().into();
|
||||||
let corpus_dir_local = tempfile::tempdir().unwrap().path().into();
|
let corpus_dir_local = tempfile::tempdir().unwrap().path().into();
|
||||||
let crashes = SyncedDir {
|
let crashes = SyncedDir {
|
||||||
path: crashes_local,
|
local_path: crashes_local,
|
||||||
url: Some(
|
remote_path: Some(
|
||||||
BlobContainerUrl::parse(Url::from_directory_path(fault_dir_temp).unwrap()).unwrap(),
|
BlobContainerUrl::parse(Url::from_directory_path(fault_dir_temp).unwrap()).unwrap(),
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
|
|
||||||
let corpus_dir_temp = tempfile::tempdir().unwrap();
|
let corpus_dir_temp = tempfile::tempdir().unwrap();
|
||||||
let corpus_dir = SyncedDir {
|
let corpus_dir = SyncedDir {
|
||||||
path: corpus_dir_local,
|
local_path: corpus_dir_local,
|
||||||
url: Some(
|
remote_path: Some(
|
||||||
BlobContainerUrl::parse(Url::from_directory_path(corpus_dir_temp).unwrap())
|
BlobContainerUrl::parse(Url::from_directory_path(corpus_dir_temp).unwrap())
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
let seed_file_name = corpus_dir.path.join("seed.txt");
|
let seed_file_name = corpus_dir.local_path.join("seed.txt");
|
||||||
tokio::fs::write(seed_file_name, "xyz").await.unwrap();
|
tokio::fs::write(seed_file_name, "xyz").await.unwrap();
|
||||||
|
|
||||||
let target_options = Some(vec!["{input}".to_owned()]);
|
let target_options = Some(vec!["{input}".to_owned()]);
|
||||||
@ -349,7 +355,7 @@ mod tests {
|
|||||||
let notify = Notify::new();
|
let notify = Notify::new();
|
||||||
let _fuzzing_monitor =
|
let _fuzzing_monitor =
|
||||||
monitor_process(process, "supervisor".to_string(), false, Some(¬ify));
|
monitor_process(process, "supervisor".to_string(), false, Some(¬ify));
|
||||||
let stat_output = crashes.path.join("fuzzer_stats");
|
let stat_output = crashes.local_path.join("fuzzer_stats");
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
loop {
|
loop {
|
||||||
if has_stats(&stat_output).await {
|
if has_stats(&stat_output).await {
|
||||||
|
@ -128,10 +128,10 @@ impl<M> InputPoller<M> {
|
|||||||
info!(
|
info!(
|
||||||
"batch processing directory: {} - {}",
|
"batch processing directory: {} - {}",
|
||||||
self.name,
|
self.name,
|
||||||
to_process.path.display()
|
to_process.local_path.display()
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut read_dir = fs::read_dir(&to_process.path).await?;
|
let mut read_dir = fs::read_dir(&to_process.local_path).await?;
|
||||||
while let Some(file) = read_dir.next_entry().await? {
|
while let Some(file) = read_dir.next_entry().await? {
|
||||||
let path = file.path();
|
let path = file.path();
|
||||||
info!(
|
info!(
|
||||||
@ -143,7 +143,7 @@ impl<M> InputPoller<M> {
|
|||||||
// Compute the file name relative to the synced directory, and thus the
|
// Compute the file name relative to the synced directory, and thus the
|
||||||
// container.
|
// container.
|
||||||
let blob_name = {
|
let blob_name = {
|
||||||
let dir_path = to_process.path.canonicalize()?;
|
let dir_path = to_process.local_path.canonicalize()?;
|
||||||
let input_path = path.canonicalize()?;
|
let input_path = path.canonicalize()?;
|
||||||
let dir_relative = input_path.strip_prefix(&dir_path)?;
|
let dir_relative = input_path.strip_prefix(&dir_path)?;
|
||||||
dir_relative.display().to_string()
|
dir_relative.display().to_string()
|
||||||
@ -161,7 +161,7 @@ impl<M> InputPoller<M> {
|
|||||||
if let Ok(blob) = BlobUrl::new(url.clone()) {
|
if let Ok(blob) = BlobUrl::new(url.clone()) {
|
||||||
batch_dir.try_url().and_then(|u| u.account()) == blob.account()
|
batch_dir.try_url().and_then(|u| u.account()) == blob.account()
|
||||||
&& batch_dir.try_url().and_then(|u| u.container()) == blob.container()
|
&& batch_dir.try_url().and_then(|u| u.container()) == blob.container()
|
||||||
&& batch_dir.path.join(blob.name()).exists()
|
&& batch_dir.local_path.join(blob.name()).exists()
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
@ -46,7 +46,7 @@ pub struct Config {
|
|||||||
|
|
||||||
pub async fn spawn(config: Arc<Config>) -> Result<()> {
|
pub async fn spawn(config: Arc<Config>) -> Result<()> {
|
||||||
config.tools.init_pull().await?;
|
config.tools.init_pull().await?;
|
||||||
set_executable(&config.tools.path).await?;
|
set_executable(&config.tools.local_path).await?;
|
||||||
|
|
||||||
config.unique_inputs.init().await?;
|
config.unique_inputs.init().await?;
|
||||||
let hb_client = config.common.init_heartbeat().await?;
|
let hb_client = config.common.init_heartbeat().await?;
|
||||||
@ -94,7 +94,8 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn process_message(config: Arc<Config>, input_url: &Url, tmp_dir: &Path) -> Result<()> {
|
async fn process_message(config: Arc<Config>, input_url: &Url, tmp_dir: &Path) -> Result<()> {
|
||||||
let input_path = utils::download_input(input_url.clone(), &config.unique_inputs.path).await?;
|
let input_path =
|
||||||
|
utils::download_input(input_url.clone(), &config.unique_inputs.local_path).await?;
|
||||||
info!("downloaded input to {}", input_path.display());
|
info!("downloaded input to {}", input_path.display());
|
||||||
|
|
||||||
info!("Merging corpus");
|
info!("Merging corpus");
|
||||||
@ -105,8 +106,8 @@ async fn process_message(config: Arc<Config>, input_url: &Url, tmp_dir: &Path) -
|
|||||||
queue_dir.push("queue");
|
queue_dir.push("queue");
|
||||||
let _delete_output = tokio::fs::remove_dir_all(queue_dir).await;
|
let _delete_output = tokio::fs::remove_dir_all(queue_dir).await;
|
||||||
let synced_dir = SyncedDir {
|
let synced_dir = SyncedDir {
|
||||||
path: tmp_dir.to_path_buf(),
|
local_path: tmp_dir.to_path_buf(),
|
||||||
url: config.unique_inputs.url.clone(),
|
remote_path: config.unique_inputs.remote_path.clone(),
|
||||||
};
|
};
|
||||||
synced_dir.sync_push().await?
|
synced_dir.sync_push().await?
|
||||||
}
|
}
|
||||||
@ -131,14 +132,14 @@ async fn try_delete_blob(input_url: Url) -> Result<()> {
|
|||||||
async fn merge(config: &Config, output_dir: impl AsRef<Path>) -> Result<()> {
|
async fn merge(config: &Config, output_dir: impl AsRef<Path>) -> Result<()> {
|
||||||
let expand = Expand::new()
|
let expand = Expand::new()
|
||||||
.input_marker(&config.supervisor_input_marker)
|
.input_marker(&config.supervisor_input_marker)
|
||||||
.input_corpus(&config.unique_inputs.path)
|
.input_corpus(&config.unique_inputs.local_path)
|
||||||
.target_options(&config.target_options)
|
.target_options(&config.target_options)
|
||||||
.supervisor_exe(&config.supervisor_exe)
|
.supervisor_exe(&config.supervisor_exe)
|
||||||
.supervisor_options(&config.supervisor_options)
|
.supervisor_options(&config.supervisor_options)
|
||||||
.generated_inputs(output_dir)
|
.generated_inputs(output_dir)
|
||||||
.target_exe(&config.target_exe)
|
.target_exe(&config.target_exe)
|
||||||
.setup_dir(&config.common.setup_dir)
|
.setup_dir(&config.common.setup_dir)
|
||||||
.tools_dir(&config.tools.path)
|
.tools_dir(&config.tools.local_path)
|
||||||
.job_id(&config.common.job_id)
|
.job_id(&config.common.job_id)
|
||||||
.task_id(&config.common.task_id)
|
.task_id(&config.common.task_id)
|
||||||
.set_optional_ref(&config.common.microsoft_telemetry_key, |tester, key| {
|
.set_optional_ref(&config.common.microsoft_telemetry_key, |tester, key| {
|
||||||
|
@ -70,7 +70,7 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
|
|||||||
input.init().await?;
|
input.init().await?;
|
||||||
input.sync_pull().await?;
|
input.sync_pull().await?;
|
||||||
}
|
}
|
||||||
let input_paths = config.inputs.iter().map(|i| &i.path).collect();
|
let input_paths = config.inputs.iter().map(|i| &i.local_path).collect();
|
||||||
sync_and_merge(
|
sync_and_merge(
|
||||||
config.clone(),
|
config.clone(),
|
||||||
input_paths,
|
input_paths,
|
||||||
@ -166,7 +166,9 @@ pub async fn merge_inputs(
|
|||||||
&config.target_env,
|
&config.target_env,
|
||||||
&config.common.setup_dir,
|
&config.common.setup_dir,
|
||||||
);
|
);
|
||||||
merger.merge(&config.unique_inputs.path, &candidates).await
|
merger
|
||||||
|
.merge(&config.unique_inputs.local_path, &candidates)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_delete_blob(input_url: Url) -> Result<()> {
|
async fn try_delete_blob(input_url: Url) -> Result<()> {
|
||||||
|
@ -73,7 +73,7 @@ pub async fn handle_inputs(
|
|||||||
heartbeat_client: &Option<TaskHeartbeatClient>,
|
heartbeat_client: &Option<TaskHeartbeatClient>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
readonly_inputs.init_pull().await?;
|
readonly_inputs.init_pull().await?;
|
||||||
let mut input_files = tokio::fs::read_dir(&readonly_inputs.path).await?;
|
let mut input_files = tokio::fs::read_dir(&readonly_inputs.local_path).await?;
|
||||||
while let Some(file) = input_files.next_entry().await? {
|
while let Some(file) = input_files.next_entry().await? {
|
||||||
heartbeat_client.alive();
|
heartbeat_client.alive();
|
||||||
|
|
||||||
@ -88,7 +88,7 @@ pub async fn handle_inputs(
|
|||||||
.to_string_lossy()
|
.to_string_lossy()
|
||||||
.to_string();
|
.to_string();
|
||||||
|
|
||||||
let input_url = readonly_inputs.remote_url()?.url().join(&file_name)?;
|
let input_url = readonly_inputs.remote_url()?.url()?.join(&file_name)?;
|
||||||
|
|
||||||
let crash_test_result = handler.get_crash_result(file_path, input_url).await?;
|
let crash_test_result = handler.get_crash_result(file_path, input_url).await?;
|
||||||
RegressionReport {
|
RegressionReport {
|
||||||
@ -120,7 +120,7 @@ pub async fn handle_crash_reports(
|
|||||||
for possible_dir in report_dirs {
|
for possible_dir in report_dirs {
|
||||||
possible_dir.init_pull().await?;
|
possible_dir.init_pull().await?;
|
||||||
|
|
||||||
let mut report_files = tokio::fs::read_dir(&possible_dir.path).await?;
|
let mut report_files = tokio::fs::read_dir(&possible_dir.local_path).await?;
|
||||||
while let Some(file) = report_files.next_entry().await? {
|
while let Some(file) = report_files.next_entry().await? {
|
||||||
heartbeat_client.alive();
|
heartbeat_client.alive();
|
||||||
let file_path = file.path();
|
let file_path = file.path();
|
||||||
@ -150,8 +150,8 @@ pub async fn handle_crash_reports(
|
|||||||
}
|
}
|
||||||
.ok_or_else(|| format_err!("crash report is missing input blob: {}", file_name))?;
|
.ok_or_else(|| format_err!("crash report is missing input blob: {}", file_name))?;
|
||||||
|
|
||||||
let input_url = crashes.remote_url()?.url().clone();
|
let input_url = crashes.remote_url()?.url()?;
|
||||||
let input = crashes.path.join(&input_blob.name);
|
let input = crashes.local_path.join(&input_blob.name);
|
||||||
let crash_test_result = handler.get_crash_result(input, input_url).await?;
|
let crash_test_result = handler.get_crash_result(input, input_url).await?;
|
||||||
|
|
||||||
RegressionReport {
|
RegressionReport {
|
||||||
|
@ -41,7 +41,7 @@ impl SetupRunner {
|
|||||||
work_set.save_context().await?;
|
work_set.save_context().await?;
|
||||||
|
|
||||||
// Download the setup container.
|
// Download the setup container.
|
||||||
let setup_url = work_set.setup_url.url();
|
let setup_url = work_set.setup_url.url()?;
|
||||||
let setup_dir = work_set.setup_dir()?;
|
let setup_dir = work_set.setup_dir()?;
|
||||||
|
|
||||||
// `azcopy sync` requires the local dir to exist.
|
// `azcopy sync` requires the local dir to exist.
|
||||||
|
@ -116,8 +116,9 @@ impl fmt::Display for BlobUrl {
|
|||||||
///
|
///
|
||||||
/// Use to validate a URL and address contained blobs.
|
/// Use to validate a URL and address contained blobs.
|
||||||
#[derive(Clone, Eq, PartialEq)]
|
#[derive(Clone, Eq, PartialEq)]
|
||||||
pub struct BlobContainerUrl {
|
pub enum BlobContainerUrl {
|
||||||
url: Url,
|
BlobContainer(Url),
|
||||||
|
Path(PathBuf),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlobContainerUrl {
|
impl BlobContainerUrl {
|
||||||
@ -126,11 +127,19 @@ impl BlobContainerUrl {
|
|||||||
bail!("Invalid container URL: {}", url);
|
bail!("Invalid container URL: {}", url);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Self { url })
|
if let Ok(path) = url.to_file_path() {
|
||||||
|
Ok(Self::Path(path))
|
||||||
|
} else {
|
||||||
|
Ok(Self::BlobContainer(url))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn as_file_path(&self) -> Option<PathBuf> {
|
pub fn as_file_path(&self) -> Option<PathBuf> {
|
||||||
self.url.to_file_path().ok()
|
if let Self::Path(p) = self {
|
||||||
|
Some(p.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn parse(url: impl AsRef<str>) -> Result<Self> {
|
pub fn parse(url: impl AsRef<str>) -> Result<Self> {
|
||||||
@ -139,55 +148,53 @@ impl BlobContainerUrl {
|
|||||||
Self::new(url)
|
Self::new(url)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn url(&self) -> &Url {
|
pub fn url(&self) -> Result<Url> {
|
||||||
&self.url
|
match self {
|
||||||
|
Self::BlobContainer(url) => Ok(url.clone()),
|
||||||
|
Self::Path(p) => Ok(Url::from_file_path(p).map_err(|_| anyhow!("invalid path"))?),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn account(&self) -> Option<String> {
|
pub fn account(&self) -> Option<String> {
|
||||||
if self.as_file_path().is_some() {
|
match self {
|
||||||
None
|
Self::BlobContainer(url) => {
|
||||||
} else {
|
// Ctor checks that domain has at least one subdomain.
|
||||||
// Ctor checks that domain has at least one subdomain.
|
Some(url.domain().unwrap().split('.').next().unwrap().to_owned())
|
||||||
Some(
|
}
|
||||||
self.url
|
Self::Path(_p) => None,
|
||||||
.domain()
|
|
||||||
.unwrap()
|
|
||||||
.split('.')
|
|
||||||
.next()
|
|
||||||
.unwrap()
|
|
||||||
.to_owned(),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn container(&self) -> Option<String> {
|
pub fn container(&self) -> Option<String> {
|
||||||
if self.as_file_path().is_some() {
|
match self {
|
||||||
None
|
Self::BlobContainer(url) => {
|
||||||
} else {
|
Some(url.path_segments().unwrap().next().unwrap().to_owned())
|
||||||
Some(self.url.path_segments().unwrap().next().unwrap().to_owned())
|
}
|
||||||
|
Self::Path(_p) => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn blob(&self, name: impl AsRef<str>) -> BlobUrl {
|
pub fn blob(&self, name: impl AsRef<str>) -> BlobUrl {
|
||||||
let mut url = self.url.clone();
|
match self {
|
||||||
name.as_ref().split('/').fold(
|
Self::BlobContainer(url) => {
|
||||||
&mut url.path_segments_mut().unwrap(), // Checked in ctor
|
let mut url = url.clone();
|
||||||
|segments, current| segments.push(current),
|
name.as_ref().split('/').fold(
|
||||||
);
|
&mut url.path_segments_mut().unwrap(), // Checked in ctor
|
||||||
|
|segments, current| segments.push(current),
|
||||||
BlobUrl::new(url).expect("invalid blob URL from valid container")
|
);
|
||||||
}
|
BlobUrl::AzureBlob(url)
|
||||||
}
|
}
|
||||||
|
Self::Path(p) => BlobUrl::LocalFile(p.join(name.as_ref())),
|
||||||
impl AsRef<Url> for BlobContainerUrl {
|
}
|
||||||
fn as_ref(&self) -> &Url {
|
|
||||||
self.url()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for BlobContainerUrl {
|
impl fmt::Debug for BlobContainerUrl {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
write!(f, "{}", redact_query_sas_sig(self.url()))
|
match self {
|
||||||
|
Self::BlobContainer(url) => write!(f, "{}", redact_query_sas_sig(url)),
|
||||||
|
Self::Path(p) => write!(f, "{}", p.display()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -203,12 +210,6 @@ impl fmt::Display for BlobContainerUrl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<BlobContainerUrl> for Url {
|
|
||||||
fn from(container: BlobContainerUrl) -> Self {
|
|
||||||
container.url
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn redact_query_sas_sig(url: &Url) -> Url {
|
fn redact_query_sas_sig(url: &Url) -> Url {
|
||||||
let mut redacted = url.clone();
|
let mut redacted = url.clone();
|
||||||
redacted.set_query(None);
|
redacted.set_query(None);
|
||||||
@ -292,8 +293,13 @@ impl Serialize for BlobContainerUrl {
|
|||||||
where
|
where
|
||||||
S: Serializer,
|
S: Serializer,
|
||||||
{
|
{
|
||||||
let url = self.url.to_string();
|
match self {
|
||||||
serializer.serialize_str(&url)
|
Self::Path(p) => serializer.serialize_str(p.to_str().unwrap_or_default()),
|
||||||
|
Self::BlobContainer(url) => {
|
||||||
|
let url = url.to_string();
|
||||||
|
serializer.serialize_str(&url)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,22 +29,24 @@ const DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS: u64 = 60;
|
|||||||
|
|
||||||
#[derive(Debug, Deserialize, Clone, PartialEq)]
|
#[derive(Debug, Deserialize, Clone, PartialEq)]
|
||||||
pub struct SyncedDir {
|
pub struct SyncedDir {
|
||||||
pub path: PathBuf,
|
#[serde(alias = "local_path", alias = "path")]
|
||||||
pub url: Option<BlobContainerUrl>,
|
pub local_path: PathBuf,
|
||||||
|
#[serde(alias = "remote_path", alias = "url")]
|
||||||
|
pub remote_path: Option<BlobContainerUrl>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SyncedDir {
|
impl SyncedDir {
|
||||||
pub fn remote_url(&self) -> Result<BlobContainerUrl> {
|
pub fn remote_url(&self) -> Result<BlobContainerUrl> {
|
||||||
let url = self.url.clone().unwrap_or(BlobContainerUrl::new(
|
let url = self.remote_path.clone().unwrap_or(BlobContainerUrl::new(
|
||||||
Url::from_file_path(self.path.clone()).map_err(|_| anyhow!("invalid path"))?,
|
Url::from_file_path(self.local_path.clone()).map_err(|_| anyhow!("invalid path"))?,
|
||||||
)?);
|
)?);
|
||||||
Ok(url)
|
Ok(url)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn sync(&self, operation: SyncOperation, delete_dst: bool) -> Result<()> {
|
pub async fn sync(&self, operation: SyncOperation, delete_dst: bool) -> Result<()> {
|
||||||
let dir = &self.path.join("");
|
let dir = &self.local_path.join("");
|
||||||
|
|
||||||
if let Some(dest) = self.url.clone().and_then(|u| u.as_file_path()) {
|
if let Some(dest) = self.remote_path.clone().and_then(|u| u.as_file_path()) {
|
||||||
debug!("syncing {:?} {}", operation, dest.display());
|
debug!("syncing {:?} {}", operation, dest.display());
|
||||||
match operation {
|
match operation {
|
||||||
SyncOperation::Push => {
|
SyncOperation::Push => {
|
||||||
@ -64,7 +66,7 @@ impl SyncedDir {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if let Some(url) = self.url.clone().map(|u| u.url().clone()) {
|
} else if let Some(url) = self.remote_path.clone().and_then(|u| u.url().ok()) {
|
||||||
let url = url.as_ref();
|
let url = url.as_ref();
|
||||||
debug!("syncing {:?} {}", operation, dir.display());
|
debug!("syncing {:?} {}", operation, dir.display());
|
||||||
match operation {
|
match operation {
|
||||||
@ -77,7 +79,7 @@ impl SyncedDir {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn try_url(&self) -> Option<BlobContainerUrl> {
|
pub fn try_url(&self) -> Option<BlobContainerUrl> {
|
||||||
self.url.clone()
|
self.remote_path.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn init_pull(&self) -> Result<()> {
|
pub async fn init_pull(&self) -> Result<()> {
|
||||||
@ -86,20 +88,26 @@ impl SyncedDir {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn init(&self) -> Result<()> {
|
pub async fn init(&self) -> Result<()> {
|
||||||
if let Some(remote_path) = self.url.clone().and_then(|u| u.as_file_path()) {
|
if let Some(remote_path) = self.remote_path.clone().and_then(|u| u.as_file_path()) {
|
||||||
fs::create_dir_all(remote_path).await?;
|
fs::create_dir_all(remote_path).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
match fs::metadata(&self.path).await {
|
match fs::metadata(&self.local_path).await {
|
||||||
Ok(m) => {
|
Ok(m) => {
|
||||||
if m.is_dir() {
|
if m.is_dir() {
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
anyhow::bail!("File with name '{}' already exists", self.path.display());
|
anyhow::bail!(
|
||||||
|
"File with name '{}' already exists",
|
||||||
|
self.local_path.display()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => fs::create_dir_all(&self.path).await.with_context(|| {
|
Err(_) => fs::create_dir_all(&self.local_path).await.with_context(|| {
|
||||||
format!("unable to create local SyncedDir: {}", self.path.display())
|
format!(
|
||||||
|
"unable to create local SyncedDir: {}",
|
||||||
|
self.local_path.display()
|
||||||
|
)
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -131,7 +139,7 @@ impl SyncedDir {
|
|||||||
|
|
||||||
// Conditionally upload a report, if it would not be a duplicate.
|
// Conditionally upload a report, if it would not be a duplicate.
|
||||||
pub async fn upload<T: Serialize>(&self, name: &str, data: &T) -> Result<bool> {
|
pub async fn upload<T: Serialize>(&self, name: &str, data: &T) -> Result<bool> {
|
||||||
if let Some(url) = self.url.clone() {
|
if let Some(url) = self.remote_path.clone() {
|
||||||
match url.as_file_path() {
|
match url.as_file_path() {
|
||||||
Some(path) => {
|
Some(path) => {
|
||||||
let path = path.join(name);
|
let path = path.join(name);
|
||||||
@ -167,7 +175,7 @@ impl SyncedDir {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let path = self.path.join(name);
|
let path = self.local_path.join(name);
|
||||||
if !exists(&path).await? {
|
if !exists(&path).await? {
|
||||||
let data = serde_json::to_vec(&data)?;
|
let data = serde_json::to_vec(&data)?;
|
||||||
fs::write(path, data).await?;
|
fs::write(path, data).await?;
|
||||||
@ -217,7 +225,7 @@ impl SyncedDir {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let mut uploader = BlobUploader::new(url.url().clone());
|
let mut uploader = BlobUploader::new(url.url()?);
|
||||||
|
|
||||||
while let Some(item) = monitor.next().await {
|
while let Some(item) = monitor.next().await {
|
||||||
let file_name = item
|
let file_name = item
|
||||||
@ -260,18 +268,21 @@ impl SyncedDir {
|
|||||||
/// to be initialized, but a user-supplied binary, (such as AFL) logically owns
|
/// to be initialized, but a user-supplied binary, (such as AFL) logically owns
|
||||||
/// a directory, and may reset it.
|
/// a directory, and may reset it.
|
||||||
pub async fn monitor_results(&self, event: Event, ignore_dotfiles: bool) -> Result<()> {
|
pub async fn monitor_results(&self, event: Event, ignore_dotfiles: bool) -> Result<()> {
|
||||||
if let Some(url) = self.url.clone() {
|
if let Some(url) = self.remote_path.clone() {
|
||||||
loop {
|
loop {
|
||||||
debug!("waiting to monitor {}", self.path.display());
|
debug!("waiting to monitor {}", self.local_path.display());
|
||||||
|
|
||||||
while fs::metadata(&self.path).await.is_err() {
|
while fs::metadata(&self.local_path).await.is_err() {
|
||||||
debug!("dir {} not ready to monitor, delaying", self.path.display());
|
debug!(
|
||||||
|
"dir {} not ready to monitor, delaying",
|
||||||
|
self.local_path.display()
|
||||||
|
);
|
||||||
delay_with_jitter(DELAY).await;
|
delay_with_jitter(DELAY).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("starting monitor for {}", self.path.display());
|
debug!("starting monitor for {}", self.local_path.display());
|
||||||
Self::file_monitor_event(
|
Self::file_monitor_event(
|
||||||
self.path.clone(),
|
self.local_path.clone(),
|
||||||
url.clone(),
|
url.clone(),
|
||||||
event.clone(),
|
event.clone(),
|
||||||
ignore_dotfiles,
|
ignore_dotfiles,
|
||||||
|
Reference in New Issue
Block a user