mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-17 04:18:07 +00:00
Updates the following libraries in the service: * azure-core * azure-functions * azure-identity * azure-keyvault-keys * azure-keyvault-secrets * azure-mgmt-compute * azure-mgmt-core * azure-mgmt-loganalytics * azure-mgmt-network * azure-mgmt-resource * azure-mgmt-storage * azure-mgmt-subscription * azure-storage-blob * azure-storage-queue * pydantic * requests * jsonpatch Removes the following libraries in the service: * azure-cli-core * azure-cli-nspkg * azure-mgmt-cosmosdb * azure-servicebus Updates the following libraries in the CLI: * requests * semver * asciimatics * pydantic * tenacity Updates the following libraries in onefuzztypes: * pydantic The primary "legacy" libraries are [azure-graphrbac](https://pypi.org/project/azure-graphrbac/) and azure-cosmosdb-table. The former has not been updated to use azure-identity yet. The later is being rewritten as [azure-data-tables](https://pypi.org/project/azure-data-tables/), but is still in early beta.
154 lines
5.0 KiB
Python
154 lines
5.0 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# Copyright (c) Microsoft Corporation.
|
|
# Licensed under the MIT License.
|
|
|
|
import logging
|
|
import os
|
|
from typing import Any, Dict, Optional, Union
|
|
from uuid import UUID
|
|
|
|
from azure.core.exceptions import ResourceNotFoundError
|
|
from msrestazure.azure_exceptions import CloudError
|
|
from msrestazure.tools import parse_resource_id
|
|
from onefuzztypes.enums import ErrorCode
|
|
from onefuzztypes.models import Error
|
|
|
|
from .creds import get_base_resource_group
|
|
from .network_mgmt_client import get_network_client
|
|
from .subnet import create_virtual_network, get_subnet_id
|
|
from .vmss import get_instance_id
|
|
|
|
|
|
def get_scaleset_instance_ip(scaleset: UUID, machine_id: UUID) -> Optional[str]:
|
|
instance = get_instance_id(scaleset, machine_id)
|
|
if not isinstance(instance, str):
|
|
return None
|
|
|
|
resource_group = get_base_resource_group()
|
|
|
|
client = get_network_client()
|
|
intf = client.network_interfaces.list_virtual_machine_scale_set_network_interfaces(
|
|
resource_group, str(scaleset)
|
|
)
|
|
try:
|
|
for interface in intf:
|
|
resource = parse_resource_id(interface.virtual_machine.id)
|
|
if resource.get("resource_name") != instance:
|
|
continue
|
|
|
|
for config in interface.ip_configurations:
|
|
if config.private_ip_address is None:
|
|
continue
|
|
return str(config.private_ip_address)
|
|
except (ResourceNotFoundError, CloudError):
|
|
# this can fail if an interface is removed during the iteration
|
|
pass
|
|
|
|
return None
|
|
|
|
|
|
def get_ip(resource_group: str, name: str) -> Optional[Any]:
|
|
logging.info("getting ip %s:%s", resource_group, name)
|
|
network_client = get_network_client()
|
|
try:
|
|
return network_client.public_ip_addresses.get(resource_group, name)
|
|
except (ResourceNotFoundError, CloudError):
|
|
return None
|
|
|
|
|
|
def delete_ip(resource_group: str, name: str) -> Any:
|
|
logging.info("deleting ip %s:%s", resource_group, name)
|
|
network_client = get_network_client()
|
|
return network_client.public_ip_addresses.begin_delete(resource_group, name)
|
|
|
|
|
|
def create_ip(resource_group: str, name: str, location: str) -> Any:
|
|
logging.info("creating ip for %s:%s in %s", resource_group, name, location)
|
|
|
|
network_client = get_network_client()
|
|
params: Dict[str, Union[str, Dict[str, str]]] = {
|
|
"location": location,
|
|
"public_ip_allocation_method": "Dynamic",
|
|
}
|
|
if "ONEFUZZ_OWNER" in os.environ:
|
|
params["tags"] = {"OWNER": os.environ["ONEFUZZ_OWNER"]}
|
|
return network_client.public_ip_addresses.begin_create_or_update(
|
|
resource_group, name, params
|
|
)
|
|
|
|
|
|
def get_public_nic(resource_group: str, name: str) -> Optional[Any]:
|
|
logging.info("getting nic: %s %s", resource_group, name)
|
|
network_client = get_network_client()
|
|
try:
|
|
return network_client.network_interfaces.get(resource_group, name)
|
|
except (ResourceNotFoundError, CloudError):
|
|
return None
|
|
|
|
|
|
def delete_nic(resource_group: str, name: str) -> Optional[Any]:
|
|
logging.info("deleting nic %s:%s", resource_group, name)
|
|
network_client = get_network_client()
|
|
return network_client.network_interfaces.begin_delete(resource_group, name)
|
|
|
|
|
|
def create_public_nic(resource_group: str, name: str, location: str) -> Optional[Error]:
|
|
logging.info("creating nic for %s:%s in %s", resource_group, name, location)
|
|
|
|
network_client = get_network_client()
|
|
subnet_id = get_subnet_id(resource_group, location)
|
|
if not subnet_id:
|
|
return create_virtual_network(resource_group, location, location)
|
|
|
|
ip = get_ip(resource_group, name)
|
|
if not ip:
|
|
create_ip(resource_group, name, location)
|
|
return None
|
|
|
|
params = {
|
|
"location": location,
|
|
"ip_configurations": [
|
|
{
|
|
"name": "myIPConfig",
|
|
"public_ip_address": ip,
|
|
"subnet": {"id": subnet_id},
|
|
}
|
|
],
|
|
}
|
|
if "ONEFUZZ_OWNER" in os.environ:
|
|
params["tags"] = {"OWNER": os.environ["ONEFUZZ_OWNER"]}
|
|
|
|
try:
|
|
network_client.network_interfaces.begin_create_or_update(
|
|
resource_group, name, params
|
|
)
|
|
except (ResourceNotFoundError, CloudError) as err:
|
|
if "RetryableError" not in repr(err):
|
|
return Error(
|
|
code=ErrorCode.VM_CREATE_FAILED,
|
|
errors=["unable to create nic: %s" % err],
|
|
)
|
|
return None
|
|
|
|
|
|
def get_public_ip(resource_id: str) -> Optional[str]:
|
|
logging.info("getting ip for %s", resource_id)
|
|
network_client = get_network_client()
|
|
resource = parse_resource_id(resource_id)
|
|
ip = (
|
|
network_client.network_interfaces.get(
|
|
resource["resource_group"], resource["name"]
|
|
)
|
|
.ip_configurations[0]
|
|
.public_ip_address
|
|
)
|
|
resource = parse_resource_id(ip.id)
|
|
ip = network_client.public_ip_addresses.get(
|
|
resource["resource_group"], resource["name"]
|
|
).ip_address
|
|
if ip is None:
|
|
return None
|
|
else:
|
|
return str(ip)
|