diff --git a/gns3server/api/routes/controller/links.py b/gns3server/api/routes/controller/links.py index 7264a941..8c438de2 100644 --- a/gns3server/api/routes/controller/links.py +++ b/gns3server/api/routes/controller/links.py @@ -285,3 +285,58 @@ async def stream_pcap(request: Request, link: Link = Depends(dep_link)) -> Strea raise ControllerError(f"Client error received when receiving pcap stream from compute: {e}") return StreamingResponse(compute_pcap_stream(), media_type="application/vnd.tcpdump.pcap") + + +@router.get( + "/{link_id}/iface", + response_model=dict[str, schemas.UdpPort | schemas.EthernetPort], + dependencies=[Depends(has_privilege("Link.Audit"))] +) +async def get_iface(link: Link = Depends(dep_link)) -> dict[str, schemas.UdpPort | schemas.EthernetPort]: + """ + Return iface info for links to Cloud or NAT devices. + + Required privilege: Link.Audit + """ + ifaces_info = {} + for node_data in link._nodes: + node = node_data["node"] + if node.node_type not in ("cloud", "nat"): + continue + + port_number = node_data["port_number"] + + compute = node.compute + project_id = link.project.id + response = await compute.get( + f"/projects/{project_id}/{node.node_type}/nodes/{node.id}" + ) + node_info = response.json + + if "ports_mapping" not in node_info: + continue + ports_mapping = node_info["ports_mapping"] + + for port in ports_mapping: + port_num = port.get("port_number") + + if port_num and int(port_num) == int(port_number): + port_type = port.get("type", "") + if "udp" in port_type.lower(): + ifaces_info[node.id] = { + "type": f"{port_type}", + "rhost": port["rhost"], + "lport": port["lport"], + "rport": port["rport"], + } + else: + ifaces_info[node.id] = { + "type": f"{port_type}", + "interface": port["interface"], + } + + if not ifaces_info: + raise ControllerError( + "Link not connected to Cloud/NAT" + ) + return ifaces_info diff --git a/gns3server/schemas/__init__.py b/gns3server/schemas/__init__.py index d331de54..a4fb297d 100644 --- a/gns3server/schemas/__init__.py +++ b/gns3server/schemas/__init__.py @@ -20,7 +20,7 @@ from .common import ErrorMessage from .version import Version # Controller schemas -from .controller.links import LinkCreate, LinkUpdate, Link +from .controller.links import LinkCreate, LinkUpdate, Link, UdpPort, EthernetPort from .controller.computes import ComputeCreate, ComputeUpdate, ComputeVirtualBoxVM, ComputeVMwareVM, ComputeDockerImage, AutoIdlePC, Compute from .controller.templates import TemplateCreate, TemplateUpdate, TemplateUsage, Template from .controller.images import Image, ImageType diff --git a/gns3server/schemas/controller/links.py b/gns3server/schemas/controller/links.py index fd853e2f..ac82c0d7 100644 --- a/gns3server/schemas/controller/links.py +++ b/gns3server/schemas/controller/links.py @@ -92,3 +92,23 @@ class Link(LinkBase): None, description="Read only property. The compute identifier where a capture is running" ) + + +class UdpPort(BaseModel): + """ + UDP port information. + """ + + lport: int + rhost: str + rport: int + type: str + + +class EthernetPort(BaseModel): + """ + Ethernet port information. + """ + + interface: str + type: str diff --git a/tests/api/routes/controller/test_links.py b/tests/api/routes/controller/test_links.py index 2103ddaa..a5aea4d5 100644 --- a/tests/api/routes/controller/test_links.py +++ b/tests/api/routes/controller/test_links.py @@ -423,3 +423,86 @@ class TestLinkRoutes: assert mock.called assert response.status_code == status.HTTP_200_OK assert response.json() == FILTERS + + async def test_get_udp_interface(self, app: FastAPI, client: AsyncClient, project: Project) -> None: + """ + Test getting UDP tunnel interface information from a link. + """ + link = Link(project) + project._links = {link.id: link} + + cloud_node = MagicMock() + cloud_node.node_type = "cloud" + cloud_node.id = "cloud-node-id" + cloud_node.name = "Cloud1" + + compute = MagicMock() + response = MagicMock() + response.json = { + "ports_mapping": [ + { + "port_number": 1, + "type": "udp", + "lport": 20000, + "rhost": "127.0.0.1", + "rport": 30000, + "name": "UDP tunnel 1" + } + ] + } + compute.get = AsyncioMagicMock(return_value=response) + cloud_node.compute = compute + + link._nodes = [{"node": cloud_node, "port_number": 1}] + + response = await client.get(app.url_path_for("get_iface", project_id=project.id, link_id=link.id)) + + assert response.status_code == status.HTTP_200_OK + result = response.json() + + assert "cloud-node-id" in result + + udp_info = result["cloud-node-id"] + assert udp_info["lport"] == 20000 + assert udp_info["rhost"] == "127.0.0.1" + assert udp_info["rport"] == 30000 + assert udp_info["type"] == "udp" + async def test_get_ethernet_interface(self, app: FastAPI, client: AsyncClient, project: Project) -> None: + """ + Test getting ethernet interface information from a link. + """ + link = Link(project) + project._links = {link.id: link} + + cloud_node = MagicMock() + cloud_node.node_type = "cloud" + cloud_node.id = "cloud-node-id" + cloud_node.name = "Cloud1" + + compute = MagicMock() + response = MagicMock() + response.json = { + "ports_mapping": [ + { + "port_number": 1, + "type": "ethernet", + "interface": "eth0", + "name": "Ethernet 1" + } + ] + } + compute.get = AsyncioMagicMock(return_value=response) + cloud_node.compute = compute + + link._nodes = [{"node": cloud_node, "port_number": 1}] + + response = await client.get(app.url_path_for("get_iface", project_id=project.id, link_id=link.id)) + + assert response.status_code == status.HTTP_200_OK + result = response.json() + + assert "cloud-node-id" in result + + interface_info = result["cloud-node-id"] + assert interface_info["interface"] == "eth0" + assert interface_info["type"] == "ethernet" \ No newline at end of file