mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-16 20:08:09 +00:00
Pool shrink queue (#1050)
This commit is contained in:
@ -253,6 +253,10 @@ class Node(BASE_NODE, ORMMixin):
|
||||
def could_shrink_scaleset(self) -> bool:
|
||||
if self.scaleset_id and ShrinkQueue(self.scaleset_id).should_shrink():
|
||||
return True
|
||||
|
||||
if self.pool_id and ShrinkQueue(self.pool_id).should_shrink():
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def can_process_new_work(self) -> bool:
|
||||
|
@ -24,6 +24,7 @@ from ..azure.queue import create_queue, delete_queue, peek_queue, queue_object
|
||||
from ..azure.storage import StorageType
|
||||
from ..events import send_event
|
||||
from ..orm import MappingIntStrAny, ORMMixin, QueryFilter
|
||||
from .shrink_queue import ShrinkQueue
|
||||
|
||||
NODE_EXPIRATION_TIME: datetime.timedelta = datetime.timedelta(hours=1)
|
||||
NODE_REIMAGE_TIME: datetime.timedelta = datetime.timedelta(days=7)
|
||||
@ -98,6 +99,11 @@ class Pool(BASE_POOL, ORMMixin):
|
||||
for x in Scaleset.search_by_pool(self.name)
|
||||
]
|
||||
|
||||
def peek_work_queue(self) -> List[WorkSet]:
|
||||
return peek_queue(
|
||||
self.get_pool_queue(), StorageType.corpus, object_type=WorkSet
|
||||
)
|
||||
|
||||
def populate_work_queue(self) -> None:
|
||||
self.work_queue = []
|
||||
|
||||
@ -106,11 +112,13 @@ class Pool(BASE_POOL, ORMMixin):
|
||||
if self.state == PoolState.init:
|
||||
return
|
||||
|
||||
worksets = peek_queue(
|
||||
self.get_pool_queue(), StorageType.corpus, object_type=WorkSet
|
||||
)
|
||||
worksets = self.peek_work_queue()
|
||||
|
||||
for workset in worksets:
|
||||
# only include work units with work
|
||||
if not workset.work_units:
|
||||
continue
|
||||
|
||||
work_units = [
|
||||
WorkUnitSummary(
|
||||
job_id=work_unit.job_id,
|
||||
@ -126,6 +134,7 @@ class Pool(BASE_POOL, ORMMixin):
|
||||
|
||||
def init(self) -> None:
|
||||
create_queue(self.get_pool_queue(), StorageType.corpus)
|
||||
ShrinkQueue(self.pool_id).create()
|
||||
self.state = PoolState.running
|
||||
self.save()
|
||||
|
||||
@ -216,6 +225,7 @@ class Pool(BASE_POOL, ORMMixin):
|
||||
nodes = Node.search(query={"pool_name": [self.name]})
|
||||
if not scalesets and not nodes:
|
||||
delete_queue(self.get_pool_queue(), StorageType.corpus)
|
||||
ShrinkQueue(self.pool_id).delete()
|
||||
logging.info("pool stopped, deleting: %s", self.name)
|
||||
self.state = PoolState.halt
|
||||
self.delete()
|
||||
|
@ -378,6 +378,9 @@ class Scaleset(BASE_SCALESET, ORMMixin):
|
||||
if ShrinkQueue(self.scaleset_id).should_shrink():
|
||||
node.set_halt()
|
||||
to_delete.append(node)
|
||||
elif ShrinkQueue(pool.pool_id).should_shrink():
|
||||
node.set_halt()
|
||||
to_delete.append(node)
|
||||
else:
|
||||
to_reimage.append(node)
|
||||
|
||||
|
Reference in New Issue
Block a user