mirror of
synced 2025-03-11 06:54:04 +00:00
CORDA-990 - Flows Draining Mode (#2507)
* [CORDA-990]: Flows Draining Mode * Fixed incorrect disconnect login. * Code review changes. * Fixed compilation error about MockNetwork... * Fixed compilation error about MockNetwork... * Fixed broken compilation. * Ignoring an unstable test.
This commit is contained in:
@ -0,0 +1,114 @@
package net.corda.client.rpc
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.internal.packageName
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.getOrThrow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.chooseIdentity
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Assume.assumeFalse
import org.junit.Before
import org.junit.Test
class FlowsExecutionModeRpcTest {
fun `persistent state survives node restart`() {
// Temporary disable this test when executed on Windows. It is known to be sporadically failing.
// More investigation is needed to establish why.
val user = User("mark", "dadada", setOf(invokeRpc("setFlowsDrainingModeEnabled"), invokeRpc("isFlowsDrainingModeEnabled")))
driver(isDebug = true, startNodesInProcess = true) {
val nodeName = {
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
val nodeName = nodeHandle.nodeInfo.chooseIdentity().name
nodeHandle.rpc.run {
val nodeHandle = startNode(providedName = nodeName, rpcUsers = listOf(user)).getOrThrow()
val result = nodeHandle.rpc.run {
class FlowsExecutionModeTests : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) {
private val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
private lateinit var node: StartedNode<Node>
private lateinit var client: CordaRPCClient
fun setup() {
node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!)
fun `flows draining mode can be enabled and queried`() {
asALoggerUser { rpcOps ->
val newValue = true
val flowsExecutionMode = rpcOps.isFlowsDrainingModeEnabled()
fun `flows draining mode can be disabled and queried`() {
asALoggerUser { rpcOps ->
val newValue = false
val flowsExecutionMode = rpcOps.isFlowsDrainingModeEnabled()
fun `node starts with flows draining mode disabled`() {
asALoggerUser { rpcOps ->
val defaultStartingMode = rpcOps.isFlowsDrainingModeEnabled()
private fun login(username: String, password: String, externalTrace: Trace? = null, impersonatedActor: Actor? = null): CordaRPCConnection {
return client.start(username, password, externalTrace, impersonatedActor)
private fun asALoggerUser(action: (CordaRPCOps) -> Unit) {
login(rpcUser.username, rpcUser.password).use {
@ -357,6 +357,21 @@ interface CordaRPCOps : RPCOps {
/** Clear all network map data from local node cache. */
/** Clear all network map data from local node cache. */
fun clearNetworkMapCache()
fun clearNetworkMapCache()
/** Sets the value of the node's flows draining mode.
* If this mode is [enabled], the node will reject new flows through RPC, ignore scheduled flows, and do not process
* initial session messages, meaning that P2P counter-parties will not be able to initiate new flows involving the node.
* @param enabled whether the flows draining mode will be enabled.
* */
fun setFlowsDrainingModeEnabled(enabled: Boolean)
* Returns whether the flows draining mode is enabled.
* @see setFlowsDrainingModeEnabled
fun isFlowsDrainingModeEnabled(): Boolean
inline fun <reified T : ContractState> CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(),
inline fun <reified T : ContractState> CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(),
@ -6,6 +6,10 @@ from the previous milestone release.
* Introduced Flow Draining mode, in which a node continues executing existing flows, but does not start new. This is to support graceful node shutdown/restarts.
In particular, when this mode is on, new flows through RPC will be rejected, scheduled flows will be ignored, and initial session messages will not be consumed.
This will ensure that the number of checkpoints will strictly diminish with time, allowing for a clean shutdown.
* Removed blacklisted word checks in Corda X.500 name to allow "Server" or "Node" to be use as part of the legal name.
* Removed blacklisted word checks in Corda X.500 name to allow "Server" or "Node" to be use as part of the legal name.
* Separated our pre-existing Artemis broker into an RPC broker and a P2P broker.
* Separated our pre-existing Artemis broker into an RPC broker and a P2P broker.
@ -80,7 +80,7 @@ class TutorialMockNetwork {
// modify message if it's 1
// modify message if it's 1
nodeB.setMessagingServiceSpy(object : MessagingServiceSpy(nodeB.network) {
nodeB.setMessagingServiceSpy(object : MessagingServiceSpy(nodeB.network) {
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) {
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any, additionalHeaders: Map<String, String>) {
val messageData = message.data.deserialize<Any>() as? ExistingSessionMessage
val messageData = message.data.deserialize<Any>() as? ExistingSessionMessage
val payload = messageData?.payload
val payload = messageData?.payload
if (payload is DataSessionMessage && payload.payload.deserialize() == 1) {
if (payload is DataSessionMessage && payload.payload.deserialize() == 1) {
@ -76,4 +76,18 @@ The node also has several CorDapps installed by default to handle common tasks s
* Retrieving transactions and attachments from counterparties
* Retrieving transactions and attachments from counterparties
* Upgrading contracts
* Upgrading contracts
* Broadcasting agreed ledger updates for recording by counterparties
* Broadcasting agreed ledger updates for recording by counterparties
Draining mode
In order to operate a clean shutdown of a node, it is important than no flows are in-flight, meaning no checkpoints should
be persisted. The node is able to be put in a Flows Draining Mode, during which:
* Commands requiring to start new flows through RPC will be rejected.
* Scheduled flows due will be ignored.
* Initial P2P session messages will not be processed, meaning peers will not be able to initiate new flows involving the node.
* All other activities will proceed as usual, ensuring that the number of in-flight flows will strictly diminish.
As their number - which can be monitored through RPC - reaches zero, it is safe to shut the node down.
This property is durable, meaning that restarting the node will not reset it to its default value and that a RPC command is required.
@ -10,7 +10,7 @@ See [here](../../docs/source/corda-api.rst) for Corda's public API strategy. We
apply this plugin to other modules in future Corda releases as those modules' APIs stabilise.
apply this plugin to other modules in future Corda releases as those modules' APIs stabilise.
Basically, this plugin will document a module's `public` and `protected` classes/methods/fields,
Basically, this plugin will document a module's `public` and `protected` classes/methods/fields,
excluding those from our `*.internal.*` packgages, any synthetic methods, bridge methods, or methods
excluding those from our `*.internal.*` packages, any synthetic methods, bridge methods, or methods
identified as having Kotlin's `internal` scope. (Kotlin doesn't seem to have implemented `internal`
identified as having Kotlin's `internal` scope. (Kotlin doesn't seem to have implemented `internal`
scope for classes or fields yet as these are currently `public` inside the `.class` file.)
scope for classes or fields yet as these are currently `public` inside the `.class` file.)
@ -0,0 +1,8 @@
package net.corda.nodeapi.exceptions
import net.corda.core.CordaRuntimeException
* Thrown to indicate that the command was rejected by the node, typically due to a special temporary mode.
class RejectedCommandException(msg: String) : CordaRuntimeException(msg)
@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Assert.assertArrayEquals
import org.junit.Assert.assertArrayEquals
import org.junit.Ignore
import org.junit.Rule
import org.junit.Rule
import org.junit.Test
import org.junit.Test
import org.junit.rules.TemporaryFolder
import org.junit.rules.TemporaryFolder
@ -44,6 +45,7 @@ class AMQPBridgeTest {
private abstract class AbstractNodeConfiguration : NodeConfiguration
private abstract class AbstractNodeConfiguration : NodeConfiguration
fun `test acked and nacked messages`() {
fun `test acked and nacked messages`() {
// Create local queue
// Create local queue
@ -0,0 +1,100 @@
package net.corda.node.modes.draining
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.testing.core.chooseIdentity
import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.test.fail
class P2PFlowsDrainingModeTest {
private val portAllocation = PortAllocation.Incremental(10000)
private val user = User("mark", "dadada", setOf(Permissions.all()))
private val users = listOf(user)
private var executor: ExecutorService? = null
companion object {
private val logger = loggerFor<P2PFlowsDrainingModeTest>()
fun setup() {
executor = Executors.newSingleThreadExecutor()
fun cleanUp() {
fun `flows draining mode suspends consumption of initial session messages`() {
driver(isDebug = true, startNodesInProcess = false, portAllocation = portAllocation) {
val initiatedNode = startNode().getOrThrow()
val initiating = startNode(rpcUsers = users).getOrThrow().rpc
val counterParty = initiatedNode.nodeInfo.chooseIdentity()
val initiated = initiatedNode.rpc
var shouldFail = true
initiating.apply {
val flow = startFlow(::InitiateSessionFlow, counterParty)
// this should be really fast, for the flow has already started, so 5 seconds should never be a problem
logger.info("Now disabling flows draining mode for $counterParty.")
shouldFail = false
flow.returnValue.map { result ->
if (shouldFail) {
fail("Shouldn't happen until flows draining mode is switched off.")
} else {
assertThat(result).isEqualTo("Hi there answer")
class InitiateSessionFlow(private val counterParty: Party) : FlowLogic<String>() {
override fun call(): String {
val session = initiateFlow(counterParty)
session.send("Hi there")
return session.receive<String>().unwrap { it }
class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic<Unit>() {
override fun call() {
val message = initiatingSession.receive<String>().unwrap { it }
initiatingSession.send("$message answer")
@ -0,0 +1,49 @@
package net.corda.node.modes.draining
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.Permissions
import net.corda.nodeapi.exceptions.RejectedCommandException
import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.catchThrowable
import org.junit.Test
class RpcFlowsDrainingModeTest {
private val portAllocation = PortAllocation.Incremental(10000)
private val user = User("mark", "dadada", setOf(Permissions.all()))
private val users = listOf(user)
fun `flows draining mode rejects start flows commands through rpc`() {
driver(isDebug = true, startNodesInProcess = false, portAllocation = portAllocation) {
startNode(rpcUsers = users).getOrThrow().rpc.apply {
val error: Throwable? = catchThrowable { startFlow(RpcFlowsDrainingModeTest::NoOpFlow) }
class NoOpFlow : FlowLogic<Unit>() {
override fun call() {
println("NO OP!")
@ -207,7 +207,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair)
val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair)
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, info, identityService, networkMapCache)
val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, info, identityService, networkMapCache, nodeProperties)
val notaryService = makeNotaryService(nodeServices, database)
val notaryService = makeNotaryService(nodeServices, database)
val smm = makeStateMachineManager(database)
val smm = makeStateMachineManager(database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
@ -219,7 +220,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
unfinishedSchedules = busyNodeLatch,
unfinishedSchedules = busyNodeLatch,
serverThread = serverThread,
serverThread = serverThread,
flowLogicRefFactory = flowLogicRefFactory)
flowLogicRefFactory = flowLogicRefFactory,
drainingModePollPeriod = configuration.drainingModePollPeriod,
nodeProperties = nodeProperties)
if (serverThread is ExecutorService) {
if (serverThread is ExecutorService) {
runOnStop += {
runOnStop += {
// We wait here, even though any in-flight messages should have been drained away because the
// We wait here, even though any in-flight messages should have been drained away because the
@ -526,7 +529,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
* Builds node internal, advertised, and plugin services.
* Builds node internal, advertised, and plugin services.
* Returns a list of tokenizable services to be added to the serialisation context.
* Returns a list of tokenizable services to be added to the serialisation context.
private fun makeServices(keyPairs: Set<KeyPair>, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, database: CordaPersistence, info: NodeInfo, identityService: IdentityServiceInternal, networkMapCache: NetworkMapCacheInternal): MutableList<Any> {
private fun makeServices(keyPairs: Set<KeyPair>, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, database: CordaPersistence, info: NodeInfo, identityService: IdentityServiceInternal, networkMapCache: NetworkMapCacheInternal, nodeProperties: NodePropertiesStore): MutableList<Any> {
checkpointStorage = DBCheckpointStorage()
checkpointStorage = DBCheckpointStorage()
val metrics = MetricRegistry()
val metrics = MetricRegistry()
attachments = NodeAttachmentService(metrics, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound)
attachments = NodeAttachmentService(metrics, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound)
@ -541,8 +544,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
network = makeMessagingService(database, info)
network = makeMessagingService(database, info, nodeProperties)
val tokenizableServices = mutableListOf(attachments, network, services.vaultService,
val tokenizableServices = mutableListOf(attachments, network, services.vaultService,
services.keyManagementService, services.identityService, platformClock,
services.keyManagementService, services.identityService, platformClock,
services.auditService, services.monitoringService, services.networkMapCache, services.schemaService,
services.auditService, services.monitoringService, services.networkMapCache, services.schemaService,
@ -676,7 +680,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
_started = null
_started = null
protected abstract fun makeMessagingService(database: CordaPersistence, info: NodeInfo): MessagingService
protected abstract fun makeMessagingService(database: CordaPersistence, info: NodeInfo, nodeProperties: NodePropertiesStore): MessagingService
protected abstract fun startMessagingService(rpcOps: RPCOps)
protected abstract fun startMessagingService(rpcOps: RPCOps)
private fun obtainIdentity(notaryConfig: NotaryConfig?): Pair<PartyAndCertificate, KeyPair> {
private fun obtainIdentity(notaryConfig: NotaryConfig?): Pair<PartyAndCertificate, KeyPair> {
@ -747,7 +751,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
override val cordappProvider: CordappProviderInternal,
override val cordappProvider: CordappProviderInternal,
override val database: CordaPersistence,
override val database: CordaPersistence,
override val myInfo: NodeInfo,
override val myInfo: NodeInfo,
override val networkMapCache: NetworkMapCacheInternal
override val networkMapCache: NetworkMapCacheInternal,
override val nodeProperties: NodePropertiesStore
) : SingletonSerializeAsToken(), ServiceHubInternal, StateLoader by validatedTransactions {
) : SingletonSerializeAsToken(), ServiceHubInternal, StateLoader by validatedTransactions {
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage()
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage()
@ -28,6 +28,7 @@ import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.context
import net.corda.node.services.messaging.context
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.exceptions.RejectedCommandException
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import rx.Observable
import rx.Observable
import java.io.InputStream
import java.io.InputStream
@ -165,6 +166,9 @@ internal class CordaRPCOpsImpl(
private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, args: Array<out Any?>): FlowStateMachine<T> {
private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, args: Array<out Any?>): FlowStateMachine<T> {
require(logicType.isAnnotationPresent(StartableByRPC::class.java)) { "${logicType.name} was not designed for RPC" }
require(logicType.isAnnotationPresent(StartableByRPC::class.java)) { "${logicType.name} was not designed for RPC" }
if (isFlowsDrainingModeEnabled()) {
throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.")
return flowStarter.invokeFlowAsync(logicType, context(), *args).getOrThrow()
return flowStarter.invokeFlowAsync(logicType, context(), *args).getOrThrow()
@ -284,6 +288,14 @@ internal class CordaRPCOpsImpl(
return vaultTrackBy(criteria, PageSpecification(), sorting, contractStateType)
return vaultTrackBy(criteria, PageSpecification(), sorting, contractStateType)
override fun setFlowsDrainingModeEnabled(enabled: Boolean) {
override fun isFlowsDrainingModeEnabled(): Boolean {
return services.nodeProperties.flowsDrainingMode.isEnabled()
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)
@ -302,7 +314,7 @@ internal class CordaRPCOpsImpl(
is Origin.RPC -> FlowInitiator.RPC(principal)
is Origin.RPC -> FlowInitiator.RPC(principal)
is Origin.Peer -> services.identityService.wellKnownPartyFromX500Name((origin as Origin.Peer).party)?.let { FlowInitiator.Peer(it) } ?: throw IllegalStateException("Unknown peer with name ${(origin as Origin.Peer).party}.")
is Origin.Peer -> services.identityService.wellKnownPartyFromX500Name((origin as Origin.Peer).party)?.let { FlowInitiator.Peer(it) } ?: throw IllegalStateException("Unknown peer with name ${(origin as Origin.Peer).party}.")
is Origin.Service -> FlowInitiator.Service(principal)
is Origin.Service -> FlowInitiator.Service(principal)
is Origin.Shell -> FlowInitiator.Shell
Origin.Shell -> FlowInitiator.Shell
is Origin.Scheduled -> FlowInitiator.Scheduled((origin as Origin.Scheduled).scheduledState)
is Origin.Scheduled -> FlowInitiator.Scheduled((origin as Origin.Scheduled).scheduledState)
@ -2,12 +2,23 @@ package net.corda.node.internal
interface LifecycleSupport : Startable, Stoppable
interface LifecycleSupport : Startable, Stoppable
interface Stoppable {
interface Stoppable : AutoCloseable {
fun stop()
fun stop()
override fun close() = stop()
interface Startable {
interface Startable {
fun start()
fun start()
val started: Boolean
val started: Boolean
interface Connectable {
val connected: Boolean
fun connect()
fun disconnect()
@ -22,6 +22,7 @@ import net.corda.node.internal.artemis.BrokerAddresses
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.serialization.KryoServerSerializationScheme
import net.corda.node.serialization.KryoServerSerializationScheme
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.SecurityConfiguration
import net.corda.node.services.config.SecurityConfiguration
@ -144,7 +145,7 @@ open class Node(configuration: NodeConfiguration,
private var shutdownHook: ShutdownHook? = null
private var shutdownHook: ShutdownHook? = null
override fun makeMessagingService(database: CordaPersistence, info: NodeInfo): MessagingService {
override fun makeMessagingService(database: CordaPersistence, info: NodeInfo, nodeProperties: NodePropertiesStore): MessagingService {
// Construct security manager reading users data either from the 'security' config section
// Construct security manager reading users data either from the 'security' config section
// if present or from rpcUsers list if the former is missing from config.
// if present or from rpcUsers list if the former is missing from config.
val securityManagerConfig = configuration.security?.authService ?:
val securityManagerConfig = configuration.security?.authService ?:
@ -177,7 +178,9 @@ open class Node(configuration: NodeConfiguration,
isDrainingModeOn = nodeProperties.flowsDrainingMode::isEnabled,
drainingModeWasChangedEvents = nodeProperties.flowsDrainingMode.values)
private fun startLocalRpcBroker(): BrokerAddresses? {
private fun startLocalRpcBroker(): BrokerAddresses? {
@ -0,0 +1,13 @@
package net.corda.node.internal
interface NodeUniqueIdProvider {
val value: String
// this is stubbed because we still do not support clustered node setups.
// the moment we will, this will have to be changed to return a value unique for each physical node.
internal object StubbedNodeUniqueIdProvider : NodeUniqueIdProvider {
// TODO implement to return a value unique for each physical node when we will support clustered node setups.
override val value: String = "NABOB"
@ -161,6 +161,12 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val
implementation.vaultTrackByWithSorting(contractStateType, criteria, sorting)
implementation.vaultTrackByWithSorting(contractStateType, criteria, sorting)
override fun setFlowsDrainingModeEnabled(enabled: Boolean) = guard("setFlowsDrainingModeEnabled") {
override fun isFlowsDrainingModeEnabled(): Boolean = guard("isFlowsDrainingModeEnabled", implementation::isFlowsDrainingModeEnabled)
// TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140
// TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140
private inline fun <RESULT> guard(methodName: String, action: () -> RESULT) = guard(methodName, emptyList(), action)
private inline fun <RESULT> guard(methodName: String, action: () -> RESULT) = guard(methodName, emptyList(), action)
@ -0,0 +1,94 @@
package net.corda.node.internal.artemis
import net.corda.node.internal.Connectable
import net.corda.node.internal.LifecycleSupport
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession
import rx.Observable
import rx.subjects.PublishSubject
interface ReactiveArtemisConsumer : LifecycleSupport, Connectable {
val messages: Observable<ClientMessage>
companion object {
fun multiplex(createSession: () -> ClientSession, queueName: String, filter: String? = null, vararg queueNames: String): ReactiveArtemisConsumer {
return MultiplexingReactiveArtemisConsumer(setOf(queueName, *queueNames), createSession, filter)
fun multiplex(queueNames: Set<String>, createSession: () -> ClientSession, filter: String? = null): ReactiveArtemisConsumer {
return MultiplexingReactiveArtemisConsumer(queueNames, createSession, filter)
private class MultiplexingReactiveArtemisConsumer(private val queueNames: Set<String>, private val createSession: () -> ClientSession, private val filter: String?) : ReactiveArtemisConsumer {
private var startedFlag = false
override var connected = false
override val messages: PublishSubject<ClientMessage> = PublishSubject.create<ClientMessage>()
private val consumers = mutableSetOf<ClientConsumer>()
private val sessions = mutableSetOf<ClientSession>()
override fun start() {
synchronized(this) {
startedFlag = true
override fun stop() {
synchronized(this) {
if(startedFlag) {
startedFlag = false
override fun connect() {
synchronized(this) {
queueNames.forEach { queue ->
createSession().apply {
consumers += filter?.let { createConsumer(queue, it) } ?: createConsumer(queue)
sessions += this
consumers.forEach { consumer ->
consumer.setMessageHandler { message ->
connected = true
override fun disconnect() {
synchronized(this) {
if(connected) {
connected = false
override val started: Boolean
get() = startedFlag
@ -1,4 +1,4 @@
package net.corda.core.internal.schemas
package net.corda.node.internal.schemas
import net.corda.core.crypto.toStringShort
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.identity.PartyAndCertificate
@ -9,7 +9,7 @@ import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.NetworkHostAndPort
import java.io.Serializable
import net.corda.node.services.persistence.NodePropertiesPersistentStore
import javax.persistence.*
import javax.persistence.*
object NodeInfoSchema
object NodeInfoSchema
@ -17,7 +17,7 @@ object NodeInfoSchema
object NodeInfoSchemaV1 : MappedSchema(
object NodeInfoSchemaV1 : MappedSchema(
schemaFamily = NodeInfoSchema.javaClass,
schemaFamily = NodeInfoSchema.javaClass,
version = 1,
version = 1,
mappedTypes = listOf(PersistentNodeInfo::class.java, DBPartyAndCertificate::class.java, DBHostAndPort::class.java)
mappedTypes = listOf(PersistentNodeInfo::class.java, DBPartyAndCertificate::class.java, DBHostAndPort::class.java, NodePropertiesPersistentStore.DBNodeProperty::class.java)
) {
) {
@Table(name = "node_infos")
@Table(name = "node_infos")
@ -33,7 +33,7 @@ object NodeInfoSchemaV1 : MappedSchema(
@Column(name = "addresses")
@Column(name = "addresses")
@OneToMany(cascade = arrayOf(CascadeType.ALL), orphanRemoval = true)
@OneToMany(cascade = arrayOf(CascadeType.ALL), orphanRemoval = true)
@JoinColumn(name = "node_info_id", foreignKey = ForeignKey(name = "FK__info_hosts__infos"))
@JoinColumn(name = "node_info_id", foreignKey = ForeignKey(name = "FK__info_hosts__infos"))
val addresses: List<NodeInfoSchemaV1.DBHostAndPort>,
val addresses: List<DBHostAndPort>,
@Column(name = "legal_identities_certs")
@Column(name = "legal_identities_certs")
@ManyToMany(cascade = arrayOf(CascadeType.ALL))
@ManyToMany(cascade = arrayOf(CascadeType.ALL))
@ -180,7 +180,7 @@ private object RPCPermissionResolver : PermissionResolver {
// Leaving empty set of targets and actions to match everything
// Leaving empty set of targets and actions to match everything
return RPCPermission()
return RPCPermission()
else -> throw IllegalArgumentException("Unkwnow permission action specifier: $action")
else -> throw IllegalArgumentException("Unknown permission action specifier: $action")
@ -0,0 +1,18 @@
package net.corda.node.services.api
import rx.Observable
interface NodePropertiesStore {
val flowsDrainingMode: FlowsDrainingModeOperations
interface FlowsDrainingModeOperations {
fun setEnabled(enabled: Boolean)
fun isEnabled(): Boolean
val values: Observable<Pair<Boolean, Boolean>>
@ -65,6 +65,7 @@ interface ServiceHubInternal : ServiceHub {
val networkService: MessagingService
val networkService: MessagingService
val database: CordaPersistence
val database: CordaPersistence
val configuration: NodeConfiguration
val configuration: NodeConfiguration
val nodeProperties: NodePropertiesStore
val networkMapUpdater: NetworkMapUpdater
val networkMapUpdater: NetworkMapUpdater
override val cordappProvider: CordappProviderInternal
override val cordappProvider: CordappProviderInternal
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
@ -16,6 +16,7 @@ import net.corda.nodeapi.internal.config.parseAs
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import java.net.URL
import java.net.URL
import java.nio.file.Path
import java.nio.file.Path
import java.time.Duration
import java.util.*
import java.util.*
@ -48,6 +49,8 @@ interface NodeConfiguration : NodeSSLConfiguration {
val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize
val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize
val attachmentContentCacheSizeBytes: Long get() = defaultAttachmentContentCacheSize
val attachmentContentCacheSizeBytes: Long get() = defaultAttachmentContentCacheSize
val attachmentCacheBound: Long get() = defaultAttachmentCacheBound
val attachmentCacheBound: Long get() = defaultAttachmentCacheBound
// do not change this value without syncing it with ScheduledFlowsDrainingModeTest
val drainingModePollPeriod: Duration get() = Duration.ofSeconds(5)
fun validate(): List<String>
fun validate(): List<String>
@ -24,12 +24,14 @@ import net.corda.core.utilities.trace
import net.corda.node.internal.CordaClock
import net.corda.node.internal.CordaClock
import net.corda.node.internal.MutableClock
import net.corda.node.internal.MutableClock
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchedulerService
import net.corda.node.services.api.SchedulerService
import net.corda.node.utilities.PersistentMap
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.apache.activemq.artemis.utils.ReusableLatch
import org.apache.activemq.artemis.utils.ReusableLatch
import org.slf4j.Logger
import org.slf4j.Logger
import java.time.Duration
import java.time.Instant
import java.time.Instant
import java.util.*
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.*
@ -60,6 +62,8 @@ class NodeSchedulerService(private val clock: CordaClock,
private val unfinishedSchedules: ReusableLatch = ReusableLatch(),
private val unfinishedSchedules: ReusableLatch = ReusableLatch(),
private val serverThread: Executor,
private val serverThread: Executor,
private val flowLogicRefFactory: FlowLogicRefFactory,
private val flowLogicRefFactory: FlowLogicRefFactory,
private val nodeProperties: NodePropertiesStore,
private val drainingModePollPeriod: Duration,
private val log: Logger = staticLog,
private val log: Logger = staticLog,
private val scheduledStates: MutableMap<StateRef, ScheduledStateRef> = createMap())
private val scheduledStates: MutableMap<StateRef, ScheduledStateRef> = createMap())
: SchedulerService, SingletonSerializeAsToken() {
: SchedulerService, SingletonSerializeAsToken() {
@ -285,10 +289,19 @@ class NodeSchedulerService(private val clock: CordaClock,
} else {
} else {
val flowLogic = flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
val flowLogic = flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
log.trace { "Scheduler starting FlowLogic $flowLogic" }
scheduledFlow = when {
scheduledFlow = flowLogic
nodeProperties.flowsDrainingMode.isEnabled() -> {
log.warn("Ignoring scheduled flow start because of draining mode. FlowLogic: $flowLogic.")
awaitWithDeadline(clock, Instant.now() + drainingModePollPeriod)
else -> {
log.trace { "Scheduler starting FlowLogic $flowLogic" }
// and schedule the next one
// and schedule the next one
@ -66,7 +66,8 @@ interface MessagingService {
message: Message,
message: Message,
target: MessageRecipients,
target: MessageRecipients,
retryId: Long? = null,
retryId: Long? = null,
sequenceKey: Any = target
sequenceKey: Any = target,
additionalHeaders: Map<String, String> = emptyMap()
/** A message with a target and sequenceKey specified. */
/** A message with a target and sequenceKey specified. */
@ -143,3 +144,11 @@ object TopicStringValidator {
/** @throws IllegalArgumentException if the given topic contains invalid characters */
/** @throws IllegalArgumentException if the given topic contains invalid characters */
fun check(tag: String) = require(regex.matcher(tag).matches())
fun check(tag: String) = require(regex.matcher(tag).matches())
object P2PMessagingHeaders {
object Type {
const val KEY = "corda_p2p_message_type"
const val SESSION_INIT_VALUE = "session_init"
@ -12,16 +12,21 @@ import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.serialization.serialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.*
import net.corda.core.utilities.*
import net.corda.node.VersionInfo
import net.corda.node.VersionInfo
import net.corda.node.internal.LifecycleSupport
import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.statemachine.StateMachineManagerImpl
import net.corda.node.services.statemachine.StateMachineManagerImpl
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.PersistentMap
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
@ -34,14 +39,15 @@ import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.*
import org.apache.activemq.artemis.api.core.Message.*
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.api.core.client.ClientMessage
import rx.Observable
import org.apache.activemq.artemis.api.core.client.ClientSession
import rx.Subscription
import rx.Subscription
import rx.subjects.PublishSubject
import java.security.PublicKey
import java.security.PublicKey
import java.time.Instant
import java.time.Instant
import java.util.*
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.*
import javax.annotation.concurrent.ThreadSafe
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.Column
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Entity
@ -76,17 +82,19 @@ import javax.persistence.Lob
* @param maxMessageSize A bound applied to the message size.
* @param maxMessageSize A bound applied to the message size.
class P2PMessagingClient(config: NodeConfiguration,
class P2PMessagingClient(private val config: NodeConfiguration,
private val versionInfo: VersionInfo,
private val versionInfo: VersionInfo,
serverAddress: NetworkHostAndPort,
private val serverAddress: NetworkHostAndPort,
private val myIdentity: PublicKey,
private val myIdentity: PublicKey,
private val serviceIdentity: PublicKey?,
private val serviceIdentity: PublicKey?,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
private val database: CordaPersistence,
private val database: CordaPersistence,
private val networkMap: NetworkMapCacheInternal,
private val networkMap: NetworkMapCacheInternal,
advertisedAddress: NetworkHostAndPort = serverAddress,
advertisedAddress: NetworkHostAndPort = serverAddress,
maxMessageSize: Int
private val maxMessageSize: Int,
) : SingletonSerializeAsToken(), MessagingService {
private val isDrainingModeOn: () -> Boolean,
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>
) : SingletonSerializeAsToken(), MessagingService, AutoCloseable {
companion object {
companion object {
private val log = contextLogger()
private val log = contextLogger()
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
@ -98,7 +106,7 @@ class P2PMessagingClient(config: NodeConfiguration,
private val releaseVersionProperty = SimpleString("release-version")
private val releaseVersionProperty = SimpleString("release-version")
private val platformVersionProperty = SimpleString("platform-version")
private val platformVersionProperty = SimpleString("platform-version")
private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
private val messageMaxRetryCount: Int = 3
private const val messageMaxRetryCount: Int = 3
fun createProcessedMessage(): AppendOnlyPersistentMap<String, Instant, ProcessedMessage, String> {
fun createProcessedMessage(): AppendOnlyPersistentMap<String, Instant, ProcessedMessage, String> {
return AppendOnlyPersistentMap(
return AppendOnlyPersistentMap(
@ -141,11 +149,18 @@ class P2PMessagingClient(config: NodeConfiguration,
private class InnerState {
private class InnerState {
var started = false
var running = false
var running = false
var p2pConsumer: ClientConsumer? = null
var eventsSubscription: Subscription? = null
var serviceConsumer: ClientConsumer? = null
var p2pConsumer: P2PMessagingConsumer? = null
var locator: ServerLocator? = null
var producer: ClientProducer? = null
var producerSession: ClientSession? = null
var bridgeSession: ClientSession? = null
var bridgeNotifyConsumer: ClientConsumer? = null
var bridgeNotifyConsumer: ClientConsumer? = null
var networkChangeSubscription: Subscription? = null
var networkChangeSubscription: Subscription? = null
fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message)
private val messagesToRedeliver = database.transaction {
private val messagesToRedeliver = database.transaction {
@ -165,7 +180,6 @@ class P2PMessagingClient(config: NodeConfiguration,
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize)
private val state = ThreadBox(InnerState())
private val state = ThreadBox(InnerState())
private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
private val handlers = CopyOnWriteArrayList<Handler>()
private val handlers = CopyOnWriteArrayList<Handler>()
@ -201,28 +215,45 @@ class P2PMessagingClient(config: NodeConfiguration,
fun start() {
fun start() {
state.locked {
state.locked {
val session = artemis.start().session
started = true
val inbox = RemoteInboxAddress(myIdentity).queueName
log.info("Connecting to message broker: $serverAddress")
val inboxes = mutableListOf(inbox)
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
// Create a queue, consumer and producer for handling P2P network messages.
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config)
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
p2pConsumer = session.createConsumer(inbox)
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
if (serviceIdentity != null) {
// would be the default and the two lines below can be deleted.
val serviceAddress = RemoteInboxAddress(serviceIdentity).queueName
connectionTTL = -1
inboxes += serviceAddress
clientFailureCheckPeriod = -1
minLargeMessageSize = maxMessageSize
val serviceHandler = session.createConsumer(serviceAddress)
isUseGlobalPools = nodeSerializationEnv != null
serviceHandler.setMessageHandler { msg ->
val message: ReceivedMessage? = artemisToCordaMessage(msg)
if (message != null)
state.locked {
registerBridgeControl(session, inboxes)
val sessionFactory = locator!!.createSessionFactory()
enumerateBridges(session, inboxes)
// Login using the node username. The broker will authenticate us as its node (as opposed to another peer)
// using our TLS certificate.
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
// size of 1MB is acknowledged.
val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER, false, true, true, locator!!.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) }
producerSession = createNewSession()
bridgeSession = createNewSession()
val inboxes = mutableSetOf<String>()
// Create a queue, consumer and producer for handling P2P network messages.
// Create a general purpose producer.
producer = producerSession!!.createProducer()
inboxes += RemoteInboxAddress(myIdentity).queueName
serviceIdentity?.let {
inboxes += RemoteInboxAddress(it).queueName
inboxes.forEach { createQueueIfAbsent(it, producerSession!!) }
p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents)
registerBridgeControl(bridgeSession!!, inboxes.toList())
enumerateBridges(bridgeSession!!, inboxes.toList())
@ -240,7 +271,7 @@ class P2PMessagingClient(config: NodeConfiguration,
when (notifyMessage) {
when (notifyMessage) {
is BridgeControl.BridgeToNodeSnapshotRequest -> enumerateBridges(session, inboxes)
is BridgeControl.BridgeToNodeSnapshotRequest -> enumerateBridges(session, inboxes)
else -> log.error("Unexpected Bridge Control message type on notify topc $notifyMessage")
else -> log.error("Unexpected Bridge Control message type on notify topic $notifyMessage")
@ -249,20 +280,23 @@ class P2PMessagingClient(config: NodeConfiguration,
private fun sendBridgeControl(message: BridgeControl) {
private fun sendBridgeControl(message: BridgeControl) {
val client = artemis.started!!
state.locked {
val controlPacket = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val controlPacket = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val artemisMessage = client.session.createMessage(false)
val artemisMessage = producerSession!!.createMessage(false)
client.producer.send(BRIDGE_CONTROL, artemisMessage)
sendMessage(BRIDGE_CONTROL, artemisMessage)
private fun updateBridgesOnNetworkChange(change: NetworkMapCache.MapChange) {
private fun updateBridgesOnNetworkChange(change: NetworkMapCache.MapChange) {
log.info("Updating bridges on network map change: ${change.node}")
log.info("Updating bridges on network map change: ${change.node}")
fun gatherAddresses(node: NodeInfo): Sequence<BridgeEntry> {
fun gatherAddresses(node: NodeInfo): Sequence<BridgeEntry> {
return node.legalIdentitiesAndCerts.map {
return state.locked {
val messagingAddress = NodeAddress(it.party.owningKey, node.addresses.first())
node.legalIdentitiesAndCerts.map {
BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name })
val messagingAddress = NodeAddress(it.party.owningKey, node.addresses.first())
}.filter { artemis.started!!.session.queueQuery(SimpleString(it.queueName)).isExists }.asSequence()
BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name })
}.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists }.asSequence()
fun deployBridges(node: NodeInfo) {
fun deployBridges(node: NodeInfo) {
@ -315,62 +349,43 @@ class P2PMessagingClient(config: NodeConfiguration,
private fun resumeMessageRedelivery() {
private fun resumeMessageRedelivery() {
messagesToRedeliver.forEach { retryId, (message, target) ->
messagesToRedeliver.forEach { retryId, (message, target) ->
send(message, target, retryId)
sendInternal(message, target, retryId)
private val shutdownLatch = CountDownLatch(1)
private val shutdownLatch = CountDownLatch(1)
private fun processMessage(consumer: ClientConsumer): Boolean {
// Two possibilities here:
// 1. We block waiting for a message and the consumer is closed in another thread. In this case
// receive returns null and we break out of the loop.
// 2. We receive a message and process it, and stop() is called during delivery. In this case,
// calling receive will throw and we break out of the loop.
// It's safe to call into receive simultaneous with other threads calling send on a producer.
val artemisMessage: ClientMessage = try {
} catch (e: ActiveMQObjectClosedException) {
} ?: return false
val message: ReceivedMessage? = artemisToCordaMessage(artemisMessage)
if (message != null)
// Ack the message so it won't be redelivered. We should only really do this when there were no
// transient failures. If we caught an exception in the handler, we could back off and retry delivery
// a few times before giving up and redirecting the message to a dead-letter address for admin or
// developer inspection. Artemis has the features to do this for us, we just need to enable them.
// TODO: Setup Artemis delayed redelivery and dead letter addresses.
// ACKing a message calls back into the session which isn't thread safe, so we have to ensure it
// doesn't collide with a send here. Note that stop() could have been called whilst we were
// processing a message but if so, it'll be parked waiting for us to count down the latch, so
// the session itself is still around and we can still ack messages as a result.
state.locked {
return true
* Starts the p2p event loop: this method only returns once [stop] has been called.
* Starts the p2p event loop: this method only returns once [stop] has been called.
fun run() {
fun run() {
val latch = CountDownLatch(1)
try {
try {
val consumer = state.locked {
val consumer = state.locked {
check(artemis.started != null) { "start must be called first" }
check(started) { "start must be called first" }
check(!running) { "run can't be called twice" }
check(!running) { "run can't be called twice" }
running = true
running = true
// If it's null, it means we already called stop, so return immediately.
// If it's null, it means we already called stop, so return immediately.
p2pConsumer ?: return
if (p2pConsumer == null) {
eventsSubscription = p2pConsumer!!.messages
.doOnError { error -> throw error }
.doOnNext { artemisMessage ->
val receivedMessage = artemisToCordaMessage(artemisMessage)
receivedMessage?.let {
// this `run()` method is semantically meant to block until the message consumption runs, hence the latch here
while (processMessage(consumer)) { }
} finally {
} finally {
@ -458,30 +473,24 @@ class P2PMessagingClient(config: NodeConfiguration,
fun stop() {
fun stop() {
val running = state.locked {
val running = state.locked {
// We allow stop() to be called without a run() in between, but it must have at least been started.
// We allow stop() to be called without a run() in between, but it must have at least been started.
check(artemis.started != null)
val prevRunning = running
val prevRunning = running
running = false
running = false
val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice")
require(p2pConsumer != null, {"stop can't be called twice"})
try {
require(producer != null, {"stop can't be called twice"})
} catch (e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
try {
} catch (e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
p2pConsumer = null
p2pConsumer = null
val s = serviceConsumer
try {
producer = null
} catch (e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
serviceConsumer = null
eventsSubscription = null
if (running && !nodeExecutor.isOnThread) {
if (running && !nodeExecutor.isOnThread) {
@ -489,21 +498,32 @@ class P2PMessagingClient(config: NodeConfiguration,
// Only first caller to gets running true to protect against double stop, which seems to happen in some integration tests.
// Only first caller to gets running true to protect against double stop, which seems to happen in some integration tests.
if (running) {
state.locked {
state.locked {
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) {
private fun close(target: AutoCloseable?) {
try {
} catch (ignored: ActiveMQObjectClosedException) {
// swallow
override fun close() = stop()
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any, additionalHeaders: Map<String, String>) {
sendInternal(message, target, retryId, additionalHeaders)
private fun sendInternal(message: Message, target: MessageRecipients, retryId: Long?, additionalHeaders: Map<String, String> = emptyMap()) {
// We have to perform sending on a different thread pool, since using the same pool for messaging and
// We have to perform sending on a different thread pool, since using the same pool for messaging and
// fibers leads to Netty buffer memory leaks, caused by both Netty and Quasar fiddling with thread-locals.
// fibers leads to Netty buffer memory leaks, caused by both Netty and Quasar fiddling with thread-locals.
messagingExecutor.fetchFrom {
messagingExecutor.fetchFrom {
state.locked {
state.locked {
val mqAddress = getMQAddress(target)
val mqAddress = getMQAddress(target)
val artemis = artemis.started!!
val artemisMessage = producerSession!!.createMessage(true).apply {
val artemisMessage = artemis.session.createMessage(true).apply {
putStringProperty(cordaVendorProperty, cordaVendor)
putStringProperty(cordaVendorProperty, cordaVendor)
putStringProperty(releaseVersionProperty, releaseVersion)
putStringProperty(releaseVersionProperty, releaseVersion)
putIntProperty(platformVersionProperty, versionInfo.platformVersion)
putIntProperty(platformVersionProperty, versionInfo.platformVersion)
@ -516,11 +536,12 @@ class P2PMessagingClient(config: NodeConfiguration,
if (amqDelayMillis > 0 && message.topic == StateMachineManagerImpl.sessionTopic) {
if (amqDelayMillis > 0 && message.topic == StateMachineManagerImpl.sessionTopic) {
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis)
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis)
additionalHeaders.forEach { key, value -> putStringProperty(key, value)}
log.trace {
log.trace {
"Send to: $mqAddress topic: ${message.topic} uuid: ${message.uniqueMessageId}"
"Send to: $mqAddress topic: ${message.topic} uuid: ${message.uniqueMessageId}"
artemis.producer.send(mqAddress, artemisMessage)
sendMessage(mqAddress, artemisMessage)
retryId?.let {
retryId?.let {
database.transaction {
database.transaction {
messagesToRedeliver.computeIfAbsent(it, { Pair(message, target) })
messagesToRedeliver.computeIfAbsent(it, { Pair(message, target) })
@ -556,7 +577,7 @@ class P2PMessagingClient(config: NodeConfiguration,
state.locked {
state.locked {
log.trace { "Retry #$retryCount sending message $message to $address for $retryId" }
log.trace { "Retry #$retryCount sending message $message to $address for $retryId" }
artemis.started!!.producer.send(address, message)
sendMessage(address, message)
scheduledMessageRedeliveries[retryId] = messagingExecutor.schedule({
scheduledMessageRedeliveries[retryId] = messagingExecutor.schedule({
@ -575,6 +596,9 @@ class P2PMessagingClient(config: NodeConfiguration,
private fun Pair<ClientMessage, ReceivedMessage?>.deliver() = deliver(second!!)
private fun Pair<ClientMessage, ReceivedMessage?>.acknowledge() = first.acknowledge()
private fun getMQAddress(target: MessageRecipients): String {
private fun getMQAddress(target: MessageRecipients): String {
return if (target == myAddress) {
return if (target == myAddress) {
// If we are sending to ourselves then route the message directly to our P2P queue.
// If we are sending to ourselves then route the message directly to our P2P queue.
@ -583,28 +607,27 @@ class P2PMessagingClient(config: NodeConfiguration,
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
// broker's job to route the message to the target's P2P queue.
// broker's job to route the message to the target's P2P queue.
val internalTargetQueue = (target as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address")
val internalTargetQueue = (target as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address")
state.locked {
createQueueIfAbsent(internalTargetQueue, producerSession!!)
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
private fun createQueueIfAbsent(queueName: String) {
private fun createQueueIfAbsent(queueName: String, session: ClientSession) {
if (!knownQueues.contains(queueName)) {
if (!knownQueues.contains(queueName)) {
state.alreadyLocked {
val queueQuery = session.queueQuery(SimpleString(queueName))
val session = artemis.started!!.session
if (!queueQuery.isExists) {
val queueQuery = session.queueQuery(SimpleString(queueName))
log.info("Create fresh queue $queueName bound on same address")
if (!queueQuery.isExists) {
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
log.info("Create fresh queue $queueName bound on same address")
if (queueName.startsWith(PEERS_PREFIX)) {
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
val keyHash = queueName.substring(PEERS_PREFIX.length)
if (queueName.startsWith(PEERS_PREFIX)) {
val peers = networkMap.getNodesByOwningKeyIndex(keyHash)
val keyHash = queueName.substring(PEERS_PREFIX.length)
for (node in peers) {
val peers = networkMap.getNodesByOwningKeyIndex(keyHash)
val bridge = BridgeEntry(queueName, node.addresses, node.legalIdentities.map { it.name })
for (node in peers) {
val createBridgeMessage = BridgeControl.Create(myIdentity.toStringShort(), bridge)
val bridge = BridgeEntry(queueName, node.addresses, node.legalIdentities.map { it.name })
val createBridgeMessage = BridgeControl.Create(myIdentity.toStringShort(), bridge)
@ -637,3 +660,78 @@ class P2PMessagingClient(config: NodeConfiguration,
private class P2PMessagingConsumer(
queueNames: Set<String>,
createSession: () -> ClientSession,
private val isDrainingModeOn: () -> Boolean,
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>) : LifecycleSupport {
private companion object {
private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}=${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}"
private const val existingSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}"
private var startedFlag = false
val messages: PublishSubject<ClientMessage> = PublishSubject.create<ClientMessage>()
private var initialConsumer = multiplex(queueNames, createSession, initialSessionMessages)
private var existingConsumer = multiplex(queueNames, createSession, existingSessionMessages)
private val subscriptions = mutableSetOf<Subscription>()
override fun start() {
synchronized(this) {
drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { pauseInitial() }.subscribe()
drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { resumeInitial() }.subscribe()
subscriptions += initialConsumer.messages.doOnNext(messages::onNext).subscribe()
subscriptions += existingConsumer.messages.doOnNext(messages::onNext).subscribe()
if (!isDrainingModeOn()) {
startedFlag = true
override fun stop() {
synchronized(this) {
if (startedFlag) {
startedFlag = false
override val started: Boolean
get() = startedFlag
private fun pauseInitial() {
if (initialConsumer.started && initialConsumer.connected) {
private fun resumeInitial() {
if(!initialConsumer.started) {
if (!initialConsumer.connected) {
private fun Pair<Boolean, Boolean>.switchedOff() = first && !second
private fun Pair<Boolean, Boolean>.switchedOn() = !first && second
@ -10,7 +10,7 @@ import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.NotaryInfo
import net.corda.core.node.NotaryInfo
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.schemas.NodeInfoSchemaV1
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.IdentityService
@ -0,0 +1,67 @@
package net.corda.node.services.persistence
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.NodePropertiesStore.FlowsDrainingModeOperations
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.slf4j.Logger
import rx.subjects.PublishSubject
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Table
* Simple node properties key value store in DB.
class NodePropertiesPersistentStore(readPhysicalNodeId: () -> String, persistence: CordaPersistence) : NodePropertiesStore {
private companion object {
val logger = loggerFor<NodePropertiesStore>()
override val flowsDrainingMode: FlowsDrainingModeOperations = FlowsDrainingModeOperationsImpl(readPhysicalNodeId, persistence, logger)
@Table(name = "${NODE_DATABASE_PREFIX}properties")
class DBNodeProperty(
@Column(name = "key")
val key: String = "",
@Column(name = "value")
var value: String? = ""
private class FlowsDrainingModeOperationsImpl(readPhysicalNodeId: () -> String, private val persistence: CordaPersistence, logger: Logger) : FlowsDrainingModeOperations {
private val nodeSpecificFlowsExecutionModeKey = "${readPhysicalNodeId()}_flowsExecutionMode"
init {
logger.debug { "Node's flow execution mode property key: $nodeSpecificFlowsExecutionModeKey" }
private val map = PersistentMap({ key -> key }, { entity -> entity.key to entity.value!! }, NodePropertiesPersistentStore::DBNodeProperty, NodePropertiesPersistentStore.DBNodeProperty::class.java)
override val values = PublishSubject.create<Pair<Boolean, Boolean>>()!!
override fun setEnabled(enabled: Boolean) {
var oldValue: Boolean? = null
persistence.transaction {
oldValue = map.put(nodeSpecificFlowsExecutionModeKey, enabled.toString())?.toBoolean() ?: false
values.onNext(oldValue!! to enabled)
override fun isEnabled(): Boolean {
return persistence.transaction {
map[nodeSpecificFlowsExecutionModeKey]?.toBoolean() ?: false
@ -3,7 +3,7 @@ package net.corda.node.services.schema
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.FungibleAsset
import net.corda.core.contracts.FungibleAsset
import net.corda.core.contracts.LinearState
import net.corda.core.contracts.LinearState
import net.corda.core.internal.schemas.NodeInfoSchemaV1
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.core.schemas.CommonSchemaV1
import net.corda.core.schemas.CommonSchemaV1
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.PersistentState
@ -14,7 +14,6 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.context.InvocationContext
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.newSecureRandom
import net.corda.core.crypto.newSecureRandom
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogic
@ -37,6 +36,7 @@ import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.P2PMessagingHeaders
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.newNamedSingleThreadExecutor
import net.corda.node.utilities.newNamedSingleThreadExecutor
@ -640,11 +640,18 @@ class StateMachineManagerImpl(
serviceHub.networkService.apply {
serviceHub.networkService.apply {
send(createMessage(sessionTopic, serialized.bytes), address, retryId = retryId)
send(createMessage(sessionTopic, serialized.bytes), address, retryId = retryId, additionalHeaders = message.additionalHeaders())
private fun SessionMessage.additionalHeaders(): Map<String, String> {
return when (this) {
is InitialSessionMessage -> mapOf(P2PMessagingHeaders.Type.KEY to P2PMessagingHeaders.Type.SESSION_INIT_VALUE)
else -> emptyMap()
class SessionRejectException(val rejectMessage: String, val logMessage: String) : CordaException(rejectMessage) {
class SessionRejectException(val rejectMessage: String, val logMessage: String) : CordaException(rejectMessage) {
constructor(message: String) : this(message, message)
constructor(message: String) : this(message, message)
@ -0,0 +1,148 @@
package net.corda.node.modes.draining
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.*
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.SchedulableFlow
import net.corda.core.identity.Party
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.StartedNode
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.MockNodeParameters
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.time.Instant
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.reflect.jvm.jvmName
import kotlin.test.fail
class ScheduledFlowsDrainingModeTest {
private lateinit var mockNet: InternalMockNetwork
private lateinit var aliceNode: StartedNode<InternalMockNetwork.MockNode>
private lateinit var bobNode: StartedNode<InternalMockNetwork.MockNode>
private lateinit var notary: Party
private lateinit var alice: Party
private lateinit var bob: Party
private var executor: ScheduledExecutorService? = null
companion object {
private val logger = loggerFor<ScheduledFlowsDrainingModeTest>()
fun setup() {
mockNet = InternalMockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.testing.contracts"))
aliceNode = mockNet.createNode(MockNodeParameters(legalName = ALICE_NAME))
bobNode = mockNet.createNode(MockNodeParameters(legalName = BOB_NAME))
notary = mockNet.defaultNotaryIdentity
alice = aliceNode.info.singleIdentity()
bob = bobNode.info.singleIdentity()
executor = Executors.newSingleThreadScheduledExecutor()
fun cleanUp() {
fun `flows draining mode ignores scheduled flows until unset`() {
val latch = CountDownLatch(1)
var shouldFail = true
val scheduledStates = aliceNode.services
.filter { update -> update.containsType<ScheduledState>() }
.map { update -> update.produced.single().state.data as ScheduledState }
scheduledStates.filter { state -> !state.processed }.doOnNext { _ ->
// this is needed because there is a delay between the moment a SchedulableState gets in the Vault and the first time nextScheduledActivity is called
logger.info("Disabling flows draining mode")
shouldFail = false
}, 5, TimeUnit.SECONDS)
scheduledStates.filter { state -> state.processed }.doOnNext { _ ->
if (shouldFail) {
fail("Should not have happened before draining is switched off.")
val flow = aliceNode.services.startFlow(InsertInitialStateFlow(bob, notary))
data class ScheduledState(private val creationTime: Instant, val source: Party, val destination: Party, val processed: Boolean = false, override val linearId: UniqueIdentifier = UniqueIdentifier()) : SchedulableState, LinearState {
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
return if (!processed) {
val logicRef = flowLogicRefFactory.create(ScheduledFlow::class.jvmName, thisStateRef)
ScheduledActivity(logicRef, creationTime)
} else {
override val participants: List<Party> get() = listOf(source, destination)
class InsertInitialStateFlow(private val destination: Party, private val notary: Party) : FlowLogic<Unit>() {
override fun call() {
val scheduledState = ScheduledState(serviceHub.clock.instant(), ourIdentity, destination)
val builder = TransactionBuilder(notary).addOutputState(scheduledState, DummyContract.PROGRAM_ID).addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
class ScheduledFlow(private val stateRef: StateRef) : FlowLogic<Unit>() {
override fun call() {
val state = serviceHub.toStateAndRef<ScheduledState>(stateRef)
val scheduledState = state.state.data
// Only run flow over states originating on this node
if (!serviceHub.myInfo.isLegalIdentity(scheduledState.source)) {
require(!scheduledState.processed) { "State should not have been previously processed" }
val notary = state.state.notary
val newStateOutput = scheduledState.copy(processed = true)
val builder = TransactionBuilder(notary).addInputState(state).addOutputState(newStateOutput, DummyContract.PROGRAM_ID).addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx, setOf(scheduledState.destination)))
@ -6,8 +6,9 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.node.internal.security.Password
import net.corda.node.internal.security.Password
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.internal.security.tryAuthenticate
import net.corda.node.internal.security.tryAuthenticate
import net.corda.nodeapi.internal.config.User
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.config.SecurityConfiguration
import net.corda.node.services.config.SecurityConfiguration
import net.corda.nodeapi.internal.config.User
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import org.junit.Test
import javax.security.auth.login.FailedLoginException
import javax.security.auth.login.FailedLoginException
@ -30,8 +31,8 @@ class RPCSecurityManagerTest {
permitted = setOf(arrayListOf("nodeInfo"), arrayListOf("notaryIdentities")),
permitted = setOf(arrayListOf("nodeInfo"), arrayListOf("notaryIdentities")),
permissions = setOf(
permissions = setOf(
@ -46,7 +47,7 @@ class RPCSecurityManagerTest {
fun `Check startFlow RPC permission implies startFlowDynamic`() {
fun `Check startFlow RPC permission implies startFlowDynamic`() {
permissions = setOf(Permissions.invokeRpc("startFlow")),
permissions = setOf(invokeRpc("startFlow")),
permitted = setOf(arrayListOf("startFlow"), arrayListOf("startFlowDynamic")))
permitted = setOf(arrayListOf("startFlow"), arrayListOf("startFlowDynamic")))
@ -54,7 +55,7 @@ class RPCSecurityManagerTest {
fun `Check startTrackedFlow RPC permission implies startTrackedFlowDynamic`() {
fun `Check startTrackedFlow RPC permission implies startTrackedFlowDynamic`() {
permitted = setOf(arrayListOf("startTrackedFlow"), arrayListOf("startTrackedFlowDynamic")),
permitted = setOf(arrayListOf("startTrackedFlow"), arrayListOf("startTrackedFlowDynamic")),
permissions = setOf(Permissions.invokeRpc("startTrackedFlow")))
permissions = setOf(invokeRpc("startTrackedFlow")))
@ -64,6 +65,18 @@ class RPCSecurityManagerTest {
permitted = allActions.map { arrayListOf(it) }.toSet())
permitted = allActions.map { arrayListOf(it) }.toSet())
fun `flows draining mode permissions`() {
permitted = setOf(arrayListOf("setFlowsDrainingModeEnabled")),
permissions = setOf(invokeRpc(CordaRPCOps::setFlowsDrainingModeEnabled))
permitted = setOf(arrayListOf("isFlowsDrainingModeEnabled")),
permissions = setOf(invokeRpc(CordaRPCOps::isFlowsDrainingModeEnabled))
fun `Malformed permission strings`() {
fun `Malformed permission strings`() {
@ -131,11 +144,11 @@ class RPCSecurityManagerTest {
val call = request.first()
val call = request.first()
val args = request.drop(1).toTypedArray()
val args = request.drop(1).toTypedArray()
assert(subject.isPermitted(request.first(), *args)) {
assert(subject.isPermitted(request.first(), *args)) {
"User ${subject.principal} should be permitted ${call} with target '${request.toList()}'"
"User ${subject.principal} should be permitted $call with target '${request.toList()}'"
if (args.isEmpty()) {
if (args.isEmpty()) {
assert(subject.isPermitted(request.first(), "XXX")) {
assert(subject.isPermitted(request.first(), "XXX")) {
"User ${subject.principal} should be permitted ${call} with any target"
"User ${subject.principal} should be permitted $call with any target"
@ -12,6 +12,7 @@ import net.corda.core.internal.uncheckedCast
import net.corda.core.node.StateLoader
import net.corda.core.node.StateLoader
import net.corda.core.utilities.days
import net.corda.core.utilities.days
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.NodePropertiesStore
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.internal.doLookup
import net.corda.testing.internal.doLookup
@ -23,6 +24,7 @@ import org.junit.rules.TestWatcher
import org.junit.runner.Description
import org.junit.runner.Description
import org.slf4j.Logger
import org.slf4j.Logger
import java.time.Clock
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.Instant
class NodeSchedulerServiceTest {
class NodeSchedulerServiceTest {
@ -39,6 +41,12 @@ class NodeSchedulerServiceTest {
private val flowStarter = rigorousMock<FlowStarter>().also {
private val flowStarter = rigorousMock<FlowStarter>().also {
doReturn(openFuture<FlowStateMachine<*>>()).whenever(it).startFlow(any<FlowLogic<*>>(), any())
doReturn(openFuture<FlowStateMachine<*>>()).whenever(it).startFlow(any<FlowLogic<*>>(), any())
private val flowsDraingMode = rigorousMock<NodePropertiesStore.FlowsDrainingModeOperations>().also {
private val nodeProperties = rigorousMock<NodePropertiesStore>().also {
private val transactionStates = mutableMapOf<StateRef, TransactionState<*>>()
private val transactionStates = mutableMapOf<StateRef, TransactionState<*>>()
private val stateLoader = rigorousMock<StateLoader>().also {
private val stateLoader = rigorousMock<StateLoader>().also {
@ -58,6 +66,8 @@ class NodeSchedulerServiceTest {
serverThread = MoreExecutors.directExecutor(),
serverThread = MoreExecutors.directExecutor(),
flowLogicRefFactory = flowLogicRefFactory,
flowLogicRefFactory = flowLogicRefFactory,
nodeProperties = nodeProperties,
drainingModePollPeriod = Duration.ofSeconds(5),
log = log,
log = log,
scheduledStates = mutableMapOf()).apply { start() }
scheduledStates = mutableMapOf()).apply { start() }
@ -173,4 +183,4 @@ class NodeSchedulerServiceTest {
scheduler.unscheduleStateActivity(schedule(mark + 1.days).stateRef)
scheduler.unscheduleStateActivity(schedule(mark + 1.days).stateRef)
@ -26,6 +26,7 @@ import org.junit.Before
import org.junit.Rule
import org.junit.Rule
import org.junit.Test
import org.junit.Test
import org.junit.rules.TemporaryFolder
import org.junit.rules.TemporaryFolder
import rx.subjects.PublishSubject
import java.net.ServerSocket
import java.net.ServerSocket
import java.util.concurrent.BlockingQueue
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.LinkedBlockingQueue
@ -150,10 +151,11 @@ class ArtemisMessagingTest {
val messagingClient = createMessagingClient(platformVersion = platformVersion)
val messagingClient = createMessagingClient(platformVersion = platformVersion)
messagingClient.addMessageHandler(TOPIC) { message, _ ->
messagingClient.addMessageHandler(TOPIC) { message, _ ->
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
thread(isDaemon = true) { messagingClient.run() }
thread(isDaemon = true) { messagingClient.run() }
@ -171,7 +173,9 @@ class ArtemisMessagingTest {
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
maxMessageSize = maxMessageSize).apply {
maxMessageSize = maxMessageSize,
isDrainingModeOn = { false },
drainingModeWasChangedEvents = PublishSubject.create<Pair<Boolean, Boolean>>()).apply {
messagingClient = this
messagingClient = this
@ -184,4 +188,4 @@ class ArtemisMessagingTest {
messagingServer = this
messagingServer = this
@ -330,7 +330,7 @@ class InMemoryMessagingNetwork internal constructor(
state.locked { check(handlers.remove(registration as Handler)) }
state.locked { check(handlers.remove(registration as Handler)) }
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) {
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any, additionalHeaders: Map<String, String>) {
msgSend(this, message, target)
msgSend(this, message, target)
if (!sendManuallyPumped) {
if (!sendManuallyPumped) {
@ -159,7 +159,7 @@ class DriverDSLImpl(
config.corda.rpcUsers[0].run { client.start(username, password) }
config.corda.rpcUsers[0].run { client.start(username, password) }
} catch (e: Exception) {
} catch (e: Exception) {
if (processDeathFuture.isDone) throw e
if (processDeathFuture.isDone) throw e
log.error("Exception while connecting to RPC, retrying to connect at $rpcAddress", e)
log.info("Exception while connecting to RPC, retrying to connect at $rpcAddress", e)
@ -33,6 +33,7 @@ import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.internal.StartedNode
import net.corda.node.internal.StartedNode
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.*
import net.corda.node.services.config.*
import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.node.services.keys.E2ETestKeyManagementService
@ -265,7 +266,7 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
// We only need to override the messaging service here, as currently everything that hits disk does so
// We only need to override the messaging service here, as currently everything that hits disk does so
// through the java.nio API which we are already mocking via Jimfs.
// through the java.nio API which we are already mocking via Jimfs.
override fun makeMessagingService(database: CordaPersistence, info: NodeInfo): MessagingService {
override fun makeMessagingService(database: CordaPersistence, info: NodeInfo, nodeProperties: NodePropertiesStore): MessagingService {
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
return mockNet.messagingNetwork.createNodeWithID(
return mockNet.messagingNetwork.createNodeWithID(
Reference in New Issue
Block a user