ran black on python scripts

This commit is contained in:
Dominik Maier
2021-02-15 13:52:23 +01:00
parent 0298ae82b0
commit cebde1f9e6
14 changed files with 743 additions and 392 deletions

View File

@ -48,27 +48,28 @@ INDEX_FILE_NAME = "_index.json"
# ---------------------- # ----------------------
# ---- Helper Functions # ---- Helper Functions
def map_arch(): def map_arch():
arch = get_arch() # from GEF arch = get_arch() # from GEF
if 'x86_64' in arch or 'x86-64' in arch: if "x86_64" in arch or "x86-64" in arch:
return "x64" return "x64"
elif 'x86' in arch or 'i386' in arch: elif "x86" in arch or "i386" in arch:
return "x86" return "x86"
elif 'aarch64' in arch or 'arm64' in arch: elif "aarch64" in arch or "arm64" in arch:
return "arm64le" return "arm64le"
elif 'aarch64_be' in arch: elif "aarch64_be" in arch:
return "arm64be" return "arm64be"
elif 'armeb' in arch: elif "armeb" in arch:
# check for THUMB mode # check for THUMB mode
cpsr = get_register('$cpsr') cpsr = get_register("$cpsr")
if (cpsr & (1 << 5)): if cpsr & (1 << 5):
return "armbethumb" return "armbethumb"
else: else:
return "armbe" return "armbe"
elif 'arm' in arch: elif "arm" in arch:
# check for THUMB mode # check for THUMB mode
cpsr = get_register('$cpsr') cpsr = get_register("$cpsr")
if (cpsr & (1 << 5)): if cpsr & (1 << 5):
return "armlethumb" return "armlethumb"
else: else:
return "armle" return "armle"
@ -79,6 +80,7 @@ def map_arch():
# ----------------------- # -----------------------
# ---- Dumping functions # ---- Dumping functions
def dump_arch_info(): def dump_arch_info():
arch_info = {} arch_info = {}
arch_info["arch"] = map_arch() arch_info["arch"] = map_arch()
@ -89,7 +91,7 @@ def dump_regs():
reg_state = {} reg_state = {}
for reg in current_arch.all_registers: for reg in current_arch.all_registers:
reg_val = get_register(reg) reg_val = get_register(reg)
reg_state[reg.strip().strip('$')] = reg_val reg_state[reg.strip().strip("$")] = reg_val
return reg_state return reg_state
@ -108,47 +110,76 @@ def dump_process_memory(output_dir):
if entry.page_start == entry.page_end: if entry.page_start == entry.page_end:
continue continue
seg_info = {'start': entry.page_start, 'end': entry.page_end, 'name': entry.path, 'permissions': { seg_info = {
"start": entry.page_start,
"end": entry.page_end,
"name": entry.path,
"permissions": {
"r": entry.is_readable() > 0, "r": entry.is_readable() > 0,
"w": entry.is_writable() > 0, "w": entry.is_writable() > 0,
"x": entry.is_executable() > 0 "x": entry.is_executable() > 0,
}, 'content_file': ''} },
"content_file": "",
}
# "(deleted)" may or may not be valid, but don't push it. # "(deleted)" may or may not be valid, but don't push it.
if entry.is_readable() and not '(deleted)' in entry.path: if entry.is_readable() and not "(deleted)" in entry.path:
try: try:
# Compress and dump the content to a file # Compress and dump the content to a file
seg_content = read_memory(entry.page_start, entry.size) seg_content = read_memory(entry.page_start, entry.size)
if(seg_content == None): if seg_content == None:
print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(entry.page_start, entry.path)) print(
"Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(
entry.page_start, entry.path
)
)
else: else:
print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(entry.page_start, len(seg_content), entry.path, repr(seg_info['permissions']))) print(
"Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(
entry.page_start,
len(seg_content),
entry.path,
repr(seg_info["permissions"]),
)
)
compressed_seg_content = zlib.compress(seg_content) compressed_seg_content = zlib.compress(seg_content)
md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin" md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin"
seg_info["content_file"] = md5_sum seg_info["content_file"] = md5_sum
# Write the compressed contents to disk # Write the compressed contents to disk
out_file = open(os.path.join(output_dir, md5_sum), 'wb') out_file = open(os.path.join(output_dir, md5_sum), "wb")
out_file.write(compressed_seg_content) out_file.write(compressed_seg_content)
out_file.close() out_file.close()
except: except:
print("Exception reading segment ({}): {}".format(entry.path, sys.exc_info()[0])) print(
"Exception reading segment ({}): {}".format(
entry.path, sys.exc_info()[0]
)
)
else: else:
print("Skipping segment {0}@0x{1:016x}".format(entry.path, entry.page_start)) print(
"Skipping segment {0}@0x{1:016x}".format(entry.path, entry.page_start)
)
# Add the segment to the list # Add the segment to the list
final_segment_list.append(seg_info) final_segment_list.append(seg_info)
return final_segment_list return final_segment_list
# --------------------------------------------- # ---------------------------------------------
# ---- ARM Extention (dump floating point regs) # ---- ARM Extention (dump floating point regs)
def dump_float(rge=32): def dump_float(rge=32):
reg_convert = "" reg_convert = ""
if map_arch() == "armbe" or map_arch() == "armle" or map_arch() == "armbethumb" or map_arch() == "armbethumb": if (
map_arch() == "armbe"
or map_arch() == "armle"
or map_arch() == "armbethumb"
or map_arch() == "armbethumb"
):
reg_state = {} reg_state = {}
for reg_num in range(32): for reg_num in range(32):
value = gdb.selected_frame().read_register("d" + str(reg_num)) value = gdb.selected_frame().read_register("d" + str(reg_num))
@ -158,9 +189,11 @@ def dump_float(rge=32):
return reg_state return reg_state
# ---------- # ----------
# ---- Main # ---- Main
def main(): def main():
print("----- Unicorn Context Dumper -----") print("----- Unicorn Context Dumper -----")
print("You must be actively debugging before running this!") print("You must be actively debugging before running this!")
@ -175,7 +208,9 @@ def main():
try: try:
# Create the output directory # Create the output directory
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S') timestamp = datetime.datetime.fromtimestamp(time.time()).strftime(
"%Y%m%d_%H%M%S"
)
output_path = "UnicornContext_" + timestamp output_path = "UnicornContext_" + timestamp
if not os.path.exists(output_path): if not os.path.exists(output_path):
os.makedirs(output_path) os.makedirs(output_path)
@ -190,7 +225,7 @@ def main():
} }
# Write the index file # Write the index file
index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w') index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w")
index_file.write(json.dumps(context, indent=4)) index_file.write(json.dumps(context, indent=4))
index_file.close() index_file.close()
print("Done.") print("Done.")
@ -198,5 +233,6 @@ def main():
except Exception as e: except Exception as e:
print("!!! ERROR:\n\t{}".format(repr(e))) print("!!! ERROR:\n\t{}".format(repr(e)))
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -34,6 +34,7 @@ INDEX_FILE_NAME = "_index.json"
# ---------------------- # ----------------------
# ---- Helper Functions # ---- Helper Functions
def get_arch(): def get_arch():
if ph.id == PLFM_386 and ph.flag & PR_USE64: if ph.id == PLFM_386 and ph.flag & PR_USE64:
return "x64" return "x64"
@ -52,6 +53,7 @@ def get_arch():
else: else:
return "" return ""
def get_register_list(arch): def get_register_list(arch):
if arch == "arm64le" or arch == "arm64be": if arch == "arm64le" or arch == "arm64be":
arch = "arm64" arch = "arm64"
@ -60,46 +62,128 @@ def get_register_list(arch):
registers = { registers = {
"x64": [ "x64": [
"rax", "rbx", "rcx", "rdx", "rsi", "rdi", "rbp", "rsp", "rax",
"r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15", "rbx",
"rip", "rsp", "efl", "rcx",
"cs", "ds", "es", "fs", "gs", "ss", "rdx",
"rsi",
"rdi",
"rbp",
"rsp",
"r8",
"r9",
"r10",
"r11",
"r12",
"r13",
"r14",
"r15",
"rip",
"rsp",
"efl",
"cs",
"ds",
"es",
"fs",
"gs",
"ss",
], ],
"x86": [ "x86": [
"eax", "ebx", "ecx", "edx", "esi", "edi", "ebp", "esp", "eax",
"eip", "esp", "efl", "ebx",
"cs", "ds", "es", "fs", "gs", "ss", "ecx",
"edx",
"esi",
"edi",
"ebp",
"esp",
"eip",
"esp",
"efl",
"cs",
"ds",
"es",
"fs",
"gs",
"ss",
], ],
"arm": [ "arm": [
"R0", "R1", "R2", "R3", "R4", "R5", "R6", "R7", "R0",
"R8", "R9", "R10", "R11", "R12", "PC", "SP", "LR", "R1",
"R2",
"R3",
"R4",
"R5",
"R6",
"R7",
"R8",
"R9",
"R10",
"R11",
"R12",
"PC",
"SP",
"LR",
"PSR", "PSR",
], ],
"arm64": [ "arm64": [
"X0", "X1", "X2", "X3", "X4", "X5", "X6", "X7", "X0",
"X8", "X9", "X10", "X11", "X12", "X13", "X14", "X1",
"X15", "X16", "X17", "X18", "X19", "X20", "X21", "X2",
"X22", "X23", "X24", "X25", "X26", "X27", "X28", "X3",
"PC", "SP", "FP", "LR", "CPSR" "X4",
"X5",
"X6",
"X7",
"X8",
"X9",
"X10",
"X11",
"X12",
"X13",
"X14",
"X15",
"X16",
"X17",
"X18",
"X19",
"X20",
"X21",
"X22",
"X23",
"X24",
"X25",
"X26",
"X27",
"X28",
"PC",
"SP",
"FP",
"LR",
"CPSR"
# "NZCV", # "NZCV",
] ],
} }
return registers[arch] return registers[arch]
# ----------------------- # -----------------------
# ---- Dumping functions # ---- Dumping functions
def dump_arch_info(): def dump_arch_info():
arch_info = {} arch_info = {}
arch_info["arch"] = get_arch() arch_info["arch"] = get_arch()
return arch_info return arch_info
def dump_regs(): def dump_regs():
reg_state = {} reg_state = {}
for reg in get_register_list(get_arch()): for reg in get_register_list(get_arch()):
reg_state[reg] = GetRegValue(reg) reg_state[reg] = GetRegValue(reg)
return reg_state return reg_state
def dump_process_memory(output_dir): def dump_process_memory(output_dir):
# Segment information dictionary # Segment information dictionary
segment_list = [] segment_list = []
@ -126,17 +210,25 @@ def dump_process_memory(output_dir):
try: try:
# Compress and dump the content to a file # Compress and dump the content to a file
seg_content = get_many_bytes(seg_start, seg_end - seg_start) seg_content = get_many_bytes(seg_start, seg_end - seg_start)
if(seg_content == None): if seg_content == None:
print("Segment empty: {0}@0x{1:016x} (size:UNKNOWN)".format(SegName(seg_ea), seg_ea)) print(
"Segment empty: {0}@0x{1:016x} (size:UNKNOWN)".format(
SegName(seg_ea), seg_ea
)
)
seg_info["content_file"] = "" seg_info["content_file"] = ""
else: else:
print("Dumping segment {0}@0x{1:016x} (size:{2})".format(SegName(seg_ea), seg_ea, len(seg_content))) print(
"Dumping segment {0}@0x{1:016x} (size:{2})".format(
SegName(seg_ea), seg_ea, len(seg_content)
)
)
compressed_seg_content = zlib.compress(seg_content) compressed_seg_content = zlib.compress(seg_content)
md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin" md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin"
seg_info["content_file"] = md5_sum seg_info["content_file"] = md5_sum
# Write the compressed contents to disk # Write the compressed contents to disk
out_file = open(os.path.join(output_dir, md5_sum), 'wb') out_file = open(os.path.join(output_dir, md5_sum), "wb")
out_file.write(compressed_seg_content) out_file.write(compressed_seg_content)
out_file.close() out_file.close()
except: except:
@ -151,6 +243,7 @@ def dump_process_memory(output_dir):
return segment_list return segment_list
""" """
TODO: FINISH IMPORT DUMPING TODO: FINISH IMPORT DUMPING
def import_callback(ea, name, ord): def import_callback(ea, name, ord):
@ -173,15 +266,20 @@ def dump_imports():
# ---------- # ----------
# ---- Main # ---- Main
def main(): def main():
try: try:
print("----- Unicorn Context Dumper -----") print("----- Unicorn Context Dumper -----")
print("You must be actively debugging before running this!") print("You must be actively debugging before running this!")
print("If it fails, double check that you are actively debugging before running.") print(
"If it fails, double check that you are actively debugging before running."
)
# Create the output directory # Create the output directory
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S') timestamp = datetime.datetime.fromtimestamp(time.time()).strftime(
"%Y%m%d_%H%M%S"
)
output_path = os.path.dirname(os.path.abspath(GetIdbPath())) output_path = os.path.dirname(os.path.abspath(GetIdbPath()))
output_path = os.path.join(output_path, "UnicornContext_" + timestamp) output_path = os.path.join(output_path, "UnicornContext_" + timestamp)
if not os.path.exists(output_path): if not os.path.exists(output_path):
@ -197,7 +295,7 @@ def main():
} }
# Write the index file # Write the index file
index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w') index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w")
index_file.write(json.dumps(context, indent=4)) index_file.write(json.dumps(context, indent=4))
index_file.close() index_file.close()
print("Done.") print("Done.")
@ -205,5 +303,6 @@ def main():
except Exception, e: except Exception, e:
print("!!! ERROR:\n\t{}".format(str(e))) print("!!! ERROR:\n\t{}".format(str(e)))
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -55,39 +55,47 @@ ALIGN_PAGE_UP = lambda x: (x + UNICORN_PAGE_SIZE - 1) & ~(UNICORN_PAGE_SIZE-1)
# ---------------------- # ----------------------
# ---- Helper Functions # ---- Helper Functions
def overlap_alignments(segments, memory): def overlap_alignments(segments, memory):
final_list = [] final_list = []
curr_seg_idx = 0 curr_seg_idx = 0
curr_end_addr = 0 curr_end_addr = 0
curr_node = None curr_node = None
current_segment = None current_segment = None
sorted_segments = sorted(segments, key=lambda k: (k['start'], k['end'])) sorted_segments = sorted(segments, key=lambda k: (k["start"], k["end"]))
if curr_seg_idx < len(sorted_segments): if curr_seg_idx < len(sorted_segments):
current_segment = sorted_segments[curr_seg_idx] current_segment = sorted_segments[curr_seg_idx]
for mem in sorted(memory, key=lambda k: (k['start'], -k['end'])): for mem in sorted(memory, key=lambda k: (k["start"], -k["end"])):
if curr_node is None: if curr_node is None:
if current_segment is not None and current_segment['start'] == mem['start']: if current_segment is not None and current_segment["start"] == mem["start"]:
curr_node = deepcopy(current_segment) curr_node = deepcopy(current_segment)
curr_node['permissions'] = mem['permissions'] curr_node["permissions"] = mem["permissions"]
else: else:
curr_node = deepcopy(mem) curr_node = deepcopy(mem)
curr_end_addr = curr_node['end'] curr_end_addr = curr_node["end"]
while curr_end_addr <= mem['end']: while curr_end_addr <= mem["end"]:
if curr_node['end'] == mem['end']: if curr_node["end"] == mem["end"]:
if current_segment is not None and current_segment['start'] > curr_node['start'] and current_segment['start'] < curr_node['end']: if (
curr_node['end'] = current_segment['start'] current_segment is not None
if(curr_node['end'] > curr_node['start']): and current_segment["start"] > curr_node["start"]
and current_segment["start"] < curr_node["end"]
):
curr_node["end"] = current_segment["start"]
if curr_node["end"] > curr_node["start"]:
final_list.append(curr_node) final_list.append(curr_node)
curr_node = deepcopy(current_segment) curr_node = deepcopy(current_segment)
curr_node['permissions'] = mem['permissions'] curr_node["permissions"] = mem["permissions"]
curr_end_addr = curr_node['end'] curr_end_addr = curr_node["end"]
else: else:
if(curr_node['end'] > curr_node['start']): if curr_node["end"] > curr_node["start"]:
final_list.append(curr_node) final_list.append(curr_node)
# if curr_node is a segment # if curr_node is a segment
if current_segment is not None and current_segment['end'] == mem['end']: if (
current_segment is not None
and current_segment["end"] == mem["end"]
):
curr_seg_idx += 1 curr_seg_idx += 1
if curr_seg_idx < len(sorted_segments): if curr_seg_idx < len(sorted_segments):
current_segment = sorted_segments[curr_seg_idx] current_segment = sorted_segments[curr_seg_idx]
@ -98,9 +106,9 @@ def overlap_alignments(segments, memory):
break break
# could only be a segment # could only be a segment
else: else:
if curr_node['end'] < mem['end']: if curr_node["end"] < mem["end"]:
# check for remaining segments and valid segments # check for remaining segments and valid segments
if(curr_node['end'] > curr_node['start']): if curr_node["end"] > curr_node["start"]:
final_list.append(curr_node) final_list.append(curr_node)
curr_seg_idx += 1 curr_seg_idx += 1
@ -109,32 +117,37 @@ def overlap_alignments(segments, memory):
else: else:
current_segment = None current_segment = None
if current_segment is not None and current_segment['start'] <= curr_end_addr and current_segment['start'] < mem['end']: if (
current_segment is not None
and current_segment["start"] <= curr_end_addr
and current_segment["start"] < mem["end"]
):
curr_node = deepcopy(current_segment) curr_node = deepcopy(current_segment)
curr_node['permissions'] = mem['permissions'] curr_node["permissions"] = mem["permissions"]
else: else:
# no more segments # no more segments
curr_node = deepcopy(mem) curr_node = deepcopy(mem)
curr_node['start'] = curr_end_addr curr_node["start"] = curr_end_addr
curr_end_addr = curr_node['end'] curr_end_addr = curr_node["end"]
return final_list return final_list
# https://github.com/llvm-mirror/llvm/blob/master/include/llvm/ADT/Triple.h # https://github.com/llvm-mirror/llvm/blob/master/include/llvm/ADT/Triple.h
def get_arch(): def get_arch():
arch, arch_vendor, arch_os = lldb.target.GetTriple().split('-') arch, arch_vendor, arch_os = lldb.target.GetTriple().split("-")
if arch == 'x86_64': if arch == "x86_64":
return "x64" return "x64"
elif arch == 'x86' or arch == 'i386': elif arch == "x86" or arch == "i386":
return "x86" return "x86"
elif arch == 'aarch64' or arch == 'arm64': elif arch == "aarch64" or arch == "arm64":
return "arm64le" return "arm64le"
elif arch == 'aarch64_be': elif arch == "aarch64_be":
return "arm64be" return "arm64be"
elif arch == 'armeb': elif arch == "armeb":
return "armbe" return "armbe"
elif arch == 'arm': elif arch == "arm":
return "armle" return "armle"
else: else:
return "" return ""
@ -143,6 +156,7 @@ def get_arch():
# ----------------------- # -----------------------
# ---- Dumping functions # ---- Dumping functions
def dump_arch_info(): def dump_arch_info():
arch_info = {} arch_info = {}
arch_info["arch"] = get_arch() arch_info["arch"] = get_arch()
@ -152,19 +166,20 @@ def dump_arch_info():
def dump_regs(): def dump_regs():
reg_state = {} reg_state = {}
for reg_list in lldb.frame.GetRegisters(): for reg_list in lldb.frame.GetRegisters():
if 'general purpose registers' in reg_list.GetName().lower(): if "general purpose registers" in reg_list.GetName().lower():
for reg in reg_list: for reg in reg_list:
reg_state[reg.GetName()] = int(reg.GetValue(), 16) reg_state[reg.GetName()] = int(reg.GetValue(), 16)
return reg_state return reg_state
def get_section_info(sec): def get_section_info(sec):
name = sec.name if sec.name is not None else '' name = sec.name if sec.name is not None else ""
if sec.GetParent().name is not None: if sec.GetParent().name is not None:
name = sec.GetParent().name + '.' + sec.name name = sec.GetParent().name + "." + sec.name
module_name = sec.addr.module.file.GetFilename() module_name = sec.addr.module.file.GetFilename()
module_name = module_name if module_name is not None else '' module_name = module_name if module_name is not None else ""
long_name = module_name + '.' + name long_name = module_name + "." + name
return sec.addr.load_addr, (sec.addr.load_addr + sec.size), sec.size, long_name return sec.addr.load_addr, (sec.addr.load_addr + sec.size), sec.size, long_name
@ -178,26 +193,33 @@ def dump_process_memory(output_dir):
# Loop over the segments, fill in the segment info dictionary # Loop over the segments, fill in the segment info dictionary
for module in lldb.target.module_iter(): for module in lldb.target.module_iter():
for seg_ea in module.section_iter(): for seg_ea in module.section_iter():
seg_info = {'module': module.file.GetFilename() } seg_info = {"module": module.file.GetFilename()}
seg_info['start'], seg_info['end'], seg_size, seg_info['name'] = get_section_info(seg_ea) (
seg_info["start"],
seg_info["end"],
seg_size,
seg_info["name"],
) = get_section_info(seg_ea)
# TODO: Ugly hack for -1 LONG address on 32-bit # TODO: Ugly hack for -1 LONG address on 32-bit
if seg_info['start'] >= sys.maxint or seg_size <= 0: if seg_info["start"] >= sys.maxint or seg_size <= 0:
print "Throwing away page: {}".format(seg_info['name']) print "Throwing away page: {}".format(seg_info["name"])
continue continue
# Page-align segment # Page-align segment
seg_info['start'] = ALIGN_PAGE_DOWN(seg_info['start']) seg_info["start"] = ALIGN_PAGE_DOWN(seg_info["start"])
seg_info['end'] = ALIGN_PAGE_UP(seg_info['end']) seg_info["end"] = ALIGN_PAGE_UP(seg_info["end"])
print("Appending: {}".format(seg_info['name'])) print ("Appending: {}".format(seg_info["name"]))
raw_segment_list.append(seg_info) raw_segment_list.append(seg_info)
# Add the stack memory region (just hardcode 0x1000 around the current SP) # Add the stack memory region (just hardcode 0x1000 around the current SP)
sp = lldb.frame.GetSP() sp = lldb.frame.GetSP()
start_sp = ALIGN_PAGE_DOWN(sp) start_sp = ALIGN_PAGE_DOWN(sp)
raw_segment_list.append({'start': start_sp, 'end': start_sp + 0x1000, 'name': 'STACK'}) raw_segment_list.append(
{"start": start_sp, "end": start_sp + 0x1000, "name": "STACK"}
)
# Write the original memory to file for debugging # Write the original memory to file for debugging
index_file = open(os.path.join(output_dir, DEBUG_MEM_FILE_NAME), 'w') index_file = open(os.path.join(output_dir, DEBUG_MEM_FILE_NAME), "w")
index_file.write(json.dumps(raw_segment_list, indent=4)) index_file.write(json.dumps(raw_segment_list, indent=4))
index_file.close() index_file.close()
@ -218,15 +240,20 @@ def dump_process_memory(output_dir):
end_addr = mem_info.GetRegionEnd() end_addr = mem_info.GetRegionEnd()
# Unknown region name # Unknown region name
region_name = 'UNKNOWN' region_name = "UNKNOWN"
# Ignore regions that aren't even mapped # Ignore regions that aren't even mapped
if mem_info.IsMapped() and mem_info.IsReadable(): if mem_info.IsMapped() and mem_info.IsReadable():
mem_info_obj = {'start': start_addr, 'end': end_addr, 'name': region_name, 'permissions': { mem_info_obj = {
"start": start_addr,
"end": end_addr,
"name": region_name,
"permissions": {
"r": mem_info.IsReadable(), "r": mem_info.IsReadable(),
"w": mem_info.IsWritable(), "w": mem_info.IsWritable(),
"x": mem_info.IsExecutable() "x": mem_info.IsExecutable(),
}} },
}
raw_memory_list.append(mem_info_obj) raw_memory_list.append(mem_info_obj)
@ -234,44 +261,67 @@ def dump_process_memory(output_dir):
for seg_info in final_segment_list: for seg_info in final_segment_list:
try: try:
seg_info['content_file'] = '' seg_info["content_file"] = ""
start_addr = seg_info['start'] start_addr = seg_info["start"]
end_addr = seg_info['end'] end_addr = seg_info["end"]
region_name = seg_info['name'] region_name = seg_info["name"]
# Compress and dump the content to a file # Compress and dump the content to a file
err = lldb.SBError() err = lldb.SBError()
seg_content = lldb.process.ReadMemory(start_addr, end_addr - start_addr, err) seg_content = lldb.process.ReadMemory(
if(seg_content == None): start_addr, end_addr - start_addr, err
print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(start_addr, region_name)) )
seg_info['content_file'] = '' if seg_content == None:
print (
"Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(
start_addr, region_name
)
)
seg_info["content_file"] = ""
else: else:
print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(start_addr, len(seg_content), region_name, repr(seg_info['permissions']))) print (
"Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(
start_addr,
len(seg_content),
region_name,
repr(seg_info["permissions"]),
)
)
compressed_seg_content = zlib.compress(seg_content) compressed_seg_content = zlib.compress(seg_content)
md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin" md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin"
seg_info['content_file'] = md5_sum seg_info["content_file"] = md5_sum
# Write the compressed contents to disk # Write the compressed contents to disk
out_file = open(os.path.join(output_dir, md5_sum), 'wb') out_file = open(os.path.join(output_dir, md5_sum), "wb")
out_file.write(compressed_seg_content) out_file.write(compressed_seg_content)
out_file.close() out_file.close()
except: except:
print("Exception reading segment ({}): {}".format(region_name, sys.exc_info()[0])) print (
"Exception reading segment ({}): {}".format(
region_name, sys.exc_info()[0]
)
)
return final_segment_list return final_segment_list
# ---------- # ----------
# ---- Main # ---- Main
def main(): def main():
try: try:
print ("----- Unicorn Context Dumper -----") print ("----- Unicorn Context Dumper -----")
print ("You must be actively debugging before running this!") print ("You must be actively debugging before running this!")
print("If it fails, double check that you are actively debugging before running.") print (
"If it fails, double check that you are actively debugging before running."
)
# Create the output directory # Create the output directory
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S') timestamp = datetime.datetime.fromtimestamp(time.time()).strftime(
"%Y%m%d_%H%M%S"
)
output_path = "UnicornContext_" + timestamp output_path = "UnicornContext_" + timestamp
if not os.path.exists(output_path): if not os.path.exists(output_path):
os.makedirs(output_path) os.makedirs(output_path)
@ -285,7 +335,7 @@ def main():
} }
# Write the index file # Write the index file
index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w') index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w")
index_file.write(json.dumps(context, indent=4)) index_file.write(json.dumps(context, indent=4))
index_file.close() index_file.close()
print ("Done.") print ("Done.")
@ -293,6 +343,7 @@ def main():
except Exception, e: except Exception, e:
print ("!!! ERROR:\n\t{}".format(repr(e))) print ("!!! ERROR:\n\t{}".format(repr(e)))
if __name__ == "__main__": if __name__ == "__main__":
main() main()
elif lldb.debugger: elif lldb.debugger:

