ENT-2293 & ENT-2353: Basic implementation of an RPC worker and Flow Worker (#1308)

This commit is contained in:
Matthew Nesbit 2018-08-29 13:45:27 +01:00 committed by cburlinchon
parent 260e50572a
commit 7ead9e5698
16 changed files with 1542 additions and 638 deletions

View File

@ -1,5 +1,15 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
apply plugin: 'kotlin'
apply plugin: 'java'
apply plugin: 'net.corda.plugins.quasar-utils'
description 'Corda Flow Worker'
@ -30,6 +40,7 @@ dependencies {
testCompile "junit:junit:$junit_version"
testCompile(project(':node-driver'))
integrationTestCompile(project(':bridge'))
}
task integrationTest(type: Test) {

View File

@ -1,58 +1,77 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.flowworker
import co.paralleluniverse.fibers.Suspendable
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import net.corda.core.concurrent.CordaFuture
import net.corda.confidential.SwapIdentitiesFlow
import net.corda.core.context.InvocationContext
import net.corda.core.context.Trace
import net.corda.core.crypto.Crypto.generateKeyPair
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.finance.DOLLARS
import net.corda.finance.USD
import net.corda.finance.contracts.getCashBalances
import net.corda.finance.flows.AbstractCashFlow
import net.corda.finance.flows.CashIssueFlow
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NodeConfigurationImpl
import net.corda.node.services.config.parseAsNodeConfiguration
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.InitialSessionMessage
import net.corda.node.services.statemachine.SessionId
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.getTestPartyAndCertificate
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.internal.NodeBasedTest
import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.services.config.*
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.bridging.BridgeControlListener
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.testing.core.*
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.PortAllocation
import net.corda.testing.node.MockServices
import net.corda.testing.node.internal.TestCordappDirectories
import net.corda.testing.node.internal.cordappsForPackages
import net.corda.testing.node.internal.getCallerPackage
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Before
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.nio.file.Paths
import java.security.KeyPair
import java.security.cert.X509Certificate
import java.time.Instant
import java.util.*
import kotlin.test.assertEquals
class FlowWorkerTest {
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
val serializationEnvironment = SerializationEnvironmentRule(true)
private val portAllocation = PortAllocation.Incremental(10000)
private val networkParameters = NetworkParameters(
minimumPlatformVersion = 1,
notaries = listOf(),
modifiedTime = Instant.now(),
maxMessageSize = MAX_MESSAGE_SIZE,
maxTransactionSize = 4000000,
epoch = 1,
whitelistedContractImplementations = emptyMap()
)
private val bankAKeyPair = generateKeyPair()
private val bankBKeyPair = generateKeyPair()
@ -64,85 +83,182 @@ class FlowWorkerTest {
private val bankBPartyAndCertificate = getTestPartyAndCertificate(bankB)
private val notaryPartyAndCertificate = getTestPartyAndCertificate(notary)
private val cordappPackages = listOf("net.corda.finance")
private val cordapps = cordappsForPackages(getCallerPackage(NodeBasedTest::class)?.let { cordappPackages + it }
?: cordappPackages)
private val bankAInfo = NodeInfo(listOf(NetworkHostAndPort("localhost", 1111)), listOf(bankAPartyAndCertificate), 1, 1)
private val bankBInfo = NodeInfo(listOf(NetworkHostAndPort("localhost", 1112)), listOf(bankBPartyAndCertificate), 1, 1)
private lateinit var configuration: NodeConfiguration
@Before
fun setup() {
val testConfig = ConfigFactory.parseResources("test-config.conf", ConfigParseOptions.defaults().setAllowMissing(false)).parseAsNodeConfiguration() as NodeConfigurationImpl
configuration = testConfig.copy(baseDirectory = temporaryFolder.root.toPath(), dataSourceProperties = makeTestDataSourceProperties(), cordappDirectories = TestCordappDirectories.cached(cordapps).toList())
}
private val myInfo = NodeInfo(listOf(NetworkHostAndPort("localhost", 3334)), listOf(bankAPartyAndCertificate), 1, 1)
private val networkParameters = NetworkParameters(
minimumPlatformVersion = 1,
notaries = listOf(),
modifiedTime = Instant.now(),
maxMessageSize = 10485760,
maxTransactionSize = 4000000,
epoch = 1,
whitelistedContractImplementations = emptyMap()
)
@Test
fun `send message`() {
val flowWorkerServiceHub = FlowWorkerServiceHub(configuration, myInfo, networkParameters, bankAKeyPair)
val flowWorker = FlowWorker(flowWorkerServiceHub)
flowWorker.start()
flowWorkerServiceHub.networkMapCache.addNode(NodeInfo(listOf(NetworkHostAndPort("localhost", 3333)), listOf(bankBPartyAndCertificate), 1, 1))
flowWorkerServiceHub.flowFactories[SomeFlowLogic::class.java] = InitiatedFlowFactory.Core { flowSession -> SomeFlowLogic(flowSession) }
val cordaMessage = flowWorkerServiceHub.networkService.createMessage("platform.session", data = ByteSequence.of(InitialSessionMessage(SessionId(1), 1, SomeFlowLogic::class.java.name, 1, "", "test".serialize()).serialize().bytes).bytes)
val artemisMessage = (flowWorkerServiceHub.networkService as P2PMessagingClient).messagingExecutor!!.cordaToArtemisMessage(cordaMessage)
artemisMessage!!.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString(DUMMY_BANK_B_NAME.toString()))
(flowWorkerServiceHub.networkService as P2PMessagingClient).deliver(artemisMessage)
flowWorker.stop()
}
private val cordappDirectories = TestCordappDirectories.cached(cordappsForPackages(listOf("net.corda.finance"))).toList()
@Test
fun `cash issue`() {
val flowWorkerServiceHub = FlowWorkerServiceHub(configuration, myInfo, networkParameters, bankAKeyPair)
val flowWorker = FlowWorker(flowWorkerServiceHub)
flowWorker.start()
val baseDirectory = DriverParameters().driverDirectory
val nodeDirectory = baseDirectory / DUMMY_BANK_A_NAME.organisation / "flowWorker"
nodeDirectory.createDirectories()
val brokerAddress = NetworkHostAndPort("localhost", portAllocation.nextPort())
flowWorkerServiceHub.database.transaction {
flowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate)
}
val config = genericConfig().copy(myLegalName = DUMMY_BANK_A_NAME, baseDirectory = nodeDirectory,
messagingServerAddress = brokerAddress, dataSourceProperties = MockServices.makeTestDataSourceProperties(),
cordappDirectories = cordappDirectories)
// create test certificates
config.configureWithDevSSLCertificate()
val startFlowEventCashIssue = object : ExternalEvent.ExternalStartFlowEvent<AbstractCashFlow.Result>, DeduplicationHandler {
override val deduplicationHandler = this
override fun insideDatabaseTransaction() {}
override fun afterDatabaseTransaction() {}
override val externalCause = this
override val flowLogic = CashIssueFlow(10.DOLLARS, OpaqueBytes.of(0x01), notary)
override val context = InvocationContext.service("bla", DUMMY_BANK_A_NAME)
private val _future = openFuture<FlowStateMachine<AbstractCashFlow.Result>>()
override fun wireUpFuture(flowFuture: CordaFuture<FlowStateMachine<AbstractCashFlow.Result>>) {
_future.captureLater(flowFuture)
val trustRoot = config.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
val nodeCa = config.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
val broker = createFlowWorkerBroker(config, networkParameters.maxMessageSize)
val bridgeControlListener = createBridgeControlListener(config, networkParameters.maxMessageSize)
val flowWorkerRequestQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}${bankAKeyPair.public.toStringShort()}"
val flowWorkerReplyQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}reply"
val (session, consumer, producer) = createArtemisClient(config, flowWorkerReplyQueueAddress)
val (flowWorker, flowWorkerServiceHub) = createFlowWorker(config, bankAInfo, networkParameters, bankAKeyPair, trustRoot, nodeCa)
try {
flowWorkerServiceHub.database.transaction {
flowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate)
}
override val future: CordaFuture<FlowStateMachine<AbstractCashFlow.Result>>
get() = _future
}
val result = flowWorker.startFlow(startFlowEventCashIssue)
println(result.getOrThrow().resultFuture.getOrThrow())
println("Cash " + flowWorkerServiceHub.getCashBalances())
val traceId = Trace.InvocationId.newInstance()
val startFlowMessage = StartFlow(DUMMY_BANK_A_NAME, CashIssueFlow::class.java, arrayOf(10.DOLLARS, OpaqueBytes.of(0x01), notary),
InvocationContext.service("bla", DUMMY_BANK_A_NAME), flowWorkerReplyQueueAddress, traceId)
val message = session.createMessage(true)
message.writeBodyBufferBytes(startFlowMessage.serialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT).bytes)
flowWorker.stop()
producer.send(flowWorkerRequestQueueAddress, message)
val flowReplyStateMachineRunId = receiveFlowWorkerMessage<FlowReplyStateMachineRunId>(consumer)
println(flowReplyStateMachineRunId)
val flowReplyResult = receiveFlowWorkerMessage<FlowReplyResult>(consumer)
assertEquals(traceId, flowReplyResult.replyId)
println(flowReplyResult)
val cashBalance = flowWorkerServiceHub.getCashBalances()
assertEquals(10.DOLLARS, cashBalance[USD])
println("Cash: $cashBalance")
} finally {
flowWorker.stop()
bridgeControlListener.stop()
broker.stop()
}
}
}
@Test
fun `swap identities`() {
val baseDirectory = DriverParameters().driverDirectory
private class SomeFlowLogic(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
println("FLOW START")
session.send("FLOW SEND A MESSAGE")
println("FLOW END")
val bankANodeDirectory = baseDirectory / DUMMY_BANK_A_NAME.organisation / "flowWorker"
bankANodeDirectory.createDirectories()
val bankAbrokerAddress = NetworkHostAndPort("localhost", portAllocation.nextPort())
val bankAConfig = genericConfig().copy(myLegalName = DUMMY_BANK_A_NAME, baseDirectory = bankANodeDirectory,
messagingServerAddress = bankAbrokerAddress, dataSourceProperties = MockServices.makeTestDataSourceProperties(),
cordappDirectories = cordappDirectories)
// create test certificates
bankAConfig.configureWithDevSSLCertificate()
val bankATrustRoot = bankAConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
val bankANodeCa = bankAConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
val bankABroker = createFlowWorkerBroker(bankAConfig, networkParameters.maxMessageSize)
val bankABridgeControlListener = createBridgeControlListener(bankAConfig, networkParameters.maxMessageSize)
val (bankAFlowWorker, bankAFlowWorkerServiceHub) = createFlowWorker(bankAConfig, bankAInfo, networkParameters, bankAKeyPair, bankATrustRoot, bankANodeCa)
val bankARequestQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}${bankAKeyPair.public.toStringShort()}"
val bankAReplyQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}reply"
val (bankASession, bankAConsumer, bankAProducer) = createArtemisClient(bankAConfig, bankAReplyQueueAddress)
val bankBNodeDirectory = baseDirectory / DUMMY_BANK_B_NAME.organisation / "flowWorker"
bankBNodeDirectory.createDirectories()
val bankBbrokerAddress = NetworkHostAndPort("localhost", portAllocation.nextPort())
val bankBConfig = genericConfig().copy(myLegalName = DUMMY_BANK_B_NAME, baseDirectory = bankBNodeDirectory,
messagingServerAddress = bankBbrokerAddress, dataSourceProperties = MockServices.makeTestDataSourceProperties(),
cordappDirectories = cordappDirectories)
// create test certificates
bankBConfig.configureWithDevSSLCertificate()
val bankBTrustRoot = bankBConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
val bankBNodeCa = bankBConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
// NetworkParametersCopier(networkParameters).install(bankBConfig.baseDirectory)
val bankBBroker = createFlowWorkerBroker(bankBConfig, networkParameters.maxMessageSize)
val bankBBridgeControlListener = createBridgeControlListener(bankBConfig, networkParameters.maxMessageSize)
val (bankBFlowWorker, bankBFlowWorkerServiceHub) = createFlowWorker(bankBConfig, bankBInfo, networkParameters, bankBKeyPair, bankBTrustRoot, bankBNodeCa)
try {
bankAFlowWorkerServiceHub.database.transaction {
bankAFlowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate)
bankAFlowWorkerServiceHub.networkMapCache.addNode(NodeInfo(listOf(NetworkHostAndPort("localhost", bankBConfig.messagingServerAddress!!.port)), listOf(bankBPartyAndCertificate), 1, 1))
}
bankBFlowWorkerServiceHub.database.transaction {
bankBFlowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate)
bankBFlowWorkerServiceHub.networkMapCache.addNode(NodeInfo(listOf(NetworkHostAndPort("localhost", bankAConfig.messagingServerAddress!!.port)), listOf(bankAPartyAndCertificate), 1, 1))
}
val swapIdentitiesTraceId = Trace.InvocationId.newInstance()
val swapIdentitiesStartFlowMessage = StartFlow(DUMMY_BANK_A_NAME, SwapIdentitiesFlow::class.java, arrayOf(bankB),
InvocationContext.service("bla", DUMMY_BANK_A_NAME), bankAReplyQueueAddress, swapIdentitiesTraceId)
val swapIdentitiesMessage = bankASession.createMessage(true)
swapIdentitiesMessage.writeBodyBufferBytes(swapIdentitiesStartFlowMessage.serialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT).bytes)
bankAProducer.send(bankARequestQueueAddress, swapIdentitiesMessage)
val swapIdentitiesStateMachineRunId = receiveFlowWorkerMessage<FlowReplyStateMachineRunId>(bankAConsumer)
println(swapIdentitiesStateMachineRunId)
val swapIdentitiesResult = receiveFlowWorkerMessage<FlowReplyResult>(bankAConsumer)
assertEquals(swapIdentitiesTraceId, swapIdentitiesResult.replyId)
println(swapIdentitiesResult)
} finally {
bankAFlowWorker.stop()
bankBFlowWorker.stop()
bankABridgeControlListener.stop()
bankBBridgeControlListener.stop()
bankABroker.stop()
bankBBroker.stop()
}
}
private fun genericConfig(): NodeConfigurationImpl {
return NodeConfigurationImpl(baseDirectory = Paths.get("."), myLegalName = DUMMY_BANK_A_NAME, emailAddress = "",
keyStorePassword = "pass", trustStorePassword = "pass", crlCheckSoftFail = true, dataSourceProperties = Properties(),
rpcUsers = listOf(), verifierType = VerifierType.InMemory, flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0),
p2pAddress = NetworkHostAndPort("localhost", 1), rpcSettings = NodeRpcSettings(NetworkHostAndPort("localhost", 1), null, ssl = null),
relay = null, messagingServerAddress = null, enterpriseConfiguration = EnterpriseConfiguration(mutualExclusionConfiguration = MutualExclusionConfiguration(updateInterval = 0, waitInterval = 0)),
notary = null)
}
private fun createFlowWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker {
val broker = ArtemisMessagingServer(config, config.messagingServerAddress!!, maxMessageSize)
broker.start()
return broker
}
private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, networkParameters: NetworkParameters, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate): Pair<FlowWorker, FlowWorkerServiceHub> {
val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, networkParameters, ourKeyPair, trustRoot, nodeCa)
val flowWorker = FlowWorker(UUID.randomUUID().toString(), flowWorkerServiceHub)
flowWorker.start()
return Pair(flowWorker, flowWorkerServiceHub)
}
private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener {
val bridgeControlListener = BridgeControlListener(config, config.messagingServerAddress!!, maxMessageSize)
bridgeControlListener.start()
return bridgeControlListener
}
private fun createArtemisClient(config: NodeConfiguration, queueAddress: String): Triple<ClientSession, ClientConsumer, ClientProducer> {
val artemisClient = ArtemisMessagingClient(config, config.messagingServerAddress!!, MAX_MESSAGE_SIZE)
val started = artemisClient.start()
started.session.createQueue(queueAddress, RoutingType.ANYCAST, queueAddress, true)
return Triple(started.session, started.session.createConsumer(queueAddress), started.session.createProducer())
}
private inline fun <reified T : FlowWorkerMessage> receiveFlowWorkerMessage(consumer: ClientConsumer): T {
val message = consumer.receive()
val data = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
return data.deserialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT)
}
}

