From 48952dfc022be9f64fa85173460a1248818fdcf2 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Wed, 8 Mar 2017 17:21:43 +0000 Subject: [PATCH] Add node-api, split minimal node functionality, OutOfProcessTransactionVerifierService --- .../net/corda/client/rpc/CordaRPCClient.kt | 2 +- core/src/main/kotlin/net/corda/core/Utils.kt | 3 +- .../kotlin/net/corda/core/node/ServiceHub.kt | 1 + .../net/corda/core/node/services/Services.kt | 12 + .../core/transactions/WireTransaction.kt | 28 +- .../corda/flows/ResolveTransactionsFlow.kt | 5 +- .../example-out-of-process-verifier-node.conf | 9 + .../src/main/resources/example-verifier.conf | 3 + .../net/corda/docs/ExampleConfigTest.kt | 52 +++ .../net/corda/docs/ExampleNodeConfTest.kt | 34 -- docs/source/index.rst | 1 + docs/source/out-of-process-verification.rst | 26 ++ .../nodeapi/ArtemisMessagingComponent.kt | 2 +- .../kotlin/net/corda/nodeapi/VerifierApi.kt | 59 ++++ .../services/messaging/MQSecurityTest.kt | 4 +- .../kotlin/net/corda/node/driver/Driver.kt | 27 +- .../net/corda/node/internal/AbstractNode.kt | 5 + .../kotlin/net/corda/node/internal/Node.kt | 4 +- .../node/services/config/ConfigUtilities.kt | 4 +- .../node/services/config/NodeConfiguration.kt | 31 +- .../messaging/ArtemisMessagingServer.kt | 176 ++++++++--- .../services/messaging/NodeMessagingClient.kt | 56 +++- .../InMemoryTransactionVerifierService.kt | 18 ++ .../OutOfProcessTransactionVerifierService.kt | 70 ++++ .../corda/node/utilities/AffinityExecutor.kt | 9 +- node/src/main/resources/reference.conf | 3 +- .../node/services/MockServiceHubInternal.kt | 6 +- .../messaging/ArtemisMessagingTests.kt | 5 +- .../node/utilities/AffinityExecutorTests.kt | 14 - .../corda/attachmentdemo/AttachmentDemo.kt | 2 +- .../kotlin/net/corda/notarydemo/NotaryDemo.kt | 2 +- settings.gradle | 1 + test-utils/build.gradle | 1 + .../kotlin/net/corda/testing/CoreTestUtils.kt | 6 +- .../corda/testing/messaging/SimpleMQClient.kt | 2 +- .../kotlin/net/corda/testing/node/MockNode.kt | 3 + .../net/corda/testing/node/MockServices.kt | 2 + .../net/corda/testing/node/SimpleNode.kt | 6 +- .../net/corda/loadtest/ConnectionManager.kt | 10 +- .../kotlin/net/corda/loadtest/LoadTest.kt | 2 +- verifier/build.gradle | 90 ++++++ .../net/corda/verifier/GeneratedLedger.kt | 230 ++++++++++++++ .../net/corda/verifier/VerifierDriver.kt | 299 ++++++++++++++++++ .../net/corda/verifier/VerifierTests.kt | 125 ++++++++ .../kotlin/net/corda/verifier/Verifier.kt | 81 +++++ .../main/resources/verifier-reference.conf | 3 + 46 files changed, 1390 insertions(+), 144 deletions(-) create mode 100644 docs/source/example-code/src/main/resources/example-out-of-process-verifier-node.conf create mode 100644 docs/source/example-code/src/main/resources/example-verifier.conf create mode 100644 docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleConfigTest.kt delete mode 100644 docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleNodeConfTest.kt create mode 100644 docs/source/out-of-process-verification.rst create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/VerifierApi.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/transactions/InMemoryTransactionVerifierService.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt create mode 100644 verifier/build.gradle create mode 100644 verifier/src/integration-test/kotlin/net/corda/verifier/GeneratedLedger.kt create mode 100644 verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt create mode 100644 verifier/src/integration-test/kotlin/net/corda/verifier/VerifierTests.kt create mode 100644 verifier/src/main/kotlin/net/corda/verifier/Verifier.kt create mode 100644 verifier/src/main/resources/verifier-reference.conf diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 15c290400f..aebf94b5cd 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -1,7 +1,6 @@ package net.corda.client.rpc import com.google.common.net.HostAndPort -import net.corda.nodeapi.config.SSLConfiguration import net.corda.core.ThreadBox import net.corda.core.logElapsedTime import net.corda.core.messaging.CordaRPCOps @@ -12,6 +11,7 @@ import net.corda.nodeapi.ArtemisMessagingComponent import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.RPCException +import net.corda.nodeapi.config.SSLConfiguration import net.corda.nodeapi.rpcLog import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.client.ActiveMQClient diff --git a/core/src/main/kotlin/net/corda/core/Utils.kt b/core/src/main/kotlin/net/corda/core/Utils.kt index ca39b64fcd..f3c1a01899 100644 --- a/core/src/main/kotlin/net/corda/core/Utils.kt +++ b/core/src/main/kotlin/net/corda/core/Utils.kt @@ -102,7 +102,8 @@ fun ListenableFuture.failure(executor: Executor, body: (Throwable) -> Uni infix fun ListenableFuture.then(body: () -> Unit): ListenableFuture = apply { then(RunOnCallerThread, body) } infix fun ListenableFuture.success(body: (T) -> Unit): ListenableFuture = apply { success(RunOnCallerThread, body) } infix fun ListenableFuture.failure(body: (Throwable) -> Unit): ListenableFuture = apply { failure(RunOnCallerThread, body) } -infix fun ListenableFuture.map(mapper: (F) -> T): ListenableFuture = Futures.transform(this, Function { mapper(it!!) }) +@Suppress("UNCHECKED_CAST") // We need the awkward cast because otherwise F cannot be nullable, even though it's safe. +infix fun ListenableFuture.map(mapper: (F) -> T): ListenableFuture = Futures.transform(this, Function { (mapper as (F?) -> T)(it) }) infix fun ListenableFuture.flatMap(mapper: (F) -> ListenableFuture): ListenableFuture = Futures.transformAsync(this) { mapper(it!!) } /** Executes the given block and sets the future to either the result, or any exception that was thrown. */ inline fun SettableFuture.catch(block: () -> T) { diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index 37c71d5b88..1c7f5ddb18 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -43,6 +43,7 @@ interface ServiceHub : ServicesForResolution { override val storageService: StorageService val networkMapCache: NetworkMapCache val schedulerService: SchedulerService + val transactionVerifierService: TransactionVerifierService val clock: Clock val myInfo: NodeInfo diff --git a/core/src/main/kotlin/net/corda/core/node/services/Services.kt b/core/src/main/kotlin/net/corda/core/node/services/Services.kt index 2336c0c68a..39dbc5e71a 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/Services.kt @@ -7,6 +7,7 @@ import net.corda.core.crypto.* import net.corda.core.flows.FlowException import net.corda.core.serialization.CordaSerializable import net.corda.core.toFuture +import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.WireTransaction import rx.Observable @@ -366,3 +367,14 @@ interface SchedulerService { /** Unschedule all activity for a TX output, probably because it was consumed. */ fun unscheduleStateActivity(ref: StateRef) } + +/** + * Provides verification service. The implementation may be a simple in-memory verify() call or perhaps an IPC/RPC. + */ +interface TransactionVerifierService { + /** + * @param transaction The transaction to be verified. + * @return A future that completes successfully if the transaction verified, or sets an exception the verifier threw. + */ + fun verify(transaction: LedgerTransaction): ListenableFuture<*> +} diff --git a/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt b/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt index ef1f82d0e2..3166205446 100644 --- a/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt +++ b/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt @@ -72,16 +72,36 @@ class WireTransaction( */ @Throws(AttachmentResolutionException::class, TransactionResolutionException::class) fun toLedgerTransaction(services: ServicesForResolution): LedgerTransaction { + return toLedgerTransaction( + resolveIdentity = { services.identityService.partyFromKey(it) }, + resolveAttachment = { services.storageService.attachments.openAttachment(it) }, + resolveStateRef = { services.loadState(it) } + ) + } + + /** + * Looks up identities, attachments and dependent input states using the provided lookup functions in order to + * construct a [LedgerTransaction]. Note that identity lookup failure does *not* cause an exception to be thrown. + * + * @throws AttachmentResolutionException if a required attachment was not found using [resolveAttachment]. + * @throws TransactionResolutionException if an input was not found not using [resolveStateRef]. + */ + @Throws(AttachmentResolutionException::class, TransactionResolutionException::class) + fun toLedgerTransaction( + resolveIdentity: (CompositeKey) -> Party?, + resolveAttachment: (SecureHash) -> Attachment?, + resolveStateRef: (StateRef) -> TransactionState<*>? + ): LedgerTransaction { // Look up public keys to authenticated identities. This is just a stub placeholder and will all change in future. val authenticatedArgs = commands.map { - val parties = it.signers.mapNotNull { pk -> services.identityService.partyFromKey(pk) } + val parties = it.signers.mapNotNull { pk -> resolveIdentity(pk) } AuthenticatedObject(it.signers, parties, it.value) } // Open attachments specified in this transaction. If we haven't downloaded them, we fail. - val attachments = attachments.map { - services.storageService.attachments.openAttachment(it) ?: throw AttachmentResolutionException(it) + val attachments = attachments.map { resolveAttachment(it) ?: throw AttachmentResolutionException(it) } + val resolvedInputs = inputs.map { ref -> + resolveStateRef(ref)?.let { StateAndRef(it, ref) } ?: throw TransactionResolutionException(ref.txhash) } - val resolvedInputs = inputs.map { StateAndRef(services.loadState(it), it) } return LedgerTransaction(resolvedInputs, outputs, authenticatedArgs, attachments, id, notary, mustSign, timestamp, type) } diff --git a/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt b/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt index 969f3c18f9..81cdbb1902 100644 --- a/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt @@ -5,6 +5,7 @@ import net.corda.core.checkedAdd import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic +import net.corda.core.getOrThrow import net.corda.core.node.recordTransactions import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.LedgerTransaction @@ -107,7 +108,9 @@ class ResolveTransactionsFlow(private val txHashes: Set, for (stx in newTxns) { // Resolve to a LedgerTransaction and then run all contracts. val ltx = stx.toLedgerTransaction(serviceHub) - ltx.verify() + // Block on each verification request. + // TODO We could recover some parallelism from the dependency graph. + serviceHub.transactionVerifierService.verify(ltx).getOrThrow() serviceHub.recordTransactions(stx) result += ltx } diff --git a/docs/source/example-code/src/main/resources/example-out-of-process-verifier-node.conf b/docs/source/example-code/src/main/resources/example-out-of-process-verifier-node.conf new file mode 100644 index 0000000000..a98bb3d3e7 --- /dev/null +++ b/docs/source/example-code/src/main/resources/example-out-of-process-verifier-node.conf @@ -0,0 +1,9 @@ +myLegalName : "Bank A" +nearestCity : "London" +p2pAddress : "my-corda-node:10002" +webAddress : "localhost:10003" +networkMapService : { + address : "my-network-map:10000" + legalName : "Network Map Service" +} +verifierType: "OutOfProcess" diff --git a/docs/source/example-code/src/main/resources/example-verifier.conf b/docs/source/example-code/src/main/resources/example-verifier.conf new file mode 100644 index 0000000000..2799cb60de --- /dev/null +++ b/docs/source/example-code/src/main/resources/example-verifier.conf @@ -0,0 +1,3 @@ +nodeHostAndPort: "my-corda-node:10002" +keyStorePassword : "cordacadevpass" +trustStorePassword : "trustpass" \ No newline at end of file diff --git a/docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleConfigTest.kt b/docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleConfigTest.kt new file mode 100644 index 0000000000..33b0698ecd --- /dev/null +++ b/docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleConfigTest.kt @@ -0,0 +1,52 @@ +package net.corda.docs + +import net.corda.node.services.config.ConfigHelper +import net.corda.node.services.config.FullNodeConfiguration +import net.corda.verifier.Verifier +import org.junit.Test +import java.nio.file.Path +import java.nio.file.Paths +import kotlin.reflect.declaredMemberProperties + +class ExampleConfigTest { + + private fun readAndCheckConfigurations(vararg configFilenames: String, loadConfig: (Path) -> A) { + configFilenames.forEach { + println("Checking $it") + val configFileResource = ExampleConfigTest::class.java.classLoader.getResource(it) + val config = loadConfig(Paths.get(configFileResource.toURI())) + // Force the config fields as they are resolved lazily + config.javaClass.kotlin.declaredMemberProperties.forEach { member -> + member.get(config) + } + } + } + + @Test + fun `example node_confs parses fine`() { + readAndCheckConfigurations( + "example-node.conf", + "example-out-of-process-verifier-node.conf", + "example-network-map-node.conf" + ) { + val baseDirectory = Paths.get("some-example-base-dir") + FullNodeConfiguration( + baseDirectory, + ConfigHelper.loadConfig( + baseDirectory = baseDirectory, + configFile = it + ) + ) + } + } + + @Test + fun `example verifier_conf parses fine`() { + readAndCheckConfigurations( + "example-verifier.conf" + ) { + val baseDirectory = Paths.get("some-example-base-dir") + Verifier.loadConfiguration(baseDirectory, it) + } + } +} \ No newline at end of file diff --git a/docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleNodeConfTest.kt b/docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleNodeConfTest.kt deleted file mode 100644 index 60024966a3..0000000000 --- a/docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleNodeConfTest.kt +++ /dev/null @@ -1,34 +0,0 @@ -package net.corda.docs - -import net.corda.node.services.config.ConfigHelper -import net.corda.node.services.config.FullNodeConfiguration -import org.junit.Test -import java.nio.file.Paths -import kotlin.reflect.declaredMemberProperties - -class ExampleNodeConfTest { - @Test - fun exampleNodeConfParsesFine() { - val exampleNodeConfFilenames = arrayOf( - "example-node.conf", - "example-network-map-node.conf" - ) - - exampleNodeConfFilenames.forEach { - println("Checking $it") - val configResource = ExampleNodeConfTest::class.java.classLoader.getResource(it) - val baseDirectory = Paths.get("some-example-base-dir") - val nodeConfig = FullNodeConfiguration( - baseDirectory, - ConfigHelper.loadConfig( - baseDirectory = baseDirectory, - configFile = Paths.get(configResource.toURI()) - ) - ) - // Force the config fields as they are resolved lazily - nodeConfig.javaClass.kotlin.declaredMemberProperties.forEach { member -> - member.get(nodeConfig) - } - } - } -} \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index 7f212afc77..e564df6d1e 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -134,6 +134,7 @@ Documentation Contents: further-notes-on-kotlin publishing-corda azure-vm + out-of-process-verification .. toctree:: :maxdepth: 2 diff --git a/docs/source/out-of-process-verification.rst b/docs/source/out-of-process-verification.rst new file mode 100644 index 0000000000..90e3f6befc --- /dev/null +++ b/docs/source/out-of-process-verification.rst @@ -0,0 +1,26 @@ +Out of process verification +=========================== + +A Corda node does transaction verification through ``ServiceHub.transactionVerifierService``. This is by default an +``InMemoryTransactionVerifierService`` which just verifies transactions in-process. + +Corda may be configured to use out of process verification. Any number of verifiers may be started connecting to a node +through the node's exposed artemis SSL port. The messaging layer takes care of load balancing. + +.. note:: We plan to introduce kernel level sandboxing around the out of process verifiers as an additional line of + defence in case of inner sandbox escapes. + +To configure a node to use out of process verification specify the ``verifierType`` option in your node.conf: + +.. literalinclude:: example-code/src/main/resources/example-out-of-process-verifier-node.conf + :language: cfg + +You can build a verifier jar using ``./gradlew verifier:standaloneJar``. + +And run it with ``java -jar verifier/build/libs/corda-verifier.jar ``. + +``PATH_TO_VERIFIER_BASE_DIR`` should contain a ``certificates`` folder akin to the one in a node directory, and a +``verifier.conf`` containing the following: + +.. literalinclude:: example-code/src/main/resources/example-verifier.conf + :language: cfg diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisMessagingComponent.kt b/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisMessagingComponent.kt index 77a5b1c087..900f8b0ac2 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisMessagingComponent.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisMessagingComponent.kt @@ -2,7 +2,6 @@ package net.corda.nodeapi import com.google.common.annotations.VisibleForTesting import com.google.common.net.HostAndPort -import net.corda.nodeapi.config.SSLConfiguration import net.corda.core.crypto.CompositeKey import net.corda.core.messaging.MessageRecipientGroup import net.corda.core.messaging.MessageRecipients @@ -10,6 +9,7 @@ import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.read import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.nodeapi.config.SSLConfiguration import java.security.KeyStore /** diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/VerifierApi.kt b/node-api/src/main/kotlin/net/corda/nodeapi/VerifierApi.kt new file mode 100644 index 0000000000..4fa1526536 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/VerifierApi.kt @@ -0,0 +1,59 @@ +package net.corda.nodeapi + +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.transactions.LedgerTransaction +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.reader.MessageUtil + +object VerifierApi { + val VERIFIER_USERNAME = "SystemUsers/Verifier" + val VERIFICATION_REQUESTS_QUEUE_NAME = "verifier.requests" + val VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX = "verifier.responses" + private val VERIFICATION_ID_FIELD_NAME = "id" + private val RESULT_EXCEPTION_FIELD_NAME = "result-exception" + + data class VerificationRequest( + val verificationId: Long, + val transaction: LedgerTransaction, + val responseAddress: SimpleString + ) { + companion object { + fun fromClientMessage(message: ClientMessage): VerificationRequest { + return VerificationRequest( + message.getLongProperty(VERIFICATION_ID_FIELD_NAME), + ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }.deserialize(), + MessageUtil.getJMSReplyTo(message) + ) + } + } + + fun writeToClientMessage(message: ClientMessage) { + message.putLongProperty(VERIFICATION_ID_FIELD_NAME, verificationId) + message.writeBodyBufferBytes(transaction.serialize().bytes) + MessageUtil.setJMSReplyTo(message, responseAddress) + } + } + + data class VerificationResponse( + val verificationId: Long, + val exception: Throwable? + ) { + companion object { + fun fromClientMessage(message: ClientMessage): VerificationResponse { + return VerificationResponse( + message.getLongProperty(VERIFICATION_ID_FIELD_NAME), + message.getBytesProperty(RESULT_EXCEPTION_FIELD_NAME)?.deserialize() + ) + } + } + + fun writeToClientMessage(message: ClientMessage) { + message.putLongProperty(VERIFICATION_ID_FIELD_NAME, verificationId) + if (exception != null) { + message.putBytesProperty(RESULT_EXCEPTION_FIELD_NAME, exception.serialize().bytes) + } + } + } +} diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index 2301c42271..d9b74d0894 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -3,7 +3,6 @@ package net.corda.services.messaging import co.paralleluniverse.fibers.Suspendable import com.google.common.net.HostAndPort import net.corda.client.rpc.CordaRPCClientImpl -import net.corda.nodeapi.config.SSLConfiguration import net.corda.core.crypto.Party import net.corda.core.crypto.composite import net.corda.core.crypto.generateKeyPair @@ -23,6 +22,7 @@ import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE import net.corda.nodeapi.User +import net.corda.nodeapi.config.SSLConfiguration import net.corda.testing.configureTestSSL import net.corda.testing.messaging.SimpleMQClient import net.corda.testing.node.NodeBasedTest @@ -82,7 +82,7 @@ abstract class MQSecurityTest : NodeBasedTest() { } @Test - fun `create queue for peer which has not been communciated with`() { + fun `create queue for peer which has not been communicated with`() { val bob = startNode("Bob").getOrThrow() assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.legalIdentity.owningKey.toBase58String()}") } diff --git a/node/src/main/kotlin/net/corda/node/driver/Driver.kt b/node/src/main/kotlin/net/corda/node/driver/Driver.kt index f5641b937a..aa5df87d5e 100644 --- a/node/src/main/kotlin/net/corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/net/corda/node/driver/Driver.kt @@ -5,8 +5,8 @@ import com.google.common.net.HostAndPort import com.google.common.util.concurrent.* import com.typesafe.config.Config import com.typesafe.config.ConfigRenderOptions -import net.corda.core.ThreadBox import net.corda.client.rpc.CordaRPCClient +import net.corda.core.ThreadBox import net.corda.core.crypto.Party import net.corda.core.div import net.corda.core.flatMap @@ -19,7 +19,7 @@ import net.corda.core.utilities.loggerFor import net.corda.node.LOGS_DIRECTORY_NAME import net.corda.node.services.config.ConfigHelper import net.corda.node.services.config.FullNodeConfiguration -import net.corda.node.services.messaging.NodeMessagingClient +import net.corda.node.services.config.VerifierType import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.utilities.ServiceIdentityGenerator @@ -68,6 +68,7 @@ interface DriverDSLExposedInterface { fun startNode(providedName: String? = null, advertisedServices: Set = emptySet(), rpcUsers: List = emptyList(), + verifierType: VerifierType = VerifierType.InMemory, customOverrides: Map = emptyMap()): ListenableFuture /** @@ -83,6 +84,7 @@ interface DriverDSLExposedInterface { notaryName: String, clusterSize: Int = 3, type: ServiceType = RaftValidatingNotaryService.type, + verifierType: VerifierType = VerifierType.InMemory, rpcUsers: List = emptyList()): Future>> /** @@ -344,7 +346,6 @@ class DriverDSL( val shutdownManager = ShutdownManager(executorService) class State { - val clients = LinkedList() val processes = ArrayList>() } @@ -373,9 +374,6 @@ class DriverDSL( } override fun shutdown() { - state.locked { - clients.forEach(NodeMessagingClient::stop) - } shutdownManager.shutdown() // Check that we shut down properly @@ -396,8 +394,13 @@ class DriverDSL( } } - override fun startNode(providedName: String?, advertisedServices: Set, - rpcUsers: List, customOverrides: Map): ListenableFuture { + override fun startNode( + providedName: String?, + advertisedServices: Set, + rpcUsers: List, + verifierType: VerifierType, + customOverrides: Map + ): ListenableFuture { val p2pAddress = portAllocation.nextHostAndPort() val rpcAddress = portAllocation.nextHostAndPort() val webAddress = portAllocation.nextHostAndPort() @@ -422,7 +425,8 @@ class DriverDSL( "password" to it.password, "permissions" to it.permissions ) - } + }, + "verifierType" to verifierType.name ) + customOverrides val configuration = FullNodeConfiguration( @@ -450,6 +454,7 @@ class DriverDSL( notaryName: String, clusterSize: Int, type: ServiceType, + verifierType: VerifierType, rpcUsers: List ): ListenableFuture>> { val nodeNames = (1..clusterSize).map { "Notary Node $it" } @@ -461,12 +466,12 @@ class DriverDSL( val notaryClusterAddress = portAllocation.nextHostAndPort() // Start the first node that will bootstrap the cluster - val firstNotaryFuture = startNode(nodeNames.first(), advertisedService, rpcUsers, mapOf("notaryNodeAddress" to notaryClusterAddress.toString())) + val firstNotaryFuture = startNode(nodeNames.first(), advertisedService, rpcUsers, verifierType, mapOf("notaryNodeAddress" to notaryClusterAddress.toString())) // All other nodes will join the cluster val restNotaryFutures = nodeNames.drop(1).map { val nodeAddress = portAllocation.nextHostAndPort() val configOverride = mapOf("notaryNodeAddress" to nodeAddress.toString(), "notaryClusterAddresses" to listOf(notaryClusterAddress.toString())) - startNode(it, advertisedService, rpcUsers, configOverride) + startNode(it, advertisedService, rpcUsers, verifierType, configOverride) } return firstNotaryFuture.flatMap { firstNotary -> diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index a9fe847eb3..a2efbf5e37 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -118,6 +118,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, override val clock: Clock = platformClock override val myInfo: NodeInfo get() = info override val schemaService: SchemaService get() = schemas + override val transactionVerifierService: TransactionVerifierService get() = txVerifierService // Internal only override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) @@ -154,6 +155,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, lateinit var keyManagement: KeyManagementService var inNodeNetworkMapService: NetworkMapService? = null var inNodeNotaryService: NotaryService? = null + lateinit var txVerifierService: TransactionVerifierService lateinit var identity: IdentityService lateinit var net: MessagingServiceInternal lateinit var netMapCache: NetworkMapCache @@ -252,6 +254,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, net = makeMessagingService() schemas = makeSchemaService() vault = makeVaultService(configuration.dataSourceProperties) + txVerifierService = makeTransactionVerifierService() info = makeInfo() identity = makeIdentityService() @@ -478,6 +481,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, protected open fun makeSchemaService(): SchemaService = NodeSchemaService() + protected abstract fun makeTransactionVerifierService() : TransactionVerifierService + open fun stop() { // TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the // network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop() diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 904122317a..d1cb5fa479 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -56,6 +56,7 @@ class Node(override val configuration: FullNodeConfiguration, override val log: Logger get() = logger override val version: Version get() = nodeVersionInfo.version override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress) + override fun makeTransactionVerifierService() = (net as NodeMessagingClient).verifierService // DISCUSSION // @@ -136,7 +137,8 @@ class Node(override val configuration: FullNodeConfiguration, myIdentityOrNullIfNetworkMapService, serverThread, database, - networkMapRegistrationFuture) + networkMapRegistrationFuture, + services.monitoringService) } private fun makeLocalMessageBroker(): HostAndPort { diff --git a/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt b/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt index 8665e1ea1d..17616796fa 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt @@ -7,15 +7,13 @@ import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigRenderOptions -import net.corda.nodeapi.config.SSLConfiguration import net.corda.core.copyTo import net.corda.core.createDirectories import net.corda.core.crypto.X509Utilities import net.corda.core.div import net.corda.core.exists import net.corda.core.utilities.loggerFor -import java.net.URL -import java.nio.file.Files +import net.corda.nodeapi.config.SSLConfiguration import java.nio.file.Path object ConfigHelper { diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 19c3ce98ac..b850407f42 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -2,25 +2,29 @@ package net.corda.node.services.config import com.google.common.net.HostAndPort import com.typesafe.config.Config -import net.corda.nodeapi.config.SSLConfiguration -import net.corda.nodeapi.config.getListOrElse -import net.corda.nodeapi.config.getOrElse -import net.corda.nodeapi.config.getValue import net.corda.core.div import net.corda.core.node.NodeVersionInfo import net.corda.core.node.services.ServiceInfo import net.corda.node.internal.NetworkMapInfo import net.corda.node.internal.Node import net.corda.node.serialization.NodeClock +import net.corda.node.services.messaging.CertificateChainCheckPolicy import net.corda.node.services.network.NetworkMapService import net.corda.node.utilities.TestClock import net.corda.nodeapi.User +import net.corda.nodeapi.config.getListOrElse +import net.corda.nodeapi.config.getOrElse +import net.corda.nodeapi.config.getValue import java.net.URL import java.nio.file.Path import java.util.* +enum class VerifierType { + InMemory, + OutOfProcess +} -interface NodeConfiguration : SSLConfiguration { +interface NodeConfiguration : net.corda.nodeapi.config.SSLConfiguration { val baseDirectory: Path override val certificatesDirectory: Path get() = baseDirectory / "certificates" val myLegalName: String @@ -32,6 +36,8 @@ interface NodeConfiguration : SSLConfiguration { val rpcUsers: List get() = emptyList() val devMode: Boolean val certificateSigningService: URL + val certificateChainCheckPolicies: Map + val verifierType: VerifierType } /** @@ -61,6 +67,10 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config val permissions = it.getListOrElse("permissions") { emptyList() }.toSet() User(username, password, permissions) } + override val certificateChainCheckPolicies = config.getOptionalConfig("certificateChainCheckPolicies")?.run { + entrySet().associateByTo(HashMap(), { it.key }, { parseCertificateChainCheckPolicy(getConfig(it.key)) }) + } ?: emptyMap() + override val verifierType: VerifierType by config val useHTTPS: Boolean by config val p2pAddress: HostAndPort by config val rpcAddress: HostAndPort? by config @@ -90,4 +100,15 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config } } +private fun parseCertificateChainCheckPolicy(config: Config): CertificateChainCheckPolicy { + val policy = config.getString("policy") + return when (policy) { + "Any" -> CertificateChainCheckPolicy.Any + "RootMustMatch" -> CertificateChainCheckPolicy.RootMustMatch + "LeafMustMatch" -> CertificateChainCheckPolicy.LeafMustMatch + "MustContainOneOf" -> CertificateChainCheckPolicy.MustContainOneOf(config.getStringList("trustedAliases").toSet()) + else -> throw IllegalArgumentException("Invalid certificate chain check policy $policy") + } +} + private fun Config.getOptionalConfig(path: String): Config? = if (hasPath(path)) getConfig(path) else null diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 95f7bbfc1f..2fbd254598 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -22,13 +22,11 @@ import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE +import net.corda.node.services.messaging.NodeLoginModule.Companion.VERIFIER_ROLE +import net.corda.nodeapi.* import net.corda.nodeapi.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER -import net.corda.nodeapi.ArtemisMessagingComponent -import net.corda.nodeapi.ArtemisTcpTransport -import net.corda.nodeapi.ConnectionDirection -import net.corda.nodeapi.expectedOnDefaultFileSystem import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.core.config.BridgeConfiguration import org.apache.activemq.artemis.core.config.Configuration @@ -51,8 +49,8 @@ import org.bouncycastle.asn1.x500.X500Name import rx.Subscription import java.io.IOException import java.math.BigInteger +import java.security.KeyStore import java.security.Principal -import java.security.PublicKey import java.util.* import java.util.concurrent.Executor import java.util.concurrent.ScheduledExecutorService @@ -67,6 +65,7 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE import javax.security.auth.login.FailedLoginException import javax.security.auth.login.LoginException import javax.security.auth.spi.LoginModule +import javax.security.cert.CertificateException import javax.security.cert.X509Certificate // TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman. @@ -201,6 +200,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, * 1. The node itself. It is given full access to all valid queues. * 2. Peers on the same network as us. These are only given permission to send to our P2P inbound queue. * 3. RPC users. These are only given sufficient access to perform RPC with us. + * 4. Verifiers. These are given read access to the verification request queue and write access to the response queue. */ private fun ConfigurationImpl.configureAddressSecurity() { val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true) @@ -214,6 +214,8 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, nodeInternalRole, restrictedRole("$CLIENTS_PREFIX$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)) } + securityRoles[VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, consume = true)) + securityRoles["${VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX}.*"] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, send = true)) } private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false, @@ -224,9 +226,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, } private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager { - val rootCAPublicKey = X509Utilities - .loadCertificateFromKeyStore(config.trustStoreFile, config.trustStorePassword, CORDA_ROOT_CA) - .publicKey val ourCertificate = X509Utilities .loadCertificateFromKeyStore(config.keyStoreFile, config.keyStorePassword, CORDA_CLIENT_CA) val ourSubjectDN = X500Name(ourCertificate.subjectDN.name) @@ -234,13 +233,22 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, require(ourSubjectDN.commonName == config.myLegalName) { "Legal name does not match with our subject CN: $ourSubjectDN" } + val defaultCertPolicies = mapOf( + PEER_ROLE to CertificateChainCheckPolicy.RootMustMatch, + NODE_ROLE to CertificateChainCheckPolicy.LeafMustMatch, + VERIFIER_ROLE to CertificateChainCheckPolicy.RootMustMatch + ) + val keyStore = X509Utilities.loadKeyStore(config.keyStoreFile, config.keyStorePassword) + val trustStore = X509Utilities.loadKeyStore(config.trustStoreFile, config.trustStorePassword) + val certChecks = defaultCertPolicies.mapValues { + (config.certificateChainCheckPolicies[it.key] ?: it.value).createCheck(keyStore, trustStore) + } val securityConfig = object : SecurityConfiguration() { // Override to make it work with our login module override fun getAppConfigurationEntry(name: String): Array { val options = mapOf( RPCUserService::class.java.name to userService, - CORDA_ROOT_CA to rootCAPublicKey, - CORDA_CLIENT_CA to ourCertificate.publicKey) + NodeLoginModule.CERT_CHAIN_CHECKS_OPTION_NAME to certChecks) return arrayOf(AppConfigurationEntry(name, REQUIRED, options)) } } @@ -448,6 +456,66 @@ private class VerifyingNettyConnector(configuration: MutableMap?, } } +sealed class CertificateChainCheckPolicy { + + @FunctionalInterface + interface Check { + fun checkCertificateChain(theirChain: Array) + } + + abstract fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check + + object Any : CertificateChainCheckPolicy() { + override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { + return object : Check { + override fun checkCertificateChain(theirChain: Array) { + } + } + } + } + + object RootMustMatch : CertificateChainCheckPolicy() { + override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { + val rootPublicKey = trustStore.getCertificate(CORDA_ROOT_CA).publicKey + return object : Check { + override fun checkCertificateChain(theirChain: Array) { + val theirRoot = theirChain.last().publicKey + if (rootPublicKey != theirRoot) { + throw CertificateException("Root certificate mismatch, their root = $theirRoot") + } + } + } + } + } + + object LeafMustMatch : CertificateChainCheckPolicy() { + override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { + val ourPublicKey = keyStore.getCertificate(CORDA_CLIENT_CA).publicKey + return object : Check { + override fun checkCertificateChain(theirChain: Array) { + val theirLeaf = theirChain.first().publicKey + if (ourPublicKey != theirLeaf) { + throw CertificateException("Leaf certificate mismatch, their leaf = $theirLeaf") + } + } + } + } + } + + class MustContainOneOf(val trustedAliases: Set) : CertificateChainCheckPolicy() { + override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { + val trustedPublicKeys = trustedAliases.map { trustStore.getCertificate(it).publicKey }.toSet() + return object : Check { + override fun checkCertificateChain(theirChain: Array) { + if (!theirChain.any { it.publicKey in trustedPublicKeys }) { + throw CertificateException("Their certificate chain contained none of the trusted ones") + } + } + } + } + } +} + /** * Clients must connect to us with a username and password and must use TLS. If a someone connects with * [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate @@ -465,6 +533,9 @@ class NodeLoginModule : LoginModule { const val PEER_ROLE = "SystemRoles/Peer" const val NODE_ROLE = "SystemRoles/Node" const val RPC_ROLE = "SystemRoles/RPC" + const val VERIFIER_ROLE = "SystemRoles/Verifier" + + const val CERT_CHAIN_CHECKS_OPTION_NAME = "CertChainChecks" val log = loggerFor() } @@ -473,23 +544,26 @@ class NodeLoginModule : LoginModule { private lateinit var subject: Subject private lateinit var callbackHandler: CallbackHandler private lateinit var userService: RPCUserService - private lateinit var ourRootCAPublicKey: PublicKey - private lateinit var ourPublicKey: PublicKey + private lateinit var peerCertCheck: CertificateChainCheckPolicy.Check + private lateinit var nodeCertCheck: CertificateChainCheckPolicy.Check + private lateinit var verifierCertCheck: CertificateChainCheckPolicy.Check private val principals = ArrayList() + @Suppress("UNCHECKED_CAST") override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map, options: Map) { this.subject = subject this.callbackHandler = callbackHandler userService = options[RPCUserService::class.java.name] as RPCUserService - ourRootCAPublicKey = options[CORDA_ROOT_CA] as PublicKey - ourPublicKey = options[CORDA_CLIENT_CA] as PublicKey + val certChainChecks = options[CERT_CHAIN_CHECKS_OPTION_NAME] as Map + peerCertCheck = certChainChecks[PEER_ROLE]!! + nodeCertCheck = certChainChecks[NODE_ROLE]!! + verifierCertCheck = certChainChecks[VERIFIER_ROLE]!! } override fun login(): Boolean { val nameCallback = NameCallback("Username: ") val passwordCallback = PasswordCallback("Password: ", false) val certificateCallback = CertificateCallback() - try { callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback)) } catch (e: IOException) { @@ -504,32 +578,38 @@ class NodeLoginModule : LoginModule { log.info("Processing login for $username") - val validatedUser = when (determineUserRole(certificates, username)) { - PEER_ROLE -> authenticatePeer(certificates) - NODE_ROLE -> authenticateNode(certificates) - RPC_ROLE -> authenticateRpcUser(password, username) - else -> throw FailedLoginException("Peer does not belong on our network") - } - principals += UserPrincipal(validatedUser) + try { + val validatedUser = when (determineUserRole(certificates, username)) { + PEER_ROLE -> authenticatePeer(certificates) + NODE_ROLE -> authenticateNode(certificates) + VERIFIER_ROLE -> authenticateVerifier(certificates) + RPC_ROLE -> authenticateRpcUser(password, username) + else -> throw FailedLoginException("Peer does not belong on our network") + } + principals += UserPrincipal(validatedUser) - loginSucceeded = true - return loginSucceeded + loginSucceeded = true + return loginSucceeded + } catch (exception: FailedLoginException) { + log.warn("$exception") + throw exception + } } private fun authenticateNode(certificates: Array): String { - val peerCertificate = certificates.first() - if (peerCertificate.publicKey != ourPublicKey) { - throw FailedLoginException("Only the node can login as $NODE_USER") - } + nodeCertCheck.checkCertificateChain(certificates) principals += RolePrincipal(NODE_ROLE) - return peerCertificate.subjectDN.name + return certificates.first().subjectDN.name + } + + private fun authenticateVerifier(certificates: Array): String { + verifierCertCheck.checkCertificateChain(certificates) + principals += RolePrincipal(VERIFIER_ROLE) + return certificates.first().subjectDN.name } private fun authenticatePeer(certificates: Array): String { - val theirRootCAPublicKey = certificates.last().publicKey - if (theirRootCAPublicKey != ourRootCAPublicKey) { - throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey") - } + peerCertCheck.checkCertificateChain(certificates) principals += RolePrincipal(PEER_ROLE) return certificates.first().subjectDN.name } @@ -547,14 +627,28 @@ class NodeLoginModule : LoginModule { } private fun determineUserRole(certificates: Array?, username: String): String? { - return if (username == PEER_USER || username == NODE_USER) { - certificates ?: throw FailedLoginException("No TLS?") - if (username == PEER_USER) PEER_ROLE else NODE_ROLE - } else if (certificates == null) { - // Assume they're an RPC user if its from a non-ssl connection - RPC_ROLE - } else { - null + fun requireTls() = require(certificates != null) { "No TLS?" } + return when (username) { + PEER_USER -> { + requireTls() + PEER_ROLE + } + NODE_USER -> { + requireTls() + NODE_ROLE + } + VerifierApi.VERIFIER_USERNAME -> { + requireTls() + VERIFIER_ROLE + } + else -> { + // Assume they're an RPC user if its from a non-ssl connection + if (certificates == null) { + RPC_ROLE + } else { + null + } + } } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index e97cf9b444..52151b41f5 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -7,19 +7,29 @@ import net.corda.core.crypto.CompositeKey import net.corda.core.messaging.* import net.corda.core.node.NodeVersionInfo import net.corda.core.node.services.PartyInfo +import net.corda.core.node.services.TransactionVerifierService +import net.corda.core.random63BitValue import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.opaque import net.corda.core.success +import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace -import net.corda.nodeapi.ArtemisTcpTransport -import net.corda.nodeapi.ConnectionDirection import net.corda.node.services.RPCUserService import net.corda.node.services.api.MessagingServiceInternal +import net.corda.node.services.api.MonitoringService import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.VerifierType import net.corda.node.services.statemachine.StateMachineManager +import net.corda.node.services.transactions.InMemoryTransactionVerifierService +import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService import net.corda.node.utilities.* import net.corda.nodeapi.ArtemisMessagingComponent +import net.corda.nodeapi.ArtemisTcpTransport +import net.corda.nodeapi.ConnectionDirection +import net.corda.nodeapi.VerifierApi +import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME +import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.Message.* import org.apache.activemq.artemis.api.core.SimpleString @@ -33,6 +43,7 @@ import java.time.Instant import java.util.* import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import javax.annotation.concurrent.ThreadSafe // TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox @@ -62,7 +73,9 @@ class NodeMessagingClient(override val config: NodeConfiguration, val myIdentity: CompositeKey?, val nodeExecutor: AffinityExecutor, val database: Database, - val networkMapRegistrationFuture: ListenableFuture) : ArtemisMessagingComponent(), MessagingServiceInternal { + val networkMapRegistrationFuture: ListenableFuture, + val monitoringService: MonitoringService +) : ArtemisMessagingComponent(), MessagingServiceInternal { companion object { private val log = loggerFor() @@ -75,6 +88,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, private val nodeVersionProperty = SimpleString("node-version") private val nodeVendorProperty = SimpleString("node-vendor") private val amqDelay: Int = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0")) + private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}" } private class InnerState { @@ -88,6 +102,11 @@ class NodeMessagingClient(override val config: NodeConfiguration, // Consumer for inbound client RPC messages. var rpcConsumer: ClientConsumer? = null var rpcNotificationConsumer: ClientConsumer? = null + var verificationResponseConsumer: ClientConsumer? = null + } + val verifierService = when (config.verifierType) { + VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4) + VerifierType.OutOfProcess -> createOutOfProcessVerifierService() } /** A registration to handle messages of different types */ @@ -163,6 +182,19 @@ class NodeMessagingClient(override val config: NodeConfiguration, rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE) rpcNotificationConsumer = session.createConsumer(RPC_QUEUE_REMOVALS_QUEUE) rpcDispatcher = createRPCDispatcher(rpcOps, userService, config.myLegalName) + + fun checkVerifierCount() { + if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) { + log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!") + } + } + + if (config.verifierType == VerifierType.OutOfProcess) { + createQueueIfAbsent(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME) + createQueueIfAbsent(verifierResponseAddress) + verificationResponseConsumer = session.createConsumer(verifierResponseAddress) + messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS) + } } } @@ -224,6 +256,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, check(!running) { "run can't be called twice" } running = true rpcDispatcher!!.start(rpcConsumer!!, rpcNotificationConsumer!!, nodeExecutor) + (verifierService as? OutOfProcessTransactionVerifierService)?.start(verificationResponseConsumer!!) p2pConsumer!! } @@ -463,6 +496,23 @@ class NodeMessagingClient(override val config: NodeConfiguration, } } + private fun createOutOfProcessVerifierService(): TransactionVerifierService { + return object : OutOfProcessTransactionVerifierService(monitoringService) { + override fun sendRequest(nonce: Long, transaction: LedgerTransaction) { + messagingExecutor.fetchFrom { + state.locked { + val message = session!!.createMessage(false) + val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress)) + request.writeToClientMessage(message) + producer!!.send(VERIFICATION_REQUESTS_QUEUE_NAME, message) + } + } + } + + } + } + + override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients { return when (partyInfo) { is PartyInfo.Node -> partyInfo.node.address diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/InMemoryTransactionVerifierService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/InMemoryTransactionVerifierService.kt new file mode 100644 index 0000000000..57619e66e1 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/transactions/InMemoryTransactionVerifierService.kt @@ -0,0 +1,18 @@ +package net.corda.node.services.transactions + +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.MoreExecutors +import net.corda.core.node.services.TransactionVerifierService +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.transactions.LedgerTransaction +import java.util.concurrent.Executors + +class InMemoryTransactionVerifierService(numberOfWorkers: Int) : SingletonSerializeAsToken(), TransactionVerifierService { + private val workerPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfWorkers)) + + override fun verify(transaction: LedgerTransaction): ListenableFuture<*> { + return workerPool.submit { + transaction.verify() + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt new file mode 100644 index 0000000000..5711458fb6 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt @@ -0,0 +1,70 @@ +package net.corda.node.services.transactions + +import com.codahale.metrics.Gauge +import com.codahale.metrics.Timer +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture +import net.corda.core.crypto.SecureHash +import net.corda.core.node.services.TransactionVerifierService +import net.corda.core.random63BitValue +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.utilities.debug +import net.corda.core.utilities.loggerFor +import net.corda.node.services.api.MonitoringService +import net.corda.nodeapi.VerifierApi +import org.apache.activemq.artemis.api.core.client.ClientConsumer +import java.util.concurrent.ConcurrentHashMap + +abstract class OutOfProcessTransactionVerifierService( + val monitoringService: MonitoringService +) : SingletonSerializeAsToken(), TransactionVerifierService { + companion object { + val log = loggerFor() + } + private data class VerificationHandle( + val transactionId: SecureHash, + val resultFuture: SettableFuture, + val durationTimerContext: Timer.Context + ) + private val verificationHandles = ConcurrentHashMap() + + // Metrics + private fun metric(name: String) = "OutOfProcessTransactionVerifierService.$name" + private val durationTimer = monitoringService.metrics.timer(metric("Verification.Duration")) + private val successMeter = monitoringService.metrics.meter(metric("Verification.Success")) + private val failureMeter = monitoringService.metrics.meter(metric("Verification.Failure")) + + class VerificationResultForUnknownTransaction(nonce: Long) : + Exception("Verification result arrived for unknown transaction nonce $nonce") + + fun start(responseConsumer: ClientConsumer) { + log.info("Starting out of process verification service") + monitoringService.metrics.register(metric("VerificationsInFlight"), Gauge { verificationHandles.size }) + responseConsumer.setMessageHandler { message -> + val response = VerifierApi.VerificationResponse.fromClientMessage(message) + val handle = verificationHandles.remove(response.verificationId) ?: + throw VerificationResultForUnknownTransaction(response.verificationId) + handle.durationTimerContext.stop() + val exception = response.exception + if (exception == null) { + successMeter.mark() + handle.resultFuture.set(Unit) + } else { + failureMeter.mark() + handle.resultFuture.setException(exception) + } + } + } + + abstract fun sendRequest(nonce: Long, transaction: LedgerTransaction) + + override fun verify(transaction: LedgerTransaction): ListenableFuture<*> { + log.info("Verifying ${transaction.id}") + val future = SettableFuture.create() + val nonce = random63BitValue() + verificationHandles[nonce] = VerificationHandle(transaction.id, future, durationTimer.time()) + sendRequest(nonce, transaction) + return future + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt b/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt index 2e7a56f200..818ec902c3 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt @@ -1,5 +1,6 @@ package net.corda.node.utilities +import com.google.common.util.concurrent.ListeningScheduledExecutorService import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.Uninterruptibles import net.corda.core.utilities.loggerFor @@ -51,13 +52,12 @@ interface AffinityExecutor : Executor { * tasks in the future and verify code is running on the executor. */ open class ServiceAffinityExecutor(threadName: String, numThreads: Int) : AffinityExecutor, - ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, LinkedBlockingQueue()) { + ScheduledThreadPoolExecutor(numThreads) { companion object { val logger = loggerFor() } private val threads = Collections.synchronizedSet(HashSet()) - private val uncaughtExceptionHandler = Thread.currentThread().uncaughtExceptionHandler init { setThreadFactory(fun(runnable: Runnable): Thread { @@ -77,11 +77,6 @@ interface AffinityExecutor : Executor { }) } - override fun afterExecute(r: Runnable, t: Throwable?) { - if (t != null) - uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t) - } - override val isOnThread: Boolean get() = Thread.currentThread() in threads override fun flush() { diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index 097f608fbd..51b4b29675 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -14,4 +14,5 @@ devMode = true certificateSigningService = "https://cordaci-netperm.corda.r3cev.com" useHTTPS = false h2port = 0 -useTestClock = false \ No newline at end of file +useTestClock = false +verifierType = InMemory \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt index 04041ced3f..20e293ecdb 100644 --- a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt @@ -16,6 +16,7 @@ import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.persistence.DataVending import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.statemachine.StateMachineManager +import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.testing.MOCK_IDENTITY_SERVICE import net.corda.testing.node.MockNetworkMapCache import net.corda.testing.node.MockStorageService @@ -32,8 +33,11 @@ open class MockServiceHubInternal( val scheduler: SchedulerService? = null, val overrideClock: Clock? = NodeClock(), val flowFactory: FlowLogicRefFactory? = FlowLogicRefFactory(), - val schemas: SchemaService? = NodeSchemaService() + val schemas: SchemaService? = NodeSchemaService(), + val customTransactionVerifierService: TransactionVerifierService? = InMemoryTransactionVerifierService(2) ) : ServiceHubInternal() { + override val transactionVerifierService: TransactionVerifierService + get() = customTransactionVerifierService ?: throw UnsupportedOperationException() override val vaultService: VaultService get() = customVault ?: throw UnsupportedOperationException() override val keyManagementService: KeyManagementService diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index e1a1cd9c91..4006143a71 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -1,5 +1,6 @@ package net.corda.node.services.messaging +import com.codahale.metrics.MetricRegistry import com.google.common.net.HostAndPort import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture @@ -14,6 +15,7 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.utilities.LogHelper import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserServiceImpl +import net.corda.node.services.api.MonitoringService import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate @@ -226,7 +228,8 @@ class ArtemisMessagingTests { identity.public.composite, ServiceAffinityExecutor("ArtemisMessagingTests", 1), database, - networkMapRegistrationFuture).apply { + networkMapRegistrationFuture, + MonitoringService(MetricRegistry())).apply { config.configureWithDevSSLCertificate() messagingClient = this } diff --git a/node/src/test/kotlin/net/corda/node/utilities/AffinityExecutorTests.kt b/node/src/test/kotlin/net/corda/node/utilities/AffinityExecutorTests.kt index ebbf726c6f..ef63bc7cd5 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/AffinityExecutorTests.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/AffinityExecutorTests.kt @@ -82,18 +82,4 @@ class AffinityExecutorTests { latch.countDown() executor.flush() } - - @Test fun `exceptions are reported to the specified handler`() { - val exception = AtomicReference() - // Run in a separate thread to avoid messing with any default exception handlers in the unit test thread. - thread { - Thread.currentThread().setUncaughtExceptionHandler { thread, throwable -> exception.set(throwable) } - _executor = AffinityExecutor.ServiceAffinityExecutor("test3", 1) - executor.execute { - throw Exception("foo") - } - executor.flush() - }.join() - assertEquals("foo", exception.get()?.message) - } } diff --git a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt index ab4dbf863a..95be07aad4 100644 --- a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt +++ b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt @@ -3,7 +3,6 @@ package net.corda.attachmentdemo import com.google.common.net.HostAndPort import joptsimple.OptionParser import net.corda.client.rpc.CordaRPCClient -import net.corda.nodeapi.config.SSLConfiguration import net.corda.core.contracts.TransactionType import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash @@ -13,6 +12,7 @@ import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.utilities.Emoji import net.corda.flows.FinalityFlow +import net.corda.nodeapi.config.SSLConfiguration import net.corda.testing.ALICE_KEY import java.nio.file.Path import java.nio.file.Paths diff --git a/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt index e24100128a..9cd6d733e0 100644 --- a/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt +++ b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt @@ -4,7 +4,6 @@ import com.google.common.net.HostAndPort import com.google.common.util.concurrent.Futures import joptsimple.OptionParser import net.corda.client.rpc.CordaRPCClient -import net.corda.nodeapi.config.SSLConfiguration import net.corda.core.crypto.toStringShort import net.corda.core.div import net.corda.core.getOrThrow @@ -12,6 +11,7 @@ import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.transactions.SignedTransaction import net.corda.flows.NotaryFlow +import net.corda.nodeapi.config.SSLConfiguration import net.corda.notarydemo.flows.DummyIssueAndMove import java.nio.file.Path import java.nio.file.Paths diff --git a/settings.gradle b/settings.gradle index 39dea07db0..f12edeb204 100644 --- a/settings.gradle +++ b/settings.gradle @@ -16,6 +16,7 @@ include 'client:mock' include 'client:rpc' include 'experimental' include 'experimental:sandbox' +include 'verifier' include 'test-utils' include 'tools:explorer' include 'tools:explorer:capsule' diff --git a/test-utils/build.gradle b/test-utils/build.gradle index 25501c842e..2b9b8905d1 100644 --- a/test-utils/build.gradle +++ b/test-utils/build.gradle @@ -27,6 +27,7 @@ dependencies { compile project(':core') compile project(':node') compile project(':node:webserver') + compile project(':verifier') compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" diff --git a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt index 94f30a5cae..e784558e36 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt @@ -22,6 +22,8 @@ import net.corda.node.internal.AbstractNode import net.corda.node.internal.NetworkMapInfo import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureDevKeyAndTrustStores +import net.corda.node.services.config.VerifierType +import net.corda.node.services.messaging.CertificateChainCheckPolicy import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.utilities.AddOrRemove.ADD import net.corda.testing.node.MockIdentityService @@ -166,7 +168,9 @@ data class TestNodeConfiguration( override val emailAddress: String = "", override val exportJMXto: String = "", override val devMode: Boolean = true, - override val certificateSigningService: URL = URL("http://localhost")) : NodeConfiguration + override val certificateSigningService: URL = URL("http://localhost"), + override val certificateChainCheckPolicies: Map = emptyMap(), + override val verifierType: VerifierType = VerifierType.InMemory) : NodeConfiguration fun Config.getHostAndPort(name: String) = HostAndPort.fromString(getString(name)) diff --git a/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt b/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt index ff29645e35..f3bbc770cf 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt @@ -1,10 +1,10 @@ package net.corda.testing.messaging import com.google.common.net.HostAndPort -import net.corda.nodeapi.config.SSLConfiguration import net.corda.nodeapi.ArtemisMessagingComponent import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection +import net.corda.nodeapi.config.SSLConfiguration import net.corda.testing.configureTestSSL import org.apache.activemq.artemis.api.core.client.* diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index 4879eace01..b995b3763e 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -22,6 +22,7 @@ import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.keys.E2ETestKeyManagementService import net.corda.node.services.network.InMemoryNetworkMapService import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.services.transactions.InMemoryUniquenessProvider import net.corda.node.services.transactions.SimpleNotaryService import net.corda.node.services.transactions.ValidatingNotaryService @@ -198,6 +199,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider = InMemoryUniquenessProvider() + override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1) + override fun start(): MockNode { super.start() mockNet.identities.add(info.legalIdentity) diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt index 189beca997..d860514fcf 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -18,6 +18,7 @@ import net.corda.node.services.persistence.InMemoryStateMachineRecordedTransacti import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.vault.NodeVaultService +import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.testing.MEGA_CORP import net.corda.testing.MINI_CORP import net.corda.testing.MOCK_VERSION @@ -68,6 +69,7 @@ open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub { override val clock: Clock get() = Clock.systemUTC() override val schedulerService: SchedulerService get() = throw UnsupportedOperationException() override val myInfo: NodeInfo get() = NodeInfo(object : SingleMessageRecipient {}, Party("MegaCorp", key.public.composite), MOCK_VERSION) + override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2) fun makeVaultService(dataSourceProps: Properties): VaultService { val vaultService = NodeVaultService(this, dataSourceProps) diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt index de23a54e79..18b6af8374 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt @@ -1,5 +1,6 @@ package net.corda.testing.node +import com.codahale.metrics.MetricRegistry import com.google.common.net.HostAndPort import com.google.common.util.concurrent.SettableFuture import net.corda.core.crypto.composite @@ -19,6 +20,7 @@ import org.jetbrains.exposed.sql.Database import java.io.Closeable import java.security.KeyPair import kotlin.concurrent.thread +import net.corda.node.services.api.MonitoringService /** * This is a bare-bones node which can only send and receive messages. It doesn't register with a network map service or @@ -29,6 +31,7 @@ class SimpleNode(val config: NodeConfiguration, val address: HostAndPort = freeL private val databaseWithCloseable: Pair = configureDatabase(config.dataSourceProperties) val database: Database get() = databaseWithCloseable.second val userService = RPCUserServiceImpl(config) + val monitoringService = MonitoringService(MetricRegistry()) val identity: KeyPair = generateKeyPair() val executor = ServiceAffinityExecutor(config.myLegalName, 1) val broker = ArtemisMessagingServer(config, address, rpcAddress, InMemoryNetworkMapCache(), userService) @@ -41,7 +44,8 @@ class SimpleNode(val config: NodeConfiguration, val address: HostAndPort = freeL identity.public.composite, executor, database, - networkMapRegistrationFuture) + networkMapRegistrationFuture, + monitoringService) } fun start() { diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/ConnectionManager.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/ConnectionManager.kt index 4e97fc6966..9710a1e908 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/ConnectionManager.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/ConnectionManager.kt @@ -13,7 +13,6 @@ import net.corda.node.driver.PortAllocation import org.slf4j.LoggerFactory import java.io.ByteArrayOutputStream import java.io.Closeable -import java.nio.file.Path import java.util.* private val log = LoggerFactory.getLogger(ConnectionManager::class.java) @@ -93,16 +92,15 @@ class ConnectionManager(private val username: String, private val jSch: JSch) { * safely cleaned up if an exception is thrown. * * @param username The UNIX username to use for SSH authentication. - * @param nodeHostsAndCertificatesPaths The list of hosts and associated remote paths to the nodes' certificate directories. + * @param nodeHosts The list of hosts. * @param remoteMessagingPort The Artemis messaging port nodes are listening on. * @param tunnelPortAllocation A local port allocation strategy for creating SSH tunnels. - * @param certificatesBaseDirectory A local directory to put downloaded certificates in. * @param withConnections An action to run once we're connected to the nodes. * @return The return value of [withConnections] */ fun connectToNodes( username: String, - nodeHostsAndCertificatesPaths: List>, + nodeHosts: List, remoteMessagingPort: Int, tunnelPortAllocation: PortAllocation, rpcUsername: String, @@ -110,9 +108,9 @@ fun connectToNodes( withConnections: (List) -> A ): A { val manager = ConnectionManager(username, setupJSchWithSshAgent()) - val connections = nodeHostsAndCertificatesPaths.parallelStream().map { nodeHostAndCertificatesPath -> + val connections = nodeHosts.parallelStream().map { nodeHost -> manager.connectToNode( - nodeHost = nodeHostAndCertificatesPath.first, + nodeHost = nodeHost, remoteMessagingPort = remoteMessagingPort, localTunnelAddress = tunnelPortAllocation.nextHostAndPort(), rpcUsername = rpcUsername, diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt index 4a7bb1775e..ba67dff6cf 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt @@ -160,7 +160,7 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List, + // notary -> outputs. We need to track this because of the unique-notary-on-inputs invariant + val availableOutputs: Map>>, + val attachments: Set, + val identities: Set +) { + val hashTransactionMap: Map by lazy { transactions.associateBy(WireTransaction::id) } + val attachmentMap: Map by lazy { attachments.associateBy(Attachment::id) } + val identityMap: Map by lazy { identities.associateBy(Party::owningKey) } + + companion object { + val empty = GeneratedLedger(emptyList(), emptyMap(), emptySet(), emptySet()) + } + + fun resolveWireTransaction(transaction: WireTransaction): LedgerTransaction { + return transaction.toLedgerTransaction( + resolveIdentity = { identityMap[it] }, + resolveAttachment = { attachmentMap[it] }, + resolveStateRef = { hashTransactionMap[it.txhash]?.outputs?.get(it.index) } + ) + } + + val attachmentsGenerator: Generator> by lazy { + Generator.replicatePoisson(1.0, pickOneOrMaybeNew(attachments, attachmentGenerator)) + } + + val commandsGenerator: Generator>> by lazy { + Generator.replicatePoisson(4.0, commandGenerator(identities)) + } + + /** + * Generates an issuance(root) transaction. + * Invariants: The input list must be empty. + */ + val issuanceGenerator: Generator> by lazy { + val outputsGen = outputsGenerator.bind { outputs -> + Generator.sequence( + outputs.map { output -> + pickOneOrMaybeNew(identities, partyGenerator).map { notary -> + TransactionState(output, notary, null) + } + } + ) + } + attachmentsGenerator.combine(outputsGen, commandsGenerator) { txAttachments, outputs, commands -> + val signers = commands.flatMap { it.first.signers } + val newTransaction = WireTransaction( + emptyList(), + txAttachments.map { it.id }, + outputs, + commands.map { it.first }, + null, + signers, + TransactionType.General(), + null + ) + val newOutputStateAndRefs = outputs.mapIndexed { i, state -> + StateAndRef(state, StateRef(newTransaction.id, i)) + } + val newAvailableOutputs = availableOutputs + newOutputStateAndRefs.groupBy { it.state.notary } + val newAttachments = attachments + txAttachments + val newIdentities = identities + commands.map { it.second } + outputs.map { it.notary } + val newLedger = GeneratedLedger(transactions + newTransaction, newAvailableOutputs, newAttachments, newIdentities) + Pair(newTransaction, newLedger) + } + } + + /** + * Generates a regular non-issue transaction. + * Invariants: + * * Input and output notaries must be one and the same. + */ + fun regularTransactionGenerator(inputNotary: Party, inputsToChooseFrom: List>): Generator> { + val outputsGen = outputsGenerator.map { outputs -> + outputs.map { output -> + TransactionState(output, inputNotary, null) + } + } + val inputsGen = Generator.sampleBernoulli(inputsToChooseFrom) + return inputsGen.combine(attachmentsGenerator, outputsGen, commandsGenerator) { inputs, txAttachments, outputs, commands -> + val signers = commands.flatMap { it.first.signers } + inputNotary.owningKey + val newTransaction = WireTransaction( + inputs.map { it.ref }, + txAttachments.map { it.id }, + outputs, + commands.map { it.first }, + inputNotary, + signers, + TransactionType.General(), + null + ) + val newOutputStateAndRefs = outputs.mapIndexed { i, state -> + StateAndRef(state, StateRef(newTransaction.id, i)) + } + val availableOutputsMinusConsumed = HashMap(availableOutputs) + if (inputs.size == inputsToChooseFrom.size) { + availableOutputsMinusConsumed.remove(inputNotary) + } else { + availableOutputsMinusConsumed[inputNotary] = inputsToChooseFrom - inputs + } + val newAvailableOutputs = availableOutputsMinusConsumed + newOutputStateAndRefs.groupBy { it.state.notary } + val newAttachments = attachments + txAttachments + val newIdentities = identities + commands.map { it.second } + val newLedger = GeneratedLedger(transactions + newTransaction, newAvailableOutputs, newAttachments, newIdentities) + Pair(newTransaction, newLedger) + } + } + + /** + * Generates a notary change transaction. + * Invariants: + * * Input notary must be different from the output ones. + * * All other data must stay the same. + */ + fun notaryChangeTransactionGenerator(inputNotary: Party, inputsToChooseFrom: List>): Generator> { + val newNotaryGen = pickOneOrMaybeNew(identities - inputNotary, partyGenerator) + val inputsGen = Generator.sampleBernoulli(inputsToChooseFrom) + return inputsGen.bind { inputs -> + val signers = inputs.flatMap { it.state.data.participants } + inputNotary.owningKey + val outputsGen = Generator.sequence(inputs.map { input -> newNotaryGen.map { TransactionState(input.state.data, it, null) } }) + outputsGen.combine(attachmentsGenerator) { outputs, txAttachments -> + val newNotaries = outputs.map { it.notary } + val newTransaction = WireTransaction( + inputs.map { it.ref }, + txAttachments.map { it.id }, + outputs, + emptyList(), + inputNotary, + signers, + TransactionType.NotaryChange(), + null + ) + val newOutputStateAndRefs = outputs.mapIndexed { i, state -> + StateAndRef(state, StateRef(newTransaction.id, i)) + } + val availableOutputsMinusConsumed = HashMap(availableOutputs) + availableOutputsMinusConsumed[inputNotary] = inputsToChooseFrom - inputs + val newAvailableOutputs = availableOutputsMinusConsumed + newOutputStateAndRefs.groupBy { it.state.notary } + val newAttachments = attachments + txAttachments + val newIdentities = identities + newNotaries + val newLedger = GeneratedLedger(transactions + newTransaction, newAvailableOutputs, newAttachments, newIdentities) + Pair(newTransaction, newLedger) + } + } + } + + /** + * Generates a valid transaction. It may be one of three types of issuance, regular and notary change. These have + * different invariants on notary fields. + */ + val transactionGenerator: Generator> by lazy { + if (availableOutputs.isEmpty()) { + issuanceGenerator + } else { + Generator.pickOne(availableOutputs.keys.toList()).bind { inputNotary -> + val inputsToChooseFrom = availableOutputs[inputNotary]!! + Generator.frequency( + 0.3 to issuanceGenerator, + 0.4 to regularTransactionGenerator(inputNotary, inputsToChooseFrom), + 0.3 to notaryChangeTransactionGenerator(inputNotary, inputsToChooseFrom) + ) + } + } + } +} + +data class GeneratedState( + val nonce: Long, + override val participants: List +) : ContractState { + override val contract = DummyContract() +} + +class GeneratedAttachment( + val bytes: ByteArray +) : Attachment { + override val id = bytes.sha256() + override fun open() = ByteArrayInputStream(bytes) +} + +class GeneratedCommandData( + val nonce: Long +) : CommandData + +val keyPairGenerator = Generator.long().map { entropyToKeyPair(BigInteger.valueOf(it)) } +val publicKeyGenerator = keyPairGenerator.map { it.public.composite } +val stateGenerator: Generator = + Generator.replicatePoisson(2.0, publicKeyGenerator).combine(Generator.long()) { participants, nonce -> + GeneratedState(nonce, participants) + } + +fun commandGenerator(partiesToPickFrom: Collection): Generator> { + return pickOneOrMaybeNew(partiesToPickFrom, partyGenerator).combine(Generator.long()) { signer, nonce -> + Pair( + Command(GeneratedCommandData(nonce), signer.owningKey), + signer + ) + } +} +val partyGenerator: Generator = Generator.int().combine(publicKeyGenerator) { n, key -> Party("Party$n", key) } + +fun pickOneOrMaybeNew(from: Collection, generator: Generator): Generator { + if (from.isEmpty()) { + return generator + } else { + return generator.bind { + Generator.pickOne(from + it) + } + } +} + +val attachmentGenerator: Generator = Generator.bytes(16).map(::GeneratedAttachment) +val outputsGenerator = Generator.replicatePoisson(3.0, stateGenerator) diff --git a/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt new file mode 100644 index 0000000000..5c73c848ca --- /dev/null +++ b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt @@ -0,0 +1,299 @@ +package net.corda.verifier + +import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.ListeningScheduledExecutorService +import com.google.common.util.concurrent.SettableFuture +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import net.corda.core.div +import net.corda.core.map +import net.corda.core.random63BitValue +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.utilities.loggerFor +import net.corda.node.driver.* +import net.corda.node.services.config.configureDevKeyAndTrustStores +import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.nodeapi.ArtemisTcpTransport +import net.corda.nodeapi.ConnectionDirection +import net.corda.nodeapi.VerifierApi +import net.corda.nodeapi.config.SSLConfiguration +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.ActiveMQClient +import org.apache.activemq.artemis.api.core.client.ClientProducer +import org.apache.activemq.artemis.api.core.client.ClientSession +import org.apache.activemq.artemis.core.config.Configuration +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory +import org.apache.activemq.artemis.core.security.CheckType +import org.apache.activemq.artemis.core.security.Role +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl +import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager +import java.nio.file.Path +import java.nio.file.Paths +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +/** + * This file defines an extension to [DriverDSL] that allows starting of verifier processes and + * lightweight verification requestors. + */ +interface VerifierExposedDSLInterface : DriverDSLExposedInterface { + /** Starts a lightweight verification requestor that implements the Node's Verifier API */ + fun startVerificationRequestor(name: String): ListenableFuture + /** Starts an out of process verifier connected to [address] */ + fun startVerifier(address: HostAndPort): ListenableFuture + /** + * Waits until [number] verifiers are listening for verification requests coming from the Node. Check + * [VerificationRequestorHandle.waitUntilNumberOfVerifiers] for an equivalent for requestors. + */ + fun NodeHandle.waitUntilNumberOfVerifiers(number: Int) +} +/** Starts a verifier connecting to the specified node */ +fun VerifierExposedDSLInterface.startVerifier(nodeHandle: NodeHandle) = + startVerifier(nodeHandle.configuration.p2pAddress) +/** Starts a verifier connecting to the specified requestor */ +fun VerifierExposedDSLInterface.startVerifier(verificationRequestorHandle: VerificationRequestorHandle) = + startVerifier(verificationRequestorHandle.p2pAddress) + +interface VerifierInternalDSLInterface : DriverDSLInternalInterface, VerifierExposedDSLInterface + +/** + * Behaves the same as [driver] and adds verifier-related functionality. + */ +fun verifierDriver( + isDebug: Boolean = false, + driverDirectory: Path = Paths.get("build", getTimestampAsDirectoryName()), + portAllocation: PortAllocation = PortAllocation.Incremental(10000), + sshdPortAllocation: PortAllocation = PortAllocation.Incremental(20000), + debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005), + systemProperties: Map = emptyMap(), + useTestClock: Boolean = false, + automaticallyStartNetworkMap: Boolean = true, + dsl: VerifierExposedDSLInterface.() -> A +) = genericDriver( + driverDsl = VerifierDriverDSL( + DriverDSL( + portAllocation = portAllocation, + sshdPortAllocation = sshdPortAllocation, + debugPortAllocation = debugPortAllocation, + systemProperties = systemProperties, + driverDirectory = driverDirectory.toAbsolutePath(), + useTestClock = useTestClock, + automaticallyStartNetworkMap = automaticallyStartNetworkMap, + isDebug = isDebug + ) + ), + coerce = { it }, + dsl = dsl +) + +/** A handle for a verifier */ +data class VerifierHandle( + val process: Process +) + +/** A handle for the verification requestor */ +data class VerificationRequestorHandle( + val p2pAddress: HostAndPort, + private val responseAddress: SimpleString, + private val session: ClientSession, + private val requestProducer: ClientProducer, + private val addVerificationFuture: (Long, SettableFuture) -> Unit, + private val executorService: ListeningScheduledExecutorService +) { + fun verifyTransaction(transaction: LedgerTransaction): ListenableFuture { + val message = session.createMessage(false) + val verificationId = random63BitValue() + val request = VerifierApi.VerificationRequest(verificationId, transaction, responseAddress) + request.writeToClientMessage(message) + val verificationFuture = SettableFuture.create() + addVerificationFuture(verificationId, verificationFuture) + requestProducer.send(message) + return verificationFuture + } + + fun waitUntilNumberOfVerifiers(number: Int) { + poll(executorService, "$number verifiers to come online") { + if (session.queueQuery(SimpleString(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount >= number) { + Unit + } else { + null + } + }.get() + } +} + + +data class VerifierDriverDSL( + val driverDSL: DriverDSL +) : DriverDSLInternalInterface by driverDSL, VerifierInternalDSLInterface { + val verifierCount = AtomicInteger(0) + + companion object { + private val log = loggerFor() + fun createConfiguration(baseDirectory: Path, nodeHostAndPort: HostAndPort): Config { + return ConfigFactory.parseMap( + mapOf( + "baseDirectory" to baseDirectory.toString(), + "nodeHostAndPort" to nodeHostAndPort.toString() + ) + ) + } + + fun createVerificationRequestorArtemisConfig(baseDirectory: Path, responseAddress: String, hostAndPort: HostAndPort, sslConfiguration: SSLConfiguration): Configuration { + val connectionDirection = ConnectionDirection.Inbound(acceptorFactoryClassName = NettyAcceptorFactory::class.java.name) + return ConfigurationImpl().apply { + val artemisDir = "$baseDirectory/artemis" + bindingsDirectory = "$artemisDir/bindings" + journalDirectory = "$artemisDir/journal" + largeMessagesDirectory = "$artemisDir/large-messages" + acceptorConfigurations = setOf(ArtemisTcpTransport.tcpTransport(connectionDirection, hostAndPort, sslConfiguration)) + queueConfigurations = listOf( + CoreQueueConfiguration().apply { + name = VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME + address = VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME + isDurable = false + }, + CoreQueueConfiguration().apply { + name = responseAddress + address = responseAddress + isDurable = false + } + ) + } + } + } + + override fun startVerificationRequestor(name: String): ListenableFuture { + val hostAndPort = driverDSL.portAllocation.nextHostAndPort() + return driverDSL.executorService.submit { + startVerificationRequestorInternal(name, hostAndPort) + } + } + + private fun startVerificationRequestorInternal(name: String, hostAndPort: HostAndPort): VerificationRequestorHandle { + val baseDir = driverDSL.driverDirectory / name + val sslConfig = object : SSLConfiguration { + override val certificatesDirectory = baseDir / "certificates" + override val keyStorePassword: String get() = "cordacadevpass" + override val trustStorePassword: String get() = "trustpass" + } + sslConfig.configureDevKeyAndTrustStores(name) + + val responseQueueNonce = random63BitValue() + val responseAddress = "${VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX}.$responseQueueNonce" + + val artemisConfig = createVerificationRequestorArtemisConfig(baseDir, responseAddress, hostAndPort, sslConfig) + + val securityManager = object : ActiveMQSecurityManager { + // We don't need auth, SSL is good enough + override fun validateUser(user: String?, password: String?) = true + override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet?, checkType: CheckType?) = true + } + + val server = ActiveMQServerImpl(artemisConfig, securityManager) + log.info("Starting verification requestor Artemis server with base dir $baseDir") + server.start() + driverDSL.shutdownManager.registerShutdown(Futures.immediateFuture { + server.stop() + }) + + val locator = ActiveMQClient.createServerLocatorWithoutHA() + val transport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfig) + val sessionFactory = locator.createSessionFactory(transport) + val session = sessionFactory.createSession() + driverDSL.shutdownManager.registerShutdown(Futures.immediateFuture { + session.stop() + sessionFactory.close() + }) + val producer = session.createProducer(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME) + + val consumer = session.createConsumer(responseAddress) + // We demux the individual txs ourselves to avoid race when a new verifier is added + val verificationResponseFutures = ConcurrentHashMap>() + consumer.setMessageHandler { + val result = VerifierApi.VerificationResponse.fromClientMessage(it) + val resultFuture = verificationResponseFutures.remove(result.verificationId) + log.info("${verificationResponseFutures.size} verifications left") + if (resultFuture != null) { + resultFuture.set(result.exception) + } else { + log.warn("Verification requestor $name can't find tx result future with id ${result.verificationId}, possible dupe") + } + } + session.start() + return VerificationRequestorHandle( + p2pAddress = hostAndPort, + responseAddress = SimpleString(responseAddress), + session = session, + requestProducer = producer, + addVerificationFuture = { verificationNonce, future -> + verificationResponseFutures.put(verificationNonce, future) + }, + executorService = driverDSL.executorService + ) + } + + override fun startVerifier(address: HostAndPort): ListenableFuture { + log.info("Starting verifier connecting to address $address") + val id = verifierCount.andIncrement + val verifierName = "verifier$id" + val baseDirectory = driverDSL.driverDirectory / verifierName + val config = createConfiguration(baseDirectory, address) + val configFilename = "verifier.conf" + writeConfig(baseDirectory, configFilename, config) + Verifier.loadConfiguration(baseDirectory, baseDirectory / configFilename).configureDevKeyAndTrustStores(verifierName) + + val className = Verifier::class.java.name + val separator = System.getProperty("file.separator") + val classpath = System.getProperty("java.class.path") + val path = System.getProperty("java.home") + separator + "bin" + separator + "java" + val debugPortArg = if (driverDSL.isDebug) + listOf("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=${driverDSL.debugPortAllocation.nextPort()}") + else + emptyList() + + val javaArgs = + listOf(path) + debugPortArg + + listOf( + "-Xmx200m", + "-XX:+UseG1GC", + "-cp", classpath, + className, + baseDirectory.toString() + ) + val builder = ProcessBuilder(javaArgs) + builder.inheritIO() + + val processFuture = driverDSL.executorService.submit { builder.start() } + driverDSL.shutdownManager.registerProcessShutdown(processFuture) + return processFuture.map(::VerifierHandle) + } + + private fun NodeHandle.connectToNode(closure: (ClientSession) -> A): A { + val transport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), configuration.p2pAddress, configuration) + val locator = ActiveMQClient.createServerLocatorWithoutHA(transport) + val sessionFactory = locator.createSessionFactory() + val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, locator.ackBatchSize) + try { + return closure(session) + } finally { + session.close() + } + } + + override fun NodeHandle.waitUntilNumberOfVerifiers(number: Int) { + connectToNode { session -> + poll(driverDSL.executorService, "$number verifiers to come online") { + if (session.queueQuery(SimpleString(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount >= number) { + Unit + } else { + null + } + }.get() + } + } +} diff --git a/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierTests.kt b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierTests.kt new file mode 100644 index 0000000000..cf16d2a7f5 --- /dev/null +++ b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierTests.kt @@ -0,0 +1,125 @@ +package net.corda.verifier + +import com.google.common.util.concurrent.Futures +import net.corda.client.mock.generateOrFail +import net.corda.core.contracts.DOLLARS +import net.corda.core.map +import net.corda.core.messaging.startFlow +import net.corda.core.node.services.ServiceInfo +import net.corda.core.serialization.OpaqueBytes +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.transactions.WireTransaction +import net.corda.flows.CashIssueFlow +import net.corda.flows.CashPaymentFlow +import net.corda.node.services.config.VerifierType +import net.corda.node.services.transactions.ValidatingNotaryService +import org.junit.Test +import java.util.* +import java.util.concurrent.atomic.AtomicInteger + +class VerifierTests { + private fun generateTransactions(number: Int): List { + var currentLedger = GeneratedLedger.empty + val transactions = ArrayList() + val random = SplittableRandom() + for (i in 0..number - 1) { + val (tx, ledger) = currentLedger.transactionGenerator.generateOrFail(random) + transactions.add(tx) + currentLedger = ledger + } + return transactions.map { currentLedger.resolveWireTransaction(it) } + } + + @Test + fun `single verifier works with requestor`() { + verifierDriver(automaticallyStartNetworkMap = false) { + val aliceFuture = startVerificationRequestor("Alice") + val transactions = generateTransactions(100) + val alice = aliceFuture.get() + startVerifier(alice) + alice.waitUntilNumberOfVerifiers(1) + val results = Futures.allAsList(transactions.map { alice.verifyTransaction(it) }).get() + results.forEach { + if (it != null) { + throw it + } + } + } + } + + @Test + fun `multiple verifiers work with requestor`() { + verifierDriver(automaticallyStartNetworkMap = false) { + val aliceFuture = startVerificationRequestor("Alice") + val transactions = generateTransactions(100) + val alice = aliceFuture.get() + val numberOfVerifiers = 4 + for (i in 1..numberOfVerifiers) { + startVerifier(alice) + } + alice.waitUntilNumberOfVerifiers(numberOfVerifiers) + val results = Futures.allAsList(transactions.map { alice.verifyTransaction(it) }).get() + results.forEach { + if (it != null) { + throw it + } + } + } + } + + @Test + fun `verification redistributes on verifier death`() { + verifierDriver(automaticallyStartNetworkMap = false) { + val aliceFuture = startVerificationRequestor("Alice") + val numberOfTransactions = 100 + val transactions = generateTransactions(numberOfTransactions) + val alice = aliceFuture.get() + val verifier1 = startVerifier(alice) + val verifier2 = startVerifier(alice) + val verifier3 = startVerifier(alice) + alice.waitUntilNumberOfVerifiers(3) + val remainingTransactionsCount = AtomicInteger(numberOfTransactions) + val futures = transactions.map { transaction -> + val future = alice.verifyTransaction(transaction) + // Kill verifiers as results are coming in, forcing artemis to redistribute. + future.map { + val remaining = remainingTransactionsCount.decrementAndGet() + when (remaining) { + 33 -> verifier1.get().process.destroy() + 66 -> verifier2.get().process.destroy() + } + it + } + } + Futures.allAsList(futures).get() + } + } + + @Test + fun `verification request waits until verifier comes online`() { + verifierDriver(automaticallyStartNetworkMap = false) { + val aliceFuture = startVerificationRequestor("Alice") + val transactions = generateTransactions(100) + val alice = aliceFuture.get() + val futures = transactions.map { alice.verifyTransaction(it) } + startVerifier(alice) + Futures.allAsList(futures).get() + } + } + + @Test + fun `single verifier works with a node`() { + verifierDriver { + val aliceFuture = startNode("Alice") + val notaryFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type)), verifierType = VerifierType.OutOfProcess) + val alice = aliceFuture.get() + val notary = notaryFuture.get() + startVerifier(notary) + alice.rpc.startFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), alice.nodeInfo.legalIdentity, notaryFuture.get().nodeInfo.notaryIdentity).returnValue.get() + notary.waitUntilNumberOfVerifiers(1) + for (i in 1..10) { + alice.rpc.startFlow(::CashPaymentFlow, 10.DOLLARS, alice.nodeInfo.legalIdentity).returnValue.get() + } + } + } +} \ No newline at end of file diff --git a/verifier/src/main/kotlin/net/corda/verifier/Verifier.kt b/verifier/src/main/kotlin/net/corda/verifier/Verifier.kt new file mode 100644 index 0000000000..822c506289 --- /dev/null +++ b/verifier/src/main/kotlin/net/corda/verifier/Verifier.kt @@ -0,0 +1,81 @@ +package net.corda.verifier + +import com.google.common.net.HostAndPort +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions +import net.corda.core.ErrorOr +import net.corda.core.div +import net.corda.core.utilities.debug +import net.corda.core.utilities.loggerFor +import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport +import net.corda.nodeapi.ConnectionDirection +import net.corda.nodeapi.VerifierApi +import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME +import net.corda.nodeapi.config.SSLConfiguration +import net.corda.nodeapi.config.getValue +import org.apache.activemq.artemis.api.core.client.ActiveMQClient +import java.nio.file.Path +import java.nio.file.Paths + +data class VerifierConfiguration( + val baseDirectory: Path, + val config: Config +) : SSLConfiguration { + val nodeHostAndPort: HostAndPort by config + override val keyStorePassword: String by config + override val trustStorePassword: String by config + override val certificatesDirectory = baseDirectory / "certificates" +} + +class Verifier { + companion object { + private val log = loggerFor() + + fun loadConfiguration(baseDirectory: Path, configPath: Path): VerifierConfiguration { + val defaultConfig = ConfigFactory.parseResources("verifier-reference.conf", ConfigParseOptions.defaults().setAllowMissing(false)) + val customConfig = ConfigFactory.parseFile(configPath.toFile(), ConfigParseOptions.defaults().setAllowMissing(false)) + val resolvedConfig = customConfig.withFallback(defaultConfig).resolve() + return VerifierConfiguration(baseDirectory, resolvedConfig) + } + + @JvmStatic + fun main(args: Array) { + require(args.isNotEmpty()) { "Usage: BASE_DIR_CONTAINING_VERIFIER_CONF" } + val baseDirectory = Paths.get(args[0]) + val verifierConfig = loadConfiguration(baseDirectory, baseDirectory / "verifier.conf") + val locator = ActiveMQClient.createServerLocatorWithHA( + tcpTransport(ConnectionDirection.Outbound(), verifierConfig.nodeHostAndPort, verifierConfig) + ) + val sessionFactory = locator.createSessionFactory() + val session = sessionFactory.createSession( + VerifierApi.VERIFIER_USERNAME, VerifierApi.VERIFIER_USERNAME, false, true, true, locator.isPreAcknowledge, locator.ackBatchSize + ) + Runtime.getRuntime().addShutdownHook(Thread { + log.info("Shutting down") + session.close() + sessionFactory.close() + }) + val consumer = session.createConsumer(VERIFICATION_REQUESTS_QUEUE_NAME) + val replyProducer = session.createProducer() + consumer.setMessageHandler { + val request = VerifierApi.VerificationRequest.fromClientMessage(it) + log.debug { "Received verification request with id ${request.verificationId}" } + val result = ErrorOr.catch { + request.transaction.verify() + } + if (result.error != null) { + log.debug { "Verification returned with error ${result.error}" } + } + val reply = session.createMessage(false) + val response = VerifierApi.VerificationResponse(request.verificationId, result.error) + response.writeToClientMessage(reply) + replyProducer.send(request.responseAddress, reply) + it.acknowledge() + } + session.start() + log.info("Verifier started") + Thread.sleep(Long.MAX_VALUE) + } + } +} \ No newline at end of file diff --git a/verifier/src/main/resources/verifier-reference.conf b/verifier/src/main/resources/verifier-reference.conf new file mode 100644 index 0000000000..381f1631c9 --- /dev/null +++ b/verifier/src/main/resources/verifier-reference.conf @@ -0,0 +1,3 @@ +# nodeHostAndPort = "localhost:12345" +keyStorePassword = "cordacadevpass" +trustStorePassword = "trustpass" \ No newline at end of file