diff --git a/gns3server/handlers/api/file_handler.py b/gns3server/handlers/api/file_handler.py index 4342f6c6..b6d88c40 100644 --- a/gns3server/handlers/api/file_handler.py +++ b/gns3server/handlers/api/file_handler.py @@ -37,6 +37,9 @@ class FileHandler: def read(request, response): response.enable_chunked_encoding() + if not request.json.get("location").endswith(".pcap"): + raise aiohttp.web.HTTPForbidden(text="Only .pcap file are allowed") + try: with open(request.json.get("location"), "rb") as f: loop = asyncio.get_event_loop() diff --git a/tests/handlers/api/test_file.py b/tests/handlers/api/test_file.py index 0b110b2e..a524977d 100644 --- a/tests/handlers/api/test_file.py +++ b/tests/handlers/api/test_file.py @@ -27,15 +27,15 @@ from gns3server.version import __version__ def test_stream(server, tmpdir, loop): - with open(str(tmpdir / "test"), 'w+') as f: + with open(str(tmpdir / "test.pcap"), 'w+') as f: f.write("hello") def go(future): - query = json.dumps({"location": str(tmpdir / "test")}) + query = json.dumps({"location": str(tmpdir / "test.pcap")}) headers = {'content-type': 'application/json'} response = yield from aiohttp.request("GET", server.get_url("/files/stream", 1), data=query, headers=headers) response.body = yield from response.content.read(5) - with open(str(tmpdir / "test"), 'a') as f: + with open(str(tmpdir / "test.pcap"), 'a') as f: f.write("world") response.body += yield from response.content.read(5) response.close() @@ -48,7 +48,8 @@ def test_stream(server, tmpdir, loop): assert response.body == b'helloworld' -def test_stream_file_not_found(server, tmpdir, loop): + +def test_stream_file_not_pcap(server, tmpdir, loop): def go(future): query = json.dumps({"location": str(tmpdir / "test")}) headers = {'content-type': 'application/json'} @@ -56,6 +57,20 @@ def test_stream_file_not_found(server, tmpdir, loop): response.close() future.set_result(response) + future = asyncio.Future() + asyncio.async(go(future)) + response = loop.run_until_complete(future) + assert response.status == 403 + + +def test_stream_file_not_found(server, tmpdir, loop): + def go(future): + query = json.dumps({"location": str(tmpdir / "test.pcap")}) + headers = {'content-type': 'application/json'} + response = yield from aiohttp.request("GET", server.get_url("/files/stream", 1), data=query, headers=headers) + response.close() + future.set_result(response) + future = asyncio.Future() asyncio.async(go(future)) response = loop.run_until_complete(future)