View File

@ -1,47 +0,0 @@
baseDirectory = ""
myLegalName = "O=Bank A, L=London, C=GB"
emailAddress = ""
keyStorePassword = "cordacadevpass"
trustStorePassword = "trustpass"
dataSourceProperties = {
dataSourceClassName = org.h2.jdbcx.JdbcDataSource
dataSource.url = "jdbc:h2:file:blah"
dataSource.user = "sa"
dataSource.password = ""
}
verifierType = InMemory
p2pAddress = "localhost:3334"
flowTimeout {
timeout = 30 seconds
maxRestartCount = 3
backoffBase = 2.0
}
devMode = true
crlCheckSoftFail = true
database = {
transactionIsolationLevel = "REPEATABLE_READ"
exportHibernateJMXStatistics = "false"
}
h2port = 0
useTestClock = false
rpcSettings = {
address = "locahost:3418"
adminAddress = "localhost:3419"
useSsl = false
standAloneBroker = false
}
enterpriseConfiguration = {
mutualExclusionConfiguration = {
on = false
updateInterval = 20000
waitInterval = 40000
}
tuning = {
flowThreadPoolSize = 1
rpcThreadPoolSize = 4
maximumMessagingBatchSize = 256
p2pConfirmationWindowSize = 1048576
brokerConnectionTtlCheckIntervalMs = 20
}
useMultiThreadedSMM = true
}

View File

@ -1,24 +1,168 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.flowworker
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.context.Trace
import net.corda.core.crypto.toStringShort
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import java.util.*
import kotlin.concurrent.thread
class FlowWorker(private val flowWorkerServiceHub: FlowWorkerServiceHub) {
class FlowWorker(flowWorkerId: String, private val flowWorkerServiceHub: FlowWorkerServiceHub) {
companion object {
const val FLOW_WORKER_QUEUE_ADDRESS_PREFIX = "${ArtemisMessagingComponent.INTERNAL_PREFIX}flow.worker."
}
private val queueAddress = "$FLOW_WORKER_QUEUE_ADDRESS_PREFIX${flowWorkerServiceHub.myInfo.legalIdentities[0].owningKey.toStringShort()}"
private val queueName = "$queueAddress.$flowWorkerId"
private val runOnStop = ArrayList<() -> Any?>()
fun start() {
flowWorkerServiceHub.start()
runOnStop += { flowWorkerServiceHub.stop() }
val flowWorkerMessagingClient = ArtemisMessagingClient(flowWorkerServiceHub.configuration, flowWorkerServiceHub.configuration.messagingServerAddress!!, flowWorkerServiceHub.networkParameters.maxMessageSize)
runOnStop += { flowWorkerMessagingClient.stop() }
val session = flowWorkerMessagingClient.start().session
val queueQuery = session.queueQuery(SimpleString(queueName))
if (!queueQuery.isExists) {
session.createQueue(queueAddress, RoutingType.ANYCAST, queueName, true)
}
val consumer = session.createConsumer(queueName)
val producer = session.createProducer()
consumer.setMessageHandler { message -> handleFlowWorkerMessage(message, session, producer) }
thread {
(flowWorkerServiceHub.networkService as P2PMessagingClient).run()
}
}
private fun handleFlowWorkerMessage(message: ClientMessage, session: ClientSession, producer: ClientProducer) {
val data = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
val flowWorkerMessage = data.deserialize<FlowWorkerMessage>(context = SerializationDefaults.RPC_SERVER_CONTEXT)
when (flowWorkerMessage) {
is StartFlow -> handleStartFlowMessage(flowWorkerMessage, session, producer)
is NetworkMapUpdate -> handleNetworkMapUpdateMessage(flowWorkerMessage)
}
}
private fun handleStartFlowMessage(startFlowMessage: StartFlow, session: ClientSession, producer: ClientProducer) {
val logicRef = flowWorkerServiceHub.flowLogicRefFactory.createForRPC(startFlowMessage.logicType, *startFlowMessage.args)
val logic: FlowLogic<*> = uncheckedCast(flowWorkerServiceHub.flowLogicRefFactory.toFlowLogic(logicRef))
val result = startFlow(logic, startFlowMessage.context).get()
val stateMachineRunIdMessage = session.createMessage(true)
stateMachineRunIdMessage.writeBodyBufferBytes(FlowReplyStateMachineRunId(flowWorkerServiceHub.myInfo.legalIdentities.first().name, startFlowMessage.replyId, result.id).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT).bytes)
producer.send(startFlowMessage.clientAddress, stateMachineRunIdMessage)
result.resultFuture.then {
val resultMessage = session.createMessage(true)
resultMessage.writeBodyBufferBytes(FlowReplyResult(flowWorkerServiceHub.myInfo.legalIdentities.first().name, startFlowMessage.replyId, it.get()).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT).bytes)
producer.send(startFlowMessage.clientAddress, resultMessage)
}
}
private fun handleNetworkMapUpdateMessage(networkMapUpdateMessage: NetworkMapUpdate) {
val mapChange = networkMapUpdateMessage.mapChange
// TODO remove
if (mapChange is NetworkMapCache.MapChange.Added) {
flowWorkerServiceHub.networkMapCache.addNode(mapChange.node)
mapChange.node.legalIdentitiesAndCerts.forEach {
try {
flowWorkerServiceHub.identityService.verifyAndRegisterIdentity(it)
} catch (ignore: Exception) {
// Log a warning to indicate node info is not added to the network map cache.
// NetworkMapCacheImpl.logger.warn("Node info for :'${it.name}' is not added to the network map due to verification error.")
}
}
}
}
fun stop() {
flowWorkerServiceHub.stop()
}
fun <T> startFlow(event: ExternalEvent.ExternalStartFlowEvent<T>): CordaFuture<FlowStateMachine<T>> {
flowWorkerServiceHub.database.transaction {
flowWorkerServiceHub.smm.deliverExternalEvent(event)
for (toRun in runOnStop.reversed()) {
toRun()
}
return event.future
runOnStop.clear()
}
}
private fun <T> startFlow(logic: FlowLogic<T>, context: InvocationContext): CordaFuture<FlowStateMachine<T>> {
val startFlowEvent = object : ExternalEvent.ExternalStartFlowEvent<T>, DeduplicationHandler {
override fun insideDatabaseTransaction() {}
override fun afterDatabaseTransaction() {}
override val externalCause: ExternalEvent
get() = this
override val deduplicationHandler: DeduplicationHandler
get() = this
override val flowLogic: FlowLogic<T>
get() = logic
override val context: InvocationContext
get() = context
override fun wireUpFuture(flowFuture: CordaFuture<FlowStateMachine<T>>) {
_future.captureLater(flowFuture)
}
private val _future = openFuture<FlowStateMachine<T>>()
override val future: CordaFuture<FlowStateMachine<T>>
get() = _future
}
flowWorkerServiceHub.database.transaction {
flowWorkerServiceHub.smm.deliverExternalEvent(startFlowEvent)
}
return startFlowEvent.future
}
}
@CordaSerializable
sealed class FlowWorkerMessage() {
abstract val legalName: CordaX500Name
}
data class StartFlow(override val legalName: CordaX500Name, val logicType: Class<out FlowLogic<*>>, val args: Array<out Any?>, val context: InvocationContext, val clientAddress: String, val replyId: Trace.InvocationId) : FlowWorkerMessage()
data class FlowReplyStateMachineRunId(override val legalName: CordaX500Name, val replyId: Trace.InvocationId, val id: StateMachineRunId) : FlowWorkerMessage()
data class FlowReplyResult(override val legalName: CordaX500Name, val replyId: Trace.InvocationId, val result: Any?) : FlowWorkerMessage()
data class NetworkMapUpdate(override val legalName: CordaX500Name, val mapChange: NetworkMapCache.MapChange) : FlowWorkerMessage()

