From 059056de65aee7b81d7251e04bb62091bb20a41a Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Fri, 24 Feb 2017 15:09:53 +0000 Subject: [PATCH] Removed "FLOW" from network map topic constants --- .../services/messaging/P2PSecurityTest.kt | 8 +++--- .../net/corda/node/internal/AbstractNode.kt | 3 +-- .../network/InMemoryNetworkMapCache.kt | 10 +++---- .../services/network/NetworkMapService.kt | 26 +++++++++---------- .../services/InMemoryNetworkMapServiceTest.kt | 12 +++------ .../messaging/ArtemisMessagingTests.kt | 6 ++--- .../net/corda/netmap/NetworkMapVisualiser.kt | 2 +- 7 files changed, 31 insertions(+), 36 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt index 7bc3da78ea..d51286cb45 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt @@ -2,14 +2,16 @@ package net.corda.services.messaging import com.google.common.util.concurrent.ListenableFuture import kotlinx.support.jdk7.use -import net.corda.core.* import net.corda.core.crypto.Party +import net.corda.core.div +import net.corda.core.getOrThrow import net.corda.core.node.NodeInfo +import net.corda.core.random63BitValue +import net.corda.core.seconds import net.corda.flows.sendRequest import net.corda.node.internal.NetworkMapInfo import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.network.NetworkMapService -import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC import net.corda.node.services.network.NetworkMapService.RegistrationRequest import net.corda.node.services.network.NodeRegistration import net.corda.node.utilities.AddOrRemove @@ -63,6 +65,6 @@ class P2PSecurityTest : NodeBasedTest() { val nodeInfo = NodeInfo(net.myAddress, Party(registrationName, identity.public)) val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX) val request = RegistrationRequest(registration.toWire(identity.private), net.myAddress) - return net.sendRequest(REGISTER_FLOW_TOPIC, request, networkMapNode.net.myAddress) + return net.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.net.myAddress) } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index ed60495ccd..dbcf24d300 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -33,7 +33,6 @@ import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.network.NetworkMapService -import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC import net.corda.node.services.network.NetworkMapService.RegistrationResponse import net.corda.node.services.network.NodeRegistration import net.corda.node.services.network.PersistentNetworkMapService @@ -416,7 +415,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val reg = NodeRegistration(info, instant.toEpochMilli(), ADD, expires) val legalIdentityKey = obtainLegalIdentityKey() val request = NetworkMapService.RegistrationRequest(reg.toWire(legalIdentityKey.private), net.myAddress) - return net.sendRequest(REGISTER_FLOW_TOPIC, request, networkMapAddress) + return net.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapAddress) } /** This is overriden by the mock node implementation to enable operation without any network map service */ diff --git a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt index ae30fe3cfd..b539414fe6 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt @@ -21,8 +21,6 @@ import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.loggerFor import net.corda.flows.sendRequest -import net.corda.node.services.network.NetworkMapService.Companion.FETCH_FLOW_TOPIC -import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_FLOW_TOPIC import net.corda.node.services.network.NetworkMapService.FetchMapResponse import net.corda.node.services.network.NetworkMapService.SubscribeResponse import net.corda.node.utilities.AddOrRemove @@ -83,10 +81,10 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach ifChangedSinceVer: Int?): ListenableFuture { if (subscribe && !registeredForPush) { // Add handler to the network, for updates received from the remote network map service. - net.addMessageHandler(NetworkMapService.PUSH_FLOW_TOPIC, DEFAULT_SESSION_ID) { message, r -> + net.addMessageHandler(NetworkMapService.PUSH_TOPIC, DEFAULT_SESSION_ID) { message, r -> try { val req = message.data.deserialize() - val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_FLOW_TOPIC, DEFAULT_SESSION_ID, + val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, NetworkMapService.UpdateAcknowledge(req.mapVersion, net.myAddress).serialize().bytes) net.send(ackMessage, req.replyTo) processUpdatePush(req) @@ -101,7 +99,7 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach // Fetch the network map and register for updates at the same time val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress) - val future = net.sendRequest(FETCH_FLOW_TOPIC, req, networkMapAddress).map { resp -> + val future = net.sendRequest(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { resp -> // We may not receive any nodes back, if the map hasn't changed since the version specified resp.nodes?.forEach { processRegistration(it) } Unit @@ -136,7 +134,7 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach override fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture { // Fetch the network map and register for updates at the same time val req = NetworkMapService.SubscribeRequest(false, net.myAddress) - val future = net.sendRequest(SUBSCRIPTION_FLOW_TOPIC, req, service.address).map { + val future = net.sendRequest(NetworkMapService.SUBSCRIPTION_TOPIC, req, service.address).map { if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed() } _registrationFuture.setFuture(future) diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt index 9a128243f6..8d8e6302cf 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt @@ -51,16 +51,16 @@ import javax.annotation.concurrent.ThreadSafe interface NetworkMapService { companion object { - val DEFAULT_EXPIRATION_PERIOD = Period.ofWeeks(4) - val FETCH_FLOW_TOPIC = "platform.network_map.fetch" - val QUERY_FLOW_TOPIC = "platform.network_map.query" - val REGISTER_FLOW_TOPIC = "platform.network_map.register" - val SUBSCRIPTION_FLOW_TOPIC = "platform.network_map.subscribe" + val DEFAULT_EXPIRATION_PERIOD: Period = Period.ofWeeks(4) + val FETCH_TOPIC = "platform.network_map.fetch" + val QUERY_TOPIC = "platform.network_map.query" + val REGISTER_TOPIC = "platform.network_map.register" + val SUBSCRIPTION_TOPIC = "platform.network_map.subscribe" // Base topic used when pushing out updates to the network map. Consumed, for example, by the map cache. // When subscribing to these updates, remember they must be acknowledged - val PUSH_FLOW_TOPIC = "platform.network_map.push" + val PUSH_TOPIC = "platform.network_map.push" // Base topic for messages acknowledging pushed updates - val PUSH_ACK_FLOW_TOPIC = "platform.network_map.push_ack" + val PUSH_ACK_TOPIC = "platform.network_map.push_ack" val logger = loggerFor() @@ -153,19 +153,19 @@ abstract class AbstractNetworkMapService protected fun setup() { // Register message handlers - handlers += addMessageHandler(NetworkMapService.FETCH_FLOW_TOPIC, + handlers += addMessageHandler(NetworkMapService.FETCH_TOPIC, { req: NetworkMapService.FetchMapRequest -> processFetchAllRequest(req) } ) - handlers += addMessageHandler(NetworkMapService.QUERY_FLOW_TOPIC, + handlers += addMessageHandler(NetworkMapService.QUERY_TOPIC, { req: NetworkMapService.QueryIdentityRequest -> processQueryRequest(req) } ) - handlers += addMessageHandler(NetworkMapService.REGISTER_FLOW_TOPIC, + handlers += addMessageHandler(NetworkMapService.REGISTER_TOPIC, { req: NetworkMapService.RegistrationRequest -> processRegistrationChangeRequest(req) } ) - handlers += addMessageHandler(NetworkMapService.SUBSCRIPTION_FLOW_TOPIC, + handlers += addMessageHandler(NetworkMapService.SUBSCRIPTION_TOPIC, { req: NetworkMapService.SubscribeRequest -> processSubscriptionRequest(req) } ) - handlers += net.addMessageHandler(NetworkMapService.PUSH_ACK_FLOW_TOPIC, DEFAULT_SESSION_ID) { message, r -> + handlers += net.addMessageHandler(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID) { message, r -> val req = message.data.deserialize() processAcknowledge(req) } @@ -211,7 +211,7 @@ abstract class AbstractNetworkMapService // to a MessageRecipientGroup that nodes join/leave, rather than the network map // service itself managing the group val update = NetworkMapService.Update(wireReg, mapVersion, net.myAddress).serialize().bytes - val message = net.createMessage(NetworkMapService.PUSH_FLOW_TOPIC, DEFAULT_SESSION_ID, update) + val message = net.createMessage(NetworkMapService.PUSH_TOPIC, DEFAULT_SESSION_ID, update) subscribers.locked { val toRemove = mutableListOf() diff --git a/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapServiceTest.kt index b0a0ebab1a..1eb68509ff 100644 --- a/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapServiceTest.kt @@ -10,10 +10,6 @@ import net.corda.node.services.network.AbstractNetworkMapService import net.corda.node.services.network.InMemoryNetworkMapService import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService.* -import net.corda.node.services.network.NetworkMapService.Companion.FETCH_FLOW_TOPIC -import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_FLOW_TOPIC -import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC -import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_FLOW_TOPIC import net.corda.node.services.network.NodeRegistration import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.databaseTransaction @@ -167,23 +163,23 @@ abstract class AbstractNetworkMapServiceTest { private fun MockNode.registration(mapServiceNode: MockNode, reg: NodeRegistration, privateKey: PrivateKey): ListenableFuture { val req = RegistrationRequest(reg.toWire(privateKey), services.networkService.myAddress) - return services.networkService.sendRequest(REGISTER_FLOW_TOPIC, req, mapServiceNode.info.address) + return services.networkService.sendRequest(NetworkMapService.REGISTER_TOPIC, req, mapServiceNode.info.address) } private fun MockNode.subscribe(mapServiceNode: MockNode, subscribe: Boolean): ListenableFuture { val req = SubscribeRequest(subscribe, services.networkService.myAddress) - return services.networkService.sendRequest(SUBSCRIPTION_FLOW_TOPIC, req, mapServiceNode.info.address) + return services.networkService.sendRequest(NetworkMapService.SUBSCRIPTION_TOPIC, req, mapServiceNode.info.address) } private fun MockNode.updateAcknowlege(mapServiceNode: MockNode, mapVersion: Int) { val req = UpdateAcknowledge(mapVersion, services.networkService.myAddress) - services.networkService.send(PUSH_ACK_FLOW_TOPIC, DEFAULT_SESSION_ID, req, mapServiceNode.info.address) + services.networkService.send(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, req, mapServiceNode.info.address) } private fun MockNode.fetchMap(mapServiceNode: MockNode, subscribe: Boolean, ifChangedSinceVersion: Int? = null): Future?> { val net = services.networkService val req = FetchMapRequest(subscribe, ifChangedSinceVersion, net.myAddress) - return net.sendRequest(FETCH_FLOW_TOPIC, req, mapServiceNode.info.address).map { it.nodes } + return net.sendRequest(NetworkMapService.FETCH_TOPIC, req, mapServiceNode.info.address).map { it.nodes } } } diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index ee0a271848..bf3f50de36 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -149,7 +149,7 @@ class ArtemisMessagingTests { val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray()) messagingClient.send(message, messagingClient.myAddress) - val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_FLOW_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray()) + val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray()) messagingClient.send(networkMapMessage, messagingClient.myAddress) val actual: Message = receivedMessages.take() @@ -175,7 +175,7 @@ class ArtemisMessagingTests { messagingClient.send(message, messagingClient.myAddress) } - val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_FLOW_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray()) + val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray()) messagingClient.send(networkMapMessage, messagingClient.myAddress) val actual: Message = receivedMessages.take() @@ -208,7 +208,7 @@ class ArtemisMessagingTests { messagingClient.addMessageHandler(topic) { message, r -> receivedMessages.add(message) } - messagingClient.addMessageHandler(NetworkMapService.FETCH_FLOW_TOPIC) { message, r -> + messagingClient.addMessageHandler(NetworkMapService.FETCH_TOPIC) { message, r -> receivedMessages.add(message) } // Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered. diff --git a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt index 0f3feab89d..1982774a92 100644 --- a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt +++ b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt @@ -349,7 +349,7 @@ class NetworkMapVisualiser : Application() { // Loopback messages are boring. if (transfer.sender == transfer.recipients) return false // Network map push acknowledgements are boring. - if (NetworkMapService.PUSH_ACK_FLOW_TOPIC in transfer.message.topicSession.topic) return false + if (NetworkMapService.PUSH_ACK_TOPIC in transfer.message.topicSession.topic) return false val message = transfer.message.data.deserialize() return when (message) { is SessionEnd -> false