mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-11 04:40:03 +00:00
Use IReactorTime instead of ad-hoc solutions.
This commit is contained in:
parent
6a78703675
commit
92c36a67d8
@ -20,6 +20,7 @@ import six
|
||||
from foolscap.api import Referenceable
|
||||
from foolscap.ipb import IRemoteReference
|
||||
from twisted.application import service
|
||||
from twisted.internet import reactor
|
||||
|
||||
from zope.interface import implementer
|
||||
from allmydata.interfaces import RIStorageServer, IStatsProducer
|
||||
@ -71,7 +72,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
expiration_override_lease_duration=None,
|
||||
expiration_cutoff_date=None,
|
||||
expiration_sharetypes=("mutable", "immutable"),
|
||||
get_current_time=time.time):
|
||||
clock=reactor):
|
||||
service.MultiService.__init__(self)
|
||||
assert isinstance(nodeid, bytes)
|
||||
assert len(nodeid) == 20
|
||||
@ -122,7 +123,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
expiration_cutoff_date,
|
||||
expiration_sharetypes)
|
||||
self.lease_checker.setServiceParent(self)
|
||||
self._get_current_time = get_current_time
|
||||
self._clock = clock
|
||||
|
||||
# Currently being-written Bucketwriters. For Foolscap, lifetime is tied
|
||||
# to connection: when disconnection happens, the BucketWriters are
|
||||
@ -292,7 +293,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
# owner_num is not for clients to set, but rather it should be
|
||||
# curried into the PersonalStorageServer instance that is dedicated
|
||||
# to a particular owner.
|
||||
start = self._get_current_time()
|
||||
start = self._clock.seconds()
|
||||
self.count("allocate")
|
||||
alreadygot = set()
|
||||
bucketwriters = {} # k: shnum, v: BucketWriter
|
||||
@ -305,7 +306,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
# goes into the share files themselves. It could also be put into a
|
||||
# separate database. Note that the lease should not be added until
|
||||
# the BucketWriter has been closed.
|
||||
expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
|
||||
expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
|
||||
lease_info = LeaseInfo(owner_num,
|
||||
renew_secret, cancel_secret,
|
||||
expire_time, self.my_nodeid)
|
||||
@ -360,7 +361,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
if bucketwriters:
|
||||
fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
|
||||
|
||||
self.add_latency("allocate", self._get_current_time() - start)
|
||||
self.add_latency("allocate", self._clock.seconds() - start)
|
||||
return alreadygot, bucketwriters
|
||||
|
||||
def remote_allocate_buckets(self, storage_index,
|
||||
@ -395,26 +396,26 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
|
||||
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
|
||||
owner_num=1):
|
||||
start = self._get_current_time()
|
||||
start = self._clock.seconds()
|
||||
self.count("add-lease")
|
||||
new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
|
||||
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
|
||||
lease_info = LeaseInfo(owner_num,
|
||||
renew_secret, cancel_secret,
|
||||
new_expire_time, self.my_nodeid)
|
||||
for sf in self._iter_share_files(storage_index):
|
||||
sf.add_or_renew_lease(lease_info)
|
||||
self.add_latency("add-lease", self._get_current_time() - start)
|
||||
self.add_latency("add-lease", self._clock.seconds() - start)
|
||||
return None
|
||||
|
||||
def remote_renew_lease(self, storage_index, renew_secret):
|
||||
start = self._get_current_time()
|
||||
start = self._clock.seconds()
|
||||
self.count("renew")
|
||||
new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
|
||||
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
|
||||
found_buckets = False
|
||||
for sf in self._iter_share_files(storage_index):
|
||||
found_buckets = True
|
||||
sf.renew_lease(renew_secret, new_expire_time)
|
||||
self.add_latency("renew", self._get_current_time() - start)
|
||||
self.add_latency("renew", self._clock.seconds() - start)
|
||||
if not found_buckets:
|
||||
raise IndexError("no such lease to renew")
|
||||
|
||||
@ -441,7 +442,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
pass
|
||||
|
||||
def remote_get_buckets(self, storage_index):
|
||||
start = self._get_current_time()
|
||||
start = self._clock.seconds()
|
||||
self.count("get")
|
||||
si_s = si_b2a(storage_index)
|
||||
log.msg("storage: get_buckets %r" % si_s)
|
||||
@ -449,7 +450,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
for shnum, filename in self._get_bucket_shares(storage_index):
|
||||
bucketreaders[shnum] = BucketReader(self, filename,
|
||||
storage_index, shnum)
|
||||
self.add_latency("get", self._get_current_time() - start)
|
||||
self.add_latency("get", self._clock.seconds() - start)
|
||||
return bucketreaders
|
||||
|
||||
def get_leases(self, storage_index):
|
||||
@ -608,7 +609,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
:return LeaseInfo: Information for a new lease for a share.
|
||||
"""
|
||||
ownerid = 1 # TODO
|
||||
expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
|
||||
expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
|
||||
lease_info = LeaseInfo(ownerid,
|
||||
renew_secret, cancel_secret,
|
||||
expire_time, self.my_nodeid)
|
||||
@ -646,7 +647,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
See ``allmydata.interfaces.RIStorageServer`` for details about other
|
||||
parameters and return value.
|
||||
"""
|
||||
start = self._get_current_time()
|
||||
start = self._clock.seconds()
|
||||
self.count("writev")
|
||||
si_s = si_b2a(storage_index)
|
||||
log.msg("storage: slot_writev %r" % si_s)
|
||||
@ -687,7 +688,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
self._add_or_renew_leases(remaining_shares, lease_info)
|
||||
|
||||
# all done
|
||||
self.add_latency("writev", self._get_current_time() - start)
|
||||
self.add_latency("writev", self._clock.seconds() - start)
|
||||
return (testv_is_good, read_data)
|
||||
|
||||
def remote_slot_testv_and_readv_and_writev(self, storage_index,
|
||||
@ -713,7 +714,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
return share
|
||||
|
||||
def remote_slot_readv(self, storage_index, shares, readv):
|
||||
start = self._get_current_time()
|
||||
start = self._clock.seconds()
|
||||
self.count("readv")
|
||||
si_s = si_b2a(storage_index)
|
||||
lp = log.msg("storage: slot_readv %r %r" % (si_s, shares),
|
||||
@ -722,7 +723,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
# shares exist if there is a file for them
|
||||
bucketdir = os.path.join(self.sharedir, si_dir)
|
||||
if not os.path.isdir(bucketdir):
|
||||
self.add_latency("readv", self._get_current_time() - start)
|
||||
self.add_latency("readv", self._clock.seconds() - start)
|
||||
return {}
|
||||
datavs = {}
|
||||
for sharenum_s in os.listdir(bucketdir):
|
||||
@ -736,7 +737,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
datavs[sharenum] = msf.readv(readv)
|
||||
log.msg("returning shares %s" % (list(datavs.keys()),),
|
||||
facility="tahoe.storage", level=log.NOISY, parent=lp)
|
||||
self.add_latency("readv", self._get_current_time() - start)
|
||||
self.add_latency("readv", self._clock.seconds() - start)
|
||||
return datavs
|
||||
|
||||
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
|
||||
|
@ -21,6 +21,7 @@ if PY2:
|
||||
from random import Random
|
||||
|
||||
from twisted.internet.defer import inlineCallbacks, returnValue
|
||||
from twisted.internet.task import Clock
|
||||
|
||||
from foolscap.api import Referenceable, RemoteException
|
||||
|
||||
@ -1017,16 +1018,17 @@ class _FoolscapMixin(SystemTestMixin):
|
||||
self.server = s
|
||||
break
|
||||
assert self.server is not None, "Couldn't find StorageServer"
|
||||
self._current_time = 123456
|
||||
self.server._get_current_time = self.fake_time
|
||||
self._clock = Clock()
|
||||
self._clock.advance(123456)
|
||||
self.server._clock = self._clock
|
||||
|
||||
def fake_time(self):
|
||||
"""Return the current fake, test-controlled, time."""
|
||||
return self._current_time
|
||||
return self._clock.seconds()
|
||||
|
||||
def fake_sleep(self, seconds):
|
||||
"""Advance the fake time by the given number of seconds."""
|
||||
self._current_time += seconds
|
||||
self._clock.advance(seconds)
|
||||
|
||||
@inlineCallbacks
|
||||
def tearDown(self):
|
||||
|
@ -23,7 +23,7 @@ from uuid import uuid4
|
||||
|
||||
from twisted.trial import unittest
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.internet.task import Clock
|
||||
|
||||
from hypothesis import given, strategies
|
||||
@ -438,11 +438,11 @@ class Server(unittest.TestCase):
|
||||
basedir = os.path.join("storage", "Server", name)
|
||||
return basedir
|
||||
|
||||
def create(self, name, reserved_space=0, klass=StorageServer, get_current_time=time.time):
|
||||
def create(self, name, reserved_space=0, klass=StorageServer, clock=reactor):
|
||||
workdir = self.workdir(name)
|
||||
ss = klass(workdir, b"\x00" * 20, reserved_space=reserved_space,
|
||||
stats_provider=FakeStatsProvider(),
|
||||
get_current_time=get_current_time)
|
||||
clock=clock)
|
||||
ss.setServiceParent(self.sparent)
|
||||
return ss
|
||||
|
||||
@ -626,7 +626,7 @@ class Server(unittest.TestCase):
|
||||
clock.advance(first_lease)
|
||||
ss = self.create(
|
||||
"test_allocate_without_lease_renewal",
|
||||
get_current_time=clock.seconds,
|
||||
clock=clock,
|
||||
)
|
||||
|
||||
# Put a share on there
|
||||
@ -918,7 +918,7 @@ class Server(unittest.TestCase):
|
||||
"""
|
||||
clock = Clock()
|
||||
clock.advance(123)
|
||||
ss = self.create("test_immutable_add_lease_renews", get_current_time=clock.seconds)
|
||||
ss = self.create("test_immutable_add_lease_renews", clock=clock)
|
||||
|
||||
# Start out with single lease created with bucket:
|
||||
renewal_secret, cancel_secret = self.create_bucket_5_shares(ss, b"si0")
|
||||
@ -1032,10 +1032,10 @@ class MutableServer(unittest.TestCase):
|
||||
basedir = os.path.join("storage", "MutableServer", name)
|
||||
return basedir
|
||||
|
||||
def create(self, name, get_current_time=time.time):
|
||||
def create(self, name, clock=reactor):
|
||||
workdir = self.workdir(name)
|
||||
ss = StorageServer(workdir, b"\x00" * 20,
|
||||
get_current_time=get_current_time)
|
||||
clock=clock)
|
||||
ss.setServiceParent(self.sparent)
|
||||
return ss
|
||||
|
||||
@ -1420,7 +1420,7 @@ class MutableServer(unittest.TestCase):
|
||||
clock = Clock()
|
||||
clock.advance(235)
|
||||
ss = self.create("test_mutable_add_lease_renews",
|
||||
get_current_time=clock.seconds)
|
||||
clock=clock)
|
||||
def secrets(n):
|
||||
return ( self.write_enabler(b"we1"),
|
||||
self.renew_secret(b"we1-%d" % n),
|
||||
|
Loading…
x
Reference in New Issue
Block a user