diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index df2cc5f069..7c96c484e4 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -14,7 +14,6 @@ import net.corda.core.serialization.serialize import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.node.internal.Node -import net.corda.node.services.api.DEFAULT_SESSION_ID import net.corda.node.services.messaging.* import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.services.transactions.SimpleNotaryService @@ -167,7 +166,7 @@ class P2PMessagingTest : NodeBasedTest() { distributedServiceNodes.forEach { val nodeName = it.info.legalIdentity.name var ignoreRequests = false - it.network.addMessageHandler(dummyTopic, DEFAULT_SESSION_ID) { netMessage, _ -> + it.network.addMessageHandler(dummyTopic) { netMessage, _ -> requestsReceived.incrementAndGet() firstRequestReceived.countDown() // The node which receives the first request will ignore all requests @@ -210,7 +209,7 @@ class P2PMessagingTest : NodeBasedTest() { } private fun Node.respondWith(message: Any) { - network.addMessageHandler(javaClass.name, DEFAULT_SESSION_ID) { netMessage, _ -> + network.addMessageHandler(javaClass.name) { netMessage, _ -> val request = netMessage.data.deserialize() val response = network.createMessage(javaClass.name, request.sessionID, message.serialize().bytes) network.send(response, request.replyTo) diff --git a/node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt b/node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt index 73acab6e44..f14f0af01a 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt @@ -27,7 +27,7 @@ abstract class AbstractNodeService(val services: ServiceHubInternal) : Singleton addMessageHandler(topic: String, crossinline handler: (Q) -> R, crossinline exceptionConsumer: (Message, Exception) -> Unit): MessageHandlerRegistration { - return network.addMessageHandler(topic, DEFAULT_SESSION_ID) { message, _ -> + return network.addMessageHandler(topic, MessagingService.DEFAULT_SESSION_ID) { message, _ -> try { val request = message.data.deserialize() val response = handler(request) diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index a3989324a0..69e010892b 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -24,12 +24,6 @@ import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.utilities.CordaPersistence -/** - * Session ID to use for services listening for the first message in a session (before a - * specific session ID has been established). - */ -val DEFAULT_SESSION_ID = 0L - interface NetworkMapCacheInternal : NetworkMapCache { /** * Deregister from updates from the given map service. diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt index 3e65c572f0..4471cb24b2 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt @@ -9,7 +9,6 @@ import net.corda.core.node.services.PartyInfo import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize -import net.corda.node.services.api.DEFAULT_SESSION_ID import org.bouncycastle.asn1.x500.X500Name import java.time.Instant import java.util.* @@ -28,6 +27,14 @@ import javax.annotation.concurrent.ThreadSafe */ @ThreadSafe interface MessagingService { + companion object { + /** + * Session ID to use for services listening for the first message in a session (before a + * specific session ID has been established). + */ + val DEFAULT_SESSION_ID = 0L + } + /** * The provided function will be invoked for each received message whose topic matches the given string. The callback * will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result. @@ -112,7 +119,7 @@ interface MessagingService { * @param sessionID identifier for the session the message is part of. For messages sent to services before the * construction of a session, use [DEFAULT_SESSION_ID]. */ -fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message +fun MessagingService.createMessage(topic: String, sessionID: Long = MessagingService.DEFAULT_SESSION_ID, data: ByteArray): Message = createMessage(TopicSession(topic, sessionID), data) /** @@ -191,8 +198,8 @@ interface MessageHandlerRegistration * a session is established, use [DEFAULT_SESSION_ID]. */ @CordaSerializable -data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) { - fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID +data class TopicSession(val topic: String, val sessionID: Long = MessagingService.DEFAULT_SESSION_ID) { + fun isBlank() = topic.isBlank() && sessionID == MessagingService.DEFAULT_SESSION_ID override fun toString(): String = "$topic.$sessionID" } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ServiceRequestMessage.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ServiceRequestMessage.kt index 7536fcc33c..15c68a3b66 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ServiceRequestMessage.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ServiceRequestMessage.kt @@ -4,7 +4,6 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.serialization.CordaSerializable -import net.corda.node.services.api.DEFAULT_SESSION_ID /** * Abstract superclass for request messages sent to services which expect a reply. @@ -23,6 +22,6 @@ fun MessagingService.sendRequest(topic: String, request: ServiceRequestMessage, target: MessageRecipients): CordaFuture { val responseFuture = onNext(topic, request.sessionID) - send(topic, DEFAULT_SESSION_ID, request, target) + send(topic, MessagingService.DEFAULT_SESSION_ID, request, target) return responseFuture } diff --git a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt index 3ae68413d7..774e93f499 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt @@ -1,10 +1,10 @@ package net.corda.node.services.network -import net.corda.core.internal.VisibleForTesting import net.corda.core.concurrent.CordaFuture -import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.DataFeed @@ -17,7 +17,6 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.loggerFor -import net.corda.node.services.api.DEFAULT_SESSION_ID import net.corda.node.services.api.NetworkCacheError import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.messaging.MessagingService @@ -99,11 +98,11 @@ open class InMemoryNetworkMapCache(private val serviceHub: ServiceHub?) : Single ifChangedSinceVer: Int?): CordaFuture { if (subscribe && !registeredForPush) { // Add handler to the network, for updates received from the remote network map service. - network.addMessageHandler(NetworkMapService.PUSH_TOPIC, DEFAULT_SESSION_ID) { message, _ -> + network.addMessageHandler(NetworkMapService.PUSH_TOPIC) { message, _ -> try { val req = message.data.deserialize() - val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, - NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes) + val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC, + data = NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes) network.send(ackMessage, req.replyTo) processUpdatePush(req) } catch(e: NodeMapError) { diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt index 0fed3ea12b..21b728e6ff 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt @@ -1,12 +1,12 @@ package net.corda.node.services.network -import net.corda.core.internal.VisibleForTesting -import net.corda.core.internal.ThreadBox import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.SignedData import net.corda.core.crypto.isFulfilledBy import net.corda.core.crypto.random63BitValue import net.corda.core.identity.PartyAndCertificate +import net.corda.core.internal.ThreadBox +import net.corda.core.internal.VisibleForTesting import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.NodeInfo @@ -20,9 +20,9 @@ import net.corda.core.serialization.serialize import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import net.corda.node.services.api.AbstractNodeService -import net.corda.node.services.api.DEFAULT_SESSION_ID import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.MessageHandlerRegistration +import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.ServiceRequestMessage import net.corda.node.services.messaging.createMessage import net.corda.node.services.network.NetworkMapService.* @@ -172,7 +172,7 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal, handlers += addMessageHandler(QUERY_TOPIC) { req: QueryIdentityRequest -> processQueryRequest(req) } handlers += addMessageHandler(REGISTER_TOPIC) { req: RegistrationRequest -> processRegistrationRequest(req) } handlers += addMessageHandler(SUBSCRIPTION_TOPIC) { req: SubscribeRequest -> processSubscriptionRequest(req) } - handlers += network.addMessageHandler(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID) { message, _ -> + handlers += network.addMessageHandler(PUSH_ACK_TOPIC) { message, _ -> val req = message.data.deserialize() processAcknowledge(req) } @@ -290,7 +290,7 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal, // to a MessageRecipientGroup that nodes join/leave, rather than the network map // service itself managing the group val update = NetworkMapService.Update(wireReg, newMapVersion, network.myAddress).serialize().bytes - val message = network.createMessage(PUSH_TOPIC, DEFAULT_SESSION_ID, update) + val message = network.createMessage(PUSH_TOPIC, data = update) subscribers.locked { // Remove any stale subscribers diff --git a/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt b/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt index 7dbce0a062..01f04ada97 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt @@ -1,14 +1,13 @@ package net.corda.node.messaging import net.corda.core.node.services.ServiceInfo -import net.corda.node.services.api.DEFAULT_SESSION_ID import net.corda.node.services.messaging.Message import net.corda.node.services.messaging.TopicStringValidator import net.corda.node.services.messaging.createMessage import net.corda.node.services.network.NetworkMapService import net.corda.testing.node.MockNetwork -import org.junit.After import net.corda.testing.resetTestSerialization +import org.junit.After import org.junit.Before import org.junit.Test import java.util.* @@ -70,7 +69,7 @@ class InMemoryMessagingTests { } // Node 1 sends a message and it should end up in finalDelivery, after we run the network - node1.network.send(node1.network.createMessage("test.topic", DEFAULT_SESSION_ID, bits), node2.network.myAddress) + node1.network.send(node1.network.createMessage("test.topic", data = bits), node2.network.myAddress) mockNet.runNetwork(rounds = 1) @@ -87,7 +86,7 @@ class InMemoryMessagingTests { var counter = 0 listOf(node1, node2, node3).forEach { it.network.addMessageHandler { _, _ -> counter++ } } - node1.network.send(node2.network.createMessage("test.topic", DEFAULT_SESSION_ID, bits), mockNet.messagingNetwork.everyoneOnline) + node1.network.send(node2.network.createMessage("test.topic", data = bits), mockNet.messagingNetwork.everyoneOnline) mockNet.runNetwork(rounds = 1) assertEquals(3, counter) } @@ -106,8 +105,8 @@ class InMemoryMessagingTests { received++ } - val invalidMessage = node2.network.createMessage("invalid_message", DEFAULT_SESSION_ID, ByteArray(0)) - val validMessage = node2.network.createMessage("valid_message", DEFAULT_SESSION_ID, ByteArray(0)) + val invalidMessage = node2.network.createMessage("invalid_message", data = ByteArray(0)) + val validMessage = node2.network.createMessage("valid_message", data = ByteArray(0)) node2.network.send(invalidMessage, node1.network.myAddress) mockNet.runNetwork() assertEquals(0, received) @@ -118,8 +117,8 @@ class InMemoryMessagingTests { // Here's the core of the test; previously the unhandled message would cause runNetwork() to abort early, so // this would fail. Make fresh messages to stop duplicate uniqueMessageId causing drops - val invalidMessage2 = node2.network.createMessage("invalid_message", DEFAULT_SESSION_ID, ByteArray(0)) - val validMessage2 = node2.network.createMessage("valid_message", DEFAULT_SESSION_ID, ByteArray(0)) + val invalidMessage2 = node2.network.createMessage("invalid_message", data = ByteArray(0)) + val validMessage2 = node2.network.createMessage("valid_message", data = ByteArray(0)) node2.network.send(invalidMessage2, node1.network.myAddress) node2.network.send(validMessage2, node1.network.myAddress) mockNet.runNetwork() diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index 84c20bc453..098b7c41de 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -9,11 +9,9 @@ import net.corda.core.messaging.RPCOps import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserServiceImpl -import net.corda.node.services.api.DEFAULT_SESSION_ID import net.corda.node.services.api.MonitoringService import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate -import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.PersistentUniquenessProvider @@ -126,7 +124,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() { val receivedMessages = LinkedBlockingQueue() val messagingClient = createAndStartClientAndServer(receivedMessages) - val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray()) + val message = messagingClient.createMessage(topic, data = "first msg".toByteArray()) messagingClient.send(message, messagingClient.myAddress) val actual: Message = receivedMessages.take() @@ -142,10 +140,10 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() { val receivedMessages = LinkedBlockingQueue() val messagingClient = createAndStartClientAndServer(receivedMessages) - val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray()) + val message = messagingClient.createMessage(topic, data = "first msg".toByteArray()) messagingClient.send(message, messagingClient.myAddress) - val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray()) + val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, data = "second msg".toByteArray()) messagingClient.send(networkMapMessage, messagingClient.myAddress) val actual: Message = receivedMessages.take() @@ -167,11 +165,11 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() { val messagingClient = createAndStartClientAndServer(receivedMessages) for (iter in 1..iterations) { - val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg $iter".toByteArray()) + val message = messagingClient.createMessage(topic, data = "first msg $iter".toByteArray()) messagingClient.send(message, messagingClient.myAddress) } - val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray()) + val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, data = "second msg".toByteArray()) messagingClient.send(networkMapMessage, messagingClient.myAddress) val actual: Message = receivedMessages.take() diff --git a/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt index 9d385513fb..c817fca191 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt @@ -6,8 +6,8 @@ import net.corda.core.node.NodeInfo import net.corda.core.node.services.ServiceInfo import net.corda.core.serialization.deserialize import net.corda.core.utilities.getOrThrow -import net.corda.node.services.api.DEFAULT_SESSION_ID import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.send import net.corda.node.services.messaging.sendRequest import net.corda.node.services.network.AbstractNetworkMapServiceTest.Changed.Added @@ -228,7 +228,7 @@ abstract class AbstractNetworkMapServiceTest private fun MockNode.subscribe(): Queue { val request = SubscribeRequest(true, network.myAddress) val updates = LinkedBlockingQueue() - services.networkService.addMessageHandler(PUSH_TOPIC, DEFAULT_SESSION_ID) { message, _ -> + services.networkService.addMessageHandler(PUSH_TOPIC) { message, _ -> updates += message.data.deserialize() } val response = services.networkService.sendRequest(SUBSCRIPTION_TOPIC, request, mapServiceNode.network.myAddress) @@ -246,7 +246,7 @@ abstract class AbstractNetworkMapServiceTest private fun MockNode.ackUpdate(mapVersion: Int) { val request = UpdateAcknowledge(mapVersion, services.networkService.myAddress) - services.networkService.send(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, request, mapServiceNode.network.myAddress) + services.networkService.send(PUSH_ACK_TOPIC, MessagingService.DEFAULT_SESSION_ID, request, mapServiceNode.network.myAddress) mockNet.runNetwork() }