mirror of
https://github.com/corda/corda.git
synced 2025-01-21 03:55:00 +00:00
Track message id's to deduplicate replays. Widen the auto-acknowledgement window of Artemis back to the default.
Use synchronized wrapper over set. Drop discard message to trace level logging. Fix code layout Use lazy trace extension method Track message id's to deduplicate replays. Widen the auto-acknowledgement window of Artemis back to the default. Use synchronized wrapper over set. Include tx message unique id in checkpointed data. Add test for checkpointed resend Fix bug in not getting UUID off message. Tidy formatting Add explanation comments to test asserts Put unique id even on Client messages. Tidy formatting
This commit is contained in:
parent
3f9fc2db85
commit
a964073c2f
@ -4,6 +4,7 @@ import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.core.contracts.ClientToServiceCommand
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.messaging.createMessage
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
|
@ -16,6 +16,7 @@ import com.r3corda.core.utilities.debug
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.messaging.*
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||
@ -212,6 +213,8 @@ class CordaRPCClientImpl(private val session: ClientSession,
|
||||
return session.createMessage(false).apply {
|
||||
putStringProperty(ClientRPCRequestMessage.METHOD_NAME, method.name)
|
||||
putStringProperty(ClientRPCRequestMessage.REPLY_TO, proxyAddress)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ import org.junit.Test
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.io.Closeable
|
||||
import java.util.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
@ -57,8 +58,11 @@ class ClientRPCInfrastructureTests {
|
||||
producer = serverSession.createProducer()
|
||||
val dispatcher = object : RPCDispatcher(TestOps()) {
|
||||
override fun send(bits: SerializedBytes<*>, toAddress: String) {
|
||||
val msg = serverSession.createMessage(false)
|
||||
msg.writeBodyBufferBytes(bits.bits)
|
||||
val msg = serverSession.createMessage(false).apply {
|
||||
writeBodyBufferBytes(bits.bits)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
producer.send(toAddress, msg)
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.serialization.DeserializeAsKotlinObjectDef
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
@ -73,27 +74,28 @@ interface MessagingService {
|
||||
*/
|
||||
fun send(message: Message, target: MessageRecipients)
|
||||
|
||||
/**
|
||||
* Returns an initialised [Message] with the current time, etc, already filled in.
|
||||
*
|
||||
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
|
||||
* Must not be blank.
|
||||
* @param sessionID identifier for the session the message is part of. For messages sent to services before the
|
||||
* construction of a session, use [DEFAULT_SESSION_ID].
|
||||
*/
|
||||
fun createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message
|
||||
|
||||
/**
|
||||
* Returns an initialised [Message] with the current time, etc, already filled in.
|
||||
*
|
||||
* @param topicSession identifier for the topic and session the message is sent to.
|
||||
*/
|
||||
fun createMessage(topicSession: TopicSession, data: ByteArray): Message
|
||||
fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message
|
||||
|
||||
/** Returns an address that refers to this node. */
|
||||
val myAddress: SingleMessageRecipient
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an initialised [Message] with the current time, etc, already filled in.
|
||||
*
|
||||
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
|
||||
* Must not be blank.
|
||||
* @param sessionID identifier for the session the message is part of. For messages sent to services before the
|
||||
* construction of a session, use [DEFAULT_SESSION_ID].
|
||||
*/
|
||||
fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message
|
||||
= createMessage(TopicSession(topic, sessionID), data)
|
||||
|
||||
/**
|
||||
* Registers a handler for the given topic and session ID that runs the given callback with the message and then removes
|
||||
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
|
||||
@ -106,7 +108,7 @@ interface MessagingService {
|
||||
* a session is established, use [DEFAULT_SESSION_ID].
|
||||
*/
|
||||
fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, executor: Executor? = null, callback: (Message) -> Unit)
|
||||
= runOnNextMessage(TopicSession(topic, sessionID), executor, callback)
|
||||
= runOnNextMessage(TopicSession(topic, sessionID), executor, callback)
|
||||
|
||||
/**
|
||||
* Registers a handler for the given topic and session that runs the given callback with the message and then removes
|
||||
@ -125,11 +127,11 @@ fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Exec
|
||||
}
|
||||
}
|
||||
|
||||
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients)
|
||||
= send(TopicSession(topic, sessionID), payload, to)
|
||||
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
|
||||
= send(TopicSession(topic, sessionID), payload, to, uuid)
|
||||
|
||||
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients)
|
||||
= send(createMessage(topicSession, payload.serialize().bits), to)
|
||||
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
|
||||
= send(createMessage(topicSession, payload.serialize().bits, uuid), to)
|
||||
|
||||
interface MessageHandlerRegistration
|
||||
|
||||
@ -145,6 +147,7 @@ data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION
|
||||
companion object {
|
||||
val Blank = TopicSession("", DEFAULT_SESSION_ID)
|
||||
}
|
||||
|
||||
fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID
|
||||
|
||||
override fun toString(): String = "$topic.$sessionID"
|
||||
@ -164,7 +167,7 @@ interface Message {
|
||||
val topicSession: TopicSession
|
||||
val data: ByteArray
|
||||
val debugTimestamp: Instant
|
||||
val debugMessageID: String
|
||||
val uniqueMessageId: UUID
|
||||
fun serialise(): ByteArray
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.core.RunOnCallerThread
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.messaging.createMessage
|
||||
import com.r3corda.core.messaging.runOnNextMessage
|
||||
import com.r3corda.core.node.CityDatabase
|
||||
import com.r3corda.core.node.CordaPluginRegistry
|
||||
|
@ -3,6 +3,7 @@ package com.r3corda.node.services.api
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.core.messaging.Message
|
||||
import com.r3corda.core.messaging.MessageHandlerRegistration
|
||||
import com.r3corda.core.messaging.createMessage
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
|
@ -85,7 +85,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
var rpcNotificationConsumer: ClientConsumer? = null
|
||||
|
||||
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
|
||||
var undeliveredMessages = listOf<Pair<Message, UUID>>()
|
||||
var undeliveredMessages = listOf<Message>()
|
||||
}
|
||||
|
||||
/** A registration to handle messages of different types */
|
||||
@ -122,7 +122,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
clientFactory = locator.createSessionFactory()
|
||||
|
||||
// Create a session. Note that the acknowledgement of messages is not flushed to
|
||||
// the DB until the default buffer size of 1MB is acknowledged.
|
||||
// the Artermis journal until the default buffer size of 1MB is acknowledged.
|
||||
val session = clientFactory!!.createSession(true, true, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
|
||||
this.session = session
|
||||
session.start()
|
||||
@ -179,11 +179,9 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
null
|
||||
} ?: break
|
||||
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
val uuid = UUID.fromString(artemisMessage.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID))
|
||||
val message: Message? = artemisToCordaMessage(artemisMessage, uuid)
|
||||
val message: Message? = artemisToCordaMessage(artemisMessage)
|
||||
if (message != null)
|
||||
deliver(message, uuid)
|
||||
deliver(message)
|
||||
|
||||
// Ack the message so it won't be redelivered. We should only really do this when there were no
|
||||
// transient failures. If we caught an exception in the handler, we could back off and retry delivery
|
||||
@ -203,7 +201,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
shutdownLatch.countDown()
|
||||
}
|
||||
|
||||
private fun artemisToCordaMessage(message: ClientMessage, uuid: UUID): Message? {
|
||||
private fun artemisToCordaMessage(message: ClientMessage): Message? {
|
||||
try {
|
||||
if (!message.containsProperty(TOPIC_PROPERTY)) {
|
||||
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
|
||||
@ -215,6 +213,8 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
}
|
||||
val topic = message.getStringProperty(TOPIC_PROPERTY)
|
||||
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
val uuid = UUID.fromString(message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID))
|
||||
log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID uuid: $uuid")
|
||||
|
||||
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
|
||||
@ -223,7 +223,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
override val topicSession = TopicSession(topic, sessionID)
|
||||
override val data: ByteArray = body
|
||||
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
|
||||
override val debugMessageID: String = message.messageID.toString()
|
||||
override val uniqueMessageId: UUID = uuid
|
||||
override fun serialise(): ByteArray = body
|
||||
override fun toString() = topic + "#" + data.opaque()
|
||||
}
|
||||
@ -235,7 +235,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
private fun deliver(msg: Message, uuid: UUID): Boolean {
|
||||
private fun deliver(msg: Message): Boolean {
|
||||
state.checkNotLocked()
|
||||
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
|
||||
// or removed whilst the filter is executing will not affect anything.
|
||||
@ -249,7 +249,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
// This is a hack; transient messages held in memory isn't crash resistant.
|
||||
// TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use.
|
||||
state.locked {
|
||||
undeliveredMessages += Pair(msg, uuid)
|
||||
undeliveredMessages += msg
|
||||
}
|
||||
return false
|
||||
}
|
||||
@ -268,10 +268,10 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
// interpret persistent as "server" and non-persistent as "client".
|
||||
if (persistentInbox) {
|
||||
databaseTransaction {
|
||||
callHandlers(msg, uuid, deliverTo)
|
||||
callHandlers(msg, deliverTo)
|
||||
}
|
||||
} else {
|
||||
callHandlers(msg, uuid, deliverTo)
|
||||
callHandlers(msg, deliverTo)
|
||||
}
|
||||
}
|
||||
} catch(e: Exception) {
|
||||
@ -280,16 +280,16 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
return true
|
||||
}
|
||||
|
||||
private fun callHandlers(msg: Message, uuid: UUID, deliverTo: List<Handler>) {
|
||||
if (uuid in processedMessages) {
|
||||
log.trace { "discard duplicate message $uuid for ${msg.topicSession}" }
|
||||
private fun callHandlers(msg: Message, deliverTo: List<Handler>) {
|
||||
if (msg.uniqueMessageId in processedMessages) {
|
||||
log.trace { "discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" }
|
||||
return
|
||||
}
|
||||
for (handler in deliverTo) {
|
||||
handler.callback(msg, handler)
|
||||
}
|
||||
// TODO We will at some point need to decide a trimming policy for the id's
|
||||
processedMessages += uuid
|
||||
processedMessages += msg.uniqueMessageId
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
@ -332,7 +332,6 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
val queueName = toQueueName(target)
|
||||
val uuid = UUID.randomUUID()
|
||||
state.locked {
|
||||
val artemisMessage = session!!.createMessage(true).apply {
|
||||
val sessionID = message.topicSession.sessionID
|
||||
@ -340,13 +339,13 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
putLongProperty(SESSION_ID_PROPERTY, sessionID)
|
||||
writeBodyBufferBytes(message.data)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(uuid.toString()))
|
||||
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
|
||||
if (knownQueues.add(queueName)) {
|
||||
maybeCreateQueue(queueName)
|
||||
}
|
||||
log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: $uuid")
|
||||
log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: $message.uniqueMessageId")
|
||||
producer!!.send(queueName, artemisMessage)
|
||||
}
|
||||
}
|
||||
@ -376,7 +375,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
undeliveredMessages = listOf()
|
||||
messagesToRedeliver
|
||||
}
|
||||
messagesToRedeliver.forEach { deliver(it.first, it.second) }
|
||||
messagesToRedeliver.forEach { deliver(it) }
|
||||
return handler
|
||||
}
|
||||
|
||||
@ -384,25 +383,26 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
handlers.remove(registration)
|
||||
}
|
||||
|
||||
override fun createMessage(topicSession: TopicSession, data: ByteArray): Message {
|
||||
override fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID): Message {
|
||||
// TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying.
|
||||
return object : Message {
|
||||
override val topicSession: TopicSession get() = topicSession
|
||||
override val data: ByteArray get() = data
|
||||
override val debugTimestamp: Instant = Instant.now()
|
||||
override fun serialise(): ByteArray = this.serialise()
|
||||
override val debugMessageID: String get() = Instant.now().toEpochMilli().toString()
|
||||
override val uniqueMessageId: UUID = uuid
|
||||
override fun toString() = topicSession.toString() + "#" + String(data)
|
||||
}
|
||||
}
|
||||
|
||||
override fun createMessage(topic: String, sessionID: Long, data: ByteArray) = createMessage(TopicSession(topic, sessionID), data)
|
||||
|
||||
private fun createRPCDispatcher(ops: CordaRPCOps) = object : RPCDispatcher(ops) {
|
||||
override fun send(bits: SerializedBytes<*>, toAddress: String) {
|
||||
state.locked {
|
||||
val msg = session!!.createMessage(false)
|
||||
msg.writeBodyBufferBytes(bits.bits)
|
||||
val msg = session!!.createMessage(false).apply {
|
||||
writeBodyBufferBytes(bits.bits)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
producer!!.send(toAddress, msg)
|
||||
}
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.toStringShort
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.messaging.createMessage
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.node.services.Vault
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
|
@ -6,10 +6,7 @@ import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.core.contracts.Contract
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.messaging.runOnNextMessage
|
||||
import com.r3corda.core.messaging.send
|
||||
import com.r3corda.core.messaging.*
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.node.services.NetworkCacheError
|
||||
|
@ -9,6 +9,7 @@ import com.r3corda.core.crypto.signWithECDSA
|
||||
import com.r3corda.core.messaging.MessageHandlerRegistration
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.messaging.createMessage
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
|
@ -2,6 +2,7 @@ package com.r3corda.node.services.statemachine
|
||||
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.TopicSession
|
||||
import java.util.*
|
||||
|
||||
// TODO revisit when Kotlin 1.1 is released and data classes can extend other classes
|
||||
interface ProtocolIORequest {
|
||||
@ -15,6 +16,7 @@ interface SendRequest : ProtocolIORequest {
|
||||
val destination: Party
|
||||
val payload: Any
|
||||
val sendSessionID: Long
|
||||
val uniqueMessageId: UUID
|
||||
}
|
||||
|
||||
interface ReceiveRequest<T> : ProtocolIORequest {
|
||||
@ -27,6 +29,7 @@ data class SendAndReceive<T>(override val topic: String,
|
||||
override val destination: Party,
|
||||
override val payload: Any,
|
||||
override val sendSessionID: Long,
|
||||
override val uniqueMessageId: UUID,
|
||||
override val receiveType: Class<T>,
|
||||
override val receiveSessionID: Long) : SendRequest, ReceiveRequest<T> {
|
||||
@Transient
|
||||
@ -43,7 +46,8 @@ data class ReceiveOnly<T>(override val topic: String,
|
||||
data class SendOnly(override val destination: Party,
|
||||
override val topic: String,
|
||||
override val payload: Any,
|
||||
override val sendSessionID: Long) : SendRequest {
|
||||
override val sendSessionID: Long,
|
||||
override val uniqueMessageId: UUID) : SendRequest {
|
||||
@Transient
|
||||
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLException
|
||||
import java.util.*
|
||||
import java.util.concurrent.ExecutionException
|
||||
|
||||
/**
|
||||
@ -121,7 +122,7 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
|
||||
sessionIDForReceive: Long,
|
||||
payload: Any,
|
||||
receiveType: Class<T>): UntrustworthyData<T> {
|
||||
return suspendAndExpectReceive(SendAndReceive(topic, destination, payload, sessionIDForSend, receiveType, sessionIDForReceive))
|
||||
return suspendAndExpectReceive(SendAndReceive(topic, destination, payload, sessionIDForSend, UUID.randomUUID(), receiveType, sessionIDForReceive))
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@ -131,7 +132,7 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
|
||||
|
||||
@Suspendable
|
||||
override fun send(topic: String, destination: Party, sessionID: Long, payload: Any) {
|
||||
suspend(SendOnly(destination, topic, payload, sessionID))
|
||||
suspend(SendOnly(destination, topic, payload, sessionID, UUID.randomUUID()))
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
|
@ -147,9 +147,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
logError(e, it, topicSession, fiber)
|
||||
}
|
||||
}
|
||||
if (checkpoint.request is SendRequest) {
|
||||
sendMessage(fiber, checkpoint.request)
|
||||
}
|
||||
} else {
|
||||
fiber.logger.info("Restored ${fiber.logic} - it was not waiting on any message; received payload: ${checkpoint.receivedPayload.toString().abbreviate(50)}")
|
||||
executor.executeASAP {
|
||||
if (checkpoint.request is SendRequest) {
|
||||
sendMessage(fiber, checkpoint.request)
|
||||
}
|
||||
iterateStateMachine(fiber, checkpoint.receivedPayload) {
|
||||
try {
|
||||
Fiber.unparkDeserialized(fiber, scheduler)
|
||||
@ -279,21 +285,21 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
}
|
||||
|
||||
private fun onNextSuspend(psm: ProtocolStateMachineImpl<*>, request: ProtocolIORequest) {
|
||||
val serialisedFiber = serializeFiber(psm)
|
||||
updateCheckpoint(psm, serialisedFiber, request, null)
|
||||
// We have a request to do something: send, receive, or send-and-receive.
|
||||
if (request is ReceiveRequest<*>) {
|
||||
// Prepare a listener on the network that runs in the background thread when we receive a message.
|
||||
prepareToReceiveForRequest(psm, request)
|
||||
prepareToReceiveForRequest(psm, serialisedFiber, request)
|
||||
}
|
||||
if (request is SendRequest) {
|
||||
performSendRequest(psm, request)
|
||||
}
|
||||
}
|
||||
|
||||
private fun prepareToReceiveForRequest(psm: ProtocolStateMachineImpl<*>, request: ReceiveRequest<*>) {
|
||||
private fun prepareToReceiveForRequest(psm: ProtocolStateMachineImpl<*>, serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>, request: ReceiveRequest<*>) {
|
||||
executor.checkOnThread()
|
||||
val queueID = request.receiveTopicSession
|
||||
val serialisedFiber = serializeFiber(psm)
|
||||
updateCheckpoint(psm, serialisedFiber, request, null)
|
||||
psm.logger.trace { "Preparing to receive message of type ${request.receiveType.name} on queue $queueID" }
|
||||
iterateOnResponse(psm, serialisedFiber, request) {
|
||||
try {
|
||||
@ -305,12 +311,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
}
|
||||
|
||||
private fun performSendRequest(psm: ProtocolStateMachineImpl<*>, request: SendRequest) {
|
||||
val topicSession = TopicSession(request.topic, request.sendSessionID)
|
||||
val payload = request.payload
|
||||
psm.logger.trace { "Sending message of type ${payload.javaClass.name} using queue $topicSession to ${request.destination} (${payload.toString().abbreviate(50)})" }
|
||||
val node = serviceHub.networkMapCache.getNodeByLegalName(request.destination.name) ?:
|
||||
throw IllegalArgumentException("Don't know about ${request.destination} but trying to send a message of type ${payload.javaClass.name} on $topicSession (${payload.toString().abbreviate(50)})", request.stackTraceInCaseOfProblems)
|
||||
serviceHub.networkService.send(topicSession, payload, node.address)
|
||||
val topicSession = sendMessage(psm, request)
|
||||
|
||||
if (request is SendOnly) {
|
||||
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
|
||||
@ -324,6 +325,16 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendMessage(psm: ProtocolStateMachineImpl<*>, request: SendRequest): TopicSession {
|
||||
val topicSession = TopicSession(request.topic, request.sendSessionID)
|
||||
val payload = request.payload
|
||||
psm.logger.trace { "Sending message of type ${payload.javaClass.name} using queue $topicSession to ${request.destination} (${payload.toString().abbreviate(50)})" }
|
||||
val node = serviceHub.networkMapCache.getNodeByLegalName(request.destination.name) ?:
|
||||
throw IllegalArgumentException("Don't know about ${request.destination} but trying to send a message of type ${payload.javaClass.name} on $topicSession (${payload.toString().abbreviate(50)})", request.stackTraceInCaseOfProblems)
|
||||
serviceHub.networkService.send(topicSession, payload, node.address, request.uniqueMessageId)
|
||||
return topicSession
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a trigger to the [MessagingService] to deserialize the fiber and pass message content to it, once a message is
|
||||
* received.
|
||||
|
@ -4,6 +4,7 @@ package com.r3corda.node.messaging
|
||||
|
||||
import com.r3corda.core.messaging.Message
|
||||
import com.r3corda.core.messaging.TopicStringValidator
|
||||
import com.r3corda.core.messaging.createMessage
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
@ -106,9 +107,11 @@ class InMemoryMessagingTests {
|
||||
assertEquals(1, received)
|
||||
|
||||
// Here's the core of the test; previously the unhandled message would cause runNetwork() to abort early, so
|
||||
// this would fail.
|
||||
node2.net.send(invalidMessage, node1.net.myAddress)
|
||||
node2.net.send(validMessage, node1.net.myAddress)
|
||||
// this would fail. Make fresh messages to stop duplicate uniqueMessageId causing drops
|
||||
val invalidMessage2 = node2.net.createMessage("invalid_message", DEFAULT_SESSION_ID, ByteArray(0))
|
||||
val validMessage2 = node2.net.createMessage("valid_message", DEFAULT_SESSION_ID, ByteArray(0))
|
||||
node2.net.send(invalidMessage2, node1.net.myAddress)
|
||||
node2.net.send(validMessage2, node1.net.myAddress)
|
||||
network.runNetwork()
|
||||
assertEquals(2, received)
|
||||
|
||||
|
@ -3,6 +3,7 @@ package com.r3corda.node.services
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.crypto.generateKeyPair
|
||||
import com.r3corda.core.messaging.Message
|
||||
import com.r3corda.core.messaging.createMessage
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingServer
|
||||
|
@ -5,6 +5,7 @@ import com.r3corda.contracts.asset.Cash
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.crypto.newSecureRandom
|
||||
import com.r3corda.core.messaging.createMessage
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.node.services.Vault
|
||||
import com.r3corda.core.random63BitValue
|
||||
|
@ -111,6 +111,45 @@ class StateMachineManagerTests {
|
||||
assertThat(restoredProtocol.receivedPayload).isEqualTo(payload)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `protocol with send will resend on interrupted restart`() {
|
||||
val topic = "send-and-receive"
|
||||
val payload = random63BitValue()
|
||||
val payload2 = random63BitValue()
|
||||
var sentCount = 0
|
||||
var receivedCount = 0
|
||||
net.messagingNetwork.sentMessages.subscribe { if (it.message.topicSession.topic == topic) sentCount++ }
|
||||
net.messagingNetwork.receivedMessages.subscribe { if (it.message.topicSession.topic == topic) receivedCount++ }
|
||||
val node3 = net.createNode(node1.info.address)
|
||||
net.runNetwork()
|
||||
val firstProtocol = PingPongProtocol(topic, node3.info.identity, payload)
|
||||
val secondProtocol = PingPongProtocol(topic, node2.info.identity, payload2)
|
||||
connectProtocols(firstProtocol, secondProtocol)
|
||||
// Kick off first send and receive
|
||||
node2.smm.add("test", firstProtocol)
|
||||
assertEquals(1, node2.checkpointStorage.checkpoints.count())
|
||||
// Restart node and thus reload the checkpoint and resend the message with same UUID
|
||||
node2.stop()
|
||||
val node2b = net.createNode(node1.info.address, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray())
|
||||
val (firstAgain, fut1) = node2b.smm.findStateMachines(PingPongProtocol::class.java).single()
|
||||
net.runNetwork()
|
||||
assertEquals(1, node2.checkpointStorage.checkpoints.count())
|
||||
// Now add in the other half of the protocol. First message should get deduped. So message data stays in sync.
|
||||
node3.smm.add("test", secondProtocol)
|
||||
net.runNetwork()
|
||||
node2b.smm.executor.flush()
|
||||
fut1.get()
|
||||
// Check protocols completed cleanly and didn't get out of phase
|
||||
assertEquals(4, receivedCount, "Protocol should have exchanged 4 unique messages")// Two messages each way
|
||||
assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages") // can't give a precise value as every addMessageHandler re-runs the undelivered messages
|
||||
assertEquals(0, node2b.checkpointStorage.checkpoints.count(), "Checkpoints left after restored protocol should have ended")
|
||||
assertEquals(0, node3.checkpointStorage.checkpoints.count(), "Checkpoints left after restored protocol should have ended")
|
||||
assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3")
|
||||
assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3")
|
||||
assertEquals(payload, secondProtocol.receivedPayload, "Received payload does not match the (restarted) first value on Node 2")
|
||||
assertEquals(payload + 1, secondProtocol.receivedPayload2, "Received payload does not match the expected second value on Node 2")
|
||||
}
|
||||
|
||||
private inline fun <reified P : NonTerminatingProtocol> MockNode.restartAndGetRestoredProtocol(networkMapAddress: SingleMessageRecipient? = null): P {
|
||||
val servicesArray = advertisedServices.toTypedArray()
|
||||
val node = mockNet.createNode(networkMapAddress, id, advertisedServices = *servicesArray)
|
||||
@ -148,7 +187,8 @@ class StateMachineManagerTests {
|
||||
val lazyTime by lazy { serviceHub.clock.instant() }
|
||||
|
||||
@Suspendable
|
||||
override fun call() {}
|
||||
override fun call() {
|
||||
}
|
||||
|
||||
override val topic: String get() = throw UnsupportedOperationException()
|
||||
}
|
||||
@ -170,6 +210,17 @@ class StateMachineManagerTests {
|
||||
}
|
||||
}
|
||||
|
||||
private class PingPongProtocol(override val topic: String, val otherParty: Party, val payload: Long) : ProtocolLogic<Unit>() {
|
||||
@Transient var receivedPayload: Long? = null
|
||||
@Transient var receivedPayload2: Long? = null
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
receivedPayload = sendAndReceive<Long>(otherParty, payload).unwrap { it }
|
||||
receivedPayload2 = sendAndReceive<Long>(otherParty, (payload + 1)).unwrap { it }
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A protocol that suspends forever after doing some work. This is to allow it to be retrieved from the SMM after
|
||||
|
@ -5,13 +5,10 @@ import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.crypto.sha256
|
||||
import com.r3corda.core.messaging.*
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.api.MessagingServiceBuilder
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Observable
|
||||
@ -50,7 +47,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
// The corresponding sentMessages stream reflects when a message was pumpSend'd
|
||||
private val messageSendQueue = LinkedBlockingQueue<MessageTransfer>()
|
||||
private val _sentMessages = PublishSubject.create<MessageTransfer>()
|
||||
@Suppress("unused") // Used by the visualiser tool.
|
||||
@Suppress("unused") // Used by the visualiser tool.
|
||||
/** A stream of (sender, message, recipients) triples */
|
||||
val sentMessages: Observable<MessageTransfer>
|
||||
get() = _sentMessages
|
||||
@ -63,7 +60,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
private val messageReceiveQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>()
|
||||
private val _receivedMessages = PublishSubject.create<MessageTransfer>()
|
||||
|
||||
@Suppress("unused") // Used by the visualiser tool.
|
||||
@Suppress("unused") // Used by the visualiser tool.
|
||||
/** A stream of (sender, message, recipients) triples */
|
||||
val receivedMessages: Observable<MessageTransfer>
|
||||
get() = _receivedMessages
|
||||
@ -213,6 +210,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
}
|
||||
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(HashSet<UUID>())
|
||||
|
||||
override val myAddress: SingleMessageRecipient = handle
|
||||
|
||||
@ -228,7 +226,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
}
|
||||
|
||||
override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
|
||||
= addMessageHandler(TopicSession(topic, sessionID), executor, callback)
|
||||
= addMessageHandler(TopicSession(topic, sessionID), executor, callback)
|
||||
|
||||
override fun addMessageHandler(topicSession: TopicSession, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
|
||||
check(running)
|
||||
@ -267,17 +265,13 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
}
|
||||
|
||||
/** Returns the given (topic & session, data) pair as a newly created message object. */
|
||||
override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message
|
||||
= createMessage(TopicSession(topic, sessionID), data)
|
||||
|
||||
/** Returns the given (topic & session, data) pair as a newly created message object. */
|
||||
override fun createMessage(topicSession: TopicSession, data: ByteArray): Message {
|
||||
override fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID): Message {
|
||||
return object : Message {
|
||||
override val topicSession: TopicSession get() = topicSession
|
||||
override val data: ByteArray get() = data
|
||||
override val debugTimestamp: Instant = Instant.now()
|
||||
override fun serialise(): ByteArray = this.serialise()
|
||||
override val debugMessageID: String get() = serialise().sha256().prefixChars()
|
||||
override val uniqueMessageId: UUID = uuid
|
||||
|
||||
override fun toString() = topicSession.toString() + "#" + String(data)
|
||||
}
|
||||
@ -335,19 +329,23 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
val next = getNextQueue(q, block) ?: return null
|
||||
val (transfer, deliverTo) = next
|
||||
|
||||
for (handler in deliverTo) {
|
||||
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
|
||||
(handler.executor ?: MoreExecutors.directExecutor()).execute {
|
||||
try {
|
||||
handler.callback(transfer.message, handler)
|
||||
} catch(e: Exception) {
|
||||
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topicSession}", e)
|
||||
if (transfer.message.uniqueMessageId !in processedMessages) {
|
||||
for (handler in deliverTo) {
|
||||
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
|
||||
(handler.executor ?: MoreExecutors.directExecutor()).execute {
|
||||
try {
|
||||
handler.callback(transfer.message, handler)
|
||||
} catch(e: Exception) {
|
||||
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topicSession}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
_receivedMessages.onNext(transfer)
|
||||
processedMessages += transfer.message.uniqueMessageId
|
||||
} else {
|
||||
log.info("Drop duplicate message ${transfer.message.uniqueMessageId}")
|
||||
}
|
||||
|
||||
_receivedMessages.onNext(transfer)
|
||||
|
||||
return transfer
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user