diff --git a/.gitignore b/.gitignore index b408dd575d..cd3b549080 100644 --- a/.gitignore +++ b/.gitignore @@ -38,7 +38,6 @@ lib/quasar.jar .idea/markdown-navigator .idea/runConfigurations .idea/dictionaries -.idea/codeStyles/ # Include the -parameters compiler option by default in IntelliJ required for serialization. !.idea/compiler.xml diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml new file mode 100644 index 0000000000..e6da17c0e7 --- /dev/null +++ b/.idea/codeStyles/Project.xml @@ -0,0 +1,43 @@ + + + + \ No newline at end of file diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml new file mode 100644 index 0000000000..79ee123c2b --- /dev/null +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/.idea/compiler.xml b/.idea/compiler.xml index 9208ede561..53159d93be 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -49,6 +49,10 @@ + + + + @@ -318,6 +322,8 @@ + + diff --git a/docs/source/deploying-a-node.rst b/docs/source/deploying-a-node.rst index 453e1638b0..7b18a10351 100644 --- a/docs/source/deploying-a-node.rst +++ b/docs/source/deploying-a-node.rst @@ -35,9 +35,11 @@ handling, and ensures the Corda service is run at boot. 6. Save the below as ``/opt/corda/node.conf``. See :doc:`corda-configuration-file` for a description of these options:: - basedir : "/opt/corda" p2pAddress : "example.com:10002" - rpcAddress : "example.com:10003" + rpcSettings { + address: "example.com:10003" + adminAddress: "example.com:10004" + } h2port : 11000 emailAddress : "you@example.com" myLegalName : "O=Bank of Breakfast Tea, L=London, C=GB" @@ -56,18 +58,19 @@ handling, and ensures the Corda service is run at boot. 7. Make the following changes to ``/opt/corda/node.conf``: - * Change the ``p2pAddress`` and ``rpcAddress`` values to start with your server's hostname or external IP address. - This is the address other nodes or RPC interfaces will use to communicate with your node - * Change the ports if necessary, for example if you are running multiple nodes on one server (see below) + * Change the ``p2pAddress``, ``rpcSettings.address`` and ``rpcSettings.adminAddress`` values to match + your server's hostname or external IP address. These are the addresses other nodes or RPC interfaces will use to + communicate with your node. + * Change the ports if necessary, for example if you are running multiple nodes on one server (see below). * Enter an email address which will be used as an administrative contact during the registration process. This is - only visible to the permissioning service + only visible to the permissioning service. * Enter your node's desired legal name. This will be used during the issuance of your certificate and should rarely - change as it should represent the legal identity of your node + change as it should represent the legal identity of your node. * Organization (``O=``) should be a unique and meaningful identifier (e.g. Bank of Breakfast Tea) * Location (``L=``) is your nearest city * Country (``C=``) is the `ISO 3166-1 alpha-2 code `_ - * Change the RPC username and password + * Change the RPC username and password. .. note:: Ubuntu 16.04 and most current Linux distributions use SystemD, so if you are running one of these distributions follow the steps marked **SystemD**. @@ -202,15 +205,16 @@ at boot, and means the Corda service stays running with no users connected to th 3. Save the below as ``C:\Corda\node.conf``. See :doc:`corda-configuration-file` for a description of these options:: - basedir : "C:\\Corda" p2pAddress : "example.com:10002" - rpcAddress : "example.com:10003" + rpcSettings { + address: "example.com:10003" + adminAddress: "example.com:10004" + } h2port : 11000 emailAddress: "you@example.com" myLegalName : "O=Bank of Breakfast Tea, L=London, C=GB" keyStorePassword : "cordacadevpass" trustStorePassword : "trustpass" - extraAdvertisedServiceIds: [ "" ] devMode : false rpcUsers=[ { @@ -224,18 +228,19 @@ at boot, and means the Corda service stays running with no users connected to th 4. Make the following changes to ``C:\Corda\node.conf``: - * Change the ``p2pAddress`` and ``rpcAddress`` values to start with your server's hostname or external IP address. - This is the address other nodes or RPC interfaces will use to communicate with your node - * Change the ports if necessary, for example if you are running multiple nodes on one server (see below) + * Change the ``p2pAddress``, ``rpcSettings.address`` and ``rpcSettings.adminAddress`` values to match + your server's hostname or external IP address. These are the addresses other nodes or RPC interfaces will use to + communicate with your node. + * Change the ports if necessary, for example if you are running multiple nodes on one server (see below). * Enter an email address which will be used as an administrative contact during the registration process. This is - only visible to the permissioning service + only visible to the permissioning service. * Enter your node's desired legal name. This will be used during the issuance of your certificate and should rarely - change as it should represent the legal identity of your node + change as it should represent the legal identity of your node. * Organization (``O=``) should be a unique and meaningful identifier (e.g. Bank of Breakfast Tea) * Location (``L=``) is your nearest city * Country (``C=``) is the `ISO 3166-1 alpha-2 code `_ - * Change the RPC username and password + * Change the RPC username and password. 5. Copy the required Java keystores to the node. See :doc:`permissioning` diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index aa619cb5bc..8c7eefdf33 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -120,14 +120,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration, cacheFactoryPrototype: BindableNamedCacheFactory, protected val versionInfo: VersionInfo, protected val flowManager: FlowManager, - protected val serverThread: AffinityExecutor.ServiceAffinityExecutor, - protected val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() { + val serverThread: AffinityExecutor.ServiceAffinityExecutor, + val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() { protected abstract val log: Logger @Suppress("LeakingThis") private var tokenizableServices: MutableList? = mutableListOf(platformClock, this) - protected val metricRegistry = MetricRegistry() + val metricRegistry = MetricRegistry() protected val cacheFactory = cacheFactoryPrototype.bindWithConfig(configuration).bindWithMetrics(metricRegistry).tokenize() val monitoringService = MonitoringService(metricRegistry).tokenize() @@ -144,7 +144,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } - protected val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo) + val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo) val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize() val identityService = PersistentIdentityService(cacheFactory).tokenize() val database: CordaPersistence = createCordaPersistence( diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index 95ec986c19..d941b83c23 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -33,7 +33,6 @@ import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler -import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.AffinityExecutor @@ -97,8 +96,6 @@ class MultiThreadedStateMachineManager( val timedFlows = ConcurrentHashMap() } - override val flowHospital: StaffedFlowHospital = StaffedFlowHospital() - private val concurrentBox = ConcurrentBox(InnerState()) private val scheduler = FiberExecutorScheduler("Flow fiber scheduler", executor) @@ -111,17 +108,18 @@ class MultiThreadedStateMachineManager( private val sessionToFlow = ConcurrentHashMap() private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub) private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null - private val transitionExecutor = makeTransitionExecutor() private val ourSenderUUID get() = serviceHub.networkService.ourSenderUUID private var checkpointSerializationContext: CheckpointSerializationContext? = null private var tokenizableServices: List? = null private var actionExecutor: ActionExecutor? = null + override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID) + private val transitionExecutor = makeTransitionExecutor() + override val allStateMachines: List> get() = concurrentBox.content.flows.values.map { it.fiber.logic } - private val totalStartedFlows = metrics.counter("Flows.Started") private val totalFinishedFlows = metrics.counter("Flows.Finished") private val totalSuccessFlows = metrics.counter("Flows.Success") @@ -214,7 +212,7 @@ class MultiThreadedStateMachineManager( invocationContext = context, flowLogic = flowLogic, flowStart = FlowStart.Explicit, - ourIdentity = ourIdentity ?: getOurFirstIdentity(), + ourIdentity = ourIdentity ?: ourFirstIdentity, deduplicationHandler = deduplicationHandler, isStartIdempotent = false ) @@ -408,21 +406,19 @@ class MultiThreadedStateMachineManager( } private fun onSessionMessage(event: ExternalEvent.ExternalMessageEvent) { - val message: ReceivedMessage = event.receivedMessage - val deduplicationHandler: DeduplicationHandler = event.deduplicationHandler - val peer = message.peer + val peer = event.receivedMessage.peer val sessionMessage = try { - message.data.deserialize() + event.receivedMessage.data.deserialize() } catch (ex: Exception) { logger.error("Received corrupt SessionMessage data from $peer") - deduplicationHandler.afterDatabaseTransaction() + event.deduplicationHandler.afterDatabaseTransaction() return } val sender = serviceHub.networkMapCache.getPeerByLegalName(peer) if (sender != null) { when (sessionMessage) { - is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, deduplicationHandler, sender) - is InitialSessionMessage -> onSessionInit(sessionMessage, message.platformVersion, deduplicationHandler, sender) + is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, event.deduplicationHandler, sender) + is InitialSessionMessage -> onSessionInit(sessionMessage, sender, event) } } else { logger.error("Unknown peer $peer in $sessionMessage") @@ -453,13 +449,8 @@ class MultiThreadedStateMachineManager( } } - private fun onSessionInit(sessionMessage: InitialSessionMessage, senderPlatformVersion: Int, deduplicationHandler: DeduplicationHandler, sender: Party) { - fun createErrorMessage(initiatorSessionId: SessionId, message: String): ExistingSessionMessage { - val errorId = secureRandom.nextLong() - val payload = RejectSessionMessage(message, errorId) - return ExistingSessionMessage(initiatorSessionId, payload) - } - val replyError = try { + private fun onSessionInit(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) { + try { val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage) val initiatedSessionId = SessionId.createRandom(secureRandom) val senderSession = FlowSessionImpl(sender, initiatedSessionId) @@ -469,40 +460,34 @@ class MultiThreadedStateMachineManager( is InitiatedFlowFactory.CorDapp -> FlowInfo(initiatedFlowFactory.flowVersion, initiatedFlowFactory.appName) } val senderCoreFlowVersion = when (initiatedFlowFactory) { - is InitiatedFlowFactory.Core -> senderPlatformVersion + is InitiatedFlowFactory.Core -> event.receivedMessage.platformVersion is InitiatedFlowFactory.CorDapp -> null } - startInitiatedFlow(flowLogic, deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo) - null - } catch (exception: Exception) { - logger.warn("Exception while creating initiated flow", exception) - createErrorMessage( - sessionMessage.initiatorSessionId, - (exception as? SessionRejectException)?.message ?: "Unable to establish session" - ) - } - - if (replyError != null) { - flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID)) - deduplicationHandler.afterDatabaseTransaction() + startInitiatedFlow(flowLogic, event.deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo) + } catch (t: Throwable) { + logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " + + "flowVersion=${sessionMessage.flowVersion}), sending to the flow hospital", t) + flowHospital.sessionInitErrored(sessionMessage, sender, event, t) } } // TODO this is a temporary hack until we figure out multiple identities - private fun getOurFirstIdentity(): Party { - return serviceHub.myInfo.legalIdentities[0] - } + private val ourFirstIdentity: Party get() = serviceHub.myInfo.legalIdentities[0] private fun getInitiatedFlowFactory(message: InitialSessionMessage): InitiatedFlowFactory<*> { - val initiatingFlowClass = try { - Class.forName(message.initiatorFlowClassName, true, classloader).asSubclass(FlowLogic::class.java) + val initiatorClass = try { + Class.forName(message.initiatorFlowClassName, true, classloader) } catch (e: ClassNotFoundException) { - throw SessionRejectException("Don't know ${message.initiatorFlowClassName}") - } catch (e: ClassCastException) { - throw SessionRejectException("${message.initiatorFlowClassName} is not a flow") + throw SessionRejectException.UnknownClass(message.initiatorFlowClassName) } - return serviceHub.getFlowFactory(initiatingFlowClass) ?: - throw SessionRejectException("$initiatingFlowClass is not registered") + + val initiatorFlowClass = try { + initiatorClass.asSubclass(FlowLogic::class.java) + } catch (e: ClassCastException) { + throw SessionRejectException.NotAFlow(initiatorClass) + } + + return serviceHub.getFlowFactory(initiatorFlowClass) ?: throw SessionRejectException.NotRegistered(initiatorFlowClass) } private fun startInitiatedFlow( @@ -515,7 +500,7 @@ class MultiThreadedStateMachineManager( initiatedFlowInfo: FlowInfo ) { val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo) - val ourIdentity = getOurFirstIdentity() + val ourIdentity = ourFirstIdentity startFlowInternal( InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity, initiatingMessageDeduplicationHandler, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionRejectException.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionRejectException.kt index cea4680411..55282c558c 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionRejectException.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionRejectException.kt @@ -1,8 +1,16 @@ package net.corda.node.services.statemachine import net.corda.core.CordaException +import net.corda.core.flows.FlowLogic /** * An exception propagated and thrown in case a session initiation fails. */ -class SessionRejectException(message: String) : CordaException(message) +open class SessionRejectException(message: String) : CordaException(message) { + class UnknownClass(val initiatorFlowClassName: String) : SessionRejectException("Don't know $initiatorFlowClassName") + + class NotAFlow(val initiatorClass: Class<*>) : SessionRejectException("${initiatorClass.name} is not a flow") + + class NotRegistered(val initiatorFlowClass: Class>) : SessionRejectException("${initiatorFlowClass.name} is not registered") +} + diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 60fb528bcb..112b65ace0 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -18,7 +18,8 @@ import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.DataFeed -import net.corda.core.serialization.* +import net.corda.core.serialization.SerializedBytes +import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.CheckpointSerializationContext import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.core.serialization.internal.checkpointDeserialize @@ -32,7 +33,6 @@ import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler -import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.transitions.StateMachine @@ -92,8 +92,6 @@ class SingleThreadedStateMachineManager( val timedFlows = HashMap() } - override val flowHospital: StaffedFlowHospital = StaffedFlowHospital() - private val mutex = ThreadBox(InnerState()) private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor) private val timeoutScheduler = Executors.newScheduledThreadPool(1) @@ -104,12 +102,14 @@ class SingleThreadedStateMachineManager( private val sessionToFlow = ConcurrentHashMap() private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub) private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null - private val transitionExecutor = makeTransitionExecutor() private val ourSenderUUID = serviceHub.networkService.ourSenderUUID private var checkpointSerializationContext: CheckpointSerializationContext? = null private var actionExecutor: ActionExecutor? = null + override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID) + private val transitionExecutor = makeTransitionExecutor() + override val allStateMachines: List> get() = mutex.locked { flows.values.map { it.fiber.logic } } @@ -204,7 +204,7 @@ class SingleThreadedStateMachineManager( invocationContext = context, flowLogic = flowLogic, flowStart = FlowStart.Explicit, - ourIdentity = ourIdentity ?: getOurFirstIdentity(), + ourIdentity = ourIdentity ?: ourFirstIdentity, deduplicationHandler = deduplicationHandler, isStartIdempotent = false ) @@ -402,23 +402,23 @@ class SingleThreadedStateMachineManager( } private fun onSessionMessage(event: ExternalEvent.ExternalMessageEvent) { - val message: ReceivedMessage = event.receivedMessage - val deduplicationHandler: DeduplicationHandler = event.deduplicationHandler - val peer = message.peer + val peer = event.receivedMessage.peer val sessionMessage = try { - message.data.deserialize() + event.receivedMessage.data.deserialize() } catch (ex: Exception) { logger.error("Received corrupt SessionMessage data from $peer") - deduplicationHandler.afterDatabaseTransaction() + event.deduplicationHandler.afterDatabaseTransaction() return } val sender = serviceHub.networkMapCache.getPeerByLegalName(peer) if (sender != null) { when (sessionMessage) { - is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, deduplicationHandler, sender) - is InitialSessionMessage -> onSessionInit(sessionMessage, message.platformVersion, deduplicationHandler, sender) + is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, event.deduplicationHandler, sender) + is InitialSessionMessage -> onSessionInit(sessionMessage, sender, event) } } else { + // TODO Send the event to the flow hospital to be retried on network map update + // TODO Test that restarting the node attempts to retry logger.error("Unknown peer $peer in $sessionMessage") } } @@ -448,14 +448,8 @@ class SingleThreadedStateMachineManager( } } - private fun onSessionInit(sessionMessage: InitialSessionMessage, senderPlatformVersion: Int, deduplicationHandler: DeduplicationHandler, sender: Party) { - fun createErrorMessage(initiatorSessionId: SessionId, message: String): ExistingSessionMessage { - val errorId = secureRandom.nextLong() - val payload = RejectSessionMessage(message, errorId) - return ExistingSessionMessage(initiatorSessionId, payload) - } - - val replyError = try { + private fun onSessionInit(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) { + try { val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage) val initiatedSessionId = SessionId.createRandom(secureRandom) val senderSession = FlowSessionImpl(sender, initiatedSessionId) @@ -465,40 +459,34 @@ class SingleThreadedStateMachineManager( is InitiatedFlowFactory.CorDapp -> FlowInfo(initiatedFlowFactory.flowVersion, initiatedFlowFactory.appName) } val senderCoreFlowVersion = when (initiatedFlowFactory) { - is InitiatedFlowFactory.Core -> senderPlatformVersion + is InitiatedFlowFactory.Core -> event.receivedMessage.platformVersion is InitiatedFlowFactory.CorDapp -> null } - startInitiatedFlow(flowLogic, deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo) - null - } catch (exception: Exception) { - logger.warn("Exception while creating initiated flow", exception) - createErrorMessage( - sessionMessage.initiatorSessionId, - (exception as? SessionRejectException)?.message ?: "Unable to establish session" - ) - } - - if (replyError != null) { - flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID)) - deduplicationHandler.afterDatabaseTransaction() + startInitiatedFlow(flowLogic, event.deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo) + } catch (t: Throwable) { + logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " + + "flowVersion=${sessionMessage.flowVersion}), sending to the flow hospital", t) + flowHospital.sessionInitErrored(sessionMessage, sender, event, t) } } // TODO this is a temporary hack until we figure out multiple identities - private fun getOurFirstIdentity(): Party { - return serviceHub.myInfo.legalIdentities[0] - } + private val ourFirstIdentity: Party get() = serviceHub.myInfo.legalIdentities[0] private fun getInitiatedFlowFactory(message: InitialSessionMessage): InitiatedFlowFactory<*> { - val initiatingFlowClass = try { - Class.forName(message.initiatorFlowClassName, true, classloader).asSubclass(FlowLogic::class.java) + val initiatorClass = try { + Class.forName(message.initiatorFlowClassName, true, classloader) } catch (e: ClassNotFoundException) { - throw SessionRejectException("Don't know ${message.initiatorFlowClassName}") - } catch (e: ClassCastException) { - throw SessionRejectException("${message.initiatorFlowClassName} is not a flow") + throw SessionRejectException.UnknownClass(message.initiatorFlowClassName) } - return serviceHub.getFlowFactory(initiatingFlowClass) - ?: throw SessionRejectException("$initiatingFlowClass is not registered") + + val initiatorFlowClass = try { + initiatorClass.asSubclass(FlowLogic::class.java) + } catch (e: ClassCastException) { + throw SessionRejectException.NotAFlow(initiatorClass) + } + + return serviceHub.getFlowFactory(initiatorFlowClass) ?: throw SessionRejectException.NotRegistered(initiatorFlowClass) } private fun startInitiatedFlow( @@ -511,7 +499,7 @@ class SingleThreadedStateMachineManager( initiatedFlowInfo: FlowInfo ) { val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo) - val ourIdentity = getOurFirstIdentity() + val ourIdentity = ourFirstIdentity startFlowInternal( InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity, initiatingMessageDeduplicationHandler, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index 512497b6b9..f29749566c 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -1,6 +1,8 @@ package net.corda.node.services.statemachine +import net.corda.core.crypto.newSecureRandom import net.corda.core.flows.StateMachineRunId +import net.corda.core.identity.Party import net.corda.core.internal.ThreadBox import net.corda.core.internal.TimedFlow import net.corda.core.internal.bufferUntilSubscribed @@ -16,65 +18,101 @@ import java.util.* /** * This hospital consults "staff" to see if they can automatically diagnose and treat flows. */ -class StaffedFlowHospital { +class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) { private companion object { private val log = contextLogger() private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, FinalityDoctor) } private val mutex = ThreadBox(object { - val patients = HashMap() + val flowPatients = HashMap() + val treatableSessionInits = HashMap() val recordsPublisher = PublishSubject.create() }) + private val secureRandom = newSecureRandom() - class MedicalHistory { - internal val records: MutableList = mutableListOf() - - fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff): Boolean { - val lastAdmittanceSuspendCount = (records.last() as MedicalRecord.Admitted).suspendCount - return records - .filterIsInstance() - .count { by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max + /** + * The node was unable to initiate the [InitialSessionMessage] from [sender]. + */ + fun sessionInitErrored(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent, error: Throwable) { + val time = Instant.now() + val id = UUID.randomUUID() + val outcome = if (error is SessionRejectException.UnknownClass) { + // We probably don't have the CorDapp installed so let's pause the message in the hopes that the CorDapp is + // installed on restart, at which point the message will be able proceed as normal. If not then it will need + // to be dropped manually. + Outcome.OVERNIGHT_OBSERVATION + } else { + Outcome.UNTREATABLE } - override fun toString(): String = "${this.javaClass.simpleName}(records = $records)" + val record = sessionMessage.run { MedicalRecord.SessionInit(id, time, outcome, initiatorFlowClassName, flowVersion, appName, sender, error) } + mutex.locked { + if (outcome != Outcome.UNTREATABLE) { + treatableSessionInits[id] = InternalSessionInitRecord(sessionMessage, event, record) + } + recordsPublisher.onNext(record) + } + + if (outcome == Outcome.UNTREATABLE) { + sendBackError(error, sessionMessage, sender, event) + } + } + + private fun sendBackError(error: Throwable, sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) { + val message = (error as? SessionRejectException)?.message ?: "Unable to establish session" + val payload = RejectSessionMessage(message, secureRandom.nextLong()) + val replyError = ExistingSessionMessage(sessionMessage.initiatorSessionId, payload) + + flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID)) + event.deduplicationHandler.afterDatabaseTransaction() + } + + /** + * Drop the errored session-init message with the given ID ([MedicalRecord.SessionInit.id]). This will cause the node + * to send back the relevant session error to the initiator party and acknowledge its receipt from the message broker + * so that it never gets redelivered. + */ + fun dropSessionInit(id: UUID) { + val (sessionMessage, event, publicRecord) = mutex.locked { + requireNotNull(treatableSessionInits.remove(id)) { "$id does not refer to any session init message" } + } + log.info("Errored session-init permanently dropped: $publicRecord") + sendBackError(publicRecord.error, sessionMessage, publicRecord.sender, event) } /** * The flow running in [flowFiber] has errored. */ fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { + val time = Instant.now() log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState") - val suspendCount = currentState.checkpoint.numberOfSuspends val event = mutex.locked { - val medicalHistory = patients.computeIfAbsent(flowFiber.id) { MedicalHistory() } - - val admitted = MedicalRecord.Admitted(flowFiber.id, Instant.now(), suspendCount) - medicalHistory.records += admitted - recordsPublisher.onNext(admitted) + val medicalHistory = flowPatients.computeIfAbsent(flowFiber.id) { FlowMedicalHistory() } val report = consultStaff(flowFiber, currentState, errors, medicalHistory) - val (newRecord, event) = when (report.diagnosis) { + val (outcome, event) = when (report.diagnosis) { Diagnosis.DISCHARGE -> { log.info("Flow ${flowFiber.id} error discharged from hospital by ${report.by}") - Pair(MedicalRecord.Discharged(flowFiber.id, Instant.now(), suspendCount, report.by, errors), Event.RetryFlowFromSafePoint) + Pair(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint) } Diagnosis.OVERNIGHT_OBSERVATION -> { log.info("Flow ${flowFiber.id} error kept for overnight observation by ${report.by}") // We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart - Pair(MedicalRecord.KeptInForObservation(flowFiber.id, Instant.now(), suspendCount, report.by, errors), null) + Pair(Outcome.OVERNIGHT_OBSERVATION, null) } Diagnosis.NOT_MY_SPECIALTY -> { // None of the staff care for these errors so we let them propagate log.info("Flow ${flowFiber.id} error allowed to propagate") - Pair(MedicalRecord.NothingWeCanDo(flowFiber.id, Instant.now(), suspendCount), Event.StartErrorPropagation) + Pair(Outcome.UNTREATABLE, Event.StartErrorPropagation) } } - medicalHistory.records += newRecord - recordsPublisher.onNext(newRecord) + val record = MedicalRecord.Flow(time, flowFiber.id, currentState.checkpoint.numberOfSuspends, errors, report.by, outcome) + medicalHistory.records += record + recordsPublisher.onNext(record) event } @@ -86,8 +124,9 @@ class StaffedFlowHospital { private fun consultStaff(flowFiber: FlowFiber, currentState: StateMachineState, errors: List, - medicalHistory: MedicalHistory): ConsultationReport { + medicalHistory: FlowMedicalHistory): ConsultationReport { return errors + .asSequence() .mapIndexed { index, error -> log.info("Flow ${flowFiber.id} has error [$index]", error) val diagnoses: Map> = staff.groupBy { it.consult(flowFiber, currentState, error, medicalHistory) } @@ -105,43 +144,61 @@ class StaffedFlowHospital { * The flow has been removed from the state machine. */ fun flowRemoved(flowId: StateMachineRunId) { - mutex.locked { patients.remove(flowId) } + mutex.locked { flowPatients.remove(flowId) } } // TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC /** Returns a stream of medical records as flows pass through the hospital. */ fun track(): DataFeed, MedicalRecord> { return mutex.locked { - DataFeed(patients.values.flatMap { it.records }, recordsPublisher.bufferUntilSubscribed()) + val snapshot = (flowPatients.values.flatMap { it.records } + treatableSessionInits.values.map { it.publicRecord }).sortedBy { it.time } + DataFeed(snapshot, recordsPublisher.bufferUntilSubscribed()) } } - sealed class MedicalRecord { - abstract val flowId: StateMachineRunId - abstract val at: Instant - abstract val suspendCount: Int + class FlowMedicalHistory { + internal val records: MutableList = mutableListOf() - data class Admitted(override val flowId: StateMachineRunId, - override val at: Instant, - override val suspendCount: Int) : MedicalRecord() + fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff, currentState: StateMachineState): Boolean { + val lastAdmittanceSuspendCount = currentState.checkpoint.numberOfSuspends + return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max + } - data class Discharged(override val flowId: StateMachineRunId, - override val at: Instant, - override val suspendCount: Int, - val by: List, - val errors: List) : MedicalRecord() - - data class KeptInForObservation(override val flowId: StateMachineRunId, - override val at: Instant, - override val suspendCount: Int, - val by: List, - val errors: List) : MedicalRecord() - - data class NothingWeCanDo(override val flowId: StateMachineRunId, - override val at: Instant, - override val suspendCount: Int) : MedicalRecord() + override fun toString(): String = "${this.javaClass.simpleName}(records = $records)" } + private data class InternalSessionInitRecord(val sessionMessage: InitialSessionMessage, + val event: ExternalEvent.ExternalMessageEvent, + val publicRecord: MedicalRecord.SessionInit) + + sealed class MedicalRecord { + abstract val time: Instant + abstract val outcome: Outcome + abstract val errors: List + + /** Medical record for a flow that has errored. */ + data class Flow(override val time: Instant, + val flowId: StateMachineRunId, + val suspendCount: Int, + override val errors: List, + val by: List, + override val outcome: Outcome) : MedicalRecord() + + /** Medical record for a session initiation that was unsuccessful. */ + data class SessionInit(val id: UUID, + override val time: Instant, + override val outcome: Outcome, + val initiatorFlowClassName: String, + val flowVersion: Int, + val appName: String, + val sender: Party, + val error: Throwable) : MedicalRecord() { + override val errors: List get() = listOf(error) + } + } + + enum class Outcome { DISCHARGE, OVERNIGHT_OBSERVATION, UNTREATABLE } + /** The order of the enum values are in priority order. */ enum class Diagnosis { /** Retry from last safe point. */ @@ -153,14 +210,14 @@ class StaffedFlowHospital { } interface Staff { - fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis + fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis } /** * SQL Deadlock detection. */ object DeadlockNurse : Staff { - override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis { + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { return if (mentionsDeadlock(newError)) { Diagnosis.DISCHARGE } else { @@ -178,8 +235,8 @@ class StaffedFlowHospital { * Primary key violation detection for duplicate inserts. Will detect other constraint violations too. */ object DuplicateInsertSpecialist : Staff { - override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis { - return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this)) { + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { + return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this, currentState)) { Diagnosis.DISCHARGE } else { Diagnosis.NOT_MY_SPECIALTY @@ -196,9 +253,9 @@ class StaffedFlowHospital { * exceed the limit specified by the [FlowTimeoutException]. */ object DoctorTimeout : Staff { - override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis { + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { if (newError is FlowTimeoutException) { - if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) { + if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this, currentState)) { return Diagnosis.DISCHARGE } else { val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}. " + @@ -216,12 +273,18 @@ class StaffedFlowHospital { * Parks [FinalityHandler]s for observation. */ object FinalityDoctor : Staff { - override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis { - return (currentState.flowLogic as? FinalityHandler)?.let { logic -> Diagnosis.OVERNIGHT_OBSERVATION.also { warn(logic, flowFiber, currentState) } } ?: Diagnosis.NOT_MY_SPECIALTY + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { + return if (currentState.flowLogic is FinalityHandler) { + warn(currentState.flowLogic, flowFiber, currentState) + Diagnosis.OVERNIGHT_OBSERVATION + } else { + Diagnosis.NOT_MY_SPECIALTY + } } private fun warn(flowLogic: FinalityHandler, flowFiber: FlowFiber, currentState: StateMachineState) { - log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying the flow by re-starting the node. State machine state: $currentState, initiating party was: ${flowLogic.sender().name}") + log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " + + "the flow by re-starting the node. State machine state: $currentState, initiating party was: ${flowLogic.sender().name}") } } } diff --git a/node/src/test/kotlin/net/corda/node/services/FinalityHandlerTest.kt b/node/src/test/kotlin/net/corda/node/services/FinalityHandlerTest.kt index 9a09548b82..c68a0f2ad6 100644 --- a/node/src/test/kotlin/net/corda/node/services/FinalityHandlerTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/FinalityHandlerTest.kt @@ -10,8 +10,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.finance.POUNDS import net.corda.finance.contracts.asset.Cash import net.corda.finance.issuedBy -import net.corda.node.services.statemachine.StaffedFlowHospital -import net.corda.node.services.statemachine.StaffedFlowHospital.MedicalRecord.KeptInForObservation +import net.corda.node.services.statemachine.StaffedFlowHospital.* import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.singleIdentity @@ -72,11 +71,11 @@ class FinalityHandlerTest { val keptInForObservation = smm.flowHospital .track() .let { it.updates.startWith(it.snapshot) } - .filter { it.flowId == runId } - .ofType(KeptInForObservation::class.java) + .ofType(MedicalRecord.Flow::class.java) + .filter { it.flowId == runId && it.outcome == Outcome.OVERNIGHT_OBSERVATION } .toBlocking() .first() - assertThat(keptInForObservation.by).contains(StaffedFlowHospital.FinalityDoctor) + assertThat(keptInForObservation.by).contains(FinalityDoctor) } private fun TestStartedNode.getTransaction(id: SecureHash): SignedTransaction? { diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkPersistenceTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkPersistenceTests.kt new file mode 100644 index 0000000000..8a9156af4a --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkPersistenceTests.kt @@ -0,0 +1,166 @@ +package net.corda.node.services.statemachine + +import net.corda.core.crypto.random63BitValue +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.registerCordappFlowFactory +import net.corda.core.identity.Party +import net.corda.core.utilities.getOrThrow +import net.corda.node.services.persistence.checkpoints +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.CHARLIE_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.internal.LogHelper +import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.internal.* +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Before +import org.junit.Ignore +import org.junit.Test +import rx.Observable +import java.util.* +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class FlowFrameworkPersistenceTests { + companion object { + init { + LogHelper.setLevel("+net.corda.flow") + } + } + + private lateinit var mockNet: InternalMockNetwork + private val receivedSessionMessages = ArrayList() + private lateinit var aliceNode: TestStartedNode + private lateinit var bobNode: TestStartedNode + private lateinit var notaryIdentity: Party + private lateinit var alice: Party + private lateinit var bob: Party + private lateinit var aliceFlowManager: MockNodeFlowManager + private lateinit var bobFlowManager: MockNodeFlowManager + + @Before + fun start() { + mockNet = InternalMockNetwork( + cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"), + servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin() + ) + aliceFlowManager = MockNodeFlowManager() + bobFlowManager = MockNodeFlowManager() + + aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, flowManager = aliceFlowManager)) + bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, flowManager = bobFlowManager)) + + receivedSessionMessagesObservable().forEach { receivedSessionMessages += it } + + // Extract identities + alice = aliceNode.info.singleIdentity() + bob = bobNode.info.singleIdentity() + notaryIdentity = mockNet.defaultNotaryIdentity + } + + @After + fun cleanUp() { + mockNet.stopNodes() + receivedSessionMessages.clear() + } + + @Test + fun `newly added flow is preserved on restart`() { + aliceNode.services.startFlow(NoOpFlow(nonTerminating = true)) + aliceNode.internals.acceptableLiveFiberCountOnStop = 1 + val restoredFlow = aliceNode.restartAndGetRestoredFlow() + assertThat(restoredFlow.flowStarted).isTrue() + } + + @Test + fun `flow restarted just after receiving payload`() { + bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it) + .nonTerminating() } + aliceNode.services.startFlow(SendFlow("Hello", bob)) + + // We push through just enough messages to get only the payload sent + bobNode.pumpReceive() + bobNode.internals.disableDBCloseOnStop() + bobNode.internals.acceptableLiveFiberCountOnStop = 1 + bobNode.dispose() + mockNet.runNetwork() + val restoredFlow = bobNode.restartAndGetRestoredFlow() + assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") + } + + @Test + fun `flow loaded from checkpoint will respond to messages from before start`() { + aliceNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) } + bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow + val restoredFlow = bobNode.restartAndGetRestoredFlow() + assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") + } + + @Ignore("Some changes in startup order make this test's assumptions fail.") + @Test + fun `flow with send will resend on interrupted restart`() { + val payload = random63BitValue() + val payload2 = random63BitValue() + + var sentCount = 0 + mockNet.messagingNetwork.sentMessages.toSessionTransfers().filter { it.isPayloadTransfer }.forEach { sentCount++ } + val charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME)) + val secondFlow = charlieNode.registerCordappFlowFactory(PingPongFlow::class) { PingPongFlow(it, payload2) } + mockNet.runNetwork() + val charlie = charlieNode.info.singleIdentity() + + // Kick off first send and receive + bobNode.services.startFlow(PingPongFlow(charlie, payload)) + bobNode.database.transaction { + assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size) + } + // Make sure the add() has finished initial processing. + bobNode.internals.disableDBCloseOnStop() + // Restart node and thus reload the checkpoint and resend the message with same UUID + bobNode.dispose() + bobNode.database.transaction { + assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size) // confirm checkpoint + bobNode.services.networkMapCache.clearNetworkMapCache() + } + val node2b = mockNet.createNode(InternalMockNodeParameters(bobNode.internals.id)) + bobNode.internals.manuallyCloseDB() + val (firstAgain, fut1) = node2b.getSingleFlow() + // Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync. + mockNet.runNetwork() + fut1.getOrThrow() + + val receivedCount = receivedSessionMessages.count { it.isPayloadTransfer } + // Check flows completed cleanly and didn't get out of phase + assertEquals(4, receivedCount, "Flow should have exchanged 4 unique messages")// Two messages each way + // can't give a precise value as every addMessageHandler re-runs the undelivered messages + assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages") + node2b.database.transaction { + assertEquals(0, node2b.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended") + } + charlieNode.database.transaction { + assertEquals(0, charlieNode.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended") + } + assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3") + assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3") + assertEquals(payload, secondFlow.getOrThrow().receivedPayload, "Received payload does not match the (restarted) first value on Node 2") + assertEquals(payload + 1, secondFlow.getOrThrow().receivedPayload2, "Received payload does not match the expected second value on Node 2") + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + //region Helpers + + private inline fun > TestStartedNode.restartAndGetRestoredFlow(): P { + val newNode = mockNet.restartNode(this) + newNode.internals.acceptableLiveFiberCountOnStop = 1 + mockNet.runNetwork() + return newNode.getSingleFlow

