Reverted incorrect additional advertised RPC address in NodeInfo (#2417)

This commit is contained in:
Michele Sollecito 2018-01-24 14:42:07 +00:00 committed by GitHub
parent 61c7de22d6
commit 3c0e006456
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 21 additions and 49 deletions

View File

@ -9,7 +9,6 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.protonwrapper.netty.AMQPServer
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.*
import net.corda.node.services.messaging.ArtemisMessagingClient
@ -178,8 +177,7 @@ class AMQPBridgeTest {
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
}
val userService = rigorousMock<RPCSecurityManager>()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, userService, MAX_MESSAGE_SIZE)
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
artemisServer.start()
artemisClient.start()
@ -208,8 +206,7 @@ class AMQPBridgeTest {
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
}
val userService = rigorousMock<RPCSecurityManager>()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, networkMap, userService, MAX_MESSAGE_SIZE)
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, networkMap, MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress2, MAX_MESSAGE_SIZE)
artemisServer.start()
artemisClient.start()

View File

@ -12,7 +12,6 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.protonwrapper.messages.MessageStatus
import net.corda.node.internal.protonwrapper.netty.AMQPClient
import net.corda.node.internal.protonwrapper.netty.AMQPServer
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration
@ -235,8 +234,7 @@ class ProtonWrapperTests {
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
doReturn(never<NetworkMapCache.MapChange>()).whenever(it).changed
}
val userService = rigorousMock<RPCSecurityManager>()
val server = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, userService, MAX_MESSAGE_SIZE)
val server = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, MAX_MESSAGE_SIZE)
val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE)
server.start()
client.start()

View File

