Abstract out node disposal (#1686)

* Abstract node disposal strategy

* Cleanup + lint

* Handle scalesets possibly being in a resize state

* Setting the size is still exposed via the CLI; we don't want to break that functionality

* Address PR comments
Teo Voinea
2022-03-08 13:30:34 -05:00
committed by GitHub
parent 7c507ab7c7
commit 4d1c1f5713
3 changed files with 63 additions and 18 deletions
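
In short, the commit makes node teardown take an explicit disposal strategy, so autoscaler-driven scale-ins also release the VMSS scale-in protection on the affected VMs. A minimal sketch of the shape introduced by the diffs below; the Protocol stub and the dispose_nodes helper are illustrative stand-ins for the real Node model and for the tails of delete_nodes/reimage_nodes:

from enum import Enum
from typing import List, Protocol


class NodeDisaposalStrategy(Enum):
    # Single member for now; the enum is the extension point for other strategies.
    scale_in = "scale_in"


class Node(Protocol):
    # Illustrative stand-in for the ORM Node model used in the diffs below.
    def delete(self) -> None: ...
    def release_scale_in_protection(self) -> None: ...


def dispose_nodes(nodes: List[Node], strategy: NodeDisaposalStrategy) -> None:
    # Delete each node record; for scale-in driven disposal, also drop the
    # VMSS scale-in protection so Azure's autoscaler may reclaim the VM.
    for node in nodes:
        node.delete()
        if strategy == NodeDisaposalStrategy.scale_in:
            node.release_scale_in_protection()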


@@ -118,28 +118,54 @@ def create_auto_scale_profile(min: int, max: int, queue_uri: str) -> AutoscaleProfile:
     return AutoscaleProfile(
         name=str(uuid.uuid4()),
         capacity=ScaleCapacity(minimum=min, maximum=max, default=max),
+        # Auto scale tuning guidance:
+        # https://docs.microsoft.com/en-us/azure/architecture/best-practices/auto-scaling
         rules=[
             ScaleRule(
                 metric_trigger=MetricTrigger(
                     metric_name="ApproximateMessageCount",
                     metric_resource_uri=queue_uri,
-                    # Check every minute
-                    time_grain=timedelta(minutes=1),
+                    # Check every 15 minutes
+                    time_grain=timedelta(minutes=15),
                     # The average amount of messages there are in the pool queue
                     time_aggregation=TimeAggregationType.AVERAGE,
                     statistic=MetricStatisticType.COUNT,
-                    # Over the past 10 minutes
-                    time_window=timedelta(minutes=10),
+                    # Over the past 15 minutes
+                    time_window=timedelta(minutes=15),
                     # When there's more than 1 message in the pool queue
-                    operator=ComparisonOperationType.GREATER_THAN,
+                    operator=ComparisonOperationType.GREATER_THAN_OR_EQUAL,
                     threshold=1,
                 ),
                 scale_action=ScaleAction(
                     direction=ScaleDirection.INCREASE,
                     type=ScaleType.CHANGE_COUNT,
-                    value=1,
-                    cooldown=timedelta(minutes=5),
+                    value=2,
+                    cooldown=timedelta(minutes=10),
+                ),
+            ),
+            # Scale in
+            ScaleRule(
+                # Scale in if no work in the past 20 mins
+                metric_trigger=MetricTrigger(
+                    metric_name="ApproximateMessageCount",
+                    metric_resource_uri=queue_uri,
+                    # Check every 20 minutes
+                    time_grain=timedelta(minutes=20),
+                    # The average amount of messages there are in the pool queue
+                    time_aggregation=TimeAggregationType.AVERAGE,
+                    statistic=MetricStatisticType.SUM,
+                    # Over the past 20 minutes
+                    time_window=timedelta(minutes=20),
+                    # When there's no messages in the pool queue
+                    operator=ComparisonOperationType.EQUALS,
+                    threshold=0,
+                ),
+                scale_action=ScaleAction(
+                    direction=ScaleDirection.DECREASE,
+                    type=ScaleType.CHANGE_COUNT,
+                    value=1,
+                    cooldown=timedelta(minutes=15),
                 ),
-            )
+            ),
         ],
     )
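
The rewritten profile scales out aggressively (two instances, at most every 10 minutes, while the queue averages at least one message over 15 minutes) and scales in one instance at a time once the queue has been empty for 20 minutes. Attaching such a profile to a scaleset, assuming the stock azure-mgmt-monitor client rather than the repo's add_auto_scale_to_vmss helper, looks roughly like this; the subscription, resource group, region, resource URIs, setting name, and the capacity of 100 are placeholders:

from azure.identity import DefaultAzureCredential
from azure.mgmt.monitor import MonitorManagementClient
from azure.mgmt.monitor.models import AutoscaleSettingResource

# create_auto_scale_profile is the function patched in the hunk above.
profile = create_auto_scale_profile(1, 100, "<pool-queue-resource-uri>")

client = MonitorManagementClient(DefaultAzureCredential(), "<subscription-id>")
client.autoscale_settings.create_or_update(
    "<resource-group>",
    "onefuzz-autoscale",  # illustrative autoscale setting name
    AutoscaleSettingResource(
        location="<region>",
        profiles=[profile],
        enabled=True,
        target_resource_uri="<scaleset-resource-id>",
    ),
)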


@@ -8,7 +8,13 @@ import logging
 from typing import Any, Dict, List, Optional, Tuple, Union
 from uuid import UUID
-from onefuzztypes.enums import ErrorCode, NodeState, PoolState, ScalesetState
+from onefuzztypes.enums import (
+    ErrorCode,
+    NodeDisaposalStrategy,
+    NodeState,
+    PoolState,
+    ScalesetState,
+)
 from onefuzztypes.events import (
     EventScalesetCreated,
     EventScalesetDeleted,
@@ -420,8 +426,8 @@ class Scaleset(BASE_SCALESET, ORMMixin):
 
         # Perform operations until they fail due to scaleset getting locked
         try:
-            self.delete_nodes(to_delete)
-            self.reimage_nodes(to_reimage)
+            self.reimage_nodes(to_reimage, NodeDisaposalStrategy.scale_in)
+            self.delete_nodes(to_delete, NodeDisaposalStrategy.scale_in)
         except UnableToUpdate:
             logging.info(
                 SCALESET_LOG_PREFIX
@@ -491,6 +497,8 @@ class Scaleset(BASE_SCALESET, ORMMixin):
             return
 
         if size != self.size:
+            # Azure auto-scaled us or nodes were manually added/removed
+            # New node state will be synced in cleanup_nodes
            logging.info(
                 SCALESET_LOG_PREFIX + "unexpected scaleset size, resizing. "
                 "scaleset_id:%s expected:%d actual:%d",
@@ -498,7 +506,8 @@ class Scaleset(BASE_SCALESET, ORMMixin):
                 self.size,
                 size,
             )
-            self.set_state(ScalesetState.resize)
+            self.size = size
+            self.save()
 
     def set_size(self, size: int) -> None:
         # ensure we always stay within max_size boundaries
@@ -545,7 +554,9 @@ class Scaleset(BASE_SCALESET, ORMMixin):
         else:
             self._resize_shrink(size - self.size)
 
-    def delete_nodes(self, nodes: List[Node]) -> None:
+    def delete_nodes(
+        self, nodes: List[Node], disposal_strategy: NodeDisaposalStrategy
+    ) -> None:
         if not nodes:
             logging.info(
                 SCALESET_LOG_PREFIX + "no nodes to delete. scaleset_id:%s",
@@ -585,8 +596,12 @@ class Scaleset(BASE_SCALESET, ORMMixin):
         for node in nodes:
             if node.machine_id in machine_ids:
                 node.delete()
+                if disposal_strategy == NodeDisaposalStrategy.scale_in:
+                    node.release_scale_in_protection()
 
-    def reimage_nodes(self, nodes: List[Node]) -> None:
+    def reimage_nodes(
+        self, nodes: List[Node], disposal_strategy: NodeDisaposalStrategy
+    ) -> None:
         if not nodes:
             logging.info(
                 SCALESET_LOG_PREFIX + "no nodes to reimage: scaleset_id:%s",
@@ -601,7 +616,7 @@ class Scaleset(BASE_SCALESET, ORMMixin):
                 + "scaleset_id:%s",
                 self.scaleset_id,
             )
-            self.delete_nodes(nodes)
+            self.delete_nodes(nodes, disposal_strategy)
             return
 
         if self.state == ScalesetState.halt:
@@ -643,6 +658,8 @@ class Scaleset(BASE_SCALESET, ORMMixin):
         for node in nodes:
             if node.machine_id in machine_ids:
                 node.delete()
+                if disposal_strategy == NodeDisaposalStrategy.scale_in:
+                    node.release_scale_in_protection()
 
     def set_shutdown(self, now: bool) -> None:
         if now:
@@ -852,8 +869,6 @@ class Scaleset(BASE_SCALESET, ORMMixin):
             logging.error(capacity_failed)
             return capacity_failed
 
-        auto_scale_profile = create_auto_scale_profile(
-            capacity, capacity, pool_queue_uri
-        )
+        auto_scale_profile = create_auto_scale_profile(1, capacity, pool_queue_uri)
         logging.info("Added auto scale resource to scaleset: %s" % self.scaleset_id)
         return add_auto_scale_to_vmss(self.scaleset_id, auto_scale_profile)
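
Node.release_scale_in_protection itself is not part of this diff. Assuming it is built on the compute SDK's per-instance protection policy, the helper would look something like the sketch below; the free-function form, client wiring, and parameters are hypothetical, not the repo's actual implementation:

from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.compute.models import VirtualMachineScaleSetVMProtectionPolicy


def release_scale_in_protection(
    subscription_id: str, resource_group: str, scaleset_name: str, instance_id: str
) -> None:
    # Hypothetical sketch: clear protect_from_scale_in on a single VMSS instance
    # so the Azure autoscaler is allowed to remove it during a scale-in.
    client = ComputeManagementClient(DefaultAzureCredential(), subscription_id)
    instance = client.virtual_machine_scale_set_vms.get(
        resource_group, scaleset_name, instance_id
    )
    instance.protection_policy = VirtualMachineScaleSetVMProtectionPolicy(
        protect_from_scale_in=False
    )
    client.virtual_machine_scale_set_vms.begin_update(
        resource_group, scaleset_name, instance_id, instance
    ).wait()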


@@ -411,3 +411,7 @@ class UserFieldType(Enum):
     Str = "Str"
     DictStr = "DictStr"
     ListStr = "ListStr"
+
+
+class NodeDisaposalStrategy(Enum):
+    scale_in = "scale_in"
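
For reference, the new enum is consumed on the service side as shown in the scaleset hunks above; a condensed driver is sketched here, where shrink_pool and the Scaleset/Node annotations are illustrative rather than repo code:

from typing import List

from onefuzztypes.enums import NodeDisaposalStrategy


def shrink_pool(
    scaleset: "Scaleset", to_reimage: List["Node"], to_delete: List["Node"]
) -> None:
    # Both teardown routes are told this is an autoscaler-driven scale-in,
    # so the affected VMs also get their scale-in protection released.
    scaleset.reimage_nodes(to_reimage, NodeDisaposalStrategy.scale_in)
    scaleset.delete_nodes(to_delete, NodeDisaposalStrategy.scale_in)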