().first + } + + private fun receivedSessionMessagesObservable(): Observable { + return mockNet.messagingNetwork.receivedMessages.toSessionTransfers() + } + + //endregion Helpers +} diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index d1517dc318..23b919eb66 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.concurrent.Semaphore +import net.corda.client.rpc.notUsed import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef @@ -39,16 +40,13 @@ import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType import org.junit.After import org.junit.Before -import org.junit.Ignore import org.junit.Test import rx.Notification import rx.Observable import java.time.Instant import java.util.* import kotlin.reflect.KClass -import kotlin.test.assertEquals import kotlin.test.assertFailsWith -import kotlin.test.assertTrue class FlowFrameworkTests { companion object { @@ -68,7 +66,7 @@ class FlowFrameworkTests { @Before fun setUpMockNet() { mockNet = InternalMockNetwork( - cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"), + cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"), servicePeerAllocationStrategy = RoundRobin() ) @@ -100,8 +98,8 @@ class FlowFrameworkTests { assertThat(flow.lazyTime).isNotNull() } - class SuspendThrowingActionExecutor(private val exception: Exception, val delegate: ActionExecutor) : ActionExecutor { - var thrown = false + class SuspendThrowingActionExecutor(private val exception: Exception, private val delegate: ActionExecutor) : ActionExecutor { + private var thrown = false @Suspendable override fun executeAction(fiber: FlowFiber, action: Action) { if (action is Action.CommitTransaction && !thrown) { @@ -367,10 +365,17 @@ class FlowFrameworkTests { } @Test - fun `unknown class in session init`() { + fun `session init with unknown class is sent to the flow hospital, from where it's dropped`() { aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob) mockNet.runNetwork() - assertThat(receivedSessionMessages).hasSize(2) // Only the session-init and session-reject are expected + assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital + val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot + assertThat(medicalRecords).hasSize(1) + val sessionInitRecord = medicalRecords[0] as StaffedFlowHospital.MedicalRecord.SessionInit + assertThat(sessionInitRecord.initiatorFlowClassName).isEqualTo("not.a.real.Class") + bobNode.smm.flowHospital.dropSessionInit(sessionInitRecord.id) // Drop the message which is processed as an error back to sender + mockNet.runNetwork() + assertThat(receivedSessionMessages).hasSize(2) // Now the session-reject is expected val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage assertThat((lastMessage.payload as RejectSessionMessage).message).isEqualTo("Don't know not.a.real.Class") } @@ -441,334 +446,142 @@ class FlowFrameworkTests { private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0) - private fun TestStartedNode.sendSessionMessage(message: SessionMessage, destination: Party) { - services.networkService.apply { - val address = getAddressOfParty(PartyInfo.SingleNode(destination, emptyList())) - send(createMessage(FlowMessagingImpl.sessionTopic, message.serialize().bytes), address) + private fun assertSessionTransfers(vararg expected: SessionTransfer) { + assertThat(receivedSessionMessages).containsExactly(*expected) + } + + private val FlowLogic<*>.progressSteps: CordaFuture>> + get() { + return progressTracker!!.changes + .ofType(Change.Position::class.java) + .map { it.newStep } + .materialize() + .toList() + .toFuture() + } + + @InitiatingFlow + private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party, + @Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic() { + @Suspendable + override fun call() { + // Kick off the flow on the other side ... + val session = initiateFlow(otherParty) + session.send(1) + // ... then pause this one until it's received the session-end message from the other side + receivedOtherFlowEnd.acquire() + session.sendAndReceive(2) } } - private fun assertSessionTransfers(vararg expected: SessionTransfer) { - assertThat(receivedSessionMessages).containsExactly(*expected) + // we need brand new class for a flow to fail, so here it is + @InitiatingFlow + private open class NeverRegisteredFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic() { + init { + require(otherParties.isNotEmpty()) + } + + @Suspendable + override fun call(): FlowInfo { + val flowInfos = otherParties.map { + val session = initiateFlow(it) + session.send(payload) + session.getCounterpartyFlowInfo() + }.toList() + return flowInfos.first() + } + } + + private object WaitingFlows { + @InitiatingFlow + class Waiter(val stx: SignedTransaction, val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val otherPartySession = initiateFlow(otherParty) + otherPartySession.send(stx) + return waitForLedgerCommit(stx.id) + } + } + + class Committer(val otherPartySession: FlowSession, val throwException: (() -> Exception)? = null) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val stx = otherPartySession.receive().unwrap { it } + if (throwException != null) throw throwException.invoke() + return subFlow(FinalityFlow(stx, setOf(otherPartySession.counterparty))) + } + } + } + + private class LazyServiceHubAccessFlow : FlowLogic() { + val lazyTime: Instant by lazy { serviceHub.clock.instant() } + @Suspendable + override fun call() = Unit + } + + private interface CustomInterface + + private class CustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty) + + @InitiatingFlow + private class IncorrectCustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty) + + @InitiatingFlow + private class VaultQueryFlow(val stx: SignedTransaction, val otherParty: Party) : FlowLogic>>() { + @Suspendable + override fun call(): List> { + val otherPartySession = initiateFlow(otherParty) + otherPartySession.send(stx) + // hold onto reference here to force checkpoint of vaultService and thus + // prove it is registered as a tokenizableService in the node + val vaultQuerySvc = serviceHub.vaultService + waitForLedgerCommit(stx.id) + return vaultQuerySvc.queryBy().states + } + } + + @InitiatingFlow(version = 2) + private class UpgradedFlow(val otherParty: Party, val otherPartySession: FlowSession? = null) : FlowLogic>() { + constructor(otherPartySession: FlowSession) : this(otherPartySession.counterparty, otherPartySession) + + @Suspendable + override fun call(): Pair { + val otherPartySession = this.otherPartySession ?: initiateFlow(otherParty) + val received = otherPartySession.receive().unwrap { it } + val otherFlowVersion = otherPartySession.getCounterpartyFlowInfo().flowVersion + return Pair(received, otherFlowVersion) + } + } + + private class SingleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val payload = otherPartySession.receive().unwrap { it } + subFlow(InlinedSendFlow(payload + payload, otherPartySession)) + } + } + + private class DoubleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + subFlow(SingleInlinedSubFlow(otherPartySession)) + } + } + + private data class NonSerialisableData(val a: Int) + private class NonSerialisableFlowException(@Suppress("unused") val data: NonSerialisableData) : FlowException() + + private class InlinedSendFlow(val payload: String, val otherPartySession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() = otherPartySession.send(payload) } //endregion Helpers } -class FlowFrameworkTripartyTests { +internal fun sessionConfirm(flowVersion: Int = 1) = ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, ""))) - companion object { - init { - LogHelper.setLevel("+net.corda.flow") - } - - private lateinit var mockNet: InternalMockNetwork - private lateinit var aliceNode: TestStartedNode - private lateinit var bobNode: TestStartedNode - private lateinit var charlieNode: TestStartedNode - private lateinit var alice: Party - private lateinit var bob: Party - private lateinit var charlie: Party - private lateinit var notaryIdentity: Party - private val receivedSessionMessages = ArrayList() - } - - @Before - fun setUpGlobalMockNet() { - mockNet = InternalMockNetwork( - cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"), - servicePeerAllocationStrategy = RoundRobin() - ) - - aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) - bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME)) - charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME)) - - - // Extract identities - alice = aliceNode.info.singleIdentity() - bob = bobNode.info.singleIdentity() - charlie = charlieNode.info.singleIdentity() - notaryIdentity = mockNet.defaultNotaryIdentity - - receivedSessionMessagesObservable().forEach { receivedSessionMessages += it } - } - - @After - fun cleanUp() { - mockNet.stopNodes() - receivedSessionMessages.clear() - } - - private fun receivedSessionMessagesObservable(): Observable { - return mockNet.messagingNetwork.receivedMessages.toSessionTransfers() - } - - @Test - fun `sending to multiple parties`() { - bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() } - charlieNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() } - val payload = "Hello World" - aliceNode.services.startFlow(SendFlow(payload, bob, charlie)) - mockNet.runNetwork() - bobNode.internals.acceptableLiveFiberCountOnStop = 1 - charlieNode.internals.acceptableLiveFiberCountOnStop = 1 - val bobFlow = bobNode.getSingleFlow().first - val charlieFlow = charlieNode.getSingleFlow().first - assertThat(bobFlow.receivedPayloads[0]).isEqualTo(payload) - assertThat(charlieFlow.receivedPayloads[0]).isEqualTo(payload) - - assertSessionTransfers(bobNode, - aliceNode sent sessionInit(SendFlow::class, payload = payload) to bobNode, - bobNode sent sessionConfirm() to aliceNode, - aliceNode sent normalEnd to bobNode - //There's no session end from the other flows as they're manually suspended - ) - - assertSessionTransfers(charlieNode, - aliceNode sent sessionInit(SendFlow::class, payload = payload) to charlieNode, - charlieNode sent sessionConfirm() to aliceNode, - aliceNode sent normalEnd to charlieNode - //There's no session end from the other flows as they're manually suspended - ) - } - - @Test - fun `receiving from multiple parties`() { - val bobPayload = "Test 1" - val charliePayload = "Test 2" - bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(bobPayload, it) } - charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(charliePayload, it) } - val multiReceiveFlow = ReceiveFlow(bob, charlie).nonTerminating() - aliceNode.services.startFlow(multiReceiveFlow) - aliceNode.internals.acceptableLiveFiberCountOnStop = 1 - mockNet.runNetwork() - assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(bobPayload) - assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(charliePayload) - - assertSessionTransfers(bobNode, - aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, - bobNode sent sessionConfirm() to aliceNode, - bobNode sent sessionData(bobPayload) to aliceNode, - bobNode sent normalEnd to aliceNode - ) - - assertSessionTransfers(charlieNode, - aliceNode sent sessionInit(ReceiveFlow::class) to charlieNode, - charlieNode sent sessionConfirm() to aliceNode, - charlieNode sent sessionData(charliePayload) to aliceNode, - charlieNode sent normalEnd to aliceNode - ) - } - - @Test - fun `FlowException only propagated to parent`() { - charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Chain") } } - bobNode.registerCordappFlowFactory(ReceiveFlow::class) { ReceiveFlow(charlie) } - val receivingFiber = aliceNode.services.startFlow(ReceiveFlow(bob)) - mockNet.runNetwork() - assertThatExceptionOfType(UnexpectedFlowEndException::class.java) - .isThrownBy { receivingFiber.resultFuture.getOrThrow() } - } - - @Test - fun `FlowException thrown and there is a 3rd unrelated party flow`() { - // Bob will send its payload and then block waiting for the receive from Alice. Meanwhile Alice will move - // onto Charlie which will throw the exception - val node2Fiber = bobNode - .registerCordappFlowFactory(ReceiveFlow::class) { SendAndReceiveFlow(it, "Hello") } - .map { it.stateMachine } - charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Nothing useful") } } - - val aliceFiber = aliceNode.services.startFlow(ReceiveFlow(bob, charlie)) as FlowStateMachineImpl - mockNet.runNetwork() - - // Alice will terminate with the error it received from Charlie but it won't propagate that to Bob (as it's - // not relevant to it) but it will end its session with it - assertThatExceptionOfType(MyFlowException::class.java).isThrownBy { - aliceFiber.resultFuture.getOrThrow() - } - val bobResultFuture = node2Fiber.getOrThrow().resultFuture - assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy { - bobResultFuture.getOrThrow() - } - - assertSessionTransfers(bobNode, - aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, - bobNode sent sessionConfirm() to aliceNode, - bobNode sent sessionData("Hello") to aliceNode, - aliceNode sent errorMessage() to bobNode - ) - } - - private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0) - - private fun assertSessionTransfers(vararg expected: SessionTransfer) { - assertThat(receivedSessionMessages).containsExactly(*expected) - } - - private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List { - val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress } - assertThat(actualForNode).containsExactly(*expected) - return actualForNode - } - -} - -class FlowFrameworkPersistenceTests { - companion object { - init { - LogHelper.setLevel("+net.corda.flow") - } - } - - private lateinit var mockNet: InternalMockNetwork - private val receivedSessionMessages = ArrayList() - private lateinit var aliceNode: TestStartedNode - private lateinit var bobNode: TestStartedNode - private lateinit var notaryIdentity: Party - private lateinit var alice: Party - private lateinit var bob: Party - private lateinit var aliceFlowManager: MockNodeFlowManager - private lateinit var bobFlowManager: MockNodeFlowManager - - @Before - fun start() { - mockNet = InternalMockNetwork( - cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"), - servicePeerAllocationStrategy = RoundRobin() - ) - aliceFlowManager = MockNodeFlowManager() - bobFlowManager = MockNodeFlowManager() - - aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, flowManager = aliceFlowManager)) - bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, flowManager = bobFlowManager)) - - receivedSessionMessagesObservable().forEach { receivedSessionMessages += it } - - // Extract identities - alice = aliceNode.info.singleIdentity() - bob = bobNode.info.singleIdentity() - notaryIdentity = mockNet.defaultNotaryIdentity - } - - @After - fun cleanUp() { - mockNet.stopNodes() - receivedSessionMessages.clear() - } - - @Test - fun `newly added flow is preserved on restart`() { - aliceNode.services.startFlow(NoOpFlow(nonTerminating = true)) - aliceNode.internals.acceptableLiveFiberCountOnStop = 1 - val restoredFlow = aliceNode.restartAndGetRestoredFlow() - assertThat(restoredFlow.flowStarted).isTrue() - } - - @Test - fun `flow restarted just after receiving payload`() { - bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() } - aliceNode.services.startFlow(SendFlow("Hello", bob)) - - // We push through just enough messages to get only the payload sent - bobNode.pumpReceive() - bobNode.internals.disableDBCloseOnStop() - bobNode.internals.acceptableLiveFiberCountOnStop = 1 - bobNode.dispose() - mockNet.runNetwork() - val restoredFlow = bobNode.restartAndGetRestoredFlow() - assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") - } - - @Test - fun `flow loaded from checkpoint will respond to messages from before start`() { - aliceNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) } - bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow - val restoredFlow = bobNode.restartAndGetRestoredFlow() - assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") - } - - @Ignore("Some changes in startup order make this test's assumptions fail.") - @Test - fun `flow with send will resend on interrupted restart`() { - val payload = random63BitValue() - val payload2 = random63BitValue() - - var sentCount = 0 - mockNet.messagingNetwork.sentMessages.toSessionTransfers().filter { it.isPayloadTransfer }.forEach { sentCount++ } - val charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME)) - val secondFlow = charlieNode.registerCordappFlowFactory(PingPongFlow::class) { PingPongFlow(it, payload2) } - mockNet.runNetwork() - val charlie = charlieNode.info.singleIdentity() - - // Kick off first send and receive - bobNode.services.startFlow(PingPongFlow(charlie, payload)) - bobNode.database.transaction { - assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size) - } - // Make sure the add() has finished initial processing. - bobNode.internals.disableDBCloseOnStop() - // Restart node and thus reload the checkpoint and resend the message with same UUID - bobNode.dispose() - bobNode.database.transaction { - assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size) // confirm checkpoint - bobNode.services.networkMapCache.clearNetworkMapCache() - } - val node2b = mockNet.createNode(InternalMockNodeParameters(bobNode.internals.id)) - bobNode.internals.manuallyCloseDB() - val (firstAgain, fut1) = node2b.getSingleFlow() - // Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync. - mockNet.runNetwork() - fut1.getOrThrow() - - val receivedCount = receivedSessionMessages.count { it.isPayloadTransfer } - // Check flows completed cleanly and didn't get out of phase - assertEquals(4, receivedCount, "Flow should have exchanged 4 unique messages")// Two messages each way - // can't give a precise value as every addMessageHandler re-runs the undelivered messages - assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages") - node2b.database.transaction { - assertEquals(0, node2b.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended") - } - charlieNode.database.transaction { - assertEquals(0, charlieNode.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended") - } - assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3") - assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3") - assertEquals(payload, secondFlow.getOrThrow().receivedPayload, "Received payload does not match the (restarted) first value on Node 2") - assertEquals(payload + 1, secondFlow.getOrThrow().receivedPayload2, "Received payload does not match the expected second value on Node 2") - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////// - //region Helpers - - private inline fun > TestStartedNode.restartAndGetRestoredFlow(): P { - val newNode = mockNet.restartNode(this) - newNode.internals.acceptableLiveFiberCountOnStop = 1 - mockNet.runNetwork() - return newNode.getSingleFlow

