From 92c36a67d8c98436e2cc2616d89ff0135307858c Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 17 Nov 2021 11:01:04 -0500 Subject: [PATCH] Use IReactorTime instead of ad-hoc solutions. --- src/allmydata/storage/server.py | 39 ++++++++++++----------- src/allmydata/test/test_istorageserver.py | 10 +++--- src/allmydata/test/test_storage.py | 16 +++++----- 3 files changed, 34 insertions(+), 31 deletions(-) diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index ee2ea1c61..499d47276 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -20,6 +20,7 @@ import six from foolscap.api import Referenceable from foolscap.ipb import IRemoteReference from twisted.application import service +from twisted.internet import reactor from zope.interface import implementer from allmydata.interfaces import RIStorageServer, IStatsProducer @@ -71,7 +72,7 @@ class StorageServer(service.MultiService, Referenceable): expiration_override_lease_duration=None, expiration_cutoff_date=None, expiration_sharetypes=("mutable", "immutable"), - get_current_time=time.time): + clock=reactor): service.MultiService.__init__(self) assert isinstance(nodeid, bytes) assert len(nodeid) == 20 @@ -122,7 +123,7 @@ class StorageServer(service.MultiService, Referenceable): expiration_cutoff_date, expiration_sharetypes) self.lease_checker.setServiceParent(self) - self._get_current_time = get_current_time + self._clock = clock # Currently being-written Bucketwriters. For Foolscap, lifetime is tied # to connection: when disconnection happens, the BucketWriters are @@ -292,7 +293,7 @@ class StorageServer(service.MultiService, Referenceable): # owner_num is not for clients to set, but rather it should be # curried into the PersonalStorageServer instance that is dedicated # to a particular owner. - start = self._get_current_time() + start = self._clock.seconds() self.count("allocate") alreadygot = set() bucketwriters = {} # k: shnum, v: BucketWriter @@ -305,7 +306,7 @@ class StorageServer(service.MultiService, Referenceable): # goes into the share files themselves. It could also be put into a # separate database. Note that the lease should not be added until # the BucketWriter has been closed. - expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME + expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME lease_info = LeaseInfo(owner_num, renew_secret, cancel_secret, expire_time, self.my_nodeid) @@ -360,7 +361,7 @@ class StorageServer(service.MultiService, Referenceable): if bucketwriters: fileutil.make_dirs(os.path.join(self.sharedir, si_dir)) - self.add_latency("allocate", self._get_current_time() - start) + self.add_latency("allocate", self._clock.seconds() - start) return alreadygot, bucketwriters def remote_allocate_buckets(self, storage_index, @@ -395,26 +396,26 @@ class StorageServer(service.MultiService, Referenceable): def remote_add_lease(self, storage_index, renew_secret, cancel_secret, owner_num=1): - start = self._get_current_time() + start = self._clock.seconds() self.count("add-lease") - new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME + new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME lease_info = LeaseInfo(owner_num, renew_secret, cancel_secret, new_expire_time, self.my_nodeid) for sf in self._iter_share_files(storage_index): sf.add_or_renew_lease(lease_info) - self.add_latency("add-lease", self._get_current_time() - start) + self.add_latency("add-lease", self._clock.seconds() - start) return None def remote_renew_lease(self, storage_index, renew_secret): - start = self._get_current_time() + start = self._clock.seconds() self.count("renew") - new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME + new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME found_buckets = False for sf in self._iter_share_files(storage_index): found_buckets = True sf.renew_lease(renew_secret, new_expire_time) - self.add_latency("renew", self._get_current_time() - start) + self.add_latency("renew", self._clock.seconds() - start) if not found_buckets: raise IndexError("no such lease to renew") @@ -441,7 +442,7 @@ class StorageServer(service.MultiService, Referenceable): pass def remote_get_buckets(self, storage_index): - start = self._get_current_time() + start = self._clock.seconds() self.count("get") si_s = si_b2a(storage_index) log.msg("storage: get_buckets %r" % si_s) @@ -449,7 +450,7 @@ class StorageServer(service.MultiService, Referenceable): for shnum, filename in self._get_bucket_shares(storage_index): bucketreaders[shnum] = BucketReader(self, filename, storage_index, shnum) - self.add_latency("get", self._get_current_time() - start) + self.add_latency("get", self._clock.seconds() - start) return bucketreaders def get_leases(self, storage_index): @@ -608,7 +609,7 @@ class StorageServer(service.MultiService, Referenceable): :return LeaseInfo: Information for a new lease for a share. """ ownerid = 1 # TODO - expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME + expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME lease_info = LeaseInfo(ownerid, renew_secret, cancel_secret, expire_time, self.my_nodeid) @@ -646,7 +647,7 @@ class StorageServer(service.MultiService, Referenceable): See ``allmydata.interfaces.RIStorageServer`` for details about other parameters and return value. """ - start = self._get_current_time() + start = self._clock.seconds() self.count("writev") si_s = si_b2a(storage_index) log.msg("storage: slot_writev %r" % si_s) @@ -687,7 +688,7 @@ class StorageServer(service.MultiService, Referenceable): self._add_or_renew_leases(remaining_shares, lease_info) # all done - self.add_latency("writev", self._get_current_time() - start) + self.add_latency("writev", self._clock.seconds() - start) return (testv_is_good, read_data) def remote_slot_testv_and_readv_and_writev(self, storage_index, @@ -713,7 +714,7 @@ class StorageServer(service.MultiService, Referenceable): return share def remote_slot_readv(self, storage_index, shares, readv): - start = self._get_current_time() + start = self._clock.seconds() self.count("readv") si_s = si_b2a(storage_index) lp = log.msg("storage: slot_readv %r %r" % (si_s, shares), @@ -722,7 +723,7 @@ class StorageServer(service.MultiService, Referenceable): # shares exist if there is a file for them bucketdir = os.path.join(self.sharedir, si_dir) if not os.path.isdir(bucketdir): - self.add_latency("readv", self._get_current_time() - start) + self.add_latency("readv", self._clock.seconds() - start) return {} datavs = {} for sharenum_s in os.listdir(bucketdir): @@ -736,7 +737,7 @@ class StorageServer(service.MultiService, Referenceable): datavs[sharenum] = msf.readv(readv) log.msg("returning shares %s" % (list(datavs.keys()),), facility="tahoe.storage", level=log.NOISY, parent=lp) - self.add_latency("readv", self._get_current_time() - start) + self.add_latency("readv", self._clock.seconds() - start) return datavs def remote_advise_corrupt_share(self, share_type, storage_index, shnum, diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index fe494a9d4..a17264713 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -21,6 +21,7 @@ if PY2: from random import Random from twisted.internet.defer import inlineCallbacks, returnValue +from twisted.internet.task import Clock from foolscap.api import Referenceable, RemoteException @@ -1017,16 +1018,17 @@ class _FoolscapMixin(SystemTestMixin): self.server = s break assert self.server is not None, "Couldn't find StorageServer" - self._current_time = 123456 - self.server._get_current_time = self.fake_time + self._clock = Clock() + self._clock.advance(123456) + self.server._clock = self._clock def fake_time(self): """Return the current fake, test-controlled, time.""" - return self._current_time + return self._clock.seconds() def fake_sleep(self, seconds): """Advance the fake time by the given number of seconds.""" - self._current_time += seconds + self._clock.advance(seconds) @inlineCallbacks def tearDown(self): diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 4e40a76a5..e143bec63 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -23,7 +23,7 @@ from uuid import uuid4 from twisted.trial import unittest -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.internet.task import Clock from hypothesis import given, strategies @@ -438,11 +438,11 @@ class Server(unittest.TestCase): basedir = os.path.join("storage", "Server", name) return basedir - def create(self, name, reserved_space=0, klass=StorageServer, get_current_time=time.time): + def create(self, name, reserved_space=0, klass=StorageServer, clock=reactor): workdir = self.workdir(name) ss = klass(workdir, b"\x00" * 20, reserved_space=reserved_space, stats_provider=FakeStatsProvider(), - get_current_time=get_current_time) + clock=clock) ss.setServiceParent(self.sparent) return ss @@ -626,7 +626,7 @@ class Server(unittest.TestCase): clock.advance(first_lease) ss = self.create( "test_allocate_without_lease_renewal", - get_current_time=clock.seconds, + clock=clock, ) # Put a share on there @@ -918,7 +918,7 @@ class Server(unittest.TestCase): """ clock = Clock() clock.advance(123) - ss = self.create("test_immutable_add_lease_renews", get_current_time=clock.seconds) + ss = self.create("test_immutable_add_lease_renews", clock=clock) # Start out with single lease created with bucket: renewal_secret, cancel_secret = self.create_bucket_5_shares(ss, b"si0") @@ -1032,10 +1032,10 @@ class MutableServer(unittest.TestCase): basedir = os.path.join("storage", "MutableServer", name) return basedir - def create(self, name, get_current_time=time.time): + def create(self, name, clock=reactor): workdir = self.workdir(name) ss = StorageServer(workdir, b"\x00" * 20, - get_current_time=get_current_time) + clock=clock) ss.setServiceParent(self.sparent) return ss @@ -1420,7 +1420,7 @@ class MutableServer(unittest.TestCase): clock = Clock() clock.advance(235) ss = self.create("test_mutable_add_lease_renews", - get_current_time=clock.seconds) + clock=clock) def secrets(n): return ( self.write_enabler(b"we1"), self.renew_secret(b"we1-%d" % n),