mirror of
https://github.com/corda/corda.git
synced 2025-01-27 06:39:38 +00:00
Fixes relating to testing flows and services.
Fixed issue where Corda services installed in unit tests were not being marked as serialise as singleton. Also the driver now automatically picks up the scanning annotations. This required moving the NodeFactory used in smoke tests into a separate module.
This commit is contained in:
parent
39555e4145
commit
1a47d60209
5
.idea/compiler.xml
generated
5
.idea/compiler.xml
generated
@ -60,6 +60,7 @@
|
||||
<module name="node-schemas_test" target="1.8" />
|
||||
<module name="node_integrationTest" target="1.8" />
|
||||
<module name="node_main" target="1.8" />
|
||||
<module name="node_smokeTest" target="1.8" />
|
||||
<module name="node_test" target="1.8" />
|
||||
<module name="notary-demo_main" target="1.8" />
|
||||
<module name="notary-demo_test" target="1.8" />
|
||||
@ -76,6 +77,8 @@
|
||||
<module name="simm-valuation-demo_integrationTest" target="1.8" />
|
||||
<module name="simm-valuation-demo_main" target="1.8" />
|
||||
<module name="simm-valuation-demo_test" target="1.8" />
|
||||
<module name="smoke-test-utils_main" target="1.8" />
|
||||
<module name="smoke-test-utils_test" target="1.8" />
|
||||
<module name="test-utils_main" target="1.8" />
|
||||
<module name="test-utils_test" target="1.8" />
|
||||
<module name="tools_main" target="1.8" />
|
||||
@ -98,4 +101,4 @@
|
||||
<component name="JavacSettings">
|
||||
<option name="ADDITIONAL_OPTIONS_STRING" value="-parameters" />
|
||||
</component>
|
||||
</project>
|
||||
</project>
|
@ -60,6 +60,7 @@ dependencies {
|
||||
testCompile project(':client:mock')
|
||||
|
||||
// Smoke tests do NOT have any Node code on the classpath!
|
||||
smokeTestCompile project(':smoke-test-utils')
|
||||
smokeTestCompile project(':finance')
|
||||
smokeTestCompile "org.apache.logging.log4j:log4j-slf4j-impl:$log4j_version"
|
||||
smokeTestCompile "org.apache.logging.log4j:log4j-core:$log4j_version"
|
||||
@ -76,5 +77,4 @@ task integrationTest(type: Test) {
|
||||
task smokeTest(type: Test) {
|
||||
testClassesDir = sourceSets.smokeTest.output.classesDir
|
||||
classpath = sourceSets.smokeTest.runtimeClasspath
|
||||
systemProperties['build.dir'] = buildDir
|
||||
}
|
||||
|
@ -2,17 +2,11 @@ package net.corda.kotlin.rpc
|
||||
|
||||
import com.google.common.hash.Hashing
|
||||
import com.google.common.hash.HashingInputStream
|
||||
import java.io.FilterInputStream
|
||||
import java.io.InputStream
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.time.Duration.ofSeconds
|
||||
import java.util.Currency
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import kotlin.test.*
|
||||
import net.corda.client.rpc.CordaRPCConnection
|
||||
import net.corda.client.rpc.notUsed
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.contracts.DOLLARS
|
||||
import net.corda.core.contracts.POUNDS
|
||||
import net.corda.core.contracts.SWISS_FRANCS
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.identity.Party
|
||||
@ -20,27 +14,34 @@ import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.StateMachineUpdate
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.messaging.startTrackedFlow
|
||||
import net.corda.core.seconds
|
||||
import net.corda.core.serialization.OpaqueBytes
|
||||
import net.corda.core.sizedInputStreamAndHash
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.flows.CashIssueFlow
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.smoketesting.NodeConfig
|
||||
import net.corda.smoketesting.NodeProcess
|
||||
import org.apache.commons.io.output.NullOutputStream
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.FilterInputStream
|
||||
import java.io.InputStream
|
||||
import java.util.*
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFalse
|
||||
import kotlin.test.assertNotEquals
|
||||
|
||||
class StandaloneCordaRPClientTest {
|
||||
private companion object {
|
||||
val log = loggerFor<StandaloneCordaRPClientTest>()
|
||||
val buildDir: Path = Paths.get(System.getProperty("build.dir"))
|
||||
val nodesDir: Path = buildDir.resolve("nodes")
|
||||
val user = User("user1", "test", permissions = setOf("ALL"))
|
||||
val factory = NodeProcess.Factory(nodesDir)
|
||||
val port = AtomicInteger(15000)
|
||||
const val attachmentSize = 2116
|
||||
const val timeout = 60L
|
||||
val timeout = 60.seconds
|
||||
}
|
||||
|
||||
private lateinit var notary: NodeProcess
|
||||
@ -59,7 +60,7 @@ class StandaloneCordaRPClientTest {
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
notary = factory.create(notaryConfig)
|
||||
notary = NodeProcess.Factory().create(notaryConfig)
|
||||
connection = notary.connect()
|
||||
rpcProxy = connection.proxy
|
||||
notaryIdentity = fetchNotaryIdentity()
|
||||
@ -91,7 +92,7 @@ class StandaloneCordaRPClientTest {
|
||||
@Test
|
||||
fun `test starting flow`() {
|
||||
rpcProxy.startFlow(::CashIssueFlow, 127.POUNDS, OpaqueBytes.of(0), notaryIdentity, notaryIdentity)
|
||||
.returnValue.getOrThrow(ofSeconds(timeout))
|
||||
.returnValue.getOrThrow(timeout)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -104,7 +105,7 @@ class StandaloneCordaRPClientTest {
|
||||
log.info("Flow>> $msg")
|
||||
++trackCount
|
||||
}
|
||||
handle.returnValue.getOrThrow(ofSeconds(timeout))
|
||||
handle.returnValue.getOrThrow(timeout)
|
||||
assertNotEquals(0, trackCount)
|
||||
}
|
||||
|
||||
@ -128,7 +129,7 @@ class StandaloneCordaRPClientTest {
|
||||
|
||||
// Now issue some cash
|
||||
rpcProxy.startFlow(::CashIssueFlow, 513.SWISS_FRANCS, OpaqueBytes.of(0), notaryIdentity, notaryIdentity)
|
||||
.returnValue.getOrThrow(ofSeconds(timeout))
|
||||
.returnValue.getOrThrow(timeout)
|
||||
assertEquals(1, updateCount)
|
||||
}
|
||||
|
||||
@ -145,7 +146,7 @@ class StandaloneCordaRPClientTest {
|
||||
|
||||
// Now issue some cash
|
||||
rpcProxy.startFlow(::CashIssueFlow, 629.POUNDS, OpaqueBytes.of(0), notaryIdentity, notaryIdentity)
|
||||
.returnValue.getOrThrow(ofSeconds(timeout))
|
||||
.returnValue.getOrThrow(timeout)
|
||||
assertNotEquals(0, updateCount)
|
||||
|
||||
// Check that this cash exists in the vault
|
||||
|
@ -33,7 +33,6 @@ import java.util.zip.Deflater
|
||||
import java.util.zip.ZipEntry
|
||||
import java.util.zip.ZipInputStream
|
||||
import java.util.zip.ZipOutputStream
|
||||
import kotlin.collections.LinkedHashMap
|
||||
import kotlin.concurrent.withLock
|
||||
import kotlin.reflect.KProperty
|
||||
|
||||
@ -141,8 +140,8 @@ fun <A> ListenableFuture<out A>.toObservable(): Observable<A> {
|
||||
}
|
||||
|
||||
/** Allows you to write code like: Paths.get("someDir") / "subdir" / "filename" but using the Paths API to avoid platform separator problems. */
|
||||
operator fun Path.div(other: String) = resolve(other)
|
||||
operator fun String.div(other: String) = Paths.get(this) / other
|
||||
operator fun Path.div(other: String): Path = resolve(other)
|
||||
operator fun String.div(other: String): Path = Paths.get(this) / other
|
||||
|
||||
fun Path.createDirectory(vararg attrs: FileAttribute<*>): Path = Files.createDirectory(this, *attrs)
|
||||
fun Path.createDirectories(vararg attrs: FileAttribute<*>): Path = Files.createDirectories(this, *attrs)
|
||||
|
@ -16,8 +16,7 @@ import kotlin.annotation.AnnotationTarget.CLASS
|
||||
* only loaded in nodes that declare the type in their advertisedServices.
|
||||
*/
|
||||
// TODO Handle the singleton serialisation of Corda services automatically, removing the need to implement SerializeAsToken
|
||||
// TODO Currently all nodes which load the plugin will attempt to load the service even if it's not revelant to them. The
|
||||
// underlying problem is that the entire CorDapp jar is used as a dependency, when in fact it's just the client-facing
|
||||
// bit of the CorDapp that should be depended on (e.g. the initiating flows).
|
||||
// TODO Perhaps this should be an interface or abstract class due to the need for it to implement SerializeAsToken and
|
||||
// the need for the service type (which can be exposed by a simple getter)
|
||||
@Target(CLASS)
|
||||
annotation class CordaService
|
||||
|
@ -138,7 +138,7 @@ class AttachmentSerializationTest {
|
||||
}
|
||||
|
||||
private fun launchFlow(clientLogic: ClientLogic, rounds: Int) {
|
||||
server.registerFlowFactory(ClientLogic::class.java, object : InitiatedFlowFactory<ServerLogic> {
|
||||
server.internalRegisterFlowFactory(ClientLogic::class.java, object : InitiatedFlowFactory<ServerLogic> {
|
||||
override fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): ServerLogic {
|
||||
return ServerLogic(otherParty)
|
||||
}
|
||||
|
@ -15,7 +15,11 @@ UNRELEASED
|
||||
* ``CordaPluginRegistry.servicePlugins`` is also no longer used, along with ``PluginServiceHub.registerFlowInitiator``.
|
||||
Instead annotate your initiated flows with ``@InitiatedBy``. This annotation takes a single parameter which is the
|
||||
initiating flow. This initiating flow further has to be annotated with ``@InitiatingFlow``. For any services you
|
||||
may have, such as oracles, annotate them with ``@CordaService``.
|
||||
may have, such as oracles, annotate them with ``@CordaService``. These annotations will be picked up automatically
|
||||
when the node starts up.
|
||||
|
||||
* Due to these changes, when unit testing flows make sure to use ``AbstractNode.registerInitiatedFlow`` so that the flows
|
||||
are wired up. Likewise for services use ``AbstractNode.installCordaService``.
|
||||
|
||||
* Related to ``InitiatingFlow``, the ``shareParentSessions`` boolean parameter of ``FlowLogic.subFlow`` has been
|
||||
removed. This was an unfortunate parameter that unnecessarily exposed the inner workings of flow sessions. Now, if
|
||||
|
@ -80,4 +80,9 @@ valid) inside a ``database.transaction``. All node flows run within a database
|
||||
but any time we need to use the database directly from a unit test, you need to provide a database transaction as shown
|
||||
here.
|
||||
|
||||
And that's it: you can explore the documentation for the `MockNetwork API <api/kotlin/corda/net.corda.testing.node/-mock-network/index.html>`_ here.
|
||||
With regards to initiated flows (see :doc:`flow-state-machines` for information on initiated and initiating flows), the
|
||||
full node automatically registers them by scanning the CorDapp jars. In a unit test environment this is not possible so
|
||||
``MockNode`` has the ``registerInitiatedFlow`` method to manually register an initiated flow.
|
||||
|
||||
And that's it: you can explore the documentation for the `MockNetwork API <api/kotlin/corda/net.corda.testing.node/-mock-network/index.html>`_
|
||||
here.
|
||||
|
@ -275,3 +275,11 @@ Here's an example of it in action from ``FixingFlow.Fixer``.
|
||||
When overriding be careful when making the sub-class an anonymous or inner class (object declarations in Kotlin),
|
||||
because that kind of classes can access variables from the enclosing scope and cause serialization problems when
|
||||
checkpointed.
|
||||
|
||||
Testing
|
||||
-------
|
||||
|
||||
When unit testing we make use of the ``MockNetwork`` which allows us to create ``MockNode``s, which are simplified nodes
|
||||
suitable for tests. One feature we lose (and which is not suitable in unit testing anyway) is the node's ability to scan
|
||||
and automatically install orcales it finds in the CorDapp jars. Instead when working with ``MockNode`` use the
|
||||
``installCordaService`` method to manually install the oracle on the relevant node.
|
@ -25,6 +25,9 @@ configurations {
|
||||
|
||||
integrationTestCompile.extendsFrom testCompile
|
||||
integrationTestRuntime.extendsFrom testRuntime
|
||||
|
||||
smokeTestCompile.extendsFrom compile
|
||||
smokeTestRuntime.extendsFrom runtime
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
@ -38,6 +41,15 @@ sourceSets {
|
||||
srcDir file('src/integration-test/resources')
|
||||
}
|
||||
}
|
||||
smokeTest {
|
||||
kotlin {
|
||||
// We must NOT have any Node code on the classpath, so do NOT
|
||||
// include the test or integrationTest dependencies here.
|
||||
compileClasspath += main.output
|
||||
runtimeClasspath += main.output
|
||||
srcDir file('src/smoke-test/kotlin')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Use manual resource copying of log4j2.xml rather than source sets.
|
||||
@ -46,12 +58,15 @@ processResources {
|
||||
from file("$rootDir/config/dev/log4j2.xml")
|
||||
}
|
||||
|
||||
processIntegrationTestResources {
|
||||
processSmokeTestResources {
|
||||
// Build one of the demos so that we can test CorDapp scanning in CordappScanningTest. It doesn't matter which demo
|
||||
// we use, just make sure the test is updated accordingly.
|
||||
from(project(':samples:trader-demo').tasks.jar) {
|
||||
rename 'trader-demo-(.*)', 'trader-demo.jar'
|
||||
}
|
||||
from(project(':node:capsule').tasks.buildCordaJAR) {
|
||||
rename 'corda-(.*)', 'corda.jar'
|
||||
}
|
||||
}
|
||||
|
||||
// To find potential version conflicts, run "gradle htmlDependencyReport" and then look in
|
||||
@ -165,9 +180,19 @@ dependencies {
|
||||
|
||||
// Integration test helpers
|
||||
integrationTestCompile "junit:junit:$junit_version"
|
||||
|
||||
// Smoke tests do NOT have any Node code on the classpath!
|
||||
smokeTestCompile project(':smoke-test-utils')
|
||||
smokeTestCompile "org.assertj:assertj-core:${assertj_version}"
|
||||
smokeTestCompile "junit:junit:$junit_version"
|
||||
}
|
||||
|
||||
task integrationTest(type: Test) {
|
||||
testClassesDir = sourceSets.integrationTest.output.classesDir
|
||||
classpath = sourceSets.integrationTest.runtimeClasspath
|
||||
}
|
||||
|
||||
task smokeTest(type: Test) {
|
||||
testClassesDir = sourceSets.smokeTest.output.classesDir
|
||||
classpath = sourceSets.smokeTest.runtimeClasspath
|
||||
}
|
||||
|
@ -35,7 +35,6 @@ import okhttp3.Request
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.slf4j.Logger
|
||||
import java.io.File
|
||||
import java.io.File.pathSeparator
|
||||
import java.net.*
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
@ -659,13 +658,19 @@ class DriverDSL(
|
||||
debugPort: Int?,
|
||||
overriddenSystemProperties: Map<String, String>
|
||||
): ListenableFuture<Process> {
|
||||
return executorService.submit<Process> {
|
||||
// Get the package of the caller of the driver and pass this to the node for CorDapp scanning
|
||||
val callerPackage = Exception()
|
||||
.stackTrace
|
||||
.first { it.fileName != "Driver.kt" }
|
||||
.let { Class.forName(it.className).`package`.name }
|
||||
val processFuture = executorService.submit<Process> {
|
||||
// Write node.conf
|
||||
writeConfig(nodeConf.baseDirectory, "node.conf", config)
|
||||
|
||||
val systemProperties = overriddenSystemProperties + mapOf(
|
||||
"name" to nodeConf.myLegalName,
|
||||
"visualvm.display.name" to "corda-${nodeConf.myLegalName}",
|
||||
"net.corda.node.cordapp.scan.package" to callerPackage,
|
||||
"java.io.tmpdir" to System.getProperty("java.io.tmpdir") // Inherit from parent process
|
||||
)
|
||||
// TODO Add this once we upgrade to quasar 0.7.8, this causes startup time to halve.
|
||||
@ -676,8 +681,6 @@ class DriverDSL(
|
||||
"-javaagent:$quasarJarPath"
|
||||
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
|
||||
|
||||
val pluginsDirectory = nodeConf.baseDirectory / "plugins"
|
||||
|
||||
ProcessUtilities.startJavaProcess(
|
||||
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
|
||||
arguments = listOf(
|
||||
@ -685,14 +688,15 @@ class DriverDSL(
|
||||
"--logging-level=$loggingLevel",
|
||||
"--no-local-shell"
|
||||
),
|
||||
// Like the capsule, include the node's plugin directory
|
||||
classpath = "${ProcessUtilities.defaultClassPath}$pathSeparator$pluginsDirectory/*",
|
||||
jdwpPort = debugPort,
|
||||
extraJvmArguments = extraJvmArguments,
|
||||
errorLogPath = nodeConf.baseDirectory / LOGS_DIRECTORY_NAME / "error.log",
|
||||
workingDirectory = nodeConf.baseDirectory
|
||||
)
|
||||
}.flatMap { process -> addressMustBeBound(executorService, nodeConf.p2pAddress, process).map { process } }
|
||||
}
|
||||
return processFuture.flatMap {
|
||||
process -> addressMustBeBound(executorService, nodeConf.p2pAddress, process).map { process }
|
||||
}
|
||||
}
|
||||
|
||||
private fun startWebserver(
|
||||
|
@ -222,6 +222,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
serverThread,
|
||||
database,
|
||||
busyNodeLatch)
|
||||
|
||||
smm.tokenizableServices.addAll(tokenizableServices)
|
||||
|
||||
if (serverThread is ExecutorService) {
|
||||
runOnStop += Runnable {
|
||||
// We wait here, even though any in-flight messages should have been drained away because the
|
||||
@ -240,10 +243,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
startMessagingService(rpcOps)
|
||||
installCoreFlows()
|
||||
|
||||
val scanResult = scanCorDapps()
|
||||
val scanResult = scanCordapps()
|
||||
if (scanResult != null) {
|
||||
val cordappServices = installCordaServices(scanResult)
|
||||
tokenizableServices.addAll(cordappServices)
|
||||
installCordaServices(scanResult)
|
||||
registerInitiatedFlows(scanResult)
|
||||
rpcFlows = findRPCFlows(scanResult)
|
||||
} else {
|
||||
@ -257,7 +259,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
|
||||
runOnStop += Runnable { net.stop() }
|
||||
_networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
|
||||
smm.start(tokenizableServices)
|
||||
smm.start()
|
||||
// Shut down the SMM so no Fibers are scheduled.
|
||||
runOnStop += Runnable { smm.stop(acceptableLiveFiberCountOnStop()) }
|
||||
scheduler.start()
|
||||
@ -266,34 +268,37 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
return this
|
||||
}
|
||||
|
||||
private fun installCordaServices(scanResult: ScanResult): List<SerializeAsToken> {
|
||||
return scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class).mapNotNull {
|
||||
tryInstallCordaService(it)
|
||||
private fun installCordaServices(scanResult: ScanResult) {
|
||||
fun getServiceType(clazz: Class<*>): ServiceType? {
|
||||
return try {
|
||||
clazz.getField("type").get(null) as ServiceType
|
||||
} catch (e: NoSuchFieldException) {
|
||||
log.warn("${clazz.name} does not have a type field, optimistically proceeding with install.")
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun <T : SerializeAsToken> tryInstallCordaService(serviceClass: Class<T>): T? {
|
||||
/** TODO: This mechanism may get replaced by a different one, see comments on [CordaService]. */
|
||||
val typeField = try {
|
||||
serviceClass.getField("type")
|
||||
} catch (e: NoSuchFieldException) {
|
||||
null
|
||||
}
|
||||
if (typeField == null) {
|
||||
log.warn("${serviceClass.name} does not have a type field, optimistically proceeding with install.")
|
||||
} else if (info.serviceIdentities(typeField.get(null) as ServiceType).isEmpty()) {
|
||||
return null
|
||||
}
|
||||
return try {
|
||||
installCordaService(serviceClass)
|
||||
} catch (e: NoSuchMethodException) {
|
||||
log.error("${serviceClass.name}, as a Corda service, must have a constructor with a single parameter " +
|
||||
"of type ${PluginServiceHub::class.java.name}")
|
||||
null
|
||||
} catch (e: Exception) {
|
||||
log.error("Unable to install Corda service ${serviceClass.name}", e)
|
||||
null
|
||||
}
|
||||
return scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class)
|
||||
.filter {
|
||||
val serviceType = getServiceType(it)
|
||||
if (serviceType != null && info.serviceIdentities(serviceType).isEmpty()) {
|
||||
log.debug { "Ignoring ${it.name} as a Corda service since $serviceType is not one of our " +
|
||||
"advertised services" }
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
.forEach {
|
||||
try {
|
||||
installCordaService(it)
|
||||
} catch (e: NoSuchMethodException) {
|
||||
log.error("${it.name}, as a Corda service, must have a constructor with a single parameter " +
|
||||
"of type ${PluginServiceHub::class.java.name}")
|
||||
} catch (e: Exception) {
|
||||
log.error("Unable to install Corda service ${it.name}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -305,6 +310,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
val ctor = clazz.getDeclaredConstructor(PluginServiceHub::class.java).apply { isAccessible = true }
|
||||
val service = ctor.newInstance(services)
|
||||
cordappServices.putInstance(clazz, service)
|
||||
smm.tokenizableServices += service
|
||||
log.info("Installed ${clazz.name} Corda service")
|
||||
return service
|
||||
}
|
||||
@ -371,16 +377,16 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
"${InitiatingFlow::class.java.name} must be annotated on ${initiatingFlow.name} and not on a super-type"
|
||||
}
|
||||
val flowFactory = InitiatedFlowFactory.CorDapp(version, { ctor.newInstance(it) })
|
||||
val observable = registerFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track)
|
||||
val observable = internalRegisterFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track)
|
||||
log.info("Registered ${initiatingFlow.name} to initiate ${initiatedFlow.name} (version $version)")
|
||||
return observable
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
fun <F : FlowLogic<*>> registerFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
|
||||
flowFactory: InitiatedFlowFactory<F>,
|
||||
initiatedFlowClass: Class<F>,
|
||||
track: Boolean): Observable<F> {
|
||||
fun <F : FlowLogic<*>> internalRegisterFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
|
||||
flowFactory: InitiatedFlowFactory<F>,
|
||||
initiatedFlowClass: Class<F>,
|
||||
track: Boolean): Observable<F> {
|
||||
val observable = if (track) {
|
||||
smm.changes.filter { it is StateMachineManager.Change.Add }.map { it.logic }.ofType(initiatedFlowClass)
|
||||
} else {
|
||||
@ -453,14 +459,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
|
||||
val tokenizableServices = mutableListOf(storage, net, vault, keyManagement, identity, platformClock, scheduler)
|
||||
makeAdvertisedServices(tokenizableServices)
|
||||
|
||||
return tokenizableServices
|
||||
}
|
||||
|
||||
private fun scanCorDapps(): ScanResult? {
|
||||
private fun scanCordapps(): ScanResult? {
|
||||
val scanPackage = System.getProperty("net.corda.node.cordapp.scan.package")
|
||||
val paths = if (scanPackage != null) {
|
||||
// This is purely for integration tests so that classes defined in the test can automatically be picked up
|
||||
// Rather than looking in the plugins directory, figure out the classpath for the given package and scan that
|
||||
// instead. This is used in tests where we avoid having to package stuff up in jars and then having to move
|
||||
// them to the plugins directory for each node.
|
||||
check(configuration.devMode) { "Package scanning can only occur in dev mode" }
|
||||
val resource = scanPackage.replace('.', '/')
|
||||
javaClass.classLoader.getResources(resource)
|
||||
@ -545,7 +552,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
private fun hasSSLCertificates(): Boolean {
|
||||
val (sslKeystore, keystore) = try {
|
||||
// This will throw IOException if key file not found or KeyStoreException if keystore password is incorrect.
|
||||
Pair(KeyStoreUtilities.loadKeyStore(configuration.sslKeystore, configuration.keyStorePassword), KeyStoreUtilities.loadKeyStore(configuration.nodeKeystore, configuration.keyStorePassword))
|
||||
Pair(
|
||||
KeyStoreUtilities.loadKeyStore(configuration.sslKeystore, configuration.keyStorePassword),
|
||||
KeyStoreUtilities.loadKeyStore(configuration.nodeKeystore, configuration.keyStorePassword))
|
||||
} catch (e: IOException) {
|
||||
return false
|
||||
} catch (e: KeyStoreException) {
|
||||
|
@ -36,6 +36,7 @@ import rx.subjects.PublishSubject
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.collections.ArrayList
|
||||
|
||||
/**
|
||||
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects.
|
||||
@ -145,8 +146,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
private val openSessions = ConcurrentHashMap<Long, FlowSession>()
|
||||
private val recentlyClosedSessions = ConcurrentHashMap<Long, Party>()
|
||||
|
||||
internal val tokenizableServices = ArrayList<Any>()
|
||||
// Context for tokenized services in checkpoints
|
||||
private lateinit var serializationContext: SerializeAsTokenContext
|
||||
private val serializationContext by lazy {
|
||||
SerializeAsTokenContext(tokenizableServices, quasarKryoPool, serviceHub)
|
||||
}
|
||||
|
||||
/** Returns a list of all state machines executing the given flow logic at the top level (subflows do not count) */
|
||||
fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
|
||||
@ -170,8 +174,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
*/
|
||||
val changes: Observable<Change> = mutex.content.changesPublisher.wrapWithDatabaseTransaction()
|
||||
|
||||
fun start(tokenizableServices: List<Any>) {
|
||||
serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryoPool, serviceHub)
|
||||
fun start() {
|
||||
restoreFibersFromCheckpoints()
|
||||
listenToLedgerTransactions()
|
||||
serviceHub.networkMapCache.mapServiceRegistered.then(executor) { resumeRestoredFibers() }
|
||||
@ -348,7 +351,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
val initiatedFlowFactory = serviceHub.getFlowFactory(sessionInit.initiatingFlowClass)
|
||||
if (initiatedFlowFactory == null) {
|
||||
logger.warn("${sessionInit.initiatingFlowClass} has not been registered: $sessionInit")
|
||||
sendSessionReject("${sessionInit.initiatingFlowClass.name} has not been registered with a service flow")
|
||||
sendSessionReject("${sessionInit.initiatingFlowClass.name} has not been registered")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -18,40 +18,57 @@ import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.driver.driver
|
||||
import net.corda.node.services.startFlowPermission
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.smoketesting.NodeConfig
|
||||
import net.corda.smoketesting.NodeProcess
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Test
|
||||
import java.nio.file.Paths
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
class CordappScanningTest {
|
||||
private companion object {
|
||||
val user = User("user1", "test", permissions = setOf("ALL"))
|
||||
val port = AtomicInteger(15100)
|
||||
}
|
||||
|
||||
private val factory = NodeProcess.Factory()
|
||||
|
||||
private val aliceConfig = NodeConfig(
|
||||
party = ALICE,
|
||||
p2pPort = port.andIncrement,
|
||||
rpcPort = port.andIncrement,
|
||||
webPort = port.andIncrement,
|
||||
extraServices = emptyList(),
|
||||
users = listOf(user)
|
||||
)
|
||||
|
||||
@Test
|
||||
fun `CorDapp jar in plugins directory is scanned`() {
|
||||
// If the CorDapp jar does't exist then run the integrationTestClasses gradle task
|
||||
// If the CorDapp jar does't exist then run the smokeTestClasses gradle task
|
||||
val cordappJar = Paths.get(javaClass.getResource("/trader-demo.jar").toURI())
|
||||
driver {
|
||||
val pluginsDir = (baseDirectory(ALICE.name) / "plugins").createDirectories()
|
||||
cordappJar.copyToDirectory(pluginsDir)
|
||||
val pluginsDir = (factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
|
||||
cordappJar.copyToDirectory(pluginsDir)
|
||||
|
||||
val user = User("u", "p", emptySet())
|
||||
val alice = startNode(ALICE.name, rpcUsers = listOf(user)).getOrThrow()
|
||||
val rpc = alice.rpcClientToNode().start(user.username, user.password)
|
||||
// If the CorDapp wasn't scanned then SellerFlow won't have been picked up as an RPC flow
|
||||
assertThat(rpc.proxy.registeredFlows()).contains("net.corda.traderdemo.flow.SellerFlow")
|
||||
factory.create(aliceConfig).use {
|
||||
it.connect().use {
|
||||
// If the CorDapp wasn't scanned then SellerFlow won't have been picked up as an RPC flow
|
||||
assertThat(it.proxy.registeredFlows()).contains("net.corda.traderdemo.flow.SellerFlow")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `empty plugins directory`() {
|
||||
driver {
|
||||
val baseDirectory = baseDirectory(ALICE.name)
|
||||
(baseDirectory / "plugins").createDirectories()
|
||||
startNode(ALICE.name).getOrThrow()
|
||||
}
|
||||
(factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
|
||||
factory.create(aliceConfig).close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `sub-classed initiated flow pointing to the same initiating flow as its super-class`() {
|
||||
val user = User("u", "p", setOf(startFlowPermission<ReceiveFlow>()))
|
||||
driver(systemProperties = mapOf("net.corda.node.cordapp.scan.package" to "net.corda.node")) {
|
||||
// We don't use the factory for this test because we want the node to pick up the annotated flows below. The driver
|
||||
// will do just that.
|
||||
driver {
|
||||
val (alice, bob) = Futures.allAsList(
|
||||
startNode(ALICE.name, rpcUsers = listOf(user)),
|
||||
startNode(BOB.name)).getOrThrow()
|
@ -99,7 +99,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
smmHasRemovedAllFlows.countDown()
|
||||
}
|
||||
}
|
||||
mockSMM.start(listOf(services, scheduler))
|
||||
mockSMM.start()
|
||||
services.smm = mockSMM
|
||||
scheduler.start()
|
||||
}
|
||||
@ -124,7 +124,9 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
|
||||
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean = true
|
||||
|
||||
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? = ScheduledActivity(flowLogicRef, instant)
|
||||
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
|
||||
return ScheduledActivity(flowLogicRef, instant)
|
||||
}
|
||||
|
||||
override val contract: Contract
|
||||
get() = throw UnsupportedOperationException()
|
||||
|
@ -625,7 +625,7 @@ class FlowFrameworkTests {
|
||||
|
||||
@Test
|
||||
fun `unsupported new flow version`() {
|
||||
node2.registerFlowFactory(
|
||||
node2.internalRegisterFlowFactory(
|
||||
UpgradedFlow::class.java,
|
||||
InitiatedFlowFactory.CorDapp(version = 1, factory = ::DoubleInlinedSubFlow),
|
||||
DoubleInlinedSubFlow::class.java,
|
||||
@ -675,7 +675,7 @@ class FlowFrameworkTests {
|
||||
initiatingFlowClass: KClass<out FlowLogic<*>>,
|
||||
noinline flowFactory: (Party) -> P): ListenableFuture<P>
|
||||
{
|
||||
val observable = registerFlowFactory(initiatingFlowClass.java, object : InitiatedFlowFactory<P> {
|
||||
val observable = internalRegisterFlowFactory(initiatingFlowClass.java, object : InitiatedFlowFactory<P> {
|
||||
override fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): P {
|
||||
return flowFactory(otherParty)
|
||||
}
|
||||
|
@ -33,11 +33,7 @@ class IRSDemoTest : IntegrationTestCategory {
|
||||
|
||||
@Test
|
||||
fun `runs IRS demo`() {
|
||||
driver(
|
||||
useTestClock = true,
|
||||
isDebug = true,
|
||||
systemProperties = mapOf("net.corda.node.cordapp.scan.package" to "net.corda.irs"))
|
||||
{
|
||||
driver(useTestClock = true, isDebug = true) {
|
||||
val (controller, nodeA, nodeB) = Futures.allAsList(
|
||||
startNode(DUMMY_NOTARY.name, setOf(ServiceInfo(SimpleNotaryService.type), ServiceInfo(NodeInterestRates.Oracle.type))),
|
||||
startNode(DUMMY_BANK_A.name, rpcUsers = listOf(rpcUser)),
|
||||
|
@ -55,7 +55,8 @@ object NodeInterestRates {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val request = receive<RatesFixFlow.SignRequest>(otherParty).unwrap { it }
|
||||
send(otherParty, serviceHub.cordaService(Oracle::class.java).sign(request.ftx))
|
||||
val oracle = serviceHub.cordaService(Oracle::class.java)
|
||||
send(otherParty, oracle.sign(request.ftx))
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,7 +71,8 @@ object NodeInterestRates {
|
||||
override fun call(): Unit {
|
||||
val request = receive<RatesFixFlow.QueryRequest>(otherParty).unwrap { it }
|
||||
progressTracker.currentStep = RECEIVED
|
||||
val answers = serviceHub.cordaService(Oracle::class.java).query(request.queries, request.deadline)
|
||||
val oracle = serviceHub.cordaService(Oracle::class.java)
|
||||
val answers = oracle.query(request.queries, request.deadline)
|
||||
progressTracker.currentStep = SENDING
|
||||
send(otherParty, answers)
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ class SimmValuationTest : IntegrationTestCategory {
|
||||
|
||||
@Test
|
||||
fun `runs SIMM valuation demo`() {
|
||||
driver(isDebug = true, systemProperties = mapOf("net.corda.node.cordapp.scan.package" to "net.corda.vega")) {
|
||||
driver(isDebug = true) {
|
||||
startNode(DUMMY_NOTARY.name, setOf(ServiceInfo(SimpleNotaryService.type))).getOrThrow()
|
||||
val (nodeA, nodeB) = Futures.allAsList(startNode(nodeALegalName), startNode(nodeBLegalName)).getOrThrow()
|
||||
val (nodeAApi, nodeBApi) = Futures.allAsList(startWebserver(nodeA), startWebserver(nodeB))
|
||||
|
@ -20,6 +20,7 @@ include 'experimental:sandbox'
|
||||
include 'experimental:quasar-hook'
|
||||
include 'verifier'
|
||||
include 'test-utils'
|
||||
include 'smoke-test-utils'
|
||||
include 'tools:explorer'
|
||||
include 'tools:explorer:capsule'
|
||||
include 'tools:demobench'
|
||||
|
8
smoke-test-utils/build.gradle
Normal file
8
smoke-test-utils/build.gradle
Normal file
@ -0,0 +1,8 @@
|
||||
apply plugin: 'kotlin'
|
||||
|
||||
description 'Utilities needed for smoke tests in Corda'
|
||||
|
||||
dependencies {
|
||||
// Smoke tests do NOT have any Node code on the classpath!
|
||||
compile project(':client:rpc')
|
||||
}
|
@ -1,6 +1,10 @@
|
||||
package net.corda.kotlin.rpc
|
||||
package net.corda.smoketesting
|
||||
|
||||
import com.typesafe.config.*
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory.empty
|
||||
import com.typesafe.config.ConfigRenderOptions
|
||||
import com.typesafe.config.ConfigValue
|
||||
import com.typesafe.config.ConfigValueFactory
|
||||
import net.corda.core.crypto.commonName
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.nodeapi.User
|
||||
@ -18,13 +22,13 @@ class NodeConfig(
|
||||
val renderOptions: ConfigRenderOptions = ConfigRenderOptions.defaults().setOriginComments(false)
|
||||
}
|
||||
|
||||
val commonName: String = party.name.commonName
|
||||
val commonName: String get() = party.name.commonName
|
||||
|
||||
/*
|
||||
* The configuration object depends upon the networkMap,
|
||||
* which is mutable.
|
||||
*/
|
||||
fun toFileConfig(): Config = ConfigFactory.empty()
|
||||
fun toFileConfig(): Config = empty()
|
||||
.withValue("myLegalName", valueFor(party.name.toString()))
|
||||
.withValue("p2pAddress", addressValueFor(p2pPort))
|
||||
.withValue("extraAdvertisedServiceIds", valueFor(extraServices))
|
||||
@ -42,7 +46,6 @@ class NodeConfig(
|
||||
private fun <T> valueFor(any: T): ConfigValue? = ConfigValueFactory.fromAnyRef(any)
|
||||
private fun addressValueFor(port: Int) = valueFor("localhost:$port")
|
||||
private inline fun <T> optional(path: String, obj: T?, body: (Config, T) -> Config): Config {
|
||||
val config = ConfigFactory.empty()
|
||||
return if (obj == null) config else body(config, obj).atPath(path)
|
||||
return if (obj == null) empty() else body(empty(), obj).atPath(path)
|
||||
}
|
||||
}
|
@ -1,16 +1,18 @@
|
||||
package net.corda.kotlin.rpc
|
||||
package net.corda.smoketesting
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCConnection
|
||||
import net.corda.core.createDirectories
|
||||
import net.corda.core.div
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import java.io.File
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.time.Instant
|
||||
import java.time.ZoneId.systemDefault
|
||||
import java.time.format.DateTimeFormatter
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
import kotlin.test.*
|
||||
|
||||
class NodeProcess(
|
||||
val config: NodeConfig,
|
||||
@ -21,9 +23,7 @@ class NodeProcess(
|
||||
private companion object {
|
||||
val log = loggerFor<NodeProcess>()
|
||||
val javaPath: Path = Paths.get(System.getProperty("java.home"), "bin", "java")
|
||||
val corda = File(this::class.java.getResource("/corda.jar").toURI())
|
||||
val buildDir: Path = Paths.get(System.getProperty("build.dir"))
|
||||
val capsuleDir: Path = buildDir.resolve("capsule")
|
||||
val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(systemDefault())
|
||||
}
|
||||
|
||||
fun connect(): CordaRPCConnection {
|
||||
@ -40,16 +40,20 @@ class NodeProcess(
|
||||
}
|
||||
|
||||
log.info("Deleting Artemis directories, because they're large!")
|
||||
nodeDir.resolve("artemis").toFile().deleteRecursively()
|
||||
(nodeDir / "artemis").toFile().deleteRecursively()
|
||||
}
|
||||
|
||||
class Factory(val nodesDir: Path) {
|
||||
class Factory(val buildDirectory: Path = Paths.get("build"),
|
||||
val cordaJar: Path = Paths.get(this::class.java.getResource("/corda.jar").toURI())) {
|
||||
val nodesDirectory = buildDirectory / formatter.format(Instant.now())
|
||||
init {
|
||||
assertTrue(nodesDir.toFile().forceDirectory(), "Directory '$nodesDir' does not exist")
|
||||
nodesDirectory.createDirectories()
|
||||
}
|
||||
|
||||
fun baseDirectory(config: NodeConfig): Path = nodesDirectory / config.commonName
|
||||
|
||||
fun create(config: NodeConfig): NodeProcess {
|
||||
val nodeDir = Files.createTempDirectory(nodesDir, config.commonName)
|
||||
val nodeDir = baseDirectory(config).createDirectories()
|
||||
log.info("Node directory: {}", nodeDir)
|
||||
|
||||
val confFile = nodeDir.resolve("node.conf").toFile()
|
||||
@ -78,7 +82,7 @@ class NodeProcess(
|
||||
}, 5, 1, SECONDS)
|
||||
|
||||
val setupOK = setupExecutor.awaitTermination(120, SECONDS)
|
||||
assertTrue(setupOK && process.isAlive, "Failed to create RPC connection")
|
||||
check(setupOK && process.isAlive) { "Failed to create RPC connection" }
|
||||
} catch (e: Exception) {
|
||||
process.destroyForcibly()
|
||||
throw e
|
||||
@ -91,17 +95,14 @@ class NodeProcess(
|
||||
|
||||
private fun startNode(nodeDir: Path): Process {
|
||||
val builder = ProcessBuilder()
|
||||
.command(javaPath.toString(), "-jar", corda.path)
|
||||
.command(javaPath.toString(), "-jar", cordaJar.toString())
|
||||
.directory(nodeDir.toFile())
|
||||
|
||||
builder.environment().putAll(mapOf(
|
||||
"CAPSULE_CACHE_DIR" to capsuleDir.toString()
|
||||
"CAPSULE_CACHE_DIR" to (buildDirectory / "capsule").toString()
|
||||
))
|
||||
|
||||
return builder.start()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun File.forceDirectory(): Boolean = this.isDirectory || this.mkdirs()
|
||||
|
Loading…
x
Reference in New Issue
Block a user