Fix ConnectionResetError issues and switch to aiohttp version 3.4.4. Fixes #1474.

This commit is contained in:
grossmj 2018-12-03 19:14:22 +08:00
parent 030714ae80
commit f76b329cba
8 changed files with 31 additions and 33 deletions

View File

@ -449,6 +449,7 @@ class Compute:
if action == "ping":
self._cpu_usage_percent = event["cpu_usage_percent"]
self._memory_usage_percent = event["memory_usage_percent"]
#FIXME: slow down number of compute events
self._controller.notification.controller_emit("compute.updated", self.__json__())
else:
await self._controller.notification.dispatch(action, event, compute_id=self.id)

View File

@ -45,14 +45,14 @@ class NotificationHandler:
request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws))
with notifications.queue() as queue:
while True:
try:
try:
while True:
notification = await queue.get_json(1)
if ws.closed:
break
await ws.send_str(notification)
except asyncio.futures.CancelledError:
break
finally:
request.app['websockets'].discard(ws)
finally:
if not ws.closed:
await ws.close()
request.app['websockets'].discard(ws)
return ws

View File

@ -191,8 +191,6 @@ class ProjectHandler:
msg = json.dumps({"action": action, "event": msg}, sort_keys=True)
log.debug("Send notification: %s", msg)
await response.write(("{}\n".format(msg)).encode("utf-8"))
except asyncio.futures.CancelledError as e:
break
except asyncio.futures.TimeoutError:
await response.write("{}\n".format(json.dumps(ProjectHandler._getPingMessage())).encode("utf-8"))
project.stop_listen_queue(queue)

View File

@ -50,11 +50,8 @@ class NotificationHandler:
await response.prepare(request)
with controller.notification.controller_queue() as queue:
while True:
try:
msg = await queue.get_json(5)
await response.write(("{}\n".format(msg)).encode("utf-8"))
except asyncio.futures.CancelledError:
break
msg = await queue.get_json(5)
await response.write(("{}\n".format(msg)).encode("utf-8"))
@Route.get(
r"/notifications/ws",
@ -71,14 +68,15 @@ class NotificationHandler:
request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws))
with controller.notification.controller_queue() as queue:
while True:
try:
try:
while True:
notification = await queue.get_json(5)
if ws.closed:
break
await ws.send_str(notification)
except asyncio.futures.CancelledError:
break
finally:
request.app['websockets'].discard(ws)
finally:
if not ws.closed:
await ws.close()
request.app['websockets'].discard(ws)
return ws

View File

@ -229,11 +229,8 @@ class ProjectHandler:
await response.prepare(request)
with controller.notification.project_queue(project) as queue:
while True:
try:
msg = await queue.get_json(5)
await response.write(("{}\n".format(msg)).encode("utf-8"))
except asyncio.futures.CancelledError as e:
break
msg = await queue.get_json(5)
await response.write(("{}\n".format(msg)).encode("utf-8"))
if project.auto_close:
# To avoid trouble with client connecting disconnecting we sleep few seconds before checking
@ -263,16 +260,16 @@ class ProjectHandler:
request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws))
with controller.notification.project_queue(project) as queue:
while True:
try:
try:
while True:
notification = await queue.get_json(5)
if ws.closed:
break
await ws.send_str(notification)
except asyncio.futures.CancelledError:
break
finally:
request.app['websockets'].discard(ws)
finally:
if not ws.closed:
await ws.close()
request.app['websockets'].discard(ws)
if project.auto_close:
# To avoid trouble with client connecting disconnecting we sleep few seconds before checking

View File

@ -221,6 +221,7 @@ class Route(object):
response = Response(request=request, route=route)
response.set_status(408)
response.json({"message": "Request canceled", "status": 408})
raise # must raise to let aiohttp know the connection has been closed
except aiohttp.ClientError:
log.warning("Client error")
response = Response(request=request, route=route)

View File

@ -102,7 +102,10 @@ class WebServer:
return
# close websocket connections
for ws in set(self._app['websockets']):
websocket_connections = set(self._app['websockets'])
if websocket_connections:
log.info("Closing {} websocket connections...".format(len(websocket_connections)))
for ws in websocket_connections:
await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')
if self._server:

View File

@ -1,5 +1,5 @@
jsonschema>=2.4.0
aiohttp==3.2.1
aiohttp==3.4.4
aiohttp-cors==0.7.0
Jinja2>=2.7.3
raven>=5.23.0