CORDA-3506 - Add test for session close API (#6512)

This commit is contained in:
Dimos Raptis 2020-07-28 10:20:24 +01:00 committed by GitHub
parent d4189c4f37
commit f2336f397d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -43,7 +43,7 @@ class FlowSessionCloseTest {
).transpose().getOrThrow() ).transpose().getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
assertThatThrownBy { it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), true, null, false).returnValue.getOrThrow() } assertThatThrownBy { it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), true, null, InitiatorFlow.ResponderReaction.NORMAL_CLOSE).returnValue.getOrThrow() }
.isInstanceOf(CordaRuntimeException::class.java) .isInstanceOf(CordaRuntimeException::class.java)
.hasMessageContaining(PrematureSessionCloseException::class.java.name) .hasMessageContaining(PrematureSessionCloseException::class.java.name)
.hasMessageContaining("The following session was closed before it was initialised") .hasMessageContaining("The following session was closed before it was initialised")
@ -52,18 +52,26 @@ class FlowSessionCloseTest {
} }
@Test(timeout=300_000) @Test(timeout=300_000)
fun `flow cannot access closed session`() { fun `flow cannot access closed session, unless it's a duplicate close which is handled gracefully`() {
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) { driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) {
val (nodeAHandle, nodeBHandle) = listOf( val (nodeAHandle, nodeBHandle) = listOf(
startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)), startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)),
startNode(providedName = BOB_NAME, rpcUsers = listOf(user)) startNode(providedName = BOB_NAME, rpcUsers = listOf(user))
).transpose().getOrThrow() ).transpose().getOrThrow()
InitiatorFlow.SessionAPI.values().forEach { sessionAPI ->
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
assertThatThrownBy { it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, sessionAPI, false).returnValue.getOrThrow() } InitiatorFlow.SessionAPI.values().forEach { sessionAPI ->
.isInstanceOf(UnexpectedFlowEndException::class.java) when (sessionAPI) {
.hasMessageContaining("Tried to access ended session") InitiatorFlow.SessionAPI.CLOSE -> {
it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, sessionAPI, InitiatorFlow.ResponderReaction.NORMAL_CLOSE).returnValue.getOrThrow()
}
else -> {
assertThatThrownBy { it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, sessionAPI, InitiatorFlow.ResponderReaction.NORMAL_CLOSE).returnValue.getOrThrow() }
.isInstanceOf(UnexpectedFlowEndException::class.java)
.hasMessageContaining("Tried to access ended session")
}
}
} }
} }
@ -79,7 +87,7 @@ class FlowSessionCloseTest {
).transpose().getOrThrow() ).transpose().getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, null, false).returnValue.getOrThrow() it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, null, InitiatorFlow.ResponderReaction.NORMAL_CLOSE).returnValue.getOrThrow()
} }
} }
} }
@ -93,7 +101,7 @@ class FlowSessionCloseTest {
).transpose().getOrThrow() ).transpose().getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, null, true).returnValue.getOrThrow() it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, null, InitiatorFlow.ResponderReaction.RETRY_CLOSE_FROM_CHECKPOINT).returnValue.getOrThrow()
} }
} }
} }
@ -151,14 +159,21 @@ class FlowSessionCloseTest {
@StartableByRPC @StartableByRPC
class InitiatorFlow(val party: Party, private val prematureClose: Boolean = false, class InitiatorFlow(val party: Party, private val prematureClose: Boolean = false,
private val accessClosedSessionWithApi: SessionAPI? = null, private val accessClosedSessionWithApi: SessionAPI? = null,
private val retryClose: Boolean = false): FlowLogic<Unit>() { private val responderReaction: ResponderReaction): FlowLogic<Unit>() {
@CordaSerializable @CordaSerializable
enum class SessionAPI { enum class SessionAPI {
SEND, SEND,
SEND_AND_RECEIVE, SEND_AND_RECEIVE,
RECEIVE, RECEIVE,
GET_FLOW_INFO GET_FLOW_INFO,
CLOSE
}
@CordaSerializable
enum class ResponderReaction {
NORMAL_CLOSE,
RETRY_CLOSE_FROM_CHECKPOINT
} }
@Suspendable @Suspendable
@ -169,7 +184,7 @@ class FlowSessionCloseTest {
session.close() session.close()
} }
session.send(retryClose) session.send(responderReaction)
sleep(1.seconds) sleep(1.seconds)
if (accessClosedSessionWithApi != null) { if (accessClosedSessionWithApi != null) {
@ -178,6 +193,7 @@ class FlowSessionCloseTest {
SessionAPI.RECEIVE -> session.receive<String>() SessionAPI.RECEIVE -> session.receive<String>()
SessionAPI.SEND_AND_RECEIVE -> session.sendAndReceive<String>("dummy payload") SessionAPI.SEND_AND_RECEIVE -> session.sendAndReceive<String>("dummy payload")
SessionAPI.GET_FLOW_INFO -> session.getCounterpartyFlowInfo() SessionAPI.GET_FLOW_INFO -> session.getCounterpartyFlowInfo()
SessionAPI.CLOSE -> session.close()
} }
} }
} }
@ -192,16 +208,21 @@ class FlowSessionCloseTest {
@Suspendable @Suspendable
override fun call() { override fun call() {
val retryClose = otherSideSession.receive<Boolean>() val responderReaction = otherSideSession.receive<InitiatorFlow.ResponderReaction>()
.unwrap{ it } .unwrap{ it }
otherSideSession.close() when(responderReaction) {
InitiatorFlow.ResponderReaction.NORMAL_CLOSE -> {
otherSideSession.close()
}
InitiatorFlow.ResponderReaction.RETRY_CLOSE_FROM_CHECKPOINT -> {
otherSideSession.close()
// failing with a transient exception to force a replay of the close. // failing with a transient exception to force a replay of the close.
if (retryClose) { if (!thrown) {
if (!thrown) { thrown = true
thrown = true throw SQLTransientConnectionException("Connection is not available")
throw SQLTransientConnectionException("Connection is not available") }
} }
} }
} }