View File

@ -1,15 +1,31 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.flowworker
import com.codahale.metrics.MetricRegistry
import com.google.common.collect.MutableClassToInstanceMap
import com.google.common.util.concurrent.MoreExecutors
import com.jcabi.manifests.Manifests
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.confidential.SwapIdentitiesFlow
import net.corda.confidential.SwapIdentitiesHandler
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.newSecureRandom
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.CordaService
@ -18,73 +34,89 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.CordaClock
import net.corda.node.SimpleClock
import net.corda.node.VersionInfo
import net.corda.node.cordapp.CordappLoader
import net.corda.node.internal.*
import net.corda.node.internal.classloading.requireAnnotation
import net.corda.node.internal.cordapp.CordappConfigFileProvider
import net.corda.node.internal.cordapp.CordappProviderImpl
import net.corda.node.internal.cordapp.JarScanningCordappLoader
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
import net.corda.node.serialization.kryo.KRYO_CHECKPOINT_CONTEXT
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
import net.corda.node.services.ContractUpgradeHandler
import net.corda.node.services.FinalityHandler
import net.corda.node.services.NotaryChangeHandler
import net.corda.node.services.api.DummyAuditService
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.config.shouldInitCrashShell
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.node.services.network.*
import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.network.NetworkMapUpdater
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.*
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.MultiThreadedStateMachineExecutor
import net.corda.node.services.statemachine.MultiThreadedStateMachineManager
import net.corda.node.services.statemachine.*
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.DEV_ROOT_CA
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.isH2Database
import net.corda.serialization.internal.*
import org.apache.activemq.artemis.utils.ReusableLatch
import rx.schedulers.Schedulers
import org.slf4j.Logger
import rx.Observable
import java.security.KeyPair
import java.security.cert.X509Certificate
import java.sql.Connection
import java.time.Clock
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.reflect.KClass
class FlowWorkerServiceHub(override val configuration: NodeConfiguration, override val myInfo: NodeInfo, override val networkParameters: NetworkParameters, private val ourKeyPair: KeyPair) : ServiceHubInternal, SingletonSerializeAsToken() {
class FlowWorkerServiceHub(override val configuration: NodeConfiguration, override val myInfo: NodeInfo, override val networkParameters: NetworkParameters, private val ourKeyPair: KeyPair, private val trustRoot: X509Certificate, private val nodeCa: X509Certificate) : ServiceHubInternal, SingletonSerializeAsToken() {
override val clock: CordaClock = SimpleClock(Clock.systemUTC())
private val versionInfo = getVersionInfo()
private val cordappLoader = JarScanningCordappLoader.fromDirectories(configuration.cordappDirectories, versionInfo)
private val sameVmNodeCounter = AtomicInteger()
private val serverThread = AffinityExecutor.ServiceAffinityExecutor("FlowWorker thread-${sameVmNodeCounter.incrementAndGet()}", 1)
private val busyNodeLatch = ReusableLatch()
private val log: Logger get() = staticLog
companion object {
@JvmStatic
private fun makeCordappLoader(configuration: NodeConfiguration, versionInfo: VersionInfo): CordappLoader {
return JarScanningCordappLoader.fromDirectories(configuration.cordappDirectories, versionInfo)
}
private val staticLog = contextLogger()
}
private val versionInfo = getVersionInfo()
override val clock: CordaClock = SimpleClock(Clock.systemUTC())
private val runOnStop = ArrayList<() -> Any?>()
val cordappLoader = makeCordappLoader(configuration, versionInfo)
@Suppress("LeakingThis")
private var tokenizableServices: MutableList<Any>? = mutableListOf(clock, this)
private val runOnStop = ArrayList<() -> Any?>()
override val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null).tokenize()
init {
(serverThread as? ExecutorService)?.let {
runOnStop += {
// We wait here, even though any in-flight messages should have been drained away because the
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
// arbitrary and might be inappropriate.
MoreExecutors.shutdownAndAwaitTermination(it, 50, TimeUnit.SECONDS)
}
}
}
override val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, false).tokenize()
override val identityService = PersistentIdentityService().tokenize()
override val database: CordaPersistence = createCordaPersistence(
configuration.database,
@ -103,7 +135,6 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
private val checkpointStorage = DBCheckpointStorage()
@Suppress("LeakingThis")
override val validatedTransactions: WritableTransactionStorage = DBTransactionStorage(configuration.transactionCacheSizeBytes, database).tokenize()
private val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL) }
private val metricRegistry = MetricRegistry()
override val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound).tokenize()
override val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments).tokenize()
@ -113,19 +144,14 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
@Suppress("LeakingThis")
override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService).tokenize()
override val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
override val monitoringService = MonitoringService(metricRegistry).tokenize()
override val networkMapUpdater = NetworkMapUpdater(
networkMapCache,
NodeInfoWatcher(
configuration.baseDirectory,
@Suppress("LeakingThis")
Schedulers.io(),
Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)
),
networkMapClient,
configuration.baseDirectory,
configuration.extraNetworkMapKeys
).closeOnStop()
override val networkMapUpdater: NetworkMapUpdater
get() {
throw NotImplementedError()
}
private val transactionVerifierWorkerCount = 4
@Suppress("LeakingThis")
override val transactionVerifierService = InMemoryTransactionVerifierService(transactionVerifierWorkerCount).tokenize()
@ -133,19 +159,54 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
override val auditService = DummyAuditService().tokenize()
@Suppress("LeakingThis")
val smm = MultiThreadedStateMachineManager(this, checkpointStorage, MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize), database, newSecureRandom(), ReusableLatch(), cordappLoader.appClassLoader)
val smm = makeStateMachineManager()
private fun makeStateMachineManager(): StateMachineManager {
val executor = MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize)
runOnStop += { executor.shutdown() }
return MultiThreadedStateMachineManager(
this,
checkpointStorage,
executor,
database,
newSecureRandom(),
busyNodeLatch,
cordappLoader.appClassLoader
)
}
// TODO Making this non-lateinit requires MockNode being able to create a blank InMemoryMessaging instance
private lateinit var network: MessagingService
private val cordappServices = MutableClassToInstanceMap.create<SerializeAsToken>()
val flowFactories = ConcurrentHashMap<Class<out FlowLogic<*>>, InitiatedFlowFactory<*>>()
private val flowFactories = ConcurrentHashMap<Class<out FlowLogic<*>>, InitiatedFlowFactory<*>>()
override val rpcFlows: ArrayList<Class<out FlowLogic<*>>>
get() {
throw NotImplementedError()
}
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage(database)
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
override val networkService: MessagingService get() = network
/**
* Completes once the node has successfully registered with the network map service
* or has loaded network map data from local database
*/
// TODO val nodeReadyFuture: CordaFuture<Unit> get() = networkMapCache.nodeReady.map { Unit }
// TODO started
private fun <T : Any> T.tokenize(): T {
tokenizableServices?.add(this)
?: throw IllegalStateException("The tokenisable services list has already been finialised")
return this
}
private fun <T : AutoCloseable> T.closeOnStop(): T {
runOnStop += this::close
return this
}
override fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>? {
return flowFactories[initiatingFlowClass]
}
@ -167,13 +228,7 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
override fun jdbcSession(): Connection = database.createSession()
override fun registerUnloadHandler(runOnStop: () -> Unit) {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
private fun <T : Any> T.tokenize(): T {
tokenizableServices?.add(this)
?: throw IllegalStateException("The tokenisable services list has already been finialised")
return this
this.runOnStop += runOnStop
}
private fun getVersionInfo(): VersionInfo {
@ -192,9 +247,8 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
return P2PMessagingClient(
config = configuration,
versionInfo = versionInfo,
serverAddress = configuration.messagingServerAddress
?: NetworkHostAndPort("localhost", configuration.p2pAddress.port),
nodeExecutor = AffinityExecutor.ServiceAffinityExecutor("Flow Worker", 1),
serverAddress = configuration.messagingServerAddress!!,
nodeExecutor = serverThread,
database = database,
networkMap = networkMapCache,
metricRegistry = metricRegistry,
@ -203,6 +257,87 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
)
}
private fun registerCordappFlows() {
cordappLoader.cordapps.flatMap { it.initiatedFlows }
.forEach {
try {
registerInitiatedFlowInternal(smm, it, track = false)
} catch (e: NoSuchMethodException) {
log.error("${it.name}, as an initiated flow, must have a constructor with a single parameter " +
"of type ${Party::class.java.name}")
} catch (e: Exception) {
log.error("Unable to register initiated flow ${it.name}", e)
}
}
}
// TODO remove once not needed
private fun deprecatedFlowConstructorMessage(flowClass: Class<*>): String {
return "Installing flow factory for $flowClass accepting a ${Party::class.java.simpleName}, which is deprecated. " +
"It should accept a ${FlowSession::class.java.simpleName} instead"
}
private fun <F : FlowLogic<*>> registerInitiatedFlowInternal(smm: StateMachineManager, initiatedFlow: Class<F>, track: Boolean): Observable<F> {
val constructors = initiatedFlow.declaredConstructors.associateBy { it.parameterTypes.toList() }
val flowSessionCtor = constructors[listOf(FlowSession::class.java)]?.apply { isAccessible = true }
val ctor: (FlowSession) -> F = if (flowSessionCtor == null) {
// Try to fallback to a Party constructor
val partyCtor = constructors[listOf(Party::class.java)]?.apply { isAccessible = true }
if (partyCtor == null) {
throw IllegalArgumentException("$initiatedFlow must have a constructor accepting a ${FlowSession::class.java.name}")
} else {
log.warn(deprecatedFlowConstructorMessage(initiatedFlow))
}
{ flowSession: FlowSession -> uncheckedCast(partyCtor.newInstance(flowSession.counterparty)) }
} else {
{ flowSession: FlowSession -> uncheckedCast(flowSessionCtor.newInstance(flowSession)) }
}
val initiatingFlow = initiatedFlow.requireAnnotation<InitiatedBy>().value.java
val (version, classWithAnnotation) = initiatingFlow.flowVersionAndInitiatingClass
require(classWithAnnotation == initiatingFlow) {
"${InitiatedBy::class.java.name} must point to ${classWithAnnotation.name} and not ${initiatingFlow.name}"
}
val flowFactory = InitiatedFlowFactory.CorDapp(version, initiatedFlow.appName, ctor)
val observable = internalRegisterFlowFactory(smm, initiatingFlow, flowFactory, initiatedFlow, track)
log.info("Registered ${initiatingFlow.name} to initiate ${initiatedFlow.name} (version $version)")
return observable
}
private fun <F : FlowLogic<*>> internalRegisterFlowFactory(smm: StateMachineManager,
initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: InitiatedFlowFactory<F>,
initiatedFlowClass: Class<F>,
track: Boolean): Observable<F> {
val observable = if (track) {
smm.changes.filter { it is StateMachineManager.Change.Add }.map { it.logic }.ofType(initiatedFlowClass)
} else {
Observable.empty()
}
flowFactories[initiatingFlowClass] = flowFactory
return observable
}
/**
* Installs a flow that's core to the Corda platform. Unlike CorDapp flows which are versioned individually using
* [InitiatingFlow.version], core flows have the same version as the node's platform version. To cater for backwards
* compatibility [flowFactory] provides a second parameter which is the platform version of the initiating party.
*/
@VisibleForTesting
fun installCoreFlow(clientFlowClass: KClass<out FlowLogic<*>>, flowFactory: (FlowSession) -> FlowLogic<*>) {
require(clientFlowClass.java.flowVersionAndInitiatingClass.first == 1) {
"${InitiatingFlow::class.java.name}.version not applicable for core flows; their version is the node's platform version"
}
flowFactories[clientFlowClass.java] = InitiatedFlowFactory.Core(flowFactory)
log.debug { "Installed core flow ${clientFlowClass.java.name}" }
}
private fun installCoreFlows() {
installCoreFlow(FinalityFlow::class, ::FinalityHandler)
installCoreFlow(NotaryChangeFlow::class, ::NotaryChangeHandler)
installCoreFlow(ContractUpgradeFlow.Initiate::class, ::ContractUpgradeHandler)
installCoreFlow(SwapIdentitiesFlow::class, ::SwapIdentitiesHandler)
}
private fun initialiseSerialization() {
val serializationExists = try {
effectiveSerializationEnv
@ -222,41 +357,37 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader),
storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader),
checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader),
rpcClientContext = if (configuration.shouldInitCrashShell()) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null) //even Shell embeded in the node connects via RPC to the node
rpcClientContext = AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader))
}
}
fun start() {
log.info("Flow Worker starting up ...")
initialiseSerialization()
// TODO First thing we do is create the MessagingService. This should have been done by the c'tor but it's not
// possible (yet) to due restriction from MockNode
network = makeMessagingService().tokenize()
// TODO
configuration.configureWithDevSSLCertificate()
val trustRoot = DEV_ROOT_CA.certificate
val nodeCa = configuration.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
networkMapClient?.start(trustRoot)
installCoreFlows()
registerCordappFlows()
servicesForResolution.start(networkParameters)
persistentNetworkMapCache.start(networkParameters.notaries)
val isH2Database = isH2Database(configuration.dataSourceProperties.getProperty("dataSource.url", ""))
val schemas = if (isH2Database) schemaService.internalSchemas() else schemaService.schemaOptions.keys
database.startHikariPool(configuration.dataSourceProperties, configuration.database, schemas)
identityService.start(trustRoot, listOf(myInfo.legalIdentitiesAndCerts.first().certificate, nodeCa))
persistentNetworkMapCache.start(networkParameters.notaries)
database.transaction {
networkMapCache.start()
}
// TODO
//networkMapUpdater.start(trustRoot, signedNetParams.raw.hash, signedNodeInfo.raw.hash)
startMessaging()
identityService.ourNames = myInfo.legalIdentities.map { it.name }.toSet()
startMessagingService()
database.transaction {
identityService.loadIdentities(myInfo.legalIdentitiesAndCerts)
@ -271,8 +402,8 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
val frozenTokenizableServices = tokenizableServices!!
tokenizableServices = null
smm.start(frozenTokenizableServices)
runOnStop += { smm.stop(0) }
smm.start(frozenTokenizableServices)
}
}
@ -283,36 +414,15 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
runOnStop.clear()
}
private fun startMessaging() {
private fun startMessagingService() {
val client = network as P2PMessagingClient
val messageBroker = if (!configuration.messagingServerExternal) {
val brokerBindAddress = configuration.messagingServerAddress
?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port)
ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize)
} else {
null
}
// Start up the embedded MQ server
messageBroker?.apply {
closeOnStop()
start()
}
Node.printBasicNodeInfo("Advertised P2P messaging addresses", myInfo.addresses.joinToString())
client.closeOnStop()
client.start(
myIdentity = myInfo.legalIdentities[0].owningKey,
serviceIdentity = if (myInfo.legalIdentities.size == 1) null else myInfo.legalIdentities[1].owningKey,
advertisedAddress = myInfo.addresses.single(),
serviceIdentity = null,
maxMessageSize = networkParameters.maxMessageSize,
legalName = myInfo.legalIdentities[0].name.toString()
)
}
private fun <T : AutoCloseable> T.closeOnStop(): T {
runOnStop += this::close
return this
}
}

