Merge commit 'da591363fdccd220455a92f083d2ad59ed0e2d12' into aslemmer-merge-19-Feb

This commit is contained in:
Andras Slemmer 2018-02-20 13:52:03 +00:00
commit 38d0ad30df
33 changed files with 772 additions and 39 deletions

View File

@ -1586,6 +1586,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec
@org.jetbrains.annotations.NotNull public abstract Iterable getVaultTransactionNotes(net.corda.core.crypto.SecureHash)
@kotlin.Deprecated @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed internalVerifiedTransactionsFeed()
@kotlin.Deprecated @org.jetbrains.annotations.NotNull public abstract List internalVerifiedTransactionsSnapshot()
public abstract boolean isFlowsDrainingModeEnabled()
public abstract boolean killFlow(net.corda.core.flows.StateMachineRunId)
@net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed networkMapFeed()
@org.jetbrains.annotations.NotNull public abstract List networkMapSnapshot()
@ -1599,6 +1600,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec
@org.jetbrains.annotations.Nullable public abstract net.corda.core.identity.Party partyFromKey(java.security.PublicKey)
@org.jetbrains.annotations.NotNull public abstract List queryAttachments(net.corda.core.node.services.vault.AttachmentQueryCriteria, net.corda.core.node.services.vault.AttachmentSort)
@org.jetbrains.annotations.NotNull public abstract List registeredFlows()
public abstract void setFlowsDrainingModeEnabled(boolean)
@net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed stateMachineRecordedTransactionMappingFeed()
@org.jetbrains.annotations.NotNull public abstract List stateMachineRecordedTransactionMappingSnapshot()
@net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed stateMachinesFeed()

View File

@ -0,0 +1,109 @@
package net.corda.client.rpc
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.internal.packageName
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.getOrThrow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.testing.core.*
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Assume.assumeFalse
import org.junit.Before
import org.junit.Test
class FlowsExecutionModeRpcTest {
@Test
fun `persistent state survives node restart`() {
// Temporary disable this test when executed on Windows. It is known to be sporadically failing.
// More investigation is needed to establish why.
assumeFalse(System.getProperty("os.name").toLowerCase().startsWith("win"))
val user = User("mark", "dadada", setOf(invokeRpc("setFlowsDrainingModeEnabled"), invokeRpc("isFlowsDrainingModeEnabled")))
driver(isDebug = true, startNodesInProcess = true) {
val nodeName = {
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
val nodeName = nodeHandle.nodeInfo.chooseIdentity().name
nodeHandle.rpc.setFlowsDrainingModeEnabled(true)
nodeHandle.stop()
nodeName
}()
val nodeHandle = startNode(providedName = nodeName, rpcUsers = listOf(user)).getOrThrow()
assertThat(nodeHandle.rpc.isFlowsDrainingModeEnabled()).isEqualTo(true)
nodeHandle.stop()
}
}
}
class FlowsExecutionModeTests : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) {
private val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
private lateinit var node: StartedNode<Node>
private lateinit var client: CordaRPCClient
@Before
fun setup() {
node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!)
}
@Test
fun `flows draining mode can be enabled and queried`() {
asALoggerUser { rpcOps ->
val newValue = true
rpcOps.setFlowsDrainingModeEnabled(true)
val flowsExecutionMode = rpcOps.isFlowsDrainingModeEnabled()
assertThat(flowsExecutionMode).isEqualTo(newValue)
}
}
@Test
fun `flows draining mode can be disabled and queried`() {
asALoggerUser { rpcOps ->
rpcOps.setFlowsDrainingModeEnabled(true)
val newValue = false
rpcOps.setFlowsDrainingModeEnabled(newValue)
val flowsExecutionMode = rpcOps.isFlowsDrainingModeEnabled()
assertThat(flowsExecutionMode).isEqualTo(newValue)
}
}
@Test
fun `node starts with flows draining mode disabled`() {
asALoggerUser { rpcOps ->
val defaultStartingMode = rpcOps.isFlowsDrainingModeEnabled()
assertThat(defaultStartingMode).isEqualTo(false)
}
}
private fun login(username: String, password: String, externalTrace: Trace? = null, impersonatedActor: Actor? = null): CordaRPCConnection {
return client.start(username, password, externalTrace, impersonatedActor)
}
private fun asALoggerUser(action: (CordaRPCOps) -> Unit) {
login(rpcUser.username, rpcUser.password).use {
action(it.proxy)
}
}
}

View File

