mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-21 13:51:19 +00:00
Adjust auto scale to scale down nodes on shutdown (#2232)
* Only scale down when scale set in shutdown state * Bug fix + explaing the logic a bit better * Fix some bugs * linting and bug fixes * lint * Actually now * I'm not writing sql * last try * It's working * lint * Small docs update
This commit is contained in:
@ -42,23 +42,22 @@ from .monitor import get_monitor_client
|
||||
|
||||
|
||||
@retry_on_auth_failure()
|
||||
def add_auto_scale_to_vmss(
|
||||
vmss: UUID, auto_scale_profile: AutoscaleProfile
|
||||
) -> Optional[Error]:
|
||||
logging.info("Checking scaleset %s for existing auto scale resources" % vmss)
|
||||
def get_auto_scale_settings(
|
||||
vmss: UUID,
|
||||
) -> Union[Optional[AutoscaleSettingResource], Error]:
|
||||
logging.info("Getting auto scale settings for %s" % vmss)
|
||||
client = get_monitor_client()
|
||||
resource_group = get_base_resource_group()
|
||||
|
||||
auto_scale_resource_id = None
|
||||
|
||||
try:
|
||||
auto_scale_collections = client.autoscale_settings.list_by_resource_group(
|
||||
resource_group
|
||||
)
|
||||
for auto_scale in auto_scale_collections:
|
||||
if str(auto_scale.target_resource_uri).endswith(str(vmss)):
|
||||
auto_scale_resource_id = auto_scale.id
|
||||
break
|
||||
logging.info("Found auto scale settings for %s" % vmss)
|
||||
return auto_scale
|
||||
|
||||
except (ResourceNotFoundError, CloudError):
|
||||
return Error(
|
||||
code=ErrorCode.INVALID_CONFIGURATION,
|
||||
@ -68,7 +67,21 @@ def add_auto_scale_to_vmss(
|
||||
],
|
||||
)
|
||||
|
||||
if auto_scale_resource_id is not None:
|
||||
return None
|
||||
|
||||
|
||||
@retry_on_auth_failure()
|
||||
def add_auto_scale_to_vmss(
|
||||
vmss: UUID, auto_scale_profile: AutoscaleProfile
|
||||
) -> Optional[Error]:
|
||||
logging.info("Checking scaleset %s for existing auto scale resources" % vmss)
|
||||
|
||||
existing_auto_scale_resource = get_auto_scale_settings(vmss)
|
||||
|
||||
if isinstance(existing_auto_scale_resource, Error):
|
||||
return existing_auto_scale_resource
|
||||
|
||||
if existing_auto_scale_resource is not None:
|
||||
logging.warning("Scaleset %s already has auto scale resource" % vmss)
|
||||
return None
|
||||
|
||||
@ -87,6 +100,29 @@ def add_auto_scale_to_vmss(
|
||||
return None
|
||||
|
||||
|
||||
def update_auto_scale(auto_scale_resource: AutoscaleSettingResource) -> Optional[Error]:
|
||||
logging.info("Updating auto scale resource: %s" % auto_scale_resource.name)
|
||||
client = get_monitor_client()
|
||||
resource_group = get_base_resource_group()
|
||||
|
||||
try:
|
||||
auto_scale_resource = client.autoscale_settings.create_or_update(
|
||||
resource_group, auto_scale_resource.name, auto_scale_resource
|
||||
)
|
||||
logging.info(
|
||||
"Successfully updated auto scale resource: %s" % auto_scale_resource.name
|
||||
)
|
||||
except (ResourceNotFoundError, CloudError):
|
||||
return Error(
|
||||
code=ErrorCode.UNABLE_TO_UPDATE,
|
||||
errors=[
|
||||
"unable to update auto scale resource with name: %s and profile: %s"
|
||||
% (auto_scale_resource.name, auto_scale_resource)
|
||||
],
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def create_auto_scale_resource_for(
|
||||
resource_id: UUID, location: Region, profile: AutoscaleProfile
|
||||
) -> Union[AutoscaleSettingResource, Error]:
|
||||
@ -200,6 +236,32 @@ def default_auto_scale_profile(queue_uri: str, scaleset_size: int) -> AutoscaleP
|
||||
)
|
||||
|
||||
|
||||
def shutdown_scaleset_rule(queue_uri: str) -> ScaleRule:
|
||||
return ScaleRule(
|
||||
# Scale in if there are 0 or more messages in the queue (aka: every time)
|
||||
metric_trigger=MetricTrigger(
|
||||
metric_name="ApproximateMessageCount",
|
||||
metric_resource_uri=queue_uri,
|
||||
# Check every 10 minutes
|
||||
time_grain=timedelta(minutes=5),
|
||||
# The average amount of messages there are in the pool queue
|
||||
time_aggregation=TimeAggregationType.AVERAGE,
|
||||
statistic=MetricStatisticType.SUM,
|
||||
# Over the past 10 minutes
|
||||
time_window=timedelta(minutes=5),
|
||||
operator=ComparisonOperationType.GREATER_THAN_OR_EQUAL,
|
||||
threshold=0,
|
||||
divide_per_instance=False,
|
||||
),
|
||||
scale_action=ScaleAction(
|
||||
direction=ScaleDirection.DECREASE,
|
||||
type=ScaleType.CHANGE_COUNT,
|
||||
value=1,
|
||||
cooldown=timedelta(minutes=5),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def setup_auto_scale_diagnostics(
|
||||
auto_scale_resource_uri: str,
|
||||
auto_scale_resource_name: str,
|
||||
|
@ -5,7 +5,7 @@
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, List, Optional, Set, Union, cast
|
||||
from typing import Any, Callable, Dict, List, Optional, Set, Union, cast
|
||||
from uuid import UUID
|
||||
|
||||
from azure.core.exceptions import (
|
||||
@ -18,6 +18,7 @@ from azure.mgmt.compute.models import (
|
||||
ResourceSkuRestrictionsType,
|
||||
VirtualMachineScaleSetVMInstanceIDs,
|
||||
VirtualMachineScaleSetVMInstanceRequiredIDs,
|
||||
VirtualMachineScaleSetVMListResult,
|
||||
VirtualMachineScaleSetVMProtectionPolicy,
|
||||
)
|
||||
from memoization import cached
|
||||
@ -36,7 +37,10 @@ from .image import get_os
|
||||
|
||||
|
||||
@retry_on_auth_failure()
|
||||
def list_vmss(name: UUID) -> Optional[List[str]]:
|
||||
def list_vmss(
|
||||
name: UUID,
|
||||
vm_filter: Optional[Callable[[VirtualMachineScaleSetVMListResult], bool]] = None,
|
||||
) -> Optional[List[str]]:
|
||||
resource_group = get_base_resource_group()
|
||||
client = get_compute_client()
|
||||
try:
|
||||
@ -45,6 +49,7 @@ def list_vmss(name: UUID) -> Optional[List[str]]:
|
||||
for x in client.virtual_machine_scale_set_vms.list(
|
||||
resource_group, str(name)
|
||||
)
|
||||
if vm_filter is None or vm_filter(x)
|
||||
]
|
||||
return instances
|
||||
except (ResourceNotFoundError, CloudError) as err:
|
||||
|
@ -35,6 +35,9 @@ from ..azure.auto_scale import (
|
||||
add_auto_scale_to_vmss,
|
||||
create_auto_scale_profile,
|
||||
default_auto_scale_profile,
|
||||
get_auto_scale_settings,
|
||||
shutdown_scaleset_rule,
|
||||
update_auto_scale,
|
||||
)
|
||||
from ..azure.image import get_os
|
||||
from ..azure.network import Network
|
||||
@ -48,6 +51,7 @@ from ..azure.vmss import (
|
||||
get_vmss,
|
||||
get_vmss_size,
|
||||
list_instance_ids,
|
||||
list_vmss,
|
||||
reimage_vmss_nodes,
|
||||
resize_vmss,
|
||||
update_extensions,
|
||||
@ -715,6 +719,61 @@ class Scaleset(BASE_SCALESET, ORMMixin):
|
||||
nodes = Node.search_states(scaleset_id=self.scaleset_id)
|
||||
for node in nodes:
|
||||
node.set_shutdown()
|
||||
|
||||
logging.info(
|
||||
SCALESET_LOG_PREFIX
|
||||
+ "checking for existing auto scale settings %s" % self.scaleset_id
|
||||
)
|
||||
auto_scale_policy = get_auto_scale_settings(self.scaleset_id)
|
||||
if auto_scale_policy is not None and not isinstance(auto_scale_policy, Error):
|
||||
for profile in auto_scale_policy.profiles:
|
||||
queue_uri = profile.rules[0].metric_trigger.metric_resource_uri
|
||||
|
||||
# Overwrite any existing scaling rules with one that will
|
||||
# try to scale in by 1 node at every opportunity
|
||||
profile.rules = [shutdown_scaleset_rule(queue_uri)]
|
||||
|
||||
# Auto scale (the azure service) will not allow you to
|
||||
# set the minimum number of instances to a number
|
||||
# smaller than the number of instances
|
||||
# with scale in protection enabled.
|
||||
#
|
||||
# Since:
|
||||
# * Nodes can no longer pick up work once the scale set is
|
||||
# in `shutdown` state
|
||||
# * All scale out rules are removed
|
||||
# Then: The number of nodes in the scale set with scale in
|
||||
# protection enabled _must_ strictly decrease over time.
|
||||
#
|
||||
# This guarantees that _eventually_
|
||||
# auto scale will scale in the remaining nodes,
|
||||
# the scale set will have 0 instances,
|
||||
# and once the scale set is empty, we will delete it.
|
||||
logging.info(
|
||||
SCALESET_LOG_PREFIX + "Getting nodes with scale in protection"
|
||||
)
|
||||
vms_with_protection = list_vmss(
|
||||
self.scaleset_id,
|
||||
lambda vm: vm.protection_policy is not None
|
||||
and bool(vm.protection_policy.protect_from_scale_in),
|
||||
)
|
||||
logging.info(SCALESET_LOG_PREFIX + str(vms_with_protection))
|
||||
if vms_with_protection is not None:
|
||||
profile.capacity.minimum = len(vms_with_protection)
|
||||
else:
|
||||
logging.error(
|
||||
"Failed to list vmss for scaleset %s" % self.scaleset_id
|
||||
)
|
||||
|
||||
updated_auto_scale = update_auto_scale(auto_scale_policy)
|
||||
if isinstance(updated_auto_scale, Error):
|
||||
logging.error("Failed to update auto scale %s" % updated_auto_scale)
|
||||
elif isinstance(auto_scale_policy, Error):
|
||||
logging.error(auto_scale_policy)
|
||||
else:
|
||||
logging.info(
|
||||
"No existing auto scale settings found for %s" % self.scaleset_id
|
||||
)
|
||||
if size == 0:
|
||||
self.halt()
|
||||
|
||||
|
Reference in New Issue
Block a user