remove wait_for_numpeers and the when_enough_peers call in mutable.Publish

This commit is contained in:
Brian Warner 2008-01-14 14:55:59 -07:00
parent 222718f13c
commit 7ac2b94aba
12 changed files with 86 additions and 125 deletions

View File

@ -241,19 +241,19 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
assert IMutableFileURI.providedBy(u), u assert IMutableFileURI.providedBy(u), u
return MutableFileNode(self).init_from_uri(u) return MutableFileNode(self).init_from_uri(u)
def create_empty_dirnode(self, wait_for_numpeers=None): def create_empty_dirnode(self):
n = NewDirectoryNode(self) n = NewDirectoryNode(self)
d = n.create(wait_for_numpeers=wait_for_numpeers) d = n.create()
d.addCallback(lambda res: n) d.addCallback(lambda res: n)
return d return d
def create_mutable_file(self, contents="", wait_for_numpeers=None): def create_mutable_file(self, contents=""):
n = MutableFileNode(self) n = MutableFileNode(self)
d = n.create(contents, wait_for_numpeers=wait_for_numpeers) d = n.create(contents)
d.addCallback(lambda res: n) d.addCallback(lambda res: n)
return d return d
def upload(self, uploadable, wait_for_numpeers=None): def upload(self, uploadable):
uploader = self.getServiceNamed("uploader") uploader = self.getServiceNamed("uploader")
return uploader.upload(uploadable, wait_for_numpeers=wait_for_numpeers) return uploader.upload(uploadable)

View File

