Mirror of https://github.com/corda/corda.git (synced 2024-12-19 04:57:58 +00:00)
Merged in mnesbit-cor-389-checkpoint-protocol-message-ids (pull request #377)
Set each message's unique id from a value stored in the checkpoint
This commit is contained in:
commit 568e1ebcd4
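The idea, in brief: each protocol send gets a UUID that is stored with the checkpointed request and stamped onto the outgoing message as the Artemis duplicate-detection id, so a resend after a node restart carries the same identity and the receiving side drops it instead of processing it twice. A minimal, self-contained sketch of the receive-side behaviour follows; the class and function names are illustrative, not the node's real types.

import java.util.UUID

// Illustrative sketch only: mirrors how callHandlers() in the diff below skips ids it has already seen.
class DedupReceiverSketch {
    private val processedIds = mutableSetOf<UUID>()

    fun deliver(id: UUID, body: ByteArray, handler: (ByteArray) -> Unit) {
        if (id in processedIds) return   // a resend after a restart reuses the stored id, so it is dropped
        handler(body)
        processedIds += id               // trimming policy left as a TODO, as in the commit
    }
}

fun main() {
    val receiver = DedupReceiverSketch()
    val storedId = UUID.randomUUID()     // in the real change this comes from the checkpointed SendRequest
    var delivered = 0
    receiver.deliver(storedId, "ping".toByteArray()) { delivered++ }
    receiver.deliver(storedId, "ping".toByteArray()) { delivered++ }  // replay after a simulated restart
    check(delivered == 1)
}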
@@ -4,6 +4,7 @@ import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize

@@ -16,6 +16,7 @@ import com.r3corda.core.utilities.debug
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.messaging.*
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
@@ -212,6 +213,8 @@ class CordaRPCClientImpl(private val session: ClientSession,
return session.createMessage(false).apply {
putStringProperty(ClientRPCRequestMessage.METHOD_NAME, method.name)
putStringProperty(ClientRPCRequestMessage.REPLY_TO, proxyAddress)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
}
@@ -21,6 +21,7 @@ import org.junit.Test
import rx.Observable
import rx.subjects.PublishSubject
import java.io.Closeable
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock

@@ -57,8 +58,11 @@ class ClientRPCInfrastructureTests {
producer = serverSession.createProducer()
val dispatcher = object : RPCDispatcher(TestOps()) {
override fun send(bits: SerializedBytes<*>, toAddress: String) {
val msg = serverSession.createMessage(false)
msg.writeBodyBufferBytes(bits.bits)
val msg = serverSession.createMessage(false).apply {
writeBodyBufferBytes(bits.bits)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
producer.send(toAddress, msg)
}
}
@@ -4,6 +4,7 @@ import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.serialization.DeserializeAsKotlinObjectDef
import com.r3corda.core.serialization.serialize
import java.time.Instant
import java.util.*
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.ThreadSafe

@@ -73,27 +74,28 @@ interface MessagingService {
*/
fun send(message: Message, target: MessageRecipients)

/**
* Returns an initialised [Message] with the current time, etc, already filled in.
*
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* Must not be blank.
* @param sessionID identifier for the session the message is part of. For messages sent to services before the
* construction of a session, use [DEFAULT_SESSION_ID].
*/
fun createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message

/**
* Returns an initialised [Message] with the current time, etc, already filled in.
*
* @param topicSession identifier for the topic and session the message is sent to.
*/
fun createMessage(topicSession: TopicSession, data: ByteArray): Message
fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message

/** Returns an address that refers to this node. */
val myAddress: SingleMessageRecipient
}

/**
* Returns an initialised [Message] with the current time, etc, already filled in.
*
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* Must not be blank.
* @param sessionID identifier for the session the message is part of. For messages sent to services before the
* construction of a session, use [DEFAULT_SESSION_ID].
*/
fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message
= createMessage(TopicSession(topic, sessionID), data)

/**
* Registers a handler for the given topic and session ID that runs the given callback with the message and then removes
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
@@ -106,7 +108,7 @@ interface MessagingService {
* a session is established, use [DEFAULT_SESSION_ID].
*/
fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, executor: Executor? = null, callback: (Message) -> Unit)
= runOnNextMessage(TopicSession(topic, sessionID), executor, callback)
= runOnNextMessage(TopicSession(topic, sessionID), executor, callback)

/**
* Registers a handler for the given topic and session that runs the given callback with the message and then removes

@@ -125,11 +127,11 @@ fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Exec
}
}

fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients)
= send(TopicSession(topic, sessionID), payload, to)
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
= send(TopicSession(topic, sessionID), payload, to, uuid)

fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients)
= send(createMessage(topicSession, payload.serialize().bits), to)
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
= send(createMessage(topicSession, payload.serialize().bits, uuid), to)

interface MessageHandlerRegistration
@@ -145,6 +147,7 @@ data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION
companion object {
val Blank = TopicSession("", DEFAULT_SESSION_ID)
}

fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID

override fun toString(): String = "$topic.$sessionID"

@@ -164,7 +167,7 @@ interface Message {
val topicSession: TopicSession
val data: ByteArray
val debugTimestamp: Instant
val debugMessageID: String
val uniqueMessageId: UUID
fun serialise(): ByteArray
}
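The new createMessage/send overloads keep a uuid parameter with a UUID.randomUUID() default, so existing callers are unchanged while checkpoint replay can pass the stored id explicitly. A small self-contained sketch of that default-parameter pattern; MiniMessagingService is a stand-in, not the real interface.

import java.util.UUID

// Stand-in interface showing the same default-argument shape as the new API.
interface MiniMessagingService {
    fun send(topic: String, data: ByteArray, uuid: UUID = UUID.randomUUID())
}

fun main() {
    val sent = mutableListOf<UUID>()
    val service = object : MiniMessagingService {
        override fun send(topic: String, data: ByteArray, uuid: UUID) {
            sent += uuid
        }
    }
    service.send("platform.network_map.fetch", ByteArray(0))       // fresh UUID on every call
    val checkpointedId = UUID.randomUUID()                          // would come from the checkpoint
    service.send("protocol.resend", ByteArray(0), checkpointedId)   // resend reuses the stored id
    service.send("protocol.resend", ByteArray(0), checkpointedId)
    check(sent[1] == sent[2] && sent[0] != sent[1])
}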
@@ -7,6 +7,7 @@ import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.node.CityDatabase
import com.r3corda.core.node.CordaPluginRegistry

@@ -3,6 +3,7 @@ package com.r3corda.node.services.api
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.MessageHandlerRegistration
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.serialization.SingletonSerializeAsToken
@@ -85,7 +85,7 @@ class NodeMessagingClient(config: NodeConfiguration,
var rpcNotificationConsumer: ClientConsumer? = null

// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
var undeliveredMessages = listOf<Pair<Message, UUID>>()
var undeliveredMessages = listOf<Message>()
}

/** A registration to handle messages of different types */

@@ -122,7 +122,7 @@ class NodeMessagingClient(config: NodeConfiguration,
clientFactory = locator.createSessionFactory()

// Create a session. Note that the acknowledgement of messages is not flushed to
// the DB until the default buffer size of 1MB is acknowledged.
// the Artemis journal until the default buffer size of 1MB is acknowledged.
val session = clientFactory!!.createSession(true, true, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
this.session = session
session.start()
@@ -179,11 +179,9 @@ class NodeMessagingClient(config: NodeConfiguration,
null
} ?: break

// Use the magic deduplication property built into Artemis as our message identity too
val uuid = UUID.fromString(artemisMessage.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID))
val message: Message? = artemisToCordaMessage(artemisMessage, uuid)
val message: Message? = artemisToCordaMessage(artemisMessage)
if (message != null)
deliver(message, uuid)
deliver(message)

// Ack the message so it won't be redelivered. We should only really do this when there were no
// transient failures. If we caught an exception in the handler, we could back off and retry delivery

@@ -203,7 +201,7 @@ class NodeMessagingClient(config: NodeConfiguration,
shutdownLatch.countDown()
}

private fun artemisToCordaMessage(message: ClientMessage, uuid: UUID): Message? {
private fun artemisToCordaMessage(message: ClientMessage): Message? {
try {
if (!message.containsProperty(TOPIC_PROPERTY)) {
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
@@ -215,6 +213,8 @@ class NodeMessagingClient(config: NodeConfiguration,
}
val topic = message.getStringProperty(TOPIC_PROPERTY)
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
// Use the magic deduplication property built into Artemis as our message identity too
val uuid = UUID.fromString(message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID))
log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID uuid: $uuid")

val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }

@@ -223,7 +223,7 @@ class NodeMessagingClient(config: NodeConfiguration,
override val topicSession = TopicSession(topic, sessionID)
override val data: ByteArray = body
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
override val debugMessageID: String = message.messageID.toString()
override val uniqueMessageId: UUID = uuid
override fun serialise(): ByteArray = body
override fun toString() = topic + "#" + data.opaque()
}

@@ -235,7 +235,7 @@ class NodeMessagingClient(config: NodeConfiguration,
}
}

private fun deliver(msg: Message, uuid: UUID): Boolean {
private fun deliver(msg: Message): Boolean {
state.checkNotLocked()
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything.
@@ -249,7 +249,7 @@ class NodeMessagingClient(config: NodeConfiguration,
// This is a hack; transient messages held in memory isn't crash resistant.
// TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use.
state.locked {
undeliveredMessages += Pair(msg, uuid)
undeliveredMessages += msg
}
return false
}

@@ -268,10 +268,10 @@ class NodeMessagingClient(config: NodeConfiguration,
// interpret persistent as "server" and non-persistent as "client".
if (persistentInbox) {
databaseTransaction {
callHandlers(msg, uuid, deliverTo)
callHandlers(msg, deliverTo)
}
} else {
callHandlers(msg, uuid, deliverTo)
callHandlers(msg, deliverTo)
}
}
} catch(e: Exception) {
@@ -280,16 +280,16 @@ class NodeMessagingClient(config: NodeConfiguration,
return true
}

private fun callHandlers(msg: Message, uuid: UUID, deliverTo: List<Handler>) {
if (uuid in processedMessages) {
log.trace { "discard duplicate message $uuid for ${msg.topicSession}" }
private fun callHandlers(msg: Message, deliverTo: List<Handler>) {
if (msg.uniqueMessageId in processedMessages) {
log.trace { "discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" }
return
}
for (handler in deliverTo) {
handler.callback(msg, handler)
}
// TODO We will at some point need to decide a trimming policy for the id's
processedMessages += uuid
processedMessages += msg.uniqueMessageId
}

override fun stop() {
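The TODO above leaves the trimming policy for processedMessages open. One possible approach, purely illustrative and not part of this commit, is a bounded, insertion-ordered set that evicts the oldest ids once a cap is reached:

import java.util.UUID

// Hypothetical trimming policy: cap the set of seen ids and evict the oldest entries.
class BoundedIdSet(private val maxSize: Int = 10_000) {
    private val ids = LinkedHashSet<UUID>()

    /** Returns true if the id was new (process the message), false if it is a duplicate. */
    @Synchronized
    fun markProcessed(id: UUID): Boolean {
        if (!ids.add(id)) return false
        if (ids.size > maxSize) {
            val oldest = ids.iterator()
            oldest.next()
            oldest.remove()              // drop the eldest id once the cap is exceeded
        }
        return true
    }
}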
@@ -332,7 +332,6 @@ class NodeMessagingClient(config: NodeConfiguration,

override fun send(message: Message, target: MessageRecipients) {
val queueName = toQueueName(target)
val uuid = UUID.randomUUID()
state.locked {
val artemisMessage = session!!.createMessage(true).apply {
val sessionID = message.topicSession.sessionID

@@ -340,13 +339,13 @@ class NodeMessagingClient(config: NodeConfiguration,
putLongProperty(SESSION_ID_PROPERTY, sessionID)
writeBodyBufferBytes(message.data)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(uuid.toString()))
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}

if (knownQueues.add(queueName)) {
maybeCreateQueue(queueName)
}
log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: $uuid")
log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}")
producer!!.send(queueName, artemisMessage)
}
}
@@ -376,7 +375,7 @@ class NodeMessagingClient(config: NodeConfiguration,
undeliveredMessages = listOf()
messagesToRedeliver
}
messagesToRedeliver.forEach { deliver(it.first, it.second) }
messagesToRedeliver.forEach { deliver(it) }
return handler
}
@@ -384,25 +383,26 @@ class NodeMessagingClient(config: NodeConfiguration,
handlers.remove(registration)
}

override fun createMessage(topicSession: TopicSession, data: ByteArray): Message {
override fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID): Message {
// TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying.
return object : Message {
override val topicSession: TopicSession get() = topicSession
override val data: ByteArray get() = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val debugMessageID: String get() = Instant.now().toEpochMilli().toString()
override val uniqueMessageId: UUID = uuid
override fun toString() = topicSession.toString() + "#" + String(data)
}
}

override fun createMessage(topic: String, sessionID: Long, data: ByteArray) = createMessage(TopicSession(topic, sessionID), data)

private fun createRPCDispatcher(ops: CordaRPCOps) = object : RPCDispatcher(ops) {
override fun send(bits: SerializedBytes<*>, toAddress: String) {
state.locked {
val msg = session!!.createMessage(false)
msg.writeBodyBufferBytes(bits.bits)
val msg = session!!.createMessage(false).apply {
writeBodyBufferBytes(bits.bits)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
producer!!.send(toAddress, msg)
}
}
@@ -7,6 +7,7 @@ import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.toStringShort
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.Vault
import com.r3corda.core.protocols.ProtocolLogic

@@ -6,10 +6,7 @@ import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.contracts.Contract
import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.messaging.send
import com.r3corda.core.messaging.*
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkCacheError

@@ -9,6 +9,7 @@ import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.messaging.MessageHandlerRegistration
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkMapCache
@@ -2,6 +2,7 @@ package com.r3corda.node.services.statemachine

import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.TopicSession
import java.util.*

// TODO revisit when Kotlin 1.1 is released and data classes can extend other classes
interface ProtocolIORequest {

@@ -15,6 +16,7 @@ interface SendRequest : ProtocolIORequest {
val destination: Party
val payload: Any
val sendSessionID: Long
val uniqueMessageId: UUID
}

interface ReceiveRequest<T> : ProtocolIORequest {

@@ -27,6 +29,7 @@ data class SendAndReceive<T>(override val topic: String,
override val destination: Party,
override val payload: Any,
override val sendSessionID: Long,
override val uniqueMessageId: UUID,
override val receiveType: Class<T>,
override val receiveSessionID: Long) : SendRequest, ReceiveRequest<T> {
@Transient

@@ -43,7 +46,8 @@ data class ReceiveOnly<T>(override val topic: String,
data class SendOnly(override val destination: Party,
override val topic: String,
override val payload: Any,
override val sendSessionID: Long) : SendRequest {
override val sendSessionID: Long,
override val uniqueMessageId: UUID) : SendRequest {
@Transient
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
}
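Because uniqueMessageId is now a property of the request itself, it is captured when the request is built and travels inside the checkpoint; a replay after a restart therefore sends exactly the same id. A trivial stand-in illustrating that, not the real SendOnly:

import java.util.UUID

// Stand-in for a checkpointed send request: the id is fixed at construction time.
data class SendOnlySketch(val topic: String, val payload: Any, val uniqueMessageId: UUID = UUID.randomUUID())

fun main() {
    val request = SendOnlySketch("test.topic", "hello")
    val firstSendId = request.uniqueMessageId   // used for the original send
    val resendId = request.uniqueMessageId      // reused verbatim when the checkpoint is replayed
    check(firstSendId == resendId)
}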
@@ -17,6 +17,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.sql.Connection
import java.sql.SQLException
import java.util.*
import java.util.concurrent.ExecutionException

/**

@@ -121,7 +122,7 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
sessionIDForReceive: Long,
payload: Any,
receiveType: Class<T>): UntrustworthyData<T> {
return suspendAndExpectReceive(SendAndReceive(topic, destination, payload, sessionIDForSend, receiveType, sessionIDForReceive))
return suspendAndExpectReceive(SendAndReceive(topic, destination, payload, sessionIDForSend, UUID.randomUUID(), receiveType, sessionIDForReceive))
}

@Suspendable

@@ -131,7 +132,7 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,

@Suspendable
override fun send(topic: String, destination: Party, sessionID: Long, payload: Any) {
suspend(SendOnly(destination, topic, payload, sessionID))
suspend(SendOnly(destination, topic, payload, sessionID, UUID.randomUUID()))
}

@Suspendable
@@ -147,9 +147,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
logError(e, it, topicSession, fiber)
}
}
if (checkpoint.request is SendRequest) {
sendMessage(fiber, checkpoint.request)
}
} else {
fiber.logger.info("Restored ${fiber.logic} - it was not waiting on any message; received payload: ${checkpoint.receivedPayload.toString().abbreviate(50)}")
executor.executeASAP {
if (checkpoint.request is SendRequest) {
sendMessage(fiber, checkpoint.request)
}
iterateStateMachine(fiber, checkpoint.receivedPayload) {
try {
Fiber.unparkDeserialized(fiber, scheduler)

@@ -279,21 +285,21 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
}

private fun onNextSuspend(psm: ProtocolStateMachineImpl<*>, request: ProtocolIORequest) {
val serialisedFiber = serializeFiber(psm)
updateCheckpoint(psm, serialisedFiber, request, null)
// We have a request to do something: send, receive, or send-and-receive.
if (request is ReceiveRequest<*>) {
// Prepare a listener on the network that runs in the background thread when we receive a message.
prepareToReceiveForRequest(psm, request)
prepareToReceiveForRequest(psm, serialisedFiber, request)
}
if (request is SendRequest) {
performSendRequest(psm, request)
}
}

private fun prepareToReceiveForRequest(psm: ProtocolStateMachineImpl<*>, request: ReceiveRequest<*>) {
private fun prepareToReceiveForRequest(psm: ProtocolStateMachineImpl<*>, serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>, request: ReceiveRequest<*>) {
executor.checkOnThread()
val queueID = request.receiveTopicSession
val serialisedFiber = serializeFiber(psm)
updateCheckpoint(psm, serialisedFiber, request, null)
psm.logger.trace { "Preparing to receive message of type ${request.receiveType.name} on queue $queueID" }
iterateOnResponse(psm, serialisedFiber, request) {
try {

@@ -305,12 +311,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
}

private fun performSendRequest(psm: ProtocolStateMachineImpl<*>, request: SendRequest) {
val topicSession = TopicSession(request.topic, request.sendSessionID)
val payload = request.payload
psm.logger.trace { "Sending message of type ${payload.javaClass.name} using queue $topicSession to ${request.destination} (${payload.toString().abbreviate(50)})" }
val node = serviceHub.networkMapCache.getNodeByLegalName(request.destination.name) ?:
throw IllegalArgumentException("Don't know about ${request.destination} but trying to send a message of type ${payload.javaClass.name} on $topicSession (${payload.toString().abbreviate(50)})", request.stackTraceInCaseOfProblems)
serviceHub.networkService.send(topicSession, payload, node.address)
val topicSession = sendMessage(psm, request)

if (request is SendOnly) {
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.

@@ -324,6 +325,16 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
}
}

private fun sendMessage(psm: ProtocolStateMachineImpl<*>, request: SendRequest): TopicSession {
val topicSession = TopicSession(request.topic, request.sendSessionID)
val payload = request.payload
psm.logger.trace { "Sending message of type ${payload.javaClass.name} using queue $topicSession to ${request.destination} (${payload.toString().abbreviate(50)})" }
val node = serviceHub.networkMapCache.getNodeByLegalName(request.destination.name) ?:
throw IllegalArgumentException("Don't know about ${request.destination} but trying to send a message of type ${payload.javaClass.name} on $topicSession (${payload.toString().abbreviate(50)})", request.stackTraceInCaseOfProblems)
serviceHub.networkService.send(topicSession, payload, node.address, request.uniqueMessageId)
return topicSession
}

/**
* Add a trigger to the [MessagingService] to deserialize the fiber and pass message content to it, once a message is
* received.
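The restore path added above replays a checkpointed send, with its stored id, before the fiber resumes, so an interruption between writing the checkpoint and actually sending the message still results in at most one effective delivery. A minimal illustration of that ordering, simplified and not using the real StateMachineManager types:

import java.util.UUID

// Simplified restore flow: replay any pending send first, then resume the fiber.
data class CheckpointSketch(val pendingSendId: UUID?, val receivedPayload: Any?)

fun restore(checkpoint: CheckpointSketch, resend: (UUID) -> Unit, resume: (Any?) -> Unit) {
    checkpoint.pendingSendId?.let(resend)   // same UUID as the original attempt, so receivers can dedupe it
    resume(checkpoint.receivedPayload)
}

fun main() {
    val checkpoint = CheckpointSketch(pendingSendId = UUID.randomUUID(), receivedPayload = null)
    restore(checkpoint, resend = { println("resending $it") }, resume = { println("fiber resumed") })
}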
@@ -4,6 +4,7 @@ package com.r3corda.node.messaging

import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.TopicStringValidator
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.testing.node.MockNetwork

@@ -106,9 +107,11 @@ class InMemoryMessagingTests {
assertEquals(1, received)

// Here's the core of the test; previously the unhandled message would cause runNetwork() to abort early, so
// this would fail.
node2.net.send(invalidMessage, node1.net.myAddress)
node2.net.send(validMessage, node1.net.myAddress)
// this would fail. Make fresh messages to stop duplicate uniqueMessageId causing drops
val invalidMessage2 = node2.net.createMessage("invalid_message", DEFAULT_SESSION_ID, ByteArray(0))
val validMessage2 = node2.net.createMessage("valid_message", DEFAULT_SESSION_ID, ByteArray(0))
node2.net.send(invalidMessage2, node1.net.myAddress)
node2.net.send(validMessage2, node1.net.myAddress)
network.runNetwork()
assertEquals(2, received)
@@ -3,6 +3,7 @@ package com.r3corda.node.services
import com.google.common.net.HostAndPort
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingServer

@@ -5,6 +5,7 @@ import com.r3corda.contracts.asset.Cash
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.crypto.newSecureRandom
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.Vault
import com.r3corda.core.random63BitValue
@@ -111,6 +111,45 @@ class StateMachineManagerTests {
assertThat(restoredProtocol.receivedPayload).isEqualTo(payload)
}

@Test
fun `protocol with send will resend on interrupted restart`() {
val topic = "send-and-receive"
val payload = random63BitValue()
val payload2 = random63BitValue()
var sentCount = 0
var receivedCount = 0
net.messagingNetwork.sentMessages.subscribe { if (it.message.topicSession.topic == topic) sentCount++ }
net.messagingNetwork.receivedMessages.subscribe { if (it.message.topicSession.topic == topic) receivedCount++ }
val node3 = net.createNode(node1.info.address)
net.runNetwork()
val firstProtocol = PingPongProtocol(topic, node3.info.identity, payload)
val secondProtocol = PingPongProtocol(topic, node2.info.identity, payload2)
connectProtocols(firstProtocol, secondProtocol)
// Kick off first send and receive
node2.smm.add("test", firstProtocol)
assertEquals(1, node2.checkpointStorage.checkpoints.count())
// Restart node and thus reload the checkpoint and resend the message with same UUID
node2.stop()
val node2b = net.createNode(node1.info.address, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray())
val (firstAgain, fut1) = node2b.smm.findStateMachines(PingPongProtocol::class.java).single()
net.runNetwork()
assertEquals(1, node2.checkpointStorage.checkpoints.count())
// Now add in the other half of the protocol. First message should get deduped. So message data stays in sync.
node3.smm.add("test", secondProtocol)
net.runNetwork()
node2b.smm.executor.flush()
fut1.get()
// Check protocols completed cleanly and didn't get out of phase
assertEquals(4, receivedCount, "Protocol should have exchanged 4 unique messages") // Two messages each way
assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages") // can't give a precise value as every addMessageHandler re-runs the undelivered messages
assertEquals(0, node2b.checkpointStorage.checkpoints.count(), "Checkpoints left after restored protocol should have ended")
assertEquals(0, node3.checkpointStorage.checkpoints.count(), "Checkpoints left after restored protocol should have ended")
assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3")
assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3")
assertEquals(payload, secondProtocol.receivedPayload, "Received payload does not match the (restarted) first value on Node 2")
assertEquals(payload + 1, secondProtocol.receivedPayload2, "Received payload does not match the expected second value on Node 2")
}

private inline fun <reified P : NonTerminatingProtocol> MockNode.restartAndGetRestoredProtocol(networkMapAddress: SingleMessageRecipient? = null): P {
val servicesArray = advertisedServices.toTypedArray()
val node = mockNet.createNode(networkMapAddress, id, advertisedServices = *servicesArray)

@@ -148,7 +187,8 @@ class StateMachineManagerTests {
val lazyTime by lazy { serviceHub.clock.instant() }

@Suspendable
override fun call() {}
override fun call() {
}

override val topic: String get() = throw UnsupportedOperationException()
}

@@ -170,6 +210,17 @@ class StateMachineManagerTests {
}
}

private class PingPongProtocol(override val topic: String, val otherParty: Party, val payload: Long) : ProtocolLogic<Unit>() {
@Transient var receivedPayload: Long? = null
@Transient var receivedPayload2: Long? = null

@Suspendable
override fun call() {
receivedPayload = sendAndReceive<Long>(otherParty, payload).unwrap { it }
receivedPayload2 = sendAndReceive<Long>(otherParty, (payload + 1)).unwrap { it }
}

}

/**
* A protocol that suspends forever after doing some work. This is to allow it to be retrieved from the SMM after
@@ -5,13 +5,10 @@ import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.sha256
import com.r3corda.core.messaging.*
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.MessagingServiceBuilder
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging
import org.slf4j.LoggerFactory
import rx.Observable

@@ -50,7 +47,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
// The corresponding sentMessages stream reflects when a message was pumpSend'd
private val messageSendQueue = LinkedBlockingQueue<MessageTransfer>()
private val _sentMessages = PublishSubject.create<MessageTransfer>()
@Suppress("unused") // Used by the visualiser tool.
@Suppress("unused") // Used by the visualiser tool.
/** A stream of (sender, message, recipients) triples */
val sentMessages: Observable<MessageTransfer>
get() = _sentMessages

@@ -63,7 +60,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
private val messageReceiveQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>()
private val _receivedMessages = PublishSubject.create<MessageTransfer>()

@Suppress("unused") // Used by the visualiser tool.
@Suppress("unused") // Used by the visualiser tool.
/** A stream of (sender, message, recipients) triples */
val receivedMessages: Observable<MessageTransfer>
get() = _receivedMessages

@@ -213,6 +210,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
}

private val state = ThreadBox(InnerState())
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(HashSet<UUID>())

override val myAddress: SingleMessageRecipient = handle

@@ -228,7 +226,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
}

override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
= addMessageHandler(TopicSession(topic, sessionID), executor, callback)
= addMessageHandler(TopicSession(topic, sessionID), executor, callback)

override fun addMessageHandler(topicSession: TopicSession, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
check(running)

@@ -267,17 +265,13 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
}

/** Returns the given (topic & session, data) pair as a newly created message object. */
override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message
= createMessage(TopicSession(topic, sessionID), data)

/** Returns the given (topic & session, data) pair as a newly created message object. */
override fun createMessage(topicSession: TopicSession, data: ByteArray): Message {
override fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID): Message {
return object : Message {
override val topicSession: TopicSession get() = topicSession
override val data: ByteArray get() = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val debugMessageID: String get() = serialise().sha256().prefixChars()
override val uniqueMessageId: UUID = uuid

override fun toString() = topicSession.toString() + "#" + String(data)
}

@@ -335,19 +329,23 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
val next = getNextQueue(q, block) ?: return null
val (transfer, deliverTo) = next

for (handler in deliverTo) {
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
(handler.executor ?: MoreExecutors.directExecutor()).execute {
try {
handler.callback(transfer.message, handler)
} catch(e: Exception) {
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topicSession}", e)
if (transfer.message.uniqueMessageId !in processedMessages) {
for (handler in deliverTo) {
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
(handler.executor ?: MoreExecutors.directExecutor()).execute {
try {
handler.callback(transfer.message, handler)
} catch(e: Exception) {
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topicSession}", e)
}
}
}
_receivedMessages.onNext(transfer)
processedMessages += transfer.message.uniqueMessageId
} else {
log.info("Drop duplicate message ${transfer.message.uniqueMessageId}")
}

_receivedMessages.onNext(transfer)

return transfer
}
}