diff --git a/tests/execution/test_execution.py b/tests/execution/test_execution.py new file mode 100644 index 00000000..afb0649d --- /dev/null +++ b/tests/execution/test_execution.py @@ -0,0 +1,30 @@ +import subprocess +import platform +from pathlib import Path +from ascii_colors import get_trace_exception, trace_exception + +if __name__ == "__main__": + # Create a temporary file. + code = "print('Hello world');input('hi')" + message_id=102 + root_folder = Path(r"E:\lollms\discussion_databases\html stuff\105") + root_folder.mkdir(parents=True, exist_ok=True) + tmp_file = root_folder / f"ai_code_{message_id}.py" + with open(tmp_file, "w", encoding="utf8") as f: + f.write(code) + + try: + # Determine the platform and open a terminal to execute the Python code. + system = platform.system() + if system == "Windows": + subprocess.Popen(f"""start cmd /k "cd /d {root_folder} && python {tmp_file} && pause" """, shell=True) + elif system == "Darwin": # macOS + subprocess.Popen(["open", "-a", "Terminal", f'cd "{root_folder}" && python "{tmp_file}"'], shell=True) + elif system == "Linux": + subprocess.Popen(["x-terminal-emulator", "-e", f'bash -c "cd \\"{root_folder}\\" && python \\"{tmp_file}\\"; exec bash"'], shell=True) + else: + raise Exception(f"Unsupported platform: {system}") + + except Exception as ex: + error_message = f"Error executing Python code: {ex}" + error_json = {"output": "
" + error_message + "\n" + get_trace_exception(ex) + "
", "execution_time": 0} diff --git a/utilities/execution_engines/python_execution_engine.py b/utilities/execution_engines/python_execution_engine.py index 0828fed5..546b6fd2 100644 --- a/utilities/execution_engines/python_execution_engine.py +++ b/utilities/execution_engines/python_execution_engine.py @@ -13,10 +13,71 @@ import time import subprocess import json from lollms.client_session import Client - +import platform +from pathlib import Path lollmsElfServer:LOLLMSWebUI = LOLLMSWebUI.get_instance() -def execute_python(code, client:Client, message_id, build_file=True): + +def execute_python(code, client, message_id, build_file=True): + def spawn_process(code): + """Executes Python code in a new terminal and returns the output as JSON.""" + + # Start the timer. + start_time = time.time() + + # Create a temporary file. + root_folder = client.discussion.discussion_folder + root_folder.mkdir(parents=True, exist_ok=True) + tmp_file = root_folder / f"ai_code_{message_id}.py" + with open(tmp_file, "w", encoding="utf8") as f: + f.write(code) + + try: + # Determine the platform and open a terminal to execute the Python code. + system = platform.system() + if system == "Windows": + process = subprocess.Popen(f"""start cmd /k "cd /d "{root_folder}" && python "{tmp_file}" && pause" """, shell=True) + elif system == "Darwin": # macOS + process = subprocess.Popen(["open", "-a", "Terminal", f'cd "{root_folder}" && python "{tmp_file}"'], shell=True) + elif system == "Linux": + process = subprocess.Popen(["x-terminal-emulator", "-e", f'bash -c "cd \\"{root_folder}\\" && python \\"{tmp_file}\\"; exec bash"'], shell=True) + else: + raise Exception(f"Unsupported platform: {system}") + + # Wait for the process to complete. + process.wait() + + # Get the output and error from the process. + output, error = process.communicate() + except Exception as ex: + # Stop the timer. + execution_time = time.time() - start_time + error_message = f"Error executing Python code: {ex}" + error_json = {"output": "
" + error_message + "\n" + get_trace_exception(ex) + "
", "execution_time": execution_time} + return error_json + + # Stop the timer. + execution_time = time.time() - start_time + + # Check if the process was successful. + if process.returncode != 0: + # The child process threw an exception. + try: + error_message = f"Output: {output.decode('utf-8', errors='ignore')}\nError executing Python code:\n{error.decode('utf-8', errors='ignore')}" + except: + error_message = f"Error executing Python code:\n{error}" + error_json = {"output": "
" + error_message + "
", "execution_time": execution_time} + return error_json + + # The child process was successful. + output_json = {"output": output.decode("utf8"), "execution_time": execution_time} + return output_json + + return spawn_process(code) + + + +def execute_python_old(code, client:Client, message_id, build_file=True): def spawn_process(code): """Executes Python code and returns the output as JSON.""" @@ -65,3 +126,37 @@ def execute_python(code, client:Client, message_id, build_file=True): output_json = {"output": output.decode("utf8"), "execution_time": execution_time} return output_json return spawn_process(code) + + +def create_and_execute_script(code, message_id, root_folder): + try: + # Ensure the root folder exists + root_folder = Path(root_folder) + root_folder.mkdir(parents=True, exist_ok=True) + + # Create the temporary Python file + tmp_file = root_folder / f"ai_code_{message_id}.py" + with open(tmp_file, "w", encoding="utf8") as f: + f.write(code) + + # Determine the platform and open a terminal to execute the Python code + system = platform.system() + if system == "Windows": + subprocess.Popen(f"""start cmd /k "cd /d "{root_folder}" && python "{tmp_file}" && pause" """, shell=True) + elif system == "Darwin": # macOS + subprocess.Popen(["open", "-a", "Terminal", f'cd "{root_folder}" && python "{tmp_file}"'], shell=True) + elif system == "Linux": + subprocess.Popen(["x-terminal-emulator", "-e", f'bash -c "cd \\"{root_folder}\\" && python \\"{tmp_file}\\"; exec bash"'], shell=True) + else: + raise Exception(f"Unsupported platform: {system}") + + except Exception as ex: + error_message = f"Error executing Python code: {ex}" + error_json = {"output": "
" + error_message + "\n" + get_trace_exception(ex) + "
", "execution_time": 0} + print(error_json) + +if __name__ == "__main__": + code = "print('Hello world');input('hi')" + message_id = 102 + root_folder = r"E:\lollms\discussion_databases\html stuff\105" + create_and_execute_script(code, message_id, root_folder) \ No newline at end of file