Move default session ID into messaging service

Move default session ID into messaging service to clean up the API. This means it no longer shows up in a Java class (ServiceHubInternalKt) as its only member, and is instead scoped into the service that actually uses the value. This does result in a MessagingService.Companion object (as MessagingService is an interface), however this is probably the most sensible solution.
This commit is contained in:
Ross Nicoll 2017-08-25 16:23:05 +01:00 committed by GitHub
parent a35d835d38
commit 0fb4465c10
10 changed files with 40 additions and 45 deletions

View File

@ -14,7 +14,6 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.Node
import net.corda.node.services.api.DEFAULT_SESSION_ID
import net.corda.node.services.messaging.*
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.services.transactions.SimpleNotaryService
@ -167,7 +166,7 @@ class P2PMessagingTest : NodeBasedTest() {
distributedServiceNodes.forEach {
val nodeName = it.info.legalIdentity.name
var ignoreRequests = false
it.network.addMessageHandler(dummyTopic, DEFAULT_SESSION_ID) { netMessage, _ ->
it.network.addMessageHandler(dummyTopic) { netMessage, _ ->
requestsReceived.incrementAndGet()
firstRequestReceived.countDown()
// The node which receives the first request will ignore all requests
@ -210,7 +209,7 @@ class P2PMessagingTest : NodeBasedTest() {
}
private fun Node.respondWith(message: Any) {
network.addMessageHandler(javaClass.name, DEFAULT_SESSION_ID) { netMessage, _ ->
network.addMessageHandler(javaClass.name) { netMessage, _ ->
val request = netMessage.data.deserialize<TestRequest>()
val response = network.createMessage(javaClass.name, request.sessionID, message.serialize().bytes)
network.send(response, request.replyTo)

View File

@ -27,7 +27,7 @@ abstract class AbstractNodeService(val services: ServiceHubInternal) : Singleton
addMessageHandler(topic: String,
crossinline handler: (Q) -> R,
crossinline exceptionConsumer: (Message, Exception) -> Unit): MessageHandlerRegistration {
return network.addMessageHandler(topic, DEFAULT_SESSION_ID) { message, _ ->
return network.addMessageHandler(topic, MessagingService.DEFAULT_SESSION_ID) { message, _ ->
try {
val request = message.data.deserialize<Q>()
val response = handler(request)

View File

@ -24,12 +24,6 @@ import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.utilities.CordaPersistence
/**
* Session ID to use for services listening for the first message in a session (before a
* specific session ID has been established).
*/
val DEFAULT_SESSION_ID = 0L
interface NetworkMapCacheInternal : NetworkMapCache {
/**
* Deregister from updates from the given map service.

View File

@ -9,7 +9,6 @@ import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.node.services.api.DEFAULT_SESSION_ID
import org.bouncycastle.asn1.x500.X500Name
import java.time.Instant
import java.util.*
@ -28,6 +27,14 @@ import javax.annotation.concurrent.ThreadSafe
*/
@ThreadSafe
interface MessagingService {
companion object {
/**
* Session ID to use for services listening for the first message in a session (before a
* specific session ID has been established).
*/
val DEFAULT_SESSION_ID = 0L
}
/**
* The provided function will be invoked for each received message whose topic matches the given string. The callback
* will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result.
@ -112,7 +119,7 @@ interface MessagingService {
* @param sessionID identifier for the session the message is part of. For messages sent to services before the
* construction of a session, use [DEFAULT_SESSION_ID].
*/
fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message
fun MessagingService.createMessage(topic: String, sessionID: Long = MessagingService.DEFAULT_SESSION_ID, data: ByteArray): Message
= createMessage(TopicSession(topic, sessionID), data)
/**
@ -191,8 +198,8 @@ interface MessageHandlerRegistration
* a session is established, use [DEFAULT_SESSION_ID].
*/
@CordaSerializable
data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) {
fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID
data class TopicSession(val topic: String, val sessionID: Long = MessagingService.DEFAULT_SESSION_ID) {
fun isBlank() = topic.isBlank() && sessionID == MessagingService.DEFAULT_SESSION_ID
override fun toString(): String = "$topic.$sessionID"
}

View File

@ -4,7 +4,6 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.serialization.CordaSerializable
import net.corda.node.services.api.DEFAULT_SESSION_ID
/**
* Abstract superclass for request messages sent to services which expect a reply.
@ -23,6 +22,6 @@ fun <R : Any> MessagingService.sendRequest(topic: String,
request: ServiceRequestMessage,
target: MessageRecipients): CordaFuture<R> {
val responseFuture = onNext<R>(topic, request.sessionID)
send(topic, DEFAULT_SESSION_ID, request, target)
send(topic, MessagingService.DEFAULT_SESSION_ID, request, target)
return responseFuture
}

View File

@ -1,10 +1,10 @@
package net.corda.node.services.network
import net.corda.core.internal.VisibleForTesting
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
@ -17,7 +17,6 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.DEFAULT_SESSION_ID
import net.corda.node.services.api.NetworkCacheError
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.messaging.MessagingService
@ -99,11 +98,11 @@ open class InMemoryNetworkMapCache(private val serviceHub: ServiceHub?) : Single
ifChangedSinceVer: Int?): CordaFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
network.addMessageHandler(NetworkMapService.PUSH_TOPIC, DEFAULT_SESSION_ID) { message, _ ->
network.addMessageHandler(NetworkMapService.PUSH_TOPIC) { message, _ ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID,
NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes)
val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC,
data = NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes)
network.send(ackMessage, req.replyTo)
processUpdatePush(req)
} catch(e: NodeMapError) {

View File

@ -1,12 +1,12 @@
package net.corda.node.services.network
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.ThreadBox
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SignedData
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
@ -20,9 +20,9 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.AbstractNodeService
import net.corda.node.services.api.DEFAULT_SESSION_ID
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.MessageHandlerRegistration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.ServiceRequestMessage
import net.corda.node.services.messaging.createMessage
import net.corda.node.services.network.NetworkMapService.*
@ -172,7 +172,7 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal,
handlers += addMessageHandler(QUERY_TOPIC) { req: QueryIdentityRequest -> processQueryRequest(req) }
handlers += addMessageHandler(REGISTER_TOPIC) { req: RegistrationRequest -> processRegistrationRequest(req) }
handlers += addMessageHandler(SUBSCRIPTION_TOPIC) { req: SubscribeRequest -> processSubscriptionRequest(req) }
handlers += network.addMessageHandler(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID) { message, _ ->
handlers += network.addMessageHandler(PUSH_ACK_TOPIC) { message, _ ->
val req = message.data.deserialize<UpdateAcknowledge>()
processAcknowledge(req)
}
@ -290,7 +290,7 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal,
// to a MessageRecipientGroup that nodes join/leave, rather than the network map
// service itself managing the group
val update = NetworkMapService.Update(wireReg, newMapVersion, network.myAddress).serialize().bytes
val message = network.createMessage(PUSH_TOPIC, DEFAULT_SESSION_ID, update)
val message = network.createMessage(PUSH_TOPIC, data = update)
subscribers.locked {
// Remove any stale subscribers

View File

@ -1,14 +1,13 @@
package net.corda.node.messaging
import net.corda.core.node.services.ServiceInfo
import net.corda.node.services.api.DEFAULT_SESSION_ID
import net.corda.node.services.messaging.Message
import net.corda.node.services.messaging.TopicStringValidator
import net.corda.node.services.messaging.createMessage
import net.corda.node.services.network.NetworkMapService
import net.corda.testing.node.MockNetwork
import org.junit.After
import net.corda.testing.resetTestSerialization
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.*
@ -70,7 +69,7 @@ class InMemoryMessagingTests {
}
// Node 1 sends a message and it should end up in finalDelivery, after we run the network
node1.network.send(node1.network.createMessage("test.topic", DEFAULT_SESSION_ID, bits), node2.network.myAddress)
node1.network.send(node1.network.createMessage("test.topic", data = bits), node2.network.myAddress)
mockNet.runNetwork(rounds = 1)
@ -87,7 +86,7 @@ class InMemoryMessagingTests {
var counter = 0
listOf(node1, node2, node3).forEach { it.network.addMessageHandler { _, _ -> counter++ } }
node1.network.send(node2.network.createMessage("test.topic", DEFAULT_SESSION_ID, bits), mockNet.messagingNetwork.everyoneOnline)
node1.network.send(node2.network.createMessage("test.topic", data = bits), mockNet.messagingNetwork.everyoneOnline)
mockNet.runNetwork(rounds = 1)
assertEquals(3, counter)
}
@ -106,8 +105,8 @@ class InMemoryMessagingTests {
received++
}
val invalidMessage = node2.network.createMessage("invalid_message", DEFAULT_SESSION_ID, ByteArray(0))
val validMessage = node2.network.createMessage("valid_message", DEFAULT_SESSION_ID, ByteArray(0))
val invalidMessage = node2.network.createMessage("invalid_message", data = ByteArray(0))
val validMessage = node2.network.createMessage("valid_message", data = ByteArray(0))
node2.network.send(invalidMessage, node1.network.myAddress)
mockNet.runNetwork()
assertEquals(0, received)
@ -118,8 +117,8 @@ class InMemoryMessagingTests {
// Here's the core of the test; previously the unhandled message would cause runNetwork() to abort early, so
// this would fail. Make fresh messages to stop duplicate uniqueMessageId causing drops
val invalidMessage2 = node2.network.createMessage("invalid_message", DEFAULT_SESSION_ID, ByteArray(0))
val validMessage2 = node2.network.createMessage("valid_message", DEFAULT_SESSION_ID, ByteArray(0))
val invalidMessage2 = node2.network.createMessage("invalid_message", data = ByteArray(0))
val validMessage2 = node2.network.createMessage("valid_message", data = ByteArray(0))
node2.network.send(invalidMessage2, node1.network.myAddress)
node2.network.send(validMessage2, node1.network.myAddress)
mockNet.runNetwork()

View File

@ -9,11 +9,9 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.DEFAULT_SESSION_ID
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.PersistentUniquenessProvider
@ -126,7 +124,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
val receivedMessages = LinkedBlockingQueue<Message>()
val messagingClient = createAndStartClientAndServer(receivedMessages)
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
val message = messagingClient.createMessage(topic, data = "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
val actual: Message = receivedMessages.take()
@ -142,10 +140,10 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
val receivedMessages = LinkedBlockingQueue<Message>()
val messagingClient = createAndStartClientAndServer(receivedMessages)
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
val message = messagingClient.createMessage(topic, data = "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, data = "second msg".toByteArray())
messagingClient.send(networkMapMessage, messagingClient.myAddress)
val actual: Message = receivedMessages.take()
@ -167,11 +165,11 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
val messagingClient = createAndStartClientAndServer(receivedMessages)
for (iter in 1..iterations) {
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg $iter".toByteArray())
val message = messagingClient.createMessage(topic, data = "first msg $iter".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
}
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, data = "second msg".toByteArray())
messagingClient.send(networkMapMessage, messagingClient.myAddress)
val actual: Message = receivedMessages.take()

View File

@ -6,8 +6,8 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.api.DEFAULT_SESSION_ID
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.send
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.AbstractNetworkMapServiceTest.Changed.Added
@ -228,7 +228,7 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
private fun MockNode.subscribe(): Queue<Update> {
val request = SubscribeRequest(true, network.myAddress)
val updates = LinkedBlockingQueue<Update>()
services.networkService.addMessageHandler(PUSH_TOPIC, DEFAULT_SESSION_ID) { message, _ ->
services.networkService.addMessageHandler(PUSH_TOPIC) { message, _ ->
updates += message.data.deserialize<Update>()
}
val response = services.networkService.sendRequest<SubscribeResponse>(SUBSCRIPTION_TOPIC, request, mapServiceNode.network.myAddress)
@ -246,7 +246,7 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
private fun MockNode.ackUpdate(mapVersion: Int) {
val request = UpdateAcknowledge(mapVersion, services.networkService.myAddress)
services.networkService.send(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, request, mapServiceNode.network.myAddress)
services.networkService.send(PUSH_ACK_TOPIC, MessagingService.DEFAULT_SESSION_ID, request, mapServiceNode.network.myAddress)
mockNet.runNetwork()
}