@ -151,7 +151,7 @@ open class Node(configuration: NodeConfiguration,
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker()
val advertisedAddress = info.addresses.first()
val advertisedAddress = info.addresses.single()
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
rpcMessagingClient = RPCMessagingClient(configuration.rpcOptions.sslConfig, rpcServerAddresses.admin, networkParameters.maxMessageSize)
@ -190,21 +190,13 @@ open class Node(configuration: NodeConfiguration,
private fun makeLocalMessageBroker(): NetworkHostAndPort {
with(configuration) {
messageBroker = ArtemisMessagingServer(this, p2pAddress.port, services.networkMapCache, securityManager, networkParameters.maxMessageSize)
messageBroker = ArtemisMessagingServer(this, p2pAddress.port, services.networkMapCache, networkParameters.maxMessageSize)
return NetworkHostAndPort("localhost", p2pAddress.port)
}
}
override fun myAddresses(): List<NetworkHostAndPort> {
val addresses = mutableListOf<NetworkHostAndPort>()
addresses.add(configuration.messagingServerAddress ?: getAdvertisedAddress())
rpcBroker?.addresses?.let {
addresses.add(it.primary)
if (it.admin != it.primary) {
addresses.add(it.admin)
}
}
return addresses
return listOf(configuration.messagingServerAddress ?: getAdvertisedAddress())
}
private fun getAdvertisedAddress(): NetworkHostAndPort {

View File

@ -153,7 +153,7 @@ internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress:
}
private fun gatherAddresses(node: NodeInfo): Sequence<ArtemisMessagingComponent.ArtemisPeerAddress> {
val address = node.addresses.first()
val address = node.addresses.single()
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence()
}

View File

@ -17,7 +17,6 @@ import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.internal.artemis.BrokerAddresses
import net.corda.node.internal.artemis.CertificateChainCheckPolicy
import net.corda.node.internal.artemis.SecureArtemisConfiguration
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
@ -34,9 +33,6 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_CLIENT_TLS
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_ROOT_CA
import net.corda.nodeapi.internal.crypto.loadKeyStore
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
@ -84,7 +80,6 @@ import javax.security.auth.spi.LoginModule
class ArtemisMessagingServer(private val config: NodeConfiguration,
private val p2pPort: Int,
val networkMapCache: NetworkMapCacheInternal,
val securityManager: RPCSecurityManager,
val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
@ -132,8 +127,8 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
// Artemis IO errors
@Throws(IOException::class, KeyStoreException::class)
private fun configureAndStartServer() {
val (artemisConfig, securityPlugin) = createArtemisConfig()
val securityManager = createArtemisSecurityManager(securityPlugin)
val artemisConfig = createArtemisConfig()
val securityManager = createArtemisSecurityManager()
activeMQServer = ActiveMQServerImpl(artemisConfig, securityManager).apply {
// Throw any exceptions which are detected during startup
registerActivationFailureListener { exception -> throw exception }
@ -188,14 +183,13 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
* 3. RPC users. These are only given sufficient access to perform RPC with us.
* 4. Verifiers. These are given read access to the verification request queue and write access to the response queue.
*/
private fun ConfigurationImpl.configureAddressSecurity(): Pair<Configuration, LoginListener> {
private fun ConfigurationImpl.configureAddressSecurity(): Configuration {
val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true)
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
securityRoles["$P2P_PREFIX#"] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
securityRoles[VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, consume = true))
securityRoles["${VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX}.#"] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, send = true))
val onLoginListener = { _: String -> }
return Pair(this, onLoginListener)
return this
}
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
@ -206,7 +200,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
}
@Throws(IOException::class, KeyStoreException::class)
private fun createArtemisSecurityManager(loginListener: LoginListener): ActiveMQJAASSecurityManager {
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
val keyStore = config.loadSslKeyStore().internal
val trustStore = config.loadTrustStore().internal
@ -222,10 +216,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
val securityConfig = object : SecurityConfiguration() {
// Override to make it work with our login module
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
val options = mapOf(
LoginListener::javaClass.name to loginListener,
RPCSecurityManager::class.java.name to securityManager,
NodeLoginModule.CERT_CHAIN_CHECKS_OPTION_NAME to certChecks)
val options = mapOf(NodeLoginModule.CERT_CHAIN_CHECKS_OPTION_NAME to certChecks)
return arrayOf(AppConfigurationEntry(name, REQUIRED, options))
}
}
@ -236,7 +227,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
log.debug { "Queue created: $queueName, deploying bridge(s)" }
fun deployBridgeToPeer(nodeInfo: NodeInfo) {
log.debug("Deploying bridge for $queueName to $nodeInfo")
val address = nodeInfo.addresses.first()
val address = nodeInfo.addresses.single()
bridgeManager.deployBridge(queueName, address, nodeInfo.legalIdentitiesAndCerts.map { it.name }.toSet())
}
@ -265,7 +256,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
private fun updateBridgesOnNetworkChange(change: MapChange) {
log.debug { "Updating bridges on network map change: ${change.node}" }
fun gatherAddresses(node: NodeInfo): Sequence<ArtemisPeerAddress> {
val address = node.addresses.first()
val address = node.addresses.single()
return node.legalIdentitiesAndCerts.map { NodeAddress(it.party.owningKey, address) }.asSequence()
}
@ -329,8 +320,6 @@ class NodeLoginModule : LoginModule {
private var loginSucceeded: Boolean = false
private lateinit var subject: Subject
private lateinit var callbackHandler: CallbackHandler
private lateinit var securityManager: RPCSecurityManager
private lateinit var loginListener: LoginListener
private lateinit var peerCertCheck: CertificateChainCheckPolicy.Check
private lateinit var nodeCertCheck: CertificateChainCheckPolicy.Check
private lateinit var verifierCertCheck: CertificateChainCheckPolicy.Check
@ -339,8 +328,6 @@ class NodeLoginModule : LoginModule {
override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map<String, *>, options: Map<String, *>) {
this.subject = subject
this.callbackHandler = callbackHandler
securityManager = options[RPCSecurityManager::class.java.name] as RPCSecurityManager
loginListener = options[LoginListener::javaClass.name] as LoginListener
val certChainChecks: Map<String, CertificateChainCheckPolicy.Check> = uncheckedCast(options[CERT_CHAIN_CHECKS_OPTION_NAME])
peerCertCheck = certChainChecks[PEER_ROLE]!!
nodeCertCheck = certChainChecks[NODE_ROLE]!!
@ -439,6 +426,4 @@ class NodeLoginModule : LoginModule {
private fun clear() {
loginSucceeded = false
}
}
typealias LoginListener = (String) -> Unit
}

View File

@ -43,7 +43,7 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ
}
private fun gatherAddresses(node: NodeInfo): Sequence<ArtemisMessagingComponent.ArtemisPeerAddress> {
val address = node.addresses.first()
val address = node.addresses.single()
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence()
}

View File

@ -523,7 +523,7 @@ class P2PMessagingClient(config: NodeConfiguration,
// TODO Rethink PartyInfo idea and merging PeerAddress/ServiceAddress (the only difference is that Service address doesn't hold host and port)
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
return when (partyInfo) {
is PartyInfo.SingleNode -> NodeAddress(partyInfo.party.owningKey, partyInfo.addresses.first())
is PartyInfo.SingleNode -> NodeAddress(partyInfo.party.owningKey, partyInfo.addresses.single())
is PartyInfo.DistributedNode -> ServiceAddress(partyInfo.party.owningKey)
}
}

View File

@ -184,7 +184,7 @@ class ArtemisMessagingTest {
}
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
return ArtemisMessagingServer(config, local, networkMapCache, securityManager, maxMessageSize).apply {
return ArtemisMessagingServer(config, local, networkMapCache, maxMessageSize).apply {
config.configureWithDevSSLCertificate()
messagingServer = this
}

View File

@ -28,13 +28,13 @@ class DriverTests {
val DUMMY_REGULATOR_NAME = CordaX500Name("Regulator A", "Paris", "FR")
val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
fun nodeMustBeUp(handleFuture: CordaFuture<out NodeHandle>) = handleFuture.getOrThrow().apply {
val hostAndPort = nodeInfo.addresses.first()
val hostAndPort = nodeInfo.addresses.single()
// Check that the port is bound
addressMustBeBound(executorService, hostAndPort, (this as? NodeHandle.OutOfProcess)?.process)
}
fun nodeMustBeDown(handle: NodeHandle) {
val hostAndPort = handle.nodeInfo.addresses.first()
val hostAndPort = handle.nodeInfo.addresses.single()
// Check that the port is bound
addressMustNotBeBound(executorService, hostAndPort)
}