@ -53,7 +53,7 @@ class NewDirectoryNode:
self._node.init_from_uri(self._uri.get_filenode_uri()) self._node.init_from_uri(self._uri.get_filenode_uri())
return self return self
def create(self, wait_for_numpeers=None): def create(self):
""" """
Returns a deferred that eventually fires with self once the directory Returns a deferred that eventually fires with self once the directory
has been created (distributed across a set of storage servers). has been created (distributed across a set of storage servers).
@ -62,7 +62,7 @@ class NewDirectoryNode:
# URI to create our own. # URI to create our own.
self._node = self.filenode_class(self._client) self._node = self.filenode_class(self._client)
empty_contents = self._pack_contents({}) empty_contents = self._pack_contents({})
d = self._node.create(empty_contents, wait_for_numpeers=wait_for_numpeers) d = self._node.create(empty_contents)
d.addCallback(self._filenode_created) d.addCallback(self._filenode_created)
return d return d
def _filenode_created(self, res): def _filenode_created(self, res):
@ -214,7 +214,7 @@ class NewDirectoryNode:
d.addCallback(_got) d.addCallback(_got)
return d return d
def set_uri(self, name, child_uri, metadata={}, wait_for_numpeers=None): def set_uri(self, name, child_uri, metadata={}):
"""I add a child (by URI) at the specific name. I return a Deferred """I add a child (by URI) at the specific name. I return a Deferred
that fires with the child node when the operation finishes. I will that fires with the child node when the operation finishes. I will
replace any existing child of the same name. replace any existing child of the same name.
@ -224,10 +224,9 @@ class NewDirectoryNode:
If this directory node is read-only, the Deferred will errback with a If this directory node is read-only, the Deferred will errback with a
NotMutableError.""" NotMutableError."""
return self.set_node(name, self._create_node(child_uri), metadata, return self.set_node(name, self._create_node(child_uri), metadata)
wait_for_numpeers)
def set_uris(self, entries, wait_for_numpeers=None): def set_uris(self, entries):
node_entries = [] node_entries = []
for e in entries: for e in entries:
if len(e) == 2: if len(e) == 2:
@ -237,9 +236,9 @@ class NewDirectoryNode:
assert len(e) == 3 assert len(e) == 3
name, child_uri, metadata = e name, child_uri, metadata = e
node_entries.append( (name,self._create_node(child_uri),metadata) ) node_entries.append( (name,self._create_node(child_uri),metadata) )
return self.set_nodes(node_entries, wait_for_numpeers) return self.set_nodes(node_entries)
def set_node(self, name, child, metadata={}, wait_for_numpeers=None): def set_node(self, name, child, metadata={}):
"""I add a child at the specific name. I return a Deferred that fires """I add a child at the specific name. I return a Deferred that fires
when the operation finishes. This Deferred will fire with the child when the operation finishes. This Deferred will fire with the child
node that was just added. I will replace any existing child of the node that was just added. I will replace any existing child of the
@ -248,11 +247,11 @@ class NewDirectoryNode:
If this directory node is read-only, the Deferred will errback with a If this directory node is read-only, the Deferred will errback with a
NotMutableError.""" NotMutableError."""
assert IFilesystemNode.providedBy(child), child assert IFilesystemNode.providedBy(child), child
d = self.set_nodes( [(name, child, metadata)], wait_for_numpeers) d = self.set_nodes( [(name, child, metadata)])
d.addCallback(lambda res: child) d.addCallback(lambda res: child)
return d return d
def set_nodes(self, entries, wait_for_numpeers=None): def set_nodes(self, entries):
if self.is_readonly(): if self.is_readonly():
return defer.fail(NotMutableError()) return defer.fail(NotMutableError())
d = self._read() d = self._read()
@ -266,22 +265,22 @@ class NewDirectoryNode:
name, child, metadata = e name, child, metadata = e
children[name] = (child, metadata) children[name] = (child, metadata)
new_contents = self._pack_contents(children) new_contents = self._pack_contents(children)
return self._node.replace(new_contents, wait_for_numpeers=wait_for_numpeers) return self._node.replace(new_contents)
d.addCallback(_add) d.addCallback(_add)
d.addCallback(lambda res: None) d.addCallback(lambda res: None)
return d return d
def add_file(self, name, uploadable, wait_for_numpeers=None): def add_file(self, name, uploadable):
"""I upload a file (using the given IUploadable), then attach the """I upload a file (using the given IUploadable), then attach the
resulting FileNode to the directory at the given name. I return a resulting FileNode to the directory at the given name. I return a
Deferred that fires (with the IFileNode of the uploaded file) when Deferred that fires (with the IFileNode of the uploaded file) when
the operation completes.""" the operation completes."""
if self.is_readonly(): if self.is_readonly():
return defer.fail(NotMutableError()) return defer.fail(NotMutableError())
d = self._client.upload(uploadable, wait_for_numpeers=wait_for_numpeers) d = self._client.upload(uploadable)
d.addCallback(self._client.create_node_from_uri) d.addCallback(self._client.create_node_from_uri)
d.addCallback(lambda node: self.set_node(name, node, wait_for_numpeers=wait_for_numpeers)) d.addCallback(lambda node: self.set_node(name, node))
return d return d
def delete(self, name): def delete(self, name):
@ -302,22 +301,22 @@ class NewDirectoryNode:
d.addCallback(_delete) d.addCallback(_delete)
return d return d
def create_empty_directory(self, name, wait_for_numpeers=None): def create_empty_directory(self, name):
"""I create and attach an empty directory at the given name. I return """I create and attach an empty directory at the given name. I return
a Deferred that fires (with the new directory node) when the a Deferred that fires (with the new directory node) when the
operation finishes.""" operation finishes."""
if self.is_readonly(): if self.is_readonly():
return defer.fail(NotMutableError()) return defer.fail(NotMutableError())
d = self._client.create_empty_dirnode(wait_for_numpeers=wait_for_numpeers) d = self._client.create_empty_dirnode()
def _created(child): def _created(child):
d = self.set_node(name, child, wait_for_numpeers=wait_for_numpeers) d = self.set_node(name, child)
d.addCallback(lambda res: child) d.addCallback(lambda res: child)
return d return d
d.addCallback(_created) d.addCallback(_created)
return d return d
def move_child_to(self, current_child_name, new_parent, def move_child_to(self, current_child_name, new_parent,
new_child_name=None, wait_for_numpeers=None): new_child_name=None):
"""I take one of my children and move them to a new parent. The child """I take one of my children and move them to a new parent. The child
is referenced by name. On the new parent, the child will live under is referenced by name. On the new parent, the child will live under
'new_child_name', which defaults to 'current_child_name'. I return a 'new_child_name', which defaults to 'current_child_name'. I return a
@ -328,8 +327,7 @@ class NewDirectoryNode:
new_child_name = current_child_name new_child_name = current_child_name
d = self.get(current_child_name) d = self.get(current_child_name)
def sn(child): def sn(child):
return new_parent.set_node(new_child_name, child, return new_parent.set_node(new_child_name, child)
wait_for_numpeers=wait_for_numpeers)
d.addCallback(sn) d.addCallback(sn)
d.addCallback(lambda child: self.delete(current_child_name)) d.addCallback(lambda child: self.delete(current_child_name))
return d return d

View File

