Merge pull request #2518 from markparonyan/get-link-ifaces
Some checks failed
testing / build (ubuntu-latest, 3.10) (push) Has been cancelled
testing / build (ubuntu-latest, 3.11) (push) Has been cancelled
testing / build (ubuntu-latest, 3.12) (push) Has been cancelled
testing / build (ubuntu-latest, 3.13) (push) Has been cancelled
testing / build (ubuntu-latest, 3.9) (push) Has been cancelled

Get iface/udp tunnel from links with Cloud,Nat
This commit is contained in:
Jeremy Grossmann 2025-03-23 20:10:23 +07:00 committed by GitHub
commit e532c83538
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 159 additions and 1 deletions

View File

@ -285,3 +285,58 @@ async def stream_pcap(request: Request, link: Link = Depends(dep_link)) -> Strea
raise ControllerError(f"Client error received when receiving pcap stream from compute: {e}")
return StreamingResponse(compute_pcap_stream(), media_type="application/vnd.tcpdump.pcap")
@router.get(
"/{link_id}/iface",
response_model=dict[str, schemas.UdpPort | schemas.EthernetPort],
dependencies=[Depends(has_privilege("Link.Audit"))]
)
async def get_iface(link: Link = Depends(dep_link)) -> dict[str, schemas.UdpPort | schemas.EthernetPort]:
"""
Return iface info for links to Cloud or NAT devices.
Required privilege: Link.Audit
"""
ifaces_info = {}
for node_data in link._nodes:
node = node_data["node"]
if node.node_type not in ("cloud", "nat"):
continue
port_number = node_data["port_number"]
compute = node.compute
project_id = link.project.id
response = await compute.get(
f"/projects/{project_id}/{node.node_type}/nodes/{node.id}"
)
node_info = response.json
if "ports_mapping" not in node_info:
continue
ports_mapping = node_info["ports_mapping"]
for port in ports_mapping:
port_num = port.get("port_number")
if port_num and int(port_num) == int(port_number):
port_type = port.get("type", "")
if "udp" in port_type.lower():
ifaces_info[node.id] = {
"type": f"{port_type}",
"rhost": port["rhost"],
"lport": port["lport"],
"rport": port["rport"],
}
else:
ifaces_info[node.id] = {
"type": f"{port_type}",
"interface": port["interface"],
}
if not ifaces_info:
raise ControllerError(
"Link not connected to Cloud/NAT"
)
return ifaces_info

View File

@ -20,7 +20,7 @@ from .common import ErrorMessage
from .version import Version
# Controller schemas
from .controller.links import LinkCreate, LinkUpdate, Link
from .controller.links import LinkCreate, LinkUpdate, Link, UdpPort, EthernetPort
from .controller.computes import ComputeCreate, ComputeUpdate, ComputeVirtualBoxVM, ComputeVMwareVM, ComputeDockerImage, AutoIdlePC, Compute
from .controller.templates import TemplateCreate, TemplateUpdate, TemplateUsage, Template
from .controller.images import Image, ImageType

View File

@ -92,3 +92,23 @@ class Link(LinkBase):
None,
description="Read only property. The compute identifier where a capture is running"
)
class UdpPort(BaseModel):
"""
UDP port information.
"""
lport: int
rhost: str
rport: int
type: str
class EthernetPort(BaseModel):
"""
Ethernet port information.
"""
interface: str
type: str

View File

@ -423,3 +423,86 @@ class TestLinkRoutes:
assert mock.called
assert response.status_code == status.HTTP_200_OK
assert response.json() == FILTERS
async def test_get_udp_interface(self, app: FastAPI, client: AsyncClient, project: Project) -> None:
"""
Test getting UDP tunnel interface information from a link.
"""
link = Link(project)
project._links = {link.id: link}
cloud_node = MagicMock()
cloud_node.node_type = "cloud"
cloud_node.id = "cloud-node-id"
cloud_node.name = "Cloud1"
compute = MagicMock()
response = MagicMock()
response.json = {
"ports_mapping": [
{
"port_number": 1,
"type": "udp",
"lport": 20000,
"rhost": "127.0.0.1",
"rport": 30000,
"name": "UDP tunnel 1"
}
]
}
compute.get = AsyncioMagicMock(return_value=response)
cloud_node.compute = compute
link._nodes = [{"node": cloud_node, "port_number": 1}]
response = await client.get(app.url_path_for("get_iface", project_id=project.id, link_id=link.id))
assert response.status_code == status.HTTP_200_OK
result = response.json()
assert "cloud-node-id" in result
udp_info = result["cloud-node-id"]
assert udp_info["lport"] == 20000
assert udp_info["rhost"] == "127.0.0.1"
assert udp_info["rport"] == 30000
assert udp_info["type"] == "udp"
async def test_get_ethernet_interface(self, app: FastAPI, client: AsyncClient, project: Project) -> None:
"""
Test getting ethernet interface information from a link.
"""
link = Link(project)
project._links = {link.id: link}
cloud_node = MagicMock()
cloud_node.node_type = "cloud"
cloud_node.id = "cloud-node-id"
cloud_node.name = "Cloud1"
compute = MagicMock()
response = MagicMock()
response.json = {
"ports_mapping": [
{
"port_number": 1,
"type": "ethernet",
"interface": "eth0",
"name": "Ethernet 1"
}
]
}
compute.get = AsyncioMagicMock(return_value=response)
cloud_node.compute = compute
link._nodes = [{"node": cloud_node, "port_number": 1}]
response = await client.get(app.url_path_for("get_iface", project_id=project.id, link_id=link.id))
assert response.status_code == status.HTTP_200_OK
result = response.json()
assert "cloud-node-id" in result
interface_info = result["cloud-node-id"]
assert interface_info["interface"] == "eth0"
assert interface_info["type"] == "ethernet"