Add StopIfFree node command to tell free nodes to stop asking for new work (#866)

This commit is contained in:
bmc-msft
2021-05-07 13:55:50 -04:00
committed by GitHub
parent eba6fa4456
commit 221a3316a1
9 changed files with 71 additions and 2 deletions

View File

@ -25,6 +25,7 @@ pub enum NodeCommand {
AddSshKey(SshKeyInfo), AddSshKey(SshKeyInfo),
StopTask(StopTask), StopTask(StopTask),
Stop {}, Stop {},
StopIfFree {},
} }
#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)] #[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]

View File

@ -71,6 +71,15 @@ impl Scheduler {
}; };
*self = state.into(); *self = state.into();
} }
NodeCommand::StopIfFree {} => {
if let Scheduler::Free(_) = self {
let cause = DoneCause::Stopped;
let state = State {
ctx: Done { cause },
};
*self = state.into();
}
}
} }
Ok(()) Ok(())

View File

@ -6,6 +6,7 @@
import os import os
from typing import Dict from typing import Dict
import semver
from memoization import cached from memoization import cached
from onefuzztypes.responses import Version from onefuzztypes.responses import Version
@ -29,3 +30,8 @@ def versions() -> Dict[str, Version]:
version=__version__, version=__version__,
) )
return {"onefuzz": entry} return {"onefuzz": entry}
def is_minimum_version(*, version: str, minimum: str) -> bool:
# check if version is at least (or higher) than minimum
return bool(semver.VersionInfo.parse(version).compare(minimum) >= 0)

View File

@ -16,7 +16,12 @@ from onefuzztypes.events import (
) )
from onefuzztypes.models import Error from onefuzztypes.models import Error
from onefuzztypes.models import Node as BASE_NODE from onefuzztypes.models import Node as BASE_NODE
from onefuzztypes.models import NodeAssignment, NodeCommand, NodeCommandAddSshKey from onefuzztypes.models import (
NodeAssignment,
NodeCommand,
NodeCommandAddSshKey,
NodeCommandStopIfFree,
)
from onefuzztypes.models import NodeTasks as BASE_NODE_TASK from onefuzztypes.models import NodeTasks as BASE_NODE_TASK
from onefuzztypes.models import Result, StopNodeCommand, StopTaskNodeCommand from onefuzztypes.models import Result, StopNodeCommand, StopTaskNodeCommand
from onefuzztypes.primitives import PoolName from onefuzztypes.primitives import PoolName
@ -26,6 +31,7 @@ from ..__version__ import __version__
from ..azure.vmss import get_instance_id from ..azure.vmss import get_instance_id
from ..events import send_event from ..events import send_event
from ..orm import MappingIntStrAny, ORMMixin, QueryFilter from ..orm import MappingIntStrAny, ORMMixin, QueryFilter
from ..versions import is_minimum_version
NODE_EXPIRATION_TIME: datetime.timedelta = datetime.timedelta(hours=1) NODE_EXPIRATION_TIME: datetime.timedelta = datetime.timedelta(hours=1)
NODE_REIMAGE_TIME: datetime.timedelta = datetime.timedelta(days=7) NODE_REIMAGE_TIME: datetime.timedelta = datetime.timedelta(days=7)
@ -338,6 +344,11 @@ class Node(BASE_NODE, ORMMixin):
if not self.reimage_requested and not self.delete_requested: if not self.reimage_requested and not self.delete_requested:
logging.info("setting reimage_requested: %s", self.machine_id) logging.info("setting reimage_requested: %s", self.machine_id)
self.reimage_requested = True self.reimage_requested = True
# if we're going to reimage, make sure the node doesn't pick up new work
# too.
self.send_stop_if_free()
self.save() self.save()
def add_ssh_public_key(self, public_key: str) -> Result[None]: def add_ssh_public_key(self, public_key: str) -> Result[None]:
@ -355,6 +366,10 @@ class Node(BASE_NODE, ORMMixin):
) )
return None return None
def send_stop_if_free(self) -> None:
if is_minimum_version(version=self.version, minimum="2.16.1"):
self.send_message(NodeCommand(stop_if_free=NodeCommandStopIfFree()))
def stop(self, done: bool = False) -> None: def stop(self, done: bool = False) -> None:
self.to_reimage(done=done) self.to_reimage(done=done)
self.send_message(NodeCommand(stop=StopNodeCommand())) self.send_message(NodeCommand(stop=StopNodeCommand()))

View File

@ -440,10 +440,19 @@ class Scaleset(BASE_SCALESET, ORMMixin):
return return
def _resize_shrink(self, to_remove: int) -> None: def _resize_shrink(self, to_remove: int) -> None:
logging.info(
SCALESET_LOG_PREFIX + "shrinking scaleset. scaleset_id:%s to_remove:%d",
self.scaleset_id,
to_remove,
)
queue = ScalesetShrinkQueue(self.scaleset_id) queue = ScalesetShrinkQueue(self.scaleset_id)
for _ in range(to_remove): for _ in range(to_remove):
queue.add_entry() queue.add_entry()
nodes = Node.search_states(scaleset_id=self.scaleset_id)
for node in nodes:
node.send_stop_if_free()
def resize(self) -> None: def resize(self) -> None:
# no longer needing to resize # no longer needing to resize
if self.state != ScalesetState.resize: if self.state != ScalesetState.resize:

View File

@ -30,5 +30,6 @@ memoization~=0.3.1
github3.py~=1.3.0 github3.py~=1.3.0
typing-extensions~=3.7.4.3 typing-extensions~=3.7.4.3
jsonpatch==1.28 jsonpatch==1.28
semver==2.13.0
# onefuzz types version is set during build # onefuzz types version is set during build
onefuzztypes==0.0.0 onefuzztypes==0.0.0

View File

@ -37,3 +37,6 @@ ignore_missing_imports = True
[mypy-jsonpatch.*] [mypy-jsonpatch.*]
ignore_missing_imports = True ignore_missing_imports = True
[mypy-semver.*]
ignore_missing_imports = True

View File

@ -0,0 +1,20 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import unittest
from __app__.onefuzzlib.versions import is_minimum_version
class TestMinVersion(unittest.TestCase):
def test_basic(self) -> None:
self.assertEqual(is_minimum_version(version="1.0.0", minimum="1.0.0"), True)
self.assertEqual(is_minimum_version(version="2.0.0", minimum="1.0.0"), True)
self.assertEqual(is_minimum_version(version="2.0.0", minimum="3.0.0"), False)
self.assertEqual(is_minimum_version(version="1.0.0", minimum="1.6.0"), False)
if __name__ == "__main__":
unittest.main()

View File

@ -557,6 +557,10 @@ class NodeHeartbeatEntry(BaseModel):
data: List[Dict[str, HeartbeatType]] data: List[Dict[str, HeartbeatType]]
class NodeCommandStopIfFree(BaseModel):
pass
class StopNodeCommand(BaseModel): class StopNodeCommand(BaseModel):
pass pass
@ -573,6 +577,7 @@ class NodeCommand(EnumModel):
stop: Optional[StopNodeCommand] stop: Optional[StopNodeCommand]
stop_task: Optional[StopTaskNodeCommand] stop_task: Optional[StopTaskNodeCommand]
add_ssh_key: Optional[NodeCommandAddSshKey] add_ssh_key: Optional[NodeCommandAddSshKey]
stop_if_free: Optional[NodeCommandStopIfFree]
class NodeCommandEnvelope(BaseModel): class NodeCommandEnvelope(BaseModel):