Add back deprecated functions

This commit is contained in:
Andras Slemmer 2018-04-16 16:20:10 +01:00
parent ce5fb66260
commit 19dad6da96
7 changed files with 297 additions and 28 deletions

View File

@ -1200,7 +1200,7 @@ public static final class net.corda.core.flows.FinalityFlow$Companion extends ja
@org.jetbrains.annotations.NotNull public net.corda.core.utilities.ProgressTracker childProgressTracker()
public static final net.corda.core.flows.FinalityFlow$Companion$NOTARISING INSTANCE
##
@net.corda.core.serialization.CordaSerializable public class net.corda.core.flows.FlowException extends net.corda.core.CordaException
@net.corda.core.serialization.CordaSerializable public class net.corda.core.flows.FlowException extends net.corda.core.CordaException implements net.corda.core.flows.IdentifiableException
public <init>()
public <init>(String)
public <init>(String, Throwable)
@ -1589,9 +1589,10 @@ public final class net.corda.core.flows.TransactionParts extends java.lang.Objec
public int hashCode()
public String toString()
##
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.UnexpectedFlowEndException extends net.corda.core.CordaRuntimeException
public <init>(String)
public <init>(String, Throwable)
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.UnexpectedFlowEndException extends net.corda.core.CordaRuntimeException implements net.corda.core.flows.IdentifiableException
public <init>(String, Throwable, long)
@org.jetbrains.annotations.NotNull public Long getErrorId()
public final long getOriginalErrorId()
##
@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.identity.AbstractParty extends java.lang.Object
public <init>(java.security.PublicKey)

View File

