Change method to prevent forbidden directory traversal. Ref

This commit is contained in:
grossmj 2021-05-16 14:29:56 +09:30
parent f3d81fa450
commit 2bf16f1e5f
8 changed files with 104 additions and 17 deletions

View File

@ -19,6 +19,7 @@ API routes for projects.
"""
import os
import urllib.parse
import logging
@ -198,6 +199,7 @@ async def get_compute_project_file(file_path: str, project: Project = Depends(de
Get a file from a project.
"""
file_path = urllib.parse.unquote(file_path)
path = os.path.normpath(file_path)
# Raise error if user try to escape
@ -214,6 +216,7 @@ async def get_compute_project_file(file_path: str, project: Project = Depends(de
@router.post("/projects/{project_id}/files/{file_path:path}", status_code=status.HTTP_204_NO_CONTENT)
async def write_compute_project_file(file_path: str, request: Request, project: Project = Depends(dep_project)) -> None:
file_path = urllib.parse.unquote(file_path)
path = os.path.normpath(file_path)
# Raise error if user try to escape

View File

@ -24,6 +24,7 @@ import tempfile
import zipfile
import aiofiles
import time
import urllib.parse
import logging
@ -369,7 +370,8 @@ async def get_file(file_path: str, project: Project = Depends(dep_project)) -> F
Return a file from a project.
"""
path = os.path.normpath(file_path).strip("/")
file_path = urllib.parse.unquote(file_path)
path = os.path.normpath(file_path)
# Raise error if user try to escape
if not is_safe_path(path, project.path):
@ -388,7 +390,8 @@ async def write_file(file_path: str, request: Request, project: Project = Depend
Write a file from a project.
"""
path = os.path.normpath(file_path).strip("/")
file_path = urllib.parse.unquote(file_path)
path = os.path.normpath(file_path)
# Raise error if user try to escape
if not is_safe_path(path, project.path):

View File

@ -17,6 +17,7 @@
import os
from pathlib import Path
from fastapi import HTTPException, status
from ..config import Config
@ -37,15 +38,14 @@ def get_default_project_directory():
return path
def is_safe_path(file_path, directory):
def is_safe_path(file_path: str, basedir: str) -> bool:
"""
Check that file path is safe.
(the file is stored inside directory or one of its sub-directory)
"""
requested_path = os.path.abspath(file_path)
common_prefix = os.path.commonprefix([requested_path, directory])
return common_prefix != directory
test_path = (Path(basedir) / file_path).resolve()
return Path(basedir).resolve() in test_path.resolve().parents
def check_path_allowed(path):

View File

@ -186,7 +186,23 @@ async def test_upload_image(app: FastAPI, client: AsyncClient, images_dir: str)
assert checksum == "033bd94b1168d7e4f0d644c3c95e35bf"
async def test_download_image_escape(app: FastAPI, client: AsyncClient, tmpdir) -> None:
async def test_upload_image_forbidden_location(app: FastAPI, client: AsyncClient) -> None:
file_path = "%2e%2e/hello"
response = await client.post(app.url_path_for("upload_dynamips_image", filename=file_path), content=b"TEST")
assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_download_image(app: FastAPI, client: AsyncClient, images_dir: str) -> None:
response = await client.post(app.url_path_for("upload_dynamips_image", filename="test3"), content=b"TEST")
assert response.status_code == status.HTTP_204_NO_CONTENT
response = await client.get(app.url_path_for("download_dynamips_image", filename="test3"))
assert response.status_code == status.HTTP_200_OK
async def test_download_image_forbidden(app: FastAPI, client: AsyncClient, tmpdir) -> None:
file_path = "foo/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/etc/passwd"
response = await client.get(app.url_path_for("download_dynamips_image", filename=file_path))

View File

@ -415,7 +415,23 @@ async def test_upload_image(app: FastAPI, client: AsyncClient, tmpdir) -> None:
assert checksum == "033bd94b1168d7e4f0d644c3c95e35bf"
async def test_download_image_escape(app: FastAPI, client: AsyncClient, tmpdir) -> None:
async def test_upload_image_forbidden_location(app: FastAPI, client: AsyncClient) -> None:
file_path = "%2e%2e/hello"
response = await client.post(app.url_path_for("upload_dynamips_image", filename=file_path), content=b"TEST")
assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_download_image(app: FastAPI, client: AsyncClient, images_dir: str) -> None:
response = await client.post(app.url_path_for("upload_dynamips_image", filename="test3"), content=b"TEST")
assert response.status_code == status.HTTP_204_NO_CONTENT
response = await client.get(app.url_path_for("download_dynamips_image", filename="test3"))
assert response.status_code == status.HTTP_200_OK
async def test_download_image_forbidden(app: FastAPI, client: AsyncClient, tmpdir) -> None:
file_path = "foo/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/etc/passwd"
response = await client.get(app.url_path_for("download_iou_image", filename=file_path))

View File

@ -179,6 +179,21 @@ async def test_get_file(app: FastAPI, client: AsyncClient, config, tmpdir) -> No
assert response.status_code == status.HTTP_404_NOT_FOUND
async def test_get_file_forbidden_location(app: FastAPI, client: AsyncClient, config, tmpdir) -> None:
config.settings.Server.projects_path = str(tmpdir)
project = ProjectManager.instance().create_project(project_id="01010203-0405-0607-0809-0a0b0c0d0e0b")
file_path = "foo/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/etc/passwd"
response = await client.get(
app.url_path_for(
"get_compute_project_file",
project_id=project.id,
file_path=file_path
)
)
assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_write_file(app: FastAPI, client: AsyncClient, config, tmpdir) -> None:
config.settings.Server.projects_path = str(tmpdir)
@ -196,3 +211,15 @@ async def test_write_file(app: FastAPI, client: AsyncClient, config, tmpdir) ->
project_id=project.id,
file_path="../hello"))
assert response.status_code == status.HTTP_404_NOT_FOUND
async def test_write_file_forbidden_location(app: FastAPI, client: AsyncClient, config, tmpdir) -> None:
config.settings.Server.projects_path = str(tmpdir)
project = ProjectManager.instance().create_project(project_id="01010203-0405-0607-0809-0a0b0c0d0e0b")
file_path = "%2e%2e/hello"
response = await client.post(app.url_path_for("write_compute_project_file",
project_id=project.id,
file_path=file_path), content=b"world")
assert response.status_code == status.HTTP_403_FORBIDDEN