View File

@ -9,8 +9,8 @@
*/
apply plugin: 'kotlin'
apply plugin: 'idea'
apply plugin: 'net.corda.plugins.cordapp'
apply plugin: 'net.corda.plugins.quasar-utils'
description 'Experiment to make out-of-node RPC processing'
@ -48,6 +48,8 @@ dependencies {
compile "info.picocli:picocli:$picocli_version"
integrationTestCompile project(":test-utils")
integrationTestCompile project(":node-driver")
compile project(":experimental:flow-worker")
}
jar {

View File

@ -0,0 +1,196 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.rpcWorker
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.sign
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.createDirectory
import net.corda.core.internal.div
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.flowworker.FlowWorker
import net.corda.flowworker.FlowWorkerServiceHub
import net.corda.node.internal.NetworkParametersReader
import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.services.config.*
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.rpc.ArtemisRpcBroker
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.bridging.BridgeControlListener
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.getTestPartyAndCertificate
import net.corda.testing.driver.DriverParameters
import net.corda.testing.node.MockServices
import net.corda.testing.node.internal.DriverDSLImpl
import net.corda.testing.node.internal.InternalDriverDSL
import net.corda.testing.node.internal.TestCordappDirectories
import net.corda.testing.node.internal.genericDriver
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.apache.commons.io.FileUtils
import java.nio.file.Paths
import java.security.KeyPair
import java.security.cert.X509Certificate
import java.util.*
fun <A> rpcFlowWorkerDriver(
defaultParameters: DriverParameters = DriverParameters(),
dsl: RpcFlowWorkerDriverDSL.() -> A
): A {
return genericDriver(
defaultParameters = defaultParameters,
driverDslWrapper = { driverDSL: DriverDSLImpl -> RpcFlowWorkerDriverDSL(driverDSL) },
coerce = { it }, dsl = dsl
)
}
data class RpcFlowWorkerHandle(val rpcAddress: NetworkHostAndPort)
data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : InternalDriverDSL by driverDSL {
fun startRpcFlowWorker(myLegalName: CordaX500Name, rpcUsers: List<net.corda.testing.node.User>, numberOfFlowWorkers: Int = 1): CordaFuture<RpcFlowWorkerHandle> {
val (config, rpcWorkerConfig, flowWorkerConfigs) = generateConfigs(myLegalName, rpcUsers, numberOfFlowWorkers)
val trustRoot = rpcWorkerConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
val nodeCa = rpcWorkerConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
val ourKeyPair = Crypto.generateKeyPair()
val ourParty = Party(myLegalName, ourKeyPair.public)
val ourPartyAndCertificate = getTestPartyAndCertificate(ourParty)
val myInfo = NodeInfo(listOf(config.messagingServerAddress!!), listOf(ourPartyAndCertificate), 1, 1)
val nodeInfoAndSigned = NodeInfoAndSigned(myInfo) { _, serialised ->
ourKeyPair.private.sign(serialised.bytes)
}
NodeInfoWatcher.saveToFile(rpcWorkerConfig.baseDirectory, nodeInfoAndSigned)
return driverDSL.networkMapAvailability.flatMap {
val visibilityHandle = driverDSL.networkVisibilityController.register(myLegalName)
it!!.networkParametersCopier.install(rpcWorkerConfig.baseDirectory)
it.nodeInfosCopier.addConfig(rpcWorkerConfig.baseDirectory)
val signedNetworkParameters = NetworkParametersReader(trustRoot, null, rpcWorkerConfig.baseDirectory).read()
val flowWorkerBroker = createFlowWorkerBroker(config, signedNetworkParameters.networkParameters.maxMessageSize)
val rpcWorkerBroker = createRpcWorkerBroker(rpcWorkerConfig, signedNetworkParameters.networkParameters.maxMessageSize)
flowWorkerConfigs.map {
val (flowWorker, _) = createFlowWorker(it, myInfo, signedNetworkParameters.networkParameters, ourKeyPair, trustRoot, nodeCa)
shutdownManager.registerShutdown { flowWorker.stop() }
}
val (rpcWorker, rpcWorkerServiceHub) = createRpcWorker(rpcWorkerConfig, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa, rpcWorkerBroker.serverControl)
val bridgeControlListener = createBridgeControlListener(config, signedNetworkParameters.networkParameters.maxMessageSize)
shutdownManager.registerShutdown {
bridgeControlListener.stop()
rpcWorker.stop()
flowWorkerBroker.stop()
rpcWorkerBroker.stop()
}
visibilityHandle.listen(rpcWorkerServiceHub.rpcOps).map {
RpcFlowWorkerHandle(rpcWorkerConfig.rpcOptions.address)
}
}
}
private fun generateConfigs(myLegalName: CordaX500Name, rpcUsers: List<net.corda.testing.node.User>, numberOfFlowWorkers: Int): Triple<NodeConfiguration, NodeConfiguration, List<NodeConfiguration>> {
val cordappDirectories = TestCordappDirectories.cached(driverDSL.cordappsForAllNodes).toList()
val rpcWorkerBrokerAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
val rpcWorkerBrokerAdminAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
val flowWorkerBrokerAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
val baseDirectory = driverDSL.driverDirectory / myLegalName.organisation
baseDirectory.createDirectory()
val config = genericConfig().copy(myLegalName = myLegalName, baseDirectory = baseDirectory,
messagingServerAddress = flowWorkerBrokerAddress, dataSourceProperties = MockServices.makeTestDataSourceProperties(),
cordappDirectories = cordappDirectories)
// create test certificates
config.configureWithDevSSLCertificate()
val rpcWorkerConfig = config.copy(baseDirectory = driverDSL.driverDirectory / myLegalName.organisation / "rpcWorker",
rpcUsers = rpcUsers.map { User(it.username, it.password, it.permissions) },
rpcSettings = NodeRpcSettings(rpcWorkerBrokerAddress, rpcWorkerBrokerAdminAddress, true, false, null))
// copy over certificates to RpcWorker
FileUtils.copyDirectory(config.certificatesDirectory.toFile(), (rpcWorkerConfig.baseDirectory / "certificates").toFile())
val flowWorkerConfigs = (1..numberOfFlowWorkers).map {
val flowWorkerConfig = config.copy(baseDirectory = driverDSL.driverDirectory / myLegalName.organisation / "flowWorker$it")
// copy over certificates to FlowWorker
FileUtils.copyDirectory(config.certificatesDirectory.toFile(), (flowWorkerConfig.baseDirectory / "certificates").toFile())
flowWorkerConfig
}
return Triple(config, rpcWorkerConfig, flowWorkerConfigs)
}
}
private fun genericConfig(): NodeConfigurationImpl {
return NodeConfigurationImpl(baseDirectory = Paths.get("."), myLegalName = DUMMY_BANK_A_NAME, emailAddress = "",
keyStorePassword = "pass", trustStorePassword = "pass", crlCheckSoftFail = true, dataSourceProperties = Properties(),
rpcUsers = listOf(), verifierType = VerifierType.InMemory, flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0),
p2pAddress = NetworkHostAndPort("localhost", 1), rpcSettings = NodeRpcSettings(NetworkHostAndPort("localhost", 1), null, ssl = null),
relay = null, messagingServerAddress = null, enterpriseConfiguration = EnterpriseConfiguration(mutualExclusionConfiguration = MutualExclusionConfiguration(updateInterval = 0, waitInterval = 0)),
notary = null)
}
private fun createRpcWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker {
val rpcOptions = config.rpcOptions
val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(config.rpcUsers))
val broker = if (rpcOptions.useSsl) {
ArtemisRpcBroker.withSsl(config, rpcOptions.address, rpcOptions.adminAddress, rpcOptions.sslConfig!!, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
} else {
ArtemisRpcBroker.withoutSsl(config, rpcOptions.address, rpcOptions.adminAddress, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
}
broker.start()
return broker
}
private fun createRpcWorker(config: NodeConfiguration, myInfo: NodeInfo, signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate, serverControl: ActiveMQServerControl): Pair<RpcWorker, RpcWorkerServiceHub> {
val rpcWorkerServiceHub = RpcWorkerServiceHub(config, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa)
val rpcWorker = RpcWorker(rpcWorkerServiceHub, serverControl)
rpcWorker.start()
return Pair(rpcWorker, rpcWorkerServiceHub)
}
private fun createFlowWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker {
val broker = ArtemisMessagingServer(config, config.messagingServerAddress!!, maxMessageSize)
broker.start()
return broker
}
private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, networkParameters: NetworkParameters, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate): Pair<FlowWorker, FlowWorkerServiceHub> {
val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, networkParameters, ourKeyPair, trustRoot, nodeCa)
val flowWorker = FlowWorker(UUID.randomUUID().toString(), flowWorkerServiceHub)
flowWorker.start()
return Pair(flowWorker, flowWorkerServiceHub)
}
private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener {
val bridgeControlListener = BridgeControlListener(config, config.messagingServerAddress!!, maxMessageSize)
bridgeControlListener.start()
return bridgeControlListener
}