@ -452,7 +452,7 @@ class IMutableFileNode(IFileNode, IMutableFilesystemNode):
more advanced API will signal and provide access to the multiple more advanced API will signal and provide access to the multiple
heads.""" heads."""
def replace(newdata, wait_for_numpeers=None): def replace(newdata):
"""Replace the old contents with the new data. Returns a Deferred """Replace the old contents with the new data. Returns a Deferred
that fires (with None) when the operation is complete. that fires (with None) when the operation is complete.
@ -1087,7 +1087,7 @@ class IUploadable(Interface):
closed.""" closed."""
class IUploader(Interface): class IUploader(Interface):
def upload(uploadable, wait_for_numpeers=None): def upload(uploadable):
"""Upload the file. 'uploadable' must impement IUploadable. This """Upload the file. 'uploadable' must impement IUploadable. This
returns a Deferred which fires with the URI of the file.""" returns a Deferred which fires with the URI of the file."""
@ -1164,27 +1164,21 @@ class IChecker(Interface):
""" """
class IClient(Interface): class IClient(Interface):
def upload(uploadable, wait_for_numpeers=None): def upload(uploadable):
"""Upload some data into a CHK, get back the URI string for it. """Upload some data into a CHK, get back the URI string for it.
@param uploadable: something that implements IUploadable @param uploadable: something that implements IUploadable
@param wait_for_numpeers: don't upload anything until we have at least
this many peers connected
@return: a Deferred that fires with the (string) URI for this file. @return: a Deferred that fires with the (string) URI for this file.
""" """
def create_mutable_file(contents="", wait_for_numpeers=None): def create_mutable_file(contents=""):
"""Create a new mutable file with contents, get back the URI string. """Create a new mutable file with contents, get back the URI string.
@param contents: the initial contents to place in the file. @param contents: the initial contents to place in the file.
@param wait_for_numpeers: don't upload anything until we have at least
this many peers connected
@return: a Deferred that fires with tne (string) SSK URI for the new @return: a Deferred that fires with tne (string) SSK URI for the new
file. file.
""" """
def create_empty_dirnode(wait_for_numpeers=None): def create_empty_dirnode():
"""Create a new dirnode, empty and unattached. """Create a new dirnode, empty and unattached.
@param wait_for_numpeers: don't create anything until we have at least
this many peers connected.
@return: a Deferred that fires with the new IDirectoryNode instance. @return: a Deferred that fires with the new IDirectoryNode instance.
""" """

View File

@ -728,28 +728,13 @@ class Publish:
num = log.err(*args, **kwargs) num = log.err(*args, **kwargs)
return num return num
def publish(self, newdata, wait_for_numpeers=None): def publish(self, newdata):
"""Publish the filenode's current contents. Returns a Deferred that """Publish the filenode's current contents. Returns a Deferred that
fires (with None) when the publish has done as much work as it's ever fires (with None) when the publish has done as much work as it's ever
going to do, or errbacks with ConsistencyError if it detects a going to do, or errbacks with ConsistencyError if it detects a
simultaneous write. simultaneous write.
It will wait until at least wait_for_numpeers peers are connected
before it starts uploading
If wait_for_numpeers is None then it will be set to a default value
(currently 1).
""" """
if wait_for_numpeers is None:
wait_for_numpeers = 1
self.log("starting publish")
d = self._node._client.introducer_client.when_enough_peers(wait_for_numpeers)
d.addCallback(lambda dummy: self._after_enough_peers(newdata))
return d
def _after_enough_peers(self, newdata):
# 1: generate shares (SDMF: files are small, so we can do it in RAM) # 1: generate shares (SDMF: files are small, so we can do it in RAM)
# 2: perform peer selection, get candidate servers # 2: perform peer selection, get candidate servers
# 2a: send queries to n+epsilon servers, to determine current shares # 2a: send queries to n+epsilon servers, to determine current shares
@ -759,7 +744,7 @@ class Publish:
# 4a: may need to run recovery algorithm # 4a: may need to run recovery algorithm
# 5: when enough responses are back, we're done # 5: when enough responses are back, we're done
self.log("got enough peers, datalen is %s" % len(newdata)) self.log("starting publish, datalen is %s" % len(newdata))
self._writekey = self._node.get_writekey() self._writekey = self._node.get_writekey()
assert self._writekey, "need write capability to publish" assert self._writekey, "need write capability to publish"
@ -1355,7 +1340,7 @@ class MutableFileNode:
self._encprivkey = None self._encprivkey = None
return self return self
def create(self, initial_contents, wait_for_numpeers=None): def create(self, initial_contents):
"""Call this when the filenode is first created. This will generate """Call this when the filenode is first created. This will generate
the keys, generate the initial shares, wait until at least numpeers the keys, generate the initial shares, wait until at least numpeers
are connected, allocate shares, and upload the initial are connected, allocate shares, and upload the initial
@ -1379,7 +1364,7 @@ class MutableFileNode:
# nobody knows about us yet" # nobody knows about us yet"
self._current_seqnum = 0 self._current_seqnum = 0
self._current_roothash = "\x00"*32 self._current_roothash = "\x00"*32
return self._publish(initial_contents, wait_for_numpeers=wait_for_numpeers) return self._publish(initial_contents)
d.addCallback(_generated) d.addCallback(_generated)
return d return d
@ -1389,9 +1374,9 @@ class MutableFileNode:
verifier = signer.get_verifying_key() verifier = signer.get_verifying_key()
return verifier, signer return verifier, signer
def _publish(self, initial_contents, wait_for_numpeers): def _publish(self, initial_contents):
p = self.publish_class(self) p = self.publish_class(self)
d = p.publish(initial_contents, wait_for_numpeers=wait_for_numpeers) d = p.publish(initial_contents)
d.addCallback(lambda res: self) d.addCallback(lambda res: self)
return d return d
@ -1511,8 +1496,8 @@ class MutableFileNode:
r = Retrieve(self) r = Retrieve(self)
return r.retrieve() return r.retrieve()
def replace(self, newdata, wait_for_numpeers=None): def replace(self, newdata):
r = Retrieve(self) r = Retrieve(self)
d = r.retrieve() d = r.retrieve()
d.addCallback(lambda res: self._publish(newdata, wait_for_numpeers=wait_for_numpeers)) d.addCallback(lambda res: self._publish(newdata))
return d return d

