diff --git a/src/api-service/__app__/onefuzzlib/workers/nodes.py b/src/api-service/__app__/onefuzzlib/workers/nodes.py index 5d4a3980a..950f8e739 100644 --- a/src/api-service/__app__/onefuzzlib/workers/nodes.py +++ b/src/api-service/__app__/onefuzzlib/workers/nodes.py @@ -253,6 +253,10 @@ class Node(BASE_NODE, ORMMixin): def could_shrink_scaleset(self) -> bool: if self.scaleset_id and ShrinkQueue(self.scaleset_id).should_shrink(): return True + + if self.pool_id and ShrinkQueue(self.pool_id).should_shrink(): + return True + return False def can_process_new_work(self) -> bool: diff --git a/src/api-service/__app__/onefuzzlib/workers/pools.py b/src/api-service/__app__/onefuzzlib/workers/pools.py index 91c7de5f3..409b9f833 100644 --- a/src/api-service/__app__/onefuzzlib/workers/pools.py +++ b/src/api-service/__app__/onefuzzlib/workers/pools.py @@ -24,6 +24,7 @@ from ..azure.queue import create_queue, delete_queue, peek_queue, queue_object from ..azure.storage import StorageType from ..events import send_event from ..orm import MappingIntStrAny, ORMMixin, QueryFilter +from .shrink_queue import ShrinkQueue NODE_EXPIRATION_TIME: datetime.timedelta = datetime.timedelta(hours=1) NODE_REIMAGE_TIME: datetime.timedelta = datetime.timedelta(days=7) @@ -98,6 +99,11 @@ class Pool(BASE_POOL, ORMMixin): for x in Scaleset.search_by_pool(self.name) ] + def peek_work_queue(self) -> List[WorkSet]: + return peek_queue( + self.get_pool_queue(), StorageType.corpus, object_type=WorkSet + ) + def populate_work_queue(self) -> None: self.work_queue = [] @@ -106,11 +112,13 @@ class Pool(BASE_POOL, ORMMixin): if self.state == PoolState.init: return - worksets = peek_queue( - self.get_pool_queue(), StorageType.corpus, object_type=WorkSet - ) + worksets = self.peek_work_queue() for workset in worksets: + # only include work units with work + if not workset.work_units: + continue + work_units = [ WorkUnitSummary( job_id=work_unit.job_id, @@ -126,6 +134,7 @@ class Pool(BASE_POOL, ORMMixin): def init(self) -> None: create_queue(self.get_pool_queue(), StorageType.corpus) + ShrinkQueue(self.pool_id).create() self.state = PoolState.running self.save() @@ -216,6 +225,7 @@ class Pool(BASE_POOL, ORMMixin): nodes = Node.search(query={"pool_name": [self.name]}) if not scalesets and not nodes: delete_queue(self.get_pool_queue(), StorageType.corpus) + ShrinkQueue(self.pool_id).delete() logging.info("pool stopped, deleting: %s", self.name) self.state = PoolState.halt self.delete() diff --git a/src/api-service/__app__/onefuzzlib/workers/scalesets.py b/src/api-service/__app__/onefuzzlib/workers/scalesets.py index 08255bd6f..dfb02ab58 100644 --- a/src/api-service/__app__/onefuzzlib/workers/scalesets.py +++ b/src/api-service/__app__/onefuzzlib/workers/scalesets.py @@ -378,6 +378,9 @@ class Scaleset(BASE_SCALESET, ORMMixin): if ShrinkQueue(self.scaleset_id).should_shrink(): node.set_halt() to_delete.append(node) + elif ShrinkQueue(pool.pool_id).should_shrink(): + node.set_halt() + to_delete.append(node) else: to_reimage.append(node)