From 9ff1ad7769f8e87d8d6e2c9ee4c756ce5783f305 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Wed, 24 Aug 2016 15:27:35 +0100 Subject: [PATCH] node: Fix redelivery race --- .../messaging/ArtemisMessagingClient.kt | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt index 8fda2fde18..cad2869476 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt @@ -70,6 +70,9 @@ class ArtemisMessagingClient(directory: Path, var consumer: ClientConsumer? = null var session: ClientSession? = null var clientFactory: ClientSessionFactory? = null + + // TODO: This is not robust and needs to be replaced by more intelligently using the message queue server. + var undeliveredMessages = listOf() } /** A registration to handle messages of different types */ @@ -85,9 +88,6 @@ class ArtemisMessagingClient(directory: Path, private val state = ThreadBox(InnerState()) private val handlers = CopyOnWriteArrayList() - // TODO: This is not robust and needs to be replaced by more intelligently using the message queue server. - private var undeliveredMessages = listOf() - init { require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" } } @@ -214,7 +214,9 @@ class ArtemisMessagingClient(directory: Path, // This is a hack; transient messages held in memory isn't crash resistant. // TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use. - undeliveredMessages += msg + state.locked { + undeliveredMessages += msg + } return false } @@ -312,8 +314,11 @@ class ArtemisMessagingClient(directory: Path, require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." } val handler = Handler(executor, topicSession, callback) handlers.add(handler) - val messagesToRedeliver = undeliveredMessages - undeliveredMessages = listOf() + val messagesToRedeliver = state.locked { + val messagesToRedeliver = undeliveredMessages + undeliveredMessages = listOf() + messagesToRedeliver + } messagesToRedeliver.forEach { deliver(it) } return handler }