diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py b/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py
index 8c8f9641..1ac4c9f3 100644
--- a/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py
+++ b/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py
@@ -45,30 +45,31 @@ MAX_SEG_SIZE = 128 * 1024 * 1024
INDEX_FILE_NAME = "_index.json"
-#----------------------
-#---- Helper Functions
+# ----------------------
+# ---- Helper Functions
+
def map_arch():
- arch = get_arch() # from GEF
- if 'x86_64' in arch or 'x86-64' in arch:
+ arch = get_arch() # from GEF
+ if "x86_64" in arch or "x86-64" in arch:
return "x64"
- elif 'x86' in arch or 'i386' in arch:
+ elif "x86" in arch or "i386" in arch:
return "x86"
- elif 'aarch64' in arch or 'arm64' in arch:
+ elif "aarch64" in arch or "arm64" in arch:
return "arm64le"
- elif 'aarch64_be' in arch:
+ elif "aarch64_be" in arch:
return "arm64be"
- elif 'armeb' in arch:
+ elif "armeb" in arch:
# check for THUMB mode
- cpsr = get_register('$cpsr')
- if (cpsr & (1 << 5)):
+ cpsr = get_register("$cpsr")
+ if cpsr & (1 << 5):
return "armbethumb"
else:
return "armbe"
- elif 'arm' in arch:
+ elif "arm" in arch:
# check for THUMB mode
- cpsr = get_register('$cpsr')
- if (cpsr & (1 << 5)):
+ cpsr = get_register("$cpsr")
+ if cpsr & (1 << 5):
return "armlethumb"
else:
return "armle"
@@ -76,8 +77,9 @@ def map_arch():
return ""
-#-----------------------
-#---- Dumping functions
+# -----------------------
+# ---- Dumping functions
+
def dump_arch_info():
arch_info = {}
@@ -89,7 +91,7 @@ def dump_regs():
reg_state = {}
for reg in current_arch.all_registers:
reg_val = get_register(reg)
- reg_state[reg.strip().strip('$')] = reg_val
+ reg_state[reg.strip().strip("$")] = reg_val
return reg_state
@@ -108,47 +110,76 @@ def dump_process_memory(output_dir):
if entry.page_start == entry.page_end:
continue
- seg_info = {'start': entry.page_start, 'end': entry.page_end, 'name': entry.path, 'permissions': {
- "r": entry.is_readable() > 0,
- "w": entry.is_writable() > 0,
- "x": entry.is_executable() > 0
- }, 'content_file': ''}
+ seg_info = {
+ "start": entry.page_start,
+ "end": entry.page_end,
+ "name": entry.path,
+ "permissions": {
+ "r": entry.is_readable() > 0,
+ "w": entry.is_writable() > 0,
+ "x": entry.is_executable() > 0,
+ },
+ "content_file": "",
+ }
# "(deleted)" may or may not be valid, but don't push it.
- if entry.is_readable() and not '(deleted)' in entry.path:
+        if entry.is_readable() and "(deleted)" not in entry.path:
try:
# Compress and dump the content to a file
seg_content = read_memory(entry.page_start, entry.size)
- if(seg_content == None):
- print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(entry.page_start, entry.path))
+            if seg_content is None:
+ print(
+ "Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(
+ entry.page_start, entry.path
+ )
+ )
else:
- print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(entry.page_start, len(seg_content), entry.path, repr(seg_info['permissions'])))
+ print(
+ "Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(
+ entry.page_start,
+ len(seg_content),
+ entry.path,
+ repr(seg_info["permissions"]),
+ )
+ )
compressed_seg_content = zlib.compress(seg_content)
md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin"
seg_info["content_file"] = md5_sum
# Write the compressed contents to disk
- out_file = open(os.path.join(output_dir, md5_sum), 'wb')
+ out_file = open(os.path.join(output_dir, md5_sum), "wb")
out_file.write(compressed_seg_content)
out_file.close()
except:
- print("Exception reading segment ({}): {}".format(entry.path, sys.exc_info()[0]))
+ print(
+ "Exception reading segment ({}): {}".format(
+ entry.path, sys.exc_info()[0]
+ )
+ )
else:
- print("Skipping segment {0}@0x{1:016x}".format(entry.path, entry.page_start))
+ print(
+ "Skipping segment {0}@0x{1:016x}".format(entry.path, entry.page_start)
+ )
# Add the segment to the list
final_segment_list.append(seg_info)
-
return final_segment_list
-#---------------------------------------------
-#---- ARM Extention (dump floating point regs)
+
+# ---------------------------------------------
+# ---- ARM Extension (dump floating point regs)
+
def dump_float(rge=32):
reg_convert = ""
- if map_arch() == "armbe" or map_arch() == "armle" or map_arch() == "armbethumb" or map_arch() == "armbethumb":
+ if (
+ map_arch() == "armbe"
+ or map_arch() == "armle"
+ or map_arch() == "armbethumb"
+        or map_arch() == "armlethumb"
+ ):
reg_state = {}
for reg_num in range(32):
value = gdb.selected_frame().read_register("d" + str(reg_num))
@@ -158,8 +189,10 @@ def dump_float(rge=32):
return reg_state
-#----------
-#---- Main
+
+# ----------
+# ---- Main
+
def main():
print("----- Unicorn Context Dumper -----")
@@ -175,7 +208,9 @@ def main():
try:
# Create the output directory
- timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S')
+ timestamp = datetime.datetime.fromtimestamp(time.time()).strftime(
+ "%Y%m%d_%H%M%S"
+ )
output_path = "UnicornContext_" + timestamp
if not os.path.exists(output_path):
os.makedirs(output_path)
@@ -190,7 +225,7 @@ def main():
}
# Write the index file
- index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w')
+ index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w")
index_file.write(json.dumps(context, indent=4))
index_file.close()
print("Done.")
@@ -198,5 +233,6 @@ def main():
except Exception as e:
print("!!! ERROR:\n\t{}".format(repr(e)))
+
if __name__ == "__main__":
main()
diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_ida.py b/unicorn_mode/helper_scripts/unicorn_dumper_ida.py
index 3f955a5c..fa29fb90 100644
--- a/unicorn_mode/helper_scripts/unicorn_dumper_ida.py
+++ b/unicorn_mode/helper_scripts/unicorn_dumper_ida.py
@@ -31,8 +31,9 @@ MAX_SEG_SIZE = 128 * 1024 * 1024
# Name of the index file
INDEX_FILE_NAME = "_index.json"
-#----------------------
-#---- Helper Functions
+# ----------------------
+# ---- Helper Functions
+
def get_arch():
if ph.id == PLFM_386 and ph.flag & PR_USE64:
@@ -52,6 +53,7 @@ def get_arch():
else:
return ""
+
def get_register_list(arch):
if arch == "arm64le" or arch == "arm64be":
arch = "arm64"
@@ -59,84 +61,174 @@ def get_register_list(arch):
arch = "arm"
registers = {
- "x64" : [
- "rax", "rbx", "rcx", "rdx", "rsi", "rdi", "rbp", "rsp",
- "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15",
- "rip", "rsp", "efl",
- "cs", "ds", "es", "fs", "gs", "ss",
+ "x64": [
+ "rax",
+ "rbx",
+ "rcx",
+ "rdx",
+ "rsi",
+ "rdi",
+ "rbp",
+ "rsp",
+ "r8",
+ "r9",
+ "r10",
+ "r11",
+ "r12",
+ "r13",
+ "r14",
+ "r15",
+ "rip",
+ "rsp",
+ "efl",
+ "cs",
+ "ds",
+ "es",
+ "fs",
+ "gs",
+ "ss",
],
- "x86" : [
- "eax", "ebx", "ecx", "edx", "esi", "edi", "ebp", "esp",
- "eip", "esp", "efl",
- "cs", "ds", "es", "fs", "gs", "ss",
- ],
- "arm" : [
- "R0", "R1", "R2", "R3", "R4", "R5", "R6", "R7",
- "R8", "R9", "R10", "R11", "R12", "PC", "SP", "LR",
+ "x86": [
+ "eax",
+ "ebx",
+ "ecx",
+ "edx",
+ "esi",
+ "edi",
+ "ebp",
+ "esp",
+ "eip",
+ "esp",
+ "efl",
+ "cs",
+ "ds",
+ "es",
+ "fs",
+ "gs",
+ "ss",
+ ],
+ "arm": [
+ "R0",
+ "R1",
+ "R2",
+ "R3",
+ "R4",
+ "R5",
+ "R6",
+ "R7",
+ "R8",
+ "R9",
+ "R10",
+ "R11",
+ "R12",
+ "PC",
+ "SP",
+ "LR",
"PSR",
],
- "arm64" : [
- "X0", "X1", "X2", "X3", "X4", "X5", "X6", "X7",
- "X8", "X9", "X10", "X11", "X12", "X13", "X14",
- "X15", "X16", "X17", "X18", "X19", "X20", "X21",
- "X22", "X23", "X24", "X25", "X26", "X27", "X28",
- "PC", "SP", "FP", "LR", "CPSR"
+ "arm64": [
+ "X0",
+ "X1",
+ "X2",
+ "X3",
+ "X4",
+ "X5",
+ "X6",
+ "X7",
+ "X8",
+ "X9",
+ "X10",
+ "X11",
+ "X12",
+ "X13",
+ "X14",
+ "X15",
+ "X16",
+ "X17",
+ "X18",
+ "X19",
+ "X20",
+ "X21",
+ "X22",
+ "X23",
+ "X24",
+ "X25",
+ "X26",
+ "X27",
+ "X28",
+ "PC",
+ "SP",
+ "FP",
+ "LR",
+ "CPSR"
# "NZCV",
- ]
+ ],
}
- return registers[arch]
+ return registers[arch]
+
+
+# -----------------------
+# ---- Dumping functions
-#-----------------------
-#---- Dumping functions
def dump_arch_info():
arch_info = {}
arch_info["arch"] = get_arch()
return arch_info
+
def dump_regs():
reg_state = {}
for reg in get_register_list(get_arch()):
reg_state[reg] = GetRegValue(reg)
return reg_state
+
def dump_process_memory(output_dir):
# Segment information dictionary
segment_list = []
-
+
# Loop over the segments, fill in the info dictionary
for seg_ea in Segments():
seg_start = SegStart(seg_ea)
seg_end = SegEnd(seg_ea)
seg_size = seg_end - seg_start
-
+
seg_info = {}
- seg_info["name"] = SegName(seg_ea)
+ seg_info["name"] = SegName(seg_ea)
seg_info["start"] = seg_start
- seg_info["end"] = seg_end
-
+ seg_info["end"] = seg_end
+
perms = getseg(seg_ea).perm
seg_info["permissions"] = {
- "r": False if (perms & SEGPERM_READ) == 0 else True,
+ "r": False if (perms & SEGPERM_READ) == 0 else True,
"w": False if (perms & SEGPERM_WRITE) == 0 else True,
- "x": False if (perms & SEGPERM_EXEC) == 0 else True,
+ "x": False if (perms & SEGPERM_EXEC) == 0 else True,
}
if (perms & SEGPERM_READ) and seg_size <= MAX_SEG_SIZE and isLoaded(seg_start):
try:
# Compress and dump the content to a file
seg_content = get_many_bytes(seg_start, seg_end - seg_start)
- if(seg_content == None):
- print("Segment empty: {0}@0x{1:016x} (size:UNKNOWN)".format(SegName(seg_ea), seg_ea))
+            if seg_content is None:
+ print(
+ "Segment empty: {0}@0x{1:016x} (size:UNKNOWN)".format(
+ SegName(seg_ea), seg_ea
+ )
+ )
seg_info["content_file"] = ""
else:
- print("Dumping segment {0}@0x{1:016x} (size:{2})".format(SegName(seg_ea), seg_ea, len(seg_content)))
+ print(
+ "Dumping segment {0}@0x{1:016x} (size:{2})".format(
+ SegName(seg_ea), seg_ea, len(seg_content)
+ )
+ )
compressed_seg_content = zlib.compress(seg_content)
md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin"
seg_info["content_file"] = md5_sum
-
+
# Write the compressed contents to disk
- out_file = open(os.path.join(output_dir, md5_sum), 'wb')
+ out_file = open(os.path.join(output_dir, md5_sum), "wb")
out_file.write(compressed_seg_content)
out_file.close()
except:
@@ -145,12 +237,13 @@ def dump_process_memory(output_dir):
else:
print("Skipping segment {0}@0x{1:016x}".format(SegName(seg_ea), seg_ea))
seg_info["content_file"] = ""
-
+
# Add the segment to the list
- segment_list.append(seg_info)
-
+ segment_list.append(seg_info)
+
return segment_list
+
"""
TODO: FINISH IMPORT DUMPING
def import_callback(ea, name, ord):
@@ -169,41 +262,47 @@ def dump_imports():
return import_dict
"""
-
-#----------
-#---- Main
-
+
+# ----------
+# ---- Main
+
+
def main():
try:
print("----- Unicorn Context Dumper -----")
print("You must be actively debugging before running this!")
- print("If it fails, double check that you are actively debugging before running.")
+ print(
+ "If it fails, double check that you are actively debugging before running."
+ )
# Create the output directory
- timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S')
+ timestamp = datetime.datetime.fromtimestamp(time.time()).strftime(
+ "%Y%m%d_%H%M%S"
+ )
output_path = os.path.dirname(os.path.abspath(GetIdbPath()))
output_path = os.path.join(output_path, "UnicornContext_" + timestamp)
if not os.path.exists(output_path):
os.makedirs(output_path)
print("Process context will be output to {}".format(output_path))
-
+
# Get the context
context = {
"arch": dump_arch_info(),
- "regs": dump_regs(),
+ "regs": dump_regs(),
"segments": dump_process_memory(output_path),
- #"imports": dump_imports(),
+ # "imports": dump_imports(),
}
# Write the index file
- index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w')
+ index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w")
index_file.write(json.dumps(context, indent=4))
- index_file.close()
+ index_file.close()
print("Done.")
-
+
except Exception, e:
print("!!! ERROR:\n\t{}".format(str(e)))
-
+
+
if __name__ == "__main__":
- main()
\ No newline at end of file
+ main()
diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_lldb.py b/unicorn_mode/helper_scripts/unicorn_dumper_lldb.py
index 3c019d77..179d062a 100644
--- a/unicorn_mode/helper_scripts/unicorn_dumper_lldb.py
+++ b/unicorn_mode/helper_scripts/unicorn_dumper_lldb.py
@@ -50,10 +50,11 @@ UNICORN_PAGE_SIZE = 0x1000
# Alignment functions to align all memory segments to Unicorn page boundaries (4KB pages only)
ALIGN_PAGE_DOWN = lambda x: x & ~(UNICORN_PAGE_SIZE - 1)
-ALIGN_PAGE_UP = lambda x: (x + UNICORN_PAGE_SIZE - 1) & ~(UNICORN_PAGE_SIZE-1)
+ALIGN_PAGE_UP = lambda x: (x + UNICORN_PAGE_SIZE - 1) & ~(UNICORN_PAGE_SIZE - 1)
+
+# ----------------------
+# ---- Helper Functions
-#----------------------
-#---- Helper Functions
def overlap_alignments(segments, memory):
final_list = []
@@ -61,33 +62,40 @@ def overlap_alignments(segments, memory):
curr_end_addr = 0
curr_node = None
current_segment = None
- sorted_segments = sorted(segments, key=lambda k: (k['start'], k['end']))
+ sorted_segments = sorted(segments, key=lambda k: (k["start"], k["end"]))
if curr_seg_idx < len(sorted_segments):
current_segment = sorted_segments[curr_seg_idx]
- for mem in sorted(memory, key=lambda k: (k['start'], -k['end'])):
+ for mem in sorted(memory, key=lambda k: (k["start"], -k["end"])):
if curr_node is None:
- if current_segment is not None and current_segment['start'] == mem['start']:
+ if current_segment is not None and current_segment["start"] == mem["start"]:
curr_node = deepcopy(current_segment)
- curr_node['permissions'] = mem['permissions']
+ curr_node["permissions"] = mem["permissions"]
else:
curr_node = deepcopy(mem)
- curr_end_addr = curr_node['end']
+ curr_end_addr = curr_node["end"]
- while curr_end_addr <= mem['end']:
- if curr_node['end'] == mem['end']:
- if current_segment is not None and current_segment['start'] > curr_node['start'] and current_segment['start'] < curr_node['end']:
- curr_node['end'] = current_segment['start']
- if(curr_node['end'] > curr_node['start']):
+ while curr_end_addr <= mem["end"]:
+ if curr_node["end"] == mem["end"]:
+ if (
+ current_segment is not None
+ and current_segment["start"] > curr_node["start"]
+ and current_segment["start"] < curr_node["end"]
+ ):
+ curr_node["end"] = current_segment["start"]
+ if curr_node["end"] > curr_node["start"]:
final_list.append(curr_node)
curr_node = deepcopy(current_segment)
- curr_node['permissions'] = mem['permissions']
- curr_end_addr = curr_node['end']
+ curr_node["permissions"] = mem["permissions"]
+ curr_end_addr = curr_node["end"]
else:
- if(curr_node['end'] > curr_node['start']):
+ if curr_node["end"] > curr_node["start"]:
final_list.append(curr_node)
# if curr_node is a segment
- if current_segment is not None and current_segment['end'] == mem['end']:
+ if (
+ current_segment is not None
+ and current_segment["end"] == mem["end"]
+ ):
curr_seg_idx += 1
if curr_seg_idx < len(sorted_segments):
current_segment = sorted_segments[curr_seg_idx]
@@ -98,50 +106,56 @@ def overlap_alignments(segments, memory):
break
# could only be a segment
else:
- if curr_node['end'] < mem['end']:
+ if curr_node["end"] < mem["end"]:
# check for remaining segments and valid segments
- if(curr_node['end'] > curr_node['start']):
+ if curr_node["end"] > curr_node["start"]:
final_list.append(curr_node)
-
+
curr_seg_idx += 1
if curr_seg_idx < len(sorted_segments):
current_segment = sorted_segments[curr_seg_idx]
else:
current_segment = None
-
- if current_segment is not None and current_segment['start'] <= curr_end_addr and current_segment['start'] < mem['end']:
+
+ if (
+ current_segment is not None
+ and current_segment["start"] <= curr_end_addr
+ and current_segment["start"] < mem["end"]
+ ):
curr_node = deepcopy(current_segment)
- curr_node['permissions'] = mem['permissions']
+ curr_node["permissions"] = mem["permissions"]
else:
# no more segments
curr_node = deepcopy(mem)
-
- curr_node['start'] = curr_end_addr
- curr_end_addr = curr_node['end']
- return final_list
+ curr_node["start"] = curr_end_addr
+ curr_end_addr = curr_node["end"]
+
+ return final_list
+
# https://github.com/llvm-mirror/llvm/blob/master/include/llvm/ADT/Triple.h
def get_arch():
- arch, arch_vendor, arch_os = lldb.target.GetTriple().split('-')
- if arch == 'x86_64':
+ arch, arch_vendor, arch_os = lldb.target.GetTriple().split("-")
+ if arch == "x86_64":
return "x64"
- elif arch == 'x86' or arch == 'i386':
+ elif arch == "x86" or arch == "i386":
return "x86"
- elif arch == 'aarch64' or arch == 'arm64':
+ elif arch == "aarch64" or arch == "arm64":
return "arm64le"
- elif arch == 'aarch64_be':
+ elif arch == "aarch64_be":
return "arm64be"
- elif arch == 'armeb':
+ elif arch == "armeb":
return "armbe"
- elif arch == 'arm':
+ elif arch == "arm":
return "armle"
else:
return ""
-#-----------------------
-#---- Dumping functions
+# -----------------------
+# ---- Dumping functions
+
def dump_arch_info():
arch_info = {}
@@ -152,56 +166,64 @@ def dump_arch_info():
def dump_regs():
reg_state = {}
for reg_list in lldb.frame.GetRegisters():
- if 'general purpose registers' in reg_list.GetName().lower():
+ if "general purpose registers" in reg_list.GetName().lower():
for reg in reg_list:
reg_state[reg.GetName()] = int(reg.GetValue(), 16)
return reg_state
+
def get_section_info(sec):
- name = sec.name if sec.name is not None else ''
+ name = sec.name if sec.name is not None else ""
if sec.GetParent().name is not None:
- name = sec.GetParent().name + '.' + sec.name
+ name = sec.GetParent().name + "." + sec.name
module_name = sec.addr.module.file.GetFilename()
- module_name = module_name if module_name is not None else ''
- long_name = module_name + '.' + name
-
+ module_name = module_name if module_name is not None else ""
+ long_name = module_name + "." + name
+
return sec.addr.load_addr, (sec.addr.load_addr + sec.size), sec.size, long_name
-
+
def dump_process_memory(output_dir):
# Segment information dictionary
raw_segment_list = []
raw_memory_list = []
-
+
# 1st pass:
# Loop over the segments, fill in the segment info dictionary
for module in lldb.target.module_iter():
for seg_ea in module.section_iter():
- seg_info = {'module': module.file.GetFilename() }
- seg_info['start'], seg_info['end'], seg_size, seg_info['name'] = get_section_info(seg_ea)
+ seg_info = {"module": module.file.GetFilename()}
+ (
+ seg_info["start"],
+ seg_info["end"],
+ seg_size,
+ seg_info["name"],
+ ) = get_section_info(seg_ea)
# TODO: Ugly hack for -1 LONG address on 32-bit
- if seg_info['start'] >= sys.maxint or seg_size <= 0:
- print "Throwing away page: {}".format(seg_info['name'])
+ if seg_info["start"] >= sys.maxint or seg_size <= 0:
+ print "Throwing away page: {}".format(seg_info["name"])
continue
# Page-align segment
- seg_info['start'] = ALIGN_PAGE_DOWN(seg_info['start'])
- seg_info['end'] = ALIGN_PAGE_UP(seg_info['end'])
- print("Appending: {}".format(seg_info['name']))
+ seg_info["start"] = ALIGN_PAGE_DOWN(seg_info["start"])
+ seg_info["end"] = ALIGN_PAGE_UP(seg_info["end"])
+ print ("Appending: {}".format(seg_info["name"]))
raw_segment_list.append(seg_info)
# Add the stack memory region (just hardcode 0x1000 around the current SP)
sp = lldb.frame.GetSP()
start_sp = ALIGN_PAGE_DOWN(sp)
- raw_segment_list.append({'start': start_sp, 'end': start_sp + 0x1000, 'name': 'STACK'})
+ raw_segment_list.append(
+ {"start": start_sp, "end": start_sp + 0x1000, "name": "STACK"}
+ )
# Write the original memory to file for debugging
- index_file = open(os.path.join(output_dir, DEBUG_MEM_FILE_NAME), 'w')
+ index_file = open(os.path.join(output_dir, DEBUG_MEM_FILE_NAME), "w")
index_file.write(json.dumps(raw_segment_list, indent=4))
- index_file.close()
+ index_file.close()
- # Loop over raw memory regions
+ # Loop over raw memory regions
mem_info = lldb.SBMemoryRegionInfo()
start_addr = -1
next_region_addr = 0
@@ -218,15 +240,20 @@ def dump_process_memory(output_dir):
end_addr = mem_info.GetRegionEnd()
# Unknown region name
- region_name = 'UNKNOWN'
+ region_name = "UNKNOWN"
# Ignore regions that aren't even mapped
if mem_info.IsMapped() and mem_info.IsReadable():
- mem_info_obj = {'start': start_addr, 'end': end_addr, 'name': region_name, 'permissions': {
- "r": mem_info.IsReadable(),
- "w": mem_info.IsWritable(),
- "x": mem_info.IsExecutable()
- }}
+ mem_info_obj = {
+ "start": start_addr,
+ "end": end_addr,
+ "name": region_name,
+ "permissions": {
+ "r": mem_info.IsReadable(),
+ "w": mem_info.IsWritable(),
+ "x": mem_info.IsExecutable(),
+ },
+ }
raw_memory_list.append(mem_info_obj)
@@ -234,65 +261,89 @@ def dump_process_memory(output_dir):
for seg_info in final_segment_list:
try:
- seg_info['content_file'] = ''
- start_addr = seg_info['start']
- end_addr = seg_info['end']
- region_name = seg_info['name']
+ seg_info["content_file"] = ""
+ start_addr = seg_info["start"]
+ end_addr = seg_info["end"]
+ region_name = seg_info["name"]
# Compress and dump the content to a file
err = lldb.SBError()
- seg_content = lldb.process.ReadMemory(start_addr, end_addr - start_addr, err)
- if(seg_content == None):
- print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(start_addr, region_name))
- seg_info['content_file'] = ''
+ seg_content = lldb.process.ReadMemory(
+ start_addr, end_addr - start_addr, err
+ )
+            if seg_content is None:
+ print (
+ "Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(
+ start_addr, region_name
+ )
+ )
+ seg_info["content_file"] = ""
else:
- print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(start_addr, len(seg_content), region_name, repr(seg_info['permissions'])))
+ print (
+ "Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(
+ start_addr,
+ len(seg_content),
+ region_name,
+ repr(seg_info["permissions"]),
+ )
+ )
compressed_seg_content = zlib.compress(seg_content)
md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin"
- seg_info['content_file'] = md5_sum
-
+ seg_info["content_file"] = md5_sum
+
# Write the compressed contents to disk
- out_file = open(os.path.join(output_dir, md5_sum), 'wb')
+ out_file = open(os.path.join(output_dir, md5_sum), "wb")
out_file.write(compressed_seg_content)
out_file.close()
-
+
except:
- print("Exception reading segment ({}): {}".format(region_name, sys.exc_info()[0]))
-
+ print (
+ "Exception reading segment ({}): {}".format(
+ region_name, sys.exc_info()[0]
+ )
+ )
+
return final_segment_list
-#----------
-#---- Main
-
+
+# ----------
+# ---- Main
+
+
def main():
try:
- print("----- Unicorn Context Dumper -----")
- print("You must be actively debugging before running this!")
- print("If it fails, double check that you are actively debugging before running.")
-
+ print ("----- Unicorn Context Dumper -----")
+ print ("You must be actively debugging before running this!")
+ print (
+ "If it fails, double check that you are actively debugging before running."
+ )
+
# Create the output directory
- timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S')
+ timestamp = datetime.datetime.fromtimestamp(time.time()).strftime(
+ "%Y%m%d_%H%M%S"
+ )
output_path = "UnicornContext_" + timestamp
if not os.path.exists(output_path):
os.makedirs(output_path)
- print("Process context will be output to {}".format(output_path))
-
+ print ("Process context will be output to {}".format(output_path))
+
# Get the context
context = {
"arch": dump_arch_info(),
- "regs": dump_regs(),
+ "regs": dump_regs(),
"segments": dump_process_memory(output_path),
}
-
+
# Write the index file
- index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w')
+ index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w")
index_file.write(json.dumps(context, indent=4))
- index_file.close()
- print("Done.")
-
+ index_file.close()
+ print ("Done.")
+
except Exception, e:
- print("!!! ERROR:\n\t{}".format(repr(e)))
-
+ print ("!!! ERROR:\n\t{}".format(repr(e)))
+
+
if __name__ == "__main__":
main()
elif lldb.debugger:
diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py b/unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py
index dc56b2aa..eccbc8bf 100644
--- a/unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py
+++ b/unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py
@@ -59,45 +59,47 @@ MAX_SEG_SIZE = 128 * 1024 * 1024
# Name of the index file
INDEX_FILE_NAME = "_index.json"
-#----------------------
-#---- Helper Functions
+# ----------------------
+# ---- Helper Functions
+
def map_arch():
- arch = pwndbg.arch.current # from PWNDBG
- if 'x86_64' in arch or 'x86-64' in arch:
+ arch = pwndbg.arch.current # from PWNDBG
+ if "x86_64" in arch or "x86-64" in arch:
return "x64"
- elif 'x86' in arch or 'i386' in arch:
+ elif "x86" in arch or "i386" in arch:
return "x86"
- elif 'aarch64' in arch or 'arm64' in arch:
+ elif "aarch64" in arch or "arm64" in arch:
return "arm64le"
- elif 'aarch64_be' in arch:
+ elif "aarch64_be" in arch:
return "arm64be"
- elif 'arm' in arch:
- cpsr = pwndbg.regs['cpsr']
- # check endianess
- if pwndbg.arch.endian == 'big':
+ elif "arm" in arch:
+ cpsr = pwndbg.regs["cpsr"]
+        # check endianness
+ if pwndbg.arch.endian == "big":
# check for THUMB mode
- if (cpsr & (1 << 5)):
+ if cpsr & (1 << 5):
return "armbethumb"
else:
return "armbe"
else:
# check for THUMB mode
- if (cpsr & (1 << 5)):
+ if cpsr & (1 << 5):
return "armlethumb"
else:
return "armle"
- elif 'mips' in arch:
- if pwndbg.arch.endian == 'little':
- return 'mipsel'
+ elif "mips" in arch:
+ if pwndbg.arch.endian == "little":
+ return "mipsel"
else:
- return 'mips'
+ return "mips"
else:
return ""
-#-----------------------
-#---- Dumping functions
+# -----------------------
+# ---- Dumping functions
+
def dump_arch_info():
arch_info = {}
@@ -110,26 +112,26 @@ def dump_regs():
for reg in pwndbg.regs.all:
reg_val = pwndbg.regs[reg]
# current dumper script looks for register values to be hex strings
-# reg_str = "0x{:08x}".format(reg_val)
-# if "64" in get_arch():
-# reg_str = "0x{:016x}".format(reg_val)
-# reg_state[reg.strip().strip('$')] = reg_str
- reg_state[reg.strip().strip('$')] = reg_val
+ # reg_str = "0x{:08x}".format(reg_val)
+ # if "64" in get_arch():
+ # reg_str = "0x{:016x}".format(reg_val)
+ # reg_state[reg.strip().strip('$')] = reg_str
+ reg_state[reg.strip().strip("$")] = reg_val
return reg_state
def dump_process_memory(output_dir):
# Segment information dictionary
final_segment_list = []
-
+
# PWNDBG:
vmmap = pwndbg.vmmap.get()
-
+
# Pointer to end of last dumped memory segment
- segment_last_addr = 0x0;
+ segment_last_addr = 0x0
start = None
- end = None
+ end = None
if not vmmap:
print("No address mapping information found")
@@ -141,86 +143,107 @@ def dump_process_memory(output_dir):
continue
start = entry.start
- end = entry.end
+ end = entry.end
- if (segment_last_addr > entry.start): # indicates overlap
- if (segment_last_addr > entry.end): # indicates complete overlap, so we skip the segment entirely
+ if segment_last_addr > entry.start: # indicates overlap
+ if (
+ segment_last_addr > entry.end
+ ): # indicates complete overlap, so we skip the segment entirely
continue
- else:
+ else:
start = segment_last_addr
-
-
- seg_info = {'start': start, 'end': end, 'name': entry.objfile, 'permissions': {
- "r": entry.read,
- "w": entry.write,
- "x": entry.execute
- }, 'content_file': ''}
+
+ seg_info = {
+ "start": start,
+ "end": end,
+ "name": entry.objfile,
+ "permissions": {"r": entry.read, "w": entry.write, "x": entry.execute},
+ "content_file": "",
+ }
# "(deleted)" may or may not be valid, but don't push it.
- if entry.read and not '(deleted)' in entry.objfile:
+        if entry.read and "(deleted)" not in entry.objfile:
try:
# Compress and dump the content to a file
seg_content = pwndbg.memory.read(start, end - start)
- if(seg_content == None):
- print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(entry.start, entry.objfile))
+                if seg_content is None:
+ print(
+ "Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(
+ entry.start, entry.objfile
+ )
+ )
else:
- print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(entry.start, len(seg_content), entry.objfile, repr(seg_info['permissions'])))
+ print(
+ "Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(
+ entry.start,
+ len(seg_content),
+ entry.objfile,
+ repr(seg_info["permissions"]),
+ )
+ )
compressed_seg_content = zlib.compress(str(seg_content))
md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin"
seg_info["content_file"] = md5_sum
-
+
# Write the compressed contents to disk
- out_file = open(os.path.join(output_dir, md5_sum), 'wb')
+ out_file = open(os.path.join(output_dir, md5_sum), "wb")
out_file.write(compressed_seg_content)
out_file.close()
except Exception as e:
traceback.print_exc()
- print("Exception reading segment ({}): {}".format(entry.objfile, sys.exc_info()[0]))
+ print(
+ "Exception reading segment ({}): {}".format(
+ entry.objfile, sys.exc_info()[0]
+ )
+ )
else:
print("Skipping segment {0}@0x{1:016x}".format(entry.objfile, entry.start))
-
+
segment_last_addr = end
# Add the segment to the list
final_segment_list.append(seg_info)
-
return final_segment_list
-#----------
-#---- Main
-
+
+# ----------
+# ---- Main
+
+
def main():
print("----- Unicorn Context Dumper -----")
print("You must be actively debugging before running this!")
print("If it fails, double check that you are actively debugging before running.")
-
+
try:
# Create the output directory
- timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S')
+ timestamp = datetime.datetime.fromtimestamp(time.time()).strftime(
+ "%Y%m%d_%H%M%S"
+ )
output_path = "UnicornContext_" + timestamp
if not os.path.exists(output_path):
os.makedirs(output_path)
print("Process context will be output to {}".format(output_path))
-
+
# Get the context
context = {
"arch": dump_arch_info(),
- "regs": dump_regs(),
+ "regs": dump_regs(),
"segments": dump_process_memory(output_path),
}
# Write the index file
- index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w')
+ index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w")
index_file.write(json.dumps(context, indent=4))
- index_file.close()
+ index_file.close()
print("Done.")
-
+
except Exception as e:
print("!!! ERROR:\n\t{}".format(repr(e)))
-
+
+
if __name__ == "__main__" and pwndbg_loaded:
main()
-
diff --git a/unicorn_mode/samples/compcov_x64/compcov_test_harness.py b/unicorn_mode/samples/compcov_x64/compcov_test_harness.py
index b9ebb61d..f0749d1b 100644
--- a/unicorn_mode/samples/compcov_x64/compcov_test_harness.py
+++ b/unicorn_mode/samples/compcov_x64/compcov_test_harness.py
@@ -22,48 +22,81 @@ from unicornafl import *
from unicornafl.x86_const import *
# Path to the file containing the binary to emulate
-BINARY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'compcov_target.bin')
+BINARY_FILE = os.path.join(
+ os.path.dirname(os.path.abspath(__file__)), "compcov_target.bin"
+)
# Memory map for the code to be tested
-CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded
+CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded
CODE_SIZE_MAX = 0x00010000 # Max size for the code (64kb)
STACK_ADDRESS = 0x00200000 # Address of the stack (arbitrarily chosen)
-STACK_SIZE = 0x00010000 # Size of the stack (arbitrarily chosen)
-DATA_ADDRESS = 0x00300000 # Address where mutated data will be placed
+STACK_SIZE = 0x00010000 # Size of the stack (arbitrarily chosen)
+DATA_ADDRESS = 0x00300000 # Address where mutated data will be placed
DATA_SIZE_MAX = 0x00010000 # Maximum allowable size of mutated data
try:
# If Capstone is installed then we'll dump disassembly, otherwise just dump the binary.
from capstone import *
+
cs = Cs(CS_ARCH_X86, CS_MODE_64)
+
def unicorn_debug_instruction(uc, address, size, user_data):
mem = uc.mem_read(address, size)
- for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size):
+ for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(
+ bytes(mem), size
+ ):
print(" Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr))
+
+
except ImportError:
+
def unicorn_debug_instruction(uc, address, size, user_data):
print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
+
def unicorn_debug_block(uc, address, size, user_data):
print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
+
def unicorn_debug_mem_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE:
- print(" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value))
+ print(
+ " >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
+ address, size, value
+ )
+ )
else:
print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size))
+
def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE_UNMAPPED:
- print(" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value))
+ print(
+ " >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
+ address, size, value
+ )
+ )
else:
- print(" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size))
+ print(
+ " >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)
+ )
+
def main():
parser = argparse.ArgumentParser(description="Test harness for compcov_target.bin")
- parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input to load")
- parser.add_argument('-t', '--trace', default=False, action="store_true", help="Enables debug tracing")
+ parser.add_argument(
+ "input_file",
+ type=str,
+ help="Path to the file containing the mutated input to load",
+ )
+ parser.add_argument(
+ "-t",
+ "--trace",
+ default=False,
+ action="store_true",
+ help="Enables debug tracing",
+ )
args = parser.parse_args()
# Instantiate a MIPS32 big endian Unicorn Engine instance
@@ -73,13 +106,16 @@ def main():
uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block)
uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction)
uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access)
- uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, unicorn_debug_mem_invalid_access)
+ uc.hook_add(
+ UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID,
+ unicorn_debug_mem_invalid_access,
+ )
- #---------------------------------------------------
+ # ---------------------------------------------------
# Load the binary to emulate and map it into memory
print("Loading data input from {}".format(args.input_file))
- binary_file = open(BINARY_FILE, 'rb')
+ binary_file = open(BINARY_FILE, "rb")
binary_code = binary_file.read()
binary_file.close()
@@ -93,11 +129,11 @@ def main():
uc.mem_write(CODE_ADDRESS, binary_code)
# Set the program counter to the start of the code
- start_address = CODE_ADDRESS # Address of entry point of main()
- end_address = CODE_ADDRESS + 0x55 # Address of last instruction in main()
+ start_address = CODE_ADDRESS # Address of entry point of main()
+ end_address = CODE_ADDRESS + 0x55 # Address of last instruction in main()
uc.reg_write(UC_X86_REG_RIP, start_address)
- #-----------------
+ # -----------------
# Setup the stack
uc.mem_map(STACK_ADDRESS, STACK_SIZE)
@@ -106,8 +142,7 @@ def main():
# Mapping a location to write our buffer to
uc.mem_map(DATA_ADDRESS, DATA_SIZE_MAX)
-
- #-----------------------------------------------
+ # -----------------------------------------------
# Load the mutated input and map it into memory
def place_input_callback(uc, input, _, data):
@@ -121,7 +156,7 @@ def main():
# Write the mutated command into the data buffer
uc.mem_write(DATA_ADDRESS, input)
- #------------------------------------------------------------
+ # ------------------------------------------------------------
# Emulate the code, allowing it to process the mutated input
print("Starting the AFL fuzz")
@@ -129,8 +164,9 @@ def main():
input_file=args.input_file,
place_input_callback=place_input_callback,
exits=[end_address],
- persistent_iters=1
+ persistent_iters=1,
)
+
if __name__ == "__main__":
main()
diff --git a/unicorn_mode/samples/simple/simple_test_harness.py b/unicorn_mode/samples/simple/simple_test_harness.py
index 4a673daf..cd04ad3a 100644
--- a/unicorn_mode/samples/simple/simple_test_harness.py
+++ b/unicorn_mode/samples/simple/simple_test_harness.py
@@ -22,48 +22,81 @@ from unicornafl import *
from unicornafl.mips_const import *
# Path to the file containing the binary to emulate
-BINARY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'simple_target.bin')
+BINARY_FILE = os.path.join(
+ os.path.dirname(os.path.abspath(__file__)), "simple_target.bin"
+)
# Memory map for the code to be tested
-CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded
+CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded
CODE_SIZE_MAX = 0x00010000 # Max size for the code (64kb)
STACK_ADDRESS = 0x00200000 # Address of the stack (arbitrarily chosen)
-STACK_SIZE = 0x00010000 # Size of the stack (arbitrarily chosen)
-DATA_ADDRESS = 0x00300000 # Address where mutated data will be placed
+STACK_SIZE = 0x00010000 # Size of the stack (arbitrarily chosen)
+DATA_ADDRESS = 0x00300000 # Address where mutated data will be placed
DATA_SIZE_MAX = 0x00010000 # Maximum allowable size of mutated data
try:
# If Capstone is installed then we'll dump disassembly, otherwise just dump the binary.
from capstone import *
+
cs = Cs(CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_BIG_ENDIAN)
+
def unicorn_debug_instruction(uc, address, size, user_data):
mem = uc.mem_read(address, size)
- for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size):
+ for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(
+ bytes(mem), size
+ ):
print(" Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr))
+
+
except ImportError:
+
def unicorn_debug_instruction(uc, address, size, user_data):
- print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
+ print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
+
def unicorn_debug_block(uc, address, size, user_data):
print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
-
+
+
def unicorn_debug_mem_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE:
- print(" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value))
+ print(
+ " >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
+ address, size, value
+ )
+ )
else:
- print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size))
+ print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size))
+
def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE_UNMAPPED:
- print(" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value))
+ print(
+ " >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
+ address, size, value
+ )
+ )
else:
- print(" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size))
+ print(
+ " >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)
+ )
+
def main():
parser = argparse.ArgumentParser(description="Test harness for simple_target.bin")
- parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input to load")
- parser.add_argument('-t', '--trace', default=False, action="store_true", help="Enables debug tracing")
+ parser.add_argument(
+ "input_file",
+ type=str,
+ help="Path to the file containing the mutated input to load",
+ )
+ parser.add_argument(
+ "-t",
+ "--trace",
+ default=False,
+ action="store_true",
+ help="Enables debug tracing",
+ )
args = parser.parse_args()
# Instantiate a MIPS32 big endian Unicorn Engine instance
@@ -73,13 +106,16 @@ def main():
uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block)
uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction)
uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access)
- uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, unicorn_debug_mem_invalid_access)
+ uc.hook_add(
+ UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID,
+ unicorn_debug_mem_invalid_access,
+ )
- #---------------------------------------------------
+ # ---------------------------------------------------
# Load the binary to emulate and map it into memory
print("Loading data input from {}".format(args.input_file))
- binary_file = open(BINARY_FILE, 'rb')
+ binary_file = open(BINARY_FILE, "rb")
binary_code = binary_file.read()
binary_file.close()
@@ -93,11 +129,11 @@ def main():
uc.mem_write(CODE_ADDRESS, binary_code)
# Set the program counter to the start of the code
- start_address = CODE_ADDRESS # Address of entry point of main()
- end_address = CODE_ADDRESS + 0xf4 # Address of last instruction in main()
+ start_address = CODE_ADDRESS # Address of entry point of main()
+ end_address = CODE_ADDRESS + 0xF4 # Address of last instruction in main()
uc.reg_write(UC_MIPS_REG_PC, start_address)
- #-----------------
+ # -----------------
# Setup the stack
uc.mem_map(STACK_ADDRESS, STACK_SIZE)
@@ -106,14 +142,14 @@ def main():
# reserve some space for data
uc.mem_map(DATA_ADDRESS, DATA_SIZE_MAX)
- #-----------------------------------------------------
+ # -----------------------------------------------------
# Set up a callback to place input data (do little work here, it's called for every single iteration)
# We did not pass in any data and don't use persistent mode, so we can ignore these params.
# Be sure to check out the docstrings for the uc.afl_* functions.
def place_input_callback(uc, input, persistent_round, data):
# Apply constraints to the mutated input
if len(input) > DATA_SIZE_MAX:
- #print("Test input is too long (> {} bytes)")
+ # print("Test input is too long (> {} bytes)")
return False
# Write the mutated command into the data buffer
@@ -122,5 +158,6 @@ def main():
# Start the fuzzer.
uc.afl_fuzz(args.input_file, place_input_callback, [end_address])
+
if __name__ == "__main__":
main()
diff --git a/unicorn_mode/samples/simple/simple_test_harness_alt.py b/unicorn_mode/samples/simple/simple_test_harness_alt.py
index 9c3dbc93..3249b13d 100644
--- a/unicorn_mode/samples/simple/simple_test_harness_alt.py
+++ b/unicorn_mode/samples/simple/simple_test_harness_alt.py
@@ -25,50 +25,79 @@ from unicornafl import *
from unicornafl.mips_const import *
# Path to the file containing the binary to emulate
-BINARY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'simple_target.bin')
+BINARY_FILE = os.path.join(
+ os.path.dirname(os.path.abspath(__file__)), "simple_target.bin"
+)
# Memory map for the code to be tested
-CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded
+CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded
CODE_SIZE_MAX = 0x00010000 # Max size for the code (64kb)
STACK_ADDRESS = 0x00200000 # Address of the stack (arbitrarily chosen)
-STACK_SIZE = 0x00010000 # Size of the stack (arbitrarily chosen)
-DATA_ADDRESS = 0x00300000 # Address where mutated data will be placed
+STACK_SIZE = 0x00010000 # Size of the stack (arbitrarily chosen)
+DATA_ADDRESS = 0x00300000 # Address where mutated data will be placed
DATA_SIZE_MAX = 0x00010000 # Maximum allowable size of mutated data
try:
# If Capstone is installed then we'll dump disassembly, otherwise just dump the binary.
from capstone import *
+
cs = Cs(CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_BIG_ENDIAN)
+
def unicorn_debug_instruction(uc, address, size, user_data):
mem = uc.mem_read(address, size)
- for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size):
+ for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(
+ bytes(mem), size
+ ):
print(" Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr))
+
+
except ImportError:
+
def unicorn_debug_instruction(uc, address, size, user_data):
- print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
+ print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
+
def unicorn_debug_block(uc, address, size, user_data):
print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
-
+
+
def unicorn_debug_mem_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE:
- print(" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value))
+ print(
+ " >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
+ address, size, value
+ )
+ )
else:
- print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size))
+ print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size))
+
def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE_UNMAPPED:
- print(" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value))
+ print(
+ " >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
+ address, size, value
+ )
+ )
else:
- print(" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size))
+ print(
+ " >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)
+ )
+
def force_crash(uc_error):
# This function should be called to indicate to AFL that a crash occurred during emulation.
# Pass in the exception received from Uc.emu_start()
mem_errors = [
- UC_ERR_READ_UNMAPPED, UC_ERR_READ_PROT, UC_ERR_READ_UNALIGNED,
- UC_ERR_WRITE_UNMAPPED, UC_ERR_WRITE_PROT, UC_ERR_WRITE_UNALIGNED,
- UC_ERR_FETCH_UNMAPPED, UC_ERR_FETCH_PROT, UC_ERR_FETCH_UNALIGNED,
+ UC_ERR_READ_UNMAPPED,
+ UC_ERR_READ_PROT,
+ UC_ERR_READ_UNALIGNED,
+ UC_ERR_WRITE_UNMAPPED,
+ UC_ERR_WRITE_PROT,
+ UC_ERR_WRITE_UNALIGNED,
+ UC_ERR_FETCH_UNMAPPED,
+ UC_ERR_FETCH_PROT,
+ UC_ERR_FETCH_UNALIGNED,
]
if uc_error.errno in mem_errors:
# Memory error - throw SIGSEGV
@@ -80,11 +109,22 @@ def force_crash(uc_error):
# Not sure what happened - throw SIGABRT
os.kill(os.getpid(), signal.SIGABRT)
+
def main():
parser = argparse.ArgumentParser(description="Test harness for simple_target.bin")
- parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input to load")
- parser.add_argument('-d', '--debug', default=False, action="store_true", help="Enables debug tracing")
+ parser.add_argument(
+ "input_file",
+ type=str,
+ help="Path to the file containing the mutated input to load",
+ )
+ parser.add_argument(
+ "-d",
+ "--debug",
+ default=False,
+ action="store_true",
+ help="Enables debug tracing",
+ )
args = parser.parse_args()
# Instantiate a MIPS32 big endian Unicorn Engine instance
@@ -94,13 +134,16 @@ def main():
uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block)
uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction)
uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access)
- uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, unicorn_debug_mem_invalid_access)
+ uc.hook_add(
+ UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID,
+ unicorn_debug_mem_invalid_access,
+ )
- #---------------------------------------------------
+ # ---------------------------------------------------
# Load the binary to emulate and map it into memory
print("Loading data input from {}".format(args.input_file))
- binary_file = open(BINARY_FILE, 'rb')
+ binary_file = open(BINARY_FILE, "rb")
binary_code = binary_file.read()
binary_file.close()
@@ -114,11 +157,11 @@ def main():
uc.mem_write(CODE_ADDRESS, binary_code)
# Set the program counter to the start of the code
- start_address = CODE_ADDRESS # Address of entry point of main()
- end_address = CODE_ADDRESS + 0xf4 # Address of last instruction in main()
+ start_address = CODE_ADDRESS # Address of entry point of main()
+ end_address = CODE_ADDRESS + 0xF4 # Address of last instruction in main()
uc.reg_write(UC_MIPS_REG_PC, start_address)
- #-----------------
+ # -----------------
# Setup the stack
uc.mem_map(STACK_ADDRESS, STACK_SIZE)
@@ -127,10 +170,10 @@ def main():
# reserve some space for data
uc.mem_map(DATA_ADDRESS, DATA_SIZE_MAX)
- #-----------------------------------------------------
+ # -----------------------------------------------------
# Kick off AFL's fork server
- # THIS MUST BE DONE BEFORE LOADING USER DATA!
- # If this isn't done every single run, the AFL fork server
+ # THIS MUST BE DONE BEFORE LOADING USER DATA!
+ # If this isn't done every single run, the AFL fork server
# will not be started appropriately and you'll get erratic results!
print("Starting the AFL forkserver")
@@ -142,12 +185,12 @@ def main():
else:
out = lambda x, y: print(x.format(y))
- #-----------------------------------------------
+ # -----------------------------------------------
# Load the mutated input and map it into memory
# Load the mutated input from disk
out("Loading data input from {}", args.input_file)
- input_file = open(args.input_file, 'rb')
+ input_file = open(args.input_file, "rb")
input = input_file.read()
input_file.close()
@@ -159,7 +202,7 @@ def main():
# Write the mutated command into the data buffer
uc.mem_write(DATA_ADDRESS, input)
- #------------------------------------------------------------
+ # ------------------------------------------------------------
# Emulate the code, allowing it to process the mutated input
out("Executing until a crash or execution reaches 0x{0:016x}", end_address)
@@ -175,5 +218,6 @@ def main():
# UC_AFL_RET_FINISHED = 3
out("Done. AFL Mode is {}", afl_mode)
+
if __name__ == "__main__":
main()
diff --git a/unicorn_mode/samples/speedtest/python/harness.py b/unicorn_mode/samples/speedtest/python/harness.py
index f72eb32b..801ef4d1 100644
--- a/unicorn_mode/samples/speedtest/python/harness.py
+++ b/unicorn_mode/samples/speedtest/python/harness.py
@@ -256,17 +256,17 @@ def main():
input_len = len(input)
# global input_len
if input_len > INPUT_MAX:
- #print("Test input is too long (> {} bytes)")
+ # print("Test input is too long (> {} bytes)")
return False
# print(f"Placing input: {input} in round {persistent_round}")
# Make sure the string is always 0-terminated (as it would be "in the wild")
- input[-1] = b'\0'
+ input[-1] = b"\0"
# Write the mutated command into the data buffer
uc.mem_write(INPUT_ADDRESS, input)
- #uc.reg_write(UC_X86_REG_RIP, main_offset)
+ # uc.reg_write(UC_X86_REG_RIP, main_offset)
print(f"Starting to fuzz. Running from addr {main_offset} to one of {main_ends}")
# Start the fuzzer.
diff --git a/utils/afl_untracer/ida_get_patchpoints.py b/utils/afl_untracer/ida_get_patchpoints.py
index 43cf6d89..807685b3 100644
--- a/utils/afl_untracer/ida_get_patchpoints.py
+++ b/utils/afl_untracer/ida_get_patchpoints.py
@@ -11,6 +11,7 @@ import idc
# See https://www.hex-rays.com/products/ida/support/ida74_idapython_no_bc695_porting_guide.shtml
from os.path import expanduser
+
home = expanduser("~")
patchpoints = set()
@@ -18,7 +19,7 @@ patchpoints = set()
max_offset = 0
for seg_ea in idautils.Segments():
name = idc.get_segm_name(seg_ea)
- #print("Segment: " + name)
+ # print("Segment: " + name)
if name != "__text" and name != ".text":
continue
@@ -26,7 +27,7 @@ for seg_ea in idautils.Segments():
end = idc.get_segm_end(seg_ea)
first = 0
subtract_addr = 0
- #print("Start: " + hex(start) + " End: " + hex(end))
+ # print("Start: " + hex(start) + " End: " + hex(end))
for func_ea in idautils.Functions(start, end):
f = idaapi.get_func(func_ea)
if not f:
@@ -37,10 +38,10 @@ for seg_ea in idautils.Segments():
if block.start_ea >= 0x1000:
subtract_addr = 0x1000
first = 1
-
+
max_offset = max(max_offset, block.start_ea)
patchpoints.add(block.start_ea - subtract_addr)
- #else:
+ # else:
# print("Warning: broken CFG?")
# Round up max_offset to page size
@@ -52,11 +53,11 @@ if rem != 0:
print("Writing to " + home + "/Desktop/patches.txt")
with open(home + "/Desktop/patches.txt", "w") as f:
- f.write(ida_nalt.get_root_filename() + ':' + hex(size) + '\n')
- f.write('\n'.join(map(hex, sorted(patchpoints))))
- f.write('\n')
+ f.write(ida_nalt.get_root_filename() + ":" + hex(size) + "\n")
+ f.write("\n".join(map(hex, sorted(patchpoints))))
+ f.write("\n")
print("Done, found {} patchpoints".format(len(patchpoints)))
# For headless script running remove the comment from the next line
-#ida_pro.qexit()
+# ida_pro.qexit()
diff --git a/utils/custom_mutators/XmlMutatorMin.py b/utils/custom_mutators/XmlMutatorMin.py
index 4c80a2ba..3e6cd0ff 100644
--- a/utils/custom_mutators/XmlMutatorMin.py
+++ b/utils/custom_mutators/XmlMutatorMin.py
@@ -12,12 +12,13 @@ import random, re, io
# The XmlMutatorMin class #
###########################
+
class XmlMutatorMin:
"""
- Optionals parameters:
- seed Seed used by the PRNG (default: "RANDOM")
- verbose Verbosity (default: False)
+ Optionals parameters:
+ seed Seed used by the PRNG (default: "RANDOM")
+ verbose Verbosity (default: False)
"""
def __init__(self, seed="RANDOM", verbose=False):
@@ -41,7 +42,12 @@ class XmlMutatorMin:
self.tree = None
# High-level mutators (no database needed)
- hl_mutators_delete = ["del_node_and_children", "del_node_but_children", "del_attribute", "del_content"] # Delete items
+ hl_mutators_delete = [
+ "del_node_and_children",
+ "del_node_but_children",
+ "del_attribute",
+ "del_content",
+ ] # Delete items
hl_mutators_fuzz = ["fuzz_attribute"] # Randomly change attribute values
# Exposed mutators
@@ -74,7 +80,9 @@ class XmlMutatorMin:
""" Serialize a XML document. Basic wrapper around lxml.tostring() """
- return ET.tostring(tree, with_tail=False, xml_declaration=True, encoding=tree.docinfo.encoding)
+ return ET.tostring(
+ tree, with_tail=False, xml_declaration=True, encoding=tree.docinfo.encoding
+ )
def __ver(self, version):
@@ -161,7 +169,7 @@ class XmlMutatorMin:
# Randomly pick one the function calls
(func, args) = random.choice(l)
# Split by "," and randomly pick one of the arguments
- value = random.choice(args.split(','))
+ value = random.choice(args.split(","))
# Remove superfluous characters
unclean_value = value
value = value.strip(" ").strip("'")
@@ -170,49 +178,49 @@ class XmlMutatorMin:
value = attrib_value
# For each type, define some possible replacement values
- choices_number = ( \
- "0", \
- "11111", \
- "-128", \
- "2", \
- "-1", \
- "1/3", \
- "42/0", \
- "1094861636 idiv 1.0", \
- "-1123329771506872 idiv 3.8", \
- "17=$numericRTF", \
- str(3 + random.randrange(0, 100)), \
- )
+ choices_number = (
+ "0",
+ "11111",
+ "-128",
+ "2",
+ "-1",
+ "1/3",
+ "42/0",
+ "1094861636 idiv 1.0",
+ "-1123329771506872 idiv 3.8",
+ "17=$numericRTF",
+ str(3 + random.randrange(0, 100)),
+ )
- choices_letter = ( \
- "P" * (25 * random.randrange(1, 100)), \
- "%s%s%s%s%s%s", \
- "foobar", \
- )
+ choices_letter = (
+ "P" * (25 * random.randrange(1, 100)),
+ "%s%s%s%s%s%s",
+ "foobar",
+ )
- choices_alnum = ( \
- "Abc123", \
- "020F0302020204030204", \
- "020F0302020204030204" * (random.randrange(5, 20)), \
- )
+ choices_alnum = (
+ "Abc123",
+ "020F0302020204030204",
+ "020F0302020204030204" * (random.randrange(5, 20)),
+ )
# Fuzz the value
- if random.choice((True,False)) and value == "":
+ if random.choice((True, False)) and value == "":
# Empty
new_value = value
- elif random.choice((True,False)) and value.isdigit():
+ elif random.choice((True, False)) and value.isdigit():
# Numbers
new_value = random.choice(choices_number)
- elif random.choice((True,False)) and value.isalpha():
+ elif random.choice((True, False)) and value.isalpha():
# Letters
new_value = random.choice(choices_letter)
- elif random.choice((True,False)) and value.isalnum():
+ elif random.choice((True, False)) and value.isalnum():
# Alphanumeric
new_value = random.choice(choices_alnum)
@@ -232,22 +240,25 @@ class XmlMutatorMin:
# Log something
if self.verbose:
- print("Fuzzing attribute #%i '%s' of tag #%i '%s'" % (rand_attrib_id, rand_attrib, rand_elem_id, rand_elem.tag))
+ print(
+ "Fuzzing attribute #%i '%s' of tag #%i '%s'"
+ % (rand_attrib_id, rand_attrib, rand_elem_id, rand_elem.tag)
+ )
# Modify the attribute
rand_elem.set(rand_attrib, new_value.decode("utf-8"))
def __del_node_and_children(self):
- """ High-level minimizing mutator
- Delete a random node and its children (i.e. delete a random tree) """
+ """High-level minimizing mutator
+ Delete a random node and its children (i.e. delete a random tree)"""
self.__del_node(True)
def __del_node_but_children(self):
- """ High-level minimizing mutator
- Delete a random node but its children (i.e. link them to the parent of the deleted node) """
+ """High-level minimizing mutator
+ Delete a random node but its children (i.e. link them to the parent of the deleted node)"""
self.__del_node(False)
@@ -270,7 +281,10 @@ class XmlMutatorMin:
# Log something
if self.verbose:
but_or_and = "and" if delete_children else "but"
- print("Deleting tag #%i '%s' %s its children" % (rand_elem_id, rand_elem.tag, but_or_and))
+ print(
+ "Deleting tag #%i '%s' %s its children"
+ % (rand_elem_id, rand_elem.tag, but_or_and)
+ )
if delete_children is False:
# Link children of the random (soon to be deleted) node to its parent
@@ -282,8 +296,8 @@ class XmlMutatorMin:
def __del_content(self):
- """ High-level minimizing mutator
- Delete the attributes and children of a random node """
+ """High-level minimizing mutator
+ Delete the attributes and children of a random node"""
# Select a node to modify
(rand_elem_id, rand_elem) = self.__pick_element()
@@ -297,8 +311,8 @@ class XmlMutatorMin:
def __del_attribute(self):
- """ High-level minimizing mutator
- Delete a random attribute from a random node """
+ """High-level minimizing mutator
+ Delete a random attribute from a random node"""
# Select a node to modify
(rand_elem_id, rand_elem) = self.__pick_element()
@@ -318,7 +332,10 @@ class XmlMutatorMin:
# Log something
if self.verbose:
- print("Deleting attribute #%i '%s' of tag #%i '%s'" % (rand_attrib_id, rand_attrib, rand_elem_id, rand_elem.tag))
+ print(
+ "Deleting attribute #%i '%s' of tag #%i '%s'"
+ % (rand_attrib_id, rand_attrib, rand_elem_id, rand_elem.tag)
+ )
# Delete the attribute
rand_elem.attrib.pop(rand_attrib)
@@ -329,4 +346,3 @@ class XmlMutatorMin:
# High-level mutation
self.__exec_among(self, self.hl_mutators_all, min, max)
-
diff --git a/utils/custom_mutators/common.py b/utils/custom_mutators/common.py
index 9a1ef0a3..44a5056a 100644
--- a/utils/custom_mutators/common.py
+++ b/utils/custom_mutators/common.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# encoding: utf-8
-'''
+"""
Module containing functions shared between multiple AFL modules
@author: Christian Holler (:decoder)
@@ -12,7 +12,7 @@ License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
@contact: choller@mozilla.com
-'''
+"""
from __future__ import print_function
import random
@@ -23,18 +23,18 @@ import re
def randel(l):
if not l:
return None
- return l[random.randint(0, len(l)-1)]
+ return l[random.randint(0, len(l) - 1)]
def randel_pop(l):
if not l:
return None
- return l.pop(random.randint(0, len(l)-1))
+ return l.pop(random.randint(0, len(l) - 1))
def write_exc_example(data, exc):
- exc_name = re.sub(r'[^a-zA-Z0-9]', '_', repr(exc))
+ exc_name = re.sub(r"[^a-zA-Z0-9]", "_", repr(exc))
if not os.path.exists(exc_name):
- with open(exc_name, 'w') as f:
+ with open(exc_name, "w") as f:
f.write(data)
diff --git a/utils/custom_mutators/example.py b/utils/custom_mutators/example.py
index cf659e5a..3a6d22e4 100644
--- a/utils/custom_mutators/example.py
+++ b/utils/custom_mutators/example.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# encoding: utf-8
-'''
+"""
Example Python Module for AFLFuzz
@author: Christian Holler (:decoder)
@@ -12,7 +12,7 @@ License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
@contact: choller@mozilla.com
-'''
+"""
import random
@@ -26,12 +26,12 @@ COMMANDS = [
def init(seed):
- '''
+ """
Called once when AFLFuzz starts up. Used to seed our RNG.
@type seed: int
@param seed: A 32-bit random value
- '''
+ """
random.seed(seed)
@@ -40,7 +40,7 @@ def deinit():
def fuzz(buf, add_buf, max_size):
- '''
+ """
Called per fuzzing iteration.
@type buf: bytearray
@@ -55,13 +55,14 @@ def fuzz(buf, add_buf, max_size):
@rtype: bytearray
@return: A new bytearray containing the mutated data
- '''
+ """
ret = bytearray(100)
ret[:3] = random.choice(COMMANDS)
return ret
+
# Uncomment and implement the following methods if you want to use a custom
# trimming algorithm. See also the documentation for a better API description.
diff --git a/utils/custom_mutators/simple-chunk-replace.py b/utils/custom_mutators/simple-chunk-replace.py
index df2f4ca7..c57218dd 100644
--- a/utils/custom_mutators/simple-chunk-replace.py
+++ b/utils/custom_mutators/simple-chunk-replace.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# encoding: utf-8
-'''
+"""
Simple Chunk Cross-Over Replacement Module for AFLFuzz
@author: Christian Holler (:decoder)
@@ -12,24 +12,24 @@ License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
@contact: choller@mozilla.com
-'''
+"""
import random
def init(seed):
- '''
+ """
Called once when AFLFuzz starts up. Used to seed our RNG.
@type seed: int
@param seed: A 32-bit random value
- '''
+ """
# Seed our RNG
random.seed(seed)
def fuzz(buf, add_buf, max_size):
- '''
+ """
Called per fuzzing iteration.
@type buf: bytearray
@@ -44,7 +44,7 @@ def fuzz(buf, add_buf, max_size):
@rtype: bytearray
@return: A new bytearray containing the mutated data
- '''
+ """
# Make a copy of our input buffer for returning
ret = bytearray(buf)
@@ -58,7 +58,9 @@ def fuzz(buf, add_buf, max_size):
rand_dst_idx = random.randint(0, len(buf))
# Make the chunk replacement
- ret[rand_dst_idx:rand_dst_idx + fragment_len] = add_buf[rand_src_idx:rand_src_idx + fragment_len]
+ ret[rand_dst_idx : rand_dst_idx + fragment_len] = add_buf[
+ rand_src_idx : rand_src_idx + fragment_len
+ ]
# Return data
return ret
diff --git a/utils/custom_mutators/wrapper_afl_min.py b/utils/custom_mutators/wrapper_afl_min.py
index ecb03b55..5cd60031 100644
--- a/utils/custom_mutators/wrapper_afl_min.py
+++ b/utils/custom_mutators/wrapper_afl_min.py
@@ -27,7 +27,7 @@ def log(text):
def init(seed):
"""
- Called once when AFL starts up. Seed is used to identify the AFL instance in log files
+ Called once when AFL starts up. Seed is used to identify the AFL instance in log files
"""
global __mutator__
@@ -72,7 +72,10 @@ def fuzz(buf, add_buf, max_size):
if via_buffer:
try:
__mutator__.init_from_string(buf_str)
- log("fuzz(): Mutator successfully initialized with AFL buffer (%d bytes)" % len(buf_str))
+ log(
+ "fuzz(): Mutator successfully initialized with AFL buffer (%d bytes)"
+ % len(buf_str)
+ )
except Exception:
via_buffer = False
log("fuzz(): Can't initialize mutator with AFL buffer")
@@ -104,7 +107,7 @@ def fuzz(buf, add_buf, max_size):
# Main (for debug)
-if __name__ == '__main__':
+if __name__ == "__main__":
__log__ = True
__log_file__ = "/dev/stdout"
@@ -112,7 +115,9 @@ if __name__ == '__main__':
init(__seed__)
- in_1 = bytearray("ffffzzzzzzzzzzzz")
+ in_1 = bytearray(
+ "ffffzzzzzzzzzzzz"
+ )
in_2 = bytearray("")
out = fuzz(in_1, in_2)
print(out)