#!/usr/bin/env python3
# Copyright 2016-2025 Google Inc.
# Copyright 2025 AFLplusplus Project. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import array
import base64
import collections
import glob
import hashlib
import itertools
import logging
import multiprocessing
import os
import shutil
import subprocess
import sys

try:
    from tqdm import tqdm
except ImportError:
    print('Hint: install python module "tqdm" to show progress bar')

    class tqdm:

        def __init__(self, data=None, *args, **argd):
            self.data = data

        def __iter__(self):
            yield from self.data

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            pass

        def update(self, *args):
            pass


parser = argparse.ArgumentParser()
cpu_count = multiprocessing.cpu_count()

group = parser.add_argument_group('Required parameters')
group.add_argument('-i',
                   dest='input',
                   action='append',
                   metavar='dir',
                   required=True,
                   help='input directory with the starting corpus')
group.add_argument('-o',
                   dest='output',
                   metavar='dir',
                   required=True,
                   help='output directory for minimized files')

group = parser.add_argument_group('Execution control settings')
group.add_argument('-f',
                   dest='stdin_file',
                   metavar='file',
                   help='location read by the fuzzed program (stdin)')
group.add_argument('-m',
                   dest='memory_limit',
                   default='none',
                   metavar='megs',
                   type=lambda x: x if x == 'none' else int(x),
                   help='memory limit for child process (default: %(default)s)')
group.add_argument('-t',
                   dest='time_limit',
                   default=5000,
                   metavar='msec',
                   type=lambda x: x if x == 'none' else int(x),
                   help='timeout for each run (default: %(default)s)')
group.add_argument('-O',
                   dest='frida_mode',
                   action='store_true',
                   default=False,
                   help='use binary-only instrumentation (FRIDA mode)')
group.add_argument('-Q',
                   dest='qemu_mode',
                   action='store_true',
                   default=False,
                   help='use binary-only instrumentation (QEMU mode)')
group.add_argument('-U',
                   dest='unicorn_mode',
                   action='store_true',
                   default=False,
                   help='use unicorn-based instrumentation (Unicorn mode)')
group.add_argument('-X',
                   dest='nyx_mode',
                   action='store_true',
                   default=False,
                   help='use Nyx mode')

group = parser.add_argument_group('Minimization settings')
group.add_argument('--crash-dir',
                   dest='crash_dir',
                   metavar='dir',
                   default=None,
                   help="move crashes to a separate dir, always deduplicated")
group.add_argument('-A',
                   dest='allow_any',
                   action='store_true',
                   help='allow crashes and timeouts (not recommended)')
group.add_argument('-C',
                   dest='crash_only',
                   action='store_true',
                   help='keep crashing inputs, reject everything else')
group.add_argument('-e',
                   dest='edge_mode',
                   action='store_true',
                   default=False,
                   help='solve for edge coverage only, ignore hit counts')

group = parser.add_argument_group('Misc')
group.add_argument('-T',
                   dest='workers',
                   type=lambda x: cpu_count if x == 'all' else int(x),
                   default=1,
                   help='number of concurrent workers (default: %(default)d)')
group.add_argument('--as_queue',
                   action='store_true',
                   help='output file name like "id:000000,hash:value"')
group.add_argument('--no-dedup',
                   action='store_true',
                   help='skip deduplication step for corpus files')
group.add_argument('--debug', action='store_true')

parser.add_argument('exe', metavar='/path/to/target_app')
parser.add_argument('args', nargs='*')

args = parser.parse_args()

logger = None
afl_showmap_bin = None
tuple_index_type_code = 'I'
file_index_type_code = None

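# Validate the command-line options, make sure the target binary exists and
# looks instrumented, locate afl-showmap, and prepare the output directory
# together with its temporary .traces working directory.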
def init():
    global logger
    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(level=log_level,
                        format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    if args.stdin_file and args.workers > 1:
        logger.error('-f is only supported with one worker (-T 1)')
        sys.exit(1)

    if args.memory_limit != 'none' and args.memory_limit < 5:
        logger.error('dangerously low memory limit')
        sys.exit(1)

    if args.time_limit != 'none' and args.time_limit < 10:
        logger.error('dangerously low timeout')
        sys.exit(1)

    if not os.path.isfile(args.exe):
        logger.error('binary "%s" not found or not regular file', args.exe)
        sys.exit(1)

    if not os.environ.get('AFL_SKIP_BIN_CHECK') and not any(
        [args.qemu_mode, args.frida_mode, args.unicorn_mode, args.nyx_mode]):
        if b'__AFL_SHM_ID' not in open(args.exe, 'rb').read():
            logger.error("binary '%s' doesn't appear to be instrumented",
                         args.exe)
            sys.exit(1)

    for dn in args.input:
        if not os.path.isdir(dn) and not glob.glob(dn):
            logger.error('directory "%s" not found', dn)
            sys.exit(1)

    global afl_showmap_bin
    searches = [
        None,
        os.path.dirname(__file__),
        os.getcwd(),
    ]
    if os.environ.get('AFL_PATH'):
        searches.append(os.environ['AFL_PATH'])

    for search in searches:
        afl_showmap_bin = shutil.which('afl-showmap', path=search)
        if afl_showmap_bin:
            break

    if not afl_showmap_bin:
        logger.fatal('cannot find afl-showmap, please set AFL_PATH')
        sys.exit(1)

    trace_dir = os.path.join(args.output, '.traces')
    shutil.rmtree(trace_dir, ignore_errors=True)
    try:
        os.rmdir(args.output)
    except OSError:
        pass
    if os.path.exists(args.output):
        logger.error(
            'directory "%s" exists and is not empty - delete it first',
            args.output)
        sys.exit(1)
    if args.crash_dir and not os.path.exists(args.crash_dir):
        os.makedirs(args.crash_dir)
    os.makedirs(trace_dir)

    logger.info('use %d workers (-T)', args.workers)


def detect_type_code(size):
    # Pick the smallest unsigned array type code that can hold values up to
    # `size`.
    for type_code in ['B', 'H', 'I', 'L', 'Q']:
        if 256**array.array(type_code).itemsize > size:
            return type_code

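# Run afl-showmap either on a single input file or, in batch mode, on a list
# of files passed via a temporary "-I" filelist. Returns the observed coverage
# as an array of compacted tuple indices plus a crash flag (or, in batch mode,
# a list of such results keyed by file index).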
def afl_showmap(input_path=None, batch=None, afl_map_size=None, first=False):
    assert input_path or batch
    # yapf: disable
    cmd = [
        afl_showmap_bin,
        '-m', str(args.memory_limit),
        '-t', str(args.time_limit),
        '-Z',  # cmin mode
    ]
    # yapf: enable

    found_atat = False
    for arg in args.args:
        if '@@' in arg:
            found_atat = True

    if args.stdin_file:
        assert args.workers == 1
        input_from_file = True
        stdin_file = args.stdin_file
        cmd += ['-H', stdin_file]
    elif found_atat:
        input_from_file = True
        stdin_file = os.path.join(args.output, f'.input.{os.getpid()}')
        cmd += ['-H', stdin_file]
    else:
        input_from_file = False

    if batch:
        input_from_file = True
        filelist = os.path.join(args.output, f'.filelist.{os.getpid()}')
        with open(filelist, 'w') as f:
            for _, path in batch:
                f.write(path + '\n')
        cmd += ['-I', filelist]
        output_path = os.path.join(args.output, f'.showmap.{os.getpid()}')
        cmd += ['-o', output_path]
    else:
        if input_from_file:
            shutil.copy(input_path, stdin_file)
        cmd += ['-o', '-']

    if args.frida_mode:
        cmd += ['-O']
    if args.qemu_mode:
        cmd += ['-Q']
    if args.unicorn_mode:
        cmd += ['-U']
    if args.nyx_mode:
        cmd += ['-X']
    if args.edge_mode:
        cmd += ['-e']
    cmd += ['--', args.exe] + args.args

    env = os.environ.copy()
    env['AFL_QUIET'] = '1'
    env['ASAN_OPTIONS'] = 'detect_leaks=0'
    if first:
        logger.debug('run command line: %s', subprocess.list2cmdline(cmd))
        env['AFL_CMIN_ALLOW_ANY'] = '1'
    if afl_map_size:
        env['AFL_MAP_SIZE'] = str(afl_map_size)
    if args.crash_only:
        env['AFL_CMIN_CRASHES_ONLY'] = '1'
    if args.allow_any:
        env['AFL_CMIN_ALLOW_ANY'] = '1'

    if input_from_file:
        p = subprocess.Popen(cmd,
                             stdout=subprocess.PIPE,
                             env=env,
                             bufsize=1048576)
    else:
        p = subprocess.Popen(cmd,
                             stdin=open(input_path, 'rb'),
                             stdout=subprocess.PIPE,
                             env=env,
                             bufsize=1048576)
    out = p.stdout.read()
    p.wait()

    if batch:
        result = []
        for idx, input_path in batch:
            basename = os.path.basename(input_path)
            values = []
            try:
                trace_file = os.path.join(output_path, basename)
                with open(trace_file, 'r') as f:
                    values = list(map(int, f))
                crashed = len(values) == 0
                os.unlink(trace_file)
            except FileNotFoundError:
                a = None
                crashed = True
            # Each tuple is printed as <edge id> * 1000 + <hit-count bucket>;
            # remap it to edge * 9 + bucket so indices stay below map_size * 9.
            values = [(t // 1000) * 9 + t % 1000 for t in values]
            a = array.array(tuple_index_type_code, values)
            result.append((idx, a, crashed))
        os.unlink(filelist)
        os.rmdir(output_path)
        return result
    else:
        values = []
        for line in out.split():
            if not line.isdigit():
                continue
            values.append(int(line))
        values = [(t // 1000) * 9 + t % 1000 for t in values]
        a = array.array(tuple_index_type_code, values)
        crashed = p.returncode in [2, 3]
        if input_from_file and stdin_file != args.stdin_file:
            os.unlink(stdin_file)
        return a, crashed


class JobDispatcher(multiprocessing.Process):

    def __init__(self, job_queue, jobs):
        super().__init__()
        self.job_queue = job_queue
        self.jobs = jobs

    def run(self):
        for job in self.jobs:
            self.job_queue.put(job)
        self.job_queue.close()


class Worker(multiprocessing.Process):

    def __init__(self, idx, afl_map_size, q_in, p_out, r_out):
        super().__init__()
        self.idx = idx
        self.afl_map_size = afl_map_size
        self.q_in = q_in
        self.p_out = p_out
        self.r_out = r_out

    def run(self):
        map_size = self.afl_map_size or 65536
        max_tuple = map_size * 9
        max_file_index = 256**array.array(file_index_type_code).itemsize - 1
        m = array.array(file_index_type_code, [max_file_index] * max_tuple)
        counter = collections.Counter()
        crashes = []

        pack_name = os.path.join(args.output, '.traces', f'{self.idx}.pack')
        pack_pos = 0
        with open(pack_name, 'wb') as trace_pack:
            while True:
                batch = self.q_in.get()
                if batch is None:
                    break

                for idx, r, crash in afl_showmap(
                        batch=batch, afl_map_size=self.afl_map_size):
                    counter.update(r)
                    used = False

                    if crash:
                        crashes.append(idx)

                    # If we aren't saving crashes to a separate dir, handle
                    # them the same as other inputs. However, unless
                    # AFL_CMIN_ALLOW_ANY=1, afl_showmap will not return any
                    # coverage for crashes so they will never be retained.
                    if not crash or not args.crash_dir:
                        for t in r:
                            if idx < m[t]:
                                m[t] = idx
                                used = True

                    if used:
                        tuple_count = len(r)
                        r.tofile(trace_pack)
                        self.p_out.put((idx, self.idx, pack_pos, tuple_count))
                        pack_pos += tuple_count * r.itemsize
                    else:
                        self.p_out.put(None)

        self.r_out.put((self.idx, m, counter, crashes))

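# Used while writing the output: re-reads the per-worker trace pack entries of
# files that were already kept and reports the union of their tuples, so the
# main process knows which tuples are covered before scanning the rest.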
class CombineTraceWorker(multiprocessing.Process):

    def __init__(self, pack_name, jobs, r_out):
        super().__init__()
        self.pack_name = pack_name
        self.jobs = jobs
        self.r_out = r_out

    def run(self):
        already_have = set()
        with open(self.pack_name, 'rb') as f:
            for pos, tuple_count in self.jobs:
                f.seek(pos)
                result = array.array(tuple_index_type_code)
                result.fromfile(f, tuple_count)
                already_have.update(result)
        self.r_out.put(already_have)


def hash_file(path):
    m = hashlib.sha1()
    with open(path, 'rb') as f:
        m.update(f.read())
    return m.digest()


def dedup(files):
    with multiprocessing.Pool(args.workers) as pool:
        seen_hash = set()
        result = []
        hash_list = []
        # use large chunksize to reduce multiprocessing overhead
        chunksize = max(1, min(256, len(files) // args.workers))
        for i, h in enumerate(
                tqdm(pool.imap(hash_file, files, chunksize),
                     desc='dedup',
                     total=len(files),
                     ncols=0,
                     leave=(len(files) > 100000))):
            if h in seen_hash:
                continue
            seen_hash.add(h)
            result.append(files[i])
            hash_list.append(h)
        return result, hash_list


def is_afl_dir(dirnames, filenames):
    return ('queue' in dirnames and 'hangs' in dirnames and
            'crashes' in dirnames and 'fuzzer_setup' in filenames)


def collect_files(input_paths):
    paths = []
    for s in input_paths:
        paths += glob.glob(s)

    files = []
    with tqdm(desc='search', unit=' files', ncols=0) as pbar:
        for path in paths:
            for root, dirnames, filenames in os.walk(path, followlinks=True):
                # Prune hidden directories in place so os.walk skips them
                # (removing items while iterating the list could skip entries).
                dirnames[:] = [d for d in dirnames if not d.startswith('.')]
                if not args.crash_only and is_afl_dir(dirnames, filenames):
                    continue
                for filename in filenames:
                    if filename.startswith('.'):
                        continue
                    pbar.update(1)
                    files.append(os.path.join(root, filename))
    return files

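# Overall flow: collect and (optionally) deduplicate the input files, sort
# them by size so that the smallest file covering a tuple wins, trace every
# file with afl-showmap in parallel, then greedily emit files: first those
# covering tuples seen in only one file, then the smallest file for each
# remaining tuple, walking tuples from rarest to most common.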
def main():
    init()

    files = collect_files(args.input)
    if len(files) == 0:
        logger.error('no inputs in the target directory - nothing to be done')
        sys.exit(1)
    logger.info('Found %d input files in %d directories', len(files),
                len(args.input))

    if not args.no_dedup:
        files, hash_list = dedup(files)
        logger.info('Remain %d files after dedup', len(files))
    else:
        logger.info('Skipping file deduplication.')

    global file_index_type_code
    file_index_type_code = detect_type_code(len(files))

    logger.info('Sorting files.')
    with multiprocessing.Pool(args.workers) as pool:
        chunksize = max(1, min(512, len(files) // args.workers))
        size_list = list(pool.map(os.path.getsize, files, chunksize))
    idxes = sorted(range(len(files)), key=lambda x: size_list[x])
    files = [files[idx] for idx in idxes]
    if not args.no_dedup:
        # hash_list only exists when deduplication ran above.
        hash_list = [hash_list[idx] for idx in idxes]

    afl_map_size = None
    if b'AFL_DUMP_MAP_SIZE' in open(args.exe, 'rb').read():
        output = subprocess.run([args.exe],
                                capture_output=True,
                                env={
                                    'AFL_DUMP_MAP_SIZE': '1'
                                }).stdout
        afl_map_size = int(output)
        logger.info('Setting AFL_MAP_SIZE=%d', afl_map_size)

        global tuple_index_type_code
        tuple_index_type_code = detect_type_code(afl_map_size * 9)

    logger.info('Testing the target binary')
    tuples, _ = afl_showmap(files[0], afl_map_size=afl_map_size, first=True)
    if tuples:
        logger.info('ok, %d tuples recorded', len(tuples))
    else:
        logger.error('no instrumentation output detected')
        sys.exit(1)

    job_queue = multiprocessing.Queue()
    progress_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    workers = []
    for i in range(args.workers):
        p = Worker(i, afl_map_size, job_queue, progress_queue, result_queue)
        p.start()
        workers.append(p)

    chunk = max(1, min(128, len(files) // args.workers))
    jobs = list(itertools.batched(enumerate(files), chunk))
    jobs += [None] * args.workers  # sentinel

    dispatcher = JobDispatcher(job_queue, jobs)
    dispatcher.start()

    logger.info('Processing traces')
    effective = 0
    trace_info = {}
    for _ in tqdm(files, ncols=0, smoothing=0.01):
        r = progress_queue.get()
        if r is not None:
            idx, worker_idx, pos, tuple_count = r
            trace_info[idx] = worker_idx, pos, tuple_count
            effective += 1
    dispatcher.join()

    logger.info('Obtaining trace results')
    ms = []
    crashes = []
    counter = collections.Counter()
    for _ in tqdm(range(args.workers), ncols=0):
        idx, m, c, crs = result_queue.get()
        ms.append(m)
        counter.update(c)
        crashes.extend(crs)
        workers[idx].join()
    best_idxes = list(map(min, zip(*ms)))

    if not args.crash_dir:
        logger.info('Found %d unique tuples across %d files (%d effective)',
                    len(counter), len(files), effective)
    else:
        logger.info(
            'Found %d unique tuples across %d files (%d effective, %d crashes)',
            len(counter), len(files), effective, len(crashes))
    all_unique = counter.most_common()

    logger.info('Processing candidates and writing output')
    already_have = set()
    count = 0

    def save_file(idx):
        input_path = files[idx]
        fn = (base64.b16encode(hash_list[idx]).decode('utf8').lower()
              if not args.no_dedup else os.path.basename(input_path))
        if args.as_queue:
            if args.no_dedup:
                fn = 'id:%06d,orig:%s' % (count, fn)
            else:
                fn = 'id:%06d,hash:%s' % (count, fn)
        output_path = os.path.join(args.output, fn)
        try:
            os.link(input_path, output_path)
        except OSError:
            shutil.copy(input_path, output_path)

    # First pass: keep every file that covers a tuple no other file covers.
    jobs = [[] for i in range(args.workers)]
    saved = set()
    for t, c in all_unique:
        if c != 1:
            continue
        idx = best_idxes[t]
        if idx in saved:
            continue
        save_file(idx)
        saved.add(idx)
        count += 1

        worker_idx, pos, tuple_count = trace_info[idx]
        job = (pos, tuple_count)
        jobs[worker_idx].append(job)

    trace_packs = []
    workers = []
    for i in range(args.workers):
        pack_name = os.path.join(args.output, '.traces', f'{i}.pack')
        trace_f = open(pack_name, 'rb')
        trace_packs.append(trace_f)

        p = CombineTraceWorker(pack_name, jobs[i], result_queue)
        p.start()
        workers.append(p)

    for _ in range(args.workers):
        result = result_queue.get()
        already_have.update(result)

    # Second pass: walk the remaining tuples from rarest to most common and
    # keep the smallest file for each tuple that is not yet covered.
    for t, c in tqdm(list(reversed(all_unique)), ncols=0):
        if t in already_have:
            continue

        idx = best_idxes[t]
        save_file(idx)
        count += 1

        worker_idx, pos, tuple_count = trace_info[idx]
        trace_pack = trace_packs[worker_idx]
        trace_pack.seek(pos)
        result = array.array(tuple_index_type_code)
        result.fromfile(trace_pack, tuple_count)
        already_have.update(result)

    for f in trace_packs:
        f.close()

    if args.crash_dir:
        logger.info('Saving crashes to %s', args.crash_dir)
        crash_files = [files[c] for c in crashes]

        if args.no_dedup:
            # Unless we deduped previously, we have to dedup the crash files
            # now.
            crash_files, hash_list = dedup(crash_files)

        for idx, crash_path in enumerate(crash_files):
            # hash_list is indexed by corpus position unless we just deduped
            # the crash files themselves above.
            h = hash_list[idx] if args.no_dedup else hash_list[crashes[idx]]
            fn = base64.b16encode(h).decode('utf8').lower()
            output_path = os.path.join(args.crash_dir, fn)
            try:
                os.link(crash_path, output_path)
            except OSError:
                try:
                    shutil.copy(crash_path, output_path)
                except shutil.Error:
                    # This error happens when src and dest are hardlinks of the
                    # same file. We have nothing to do in this case, but handle
                    # it gracefully.
                    pass

    if count == 1:
        logger.warning('all test cases had the same traces, check syntax!')
    logger.info('narrowed down to %s files, saved in "%s"', count, args.output)

    if not os.environ.get('AFL_KEEP_TRACES'):
        logger.info('Deleting trace files')
        trace_dir = os.path.join(args.output, '.traces')
        shutil.rmtree(trace_dir, ignore_errors=True)


if __name__ == '__main__':
    main()
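# Example invocation (illustrative paths; assumes the target reads its input
# from the file substituted for @@):
#
#   ./afl-cmin.py -i fuzz_out/default/queue -o corpus_min -T all -- ./target @@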