From 06eca79263382fab3b742d1d3243463735bc79f6 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 27 Jun 2022 14:03:05 -0400 Subject: [PATCH] Minimal streaming implementation. --- src/allmydata/storage/http_server.py | 55 ++++++++++++++++++------- src/allmydata/test/test_storage_http.py | 21 ++++++++-- 2 files changed, 59 insertions(+), 17 deletions(-) diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index ebd2323ef..b8887bb4e 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -12,10 +12,15 @@ import binascii from zope.interface import implementer from klein import Klein from twisted.web import http -from twisted.internet.interfaces import IListeningPort, IStreamServerEndpoint +from twisted.web.server import NOT_DONE_YET +from twisted.internet.interfaces import ( + IListeningPort, + IStreamServerEndpoint, + IPullProducer, +) from twisted.internet.defer import Deferred from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate -from twisted.web.server import Site +from twisted.web.server import Site, Request from twisted.protocols.tls import TLSMemoryBIOFactory from twisted.python.filepath import FilePath @@ -274,7 +279,37 @@ _SCHEMAS = { } -def read_range(request, read_data: Callable[[int, int], bytes]) -> None: +@implementer(IPullProducer) +@define +class _ReadProducer: + """ + Producer that calls a read function, and writes to a request. + """ + + request: Request + read_data: Callable[[int, int], bytes] + result: Deferred + start: int = field(default=0) + + def resumeProducing(self): + data = self.read_data(self.start, self.start + 65536) + if not data: + self.request.unregisterProducer() + d = self.result + del self.result + d.callback(b"") + return + self.request.write(data) + self.start += len(data) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass + + +def read_range(request: Request, read_data: Callable[[int, int], bytes]) -> None: """ Read an optional ``Range`` header, reads data appropriately via the given callable, writes the data to the request. @@ -290,17 +325,9 @@ def read_range(request, read_data: Callable[[int, int], bytes]) -> None: The resulting data is written to the request. """ if request.getHeader("range") is None: - # Return the whole thing. - start = 0 - while True: - # TODO should probably yield to event loop occasionally... - # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872 - data = read_data(start, start + 65536) - if not data: - request.finish() - return - request.write(data) - start += len(data) + d = Deferred() + request.registerProducer(_ReadProducer(request, read_data, d), False) + return d range_header = parse_range_header(request.getHeader("range")) if ( diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index 5e0b35d88..23d9bc276 100644 --- a/src/allmydata/test/test_storage_http.py +++ b/src/allmydata/test/test_storage_http.py @@ -6,6 +6,7 @@ from base64 import b64encode from contextlib import contextmanager from os import urandom from typing import Union, Callable, Tuple, Iterable +from time import sleep, time from cbor2 import dumps from pycddl import ValidationError as CDDLValidationError from hypothesis import assume, given, strategies as st @@ -14,7 +15,8 @@ from treq.testing import StubTreq from klein import Klein from hyperlink import DecodedURL from collections_extended import RangeMap -from twisted.internet.task import Clock +from twisted.internet.task import Clock, Cooperator +from twisted.internet import task from twisted.web import http from twisted.web.http_headers import Headers from werkzeug import routing @@ -316,10 +318,11 @@ class HttpTestFixture(Fixture): self.tempdir.path, b"\x00" * 20, clock=self.clock ) self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST) + self.treq = StubTreq(self.http_server.get_resource()) self.client = StorageClient( DecodedURL.from_text("http://127.0.0.1"), SWISSNUM_FOR_TEST, - treq=StubTreq(self.http_server.get_resource()), + treq=self.treq, ) @@ -1261,8 +1264,20 @@ class SharedImmutableMutableTestsMixin: """ A read with no range returns the whole mutable/immutable. """ + self.patch( + task, + "_theCooperator", + Cooperator(scheduler=lambda c: self.http.clock.callLater(0.000001, c)), + ) + + def result_of_with_flush(d): + for i in range(100): + self.http.clock.advance(0.001) + self.http.treq.flush() + return result_of(d) + storage_index, uploaded_data, _ = self.upload(1, data_length) - response = result_of( + response = result_of_with_flush( self.http.client.request( "GET", self.http.client.relative_url(