Delete nodes when they're done (#1763)

* Delete nodes when they're done

* Missed a file

* Load node disposal strategy from env var

* Lint

* Fix subtle bug

* Deleting doesn't work, will 'decomission' nodes once they complete work

* Missed a file

* Remove logging line
This commit is contained in:
Teo Voinea
2022-04-12 13:32:15 -04:00
committed by GitHub
parent faaa5d2d78
commit 87eb606b35
3 changed files with 49 additions and 28 deletions

View File

@@ -156,6 +156,7 @@ def create_auto_scale_profile(
# When there's more than 1 message in the pool queue # When there's more than 1 message in the pool queue
operator=ComparisonOperationType.GREATER_THAN_OR_EQUAL, operator=ComparisonOperationType.GREATER_THAN_OR_EQUAL,
threshold=1, threshold=1,
divide_per_instance=False,
), ),
scale_action=ScaleAction( scale_action=ScaleAction(
direction=ScaleDirection.INCREASE, direction=ScaleDirection.INCREASE,
@@ -170,16 +171,17 @@ def create_auto_scale_profile(
metric_trigger=MetricTrigger( metric_trigger=MetricTrigger(
metric_name="ApproximateMessageCount", metric_name="ApproximateMessageCount",
metric_resource_uri=queue_uri, metric_resource_uri=queue_uri,
# Check every 20 minutes # Check every 10 minutes
time_grain=timedelta(minutes=20), time_grain=timedelta(minutes=10),
# The average amount of messages there are in the pool queue # The average amount of messages there are in the pool queue
time_aggregation=TimeAggregationType.AVERAGE, time_aggregation=TimeAggregationType.AVERAGE,
statistic=MetricStatisticType.SUM, statistic=MetricStatisticType.SUM,
# Over the past 20 minutes # Over the past 10 minutes
time_window=timedelta(minutes=20), time_window=timedelta(minutes=10),
# When there's no messages in the pool queue # When there's no messages in the pool queue
operator=ComparisonOperationType.EQUALS, operator=ComparisonOperationType.EQUALS,
threshold=0, threshold=0,
divide_per_instance=False,
), ),
scale_action=ScaleAction( scale_action=ScaleAction(
direction=ScaleDirection.DECREASE, direction=ScaleDirection.DECREASE,
@@ -194,7 +196,7 @@ def create_auto_scale_profile(
def default_auto_scale_profile(queue_uri: str, scaleset_size: int) -> AutoscaleProfile: def default_auto_scale_profile(queue_uri: str, scaleset_size: int) -> AutoscaleProfile:
return create_auto_scale_profile( return create_auto_scale_profile(
queue_uri, 1, scaleset_size, scaleset_size, 1, 10, 1, 15 queue_uri, 1, scaleset_size, scaleset_size, 1, 10, 1, 5
) )

View File

@@ -5,6 +5,7 @@
import datetime import datetime
import logging import logging
import os
from typing import Any, Dict, List, Optional, Tuple, Union from typing import Any, Dict, List, Optional, Tuple, Union
from uuid import UUID from uuid import UUID
@@ -437,8 +438,13 @@ class Scaleset(BASE_SCALESET, ORMMixin):
# Perform operations until they fail due to scaleset getting locked # Perform operations until they fail due to scaleset getting locked
try: try:
self.reimage_nodes(to_reimage, NodeDisaposalStrategy.scale_in) strategy_str = os.getenv("ONEFUZZ_NODE_DISPOSAL_STRATEGY", "scale_in")
self.delete_nodes(to_delete, NodeDisaposalStrategy.scale_in) if strategy_str == "decomission":
strategy = NodeDisaposalStrategy.decomission
else:
strategy = NodeDisaposalStrategy.scale_in
self.reimage_nodes(to_reimage, strategy)
self.delete_nodes(to_delete, strategy)
except UnableToUpdate: except UnableToUpdate:
logging.info( logging.info(
SCALESET_LOG_PREFIX SCALESET_LOG_PREFIX
@@ -598,17 +604,23 @@ class Scaleset(BASE_SCALESET, ORMMixin):
else: else:
machine_ids.add(node.machine_id) machine_ids.add(node.machine_id)
logging.info( if disposal_strategy == NodeDisaposalStrategy.decomission:
SCALESET_LOG_PREFIX + "deleting nodes scaleset_id:%s machine_id:%s", logging.info(SCALESET_LOG_PREFIX + "decomissioning nodes")
self.scaleset_id, for node in nodes:
machine_ids, if node.machine_id in machine_ids:
)
delete_vmss_nodes(self.scaleset_id, machine_ids)
for node in nodes:
if node.machine_id in machine_ids:
node.delete()
if disposal_strategy == NodeDisaposalStrategy.scale_in:
node.release_scale_in_protection() node.release_scale_in_protection()
else:
logging.info(
SCALESET_LOG_PREFIX + "deleting nodes scaleset_id:%s machine_id:%s",
self.scaleset_id,
machine_ids,
)
delete_vmss_nodes(self.scaleset_id, machine_ids)
for node in nodes:
if node.machine_id in machine_ids:
node.delete()
if disposal_strategy == NodeDisaposalStrategy.scale_in:
node.release_scale_in_protection()
def reimage_nodes( def reimage_nodes(
self, nodes: List[Node], disposal_strategy: NodeDisaposalStrategy self, nodes: List[Node], disposal_strategy: NodeDisaposalStrategy
@@ -659,18 +671,24 @@ class Scaleset(BASE_SCALESET, ORMMixin):
) )
return return
result = reimage_vmss_nodes(self.scaleset_id, machine_ids) if disposal_strategy == NodeDisaposalStrategy.decomission:
if isinstance(result, Error): logging.info(SCALESET_LOG_PREFIX + "decomissioning nodes")
raise Exception( for node in nodes:
"unable to reimage nodes: %s:%s - %s" if node.machine_id in machine_ids:
% (self.scaleset_id, machine_ids, result)
)
for node in nodes:
if node.machine_id in machine_ids:
node.delete()
if disposal_strategy == NodeDisaposalStrategy.scale_in:
node.release_scale_in_protection() node.release_scale_in_protection()
else:
result = reimage_vmss_nodes(self.scaleset_id, machine_ids)
if isinstance(result, Error):
raise Exception(
"unable to reimage nodes: %s:%s - %s"
% (self.scaleset_id, machine_ids, result)
)
for node in nodes:
if node.machine_id in machine_ids:
node.delete()
if disposal_strategy == NodeDisaposalStrategy.scale_in:
node.release_scale_in_protection()
def set_shutdown(self, now: bool) -> None: def set_shutdown(self, now: bool) -> None:
if now: if now:

View File

@@ -417,3 +417,4 @@ class UserFieldType(Enum):
class NodeDisaposalStrategy(Enum): class NodeDisaposalStrategy(Enum):
scale_in = "scale_in" scale_in = "scale_in"
decomission = "decomission"