upload: add WriterProxy

This commit is contained in:
Brian Warner 2006-12-01 04:06:11 -07:00
parent 14a2dbd553
commit f87008cf36
2 changed files with 48 additions and 16 deletions

View File

@ -52,6 +52,13 @@ class NextPeer(unittest.TestCase):
"good", # 4
]
def compare_landlords(self, u, c, expected):
exp = [(peerid, bucketnum, c.peers[peerid])
for peerid, bucketnum in expected]
landlords = [(peerid, bucketnum, proxy.remote_bucket)
for peerid, bucketnum, proxy in u.landlords]
self.failUnlessEqual(landlords, exp)
def test_0(self):
c = FakeClient([])
u = NextPeerUploader(c)
@ -74,10 +81,9 @@ class NextPeer(unittest.TestCase):
d = u.start()
def _check(res):
self.failUnlessEqual(u.goodness_points, 2)
self.failUnlessEqual(u.landlords,
[(0, 0, c.peers[0]),
(4, 1, c.peers[4]),
])
self.compare_landlords(u, c, [(0, 0),
(4, 1),
])
d.addCallback(_check)
return d
@ -90,11 +96,10 @@ class NextPeer(unittest.TestCase):
d = u.start()
def _check(res):
self.failUnlessEqual(u.goodness_points, 3)
self.failUnlessEqual(u.landlords,
[(0, 0, c.peers[0]),
(4, 1, c.peers[4]),
(0, 2, c.peers[0]),
])
self.compare_landlords(u, c, [(0, 0),
(4, 1),
(0, 2),
])
d.addCallback(_check)
return d
@ -114,11 +119,10 @@ class NextPeer(unittest.TestCase):
d = u.start()
def _check(res):
self.failUnlessEqual(u.goodness_points, 3)
self.failUnlessEqual(u.landlords,
[(0, 0, c.peers[0]),
(3, 1, c.peers[3]),
(0, 2, c.peers[0]),
])
self.compare_landlords(u, c, [(0, 0),
(3, 1),
(0, 2),
])
d.addCallback(_check)
return d

View File

@ -13,6 +13,30 @@ class HaveAllPeersError(Exception):
class TooFullError(Exception):
pass
class WriterProxy:
# make this look like a writable file
def __init__(self, remote_bucket):
self.remote_bucket = remote_bucket
self.good = True
def _broken(self, why):
self.good = False
# do something else here
return why
def write(self, data):
d = self.remote_bucket.callRemote("write", data=data)
d.addErrback(self._broken)
def seek(self, offset, whence=0):
d = self.remote_bucket.callRemote("seek", offset=offset, whence=whence)
d.addErrback(self._broken)
def close(self):
if self._broken:
raise SomethingBrokeError()
d = self.remote_bucket.callRemote("close")
d.addErrback(self._broken)
class Uploader:
debug = False
@ -20,6 +44,9 @@ class Uploader:
def __init__(self, peer):
self._peer = peer
def set_encoder(self, encoder):
self._encoder = encoder
def set_verifierid(self, vid):
assert isinstance(vid, str)
self._verifierid = vid
@ -33,7 +60,7 @@ class Uploader:
def start(self):
# who should we upload to?
# first step: who should we upload to?
# maybe limit max_peers to 2*len(self.shares), to reduce memory
# footprint
@ -71,7 +98,8 @@ class Uploader:
def _allocate_response(bucket):
if self.debug:
print " peerid %s will grant us a lease" % peerid
self.landlords.append( (peerid, bucket_num, bucket) )
writer = WriterProxy(bucket)
self.landlords.append( (peerid, bucket_num, writer) )
self.goodness_points += 1
if self.goodness_points >= self.target_goodness:
if self.debug: print " we're done!"