View File

@ -1,111 +1,47 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.rpcWorker
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.RPCClient
import net.corda.core.internal.deleteRecursively
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.internal.config.User
import net.corda.testing.internal.setGlobalSerialization
import org.junit.After
import org.junit.Before
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.getCashBalances
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.node.User
import org.junit.Test
import rx.Observable
import java.nio.file.Files
import kotlin.test.assertEquals
class RpcWorkerTest {
private val rpcAddress = NetworkHostAndPort("localhost", 10000)
private val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
private val user = User("user1", "test", emptySet())
private val serializationEnv = setGlobalSerialization(true)
private val artemisPath = Files.createTempDirectory("RpcWorkerArtemis")
private val instance = RpcWorker(rpcAddress, user, TestRpcOpsImpl(), artemisPath)
@Before
fun setup() {
instance.start()
}
@After
fun tearDown() {
instance.close()
serializationEnv.unset()
artemisPath.deleteRecursively()
}
private fun withConnectionEstablished(block: (rpcOps: TestRpcOps) -> Unit) {
val client = RPCClient<TestRpcOps>(ArtemisTcpTransport.rpcConnectorTcpTransport(rpcAddress, null), rpcConfiguration)
val connection = client.start(TestRpcOps::class.java, user.username, user.password)
try {
val rpcOps = connection.proxy
block(rpcOps)
} finally {
connection.close()
}
}
@Test
fun testPing() {
withConnectionEstablished {rpcOps ->
assertEquals("pong", rpcOps.ping())
fun `cash pay`() {
rpcFlowWorkerDriver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
val bankAUser = User("username", "password", permissions = setOf("ALL"))
val bankA = startRpcFlowWorker(DUMMY_BANK_A_NAME, listOf(bankAUser), 2).get()
val bankB = startNode().get()
val bankAProxy = CordaRPCClient(bankA.rpcAddress).start("username", "password").proxy
val cashIssueResult = bankAProxy.startFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0x01), defaultNotaryIdentity).returnValue.get()
println(cashIssueResult)
println(bankAProxy.getCashBalances())
val cashPayResult = bankAProxy.startFlow(::CashPaymentFlow, 2.DOLLARS, bankB.nodeInfo.singleIdentity()).returnValue.get()
println(cashPayResult)
println(bankAProxy.getCashBalances())
}
}
@Test
fun testReverse() {
withConnectionEstablished {rpcOps ->
val exampleStr = "Makka Pakka"
assertEquals(exampleStr.reversed(), rpcOps.reverse(exampleStr))
}
}
@Test
fun testObservable() {
withConnectionEstablished { rpcOps ->
val start = 21
val end = 100
val observable = rpcOps.incSequence(start)
observable.take(end - start).zipWith((start..end).asIterable()) { a, b -> Pair(a, b) }.forEach {
assertEquals(it.first, it.second)
}
}
}
/**
* Defines communication protocol
*/
interface TestRpcOps : RPCOps {
fun ping() : String
fun reverse(str : String) : String
fun incSequence(start : Int) : Observable<Int>
}
/**
* Server side implementation
*/
class TestRpcOpsImpl : TestRpcOps {
override val protocolVersion: Int = 1
override fun ping(): String {
return "pong"
}
override fun reverse(str: String): String {
return str.reversed()
}
override fun incSequence(start: Int): Observable<Int> {
return Observable.range(start, 100)
}
}
}

View File