().first - } - - private fun assertSessionTransfers(vararg expected: SessionTransfer) { - assertThat(receivedSessionMessages).containsExactly(*expected) - } - - private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List { - val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress } - assertThat(actualForNode).containsExactly(*expected) - return actualForNode - } - - private fun receivedSessionMessagesObservable(): Observable { - return mockNet.messagingNetwork.receivedMessages.toSessionTransfers() - } - - //endregion Helpers -} - -private fun sessionConfirm(flowVersion: Int = 1) = ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, ""))) - -private inline fun > TestStartedNode.getSingleFlow(): Pair> { +internal inline fun > TestStartedNode.getSingleFlow(): Pair> { return smm.findStateMachines(P::class.java).single() } @@ -792,7 +605,7 @@ private fun sanitise(message: SessionMessage) = when (message) { } } -private fun Observable.toSessionTransfers(): Observable { +internal fun Observable.toSessionTransfers(): Observable { return filter { it.getMessage().topic == FlowMessagingImpl.sessionTopic }.map { val from = it.sender.id val message = it.messageData.deserialize() @@ -800,12 +613,19 @@ private fun Observable.toSessionTransfers(): Observable = Pair(internals.id, message) -private infix fun Pair.to(node: TestStartedNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress) +internal fun errorMessage(errorResponse: FlowException? = null) = ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0)) -private data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) { +internal infix fun TestStartedNode.sent(message: SessionMessage): Pair = Pair(internals.id, message) +internal infix fun Pair.to(node: TestStartedNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress) + +internal data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) { val isPayloadTransfer: Boolean get() = message is ExistingSessionMessage && message.payload is DataSessionMessage || @@ -814,53 +634,14 @@ private data class SessionTransfer(val from: Int, val message: SessionMessage, v override fun toString(): String = "$from sent $message to $to" } - -private fun sessionInit(clientFlowClass: KClass>, flowVersion: Int = 1, payload: Any? = null): InitialSessionMessage { +internal fun sessionInit(clientFlowClass: KClass>, flowVersion: Int = 1, payload: Any? = null): InitialSessionMessage { return InitialSessionMessage(SessionId(0), 0, clientFlowClass.java.name, flowVersion, "", payload?.serialize()) } -private fun sessionData(payload: Any) = ExistingSessionMessage(SessionId(0), DataSessionMessage(payload.serialize())) - - -private val FlowLogic<*>.progressSteps: CordaFuture>> - get() { - return progressTracker!!.changes - .ofType(Change.Position::class.java) - .map { it.newStep } - .materialize() - .toList() - .toFuture() - } - -class ThrowingActionExecutor(private val exception: Exception, val delegate: ActionExecutor) : ActionExecutor { - var thrown = false - @Suspendable - override fun executeAction(fiber: FlowFiber, action: Action) { - if (thrown) { - delegate.executeAction(fiber, action) - } else { - thrown = true - throw exception - } - } -} +internal fun sessionData(payload: Any) = ExistingSessionMessage(SessionId(0), DataSessionMessage(payload.serialize())) @InitiatingFlow -private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party, - @Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic() { - @Suspendable - override fun call() { - // Kick off the flow on the other side ... - val session = initiateFlow(otherParty) - session.send(1) - // ... then pause this one until it's received the session-end message from the other side - receivedOtherFlowEnd.acquire() - session.sendAndReceive(2) - } -} - -@InitiatingFlow -private open class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic() { +internal open class SendFlow(private val payload: Any, private vararg val otherParties: Party) : FlowLogic() { init { require(otherParties.isNotEmpty()) } @@ -876,46 +657,7 @@ private open class SendFlow(val payload: Any, vararg val otherParties: Party) : } } -// we need brand new class for a flow to fail, so here it is -@InitiatingFlow -private open class NeverRegisteredFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic() { - init { - require(otherParties.isNotEmpty()) - } - - @Suspendable - override fun call(): FlowInfo { - val flowInfos = otherParties.map { - val session = initiateFlow(it) - session.send(payload) - session.getCounterpartyFlowInfo() - }.toList() - return flowInfos.first() - } -} - -private object WaitingFlows { - @InitiatingFlow - class Waiter(val stx: SignedTransaction, val otherParty: Party) : FlowLogic() { - @Suspendable - override fun call(): SignedTransaction { - val otherPartySession = initiateFlow(otherParty) - otherPartySession.send(stx) - return waitForLedgerCommit(stx.id) - } - } - - class Committer(val otherPartySession: FlowSession, val throwException: (() -> Exception)? = null) : FlowLogic() { - @Suspendable - override fun call(): SignedTransaction { - val stx = otherPartySession.receive().unwrap { it } - if (throwException != null) throw throwException.invoke() - return subFlow(FinalityFlow(stx, setOf(otherPartySession.counterparty))) - } - } -} - -private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic() { +internal class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic() { @Transient var flowStarted = false @@ -928,7 +670,7 @@ private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic() } } -private class InitiatedReceiveFlow(val otherPartySession: FlowSession) : FlowLogic() { +internal class InitiatedReceiveFlow(private val otherPartySession: FlowSession) : FlowLogic() { object START_STEP : ProgressTracker.Step("Starting") object RECEIVED_STEP : ProgressTracker.Step("Received") @@ -953,26 +695,13 @@ private class InitiatedReceiveFlow(val otherPartySession: FlowSession) : FlowLog } } -private class LazyServiceHubAccessFlow : FlowLogic() { - val lazyTime: Instant by lazy { serviceHub.clock.instant() } - @Suspendable - override fun call() = Unit -} - -private open class InitiatedSendFlow(val payload: Any, val otherPartySession: FlowSession) : FlowLogic() { +internal open class InitiatedSendFlow(private val payload: Any, private val otherPartySession: FlowSession) : FlowLogic() { @Suspendable override fun call() = otherPartySession.send(payload) } -private interface CustomInterface - -private class CustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty) - @InitiatingFlow -private class IncorrectCustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty) - -@InitiatingFlow -private class ReceiveFlow(vararg val otherParties: Party) : FlowLogic() { +internal class ReceiveFlow(private vararg val otherParties: Party) : FlowLogic() { object START_STEP : ProgressTracker.Step("Starting") object RECEIVED_STEP : ProgressTracker.Step("Received") @@ -1001,72 +730,23 @@ private class ReceiveFlow(vararg val otherParties: Party) : FlowLogic() { } } -private class MyFlowException(override val message: String) : FlowException() { +internal class MyFlowException(override val message: String) : FlowException() { override fun equals(other: Any?): Boolean = other is MyFlowException && other.message == this.message override fun hashCode(): Int = message.hashCode() } @InitiatingFlow -private class VaultQueryFlow(val stx: SignedTransaction, val otherParty: Party) : FlowLogic>>() { - @Suspendable - override fun call(): List> { - val otherPartySession = initiateFlow(otherParty) - otherPartySession.send(stx) - // hold onto reference here to force checkpoint of vaultService and thus - // prove it is registered as a tokenizableService in the node - val vaultQuerySvc = serviceHub.vaultService - waitForLedgerCommit(stx.id) - return vaultQuerySvc.queryBy().states - } -} - -@InitiatingFlow(version = 2) -private class UpgradedFlow(val otherParty: Party, val otherPartySession: FlowSession? = null) : FlowLogic>() { - constructor(otherPartySession: FlowSession) : this(otherPartySession.counterparty, otherPartySession) - - @Suspendable - override fun call(): Pair { - val otherPartySession = this.otherPartySession ?: initiateFlow(otherParty) - val received = otherPartySession.receive().unwrap { it } - val otherFlowVersion = otherPartySession.getCounterpartyFlowInfo().flowVersion - return Pair(received, otherFlowVersion) - } -} - -private class SingleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic() { - @Suspendable - override fun call() { - val payload = otherPartySession.receive().unwrap { it } - subFlow(InlinedSendFlow(payload + payload, otherPartySession)) - } -} - -private class DoubleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic() { - @Suspendable - override fun call() { - subFlow(SingleInlinedSubFlow(otherPartySession)) - } -} - -private data class NonSerialisableData(val a: Int) -private class NonSerialisableFlowException(@Suppress("unused") val data: NonSerialisableData) : FlowException() - -@InitiatingFlow -private class SendAndReceiveFlow(val otherParty: Party, val payload: Any, val otherPartySession: FlowSession? = null) : FlowLogic() { +internal class SendAndReceiveFlow(private val otherParty: Party, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic() { constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession) @Suspendable - override fun call(): Any = (otherPartySession - ?: initiateFlow(otherParty)).sendAndReceive(payload).unwrap { it } -} - -private class InlinedSendFlow(val payload: String, val otherPartySession: FlowSession) : FlowLogic() { - @Suspendable - override fun call() = otherPartySession.send(payload) + override fun call(): Any { + return (otherPartySession ?: initiateFlow(otherParty)).sendAndReceive(payload).unwrap { it } + } } @InitiatingFlow -private class PingPongFlow(val otherParty: Party, val payload: Long, val otherPartySession: FlowSession? = null) : FlowLogic() { +internal class PingPongFlow(private val otherParty: Party, private val payload: Long, private val otherPartySession: FlowSession? = null) : FlowLogic() { constructor(otherPartySession: FlowSession, payload: Long) : this(otherPartySession.counterparty, payload, otherPartySession) @Transient @@ -1082,7 +762,7 @@ private class PingPongFlow(val otherParty: Party, val payload: Long, val otherPa } } -private class ExceptionFlow(val exception: () -> E) : FlowLogic() { +internal class ExceptionFlow(val exception: () -> E) : FlowLogic() { object START_STEP : ProgressTracker.Step("Starting") override val progressTracker: ProgressTracker = ProgressTracker(START_STEP) @@ -1094,4 +774,4 @@ private class ExceptionFlow(val exception: () -> E) : FlowLogic() + + @Before + fun setUpMockNet() { + mockNet = InternalMockNetwork( + cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"), + servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin() + ) + + aliceNode = createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) + bobNode = createNode(InternalMockNodeParameters(legalName = BOB_NAME)) + + // Extract identities + alice = aliceNode.info.singleIdentity() + bob = bobNode.info.singleIdentity() + notaryIdentity = mockNet.defaultNotaryIdentity + + receivedSessionMessagesObservable().forEach { receivedSessionMessages += it } + } + + private fun receivedSessionMessagesObservable(): Observable { + return mockNet.messagingNetwork.receivedMessages.toSessionTransfers() + } + + @After + fun cleanUp() { + mockNet.stopNodes() + receivedSessionMessages.clear() + } + + private fun createNode(parameters: InternalMockNodeParameters): TestStartedNode { + return mockNet.createNode(parameters) { + object : InternalMockNetwork.MockNode(it) { + override fun makeStateMachineManager(): StateMachineManager { + val executor = MultiThreadedStateMachineExecutor(metricRegistry, 1) + return MultiThreadedStateMachineManager( + services, + checkpointStorage, + executor, + database, + newSecureRandom(), + busyNodeLatch, + cordappLoader.appClassLoader + ) + } + } + } + } + + @Test + fun `session init with unknown class is sent to the flow hospital, from where it's dropped`() { + aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob) + mockNet.runNetwork() + assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital + val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot + assertThat(medicalRecords).hasSize(1) + val sessionInitRecord = medicalRecords[0] as StaffedFlowHospital.MedicalRecord.SessionInit + assertThat(sessionInitRecord.initiatorFlowClassName).isEqualTo("not.a.real.Class") + bobNode.smm.flowHospital.dropSessionInit(sessionInitRecord.id) // Drop the message which is processed as an error back to sender + mockNet.runNetwork() + assertThat(receivedSessionMessages).hasSize(2) // Now the session-reject is expected + val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage + assertThat((lastMessage.payload as RejectSessionMessage).message).isEqualTo("Don't know not.a.real.Class") + } +} diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTripartyTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTripartyTests.kt new file mode 100644 index 0000000000..eb7be9531f --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTripartyTests.kt @@ -0,0 +1,178 @@ +package net.corda.node.services.statemachine + +import net.corda.core.flows.UnexpectedFlowEndException +import net.corda.core.flows.registerCordappFlowFactory +import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.map +import net.corda.core.utilities.getOrThrow +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.CHARLIE_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.internal.LogHelper +import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.internal.* +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.AssertionsForClassTypes +import org.junit.After +import org.junit.Before +import org.junit.Test +import rx.Observable +import java.util.* + +class FlowFrameworkTripartyTests { + companion object { + init { + LogHelper.setLevel("+net.corda.flow") + } + + private lateinit var mockNet: InternalMockNetwork + private lateinit var aliceNode: TestStartedNode + private lateinit var bobNode: TestStartedNode + private lateinit var charlieNode: TestStartedNode + private lateinit var alice: Party + private lateinit var bob: Party + private lateinit var charlie: Party + private lateinit var notaryIdentity: Party + private val receivedSessionMessages = ArrayList() + } + + @Before + fun setUpGlobalMockNet() { + mockNet = InternalMockNetwork( + cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"), + servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin() + ) + + aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) + bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME)) + charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME)) + + + // Extract identities + alice = aliceNode.info.singleIdentity() + bob = bobNode.info.singleIdentity() + charlie = charlieNode.info.singleIdentity() + notaryIdentity = mockNet.defaultNotaryIdentity + + receivedSessionMessagesObservable().forEach { receivedSessionMessages += it } + } + + @After + fun cleanUp() { + mockNet.stopNodes() + receivedSessionMessages.clear() + } + + private fun receivedSessionMessagesObservable(): Observable { + return mockNet.messagingNetwork.receivedMessages.toSessionTransfers() + } + + @Test + fun `sending to multiple parties`() { + bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it) + .nonTerminating() } + charlieNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it) + .nonTerminating() } + val payload = "Hello World" + aliceNode.services.startFlow(SendFlow(payload, bob, charlie)) + mockNet.runNetwork() + bobNode.internals.acceptableLiveFiberCountOnStop = 1 + charlieNode.internals.acceptableLiveFiberCountOnStop = 1 + val bobFlow = bobNode.getSingleFlow().first + val charlieFlow = charlieNode.getSingleFlow().first + assertThat(bobFlow.receivedPayloads[0]).isEqualTo(payload) + assertThat(charlieFlow.receivedPayloads[0]).isEqualTo(payload) + + assertSessionTransfers(bobNode, + aliceNode sent sessionInit(SendFlow::class, payload = payload) to bobNode, + bobNode sent sessionConfirm() to aliceNode, + aliceNode sent normalEnd to bobNode + //There's no session end from the other flows as they're manually suspended + ) + + assertSessionTransfers(charlieNode, + aliceNode sent sessionInit(SendFlow::class, payload = payload) to charlieNode, + charlieNode sent sessionConfirm() to aliceNode, + aliceNode sent normalEnd to charlieNode + //There's no session end from the other flows as they're manually suspended + ) + } + + @Test + fun `receiving from multiple parties`() { + val bobPayload = "Test 1" + val charliePayload = "Test 2" + bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(bobPayload, it) } + charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(charliePayload, it) } + val multiReceiveFlow = ReceiveFlow(bob, charlie).nonTerminating() + aliceNode.services.startFlow(multiReceiveFlow) + aliceNode.internals.acceptableLiveFiberCountOnStop = 1 + mockNet.runNetwork() + assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(bobPayload) + assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(charliePayload) + + assertSessionTransfers(bobNode, + aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, + bobNode sent sessionConfirm() to aliceNode, + bobNode sent sessionData(bobPayload) to aliceNode, + bobNode sent normalEnd to aliceNode + ) + + assertSessionTransfers(charlieNode, + aliceNode sent sessionInit(ReceiveFlow::class) to charlieNode, + charlieNode sent sessionConfirm() to aliceNode, + charlieNode sent sessionData(charliePayload) to aliceNode, + charlieNode sent normalEnd to aliceNode + ) + } + + @Test + fun `FlowException only propagated to parent`() { + charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Chain") } } + bobNode.registerCordappFlowFactory(ReceiveFlow::class) { ReceiveFlow(charlie) } + val receivingFiber = aliceNode.services.startFlow(ReceiveFlow(bob)) + mockNet.runNetwork() + AssertionsForClassTypes.assertThatExceptionOfType(UnexpectedFlowEndException::class.java) + .isThrownBy { receivingFiber.resultFuture.getOrThrow() } + } + + @Test + fun `FlowException thrown and there is a 3rd unrelated party flow`() { + // Bob will send its payload and then block waiting for the receive from Alice. Meanwhile Alice will move + // onto Charlie which will throw the exception + val node2Fiber = bobNode + .registerCordappFlowFactory(ReceiveFlow::class) { SendAndReceiveFlow(it, "Hello") } + .map { it.stateMachine } + charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Nothing useful") } } + + val aliceFiber = aliceNode.services.startFlow(ReceiveFlow(bob, charlie)) as FlowStateMachineImpl + mockNet.runNetwork() + + // Alice will terminate with the error it received from Charlie but it won't propagate that to Bob (as it's + // not relevant to it) but it will end its session with it + AssertionsForClassTypes.assertThatExceptionOfType(MyFlowException::class.java) + .isThrownBy { + aliceFiber.resultFuture.getOrThrow() + } + val bobResultFuture = node2Fiber.getOrThrow().resultFuture + AssertionsForClassTypes.assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy { + bobResultFuture.getOrThrow() + } + + assertSessionTransfers(bobNode, + aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, + bobNode sent sessionConfirm() to aliceNode, + bobNode sent sessionData("Hello") to aliceNode, + aliceNode sent errorMessage() to bobNode + ) + } + + private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0) + + private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List { + val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress } + assertThat(actualForNode).containsExactly(*expected) + return actualForNode + } +} diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index 7822e6f136..4255224d16 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -175,8 +175,6 @@ class RetryFlowMockTest { TODO("not implemented") } }), nodeA.services.newContext()).get() - // Should be 2 records, one for admission and one for keep in. - records.next() records.next() // Killing it should remove it. nodeA.smm.killFlow(flow.id) diff --git a/release-tools/testing/README.md b/release-tools/testing/README.md index 0f859967f6..b49ce25426 100644 --- a/release-tools/testing/README.md +++ b/release-tools/testing/README.md @@ -58,7 +58,9 @@ This will create a new sub-task under each of the test tickets for `` ` ## Options -Each command described above has a set of additional options. More specifically, if you want to use a particular JIRA user instead of being prompted for a user name every time, you can specify `--user `. For verbose logging, you can supply `--verbose` or `-v`. And to auto-reply to the prompt of whether to proceed or not, provide `--yes` or `-y`. +Each command described above has a set of additional options. More specifically, if you want to use a particular JIRA user instead of being prompted for a user name every time, you can specify `--user `. You can also provide the user name in the environment variable, `JIRA_USER`. + +For verbose logging, you can supply `--verbose` or `-v`. And to auto-reply to the prompt of whether to proceed or not, provide `--yes` or `-y`. There is also a useful dry-run option, `--dry-run` or `-d`, that lets you run through the command without creating any tickets or applying any changes to JIRA. diff --git a/release-tools/testing/login_manager.py b/release-tools/testing/login_manager.py index 08a4e1412a..2fc38b1184 100644 --- a/release-tools/testing/login_manager.py +++ b/release-tools/testing/login_manager.py @@ -1,7 +1,7 @@ # {{{ Dependencies from __future__ import print_function -import sys +import sys, os try: from getpass import getpass @@ -41,9 +41,13 @@ def confirm(message, auto_yes=False): # {{{ login(account, user, password, use_keyring) - Present user with login prompt and return the provided username and password. If use_keyring is true, use previously provided password (if any) def login(account, user=None, password=None, use_keyring=True): if not user: - user = prompt('Username: ') - user = u'{}@r3.com'.format(user) if '@' not in user else user - if not user: return (None, None) + if 'JIRA_USER' not in os.environ: + user = prompt('Username: ') + user = u'{}@r3.com'.format(user) if '@' not in user else user + if not user: return (None, None) + else: + user = os.environ['JIRA_USER'] + print('Username: {}'.format(user)) else: user = u'{}@r3.com'.format(user) if '@' not in user else user print('Username: {}'.format(user)) diff --git a/release-tools/testing/test-manager b/release-tools/testing/test-manager index f0471bbb1b..d288817f63 100755 --- a/release-tools/testing/test-manager +++ b/release-tools/testing/test-manager @@ -16,10 +16,10 @@ try: def red(message): return colored(message, 'red') def yellow(message): return colored(message, 'yellow') def faint(message): return colored(message, 'white', attrs=['dark']) - def on_green(message): return colored(message, 'white', 'on_green') - def on_red(message): return colored(message, 'white', 'on_red') - def blue_on_white(message): return colored(message, 'blue', 'on_white') - def yellow_on_white(message): return colored(message, 'yellow', 'on_white') + def on_green(message): return colored(message, 'green') + def on_red(message): return colored(message, 'red') + def blue_on_white(message): return colored(message, 'blue') + def yellow_on_white(message): return colored(message, 'yellow') except: def blue(message): return u'[{}]'.format(message) def green(message): return message @@ -159,7 +159,10 @@ def create_version(args): jira.jira.create_version(name=version, project=project, description=version) print(u' {} - Created version for project {}'.format(green('SUCCESS'), blue(project))) except Exception as error: - print(u' {} - Failed to version: {}'.format(red('FAIL'), error)) + if args.verbose: + print(u' {} - Failed to version: {}'.format(red('FAIL'), error)) + else: + print(u' {} - Failed to version: {}'.format(red('FAIL'), error.text)) print()