tahoe-lafs/src/allmydata/test/test_netstring.py

60 lines
2.5 KiB
Python
Raw Normal View History

2020-07-15 14:59:14 +00:00
"""
Tests for allmydata.util.netstring.
2020-07-15 15:05:23 +00:00
Ported to Python 3.
2020-07-15 14:59:14 +00:00
"""
2020-07-15 15:05:23 +00:00
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
2020-08-05 15:53:23 +00:00
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from twisted.trial import unittest
2020-07-15 14:59:14 +00:00
from allmydata.util.netstring import netstring, split_netstring
2020-07-15 14:59:14 +00:00
class Netstring(unittest.TestCase):
    """Tests for ``netstring()`` and ``split_netstring()``.

    Every input and every expected value is ``bytes``. As exercised here,
    ``split_netstring(data, numstrings)`` returns a 2-tuple of the list of
    decoded strings and a position within ``data``.
    """

    def test_encode(self):
        """netstring() correctly encodes the given bytes."""
        result = netstring(b"abc")
        self.assertEqual(result, b"3:abc,")
        self.assertIsInstance(result, bytes)

    def test_split(self):
        """split_netstring() decodes concatenated netstrings and checks the trailer."""
        a = netstring(b"hello") + netstring(b"world")
        # Each decoded element must itself be bytes, not text.
        for s in split_netstring(a, 2)[0]:
            self.assertIsInstance(s, bytes)
        self.assertEqual(split_netstring(a, 2), ([b"hello", b"world"], len(a)))
        self.assertEqual(
            split_netstring(a, 2, required_trailer=b""),
            ([b"hello", b"world"], len(a)),
        )
        # Asking for more strings than the input holds is an error.
        self.assertRaises(ValueError, split_netstring, a, 3)
        # An empty required trailer means nothing may follow the last string.
        self.assertRaises(
            ValueError, split_netstring, a + b" extra", 2, required_trailer=b""
        )
        # Without a required trailer, data after the last string is tolerated
        # and the returned position stops at the end of the last netstring.
        self.assertEqual(
            split_netstring(a + b" extra", 2), ([b"hello", b"world"], len(a))
        )
        # A matching trailer is included in the returned position.
        self.assertEqual(
            split_netstring(a + b"++", 2, required_trailer=b"++"),
            ([b"hello", b"world"], len(a) + 2),
        )
        # A trailer that does not match the required one is rejected.
        self.assertRaises(
            ValueError, split_netstring, a + b"+", 2, required_trailer=b"not"
        )

    def test_extra(self):
        """Data after a single netstring is ignored when no trailer is required."""
        a = netstring(b"hello")
        self.assertEqual(split_netstring(a, 1), ([b"hello"], len(a)))
        b = netstring(b"hello") + b"extra stuff"
        # The position still points just past the netstring itself.
        self.assertEqual(split_netstring(b, 1), ([b"hello"], len(a)))

    def test_nested(self):
        """A netstring whose payload contains netstrings round-trips intact."""
        a = netstring(b"hello") + netstring(b"world") + b"extra stuff"
        b = netstring(b"a") + netstring(b"is") + netstring(a) + netstring(b".")
        (top, pos) = split_netstring(b, 4)
        self.assertEqual(len(top), 4)
        self.assertEqual(top[0], b"a")
        self.assertEqual(top[1], b"is")
        # The nested payload comes back as one opaque byte string.
        self.assertEqual(top[2], a)
        self.assertEqual(top[3], b".")
        # The inner payload carries trailing data, so requiring an empty
        # trailer must fail...
        self.assertRaises(
            ValueError, split_netstring, a, 2, required_trailer=b""
        )
        # ...while the default call accepts it and reports the position at the
        # end of the two inner netstrings.
        bottom = split_netstring(a, 2)
        self.assertEqual(
            bottom,
            ([b"hello", b"world"], len(netstring(b"hello") + netstring(b"world"))),
        )