@ -0,0 +1,376 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.rpcWorker
import com.google.common.util.concurrent.SettableFuture
import net.corda.client.rpc.notUsed
import net.corda.core.CordaRuntimeException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.*
import net.corda.core.contracts.ContractState
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.toStringShort
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.RPC_UPLOADER
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.sign
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.*
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.flowworker.*
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.vault.NodeVaultService
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import java.io.InputStream
import java.net.ConnectException
import java.security.PublicKey
import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
class CordaRpcWorkerOps(
private val services: ServiceHubInternal,
private val shutdownNode: () -> Unit
) : CordaRPCOps {
companion object {
const val RPC_WORKER_QUEUE_ADDRESS_PREFIX = "${ArtemisMessagingComponent.INTERNAL_PREFIX}rpc.worker."
}
private val flowWorkerQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}${services.myInfo.legalIdentities[0].owningKey.toStringShort()}"
private val rpcWorkerQueueAddress = "$RPC_WORKER_QUEUE_ADDRESS_PREFIX${services.myInfo.legalIdentities[0].owningKey.toStringShort()}"
private val rpcWorkerId = UUID.randomUUID().toString()
private val rpcWorkerQueueName = "$rpcWorkerQueueAddress.$rpcWorkerId"
private val artemisClient = ArtemisMessagingClient(services.configuration, services.configuration.messagingServerAddress!!, services.networkParameters.maxMessageSize)
private lateinit var session: ClientSession
private lateinit var producer: ClientProducer
private val flowReplyStateMachineRunIdMap = ConcurrentHashMap<Trace.InvocationId, SettableFuture<StateMachineRunId>>()
private val flowReplyResultMap = ConcurrentHashMap<Trace.InvocationId, OpenFuture<Any?>>()
fun start() {
session = artemisClient.start().session
producer = session.createProducer()
val rpcWorkerQueueQuery = session.queueQuery(SimpleString(rpcWorkerQueueName))
if (!rpcWorkerQueueQuery.isExists) {
session.createQueue(rpcWorkerQueueAddress, RoutingType.ANYCAST, rpcWorkerQueueName, true)
}
val consumer = session.createConsumer(rpcWorkerQueueName)
consumer.setMessageHandler { message -> handleFlowWorkerMessage(message) }
networkMapFeed().updates.subscribe { mapChange: NetworkMapCache.MapChange? ->
val networkMapUpdateMessage = NetworkMapUpdate(services.myInfo.legalIdentities.first().name, mapChange!!)
val artemisMessage = session.createMessage(true)
artemisMessage.writeBodyBufferBytes(networkMapUpdateMessage.serialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT).bytes)
producer.send(flowWorkerQueueAddress, artemisMessage)
}
}
private fun handleFlowWorkerMessage(message: ClientMessage) {
val data = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
val flowWorkerMessage = data.deserialize<FlowWorkerMessage>(context = SerializationDefaults.RPC_CLIENT_CONTEXT)
when (flowWorkerMessage) {
is FlowReplyStateMachineRunId -> {
flowReplyStateMachineRunIdMap.remove(flowWorkerMessage.replyId)?.set(flowWorkerMessage.id)
}
is FlowReplyResult -> {
flowReplyResultMap.remove(flowWorkerMessage.replyId)?.set(flowWorkerMessage.result)
// TODO hack, fix the way we populate contractStateTypeMappings
(services.vaultService as NodeVaultService).bootstrapContractStateTypes()
}
}
}
override fun networkMapSnapshot(): List<NodeInfo> {
val (snapshot, updates) = networkMapFeed()
updates.notUsed()
return snapshot
}
override fun networkParametersFeed(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
return services.networkMapUpdater.trackParametersUpdate()
}
override fun acceptNewNetworkParameters(parametersHash: SecureHash) {
services.networkMapUpdater.acceptNewNetworkParameters(
parametersHash,
// TODO When multiple identities design will be better specified this should be signature from node operator.
{ hash -> hash.serialize().sign { services.keyManagementService.sign(it.bytes, services.myInfo.legalIdentities[0].owningKey) } }
)
}
override fun networkMapFeed(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange> {
return services.networkMapCache.track()
}
override fun <T : ContractState> vaultQueryBy(criteria: QueryCriteria,
paging: PageSpecification,
sorting: Sort,
contractStateType: Class<out T>): Vault.Page<T> {
contractStateType.checkIsA<ContractState>()
return services.vaultService._queryBy(criteria, paging, sorting, contractStateType)
}
@RPCReturnsObservables
override fun <T : ContractState> vaultTrackBy(criteria: QueryCriteria,
paging: PageSpecification,
sorting: Sort,
contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
contractStateType.checkIsA<ContractState>()
return services.vaultService._trackBy(criteria, paging, sorting, contractStateType)
}
@Suppress("OverridingDeprecatedMember")
override fun internalVerifiedTransactionsSnapshot(): List<SignedTransaction> {
val (snapshot, updates) = @Suppress("DEPRECATION") internalVerifiedTransactionsFeed()
updates.notUsed()
return snapshot
}
override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? = services.validatedTransactions.getTransaction(txnId)
@Suppress("OverridingDeprecatedMember")
override fun internalVerifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return services.validatedTransactions.track()
}
override fun stateMachinesSnapshot(): List<StateMachineInfo> {
val (snapshot, updates) = stateMachinesFeed()
updates.notUsed()
return snapshot
}
override fun killFlow(id: StateMachineRunId): Boolean {
TODO()
}
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
TODO()
}
override fun stateMachineRecordedTransactionMappingSnapshot(): List<StateMachineTransactionMapping> {
val (snapshot, updates) = stateMachineRecordedTransactionMappingFeed()
updates.notUsed()
return snapshot
}
override fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
return services.stateMachineRecordedTransactionMapping.track()
}
override fun nodeInfo(): NodeInfo {
return services.myInfo
}
override fun notaryIdentities(): List<Party> {
return services.networkMapCache.notaryIdentities
}
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) {
services.vaultService.addNoteToTransaction(txnId, txnNote)
}
override fun getVaultTransactionNotes(txnId: SecureHash): Iterable<String> {
return services.vaultService.getTransactionNotes(txnId)
}
override fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> {
TODO()
}
override fun <T> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> {
// TODO
val context = InvocationContext.rpc(Actor(Actor.Id("Mark"), AuthServiceId("Test"), CordaX500Name("ff", "ff", "GB")))
val replyId = Trace.InvocationId.newInstance()
val startFlowMessage = StartFlow(services.myInfo.legalIdentities.first().name, logicType, args, context, rpcWorkerQueueAddress, replyId)
val artemisMessage = session.createMessage(true)
artemisMessage.writeBodyBufferBytes(startFlowMessage.serialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT).bytes)
producer.send(flowWorkerQueueAddress, artemisMessage)
val flowReplyStateMachineRunIdFuture = SettableFuture.create<StateMachineRunId>()
flowReplyStateMachineRunIdMap[replyId] = flowReplyStateMachineRunIdFuture
val flowReplyResultFuture = openFuture<T>()
flowReplyResultMap[replyId] = uncheckedCast(flowReplyResultFuture)
return FlowHandleImpl(flowReplyStateMachineRunIdFuture.get(), flowReplyResultFuture)
}
override fun attachmentExists(id: SecureHash): Boolean {
return services.attachments.openAttachment(id) != null
}
override fun openAttachment(id: SecureHash): InputStream {
return services.attachments.openAttachment(id)!!.open()
}
override fun uploadAttachment(jar: InputStream): SecureHash {
return services.attachments.importAttachment(jar, RPC_UPLOADER, null)
}
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String): SecureHash {
return services.attachments.importAttachment(jar, uploader, filename)
}
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
return services.attachments.queryAttachments(query, sorting)
}
override fun currentNodeTime(): Instant = Instant.now(services.clock)
override fun waitUntilNetworkReady(): CordaFuture<Void?> = services.networkMapCache.nodeReady
override fun wellKnownPartyFromAnonymous(party: AbstractParty): Party? {
return services.identityService.wellKnownPartyFromAnonymous(party)
}
override fun partyFromKey(key: PublicKey): Party? {
return services.identityService.partyFromKey(key)
}
override fun wellKnownPartyFromX500Name(x500Name: CordaX500Name): Party? {
return services.identityService.wellKnownPartyFromX500Name(x500Name)
}
override fun notaryPartyFromX500Name(x500Name: CordaX500Name): Party? = services.networkMapCache.getNotary(x500Name)
override fun partiesFromName(query: String, exactMatch: Boolean): Set<Party> {
return services.identityService.partiesFromName(query, exactMatch)
}
override fun nodeInfoFromParty(party: AbstractParty): NodeInfo? {
return services.networkMapCache.getNodeByLegalIdentity(party)
}
override fun registeredFlows(): List<String> = services.rpcFlows.map { it.name }.sorted()
override fun clearNetworkMapCache() {
services.networkMapCache.clearNetworkMapCache()
}
override fun refreshNetworkMapCache() {
try {
services.networkMapUpdater.updateNetworkMapCache()
} catch (e: Exception) {
when (e) {
is ConnectException -> throw CordaRuntimeException("There is connection problem to network map. The possible causes are incorrect configuration or network map service being down")
else -> throw e
}
}
}
override fun <T : ContractState> vaultQuery(contractStateType: Class<out T>): Vault.Page<T> {
return vaultQueryBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>): Vault.Page<T> {
return vaultQueryBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): Vault.Page<T> {
return vaultQueryBy(criteria, paging, Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): Vault.Page<T> {
return vaultQueryBy(criteria, PageSpecification(), sorting, contractStateType)
}
override fun <T : ContractState> vaultTrack(contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, paging, Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, PageSpecification(), sorting, contractStateType)
}
override fun setFlowsDrainingModeEnabled(enabled: Boolean) {
services.nodeProperties.flowsDrainingMode.setEnabled(enabled)
}
override fun isFlowsDrainingModeEnabled(): Boolean {
return services.nodeProperties.flowsDrainingMode.isEnabled()
}
override fun shutdown() {
artemisClient.stop()
shutdownNode.invoke()
}
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)
}
private fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate {
return when (change) {
is StateMachineManager.Change.Add -> StateMachineUpdate.Added(stateMachineInfoFromFlowLogic(change.logic))
is StateMachineManager.Change.Removed -> StateMachineUpdate.Removed(change.logic.runId, change.result)
}
}
private fun InvocationContext.toFlowInitiator(): FlowInitiator {
val principal = origin.principal().name
return when (origin) {
is InvocationOrigin.RPC -> FlowInitiator.RPC(principal)
is InvocationOrigin.Peer -> {
val wellKnownParty = services.identityService.wellKnownPartyFromX500Name((origin as InvocationOrigin.Peer).party)
wellKnownParty?.let { FlowInitiator.Peer(it) }
?: throw IllegalStateException("Unknown peer with name ${(origin as InvocationOrigin.Peer).party}.")
}
is InvocationOrigin.Service -> FlowInitiator.Service(principal)
InvocationOrigin.Shell -> FlowInitiator.Shell
is InvocationOrigin.Scheduled -> FlowInitiator.Scheduled((origin as InvocationOrigin.Scheduled).scheduledState)
}
}
/**
* RPC can be invoked from the shell where the type parameter of any [Class] parameter is lost, so we must
* explicitly check that the provided [Class] is the one we want.
*/
private inline fun <reified TARGET> Class<*>.checkIsA() {
require(TARGET::class.java.isAssignableFrom(this)) { "$name is not a ${TARGET::class.java.name}" }
}
}

View File