@ -1,10 +1,7 @@
package net.corda.core.messaging
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.Actor
import net.corda.core.context.AuthServiceId
import net.corda.core.context.InvocationContext
import net.corda.core.context.InvocationOrigin
import net.corda.core.contracts.ContractState
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInitiator
@ -360,6 +357,21 @@ interface CordaRPCOps : RPCOps {
/** Clear all network map data from local node cache. */
fun clearNetworkMapCache()
/** Sets the value of the node's flows draining mode.
* If this mode is [enabled], the node will reject new flows through RPC, ignore scheduled flows, and do not process
* initial session messages, meaning that P2P counter-parties will not be able to initiate new flows involving the node.
*
* @param enabled whether the flows draining mode will be enabled.
* */
fun setFlowsDrainingModeEnabled(enabled: Boolean)
/**
* Returns whether the flows draining mode is enabled.
*
* @see setFlowsDrainingModeEnabled
*/
fun isFlowsDrainingModeEnabled(): Boolean
}
inline fun <reified T : ContractState> CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(),

View File

@ -5,6 +5,7 @@ import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.CordaSerializable
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Observable
import java.time.Duration
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future

View File

@ -6,6 +6,11 @@ from previous releases. Please refer to :doc:`upgrade-notes` for detailed instru
UNRELEASED
----------
* Introduced Flow Draining mode, in which a node continues executing existing flows, but does not start new. This is to support graceful node shutdown/restarts.
In particular, when this mode is on, new flows through RPC will be rejected, scheduled flows will be ignored, and initial session messages will not be consumed.
This will ensure that the number of checkpoints will strictly diminish with time, allowing for a clean shutdown.
* Make the serialisation finger-printer a pluggable entity rather than hard wiring into the factory
* Removed blacklisted word checks in Corda X.500 name to allow "Server" or "Node" to be use as part of the legal name.

View File

@ -79,10 +79,10 @@ class TutorialMockNetwork {
// DOCSTART 1
// modify message if it's 1
nodeB.setMessagingServiceSpy(object : MessagingServiceSpy(nodeB.network) {
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) {
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any, additionalHeaders: Map<String, String>) {
val messageData = message.data.deserialize<Any>() as? ExistingSessionMessage
val payload = messageData?.payload
if (payload is DataSessionMessage && payload.payload.deserialize() == 1) {
val alteredMessageData = messageData.copy(payload = payload.copy(99.serialize())).serialize().bytes
messagingService.send(InMemoryMessagingNetwork.InMemoryMessage(message.topic, OpaqueBytes(alteredMessageData), message.uniqueMessageId), target, retryId)

View File

@ -76,4 +76,18 @@ The node also has several CorDapps installed by default to handle common tasks s
* Retrieving transactions and attachments from counterparties
* Upgrading contracts
* Broadcasting agreed ledger updates for recording by counterparties
* Broadcasting agreed ledger updates for recording by counterparties
Draining mode
^^^^^^^^^^^^^
In order to operate a clean shutdown of a node, it is important than no flows are in-flight, meaning no checkpoints should
be persisted. The node is able to be put in a Flows Draining Mode, during which:
* Commands requiring to start new flows through RPC will be rejected.
* Scheduled flows due will be ignored.
* Initial P2P session messages will not be processed, meaning peers will not be able to initiate new flows involving the node.
* All other activities will proceed as usual, ensuring that the number of in-flight flows will strictly diminish.
As their number - which can be monitored through RPC - reaches zero, it is safe to shut the node down.
This property is durable, meaning that restarting the node will not reset it to its default value and that a RPC command is required.

View File

@ -10,7 +10,7 @@ See [here](../../docs/source/corda-api.rst) for Corda's public API strategy. We
apply this plugin to other modules in future Corda releases as those modules' APIs stabilise.
Basically, this plugin will document a module's `public` and `protected` classes/methods/fields,
excluding those from our `*.internal.*` packgages, any synthetic methods, bridge methods, or methods
excluding those from our `*.internal.*` packages, any synthetic methods, bridge methods, or methods
identified as having Kotlin's `internal` scope. (Kotlin doesn't seem to have implemented `internal`
scope for classes or fields yet as these are currently `public` inside the `.class` file.)

View File

@ -0,0 +1,8 @@
package net.corda.nodeapi.exceptions
import net.corda.core.CordaRuntimeException
/**
* Thrown to indicate that the command was rejected by the node, typically due to a special temporary mode.
*/
class RejectedCommandException(msg: String) : CordaRuntimeException(msg)

View File

@ -0,0 +1,101 @@
package net.corda.node.modes.draining
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.testing.core.chooseIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.test.fail
class P2PFlowsDrainingModeTest {
private val portAllocation = PortAllocation.Incremental(10000)
private val user = User("mark", "dadada", setOf(Permissions.all()))
private val users = listOf(user)
private var executor: ExecutorService? = null
companion object {
private val logger = loggerFor<P2PFlowsDrainingModeTest>()
}
@Before
fun setup() {
executor = Executors.newSingleThreadExecutor()
}
@After
fun cleanUp() {
executor!!.shutdown()
}
@Test
fun `flows draining mode suspends consumption of initial session messages`() {
driver(DriverParameters(isDebug = true, startNodesInProcess = false, portAllocation = portAllocation)) {
val initiatedNode = startNode().getOrThrow()
val initiating = startNode(rpcUsers = users).getOrThrow().rpc
val counterParty = initiatedNode.nodeInfo.chooseIdentity()
val initiated = initiatedNode.rpc
initiated.setFlowsDrainingModeEnabled(true)
var shouldFail = true
initiating.apply {
val flow = startFlow(::InitiateSessionFlow, counterParty)
// this should be really fast, for the flow has already started, so 5 seconds should never be a problem
executor!!.submit({
logger.info("Now disabling flows draining mode for $counterParty.")
shouldFail = false
initiated.setFlowsDrainingModeEnabled(false)
})
flow.returnValue.map { result ->
if (shouldFail) {
fail("Shouldn't happen until flows draining mode is switched off.")
} else {
assertThat(result).isEqualTo("Hi there answer")
}
}.getOrThrow()
}
}
}
@StartableByRPC
@InitiatingFlow
class InitiateSessionFlow(private val counterParty: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
val session = initiateFlow(counterParty)
session.send("Hi there")
return session.receive<String>().unwrap { it }
}
}
@InitiatedBy(InitiateSessionFlow::class)
class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val message = initiatingSession.receive<String>().unwrap { it }
initiatingSession.send("$message answer")
}
}
}