View File

@ -62,36 +62,37 @@ INDEX_FILE_NAME = "_index.json"
# ---------------------- # ----------------------
# ---- Helper Functions # ---- Helper Functions
def map_arch(): def map_arch():
arch = pwndbg.arch.current # from PWNDBG arch = pwndbg.arch.current # from PWNDBG
if 'x86_64' in arch or 'x86-64' in arch: if "x86_64" in arch or "x86-64" in arch:
return "x64" return "x64"
elif 'x86' in arch or 'i386' in arch: elif "x86" in arch or "i386" in arch:
return "x86" return "x86"
elif 'aarch64' in arch or 'arm64' in arch: elif "aarch64" in arch or "arm64" in arch:
return "arm64le" return "arm64le"
elif 'aarch64_be' in arch: elif "aarch64_be" in arch:
return "arm64be" return "arm64be"
elif 'arm' in arch: elif "arm" in arch:
cpsr = pwndbg.regs['cpsr'] cpsr = pwndbg.regs["cpsr"]
# check endianess # check endianess
if pwndbg.arch.endian == 'big': if pwndbg.arch.endian == "big":
# check for THUMB mode # check for THUMB mode
if (cpsr & (1 << 5)): if cpsr & (1 << 5):
return "armbethumb" return "armbethumb"
else: else:
return "armbe" return "armbe"
else: else:
# check for THUMB mode # check for THUMB mode
if (cpsr & (1 << 5)): if cpsr & (1 << 5):
return "armlethumb" return "armlethumb"
else: else:
return "armle" return "armle"
elif 'mips' in arch: elif "mips" in arch:
if pwndbg.arch.endian == 'little': if pwndbg.arch.endian == "little":
return 'mipsel' return "mipsel"
else: else:
return 'mips' return "mips"
else: else:
return "" return ""
@ -99,6 +100,7 @@ def map_arch():
# ----------------------- # -----------------------
# ---- Dumping functions # ---- Dumping functions
def dump_arch_info(): def dump_arch_info():
arch_info = {} arch_info = {}
arch_info["arch"] = map_arch() arch_info["arch"] = map_arch()
@ -114,7 +116,7 @@ def dump_regs():
# if "64" in get_arch(): # if "64" in get_arch():
# reg_str = "0x{:016x}".format(reg_val) # reg_str = "0x{:016x}".format(reg_val)
# reg_state[reg.strip().strip('$')] = reg_str # reg_state[reg.strip().strip('$')] = reg_str
reg_state[reg.strip().strip('$')] = reg_val reg_state[reg.strip().strip("$")] = reg_val
return reg_state return reg_state
@ -126,7 +128,7 @@ def dump_process_memory(output_dir):
vmmap = pwndbg.vmmap.get() vmmap = pwndbg.vmmap.get()
# Pointer to end of last dumped memory segment # Pointer to end of last dumped memory segment
segment_last_addr = 0x0; segment_last_addr = 0x0
start = None start = None
end = None end = None
@ -143,40 +145,58 @@ def dump_process_memory(output_dir):
start = entry.start start = entry.start
end = entry.end end = entry.end
if (segment_last_addr > entry.start): # indicates overlap if segment_last_addr > entry.start: # indicates overlap
if (segment_last_addr > entry.end): # indicates complete overlap, so we skip the segment entirely if (
segment_last_addr > entry.end
): # indicates complete overlap, so we skip the segment entirely
continue continue
else: else:
start = segment_last_addr start = segment_last_addr
seg_info = {
seg_info = {'start': start, 'end': end, 'name': entry.objfile, 'permissions': { "start": start,
"r": entry.read, "end": end,
"w": entry.write, "name": entry.objfile,
"x": entry.execute "permissions": {"r": entry.read, "w": entry.write, "x": entry.execute},
}, 'content_file': ''} "content_file": "",
}
# "(deleted)" may or may not be valid, but don't push it. # "(deleted)" may or may not be valid, but don't push it.
if entry.read and not '(deleted)' in entry.objfile: if entry.read and not "(deleted)" in entry.objfile:
try: try:
# Compress and dump the content to a file # Compress and dump the content to a file
seg_content = pwndbg.memory.read(start, end - start) seg_content = pwndbg.memory.read(start, end - start)
if(seg_content == None): if seg_content == None:
print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(entry.start, entry.objfile)) print(
"Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(
entry.start, entry.objfile
)
)
else: else:
print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(entry.start, len(seg_content), entry.objfile, repr(seg_info['permissions']))) print(
"Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(
entry.start,
len(seg_content),
entry.objfile,
repr(seg_info["permissions"]),
)
)
compressed_seg_content = zlib.compress(str(seg_content)) compressed_seg_content = zlib.compress(str(seg_content))
md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin" md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin"
seg_info["content_file"] = md5_sum seg_info["content_file"] = md5_sum
# Write the compressed contents to disk # Write the compressed contents to disk
out_file = open(os.path.join(output_dir, md5_sum), 'wb') out_file = open(os.path.join(output_dir, md5_sum), "wb")
out_file.write(compressed_seg_content) out_file.write(compressed_seg_content)
out_file.close() out_file.close()
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
print("Exception reading segment ({}): {}".format(entry.objfile, sys.exc_info()[0])) print(
"Exception reading segment ({}): {}".format(
entry.objfile, sys.exc_info()[0]
)
)
else: else:
print("Skipping segment {0}@0x{1:016x}".format(entry.objfile, entry.start)) print("Skipping segment {0}@0x{1:016x}".format(entry.objfile, entry.start))
@ -185,12 +205,13 @@ def dump_process_memory(output_dir):
# Add the segment to the list # Add the segment to the list
final_segment_list.append(seg_info) final_segment_list.append(seg_info)
return final_segment_list return final_segment_list
# ---------- # ----------
# ---- Main # ---- Main
def main(): def main():
print("----- Unicorn Context Dumper -----") print("----- Unicorn Context Dumper -----")
print("You must be actively debugging before running this!") print("You must be actively debugging before running this!")
@ -199,7 +220,9 @@ def main():
try: try:
# Create the output directory # Create the output directory
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S') timestamp = datetime.datetime.fromtimestamp(time.time()).strftime(
"%Y%m%d_%H%M%S"
)
output_path = "UnicornContext_" + timestamp output_path = "UnicornContext_" + timestamp
if not os.path.exists(output_path): if not os.path.exists(output_path):
os.makedirs(output_path) os.makedirs(output_path)
@ -213,7 +236,7 @@ def main():
} }
# Write the index file # Write the index file
index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w') index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w")
index_file.write(json.dumps(context, indent=4)) index_file.write(json.dumps(context, indent=4))
index_file.close() index_file.close()
print("Done.") print("Done.")
@ -221,6 +244,6 @@ def main():
except Exception as e: except Exception as e:
print("!!! ERROR:\n\t{}".format(repr(e))) print("!!! ERROR:\n\t{}".format(repr(e)))
if __name__ == "__main__" and pwndbg_loaded: if __name__ == "__main__" and pwndbg_loaded:
main() main()