View File

@ -21,7 +21,6 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
self._log_number = self._helper.log("CHKUploadHelper starting") self._log_number = self._helper.log("CHKUploadHelper starting")
self._client = helper.parent self._client = helper.parent
self._wait_for_numpeers = None
self._options = {} self._options = {}
self.set_params( (3,7,10) ) # GACK self.set_params( (3,7,10) ) # GACK

View File

@ -75,7 +75,7 @@ class FakeMutableFileNode:
self.client = client self.client = client
self.my_uri = make_mutable_file_uri() self.my_uri = make_mutable_file_uri()
self.storage_index = self.my_uri.storage_index self.storage_index = self.my_uri.storage_index
def create(self, initial_contents, wait_for_numpeers=None): def create(self, initial_contents):
self.all_contents[self.storage_index] = initial_contents self.all_contents[self.storage_index] = initial_contents
return defer.succeed(self) return defer.succeed(self)
def init_from_uri(self, myuri): def init_from_uri(self, myuri):
@ -97,7 +97,7 @@ class FakeMutableFileNode:
def get_size(self): def get_size(self):
return "?" # TODO: see mutable.MutableFileNode.get_size return "?" # TODO: see mutable.MutableFileNode.get_size
def replace(self, new_contents, wait_for_numpeers=None): def replace(self, new_contents):
assert not self.is_readonly() assert not self.is_readonly()
self.all_contents[self.storage_index] = new_contents self.all_contents[self.storage_index] = new_contents
return defer.succeed(None) return defer.succeed(None)

View File

