From cb7a0229a8c876b5799825f01ce98515e397c5de Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Fri, 16 Feb 2018 16:03:39 +0000 Subject: [PATCH] Fix a hang in AMQP protocol code that occurs when pausing in debugger causes protocol timeout, but wasn't driving event procesing to actuially kill the socket. (#2557) --- .../internal/protonwrapper/engine/ConnectionStateMachine.kt | 3 +++ .../nodeapi/internal/protonwrapper/engine/EventProcessor.kt | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index 1ab83fcc4e..473c28876c 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -168,12 +168,14 @@ internal class ConnectionStateMachine(serverMode: Boolean, val transport = event.transport log.debug { "Transport Head Closed $transport" } transport.close_tail() + onTransportInternal(transport) } override fun onTransportTailClosed(event: Event) { val transport = event.transport log.debug { "Transport Tail Closed $transport" } transport.close_head() + onTransportInternal(transport) } override fun onTransportClosed(event: Event) { @@ -195,6 +197,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, } else { log.info("Error (no description returned).") } + onTransportInternal(transport) } override fun onTransport(event: Event) { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt index 14c21b97f2..c02c6f2541 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt @@ -79,7 +79,10 @@ internal class EventProcessor(channel: Channel, if ((connection.localState != EndpointState.CLOSED) && !connection.transport.isClosed) { val now = System.currentTimeMillis() val tickDelay = Math.max(0L, connection.transport.tick(now) - now) - executor.schedule({ tick(connection) }, tickDelay, TimeUnit.MILLISECONDS) + executor.schedule({ + tick(connection) + processEvents() + }, tickDelay, TimeUnit.MILLISECONDS) } } catch (ex: Exception) { connection.transport.close()