View File

@ -22,7 +22,9 @@ from unicornafl import *
from unicornafl.x86_const import * from unicornafl.x86_const import *
# Path to the file containing the binary to emulate # Path to the file containing the binary to emulate
BINARY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'compcov_target.bin') BINARY_FILE = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "compcov_target.bin"
)
# Memory map for the code to be tested # Memory map for the code to be tested
CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded
@ -35,35 +37,66 @@ DATA_SIZE_MAX = 0x00010000 # Maximum allowable size of mutated data
try: try:
# If Capstone is installed then we'll dump disassembly, otherwise just dump the binary. # If Capstone is installed then we'll dump disassembly, otherwise just dump the binary.
from capstone import * from capstone import *
cs = Cs(CS_ARCH_X86, CS_MODE_64) cs = Cs(CS_ARCH_X86, CS_MODE_64)
def unicorn_debug_instruction(uc, address, size, user_data): def unicorn_debug_instruction(uc, address, size, user_data):
mem = uc.mem_read(address, size) mem = uc.mem_read(address, size)
for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size): for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(
bytes(mem), size
):
print(" Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr)) print(" Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr))
except ImportError: except ImportError:
def unicorn_debug_instruction(uc, address, size, user_data): def unicorn_debug_instruction(uc, address, size, user_data):
print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
def unicorn_debug_block(uc, address, size, user_data): def unicorn_debug_block(uc, address, size, user_data):
print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
def unicorn_debug_mem_access(uc, access, address, size, value, user_data): def unicorn_debug_mem_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE: if access == UC_MEM_WRITE:
print(" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) print(
" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
address, size, value
)
)
else: else:
print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size)) print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size))
def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data): def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE_UNMAPPED: if access == UC_MEM_WRITE_UNMAPPED:
print(" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) print(
" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
address, size, value
)
)
else: else:
print(" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)) print(
" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)
)
def main(): def main():
parser = argparse.ArgumentParser(description="Test harness for compcov_target.bin") parser = argparse.ArgumentParser(description="Test harness for compcov_target.bin")
parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input to load") parser.add_argument(
parser.add_argument('-t', '--trace', default=False, action="store_true", help="Enables debug tracing") "input_file",
type=str,
help="Path to the file containing the mutated input to load",
)
parser.add_argument(
"-t",
"--trace",
default=False,
action="store_true",
help="Enables debug tracing",
)
args = parser.parse_args() args = parser.parse_args()
# Instantiate a MIPS32 big endian Unicorn Engine instance # Instantiate a MIPS32 big endian Unicorn Engine instance
@ -73,13 +106,16 @@ def main():
uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block) uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block)
uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction) uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction)
uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access) uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access)
uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, unicorn_debug_mem_invalid_access) uc.hook_add(
UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID,
unicorn_debug_mem_invalid_access,
)
# --------------------------------------------------- # ---------------------------------------------------
# Load the binary to emulate and map it into memory # Load the binary to emulate and map it into memory
print("Loading data input from {}".format(args.input_file)) print("Loading data input from {}".format(args.input_file))
binary_file = open(BINARY_FILE, 'rb') binary_file = open(BINARY_FILE, "rb")
binary_code = binary_file.read() binary_code = binary_file.read()
binary_file.close() binary_file.close()
@ -106,7 +142,6 @@ def main():
# Mapping a location to write our buffer to # Mapping a location to write our buffer to
uc.mem_map(DATA_ADDRESS, DATA_SIZE_MAX) uc.mem_map(DATA_ADDRESS, DATA_SIZE_MAX)
# ----------------------------------------------- # -----------------------------------------------
# Load the mutated input and map it into memory # Load the mutated input and map it into memory
@ -129,8 +164,9 @@ def main():
input_file=args.input_file, input_file=args.input_file,
place_input_callback=place_input_callback, place_input_callback=place_input_callback,
exits=[end_address], exits=[end_address],
persistent_iters=1 persistent_iters=1,
) )
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -22,7 +22,9 @@ from unicornafl import *
from unicornafl.mips_const import * from unicornafl.mips_const import *
# Path to the file containing the binary to emulate # Path to the file containing the binary to emulate
BINARY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'simple_target.bin') BINARY_FILE = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "simple_target.bin"
)
# Memory map for the code to be tested # Memory map for the code to be tested
CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded
@ -35,35 +37,66 @@ DATA_SIZE_MAX = 0x00010000 # Maximum allowable size of mutated data
try: try:
# If Capstone is installed then we'll dump disassembly, otherwise just dump the binary. # If Capstone is installed then we'll dump disassembly, otherwise just dump the binary.
from capstone import * from capstone import *
cs = Cs(CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_BIG_ENDIAN) cs = Cs(CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_BIG_ENDIAN)
def unicorn_debug_instruction(uc, address, size, user_data): def unicorn_debug_instruction(uc, address, size, user_data):
mem = uc.mem_read(address, size) mem = uc.mem_read(address, size)
for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size): for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(
bytes(mem), size
):
print(" Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr)) print(" Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr))
except ImportError: except ImportError:
def unicorn_debug_instruction(uc, address, size, user_data): def unicorn_debug_instruction(uc, address, size, user_data):
print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
def unicorn_debug_block(uc, address, size, user_data): def unicorn_debug_block(uc, address, size, user_data):
print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
def unicorn_debug_mem_access(uc, access, address, size, value, user_data): def unicorn_debug_mem_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE: if access == UC_MEM_WRITE:
print(" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) print(
" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
address, size, value
)
)
else: else:
print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size)) print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size))
def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data): def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE_UNMAPPED: if access == UC_MEM_WRITE_UNMAPPED:
print(" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) print(
" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
address, size, value
)
)
else: else:
print(" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)) print(
" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)
)
def main(): def main():
parser = argparse.ArgumentParser(description="Test harness for simple_target.bin") parser = argparse.ArgumentParser(description="Test harness for simple_target.bin")
parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input to load") parser.add_argument(
parser.add_argument('-t', '--trace', default=False, action="store_true", help="Enables debug tracing") "input_file",
type=str,
help="Path to the file containing the mutated input to load",
)
parser.add_argument(
"-t",
"--trace",
default=False,
action="store_true",
help="Enables debug tracing",
)
args = parser.parse_args() args = parser.parse_args()
# Instantiate a MIPS32 big endian Unicorn Engine instance # Instantiate a MIPS32 big endian Unicorn Engine instance
@ -73,13 +106,16 @@ def main():
uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block) uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block)
uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction) uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction)
uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access) uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access)
uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, unicorn_debug_mem_invalid_access) uc.hook_add(
UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID,
unicorn_debug_mem_invalid_access,
)
# --------------------------------------------------- # ---------------------------------------------------
# Load the binary to emulate and map it into memory # Load the binary to emulate and map it into memory
print("Loading data input from {}".format(args.input_file)) print("Loading data input from {}".format(args.input_file))
binary_file = open(BINARY_FILE, 'rb') binary_file = open(BINARY_FILE, "rb")
binary_code = binary_file.read() binary_code = binary_file.read()
binary_file.close() binary_file.close()
@ -94,7 +130,7 @@ def main():
# Set the program counter to the start of the code # Set the program counter to the start of the code
start_address = CODE_ADDRESS # Address of entry point of main() start_address = CODE_ADDRESS # Address of entry point of main()
end_address = CODE_ADDRESS + 0xf4 # Address of last instruction in main() end_address = CODE_ADDRESS + 0xF4 # Address of last instruction in main()
uc.reg_write(UC_MIPS_REG_PC, start_address) uc.reg_write(UC_MIPS_REG_PC, start_address)
# ----------------- # -----------------
@ -122,5 +158,6 @@ def main():
# Start the fuzzer. # Start the fuzzer.
uc.afl_fuzz(args.input_file, place_input_callback, [end_address]) uc.afl_fuzz(args.input_file, place_input_callback, [end_address])
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -25,7 +25,9 @@ from unicornafl import *
from unicornafl.mips_const import * from unicornafl.mips_const import *
# Path to the file containing the binary to emulate # Path to the file containing the binary to emulate
BINARY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'simple_target.bin') BINARY_FILE = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "simple_target.bin"
)
# Memory map for the code to be tested # Memory map for the code to be tested
CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded
@ -38,37 +40,64 @@ DATA_SIZE_MAX = 0x00010000 # Maximum allowable size of mutated data
try: try:
# If Capstone is installed then we'll dump disassembly, otherwise just dump the binary. # If Capstone is installed then we'll dump disassembly, otherwise just dump the binary.
from capstone import * from capstone import *
cs = Cs(CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_BIG_ENDIAN) cs = Cs(CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_BIG_ENDIAN)
def unicorn_debug_instruction(uc, address, size, user_data): def unicorn_debug_instruction(uc, address, size, user_data):
mem = uc.mem_read(address, size) mem = uc.mem_read(address, size)
for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size): for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(
bytes(mem), size
):
print(" Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr)) print(" Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr))
except ImportError: except ImportError:
def unicorn_debug_instruction(uc, address, size, user_data): def unicorn_debug_instruction(uc, address, size, user_data):
print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
def unicorn_debug_block(uc, address, size, user_data): def unicorn_debug_block(uc, address, size, user_data):
print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
def unicorn_debug_mem_access(uc, access, address, size, value, user_data): def unicorn_debug_mem_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE: if access == UC_MEM_WRITE:
print(" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) print(
" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
address, size, value
)
)
else: else:
print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size)) print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size))
def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data): def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE_UNMAPPED: if access == UC_MEM_WRITE_UNMAPPED:
print(" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) print(
" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
address, size, value
)
)
else: else:
print(" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)) print(
" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)
)
def force_crash(uc_error): def force_crash(uc_error):
# This function should be called to indicate to AFL that a crash occurred during emulation. # This function should be called to indicate to AFL that a crash occurred during emulation.
# Pass in the exception received from Uc.emu_start() # Pass in the exception received from Uc.emu_start()
mem_errors = [ mem_errors = [
UC_ERR_READ_UNMAPPED, UC_ERR_READ_PROT, UC_ERR_READ_UNALIGNED, UC_ERR_READ_UNMAPPED,
UC_ERR_WRITE_UNMAPPED, UC_ERR_WRITE_PROT, UC_ERR_WRITE_UNALIGNED, UC_ERR_READ_PROT,
UC_ERR_FETCH_UNMAPPED, UC_ERR_FETCH_PROT, UC_ERR_FETCH_UNALIGNED, UC_ERR_READ_UNALIGNED,
UC_ERR_WRITE_UNMAPPED,
UC_ERR_WRITE_PROT,
UC_ERR_WRITE_UNALIGNED,
UC_ERR_FETCH_UNMAPPED,
UC_ERR_FETCH_PROT,
UC_ERR_FETCH_UNALIGNED,
] ]
if uc_error.errno in mem_errors: if uc_error.errno in mem_errors:
# Memory error - throw SIGSEGV # Memory error - throw SIGSEGV
@ -80,11 +109,22 @@ def force_crash(uc_error):
# Not sure what happened - throw SIGABRT # Not sure what happened - throw SIGABRT
os.kill(os.getpid(), signal.SIGABRT) os.kill(os.getpid(), signal.SIGABRT)
def main(): def main():
parser = argparse.ArgumentParser(description="Test harness for simple_target.bin") parser = argparse.ArgumentParser(description="Test harness for simple_target.bin")
parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input to load") parser.add_argument(
parser.add_argument('-d', '--debug', default=False, action="store_true", help="Enables debug tracing") "input_file",
type=str,
help="Path to the file containing the mutated input to load",
)
parser.add_argument(
"-d",
"--debug",
default=False,
action="store_true",
help="Enables debug tracing",
)
args = parser.parse_args() args = parser.parse_args()
# Instantiate a MIPS32 big endian Unicorn Engine instance # Instantiate a MIPS32 big endian Unicorn Engine instance
@ -94,13 +134,16 @@ def main():
uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block) uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block)
uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction) uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction)
uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access) uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access)
uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, unicorn_debug_mem_invalid_access) uc.hook_add(
UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID,
unicorn_debug_mem_invalid_access,
)
# --------------------------------------------------- # ---------------------------------------------------
# Load the binary to emulate and map it into memory # Load the binary to emulate and map it into memory
print("Loading data input from {}".format(args.input_file)) print("Loading data input from {}".format(args.input_file))
binary_file = open(BINARY_FILE, 'rb') binary_file = open(BINARY_FILE, "rb")
binary_code = binary_file.read() binary_code = binary_file.read()
binary_file.close() binary_file.close()
@ -115,7 +158,7 @@ def main():
# Set the program counter to the start of the code # Set the program counter to the start of the code
start_address = CODE_ADDRESS # Address of entry point of main() start_address = CODE_ADDRESS # Address of entry point of main()
end_address = CODE_ADDRESS + 0xf4 # Address of last instruction in main() end_address = CODE_ADDRESS + 0xF4 # Address of last instruction in main()
uc.reg_write(UC_MIPS_REG_PC, start_address) uc.reg_write(UC_MIPS_REG_PC, start_address)
# ----------------- # -----------------
@ -147,7 +190,7 @@ def main():
# Load the mutated input from disk # Load the mutated input from disk
out("Loading data input from {}", args.input_file) out("Loading data input from {}", args.input_file)
input_file = open(args.input_file, 'rb') input_file = open(args.input_file, "rb")
input = input_file.read() input = input_file.read()
input_file.close() input_file.close()
@ -175,5 +218,6 @@ def main():
# UC_AFL_RET_FINISHED = 3 # UC_AFL_RET_FINISHED = 3
out("Done. AFL Mode is {}", afl_mode) out("Done. AFL Mode is {}", afl_mode)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -262,7 +262,7 @@ def main():
# print(f"Placing input: {input} in round {persistent_round}") # print(f"Placing input: {input} in round {persistent_round}")
# Make sure the string is always 0-terminated (as it would be "in the wild") # Make sure the string is always 0-terminated (as it would be "in the wild")
input[-1] = b'\0' input[-1] = b"\0"
# Write the mutated command into the data buffer # Write the mutated command into the data buffer
uc.mem_write(INPUT_ADDRESS, input) uc.mem_write(INPUT_ADDRESS, input)