View File

@ -388,15 +388,23 @@ async def test_upload_image_ova(app: FastAPI, client: AsyncClient, tmpdir:str) -
assert checksum == "033bd94b1168d7e4f0d644c3c95e35bf"
async def test_upload_image_forbiden_location(app: FastAPI, client: AsyncClient, tmpdir: str) -> None:
async def test_upload_image_forbidden_location(app: FastAPI, client: AsyncClient, tmpdir: str) -> None:
with patch("gns3server.compute.Qemu.get_images_directory", return_value=str(tmpdir)):
response = await client.post(app.url_path_for("upload_qemu_image",
filename="/qemu/images/../../test2"), content=b"TEST")
assert response.status_code == status.HTTP_403_FORBIDDEN
response = await client.post(app.url_path_for("upload_qemu_image",
filename="/qemu/images/../../test2"), content=b"TEST")
assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_download_image_escape(app: FastAPI, client: AsyncClient, tmpdir) -> None:
async def test_download_image(app: FastAPI, client: AsyncClient, images_dir: str) -> None:
response = await client.post(app.url_path_for("upload_qemu_image", filename="test3"), content=b"TEST")
assert response.status_code == status.HTTP_204_NO_CONTENT
response = await client.get(app.url_path_for("download_qemu_image", filename="test3"))
assert response.status_code == status.HTTP_200_OK
async def test_download_image_forbidden_location(app: FastAPI, client: AsyncClient, tmpdir) -> None:
file_path = "foo/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/etc/passwd"
response = await client.get(app.url_path_for("download_qemu_image", filename=file_path))

View File

@ -330,6 +330,13 @@ async def test_get_file(app: FastAPI, client: AsyncClient, project: Project) ->
assert response.status_code == status.HTTP_404_NOT_FOUND
async def test_get_file_forbidden_location(app: FastAPI, client: AsyncClient, project: Project) -> None:
file_path = "foo/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/etc/passwd"
response = await client.get(app.url_path_for("get_file", project_id=project.id, file_path=file_path))
assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_write_file(app: FastAPI, client: AsyncClient, project: Project) -> None:
response = await client.post(app.url_path_for("write_file", project_id=project.id, file_path="hello"),
@ -343,6 +350,14 @@ async def test_write_file(app: FastAPI, client: AsyncClient, project: Project) -
assert response.status_code == status.HTTP_404_NOT_FOUND
async def test_write_file_forbidden_location(app: FastAPI, client: AsyncClient, project: Project) -> None:
file_path = "%2e%2e/hello"
response = await client.post(app.url_path_for("write_file", project_id=project.id, file_path=file_path),
content=b"world")
assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_write_and_get_file_with_leading_slashes_in_filename(
app: FastAPI,
client: AsyncClient,
@ -350,11 +365,10 @@ async def test_write_and_get_file_with_leading_slashes_in_filename(
response = await client.post(app.url_path_for("write_file", project_id=project.id, file_path="//hello"),
content=b"world")
assert response.status_code == status.HTTP_204_NO_CONTENT
assert response.status_code == status.HTTP_403_FORBIDDEN
response = await client.get(app.url_path_for("get_file", project_id=project.id, file_path="//hello"))
assert response.status_code == status.HTTP_200_OK
assert response.content == b"world"
assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_import(app: FastAPI, client: AsyncClient, tmpdir, controller: Controller) -> None: