Merge pull request #789 from corda/shams-flow-testing-fixes

Fixed issue where Corda services installed in unit tests were not be…
This commit is contained in:
Shams Asari
2017-06-06 15:46:26 +01:00
committed by GitHub
23 changed files with 220 additions and 130 deletions

View File

@ -36,7 +36,6 @@ import okhttp3.Request
import org.bouncycastle.asn1.x500.X500Name
import org.slf4j.Logger
import java.io.File
import java.io.File.pathSeparator
import java.net.*
import java.nio.file.Path
import java.nio.file.Paths
@ -668,13 +667,19 @@ class DriverDSL(
debugPort: Int?,
overriddenSystemProperties: Map<String, String>
): ListenableFuture<Process> {
return executorService.submit<Process> {
// Get the package of the caller of the driver and pass this to the node for CorDapp scanning
val callerPackage = Exception()
.stackTrace
.first { it.fileName != "Driver.kt" }
.let { Class.forName(it.className).`package`.name }
val processFuture = executorService.submit<Process> {
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
val systemProperties = overriddenSystemProperties + mapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "corda-${nodeConf.myLegalName}",
"net.corda.node.cordapp.scan.package" to callerPackage,
"java.io.tmpdir" to System.getProperty("java.io.tmpdir") // Inherit from parent process
)
// TODO Add this once we upgrade to quasar 0.7.8, this causes startup time to halve.
@ -685,8 +690,6 @@ class DriverDSL(
"-javaagent:$quasarJarPath"
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
val pluginsDirectory = nodeConf.baseDirectory / "plugins"
ProcessUtilities.startJavaProcess(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = listOf(
@ -694,14 +697,15 @@ class DriverDSL(
"--logging-level=$loggingLevel",
"--no-local-shell"
),
// Like the capsule, include the node's plugin directory
classpath = "${ProcessUtilities.defaultClassPath}$pathSeparator$pluginsDirectory/*",
jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments,
errorLogPath = nodeConf.baseDirectory / LOGS_DIRECTORY_NAME / "error.log",
workingDirectory = nodeConf.baseDirectory
)
}.flatMap { process -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress, process).map { process } }
}
return processFuture.flatMap {
process -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress, process).map { process }
}
}
private fun startWebserver(

View File

@ -222,6 +222,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
serverThread,
database,
busyNodeLatch)
smm.tokenizableServices.addAll(tokenizableServices)
if (serverThread is ExecutorService) {
runOnStop += Runnable {
// We wait here, even though any in-flight messages should have been drained away because the
@ -240,10 +243,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
startMessagingService(rpcOps)
installCoreFlows()
val scanResult = scanCorDapps()
val scanResult = scanCordapps()
if (scanResult != null) {
val cordappServices = installCordaServices(scanResult)
tokenizableServices.addAll(cordappServices)
installCordaServices(scanResult)
registerInitiatedFlows(scanResult)
rpcFlows = findRPCFlows(scanResult)
} else {
@ -257,7 +259,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
runOnStop += Runnable { network.stop() }
_networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
smm.start(tokenizableServices)
smm.start()
// Shut down the SMM so no Fibers are scheduled.
runOnStop += Runnable { smm.stop(acceptableLiveFiberCountOnStop()) }
scheduler.start()
@ -266,34 +268,37 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
return this
}
private fun installCordaServices(scanResult: ScanResult): List<SerializeAsToken> {
return scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class).mapNotNull {
tryInstallCordaService(it)
private fun installCordaServices(scanResult: ScanResult) {
fun getServiceType(clazz: Class<*>): ServiceType? {
return try {
clazz.getField("type").get(null) as ServiceType
} catch (e: NoSuchFieldException) {
log.warn("${clazz.name} does not have a type field, optimistically proceeding with install.")
null
}
}
}
private fun <T : SerializeAsToken> tryInstallCordaService(serviceClass: Class<T>): T? {
/** TODO: This mechanism may get replaced by a different one, see comments on [CordaService]. */
val typeField = try {
serviceClass.getField("type")
} catch (e: NoSuchFieldException) {
null
}
if (typeField == null) {
log.warn("${serviceClass.name} does not have a type field, optimistically proceeding with install.")
} else if (info.serviceIdentities(typeField.get(null) as ServiceType).isEmpty()) {
return null
}
return try {
installCordaService(serviceClass)
} catch (e: NoSuchMethodException) {
log.error("${serviceClass.name}, as a Corda service, must have a constructor with a single parameter " +
"of type ${PluginServiceHub::class.java.name}")
null
} catch (e: Exception) {
log.error("Unable to install Corda service ${serviceClass.name}", e)
null
}
return scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class)
.filter {
val serviceType = getServiceType(it)
if (serviceType != null && info.serviceIdentities(serviceType).isEmpty()) {
log.debug { "Ignoring ${it.name} as a Corda service since $serviceType is not one of our " +
"advertised services" }
false
} else {
true
}
}
.forEach {
try {
installCordaService(it)
} catch (e: NoSuchMethodException) {
log.error("${it.name}, as a Corda service, must have a constructor with a single parameter " +
"of type ${PluginServiceHub::class.java.name}")
} catch (e: Exception) {
log.error("Unable to install Corda service ${it.name}", e)
}
}
}
/**
@ -305,6 +310,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val ctor = clazz.getDeclaredConstructor(PluginServiceHub::class.java).apply { isAccessible = true }
val service = ctor.newInstance(services)
cordappServices.putInstance(clazz, service)
smm.tokenizableServices += service
log.info("Installed ${clazz.name} Corda service")
return service
}
@ -371,16 +377,16 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
"${InitiatingFlow::class.java.name} must be annotated on ${initiatingFlow.name} and not on a super-type"
}
val flowFactory = InitiatedFlowFactory.CorDapp(version, { ctor.newInstance(it) })
val observable = registerFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track)
val observable = internalRegisterFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track)
log.info("Registered ${initiatingFlow.name} to initiate ${initiatedFlow.name} (version $version)")
return observable
}
@VisibleForTesting
fun <F : FlowLogic<*>> registerFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: InitiatedFlowFactory<F>,
initiatedFlowClass: Class<F>,
track: Boolean): Observable<F> {
fun <F : FlowLogic<*>> internalRegisterFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: InitiatedFlowFactory<F>,
initiatedFlowClass: Class<F>,
track: Boolean): Observable<F> {
val observable = if (track) {
smm.changes.filter { it is StateMachineManager.Change.Add }.map { it.logic }.ofType(initiatedFlowClass)
} else {
@ -453,14 +459,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val tokenizableServices = mutableListOf(storage, network, vault, keyManagement, identity, platformClock, scheduler)
makeAdvertisedServices(tokenizableServices)
return tokenizableServices
}
private fun scanCorDapps(): ScanResult? {
private fun scanCordapps(): ScanResult? {
val scanPackage = System.getProperty("net.corda.node.cordapp.scan.package")
val paths = if (scanPackage != null) {
// This is purely for integration tests so that classes defined in the test can automatically be picked up
// Rather than looking in the plugins directory, figure out the classpath for the given package and scan that
// instead. This is used in tests where we avoid having to package stuff up in jars and then having to move
// them to the plugins directory for each node.
check(configuration.devMode) { "Package scanning can only occur in dev mode" }
val resource = scanPackage.replace('.', '/')
javaClass.classLoader.getResources(resource)
@ -545,7 +552,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private fun hasSSLCertificates(): Boolean {
val (sslKeystore, keystore) = try {
// This will throw IOException if key file not found or KeyStoreException if keystore password is incorrect.
Pair(KeyStoreUtilities.loadKeyStore(configuration.sslKeystore, configuration.keyStorePassword), KeyStoreUtilities.loadKeyStore(configuration.nodeKeystore, configuration.keyStorePassword))
Pair(
KeyStoreUtilities.loadKeyStore(configuration.sslKeystore, configuration.keyStorePassword),
KeyStoreUtilities.loadKeyStore(configuration.nodeKeystore, configuration.keyStorePassword))
} catch (e: IOException) {
return false
} catch (e: KeyStoreException) {

View File

@ -36,6 +36,7 @@ import rx.subjects.PublishSubject
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
/**
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects.
@ -145,8 +146,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private val openSessions = ConcurrentHashMap<Long, FlowSession>()
private val recentlyClosedSessions = ConcurrentHashMap<Long, Party>()
internal val tokenizableServices = ArrayList<Any>()
// Context for tokenized services in checkpoints
private lateinit var serializationContext: SerializeAsTokenContext
private val serializationContext by lazy {
SerializeAsTokenContext(tokenizableServices, quasarKryoPool, serviceHub)
}
/** Returns a list of all state machines executing the given flow logic at the top level (subflows do not count) */
fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
@ -170,8 +174,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
*/
val changes: Observable<Change> = mutex.content.changesPublisher.wrapWithDatabaseTransaction()
fun start(tokenizableServices: List<Any>) {
serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryoPool, serviceHub)
fun start() {
restoreFibersFromCheckpoints()
listenToLedgerTransactions()
serviceHub.networkMapCache.mapServiceRegistered.then(executor) { resumeRestoredFibers() }
@ -348,7 +351,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val initiatedFlowFactory = serviceHub.getFlowFactory(sessionInit.initiatingFlowClass)
if (initiatedFlowFactory == null) {
logger.warn("${sessionInit.initiatingFlowClass} has not been registered: $sessionInit")
sendSessionReject("${sessionInit.initiatingFlowClass.name} has not been registered with a service flow")
sendSessionReject("${sessionInit.initiatingFlowClass.name} has not been registered")
return
}

View File

@ -18,40 +18,57 @@ import net.corda.core.utilities.unwrap
import net.corda.node.driver.driver
import net.corda.node.services.startFlowPermission
import net.corda.nodeapi.User
import net.corda.smoketesting.NodeConfig
import net.corda.smoketesting.NodeProcess
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger
class CordappScanningTest {
private companion object {
val user = User("user1", "test", permissions = setOf("ALL"))
val port = AtomicInteger(15100)
}
private val factory = NodeProcess.Factory()
private val aliceConfig = NodeConfig(
party = ALICE,
p2pPort = port.andIncrement,
rpcPort = port.andIncrement,
webPort = port.andIncrement,
extraServices = emptyList(),
users = listOf(user)
)
@Test
fun `CorDapp jar in plugins directory is scanned`() {
// If the CorDapp jar does't exist then run the integrationTestClasses gradle task
// If the CorDapp jar does't exist then run the smokeTestClasses gradle task
val cordappJar = Paths.get(javaClass.getResource("/trader-demo.jar").toURI())
driver {
val pluginsDir = (baseDirectory(ALICE.name) / "plugins").createDirectories()
cordappJar.copyToDirectory(pluginsDir)
val pluginsDir = (factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
cordappJar.copyToDirectory(pluginsDir)
val user = User("u", "p", emptySet())
val alice = startNode(ALICE.name, rpcUsers = listOf(user)).getOrThrow()
val rpc = alice.rpcClientToNode().start(user.username, user.password)
// If the CorDapp wasn't scanned then SellerFlow won't have been picked up as an RPC flow
assertThat(rpc.proxy.registeredFlows()).contains("net.corda.traderdemo.flow.SellerFlow")
factory.create(aliceConfig).use {
it.connect().use {
// If the CorDapp wasn't scanned then SellerFlow won't have been picked up as an RPC flow
assertThat(it.proxy.registeredFlows()).contains("net.corda.traderdemo.flow.SellerFlow")
}
}
}
@Test
fun `empty plugins directory`() {
driver {
val baseDirectory = baseDirectory(ALICE.name)
(baseDirectory / "plugins").createDirectories()
startNode(ALICE.name).getOrThrow()
}
(factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
factory.create(aliceConfig).close()
}
@Test
fun `sub-classed initiated flow pointing to the same initiating flow as its super-class`() {
val user = User("u", "p", setOf(startFlowPermission<ReceiveFlow>()))
driver(systemProperties = mapOf("net.corda.node.cordapp.scan.package" to "net.corda.node")) {
// We don't use the factory for this test because we want the node to pick up the annotated flows below. The driver
// will do just that.
driver {
val (alice, bob) = Futures.allAsList(
startNode(ALICE.name, rpcUsers = listOf(user)),
startNode(BOB.name)).getOrThrow()

View File

@ -99,7 +99,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
smmHasRemovedAllFlows.countDown()
}
}
mockSMM.start(listOf(services, scheduler))
mockSMM.start()
services.smm = mockSMM
scheduler.start()
}
@ -124,7 +124,9 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean = true
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? = ScheduledActivity(flowLogicRef, instant)
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
return ScheduledActivity(flowLogicRef, instant)
}
override val contract: Contract
get() = throw UnsupportedOperationException()

View File

@ -625,7 +625,7 @@ class FlowFrameworkTests {
@Test
fun `unsupported new flow version`() {
node2.registerFlowFactory(
node2.internalRegisterFlowFactory(
UpgradedFlow::class.java,
InitiatedFlowFactory.CorDapp(version = 1, factory = ::DoubleInlinedSubFlow),
DoubleInlinedSubFlow::class.java,
@ -675,7 +675,7 @@ class FlowFrameworkTests {
initiatingFlowClass: KClass<out FlowLogic<*>>,
noinline flowFactory: (Party) -> P): ListenableFuture<P>
{
val observable = registerFlowFactory(initiatingFlowClass.java, object : InitiatedFlowFactory<P> {
val observable = internalRegisterFlowFactory(initiatingFlowClass.java, object : InitiatedFlowFactory<P> {
override fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): P {
return flowFactory(otherParty)
}