diff --git a/afl-cmin.py b/afl-cmin.py
index cbfe4aca..aad7bc16 100755
--- a/afl-cmin.py
+++ b/afl-cmin.py
@@ -54,130 +54,156 @@ except ImportError:
 parser = argparse.ArgumentParser()
 
 cpu_count = multiprocessing.cpu_count()
-group = parser.add_argument_group('Required parameters')
-group.add_argument('-i',
-                   dest='input',
-                   action='append',
-                   metavar='dir',
-                   required=True,
-                   help='input directory with the starting corpus')
-group.add_argument('-o',
-                   dest='output',
-                   metavar='dir',
-                   required=True,
-                   help='output directory for minimized files')
-
-group = parser.add_argument_group('Execution control settings')
-group.add_argument('-f',
-                   dest='stdin_file',
-                   metavar='file',
-                   help='location read by the fuzzed program (stdin)')
+group = parser.add_argument_group("Required parameters")
 group.add_argument(
-    '-m',
-    dest='memory_limit',
-    default='none',
-    metavar='megs',
-    type=lambda x: x if x == 'none' else int(x),
-    help='memory limit for child process (default: %(default)s)')
-group.add_argument('-t',
-                   dest='time_limit',
-                   default=5000,
-                   metavar='msec',
-                   type=lambda x: x if x == 'none' else int(x),
-                   help='timeout for each run (default: %(default)s)')
-group.add_argument('-O',
-                   dest='frida_mode',
-                   action='store_true',
-                   default=False,
-                   help='use binary-only instrumentation (FRIDA mode)')
-group.add_argument('-Q',
-                   dest='qemu_mode',
-                   action='store_true',
-                   default=False,
-                   help='use binary-only instrumentation (QEMU mode)')
-group.add_argument('-U',
-                   dest='unicorn_mode',
-                   action='store_true',
-                   default=False,
-                   help='use unicorn-based instrumentation (Unicorn mode)')
-group.add_argument('-X',
-                   dest='nyx_mode',
-                   action='store_true',
-                   default=False,
-                   help='use Nyx mode')
+    "-i",
+    dest="input",
+    action="append",
+    metavar="dir",
+    required=True,
+    help="input directory with the starting corpus",
+)
+group.add_argument(
+    "-o",
+    dest="output",
+    metavar="dir",
+    required=True,
+    help="output directory for minimized files",
+)
 
-group = parser.add_argument_group('Minimization settings')
-group.add_argument('--crash-dir',
-                   dest='crash_dir',
-                   metavar='dir',
-                   default=None,
-                   help="move crashes to a separate dir, always deduplicated")
-group.add_argument('-A',
-                   dest='allow_any',
-                   action='store_true',
-                   help='allow crashes and timeouts (not recommended)')
-group.add_argument('-C',
-                   dest='crash_only',
-                   action='store_true',
-                   help='keep crashing inputs, reject everything else')
-group.add_argument('-e',
-                   dest='edge_mode',
-                   action='store_true',
-                   default=False,
-                   help='solve for edge coverage only, ignore hit counts')
+group = parser.add_argument_group("Execution control settings")
+group.add_argument(
+    "-f",
+    dest="stdin_file",
+    metavar="file",
+    help="location read by the fuzzed program (stdin)",
+)
+group.add_argument(
+    "-m",
+    dest="memory_limit",
+    default="none",
+    metavar="megs",
+    type=lambda x: x if x == "none" else int(x),
+    help="memory limit for child process (default: %(default)s)",
+)
+group.add_argument(
+    "-t",
+    dest="time_limit",
+    default=5000,
+    metavar="msec",
+    type=lambda x: x if x == "none" else int(x),
+    help="timeout for each run (default: %(default)s)",
+)
+group.add_argument(
+    "-O",
+    dest="frida_mode",
+    action="store_true",
+    default=False,
+    help="use binary-only instrumentation (FRIDA mode)",
+)
+group.add_argument(
+    "-Q",
+    dest="qemu_mode",
+    action="store_true",
+    default=False,
+    help="use binary-only instrumentation (QEMU mode)",
+)
+group.add_argument(
+    "-U",
+    dest="unicorn_mode",
+    action="store_true",
+    default=False,
+    help="use unicorn-based instrumentation (Unicorn mode)",
+)
+group.add_argument(
+    "-X", dest="nyx_mode", action="store_true", default=False, help="use Nyx mode"
+)
 
