diff --git a/.idea/compiler.xml b/.idea/compiler.xml
index 35e599ce38..7234acdbc7 100644
--- a/.idea/compiler.xml
+++ b/.idea/compiler.xml
@@ -118,6 +118,7 @@
+
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt
index 00bcae9d92..0e3e50f2f9 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt
@@ -324,6 +324,7 @@ class StateMachineManagerImpl(
val recipientId = sessionMessage.recipientSessionId
val flowId = sessionToFlow[recipientId]
if (flowId == null) {
+ acknowledgeHandle.acknowledge()
if (sessionMessage.payload is EndSessionMessage) {
logger.debug {
"Got ${EndSessionMessage::class.java.simpleName} for " +
@@ -610,6 +611,7 @@ class StateMachineManagerImpl(
removalReason: FlowRemovalReason.OrderlyFinish,
lastState: StateMachineState
) {
+ drainFlowEventQueue(flow)
// final sanity checks
require(lastState.unacknowledgedMessages.isEmpty())
require(lastState.isRemoved)
@@ -625,6 +627,7 @@ class StateMachineManagerImpl(
removalReason: FlowRemovalReason.ErrorFinish,
lastState: StateMachineState
) {
+ drainFlowEventQueue(flow)
val flowError = removalReason.flowErrors[0] // TODO what to do with several?
val exception = flowError.exception
(exception as? FlowException)?.originalErrorId = flowError.errorId
@@ -632,6 +635,30 @@ class StateMachineManagerImpl(
lastState.flowLogic.progressTracker?.endWithError(exception)
changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Failure(exception)))
}
+
+ // The flow's event queue may be non-empty in case it shut down abruptly. We handle outstanding events here.
+ private fun drainFlowEventQueue(flow: Flow) {
+ while (true) {
+ val event = flow.fiber.transientValues!!.value.eventQueue.tryReceive() ?: return
+ when (event) {
+ is Event.DeliverSessionMessage -> {
+ // Acknowledge the message so it doesn't leak in the broker.
+ event.acknowledgeHandle.acknowledge()
+ when (event.sessionMessage.payload) {
+ EndSessionMessage -> {
+ logger.debug { "Unhandled message ${event.sessionMessage} due to flow shutting down" }
+ }
+ else -> {
+ logger.warn("Unhandled message ${event.sessionMessage} due to flow shutting down")
+ }
+ }
+ }
+ else -> {
+ logger.warn("Unhandled event $event due to flow shutting down")
+ }
+ }
+ }
+ }
}
class SessionRejectException(reason: String) : CordaException(reason)
diff --git a/perftestcordapp/build.gradle b/perftestcordapp/build.gradle
index ba8c4a72b0..48142030f1 100644
--- a/perftestcordapp/build.gradle
+++ b/perftestcordapp/build.gradle
@@ -10,6 +10,30 @@ apply plugin: 'net.corda.plugins.cordapp'
description 'Corda performance test modules'
+sourceSets {
+ integrationTest {
+ kotlin {
+ compileClasspath += main.output + test.output
+ runtimeClasspath += main.output + test.output
+ srcDir file('src/integration-test/kotlin')
+ }
+ java {
+ compileClasspath += main.output + test.output
+ runtimeClasspath += main.output + test.output
+ srcDir file('src/integration-test/java')
+ }
+ resources {
+ srcDir file('src/integration-test/resources')
+ srcDir file('../../testing/test-utils/src/main/resources')
+ }
+ }
+}
+
+task integrationTest(type: Test) {
+ testClassesDirs = sourceSets.integrationTest.output.classesDirs
+ classpath = sourceSets.integrationTest.runtimeClasspath
+}
+
dependencies {
// Note the :finance module is a CorDapp in its own right
// and CorDapps using :finance features should use 'cordapp' not 'compile' linkage.
@@ -29,6 +53,9 @@ dependencies {
configurations {
testArtifacts.extendsFrom testRuntime
+
+ integrationTestCompile.extendsFrom testCompile
+ integrationTestRuntime.extendsFrom testRuntime
}
task testJar(type: Jar) {
diff --git a/perftestcordapp/src/integrationTest/kotlin/com/r3/corda/enterprise/perftestcordapp/flows/NoSelectionIntegrationTest.kt b/perftestcordapp/src/integrationTest/kotlin/com/r3/corda/enterprise/perftestcordapp/flows/NoSelectionIntegrationTest.kt
new file mode 100644
index 0000000000..bf0cbd457f
--- /dev/null
+++ b/perftestcordapp/src/integrationTest/kotlin/com/r3/corda/enterprise/perftestcordapp/flows/NoSelectionIntegrationTest.kt
@@ -0,0 +1,33 @@
+package com.r3.corda.enterprise.perftestcordapp.flows
+
+import net.corda.core.messaging.startFlow
+import net.corda.core.utilities.OpaqueBytes
+import net.corda.core.utilities.getOrThrow
+import net.corda.finance.DOLLARS
+import net.corda.node.services.Permissions
+import net.corda.nodeapi.internal.config.User
+import net.corda.testing.driver.PortAllocation
+import net.corda.testing.driver.driver
+import org.junit.Ignore
+import org.junit.Test
+
+@Ignore("Use to test no-selection locally")
+class NoSelectionIntegrationTest {
+
+ @Test
+ fun `single pay no selection`() {
+ val aliceUser = User("A", "A", setOf(Permissions.startFlow()))
+ driver(
+ startNodesInProcess = true,
+ extraCordappPackagesToScan = listOf("com.r3.corda.enterprise.perftestcordapp"),
+ portAllocation = PortAllocation.Incremental(20000)
+ ) {
+ val alice = startNode(rpcUsers = listOf(aliceUser)).get()
+ defaultNotaryNode.get()
+ alice.rpcClientToNode().use("A", "A") { connection ->
+ connection.proxy.startFlow(::CashIssueAndPaymentNoSelection, 1.DOLLARS, OpaqueBytes.of(0), alice.nodeInfo.legalIdentities[0], false, defaultNotaryIdentity).returnValue.getOrThrow()
+ }
+ }
+ }
+
+}