Removed "FLOW" from network map topic constants

This commit is contained in:
Shams Asari 2017-02-24 15:09:53 +00:00
parent cc61be5b6a
commit 059056de65
7 changed files with 31 additions and 36 deletions

View File

@ -2,14 +2,16 @@ package net.corda.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import kotlinx.support.jdk7.use
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.node.NodeInfo
import net.corda.core.random63BitValue
import net.corda.core.seconds
import net.corda.flows.sendRequest
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
import net.corda.node.services.network.NodeRegistration
import net.corda.node.utilities.AddOrRemove
@ -63,6 +65,6 @@ class P2PSecurityTest : NodeBasedTest() {
val nodeInfo = NodeInfo(net.myAddress, Party(registrationName, identity.public))
val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX)
val request = RegistrationRequest(registration.toWire(identity.private), net.myAddress)
return net.sendRequest<NetworkMapService.RegistrationResponse>(REGISTER_FLOW_TOPIC, request, networkMapNode.net.myAddress)
return net.sendRequest<NetworkMapService.RegistrationResponse>(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.net.myAddress)
}
}

View File

@ -33,7 +33,6 @@ import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.RegistrationResponse
import net.corda.node.services.network.NodeRegistration
import net.corda.node.services.network.PersistentNetworkMapService
@ -416,7 +415,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val reg = NodeRegistration(info, instant.toEpochMilli(), ADD, expires)
val legalIdentityKey = obtainLegalIdentityKey()
val request = NetworkMapService.RegistrationRequest(reg.toWire(legalIdentityKey.private), net.myAddress)
return net.sendRequest(REGISTER_FLOW_TOPIC, request, networkMapAddress)
return net.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapAddress)
}
/** This is overriden by the mock node implementation to enable operation without any network map service */

View File