-group = parser.add_argument_group('Misc')
-group.add_argument('-T',
-                   dest='workers',
-                   type=lambda x: cpu_count if x == 'all' else int(x),
-                   default=1,
-                   help='number of concurrent worker (default: %(default)d)')
-group.add_argument('--as_queue',
-                   action='store_true',
-                   help='output file name like "id:000000,hash:value"')
-group.add_argument('--no-dedup',
-                   action='store_true',
-                   help='skip deduplication step for corpus files')
-group.add_argument('--debug', action='store_true')
+group = parser.add_argument_group("Minimization settings")
+group.add_argument(
+    "--crash-dir",
+    dest="crash_dir",
+    metavar="dir",
+    default=None,
+    help="move crashes to a separate dir, always deduplicated",
+)
+group.add_argument(
+    "-A",
+    dest="allow_any",
+    action="store_true",
+    help="allow crashes and timeouts (not recommended)",
+)
+group.add_argument(
+    "-C",
+    dest="crash_only",
+    action="store_true",
+    help="keep crashing inputs, reject everything else",
+)
+group.add_argument(
+    "-e",
+    dest="edge_mode",
+    action="store_true",
+    default=False,
+    help="solve for edge coverage only, ignore hit counts",
+)
 
-parser.add_argument('exe', metavar='/path/to/target_app')
-parser.add_argument('args', nargs='*')
+group = parser.add_argument_group("Misc")
+group.add_argument(
+    "-T",
+    dest="workers",
+    type=lambda x: cpu_count if x == "all" else int(x),
+    default=1,
+    help="number of concurrent worker (default: %(default)d)",
+)
+group.add_argument(
+    "--as_queue",
+    action="store_true",
+    help='output file name like "id:000000,hash:value"',
+)
+group.add_argument(
+    "--no-dedup", action="store_true", help="skip deduplication step for corpus files"
+)
+group.add_argument("--debug", action="store_true")
+
+parser.add_argument("exe", metavar="/path/to/target_app")
+parser.add_argument("args", nargs="*")
 args = parser.parse_args()
 
 logger = None
 afl_showmap_bin = None
-tuple_index_type_code = 'I'
+tuple_index_type_code = "I"
 file_index_type_code = None
 
 
 def init():
     global logger
     log_level = logging.DEBUG if args.debug else logging.INFO
-    logging.basicConfig(level=log_level,
-                        format='%(asctime)s - %(levelname)s - %(message)s')
+    logging.basicConfig(
+        level=log_level, format="%(asctime)s - %(levelname)s - %(message)s"
+    )
     logger = logging.getLogger(__name__)
 
     if args.stdin_file and args.workers > 1:
-        logger.error('-f is only supported with one worker (-T 1)')
+        logger.error("-f is only supported with one worker (-T 1)")
         sys.exit(1)
 
-    if args.memory_limit != 'none' and args.memory_limit < 5:
-        logger.error('dangerously low memory limit')
+    if args.memory_limit != "none" and args.memory_limit < 5:
+        logger.error("dangerously low memory limit")
        sys.exit(1)
 
-    if args.time_limit != 'none' and args.time_limit < 10:
-        logger.error('dangerously low timeout')
+    if args.time_limit != "none" and args.time_limit < 10:
+        logger.error("dangerously low timeout")
         sys.exit(1)
 
     if not os.path.isfile(args.exe):
         logger.error('binary "%s" not found or not regular file', args.exe)
         sys.exit(1)
 
-    if not os.environ.get('AFL_SKIP_BIN_CHECK') and not any(
-            [args.qemu_mode, args.frida_mode, args.unicorn_mode, args.nyx_mode]):
-        if b'__AFL_SHM_ID' not in open(args.exe, 'rb').read():
-            logger.error("binary '%s' doesn't appear to be instrumented",
-                         args.exe)
+    if not os.environ.get("AFL_SKIP_BIN_CHECK") and not any(
+        [args.qemu_mode, args.frida_mode, args.unicorn_mode, args.nyx_mode]
+    ):
+        if b"__AFL_SHM_ID" not in open(args.exe, "rb").read():
+            logger.error("binary '%s' doesn't appear to be instrumented", args.exe)
             sys.exit(1)
 
     for dn in args.input:
