diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt index 1a0be3d57f..224a545e0c 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt @@ -49,7 +49,7 @@ class NodeInfoWatcherTest { fun start() { val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT) keyManagementService = MockKeyManagementService(identityService, ALICE_KEY) - nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler = scheduler) + nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler) nodeInfoPath = tempFolder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 453afefea0..5a2e6785ad 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -68,6 +68,7 @@ import org.apache.activemq.artemis.utils.ReusableLatch import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry import org.slf4j.Logger import rx.Observable +import rx.Scheduler import java.io.IOException import java.lang.reflect.InvocationTargetException import java.security.KeyPair @@ -225,7 +226,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } val networkMapUpdater = NetworkMapUpdater(services.networkMapCache, - NodeInfoWatcher(configuration.baseDirectory, Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)), + NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)), networkMapClient) runOnStop += networkMapUpdater::close @@ -251,6 +252,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } + /** + * Should be [rx.schedulers.Schedulers.io] for production, + * or [rx.internal.schedulers.CachedThreadScheduler] (with shutdown registered with [runOnStop]) for shared-JVM testing. + */ + protected abstract fun getRxIoScheduler(): Scheduler + open fun startShell(rpcOps: CordaRPCOps) { InteractiveShell.startShell(configuration, rpcOps, userService, _services.identityService, _services.database) } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 71f3ae581a..2ddba4a393 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -33,6 +33,7 @@ import net.corda.nodeapi.internal.serialization.* import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme import org.slf4j.Logger import org.slf4j.LoggerFactory +import rx.schedulers.Schedulers import java.time.Clock import java.util.concurrent.atomic.AtomicInteger import javax.management.ObjectName @@ -46,7 +47,7 @@ import kotlin.system.exitProcess */ open class Node(configuration: NodeConfiguration, versionInfo: VersionInfo, - val initialiseSerialization: Boolean = true, + private val initialiseSerialization: Boolean = true, cordappLoader: CordappLoader = makeCordappLoader(configuration) ) : AbstractNode(configuration, createClock(configuration), versionInfo, cordappLoader) { companion object { @@ -293,6 +294,7 @@ open class Node(configuration: NodeConfiguration, return started } + override fun getRxIoScheduler() = Schedulers.io()!! private fun initialiseSerialization() { val classloader = cordappLoader.appClassLoader nodeSerializationEnv = SerializationEnvironmentImpl( diff --git a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt index 76fcf6c472..df583c4ffe 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt @@ -12,7 +12,6 @@ import net.corda.core.utilities.seconds import net.corda.nodeapi.NodeInfoFilesCopier import rx.Observable import rx.Scheduler -import rx.schedulers.Schedulers import java.io.IOException import java.nio.file.Path import java.time.Duration @@ -31,9 +30,8 @@ import kotlin.streams.toList */ // TODO: Use NIO watch service instead? class NodeInfoWatcher(private val nodePath: Path, - private val pollInterval: Duration = 5.seconds, - private val scheduler: Scheduler = Schedulers.io()) { - + private val scheduler: Scheduler, + private val pollInterval: Duration = 5.seconds) { private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY private val processedNodeInfoFiles = mutableSetOf() private val _processedNodeInfoHashes = mutableSetOf() diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt index 3390d32968..f4c4e4ee41 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -51,7 +51,7 @@ class NetworkMapUpdaterTest { val networkMapClient = mock() val scheduler = TestScheduler() - val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler) + val fileWatcher = NodeInfoWatcher(baseDir, scheduler) val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient) // Publish node info for the first time. @@ -100,7 +100,7 @@ class NetworkMapUpdaterTest { } val scheduler = TestScheduler() - val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler) + val fileWatcher = NodeInfoWatcher(baseDir, scheduler) val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient) // Test adding new node. @@ -154,7 +154,7 @@ class NetworkMapUpdaterTest { } val scheduler = TestScheduler() - val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler) + val fileWatcher = NodeInfoWatcher(baseDir, scheduler) val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient) // Add all nodes. @@ -198,7 +198,7 @@ class NetworkMapUpdaterTest { val networkMapCache = getMockNetworkMapCache() val scheduler = TestScheduler() - val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler) + val fileWatcher = NodeInfoWatcher(baseDir, scheduler) val updater = NetworkMapUpdater(networkMapCache, fileWatcher, null) // Not subscribed yet. diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt index 7df5649657..e46742f885 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -24,7 +24,6 @@ import net.corda.core.utilities.* import net.corda.node.internal.Node import net.corda.node.internal.NodeStartup import net.corda.node.internal.StartedNode -import net.corda.node.internal.cordapp.CordappLoader import net.corda.node.services.Permissions.Companion.invokeRpc import net.corda.node.services.config.* import net.corda.node.utilities.ServiceIdentityGenerator @@ -33,6 +32,7 @@ import net.corda.nodeapi.internal.config.User import net.corda.nodeapi.internal.config.toConfig import net.corda.nodeapi.internal.addShutdownHook import net.corda.testing.* +import net.corda.testing.internal.InProcessNode import net.corda.testing.internal.ProcessUtilities import net.corda.testing.node.ClusterSpec import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO @@ -940,12 +940,7 @@ class DriverDSL( // Write node.conf writeConfig(nodeConf.baseDirectory, "node.conf", config) // TODO pass the version in? - val node = Node( - nodeConf, - MOCK_VERSION_INFO, - initialiseSerialization = false, - cordappLoader = CordappLoader.createDefaultWithTestPackages(nodeConf, cordappPackages)) - .start() + val node = InProcessNode(nodeConf, MOCK_VERSION_INFO, cordappPackages).start() val nodeThread = thread(name = nodeConf.myLegalName.organisation) { node.internals.run() } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt index ac5b735dcc..061974adf6 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt @@ -7,9 +7,11 @@ import net.corda.core.internal.createDirectories import net.corda.core.internal.div import net.corda.core.node.NodeInfo import net.corda.core.utilities.getOrThrow +import net.corda.node.VersionInfo import net.corda.node.internal.Node import net.corda.node.internal.StartedNode import net.corda.node.internal.cordapp.CordappLoader +import net.corda.node.services.config.* import net.corda.node.services.config.ConfigHelper import net.corda.node.services.config.configOf import net.corda.node.services.config.parseAsNodeConfiguration @@ -18,11 +20,12 @@ import net.corda.nodeapi.internal.config.User import net.corda.testing.SerializationEnvironmentRule import net.corda.testing.driver.addressMustNotBeBoundFuture import net.corda.testing.getFreeLocalPorts -import net.corda.testing.node.MockServices +import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO import org.apache.logging.log4j.Level import org.junit.After import org.junit.Rule import org.junit.rules.TemporaryFolder +import rx.internal.schedulers.CachedThreadScheduler import java.nio.file.Path import java.util.concurrent.Executors import kotlin.concurrent.thread @@ -90,11 +93,7 @@ abstract class NodeBasedTest(private val cordappPackages: List = emptyLi ) val parsedConfig = config.parseAsNodeConfiguration() - val node = Node( - parsedConfig, - MockServices.MOCK_VERSION_INFO.copy(platformVersion = platformVersion), - initialiseSerialization = false, - cordappLoader = CordappLoader.createDefaultWithTestPackages(parsedConfig, cordappPackages)).start() + val node = InProcessNode(parsedConfig, MOCK_VERSION_INFO.copy(platformVersion = platformVersion), cordappPackages).start() nodes += node ensureAllNetworkMapCachesHaveAllNodeInfos() thread(name = legalName.organisation) { @@ -117,3 +116,9 @@ abstract class NodeBasedTest(private val cordappPackages: List = emptyLi } } } + +class InProcessNode( + configuration: NodeConfiguration, versionInfo: VersionInfo, cordappPackages: List) : Node( + configuration, versionInfo, false, CordappLoader.createDefaultWithTestPackages(configuration, cordappPackages)) { + override fun getRxIoScheduler() = CachedThreadScheduler(testThreadFactory()).also { runOnStop += it::shutdown } +} diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt index d2452cab8a..2b9a966e3a 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -38,12 +38,14 @@ import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.testing.DUMMY_NOTARY +import net.corda.testing.internal.testThreadFactory import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.setGlobalSerialization import net.corda.testing.testNodeConfiguration import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.sshd.common.util.security.SecurityUtils +import rx.internal.schedulers.CachedThreadScheduler import java.math.BigInteger import java.nio.file.Path import java.security.KeyPair @@ -245,6 +247,7 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete return started } + override fun getRxIoScheduler() = CachedThreadScheduler(testThreadFactory()).also { runOnStop += it::shutdown } private fun advertiseNodeToNetwork(newNode: StartedNode) { mockNet.nodes .mapNotNull { it.started }