From 542233e1ce9b7f4e19dc8ff2d287bf461c87ef90 Mon Sep 17 00:00:00 2001 From: Yassine Ferhane <105550481+gitToki@users.noreply.github.com> Date: Fri, 28 Mar 2025 00:36:12 +0100 Subject: [PATCH] custom python mutators in afl-tmin --- src/afl-tmin.c | 201 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 201 insertions(+) diff --git a/src/afl-tmin.c b/src/afl-tmin.c index c44cb091..d9d204bc 100644 --- a/src/afl-tmin.c +++ b/src/afl-tmin.c @@ -56,6 +56,9 @@ #endif #include #include +#ifdef USE_PYTHON + #include "afl-fuzz-python.h" +#endif #include static u8 *mask_bitmap; /* Mask for trace bits (-B) */ @@ -73,6 +76,11 @@ static u32 in_len, /* Input data length */ static u64 orig_cksum; /* Original checksum */ +#ifdef USE_PYTHON + py_mutator_t *py_mutator = NULL; + u8 use_py_trimmer = 0; +#endif + static u8 crash_mode, /* Crash-centric mode? */ hang_mode, /* Minimize as long as it hangs */ exit_crash, /* Treat non-zero exit as crash? */ @@ -365,6 +373,184 @@ static void minimize(afl_forkserver_t *fsrv) { u32 syms_removed, alpha_del0 = 0, alpha_del1, alpha_del2, alpha_d_total = 0; u8 changed_any, prev_del; + #ifdef USE_PYTHON + // Try to load python module + char *py_module = getenv("AFL_PYTHON_MODULE"); + if (py_module) { + py_mutator = init_py_module(afl, py_module); + + // Check trimming functions + if (py_mutator && + py_mutator->py_functions[PY_FUNC_INIT_TRIM] && + py_mutator->py_functions[PY_FUNC_TRIM_STEP] && + py_mutator->py_functions[PY_FUNC_GET_TRIM_SIZE] && + py_mutator->py_functions[PY_FUNC_GET_TRIM_BUF]) { + use_py_trimmer = 1; + ACTF("Loaded Python module for custom trimming: %s", py_module); + } + } + + if (use_py_trimmer) { + ACTF("Performing custom trim with Python mutator..."); + + // Initialize the trimmer + PyObject *py_args, *py_value; + py_args = PyTuple_New(1); + py_value = PyByteArray_FromStringAndSize(in_data, in_len); + if (!py_value) { + Py_DECREF(py_args); + FATAL("Failed to convert arguments in init_trim"); + } + + PyTuple_SetItem(py_args, 0, py_value); + py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_INIT_TRIM], py_args); + Py_DECREF(py_args); + + s32 initial_steps = 0; + if (py_value != NULL) { + #if PY_MAJOR_VERSION >= 3 + initial_steps = (s32)PyLong_AsLong(py_value); + #else + initial_steps = (s32)PyInt_AsLong(py_value); + #endif + Py_DECREF(py_value); + } else { + PyErr_Print(); + FATAL("Call to init_trim() failed"); + } + + if (initial_steps <= 0) { + WARNF("Python init_trim returned %d, skipping custom trim", initial_steps); + } else { + ACTF("Python trimmer initialized, %d steps planned", initial_steps); + + u32 trim_rounds = 0; + u32 trimmed_successfully = 0; + + // Trim loop + while (1) { + // Check if we can continue trimming + py_args = PyTuple_New(0); + py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_TRIM_STEP], py_args); + Py_DECREF(py_args); + + u32 can_trim = 0; + if (py_value != NULL) { + #if PY_MAJOR_VERSION >= 3 + can_trim = (u32)PyLong_AsLong(py_value); + #else + can_trim = (u32)PyInt_AsLong(py_value); + #endif + Py_DECREF(py_value); + } else { + PyErr_Print(); + FATAL("Call to trim_step() failed"); + } + + if (!can_trim) break; + + trim_rounds++; + + // Get trimmed test case size + py_args = PyTuple_New(0); + py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_GET_TRIM_SIZE], py_args); + Py_DECREF(py_args); + + size_t trimmed_size = 0; + if (py_value != NULL) { + #if PY_MAJOR_VERSION >= 3 + trimmed_size = (size_t)PyLong_AsLong(py_value); + #else + trimmed_size = (size_t)PyInt_AsLong(py_value); + #endif + Py_DECREF(py_value); + } else { + PyErr_Print(); + FATAL("Call to get_trim_size() failed"); + } + + if (trimmed_size >= in_len) { + SAYF("[Python trim] Round %u: no improvements over %zu bytes.\n", + trim_rounds, in_len); + continue; + } + + SAYF("[Python trim] Round %u: reduced from %zu to %zu bytes.\n", + trim_rounds, in_len, trimmed_size); + + // Get trimmed buffer + py_args = PyTuple_New(0); + py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_GET_TRIM_BUF], py_args); + Py_DECREF(py_args); + + u8* trimmed_buf = NULL; + if (py_value != NULL) { + char* bytes; + size_t len; + + if (PyByteArray_Check(py_value)) { + bytes = PyByteArray_AsString(py_value); + len = PyByteArray_Size(py_value); + } else if (PyBytes_Check(py_value)) { + bytes = PyBytes_AsString(py_value); + len = PyBytes_Size(py_value); + } else { + Py_DECREF(py_value); + FATAL("get_trim_buf should return bytes or bytearray"); + } + + trimmed_buf = ck_alloc(len); + memcpy(trimmed_buf, bytes, len); + Py_DECREF(py_value); + } else { + PyErr_Print(); + FATAL("Call to get_trim_buf() failed"); + } + + // Test if the trimmed case still works + if (!tmin_run_target(fsrv, trimmed_buf, trimmed_size, 0)) { + SAYF("[Python trim] But the testcase no longer crashes - skipping this reduction.\n"); + ck_free(trimmed_buf); + continue; + } + + // Accept the reduction + memcpy(in_data, trimmed_buf, trimmed_size); + in_len = trimmed_size; + ck_free(trimmed_buf); + + trimmed_successfully = 1; + + // Signal success to the mutator + py_args = PyTuple_New(1); + py_value = PyBool_FromLong(1); + PyTuple_SetItem(py_args, 0, py_value); + py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_POST_TRIM], py_args); + Py_DECREF(py_args); + if (py_value != NULL) { + Py_DECREF(py_value); + } + } + + // Finalize trim process + py_args = PyTuple_New(0); + py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_TRIM_END], py_args); + Py_DECREF(py_args); + if (py_value != NULL) { + Py_DECREF(py_value); + } + + ACTF("Custom trimming complete after %u rounds, reduced: %s", + trim_rounds, trimmed_successfully ? "yes" : "no"); + + if (trimmed_successfully && !exact_mode) { + if (tmp_buf) { ck_free(tmp_buf); } + return; // Skip standard minimization if successful + } + } + } +#endif + /*********************** * BLOCK NORMALIZATION * ***********************/ @@ -1181,6 +1367,21 @@ int main(int argc, char **argv_orig, char **envp) { exact_mode = !!get_afl_env("AFL_TMIN_EXACT"); + #ifdef USE_PYTHON + if (py_mutator) { + if (py_mutator->py_module != NULL) { + u32 i; + for (i = 0; i < PY_FUNC_COUNT; ++i) { + Py_XDECREF(py_mutator->py_functions[i]); + } + Py_DECREF(py_mutator->py_module); + } + + Py_Finalize(); + free(py_mutator); + } + #endif + if (hang_mode && exact_mode) { SAYF("AFL_TMIN_EXACT won't work for loops in hang mode, ignoring.");