@@ -191,18 +217,18 @@ def init():
         os.path.dirname(__file__),
         os.getcwd(),
     ]
-    if os.environ.get('AFL_PATH'):
-        searches.append(os.environ['AFL_PATH'])
+    if os.environ.get("AFL_PATH"):
+        searches.append(os.environ["AFL_PATH"])
 
     for search in searches:
-        afl_showmap_bin = shutil.which('afl-showmap', path=search)
+        afl_showmap_bin = shutil.which("afl-showmap", path=search)
         if afl_showmap_bin:
             break
     if not afl_showmap_bin:
-        logger.fatal('cannot find afl-showmap, please set AFL_PATH')
+        logger.fatal("cannot find afl-showmap, please set AFL_PATH")
         sys.exit(1)
 
-    trace_dir = os.path.join(args.output, '.traces')
+    trace_dir = os.path.join(args.output, ".traces")
     shutil.rmtree(trace_dir, ignore_errors=True)
     try:
         os.rmdir(args.output)
@@ -210,19 +236,19 @@ def init():
         pass
     if os.path.exists(args.output):
         logger.error(
-            'directory "%s" exists and is not empty - delete it first',
-            args.output)
+            'directory "%s" exists and is not empty - delete it first', args.output
+        )
         sys.exit(1)
     if args.crash_dir and not os.path.exists(args.crash_dir):
         os.makedirs(args.crash_dir)
     os.makedirs(trace_dir)
 
-    logger.info('use %d workers (-T)', args.workers)
+    logger.info("use %d workers (-T)", args.workers)
 
 
 def detect_type_code(size):
-    for type_code in ['B', 'H', 'I', 'L', 'Q']:
-        if 256**array.array(type_code).itemsize > size:
+    for type_code in ["B", "H", "I", "L", "Q"]:
+        if 256 ** array.array(type_code).itemsize > size:
             return type_code
@@ -238,71 +264,70 @@ def afl_showmap(input_path=None, batch=None, afl_map_size=None, first=False):
     # yapf: enable
     found_atat = False
     for arg in args.args:
-        if '@@' in arg:
+        if "@@" in arg:
             found_atat = True
 
     if args.stdin_file:
         assert args.workers == 1
         input_from_file = True
         stdin_file = args.stdin_file
-        cmd += ['-H', stdin_file]
+        cmd += ["-H", stdin_file]
     elif found_atat:
         input_from_file = True
-        stdin_file = os.path.join(args.output, f'.input.{os.getpid()}')
-        cmd += ['-H', stdin_file]
+        stdin_file = os.path.join(args.output, f".input.{os.getpid()}")
+        cmd += ["-H", stdin_file]
     else:
         input_from_file = False
 
     if batch:
         input_from_file = True
-        filelist = os.path.join(args.output, f'.filelist.{os.getpid()}')
-        with open(filelist, 'w') as f:
+        filelist = os.path.join(args.output, f".filelist.{os.getpid()}")
+        with open(filelist, "w") as f:
             for _, path in batch:
-                f.write(path + '\n')
-        cmd += ['-I', filelist]
-        output_path = os.path.join(args.output, f'.showmap.{os.getpid()}')
-        cmd += ['-o', output_path]
+                f.write(path + "\n")
+        cmd += ["-I", filelist]
+        output_path = os.path.join(args.output, f".showmap.{os.getpid()}")
+        cmd += ["-o", output_path]
     else:
         if input_from_file:
             shutil.copy(input_path, stdin_file)
-        cmd += ['-o', '-']
+        cmd += ["-o", "-"]
 
     if args.frida_mode:
-        cmd += ['-O']
+        cmd += ["-O"]
     if args.qemu_mode:
-        cmd += ['-Q']
+        cmd += ["-Q"]
     if args.unicorn_mode:
-        cmd += ['-U']
+        cmd += ["-U"]
    if args.nyx_mode:
-        cmd += ['-X']
+        cmd += ["-X"]
     if args.edge_mode:
