Use non-static Rx pool when testing. (#2165)

This commit is contained in:
Andrzej Cichocki 2017-12-04 12:41:43 +00:00 committed by GitHub
parent 5264072752
commit d2f66acff7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 34 additions and 24 deletions

View File

@ -49,7 +49,7 @@ class NodeInfoWatcherTest {
fun start() {
val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT)
keyManagementService = MockKeyManagementService(identityService, ALICE_KEY)
nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler = scheduler)
nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler)
nodeInfoPath = tempFolder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY
}

View File

@ -68,6 +68,7 @@ import org.apache.activemq.artemis.utils.ReusableLatch
import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry
import org.slf4j.Logger
import rx.Observable
import rx.Scheduler
import java.io.IOException
import java.lang.reflect.InvocationTargetException
import java.security.KeyPair
@ -225,7 +226,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
val networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
NodeInfoWatcher(configuration.baseDirectory, Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
networkMapClient)
runOnStop += networkMapUpdater::close
@ -251,6 +252,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
/**
* Should be [rx.schedulers.Schedulers.io] for production,
* or [rx.internal.schedulers.CachedThreadScheduler] (with shutdown registered with [runOnStop]) for shared-JVM testing.
*/
protected abstract fun getRxIoScheduler(): Scheduler
open fun startShell(rpcOps: CordaRPCOps) {
InteractiveShell.startShell(configuration, rpcOps, userService, _services.identityService, _services.database)
}

View File

@ -33,6 +33,7 @@ import net.corda.nodeapi.internal.serialization.*
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.schedulers.Schedulers
import java.time.Clock
import java.util.concurrent.atomic.AtomicInteger
import javax.management.ObjectName
@ -46,7 +47,7 @@ import kotlin.system.exitProcess
*/
open class Node(configuration: NodeConfiguration,
versionInfo: VersionInfo,
val initialiseSerialization: Boolean = true,
private val initialiseSerialization: Boolean = true,
cordappLoader: CordappLoader = makeCordappLoader(configuration)
) : AbstractNode(configuration, createClock(configuration), versionInfo, cordappLoader) {
companion object {
@ -293,6 +294,7 @@ open class Node(configuration: NodeConfiguration,
return started
}
override fun getRxIoScheduler() = Schedulers.io()!!
private fun initialiseSerialization() {
val classloader = cordappLoader.appClassLoader
nodeSerializationEnv = SerializationEnvironmentImpl(

View File

@ -12,7 +12,6 @@ import net.corda.core.utilities.seconds
import net.corda.nodeapi.NodeInfoFilesCopier
import rx.Observable
import rx.Scheduler
import rx.schedulers.Schedulers
import java.io.IOException
import java.nio.file.Path
import java.time.Duration
@ -31,9 +30,8 @@ import kotlin.streams.toList
*/
// TODO: Use NIO watch service instead?
class NodeInfoWatcher(private val nodePath: Path,
private val pollInterval: Duration = 5.seconds,
private val scheduler: Scheduler = Schedulers.io()) {
private val scheduler: Scheduler,
private val pollInterval: Duration = 5.seconds) {
private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
private val processedNodeInfoFiles = mutableSetOf<Path>()
private val _processedNodeInfoHashes = mutableSetOf<SecureHash>()

View File

@ -51,7 +51,7 @@ class NetworkMapUpdaterTest {
val networkMapClient = mock<NetworkMapClient>()
val scheduler = TestScheduler()
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
// Publish node info for the first time.
@ -100,7 +100,7 @@ class NetworkMapUpdaterTest {
}
val scheduler = TestScheduler()
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
// Test adding new node.
@ -154,7 +154,7 @@ class NetworkMapUpdaterTest {
}
val scheduler = TestScheduler()
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
// Add all nodes.
@ -198,7 +198,7 @@ class NetworkMapUpdaterTest {
val networkMapCache = getMockNetworkMapCache()
val scheduler = TestScheduler()
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, null)
// Not subscribed yet.

View File

@ -24,7 +24,6 @@ import net.corda.core.utilities.*
import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
import net.corda.node.internal.StartedNode
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.config.*
import net.corda.node.utilities.ServiceIdentityGenerator
@ -33,6 +32,7 @@ import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.toConfig
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.testing.*
import net.corda.testing.internal.InProcessNode
import net.corda.testing.internal.ProcessUtilities
import net.corda.testing.node.ClusterSpec
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
@ -940,12 +940,7 @@ class DriverDSL(
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
// TODO pass the version in?
val node = Node(
nodeConf,
MOCK_VERSION_INFO,
initialiseSerialization = false,
cordappLoader = CordappLoader.createDefaultWithTestPackages(nodeConf, cordappPackages))
.start()
val node = InProcessNode(nodeConf, MOCK_VERSION_INFO, cordappPackages).start()
val nodeThread = thread(name = nodeConf.myLegalName.organisation) {
node.internals.run()
}

View File

@ -7,9 +7,11 @@ import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.getOrThrow
import net.corda.node.VersionInfo
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.config.*
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.configOf
import net.corda.node.services.config.parseAsNodeConfiguration
@ -18,11 +20,12 @@ import net.corda.nodeapi.internal.config.User
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.driver.addressMustNotBeBoundFuture
import net.corda.testing.getFreeLocalPorts
import net.corda.testing.node.MockServices
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import org.apache.logging.log4j.Level
import org.junit.After
import org.junit.Rule
import org.junit.rules.TemporaryFolder
import rx.internal.schedulers.CachedThreadScheduler
import java.nio.file.Path
import java.util.concurrent.Executors
import kotlin.concurrent.thread
@ -90,11 +93,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
)
val parsedConfig = config.parseAsNodeConfiguration()
val node = Node(
parsedConfig,
MockServices.MOCK_VERSION_INFO.copy(platformVersion = platformVersion),
initialiseSerialization = false,
cordappLoader = CordappLoader.createDefaultWithTestPackages(parsedConfig, cordappPackages)).start()
val node = InProcessNode(parsedConfig, MOCK_VERSION_INFO.copy(platformVersion = platformVersion), cordappPackages).start()
nodes += node
ensureAllNetworkMapCachesHaveAllNodeInfos()
thread(name = legalName.organisation) {
@ -117,3 +116,9 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
}
}
}
class InProcessNode(
configuration: NodeConfiguration, versionInfo: VersionInfo, cordappPackages: List<String>) : Node(
configuration, versionInfo, false, CordappLoader.createDefaultWithTestPackages(configuration, cordappPackages)) {
override fun getRxIoScheduler() = CachedThreadScheduler(testThreadFactory()).also { runOnStop += it::shutdown }
}

View File

@ -38,12 +38,14 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.setGlobalSerialization
import net.corda.testing.testNodeConfiguration
import org.apache.activemq.artemis.utils.ReusableLatch
import org.apache.sshd.common.util.security.SecurityUtils
import rx.internal.schedulers.CachedThreadScheduler
import java.math.BigInteger
import java.nio.file.Path
import java.security.KeyPair
@ -245,6 +247,7 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
return started
}
override fun getRxIoScheduler() = CachedThreadScheduler(testThreadFactory()).also { runOnStop += it::shutdown }
private fun advertiseNodeToNetwork(newNode: StartedNode<MockNode>) {
mockNet.nodes
.mapNotNull { it.started }