reformat by black

Kuang-che Wu
2025-05-06 23:45:42 +08:00
parent 7cb8ccc960
commit ec27e96486
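Note: the diff below is a mechanical reformat with the Black code formatter — single-quoted strings become double-quoted, yapf-style aligned continuations become 4-space hanging indents with trailing commas, and long calls are re-wrapped to Black's defaults. The exact command is not recorded in the commit; assuming a stock Black install and that the file is the afl-cmin.py script (filename assumed, not shown on this page), the equivalent invocation would be roughly:

    python -m pip install black     # install the formatter (assumed default version/settings)
    python -m black afl-cmin.py     # rewrite the file in place using Black's defaults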


@@ -54,130 +54,156 @@ except ImportError:
parser = argparse.ArgumentParser()
cpu_count = multiprocessing.cpu_count()
group = parser.add_argument_group('Required parameters')
group.add_argument('-i',
dest='input',
action='append',
metavar='dir',
required=True,
help='input directory with the starting corpus')
group.add_argument('-o',
dest='output',
metavar='dir',
required=True,
help='output directory for minimized files')
group = parser.add_argument_group('Execution control settings')
group.add_argument('-f',
dest='stdin_file',
metavar='file',
help='location read by the fuzzed program (stdin)')
group = parser.add_argument_group("Required parameters")
group.add_argument(
'-m',
dest='memory_limit',
default='none',
metavar='megs',
type=lambda x: x if x == 'none' else int(x),
help='memory limit for child process (default: %(default)s)')
group.add_argument('-t',
dest='time_limit',
default=5000,
metavar='msec',
type=lambda x: x if x == 'none' else int(x),
help='timeout for each run (default: %(default)s)')
group.add_argument('-O',
dest='frida_mode',
action='store_true',
default=False,
help='use binary-only instrumentation (FRIDA mode)')
group.add_argument('-Q',
dest='qemu_mode',
action='store_true',
default=False,
help='use binary-only instrumentation (QEMU mode)')
group.add_argument('-U',
dest='unicorn_mode',
action='store_true',
default=False,
help='use unicorn-based instrumentation (Unicorn mode)')
group.add_argument('-X',
dest='nyx_mode',
action='store_true',
default=False,
help='use Nyx mode')
"-i",
dest="input",
action="append",
metavar="dir",
required=True,
help="input directory with the starting corpus",
)
group.add_argument(
"-o",
dest="output",
metavar="dir",
required=True,
help="output directory for minimized files",
)
group = parser.add_argument_group('Minimization settings')
group.add_argument('--crash-dir',
dest='crash_dir',
metavar='dir',
default=None,
help="move crashes to a separate dir, always deduplicated")
group.add_argument('-A',
dest='allow_any',
action='store_true',
help='allow crashes and timeouts (not recommended)')
group.add_argument('-C',
dest='crash_only',
action='store_true',
help='keep crashing inputs, reject everything else')
group.add_argument('-e',
dest='edge_mode',
action='store_true',
default=False,
help='solve for edge coverage only, ignore hit counts')
group = parser.add_argument_group("Execution control settings")
group.add_argument(
"-f",
dest="stdin_file",
metavar="file",
help="location read by the fuzzed program (stdin)",
)
group.add_argument(
"-m",
dest="memory_limit",
default="none",
metavar="megs",
type=lambda x: x if x == "none" else int(x),
help="memory limit for child process (default: %(default)s)",
)
group.add_argument(
"-t",
dest="time_limit",
default=5000,
metavar="msec",
type=lambda x: x if x == "none" else int(x),
help="timeout for each run (default: %(default)s)",
)
group.add_argument(
"-O",
dest="frida_mode",
action="store_true",
default=False,
help="use binary-only instrumentation (FRIDA mode)",
)
group.add_argument(
"-Q",
dest="qemu_mode",
action="store_true",
default=False,
help="use binary-only instrumentation (QEMU mode)",
)
group.add_argument(
"-U",
dest="unicorn_mode",
action="store_true",
default=False,
help="use unicorn-based instrumentation (Unicorn mode)",
)
group.add_argument(
"-X", dest="nyx_mode", action="store_true", default=False, help="use Nyx mode"
)
group = parser.add_argument_group('Misc')
group.add_argument('-T',
dest='workers',
type=lambda x: cpu_count if x == 'all' else int(x),
default=1,
help='number of concurrent worker (default: %(default)d)')
group.add_argument('--as_queue',
action='store_true',
help='output file name like "id:000000,hash:value"')
group.add_argument('--no-dedup',
action='store_true',
help='skip deduplication step for corpus files')
group.add_argument('--debug', action='store_true')
group = parser.add_argument_group("Minimization settings")
group.add_argument(
"--crash-dir",
dest="crash_dir",
metavar="dir",
default=None,
help="move crashes to a separate dir, always deduplicated",
)
group.add_argument(
"-A",
dest="allow_any",
action="store_true",
help="allow crashes and timeouts (not recommended)",
)
group.add_argument(
"-C",
dest="crash_only",
action="store_true",
help="keep crashing inputs, reject everything else",
)
group.add_argument(
"-e",
dest="edge_mode",
action="store_true",
default=False,
help="solve for edge coverage only, ignore hit counts",
)
parser.add_argument('exe', metavar='/path/to/target_app')
parser.add_argument('args', nargs='*')
group = parser.add_argument_group("Misc")
group.add_argument(
"-T",
dest="workers",
type=lambda x: cpu_count if x == "all" else int(x),
default=1,
help="number of concurrent worker (default: %(default)d)",
)
group.add_argument(
"--as_queue",
action="store_true",
help='output file name like "id:000000,hash:value"',
)
group.add_argument(
"--no-dedup", action="store_true", help="skip deduplication step for corpus files"
)
group.add_argument("--debug", action="store_true")
parser.add_argument("exe", metavar="/path/to/target_app")
parser.add_argument("args", nargs="*")
args = parser.parse_args()
logger = None
afl_showmap_bin = None
tuple_index_type_code = 'I'
tuple_index_type_code = "I"
file_index_type_code = None
def init():
global logger
log_level = logging.DEBUG if args.debug else logging.INFO
logging.basicConfig(level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s')
logging.basicConfig(
level=log_level, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
if args.stdin_file and args.workers > 1:
logger.error('-f is only supported with one worker (-T 1)')
logger.error("-f is only supported with one worker (-T 1)")
sys.exit(1)
if args.memory_limit != 'none' and args.memory_limit < 5:
logger.error('dangerously low memory limit')
if args.memory_limit != "none" and args.memory_limit < 5:
logger.error("dangerously low memory limit")
sys.exit(1)
if args.time_limit != 'none' and args.time_limit < 10:
logger.error('dangerously low timeout')
if args.time_limit != "none" and args.time_limit < 10:
logger.error("dangerously low timeout")
sys.exit(1)
if not os.path.isfile(args.exe):
logger.error('binary "%s" not found or not regular file', args.exe)
sys.exit(1)
if not os.environ.get('AFL_SKIP_BIN_CHECK') and not any(
[args.qemu_mode, args.frida_mode, args.unicorn_mode, args.nyx_mode]):
if b'__AFL_SHM_ID' not in open(args.exe, 'rb').read():
logger.error("binary '%s' doesn't appear to be instrumented",
args.exe)
if not os.environ.get("AFL_SKIP_BIN_CHECK") and not any(
[args.qemu_mode, args.frida_mode, args.unicorn_mode, args.nyx_mode]
):
if b"__AFL_SHM_ID" not in open(args.exe, "rb").read():
logger.error("binary '%s' doesn't appear to be instrumented", args.exe)
sys.exit(1)
for dn in args.input:
@@ -191,18 +217,18 @@ def init():
os.path.dirname(__file__),
os.getcwd(),
]
if os.environ.get('AFL_PATH'):
searches.append(os.environ['AFL_PATH'])
if os.environ.get("AFL_PATH"):
searches.append(os.environ["AFL_PATH"])
for search in searches:
afl_showmap_bin = shutil.which('afl-showmap', path=search)
afl_showmap_bin = shutil.which("afl-showmap", path=search)
if afl_showmap_bin:
break
if not afl_showmap_bin:
logger.fatal('cannot find afl-showmap, please set AFL_PATH')
logger.fatal("cannot find afl-showmap, please set AFL_PATH")
sys.exit(1)
trace_dir = os.path.join(args.output, '.traces')
trace_dir = os.path.join(args.output, ".traces")
shutil.rmtree(trace_dir, ignore_errors=True)
try:
os.rmdir(args.output)
@@ -210,19 +236,19 @@ def init():
pass
if os.path.exists(args.output):
logger.error(
'directory "%s" exists and is not empty - delete it first',
args.output)
'directory "%s" exists and is not empty - delete it first', args.output
)
sys.exit(1)
if args.crash_dir and not os.path.exists(args.crash_dir):
os.makedirs(args.crash_dir)
os.makedirs(trace_dir)
logger.info('use %d workers (-T)', args.workers)
logger.info("use %d workers (-T)", args.workers)
def detect_type_code(size):
for type_code in ['B', 'H', 'I', 'L', 'Q']:
if 256**array.array(type_code).itemsize > size:
for type_code in ["B", "H", "I", "L", "Q"]:
if 256 ** array.array(type_code).itemsize > size:
return type_code
@@ -238,71 +264,70 @@ def afl_showmap(input_path=None, batch=None, afl_map_size=None, first=False):
# yapf: enable
found_atat = False
for arg in args.args:
if '@@' in arg:
if "@@" in arg:
found_atat = True
if args.stdin_file:
assert args.workers == 1
input_from_file = True
stdin_file = args.stdin_file
cmd += ['-H', stdin_file]
cmd += ["-H", stdin_file]
elif found_atat:
input_from_file = True
stdin_file = os.path.join(args.output, f'.input.{os.getpid()}')
cmd += ['-H', stdin_file]
stdin_file = os.path.join(args.output, f".input.{os.getpid()}")
cmd += ["-H", stdin_file]
else:
input_from_file = False
if batch:
input_from_file = True
filelist = os.path.join(args.output, f'.filelist.{os.getpid()}')
with open(filelist, 'w') as f:
filelist = os.path.join(args.output, f".filelist.{os.getpid()}")
with open(filelist, "w") as f:
for _, path in batch:
f.write(path + '\n')
cmd += ['-I', filelist]
output_path = os.path.join(args.output, f'.showmap.{os.getpid()}')
cmd += ['-o', output_path]
f.write(path + "\n")
cmd += ["-I", filelist]
output_path = os.path.join(args.output, f".showmap.{os.getpid()}")
cmd += ["-o", output_path]
else:
if input_from_file:
shutil.copy(input_path, stdin_file)
cmd += ['-o', '-']
cmd += ["-o", "-"]
if args.frida_mode:
cmd += ['-O']
cmd += ["-O"]
if args.qemu_mode:
cmd += ['-Q']
cmd += ["-Q"]
if args.unicorn_mode:
cmd += ['-U']
cmd += ["-U"]
if args.nyx_mode:
cmd += ['-X']
cmd += ["-X"]
if args.edge_mode:
cmd += ['-e']
cmd += ['--', args.exe] + args.args
cmd += ["-e"]
cmd += ["--", args.exe] + args.args
env = os.environ.copy()
env['AFL_QUIET'] = '1'
env['ASAN_OPTIONS'] = 'detect_leaks=0'
env["AFL_QUIET"] = "1"
env["ASAN_OPTIONS"] = "detect_leaks=0"
if first:
logger.debug('run command line: %s', subprocess.list2cmdline(cmd))
env['AFL_CMIN_ALLOW_ANY'] = '1'
logger.debug("run command line: %s", subprocess.list2cmdline(cmd))
env["AFL_CMIN_ALLOW_ANY"] = "1"
if afl_map_size:
env['AFL_MAP_SIZE'] = str(afl_map_size)
env["AFL_MAP_SIZE"] = str(afl_map_size)
if args.crash_only:
env['AFL_CMIN_CRASHES_ONLY'] = '1'
env["AFL_CMIN_CRASHES_ONLY"] = "1"
if args.allow_any:
env['AFL_CMIN_ALLOW_ANY'] = '1'
env["AFL_CMIN_ALLOW_ANY"] = "1"
if input_from_file:
p = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
env=env,
bufsize=1048576)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, env=env, bufsize=1048576)
else:
p = subprocess.Popen(cmd,
stdin=open(input_path, 'rb'),
stdout=subprocess.PIPE,
env=env,
bufsize=1048576)
p = subprocess.Popen(
cmd,
stdin=open(input_path, "rb"),
stdout=subprocess.PIPE,
env=env,
bufsize=1048576,
)
out = p.stdout.read()
p.wait()
@@ -313,7 +338,7 @@ def afl_showmap(input_path=None, batch=None, afl_map_size=None, first=False):
values = []
try:
trace_file = os.path.join(output_path, basename)
with open(trace_file, 'r') as f:
with open(trace_file, "r") as f:
values = list(map(int, f))
crashed = len(values) == 0
os.unlink(trace_file)
@@ -366,21 +391,22 @@ class Worker(multiprocessing.Process):
def run(self):
map_size = self.afl_map_size or 65536
max_tuple = map_size * 9
max_file_index = 256**array.array(file_index_type_code).itemsize - 1
max_file_index = 256 ** array.array(file_index_type_code).itemsize - 1
m = array.array(file_index_type_code, [max_file_index] * max_tuple)
counter = collections.Counter()
crashes = []
pack_name = os.path.join(args.output, '.traces', f'{self.idx}.pack')
pack_name = os.path.join(args.output, ".traces", f"{self.idx}.pack")
pack_pos = 0
with open(pack_name, 'wb') as trace_pack:
with open(pack_name, "wb") as trace_pack:
while True:
batch = self.q_in.get()
if batch is None:
break
for idx, r, crash in afl_showmap(
batch=batch, afl_map_size=self.afl_map_size):
batch=batch, afl_map_size=self.afl_map_size
):
counter.update(r)
used = False
@@ -419,7 +445,7 @@ class CombineTraceWorker(multiprocessing.Process):
def run(self):
already_have = set()
with open(self.pack_name, 'rb') as f:
with open(self.pack_name, "rb") as f:
for pos, tuple_count in self.jobs:
f.seek(pos)
result = array.array(tuple_index_type_code)
@@ -430,7 +456,7 @@ class CombineTraceWorker(multiprocessing.Process):
def hash_file(path):
m = hashlib.sha1()
with open(path, 'rb') as f:
with open(path, "rb") as f:
m.update(f.read())
return m.digest()
@@ -443,11 +469,14 @@ def dedup(files):
# use large chunksize to reduce multiprocessing overhead
chunksize = max(1, min(256, len(files) // args.workers))
for i, h in enumerate(
tqdm(pool.imap(hash_file, files, chunksize),
desc='dedup',
total=len(files),
ncols=0,
leave=(len(files) > 100000))):
tqdm(
pool.imap(hash_file, files, chunksize),
desc="dedup",
total=len(files),
ncols=0,
leave=(len(files) > 100000),
)
):
if h in seen_hash:
continue
seen_hash.add(h)
@@ -457,8 +486,12 @@ def dedup(files):
def is_afl_dir(dirnames, filenames):
return ('queue' in dirnames and 'hangs' in dirnames
and 'crashes' in dirnames and 'fuzzer_setup' in filenames)
return (
"queue" in dirnames
and "hangs" in dirnames
and "crashes" in dirnames
and "fuzzer_setup" in filenames
)
def collect_files(input_paths):
@@ -467,18 +500,18 @@ def collect_files(input_paths):
paths += glob.glob(s)
files = []
with tqdm(desc='search', unit=' files', ncols=0) as pbar:
with tqdm(desc="search", unit=" files", ncols=0) as pbar:
for path in paths:
for root, dirnames, filenames in os.walk(path, followlinks=True):
for dirname in dirnames:
if dirname.startswith('.'):
if dirname.startswith("."):
dirnames.remove(dirname)
if not args.crash_only and is_afl_dir(dirnames, filenames):
continue
for filename in filenames:
if filename.startswith('.'):
if filename.startswith("."):
continue
pbar.update(1)
files.append(os.path.join(root, filename))
@@ -490,21 +523,20 @@ def main():
files = collect_files(args.input)
if len(files) == 0:
logger.error('no inputs in the target directory - nothing to be done')
logger.error("no inputs in the target directory - nothing to be done")
sys.exit(1)
logger.info('Found %d input files in %d directories', len(files),
len(args.input))
logger.info("Found %d input files in %d directories", len(files), len(args.input))
if not args.no_dedup:
files, hash_list = dedup(files)
logger.info('Remain %d files after dedup', len(files))
logger.info("Remain %d files after dedup", len(files))
else:
logger.info('Skipping file deduplication.')
logger.info("Skipping file deduplication.")
global file_index_type_code
file_index_type_code = detect_type_code(len(files))
logger.info('Sorting files.')
logger.info("Sorting files.")
with multiprocessing.Pool(args.workers) as pool:
chunksize = max(1, min(512, len(files) // args.workers))
size_list = list(pool.map(os.path.getsize, files, chunksize))
@@ -513,24 +545,22 @@ def main():
hash_list = [hash_list[idx] for idx in idxes]
afl_map_size = None
if b'AFL_DUMP_MAP_SIZE' in open(args.exe, 'rb').read():
output = subprocess.run([args.exe],
capture_output=True,
env={
'AFL_DUMP_MAP_SIZE': '1'
}).stdout
if b"AFL_DUMP_MAP_SIZE" in open(args.exe, "rb").read():
output = subprocess.run(
[args.exe], capture_output=True, env={"AFL_DUMP_MAP_SIZE": "1"}
).stdout
afl_map_size = int(output)
logger.info('Setting AFL_MAP_SIZE=%d', afl_map_size)
logger.info("Setting AFL_MAP_SIZE=%d", afl_map_size)
global tuple_index_type_code
tuple_index_type_code = detect_type_code(afl_map_size * 9)
logger.info('Testing the target binary')
logger.info("Testing the target binary")
tuples, _ = afl_showmap(files[0], afl_map_size=afl_map_size, first=True)
if tuples:
logger.info('ok, %d tuples recorded', len(tuples))
logger.info("ok, %d tuples recorded", len(tuples))
else:
logger.error('no instrumentation output detected')
logger.error("no instrumentation output detected")
sys.exit(1)
job_queue = multiprocessing.Queue()
@@ -550,7 +580,7 @@ def main():
dispatcher = JobDispatcher(job_queue, jobs)
dispatcher.start()
logger.info('Processing traces')
logger.info("Processing traces")
effective = 0
trace_info = {}
for _ in tqdm(files, ncols=0, smoothing=0.01):
@@ -561,7 +591,7 @@ def main():
effective += 1
dispatcher.join()
logger.info('Obtaining trace results')
logger.info("Obtaining trace results")
ms = []
crashes = []
counter = collections.Counter()
@@ -574,27 +604,38 @@ def main():
best_idxes = list(map(min, zip(*ms)))
if not args.crash_dir:
logger.info('Found %d unique tuples across %d files (%d effective)',
len(counter), len(files), effective)
logger.info(
"Found %d unique tuples across %d files (%d effective)",
len(counter),
len(files),
effective,
)
else:
logger.info(
'Found %d unique tuples across %d files (%d effective, %d crashes)',
len(counter), len(files), effective, len(crashes))
"Found %d unique tuples across %d files (%d effective, %d crashes)",
len(counter),
len(files),
effective,
len(crashes),
)
all_unique = counter.most_common()
logger.info('Processing candidates and writing output')
logger.info("Processing candidates and writing output")
already_have = set()
count = 0
def save_file(idx):
input_path = files[idx]
fn = (base64.b16encode(hash_list[idx]).decode('utf8').lower()
if not args.no_dedup else os.path.basename(input_path))
fn = (
base64.b16encode(hash_list[idx]).decode("utf8").lower()
if not args.no_dedup
else os.path.basename(input_path)
)
if args.as_queue:
if args.no_dedup:
fn = 'id:%06d,orig:%s' % (count, fn)
fn = "id:%06d,orig:%s" % (count, fn)
else:
fn = 'id:%06d,hash:%s' % (count, fn)
fn = "id:%06d,hash:%s" % (count, fn)
output_path = os.path.join(args.output, fn)
try:
os.link(input_path, output_path)
@@ -620,8 +661,8 @@ def main():
trace_packs = []
workers = []
for i in range(args.workers):
pack_name = os.path.join(args.output, '.traces', f'{i}.pack')
trace_f = open(pack_name, 'rb')
pack_name = os.path.join(args.output, ".traces", f"{i}.pack")
trace_f = open(pack_name, "rb")
trace_packs.append(trace_f)
p = CombineTraceWorker(pack_name, jobs[i], result_queue)
@@ -652,7 +693,7 @@ def main():
f.close()
if args.crash_dir:
logger.info('Saving crashes to %s', args.crash_dir)
logger.info("Saving crashes to %s", args.crash_dir)
crash_files = [files[c] for c in crashes]
if args.no_dedup:
@@ -661,7 +702,7 @@ def main():
crash_files, hash_list = dedup(crash_files)
for idx, crash_path in enumerate(crash_files):
fn = base64.b16encode(hash_list[idx]).decode('utf8').lower()
fn = base64.b16encode(hash_list[idx]).decode("utf8").lower()
output_path = os.path.join(args.crash_dir, fn)
try:
os.link(crash_path, output_path)
@@ -675,13 +716,13 @@ def main():
pass
if count == 1:
logger.warning('all test cases had the same traces, check syntax!')
logger.warning("all test cases had the same traces, check syntax!")
logger.info('narrowed down to %s files, saved in "%s"', count, args.output)
if not os.environ.get('AFL_KEEP_TRACES'):
logger.info('Deleting trace files')
trace_dir = os.path.join(args.output, '.traces')
if not os.environ.get("AFL_KEEP_TRACES"):
logger.info("Deleting trace files")
trace_dir = os.path.join(args.output, ".traces")
shutil.rmtree(trace_dir, ignore_errors=True)
if __name__ == '__main__':
if __name__ == "__main__":
main()