View File

@ -0,0 +1,50 @@
package net.corda.node.modes.draining
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.Permissions
import net.corda.nodeapi.exceptions.RejectedCommandException
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.catchThrowable
import org.junit.Test
class RpcFlowsDrainingModeTest {
private val portAllocation = PortAllocation.Incremental(10000)
private val user = User("mark", "dadada", setOf(Permissions.all()))
private val users = listOf(user)
@Test
fun `flows draining mode rejects start flows commands through rpc`() {
driver(DriverParameters(isDebug = true, startNodesInProcess = false, portAllocation = portAllocation)) {
startNode(rpcUsers = users).getOrThrow().rpc.apply {
setFlowsDrainingModeEnabled(true)
val error: Throwable? = catchThrowable { startFlow(RpcFlowsDrainingModeTest::NoOpFlow) }
assertThat(error).isNotNull()
assertThat(error!!).isInstanceOf(RejectedCommandException::class.java)
}
}
}
@StartableByRPC
class NoOpFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
println("NO OP!")
}
}
}

View File

@ -202,7 +202,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair)
identityService.loadIdentities(info.legalIdentitiesAndCerts)
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, info, identityService, networkMapCache)
val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, info, identityService, networkMapCache, nodeProperties)
val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration
if (mutualExclusionConfiguration.on) {
RunOnceService(database, mutualExclusionConfiguration.machineName,
@ -219,8 +220,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
flowStarter,
transactionStorage,
unfinishedSchedules = busyNodeLatch,
flowLogicRefFactory = flowLogicRefFactory
)
flowLogicRefFactory = flowLogicRefFactory,
drainingModePollPeriod = configuration.drainingModePollPeriod,
nodeProperties = nodeProperties)
makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory)
val rpcOps = makeRPCOps(flowStarter, database, smm)
startMessagingService(rpcOps)
@ -523,7 +525,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
* Builds node internal, advertised, and plugin services.
* Returns a list of tokenizable services to be added to the serialisation context.
*/
private fun makeServices(keyPairs: Set<KeyPair>, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, database: CordaPersistence, info: NodeInfo, identityService: IdentityService, networkMapCache: NetworkMapCacheInternal): MutableList<Any> {
private fun makeServices(keyPairs: Set<KeyPair>,
schemaService: SchemaService,
transactionStorage: WritableTransactionStorage,
database: CordaPersistence,
info: NodeInfo,
identityService: IdentityServiceInternal,
networkMapCache: NetworkMapCacheInternal,
nodeProperties: NodePropertiesStore): MutableList<Any> {
checkpointStorage = DBCheckpointStorage()
val metrics = MetricRegistry()
attachments = NodeAttachmentService(metrics, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound)
@ -538,8 +547,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
cordappProvider,
database,
info,
networkMapCache)
network = makeMessagingService(database, info)
networkMapCache,
nodeProperties)
network = makeMessagingService(database, info, nodeProperties)
val tokenizableServices = mutableListOf(attachments, network, services.vaultService,
services.keyManagementService, services.identityService, platformClock,
services.auditService, services.monitoringService, services.networkMapCache, services.schemaService,
@ -683,7 +693,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
_started = null
}
protected abstract fun makeMessagingService(database: CordaPersistence, info: NodeInfo): MessagingService
protected abstract fun makeMessagingService(database: CordaPersistence, info: NodeInfo, nodeProperties: NodePropertiesStore): MessagingService
protected abstract fun startMessagingService(rpcOps: RPCOps)
private fun obtainIdentity(notaryConfig: NotaryConfig?): Pair<PartyAndCertificate, KeyPair> {
@ -754,7 +764,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
override val cordappProvider: CordappProviderInternal,
override val database: CordaPersistence,
override val myInfo: NodeInfo,
override val networkMapCache: NetworkMapCacheInternal
override val networkMapCache: NetworkMapCacheInternal,
override val nodeProperties: NodePropertiesStore
) : SingletonSerializeAsToken(), ServiceHubInternal, StateLoader by validatedTransactions {
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage()

View File

@ -29,6 +29,7 @@ import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.context
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.exceptions.RejectedCommandException
import net.corda.nodeapi.internal.persistence.CordaPersistence
import rx.Observable
import java.io.InputStream
@ -168,6 +169,9 @@ internal class CordaRPCOpsImpl(
private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, args: Array<out Any?>): FlowStateMachine<T> {
require(logicType.isAnnotationPresent(StartableByRPC::class.java)) { "${logicType.name} was not designed for RPC" }
if (isFlowsDrainingModeEnabled()) {
throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.")
}
return flowStarter.invokeFlowAsync(logicType, context(), *args).getOrThrow()
}
@ -287,6 +291,14 @@ internal class CordaRPCOpsImpl(
return vaultTrackBy(criteria, PageSpecification(), sorting, contractStateType)
}
override fun setFlowsDrainingModeEnabled(enabled: Boolean) {
services.nodeProperties.flowsDrainingMode.setEnabled(enabled)
}
override fun isFlowsDrainingModeEnabled(): Boolean {
return services.nodeProperties.flowsDrainingMode.isEnabled()
}
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)
}
@ -305,7 +317,7 @@ internal class CordaRPCOpsImpl(
is InvocationOrigin.RPC -> FlowInitiator.RPC(principal)
is InvocationOrigin.Peer -> services.identityService.wellKnownPartyFromX500Name((origin as InvocationOrigin.Peer).party)?.let { FlowInitiator.Peer(it) } ?: throw IllegalStateException("Unknown peer with name ${(origin as InvocationOrigin.Peer).party}.")
is InvocationOrigin.Service -> FlowInitiator.Service(principal)
is InvocationOrigin.Shell -> FlowInitiator.Shell
InvocationOrigin.Shell -> FlowInitiator.Shell
is InvocationOrigin.Scheduled -> FlowInitiator.Scheduled((origin as InvocationOrigin.Scheduled).scheduledState)
}
}

