Minimal streaming implementation.

This commit is contained in:
Itamar Turner-Trauring 2022-06-27 14:03:05 -04:00
parent bfd54dc6ea
commit 06eca79263
2 changed files with 59 additions and 17 deletions

View File

@ -12,10 +12,15 @@ import binascii
from zope.interface import implementer
from klein import Klein
from twisted.web import http
from twisted.internet.interfaces import IListeningPort, IStreamServerEndpoint
from twisted.web.server import NOT_DONE_YET
from twisted.internet.interfaces import (
IListeningPort,
IStreamServerEndpoint,
IPullProducer,
)
from twisted.internet.defer import Deferred
from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate
from twisted.web.server import Site
from twisted.web.server import Site, Request
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.filepath import FilePath
@ -274,7 +279,37 @@ _SCHEMAS = {
}
def read_range(request, read_data: Callable[[int, int], bytes]) -> None:
@implementer(IPullProducer)
@define
class _ReadProducer:
"""
Producer that calls a read function, and writes to a request.
"""
request: Request
read_data: Callable[[int, int], bytes]
result: Deferred
start: int = field(default=0)
def resumeProducing(self):
data = self.read_data(self.start, self.start + 65536)
if not data:
self.request.unregisterProducer()
d = self.result
del self.result
d.callback(b"")
return
self.request.write(data)
self.start += len(data)
def pauseProducing(self):
pass
def stopProducing(self):
pass
def read_range(request: Request, read_data: Callable[[int, int], bytes]) -> None:
"""
Read an optional ``Range`` header, reads data appropriately via the given
callable, writes the data to the request.
@ -290,17 +325,9 @@ def read_range(request, read_data: Callable[[int, int], bytes]) -> None:
The resulting data is written to the request.
"""
if request.getHeader("range") is None:
# Return the whole thing.
start = 0
while True:
# TODO should probably yield to event loop occasionally...
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = read_data(start, start + 65536)
if not data:
request.finish()
return
request.write(data)
start += len(data)
d = Deferred()
request.registerProducer(_ReadProducer(request, read_data, d), False)
return d
range_header = parse_range_header(request.getHeader("range"))
if (

View File

@ -6,6 +6,7 @@ from base64 import b64encode
from contextlib import contextmanager
from os import urandom
from typing import Union, Callable, Tuple, Iterable
from time import sleep, time
from cbor2 import dumps
from pycddl import ValidationError as CDDLValidationError
from hypothesis import assume, given, strategies as st
@ -14,7 +15,8 @@ from treq.testing import StubTreq
from klein import Klein
from hyperlink import DecodedURL
from collections_extended import RangeMap
from twisted.internet.task import Clock
from twisted.internet.task import Clock, Cooperator
from twisted.internet import task
from twisted.web import http
from twisted.web.http_headers import Headers
from werkzeug import routing
@ -316,10 +318,11 @@ class HttpTestFixture(Fixture):
self.tempdir.path, b"\x00" * 20, clock=self.clock
)
self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST)
self.treq = StubTreq(self.http_server.get_resource())
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
SWISSNUM_FOR_TEST,
treq=StubTreq(self.http_server.get_resource()),
treq=self.treq,
)
@ -1261,8 +1264,20 @@ class SharedImmutableMutableTestsMixin:
"""
A read with no range returns the whole mutable/immutable.
"""
self.patch(
task,
"_theCooperator",
Cooperator(scheduler=lambda c: self.http.clock.callLater(0.000001, c)),
)
def result_of_with_flush(d):
for i in range(100):
self.http.clock.advance(0.001)
self.http.treq.flush()
return result_of(d)
storage_index, uploaded_data, _ = self.upload(1, data_length)
response = result_of(
response = result_of_with_flush(
self.http.client.request(
"GET",
self.http.client.relative_url(