2024-01-07 22:17:22 +00:00
|
|
|
"""
|
|
|
|
project: lollms_webui
|
|
|
|
file: python_execution_engine.py
|
|
|
|
author: ParisNeo
|
|
|
|
description:
|
|
|
|
This is a utility for executing python code
|
|
|
|
|
|
|
|
"""
|
2024-12-19 12:48:57 +00:00
|
|
|
|
2024-01-07 22:17:22 +00:00
|
|
|
import json
|
2024-07-13 11:59:46 +00:00
|
|
|
import platform
|
2024-12-19 12:48:57 +00:00
|
|
|
import subprocess
|
|
|
|
import time
|
2024-07-13 11:59:46 +00:00
|
|
|
from pathlib import Path
|
2024-12-19 12:48:57 +00:00
|
|
|
|
|
|
|
from ascii_colors import get_trace_exception, trace_exception
|
|
|
|
from fastapi import routing
|
|
|
|
from lollms.client_session import Client
|
|
|
|
|
|
|
|
from lollms_webui import LOLLMSWebUI
|
|
|
|
|
|
|
|
# Handle on the LOLLMSWebUI instance returned by get_instance()
# (presumably the application-wide singleton -- confirm in lollms_webui).
# It is not referenced by the functions visible in this part of the file.
lollmsElfServer: LOLLMSWebUI = LOLLMSWebUI.get_instance()
|
2024-01-07 22:17:22 +00:00
|
|
|
|
2024-07-13 11:59:46 +00:00
|
|
|
|
|
|
|
def execute_python(code, client, message_id, build_file=True):
    """Run Python code in a new terminal window and report the outcome.

    The code is written to ``ai_code_<message_id>.py`` inside the client's
    discussion folder, then a platform-specific terminal is opened in that
    folder to run the file. The script's stdout/stderr go to that terminal and
    are NOT captured, so the returned "output" is empty on success.

    Args:
        code: Python source text to execute.
        client: Object exposing ``discussion.discussion_folder`` (a
            ``pathlib.Path``-like object; ``mkdir`` and ``/`` are used on it).
        message_id: Identifier used to build the script file name.
        build_file: Unused; kept so existing callers keep working.

    Returns:
        dict: ``{"output": str, "execution_time": float}``. On failure
        "output" is an HTML ``<div class='text-red-500'>`` block holding the
        error description; "execution_time" is in seconds.
    """
    start_time = time.time()

    # Write the code to a script file inside the discussion folder.
    root_folder = client.discussion.discussion_folder
    root_folder.mkdir(parents=True, exist_ok=True)
    tmp_file = root_folder / f"ai_code_{message_id}.py"
    with open(tmp_file, "w", encoding="utf8") as f:
        f.write(code)

    try:
        system = platform.system()
        if system == "Windows":
            # `start` is a cmd.exe builtin, so this one must go through the shell.
            process = subprocess.Popen(
                f"""start cmd /k "cd /d "{root_folder}" && python "{tmp_file}" && pause" """,
                shell=True,
            )
        elif system == "Darwin":  # macOS
            # `open -a Terminal <arg>` treats <arg> as a file to open, so it
            # cannot run a command line; ask Terminal to run it via AppleScript.
            shell_command = f'cd "{root_folder}" && python "{tmp_file}"'
            # Escape backslashes and quotes for the AppleScript string literal.
            escaped = shell_command.replace("\\", "\\\\").replace('"', '\\"')
            process = subprocess.Popen(
                [
                    "osascript",
                    "-e",
                    f'tell application "Terminal" to do script "{escaped}"',
                ]
            )
        elif system == "Linux":
            # No shell=True here: with a list AND shell=True only the first
            # item is executed and the rest are handed to /bin/sh itself, so
            # the terminal would start without any command to run.
            process = subprocess.Popen(
                [
                    "x-terminal-emulator",
                    "-e",
                    "bash",
                    "-c",
                    f'cd "{root_folder}" && python "{tmp_file}"; exec bash',
                ]
            )
        else:
            raise Exception(f"Unsupported platform: {system}")

        # communicate() waits for the launcher to exit. No pipes are attached,
        # so both returned streams are None.
        output, error = process.communicate()
    except Exception as ex:
        execution_time = time.time() - start_time
        error_message = f"Error executing Python code: {ex}"
        return {
            "output": "<div class='text-red-500'>"
            + error_message
            + "\n"
            + get_trace_exception(ex)
            + "</div>",
            "execution_time": execution_time,
        }

    execution_time = time.time() - start_time

    if process.returncode != 0:
        # The launcher reported a failure.
        if output is not None and error is not None:
            error_message = (
                f"Output: {output.decode('utf-8', errors='ignore')}\n"
                f"Error executing Python code:\n{error.decode('utf-8', errors='ignore')}"
            )
        else:
            # Streams were not captured; report whatever we have.
            error_message = f"Error executing Python code:\n{error}"
        return {
            "output": "<div class='text-red-500'>" + error_message + "</div>",
            "execution_time": execution_time,
        }

    # The launcher exited successfully.
    if output:
        return {
            "output": output.decode("utf8"),
            "execution_time": execution_time,
        }
    return {"output": "", "execution_time": execution_time}
