enable setting ensemble sync duration timer (#229)

This commit is contained in:
bmc-msft
2020-10-29 14:48:12 -04:00
committed by GitHub
parent 154be220ae
commit ced8200d74
14 changed files with 86 additions and 12 deletions

View File

@ -45,6 +45,8 @@ pub fn run(args: &clap::ArgMatches) -> Result<()> {
url: BlobContainerUrl::new(Url::parse("https://contoso.com/crashes")?)?, url: BlobContainerUrl::new(Url::parse("https://contoso.com/crashes")?)?,
}; };
let ensemble_sync_delay = None;
let config = Config { let config = Config {
inputs, inputs,
readonly_inputs, readonly_inputs,
@ -53,6 +55,7 @@ pub fn run(args: &clap::ArgMatches) -> Result<()> {
target_env, target_env,
target_options, target_options,
target_workers, target_workers,
ensemble_sync_delay,
common: CommonConfig { common: CommonConfig {
heartbeat_queue: None, heartbeat_queue: None,
instrumentation_key: None, instrumentation_key: None,

View File

@ -46,6 +46,7 @@ pub struct GeneratorConfig {
#[serde(default)] #[serde(default)]
pub check_retry_count: u64, pub check_retry_count: u64,
pub rename_output: bool, pub rename_output: bool,
pub ensemble_sync_delay: Option<u64>,
#[serde(flatten)] #[serde(flatten)]
pub common: CommonConfig, pub common: CommonConfig,
} }
@ -61,7 +62,7 @@ pub async fn spawn(config: Arc<GeneratorConfig>) -> Result<(), Error> {
dir.init_pull().await?; dir.init_pull().await?;
} }
let sync_task = continuous_sync(&config.readonly_inputs, Pull, None); let sync_task = continuous_sync(&config.readonly_inputs, Pull, config.ensemble_sync_delay);
let crash_dir_monitor = config.crashes.monitor_results(new_result); let crash_dir_monitor = config.crashes.monitor_results(new_result);
let tester = Tester::new( let tester = Tester::new(
&config.target_exe, &config.target_exe,

View File

@ -45,6 +45,7 @@ pub struct Config {
pub target_env: HashMap<String, String>, pub target_env: HashMap<String, String>,
pub target_options: Vec<String>, pub target_options: Vec<String>,
pub target_workers: Option<u64>, pub target_workers: Option<u64>,
pub ensemble_sync_delay: Option<u64>,
#[serde(flatten)] #[serde(flatten)]
pub common: CommonConfig, pub common: CommonConfig,
@ -197,7 +198,7 @@ impl LibFuzzerFuzzTask {
let inputs = inputs.clone(); let inputs = inputs.clone();
dirs.extend(inputs); dirs.extend(inputs);
} }
continuous_sync(&dirs, Pull, None).await continuous_sync(&dirs, Pull, self.config.ensemble_sync_delay).await
} }
} }

View File

@ -42,6 +42,7 @@ pub struct SupervisorConfig {
pub wait_for_files: Option<ContainerType>, pub wait_for_files: Option<ContainerType>,
pub stats_file: Option<String>, pub stats_file: Option<String>,
pub stats_format: Option<StatsFormat>, pub stats_format: Option<StatsFormat>,
pub ensemble_sync_delay: Option<u64>,
#[serde(flatten)] #[serde(flatten)]
pub common: CommonConfig, pub common: CommonConfig,
} }
@ -88,7 +89,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
} }
} }
let continuous_sync_task = inputs.continuous_sync(Pull, None); let continuous_sync_task = inputs.continuous_sync(Pull, config.ensemble_sync_delay);
let process = start_supervisor( let process = start_supervisor(
&runtime_dir.path(), &runtime_dir.path(),

View File

@ -21,7 +21,7 @@ pub enum SyncOperation {
} }
const DELAY: Duration = Duration::from_secs(10); const DELAY: Duration = Duration::from_secs(10);
const DEFAULT_CONTINUOUS_SYNC_DELAY: Duration = Duration::from_secs(60); const DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS: u64 = 60;
#[derive(Debug, Deserialize, Clone, PartialEq)] #[derive(Debug, Deserialize, Clone, PartialEq)]
pub struct SyncedDir { pub struct SyncedDir {
@ -70,9 +70,14 @@ impl SyncedDir {
pub async fn continuous_sync( pub async fn continuous_sync(
&self, &self,
operation: SyncOperation, operation: SyncOperation,
delay: Option<Duration>, delay_seconds: Option<u64>,
) -> Result<()> { ) -> Result<()> {
let delay = delay.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY); let delay_seconds = delay_seconds.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS);
if delay_seconds == 0 {
return Ok(());
}
let delay = Duration::from_secs(delay_seconds);
loop { loop {
self.sync(operation).await?; self.sync(operation).await?;
delay_with_jitter(delay).await; delay_with_jitter(delay).await;
@ -130,9 +135,15 @@ impl SyncedDir {
pub async fn continuous_sync( pub async fn continuous_sync(
dirs: &[SyncedDir], dirs: &[SyncedDir],
operation: SyncOperation, operation: SyncOperation,
delay: Option<Duration>, delay_seconds: Option<u64>,
) -> Result<()> { ) -> Result<()> {
let delay = delay.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY); let delay_seconds = delay_seconds.unwrap_or(DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS);
if delay_seconds == 0 {
return Ok(());
}
let delay = Duration::from_secs(delay_seconds);
loop { loop {
for dir in dirs { for dir in dirs {
dir.sync(operation).await?; dir.sync(operation).await?;

View File

@ -304,6 +304,9 @@ def build_task_config(
if TaskFeature.check_retry_count in definition.features: if TaskFeature.check_retry_count in definition.features:
config.check_retry_count = task_config.task.check_retry_count or 0 config.check_retry_count = task_config.task.check_retry_count or 0
if TaskFeature.ensemble_sync_delay in definition.features:
config.ensemble_sync_delay = task_config.task.ensemble_sync_delay
return config return config

View File

@ -62,6 +62,7 @@ TASK_DEFINITIONS = {
TaskFeature.target_env, TaskFeature.target_env,
TaskFeature.target_options, TaskFeature.target_options,
TaskFeature.target_workers, TaskFeature.target_workers,
TaskFeature.ensemble_sync_delay,
], ],
vm=VmDefinition(compare=Compare.AtLeast, value=1), vm=VmDefinition(compare=Compare.AtLeast, value=1),
containers=[ containers=[
@ -213,6 +214,7 @@ TASK_DEFINITIONS = {
TaskFeature.supervisor_input_marker, TaskFeature.supervisor_input_marker,
TaskFeature.wait_for_files, TaskFeature.wait_for_files,
TaskFeature.stats_file, TaskFeature.stats_file,
TaskFeature.ensemble_sync_delay,
], ],
vm=VmDefinition(compare=Compare.AtLeast, value=1), vm=VmDefinition(compare=Compare.AtLeast, value=1),
containers=[ containers=[
@ -299,6 +301,7 @@ TASK_DEFINITIONS = {
TaskFeature.check_asan_log, TaskFeature.check_asan_log,
TaskFeature.check_debugger, TaskFeature.check_debugger,
TaskFeature.check_retry_count, TaskFeature.check_retry_count,
TaskFeature.ensemble_sync_delay,
], ],
vm=VmDefinition(compare=Compare.AtLeast, value=1), vm=VmDefinition(compare=Compare.AtLeast, value=1),
containers=[ containers=[

View File

@ -672,8 +672,15 @@ class Tasks(Endpoint):
tags: Optional[Dict[str, str]] = None, tags: Optional[Dict[str, str]] = None,
prereq_tasks: Optional[List[UUID]] = None, prereq_tasks: Optional[List[UUID]] = None,
debug: Optional[List[enums.TaskDebugFlag]] = None, debug: Optional[List[enums.TaskDebugFlag]] = None,
ensemble_sync_delay: Optional[int] = None,
) -> models.Task: ) -> models.Task:
""" Create a task """ """
Create a task
:param bool ensemble_sync_delay: Specify duration between
syncing inputs during ensemble fuzzing (0 to disable).
"""
self.logger.debug("creating task: %s", task_type) self.logger.debug("creating task: %s", task_type)
job_id_expanded = self._disambiguate_uuid( job_id_expanded = self._disambiguate_uuid(
@ -731,6 +738,7 @@ class Tasks(Endpoint):
check_asan_log=check_asan_log, check_asan_log=check_asan_log,
check_debugger=check_debugger, check_debugger=check_debugger,
check_retry_count=check_retry_count, check_retry_count=check_retry_count,
ensemble_sync_delay=ensemble_sync_delay,
), ),
pool=models.TaskPool(count=vm_count, pool_name=pool_name), pool=models.TaskPool(count=vm_count, pool_name=pool_name),
containers=containers_submit, containers=containers_submit,

View File

@ -52,11 +52,14 @@ class AFL(Command):
dryrun: bool = False, dryrun: bool = False,
notification_config: Optional[NotificationConfig] = None, notification_config: Optional[NotificationConfig] = None,
debug: Optional[List[TaskDebugFlag]] = None, debug: Optional[List[TaskDebugFlag]] = None,
ensemble_sync_delay: Optional[int] = None,
) -> Optional[Job]: ) -> Optional[Job]:
""" """
Basic AFL job Basic AFL job
:param Container afl_container: Specify the AFL container to use in the job :param Container afl_container: Specify the AFL container to use in the job
:param bool ensemble_sync_delay: Specify duration between
syncing inputs during ensemble fuzzing (0 to disable).
""" """
if existing_inputs: if existing_inputs:
@ -65,6 +68,10 @@ class AFL(Command):
if dryrun: if dryrun:
return None return None
# disable ensemble sync if only one VM is used
if ensemble_sync_delay is None and vm_count == 1:
ensemble_sync_delay = 0
self.logger.info("creating afl from template") self.logger.info("creating afl from template")
target_options = target_options or ["{input}"] target_options = target_options or ["{input}"]
@ -146,6 +153,7 @@ class AFL(Command):
task_wait_for_files=ContainerType.inputs, task_wait_for_files=ContainerType.inputs,
tags=helper.tags, tags=helper.tags,
debug=debug, debug=debug,
ensemble_sync_delay=ensemble_sync_delay,
) )
report_containers = [ report_containers = [

View File

@ -47,6 +47,7 @@ class Libfuzzer(Command):
check_retry_count: Optional[int] = None, check_retry_count: Optional[int] = None,
crash_report_timeout: Optional[int] = None, crash_report_timeout: Optional[int] = None,
debug: Optional[List[TaskDebugFlag]] = None, debug: Optional[List[TaskDebugFlag]] = None,
ensemble_sync_delay: Optional[int] = None,
) -> None: ) -> None:
fuzzer_containers = [ fuzzer_containers = [
@ -55,6 +56,11 @@ class Libfuzzer(Command):
(ContainerType.inputs, containers[ContainerType.inputs]), (ContainerType.inputs, containers[ContainerType.inputs]),
] ]
self.logger.info("creating libfuzzer task") self.logger.info("creating libfuzzer task")
# disable ensemble sync if only one VM is used
if ensemble_sync_delay is None and vm_count == 1:
ensemble_sync_delay = 0
fuzzer_task = self.onefuzz.tasks.create( fuzzer_task = self.onefuzz.tasks.create(
job.job_id, job.job_id,
TaskType.libfuzzer_fuzz, TaskType.libfuzzer_fuzz,
@ -69,6 +75,7 @@ class Libfuzzer(Command):
target_workers=target_workers, target_workers=target_workers,
tags=tags, tags=tags,
debug=debug, debug=debug,
ensemble_sync_delay=ensemble_sync_delay,
) )
coverage_containers = [ coverage_containers = [
@ -146,8 +153,14 @@ class Libfuzzer(Command):
dryrun: bool = False, dryrun: bool = False,
notification_config: Optional[NotificationConfig] = None, notification_config: Optional[NotificationConfig] = None,
debug: Optional[List[TaskDebugFlag]] = None, debug: Optional[List[TaskDebugFlag]] = None,
ensemble_sync_delay: Optional[int] = None,
) -> Optional[Job]: ) -> Optional[Job]:
""" Basic libfuzzer job """ """
Basic libfuzzer job
:param bool ensemble_sync_delay: Specify duration between
syncing inputs during ensemble fuzzing (0 to disable).
"""
# verify containers exist # verify containers exist
if existing_inputs: if existing_inputs:
@ -213,6 +226,7 @@ class Libfuzzer(Command):
crash_report_timeout=crash_report_timeout, crash_report_timeout=crash_report_timeout,
check_retry_count=check_retry_count, check_retry_count=check_retry_count,
debug=debug, debug=debug,
ensemble_sync_delay=ensemble_sync_delay,
) )
self.logger.info("done creating tasks") self.logger.info("done creating tasks")

View File

@ -114,8 +114,14 @@ class OssFuzz(Command):
sync_inputs: bool = False, sync_inputs: bool = False,
notification_config: Optional[NotificationConfig] = None, notification_config: Optional[NotificationConfig] = None,
debug: Optional[List[TaskDebugFlag]] = None, debug: Optional[List[TaskDebugFlag]] = None,
ensemble_sync_delay: Optional[int] = None,
) -> None: ) -> None:
""" OssFuzz style libfuzzer jobs """ """
OssFuzz style libfuzzer jobs
:param bool ensemble_sync_delay: Specify duration between
syncing inputs during ensemble fuzzing (0 to disable).
"""
fuzzers = sorted(glob.glob("*fuzzer")) fuzzers = sorted(glob.glob("*fuzzer"))
if fuzzers: if fuzzers:
@ -236,6 +242,7 @@ class OssFuzz(Command):
target_env=target_env, target_env=target_env,
tags=helper.tags, tags=helper.tags,
debug=debug, debug=debug,
ensemble_sync_delay=ensemble_sync_delay,
) )
helpers.append(helper) helpers.append(helper)
base_helper.wait() base_helper.wait()

View File

@ -48,8 +48,14 @@ class Radamsa(Command):
dryrun: bool = False, dryrun: bool = False,
notification_config: Optional[NotificationConfig] = None, notification_config: Optional[NotificationConfig] = None,
debug: Optional[List[TaskDebugFlag]] = None, debug: Optional[List[TaskDebugFlag]] = None,
ensemble_sync_delay: Optional[int] = None,
) -> Optional[Job]: ) -> Optional[Job]:
""" Basic radamsa job """ """
Basic radamsa job
:param bool ensemble_sync_delay: Specify duration between
syncing inputs during ensemble fuzzing (0 to disable).
"""
if inputs is None and existing_inputs is None: if inputs is None and existing_inputs is None:
raise Exception("radamsa requires inputs") raise Exception("radamsa requires inputs")
@ -57,6 +63,10 @@ class Radamsa(Command):
if dryrun: if dryrun:
return None return None
# disable ensemble sync if only one VM is used
if ensemble_sync_delay is None and vm_count == 1:
ensemble_sync_delay = 0
self.logger.info("creating radamsa from template") self.logger.info("creating radamsa from template")
helper = JobHelper( helper = JobHelper(
@ -162,6 +172,7 @@ class Radamsa(Command):
tags=helper.tags, tags=helper.tags,
rename_output=rename_output, rename_output=rename_output,
debug=debug, debug=debug,
ensemble_sync_delay=ensemble_sync_delay,
) )
report_containers = [ report_containers = [

View File

@ -73,6 +73,7 @@ class TaskFeature(Enum):
check_asan_log = "check_asan_log" check_asan_log = "check_asan_log"
check_debugger = "check_debugger" check_debugger = "check_debugger"
check_retry_count = "check_retry_count" check_retry_count = "check_retry_count"
ensemble_sync_delay = "ensemble_sync_delay"
# Permissions for an Azure Blob Storage Container. # Permissions for an Azure Blob Storage Container.

View File

@ -122,6 +122,7 @@ class TaskDetails(BaseModel):
stats_format: Optional[StatsFormat] stats_format: Optional[StatsFormat]
reboot_after_setup: Optional[bool] reboot_after_setup: Optional[bool]
target_timeout: Optional[int] target_timeout: Optional[int]
ensemble_sync_delay: Optional[int]
@validator("check_retry_count", allow_reuse=True) @validator("check_retry_count", allow_reuse=True)
def validate_check_retry_count(cls, value: int) -> int: def validate_check_retry_count(cls, value: int) -> int:
@ -303,6 +304,7 @@ class TaskUnitConfig(BaseModel):
analyzer_options: Optional[List[str]] analyzer_options: Optional[List[str]]
stats_file: Optional[str] stats_file: Optional[str]
stats_format: Optional[StatsFormat] stats_format: Optional[StatsFormat]
ensemble_sync_delay: Optional[int]
# from here forwards are Container definitions. These need to be inline # from here forwards are Container definitions. These need to be inline
# with TaskDefinitions and ContainerTypes # with TaskDefinitions and ContainerTypes