diff --git a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorker.kt b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorker.kt index 4454d99e09..15b01bf0f0 100644 --- a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorker.kt +++ b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorker.kt @@ -56,7 +56,10 @@ class FlowWorker(flowWorkerId: String, private val flowWorkerServiceHub: FlowWor val consumer = session.createConsumer(queueName) val producer = session.createProducer() - consumer.setMessageHandler { message -> handleFlowWorkerMessage(message, session, producer) } + consumer.setMessageHandler { message -> + handleFlowWorkerMessage(message, session, producer) + message.acknowledge() + } thread { (flowWorkerServiceHub.networkService as P2PMessagingClient).run() diff --git a/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/CordaRpcWorkerOps.kt b/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/CordaRpcWorkerOps.kt index 745216d8e4..705c99af96 100644 --- a/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/CordaRpcWorkerOps.kt +++ b/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/CordaRpcWorkerOps.kt @@ -84,7 +84,10 @@ class CordaRpcWorkerOps( } val consumer = session.createConsumer(rpcWorkerQueueName) - consumer.setMessageHandler { message -> handleFlowWorkerMessage(message) } + consumer.setMessageHandler { message -> + handleFlowWorkerMessage(message) + message.acknowledge() + } networkMapFeed().updates.subscribe { mapChange: NetworkMapCache.MapChange? -> val networkMapUpdateMessage = NetworkMapUpdate(services.myInfo.legalIdentities.first().name, mapChange!!) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt index 251cfaf7c1..0e70a85905 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt @@ -91,6 +91,7 @@ class BridgeControlListener(val config: MutualSslConfiguration, } catch (ex: Exception) { log.error("Unable to process bridge control message", ex) } + msg.acknowledge() } } @@ -113,6 +114,7 @@ class BridgeControlListener(val config: MutualSslConfiguration, } catch (ex: Exception) { log.error("Unable to process bridge notification message", ex) } + msg.acknowledge() } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index 5f06b38572..86a852e739 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -292,6 +292,7 @@ class RPCServer( val clientAddress = artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME) log.warn("Detected RPC client disconnect on address $clientAddress, scheduling for reaping") invalidateClient(SimpleString(clientAddress)) + artemisMessage.acknowledge() } private fun bindingAdditionArtemisMessageHandler(artemisMessage: ClientMessage) { @@ -303,6 +304,7 @@ class RPCServer( val buffer = stopBuffering(clientAddress) buffer?.let { drainBuffer(it) } + artemisMessage.acknowledge() } /**