|
|
|
|
|
|
|
|
|
2024-12-19 12:48:57 +00:00
|
|
|
def execute_python_old(code, client: Client, message_id, build_file=True):
    """Run Python code in a child process and capture its output.

    The code is written to ``ai_code_<message_id>.py`` inside the client's
    discussion folder and executed with the ``python`` found on PATH, using
    that folder as working directory. stdout and stderr are captured.

    Args:
        code: Python source text to execute.
        client: Client whose ``discussion.discussion_folder`` receives the
            script file and is used as working directory.
        message_id: Identifier used to build the script file name.
        build_file: Unused; kept so existing callers keep working.

    Returns:
        dict: ``{"output": str, "execution_time": float}``. On success
        "output" is the script's stdout; on failure it is an HTML
        ``<div class='text-red-500'>`` block with the captured stdout/stderr
        or the launch error. "execution_time" is in seconds.
    """
    start_time = time.time()

    # Write the code to a script file inside the discussion folder.
    root_folder = client.discussion.discussion_folder
    root_folder.mkdir(parents=True, exist_ok=True)
    tmp_file = root_folder / f"ai_code_{message_id}.py"
    with open(tmp_file, "w", encoding="utf8") as f:
        f.write(code)

    try:
        process = subprocess.Popen(
            ["python", str(tmp_file)],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=root_folder,
        )
        # Waits for the child to finish and returns both streams as bytes.
        output, error = process.communicate()
    except Exception as ex:
        execution_time = time.time() - start_time
        error_message = f"Error executing Python code: {ex}"
        # Use the formatted message: concatenating the exception object itself
        # with a str would raise TypeError inside this handler.
        return {
            "output": "<div class='text-red-500'>"
            + error_message
            + "\n"
            + get_trace_exception(ex)
            + "</div>",
            "execution_time": execution_time,
        }

    execution_time = time.time() - start_time

    if process.returncode != 0:
        # The child process exited with an error. Both streams are bytes
        # (pipes are attached), and errors='ignore' makes decoding total.
        error_message = (
            f"Output:{output.decode('utf-8', errors='ignore')}\n"
            f"Error executing Python code:\n{error.decode('utf-8', errors='ignore')}"
        )
        return {
            "output": "<div class='text-red-500'>" + error_message + "</div>",
            "execution_time": execution_time,
        }

    # The child process was successful. Decode leniently, like the failure
    # path, so non-UTF-8 bytes in stdout cannot raise UnicodeDecodeError.
    return {
        "output": output.decode("utf8", errors="ignore"),
        "execution_time": execution_time,
    }
|
2024-07-13 11:59:46 +00:00
|
|
|
|
|
|
|
|
|
|
|
def create_and_execute_script(code, message_id, root_folder):
    """Write code to a script file and run it in a new terminal window.

    The script is saved as ``ai_code_<message_id>.py`` inside ``root_folder``
    (created if missing) and a platform-specific terminal is opened in that
    folder to run it. The call does not wait for the terminal to finish.

    Args:
        code: Python source text to execute.
        message_id: Identifier used to build the script file name.
        root_folder: Directory (str or path-like) that receives the script and
            is used as working directory.

    Returns:
        None. Any failure is caught and printed as a dict with an HTML
        "output" error block and an "execution_time" of 0.
    """
    try:
        # Ensure the root folder exists.
        root_folder = Path(root_folder)
        root_folder.mkdir(parents=True, exist_ok=True)

        # Create the script file.
        tmp_file = root_folder / f"ai_code_{message_id}.py"
        with open(tmp_file, "w", encoding="utf8") as f:
            f.write(code)

        system = platform.system()
        if system == "Windows":
            # `start` is a cmd.exe builtin, so this one must go through the shell.
            subprocess.Popen(
                f"""start cmd /k "cd /d "{root_folder}" && python "{tmp_file}" && pause" """,
                shell=True,
            )
        elif system == "Darwin":  # macOS
            # `open -a Terminal <arg>` treats <arg> as a file to open, so it
            # cannot run a command line; ask Terminal to run it via AppleScript.
            shell_command = f'cd "{root_folder}" && python "{tmp_file}"'
            # Escape backslashes and quotes for the AppleScript string literal.
            escaped = shell_command.replace("\\", "\\\\").replace('"', '\\"')
            subprocess.Popen(
                [
                    "osascript",
                    "-e",
                    f'tell application "Terminal" to do script "{escaped}"',
                ]
            )
        elif system == "Linux":
            # No shell=True here: with a list AND shell=True only the first
            # item is executed and the rest are handed to /bin/sh itself, so
            # the terminal would start without any command to run.
            subprocess.Popen(
                [
                    "x-terminal-emulator",
                    "-e",
                    "bash",
                    "-c",
                    f'cd "{root_folder}" && python "{tmp_file}"; exec bash',
                ]
            )
        else:
            raise Exception(f"Unsupported platform: {system}")

    except Exception as ex:
        error_message = f"Error executing Python code: {ex}"
        error_json = {
            "output": "<div class='text-red-500'>"
            + error_message
            + "\n"
            + get_trace_exception(ex)
            + "</div>",
            "execution_time": 0,
        }
        print(error_json)
|
|
|
|
|
2024-12-19 12:48:57 +00:00
|
|
|
|
2024-07-13 11:59:46 +00:00
|
|
|
if __name__ == "__main__":
    # Manual smoke test: write a tiny interactive script into a
    # developer-local discussion folder and open it in a terminal.
    code, message_id, root_folder = (
        "print('Hello world');input('hi')",
        102,
        r"E:\lollms\discussion_databases\html stuff\105",
    )
    create_and_execute_script(
        code=code,
        message_id=message_id,
        root_folder=root_folder,
    )
|