Managing Pool Resizing at service side (#107)

This commit is contained in:
Anshuman Goel
2020-10-13 11:04:26 -07:00
committed by GitHub
parent ab43890615
commit 7f0c25e2da
17 changed files with 319 additions and 40 deletions

View File

@ -1 +1,4 @@
.direnv
.python_packages
__pycache__
.venv

View File

@ -15,6 +15,7 @@ from ..onefuzzlib.request import not_ok, ok, parse_request
def get(req: func.HttpRequest) -> func.HttpResponse:
request = parse_request(NodeCommandGet, req)
if isinstance(request, Error):
return not_ok(request, context="NodeCommandGet")

View File

@ -107,6 +107,7 @@ def on_state_update(
state=NodeTaskState.setting_up,
)
node_task.save()
elif state == NodeState.done:
# if tasks are running on the node when it reports as Done
# those are stopped early
@ -125,6 +126,8 @@ def on_state_update(
machine_id,
done_data,
)
else:
logging.debug("No change in Node state")
else:
logging.info("ignoring state updates from the node: %s: %s", machine_id, state)

View File

@ -3,6 +3,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import logging
from uuid import UUID
import azure.functions as func
@ -76,6 +77,7 @@ def get(req: func.HttpRequest) -> func.HttpResponse:
def post(req: func.HttpRequest) -> func.HttpResponse:
registration_request = parse_uri(AgentRegistrationPost, req)
logging.info("Registration request: %s", (registration_request))
if isinstance(registration_request, Error):
return not_ok(registration_request, context="agent registration")

View File

@ -16,7 +16,7 @@ from onefuzztypes.enums import (
PoolState,
ScalesetState,
)
from onefuzztypes.models import Error
from onefuzztypes.models import AutoScaleConfig, Error
from onefuzztypes.models import Node as BASE_NODE
from onefuzztypes.models import NodeAssignment, NodeCommand
from onefuzztypes.models import NodeTasks as BASE_NODE_TASK
@ -327,6 +327,7 @@ class Pool(BASE_POOL, ORMMixin):
arch: Architecture,
managed: bool,
client_id: Optional[UUID],
autoscale: Optional[AutoScaleConfig],
) -> "Pool":
return cls(
name=name,
@ -335,6 +336,7 @@ class Pool(BASE_POOL, ORMMixin):
managed=managed,
client_id=client_id,
config=None,
autoscale=autoscale,
)
def save_exclude(self) -> Optional[MappingIntStrAny]:
@ -854,14 +856,18 @@ class Scaleset(BASE_SCALESET, ORMMixin):
self.state = ScalesetState.halt
self.delete()
def max_size(self) -> int:
@classmethod
def scaleset_max_size(cls, image: str) -> int:
# https://docs.microsoft.com/en-us/azure/virtual-machine-scale-sets/
# virtual-machine-scale-sets-placement-groups#checklist-for-using-large-scale-sets
if self.image.startswith("/"):
if image.startswith("/"):
return 600
else:
return 1000
def max_size(self) -> int:
return Scaleset.scaleset_max_size(self.image)
@classmethod
def search_states(
cls, *, states: Optional[List[ScalesetState]] = None

View File

@ -153,6 +153,23 @@ class Task(BASE_TASK, ORMMixin):
task = tasks[0]
return task
@classmethod
def get_tasks_by_pool_name(cls, pool_name: str) -> List["Task"]:
    """Return the tasks in an available state that run on the named pool.

    :param pool_name: name of the pool to filter tasks by
    :return: matching tasks (possibly empty)
    """
    pool_tasks = []
    # search_states already restricts results to TaskState.available(),
    # so no per-task state re-check is needed (the original re-check was
    # redundant).  Iterating an empty result is a no-op, so the explicit
    # empty-list early return was also unnecessary.
    for task in cls.search_states(states=TaskState.available()):
        task_pool = task.get_pool()
        if not task_pool:
            # task has no resolvable pool; skip it
            continue
        if pool_name == task_pool.name:
            pool_tasks.append(task)
    return pool_tasks
def mark_stopping(self) -> None:
if self.state not in [TaskState.stopped, TaskState.stopping]:
self.state = TaskState.stopping

View File

@ -3,6 +3,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import logging
import os
import azure.functions as func
@ -10,7 +11,8 @@ from onefuzztypes.enums import ErrorCode, PoolState
from onefuzztypes.models import AgentConfig, Error
from onefuzztypes.requests import PoolCreate, PoolSearch, PoolStop
from ..onefuzzlib.azure.creds import get_instance_url
from ..onefuzzlib.azure.creds import get_base_region, get_instance_url, get_regions
from ..onefuzzlib.azure.vmss import list_available_skus
from ..onefuzzlib.pools import Pool
from ..onefuzzlib.request import not_ok, ok, parse_request
@ -65,12 +67,39 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
context=repr(request),
)
logging.info(request)
if request.autoscale:
if request.autoscale.region is None:
request.autoscale.region = get_base_region()
else:
if request.autoscale.region not in get_regions():
return not_ok(
Error(code=ErrorCode.UNABLE_TO_CREATE, errors=["invalid region"]),
context="poolcreate",
)
region = request.autoscale.region
if request.autoscale.vm_sku not in list_available_skus(region):
return not_ok(
Error(
code=ErrorCode.UNABLE_TO_CREATE,
errors=[
"vm_sku '%s' is not available in the location '%s'"
% (request.autoscale.vm_sku, region)
],
),
context="poolcreate",
)
pool = Pool.create(
name=request.name,
os=request.os,
arch=request.arch,
managed=request.managed,
client_id=request.client_id,
autoscale=request.autoscale,
)
pool.save()
return ok(set_config(pool))