View File

@ -2,12 +2,23 @@ package net.corda.node.internal
interface LifecycleSupport : Startable, Stoppable
interface Stoppable {
interface Stoppable : AutoCloseable {
fun stop()
override fun close() = stop()
}
interface Startable {
fun start()
val started: Boolean
}
interface Connectable {
val connected: Boolean
fun connect()
fun disconnect()
}

View File

@ -22,6 +22,7 @@ import net.corda.node.internal.artemis.BrokerAddresses
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.serialization.KryoServerSerializationScheme
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.SecurityConfiguration
@ -145,7 +146,7 @@ open class Node(configuration: NodeConfiguration,
private var shutdownHook: ShutdownHook? = null
override fun makeMessagingService(database: CordaPersistence, info: NodeInfo): MessagingService {
override fun makeMessagingService(database: CordaPersistence, info: NodeInfo, nodeProperties: NodePropertiesStore): MessagingService {
// Construct security manager reading users data either from the 'security' config section
// if present or from rpcUsers list if the former is missing from config.
val securityManagerConfig = configuration.security?.authService ?:

View File

@ -0,0 +1,13 @@
package net.corda.node.internal
interface NodeUniqueIdProvider {
val value: String
}
// this is stubbed because we still do not support clustered node setups.
// the moment we will, this will have to be changed to return a value unique for each physical node.
internal object StubbedNodeUniqueIdProvider : NodeUniqueIdProvider {
// TODO implement to return a value unique for each physical node when we will support clustered node setups.
override val value: String = "NABOB"
}

View File

@ -166,6 +166,12 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val
implementation.vaultTrackByWithSorting(contractStateType, criteria, sorting)
}
override fun setFlowsDrainingModeEnabled(enabled: Boolean) = guard("setFlowsDrainingModeEnabled") {
implementation.setFlowsDrainingModeEnabled(enabled)
}
override fun isFlowsDrainingModeEnabled(): Boolean = guard("isFlowsDrainingModeEnabled", implementation::isFlowsDrainingModeEnabled)
// TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140
private inline fun <RESULT> guard(methodName: String, action: () -> RESULT) = guard(methodName, emptyList(), action)

View File

@ -0,0 +1,94 @@
package net.corda.node.internal.artemis
import net.corda.node.internal.Connectable
import net.corda.node.internal.LifecycleSupport
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession
import rx.Observable
import rx.subjects.PublishSubject
interface ReactiveArtemisConsumer : LifecycleSupport, Connectable {
val messages: Observable<ClientMessage>
companion object {
fun multiplex(createSession: () -> ClientSession, queueName: String, filter: String? = null, vararg queueNames: String): ReactiveArtemisConsumer {
return MultiplexingReactiveArtemisConsumer(setOf(queueName, *queueNames), createSession, filter)
}
fun multiplex(queueNames: Set<String>, createSession: () -> ClientSession, filter: String? = null): ReactiveArtemisConsumer {
return MultiplexingReactiveArtemisConsumer(queueNames, createSession, filter)
}
}
}
private class MultiplexingReactiveArtemisConsumer(private val queueNames: Set<String>, private val createSession: () -> ClientSession, private val filter: String?) : ReactiveArtemisConsumer {
private var startedFlag = false
override var connected = false
override val messages: PublishSubject<ClientMessage> = PublishSubject.create<ClientMessage>()
private val consumers = mutableSetOf<ClientConsumer>()
private val sessions = mutableSetOf<ClientSession>()
override fun start() {
synchronized(this) {
require(!startedFlag)
connect()
startedFlag = true
}
}
override fun stop() {
synchronized(this) {
if(startedFlag) {
disconnect()
startedFlag = false
}
messages.onCompleted()
}
}
override fun connect() {
synchronized(this) {
require(!connected)
queueNames.forEach { queue ->
createSession().apply {
start()
consumers += filter?.let { createConsumer(queue, it) } ?: createConsumer(queue)
sessions += this
}
}
consumers.forEach { consumer ->
consumer.setMessageHandler { message ->
messages.onNext(message)
}
}
connected = true
}
}
override fun disconnect() {
synchronized(this) {
if(connected) {
consumers.forEach(ClientConsumer::close)
sessions.forEach(ClientSession::close)
consumers.clear()
sessions.clear()
connected = false
}
}
}
override val started: Boolean
get() = startedFlag
}

View File

@ -1,4 +1,4 @@
package net.corda.core.internal.schemas
package net.corda.node.internal.schemas
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.PartyAndCertificate
@ -9,7 +9,7 @@ import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.core.utilities.NetworkHostAndPort
import java.io.Serializable
import net.corda.node.services.persistence.NodePropertiesPersistentStore
import javax.persistence.*
object NodeInfoSchema
@ -17,7 +17,7 @@ object NodeInfoSchema
object NodeInfoSchemaV1 : MappedSchema(
schemaFamily = NodeInfoSchema.javaClass,
version = 1,
mappedTypes = listOf(PersistentNodeInfo::class.java, DBPartyAndCertificate::class.java, DBHostAndPort::class.java)
mappedTypes = listOf(PersistentNodeInfo::class.java, DBPartyAndCertificate::class.java, DBHostAndPort::class.java, NodePropertiesPersistentStore.DBNodeProperty::class.java)
) {
override val migrationResource = "node-info.changelog-master"
@ -36,7 +36,7 @@ object NodeInfoSchemaV1 : MappedSchema(
@Column(name = "addresses")
@OneToMany(cascade = arrayOf(CascadeType.ALL), orphanRemoval = true)
@JoinColumn(name = "node_info_id", foreignKey = ForeignKey(name = "FK__info_hosts__infos"))
val addresses: List<NodeInfoSchemaV1.DBHostAndPort>,
val addresses: List<DBHostAndPort>,
@Column(name = "legal_identities_certs")
@ManyToMany(cascade = arrayOf(CascadeType.ALL))

View File

@ -180,7 +180,7 @@ private object RPCPermissionResolver : PermissionResolver {
// Leaving empty set of targets and actions to match everything
return RPCPermission()
}
else -> throw IllegalArgumentException("Unkwnow permission action specifier: $action")
else -> throw IllegalArgumentException("Unknown permission action specifier: $action")
}
}
}

View File

@ -0,0 +1,17 @@
package net.corda.node.services.api
import rx.Observable
interface NodePropertiesStore {
val flowsDrainingMode: FlowsDrainingModeOperations
interface FlowsDrainingModeOperations {
fun setEnabled(enabled: Boolean)
fun isEnabled(): Boolean
val values: Observable<Pair<Boolean, Boolean>>
}
}

View File

@ -65,6 +65,7 @@ interface ServiceHubInternal : ServiceHub {
val networkService: MessagingService
val database: CordaPersistence
val configuration: NodeConfiguration
val nodeProperties: NodePropertiesStore
val networkMapUpdater: NetworkMapUpdater
override val cordappProvider: CordappProviderInternal
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {

View File

@ -17,6 +17,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence.DataSourceConfigT
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import java.net.URL
import java.nio.file.Path
import java.time.Duration
import java.util.*
@ -54,6 +55,8 @@ interface NodeConfiguration : NodeSSLConfiguration {
val attachmentCacheBound: Long get() = defaultAttachmentCacheBound
val graphiteOptions: GraphiteOptions? get() = null
// do not change this value without syncing it with ScheduledFlowsDrainingModeTest
val drainingModePollPeriod: Duration get() = Duration.ofSeconds(5)
fun validate(): List<String>

View File

@ -24,12 +24,14 @@ import net.corda.core.utilities.trace
import net.corda.node.internal.CordaClock
import net.corda.node.internal.MutableClock
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchedulerService
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.apache.activemq.artemis.utils.ReusableLatch
import org.slf4j.Logger
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.concurrent.*
@ -59,6 +61,8 @@ class NodeSchedulerService(private val clock: CordaClock,
private val stateLoader: StateLoader,
private val unfinishedSchedules: ReusableLatch = ReusableLatch(),
private val flowLogicRefFactory: FlowLogicRefFactory,
private val nodeProperties: NodePropertiesStore,
private val drainingModePollPeriod: Duration,
private val log: Logger = staticLog,
private val scheduledStates: MutableMap<StateRef, ScheduledStateRef> = createMap())
: SchedulerService, SingletonSerializeAsToken() {
@ -282,10 +286,19 @@ class NodeSchedulerService(private val clock: CordaClock,
scheduledStatesQueue.add(newState)
} else {
val flowLogic = flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
log.trace { "Scheduler starting FlowLogic $flowLogic" }
scheduledFlow = flowLogic
scheduledStates.remove(scheduledState.ref)
scheduledStatesQueue.remove(scheduledState)
scheduledFlow = when {
nodeProperties.flowsDrainingMode.isEnabled() -> {
log.warn("Ignoring scheduled flow start because of draining mode. FlowLogic: $flowLogic.")
awaitWithDeadline(clock, Instant.now() + drainingModePollPeriod)
null
}
else -> {
log.trace { "Scheduler starting FlowLogic $flowLogic" }
scheduledStates.remove(scheduledState.ref)
scheduledStatesQueue.remove(scheduledState)
flowLogic
}
}
}
}
// and schedule the next one
@ -305,4 +318,4 @@ class NodeSchedulerService(private val clock: CordaClock,
null
}
}
}
}

View File

@ -93,6 +93,8 @@ interface MessagingService {
/**
* Returns an initialised [Message] with the current time, etc, already filled in.
*
* @param topicSession identifier for the topic and session the message is sent to.
* @param additionalProperties optional additional message headers.
* @param topic identifier for the topic the message is sent to.
*/
fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId = DeduplicationId.createRandom(newSecureRandom())): Message

View File

@ -10,7 +10,7 @@ import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.NotaryInfo
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.schemas.NodeInfoSchemaV1
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.IdentityService

View File

@ -0,0 +1,67 @@
package net.corda.node.services.persistence
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.NodePropertiesStore.FlowsDrainingModeOperations
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.slf4j.Logger
import rx.subjects.PublishSubject
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Table
/**
* Simple node properties key value store in DB.
*/
class NodePropertiesPersistentStore(readPhysicalNodeId: () -> String, persistence: CordaPersistence) : NodePropertiesStore {
private companion object {
val logger = loggerFor<NodePropertiesStore>()
}
override val flowsDrainingMode: FlowsDrainingModeOperations = FlowsDrainingModeOperationsImpl(readPhysicalNodeId, persistence, logger)
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}properties")
class DBNodeProperty(
@Id
@Column(name = "key")
val key: String = "",
@Column(name = "value")
var value: String? = ""
)
}
private class FlowsDrainingModeOperationsImpl(readPhysicalNodeId: () -> String, private val persistence: CordaPersistence, logger: Logger) : FlowsDrainingModeOperations {
private val nodeSpecificFlowsExecutionModeKey = "${readPhysicalNodeId()}_flowsExecutionMode"
init {
logger.debug { "Node's flow execution mode property key: $nodeSpecificFlowsExecutionModeKey" }
}
private val map = PersistentMap({ key -> key }, { entity -> entity.key to entity.value!! }, NodePropertiesPersistentStore::DBNodeProperty, NodePropertiesPersistentStore.DBNodeProperty::class.java)
override val values = PublishSubject.create<Pair<Boolean, Boolean>>()!!
override fun setEnabled(enabled: Boolean) {
var oldValue: Boolean? = null
persistence.transaction {
oldValue = map.put(nodeSpecificFlowsExecutionModeKey, enabled.toString())?.toBoolean() ?: false
}
values.onNext(oldValue!! to enabled)
}
override fun isEnabled(): Boolean {
return persistence.transaction {
map[nodeSpecificFlowsExecutionModeKey]?.toBoolean() ?: false
}
}
}

View File

@ -3,7 +3,7 @@ package net.corda.node.services.schema
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.FungibleAsset
import net.corda.core.contracts.LinearState
import net.corda.core.internal.schemas.NodeInfoSchemaV1
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.core.schemas.CommonSchemaV1
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState

View File

@ -0,0 +1,153 @@
package net.corda.node.modes.draining
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.LinearState
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledActivity
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.SchedulableFlow
import net.corda.core.identity.Party
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.StartedNode
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNodeParameters
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.time.Instant
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.reflect.jvm.jvmName
import kotlin.test.fail
class ScheduledFlowsDrainingModeTest {
private lateinit var mockNet: InternalMockNetwork
private lateinit var aliceNode: StartedNode<InternalMockNetwork.MockNode>
private lateinit var bobNode: StartedNode<InternalMockNetwork.MockNode>
private lateinit var notary: Party
private lateinit var alice: Party
private lateinit var bob: Party
private var executor: ScheduledExecutorService? = null
companion object {
private val logger = loggerFor<ScheduledFlowsDrainingModeTest>()
}
@Before
fun setup() {
mockNet = InternalMockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.testing.contracts"))
aliceNode = mockNet.createNode(MockNodeParameters(legalName = ALICE_NAME))
bobNode = mockNet.createNode(MockNodeParameters(legalName = BOB_NAME))
notary = mockNet.defaultNotaryIdentity
alice = aliceNode.info.singleIdentity()
bob = bobNode.info.singleIdentity()
executor = Executors.newSingleThreadScheduledExecutor()
}
@After
fun cleanUp() {
mockNet.stopNodes()
executor!!.shutdown()
}
@Test
fun `flows draining mode ignores scheduled flows until unset`() {
val latch = CountDownLatch(1)
var shouldFail = true
aliceNode.services.nodeProperties.flowsDrainingMode.setEnabled(true)
val scheduledStates = aliceNode.services
.vaultService
.updates
.filter { update -> update.containsType<ScheduledState>() }
.map { update -> update.produced.single().state.data as ScheduledState }
scheduledStates.filter { state -> !state.processed }.doOnNext { _ ->
// this is needed because there is a delay between the moment a SchedulableState gets in the Vault and the first time nextScheduledActivity is called
executor!!.schedule({
logger.info("Disabling flows draining mode")
shouldFail = false
aliceNode.services.nodeProperties.flowsDrainingMode.setEnabled(false)
}, 5, TimeUnit.SECONDS)
}.subscribe()
scheduledStates.filter { state -> state.processed }.doOnNext { _ ->
if (shouldFail) {
fail("Should not have happened before draining is switched off.")
}
latch.countDown()
}.subscribe()
val flow = aliceNode.services.startFlow(InsertInitialStateFlow(bob, notary))
flow.resultFuture.getOrThrow()
mockNet.waitQuiescent()
latch.await()
}
data class ScheduledState(private val creationTime: Instant, val source: Party, val destination: Party, val processed: Boolean = false, override val linearId: UniqueIdentifier = UniqueIdentifier()) : SchedulableState, LinearState {
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
return if (!processed) {
val logicRef = flowLogicRefFactory.create(ScheduledFlow::class.jvmName, thisStateRef)
ScheduledActivity(logicRef, creationTime)
} else {
null
}
}
override val participants: List<Party> get() = listOf(source, destination)
}
class InsertInitialStateFlow(private val destination: Party, private val notary: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val scheduledState = ScheduledState(serviceHub.clock.instant(), ourIdentity, destination)
val builder = TransactionBuilder(notary).addOutputState(scheduledState, DummyContract.PROGRAM_ID).addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx))
}
}
@SchedulableFlow
class ScheduledFlow(private val stateRef: StateRef) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val state = serviceHub.toStateAndRef<ScheduledState>(stateRef)
val scheduledState = state.state.data
// Only run flow over states originating on this node
if (!serviceHub.myInfo.isLegalIdentity(scheduledState.source)) {
return
}
require(!scheduledState.processed) { "State should not have been previously processed" }
val notary = state.state.notary
val newStateOutput = scheduledState.copy(processed = true)
val builder = TransactionBuilder(notary).addInputState(state).addOutputState(newStateOutput, DummyContract.PROGRAM_ID).addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx, setOf(scheduledState.destination)))
}
}
}

