From a964073c2f811fa1498aa83b8a6b5ab7b4d5f50c Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Tue, 20 Sep 2016 13:55:57 +0100 Subject: [PATCH] Track message id's to deduplicate replays. Widen the auto-acknowledgement window of Artemis back to the default. Use synchronized wrapper over set. Drop discard message to trace level logging. Fix code layout Use lazy trace extension method Track message id's to deduplicate replays. Widen the auto-acknowledgement window of Artemis back to the default. Use synchronized wrapper over set. Include tx message unique id in checkpointed data. Add test for checkpointed resend Fix bug in not getting UUID off message. Tidy formatting Add explanation comments to test asserts Put unique id even on Client messages. Tidy formatting --- .../com/r3corda/client/NodeMonitorClient.kt | 1 + .../r3corda/client/impl/CordaRPCClientImpl.kt | 3 ++ .../client/ClientRPCInfrastructureTests.kt | 8 ++- .../com/r3corda/core/messaging/Messaging.kt | 37 +++++++------ .../com/r3corda/node/internal/AbstractNode.kt | 1 + .../node/services/api/AbstractNodeService.kt | 1 + .../services/messaging/NodeMessagingClient.kt | 52 +++++++++--------- .../services/monitor/NodeMonitorService.kt | 1 + .../network/InMemoryNetworkMapCache.kt | 5 +- .../services/network/NetworkMapService.kt | 1 + .../statemachine/ProtocolIORequest.kt | 6 ++- .../statemachine/ProtocolStateMachineImpl.kt | 5 +- .../statemachine/StateMachineManager.kt | 31 +++++++---- .../node/messaging/InMemoryMessagingTests.kt | 9 ++-- .../node/services/ArtemisMessagingTests.kt | 1 + .../node/services/NodeMonitorServiceTests.kt | 1 + .../statemachine/StateMachineManagerTests.kt | 53 ++++++++++++++++++- .../testing/node/InMemoryMessagingNetwork.kt | 40 +++++++------- 18 files changed, 169 insertions(+), 87 deletions(-) diff --git a/client/src/main/kotlin/com/r3corda/client/NodeMonitorClient.kt b/client/src/main/kotlin/com/r3corda/client/NodeMonitorClient.kt index 36b5c4eb6e..83b25b18c8 100644 --- a/client/src/main/kotlin/com/r3corda/client/NodeMonitorClient.kt +++ b/client/src/main/kotlin/com/r3corda/client/NodeMonitorClient.kt @@ -4,6 +4,7 @@ import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture import com.r3corda.core.contracts.ClientToServiceCommand import com.r3corda.core.messaging.MessagingService +import com.r3corda.core.messaging.createMessage import com.r3corda.core.node.NodeInfo import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.deserialize diff --git a/client/src/main/kotlin/com/r3corda/client/impl/CordaRPCClientImpl.kt b/client/src/main/kotlin/com/r3corda/client/impl/CordaRPCClientImpl.kt index 485c294bcb..2850eede20 100644 --- a/client/src/main/kotlin/com/r3corda/client/impl/CordaRPCClientImpl.kt +++ b/client/src/main/kotlin/com/r3corda/client/impl/CordaRPCClientImpl.kt @@ -16,6 +16,7 @@ import com.r3corda.core.utilities.debug import com.r3corda.core.utilities.trace import com.r3corda.node.services.messaging.* import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException +import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientMessage import org.apache.activemq.artemis.api.core.client.ClientProducer @@ -212,6 +213,8 @@ class CordaRPCClientImpl(private val session: ClientSession, return session.createMessage(false).apply { putStringProperty(ClientRPCRequestMessage.METHOD_NAME, method.name) putStringProperty(ClientRPCRequestMessage.REPLY_TO, proxyAddress) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) } } diff --git a/client/src/test/kotlin/com/r3corda/client/ClientRPCInfrastructureTests.kt b/client/src/test/kotlin/com/r3corda/client/ClientRPCInfrastructureTests.kt index 625668c547..c45f3bb61c 100644 --- a/client/src/test/kotlin/com/r3corda/client/ClientRPCInfrastructureTests.kt +++ b/client/src/test/kotlin/com/r3corda/client/ClientRPCInfrastructureTests.kt @@ -21,6 +21,7 @@ import org.junit.Test import rx.Observable import rx.subjects.PublishSubject import java.io.Closeable +import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.locks.ReentrantLock @@ -57,8 +58,11 @@ class ClientRPCInfrastructureTests { producer = serverSession.createProducer() val dispatcher = object : RPCDispatcher(TestOps()) { override fun send(bits: SerializedBytes<*>, toAddress: String) { - val msg = serverSession.createMessage(false) - msg.writeBodyBufferBytes(bits.bits) + val msg = serverSession.createMessage(false).apply { + writeBodyBufferBytes(bits.bits) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + } producer.send(toAddress, msg) } } diff --git a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt index 4922f2f304..83867a4047 100644 --- a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt +++ b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt @@ -4,6 +4,7 @@ import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.serialization.DeserializeAsKotlinObjectDef import com.r3corda.core.serialization.serialize import java.time.Instant +import java.util.* import java.util.concurrent.Executor import java.util.concurrent.atomic.AtomicBoolean import javax.annotation.concurrent.ThreadSafe @@ -73,27 +74,28 @@ interface MessagingService { */ fun send(message: Message, target: MessageRecipients) - /** - * Returns an initialised [Message] with the current time, etc, already filled in. - * - * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". - * Must not be blank. - * @param sessionID identifier for the session the message is part of. For messages sent to services before the - * construction of a session, use [DEFAULT_SESSION_ID]. - */ - fun createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message - /** * Returns an initialised [Message] with the current time, etc, already filled in. * * @param topicSession identifier for the topic and session the message is sent to. */ - fun createMessage(topicSession: TopicSession, data: ByteArray): Message + fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message /** Returns an address that refers to this node. */ val myAddress: SingleMessageRecipient } +/** + * Returns an initialised [Message] with the current time, etc, already filled in. + * + * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". + * Must not be blank. + * @param sessionID identifier for the session the message is part of. For messages sent to services before the + * construction of a session, use [DEFAULT_SESSION_ID]. + */ +fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message + = createMessage(TopicSession(topic, sessionID), data) + /** * Registers a handler for the given topic and session ID that runs the given callback with the message and then removes * itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback @@ -106,7 +108,7 @@ interface MessagingService { * a session is established, use [DEFAULT_SESSION_ID]. */ fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, executor: Executor? = null, callback: (Message) -> Unit) - = runOnNextMessage(TopicSession(topic, sessionID), executor, callback) + = runOnNextMessage(TopicSession(topic, sessionID), executor, callback) /** * Registers a handler for the given topic and session that runs the given callback with the message and then removes @@ -125,11 +127,11 @@ fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Exec } } -fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients) - = send(TopicSession(topic, sessionID), payload, to) +fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID()) + = send(TopicSession(topic, sessionID), payload, to, uuid) -fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients) - = send(createMessage(topicSession, payload.serialize().bits), to) +fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID()) + = send(createMessage(topicSession, payload.serialize().bits, uuid), to) interface MessageHandlerRegistration @@ -145,6 +147,7 @@ data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION companion object { val Blank = TopicSession("", DEFAULT_SESSION_ID) } + fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID override fun toString(): String = "$topic.$sessionID" @@ -164,7 +167,7 @@ interface Message { val topicSession: TopicSession val data: ByteArray val debugTimestamp: Instant - val debugMessageID: String + val uniqueMessageId: UUID fun serialise(): ByteArray } diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 72ff00b1f3..6059176876 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -7,6 +7,7 @@ import com.google.common.util.concurrent.SettableFuture import com.r3corda.core.RunOnCallerThread import com.r3corda.core.crypto.Party import com.r3corda.core.messaging.SingleMessageRecipient +import com.r3corda.core.messaging.createMessage import com.r3corda.core.messaging.runOnNextMessage import com.r3corda.core.node.CityDatabase import com.r3corda.core.node.CordaPluginRegistry diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt b/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt index ab27742dea..226251db7a 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt @@ -3,6 +3,7 @@ package com.r3corda.node.services.api import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.messaging.Message import com.r3corda.core.messaging.MessageHandlerRegistration +import com.r3corda.core.messaging.createMessage import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.serialization.SingletonSerializeAsToken diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt index e2efc3e1b2..49dd6cc0a3 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt @@ -85,7 +85,7 @@ class NodeMessagingClient(config: NodeConfiguration, var rpcNotificationConsumer: ClientConsumer? = null // TODO: This is not robust and needs to be replaced by more intelligently using the message queue server. - var undeliveredMessages = listOf>() + var undeliveredMessages = listOf() } /** A registration to handle messages of different types */ @@ -122,7 +122,7 @@ class NodeMessagingClient(config: NodeConfiguration, clientFactory = locator.createSessionFactory() // Create a session. Note that the acknowledgement of messages is not flushed to - // the DB until the default buffer size of 1MB is acknowledged. + // the Artermis journal until the default buffer size of 1MB is acknowledged. val session = clientFactory!!.createSession(true, true, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) this.session = session session.start() @@ -179,11 +179,9 @@ class NodeMessagingClient(config: NodeConfiguration, null } ?: break - // Use the magic deduplication property built into Artemis as our message identity too - val uuid = UUID.fromString(artemisMessage.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) - val message: Message? = artemisToCordaMessage(artemisMessage, uuid) + val message: Message? = artemisToCordaMessage(artemisMessage) if (message != null) - deliver(message, uuid) + deliver(message) // Ack the message so it won't be redelivered. We should only really do this when there were no // transient failures. If we caught an exception in the handler, we could back off and retry delivery @@ -203,7 +201,7 @@ class NodeMessagingClient(config: NodeConfiguration, shutdownLatch.countDown() } - private fun artemisToCordaMessage(message: ClientMessage, uuid: UUID): Message? { + private fun artemisToCordaMessage(message: ClientMessage): Message? { try { if (!message.containsProperty(TOPIC_PROPERTY)) { log.warn("Received message without a $TOPIC_PROPERTY property, ignoring") @@ -215,6 +213,8 @@ class NodeMessagingClient(config: NodeConfiguration, } val topic = message.getStringProperty(TOPIC_PROPERTY) val sessionID = message.getLongProperty(SESSION_ID_PROPERTY) + // Use the magic deduplication property built into Artemis as our message identity too + val uuid = UUID.fromString(message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID uuid: $uuid") val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } @@ -223,7 +223,7 @@ class NodeMessagingClient(config: NodeConfiguration, override val topicSession = TopicSession(topic, sessionID) override val data: ByteArray = body override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp) - override val debugMessageID: String = message.messageID.toString() + override val uniqueMessageId: UUID = uuid override fun serialise(): ByteArray = body override fun toString() = topic + "#" + data.opaque() } @@ -235,7 +235,7 @@ class NodeMessagingClient(config: NodeConfiguration, } } - private fun deliver(msg: Message, uuid: UUID): Boolean { + private fun deliver(msg: Message): Boolean { state.checkNotLocked() // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added // or removed whilst the filter is executing will not affect anything. @@ -249,7 +249,7 @@ class NodeMessagingClient(config: NodeConfiguration, // This is a hack; transient messages held in memory isn't crash resistant. // TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use. state.locked { - undeliveredMessages += Pair(msg, uuid) + undeliveredMessages += msg } return false } @@ -268,10 +268,10 @@ class NodeMessagingClient(config: NodeConfiguration, // interpret persistent as "server" and non-persistent as "client". if (persistentInbox) { databaseTransaction { - callHandlers(msg, uuid, deliverTo) + callHandlers(msg, deliverTo) } } else { - callHandlers(msg, uuid, deliverTo) + callHandlers(msg, deliverTo) } } } catch(e: Exception) { @@ -280,16 +280,16 @@ class NodeMessagingClient(config: NodeConfiguration, return true } - private fun callHandlers(msg: Message, uuid: UUID, deliverTo: List) { - if (uuid in processedMessages) { - log.trace { "discard duplicate message $uuid for ${msg.topicSession}" } + private fun callHandlers(msg: Message, deliverTo: List) { + if (msg.uniqueMessageId in processedMessages) { + log.trace { "discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" } return } for (handler in deliverTo) { handler.callback(msg, handler) } // TODO We will at some point need to decide a trimming policy for the id's - processedMessages += uuid + processedMessages += msg.uniqueMessageId } override fun stop() { @@ -332,7 +332,6 @@ class NodeMessagingClient(config: NodeConfiguration, override fun send(message: Message, target: MessageRecipients) { val queueName = toQueueName(target) - val uuid = UUID.randomUUID() state.locked { val artemisMessage = session!!.createMessage(true).apply { val sessionID = message.topicSession.sessionID @@ -340,13 +339,13 @@ class NodeMessagingClient(config: NodeConfiguration, putLongProperty(SESSION_ID_PROPERTY, sessionID) writeBodyBufferBytes(message.data) // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(uuid.toString())) + putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) } if (knownQueues.add(queueName)) { maybeCreateQueue(queueName) } - log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: $uuid") + log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: $message.uniqueMessageId") producer!!.send(queueName, artemisMessage) } } @@ -376,7 +375,7 @@ class NodeMessagingClient(config: NodeConfiguration, undeliveredMessages = listOf() messagesToRedeliver } - messagesToRedeliver.forEach { deliver(it.first, it.second) } + messagesToRedeliver.forEach { deliver(it) } return handler } @@ -384,25 +383,26 @@ class NodeMessagingClient(config: NodeConfiguration, handlers.remove(registration) } - override fun createMessage(topicSession: TopicSession, data: ByteArray): Message { + override fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID): Message { // TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying. return object : Message { override val topicSession: TopicSession get() = topicSession override val data: ByteArray get() = data override val debugTimestamp: Instant = Instant.now() override fun serialise(): ByteArray = this.serialise() - override val debugMessageID: String get() = Instant.now().toEpochMilli().toString() + override val uniqueMessageId: UUID = uuid override fun toString() = topicSession.toString() + "#" + String(data) } } - override fun createMessage(topic: String, sessionID: Long, data: ByteArray) = createMessage(TopicSession(topic, sessionID), data) - private fun createRPCDispatcher(ops: CordaRPCOps) = object : RPCDispatcher(ops) { override fun send(bits: SerializedBytes<*>, toAddress: String) { state.locked { - val msg = session!!.createMessage(false) - msg.writeBodyBufferBytes(bits.bits) + val msg = session!!.createMessage(false).apply { + writeBodyBufferBytes(bits.bits) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + } producer!!.send(toAddress, msg) } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/NodeMonitorService.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/NodeMonitorService.kt index c6d4cccccf..ca4db6caf5 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/monitor/NodeMonitorService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/NodeMonitorService.kt @@ -7,6 +7,7 @@ import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.toStringShort import com.r3corda.core.messaging.MessageRecipients +import com.r3corda.core.messaging.createMessage import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.Vault import com.r3corda.core.protocols.ProtocolLogic diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt index 0d4d9119cf..ae3f24abc5 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt @@ -6,10 +6,7 @@ import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.SettableFuture import com.r3corda.core.contracts.Contract import com.r3corda.core.crypto.Party -import com.r3corda.core.messaging.MessagingService -import com.r3corda.core.messaging.SingleMessageRecipient -import com.r3corda.core.messaging.runOnNextMessage -import com.r3corda.core.messaging.send +import com.r3corda.core.messaging.* import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.NetworkCacheError diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt b/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt index 34411bf568..5308c178d6 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt @@ -9,6 +9,7 @@ import com.r3corda.core.crypto.signWithECDSA import com.r3corda.core.messaging.MessageHandlerRegistration import com.r3corda.core.messaging.MessageRecipients import com.r3corda.core.messaging.SingleMessageRecipient +import com.r3corda.core.messaging.createMessage import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.NetworkMapCache diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolIORequest.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolIORequest.kt index 51ae58bfa7..7df7f68f3a 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolIORequest.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolIORequest.kt @@ -2,6 +2,7 @@ package com.r3corda.node.services.statemachine import com.r3corda.core.crypto.Party import com.r3corda.core.messaging.TopicSession +import java.util.* // TODO revisit when Kotlin 1.1 is released and data classes can extend other classes interface ProtocolIORequest { @@ -15,6 +16,7 @@ interface SendRequest : ProtocolIORequest { val destination: Party val payload: Any val sendSessionID: Long + val uniqueMessageId: UUID } interface ReceiveRequest : ProtocolIORequest { @@ -27,6 +29,7 @@ data class SendAndReceive(override val topic: String, override val destination: Party, override val payload: Any, override val sendSessionID: Long, + override val uniqueMessageId: UUID, override val receiveType: Class, override val receiveSessionID: Long) : SendRequest, ReceiveRequest { @Transient @@ -43,7 +46,8 @@ data class ReceiveOnly(override val topic: String, data class SendOnly(override val destination: Party, override val topic: String, override val payload: Any, - override val sendSessionID: Long) : SendRequest { + override val sendSessionID: Long, + override val uniqueMessageId: UUID) : SendRequest { @Transient override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot() } diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt index 0e3581c766..3dbe5b9c13 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt @@ -17,6 +17,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import java.sql.Connection import java.sql.SQLException +import java.util.* import java.util.concurrent.ExecutionException /** @@ -121,7 +122,7 @@ class ProtocolStateMachineImpl(val logic: ProtocolLogic, sessionIDForReceive: Long, payload: Any, receiveType: Class): UntrustworthyData { - return suspendAndExpectReceive(SendAndReceive(topic, destination, payload, sessionIDForSend, receiveType, sessionIDForReceive)) + return suspendAndExpectReceive(SendAndReceive(topic, destination, payload, sessionIDForSend, UUID.randomUUID(), receiveType, sessionIDForReceive)) } @Suspendable @@ -131,7 +132,7 @@ class ProtocolStateMachineImpl(val logic: ProtocolLogic, @Suspendable override fun send(topic: String, destination: Party, sessionID: Long, payload: Any) { - suspend(SendOnly(destination, topic, payload, sessionID)) + suspend(SendOnly(destination, topic, payload, sessionID, UUID.randomUUID())) } @Suspendable diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt index 8d946f396d..274cb3355e 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt @@ -147,9 +147,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService logError(e, it, topicSession, fiber) } } + if (checkpoint.request is SendRequest) { + sendMessage(fiber, checkpoint.request) + } } else { fiber.logger.info("Restored ${fiber.logic} - it was not waiting on any message; received payload: ${checkpoint.receivedPayload.toString().abbreviate(50)}") executor.executeASAP { + if (checkpoint.request is SendRequest) { + sendMessage(fiber, checkpoint.request) + } iterateStateMachine(fiber, checkpoint.receivedPayload) { try { Fiber.unparkDeserialized(fiber, scheduler) @@ -279,21 +285,21 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService } private fun onNextSuspend(psm: ProtocolStateMachineImpl<*>, request: ProtocolIORequest) { + val serialisedFiber = serializeFiber(psm) + updateCheckpoint(psm, serialisedFiber, request, null) // We have a request to do something: send, receive, or send-and-receive. if (request is ReceiveRequest<*>) { // Prepare a listener on the network that runs in the background thread when we receive a message. - prepareToReceiveForRequest(psm, request) + prepareToReceiveForRequest(psm, serialisedFiber, request) } if (request is SendRequest) { performSendRequest(psm, request) } } - private fun prepareToReceiveForRequest(psm: ProtocolStateMachineImpl<*>, request: ReceiveRequest<*>) { + private fun prepareToReceiveForRequest(psm: ProtocolStateMachineImpl<*>, serialisedFiber: SerializedBytes>, request: ReceiveRequest<*>) { executor.checkOnThread() val queueID = request.receiveTopicSession - val serialisedFiber = serializeFiber(psm) - updateCheckpoint(psm, serialisedFiber, request, null) psm.logger.trace { "Preparing to receive message of type ${request.receiveType.name} on queue $queueID" } iterateOnResponse(psm, serialisedFiber, request) { try { @@ -305,12 +311,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService } private fun performSendRequest(psm: ProtocolStateMachineImpl<*>, request: SendRequest) { - val topicSession = TopicSession(request.topic, request.sendSessionID) - val payload = request.payload - psm.logger.trace { "Sending message of type ${payload.javaClass.name} using queue $topicSession to ${request.destination} (${payload.toString().abbreviate(50)})" } - val node = serviceHub.networkMapCache.getNodeByLegalName(request.destination.name) ?: - throw IllegalArgumentException("Don't know about ${request.destination} but trying to send a message of type ${payload.javaClass.name} on $topicSession (${payload.toString().abbreviate(50)})", request.stackTraceInCaseOfProblems) - serviceHub.networkService.send(topicSession, payload, node.address) + val topicSession = sendMessage(psm, request) if (request is SendOnly) { // We sent a message, but don't expect a response, so re-enter the continuation to let it keep going. @@ -324,6 +325,16 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService } } + private fun sendMessage(psm: ProtocolStateMachineImpl<*>, request: SendRequest): TopicSession { + val topicSession = TopicSession(request.topic, request.sendSessionID) + val payload = request.payload + psm.logger.trace { "Sending message of type ${payload.javaClass.name} using queue $topicSession to ${request.destination} (${payload.toString().abbreviate(50)})" } + val node = serviceHub.networkMapCache.getNodeByLegalName(request.destination.name) ?: + throw IllegalArgumentException("Don't know about ${request.destination} but trying to send a message of type ${payload.javaClass.name} on $topicSession (${payload.toString().abbreviate(50)})", request.stackTraceInCaseOfProblems) + serviceHub.networkService.send(topicSession, payload, node.address, request.uniqueMessageId) + return topicSession + } + /** * Add a trigger to the [MessagingService] to deserialize the fiber and pass message content to it, once a message is * received. diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt index 96c3138132..24df131e07 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt @@ -4,6 +4,7 @@ package com.r3corda.node.messaging import com.r3corda.core.messaging.Message import com.r3corda.core.messaging.TopicStringValidator +import com.r3corda.core.messaging.createMessage import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.node.services.network.NetworkMapService import com.r3corda.testing.node.MockNetwork @@ -106,9 +107,11 @@ class InMemoryMessagingTests { assertEquals(1, received) // Here's the core of the test; previously the unhandled message would cause runNetwork() to abort early, so - // this would fail. - node2.net.send(invalidMessage, node1.net.myAddress) - node2.net.send(validMessage, node1.net.myAddress) + // this would fail. Make fresh messages to stop duplicate uniqueMessageId causing drops + val invalidMessage2 = node2.net.createMessage("invalid_message", DEFAULT_SESSION_ID, ByteArray(0)) + val validMessage2 = node2.net.createMessage("valid_message", DEFAULT_SESSION_ID, ByteArray(0)) + node2.net.send(invalidMessage2, node1.net.myAddress) + node2.net.send(validMessage2, node1.net.myAddress) network.runNetwork() assertEquals(2, received) diff --git a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt index bf022d37ab..3ec543bd48 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt @@ -3,6 +3,7 @@ package com.r3corda.node.services import com.google.common.net.HostAndPort import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.messaging.Message +import com.r3corda.core.messaging.createMessage import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingServer diff --git a/node/src/test/kotlin/com/r3corda/node/services/NodeMonitorServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/NodeMonitorServiceTests.kt index 2761457769..e2adce8ce9 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/NodeMonitorServiceTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/NodeMonitorServiceTests.kt @@ -5,6 +5,7 @@ import com.r3corda.contracts.asset.Cash import com.r3corda.core.contracts.* import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.newSecureRandom +import com.r3corda.core.messaging.createMessage import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.Vault import com.r3corda.core.random63BitValue diff --git a/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt index 336732c2f5..f0d00c2ce1 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt @@ -111,6 +111,45 @@ class StateMachineManagerTests { assertThat(restoredProtocol.receivedPayload).isEqualTo(payload) } + @Test + fun `protocol with send will resend on interrupted restart`() { + val topic = "send-and-receive" + val payload = random63BitValue() + val payload2 = random63BitValue() + var sentCount = 0 + var receivedCount = 0 + net.messagingNetwork.sentMessages.subscribe { if (it.message.topicSession.topic == topic) sentCount++ } + net.messagingNetwork.receivedMessages.subscribe { if (it.message.topicSession.topic == topic) receivedCount++ } + val node3 = net.createNode(node1.info.address) + net.runNetwork() + val firstProtocol = PingPongProtocol(topic, node3.info.identity, payload) + val secondProtocol = PingPongProtocol(topic, node2.info.identity, payload2) + connectProtocols(firstProtocol, secondProtocol) + // Kick off first send and receive + node2.smm.add("test", firstProtocol) + assertEquals(1, node2.checkpointStorage.checkpoints.count()) + // Restart node and thus reload the checkpoint and resend the message with same UUID + node2.stop() + val node2b = net.createNode(node1.info.address, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray()) + val (firstAgain, fut1) = node2b.smm.findStateMachines(PingPongProtocol::class.java).single() + net.runNetwork() + assertEquals(1, node2.checkpointStorage.checkpoints.count()) + // Now add in the other half of the protocol. First message should get deduped. So message data stays in sync. + node3.smm.add("test", secondProtocol) + net.runNetwork() + node2b.smm.executor.flush() + fut1.get() + // Check protocols completed cleanly and didn't get out of phase + assertEquals(4, receivedCount, "Protocol should have exchanged 4 unique messages")// Two messages each way + assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages") // can't give a precise value as every addMessageHandler re-runs the undelivered messages + assertEquals(0, node2b.checkpointStorage.checkpoints.count(), "Checkpoints left after restored protocol should have ended") + assertEquals(0, node3.checkpointStorage.checkpoints.count(), "Checkpoints left after restored protocol should have ended") + assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3") + assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3") + assertEquals(payload, secondProtocol.receivedPayload, "Received payload does not match the (restarted) first value on Node 2") + assertEquals(payload + 1, secondProtocol.receivedPayload2, "Received payload does not match the expected second value on Node 2") + } + private inline fun MockNode.restartAndGetRestoredProtocol(networkMapAddress: SingleMessageRecipient? = null): P { val servicesArray = advertisedServices.toTypedArray() val node = mockNet.createNode(networkMapAddress, id, advertisedServices = *servicesArray) @@ -148,7 +187,8 @@ class StateMachineManagerTests { val lazyTime by lazy { serviceHub.clock.instant() } @Suspendable - override fun call() {} + override fun call() { + } override val topic: String get() = throw UnsupportedOperationException() } @@ -170,6 +210,17 @@ class StateMachineManagerTests { } } + private class PingPongProtocol(override val topic: String, val otherParty: Party, val payload: Long) : ProtocolLogic() { + @Transient var receivedPayload: Long? = null + @Transient var receivedPayload2: Long? = null + + @Suspendable + override fun call() { + receivedPayload = sendAndReceive(otherParty, payload).unwrap { it } + receivedPayload2 = sendAndReceive(otherParty, (payload + 1)).unwrap { it } + } + + } /** * A protocol that suspends forever after doing some work. This is to allow it to be retrieved from the SMM after diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt index 5e38895a9d..d32ce46577 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt @@ -5,13 +5,10 @@ import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.SettableFuture import com.r3corda.core.ThreadBox -import com.r3corda.core.crypto.sha256 import com.r3corda.core.messaging.* import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.trace -import com.r3corda.node.services.api.MessagingServiceBuilder -import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging import org.slf4j.LoggerFactory import rx.Observable @@ -50,7 +47,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria // The corresponding sentMessages stream reflects when a message was pumpSend'd private val messageSendQueue = LinkedBlockingQueue() private val _sentMessages = PublishSubject.create() - @Suppress("unused") // Used by the visualiser tool. + @Suppress("unused") // Used by the visualiser tool. /** A stream of (sender, message, recipients) triples */ val sentMessages: Observable get() = _sentMessages @@ -63,7 +60,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria private val messageReceiveQueues = HashMap>() private val _receivedMessages = PublishSubject.create() - @Suppress("unused") // Used by the visualiser tool. + @Suppress("unused") // Used by the visualiser tool. /** A stream of (sender, message, recipients) triples */ val receivedMessages: Observable get() = _receivedMessages @@ -213,6 +210,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria } private val state = ThreadBox(InnerState()) + private val processedMessages: MutableSet = Collections.synchronizedSet(HashSet()) override val myAddress: SingleMessageRecipient = handle @@ -228,7 +226,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria } override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration - = addMessageHandler(TopicSession(topic, sessionID), executor, callback) + = addMessageHandler(TopicSession(topic, sessionID), executor, callback) override fun addMessageHandler(topicSession: TopicSession, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { check(running) @@ -267,17 +265,13 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria } /** Returns the given (topic & session, data) pair as a newly created message object. */ - override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message - = createMessage(TopicSession(topic, sessionID), data) - - /** Returns the given (topic & session, data) pair as a newly created message object. */ - override fun createMessage(topicSession: TopicSession, data: ByteArray): Message { + override fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID): Message { return object : Message { override val topicSession: TopicSession get() = topicSession override val data: ByteArray get() = data override val debugTimestamp: Instant = Instant.now() override fun serialise(): ByteArray = this.serialise() - override val debugMessageID: String get() = serialise().sha256().prefixChars() + override val uniqueMessageId: UUID = uuid override fun toString() = topicSession.toString() + "#" + String(data) } @@ -335,19 +329,23 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria val next = getNextQueue(q, block) ?: return null val (transfer, deliverTo) = next - for (handler in deliverTo) { - // Now deliver via the requested executor, or on this thread if no executor was provided at registration time. - (handler.executor ?: MoreExecutors.directExecutor()).execute { - try { - handler.callback(transfer.message, handler) - } catch(e: Exception) { - loggerFor().error("Caught exception in handler for $this/${handler.topicSession}", e) + if (transfer.message.uniqueMessageId !in processedMessages) { + for (handler in deliverTo) { + // Now deliver via the requested executor, or on this thread if no executor was provided at registration time. + (handler.executor ?: MoreExecutors.directExecutor()).execute { + try { + handler.callback(transfer.message, handler) + } catch(e: Exception) { + loggerFor().error("Caught exception in handler for $this/${handler.topicSession}", e) + } } } + _receivedMessages.onNext(transfer) + processedMessages += transfer.message.uniqueMessageId + } else { + log.info("Drop duplicate message ${transfer.message.uniqueMessageId}") } - _receivedMessages.onNext(transfer) - return transfer } }