View File

@ -0,0 +1,151 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import logging
import math
from typing import List
import azure.functions as func
from onefuzztypes.enums import NodeState, PoolState, ScalesetState
from onefuzztypes.models import AutoScaleConfig, TaskPool
from ..onefuzzlib.pools import Node, Pool, Scaleset
from ..onefuzzlib.tasks.main import Task
def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None:
    """Grow pool capacity by `nodes_needed` nodes.

    Phase 1 enlarges existing running scalesets up to their per-scaleset
    cap; phase 2 creates new scalesets for whatever demand remains.

    :param pool: pool being scaled (must carry an AutoScaleConfig)
    :param scalesets: the pool's current scalesets
    :param nodes_needed: number of additional nodes required
    """
    logging.info("Scaling up")
    autoscale_config = pool.autoscale
    if not isinstance(autoscale_config, AutoScaleConfig):
        return

    # Phase 1: resize existing running scalesets toward their cap.
    for scaleset in scalesets:
        if nodes_needed == 0:
            # fully satisfied; the original kept looping here and saved
            # no-op "resize" states on the remaining scalesets
            break
        if scaleset.state != ScalesetState.running:
            continue
        max_size = min(scaleset.max_size(), autoscale_config.scaleset_size)
        # lazy %-args: only formatted if the record is emitted
        # (also fixes the "Sacleset" typo in the original message)
        logging.info(
            "Scaleset id: %s, Scaleset size: %d, max_size: %d",
            scaleset.scaleset_id,
            scaleset.size,
            max_size,
        )
        if scaleset.size < max_size:
            current_size = scaleset.size
            if nodes_needed <= max_size - current_size:
                scaleset.size = current_size + nodes_needed
                nodes_needed = 0
            else:
                scaleset.size = max_size
                nodes_needed = nodes_needed - (max_size - current_size)
            scaleset.state = ScalesetState.resize
            scaleset.save()

    if nodes_needed == 0:
        return

    # Phase 2: create new scalesets for the remaining demand.
    if not autoscale_config.region:
        # hoisted out of the creation loop: fail before creating anything
        raise Exception("Region is missing")

    # largest scaleset we are allowed to create (loop-invariant)
    base_size = min(
        Scaleset.scaleset_max_size(autoscale_config.image),
        autoscale_config.scaleset_size,
    )
    for _ in range(math.ceil(nodes_needed / base_size)):
        logging.info("Creating Scaleset for Pool %s", pool.name)
        max_nodes_scaleset = min(base_size, nodes_needed)
        scaleset = Scaleset.create(
            pool_name=pool.name,
            vm_sku=autoscale_config.vm_sku,
            image=autoscale_config.image,
            region=autoscale_config.region,
            size=max_nodes_scaleset,
            spot_instances=autoscale_config.spot_instances,
            tags={"pool": pool.name},
        )
        scaleset.save()
        nodes_needed -= max_nodes_scaleset
