formatting

This commit is contained in:
Dominik Maier 2021-07-15 12:38:15 +02:00
parent 7fb2d90c49
commit c279750689

View File

@ -29,6 +29,7 @@ from unicornafl.mips_const import *
# If Capstone libraries are availible (only check once) # If Capstone libraries are availible (only check once)
try: try:
from capstone import * from capstone import *
CAPSTONE_EXISTS = 1 CAPSTONE_EXISTS = 1
except: except:
CAPSTONE_EXISTS = 0 CAPSTONE_EXISTS = 0
@ -44,29 +45,38 @@ MAX_ALLOWABLE_SEG_SIZE = 1024 * 1024 * 1024
# Alignment functions to align all memory segments to Unicorn page boundaries (4KB pages only) # Alignment functions to align all memory segments to Unicorn page boundaries (4KB pages only)
ALIGN_PAGE_DOWN = lambda x: x & ~(UNICORN_PAGE_SIZE - 1) ALIGN_PAGE_DOWN = lambda x: x & ~(UNICORN_PAGE_SIZE - 1)
ALIGN_PAGE_UP = lambda x: (x + UNICORN_PAGE_SIZE - 1) & ~(UNICORN_PAGE_SIZE-1) ALIGN_PAGE_UP = lambda x: (x + UNICORN_PAGE_SIZE - 1) & ~(UNICORN_PAGE_SIZE - 1)
# ---------------------------------------
# ---- Unicorn-based heap implementation
#---------------------------------------
#---- Unicorn-based heap implementation
class UnicornSimpleHeap(object): class UnicornSimpleHeap(object):
""" Use this class to provide a simple heap implementation. This should """Use this class to provide a simple heap implementation. This should
be used if malloc/free calls break things during emulation. This heap also be used if malloc/free calls break things during emulation. This heap also
implements basic guard-page capabilities which enable immediate notice of implements basic guard-page capabilities which enable immediate notice of
heap overflow and underflows. heap overflow and underflows.
""" """
# Helper data-container used to track chunks # Helper data-container used to track chunks
class HeapChunk(object): class HeapChunk(object):
def __init__(self, actual_addr, total_size, data_size): def __init__(self, actual_addr, total_size, data_size):
self.total_size = total_size # Total size of the chunk (including padding and guard page) self.total_size = (
self.actual_addr = actual_addr # Actual start address of the chunk total_size # Total size of the chunk (including padding and guard page)
self.data_size = data_size # Size requested by the caller of actual malloc call )
self.data_addr = actual_addr + UNICORN_PAGE_SIZE # Address where data actually starts self.actual_addr = actual_addr # Actual start address of the chunk
self.data_size = (
data_size # Size requested by the caller of actual malloc call
)
self.data_addr = (
actual_addr + UNICORN_PAGE_SIZE
) # Address where data actually starts
# Returns true if the specified buffer is completely within the chunk, else false # Returns true if the specified buffer is completely within the chunk, else false
def is_buffer_in_chunk(self, addr, size): def is_buffer_in_chunk(self, addr, size):
if addr >= self.data_addr and ((addr + size) <= (self.data_addr + self.data_size)): if addr >= self.data_addr and (
(addr + size) <= (self.data_addr + self.data_size)
):
return True return True
else: else:
return False return False
@ -75,9 +85,9 @@ class UnicornSimpleHeap(object):
HEAP_MIN_ADDR = 0x00002000 HEAP_MIN_ADDR = 0x00002000
HEAP_MAX_ADDR = 0xFFFFFFFF HEAP_MAX_ADDR = 0xFFFFFFFF
_uc = None # Unicorn engine instance to interact with _uc = None # Unicorn engine instance to interact with
_chunks = [] # List of all known chunks _chunks = [] # List of all known chunks
_debug_print = False # True to print debug information _debug_print = False # True to print debug information
def __init__(self, uc, debug_print=False): def __init__(self, uc, debug_print=False):
self._uc = uc self._uc = uc
@ -98,7 +108,11 @@ class UnicornSimpleHeap(object):
self._uc.mem_map(addr, total_chunk_size, UC_PROT_READ | UC_PROT_WRITE) self._uc.mem_map(addr, total_chunk_size, UC_PROT_READ | UC_PROT_WRITE)
chunk = self.HeapChunk(addr, total_chunk_size, size) chunk = self.HeapChunk(addr, total_chunk_size, size)
if self._debug_print: if self._debug_print:
print("Allocating 0x{0:x}-byte chunk @ 0x{1:016x}".format(chunk.data_size, chunk.data_addr)) print(
"Allocating 0x{0:x}-byte chunk @ 0x{1:016x}".format(
chunk.data_size, chunk.data_addr
)
)
break break
except UcError as e: except UcError as e:
continue continue
@ -110,19 +124,26 @@ class UnicornSimpleHeap(object):
def calloc(self, size, count): def calloc(self, size, count):
# Simple wrapper around malloc with calloc() args # Simple wrapper around malloc with calloc() args
return self.malloc(size*count) return self.malloc(size * count)
def realloc(self, ptr, new_size): def realloc(self, ptr, new_size):
# Wrapper around malloc(new_size) / memcpy(new, old, old_size) / free(old) # Wrapper around malloc(new_size) / memcpy(new, old, old_size) / free(old)
if self._debug_print: if self._debug_print:
print("Reallocating chunk @ 0x{0:016x} to be 0x{1:x} bytes".format(ptr, new_size)) print(
"Reallocating chunk @ 0x{0:016x} to be 0x{1:x} bytes".format(
ptr, new_size
)
)
old_chunk = None old_chunk = None
for chunk in self._chunks: for chunk in self._chunks:
if chunk.data_addr == ptr: if chunk.data_addr == ptr:
old_chunk = chunk old_chunk = chunk
new_chunk_addr = self.malloc(new_size) new_chunk_addr = self.malloc(new_size)
if old_chunk != None: if old_chunk != None:
self._uc.mem_write(new_chunk_addr, str(self._uc.mem_read(old_chunk.data_addr, old_chunk.data_size))) self._uc.mem_write(
new_chunk_addr,
str(self._uc.mem_read(old_chunk.data_addr, old_chunk.data_size)),
)
self.free(old_chunk.data_addr) self.free(old_chunk.data_addr)
return new_chunk_addr return new_chunk_addr
@ -130,7 +151,11 @@ class UnicornSimpleHeap(object):
for chunk in self._chunks: for chunk in self._chunks:
if chunk.is_buffer_in_chunk(addr, 1): if chunk.is_buffer_in_chunk(addr, 1):
if self._debug_print: if self._debug_print:
print("Freeing 0x{0:x}-byte chunk @ 0x{0:016x}".format(chunk.req_size, chunk.data_addr)) print(
"Freeing 0x{0:x}-byte chunk @ 0x{0:016x}".format(
chunk.req_size, chunk.data_addr
)
)
self._uc.mem_unmap(chunk.actual_addr, chunk.total_size) self._uc.mem_unmap(chunk.actual_addr, chunk.total_size)
self._chunks.remove(chunk) self._chunks.remove(chunk)
return True return True
@ -139,19 +164,27 @@ class UnicornSimpleHeap(object):
# Implements basic guard-page functionality # Implements basic guard-page functionality
def __check_mem_access(self, uc, access, address, size, value, user_data): def __check_mem_access(self, uc, access, address, size, value, user_data):
for chunk in self._chunks: for chunk in self._chunks:
if address >= chunk.actual_addr and ((address + size) <= (chunk.actual_addr + chunk.total_size)): if address >= chunk.actual_addr and (
(address + size) <= (chunk.actual_addr + chunk.total_size)
):
if chunk.is_buffer_in_chunk(address, size) == False: if chunk.is_buffer_in_chunk(address, size) == False:
if self._debug_print: if self._debug_print:
print("Heap over/underflow attempting to {0} 0x{1:x} bytes @ {2:016x}".format( \ print(
"write" if access == UC_MEM_WRITE else "read", size, address)) "Heap over/underflow attempting to {0} 0x{1:x} bytes @ {2:016x}".format(
"write" if access == UC_MEM_WRITE else "read",
size,
address,
)
)
# Force a memory-based crash # Force a memory-based crash
uc.force_crash(UcError(UC_ERR_READ_PROT)) uc.force_crash(UcError(UC_ERR_READ_PROT))
#---------------------------
#---- Loading function # ---------------------------
# ---- Loading function
class AflUnicornEngine(Uc): class AflUnicornEngine(Uc):
def __init__(self, context_directory, enable_trace=False, debug_print=False): def __init__(self, context_directory, enable_trace=False, debug_print=False):
""" """
Initializes an AflUnicornEngine instance, which extends standard the UnicornEngine Initializes an AflUnicornEngine instance, which extends standard the UnicornEngine
@ -166,51 +199,56 @@ class AflUnicornEngine(Uc):
# Make sure the index file exists and load it # Make sure the index file exists and load it
index_file_path = os.path.join(context_directory, INDEX_FILE_NAME) index_file_path = os.path.join(context_directory, INDEX_FILE_NAME)
if not os.path.isfile(index_file_path): if not os.path.isfile(index_file_path):
raise Exception("Index file not found. Expected it to be at {}".format(index_file_path)) raise Exception(
"Index file not found. Expected it to be at {}".format(index_file_path)
)
# Load the process context from the index file # Load the process context from the index file
if debug_print: if debug_print:
print("Loading process context index from {}".format(index_file_path)) print("Loading process context index from {}".format(index_file_path))
index_file = open(index_file_path, 'r') index_file = open(index_file_path, "r")
context = json.load(index_file) context = json.load(index_file)
index_file.close() index_file.close()
# Check the context to make sure we have the basic essential components # Check the context to make sure we have the basic essential components
if 'arch' not in context: if "arch" not in context:
raise Exception("Couldn't find architecture information in index file") raise Exception("Couldn't find architecture information in index file")
if 'regs' not in context: if "regs" not in context:
raise Exception("Couldn't find register information in index file") raise Exception("Couldn't find register information in index file")
if 'segments' not in context: if "segments" not in context:
raise Exception("Couldn't find segment/memory information in index file") raise Exception("Couldn't find segment/memory information in index file")
# Set the UnicornEngine instance's architecture and mode # Set the UnicornEngine instance's architecture and mode
self._arch_str = context['arch']['arch'] self._arch_str = context["arch"]["arch"]
arch, mode = self.__get_arch_and_mode(self._arch_str) arch, mode = self.__get_arch_and_mode(self._arch_str)
Uc.__init__(self, arch, mode) Uc.__init__(self, arch, mode)
# Load the registers # Load the registers
regs = context['regs'] regs = context["regs"]
reg_map = self.__get_register_map(self._arch_str) reg_map = self.__get_register_map(self._arch_str)
self.__load_registers(regs, reg_map, debug_print) self.__load_registers(regs, reg_map, debug_print)
# If we have extra FLOATING POINT regs, load them in! # If we have extra FLOATING POINT regs, load them in!
if 'regs_extended' in context: if "regs_extended" in context:
if context['regs_extended']: if context["regs_extended"]:
regs_extended = context['regs_extended'] regs_extended = context["regs_extended"]
reg_map = self.__get_registers_extended(self._arch_str) reg_map = self.__get_registers_extended(self._arch_str)
self.__load_registers(regs_extended, reg_map, debug_print) self.__load_registers(regs_extended, reg_map, debug_print)
# For ARM, sometimes the stack pointer is erased ??? (I think I fixed this (issue with ordering of dumper.py, I'll keep the write anyways) # For ARM, sometimes the stack pointer is erased ??? (I think I fixed this (issue with ordering of dumper.py, I'll keep the write anyways)
if self.__get_arch_and_mode(self.get_arch_str())[0] == UC_ARCH_ARM: if self.__get_arch_and_mode(self.get_arch_str())[0] == UC_ARCH_ARM:
self.reg_write(UC_ARM_REG_SP, regs['sp']) self.reg_write(UC_ARM_REG_SP, regs["sp"])
# Setup the memory map and load memory content # Setup the memory map and load memory content
self.__map_segments(context['segments'], context_directory, debug_print) self.__map_segments(context["segments"], context_directory, debug_print)
if enable_trace: if enable_trace:
self.hook_add(UC_HOOK_BLOCK, self.__trace_block) self.hook_add(UC_HOOK_BLOCK, self.__trace_block)
self.hook_add(UC_HOOK_CODE, self.__trace_instruction) self.hook_add(UC_HOOK_CODE, self.__trace_instruction)
self.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, self.__trace_mem_access) self.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, self.__trace_mem_access)
self.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, self.__trace_mem_invalid_access) self.hook_add(
UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID,
self.__trace_mem_invalid_access,
)
if debug_print: if debug_print:
print("Done loading context.") print("Done loading context.")
@ -225,13 +263,19 @@ class AflUnicornEngine(Uc):
return self._arch_str return self._arch_str
def force_crash(self, uc_error): def force_crash(self, uc_error):
""" This function should be called to indicate to AFL that a crash occurred during emulation. """This function should be called to indicate to AFL that a crash occurred during emulation.
You can pass the exception received from Uc.emu_start You can pass the exception received from Uc.emu_start
""" """
mem_errors = [ mem_errors = [
UC_ERR_READ_UNMAPPED, UC_ERR_READ_PROT, UC_ERR_READ_UNALIGNED, UC_ERR_READ_UNMAPPED,
UC_ERR_WRITE_UNMAPPED, UC_ERR_WRITE_PROT, UC_ERR_WRITE_UNALIGNED, UC_ERR_READ_PROT,
UC_ERR_FETCH_UNMAPPED, UC_ERR_FETCH_PROT, UC_ERR_FETCH_UNALIGNED, UC_ERR_READ_UNALIGNED,
UC_ERR_WRITE_UNMAPPED,
UC_ERR_WRITE_PROT,
UC_ERR_WRITE_UNALIGNED,
UC_ERR_FETCH_UNMAPPED,
UC_ERR_FETCH_PROT,
UC_ERR_FETCH_UNALIGNED,
] ]
if uc_error.errno in mem_errors: if uc_error.errno in mem_errors:
# Memory error - throw SIGSEGV # Memory error - throw SIGSEGV
@ -245,13 +289,18 @@ class AflUnicornEngine(Uc):
def dump_regs(self): def dump_regs(self):
""" Dumps the contents of all the registers to STDOUT """ """ Dumps the contents of all the registers to STDOUT """
for reg in sorted(self.__get_register_map(self._arch_str).items(), key=lambda reg: reg[0]): for reg in sorted(
self.__get_register_map(self._arch_str).items(), key=lambda reg: reg[0]
):
print(">>> {0:>4}: 0x{1:016x}".format(reg[0], self.reg_read(reg[1]))) print(">>> {0:>4}: 0x{1:016x}".format(reg[0], self.reg_read(reg[1])))
def dump_regs_extended(self): def dump_regs_extended(self):
""" Dumps the contents of all the registers to STDOUT """ """ Dumps the contents of all the registers to STDOUT """
try: try:
for reg in sorted(self.__get_registers_extended(self._arch_str).items(), key=lambda reg: reg[0]): for reg in sorted(
self.__get_registers_extended(self._arch_str).items(),
key=lambda reg: reg[0],
):
print(">>> {0:>4}: 0x{1:016x}".format(reg[0], self.reg_read(reg[1]))) print(">>> {0:>4}: 0x{1:016x}".format(reg[0], self.reg_read(reg[1])))
except Exception as e: except Exception as e:
print("ERROR: Are extended registers loaded?") print("ERROR: Are extended registers loaded?")
@ -290,8 +339,8 @@ class AflUnicornEngine(Uc):
struct.unpack('<Q', self.mem_read(addr, 8))[0])) struct.unpack('<Q', self.mem_read(addr, 8))[0]))
""" """
#----------------------------- # -----------------------------
#---- Loader Helper Functions # ---- Loader Helper Functions
def __load_registers(self, regs, reg_map, debug_print): def __load_registers(self, regs, reg_map, debug_print):
for register, value in regs.items(): for register, value in regs.items():
@ -307,7 +356,11 @@ class AflUnicornEngine(Uc):
reg_write_retry = False reg_write_retry = False
except Exception as e: except Exception as e:
if debug_print: if debug_print:
print("ERROR writing register: {}, value: {} -- {}".format(register, value, repr(e))) print(
"ERROR writing register: {}, value: {} -- {}".format(
register, value, repr(e)
)
)
if reg_write_retry: if reg_write_retry:
if debug_print: if debug_print:
@ -316,7 +369,11 @@ class AflUnicornEngine(Uc):
self.reg_write(reg_map[register.lower()], int(value, 16)) self.reg_write(reg_map[register.lower()], int(value, 16))
except Exception as e: except Exception as e:
if debug_print: if debug_print:
print("ERROR writing hex string register: {}, value: {} -- {}".format(register, value, repr(e))) print(
"ERROR writing hex string register: {}, value: {} -- {}".format(
register, value, repr(e)
)
)
def __map_segment(self, name, address, size, perms, debug_print=False): def __map_segment(self, name, address, size, perms, debug_print=False):
# - size is unsigned and must be != 0 # - size is unsigned and must be != 0
@ -330,24 +387,30 @@ class AflUnicornEngine(Uc):
if mem_start_aligned != mem_start or mem_end_aligned != mem_end: if mem_start_aligned != mem_start or mem_end_aligned != mem_end:
print("Aligning segment to page boundary:") print("Aligning segment to page boundary:")
print(" name: {}".format(name)) print(" name: {}".format(name))
print(" start: {0:016x} -> {1:016x}".format(mem_start, mem_start_aligned)) print(
" start: {0:016x} -> {1:016x}".format(mem_start, mem_start_aligned)
)
print(" end: {0:016x} -> {1:016x}".format(mem_end, mem_end_aligned)) print(" end: {0:016x} -> {1:016x}".format(mem_end, mem_end_aligned))
print("Mapping segment from {0:016x} - {1:016x} with perm={2}: {3}".format(mem_start_aligned, mem_end_aligned, perms, name)) print(
if(mem_start_aligned < mem_end_aligned): "Mapping segment from {0:016x} - {1:016x} with perm={2}: {3}".format(
mem_start_aligned, mem_end_aligned, perms, name
)
)
if mem_start_aligned < mem_end_aligned:
self.mem_map(mem_start_aligned, mem_end_aligned - mem_start_aligned, perms) self.mem_map(mem_start_aligned, mem_end_aligned - mem_start_aligned, perms)
def __map_segments(self, segment_list, context_directory, debug_print=False): def __map_segments(self, segment_list, context_directory, debug_print=False):
for segment in segment_list: for segment in segment_list:
# Get the segment information from the index # Get the segment information from the index
name = segment['name'] name = segment["name"]
seg_start = segment['start'] seg_start = segment["start"]
seg_end = segment['end'] seg_end = segment["end"]
perms = \ perms = (
(UC_PROT_READ if segment['permissions']['r'] == True else 0) | \ (UC_PROT_READ if segment["permissions"]["r"] == True else 0)
(UC_PROT_WRITE if segment['permissions']['w'] == True else 0) | \ | (UC_PROT_WRITE if segment["permissions"]["w"] == True else 0)
(UC_PROT_EXEC if segment['permissions']['x'] == True else 0) | (UC_PROT_EXEC if segment["permissions"]["x"] == True else 0)
)
if debug_print: if debug_print:
print("Handling segment {}".format(name)) print("Handling segment {}".format(name))
@ -376,48 +439,86 @@ class AflUnicornEngine(Uc):
# Map memory into the address space if it is of an acceptable size. # Map memory into the address space if it is of an acceptable size.
if (seg_end - seg_start) > MAX_ALLOWABLE_SEG_SIZE: if (seg_end - seg_start) > MAX_ALLOWABLE_SEG_SIZE:
if debug_print: if debug_print:
print("Skipping segment (LARGER THAN {0}) from {1:016x} - {2:016x} with perm={3}: {4}".format(MAX_ALLOWABLE_SEG_SIZE, seg_start, seg_end, perms, name)) print(
"Skipping segment (LARGER THAN {0}) from {1:016x} - {2:016x} with perm={3}: {4}".format(
MAX_ALLOWABLE_SEG_SIZE, seg_start, seg_end, perms, name
)
)
continue continue
elif not found: # Make sure it's not already mapped elif not found: # Make sure it's not already mapped
if overlap_start: # Partial overlap (start) if overlap_start: # Partial overlap (start)
self.__map_segment(name, tmp, seg_end - tmp, perms, debug_print) self.__map_segment(name, tmp, seg_end - tmp, perms, debug_print)
elif overlap_end: # Patrial overlap (end) elif overlap_end: # Patrial overlap (end)
self.__map_segment(name, seg_start, tmp - seg_start, perms, debug_print) self.__map_segment(
else: # Not found name, seg_start, tmp - seg_start, perms, debug_print
self.__map_segment(name, seg_start, seg_end - seg_start, perms, debug_print) )
else: # Not found
self.__map_segment(
name, seg_start, seg_end - seg_start, perms, debug_print
)
else: else:
if debug_print: if debug_print:
print("Segment {} already mapped. Moving on.".format(name)) print("Segment {} already mapped. Moving on.".format(name))
# Load the content (if available) # Load the content (if available)
if 'content_file' in segment and len(segment['content_file']) > 0: if "content_file" in segment and len(segment["content_file"]) > 0:
content_file_path = os.path.join(context_directory, segment['content_file']) content_file_path = os.path.join(
context_directory, segment["content_file"]
)
if not os.path.isfile(content_file_path): if not os.path.isfile(content_file_path):
raise Exception("Unable to find segment content file. Expected it to be at {}".format(content_file_path)) raise Exception(
#if debug_print: "Unable to find segment content file. Expected it to be at {}".format(
content_file_path
)
)
# if debug_print:
# print("Loading content for segment {} from {}".format(name, segment['content_file'])) # print("Loading content for segment {} from {}".format(name, segment['content_file']))
content_file = open(content_file_path, 'rb') content_file = open(content_file_path, "rb")
compressed_content = content_file.read() compressed_content = content_file.read()
content_file.close() content_file.close()
self.mem_write(seg_start, zlib.decompress(compressed_content)) self.mem_write(seg_start, zlib.decompress(compressed_content))
else: else:
if debug_print: if debug_print:
print("No content found for segment {0} @ {1:016x}".format(name, seg_start)) print(
self.mem_write(seg_start, b'\x00' * (seg_end - seg_start)) "No content found for segment {0} @ {1:016x}".format(
name, seg_start
)
)
self.mem_write(seg_start, b"\x00" * (seg_end - seg_start))
def __get_arch_and_mode(self, arch_str): def __get_arch_and_mode(self, arch_str):
arch_map = { arch_map = {
"x64" : [ UC_X86_REG_RIP, UC_ARCH_X86, UC_MODE_64 ], "x64": [UC_X86_REG_RIP, UC_ARCH_X86, UC_MODE_64],
"x86" : [ UC_X86_REG_EIP, UC_ARCH_X86, UC_MODE_32 ], "x86": [UC_X86_REG_EIP, UC_ARCH_X86, UC_MODE_32],
"arm64be" : [ UC_ARM64_REG_PC, UC_ARCH_ARM64, UC_MODE_ARM | UC_MODE_BIG_ENDIAN ], "arm64be": [
"arm64le" : [ UC_ARM64_REG_PC, UC_ARCH_ARM64, UC_MODE_ARM | UC_MODE_LITTLE_ENDIAN ], UC_ARM64_REG_PC,
"armbe" : [ UC_ARM_REG_PC, UC_ARCH_ARM, UC_MODE_ARM | UC_MODE_BIG_ENDIAN ], UC_ARCH_ARM64,
"armle" : [ UC_ARM_REG_PC, UC_ARCH_ARM, UC_MODE_ARM | UC_MODE_LITTLE_ENDIAN ], UC_MODE_ARM | UC_MODE_BIG_ENDIAN,
"armbethumb": [ UC_ARM_REG_PC, UC_ARCH_ARM, UC_MODE_THUMB | UC_MODE_BIG_ENDIAN ], ],
"armlethumb": [ UC_ARM_REG_PC, UC_ARCH_ARM, UC_MODE_THUMB | UC_MODE_LITTLE_ENDIAN ], "arm64le": [
"mips" : [ UC_MIPS_REG_PC, UC_ARCH_MIPS, UC_MODE_MIPS32 | UC_MODE_BIG_ENDIAN ], UC_ARM64_REG_PC,
"mipsel" : [ UC_MIPS_REG_PC, UC_ARCH_MIPS, UC_MODE_MIPS32 | UC_MODE_LITTLE_ENDIAN ], UC_ARCH_ARM64,
UC_MODE_ARM | UC_MODE_LITTLE_ENDIAN,
],
"armbe": [UC_ARM_REG_PC, UC_ARCH_ARM, UC_MODE_ARM | UC_MODE_BIG_ENDIAN],
"armle": [UC_ARM_REG_PC, UC_ARCH_ARM, UC_MODE_ARM | UC_MODE_LITTLE_ENDIAN],
"armbethumb": [
UC_ARM_REG_PC,
UC_ARCH_ARM,
UC_MODE_THUMB | UC_MODE_BIG_ENDIAN,
],
"armlethumb": [
UC_ARM_REG_PC,
UC_ARCH_ARM,
UC_MODE_THUMB | UC_MODE_LITTLE_ENDIAN,
],
"mips": [UC_MIPS_REG_PC, UC_ARCH_MIPS, UC_MODE_MIPS32 | UC_MODE_BIG_ENDIAN],
"mipsel": [
UC_MIPS_REG_PC,
UC_ARCH_MIPS,
UC_MODE_MIPS32 | UC_MODE_LITTLE_ENDIAN,
],
} }
return (arch_map[arch_str][1], arch_map[arch_str][2]) return (arch_map[arch_str][1], arch_map[arch_str][2])
@ -430,140 +531,140 @@ class AflUnicornEngine(Uc):
arch = "mips" arch = "mips"
registers = { registers = {
"x64" : { "x64": {
"rax": UC_X86_REG_RAX, "rax": UC_X86_REG_RAX,
"rbx": UC_X86_REG_RBX, "rbx": UC_X86_REG_RBX,
"rcx": UC_X86_REG_RCX, "rcx": UC_X86_REG_RCX,
"rdx": UC_X86_REG_RDX, "rdx": UC_X86_REG_RDX,
"rsi": UC_X86_REG_RSI, "rsi": UC_X86_REG_RSI,
"rdi": UC_X86_REG_RDI, "rdi": UC_X86_REG_RDI,
"rbp": UC_X86_REG_RBP, "rbp": UC_X86_REG_RBP,
"rsp": UC_X86_REG_RSP, "rsp": UC_X86_REG_RSP,
"r8": UC_X86_REG_R8, "r8": UC_X86_REG_R8,
"r9": UC_X86_REG_R9, "r9": UC_X86_REG_R9,
"r10": UC_X86_REG_R10, "r10": UC_X86_REG_R10,
"r11": UC_X86_REG_R11, "r11": UC_X86_REG_R11,
"r12": UC_X86_REG_R12, "r12": UC_X86_REG_R12,
"r13": UC_X86_REG_R13, "r13": UC_X86_REG_R13,
"r14": UC_X86_REG_R14, "r14": UC_X86_REG_R14,
"r15": UC_X86_REG_R15, "r15": UC_X86_REG_R15,
"rip": UC_X86_REG_RIP, "rip": UC_X86_REG_RIP,
"efl": UC_X86_REG_EFLAGS, "efl": UC_X86_REG_EFLAGS,
"cs": UC_X86_REG_CS, "cs": UC_X86_REG_CS,
"ds": UC_X86_REG_DS, "ds": UC_X86_REG_DS,
"es": UC_X86_REG_ES, "es": UC_X86_REG_ES,
"fs": UC_X86_REG_FS, "fs": UC_X86_REG_FS,
"gs": UC_X86_REG_GS, "gs": UC_X86_REG_GS,
"ss": UC_X86_REG_SS, "ss": UC_X86_REG_SS,
}, },
"x86" : { "x86": {
"eax": UC_X86_REG_EAX, "eax": UC_X86_REG_EAX,
"ebx": UC_X86_REG_EBX, "ebx": UC_X86_REG_EBX,
"ecx": UC_X86_REG_ECX, "ecx": UC_X86_REG_ECX,
"edx": UC_X86_REG_EDX, "edx": UC_X86_REG_EDX,
"esi": UC_X86_REG_ESI, "esi": UC_X86_REG_ESI,
"edi": UC_X86_REG_EDI, "edi": UC_X86_REG_EDI,
"ebp": UC_X86_REG_EBP, "ebp": UC_X86_REG_EBP,
"eip": UC_X86_REG_EIP, "eip": UC_X86_REG_EIP,
"esp": UC_X86_REG_ESP, "esp": UC_X86_REG_ESP,
"efl": UC_X86_REG_EFLAGS, "efl": UC_X86_REG_EFLAGS,
# Segment registers removed... # Segment registers removed...
# They caused segfaults (from unicorn?) when they were here # They caused segfaults (from unicorn?) when they were here
}, },
"arm" : { "arm": {
"r0": UC_ARM_REG_R0, "r0": UC_ARM_REG_R0,
"r1": UC_ARM_REG_R1, "r1": UC_ARM_REG_R1,
"r2": UC_ARM_REG_R2, "r2": UC_ARM_REG_R2,
"r3": UC_ARM_REG_R3, "r3": UC_ARM_REG_R3,
"r4": UC_ARM_REG_R4, "r4": UC_ARM_REG_R4,
"r5": UC_ARM_REG_R5, "r5": UC_ARM_REG_R5,
"r6": UC_ARM_REG_R6, "r6": UC_ARM_REG_R6,
"r7": UC_ARM_REG_R7, "r7": UC_ARM_REG_R7,
"r8": UC_ARM_REG_R8, "r8": UC_ARM_REG_R8,
"r9": UC_ARM_REG_R9, "r9": UC_ARM_REG_R9,
"r10": UC_ARM_REG_R10, "r10": UC_ARM_REG_R10,
"r11": UC_ARM_REG_R11, "r11": UC_ARM_REG_R11,
"r12": UC_ARM_REG_R12, "r12": UC_ARM_REG_R12,
"pc": UC_ARM_REG_PC, "pc": UC_ARM_REG_PC,
"sp": UC_ARM_REG_SP, "sp": UC_ARM_REG_SP,
"lr": UC_ARM_REG_LR, "lr": UC_ARM_REG_LR,
"cpsr": UC_ARM_REG_CPSR
},
"arm64" : {
"x0": UC_ARM64_REG_X0,
"x1": UC_ARM64_REG_X1,
"x2": UC_ARM64_REG_X2,
"x3": UC_ARM64_REG_X3,
"x4": UC_ARM64_REG_X4,
"x5": UC_ARM64_REG_X5,
"x6": UC_ARM64_REG_X6,
"x7": UC_ARM64_REG_X7,
"x8": UC_ARM64_REG_X8,
"x9": UC_ARM64_REG_X9,
"x10": UC_ARM64_REG_X10,
"x11": UC_ARM64_REG_X11,
"x12": UC_ARM64_REG_X12,
"x13": UC_ARM64_REG_X13,
"x14": UC_ARM64_REG_X14,
"x15": UC_ARM64_REG_X15,
"x16": UC_ARM64_REG_X16,
"x17": UC_ARM64_REG_X17,
"x18": UC_ARM64_REG_X18,
"x19": UC_ARM64_REG_X19,
"x20": UC_ARM64_REG_X20,
"x21": UC_ARM64_REG_X21,
"x22": UC_ARM64_REG_X22,
"x23": UC_ARM64_REG_X23,
"x24": UC_ARM64_REG_X24,
"x25": UC_ARM64_REG_X25,
"x26": UC_ARM64_REG_X26,
"x27": UC_ARM64_REG_X27,
"x28": UC_ARM64_REG_X28,
"pc": UC_ARM64_REG_PC,
"sp": UC_ARM64_REG_SP,
"fp": UC_ARM64_REG_FP,
"lr": UC_ARM64_REG_LR,
"nzcv": UC_ARM64_REG_NZCV,
"cpsr": UC_ARM_REG_CPSR, "cpsr": UC_ARM_REG_CPSR,
}, },
"mips" : { "arm64": {
"0" : UC_MIPS_REG_ZERO, "x0": UC_ARM64_REG_X0,
"at": UC_MIPS_REG_AT, "x1": UC_ARM64_REG_X1,
"v0": UC_MIPS_REG_V0, "x2": UC_ARM64_REG_X2,
"v1": UC_MIPS_REG_V1, "x3": UC_ARM64_REG_X3,
"a0": UC_MIPS_REG_A0, "x4": UC_ARM64_REG_X4,
"a1": UC_MIPS_REG_A1, "x5": UC_ARM64_REG_X5,
"a2": UC_MIPS_REG_A2, "x6": UC_ARM64_REG_X6,
"a3": UC_MIPS_REG_A3, "x7": UC_ARM64_REG_X7,
"t0": UC_MIPS_REG_T0, "x8": UC_ARM64_REG_X8,
"t1": UC_MIPS_REG_T1, "x9": UC_ARM64_REG_X9,
"t2": UC_MIPS_REG_T2, "x10": UC_ARM64_REG_X10,
"t3": UC_MIPS_REG_T3, "x11": UC_ARM64_REG_X11,
"t4": UC_MIPS_REG_T4, "x12": UC_ARM64_REG_X12,
"t5": UC_MIPS_REG_T5, "x13": UC_ARM64_REG_X13,
"t6": UC_MIPS_REG_T6, "x14": UC_ARM64_REG_X14,
"t7": UC_MIPS_REG_T7, "x15": UC_ARM64_REG_X15,
"t8": UC_MIPS_REG_T8, "x16": UC_ARM64_REG_X16,
"t9": UC_MIPS_REG_T9, "x17": UC_ARM64_REG_X17,
"s0": UC_MIPS_REG_S0, "x18": UC_ARM64_REG_X18,
"s1": UC_MIPS_REG_S1, "x19": UC_ARM64_REG_X19,
"s2": UC_MIPS_REG_S2, "x20": UC_ARM64_REG_X20,
"s3": UC_MIPS_REG_S3, "x21": UC_ARM64_REG_X21,
"s4": UC_MIPS_REG_S4, "x22": UC_ARM64_REG_X22,
"s5": UC_MIPS_REG_S5, "x23": UC_ARM64_REG_X23,
"s6": UC_MIPS_REG_S6, "x24": UC_ARM64_REG_X24,
"s7": UC_MIPS_REG_S7, "x25": UC_ARM64_REG_X25,
"s8": UC_MIPS_REG_S8, "x26": UC_ARM64_REG_X26,
"k0": UC_MIPS_REG_K0, "x27": UC_ARM64_REG_X27,
"k1": UC_MIPS_REG_K1, "x28": UC_ARM64_REG_X28,
"gp": UC_MIPS_REG_GP, "pc": UC_ARM64_REG_PC,
"pc": UC_MIPS_REG_PC, "sp": UC_ARM64_REG_SP,
"sp": UC_MIPS_REG_SP, "fp": UC_ARM64_REG_FP,
"fp": UC_MIPS_REG_FP, "lr": UC_ARM64_REG_LR,
"ra": UC_MIPS_REG_RA, "nzcv": UC_ARM64_REG_NZCV,
"hi": UC_MIPS_REG_HI, "cpsr": UC_ARM_REG_CPSR,
"lo": UC_MIPS_REG_LO },
} "mips": {
"0": UC_MIPS_REG_ZERO,
"at": UC_MIPS_REG_AT,
"v0": UC_MIPS_REG_V0,
"v1": UC_MIPS_REG_V1,
"a0": UC_MIPS_REG_A0,
"a1": UC_MIPS_REG_A1,
"a2": UC_MIPS_REG_A2,
"a3": UC_MIPS_REG_A3,
"t0": UC_MIPS_REG_T0,
"t1": UC_MIPS_REG_T1,
"t2": UC_MIPS_REG_T2,
"t3": UC_MIPS_REG_T3,
"t4": UC_MIPS_REG_T4,
"t5": UC_MIPS_REG_T5,
"t6": UC_MIPS_REG_T6,
"t7": UC_MIPS_REG_T7,
"t8": UC_MIPS_REG_T8,
"t9": UC_MIPS_REG_T9,
"s0": UC_MIPS_REG_S0,
"s1": UC_MIPS_REG_S1,
"s2": UC_MIPS_REG_S2,
"s3": UC_MIPS_REG_S3,
"s4": UC_MIPS_REG_S4,
"s5": UC_MIPS_REG_S5,
"s6": UC_MIPS_REG_S6,
"s7": UC_MIPS_REG_S7,
"s8": UC_MIPS_REG_S8,
"k0": UC_MIPS_REG_K0,
"k1": UC_MIPS_REG_K1,
"gp": UC_MIPS_REG_GP,
"pc": UC_MIPS_REG_PC,
"sp": UC_MIPS_REG_SP,
"fp": UC_MIPS_REG_FP,
"ra": UC_MIPS_REG_RA,
"hi": UC_MIPS_REG_HI,
"lo": UC_MIPS_REG_LO,
},
} }
return registers[arch] return registers[arch]
@ -577,51 +678,50 @@ class AflUnicornEngine(Uc):
arch = "mips" arch = "mips"
registers = { registers = {
"arm": { "arm": {
"d0": UC_ARM_REG_D0, "d0": UC_ARM_REG_D0,
"d1": UC_ARM_REG_D1, "d1": UC_ARM_REG_D1,
"d2": UC_ARM_REG_D2, "d2": UC_ARM_REG_D2,
"d3": UC_ARM_REG_D3, "d3": UC_ARM_REG_D3,
"d4": UC_ARM_REG_D4, "d4": UC_ARM_REG_D4,
"d5": UC_ARM_REG_D5, "d5": UC_ARM_REG_D5,
"d6": UC_ARM_REG_D6, "d6": UC_ARM_REG_D6,
"d7": UC_ARM_REG_D7, "d7": UC_ARM_REG_D7,
"d8": UC_ARM_REG_D8, "d8": UC_ARM_REG_D8,
"d9": UC_ARM_REG_D9, "d9": UC_ARM_REG_D9,
"d10": UC_ARM_REG_D10, "d10": UC_ARM_REG_D10,
"d11": UC_ARM_REG_D11, "d11": UC_ARM_REG_D11,
"d12": UC_ARM_REG_D12, "d12": UC_ARM_REG_D12,
"d13": UC_ARM_REG_D13, "d13": UC_ARM_REG_D13,
"d14": UC_ARM_REG_D14, "d14": UC_ARM_REG_D14,
"d15": UC_ARM_REG_D15, "d15": UC_ARM_REG_D15,
"d16": UC_ARM_REG_D16, "d16": UC_ARM_REG_D16,
"d17": UC_ARM_REG_D17, "d17": UC_ARM_REG_D17,
"d18": UC_ARM_REG_D18, "d18": UC_ARM_REG_D18,
"d19": UC_ARM_REG_D19, "d19": UC_ARM_REG_D19,
"d20": UC_ARM_REG_D20, "d20": UC_ARM_REG_D20,
"d21": UC_ARM_REG_D21, "d21": UC_ARM_REG_D21,
"d22": UC_ARM_REG_D22, "d22": UC_ARM_REG_D22,
"d23": UC_ARM_REG_D23, "d23": UC_ARM_REG_D23,
"d24": UC_ARM_REG_D24, "d24": UC_ARM_REG_D24,
"d25": UC_ARM_REG_D25, "d25": UC_ARM_REG_D25,
"d26": UC_ARM_REG_D26, "d26": UC_ARM_REG_D26,
"d27": UC_ARM_REG_D27, "d27": UC_ARM_REG_D27,
"d28": UC_ARM_REG_D28, "d28": UC_ARM_REG_D28,
"d29": UC_ARM_REG_D29, "d29": UC_ARM_REG_D29,
"d30": UC_ARM_REG_D30, "d30": UC_ARM_REG_D30,
"d31": UC_ARM_REG_D31, "d31": UC_ARM_REG_D31,
"fpscr": UC_ARM_REG_FPSCR "fpscr": UC_ARM_REG_FPSCR,
} }
} }
return registers[arch]; return registers[arch]
#---------------------------
# ---------------------------
# Callbacks for tracing # Callbacks for tracing
# TODO: Extra mode for Capstone (i.e. Cs(cs_arch, cs_mode + cs_extra) not implemented # TODO: Extra mode for Capstone (i.e. Cs(cs_arch, cs_mode + cs_extra) not implemented
def __trace_instruction(self, uc, address, size, user_data): def __trace_instruction(self, uc, address, size, user_data):
if CAPSTONE_EXISTS == 1: if CAPSTONE_EXISTS == 1:
# If Capstone is installed then we'll dump disassembly, otherwise just dump the binary. # If Capstone is installed then we'll dump disassembly, otherwise just dump the binary.
@ -651,11 +751,23 @@ class AflUnicornEngine(Uc):
cs = Cs(cs_arch, cs_mode) cs = Cs(cs_arch, cs_mode)
mem = uc.mem_read(address, size) mem = uc.mem_read(address, size)
if bit_size == 4: if bit_size == 4:
for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size): for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(
print(" Instr: {:#08x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr)) bytes(mem), size
):
print(
" Instr: {:#08x}:\t{}\t{}".format(
address, cs_mnemonic, cs_opstr
)
)
else: else:
for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size): for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(
print(" Instr: {:#16x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr)) bytes(mem), size
):
print(
" Instr: {:#16x}:\t{}\t{}".format(
address, cs_mnemonic, cs_opstr
)
)
else: else:
print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
@ -664,15 +776,27 @@ class AflUnicornEngine(Uc):
def __trace_mem_access(self, uc, access, address, size, value, user_data): def __trace_mem_access(self, uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE: if access == UC_MEM_WRITE:
print(" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) print(
" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
address, size, value
)
)
else: else:
print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size)) print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size))
def __trace_mem_invalid_access(self, uc, access, address, size, value, user_data): def __trace_mem_invalid_access(self, uc, access, address, size, value, user_data):
if access == UC_MEM_WRITE_UNMAPPED: if access == UC_MEM_WRITE_UNMAPPED:
print(" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) print(
" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
address, size, value
)
)
else: else:
print(" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)) print(
" >>> INVALID Read: addr=0x{0:016x} size={1}".format(
address, size
)
)
def bit_size_arch(self): def bit_size_arch(self):
arch = self.get_arch() arch = self.get_arch()