Merge branch 'master' into andrius-merge-02-26

This commit is contained in:
Andrius Dagys 2018-02-26 18:41:20 +00:00
commit 9814ea6f34
15 changed files with 399 additions and 54 deletions

View File

@ -1,26 +0,0 @@
package net.corda.test.node
import net.corda.core.utilities.getOrThrow
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.internal.toDatabaseSchemaName
import net.corda.testing.node.internal.NodeBasedTest
import org.junit.ClassRule
import org.junit.Test
class NodeStartAndStopTest : NodeBasedTest() {
companion object {
@ClassRule @JvmField
val databaseSchemas = IntegrationTestSchemas(ALICE_NAME.toDatabaseSchemaName())
}
@Test
fun `start stop start`() {
val node = startNode(ALICE_NAME)
node.internals.startupComplete.get()
node.internals.stop()
node.internals.start()
node.internals.startupComplete.getOrThrow()
}
}

View File

@ -0,0 +1,226 @@
package net.corda.test.node
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.contracts.Amount
import net.corda.core.identity.Party
import net.corda.core.internal.packageName
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.finance.DOLLARS
import net.corda.finance.USD
import net.corda.finance.contracts.getCashBalance
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.Node
import net.corda.node.services.Permissions
import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.node.User
import net.corda.testing.node.internal.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions
import org.junit.Test
import java.util.*
import kotlin.concurrent.thread
import kotlin.test.assertEquals
class NodeSuspendAndResumeTest : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) {
private val rpcUser = User("user1", "test", permissions = setOf(
Permissions.startFlow<CashIssueFlow>(),
Permissions.startFlow<CashPaymentFlow>(),
Permissions.invokeRpc("vaultQueryBy"),
Permissions.invokeRpc(CordaRPCOps::stateMachinesFeed),
Permissions.invokeRpc("vaultQueryByCriteria"))
)
@Test
fun `start suspend resume`() {
val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
val node = startedNode.internals
(startedNode.network as P2PMessagingClient).runningFuture.get()
for (i in 1..10) {
node.suspend()
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(startedNode.network as P2PMessagingClient).runningFuture.get()
}
}
@Test
fun `start suspend resume issuing cash`() {
val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
val node = startedNode.internals
(startedNode.network as P2PMessagingClient).runningFuture.get()
for (i in 1..10) {
node.suspend()
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(startedNode.network as P2PMessagingClient).runningFuture.get()
issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME))
val currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals((123 * i).DOLLARS, currentCashAmount)
}
}
@Test
fun `cash not issued when suspended`() {
val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
val node = startedNode.internals
(startedNode.network as P2PMessagingClient).runningFuture.get()
issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME))
var currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
node.suspend()
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME))
}
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(startedNode.network as P2PMessagingClient).runningFuture.get()
currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
}
@Test
fun `initialise node without starting`() {
val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
// The node hasn't been started yet
Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy {
issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME))
}
node.start()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(node.started!!.network as P2PMessagingClient).runningFuture.get()
issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME))
val currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
}
@Test
fun `resume called on node not previously started`() {
val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
// will start the node
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(node.started!!.network as P2PMessagingClient).runningFuture.get()
issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME))
val currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
}
@Test
fun `resume called when node not suspended`() {
val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
val node = startedNode.internals
node.stop()
node.resume()
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(node.started!!.network as P2PMessagingClient).runningFuture.get()
issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME))
val currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
}
@Test
fun `resume called on started node`() {
val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
node.start()
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(node.started!!.network as P2PMessagingClient).runningFuture.get()
issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME))
val currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
}
@Test
fun `suspend called when node not started`() {
val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
val node = startedNode.internals
node.stop()
node.suspend()
Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy {
issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME))
}
node.suspend()
Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy {
issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME))
}
}
private fun issueCash(node: Node, party: Party) {
val client = CordaRPCClient(node.configuration.rpcOptions.address!!)
val connection = client.start(rpcUser.username, rpcUser.password)
val proxy = connection.proxy
val flowHandle = proxy.startFlow(::CashIssueFlow, 123.DOLLARS, OpaqueBytes.of(0), party)
println("Started issuing cash, waiting on result")
flowHandle.returnValue.get()
val cashDollars = proxy.getCashBalance(USD)
println("Balance: $cashDollars")
connection.close()
}
private fun getCashBalance(node: Node): Amount<Currency> {
val client = CordaRPCClient(node.configuration.rpcOptions.address!!)
val connection = client.start(rpcUser.username, rpcUser.password)
val proxy = connection.proxy
val cashBalance = proxy.getCashBalance(USD)
connection.close()
return cashBalance
}
}

