mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-17 04:18:07 +00:00
add event for scaleset state updates (#882)
This moves all scaleset state updates through `Scaleset.set_state` and adds a new event EventScalesetStateUpdated.
This commit is contained in:
@ -41,6 +41,7 @@ Each event will be submitted via HTTP POST to the user provided URL.
|
||||
* [scaleset_created](#scaleset_created)
|
||||
* [scaleset_deleted](#scaleset_deleted)
|
||||
* [scaleset_failed](#scaleset_failed)
|
||||
* [scaleset_state_updated](#scaleset_state_updated)
|
||||
* [task_created](#task_created)
|
||||
* [task_failed](#task_failed)
|
||||
* [task_heartbeat](#task_heartbeat)
|
||||
@ -2203,6 +2204,61 @@ Each event will be submitted via HTTP POST to the user provided URL.
|
||||
}
|
||||
```
|
||||
|
||||
### scaleset_state_updated
|
||||
|
||||
#### Example
|
||||
|
||||
```json
|
||||
{
|
||||
"pool_name": "example",
|
||||
"scaleset_id": "00000000-0000-0000-0000-000000000000",
|
||||
"state": "init"
|
||||
}
|
||||
```
|
||||
|
||||
#### Schema
|
||||
|
||||
```json
|
||||
{
|
||||
"definitions": {
|
||||
"ScalesetState": {
|
||||
"description": "An enumeration.",
|
||||
"enum": [
|
||||
"init",
|
||||
"setup",
|
||||
"resize",
|
||||
"running",
|
||||
"shutdown",
|
||||
"halt",
|
||||
"creation_failed"
|
||||
],
|
||||
"title": "ScalesetState"
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"pool_name": {
|
||||
"title": "Pool Name",
|
||||
"type": "string"
|
||||
},
|
||||
"scaleset_id": {
|
||||
"format": "uuid",
|
||||
"title": "Scaleset Id",
|
||||
"type": "string"
|
||||
},
|
||||
"state": {
|
||||
"$ref": "#/definitions/ScalesetState"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"scaleset_id",
|
||||
"pool_name",
|
||||
"state"
|
||||
],
|
||||
"title": "EventScalesetStateUpdated",
|
||||
"type": "object"
|
||||
}
|
||||
```
|
||||
|
||||
### task_created
|
||||
|
||||
#### Example
|
||||
@ -4980,6 +5036,29 @@ Each event will be submitted via HTTP POST to the user provided URL.
|
||||
"title": "EventScalesetFailed",
|
||||
"type": "object"
|
||||
},
|
||||
"EventScalesetStateUpdated": {
|
||||
"properties": {
|
||||
"pool_name": {
|
||||
"title": "Pool Name",
|
||||
"type": "string"
|
||||
},
|
||||
"scaleset_id": {
|
||||
"format": "uuid",
|
||||
"title": "Scaleset Id",
|
||||
"type": "string"
|
||||
},
|
||||
"state": {
|
||||
"$ref": "#/definitions/ScalesetState"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"scaleset_id",
|
||||
"pool_name",
|
||||
"state"
|
||||
],
|
||||
"title": "EventScalesetStateUpdated",
|
||||
"type": "object"
|
||||
},
|
||||
"EventTaskCreated": {
|
||||
"properties": {
|
||||
"config": {
|
||||
@ -5139,6 +5218,7 @@ Each event will be submitted via HTTP POST to the user provided URL.
|
||||
"scaleset_created",
|
||||
"scaleset_deleted",
|
||||
"scaleset_failed",
|
||||
"scaleset_state_updated",
|
||||
"task_created",
|
||||
"task_failed",
|
||||
"task_state_updated",
|
||||
@ -5374,6 +5454,19 @@ Each event will be submitted via HTTP POST to the user provided URL.
|
||||
"title": "Report",
|
||||
"type": "object"
|
||||
},
|
||||
"ScalesetState": {
|
||||
"description": "An enumeration.",
|
||||
"enum": [
|
||||
"init",
|
||||
"setup",
|
||||
"resize",
|
||||
"running",
|
||||
"shutdown",
|
||||
"halt",
|
||||
"creation_failed"
|
||||
],
|
||||
"title": "ScalesetState"
|
||||
},
|
||||
"StatsFormat": {
|
||||
"description": "An enumeration.",
|
||||
"enum": [
|
||||
@ -5782,6 +5875,9 @@ Each event will be submitted via HTTP POST to the user provided URL.
|
||||
{
|
||||
"$ref": "#/definitions/EventScalesetDeleted"
|
||||
},
|
||||
{
|
||||
"$ref": "#/definitions/EventScalesetStateUpdated"
|
||||
},
|
||||
{
|
||||
"$ref": "#/definitions/EventTaskFailed"
|
||||
},
|
||||
|
@ -40,8 +40,7 @@ def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None:
|
||||
else:
|
||||
scaleset.size = max_size
|
||||
nodes_needed = nodes_needed - (max_size - current_size)
|
||||
scaleset.state = ScalesetState.resize
|
||||
scaleset.save()
|
||||
scaleset.set_state(ScalesetState.resize)
|
||||
|
||||
else:
|
||||
continue
|
||||
@ -90,8 +89,7 @@ def scale_down(scalesets: List[Scaleset], nodes_to_remove: int) -> None:
|
||||
ScalesetState.shutdown,
|
||||
ScalesetState.halt,
|
||||
]:
|
||||
scaleset.state = ScalesetState.resize
|
||||
scaleset.save()
|
||||
scaleset.set_state(ScalesetState.resize)
|
||||
|
||||
free_nodes = Node.search_states(
|
||||
scaleset_id=scaleset.scaleset_id,
|
||||
@ -107,9 +105,8 @@ def scale_down(scalesets: List[Scaleset], nodes_to_remove: int) -> None:
|
||||
max_nodes_remove = min(len(nodes), nodes_to_remove)
|
||||
# All nodes in scaleset are free. Can shutdown VMSS
|
||||
if max_nodes_remove >= scaleset.size and len(nodes) >= scaleset.size:
|
||||
scaleset.state = ScalesetState.shutdown
|
||||
scaleset.set_state(ScalesetState.shutdown)
|
||||
nodes_to_remove = nodes_to_remove - scaleset.size
|
||||
scaleset.save()
|
||||
for node in nodes:
|
||||
node.set_shutdown()
|
||||
continue
|
||||
@ -117,8 +114,7 @@ def scale_down(scalesets: List[Scaleset], nodes_to_remove: int) -> None:
|
||||
# Resize of VMSS needed
|
||||
scaleset.size = scaleset.size - max_nodes_remove
|
||||
nodes_to_remove = nodes_to_remove - max_nodes_remove
|
||||
scaleset.state = ScalesetState.resize
|
||||
scaleset.save()
|
||||
scaleset.set_state(ScalesetState.resize)
|
||||
|
||||
|
||||
def get_vm_count(tasks: List[Task]) -> int:
|
||||
|
@ -222,8 +222,7 @@ class Pool(BASE_POOL, ORMMixin):
|
||||
return
|
||||
|
||||
for scaleset in scalesets:
|
||||
scaleset.state = ScalesetState.halt
|
||||
scaleset.save()
|
||||
scaleset.set_state(ScalesetState.halt)
|
||||
|
||||
for node in nodes:
|
||||
node.set_halt()
|
||||
|
@ -13,6 +13,7 @@ from onefuzztypes.events import (
|
||||
EventScalesetCreated,
|
||||
EventScalesetDeleted,
|
||||
EventScalesetFailed,
|
||||
EventScalesetStateUpdated,
|
||||
)
|
||||
from onefuzztypes.models import Error
|
||||
from onefuzztypes.models import Scaleset as BASE_SCALESET
|
||||
@ -141,8 +142,7 @@ class Scaleset(BASE_SCALESET, ORMMixin):
|
||||
return
|
||||
|
||||
self.error = error
|
||||
self.state = ScalesetState.creation_failed
|
||||
self.save()
|
||||
self.set_state(ScalesetState.creation_failed)
|
||||
|
||||
send_event(
|
||||
EventScalesetFailed(
|
||||
@ -184,11 +184,9 @@ class Scaleset(BASE_SCALESET, ORMMixin):
|
||||
self.set_failed(error)
|
||||
return
|
||||
else:
|
||||
self.state = ScalesetState.setup
|
||||
self.set_state(ScalesetState.setup)
|
||||
else:
|
||||
self.state = ScalesetState.setup
|
||||
|
||||
self.save()
|
||||
self.set_state(ScalesetState.setup)
|
||||
|
||||
def setup(self) -> None:
|
||||
from .pools import Pool
|
||||
@ -269,7 +267,7 @@ class Scaleset(BASE_SCALESET, ORMMixin):
|
||||
self.set_failed(identity_result)
|
||||
return
|
||||
else:
|
||||
self.state = ScalesetState.running
|
||||
self.set_state(ScalesetState.running)
|
||||
self.save()
|
||||
|
||||
def try_set_identity(self, vmss: Any) -> Optional[Error]:
|
||||
@ -414,9 +412,7 @@ class Scaleset(BASE_SCALESET, ORMMixin):
|
||||
node_count = len(Node.search_states(scaleset_id=self.scaleset_id))
|
||||
if node_count == self.size:
|
||||
logging.info(SCALESET_LOG_PREFIX + "resize finished: %s", self.scaleset_id)
|
||||
self.state = ScalesetState.running
|
||||
self.save()
|
||||
return
|
||||
self.set_state(ScalesetState.running)
|
||||
else:
|
||||
logging.info(
|
||||
SCALESET_LOG_PREFIX
|
||||
@ -426,7 +422,6 @@ class Scaleset(BASE_SCALESET, ORMMixin):
|
||||
node_count,
|
||||
self.size,
|
||||
)
|
||||
return
|
||||
|
||||
def _resize_grow(self) -> None:
|
||||
try:
|
||||
@ -577,10 +572,16 @@ class Scaleset(BASE_SCALESET, ORMMixin):
|
||||
if self.state in [ScalesetState.halt, ScalesetState.shutdown]:
|
||||
return
|
||||
|
||||
logging.info(
|
||||
SCALESET_LOG_PREFIX + "scaleset set_shutdown: scaleset_id:%s now:%s",
|
||||
self.scaleset_id,
|
||||
now,
|
||||
)
|
||||
|
||||
if now:
|
||||
self.state = ScalesetState.halt
|
||||
self.set_state(ScalesetState.halt)
|
||||
else:
|
||||
self.state = ScalesetState.shutdown
|
||||
self.set_state(ScalesetState.shutdown)
|
||||
|
||||
self.save()
|
||||
|
||||
@ -736,6 +737,18 @@ class Scaleset(BASE_SCALESET, ORMMixin):
|
||||
EventScalesetDeleted(scaleset_id=self.scaleset_id, pool_name=self.pool_name)
|
||||
)
|
||||
|
||||
def set_state(self, state: ScalesetState) -> None:
|
||||
if self.state == state:
|
||||
return
|
||||
|
||||
self.state = state
|
||||
self.save()
|
||||
send_event(
|
||||
EventScalesetStateUpdated(
|
||||
scaleset_id=self.scaleset_id, pool_name=self.pool_name, state=self.state
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class ShrinkEntry(BaseModel):
|
||||
shrink_id: UUID = Field(default_factory=uuid4)
|
||||
|
@ -109,13 +109,7 @@ def delete(req: func.HttpRequest) -> func.HttpResponse:
|
||||
if isinstance(scaleset, Error):
|
||||
return not_ok(scaleset, context="scaleset stop")
|
||||
|
||||
if request.now:
|
||||
scaleset.state = ScalesetState.halt
|
||||
else:
|
||||
scaleset.state = ScalesetState.shutdown
|
||||
|
||||
scaleset.save()
|
||||
scaleset.auth = None
|
||||
scaleset.set_shutdown(request.now)
|
||||
return ok(BoolResult(result=True))
|
||||
|
||||
|
||||
@ -139,7 +133,7 @@ def patch(req: func.HttpRequest) -> func.HttpResponse:
|
||||
|
||||
if request.size is not None:
|
||||
scaleset.size = request.size
|
||||
scaleset.state = ScalesetState.resize
|
||||
scaleset.set_state(ScalesetState.resize)
|
||||
|
||||
scaleset.save()
|
||||
scaleset.auth = None
|
||||
|
@ -12,6 +12,7 @@ from onefuzztypes.enums import (
|
||||
ContainerType,
|
||||
ErrorCode,
|
||||
NodeState,
|
||||
ScalesetState,
|
||||
TaskState,
|
||||
TaskType,
|
||||
)
|
||||
@ -35,6 +36,7 @@ from onefuzztypes.events import (
|
||||
EventScalesetCreated,
|
||||
EventScalesetDeleted,
|
||||
EventScalesetFailed,
|
||||
EventScalesetStateUpdated,
|
||||
EventTaskCreated,
|
||||
EventTaskFailed,
|
||||
EventTaskHeartbeat,
|
||||
@ -176,6 +178,11 @@ def main() -> None:
|
||||
),
|
||||
),
|
||||
EventScalesetDeleted(scaleset_id=UUID(int=0), pool_name=PoolName("example")),
|
||||
EventScalesetStateUpdated(
|
||||
scaleset_id=UUID(int=0),
|
||||
pool_name=PoolName("example"),
|
||||
state=ScalesetState.init,
|
||||
),
|
||||
EventJobCreated(
|
||||
job_id=UUID(int=0),
|
||||
config=JobConfig(
|
||||
|
@ -10,7 +10,7 @@ from uuid import UUID, uuid4
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from .enums import OS, Architecture, NodeState, TaskState, TaskType
|
||||
from .enums import OS, Architecture, NodeState, ScalesetState, TaskState, TaskType
|
||||
from .models import (
|
||||
AutoScaleConfig,
|
||||
Error,
|
||||
@ -150,6 +150,12 @@ class EventNodeDeleted(BaseEvent):
|
||||
pool_name: PoolName
|
||||
|
||||
|
||||
class EventScalesetStateUpdated(BaseEvent):
|
||||
scaleset_id: UUID
|
||||
pool_name: PoolName
|
||||
state: ScalesetState
|
||||
|
||||
|
||||
class EventNodeStateUpdated(BaseEvent):
|
||||
machine_id: UUID
|
||||
scaleset_id: Optional[UUID]
|
||||
@ -192,6 +198,7 @@ Event = Union[
|
||||
EventScalesetFailed,
|
||||
EventScalesetCreated,
|
||||
EventScalesetDeleted,
|
||||
EventScalesetStateUpdated,
|
||||
EventTaskFailed,
|
||||
EventTaskStateUpdated,
|
||||
EventTaskCreated,
|
||||
@ -218,6 +225,7 @@ class EventType(Enum):
|
||||
scaleset_created = "scaleset_created"
|
||||
scaleset_deleted = "scaleset_deleted"
|
||||
scaleset_failed = "scaleset_failed"
|
||||
scaleset_state_updated = "scaleset_state_updated"
|
||||
task_created = "task_created"
|
||||
task_failed = "task_failed"
|
||||
task_state_updated = "task_state_updated"
|
||||
@ -245,6 +253,7 @@ EventTypeMap = {
|
||||
EventType.scaleset_created: EventScalesetCreated,
|
||||
EventType.scaleset_deleted: EventScalesetDeleted,
|
||||
EventType.scaleset_failed: EventScalesetFailed,
|
||||
EventType.scaleset_state_updated: EventScalesetStateUpdated,
|
||||
EventType.task_created: EventTaskCreated,
|
||||
EventType.task_failed: EventTaskFailed,
|
||||
EventType.task_state_updated: EventTaskStateUpdated,
|
||||
|
Reference in New Issue
Block a user