Break down topic into component parts

Break down what is referred to as "topic" of a message into its component parts. This splits the
general topic from the session ID, so it's clear where a session ID is provided, and whether any
given topic string includes a session ID or not.
This commit is contained in:
Ross Nicoll 2016-07-29 10:36:21 +01:00
parent 7d39a101d4
commit cf4bb0c9af
20 changed files with 198 additions and 87 deletions

View File

@ -1,6 +1,7 @@
package com.r3corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.serialization.DeserializeAsKotlinObjectDef
import com.r3corda.core.serialization.serialize
import java.time.Instant
@ -22,7 +23,7 @@ import javax.annotation.concurrent.ThreadSafe
interface MessagingService {
/**
* The provided function will be invoked for each received message whose topic matches the given string, on the given
* executor. The topic can be the empty string to match all messages.
* executor.
*
* If no executor is received then the callback will run on threads provided by the messaging service, and the
* callback is expected to be thread safe as a result.
@ -30,8 +31,28 @@ interface MessagingService {
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
* itself and yet addMessageHandler hasn't returned the handle yet.
*
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
* @param sessionID identifier for the session the message is part of. For services listening before
* a session is established, use [DEFAULT_SESSION_ID].
*/
fun addMessageHandler(topic: String = "", executor: Executor? = null, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, executor: Executor? = null, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
/**
* The provided function will be invoked for each received message whose topic and session matches, on the
* given executor.
*
* If no executor is received then the callback will run on threads provided by the messaging service, and the
* callback is expected to be thread safe as a result.
*
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
* itself and yet addMessageHandler hasn't returned the handle yet.
*
* @param topicSession identifier for the topic and session to listen for messages arriving on.
*/
fun addMessageHandler(topicSession: TopicSession, executor: Executor? = null, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
/**
* Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once
@ -55,34 +76,81 @@ interface MessagingService {
/**
* Returns an initialised [Message] with the current time, etc, already filled in.
*
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* Must not be blank.
* @param sessionID identifier for the session the message is part of. For messages sent to services before the
* construction of a session, use [DEFAULT_SESSION_ID].
*/
fun createMessage(topic: String, data: ByteArray): Message
fun createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message
/**
* Returns an initialised [Message] with the current time, etc, already filled in.
*
* @param topicSession identifier for the topic and session the message is sent to.
*/
fun createMessage(topicSession: TopicSession, data: ByteArray): Message
/** Returns an address that refers to this node. */
val myAddress: SingleMessageRecipient
}
/**
* Registers a handler for the given topic that runs the given callback with the message and then removes itself. This
* is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback doesn't
* take the registration object, unlike the callback to [MessagingService.addMessageHandler].
* Registers a handler for the given topic and session ID that runs the given callback with the message and then removes
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
* doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler], as the handler is
* automatically deregistered before the callback runs.
*
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
* @param sessionID identifier for the session the message is part of. For services listening before
* a session is established, use [DEFAULT_SESSION_ID].
*/
fun MessagingService.runOnNextMessage(topic: String = "", executor: Executor? = null, callback: (Message) -> Unit) {
fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, executor: Executor? = null, callback: (Message) -> Unit)
= runOnNextMessage(TopicSession(topic, sessionID), executor, callback)
/**
* Registers a handler for the given topic and session that runs the given callback with the message and then removes
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
* doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler].
*
* @param topicSession identifier for the topic and session to listen for messages arriving on.
*/
fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Executor? = null, callback: (Message) -> Unit) {
val consumed = AtomicBoolean()
addMessageHandler(topic, executor) { msg, reg ->
addMessageHandler(topicSession, executor) { msg, reg ->
removeMessageHandler(reg)
check(!consumed.getAndSet(true)) { "Called more than once" }
check(msg.topic == topic) { "Topic mismatch: ${msg.topic} vs $topic" }
check(msg.topicSession == topicSession) { "Topic/session mismatch: ${msg.topicSession} vs $topicSession" }
callback(msg)
}
}
fun MessagingService.send(topic: String, payload: Any, to: MessageRecipients) {
send(createMessage(topic, payload.serialize().bits), to)
}
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients)
= send(TopicSession(topic, sessionID), payload, to)
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients)
= send(createMessage(topicSession, payload.serialize().bits), to)
interface MessageHandlerRegistration
/**
* An identifier for the endpoint [MessagingService] message handlers listen at.
*
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
* @param sessionID identifier for the session the message is part of. For services listening before
* a session is established, use [DEFAULT_SESSION_ID].
*/
data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) {
companion object {
val Blank = TopicSession("", DEFAULT_SESSION_ID)
}
fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID
override fun toString(): String = "${topic}.${sessionID}"
}
/**
* A message is defined, at this level, to be a (topic, timestamp, byte arrays) triple, where the topic is a string in
* Java-style reverse dns form, with "platform." being a prefix reserved by the platform for its own use. Vendor
@ -94,7 +162,7 @@ interface MessageHandlerRegistration
* the timestamp field they probably will be, even if an implementation just uses a hash prefix as the message id.
*/
interface Message {
val topic: String
val topicSession: TopicSession
val data: ByteArray
val debugTimestamp: Instant
val debugMessageID: String

View File

@ -10,9 +10,10 @@ import java.security.PrivateKey
import java.security.PublicKey
/**
* Postfix for base topics when sending a request to a service.
* Session ID to use for services listening for the first message in a session (before a
* specific session ID has been established).
*/
val TOPIC_DEFAULT_POSTFIX = ".0"
val DEFAULT_SESSION_ID = 0L
/**
* This file defines various 'services' which are not currently fleshed out. A service is a module that provides

View File

@ -7,6 +7,7 @@ import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.seconds
@ -157,7 +158,7 @@ object TwoPartyDealProtocol {
// Copy the transaction to every regulator in the network. This is obviously completely bogus, it's
// just for demo purposes.
for (regulator in regulators) {
send(regulator.identity, 0, fullySigned)
send(regulator.identity, DEFAULT_SESSION_ID, fullySigned)
}
}
@ -461,7 +462,7 @@ object TwoPartyDealProtocol {
val initation = FixingSessionInitiation(sessionID, sortedParties[0], serviceHub.storageService.myLegalIdentity, timeout)
// Send initiation to other side to launch one side of the fixing protocol (the Fixer).
send(sortedParties[1], 0, initation)
send(sortedParties[1], DEFAULT_SESSION_ID, initation)
// Then start the other side of the fixing protocol.
val protocol = Floater(ref, sessionID)

View File

@ -273,11 +273,10 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
val reg = NodeRegistration(info, networkMapSeq++, type, expires)
val sessionID = random63BitValue()
val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID)
val message = net.createMessage("$REGISTER_PROTOCOL_TOPIC.0", request.serialize().bits)
val message = net.createMessage(REGISTER_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, request.serialize().bits)
val future = SettableFuture.create<NetworkMapService.RegistrationResponse>()
val topic = "$REGISTER_PROTOCOL_TOPIC.$sessionID"
net.runOnNextMessage(topic, RunOnCallerThread) { message ->
net.runOnNextMessage(REGISTER_PROTOCOL_TOPIC, sessionID, RunOnCallerThread) { message ->
future.set(message.data.deserialize())
}
net.send(message, serviceInfo.address)

View File

@ -2,8 +2,9 @@ package com.r3corda.node.services.api
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.TopicSession
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
@ -29,13 +30,13 @@ abstract class AbstractNodeService(val net: MessagingService, val networkMapCach
addMessageHandler(topic: String,
crossinline handler: (Q) -> R,
crossinline exceptionConsumer: (Message, Exception) -> Unit) {
net.addMessageHandler(topic + TOPIC_DEFAULT_POSTFIX, null) { message, r ->
net.addMessageHandler(topic, DEFAULT_SESSION_ID, null) { message, r ->
try {
val request = message.data.deserialize<Q>()
val response = handler(request)
// If the return type R is Unit, then do not send a response
if (response.javaClass != Unit.javaClass) {
val msg = net.createMessage("$topic.${request.sessionID}", response.serialize().bits)
val msg = net.createMessage(topic, request.sessionID, response.serialize().bits)
net.send(msg, request.getReplyTo(networkMapCache))
}
} catch(e: Exception) {

View File

@ -1,6 +1,7 @@
package com.r3corda.node.services.clientapi
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.serialization.deserialize
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.services.api.ServiceHubInternal
@ -19,7 +20,7 @@ object FixingSessionInitiation {
class Service(services: ServiceHubInternal) {
init {
services.networkService.addMessageHandler("${TwoPartyDealProtocol.FIX_INITIATE_TOPIC}.0") { msg, registration ->
services.networkService.addMessageHandler(TwoPartyDealProtocol.FIX_INITIATE_TOPIC, DEFAULT_SESSION_ID) { msg, registration ->
val initiation = msg.data.deserialize<TwoPartyDealProtocol.FixingSessionInitiation>()
val protocol = TwoPartyDealProtocol.Fixer(initiation)
services.startProtocol("fixings", protocol)

View File

@ -81,6 +81,8 @@ class ArtemisMessagingService(val directory: Path,
// confusion.
val TOPIC_PROPERTY = "platform-topic"
val SESSION_ID_PROPERTY = "session-id"
/** Temp helper until network map is established. */
fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort)
fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname))
@ -101,7 +103,7 @@ class ArtemisMessagingService(val directory: Path,
/** A registration to handle messages of different types */
inner class Handler(val executor: Executor?,
val topic: String,
val topicSession: TopicSession,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
private val handlers = CopyOnWriteArrayList<Handler>()
@ -180,12 +182,17 @@ class ArtemisMessagingService(val directory: Path,
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
return@setMessageHandler
}
if (!message.containsProperty(SESSION_ID_PROPERTY)) {
log.warn("Received message without a $SESSION_ID_PROPERTY property, ignoring")
return@setMessageHandler
}
val topic = message.getStringProperty(TOPIC_PROPERTY)
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
val msg = object : Message {
override val topic = topic
override val topicSession = TopicSession(topic, sessionID)
override val data: ByteArray = body
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
override val debugMessageID: String = message.messageID.toString()
@ -208,12 +215,12 @@ class ArtemisMessagingService(val directory: Path,
private fun deliverMessage(msg: Message): Boolean {
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything.
val deliverTo = handlers.filter { it.topic.isBlank() || it.topic == msg.topic }
val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
if (deliverTo.isEmpty()) {
// This should probably be downgraded to a trace in future, so the protocol can evolve with new topics
// without causing log spam.
log.warn("Received message for ${msg.topic} that doesn't have any registered handlers yet")
log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet")
// This is a hack; transient messages held in memory isn't crash resistant.
// TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use.
@ -227,7 +234,7 @@ class ArtemisMessagingService(val directory: Path,
try {
handler.callback(msg, handler)
} catch(e: Exception) {
log.error("Caught exception whilst executing message handler for ${msg.topic}", e)
log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
}
}
}
@ -259,13 +266,24 @@ class ArtemisMessagingService(val directory: Path,
override fun send(message: Message, target: MessageRecipients) {
if (target !is Address)
TODO("Only simple sends to single recipients are currently implemented")
val artemisMessage = session!!.createMessage(true).putStringProperty("platform-topic", message.topic).writeBodyBufferBytes(message.data)
val artemisMessage = session!!.createMessage(true).apply {
val sessionID = message.topicSession.sessionID
putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
putLongProperty(SESSION_ID_PROPERTY, sessionID)
writeBodyBufferBytes(message.data)
}
getSendClient(target).send(artemisMessage)
}
override fun addMessageHandler(topic: String, executor: Executor?,
override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?,
callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
= addMessageHandler(TopicSession(topic, sessionID), executor, callback)
override fun addMessageHandler(topicSession: TopicSession,
executor: Executor?,
callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
val handler = Handler(executor, topic, callback)
require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." }
val handler = Handler(executor, topicSession, callback)
handlers.add(handler)
undeliveredMessages.removeIf { deliverMessage(it) }
return handler
@ -275,18 +293,21 @@ class ArtemisMessagingService(val directory: Path,
handlers.remove(registration)
}
override fun createMessage(topic: String, data: ByteArray): Message {
override fun createMessage(topicSession: TopicSession, data: ByteArray): Message {
// TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying.
return object : Message {
override val topic: String get() = topic
override val topicSession: TopicSession get() = topicSession
override val data: ByteArray get() = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val debugMessageID: String get() = Instant.now().toEpochMilli().toString()
override fun toString() = topic + "#" + String(data)
override fun toString() = topicSession.toString() + "#" + String(data)
}
}
override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message
= createMessage(TopicSession(topic, sessionID), data)
override val myAddress: SingleMessageRecipient = Address(myHostPort)
private enum class ConnectionDirection { INBOUND, OUTBOUND }

View File

@ -42,7 +42,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
private val handleEndpointMap = HashMap<Handle, InMemoryMessaging>()
data class MessageTransfer(val sender: InMemoryMessaging, val message: Message, val recipients: MessageRecipients) {
override fun toString() = "${message.topic} from '${sender.myAddress}' to '$recipients'"
override fun toString() = "${message.topicSession} from '${sender.myAddress}' to '$recipients'"
}
// All sent messages are kept here until pumpSend is called, or manuallyPumped is set to false
@ -197,8 +197,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
*/
@ThreadSafe
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingServiceInternal {
inner class Handler(val executor: Executor?, val topic: String,
inner class Handler(val executor: Executor?, val topicSession: TopicSession,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@Volatile
@ -226,10 +225,13 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
override fun registerTrustedAddress(address: SingleMessageRecipient) {}
override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
= addMessageHandler(TopicSession(topic, sessionID), executor, callback)
override fun addMessageHandler(topicSession: TopicSession, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
check(running)
val (handler, items) = state.locked {
val handler = Handler(executor, topic, callback).apply { handlers.add(this) }
val handler = Handler(executor, topicSession, callback).apply { handlers.add(this) }
val items = ArrayList(pendingRedelivery)
pendingRedelivery.clear()
Pair(handler, items)
@ -262,16 +264,20 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
netNodeHasShutdown(handle)
}
/** Returns the given (topic, data) pair as a newly created message object.*/
override fun createMessage(topic: String, data: ByteArray): Message {
/** Returns the given (topic & session, data) pair as a newly created message object. */
override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message
= createMessage(TopicSession(topic, sessionID), data)
/** Returns the given (topic & session, data) pair as a newly created message object. */
override fun createMessage(topicSession: TopicSession, data: ByteArray): Message {
return object : Message {
override val topic: String get() = topic
override val topicSession: TopicSession get() = topicSession
override val data: ByteArray get() = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val debugMessageID: String get() = serialise().sha256().prefixChars()
override fun toString() = topic + "#" + String(data)
override fun toString() = topicSession.toString() + "#" + String(data)
}
}
@ -300,7 +306,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
while (deliverTo == null) {
val transfer = (if (block) q.take() else q.poll()) ?: return null
deliverTo = state.locked {
val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic }
val h = handlers.filter { if (it.topicSession.isBlank()) true else transfer.message.topicSession == it.topicSession }
if (h.isEmpty()) {
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
@ -308,6 +314,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
// least sometimes.
log.warn("Message to ${transfer.message.topicSession} could not be delivered")
pendingRedelivery.add(transfer)
null
} else {
@ -335,7 +342,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
try {
handler.callback(transfer.message, handler)
} catch(e: Exception) {
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topic}", e)
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topicSession}", e)
}
}
}

View File

@ -10,10 +10,10 @@ import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.messaging.send
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkCacheError
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize
@ -57,11 +57,11 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) :
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC + ".0", null) { message, r ->
net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, null) { message, r ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val hash = SecureHash.sha256(req.wireReg.serialize().bits)
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX,
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, DEFAULT_SESSION_ID,
NetworkMapService.UpdateAcknowledge(hash, net.myAddress).serialize().bits)
net.send(ackMessage, req.replyTo)
processUpdatePush(req)
@ -81,13 +81,13 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) :
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = SettableFuture.create<Unit>()
net.runOnNextMessage("${NetworkMapService.FETCH_PROTOCOL_TOPIC}.$sessionID", MoreExecutors.directExecutor()) { message ->
net.runOnNextMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, sessionID, MoreExecutors.directExecutor()) { message ->
val resp = message.data.deserialize<NetworkMapService.FetchMapResponse>()
// We may not receive any nodes back, if the map hasn't changed since the version specified
resp.nodes?.forEach { processRegistration(it) }
future.set(Unit)
}
net.send("${NetworkMapService.FETCH_PROTOCOL_TOPIC}.0", req, service.address)
net.send(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, service.address)
return future
}
@ -114,7 +114,7 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) :
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = SettableFuture.create<Unit>()
net.runOnNextMessage("${NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC}.$sessionID", MoreExecutors.directExecutor()) { message ->
net.runOnNextMessage(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, sessionID, MoreExecutors.directExecutor()) { message ->
val resp = message.data.deserialize<NetworkMapService.SubscribeResponse>()
if (resp.confirmed) {
future.set(Unit)
@ -122,7 +122,7 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) :
future.setException(NetworkCacheError.DeregistrationFailed())
}
}
net.send("${NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC}.0", req, service.address)
net.send(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, service.address)
return future
}

View File

@ -7,9 +7,9 @@ import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
@ -114,7 +114,7 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v
addMessageHandler(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC,
{ req: NetworkMapService.SubscribeRequest -> processSubscriptionRequest(req) }
)
net.addMessageHandler(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX, null) { message, r ->
net.addMessageHandler(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, null) { message, r ->
val req = message.data.deserialize<NetworkMapService.UpdateAcknowledge>()
processAcknowledge(req)
}
@ -144,8 +144,7 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v
// to a MessageRecipientGroup that nodes join/leave, rather than the network map
// service itself managing the group
val update = NetworkMapService.Update(wireReg, net.myAddress).serialize().bits
val topic = NetworkMapService.PUSH_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX
val message = net.createMessage(topic, update)
val message = net.createMessage(NetworkMapService.PUSH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, update)
subscribers.locked {
val toRemove = mutableListOf<SingleMessageRecipient>()

View File

@ -75,11 +75,11 @@ object DataVending {
.success {
services.recordTransactions(req.tx)
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure {
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}
}

View File

@ -1,6 +1,7 @@
package com.r3corda.node.services.statemachine
import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.TopicSession
// TODO: Clean this up
sealed class FiberRequest(val topic: String,
@ -13,8 +14,8 @@ sealed class FiberRequest(val topic: String,
@Transient
val stackTraceInCaseOfProblems: StackSnapshot? = StackSnapshot()
val receiveTopic: String
get() = topic + "." + sessionIDForReceive
val receiveTopicSession: TopicSession
get() = TopicSession(topic, sessionIDForReceive)
override fun equals(other: Any?): Boolean
@ -66,7 +67,7 @@ sealed class FiberRequest(val topic: String,
false
override fun toString(): String {
return "Expecting response via topic ${receiveTopic} of type ${responseTypeName}"
return "Expecting response via topic ${receiveTopicSession} of type ${responseTypeName}"
}
// We have to do an unchecked cast, but unless the serialized form is damaged, this was

View File

@ -8,6 +8,7 @@ import com.esotericsoftware.kryo.Kryo
import com.google.common.base.Throwables
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.abbreviate
import com.r3corda.core.messaging.TopicSession
import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.messaging.send
import com.r3corda.core.protocols.ProtocolLogic
@ -124,7 +125,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
when (checkpoint.request) {
is FiberRequest.ExpectingResponse<*> -> {
val topic = checkpoint.request.receiveTopic
val topic = checkpoint.request.receiveTopicSession
val awaitingPayloadType = checkpoint.request.responseType
fiber.logger.info("Restored ${fiber.logic} - it was previously waiting for message of type ${awaitingPayloadType.name} on topic $topic")
iterateOnResponse(fiber, awaitingPayloadType, checkpoint.serialisedFiber, checkpoint.request) {
@ -179,9 +180,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
return createKryo(serializer.kryo)
}
private fun logError(e: Throwable, payload: Any?, topic: String?, psm: ProtocolStateMachineImpl<*>) {
private fun logError(e: Throwable, payload: Any?, topicSession: TopicSession?, psm: ProtocolStateMachineImpl<*>) {
psm.logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " +
"when handling a message of type ${payload?.javaClass?.name} on topic $topic")
"when handling a message of type ${payload?.javaClass?.name} on queue $topicSession")
if (psm.logger.isTraceEnabled) {
val s = StringWriter()
Throwables.getRootCause(e).printStackTrace(PrintWriter(s))
@ -265,12 +266,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
}
}
// If a non-null payload to send was provided, send it now.
val queueID = TopicSession(request.topic, request.sessionIDForSend)
request.payload?.let {
val topic = "${request.topic}.${request.sessionIDForSend}"
psm.logger.trace { "Sending message of type ${it.javaClass.name} using topic $topic to ${request.destination} (${it.toString().abbreviate(50)})" }
psm.logger.trace { "Sending message of type ${it.javaClass.name} using queue $queueID to ${request.destination} (${it.toString().abbreviate(50)})" }
val node = serviceHub.networkMapCache.getNodeByLegalName(request.destination!!.name)
requireNotNull(node) { "Don't know about ${request.destination}" }
serviceHub.networkService.send(topic, it, node!!.address)
serviceHub.networkService.send(queueID, it, node!!.address)
}
if (request is FiberRequest.NotExpectingResponse) {
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
@ -278,7 +279,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
try {
Fiber.unpark(psm, QUASAR_UNBLOCKER)
} catch(e: Throwable) {
logError(e, request.payload, request.topic, psm)
logError(e, request.payload, queueID, psm)
}
}
}
@ -286,15 +287,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
private fun checkpointOnExpectingResponse(psm: ProtocolStateMachineImpl<*>, request: FiberRequest.ExpectingResponse<*>) {
executor.checkOnThread()
val topic = "${request.topic}.${request.sessionIDForReceive}"
val queueID = request.receiveTopicSession
val serialisedFiber = serializeFiber(psm)
updateCheckpoint(psm, serialisedFiber, request)
psm.logger.trace { "Preparing to receive message of type ${request.responseType.name} on topic $topic" }
psm.logger.trace { "Preparing to receive message of type ${request.responseType.name} on queue $queueID" }
iterateOnResponse(psm, request.responseType, serialisedFiber, request) {
try {
Fiber.unpark(psm, QUASAR_UNBLOCKER)
} catch(e: Throwable) {
logError(e, it, topic, psm)
logError(e, it, queueID, psm)
}
}
}
@ -308,7 +309,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
request: FiberRequest.ExpectingResponse<*>,
resumeAction: (Any?) -> Unit) {
val topic = request.receiveTopic
val topic = request.receiveTopicSession
serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg ->
// Assertion to ensure we don't execute on the wrong thread.
executor.checkOnThread()
@ -322,7 +323,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
check(responseType.isInstance(payload)) { "Expected message of type ${responseType.name} but got ${payload.javaClass.name}" }
// Update the fiber's checkpoint so that it's no longer waiting on a response, but rather has the received payload
updateCheckpoint(psm, serialisedFiber, request)
psm.logger.trace { "Received message of type ${payload.javaClass.name} on topic $topic (${payload.toString().abbreviate(50)})" }
psm.logger.trace { "Received message of type ${payload.javaClass.name} on topic ${request.topic}.${request.sessionIDForReceive} (${payload.toString().abbreviate(50)})" }
iterateStateMachine(psm, payload, resumeAction)
}
}

View File

@ -4,6 +4,7 @@ package com.r3corda.node.messaging
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.TopicStringValidator
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.node.internal.testing.MockNetwork
import org.junit.Before
import org.junit.Test
@ -61,7 +62,7 @@ class InMemoryMessagingTests {
}
// Node 1 sends a message and it should end up in finalDelivery, after we run the network
node1.net.send(node1.net.createMessage("test.topic", bits), node2.info.address)
node1.net.send(node1.net.createMessage("test.topic", DEFAULT_SESSION_ID, bits), node2.info.address)
network.runNetwork(rounds = 1)
@ -78,7 +79,7 @@ class InMemoryMessagingTests {
var counter = 0
listOf(node1, node2, node3).forEach { it.net.addMessageHandler { msg, registration -> counter++ } }
node1.net.send(node2.net.createMessage("test.topic", bits), network.messagingNetwork.everyoneOnline)
node1.net.send(node2.net.createMessage("test.topic", DEFAULT_SESSION_ID, bits), network.messagingNetwork.everyoneOnline)
network.runNetwork(rounds = 1)
assertEquals(3, counter)
}
@ -97,8 +98,8 @@ class InMemoryMessagingTests {
received++
}
val invalidMessage = node2.net.createMessage("invalid_message", ByteArray(0))
val validMessage = node2.net.createMessage("valid_message", ByteArray(0))
val invalidMessage = node2.net.createMessage("invalid_message", DEFAULT_SESSION_ID, ByteArray(0))
val validMessage = node2.net.createMessage("valid_message", DEFAULT_SESSION_ID, ByteArray(0))
node2.net.send(invalidMessage, node1.net.myAddress)
network.runNetwork()
assertEquals(0, received)

View File

@ -1,10 +1,10 @@
package com.r3corda.node.services
import com.r3corda.core.messaging.Message
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.testing.freeLocalHostAndPort
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingService
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Rule
@ -14,6 +14,9 @@ import java.net.ServerSocket
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
class ArtemisMessagingServiceTests {
@ -48,11 +51,13 @@ class ArtemisMessagingServiceTests {
receivedMessages.add(message)
}
val message = messagingNetwork.createMessage(topic, "first msg".toByteArray())
val message = messagingNetwork.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
messagingNetwork.send(message, messagingNetwork.myAddress)
assertThat(String(receivedMessages.poll(2, SECONDS).data)).isEqualTo("first msg")
assertThat(receivedMessages.poll(200, MILLISECONDS)).isNull()
val actual = receivedMessages.poll(2, SECONDS)
assertNotNull(actual)
assertEquals("first msg", String(actual.data))
assertNull(receivedMessages.poll(200, MILLISECONDS))
}
private fun createMessagingService(): ArtemisMessagingService {

View File

@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import com.r3corda.contracts.asset.Cash
import com.r3corda.core.contracts.*
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.testing.DUMMY_NOTARY
@ -39,7 +40,7 @@ class DataVendingServiceTests {
override fun call(): Boolean {
val sessionID = random63BitValue()
val req = DataVending.Service.NotifyTxRequestMessage(tx, serviceHub.storageService.myLegalIdentity, sessionID)
return sendAndReceive<DataVending.Service.NotifyTxResponseMessage>(server.identity, 0, sessionID, req).validate { it.accepted }
return sendAndReceive<DataVending.Service.NotifyTxResponseMessage>(server.identity, DEFAULT_SESSION_ID, sessionID, req).validate { it.accepted }
}
}

View File

@ -13,6 +13,7 @@ import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.days
import com.r3corda.core.logElapsedTime
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
@ -217,7 +218,7 @@ private fun runBuyer(node: Node, amount: Amount<Issued<Currency>>) {
// next stage in our building site, we will just auto-generate fake trades to give our nodes something to do.
//
// As the seller initiates the two-party trade protocol, here, we will be the buyer.
node.services.networkService.addMessageHandler("$DEMO_TOPIC.0") { message, registration ->
node.services.networkService.addMessageHandler(DEMO_TOPIC, DEFAULT_SESSION_ID) { message, registration ->
// We use a simple scenario-specific wrapper protocol to make things happen.
val otherSide = message.data.deserialize<Party>()
val buyer = TraderDemoProtocolBuyer(otherSide, attachmentsPath, amount)

View File

@ -7,6 +7,7 @@ import com.r3corda.core.contracts.DealState
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize
@ -53,7 +54,7 @@ object AutoOfferProtocol {
}
init {
services.networkService.addMessageHandler("$TOPIC.0") { msg, registration ->
services.networkService.addMessageHandler(TOPIC, DEFAULT_SESSION_ID) { msg, registration ->
val progressTracker = tracker()
progressTracker.currentStep = RECEIVED
val autoOfferMessage = msg.data.deserialize<AutoOfferMessage>()

View File

@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.serialization.deserialize
import com.r3corda.node.services.api.ServiceHubInternal
@ -27,7 +28,7 @@ object ExitServerProtocol {
class Service(services: ServiceHubInternal) {
init {
services.networkService.addMessageHandler("$TOPIC.0") { msg, registration ->
services.networkService.addMessageHandler(TOPIC, DEFAULT_SESSION_ID) { msg, registration ->
// Just to validate we got the message
if (enabled) {
val message = msg.data.deserialize<ExitMessage>()

View File

@ -3,6 +3,7 @@ package com.r3corda.demos.protocols
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.utilities.ProgressTracker
@ -28,7 +29,7 @@ object UpdateBusinessDayProtocol {
class Service(services: ServiceHubInternal) {
init {
services.networkService.addMessageHandler("${TOPIC}.0") { msg, registration ->
services.networkService.addMessageHandler(TOPIC, DEFAULT_SESSION_ID) { msg, registration ->
val updateBusinessDayMessage = msg.data.deserialize<UpdateBusinessDayMessage>()
(services.clock as DemoClock).updateDate(updateBusinessDayMessage.date)
}