mirror of
https://github.com/corda/corda.git
synced 2025-06-13 04:38:19 +00:00
Merge branch 'release/os/4.8' into shams-4.9-fwrd-merge-d140cb59
# Conflicts: # node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt # node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt # node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt # node/src/main/kotlin/net/corda/node/internal/artemis/CertificateChainCheckPolicy.kt
This commit is contained in:
@ -269,6 +269,8 @@ tasks.register('integrationTest', Test) {
|
||||
testClassesDirs = sourceSets.integrationTest.output.classesDirs
|
||||
classpath = sourceSets.integrationTest.runtimeClasspath
|
||||
maxParallelForks = (System.env.CORDA_NODE_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_NODE_INT_TESTING_FORKS".toInteger()
|
||||
// CertificateRevocationListNodeTests
|
||||
systemProperty 'net.corda.dpcrl.connect.timeout', '4000'
|
||||
}
|
||||
|
||||
tasks.register('slowIntegrationTest', Test) {
|
||||
|
@ -27,6 +27,7 @@ import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.Parameterized
|
||||
import java.time.Duration
|
||||
import javax.net.ssl.KeyManagerFactory
|
||||
import javax.net.ssl.TrustManagerFactory
|
||||
import kotlin.test.assertFalse
|
||||
@ -123,7 +124,7 @@ class AMQPClientSslErrorsTest(@Suppress("unused") private val iteration: Int) {
|
||||
override val keyStore = keyStore
|
||||
override val trustStore = clientConfig.p2pSslOptions.trustStore.get()
|
||||
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
|
||||
override val sslHandshakeTimeout: Long = 3000
|
||||
override val sslHandshakeTimeout: Duration = 3.seconds
|
||||
}
|
||||
|
||||
clientKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -7,6 +7,8 @@ import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.coretesting.internal.rigorousMock
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.node.services.config.FlowTimeoutConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
@ -22,8 +24,6 @@ import net.corda.testing.driver.internal.incrementalPortAllocation
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.testing.internal.configureDatabase
|
||||
import net.corda.coretesting.internal.rigorousMock
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.internal.MOCK_VERSION_INFO
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
|
||||
@ -57,7 +57,6 @@ class ArtemisMessagingTest {
|
||||
@JvmField
|
||||
val temporaryFolder = TemporaryFolder()
|
||||
|
||||
// THe
|
||||
private val portAllocation = incrementalPortAllocation()
|
||||
private val serverPort = portAllocation.nextPort()
|
||||
private val identity = generateKeyPair()
|
||||
@ -201,7 +200,9 @@ class ArtemisMessagingTest {
|
||||
messagingClient!!.start(identity.public, null, maxMessageSize)
|
||||
}
|
||||
|
||||
private fun createAndStartClientAndServer(platformVersion: Int = 1, serverMaxMessageSize: Int = MAX_MESSAGE_SIZE, clientMaxMessageSize: Int = MAX_MESSAGE_SIZE): Pair<P2PMessagingClient, BlockingQueue<ReceivedMessage>> {
|
||||
private fun createAndStartClientAndServer(platformVersion: Int = 1,
|
||||
serverMaxMessageSize: Int = MAX_MESSAGE_SIZE,
|
||||
clientMaxMessageSize: Int = MAX_MESSAGE_SIZE): Pair<P2PMessagingClient, BlockingQueue<ReceivedMessage>> {
|
||||
val receivedMessages = LinkedBlockingQueue<ReceivedMessage>()
|
||||
|
||||
createMessagingServer(maxMessageSize = serverMaxMessageSize).start()
|
||||
@ -242,7 +243,7 @@ class ArtemisMessagingTest {
|
||||
}
|
||||
|
||||
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
|
||||
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null).apply {
|
||||
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null, true).apply {
|
||||
config.configureWithDevSSLCertificate()
|
||||
messagingServer = this
|
||||
}
|
||||
|
@ -5,15 +5,14 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfigImpl
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.certPathToString
|
||||
import java.security.KeyStore
|
||||
import java.security.cert.CertPathValidator
|
||||
import java.security.cert.CertPathValidatorException
|
||||
import java.security.cert.CertificateException
|
||||
import java.security.cert.PKIXBuilderParameters
|
||||
import java.security.cert.PKIXRevocationChecker
|
||||
import java.security.cert.X509CertSelector
|
||||
import java.util.EnumSet
|
||||
|
||||
sealed class CertificateChainCheckPolicy {
|
||||
companion object {
|
||||
@ -94,13 +93,12 @@ sealed class CertificateChainCheckPolicy {
|
||||
}
|
||||
}
|
||||
|
||||
class RevocationCheck(val revocationMode: RevocationConfig.Mode) : CertificateChainCheckPolicy() {
|
||||
class RevocationCheck(val revocationConfig: RevocationConfig) : CertificateChainCheckPolicy() {
|
||||
constructor(revocationMode: RevocationConfig.Mode) : this(RevocationConfigImpl(revocationMode))
|
||||
|
||||
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
|
||||
return object : Check {
|
||||
override fun checkCertificateChain(theirChain: Array<java.security.cert.X509Certificate>) {
|
||||
if (revocationMode == RevocationConfig.Mode.OFF) {
|
||||
return
|
||||
}
|
||||
// Convert javax.security.cert.X509Certificate to java.security.cert.X509Certificate.
|
||||
val chain = theirChain.map { X509CertificateFactory().generateCertificate(it.encoded.inputStream()) }
|
||||
log.info("Check Client Certpath:\r\n${certPathToString(chain.toTypedArray())}")
|
||||
@ -110,17 +108,7 @@ sealed class CertificateChainCheckPolicy {
|
||||
// See PKIXValidator.engineValidate() for reference implementation.
|
||||
val certPath = X509Utilities.buildCertPath(chain.dropLast(1))
|
||||
val certPathValidator = CertPathValidator.getInstance("PKIX")
|
||||
val pkixRevocationChecker = certPathValidator.revocationChecker as PKIXRevocationChecker
|
||||
pkixRevocationChecker.options = EnumSet.of(
|
||||
// Prefer CRL over OCSP
|
||||
PKIXRevocationChecker.Option.PREFER_CRLS,
|
||||
// Don't fall back to OCSP checking
|
||||
PKIXRevocationChecker.Option.NO_FALLBACK)
|
||||
if (revocationMode == RevocationConfig.Mode.SOFT_FAIL) {
|
||||
// Allow revocation check to succeed if the revocation status cannot be determined for one of
|
||||
// the following reasons: The CRL or OCSP response cannot be obtained because of a network error.
|
||||
pkixRevocationChecker.options = pkixRevocationChecker.options + PKIXRevocationChecker.Option.SOFT_FAIL
|
||||
}
|
||||
val pkixRevocationChecker = revocationConfig.createPKIXRevocationChecker()
|
||||
val params = PKIXBuilderParameters(trustStore, X509CertSelector())
|
||||
params.addCertPathChecker(pkixRevocationChecker)
|
||||
try {
|
||||
|
@ -56,7 +56,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
|
||||
class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
private val messagingServerAddress: NetworkHostAndPort,
|
||||
private val maxMessageSize: Int,
|
||||
private val journalBufferTimeout : Int?) : ArtemisBroker, SingletonSerializeAsToken() {
|
||||
private val journalBufferTimeout : Int?,
|
||||
private val trace: Boolean = false) : ArtemisBroker, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
@ -130,7 +131,11 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
// The transaction cache is configurable, and drives other cache sizes.
|
||||
globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize)
|
||||
|
||||
acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config.p2pSslOptions))
|
||||
acceptorConfigurations.add(p2pAcceptorTcpTransport(
|
||||
NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port),
|
||||
config.p2pSslOptions,
|
||||
trace = trace
|
||||
))
|
||||
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
|
||||
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
|
||||
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
|
||||
|
@ -0,0 +1,67 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import io.netty.buffer.ByteBufAllocator
|
||||
import io.netty.channel.group.ChannelGroup
|
||||
import io.netty.handler.logging.LogLevel
|
||||
import io.netty.handler.logging.LoggingHandler
|
||||
import io.netty.handler.ssl.SslHandler
|
||||
import net.corda.core.internal.declaredField
|
||||
import net.corda.nodeapi.internal.ArtemisTcpTransport
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor
|
||||
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory
|
||||
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener
|
||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutor
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
|
||||
@Suppress("unused") // Used via reflection in ArtemisTcpTransport
|
||||
class NodeNettyAcceptorFactory : AcceptorFactory {
|
||||
override fun createAcceptor(name: String?,
|
||||
clusterConnection: ClusterConnection?,
|
||||
configuration: Map<String, Any>,
|
||||
handler: BufferHandler?,
|
||||
listener: ServerConnectionLifeCycleListener?,
|
||||
threadPool: Executor,
|
||||
scheduledThreadPool: ScheduledExecutorService?,
|
||||
protocolMap: Map<String, ProtocolManager<BaseInterceptor<*>>>?): Acceptor {
|
||||
val failureExecutor = OrderedExecutor(threadPool)
|
||||
return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
|
||||
}
|
||||
|
||||
private class NodeNettyAcceptor(name: String?,
|
||||
clusterConnection: ClusterConnection?,
|
||||
configuration: Map<String, Any>,
|
||||
handler: BufferHandler?,
|
||||
listener: ServerConnectionLifeCycleListener?,
|
||||
scheduledThreadPool: ScheduledExecutorService?,
|
||||
failureExecutor: Executor,
|
||||
protocolMap: Map<String, ProtocolManager<BaseInterceptor<*>>>?) :
|
||||
NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
|
||||
{
|
||||
override fun start() {
|
||||
super.start()
|
||||
if (configuration[ArtemisTcpTransport.TRACE_NAME] == true) {
|
||||
// Artemis does not seem to allow access to the underlying channel so we resort to reflection and get it via the
|
||||
// serverChannelGroup field. This field is only available after start(), hence why we add the logger here.
|
||||
declaredField<ChannelGroup>("serverChannelGroup").value.forEach { channel ->
|
||||
channel.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun getSslHandler(alloc: ByteBufAllocator?): SslHandler {
|
||||
val sslHandler = super.getSslHandler(alloc)
|
||||
val handshakeTimeout = configuration[ArtemisTcpTransport.SSL_HANDSHAKE_TIMEOUT_NAME] as Duration?
|
||||
if (handshakeTimeout != null) {
|
||||
sslHandler.handshakeTimeoutMillis = handshakeTimeout.toMillis()
|
||||
}
|
||||
return sslHandler
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user