Retire HostAndPort (#962)

* Don't attempt to parse a resolved InetSocketAddress toString
* A mock node isn't reachable via an address
This commit is contained in:
Andrzej Cichocki
2017-07-07 15:11:07 +01:00
committed by GitHub
parent 7822118835
commit cefa14507a
61 changed files with 290 additions and 266 deletions

View File

@ -1,6 +1,5 @@
package net.corda.node.services
import com.google.common.net.HostAndPort
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.ErrorOr
import net.corda.core.contracts.ContractState
@ -13,6 +12,7 @@ import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import net.corda.flows.NotaryFlow
@ -53,7 +53,7 @@ class BFTNotaryServiceTests {
serviceType.id,
clusterName)
val bftNotaryService = ServiceInfo(serviceType, clusterName)
val notaryClusterAddresses = replicaIds.map { HostAndPort.fromParts("localhost", 11000 + it * 10) }
val notaryClusterAddresses = replicaIds.map { NetworkHostAndPort("localhost", 11000 + it * 10) }
replicaIds.forEach { replicaId ->
mockNet.createNode(
node.network.myAddress,

View File

@ -1,7 +1,6 @@
package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable
import com.google.common.net.HostAndPort
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.toBase58String
@ -11,6 +10,7 @@ import net.corda.core.flows.InitiatingFlow
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.crypto.random63BitValue
import net.corda.testing.ALICE
import net.corda.testing.BOB
@ -144,13 +144,13 @@ abstract class MQSecurityTest : NodeBasedTest() {
assertAllQueueCreationAttacksFail(randomQueue)
}
fun clientTo(target: HostAndPort, sslConfiguration: SSLConfiguration? = configureTestSSL()): SimpleMQClient {
fun clientTo(target: NetworkHostAndPort, sslConfiguration: SSLConfiguration? = configureTestSSL()): SimpleMQClient {
val client = SimpleMQClient(target, sslConfiguration)
clients += client
return client
}
fun loginToRPC(target: HostAndPort, rpcUser: User, sslConfiguration: SSLConfiguration? = null): CordaRPCOps {
fun loginToRPC(target: NetworkHostAndPort, rpcUser: User, sslConfiguration: SSLConfiguration? = null): CordaRPCOps {
return CordaRPCClient(target, sslConfiguration).start(rpcUser.username, rpcUser.password).proxy
}

View File

@ -3,7 +3,6 @@ package net.corda.node.internal
import com.codahale.metrics.MetricRegistry
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.MutableClassToInstanceMap
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
@ -24,6 +23,7 @@ import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
import net.corda.flows.*
import net.corda.node.services.*
@ -621,7 +621,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
/** Return list of node's addresses. It's overridden in MockNetwork as we don't have real addresses for MockNodes. */
protected abstract fun myAddresses(): List<HostAndPort>
protected abstract fun myAddresses(): List<NetworkHostAndPort>
/** This is overriden by the mock node implementation to enable operation without any network map service */
protected open fun noNetworkMapConfigured(): ListenableFuture<Unit> {

View File

@ -1,7 +1,6 @@
package net.corda.node.internal
import com.codahale.metrics.JmxReporter
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
@ -12,7 +11,9 @@ import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo
import net.corda.core.seconds
import net.corda.core.success
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.parseNetworkHostAndPort
import net.corda.core.utilities.trace
import net.corda.node.VersionInfo
import net.corda.node.serialization.NodeClock
@ -155,21 +156,21 @@ open class Node(override val configuration: FullNodeConfiguration,
advertisedAddress)
}
private fun makeLocalMessageBroker(): HostAndPort {
private fun makeLocalMessageBroker(): NetworkHostAndPort {
with(configuration) {
messageBroker = ArtemisMessagingServer(this, p2pAddress.port, rpcAddress?.port, services.networkMapCache, userService)
return HostAndPort.fromParts("localhost", p2pAddress.port)
return NetworkHostAndPort("localhost", p2pAddress.port)
}
}
private fun getAdvertisedAddress(): HostAndPort {
private fun getAdvertisedAddress(): NetworkHostAndPort {
return with(configuration) {
val useHost = if (detectPublicIp) {
tryDetectIfNotPublicHost(p2pAddress.host) ?: p2pAddress.host
} else {
p2pAddress.host
}
HostAndPort.fromParts(useHost, p2pAddress.port)
NetworkHostAndPort(useHost, p2pAddress.port)
}
}
@ -201,7 +202,7 @@ open class Node(override val configuration: FullNodeConfiguration,
* it back to the queue.
* - Once the message is received the session is closed and the queue deleted.
*/
private fun discoverPublicHost(serverAddress: HostAndPort): String? {
private fun discoverPublicHost(serverAddress: NetworkHostAndPort): String? {
log.trace { "Trying to detect public hostname through the Network Map Service at $serverAddress" }
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, configuration)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
@ -227,14 +228,14 @@ open class Node(override val configuration: FullNodeConfiguration,
val consumer = session.createConsumer(queueName)
val artemisMessage: ClientMessage = consumer.receive(10.seconds.toMillis()) ?:
throw IOException("Did not receive a response from the Network Map Service at $serverAddress")
val publicHostAndPort = HostAndPort.fromString(artemisMessage.getStringProperty(ipDetectResponseProperty))
val publicHostAndPort = artemisMessage.getStringProperty(ipDetectResponseProperty)
log.info("Detected public address: $publicHostAndPort")
consumer.close()
session.deleteQueue(queueName)
clientFactory.close()
return publicHostAndPort.host.removePrefix("/")
return publicHostAndPort.removePrefix("/").parseNetworkHostAndPort().host
}
override fun startMessagingService(rpcOps: RPCOps) {
@ -257,7 +258,7 @@ open class Node(override val configuration: FullNodeConfiguration,
return networkMapConnection.flatMap { super.registerWithNetworkMap() }
}
override fun myAddresses(): List<HostAndPort> {
override fun myAddresses(): List<NetworkHostAndPort> {
val address = network.myAddress as ArtemisMessagingComponent.ArtemisPeerAddress
return listOf(address.hostAndPort)
}
@ -359,4 +360,4 @@ open class Node(override val configuration: FullNodeConfiguration,
class ConfigurationException(message: String) : Exception(message)
data class NetworkMapInfo(val address: HostAndPort, val legalName: X500Name)
data class NetworkMapInfo(val address: NetworkHostAndPort, val legalName: X500Name)

View File

@ -1,7 +1,7 @@
package net.corda.node.services.config
import com.google.common.net.HostAndPort
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.messaging.CertificateChainCheckPolicy
import net.corda.node.services.network.NetworkMapService
@ -27,8 +27,8 @@ interface NodeConfiguration : NodeSSLConfiguration {
val verifierType: VerifierType
val messageRedeliveryDelaySeconds: Int
val bftReplicaId: Int?
val notaryNodeAddress: HostAndPort?
val notaryClusterAddresses: List<HostAndPort>
val notaryNodeAddress: NetworkHostAndPort?
val notaryClusterAddresses: List<NetworkHostAndPort>
}
data class FullNodeConfiguration(
@ -50,15 +50,15 @@ data class FullNodeConfiguration(
override val messageRedeliveryDelaySeconds: Int = 30,
val useHTTPS: Boolean,
@OldConfig("artemisAddress")
val p2pAddress: HostAndPort,
val rpcAddress: HostAndPort?,
val p2pAddress: NetworkHostAndPort,
val rpcAddress: NetworkHostAndPort?,
// TODO This field is slightly redundant as p2pAddress is sufficient to hold the address of the node's MQ broker.
// Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one
val messagingServerAddress: HostAndPort?,
val messagingServerAddress: NetworkHostAndPort?,
val extraAdvertisedServiceIds: List<String>,
override val bftReplicaId: Int?,
override val notaryNodeAddress: HostAndPort?,
override val notaryClusterAddresses: List<HostAndPort>,
override val notaryNodeAddress: NetworkHostAndPort?,
override val notaryClusterAddresses: List<NetworkHostAndPort>,
override val certificateChainCheckPolicies: List<CertChainPolicyConfig>,
override val devMode: Boolean = false,
val useTestClock: Boolean = false,

View File

@ -1,6 +1,5 @@
package net.corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import io.netty.handler.ssl.SslHandler
@ -11,6 +10,7 @@ import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.Node
@ -376,7 +376,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
}
private fun createTcpTransport(connectionDirection: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true) =
ArtemisTcpTransport.tcpTransport(connectionDirection, HostAndPort.fromParts(host, port), config, enableSSL = enableSSL)
ArtemisTcpTransport.tcpTransport(connectionDirection, NetworkHostAndPort(host, port), config, enableSSL = enableSSL)
/**
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
@ -384,7 +384,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
* P2P address.
*/
private fun deployBridge(queueName: String, target: HostAndPort, legalName: X500Name) {
private fun deployBridge(queueName: String, target: NetworkHostAndPort, legalName: X500Name) {
val connectionDirection = ConnectionDirection.Outbound(
connectorFactoryClassName = VerifyingNettyConnectorFactory::class.java.name,
expectedCommonName = legalName
@ -420,7 +420,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort)
private fun getBridgeName(queueName: String, hostAndPort: HostAndPort): String = "$queueName -> $hostAndPort"
private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
// This is called on one of Artemis' background threads
internal fun hostVerificationFail(expectedLegalName: X500Name, errorMsg: String?) {

View File

@ -1,6 +1,5 @@
package net.corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.*
import net.corda.core.crypto.random63BitValue
@ -12,6 +11,7 @@ import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.utilities.opaque
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.VersionInfo
@ -71,13 +71,13 @@ import javax.annotation.concurrent.ThreadSafe
@ThreadSafe
class NodeMessagingClient(override val config: NodeConfiguration,
val versionInfo: VersionInfo,
val serverAddress: HostAndPort,
val serverAddress: NetworkHostAndPort,
val myIdentity: PublicKey?,
val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
val database: Database,
val networkMapRegistrationFuture: ListenableFuture<Unit>,
val monitoringService: MonitoringService,
advertisedAddress: HostAndPort = serverAddress
advertisedAddress: NetworkHostAndPort = serverAddress
) : ArtemisMessagingComponent(), MessagingService {
companion object {
private val log = loggerFor<NodeMessagingClient>()

View File

@ -1,7 +1,7 @@
package net.corda.node.services.transactions
import com.google.common.net.HostAndPort
import net.corda.core.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import java.io.FileWriter
@ -17,14 +17,14 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
* Each instance of this class creates such a configHome, accessible via [path].
* The files are deleted on [close] typically via [use], see [PathManager] for details.
*/
class BFTSMaRtConfig(private val replicaAddresses: List<HostAndPort>, debug: Boolean = false) : PathManager<BFTSMaRtConfig>(Files.createTempDirectory("bft-smart-config")) {
class BFTSMaRtConfig(private val replicaAddresses: List<NetworkHostAndPort>, debug: Boolean = false) : PathManager<BFTSMaRtConfig>(Files.createTempDirectory("bft-smart-config")) {
companion object {
private val log = loggerFor<BFTSMaRtConfig>()
internal val portIsClaimedFormat = "Port %s is claimed by another replica: %s"
}
init {
val claimedPorts = mutableSetOf<HostAndPort>()
val claimedPorts = mutableSetOf<NetworkHostAndPort>()
val n = replicaAddresses.size
(0 until n).forEach { replicaId ->
// Each replica claims the configured port and the next one:
@ -66,7 +66,7 @@ class BFTSMaRtConfig(private val replicaAddresses: List<HostAndPort>, debug: Boo
log.debug { "Replica $peerId is ready for P2P." }
}
private fun replicaPorts(replicaId: Int): List<HostAndPort> {
private fun replicaPorts(replicaId: Int): List<NetworkHostAndPort> {
val base = replicaAddresses[replicaId]
return BFTSMaRtPort.values().map { it.ofReplica(base) }
}
@ -76,10 +76,10 @@ private enum class BFTSMaRtPort(private val off: Int) {
FOR_CLIENTS(0),
FOR_REPLICAS(1);
fun ofReplica(base: HostAndPort) = HostAndPort.fromParts(base.host, base.port + off)
fun ofReplica(base: NetworkHostAndPort) = NetworkHostAndPort(base.host, base.port + off)
}
private fun HostAndPort.isListening() = try {
private fun NetworkHostAndPort.isListening() = try {
Socket(host, port).use { true } // Will cause one error to be logged in the replica on success.
} catch (e: SocketException) {
false

View File

@ -1,6 +1,5 @@
package net.corda.node.services.transactions
import com.google.common.net.HostAndPort
import io.atomix.catalyst.buffer.BufferInput
import io.atomix.catalyst.buffer.BufferOutput
import io.atomix.catalyst.serializer.Serializer
@ -48,13 +47,13 @@ class RaftUniquenessProvider(services: ServiceHubInternal) : UniquenessProvider,
/** Directory storing the Raft log and state machine snapshots */
private val storagePath: Path = services.configuration.baseDirectory
/** Address of the Copycat node run by this Corda node */
private val myAddress: HostAndPort = services.configuration.notaryNodeAddress
private val myAddress = services.configuration.notaryNodeAddress
?: throw IllegalArgumentException("notaryNodeAddress must be specified in configuration")
/**
* List of node addresses in the existing Copycat cluster. At least one active node must be
* provided to join the cluster. If empty, a new cluster will be bootstrapped.
*/
private val clusterAddresses: List<HostAndPort> = services.configuration.notaryClusterAddresses
private val clusterAddresses = services.configuration.notaryClusterAddresses
/** The database to store the state machine state in */
private val db: Database = services.database
/** SSL configuration */

View File

@ -1,9 +0,0 @@
package net.corda.node.utilities
import com.google.common.net.HostAndPort
import com.typesafe.config.Config
import java.nio.file.Path
import java.nio.file.Paths
fun Config.getHostAndPort(name: String): HostAndPort = HostAndPort.fromString(getString(name))
fun Config.getPath(name: String): Path = Paths.get(getString(name))

View File

@ -1,7 +1,7 @@
package net.corda.node.services.config
import com.google.common.net.HostAndPort
import net.corda.core.crypto.commonName
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.ALICE
import net.corda.nodeapi.User
import net.corda.testing.node.makeTestDataSourceProperties
@ -25,8 +25,8 @@ class FullNodeConfigurationTest {
rpcUsers = emptyList(),
verifierType = VerifierType.InMemory,
useHTTPS = false,
p2pAddress = HostAndPort.fromParts("localhost", 0),
rpcAddress = HostAndPort.fromParts("localhost", 1),
p2pAddress = NetworkHostAndPort("localhost", 0),
rpcAddress = NetworkHostAndPort("localhost", 1),
messagingServerAddress = null,
extraAdvertisedServiceIds = emptyList(),
bftReplicaId = null,

View File

@ -1,13 +1,14 @@
package net.corda.node.services.messaging
import com.codahale.metrics.MetricRegistry
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.crypto.generateKeyPair
import net.corda.core.messaging.RPCOps
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.ALICE
import net.corda.testing.LogHelper
import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
@ -20,7 +21,6 @@ import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.transaction
import net.corda.testing.ALICE
import net.corda.testing.freeLocalHostAndPort
import net.corda.testing.freePort
import net.corda.testing.node.MOCK_VERSION_INFO
@ -218,7 +218,7 @@ class ArtemisMessagingTests {
return messagingClient
}
private fun createMessagingClient(server: HostAndPort = HostAndPort.fromParts("localhost", serverPort)): NodeMessagingClient {
private fun createMessagingClient(server: NetworkHostAndPort = NetworkHostAndPort("localhost", serverPort)): NodeMessagingClient {
return database.transaction {
NodeMessagingClient(
config,

View File

@ -1,6 +1,6 @@
package net.corda.node.services.transactions
import com.google.common.net.HostAndPort
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.transactions.BFTSMaRtConfig.Companion.portIsClaimedFormat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
@ -28,7 +28,7 @@ class BFTSMaRtConfigTests {
@Test
fun `overlapping port ranges are rejected`() {
fun addresses(vararg ports: Int) = ports.map { HostAndPort.fromParts("localhost", it) }
fun addresses(vararg ports: Int) = ports.map { NetworkHostAndPort("localhost", it) }
assertThatThrownBy { BFTSMaRtConfig(addresses(11000, 11001)).use {} }
.isInstanceOf(IllegalArgumentException::class.java)
.hasMessage(portIsClaimedFormat.format("localhost:11001", setOf("localhost:11000", "localhost:11001")))

View File

@ -1,6 +1,5 @@
package net.corda.node.services.transactions
import com.google.common.net.HostAndPort
import io.atomix.catalyst.transport.Address
import io.atomix.copycat.client.ConnectionStrategies
import io.atomix.copycat.client.CopycatClient
@ -8,6 +7,7 @@ import io.atomix.copycat.server.CopycatServer
import io.atomix.copycat.server.storage.Storage
import io.atomix.copycat.server.storage.StorageLevel
import net.corda.core.getOrThrow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.LogHelper
import net.corda.node.services.network.NetworkMapService
import net.corda.node.utilities.configureDatabase
@ -87,7 +87,7 @@ class DistributedImmutableMapTests {
return cluster.map { it.getOrThrow() }
}
private fun createReplica(myAddress: HostAndPort, clusterAddress: HostAndPort? = null): CompletableFuture<Member> {
private fun createReplica(myAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): CompletableFuture<Member> {
val storage = Storage.builder().withStorageLevel(StorageLevel.MEMORY).build()
val address = Address(myAddress.host, myAddress.port)