mirror of
https://github.com/GNS3/gns3-server.git
synced 2025-06-22 00:41:56 +00:00
Use black with -l 120 param.
This commit is contained in:
@ -46,7 +46,7 @@ class EmbedShell:
|
||||
self._loop = loop
|
||||
self._reader = reader
|
||||
self._writer = writer
|
||||
self._prompt = '> '
|
||||
self._prompt = "> "
|
||||
self._welcome_message = welcome_message
|
||||
|
||||
@property
|
||||
@ -85,30 +85,30 @@ class EmbedShell:
|
||||
"""
|
||||
Show help
|
||||
"""
|
||||
res = ''
|
||||
res = ""
|
||||
if len(args) == 0:
|
||||
res = 'Help:\n'
|
||||
res = "Help:\n"
|
||||
for name, value in inspect.getmembers(self):
|
||||
if not inspect.isgeneratorfunction(value):
|
||||
continue
|
||||
if name.startswith('_') or (len(args) and name != args[0]) or name == 'run':
|
||||
if name.startswith("_") or (len(args) and name != args[0]) or name == "run":
|
||||
continue
|
||||
doc = inspect.getdoc(value)
|
||||
res += name
|
||||
if len(args) and doc:
|
||||
res += ': ' + doc
|
||||
res += ": " + doc
|
||||
elif doc:
|
||||
res += ': ' + doc.split('\n')[0]
|
||||
res += '\n'
|
||||
res += ": " + doc.split("\n")[0]
|
||||
res += "\n"
|
||||
if len(args) == 0:
|
||||
res += '\nhelp command for details about a command\n'
|
||||
res += "\nhelp command for details about a command\n"
|
||||
return res
|
||||
|
||||
async def _parse_command(self, text):
|
||||
cmd = text.split(' ')
|
||||
cmd = text.split(" ")
|
||||
found = False
|
||||
if cmd[0] == '?':
|
||||
cmd[0] = 'help'
|
||||
if cmd[0] == "?":
|
||||
cmd[0] = "help"
|
||||
|
||||
# when there is no command specified just return empty result
|
||||
if not cmd[0].strip():
|
||||
@ -121,7 +121,7 @@ class EmbedShell:
|
||||
found = True
|
||||
break
|
||||
if not found:
|
||||
res = (f'Command not found {cmd[0]}\n' + (await self.help()))
|
||||
res = f"Command not found {cmd[0]}\n" + (await self.help())
|
||||
return res
|
||||
|
||||
async def run(self):
|
||||
@ -130,7 +130,7 @@ class EmbedShell:
|
||||
while True:
|
||||
self._writer.feed_data(self._prompt.encode())
|
||||
result = await self._reader.readline()
|
||||
result = result.decode().strip('\n')
|
||||
result = result.decode().strip("\n")
|
||||
res = await self._parse_command(result)
|
||||
self._writer.feed_data(res.encode())
|
||||
|
||||
@ -143,7 +143,7 @@ class EmbedShell:
|
||||
for name, value in inspect.getmembers(self):
|
||||
if not inspect.isgeneratorfunction(value):
|
||||
continue
|
||||
if name.startswith('_') or name == 'run':
|
||||
if name.startswith("_") or name == "run":
|
||||
continue
|
||||
doc = inspect.getdoc(value)
|
||||
commands.append((name, doc))
|
||||
@ -156,22 +156,23 @@ class PatchedStdinInput(StdinInput):
|
||||
Fixes issue when PyCharm runs own terminal without emulation.
|
||||
https://github.com/GNS3/gns3-server/issues/1172
|
||||
"""
|
||||
|
||||
def __init__(self, stdin=None):
|
||||
self.stdin = stdin or sys.stdin
|
||||
try:
|
||||
self.stdin.fileno()
|
||||
except io.UnsupportedOperation:
|
||||
if 'idlelib.run' in sys.modules:
|
||||
raise io.UnsupportedOperation(
|
||||
'Stdin is not a terminal. Running from Idle is not supported.')
|
||||
if "idlelib.run" in sys.modules:
|
||||
raise io.UnsupportedOperation("Stdin is not a terminal. Running from Idle is not supported.")
|
||||
else:
|
||||
raise io.UnsupportedOperation('Stdin is not a terminal.')
|
||||
raise io.UnsupportedOperation("Stdin is not a terminal.")
|
||||
|
||||
|
||||
class UnstoppableEventLoop(EventLoop):
|
||||
"""
|
||||
Partially fake event loop which cannot be stopped by CommandLineInterface
|
||||
"""
|
||||
|
||||
def __init__(self, loop):
|
||||
self._loop = loop
|
||||
|
||||
@ -202,13 +203,13 @@ class ShellConnection(TelnetConnection):
|
||||
self._cli = None
|
||||
self._cb = None
|
||||
self._size = Size(rows=40, columns=79)
|
||||
self.encoding = 'utf-8'
|
||||
|
||||
self.encoding = "utf-8"
|
||||
|
||||
async def connected(self):
|
||||
# prompt_toolkit internally checks if it's on windows during output rendering but
|
||||
# we need to force that we use Vt100_Output not Win32_Output
|
||||
from prompt_toolkit import renderer
|
||||
|
||||
renderer.is_windows = lambda: False
|
||||
|
||||
def get_size():
|
||||
@ -218,7 +219,8 @@ class ShellConnection(TelnetConnection):
|
||||
application=create_prompt_application(self._shell.prompt),
|
||||
eventloop=UnstoppableEventLoop(create_asyncio_eventloop(self._loop)),
|
||||
input=PatchedStdinInput(sys.stdin),
|
||||
output=Vt100_Output(self, get_size))
|
||||
output=Vt100_Output(self, get_size),
|
||||
)
|
||||
|
||||
self._cb = self._cli.create_eventloop_callbacks()
|
||||
self._inputstream = InputStream(self._cb.feed_key)
|
||||
@ -302,19 +304,21 @@ def create_stdin_shell(shell, loop=None):
|
||||
:param loop: The event loop
|
||||
:returns: Telnet server
|
||||
"""
|
||||
|
||||
async def feed_stdin(loop, reader, shell):
|
||||
history = InMemoryHistory()
|
||||
completer = WordCompleter([name for name, _ in shell.get_commands()], ignore_case=True)
|
||||
while True:
|
||||
line = await prompt(
|
||||
">", patch_stdout=True, return_asyncio_coroutine=True, history=history, completer=completer)
|
||||
line += '\n'
|
||||
">", patch_stdout=True, return_asyncio_coroutine=True, history=history, completer=completer
|
||||
)
|
||||
line += "\n"
|
||||
reader.feed_data(line.encode())
|
||||
|
||||
async def read_stdout(writer):
|
||||
while True:
|
||||
c = await writer.read(1)
|
||||
print(c.decode(), end='')
|
||||
print(c.decode(), end="")
|
||||
sys.stdout.flush()
|
||||
|
||||
reader = asyncio.StreamReader()
|
||||
@ -329,30 +333,31 @@ def create_stdin_shell(shell, loop=None):
|
||||
shell_task = loop.create_task(shell.run())
|
||||
return asyncio.gather(shell_task, writer_task, reader_task)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
if __name__ == "__main__":
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
class Demo(EmbedShell):
|
||||
|
||||
async def hello(self, *args):
|
||||
"""
|
||||
Hello world
|
||||
|
||||
This command accept arguments: hello tutu will display tutu
|
||||
"""
|
||||
|
||||
async def world():
|
||||
await asyncio.sleep(2)
|
||||
if len(args):
|
||||
return ' '.join(args)
|
||||
return " ".join(args)
|
||||
else:
|
||||
return 'world\n'
|
||||
return "world\n"
|
||||
|
||||
return await world()
|
||||
|
||||
# Demo using telnet
|
||||
shell = Demo(welcome_message="Welcome!\n")
|
||||
server = create_telnet_shell(shell, loop=loop)
|
||||
coro = asyncio.start_server(server.run, '127.0.0.1', 4444, loop=loop)
|
||||
coro = asyncio.start_server(server.run, "127.0.0.1", 4444, loop=loop)
|
||||
s = loop.run_until_complete(coro)
|
||||
try:
|
||||
loop.run_forever()
|
||||
|
Reference in New Issue
Block a user