diff --git a/.idea/compiler.xml b/.idea/compiler.xml index 212f9e9d5a..79580df8c4 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -136,6 +136,9 @@ + + + diff --git a/experimental/flow-worker/build.gradle b/experimental/flow-worker/build.gradle new file mode 100644 index 0000000000..ded92e0175 --- /dev/null +++ b/experimental/flow-worker/build.gradle @@ -0,0 +1,38 @@ +apply plugin: 'kotlin' +apply plugin: 'java' + +description 'Corda Flow Worker' + +configurations { + integrationTestCompile.extendsFrom testCompile + integrationTestRuntime.extendsFrom testRuntime +} + +sourceSets { + integrationTest { + kotlin { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integration-test/kotlin') + } + resources { + srcDir file('src/integration-test/resources') + } + } +} + +dependencies { + compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" + compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" + + compile(project(':core')) + compile(project(':node')) + + testCompile "junit:junit:$junit_version" + testCompile(project(':node-driver')) +} + +task integrationTest(type: Test) { + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath +} \ No newline at end of file diff --git a/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerTest.kt b/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerTest.kt new file mode 100644 index 0000000000..620804813c --- /dev/null +++ b/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerTest.kt @@ -0,0 +1,148 @@ +package net.corda.flowworker + +import co.paralleluniverse.fibers.Suspendable +import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions +import net.corda.core.concurrent.CordaFuture +import net.corda.core.context.InvocationContext +import net.corda.core.crypto.Crypto.generateKeyPair +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.identity.Party +import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.node.NetworkParameters +import net.corda.core.node.NodeInfo +import net.corda.core.serialization.serialize +import net.corda.core.utilities.ByteSequence +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.getOrThrow +import net.corda.finance.DOLLARS +import net.corda.finance.contracts.getCashBalances +import net.corda.finance.flows.AbstractCashFlow +import net.corda.finance.flows.CashIssueFlow +import net.corda.node.internal.InitiatedFlowFactory +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.NodeConfigurationImpl +import net.corda.node.services.config.parseAsNodeConfiguration +import net.corda.node.services.messaging.DeduplicationHandler +import net.corda.node.services.messaging.P2PMessagingClient +import net.corda.node.services.statemachine.ExternalEvent +import net.corda.node.services.statemachine.InitialSessionMessage +import net.corda.node.services.statemachine.SessionId +import net.corda.testing.core.DUMMY_BANK_A_NAME +import net.corda.testing.core.DUMMY_BANK_B_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.getTestPartyAndCertificate +import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import net.corda.testing.node.internal.NodeBasedTest +import net.corda.testing.node.internal.TestCordappDirectories +import net.corda.testing.node.internal.cordappsForPackages +import net.corda.testing.node.internal.getCallerPackage +import org.apache.activemq.artemis.api.core.Message +import org.apache.activemq.artemis.api.core.SimpleString +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import java.time.Instant + +class FlowWorkerTest { + + @Rule + @JvmField + val temporaryFolder = TemporaryFolder() + + private val bankAKeyPair = generateKeyPair() + private val bankBKeyPair = generateKeyPair() + private val notaryKeyPair = generateKeyPair() + private val bankA = Party(DUMMY_BANK_A_NAME, bankAKeyPair.public) + private val bankB = Party(DUMMY_BANK_B_NAME, bankBKeyPair.public) + private val notary = Party(DUMMY_NOTARY_NAME, notaryKeyPair.public) + private val bankAPartyAndCertificate = getTestPartyAndCertificate(bankA) + private val bankBPartyAndCertificate = getTestPartyAndCertificate(bankB) + private val notaryPartyAndCertificate = getTestPartyAndCertificate(notary) + + private val cordappPackages = listOf("net.corda.finance") + private val cordapps = cordappsForPackages(getCallerPackage(NodeBasedTest::class)?.let { cordappPackages + it } + ?: cordappPackages) + + private lateinit var configuration: NodeConfiguration + + @Before + fun setup() { + val testConfig = ConfigFactory.parseResources("test-config.conf", ConfigParseOptions.defaults().setAllowMissing(false)).parseAsNodeConfiguration() as NodeConfigurationImpl + configuration = testConfig.copy(baseDirectory = temporaryFolder.root.toPath(), dataSourceProperties = makeTestDataSourceProperties(), cordappDirectories = TestCordappDirectories.cached(cordapps).toList()) + } + + private val myInfo = NodeInfo(listOf(NetworkHostAndPort("localhost", 3334)), listOf(bankAPartyAndCertificate), 1, 1) + private val networkParameters = NetworkParameters( + minimumPlatformVersion = 1, + notaries = listOf(), + modifiedTime = Instant.now(), + maxMessageSize = 10485760, + maxTransactionSize = 4000000, + epoch = 1, + whitelistedContractImplementations = emptyMap() + ) + + @Test + fun `send message`() { + val flowWorkerServiceHub = FlowWorkerServiceHub(configuration, myInfo, networkParameters, bankAKeyPair) + val flowWorker = FlowWorker(flowWorkerServiceHub) + flowWorker.start() + + flowWorkerServiceHub.networkMapCache.addNode(NodeInfo(listOf(NetworkHostAndPort("localhost", 3333)), listOf(bankBPartyAndCertificate), 1, 1)) + flowWorkerServiceHub.flowFactories[SomeFlowLogic::class.java] = InitiatedFlowFactory.Core { flowSession -> SomeFlowLogic(flowSession) } + + val cordaMessage = flowWorkerServiceHub.networkService.createMessage("platform.session", data = ByteSequence.of(InitialSessionMessage(SessionId(1), 1, SomeFlowLogic::class.java.name, 1, "", "test".serialize()).serialize().bytes).bytes) + val artemisMessage = (flowWorkerServiceHub.networkService as P2PMessagingClient).messagingExecutor!!.cordaToArtemisMessage(cordaMessage) + artemisMessage!!.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString(DUMMY_BANK_B_NAME.toString())) + (flowWorkerServiceHub.networkService as P2PMessagingClient).deliver(artemisMessage) + + flowWorker.stop() + } + + @Test + fun `cash issue`() { + val flowWorkerServiceHub = FlowWorkerServiceHub(configuration, myInfo, networkParameters, bankAKeyPair) + val flowWorker = FlowWorker(flowWorkerServiceHub) + flowWorker.start() + + flowWorkerServiceHub.database.transaction { + flowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate) + } + + val startFlowEventCashIssue = object : ExternalEvent.ExternalStartFlowEvent, DeduplicationHandler { + override val deduplicationHandler = this + override fun insideDatabaseTransaction() {} + override fun afterDatabaseTransaction() {} + override val externalCause = this + override val flowLogic = CashIssueFlow(10.DOLLARS, OpaqueBytes.of(0x01), notary) + override val context = InvocationContext.service("bla", DUMMY_BANK_A_NAME) + private val _future = openFuture>() + override fun wireUpFuture(flowFuture: CordaFuture>) { + _future.captureLater(flowFuture) + } + + override val future: CordaFuture> + get() = _future + } + val result = flowWorker.startFlow(startFlowEventCashIssue) + println(result.getOrThrow().resultFuture.getOrThrow()) + println("Cash " + flowWorkerServiceHub.getCashBalances()) + + flowWorker.stop() + } + +} + +private class SomeFlowLogic(private val session: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + println("FLOW START") + session.send("FLOW SEND A MESSAGE") + println("FLOW END") + } +} \ No newline at end of file diff --git a/experimental/flow-worker/src/integration-test/resources/test-config.conf b/experimental/flow-worker/src/integration-test/resources/test-config.conf new file mode 100644 index 0000000000..ce17dcbab7 --- /dev/null +++ b/experimental/flow-worker/src/integration-test/resources/test-config.conf @@ -0,0 +1,47 @@ +baseDirectory = "" +myLegalName = "O=Bank A, L=London, C=GB" +emailAddress = "" +keyStorePassword = "cordacadevpass" +trustStorePassword = "trustpass" +dataSourceProperties = { + dataSourceClassName = org.h2.jdbcx.JdbcDataSource + dataSource.url = "jdbc:h2:file:blah" + dataSource.user = "sa" + dataSource.password = "" +} +verifierType = InMemory +p2pAddress = "localhost:3334" +flowTimeout { + timeout = 30 seconds + maxRestartCount = 3 + backoffBase = 2.0 +} +devMode = true +crlCheckSoftFail = true +database = { + transactionIsolationLevel = "REPEATABLE_READ" + exportHibernateJMXStatistics = "false" +} +h2port = 0 +useTestClock = false +rpcSettings = { + address = "locahost:3418" + adminAddress = "localhost:3419" + useSsl = false + standAloneBroker = false +} +enterpriseConfiguration = { + mutualExclusionConfiguration = { + on = false + updateInterval = 20000 + waitInterval = 40000 + } + tuning = { + flowThreadPoolSize = 1 + rpcThreadPoolSize = 4 + maximumMessagingBatchSize = 256 + p2pConfirmationWindowSize = 1048576 + brokerConnectionTtlCheckIntervalMs = 20 + } + useMultiThreadedSMM = true +} \ No newline at end of file diff --git a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorker.kt b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorker.kt new file mode 100644 index 0000000000..fc1cbca11c --- /dev/null +++ b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorker.kt @@ -0,0 +1,24 @@ +package net.corda.flowworker + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.internal.FlowStateMachine +import net.corda.node.services.statemachine.ExternalEvent + +class FlowWorker(private val flowWorkerServiceHub: FlowWorkerServiceHub) { + + fun start() { + flowWorkerServiceHub.start() + } + + fun stop() { + flowWorkerServiceHub.stop() + } + + fun startFlow(event: ExternalEvent.ExternalStartFlowEvent): CordaFuture> { + flowWorkerServiceHub.database.transaction { + flowWorkerServiceHub.smm.deliverExternalEvent(event) + } + return event.future + } + +} \ No newline at end of file diff --git a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt new file mode 100644 index 0000000000..0339fc3962 --- /dev/null +++ b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt @@ -0,0 +1,316 @@ +package net.corda.flowworker + +import com.codahale.metrics.MetricRegistry +import com.google.common.collect.MutableClassToInstanceMap +import com.jcabi.manifests.Manifests +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionState +import net.corda.core.crypto.newSecureRandom +import net.corda.core.flows.FlowLogic +import net.corda.core.node.NetworkParameters +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.CordaService +import net.corda.core.serialization.SerializeAsToken +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.internal.SerializationEnvironmentImpl +import net.corda.core.serialization.internal.effectiveSerializationEnv +import net.corda.core.serialization.internal.nodeSerializationEnv +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.CordaClock +import net.corda.node.SimpleClock +import net.corda.node.VersionInfo +import net.corda.node.cordapp.CordappLoader +import net.corda.node.internal.* +import net.corda.node.internal.cordapp.CordappConfigFileProvider +import net.corda.node.internal.cordapp.CordappProviderImpl +import net.corda.node.internal.cordapp.JarScanningCordappLoader +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme +import net.corda.node.serialization.kryo.KRYO_CHECKPOINT_CONTEXT +import net.corda.node.serialization.kryo.KryoServerSerializationScheme +import net.corda.node.services.api.DummyAuditService +import net.corda.node.services.api.MonitoringService +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.api.WritableTransactionStorage +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.node.services.config.shouldInitCrashShell +import net.corda.node.services.identity.PersistentIdentityService +import net.corda.node.services.keys.PersistentKeyManagementService +import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.node.services.messaging.MessagingService +import net.corda.node.services.messaging.P2PMessagingClient +import net.corda.node.services.network.* +import net.corda.node.services.persistence.* +import net.corda.node.services.schema.HibernateObserver +import net.corda.node.services.schema.NodeSchemaService +import net.corda.node.services.statemachine.MultiThreadedStateMachineExecutor +import net.corda.node.services.statemachine.MultiThreadedStateMachineManager +import net.corda.node.services.transactions.InMemoryTransactionVerifierService +import net.corda.node.services.upgrade.ContractUpgradeServiceImpl +import net.corda.node.services.vault.NodeVaultService +import net.corda.node.utilities.AffinityExecutor +import net.corda.nodeapi.internal.DEV_ROOT_CA +import net.corda.nodeapi.internal.crypto.X509Utilities +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.serialization.internal.* +import org.apache.activemq.artemis.utils.ReusableLatch +import rx.schedulers.Schedulers +import java.security.KeyPair +import java.sql.Connection +import java.time.Clock +import java.time.Duration +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +class FlowWorkerServiceHub(override val configuration: NodeConfiguration, override val myInfo: NodeInfo, override val networkParameters: NetworkParameters, private val ourKeyPair: KeyPair) : ServiceHubInternal, SingletonSerializeAsToken() { + + companion object { + @JvmStatic + private fun makeCordappLoader(configuration: NodeConfiguration, versionInfo: VersionInfo): CordappLoader { + return JarScanningCordappLoader.fromDirectories(configuration.cordappDirectories, versionInfo) + } + } + + private val versionInfo = getVersionInfo() + override val clock: CordaClock = SimpleClock(Clock.systemUTC()) + + private val runOnStop = ArrayList<() -> Any?>() + + val cordappLoader = makeCordappLoader(configuration, versionInfo) + + @Suppress("LeakingThis") + private var tokenizableServices: MutableList? = mutableListOf(clock, this) + + override val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null).tokenize() + override val identityService = PersistentIdentityService().tokenize() + override val database: CordaPersistence = createCordaPersistence( + configuration.database, + identityService::wellKnownPartyFromX500Name, + identityService::wellKnownPartyFromAnonymous, + schemaService + ) + + init { + // TODO Break cyclic dependency + identityService.database = database + } + + private val persistentNetworkMapCache = PersistentNetworkMapCache(database) + override val networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, identityService, database).tokenize() + private val checkpointStorage = DBCheckpointStorage() + @Suppress("LeakingThis") + override val validatedTransactions: WritableTransactionStorage = DBTransactionStorage(configuration.transactionCacheSizeBytes, database).tokenize() + private val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL) } + private val metricRegistry = MetricRegistry() + override val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound).tokenize() + override val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments).tokenize() + @Suppress("LeakingThis") + override val keyManagementService = PersistentKeyManagementService(identityService, database).tokenize() + private val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, validatedTransactions) + @Suppress("LeakingThis") + override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database).tokenize() + override val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database) + override val monitoringService = MonitoringService(metricRegistry).tokenize() + override val networkMapUpdater = NetworkMapUpdater( + networkMapCache, + NodeInfoWatcher( + configuration.baseDirectory, + @Suppress("LeakingThis") + Schedulers.io(), + Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec) + ), + networkMapClient, + configuration.baseDirectory, + configuration.extraNetworkMapKeys + ).closeOnStop() + private val transactionVerifierWorkerCount = 4 + @Suppress("LeakingThis") + override val transactionVerifierService = InMemoryTransactionVerifierService(transactionVerifierWorkerCount).tokenize() + override val contractUpgradeService = ContractUpgradeServiceImpl().tokenize() + override val auditService = DummyAuditService().tokenize() + + @Suppress("LeakingThis") + val smm = MultiThreadedStateMachineManager(this, checkpointStorage, MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize), database, newSecureRandom(), ReusableLatch(), cordappLoader.appClassLoader) + // TODO Making this non-lateinit requires MockNode being able to create a blank InMemoryMessaging instance + private lateinit var network: MessagingService + + private val cordappServices = MutableClassToInstanceMap.create() + val flowFactories = ConcurrentHashMap>, InitiatedFlowFactory<*>>() + + override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage(database) + + override val rpcFlows = ArrayList>>() + + override val networkService: MessagingService get() = network + + override fun getFlowFactory(initiatingFlowClass: Class>): InitiatedFlowFactory<*>? { + return flowFactories[initiatingFlowClass] + } + + override fun loadState(stateRef: StateRef): TransactionState<*> { + return servicesForResolution.loadState(stateRef) + } + + override fun loadStates(stateRefs: Set): Set> { + return servicesForResolution.loadStates(stateRefs) + } + + override fun cordaService(type: Class): T { + require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" } + return cordappServices.getInstance(type) + ?: throw IllegalArgumentException("Corda service ${type.name} does not exist") + } + + override fun jdbcSession(): Connection = database.createSession() + + override fun registerUnloadHandler(runOnStop: () -> Unit) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + private fun T.tokenize(): T { + tokenizableServices?.add(this) + ?: throw IllegalStateException("The tokenisable services list has already been finialised") + return this + } + + private fun getVersionInfo(): VersionInfo { + // Manifest properties are only available if running from the corda jar + fun manifestValue(name: String): String? = if (Manifests.exists(name)) Manifests.read(name) else null + + return VersionInfo( + manifestValue("Corda-Platform-Version")?.toInt() ?: 1, + manifestValue("Corda-Release-Version") ?: "Unknown", + manifestValue("Corda-Revision") ?: "Unknown", + manifestValue("Corda-Vendor") ?: "Unknown" + ) + } + + private fun makeMessagingService(): MessagingService { + return P2PMessagingClient( + config = configuration, + versionInfo = versionInfo, + serverAddress = configuration.messagingServerAddress + ?: NetworkHostAndPort("localhost", configuration.p2pAddress.port), + nodeExecutor = AffinityExecutor.ServiceAffinityExecutor("Flow Worker", 1), + database = database, + networkMap = networkMapCache, + metricRegistry = metricRegistry, + isDrainingModeOn = nodeProperties.flowsDrainingMode::isEnabled, + drainingModeWasChangedEvents = nodeProperties.flowsDrainingMode.values + ) + } + + private fun initialiseSerialization() { + val serializationExists = try { + effectiveSerializationEnv + true + } catch (e: IllegalStateException) { + false + } + if (!serializationExists) { + val classloader = cordappLoader.appClassLoader + nodeSerializationEnv = SerializationEnvironmentImpl( + SerializationFactoryImpl().apply { + registerScheme(AMQPServerSerializationScheme(cordappLoader.cordapps)) + registerScheme(AMQPClientSerializationScheme(cordappLoader.cordapps)) + registerScheme(KryoServerSerializationScheme()) + }, + p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader), + rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader), + storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader), + checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader), + rpcClientContext = if (configuration.shouldInitCrashShell()) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null) //even Shell embeded in the node connects via RPC to the node + } + } + + fun start() { + initialiseSerialization() + + // TODO First thing we do is create the MessagingService. This should have been done by the c'tor but it's not + // possible (yet) to due restriction from MockNode + network = makeMessagingService().tokenize() + + // TODO + configuration.configureWithDevSSLCertificate() + val trustRoot = DEV_ROOT_CA.certificate + val nodeCa = configuration.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA) + + networkMapClient?.start(trustRoot) + + servicesForResolution.start(networkParameters) + persistentNetworkMapCache.start(networkParameters.notaries) + + database.hikariStart(configuration.dataSourceProperties, configuration.database, schemaService) + identityService.start(trustRoot, listOf(myInfo.legalIdentitiesAndCerts.first().certificate, nodeCa)) + + database.transaction { + networkMapCache.start() + } + + // TODO + //networkMapUpdater.start(trustRoot, signedNetParams.raw.hash, signedNodeInfo.raw.hash) + + startMessaging() + + database.transaction { + identityService.loadIdentities(myInfo.legalIdentitiesAndCerts) + attachments.start() + cordappProvider.start(networkParameters.whitelistedContractImplementations) + nodeProperties.start() + keyManagementService.start(setOf(ourKeyPair)) + + contractUpgradeService.start() + vaultService.start() + HibernateObserver.install(vaultService.rawUpdates, database.hibernateConfig, schemaService) + + val frozenTokenizableServices = tokenizableServices!! + tokenizableServices = null + + smm.start(frozenTokenizableServices) + runOnStop += { smm.stop(0) } + } + } + + fun stop() { + for (toRun in runOnStop.reversed()) { + toRun() + } + runOnStop.clear() + } + + private fun startMessaging() { + val client = network as P2PMessagingClient + + val messageBroker = if (!configuration.messagingServerExternal) { + val brokerBindAddress = configuration.messagingServerAddress + ?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port) + ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize) + } else { + null + } + + // Start up the embedded MQ server + messageBroker?.apply { + closeOnStop() + start() + } + client.closeOnStop() + client.start( + myIdentity = myInfo.legalIdentities[0].owningKey, + serviceIdentity = if (myInfo.legalIdentities.size == 1) null else myInfo.legalIdentities[1].owningKey, + advertisedAddress = myInfo.addresses.single(), + maxMessageSize = networkParameters.maxMessageSize, + legalName = myInfo.legalIdentities[0].name.toString() + ) + } + + + private fun T.closeOnStop(): T { + runOnStop += this::close + return this + } + +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 6f960dd7d6..23acca2c31 100644 --- a/settings.gradle +++ b/settings.gradle @@ -35,6 +35,7 @@ include 'experimental:quasar-hook' include 'experimental:kryo-hook' // include 'experimental:intellij-plugin' include 'experimental:flow-hook' +include 'experimental:flow-worker' include 'experimental:ha-testing' include 'experimental:corda-utils' include 'experimental:rpc-worker' diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappDirectories.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappDirectories.kt index 6f261817d2..8cb65b0202 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappDirectories.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappDirectories.kt @@ -11,7 +11,7 @@ import java.nio.file.Paths import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap -internal object TestCordappDirectories { + object TestCordappDirectories { private val logger = loggerFor() @@ -20,7 +20,7 @@ internal object TestCordappDirectories { private val cordappsCache: ConcurrentMap, Path> = ConcurrentHashMap, Path>() - internal fun cached(cordapps: Iterable, replaceExistingOnes: Boolean = false, cordappsDirectory: Path = defaultCordappsDirectory): Iterable { + fun cached(cordapps: Iterable, replaceExistingOnes: Boolean = false, cordappsDirectory: Path = defaultCordappsDirectory): Iterable { cordappsDirectory.toFile().deleteOnExit() return cordapps.map { cached(it, replaceExistingOnes, cordappsDirectory) }