View File

@ -11,6 +11,7 @@ import idc
# See https://www.hex-rays.com/products/ida/support/ida74_idapython_no_bc695_porting_guide.shtml # See https://www.hex-rays.com/products/ida/support/ida74_idapython_no_bc695_porting_guide.shtml
from os.path import expanduser from os.path import expanduser
home = expanduser("~") home = expanduser("~")
patchpoints = set() patchpoints = set()
@ -52,9 +53,9 @@ if rem != 0:
print("Writing to " + home + "/Desktop/patches.txt") print("Writing to " + home + "/Desktop/patches.txt")
with open(home + "/Desktop/patches.txt", "w") as f: with open(home + "/Desktop/patches.txt", "w") as f:
f.write(ida_nalt.get_root_filename() + ':' + hex(size) + '\n') f.write(ida_nalt.get_root_filename() + ":" + hex(size) + "\n")
f.write('\n'.join(map(hex, sorted(patchpoints)))) f.write("\n".join(map(hex, sorted(patchpoints))))
f.write('\n') f.write("\n")
print("Done, found {} patchpoints".format(len(patchpoints))) print("Done, found {} patchpoints".format(len(patchpoints)))

View File

@ -12,6 +12,7 @@ import random, re, io
# The XmlMutatorMin class # # The XmlMutatorMin class #
########################### ###########################
class XmlMutatorMin: class XmlMutatorMin:
""" """
@ -41,7 +42,12 @@ class XmlMutatorMin:
self.tree = None self.tree = None
# High-level mutators (no database needed) # High-level mutators (no database needed)
hl_mutators_delete = ["del_node_and_children", "del_node_but_children", "del_attribute", "del_content"] # Delete items hl_mutators_delete = [
"del_node_and_children",
"del_node_but_children",
"del_attribute",
"del_content",
] # Delete items
hl_mutators_fuzz = ["fuzz_attribute"] # Randomly change attribute values hl_mutators_fuzz = ["fuzz_attribute"] # Randomly change attribute values
# Exposed mutators # Exposed mutators
@ -74,7 +80,9 @@ class XmlMutatorMin:
""" Serialize a XML document. Basic wrapper around lxml.tostring() """ """ Serialize a XML document. Basic wrapper around lxml.tostring() """
return ET.tostring(tree, with_tail=False, xml_declaration=True, encoding=tree.docinfo.encoding) return ET.tostring(
tree, with_tail=False, xml_declaration=True, encoding=tree.docinfo.encoding
)
def __ver(self, version): def __ver(self, version):
@ -161,7 +169,7 @@ class XmlMutatorMin:
# Randomly pick one the function calls # Randomly pick one the function calls
(func, args) = random.choice(l) (func, args) = random.choice(l)
# Split by "," and randomly pick one of the arguments # Split by "," and randomly pick one of the arguments
value = random.choice(args.split(',')) value = random.choice(args.split(","))
# Remove superfluous characters # Remove superfluous characters
unclean_value = value unclean_value = value
value = value.strip(" ").strip("'") value = value.strip(" ").strip("'")
@ -170,30 +178,30 @@ class XmlMutatorMin:
value = attrib_value value = attrib_value
# For each type, define some possible replacement values # For each type, define some possible replacement values
choices_number = ( \ choices_number = (
"0", \ "0",
"11111", \ "11111",
"-128", \ "-128",
"2", \ "2",
"-1", \ "-1",
"1/3", \ "1/3",
"42/0", \ "42/0",
"1094861636 idiv 1.0", \ "1094861636 idiv 1.0",
"-1123329771506872 idiv 3.8", \ "-1123329771506872 idiv 3.8",
"17=$numericRTF", \ "17=$numericRTF",
str(3 + random.randrange(0, 100)), \ str(3 + random.randrange(0, 100)),
) )
choices_letter = ( \ choices_letter = (
"P" * (25 * random.randrange(1, 100)), \ "P" * (25 * random.randrange(1, 100)),
"%s%s%s%s%s%s", \ "%s%s%s%s%s%s",
"foobar", \ "foobar",
) )
choices_alnum = ( \ choices_alnum = (
"Abc123", \ "Abc123",
"020F0302020204030204", \ "020F0302020204030204",
"020F0302020204030204" * (random.randrange(5, 20)), \ "020F0302020204030204" * (random.randrange(5, 20)),
) )
# Fuzz the value # Fuzz the value
@ -232,7 +240,10 @@ class XmlMutatorMin:
# Log something # Log something
if self.verbose: if self.verbose:
print("Fuzzing attribute #%i '%s' of tag #%i '%s'" % (rand_attrib_id, rand_attrib, rand_elem_id, rand_elem.tag)) print(
"Fuzzing attribute #%i '%s' of tag #%i '%s'"
% (rand_attrib_id, rand_attrib, rand_elem_id, rand_elem.tag)
)
# Modify the attribute # Modify the attribute
rand_elem.set(rand_attrib, new_value.decode("utf-8")) rand_elem.set(rand_attrib, new_value.decode("utf-8"))
@ -270,7 +281,10 @@ class XmlMutatorMin:
# Log something # Log something
if self.verbose: if self.verbose:
but_or_and = "and" if delete_children else "but" but_or_and = "and" if delete_children else "but"
print("Deleting tag #%i '%s' %s its children" % (rand_elem_id, rand_elem.tag, but_or_and)) print(
"Deleting tag #%i '%s' %s its children"
% (rand_elem_id, rand_elem.tag, but_or_and)
)
if delete_children is False: if delete_children is False:
# Link children of the random (soon to be deleted) node to its parent # Link children of the random (soon to be deleted) node to its parent
@ -318,7 +332,10 @@ class XmlMutatorMin:
# Log something # Log something
if self.verbose: if self.verbose:
print("Deleting attribute #%i '%s' of tag #%i '%s'" % (rand_attrib_id, rand_attrib, rand_elem_id, rand_elem.tag)) print(
"Deleting attribute #%i '%s' of tag #%i '%s'"
% (rand_attrib_id, rand_attrib, rand_elem_id, rand_elem.tag)
)
# Delete the attribute # Delete the attribute
rand_elem.attrib.pop(rand_attrib) rand_elem.attrib.pop(rand_attrib)
@ -329,4 +346,3 @@ class XmlMutatorMin:
# High-level mutation # High-level mutation
self.__exec_among(self, self.hl_mutators_all, min, max) self.__exec_among(self, self.hl_mutators_all, min, max)

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
# encoding: utf-8 # encoding: utf-8
''' """
Module containing functions shared between multiple AFL modules Module containing functions shared between multiple AFL modules
@author: Christian Holler (:decoder) @author: Christian Holler (:decoder)
@ -12,7 +12,7 @@ License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/. file, You can obtain one at http://mozilla.org/MPL/2.0/.
@contact: choller@mozilla.com @contact: choller@mozilla.com
''' """
from __future__ import print_function from __future__ import print_function
import random import random
@ -33,8 +33,8 @@ def randel_pop(l):
def write_exc_example(data, exc): def write_exc_example(data, exc):
exc_name = re.sub(r'[^a-zA-Z0-9]', '_', repr(exc)) exc_name = re.sub(r"[^a-zA-Z0-9]", "_", repr(exc))
if not os.path.exists(exc_name): if not os.path.exists(exc_name):
with open(exc_name, 'w') as f: with open(exc_name, "w") as f:
f.write(data) f.write(data)

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
# encoding: utf-8 # encoding: utf-8
''' """
Example Python Module for AFLFuzz Example Python Module for AFLFuzz
@author: Christian Holler (:decoder) @author: Christian Holler (:decoder)
@ -12,7 +12,7 @@ License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/. file, You can obtain one at http://mozilla.org/MPL/2.0/.
@contact: choller@mozilla.com @contact: choller@mozilla.com
''' """
import random import random
@ -26,12 +26,12 @@ COMMANDS = [
def init(seed): def init(seed):
''' """
Called once when AFLFuzz starts up. Used to seed our RNG. Called once when AFLFuzz starts up. Used to seed our RNG.
@type seed: int @type seed: int
@param seed: A 32-bit random value @param seed: A 32-bit random value
''' """
random.seed(seed) random.seed(seed)
@ -40,7 +40,7 @@ def deinit():
def fuzz(buf, add_buf, max_size): def fuzz(buf, add_buf, max_size):
''' """
Called per fuzzing iteration. Called per fuzzing iteration.
@type buf: bytearray @type buf: bytearray
@ -55,13 +55,14 @@ def fuzz(buf, add_buf, max_size):
@rtype: bytearray @rtype: bytearray
@return: A new bytearray containing the mutated data @return: A new bytearray containing the mutated data
''' """
ret = bytearray(100) ret = bytearray(100)
ret[:3] = random.choice(COMMANDS) ret[:3] = random.choice(COMMANDS)
return ret return ret
# Uncomment and implement the following methods if you want to use a custom # Uncomment and implement the following methods if you want to use a custom
# trimming algorithm. See also the documentation for a better API description. # trimming algorithm. See also the documentation for a better API description.

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
# encoding: utf-8 # encoding: utf-8
''' """
Simple Chunk Cross-Over Replacement Module for AFLFuzz Simple Chunk Cross-Over Replacement Module for AFLFuzz
@author: Christian Holler (:decoder) @author: Christian Holler (:decoder)
@ -12,24 +12,24 @@ License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/. file, You can obtain one at http://mozilla.org/MPL/2.0/.
@contact: choller@mozilla.com @contact: choller@mozilla.com
''' """
import random import random
def init(seed): def init(seed):
''' """
Called once when AFLFuzz starts up. Used to seed our RNG. Called once when AFLFuzz starts up. Used to seed our RNG.
@type seed: int @type seed: int
@param seed: A 32-bit random value @param seed: A 32-bit random value
''' """
# Seed our RNG # Seed our RNG
random.seed(seed) random.seed(seed)
def fuzz(buf, add_buf, max_size): def fuzz(buf, add_buf, max_size):
''' """
Called per fuzzing iteration. Called per fuzzing iteration.
@type buf: bytearray @type buf: bytearray
@ -44,7 +44,7 @@ def fuzz(buf, add_buf, max_size):
@rtype: bytearray @rtype: bytearray
@return: A new bytearray containing the mutated data @return: A new bytearray containing the mutated data
''' """
# Make a copy of our input buffer for returning # Make a copy of our input buffer for returning
ret = bytearray(buf) ret = bytearray(buf)
@ -58,7 +58,9 @@ def fuzz(buf, add_buf, max_size):
rand_dst_idx = random.randint(0, len(buf)) rand_dst_idx = random.randint(0, len(buf))
# Make the chunk replacement # Make the chunk replacement
ret[rand_dst_idx:rand_dst_idx + fragment_len] = add_buf[rand_src_idx:rand_src_idx + fragment_len] ret[rand_dst_idx : rand_dst_idx + fragment_len] = add_buf[
rand_src_idx : rand_src_idx + fragment_len
]
# Return data # Return data
return ret return ret

View File

@ -72,7 +72,10 @@ def fuzz(buf, add_buf, max_size):
if via_buffer: if via_buffer:
try: try:
__mutator__.init_from_string(buf_str) __mutator__.init_from_string(buf_str)
log("fuzz(): Mutator successfully initialized with AFL buffer (%d bytes)" % len(buf_str)) log(
"fuzz(): Mutator successfully initialized with AFL buffer (%d bytes)"
% len(buf_str)
)
except Exception: except Exception:
via_buffer = False via_buffer = False
log("fuzz(): Can't initialize mutator with AFL buffer") log("fuzz(): Can't initialize mutator with AFL buffer")
@ -104,7 +107,7 @@ def fuzz(buf, add_buf, max_size):
# Main (for debug) # Main (for debug)
if __name__ == '__main__': if __name__ == "__main__":
__log__ = True __log__ = True
__log_file__ = "/dev/stdout" __log_file__ = "/dev/stdout"
@ -112,7 +115,9 @@ if __name__ == '__main__':
init(__seed__) init(__seed__)
in_1 = bytearray("<foo ddd='eeee'>ffff<a b='c' d='456' eee='ffffff'>zzzzzzzzzzzz</a><b yyy='YYY' zzz='ZZZ'></b></foo>") in_1 = bytearray(
"<foo ddd='eeee'>ffff<a b='c' d='456' eee='ffffff'>zzzzzzzzzzzz</a><b yyy='YYY' zzz='ZZZ'></b></foo>"
)
in_2 = bytearray("<abc abc123='456' abcCBA='ppppppppppppppppppppppppppppp'/>") in_2 = bytearray("<abc abc123='456' abcCBA='ppppppppppppppppppppppppppppp'/>")
out = fuzz(in_1, in_2) out = fuzz(in_1, in_2)
print(out) print(out)