A nicer interface.

This commit is contained in:
Itamar Turner-Trauring 2022-07-06 09:47:08 -04:00
parent fd8a385d1d
commit 0b5132745d
2 changed files with 15 additions and 5 deletions

View File

@ -4,7 +4,7 @@ HTTP client that talks to the HTTP storage server.
from __future__ import annotations
from typing import Union, Optional, Sequence, Mapping
from typing import Union, Optional, Sequence, Mapping, BinaryIO
from base64 import b64encode
from io import BytesIO
@ -131,25 +131,35 @@ class _LengthLimitedCollector:
self.f.write(data)
def limited_content(response, max_length: int = 30 * 1024 * 1024) -> Deferred:
def limited_content(response, max_length: int = 30 * 1024 * 1024) -> Deferred[BinaryIO]:
"""
Like ``treq.content()``, but limit data read from the response to a set
length. If the response is longer than the max allowed length, the result
fails with a ``ValueError``.
A potentially useful future improvement would be using a temporary file to
store the content; since filesystem buffering means that would use memory
for small responses and disk for large responses.
"""
collector = _LengthLimitedCollector(max_length)
# Make really sure everything gets called in Deferred context, treq might
# call collector directly...
d = succeed(None)
d.addCallback(lambda _: treq.collect(response, collector))
d.addCallback(lambda _: collector.f.getvalue())
def done(_):
collector.f.seek(0)
return collector.f
d.addCallback(done)
return d
def _decode_cbor(response, schema: Schema):
"""Given HTTP response, return decoded CBOR body."""
def got_content(data):
def got_content(f: BinaryIO):
data = f.read()
schema.validate_cbor(data)
return loads(data)

View File

@ -346,7 +346,7 @@ class CustomHTTPServerTests(SyncTestCase):
)
self.assertEqual(
result_of(limited_content(response, at_least_length)),
result_of(limited_content(response, at_least_length)).read(),
gen_bytes(length),
)