def scale_down(scalesets: List[Scaleset], nodes_to_remove: int) -> None:
    """Shrink capacity by removing up to `nodes_to_remove` free nodes.

    Only nodes in the `free` state are considered.  A scaleset whose free
    nodes cover its entire size is shut down outright; otherwise it is
    resized downward by the number of removable free nodes.

    :param scalesets: the pool's scalesets, examined in order
    :param nodes_to_remove: total number of nodes to retire
    """
    logging.info("Scaling down")
    for scaleset in scalesets:
        if nodes_to_remove <= 0:
            # nothing left to remove: stop early instead of issuing a
            # node query per remaining scaleset (original kept querying)
            break
        nodes = Node.search_states(
            scaleset_id=scaleset.scaleset_id, states=[NodeState.free]
        )
        if not nodes:
            continue
        max_nodes_remove = min(len(nodes), nodes_to_remove)
        if max_nodes_remove >= scaleset.size and len(nodes) == scaleset.size:
            # every node in the scaleset is free: shut the whole thing down
            scaleset.state = ScalesetState.shutdown
            nodes_to_remove = nodes_to_remove - scaleset.size
            scaleset.save()
            for node in nodes:
                node.set_shutdown()
            continue
        scaleset.size = scaleset.size - max_nodes_remove
        nodes_to_remove = nodes_to_remove - max_nodes_remove
        scaleset.state = ScalesetState.resize
        scaleset.save()
def get_vm_count(tasks: List[Task]) -> int:
    """Sum the node counts requested by the given tasks' pool configs.

    Tasks whose pool cannot be resolved, or whose config lacks a proper
    TaskPool, contribute nothing to the total.
    """
    total = 0
    for task in tasks:
        resolved_pool = task.get_pool()
        pool_config = task.config.pool
        usable = (
            resolved_pool
            and isinstance(resolved_pool, Pool)
            and isinstance(pool_config, TaskPool)
        )
        if usable:
            total += pool_config.count
    return total
def main(mytimer: func.TimerRequest) -> None:  # noqa: F841
    """Timer entrypoint: reconcile each autoscale-enabled pool's capacity.

    For every available pool with an autoscale config: compute the node
    demand from the pool's runnable tasks (clamped to min_size/max_size),
    subtract existing scaleset capacity, then scale up or down.  Pools
    with a scaleset mid-modification are skipped until stable.
    """
    pools = Pool.search_states(states=PoolState.available())
    for pool in pools:
        # lazy %-args (fixes eager % formatting in the original calls)
        logging.info("autoscale: %s", pool.autoscale)
        if not pool.autoscale:
            continue

        # get all the tasks (count not stopped) for the pool
        tasks = Task.get_tasks_by_pool_name(pool.name)
        logging.info("Pool: %s, #Tasks %d", pool.name, len(tasks))

        num_of_tasks = get_vm_count(tasks)
        nodes_needed = max(num_of_tasks, pool.autoscale.min_size)
        if pool.autoscale.max_size:
            nodes_needed = min(nodes_needed, pool.autoscale.max_size)

        # compare demand against the pool's current scaleset capacity
        scalesets = Scaleset.search_by_pool(pool.name)
        pool_resize = False
        for scaleset in scalesets:
            if scaleset.state in ScalesetState.modifying():
                # a scaleset is mid-transition; defer until it settles
                pool_resize = True
                break
            nodes_needed = nodes_needed - scaleset.size
        if pool_resize:
            continue

        logging.info("Pool: %s, #Nodes Needed: %d", pool.name, nodes_needed)
        if nodes_needed > 0:
            # resizing scaleset or creating new scaleset.
            scale_up(pool, scalesets, nodes_needed)
        elif nodes_needed < 0:
            scale_down(scalesets, abs(nodes_needed))

View File

@ -0,0 +1,11 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "mytimer",
"type": "timerTrigger",
"direction": "in",
"schedule": "00:01:00"
}
]
}

