Do it without a new flag

This is safer against other conditions where _deque ends up with items when we
start a scan - for example, if we chunk up deque processing.
This commit is contained in:
Jean-Paul Calderone 2019-03-18 15:38:18 -04:00
parent 6a4d461d38
commit 55690bf639

View File

@ -950,10 +950,6 @@ class QueueMixin(HookMixin):
"""
action = PROCESSING_LOOP(**self._log_fields)
# Keep track of the fact that we're just starting up and we can and
# should skip a collective scan for one iteration.
self._startup_iteration = True
# Note that we don't put the processing iterations into the logging
# action because we expect this loop to run for the whole lifetime of
# the process. The tooling for dealing with incomplete action trees
@ -977,19 +973,17 @@ class QueueMixin(HookMixin):
with action.context():
d = DeferredContext(defer.Deferred())
if self._startup_iteration:
# During startup we scanned the collective for items to
# download. We do not need to perform another scan before
# processing our work queue. More importantly, the logic for
# determining which items to download is *not correct* in the
# case where two scans are performed with no intermediate
# emptying of the work queue. Therefore, skip the scan in the
# first processing iteration. Either there will be work in
# the queue from the initial scan or not. Either way, when we
# get here again on the next iteration, we'll go the other way
# and perform a scan.
self._startup_iteration = False
else:
# During startup we scanned the collective for items to
# download. We do not need to perform another scan before
# processing our work queue. More importantly, the logic for
# determining which items to download is *not correct* in the
# case where two scans are performed with no intermediate
# emptying of the work queue. Therefore, skip the scan in the
# first processing iteration. Either there will be work in
# the queue from the initial scan or not. Either way, when we
# get here again on the next iteration, we'll go the other way
# and perform a scan.
if not self._deque:
# adds items to our deque
d.addCallback(lambda ignored: self._perform_scan())