fixed code injection vulenerabilities

This commit is contained in:
Saifeddine ALOUI 2024-02-15 01:01:55 +01:00
parent 0b51063119
commit 218eb0ed6c
5 changed files with 119 additions and 49 deletions

View File

@ -9,7 +9,7 @@ description:
"""
from fastapi import APIRouter, Request
from lollms_webui import LOLLMSWebUI
from pydantic import BaseModel
from pydantic import BaseModel, Field
from starlette.responses import StreamingResponse
from lollms.types import MSG_TYPE
from lollms.main_config import BaseConfig
@ -25,6 +25,8 @@ import os
import platform
import string
import re
import subprocess
from typing import Optional
# Regular expression pattern to validate file paths
FILE_PATH_REGEX = r'^[a-zA-Z0-9_\-\\\/]+$'
@ -109,8 +111,14 @@ async def execute_code(request: Request):
class OpenCodeFolderInVsCodeRequestModel(BaseModel):
discussion_id: Optional[int] = Field(None, gt=0)
message_id: Optional[int] = Field(None, gt=0)
code: Optional[str]
folder_path: Optional[str]
@router.post("/open_code_folder_in_vs_code")
async def open_code_folder_in_vs_code(request: Request):
async def open_code_folder_in_vs_code(request: OpenCodeFolderInVsCodeRequestModel):
"""
Opens code folder.
@ -119,56 +127,53 @@ async def open_code_folder_in_vs_code(request: Request):
"""
try:
if "discussion_id" in data:
data = (await request.json())
code = data["code"]
discussion_id = int(data.get("discussion_id","unknown_discussion"))
message_id = int(data.get("message_id","unknown_message"))
if request.discussion_id:
ASCIIColors.info("Opening folder:")
# Create a temporary file.
root_folder = lollmsElfServer.lollms_paths.personal_outputs_path/"discussions"/f"d_{discussion_id}"
root_folder = lollmsElfServer.lollms_paths.personal_outputs_path/"discussions"/f"d_{request.discussion_id}"
root_folder.mkdir(parents=True,exist_ok=True)
tmp_file = root_folder/f"ai_code_{message_id}.py"
tmp_file = root_folder/f"ai_code_{request.message_id}.py"
with open(tmp_file,"w") as f:
f.write(code)
f.write(request.code)
os.system('code ' + str(root_folder))
elif "folder_path" in data:
if os.path.isdir(root_folder):
subprocess.run(['code', root_folder], check=True)
elif request.folder_path:
ASCIIColors.info("Opening folder:")
# Create a temporary file.
root_folder = data["folder_path"]
root_folder = request.folder_path
root_folder.mkdir(parents=True,exist_ok=True)
os.system('code ' + str(root_folder))
if os.path.isdir(root_folder):
subprocess.run(['code', root_folder], check=True)
return {"status": True, "execution_time": 0}
except Exception as ex:
trace_exception(ex)
lollmsElfServer.error(ex)
return {"status":False,"error":str(ex)}
return {"status":False,"error":"An error occurred during processing."}
class FilePath(BaseModel):
path: Optional[str] = Field(None, max_length=500)
@router.post("/open_file")
async def open_file(request: Request):
async def open_file(file_path: FilePath):
"""
Opens code in vs code.
:param request: The HTTP request object.
:param file_path: The file path object.
:return: A JSON response with the status of the operation.
"""
try:
data = (await request.json())
# Validate the 'path' parameter
path = data.get('path')
path = file_path.path
if not validate_file_path(path):
return {"status":False,"error":"Invalid file path"}
# Sanitize the 'path' parameter
path = os.path.realpath(path)
# Use parameterized queries to pass the file path as a parameter
os.system(["start", path])
# Use subprocess.Popen to safely open the file
subprocess.Popen(["start", path], shell=True)
return {"status": True, "execution_time": 0}
@ -177,21 +182,24 @@ async def open_file(request: Request):
lollmsElfServer.error(ex)
return {"status":False,"error":str(ex)}
class VSCodeData(BaseModel):
discussion_id: Optional[int] = Field(None, ge=0)
message_id: Optional[int] = Field(None, ge=0)
code: str = Field(...)
@router.post("/open_code_in_vs_code")
async def open_code_in_vs_code(request: Request):
async def open_code_in_vs_code(vs_code_data: VSCodeData):
"""
Opens code in vs code.
:param request: The HTTP request object.
:param vs_code_data: The data object.
:return: A JSON response with the status of the operation.
"""
try:
data = (await request.json())
discussion_id = int(data.get("discussion_id","unknown_discussion"))
message_id = int(data.get("message_id",""))
code = data["code"]
discussion_id = vs_code_data.discussion_id
message_id = vs_code_data.message_id
code = vs_code_data.code
ASCIIColors.info("Opening folder:")
# Create a temporary file.
@ -200,41 +208,46 @@ async def open_code_in_vs_code(request: Request):
tmp_file = root_folder/f"ai_code_{message_id}.py"
with open(tmp_file,"w") as f:
f.write(code)
os.system('code ' + str(root_folder))
# Use subprocess.Popen to safely open the file
subprocess.Popen(["code", str(root_folder)], shell=True)
return {"status": True, "execution_time": 0}
except Exception as ex:
trace_exception(ex)
lollmsElfServer.error(ex)
return {"status":False,"error":str(ex)}
class FolderRequest(BaseModel):
discussion_id: Optional[int] = Field(None, title="The discussion ID")
folder_path: Optional[str] = Field(None, title="The folder path")
@router.post("/open_code_folder")
async def open_code_folder(request: Request):
async def open_code_folder(request: FolderRequest):
"""
Opens code folder.
:param request: The HTTP request object.
:return: A JSON response with the status of the operation.
"""
try:
data = (await request.json())
if "discussion_id" in data:
discussion_id = int(data.get("discussion_id", "unknown_discussion"))
if request.discussion_id:
discussion_id = request.discussion_id
ASCIIColors.info("Opening folder:")
# Create a temporary file.
root_folder = lollmsElfServer.lollms_paths.personal_outputs_path / "discussions" / f"d_{discussion_id}"
root_folder.mkdir(parents=True, exist_ok=True)
if platform.system() == 'Windows':
os.startfile(str(root_folder))
subprocess.run(['start', str(root_folder)], check=True, shell=True)
elif platform.system() == 'Linux':
os.system('xdg-open ' + str(root_folder))
subprocess.run(['xdg-open', str(root_folder)], check=True)
elif platform.system() == 'Darwin':
os.system('open ' + str(root_folder))
subprocess.run(['open', str(root_folder)], check=True)
return {"status": True, "execution_time": 0}
elif "folder_path" in data:
folder_path = os.path.realpath(data["folder_path"])
elif request.folder_path:
folder_path = os.path.realpath(request.folder_path)
# Verify that this is a file and not an executable
root_folder = Path(folder_path)
is_valid_folder_path = root_folder.is_dir()
@ -246,18 +259,17 @@ async def open_code_folder(request: Request):
# Create a temporary file.
root_folder.mkdir(parents=True, exist_ok=True)
if platform.system() == 'Windows':
os.startfile(str(root_folder))
subprocess.run(['start', str(root_folder)], check=True, shell=True)
elif platform.system() == 'Linux':
os.system('xdg-open ' + str(root_folder))
subprocess.run(['xdg-open', str(root_folder)], check=True)
elif platform.system() == 'Darwin':
os.system('open ' + str(root_folder))
subprocess.run(['open', str(root_folder)], check=True)
return {"status": True, "execution_time": 0}
except Exception as ex:
trace_exception(ex)
lollmsElfServer.error(ex)
return {"status": False, "error": str(ex)}
return {"status": False, "error": "An error occurred while processing the request"}
@router.get("/start_recording")
def start_recording():

View File

@ -30,5 +30,5 @@ python-socketio
python-socketio[client]
python-socketio[asyncio_client]
pydantic
selenium

View File

@ -30,5 +30,5 @@ python-socketio
python-socketio[client]
python-socketio[asyncio_client]
pydantic
selenium

View File

@ -0,0 +1,43 @@
from fastapi.testclient import TestClient
from main import app # Replace with the actual name of your FastAPI app
import json
client = TestClient(app)
def test_open_code_in_vs_code_valid():
response = client.post(
"/open_code_in_vs_code",
data=json.dumps({
"discussion_id": 1,
"message_id": 1,
"code": "print('Hello, World!')"
}),
headers={"content-type": "application/json"},
)
assert response.status_code == 200
assert response.json()["status"] == True
def test_open_code_in_vs_code_invalid():
response = client.post(
"/open_code_in_vs_code",
data=json.dumps({
"discussion_id": "1; copy file.exe /some/path/",
"message_id": "1",
"code": "print('Hello, World!')"
}),
headers={"content-type": "application/json"},
)
assert response.status_code == 422 # Unprocessable Entity
def test_open_code_in_vs_code_attack():
response = client.post(
"/open_code_in_vs_code",
data=json.dumps({
"discussion_id": 1,
"message_id": 1,
"code": "print('This is a harmless test.')" # Dangerous code
}),
headers={"content-type": "application/json"},
)
assert response.status_code == 200
assert response.json()["status"] == False # The code should not be executed

View File

@ -1130,6 +1130,21 @@
>
</td>
</tr>
<tr>
<td style="min-width: 200px;">
<label for="db_path" class="text-sm font-bold" style="margin-right: 1rem;">Activate headless server mode (deactivates all code exectuion to protect the PC from attacks):</label>
</td>
<td style="width: 100%;">
<input
type="checkbox"
id="headless_server_mode"
required
v-model="configFile.headless_server_mode"
@change="settingsChanged=true"
class="w-full w-full mt-1 px-2 py-1 border border-gray-300 rounded dark:bg-gray-600 dark:bg-gray-600"
>
</td>
</tr>
</table>
</Card>