From 7ead9e5698972cb7b058a7d808380e6f4aa1cc88 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 29 Aug 2018 13:45:27 +0100 Subject: [PATCH] ENT-2293 & ENT-2353: Basic implementation of an RPC worker and Flow Worker (#1308) --- experimental/flow-worker/build.gradle | 13 +- .../net/corda/flowworker/FlowWorkerTest.kt | 316 ++++++++++----- .../resources/test-config.conf | 47 --- .../kotlin/net/corda/flowworker/FlowWorker.kt | 162 +++++++- .../corda/flowworker/FlowWorkerServiceHub.kt | 292 +++++++++----- experimental/rpc-worker/build.gradle | 4 +- .../corda/rpcWorker/RpcFlowWorkerDriver.kt | 196 +++++++++ .../net/corda/rpcWorker/RpcWorkerTest.kt | 136 ++----- .../net/corda/rpcWorker/CordaRpcWorkerOps.kt | 376 ++++++++++++++++++ .../kotlin/net/corda/rpcWorker/RpcWorker.kt | 296 ++++---------- .../net/corda/rpcWorker/RpcWorkerClient.kt | 23 -- .../corda/rpcWorker/RpcWorkerServiceHub.kt | 263 ++++++++++++ .../net/corda/rpcWorker/SimplisticRpcOps.kt | 29 -- .../src/main/resources/reference.conf | 18 - .../node/services/vault/NodeVaultService.kt | 3 +- .../testing/node/internal/DriverDSLImpl.kt | 6 +- 16 files changed, 1542 insertions(+), 638 deletions(-) delete mode 100644 experimental/flow-worker/src/integration-test/resources/test-config.conf create mode 100644 experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcFlowWorkerDriver.kt create mode 100644 experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/CordaRpcWorkerOps.kt delete mode 100644 experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorkerClient.kt create mode 100644 experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorkerServiceHub.kt delete mode 100644 experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/SimplisticRpcOps.kt delete mode 100644 experimental/rpc-worker/src/main/resources/reference.conf diff --git a/experimental/flow-worker/build.gradle b/experimental/flow-worker/build.gradle index ded92e0175..d3ae3a495d 100644 --- a/experimental/flow-worker/build.gradle +++ b/experimental/flow-worker/build.gradle @@ -1,5 +1,15 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + apply plugin: 'kotlin' -apply plugin: 'java' +apply plugin: 'net.corda.plugins.quasar-utils' description 'Corda Flow Worker' @@ -30,6 +40,7 @@ dependencies { testCompile "junit:junit:$junit_version" testCompile(project(':node-driver')) + integrationTestCompile(project(':bridge')) } task integrationTest(type: Test) { diff --git a/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerTest.kt b/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerTest.kt index 620804813c..61ce2c8c58 100644 --- a/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerTest.kt +++ b/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerTest.kt @@ -1,58 +1,77 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + package net.corda.flowworker -import co.paralleluniverse.fibers.Suspendable -import com.typesafe.config.ConfigFactory -import com.typesafe.config.ConfigParseOptions -import net.corda.core.concurrent.CordaFuture +import net.corda.confidential.SwapIdentitiesFlow import net.corda.core.context.InvocationContext +import net.corda.core.context.Trace import net.corda.core.crypto.Crypto.generateKeyPair -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowSession +import net.corda.core.crypto.toStringShort import net.corda.core.identity.Party -import net.corda.core.internal.FlowStateMachine -import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.createDirectories +import net.corda.core.internal.div import net.corda.core.node.NetworkParameters import net.corda.core.node.NodeInfo +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize -import net.corda.core.utilities.ByteSequence import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.OpaqueBytes -import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds import net.corda.finance.DOLLARS +import net.corda.finance.USD import net.corda.finance.contracts.getCashBalances -import net.corda.finance.flows.AbstractCashFlow import net.corda.finance.flows.CashIssueFlow -import net.corda.node.internal.InitiatedFlowFactory -import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.config.NodeConfigurationImpl -import net.corda.node.services.config.parseAsNodeConfiguration -import net.corda.node.services.messaging.DeduplicationHandler -import net.corda.node.services.messaging.P2PMessagingClient -import net.corda.node.services.statemachine.ExternalEvent -import net.corda.node.services.statemachine.InitialSessionMessage -import net.corda.node.services.statemachine.SessionId -import net.corda.testing.core.DUMMY_BANK_A_NAME -import net.corda.testing.core.DUMMY_BANK_B_NAME -import net.corda.testing.core.DUMMY_NOTARY_NAME -import net.corda.testing.core.getTestPartyAndCertificate -import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties -import net.corda.testing.node.internal.NodeBasedTest +import net.corda.node.internal.artemis.ArtemisBroker +import net.corda.node.services.config.* +import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.nodeapi.internal.ArtemisMessagingClient +import net.corda.nodeapi.internal.bridging.BridgeControlListener +import net.corda.nodeapi.internal.crypto.X509Utilities +import net.corda.testing.core.* +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.PortAllocation +import net.corda.testing.node.MockServices import net.corda.testing.node.internal.TestCordappDirectories import net.corda.testing.node.internal.cordappsForPackages -import net.corda.testing.node.internal.getCallerPackage -import org.apache.activemq.artemis.api.core.Message -import org.apache.activemq.artemis.api.core.SimpleString -import org.junit.Before +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.client.ClientConsumer +import org.apache.activemq.artemis.api.core.client.ClientProducer +import org.apache.activemq.artemis.api.core.client.ClientSession import org.junit.Rule import org.junit.Test -import org.junit.rules.TemporaryFolder +import java.nio.file.Paths +import java.security.KeyPair +import java.security.cert.X509Certificate import java.time.Instant +import java.util.* +import kotlin.test.assertEquals class FlowWorkerTest { @Rule @JvmField - val temporaryFolder = TemporaryFolder() + val serializationEnvironment = SerializationEnvironmentRule(true) + + private val portAllocation = PortAllocation.Incremental(10000) + + private val networkParameters = NetworkParameters( + minimumPlatformVersion = 1, + notaries = listOf(), + modifiedTime = Instant.now(), + maxMessageSize = MAX_MESSAGE_SIZE, + maxTransactionSize = 4000000, + epoch = 1, + whitelistedContractImplementations = emptyMap() + ) private val bankAKeyPair = generateKeyPair() private val bankBKeyPair = generateKeyPair() @@ -64,85 +83,182 @@ class FlowWorkerTest { private val bankBPartyAndCertificate = getTestPartyAndCertificate(bankB) private val notaryPartyAndCertificate = getTestPartyAndCertificate(notary) - private val cordappPackages = listOf("net.corda.finance") - private val cordapps = cordappsForPackages(getCallerPackage(NodeBasedTest::class)?.let { cordappPackages + it } - ?: cordappPackages) + private val bankAInfo = NodeInfo(listOf(NetworkHostAndPort("localhost", 1111)), listOf(bankAPartyAndCertificate), 1, 1) + private val bankBInfo = NodeInfo(listOf(NetworkHostAndPort("localhost", 1112)), listOf(bankBPartyAndCertificate), 1, 1) - private lateinit var configuration: NodeConfiguration - - @Before - fun setup() { - val testConfig = ConfigFactory.parseResources("test-config.conf", ConfigParseOptions.defaults().setAllowMissing(false)).parseAsNodeConfiguration() as NodeConfigurationImpl - configuration = testConfig.copy(baseDirectory = temporaryFolder.root.toPath(), dataSourceProperties = makeTestDataSourceProperties(), cordappDirectories = TestCordappDirectories.cached(cordapps).toList()) - } - - private val myInfo = NodeInfo(listOf(NetworkHostAndPort("localhost", 3334)), listOf(bankAPartyAndCertificate), 1, 1) - private val networkParameters = NetworkParameters( - minimumPlatformVersion = 1, - notaries = listOf(), - modifiedTime = Instant.now(), - maxMessageSize = 10485760, - maxTransactionSize = 4000000, - epoch = 1, - whitelistedContractImplementations = emptyMap() - ) - - @Test - fun `send message`() { - val flowWorkerServiceHub = FlowWorkerServiceHub(configuration, myInfo, networkParameters, bankAKeyPair) - val flowWorker = FlowWorker(flowWorkerServiceHub) - flowWorker.start() - - flowWorkerServiceHub.networkMapCache.addNode(NodeInfo(listOf(NetworkHostAndPort("localhost", 3333)), listOf(bankBPartyAndCertificate), 1, 1)) - flowWorkerServiceHub.flowFactories[SomeFlowLogic::class.java] = InitiatedFlowFactory.Core { flowSession -> SomeFlowLogic(flowSession) } - - val cordaMessage = flowWorkerServiceHub.networkService.createMessage("platform.session", data = ByteSequence.of(InitialSessionMessage(SessionId(1), 1, SomeFlowLogic::class.java.name, 1, "", "test".serialize()).serialize().bytes).bytes) - val artemisMessage = (flowWorkerServiceHub.networkService as P2PMessagingClient).messagingExecutor!!.cordaToArtemisMessage(cordaMessage) - artemisMessage!!.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString(DUMMY_BANK_B_NAME.toString())) - (flowWorkerServiceHub.networkService as P2PMessagingClient).deliver(artemisMessage) - - flowWorker.stop() - } + private val cordappDirectories = TestCordappDirectories.cached(cordappsForPackages(listOf("net.corda.finance"))).toList() @Test fun `cash issue`() { - val flowWorkerServiceHub = FlowWorkerServiceHub(configuration, myInfo, networkParameters, bankAKeyPair) - val flowWorker = FlowWorker(flowWorkerServiceHub) - flowWorker.start() + val baseDirectory = DriverParameters().driverDirectory + val nodeDirectory = baseDirectory / DUMMY_BANK_A_NAME.organisation / "flowWorker" + nodeDirectory.createDirectories() + val brokerAddress = NetworkHostAndPort("localhost", portAllocation.nextPort()) - flowWorkerServiceHub.database.transaction { - flowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate) - } + val config = genericConfig().copy(myLegalName = DUMMY_BANK_A_NAME, baseDirectory = nodeDirectory, + messagingServerAddress = brokerAddress, dataSourceProperties = MockServices.makeTestDataSourceProperties(), + cordappDirectories = cordappDirectories) + // create test certificates + config.configureWithDevSSLCertificate() - val startFlowEventCashIssue = object : ExternalEvent.ExternalStartFlowEvent, DeduplicationHandler { - override val deduplicationHandler = this - override fun insideDatabaseTransaction() {} - override fun afterDatabaseTransaction() {} - override val externalCause = this - override val flowLogic = CashIssueFlow(10.DOLLARS, OpaqueBytes.of(0x01), notary) - override val context = InvocationContext.service("bla", DUMMY_BANK_A_NAME) - private val _future = openFuture>() - override fun wireUpFuture(flowFuture: CordaFuture>) { - _future.captureLater(flowFuture) + val trustRoot = config.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA) + val nodeCa = config.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA) + + val broker = createFlowWorkerBroker(config, networkParameters.maxMessageSize) + val bridgeControlListener = createBridgeControlListener(config, networkParameters.maxMessageSize) + + val flowWorkerRequestQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}${bankAKeyPair.public.toStringShort()}" + val flowWorkerReplyQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}reply" + + val (session, consumer, producer) = createArtemisClient(config, flowWorkerReplyQueueAddress) + + val (flowWorker, flowWorkerServiceHub) = createFlowWorker(config, bankAInfo, networkParameters, bankAKeyPair, trustRoot, nodeCa) + try { + flowWorkerServiceHub.database.transaction { + flowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate) } - override val future: CordaFuture> - get() = _future - } - val result = flowWorker.startFlow(startFlowEventCashIssue) - println(result.getOrThrow().resultFuture.getOrThrow()) - println("Cash " + flowWorkerServiceHub.getCashBalances()) + val traceId = Trace.InvocationId.newInstance() + val startFlowMessage = StartFlow(DUMMY_BANK_A_NAME, CashIssueFlow::class.java, arrayOf(10.DOLLARS, OpaqueBytes.of(0x01), notary), + InvocationContext.service("bla", DUMMY_BANK_A_NAME), flowWorkerReplyQueueAddress, traceId) + val message = session.createMessage(true) + message.writeBodyBufferBytes(startFlowMessage.serialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT).bytes) - flowWorker.stop() + producer.send(flowWorkerRequestQueueAddress, message) + + val flowReplyStateMachineRunId = receiveFlowWorkerMessage(consumer) + println(flowReplyStateMachineRunId) + + val flowReplyResult = receiveFlowWorkerMessage(consumer) + assertEquals(traceId, flowReplyResult.replyId) + println(flowReplyResult) + + val cashBalance = flowWorkerServiceHub.getCashBalances() + assertEquals(10.DOLLARS, cashBalance[USD]) + println("Cash: $cashBalance") + } finally { + flowWorker.stop() + bridgeControlListener.stop() + broker.stop() + } } -} + @Test + fun `swap identities`() { + val baseDirectory = DriverParameters().driverDirectory -private class SomeFlowLogic(private val session: FlowSession) : FlowLogic() { - @Suspendable - override fun call() { - println("FLOW START") - session.send("FLOW SEND A MESSAGE") - println("FLOW END") + val bankANodeDirectory = baseDirectory / DUMMY_BANK_A_NAME.organisation / "flowWorker" + bankANodeDirectory.createDirectories() + val bankAbrokerAddress = NetworkHostAndPort("localhost", portAllocation.nextPort()) + val bankAConfig = genericConfig().copy(myLegalName = DUMMY_BANK_A_NAME, baseDirectory = bankANodeDirectory, + messagingServerAddress = bankAbrokerAddress, dataSourceProperties = MockServices.makeTestDataSourceProperties(), + cordappDirectories = cordappDirectories) + // create test certificates + bankAConfig.configureWithDevSSLCertificate() + + val bankATrustRoot = bankAConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA) + val bankANodeCa = bankAConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA) + + val bankABroker = createFlowWorkerBroker(bankAConfig, networkParameters.maxMessageSize) + val bankABridgeControlListener = createBridgeControlListener(bankAConfig, networkParameters.maxMessageSize) + val (bankAFlowWorker, bankAFlowWorkerServiceHub) = createFlowWorker(bankAConfig, bankAInfo, networkParameters, bankAKeyPair, bankATrustRoot, bankANodeCa) + + val bankARequestQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}${bankAKeyPair.public.toStringShort()}" + val bankAReplyQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}reply" + val (bankASession, bankAConsumer, bankAProducer) = createArtemisClient(bankAConfig, bankAReplyQueueAddress) + + val bankBNodeDirectory = baseDirectory / DUMMY_BANK_B_NAME.organisation / "flowWorker" + bankBNodeDirectory.createDirectories() + val bankBbrokerAddress = NetworkHostAndPort("localhost", portAllocation.nextPort()) + val bankBConfig = genericConfig().copy(myLegalName = DUMMY_BANK_B_NAME, baseDirectory = bankBNodeDirectory, + messagingServerAddress = bankBbrokerAddress, dataSourceProperties = MockServices.makeTestDataSourceProperties(), + cordappDirectories = cordappDirectories) + // create test certificates + bankBConfig.configureWithDevSSLCertificate() + + val bankBTrustRoot = bankBConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA) + val bankBNodeCa = bankBConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA) + // NetworkParametersCopier(networkParameters).install(bankBConfig.baseDirectory) + + val bankBBroker = createFlowWorkerBroker(bankBConfig, networkParameters.maxMessageSize) + val bankBBridgeControlListener = createBridgeControlListener(bankBConfig, networkParameters.maxMessageSize) + val (bankBFlowWorker, bankBFlowWorkerServiceHub) = createFlowWorker(bankBConfig, bankBInfo, networkParameters, bankBKeyPair, bankBTrustRoot, bankBNodeCa) + + try { + bankAFlowWorkerServiceHub.database.transaction { + bankAFlowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate) + + bankAFlowWorkerServiceHub.networkMapCache.addNode(NodeInfo(listOf(NetworkHostAndPort("localhost", bankBConfig.messagingServerAddress!!.port)), listOf(bankBPartyAndCertificate), 1, 1)) + } + + bankBFlowWorkerServiceHub.database.transaction { + bankBFlowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate) + + bankBFlowWorkerServiceHub.networkMapCache.addNode(NodeInfo(listOf(NetworkHostAndPort("localhost", bankAConfig.messagingServerAddress!!.port)), listOf(bankAPartyAndCertificate), 1, 1)) + } + + val swapIdentitiesTraceId = Trace.InvocationId.newInstance() + val swapIdentitiesStartFlowMessage = StartFlow(DUMMY_BANK_A_NAME, SwapIdentitiesFlow::class.java, arrayOf(bankB), + InvocationContext.service("bla", DUMMY_BANK_A_NAME), bankAReplyQueueAddress, swapIdentitiesTraceId) + val swapIdentitiesMessage = bankASession.createMessage(true) + swapIdentitiesMessage.writeBodyBufferBytes(swapIdentitiesStartFlowMessage.serialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT).bytes) + + bankAProducer.send(bankARequestQueueAddress, swapIdentitiesMessage) + + val swapIdentitiesStateMachineRunId = receiveFlowWorkerMessage(bankAConsumer) + println(swapIdentitiesStateMachineRunId) + + val swapIdentitiesResult = receiveFlowWorkerMessage(bankAConsumer) + assertEquals(swapIdentitiesTraceId, swapIdentitiesResult.replyId) + println(swapIdentitiesResult) + } finally { + bankAFlowWorker.stop() + bankBFlowWorker.stop() + bankABridgeControlListener.stop() + bankBBridgeControlListener.stop() + bankABroker.stop() + bankBBroker.stop() + } + } + + private fun genericConfig(): NodeConfigurationImpl { + return NodeConfigurationImpl(baseDirectory = Paths.get("."), myLegalName = DUMMY_BANK_A_NAME, emailAddress = "", + keyStorePassword = "pass", trustStorePassword = "pass", crlCheckSoftFail = true, dataSourceProperties = Properties(), + rpcUsers = listOf(), verifierType = VerifierType.InMemory, flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0), + p2pAddress = NetworkHostAndPort("localhost", 1), rpcSettings = NodeRpcSettings(NetworkHostAndPort("localhost", 1), null, ssl = null), + relay = null, messagingServerAddress = null, enterpriseConfiguration = EnterpriseConfiguration(mutualExclusionConfiguration = MutualExclusionConfiguration(updateInterval = 0, waitInterval = 0)), + notary = null) + } + + private fun createFlowWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker { + val broker = ArtemisMessagingServer(config, config.messagingServerAddress!!, maxMessageSize) + broker.start() + return broker + } + + private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, networkParameters: NetworkParameters, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate): Pair { + val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, networkParameters, ourKeyPair, trustRoot, nodeCa) + val flowWorker = FlowWorker(UUID.randomUUID().toString(), flowWorkerServiceHub) + flowWorker.start() + return Pair(flowWorker, flowWorkerServiceHub) + } + + private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener { + val bridgeControlListener = BridgeControlListener(config, config.messagingServerAddress!!, maxMessageSize) + bridgeControlListener.start() + return bridgeControlListener + } + + private fun createArtemisClient(config: NodeConfiguration, queueAddress: String): Triple { + val artemisClient = ArtemisMessagingClient(config, config.messagingServerAddress!!, MAX_MESSAGE_SIZE) + val started = artemisClient.start() + started.session.createQueue(queueAddress, RoutingType.ANYCAST, queueAddress, true) + return Triple(started.session, started.session.createConsumer(queueAddress), started.session.createProducer()) + } + + private inline fun receiveFlowWorkerMessage(consumer: ClientConsumer): T { + val message = consumer.receive() + val data = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } + return data.deserialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT) } } \ No newline at end of file diff --git a/experimental/flow-worker/src/integration-test/resources/test-config.conf b/experimental/flow-worker/src/integration-test/resources/test-config.conf deleted file mode 100644 index ce17dcbab7..0000000000 --- a/experimental/flow-worker/src/integration-test/resources/test-config.conf +++ /dev/null @@ -1,47 +0,0 @@ -baseDirectory = "" -myLegalName = "O=Bank A, L=London, C=GB" -emailAddress = "" -keyStorePassword = "cordacadevpass" -trustStorePassword = "trustpass" -dataSourceProperties = { - dataSourceClassName = org.h2.jdbcx.JdbcDataSource - dataSource.url = "jdbc:h2:file:blah" - dataSource.user = "sa" - dataSource.password = "" -} -verifierType = InMemory -p2pAddress = "localhost:3334" -flowTimeout { - timeout = 30 seconds - maxRestartCount = 3 - backoffBase = 2.0 -} -devMode = true -crlCheckSoftFail = true -database = { - transactionIsolationLevel = "REPEATABLE_READ" - exportHibernateJMXStatistics = "false" -} -h2port = 0 -useTestClock = false -rpcSettings = { - address = "locahost:3418" - adminAddress = "localhost:3419" - useSsl = false - standAloneBroker = false -} -enterpriseConfiguration = { - mutualExclusionConfiguration = { - on = false - updateInterval = 20000 - waitInterval = 40000 - } - tuning = { - flowThreadPoolSize = 1 - rpcThreadPoolSize = 4 - maximumMessagingBatchSize = 256 - p2pConfirmationWindowSize = 1048576 - brokerConnectionTtlCheckIntervalMs = 20 - } - useMultiThreadedSMM = true -} \ No newline at end of file diff --git a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorker.kt b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorker.kt index fc1cbca11c..63ed8e2a71 100644 --- a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorker.kt +++ b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorker.kt @@ -1,24 +1,168 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + package net.corda.flowworker import net.corda.core.concurrent.CordaFuture +import net.corda.core.context.InvocationContext +import net.corda.core.context.Trace +import net.corda.core.crypto.toStringShort +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StateMachineRunId +import net.corda.core.identity.CordaX500Name import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.uncheckedCast +import net.corda.core.node.services.NetworkMapCache +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.node.services.messaging.DeduplicationHandler +import net.corda.node.services.messaging.P2PMessagingClient import net.corda.node.services.statemachine.ExternalEvent +import net.corda.nodeapi.internal.ArtemisMessagingClient +import net.corda.nodeapi.internal.ArtemisMessagingComponent +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientProducer +import org.apache.activemq.artemis.api.core.client.ClientSession +import java.util.* +import kotlin.concurrent.thread -class FlowWorker(private val flowWorkerServiceHub: FlowWorkerServiceHub) { +class FlowWorker(flowWorkerId: String, private val flowWorkerServiceHub: FlowWorkerServiceHub) { + + companion object { + const val FLOW_WORKER_QUEUE_ADDRESS_PREFIX = "${ArtemisMessagingComponent.INTERNAL_PREFIX}flow.worker." + } + + private val queueAddress = "$FLOW_WORKER_QUEUE_ADDRESS_PREFIX${flowWorkerServiceHub.myInfo.legalIdentities[0].owningKey.toStringShort()}" + private val queueName = "$queueAddress.$flowWorkerId" + + private val runOnStop = ArrayList<() -> Any?>() fun start() { flowWorkerServiceHub.start() + runOnStop += { flowWorkerServiceHub.stop() } + + val flowWorkerMessagingClient = ArtemisMessagingClient(flowWorkerServiceHub.configuration, flowWorkerServiceHub.configuration.messagingServerAddress!!, flowWorkerServiceHub.networkParameters.maxMessageSize) + runOnStop += { flowWorkerMessagingClient.stop() } + + val session = flowWorkerMessagingClient.start().session + + val queueQuery = session.queueQuery(SimpleString(queueName)) + if (!queueQuery.isExists) { + session.createQueue(queueAddress, RoutingType.ANYCAST, queueName, true) + } + + val consumer = session.createConsumer(queueName) + val producer = session.createProducer() + + consumer.setMessageHandler { message -> handleFlowWorkerMessage(message, session, producer) } + + thread { + (flowWorkerServiceHub.networkService as P2PMessagingClient).run() + } + } + + private fun handleFlowWorkerMessage(message: ClientMessage, session: ClientSession, producer: ClientProducer) { + val data = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } + val flowWorkerMessage = data.deserialize(context = SerializationDefaults.RPC_SERVER_CONTEXT) + + when (flowWorkerMessage) { + is StartFlow -> handleStartFlowMessage(flowWorkerMessage, session, producer) + is NetworkMapUpdate -> handleNetworkMapUpdateMessage(flowWorkerMessage) + } + } + + private fun handleStartFlowMessage(startFlowMessage: StartFlow, session: ClientSession, producer: ClientProducer) { + val logicRef = flowWorkerServiceHub.flowLogicRefFactory.createForRPC(startFlowMessage.logicType, *startFlowMessage.args) + val logic: FlowLogic<*> = uncheckedCast(flowWorkerServiceHub.flowLogicRefFactory.toFlowLogic(logicRef)) + val result = startFlow(logic, startFlowMessage.context).get() + + val stateMachineRunIdMessage = session.createMessage(true) + stateMachineRunIdMessage.writeBodyBufferBytes(FlowReplyStateMachineRunId(flowWorkerServiceHub.myInfo.legalIdentities.first().name, startFlowMessage.replyId, result.id).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT).bytes) + producer.send(startFlowMessage.clientAddress, stateMachineRunIdMessage) + + result.resultFuture.then { + val resultMessage = session.createMessage(true) + resultMessage.writeBodyBufferBytes(FlowReplyResult(flowWorkerServiceHub.myInfo.legalIdentities.first().name, startFlowMessage.replyId, it.get()).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT).bytes) + producer.send(startFlowMessage.clientAddress, resultMessage) + } + } + + private fun handleNetworkMapUpdateMessage(networkMapUpdateMessage: NetworkMapUpdate) { + val mapChange = networkMapUpdateMessage.mapChange + // TODO remove + if (mapChange is NetworkMapCache.MapChange.Added) { + flowWorkerServiceHub.networkMapCache.addNode(mapChange.node) + mapChange.node.legalIdentitiesAndCerts.forEach { + try { + flowWorkerServiceHub.identityService.verifyAndRegisterIdentity(it) + } catch (ignore: Exception) { + // Log a warning to indicate node info is not added to the network map cache. + // NetworkMapCacheImpl.logger.warn("Node info for :'${it.name}' is not added to the network map due to verification error.") + } + } + } } fun stop() { - flowWorkerServiceHub.stop() - } - - fun startFlow(event: ExternalEvent.ExternalStartFlowEvent): CordaFuture> { - flowWorkerServiceHub.database.transaction { - flowWorkerServiceHub.smm.deliverExternalEvent(event) + for (toRun in runOnStop.reversed()) { + toRun() } - return event.future + runOnStop.clear() } -} \ No newline at end of file + private fun startFlow(logic: FlowLogic, context: InvocationContext): CordaFuture> { + val startFlowEvent = object : ExternalEvent.ExternalStartFlowEvent, DeduplicationHandler { + override fun insideDatabaseTransaction() {} + + override fun afterDatabaseTransaction() {} + + override val externalCause: ExternalEvent + get() = this + override val deduplicationHandler: DeduplicationHandler + get() = this + + override val flowLogic: FlowLogic + get() = logic + override val context: InvocationContext + get() = context + + override fun wireUpFuture(flowFuture: CordaFuture>) { + _future.captureLater(flowFuture) + } + + private val _future = openFuture>() + override val future: CordaFuture> + get() = _future + + } + flowWorkerServiceHub.database.transaction { + flowWorkerServiceHub.smm.deliverExternalEvent(startFlowEvent) + } + return startFlowEvent.future + } +} + +@CordaSerializable +sealed class FlowWorkerMessage() { + abstract val legalName: CordaX500Name +} + +data class StartFlow(override val legalName: CordaX500Name, val logicType: Class>, val args: Array, val context: InvocationContext, val clientAddress: String, val replyId: Trace.InvocationId) : FlowWorkerMessage() + +data class FlowReplyStateMachineRunId(override val legalName: CordaX500Name, val replyId: Trace.InvocationId, val id: StateMachineRunId) : FlowWorkerMessage() + +data class FlowReplyResult(override val legalName: CordaX500Name, val replyId: Trace.InvocationId, val result: Any?) : FlowWorkerMessage() + +data class NetworkMapUpdate(override val legalName: CordaX500Name, val mapChange: NetworkMapCache.MapChange) : FlowWorkerMessage() \ No newline at end of file diff --git a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt index e10ed86091..86ad2842d9 100644 --- a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt +++ b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt @@ -1,15 +1,31 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + package net.corda.flowworker import com.codahale.metrics.MetricRegistry import com.google.common.collect.MutableClassToInstanceMap +import com.google.common.util.concurrent.MoreExecutors import com.jcabi.manifests.Manifests import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme +import net.corda.confidential.SwapIdentitiesFlow +import net.corda.confidential.SwapIdentitiesHandler import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef import net.corda.core.contracts.TransactionState import net.corda.core.crypto.newSecureRandom -import net.corda.core.flows.FlowLogic +import net.corda.core.flows.* +import net.corda.core.identity.Party +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.uncheckedCast import net.corda.core.node.NetworkParameters import net.corda.core.node.NodeInfo import net.corda.core.node.services.CordaService @@ -18,73 +34,89 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal.effectiveSerializationEnv import net.corda.core.serialization.internal.nodeSerializationEnv -import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.node.CordaClock import net.corda.node.SimpleClock import net.corda.node.VersionInfo -import net.corda.node.cordapp.CordappLoader import net.corda.node.internal.* +import net.corda.node.internal.classloading.requireAnnotation import net.corda.node.internal.cordapp.CordappConfigFileProvider import net.corda.node.internal.cordapp.CordappProviderImpl import net.corda.node.internal.cordapp.JarScanningCordappLoader import net.corda.node.serialization.amqp.AMQPServerSerializationScheme import net.corda.node.serialization.kryo.KRYO_CHECKPOINT_CONTEXT import net.corda.node.serialization.kryo.KryoServerSerializationScheme +import net.corda.node.services.ContractUpgradeHandler +import net.corda.node.services.FinalityHandler +import net.corda.node.services.NotaryChangeHandler import net.corda.node.services.api.DummyAuditService import net.corda.node.services.api.MonitoringService import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.WritableTransactionStorage import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.config.configureWithDevSSLCertificate -import net.corda.node.services.config.shouldInitCrashShell import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService -import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.P2PMessagingClient -import net.corda.node.services.network.* +import net.corda.node.services.network.NetworkMapCacheImpl +import net.corda.node.services.network.NetworkMapUpdater +import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.persistence.* import net.corda.node.services.schema.NodeSchemaService -import net.corda.node.services.statemachine.MultiThreadedStateMachineExecutor -import net.corda.node.services.statemachine.MultiThreadedStateMachineManager +import net.corda.node.services.statemachine.* import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.services.upgrade.ContractUpgradeServiceImpl import net.corda.node.services.vault.NodeVaultService import net.corda.node.utilities.AffinityExecutor -import net.corda.nodeapi.internal.DEV_ROOT_CA -import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.isH2Database import net.corda.serialization.internal.* import org.apache.activemq.artemis.utils.ReusableLatch -import rx.schedulers.Schedulers +import org.slf4j.Logger +import rx.Observable import java.security.KeyPair +import java.security.cert.X509Certificate import java.sql.Connection import java.time.Clock -import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import kotlin.reflect.KClass -class FlowWorkerServiceHub(override val configuration: NodeConfiguration, override val myInfo: NodeInfo, override val networkParameters: NetworkParameters, private val ourKeyPair: KeyPair) : ServiceHubInternal, SingletonSerializeAsToken() { +class FlowWorkerServiceHub(override val configuration: NodeConfiguration, override val myInfo: NodeInfo, override val networkParameters: NetworkParameters, private val ourKeyPair: KeyPair, private val trustRoot: X509Certificate, private val nodeCa: X509Certificate) : ServiceHubInternal, SingletonSerializeAsToken() { + + override val clock: CordaClock = SimpleClock(Clock.systemUTC()) + private val versionInfo = getVersionInfo() + private val cordappLoader = JarScanningCordappLoader.fromDirectories(configuration.cordappDirectories, versionInfo) + private val sameVmNodeCounter = AtomicInteger() + private val serverThread = AffinityExecutor.ServiceAffinityExecutor("FlowWorker thread-${sameVmNodeCounter.incrementAndGet()}", 1) + private val busyNodeLatch = ReusableLatch() + + private val log: Logger get() = staticLog companion object { - @JvmStatic - private fun makeCordappLoader(configuration: NodeConfiguration, versionInfo: VersionInfo): CordappLoader { - return JarScanningCordappLoader.fromDirectories(configuration.cordappDirectories, versionInfo) - } + private val staticLog = contextLogger() } - private val versionInfo = getVersionInfo() - override val clock: CordaClock = SimpleClock(Clock.systemUTC()) - - private val runOnStop = ArrayList<() -> Any?>() - - val cordappLoader = makeCordappLoader(configuration, versionInfo) - @Suppress("LeakingThis") private var tokenizableServices: MutableList? = mutableListOf(clock, this) + private val runOnStop = ArrayList<() -> Any?>() - override val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null).tokenize() + init { + (serverThread as? ExecutorService)?.let { + runOnStop += { + // We wait here, even though any in-flight messages should have been drained away because the + // server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is + // arbitrary and might be inappropriate. + MoreExecutors.shutdownAndAwaitTermination(it, 50, TimeUnit.SECONDS) + } + } + } + + override val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, false).tokenize() override val identityService = PersistentIdentityService().tokenize() override val database: CordaPersistence = createCordaPersistence( configuration.database, @@ -103,7 +135,6 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri private val checkpointStorage = DBCheckpointStorage() @Suppress("LeakingThis") override val validatedTransactions: WritableTransactionStorage = DBTransactionStorage(configuration.transactionCacheSizeBytes, database).tokenize() - private val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL) } private val metricRegistry = MetricRegistry() override val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound).tokenize() override val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments).tokenize() @@ -113,19 +144,14 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri @Suppress("LeakingThis") override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService).tokenize() override val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database) + val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader) override val monitoringService = MonitoringService(metricRegistry).tokenize() - override val networkMapUpdater = NetworkMapUpdater( - networkMapCache, - NodeInfoWatcher( - configuration.baseDirectory, - @Suppress("LeakingThis") - Schedulers.io(), - Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec) - ), - networkMapClient, - configuration.baseDirectory, - configuration.extraNetworkMapKeys - ).closeOnStop() + + override val networkMapUpdater: NetworkMapUpdater + get() { + throw NotImplementedError() + } + private val transactionVerifierWorkerCount = 4 @Suppress("LeakingThis") override val transactionVerifierService = InMemoryTransactionVerifierService(transactionVerifierWorkerCount).tokenize() @@ -133,19 +159,54 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri override val auditService = DummyAuditService().tokenize() @Suppress("LeakingThis") - val smm = MultiThreadedStateMachineManager(this, checkpointStorage, MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize), database, newSecureRandom(), ReusableLatch(), cordappLoader.appClassLoader) + val smm = makeStateMachineManager() + + private fun makeStateMachineManager(): StateMachineManager { + val executor = MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize) + runOnStop += { executor.shutdown() } + return MultiThreadedStateMachineManager( + this, + checkpointStorage, + executor, + database, + newSecureRandom(), + busyNodeLatch, + cordappLoader.appClassLoader + ) + } + // TODO Making this non-lateinit requires MockNode being able to create a blank InMemoryMessaging instance private lateinit var network: MessagingService private val cordappServices = MutableClassToInstanceMap.create() - val flowFactories = ConcurrentHashMap>, InitiatedFlowFactory<*>>() + private val flowFactories = ConcurrentHashMap>, InitiatedFlowFactory<*>>() + + override val rpcFlows: ArrayList>> + get() { + throw NotImplementedError() + } override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage(database) - - override val rpcFlows = ArrayList>>() - override val networkService: MessagingService get() = network + /** + * Completes once the node has successfully registered with the network map service + * or has loaded network map data from local database + */ + // TODO val nodeReadyFuture: CordaFuture get() = networkMapCache.nodeReady.map { Unit } + // TODO started + + private fun T.tokenize(): T { + tokenizableServices?.add(this) + ?: throw IllegalStateException("The tokenisable services list has already been finialised") + return this + } + + private fun T.closeOnStop(): T { + runOnStop += this::close + return this + } + override fun getFlowFactory(initiatingFlowClass: Class>): InitiatedFlowFactory<*>? { return flowFactories[initiatingFlowClass] } @@ -167,13 +228,7 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri override fun jdbcSession(): Connection = database.createSession() override fun registerUnloadHandler(runOnStop: () -> Unit) { - TODO("not implemented") //To change body of created functions use File | Settings | File Templates. - } - - private fun T.tokenize(): T { - tokenizableServices?.add(this) - ?: throw IllegalStateException("The tokenisable services list has already been finialised") - return this + this.runOnStop += runOnStop } private fun getVersionInfo(): VersionInfo { @@ -192,9 +247,8 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri return P2PMessagingClient( config = configuration, versionInfo = versionInfo, - serverAddress = configuration.messagingServerAddress - ?: NetworkHostAndPort("localhost", configuration.p2pAddress.port), - nodeExecutor = AffinityExecutor.ServiceAffinityExecutor("Flow Worker", 1), + serverAddress = configuration.messagingServerAddress!!, + nodeExecutor = serverThread, database = database, networkMap = networkMapCache, metricRegistry = metricRegistry, @@ -203,6 +257,87 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri ) } + private fun registerCordappFlows() { + cordappLoader.cordapps.flatMap { it.initiatedFlows } + .forEach { + try { + registerInitiatedFlowInternal(smm, it, track = false) + } catch (e: NoSuchMethodException) { + log.error("${it.name}, as an initiated flow, must have a constructor with a single parameter " + + "of type ${Party::class.java.name}") + } catch (e: Exception) { + log.error("Unable to register initiated flow ${it.name}", e) + } + } + } + + // TODO remove once not needed + private fun deprecatedFlowConstructorMessage(flowClass: Class<*>): String { + return "Installing flow factory for $flowClass accepting a ${Party::class.java.simpleName}, which is deprecated. " + + "It should accept a ${FlowSession::class.java.simpleName} instead" + } + + private fun > registerInitiatedFlowInternal(smm: StateMachineManager, initiatedFlow: Class, track: Boolean): Observable { + val constructors = initiatedFlow.declaredConstructors.associateBy { it.parameterTypes.toList() } + val flowSessionCtor = constructors[listOf(FlowSession::class.java)]?.apply { isAccessible = true } + val ctor: (FlowSession) -> F = if (flowSessionCtor == null) { + // Try to fallback to a Party constructor + val partyCtor = constructors[listOf(Party::class.java)]?.apply { isAccessible = true } + if (partyCtor == null) { + throw IllegalArgumentException("$initiatedFlow must have a constructor accepting a ${FlowSession::class.java.name}") + } else { + log.warn(deprecatedFlowConstructorMessage(initiatedFlow)) + } + { flowSession: FlowSession -> uncheckedCast(partyCtor.newInstance(flowSession.counterparty)) } + } else { + { flowSession: FlowSession -> uncheckedCast(flowSessionCtor.newInstance(flowSession)) } + } + val initiatingFlow = initiatedFlow.requireAnnotation().value.java + val (version, classWithAnnotation) = initiatingFlow.flowVersionAndInitiatingClass + require(classWithAnnotation == initiatingFlow) { + "${InitiatedBy::class.java.name} must point to ${classWithAnnotation.name} and not ${initiatingFlow.name}" + } + val flowFactory = InitiatedFlowFactory.CorDapp(version, initiatedFlow.appName, ctor) + val observable = internalRegisterFlowFactory(smm, initiatingFlow, flowFactory, initiatedFlow, track) + log.info("Registered ${initiatingFlow.name} to initiate ${initiatedFlow.name} (version $version)") + return observable + } + + private fun > internalRegisterFlowFactory(smm: StateMachineManager, + initiatingFlowClass: Class>, + flowFactory: InitiatedFlowFactory, + initiatedFlowClass: Class, + track: Boolean): Observable { + val observable = if (track) { + smm.changes.filter { it is StateMachineManager.Change.Add }.map { it.logic }.ofType(initiatedFlowClass) + } else { + Observable.empty() + } + flowFactories[initiatingFlowClass] = flowFactory + return observable + } + + /** + * Installs a flow that's core to the Corda platform. Unlike CorDapp flows which are versioned individually using + * [InitiatingFlow.version], core flows have the same version as the node's platform version. To cater for backwards + * compatibility [flowFactory] provides a second parameter which is the platform version of the initiating party. + */ + @VisibleForTesting + fun installCoreFlow(clientFlowClass: KClass>, flowFactory: (FlowSession) -> FlowLogic<*>) { + require(clientFlowClass.java.flowVersionAndInitiatingClass.first == 1) { + "${InitiatingFlow::class.java.name}.version not applicable for core flows; their version is the node's platform version" + } + flowFactories[clientFlowClass.java] = InitiatedFlowFactory.Core(flowFactory) + log.debug { "Installed core flow ${clientFlowClass.java.name}" } + } + + private fun installCoreFlows() { + installCoreFlow(FinalityFlow::class, ::FinalityHandler) + installCoreFlow(NotaryChangeFlow::class, ::NotaryChangeHandler) + installCoreFlow(ContractUpgradeFlow.Initiate::class, ::ContractUpgradeHandler) + installCoreFlow(SwapIdentitiesFlow::class, ::SwapIdentitiesHandler) + } + private fun initialiseSerialization() { val serializationExists = try { effectiveSerializationEnv @@ -222,41 +357,37 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader), storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader), checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader), - rpcClientContext = if (configuration.shouldInitCrashShell()) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null) //even Shell embeded in the node connects via RPC to the node + rpcClientContext = AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader)) } } fun start() { + log.info("Flow Worker starting up ...") + initialiseSerialization() // TODO First thing we do is create the MessagingService. This should have been done by the c'tor but it's not // possible (yet) to due restriction from MockNode network = makeMessagingService().tokenize() - // TODO - configuration.configureWithDevSSLCertificate() - val trustRoot = DEV_ROOT_CA.certificate - val nodeCa = configuration.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA) - - networkMapClient?.start(trustRoot) + installCoreFlows() + registerCordappFlows() servicesForResolution.start(networkParameters) - persistentNetworkMapCache.start(networkParameters.notaries) val isH2Database = isH2Database(configuration.dataSourceProperties.getProperty("dataSource.url", "")) val schemas = if (isH2Database) schemaService.internalSchemas() else schemaService.schemaOptions.keys database.startHikariPool(configuration.dataSourceProperties, configuration.database, schemas) identityService.start(trustRoot, listOf(myInfo.legalIdentitiesAndCerts.first().certificate, nodeCa)) + persistentNetworkMapCache.start(networkParameters.notaries) database.transaction { networkMapCache.start() } - // TODO - //networkMapUpdater.start(trustRoot, signedNetParams.raw.hash, signedNodeInfo.raw.hash) - - startMessaging() + identityService.ourNames = myInfo.legalIdentities.map { it.name }.toSet() + startMessagingService() database.transaction { identityService.loadIdentities(myInfo.legalIdentitiesAndCerts) @@ -271,8 +402,8 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri val frozenTokenizableServices = tokenizableServices!! tokenizableServices = null - smm.start(frozenTokenizableServices) runOnStop += { smm.stop(0) } + smm.start(frozenTokenizableServices) } } @@ -283,36 +414,15 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri runOnStop.clear() } - private fun startMessaging() { + private fun startMessagingService() { val client = network as P2PMessagingClient - - val messageBroker = if (!configuration.messagingServerExternal) { - val brokerBindAddress = configuration.messagingServerAddress - ?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port) - ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize) - } else { - null - } - - // Start up the embedded MQ server - messageBroker?.apply { - closeOnStop() - start() - } + Node.printBasicNodeInfo("Advertised P2P messaging addresses", myInfo.addresses.joinToString()) client.closeOnStop() client.start( myIdentity = myInfo.legalIdentities[0].owningKey, - serviceIdentity = if (myInfo.legalIdentities.size == 1) null else myInfo.legalIdentities[1].owningKey, - advertisedAddress = myInfo.addresses.single(), + serviceIdentity = null, maxMessageSize = networkParameters.maxMessageSize, legalName = myInfo.legalIdentities[0].name.toString() ) } - - - private fun T.closeOnStop(): T { - runOnStop += this::close - return this - } - } \ No newline at end of file diff --git a/experimental/rpc-worker/build.gradle b/experimental/rpc-worker/build.gradle index 0623bf67c1..8f2e7977d9 100644 --- a/experimental/rpc-worker/build.gradle +++ b/experimental/rpc-worker/build.gradle @@ -9,8 +9,8 @@ */ apply plugin: 'kotlin' -apply plugin: 'idea' apply plugin: 'net.corda.plugins.cordapp' +apply plugin: 'net.corda.plugins.quasar-utils' description 'Experiment to make out-of-node RPC processing' @@ -48,6 +48,8 @@ dependencies { compile "info.picocli:picocli:$picocli_version" integrationTestCompile project(":test-utils") + integrationTestCompile project(":node-driver") + compile project(":experimental:flow-worker") } jar { diff --git a/experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcFlowWorkerDriver.kt b/experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcFlowWorkerDriver.kt new file mode 100644 index 0000000000..369722a06b --- /dev/null +++ b/experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcFlowWorkerDriver.kt @@ -0,0 +1,196 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + +package net.corda.rpcWorker + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.crypto.Crypto +import net.corda.core.crypto.sign +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.flatMap +import net.corda.core.internal.concurrent.map +import net.corda.core.internal.createDirectory +import net.corda.core.internal.div +import net.corda.core.node.NetworkParameters +import net.corda.core.node.NodeInfo +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.seconds +import net.corda.flowworker.FlowWorker +import net.corda.flowworker.FlowWorkerServiceHub +import net.corda.node.internal.NetworkParametersReader +import net.corda.node.internal.artemis.ArtemisBroker +import net.corda.node.internal.security.RPCSecurityManagerImpl +import net.corda.node.services.config.* +import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.node.services.network.NodeInfoWatcher +import net.corda.node.services.rpc.ArtemisRpcBroker +import net.corda.nodeapi.internal.NodeInfoAndSigned +import net.corda.nodeapi.internal.bridging.BridgeControlListener +import net.corda.nodeapi.internal.config.User +import net.corda.nodeapi.internal.crypto.X509Utilities +import net.corda.testing.core.DUMMY_BANK_A_NAME +import net.corda.testing.core.getTestPartyAndCertificate +import net.corda.testing.driver.DriverParameters +import net.corda.testing.node.MockServices +import net.corda.testing.node.internal.DriverDSLImpl +import net.corda.testing.node.internal.InternalDriverDSL +import net.corda.testing.node.internal.TestCordappDirectories +import net.corda.testing.node.internal.genericDriver +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl +import org.apache.commons.io.FileUtils +import java.nio.file.Paths +import java.security.KeyPair +import java.security.cert.X509Certificate +import java.util.* + +fun rpcFlowWorkerDriver( + defaultParameters: DriverParameters = DriverParameters(), + dsl: RpcFlowWorkerDriverDSL.() -> A +): A { + return genericDriver( + defaultParameters = defaultParameters, + driverDslWrapper = { driverDSL: DriverDSLImpl -> RpcFlowWorkerDriverDSL(driverDSL) }, + coerce = { it }, dsl = dsl + ) +} + +data class RpcFlowWorkerHandle(val rpcAddress: NetworkHostAndPort) + +data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : InternalDriverDSL by driverDSL { + + fun startRpcFlowWorker(myLegalName: CordaX500Name, rpcUsers: List, numberOfFlowWorkers: Int = 1): CordaFuture { + val (config, rpcWorkerConfig, flowWorkerConfigs) = generateConfigs(myLegalName, rpcUsers, numberOfFlowWorkers) + + val trustRoot = rpcWorkerConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA) + val nodeCa = rpcWorkerConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA) + + val ourKeyPair = Crypto.generateKeyPair() + val ourParty = Party(myLegalName, ourKeyPair.public) + val ourPartyAndCertificate = getTestPartyAndCertificate(ourParty) + val myInfo = NodeInfo(listOf(config.messagingServerAddress!!), listOf(ourPartyAndCertificate), 1, 1) + + val nodeInfoAndSigned = NodeInfoAndSigned(myInfo) { _, serialised -> + ourKeyPair.private.sign(serialised.bytes) + } + NodeInfoWatcher.saveToFile(rpcWorkerConfig.baseDirectory, nodeInfoAndSigned) + + return driverDSL.networkMapAvailability.flatMap { + val visibilityHandle = driverDSL.networkVisibilityController.register(myLegalName) + it!!.networkParametersCopier.install(rpcWorkerConfig.baseDirectory) + it.nodeInfosCopier.addConfig(rpcWorkerConfig.baseDirectory) + + val signedNetworkParameters = NetworkParametersReader(trustRoot, null, rpcWorkerConfig.baseDirectory).read() + + val flowWorkerBroker = createFlowWorkerBroker(config, signedNetworkParameters.networkParameters.maxMessageSize) + val rpcWorkerBroker = createRpcWorkerBroker(rpcWorkerConfig, signedNetworkParameters.networkParameters.maxMessageSize) + + flowWorkerConfigs.map { + val (flowWorker, _) = createFlowWorker(it, myInfo, signedNetworkParameters.networkParameters, ourKeyPair, trustRoot, nodeCa) + shutdownManager.registerShutdown { flowWorker.stop() } + } + + val (rpcWorker, rpcWorkerServiceHub) = createRpcWorker(rpcWorkerConfig, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa, rpcWorkerBroker.serverControl) + + val bridgeControlListener = createBridgeControlListener(config, signedNetworkParameters.networkParameters.maxMessageSize) + + shutdownManager.registerShutdown { + bridgeControlListener.stop() + rpcWorker.stop() + flowWorkerBroker.stop() + rpcWorkerBroker.stop() + } + + visibilityHandle.listen(rpcWorkerServiceHub.rpcOps).map { + RpcFlowWorkerHandle(rpcWorkerConfig.rpcOptions.address) + } + } + } + + private fun generateConfigs(myLegalName: CordaX500Name, rpcUsers: List, numberOfFlowWorkers: Int): Triple> { + val cordappDirectories = TestCordappDirectories.cached(driverDSL.cordappsForAllNodes).toList() + + val rpcWorkerBrokerAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort()) + val rpcWorkerBrokerAdminAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort()) + val flowWorkerBrokerAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort()) + + val baseDirectory = driverDSL.driverDirectory / myLegalName.organisation + baseDirectory.createDirectory() + + val config = genericConfig().copy(myLegalName = myLegalName, baseDirectory = baseDirectory, + messagingServerAddress = flowWorkerBrokerAddress, dataSourceProperties = MockServices.makeTestDataSourceProperties(), + cordappDirectories = cordappDirectories) + // create test certificates + config.configureWithDevSSLCertificate() + + val rpcWorkerConfig = config.copy(baseDirectory = driverDSL.driverDirectory / myLegalName.organisation / "rpcWorker", + rpcUsers = rpcUsers.map { User(it.username, it.password, it.permissions) }, + rpcSettings = NodeRpcSettings(rpcWorkerBrokerAddress, rpcWorkerBrokerAdminAddress, true, false, null)) + // copy over certificates to RpcWorker + FileUtils.copyDirectory(config.certificatesDirectory.toFile(), (rpcWorkerConfig.baseDirectory / "certificates").toFile()) + + val flowWorkerConfigs = (1..numberOfFlowWorkers).map { + val flowWorkerConfig = config.copy(baseDirectory = driverDSL.driverDirectory / myLegalName.organisation / "flowWorker$it") + // copy over certificates to FlowWorker + FileUtils.copyDirectory(config.certificatesDirectory.toFile(), (flowWorkerConfig.baseDirectory / "certificates").toFile()) + + flowWorkerConfig + } + + return Triple(config, rpcWorkerConfig, flowWorkerConfigs) + } +} + +private fun genericConfig(): NodeConfigurationImpl { + return NodeConfigurationImpl(baseDirectory = Paths.get("."), myLegalName = DUMMY_BANK_A_NAME, emailAddress = "", + keyStorePassword = "pass", trustStorePassword = "pass", crlCheckSoftFail = true, dataSourceProperties = Properties(), + rpcUsers = listOf(), verifierType = VerifierType.InMemory, flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0), + p2pAddress = NetworkHostAndPort("localhost", 1), rpcSettings = NodeRpcSettings(NetworkHostAndPort("localhost", 1), null, ssl = null), + relay = null, messagingServerAddress = null, enterpriseConfiguration = EnterpriseConfiguration(mutualExclusionConfiguration = MutualExclusionConfiguration(updateInterval = 0, waitInterval = 0)), + notary = null) +} + +private fun createRpcWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker { + val rpcOptions = config.rpcOptions + val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(config.rpcUsers)) + val broker = if (rpcOptions.useSsl) { + ArtemisRpcBroker.withSsl(config, rpcOptions.address, rpcOptions.adminAddress, rpcOptions.sslConfig!!, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false) + } else { + ArtemisRpcBroker.withoutSsl(config, rpcOptions.address, rpcOptions.adminAddress, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false) + } + broker.start() + return broker +} + +private fun createRpcWorker(config: NodeConfiguration, myInfo: NodeInfo, signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate, serverControl: ActiveMQServerControl): Pair { + val rpcWorkerServiceHub = RpcWorkerServiceHub(config, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa) + val rpcWorker = RpcWorker(rpcWorkerServiceHub, serverControl) + rpcWorker.start() + return Pair(rpcWorker, rpcWorkerServiceHub) +} + +private fun createFlowWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker { + val broker = ArtemisMessagingServer(config, config.messagingServerAddress!!, maxMessageSize) + broker.start() + return broker +} + +private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, networkParameters: NetworkParameters, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate): Pair { + val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, networkParameters, ourKeyPair, trustRoot, nodeCa) + val flowWorker = FlowWorker(UUID.randomUUID().toString(), flowWorkerServiceHub) + flowWorker.start() + return Pair(flowWorker, flowWorkerServiceHub) +} + +private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener { + val bridgeControlListener = BridgeControlListener(config, config.messagingServerAddress!!, maxMessageSize) + bridgeControlListener.start() + return bridgeControlListener +} \ No newline at end of file diff --git a/experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcWorkerTest.kt b/experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcWorkerTest.kt index fbb619898f..74e5742c12 100644 --- a/experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcWorkerTest.kt +++ b/experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcWorkerTest.kt @@ -1,111 +1,47 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + package net.corda.rpcWorker -import net.corda.client.rpc.CordaRPCClientConfiguration -import net.corda.client.rpc.internal.RPCClient -import net.corda.core.internal.deleteRecursively -import net.corda.core.messaging.RPCOps -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.nodeapi.ArtemisTcpTransport -import net.corda.nodeapi.internal.config.User -import net.corda.testing.internal.setGlobalSerialization -import org.junit.After -import org.junit.Before +import net.corda.client.rpc.CordaRPCClient +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.OpaqueBytes +import net.corda.finance.DOLLARS +import net.corda.finance.contracts.getCashBalances +import net.corda.finance.flows.CashIssueFlow +import net.corda.finance.flows.CashPaymentFlow +import net.corda.testing.core.DUMMY_BANK_A_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.driver.DriverParameters +import net.corda.testing.node.User import org.junit.Test -import rx.Observable -import java.nio.file.Files -import kotlin.test.assertEquals class RpcWorkerTest { - private val rpcAddress = NetworkHostAndPort("localhost", 10000) - private val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT - private val user = User("user1", "test", emptySet()) - - private val serializationEnv = setGlobalSerialization(true) - - private val artemisPath = Files.createTempDirectory("RpcWorkerArtemis") - private val instance = RpcWorker(rpcAddress, user, TestRpcOpsImpl(), artemisPath) - - @Before - fun setup() { - instance.start() - } - - @After - fun tearDown() { - instance.close() - serializationEnv.unset() - artemisPath.deleteRecursively() - } - - private fun withConnectionEstablished(block: (rpcOps: TestRpcOps) -> Unit) { - val client = RPCClient(ArtemisTcpTransport.rpcConnectorTcpTransport(rpcAddress, null), rpcConfiguration) - val connection = client.start(TestRpcOps::class.java, user.username, user.password) - - try { - val rpcOps = connection.proxy - block(rpcOps) - } finally { - connection.close() - } - } - @Test - fun testPing() { - withConnectionEstablished {rpcOps -> - assertEquals("pong", rpcOps.ping()) + fun `cash pay`() { + rpcFlowWorkerDriver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) { + val bankAUser = User("username", "password", permissions = setOf("ALL")) + val bankA = startRpcFlowWorker(DUMMY_BANK_A_NAME, listOf(bankAUser), 2).get() + val bankB = startNode().get() + + val bankAProxy = CordaRPCClient(bankA.rpcAddress).start("username", "password").proxy + + val cashIssueResult = bankAProxy.startFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0x01), defaultNotaryIdentity).returnValue.get() + println(cashIssueResult) + println(bankAProxy.getCashBalances()) + + val cashPayResult = bankAProxy.startFlow(::CashPaymentFlow, 2.DOLLARS, bankB.nodeInfo.singleIdentity()).returnValue.get() + println(cashPayResult) + println(bankAProxy.getCashBalances()) } } - @Test - fun testReverse() { - withConnectionEstablished {rpcOps -> - val exampleStr = "Makka Pakka" - assertEquals(exampleStr.reversed(), rpcOps.reverse(exampleStr)) - } - } - - @Test - fun testObservable() { - withConnectionEstablished { rpcOps -> - val start = 21 - val end = 100 - val observable = rpcOps.incSequence(start) - observable.take(end - start).zipWith((start..end).asIterable()) { a, b -> Pair(a, b) }.forEach { - assertEquals(it.first, it.second) - } - } - } - - /** - * Defines communication protocol - */ - interface TestRpcOps : RPCOps { - - fun ping() : String - - fun reverse(str : String) : String - - fun incSequence(start : Int) : Observable - } - - /** - * Server side implementation - */ - class TestRpcOpsImpl : TestRpcOps { - - override val protocolVersion: Int = 1 - - override fun ping(): String { - return "pong" - } - - override fun reverse(str: String): String { - return str.reversed() - } - - override fun incSequence(start: Int): Observable { - return Observable.range(start, 100) - } - } } \ No newline at end of file diff --git a/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/CordaRpcWorkerOps.kt b/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/CordaRpcWorkerOps.kt new file mode 100644 index 0000000000..7a7f383d52 --- /dev/null +++ b/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/CordaRpcWorkerOps.kt @@ -0,0 +1,376 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + +package net.corda.rpcWorker + +import com.google.common.util.concurrent.SettableFuture +import net.corda.client.rpc.notUsed +import net.corda.core.CordaRuntimeException +import net.corda.core.concurrent.CordaFuture +import net.corda.core.context.* +import net.corda.core.contracts.ContractState +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.toStringShort +import net.corda.core.flows.FlowInitiator +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StateMachineRunId +import net.corda.core.identity.AbstractParty +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.internal.RPC_UPLOADER +import net.corda.core.internal.concurrent.OpenFuture +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.sign +import net.corda.core.internal.uncheckedCast +import net.corda.core.messaging.* +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.AttachmentId +import net.corda.core.node.services.NetworkMapCache +import net.corda.core.node.services.Vault +import net.corda.core.node.services.vault.* +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.transactions.SignedTransaction +import net.corda.flowworker.* +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.statemachine.StateMachineManager +import net.corda.node.services.vault.NodeVaultService +import net.corda.nodeapi.internal.ArtemisMessagingClient +import net.corda.nodeapi.internal.ArtemisMessagingComponent +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientProducer +import org.apache.activemq.artemis.api.core.client.ClientSession +import java.io.InputStream +import java.net.ConnectException +import java.security.PublicKey +import java.time.Instant +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +class CordaRpcWorkerOps( + private val services: ServiceHubInternal, + private val shutdownNode: () -> Unit +) : CordaRPCOps { + + companion object { + const val RPC_WORKER_QUEUE_ADDRESS_PREFIX = "${ArtemisMessagingComponent.INTERNAL_PREFIX}rpc.worker." + } + + private val flowWorkerQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}${services.myInfo.legalIdentities[0].owningKey.toStringShort()}" + + private val rpcWorkerQueueAddress = "$RPC_WORKER_QUEUE_ADDRESS_PREFIX${services.myInfo.legalIdentities[0].owningKey.toStringShort()}" + private val rpcWorkerId = UUID.randomUUID().toString() + private val rpcWorkerQueueName = "$rpcWorkerQueueAddress.$rpcWorkerId" + + private val artemisClient = ArtemisMessagingClient(services.configuration, services.configuration.messagingServerAddress!!, services.networkParameters.maxMessageSize) + private lateinit var session: ClientSession + private lateinit var producer: ClientProducer + + private val flowReplyStateMachineRunIdMap = ConcurrentHashMap>() + private val flowReplyResultMap = ConcurrentHashMap>() + + fun start() { + session = artemisClient.start().session + producer = session.createProducer() + + val rpcWorkerQueueQuery = session.queueQuery(SimpleString(rpcWorkerQueueName)) + if (!rpcWorkerQueueQuery.isExists) { + session.createQueue(rpcWorkerQueueAddress, RoutingType.ANYCAST, rpcWorkerQueueName, true) + } + + val consumer = session.createConsumer(rpcWorkerQueueName) + consumer.setMessageHandler { message -> handleFlowWorkerMessage(message) } + + networkMapFeed().updates.subscribe { mapChange: NetworkMapCache.MapChange? -> + val networkMapUpdateMessage = NetworkMapUpdate(services.myInfo.legalIdentities.first().name, mapChange!!) + val artemisMessage = session.createMessage(true) + artemisMessage.writeBodyBufferBytes(networkMapUpdateMessage.serialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT).bytes) + producer.send(flowWorkerQueueAddress, artemisMessage) + } + } + + private fun handleFlowWorkerMessage(message: ClientMessage) { + val data = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } + val flowWorkerMessage = data.deserialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT) + + when (flowWorkerMessage) { + is FlowReplyStateMachineRunId -> { + flowReplyStateMachineRunIdMap.remove(flowWorkerMessage.replyId)?.set(flowWorkerMessage.id) + } + is FlowReplyResult -> { + flowReplyResultMap.remove(flowWorkerMessage.replyId)?.set(flowWorkerMessage.result) + // TODO hack, fix the way we populate contractStateTypeMappings + (services.vaultService as NodeVaultService).bootstrapContractStateTypes() + } + } + } + + override fun networkMapSnapshot(): List { + val (snapshot, updates) = networkMapFeed() + updates.notUsed() + return snapshot + } + + override fun networkParametersFeed(): DataFeed { + return services.networkMapUpdater.trackParametersUpdate() + } + + override fun acceptNewNetworkParameters(parametersHash: SecureHash) { + services.networkMapUpdater.acceptNewNetworkParameters( + parametersHash, + // TODO When multiple identities design will be better specified this should be signature from node operator. + { hash -> hash.serialize().sign { services.keyManagementService.sign(it.bytes, services.myInfo.legalIdentities[0].owningKey) } } + ) + } + + override fun networkMapFeed(): DataFeed, NetworkMapCache.MapChange> { + return services.networkMapCache.track() + } + + override fun vaultQueryBy(criteria: QueryCriteria, + paging: PageSpecification, + sorting: Sort, + contractStateType: Class): Vault.Page { + contractStateType.checkIsA() + return services.vaultService._queryBy(criteria, paging, sorting, contractStateType) + } + + @RPCReturnsObservables + override fun vaultTrackBy(criteria: QueryCriteria, + paging: PageSpecification, + sorting: Sort, + contractStateType: Class): DataFeed, Vault.Update> { + contractStateType.checkIsA() + return services.vaultService._trackBy(criteria, paging, sorting, contractStateType) + } + + @Suppress("OverridingDeprecatedMember") + override fun internalVerifiedTransactionsSnapshot(): List { + val (snapshot, updates) = @Suppress("DEPRECATION") internalVerifiedTransactionsFeed() + updates.notUsed() + return snapshot + } + + override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? = services.validatedTransactions.getTransaction(txnId) + + @Suppress("OverridingDeprecatedMember") + override fun internalVerifiedTransactionsFeed(): DataFeed, SignedTransaction> { + return services.validatedTransactions.track() + } + + override fun stateMachinesSnapshot(): List { + val (snapshot, updates) = stateMachinesFeed() + updates.notUsed() + return snapshot + } + + override fun killFlow(id: StateMachineRunId): Boolean { + TODO() + } + + override fun stateMachinesFeed(): DataFeed, StateMachineUpdate> { + TODO() + } + + override fun stateMachineRecordedTransactionMappingSnapshot(): List { + val (snapshot, updates) = stateMachineRecordedTransactionMappingFeed() + updates.notUsed() + return snapshot + } + + override fun stateMachineRecordedTransactionMappingFeed(): DataFeed, StateMachineTransactionMapping> { + return services.stateMachineRecordedTransactionMapping.track() + } + + override fun nodeInfo(): NodeInfo { + return services.myInfo + } + + override fun notaryIdentities(): List { + return services.networkMapCache.notaryIdentities + } + + override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) { + services.vaultService.addNoteToTransaction(txnId, txnNote) + } + + override fun getVaultTransactionNotes(txnId: SecureHash): Iterable { + return services.vaultService.getTransactionNotes(txnId) + } + + override fun startTrackedFlowDynamic(logicType: Class>, vararg args: Any?): FlowProgressHandle { + TODO() + } + + override fun startFlowDynamic(logicType: Class>, vararg args: Any?): FlowHandle { + // TODO + val context = InvocationContext.rpc(Actor(Actor.Id("Mark"), AuthServiceId("Test"), CordaX500Name("ff", "ff", "GB"))) + + val replyId = Trace.InvocationId.newInstance() + + val startFlowMessage = StartFlow(services.myInfo.legalIdentities.first().name, logicType, args, context, rpcWorkerQueueAddress, replyId) + val artemisMessage = session.createMessage(true) + artemisMessage.writeBodyBufferBytes(startFlowMessage.serialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT).bytes) + producer.send(flowWorkerQueueAddress, artemisMessage) + + val flowReplyStateMachineRunIdFuture = SettableFuture.create() + flowReplyStateMachineRunIdMap[replyId] = flowReplyStateMachineRunIdFuture + + val flowReplyResultFuture = openFuture() + flowReplyResultMap[replyId] = uncheckedCast(flowReplyResultFuture) + + return FlowHandleImpl(flowReplyStateMachineRunIdFuture.get(), flowReplyResultFuture) + } + + override fun attachmentExists(id: SecureHash): Boolean { + return services.attachments.openAttachment(id) != null + } + + override fun openAttachment(id: SecureHash): InputStream { + return services.attachments.openAttachment(id)!!.open() + } + + override fun uploadAttachment(jar: InputStream): SecureHash { + return services.attachments.importAttachment(jar, RPC_UPLOADER, null) + } + + override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String): SecureHash { + return services.attachments.importAttachment(jar, uploader, filename) + } + + override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List { + return services.attachments.queryAttachments(query, sorting) + } + + override fun currentNodeTime(): Instant = Instant.now(services.clock) + + override fun waitUntilNetworkReady(): CordaFuture = services.networkMapCache.nodeReady + + override fun wellKnownPartyFromAnonymous(party: AbstractParty): Party? { + return services.identityService.wellKnownPartyFromAnonymous(party) + } + + override fun partyFromKey(key: PublicKey): Party? { + return services.identityService.partyFromKey(key) + } + + override fun wellKnownPartyFromX500Name(x500Name: CordaX500Name): Party? { + return services.identityService.wellKnownPartyFromX500Name(x500Name) + } + + override fun notaryPartyFromX500Name(x500Name: CordaX500Name): Party? = services.networkMapCache.getNotary(x500Name) + + override fun partiesFromName(query: String, exactMatch: Boolean): Set { + return services.identityService.partiesFromName(query, exactMatch) + } + + override fun nodeInfoFromParty(party: AbstractParty): NodeInfo? { + return services.networkMapCache.getNodeByLegalIdentity(party) + } + + override fun registeredFlows(): List = services.rpcFlows.map { it.name }.sorted() + + override fun clearNetworkMapCache() { + services.networkMapCache.clearNetworkMapCache() + } + + override fun refreshNetworkMapCache() { + try { + services.networkMapUpdater.updateNetworkMapCache() + } catch (e: Exception) { + when (e) { + is ConnectException -> throw CordaRuntimeException("There is connection problem to network map. The possible causes are incorrect configuration or network map service being down") + else -> throw e + } + } + } + + override fun vaultQuery(contractStateType: Class): Vault.Page { + return vaultQueryBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType) + } + + override fun vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class): Vault.Page { + return vaultQueryBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType) + } + + override fun vaultQueryByWithPagingSpec(contractStateType: Class, criteria: QueryCriteria, paging: PageSpecification): Vault.Page { + return vaultQueryBy(criteria, paging, Sort(emptySet()), contractStateType) + } + + override fun vaultQueryByWithSorting(contractStateType: Class, criteria: QueryCriteria, sorting: Sort): Vault.Page { + return vaultQueryBy(criteria, PageSpecification(), sorting, contractStateType) + } + + override fun vaultTrack(contractStateType: Class): DataFeed, Vault.Update> { + return vaultTrackBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType) + } + + override fun vaultTrackByCriteria(contractStateType: Class, criteria: QueryCriteria): DataFeed, Vault.Update> { + return vaultTrackBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType) + } + + override fun vaultTrackByWithPagingSpec(contractStateType: Class, criteria: QueryCriteria, paging: PageSpecification): DataFeed, Vault.Update> { + return vaultTrackBy(criteria, paging, Sort(emptySet()), contractStateType) + } + + override fun vaultTrackByWithSorting(contractStateType: Class, criteria: QueryCriteria, sorting: Sort): DataFeed, Vault.Update> { + return vaultTrackBy(criteria, PageSpecification(), sorting, contractStateType) + } + + override fun setFlowsDrainingModeEnabled(enabled: Boolean) { + services.nodeProperties.flowsDrainingMode.setEnabled(enabled) + } + + override fun isFlowsDrainingModeEnabled(): Boolean { + return services.nodeProperties.flowsDrainingMode.isEnabled() + } + + override fun shutdown() { + artemisClient.stop() + shutdownNode.invoke() + } + + private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo { + return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context) + } + + private fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate { + return when (change) { + is StateMachineManager.Change.Add -> StateMachineUpdate.Added(stateMachineInfoFromFlowLogic(change.logic)) + is StateMachineManager.Change.Removed -> StateMachineUpdate.Removed(change.logic.runId, change.result) + } + } + + private fun InvocationContext.toFlowInitiator(): FlowInitiator { + val principal = origin.principal().name + return when (origin) { + is InvocationOrigin.RPC -> FlowInitiator.RPC(principal) + is InvocationOrigin.Peer -> { + val wellKnownParty = services.identityService.wellKnownPartyFromX500Name((origin as InvocationOrigin.Peer).party) + wellKnownParty?.let { FlowInitiator.Peer(it) } + ?: throw IllegalStateException("Unknown peer with name ${(origin as InvocationOrigin.Peer).party}.") + } + is InvocationOrigin.Service -> FlowInitiator.Service(principal) + InvocationOrigin.Shell -> FlowInitiator.Shell + is InvocationOrigin.Scheduled -> FlowInitiator.Scheduled((origin as InvocationOrigin.Scheduled).scheduledState) + } + } + + /** + * RPC can be invoked from the shell where the type parameter of any [Class] parameter is lost, so we must + * explicitly check that the provided [Class] is the one we want. + */ + private inline fun Class<*>.checkIsA() { + require(TARGET::class.java.isAssignableFrom(this)) { "$name is not a ${TARGET::class.java.name}" } + } +} \ No newline at end of file diff --git a/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorker.kt b/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorker.kt index 6bb2a51ddb..d07f0839cf 100644 --- a/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorker.kt +++ b/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorker.kt @@ -1,50 +1,39 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + package net.corda.rpcWorker -import com.google.common.util.concurrent.ThreadFactoryBuilder import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions -import net.corda.core.concurrent.CordaFuture -import net.corda.core.context.AuthServiceId import net.corda.core.identity.CordaX500Name -import net.corda.core.internal.concurrent.doneFuture -import net.corda.core.internal.concurrent.fork -import net.corda.core.internal.concurrent.map -import net.corda.core.internal.isAbstractClass -import net.corda.core.messaging.RPCOps -import net.corda.core.serialization.internal.SerializationEnvironmentImpl -import net.corda.core.serialization.internal.nodeSerializationEnv -import net.corda.core.utilities.* -import net.corda.node.internal.Startable -import net.corda.node.internal.Stoppable +import net.corda.core.internal.div +import net.corda.core.node.NodeInfo +import net.corda.node.internal.NetworkParametersReader +import net.corda.node.internal.Node +import net.corda.node.internal.artemis.ArtemisBroker import net.corda.node.internal.security.RPCSecurityManagerImpl -import net.corda.node.serialization.amqp.AMQPServerSerializationScheme -import net.corda.node.services.messaging.RPCServer +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.SecurityConfiguration +import net.corda.node.services.messaging.InternalRPCMessagingClient import net.corda.node.services.messaging.RPCServerConfiguration -import net.corda.nodeapi.ArtemisTcpTransport -import net.corda.nodeapi.RPCApi +import net.corda.node.services.rpc.ArtemisRpcBroker import net.corda.nodeapi.internal.config.User -import net.corda.serialization.internal.AMQP_P2P_CONTEXT -import net.corda.serialization.internal.SerializationFactoryImpl -import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.TransportConfiguration -import org.apache.activemq.artemis.api.core.client.ActiveMQClient +import net.corda.nodeapi.internal.crypto.X509Utilities import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl -import org.apache.activemq.artemis.core.config.Configuration -import org.apache.activemq.artemis.core.config.CoreQueueConfiguration -import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl -import org.apache.activemq.artemis.core.security.CheckType -import org.apache.activemq.artemis.core.security.Role -import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl -import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy -import org.apache.activemq.artemis.core.settings.impl.AddressSettings -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection -import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3 import picocli.CommandLine import java.io.File -import java.lang.IllegalArgumentException import java.nio.file.FileSystems import java.nio.file.Path -import java.util.concurrent.Executors +import java.security.KeyPair +import java.security.cert.X509Certificate +import java.util.* import kotlin.system.exitProcess fun main(args: Array) { @@ -56,7 +45,8 @@ fun main(args: Array) { if (main.verbose) { throwable.printStackTrace() } else { - System.err.println("ERROR: ${throwable.message ?: ""}. Please use '--verbose' option to obtain more details.") + System.err.println("ERROR: ${throwable.message + ?: ""}. Please use '--verbose' option to obtain more details.") } exitProcess(1) } @@ -66,7 +56,7 @@ fun main(args: Array) { name = "RPC Worker", mixinStandardHelpOptions = true, showDefaultValues = true, - description = [ "Standalone RPC server endpoint with pluggable set of operations." ] + description = ["Standalone RPC server endpoint with pluggable set of operations."] ) class Main : Runnable { @CommandLine.Option( @@ -91,199 +81,77 @@ class Main : Runnable { val port = config.getInt("port") val user = User(config.getString("userName"), config.getString("password"), emptySet()) - val rpcOps = instantiateAndValidate(config.getString("rpcOpsImplClass")) val artemisDir = FileSystems.getDefault().getPath(config.getString("artemisDir")) - initialiseSerialization() - RpcWorker(NetworkHostAndPort("localhost", port), user, rpcOps, artemisDir).start() + val rpcWorkerConfig = getRpcWorkerConfig(port, user, artemisDir) + val ourKeyPair = getIdentity() + val myInfo = getNodeInfo() + + val trustRoot = rpcWorkerConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA) + val nodeCa = rpcWorkerConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA) + + val signedNetworkParameters = NetworkParametersReader(trustRoot, null, rpcWorkerConfig.baseDirectory).read() + val rpcWorkerBroker = createRpcWorkerBroker(rpcWorkerConfig, signedNetworkParameters.networkParameters.maxMessageSize) + createRpcWorker(rpcWorkerConfig, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa, rpcWorkerBroker.serverControl) } - private fun instantiateAndValidate(rpcOpsImplClassName: String): RPCOps { - try { - val klass = Class.forName(rpcOpsImplClassName) - if (klass.isAbstractClass) { - throw IllegalArgumentException("$rpcOpsImplClassName must not be abstract") - } - val instance = klass.newInstance() - return instance as? RPCOps ?: throw IllegalArgumentException("class '$rpcOpsImplClassName' is not extending RPCOps") - } catch (ex: ClassNotFoundException) { - throw IllegalArgumentException("class '$rpcOpsImplClassName' not found in the classpath") - } + private fun getRpcWorkerConfig(port: Int, user: User, artemisDir: Path): NodeConfiguration { + TODO("" + port + user + artemisDir) } - private fun initialiseSerialization() { - synchronized(this) { - if (nodeSerializationEnv == null) { - val classloader = this::class.java.classLoader - nodeSerializationEnv = SerializationEnvironmentImpl( - SerializationFactoryImpl().apply { - registerScheme(AMQPServerSerializationScheme(emptyList())) - }, - p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader), - rpcServerContext = AMQP_P2P_CONTEXT.withClassLoader(classloader) - ) - } + private fun getIdentity(): KeyPair { + TODO() + } + + private fun getNodeInfo(): NodeInfo { + TODO() + } + + private fun createRpcWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker { + val rpcOptions = config.rpcOptions + val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(config.rpcUsers)) + val broker = if (rpcOptions.useSsl) { + ArtemisRpcBroker.withSsl(config, rpcOptions.address, rpcOptions.adminAddress, rpcOptions.sslConfig!!, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false) + } else { + ArtemisRpcBroker.withoutSsl(config, rpcOptions.address, rpcOptions.adminAddress, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false) } + broker.start() + return broker + } + + private fun createRpcWorker(config: NodeConfiguration, myInfo: NodeInfo, signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate, serverControl: ActiveMQServerControl): Pair { + val rpcWorkerServiceHub = RpcWorkerServiceHub(config, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa) + val rpcWorker = RpcWorker(rpcWorkerServiceHub, serverControl) + rpcWorker.start() + return Pair(rpcWorker, rpcWorkerServiceHub) } } -/** - * Note once `stop()` been called, there is no longer an option to call `start()` and the instance should be discarded - */ -class RpcWorker(private val hostAndPort: NetworkHostAndPort, private val user: User, private val ops: RPCOps, private val artemisPath: Path) : Startable, Stoppable { +class RpcWorker(private val rpcWorkerServiceHub: RpcWorkerServiceHub, private val serverControl: ActiveMQServerControl) { - private companion object { - const val MAX_MESSAGE_SIZE: Int = 10485760 - const val notificationAddress = "notifications" - private val fakeNodeLegalName = CordaX500Name(organisation = "Not:a:real:name", locality = "Nowhere", country = "GB") - private val DEFAULT_TIMEOUT = 60.seconds + private val runOnStop = ArrayList<() -> Any?>() - private val logger = contextLogger() - } - - private val executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("RpcWorker-pool-thread-%d").build()) - private val registeredShutdowns = mutableListOf(doneFuture({executorService.shutdown()})) - - override var started = false - - override fun start() { - started = true - - startRpcServer().getOrThrow(DEFAULT_TIMEOUT) - } - - override fun stop() { - val shutdownOutcomes = registeredShutdowns.map { Try.on { it.getOrThrow(DEFAULT_TIMEOUT) } } - shutdownOutcomes.reversed().forEach { - when (it) { - is Try.Success -> - try { - it.value() - } catch (t: Throwable) { - logger.warn("Exception while calling a shutdown action, this might create resource leaks", t) - } - is Try.Failure -> logger.warn("Exception while getting shutdown method, disregarding", it.exception) - } - } - - started = false - } - - private fun startRpcServer(): CordaFuture { - return startRpcBroker().map { serverControl -> - startRpcServerWithBrokerRunning(serverControl = serverControl) - } - } - - private fun startRpcBroker( - maxFileSize: Int = MAX_MESSAGE_SIZE, - maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE - ): CordaFuture { - return executorService.fork { - logger.info("Artemis files will be stored in: $artemisPath") - val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, artemisPath, hostAndPort) - val server = ActiveMQServerImpl(artemisConfig, SingleUserSecurityManager(user)) - server.start() - registeredShutdowns.add(doneFuture({ - server.stop() - })) - server.activeMQServerControl - } - } - - private fun createNettyClientTransportConfiguration(): TransportConfiguration { - return ArtemisTcpTransport.rpcConnectorTcpTransport(hostAndPort, null) - } - - private fun startRpcServerWithBrokerRunning( - nodeLegalName: CordaX500Name = fakeNodeLegalName, - configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, - serverControl: ActiveMQServerControl - ): RPCServer { - val locator = ActiveMQClient.createServerLocatorWithoutHA(createNettyClientTransportConfiguration()).apply { - minLargeMessageSize = MAX_MESSAGE_SIZE - isUseGlobalPools = false - } - val rpcSecurityManager = RPCSecurityManagerImpl.fromUserList(users = listOf(User(user.username, user.password, user.permissions)), - id = AuthServiceId("RPC_WORKER_SECURITY_MANAGER")) - val rpcServer = RPCServer( - ops, - user.username, - user.password, - locator, - rpcSecurityManager, - nodeLegalName, - configuration + fun start() { + val rpcServerConfiguration = RPCServerConfiguration.DEFAULT.copy( + rpcThreadPoolSize = rpcWorkerServiceHub.configuration.enterpriseConfiguration.tuning.rpcThreadPoolSize ) - registeredShutdowns.add(doneFuture({ - rpcServer.close() - locator.close() - })) - rpcServer.start(serverControl) - return rpcServer + val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(rpcWorkerServiceHub.configuration.rpcUsers)) + val nodeName = CordaX500Name.build(rpcWorkerServiceHub.configuration.loadSslKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal) + + val internalRpcMessagingClient = InternalRPCMessagingClient(rpcWorkerServiceHub.configuration, rpcWorkerServiceHub.configuration.rpcOptions.adminAddress, Node.MAX_RPC_MESSAGE_SIZE, nodeName, rpcServerConfiguration) + internalRpcMessagingClient.init(rpcWorkerServiceHub.rpcOps, securityManager) + internalRpcMessagingClient.start(serverControl) + + runOnStop += { rpcWorkerServiceHub.stop() } + rpcWorkerServiceHub.start() + + runOnStop += { internalRpcMessagingClient.stop() } } - private fun createRpcServerArtemisConfig(maxFileSize: Int, maxBufferedBytesPerClient: Long, baseDirectory: Path, hostAndPort: NetworkHostAndPort): Configuration { - return ConfigurationImpl().apply { - val artemisDir = "$baseDirectory/artemis" - bindingsDirectory = "$artemisDir/bindings" - journalDirectory = "$artemisDir/journal" - largeMessagesDirectory = "$artemisDir/large-messages" - acceptorConfigurations = setOf(ArtemisTcpTransport.rpcAcceptorTcpTransport(hostAndPort, null)) - configureCommonSettings(maxFileSize, maxBufferedBytesPerClient) + fun stop() { + for (toRun in runOnStop.reversed()) { + toRun() } - } - - private fun ConfigurationImpl.configureCommonSettings(maxFileSize: Int, maxBufferedBytesPerClient: Long) { - managementNotificationAddress = SimpleString(notificationAddress) - isPopulateValidatedUser = true - journalBufferSize_NIO = maxFileSize - journalBufferSize_AIO = maxFileSize - journalFileSize = maxFileSize - queueConfigurations = listOf( - CoreQueueConfiguration().apply { - name = RPCApi.RPC_SERVER_QUEUE_NAME - address = RPCApi.RPC_SERVER_QUEUE_NAME - isDurable = false - }, - CoreQueueConfiguration().apply { - name = RPCApi.RPC_CLIENT_BINDING_REMOVALS - address = notificationAddress - filterString = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION - isDurable = false - }, - CoreQueueConfiguration().apply { - name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS - address = notificationAddress - filterString = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION - isDurable = false - } - ) - addressesSettings = mapOf( - "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply { - maxSizeBytes = maxBufferedBytesPerClient - addressFullMessagePolicy = AddressFullMessagePolicy.FAIL - } - ) - } -} - -private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityManager3 { - override fun validateUser(user: String?, password: String?) = isValid(user, password) - override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet?, checkType: CheckType?) = isValid(user, password) - override fun validateUser(user: String?, password: String?, connection: RemotingConnection?): String? { - return validate(user, password) - } - - override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet?, checkType: CheckType?, address: String?, connection: RemotingConnection?): String? { - return validate(user, password) - } - - private fun isValid(user: String?, password: String?): Boolean { - return rpcUser.username == user && rpcUser.password == password - } - - private fun validate(user: String?, password: String?): String? { - return if (isValid(user, password)) user else null + runOnStop.clear() } } \ No newline at end of file diff --git a/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorkerClient.kt b/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorkerClient.kt deleted file mode 100644 index 4b28fa9488..0000000000 --- a/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorkerClient.kt +++ /dev/null @@ -1,23 +0,0 @@ -package net.corda.rpcWorker - -import net.corda.client.rpc.CordaRPCClientConfiguration -import net.corda.client.rpc.internal.RPCClient -import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.nodeapi.ArtemisTcpTransport -import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT - -fun main(args: Array) { - AMQPClientSerializationScheme.initialiseSerialization() - val client = RPCClient(ArtemisTcpTransport.rpcConnectorTcpTransport(NetworkHostAndPort("localhost", 20002), null), - CordaRPCClientConfiguration.DEFAULT, serializationContext = AMQP_RPC_CLIENT_CONTEXT) - val connection = client.start(SimplisticRpcOps::class.java, "user1", "test1") - - try { - val rpcOps = connection.proxy - println("Server hostname and PID: " + rpcOps.hostnameAndPid()) - println("Server timestamp: " + rpcOps.currentTimeStamp()) - } finally { - connection.close() - } -} \ No newline at end of file diff --git a/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorkerServiceHub.kt b/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorkerServiceHub.kt new file mode 100644 index 0000000000..61bac4b474 --- /dev/null +++ b/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/RpcWorkerServiceHub.kt @@ -0,0 +1,263 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + +package net.corda.rpcWorker + +import com.codahale.metrics.MetricRegistry +import com.jcabi.manifests.Manifests +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionState +import net.corda.core.crypto.sign +import net.corda.core.flows.FlowLogic +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.ContractUpgradeService +import net.corda.core.node.services.TransactionVerifierService +import net.corda.core.serialization.SerializeAsToken +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.internal.SerializationEnvironmentImpl +import net.corda.core.serialization.internal.effectiveSerializationEnv +import net.corda.core.serialization.internal.nodeSerializationEnv +import net.corda.core.utilities.contextLogger +import net.corda.node.CordaClock +import net.corda.node.SimpleClock +import net.corda.node.VersionInfo +import net.corda.node.internal.* +import net.corda.node.internal.cordapp.CordappConfigFileProvider +import net.corda.node.internal.cordapp.CordappProviderImpl +import net.corda.node.internal.cordapp.JarScanningCordappLoader +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme +import net.corda.node.serialization.kryo.KRYO_CHECKPOINT_CONTEXT +import net.corda.node.serialization.kryo.KryoServerSerializationScheme +import net.corda.node.services.api.AuditService +import net.corda.node.services.api.MonitoringService +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.api.WritableTransactionStorage +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.identity.PersistentIdentityService +import net.corda.node.services.keys.PersistentKeyManagementService +import net.corda.node.services.messaging.MessagingService +import net.corda.node.services.network.* +import net.corda.node.services.persistence.DBTransactionMappingStorage +import net.corda.node.services.persistence.DBTransactionStorage +import net.corda.node.services.persistence.NodeAttachmentService +import net.corda.node.services.persistence.NodePropertiesPersistentStore +import net.corda.node.services.schema.NodeSchemaService +import net.corda.node.services.vault.NodeVaultService +import net.corda.nodeapi.internal.NodeInfoAndSigned +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.isH2Database +import net.corda.serialization.internal.* +import org.slf4j.Logger +import rx.schedulers.Schedulers +import java.security.KeyPair +import java.security.cert.X509Certificate +import java.sql.Connection +import java.time.Clock +import java.time.Duration +import java.util.* + +class RpcWorkerServiceHub(override val configuration: NodeConfiguration, override val myInfo: NodeInfo, private val signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned, private val ourKeyPair: KeyPair, private val trustRoot: X509Certificate, private val nodeCa: X509Certificate) : ServiceHubInternal, SingletonSerializeAsToken() { + + override val clock: CordaClock = SimpleClock(Clock.systemUTC()) + private val versionInfo = getVersionInfo() + private val cordappLoader = JarScanningCordappLoader.fromDirectories(configuration.cordappDirectories, versionInfo) + + private val log: Logger get() = staticLog + + companion object { + private val staticLog = contextLogger() + } + + private val runOnStop = ArrayList<() -> Any?>() + + override val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, false) + override val identityService = PersistentIdentityService() + override val database: CordaPersistence = createCordaPersistence( + configuration.database, + identityService::wellKnownPartyFromX500Name, + identityService::wellKnownPartyFromAnonymous, + schemaService + ) + + init { + // TODO Break cyclic dependency + identityService.database = database + } + + private val persistentNetworkMapCache = PersistentNetworkMapCache(database, myInfo.legalIdentities[0].name) + override val networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, identityService, database) + @Suppress("LeakingThis") + override val validatedTransactions: WritableTransactionStorage = DBTransactionStorage(configuration.transactionCacheSizeBytes, database) + private val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL) } + private val metricRegistry = MetricRegistry() + override val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound) + + override val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments) + + @Suppress("LeakingThis") + override val keyManagementService = PersistentKeyManagementService(identityService, database) + private val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, validatedTransactions) + @Suppress("LeakingThis") + override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService) + override val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database) + override val monitoringService = MonitoringService(metricRegistry) + override val networkMapUpdater = NetworkMapUpdater( + networkMapCache, + NodeInfoWatcher( + configuration.baseDirectory, + @Suppress("LeakingThis") + Schedulers.io(), + Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec) + ), + networkMapClient, + configuration.baseDirectory, + configuration.extraNetworkMapKeys + ).closeOnStop() + + override val networkParameters = signedNetworkParameters.networkParameters + + override val transactionVerifierService: TransactionVerifierService + get() { + throw NotImplementedError() + } + override val contractUpgradeService: ContractUpgradeService + get() { + throw NotImplementedError() + } + override val auditService: AuditService + get() { + throw NotImplementedError() + } + + // TODO schedulerService + + override val rpcFlows = ArrayList>>() + override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage(database) + override val networkService: MessagingService + get() { + throw NotImplementedError() + } + + private fun T.closeOnStop(): T { + runOnStop += this::close + return this + } + + override fun getFlowFactory(initiatingFlowClass: Class>): InitiatedFlowFactory<*>? { + throw NotImplementedError() + } + + override fun loadState(stateRef: StateRef): TransactionState<*> { + return servicesForResolution.loadState(stateRef) + } + + override fun loadStates(stateRefs: Set): Set> { + return servicesForResolution.loadStates(stateRefs) + } + + override fun cordaService(type: Class): T { + throw NotImplementedError() + } + + override fun jdbcSession(): Connection { + throw NotImplementedError() + } + + override fun registerUnloadHandler(runOnStop: () -> Unit) { + this.runOnStop += runOnStop + } + + private fun getVersionInfo(): VersionInfo { + // Manifest properties are only available if running from the corda jar + fun manifestValue(name: String): String? = if (Manifests.exists(name)) Manifests.read(name) else null + + return VersionInfo( + manifestValue("Corda-Platform-Version")?.toInt() ?: 1, + manifestValue("Corda-Release-Version") ?: "Unknown", + manifestValue("Corda-Revision") ?: "Unknown", + manifestValue("Corda-Vendor") ?: "Unknown" + ) + } + + private fun initialiseSerialization() { + val serializationExists = try { + effectiveSerializationEnv + true + } catch (e: IllegalStateException) { + false + } + if (!serializationExists) { + val classloader = cordappLoader.appClassLoader + nodeSerializationEnv = SerializationEnvironmentImpl( + SerializationFactoryImpl().apply { + registerScheme(AMQPServerSerializationScheme(cordappLoader.cordapps)) + registerScheme(AMQPClientSerializationScheme(cordappLoader.cordapps)) + registerScheme(KryoServerSerializationScheme()) + }, + p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader), + rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader), + storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader), + checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader), + rpcClientContext = AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader)) + } + } + + val rpcOps = CordaRpcWorkerOps(this, {}) + + fun start() { + log.info("Rpc Worker starting up ...") + + initialiseSerialization() + + networkMapClient?.start(trustRoot) + + servicesForResolution.start(networkParameters) + + val isH2Database = isH2Database(configuration.dataSourceProperties.getProperty("dataSource.url", "")) + val schemas = if (isH2Database) schemaService.internalSchemas() else schemaService.schemaOptions.keys + + database.startHikariPool(configuration.dataSourceProperties, configuration.database, schemas) + identityService.start(trustRoot, listOf(myInfo.legalIdentitiesAndCerts.first().certificate, nodeCa)) + persistentNetworkMapCache.start(networkParameters.notaries) + + runOnStop += { rpcOps.shutdown() } + rpcOps.start() + + database.transaction { + networkMapCache.start() + networkMapCache.addNode(myInfo) + } + + val nodeInfoAndSigned = NodeInfoAndSigned(myInfo) { _, serialised -> + ourKeyPair.private.sign(serialised.bytes) + } + identityService.ourNames = myInfo.legalIdentities.map { it.name }.toSet() + + networkMapUpdater.start(trustRoot, signedNetworkParameters.signed.raw.hash, nodeInfoAndSigned.signed.raw.hash) + + database.transaction { + identityService.loadIdentities(myInfo.legalIdentitiesAndCerts) + attachments.start() + nodeProperties.start() + keyManagementService.start(setOf(ourKeyPair)) + vaultService.start() + } + } + + fun stop() { + for (toRun in runOnStop.reversed()) { + toRun() + } + runOnStop.clear() + } +} \ No newline at end of file diff --git a/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/SimplisticRpcOps.kt b/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/SimplisticRpcOps.kt deleted file mode 100644 index 97c1974564..0000000000 --- a/experimental/rpc-worker/src/main/kotlin/net/corda/rpcWorker/SimplisticRpcOps.kt +++ /dev/null @@ -1,29 +0,0 @@ -package net.corda.rpcWorker - -import net.corda.core.messaging.RPCOps -import java.lang.management.ManagementFactory -import java.net.InetAddress -import java.time.ZonedDateTime - -// TODO: This interface should really be residing in the "client" sub-module such that the JAR where this interface (but no the implementation) -// Is available to RPC clients -interface SimplisticRpcOps : RPCOps { - fun currentTimeStamp(): String - fun hostnameAndPid(): String -} - -class SimplisticRpcOpsImpl : SimplisticRpcOps { - - override val protocolVersion: Int = 1 - - override fun currentTimeStamp(): String { - return ZonedDateTime.now().toString() - } - - override fun hostnameAndPid(): String { - val info = ManagementFactory.getRuntimeMXBean() - val pid = info.name.split("@").firstOrNull() // TODO Java 9 has better support for this - val hostName: String = InetAddress.getLocalHost().hostName - return "$hostName:$pid" - } -} \ No newline at end of file diff --git a/experimental/rpc-worker/src/main/resources/reference.conf b/experimental/rpc-worker/src/main/resources/reference.conf deleted file mode 100644 index d0b442a7c2..0000000000 --- a/experimental/rpc-worker/src/main/resources/reference.conf +++ /dev/null @@ -1,18 +0,0 @@ -// -// R3 Proprietary and Confidential -// -// Copyright (c) 2018 R3 Limited. All rights reserved. -// -// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. -// -// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. - -port = 20002 - -userName = user1 - -password = test1 - -rpcOpsImplClass = net.corda.rpcWorker.SimplisticRpcOpsImpl - -artemisDir = "C:\\Users\\Viktor Kolomeyko\\AppData\\Local\\Temp\\Artemis" \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 4316c4048e..9461ce72a6 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -530,7 +530,6 @@ class NodeVaultService( val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java) val queryRootVaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java) - // TODO: revisit (use single instance of parser for all queries) val criteriaParser = HibernateQueryCriteriaParser(contractStateType, contractStateTypeMappings, criteriaBuilder, criteriaQuery, queryRootVaultStates) @@ -617,7 +616,7 @@ class NodeVaultService( /** * Derive list from existing vault states and then incrementally update using vault observables */ - private fun bootstrapContractStateTypes() { + fun bootstrapContractStateTypes() { val criteria = criteriaBuilder.createQuery(String::class.java) val vaultStates = criteria.from(VaultSchemaV1.VaultStates::class.java) criteria.select(vaultStates.get("contractStateClassName")).distinct(true) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt index 95e8853830..a3ebe46216 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt @@ -106,13 +106,13 @@ class DriverDSLImpl( private var _shutdownManager: ShutdownManager? = null override val shutdownManager get() = _shutdownManager!! // Map from a nodes legal name to an observable emitting the number of nodes in its network map. - private val networkVisibilityController = NetworkVisibilityController() + val networkVisibilityController = NetworkVisibilityController() /** * Future which completes when the network map infrastructure is available, whether a local one or one from the CZ. * This future acts as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap] * object, which is null if the network map is being provided by the CZ. */ - private lateinit var networkMapAvailability: CordaFuture + lateinit var networkMapAvailability: CordaFuture private lateinit var _notaries: CordaFuture> override val notaryHandles: List get() = _notaries.getOrThrow() @@ -889,7 +889,7 @@ class DriverDSLImpl( * Keeps track of how many nodes each node sees and gates nodes from completing their startNode [CordaFuture] until all * current nodes see everyone. */ -private class NetworkVisibilityController { +class NetworkVisibilityController { private val nodeVisibilityHandles = ThreadBox(HashMap()) fun register(name: CordaX500Name): VisibilityHandle {