Fixes relating to testing flows and services.

Fixed issue where Corda services installed in unit tests were not being marked as serialise as singleton. Also the driver now automatically picks up the scanning annotations. This required moving the NodeFactory used in smoke tests into a separate module.
This commit is contained in:
Shams Asari
2017-06-02 15:47:20 +01:00
parent 08cbcac40c
commit afa3efb308
23 changed files with 220 additions and 130 deletions

View File

@ -36,7 +36,6 @@ import okhttp3.Request
import org.bouncycastle.asn1.x500.X500Name
import org.slf4j.Logger
import java.io.File
import java.io.File.pathSeparator
import java.net.*
import java.nio.file.Path
import java.nio.file.Paths
@ -668,13 +667,19 @@ class DriverDSL(
debugPort: Int?,
overriddenSystemProperties: Map<String, String>
): ListenableFuture<Process> {
return executorService.submit<Process> {
// Get the package of the caller of the driver and pass this to the node for CorDapp scanning
val callerPackage = Exception()
.stackTrace
.first { it.fileName != "Driver.kt" }
.let { Class.forName(it.className).`package`.name }
val processFuture = executorService.submit<Process> {
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
val systemProperties = overriddenSystemProperties + mapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "corda-${nodeConf.myLegalName}",
"net.corda.node.cordapp.scan.package" to callerPackage,
"java.io.tmpdir" to System.getProperty("java.io.tmpdir") // Inherit from parent process
)
// TODO Add this once we upgrade to quasar 0.7.8, this causes startup time to halve.
@ -685,8 +690,6 @@ class DriverDSL(
"-javaagent:$quasarJarPath"
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
val pluginsDirectory = nodeConf.baseDirectory / "plugins"
ProcessUtilities.startJavaProcess(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = listOf(
@ -694,14 +697,15 @@ class DriverDSL(
"--logging-level=$loggingLevel",
"--no-local-shell"
),
// Like the capsule, include the node's plugin directory
classpath = "${ProcessUtilities.defaultClassPath}$pathSeparator$pluginsDirectory/*",
jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments,
errorLogPath = nodeConf.baseDirectory / LOGS_DIRECTORY_NAME / "error.log",
workingDirectory = nodeConf.baseDirectory
)
}.flatMap { process -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress, process).map { process } }
}
return processFuture.flatMap {
process -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress, process).map { process }
}
}
private fun startWebserver(

View File

@ -222,6 +222,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
serverThread,
database,
busyNodeLatch)
smm.tokenizableServices.addAll(tokenizableServices)
if (serverThread is ExecutorService) {
runOnStop += Runnable {
// We wait here, even though any in-flight messages should have been drained away because the
@ -240,10 +243,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
startMessagingService(rpcOps)
installCoreFlows()
val scanResult = scanCorDapps()
val scanResult = scanCordapps()
if (scanResult != null) {
val cordappServices = installCordaServices(scanResult)
tokenizableServices.addAll(cordappServices)
installCordaServices(scanResult)
registerInitiatedFlows(scanResult)
rpcFlows = findRPCFlows(scanResult)
} else {
@ -257,7 +259,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
runOnStop += Runnable { network.stop() }
_networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
smm.start(tokenizableServices)
smm.start()
// Shut down the SMM so no Fibers are scheduled.
runOnStop += Runnable { smm.stop(acceptableLiveFiberCountOnStop()) }
scheduler.start()
@ -266,34 +268,37 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
return this
}
private fun installCordaServices(scanResult: ScanResult): List<SerializeAsToken> {
return scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class).mapNotNull {
tryInstallCordaService(it)
private fun installCordaServices(scanResult: ScanResult) {
fun getServiceType(clazz: Class<*>): ServiceType? {
return try {
clazz.getField("type").get(null) as ServiceType
} catch (e: NoSuchFieldException) {
log.warn("${clazz.name} does not have a type field, optimistically proceeding with install.")
null
}
}
}
private fun <T : SerializeAsToken> tryInstallCordaService(serviceClass: Class<T>): T? {
/** TODO: This mechanism may get replaced by a different one, see comments on [CordaService]. */
val typeField = try {
serviceClass.getField("type")
} catch (e: NoSuchFieldException) {
null
}
if (typeField == null) {
log.warn("${serviceClass.name} does not have a type field, optimistically proceeding with install.")
} else if (info.serviceIdentities(typeField.get(null) as ServiceType).isEmpty()) {
return null
}
return try {
installCordaService(serviceClass)
} catch (e: NoSuchMethodException) {
log.error("${serviceClass.name}, as a Corda service, must have a constructor with a single parameter " +
"of type ${PluginServiceHub::class.java.name}")
null
} catch (e: Exception) {
log.error("Unable to install Corda service ${serviceClass.name}", e)
null
}
return scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class)
.filter {
val serviceType = getServiceType(it)
if (serviceType != null && info.serviceIdentities(serviceType).isEmpty()) {
log.debug { "Ignoring ${it.name} as a Corda service since $serviceType is not one of our " +
"advertised services" }
false
} else {
true
}
}
.forEach {
try {
installCordaService(it)
} catch (e: NoSuchMethodException) {
log.error("${it.name}, as a Corda service, must have a constructor with a single parameter " +
"of type ${PluginServiceHub::class.java.name}")
} catch (e: Exception) {
log.error("Unable to install Corda service ${it.name}", e)
}
}
}
/**
@ -305,6 +310,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val ctor = clazz.getDeclaredConstructor(PluginServiceHub::class.java).apply { isAccessible = true }
val service = ctor.newInstance(services)
cordappServices.putInstance(clazz, service)
smm.tokenizableServices += service
log.info("Installed ${clazz.name} Corda service")
return service
}
@ -371,16 +377,16 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
"${InitiatingFlow::class.java.name} must be annotated on ${initiatingFlow.name} and not on a super-type"
}
val flowFactory = InitiatedFlowFactory.CorDapp(version, { ctor.newInstance(it) })
val observable = registerFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track)
val observable = internalRegisterFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track)
log.info("Registered ${initiatingFlow.name} to initiate ${initiatedFlow.name} (version $version)")
return observable
}
@VisibleForTesting
fun <F : FlowLogic<*>> registerFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: InitiatedFlowFactory<F>,
initiatedFlowClass: Class<F>,
track: Boolean): Observable<F> {
fun <F : FlowLogic<*>> internalRegisterFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: InitiatedFlowFactory<F>,
initiatedFlowClass: Class<F>,
track: Boolean): Observable<F> {
val observable = if (track) {
smm.changes.filter { it is StateMachineManager.Change.Add }.map { it.logic }.ofType(initiatedFlowClass)
} else {
@ -453,14 +459,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val tokenizableServices = mutableListOf(storage, network, vault, keyManagement, identity, platformClock, scheduler)
makeAdvertisedServices(tokenizableServices)
return tokenizableServices
}
private fun scanCorDapps(): ScanResult? {
private fun scanCordapps(): ScanResult? {
val scanPackage = System.getProperty("net.corda.node.cordapp.scan.package")
val paths = if (scanPackage != null) {
// This is purely for integration tests so that classes defined in the test can automatically be picked up
// Rather than looking in the plugins directory, figure out the classpath for the given package and scan that
// instead. This is used in tests where we avoid having to package stuff up in jars and then having to move
// them to the plugins directory for each node.
check(configuration.devMode) { "Package scanning can only occur in dev mode" }
val resource = scanPackage.replace('.', '/')
javaClass.classLoader.getResources(resource)
@ -545,7 +552,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private fun hasSSLCertificates(): Boolean {
val (sslKeystore, keystore) = try {
// This will throw IOException if key file not found or KeyStoreException if keystore password is incorrect.
Pair(KeyStoreUtilities.loadKeyStore(configuration.sslKeystore, configuration.keyStorePassword), KeyStoreUtilities.loadKeyStore(configuration.nodeKeystore, configuration.keyStorePassword))
Pair(
KeyStoreUtilities.loadKeyStore(configuration.sslKeystore, configuration.keyStorePassword),
KeyStoreUtilities.loadKeyStore(configuration.nodeKeystore, configuration.keyStorePassword))
} catch (e: IOException) {
return false
} catch (e: KeyStoreException) {

View File

@ -36,6 +36,7 @@ import rx.subjects.PublishSubject
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
/**
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects.
@ -145,8 +146,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private val openSessions = ConcurrentHashMap<Long, FlowSession>()
private val recentlyClosedSessions = ConcurrentHashMap<Long, Party>()
internal val tokenizableServices = ArrayList<Any>()
// Context for tokenized services in checkpoints
private lateinit var serializationContext: SerializeAsTokenContext
private val serializationContext by lazy {
SerializeAsTokenContext(tokenizableServices, quasarKryoPool, serviceHub)
}
/** Returns a list of all state machines executing the given flow logic at the top level (subflows do not count) */
fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
@ -170,8 +174,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
*/
val changes: Observable<Change> = mutex.content.changesPublisher.wrapWithDatabaseTransaction()
fun start(tokenizableServices: List<Any>) {
serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryoPool, serviceHub)
fun start() {
restoreFibersFromCheckpoints()
listenToLedgerTransactions()
serviceHub.networkMapCache.mapServiceRegistered.then(executor) { resumeRestoredFibers() }
@ -348,7 +351,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val initiatedFlowFactory = serviceHub.getFlowFactory(sessionInit.initiatingFlowClass)
if (initiatedFlowFactory == null) {
logger.warn("${sessionInit.initiatingFlowClass} has not been registered: $sessionInit")
sendSessionReject("${sessionInit.initiatingFlowClass.name} has not been registered with a service flow")
sendSessionReject("${sessionInit.initiatingFlowClass.name} has not been registered")
return
}

View File

@ -18,40 +18,57 @@ import net.corda.core.utilities.unwrap
import net.corda.node.driver.driver
import net.corda.node.services.startFlowPermission
import net.corda.nodeapi.User
import net.corda.smoketesting.NodeConfig
import net.corda.smoketesting.NodeProcess
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger
class CordappScanningTest {
private companion object {
val user = User("user1", "test", permissions = setOf("ALL"))
val port = AtomicInteger(15100)
}
private val factory = NodeProcess.Factory()
private val aliceConfig = NodeConfig(
party = ALICE,
p2pPort = port.andIncrement,
rpcPort = port.andIncrement,
webPort = port.andIncrement,
extraServices = emptyList(),
users = listOf(user)
)
@Test
fun `CorDapp jar in plugins directory is scanned`() {
// If the CorDapp jar does't exist then run the integrationTestClasses gradle task
// If the CorDapp jar does't exist then run the smokeTestClasses gradle task
val cordappJar = Paths.get(javaClass.getResource("/trader-demo.jar").toURI())
driver {
val pluginsDir = (baseDirectory(ALICE.name) / "plugins").createDirectories()
cordappJar.copyToDirectory(pluginsDir)
val pluginsDir = (factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
cordappJar.copyToDirectory(pluginsDir)
val user = User("u", "p", emptySet())
val alice = startNode(ALICE.name, rpcUsers = listOf(user)).getOrThrow()
val rpc = alice.rpcClientToNode().start(user.username, user.password)
// If the CorDapp wasn't scanned then SellerFlow won't have been picked up as an RPC flow
assertThat(rpc.proxy.registeredFlows()).contains("net.corda.traderdemo.flow.SellerFlow")
factory.create(aliceConfig).use {
it.connect().use {
// If the CorDapp wasn't scanned then SellerFlow won't have been picked up as an RPC flow
assertThat(it.proxy.registeredFlows()).contains("net.corda.traderdemo.flow.SellerFlow")
}
}
}
@Test
fun `empty plugins directory`() {
driver {
val baseDirectory = baseDirectory(ALICE.name)
(baseDirectory / "plugins").createDirectories()
startNode(ALICE.name).getOrThrow()
}
(factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
factory.create(aliceConfig).close()
}
@Test
fun `sub-classed initiated flow pointing to the same initiating flow as its super-class`() {
val user = User("u", "p", setOf(startFlowPermission<ReceiveFlow>()))
driver(systemProperties = mapOf("net.corda.node.cordapp.scan.package" to "net.corda.node")) {
// We don't use the factory for this test because we want the node to pick up the annotated flows below. The driver
// will do just that.
driver {
val (alice, bob) = Futures.allAsList(
startNode(ALICE.name, rpcUsers = listOf(user)),
startNode(BOB.name)).getOrThrow()

View File

@ -99,7 +99,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
smmHasRemovedAllFlows.countDown()
}
}
mockSMM.start(listOf(services, scheduler))
mockSMM.start()
services.smm = mockSMM
scheduler.start()
}
@ -124,7 +124,9 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean = true
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? = ScheduledActivity(flowLogicRef, instant)
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
return ScheduledActivity(flowLogicRef, instant)
}
override val contract: Contract
get() = throw UnsupportedOperationException()

View File

@ -625,7 +625,7 @@ class FlowFrameworkTests {
@Test
fun `unsupported new flow version`() {
node2.registerFlowFactory(
node2.internalRegisterFlowFactory(
UpgradedFlow::class.java,
InitiatedFlowFactory.CorDapp(version = 1, factory = ::DoubleInlinedSubFlow),
DoubleInlinedSubFlow::class.java,
@ -675,7 +675,7 @@ class FlowFrameworkTests {
initiatingFlowClass: KClass<out FlowLogic<*>>,
noinline flowFactory: (Party) -> P): ListenableFuture<P>
{
val observable = registerFlowFactory(initiatingFlowClass.java, object : InitiatedFlowFactory<P> {
val observable = internalRegisterFlowFactory(initiatingFlowClass.java, object : InitiatedFlowFactory<P> {
override fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): P {
return flowFactory(otherParty)
}