mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-06-19 07:48:11 +00:00
#96: add flag to enable pushing data to ourselves, defaulting to False
This commit is contained in:
@ -1,5 +1,6 @@
|
|||||||
|
|
||||||
import os, sha, stat, time, re
|
import os, sha, stat, time, re
|
||||||
|
from base64 import b32decode
|
||||||
from foolscap import Referenceable, SturdyRef
|
from foolscap import Referenceable, SturdyRef
|
||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
from allmydata.interfaces import RIClient
|
from allmydata.interfaces import RIClient
|
||||||
@ -29,6 +30,7 @@ class Client(node.Node, Referenceable):
|
|||||||
MY_FURL_FILE = "myself.furl"
|
MY_FURL_FILE = "myself.furl"
|
||||||
SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
|
SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
|
||||||
SIZELIMIT_FILE = "sizelimit"
|
SIZELIMIT_FILE = "sizelimit"
|
||||||
|
PUSH_TO_OURSELVES_FILE = "push_to_ourselves"
|
||||||
|
|
||||||
# we're pretty narrow-minded right now
|
# we're pretty narrow-minded right now
|
||||||
OLDEST_SUPPORTED_VERSION = allmydata.__version__
|
OLDEST_SUPPORTED_VERSION = allmydata.__version__
|
||||||
@ -38,6 +40,7 @@ class Client(node.Node, Referenceable):
|
|||||||
self.my_furl = None
|
self.my_furl = None
|
||||||
self.introducer_client = None
|
self.introducer_client = None
|
||||||
self.init_storage()
|
self.init_storage()
|
||||||
|
self.init_options()
|
||||||
self.add_service(Uploader())
|
self.add_service(Uploader())
|
||||||
self.add_service(Downloader())
|
self.add_service(Downloader())
|
||||||
self.add_service(VirtualDrive())
|
self.add_service(VirtualDrive())
|
||||||
@ -87,6 +90,12 @@ class Client(node.Node, Referenceable):
|
|||||||
no_storage = os.path.exists(NOSTORAGE_FILE)
|
no_storage = os.path.exists(NOSTORAGE_FILE)
|
||||||
self.add_service(StorageServer(storedir, sizelimit, no_storage))
|
self.add_service(StorageServer(storedir, sizelimit, no_storage))
|
||||||
|
|
||||||
|
def init_options(self):
|
||||||
|
self.push_to_ourselves = None
|
||||||
|
filename = os.path.join(self.basedir, self.PUSH_TO_OURSELVES_FILE)
|
||||||
|
if os.path.exists(filename):
|
||||||
|
self.push_to_ourselves = True
|
||||||
|
|
||||||
def _check_hotline(self, hotline_file):
|
def _check_hotline(self, hotline_file):
|
||||||
if os.path.exists(hotline_file):
|
if os.path.exists(hotline_file):
|
||||||
mtime = os.stat(hotline_file)[stat.ST_MTIME]
|
mtime = os.stat(hotline_file)[stat.ST_MTIME]
|
||||||
@ -141,18 +150,25 @@ class Client(node.Node, Referenceable):
|
|||||||
return []
|
return []
|
||||||
return self.introducer_client.get_all_peerids()
|
return self.introducer_client.get_all_peerids()
|
||||||
|
|
||||||
def get_permuted_peers(self, key):
|
def get_permuted_peers(self, key, include_myself=True):
|
||||||
"""
|
"""
|
||||||
@return: list of (permuted-peerid, peerid, connection,)
|
@return: list of (permuted-peerid, peerid, connection,)
|
||||||
"""
|
"""
|
||||||
results = []
|
results = []
|
||||||
|
myid = b32decode(self.tub.tubID.upper())
|
||||||
for peerid, connection in self.introducer_client.get_all_peers():
|
for peerid, connection in self.introducer_client.get_all_peers():
|
||||||
assert isinstance(peerid, str)
|
assert isinstance(peerid, str)
|
||||||
|
if not include_myself and peerid == myid:
|
||||||
|
self.log("get_permuted_peers: removing myself from the list")
|
||||||
|
continue
|
||||||
permuted = bytes_to_long(sha.new(key + peerid).digest())
|
permuted = bytes_to_long(sha.new(key + peerid).digest())
|
||||||
results.append((permuted, peerid, connection))
|
results.append((permuted, peerid, connection))
|
||||||
results.sort()
|
results.sort()
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
def get_push_to_ourselves(self):
|
||||||
|
return self.push_to_ourselves
|
||||||
|
|
||||||
def get_encoding_parameters(self):
|
def get_encoding_parameters(self):
|
||||||
if not self.introducer_client:
|
if not self.introducer_client:
|
||||||
return None
|
return None
|
||||||
|
@ -112,8 +112,8 @@ class IntroducerClient(service.Service, Referenceable):
|
|||||||
self.log(" introducing ourselves: %s, %s" % (self, self.my_furl))
|
self.log(" introducing ourselves: %s, %s" % (self, self.my_furl))
|
||||||
self._connected = True
|
self._connected = True
|
||||||
d = introducer.callRemote("hello",
|
d = introducer.callRemote("hello",
|
||||||
node=self,
|
node=self,
|
||||||
furl=self.my_furl)
|
furl=self.my_furl)
|
||||||
introducer.notifyOnDisconnect(self._disconnected)
|
introducer.notifyOnDisconnect(self._disconnected)
|
||||||
|
|
||||||
def _disconnected(self):
|
def _disconnected(self):
|
||||||
|
@ -36,6 +36,8 @@ class Node(service.MultiService):
|
|||||||
self.tub = Tub(certFile=certfile)
|
self.tub = Tub(certFile=certfile)
|
||||||
self.tub.setOption("logLocalFailures", True)
|
self.tub.setOption("logLocalFailures", True)
|
||||||
self.tub.setOption("logRemoteFailures", True)
|
self.tub.setOption("logRemoteFailures", True)
|
||||||
|
# I think self.nodeid is kind of whacked. Shouldn't it equal the
|
||||||
|
# fingerprint portion of our furl?
|
||||||
self.nodeid = b32encode(self.tub.tubID).lower()
|
self.nodeid = b32encode(self.tub.tubID).lower()
|
||||||
f = open(os.path.join(self.basedir, self.NODEIDFILE), "w")
|
f = open(os.path.join(self.basedir, self.NODEIDFILE), "w")
|
||||||
f.write(b32encode(self.nodeid).lower() + "\n")
|
f.write(b32encode(self.nodeid).lower() + "\n")
|
||||||
|
@ -161,6 +161,10 @@ this file are ignored.
|
|||||||
f = open(os.path.join(clientdir, "debug_no_storage"), "w")
|
f = open(os.path.join(clientdir, "debug_no_storage"), "w")
|
||||||
f.write("no_storage\n")
|
f.write("no_storage\n")
|
||||||
f.close()
|
f.close()
|
||||||
|
if self.mode == "upload-self":
|
||||||
|
f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
|
||||||
|
f.write("push_to_ourselves\n")
|
||||||
|
f.close()
|
||||||
self.keepalive_file = os.path.join(clientdir,
|
self.keepalive_file = os.path.join(clientdir,
|
||||||
"suicide_prevention_hotline")
|
"suicide_prevention_hotline")
|
||||||
# now start updating the mtime.
|
# now start updating the mtime.
|
||||||
@ -273,7 +277,7 @@ this file are ignored.
|
|||||||
name = '%d' % size
|
name = '%d' % size
|
||||||
print
|
print
|
||||||
print "uploading %s" % name
|
print "uploading %s" % name
|
||||||
if self.mode == "upload":
|
if self.mode in ("upload", "upload-self"):
|
||||||
files[name] = self.create_data(name, size)
|
files[name] = self.create_data(name, size)
|
||||||
d = control.callRemote("upload_from_file_to_uri", files[name])
|
d = control.callRemote("upload_from_file_to_uri", files[name])
|
||||||
def _done(uri):
|
def _done(uri):
|
||||||
|
@ -129,9 +129,11 @@ class FakeBucketWriter:
|
|||||||
class FakeClient:
|
class FakeClient:
|
||||||
def __init__(self, mode="good"):
|
def __init__(self, mode="good"):
|
||||||
self.mode = mode
|
self.mode = mode
|
||||||
def get_permuted_peers(self, storage_index):
|
def get_permuted_peers(self, storage_index, include_myself):
|
||||||
return [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
|
return [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
|
||||||
for fakeid in range(50) ]
|
for fakeid in range(50) ]
|
||||||
|
def get_push_to_ourselves(self):
|
||||||
|
return None
|
||||||
def get_encoding_parameters(self):
|
def get_encoding_parameters(self):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -88,7 +88,8 @@ class Tahoe3PeerSelector:
|
|||||||
|
|
||||||
def get_shareholders(self, client,
|
def get_shareholders(self, client,
|
||||||
storage_index, share_size, block_size,
|
storage_index, share_size, block_size,
|
||||||
num_segments, total_shares, shares_of_happiness):
|
num_segments, total_shares, shares_of_happiness,
|
||||||
|
push_to_ourselves):
|
||||||
"""
|
"""
|
||||||
@return: a set of PeerTracker instances that have agreed to hold some
|
@return: a set of PeerTracker instances that have agreed to hold some
|
||||||
shares for us
|
shares for us
|
||||||
@ -99,8 +100,9 @@ class Tahoe3PeerSelector:
|
|||||||
|
|
||||||
# we are responsible for locating the shareholders. self._encoder is
|
# we are responsible for locating the shareholders. self._encoder is
|
||||||
# responsible for handling the data and sending out the shares.
|
# responsible for handling the data and sending out the shares.
|
||||||
peers = client.get_permuted_peers(storage_index)
|
peers = client.get_permuted_peers(storage_index, push_to_ourselves)
|
||||||
assert peers
|
|
||||||
|
assert peers, "peer selection left us with zero peers for our data"
|
||||||
|
|
||||||
# this needed_hashes computation should mirror
|
# this needed_hashes computation should mirror
|
||||||
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
|
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
|
||||||
@ -398,10 +400,11 @@ class CHKUploader:
|
|||||||
block_size = encoder.get_param("block_size")
|
block_size = encoder.get_param("block_size")
|
||||||
num_segments = encoder.get_param("num_segments")
|
num_segments = encoder.get_param("num_segments")
|
||||||
k,desired,n = encoder.get_param("share_counts")
|
k,desired,n = encoder.get_param("share_counts")
|
||||||
|
push_to_ourselves = self._options.get("push_to_ourselves", False)
|
||||||
|
|
||||||
gs = peer_selector.get_shareholders
|
gs = peer_selector.get_shareholders
|
||||||
d = gs(self._client, storage_index, share_size, block_size,
|
d = gs(self._client, storage_index, share_size, block_size,
|
||||||
num_segments, n, desired)
|
num_segments, n, desired, push_to_ourselves)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def set_shareholders(self, used_peers, encoder):
|
def set_shareholders(self, used_peers, encoder):
|
||||||
@ -554,6 +557,10 @@ class Uploader(service.MultiService):
|
|||||||
# this returns the URI
|
# this returns the URI
|
||||||
assert self.parent
|
assert self.parent
|
||||||
assert self.running
|
assert self.running
|
||||||
|
push_to_ourselves = self.parent.get_push_to_ourselves()
|
||||||
|
if push_to_ourselves is not None:
|
||||||
|
options["push_to_ourselves"] = push_to_ourselves
|
||||||
|
|
||||||
uploadable = IUploadable(uploadable)
|
uploadable = IUploadable(uploadable)
|
||||||
d = uploadable.get_size()
|
d = uploadable.get_size()
|
||||||
def _got_size(size):
|
def _got_size(size):
|
||||||
|
Reference in New Issue
Block a user