mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-20 13:33:09 +00:00
Start of tests for consumer.py.
This commit is contained in:
parent
fbcb9bef29
commit
1ead68d061
80
src/allmydata/test/test_consumer.py
Normal file
80
src/allmydata/test/test_consumer.py
Normal file
@ -0,0 +1,80 @@
|
||||
"""
|
||||
Tests for allmydata.util.consumer.
|
||||
|
||||
Ported to Python 3.
|
||||
"""
|
||||
|
||||
from __future__ import unicode_literals
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
from future.utils import PY2
|
||||
if PY2:
|
||||
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
|
||||
|
||||
from zope.interface import implementer
|
||||
from twisted.trial.unittest import TestCase
|
||||
from twisted.internet.interfaces import IPushProducer, IPullProducer
|
||||
|
||||
from allmydata.util.consumer import MemoryConsumer, download_to_data
|
||||
|
||||
|
||||
@implementer(IPushProducer)
|
||||
@implementer(IPullProducer)
|
||||
class Producer:
|
||||
"""Can be used as either streaming or non-streaming producer.
|
||||
|
||||
If used as streaming, the test should call iterate() manually.
|
||||
"""
|
||||
|
||||
def __init__(self, consumer):
|
||||
self.data = [b"abc", b"def", b"ghi"]
|
||||
self.consumer = consumer
|
||||
self.done = False
|
||||
|
||||
def resumeProducing(self):
|
||||
"""Kick off streaming."""
|
||||
self.iterate()
|
||||
|
||||
def iterate(self):
|
||||
"""Do another iteration of writing."""
|
||||
if self.done:
|
||||
raise RuntimeError(
|
||||
"There's a bug somewhere, shouldn't iterate after being done"
|
||||
)
|
||||
if self.data:
|
||||
self.consumer.write(self.data.pop(0))
|
||||
else:
|
||||
self.done = True
|
||||
self.consumer.unregisterProducer()
|
||||
|
||||
|
||||
class MemoryConsumerTests(TestCase):
|
||||
"""Tests for MemoryConsumer."""
|
||||
|
||||
def test_push_producer(self):
|
||||
"""
|
||||
A MemoryConsumer accumulates all data sent by a streaming producer.
|
||||
"""
|
||||
consumer = MemoryConsumer()
|
||||
producer = Producer(consumer)
|
||||
consumer.registerProducer(producer, True)
|
||||
self.assertEqual(consumer.chunks, [b"abc"])
|
||||
producer.iterate()
|
||||
producer.iterate()
|
||||
self.assertEqual(consumer.chunks, [b"abc", b"def", b"ghi"])
|
||||
self.assertEqual(consumer.done, False)
|
||||
producer.iterate()
|
||||
self.assertEqual(consumer.chunks, [b"abc", b"def", b"ghi"])
|
||||
self.assertEqual(consumer.done, True)
|
||||
|
||||
def test_pull_producer(self):
|
||||
"""
|
||||
A MemoryConsumer accumulates all data sent by a non-streaming producer.
|
||||
"""
|
||||
consumer = MemoryConsumer()
|
||||
producer = Producer(consumer)
|
||||
consumer.registerProducer(producer, False)
|
||||
self.assertEqual(consumer.chunks, [b"abc", b"def", b"ghi"])
|
||||
self.assertEqual(consumer.done, True)
|
@ -191,6 +191,7 @@ PORTED_TEST_MODULES = [
|
||||
"allmydata.test.test_configutil",
|
||||
"allmydata.test.test_connections",
|
||||
"allmydata.test.test_connection_status",
|
||||
"allmydata.test.test_consumer",
|
||||
"allmydata.test.test_crawler",
|
||||
"allmydata.test.test_crypto",
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user