custom python mutators in afl-tmin

This commit is contained in:
Yassine Ferhane
2025-03-28 00:36:12 +01:00
parent 8e4823e7ed
commit 542233e1ce

View File

@ -56,6 +56,9 @@
#endif
#include <sys/stat.h>
#include <sys/types.h>
#ifdef USE_PYTHON
#include "afl-fuzz-python.h"
#endif
#include <sys/resource.h>
static u8 *mask_bitmap; /* Mask for trace bits (-B) */
@ -73,6 +76,11 @@ static u32 in_len, /* Input data length */
static u64 orig_cksum; /* Original checksum */
#ifdef USE_PYTHON
py_mutator_t *py_mutator = NULL;
u8 use_py_trimmer = 0;
#endif
static u8 crash_mode, /* Crash-centric mode? */
hang_mode, /* Minimize as long as it hangs */
exit_crash, /* Treat non-zero exit as crash? */
@ -365,6 +373,184 @@ static void minimize(afl_forkserver_t *fsrv) {
u32 syms_removed, alpha_del0 = 0, alpha_del1, alpha_del2, alpha_d_total = 0;
u8 changed_any, prev_del;
#ifdef USE_PYTHON
// Try to load python module
char *py_module = getenv("AFL_PYTHON_MODULE");
if (py_module) {
py_mutator = init_py_module(afl, py_module);
// Check trimming functions
if (py_mutator &&
py_mutator->py_functions[PY_FUNC_INIT_TRIM] &&
py_mutator->py_functions[PY_FUNC_TRIM_STEP] &&
py_mutator->py_functions[PY_FUNC_GET_TRIM_SIZE] &&
py_mutator->py_functions[PY_FUNC_GET_TRIM_BUF]) {
use_py_trimmer = 1;
ACTF("Loaded Python module for custom trimming: %s", py_module);
}
}
if (use_py_trimmer) {
ACTF("Performing custom trim with Python mutator...");
// Initialize the trimmer
PyObject *py_args, *py_value;
py_args = PyTuple_New(1);
py_value = PyByteArray_FromStringAndSize(in_data, in_len);
if (!py_value) {
Py_DECREF(py_args);
FATAL("Failed to convert arguments in init_trim");
}
PyTuple_SetItem(py_args, 0, py_value);
py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_INIT_TRIM], py_args);
Py_DECREF(py_args);
s32 initial_steps = 0;
if (py_value != NULL) {
#if PY_MAJOR_VERSION >= 3
initial_steps = (s32)PyLong_AsLong(py_value);
#else
initial_steps = (s32)PyInt_AsLong(py_value);
#endif
Py_DECREF(py_value);
} else {
PyErr_Print();
FATAL("Call to init_trim() failed");
}
if (initial_steps <= 0) {
WARNF("Python init_trim returned %d, skipping custom trim", initial_steps);
} else {
ACTF("Python trimmer initialized, %d steps planned", initial_steps);
u32 trim_rounds = 0;
u32 trimmed_successfully = 0;
// Trim loop
while (1) {
// Check if we can continue trimming
py_args = PyTuple_New(0);
py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_TRIM_STEP], py_args);
Py_DECREF(py_args);
u32 can_trim = 0;
if (py_value != NULL) {
#if PY_MAJOR_VERSION >= 3
can_trim = (u32)PyLong_AsLong(py_value);
#else
can_trim = (u32)PyInt_AsLong(py_value);
#endif
Py_DECREF(py_value);
} else {
PyErr_Print();
FATAL("Call to trim_step() failed");
}
if (!can_trim) break;
trim_rounds++;
// Get trimmed test case size
py_args = PyTuple_New(0);
py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_GET_TRIM_SIZE], py_args);
Py_DECREF(py_args);
size_t trimmed_size = 0;
if (py_value != NULL) {
#if PY_MAJOR_VERSION >= 3
trimmed_size = (size_t)PyLong_AsLong(py_value);
#else
trimmed_size = (size_t)PyInt_AsLong(py_value);
#endif
Py_DECREF(py_value);
} else {
PyErr_Print();
FATAL("Call to get_trim_size() failed");
}
if (trimmed_size >= in_len) {
SAYF("[Python trim] Round %u: no improvements over %zu bytes.\n",
trim_rounds, in_len);
continue;
}
SAYF("[Python trim] Round %u: reduced from %zu to %zu bytes.\n",
trim_rounds, in_len, trimmed_size);
// Get trimmed buffer
py_args = PyTuple_New(0);
py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_GET_TRIM_BUF], py_args);
Py_DECREF(py_args);
u8* trimmed_buf = NULL;
if (py_value != NULL) {
char* bytes;
size_t len;
if (PyByteArray_Check(py_value)) {
bytes = PyByteArray_AsString(py_value);
len = PyByteArray_Size(py_value);
} else if (PyBytes_Check(py_value)) {
bytes = PyBytes_AsString(py_value);
len = PyBytes_Size(py_value);
} else {
Py_DECREF(py_value);
FATAL("get_trim_buf should return bytes or bytearray");
}
trimmed_buf = ck_alloc(len);
memcpy(trimmed_buf, bytes, len);
Py_DECREF(py_value);
} else {
PyErr_Print();
FATAL("Call to get_trim_buf() failed");
}
// Test if the trimmed case still works
if (!tmin_run_target(fsrv, trimmed_buf, trimmed_size, 0)) {
SAYF("[Python trim] But the testcase no longer crashes - skipping this reduction.\n");
ck_free(trimmed_buf);
continue;
}
// Accept the reduction
memcpy(in_data, trimmed_buf, trimmed_size);
in_len = trimmed_size;
ck_free(trimmed_buf);
trimmed_successfully = 1;
// Signal success to the mutator
py_args = PyTuple_New(1);
py_value = PyBool_FromLong(1);
PyTuple_SetItem(py_args, 0, py_value);
py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_POST_TRIM], py_args);
Py_DECREF(py_args);
if (py_value != NULL) {
Py_DECREF(py_value);
}
}
// Finalize trim process
py_args = PyTuple_New(0);
py_value = PyObject_CallObject(py_mutator->py_functions[PY_FUNC_TRIM_END], py_args);
Py_DECREF(py_args);
if (py_value != NULL) {
Py_DECREF(py_value);
}
ACTF("Custom trimming complete after %u rounds, reduced: %s",
trim_rounds, trimmed_successfully ? "yes" : "no");
if (trimmed_successfully && !exact_mode) {
if (tmp_buf) { ck_free(tmp_buf); }
return; // Skip standard minimization if successful
}
}
}
#endif
/***********************
* BLOCK NORMALIZATION *
***********************/
@ -1181,6 +1367,21 @@ int main(int argc, char **argv_orig, char **envp) {
exact_mode = !!get_afl_env("AFL_TMIN_EXACT");
#ifdef USE_PYTHON
if (py_mutator) {
if (py_mutator->py_module != NULL) {
u32 i;
for (i = 0; i < PY_FUNC_COUNT; ++i) {
Py_XDECREF(py_mutator->py_functions[i]);
}
Py_DECREF(py_mutator->py_module);
}
Py_Finalize();
free(py_mutator);
}
#endif
if (hang_mode && exact_mode) {
SAYF("AFL_TMIN_EXACT won't work for loops in hang mode, ignoring.");