@ -134,6 +134,110 @@ abstract class FlowLogic<out T> {
val ourIdentity: Party get() = stateMachine.ourIdentity
// Used to implement the deprecated send/receive functions using Party. When such a deprecated function is used we
// create a fresh session for the Party, put it here and use it in subsequent deprecated calls.
private val deprecatedPartySessionMap = HashMap<Party, FlowSession>()
private fun getDeprecatedSessionForParty(party: Party): FlowSession {
return deprecatedPartySessionMap.getOrPut(party) { initiateFlow(party) }
}
/**
* Returns a [FlowInfo] object describing the flow [otherParty] is using. With [FlowInfo.flowVersion] it
* provides the necessary information needed for the evolution of flows and enabling backwards compatibility.
*
* This method can be called before any send or receive has been done with [otherParty]. In such a case this will force
* them to start their flow.
*/
@Deprecated("Use FlowSession.getCounterpartyFlowInfo()", level = DeprecationLevel.WARNING)
@Suspendable
fun getFlowInfo(otherParty: Party): FlowInfo = getDeprecatedSessionForParty(otherParty).getCounterpartyFlowInfo()
/**
* Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
* is received, which must be of the given [R] type.
*
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
* use this when you expect to do a message swap than do use [send] and then [receive] in turn.
*
* @return an [UntrustworthyData] wrapper around the received object.
*/
@Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING)
inline fun <reified R : Any> sendAndReceive(otherParty: Party, payload: Any): UntrustworthyData<R> {
return sendAndReceive(R::class.java, otherParty, payload)
}
/**
* Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
* is received, which must be of the given [receiveType]. Remember that when receiving data from other parties the data
* should not be trusted until it's been thoroughly verified for consistency and that all expectations are
* satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
*
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
* use this when you expect to do a message swap than do use [send] and then [receive] in turn.
*
* @return an [UntrustworthyData] wrapper around the received object.
*/
@Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING)
@Suspendable
open fun <R : Any> sendAndReceive(receiveType: Class<R>, otherParty: Party, payload: Any): UntrustworthyData<R> {
return getDeprecatedSessionForParty(otherParty).sendAndReceive(receiveType, payload)
}
/**
* Similar to [sendAndReceive] but also instructs the `payload` to be redelivered until the expected message is received.
*
* Note that this method should NOT be used for regular party-to-party communication, use [sendAndReceive] instead.
* It is only intended for the case where the [otherParty] is running a distributed service with an idempotent
* flow which only accepts a single request and sends back a single response e.g. a notary or certain types of
* oracle services. If one or more nodes in the service cluster go down mid-session, the message will be redelivered
* to a different one, so there is no need to wait until the initial node comes back up to obtain a response.
*/
@Deprecated("Use FlowSession.sendAndReceiveWithRetry()", level = DeprecationLevel.WARNING)
internal inline fun <reified R : Any> sendAndReceiveWithRetry(otherParty: Party, payload: Any): UntrustworthyData<R> {
return getDeprecatedSessionForParty(otherParty).sendAndReceiveWithRetry(payload)
}
/**
* Suspends until the specified [otherParty] sends us a message of type [R].
*
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*/
@Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING)
inline fun <reified R : Any> receive(otherParty: Party): UntrustworthyData<R> = receive(R::class.java, otherParty)
/**
* Suspends until the specified [otherParty] sends us a message of type [receiveType].
*
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*
* @return an [UntrustworthyData] wrapper around the received object.
*/
@Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING)
@Suspendable
open fun <R : Any> receive(receiveType: Class<R>, otherParty: Party): UntrustworthyData<R> {
return getDeprecatedSessionForParty(otherParty).receive(receiveType)
}
/**
* Queues the given [payload] for sending to the [otherParty] and continues without suspending.
*
* Note that the other party may receive the message at some arbitrary later point or not at all: if [otherParty]
* is offline then message delivery will be retried until it comes back or until the message is older than the
* network's event horizon time.
*/
@Deprecated("Use FlowSession.send()", level = DeprecationLevel.WARNING)
@Suspendable
open fun send(otherParty: Party, payload: Any) {
getDeprecatedSessionForParty(otherParty).send(payload)
}
@Suspendable
internal fun <R : Any> FlowSession.sendAndReceiveWithRetry(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
val request = FlowIORequest.SendAndReceive(

View File

@ -416,6 +416,68 @@ Our side of the flow must mirror these calls. We could do this as follows:
:end-before: DOCEND 08
:dedent: 12
Why sessions?
^^^^^^^^^^^^^
Before ``FlowSession`` s were introduced the send/receive API looked a bit different. They were functions on
``FlowLogic`` and took the address ``Party`` as argument. The platform internally maintained a mapping from ``Party`` to
session, hiding sessions from the user completely.
Although this is a convenient API it introduces subtle issues where a message that was originally meant for a specific
session may end up in another.
Consider the following contrived example using the old ``Party`` based API:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt
:language: kotlin
:start-after: DOCSTART LaunchSpaceshipFlow
:end-before: DOCEND LaunchSpaceshipFlow
The intention of the flows is very clear: LaunchSpaceshipFlow asks the president whether a spaceship should be launched.
It is expecting a boolean reply. The president in return first tells the secretary that they need coffee, which is also
communicated with a boolean. Afterwards the president replies to the launcher that they don't want to launch.
However the above can go horribly wrong when the ``launcher`` happens to be the same party ``getSecretary`` returns. In
this case the boolean meant for the secretary will be received by the launcher!
This indicates that ``Party`` is not a good identifier for the communication sequence, and indeed the ``Party`` based
API may introduce ways for an attacker to fish for information and even trigger unintended control flow like in the
above case.
Hence we introduced ``FlowSession``, which identifies the communication sequence. With ``FlowSession`` s the above set
of flows would look like this:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt
:language: kotlin
:start-after: DOCSTART LaunchSpaceshipFlowCorrect
:end-before: DOCEND LaunchSpaceshipFlowCorrect
Note how the president is now explicit about which session it wants to send to.
Porting from the old Party-based API
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
In the old API the first ``send`` or ``receive`` to a ``Party`` was the one kicking off the counter-flow. This is now
explicit in the ``initiateFlow`` function call. To port existing code:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt
:language: kotlin
:start-after: DOCSTART FlowSession porting
:end-before: DOCEND FlowSession porting
:dedent: 8
.. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java
:language: java
:start-after: DOCSTART FlowSession porting
:end-before: DOCEND FlowSession porting
:dedent: 12
Subflows
--------
Subflows are pieces of reusable flows that may be run by calling ``FlowLogic.subFlow``. There are two broad categories

View File

@ -582,6 +582,13 @@ public class FlowCookbookJava {
SignedTransaction notarisedTx2 = subFlow(new FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker()));
// DOCEND 10
// DOCSTART FlowSession porting
send(regulator, new Object()); // Old API
// becomes
FlowSession session = initiateFlow(regulator);
session.send(new Object());
// DOCEND FlowSession porting
return null;
}
}