@ -38,7 +38,7 @@ class Marker:
class FakeClient: class FakeClient:
implements(IClient) implements(IClient)
def upload(self, uploadable, wait_for_numpeers): def upload(self, uploadable):
d = uploadable.get_size() d = uploadable.get_size()
d.addCallback(lambda size: uploadable.read(size)) d.addCallback(lambda size: uploadable.read(size))
def _got_data(datav): def _got_data(datav):
@ -55,9 +55,9 @@ class FakeClient:
return FakeDirectoryNode(self).init_from_uri(u) return FakeDirectoryNode(self).init_from_uri(u)
return Marker(u.to_string()) return Marker(u.to_string())
def create_empty_dirnode(self, wait_for_numpeers): def create_empty_dirnode(self):
n = FakeDirectoryNode(self) n = FakeDirectoryNode(self)
d = n.create(wait_for_numpeers) d = n.create()
d.addCallback(lambda res: n) d.addCallback(lambda res: n)
return d return d
@ -67,7 +67,7 @@ class Dirnode(unittest.TestCase, testutil.ShouldFailMixin):
self.client = FakeClient() self.client = FakeClient()
def test_basic(self): def test_basic(self):
d = self.client.create_empty_dirnode(0) d = self.client.create_empty_dirnode()
def _done(res): def _done(res):
self.failUnless(isinstance(res, FakeDirectoryNode)) self.failUnless(isinstance(res, FakeDirectoryNode))
rep = str(res) rep = str(res)
@ -76,7 +76,7 @@ class Dirnode(unittest.TestCase, testutil.ShouldFailMixin):
return d return d
def test_corrupt(self): def test_corrupt(self):
d = self.client.create_empty_dirnode(0) d = self.client.create_empty_dirnode()
def _created(dn): def _created(dn):
u = make_mutable_file_uri() u = make_mutable_file_uri()
d = dn.set_uri("child", u) d = dn.set_uri("child", u)
@ -108,7 +108,7 @@ class Dirnode(unittest.TestCase, testutil.ShouldFailMixin):
return d return d
def test_check(self): def test_check(self):
d = self.client.create_empty_dirnode(0) d = self.client.create_empty_dirnode()
d.addCallback(lambda dn: dn.check()) d.addCallback(lambda dn: dn.check())
def _done(res): def _done(res):
pass pass
@ -120,7 +120,7 @@ class Dirnode(unittest.TestCase, testutil.ShouldFailMixin):
filenode = self.client.create_node_from_uri(fileuri) filenode = self.client.create_node_from_uri(fileuri)
uploadable = upload.Data("some data") uploadable = upload.Data("some data")
d = self.client.create_empty_dirnode(0) d = self.client.create_empty_dirnode()
def _created(rw_dn): def _created(rw_dn):
d2 = rw_dn.set_uri("child", fileuri) d2 = rw_dn.set_uri("child", fileuri)
d2.addCallback(lambda res: rw_dn) d2.addCallback(lambda res: rw_dn)
@ -157,7 +157,7 @@ class Dirnode(unittest.TestCase, testutil.ShouldFailMixin):
def test_create(self): def test_create(self):
self.expected_manifest = [] self.expected_manifest = []
d = self.client.create_empty_dirnode(wait_for_numpeers=1) d = self.client.create_empty_dirnode()
def _then(n): def _then(n):
self.failUnless(n.is_mutable()) self.failUnless(n.is_mutable())
u = n.get_uri() u = n.get_uri()
@ -180,7 +180,7 @@ class Dirnode(unittest.TestCase, testutil.ShouldFailMixin):
self.expected_manifest.append(ffu_v) self.expected_manifest.append(ffu_v)
d.addCallback(lambda res: n.set_uri("child", fake_file_uri)) d.addCallback(lambda res: n.set_uri("child", fake_file_uri))
d.addCallback(lambda res: n.create_empty_directory("subdir", wait_for_numpeers=1)) d.addCallback(lambda res: n.create_empty_directory("subdir"))
def _created(subdir): def _created(subdir):
self.failUnless(isinstance(subdir, FakeDirectoryNode)) self.failUnless(isinstance(subdir, FakeDirectoryNode))
self.subdir = subdir self.subdir = subdir
@ -201,7 +201,7 @@ class Dirnode(unittest.TestCase, testutil.ShouldFailMixin):
d.addCallback(_check_manifest) d.addCallback(_check_manifest)
def _add_subsubdir(res): def _add_subsubdir(res):
return self.subdir.create_empty_directory("subsubdir", wait_for_numpeers=1) return self.subdir.create_empty_directory("subsubdir")
d.addCallback(_add_subsubdir) d.addCallback(_add_subsubdir)
d.addCallback(lambda res: n.get_child_at_path("subdir/subsubdir")) d.addCallback(lambda res: n.get_child_at_path("subdir/subsubdir"))
d.addCallback(lambda subsubdir: d.addCallback(lambda subsubdir:

View File

@ -19,8 +19,8 @@ class FakeFilenode(mutable.MutableFileNode):
all_contents = {} all_contents = {}
all_rw_friends = {} all_rw_friends = {}
def create(self, initial_contents, wait_for_numpeers=None): def create(self, initial_contents):
d = mutable.MutableFileNode.create(self, initial_contents, wait_for_numpeers=None) d = mutable.MutableFileNode.create(self, initial_contents)
def _then(res): def _then(res):
self.all_contents[self.get_uri()] = initial_contents self.all_contents[self.get_uri()] = initial_contents
return res return res
@ -32,7 +32,7 @@ class FakeFilenode(mutable.MutableFileNode):
def _generate_pubprivkeys(self): def _generate_pubprivkeys(self):
count = self.counter.next() count = self.counter.next()
return FakePubKey(count), FakePrivKey(count) return FakePubKey(count), FakePrivKey(count)
def _publish(self, initial_contents, wait_for_numpeers): def _publish(self, initial_contents):
self.all_contents[self.get_uri()] = initial_contents self.all_contents[self.get_uri()] = initial_contents
return defer.succeed(self) return defer.succeed(self)
@ -42,7 +42,7 @@ class FakeFilenode(mutable.MutableFileNode):
return defer.succeed(self.all_contents[self.all_rw_friends[self.get_uri()]]) return defer.succeed(self.all_contents[self.all_rw_friends[self.get_uri()]])
else: else:
return defer.succeed(self.all_contents[self.get_uri()]) return defer.succeed(self.all_contents[self.get_uri()])
def replace(self, newdata, wait_for_numpeers=None): def replace(self, newdata):
self.all_contents[self.get_uri()] = newdata self.all_contents[self.get_uri()] = newdata
return defer.succeed(None) return defer.succeed(None)
@ -69,16 +69,11 @@ class FakePublish(mutable.Publish):
class FakeNewDirectoryNode(dirnode.NewDirectoryNode): class FakeNewDirectoryNode(dirnode.NewDirectoryNode):
filenode_class = FakeFilenode filenode_class = FakeFilenode
class FakeIntroducerClient:
def when_enough_peers(self, numpeers):
return defer.succeed(None)
class FakeClient: class FakeClient:
def __init__(self, num_peers=10): def __init__(self, num_peers=10):
self._num_peers = num_peers self._num_peers = num_peers
self._peerids = [tagged_hash("peerid", "%d" % i)[:20] self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(self._num_peers)] for i in range(self._num_peers)]
self.introducer_client = FakeIntroducerClient()
self.nodeid = "fakenodeid" self.nodeid = "fakenodeid"
def log(self, msg, **kw): def log(self, msg, **kw):
@ -89,18 +84,18 @@ class FakeClient:
def get_cancel_secret(self): def get_cancel_secret(self):
return "I hereby permit you to cancel my leases" return "I hereby permit you to cancel my leases"
def create_empty_dirnode(self, wait_for_numpeers): def create_empty_dirnode(self):
n = FakeNewDirectoryNode(self) n = FakeNewDirectoryNode(self)
d = n.create(wait_for_numpeers=wait_for_numpeers) d = n.create()
d.addCallback(lambda res: n) d.addCallback(lambda res: n)
return d return d
def create_dirnode_from_uri(self, u): def create_dirnode_from_uri(self, u):
return FakeNewDirectoryNode(self).init_from_uri(u) return FakeNewDirectoryNode(self).init_from_uri(u)
def create_mutable_file(self, contents="", wait_for_numpeers=None): def create_mutable_file(self, contents=""):
n = FakeFilenode(self) n = FakeFilenode(self)
d = n.create(contents, wait_for_numpeers=wait_for_numpeers) d = n.create(contents)
d.addCallback(lambda res: n) d.addCallback(lambda res: n)
return d return d
@ -131,7 +126,7 @@ class FakeClient:
results.sort() results.sort()
return results return results
def upload(self, uploadable, wait_for_numpeers=None): def upload(self, uploadable):
assert IUploadable.providedBy(uploadable) assert IUploadable.providedBy(uploadable)
d = uploadable.get_size() d = uploadable.get_size()
d.addCallback(lambda length: uploadable.read(length)) d.addCallback(lambda length: uploadable.read(length))
@ -148,7 +143,7 @@ class Filenode(unittest.TestCase):
self.client = FakeClient() self.client = FakeClient()
def test_create(self): def test_create(self):
d = self.client.create_mutable_file(wait_for_numpeers=1) d = self.client.create_mutable_file()
def _created(n): def _created(n):
d = n.replace("contents 1") d = n.replace("contents 1")
d.addCallback(lambda res: self.failUnlessIdentical(res, None)) d.addCallback(lambda res: self.failUnlessIdentical(res, None))
@ -181,7 +176,7 @@ class Publish(unittest.TestCase):
# .create usually returns a Deferred, but we happen to know it's # .create usually returns a Deferred, but we happen to know it's
# synchronous # synchronous
CONTENTS = "some initial contents" CONTENTS = "some initial contents"
fn.create(CONTENTS, wait_for_numpeers=1) fn.create(CONTENTS)
p = mutable.Publish(fn) p = mutable.Publish(fn)
target_info = None target_info = None
d = defer.maybeDeferred(p._encrypt_and_encode, target_info, d = defer.maybeDeferred(p._encrypt_and_encode, target_info,
@ -208,7 +203,7 @@ class Publish(unittest.TestCase):
# .create usually returns a Deferred, but we happen to know it's # .create usually returns a Deferred, but we happen to know it's
# synchronous # synchronous
CONTENTS = "some initial contents" CONTENTS = "some initial contents"
fn.create(CONTENTS, wait_for_numpeers=1) fn.create(CONTENTS)
p = mutable.Publish(fn) p = mutable.Publish(fn)
r = mutable.Retrieve(fn) r = mutable.Retrieve(fn)
# make some fake shares # make some fake shares

View File

@ -351,7 +351,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
def _create_mutable(res): def _create_mutable(res):
c = self.clients[0] c = self.clients[0]
log.msg("starting create_mutable_file") log.msg("starting create_mutable_file")
d1 = c.create_mutable_file(DATA, wait_for_numpeers=self.numclients) d1 = c.create_mutable_file(DATA)
def _done(res): def _done(res):
log.msg("DONE: %s" % (res,)) log.msg("DONE: %s" % (res,))
self._mutable_node_1 = res self._mutable_node_1 = res
@ -444,7 +444,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
self.failUnlessEqual(res, DATA) self.failUnlessEqual(res, DATA)
# replace the data # replace the data
log.msg("starting replace1") log.msg("starting replace1")
d1 = newnode.replace(NEWDATA, wait_for_numpeers=self.numclients) d1 = newnode.replace(NEWDATA)
d1.addCallback(lambda res: newnode.download_to_data()) d1.addCallback(lambda res: newnode.download_to_data())
return d1 return d1
d.addCallback(_check_download_3) d.addCallback(_check_download_3)
@ -458,7 +458,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
newnode2 = self.clients[3].create_node_from_uri(uri) newnode2 = self.clients[3].create_node_from_uri(uri)
self._newnode3 = self.clients[3].create_node_from_uri(uri) self._newnode3 = self.clients[3].create_node_from_uri(uri)
log.msg("starting replace2") log.msg("starting replace2")
d1 = newnode1.replace(NEWERDATA, wait_for_numpeers=self.numclients) d1 = newnode1.replace(NEWERDATA)
d1.addCallback(lambda res: newnode2.download_to_data()) d1.addCallback(lambda res: newnode2.download_to_data())
return d1 return d1
d.addCallback(_check_download_4) d.addCallback(_check_download_4)
@ -528,20 +528,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
def _check_empty_file(res): def _check_empty_file(res):
# make sure we can create empty files, this usually screws up the # make sure we can create empty files, this usually screws up the
# segsize math # segsize math
d1 = self.clients[2].create_mutable_file("", wait_for_numpeers=self.numclients) d1 = self.clients[2].create_mutable_file("")
d1.addCallback(lambda newnode: newnode.download_to_data()) d1.addCallback(lambda newnode: newnode.download_to_data())
d1.addCallback(lambda res: self.failUnlessEqual("", res)) d1.addCallback(lambda res: self.failUnlessEqual("", res))
return d1 return d1
d.addCallback(_check_empty_file) d.addCallback(_check_empty_file)
d.addCallback(lambda res: self.clients[0].create_empty_dirnode(wait_for_numpeers=self.numclients)) d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
def _created_dirnode(dnode): def _created_dirnode(dnode):
log.msg("_created_dirnode(%s)" % (dnode,)) log.msg("_created_dirnode(%s)" % (dnode,))
d1 = dnode.list() d1 = dnode.list()
d1.addCallback(lambda children: self.failUnlessEqual(children, {})) d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
d1.addCallback(lambda res: dnode.has_child("edgar")) d1.addCallback(lambda res: dnode.has_child("edgar"))
d1.addCallback(lambda answer: self.failUnlessEqual(answer, False)) d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
d1.addCallback(lambda res: dnode.set_node("see recursive", dnode, wait_for_numpeers=self.numclients)) d1.addCallback(lambda res: dnode.set_node("see recursive", dnode))
d1.addCallback(lambda res: dnode.has_child("see recursive")) d1.addCallback(lambda res: dnode.has_child("see recursive"))
d1.addCallback(lambda answer: self.failUnlessEqual(answer, True)) d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
d1.addCallback(lambda res: dnode.build_manifest()) d1.addCallback(lambda res: dnode.build_manifest())
@ -624,15 +624,15 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
def _do_publish1(self, res): def _do_publish1(self, res):
ut = upload.Data(self.data) ut = upload.Data(self.data)
c0 = self.clients[0] c0 = self.clients[0]
d = c0.create_empty_dirnode(wait_for_numpeers=self.numclients) d = c0.create_empty_dirnode()
def _made_root(new_dirnode): def _made_root(new_dirnode):
self._root_directory_uri = new_dirnode.get_uri() self._root_directory_uri = new_dirnode.get_uri()
return c0.create_node_from_uri(self._root_directory_uri) return c0.create_node_from_uri(self._root_directory_uri)
d.addCallback(_made_root) d.addCallback(_made_root)
d.addCallback(lambda root: root.create_empty_directory("subdir1", wait_for_numpeers=self.numclients)) d.addCallback(lambda root: root.create_empty_directory("subdir1"))
def _made_subdir1(subdir1_node): def _made_subdir1(subdir1_node):
self._subdir1_node = subdir1_node self._subdir1_node = subdir1_node
d1 = subdir1_node.add_file("mydata567", ut, wait_for_numpeers=self.numclients) d1 = subdir1_node.add_file("mydata567", ut)
d1.addCallback(self.log, "publish finished") d1.addCallback(self.log, "publish finished")
def _stash_uri(filenode): def _stash_uri(filenode):
self.uri = filenode.get_uri() self.uri = filenode.get_uri()
@ -643,8 +643,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
def _do_publish2(self, res): def _do_publish2(self, res):
ut = upload.Data(self.data) ut = upload.Data(self.data)
d = self._subdir1_node.create_empty_directory("subdir2", wait_for_numpeers=self.numclients) d = self._subdir1_node.create_empty_directory("subdir2")
d.addCallback(lambda subdir2: subdir2.add_file("mydata992", ut, wait_for_numpeers=self.numclients)) d.addCallback(lambda subdir2: subdir2.add_file("mydata992", ut))
return d return d
def _bounce_client0(self, res): def _bounce_client0(self, res):
@ -686,18 +686,18 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
def _do_publish_private(self, res): def _do_publish_private(self, res):
self.smalldata = "sssh, very secret stuff" self.smalldata = "sssh, very secret stuff"
ut = upload.Data(self.smalldata) ut = upload.Data(self.smalldata)
d = self.clients[0].create_empty_dirnode(wait_for_numpeers=self.numclients) d = self.clients[0].create_empty_dirnode()
d.addCallback(self.log, "GOT private directory") d.addCallback(self.log, "GOT private directory")
def _got_new_dir(privnode): def _got_new_dir(privnode):
rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri) rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
d1 = privnode.create_empty_directory("personal", wait_for_numpeers=self.numclients) d1 = privnode.create_empty_directory("personal")
d1.addCallback(self.log, "made P/personal") d1.addCallback(self.log, "made P/personal")
d1.addCallback(lambda node: node.add_file("sekrit data", ut, wait_for_numpeers=self.numclients)) d1.addCallback(lambda node: node.add_file("sekrit data", ut))
d1.addCallback(self.log, "made P/personal/sekrit data") d1.addCallback(self.log, "made P/personal/sekrit data")
d1.addCallback(lambda res: rootnode.get_child_at_path(["subdir1", "subdir2"])) d1.addCallback(lambda res: rootnode.get_child_at_path(["subdir1", "subdir2"]))
def _got_s2(s2node): def _got_s2(s2node):
d2 = privnode.set_uri("s2-rw", s2node.get_uri(), wait_for_numpeers=self.numclients) d2 = privnode.set_uri("s2-rw", s2node.get_uri())
d2.addCallback(lambda node: privnode.set_uri("s2-ro", s2node.get_readonly_uri(), wait_for_numpeers=self.numclients)) d2.addCallback(lambda node: privnode.set_uri("s2-ro", s2node.get_readonly_uri()))
return d2 return d2
d1.addCallback(_got_s2) d1.addCallback(_got_s2)
d1.addCallback(lambda res: privnode) d1.addCallback(lambda res: privnode)

View File

@ -131,15 +131,10 @@ class FakeBucketWriter:
precondition(not self.closed) precondition(not self.closed)
self.closed = True self.closed = True
class FakeIntroducerClient:
def when_enough_peers(self, numpeers):
return defer.succeed(None)
class FakeClient: class FakeClient:
def __init__(self, mode="good", num_servers=50): def __init__(self, mode="good", num_servers=50):
self.mode = mode self.mode = mode
self.num_servers = num_servers self.num_servers = num_servers
self.introducer_client = FakeIntroducerClient()
def log(self, *args, **kwargs): def log(self, *args, **kwargs):
pass pass
def get_permuted_peers(self, storage_index, include_myself): def get_permuted_peers(self, storage_index, include_myself):

View File

@ -38,17 +38,17 @@ class FakeClient(service.MultiService):
assert IMutableFileURI.providedBy(u), u assert IMutableFileURI.providedBy(u), u
return FakeMutableFileNode(self).init_from_uri(u) return FakeMutableFileNode(self).init_from_uri(u)
def create_empty_dirnode(self, wait_for_numpeers=None): def create_empty_dirnode(self):
n = NonGridDirectoryNode(self) n = NonGridDirectoryNode(self)
d = n.create(wait_for_numpeers) d = n.create()
d.addCallback(lambda res: n) d.addCallback(lambda res: n)
return d return d
def create_mutable_file(self, contents="", wait_for_numpeers=None): def create_mutable_file(self, contents=""):
n = FakeMutableFileNode(self) n = FakeMutableFileNode(self)
return n.create(contents) return n.create(contents)
def upload(self, uploadable, wait_for_numpeers=None): def upload(self, uploadable):
d = uploadable.get_size() d = uploadable.get_size()
d.addCallback(lambda size: uploadable.read(size)) d.addCallback(lambda size: uploadable.read(size))
def _got_data(datav): def _got_data(datav):

View File

@ -419,10 +419,8 @@ class EncryptAnUploadable:
class CHKUploader: class CHKUploader:
peer_selector_class = Tahoe2PeerSelector peer_selector_class = Tahoe2PeerSelector
def __init__(self, client, options={}, wait_for_numpeers=None): def __init__(self, client, options={}):
assert wait_for_numpeers is None or isinstance(wait_for_numpeers, int), wait_for_numpeers
self._client = client self._client = client
self._wait_for_numpeers = wait_for_numpeers
self._options = options self._options = options
self._log_number = self._client.log("CHKUploader starting") self._log_number = self._client.log("CHKUploader starting")
@ -525,7 +523,7 @@ def read_this_many_bytes(uploadable, size, prepend_data=[]):
class LiteralUploader: class LiteralUploader:
def __init__(self, client, wait_for_numpeers, options={}): def __init__(self, client, options={}):
self._client = client self._client = client
self._options = options self._options = options
@ -738,8 +736,7 @@ class Uploader(service.MultiService):
def _got_helper(self, helper): def _got_helper(self, helper):
self._helper = helper self._helper = helper
def upload(self, uploadable, options={}, wait_for_numpeers=None): def upload(self, uploadable, options={}):
assert wait_for_numpeers is None or isinstance(wait_for_numpeers, int), wait_for_numpeers
# this returns the URI # this returns the URI
assert self.parent assert self.parent
assert self.running assert self.running
@ -751,13 +748,11 @@ class Uploader(service.MultiService):
d = uploadable.get_size() d = uploadable.get_size()
def _got_size(size): def _got_size(size):
if size <= self.URI_LIT_SIZE_THRESHOLD: if size <= self.URI_LIT_SIZE_THRESHOLD:
uploader = LiteralUploader(self.parent, options, uploader = LiteralUploader(self.parent, options)
wait_for_numpeers)
elif self._helper: elif self._helper:
uploader = AssistedUploader(self._helper, options) uploader = AssistedUploader(self._helper, options)
else: else:
uploader = self.uploader_class(self.parent, options, uploader = self.uploader_class(self.parent, options)
wait_for_numpeers)
uploader.set_params(self.parent.get_encoding_parameters() uploader.set_params(self.parent.get_encoding_parameters()
or self.DEFAULT_ENCODING_PARAMETERS) or self.DEFAULT_ENCODING_PARAMETERS)
return uploader.start(uploadable) return uploader.start(uploadable)