diff --git a/src/allmydata/client.py b/src/allmydata/client.py index fc7b092b0..21c66f3ae 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -150,6 +150,11 @@ class Client(node.Node, Referenceable): results.sort() return results + def get_encoding_parameters(self): + if not self.introducer_client: + return None + return self.introducer_client.encoding_parameters + def connected_to_introducer(self): if self.introducer_client: return self.introducer_client.connected_to_introducer() diff --git a/src/allmydata/codec.py b/src/allmydata/codec.py index 324e6a18d..6ef84a34b 100644 --- a/src/allmydata/codec.py +++ b/src/allmydata/codec.py @@ -113,9 +113,13 @@ class CRSDecoder(object): return self.required_shares def decode(self, some_shares, their_shareids): - precondition(len(some_shares) == len(their_shareids), len(some_shares), len(their_shareids)) - precondition(len(some_shares) == self.required_shares, len(some_shares), self.required_shares) - return defer.succeed(self.decoder.decode(some_shares, [int(s) for s in their_shareids])) + precondition(len(some_shares) == len(their_shareids), + len(some_shares), len(their_shareids)) + precondition(len(some_shares) == self.required_shares, + len(some_shares), self.required_shares) + data = self.decoder.decode(some_shares, + [int(s) for s in their_shareids]) + return defer.succeed(data) all_encoders = { diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index feb78992f..830b5b63b 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -88,6 +88,12 @@ class Encoder(object): self.TOTAL_SHARES = n self.uri_extension_data = {} + def set_params(self, encoding_parameters): + k,d,n = encoding_parameters + self.NEEDED_SHARES = k + self.SHARES_OF_HAPPINESS = d + self.TOTAL_SHARES = n + def setup(self, infile, encryption_key): self.infile = infile assert isinstance(encryption_key, str) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 1f95fa829..8d0afac25 100644 --- 
a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -20,6 +20,22 @@ URIExtensionData = StringConstraint(1000) class RIIntroducerClient(RemoteInterface): def new_peers(furls=SetOf(FURL)): return None + def set_encoding_parameters(parameters=(int, int, int)): + """Advise the client of the recommended k-of-n encoding parameters + for this grid. 'parameters' is a tuple of (k, desired, n), where 'n' + is the total number of shares that will be created for any given + file, while 'k' is the number of shares that must be retrieved to + recover that file, and 'desired' is the minimum number of shares that + must be placed before the uploader will consider its job a success. + n/k is the expansion ratio, while k determines the robustness. + + Introducers should specify 'n' according to the expected size of the + grid (there is no point to producing more shares than there are + peers), and k according to the desired reliability-vs-overhead goals. + + Note that the current encoding technology requires k>=2. 
+ """ + return None class RIIntroducer(RemoteInterface): def hello(node=RIIntroducerClient, furl=FURL): diff --git a/src/allmydata/introducer.py b/src/allmydata/introducer.py index ccda1e71b..668be165b 100644 --- a/src/allmydata/introducer.py +++ b/src/allmydata/introducer.py @@ -9,11 +9,16 @@ from allmydata.util import idlib, observer class Introducer(service.MultiService, Referenceable): implements(RIIntroducer) + name = "introducer" def __init__(self): service.MultiService.__init__(self) self.nodes = set() self.furls = set() + self._encoding_parameters = None + + def set_encoding_parameters(self, parameters): + self._encoding_parameters = parameters def remote_hello(self, node, furl): log.msg("introducer: new contact at %s, node is %s" % (furl, node)) @@ -24,6 +29,9 @@ class Introducer(service.MultiService, Referenceable): node.notifyOnDisconnect(_remove) self.furls.add(furl) node.callRemote("new_peers", self.furls) + if self._encoding_parameters is not None: + node.callRemote("set_encoding_parameters", + self._encoding_parameters) for othernode in self.nodes: othernode.callRemote("new_peers", set([furl])) self.nodes.add(node) @@ -42,6 +50,7 @@ class IntroducerClient(service.Service, Referenceable): self._connected = False self.connection_observers = observer.ObserverList() + self.encoding_parameters = None def startService(self): service.Service.startService(self) @@ -60,6 +69,9 @@ class IntroducerClient(service.Service, Referenceable): for furl in furls: self._new_peer(furl) + def remote_set_encoding_parameters(self, parameters): + self.encoding_parameters = parameters + def stopService(self): service.Service.stopService(self) self.introducer_reconnector.stopConnecting() diff --git a/src/allmydata/introducer_and_vdrive.py b/src/allmydata/introducer_and_vdrive.py index 65995be6a..27e838351 100644 --- a/src/allmydata/introducer_and_vdrive.py +++ b/src/allmydata/introducer_and_vdrive.py @@ -9,13 +9,17 @@ class IntroducerAndVdrive(node.Node): PORTNUMFILE = 
"introducer.port" NODETYPE = "introducer" VDRIVEDIR = "vdrive" + ENCODING_PARAMETERS_FILE = "encoding_parameters" + DEFAULT_K, DEFAULT_DESIRED, DEFAULT_N = 3, 7, 10 def __init__(self, basedir="."): node.Node.__init__(self, basedir) self.urls = {} + self.read_encoding_parameters() def tub_ready(self): - r = self.add_service(Introducer()) + i = Introducer() + r = self.add_service(i) self.urls["introducer"] = self.tub.registerReference(r, "introducer") self.log(" introducer is at %s" % self.urls["introducer"]) f = open(os.path.join(self.basedir, "introducer.furl"), "w") @@ -32,3 +36,17 @@ class IntroducerAndVdrive(node.Node): f.write(self.urls["vdrive"] + "\n") f.close() + encoding_parameters = self.read_encoding_parameters() + i.set_encoding_parameters(encoding_parameters) + + def read_encoding_parameters(self): + k, desired, n = self.DEFAULT_K, self.DEFAULT_DESIRED, self.DEFAULT_N + PARAM_FILE = os.path.join(self.basedir, self.ENCODING_PARAMETERS_FILE) + if os.path.exists(PARAM_FILE): + f = open(PARAM_FILE, "r") + data = f.read().strip() + f.close() + k,desired,n = data.split() + k = int(k); desired = int(desired); n = int(n) + return k, desired, n + diff --git a/src/allmydata/test/test_introducer_and_vdrive.py b/src/allmydata/test/test_introducer_and_vdrive.py index c85d20aee..75eea86de 100644 --- a/src/allmydata/test/test_introducer_and_vdrive.py +++ b/src/allmydata/test/test_introducer_and_vdrive.py @@ -1,4 +1,5 @@ +import os from twisted.trial import unittest from foolscap.eventual import fireEventually, flushEventualQueue @@ -7,9 +8,34 @@ from allmydata.util import testutil class Basic(testutil.SignalMixin, unittest.TestCase): def test_loadable(self): - q = introducer_and_vdrive.IntroducerAndVdrive() + basedir = "introducer_and_vdrive.Basic.test_loadable" + os.mkdir(basedir) + q = introducer_and_vdrive.IntroducerAndVdrive(basedir) d = fireEventually(None) d.addCallback(lambda res: q.startService()) + d.addCallback(lambda res: q.when_tub_ready()) + def 
_check_parameters(res): + i = q.getServiceNamed("introducer") + self.failUnlessEqual(i._encoding_parameters, (3, 7, 10)) + d.addCallback(_check_parameters) + d.addCallback(lambda res: q.stopService()) + d.addCallback(flushEventualQueue) + return d + + def test_set_parameters(self): + basedir = "introducer_and_vdrive.Basic.test_set_parameters" + os.mkdir(basedir) + f = open(os.path.join(basedir, "encoding_parameters"), "w") + f.write("25 75 100") + f.close() + q = introducer_and_vdrive.IntroducerAndVdrive(basedir) + d = fireEventually(None) + d.addCallback(lambda res: q.startService()) + d.addCallback(lambda res: q.when_tub_ready()) + def _check_parameters(res): + i = q.getServiceNamed("introducer") + self.failUnlessEqual(i._encoding_parameters, (25, 75, 100)) + d.addCallback(_check_parameters) d.addCallback(lambda res: q.stopService()) d.addCallback(flushEventualQueue) return d diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 066746cce..45bb9e6ab 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -573,8 +573,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): self.failUnless("size: %d\n" % len(self.data) in output) self.failUnless("num_segments: 1\n" in output) # segment_size is always a multiple of needed_shares - self.failUnless("segment_size: 125\n" in output) - self.failUnless("total_shares: 100\n" in output) + self.failUnless("segment_size: 114\n" in output) + self.failUnless("total_shares: 10\n" in output) # keys which are supposed to be present for key in ("size", "num_segments", "segment_size", "needed_shares", "total_shares", diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 43548831b..6b508513a 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -14,6 +14,8 @@ class FakeClient: def get_permuted_peers(self, storage_index): return [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),) for 
fakeid in range(50) ] + def get_encoding_parameters(self): + return None DATA = """ Once upon a time, there was a beautiful princess named Buttercup. She lived diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 7b5a6b060..33afc0a23 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -65,7 +65,10 @@ class FileUploader: self._client = client self._options = options - def set_params(self, needed_shares, shares_of_happiness, total_shares): + def set_params(self, encoding_parameters): + self._encoding_parameters = encoding_parameters + + needed_shares, shares_of_happiness, total_shares = encoding_parameters self.needed_shares = needed_shares self.shares_of_happiness = shares_of_happiness self.total_shares = total_shares @@ -111,6 +114,7 @@ class FileUploader: def setup_encoder(self): self._encoder = encode.Encoder(self._options) + self._encoder.set_params(self._encoding_parameters) self._encoder.setup(self._filehandle, self._encryption_key) share_size = self._encoder.get_share_size() block_size = self._encoder.get_block_size() @@ -313,9 +317,12 @@ class Uploader(service.MultiService): uploader_class = FileUploader URI_LIT_SIZE_THRESHOLD = 55 - needed_shares = 25 # Number of shares required to reconstruct a file. - desired_shares = 75 # We will abort an upload unless we can allocate space for at least this many. - total_shares = 100 # Total number of shares created by encoding. If everybody has room then this is is how many we will upload. + DEFAULT_ENCODING_PARAMETERS = (25, 75, 100) + # this is a tuple of (needed, desired, total). 'needed' is the number of + # shares required to reconstruct a file. 'desired' means that we will + # abort an upload unless we can allocate space for at least this many. + # 'total' is the total number of shares created by encoding. If everybody + # has room then this is how many we will upload. 
def compute_id_strings(self, f): # return a list of (plaintext_hash, encryptionkey, crypttext_hash) @@ -366,8 +373,10 @@ class Uploader(service.MultiService): else: u = self.uploader_class(self.parent, options) u.set_filehandle(fh) - u.set_params(self.needed_shares, self.desired_shares, - self.total_shares) + encoding_parameters = self.parent.get_encoding_parameters() + if not encoding_parameters: + encoding_parameters = self.DEFAULT_ENCODING_PARAMETERS + u.set_params(encoding_parameters) plaintext_hash, key, crypttext_hash = self.compute_id_strings(fh) u.set_encryption_key(key) u.set_id_strings(crypttext_hash, plaintext_hash)