From 19dad6da96b01d3e5a1c036c148cab17f9890bb2 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 16 Apr 2018 16:20:10 +0100 Subject: [PATCH] Add back deprecated functions --- .ci/api-current.txt | 9 +- .../kotlin/net/corda/core/flows/FlowLogic.kt | 104 ++++++++++++++++++ docs/source/api-flows.rst | 62 +++++++++++ .../java/net/corda/docs/FlowCookbookJava.java | 7 ++ .../kotlin/net/corda/docs/FlowCookbook.kt | 7 ++ .../net/corda/docs/LaunchSpaceshipFlow.kt | 99 +++++++++++++++++ .../transitions/TopLevelTransition.kt | 37 +++---- 7 files changed, 297 insertions(+), 28 deletions(-) create mode 100644 docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt diff --git a/.ci/api-current.txt b/.ci/api-current.txt index fac366c8fa..3e83150ab4 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -1200,7 +1200,7 @@ public static final class net.corda.core.flows.FinalityFlow$Companion extends ja @org.jetbrains.annotations.NotNull public net.corda.core.utilities.ProgressTracker childProgressTracker() public static final net.corda.core.flows.FinalityFlow$Companion$NOTARISING INSTANCE ## -@net.corda.core.serialization.CordaSerializable public class net.corda.core.flows.FlowException extends net.corda.core.CordaException +@net.corda.core.serialization.CordaSerializable public class net.corda.core.flows.FlowException extends net.corda.core.CordaException implements net.corda.core.flows.IdentifiableException public () public (String) public (String, Throwable) @@ -1589,9 +1589,10 @@ public final class net.corda.core.flows.TransactionParts extends java.lang.Objec public int hashCode() public String toString() ## -@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.UnexpectedFlowEndException extends net.corda.core.CordaRuntimeException - public (String) - public (String, Throwable) +@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.UnexpectedFlowEndException extends net.corda.core.CordaRuntimeException implements net.corda.core.flows.IdentifiableException + public (String, Throwable, long) + @org.jetbrains.annotations.NotNull public Long getErrorId() + public final long getOriginalErrorId() ## @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.identity.AbstractParty extends java.lang.Object public (java.security.PublicKey) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 5050b6253a..9eeddc32d8 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -134,6 +134,110 @@ abstract class FlowLogic { val ourIdentity: Party get() = stateMachine.ourIdentity + // Used to implement the deprecated send/receive functions using Party. When such a deprecated function is used we + // create a fresh session for the Party, put it here and use it in subsequent deprecated calls. + private val deprecatedPartySessionMap = HashMap() + private fun getDeprecatedSessionForParty(party: Party): FlowSession { + return deprecatedPartySessionMap.getOrPut(party) { initiateFlow(party) } + } + /** + * Returns a [FlowInfo] object describing the flow [otherParty] is using. With [FlowInfo.flowVersion] it + * provides the necessary information needed for the evolution of flows and enabling backwards compatibility. + * + * This method can be called before any send or receive has been done with [otherParty]. In such a case this will force + * them to start their flow. + */ + @Deprecated("Use FlowSession.getCounterpartyFlowInfo()", level = DeprecationLevel.WARNING) + @Suspendable + fun getFlowInfo(otherParty: Party): FlowInfo = getDeprecatedSessionForParty(otherParty).getCounterpartyFlowInfo() + + /** + * Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response + * is received, which must be of the given [R] type. + * + * Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly + * verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly + * corrupted data in order to exploit your code. + * + * Note that this function is not just a simple send+receive pair: it is more efficient and more correct to + * use this when you expect to do a message swap than do use [send] and then [receive] in turn. + * + * @return an [UntrustworthyData] wrapper around the received object. + */ + @Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING) + inline fun sendAndReceive(otherParty: Party, payload: Any): UntrustworthyData { + return sendAndReceive(R::class.java, otherParty, payload) + } + + /** + * Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response + * is received, which must be of the given [receiveType]. Remember that when receiving data from other parties the data + * should not be trusted until it's been thoroughly verified for consistency and that all expectations are + * satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code. + * + * Note that this function is not just a simple send+receive pair: it is more efficient and more correct to + * use this when you expect to do a message swap than do use [send] and then [receive] in turn. + * + * @return an [UntrustworthyData] wrapper around the received object. + */ + @Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING) + @Suspendable + open fun sendAndReceive(receiveType: Class, otherParty: Party, payload: Any): UntrustworthyData { + return getDeprecatedSessionForParty(otherParty).sendAndReceive(receiveType, payload) + } + + /** + * Similar to [sendAndReceive] but also instructs the `payload` to be redelivered until the expected message is received. + * + * Note that this method should NOT be used for regular party-to-party communication, use [sendAndReceive] instead. + * It is only intended for the case where the [otherParty] is running a distributed service with an idempotent + * flow which only accepts a single request and sends back a single response – e.g. a notary or certain types of + * oracle services. If one or more nodes in the service cluster go down mid-session, the message will be redelivered + * to a different one, so there is no need to wait until the initial node comes back up to obtain a response. + */ + @Deprecated("Use FlowSession.sendAndReceiveWithRetry()", level = DeprecationLevel.WARNING) + internal inline fun sendAndReceiveWithRetry(otherParty: Party, payload: Any): UntrustworthyData { + return getDeprecatedSessionForParty(otherParty).sendAndReceiveWithRetry(payload) + } + + /** + * Suspends until the specified [otherParty] sends us a message of type [R]. + * + * Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly + * verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly + * corrupted data in order to exploit your code. + */ + @Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING) + inline fun receive(otherParty: Party): UntrustworthyData = receive(R::class.java, otherParty) + + /** + * Suspends until the specified [otherParty] sends us a message of type [receiveType]. + * + * Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly + * verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly + * corrupted data in order to exploit your code. + * + * @return an [UntrustworthyData] wrapper around the received object. + */ + @Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING) + @Suspendable + open fun receive(receiveType: Class, otherParty: Party): UntrustworthyData { + return getDeprecatedSessionForParty(otherParty).receive(receiveType) + } + + /** + * Queues the given [payload] for sending to the [otherParty] and continues without suspending. + * + * Note that the other party may receive the message at some arbitrary later point or not at all: if [otherParty] + * is offline then message delivery will be retried until it comes back or until the message is older than the + * network's event horizon time. + */ + @Deprecated("Use FlowSession.send()", level = DeprecationLevel.WARNING) + @Suspendable + open fun send(otherParty: Party, payload: Any) { + getDeprecatedSessionForParty(otherParty).send(payload) + } + @Suspendable internal fun FlowSession.sendAndReceiveWithRetry(receiveType: Class, payload: Any): UntrustworthyData { val request = FlowIORequest.SendAndReceive( diff --git a/docs/source/api-flows.rst b/docs/source/api-flows.rst index 6e80c7ae72..8f00b6bc0c 100644 --- a/docs/source/api-flows.rst +++ b/docs/source/api-flows.rst @@ -416,6 +416,68 @@ Our side of the flow must mirror these calls. We could do this as follows: :end-before: DOCEND 08 :dedent: 12 +Why sessions? +^^^^^^^^^^^^^ + +Before ``FlowSession`` s were introduced the send/receive API looked a bit different. They were functions on +``FlowLogic`` and took the address ``Party`` as argument. The platform internally maintained a mapping from ``Party`` to +session, hiding sessions from the user completely. + +Although this is a convenient API it introduces subtle issues where a message that was originally meant for a specific +session may end up in another. + +Consider the following contrived example using the old ``Party`` based API: + +.. container:: codeset + + .. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt + :language: kotlin + :start-after: DOCSTART LaunchSpaceshipFlow + :end-before: DOCEND LaunchSpaceshipFlow + +The intention of the flows is very clear: LaunchSpaceshipFlow asks the president whether a spaceship should be launched. +It is expecting a boolean reply. The president in return first tells the secretary that they need coffee, which is also +communicated with a boolean. Afterwards the president replies to the launcher that they don't want to launch. + +However the above can go horribly wrong when the ``launcher`` happens to be the same party ``getSecretary`` returns. In +this case the boolean meant for the secretary will be received by the launcher! + +This indicates that ``Party`` is not a good identifier for the communication sequence, and indeed the ``Party`` based +API may introduce ways for an attacker to fish for information and even trigger unintended control flow like in the +above case. + +Hence we introduced ``FlowSession``, which identifies the communication sequence. With ``FlowSession`` s the above set +of flows would look like this: + +.. container:: codeset + + .. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt + :language: kotlin + :start-after: DOCSTART LaunchSpaceshipFlowCorrect + :end-before: DOCEND LaunchSpaceshipFlowCorrect + +Note how the president is now explicit about which session it wants to send to. + +Porting from the old Party-based API +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In the old API the first ``send`` or ``receive`` to a ``Party`` was the one kicking off the counter-flow. This is now +explicit in the ``initiateFlow`` function call. To port existing code: + +.. container:: codeset + + .. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt + :language: kotlin + :start-after: DOCSTART FlowSession porting + :end-before: DOCEND FlowSession porting + :dedent: 8 + + .. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java + :language: java + :start-after: DOCSTART FlowSession porting + :end-before: DOCEND FlowSession porting + :dedent: 12 + Subflows -------- Subflows are pieces of reusable flows that may be run by calling ``FlowLogic.subFlow``. There are two broad categories diff --git a/docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java b/docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java index d17615841d..bf2c58b858 100644 --- a/docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java +++ b/docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java @@ -582,6 +582,13 @@ public class FlowCookbookJava { SignedTransaction notarisedTx2 = subFlow(new FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker())); // DOCEND 10 + // DOCSTART FlowSession porting + send(regulator, new Object()); // Old API + // becomes + FlowSession session = initiateFlow(regulator); + session.send(new Object()); + // DOCEND FlowSession porting + return null; } } diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt index 880570e2df..0528caeaf3 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt @@ -572,6 +572,13 @@ class InitiatorFlow(val arg1: Boolean, val arg2: Int, private val counterparty: val additionalParties: Set = setOf(regulator) val notarisedTx2: SignedTransaction = subFlow(FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker())) // DOCEND 10 + + // DOCSTART FlowSession porting + send(regulator, Any()) // Old API + // becomes + val session = initiateFlow(regulator) + session.send(Any()) + // DOCEND FlowSession porting } } diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt new file mode 100644 index 0000000000..e6826fa213 --- /dev/null +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt @@ -0,0 +1,99 @@ +package net.corda.docs + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.identity.Party +import net.corda.core.utilities.unwrap + +// DOCSTART LaunchSpaceshipFlow +@InitiatingFlow +class LaunchSpaceshipFlow : FlowLogic() { + @Suspendable + override fun call() { + val shouldLaunchSpaceship = receive(getPresident()).unwrap { it } + if (shouldLaunchSpaceship) { + launchSpaceship() + } + } + + fun launchSpaceship() { + } + + fun getPresident(): Party { + TODO() + } +} + +@InitiatedBy(LaunchSpaceshipFlow::class) +@InitiatingFlow +class PresidentSpaceshipFlow(val launcher: Party) : FlowLogic() { + @Suspendable + override fun call() { + val needCoffee = true + send(getSecretary(), needCoffee) + val shouldLaunchSpaceship = false + send(launcher, shouldLaunchSpaceship) + } + + fun getSecretary(): Party { + TODO() + } +} + +@InitiatedBy(PresidentSpaceshipFlow::class) +class SecretaryFlow(val president: Party) : FlowLogic() { + @Suspendable + override fun call() { + // ignore + } +} +// DOCEND LaunchSpaceshipFlow + +// DOCSTART LaunchSpaceshipFlowCorrect +@InitiatingFlow +class LaunchSpaceshipFlowCorrect : FlowLogic() { + @Suspendable + override fun call() { + val presidentSession = initiateFlow(getPresident()) + val shouldLaunchSpaceship = presidentSession.receive().unwrap { it } + if (shouldLaunchSpaceship) { + launchSpaceship() + } + } + + fun launchSpaceship() { + } + + fun getPresident(): Party { + TODO() + } +} + +@InitiatedBy(LaunchSpaceshipFlowCorrect::class) +@InitiatingFlow +class PresidentSpaceshipFlowCorrect(val launcherSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val needCoffee = true + val secretarySession = initiateFlow(getSecretary()) + secretarySession.send(needCoffee) + val shouldLaunchSpaceship = false + launcherSession.send(shouldLaunchSpaceship) + } + + fun getSecretary(): Party { + TODO() + } +} + +@InitiatedBy(PresidentSpaceshipFlowCorrect::class) +class SecretaryFlowCorrect(val presidentSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + // ignore + } +} +// DOCEND LaunchSpaceshipFlowCorrect diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 684c74b1ab..5260942c36 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -130,30 +130,19 @@ class TopLevelTransition( flowState = FlowState.Started(event.ioRequest, event.fiber), numberOfSuspends = currentState.checkpoint.numberOfSuspends + 1 ) - if (event.maySkipCheckpoint) { - actions.addAll(arrayOf( - Action.CommitTransaction, - Action.ScheduleEvent(Event.DoRemainingWork) - )) - currentState = currentState.copy( - checkpoint = newCheckpoint, - isFlowResumed = false - ) - } else { - actions.addAll(arrayOf( - Action.PersistCheckpoint(context.id, newCheckpoint), - Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers), - Action.CommitTransaction, - Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers), - Action.ScheduleEvent(Event.DoRemainingWork) - )) - currentState = currentState.copy( - checkpoint = newCheckpoint, - pendingDeduplicationHandlers = emptyList(), - isFlowResumed = false, - isAnyCheckpointPersisted = true - ) - } + actions.addAll(arrayOf( + Action.PersistCheckpoint(context.id, newCheckpoint), + Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers), + Action.CommitTransaction, + Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers), + Action.ScheduleEvent(Event.DoRemainingWork) + )) + currentState = currentState.copy( + checkpoint = newCheckpoint, + pendingDeduplicationHandlers = emptyList(), + isFlowResumed = false, + isAnyCheckpointPersisted = true + ) FlowContinuation.ProcessEvents } }