Verify a workset only exists along with a reboot context (#378)

Adds the following:

1. Serializes a workset to disk during setup.
2. Upon deserializing a RebootContext, delete the file from disk (We support rebooting once and only once)
3. Check if a workset exists with a RebootContext
    1. If True, continue processing
    2. If False, mark the tasks & node as "Done" with appropriate errors via:
        1. send WorkerEvent::Done events for each of the tasks in the work set
        2. send StateUpdateEvent::Done for the node
This commit is contained in:
bmc-msft 2021-01-04 12:51:20 -05:00 committed by GitHub
parent 36b3e2a5aa
commit d038cca1e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 97 additions and 11 deletions

View File

@ -12,12 +12,16 @@ extern crate serde;
#[macro_use]
extern crate clap;
use crate::heartbeat::*;
use crate::{
config::StaticConfig, coordinator::StateUpdateEvent, heartbeat::*, work::WorkSet,
worker::WorkerEvent,
};
use std::path::PathBuf;
use anyhow::Result;
use onefuzz::{
machine_id::{get_machine_id, get_scaleset_name},
process::ExitStatus,
telemetry::{self, EventData},
};
use structopt::StructOpt;
@ -35,8 +39,6 @@ pub mod setup;
pub mod work;
pub mod worker;
use config::StaticConfig;
#[derive(StructOpt, Debug)]
enum Opt {
Run(RunOpt),
@ -125,6 +127,43 @@ fn load_config(opt: RunOpt) -> Result<StaticConfig> {
Ok(config)
}
/// Fail out any workset left over from a previous supervisor run.
///
/// Called only when no reboot context was found on disk. A saved workset
/// with no reboot context means the prior supervisor crashed mid-setup:
/// report every work unit in the stale workset as failed, mark the node
/// itself "Done" with the same error, set the done lock, and exit with an
/// error.
async fn check_existing_worksets(coordinator: &mut coordinator::Coordinator) -> Result<()> {
    // Having existing worksets at this point means the supervisor crashed. If
    // that is the case, mark each of the work units within the workset as
    // failed, then exit as a failure.
    if let Some(work) = WorkSet::load_from_fs_context().await? {
        // NOTE(review): this message says "pre-existing config" while the
        // bail! below says "pre-existing workset" — confirm which wording is
        // intended; they look like they should match.
        let failure = "onefuzz-supervisor failed to launch task due to pre-existing config";

        // Report each task in the stale workset as done-with-error.
        for unit in &work.work_units {
            let event = WorkerEvent::Done {
                task_id: unit.task_id,
                stdout: "".to_string(),
                stderr: failure.to_string(),
                // Synthesized exit status: no worker process actually ran, so
                // report a generic nonzero failure.
                exit_status: ExitStatus {
                    code: Some(1),
                    signal: None,
                    success: false,
                },
            };
            coordinator.emit_event(event.into()).await?;
        }

        // After all per-task events, mark the node itself as done, carrying
        // the same error text.
        let event = StateUpdateEvent::Done {
            error: Some(failure.to_string()),
            script_output: None,
        };
        coordinator.emit_event(event.into()).await?;

        // Force-set the done semaphore, so nothing prevents the supervisor
        // from continuing to report the workset as failed.
        done::set_done_lock().await?;

        anyhow::bail!("error starting due to pre-existing workset");
    }
    Ok(())
}
async fn run_agent(config: StaticConfig) -> Result<()> {
telemetry::set_property(EventData::InstanceId(config.instance_id));
telemetry::set_property(EventData::MachineId(get_machine_id().await?));
@ -146,11 +185,15 @@ async fn run_agent(config: StaticConfig) -> Result<()> {
};
verbose!("current registration: {:?}", registration);
let coordinator = coordinator::Coordinator::new(registration.clone()).await?;
let mut coordinator = coordinator::Coordinator::new(registration.clone()).await?;
verbose!("initialized coordinator");
let mut reboot = reboot::Reboot;
let scheduler = reboot.load_context().await?.into();
let reboot_context = reboot.load_context().await?;
if reboot_context.is_none() {
check_existing_worksets(&mut coordinator).await?;
}
let scheduler = reboot_context.into();
verbose!("loaded scheduler: {}", scheduler);
let work_queue = work::WorkQueue::new(registration.clone());

View File

@ -55,10 +55,11 @@ impl Reboot {
pub async fn load_context(&mut self) -> Result<Option<RebootContext>> {
use std::io::ErrorKind;
let path = reboot_context_path()?;
info!("checking for saved reboot context");
info!("checking for saved reboot context: {}", path.display());
let data = fs::read(reboot_context_path()?).await;
let data = fs::read(&path).await;
if let Err(err) = &data {
if let ErrorKind::NotFound = err.kind() {
@ -71,11 +72,9 @@ impl Reboot {
let data = data?;
let ctx = serde_json::from_slice(&data)?;
info!(
"loaded reboot context from {}",
reboot_context_path()?.display()
);
fs::remove_file(&path).await?;
info!("loaded reboot context");
Ok(Some(ctx))
}

View File

@ -38,6 +38,8 @@ impl SetupRunner {
pub async fn run(&mut self, work_set: &WorkSet) -> Result<SetupOutput> {
info!("running setup for work set");
work_set.save_context().await?;
// Download the setup container.
let setup_url = work_set.setup_url.url();
let setup_dir = work_set.setup_url.container();

View File

@ -1,12 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use std::io::ErrorKind;
use std::path::PathBuf;
use anyhow::Result;
use downcast_rs::Downcast;
use onefuzz::blob::BlobContainerUrl;
use storage_queue::QueueClient;
use tokio::fs;
use uuid::Uuid;
use crate::auth::Secret;
@ -28,6 +30,43 @@ impl WorkSet {
/// Collect the task id of every work unit in this workset.
pub fn task_ids(&self) -> Vec<TaskId> {
    let mut ids = Vec::with_capacity(self.work_units.len());
    for unit in &self.work_units {
        ids.push(unit.task_id);
    }
    ids
}
/// Location of the on-disk workset context file, under the onefuzz root.
pub fn context_path() -> Result<PathBuf> {
    let root = onefuzz::fs::onefuzz_root()?;
    Ok(root.join("workset_context.json"))
}
/// Load a previously-saved workset context from disk, if one exists.
///
/// Returns `Ok(None)` when the context file is absent (first launch on a
/// fresh image). Any other read error, or a deserialization failure, is
/// propagated to the caller.
pub async fn load_from_fs_context() -> Result<Option<Self>> {
    let path = Self::context_path()?;
    info!("checking for workset context: {}", path.display());

    let data = fs::read(path).await;

    if let Err(err) = &data {
        if let ErrorKind::NotFound = err.kind() {
            // If new image, there won't be any workset context.
            info!("no workset context found, assuming first launch");
            return Ok(None);
        }
    }

    // Any error other than NotFound is a real failure: propagate it here.
    let data = data?;
    let ctx = serde_json::from_slice(&data)?;
    info!("loaded workset context");

    Ok(Some(ctx))
}
/// Persist this workset to its on-disk context file as JSON.
///
/// A later supervisor launch uses the presence of this file to detect a
/// previously-started workset.
pub async fn save_context(&self) -> Result<()> {
    let path = Self::context_path()?;
    info!("saving workset context: {}", path.display());
    let serialized = serde_json::to_vec(self)?;
    fs::write(&path, &serialized).await?;
    Ok(())
}
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]

View File

@ -54,6 +54,7 @@ class Integration(BaseModel):
wait_for_files: List[ContainerType]
check_asan_log: Optional[bool] = Field(default=False)
disable_check_debugger: Optional[bool] = Field(default=False)
reboot_after_setup: Optional[bool] = Field(default=False)
TARGETS: Dict[str, Integration] = {
@ -70,6 +71,7 @@ TARGETS: Dict[str, Integration] = {
target_exe="fuzz.exe",
inputs="seeds",
wait_for_files=[ContainerType.unique_reports, ContainerType.coverage],
reboot_after_setup=True,
),
"linux-libfuzzer-rust": Integration(
template=TemplateType.libfuzzer,
@ -203,6 +205,7 @@ class TestOnefuzz:
setup_dir=setup,
duration=1,
vm_count=1,
reboot_after_setup=config.reboot_after_setup,
)
elif config.template == TemplateType.radamsa:
job = self.of.template.radamsa.basic(