Path traversal vulenerability fix

This commit is contained in:
Saifeddine ALOUI 2024-02-15 00:35:04 +01:00
parent e9541dc91b
commit 0b51063119
10 changed files with 204 additions and 76 deletions

55
app.py
View File

@ -22,23 +22,10 @@ import argparse
from socketio import ASGIApp
import webbrowser
import threading
import os
app = FastAPI()
# Create a Socket.IO server
sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*", ping_timeout=1200, ping_interval=30) # Enable CORS for all origins
@sio.event
async def disconnect(sid):
ASCIIColors.yellow(f"Disconnected: {sid}")
@sio.event
async def message(sid, data):
ASCIIColors.yellow(f"Message from {sid}: {data}")
await sio.send(sid, "Message received!")
#app.mount("/socket.io", StaticFiles(directory="path/to/socketio.js"))
@ -64,9 +51,21 @@ if __name__ == "__main__":
if args.port:
config.port=args.port
cert_file_path = lollms_paths.personal_certificates/"cert.pem"
key_file_path = lollms_paths.personal_certificates/"key.pem"
if os.path.exists(cert_file_path) and os.path.exists(key_file_path):
is_https = True
else:
is_https = False
# Create a Socket.IO server
sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins=config.allowed_origins+[f"https://localhost:{config['port']}" if is_https else f"http://localhost:{config['port']}"], ping_timeout=1200, ping_interval=30) # Enable CORS for selected origins
LOLLMSWebUI.build_instance(config=config, lollms_paths=lollms_paths, args=args, sio=sio)
lollmsElfServer:LOLLMSWebUI = LOLLMSWebUI.get_instance()
lollmsElfServer.verbose = True
# Import all endpoints
from lollms.server.endpoints.lollms_binding_files_server import router as lollms_binding_files_server_router
from lollms.server.endpoints.lollms_infos import router as lollms_infos_router
@ -142,6 +141,18 @@ if __name__ == "__main__":
app.include_router(lollms_configuration_infos_router)
@sio.event
async def disconnect(sid):
ASCIIColors.yellow(f"Disconnected: {sid}")
@sio.event
async def message(sid, data):
ASCIIColors.yellow(f"Message from {sid}: {data}")
await sio.send(sid, "Message received!")
lollms_generation_events_add(sio)
lollms_personality_events_add(sio)
lollms_files_events_add(sio)
@ -158,6 +169,9 @@ if __name__ == "__main__":
app.mount("/playground", StaticFiles(directory=Path(__file__).parent/"web"/"dist", html=True), name="playground")
app.mount("/settings", StaticFiles(directory=Path(__file__).parent/"web"/"dist", html=True), name="settings")
app.mount("/", StaticFiles(directory=Path(__file__).parent/"web"/"dist", html=True), name="static")
app = ASGIApp(socketio_server=sio, other_asgi_app=app)
lollmsElfServer.app = app
@ -179,15 +193,20 @@ if __name__ == "__main__":
# thread.start()
# if autoshow
if config.auto_show_browser:
if config.auto_show_browser and not config.headless_server_mode:
if config['host']=="0.0.0.0":
webbrowser.open(f"http://localhost:{config['port']}")
webbrowser.open(f"https://localhost:{config['port']}" if is_https else f"http://localhost:{config['port']}")
#webbrowser.open(f"http://localhost:{6523}") # needed for debug (to be removed in production)
else:
webbrowser.open(f"http://{config['host']}:{config['port']}")
webbrowser.open(f"https://{config['host']}:{config['port']}" if is_https else f"http://{config['host']}:{config['port']}")
#webbrowser.open(f"http://{config['host']}:{6523}") # needed for debug (to be removed in production)
uvicorn.run(app, host=config.host, port=config.port)
if is_https:
uvicorn.run(app, host=config.host, port=config.port, ssl_certfile=cert_file_path, ssl_keyfile=key_file_path)
else:
uvicorn.run(app, host=config.host, port=config.port)
except Exception as ex:
trace_exception(ex)

View File

