tahoe-lafs/src/allmydata/test/test_keygen.py
robk-tahoe 27ac71458f key_generator: remove poll timeouts from test
The timeouts on uses of 'poll' were there purely to make sure a test doesn't
poll indefinitely. However, having such timeouts makes tests susceptible
to premature timeouts under high load or on slow machines (e.g. cygwin
slaves running in virtual machines on loaded hosts).

Purportedly, trial applies a timeout to tests by default to prevent them from
hanging indefinitely, so these poll timeouts are redundant and cause
intermittent failures on slow hosts. Hence they're more bother than they're
worth, and should be culled.
2008-04-08 11:06:58 -07:00

98 lines
3.2 KiB
Python

import os
from twisted.trial import unittest
from twisted.application import service
from foolscap import Tub, eventual
from allmydata import key_generator
from allmydata.util import testutil
from pycryptopp.publickey import rsa
def flush_but_dont_ignore(res):
    """Flush the eventual-send queue, then hand *res* back unchanged.

    Returns the Deferred produced by ``eventual.flushEventualQueue()`` with
    one extra callback that discards the flush result and substitutes the
    value this function was called with. That lets it sit in the middle of a
    callback chain without swallowing whatever came before it.
    """
    flushed = eventual.flushEventualQueue()
    # The flush result itself is of no interest; forward the caller's value.
    flushed.addCallback(lambda _flush_result: res)
    return flushed
class KeyGenService(unittest.TestCase, testutil.PollMixin):
    """Exercise the key generator service through a foolscap Tub.

    setUp starts a MultiService with a listening Tub attached to it; the
    test attaches a KeyGeneratorService to the same parent and fetches keys
    from it by furl.
    """

    def setUp(self):
        """Start the parent service and attach a Tub listening on "tcp:0"."""
        self.parent = service.MultiService()
        self.parent.startService()

        tub = Tub()
        self.tub = tub
        tub.setServiceParent(self.parent)
        tub.listenOn("tcp:0")
        tub.setLocationAutomatically()

    def tearDown(self):
        """Stop the parent service, then flush the eventual-send queue.

        The flush is attached with addBoth, so it runs whether or not the
        shutdown succeeded; flush_but_dont_ignore then passes the earlier
        result (or failure) along instead of replacing it.
        """
        shutdown = self.parent.stopService()
        shutdown.addCallback(eventual.fireEventually)
        shutdown.addBoth(flush_but_dont_ignore)
        return shutdown

    def test_key_gen_service(self):
        # Scenario, in order: wait for the key pool to fill, take a few keys
        # and check the pool shrank, check a fetched key pair can sign and
        # verify, drain the pool completely, check keys are still served, and
        # finally wait for the pool to fill up again.

        def note(passthrough, msg):
            # Progress marker: forwards the previous callback result as-is.
            # Print msg (e.g. with a timestamp) here when tracing the test.
            return passthrough

        keygen_service = key_generator.KeyGeneratorService(display_furl=False)
        generator = keygen_service.key_generator
        generator.verbose = True
        keygen_service.setServiceParent(self.parent)
        generator.pool_size = 8
        key_size = generator.DEFAULT_KEY_SIZE

        def pool_is_full():
            # Polling predicate: true once the pool holds pool_size keys.
            return len(generator.keypool) == generator.pool_size

        def wait_until_full(_ignored):
            return self.poll(pool_is_full)

        def fetch_key(_ignored=None):
            # Look the service up by furl and ask it for one RSA key pair.
            pending = self.tub.getReference(keygen_service.keygen_furl)

            def request_pair(remote):
                return remote.callRemote('get_rsa_key_pair', key_size)

            pending.addCallback(request_pair)
            return pending

        def expect_pool_size(_ignored, expected):
            self.failUnlessEqual(len(generator.keypool), expected)

        def verify_key_pair(keys):
            # Sign random bytes with the signing half and check that the
            # verifying half accepts the signature.
            verifying_key, signing_key = keys
            verifier = rsa.create_verifying_key_from_string(verifying_key)
            signer = rsa.create_signing_key_from_string(signing_key)
            payload = os.urandom(42)
            signature = signer.sign(payload)
            self.failUnless(verifier.verify(payload, signature))

        chain = eventual.fireEventually()

        # Step 1: wait for the key gen pool to fill up.
        chain.addCallback(note, 'waiting for pool to fill up')
        chain.addCallback(wait_until_full)
        chain.addCallback(note, 'grabbing a few keys')

        # Step 2: take a few keys and check that the pool shrank accordingly.
        n_keys_to_waste = 4
        for _ in range(n_keys_to_waste):
            chain.addCallback(fetch_key)
        chain.addCallback(expect_pool_size,
                          generator.pool_size - n_keys_to_waste)
        chain.addCallback(note, 'checking a key works')

        # Step 3: a retrieved key pair must actually be usable.
        chain.addCallback(fetch_key)
        chain.addCallback(verify_key_pair)
        chain.addCallback(note, 'checking pool exhaustion')

        # Step 4: request pool_size more keys; the pool must end up empty.
        for _ in range(generator.pool_size):
            chain.addCallback(fetch_key)
        chain.addCallback(expect_pool_size, 0)

        # Step 5: an empty pool must still yield a working key (the service
        # is expected to generate one synchronously on demand).
        chain.addCallback(fetch_key)
        chain.addCallback(verify_key_pair)
        chain.addCallback(note, 'checking pool replenishment')

        # Step 6: the pool should refill on its own.
        chain.addCallback(wait_until_full)
        return chain