Merge pull request #1348 from meejah/4078.race-condition

fix 4078 / race condition
This commit is contained in:
meejah 2024-01-04 18:59:26 +00:00 committed by GitHub
commit f44574f2f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 17 additions and 14 deletions

View File

@ -0,0 +1 @@
Fix a race condition with SegmentFetcher

View File

@ -63,11 +63,12 @@ class SegmentFetcher(object):
self._running = True
def stop(self):
if self._running:
log.msg("SegmentFetcher(%r).stop" % self._node._si_prefix,
level=log.NOISY, parent=self._lp, umid="LWyqpg")
self._cancel_all_requests()
self._running = False
# help GC ??? XXX
# help GC ???
del self._shares, self._shares_from_server, self._active_share_map
del self._share_observers

View File

@ -132,8 +132,8 @@ class DownloadNode(object):
def stop(self):
# called by the Terminator at shutdown, mostly for tests
if self._active_segment:
self._active_segment.stop()
self._active_segment = None
seg, self._active_segment = self._active_segment, None
seg.stop()
self._sharefinder.stop()
# things called by outside callers, via CiphertextFileNode. get_segment()
@ -410,11 +410,11 @@ class DownloadNode(object):
def fetch_failed(self, sf, f):
assert sf is self._active_segment
self._active_segment = None
# deliver error upwards
for (d,c,seg_ev) in self._extract_requests(sf.segnum):
seg_ev.error(now())
eventually(self._deliver, d, c, f)
self._active_segment = None
self._start_new_segment()
def process_blocks(self, segnum, blocks):
@ -434,6 +434,7 @@ class DownloadNode(object):
eventually(self._deliver, d, c, result)
else:
(offset, segment, decodetime) = result
self._active_segment = None
for (d,c,seg_ev) in self._extract_requests(segnum):
# when we have two requests for the same segment, the
# second one will not be "activated" before the data is
@ -446,7 +447,6 @@ class DownloadNode(object):
seg_ev.deliver(when, offset, len(segment), decodetime)
eventually(self._deliver, d, c, result)
self._download_status.add_misc_event("process_block", start, now())
self._active_segment = None
self._start_new_segment()
d.addBoth(_deliver)
d.addErrback(log.err, "unhandled error during process_blocks",
@ -533,11 +533,12 @@ class DownloadNode(object):
self._segment_requests = [t for t in self._segment_requests
if t[2] != cancel]
segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests]
# self._active_segment might be None in rare circumstances, so make
# sure we tolerate it
if self._active_segment and self._active_segment.segnum not in segnums:
self._active_segment.stop()
self._active_segment = None
seg, self._active_segment = self._active_segment, None
seg.stop()
self._start_new_segment()
# called by ShareFinder to choose hashtree sizes in CommonShares, and by

View File

@ -264,7 +264,7 @@ class RunTests(SyncTestCase):
self.assertThat(runs, Equals([]))
self.assertThat(result_code, Equals(1))
good_file_content_re = re.compile(r"\s[0-9]*\s[0-9]*\s", re.M)
good_file_content_re = re.compile(r"\s*[0-9]*\s[0-9]*\s*", re.M)
@given(text())
def test_pidfile_contents(self, content):