View File

@ -6,8 +6,9 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.node.internal.security.Password
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.internal.security.tryAuthenticate
import net.corda.nodeapi.internal.config.User
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.config.SecurityConfiguration
import net.corda.nodeapi.internal.config.User
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import javax.security.auth.login.FailedLoginException
@ -30,8 +31,8 @@ class RPCSecurityManagerTest {
checkUserActions(
permitted = setOf(arrayListOf("nodeInfo"), arrayListOf("notaryIdentities")),
permissions = setOf(
Permissions.invokeRpc(CordaRPCOps::nodeInfo),
Permissions.invokeRpc(CordaRPCOps::notaryIdentities)))
invokeRpc(CordaRPCOps::nodeInfo),
invokeRpc(CordaRPCOps::notaryIdentities)))
}
@Test
@ -46,7 +47,7 @@ class RPCSecurityManagerTest {
@Test
fun `Check startFlow RPC permission implies startFlowDynamic`() {
checkUserActions(
permissions = setOf(Permissions.invokeRpc("startFlow")),
permissions = setOf(invokeRpc("startFlow")),
permitted = setOf(arrayListOf("startFlow"), arrayListOf("startFlowDynamic")))
}
@ -54,7 +55,7 @@ class RPCSecurityManagerTest {
fun `Check startTrackedFlow RPC permission implies startTrackedFlowDynamic`() {
checkUserActions(
permitted = setOf(arrayListOf("startTrackedFlow"), arrayListOf("startTrackedFlowDynamic")),
permissions = setOf(Permissions.invokeRpc("startTrackedFlow")))
permissions = setOf(invokeRpc("startTrackedFlow")))
}
@Test
@ -64,6 +65,18 @@ class RPCSecurityManagerTest {
permitted = allActions.map { arrayListOf(it) }.toSet())
}
@Test
fun `flows draining mode permissions`() {
checkUserActions(
permitted = setOf(arrayListOf("setFlowsDrainingModeEnabled")),
permissions = setOf(invokeRpc(CordaRPCOps::setFlowsDrainingModeEnabled))
)
checkUserActions(
permitted = setOf(arrayListOf("isFlowsDrainingModeEnabled")),
permissions = setOf(invokeRpc(CordaRPCOps::isFlowsDrainingModeEnabled))
)
}
@Test
fun `Malformed permission strings`() {
assertMalformedPermission("bar")
@ -131,11 +144,11 @@ class RPCSecurityManagerTest {
val call = request.first()
val args = request.drop(1).toTypedArray()
assert(subject.isPermitted(request.first(), *args)) {
"User ${subject.principal} should be permitted ${call} with target '${request.toList()}'"
"User ${subject.principal} should be permitted $call with target '${request.toList()}'"
}
if (args.isEmpty()) {
assert(subject.isPermitted(request.first(), "XXX")) {
"User ${subject.principal} should be permitted ${call} with any target"
"User ${subject.principal} should be permitted $call with any target"
}
}
}

View File

@ -11,6 +11,7 @@ import net.corda.core.internal.uncheckedCast
import net.corda.core.node.StateLoader
import net.corda.core.utilities.days
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.NodePropertiesStore
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.internal.doLookup
@ -22,6 +23,7 @@ import org.junit.rules.TestWatcher
import org.junit.runner.Description
import org.slf4j.Logger
import java.time.Clock
import java.time.Duration
import java.time.Instant
class NodeSchedulerServiceTest {
@ -38,6 +40,12 @@ class NodeSchedulerServiceTest {
private val flowStarter = rigorousMock<FlowStarter>().also {
doReturn(openFuture<FlowStateMachine<*>>()).whenever(it).startFlow(any<FlowLogic<*>>(), any())
}
private val flowsDraingMode = rigorousMock<NodePropertiesStore.FlowsDrainingModeOperations>().also {
doReturn(false).whenever(it).isEnabled()
}
private val nodeProperties = rigorousMock<NodePropertiesStore>().also {
doReturn(flowsDraingMode).whenever(it).flowsDrainingMode
}
private val transactionStates = mutableMapOf<StateRef, TransactionState<*>>()
private val stateLoader = rigorousMock<StateLoader>().also {
doLookup(transactionStates).whenever(it).loadState(any())
@ -56,6 +64,8 @@ class NodeSchedulerServiceTest {
flowStarter,
stateLoader,
flowLogicRefFactory = flowLogicRefFactory,
nodeProperties = nodeProperties,
drainingModePollPeriod = Duration.ofSeconds(5),
log = log,
scheduledStates = mutableMapOf()).apply { start() }
@Rule
@ -171,4 +181,4 @@ class NodeSchedulerServiceTest {
scheduler.unscheduleStateActivity(schedule(mark + 1.days).stateRef)
testClock.advanceBy(1.days)
}
}
}

View File

@ -27,6 +27,7 @@ import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import rx.subjects.PublishSubject
import java.net.ServerSocket
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
@ -276,6 +277,8 @@ class ArtemisMessagingTest {
handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
receivedMessages.add(message)
}
startNodeMessagingClient()
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
thread(isDaemon = true) { messagingClient.run() }

View File

@ -30,6 +30,7 @@ import net.corda.node.VersionInfo
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.StartedNode
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.*
import net.corda.node.services.keys.E2ETestKeyManagementService
@ -259,7 +260,7 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
// We only need to override the messaging service here, as currently everything that hits disk does so
// through the java.nio API which we are already mocking via Jimfs.
override fun makeMessagingService(database: CordaPersistence, info: NodeInfo): MessagingService {
override fun makeMessagingService(database: CordaPersistence, info: NodeInfo, nodeProperties: NodePropertiesStore): MessagingService {
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
return mockNet.messagingNetwork.createNodeWithID(
!mockNet.threadPerNode,