actual usable agent

This commit is contained in:
meejah 2020-06-01 09:06:46 -06:00
parent d50ca9af55
commit 1faa81c7c4

View File

@ -261,3 +261,52 @@ def create_fake_tahoe_root():
)
yield
returnValue(root)
@implementer(IBodyProducer)
class _SynchronousProducer(object):
"""
A partial implementation of an :obj:`IBodyProducer` which produces its
entire payload immediately. There is no way to access to an instance of
this object from :obj:`RequestTraversalAgent` or :obj:`StubTreq`, or even a
:obj:`Resource: passed to :obj:`StubTreq`.
This does not implement the :func:`IBodyProducer.stopProducing` method,
because that is very difficult to trigger. (The request from
`RequestTraversalAgent` would have to be canceled while it is still in the
transmitting state), and the intent is to use `RequestTraversalAgent` to
make synchronous requests.
"""
def __init__(self, body):
"""
Create a synchronous producer with some bytes.
"""
if not isinstance(body, bytes):
raise ValueError(
"'body' must be bytes"
)
self.body = body
self.length = len(body)
def startProducing(self, consumer):
"""
Immediately produce all data.
"""
consumer.write(self.body)
return succeed(None)
@inlineCallbacks
def create_tahoe_treq_client(root=None):
"""
"""
if root is None:
root = yield create_fake_tahoe_root()
client = HTTPClient(
agent=RequestTraversalAgent(root),
data_to_body_producer=_SynchronousProducer,
)
returnValue(client)