From 7261fa690f703923e7bddfd569dd51478ceed4ab Mon Sep 17 00:00:00 2001 From: Dimos Raptis Date: Tue, 21 Jul 2020 13:26:11 +0100 Subject: [PATCH 01/14] CORDA-3506 - Implement session close operations (#6357) --- constants.properties | 2 +- .../flows/FlowExternalAsyncOperationTest.kt | 2 +- .../kotlin/net/corda/core/flows/FlowLogic.kt | 17 ++ .../net/corda/core/flows/FlowSession.kt | 13 + .../net/corda/core/internal/CordaUtils.kt | 2 +- .../net/corda/core/internal/FlowIORequest.kt | 7 + .../corda/node/flows/FlowSessionCloseTest.kt | 273 ++++++++++++++++++ .../node/services/rpc/CheckpointDumperImpl.kt | 12 +- .../statemachine/ActionExecutorImpl.kt | 6 +- .../node/services/statemachine/FlowMonitor.kt | 1 + .../services/statemachine/FlowSessionImpl.kt | 6 + .../statemachine/StateMachineState.kt | 52 ++-- .../DeliverSessionMessageTransition.kt | 72 +++-- .../transitions/ErrorFlowTransition.kt | 3 +- .../transitions/KilledFlowTransition.kt | 3 +- .../transitions/StartedFlowTransition.kt | 272 +++++++++++------ .../transitions/TopLevelTransition.kt | 5 +- .../transitions/TransitionBuilder.kt | 2 + .../transitions/UnstartedFlowTransition.kt | 7 +- .../statemachine/FlowFrameworkTests.kt | 3 +- .../statemachine/FlowMetadataRecordingTest.kt | 2 +- .../statemachine/RetryFlowMockTest.kt | 5 + .../node/internal/InternalMockNetwork.kt | 3 +- .../node/internal/MockNodeMessagingService.kt | 1 + 24 files changed, 598 insertions(+), 173 deletions(-) create mode 100644 node/src/integration-test/kotlin/net/corda/node/flows/FlowSessionCloseTest.kt diff --git a/constants.properties b/constants.properties index c9877793fb..6cc3b7e4a2 100644 --- a/constants.properties +++ b/constants.properties @@ -11,7 +11,7 @@ java8MinUpdateVersion=171 # When incrementing platformVersion make sure to update # # net.corda.core.internal.CordaUtilsKt.PLATFORM_VERSION as well. # # ***************************************************************# -platformVersion=7 +platformVersion=8 guavaVersion=28.0-jre # Quasar version to use with Java 8: quasarVersion=0.7.12_r3 diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt index cbf1892e51..6b6cfb3891 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt @@ -167,7 +167,7 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() { @Suspendable override fun testCode(): Any = - await(ExternalAsyncOperation(serviceHub) { _, _ -> + await(ExternalAsyncOperation(serviceHub) { serviceHub, _ -> serviceHub.cordaService(FutureService::class.java).createFuture() }) } diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index f6f502cf98..fd7b16fc5f 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -25,6 +25,7 @@ import net.corda.core.node.NodeInfo import net.corda.core.node.ServiceHub import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction +import net.corda.core.utilities.NonEmptySet import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.debug @@ -378,6 +379,22 @@ abstract class FlowLogic { stateMachine.suspend(request, maySkipCheckpoint) } + /** + * Closes the provided sessions and performs cleanup of any resources tied to these sessions. + * + * Note that sessions are closed automatically when the corresponding top-level flow terminates. + * So, it's beneficial to eagerly close them in long-lived flows that might have many open sessions that are not needed anymore and consume resources (e.g. memory, disk etc.). + * A closed session cannot be used anymore, e.g. to send or receive messages. So, you have to ensure you are calling this method only when the provided sessions are not going to be used anymore. + * As a result, any operations on a closed session will fail with an [UnexpectedFlowEndException]. + * When a session is closed, the other side is informed and the session is closed there too eventually. + * To prevent misuse of the API, if there is an attempt to close an uninitialised session the invocation will fail with an [IllegalStateException]. + */ + @Suspendable + fun close(sessions: NonEmptySet) { + val request = FlowIORequest.CloseSessions(sessions) + stateMachine.suspend(request, false) + } + /** * Invokes the given subflow. This function returns once the subflow completes successfully with the result * returned by that subflow's [call] method. If the subflow has a progress tracker, it is attached to the diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt b/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt index ac16d6897d..dd09a9d481 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt @@ -191,6 +191,19 @@ abstract class FlowSession { */ @Suspendable abstract fun send(payload: Any) + + /** + * Closes this session and performs cleanup of any resources tied to this session. + * + * Note that sessions are closed automatically when the corresponding top-level flow terminates. + * So, it's beneficial to eagerly close them in long-lived flows that might have many open sessions that are not needed anymore and consume resources (e.g. memory, disk etc.). + * A closed session cannot be used anymore, e.g. to send or receive messages. So, you have to ensure you are calling this method only when the session is not going to be used anymore. + * As a result, any operations on a closed session will fail with an [UnexpectedFlowEndException]. + * When a session is closed, the other side is informed and the session is closed there too eventually. + * To prevent misuse of the API, if there is an attempt to close an uninitialised session the invocation will fail with an [IllegalStateException]. + */ + @Suspendable + abstract fun close() } /** diff --git a/core/src/main/kotlin/net/corda/core/internal/CordaUtils.kt b/core/src/main/kotlin/net/corda/core/internal/CordaUtils.kt index cc99f3a6a2..af2a20b40c 100644 --- a/core/src/main/kotlin/net/corda/core/internal/CordaUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/CordaUtils.kt @@ -28,7 +28,7 @@ import java.util.jar.JarInputStream // *Internal* Corda-specific utilities. -const val PLATFORM_VERSION = 7 +const val PLATFORM_VERSION = 8 fun ServicesForResolution.ensureMinimumPlatformVersion(requiredMinPlatformVersion: Int, feature: String) { checkMinimumPlatformVersion(networkParameters.minimumPlatformVersion, requiredMinPlatformVersion, feature) diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt b/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt index 0d54a4715a..7ced0d46a0 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt @@ -55,6 +55,13 @@ sealed class FlowIORequest { }}, shouldRetrySend=$shouldRetrySend)" } + /** + * Closes the specified sessions. + * + * @property sessions the sessions to be closed. + */ + data class CloseSessions(val sessions: NonEmptySet): FlowIORequest() + /** * Wait for a transaction to be committed to the database. * diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowSessionCloseTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowSessionCloseTest.kt new file mode 100644 index 0000000000..a7e0cf877e --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowSessionCloseTest.kt @@ -0,0 +1,273 @@ +package net.corda.node.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.client.rpc.CordaRPCClient +import net.corda.core.CordaRuntimeException +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.flows.UnexpectedFlowEndException +import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.transpose +import net.corda.core.messaging.startFlow +import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.core.utilities.toNonEmptySet +import net.corda.core.utilities.unwrap +import net.corda.node.services.Permissions +import net.corda.node.services.statemachine.transitions.PrematureSessionCloseException +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.node.User +import net.corda.testing.node.internal.enclosedCordapp +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.Test +import java.sql.SQLTransientConnectionException +import kotlin.test.assertEquals + +class FlowSessionCloseTest { + + private val user = User("user", "pwd", setOf(Permissions.all())) + + @Test(timeout=300_000) + fun `flow cannot close uninitialised session`() { + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) { + val (nodeAHandle, nodeBHandle) = listOf( + startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)), + startNode(providedName = BOB_NAME, rpcUsers = listOf(user)) + ).transpose().getOrThrow() + + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + assertThatThrownBy { it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), true, null, false).returnValue.getOrThrow() } + .isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining(PrematureSessionCloseException::class.java.name) + .hasMessageContaining("The following session was closed before it was initialised") + } + } + } + + @Test(timeout=300_000) + fun `flow cannot access closed session`() { + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) { + val (nodeAHandle, nodeBHandle) = listOf( + startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)), + startNode(providedName = BOB_NAME, rpcUsers = listOf(user)) + ).transpose().getOrThrow() + + InitiatorFlow.SessionAPI.values().forEach { sessionAPI -> + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + assertThatThrownBy { it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, sessionAPI, false).returnValue.getOrThrow() } + .isInstanceOf(UnexpectedFlowEndException::class.java) + .hasMessageContaining("Tried to access ended session") + } + } + + } + } + + @Test(timeout=300_000) + fun `flow can close initialised session successfully`() { + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) { + val (nodeAHandle, nodeBHandle) = listOf( + startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)), + startNode(providedName = BOB_NAME, rpcUsers = listOf(user)) + ).transpose().getOrThrow() + + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, null, false).returnValue.getOrThrow() + } + } + } + + @Test(timeout=300_000) + fun `flow can close initialised session successfully even in case of failures and replays`() { + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) { + val (nodeAHandle, nodeBHandle) = listOf( + startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)), + startNode(providedName = BOB_NAME, rpcUsers = listOf(user)) + ).transpose().getOrThrow() + + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, null, true).returnValue.getOrThrow() + } + } + } + + @Test(timeout=300_000) + fun `flow can close multiple sessions successfully`() { + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) { + val (nodeAHandle, nodeBHandle) = listOf( + startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)), + startNode(providedName = BOB_NAME, rpcUsers = listOf(user)) + ).transpose().getOrThrow() + + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + it.proxy.startFlow(::InitiatorMultipleSessionsFlow, nodeBHandle.nodeInfo.legalIdentities.first()).returnValue.getOrThrow() + } + } + } + + /** + * This test ensures that when sessions are closed, the associated resources are eagerly cleaned up. + * If sessions are not closed, then the node will crash with an out-of-memory error. + * This can be confirmed by commenting out [FlowSession.close] operation in the invoked flow and re-run the test. + */ + @Test(timeout=300_000) + fun `flow looping over sessions can close them to release resources and avoid out-of-memory failures, when the other side does not finish early`() { + driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) { + val (nodeAHandle, nodeBHandle) = listOf( + startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), maximumHeapSize = "256m"), + startNode(providedName = BOB_NAME, rpcUsers = listOf(user), maximumHeapSize = "256m") + ).transpose().getOrThrow() + + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + it.proxy.startFlow(::InitiatorLoopingFlow, nodeBHandle.nodeInfo.legalIdentities.first(), true).returnValue.getOrThrow() + } + } + } + + @Test(timeout=300_000) + fun `flow looping over sessions will close sessions automatically, when the other side finishes early`() { + driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) { + val (nodeAHandle, nodeBHandle) = listOf( + startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), maximumHeapSize = "256m"), + startNode(providedName = BOB_NAME, rpcUsers = listOf(user), maximumHeapSize = "256m") + ).transpose().getOrThrow() + + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + it.proxy.startFlow(::InitiatorLoopingFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false).returnValue.getOrThrow() + } + } + } + + + + @InitiatingFlow + @StartableByRPC + class InitiatorFlow(val party: Party, private val prematureClose: Boolean = false, + private val accessClosedSessionWithApi: SessionAPI? = null, + private val retryClose: Boolean = false): FlowLogic() { + + @CordaSerializable + enum class SessionAPI { + SEND, + SEND_AND_RECEIVE, + RECEIVE, + GET_FLOW_INFO + } + + @Suspendable + override fun call() { + val session = initiateFlow(party) + + if (prematureClose) { + session.close() + } + + session.send(retryClose) + sleep(1.seconds) + + if (accessClosedSessionWithApi != null) { + when(accessClosedSessionWithApi) { + SessionAPI.SEND -> session.send("dummy payload ") + SessionAPI.RECEIVE -> session.receive() + SessionAPI.SEND_AND_RECEIVE -> session.sendAndReceive("dummy payload") + SessionAPI.GET_FLOW_INFO -> session.getCounterpartyFlowInfo() + } + } + } + } + + @InitiatedBy(InitiatorFlow::class) + class InitiatedFlow(private val otherSideSession: FlowSession): FlowLogic() { + + companion object { + var thrown = false + } + + @Suspendable + override fun call() { + val retryClose = otherSideSession.receive() + .unwrap{ it } + + otherSideSession.close() + + // failing with a transient exception to force a replay of the close. + if (retryClose) { + if (!thrown) { + thrown = true + throw SQLTransientConnectionException("Connection is not available") + } + } + } + } + + @InitiatingFlow + @StartableByRPC + class InitiatorLoopingFlow(val party: Party, val blockingCounterparty: Boolean = false): FlowLogic() { + @Suspendable + override fun call() { + for (i in 1..1_000) { + val session = initiateFlow(party) + session.sendAndReceive(blockingCounterparty ).unwrap{ assertEquals("Got it", it) } + + /** + * If the counterparty blocks, we need to eagerly close the session and release resources to avoid running out of memory. + * Otherwise, the session end messages from the other side will do that automatically. + */ + if (blockingCounterparty) { + session.close() + } + + logger.info("Completed iteration $i") + } + } + } + + @InitiatedBy(InitiatorLoopingFlow::class) + class InitiatedLoopingFlow(private val otherSideSession: FlowSession): FlowLogic() { + @Suspendable + override fun call() { + val shouldBlock = otherSideSession.receive() + .unwrap{ it } + otherSideSession.send("Got it") + + if (shouldBlock) { + otherSideSession.receive() + } + } + } + + @InitiatingFlow + @StartableByRPC + class InitiatorMultipleSessionsFlow(val party: Party): FlowLogic() { + @Suspendable + override fun call() { + for (round in 1 .. 2) { + val sessions = mutableListOf() + for (session_number in 1 .. 5) { + val session = initiateFlow(party) + sessions.add(session) + session.sendAndReceive("What's up?").unwrap{ assertEquals("All good!", it) } + } + close(sessions.toNonEmptySet()) + } + } + } + + @InitiatedBy(InitiatorMultipleSessionsFlow::class) + class InitiatedMultipleSessionsFlow(private val otherSideSession: FlowSession): FlowLogic() { + @Suspendable + override fun call() { + otherSideSession.receive() + .unwrap{ assertEquals("What's up?", it) } + otherSideSession.send("All good!") + } + } + +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt index 2d57f8947e..412ccd72a6 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt @@ -60,13 +60,11 @@ import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver.Companion.repo import net.corda.node.internal.NodeStartup import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.statemachine.Checkpoint -import net.corda.node.services.statemachine.DataSessionMessage import net.corda.node.services.statemachine.ErrorState -import net.corda.node.services.statemachine.FlowError +import net.corda.node.services.statemachine.ExistingSessionMessagePayload import net.corda.node.services.statemachine.FlowSessionImpl import net.corda.node.services.statemachine.FlowState import net.corda.node.services.statemachine.FlowStateMachineImpl -import net.corda.node.services.statemachine.InitiatedSessionState import net.corda.node.services.statemachine.SessionId import net.corda.node.services.statemachine.SessionState import net.corda.node.services.statemachine.SubFlow @@ -325,6 +323,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri val send: List? = null, val receive: NonEmptySet? = null, val sendAndReceive: List? = null, + val closeSessions: NonEmptySet? = null, val waitForLedgerCommit: SecureHash? = null, val waitForStateConsumption: Set? = null, val getFlowInfo: NonEmptySet? = null, @@ -352,6 +351,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri is FlowIORequest.Send -> SuspendedOn(send = sessionToMessage.toJson()) is FlowIORequest.Receive -> SuspendedOn(receive = sessions) is FlowIORequest.SendAndReceive -> SuspendedOn(sendAndReceive = sessionToMessage.toJson()) + is FlowIORequest.CloseSessions -> SuspendedOn(closeSessions = sessions) is FlowIORequest.WaitForLedgerCommit -> SuspendedOn(waitForLedgerCommit = hash) is FlowIORequest.GetFlowInfo -> SuspendedOn(getFlowInfo = sessions) is FlowIORequest.Sleep -> SuspendedOn(sleepTill = wakeUpAfter) @@ -379,16 +379,14 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri private class ActiveSession( val peer: Party, val ourSessionId: SessionId, - val receivedMessages: List, - val errors: List, + val receivedMessages: List, val peerFlowInfo: FlowInfo, val peerSessionId: SessionId? ) private fun SessionState.toActiveSession(sessionId: SessionId): ActiveSession? { return if (this is SessionState.Initiated) { - val peerSessionId = (initiatedState as? InitiatedSessionState.Live)?.peerSinkSessionId - ActiveSession(peerParty, sessionId, receivedMessages, errors, peerFlowInfo, peerSessionId) + ActiveSession(peerParty, sessionId, receivedMessages, peerFlowInfo, peerSinkSessionId) } else { null } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 435ae5d6f3..7f31d0e743 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -130,13 +130,9 @@ internal class ActionExecutorImpl( log.warn("Propagating error", exception) } for (sessionState in action.sessions) { - // We cannot propagate if the session isn't live. - if (sessionState.initiatedState !is InitiatedSessionState.Live) { - continue - } // Don't propagate errors to the originating session for (errorMessage in action.errorMessages) { - val sinkSessionId = sessionState.initiatedState.peerSinkSessionId + val sinkSessionId = sessionState.peerSinkSessionId val existingMessage = ExistingSessionMessage(sinkSessionId, errorMessage) val deduplicationId = DeduplicationId.createForError(errorMessage.errorId, sinkSessionId) flowMessaging.sendSessionMessage(sessionState.peerParty, existingMessage, SenderDeduplicationId(deduplicationId, action.senderUUID)) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt index 9f80005880..5fba6ae8b7 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt @@ -78,6 +78,7 @@ internal class FlowMonitor( is FlowIORequest.Send -> "to send a message to parties ${request.sessionToMessage.keys.partiesInvolved()}" is FlowIORequest.Receive -> "to receive messages from parties ${request.sessions.partiesInvolved()}" is FlowIORequest.SendAndReceive -> "to send and receive messages from parties ${request.sessionToMessage.keys.partiesInvolved()}" + is FlowIORequest.CloseSessions -> "to close sessions: ${request.sessions}" is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}" is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}" is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}" diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt index 0dc2e53b23..7d02a23c99 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt @@ -81,6 +81,12 @@ class FlowSessionImpl( @Suspendable override fun send(payload: Any) = send(payload, maySkipCheckpoint = false) + @Suspendable + override fun close() { + val request = FlowIORequest.CloseSessions(NonEmptySet.of(this)) + return flowStateMachine.suspend(request, false) + } + private fun enforceNotPrimitive(type: Class<*>) { require(!type.isPrimitive) { "Cannot receive primitive type $type" } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 58a072fc99..e652d26145 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -106,6 +106,7 @@ data class Checkpoint( invocationContext, ourIdentity, emptyMap(), + emptySet(), listOf(topLevelSubFlow), numberOfSuspends = 0 ), @@ -132,6 +133,22 @@ data class Checkpoint( return copy(checkpointState = checkpointState.copy(sessions = checkpointState.sessions + session)) } + fun addSessionsToBeClosed(sessionIds: Set): Checkpoint { + return copy(checkpointState = checkpointState.copy(sessionsToBeClosed = checkpointState.sessionsToBeClosed + sessionIds)) + } + + fun removeSessionsToBeClosed(sessionIds: Set): Checkpoint { + return copy(checkpointState = checkpointState.copy(sessionsToBeClosed = checkpointState.sessionsToBeClosed - sessionIds)) + } + + /** + * Returns a copy of the Checkpoint with the specified session removed from the session map. + * @param sessionIds the sessions to remove. + */ + fun removeSessions(sessionIds: Set): Checkpoint { + return copy(checkpointState = checkpointState.copy(sessions = checkpointState.sessions - sessionIds)) + } + /** * Returns a copy of the Checkpoint with a new subFlow stack. * @param subFlows the new List of subFlows. @@ -193,16 +210,18 @@ data class Checkpoint( * @param invocationContext the initiator of the flow. * @param ourIdentity the identity the flow is run as. * @param sessions map of source session ID to session state. + * @param sessionsToBeClosed the sessions that have pending session end messages and need to be closed. This is available to avoid scanning all the sessions. * @param subFlowStack the stack of currently executing subflows. * @param numberOfSuspends the number of flow suspends due to IO API calls. */ @CordaSerializable data class CheckpointState( - val invocationContext: InvocationContext, - val ourIdentity: Party, - val sessions: SessionMap, // This must preserve the insertion order! - val subFlowStack: List, - val numberOfSuspends: Int + val invocationContext: InvocationContext, + val ourIdentity: Party, + val sessions: SessionMap, // This must preserve the insertion order! + val sessionsToBeClosed: Set, + val subFlowStack: List, + val numberOfSuspends: Int ) /** @@ -236,30 +255,25 @@ sealed class SessionState { /** * We have received a confirmation, the peer party and session id is resolved. - * @property errors if not empty the session is in an errored state. + * @property receivedMessages the messages that have been received and are pending processing. + * this could be any [ExistingSessionMessagePayload] type in theory, but it in practice it can only be one of the following types now: + * * [DataSessionMessage] + * * [ErrorSessionMessage] + * * [EndSessionMessage] + * @property otherSideErrored whether the session has received an error from the other side. */ data class Initiated( val peerParty: Party, val peerFlowInfo: FlowInfo, - val receivedMessages: List, - val initiatedState: InitiatedSessionState, - val errors: List, + val receivedMessages: List, + val otherSideErrored: Boolean, + val peerSinkSessionId: SessionId, override val deduplicationSeed: String ) : SessionState() } typealias SessionMap = Map -/** - * Tracks whether an initiated session state is live or has ended. This is a separate state, as we still need the rest - * of [SessionState.Initiated], even when the session has ended, for un-drained session messages and potential future - * [FlowInfo] requests. - */ -sealed class InitiatedSessionState { - data class Live(val peerSinkSessionId: SessionId) : InitiatedSessionState() - object Ended : InitiatedSessionState() { override fun toString() = "Ended" } -} - /** * Represents the way the flow has started. */ diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt index 0aa58241eb..5719139095 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt @@ -1,9 +1,8 @@ package net.corda.node.services.statemachine.transitions -import net.corda.core.flows.FlowException import net.corda.core.flows.UnexpectedFlowEndException -import net.corda.core.identity.Party -import net.corda.core.internal.DeclaredField +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.node.services.statemachine.Action import net.corda.node.services.statemachine.ConfirmSessionMessage import net.corda.node.services.statemachine.DataSessionMessage @@ -12,7 +11,7 @@ import net.corda.node.services.statemachine.ErrorSessionMessage import net.corda.node.services.statemachine.Event import net.corda.node.services.statemachine.ExistingSessionMessage import net.corda.node.services.statemachine.FlowError -import net.corda.node.services.statemachine.InitiatedSessionState +import net.corda.node.services.statemachine.FlowState import net.corda.node.services.statemachine.RejectSessionMessage import net.corda.node.services.statemachine.SenderDeduplicationId import net.corda.node.services.statemachine.SessionState @@ -37,6 +36,11 @@ class DeliverSessionMessageTransition( override val startingState: StateMachineState, val event: Event.DeliverSessionMessage ) : Transition { + + private companion object { + val log = contextLogger() + } + override fun transition(): TransitionResult { return builder { // Add the DeduplicationHandler to the pending ones ASAP so in case an error happens we still know @@ -49,7 +53,7 @@ class DeliverSessionMessageTransition( // Check whether we have a session corresponding to the message. val existingSession = startingState.checkpoint.checkpointState.sessions[event.sessionMessage.recipientSessionId] if (existingSession == null) { - freshErrorTransition(CannotFindSessionException(event.sessionMessage.recipientSessionId)) + checkIfMissingSessionIsAnIssue(event.sessionMessage) } else { val payload = event.sessionMessage.payload // Dispatch based on what kind of message it is. @@ -58,7 +62,7 @@ class DeliverSessionMessageTransition( is DataSessionMessage -> dataMessageTransition(existingSession, payload) is ErrorSessionMessage -> errorMessageTransition(existingSession, payload) is RejectSessionMessage -> rejectMessageTransition(existingSession, payload) - is EndSessionMessage -> endMessageTransition() + is EndSessionMessage -> endMessageTransition(payload) } } // Schedule a DoRemainingWork to check whether the flow needs to be woken up. @@ -67,6 +71,14 @@ class DeliverSessionMessageTransition( } } + private fun TransitionBuilder.checkIfMissingSessionIsAnIssue(message: ExistingSessionMessage) { + val payload = message.payload + if (payload is EndSessionMessage) + log.debug { "Received session end message for a session that has already ended: ${event.sessionMessage.recipientSessionId}"} + else + freshErrorTransition(CannotFindSessionException(event.sessionMessage.recipientSessionId)) + } + private fun TransitionBuilder.confirmMessageTransition(sessionState: SessionState, message: ConfirmSessionMessage) { // We received a confirmation message. The corresponding session state must be Initiating. when (sessionState) { @@ -76,9 +88,9 @@ class DeliverSessionMessageTransition( peerParty = event.sender, peerFlowInfo = message.initiatedFlowInfo, receivedMessages = emptyList(), - initiatedState = InitiatedSessionState.Live(message.initiatedSessionId), - errors = emptyList(), - deduplicationSeed = sessionState.deduplicationSeed + peerSinkSessionId = message.initiatedSessionId, + deduplicationSeed = sessionState.deduplicationSeed, + otherSideErrored = false ) val newCheckpoint = currentState.checkpoint.addSession( event.sessionMessage.recipientSessionId to initiatedSession @@ -115,28 +127,11 @@ class DeliverSessionMessageTransition( } private fun TransitionBuilder.errorMessageTransition(sessionState: SessionState, payload: ErrorSessionMessage) { - val exception: Throwable = if (payload.flowException == null) { - UnexpectedFlowEndException("Counter-flow errored", cause = null, originalErrorId = payload.errorId) - } else { - payload.flowException.originalErrorId = payload.errorId - payload.flowException - } - return when (sessionState) { is SessionState.Initiated -> { - when (exception) { - // reflection used to access private field - is UnexpectedFlowEndException -> DeclaredField( - UnexpectedFlowEndException::class.java, - "peer", - exception - ).value = sessionState.peerParty - is FlowException -> DeclaredField(FlowException::class.java, "peer", exception).value = sessionState.peerParty - } val checkpoint = currentState.checkpoint val sessionId = event.sessionMessage.recipientSessionId - val flowError = FlowError(payload.errorId, exception) - val newSessionState = sessionState.copy(errors = sessionState.errors + flowError) + val newSessionState = sessionState.copy(receivedMessages = sessionState.receivedMessages + payload) currentState = currentState.copy( checkpoint = checkpoint.addSession(sessionId to newSessionState) ) @@ -165,23 +160,26 @@ class DeliverSessionMessageTransition( } } - private fun TransitionBuilder.endMessageTransition() { + private fun TransitionBuilder.endMessageTransition(payload: EndSessionMessage) { + val sessionId = event.sessionMessage.recipientSessionId val sessions = currentState.checkpoint.checkpointState.sessions - val sessionState = sessions[sessionId] - if (sessionState == null) { - return freshErrorTransition(CannotFindSessionException(sessionId)) - } + // a check has already been performed to confirm the session exists for this message before this method is invoked. + val sessionState = sessions[sessionId]!! when (sessionState) { is SessionState.Initiated -> { - val newSessionState = sessionState.copy(initiatedState = InitiatedSessionState.Ended) - currentState = currentState.copy( - checkpoint = currentState.checkpoint.addSession(sessionId to newSessionState) + val flowState = currentState.checkpoint.flowState + // flow must have already been started when session end messages are being delivered. + if (flowState !is FlowState.Started) + return freshErrorTransition(UnexpectedEventInState()) - ) + val newSessionState = sessionState.copy(receivedMessages = sessionState.receivedMessages + payload) + val newCheckpoint = currentState.checkpoint.addSession(event.sessionMessage.recipientSessionId to newSessionState) + .addSessionsToBeClosed(setOf(event.sessionMessage.recipientSessionId)) + currentState = currentState.copy(checkpoint = newCheckpoint) } else -> { - freshErrorTransition(UnexpectedEventInState()) + freshErrorTransition(PrematureSessionEndException(event.sessionMessage.recipientSessionId)) } } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt index 551807fcdf..ba5ecaa6bd 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt @@ -117,8 +117,9 @@ class ErrorFlowTransition( sessionState } } + // if we have already received error message from the other side, we don't include that session in the list to avoid propagating errors. val initiatedSessions = sessions.values.mapNotNull { session -> - if (session is SessionState.Initiated && session.errors.isEmpty()) { + if (session is SessionState.Initiated && !session.otherSideErrored) { session } else { null diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt index 5c7b095e80..9c44f5988c 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt @@ -105,8 +105,9 @@ class KilledFlowTransition( sessionState } } + // if we have already received error message from the other side, we don't include that session in the list to avoid propagating errors. val initiatedSessions = sessions.values.mapNotNull { session -> - if (session is SessionState.Initiated && session.errors.isEmpty()) { + if (session is SessionState.Initiated && !session.otherSideErrored) { session } else { null diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 96b6557829..cea423134f 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -1,13 +1,18 @@ package net.corda.node.services.statemachine.transitions +import net.corda.core.flows.FlowException import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowSession import net.corda.core.flows.UnexpectedFlowEndException +import net.corda.core.identity.Party +import net.corda.core.internal.DeclaredField import net.corda.core.internal.FlowIORequest import net.corda.core.serialization.SerializedBytes +import net.corda.core.utilities.contextLogger import net.corda.core.utilities.toNonEmptySet import net.corda.node.services.statemachine.* -import java.lang.IllegalStateException +import org.slf4j.Logger +import kotlin.collections.LinkedHashMap /** * This transition describes what should happen with a specific [FlowIORequest]. Note that at this time the request @@ -20,28 +25,62 @@ class StartedFlowTransition( override val startingState: StateMachineState, val started: FlowState.Started ) : Transition { + + companion object { + private val logger: Logger = contextLogger() + } + override fun transition(): TransitionResult { val flowIORequest = started.flowIORequest - val checkpoint = startingState.checkpoint - val errorsToThrow = collectRelevantErrorsToThrow(flowIORequest, checkpoint) + val (newState, errorsToThrow) = collectRelevantErrorsToThrow(startingState, flowIORequest) if (errorsToThrow.isNotEmpty()) { return TransitionResult( - newState = startingState.copy(isFlowResumed = true), + newState = newState.copy(isFlowResumed = true), // throw the first exception. TODO should this aggregate all of them somehow? actions = listOf(Action.CreateTransaction), continuation = FlowContinuation.Throw(errorsToThrow[0]) ) } - return when (flowIORequest) { - is FlowIORequest.Send -> sendTransition(flowIORequest) - is FlowIORequest.Receive -> receiveTransition(flowIORequest) - is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest) - is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest) - is FlowIORequest.Sleep -> sleepTransition(flowIORequest) - is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest) - is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition() - is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest) - FlowIORequest.ForceCheckpoint -> executeForceCheckpoint() + val sessionsToBeTerminated = findSessionsToBeTerminated(startingState) + // if there are sessions to be closed, we close them as part of this transition and normal processing will continue on the next transition. + return if (sessionsToBeTerminated.isNotEmpty()) { + terminateSessions(sessionsToBeTerminated) + } else { + when (flowIORequest) { + is FlowIORequest.Send -> sendTransition(flowIORequest) + is FlowIORequest.Receive -> receiveTransition(flowIORequest) + is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest) + is FlowIORequest.CloseSessions -> closeSessionTransition(flowIORequest) + is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest) + is FlowIORequest.Sleep -> sleepTransition(flowIORequest) + is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest) + is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition() + is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest) + FlowIORequest.ForceCheckpoint -> executeForceCheckpoint() + } + } + } + + private fun findSessionsToBeTerminated(startingState: StateMachineState): SessionMap { + return startingState.checkpoint.checkpointState.sessionsToBeClosed.mapNotNull { sessionId -> + val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!! as SessionState.Initiated + if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is EndSessionMessage) { + sessionId to sessionState + } else { + null + } + }.toMap() + } + + private fun terminateSessions(sessionsToBeTerminated: SessionMap): TransitionResult { + return builder { + val sessionsToRemove = sessionsToBeTerminated.keys + val newCheckpoint = currentState.checkpoint.removeSessions(sessionsToRemove) + .removeSessionsToBeClosed(sessionsToRemove) + currentState = currentState.copy(checkpoint = newCheckpoint) + actions.add(Action.RemoveSessionBindings(sessionsToRemove)) + actions.add(Action.ScheduleEvent(Event.DoRemainingWork)) + FlowContinuation.ProcessEvents } } @@ -149,6 +188,34 @@ class StartedFlowTransition( } } + private fun closeSessionTransition(flowIORequest: FlowIORequest.CloseSessions): TransitionResult { + return builder { + val sessionIdsToRemove = flowIORequest.sessions.map { sessionToSessionId(it) }.toSet() + val existingSessionsToRemove = currentState.checkpoint.checkpointState.sessions.filter { (sessionId, _) -> + sessionIdsToRemove.contains(sessionId) + } + val alreadyClosedSessions = sessionIdsToRemove.filter { sessionId -> sessionId !in existingSessionsToRemove } + if (alreadyClosedSessions.isNotEmpty()) { + logger.warn("Attempting to close already closed sessions: $alreadyClosedSessions") + } + + if (existingSessionsToRemove.isNotEmpty()) { + val sendEndMessageActions = existingSessionsToRemove.values.mapIndexed { index, state -> + val sinkSessionId = (state as SessionState.Initiated).peerSinkSessionId + val message = ExistingSessionMessage(sinkSessionId, EndSessionMessage) + val deduplicationId = DeduplicationId.createForNormal(currentState.checkpoint, index, state) + Action.SendExisting(state.peerParty, message, SenderDeduplicationId(deduplicationId, currentState.senderUUID)) + } + + currentState = currentState.copy(checkpoint = currentState.checkpoint.removeSessions(existingSessionsToRemove.keys)) + actions.add(Action.RemoveSessionBindings(sessionIdsToRemove)) + actions.add(Action.SendMultiple(emptyList(), sendEndMessageActions)) + } + + resumeFlowLogic(Unit) + } + } + private fun receiveTransition(flowIORequest: FlowIORequest.Receive): TransitionResult { return builder { val sessionIdToSession = LinkedHashMap() @@ -199,7 +266,8 @@ class StartedFlowTransition( someNotFound = true } else { newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toList()) - resultMessages[sessionId] = messages[0].payload + // at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message. + resultMessages[sessionId] = (messages[0] as DataSessionMessage).payload } } else -> { @@ -257,12 +325,6 @@ class StartedFlowTransition( val checkpoint = startingState.checkpoint val newSessions = LinkedHashMap(checkpoint.checkpointState.sessions) var index = 0 - for ((sourceSessionId, _) in sourceSessionIdToMessage) { - val existingSessionState = checkpoint.checkpointState.sessions[sourceSessionId] ?: return freshErrorTransition(CannotFindSessionException(sourceSessionId)) - if (existingSessionState is SessionState.Initiated && existingSessionState.initiatedState is InitiatedSessionState.Ended) { - return freshErrorTransition(IllegalStateException("Tried to send to ended session $sourceSessionId")) - } - } val messagesByType = sourceSessionIdToMessage.toList() .map { (sourceSessionId, message) -> Triple(sourceSessionId, checkpoint.checkpointState.sessions[sourceSessionId]!!, message) } @@ -286,17 +348,13 @@ class StartedFlowTransition( val newBufferedMessages = initiatingSessionState.bufferedMessages + Pair(deduplicationId, sessionMessage) newSessions[sourceSessionId] = initiatingSessionState.copy(bufferedMessages = newBufferedMessages) } - val sendExistingActions = messagesByType[SessionState.Initiated::class]?.mapNotNull {(_, sessionState, message) -> + val sendExistingActions = messagesByType[SessionState.Initiated::class]?.map {(_, sessionState, message) -> val initiatedSessionState = sessionState as SessionState.Initiated - if (initiatedSessionState.initiatedState !is InitiatedSessionState.Live) - null - else { - val sessionMessage = DataSessionMessage(message) - val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, initiatedSessionState) - val sinkSessionId = initiatedSessionState.initiatedState.peerSinkSessionId - val existingMessage = ExistingSessionMessage(sinkSessionId, sessionMessage) - Action.SendExisting(initiatedSessionState.peerParty, existingMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)) - } + val sessionMessage = DataSessionMessage(message) + val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, initiatedSessionState) + val sinkSessionId = initiatedSessionState.peerSinkSessionId + val existingMessage = ExistingSessionMessage(sinkSessionId, sessionMessage) + Action.SendExisting(initiatedSessionState.peerParty, existingMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)) } ?: emptyList() if (sendInitialActions.isNotEmpty() || sendExistingActions.isNotEmpty()) { @@ -309,21 +367,68 @@ class StartedFlowTransition( return (session as FlowSessionImpl).sourceSessionId } - private fun collectErroredSessionErrors(sessionIds: Collection, checkpoint: Checkpoint): List { - return sessionIds.flatMap { sessionId -> - val sessionState = checkpoint.checkpointState.sessions[sessionId]!! - when (sessionState) { - is SessionState.Uninitiated -> emptyList() - is SessionState.Initiating -> { - if (sessionState.rejectionError == null) { - emptyList() - } else { - listOf(sessionState.rejectionError.exception) + private fun collectErroredSessionErrors(startingState: StateMachineState, sessionIds: Collection): Pair> { + var newState = startingState + val errors = sessionIds.filter { sessionId -> + startingState.checkpoint.checkpointState.sessions.containsKey(sessionId) + }.flatMap { sessionId -> + val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!! + when (sessionState) { + is SessionState.Uninitiated -> emptyList() + is SessionState.Initiating -> { + if (sessionState.rejectionError == null) { + emptyList() + } else { + listOf(sessionState.rejectionError.exception) + } + } + is SessionState.Initiated -> { + if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is ErrorSessionMessage) { + val errorMessage = sessionState.receivedMessages.first() as ErrorSessionMessage + val exception = convertErrorMessageToException(errorMessage, sessionState.peerParty) + val newSessionState = sessionState.copy(receivedMessages = sessionState.receivedMessages.subList(1, sessionState.receivedMessages.size), otherSideErrored = true) + val newCheckpoint = startingState.checkpoint.addSession(sessionId to newSessionState) + newState = startingState.copy(checkpoint = newCheckpoint) + listOf(exception) + } else { + emptyList() + } + } } } - is SessionState.Initiated -> sessionState.errors.map(FlowError::exception) - } + return Pair(newState, errors) + } + + private fun convertErrorMessageToException(errorMessage: ErrorSessionMessage, peer: Party): Throwable { + val exception: Throwable = if (errorMessage.flowException == null) { + UnexpectedFlowEndException("Counter-flow errored", cause = null, originalErrorId = errorMessage.errorId) + } else { + errorMessage.flowException.originalErrorId = errorMessage.errorId + errorMessage.flowException } + when (exception) { + // reflection used to access private field + is UnexpectedFlowEndException -> DeclaredField( + UnexpectedFlowEndException::class.java, + "peer", + exception + ).value = peer + is FlowException -> DeclaredField(FlowException::class.java, "peer", exception).value = peer + } + return exception + } + + private fun collectUncloseableSessions(sessionIds: Collection, checkpoint: Checkpoint): List { + val uninitialisedSessions = sessionIds.mapNotNull { sessionId -> + if (!checkpoint.checkpointState.sessions.containsKey(sessionId)) + null + else + sessionId to checkpoint.checkpointState.sessions[sessionId] + } + .filter { (_, sessionState) -> sessionState !is SessionState.Initiated } + .map { it.first } + + return uninitialisedSessions.map { PrematureSessionCloseException(it) } } private fun collectErroredInitiatingSessionErrors(checkpoint: Checkpoint): List { @@ -333,77 +438,64 @@ class StartedFlowTransition( } private fun collectEndedSessionErrors(sessionIds: Collection, checkpoint: Checkpoint): List { - return sessionIds.mapNotNull { sessionId -> - val sessionState = checkpoint.checkpointState.sessions[sessionId]!! - when (sessionState) { - is SessionState.Initiated -> { - if (sessionState.initiatedState === InitiatedSessionState.Ended) { - UnexpectedFlowEndException( - "Tried to access ended session $sessionId", - cause = null, - originalErrorId = context.secureRandom.nextLong() - ) - } else { - null - } - } - else -> null - } + return sessionIds.filter { sessionId -> + !checkpoint.checkpointState.sessions.containsKey(sessionId) + }.map {sessionId -> + UnexpectedFlowEndException( + "Tried to access ended session $sessionId", + cause = null, + originalErrorId = context.secureRandom.nextLong() + ) } } - private fun collectEndedEmptySessionErrors(sessionIds: Collection, checkpoint: Checkpoint): List { - return sessionIds.mapNotNull { sessionId -> - val sessionState = checkpoint.checkpointState.sessions[sessionId]!! - when (sessionState) { - is SessionState.Initiated -> { - if (sessionState.initiatedState === InitiatedSessionState.Ended && - sessionState.receivedMessages.isEmpty()) { - UnexpectedFlowEndException( - "Tried to access ended session $sessionId with empty buffer", - cause = null, - originalErrorId = context.secureRandom.nextLong() - ) - } else { - null - } - } - else -> null - } - } - } - - private fun collectRelevantErrorsToThrow(flowIORequest: FlowIORequest<*>, checkpoint: Checkpoint): List { + private fun collectRelevantErrorsToThrow(startingState: StateMachineState, flowIORequest: FlowIORequest<*>): Pair> { return when (flowIORequest) { is FlowIORequest.Send -> { val sessionIds = flowIORequest.sessionToMessage.keys.map(this::sessionToSessionId) - collectErroredSessionErrors(sessionIds, checkpoint) + collectEndedSessionErrors(sessionIds, checkpoint) + val (newState, erroredSessionErrors) = collectErroredSessionErrors(startingState, sessionIds) + val endedSessionErrors = collectEndedSessionErrors(sessionIds, startingState.checkpoint) + Pair(newState, erroredSessionErrors + endedSessionErrors) } is FlowIORequest.Receive -> { val sessionIds = flowIORequest.sessions.map(this::sessionToSessionId) - collectErroredSessionErrors(sessionIds, checkpoint) + collectEndedEmptySessionErrors(sessionIds, checkpoint) + val (newState, erroredSessionErrors) = collectErroredSessionErrors(startingState, sessionIds) + val endedSessionErrors = collectEndedSessionErrors(sessionIds, startingState.checkpoint) + Pair(newState, erroredSessionErrors + endedSessionErrors) } is FlowIORequest.SendAndReceive -> { val sessionIds = flowIORequest.sessionToMessage.keys.map(this::sessionToSessionId) - collectErroredSessionErrors(sessionIds, checkpoint) + collectEndedSessionErrors(sessionIds, checkpoint) + val (newState, erroredSessionErrors) = collectErroredSessionErrors(startingState, sessionIds) + val endedSessionErrors = collectEndedSessionErrors(sessionIds, startingState.checkpoint) + Pair(newState, erroredSessionErrors + endedSessionErrors) } is FlowIORequest.WaitForLedgerCommit -> { - collectErroredSessionErrors(checkpoint.checkpointState.sessions.keys, checkpoint) + return collectErroredSessionErrors(startingState, startingState.checkpoint.checkpointState.sessions.keys) } is FlowIORequest.GetFlowInfo -> { - collectErroredSessionErrors(flowIORequest.sessions.map(this::sessionToSessionId), checkpoint) + val sessionIds = flowIORequest.sessions.map(this::sessionToSessionId) + val (newState, erroredSessionErrors) = collectErroredSessionErrors(startingState, sessionIds) + val endedSessionErrors = collectEndedSessionErrors(sessionIds, startingState.checkpoint) + Pair(newState, erroredSessionErrors + endedSessionErrors) + } + is FlowIORequest.CloseSessions -> { + val sessionIds = flowIORequest.sessions.map(this::sessionToSessionId) + val (newState, erroredSessionErrors) = collectErroredSessionErrors(startingState, sessionIds) + val uncloseableSessionErrors = collectUncloseableSessions(sessionIds, startingState.checkpoint) + Pair(newState, erroredSessionErrors + uncloseableSessionErrors) } is FlowIORequest.Sleep -> { - emptyList() + Pair(startingState, emptyList()) } is FlowIORequest.WaitForSessionConfirmations -> { - collectErroredInitiatingSessionErrors(checkpoint) + val errors = collectErroredInitiatingSessionErrors(startingState.checkpoint) + Pair(startingState, errors) } is FlowIORequest.ExecuteAsyncOperation<*> -> { - emptyList() + Pair(startingState, emptyList()) } FlowIORequest.ForceCheckpoint -> { - emptyList() + Pair(startingState, emptyList()) } } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 1b7d79dfec..4846ee101d 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -18,7 +18,6 @@ import net.corda.node.services.statemachine.FlowRemovalReason import net.corda.node.services.statemachine.FlowSessionImpl import net.corda.node.services.statemachine.FlowState import net.corda.node.services.statemachine.InitialSessionMessage -import net.corda.node.services.statemachine.InitiatedSessionState import net.corda.node.services.statemachine.SenderDeduplicationId import net.corda.node.services.statemachine.SessionId import net.corda.node.services.statemachine.SessionMessage @@ -267,8 +266,8 @@ class TopLevelTransition( private fun TransitionBuilder.sendEndMessages() { val sendEndMessageActions = currentState.checkpoint.checkpointState.sessions.values.mapIndexed { index, state -> - if (state is SessionState.Initiated && state.initiatedState is InitiatedSessionState.Live) { - val message = ExistingSessionMessage(state.initiatedState.peerSinkSessionId, EndSessionMessage) + if (state is SessionState.Initiated) { + val message = ExistingSessionMessage(state.peerSinkSessionId, EndSessionMessage) val deduplicationId = DeduplicationId.createForNormal(currentState.checkpoint, index, state) Action.SendExisting(state.peerParty, message, SenderDeduplicationId(deduplicationId, currentState.senderUUID)) } else { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt index 5e6ca3adbb..dac380b3c2 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt @@ -81,3 +81,5 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi class CannotFindSessionException(sessionId: SessionId) : IllegalStateException("Couldn't find session with id $sessionId") class UnexpectedEventInState : IllegalStateException("Unexpected event") +class PrematureSessionCloseException(sessionId: SessionId): IllegalStateException("The following session was closed before it was initialised: $sessionId") +class PrematureSessionEndException(sessionId: SessionId): IllegalStateException("A premature session end message was received before the session was initialised: $sessionId") \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt index c85830fb03..7361943cde 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt @@ -8,7 +8,6 @@ import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.services.statemachine.ExistingSessionMessage import net.corda.node.services.statemachine.FlowStart import net.corda.node.services.statemachine.FlowState -import net.corda.node.services.statemachine.InitiatedSessionState import net.corda.node.services.statemachine.SenderDeduplicationId import net.corda.node.services.statemachine.SessionState import net.corda.node.services.statemachine.StateMachineState @@ -45,7 +44,7 @@ class UnstartedFlowTransition( val initiatingMessage = flowStart.initiatingMessage val initiatedState = SessionState.Initiated( peerParty = flowStart.peerSession.counterparty, - initiatedState = InitiatedSessionState.Live(initiatingMessage.initiatorSessionId), + peerSinkSessionId = initiatingMessage.initiatorSessionId, peerFlowInfo = FlowInfo( flowVersion = flowStart.senderCoreFlowVersion ?: initiatingMessage.flowVersion, appName = initiatingMessage.appName @@ -55,8 +54,8 @@ class UnstartedFlowTransition( } else { listOf(DataSessionMessage(initiatingMessage.firstPayload)) }, - errors = emptyList(), - deduplicationSeed = "D-${initiatingMessage.initiatorSessionId.toLong}-${initiatingMessage.initiationEntropy}" + deduplicationSeed = "D-${initiatingMessage.initiatorSessionId.toLong}-${initiatingMessage.initiationEntropy}", + otherSideErrored = false ) val confirmationMessage = ConfirmSessionMessage(flowStart.initiatedSessionId, flowStart.initiatedFlowInfo) val sessionMessage = ExistingSessionMessage(initiatingMessage.initiatorSessionId, confirmationMessage) diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index feafb34279..d6eb21aa33 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -201,7 +201,7 @@ class FlowFrameworkTests { } @Test(timeout=300_000) - fun `other side ends before doing expected send`() { + fun `other side ends before doing expected send`() { bobNode.registerCordappFlowFactory(ReceiveFlow::class) { NoOpFlow() } val resultFuture = aliceNode.services.startFlow(ReceiveFlow(bob)).resultFuture mockNet.runNetwork() @@ -868,6 +868,7 @@ class FlowFrameworkTests { session.send(1) // ... then pause this one until it's received the session-end message from the other side receivedOtherFlowEnd.acquire() + session.sendAndReceive(2) } } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt index ddac3afba8..0fae5c91bb 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt @@ -247,7 +247,7 @@ class FlowMetadataRecordingTest { it.initialParameters.deserialize(context = SerializationDefaults.STORAGE_CONTEXT) ) assertThat(it.launchingCordapp).contains("custom-cordapp") - assertEquals(7, it.platformVersion) + assertEquals(8, it.platformVersion) assertEquals(nodeAHandle.nodeInfo.singleIdentity().name.toString(), it.startedBy) assertEquals(context!!.trace.invocationId.timestamp, it.invocationInstant) assertTrue(it.startInstant >= it.invocationInstant) diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index 3f5c249424..ee93d937d2 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -183,6 +183,11 @@ class RetryFlowMockTest { override fun send(payload: Any) { TODO("not implemented") } + + override fun close() { + TODO("Not yet implemented") + } + }), nodeA.services.newContext()).get() records.next() // Killing it should remove it. diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index aea0e9d5d0..9ae6f1f9d2 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -529,7 +529,8 @@ open class InternalMockNetwork(cordappPackages: List = emptyList(), } private fun pumpAll(): Boolean { - val transferredMessages = messagingNetwork.endpoints.map { it.pumpReceive(false) } + val transferredMessages = messagingNetwork.endpoints.filter { it.active } + .map { it.pumpReceive(false) } return transferredMessages.any { it != null } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt index 3b81fbc2ef..a64da72048 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt @@ -173,6 +173,7 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration, it.join() } running = false + stateHelper.active = false network.netNodeHasShutdown(myAddress) } From 50c51d3e6fcb30ae6979cc637ad6a78a8d042842 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Waldemar=20=C5=BBurowski?= <45210402+wzur-r3@users.noreply.github.com> Date: Tue, 21 Jul 2020 15:42:57 +0100 Subject: [PATCH 02/14] Empty JUnit results are not allowed (#6488) --- .ci/dev/nightly-regression/Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci/dev/nightly-regression/Jenkinsfile b/.ci/dev/nightly-regression/Jenkinsfile index 7dd4301440..303bd722a9 100644 --- a/.ci/dev/nightly-regression/Jenkinsfile +++ b/.ci/dev/nightly-regression/Jenkinsfile @@ -56,7 +56,7 @@ pipeline { post { always { archiveArtifacts artifacts: '**/pod-logs/**/*.log', fingerprint: false - junit testResults: '**/build/test-results-xml/**/*.xml', allowEmptyResults: true + junit testResults: '**/build/test-results-xml/**/*.xml' } cleanup { deleteDir() /* clean up our workspace */ From 7437630d56b0e846e7ec3b233f294466c5ab589d Mon Sep 17 00:00:00 2001 From: Waldemar Zurowski Date: Tue, 21 Jul 2020 15:50:21 +0100 Subject: [PATCH 03/14] Empty JUnit results are not allowed --- .ci/dev/mswin/Jenkinsfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.ci/dev/mswin/Jenkinsfile b/.ci/dev/mswin/Jenkinsfile index b0e3766e0c..714fadf4fb 100644 --- a/.ci/dev/mswin/Jenkinsfile +++ b/.ci/dev/mswin/Jenkinsfile @@ -65,7 +65,7 @@ pipeline { post { always { archiveArtifacts allowEmptyArchive: true, artifacts: '**/logs/**/*.log' - junit testResults: '**/build/test-results/**/*.xml', keepLongStdio: true, allowEmptyResults: true + junit testResults: '**/build/test-results/**/*.xml', keepLongStdio: true bat '.ci/kill_corda_procs.cmd' } cleanup { @@ -87,7 +87,7 @@ pipeline { post { always { archiveArtifacts allowEmptyArchive: true, artifacts: '**/logs/**/*.log' - junit testResults: '**/build/test-results/**/*.xml', keepLongStdio: true, allowEmptyResults: true + junit testResults: '**/build/test-results/**/*.xml', keepLongStdio: true bat '.ci/kill_corda_procs.cmd' } cleanup { From 5ee262653007551031a60011dc16b8c01c3fb840 Mon Sep 17 00:00:00 2001 From: Waldemar Zurowski Date: Tue, 21 Jul 2020 15:55:56 +0100 Subject: [PATCH 04/14] Empty JUnit results are not allowed --- .ci/dev/compatibility/JenkinsfileJDK11Azul | 2 +- .ci/dev/regression/Jenkinsfile | 2 +- Jenkinsfile | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.ci/dev/compatibility/JenkinsfileJDK11Azul b/.ci/dev/compatibility/JenkinsfileJDK11Azul index d24a3f7ac4..1dd3b40043 100644 --- a/.ci/dev/compatibility/JenkinsfileJDK11Azul +++ b/.ci/dev/compatibility/JenkinsfileJDK11Azul @@ -153,7 +153,7 @@ pipeline { post { always { archiveArtifacts artifacts: '**/pod-logs/**/*.log', fingerprint: false - junit testResults: '**/build/test-results-xml/**/*.xml', allowEmptyResults: true + junit testResults: '**/build/test-results-xml/**/*.xml' } cleanup { deleteDir() /* clean up our workspace */ diff --git a/.ci/dev/regression/Jenkinsfile b/.ci/dev/regression/Jenkinsfile index 9af46eba66..8de096bccf 100644 --- a/.ci/dev/regression/Jenkinsfile +++ b/.ci/dev/regression/Jenkinsfile @@ -177,7 +177,7 @@ pipeline { post { always { archiveArtifacts artifacts: '**/pod-logs/**/*.log', fingerprint: false - junit testResults: '**/build/test-results-xml/**/*.xml', keepLongStdio: true, allowEmptyResults: true + junit testResults: '**/build/test-results-xml/**/*.xml', keepLongStdio: true script { try { diff --git a/Jenkinsfile b/Jenkinsfile index c27f461148..c96e217de3 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -79,7 +79,7 @@ pipeline { post { always { archiveArtifacts artifacts: '**/pod-logs/**/*.log', fingerprint: false - junit testResults: '**/build/test-results-xml/**/*.xml', keepLongStdio: true, allowEmptyResults: true + junit testResults: '**/build/test-results-xml/**/*.xml', keepLongStdio: true } cleanup { deleteDir() /* clean up our workspace */ From a03fb8c6fd903ce9db88f4f073f1d012d647af13 Mon Sep 17 00:00:00 2001 From: Yiftach Kaplan <67583323+yift-r3@users.noreply.github.com> Date: Tue, 21 Jul 2020 18:10:09 +0100 Subject: [PATCH 05/14] INFRA-438: Close session factory before closing the locator (#6477) --- .../net/corda/node/services/messaging/P2PMessagingClient.kt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 0fcb7a3ca7..b9035a6975 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -121,6 +121,7 @@ class P2PMessagingClient(val config: NodeConfiguration, var bridgeSession: ClientSession? = null var bridgeNotifyConsumer: ClientConsumer? = null var networkChangeSubscription: Subscription? = null + var sessionFactory: ClientSessionFactory? = null fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message) } @@ -172,7 +173,7 @@ class P2PMessagingClient(val config: NodeConfiguration, minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE isUseGlobalPools = nodeSerializationEnv != null } - val sessionFactory = locator!!.createSessionFactory().addFailoverListener(::failoverCallback) + sessionFactory = locator!!.createSessionFactory().addFailoverListener(::failoverCallback) // Login using the node username. The broker will authenticate us as its node (as opposed to another peer) // using our TLS certificate. // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer @@ -490,8 +491,10 @@ class P2PMessagingClient(val config: NodeConfiguration, // Wait for the main loop to notice the consumer has gone and finish up. shutdownLatch.await() } + // Only first caller to gets running true to protect against double stop, which seems to happen in some integration tests. state.locked { + sessionFactory?.close() locator?.close() } } From ca37b9b7372584620cb9cfe6dd4e5dc771369376 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Waldemar=20=C5=BBurowski?= <45210402+wzur-r3@users.noreply.github.com> Date: Wed, 22 Jul 2020 09:26:39 +0100 Subject: [PATCH 06/14] PR Code Checks use `standard` Jenkins agent (#6496) --- .ci/dev/pr-code-checks/Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci/dev/pr-code-checks/Jenkinsfile b/.ci/dev/pr-code-checks/Jenkinsfile index a64813c92f..20cf49f912 100644 --- a/.ci/dev/pr-code-checks/Jenkinsfile +++ b/.ci/dev/pr-code-checks/Jenkinsfile @@ -4,7 +4,7 @@ import static com.r3.build.BuildControl.killAllExistingBuildsForJob killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger()) pipeline { - agent { label 'k8s' } + agent { label 'standard' } options { timestamps() timeout(time: 3, unit: 'HOURS') From 05532c0419dbdfd6b7817028b044bff7184c417f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Waldemar=20=C5=BBurowski?= <45210402+wzur-r3@users.noreply.github.com> Date: Wed, 22 Jul 2020 12:41:22 +0100 Subject: [PATCH 07/14] NexusIQ updates (#6499) * every build related to Corda X.Y (GA, RC, HC, patch or snapshot) uses the same NexusIQ application * NexusIQ application application *has* to exist before a build starts --- .ci/dev/regression/Jenkinsfile | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.ci/dev/regression/Jenkinsfile b/.ci/dev/regression/Jenkinsfile index 85a699180a..4678bbda6c 100644 --- a/.ci/dev/regression/Jenkinsfile +++ b/.ci/dev/regression/Jenkinsfile @@ -57,14 +57,15 @@ pipeline { sh "./gradlew --no-daemon clean jar" script { sh "./gradlew --no-daemon properties | grep -E '^(version|group):' >version-properties" - def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: //'").trim() + /* every build related to Corda X.Y (GA, RC, HC, patch or snapshot) uses the same NexusIQ application */ + def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: \\([0-9]\\+\\.[0-9]\\+\\).*\$/\\1/'").trim() def groupId = sh (returnStdout: true, script: "grep ^group: version-properties | sed -e 's/^group: //'").trim() def artifactId = 'corda' nexusAppId = "jenkins-${groupId}-${artifactId}-${version}" } nexusPolicyEvaluation ( failBuildOnNetworkError: false, - iqApplication: manualApplication(nexusAppId), + iqApplication: selectedApplication(nexusAppId), // application *has* to exist before a build starts! iqScanPatterns: [[scanPattern: 'node/capsule/build/libs/corda*.jar']], iqStage: nexusIqStage ) From d810067ab686e9472ed799445b09fb4f979a094d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Waldemar=20=C5=BBurowski?= <45210402+wzur-r3@users.noreply.github.com> Date: Wed, 22 Jul 2020 14:43:14 +0100 Subject: [PATCH 08/14] NOTICK: Fixed for JDK11 builds (#6501) * NexusIQ every build related to Corda X.Y (GA, RC, HC, patch or snapshot) uses the same NexusIQ application * NexusIQ application application has to exist before a build starts * Fixed repository name for publishing, use OS instead of Ent one --- .ci/dev/compatibility/JenkinsfileJDK11Azul | 24 ++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/.ci/dev/compatibility/JenkinsfileJDK11Azul b/.ci/dev/compatibility/JenkinsfileJDK11Azul index 1dd3b40043..b692a4d736 100644 --- a/.ci/dev/compatibility/JenkinsfileJDK11Azul +++ b/.ci/dev/compatibility/JenkinsfileJDK11Azul @@ -1,3 +1,14 @@ +#!groovy +/** + * Jenkins pipeline to build Corda OS release with JDK11 + */ + +/** + * Kill already started job. + * Assume new commit takes precendence and results from previous + * unfinished builds are not required. + * This feature doesn't play well with disableConcurrentBuilds() option + */ @Library('corda-shared-build-pipeline-steps') import static com.r3.build.BuildControl.killAllExistingBuildsForJob @@ -22,13 +33,13 @@ if (isReleaseTag) { default: nexusIqStage = "operate" } } + pipeline { - agent { - label 'k8s' - } + agent { label 'k8s' } options { timestamps() timeout(time: 3, unit: 'HOURS') + buildDiscarder(logRotator(daysToKeepStr: '14', artifactDaysToKeepStr: '14')) } environment { @@ -48,14 +59,15 @@ pipeline { sh "./gradlew --no-daemon clean jar" script { sh "./gradlew --no-daemon properties | grep -E '^(version|group):' >version-properties" - def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: //'").trim() + /* every build related to Corda X.Y (GA, RC, HC, patch or snapshot) uses the same NexusIQ application */ + def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: \\([0-9]\\+\\.[0-9]\\+\\).*\$/\\1/'").trim() def groupId = sh (returnStdout: true, script: "grep ^group: version-properties | sed -e 's/^group: //'").trim() def artifactId = 'corda' nexusAppId = "jenkins-${groupId}-${artifactId}-jdk11-${version}" } nexusPolicyEvaluation ( failBuildOnNetworkError: false, - iqApplication: manualApplication(nexusAppId), + iqApplication: selectedApplication(nexusAppId), // application *has* to exist before a build starts! iqScanPatterns: [[scanPattern: 'node/capsule/build/libs/corda*.jar']], iqStage: nexusIqStage ) @@ -132,7 +144,7 @@ pipeline { rtGradleDeployer( id: 'deployer', serverId: 'R3-Artifactory', - repo: 'r3-corda-releases' + repo: 'corda-releases' ) rtGradleRun( usesPlugin: true, From a41152edf69d80ca25da62f485bb4fd12eb17ac8 Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Wed, 22 Jul 2020 16:19:20 +0100 Subject: [PATCH 09/14] CORDA-3899 Refactor flow's transient fields (#6441) Refactor `FlowStateMachineImpl.transientValues` and `FlowStateMachineImpl.transientState` to stop the fields from exposing the fact that they are nullable. This is done by having private backing fields `transientValuesReference` and `transientStateReference` that can be null. The nullability is still needed due to serialisation and deserialisation of flow fibers. The fields are transient and therefore will be null when reloaded from the database. Getters and setters hide the private field, allowing a non-null field to returned. There is no point other than in `FlowCreator` where the transient fields can be null. Therefore the non null checks that are being made are valid. Add custom kryo serialisation and deserialisation to `TransientValues` and `StateMachineState` to ensure that neither of the objects are ever touched by kryo. --- .../node/services/statemachine/FlowCreator.kt | 8 +- .../FlowDefaultUncaughtExceptionHandler.kt | 16 +-- .../node/services/statemachine/FlowMonitor.kt | 4 +- .../statemachine/FlowStateMachineImpl.kt | 128 ++++++++++-------- .../SingleThreadedStateMachineManager.kt | 17 +-- .../statemachine/StateMachineState.kt | 15 +- .../statemachine/FlowFrameworkTests.kt | 22 +-- 7 files changed, 118 insertions(+), 92 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt index be8026b73f..08a006c345 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt @@ -69,11 +69,11 @@ class FlowCreator( val checkpoint = oldCheckpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE) val fiber = checkpoint.getFiberFromCheckpoint(runId) ?: return null val resultFuture = openFuture() - fiber.transientValues = TransientReference(createTransientValues(runId, resultFuture)) fiber.logic.stateMachine = fiber verifyFlowLogicIsSuspendable(fiber.logic) val state = createStateMachineState(checkpoint, fiber, true) - fiber.transientState = TransientReference(state) + fiber.transientValues = createTransientValues(runId, resultFuture) + fiber.transientState = state return Flow(fiber, resultFuture) } @@ -91,7 +91,7 @@ class FlowCreator( // have access to the fiber (and thereby the service hub) val flowStateMachineImpl = FlowStateMachineImpl(flowId, flowLogic, scheduler) val resultFuture = openFuture() - flowStateMachineImpl.transientValues = TransientReference(createTransientValues(flowId, resultFuture)) + flowStateMachineImpl.transientValues = createTransientValues(flowId, resultFuture) flowLogic.stateMachine = flowStateMachineImpl val frozenFlowLogic = (flowLogic as FlowLogic<*>).checkpointSerialize(context = checkpointSerializationContext) val flowCorDappVersion = FlowStateMachineImpl.createSubFlowVersion( @@ -113,7 +113,7 @@ class FlowCreator( existingCheckpoint != null, deduplicationHandler, senderUUID) - flowStateMachineImpl.transientState = TransientReference(state) + flowStateMachineImpl.transientState = state return Flow(flowStateMachineImpl, resultFuture) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowDefaultUncaughtExceptionHandler.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowDefaultUncaughtExceptionHandler.kt index 0dc5f28791..44a3c8876b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowDefaultUncaughtExceptionHandler.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowDefaultUncaughtExceptionHandler.kt @@ -39,18 +39,14 @@ class FlowDefaultUncaughtExceptionHandler( val id = fiber.id if (!fiber.resultFuture.isDone) { fiber.transientState.let { state -> - if (state != null) { - fiber.logger.warn("Forcing flow $id into overnight observation") - flowHospital.forceIntoOvernightObservation(state.value, listOf(throwable)) - val hospitalizedCheckpoint = state.value.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED) - val hospitalizedState = state.value.copy(checkpoint = hospitalizedCheckpoint) - fiber.transientState = TransientReference(hospitalizedState) - } else { - fiber.logger.warn("The fiber's transient state is not set, cannot force flow $id into in-memory overnight observation, status will still be updated in database") - } + fiber.logger.warn("Forcing flow $id into overnight observation") + flowHospital.forceIntoOvernightObservation(state, listOf(throwable)) + val hospitalizedCheckpoint = state.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED) + val hospitalizedState = state.copy(checkpoint = hospitalizedCheckpoint) + fiber.transientState = hospitalizedState } - scheduledExecutor.schedule({ setFlowToHospitalizedRescheduleOnFailure(id) }, 0, TimeUnit.SECONDS) } + scheduledExecutor.schedule({ setFlowToHospitalizedRescheduleOnFailure(id) }, 0, TimeUnit.SECONDS) } @Suppress("TooGenericExceptionCaught") diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt index 5fba6ae8b7..26ab9d5987 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt @@ -96,12 +96,12 @@ internal class FlowMonitor( private fun FlowStateMachineImpl<*>.ioRequest() = (snapshot().checkpoint.flowState as? FlowState.Started)?.flowIORequest private fun FlowStateMachineImpl<*>.ongoingDuration(now: Instant): Duration { - return transientState?.value?.checkpoint?.timestamp?.let { Duration.between(it, now) } ?: Duration.ZERO + return transientState.checkpoint.timestamp.let { Duration.between(it, now) } ?: Duration.ZERO } private fun FlowStateMachineImpl<*>.isSuspended() = !snapshot().isFlowResumed - private fun FlowStateMachineImpl<*>.isStarted() = transientState?.value?.checkpoint?.flowState is FlowState.Started + private fun FlowStateMachineImpl<*>.isStarted() = transientState.checkpoint.flowState is FlowState.Started private operator fun StaffedFlowHospital.contains(flow: FlowStateMachine<*>) = contains(flow.id) } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 5277d89638..ce4fdea2bd 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -6,6 +6,10 @@ import co.paralleluniverse.fibers.FiberScheduler import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.channels.Channel +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.KryoSerializable +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.contracts.StateRef @@ -58,7 +62,6 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.slf4j.MDC import java.util.concurrent.TimeUnit -import kotlin.reflect.KProperty1 class FlowPermissionException(message: String) : FlowException(message) @@ -86,52 +89,65 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, private val SERIALIZER_BLOCKER = Fiber::class.java.getDeclaredField("SERIALIZER_BLOCKER").apply { isAccessible = true }.get(null) } - override val serviceHub get() = getTransientField(TransientValues::serviceHub) - data class TransientValues( - val eventQueue: Channel, - val resultFuture: CordaFuture, - val database: CordaPersistence, - val transitionExecutor: TransitionExecutor, - val actionExecutor: ActionExecutor, - val stateMachine: StateMachine, - val serviceHub: ServiceHubInternal, - val checkpointSerializationContext: CheckpointSerializationContext, - val unfinishedFibers: ReusableLatch, - val waitTimeUpdateHook: (id: StateMachineRunId, timeout: Long) -> Unit - ) + val eventQueue: Channel, + val resultFuture: CordaFuture, + val database: CordaPersistence, + val transitionExecutor: TransitionExecutor, + val actionExecutor: ActionExecutor, + val stateMachine: StateMachine, + val serviceHub: ServiceHubInternal, + val checkpointSerializationContext: CheckpointSerializationContext, + val unfinishedFibers: ReusableLatch, + val waitTimeUpdateHook: (id: StateMachineRunId, timeout: Long) -> Unit + ) : KryoSerializable { + override fun write(kryo: Kryo?, output: Output?) { + throw IllegalStateException("${TransientValues::class.qualifiedName} should never be serialized") + } - internal var transientValues: TransientReference? = null - internal var transientState: TransientReference? = null - - /** - * What sender identifier to put on messages sent by this flow. This will either be the identifier for the current - * state machine manager / messaging client, or null to indicate this flow is restored from a checkpoint and - * the de-duplication of messages it sends should not be optimised since this could be unreliable. - */ - override val ourSenderUUID: String? - get() = transientState?.value?.senderUUID - - private fun getTransientField(field: KProperty1): A { - val suppliedValues = transientValues ?: throw IllegalStateException("${field.name} wasn't supplied!") - return field.get(suppliedValues.value) + override fun read(kryo: Kryo?, input: Input?) { + throw IllegalStateException("${TransientValues::class.qualifiedName} should never be deserialized") + } } - private fun extractThreadLocalTransaction(): TransientReference { - val transaction = contextTransaction - contextTransactionOrNull = null - return TransientReference(transaction) - } + private var transientValuesReference: TransientReference? = null + internal var transientValues: TransientValues + // After the flow has been created, the transient values should never be null + get() = transientValuesReference!!.value + set(values) { + check(transientValuesReference?.value == null) { "The transient values should only be set once when initialising a flow" } + transientValuesReference = TransientReference(values) + } + + private var transientStateReference: TransientReference? = null + internal var transientState: StateMachineState + // After the flow has been created, the transient state should never be null + get() = transientStateReference!!.value + set(state) { + transientStateReference = TransientReference(state) + } /** * Return the logger for this state machine. The logger name incorporates [id] and so including it in the log message * is not necessary. */ override val logger = log - override val resultFuture: CordaFuture get() = uncheckedCast(getTransientField(TransientValues::resultFuture)) - override val context: InvocationContext get() = transientState!!.value.checkpoint.checkpointState.invocationContext - override val ourIdentity: Party get() = transientState!!.value.checkpoint.checkpointState.ourIdentity - override val isKilled: Boolean get() = transientState!!.value.isKilled + + override val instanceId: StateMachineInstanceId get() = StateMachineInstanceId(id, super.getId()) + + override val serviceHub: ServiceHubInternal get() = transientValues.serviceHub + override val stateMachine: StateMachine get() = transientValues.stateMachine + override val resultFuture: CordaFuture get() = uncheckedCast(transientValues.resultFuture) + + override val context: InvocationContext get() = transientState.checkpoint.checkpointState.invocationContext + override val ourIdentity: Party get() = transientState.checkpoint.checkpointState.ourIdentity + override val isKilled: Boolean get() = transientState.isKilled + /** + * What sender identifier to put on messages sent by this flow. This will either be the identifier for the current + * state machine manager / messaging client, or null to indicate this flow is restored from a checkpoint and + * the de-duplication of messages it sends should not be optimised since this could be unreliable. + */ + override val ourSenderUUID: String? get() = transientState.senderUUID internal val softLockedStates = mutableSetOf() @@ -143,9 +159,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, @Suspendable private fun processEvent(transitionExecutor: TransitionExecutor, event: Event): FlowContinuation { setLoggingContext() - val stateMachine = getTransientField(TransientValues::stateMachine) - val oldState = transientState!!.value - val actionExecutor = getTransientField(TransientValues::actionExecutor) + val stateMachine = transientValues.stateMachine + val oldState = transientState + val actionExecutor = transientValues.actionExecutor val transition = stateMachine.transition(event, oldState) val (continuation, newState) = transitionExecutor.executeTransition(this, oldState, event, transition, actionExecutor) // Ensure that the next state that is being written to the transient state maintains the [isKilled] flag @@ -153,7 +169,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, if (oldState.isKilled && !newState.isKilled) { newState.isKilled = true } - transientState = TransientReference(newState) + transientState = newState setLoggingContext() return continuation } @@ -171,15 +187,15 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, @Suspendable private fun processEventsUntilFlowIsResumed(isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): Any? { checkDbTransaction(isDbTransactionOpenOnEntry) - val transitionExecutor = getTransientField(TransientValues::transitionExecutor) - val eventQueue = getTransientField(TransientValues::eventQueue) + val transitionExecutor = transientValues.transitionExecutor + val eventQueue = transientValues.eventQueue try { eventLoop@ while (true) { val nextEvent = try { eventQueue.receive() } catch (interrupted: InterruptedException) { log.error("Flow interrupted while waiting for events, aborting immediately") - (transientValues?.value?.resultFuture as? OpenFuture<*>)?.setException(KilledFlowException(id)) + (transientValues.resultFuture as? OpenFuture<*>)?.setException(KilledFlowException(id)) abortFiber() } val continuation = processEvent(transitionExecutor, nextEvent) @@ -246,7 +262,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): FlowContinuation { checkDbTransaction(isDbTransactionOpenOnEntry) - val transitionExecutor = getTransientField(TransientValues::transitionExecutor) + val transitionExecutor = transientValues.transitionExecutor val continuation = processEvent(transitionExecutor, event) checkDbTransaction(isDbTransactionOpenOnExit) return continuation @@ -270,7 +286,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } private fun openThreadLocalWormhole() { - val threadLocal = getTransientField(TransientValues::database).hikariPoolThreadLocal + val threadLocal = transientValues.database.hikariPoolThreadLocal if (threadLocal != null) { val valueFromThread = swappedOutThreadLocalValue(threadLocal) threadLocal.set(valueFromThread) @@ -332,7 +348,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } recordDuration(startTime) - getTransientField(TransientValues::unfinishedFibers).countDown() + transientValues.unfinishedFibers.countDown() } @Suspendable @@ -476,7 +492,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, @Suspendable override fun suspend(ioRequest: FlowIORequest, maySkipCheckpoint: Boolean): R { - val serializationContext = TransientReference(getTransientField(TransientValues::checkpointSerializationContext)) + val serializationContext = TransientReference(transientValues.checkpointSerializationContext) val transaction = extractThreadLocalTransaction() parkAndSerialize { _, _ -> setLoggingContext() @@ -524,13 +540,19 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, return subFlowStack.any { IdempotentFlow::class.java.isAssignableFrom(it.flowClass) } } + private fun extractThreadLocalTransaction(): TransientReference { + val transaction = contextTransaction + contextTransactionOrNull = null + return TransientReference(transaction) + } + @Suspendable override fun scheduleEvent(event: Event) { - getTransientField(TransientValues::eventQueue).send(event) + transientValues.eventQueue.send(event) } override fun snapshot(): StateMachineState { - return transientState!!.value + return transientState } /** @@ -538,13 +560,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, * retried. */ override fun updateTimedFlowTimeout(timeoutSeconds: Long) { - getTransientField(TransientValues::waitTimeUpdateHook).invoke(id, timeoutSeconds) + transientValues.waitTimeUpdateHook.invoke(id, timeoutSeconds) } - override val stateMachine get() = getTransientField(TransientValues::stateMachine) - - override val instanceId: StateMachineInstanceId get() = StateMachineInstanceId(id, super.getId()) - /** * Records the duration of this flow – from call() to completion or failure. * Note that the duration will include the time the flow spent being parked, and not just the total diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 1d07a75d02..fba5833661 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -261,14 +261,9 @@ internal class SingleThreadedStateMachineManager( unfinishedFibers.countDown() val state = flow.fiber.transientState - return@withLock if (state != null) { - state.value.isKilled = true - flow.fiber.scheduleEvent(Event.DoRemainingWork) - true - } else { - logger.info("Flow $id has not been initialised correctly and cannot be killed") - false - } + state.isKilled = true + flow.fiber.scheduleEvent(Event.DoRemainingWork) + true } else { // It may be that the id refers to a checkpoint that couldn't be deserialised into a flow, so we delete it if it exists. database.transaction { checkpointStorage.removeCheckpoint(id) } @@ -386,7 +381,7 @@ internal class SingleThreadedStateMachineManager( currentState.cancelFutureIfRunning() // Get set of external events val flowId = currentState.flowLogic.runId - val oldFlowLeftOver = innerState.withLock { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue + val oldFlowLeftOver = innerState.withLock { flows[flowId] }?.fiber?.transientValues?.eventQueue if (oldFlowLeftOver == null) { logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.") return @@ -592,7 +587,7 @@ internal class SingleThreadedStateMachineManager( ): CordaFuture> { val existingFlow = innerState.withLock { flows[flowId] } - val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState?.value?.isAnyCheckpointPersisted == true) { + val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState.isAnyCheckpointPersisted) { // Load the flow's checkpoint // The checkpoint will be missing if the flow failed before persisting the original checkpoint // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay) @@ -756,7 +751,7 @@ internal class SingleThreadedStateMachineManager( // The flow's event queue may be non-empty in case it shut down abruptly. We handle outstanding events here. private fun drainFlowEventQueue(flow: Flow<*>) { while (true) { - val event = flow.fiber.transientValues!!.value.eventQueue.tryReceive() ?: return + val event = flow.fiber.transientValues.eventQueue.tryReceive() ?: return when (event) { is Event.DoRemainingWork -> {} is Event.DeliverSessionMessage -> { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index e652d26145..6c124d41e6 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -1,5 +1,9 @@ package net.corda.node.services.statemachine +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.KryoSerializable +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output import net.corda.core.context.InvocationContext import net.corda.core.crypto.SecureHash import net.corda.core.flows.Destination @@ -15,6 +19,7 @@ import net.corda.core.serialization.internal.CheckpointSerializationContext import net.corda.core.serialization.internal.checkpointDeserialize import net.corda.core.utilities.Try import net.corda.node.services.messaging.DeduplicationHandler +import java.lang.IllegalStateException import java.time.Instant import java.util.concurrent.Future @@ -55,7 +60,15 @@ data class StateMachineState( @Volatile var isKilled: Boolean, val senderUUID: String? -) +) : KryoSerializable { + override fun write(kryo: Kryo?, output: Output?) { + throw IllegalStateException("${StateMachineState::class.qualifiedName} should never be serialized") + } + + override fun read(kryo: Kryo?, input: Input?) { + throw IllegalStateException("${StateMachineState::class.qualifiedName} should never be deserialized") + } +} /** * @param checkpointState the state of the checkpoint diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index d6eb21aa33..1967f9ff63 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -26,6 +26,7 @@ import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.declaredField import net.corda.core.messaging.MessageRecipients import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.queryBy @@ -173,9 +174,12 @@ class FlowFrameworkTests { val flow = ReceiveFlow(bob) val fiber = aliceNode.services.startFlow(flow) as FlowStateMachineImpl // Before the flow runs change the suspend action to throw an exception - val throwingActionExecutor = SuspendThrowingActionExecutor(Exception("Thrown during suspend"), - fiber.transientValues!!.value.actionExecutor) - fiber.transientValues = TransientReference(fiber.transientValues!!.value.copy(actionExecutor = throwingActionExecutor)) + val throwingActionExecutor = SuspendThrowingActionExecutor( + Exception("Thrown during suspend"), + fiber.transientValues.actionExecutor + ) + fiber.declaredField>("transientValuesReference").value = + TransientReference(fiber.transientValues.copy(actionExecutor = throwingActionExecutor)) mockNet.runNetwork() fiber.resultFuture.getOrThrow() assertThat(aliceNode.smm.allStateMachines).isEmpty() @@ -679,14 +683,14 @@ class FlowFrameworkTests { SuspendingFlow.hookBeforeCheckpoint = { val flowFiber = this as? FlowStateMachineImpl<*> - flowState = flowFiber!!.transientState!!.value.checkpoint.flowState + flowState = flowFiber!!.transientState.checkpoint.flowState if (firstExecution) { throw HospitalizeFlowException() } else { dbCheckpointStatusBeforeSuspension = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status currentDBSession().clear() // clear session as Hibernate with fails with 'org.hibernate.NonUniqueObjectException' once it tries to save a DBFlowCheckpoint upon checkpoint - inMemoryCheckpointStatusBeforeSuspension = flowFiber.transientState!!.value.checkpoint.status + inMemoryCheckpointStatusBeforeSuspension = flowFiber.transientState.checkpoint.status futureFiber.complete(flowFiber) } @@ -701,7 +705,7 @@ class FlowFrameworkTests { } // flow is in hospital assertTrue(flowState is FlowState.Unstarted) - val inMemoryHospitalizedCheckpointStatus = aliceNode.internals.smm.snapshot().first().transientState?.value?.checkpoint?.status + val inMemoryHospitalizedCheckpointStatus = aliceNode.internals.smm.snapshot().first().transientState.checkpoint.status assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, inMemoryHospitalizedCheckpointStatus) aliceNode.database.transaction { val checkpoint = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second @@ -727,13 +731,13 @@ class FlowFrameworkTests { SuspendingFlow.hookAfterCheckpoint = { val flowFiber = this as? FlowStateMachineImpl<*> - flowState = flowFiber!!.transientState!!.value.checkpoint.flowState + flowState = flowFiber!!.transientState.checkpoint.flowState if (firstExecution) { throw HospitalizeFlowException() } else { dbCheckpointStatus = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status - inMemoryCheckpointStatus = flowFiber.transientState!!.value.checkpoint.status + inMemoryCheckpointStatus = flowFiber.transientState.checkpoint.status futureFiber.complete(flowFiber) } @@ -820,7 +824,7 @@ class FlowFrameworkTests { } else { val flowFiber = this as? FlowStateMachineImpl<*> dbCheckpointStatus = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status - inMemoryCheckpointStatus = flowFiber!!.transientState!!.value.checkpoint.status + inMemoryCheckpointStatus = flowFiber!!.transientState.checkpoint.status persistedException = aliceNode.internals.checkpointStorage.getDBCheckpoint(flowFiber.id)!!.exceptionDetails } } From c33720c73d0fd40b8d98bcbb63aafb0d7ed8bd5b Mon Sep 17 00:00:00 2001 From: Joseph Zuniga-Daly <59851625+josephzunigadaly@users.noreply.github.com> Date: Wed, 22 Jul 2020 17:31:59 +0100 Subject: [PATCH 10/14] CORDA-3717: Apply custom serializers to checkpoints (#6392) * CORDA-3717: Apply custom serializers to checkpoints * Remove try/catch to fix TooGenericExceptionCaught detekt rule * Rename exception * Extract method * Put calls to the userSerializer on their own lines to improve readability * Remove unused constructors from exception * Remove unused proxyType field * Give field a descriptive name * Explain why we are looking for two type parameters when we only use one * Tidy up the fetching of types * Use 0 seconds when forcing a flow checkpoint inside test * Add test to check references are restored correctly * Add CheckpointCustomSerializer interface * Wire up the new CheckpointCustomSerializer interface * Use kryo default for abstract classes * Remove unused imports * Remove need for external library in tests * Make file match original to remove from diff * Remove maySkipCheckpoint from calls to sleep * Add newline to end of file * Test custom serializers mapped to interfaces * Test serializer configured with abstract class * Move test into its own package * Rename test * Move flows and serializers into their own source file * Move broken map into its own source file * Delete comment now source file is simpler * Rename class to have a shorter name * Add tests that run the checkpoint serializer directly * Check serialization of final classes * Register as default unless the target class is final * Test PublicKey serializer has not been overridden * Add a broken serializer for EdDSAPublicKey to make test more robust * Split serializer registration into default and non-default registrations. Run registrations at the right time to preserve Cordas own custom serializers. * Check for duplicate custom checkpoint serializers * Add doc comments * Add doc comments to CustomSerializerCheckpointAdaptor * Add test to check duplicate serializers are logged * Do not log the duplicate serializer warning when the duplicate is the same class * Update doc comment for CheckpointCustomSerializer * Sort serializers by classname so we are not registering in an unknown or random order * Add test to serialize a class that references itself * Store custom serializer type in the Kryo stream so we can spot when a different serializer is being used to deserialize * Testing has shown that registering custom serializers as default is more robust when adding new cordapps * Remove new line character * Remove unused imports * Add interface net.corda.core.serialization.CheckpointCustomSerializer to api-current.txt * Remove comment * Update comment on exception * Make CustomSerializerCheckpointAdaptor internal * Revert "Add interface net.corda.core.serialization.CheckpointCustomSerializer to api-current.txt" This reverts commit b835de79bd21f0048be741e7fc5f0c3088516d2b. * Restore "Add interface net.corda.core.serialization.CheckpointCustomSerializer to api-current.txt"" This reverts commit 718873a4e963bad4e327bb200e7bb4de44bc47ad. * Pass the class loader instead of the context * Do less work in test setup * Make the serialization context unique for CustomCheckpointSerializerTest so we get a new Kryo pool for the test * Rebuild the Kryo pool for the given context when we change custom serializers * Rebuild all Kryo pools on serializer change to keep serializer list consistent * Move the custom serializer list into CheckpointSerializationContext to reduce scope from global to a serialization context * Remove unused imports * Make the new checkpointCustomSerializers property default to the empty list * Delegate implementation using kotlin language feature --- .ci/api-current.txt | 4 + .../kotlin/net/corda/core/cordapp/Cordapp.kt | 3 + .../core/internal/cordapp/CordappImpl.kt | 3 + .../SerializationCustomSerializer.kt | 23 ++ .../internal/CheckpointSerializationAPI.kt | 9 + .../kryo/CustomSerializerCheckpointAdaptor.kt | 103 +++++++++ .../kryo/KryoCheckpointSerializer.kt | 54 ++++- .../CustomCheckpointSerializerTest.kt | 99 ++++++++ .../DifficultToSerialize.kt | 27 +++ .../DuplicateSerializerLogTest.kt | 59 +++++ ...cateSerializerLogWithSameSerializerTest.kt | 58 +++++ ...ckNetworkCustomCheckpointSerializerTest.kt | 75 ++++++ .../ReferenceLoopTest.kt | 75 ++++++ .../customcheckpointserializer/TestCorDapp.kt | 214 ++++++++++++++++++ .../kotlin/net/corda/node/internal/Node.kt | 4 +- .../cordapp/JarScanningCordappLoader.kt | 6 + .../node/internal/cordapp/VirtualCordapps.kt | 4 + .../internal/CheckpointSerializationScheme.kt | 6 +- .../InternalSerializationTestHelpers.kt | 6 +- 19 files changed, 826 insertions(+), 6 deletions(-) create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/CustomSerializerCheckpointAdaptor.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/CustomCheckpointSerializerTest.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DifficultToSerialize.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DuplicateSerializerLogTest.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DuplicateSerializerLogWithSameSerializerTest.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/MockNetworkCustomCheckpointSerializerTest.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/ReferenceLoopTest.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/TestCorDapp.kt diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 64e351610e..10374f09e3 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -5398,6 +5398,10 @@ public interface net.corda.core.schemas.QueryableState extends net.corda.core.co ## public interface net.corda.core.schemas.StatePersistable ## +public interface net.corda.core.serialization.CheckpointCustomSerializer + public abstract OBJ fromProxy(PROXY) + public abstract PROXY toProxy(OBJ) +## public interface net.corda.core.serialization.ClassWhitelist public abstract boolean hasListed(Class) ## diff --git a/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt b/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt index 753e842fe6..1dd153e0ae 100644 --- a/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt +++ b/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt @@ -7,6 +7,7 @@ import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.internal.cordapp.CordappImpl.Companion.UNKNOWN_VALUE import net.corda.core.schemas.MappedSchema +import net.corda.core.serialization.CheckpointCustomSerializer import net.corda.core.serialization.SerializationCustomSerializer import net.corda.core.serialization.SerializationWhitelist import net.corda.core.serialization.SerializeAsToken @@ -29,6 +30,7 @@ import java.net.URL * @property services List of RPC services * @property serializationWhitelists List of Corda plugin registries * @property serializationCustomSerializers List of serializers + * @property checkpointCustomSerializers List of serializers for checkpoints * @property customSchemas List of custom schemas * @property allFlows List of all flow classes * @property jarPath The path to the JAR for this CorDapp @@ -49,6 +51,7 @@ interface Cordapp { val services: List> val serializationWhitelists: List val serializationCustomSerializers: List> + val checkpointCustomSerializers: List> val customSchemas: Set val allFlows: List>> val jarPath: URL diff --git a/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt b/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt index d511ba7860..1c5d69e511 100644 --- a/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt +++ b/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt @@ -9,6 +9,7 @@ import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.notary.NotaryService import net.corda.core.internal.toPath import net.corda.core.schemas.MappedSchema +import net.corda.core.serialization.CheckpointCustomSerializer import net.corda.core.serialization.SerializationCustomSerializer import net.corda.core.serialization.SerializationWhitelist import net.corda.core.serialization.SerializeAsToken @@ -25,6 +26,7 @@ data class CordappImpl( override val services: List>, override val serializationWhitelists: List, override val serializationCustomSerializers: List>, + override val checkpointCustomSerializers: List>, override val customSchemas: Set, override val allFlows: List>>, override val jarPath: URL, @@ -79,6 +81,7 @@ data class CordappImpl( services = emptyList(), serializationWhitelists = emptyList(), serializationCustomSerializers = emptyList(), + checkpointCustomSerializers = emptyList(), customSchemas = emptySet(), jarPath = Paths.get("").toUri().toURL(), info = UNKNOWN_INFO, diff --git a/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt b/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt index d0c910b638..ed387f8f94 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt @@ -25,3 +25,26 @@ interface SerializationCustomSerializer { */ fun fromProxy(proxy: PROXY): OBJ } + +/** + * Allows CorDapps to provide custom serializers for classes that do not serialize successfully during a checkpoint. + * In this case, a proxy serializer can be written that implements this interface whose purpose is to move between + * unserializable types and an intermediate representation. + * + * NOTE: Only implement this interface if you have a class that triggers an error during normal checkpoint + * serialization/deserialization. + */ +@KeepForDJVM +interface CheckpointCustomSerializer { + /** + * Should facilitate the conversion of the third party object into the serializable + * local class specified by [PROXY] + */ + fun toProxy(obj: OBJ): PROXY + + /** + * Should facilitate the conversion of the proxy object into a new instance of the + * unserializable type + */ + fun fromProxy(proxy: PROXY): OBJ +} diff --git a/core/src/main/kotlin/net/corda/core/serialization/internal/CheckpointSerializationAPI.kt b/core/src/main/kotlin/net/corda/core/serialization/internal/CheckpointSerializationAPI.kt index 98fdcd730d..510986141c 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/internal/CheckpointSerializationAPI.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/internal/CheckpointSerializationAPI.kt @@ -56,6 +56,10 @@ interface CheckpointSerializationContext { * otherwise they appear as new copies of the object. */ val objectReferencesEnabled: Boolean + /** + * User defined custom serializers for use in checkpoint serialization. + */ + val checkpointCustomSerializers: Iterable> /** * Helper method to return a new context based on this context with the property added. @@ -86,6 +90,11 @@ interface CheckpointSerializationContext { * A shallow copy of this context but with the given encoding whitelist. */ fun withEncodingWhitelist(encodingWhitelist: EncodingWhitelist): CheckpointSerializationContext + + /** + * A shallow copy of this context but with the given custom serializers. + */ + fun withCheckpointCustomSerializers(checkpointCustomSerializers: Iterable>): CheckpointSerializationContext } /* diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/CustomSerializerCheckpointAdaptor.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/CustomSerializerCheckpointAdaptor.kt new file mode 100644 index 0000000000..4f3475696b --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/CustomSerializerCheckpointAdaptor.kt @@ -0,0 +1,103 @@ +package net.corda.nodeapi.internal.serialization.kryo + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.Serializer +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output +import net.corda.core.serialization.CheckpointCustomSerializer +import net.corda.serialization.internal.amqp.CORDAPP_TYPE +import java.lang.reflect.Type +import kotlin.reflect.jvm.javaType +import kotlin.reflect.jvm.jvmErasure + +/** + * Adapts CheckpointCustomSerializer for use in Kryo + */ +internal class CustomSerializerCheckpointAdaptor(private val userSerializer : CheckpointCustomSerializer) : Serializer() { + + /** + * The class name of the serializer we are adapting. + */ + val serializerName: String = userSerializer.javaClass.name + + /** + * The input type of this custom serializer. + */ + val cordappType: Type + + /** + * Check we have access to the types specified on the CheckpointCustomSerializer interface. + * + * Throws UnableToDetermineSerializerTypesException if the types are missing. + */ + init { + val types: List = userSerializer::class + .supertypes + .filter { it.jvmErasure == CheckpointCustomSerializer::class } + .flatMap { it.arguments } + .mapNotNull { it.type?.javaType } + + // We are expecting a cordapp type and a proxy type. + // We will only use the cordapp type in this class + // but we want to check both are present. + val typeParameterCount = 2 + if (types.size != typeParameterCount) { + throw UnableToDetermineSerializerTypesException("Unable to determine serializer parent types") + } + cordappType = types[CORDAPP_TYPE] + } + + /** + * Serialize obj to the Kryo stream. + */ + override fun write(kryo: Kryo, output: Output, obj: OBJ) { + + fun writeToKryo(obj: T) = kryo.writeClassAndObject(output, obj) + + // Write serializer type + writeToKryo(serializerName) + + // Write proxy object + writeToKryo(userSerializer.toProxy(obj)) + } + + /** + * Deserialize an object from the Kryo stream. + */ + override fun read(kryo: Kryo, input: Input, type: Class): OBJ { + + @Suppress("UNCHECKED_CAST") + fun readFromKryo() = kryo.readClassAndObject(input) as T + + // Check the serializer type + checkSerializerType(readFromKryo()) + + // Read the proxy object + return userSerializer.fromProxy(readFromKryo()) + } + + /** + * Throws a `CustomCheckpointSerializersHaveChangedException` if the serializer type in the kryo stream does not match the serializer + * type for this custom serializer. + * + * @param checkpointSerializerType Serializer type from the Kryo stream + */ + private fun checkSerializerType(checkpointSerializerType: String) { + if (checkpointSerializerType != serializerName) + throw CustomCheckpointSerializersHaveChangedException("The custom checkpoint serializers have changed while checkpoints exist. " + + "Please restore the CorDapps to when this checkpoint was created.") + } +} + +/** + * Thrown when the input/output types are missing from the custom serializer. + */ +class UnableToDetermineSerializerTypesException(message: String) : RuntimeException(message) + +/** + * Thrown when the custom serializer is found to be reading data from another type of custom serializer. + * + * This was expected to happen if the user adds or removes CorDapps while checkpoints exist but it turned out that registering serializers + * as default made the system reliable. + */ +class CustomCheckpointSerializersHaveChangedException(message: String) : RuntimeException(message) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointSerializer.kt index 6a73119ce6..06698d99ad 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointSerializer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointSerializer.kt @@ -10,12 +10,14 @@ import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.pool.KryoPool import com.esotericsoftware.kryo.serializers.ClosureSerializer import net.corda.core.internal.uncheckedCast +import net.corda.core.serialization.CheckpointCustomSerializer import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.internal.CheckpointSerializationContext import net.corda.core.serialization.internal.CheckpointSerializer import net.corda.core.utilities.ByteSequence +import net.corda.core.utilities.loggerFor import net.corda.serialization.internal.AlwaysAcceptEncodingWhitelist import net.corda.serialization.internal.ByteBufferInputStream import net.corda.serialization.internal.CheckpointSerializationContextImpl @@ -40,10 +42,10 @@ private object AutoCloseableSerialisationDetector : Serializer() } object KryoCheckpointSerializer : CheckpointSerializer { - private val kryoPoolsForContexts = ConcurrentHashMap, KryoPool>() + private val kryoPoolsForContexts = ConcurrentHashMap>>, KryoPool>() private fun getPool(context: CheckpointSerializationContext): KryoPool { - return kryoPoolsForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) { + return kryoPoolsForContexts.computeIfAbsent(Triple(context.whitelist, context.deserializationClassLoader, context.checkpointCustomSerializers)) { KryoPool.Builder { val serializer = Fiber.getFiberSerializer(false) as KryoSerializer val classResolver = CordaClassResolver(context).apply { setKryo(serializer.kryo) } @@ -56,12 +58,60 @@ object KryoCheckpointSerializer : CheckpointSerializer { addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector) register(ClosureSerializer.Closure::class.java, CordaClosureSerializer) classLoader = it.second + + // Add custom serializers + val customSerializers = buildCustomSerializerAdaptors(context) + warnAboutDuplicateSerializers(customSerializers) + val classToSerializer = mapInputClassToCustomSerializer(context.deserializationClassLoader, customSerializers) + addDefaultCustomSerializers(this, classToSerializer) } }.build() } } + /** + * Returns a sorted list of CustomSerializerCheckpointAdaptor based on the custom serializers inside context. + * + * The adaptors are sorted by serializerName which maps to javaClass.name for the serializer class + */ + private fun buildCustomSerializerAdaptors(context: CheckpointSerializationContext) = + context.checkpointCustomSerializers.map { CustomSerializerCheckpointAdaptor(it) }.sortedBy { it.serializerName } + + /** + * Returns a list of pairs where the first element is the input class of the custom serializer and the second element is the + * custom serializer. + */ + private fun mapInputClassToCustomSerializer(classLoader: ClassLoader, customSerializers: Iterable>) = + customSerializers.map { getInputClassForCustomSerializer(classLoader, it) to it } + + /** + * Returns the Class object for the serializers input type. + */ + private fun getInputClassForCustomSerializer(classLoader: ClassLoader, customSerializer: CustomSerializerCheckpointAdaptor<*, *>): Class<*> { + val typeNameWithoutGenerics = customSerializer.cordappType.typeName.substringBefore('<') + return classLoader.loadClass(typeNameWithoutGenerics) + } + + /** + * Emit a warning if two or more custom serializers are found for the same input type. + */ + private fun warnAboutDuplicateSerializers(customSerializers: Iterable>) = + customSerializers + .groupBy({ it.cordappType }, { it.serializerName }) + .filter { (_, serializerNames) -> serializerNames.distinct().size > 1 } + .forEach { (inputType, serializerNames) -> loggerFor().warn("Duplicate custom checkpoint serializer for type $inputType. Serializers: ${serializerNames.joinToString(", ")}") } + + /** + * Register all custom serializers as default, this class + subclass, registrations. + * + * Serializers registered before this will take priority. This needs to run after registrations we want to keep otherwise it may + * replace them. + */ + private fun addDefaultCustomSerializers(kryo: Kryo, classToSerializer: Iterable, CustomSerializerCheckpointAdaptor<*, *>>>) = + classToSerializer + .forEach { (clazz, customSerializer) -> kryo.addDefaultSerializer(clazz, customSerializer) } + private fun CheckpointSerializationContext.kryo(task: Kryo.() -> T): T { return getPool(this).run { kryo -> kryo.context.ensureCapacity(properties.size) diff --git a/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/CustomCheckpointSerializerTest.kt b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/CustomCheckpointSerializerTest.kt new file mode 100644 index 0000000000..0efb030fff --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/CustomCheckpointSerializerTest.kt @@ -0,0 +1,99 @@ +package net.corda.node.customcheckpointserializer + +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.crypto.generateKeyPair +import net.corda.core.serialization.EncodingWhitelist +import net.corda.core.serialization.internal.CheckpointSerializationContext +import net.corda.core.serialization.internal.checkpointDeserialize +import net.corda.core.serialization.internal.checkpointSerialize +import net.corda.coretesting.internal.rigorousMock +import net.corda.serialization.internal.AllWhitelist +import net.corda.serialization.internal.CheckpointSerializationContextImpl +import net.corda.serialization.internal.CordaSerializationEncoding +import net.corda.testing.core.internal.CheckpointSerializationEnvironmentRule +import org.junit.Assert +import org.junit.Rule +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +@RunWith(Parameterized::class) +class CustomCheckpointSerializerTest(private val compression: CordaSerializationEncoding?) { + companion object { + @Parameterized.Parameters(name = "{0}") + @JvmStatic + fun compression() = arrayOf(null) + CordaSerializationEncoding.values() + } + + @get:Rule + val serializationRule = CheckpointSerializationEnvironmentRule(inheritable = true) + private val context: CheckpointSerializationContext = CheckpointSerializationContextImpl( + deserializationClassLoader = javaClass.classLoader, + whitelist = AllWhitelist, + properties = emptyMap(), + objectReferencesEnabled = true, + encoding = compression, + encodingWhitelist = rigorousMock().also { + if (compression != null) doReturn(true).whenever(it).acceptEncoding(compression) + }, + checkpointCustomSerializers = listOf( + TestCorDapp.TestAbstractClassSerializer(), + TestCorDapp.TestClassSerializer(), + TestCorDapp.TestInterfaceSerializer(), + TestCorDapp.TestFinalClassSerializer(), + TestCorDapp.BrokenPublicKeySerializer() + ) + ) + + @Test(timeout=300_000) + fun `test custom checkpoint serialization`() { + testBrokenMapSerialization(DifficultToSerialize.BrokenMapClass()) + } + + @Test(timeout=300_000) + fun `test custom checkpoint serialization using interface`() { + testBrokenMapSerialization(DifficultToSerialize.BrokenMapInterfaceImpl()) + } + + @Test(timeout=300_000) + fun `test custom checkpoint serialization using abstract class`() { + testBrokenMapSerialization(DifficultToSerialize.BrokenMapAbstractImpl()) + } + + @Test(timeout=300_000) + fun `test custom checkpoint serialization using final class`() { + testBrokenMapSerialization(DifficultToSerialize.BrokenMapFinal()) + } + + @Test(timeout=300_000) + fun `test PublicKey serializer has not been overridden`() { + + val publicKey = generateKeyPair().public + + // Serialize/deserialize + val checkpoint = publicKey.checkpointSerialize(context) + val deserializedCheckpoint = checkpoint.checkpointDeserialize(context) + + // Check the elements are as expected + Assert.assertArrayEquals(publicKey.encoded, deserializedCheckpoint.encoded) + } + + + private fun testBrokenMapSerialization(brokenMap : MutableMap): MutableMap { + // Add elements to the map + brokenMap.putAll(mapOf("key" to "value")) + + // Serialize/deserialize + val checkpoint = brokenMap.checkpointSerialize(context) + val deserializedCheckpoint = checkpoint.checkpointDeserialize(context) + + // Check the elements are as expected + Assert.assertEquals(1, deserializedCheckpoint.size) + Assert.assertEquals("value", deserializedCheckpoint.get("key")) + + // Return map for extra checks + return deserializedCheckpoint + } +} + diff --git a/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DifficultToSerialize.kt b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DifficultToSerialize.kt new file mode 100644 index 0000000000..f272e71ebf --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DifficultToSerialize.kt @@ -0,0 +1,27 @@ +package net.corda.node.customcheckpointserializer + +import net.corda.core.flows.FlowException + +class DifficultToSerialize { + + // Broken Map + // This map breaks the rules for the put method. Making the normal map serializer fail. + + open class BrokenMapBaseImpl(delegate: MutableMap = mutableMapOf()) : MutableMap by delegate { + override fun put(key: K, value: V): V? = throw FlowException("Broken on purpose") + } + + // A class to test custom serializers applied to implementations + class BrokenMapClass : BrokenMapBaseImpl() + + // An interface and implementation to test custom serializers applied to interface types + interface BrokenMapInterface : MutableMap + class BrokenMapInterfaceImpl : BrokenMapBaseImpl(), BrokenMapInterface + + // An abstract class and implementation to test custom serializers applied to interface types + abstract class BrokenMapAbstract : BrokenMapBaseImpl(), MutableMap + class BrokenMapAbstractImpl : BrokenMapAbstract() + + // A final class + final class BrokenMapFinal: BrokenMapBaseImpl() +} diff --git a/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DuplicateSerializerLogTest.kt b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DuplicateSerializerLogTest.kt new file mode 100644 index 0000000000..2f87e1005f --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DuplicateSerializerLogTest.kt @@ -0,0 +1,59 @@ +package net.corda.node.customcheckpointserializer + +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.messaging.startFlow +import net.corda.core.serialization.CheckpointCustomSerializer +import net.corda.core.utilities.getOrThrow +import net.corda.node.logging.logFile +import net.corda.testing.driver.driver +import org.assertj.core.api.Assertions +import org.junit.Test +import java.time.Duration + +class DuplicateSerializerLogTest{ + @Test(timeout=300_000) + fun `check duplicate serialisers are logged`() { + driver { + val node = startNode(startInSameProcess = false).getOrThrow() + node.rpc.startFlow(::TestFlow).returnValue.get() + + val text = node.logFile().readLines().filter { it.startsWith("[WARN") } + + // Initial message is correct + Assertions.assertThat(text).anyMatch {it.contains("Duplicate custom checkpoint serializer for type net.corda.node.customcheckpointserializer.DifficultToSerialize\$BrokenMapInterface. Serializers: ")} + // Message mentions TestInterfaceSerializer + Assertions.assertThat(text).anyMatch {it.contains("net.corda.node.customcheckpointserializer.TestCorDapp\$TestInterfaceSerializer")} + // Message mentions DuplicateSerializer + Assertions.assertThat(text).anyMatch {it.contains("net.corda.node.customcheckpointserializer.DuplicateSerializerLogTest\$DuplicateSerializer")} + } + } + + @StartableByRPC + @InitiatingFlow + class TestFlow : FlowLogic>() { + override fun call(): DifficultToSerialize.BrokenMapInterface { + val brokenMap: DifficultToSerialize.BrokenMapInterface = DifficultToSerialize.BrokenMapInterfaceImpl() + brokenMap.putAll(mapOf("test" to "input")) + + sleep(Duration.ofSeconds(0)) + + return brokenMap + } + } + + @Suppress("unused") + class DuplicateSerializer : + CheckpointCustomSerializer, HashMap> { + + override fun toProxy(obj: DifficultToSerialize.BrokenMapInterface): HashMap { + val proxy = HashMap() + return obj.toMap(proxy) + } + override fun fromProxy(proxy: HashMap): DifficultToSerialize.BrokenMapInterface { + return DifficultToSerialize.BrokenMapInterfaceImpl() + .also { it.putAll(proxy) } + } + } +} diff --git a/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DuplicateSerializerLogWithSameSerializerTest.kt b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DuplicateSerializerLogWithSameSerializerTest.kt new file mode 100644 index 0000000000..598b1ed401 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/DuplicateSerializerLogWithSameSerializerTest.kt @@ -0,0 +1,58 @@ +package net.corda.node.customcheckpointserializer + +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.messaging.startFlow +import net.corda.core.serialization.CheckpointCustomSerializer +import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.getOrThrow +import net.corda.node.logging.logFile +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.node.internal.enclosedCordapp +import org.assertj.core.api.Assertions +import org.junit.Test +import java.time.Duration + +class DuplicateSerializerLogWithSameSerializerTest { + @Test(timeout=300_000) + fun `check duplicate serialisers are logged not logged for the same class`() { + + // Duplicate the cordapp in this node + driver(DriverParameters(cordappsForAllNodes = listOf(this.enclosedCordapp(), this.enclosedCordapp()))) { + val node = startNode(startInSameProcess = false).getOrThrow() + node.rpc.startFlow(::TestFlow).returnValue.get() + + val text = node.logFile().readLines().filter { it.startsWith("[WARN") } + + // Initial message is not logged + Assertions.assertThat(text) + .anyMatch { !it.contains("Duplicate custom checkpoint serializer for type ") } + // Log does not mention DuplicateSerializerThatShouldNotBeLogged + Assertions.assertThat(text) + .anyMatch { !it.contains("DuplicateSerializerThatShouldNotBeLogged") } + } + } + + @CordaSerializable + class UnusedClass + + @Suppress("unused") + class DuplicateSerializerThatShouldNotBeLogged : CheckpointCustomSerializer { + override fun toProxy(obj: UnusedClass): String = "" + override fun fromProxy(proxy: String): UnusedClass = UnusedClass() + } + + @StartableByRPC + @InitiatingFlow + class TestFlow : FlowLogic() { + override fun call(): UnusedClass { + val unusedClass = UnusedClass() + + sleep(Duration.ofSeconds(0)) + + return unusedClass + } + } +} diff --git a/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/MockNetworkCustomCheckpointSerializerTest.kt b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/MockNetworkCustomCheckpointSerializerTest.kt new file mode 100644 index 0000000000..5bd60293c4 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/MockNetworkCustomCheckpointSerializerTest.kt @@ -0,0 +1,75 @@ +package net.corda.node.customcheckpointserializer + +import co.paralleluniverse.fibers.Suspendable +import net.corda.testing.node.MockNetwork +import net.corda.testing.node.MockNetworkParameters +import org.assertj.core.api.Assertions +import org.junit.After +import org.junit.Before +import org.junit.Test + +class MockNetworkCustomCheckpointSerializerTest { + private lateinit var mockNetwork: MockNetwork + + @Before + fun setup() { + mockNetwork = MockNetwork(MockNetworkParameters(cordappsForAllNodes = listOf(TestCorDapp.getCorDapp()))) + } + + @After + fun shutdown() { + mockNetwork.stopNodes() + } + + @Test(timeout = 300_000) + fun `flow suspend with custom kryo serializer`() { + val node = mockNetwork.createPartyNode() + val expected = 5 + val actual = node.startFlow(TestCorDapp.TestFlowWithDifficultToSerializeLocalVariable(5)).get() + + Assertions.assertThat(actual).isEqualTo(expected) + } + + @Test(timeout = 300_000) + fun `check references are restored correctly`() { + val node = mockNetwork.createPartyNode() + val expectedReference = DifficultToSerialize.BrokenMapClass() + expectedReference.putAll(mapOf("one" to 1)) + val actualReference = node.startFlow(TestCorDapp.TestFlowCheckingReferencesWork(expectedReference)).get() + + Assertions.assertThat(actualReference).isSameAs(expectedReference) + Assertions.assertThat(actualReference["one"]).isEqualTo(1) + } + + @Test(timeout = 300_000) + @Suspendable + fun `check serialization of interfaces`() { + val node = mockNetwork.createPartyNode() + val result = node.startFlow(TestCorDapp.TestFlowWithDifficultToSerializeLocalVariableAsInterface(5)).get() + Assertions.assertThat(result).isEqualTo(5) + } + + @Test(timeout = 300_000) + @Suspendable + fun `check serialization of abstract classes`() { + val node = mockNetwork.createPartyNode() + val result = node.startFlow(TestCorDapp.TestFlowWithDifficultToSerializeLocalVariableAsAbstract(5)).get() + Assertions.assertThat(result).isEqualTo(5) + } + + @Test(timeout = 300_000) + @Suspendable + fun `check serialization of final classes`() { + val node = mockNetwork.createPartyNode() + val result = node.startFlow(TestCorDapp.TestFlowWithDifficultToSerializeLocalVariableAsFinal(5)).get() + Assertions.assertThat(result).isEqualTo(5) + } + + @Test(timeout = 300_000) + @Suspendable + fun `check PublicKey serializer has not been overridden`() { + val node = mockNetwork.createPartyNode() + val result = node.startFlow(TestCorDapp.TestFlowCheckingPublicKeySerializer()).get() + Assertions.assertThat(result.encoded).isEqualTo(node.info.legalIdentities.first().owningKey.encoded) + } +} diff --git a/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/ReferenceLoopTest.kt b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/ReferenceLoopTest.kt new file mode 100644 index 0000000000..92a8d396c4 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/ReferenceLoopTest.kt @@ -0,0 +1,75 @@ +package net.corda.node.customcheckpointserializer + +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.serialization.CheckpointCustomSerializer +import net.corda.core.serialization.EncodingWhitelist +import net.corda.core.serialization.internal.CheckpointSerializationContext +import net.corda.core.serialization.internal.checkpointDeserialize +import net.corda.core.serialization.internal.checkpointSerialize +import net.corda.coretesting.internal.rigorousMock +import net.corda.serialization.internal.AllWhitelist +import net.corda.serialization.internal.CheckpointSerializationContextImpl +import net.corda.serialization.internal.CordaSerializationEncoding +import net.corda.testing.core.internal.CheckpointSerializationEnvironmentRule +import org.junit.Assert +import org.junit.Rule +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +@RunWith(Parameterized::class) +class ReferenceLoopTest(private val compression: CordaSerializationEncoding?) { + companion object { + @Parameterized.Parameters(name = "{0}") + @JvmStatic + fun compression() = arrayOf(null) + CordaSerializationEncoding.values() + } + + @get:Rule + val serializationRule = CheckpointSerializationEnvironmentRule(inheritable = true) + private val context: CheckpointSerializationContext = CheckpointSerializationContextImpl( + deserializationClassLoader = javaClass.classLoader, + whitelist = AllWhitelist, + properties = emptyMap(), + objectReferencesEnabled = true, + encoding = compression, + encodingWhitelist = rigorousMock() + .also { + if (compression != null) doReturn(true).whenever(it) + .acceptEncoding(compression) + }, + checkpointCustomSerializers = listOf(PersonSerializer())) + + @Test(timeout=300_000) + fun `custom checkpoint serialization with reference loop`() { + val person = Person("Test name") + + val result = person.checkpointSerialize(context).checkpointDeserialize(context) + + Assert.assertEquals("Test name", result.name) + Assert.assertEquals("Test name", result.bestFriend.name) + Assert.assertSame(result, result.bestFriend) + } + + /** + * Test class that will hold a reference to itself + */ + class Person(val name: String, bestFriend: Person? = null) { + val bestFriend: Person = bestFriend ?: this + } + + /** + * Custom serializer for the Person class + */ + @Suppress("unused") + class PersonSerializer : CheckpointCustomSerializer> { + override fun toProxy(obj: Person): Map { + return mapOf("name" to obj.name, "bestFriend" to obj.bestFriend) + } + + override fun fromProxy(proxy: Map): Person { + return Person(proxy["name"] as String, proxy["bestFriend"] as Person?) + } + } +} diff --git a/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/TestCorDapp.kt b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/TestCorDapp.kt new file mode 100644 index 0000000000..1d3e929dde --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/customcheckpointserializer/TestCorDapp.kt @@ -0,0 +1,214 @@ +package net.corda.node.customcheckpointserializer + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowException +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.serialization.CheckpointCustomSerializer +import net.corda.testing.node.internal.CustomCordapp +import net.corda.testing.node.internal.enclosedCordapp +import net.i2p.crypto.eddsa.EdDSAPublicKey +import org.assertj.core.api.Assertions +import java.security.PublicKey +import java.time.Duration + +/** + * Contains all the flows and custom serializers for testing custom checkpoint serializers + */ +class TestCorDapp { + + companion object { + fun getCorDapp(): CustomCordapp = enclosedCordapp() + } + + // Flows + @StartableByRPC + class TestFlowWithDifficultToSerializeLocalVariableAsAbstract(private val purchase: Int) : FlowLogic() { + @Suspendable + override fun call(): Int { + + // This object is difficult to serialize with Kryo + val difficultToSerialize: DifficultToSerialize.BrokenMapAbstract = DifficultToSerialize.BrokenMapAbstractImpl() + difficultToSerialize.putAll(mapOf("foo" to purchase)) + + // Force a checkpoint + sleep(Duration.ofSeconds(0)) + + // Return value from deserialized object + return difficultToSerialize["foo"] ?: 0 + } + } + + @StartableByRPC + class TestFlowWithDifficultToSerializeLocalVariableAsFinal(private val purchase: Int) : FlowLogic() { + @Suspendable + override fun call(): Int { + + // This object is difficult to serialize with Kryo + val difficultToSerialize: DifficultToSerialize.BrokenMapFinal = DifficultToSerialize.BrokenMapFinal() + difficultToSerialize.putAll(mapOf("foo" to purchase)) + + // Force a checkpoint + sleep(Duration.ofSeconds(0)) + + // Return value from deserialized object + return difficultToSerialize["foo"] ?: 0 + } + } + + @StartableByRPC + class TestFlowWithDifficultToSerializeLocalVariableAsInterface(private val purchase: Int) : FlowLogic() { + @Suspendable + override fun call(): Int { + + // This object is difficult to serialize with Kryo + val difficultToSerialize: DifficultToSerialize.BrokenMapInterface = DifficultToSerialize.BrokenMapInterfaceImpl() + difficultToSerialize.putAll(mapOf("foo" to purchase)) + + // Force a checkpoint + sleep(Duration.ofSeconds(0)) + + // Return value from deserialized object + return difficultToSerialize["foo"] ?: 0 + } + } + + @StartableByRPC + class TestFlowWithDifficultToSerializeLocalVariable(private val purchase: Int) : FlowLogic() { + @Suspendable + override fun call(): Int { + + // This object is difficult to serialize with Kryo + val difficultToSerialize: DifficultToSerialize.BrokenMapClass = DifficultToSerialize.BrokenMapClass() + difficultToSerialize.putAll(mapOf("foo" to purchase)) + + // Force a checkpoint + sleep(Duration.ofSeconds(0)) + + // Return value from deserialized object + return difficultToSerialize["foo"] ?: 0 + } + } + + @StartableByRPC + class TestFlowCheckingReferencesWork(private val reference: DifficultToSerialize.BrokenMapClass) : + FlowLogic>() { + + private val referenceField = reference + @Suspendable + override fun call(): DifficultToSerialize.BrokenMapClass { + + val ref = referenceField + + // Force a checkpoint + sleep(Duration.ofSeconds(0)) + + // Check all objects refer to same object + Assertions.assertThat(reference).isSameAs(referenceField) + Assertions.assertThat(referenceField).isSameAs(ref) + + // Return deserialized object + return ref + } + } + + + @StartableByRPC + class TestFlowCheckingPublicKeySerializer : + FlowLogic() { + + @Suspendable + override fun call(): PublicKey { + val ref = ourIdentity.owningKey + + // Force a checkpoint + sleep(Duration.ofSeconds(0)) + + // Return deserialized object + return ref + } + } + + // Custom serializers + + @Suppress("unused") + class TestInterfaceSerializer : + CheckpointCustomSerializer, HashMap> { + + override fun toProxy(obj: DifficultToSerialize.BrokenMapInterface): HashMap { + val proxy = HashMap() + return obj.toMap(proxy) + } + override fun fromProxy(proxy: HashMap): DifficultToSerialize.BrokenMapInterface { + return DifficultToSerialize.BrokenMapInterfaceImpl() + .also { it.putAll(proxy) } + } + } + + @Suppress("unused") + class TestClassSerializer : + CheckpointCustomSerializer, HashMap> { + + override fun toProxy(obj: DifficultToSerialize.BrokenMapClass): HashMap { + val proxy = HashMap() + return obj.toMap(proxy) + } + override fun fromProxy(proxy: HashMap): DifficultToSerialize.BrokenMapClass { + return DifficultToSerialize.BrokenMapClass() + .also { it.putAll(proxy) } + } + } + + @Suppress("unused") + class TestAbstractClassSerializer : + CheckpointCustomSerializer, HashMap> { + + override fun toProxy(obj: DifficultToSerialize.BrokenMapAbstract): HashMap { + val proxy = HashMap() + return obj.toMap(proxy) + } + override fun fromProxy(proxy: HashMap): DifficultToSerialize.BrokenMapAbstract { + return DifficultToSerialize.BrokenMapAbstractImpl() + .also { it.putAll(proxy) } + } + } + + @Suppress("unused") + class TestFinalClassSerializer : + CheckpointCustomSerializer, HashMap> { + + override fun toProxy(obj: DifficultToSerialize.BrokenMapFinal): HashMap { + val proxy = HashMap() + return obj.toMap(proxy) + } + override fun fromProxy(proxy: HashMap): DifficultToSerialize.BrokenMapFinal { + return DifficultToSerialize.BrokenMapFinal() + .also { it.putAll(proxy) } + } + } + + @Suppress("unused") + class BrokenPublicKeySerializer : + CheckpointCustomSerializer { + override fun toProxy(obj: PublicKey): String { + throw FlowException("Broken on purpose") + } + + override fun fromProxy(proxy: String): PublicKey { + throw FlowException("Broken on purpose") + } + } + + @Suppress("unused") + class BrokenEdDSAPublicKeySerializer : + CheckpointCustomSerializer { + override fun toProxy(obj: EdDSAPublicKey): String { + throw FlowException("Broken on purpose") + } + + override fun fromProxy(proxy: String): EdDSAPublicKey { + throw FlowException("Broken on purpose") + } + } + +} diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 77d95abacd..0c2552c37b 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -644,8 +644,8 @@ open class Node(configuration: NodeConfiguration, storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader), checkpointSerializer = KryoCheckpointSerializer, - checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader) - ) + checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader).withCheckpointCustomSerializers(cordappLoader.cordapps.flatMap { it.checkpointCustomSerializers }) + ) } /** Starts a blocking event loop for message dispatch. */ diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt index 97a5672846..bb2fce1a58 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt @@ -18,6 +18,7 @@ import net.corda.core.internal.notary.NotaryService import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.node.services.CordaService import net.corda.core.schemas.MappedSchema +import net.corda.core.serialization.CheckpointCustomSerializer import net.corda.core.serialization.SerializationCustomSerializer import net.corda.core.serialization.SerializationWhitelist import net.corda.core.serialization.SerializeAsToken @@ -185,6 +186,7 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths: findServices(this), findWhitelists(url), findSerializers(this), + findCheckpointSerializers(this), findCustomSchemas(this), findAllFlows(this), url.url, @@ -334,6 +336,10 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths: return scanResult.getClassesImplementingWithClassVersionCheck(SerializationCustomSerializer::class) } + private fun findCheckpointSerializers(scanResult: RestrictedScanResult): List> { + return scanResult.getClassesImplementingWithClassVersionCheck(CheckpointCustomSerializer::class) + } + private fun findCustomSchemas(scanResult: RestrictedScanResult): Set { return scanResult.getClassesWithSuperclass(MappedSchema::class).instances().toSet() } diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt index 3f9e3b85f9..5ad5add351 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt @@ -32,6 +32,7 @@ internal object VirtualCordapp { services = listOf(), serializationWhitelists = listOf(), serializationCustomSerializers = listOf(), + checkpointCustomSerializers = listOf(), customSchemas = setOf(), info = Cordapp.Info.Default("corda-core", versionInfo.vendor, versionInfo.releaseVersion, "Open Source (Apache 2)"), allFlows = listOf(), @@ -55,6 +56,7 @@ internal object VirtualCordapp { services = listOf(), serializationWhitelists = listOf(), serializationCustomSerializers = listOf(), + checkpointCustomSerializers = listOf(), customSchemas = setOf(NodeNotarySchemaV1), info = Cordapp.Info.Default("corda-notary", versionInfo.vendor, versionInfo.releaseVersion, "Open Source (Apache 2)"), allFlows = listOf(), @@ -78,6 +80,7 @@ internal object VirtualCordapp { services = listOf(), serializationWhitelists = listOf(), serializationCustomSerializers = listOf(), + checkpointCustomSerializers = listOf(), customSchemas = setOf(RaftNotarySchemaV1), info = Cordapp.Info.Default("corda-notary-raft", versionInfo.vendor, versionInfo.releaseVersion, "Open Source (Apache 2)"), allFlows = listOf(), @@ -101,6 +104,7 @@ internal object VirtualCordapp { services = listOf(), serializationWhitelists = listOf(), serializationCustomSerializers = listOf(), + checkpointCustomSerializers = listOf(), customSchemas = setOf(BFTSmartNotarySchemaV1), info = Cordapp.Info.Default("corda-notary-bft-smart", versionInfo.vendor, versionInfo.releaseVersion, "Open Source (Apache 2)"), allFlows = listOf(), diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/CheckpointSerializationScheme.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/CheckpointSerializationScheme.kt index b6c43ddc6d..f037e2dfbb 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/CheckpointSerializationScheme.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/CheckpointSerializationScheme.kt @@ -1,6 +1,7 @@ package net.corda.serialization.internal import net.corda.core.KeepForDJVM +import net.corda.core.serialization.CheckpointCustomSerializer import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.EncodingWhitelist import net.corda.core.serialization.SerializationEncoding @@ -13,7 +14,8 @@ data class CheckpointSerializationContextImpl @JvmOverloads constructor( override val properties: Map, override val objectReferencesEnabled: Boolean, override val encoding: SerializationEncoding?, - override val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist) : CheckpointSerializationContext { + override val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist, + override val checkpointCustomSerializers: Iterable> = emptyList()) : CheckpointSerializationContext { override fun withProperty(property: Any, value: Any): CheckpointSerializationContext { return copy(properties = properties + (property to value)) } @@ -34,4 +36,6 @@ data class CheckpointSerializationContextImpl @JvmOverloads constructor( override fun withEncoding(encoding: SerializationEncoding?) = copy(encoding = encoding) override fun withEncodingWhitelist(encodingWhitelist: EncodingWhitelist) = copy(encodingWhitelist = encodingWhitelist) + override fun withCheckpointCustomSerializers(checkpointCustomSerializers : Iterable>) + = copy(checkpointCustomSerializers = checkpointCustomSerializers) } \ No newline at end of file diff --git a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/InternalSerializationTestHelpers.kt b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/InternalSerializationTestHelpers.kt index 61bf91aac9..116016b991 100644 --- a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/InternalSerializationTestHelpers.kt +++ b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/InternalSerializationTestHelpers.kt @@ -2,6 +2,7 @@ package net.corda.coretesting.internal import net.corda.nodeapi.internal.rpc.client.AMQPClientSerializationScheme import net.corda.core.internal.createInstancesOfClassesImplementing +import net.corda.core.serialization.CheckpointCustomSerializer import net.corda.core.serialization.SerializationCustomSerializer import net.corda.core.serialization.SerializationWhitelist import net.corda.core.serialization.internal.SerializationEnvironment @@ -25,8 +26,11 @@ fun createTestSerializationEnv(): SerializationEnvironment { } fun createTestSerializationEnv(classLoader: ClassLoader?): SerializationEnvironment { + var customCheckpointSerializers: Set> = emptySet() val (clientSerializationScheme, serverSerializationScheme) = if (classLoader != null) { val customSerializers = createInstancesOfClassesImplementing(classLoader, SerializationCustomSerializer::class.java) + customCheckpointSerializers = createInstancesOfClassesImplementing(classLoader, CheckpointCustomSerializer::class.java) + val serializationWhitelists = ServiceLoader.load(SerializationWhitelist::class.java, classLoader).toSet() Pair(AMQPClientSerializationScheme(customSerializers, serializationWhitelists), @@ -44,7 +48,7 @@ fun createTestSerializationEnv(classLoader: ClassLoader?): SerializationEnvironm AMQP_RPC_SERVER_CONTEXT, AMQP_RPC_CLIENT_CONTEXT, AMQP_STORAGE_CONTEXT, - KRYO_CHECKPOINT_CONTEXT, + KRYO_CHECKPOINT_CONTEXT.withCheckpointCustomSerializers(customCheckpointSerializers), KryoCheckpointSerializer ) } From 0842ea26b717508b8078c8f08142f7150f5a598f Mon Sep 17 00:00:00 2001 From: Waldemar Zurowski Date: Wed, 22 Jul 2020 19:43:08 +0100 Subject: [PATCH 11/14] NOTICK: Backport Jenkins configuration for nightly builds to release 4.4 --- .ci/dev/publish-branch/Jenkinsfile.nightly | 37 +++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/.ci/dev/publish-branch/Jenkinsfile.nightly b/.ci/dev/publish-branch/Jenkinsfile.nightly index b25549729e..485124ab66 100644 --- a/.ci/dev/publish-branch/Jenkinsfile.nightly +++ b/.ci/dev/publish-branch/Jenkinsfile.nightly @@ -1,11 +1,27 @@ #!groovy +/** + * Jenkins pipeline to build Corda OS nightly snapshots + */ + +/** + * Kill already started job. + * Assume new commit takes precendence and results from previous + * unfinished builds are not required. + * This feature doesn't play well with disableConcurrentBuilds() option + */ @Library('corda-shared-build-pipeline-steps') import static com.r3.build.BuildControl.killAllExistingBuildsForJob killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger()) +/* +** calculate the stage for NexusIQ evaluation +** * build for snapshots +*/ +def nexusIqStage = "build" + pipeline { - agent { label 'k8s' } + agent { label 'standard' } options { timestamps() @@ -28,6 +44,25 @@ pipeline { } stages { + stage('Sonatype Check') { + steps { + sh "./gradlew --no-daemon clean jar" + script { + sh "./gradlew --no-daemon properties | grep -E '^(version|group):' >version-properties" + def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: //'").trim() + def groupId = sh (returnStdout: true, script: "grep ^group: version-properties | sed -e 's/^group: //'").trim() + def artifactId = 'corda' + nexusAppId = "jenkins-${groupId}-${artifactId}-${version}" + } + nexusPolicyEvaluation ( + failBuildOnNetworkError: false, + iqApplication: manualApplication(nexusAppId), + iqScanPatterns: [[scanPattern: 'node/capsule/build/libs/corda*.jar']], + iqStage: nexusIqStage + ) + } + } + stage('Publish to Artifactory') { steps { rtServer ( From f60f06a85fc81febb65b8487a9bcd78df31fbbf5 Mon Sep 17 00:00:00 2001 From: Waldemar Zurowski Date: Wed, 22 Jul 2020 19:54:41 +0100 Subject: [PATCH 12/14] INFRA-508: Change appID in sonatype stage for Corda and Corda Enterprise --- .ci/dev/publish-branch/Jenkinsfile.nightly | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.ci/dev/publish-branch/Jenkinsfile.nightly b/.ci/dev/publish-branch/Jenkinsfile.nightly index 485124ab66..471825bd2d 100644 --- a/.ci/dev/publish-branch/Jenkinsfile.nightly +++ b/.ci/dev/publish-branch/Jenkinsfile.nightly @@ -49,14 +49,15 @@ pipeline { sh "./gradlew --no-daemon clean jar" script { sh "./gradlew --no-daemon properties | grep -E '^(version|group):' >version-properties" - def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: //'").trim() + /* every build related to Corda X.Y (GA, RC, HC, patch or snapshot) uses the same NexusIQ application */ + def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: \\([0-9]\\+\\.[0-9]\\+\\).*\$/\\1/'").trim() def groupId = sh (returnStdout: true, script: "grep ^group: version-properties | sed -e 's/^group: //'").trim() def artifactId = 'corda' nexusAppId = "jenkins-${groupId}-${artifactId}-${version}" } nexusPolicyEvaluation ( failBuildOnNetworkError: false, - iqApplication: manualApplication(nexusAppId), + iqApplication: selectedApplication(nexusAppId), // application *has* to exist before a build starts! iqScanPatterns: [[scanPattern: 'node/capsule/build/libs/corda*.jar']], iqStage: nexusIqStage ) From 14279a30cc7b720031a2378124ac3f9800177214 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Waldemar=20=C5=BBurowski?= <45210402+wzur-r3@users.noreply.github.com> Date: Thu, 23 Jul 2020 10:36:48 +0100 Subject: [PATCH 13/14] INFRA-508: NexusIQ stage for GA releases updated to `release` (#6508) --- .ci/dev/regression/Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci/dev/regression/Jenkinsfile b/.ci/dev/regression/Jenkinsfile index 4678bbda6c..b079bab2e1 100644 --- a/.ci/dev/regression/Jenkinsfile +++ b/.ci/dev/regression/Jenkinsfile @@ -30,7 +30,7 @@ if (isReleaseTag) { switch (env.TAG_NAME) { case ~/.*-RC\d+(-.*)?/: nexusIqStage = "stage-release"; break; case ~/.*-HC\d+(-.*)?/: nexusIqStage = "stage-release"; break; - default: nexusIqStage = "operate" + default: nexusIqStage = "release" } } From 2e6bd97fe951c360bb574b7ca305bd71f818997b Mon Sep 17 00:00:00 2001 From: Waldemar Zurowski Date: Thu, 23 Jul 2020 10:46:32 +0100 Subject: [PATCH 14/14] Updates * INFRA-508: NexusIQ stage for GA releases updated to `release` * do not truncate stdio when collecting JUnit tests --- .ci/dev/compatibility/JenkinsfileJDK11Azul | 4 ++-- .ci/dev/regression/Jenkinsfile | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.ci/dev/compatibility/JenkinsfileJDK11Azul b/.ci/dev/compatibility/JenkinsfileJDK11Azul index b692a4d736..af53b9fa84 100644 --- a/.ci/dev/compatibility/JenkinsfileJDK11Azul +++ b/.ci/dev/compatibility/JenkinsfileJDK11Azul @@ -30,7 +30,7 @@ if (isReleaseTag) { switch (env.TAG_NAME) { case ~/.*-RC\d+(-.*)?/: nexusIqStage = "stage-release"; break; case ~/.*-HC\d+(-.*)?/: nexusIqStage = "stage-release"; break; - default: nexusIqStage = "operate" + default: nexusIqStage = "release" } } @@ -165,7 +165,7 @@ pipeline { post { always { archiveArtifacts artifacts: '**/pod-logs/**/*.log', fingerprint: false - junit testResults: '**/build/test-results-xml/**/*.xml' + junit testResults: '**/build/test-results-xml/**/*.xml', keepLongStdio: true } cleanup { deleteDir() /* clean up our workspace */ diff --git a/.ci/dev/regression/Jenkinsfile b/.ci/dev/regression/Jenkinsfile index 7ba2ef2d6a..b9a952c1fb 100644 --- a/.ci/dev/regression/Jenkinsfile +++ b/.ci/dev/regression/Jenkinsfile @@ -174,7 +174,6 @@ pipeline { } } - post { always { archiveArtifacts artifacts: '**/pod-logs/**/*.log', fingerprint: false