View File

@ -572,6 +572,13 @@ class InitiatorFlow(val arg1: Boolean, val arg2: Int, private val counterparty:
val additionalParties: Set<Party> = setOf(regulator)
val notarisedTx2: SignedTransaction = subFlow(FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker()))
// DOCEND 10
// DOCSTART FlowSession porting
send(regulator, Any()) // Old API
// becomes
val session = initiateFlow(regulator)
session.send(Any())
// DOCEND FlowSession porting
}
}

View File

@ -0,0 +1,99 @@
package net.corda.docs
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.utilities.unwrap
// DOCSTART LaunchSpaceshipFlow
@InitiatingFlow
class LaunchSpaceshipFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val shouldLaunchSpaceship = receive<Boolean>(getPresident()).unwrap { it }
if (shouldLaunchSpaceship) {
launchSpaceship()
}
}
fun launchSpaceship() {
}
fun getPresident(): Party {
TODO()
}
}
@InitiatedBy(LaunchSpaceshipFlow::class)
@InitiatingFlow
class PresidentSpaceshipFlow(val launcher: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val needCoffee = true
send(getSecretary(), needCoffee)
val shouldLaunchSpaceship = false
send(launcher, shouldLaunchSpaceship)
}
fun getSecretary(): Party {
TODO()
}
}
@InitiatedBy(PresidentSpaceshipFlow::class)
class SecretaryFlow(val president: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
// ignore
}
}
// DOCEND LaunchSpaceshipFlow
// DOCSTART LaunchSpaceshipFlowCorrect
@InitiatingFlow
class LaunchSpaceshipFlowCorrect : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val presidentSession = initiateFlow(getPresident())
val shouldLaunchSpaceship = presidentSession.receive<Boolean>().unwrap { it }
if (shouldLaunchSpaceship) {
launchSpaceship()
}
}
fun launchSpaceship() {
}
fun getPresident(): Party {
TODO()
}
}
@InitiatedBy(LaunchSpaceshipFlowCorrect::class)
@InitiatingFlow
class PresidentSpaceshipFlowCorrect(val launcherSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val needCoffee = true
val secretarySession = initiateFlow(getSecretary())
secretarySession.send(needCoffee)
val shouldLaunchSpaceship = false
launcherSession.send(shouldLaunchSpaceship)
}
fun getSecretary(): Party {
TODO()
}
}
@InitiatedBy(PresidentSpaceshipFlowCorrect::class)
class SecretaryFlowCorrect(val presidentSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
// ignore
}
}
// DOCEND LaunchSpaceshipFlowCorrect

View File

@ -130,30 +130,19 @@ class TopLevelTransition(
flowState = FlowState.Started(event.ioRequest, event.fiber),
numberOfSuspends = currentState.checkpoint.numberOfSuspends + 1
)
if (event.maySkipCheckpoint) {
actions.addAll(arrayOf(
Action.CommitTransaction,
Action.ScheduleEvent(Event.DoRemainingWork)
))
currentState = currentState.copy(
checkpoint = newCheckpoint,
isFlowResumed = false
)
} else {
actions.addAll(arrayOf(
Action.PersistCheckpoint(context.id, newCheckpoint),
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
Action.ScheduleEvent(Event.DoRemainingWork)
))
currentState = currentState.copy(
checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(),
isFlowResumed = false,
isAnyCheckpointPersisted = true
)
}
actions.addAll(arrayOf(
Action.PersistCheckpoint(context.id, newCheckpoint),
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
Action.ScheduleEvent(Event.DoRemainingWork)
))
currentState = currentState.copy(
checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(),
isFlowResumed = false,
isAnyCheckpointPersisted = true
)
FlowContinuation.ProcessEvents
}
}