mirror of
https://github.com/corda/corda.git
synced 2025-03-15 00:36:49 +00:00
commit
fab0c5cd2c
1
.gitignore
vendored
1
.gitignore
vendored
@ -38,7 +38,6 @@ lib/quasar.jar
|
||||
.idea/markdown-navigator
|
||||
.idea/runConfigurations
|
||||
.idea/dictionaries
|
||||
.idea/codeStyles/
|
||||
|
||||
# Include the -parameters compiler option by default in IntelliJ required for serialization.
|
||||
!.idea/compiler.xml
|
||||
|
43
.idea/codeStyles/Project.xml
generated
Normal file
43
.idea/codeStyles/Project.xml
generated
Normal file
@ -0,0 +1,43 @@
|
||||
<component name="ProjectCodeStyleConfiguration">
|
||||
<code_scheme name="Project" version="173">
|
||||
<option name="LINE_SEPARATOR" value=" " />
|
||||
<option name="RIGHT_MARGIN" value="140" />
|
||||
<option name="SOFT_MARGINS" value="140" />
|
||||
<JetCodeStyleSettings>
|
||||
<option name="PACKAGES_TO_USE_STAR_IMPORTS">
|
||||
<value>
|
||||
<package name="java.util" withSubpackages="false" static="false" />
|
||||
<package name="kotlinx.android.synthetic" withSubpackages="true" static="false" />
|
||||
<package name="tornadofx" withSubpackages="false" static="false" />
|
||||
</value>
|
||||
</option>
|
||||
<option name="CONTINUATION_INDENT_IN_PARAMETER_LISTS" value="true" />
|
||||
<option name="CONTINUATION_INDENT_IN_ARGUMENT_LISTS" value="true" />
|
||||
<option name="CONTINUATION_INDENT_FOR_EXPRESSION_BODIES" value="true" />
|
||||
<option name="CONTINUATION_INDENT_FOR_CHAINED_CALLS" value="true" />
|
||||
<option name="CONTINUATION_INDENT_IN_SUPERTYPE_LISTS" value="true" />
|
||||
<option name="CONTINUATION_INDENT_IN_IF_CONDITIONS" value="true" />
|
||||
<option name="CONTINUATION_INDENT_IN_ELVIS" value="true" />
|
||||
<option name="WRAP_EXPRESSION_BODY_FUNCTIONS" value="0" />
|
||||
<option name="IF_RPAREN_ON_NEW_LINE" value="false" />
|
||||
<option name="CODE_STYLE_DEFAULTS" value="KOTLIN_OFFICIAL" />
|
||||
</JetCodeStyleSettings>
|
||||
<editorconfig>
|
||||
<option name="ENABLED" value="false" />
|
||||
</editorconfig>
|
||||
<codeStyleSettings language="kotlin">
|
||||
<option name="CODE_STYLE_DEFAULTS" value="KOTLIN_OFFICIAL" />
|
||||
<option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1" />
|
||||
<option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
|
||||
<option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="0" />
|
||||
<option name="CALL_PARAMETERS_WRAP" value="0" />
|
||||
<option name="CALL_PARAMETERS_LPAREN_ON_NEXT_LINE" value="false" />
|
||||
<option name="CALL_PARAMETERS_RPAREN_ON_NEXT_LINE" value="false" />
|
||||
<option name="METHOD_PARAMETERS_WRAP" value="0" />
|
||||
<option name="METHOD_PARAMETERS_LPAREN_ON_NEXT_LINE" value="false" />
|
||||
<option name="METHOD_PARAMETERS_RPAREN_ON_NEXT_LINE" value="false" />
|
||||
<option name="EXTENDS_LIST_WRAP" value="0" />
|
||||
<option name="ASSIGNMENT_WRAP" value="0" />
|
||||
</codeStyleSettings>
|
||||
</code_scheme>
|
||||
</component>
|
5
.idea/codeStyles/codeStyleConfig.xml
generated
Normal file
5
.idea/codeStyles/codeStyleConfig.xml
generated
Normal file
@ -0,0 +1,5 @@
|
||||
<component name="ProjectCodeStyleConfiguration">
|
||||
<state>
|
||||
<option name="USE_PER_PROJECT_SETTINGS" value="true" />
|
||||
</state>
|
||||
</component>
|
6
.idea/compiler.xml
generated
6
.idea/compiler.xml
generated
@ -49,6 +49,10 @@
|
||||
<module name="client_test" target="1.8" />
|
||||
<module name="cliutils_main" target="1.8" />
|
||||
<module name="cliutils_test" target="1.8" />
|
||||
<module name="com.r3.corda_buildSrc_main" target="1.8" />
|
||||
<module name="com.r3.corda_buildSrc_test" target="1.8" />
|
||||
<module name="com.r3.corda_canonicalizer_main" target="1.8" />
|
||||
<module name="com.r3.corda_canonicalizer_test" target="1.8" />
|
||||
<module name="common-configuration-parsing_main" target="1.8" />
|
||||
<module name="common-configuration-parsing_test" target="1.8" />
|
||||
<module name="common-validation_main" target="1.8" />
|
||||
@ -318,6 +322,8 @@
|
||||
<module name="testing-test-utils_test" target="1.8" />
|
||||
<module name="testing_main" target="1.8" />
|
||||
<module name="testing_test" target="1.8" />
|
||||
<module name="tools-blobinspector_main" target="1.8" />
|
||||
<module name="tools-blobinspector_test" target="1.8" />
|
||||
<module name="tools_main" target="1.8" />
|
||||
<module name="tools_test" target="1.8" />
|
||||
<module name="trader-demo_integrationTest" target="1.8" />
|
||||
|
@ -35,9 +35,11 @@ handling, and ensures the Corda service is run at boot.
|
||||
|
||||
6. Save the below as ``/opt/corda/node.conf``. See :doc:`corda-configuration-file` for a description of these options::
|
||||
|
||||
basedir : "/opt/corda"
|
||||
p2pAddress : "example.com:10002"
|
||||
rpcAddress : "example.com:10003"
|
||||
rpcSettings {
|
||||
address: "example.com:10003"
|
||||
adminAddress: "example.com:10004"
|
||||
}
|
||||
h2port : 11000
|
||||
emailAddress : "you@example.com"
|
||||
myLegalName : "O=Bank of Breakfast Tea, L=London, C=GB"
|
||||
@ -56,18 +58,19 @@ handling, and ensures the Corda service is run at boot.
|
||||
|
||||
7. Make the following changes to ``/opt/corda/node.conf``:
|
||||
|
||||
* Change the ``p2pAddress`` and ``rpcAddress`` values to start with your server's hostname or external IP address.
|
||||
This is the address other nodes or RPC interfaces will use to communicate with your node
|
||||
* Change the ports if necessary, for example if you are running multiple nodes on one server (see below)
|
||||
* Change the ``p2pAddress``, ``rpcSettings.address`` and ``rpcSettings.adminAddress`` values to match
|
||||
your server's hostname or external IP address. These are the addresses other nodes or RPC interfaces will use to
|
||||
communicate with your node.
|
||||
* Change the ports if necessary, for example if you are running multiple nodes on one server (see below).
|
||||
* Enter an email address which will be used as an administrative contact during the registration process. This is
|
||||
only visible to the permissioning service
|
||||
only visible to the permissioning service.
|
||||
* Enter your node's desired legal name. This will be used during the issuance of your certificate and should rarely
|
||||
change as it should represent the legal identity of your node
|
||||
change as it should represent the legal identity of your node.
|
||||
|
||||
* Organization (``O=``) should be a unique and meaningful identifier (e.g. Bank of Breakfast Tea)
|
||||
* Location (``L=``) is your nearest city
|
||||
* Country (``C=``) is the `ISO 3166-1 alpha-2 code <https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2>`_
|
||||
* Change the RPC username and password
|
||||
* Change the RPC username and password.
|
||||
|
||||
.. note:: Ubuntu 16.04 and most current Linux distributions use SystemD, so if you are running one of these
|
||||
distributions follow the steps marked **SystemD**.
|
||||
@ -202,15 +205,16 @@ at boot, and means the Corda service stays running with no users connected to th
|
||||
|
||||
3. Save the below as ``C:\Corda\node.conf``. See :doc:`corda-configuration-file` for a description of these options::
|
||||
|
||||
basedir : "C:\\Corda"
|
||||
p2pAddress : "example.com:10002"
|
||||
rpcAddress : "example.com:10003"
|
||||
rpcSettings {
|
||||
address: "example.com:10003"
|
||||
adminAddress: "example.com:10004"
|
||||
}
|
||||
h2port : 11000
|
||||
emailAddress: "you@example.com"
|
||||
myLegalName : "O=Bank of Breakfast Tea, L=London, C=GB"
|
||||
keyStorePassword : "cordacadevpass"
|
||||
trustStorePassword : "trustpass"
|
||||
extraAdvertisedServiceIds: [ "" ]
|
||||
devMode : false
|
||||
rpcUsers=[
|
||||
{
|
||||
@ -224,18 +228,19 @@ at boot, and means the Corda service stays running with no users connected to th
|
||||
|
||||
4. Make the following changes to ``C:\Corda\node.conf``:
|
||||
|
||||
* Change the ``p2pAddress`` and ``rpcAddress`` values to start with your server's hostname or external IP address.
|
||||
This is the address other nodes or RPC interfaces will use to communicate with your node
|
||||
* Change the ports if necessary, for example if you are running multiple nodes on one server (see below)
|
||||
* Change the ``p2pAddress``, ``rpcSettings.address`` and ``rpcSettings.adminAddress`` values to match
|
||||
your server's hostname or external IP address. These are the addresses other nodes or RPC interfaces will use to
|
||||
communicate with your node.
|
||||
* Change the ports if necessary, for example if you are running multiple nodes on one server (see below).
|
||||
* Enter an email address which will be used as an administrative contact during the registration process. This is
|
||||
only visible to the permissioning service
|
||||
only visible to the permissioning service.
|
||||
* Enter your node's desired legal name. This will be used during the issuance of your certificate and should rarely
|
||||
change as it should represent the legal identity of your node
|
||||
change as it should represent the legal identity of your node.
|
||||
|
||||
* Organization (``O=``) should be a unique and meaningful identifier (e.g. Bank of Breakfast Tea)
|
||||
* Location (``L=``) is your nearest city
|
||||
* Country (``C=``) is the `ISO 3166-1 alpha-2 code <https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2>`_
|
||||
* Change the RPC username and password
|
||||
* Change the RPC username and password.
|
||||
|
||||
5. Copy the required Java keystores to the node. See :doc:`permissioning`
|
||||
|
||||
|
@ -120,14 +120,14 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
cacheFactoryPrototype: BindableNamedCacheFactory,
|
||||
protected val versionInfo: VersionInfo,
|
||||
protected val flowManager: FlowManager,
|
||||
protected val serverThread: AffinityExecutor.ServiceAffinityExecutor,
|
||||
protected val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() {
|
||||
val serverThread: AffinityExecutor.ServiceAffinityExecutor,
|
||||
val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() {
|
||||
|
||||
protected abstract val log: Logger
|
||||
@Suppress("LeakingThis")
|
||||
private var tokenizableServices: MutableList<Any>? = mutableListOf(platformClock, this)
|
||||
|
||||
protected val metricRegistry = MetricRegistry()
|
||||
val metricRegistry = MetricRegistry()
|
||||
protected val cacheFactory = cacheFactoryPrototype.bindWithConfig(configuration).bindWithMetrics(metricRegistry).tokenize()
|
||||
val monitoringService = MonitoringService(metricRegistry).tokenize()
|
||||
|
||||
@ -144,7 +144,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
protected val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo)
|
||||
val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo)
|
||||
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize()
|
||||
val identityService = PersistentIdentityService(cacheFactory).tokenize()
|
||||
val database: CordaPersistence = createCordaPersistence(
|
||||
|
@ -33,7 +33,6 @@ import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.config.shouldCheckCheckpoints
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.messaging.ReceivedMessage
|
||||
import net.corda.node.services.statemachine.interceptors.*
|
||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
@ -97,8 +96,6 @@ class MultiThreadedStateMachineManager(
|
||||
val timedFlows = ConcurrentHashMap<StateMachineRunId, ScheduledTimeout>()
|
||||
}
|
||||
|
||||
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital()
|
||||
|
||||
private val concurrentBox = ConcurrentBox(InnerState())
|
||||
|
||||
private val scheduler = FiberExecutorScheduler("Flow fiber scheduler", executor)
|
||||
@ -111,17 +108,18 @@ class MultiThreadedStateMachineManager(
|
||||
private val sessionToFlow = ConcurrentHashMap<SessionId, StateMachineRunId>()
|
||||
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
|
||||
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
|
||||
private val transitionExecutor = makeTransitionExecutor()
|
||||
private val ourSenderUUID get() = serviceHub.networkService.ourSenderUUID
|
||||
|
||||
private var checkpointSerializationContext: CheckpointSerializationContext? = null
|
||||
private var tokenizableServices: List<Any>? = null
|
||||
private var actionExecutor: ActionExecutor? = null
|
||||
|
||||
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID)
|
||||
private val transitionExecutor = makeTransitionExecutor()
|
||||
|
||||
override val allStateMachines: List<FlowLogic<*>>
|
||||
get() = concurrentBox.content.flows.values.map { it.fiber.logic }
|
||||
|
||||
|
||||
private val totalStartedFlows = metrics.counter("Flows.Started")
|
||||
private val totalFinishedFlows = metrics.counter("Flows.Finished")
|
||||
private val totalSuccessFlows = metrics.counter("Flows.Success")
|
||||
@ -214,7 +212,7 @@ class MultiThreadedStateMachineManager(
|
||||
invocationContext = context,
|
||||
flowLogic = flowLogic,
|
||||
flowStart = FlowStart.Explicit,
|
||||
ourIdentity = ourIdentity ?: getOurFirstIdentity(),
|
||||
ourIdentity = ourIdentity ?: ourFirstIdentity,
|
||||
deduplicationHandler = deduplicationHandler,
|
||||
isStartIdempotent = false
|
||||
)
|
||||
@ -408,21 +406,19 @@ class MultiThreadedStateMachineManager(
|
||||
}
|
||||
|
||||
private fun onSessionMessage(event: ExternalEvent.ExternalMessageEvent) {
|
||||
val message: ReceivedMessage = event.receivedMessage
|
||||
val deduplicationHandler: DeduplicationHandler = event.deduplicationHandler
|
||||
val peer = message.peer
|
||||
val peer = event.receivedMessage.peer
|
||||
val sessionMessage = try {
|
||||
message.data.deserialize<SessionMessage>()
|
||||
event.receivedMessage.data.deserialize<SessionMessage>()
|
||||
} catch (ex: Exception) {
|
||||
logger.error("Received corrupt SessionMessage data from $peer")
|
||||
deduplicationHandler.afterDatabaseTransaction()
|
||||
event.deduplicationHandler.afterDatabaseTransaction()
|
||||
return
|
||||
}
|
||||
val sender = serviceHub.networkMapCache.getPeerByLegalName(peer)
|
||||
if (sender != null) {
|
||||
when (sessionMessage) {
|
||||
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, deduplicationHandler, sender)
|
||||
is InitialSessionMessage -> onSessionInit(sessionMessage, message.platformVersion, deduplicationHandler, sender)
|
||||
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, event.deduplicationHandler, sender)
|
||||
is InitialSessionMessage -> onSessionInit(sessionMessage, sender, event)
|
||||
}
|
||||
} else {
|
||||
logger.error("Unknown peer $peer in $sessionMessage")
|
||||
@ -453,13 +449,8 @@ class MultiThreadedStateMachineManager(
|
||||
}
|
||||
}
|
||||
|
||||
private fun onSessionInit(sessionMessage: InitialSessionMessage, senderPlatformVersion: Int, deduplicationHandler: DeduplicationHandler, sender: Party) {
|
||||
fun createErrorMessage(initiatorSessionId: SessionId, message: String): ExistingSessionMessage {
|
||||
val errorId = secureRandom.nextLong()
|
||||
val payload = RejectSessionMessage(message, errorId)
|
||||
return ExistingSessionMessage(initiatorSessionId, payload)
|
||||
}
|
||||
val replyError = try {
|
||||
private fun onSessionInit(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) {
|
||||
try {
|
||||
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
|
||||
val initiatedSessionId = SessionId.createRandom(secureRandom)
|
||||
val senderSession = FlowSessionImpl(sender, initiatedSessionId)
|
||||
@ -469,40 +460,34 @@ class MultiThreadedStateMachineManager(
|
||||
is InitiatedFlowFactory.CorDapp -> FlowInfo(initiatedFlowFactory.flowVersion, initiatedFlowFactory.appName)
|
||||
}
|
||||
val senderCoreFlowVersion = when (initiatedFlowFactory) {
|
||||
is InitiatedFlowFactory.Core -> senderPlatformVersion
|
||||
is InitiatedFlowFactory.Core -> event.receivedMessage.platformVersion
|
||||
is InitiatedFlowFactory.CorDapp -> null
|
||||
}
|
||||
startInitiatedFlow(flowLogic, deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo)
|
||||
null
|
||||
} catch (exception: Exception) {
|
||||
logger.warn("Exception while creating initiated flow", exception)
|
||||
createErrorMessage(
|
||||
sessionMessage.initiatorSessionId,
|
||||
(exception as? SessionRejectException)?.message ?: "Unable to establish session"
|
||||
)
|
||||
}
|
||||
|
||||
if (replyError != null) {
|
||||
flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID))
|
||||
deduplicationHandler.afterDatabaseTransaction()
|
||||
startInitiatedFlow(flowLogic, event.deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo)
|
||||
} catch (t: Throwable) {
|
||||
logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " +
|
||||
"flowVersion=${sessionMessage.flowVersion}), sending to the flow hospital", t)
|
||||
flowHospital.sessionInitErrored(sessionMessage, sender, event, t)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO this is a temporary hack until we figure out multiple identities
|
||||
private fun getOurFirstIdentity(): Party {
|
||||
return serviceHub.myInfo.legalIdentities[0]
|
||||
}
|
||||
private val ourFirstIdentity: Party get() = serviceHub.myInfo.legalIdentities[0]
|
||||
|
||||
private fun getInitiatedFlowFactory(message: InitialSessionMessage): InitiatedFlowFactory<*> {
|
||||
val initiatingFlowClass = try {
|
||||
Class.forName(message.initiatorFlowClassName, true, classloader).asSubclass(FlowLogic::class.java)
|
||||
val initiatorClass = try {
|
||||
Class.forName(message.initiatorFlowClassName, true, classloader)
|
||||
} catch (e: ClassNotFoundException) {
|
||||
throw SessionRejectException("Don't know ${message.initiatorFlowClassName}")
|
||||
} catch (e: ClassCastException) {
|
||||
throw SessionRejectException("${message.initiatorFlowClassName} is not a flow")
|
||||
throw SessionRejectException.UnknownClass(message.initiatorFlowClassName)
|
||||
}
|
||||
return serviceHub.getFlowFactory(initiatingFlowClass) ?:
|
||||
throw SessionRejectException("$initiatingFlowClass is not registered")
|
||||
|
||||
val initiatorFlowClass = try {
|
||||
initiatorClass.asSubclass(FlowLogic::class.java)
|
||||
} catch (e: ClassCastException) {
|
||||
throw SessionRejectException.NotAFlow(initiatorClass)
|
||||
}
|
||||
|
||||
return serviceHub.getFlowFactory(initiatorFlowClass) ?: throw SessionRejectException.NotRegistered(initiatorFlowClass)
|
||||
}
|
||||
|
||||
private fun <A> startInitiatedFlow(
|
||||
@ -515,7 +500,7 @@ class MultiThreadedStateMachineManager(
|
||||
initiatedFlowInfo: FlowInfo
|
||||
) {
|
||||
val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo)
|
||||
val ourIdentity = getOurFirstIdentity()
|
||||
val ourIdentity = ourFirstIdentity
|
||||
startFlowInternal(
|
||||
InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity,
|
||||
initiatingMessageDeduplicationHandler,
|
||||
|
@ -1,8 +1,16 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.CordaException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
|
||||
/**
|
||||
* An exception propagated and thrown in case a session initiation fails.
|
||||
*/
|
||||
class SessionRejectException(message: String) : CordaException(message)
|
||||
open class SessionRejectException(message: String) : CordaException(message) {
|
||||
class UnknownClass(val initiatorFlowClassName: String) : SessionRejectException("Don't know $initiatorFlowClassName")
|
||||
|
||||
class NotAFlow(val initiatorClass: Class<*>) : SessionRejectException("${initiatorClass.name} is not a flow")
|
||||
|
||||
class NotRegistered(val initiatorFlowClass: Class<out FlowLogic<*>>) : SessionRejectException("${initiatorFlowClass.name} is not registered")
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,8 @@ import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.CheckpointSerializationContext
|
||||
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
|
||||
import net.corda.core.serialization.internal.checkpointDeserialize
|
||||
@ -32,7 +33,6 @@ import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.config.shouldCheckCheckpoints
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.messaging.ReceivedMessage
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
|
||||
import net.corda.node.services.statemachine.interceptors.*
|
||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||
@ -92,8 +92,6 @@ class SingleThreadedStateMachineManager(
|
||||
val timedFlows = HashMap<StateMachineRunId, ScheduledTimeout>()
|
||||
}
|
||||
|
||||
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital()
|
||||
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor)
|
||||
private val timeoutScheduler = Executors.newScheduledThreadPool(1)
|
||||
@ -104,12 +102,14 @@ class SingleThreadedStateMachineManager(
|
||||
private val sessionToFlow = ConcurrentHashMap<SessionId, StateMachineRunId>()
|
||||
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
|
||||
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
|
||||
private val transitionExecutor = makeTransitionExecutor()
|
||||
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
|
||||
|
||||
private var checkpointSerializationContext: CheckpointSerializationContext? = null
|
||||
private var actionExecutor: ActionExecutor? = null
|
||||
|
||||
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID)
|
||||
private val transitionExecutor = makeTransitionExecutor()
|
||||
|
||||
override val allStateMachines: List<FlowLogic<*>>
|
||||
get() = mutex.locked { flows.values.map { it.fiber.logic } }
|
||||
|
||||
@ -204,7 +204,7 @@ class SingleThreadedStateMachineManager(
|
||||
invocationContext = context,
|
||||
flowLogic = flowLogic,
|
||||
flowStart = FlowStart.Explicit,
|
||||
ourIdentity = ourIdentity ?: getOurFirstIdentity(),
|
||||
ourIdentity = ourIdentity ?: ourFirstIdentity,
|
||||
deduplicationHandler = deduplicationHandler,
|
||||
isStartIdempotent = false
|
||||
)
|
||||
@ -402,23 +402,23 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
|
||||
private fun onSessionMessage(event: ExternalEvent.ExternalMessageEvent) {
|
||||
val message: ReceivedMessage = event.receivedMessage
|
||||
val deduplicationHandler: DeduplicationHandler = event.deduplicationHandler
|
||||
val peer = message.peer
|
||||
val peer = event.receivedMessage.peer
|
||||
val sessionMessage = try {
|
||||
message.data.deserialize<SessionMessage>()
|
||||
event.receivedMessage.data.deserialize<SessionMessage>()
|
||||
} catch (ex: Exception) {
|
||||
logger.error("Received corrupt SessionMessage data from $peer")
|
||||
deduplicationHandler.afterDatabaseTransaction()
|
||||
event.deduplicationHandler.afterDatabaseTransaction()
|
||||
return
|
||||
}
|
||||
val sender = serviceHub.networkMapCache.getPeerByLegalName(peer)
|
||||
if (sender != null) {
|
||||
when (sessionMessage) {
|
||||
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, deduplicationHandler, sender)
|
||||
is InitialSessionMessage -> onSessionInit(sessionMessage, message.platformVersion, deduplicationHandler, sender)
|
||||
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, event.deduplicationHandler, sender)
|
||||
is InitialSessionMessage -> onSessionInit(sessionMessage, sender, event)
|
||||
}
|
||||
} else {
|
||||
// TODO Send the event to the flow hospital to be retried on network map update
|
||||
// TODO Test that restarting the node attempts to retry
|
||||
logger.error("Unknown peer $peer in $sessionMessage")
|
||||
}
|
||||
}
|
||||
@ -448,14 +448,8 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
}
|
||||
|
||||
private fun onSessionInit(sessionMessage: InitialSessionMessage, senderPlatformVersion: Int, deduplicationHandler: DeduplicationHandler, sender: Party) {
|
||||
fun createErrorMessage(initiatorSessionId: SessionId, message: String): ExistingSessionMessage {
|
||||
val errorId = secureRandom.nextLong()
|
||||
val payload = RejectSessionMessage(message, errorId)
|
||||
return ExistingSessionMessage(initiatorSessionId, payload)
|
||||
}
|
||||
|
||||
val replyError = try {
|
||||
private fun onSessionInit(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) {
|
||||
try {
|
||||
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
|
||||
val initiatedSessionId = SessionId.createRandom(secureRandom)
|
||||
val senderSession = FlowSessionImpl(sender, initiatedSessionId)
|
||||
@ -465,40 +459,34 @@ class SingleThreadedStateMachineManager(
|
||||
is InitiatedFlowFactory.CorDapp -> FlowInfo(initiatedFlowFactory.flowVersion, initiatedFlowFactory.appName)
|
||||
}
|
||||
val senderCoreFlowVersion = when (initiatedFlowFactory) {
|
||||
is InitiatedFlowFactory.Core -> senderPlatformVersion
|
||||
is InitiatedFlowFactory.Core -> event.receivedMessage.platformVersion
|
||||
is InitiatedFlowFactory.CorDapp -> null
|
||||
}
|
||||
startInitiatedFlow(flowLogic, deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo)
|
||||
null
|
||||
} catch (exception: Exception) {
|
||||
logger.warn("Exception while creating initiated flow", exception)
|
||||
createErrorMessage(
|
||||
sessionMessage.initiatorSessionId,
|
||||
(exception as? SessionRejectException)?.message ?: "Unable to establish session"
|
||||
)
|
||||
}
|
||||
|
||||
if (replyError != null) {
|
||||
flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID))
|
||||
deduplicationHandler.afterDatabaseTransaction()
|
||||
startInitiatedFlow(flowLogic, event.deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo)
|
||||
} catch (t: Throwable) {
|
||||
logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " +
|
||||
"flowVersion=${sessionMessage.flowVersion}), sending to the flow hospital", t)
|
||||
flowHospital.sessionInitErrored(sessionMessage, sender, event, t)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO this is a temporary hack until we figure out multiple identities
|
||||
private fun getOurFirstIdentity(): Party {
|
||||
return serviceHub.myInfo.legalIdentities[0]
|
||||
}
|
||||
private val ourFirstIdentity: Party get() = serviceHub.myInfo.legalIdentities[0]
|
||||
|
||||
private fun getInitiatedFlowFactory(message: InitialSessionMessage): InitiatedFlowFactory<*> {
|
||||
val initiatingFlowClass = try {
|
||||
Class.forName(message.initiatorFlowClassName, true, classloader).asSubclass(FlowLogic::class.java)
|
||||
val initiatorClass = try {
|
||||
Class.forName(message.initiatorFlowClassName, true, classloader)
|
||||
} catch (e: ClassNotFoundException) {
|
||||
throw SessionRejectException("Don't know ${message.initiatorFlowClassName}")
|
||||
} catch (e: ClassCastException) {
|
||||
throw SessionRejectException("${message.initiatorFlowClassName} is not a flow")
|
||||
throw SessionRejectException.UnknownClass(message.initiatorFlowClassName)
|
||||
}
|
||||
return serviceHub.getFlowFactory(initiatingFlowClass)
|
||||
?: throw SessionRejectException("$initiatingFlowClass is not registered")
|
||||
|
||||
val initiatorFlowClass = try {
|
||||
initiatorClass.asSubclass(FlowLogic::class.java)
|
||||
} catch (e: ClassCastException) {
|
||||
throw SessionRejectException.NotAFlow(initiatorClass)
|
||||
}
|
||||
|
||||
return serviceHub.getFlowFactory(initiatorFlowClass) ?: throw SessionRejectException.NotRegistered(initiatorFlowClass)
|
||||
}
|
||||
|
||||
private fun <A> startInitiatedFlow(
|
||||
@ -511,7 +499,7 @@ class SingleThreadedStateMachineManager(
|
||||
initiatedFlowInfo: FlowInfo
|
||||
) {
|
||||
val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo)
|
||||
val ourIdentity = getOurFirstIdentity()
|
||||
val ourIdentity = ourFirstIdentity
|
||||
startFlowInternal(
|
||||
InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity,
|
||||
initiatingMessageDeduplicationHandler,
|
||||
|
@ -1,6 +1,8 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.crypto.newSecureRandom
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.TimedFlow
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
@ -16,65 +18,101 @@ import java.util.*
|
||||
/**
|
||||
* This hospital consults "staff" to see if they can automatically diagnose and treat flows.
|
||||
*/
|
||||
class StaffedFlowHospital {
|
||||
class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) {
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, FinalityDoctor)
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(object {
|
||||
val patients = HashMap<StateMachineRunId, MedicalHistory>()
|
||||
val flowPatients = HashMap<StateMachineRunId, FlowMedicalHistory>()
|
||||
val treatableSessionInits = HashMap<UUID, InternalSessionInitRecord>()
|
||||
val recordsPublisher = PublishSubject.create<MedicalRecord>()
|
||||
})
|
||||
private val secureRandom = newSecureRandom()
|
||||
|
||||
class MedicalHistory {
|
||||
internal val records: MutableList<MedicalRecord> = mutableListOf()
|
||||
|
||||
fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff): Boolean {
|
||||
val lastAdmittanceSuspendCount = (records.last() as MedicalRecord.Admitted).suspendCount
|
||||
return records
|
||||
.filterIsInstance<MedicalRecord.Discharged>()
|
||||
.count { by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max
|
||||
/**
|
||||
* The node was unable to initiate the [InitialSessionMessage] from [sender].
|
||||
*/
|
||||
fun sessionInitErrored(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent, error: Throwable) {
|
||||
val time = Instant.now()
|
||||
val id = UUID.randomUUID()
|
||||
val outcome = if (error is SessionRejectException.UnknownClass) {
|
||||
// We probably don't have the CorDapp installed so let's pause the message in the hopes that the CorDapp is
|
||||
// installed on restart, at which point the message will be able proceed as normal. If not then it will need
|
||||
// to be dropped manually.
|
||||
Outcome.OVERNIGHT_OBSERVATION
|
||||
} else {
|
||||
Outcome.UNTREATABLE
|
||||
}
|
||||
|
||||
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
|
||||
val record = sessionMessage.run { MedicalRecord.SessionInit(id, time, outcome, initiatorFlowClassName, flowVersion, appName, sender, error) }
|
||||
mutex.locked {
|
||||
if (outcome != Outcome.UNTREATABLE) {
|
||||
treatableSessionInits[id] = InternalSessionInitRecord(sessionMessage, event, record)
|
||||
}
|
||||
recordsPublisher.onNext(record)
|
||||
}
|
||||
|
||||
if (outcome == Outcome.UNTREATABLE) {
|
||||
sendBackError(error, sessionMessage, sender, event)
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendBackError(error: Throwable, sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) {
|
||||
val message = (error as? SessionRejectException)?.message ?: "Unable to establish session"
|
||||
val payload = RejectSessionMessage(message, secureRandom.nextLong())
|
||||
val replyError = ExistingSessionMessage(sessionMessage.initiatorSessionId, payload)
|
||||
|
||||
flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID))
|
||||
event.deduplicationHandler.afterDatabaseTransaction()
|
||||
}
|
||||
|
||||
/**
|
||||
* Drop the errored session-init message with the given ID ([MedicalRecord.SessionInit.id]). This will cause the node
|
||||
* to send back the relevant session error to the initiator party and acknowledge its receipt from the message broker
|
||||
* so that it never gets redelivered.
|
||||
*/
|
||||
fun dropSessionInit(id: UUID) {
|
||||
val (sessionMessage, event, publicRecord) = mutex.locked {
|
||||
requireNotNull(treatableSessionInits.remove(id)) { "$id does not refer to any session init message" }
|
||||
}
|
||||
log.info("Errored session-init permanently dropped: $publicRecord")
|
||||
sendBackError(publicRecord.error, sessionMessage, publicRecord.sender, event)
|
||||
}
|
||||
|
||||
/**
|
||||
* The flow running in [flowFiber] has errored.
|
||||
*/
|
||||
fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
|
||||
val time = Instant.now()
|
||||
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
|
||||
val suspendCount = currentState.checkpoint.numberOfSuspends
|
||||
|
||||
val event = mutex.locked {
|
||||
val medicalHistory = patients.computeIfAbsent(flowFiber.id) { MedicalHistory() }
|
||||
|
||||
val admitted = MedicalRecord.Admitted(flowFiber.id, Instant.now(), suspendCount)
|
||||
medicalHistory.records += admitted
|
||||
recordsPublisher.onNext(admitted)
|
||||
val medicalHistory = flowPatients.computeIfAbsent(flowFiber.id) { FlowMedicalHistory() }
|
||||
|
||||
val report = consultStaff(flowFiber, currentState, errors, medicalHistory)
|
||||
|
||||
val (newRecord, event) = when (report.diagnosis) {
|
||||
val (outcome, event) = when (report.diagnosis) {
|
||||
Diagnosis.DISCHARGE -> {
|
||||
log.info("Flow ${flowFiber.id} error discharged from hospital by ${report.by}")
|
||||
Pair(MedicalRecord.Discharged(flowFiber.id, Instant.now(), suspendCount, report.by, errors), Event.RetryFlowFromSafePoint)
|
||||
Pair(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint)
|
||||
}
|
||||
Diagnosis.OVERNIGHT_OBSERVATION -> {
|
||||
log.info("Flow ${flowFiber.id} error kept for overnight observation by ${report.by}")
|
||||
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
|
||||
Pair(MedicalRecord.KeptInForObservation(flowFiber.id, Instant.now(), suspendCount, report.by, errors), null)
|
||||
Pair(Outcome.OVERNIGHT_OBSERVATION, null)
|
||||
}
|
||||
Diagnosis.NOT_MY_SPECIALTY -> {
|
||||
// None of the staff care for these errors so we let them propagate
|
||||
log.info("Flow ${flowFiber.id} error allowed to propagate")
|
||||
Pair(MedicalRecord.NothingWeCanDo(flowFiber.id, Instant.now(), suspendCount), Event.StartErrorPropagation)
|
||||
Pair(Outcome.UNTREATABLE, Event.StartErrorPropagation)
|
||||
}
|
||||
}
|
||||
|
||||
medicalHistory.records += newRecord
|
||||
recordsPublisher.onNext(newRecord)
|
||||
val record = MedicalRecord.Flow(time, flowFiber.id, currentState.checkpoint.numberOfSuspends, errors, report.by, outcome)
|
||||
medicalHistory.records += record
|
||||
recordsPublisher.onNext(record)
|
||||
event
|
||||
}
|
||||
|
||||
@ -86,8 +124,9 @@ class StaffedFlowHospital {
|
||||
private fun consultStaff(flowFiber: FlowFiber,
|
||||
currentState: StateMachineState,
|
||||
errors: List<Throwable>,
|
||||
medicalHistory: MedicalHistory): ConsultationReport {
|
||||
medicalHistory: FlowMedicalHistory): ConsultationReport {
|
||||
return errors
|
||||
.asSequence()
|
||||
.mapIndexed { index, error ->
|
||||
log.info("Flow ${flowFiber.id} has error [$index]", error)
|
||||
val diagnoses: Map<Diagnosis, List<Staff>> = staff.groupBy { it.consult(flowFiber, currentState, error, medicalHistory) }
|
||||
@ -105,43 +144,61 @@ class StaffedFlowHospital {
|
||||
* The flow has been removed from the state machine.
|
||||
*/
|
||||
fun flowRemoved(flowId: StateMachineRunId) {
|
||||
mutex.locked { patients.remove(flowId) }
|
||||
mutex.locked { flowPatients.remove(flowId) }
|
||||
}
|
||||
|
||||
// TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC
|
||||
/** Returns a stream of medical records as flows pass through the hospital. */
|
||||
fun track(): DataFeed<List<MedicalRecord>, MedicalRecord> {
|
||||
return mutex.locked {
|
||||
DataFeed(patients.values.flatMap { it.records }, recordsPublisher.bufferUntilSubscribed())
|
||||
val snapshot = (flowPatients.values.flatMap { it.records } + treatableSessionInits.values.map { it.publicRecord }).sortedBy { it.time }
|
||||
DataFeed(snapshot, recordsPublisher.bufferUntilSubscribed())
|
||||
}
|
||||
}
|
||||
|
||||
sealed class MedicalRecord {
|
||||
abstract val flowId: StateMachineRunId
|
||||
abstract val at: Instant
|
||||
abstract val suspendCount: Int
|
||||
class FlowMedicalHistory {
|
||||
internal val records: MutableList<MedicalRecord.Flow> = mutableListOf()
|
||||
|
||||
data class Admitted(override val flowId: StateMachineRunId,
|
||||
override val at: Instant,
|
||||
override val suspendCount: Int) : MedicalRecord()
|
||||
fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff, currentState: StateMachineState): Boolean {
|
||||
val lastAdmittanceSuspendCount = currentState.checkpoint.numberOfSuspends
|
||||
return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max
|
||||
}
|
||||
|
||||
data class Discharged(override val flowId: StateMachineRunId,
|
||||
override val at: Instant,
|
||||
override val suspendCount: Int,
|
||||
val by: List<Staff>,
|
||||
val errors: List<Throwable>) : MedicalRecord()
|
||||
|
||||
data class KeptInForObservation(override val flowId: StateMachineRunId,
|
||||
override val at: Instant,
|
||||
override val suspendCount: Int,
|
||||
val by: List<Staff>,
|
||||
val errors: List<Throwable>) : MedicalRecord()
|
||||
|
||||
data class NothingWeCanDo(override val flowId: StateMachineRunId,
|
||||
override val at: Instant,
|
||||
override val suspendCount: Int) : MedicalRecord()
|
||||
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
|
||||
}
|
||||
|
||||
private data class InternalSessionInitRecord(val sessionMessage: InitialSessionMessage,
|
||||
val event: ExternalEvent.ExternalMessageEvent,
|
||||
val publicRecord: MedicalRecord.SessionInit)
|
||||
|
||||
sealed class MedicalRecord {
|
||||
abstract val time: Instant
|
||||
abstract val outcome: Outcome
|
||||
abstract val errors: List<Throwable>
|
||||
|
||||
/** Medical record for a flow that has errored. */
|
||||
data class Flow(override val time: Instant,
|
||||
val flowId: StateMachineRunId,
|
||||
val suspendCount: Int,
|
||||
override val errors: List<Throwable>,
|
||||
val by: List<Staff>,
|
||||
override val outcome: Outcome) : MedicalRecord()
|
||||
|
||||
/** Medical record for a session initiation that was unsuccessful. */
|
||||
data class SessionInit(val id: UUID,
|
||||
override val time: Instant,
|
||||
override val outcome: Outcome,
|
||||
val initiatorFlowClassName: String,
|
||||
val flowVersion: Int,
|
||||
val appName: String,
|
||||
val sender: Party,
|
||||
val error: Throwable) : MedicalRecord() {
|
||||
override val errors: List<Throwable> get() = listOf(error)
|
||||
}
|
||||
}
|
||||
|
||||
enum class Outcome { DISCHARGE, OVERNIGHT_OBSERVATION, UNTREATABLE }
|
||||
|
||||
/** The order of the enum values are in priority order. */
|
||||
enum class Diagnosis {
|
||||
/** Retry from last safe point. */
|
||||
@ -153,14 +210,14 @@ class StaffedFlowHospital {
|
||||
}
|
||||
|
||||
interface Staff {
|
||||
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis
|
||||
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis
|
||||
}
|
||||
|
||||
/**
|
||||
* SQL Deadlock detection.
|
||||
*/
|
||||
object DeadlockNurse : Staff {
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||
return if (mentionsDeadlock(newError)) {
|
||||
Diagnosis.DISCHARGE
|
||||
} else {
|
||||
@ -178,8 +235,8 @@ class StaffedFlowHospital {
|
||||
* Primary key violation detection for duplicate inserts. Will detect other constraint violations too.
|
||||
*/
|
||||
object DuplicateInsertSpecialist : Staff {
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
|
||||
return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this)) {
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||
return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this, currentState)) {
|
||||
Diagnosis.DISCHARGE
|
||||
} else {
|
||||
Diagnosis.NOT_MY_SPECIALTY
|
||||
@ -196,9 +253,9 @@ class StaffedFlowHospital {
|
||||
* exceed the limit specified by the [FlowTimeoutException].
|
||||
*/
|
||||
object DoctorTimeout : Staff {
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||
if (newError is FlowTimeoutException) {
|
||||
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
|
||||
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this, currentState)) {
|
||||
return Diagnosis.DISCHARGE
|
||||
} else {
|
||||
val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}. " +
|
||||
@ -216,12 +273,18 @@ class StaffedFlowHospital {
|
||||
* Parks [FinalityHandler]s for observation.
|
||||
*/
|
||||
object FinalityDoctor : Staff {
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
|
||||
return (currentState.flowLogic as? FinalityHandler)?.let { logic -> Diagnosis.OVERNIGHT_OBSERVATION.also { warn(logic, flowFiber, currentState) } } ?: Diagnosis.NOT_MY_SPECIALTY
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||
return if (currentState.flowLogic is FinalityHandler) {
|
||||
warn(currentState.flowLogic, flowFiber, currentState)
|
||||
Diagnosis.OVERNIGHT_OBSERVATION
|
||||
} else {
|
||||
Diagnosis.NOT_MY_SPECIALTY
|
||||
}
|
||||
}
|
||||
|
||||
private fun warn(flowLogic: FinalityHandler, flowFiber: FlowFiber, currentState: StateMachineState) {
|
||||
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying the flow by re-starting the node. State machine state: $currentState, initiating party was: ${flowLogic.sender().name}")
|
||||
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
|
||||
"the flow by re-starting the node. State machine state: $currentState, initiating party was: ${flowLogic.sender().name}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -10,8 +10,7 @@ import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.finance.POUNDS
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.issuedBy
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital.MedicalRecord.KeptInForObservation
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital.*
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
@ -72,11 +71,11 @@ class FinalityHandlerTest {
|
||||
val keptInForObservation = smm.flowHospital
|
||||
.track()
|
||||
.let { it.updates.startWith(it.snapshot) }
|
||||
.filter { it.flowId == runId }
|
||||
.ofType(KeptInForObservation::class.java)
|
||||
.ofType(MedicalRecord.Flow::class.java)
|
||||
.filter { it.flowId == runId && it.outcome == Outcome.OVERNIGHT_OBSERVATION }
|
||||
.toBlocking()
|
||||
.first()
|
||||
assertThat(keptInForObservation.by).contains(StaffedFlowHospital.FinalityDoctor)
|
||||
assertThat(keptInForObservation.by).contains(FinalityDoctor)
|
||||
}
|
||||
|
||||
private fun TestStartedNode.getTransaction(id: SecureHash): SignedTransaction? {
|
||||
|
@ -0,0 +1,166 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.registerCordappFlowFactory
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.persistence.checkpoints
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.CHARLIE_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import net.corda.testing.node.internal.*
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class FlowFrameworkPersistenceTests {
|
||||
companion object {
|
||||
init {
|
||||
LogHelper.setLevel("+net.corda.flow")
|
||||
}
|
||||
}
|
||||
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private val receivedSessionMessages = ArrayList<SessionTransfer>()
|
||||
private lateinit var aliceNode: TestStartedNode
|
||||
private lateinit var bobNode: TestStartedNode
|
||||
private lateinit var notaryIdentity: Party
|
||||
private lateinit var alice: Party
|
||||
private lateinit var bob: Party
|
||||
private lateinit var aliceFlowManager: MockNodeFlowManager
|
||||
private lateinit var bobFlowManager: MockNodeFlowManager
|
||||
|
||||
@Before
|
||||
fun start() {
|
||||
mockNet = InternalMockNetwork(
|
||||
cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"),
|
||||
servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin()
|
||||
)
|
||||
aliceFlowManager = MockNodeFlowManager()
|
||||
bobFlowManager = MockNodeFlowManager()
|
||||
|
||||
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, flowManager = aliceFlowManager))
|
||||
bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, flowManager = bobFlowManager))
|
||||
|
||||
receivedSessionMessagesObservable().forEach { receivedSessionMessages += it }
|
||||
|
||||
// Extract identities
|
||||
alice = aliceNode.info.singleIdentity()
|
||||
bob = bobNode.info.singleIdentity()
|
||||
notaryIdentity = mockNet.defaultNotaryIdentity
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
mockNet.stopNodes()
|
||||
receivedSessionMessages.clear()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `newly added flow is preserved on restart`() {
|
||||
aliceNode.services.startFlow(NoOpFlow(nonTerminating = true))
|
||||
aliceNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
val restoredFlow = aliceNode.restartAndGetRestoredFlow<NoOpFlow>()
|
||||
assertThat(restoredFlow.flowStarted).isTrue()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `flow restarted just after receiving payload`() {
|
||||
bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it)
|
||||
.nonTerminating() }
|
||||
aliceNode.services.startFlow(SendFlow("Hello", bob))
|
||||
|
||||
// We push through just enough messages to get only the payload sent
|
||||
bobNode.pumpReceive()
|
||||
bobNode.internals.disableDBCloseOnStop()
|
||||
bobNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
bobNode.dispose()
|
||||
mockNet.runNetwork()
|
||||
val restoredFlow = bobNode.restartAndGetRestoredFlow<InitiatedReceiveFlow>()
|
||||
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `flow loaded from checkpoint will respond to messages from before start`() {
|
||||
aliceNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
|
||||
bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow
|
||||
val restoredFlow = bobNode.restartAndGetRestoredFlow<ReceiveFlow>()
|
||||
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
|
||||
}
|
||||
|
||||
@Ignore("Some changes in startup order make this test's assumptions fail.")
|
||||
@Test
|
||||
fun `flow with send will resend on interrupted restart`() {
|
||||
val payload = random63BitValue()
|
||||
val payload2 = random63BitValue()
|
||||
|
||||
var sentCount = 0
|
||||
mockNet.messagingNetwork.sentMessages.toSessionTransfers().filter { it.isPayloadTransfer }.forEach { sentCount++ }
|
||||
val charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME))
|
||||
val secondFlow = charlieNode.registerCordappFlowFactory(PingPongFlow::class) { PingPongFlow(it, payload2) }
|
||||
mockNet.runNetwork()
|
||||
val charlie = charlieNode.info.singleIdentity()
|
||||
|
||||
// Kick off first send and receive
|
||||
bobNode.services.startFlow(PingPongFlow(charlie, payload))
|
||||
bobNode.database.transaction {
|
||||
assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size)
|
||||
}
|
||||
// Make sure the add() has finished initial processing.
|
||||
bobNode.internals.disableDBCloseOnStop()
|
||||
// Restart node and thus reload the checkpoint and resend the message with same UUID
|
||||
bobNode.dispose()
|
||||
bobNode.database.transaction {
|
||||
assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size) // confirm checkpoint
|
||||
bobNode.services.networkMapCache.clearNetworkMapCache()
|
||||
}
|
||||
val node2b = mockNet.createNode(InternalMockNodeParameters(bobNode.internals.id))
|
||||
bobNode.internals.manuallyCloseDB()
|
||||
val (firstAgain, fut1) = node2b.getSingleFlow<PingPongFlow>()
|
||||
// Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync.
|
||||
mockNet.runNetwork()
|
||||
fut1.getOrThrow()
|
||||
|
||||
val receivedCount = receivedSessionMessages.count { it.isPayloadTransfer }
|
||||
// Check flows completed cleanly and didn't get out of phase
|
||||
assertEquals(4, receivedCount, "Flow should have exchanged 4 unique messages")// Two messages each way
|
||||
// can't give a precise value as every addMessageHandler re-runs the undelivered messages
|
||||
assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages")
|
||||
node2b.database.transaction {
|
||||
assertEquals(0, node2b.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
|
||||
}
|
||||
charlieNode.database.transaction {
|
||||
assertEquals(0, charlieNode.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
|
||||
}
|
||||
assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3")
|
||||
assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3")
|
||||
assertEquals(payload, secondFlow.getOrThrow().receivedPayload, "Received payload does not match the (restarted) first value on Node 2")
|
||||
assertEquals(payload + 1, secondFlow.getOrThrow().receivedPayload2, "Received payload does not match the expected second value on Node 2")
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
//region Helpers
|
||||
|
||||
private inline fun <reified P : FlowLogic<*>> TestStartedNode.restartAndGetRestoredFlow(): P {
|
||||
val newNode = mockNet.restartNode(this)
|
||||
newNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
mockNet.runNetwork()
|
||||
return newNode.getSingleFlow<P>().first
|
||||
}
|
||||
|
||||
private fun receivedSessionMessagesObservable(): Observable<SessionTransfer> {
|
||||
return mockNet.messagingNetwork.receivedMessages.toSessionTransfers()
|
||||
}
|
||||
|
||||
//endregion Helpers
|
||||
}
|
@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Fiber
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import co.paralleluniverse.strands.concurrent.Semaphore
|
||||
import net.corda.client.rpc.notUsed
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
@ -39,16 +40,13 @@ import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import kotlin.reflect.KClass
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class FlowFrameworkTests {
|
||||
companion object {
|
||||
@ -68,7 +66,7 @@ class FlowFrameworkTests {
|
||||
@Before
|
||||
fun setUpMockNet() {
|
||||
mockNet = InternalMockNetwork(
|
||||
cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"),
|
||||
cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"),
|
||||
servicePeerAllocationStrategy = RoundRobin()
|
||||
)
|
||||
|
||||
@ -100,8 +98,8 @@ class FlowFrameworkTests {
|
||||
assertThat(flow.lazyTime).isNotNull()
|
||||
}
|
||||
|
||||
class SuspendThrowingActionExecutor(private val exception: Exception, val delegate: ActionExecutor) : ActionExecutor {
|
||||
var thrown = false
|
||||
class SuspendThrowingActionExecutor(private val exception: Exception, private val delegate: ActionExecutor) : ActionExecutor {
|
||||
private var thrown = false
|
||||
@Suspendable
|
||||
override fun executeAction(fiber: FlowFiber, action: Action) {
|
||||
if (action is Action.CommitTransaction && !thrown) {
|
||||
@ -367,10 +365,17 @@ class FlowFrameworkTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `unknown class in session init`() {
|
||||
fun `session init with unknown class is sent to the flow hospital, from where it's dropped`() {
|
||||
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob)
|
||||
mockNet.runNetwork()
|
||||
assertThat(receivedSessionMessages).hasSize(2) // Only the session-init and session-reject are expected
|
||||
assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital
|
||||
val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot
|
||||
assertThat(medicalRecords).hasSize(1)
|
||||
val sessionInitRecord = medicalRecords[0] as StaffedFlowHospital.MedicalRecord.SessionInit
|
||||
assertThat(sessionInitRecord.initiatorFlowClassName).isEqualTo("not.a.real.Class")
|
||||
bobNode.smm.flowHospital.dropSessionInit(sessionInitRecord.id) // Drop the message which is processed as an error back to sender
|
||||
mockNet.runNetwork()
|
||||
assertThat(receivedSessionMessages).hasSize(2) // Now the session-reject is expected
|
||||
val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage
|
||||
assertThat((lastMessage.payload as RejectSessionMessage).message).isEqualTo("Don't know not.a.real.Class")
|
||||
}
|
||||
@ -441,334 +446,142 @@ class FlowFrameworkTests {
|
||||
|
||||
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
|
||||
|
||||
private fun TestStartedNode.sendSessionMessage(message: SessionMessage, destination: Party) {
|
||||
services.networkService.apply {
|
||||
val address = getAddressOfParty(PartyInfo.SingleNode(destination, emptyList()))
|
||||
send(createMessage(FlowMessagingImpl.sessionTopic, message.serialize().bytes), address)
|
||||
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
|
||||
assertThat(receivedSessionMessages).containsExactly(*expected)
|
||||
}
|
||||
|
||||
private val FlowLogic<*>.progressSteps: CordaFuture<List<Notification<ProgressTracker.Step>>>
|
||||
get() {
|
||||
return progressTracker!!.changes
|
||||
.ofType(Change.Position::class.java)
|
||||
.map { it.newStep }
|
||||
.materialize()
|
||||
.toList()
|
||||
.toFuture()
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party,
|
||||
@Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
// Kick off the flow on the other side ...
|
||||
val session = initiateFlow(otherParty)
|
||||
session.send(1)
|
||||
// ... then pause this one until it's received the session-end message from the other side
|
||||
receivedOtherFlowEnd.acquire()
|
||||
session.sendAndReceive<Int>(2)
|
||||
}
|
||||
}
|
||||
|
||||
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
|
||||
assertThat(receivedSessionMessages).containsExactly(*expected)
|
||||
// we need brand new class for a flow to fail, so here it is
|
||||
@InitiatingFlow
|
||||
private open class NeverRegisteredFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<FlowInfo>() {
|
||||
init {
|
||||
require(otherParties.isNotEmpty())
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun call(): FlowInfo {
|
||||
val flowInfos = otherParties.map {
|
||||
val session = initiateFlow(it)
|
||||
session.send(payload)
|
||||
session.getCounterpartyFlowInfo()
|
||||
}.toList()
|
||||
return flowInfos.first()
|
||||
}
|
||||
}
|
||||
|
||||
private object WaitingFlows {
|
||||
@InitiatingFlow
|
||||
class Waiter(val stx: SignedTransaction, val otherParty: Party) : FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val otherPartySession = initiateFlow(otherParty)
|
||||
otherPartySession.send(stx)
|
||||
return waitForLedgerCommit(stx.id)
|
||||
}
|
||||
}
|
||||
|
||||
class Committer(val otherPartySession: FlowSession, val throwException: (() -> Exception)? = null) : FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val stx = otherPartySession.receive<SignedTransaction>().unwrap { it }
|
||||
if (throwException != null) throw throwException.invoke()
|
||||
return subFlow(FinalityFlow(stx, setOf(otherPartySession.counterparty)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class LazyServiceHubAccessFlow : FlowLogic<Unit>() {
|
||||
val lazyTime: Instant by lazy { serviceHub.clock.instant() }
|
||||
@Suspendable
|
||||
override fun call() = Unit
|
||||
}
|
||||
|
||||
private interface CustomInterface
|
||||
|
||||
private class CustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty)
|
||||
|
||||
@InitiatingFlow
|
||||
private class IncorrectCustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty)
|
||||
|
||||
@InitiatingFlow
|
||||
private class VaultQueryFlow(val stx: SignedTransaction, val otherParty: Party) : FlowLogic<List<StateAndRef<ContractState>>>() {
|
||||
@Suspendable
|
||||
override fun call(): List<StateAndRef<ContractState>> {
|
||||
val otherPartySession = initiateFlow(otherParty)
|
||||
otherPartySession.send(stx)
|
||||
// hold onto reference here to force checkpoint of vaultService and thus
|
||||
// prove it is registered as a tokenizableService in the node
|
||||
val vaultQuerySvc = serviceHub.vaultService
|
||||
waitForLedgerCommit(stx.id)
|
||||
return vaultQuerySvc.queryBy<ContractState>().states
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow(version = 2)
|
||||
private class UpgradedFlow(val otherParty: Party, val otherPartySession: FlowSession? = null) : FlowLogic<Pair<Any, Int>>() {
|
||||
constructor(otherPartySession: FlowSession) : this(otherPartySession.counterparty, otherPartySession)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Pair<Any, Int> {
|
||||
val otherPartySession = this.otherPartySession ?: initiateFlow(otherParty)
|
||||
val received = otherPartySession.receive<Any>().unwrap { it }
|
||||
val otherFlowVersion = otherPartySession.getCounterpartyFlowInfo().flowVersion
|
||||
return Pair(received, otherFlowVersion)
|
||||
}
|
||||
}
|
||||
|
||||
private class SingleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val payload = otherPartySession.receive<String>().unwrap { it }
|
||||
subFlow(InlinedSendFlow(payload + payload, otherPartySession))
|
||||
}
|
||||
}
|
||||
|
||||
private class DoubleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
subFlow(SingleInlinedSubFlow(otherPartySession))
|
||||
}
|
||||
}
|
||||
|
||||
private data class NonSerialisableData(val a: Int)
|
||||
private class NonSerialisableFlowException(@Suppress("unused") val data: NonSerialisableData) : FlowException()
|
||||
|
||||
private class InlinedSendFlow(val payload: String, val otherPartySession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = otherPartySession.send(payload)
|
||||
}
|
||||
|
||||
//endregion Helpers
|
||||
}
|
||||
|
||||
class FlowFrameworkTripartyTests {
|
||||
internal fun sessionConfirm(flowVersion: Int = 1) = ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, "")))
|
||||
|
||||
companion object {
|
||||
init {
|
||||
LogHelper.setLevel("+net.corda.flow")
|
||||
}
|
||||
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private lateinit var aliceNode: TestStartedNode
|
||||
private lateinit var bobNode: TestStartedNode
|
||||
private lateinit var charlieNode: TestStartedNode
|
||||
private lateinit var alice: Party
|
||||
private lateinit var bob: Party
|
||||
private lateinit var charlie: Party
|
||||
private lateinit var notaryIdentity: Party
|
||||
private val receivedSessionMessages = ArrayList<SessionTransfer>()
|
||||
}
|
||||
|
||||
@Before
|
||||
fun setUpGlobalMockNet() {
|
||||
mockNet = InternalMockNetwork(
|
||||
cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"),
|
||||
servicePeerAllocationStrategy = RoundRobin()
|
||||
)
|
||||
|
||||
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
|
||||
bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
|
||||
charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME))
|
||||
|
||||
|
||||
// Extract identities
|
||||
alice = aliceNode.info.singleIdentity()
|
||||
bob = bobNode.info.singleIdentity()
|
||||
charlie = charlieNode.info.singleIdentity()
|
||||
notaryIdentity = mockNet.defaultNotaryIdentity
|
||||
|
||||
receivedSessionMessagesObservable().forEach { receivedSessionMessages += it }
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
mockNet.stopNodes()
|
||||
receivedSessionMessages.clear()
|
||||
}
|
||||
|
||||
private fun receivedSessionMessagesObservable(): Observable<SessionTransfer> {
|
||||
return mockNet.messagingNetwork.receivedMessages.toSessionTransfers()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `sending to multiple parties`() {
|
||||
bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
|
||||
charlieNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
|
||||
val payload = "Hello World"
|
||||
aliceNode.services.startFlow(SendFlow(payload, bob, charlie))
|
||||
mockNet.runNetwork()
|
||||
bobNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
charlieNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
val bobFlow = bobNode.getSingleFlow<InitiatedReceiveFlow>().first
|
||||
val charlieFlow = charlieNode.getSingleFlow<InitiatedReceiveFlow>().first
|
||||
assertThat(bobFlow.receivedPayloads[0]).isEqualTo(payload)
|
||||
assertThat(charlieFlow.receivedPayloads[0]).isEqualTo(payload)
|
||||
|
||||
assertSessionTransfers(bobNode,
|
||||
aliceNode sent sessionInit(SendFlow::class, payload = payload) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
aliceNode sent normalEnd to bobNode
|
||||
//There's no session end from the other flows as they're manually suspended
|
||||
)
|
||||
|
||||
assertSessionTransfers(charlieNode,
|
||||
aliceNode sent sessionInit(SendFlow::class, payload = payload) to charlieNode,
|
||||
charlieNode sent sessionConfirm() to aliceNode,
|
||||
aliceNode sent normalEnd to charlieNode
|
||||
//There's no session end from the other flows as they're manually suspended
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `receiving from multiple parties`() {
|
||||
val bobPayload = "Test 1"
|
||||
val charliePayload = "Test 2"
|
||||
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(bobPayload, it) }
|
||||
charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(charliePayload, it) }
|
||||
val multiReceiveFlow = ReceiveFlow(bob, charlie).nonTerminating()
|
||||
aliceNode.services.startFlow(multiReceiveFlow)
|
||||
aliceNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
mockNet.runNetwork()
|
||||
assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(bobPayload)
|
||||
assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(charliePayload)
|
||||
|
||||
assertSessionTransfers(bobNode,
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent sessionData(bobPayload) to aliceNode,
|
||||
bobNode sent normalEnd to aliceNode
|
||||
)
|
||||
|
||||
assertSessionTransfers(charlieNode,
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to charlieNode,
|
||||
charlieNode sent sessionConfirm() to aliceNode,
|
||||
charlieNode sent sessionData(charliePayload) to aliceNode,
|
||||
charlieNode sent normalEnd to aliceNode
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `FlowException only propagated to parent`() {
|
||||
charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Chain") } }
|
||||
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { ReceiveFlow(charlie) }
|
||||
val receivingFiber = aliceNode.services.startFlow(ReceiveFlow(bob))
|
||||
mockNet.runNetwork()
|
||||
assertThatExceptionOfType(UnexpectedFlowEndException::class.java)
|
||||
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `FlowException thrown and there is a 3rd unrelated party flow`() {
|
||||
// Bob will send its payload and then block waiting for the receive from Alice. Meanwhile Alice will move
|
||||
// onto Charlie which will throw the exception
|
||||
val node2Fiber = bobNode
|
||||
.registerCordappFlowFactory(ReceiveFlow::class) { SendAndReceiveFlow(it, "Hello") }
|
||||
.map { it.stateMachine }
|
||||
charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Nothing useful") } }
|
||||
|
||||
val aliceFiber = aliceNode.services.startFlow(ReceiveFlow(bob, charlie)) as FlowStateMachineImpl
|
||||
mockNet.runNetwork()
|
||||
|
||||
// Alice will terminate with the error it received from Charlie but it won't propagate that to Bob (as it's
|
||||
// not relevant to it) but it will end its session with it
|
||||
assertThatExceptionOfType(MyFlowException::class.java).isThrownBy {
|
||||
aliceFiber.resultFuture.getOrThrow()
|
||||
}
|
||||
val bobResultFuture = node2Fiber.getOrThrow().resultFuture
|
||||
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
|
||||
bobResultFuture.getOrThrow()
|
||||
}
|
||||
|
||||
assertSessionTransfers(bobNode,
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent sessionData("Hello") to aliceNode,
|
||||
aliceNode sent errorMessage() to bobNode
|
||||
)
|
||||
}
|
||||
|
||||
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
|
||||
|
||||
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
|
||||
assertThat(receivedSessionMessages).containsExactly(*expected)
|
||||
}
|
||||
|
||||
private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List<SessionTransfer> {
|
||||
val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress }
|
||||
assertThat(actualForNode).containsExactly(*expected)
|
||||
return actualForNode
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class FlowFrameworkPersistenceTests {
|
||||
companion object {
|
||||
init {
|
||||
LogHelper.setLevel("+net.corda.flow")
|
||||
}
|
||||
}
|
||||
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private val receivedSessionMessages = ArrayList<SessionTransfer>()
|
||||
private lateinit var aliceNode: TestStartedNode
|
||||
private lateinit var bobNode: TestStartedNode
|
||||
private lateinit var notaryIdentity: Party
|
||||
private lateinit var alice: Party
|
||||
private lateinit var bob: Party
|
||||
private lateinit var aliceFlowManager: MockNodeFlowManager
|
||||
private lateinit var bobFlowManager: MockNodeFlowManager
|
||||
|
||||
@Before
|
||||
fun start() {
|
||||
mockNet = InternalMockNetwork(
|
||||
cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"),
|
||||
servicePeerAllocationStrategy = RoundRobin()
|
||||
)
|
||||
aliceFlowManager = MockNodeFlowManager()
|
||||
bobFlowManager = MockNodeFlowManager()
|
||||
|
||||
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, flowManager = aliceFlowManager))
|
||||
bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, flowManager = bobFlowManager))
|
||||
|
||||
receivedSessionMessagesObservable().forEach { receivedSessionMessages += it }
|
||||
|
||||
// Extract identities
|
||||
alice = aliceNode.info.singleIdentity()
|
||||
bob = bobNode.info.singleIdentity()
|
||||
notaryIdentity = mockNet.defaultNotaryIdentity
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
mockNet.stopNodes()
|
||||
receivedSessionMessages.clear()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `newly added flow is preserved on restart`() {
|
||||
aliceNode.services.startFlow(NoOpFlow(nonTerminating = true))
|
||||
aliceNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
val restoredFlow = aliceNode.restartAndGetRestoredFlow<NoOpFlow>()
|
||||
assertThat(restoredFlow.flowStarted).isTrue()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `flow restarted just after receiving payload`() {
|
||||
bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
|
||||
aliceNode.services.startFlow(SendFlow("Hello", bob))
|
||||
|
||||
// We push through just enough messages to get only the payload sent
|
||||
bobNode.pumpReceive()
|
||||
bobNode.internals.disableDBCloseOnStop()
|
||||
bobNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
bobNode.dispose()
|
||||
mockNet.runNetwork()
|
||||
val restoredFlow = bobNode.restartAndGetRestoredFlow<InitiatedReceiveFlow>()
|
||||
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `flow loaded from checkpoint will respond to messages from before start`() {
|
||||
aliceNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
|
||||
bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow
|
||||
val restoredFlow = bobNode.restartAndGetRestoredFlow<ReceiveFlow>()
|
||||
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
|
||||
}
|
||||
|
||||
@Ignore("Some changes in startup order make this test's assumptions fail.")
|
||||
@Test
|
||||
fun `flow with send will resend on interrupted restart`() {
|
||||
val payload = random63BitValue()
|
||||
val payload2 = random63BitValue()
|
||||
|
||||
var sentCount = 0
|
||||
mockNet.messagingNetwork.sentMessages.toSessionTransfers().filter { it.isPayloadTransfer }.forEach { sentCount++ }
|
||||
val charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME))
|
||||
val secondFlow = charlieNode.registerCordappFlowFactory(PingPongFlow::class) { PingPongFlow(it, payload2) }
|
||||
mockNet.runNetwork()
|
||||
val charlie = charlieNode.info.singleIdentity()
|
||||
|
||||
// Kick off first send and receive
|
||||
bobNode.services.startFlow(PingPongFlow(charlie, payload))
|
||||
bobNode.database.transaction {
|
||||
assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size)
|
||||
}
|
||||
// Make sure the add() has finished initial processing.
|
||||
bobNode.internals.disableDBCloseOnStop()
|
||||
// Restart node and thus reload the checkpoint and resend the message with same UUID
|
||||
bobNode.dispose()
|
||||
bobNode.database.transaction {
|
||||
assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size) // confirm checkpoint
|
||||
bobNode.services.networkMapCache.clearNetworkMapCache()
|
||||
}
|
||||
val node2b = mockNet.createNode(InternalMockNodeParameters(bobNode.internals.id))
|
||||
bobNode.internals.manuallyCloseDB()
|
||||
val (firstAgain, fut1) = node2b.getSingleFlow<PingPongFlow>()
|
||||
// Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync.
|
||||
mockNet.runNetwork()
|
||||
fut1.getOrThrow()
|
||||
|
||||
val receivedCount = receivedSessionMessages.count { it.isPayloadTransfer }
|
||||
// Check flows completed cleanly and didn't get out of phase
|
||||
assertEquals(4, receivedCount, "Flow should have exchanged 4 unique messages")// Two messages each way
|
||||
// can't give a precise value as every addMessageHandler re-runs the undelivered messages
|
||||
assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages")
|
||||
node2b.database.transaction {
|
||||
assertEquals(0, node2b.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
|
||||
}
|
||||
charlieNode.database.transaction {
|
||||
assertEquals(0, charlieNode.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
|
||||
}
|
||||
assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3")
|
||||
assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3")
|
||||
assertEquals(payload, secondFlow.getOrThrow().receivedPayload, "Received payload does not match the (restarted) first value on Node 2")
|
||||
assertEquals(payload + 1, secondFlow.getOrThrow().receivedPayload2, "Received payload does not match the expected second value on Node 2")
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
//region Helpers
|
||||
|
||||
private inline fun <reified P : FlowLogic<*>> TestStartedNode.restartAndGetRestoredFlow(): P {
|
||||
val newNode = mockNet.restartNode(this)
|
||||
newNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
mockNet.runNetwork()
|
||||
return newNode.getSingleFlow<P>().first
|
||||
}
|
||||
|
||||
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
|
||||
assertThat(receivedSessionMessages).containsExactly(*expected)
|
||||
}
|
||||
|
||||
private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List<SessionTransfer> {
|
||||
val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress }
|
||||
assertThat(actualForNode).containsExactly(*expected)
|
||||
return actualForNode
|
||||
}
|
||||
|
||||
private fun receivedSessionMessagesObservable(): Observable<SessionTransfer> {
|
||||
return mockNet.messagingNetwork.receivedMessages.toSessionTransfers()
|
||||
}
|
||||
|
||||
//endregion Helpers
|
||||
}
|
||||
|
||||
private fun sessionConfirm(flowVersion: Int = 1) = ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, "")))
|
||||
|
||||
private inline fun <reified P : FlowLogic<*>> TestStartedNode.getSingleFlow(): Pair<P, CordaFuture<*>> {
|
||||
internal inline fun <reified P : FlowLogic<*>> TestStartedNode.getSingleFlow(): Pair<P, CordaFuture<*>> {
|
||||
return smm.findStateMachines(P::class.java).single()
|
||||
}
|
||||
|
||||
@ -792,7 +605,7 @@ private fun sanitise(message: SessionMessage) = when (message) {
|
||||
}
|
||||
}
|
||||
|
||||
private fun Observable<MessageTransfer>.toSessionTransfers(): Observable<SessionTransfer> {
|
||||
internal fun Observable<MessageTransfer>.toSessionTransfers(): Observable<SessionTransfer> {
|
||||
return filter { it.getMessage().topic == FlowMessagingImpl.sessionTopic }.map {
|
||||
val from = it.sender.id
|
||||
val message = it.messageData.deserialize<SessionMessage>()
|
||||
@ -800,12 +613,19 @@ private fun Observable<MessageTransfer>.toSessionTransfers(): Observable<Session
|
||||
}
|
||||
}
|
||||
|
||||
private fun errorMessage(errorResponse: FlowException? = null) = ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0))
|
||||
internal fun TestStartedNode.sendSessionMessage(message: SessionMessage, destination: Party) {
|
||||
services.networkService.apply {
|
||||
val address = getAddressOfParty(PartyInfo.SingleNode(destination, emptyList()))
|
||||
send(createMessage(FlowMessagingImpl.sessionTopic, message.serialize().bytes), address)
|
||||
}
|
||||
}
|
||||
|
||||
private infix fun TestStartedNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(internals.id, message)
|
||||
private infix fun Pair<Int, SessionMessage>.to(node: TestStartedNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress)
|
||||
internal fun errorMessage(errorResponse: FlowException? = null) = ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0))
|
||||
|
||||
private data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) {
|
||||
internal infix fun TestStartedNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(internals.id, message)
|
||||
internal infix fun Pair<Int, SessionMessage>.to(node: TestStartedNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress)
|
||||
|
||||
internal data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) {
|
||||
val isPayloadTransfer: Boolean
|
||||
get() =
|
||||
message is ExistingSessionMessage && message.payload is DataSessionMessage ||
|
||||
@ -814,53 +634,14 @@ private data class SessionTransfer(val from: Int, val message: SessionMessage, v
|
||||
override fun toString(): String = "$from sent $message to $to"
|
||||
}
|
||||
|
||||
|
||||
private fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, flowVersion: Int = 1, payload: Any? = null): InitialSessionMessage {
|
||||
internal fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, flowVersion: Int = 1, payload: Any? = null): InitialSessionMessage {
|
||||
return InitialSessionMessage(SessionId(0), 0, clientFlowClass.java.name, flowVersion, "", payload?.serialize())
|
||||
}
|
||||
|
||||
private fun sessionData(payload: Any) = ExistingSessionMessage(SessionId(0), DataSessionMessage(payload.serialize()))
|
||||
|
||||
|
||||
private val FlowLogic<*>.progressSteps: CordaFuture<List<Notification<ProgressTracker.Step>>>
|
||||
get() {
|
||||
return progressTracker!!.changes
|
||||
.ofType(Change.Position::class.java)
|
||||
.map { it.newStep }
|
||||
.materialize()
|
||||
.toList()
|
||||
.toFuture()
|
||||
}
|
||||
|
||||
class ThrowingActionExecutor(private val exception: Exception, val delegate: ActionExecutor) : ActionExecutor {
|
||||
var thrown = false
|
||||
@Suspendable
|
||||
override fun executeAction(fiber: FlowFiber, action: Action) {
|
||||
if (thrown) {
|
||||
delegate.executeAction(fiber, action)
|
||||
} else {
|
||||
thrown = true
|
||||
throw exception
|
||||
}
|
||||
}
|
||||
}
|
||||
internal fun sessionData(payload: Any) = ExistingSessionMessage(SessionId(0), DataSessionMessage(payload.serialize()))
|
||||
|
||||
@InitiatingFlow
|
||||
private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party,
|
||||
@Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
// Kick off the flow on the other side ...
|
||||
val session = initiateFlow(otherParty)
|
||||
session.send(1)
|
||||
// ... then pause this one until it's received the session-end message from the other side
|
||||
receivedOtherFlowEnd.acquire()
|
||||
session.sendAndReceive<Int>(2)
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private open class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<FlowInfo>() {
|
||||
internal open class SendFlow(private val payload: Any, private vararg val otherParties: Party) : FlowLogic<FlowInfo>() {
|
||||
init {
|
||||
require(otherParties.isNotEmpty())
|
||||
}
|
||||
@ -876,46 +657,7 @@ private open class SendFlow(val payload: Any, vararg val otherParties: Party) :
|
||||
}
|
||||
}
|
||||
|
||||
// we need brand new class for a flow to fail, so here it is
|
||||
@InitiatingFlow
|
||||
private open class NeverRegisteredFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<FlowInfo>() {
|
||||
init {
|
||||
require(otherParties.isNotEmpty())
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun call(): FlowInfo {
|
||||
val flowInfos = otherParties.map {
|
||||
val session = initiateFlow(it)
|
||||
session.send(payload)
|
||||
session.getCounterpartyFlowInfo()
|
||||
}.toList()
|
||||
return flowInfos.first()
|
||||
}
|
||||
}
|
||||
|
||||
private object WaitingFlows {
|
||||
@InitiatingFlow
|
||||
class Waiter(val stx: SignedTransaction, val otherParty: Party) : FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val otherPartySession = initiateFlow(otherParty)
|
||||
otherPartySession.send(stx)
|
||||
return waitForLedgerCommit(stx.id)
|
||||
}
|
||||
}
|
||||
|
||||
class Committer(val otherPartySession: FlowSession, val throwException: (() -> Exception)? = null) : FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val stx = otherPartySession.receive<SignedTransaction>().unwrap { it }
|
||||
if (throwException != null) throw throwException.invoke()
|
||||
return subFlow(FinalityFlow(stx, setOf(otherPartySession.counterparty)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>() {
|
||||
internal class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>() {
|
||||
@Transient
|
||||
var flowStarted = false
|
||||
|
||||
@ -928,7 +670,7 @@ private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>()
|
||||
}
|
||||
}
|
||||
|
||||
private class InitiatedReceiveFlow(val otherPartySession: FlowSession) : FlowLogic<Unit>() {
|
||||
internal class InitiatedReceiveFlow(private val otherPartySession: FlowSession) : FlowLogic<Unit>() {
|
||||
object START_STEP : ProgressTracker.Step("Starting")
|
||||
object RECEIVED_STEP : ProgressTracker.Step("Received")
|
||||
|
||||
@ -953,26 +695,13 @@ private class InitiatedReceiveFlow(val otherPartySession: FlowSession) : FlowLog
|
||||
}
|
||||
}
|
||||
|
||||
private class LazyServiceHubAccessFlow : FlowLogic<Unit>() {
|
||||
val lazyTime: Instant by lazy { serviceHub.clock.instant() }
|
||||
@Suspendable
|
||||
override fun call() = Unit
|
||||
}
|
||||
|
||||
private open class InitiatedSendFlow(val payload: Any, val otherPartySession: FlowSession) : FlowLogic<Unit>() {
|
||||
internal open class InitiatedSendFlow(private val payload: Any, private val otherPartySession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = otherPartySession.send(payload)
|
||||
}
|
||||
|
||||
private interface CustomInterface
|
||||
|
||||
private class CustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty)
|
||||
|
||||
@InitiatingFlow
|
||||
private class IncorrectCustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty)
|
||||
|
||||
@InitiatingFlow
|
||||
private class ReceiveFlow(vararg val otherParties: Party) : FlowLogic<Unit>() {
|
||||
internal class ReceiveFlow(private vararg val otherParties: Party) : FlowLogic<Unit>() {
|
||||
object START_STEP : ProgressTracker.Step("Starting")
|
||||
object RECEIVED_STEP : ProgressTracker.Step("Received")
|
||||
|
||||
@ -1001,72 +730,23 @@ private class ReceiveFlow(vararg val otherParties: Party) : FlowLogic<Unit>() {
|
||||
}
|
||||
}
|
||||
|
||||
private class MyFlowException(override val message: String) : FlowException() {
|
||||
internal class MyFlowException(override val message: String) : FlowException() {
|
||||
override fun equals(other: Any?): Boolean = other is MyFlowException && other.message == this.message
|
||||
override fun hashCode(): Int = message.hashCode()
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class VaultQueryFlow(val stx: SignedTransaction, val otherParty: Party) : FlowLogic<List<StateAndRef<ContractState>>>() {
|
||||
@Suspendable
|
||||
override fun call(): List<StateAndRef<ContractState>> {
|
||||
val otherPartySession = initiateFlow(otherParty)
|
||||
otherPartySession.send(stx)
|
||||
// hold onto reference here to force checkpoint of vaultService and thus
|
||||
// prove it is registered as a tokenizableService in the node
|
||||
val vaultQuerySvc = serviceHub.vaultService
|
||||
waitForLedgerCommit(stx.id)
|
||||
return vaultQuerySvc.queryBy<ContractState>().states
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow(version = 2)
|
||||
private class UpgradedFlow(val otherParty: Party, val otherPartySession: FlowSession? = null) : FlowLogic<Pair<Any, Int>>() {
|
||||
constructor(otherPartySession: FlowSession) : this(otherPartySession.counterparty, otherPartySession)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Pair<Any, Int> {
|
||||
val otherPartySession = this.otherPartySession ?: initiateFlow(otherParty)
|
||||
val received = otherPartySession.receive<Any>().unwrap { it }
|
||||
val otherFlowVersion = otherPartySession.getCounterpartyFlowInfo().flowVersion
|
||||
return Pair(received, otherFlowVersion)
|
||||
}
|
||||
}
|
||||
|
||||
private class SingleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val payload = otherPartySession.receive<String>().unwrap { it }
|
||||
subFlow(InlinedSendFlow(payload + payload, otherPartySession))
|
||||
}
|
||||
}
|
||||
|
||||
private class DoubleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
subFlow(SingleInlinedSubFlow(otherPartySession))
|
||||
}
|
||||
}
|
||||
|
||||
private data class NonSerialisableData(val a: Int)
|
||||
private class NonSerialisableFlowException(@Suppress("unused") val data: NonSerialisableData) : FlowException()
|
||||
|
||||
@InitiatingFlow
|
||||
private class SendAndReceiveFlow(val otherParty: Party, val payload: Any, val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
|
||||
internal class SendAndReceiveFlow(private val otherParty: Party, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
|
||||
constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Any = (otherPartySession
|
||||
?: initiateFlow(otherParty)).sendAndReceive<Any>(payload).unwrap { it }
|
||||
}
|
||||
|
||||
private class InlinedSendFlow(val payload: String, val otherPartySession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = otherPartySession.send(payload)
|
||||
override fun call(): Any {
|
||||
return (otherPartySession ?: initiateFlow(otherParty)).sendAndReceive<Any>(payload).unwrap { it }
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class PingPongFlow(val otherParty: Party, val payload: Long, val otherPartySession: FlowSession? = null) : FlowLogic<Unit>() {
|
||||
internal class PingPongFlow(private val otherParty: Party, private val payload: Long, private val otherPartySession: FlowSession? = null) : FlowLogic<Unit>() {
|
||||
constructor(otherPartySession: FlowSession, payload: Long) : this(otherPartySession.counterparty, payload, otherPartySession)
|
||||
|
||||
@Transient
|
||||
@ -1082,7 +762,7 @@ private class PingPongFlow(val otherParty: Party, val payload: Long, val otherPa
|
||||
}
|
||||
}
|
||||
|
||||
private class ExceptionFlow<E : Exception>(val exception: () -> E) : FlowLogic<Nothing>() {
|
||||
internal class ExceptionFlow<E : Exception>(val exception: () -> E) : FlowLogic<Nothing>() {
|
||||
object START_STEP : ProgressTracker.Step("Starting")
|
||||
|
||||
override val progressTracker: ProgressTracker = ProgressTracker(START_STEP)
|
||||
@ -1094,4 +774,4 @@ private class ExceptionFlow<E : Exception>(val exception: () -> E) : FlowLogic<N
|
||||
exceptionThrown = exception()
|
||||
throw exceptionThrown
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,102 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.client.rpc.notUsed
|
||||
import net.corda.core.crypto.newSecureRandom
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.TestStartedNode
|
||||
import net.corda.testing.node.internal.cordappsForPackages
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
import java.util.*
|
||||
|
||||
// FlowFrameworkTests but each node uses MultiThreadedStateMachineManager with one thread.
|
||||
// TODO This should extend FlowFrameworkTests so that it runs all those tests, but there are test failures
|
||||
class FlowFrameworkTestsUsingMultithreadedSMM {
|
||||
companion object {
|
||||
init {
|
||||
LogHelper.setLevel("+net.corda.flow")
|
||||
}
|
||||
}
|
||||
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private lateinit var aliceNode: TestStartedNode
|
||||
private lateinit var bobNode: TestStartedNode
|
||||
private lateinit var alice: Party
|
||||
private lateinit var bob: Party
|
||||
private lateinit var notaryIdentity: Party
|
||||
private val receivedSessionMessages = ArrayList<SessionTransfer>()
|
||||
|
||||
@Before
|
||||
fun setUpMockNet() {
|
||||
mockNet = InternalMockNetwork(
|
||||
cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"),
|
||||
servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin()
|
||||
)
|
||||
|
||||
aliceNode = createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
|
||||
bobNode = createNode(InternalMockNodeParameters(legalName = BOB_NAME))
|
||||
|
||||
// Extract identities
|
||||
alice = aliceNode.info.singleIdentity()
|
||||
bob = bobNode.info.singleIdentity()
|
||||
notaryIdentity = mockNet.defaultNotaryIdentity
|
||||
|
||||
receivedSessionMessagesObservable().forEach { receivedSessionMessages += it }
|
||||
}
|
||||
|
||||
private fun receivedSessionMessagesObservable(): Observable<SessionTransfer> {
|
||||
return mockNet.messagingNetwork.receivedMessages.toSessionTransfers()
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
mockNet.stopNodes()
|
||||
receivedSessionMessages.clear()
|
||||
}
|
||||
|
||||
private fun createNode(parameters: InternalMockNodeParameters): TestStartedNode {
|
||||
return mockNet.createNode(parameters) {
|
||||
object : InternalMockNetwork.MockNode(it) {
|
||||
override fun makeStateMachineManager(): StateMachineManager {
|
||||
val executor = MultiThreadedStateMachineExecutor(metricRegistry, 1)
|
||||
return MultiThreadedStateMachineManager(
|
||||
services,
|
||||
checkpointStorage,
|
||||
executor,
|
||||
database,
|
||||
newSecureRandom(),
|
||||
busyNodeLatch,
|
||||
cordappLoader.appClassLoader
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `session init with unknown class is sent to the flow hospital, from where it's dropped`() {
|
||||
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob)
|
||||
mockNet.runNetwork()
|
||||
assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital
|
||||
val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot
|
||||
assertThat(medicalRecords).hasSize(1)
|
||||
val sessionInitRecord = medicalRecords[0] as StaffedFlowHospital.MedicalRecord.SessionInit
|
||||
assertThat(sessionInitRecord.initiatorFlowClassName).isEqualTo("not.a.real.Class")
|
||||
bobNode.smm.flowHospital.dropSessionInit(sessionInitRecord.id) // Drop the message which is processed as an error back to sender
|
||||
mockNet.runNetwork()
|
||||
assertThat(receivedSessionMessages).hasSize(2) // Now the session-reject is expected
|
||||
val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage
|
||||
assertThat((lastMessage.payload as RejectSessionMessage).message).isEqualTo("Don't know not.a.real.Class")
|
||||
}
|
||||
}
|
@ -0,0 +1,178 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.flows.registerCordappFlowFactory
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.CHARLIE_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import net.corda.testing.node.internal.*
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.AssertionsForClassTypes
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
import java.util.*
|
||||
|
||||
class FlowFrameworkTripartyTests {
|
||||
companion object {
|
||||
init {
|
||||
LogHelper.setLevel("+net.corda.flow")
|
||||
}
|
||||
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private lateinit var aliceNode: TestStartedNode
|
||||
private lateinit var bobNode: TestStartedNode
|
||||
private lateinit var charlieNode: TestStartedNode
|
||||
private lateinit var alice: Party
|
||||
private lateinit var bob: Party
|
||||
private lateinit var charlie: Party
|
||||
private lateinit var notaryIdentity: Party
|
||||
private val receivedSessionMessages = ArrayList<SessionTransfer>()
|
||||
}
|
||||
|
||||
@Before
|
||||
fun setUpGlobalMockNet() {
|
||||
mockNet = InternalMockNetwork(
|
||||
cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"),
|
||||
servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin()
|
||||
)
|
||||
|
||||
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
|
||||
bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
|
||||
charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME))
|
||||
|
||||
|
||||
// Extract identities
|
||||
alice = aliceNode.info.singleIdentity()
|
||||
bob = bobNode.info.singleIdentity()
|
||||
charlie = charlieNode.info.singleIdentity()
|
||||
notaryIdentity = mockNet.defaultNotaryIdentity
|
||||
|
||||
receivedSessionMessagesObservable().forEach { receivedSessionMessages += it }
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
mockNet.stopNodes()
|
||||
receivedSessionMessages.clear()
|
||||
}
|
||||
|
||||
private fun receivedSessionMessagesObservable(): Observable<SessionTransfer> {
|
||||
return mockNet.messagingNetwork.receivedMessages.toSessionTransfers()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `sending to multiple parties`() {
|
||||
bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it)
|
||||
.nonTerminating() }
|
||||
charlieNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it)
|
||||
.nonTerminating() }
|
||||
val payload = "Hello World"
|
||||
aliceNode.services.startFlow(SendFlow(payload, bob, charlie))
|
||||
mockNet.runNetwork()
|
||||
bobNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
charlieNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
val bobFlow = bobNode.getSingleFlow<InitiatedReceiveFlow>().first
|
||||
val charlieFlow = charlieNode.getSingleFlow<InitiatedReceiveFlow>().first
|
||||
assertThat(bobFlow.receivedPayloads[0]).isEqualTo(payload)
|
||||
assertThat(charlieFlow.receivedPayloads[0]).isEqualTo(payload)
|
||||
|
||||
assertSessionTransfers(bobNode,
|
||||
aliceNode sent sessionInit(SendFlow::class, payload = payload) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
aliceNode sent normalEnd to bobNode
|
||||
//There's no session end from the other flows as they're manually suspended
|
||||
)
|
||||
|
||||
assertSessionTransfers(charlieNode,
|
||||
aliceNode sent sessionInit(SendFlow::class, payload = payload) to charlieNode,
|
||||
charlieNode sent sessionConfirm() to aliceNode,
|
||||
aliceNode sent normalEnd to charlieNode
|
||||
//There's no session end from the other flows as they're manually suspended
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `receiving from multiple parties`() {
|
||||
val bobPayload = "Test 1"
|
||||
val charliePayload = "Test 2"
|
||||
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(bobPayload, it) }
|
||||
charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(charliePayload, it) }
|
||||
val multiReceiveFlow = ReceiveFlow(bob, charlie).nonTerminating()
|
||||
aliceNode.services.startFlow(multiReceiveFlow)
|
||||
aliceNode.internals.acceptableLiveFiberCountOnStop = 1
|
||||
mockNet.runNetwork()
|
||||
assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(bobPayload)
|
||||
assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(charliePayload)
|
||||
|
||||
assertSessionTransfers(bobNode,
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent sessionData(bobPayload) to aliceNode,
|
||||
bobNode sent normalEnd to aliceNode
|
||||
)
|
||||
|
||||
assertSessionTransfers(charlieNode,
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to charlieNode,
|
||||
charlieNode sent sessionConfirm() to aliceNode,
|
||||
charlieNode sent sessionData(charliePayload) to aliceNode,
|
||||
charlieNode sent normalEnd to aliceNode
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `FlowException only propagated to parent`() {
|
||||
charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Chain") } }
|
||||
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { ReceiveFlow(charlie) }
|
||||
val receivingFiber = aliceNode.services.startFlow(ReceiveFlow(bob))
|
||||
mockNet.runNetwork()
|
||||
AssertionsForClassTypes.assertThatExceptionOfType(UnexpectedFlowEndException::class.java)
|
||||
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `FlowException thrown and there is a 3rd unrelated party flow`() {
|
||||
// Bob will send its payload and then block waiting for the receive from Alice. Meanwhile Alice will move
|
||||
// onto Charlie which will throw the exception
|
||||
val node2Fiber = bobNode
|
||||
.registerCordappFlowFactory(ReceiveFlow::class) { SendAndReceiveFlow(it, "Hello") }
|
||||
.map { it.stateMachine }
|
||||
charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Nothing useful") } }
|
||||
|
||||
val aliceFiber = aliceNode.services.startFlow(ReceiveFlow(bob, charlie)) as FlowStateMachineImpl
|
||||
mockNet.runNetwork()
|
||||
|
||||
// Alice will terminate with the error it received from Charlie but it won't propagate that to Bob (as it's
|
||||
// not relevant to it) but it will end its session with it
|
||||
AssertionsForClassTypes.assertThatExceptionOfType(MyFlowException::class.java)
|
||||
.isThrownBy {
|
||||
aliceFiber.resultFuture.getOrThrow()
|
||||
}
|
||||
val bobResultFuture = node2Fiber.getOrThrow().resultFuture
|
||||
AssertionsForClassTypes.assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
|
||||
bobResultFuture.getOrThrow()
|
||||
}
|
||||
|
||||
assertSessionTransfers(bobNode,
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent sessionData("Hello") to aliceNode,
|
||||
aliceNode sent errorMessage() to bobNode
|
||||
)
|
||||
}
|
||||
|
||||
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
|
||||
|
||||
private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List<SessionTransfer> {
|
||||
val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress }
|
||||
assertThat(actualForNode).containsExactly(*expected)
|
||||
return actualForNode
|
||||
}
|
||||
}
|
@ -175,8 +175,6 @@ class RetryFlowMockTest {
|
||||
TODO("not implemented")
|
||||
}
|
||||
}), nodeA.services.newContext()).get()
|
||||
// Should be 2 records, one for admission and one for keep in.
|
||||
records.next()
|
||||
records.next()
|
||||
// Killing it should remove it.
|
||||
nodeA.smm.killFlow(flow.id)
|
||||
|
@ -58,7 +58,9 @@ This will create a new sub-task under each of the test tickets for `<PRODUCT>` `
|
||||
|
||||
## Options
|
||||
|
||||
Each command described above has a set of additional options. More specifically, if you want to use a particular JIRA user instead of being prompted for a user name every time, you can specify `--user <USER>`. For verbose logging, you can supply `--verbose` or `-v`. And to auto-reply to the prompt of whether to proceed or not, provide `--yes` or `-y`.
|
||||
Each command described above has a set of additional options. More specifically, if you want to use a particular JIRA user instead of being prompted for a user name every time, you can specify `--user <USER>`. You can also provide the user name in the environment variable, `JIRA_USER`.
|
||||
|
||||
For verbose logging, you can supply `--verbose` or `-v`. And to auto-reply to the prompt of whether to proceed or not, provide `--yes` or `-y`.
|
||||
|
||||
There is also a useful dry-run option, `--dry-run` or `-d`, that lets you run through the command without creating any tickets or applying any changes to JIRA.
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
# {{{ Dependencies
|
||||
|
||||
from __future__ import print_function
|
||||
import sys
|
||||
import sys, os
|
||||
|
||||
try:
|
||||
from getpass import getpass
|
||||
@ -41,9 +41,13 @@ def confirm(message, auto_yes=False):
|
||||
# {{{ login(account, user, password, use_keyring) - Present user with login prompt and return the provided username and password. If use_keyring is true, use previously provided password (if any)
|
||||
def login(account, user=None, password=None, use_keyring=True):
|
||||
if not user:
|
||||
user = prompt('Username: ')
|
||||
user = u'{}@r3.com'.format(user) if '@' not in user else user
|
||||
if not user: return (None, None)
|
||||
if 'JIRA_USER' not in os.environ:
|
||||
user = prompt('Username: ')
|
||||
user = u'{}@r3.com'.format(user) if '@' not in user else user
|
||||
if not user: return (None, None)
|
||||
else:
|
||||
user = os.environ['JIRA_USER']
|
||||
print('Username: {}'.format(user))
|
||||
else:
|
||||
user = u'{}@r3.com'.format(user) if '@' not in user else user
|
||||
print('Username: {}'.format(user))
|
||||
|
@ -16,10 +16,10 @@ try:
|
||||
def red(message): return colored(message, 'red')
|
||||
def yellow(message): return colored(message, 'yellow')
|
||||
def faint(message): return colored(message, 'white', attrs=['dark'])
|
||||
def on_green(message): return colored(message, 'white', 'on_green')
|
||||
def on_red(message): return colored(message, 'white', 'on_red')
|
||||
def blue_on_white(message): return colored(message, 'blue', 'on_white')
|
||||
def yellow_on_white(message): return colored(message, 'yellow', 'on_white')
|
||||
def on_green(message): return colored(message, 'green')
|
||||
def on_red(message): return colored(message, 'red')
|
||||
def blue_on_white(message): return colored(message, 'blue')
|
||||
def yellow_on_white(message): return colored(message, 'yellow')
|
||||
except:
|
||||
def blue(message): return u'[{}]'.format(message)
|
||||
def green(message): return message
|
||||
@ -159,7 +159,10 @@ def create_version(args):
|
||||
jira.jira.create_version(name=version, project=project, description=version)
|
||||
print(u' {} - Created version for project {}'.format(green('SUCCESS'), blue(project)))
|
||||
except Exception as error:
|
||||
print(u' {} - Failed to version: {}'.format(red('FAIL'), error))
|
||||
if args.verbose:
|
||||
print(u' {} - Failed to version: {}'.format(red('FAIL'), error))
|
||||
else:
|
||||
print(u' {} - Failed to version: {}'.format(red('FAIL'), error.text))
|
||||
print()
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user