@ -1,50 +1,39 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.rpcWorker
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.AuthServiceId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.isAbstractClass
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.*
import net.corda.node.internal.Startable
import net.corda.node.internal.Stoppable
import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.node.internal.NetworkParametersReader
import net.corda.node.internal.Node
import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
import net.corda.node.services.messaging.RPCServer
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.SecurityConfiguration
import net.corda.node.services.messaging.InternalRPCMessagingClient
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.RPCApi
import net.corda.node.services.rpc.ArtemisRpcBroker
import net.corda.nodeapi.internal.config.User
import net.corda.serialization.internal.AMQP_P2P_CONTEXT
import net.corda.serialization.internal.SerializationFactoryImpl
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import net.corda.nodeapi.internal.crypto.X509Utilities
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.security.CheckType
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3
import picocli.CommandLine
import java.io.File
import java.lang.IllegalArgumentException
import java.nio.file.FileSystems
import java.nio.file.Path
import java.util.concurrent.Executors
import java.security.KeyPair
import java.security.cert.X509Certificate
import java.util.*
import kotlin.system.exitProcess
fun main(args: Array<String>) {
@ -56,7 +45,8 @@ fun main(args: Array<String>) {
if (main.verbose) {
throwable.printStackTrace()
} else {
System.err.println("ERROR: ${throwable.message ?: ""}. Please use '--verbose' option to obtain more details.")
System.err.println("ERROR: ${throwable.message
?: ""}. Please use '--verbose' option to obtain more details.")
}
exitProcess(1)
}
@ -66,7 +56,7 @@ fun main(args: Array<String>) {
name = "RPC Worker",
mixinStandardHelpOptions = true,
showDefaultValues = true,
description = [ "Standalone RPC server endpoint with pluggable set of operations." ]
description = ["Standalone RPC server endpoint with pluggable set of operations."]
)
class Main : Runnable {
@CommandLine.Option(
@ -91,199 +81,77 @@ class Main : Runnable {
val port = config.getInt("port")
val user = User(config.getString("userName"), config.getString("password"), emptySet())
val rpcOps = instantiateAndValidate(config.getString("rpcOpsImplClass"))
val artemisDir = FileSystems.getDefault().getPath(config.getString("artemisDir"))
initialiseSerialization()
RpcWorker(NetworkHostAndPort("localhost", port), user, rpcOps, artemisDir).start()
val rpcWorkerConfig = getRpcWorkerConfig(port, user, artemisDir)
val ourKeyPair = getIdentity()
val myInfo = getNodeInfo()
val trustRoot = rpcWorkerConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
val nodeCa = rpcWorkerConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
val signedNetworkParameters = NetworkParametersReader(trustRoot, null, rpcWorkerConfig.baseDirectory).read()
val rpcWorkerBroker = createRpcWorkerBroker(rpcWorkerConfig, signedNetworkParameters.networkParameters.maxMessageSize)
createRpcWorker(rpcWorkerConfig, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa, rpcWorkerBroker.serverControl)
}
private fun instantiateAndValidate(rpcOpsImplClassName: String): RPCOps {
try {
val klass = Class.forName(rpcOpsImplClassName)
if (klass.isAbstractClass) {
throw IllegalArgumentException("$rpcOpsImplClassName must not be abstract")
}
val instance = klass.newInstance()
return instance as? RPCOps ?: throw IllegalArgumentException("class '$rpcOpsImplClassName' is not extending RPCOps")
} catch (ex: ClassNotFoundException) {
throw IllegalArgumentException("class '$rpcOpsImplClassName' not found in the classpath")
}
private fun getRpcWorkerConfig(port: Int, user: User, artemisDir: Path): NodeConfiguration {
TODO("" + port + user + artemisDir)
}
private fun initialiseSerialization() {
synchronized(this) {
if (nodeSerializationEnv == null) {
val classloader = this::class.java.classLoader
nodeSerializationEnv = SerializationEnvironmentImpl(
SerializationFactoryImpl().apply {
registerScheme(AMQPServerSerializationScheme(emptyList()))
},
p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader),
rpcServerContext = AMQP_P2P_CONTEXT.withClassLoader(classloader)
)
}
private fun getIdentity(): KeyPair {
TODO()
}
private fun getNodeInfo(): NodeInfo {
TODO()
}
private fun createRpcWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker {
val rpcOptions = config.rpcOptions
val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(config.rpcUsers))
val broker = if (rpcOptions.useSsl) {
ArtemisRpcBroker.withSsl(config, rpcOptions.address, rpcOptions.adminAddress, rpcOptions.sslConfig!!, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
} else {
ArtemisRpcBroker.withoutSsl(config, rpcOptions.address, rpcOptions.adminAddress, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
}
broker.start()
return broker
}
private fun createRpcWorker(config: NodeConfiguration, myInfo: NodeInfo, signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate, serverControl: ActiveMQServerControl): Pair<RpcWorker, RpcWorkerServiceHub> {
val rpcWorkerServiceHub = RpcWorkerServiceHub(config, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa)
val rpcWorker = RpcWorker(rpcWorkerServiceHub, serverControl)
rpcWorker.start()
return Pair(rpcWorker, rpcWorkerServiceHub)
}
}
/**
* Note once `stop()` been called, there is no longer an option to call `start()` and the instance should be discarded
*/
class RpcWorker(private val hostAndPort: NetworkHostAndPort, private val user: User, private val ops: RPCOps, private val artemisPath: Path) : Startable, Stoppable {
class RpcWorker(private val rpcWorkerServiceHub: RpcWorkerServiceHub, private val serverControl: ActiveMQServerControl) {
private companion object {
const val MAX_MESSAGE_SIZE: Int = 10485760
const val notificationAddress = "notifications"
private val fakeNodeLegalName = CordaX500Name(organisation = "Not:a:real:name", locality = "Nowhere", country = "GB")
private val DEFAULT_TIMEOUT = 60.seconds
private val runOnStop = ArrayList<() -> Any?>()
private val logger = contextLogger()
}
private val executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("RpcWorker-pool-thread-%d").build())
private val registeredShutdowns = mutableListOf(doneFuture({executorService.shutdown()}))
override var started = false
override fun start() {
started = true
startRpcServer().getOrThrow(DEFAULT_TIMEOUT)
}
override fun stop() {
val shutdownOutcomes = registeredShutdowns.map { Try.on { it.getOrThrow(DEFAULT_TIMEOUT) } }
shutdownOutcomes.reversed().forEach {
when (it) {
is Try.Success ->
try {
it.value()
} catch (t: Throwable) {
logger.warn("Exception while calling a shutdown action, this might create resource leaks", t)
}
is Try.Failure -> logger.warn("Exception while getting shutdown method, disregarding", it.exception)
}
}
started = false
}
private fun startRpcServer(): CordaFuture<RPCServer> {
return startRpcBroker().map { serverControl ->
startRpcServerWithBrokerRunning(serverControl = serverControl)
}
}
private fun startRpcBroker(
maxFileSize: Int = MAX_MESSAGE_SIZE,
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE
): CordaFuture<ActiveMQServerControl> {
return executorService.fork {
logger.info("Artemis files will be stored in: $artemisPath")
val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, artemisPath, hostAndPort)
val server = ActiveMQServerImpl(artemisConfig, SingleUserSecurityManager(user))
server.start()
registeredShutdowns.add(doneFuture({
server.stop()
}))
server.activeMQServerControl
}
}
private fun createNettyClientTransportConfiguration(): TransportConfiguration {
return ArtemisTcpTransport.rpcConnectorTcpTransport(hostAndPort, null)
}
private fun startRpcServerWithBrokerRunning(
nodeLegalName: CordaX500Name = fakeNodeLegalName,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
serverControl: ActiveMQServerControl
): RPCServer {
val locator = ActiveMQClient.createServerLocatorWithoutHA(createNettyClientTransportConfiguration()).apply {
minLargeMessageSize = MAX_MESSAGE_SIZE
isUseGlobalPools = false
}
val rpcSecurityManager = RPCSecurityManagerImpl.fromUserList(users = listOf(User(user.username, user.password, user.permissions)),
id = AuthServiceId("RPC_WORKER_SECURITY_MANAGER"))
val rpcServer = RPCServer(
ops,
user.username,
user.password,
locator,
rpcSecurityManager,
nodeLegalName,
configuration
fun start() {
val rpcServerConfiguration = RPCServerConfiguration.DEFAULT.copy(
rpcThreadPoolSize = rpcWorkerServiceHub.configuration.enterpriseConfiguration.tuning.rpcThreadPoolSize
)
registeredShutdowns.add(doneFuture({
rpcServer.close()
locator.close()
}))
rpcServer.start(serverControl)
return rpcServer
val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(rpcWorkerServiceHub.configuration.rpcUsers))
val nodeName = CordaX500Name.build(rpcWorkerServiceHub.configuration.loadSslKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal)
val internalRpcMessagingClient = InternalRPCMessagingClient(rpcWorkerServiceHub.configuration, rpcWorkerServiceHub.configuration.rpcOptions.adminAddress, Node.MAX_RPC_MESSAGE_SIZE, nodeName, rpcServerConfiguration)
internalRpcMessagingClient.init(rpcWorkerServiceHub.rpcOps, securityManager)
internalRpcMessagingClient.start(serverControl)
runOnStop += { rpcWorkerServiceHub.stop() }
rpcWorkerServiceHub.start()
runOnStop += { internalRpcMessagingClient.stop() }
}
private fun createRpcServerArtemisConfig(maxFileSize: Int, maxBufferedBytesPerClient: Long, baseDirectory: Path, hostAndPort: NetworkHostAndPort): Configuration {
return ConfigurationImpl().apply {
val artemisDir = "$baseDirectory/artemis"
bindingsDirectory = "$artemisDir/bindings"
journalDirectory = "$artemisDir/journal"
largeMessagesDirectory = "$artemisDir/large-messages"
acceptorConfigurations = setOf(ArtemisTcpTransport.rpcAcceptorTcpTransport(hostAndPort, null))
configureCommonSettings(maxFileSize, maxBufferedBytesPerClient)
fun stop() {
for (toRun in runOnStop.reversed()) {
toRun()
}
}
private fun ConfigurationImpl.configureCommonSettings(maxFileSize: Int, maxBufferedBytesPerClient: Long) {
managementNotificationAddress = SimpleString(notificationAddress)
isPopulateValidatedUser = true
journalBufferSize_NIO = maxFileSize
journalBufferSize_AIO = maxFileSize
journalFileSize = maxFileSize
queueConfigurations = listOf(
CoreQueueConfiguration().apply {
name = RPCApi.RPC_SERVER_QUEUE_NAME
address = RPCApi.RPC_SERVER_QUEUE_NAME
isDurable = false
},
CoreQueueConfiguration().apply {
name = RPCApi.RPC_CLIENT_BINDING_REMOVALS
address = notificationAddress
filterString = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION
isDurable = false
},
CoreQueueConfiguration().apply {
name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS
address = notificationAddress
filterString = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION
isDurable = false
}
)
addressesSettings = mapOf(
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
maxSizeBytes = maxBufferedBytesPerClient
addressFullMessagePolicy = AddressFullMessagePolicy.FAIL
}
)
}
}
private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityManager3 {
override fun validateUser(user: String?, password: String?) = isValid(user, password)
override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet<Role>?, checkType: CheckType?) = isValid(user, password)
override fun validateUser(user: String?, password: String?, connection: RemotingConnection?): String? {
return validate(user, password)
}
override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet<Role>?, checkType: CheckType?, address: String?, connection: RemotingConnection?): String? {
return validate(user, password)
}
private fun isValid(user: String?, password: String?): Boolean {
return rpcUser.username == user && rpcUser.password == password
}
private fun validate(user: String?, password: String?): String? {
return if (isValid(user, password)) user else null
runOnStop.clear()
}
}

View File

@ -1,23 +0,0 @@
package net.corda.rpcWorker
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
fun main(args: Array<String>) {
AMQPClientSerializationScheme.initialiseSerialization()
val client = RPCClient<SimplisticRpcOps>(ArtemisTcpTransport.rpcConnectorTcpTransport(NetworkHostAndPort("localhost", 20002), null),
CordaRPCClientConfiguration.DEFAULT, serializationContext = AMQP_RPC_CLIENT_CONTEXT)
val connection = client.start(SimplisticRpcOps::class.java, "user1", "test1")
try {
val rpcOps = connection.proxy
println("Server hostname and PID: " + rpcOps.hostnameAndPid())
println("Server timestamp: " + rpcOps.currentTimeStamp())
} finally {
connection.close()
}
}

View File

@ -0,0 +1,263 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.rpcWorker
import com.codahale.metrics.MetricRegistry
import com.jcabi.manifests.Manifests
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.sign
import net.corda.core.flows.FlowLogic
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ContractUpgradeService
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.contextLogger
import net.corda.node.CordaClock
import net.corda.node.SimpleClock
import net.corda.node.VersionInfo
import net.corda.node.internal.*
import net.corda.node.internal.cordapp.CordappConfigFileProvider
import net.corda.node.internal.cordapp.CordappProviderImpl
import net.corda.node.internal.cordapp.JarScanningCordappLoader
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
import net.corda.node.serialization.kryo.KRYO_CHECKPOINT_CONTEXT
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
import net.corda.node.services.api.AuditService
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.*
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.NodePropertiesPersistentStore
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.vault.NodeVaultService
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.isH2Database
import net.corda.serialization.internal.*
import org.slf4j.Logger
import rx.schedulers.Schedulers
import java.security.KeyPair
import java.security.cert.X509Certificate
import java.sql.Connection
import java.time.Clock
import java.time.Duration
import java.util.*
class RpcWorkerServiceHub(override val configuration: NodeConfiguration, override val myInfo: NodeInfo, private val signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned, private val ourKeyPair: KeyPair, private val trustRoot: X509Certificate, private val nodeCa: X509Certificate) : ServiceHubInternal, SingletonSerializeAsToken() {
override val clock: CordaClock = SimpleClock(Clock.systemUTC())
private val versionInfo = getVersionInfo()
private val cordappLoader = JarScanningCordappLoader.fromDirectories(configuration.cordappDirectories, versionInfo)
private val log: Logger get() = staticLog
companion object {
private val staticLog = contextLogger()
}
private val runOnStop = ArrayList<() -> Any?>()
override val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, false)
override val identityService = PersistentIdentityService()
override val database: CordaPersistence = createCordaPersistence(
configuration.database,
identityService::wellKnownPartyFromX500Name,
identityService::wellKnownPartyFromAnonymous,
schemaService
)
init {
// TODO Break cyclic dependency
identityService.database = database
}
private val persistentNetworkMapCache = PersistentNetworkMapCache(database, myInfo.legalIdentities[0].name)
override val networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, identityService, database)
@Suppress("LeakingThis")
override val validatedTransactions: WritableTransactionStorage = DBTransactionStorage(configuration.transactionCacheSizeBytes, database)
private val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL) }
private val metricRegistry = MetricRegistry()
override val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound)
override val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments)
@Suppress("LeakingThis")
override val keyManagementService = PersistentKeyManagementService(identityService, database)
private val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, validatedTransactions)
@Suppress("LeakingThis")
override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService)
override val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
override val monitoringService = MonitoringService(metricRegistry)
override val networkMapUpdater = NetworkMapUpdater(
networkMapCache,
NodeInfoWatcher(
configuration.baseDirectory,
@Suppress("LeakingThis")
Schedulers.io(),
Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)
),
networkMapClient,
configuration.baseDirectory,
configuration.extraNetworkMapKeys
).closeOnStop()
override val networkParameters = signedNetworkParameters.networkParameters
override val transactionVerifierService: TransactionVerifierService
get() {
throw NotImplementedError()
}
override val contractUpgradeService: ContractUpgradeService
get() {
throw NotImplementedError()
}
override val auditService: AuditService
get() {
throw NotImplementedError()
}
// TODO schedulerService
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage(database)
override val networkService: MessagingService
get() {
throw NotImplementedError()
}
private fun <T : AutoCloseable> T.closeOnStop(): T {
runOnStop += this::close
return this
}
override fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>? {
throw NotImplementedError()
}
override fun loadState(stateRef: StateRef): TransactionState<*> {
return servicesForResolution.loadState(stateRef)
}
override fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>> {
return servicesForResolution.loadStates(stateRefs)
}
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
throw NotImplementedError()
}
override fun jdbcSession(): Connection {
throw NotImplementedError()
}
override fun registerUnloadHandler(runOnStop: () -> Unit) {
this.runOnStop += runOnStop
}
private fun getVersionInfo(): VersionInfo {
// Manifest properties are only available if running from the corda jar
fun manifestValue(name: String): String? = if (Manifests.exists(name)) Manifests.read(name) else null
return VersionInfo(
manifestValue("Corda-Platform-Version")?.toInt() ?: 1,
manifestValue("Corda-Release-Version") ?: "Unknown",
manifestValue("Corda-Revision") ?: "Unknown",
manifestValue("Corda-Vendor") ?: "Unknown"
)
}
private fun initialiseSerialization() {
val serializationExists = try {
effectiveSerializationEnv
true
} catch (e: IllegalStateException) {
false
}
if (!serializationExists) {
val classloader = cordappLoader.appClassLoader
nodeSerializationEnv = SerializationEnvironmentImpl(
SerializationFactoryImpl().apply {
registerScheme(AMQPServerSerializationScheme(cordappLoader.cordapps))
registerScheme(AMQPClientSerializationScheme(cordappLoader.cordapps))
registerScheme(KryoServerSerializationScheme())
},
p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader),
rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader),
storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader),
checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader),
rpcClientContext = AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader))
}
}
val rpcOps = CordaRpcWorkerOps(this, {})
fun start() {
log.info("Rpc Worker starting up ...")
initialiseSerialization()
networkMapClient?.start(trustRoot)
servicesForResolution.start(networkParameters)
val isH2Database = isH2Database(configuration.dataSourceProperties.getProperty("dataSource.url", ""))
val schemas = if (isH2Database) schemaService.internalSchemas() else schemaService.schemaOptions.keys
database.startHikariPool(configuration.dataSourceProperties, configuration.database, schemas)
identityService.start(trustRoot, listOf(myInfo.legalIdentitiesAndCerts.first().certificate, nodeCa))
persistentNetworkMapCache.start(networkParameters.notaries)
runOnStop += { rpcOps.shutdown() }
rpcOps.start()
database.transaction {
networkMapCache.start()
networkMapCache.addNode(myInfo)
}
val nodeInfoAndSigned = NodeInfoAndSigned(myInfo) { _, serialised ->
ourKeyPair.private.sign(serialised.bytes)
}
identityService.ourNames = myInfo.legalIdentities.map { it.name }.toSet()
networkMapUpdater.start(trustRoot, signedNetworkParameters.signed.raw.hash, nodeInfoAndSigned.signed.raw.hash)
database.transaction {
identityService.loadIdentities(myInfo.legalIdentitiesAndCerts)
attachments.start()
nodeProperties.start()
keyManagementService.start(setOf(ourKeyPair))
vaultService.start()
}
}
fun stop() {
for (toRun in runOnStop.reversed()) {
toRun()
}
runOnStop.clear()
}
}

