mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-11 15:32:39 +00:00
exercise concurrent upload
This commit is contained in:
parent
8eb518a221
commit
ef11eeb4a2
@ -14,6 +14,10 @@ from twisted.application import service
|
||||
|
||||
from foolscap.api import Tub, fireEventually, flushEventualQueue
|
||||
|
||||
from eliot.twisted import (
|
||||
inline_callbacks,
|
||||
)
|
||||
|
||||
from allmydata.crypto import aes
|
||||
from allmydata.storage.server import si_b2a
|
||||
from allmydata.storage_client import StorageFarmBroker
|
||||
@ -126,6 +130,28 @@ def upload_data(uploader, data, convergence):
|
||||
u = upload.Data(data, convergence=convergence)
|
||||
return uploader.upload(u)
|
||||
|
||||
|
||||
def make_uploader(helper_furl, parent, override_name=None):
|
||||
"""
|
||||
Make an ``upload.Uploader`` service pointed at the given helper and with
|
||||
the given service parent.
|
||||
|
||||
:param bytes helper_furl: The Foolscap URL of the upload helper.
|
||||
|
||||
:param IServiceCollection parent: A parent to assign to the new uploader.
|
||||
|
||||
:param str override_name: If not ``None``, a new name for the uploader
|
||||
service. Multiple services cannot coexist with the same name.
|
||||
"""
|
||||
if not isinstance(helper_furl, bytes):
|
||||
raise TypeError("helper_furl must be bytes, got {!r} instead".format(helper_furl))
|
||||
u = upload.Uploader(helper_furl)
|
||||
if override_name is not None:
|
||||
u.name = override_name
|
||||
u.setServiceParent(parent)
|
||||
return u
|
||||
|
||||
|
||||
class AssistedUpload(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tub = t = Tub()
|
||||
@ -159,34 +185,75 @@ class AssistedUpload(unittest.TestCase):
|
||||
d.addBoth(flush_but_dont_ignore)
|
||||
return d
|
||||
|
||||
|
||||
def test_one(self):
|
||||
"""
|
||||
Some data that has never been uploaded before can be uploaded in CHK
|
||||
format using the ``RIHelper`` provider and ``Uploader.upload``.
|
||||
"""
|
||||
self.basedir = "helper/AssistedUpload/test_one"
|
||||
self.setUpHelper(self.basedir)
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
u.setServiceParent(self.s)
|
||||
u = make_uploader(self.helper_furl, self.s)
|
||||
|
||||
d = wait_a_few_turns()
|
||||
|
||||
def _ready(res):
|
||||
assert u._helper
|
||||
|
||||
self.assertTrue(
|
||||
u._helper,
|
||||
"Expected uploader to have a helper reference, had {} instead.".format(
|
||||
u._helper,
|
||||
),
|
||||
)
|
||||
return upload_data(u, DATA, convergence=b"some convergence string")
|
||||
d.addCallback(_ready)
|
||||
|
||||
def _uploaded(results):
|
||||
the_uri = results.get_uri()
|
||||
assert b"CHK" in the_uri
|
||||
self.assertIn(b"CHK", the_uri)
|
||||
d.addCallback(_uploaded)
|
||||
|
||||
def _check_empty(res):
|
||||
# Make sure the intermediate artifacts aren't left lying around.
|
||||
files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
|
||||
self.failUnlessEqual(files, [])
|
||||
self.assertEqual(files, [])
|
||||
files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
|
||||
self.failUnlessEqual(files, [])
|
||||
self.assertEqual(files, [])
|
||||
d.addCallback(_check_empty)
|
||||
|
||||
return d
|
||||
|
||||
@inline_callbacks
|
||||
def test_concurrent(self):
|
||||
"""
|
||||
The same data can be uploaded by more than one ``Uploader`` at a time.
|
||||
"""
|
||||
self.basedir = "helper/AssistedUpload/test_concurrent"
|
||||
self.setUpHelper(self.basedir)
|
||||
u1 = make_uploader(self.helper_furl, self.s, "u1")
|
||||
u2 = make_uploader(self.helper_furl, self.s, "u2")
|
||||
|
||||
yield wait_a_few_turns()
|
||||
|
||||
for u in [u1, u2]:
|
||||
self.assertTrue(
|
||||
u._helper,
|
||||
"Expected uploader to have a helper reference, had {} instead.".format(
|
||||
u._helper,
|
||||
),
|
||||
)
|
||||
|
||||
uploads = list(
|
||||
upload_data(u, DATA, convergence=b"some convergence string")
|
||||
for u
|
||||
in [u1, u2]
|
||||
)
|
||||
|
||||
result1, result2 = yield defer.gatherResults(uploads)
|
||||
|
||||
self.assertEqual(
|
||||
result1.get_uri(),
|
||||
result2.get_uri(),
|
||||
)
|
||||
|
||||
def test_previous_upload_failed(self):
|
||||
self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
|
||||
self.setUpHelper(self.basedir)
|
||||
@ -214,8 +281,7 @@ class AssistedUpload(unittest.TestCase):
|
||||
f.write(aes.encrypt_data(encryptor, DATA))
|
||||
f.close()
|
||||
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
u.setServiceParent(self.s)
|
||||
u = make_uploader(self.helper_furl, self.s)
|
||||
|
||||
d = wait_a_few_turns()
|
||||
|
||||
@ -240,8 +306,7 @@ class AssistedUpload(unittest.TestCase):
|
||||
def test_already_uploaded(self):
|
||||
self.basedir = "helper/AssistedUpload/test_already_uploaded"
|
||||
self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded)
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
u.setServiceParent(self.s)
|
||||
u = make_uploader(self.helper_furl, self.s)
|
||||
|
||||
d = wait_a_few_turns()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user