Drop Python 3.4 and switch to async / await syntax for asyncio. Fixes #1425

This commit is contained in:
grossmj
2018-10-15 17:05:49 +07:00
parent 8baa480b79
commit 76af98404a
119 changed files with 2432 additions and 3084 deletions

View File

@ -25,8 +25,7 @@ import threading
from asyncio.futures import CancelledError
@asyncio.coroutine
def wait_run_in_executor(func, *args, **kwargs):
async def wait_run_in_executor(func, *args, **kwargs):
"""
Run blocking code in a different thread and wait
for the result.
@ -39,12 +38,11 @@ def wait_run_in_executor(func, *args, **kwargs):
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, functools.partial(func, *args, **kwargs))
yield from asyncio.wait([future])
await asyncio.wait([future])
return future.result()
@asyncio.coroutine
def cancellable_wait_run_in_executor(func, *args, **kwargs):
async def cancellable_wait_run_in_executor(func, *args, **kwargs):
"""
Run blocking code in a different thread and wait
for the result. Support cancellation.
@ -57,13 +55,12 @@ def cancellable_wait_run_in_executor(func, *args, **kwargs):
stopped_event = threading.Event()
kwargs['stopped_event'] = stopped_event
try:
yield from wait_run_in_executor(func, *args, **kwargs)
await wait_run_in_executor(func, *args, **kwargs)
except CancelledError:
stopped_event.set()
@asyncio.coroutine
def subprocess_check_output(*args, cwd=None, env=None, stderr=False):
async def subprocess_check_output(*args, cwd=None, env=None, stderr=False):
"""
Run a command and capture output
@ -75,11 +72,11 @@ def subprocess_check_output(*args, cwd=None, env=None, stderr=False):
"""
if stderr:
proc = yield from asyncio.create_subprocess_exec(*args, stderr=asyncio.subprocess.PIPE, cwd=cwd, env=env)
output = yield from proc.stderr.read()
proc = await asyncio.create_subprocess_exec(*args, stderr=asyncio.subprocess.PIPE, cwd=cwd, env=env)
output = await proc.stderr.read()
else:
proc = yield from asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE, cwd=cwd, env=env)
output = yield from proc.stdout.read()
proc = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE, cwd=cwd, env=env)
output = await proc.stdout.read()
if output is None:
return ""
# If we received garbage we ignore invalid characters
@ -87,14 +84,13 @@ def subprocess_check_output(*args, cwd=None, env=None, stderr=False):
# and the code of VPCS, dynamips... Will detect it's not the correct binary
return output.decode("utf-8", errors="ignore")
@asyncio.coroutine
def wait_for_process_termination(process, timeout=10):
async def wait_for_process_termination(process, timeout=10):
"""
Wait for a process terminate, and raise asyncio.TimeoutError in case of
timeout.
In theory this can be implemented by just:
yield from asyncio.wait_for(self._iou_process.wait(), timeout=100)
await asyncio.wait_for(self._iou_process.wait(), timeout=100)
But it's broken before Python 3.4:
http://bugs.python.org/issue23140
@ -105,24 +101,23 @@ def wait_for_process_termination(process, timeout=10):
if sys.version_info >= (3, 5):
try:
yield from asyncio.wait_for(process.wait(), timeout=timeout)
await asyncio.wait_for(process.wait(), timeout=timeout)
except ProcessLookupError:
return
else:
while timeout > 0:
if process.returncode is not None:
return
yield from asyncio.sleep(0.1)
await asyncio.sleep(0.1)
timeout -= 0.1
raise asyncio.TimeoutError()
@asyncio.coroutine
def _check_process(process, termination_callback):
async def _check_process(process, termination_callback):
if not hasattr(sys, "_called_from_test") or not sys._called_from_test:
returncode = yield from process.wait()
returncode = await process.wait()
if asyncio.iscoroutinefunction(termination_callback):
yield from termination_callback(returncode)
await termination_callback(returncode)
else:
termination_callback(returncode)
@ -130,22 +125,20 @@ def _check_process(process, termination_callback):
def monitor_process(process, termination_callback):
"""Call termination_callback when a process dies"""
asyncio_ensure_future(_check_process(process, termination_callback))
asyncio.ensure_future(_check_process(process, termination_callback))
@asyncio.coroutine
def wait_for_file_creation(path, timeout=10):
async def wait_for_file_creation(path, timeout=10):
while timeout > 0:
if os.path.exists(path):
return
yield from asyncio.sleep(0.5)
await asyncio.sleep(0.5)
timeout -= 0.5
raise asyncio.TimeoutError()
@asyncio.coroutine
def wait_for_named_pipe_creation(pipe_path, timeout=60):
async def wait_for_named_pipe_creation(pipe_path, timeout=60):
import win32pipe
import pywintypes
@ -154,38 +147,20 @@ def wait_for_named_pipe_creation(pipe_path, timeout=60):
try:
win32pipe.WaitNamedPipe(pipe_path, 1)
except pywintypes.error:
yield from asyncio.sleep(0.5)
await asyncio.sleep(0.5)
timeout -= 0.5
else:
return
raise asyncio.TimeoutError()
#FIXME: Use the following wrapper when we drop Python 3.4 and use the async def syntax
# def locking(f):
#
# @wraps(f)
# async def wrapper(oself, *args, **kwargs):
# lock_name = "__" + f.__name__ + "_lock"
# if not hasattr(oself, lock_name):
# setattr(oself, lock_name, asyncio.Lock())
# async with getattr(oself, lock_name):
# return await f(oself, *args, **kwargs)
# return wrapper
def locking(f):
@functools.wraps(f)
def wrapper(oself, *args, **kwargs):
async def wrapper(oself, *args, **kwargs):
lock_name = "__" + f.__name__ + "_lock"
if not hasattr(oself, lock_name):
setattr(oself, lock_name, asyncio.Lock())
with (yield from getattr(oself, lock_name)):
return (yield from f(oself, *args, **kwargs))
async with getattr(oself, lock_name):
return await f(oself, *args, **kwargs)
return wrapper
#FIXME: conservative approach to supported versions, please remove it when we drop the support to Python < 3.4.4
try:
from asyncio import ensure_future
asyncio_ensure_future = asyncio.ensure_future
except ImportError:
asyncio_ensure_future = getattr(asyncio, 'async')

View File

@ -81,8 +81,7 @@ class EmbedShell:
def welcome_message(self, welcome_message):
self._welcome_message = welcome_message
@asyncio.coroutine
def help(self, *args):
async def help(self, *args):
"""
Show help
"""
@ -105,8 +104,7 @@ class EmbedShell:
res += '\nhelp command for details about a command\n'
return res
@asyncio.coroutine
def _parse_command(self, text):
async def _parse_command(self, text):
cmd = text.split(' ')
found = False
if cmd[0] == '?':
@ -119,22 +117,21 @@ class EmbedShell:
for (name, meth) in inspect.getmembers(self):
if name == cmd[0]:
cmd.pop(0)
res = yield from meth(*cmd)
res = await meth(*cmd)
found = True
break
if not found:
res = ('Command not found {}\n'.format(cmd[0]) + (yield from self.help()))
res = ('Command not found {}\n'.format(cmd[0]) + (await self.help()))
return res
@asyncio.coroutine
def run(self):
async def run(self):
if self._welcome_message:
self._writer.feed_data(self._welcome_message.encode())
while True:
self._writer.feed_data(self._prompt.encode())
result = yield from self._reader.readline()
result = await self._reader.readline()
result = result.decode().strip('\n')
res = yield from self._parse_command(result)
res = await self._parse_command(result)
self._writer.feed_data(res.encode())
def get_commands(self):
@ -208,8 +205,7 @@ class ShellConnection(TelnetConnection):
self.encoding = 'utf-8'
@asyncio.coroutine
def connected(self):
async def connected(self):
# prompt_toolkit internally checks if it's on windows during output rendering but
# we need to force that we use Vt100_Output not Win32_Output
from prompt_toolkit import renderer
@ -235,16 +231,14 @@ class ShellConnection(TelnetConnection):
self._cli._redraw()
@asyncio.coroutine
def disconnected(self):
async def disconnected(self):
pass
def window_size_changed(self, columns, rows):
self._size = Size(rows=rows, columns=columns)
self._cb.terminal_size_changed()
@asyncio.coroutine
def feed(self, data):
async def feed(self, data):
data = data.decode()
self._inputstream.feed(data)
self._cli._redraw()
@ -260,7 +254,7 @@ class ShellConnection(TelnetConnection):
command = returned_value.text
res = yield from self._shell._parse_command(command)
res = await self._shell._parse_command(command)
self.send(res.encode())
self.reset()
@ -305,20 +299,18 @@ def create_stdin_shell(shell, loop=None):
:param loop: The event loop
:returns: Telnet server
"""
@asyncio.coroutine
def feed_stdin(loop, reader, shell):
async def feed_stdin(loop, reader, shell):
history = InMemoryHistory()
completer = WordCompleter([name for name, _ in shell.get_commands()], ignore_case=True)
while True:
line = yield from prompt(
line = await prompt(
">", patch_stdout=True, return_asyncio_coroutine=True, history=history, completer=completer)
line += '\n'
reader.feed_data(line.encode())
@asyncio.coroutine
def read_stdout(writer):
async def read_stdout(writer):
while True:
c = yield from writer.read(1)
c = await writer.read(1)
print(c.decode(), end='')
sys.stdout.flush()
@ -339,22 +331,20 @@ if __name__ == '__main__':
class Demo(EmbedShell):
@asyncio.coroutine
def hello(self, *args):
async def hello(self, *args):
"""
Hello world
This command accept arguments: hello tutu will display tutu
"""
@asyncio.coroutine
def world():
yield from asyncio.sleep(2)
async def world():
await asyncio.sleep(2)
if len(args):
return ' '.join(args)
else:
return 'world\n'
return (yield from world())
return (await world())
# Demo using telnet
shell = Demo(welcome_message="Welcome!\n")

View File

@ -30,8 +30,7 @@ class Pool():
def append(self, task, *args, **kwargs):
self._tasks.append((task, args, kwargs))
@asyncio.coroutine
def join(self):
async def join(self):
"""
Wait for all task to finish
"""
@ -41,7 +40,7 @@ class Pool():
while len(self._tasks) > 0 and len(pending) < self._concurrency:
task, args, kwargs = self._tasks.pop(0)
pending.add(task(*args, **kwargs))
(done, pending) = yield from asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
(done, pending) = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for task in done:
if task.exception():
exceptions.add(task.exception())
@ -50,10 +49,9 @@ class Pool():
def main():
@asyncio.coroutine
def task(id):
async def task(id):
print("Run", id)
yield from asyncio.sleep(0.5)
await asyncio.sleep(0.5)
pool = Pool(concurrency=5)
for i in range(1, 20):

View File

@ -20,8 +20,6 @@ import copy
import asyncio
import asyncio.subprocess
from gns3server.utils.asyncio import asyncio_ensure_future
import logging
log = logging.getLogger(__name__)
@ -44,24 +42,22 @@ class AsyncioRawCommandServer:
# We limit number of process
self._lock = asyncio.Semaphore(value=4)
@asyncio.coroutine
def run(self, network_reader, network_writer):
yield from self._lock.acquire()
process = yield from asyncio.subprocess.create_subprocess_exec(*self._command,
async def run(self, network_reader, network_writer):
await self._lock.acquire()
process = await asyncio.subprocess.create_subprocess_exec(*self._command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
stdin=asyncio.subprocess.PIPE)
try:
yield from self._process(network_reader, network_writer, process.stdout, process.stdin)
await self._process(network_reader, network_writer, process.stdout, process.stdin)
except ConnectionResetError:
network_writer.close()
if process.returncode is None:
process.kill()
yield from process.wait()
await process.wait()
self._lock.release()
@asyncio.coroutine
def _process(self, network_reader, network_writer, process_reader, process_writer):
async def _process(self, network_reader, network_writer, process_reader, process_writer):
replaces = []
# Server host from the client point of view
host = network_writer.transport.get_extra_info("sockname")[0]
@ -71,12 +67,12 @@ class AsyncioRawCommandServer:
else:
replaces.append((replace[0], replace[1], ))
network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
reader_read = asyncio_ensure_future(process_reader.read(READ_SIZE))
network_read = asyncio.ensure_future(network_reader.read(READ_SIZE))
reader_read = asyncio.ensure_future(process_reader.read(READ_SIZE))
timeout = 30
while True:
done, pending = yield from asyncio.wait(
done, pending = await asyncio.wait(
[
network_read,
reader_read
@ -91,22 +87,22 @@ class AsyncioRawCommandServer:
if network_reader.at_eof():
raise ConnectionResetError()
network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
network_read = asyncio.ensure_future(network_reader.read(READ_SIZE))
process_writer.write(data)
yield from process_writer.drain()
await process_writer.drain()
elif coro == reader_read:
if process_reader.at_eof():
raise ConnectionResetError()
reader_read = asyncio_ensure_future(process_reader.read(READ_SIZE))
reader_read = asyncio.ensure_future(process_reader.read(READ_SIZE))
for replace in replaces:
data = data.replace(replace[0], replace[1])
timeout = 2 # We reduce the timeout when the process start to return stuff to avoid problem with server not closing the connection
network_writer.write(data)
yield from network_writer.drain()
await network_writer.drain()
if __name__ == '__main__':

View File

@ -47,8 +47,7 @@ class SerialReaderWriterProtocol(asyncio.Protocol):
if self.transport:
self.transport.write(data)
@asyncio.coroutine
def drain(self):
async def drain(self):
pass
def connection_made(self, transport):
@ -72,13 +71,12 @@ class WindowsPipe:
self._handle = open(path, "a+b")
self._pipe = msvcrt.get_osfhandle(self._handle.fileno())
@asyncio.coroutine
def read(self, n=-1):
async def read(self, n=-1):
(read, num_avail, num_message) = win32pipe.PeekNamedPipe(self._pipe, 0)
if num_avail > 0:
(error_code, output) = win32file.ReadFile(self._pipe, num_avail, None)
return output
yield from asyncio.sleep(0.01)
await asyncio.sleep(0.01)
return b""
def at_eof(self):
@ -87,16 +85,14 @@ class WindowsPipe:
def write(self, data):
win32file.WriteFile(self._pipe, data)
@asyncio.coroutine
def drain(self):
async def drain(self):
return
def close(self):
pass
@asyncio.coroutine
def _asyncio_open_serial_windows(path):
async def _asyncio_open_serial_windows(path):
"""
Open a windows named pipe
@ -104,14 +100,13 @@ def _asyncio_open_serial_windows(path):
"""
try:
yield from wait_for_named_pipe_creation(path)
await wait_for_named_pipe_creation(path)
except asyncio.TimeoutError:
raise NodeError('Pipe file "{}" is missing'.format(path))
return WindowsPipe(path)
@asyncio.coroutine
def _asyncio_open_serial_unix(path):
async def _asyncio_open_serial_unix(path):
"""
Open a unix socket or a windows named pipe
@ -120,20 +115,19 @@ def _asyncio_open_serial_unix(path):
try:
# wait for VM to create the pipe file.
yield from wait_for_file_creation(path)
await wait_for_file_creation(path)
except asyncio.TimeoutError:
raise NodeError('Pipe file "{}" is missing'.format(path))
output = SerialReaderWriterProtocol()
try:
yield from asyncio.get_event_loop().create_unix_connection(lambda: output, path)
await asyncio.get_event_loop().create_unix_connection(lambda: output, path)
except ConnectionRefusedError:
raise NodeError('Can\'t open pipe file "{}"'.format(path))
return output
@asyncio.coroutine
def asyncio_open_serial(path):
async def asyncio_open_serial(path):
"""
Open a unix socket or a windows named pipe
@ -141,6 +135,6 @@ def asyncio_open_serial(path):
"""
if sys.platform.startswith("win"):
return (yield from _asyncio_open_serial_windows(path))
return (await _asyncio_open_serial_windows(path))
else:
return (yield from _asyncio_open_serial_unix(path))
return (await _asyncio_open_serial_unix(path))

View File

@ -15,13 +15,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import asyncio
import asyncio.subprocess
import struct
from gns3server.utils.asyncio import asyncio_ensure_future
import logging
log = logging.getLogger(__name__)
@ -75,13 +72,11 @@ class TelnetConnection(object):
def writer(self):
return self._writer
@asyncio.coroutine
def connected(self):
async def connected(self):
"""Method called when client is connected"""
pass
@asyncio.coroutine
def disconnected(self):
async def disconnected(self):
"""Method called when client is disconnecting"""
pass
@ -90,8 +85,7 @@ class TelnetConnection(object):
`naws` flag is enable in server configuration."""
pass
@asyncio.coroutine
def feed(self, data):
async def feed(self, data):
"""
Handles incoming data
:return:
@ -148,8 +142,7 @@ class AsyncioTelnetServer:
self._connection_factory = connection_factory
@staticmethod
@asyncio.coroutine
def write_client_intro(writer, echo=False):
async def write_client_intro(writer, echo=False):
# Send initial telnet session opening
if echo:
writer.write(bytes([IAC, WILL, ECHO]))
@ -157,10 +150,9 @@ class AsyncioTelnetServer:
writer.write(bytes([
IAC, WONT, ECHO,
IAC, DONT, ECHO]))
yield from writer.drain()
await writer.drain()
@asyncio.coroutine
def _write_intro(self, writer, binary=False, echo=False, naws=False):
async def _write_intro(self, writer, binary=False, echo=False, naws=False):
# Send initial telnet session opening
if echo:
writer.write(bytes([IAC, WILL, ECHO]))
@ -185,20 +177,19 @@ class AsyncioTelnetServer:
writer.write(bytes([
IAC, DO, NAWS
]))
yield from writer.drain()
await writer.drain()
@asyncio.coroutine
def run(self, network_reader, network_writer):
async def run(self, network_reader, network_writer):
# Keep track of connected clients
connection = self._connection_factory(network_reader, network_writer)
self._connections[network_writer] = connection
try:
yield from self._write_intro(network_writer, echo=self._echo, binary=self._binary, naws=self._naws)
yield from connection.connected()
yield from self._process(network_reader, network_writer, connection)
await self._write_intro(network_writer, echo=self._echo, binary=self._binary, naws=self._naws)
await connection.connected()
await self._process(network_reader, network_writer, connection)
except ConnectionError:
with (yield from self._lock):
async with self._lock:
network_writer.close()
if self._reader_process == network_reader:
self._reader_process = None
@ -206,53 +197,49 @@ class AsyncioTelnetServer:
if self._current_read is not None:
self._current_read.cancel()
yield from connection.disconnected()
await connection.disconnected()
del self._connections[network_writer]
@asyncio.coroutine
def close(self):
async def close(self):
for writer, connection in self._connections.items():
try:
writer.write_eof()
yield from writer.drain()
await writer.drain()
except (AttributeError, ConnectionError):
continue
@asyncio.coroutine
def client_connected_hook(self):
async def client_connected_hook(self):
pass
@asyncio.coroutine
def _get_reader(self, network_reader):
async def _get_reader(self, network_reader):
"""
Get a reader or None if another reader is already reading.
"""
with (yield from self._lock):
async with self._lock:
if self._reader_process is None:
self._reader_process = network_reader
if self._reader:
if self._reader_process == network_reader:
self._current_read = asyncio_ensure_future(self._reader.read(READ_SIZE))
self._current_read = asyncio.ensure_future(self._reader.read(READ_SIZE))
return self._current_read
return None
@asyncio.coroutine
def _process(self, network_reader, network_writer, connection):
network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
reader_read = yield from self._get_reader(network_reader)
async def _process(self, network_reader, network_writer, connection):
network_read = asyncio.ensure_future(network_reader.read(READ_SIZE))
reader_read = await self._get_reader(network_reader)
while True:
if reader_read is None:
reader_read = yield from self._get_reader(network_reader)
reader_read = await self._get_reader(network_reader)
if reader_read is None:
done, pending = yield from asyncio.wait(
done, pending = await asyncio.wait(
[
network_read,
],
timeout=1,
return_when=asyncio.FIRST_COMPLETED)
else:
done, pending = yield from asyncio.wait(
done, pending = await asyncio.wait(
[
network_read,
reader_read
@ -264,10 +251,10 @@ class AsyncioTelnetServer:
if network_reader.at_eof():
raise ConnectionResetError()
network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
network_read = asyncio.ensure_future(network_reader.read(READ_SIZE))
if IAC in data:
data = yield from self._IAC_parser(data, network_reader, network_writer, connection)
data = await self._IAC_parser(data, network_reader, network_writer, connection)
if len(data) == 0:
continue
@ -277,9 +264,9 @@ class AsyncioTelnetServer:
if self._writer:
self._writer.write(data)
yield from self._writer.drain()
await self._writer.drain()
yield from connection.feed(data)
await connection.feed(data)
if connection.is_closing:
raise ConnectionResetError()
@ -287,22 +274,21 @@ class AsyncioTelnetServer:
if self._reader and self._reader.at_eof():
raise ConnectionResetError()
reader_read = yield from self._get_reader(network_reader)
reader_read = await self._get_reader(network_reader)
# Replicate the output on all clients
for connection in self._connections.values():
connection.writer.write(data)
yield from connection.writer.drain()
await connection.writer.drain()
@asyncio.coroutine
def _read(self, cmd, buffer, location, reader):
async def _read(self, cmd, buffer, location, reader):
""" Reads next op from the buffer or reader"""
try:
op = buffer[location]
cmd.append(op)
return op
except IndexError:
op = yield from reader.read(1)
op = await reader.read(1)
buffer.extend(op)
cmd.append(buffer[location])
return op
@ -320,8 +306,7 @@ class AsyncioTelnetServer:
else:
log.debug("Not supported negotiation sequence, received {} bytes", len(data))
@asyncio.coroutine
def _IAC_parser(self, buf, network_reader, network_writer, connection):
async def _IAC_parser(self, buf, network_reader, network_writer, connection):
"""
Processes and removes any Telnet commands from the buffer.
@ -342,7 +327,7 @@ class AsyncioTelnetServer:
try:
iac_cmd.append(buf[iac_loc + 1])
except IndexError:
d = yield from network_reader.read(1)
d = await network_reader.read(1)
buf.extend(d)
iac_cmd.append(buf[iac_loc + 1])
@ -366,7 +351,7 @@ class AsyncioTelnetServer:
elif iac_cmd[1] == SB: # starts negotiation commands
negotiation = []
for pos in range(2, self.MAX_NEGOTIATION_READ):
op = yield from self._read(iac_cmd, buf, iac_loc + pos, network_reader)
op = await self._read(iac_cmd, buf, iac_loc + pos, network_reader)
negotiation.append(op)
if op == SE:
# ends negotiation commands
@ -380,7 +365,7 @@ class AsyncioTelnetServer:
try:
iac_cmd.append(buf[iac_loc + 2])
except IndexError:
d = yield from network_reader.read(1)
d = await network_reader.read(1)
buf.extend(d)
iac_cmd.append(buf[iac_loc + 2])
# We do ECHO, SGA, and BINARY. Period.
@ -413,7 +398,7 @@ class AsyncioTelnetServer:
# Remove the entire TELNET command from the buffer
buf = buf.replace(iac_cmd, b'', 1)
yield from network_writer.drain()
await network_writer.drain()
# Return the new copy of the buffer, minus telnet commands
return buf
@ -422,7 +407,7 @@ if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
process = loop.run_until_complete(asyncio_ensure_future(asyncio.subprocess.create_subprocess_exec("/bin/sh", "-i",
process = loop.run_until_complete(asyncio.ensure_future(asyncio.subprocess.create_subprocess_exec("/bin/sh", "-i",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
stdin=asyncio.subprocess.PIPE)))