@ -1,9 +1,10 @@
# =================== Lord Of Large Language Multimodal Systems Configuration file ===========================
version: 56
version: 58
binding_name: null
model_name: null
headless_server_mode: False
allowed_origins: []
# Host information
host: localhost

View File

@ -23,6 +23,15 @@ from fastapi import FastAPI, UploadFile, File
import shutil
import os
import platform
import string
import re
# Regular expression pattern to validate file paths
FILE_PATH_REGEX = r'^[a-zA-Z0-9_\-\\\/]+$'
# Function to validate file paths using the regex pattern
def validate_file_path(path):
return re.match(FILE_PATH_REGEX, path)
from utilities.execution_engines.python_execution_engine import execute_python
from utilities.execution_engines.latex_execution_engine import execute_latex
@ -48,12 +57,17 @@ async def execute_code(request: Request):
:param request: The HTTP request object.
:return: A JSON response with the status of the operation.
"""
if lollmsElfServer.config.headless_server_mode:
return {"status":False,"error":"Code execution is blocked when in headless mode for obvious security reasons!"}
if lollmsElfServer.config.host=="0.0.0.0":
return {"status":False,"error":"Code execution is blocked when the server is exposed outside for very obvipous reasons!"}
try:
data = (await request.json())
code = data["code"]
discussion_id = data.get("discussion_id","unknown_discussion")
message_id = data.get("message_id","unknown_message")
discussion_id = int(data.get("discussion_id","unknown_discussion"))
message_id = int(data.get("message_id","unknown_message"))
language = data.get("language","python")
@ -87,37 +101,7 @@ async def execute_code(request: Request):
ASCIIColors.info("Executing graphviz code:")
ASCIIColors.yellow(code)
return execute_graphviz(code, discussion_id, message_id)
return {"output": "Unsupported language", "execution_time": 0}
except Exception as ex:
trace_exception(ex)
lollmsElfServer.error(ex)
return {"status":False,"error":str(ex)}
@router.post("/open_code_folder")
async def open_code_folder(request: Request):
"""
Opens code folder.
:param request: The HTTP request object.
:return: A JSON response with the status of the operation.
"""
try:
data = (await request.json())
discussion_id = data.get("discussion_id","unknown_discussion")
ASCIIColors.info("Opening folder:")
# Create a temporary file.
root_folder = lollmsElfServer.lollms_paths.personal_outputs_path/"discussions"/f"d_{discussion_id}"
root_folder.mkdir(parents=True,exist_ok=True)
if platform.system() == 'Windows':
os.startfile(str(root_folder))
elif platform.system() == 'Linux':
os.system('xdg-open ' + str(root_folder))
elif platform.system() == 'Darwin':
os.system('open ' + str(root_folder))
return {"output": "OK", "execution_time": 0}
return {"status": False, "error": "Unsupported language", "execution_time": 0}
except Exception as ex:
trace_exception(ex)
lollmsElfServer.error(ex)
@ -138,9 +122,8 @@ async def open_code_folder_in_vs_code(request: Request):
if "discussion_id" in data:
data = (await request.json())
code = data["code"]
discussion_id = data.get("discussion_id","unknown_discussion")
message_id = data.get("message_id","unknown_message")
language = data.get("language","python")
discussion_id = int(data.get("discussion_id","unknown_discussion"))
message_id = int(data.get("message_id","unknown_message"))
ASCIIColors.info("Opening folder:")
# Create a temporary file.
@ -158,7 +141,7 @@ async def open_code_folder_in_vs_code(request: Request):
root_folder.mkdir(parents=True,exist_ok=True)
os.system('code ' + str(root_folder))
return {"output": "OK", "execution_time": 0}
return {"status": True, "execution_time": 0}
except Exception as ex:
trace_exception(ex)
lollmsElfServer.error(ex)
@ -175,9 +158,20 @@ async def open_file(request: Request):
try:
data = (await request.json())
# Validate the 'path' parameter
path = data.get('path')
os.system("start "+path)
return {"output": "OK", "execution_time": 0}
if not validate_file_path(path):
return {"status":False,"error":"Invalid file path"}
# Sanitize the 'path' parameter
path = os.path.realpath(path)
# Use parameterized queries to pass the file path as a parameter
os.system(["start", path])
return {"status": True, "execution_time": 0}
except Exception as ex:
trace_exception(ex)
lollmsElfServer.error(ex)
@ -195,22 +189,19 @@ async def open_code_in_vs_code(request: Request):
try:
data = (await request.json())
discussion_id = data.get("discussion_id","unknown_discussion")
message_id = data.get("message_id","")
discussion_id = int(data.get("discussion_id","unknown_discussion"))
message_id = int(data.get("message_id",""))
code = data["code"]
discussion_id = data.get("discussion_id","unknown_discussion")
message_id = data.get("message_id","unknown_message")
language = data.get("language","python")
ASCIIColors.info("Opening folder:")
# Create a temporary file.
root_folder = lollmsElfServer.lollms_paths.personal_outputs_path/"discussions"/f"d_{discussion_id}"/f"{message_id}.py"
root_folder = Path(os.path.realpath(lollmsElfServer.lollms_paths.personal_outputs_path/"discussions"/f"d_{discussion_id}"/f"{message_id}.py"))
root_folder.mkdir(parents=True,exist_ok=True)
tmp_file = root_folder/f"ai_code_{message_id}.py"
with open(tmp_file,"w") as f:
f.write(code)
os.system('code ' + str(root_folder))
return {"output": "OK", "execution_time": 0}
return {"status": True, "execution_time": 0}
except Exception as ex:
trace_exception(ex)
lollmsElfServer.error(ex)
@ -229,36 +220,43 @@ async def open_code_folder(request: Request):
try:
data = (await request.json())
if "discussion_id" in data:
discussion_id = data.get("discussion_id","unknown_discussion")
discussion_id = int(data.get("discussion_id", "unknown_discussion"))
ASCIIColors.info("Opening folder:")
# Create a temporary file.
root_folder = lollmsElfServer.lollms_paths.personal_outputs_path/"discussions"/f"d_{discussion_id}"
root_folder.mkdir(parents=True,exist_ok=True)
root_folder = lollmsElfServer.lollms_paths.personal_outputs_path / "discussions" / f"d_{discussion_id}"
root_folder.mkdir(parents=True, exist_ok=True)
if platform.system() == 'Windows':
os.startfile(str(root_folder))
elif platform.system() == 'Linux':
os.system('xdg-open ' + str(root_folder))
elif platform.system() == 'Darwin':
os.system('open ' + str(root_folder))
return {"output": "OK", "execution_time": 0}
return {"status": True, "execution_time": 0}
elif "folder_path" in data:
folder_path = os.path.realpath(data["folder_path"])
# Verify that this is a file and not an executable
root_folder = Path(folder_path)
is_valid_folder_path = root_folder.is_dir()
if not is_valid_folder_path:
return {"status":False, "error":"Invalid folder path"}
ASCIIColors.info("Opening folder:")
# Create a temporary file.
root_folder = data["folder_path"]
root_folder.mkdir(parents=True,exist_ok=True)
root_folder.mkdir(parents=True, exist_ok=True)
if platform.system() == 'Windows':
os.startfile(str(root_folder))
elif platform.system() == 'Linux':
os.system('xdg-open ' + str(root_folder))
elif platform.system() == 'Darwin':
os.system('open ' + str(root_folder))
return {"output": "OK", "execution_time": 0}
return {"status": True, "execution_time": 0}
except Exception as ex:
trace_exception(ex)
lollmsElfServer.error(ex)
return {"status":False,"error":str(ex)}
return {"status": False, "error": str(ex)}
@router.get("/start_recording")

@ -1 +1 @@
Subproject commit ab03d2348fe546b1bf2947eeb6b8a0efc444b2b4
Subproject commit 002102b5c9d7884074f501d95978e792f7534bd6

View File

@ -0,0 +1,29 @@
"""
This code is attempting to perform a path traversal attack on your endpoint. Here's how it works:
The code imports the requests library, which is commonly used for making HTTP requests.
It defines the URL of your endpoint as url = 'http://localhost:9600/upload_avatar'.
It specifies the path to the file you want to upload as file_path = 'test.txt'.
It opens the file in binary mode using open(file_path, 'rb') and assigns it to the variable f.
It creates a dictionary called files with a single key-value pair. The key is 'avatar', which corresponds to the name of the file input field in your endpoint. The value is a tuple containing the file path and the file object f. In this case, the file path is '../../../../../../../../tmp/teeest.txt', which attempts to traverse up multiple levels in the directory structure and access the file located in /tmp/teeest.txt.
It sends a POST request to your endpoint with the files dictionary as the files parameter, which includes the file path traversal attempt.
Finally, it prints the response from the server.
This code is trying to exploit the path traversal vulnerability in your endpoint by specifying a file path that includes multiple ../ sequences to traverse up the directory structure and access a file outside of the intended directory.
To protect against this type of attack, you should implement the measures I mentioned earlier, such as file validation, randomized file names, and using a dedicated directory for storing uploaded files. Additionally, make sure to sanitize and validate all user input to prevent malicious file paths from being accepted.
"""
import requests
# Endpoint URL
url = 'http://localhost:9600/upload_avatar'
# Path to the file you want to upload
file_path = 'test.txt'
# Open the file in binary mode and send it as 'avatar'
with open(file_path, 'rb') as f:
files = {'avatar': ('../../../../../../../../tmp/teeest.txt', f)}
response = requests.post(url, files=files)
# Print the response from the server
print(response.json())

View File

@ -0,0 +1,18 @@
import requests
# Set the URL of the endpoint
url = "http://localhost:9600/open_code_folder"
# Valid folder path
valid_folder_path = {"folder_path": "c:"}
# Non-valid executable file path
non_valid_file_path = {"folder_path": "c:/file.exe"}
# Send the valid folder path request
valid_response = requests.post(url, json=valid_folder_path)
print(valid_response.json())
# Send the non-valid executable file path request
non_valid_response = requests.post(url, json=non_valid_file_path)
print(non_valid_response.json())

View File

@ -0,0 +1,21 @@
import requests
url = "http://localhost:9600/open_code_in_vs_code"
# Valid request
valid_payload = {
"discussion_id": "0",
"message_id": "0",
"code": "print('test')"
}
response = requests.post(url, json=valid_payload)
print("Valid request response:", response.json())
# Invalid request
invalid_payload = {
"discussion_id": "../invalid_discussion",
"message_id": "../invalid_message",
"code": "print('test')"
}
response = requests.post(url, json=invalid_payload)
print("Invalid request response:", response.json())

View File

@ -0,0 +1,40 @@
import os
import requests
def test_vulnerabilities():
"""
This function tests for two vulnerabilities in the '/open_file' endpoint.
1. Path Traversal vulnerability: It attempts to access sensitive files by providing a payload with multiple "../" to traverse directories.
2. Command Injection vulnerability: It attempts to execute additional commands by appending a payload with "&&" and an echo statement.
:return: None
"""
# Test path traversal vulnerability
payload = "../../../../../../../../../../etc/passwd"
url = "http://localhost:9600/open_file"
data = {"path": payload}
response = requests.post(url, json=data)
if response.status_code == 200:
if "root:x" in response.text:
print("Path Traversal vulnerability found!")
else:
print("Path Traversal vulnerability not found.")
else:
print("Error: ", response.status_code)
# Test command injection vulnerability
payload = "&& echo 'Command Injection Test'"
data = {"path": payload}
response = requests.post(url, json=data)
if response.status_code == 200:
if "Command Injection Test" in response.text:
print("Command Injection vulnerability found!")
else:
print("Command Injection vulnerability not found.")
else:
print("Error: ", response.status_code)
# Call the test function
test_vulnerabilities()

1
tests/pentests/test.txt Normal file
View File

@ -0,0 +1 @@
this is a test file to simulate an attack on lollms avatar

View File

@ -2815,6 +2815,7 @@ export default {
axios.post('/upload_avatar', formData)
.then(response => {
console.log("Avatar uploaded successfully")
this.$store.state.toast.showToast("Avatar uploaded successfully!", 4, true)
// Assuming the server responds with the file name after successful upload
const fileName = response.data.fileName;