@ -21,8 +21,6 @@ import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.flows.sendRequest
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.FetchMapResponse
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
import net.corda.node.utilities.AddOrRemove
@ -83,10 +81,10 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
net.addMessageHandler(NetworkMapService.PUSH_FLOW_TOPIC, DEFAULT_SESSION_ID) { message, r ->
net.addMessageHandler(NetworkMapService.PUSH_TOPIC, DEFAULT_SESSION_ID) { message, r ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_FLOW_TOPIC, DEFAULT_SESSION_ID,
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID,
NetworkMapService.UpdateAcknowledge(req.mapVersion, net.myAddress).serialize().bytes)
net.send(ackMessage, req.replyTo)
processUpdatePush(req)
@ -101,7 +99,7 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress)
val future = net.sendRequest<FetchMapResponse>(FETCH_FLOW_TOPIC, req, networkMapAddress).map { resp ->
val future = net.sendRequest<FetchMapResponse>(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { resp ->
// We may not receive any nodes back, if the map hasn't changed since the version specified
resp.nodes?.forEach { processRegistration(it) }
Unit
@ -136,7 +134,7 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
override fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.SubscribeRequest(false, net.myAddress)
val future = net.sendRequest<SubscribeResponse>(SUBSCRIPTION_FLOW_TOPIC, req, service.address).map {
val future = net.sendRequest<SubscribeResponse>(NetworkMapService.SUBSCRIPTION_TOPIC, req, service.address).map {
if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed()
}
_registrationFuture.setFuture(future)

View File

@ -51,16 +51,16 @@ import javax.annotation.concurrent.ThreadSafe
interface NetworkMapService {
companion object {
val DEFAULT_EXPIRATION_PERIOD = Period.ofWeeks(4)
val FETCH_FLOW_TOPIC = "platform.network_map.fetch"
val QUERY_FLOW_TOPIC = "platform.network_map.query"
val REGISTER_FLOW_TOPIC = "platform.network_map.register"
val SUBSCRIPTION_FLOW_TOPIC = "platform.network_map.subscribe"
val DEFAULT_EXPIRATION_PERIOD: Period = Period.ofWeeks(4)
val FETCH_TOPIC = "platform.network_map.fetch"
val QUERY_TOPIC = "platform.network_map.query"
val REGISTER_TOPIC = "platform.network_map.register"
val SUBSCRIPTION_TOPIC = "platform.network_map.subscribe"
// Base topic used when pushing out updates to the network map. Consumed, for example, by the map cache.
// When subscribing to these updates, remember they must be acknowledged
val PUSH_FLOW_TOPIC = "platform.network_map.push"
val PUSH_TOPIC = "platform.network_map.push"
// Base topic for messages acknowledging pushed updates
val PUSH_ACK_FLOW_TOPIC = "platform.network_map.push_ack"
val PUSH_ACK_TOPIC = "platform.network_map.push_ack"
val logger = loggerFor<NetworkMapService>()
@ -153,19 +153,19 @@ abstract class AbstractNetworkMapService
protected fun setup() {
// Register message handlers
handlers += addMessageHandler(NetworkMapService.FETCH_FLOW_TOPIC,
handlers += addMessageHandler(NetworkMapService.FETCH_TOPIC,
{ req: NetworkMapService.FetchMapRequest -> processFetchAllRequest(req) }
)
handlers += addMessageHandler(NetworkMapService.QUERY_FLOW_TOPIC,
handlers += addMessageHandler(NetworkMapService.QUERY_TOPIC,
{ req: NetworkMapService.QueryIdentityRequest -> processQueryRequest(req) }
)
handlers += addMessageHandler(NetworkMapService.REGISTER_FLOW_TOPIC,
handlers += addMessageHandler(NetworkMapService.REGISTER_TOPIC,
{ req: NetworkMapService.RegistrationRequest -> processRegistrationChangeRequest(req) }
)
handlers += addMessageHandler(NetworkMapService.SUBSCRIPTION_FLOW_TOPIC,
handlers += addMessageHandler(NetworkMapService.SUBSCRIPTION_TOPIC,
{ req: NetworkMapService.SubscribeRequest -> processSubscriptionRequest(req) }
)
handlers += net.addMessageHandler(NetworkMapService.PUSH_ACK_FLOW_TOPIC, DEFAULT_SESSION_ID) { message, r ->
handlers += net.addMessageHandler(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID) { message, r ->
val req = message.data.deserialize<NetworkMapService.UpdateAcknowledge>()
processAcknowledge(req)
}
@ -211,7 +211,7 @@ abstract class AbstractNetworkMapService
// to a MessageRecipientGroup that nodes join/leave, rather than the network map
// service itself managing the group
val update = NetworkMapService.Update(wireReg, mapVersion, net.myAddress).serialize().bytes
val message = net.createMessage(NetworkMapService.PUSH_FLOW_TOPIC, DEFAULT_SESSION_ID, update)
val message = net.createMessage(NetworkMapService.PUSH_TOPIC, DEFAULT_SESSION_ID, update)
subscribers.locked {
val toRemove = mutableListOf<SingleMessageRecipient>()

View File

@ -10,10 +10,6 @@ import net.corda.node.services.network.AbstractNetworkMapService
import net.corda.node.services.network.InMemoryNetworkMapService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.*
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_FLOW_TOPIC
import net.corda.node.services.network.NodeRegistration
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.databaseTransaction
@ -167,23 +163,23 @@ abstract class AbstractNetworkMapServiceTest {
private fun MockNode.registration(mapServiceNode: MockNode, reg: NodeRegistration, privateKey: PrivateKey): ListenableFuture<RegistrationResponse> {
val req = RegistrationRequest(reg.toWire(privateKey), services.networkService.myAddress)
return services.networkService.sendRequest(REGISTER_FLOW_TOPIC, req, mapServiceNode.info.address)
return services.networkService.sendRequest(NetworkMapService.REGISTER_TOPIC, req, mapServiceNode.info.address)
}
private fun MockNode.subscribe(mapServiceNode: MockNode, subscribe: Boolean): ListenableFuture<SubscribeResponse> {
val req = SubscribeRequest(subscribe, services.networkService.myAddress)
return services.networkService.sendRequest(SUBSCRIPTION_FLOW_TOPIC, req, mapServiceNode.info.address)
return services.networkService.sendRequest(NetworkMapService.SUBSCRIPTION_TOPIC, req, mapServiceNode.info.address)
}
private fun MockNode.updateAcknowlege(mapServiceNode: MockNode, mapVersion: Int) {
val req = UpdateAcknowledge(mapVersion, services.networkService.myAddress)
services.networkService.send(PUSH_ACK_FLOW_TOPIC, DEFAULT_SESSION_ID, req, mapServiceNode.info.address)
services.networkService.send(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, req, mapServiceNode.info.address)
}
private fun MockNode.fetchMap(mapServiceNode: MockNode, subscribe: Boolean, ifChangedSinceVersion: Int? = null): Future<Collection<NodeRegistration>?> {
val net = services.networkService
val req = FetchMapRequest(subscribe, ifChangedSinceVersion, net.myAddress)
return net.sendRequest<FetchMapResponse>(FETCH_FLOW_TOPIC, req, mapServiceNode.info.address).map { it.nodes }
return net.sendRequest<FetchMapResponse>(NetworkMapService.FETCH_TOPIC, req, mapServiceNode.info.address).map { it.nodes }
}
}

View File

@ -149,7 +149,7 @@ class ArtemisMessagingTests {
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_FLOW_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
messagingClient.send(networkMapMessage, messagingClient.myAddress)
val actual: Message = receivedMessages.take()
@ -175,7 +175,7 @@ class ArtemisMessagingTests {
messagingClient.send(message, messagingClient.myAddress)
}
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_FLOW_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
messagingClient.send(networkMapMessage, messagingClient.myAddress)
val actual: Message = receivedMessages.take()
@ -208,7 +208,7 @@ class ArtemisMessagingTests {
messagingClient.addMessageHandler(topic) { message, r ->
receivedMessages.add(message)
}
messagingClient.addMessageHandler(NetworkMapService.FETCH_FLOW_TOPIC) { message, r ->
messagingClient.addMessageHandler(NetworkMapService.FETCH_TOPIC) { message, r ->
receivedMessages.add(message)
}
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.

View File

@ -349,7 +349,7 @@ class NetworkMapVisualiser : Application() {
// Loopback messages are boring.
if (transfer.sender == transfer.recipients) return false
// Network map push acknowledgements are boring.
if (NetworkMapService.PUSH_ACK_FLOW_TOPIC in transfer.message.topicSession.topic) return false
if (NetworkMapService.PUSH_ACK_TOPIC in transfer.message.topicSession.topic) return false
val message = transfer.message.data.deserialize<Any>()
return when (message) {
is SessionEnd -> false