test_introducer.SystemTest: fix race condition

SystemTest has a couple of different phases, separated by a poller which
waits for everything to be idle (all messages delivered, none in flight). It
does this by watching some internal "_debug_outstanding" counters in the
server and in each client, and waiting for them to hit zero.

Just before the last phase, we replace the server with a new one (to make
sure clients re-send their messages properly). Unfortunately, the polling
function closed over the variable holding the original server, and didn't see
the replacement. It kept polling the old server, and failed to notice the
outstanding messages for the new server. The last phase of the test (check3)
was started too early, which failed (since some messages had not yet been
delivered), and then exploded in a flurry of dirty-reactor errors (because
some messages were delivered after test shutdown).

This replaces the closed-over-variable with a "self.the_introducer", which
seems to fix the race.

One additional place to look at in the future: the client
announcement-receive path (remote_announce) uses an eventually(). If the
message has been received and the eventual-send posted (but not yet executed)
when the poller sees it, the poller might erroneously conclude that the
client is idle and cause the same problem as above. To fix this, the poller
(probably all pollers) could be enhanced to do a flushEventualQueue before
querying the are-we-done-yet predicate function.
This commit is contained in:
Brian Warner 2012-03-30 17:29:06 -07:00
parent c5e10e2261
commit 24812905a1

View File

@ -387,6 +387,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
received_announcements = {}
subscribing_clients = []
publishing_clients = []
self.the_introducer = introducer
privkeys = {}
expected_announcements = [0 for c in range(NUM_CLIENTS)]
@ -483,7 +484,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
for c in subscribing_clients + publishing_clients:
if c._debug_outstanding:
return False
if introducer._debug_outstanding:
if self.the_introducer._debug_outstanding:
return False
return True
return self.poll(_idle)
@ -495,7 +496,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
def _check1(res):
log.msg("doing _check1")
dc = introducer._debug_counts
dc = self.the_introducer._debug_counts
if server_version == V1:
# each storage server publishes a record, and (after its
# 'subscribe' has been ACKed) also publishes a "stub_client".
@ -594,11 +595,11 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
c = subscribing_clients[i]
for k in c._debug_counts:
c._debug_counts[k] = 0
for k in introducer._debug_counts:
introducer._debug_counts[k] = 0
for k in self.the_introducer._debug_counts:
self.the_introducer._debug_counts[k] = 0
expected_announcements[i] += 1 # new 'storage' for everyone
self.create_tub(self.central_portnum)
newfurl = self.central_tub.registerReference(introducer,
newfurl = self.central_tub.registerReference(self.the_introducer,
furlFile=iff)
assert newfurl == self.introducer_furl
d.addCallback(_restart_introducer_tub)
@ -614,7 +615,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
log.msg("doing _check2")
# assert that the introducer sent out new messages, one per
# subscriber
dc = introducer._debug_counts
dc = self.the_introducer._debug_counts
self.failUnlessEqual(dc["outbound_announcements"],
NUM_STORAGE*NUM_CLIENTS)
self.failUnless(dc["outbound_message"] > 0)
@ -652,7 +653,8 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
introducer = old.IntroducerService_v1()
else:
introducer = IntroducerService()
newfurl = self.central_tub.registerReference(introducer,
self.the_introducer = introducer
newfurl = self.central_tub.registerReference(self.the_introducer,
furlFile=iff)
assert newfurl == self.introducer_furl
d.addCallback(_restart_introducer)
@ -663,7 +665,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
def _check3(res):
log.msg("doing _check3")
dc = introducer._debug_counts
dc = self.the_introducer._debug_counts
self.failUnlessEqual(dc["outbound_announcements"],
NUM_STORAGE*NUM_CLIENTS)
self.failUnless(dc["outbound_message"] > 0)