mirror of
https://github.com/corda/corda.git
synced 2025-03-15 08:41:04 +00:00
parent
6f47633f71
commit
0e807a2c3c
3
.idea/compiler.xml
generated
3
.idea/compiler.xml
generated
@ -136,6 +136,9 @@
|
||||
<module name="finance_test" target="1.8" />
|
||||
<module name="flow-hook_main" target="1.8" />
|
||||
<module name="flow-hook_test" target="1.8" />
|
||||
<module name="flow-worker_integrationTest" target="1.8" />
|
||||
<module name="flow-worker_main" target="1.8" />
|
||||
<module name="flow-worker_test" target="1.8" />
|
||||
<module name="flows_integrationTest" target="1.8" />
|
||||
<module name="flows_main" target="1.8" />
|
||||
<module name="flows_test" target="1.8" />
|
||||
|
38
experimental/flow-worker/build.gradle
Normal file
38
experimental/flow-worker/build.gradle
Normal file
@ -0,0 +1,38 @@
|
||||
apply plugin: 'kotlin'
|
||||
apply plugin: 'java'
|
||||
|
||||
description 'Corda Flow Worker'
|
||||
|
||||
configurations {
|
||||
integrationTestCompile.extendsFrom testCompile
|
||||
integrationTestRuntime.extendsFrom testRuntime
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
integrationTest {
|
||||
kotlin {
|
||||
compileClasspath += main.output + test.output
|
||||
runtimeClasspath += main.output + test.output
|
||||
srcDir file('src/integration-test/kotlin')
|
||||
}
|
||||
resources {
|
||||
srcDir file('src/integration-test/resources')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
|
||||
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
|
||||
|
||||
compile(project(':core'))
|
||||
compile(project(':node'))
|
||||
|
||||
testCompile "junit:junit:$junit_version"
|
||||
testCompile(project(':node-driver'))
|
||||
}
|
||||
|
||||
task integrationTest(type: Test) {
|
||||
testClassesDirs = sourceSets.integrationTest.output.classesDirs
|
||||
classpath = sourceSets.integrationTest.runtimeClasspath
|
||||
}
|
@ -0,0 +1,148 @@
|
||||
package net.corda.flowworker
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import com.typesafe.config.ConfigParseOptions
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.crypto.Crypto.generateKeyPair
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.contracts.getCashBalances
|
||||
import net.corda.finance.flows.AbstractCashFlow
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.node.internal.InitiatedFlowFactory
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.NodeConfigurationImpl
|
||||
import net.corda.node.services.config.parseAsNodeConfiguration
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.messaging.P2PMessagingClient
|
||||
import net.corda.node.services.statemachine.ExternalEvent
|
||||
import net.corda.node.services.statemachine.InitialSessionMessage
|
||||
import net.corda.node.services.statemachine.SessionId
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_B_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.getTestPartyAndCertificate
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.internal.NodeBasedTest
|
||||
import net.corda.testing.node.internal.TestCordappDirectories
|
||||
import net.corda.testing.node.internal.cordappsForPackages
|
||||
import net.corda.testing.node.internal.getCallerPackage
|
||||
import org.apache.activemq.artemis.api.core.Message
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.time.Instant
|
||||
|
||||
class FlowWorkerTest {
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val temporaryFolder = TemporaryFolder()
|
||||
|
||||
private val bankAKeyPair = generateKeyPair()
|
||||
private val bankBKeyPair = generateKeyPair()
|
||||
private val notaryKeyPair = generateKeyPair()
|
||||
private val bankA = Party(DUMMY_BANK_A_NAME, bankAKeyPair.public)
|
||||
private val bankB = Party(DUMMY_BANK_B_NAME, bankBKeyPair.public)
|
||||
private val notary = Party(DUMMY_NOTARY_NAME, notaryKeyPair.public)
|
||||
private val bankAPartyAndCertificate = getTestPartyAndCertificate(bankA)
|
||||
private val bankBPartyAndCertificate = getTestPartyAndCertificate(bankB)
|
||||
private val notaryPartyAndCertificate = getTestPartyAndCertificate(notary)
|
||||
|
||||
private val cordappPackages = listOf("net.corda.finance")
|
||||
private val cordapps = cordappsForPackages(getCallerPackage(NodeBasedTest::class)?.let { cordappPackages + it }
|
||||
?: cordappPackages)
|
||||
|
||||
private lateinit var configuration: NodeConfiguration
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
val testConfig = ConfigFactory.parseResources("test-config.conf", ConfigParseOptions.defaults().setAllowMissing(false)).parseAsNodeConfiguration() as NodeConfigurationImpl
|
||||
configuration = testConfig.copy(baseDirectory = temporaryFolder.root.toPath(), dataSourceProperties = makeTestDataSourceProperties(), cordappDirectories = TestCordappDirectories.cached(cordapps).toList())
|
||||
}
|
||||
|
||||
private val myInfo = NodeInfo(listOf(NetworkHostAndPort("localhost", 3334)), listOf(bankAPartyAndCertificate), 1, 1)
|
||||
private val networkParameters = NetworkParameters(
|
||||
minimumPlatformVersion = 1,
|
||||
notaries = listOf(),
|
||||
modifiedTime = Instant.now(),
|
||||
maxMessageSize = 10485760,
|
||||
maxTransactionSize = 4000000,
|
||||
epoch = 1,
|
||||
whitelistedContractImplementations = emptyMap()
|
||||
)
|
||||
|
||||
@Test
|
||||
fun `send message`() {
|
||||
val flowWorkerServiceHub = FlowWorkerServiceHub(configuration, myInfo, networkParameters, bankAKeyPair)
|
||||
val flowWorker = FlowWorker(flowWorkerServiceHub)
|
||||
flowWorker.start()
|
||||
|
||||
flowWorkerServiceHub.networkMapCache.addNode(NodeInfo(listOf(NetworkHostAndPort("localhost", 3333)), listOf(bankBPartyAndCertificate), 1, 1))
|
||||
flowWorkerServiceHub.flowFactories[SomeFlowLogic::class.java] = InitiatedFlowFactory.Core { flowSession -> SomeFlowLogic(flowSession) }
|
||||
|
||||
val cordaMessage = flowWorkerServiceHub.networkService.createMessage("platform.session", data = ByteSequence.of(InitialSessionMessage(SessionId(1), 1, SomeFlowLogic::class.java.name, 1, "", "test".serialize()).serialize().bytes).bytes)
|
||||
val artemisMessage = (flowWorkerServiceHub.networkService as P2PMessagingClient).messagingExecutor!!.cordaToArtemisMessage(cordaMessage)
|
||||
artemisMessage!!.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString(DUMMY_BANK_B_NAME.toString()))
|
||||
(flowWorkerServiceHub.networkService as P2PMessagingClient).deliver(artemisMessage)
|
||||
|
||||
flowWorker.stop()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `cash issue`() {
|
||||
val flowWorkerServiceHub = FlowWorkerServiceHub(configuration, myInfo, networkParameters, bankAKeyPair)
|
||||
val flowWorker = FlowWorker(flowWorkerServiceHub)
|
||||
flowWorker.start()
|
||||
|
||||
flowWorkerServiceHub.database.transaction {
|
||||
flowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate)
|
||||
}
|
||||
|
||||
val startFlowEventCashIssue = object : ExternalEvent.ExternalStartFlowEvent<AbstractCashFlow.Result>, DeduplicationHandler {
|
||||
override val deduplicationHandler = this
|
||||
override fun insideDatabaseTransaction() {}
|
||||
override fun afterDatabaseTransaction() {}
|
||||
override val externalCause = this
|
||||
override val flowLogic = CashIssueFlow(10.DOLLARS, OpaqueBytes.of(0x01), notary)
|
||||
override val context = InvocationContext.service("bla", DUMMY_BANK_A_NAME)
|
||||
private val _future = openFuture<FlowStateMachine<AbstractCashFlow.Result>>()
|
||||
override fun wireUpFuture(flowFuture: CordaFuture<FlowStateMachine<AbstractCashFlow.Result>>) {
|
||||
_future.captureLater(flowFuture)
|
||||
}
|
||||
|
||||
override val future: CordaFuture<FlowStateMachine<AbstractCashFlow.Result>>
|
||||
get() = _future
|
||||
}
|
||||
val result = flowWorker.startFlow(startFlowEventCashIssue)
|
||||
println(result.getOrThrow().resultFuture.getOrThrow())
|
||||
println("Cash " + flowWorkerServiceHub.getCashBalances())
|
||||
|
||||
flowWorker.stop()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private class SomeFlowLogic(private val session: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
println("FLOW START")
|
||||
session.send("FLOW SEND A MESSAGE")
|
||||
println("FLOW END")
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
baseDirectory = ""
|
||||
myLegalName = "O=Bank A, L=London, C=GB"
|
||||
emailAddress = ""
|
||||
keyStorePassword = "cordacadevpass"
|
||||
trustStorePassword = "trustpass"
|
||||
dataSourceProperties = {
|
||||
dataSourceClassName = org.h2.jdbcx.JdbcDataSource
|
||||
dataSource.url = "jdbc:h2:file:blah"
|
||||
dataSource.user = "sa"
|
||||
dataSource.password = ""
|
||||
}
|
||||
verifierType = InMemory
|
||||
p2pAddress = "localhost:3334"
|
||||
flowTimeout {
|
||||
timeout = 30 seconds
|
||||
maxRestartCount = 3
|
||||
backoffBase = 2.0
|
||||
}
|
||||
devMode = true
|
||||
crlCheckSoftFail = true
|
||||
database = {
|
||||
transactionIsolationLevel = "REPEATABLE_READ"
|
||||
exportHibernateJMXStatistics = "false"
|
||||
}
|
||||
h2port = 0
|
||||
useTestClock = false
|
||||
rpcSettings = {
|
||||
address = "locahost:3418"
|
||||
adminAddress = "localhost:3419"
|
||||
useSsl = false
|
||||
standAloneBroker = false
|
||||
}
|
||||
enterpriseConfiguration = {
|
||||
mutualExclusionConfiguration = {
|
||||
on = false
|
||||
updateInterval = 20000
|
||||
waitInterval = 40000
|
||||
}
|
||||
tuning = {
|
||||
flowThreadPoolSize = 1
|
||||
rpcThreadPoolSize = 4
|
||||
maximumMessagingBatchSize = 256
|
||||
p2pConfirmationWindowSize = 1048576
|
||||
brokerConnectionTtlCheckIntervalMs = 20
|
||||
}
|
||||
useMultiThreadedSMM = true
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
package net.corda.flowworker
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.node.services.statemachine.ExternalEvent
|
||||
|
||||
class FlowWorker(private val flowWorkerServiceHub: FlowWorkerServiceHub) {
|
||||
|
||||
fun start() {
|
||||
flowWorkerServiceHub.start()
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
flowWorkerServiceHub.stop()
|
||||
}
|
||||
|
||||
fun <T> startFlow(event: ExternalEvent.ExternalStartFlowEvent<T>): CordaFuture<FlowStateMachine<T>> {
|
||||
flowWorkerServiceHub.database.transaction {
|
||||
flowWorkerServiceHub.smm.deliverExternalEvent(event)
|
||||
}
|
||||
return event.future
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,316 @@
|
||||
package net.corda.flowworker
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.google.common.collect.MutableClassToInstanceMap
|
||||
import com.jcabi.manifests.Manifests
|
||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.newSecureRandom
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
import net.corda.core.serialization.internal.effectiveSerializationEnv
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.CordaClock
|
||||
import net.corda.node.SimpleClock
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.cordapp.CordappLoader
|
||||
import net.corda.node.internal.*
|
||||
import net.corda.node.internal.cordapp.CordappConfigFileProvider
|
||||
import net.corda.node.internal.cordapp.CordappProviderImpl
|
||||
import net.corda.node.internal.cordapp.JarScanningCordappLoader
|
||||
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
|
||||
import net.corda.node.serialization.kryo.KRYO_CHECKPOINT_CONTEXT
|
||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.node.services.api.DummyAuditService
|
||||
import net.corda.node.services.api.MonitoringService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.config.shouldInitCrashShell
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.keys.PersistentKeyManagementService
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.P2PMessagingClient
|
||||
import net.corda.node.services.network.*
|
||||
import net.corda.node.services.persistence.*
|
||||
import net.corda.node.services.schema.HibernateObserver
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.statemachine.MultiThreadedStateMachineExecutor
|
||||
import net.corda.node.services.statemachine.MultiThreadedStateMachineManager
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.internal.DEV_ROOT_CA
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.serialization.internal.*
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import rx.schedulers.Schedulers
|
||||
import java.security.KeyPair
|
||||
import java.sql.Connection
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class FlowWorkerServiceHub(override val configuration: NodeConfiguration, override val myInfo: NodeInfo, override val networkParameters: NetworkParameters, private val ourKeyPair: KeyPair) : ServiceHubInternal, SingletonSerializeAsToken() {
|
||||
|
||||
companion object {
|
||||
@JvmStatic
|
||||
private fun makeCordappLoader(configuration: NodeConfiguration, versionInfo: VersionInfo): CordappLoader {
|
||||
return JarScanningCordappLoader.fromDirectories(configuration.cordappDirectories, versionInfo)
|
||||
}
|
||||
}
|
||||
|
||||
private val versionInfo = getVersionInfo()
|
||||
override val clock: CordaClock = SimpleClock(Clock.systemUTC())
|
||||
|
||||
private val runOnStop = ArrayList<() -> Any?>()
|
||||
|
||||
val cordappLoader = makeCordappLoader(configuration, versionInfo)
|
||||
|
||||
@Suppress("LeakingThis")
|
||||
private var tokenizableServices: MutableList<Any>? = mutableListOf(clock, this)
|
||||
|
||||
override val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null).tokenize()
|
||||
override val identityService = PersistentIdentityService().tokenize()
|
||||
override val database: CordaPersistence = createCordaPersistence(
|
||||
configuration.database,
|
||||
identityService::wellKnownPartyFromX500Name,
|
||||
identityService::wellKnownPartyFromAnonymous,
|
||||
schemaService
|
||||
)
|
||||
|
||||
init {
|
||||
// TODO Break cyclic dependency
|
||||
identityService.database = database
|
||||
}
|
||||
|
||||
private val persistentNetworkMapCache = PersistentNetworkMapCache(database)
|
||||
override val networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, identityService, database).tokenize()
|
||||
private val checkpointStorage = DBCheckpointStorage()
|
||||
@Suppress("LeakingThis")
|
||||
override val validatedTransactions: WritableTransactionStorage = DBTransactionStorage(configuration.transactionCacheSizeBytes, database).tokenize()
|
||||
private val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL) }
|
||||
private val metricRegistry = MetricRegistry()
|
||||
override val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound).tokenize()
|
||||
override val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments).tokenize()
|
||||
@Suppress("LeakingThis")
|
||||
override val keyManagementService = PersistentKeyManagementService(identityService, database).tokenize()
|
||||
private val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, validatedTransactions)
|
||||
@Suppress("LeakingThis")
|
||||
override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database).tokenize()
|
||||
override val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
|
||||
override val monitoringService = MonitoringService(metricRegistry).tokenize()
|
||||
override val networkMapUpdater = NetworkMapUpdater(
|
||||
networkMapCache,
|
||||
NodeInfoWatcher(
|
||||
configuration.baseDirectory,
|
||||
@Suppress("LeakingThis")
|
||||
Schedulers.io(),
|
||||
Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)
|
||||
),
|
||||
networkMapClient,
|
||||
configuration.baseDirectory,
|
||||
configuration.extraNetworkMapKeys
|
||||
).closeOnStop()
|
||||
private val transactionVerifierWorkerCount = 4
|
||||
@Suppress("LeakingThis")
|
||||
override val transactionVerifierService = InMemoryTransactionVerifierService(transactionVerifierWorkerCount).tokenize()
|
||||
override val contractUpgradeService = ContractUpgradeServiceImpl().tokenize()
|
||||
override val auditService = DummyAuditService().tokenize()
|
||||
|
||||
@Suppress("LeakingThis")
|
||||
val smm = MultiThreadedStateMachineManager(this, checkpointStorage, MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize), database, newSecureRandom(), ReusableLatch(), cordappLoader.appClassLoader)
|
||||
// TODO Making this non-lateinit requires MockNode being able to create a blank InMemoryMessaging instance
|
||||
private lateinit var network: MessagingService
|
||||
|
||||
private val cordappServices = MutableClassToInstanceMap.create<SerializeAsToken>()
|
||||
val flowFactories = ConcurrentHashMap<Class<out FlowLogic<*>>, InitiatedFlowFactory<*>>()
|
||||
|
||||
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage(database)
|
||||
|
||||
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
|
||||
|
||||
override val networkService: MessagingService get() = network
|
||||
|
||||
override fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>? {
|
||||
return flowFactories[initiatingFlowClass]
|
||||
}
|
||||
|
||||
override fun loadState(stateRef: StateRef): TransactionState<*> {
|
||||
return servicesForResolution.loadState(stateRef)
|
||||
}
|
||||
|
||||
override fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>> {
|
||||
return servicesForResolution.loadStates(stateRefs)
|
||||
}
|
||||
|
||||
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
|
||||
require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" }
|
||||
return cordappServices.getInstance(type)
|
||||
?: throw IllegalArgumentException("Corda service ${type.name} does not exist")
|
||||
}
|
||||
|
||||
override fun jdbcSession(): Connection = database.createSession()
|
||||
|
||||
override fun registerUnloadHandler(runOnStop: () -> Unit) {
|
||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
private fun <T : Any> T.tokenize(): T {
|
||||
tokenizableServices?.add(this)
|
||||
?: throw IllegalStateException("The tokenisable services list has already been finialised")
|
||||
return this
|
||||
}
|
||||
|
||||
private fun getVersionInfo(): VersionInfo {
|
||||
// Manifest properties are only available if running from the corda jar
|
||||
fun manifestValue(name: String): String? = if (Manifests.exists(name)) Manifests.read(name) else null
|
||||
|
||||
return VersionInfo(
|
||||
manifestValue("Corda-Platform-Version")?.toInt() ?: 1,
|
||||
manifestValue("Corda-Release-Version") ?: "Unknown",
|
||||
manifestValue("Corda-Revision") ?: "Unknown",
|
||||
manifestValue("Corda-Vendor") ?: "Unknown"
|
||||
)
|
||||
}
|
||||
|
||||
private fun makeMessagingService(): MessagingService {
|
||||
return P2PMessagingClient(
|
||||
config = configuration,
|
||||
versionInfo = versionInfo,
|
||||
serverAddress = configuration.messagingServerAddress
|
||||
?: NetworkHostAndPort("localhost", configuration.p2pAddress.port),
|
||||
nodeExecutor = AffinityExecutor.ServiceAffinityExecutor("Flow Worker", 1),
|
||||
database = database,
|
||||
networkMap = networkMapCache,
|
||||
metricRegistry = metricRegistry,
|
||||
isDrainingModeOn = nodeProperties.flowsDrainingMode::isEnabled,
|
||||
drainingModeWasChangedEvents = nodeProperties.flowsDrainingMode.values
|
||||
)
|
||||
}
|
||||
|
||||
private fun initialiseSerialization() {
|
||||
val serializationExists = try {
|
||||
effectiveSerializationEnv
|
||||
true
|
||||
} catch (e: IllegalStateException) {
|
||||
false
|
||||
}
|
||||
if (!serializationExists) {
|
||||
val classloader = cordappLoader.appClassLoader
|
||||
nodeSerializationEnv = SerializationEnvironmentImpl(
|
||||
SerializationFactoryImpl().apply {
|
||||
registerScheme(AMQPServerSerializationScheme(cordappLoader.cordapps))
|
||||
registerScheme(AMQPClientSerializationScheme(cordappLoader.cordapps))
|
||||
registerScheme(KryoServerSerializationScheme())
|
||||
},
|
||||
p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader),
|
||||
rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader),
|
||||
storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader),
|
||||
checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader),
|
||||
rpcClientContext = if (configuration.shouldInitCrashShell()) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null) //even Shell embeded in the node connects via RPC to the node
|
||||
}
|
||||
}
|
||||
|
||||
fun start() {
|
||||
initialiseSerialization()
|
||||
|
||||
// TODO First thing we do is create the MessagingService. This should have been done by the c'tor but it's not
|
||||
// possible (yet) to due restriction from MockNode
|
||||
network = makeMessagingService().tokenize()
|
||||
|
||||
// TODO
|
||||
configuration.configureWithDevSSLCertificate()
|
||||
val trustRoot = DEV_ROOT_CA.certificate
|
||||
val nodeCa = configuration.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
|
||||
|
||||
networkMapClient?.start(trustRoot)
|
||||
|
||||
servicesForResolution.start(networkParameters)
|
||||
persistentNetworkMapCache.start(networkParameters.notaries)
|
||||
|
||||
database.hikariStart(configuration.dataSourceProperties, configuration.database, schemaService)
|
||||
identityService.start(trustRoot, listOf(myInfo.legalIdentitiesAndCerts.first().certificate, nodeCa))
|
||||
|
||||
database.transaction {
|
||||
networkMapCache.start()
|
||||
}
|
||||
|
||||
// TODO
|
||||
//networkMapUpdater.start(trustRoot, signedNetParams.raw.hash, signedNodeInfo.raw.hash)
|
||||
|
||||
startMessaging()
|
||||
|
||||
database.transaction {
|
||||
identityService.loadIdentities(myInfo.legalIdentitiesAndCerts)
|
||||
attachments.start()
|
||||
cordappProvider.start(networkParameters.whitelistedContractImplementations)
|
||||
nodeProperties.start()
|
||||
keyManagementService.start(setOf(ourKeyPair))
|
||||
|
||||
contractUpgradeService.start()
|
||||
vaultService.start()
|
||||
HibernateObserver.install(vaultService.rawUpdates, database.hibernateConfig, schemaService)
|
||||
|
||||
val frozenTokenizableServices = tokenizableServices!!
|
||||
tokenizableServices = null
|
||||
|
||||
smm.start(frozenTokenizableServices)
|
||||
runOnStop += { smm.stop(0) }
|
||||
}
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
for (toRun in runOnStop.reversed()) {
|
||||
toRun()
|
||||
}
|
||||
runOnStop.clear()
|
||||
}
|
||||
|
||||
private fun startMessaging() {
|
||||
val client = network as P2PMessagingClient
|
||||
|
||||
val messageBroker = if (!configuration.messagingServerExternal) {
|
||||
val brokerBindAddress = configuration.messagingServerAddress
|
||||
?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port)
|
||||
ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
|
||||
// Start up the embedded MQ server
|
||||
messageBroker?.apply {
|
||||
closeOnStop()
|
||||
start()
|
||||
}
|
||||
client.closeOnStop()
|
||||
client.start(
|
||||
myIdentity = myInfo.legalIdentities[0].owningKey,
|
||||
serviceIdentity = if (myInfo.legalIdentities.size == 1) null else myInfo.legalIdentities[1].owningKey,
|
||||
advertisedAddress = myInfo.addresses.single(),
|
||||
maxMessageSize = networkParameters.maxMessageSize,
|
||||
legalName = myInfo.legalIdentities[0].name.toString()
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
private fun <T : AutoCloseable> T.closeOnStop(): T {
|
||||
runOnStop += this::close
|
||||
return this
|
||||
}
|
||||
|
||||
}
|
@ -35,6 +35,7 @@ include 'experimental:quasar-hook'
|
||||
include 'experimental:kryo-hook'
|
||||
// include 'experimental:intellij-plugin'
|
||||
include 'experimental:flow-hook'
|
||||
include 'experimental:flow-worker'
|
||||
include 'experimental:ha-testing'
|
||||
include 'experimental:corda-utils'
|
||||
include 'experimental:rpc-worker'
|
||||
|
@ -11,7 +11,7 @@ import java.nio.file.Paths
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ConcurrentMap
|
||||
|
||||
internal object TestCordappDirectories {
|
||||
object TestCordappDirectories {
|
||||
|
||||
private val logger = loggerFor<TestCordappDirectories>()
|
||||
|
||||
@ -20,7 +20,7 @@ internal object TestCordappDirectories {
|
||||
|
||||
private val cordappsCache: ConcurrentMap<List<String>, Path> = ConcurrentHashMap<List<String>, Path>()
|
||||
|
||||
internal fun cached(cordapps: Iterable<TestCorDapp>, replaceExistingOnes: Boolean = false, cordappsDirectory: Path = defaultCordappsDirectory): Iterable<Path> {
|
||||
fun cached(cordapps: Iterable<TestCorDapp>, replaceExistingOnes: Boolean = false, cordappsDirectory: Path = defaultCordappsDirectory): Iterable<Path> {
|
||||
|
||||
cordappsDirectory.toFile().deleteOnExit()
|
||||
return cordapps.map { cached(it, replaceExistingOnes, cordappsDirectory) }
|
||||
|
Loading…
x
Reference in New Issue
Block a user