View File

@ -1,29 +0,0 @@
package net.corda.rpcWorker
import net.corda.core.messaging.RPCOps
import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.time.ZonedDateTime
// TODO: This interface should really be residing in the "client" sub-module such that the JAR where this interface (but no the implementation)
// Is available to RPC clients
interface SimplisticRpcOps : RPCOps {
fun currentTimeStamp(): String
fun hostnameAndPid(): String
}
class SimplisticRpcOpsImpl : SimplisticRpcOps {
override val protocolVersion: Int = 1
override fun currentTimeStamp(): String {
return ZonedDateTime.now().toString()
}
override fun hostnameAndPid(): String {
val info = ManagementFactory.getRuntimeMXBean()
val pid = info.name.split("@").firstOrNull() // TODO Java 9 has better support for this
val hostName: String = InetAddress.getLocalHost().hostName
return "$hostName:$pid"
}
}

View File

@ -1,18 +0,0 @@
//
// R3 Proprietary and Confidential
//
// Copyright (c) 2018 R3 Limited. All rights reserved.
//
// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
//
// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
port = 20002
userName = user1
password = test1
rpcOpsImplClass = net.corda.rpcWorker.SimplisticRpcOpsImpl
artemisDir = "C:\\Users\\Viktor Kolomeyko\\AppData\\Local\\Temp\\Artemis"

View File

@ -530,7 +530,6 @@ class NodeVaultService(
val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java)
val queryRootVaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
// TODO: revisit (use single instance of parser for all queries)
val criteriaParser = HibernateQueryCriteriaParser(contractStateType, contractStateTypeMappings, criteriaBuilder, criteriaQuery, queryRootVaultStates)
@ -617,7 +616,7 @@ class NodeVaultService(
/**
* Derive list from existing vault states and then incrementally update using vault observables
*/
private fun bootstrapContractStateTypes() {
fun bootstrapContractStateTypes() {
val criteria = criteriaBuilder.createQuery(String::class.java)
val vaultStates = criteria.from(VaultSchemaV1.VaultStates::class.java)
criteria.select(vaultStates.get("contractStateClassName")).distinct(true)

View File

@ -106,13 +106,13 @@ class DriverDSLImpl(
private var _shutdownManager: ShutdownManager? = null
override val shutdownManager get() = _shutdownManager!!
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
private val networkVisibilityController = NetworkVisibilityController()
val networkVisibilityController = NetworkVisibilityController()
/**
* Future which completes when the network map infrastructure is available, whether a local one or one from the CZ.
* This future acts as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap]
* object, which is null if the network map is being provided by the CZ.
*/
private lateinit var networkMapAvailability: CordaFuture<LocalNetworkMap?>
lateinit var networkMapAvailability: CordaFuture<LocalNetworkMap?>
private lateinit var _notaries: CordaFuture<List<NotaryHandle>>
override val notaryHandles: List<NotaryHandle> get() = _notaries.getOrThrow()
@ -889,7 +889,7 @@ class DriverDSLImpl(
* Keeps track of how many nodes each node sees and gates nodes from completing their startNode [CordaFuture] until all
* current nodes see everyone.
*/
private class NetworkVisibilityController {
class NetworkVisibilityController {
private val nodeVisibilityHandles = ThreadBox(HashMap<String, VisibilityHandle>())
fun register(name: CordaX500Name): VisibilityHandle {