2009-01-12 18:00:22 +00:00
|
|
|
import copy, os.path, stat
|
2008-10-28 20:41:04 +00:00
|
|
|
from cStringIO import StringIO
|
2007-12-03 21:52:42 +00:00
|
|
|
from zope.interface import implements
|
2007-12-04 04:37:54 +00:00
|
|
|
from twisted.internet import defer
|
2008-10-06 19:52:36 +00:00
|
|
|
from twisted.internet.interfaces import IPushProducer, IConsumer
|
2008-10-28 20:41:04 +00:00
|
|
|
from twisted.protocols import basic
|
2008-10-29 00:56:18 +00:00
|
|
|
from foolscap.eventual import eventually
|
|
|
|
from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
|
2009-01-12 18:00:22 +00:00
|
|
|
IDownloadTarget, IUploadResults
|
|
|
|
from allmydata.util import dictutil, log, base32
|
2009-01-08 18:53:49 +00:00
|
|
|
from allmydata.util.assertutil import precondition
|
2009-01-08 18:25:30 +00:00
|
|
|
from allmydata import uri as urimodule
|
2009-01-06 01:28:18 +00:00
|
|
|
from allmydata.immutable.checker import Checker
|
2009-01-12 18:00:22 +00:00
|
|
|
from allmydata.check_results import CheckResults, CheckAndRepairResults
|
2009-01-06 01:28:18 +00:00
|
|
|
from allmydata.immutable.repairer import Repairer
|
2008-10-28 20:41:04 +00:00
|
|
|
from allmydata.immutable import download
|
2007-12-03 21:52:42 +00:00
|
|
|
|
2008-10-28 20:41:04 +00:00
|
|
|
class _ImmutableFileNodeBase(object):
    """Common behaviour shared by the immutable file node classes.

    Holds the file's URI (adapted to IFileURI) as ``self.u`` and the owning
    client as ``self._client``. Subclasses must supply ``get_uri()``, which
    ``get_readonly_uri()`` delegates to. Equality and hashing are defined
    in terms of the wrapped URI.
    """
    implements(IFileNode, ICheckable)

    def __init__(self, uri, client):
        """Store the URI and client.

        ``uri`` is checked with precondition() to provide
        IImmutableFileURI before it is adapted and stored.
        """
        precondition(urimodule.IImmutableFileURI.providedBy(uri), uri)
        self.u = IFileURI(uri)
        self._client = client

    def get_readonly_uri(self):
        """Return the same value as get_uri()."""
        return self.get_uri()

    def is_mutable(self):
        """Always False for this family of nodes."""
        return False

    def is_readonly(self):
        """Always True for this family of nodes."""
        return True

    def __hash__(self):
        # hash like the wrapped URI so that nodes which compare equal
        # (see __eq__) also hash equal
        return self.u.__hash__()

    def __eq__(self, other):
        """Two nodes are equal when ``other`` provides IFileNode and the
        wrapped URIs compare equal; anything else is unequal."""
        if IFileNode.providedBy(other):
            return self.u.__eq__(other.u)
        else:
            return False

    def __ne__(self, other):
        """Exact logical inverse of __eq__.

        Python 2 does not derive ``!=`` from ``__eq__``, so it is spelled
        out here. The result of __eq__ is negated rather than re-implemented
        so the two operators can never disagree.
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            # let the interpreter try the reflected comparison instead of
            # turning "don't know" into a definite answer
            return result
        return not result
|
|
|
|
|
2008-10-28 20:41:04 +00:00
|
|
|
class PortionOfFile:
    """A read-only window onto a file on disk.

    Like a list slice (things[2:14]), but for a file: reading starts at
    ``offset`` and, when ``size`` is given, never returns more than ``size``
    bytes in total.
    """

    def __init__(self, fn, offset=0, size=None):
        """Open ``fn`` in binary mode and position it at ``offset``.

        ``size`` is the total number of bytes that may be read through this
        object; None means "everything up to end-of-file".
        """
        self.f = open(fn, "rb")
        self.f.seek(offset)
        self.bytes_left = size

    def read(self, size=None):
        """Return up to ``size`` bytes, never exceeding the window.

        ``size`` of None (or a negative number, following the usual file
        convention) means "all that is left". Returns an empty string once
        the window or the file is exhausted.
        """
        if size is not None and size < 0:
            # file.read(-1) means "read everything"; without this a negative
            # size would win the min() below and read past the window
            size = None

        # bytes_to_read = min(size, self.bytes_left), but None>anything
        if size is None:
            bytes_to_read = self.bytes_left
        elif self.bytes_left is None:
            bytes_to_read = size
        else:
            bytes_to_read = min(size, self.bytes_left)

        if bytes_to_read is None:
            # unbounded window and unbounded request: Python 2 file objects
            # reject read(None), so call read() with no argument
            data = self.f.read()
        else:
            data = self.f.read(bytes_to_read)

        if self.bytes_left is not None:
            self.bytes_left -= len(data)
        return data

    def close(self):
        """Close the underlying file handle."""
        self.f.close()
|
|
|
|
|
2008-10-30 20:39:09 +00:00
|
|
|
class DownloadCache:
    """Download target that spools a file into a cache file on disk and lets
    readers wait until a given byte range has been written.

    The object is handed to the downloader as its IDownloadTarget (see
    open/write/close below). Readers call when_range_available() to get a
    Deferred and then read() to stream the bytes out of the cache file.
    """
    implements(IDownloadTarget)

    def __init__(self, node, cachefile):
        """``node`` supplies the client, URI and storage index;
        ``cachefile`` must offer get_filename(), which names the on-disk
        cache file."""
        self._downloader = node._client.getServiceNamed("downloader")
        self._uri = node.get_uri()
        self._storage_index = node.get_storage_index()
        self.milestones = set() # of (offset,size,Deferred)
        self.cachefile = cachefile
        self.download_in_progress = False
        # five states:
        #  new FileNode, no downloads ever performed
        #  new FileNode, leftover file (partial)
        #  new FileNode, leftover file (whole)
        #  download in progress, not yet complete
        #  download complete

    def when_range_available(self, offset, size):
        """Return a Deferred that fires (with None) once bytes
        [offset, offset+size) are present in the cache file.

        Starts a download if waiters remain after checking the current
        cache contents and no download is already running. The Deferred
        errbacks if that download fails.
        """
        assert isinstance(offset, (int,long))
        assert isinstance(size, (int,long))

        d = defer.Deferred()
        self.milestones.add( (offset,size,d) )
        # may satisfy the new milestone immediately from a leftover file
        self._check_milestones()
        if self.milestones and not self.download_in_progress:
            self.download_in_progress = True
            log.msg(format=("immutable filenode read [%(si)s]: " +
                            "starting download"),
                    si=base32.b2a(self._storage_index),
                    umid="h26Heg", level=log.OPERATIONAL)
            d2 = self._downloader.download(self._uri, self)
            d2.addBoth(self._download_done)
            d2.addErrback(self._download_failed)
            d2.addErrback(log.err, umid="cQaM9g")
        return d

    def read(self, consumer, offset, size):
        """Stream ``size`` bytes starting at ``offset`` from the cache file
        into ``consumer``.

        The range must already be on disk (callers wait on
        when_range_available() first). Returns a Deferred that fires with
        ``consumer`` when the transfer is finished.
        """
        assert offset+size <= self.get_filesize()
        f = PortionOfFile(self.cachefile.get_filename(), offset, size)
        d = basic.FileSender().beginFileTransfer(f, consumer)
        def _close_portion(res):
            # FileSender does not close the file it is given; release the
            # descriptor ourselves whether the transfer succeeded or not.
            # PortionOfFile keeps its handle in the .f attribute.
            f.f.close()
            return res
        d.addBoth(_close_portion)
        d.addCallback(lambda lastSent: consumer)
        return d

    def _download_done(self, res):
        # clear download_in_progress, so failed downloads can be re-tried
        self.download_in_progress = False
        return res

    def _download_failed(self, f):
        # tell anyone who's waiting that we failed
        for m in self.milestones:
            (offset,size,d) = m
            eventually(d.errback, f)
        self.milestones.clear()

    def _check_milestones(self):
        """Fire every waiter whose range now fits inside the cache file."""
        current_size = self.get_filesize()
        # iterate over a copy: satisfied milestones are removed as we go
        for m in list(self.milestones):
            (offset,size,d) = m
            if offset+size <= current_size:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "done"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="nuedUg", level=log.NOISY)
                self.milestones.discard(m)
                eventually(d.callback, None)
            else:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "still waiting"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="8PKOhg", level=log.NOISY)

    def get_filesize(self):
        """Return the current size of the cache file on disk, or 0 if it
        cannot be stat()ed (e.g. it does not exist yet)."""
        try:
            filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
        except OSError:
            filesize = 0
        return filesize

    # download-target methods, called while a download is running

    def open(self, size):
        # ``size`` is accepted but unused; the cache file is truncated and
        # rewritten from the start
        self.f = open(self.cachefile.get_filename(), "wb")

    def write(self, data):
        self.f.write(data)
        self._check_milestones()

    def close(self):
        self.f.close()
        self._check_milestones()

    def fail(self, why):
        # waiters are notified through _download_failed(), which is attached
        # to the download's Deferred in when_range_available()
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return None
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass
|
2008-10-29 00:56:18 +00:00
|
|
|
|
|
|
|
|
2009-01-08 18:25:30 +00:00
|
|
|
class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
    """Immutable file node whose contents are fetched through the client's
    "downloader" service.

    Whole-file reads are streamed straight from the downloader; partial
    reads are served from a DownloadCache backed by ``cachefile``.
    """

    def __init__(self, uri, client, cachefile):
        """Set up the base node, the download cache, and prefixed logging
        (the log prefix is the string form of the verify cap)."""
        _ImmutableFileNodeBase.__init__(self, uri, client)
        # DownloadCache reads our client, URI and storage index, so the
        # base class must be initialised first
        self.download_cache = DownloadCache(self, cachefile)
        log_prefix = uri.get_verify_cap().to_string()
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode",
                                       prefix=log_prefix)
        self.log("starting", level=log.OPERATIONAL)

    def get_uri(self):
        """Return the URI in string form."""
        return self.u.to_string()

    def get_size(self):
        """Return the file size as reported by the URI."""
        return self.u.get_size()

    def get_verify_cap(self):
        """Return the verify cap derived from the URI."""
        return self.u.get_verify_cap()

    def get_storage_index(self):
        """Return the URI's storage index."""
        return self.u.storage_index

    def check_and_repair(self, monitor, verify=False):
        """Run a Checker and, if the file is not healthy, a Repairer.

        Returns a Deferred that fires with a CheckAndRepairResults. If the
        repairer fails, the failure is recorded on the results object and
        also passed on down the Deferred.
        """
        verifycap = self.get_verify_cap()
        servers = self._client.get_servers("storage")

        checker = Checker(client=self._client, verifycap=verifycap,
                          servers=servers, verify=verify, monitor=monitor)
        check_d = checker.start()

        def _maybe_repair(cr):
            crr = CheckAndRepairResults(self.u.storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                # nothing to repair: post-repair state is the checked state
                crr.post_repair_results = cr
                return defer.succeed(crr)

            crr.repair_attempted = True
            crr.repair_successful = False # until proven successful

            def _gather_repair_results(ur):
                assert IUploadResults.providedBy(ur), ur
                # clone the cr -- check results to form the basis of the
                # prr -- post-repair results
                prr = CheckResults(cr.uri, cr.storage_index)
                prr.data = copy.deepcopy(cr.data)

                sharemap = prr.data['sharemap']
                assert isinstance(sharemap, dictutil.DictOfSets), sharemap
                sharemap.update(ur.sharemap)
                responders = set(prr.data['servers-responding'])
                # NOTE(review): set.union() returns a new set, which is
                # discarded here, so 'servers-responding' is not actually
                # extended. Left as-is because it is unclear from this file
                # whether the keys of ur.sharemap are server ids at all --
                # confirm before changing to update().
                responders.union(ur.sharemap.iterkeys())
                prr.data['servers-responding'] = list(responders)
                num_entries = len(sharemap)
                prr.data['count-shares-good'] = num_entries
                prr.data['count-good-share-hosts'] = num_entries
                have_all_shares = num_entries >= self.u.total_shares
                prr.set_healthy(have_all_shares)
                crr.repair_successful = have_all_shares
                # NOTE(review): needs_rebalancing is set from the same
                # predicate as healthy -- verify this is intended.
                prr.set_needs_rebalancing(have_all_shares)

                crr.post_repair_results = prr
                return crr

            def _repair_error(f):
                # as with mutable repair, I'm not sure if I want to pass
                # through a failure or not. TODO
                crr.repair_successful = False
                crr.repair_failure = f
                return f

            repairer = Repairer(client=self._client, verifycap=verifycap,
                                monitor=monitor)
            repair_d = repairer.start()
            repair_d.addCallbacks(_gather_repair_results, _repair_error)
            return repair_d

        check_d.addCallback(_maybe_repair)
        return check_d

    def check(self, monitor, verify=False):
        """Start a Checker for this file and return its Deferred."""
        checker = Checker(client=self._client,
                          verifycap=self.get_verify_cap(),
                          servers=self._client.get_servers("storage"),
                          verify=verify, monitor=monitor)
        return checker.start()

    def read(self, consumer, offset=0, size=None):
        """Deliver bytes [offset, offset+size) to ``consumer``.

        ``size`` of None means "to the end of the file"; a larger request
        is clipped to the end of the file. Returns a Deferred.
        """
        filesize = self.get_size()
        remaining = filesize - offset
        if size is None:
            size = remaining
        else:
            size = min(size, remaining)

        if offset == 0 and size == filesize:
            # don't use the cache, just do a normal streaming download
            self.log("doing normal full download", umid="VRSBwg",
                     level=log.OPERATIONAL)
            return self.download(download.ConsumerAdapter(consumer))

        def _read_from_cache(ignored):
            return self.download_cache.read(consumer, offset, size)

        d = self.download_cache.when_range_available(offset, size)
        d.addCallback(_read_from_cache)
        return d

    def download(self, target):
        """Download the whole file into ``target`` via the downloader
        service, passing along our log parent and the client's history."""
        dl_service = self._client.getServiceNamed("downloader")
        dl_history = self._client.get_history()
        return dl_service.download(self.get_uri(), target,
                                   self._parentmsgid,
                                   history=dl_history)

    def download_to_data(self):
        """Ask the downloader service for the file's contents as data."""
        dl_service = self._client.getServiceNamed("downloader")
        dl_history = self._client.get_history()
        return dl_service.download_to_data(self.get_uri(),
                                           history=dl_history)
|
2007-12-03 21:52:42 +00:00
|
|
|
|
2008-10-06 19:52:36 +00:00
|
|
|
class LiteralProducer:
    """Do-nothing producer registered on a consumer while a literal file's
    data is written to it in a single call (see LiteralFileNode.download).

    NOTE(review): only resumeProducing and stopProducing are defined here;
    confirm no consumer needs pauseProducing.
    """
    implements(IPushProducer)

    def resumeProducing(self):
        """No-op: there is nothing to resume."""

    def stopProducing(self):
        """No-op: there is nothing to stop."""
|
2007-12-04 04:37:54 +00:00
|
|
|
|
2008-10-28 20:41:04 +00:00
|
|
|
|
|
|
|
class LiteralFileNode(_ImmutableFileNodeBase):
    """Immutable file node whose entire contents live inside the URI itself
    (``self.u.data``), so nothing needs to be fetched, checked or repaired.
    """

    def __init__(self, uri, client):
        """Validate ``uri`` and hand it, with ``client``, to the base class."""
        precondition(urimodule.IImmutableFileURI.providedBy(uri), uri)
        _ImmutableFileNodeBase.__init__(self, uri, client)

    def get_uri(self):
        """Return the URI in string form."""
        return self.u.to_string()

    def get_size(self):
        """Return the length of the embedded data."""
        return len(self.u.data)

    def get_verify_cap(self):
        """Literal files have no verify cap; always None."""
        return None

    def get_storage_index(self):
        """Literal files have no storage index; always None."""
        return None

    def check(self, monitor, verify=False):
        """Return an already-fired Deferred carrying None."""
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False):
        """Return an already-fired Deferred carrying None."""
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        """Send ``size`` bytes of the embedded data, starting at ``offset``,
        to ``consumer``; ``size`` of None means "through the end".

        Returns a Deferred that fires with ``consumer`` when done.
        """
        if size is None:
            end = None
        else:
            end = offset + size
        chunk = self.u.data[offset:end]

        # We use twisted.protocols.basic.FileSender, which only does
        # non-streaming, i.e. PullProducer, where the receiver/consumer must
        # ask explicitly for each chunk of data. There are only two places in
        # the Twisted codebase that can't handle streaming=False, both of
        # which are in the upload path for an FTP/SFTP server
        # (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
        # likely to be used as the target for a Tahoe download.

        sender = basic.FileSender()
        d = sender.beginFileTransfer(StringIO(chunk), consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def download(self, target):
        """Write the embedded data to ``target`` in one open/write/close
        sequence and return a Deferred for ``target.finish()``."""
        # note that this does not update the stats_provider
        data = self.u.data
        is_consumer = IConsumer.providedBy(target)
        if is_consumer:
            target.registerProducer(LiteralProducer(), True)
        target.open(len(data))
        target.write(data)
        if is_consumer:
            target.unregisterProducer()
        target.close()
        return defer.maybeDeferred(target.finish)

    def download_to_data(self):
        """Return an already-fired Deferred carrying the embedded data."""
        return defer.succeed(self.u.data)
|