-        cmd += ['-e']
-    cmd += ['--', args.exe] + args.args
+        cmd += ["-e"]
+    cmd += ["--", args.exe] + args.args
 
     env = os.environ.copy()
-    env['AFL_QUIET'] = '1'
-    env['ASAN_OPTIONS'] = 'detect_leaks=0'
+    env["AFL_QUIET"] = "1"
+    env["ASAN_OPTIONS"] = "detect_leaks=0"
     if first:
-        logger.debug('run command line: %s', subprocess.list2cmdline(cmd))
-        env['AFL_CMIN_ALLOW_ANY'] = '1'
+        logger.debug("run command line: %s", subprocess.list2cmdline(cmd))
+        env["AFL_CMIN_ALLOW_ANY"] = "1"
     if afl_map_size:
-        env['AFL_MAP_SIZE'] = str(afl_map_size)
+        env["AFL_MAP_SIZE"] = str(afl_map_size)
     if args.crash_only:
-        env['AFL_CMIN_CRASHES_ONLY'] = '1'
+        env["AFL_CMIN_CRASHES_ONLY"] = "1"
     if args.allow_any:
-        env['AFL_CMIN_ALLOW_ANY'] = '1'
+        env["AFL_CMIN_ALLOW_ANY"] = "1"
 
     if input_from_file:
-        p = subprocess.Popen(cmd,
-                             stdout=subprocess.PIPE,
-                             env=env,
-                             bufsize=1048576)
+        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, env=env, bufsize=1048576)
     else:
-        p = subprocess.Popen(cmd,
-                             stdin=open(input_path, 'rb'),
-                             stdout=subprocess.PIPE,
-                             env=env,
-                             bufsize=1048576)
+        p = subprocess.Popen(
+            cmd,
+            stdin=open(input_path, "rb"),
+            stdout=subprocess.PIPE,
+            env=env,
+            bufsize=1048576,
+        )
     out = p.stdout.read()
     p.wait()
@@ -313,7 +338,7 @@ def afl_showmap(input_path=None, batch=None, afl_map_size=None, first=False):
             values = []
             try:
                 trace_file = os.path.join(output_path, basename)
-                with open(trace_file, 'r') as f:
+                with open(trace_file, "r") as f:
                     values = list(map(int, f))
                 crashed = len(values) == 0
                 os.unlink(trace_file)
@@ -366,21 +391,22 @@ class Worker(multiprocessing.Process):
     def run(self):
         map_size = self.afl_map_size or 65536
         max_tuple = map_size * 9
-        max_file_index = 256**array.array(file_index_type_code).itemsize - 1
+        max_file_index = 256 ** array.array(file_index_type_code).itemsize - 1
         m = array.array(file_index_type_code, [max_file_index] * max_tuple)
         counter = collections.Counter()
         crashes = []
 
-        pack_name = os.path.join(args.output, '.traces', f'{self.idx}.pack')
+        pack_name = os.path.join(args.output, ".traces", f"{self.idx}.pack")
         pack_pos = 0
-        with open(pack_name, 'wb') as trace_pack:
+        with open(pack_name, "wb") as trace_pack:
             while True:
                 batch = self.q_in.get()
                 if batch is None:
                     break
 
                 for idx, r, crash in afl_showmap(
-                        batch=batch, afl_map_size=self.afl_map_size):
+                    batch=batch, afl_map_size=self.afl_map_size
+                ):
                    counter.update(r)
 
                     used = False
@@ -419,7 +445,7 @@ class CombineTraceWorker(multiprocessing.Process):
 
     def run(self):
         already_have = set()
-        with open(self.pack_name, 'rb') as f:
+        with open(self.pack_name, "rb") as f:
             for pos, tuple_count in self.jobs:
                 f.seek(pos)
                 result = array.array(tuple_index_type_code)
@@ -430,7 +456,7 @@ def hash_file(path):
     m = hashlib.sha1()
-    with open(path, 'rb') as f:
+    with open(path, "rb") as f:
         m.update(f.read())
     return m.digest()