View File

@ -812,8 +812,14 @@ class Pool(Endpoint):
unmanaged: bool = False,
arch: enums.Architecture = enums.Architecture.x86_64,
) -> models.Pool:
"""
Create a worker pool
:param str name: Name of the worker-pool
"""
self.logger.debug("create worker pool")
managed = not unmanaged
return self._req_model(
"POST",
models.Pool,
@ -823,6 +829,7 @@ class Pool(Endpoint):
"arch": arch,
"managed": managed,
"client_id": client_id,
"autoscale": None,
},
)

View File

@ -281,6 +281,15 @@ class ScalesetState(Enum):
unavailable = [cls.shutdown, cls.halt, cls.creation_failed]
return [x for x in cls if x not in unavailable]
@classmethod
def modifying(cls) -> List["ScalesetState"]:
    """States in which a scaleset is mid-transition (halting, initializing,
    or being set up) and should be left alone by capacity reconciliation.

    NOTE(review): the original docstring said "resizing", but the list is
    halt/init/setup — presumably "being modified" is the intended meaning;
    confirm whether `resize` itself should also be included here.
    """
    return [
        cls.halt,
        cls.init,
        cls.setup,
    ]
class Architecture(Enum):
x86_64 = "x86_64"

View File

@ -456,11 +456,50 @@ class NodeTasks(BaseModel):
state: NodeTaskState = Field(default=NodeTaskState.init)
class AutoScaleConfig(BaseModel):
    """Autoscale settings attached to a pool."""

    image: str
    max_size: Optional[int]  # max size of pool (None = uncapped)
    min_size: int = Field(default=0)  # min size of pool
    region: Optional[Region]
    scaleset_size: int  # individual scaleset size
    spot_instances: bool = Field(default=False)
    vm_sku: str

    @validator("scaleset_size", allow_reuse=True)
    def check_scaleset_size(cls, value: int) -> int:
        # individual scalesets are limited to 1000 instances
        if value < 1 or value > 1000:
            raise ValueError("invalid scaleset size")
        return value

    @root_validator()
    def check_data(cls, values: Any) -> Any:
        # If a field validator already rejected min_size or max_size, the
        # key is absent from `values`; the original compared
        # values.get(...) directly, which raised TypeError (None > int)
        # and masked the real validation error.  Guard for missing keys.
        min_size = values.get("min_size")
        max_size = values.get("max_size")
        if min_size is not None and max_size is not None and min_size > max_size:
            raise ValueError("The pool min_size is greater than max_size")
        return values

    @validator("max_size", allow_reuse=True)
    def check_max_size(cls, value: Optional[int]) -> Optional[int]:
        # None (uncapped) is allowed; a set cap must be at least 1
        if value and value < 1:
            raise ValueError("Autoscale sizes are not defined properly")
        return value

    @validator("min_size", allow_reuse=True)
    def check_min_size(cls, value: int) -> int:
        if value < 0 or value > 1000:
            raise ValueError("Invalid pool min_size")
        return value
class Pool(BaseModel):
name: PoolName
pool_id: UUID = Field(default_factory=uuid4)
os: OS
managed: bool
autoscale: Optional[AutoScaleConfig]
arch: Architecture
state: PoolState = Field(default=PoolState.init)
client_id: Optional[UUID]
@ -596,7 +635,7 @@ class NodeEvent(EnumModel):
# Temporary shim type to support hot upgrade of 1.0.0 nodes.
#
# We want future variants to use an externally-tagged repr.
NodeEventShim = Union[NodeEvent, WorkerEvent, NodeStateUpdate]
NodeEventShim = Union[NodeStateUpdate, NodeEvent, WorkerEvent]
class NodeEventEnvelope(BaseModel):

View File

@ -18,7 +18,7 @@ from .enums import (
ScalesetState,
TaskState,
)
from .models import NotificationConfig
from .models import AutoScaleConfig, NotificationConfig
from .primitives import Container, PoolName, Region
@ -91,6 +91,7 @@ class PoolCreate(BaseRequest):
arch: Architecture
managed: bool
client_id: Optional[UUID]
autoscale: Optional[AutoScaleConfig]
class PoolSearch(BaseRequest):