View File

@ -131,6 +131,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val services: ServiceHubInternal get() = _services
private lateinit var _services: ServiceHubInternalImpl
protected lateinit var smm: StateMachineManager
protected lateinit var schedulerService: NodeSchedulerService
protected var myNotaryIdentity: PartyAndCertificate? = null
protected lateinit var checkpointStorage: CheckpointStorage
private lateinit var tokenizableServices: List<Any>
@ -232,10 +234,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start()
}
val notaryService = makeNotaryService(nodeServices, database)
val smm = makeStateMachineManager(database)
smm = makeStateMachineManager(database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory)
val schedulerService = NodeSchedulerService(
schedulerService = NodeSchedulerService(
platformClock,
database,
flowStarter,

View File

@ -2,6 +2,7 @@ package net.corda.node.internal
import com.codahale.metrics.JmxReporter
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.div
@ -389,6 +390,24 @@ open class Node(configuration: NodeConfiguration,
return started
}
/**
* Resume services stopped after [suspend].
*/
fun resume() {
if (started == null) {
start()
} else if (suspended) {
bridgeControlListener?.start()
rpcMessagingClient?.resume(started!!.rpcOps, securityManager)
(network as P2PMessagingClient).start()
started!!.database.transaction {
smm.resume()
schedulerService.resume()
}
suspended = false
}
}
override fun getRxIoScheduler(): Scheduler = Schedulers.io()
private fun initialiseSerialization() {
@ -407,6 +426,7 @@ open class Node(configuration: NodeConfiguration,
private var rpcMessagingClient: RPCMessagingClient? = null
private var verifierMessagingClient: VerifierMessagingClient? = null
/** Starts a blocking event loop for message dispatch. */
fun run() {
rpcMessagingClient?.start2(rpcBroker!!.serverControl)
@ -436,4 +456,20 @@ open class Node(configuration: NodeConfiguration,
log.info("Shutdown complete")
}
private var suspended = false
/**
* Suspend the minimum number of services([schedulerService], [smm], [network], [rpcMessagingClient], and [bridgeControlListener]).
*/
fun suspend() {
if(started != null && !suspended) {
schedulerService.stop()
smm.stop(0)
(network as P2PMessagingClient).stop()
rpcMessagingClient?.stop()
bridgeControlListener?.stop()
suspended = true
}
}
}

View File

@ -13,6 +13,11 @@ interface RPCSecurityManager : AutoCloseable {
*/
val id: AuthServiceId
/**
* Resume
*/
fun resume()
/**
* Perform user authentication from principal and password. Return an [AuthorizingSubject] containing
* the permissions of the user identified by the given [principal] if authentication via password succeeds,

View File

@ -34,15 +34,20 @@ private typealias AuthServiceConfig = SecurityConfiguration.AuthService
* Default implementation of [RPCSecurityManager] adapting
* [org.apache.shiro.mgt.SecurityManager]
*/
class RPCSecurityManagerImpl(config: AuthServiceConfig) : RPCSecurityManager {
class RPCSecurityManagerImpl(private val config: AuthServiceConfig) : RPCSecurityManager {
override val id = config.id
private val manager: DefaultSecurityManager
private var manager: DefaultSecurityManager
init {
manager = buildImpl(config)
}
override fun resume() {
close()
manager = buildImpl(config)
}
@Throws(FailedLoginException::class)
override fun authenticate(principal: String, password: Password): AuthorizingSubject {
password.use {
@ -75,9 +80,12 @@ class RPCSecurityManagerImpl(config: AuthServiceConfig) : RPCSecurityManager {
/**
* Instantiate RPCSecurityManager initialised with users data from a list of [User]
*/
fun fromUserList(id: AuthServiceId, users: List<User>) =
RPCSecurityManagerImpl(
fun fromUserList(id: AuthServiceId, users: List<User>): RPCSecurityManagerImpl {
val rpcSecurityManagerImpl = RPCSecurityManagerImpl(
AuthServiceConfig.fromUsers(users).copy(id = id))
return rpcSecurityManagerImpl
}
// Build internal Shiro securityManager instance
private fun buildImpl(config: AuthServiceConfig): DefaultSecurityManager {

View File

@ -171,6 +171,29 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
/**
* Stop scheduler service.
*/
fun stop() {
mutex.locked {
schedulerTimerExecutor.shutdown()
scheduledStatesQueue.clear()
scheduledStates.clear()
}
}
/**
* Resume scheduler service after having called [stop].
*/
fun resume() {
mutex.locked {
schedulerTimerExecutor = Executors.newSingleThreadExecutor()
scheduledStates.putAll(createMap())
scheduledStatesQueue.addAll(scheduledStates.values)
rescheduleWakeUp()
}
}
override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" }
val previousState = scheduledStates[action.ref]
@ -210,7 +233,7 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
private var schedulerTimerExecutor = Executors.newSingleThreadExecutor()
/**
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the
* [awaitWithDeadline] used below drops through without running the action. We then create a new

View File

@ -5,6 +5,7 @@ import com.codahale.metrics.MetricRegistry
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
@ -357,6 +358,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val shutdownLatch = CountDownLatch(1)
var runningFuture = openFuture<Unit>()
/**
* Starts the p2p event loop: this method only returns once [stop] has been called.
*/
@ -367,6 +370,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
check(started) { "start must be called first" }
check(!running) { "run can't be called twice" }
running = true
runningFuture.set(Unit)
// If it's null, it means we already called stop, so return immediately.
if (p2pConsumer == null) {
return
@ -479,6 +483,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
check(started)
val prevRunning = running
running = false
runningFuture = openFuture()
networkChangeSubscription?.unsubscribe()
require(p2pConsumer != null, {"stop can't be called twice"})
require(producer != null, {"stop can't be called twice"})

View File

@ -30,6 +30,11 @@ class RPCMessagingClient(
rpcServer!!.start(serverControl)
}
fun resume(rpcOps: RPCOps, securityManager: RPCSecurityManager) = synchronized(this) {
start(rpcOps, securityManager)
securityManager.resume()
}
fun stop() = synchronized(this) {
rpcServer?.close()
artemis.stop()

View File

@ -138,6 +138,19 @@ class MultiThreadedStateMachineManager(
lifeCycle.transition(State.UNSTARTED, State.STARTED)
}
override fun resume() {
lifeCycle.requireState(State.STOPPED)
fiberDeserializationChecker?.start(checkpointSerializationContext!!)
val fibers = restoreFlowsFromCheckpoints()
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
}
serviceHub.networkMapCache.nodeReady.then {
resumeRestoredFlows(fibers)
}
lifeCycle.transition(State.STOPPED, State.STARTED)
}
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
return concurrentBox.content.flows.values.mapNotNull {
flowClass.castIfPossible(it.fiber.logic)?.let { it to it.stateMachine.resultFuture }

View File

@ -134,6 +134,20 @@ class SingleThreadedStateMachineManager(
}
}
override fun resume() {
fiberDeserializationChecker?.start(checkpointSerializationContext!!)
val fibers = restoreFlowsFromCheckpoints()
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
}
serviceHub.networkMapCache.nodeReady.then {
resumeRestoredFlows(fibers)
}
mutex.locked {
stopping = false
}
}
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
return mutex.locked {
flows.values.mapNotNull {

View File

@ -38,6 +38,11 @@ interface StateMachineManager {
*/
fun stop(allowedUnsuspendedFiberCount: Int)
/**
* Resume state machine manager after having called [stop].
*/
fun resume()
/**
* Starts a new flow.
*

View File

@ -89,6 +89,7 @@ class FiberDeserializationChecker {
fun stop(): Boolean {
jobQueue.add(Job.Finish)
checkerThread?.join()
checkerThread = null
return foundUnrestorableFibers
}
}

View File

@ -5,15 +5,29 @@ import net.corda.core.internal.div
import net.corda.nodeapi.internal.crypto.X509KeyStore
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.BeforeClass
import org.junit.Test
import org.slf4j.event.Level
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
class ArgsParserTest {
private val parser = ArgsParser()
private val workingDirectory = Paths.get(".").normalize().toAbsolutePath()
companion object {
private lateinit var workingDirectory: Path
private lateinit var buildDirectory: Path
@BeforeClass
@JvmStatic
fun initDirectories() {
workingDirectory = Paths.get(".").normalize().toAbsolutePath()
buildDirectory = workingDirectory.resolve("build")
}
}
@Test
fun `no command line arguments`() {
@ -113,17 +127,21 @@ class ArgsParserTest {
@Test
fun `initial-registration`() {
val truststorePath = workingDirectory / "truststore" / "file.jks"
// Create this temporary file in the "build" directory so that "clean" can delete it.
val truststorePath = buildDirectory / "truststore" / "file.jks"
assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy {
parser.parse("--initial-registration", "--network-root-truststore", "$truststorePath", "--network-root-truststore-password", "password-test")
}.withMessageContaining("Network root trust store path").withMessageContaining("doesn't exist")
X509KeyStore.fromFile(truststorePath, "dummy_password", createNew = true)
try {
val cmdLineOptions = parser.parse("--initial-registration", "--network-root-truststore", "$truststorePath", "--network-root-truststore-password", "password-test")
assertNotNull(cmdLineOptions.nodeRegistrationConfig)
assertEquals(truststorePath.toAbsolutePath(), cmdLineOptions.nodeRegistrationConfig?.networkRootTrustStorePath)
assertEquals("password-test", cmdLineOptions.nodeRegistrationConfig?.networkRootTrustStorePassword)
} finally {
Files.delete(truststorePath)
}
}
@Test

View File

@ -46,7 +46,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
val tempFolder = TemporaryFolder()
private lateinit var defaultNetworkParameters: NetworkParametersCopier
private val nodes = mutableListOf<StartedNode<Node>>()
private val startedNodes = mutableListOf<StartedNode<Node>>()
private val nodeInfos = mutableListOf<NodeInfo>()
init {
@ -64,17 +64,17 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
*/
@After
fun stopAllNodes() {
val shutdownExecutor = Executors.newScheduledThreadPool(nodes.size)
val shutdownExecutor = Executors.newScheduledThreadPool(startedNodes.size)
try {
nodes.map { shutdownExecutor.fork(it::dispose) }.transpose().getOrThrow()
startedNodes.map { shutdownExecutor.fork(it::dispose) }.transpose().getOrThrow()
// Wait until ports are released
val portNotBoundChecks = nodes.flatMap {
val portNotBoundChecks = startedNodes.flatMap {
listOf(
it.internals.configuration.p2pAddress.let { addressMustNotBeBoundFuture(shutdownExecutor, it) },
it.internals.configuration.rpcOptions.address?.let { addressMustNotBeBoundFuture(shutdownExecutor, it) }
)
}.filterNotNull()
nodes.clear()
startedNodes.clear()
portNotBoundChecks.transpose().getOrThrow()
} finally {
shutdownExecutor.shutdown()
@ -82,10 +82,10 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
}
@JvmOverloads
fun startNode(legalName: CordaX500Name,
fun initNode(legalName: CordaX500Name,
platformVersion: Int = 1,
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): StartedNode<Node> {
configOverrides: Map<String, Any> = emptyMap()): Node {
val baseDirectory = baseDirectory(legalName).createDirectories()
val localPort = getFreeLocalPorts("localhost", 3)
val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString()
@ -109,14 +109,24 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
}
}
defaultNetworkParameters.install(baseDirectory)
val node = InProcessNode(parsedConfig, MOCK_VERSION_INFO.copy(platformVersion = platformVersion), cordappPackages).start()
nodes += node
ensureAllNetworkMapCachesHaveAllNodeInfos()
thread(name = legalName.organisation) {
node.internals.run()
return InProcessNode(parsedConfig, MOCK_VERSION_INFO.copy(platformVersion = platformVersion), cordappPackages)
}
return node
@JvmOverloads
fun startNode(legalName: CordaX500Name,
platformVersion: Int = 1,
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): StartedNode<Node> {
val node = initNode(legalName,platformVersion, rpcUsers,configOverrides)
val startedNode = node.start()
startedNodes += startedNode
ensureAllNetworkMapCachesHaveAllNodeInfos()
thread(name = legalName.organisation) {
node.run()
}
return startedNode
}
protected fun baseDirectory(legalName: CordaX500Name): Path {
@ -124,7 +134,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
}
private fun ensureAllNetworkMapCachesHaveAllNodeInfos() {
val runningNodes = nodes.filter { it.internals.started != null }
val runningNodes = startedNodes.filter { it.internals.started != null }
val runningNodesInfo = runningNodes.map { it.info }
for (node in runningNodes)
for (nodeInfo in runningNodesInfo) {