@@ -443,11 +469,14 @@ def dedup(files):
         # use large chunksize to reduce multiprocessing overhead
         chunksize = max(1, min(256, len(files) // args.workers))
         for i, h in enumerate(
-                tqdm(pool.imap(hash_file, files, chunksize),
-                     desc='dedup',
-                     total=len(files),
-                     ncols=0,
-                     leave=(len(files) > 100000))):
+            tqdm(
+                pool.imap(hash_file, files, chunksize),
+                desc="dedup",
+                total=len(files),
+                ncols=0,
+                leave=(len(files) > 100000),
+            )
+        ):
             if h in seen_hash:
                 continue
             seen_hash.add(h)
@@ -457,8 +486,12 @@ def is_afl_dir(dirnames, filenames):
-    return ('queue' in dirnames and 'hangs' in dirnames
-            and 'crashes' in dirnames and 'fuzzer_setup' in filenames)
+    return (
+        "queue" in dirnames
+        and "hangs" in dirnames
+        and "crashes" in dirnames
+        and "fuzzer_setup" in filenames
+    )
@@ -467,18 +500,18 @@ def collect_files(input_paths):
     paths = []
     for s in input_paths:
         paths += glob.glob(s)
     files = []
-    with tqdm(desc='search', unit=' files', ncols=0) as pbar:
+    with tqdm(desc="search", unit=" files", ncols=0) as pbar:
         for path in paths:
             for root, dirnames, filenames in os.walk(path, followlinks=True):
                 for dirname in dirnames:
-                    if dirname.startswith('.'):
+                    if dirname.startswith("."):
                         dirnames.remove(dirname)
 
                 if not args.crash_only and is_afl_dir(dirnames, filenames):
                     continue
                 for filename in filenames:
-                    if filename.startswith('.'):
+                    if filename.startswith("."):
                         continue
                     pbar.update(1)
                     files.append(os.path.join(root, filename))
@@ -490,21 +523,20 @@ def main():
     files = collect_files(args.input)
     if len(files) == 0:
-        logger.error('no inputs in the target directory - nothing to be done')
+        logger.error("no inputs in the target directory - nothing to be done")
         sys.exit(1)
 
-    logger.info('Found %d input files in %d directories', len(files),
-                len(args.input))
+    logger.info("Found %d input files in %d directories", len(files), len(args.input))
 
     if not args.no_dedup:
         files, hash_list = dedup(files)
-        logger.info('Remain %d files after dedup', len(files))
+        logger.info("Remain %d files after dedup", len(files))
     else:
-        logger.info('Skipping file deduplication.')
+        logger.info("Skipping file deduplication.")
 
     global file_index_type_code
     file_index_type_code = detect_type_code(len(files))
 
-    logger.info('Sorting files.')
+    logger.info("Sorting files.")
     with multiprocessing.Pool(args.workers) as pool:
         chunksize = max(1, min(512, len(files) // args.workers))
         size_list = list(pool.map(os.path.getsize, files, chunksize))
@@ -513,24 +545,22 @@ def main():
     hash_list = [hash_list[idx] for idx in idxes]
 
     afl_map_size = None
-    if b'AFL_DUMP_MAP_SIZE' in open(args.exe, 'rb').read():
-        output = subprocess.run([args.exe],
-                                capture_output=True,
-                                env={
-                                    'AFL_DUMP_MAP_SIZE': '1'
-                                }).stdout
+    if b"AFL_DUMP_MAP_SIZE" in open(args.exe, "rb").read():
+        output = subprocess.run(
+            [args.exe], capture_output=True, env={"AFL_DUMP_MAP_SIZE": "1"}
+        ).stdout
         afl_map_size = int(output)
-        logger.info('Setting AFL_MAP_SIZE=%d', afl_map_size)
+        logger.info("Setting AFL_MAP_SIZE=%d", afl_map_size)
 
         global tuple_index_type_code
         tuple_index_type_code = detect_type_code(afl_map_size * 9)
 
-    logger.info('Testing the target binary')
+    logger.info("Testing the target binary")
     tuples, _ = afl_showmap(files[0], afl_map_size=afl_map_size, first=True)
     if tuples:
-        logger.info('ok, %d tuples recorded', len(tuples))
+        logger.info("ok, %d tuples recorded", len(tuples))
     else:
-        logger.error('no instrumentation output detected')
+        logger.error("no instrumentation output detected")
         sys.exit(1)
 
     job_queue = multiprocessing.Queue()
@@ -550,7 +580,7 @@ def main():
     dispatcher = JobDispatcher(job_queue, jobs)
     dispatcher.start()
 
-    logger.info('Processing traces')
+    logger.info("Processing traces")
     effective = 0
     trace_info = {}
     for _ in tqdm(files, ncols=0, smoothing=0.01):
@@ -561,7 +591,7 @@ def main():
             effective += 1
     dispatcher.join()
 
-    logger.info('Obtaining trace results')
+    logger.info("Obtaining trace results")
     ms = []
     crashes = []
     counter = collections.Counter()
@@ -574,27 +604,38 @@ def main():
     best_idxes = list(map(min, zip(*ms)))
 
     if not args.crash_dir:
-        logger.info('Found %d unique tuples across %d files (%d effective)',
-                    len(counter), len(files), effective)
+        logger.info(
+            "Found %d unique tuples across %d files (%d effective)",
+            len(counter),
+            len(files),
+            effective,
+        )
     else:
         logger.info(
-            'Found %d unique tuples across %d files (%d effective, %d crashes)',
-            len(counter), len(files), effective, len(crashes))
+            "Found %d unique tuples across %d files (%d effective, %d crashes)",
+            len(counter),
+            len(files),
+            effective,
+            len(crashes),
+        )
     all_unique = counter.most_common()
 
-    logger.info('Processing candidates and writing output')
+    logger.info("Processing candidates and writing output")
     already_have = set()
     count = 0
 
     def save_file(idx):
         input_path = files[idx]
-        fn = (base64.b16encode(hash_list[idx]).decode('utf8').lower()
-              if not args.no_dedup else os.path.basename(input_path))
+        fn = (
+            base64.b16encode(hash_list[idx]).decode("utf8").lower()
+            if not args.no_dedup
+            else os.path.basename(input_path)
+        )
         if args.as_queue:
             if args.no_dedup:
-                fn = 'id:%06d,orig:%s' % (count, fn)
+                fn = "id:%06d,orig:%s" % (count, fn)
             else:
-                fn = 'id:%06d,hash:%s' % (count, fn)
+                fn = "id:%06d,hash:%s" % (count, fn)
         output_path = os.path.join(args.output, fn)
         try:
             os.link(input_path, output_path)
@@ -620,8 +661,8 @@ def main():
     trace_packs = []
     workers = []
     for i in range(args.workers):
-        pack_name = os.path.join(args.output, '.traces', f'{i}.pack')
-        trace_f = open(pack_name, 'rb')
+        pack_name = os.path.join(args.output, ".traces", f"{i}.pack")
+        trace_f = open(pack_name, "rb")
         trace_packs.append(trace_f)
 
         p = CombineTraceWorker(pack_name, jobs[i], result_queue)
@@ -652,7 +693,7 @@ def main():
         f.close()
 
     if args.crash_dir:
-        logger.info('Saving crashes to %s', args.crash_dir)
+        logger.info("Saving crashes to %s", args.crash_dir)
         crash_files = [files[c] for c in crashes]
 
         if args.no_dedup:
@@ -661,7 +702,7 @@ def main():
            crash_files, hash_list = dedup(crash_files)
 
        for idx, crash_path in enumerate(crash_files):
-            fn = base64.b16encode(hash_list[idx]).decode('utf8').lower()
+            fn = base64.b16encode(hash_list[idx]).decode("utf8").lower()
             output_path = os.path.join(args.crash_dir, fn)
             try:
                 os.link(crash_path, output_path)
@@ -675,13 +716,13 @@ def main():
                 pass
 
     if count == 1:
-        logger.warning('all test cases had the same traces, check syntax!')
+        logger.warning("all test cases had the same traces, check syntax!")
     logger.info('narrowed down to %s files, saved in "%s"', count, args.output)
-    if not os.environ.get('AFL_KEEP_TRACES'):
-        logger.info('Deleting trace files')
-        trace_dir = os.path.join(args.output, '.traces')
+    if not os.environ.get("AFL_KEEP_TRACES"):
+        logger.info("Deleting trace files")
+        trace_dir = os.path.join(args.output, ".traces")
         shutil.rmtree(trace_dir, ignore_errors=True)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()