diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt index b8c92b495d..5d904fcc4a 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt @@ -16,7 +16,6 @@ import javafx.beans.property.SimpleObjectProperty import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.CordaRPCConnection -import net.corda.client.rpc.RPCException import net.corda.core.contracts.ContractState import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party @@ -32,7 +31,6 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger import net.corda.core.utilities.seconds -import org.apache.activemq.artemis.api.core.ActiveMQException import rx.Observable import rx.Subscription import rx.subjects.PublishSubject @@ -135,7 +133,7 @@ class NodeMonitorModel { } } - val stateMachines = performRpcReconnect(nodeHostAndPort, username, password) + val stateMachines = performRpcReconnect(nodeHostAndPort, username, password, shouldRetry = false) // Extract the flow tracking stream // TODO is there a nicer way of doing this? Stream of streams in general results in code like this... @@ -156,9 +154,9 @@ class NodeMonitorModel { futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject) } - private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): List { + private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): List { - val connection = establishConnectionWithRetry(nodeHostAndPort, username, password) + val connection = establishConnectionWithRetry(nodeHostAndPort, username, password, shouldRetry) val proxy = connection.proxy val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed() @@ -176,7 +174,7 @@ class NodeMonitorModel { // force closing the connection to avoid propagation of notification to the server side. connection.forceClose() // Perform re-connect. - performRpcReconnect(nodeHostAndPort, username, password) + performRpcReconnect(nodeHostAndPort, username, password, shouldRetry = true) }) retryableStateMachineUpdatesSubscription.set(subscription) @@ -186,7 +184,7 @@ class NodeMonitorModel { return stateMachineInfos } - private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection { + private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): CordaRPCConnection { val retryInterval = 5.seconds @@ -205,21 +203,15 @@ class NodeMonitorModel { require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty()) _connection } catch (throwable: Throwable) { - when (throwable) { - is ActiveMQException, is RPCException -> { - // Happens when: - // * incorrect credentials provided; - // * incorrect endpoint specified; - // - no point to retry connecting. - throw throwable - } - else -> { - // Deliberately not logging full stack trace as it will be full of internal stacktraces. - logger.info("Exception upon establishing connection: " + throwable.message) - null - } + if (shouldRetry) { + // Deliberately not logging full stack trace as it will be full of internal stacktraces. + logger.info("Exception upon establishing connection: " + throwable.message) + null + } else { + throw throwable } } + if (connection != null) { logger.info("Connection successfully established with: $nodeHostAndPort") return connection diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 9208645842..66ee5645f9 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -6,6 +6,7 @@ release, see :doc:`upgrade-notes`. Unreleased ---------- +* Remove all references to the out-of-process transaction verification. * Introduced a hierarchy of ``DatabaseMigrationException``s, allowing ``NodeStartup`` to gracefully inform users of problems related to database migrations before exiting with a non-zero code. diff --git a/docs/source/corda-api.rst b/docs/source/corda-api.rst index c967d7431e..9e79e9d8a7 100644 --- a/docs/source/corda-api.rst +++ b/docs/source/corda-api.rst @@ -90,7 +90,6 @@ The following modules are available but we do not commit to their stability or c * **net.corda.tools.explorer**: a GUI front-end for Corda * **net.corda.tools.graphs**: utilities to infer project dependencies * **net.corda.tools.loadtest**: Corda load tests -* **net.corda.verifier**: allows out-of-node transaction verification, allowing verification to scale horizontally * **net.corda.webserver**: is a servlet container for CorDapps that export HTTP endpoints. This server is an RPC client of the node * **net.corda.sandbox-creator**: sandbox utilities * **net.corda.quasar.hook**: agent to hook into Quasar and provide types exclusion lists diff --git a/docs/source/corda-repo-layout.rst b/docs/source/corda-repo-layout.rst index c297603609..c280cf4fef 100644 --- a/docs/source/corda-repo-layout.rst +++ b/docs/source/corda-repo-layout.rst @@ -23,5 +23,4 @@ The Corda repository comprises the following folders: mock network) implementation * **tools** contains the explorer which is a GUI front-end for Corda, and also the DemoBench which is a GUI tool that allows you to run Corda nodes locally for demonstrations -* **verifier** allows out-of-node transaction verification, allowing verification to scale horizontally * **webserver** is a servlet container for CorDapps that export HTTP endpoints. This server is an RPC client of the node \ No newline at end of file diff --git a/docs/source/design/monitoring-management/design.md b/docs/source/design/monitoring-management/design.md index 2c0515c980..f90af477e5 100644 --- a/docs/source/design/monitoring-management/design.md +++ b/docs/source/design/monitoring-management/design.md @@ -232,7 +232,7 @@ In general, the requirements outlined in this design are cross-cutting concerns * `FlowErrorAuditEvent` (unused) * `SystemAuditEvent` (unused) * Modules impacted - * All modules packaged and shipped as part of a Corda distribution (as published to Artifactory / Maven): *core, node, node-api, node-driver, finance, confidential-identities, test-common, test-utils, verifier, webserver, jackson, jfx, mock, rpc* + * All modules packaged and shipped as part of a Corda distribution (as published to Artifactory / Maven): *core, node, node-api, node-driver, finance, confidential-identities, test-common, test-utils, webserver, jackson, jfx, mock, rpc* ### Functional @@ -458,7 +458,6 @@ Corda subsystem components: | NotaryService | RaftNonValidatingNotaryService | as above | | NotaryService | BFTNonValidatingNotaryService | Logging coverage (info, debug) | | Doorman | DoormanServer (Enterprise only) | Some logging (info, warn, error), and use of `println` | -| TransactionVerifierService | OutOfProcessTransactionVerifierService (Enterprise only) | some logging (info) | | | | | Corda core flows: diff --git a/docs/source/out-of-process-verification.rst b/docs/source/out-of-process-verification.rst deleted file mode 100644 index 269fc37eaf..0000000000 --- a/docs/source/out-of-process-verification.rst +++ /dev/null @@ -1,26 +0,0 @@ -Out-of-process verification -=========================== - -A Corda node does transaction verification through ``ServiceHub.transactionVerifierService``. This is by default an -``InMemoryTransactionVerifierService`` which just verifies transactions in-process. - -Corda may be configured to use out of process verification. Any number of verifiers may be started connecting to a node -through the node's exposed artemis SSL port. The messaging layer takes care of load balancing. - -.. note:: We plan to introduce kernel level sandboxing around the out of process verifiers as an additional line of - defence in case of inner sandbox escapes. - -To configure a node to use out of process verification specify the ``verifierType`` option in your node.conf: - -.. literalinclude:: example-code/src/main/resources/example-out-of-process-verifier-node.conf - :language: cfg - -You can build a verifier jar using ``./gradlew verifier:standaloneJar``. - -And run it with ``java -jar verifier/build/libs/corda-verifier.jar ``. - -``PATH_TO_VERIFIER_BASE_DIR`` should contain a ``certificates`` folder akin to the one in a node directory, and a -``verifier.conf`` containing the following: - -.. literalinclude:: example-code/src/main/resources/example-verifier.conf - :language: cfg diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/VerifierApi.kt b/node-api/src/main/kotlin/net/corda/nodeapi/VerifierApi.kt deleted file mode 100644 index d4cae18e13..0000000000 --- a/node-api/src/main/kotlin/net/corda/nodeapi/VerifierApi.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * R3 Proprietary and Confidential - * - * Copyright (c) 2018 R3 Limited. All rights reserved. - * - * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. - * - * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. - */ - -package net.corda.nodeapi - -import net.corda.core.serialization.* -import net.corda.core.transactions.LedgerTransaction -import net.corda.core.utilities.sequence -import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.ClientMessage -import org.apache.activemq.artemis.reader.MessageUtil - -object VerifierApi { - const val VERIFIER_USERNAME = "SystemUsers/Verifier" - const val VERIFICATION_REQUESTS_QUEUE_NAME = "verifier.requests" - const val VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX = "verifier.responses" - private const val VERIFICATION_ID_FIELD_NAME = "id" - private const val RESULT_EXCEPTION_FIELD_NAME = "result-exception" - - data class VerificationRequest( - val verificationId: Long, - val transaction: LedgerTransaction, - val responseAddress: SimpleString - ) { - companion object { - fun fromClientMessage(message: ClientMessage): ObjectWithCompatibleContext { - val bytes = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } - val bytesSequence = bytes.sequence() - val (transaction, context) = bytesSequence.deserializeWithCompatibleContext() - val request = VerificationRequest( - message.getLongProperty(VERIFICATION_ID_FIELD_NAME), - transaction, - MessageUtil.getJMSReplyTo(message)) - return ObjectWithCompatibleContext(request, context) - } - } - - fun writeToClientMessage(message: ClientMessage) { - message.putLongProperty(VERIFICATION_ID_FIELD_NAME, verificationId) - message.writeBodyBufferBytes(transaction.serialize().bytes) - MessageUtil.setJMSReplyTo(message, responseAddress) - } - } - - data class VerificationResponse( - val verificationId: Long, - val exception: Throwable? - ) { - companion object { - fun fromClientMessage(message: ClientMessage): VerificationResponse { - return VerificationResponse( - message.getLongProperty(VERIFICATION_ID_FIELD_NAME), - message.getBytesProperty(RESULT_EXCEPTION_FIELD_NAME)?.deserialize() - ) - } - } - - fun writeToClientMessage(message: ClientMessage, context: SerializationContext) { - message.putLongProperty(VERIFICATION_ID_FIELD_NAME, verificationId) - if (exception != null) { - message.putBytesProperty(RESULT_EXCEPTION_FIELD_NAME, exception.serialize(context = context).bytes) - } - } - } -} \ No newline at end of file diff --git a/node-api/src/main/resources/certificates/cordadevcakeys.jks b/node-api/src/main/resources/certificates/cordadevcakeys.jks index ec72cb5081..83ab6bb01f 100644 Binary files a/node-api/src/main/resources/certificates/cordadevcakeys.jks and b/node-api/src/main/resources/certificates/cordadevcakeys.jks differ diff --git a/node-api/src/main/resources/certificates/cordatruststore.jks b/node-api/src/main/resources/certificates/cordatruststore.jks index 97b4aebe18..47528654e0 100644 Binary files a/node-api/src/main/resources/certificates/cordatruststore.jks and b/node-api/src/main/resources/certificates/cordatruststore.jks differ diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/DevCertificatesTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/DevCertificatesTest.kt new file mode 100644 index 0000000000..4fdc43dd2d --- /dev/null +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/DevCertificatesTest.kt @@ -0,0 +1,41 @@ +package net.corda.nodeapi.internal.crypto + +import net.corda.core.internal.validate +import net.corda.nodeapi.internal.DEV_CA_TRUST_STORE_FILE +import net.corda.nodeapi.internal.DEV_CA_TRUST_STORE_PASS +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import java.security.cert.TrustAnchor +import java.security.cert.X509Certificate + +class DevCertificatesTest { + private companion object { + const val OLD_DEV_KEYSTORE_PASS = "password" + const val OLD_NODE_DEV_KEYSTORE_FILE_NAME = "nodekeystore.jks" + } + + @Rule + @JvmField + val tempFolder = TemporaryFolder() + + @Test + fun `create server certificate in keystore for SSL`() { + // given + val newTrustStore = loadKeyStore(javaClass.classLoader.getResourceAsStream("certificates/$DEV_CA_TRUST_STORE_FILE"), DEV_CA_TRUST_STORE_PASS) + val newTrustRoot = newTrustStore.getX509Certificate(X509Utilities.CORDA_ROOT_CA) + val newTrustAnchor = TrustAnchor(newTrustRoot, null) + + val oldNodeCaKeyStore = loadKeyStore(javaClass.classLoader.getResourceAsStream("regression-test/$OLD_NODE_DEV_KEYSTORE_FILE_NAME"), OLD_DEV_KEYSTORE_PASS) + val oldX509Certificates = oldNodeCaKeyStore.getCertificateChain(X509Utilities.CORDA_CLIENT_CA).map { + it as X509Certificate + }.toTypedArray() + + val certPath = X509Utilities.buildCertPath(*oldX509Certificates) + + // when + certPath.validate(newTrustAnchor) + + // then no exception is thrown + } +} diff --git a/node-api/src/test/resources/regression-test/nodekeystore.jks b/node-api/src/test/resources/regression-test/nodekeystore.jks new file mode 100644 index 0000000000..948cac3577 Binary files /dev/null and b/node-api/src/test/resources/regression-test/nodekeystore.jks differ diff --git a/node-api/src/test/resources/regression-test/sslkeystore.jks b/node-api/src/test/resources/regression-test/sslkeystore.jks new file mode 100644 index 0000000000..60288cb5f7 Binary files /dev/null and b/node-api/src/test/resources/regression-test/sslkeystore.jks differ diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt index 4e709fd91c..10e9c2c6d9 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt @@ -94,7 +94,7 @@ class CertificateRevocationListNodeTests { } @Test - fun `Simple AMPQ Client to Server connection works`() { + fun `Simple AMPQ Client to Server connection works and soft fail is enabled`() { val crlCheckSoftFail = true val (amqpServer, _) = createServer(serverPort, crlCheckSoftFail = crlCheckSoftFail) amqpServer.use { @@ -126,7 +126,39 @@ class CertificateRevocationListNodeTests { } @Test - fun `AMPQ Client to Server connection fails when client's certificate is revoked`() { + fun `Simple AMPQ Client to Server connection works and soft fail is disabled`() { + val crlCheckSoftFail = false + val (amqpServer, _) = createServer(serverPort, crlCheckSoftFail = crlCheckSoftFail) + amqpServer.use { + amqpServer.start() + val receiveSubs = amqpServer.onReceive.subscribe { + assertEquals(BOB_NAME.toString(), it.sourceLegalName) + assertEquals(P2P_PREFIX + "Test", it.topic) + assertEquals("Test", String(it.payload)) + it.complete(true) + } + val (amqpClient, _) = createClient(serverPort, crlCheckSoftFail) + amqpClient.use { + val serverConnected = amqpServer.onConnection.toFuture() + val clientConnected = amqpClient.onConnection.toFuture() + amqpClient.start() + val serverConnect = serverConnected.get() + assertEquals(true, serverConnect.connected) + val clientConnect = clientConnected.get() + assertEquals(true, clientConnect.connected) + val msg = amqpClient.createMessage("Test".toByteArray(), + P2P_PREFIX + "Test", + ALICE_NAME.toString(), + emptyMap()) + amqpClient.write(msg) + assertEquals(MessageStatus.Acknowledged, msg.onComplete.get()) + receiveSubs.unsubscribe() + } + } + } + + @Test + fun `AMPQ Client to Server connection fails when client's certificate is revoked and soft fail is enabled`() { val crlCheckSoftFail = true val (amqpServer, _) = createServer(serverPort, crlCheckSoftFail = crlCheckSoftFail) amqpServer.use { @@ -146,6 +178,27 @@ class CertificateRevocationListNodeTests { } } + @Test + fun `AMPQ Client to Server connection fails when client's certificate is revoked and soft fail is disabled`() { + val crlCheckSoftFail = false + val (amqpServer, _) = createServer(serverPort, crlCheckSoftFail = crlCheckSoftFail) + amqpServer.use { + amqpServer.start() + amqpServer.onReceive.subscribe { + it.complete(true) + } + val (amqpClient, clientCert) = createClient(serverPort, crlCheckSoftFail) + revokedNodeCerts.add(clientCert.serialNumber) + amqpClient.use { + val serverConnected = amqpServer.onConnection.toFuture() + amqpClient.onConnection.toFuture() + amqpClient.start() + val serverConnect = serverConnected.get() + assertEquals(false, serverConnect.connected) + } + } + } + @Test fun `AMPQ Client to Server connection fails when servers's certificate is revoked`() { val crlCheckSoftFail = true diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index c3b271e4e9..ea0b5be7c1 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -54,7 +54,6 @@ import net.corda.node.services.messaging.InternalRPCMessagingClient import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.P2PMessagingClient import net.corda.node.services.messaging.RPCServerConfiguration -import net.corda.node.services.messaging.VerifierMessagingClient import net.corda.node.services.rpc.ArtemisRpcBroker import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.utilities.AddressUtils @@ -138,10 +137,7 @@ open class Node(configuration: NodeConfiguration, } override val log: Logger get() = staticLog - override fun makeTransactionVerifierService(): TransactionVerifierService = when (configuration.verifierType) { - VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //verifierMessagingClient!!.verifierService - VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4) - } + override fun makeTransactionVerifierService(): TransactionVerifierService = InMemoryTransactionVerifierService(numberOfWorkers = 4) private val sameVmNodeNumber = sameVmNodeCounter.incrementAndGet() // Under normal (non-test execution) it will always be "1" @@ -232,10 +228,6 @@ open class Node(configuration: NodeConfiguration, printBasicNodeInfo("RPC connection address", it.primary.toString()) printBasicNodeInfo("RPC admin connection address", it.admin.toString()) } - verifierMessagingClient = when (configuration.verifierType) { - VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE) - VerifierType.InMemory -> null - } require(info.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" } val serviceIdentity: PublicKey? = if (info.legalIdentities.size == 1) null else info.legalIdentities[1].owningKey return P2PMessagingClient( @@ -337,10 +329,6 @@ open class Node(configuration: NodeConfiguration, runOnStop += this::close init(rpcOps, securityManager) } - verifierMessagingClient?.run { - runOnStop += this::stop - start() - } (network as P2PMessagingClient).apply { runOnStop += this::stop start() @@ -443,12 +431,10 @@ open class Node(configuration: NodeConfiguration, } private var internalRpcMessagingClient: InternalRPCMessagingClient? = null - private var verifierMessagingClient: VerifierMessagingClient? = null /** Starts a blocking event loop for message dispatch. */ fun run() { internalRpcMessagingClient?.start(rpcBroker!!.serverControl) - verifierMessagingClient?.start2() (network as P2PMessagingClient).run() } diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt index ce9f261a4f..317df6a66c 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt @@ -184,7 +184,7 @@ class CordappLoader private constructor(private val cordappJarPaths: List - listOf("main", "production/classes").none { url.toString().contains("$it/$resource") } || listOf("net.corda.core", "net.corda.node", "net.corda.finance").none { scanPackage.startsWith(it) } + !url.toString().contains("main/$resource") || listOf("net.corda.core", "net.corda.node", "net.corda.finance").none { scanPackage.startsWith(it) } } .map { url -> if (url.protocol == "jar") { diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 98f5a6fb79..622976d72a 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -455,8 +455,7 @@ data class NodeH2Settings( ) enum class VerifierType { - InMemory, - OutOfProcess + InMemory } enum class CertChainPolicyType { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/VerifierMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/VerifierMessagingClient.kt deleted file mode 100644 index 1c89262841..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/messaging/VerifierMessagingClient.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * R3 Proprietary and Confidential - * - * Copyright (c) 2018 R3 Limited. All rights reserved. - * - * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. - * - * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. - */ - -package net.corda.node.services.messaging - -import com.codahale.metrics.MetricRegistry -import net.corda.core.crypto.random63BitValue -import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.transactions.LedgerTransaction -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.loggerFor -import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService -import net.corda.node.utilities.AffinityExecutor -import net.corda.nodeapi.VerifierApi -import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME -import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingClient -import net.corda.nodeapi.internal.config.SSLConfiguration -import org.apache.activemq.artemis.api.core.RoutingType -import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.ClientConsumer -import java.util.concurrent.TimeUnit - -class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHostAndPort, metrics: MetricRegistry, private val maxMessageSize: Int) : SingletonSerializeAsToken() { - companion object { - private val log = loggerFor() - private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}" - } - - private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize) - /** An executor for sending messages */ - private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1) - private var verificationResponseConsumer: ClientConsumer? = null - fun start(): Unit = synchronized(this) { - val session = artemis.start().session - fun checkVerifierCount() { - if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) { - log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!") - } - } - - // Attempts to create a durable queue on the broker which is bound to an address of the same name. - fun createQueueIfAbsent(queueName: String) { - val queueQuery = session.queueQuery(SimpleString(queueName)) - if (!queueQuery.isExists) { - log.info("Create fresh queue $queueName bound on same address") - session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) - } - } - createQueueIfAbsent(VERIFICATION_REQUESTS_QUEUE_NAME) - createQueueIfAbsent(verifierResponseAddress) - verificationResponseConsumer = session.createConsumer(verifierResponseAddress) - messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS) - } - - fun start2() = synchronized(this) { - verifierService.start(verificationResponseConsumer!!) - } - - fun stop() = synchronized(this) { - artemis.stop() - } - - internal val verifierService = OutOfProcessTransactionVerifierService(metrics) { nonce, transaction -> - messagingExecutor.fetchFrom { - sendRequest(nonce, transaction) - } - } - - private fun sendRequest(nonce: Long, transaction: LedgerTransaction) = synchronized(this) { - val started = artemis.started!! - val message = started.session.createMessage(false) - val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress)) - request.writeToClientMessage(message) - started.producer.send(VERIFICATION_REQUESTS_QUEUE_NAME, message) - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt deleted file mode 100644 index 8d8834d121..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * R3 Proprietary and Confidential - * - * Copyright (c) 2018 R3 Limited. All rights reserved. - * - * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. - * - * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. - */ - -package net.corda.node.services.transactions - -import com.codahale.metrics.Gauge -import com.codahale.metrics.MetricRegistry -import com.codahale.metrics.Timer -import net.corda.core.concurrent.CordaFuture -import net.corda.core.crypto.SecureHash -import net.corda.core.node.services.TransactionVerifierService -import net.corda.core.crypto.random63BitValue -import net.corda.core.internal.concurrent.OpenFuture -import net.corda.core.internal.concurrent.openFuture -import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.transactions.LedgerTransaction -import net.corda.core.utilities.contextLogger -import net.corda.nodeapi.VerifierApi -import org.apache.activemq.artemis.api.core.client.ClientConsumer -import java.util.concurrent.ConcurrentHashMap - -class OutOfProcessTransactionVerifierService( - private val metrics: MetricRegistry, - private val sendRequest: (Long, LedgerTransaction) -> Unit -) : SingletonSerializeAsToken(), TransactionVerifierService { - companion object { - private val log = contextLogger() - } - - private data class VerificationHandle( - val transactionId: SecureHash, - val resultFuture: OpenFuture, - val durationTimerContext: Timer.Context - ) - - private val verificationHandles = ConcurrentHashMap() - - // Metrics - private fun metric(name: String) = "OutOfProcessTransactionVerifierService.$name" - - private val durationTimer = metrics.timer(metric("Verification.Duration")) - private val successMeter = metrics.meter(metric("Verification.Success")) - private val failureMeter = metrics.meter(metric("Verification.Failure")) - - class VerificationResultForUnknownTransaction(nonce: Long) : - Exception("Verification result arrived for unknown transaction nonce $nonce") - - fun start(responseConsumer: ClientConsumer) { - log.info("Starting out of process verification service") - metrics.register(metric("VerificationsInFlight"), Gauge { verificationHandles.size }) - responseConsumer.setMessageHandler { message -> - val response = VerifierApi.VerificationResponse.fromClientMessage(message) - val handle = verificationHandles.remove(response.verificationId) ?: throw VerificationResultForUnknownTransaction(response.verificationId) - handle.durationTimerContext.stop() - val exception = response.exception - if (exception == null) { - successMeter.mark() - handle.resultFuture.set(Unit) - } else { - failureMeter.mark() - handle.resultFuture.setException(exception) - } - } - } - - override fun verify(transaction: LedgerTransaction): CordaFuture<*> { - log.info("Verifying ${transaction.id}") - val future = openFuture() - val nonce = random63BitValue() - verificationHandles[nonce] = VerificationHandle(transaction.id, future, durationTimer.time()) - sendRequest(nonce, transaction) - return future - } -} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index cf466a59d8..a5558936fd 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -12,21 +12,50 @@ package net.corda.node.services.vault import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand -import net.corda.core.contracts.* +import net.corda.core.contracts.Amount +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.FungibleAsset +import net.corda.core.contracts.OwnableState +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef import net.corda.core.crypto.SecureHash import net.corda.core.internal.* import net.corda.core.messaging.DataFeed import net.corda.core.node.ServicesForResolution import net.corda.core.node.StatesToRecord -import net.corda.core.node.services.* -import net.corda.core.node.services.vault.* +import net.corda.core.node.services.KeyManagementService +import net.corda.core.node.services.StatesNotAvailableException +import net.corda.core.node.services.Vault +import net.corda.core.node.services.VaultQueryException +import net.corda.core.node.services.queryBy +import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM +import net.corda.core.node.services.vault.DEFAULT_PAGE_SIZE +import net.corda.core.node.services.vault.MAX_PAGE_SIZE +import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.node.services.vault.Sort +import net.corda.core.node.services.vault.SortAttribute +import net.corda.core.node.services.vault.builder import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.transactions.* -import net.corda.core.utilities.* +import net.corda.core.transactions.ContractUpgradeWireTransaction +import net.corda.core.transactions.CoreTransaction +import net.corda.core.transactions.FullTransaction +import net.corda.core.transactions.NotaryChangeWireTransaction +import net.corda.core.transactions.WireTransaction +import net.corda.core.utilities.NonEmptySet +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug +import net.corda.core.utilities.toHexString +import net.corda.core.utilities.toNonEmptySet +import net.corda.core.utilities.trace import net.corda.node.services.api.VaultServiceInternal import net.corda.node.services.statemachine.FlowStateMachineImpl -import net.corda.nodeapi.internal.persistence.* +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.HibernateConfiguration +import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit +import net.corda.nodeapi.internal.persistence.currentDBSession +import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import org.hibernate.Session import rx.Observable import rx.subjects.PublishSubject @@ -142,7 +171,8 @@ class NodeVaultService( val ourNewStates = when (statesToRecord) { StatesToRecord.NONE -> throw AssertionError("Should not reach here") StatesToRecord.ONLY_RELEVANT -> tx.outputs.withIndex().filter { - isRelevant(it.value.data, keyManagementService.filterMyKeys(tx.outputs.flatMap { it.data.participants.map { it.owningKey } }).toSet()) } + isRelevant(it.value.data, keyManagementService.filterMyKeys(tx.outputs.flatMap { it.data.participants.map { it.owningKey } }).toSet()) + } StatesToRecord.ALL_VISIBLE -> tx.outputs.withIndex() }.map { tx.outRef(it.index) } @@ -168,16 +198,13 @@ class NodeVaultService( else -> throw IllegalArgumentException("Unsupported transaction type: ${tx.javaClass.name}") } val myKeys by lazy { keyManagementService.filterMyKeys(ltx.outputs.flatMap { it.data.participants.map { it.owningKey } }) } - val (consumedStateAndRefs, producedStates) = ltx.inputs. - zip(ltx.outputs). - filter { (_, output) -> - if (statesToRecord == StatesToRecord.ONLY_RELEVANT) { - isRelevant(output.data, myKeys.toSet()) - } else { - true - } - }. - unzip() + val (consumedStateAndRefs, producedStates) = ltx.inputs.zip(ltx.outputs).filter { (_, output) -> + if (statesToRecord == StatesToRecord.ONLY_RELEVANT) { + isRelevant(output.data, myKeys.toSet()) + } else { + true + } + }.unzip() val producedStateAndRefs = producedStates.map { ltx.outRef(it.data) } if (consumedStateAndRefs.isEmpty() && producedStateAndRefs.isEmpty()) { @@ -403,7 +430,7 @@ class NodeVaultService( if (!seen) { val contractInterfaces = deriveContractInterfaces(concreteType) contractInterfaces.map { - val contractInterface = contractStateTypeMappings.getOrPut(it.name, { mutableSetOf() }) + val contractInterface = contractStateTypeMappings.getOrPut(it.name) { mutableSetOf() } contractInterface.add(concreteType.name) } } @@ -471,7 +498,7 @@ class NodeVaultService( if (!paging.isDefault && index == paging.pageSize) // skip last result if paged return@forEachIndexed val vaultState = result[0] as VaultSchemaV1.VaultStates - val stateRef = StateRef(SecureHash.parse(vaultState.stateRef!!.txId!!), vaultState.stateRef!!.index!!) + val stateRef = StateRef(SecureHash.parse(vaultState.stateRef!!.txId), vaultState.stateRef!!.index) stateRefs.add(stateRef) statesMeta.add(Vault.StateMetadata(stateRef, vaultState.contractStateClassName, @@ -529,13 +556,24 @@ class NodeVaultService( val distinctTypes = results.map { it } val contractInterfaceToConcreteTypes = mutableMapOf>() + val unknownTypes = mutableSetOf() distinctTypes.forEach { type -> - val concreteType: Class = uncheckedCast(Class.forName(type)) - val contractInterfaces = deriveContractInterfaces(concreteType) - contractInterfaces.map { - val contractInterface = contractInterfaceToConcreteTypes.getOrPut(it.name, { mutableSetOf() }) - contractInterface.add(concreteType.name) + val concreteType: Class? = try { + uncheckedCast(Class.forName(type)) + } catch (e: ClassNotFoundException) { + unknownTypes += type + null } + concreteType?.let { + val contractInterfaces = deriveContractInterfaces(it) + contractInterfaces.map { + val contractInterface = contractInterfaceToConcreteTypes.getOrPut(it.name) { mutableSetOf() } + contractInterface.add(it.name) + } + } + } + if (unknownTypes.isNotEmpty()) { + log.warn("There are unknown contract state types in the vault, which will prevent these states from being used. The relevant CorDapps must be loaded for these states to be used. The types not on the classpath are ${unknownTypes.joinToString(", ", "[", "]")}.") } return contractInterfaceToConcreteTypes } diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt index 969cde4a8e..ab26f595c2 100644 --- a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt @@ -25,12 +25,10 @@ import net.corda.core.flows.FlowException import net.corda.core.identity.AbstractParty import net.corda.core.identity.CordaX500Name import net.corda.core.internal.AbstractAttachment -import net.corda.core.internal.x500Name import net.corda.core.serialization.* import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.OpaqueBytes import net.corda.node.serialization.amqp.AMQPServerSerializationScheme -import net.corda.nodeapi.internal.DEV_INTERMEDIATE_CA import net.corda.nodeapi.internal.crypto.ContentSignerBuilder import net.corda.serialization.internal.* import net.corda.serialization.internal.amqp.SerializerFactory.Companion.isPrimitive @@ -45,6 +43,7 @@ import org.apache.qpid.proton.amqp.* import org.apache.qpid.proton.codec.DecoderImpl import org.apache.qpid.proton.codec.EncoderImpl import org.assertj.core.api.Assertions.* +import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.cert.X509v2CRLBuilder import org.bouncycastle.cert.jcajce.JcaX509CRLConverter import org.bouncycastle.jce.provider.BouncyCastleProvider @@ -667,8 +666,8 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi val scheme = AMQPServerSerializationScheme(emptyList()) val func = scheme::class.superclasses.single { it.simpleName == "AbstractAMQPSerializationScheme" } .java.getDeclaredMethod("registerCustomSerializers", - SerializationContext::class.java, - SerializerFactory::class.java) + SerializationContext::class.java, + SerializerFactory::class.java) func.isAccessible = true val factory = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader()) @@ -1021,7 +1020,7 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi } private fun emptyCrl(): X509CRL { - val builder = X509v2CRLBuilder(CordaX500Name.build(DEV_INTERMEDIATE_CA.certificate.issuerX500Principal).x500Name, Date()) + val builder = X509v2CRLBuilder(X500Name("CN=Corda Root CA, O=R3 HoldCo LLC, L=New York, C=US"), Date()) val provider = BouncyCastleProvider() val crlHolder = builder.build(ContentSignerBuilder.build(Crypto.RSA_SHA256, Crypto.generateKeyPair(Crypto.RSA_SHA256).private, provider)) return JcaX509CRLConverter().setProvider(provider).getCRL(crlHolder) @@ -1330,12 +1329,12 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi } interface DataClassByInterface { - val v : V + val v: V } @Test fun dataClassBy() { - data class C (val s: String) : DataClassByInterface { + data class C(val s: String) : DataClassByInterface { override val v: String = "-- $s" } @@ -1349,8 +1348,8 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi try { val i2 = DeserializationInput(testDefaultFactory()).deserialize(bytes) - } catch (e : NotSerializableException) { - throw Error ("Deserializing serialized \$C should not throw") + } catch (e: NotSerializableException) { + throw Error("Deserializing serialized \$C should not throw") } } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt index 79edc85bbe..7e4553b422 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt @@ -21,8 +21,7 @@ import net.corda.testing.node.NotarySpec import java.nio.file.Path enum class VerifierType { - InMemory, - OutOfProcess + InMemory } /**