mirror of
https://github.com/corda/corda.git
synced 2025-02-11 21:26:23 +00:00
Merge branch 'release/os/4.9' into shams-4.10-merge-e6a80822
# Conflicts: # .github/workflows/check-pr-title.yml # .snyk # node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt # node/src/integration-test/kotlin/net/corda/node/amqp/AMQPClientSslErrorsTest.kt # node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
This commit is contained in:
commit
3a6deeefa7
2
.github/workflows/check-pr-title.yml
vendored
2
.github/workflows/check-pr-title.yml
vendored
@ -9,6 +9,6 @@ jobs:
|
||||
steps:
|
||||
- uses: morrisoncole/pr-lint-action@v1.6.1
|
||||
with:
|
||||
title-regex: '^((CORDA|AG|EG|ENT|INFRA|NAAS|ES)-\d+|NOTICK)(.*)'
|
||||
title-regex: '^((CORDA|AG|EG|ENT|INFRA|ES)-\d+|NOTICK)(.*)'
|
||||
on-failed-regex-comment: "PR title failed to match regex -> `%regex%`"
|
||||
repo-token: "${{ secrets.GITHUB_TOKEN }}"
|
||||
|
4
.snyk
4
.snyk
@ -159,7 +159,7 @@ ignore:
|
||||
assessment. Liquibase is used to apply the database migration changes.
|
||||
XML files are used here to define the changes not YAML and therefore
|
||||
the Corda node itself is not exposed to this deserialisation
|
||||
vulnerability.
|
||||
vulnerability.
|
||||
expires: 2023-07-12T17:00:51.957Z
|
||||
created: 2022-12-29T17:00:51.970Z
|
||||
SNYK-JAVA-ORGYAML-3016889:
|
||||
@ -180,7 +180,7 @@ ignore:
|
||||
- '*':
|
||||
reason: >-
|
||||
H2 console is not enabled for any of the applications we are running.
|
||||
When it comes to DB connectivity parameters, we do not allow changing
|
||||
When it comes to DB connectivity parameters, we do not allow changing
|
||||
them as they are supplied by Corda Node configuration file.
|
||||
expires: 2023-07-28T11:36:39.068Z
|
||||
created: 2022-12-29T11:36:39.089Z
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.client.rpc
|
||||
|
||||
import io.netty.util.concurrent.DefaultThreadFactory
|
||||
import net.corda.client.rpc.internal.RPCClient
|
||||
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
|
||||
import net.corda.client.rpc.internal.SerializationEnvironmentHelper
|
||||
@ -52,7 +53,7 @@ class CordaRPCConnection private constructor(
|
||||
sslConfiguration: ClientRpcSslOptions? = null,
|
||||
classLoader: ClassLoader? = null
|
||||
): CordaRPCConnection {
|
||||
val observersPool: ExecutorService = Executors.newCachedThreadPool()
|
||||
val observersPool: ExecutorService = Executors.newCachedThreadPool(DefaultThreadFactory("RPCObserver"))
|
||||
return CordaRPCConnection(null, observersPool, ReconnectingCordaRPCOps(
|
||||
addresses,
|
||||
username,
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.client.rpc.internal
|
||||
|
||||
import io.netty.util.concurrent.DefaultThreadFactory
|
||||
import net.corda.client.rpc.ConnectionFailureException
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||
@ -99,7 +100,8 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
ErrorInterceptingHandler(reconnectingRPCConnection)) as CordaRPCOps
|
||||
}
|
||||
}
|
||||
private val retryFlowsPool = Executors.newScheduledThreadPool(1)
|
||||
private val retryFlowsPool = Executors.newScheduledThreadPool(1, DefaultThreadFactory("FlowRetry"))
|
||||
|
||||
/**
|
||||
* This function runs a flow and retries until it completes successfully.
|
||||
*
|
||||
|
@ -0,0 +1,29 @@
|
||||
package net.corda.coretests.crypto.internal
|
||||
|
||||
import net.corda.coretesting.internal.DEV_ROOT_CA
|
||||
import net.corda.testing.core.createCRL
|
||||
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
|
||||
import org.junit.Test
|
||||
|
||||
class ProviderMapTest {
|
||||
// https://github.com/corda/corda/pull/3997
|
||||
@Test(timeout = 300_000)
|
||||
fun `verify CRL algorithms`() {
|
||||
val crl = createCRL(
|
||||
issuer = DEV_ROOT_CA,
|
||||
revokedCerts = emptyList(),
|
||||
signatureAlgorithm = "SHA256withECDSA"
|
||||
)
|
||||
// This should pass.
|
||||
crl.verify(DEV_ROOT_CA.keyPair.public)
|
||||
|
||||
// Try changing the algorithm to EC will fail.
|
||||
assertThatIllegalArgumentException().isThrownBy {
|
||||
createCRL(
|
||||
issuer = DEV_ROOT_CA,
|
||||
revokedCerts = emptyList(),
|
||||
signatureAlgorithm = "EC"
|
||||
)
|
||||
}.withMessage("Unknown signature type requested: EC")
|
||||
}
|
||||
}
|
@ -65,7 +65,7 @@ interface ServicesForResolution {
|
||||
/**
|
||||
* Given a [Set] of [StateRef]'s loads the referenced transaction and looks up the specified output [ContractState].
|
||||
*
|
||||
* @throws TransactionResolutionException if [stateRef] points to a non-existent transaction.
|
||||
* @throws TransactionResolutionException if any of the [stateRefs] point to a non-existent transaction.
|
||||
*/
|
||||
// TODO: future implementation to use a Vault state ref -> contract state BLOB table and perform single query bulk load
|
||||
// as the existing transaction store will become encrypted at some point
|
||||
|
@ -1,3 +1,5 @@
|
||||
@file:Suppress("LongParameterList")
|
||||
|
||||
package net.corda.core.node.services
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
@ -197,8 +199,7 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
|
||||
* 4) Status types used in this query: [StateStatus.UNCONSUMED], [StateStatus.CONSUMED], [StateStatus.ALL].
|
||||
* 5) Other results as a [List] of any type (eg. aggregate function results with/without group by).
|
||||
*
|
||||
* Note: currently otherResults are used only for Aggregate Functions (in which case, the states and statesMetadata
|
||||
* results will be empty).
|
||||
* Note: currently [otherResults] is used only for aggregate functions (in which case, [states] and [statesMetadata] will be empty).
|
||||
*/
|
||||
@CordaSerializable
|
||||
data class Page<out T : ContractState>(val states: List<StateAndRef<T>>,
|
||||
@ -213,11 +214,11 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
|
||||
val contractStateClassName: String,
|
||||
val recordedTime: Instant,
|
||||
val consumedTime: Instant?,
|
||||
val status: Vault.StateStatus,
|
||||
val status: StateStatus,
|
||||
val notary: AbstractParty?,
|
||||
val lockId: String?,
|
||||
val lockUpdateTime: Instant?,
|
||||
val relevancyStatus: Vault.RelevancyStatus? = null,
|
||||
val relevancyStatus: RelevancyStatus? = null,
|
||||
val constraintInfo: ConstraintInfo? = null
|
||||
) {
|
||||
fun copy(
|
||||
@ -225,7 +226,7 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
|
||||
contractStateClassName: String = this.contractStateClassName,
|
||||
recordedTime: Instant = this.recordedTime,
|
||||
consumedTime: Instant? = this.consumedTime,
|
||||
status: Vault.StateStatus = this.status,
|
||||
status: StateStatus = this.status,
|
||||
notary: AbstractParty? = this.notary,
|
||||
lockId: String? = this.lockId,
|
||||
lockUpdateTime: Instant? = this.lockUpdateTime
|
||||
@ -237,11 +238,11 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
|
||||
contractStateClassName: String = this.contractStateClassName,
|
||||
recordedTime: Instant = this.recordedTime,
|
||||
consumedTime: Instant? = this.consumedTime,
|
||||
status: Vault.StateStatus = this.status,
|
||||
status: StateStatus = this.status,
|
||||
notary: AbstractParty? = this.notary,
|
||||
lockId: String? = this.lockId,
|
||||
lockUpdateTime: Instant? = this.lockUpdateTime,
|
||||
relevancyStatus: Vault.RelevancyStatus?
|
||||
relevancyStatus: RelevancyStatus?
|
||||
): StateMetadata {
|
||||
return StateMetadata(ref, contractStateClassName, recordedTime, consumedTime, status, notary, lockId, lockUpdateTime, relevancyStatus, ConstraintInfo(AlwaysAcceptAttachmentConstraint))
|
||||
}
|
||||
@ -249,9 +250,9 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
|
||||
|
||||
companion object {
|
||||
@Deprecated("No longer used. The vault does not emit empty updates")
|
||||
val NoUpdate = Update(emptySet(), emptySet(), type = Vault.UpdateType.GENERAL, references = emptySet())
|
||||
val NoUpdate = Update(emptySet(), emptySet(), type = UpdateType.GENERAL, references = emptySet())
|
||||
@Deprecated("No longer used. The vault does not emit empty updates")
|
||||
val NoNotaryUpdate = Vault.Update(emptySet(), emptySet(), type = Vault.UpdateType.NOTARY_CHANGE, references = emptySet())
|
||||
val NoNotaryUpdate = Update(emptySet(), emptySet(), type = UpdateType.NOTARY_CHANGE, references = emptySet())
|
||||
}
|
||||
}
|
||||
|
||||
@ -302,7 +303,7 @@ interface VaultService {
|
||||
fun whenConsumed(ref: StateRef): CordaFuture<Vault.Update<ContractState>> {
|
||||
val query = QueryCriteria.VaultQueryCriteria(
|
||||
stateRefs = listOf(ref),
|
||||
status = Vault.StateStatus.CONSUMED
|
||||
status = StateStatus.CONSUMED
|
||||
)
|
||||
val result = trackBy<ContractState>(query)
|
||||
val snapshot = result.snapshot.states
|
||||
@ -358,8 +359,8 @@ interface VaultService {
|
||||
/**
|
||||
* Helper function to determine spendable states and soft locking them.
|
||||
* Currently performance will be worse than for the hand optimised version in
|
||||
* [Cash.unconsumedCashStatesForSpending]. However, this is fully generic and can operate with custom [FungibleState]
|
||||
* and [FungibleAsset] states.
|
||||
* [net.corda.finance.workflows.asset.selection.AbstractCashSelection.unconsumedCashStatesForSpending]. However, this is fully generic
|
||||
* and can operate with custom [FungibleState] and [FungibleAsset] states.
|
||||
* @param lockId The [FlowLogic.runId]'s [UUID] of the current flow used to soft lock the states.
|
||||
* @param eligibleStatesQuery A custom query object that selects down to the appropriate subset of all states of the
|
||||
* [contractStateType]. e.g. by selecting on account, issuer, etc. The query is internally augmented with the
|
||||
|
@ -21,14 +21,29 @@ import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.core.utilities.hours
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||
import net.corda.coretesting.internal.NettyTestClient
|
||||
import net.corda.coretesting.internal.NettyTestHandler
|
||||
import net.corda.coretesting.internal.NettyTestServer
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.createDevNodeCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateType
|
||||
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities.DEFAULT_IDENTITY_SIGNATURE_SCHEME
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME
|
||||
import net.corda.nodeapi.internal.crypto.checkValidity
|
||||
import net.corda.nodeapi.internal.crypto.getSupportedKey
|
||||
import net.corda.nodeapi.internal.crypto.loadOrCreateKeyStore
|
||||
import net.corda.nodeapi.internal.crypto.save
|
||||
import net.corda.nodeapi.internal.crypto.toBc
|
||||
import net.corda.nodeapi.internal.crypto.x509
|
||||
import net.corda.nodeapi.internal.crypto.x509Certificates
|
||||
import net.corda.nodeapi.internal.installDevNodeCaCertPath
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.init
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.keyManagerFactory
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.trustManagerFactory
|
||||
import net.corda.nodeapi.internal.registerDevP2pCertificates
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||
import net.corda.serialization.internal.AllWhitelist
|
||||
import net.corda.serialization.internal.SerializationContextImpl
|
||||
import net.corda.serialization.internal.SerializationFactoryImpl
|
||||
@ -37,25 +52,16 @@ import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.driver.internal.incrementalPortAllocation
|
||||
import net.corda.coretesting.internal.NettyTestClient
|
||||
import net.corda.coretesting.internal.NettyTestHandler
|
||||
import net.corda.coretesting.internal.NettyTestServer
|
||||
import net.corda.testing.internal.createDevIntermediateCaCertPath
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.nodeapi.internal.crypto.CertificateType
|
||||
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.crypto.checkValidity
|
||||
import net.corda.nodeapi.internal.crypto.getSupportedKey
|
||||
import net.corda.nodeapi.internal.crypto.loadOrCreateKeyStore
|
||||
import net.corda.nodeapi.internal.crypto.save
|
||||
import net.corda.nodeapi.internal.crypto.toBc
|
||||
import net.corda.nodeapi.internal.crypto.x509
|
||||
import net.corda.nodeapi.internal.crypto.x509Certificates
|
||||
import net.corda.testing.internal.IS_OPENJ9
|
||||
import net.corda.testing.internal.createDevIntermediateCaCertPath
|
||||
import net.i2p.crypto.eddsa.EdDSAPrivateKey
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.bouncycastle.asn1.x509.*
|
||||
import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier
|
||||
import org.bouncycastle.asn1.x509.BasicConstraints
|
||||
import org.bouncycastle.asn1.x509.CRLDistPoint
|
||||
import org.bouncycastle.asn1.x509.Extension
|
||||
import org.bouncycastle.asn1.x509.KeyUsage
|
||||
import org.bouncycastle.asn1.x509.SubjectKeyIdentifier
|
||||
import org.bouncycastle.jcajce.provider.asymmetric.edec.BCEdDSAPrivateKey
|
||||
import org.bouncycastle.pqc.jcajce.provider.sphincs.BCSphincs256PrivateKey
|
||||
import org.junit.Assume
|
||||
@ -74,10 +80,19 @@ import java.security.PrivateKey
|
||||
import java.security.cert.CertPath
|
||||
import java.security.cert.X509Certificate
|
||||
import java.util.*
|
||||
import javax.net.ssl.*
|
||||
import javax.net.ssl.SSLContext
|
||||
import javax.net.ssl.SSLParameters
|
||||
import javax.net.ssl.SSLServerSocket
|
||||
import javax.net.ssl.SSLSocket
|
||||
import javax.security.auth.x500.X500Principal
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.test.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertFalse
|
||||
import kotlin.test.assertNotNull
|
||||
import kotlin.test.assertNull
|
||||
import kotlin.test.assertTrue
|
||||
import kotlin.test.fail
|
||||
|
||||
class X509UtilitiesTest {
|
||||
private companion object {
|
||||
@ -295,15 +310,10 @@ class X509UtilitiesTest {
|
||||
sslConfig.keyStore.get(true).registerDevP2pCertificates(MEGA_CORP.name, rootCa.certificate, intermediateCa)
|
||||
sslConfig.createTrustStore(rootCa.certificate)
|
||||
|
||||
val keyStore = sslConfig.keyStore.get()
|
||||
val trustStore = sslConfig.trustStore.get()
|
||||
|
||||
val context = SSLContext.getInstance("TLS")
|
||||
val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
|
||||
keyManagerFactory.init(keyStore)
|
||||
val keyManagerFactory = keyManagerFactory(sslConfig.keyStore.get())
|
||||
val keyManagers = keyManagerFactory.keyManagers
|
||||
val trustMgrFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
|
||||
trustMgrFactory.init(trustStore)
|
||||
val trustMgrFactory = trustManagerFactory(sslConfig.trustStore.get())
|
||||
val trustManagers = trustMgrFactory.trustManagers
|
||||
context.init(keyManagers, trustManagers, newSecureRandom())
|
||||
|
||||
@ -388,15 +398,8 @@ class X509UtilitiesTest {
|
||||
sslConfig.keyStore.get(true).registerDevP2pCertificates(MEGA_CORP.name, rootCa.certificate, intermediateCa)
|
||||
sslConfig.createTrustStore(rootCa.certificate)
|
||||
|
||||
val keyStore = sslConfig.keyStore.get()
|
||||
val trustStore = sslConfig.trustStore.get()
|
||||
|
||||
val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
|
||||
keyManagerFactory.init(keyStore)
|
||||
|
||||
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
|
||||
trustManagerFactory.init(trustStore)
|
||||
|
||||
val keyManagerFactory = keyManagerFactory(sslConfig.keyStore.get())
|
||||
val trustManagerFactory = trustManagerFactory(sslConfig.trustStore.get())
|
||||
|
||||
val sslServerContext = SslContextBuilder
|
||||
.forServer(keyManagerFactory)
|
||||
|
@ -42,8 +42,8 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
|
||||
override fun start(): Started = synchronized(this) {
|
||||
check(started == null) { "start can't be called twice" }
|
||||
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config, threadPoolName = threadPoolName, trace = trace)
|
||||
val backupTransports = backupServerAddressPool.map {
|
||||
p2pConnectorTcpTransport(it, config, threadPoolName = threadPoolName, trace = trace)
|
||||
val backupTransports = backupServerAddressPool.mapIndexed { index, address ->
|
||||
p2pConnectorTcpTransport(address, config, threadPoolName = "$threadPoolName-backup${index+1}", trace = trace)
|
||||
}
|
||||
|
||||
log.info("Connecting to message broker: $serverAddress")
|
||||
|
@ -1,16 +1,18 @@
|
||||
@file:Suppress("LongParameterList")
|
||||
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import net.corda.core.messaging.ClientRpcSslOptions
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.BrokerRpcSslOptions
|
||||
import net.corda.nodeapi.internal.config.CertificateStore
|
||||
import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.config.SslConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.trustManagerFactory
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
||||
import java.nio.file.Path
|
||||
import javax.net.ssl.TrustManagerFactory
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
class ArtemisTcpTransport {
|
||||
@ -23,6 +25,7 @@ class ArtemisTcpTransport {
|
||||
val TLS_VERSIONS = listOf("TLSv1.2")
|
||||
|
||||
const val SSL_HANDSHAKE_TIMEOUT_NAME = "Corda-SSLHandshakeTimeout"
|
||||
const val TRUST_MANAGER_FACTORY_NAME = "Corda-TrustManagerFactory"
|
||||
const val TRACE_NAME = "Corda-Trace"
|
||||
const val THREAD_POOL_NAME_NAME = "Corda-ThreadPoolName"
|
||||
|
||||
@ -30,7 +33,6 @@ class ArtemisTcpTransport {
|
||||
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop.
|
||||
// It does not use AMQP messages for its own messages e.g. topology and heartbeats.
|
||||
private const val P2P_PROTOCOLS = "CORE,AMQP"
|
||||
|
||||
private const val RPC_PROTOCOLS = "CORE"
|
||||
|
||||
private fun defaultArtemisOptions(hostAndPort: NetworkHostAndPort, protocols: String) = mapOf(
|
||||
@ -39,46 +41,35 @@ class ArtemisTcpTransport {
|
||||
TransportConstants.PORT_PROP_NAME to hostAndPort.port,
|
||||
TransportConstants.PROTOCOLS_PROP_NAME to protocols,
|
||||
TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME to (nodeSerializationEnv != null),
|
||||
TransportConstants.REMOTING_THREADS_PROPNAME to (if (nodeSerializationEnv != null) -1 else 1),
|
||||
// turn off direct delivery in Artemis - this is latency optimisation that can lead to
|
||||
//hick-ups under high load (CORDA-1336)
|
||||
TransportConstants.DIRECT_DELIVER to false)
|
||||
|
||||
private val defaultSSLOptions = mapOf(
|
||||
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
|
||||
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to TLS_VERSIONS.joinToString(","))
|
||||
|
||||
private fun SslConfiguration.addToTransportOptions(options: MutableMap<String, Any>) {
|
||||
if (keyStore != null || trustStore != null) {
|
||||
options[TransportConstants.SSL_ENABLED_PROP_NAME] = true
|
||||
options[TransportConstants.NEED_CLIENT_AUTH_PROP_NAME] = true
|
||||
}
|
||||
keyStore?.let {
|
||||
with (it) {
|
||||
path.requireOnDefaultFileSystem()
|
||||
options.putAll(get().toKeyStoreTransportOptions(path))
|
||||
options[TransportConstants.KEYSTORE_TYPE_PROP_NAME] = "JKS"
|
||||
options[TransportConstants.KEYSTORE_PATH_PROP_NAME] = path
|
||||
options[TransportConstants.KEYSTORE_PASSWORD_PROP_NAME] = get().password
|
||||
}
|
||||
}
|
||||
trustStore?.let {
|
||||
with (it) {
|
||||
path.requireOnDefaultFileSystem()
|
||||
options.putAll(get().toTrustStoreTransportOptions(path))
|
||||
options[TransportConstants.TRUSTSTORE_TYPE_PROP_NAME] = "JKS"
|
||||
options[TransportConstants.TRUSTSTORE_PATH_PROP_NAME] = path
|
||||
options[TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME] = get().password
|
||||
}
|
||||
}
|
||||
options[TransportConstants.SSL_PROVIDER] = if (useOpenSsl) TransportConstants.OPENSSL_PROVIDER else TransportConstants.DEFAULT_SSL_PROVIDER
|
||||
options[SSL_HANDSHAKE_TIMEOUT_NAME] = handshakeTimeout ?: DEFAULT_SSL_HANDSHAKE_TIMEOUT
|
||||
}
|
||||
|
||||
private fun CertificateStore.toKeyStoreTransportOptions(path: Path) = mapOf(
|
||||
TransportConstants.SSL_ENABLED_PROP_NAME to true,
|
||||
TransportConstants.KEYSTORE_TYPE_PROP_NAME to "JKS",
|
||||
TransportConstants.KEYSTORE_PATH_PROP_NAME to path,
|
||||
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to password,
|
||||
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true)
|
||||
|
||||
private fun CertificateStore.toTrustStoreTransportOptions(path: Path) = mapOf(
|
||||
TransportConstants.SSL_ENABLED_PROP_NAME to true,
|
||||
TransportConstants.TRUSTSTORE_TYPE_PROP_NAME to "JKS",
|
||||
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to path,
|
||||
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to password,
|
||||
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true)
|
||||
|
||||
private fun ClientRpcSslOptions.toTransportOptions() = mapOf(
|
||||
TransportConstants.SSL_ENABLED_PROP_NAME to true,
|
||||
TransportConstants.TRUSTSTORE_TYPE_PROP_NAME to trustStoreProvider,
|
||||
@ -94,76 +85,110 @@ class ArtemisTcpTransport {
|
||||
|
||||
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
config: MutualSslConfiguration?,
|
||||
trustManagerFactory: TrustManagerFactory? = config?.trustStore?.get()?.let(::trustManagerFactory),
|
||||
enableSSL: Boolean = true,
|
||||
threadPoolName: String = "P2PServer",
|
||||
trace: Boolean = false): TransportConfiguration {
|
||||
trace: Boolean = false,
|
||||
remotingThreads: Int? = null): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any>()
|
||||
if (enableSSL) {
|
||||
config?.addToTransportOptions(options)
|
||||
}
|
||||
return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace)
|
||||
return createAcceptorTransport(
|
||||
hostAndPort,
|
||||
P2P_PROTOCOLS,
|
||||
options,
|
||||
trustManagerFactory,
|
||||
enableSSL,
|
||||
threadPoolName,
|
||||
trace,
|
||||
remotingThreads
|
||||
)
|
||||
}
|
||||
|
||||
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
config: MutualSslConfiguration?,
|
||||
enableSSL: Boolean = true,
|
||||
threadPoolName: String = "P2PClient",
|
||||
trace: Boolean = false): TransportConfiguration {
|
||||
trace: Boolean = false,
|
||||
remotingThreads: Int? = null): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any>()
|
||||
if (enableSSL) {
|
||||
config?.addToTransportOptions(options)
|
||||
}
|
||||
return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace)
|
||||
return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace, remotingThreads)
|
||||
}
|
||||
|
||||
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
config: BrokerRpcSslOptions?,
|
||||
enableSSL: Boolean = true,
|
||||
trace: Boolean = false): TransportConfiguration {
|
||||
threadPoolName: String = "RPCServer",
|
||||
trace: Boolean = false,
|
||||
remotingThreads: Int? = null): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any>()
|
||||
if (config != null && enableSSL) {
|
||||
config.keyStorePath.requireOnDefaultFileSystem()
|
||||
options.putAll(config.toTransportOptions())
|
||||
}
|
||||
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCServer", trace)
|
||||
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, null, enableSSL, threadPoolName, trace, remotingThreads)
|
||||
}
|
||||
|
||||
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
config: ClientRpcSslOptions?,
|
||||
enableSSL: Boolean = true,
|
||||
trace: Boolean = false): TransportConfiguration {
|
||||
trace: Boolean = false,
|
||||
remotingThreads: Int? = null): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any>()
|
||||
if (config != null && enableSSL) {
|
||||
config.trustStorePath.requireOnDefaultFileSystem()
|
||||
options.putAll(config.toTransportOptions())
|
||||
}
|
||||
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCClient", trace)
|
||||
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCClient", trace, remotingThreads)
|
||||
}
|
||||
|
||||
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
config: SslConfiguration,
|
||||
threadPoolName: String = "Internal-RPCClient",
|
||||
trace: Boolean = false): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any>()
|
||||
config.addToTransportOptions(options)
|
||||
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCClient", trace)
|
||||
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, threadPoolName, trace, null)
|
||||
}
|
||||
|
||||
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
config: SslConfiguration,
|
||||
trace: Boolean = false): TransportConfiguration {
|
||||
threadPoolName: String = "Internal-RPCServer",
|
||||
trace: Boolean = false,
|
||||
remotingThreads: Int? = null): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any>()
|
||||
config.addToTransportOptions(options)
|
||||
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCServer", trace)
|
||||
return createAcceptorTransport(
|
||||
hostAndPort,
|
||||
RPC_PROTOCOLS,
|
||||
options,
|
||||
trustManagerFactory(requireNotNull(config.trustStore).get()),
|
||||
true,
|
||||
threadPoolName,
|
||||
trace,
|
||||
remotingThreads
|
||||
)
|
||||
}
|
||||
|
||||
private fun createAcceptorTransport(hostAndPort: NetworkHostAndPort,
|
||||
protocols: String,
|
||||
options: MutableMap<String, Any>,
|
||||
trustManagerFactory: TrustManagerFactory?,
|
||||
enableSSL: Boolean,
|
||||
threadPoolName: String,
|
||||
trace: Boolean): TransportConfiguration {
|
||||
trace: Boolean,
|
||||
remotingThreads: Int?): TransportConfiguration {
|
||||
// Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections
|
||||
options[TransportConstants.HANDSHAKE_TIMEOUT] = 0
|
||||
if (trustManagerFactory != null) {
|
||||
// NettyAcceptor only creates default TrustManagerFactorys with the provided trust store details. However, we need to use
|
||||
// more customised instances which use our revocation checkers, so we pass them in, to be picked up by Node(Open)SSLContextFactory.
|
||||
options[TRUST_MANAGER_FACTORY_NAME] = trustManagerFactory
|
||||
}
|
||||
return createTransport(
|
||||
"net.corda.node.services.messaging.NodeNettyAcceptorFactory",
|
||||
hostAndPort,
|
||||
@ -171,7 +196,8 @@ class ArtemisTcpTransport {
|
||||
options,
|
||||
enableSSL,
|
||||
threadPoolName,
|
||||
trace
|
||||
trace,
|
||||
remotingThreads
|
||||
)
|
||||
}
|
||||
|
||||
@ -180,15 +206,21 @@ class ArtemisTcpTransport {
|
||||
options: MutableMap<String, Any>,
|
||||
enableSSL: Boolean,
|
||||
threadPoolName: String,
|
||||
trace: Boolean): TransportConfiguration {
|
||||
trace: Boolean,
|
||||
remotingThreads: Int?): TransportConfiguration {
|
||||
if (enableSSL) {
|
||||
// This is required to stop Client checking URL address vs. Server provided certificate
|
||||
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
|
||||
}
|
||||
return createTransport(
|
||||
NodeNettyConnectorFactory::class.java.name,
|
||||
CordaNettyConnectorFactory::class.java.name,
|
||||
hostAndPort,
|
||||
protocols,
|
||||
options,
|
||||
enableSSL,
|
||||
threadPoolName,
|
||||
trace
|
||||
trace,
|
||||
remotingThreads
|
||||
)
|
||||
}
|
||||
|
||||
@ -198,13 +230,15 @@ class ArtemisTcpTransport {
|
||||
options: MutableMap<String, Any>,
|
||||
enableSSL: Boolean,
|
||||
threadPoolName: String,
|
||||
trace: Boolean): TransportConfiguration {
|
||||
trace: Boolean,
|
||||
remotingThreads: Int?): TransportConfiguration {
|
||||
options += defaultArtemisOptions(hostAndPort, protocols)
|
||||
if (enableSSL) {
|
||||
options += defaultSSLOptions
|
||||
// This is required to stop Client checking URL address vs. Server provided certificate
|
||||
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
|
||||
options[TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME] = CIPHER_SUITES.joinToString(",")
|
||||
options[TransportConstants.ENABLED_PROTOCOLS_PROP_NAME] = TLS_VERSIONS.joinToString(",")
|
||||
}
|
||||
// By default, use only one remoting thread in tests (https://github.com/corda/corda/pull/2357)
|
||||
options[TransportConstants.REMOTING_THREADS_PROPNAME] = remotingThreads ?: if (nodeSerializationEnv == null) 1 else -1
|
||||
options[THREAD_POOL_NAME_NAME] = threadPoolName
|
||||
options[TRACE_NAME] = trace
|
||||
return TransportConfiguration(className, options)
|
||||
|
@ -1,8 +1,14 @@
|
||||
@file:JvmName("ArtemisUtils")
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import net.corda.core.internal.declaredField
|
||||
import org.apache.activemq.artemis.utils.actors.ProcessorBase
|
||||
import java.nio.file.FileSystems
|
||||
import java.nio.file.Path
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ThreadFactory
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
/**
|
||||
* Require that the [Path] is on a default file system, and therefore is one that Artemis is willing to use.
|
||||
@ -16,3 +22,29 @@ fun requireMessageSize(messageSize: Int, limit: Int) {
|
||||
require(messageSize <= limit) { "Message exceeds maxMessageSize network parameter, maxMessageSize: [$limit], message size: [$messageSize]" }
|
||||
}
|
||||
|
||||
val Executor.rootExecutor: Executor get() {
|
||||
var executor: Executor = this
|
||||
while (executor is ProcessorBase<*>) {
|
||||
executor = executor.declaredField<Executor>("delegate").value
|
||||
}
|
||||
return executor
|
||||
}
|
||||
|
||||
fun Executor.setThreadPoolName(threadPoolName: String) {
|
||||
(rootExecutor as? ThreadPoolExecutor)?.let { it.threadFactory = NamedThreadFactory(threadPoolName, it.threadFactory) }
|
||||
}
|
||||
|
||||
private class NamedThreadFactory(poolName: String, private val delegate: ThreadFactory) : ThreadFactory {
|
||||
companion object {
|
||||
private val poolId = AtomicInteger(0)
|
||||
}
|
||||
|
||||
private val prefix = "$poolName-${poolId.incrementAndGet()}-"
|
||||
private val nextId = AtomicInteger(0)
|
||||
|
||||
override fun newThread(r: Runnable): Thread {
|
||||
val thread = delegate.newThread(r)
|
||||
thread.name = "$prefix${nextId.incrementAndGet()}"
|
||||
return thread
|
||||
}
|
||||
}
|
||||
|
@ -14,15 +14,16 @@ import org.apache.activemq.artemis.utils.ConfigurationHelper
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
|
||||
class NodeNettyConnectorFactory : ConnectorFactory {
|
||||
class CordaNettyConnectorFactory : ConnectorFactory {
|
||||
override fun createConnector(configuration: MutableMap<String, Any>?,
|
||||
handler: BufferHandler?,
|
||||
listener: ClientConnectionLifeCycleListener?,
|
||||
closeExecutor: Executor?,
|
||||
threadPool: Executor?,
|
||||
scheduledThreadPool: ScheduledExecutorService?,
|
||||
closeExecutor: Executor,
|
||||
threadPool: Executor,
|
||||
scheduledThreadPool: ScheduledExecutorService,
|
||||
protocolManager: ClientProtocolManager?): Connector {
|
||||
val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Connector", configuration)
|
||||
setThreadPoolName(threadPool, closeExecutor, scheduledThreadPool, threadPoolName)
|
||||
val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
|
||||
return NettyConnector(
|
||||
configuration,
|
||||
@ -31,7 +32,7 @@ class NodeNettyConnectorFactory : ConnectorFactory {
|
||||
closeExecutor,
|
||||
threadPool,
|
||||
scheduledThreadPool,
|
||||
MyClientProtocolManager(threadPoolName, trace)
|
||||
MyClientProtocolManager("$threadPoolName-netty", trace)
|
||||
)
|
||||
}
|
||||
|
||||
@ -39,6 +40,17 @@ class NodeNettyConnectorFactory : ConnectorFactory {
|
||||
|
||||
override fun getDefaults(): Map<String?, Any?> = NettyConnector.DEFAULT_CONFIG
|
||||
|
||||
private fun setThreadPoolName(threadPool: Executor, closeExecutor: Executor, scheduledThreadPool: ScheduledExecutorService, name: String) {
|
||||
threadPool.setThreadPoolName("$name-artemis")
|
||||
// Artemis will actually wrap the same backing Executor to create multiple "OrderedExecutors". In this scenerio both the threadPool
|
||||
// and the closeExecutor are the same when it comes to the pool names. If however they are different then given them separate names.
|
||||
if (threadPool.rootExecutor !== closeExecutor.rootExecutor) {
|
||||
closeExecutor.setThreadPoolName("$name-artemis-closer")
|
||||
}
|
||||
// The scheduler is separate
|
||||
scheduledThreadPool.setThreadPoolName("$name-artemis-scheduler")
|
||||
}
|
||||
|
||||
|
||||
private class MyClientProtocolManager(private val threadPoolName: String, private val trace: Boolean) : ActiveMQClientProtocolManager() {
|
||||
override fun addChannelHandlers(pipeline: ChannelPipeline) {
|
@ -0,0 +1,32 @@
|
||||
@file:Suppress("LongParameterList", "MagicNumber")
|
||||
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import io.netty.util.concurrent.DefaultThreadFactory
|
||||
import net.corda.core.utilities.seconds
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.BlockingQueue
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Creates a [ThreadPoolExecutor] which will use a maximum of [maxPoolSize] threads at any given time and will by default idle down to 0
|
||||
* threads.
|
||||
*/
|
||||
fun namedThreadPoolExecutor(maxPoolSize: Int,
|
||||
corePoolSize: Int = 0,
|
||||
idleKeepAlive: Duration = 30.seconds,
|
||||
workQueue: BlockingQueue<Runnable> = LinkedBlockingQueue(),
|
||||
poolName: String = "pool",
|
||||
daemonThreads: Boolean = false,
|
||||
threadPriority: Int = Thread.NORM_PRIORITY): ThreadPoolExecutor {
|
||||
return ThreadPoolExecutor(
|
||||
corePoolSize,
|
||||
maxPoolSize,
|
||||
idleKeepAlive.toNanos(),
|
||||
TimeUnit.NANOSECONDS,
|
||||
workQueue,
|
||||
DefaultThreadFactory(poolName, daemonThreads, threadPriority)
|
||||
)
|
||||
}
|
@ -22,6 +22,7 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.ProxyConfig
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.sslDelegatedTaskExecutor
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
@ -31,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.slf4j.MDC
|
||||
import rx.Subscription
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
import java.util.concurrent.ScheduledFuture
|
||||
@ -53,7 +55,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
maxMessageSize: Int,
|
||||
revocationConfig: RevocationConfig,
|
||||
enableSNI: Boolean,
|
||||
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
|
||||
private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider,
|
||||
private val bridgeMetricsService: BridgeMetricsService? = null,
|
||||
trace: Boolean,
|
||||
sslHandshakeTimeout: Duration?,
|
||||
@ -78,9 +80,11 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
|
||||
private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize, revocationConfig,useOpenSSL, enableSNI, trace = trace, _sslHandshakeTimeout = sslHandshakeTimeout)
|
||||
private var sharedEventLoopGroup: EventLoopGroup? = null
|
||||
private var sslDelegatedTaskExecutor: ExecutorService? = null
|
||||
private var artemis: ArtemisSessionProvider? = null
|
||||
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
|
||||
private const val CORDA_NUM_BRIDGE_THREADS_PROP_NAME = "net.corda.nodeapi.amqpbridgemanager.NumBridgeThreads"
|
||||
|
||||
@ -97,18 +101,11 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
* however Artemis and the remote Corda instanced will deduplicate these messages.
|
||||
*/
|
||||
@Suppress("TooManyFunctions")
|
||||
private class AMQPBridge(val sourceX500Name: String,
|
||||
val queueName: String,
|
||||
val targets: List<NetworkHostAndPort>,
|
||||
val legalNames: Set<CordaX500Name>,
|
||||
private val amqpConfig: AMQPConfiguration,
|
||||
sharedEventGroup: EventLoopGroup,
|
||||
private val artemis: ArtemisSessionProvider,
|
||||
private val bridgeMetricsService: BridgeMetricsService?,
|
||||
private val bridgeConnectionTTLSeconds: Int) {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
private inner class AMQPBridge(val sourceX500Name: String,
|
||||
val queueName: String,
|
||||
val targets: List<NetworkHostAndPort>,
|
||||
val allowedRemoteLegalNames: Set<CordaX500Name>,
|
||||
private val amqpConfig: AMQPConfiguration) {
|
||||
|
||||
private fun withMDC(block: () -> Unit) {
|
||||
val oldMDC = MDC.getCopyOfContextMap() ?: emptyMap<String, String>()
|
||||
@ -116,7 +113,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
MDC.put("queueName", queueName)
|
||||
MDC.put("source", amqpConfig.sourceX500Name)
|
||||
MDC.put("targets", targets.joinToString(separator = ";") { it.toString() })
|
||||
MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() })
|
||||
MDC.put("allowedRemoteLegalNames", allowedRemoteLegalNames.joinToString(separator = ";") { it.toString() })
|
||||
MDC.put("maxMessageSize", amqpConfig.maxMessageSize.toString())
|
||||
block()
|
||||
} finally {
|
||||
@ -134,13 +131,18 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
|
||||
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
|
||||
|
||||
val amqpClient = AMQPClient(targets, legalNames, amqpConfig, sharedThreadPool = sharedEventGroup)
|
||||
val amqpClient = AMQPClient(
|
||||
targets,
|
||||
allowedRemoteLegalNames,
|
||||
amqpConfig,
|
||||
AMQPClient.NettyThreading.Shared(sharedEventLoopGroup!!, sslDelegatedTaskExecutor!!)
|
||||
)
|
||||
private var session: ClientSession? = null
|
||||
private var consumer: ClientConsumer? = null
|
||||
private var connectedSubscription: Subscription? = null
|
||||
@Volatile
|
||||
private var messagesReceived: Boolean = false
|
||||
private val eventLoop: EventLoop = sharedEventGroup.next()
|
||||
private val eventLoop: EventLoop = sharedEventLoopGroup!!.next()
|
||||
private var artemisState: ArtemisState = ArtemisState.STOPPED
|
||||
set(value) {
|
||||
logDebugWithMDC { "State change $field to $value" }
|
||||
@ -152,32 +154,9 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
private var scheduledExecutorService: ScheduledExecutorService
|
||||
= Executors.newSingleThreadScheduledExecutor(ThreadFactoryBuilder().setNameFormat("bridge-connection-reset-%d").build())
|
||||
|
||||
@Suppress("ClassNaming")
|
||||
private sealed class ArtemisState {
|
||||
object STARTING : ArtemisState()
|
||||
data class STARTED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
|
||||
|
||||
object CHECKING : ArtemisState()
|
||||
object RESTARTED : ArtemisState()
|
||||
object RECEIVING : ArtemisState()
|
||||
|
||||
object AMQP_STOPPED : ArtemisState()
|
||||
object AMQP_STARTING : ArtemisState()
|
||||
object AMQP_STARTED : ArtemisState()
|
||||
object AMQP_RESTARTED : ArtemisState()
|
||||
|
||||
object STOPPING : ArtemisState()
|
||||
object STOPPED : ArtemisState()
|
||||
data class STOPPED_AMQP_START_SCHEDULED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
|
||||
|
||||
open val pending: ScheduledFuture<Unit>? = null
|
||||
|
||||
override fun toString(): String = javaClass.simpleName
|
||||
}
|
||||
|
||||
private fun artemis(inProgress: ArtemisState, block: (precedingState: ArtemisState) -> ArtemisState) {
|
||||
val runnable = {
|
||||
synchronized(artemis) {
|
||||
synchronized(artemis!!) {
|
||||
try {
|
||||
val precedingState = artemisState
|
||||
artemisState.pending?.cancel(false)
|
||||
@ -231,7 +210,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
}
|
||||
ArtemisState.STOPPING
|
||||
}
|
||||
bridgeMetricsService?.bridgeDisconnected(targets, legalNames)
|
||||
bridgeMetricsService?.bridgeDisconnected(targets, allowedRemoteLegalNames)
|
||||
connectedSubscription?.unsubscribe()
|
||||
connectedSubscription = null
|
||||
// Do this last because we already scheduled the Artemis stop, so it's okay to unsubscribe onConnected first.
|
||||
@ -243,7 +222,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
if (connected) {
|
||||
logInfoWithMDC("Bridge Connected")
|
||||
|
||||
bridgeMetricsService?.bridgeConnected(targets, legalNames)
|
||||
bridgeMetricsService?.bridgeConnected(targets, allowedRemoteLegalNames)
|
||||
if (bridgeConnectionTTLSeconds > 0) {
|
||||
// AMQP outbound connection will be restarted periodically with bridgeConnectionTTLSeconds interval
|
||||
amqpRestartEvent = scheduledArtemisInExecutor(bridgeConnectionTTLSeconds.toLong(), TimeUnit.SECONDS,
|
||||
@ -253,7 +232,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
}
|
||||
}
|
||||
artemis(ArtemisState.STARTING) {
|
||||
val startedArtemis = artemis.started
|
||||
val startedArtemis = artemis!!.started
|
||||
if (startedArtemis == null) {
|
||||
logInfoWithMDC("Bridge Connected but Artemis is disconnected")
|
||||
ArtemisState.STOPPED
|
||||
@ -286,7 +265,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
logInfoWithMDC("Bridge Disconnected")
|
||||
amqpRestartEvent?.cancel(false)
|
||||
if (artemisState != ArtemisState.AMQP_STARTING && artemisState != ArtemisState.STOPPED) {
|
||||
bridgeMetricsService?.bridgeDisconnected(targets, legalNames)
|
||||
bridgeMetricsService?.bridgeDisconnected(targets, allowedRemoteLegalNames)
|
||||
}
|
||||
artemis(ArtemisState.STOPPING) { precedingState: ArtemisState ->
|
||||
logInfoWithMDC("Stopping Artemis because AMQP bridge disconnected")
|
||||
@ -418,10 +397,10 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
properties[key] = value
|
||||
}
|
||||
}
|
||||
logDebugWithMDC { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty(MESSAGE_ID_KEY)}" }
|
||||
logDebugWithMDC { "Bridged Send to ${allowedRemoteLegalNames.first()} uuid: ${artemisMessage.getObjectProperty(MESSAGE_ID_KEY)}" }
|
||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||
val sendableMessage = amqpClient.createMessage(artemisMessage.payload(), peerInbox,
|
||||
legalNames.first().toString(),
|
||||
allowedRemoteLegalNames.first().toString(),
|
||||
properties)
|
||||
sendableMessage.onComplete.then {
|
||||
logDebugWithMDC { "Bridge ACK ${sendableMessage.onComplete.get()}" }
|
||||
@ -457,6 +436,29 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("ClassNaming")
|
||||
private sealed class ArtemisState {
|
||||
object STARTING : ArtemisState()
|
||||
data class STARTED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
|
||||
|
||||
object CHECKING : ArtemisState()
|
||||
object RESTARTED : ArtemisState()
|
||||
object RECEIVING : ArtemisState()
|
||||
|
||||
object AMQP_STOPPED : ArtemisState()
|
||||
object AMQP_STARTING : ArtemisState()
|
||||
object AMQP_STARTED : ArtemisState()
|
||||
object AMQP_RESTARTED : ArtemisState()
|
||||
|
||||
object STOPPING : ArtemisState()
|
||||
object STOPPED : ArtemisState()
|
||||
data class STOPPED_AMQP_START_SCHEDULED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
|
||||
|
||||
open val pending: ScheduledFuture<Unit>? = null
|
||||
|
||||
override fun toString(): String = javaClass.simpleName
|
||||
}
|
||||
|
||||
override fun deployBridge(sourceX500Name: String, queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
|
||||
lock.withLock {
|
||||
val bridges = queueNamesToBridgesMap.getOrPut(queueName) { mutableListOf() }
|
||||
@ -467,8 +469,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
}
|
||||
val newAMQPConfig = with(amqpConfig) { AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize,
|
||||
revocationConfig, useOpenSsl, enableSNI, sourceX500Name, trace, sslHandshakeTimeout) }
|
||||
val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig, sharedEventLoopGroup!!, artemis!!,
|
||||
bridgeMetricsService, bridgeConnectionTTLSeconds)
|
||||
val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig)
|
||||
bridges += newBridge
|
||||
bridgeMetricsService?.bridgeCreated(targets, legalNames)
|
||||
newBridge
|
||||
@ -486,7 +487,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
queueNamesToBridgesMap.remove(queueName)
|
||||
}
|
||||
bridge.stop()
|
||||
bridgeMetricsService?.bridgeDestroyed(bridge.targets, bridge.legalNames)
|
||||
bridgeMetricsService?.bridgeDestroyed(bridge.targets, bridge.allowedRemoteLegalNames)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -497,15 +498,16 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
// queueNamesToBridgesMap returns a mutable list, .toList converts it to a immutable list so it won't be changed by the [destroyBridge] method.
|
||||
val bridges = queueNamesToBridgesMap[queueName]?.toList()
|
||||
destroyBridge(queueName, bridges?.flatMap { it.targets } ?: emptyList())
|
||||
bridges?.map {
|
||||
it.sourceX500Name to BridgeEntry(it.queueName, it.targets, it.legalNames.toList(), serviceAddress = false)
|
||||
}?.toMap() ?: emptyMap()
|
||||
bridges?.associate {
|
||||
it.sourceX500Name to BridgeEntry(it.queueName, it.targets, it.allowedRemoteLegalNames.toList(), serviceAddress = false)
|
||||
} ?: emptyMap()
|
||||
}
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("AMQPBridge", Thread.MAX_PRIORITY))
|
||||
val artemis = artemisMessageClientFactory()
|
||||
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("NettyBridge", Thread.MAX_PRIORITY))
|
||||
sslDelegatedTaskExecutor = sslDelegatedTaskExecutor("NettyBridge")
|
||||
val artemis = artemisMessageClientFactory("ArtemisBridge")
|
||||
this.artemis = artemis
|
||||
artemis.start()
|
||||
}
|
||||
@ -522,6 +524,8 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
sharedEventLoopGroup = null
|
||||
queueNamesToBridgesMap.clear()
|
||||
artemis?.stop()
|
||||
sslDelegatedTaskExecutor?.shutdown()
|
||||
sslDelegatedTaskExecutor = null
|
||||
}
|
||||
}
|
||||
}
|
@ -35,7 +35,7 @@ class BridgeControlListener(private val keyStore: CertificateStore,
|
||||
maxMessageSize: Int,
|
||||
revocationConfig: RevocationConfig,
|
||||
enableSNI: Boolean,
|
||||
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
|
||||
private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider,
|
||||
bridgeMetricsService: BridgeMetricsService? = null,
|
||||
trace: Boolean = false,
|
||||
sslHandshakeTimeout: Duration? = null,
|
||||
@ -80,7 +80,7 @@ class BridgeControlListener(private val keyStore: CertificateStore,
|
||||
bridgeNotifyQueue = "$BRIDGE_NOTIFY.$queueDisambiguityId"
|
||||
|
||||
bridgeManager.start()
|
||||
val artemis = artemisMessageClientFactory()
|
||||
val artemis = artemisMessageClientFactory("BridgeControl")
|
||||
this.artemis = artemis
|
||||
artemis.start()
|
||||
val artemisClient = artemis.started!!
|
||||
|
@ -37,7 +37,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore,
|
||||
maxMessageSize: Int,
|
||||
revocationConfig: RevocationConfig,
|
||||
enableSNI: Boolean,
|
||||
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
|
||||
private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider,
|
||||
private val bridgeMetricsService: BridgeMetricsService? = null,
|
||||
private val isLocalInbox: (String) -> Boolean,
|
||||
trace: Boolean,
|
||||
@ -204,7 +204,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore,
|
||||
|
||||
override fun start() {
|
||||
super.start()
|
||||
val artemis = artemisMessageClientFactory()
|
||||
val artemis = artemisMessageClientFactory("LoopbackBridge")
|
||||
this.artemis = artemis
|
||||
artemis.start()
|
||||
}
|
||||
|
@ -5,16 +5,37 @@ package net.corda.nodeapi.internal.crypto
|
||||
import net.corda.core.CordaOID
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.newSecureRandom
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.CertRole
|
||||
import net.corda.core.internal.SignedDataWithCert
|
||||
import net.corda.core.internal.reader
|
||||
import net.corda.core.internal.signWithCert
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.internal.validate
|
||||
import net.corda.core.internal.writer
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.core.utilities.toHex
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.distributionPointsToString
|
||||
import org.bouncycastle.asn1.*
|
||||
import org.bouncycastle.asn1.ASN1EncodableVector
|
||||
import org.bouncycastle.asn1.ASN1ObjectIdentifier
|
||||
import org.bouncycastle.asn1.ASN1Sequence
|
||||
import org.bouncycastle.asn1.DERSequence
|
||||
import org.bouncycastle.asn1.DERUTF8String
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.bouncycastle.asn1.x500.style.BCStyle
|
||||
import org.bouncycastle.asn1.x509.*
|
||||
import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier
|
||||
import org.bouncycastle.asn1.x509.BasicConstraints
|
||||
import org.bouncycastle.asn1.x509.CRLDistPoint
|
||||
import org.bouncycastle.asn1.x509.DistributionPoint
|
||||
import org.bouncycastle.asn1.x509.DistributionPointName
|
||||
import org.bouncycastle.asn1.x509.Extension
|
||||
import org.bouncycastle.asn1.x509.GeneralName
|
||||
import org.bouncycastle.asn1.x509.GeneralNames
|
||||
import org.bouncycastle.asn1.x509.KeyPurposeId
|
||||
import org.bouncycastle.asn1.x509.KeyUsage
|
||||
import org.bouncycastle.asn1.x509.NameConstraints
|
||||
import org.bouncycastle.asn1.x509.SubjectKeyIdentifier
|
||||
import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo
|
||||
import org.bouncycastle.cert.X509CertificateHolder
|
||||
import org.bouncycastle.cert.X509v3CertificateBuilder
|
||||
import org.bouncycastle.cert.bc.BcX509ExtensionUtils
|
||||
@ -32,8 +53,13 @@ import java.nio.file.Path
|
||||
import java.security.KeyPair
|
||||
import java.security.PublicKey
|
||||
import java.security.SignatureException
|
||||
import java.security.cert.*
|
||||
import java.security.cert.CertPath
|
||||
import java.security.cert.Certificate
|
||||
import java.security.cert.CertificateException
|
||||
import java.security.cert.CertificateFactory
|
||||
import java.security.cert.TrustAnchor
|
||||
import java.security.cert.X509CRL
|
||||
import java.security.cert.X509Certificate
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
@ -359,7 +385,7 @@ object X509Utilities {
|
||||
|
||||
private fun addCrlInfo(builder: X509v3CertificateBuilder, crlDistPoint: String?, crlIssuer: X500Name?) {
|
||||
if (crlDistPoint != null) {
|
||||
val distPointName = DistributionPointName(GeneralNames(GeneralName(GeneralName.uniformResourceIdentifier, crlDistPoint)))
|
||||
val distPointName = DistributionPointName(toGeneralNames(crlDistPoint, GeneralName.uniformResourceIdentifier))
|
||||
val crlIssuerGeneralNames = crlIssuer?.let {
|
||||
GeneralNames(GeneralName(crlIssuer))
|
||||
}
|
||||
@ -379,6 +405,8 @@ object X509Utilities {
|
||||
bytes[0] = bytes[0].and(0x3F).or(0x40)
|
||||
return BigInteger(bytes)
|
||||
}
|
||||
|
||||
fun toGeneralNames(string: String, tag: Int = GeneralName.directoryName): GeneralNames = GeneralNames(GeneralName(tag, string))
|
||||
}
|
||||
|
||||
// Assuming cert type to role is 1:1
|
||||
|
@ -27,16 +27,17 @@ import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPChannelHandler.Companion.PROXY_LOGGER_NAME
|
||||
import net.corda.nodeapi.internal.requireMessageSize
|
||||
import net.corda.nodeapi.internal.revocation.CertDistPointCrlSource
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.lang.Long.min
|
||||
import java.net.InetSocketAddress
|
||||
import java.security.cert.CertPathValidatorException
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import javax.net.ssl.KeyManagerFactory
|
||||
import javax.net.ssl.TrustManagerFactory
|
||||
import kotlin.concurrent.withLock
|
||||
|
||||
enum class ProxyVersion {
|
||||
@ -63,8 +64,8 @@ data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostA
|
||||
class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
||||
val allowedRemoteLegalNames: Set<CordaX500Name>,
|
||||
private val configuration: AMQPConfiguration,
|
||||
private val sharedThreadPool: EventLoopGroup? = null,
|
||||
private val threadPoolName: String = "AMQPClient") : AutoCloseable {
|
||||
private val nettyThreading: NettyThreading = NettyThreading.NonShared("AMQPClient"),
|
||||
private val distPointCrlSource: CertDistPointCrlSource = CertDistPointCrlSource.SINGLETON) : AutoCloseable {
|
||||
companion object {
|
||||
init {
|
||||
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
|
||||
@ -84,14 +85,12 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
||||
private val lock = ReentrantLock()
|
||||
@Volatile
|
||||
private var started: Boolean = false
|
||||
private var workerGroup: EventLoopGroup? = null
|
||||
@Volatile
|
||||
private var clientChannel: Channel? = null
|
||||
// Offset into the list of targets, so that we can implement round-robin reconnect logic.
|
||||
private var targetIndex = 0
|
||||
private var currentTarget: NetworkHostAndPort = targets.first()
|
||||
private var retryInterval = MIN_RETRY_INTERVAL
|
||||
private val revocationChecker = configuration.revocationConfig.createPKIXRevocationChecker()
|
||||
private val handshakeFailureRetryTargets = mutableSetOf<NetworkHostAndPort>()
|
||||
private var retryingHandshakeFailures = false
|
||||
private var retryOffset = 0
|
||||
@ -172,7 +171,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
||||
log.info("Failed to connect to $currentTarget", future.cause())
|
||||
|
||||
if (started) {
|
||||
workerGroup?.schedule({
|
||||
nettyThreading.eventLoopGroup.schedule({
|
||||
nextTarget()
|
||||
restart()
|
||||
}, retryInterval, TimeUnit.MILLISECONDS)
|
||||
@ -191,7 +190,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
||||
clientChannel = null
|
||||
if (started && !amqpActive) {
|
||||
log.debug { "Scheduling restart of $currentTarget (AMQP inactive)" }
|
||||
workerGroup?.schedule({
|
||||
nettyThreading.eventLoopGroup.schedule({
|
||||
nextTarget()
|
||||
restart()
|
||||
}, retryInterval, TimeUnit.MILLISECONDS)
|
||||
@ -199,17 +198,16 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
||||
}
|
||||
|
||||
private class ClientChannelInitializer(val parent: AMQPClient) : ChannelInitializer<SocketChannel>() {
|
||||
private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
|
||||
private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
|
||||
private val keyManagerFactory = keyManagerFactory(parent.configuration.keyStore)
|
||||
private val trustManagerFactory = trustManagerFactoryWithRevocation(
|
||||
parent.configuration.trustStore,
|
||||
parent.configuration.revocationConfig,
|
||||
parent.distPointCrlSource
|
||||
)
|
||||
private val conf = parent.configuration
|
||||
@Volatile
|
||||
private lateinit var amqpChannelHandler: AMQPChannelHandler
|
||||
|
||||
init {
|
||||
keyManagerFactory.init(conf.keyStore)
|
||||
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, parent.revocationChecker))
|
||||
}
|
||||
|
||||
@Suppress("ComplexMethod")
|
||||
override fun initChannel(ch: SocketChannel) {
|
||||
val pipeline = ch.pipeline()
|
||||
@ -249,9 +247,22 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
||||
val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, parent |