mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
Attempting to initiate flow that's not in the destination's classpath returns back a session reject
This commit is contained in:
2
.idea/compiler.xml
generated
2
.idea/compiler.xml
generated
@ -36,6 +36,8 @@
|
|||||||
<module name="explorer_test" target="1.8" />
|
<module name="explorer_test" target="1.8" />
|
||||||
<module name="finance_main" target="1.8" />
|
<module name="finance_main" target="1.8" />
|
||||||
<module name="finance_test" target="1.8" />
|
<module name="finance_test" target="1.8" />
|
||||||
|
<module name="graphs_main" target="1.8" />
|
||||||
|
<module name="graphs_test" target="1.8" />
|
||||||
<module name="irs-demo_integrationTest" target="1.8" />
|
<module name="irs-demo_integrationTest" target="1.8" />
|
||||||
<module name="irs-demo_main" target="1.8" />
|
<module name="irs-demo_main" target="1.8" />
|
||||||
<module name="irs-demo_test" target="1.8" />
|
<module name="irs-demo_test" target="1.8" />
|
||||||
|
@ -3,6 +3,7 @@ package net.corda.node.internal
|
|||||||
import net.corda.core.flows.FlowLogic
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.node.services.statemachine.SessionInit
|
import net.corda.node.services.statemachine.SessionInit
|
||||||
|
import net.corda.node.services.statemachine.SessionRejectException
|
||||||
|
|
||||||
interface InitiatedFlowFactory<out F : FlowLogic<*>> {
|
interface InitiatedFlowFactory<out F : FlowLogic<*>> {
|
||||||
fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): F
|
fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): F
|
||||||
@ -24,4 +25,3 @@ interface InitiatedFlowFactory<out F : FlowLogic<*>> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class SessionRejectException(val rejectMessage: String, val logMessage: String) : Exception()
|
|
||||||
|
@ -18,6 +18,7 @@ import net.corda.core.utilities.*
|
|||||||
import net.corda.node.services.api.FlowAppAuditEvent
|
import net.corda.node.services.api.FlowAppAuditEvent
|
||||||
import net.corda.node.services.api.FlowPermissionAuditEvent
|
import net.corda.node.services.api.FlowPermissionAuditEvent
|
||||||
import net.corda.node.services.api.ServiceHubInternal
|
import net.corda.node.services.api.ServiceHubInternal
|
||||||
|
import net.corda.node.services.statemachine.FlowSessionState.Initiating
|
||||||
import net.corda.node.utilities.CordaPersistence
|
import net.corda.node.utilities.CordaPersistence
|
||||||
import net.corda.node.utilities.DatabaseTransaction
|
import net.corda.node.utilities.DatabaseTransaction
|
||||||
import net.corda.node.utilities.DatabaseTransactionManager
|
import net.corda.node.utilities.DatabaseTransactionManager
|
||||||
@ -98,7 +99,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
logger.debug { "Calling flow: $logic" }
|
logger.debug { "Calling flow: $logic" }
|
||||||
val startTime = System.nanoTime()
|
val startTime = System.nanoTime()
|
||||||
val result = try {
|
val result = try {
|
||||||
logic.call()
|
val r = logic.call()
|
||||||
|
// Only sessions which have done a single send and nothing else will block here
|
||||||
|
openSessions.values
|
||||||
|
.filter { it.state is Initiating }
|
||||||
|
.forEach { it.waitForConfirmation() }
|
||||||
|
r
|
||||||
} catch (e: FlowException) {
|
} catch (e: FlowException) {
|
||||||
recordDuration(startTime, success = false)
|
recordDuration(startTime, success = false)
|
||||||
// Check if the FlowException was propagated by looking at where the stack trace originates (see suspendAndExpectReceive).
|
// Check if the FlowException was propagated by looking at where the stack trace originates (see suspendAndExpectReceive).
|
||||||
@ -114,10 +120,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
}
|
}
|
||||||
|
|
||||||
recordDuration(startTime)
|
recordDuration(startTime)
|
||||||
// Only sessions which have done a single send and nothing else will block here
|
|
||||||
openSessions.values
|
|
||||||
.filter { it.state is FlowSessionState.Initiating }
|
|
||||||
.forEach { it.waitForConfirmation() }
|
|
||||||
// This is to prevent actionOnEnd being called twice if it throws an exception
|
// This is to prevent actionOnEnd being called twice if it throws an exception
|
||||||
actionOnEnd(Try.Success(result), false)
|
actionOnEnd(Try.Success(result), false)
|
||||||
_resultFuture?.set(result)
|
_resultFuture?.set(result)
|
||||||
@ -314,7 +316,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty), retryable)
|
val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty), retryable)
|
||||||
openSessions[Pair(sessionFlow, otherParty)] = session
|
openSessions[Pair(sessionFlow, otherParty)] = session
|
||||||
val (version, initiatingFlowClass) = sessionFlow.javaClass.flowVersionAndInitiatingClass
|
val (version, initiatingFlowClass) = sessionFlow.javaClass.flowVersionAndInitiatingClass
|
||||||
val sessionInit = SessionInit(session.ourSessionId, initiatingFlowClass, version, firstPayload)
|
val sessionInit = SessionInit(session.ourSessionId, initiatingFlowClass.name, version, firstPayload)
|
||||||
sendInternal(session, sessionInit)
|
sendInternal(session, sessionInit)
|
||||||
if (waitForConfirmation) {
|
if (waitForConfirmation) {
|
||||||
session.waitForConfirmation()
|
session.waitForConfirmation()
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package net.corda.node.services.statemachine
|
package net.corda.node.services.statemachine
|
||||||
|
|
||||||
import net.corda.core.flows.FlowException
|
import net.corda.core.flows.FlowException
|
||||||
import net.corda.core.flows.FlowLogic
|
|
||||||
import net.corda.core.flows.UnexpectedFlowEndException
|
import net.corda.core.flows.UnexpectedFlowEndException
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.internal.castIfPossible
|
import net.corda.core.internal.castIfPossible
|
||||||
@ -16,7 +15,7 @@ import net.corda.core.utilities.UntrustworthyData
|
|||||||
interface SessionMessage
|
interface SessionMessage
|
||||||
|
|
||||||
data class SessionInit(val initiatorSessionId: Long,
|
data class SessionInit(val initiatorSessionId: Long,
|
||||||
val initiatingFlowClass: Class<out FlowLogic<*>>,
|
val initiatingFlowClass: String,
|
||||||
val flowVerison: Int,
|
val flowVerison: Int,
|
||||||
val firstPayload: Any?) : SessionMessage
|
val firstPayload: Any?) : SessionMessage
|
||||||
|
|
||||||
|
@ -8,8 +8,6 @@ import com.esotericsoftware.kryo.KryoException
|
|||||||
import com.google.common.collect.HashMultimap
|
import com.google.common.collect.HashMultimap
|
||||||
import com.google.common.util.concurrent.ListenableFuture
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
import com.google.common.util.concurrent.MoreExecutors
|
import com.google.common.util.concurrent.MoreExecutors
|
||||||
import net.corda.core.internal.ThreadBox
|
|
||||||
import net.corda.core.internal.bufferUntilSubscribed
|
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.crypto.random63BitValue
|
import net.corda.core.crypto.random63BitValue
|
||||||
import net.corda.core.flows.FlowException
|
import net.corda.core.flows.FlowException
|
||||||
@ -17,6 +15,8 @@ import net.corda.core.flows.FlowInitiator
|
|||||||
import net.corda.core.flows.FlowLogic
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.flows.StateMachineRunId
|
import net.corda.core.flows.StateMachineRunId
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
|
import net.corda.core.internal.ThreadBox
|
||||||
|
import net.corda.core.internal.bufferUntilSubscribed
|
||||||
import net.corda.core.internal.castIfPossible
|
import net.corda.core.internal.castIfPossible
|
||||||
import net.corda.core.messaging.DataFeed
|
import net.corda.core.messaging.DataFeed
|
||||||
import net.corda.core.serialization.*
|
import net.corda.core.serialization.*
|
||||||
@ -27,7 +27,6 @@ import net.corda.core.utilities.Try
|
|||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
import net.corda.core.utilities.loggerFor
|
import net.corda.core.utilities.loggerFor
|
||||||
import net.corda.core.utilities.trace
|
import net.corda.core.utilities.trace
|
||||||
import net.corda.node.internal.SessionRejectException
|
|
||||||
import net.corda.node.services.api.Checkpoint
|
import net.corda.node.services.api.Checkpoint
|
||||||
import net.corda.node.services.api.CheckpointStorage
|
import net.corda.node.services.api.CheckpointStorage
|
||||||
import net.corda.node.services.api.ServiceHubInternal
|
import net.corda.node.services.api.ServiceHubInternal
|
||||||
@ -342,14 +341,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
|
|
||||||
fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(otherPartySessionId, message))
|
fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(otherPartySessionId, message))
|
||||||
|
|
||||||
val initiatedFlowFactory = serviceHub.getFlowFactory(sessionInit.initiatingFlowClass)
|
|
||||||
if (initiatedFlowFactory == null) {
|
|
||||||
logger.warn("${sessionInit.initiatingFlowClass} has not been registered: $sessionInit")
|
|
||||||
sendSessionReject("${sessionInit.initiatingFlowClass.name} has not been registered")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
val session = try {
|
val session = try {
|
||||||
|
val initiatedFlowFactory = serviceHub.getFlowFactory(sessionInit.loadInitiatingFlowClass())
|
||||||
|
?: throw SessionRejectException("${sessionInit.initiatingFlowClass} is not registered")
|
||||||
val flow = initiatedFlowFactory.createFlow(receivedMessage.platformVersion, sender, sessionInit)
|
val flow = initiatedFlowFactory.createFlow(receivedMessage.platformVersion, sender, sessionInit)
|
||||||
val fiber = createFiber(flow, FlowInitiator.Peer(sender))
|
val fiber = createFiber(flow, FlowInitiator.Peer(sender))
|
||||||
val session = FlowSession(flow, random63BitValue(), sender, FlowSessionState.Initiated(sender, otherPartySessionId))
|
val session = FlowSession(flow, random63BitValue(), sender, FlowSessionState.Initiated(sender, otherPartySessionId))
|
||||||
@ -371,11 +365,21 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
}
|
}
|
||||||
|
|
||||||
sendSessionMessage(sender, SessionConfirm(otherPartySessionId, session.ourSessionId), session.fiber)
|
sendSessionMessage(sender, SessionConfirm(otherPartySessionId, session.ourSessionId), session.fiber)
|
||||||
session.fiber.logger.debug { "Initiated by $sender using ${sessionInit.initiatingFlowClass.name}" }
|
session.fiber.logger.debug { "Initiated by $sender using ${sessionInit.initiatingFlowClass}" }
|
||||||
session.fiber.logger.trace { "Initiated from $sessionInit on $session" }
|
session.fiber.logger.trace { "Initiated from $sessionInit on $session" }
|
||||||
resumeFiber(session.fiber)
|
resumeFiber(session.fiber)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun SessionInit.loadInitiatingFlowClass(): Class<out FlowLogic<*>> {
|
||||||
|
return try {
|
||||||
|
Class.forName(initiatingFlowClass).asSubclass(FlowLogic::class.java)
|
||||||
|
} catch (e: ClassNotFoundException) {
|
||||||
|
throw SessionRejectException("Don't know $initiatingFlowClass")
|
||||||
|
} catch (e: ClassCastException) {
|
||||||
|
throw SessionRejectException("$initiatingFlowClass is not a flow")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes<FlowStateMachineImpl<*>> {
|
private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes<FlowStateMachineImpl<*>> {
|
||||||
return fiber.serialize(context = CHECKPOINT_CONTEXT.withTokenContext(serializationContext))
|
return fiber.serialize(context = CHECKPOINT_CONTEXT.withTokenContext(serializationContext))
|
||||||
}
|
}
|
||||||
@ -580,3 +584,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class SessionRejectException(val rejectMessage: String, val logMessage: String) : Exception() {
|
||||||
|
constructor(message: String) : this(message, message)
|
||||||
|
}
|
||||||
|
@ -21,6 +21,7 @@ import net.corda.core.node.services.ServiceInfo
|
|||||||
import net.corda.core.node.services.queryBy
|
import net.corda.core.node.services.queryBy
|
||||||
import net.corda.core.node.services.unconsumedStates
|
import net.corda.core.node.services.unconsumedStates
|
||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.toFuture
|
import net.corda.core.toFuture
|
||||||
import net.corda.core.transactions.SignedTransaction
|
import net.corda.core.transactions.SignedTransaction
|
||||||
import net.corda.core.transactions.TransactionBuilder
|
import net.corda.core.transactions.TransactionBuilder
|
||||||
@ -349,7 +350,7 @@ class FlowFrameworkTests {
|
|||||||
sessionTransfers.expectEvents(isStrict = false) {
|
sessionTransfers.expectEvents(isStrict = false) {
|
||||||
sequence(
|
sequence(
|
||||||
// First Pay
|
// First Pay
|
||||||
expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java }) {
|
expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java.name }) {
|
||||||
it.message as SessionInit
|
it.message as SessionInit
|
||||||
assertEquals(node1.id, it.from)
|
assertEquals(node1.id, it.from)
|
||||||
assertEquals(notary1Address, it.to)
|
assertEquals(notary1Address, it.to)
|
||||||
@ -359,7 +360,7 @@ class FlowFrameworkTests {
|
|||||||
assertEquals(notary1.id, it.from)
|
assertEquals(notary1.id, it.from)
|
||||||
},
|
},
|
||||||
// Second pay
|
// Second pay
|
||||||
expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java }) {
|
expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java.name }) {
|
||||||
it.message as SessionInit
|
it.message as SessionInit
|
||||||
assertEquals(node1.id, it.from)
|
assertEquals(node1.id, it.from)
|
||||||
assertEquals(notary1Address, it.to)
|
assertEquals(notary1Address, it.to)
|
||||||
@ -369,7 +370,7 @@ class FlowFrameworkTests {
|
|||||||
assertEquals(notary2.id, it.from)
|
assertEquals(notary2.id, it.from)
|
||||||
},
|
},
|
||||||
// Third pay
|
// Third pay
|
||||||
expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java }) {
|
expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java.name }) {
|
||||||
it.message as SessionInit
|
it.message as SessionInit
|
||||||
assertEquals(node1.id, it.from)
|
assertEquals(node1.id, it.from)
|
||||||
assertEquals(notary1Address, it.to)
|
assertEquals(notary1Address, it.to)
|
||||||
@ -653,9 +654,36 @@ class FlowFrameworkTests {
|
|||||||
track = false)
|
track = false)
|
||||||
val result = node1.services.startFlow(UpgradedFlow(node2.info.legalIdentity)).resultFuture
|
val result = node1.services.startFlow(UpgradedFlow(node2.info.legalIdentity)).resultFuture
|
||||||
mockNet.runNetwork()
|
mockNet.runNetwork()
|
||||||
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
|
assertThatExceptionOfType(UnexpectedFlowEndException::class.java)
|
||||||
result.getOrThrow()
|
.isThrownBy { result.getOrThrow() }
|
||||||
}.withMessageContaining("Version")
|
.withMessageContaining("Version")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `unregistered flow`() {
|
||||||
|
val future = node1.services.startFlow(SendFlow("Hello", node2.info.legalIdentity)).resultFuture
|
||||||
|
mockNet.runNetwork()
|
||||||
|
assertThatExceptionOfType(UnexpectedFlowEndException::class.java)
|
||||||
|
.isThrownBy { future.getOrThrow() }
|
||||||
|
.withMessageEndingWith("${SendFlow::class.java.name} is not registered")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `unknown class in session init`() {
|
||||||
|
node1.sendSessionMessage(SessionInit(random63BitValue(), "not.a.real.Class", 1, null), node2)
|
||||||
|
mockNet.runNetwork()
|
||||||
|
assertThat(sessionTransfers).hasSize(2) // Only the session-init and session-reject are expected
|
||||||
|
val reject = sessionTransfers.last().message as SessionReject
|
||||||
|
assertThat(reject.errorMessage).isEqualTo("Don't know not.a.real.Class")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `non-flow class in session init`() {
|
||||||
|
node1.sendSessionMessage(SessionInit(random63BitValue(), String::class.java.name, 1, null), node2)
|
||||||
|
mockNet.runNetwork()
|
||||||
|
assertThat(sessionTransfers).hasSize(2) // Only the session-init and session-reject are expected
|
||||||
|
val reject = sessionTransfers.last().message as SessionReject
|
||||||
|
assertThat(reject.errorMessage).isEqualTo("${String::class.java.name} is not a flow")
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -705,13 +733,20 @@ class FlowFrameworkTests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, flowVersion: Int = 1, payload: Any? = null): SessionInit {
|
private fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, flowVersion: Int = 1, payload: Any? = null): SessionInit {
|
||||||
return SessionInit(0, clientFlowClass.java, flowVersion, payload)
|
return SessionInit(0, clientFlowClass.java.name, flowVersion, payload)
|
||||||
}
|
}
|
||||||
private val sessionConfirm = SessionConfirm(0, 0)
|
private val sessionConfirm = SessionConfirm(0, 0)
|
||||||
private fun sessionData(payload: Any) = SessionData(0, payload)
|
private fun sessionData(payload: Any) = SessionData(0, payload)
|
||||||
private val normalEnd = NormalSessionEnd(0)
|
private val normalEnd = NormalSessionEnd(0)
|
||||||
private fun erroredEnd(errorResponse: FlowException? = null) = ErrorSessionEnd(0, errorResponse)
|
private fun erroredEnd(errorResponse: FlowException? = null) = ErrorSessionEnd(0, errorResponse)
|
||||||
|
|
||||||
|
private fun MockNode.sendSessionMessage(message: SessionMessage, destination: MockNode) {
|
||||||
|
services.networkService.apply {
|
||||||
|
val address = getAddressOfParty(PartyInfo.Node(destination.info))
|
||||||
|
send(createMessage(StateMachineManager.sessionTopic, message.serialize().bytes), address)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
|
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
|
||||||
assertThat(sessionTransfers).containsExactly(*expected)
|
assertThat(sessionTransfers).containsExactly(*expected)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user