diff --git a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt index e89d7863af..1bb07d52d3 100644 --- a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt +++ b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt @@ -1,6 +1,7 @@ package com.r3corda.core.messaging import com.google.common.util.concurrent.ListenableFuture +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.serialization.DeserializeAsKotlinObjectDef import com.r3corda.core.serialization.serialize import java.time.Instant @@ -22,7 +23,7 @@ import javax.annotation.concurrent.ThreadSafe interface MessagingService { /** * The provided function will be invoked for each received message whose topic matches the given string, on the given - * executor. The topic can be the empty string to match all messages. + * executor. * * If no executor is received then the callback will run on threads provided by the messaging service, and the * callback is expected to be thread safe as a result. @@ -30,8 +31,28 @@ interface MessagingService { * The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler]. * The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister * itself and yet addMessageHandler hasn't returned the handle yet. + * + * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". + * The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]). + * @param sessionID identifier for the session the message is part of. For services listening before + * a session is established, use [DEFAULT_SESSION_ID]. */ - fun addMessageHandler(topic: String = "", executor: Executor? = null, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration + fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, executor: Executor? = null, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration + + /** + * The provided function will be invoked for each received message whose topic and session matches, on the + * given executor. + * + * If no executor is received then the callback will run on threads provided by the messaging service, and the + * callback is expected to be thread safe as a result. + * + * The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler]. + * The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister + * itself and yet addMessageHandler hasn't returned the handle yet. + * + * @param topicSession identifier for the topic and session to listen for messages arriving on. + */ + fun addMessageHandler(topicSession: TopicSession, executor: Executor? = null, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration /** * Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once @@ -55,34 +76,81 @@ interface MessagingService { /** * Returns an initialised [Message] with the current time, etc, already filled in. + * + * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". + * Must not be blank. + * @param sessionID identifier for the session the message is part of. For messages sent to services before the + * construction of a session, use [DEFAULT_SESSION_ID]. */ - fun createMessage(topic: String, data: ByteArray): Message + fun createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message + + /** + * Returns an initialised [Message] with the current time, etc, already filled in. + * + * @param topicSession identifier for the topic and session the message is sent to. + */ + fun createMessage(topicSession: TopicSession, data: ByteArray): Message /** Returns an address that refers to this node. */ val myAddress: SingleMessageRecipient } /** - * Registers a handler for the given topic that runs the given callback with the message and then removes itself. This - * is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback doesn't - * take the registration object, unlike the callback to [MessagingService.addMessageHandler]. + * Registers a handler for the given topic and session ID that runs the given callback with the message and then removes + * itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback + * doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler], as the handler is + * automatically deregistered before the callback runs. + * + * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". + * The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]). + * @param sessionID identifier for the session the message is part of. For services listening before + * a session is established, use [DEFAULT_SESSION_ID]. */ -fun MessagingService.runOnNextMessage(topic: String = "", executor: Executor? = null, callback: (Message) -> Unit) { +fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, executor: Executor? = null, callback: (Message) -> Unit) + = runOnNextMessage(TopicSession(topic, sessionID), executor, callback) + +/** + * Registers a handler for the given topic and session that runs the given callback with the message and then removes + * itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback + * doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler]. + * + * @param topicSession identifier for the topic and session to listen for messages arriving on. + */ +fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Executor? = null, callback: (Message) -> Unit) { val consumed = AtomicBoolean() - addMessageHandler(topic, executor) { msg, reg -> + addMessageHandler(topicSession, executor) { msg, reg -> removeMessageHandler(reg) check(!consumed.getAndSet(true)) { "Called more than once" } - check(msg.topic == topic) { "Topic mismatch: ${msg.topic} vs $topic" } + check(msg.topicSession == topicSession) { "Topic/session mismatch: ${msg.topicSession} vs $topicSession" } callback(msg) } } -fun MessagingService.send(topic: String, payload: Any, to: MessageRecipients) { - send(createMessage(topic, payload.serialize().bits), to) -} +fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients) + = send(TopicSession(topic, sessionID), payload, to) + +fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients) + = send(createMessage(topicSession, payload.serialize().bits), to) interface MessageHandlerRegistration +/** + * An identifier for the endpoint [MessagingService] message handlers listen at. + * + * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". + * The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]). + * @param sessionID identifier for the session the message is part of. For services listening before + * a session is established, use [DEFAULT_SESSION_ID]. + */ +data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) { + companion object { + val Blank = TopicSession("", DEFAULT_SESSION_ID) + } + fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID + + override fun toString(): String = "${topic}.${sessionID}" +} + /** * A message is defined, at this level, to be a (topic, timestamp, byte arrays) triple, where the topic is a string in * Java-style reverse dns form, with "platform." being a prefix reserved by the platform for its own use. Vendor @@ -94,7 +162,7 @@ interface MessageHandlerRegistration * the timestamp field they probably will be, even if an implementation just uses a hash prefix as the message id. */ interface Message { - val topic: String + val topicSession: TopicSession val data: ByteArray val debugTimestamp: Instant val debugMessageID: String diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt index 5c22ec20f7..5a21444757 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt @@ -10,9 +10,10 @@ import java.security.PrivateKey import java.security.PublicKey /** - * Postfix for base topics when sending a request to a service. + * Session ID to use for services listening for the first message in a session (before a + * specific session ID has been established). */ -val TOPIC_DEFAULT_POSTFIX = ".0" +val DEFAULT_SESSION_ID = 0L /** * This file defines various 'services' which are not currently fleshed out. A service is a module that provides diff --git a/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt index e55e28b6c9..727421be9e 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt @@ -7,6 +7,7 @@ import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.signWithECDSA import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue import com.r3corda.core.seconds @@ -157,7 +158,7 @@ object TwoPartyDealProtocol { // Copy the transaction to every regulator in the network. This is obviously completely bogus, it's // just for demo purposes. for (regulator in regulators) { - send(regulator.identity, 0, fullySigned) + send(regulator.identity, DEFAULT_SESSION_ID, fullySigned) } } @@ -461,7 +462,7 @@ object TwoPartyDealProtocol { val initation = FixingSessionInitiation(sessionID, sortedParties[0], serviceHub.storageService.myLegalIdentity, timeout) // Send initiation to other side to launch one side of the fixing protocol (the Fixer). - send(sortedParties[1], 0, initation) + send(sortedParties[1], DEFAULT_SESSION_ID, initation) // Then start the other side of the fixing protocol. val protocol = Floater(ref, sessionID) diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index ef9f6c1c56..a9ec16792c 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -273,11 +273,10 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val reg = NodeRegistration(info, networkMapSeq++, type, expires) val sessionID = random63BitValue() val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID) - val message = net.createMessage("$REGISTER_PROTOCOL_TOPIC.0", request.serialize().bits) + val message = net.createMessage(REGISTER_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, request.serialize().bits) val future = SettableFuture.create() - val topic = "$REGISTER_PROTOCOL_TOPIC.$sessionID" - net.runOnNextMessage(topic, RunOnCallerThread) { message -> + net.runOnNextMessage(REGISTER_PROTOCOL_TOPIC, sessionID, RunOnCallerThread) { message -> future.set(message.data.deserialize()) } net.send(message, serviceInfo.address) diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt b/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt index dc354efbc0..94eced249f 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt @@ -2,8 +2,9 @@ package com.r3corda.node.services.api import com.r3corda.core.messaging.Message import com.r3corda.core.messaging.MessagingService +import com.r3corda.core.messaging.TopicSession +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.NetworkMapCache -import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize @@ -29,13 +30,13 @@ abstract class AbstractNodeService(val net: MessagingService, val networkMapCach addMessageHandler(topic: String, crossinline handler: (Q) -> R, crossinline exceptionConsumer: (Message, Exception) -> Unit) { - net.addMessageHandler(topic + TOPIC_DEFAULT_POSTFIX, null) { message, r -> + net.addMessageHandler(topic, DEFAULT_SESSION_ID, null) { message, r -> try { val request = message.data.deserialize() val response = handler(request) // If the return type R is Unit, then do not send a response if (response.javaClass != Unit.javaClass) { - val msg = net.createMessage("$topic.${request.sessionID}", response.serialize().bits) + val msg = net.createMessage(topic, request.sessionID, response.serialize().bits) net.send(msg, request.getReplyTo(networkMapCache)) } } catch(e: Exception) { diff --git a/node/src/main/kotlin/com/r3corda/node/services/clientapi/FixingSessionInitiation.kt b/node/src/main/kotlin/com/r3corda/node/services/clientapi/FixingSessionInitiation.kt index fff47b7735..6f7706176c 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/clientapi/FixingSessionInitiation.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/clientapi/FixingSessionInitiation.kt @@ -1,6 +1,7 @@ package com.r3corda.node.services.clientapi import com.r3corda.core.node.CordaPluginRegistry +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.serialization.deserialize import com.r3corda.node.internal.AbstractNode import com.r3corda.node.services.api.ServiceHubInternal @@ -19,7 +20,7 @@ object FixingSessionInitiation { class Service(services: ServiceHubInternal) { init { - services.networkService.addMessageHandler("${TwoPartyDealProtocol.FIX_INITIATE_TOPIC}.0") { msg, registration -> + services.networkService.addMessageHandler(TwoPartyDealProtocol.FIX_INITIATE_TOPIC, DEFAULT_SESSION_ID) { msg, registration -> val initiation = msg.data.deserialize() val protocol = TwoPartyDealProtocol.Fixer(initiation) services.startProtocol("fixings", protocol) diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt index b4f6e2cb08..c5da7a90ca 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt @@ -81,6 +81,8 @@ class ArtemisMessagingService(val directory: Path, // confusion. val TOPIC_PROPERTY = "platform-topic" + val SESSION_ID_PROPERTY = "session-id" + /** Temp helper until network map is established. */ fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort) fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname)) @@ -101,7 +103,7 @@ class ArtemisMessagingService(val directory: Path, /** A registration to handle messages of different types */ inner class Handler(val executor: Executor?, - val topic: String, + val topicSession: TopicSession, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration private val handlers = CopyOnWriteArrayList() @@ -180,12 +182,17 @@ class ArtemisMessagingService(val directory: Path, log.warn("Received message without a $TOPIC_PROPERTY property, ignoring") return@setMessageHandler } + if (!message.containsProperty(SESSION_ID_PROPERTY)) { + log.warn("Received message without a $SESSION_ID_PROPERTY property, ignoring") + return@setMessageHandler + } val topic = message.getStringProperty(TOPIC_PROPERTY) + val sessionID = message.getLongProperty(SESSION_ID_PROPERTY) val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } val msg = object : Message { - override val topic = topic + override val topicSession = TopicSession(topic, sessionID) override val data: ByteArray = body override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp) override val debugMessageID: String = message.messageID.toString() @@ -208,12 +215,12 @@ class ArtemisMessagingService(val directory: Path, private fun deliverMessage(msg: Message): Boolean { // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added // or removed whilst the filter is executing will not affect anything. - val deliverTo = handlers.filter { it.topic.isBlank() || it.topic == msg.topic } + val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession } if (deliverTo.isEmpty()) { // This should probably be downgraded to a trace in future, so the protocol can evolve with new topics // without causing log spam. - log.warn("Received message for ${msg.topic} that doesn't have any registered handlers yet") + log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet") // This is a hack; transient messages held in memory isn't crash resistant. // TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use. @@ -227,7 +234,7 @@ class ArtemisMessagingService(val directory: Path, try { handler.callback(msg, handler) } catch(e: Exception) { - log.error("Caught exception whilst executing message handler for ${msg.topic}", e) + log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e) } } } @@ -259,13 +266,24 @@ class ArtemisMessagingService(val directory: Path, override fun send(message: Message, target: MessageRecipients) { if (target !is Address) TODO("Only simple sends to single recipients are currently implemented") - val artemisMessage = session!!.createMessage(true).putStringProperty("platform-topic", message.topic).writeBodyBufferBytes(message.data) + val artemisMessage = session!!.createMessage(true).apply { + val sessionID = message.topicSession.sessionID + putStringProperty(TOPIC_PROPERTY, message.topicSession.topic) + putLongProperty(SESSION_ID_PROPERTY, sessionID) + writeBodyBufferBytes(message.data) + } getSendClient(target).send(artemisMessage) } - override fun addMessageHandler(topic: String, executor: Executor?, + override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, + callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration + = addMessageHandler(TopicSession(topic, sessionID), executor, callback) + + override fun addMessageHandler(topicSession: TopicSession, + executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { - val handler = Handler(executor, topic, callback) + require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." } + val handler = Handler(executor, topicSession, callback) handlers.add(handler) undeliveredMessages.removeIf { deliverMessage(it) } return handler @@ -275,18 +293,21 @@ class ArtemisMessagingService(val directory: Path, handlers.remove(registration) } - override fun createMessage(topic: String, data: ByteArray): Message { + override fun createMessage(topicSession: TopicSession, data: ByteArray): Message { // TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying. return object : Message { - override val topic: String get() = topic + override val topicSession: TopicSession get() = topicSession override val data: ByteArray get() = data override val debugTimestamp: Instant = Instant.now() override fun serialise(): ByteArray = this.serialise() override val debugMessageID: String get() = Instant.now().toEpochMilli().toString() - override fun toString() = topic + "#" + String(data) + override fun toString() = topicSession.toString() + "#" + String(data) } } + override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message + = createMessage(TopicSession(topic, sessionID), data) + override val myAddress: SingleMessageRecipient = Address(myHostPort) private enum class ConnectionDirection { INBOUND, OUTBOUND } diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt index 5b54e81461..1940430fc7 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt @@ -42,7 +42,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria private val handleEndpointMap = HashMap() data class MessageTransfer(val sender: InMemoryMessaging, val message: Message, val recipients: MessageRecipients) { - override fun toString() = "${message.topic} from '${sender.myAddress}' to '$recipients'" + override fun toString() = "${message.topicSession} from '${sender.myAddress}' to '$recipients'" } // All sent messages are kept here until pumpSend is called, or manuallyPumped is set to false @@ -197,8 +197,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria */ @ThreadSafe inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingServiceInternal { - - inner class Handler(val executor: Executor?, val topic: String, + inner class Handler(val executor: Executor?, val topicSession: TopicSession, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration @Volatile @@ -226,10 +225,13 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria override fun registerTrustedAddress(address: SingleMessageRecipient) {} - override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { + override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration + = addMessageHandler(TopicSession(topic, sessionID), executor, callback) + + override fun addMessageHandler(topicSession: TopicSession, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { check(running) val (handler, items) = state.locked { - val handler = Handler(executor, topic, callback).apply { handlers.add(this) } + val handler = Handler(executor, topicSession, callback).apply { handlers.add(this) } val items = ArrayList(pendingRedelivery) pendingRedelivery.clear() Pair(handler, items) @@ -262,16 +264,20 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria netNodeHasShutdown(handle) } - /** Returns the given (topic, data) pair as a newly created message object.*/ - override fun createMessage(topic: String, data: ByteArray): Message { + /** Returns the given (topic & session, data) pair as a newly created message object. */ + override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message + = createMessage(TopicSession(topic, sessionID), data) + + /** Returns the given (topic & session, data) pair as a newly created message object. */ + override fun createMessage(topicSession: TopicSession, data: ByteArray): Message { return object : Message { - override val topic: String get() = topic + override val topicSession: TopicSession get() = topicSession override val data: ByteArray get() = data override val debugTimestamp: Instant = Instant.now() override fun serialise(): ByteArray = this.serialise() override val debugMessageID: String get() = serialise().sha256().prefixChars() - override fun toString() = topic + "#" + String(data) + override fun toString() = topicSession.toString() + "#" + String(data) } } @@ -300,7 +306,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria while (deliverTo == null) { val transfer = (if (block) q.take() else q.poll()) ?: return null deliverTo = state.locked { - val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic } + val h = handlers.filter { if (it.topicSession.isBlank()) true else transfer.message.topicSession == it.topicSession } if (h.isEmpty()) { // Got no handlers for this message yet. Keep the message around and attempt redelivery after a new @@ -308,6 +314,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria // reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting // up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at // least sometimes. + log.warn("Message to ${transfer.message.topicSession} could not be delivered") pendingRedelivery.add(transfer) null } else { @@ -335,7 +342,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria try { handler.callback(transfer.message, handler) } catch(e: Exception) { - loggerFor().error("Caught exception in handler for $this/${handler.topic}", e) + loggerFor().error("Caught exception in handler for $this/${handler.topicSession}", e) } } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt index 0f0c21ee54..07af6b3782 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt @@ -10,10 +10,10 @@ import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.runOnNextMessage import com.r3corda.core.messaging.send import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.NetworkCacheError import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.ServiceType -import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.deserialize @@ -57,11 +57,11 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : ifChangedSinceVer: Int?): ListenableFuture { if (subscribe && !registeredForPush) { // Add handler to the network, for updates received from the remote network map service. - net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC + ".0", null) { message, r -> + net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, null) { message, r -> try { val req = message.data.deserialize() val hash = SecureHash.sha256(req.wireReg.serialize().bits) - val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX, + val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, NetworkMapService.UpdateAcknowledge(hash, net.myAddress).serialize().bits) net.send(ackMessage, req.replyTo) processUpdatePush(req) @@ -81,13 +81,13 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : // Add a message handler for the response, and prepare a future to put the data into. // Note that the message handler will run on the network thread (not this one). val future = SettableFuture.create() - net.runOnNextMessage("${NetworkMapService.FETCH_PROTOCOL_TOPIC}.$sessionID", MoreExecutors.directExecutor()) { message -> + net.runOnNextMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, sessionID, MoreExecutors.directExecutor()) { message -> val resp = message.data.deserialize() // We may not receive any nodes back, if the map hasn't changed since the version specified resp.nodes?.forEach { processRegistration(it) } future.set(Unit) } - net.send("${NetworkMapService.FETCH_PROTOCOL_TOPIC}.0", req, service.address) + net.send(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, service.address) return future } @@ -114,7 +114,7 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : // Add a message handler for the response, and prepare a future to put the data into. // Note that the message handler will run on the network thread (not this one). val future = SettableFuture.create() - net.runOnNextMessage("${NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC}.$sessionID", MoreExecutors.directExecutor()) { message -> + net.runOnNextMessage(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, sessionID, MoreExecutors.directExecutor()) { message -> val resp = message.data.deserialize() if (resp.confirmed) { future.set(Unit) @@ -122,7 +122,7 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : future.setException(NetworkCacheError.DeregistrationFailed()) } } - net.send("${NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC}.0", req, service.address) + net.send(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, service.address) return future } diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt b/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt index ccd6a3545c..0aaa53f39c 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt @@ -7,9 +7,9 @@ import com.r3corda.core.messaging.MessageRecipients import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.ServiceType -import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX import com.r3corda.core.serialization.SerializedBytes import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize @@ -114,7 +114,7 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v addMessageHandler(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, { req: NetworkMapService.SubscribeRequest -> processSubscriptionRequest(req) } ) - net.addMessageHandler(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX, null) { message, r -> + net.addMessageHandler(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, null) { message, r -> val req = message.data.deserialize() processAcknowledge(req) } @@ -144,8 +144,7 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v // to a MessageRecipientGroup that nodes join/leave, rather than the network map // service itself managing the group val update = NetworkMapService.Update(wireReg, net.myAddress).serialize().bits - val topic = NetworkMapService.PUSH_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX - val message = net.createMessage(topic, update) + val message = net.createMessage(NetworkMapService.PUSH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, update) subscribers.locked { val toRemove = mutableListOf() diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt index 515a7021fe..44a76dd1f1 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt @@ -75,11 +75,11 @@ object DataVending { .success { services.recordTransactions(req.tx) val resp = NotifyTxResponseMessage(true) - val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits) + val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits) net.send(msg, req.getReplyTo(services.networkMapCache)) }.failure { val resp = NotifyTxResponseMessage(false) - val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits) + val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits) net.send(msg, req.getReplyTo(services.networkMapCache)) } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/FiberRequest.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/FiberRequest.kt index d088df8c74..d5714fee76 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/FiberRequest.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/FiberRequest.kt @@ -1,6 +1,7 @@ package com.r3corda.node.services.statemachine import com.r3corda.core.crypto.Party +import com.r3corda.core.messaging.TopicSession // TODO: Clean this up sealed class FiberRequest(val topic: String, @@ -13,8 +14,8 @@ sealed class FiberRequest(val topic: String, @Transient val stackTraceInCaseOfProblems: StackSnapshot? = StackSnapshot() - val receiveTopic: String - get() = topic + "." + sessionIDForReceive + val receiveTopicSession: TopicSession + get() = TopicSession(topic, sessionIDForReceive) override fun equals(other: Any?): Boolean @@ -66,7 +67,7 @@ sealed class FiberRequest(val topic: String, false override fun toString(): String { - return "Expecting response via topic ${receiveTopic} of type ${responseTypeName}" + return "Expecting response via topic ${receiveTopicSession} of type ${responseTypeName}" } // We have to do an unchecked cast, but unless the serialized form is damaged, this was diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt index 23df79646f..64c7f2c423 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt @@ -8,6 +8,7 @@ import com.esotericsoftware.kryo.Kryo import com.google.common.base.Throwables import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.abbreviate +import com.r3corda.core.messaging.TopicSession import com.r3corda.core.messaging.runOnNextMessage import com.r3corda.core.messaging.send import com.r3corda.core.protocols.ProtocolLogic @@ -124,7 +125,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService when (checkpoint.request) { is FiberRequest.ExpectingResponse<*> -> { - val topic = checkpoint.request.receiveTopic + val topic = checkpoint.request.receiveTopicSession val awaitingPayloadType = checkpoint.request.responseType fiber.logger.info("Restored ${fiber.logic} - it was previously waiting for message of type ${awaitingPayloadType.name} on topic $topic") iterateOnResponse(fiber, awaitingPayloadType, checkpoint.serialisedFiber, checkpoint.request) { @@ -179,9 +180,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService return createKryo(serializer.kryo) } - private fun logError(e: Throwable, payload: Any?, topic: String?, psm: ProtocolStateMachineImpl<*>) { + private fun logError(e: Throwable, payload: Any?, topicSession: TopicSession?, psm: ProtocolStateMachineImpl<*>) { psm.logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " + - "when handling a message of type ${payload?.javaClass?.name} on topic $topic") + "when handling a message of type ${payload?.javaClass?.name} on queue $topicSession") if (psm.logger.isTraceEnabled) { val s = StringWriter() Throwables.getRootCause(e).printStackTrace(PrintWriter(s)) @@ -265,12 +266,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService } } // If a non-null payload to send was provided, send it now. + val queueID = TopicSession(request.topic, request.sessionIDForSend) request.payload?.let { - val topic = "${request.topic}.${request.sessionIDForSend}" - psm.logger.trace { "Sending message of type ${it.javaClass.name} using topic $topic to ${request.destination} (${it.toString().abbreviate(50)})" } + psm.logger.trace { "Sending message of type ${it.javaClass.name} using queue $queueID to ${request.destination} (${it.toString().abbreviate(50)})" } val node = serviceHub.networkMapCache.getNodeByLegalName(request.destination!!.name) requireNotNull(node) { "Don't know about ${request.destination}" } - serviceHub.networkService.send(topic, it, node!!.address) + serviceHub.networkService.send(queueID, it, node!!.address) } if (request is FiberRequest.NotExpectingResponse) { // We sent a message, but don't expect a response, so re-enter the continuation to let it keep going. @@ -278,7 +279,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService try { Fiber.unpark(psm, QUASAR_UNBLOCKER) } catch(e: Throwable) { - logError(e, request.payload, request.topic, psm) + logError(e, request.payload, queueID, psm) } } } @@ -286,15 +287,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService private fun checkpointOnExpectingResponse(psm: ProtocolStateMachineImpl<*>, request: FiberRequest.ExpectingResponse<*>) { executor.checkOnThread() - val topic = "${request.topic}.${request.sessionIDForReceive}" + val queueID = request.receiveTopicSession val serialisedFiber = serializeFiber(psm) updateCheckpoint(psm, serialisedFiber, request) - psm.logger.trace { "Preparing to receive message of type ${request.responseType.name} on topic $topic" } + psm.logger.trace { "Preparing to receive message of type ${request.responseType.name} on queue $queueID" } iterateOnResponse(psm, request.responseType, serialisedFiber, request) { try { Fiber.unpark(psm, QUASAR_UNBLOCKER) } catch(e: Throwable) { - logError(e, it, topic, psm) + logError(e, it, queueID, psm) } } } @@ -308,7 +309,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService serialisedFiber: SerializedBytes>, request: FiberRequest.ExpectingResponse<*>, resumeAction: (Any?) -> Unit) { - val topic = request.receiveTopic + val topic = request.receiveTopicSession serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg -> // Assertion to ensure we don't execute on the wrong thread. executor.checkOnThread() @@ -322,7 +323,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService check(responseType.isInstance(payload)) { "Expected message of type ${responseType.name} but got ${payload.javaClass.name}" } // Update the fiber's checkpoint so that it's no longer waiting on a response, but rather has the received payload updateCheckpoint(psm, serialisedFiber, request) - psm.logger.trace { "Received message of type ${payload.javaClass.name} on topic $topic (${payload.toString().abbreviate(50)})" } + psm.logger.trace { "Received message of type ${payload.javaClass.name} on topic ${request.topic}.${request.sessionIDForReceive} (${payload.toString().abbreviate(50)})" } iterateStateMachine(psm, payload, resumeAction) } } diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt index d939d93015..86a0e6444c 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt @@ -4,6 +4,7 @@ package com.r3corda.node.messaging import com.r3corda.core.messaging.Message import com.r3corda.core.messaging.TopicStringValidator +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.node.internal.testing.MockNetwork import org.junit.Before import org.junit.Test @@ -61,7 +62,7 @@ class InMemoryMessagingTests { } // Node 1 sends a message and it should end up in finalDelivery, after we run the network - node1.net.send(node1.net.createMessage("test.topic", bits), node2.info.address) + node1.net.send(node1.net.createMessage("test.topic", DEFAULT_SESSION_ID, bits), node2.info.address) network.runNetwork(rounds = 1) @@ -78,7 +79,7 @@ class InMemoryMessagingTests { var counter = 0 listOf(node1, node2, node3).forEach { it.net.addMessageHandler { msg, registration -> counter++ } } - node1.net.send(node2.net.createMessage("test.topic", bits), network.messagingNetwork.everyoneOnline) + node1.net.send(node2.net.createMessage("test.topic", DEFAULT_SESSION_ID, bits), network.messagingNetwork.everyoneOnline) network.runNetwork(rounds = 1) assertEquals(3, counter) } @@ -97,8 +98,8 @@ class InMemoryMessagingTests { received++ } - val invalidMessage = node2.net.createMessage("invalid_message", ByteArray(0)) - val validMessage = node2.net.createMessage("valid_message", ByteArray(0)) + val invalidMessage = node2.net.createMessage("invalid_message", DEFAULT_SESSION_ID, ByteArray(0)) + val validMessage = node2.net.createMessage("valid_message", DEFAULT_SESSION_ID, ByteArray(0)) node2.net.send(invalidMessage, node1.net.myAddress) network.runNetwork() assertEquals(0, received) diff --git a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingServiceTests.kt index e2cd96e023..a068ce61f9 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingServiceTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingServiceTests.kt @@ -1,10 +1,10 @@ package com.r3corda.node.services import com.r3corda.core.messaging.Message +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.testing.freeLocalHostAndPort import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingService -import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.After import org.junit.Rule @@ -14,6 +14,9 @@ import java.net.ServerSocket import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.SECONDS +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull class ArtemisMessagingServiceTests { @@ -48,11 +51,13 @@ class ArtemisMessagingServiceTests { receivedMessages.add(message) } - val message = messagingNetwork.createMessage(topic, "first msg".toByteArray()) + val message = messagingNetwork.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray()) messagingNetwork.send(message, messagingNetwork.myAddress) - assertThat(String(receivedMessages.poll(2, SECONDS).data)).isEqualTo("first msg") - assertThat(receivedMessages.poll(200, MILLISECONDS)).isNull() + val actual = receivedMessages.poll(2, SECONDS) + assertNotNull(actual) + assertEquals("first msg", String(actual.data)) + assertNull(receivedMessages.poll(200, MILLISECONDS)) } private fun createMessagingService(): ArtemisMessagingService { diff --git a/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt index 4a2a24fb28..cc8968b67f 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable import com.r3corda.contracts.asset.Cash import com.r3corda.core.contracts.* import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue import com.r3corda.core.testing.DUMMY_NOTARY @@ -39,7 +40,7 @@ class DataVendingServiceTests { override fun call(): Boolean { val sessionID = random63BitValue() val req = DataVending.Service.NotifyTxRequestMessage(tx, serviceHub.storageService.myLegalIdentity, sessionID) - return sendAndReceive(server.identity, 0, sessionID, req).validate { it.accepted } + return sendAndReceive(server.identity, DEFAULT_SESSION_ID, sessionID, req).validate { it.accepted } } } diff --git a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt index ca2c357cfe..e3db2518b8 100644 --- a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt @@ -13,6 +13,7 @@ import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.days import com.r3corda.core.logElapsedTime import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.ServiceType import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue @@ -217,7 +218,7 @@ private fun runBuyer(node: Node, amount: Amount>) { // next stage in our building site, we will just auto-generate fake trades to give our nodes something to do. // // As the seller initiates the two-party trade protocol, here, we will be the buyer. - node.services.networkService.addMessageHandler("$DEMO_TOPIC.0") { message, registration -> + node.services.networkService.addMessageHandler(DEMO_TOPIC, DEFAULT_SESSION_ID) { message, registration -> // We use a simple scenario-specific wrapper protocol to make things happen. val otherSide = message.data.deserialize() val buyer = TraderDemoProtocolBuyer(otherSide, attachmentsPath, amount) diff --git a/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt b/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt index 6b13d2b533..047e9a10da 100644 --- a/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt +++ b/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt @@ -7,6 +7,7 @@ import com.r3corda.core.contracts.DealState import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.crypto.Party import com.r3corda.core.node.CordaPluginRegistry +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.deserialize @@ -53,7 +54,7 @@ object AutoOfferProtocol { } init { - services.networkService.addMessageHandler("$TOPIC.0") { msg, registration -> + services.networkService.addMessageHandler(TOPIC, DEFAULT_SESSION_ID) { msg, registration -> val progressTracker = tracker() progressTracker.currentStep = RECEIVED val autoOfferMessage = msg.data.deserialize() diff --git a/src/main/kotlin/com/r3corda/demos/protocols/ExitServerProtocol.kt b/src/main/kotlin/com/r3corda/demos/protocols/ExitServerProtocol.kt index a1cdc9ea11..d28b9a2910 100644 --- a/src/main/kotlin/com/r3corda/demos/protocols/ExitServerProtocol.kt +++ b/src/main/kotlin/com/r3corda/demos/protocols/ExitServerProtocol.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand import com.r3corda.core.node.CordaPluginRegistry import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.serialization.deserialize import com.r3corda.node.services.api.ServiceHubInternal @@ -27,7 +28,7 @@ object ExitServerProtocol { class Service(services: ServiceHubInternal) { init { - services.networkService.addMessageHandler("$TOPIC.0") { msg, registration -> + services.networkService.addMessageHandler(TOPIC, DEFAULT_SESSION_ID) { msg, registration -> // Just to validate we got the message if (enabled) { val message = msg.data.deserialize() diff --git a/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt b/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt index 6692b946c4..4edad7436e 100644 --- a/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt +++ b/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt @@ -3,6 +3,7 @@ package com.r3corda.demos.protocols import co.paralleluniverse.fibers.Suspendable import com.r3corda.core.node.CordaPluginRegistry import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.serialization.deserialize import com.r3corda.core.utilities.ProgressTracker @@ -28,7 +29,7 @@ object UpdateBusinessDayProtocol { class Service(services: ServiceHubInternal) { init { - services.networkService.addMessageHandler("${TOPIC}.0") { msg, registration -> + services.networkService.addMessageHandler(TOPIC, DEFAULT_SESSION_ID) { msg, registration -> val updateBusinessDayMessage = msg.data.deserialize() (services.clock as DemoClock).updateDate(updateBusinessDayMessage.date) }