mirror of
https://github.com/corda/corda.git
synced 2024-12-19 21:17:58 +00:00
Updates TwoPartyTradeFlow to use the CollectSignaturesFlow.
This commit is contained in:
parent
f8e2766924
commit
f646936ab8
@ -61,7 +61,6 @@ import java.security.PublicKey
|
||||
* @param partiallySignedTx Transaction to collect the remaining signatures for
|
||||
*/
|
||||
// TODO: AbstractStateReplacementFlow needs updating to use this flow.
|
||||
// TODO: TwoPartyTradeFlow needs updating to use this flow.
|
||||
// TODO: Update this flow to handle randomly generated keys when that works is complete.
|
||||
class CollectSignaturesFlow(val partiallySignedTx: SignedTransaction,
|
||||
override val progressTracker: ProgressTracker = tracker()): FlowLogic<SignedTransaction>() {
|
||||
|
@ -89,9 +89,10 @@ Our flow has two parties (B and S for buyer and seller) and will proceed as foll
|
||||
2. B sends to S a ``SignedTransaction`` that includes the state as input, B's cash as input, the state with the new
|
||||
owner key as output, and any change cash as output. It contains a single signature from B but isn't valid because
|
||||
it lacks a signature from S authorising movement of the asset.
|
||||
3. S signs it and *finalises* the transaction. This means sending it to the notary, which checks the transaction for
|
||||
validity, recording the transaction in the local vault, and then sending it back to B who also checks it and commits
|
||||
the transaction to their local vault.
|
||||
3. S signs the transaction and sends it back to B.
|
||||
4. B *finalises* the transaction by sending it to the notary who checks the transaction for validity,
|
||||
recording the transaction in B's local vault, and then sending it on to S who also checks it and commits
|
||||
the transaction to S's local vault.
|
||||
|
||||
You can find the implementation of this flow in the file ``finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt``.
|
||||
|
||||
@ -124,9 +125,6 @@ each side.
|
||||
val sellerOwnerKey: PublicKey
|
||||
)
|
||||
|
||||
data class SignaturesFromSeller(val sellerSig: DigitalSignature.WithKey,
|
||||
val notarySig: DigitalSignature.LegallyIdentifiable)
|
||||
|
||||
open class Seller(val otherParty: Party,
|
||||
val notaryNode: NodeInfo,
|
||||
val assetToSell: StateAndRef<OwnableState>,
|
||||
@ -156,8 +154,8 @@ simply flow messages or exceptions. The other two represent the buyer and seller
|
||||
Going through the data needed to become a seller, we have:
|
||||
|
||||
- ``otherParty: Party`` - the party with which you are trading.
|
||||
- ``notaryNode: NodeInfo`` - the entry in the network map for the chosen notary. See ":doc:`key-concepts-consensus-notaries`" for more
|
||||
information on notaries.
|
||||
- ``notaryNode: NodeInfo`` - the entry in the network map for the chosen notary. See
|
||||
":doc:`key-concepts-consensus-notaries`" for more information on notaries.
|
||||
- ``assetToSell: StateAndRef<OwnableState>`` - a pointer to the ledger entry that represents the thing being sold.
|
||||
- ``price: Amount<Currency>`` - the agreed on price that the asset is being sold for (without an issuer constraint).
|
||||
- ``myKey: PublicKey`` - the PublicKey part of the node's internal KeyPair that controls the asset being sold.
|
||||
@ -243,87 +241,111 @@ Let's implement the ``Seller.call`` method. This will be run when the flow is in
|
||||
:end-before: DOCEND 4
|
||||
:dedent: 4
|
||||
|
||||
Here we see the outline of the procedure. We receive a proposed trade transaction from the buyer and check that it's
|
||||
valid. The buyer has already attached their signature before sending it. Then we calculate and attach our own signature
|
||||
so that the transaction is now signed by both the buyer and the seller. We then *finalise* this transaction by sending
|
||||
it to a notary to assert (with another signature) that the timestamp in the transaction (if any) is valid and there are no
|
||||
double spends. Finally, after the finalisation process is complete, we retrieve the now fully signed transaction from
|
||||
local storage. It will have the same ID as the one we started with but more signatures.
|
||||
We start by sending information about the asset we wish to sell to the buyer. We fill out the initial flow message with
|
||||
the trade info, and then call ``send``. which takes two arguments:
|
||||
|
||||
Let's fill out the ``receiveAndCheckProposedTransaction()`` method.
|
||||
- The party we wish to send the message to.
|
||||
- The payload being sent.
|
||||
|
||||
``send`` will serialise the payload and send it to the other party automatically.
|
||||
|
||||
Next, we call a *subflow* called ``SignTransactionFlow`` (see :ref:`subflows`). ``SignTransactionFlow`` automates the
|
||||
process of:
|
||||
|
||||
* Receiving a proposed trade transaction from the buyer, with the buyer's signature attached.
|
||||
* Checking that the proposed transaction is valid.
|
||||
* Calculating and attaching our own signature so that the transaction is now signed by both the buyer and the seller.
|
||||
* Sending the transaction back to the buyer.
|
||||
|
||||
The transaction then needs to be finalized. This is the the process of sending the transaction to a notary to assert
|
||||
(with another signature) that the timestamp in the transaction (if any) is valid and there are no double spends.
|
||||
In this flow, finalization is handled by the buyer, so we just wait for the signed transaction to appear in our
|
||||
transaction storage. It will have the same ID as the one we started with but more signatures.
|
||||
|
||||
Implementing the buyer
|
||||
----------------------
|
||||
|
||||
OK, let's do the same for the buyer side:
|
||||
|
||||
.. container:: codeset
|
||||
|
||||
.. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART 5
|
||||
:end-before: DOCEND 5
|
||||
:dedent: 4
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART 1
|
||||
:end-before: DOCEND 1
|
||||
:dedent: 4
|
||||
|
||||
Let's break this down. We fill out the initial flow message with the trade info, and then call ``sendAndReceive``.
|
||||
This function takes a few arguments:
|
||||
This code is longer but no more complicated. Here are some things to pay attention to:
|
||||
|
||||
- The party on the other side.
|
||||
- The thing to send. It'll be serialised and sent automatically.
|
||||
- Finally a type argument, which is the kind of object we're expecting to receive from the other side. If we get
|
||||
back something else an exception is thrown.
|
||||
1. We do some sanity checking on the proposed trade transaction received from the seller to ensure we're being offered
|
||||
what we expected to be offered.
|
||||
2. We create a cash spend using ``VaultService.generateSpend``. You can read the vault documentation to learn more about this.
|
||||
3. We access the *service hub* as needed to access things that are transient and may change or be recreated
|
||||
whilst a flow is suspended, such as the wallet or the network map.
|
||||
4. We call ``CollectSignaturesFlow`` as a subflow to send the unfinished, still-invalid transaction to the seller so
|
||||
they can sign it and send it back to us.
|
||||
5. Last, we call ``FinalityFlow`` as a subflow to finalize the transaction.
|
||||
|
||||
Once ``sendAndReceive`` is called, the call method will be suspended into a continuation and saved to persistent
|
||||
storage. If the node crashes or is restarted, the flow will effectively continue as if nothing had happened. Your
|
||||
code may remain blocked inside such a call for seconds, minutes, hours or even days in the case of a flow that
|
||||
needs human interaction!
|
||||
As you can see, the flow logic is straightforward and does not contain any callbacks or network glue code, despite
|
||||
the fact that it takes minimal resources and can survive node restarts.
|
||||
|
||||
.. note:: There are a couple of rules you need to bear in mind when writing a class that will be used as a continuation.
|
||||
The first is that anything on the stack when the function is suspended will be stored into the heap and kept alive by
|
||||
the garbage collector. So try to avoid keeping enormous data structures alive unless you really have to. You can
|
||||
always use private methods to keep the stack uncluttered with temporary variables, or to avoid objects that
|
||||
Kryo is not able to serialise correctly.
|
||||
Flow sessions
|
||||
-------------
|
||||
|
||||
The second is that as well as being kept on the heap, objects reachable from the stack will be serialised. The state
|
||||
of the function call may be resurrected much later! Kryo doesn't require objects be marked as serialisable, but even so,
|
||||
doing things like creating threads from inside these calls would be a bad idea. They should only contain business
|
||||
logic and only do I/O via the methods exposed by the flow framework.
|
||||
It will be useful to describe how flows communicate with each other. A node may have many flows running at the same
|
||||
time, and perhaps communicating with the same counterparty node but for different purposes. Therefore flows need a
|
||||
way to segregate communication channels so that concurrent conversations between flows on the same set of nodes do
|
||||
not interfere with each other.
|
||||
|
||||
It's OK to keep references around to many large internal node services though: these will be serialised using a
|
||||
special token that's recognised by the platform, and wired up to the right instance when the continuation is
|
||||
loaded off disk again.
|
||||
To achieve this the flow framework initiates a new flow session each time a flow starts communicating with a ``Party``
|
||||
for the first time. A session is simply a pair of IDs, one for each side, to allow the node to route received messages to
|
||||
the correct flow. If the other side accepts the session request then subsequent sends and receives to that same ``Party``
|
||||
will use the same session. A session ends when either flow ends, whether as expected or pre-maturely. If a flow ends
|
||||
pre-maturely then the other side will be notified of that and they will also end, as the whole point of flows is a known
|
||||
sequence of message transfers. Flows end pre-maturely due to exceptions, and as described above, if that exception is
|
||||
``FlowException`` or a sub-type then it will propagate to the other side. Any other exception will not propagate.
|
||||
|
||||
The buyer is supposed to send us a transaction with all the right inputs/outputs/commands in response to the opening
|
||||
message, with their cash put into the transaction and their signature on it authorising the movement of the cash.
|
||||
Taking a step back, we mentioned that the other side has to accept the session request for there to be a communication
|
||||
channel. A node accepts a session request if it has registered the flow type (the fully-qualified class name) that is
|
||||
making the request - each session initiation includes the initiating flow type. The registration is done by a CorDapp
|
||||
which has made available the particular flow communication, using ``PluginServiceHub.registerServiceFlow``. This method
|
||||
specifies a flow factory for generating the counter-flow to any given initiating flow. If this registration doesn't exist
|
||||
then no further communication takes place and the initiating flow ends with an exception.
|
||||
|
||||
You get back a simple wrapper class, ``UntrustworthyData<SignedTransaction>``, which is just a marker class that reminds
|
||||
us that the data came from a potentially malicious external source and may have been tampered with or be unexpected in
|
||||
other ways. It doesn't add any functionality, but acts as a reminder to "scrub" the data before use.
|
||||
Going back to our buyer and seller flows, we need a way to initiate communication between the two. This is typically done
|
||||
with one side started manually using the ``startFlowDynamic`` RPC and this initiates the counter-flow on the other side.
|
||||
In this case it doesn't matter which flow is the initiator and which is the initiated. If we choose the seller side as
|
||||
the initiator then the buyer side would need to register their flow, perhaps with something like:
|
||||
|
||||
Our "scrubbing" has three parts:
|
||||
.. container:: codeset
|
||||
|
||||
1. Check that the expected signatures are present and correct. At this point we expect our own signature to be missing,
|
||||
because of course we didn't sign it yet, and also the signature of the notary because that must always come last.
|
||||
2. We resolve the transaction, which we will cover below.
|
||||
3. We verify that the transaction is paying us the demanded price.
|
||||
.. sourcecode:: kotlin
|
||||
|
||||
Exception handling
|
||||
------------------
|
||||
class TwoPartyTradeFlowPlugin : CordaPluginRegistry() {
|
||||
override val servicePlugins = listOf(Function(TwoPartyTradeFlowService::Service))
|
||||
}
|
||||
|
||||
Flows can throw exceptions to prematurely terminate their execution. The flow framework gives special treatment to
|
||||
``FlowException`` and its subtypes. These exceptions are treated as error responses of the flow and are propagated
|
||||
to all counterparties it is communicating with. The receiving flows will throw the same exception the next time they do
|
||||
a ``receive`` or ``sendAndReceive`` and thus end the flow session. If the receiver was invoked via ``subFlow`` (details below)
|
||||
then the exception can be caught there enabling re-invocation of the sub-flow.
|
||||
object TwoPartyTradeFlowService {
|
||||
class Service(services: PluginServiceHub) {
|
||||
init {
|
||||
services.registerServiceFlow(TwoPartyTradeFlow.Seller::class.java) {
|
||||
TwoPartyTradeFlow.Buyer(
|
||||
it,
|
||||
notary = services.networkMapCache.notaryNodes[0].notaryIdentity,
|
||||
acceptablePrice = TODO(),
|
||||
typeToBuy = TODO())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
If the exception thrown by the erroring flow is not a ``FlowException`` it will still terminate but will not propagate to
|
||||
the other counterparties. Instead they will be informed the flow has terminated and will themselves be terminated with a
|
||||
generic exception.
|
||||
This is telling the buyer node to fire up an instance of ``TwoPartyTradeFlow.Buyer`` (the code in the lambda) when
|
||||
they receive a message from the initiating seller side of the flow (``TwoPartyTradeFlow.Seller::class.java``).
|
||||
|
||||
.. note:: A future version will extend this to give the node administrator more control on what to do with such erroring
|
||||
flows.
|
||||
.. _subflows:
|
||||
|
||||
Throwing a ``FlowException`` enables a flow to reject a piece of data it has received back to the sender. This is typically
|
||||
done in the ``unwrap`` method of the received ``UntrustworthyData``. In the above example the seller checks the price
|
||||
and throws ``FlowException`` if it's invalid. It's then up to the buyer to either try again with a better price or give up.
|
||||
|
||||
Sub-flows and finalisation
|
||||
--------------------------
|
||||
Sub-flows
|
||||
---------
|
||||
|
||||
Flows can be composed via nesting. Invoking a sub-flow looks similar to an ordinary function call:
|
||||
|
||||
@ -345,130 +367,124 @@ Flows can be composed via nesting. Invoking a sub-flow looks similar to an ordin
|
||||
subFlow(new FinalityFlow(unnotarisedTransaction))
|
||||
}
|
||||
|
||||
In this code snippet we are using the ``FinalityFlow`` to finish off the transaction. It will:
|
||||
Let's take a look at the three subflows we invoke in this flow.
|
||||
|
||||
FinalityFlow
|
||||
^^^^^^^^^^^^
|
||||
On the buyer side, we use ``FinalityFlow`` to finalise the transaction. It will:
|
||||
|
||||
* Send the transaction to the chosen notary and, if necessary, satisfy the notary that the transaction is valid.
|
||||
* Record the transaction in the local vault, if it is relevant (i.e. involves the owner of the node).
|
||||
* Send the fully signed transaction to the other participants for recording as well.
|
||||
|
||||
We simply create the flow object via its constructor, and then pass it to the ``subFlow`` method which
|
||||
returns the result of the flow's execution directly. Behind the scenes all this is doing is wiring up progress
|
||||
tracking (discussed more below) and then running the objects ``call`` method. Because the sub-flow might suspend,
|
||||
we must mark the method that invokes it as suspendable.
|
||||
|
||||
Going back to the previous code snippet, we use a sub-flow called ``ResolveTransactionsFlow``. This is
|
||||
responsible for downloading and checking all the dependencies of a transaction, which in Corda are always retrievable
|
||||
from the party that sent you a transaction that uses them. This flow returns a list of ``LedgerTransaction``
|
||||
objects, but we don't need them here so we just ignore the return value.
|
||||
|
||||
.. note:: Transaction dependency resolution assumes that the peer you got the transaction from has all of the
|
||||
dependencies itself. It must do, otherwise it could not have convinced itself that the dependencies were themselves
|
||||
valid. It's important to realise that requesting only the transactions we require is a privacy leak, because if
|
||||
we don't download a transaction from the peer, they know we must have already seen it before. Fixing this privacy
|
||||
leak will come later.
|
||||
|
||||
After the dependencies, we check the proposed trading transaction for validity by running the contracts for that as
|
||||
well (but having handled the fact that some signatures are missing ourselves).
|
||||
|
||||
.. warning:: If the seller stops before sending the finalised transaction to the buyer, the seller is left with a
|
||||
valid transaction but the buyer isn't, so they can't spend the asset they just purchased! This sort of thing is not
|
||||
always a risk (as the seller may not gain anything from that sort of behaviour except a lawsuit), but if it is, a future
|
||||
version of the platform will allow you to ask the notary to send you the transaction as well, in case your counterparty
|
||||
does not. This is not the default because it reveals more private info to the notary.
|
||||
|
||||
Implementing the buyer
|
||||
----------------------
|
||||
We simply create the flow object via its constructor, and then pass it to the ``subFlow`` method which
|
||||
returns the result of the flow's execution directly. Behind the scenes all this is doing is wiring up progress
|
||||
tracking (discussed more below) and then running the object's ``call`` method. Because the sub-flow might suspend,
|
||||
we must mark the method that invokes it as suspendable.
|
||||
|
||||
OK, let's do the same for the buyer side:
|
||||
Within FinalityFlow, we use a further sub-flow called ``ResolveTransactionsFlow``. This is responsible for downloading
|
||||
and checking all the dependencies of a transaction, which in Corda are always retrievable from the party that sent you a
|
||||
transaction that uses them. This flow returns a list of ``LedgerTransaction`` objects.
|
||||
|
||||
.. note:: Transaction dependency resolution assumes that the peer you got the transaction from has all of the
|
||||
dependencies itself. It must do, otherwise it could not have convinced itself that the dependencies were themselves
|
||||
valid. It's important to realise that requesting only the transactions we require is a privacy leak, because if
|
||||
we don't download a transaction from the peer, they know we must have already seen it before. Fixing this privacy
|
||||
leak will come later.
|
||||
|
||||
CollectSignaturesFlow/SignTransactionFlow
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
We also invoke two other subflows:
|
||||
|
||||
* ``CollectSignaturesFlow``, on the buyer side
|
||||
* ``SignTransactionFlow``, on the seller side
|
||||
|
||||
These flows communicate to gather all the required signatures for the proposed transaction. ``CollectSignaturesFlow``
|
||||
will:
|
||||
|
||||
* Verify any signatures collected on the transaction so far
|
||||
* Verify the transaction itself
|
||||
* Send the transaction to the remaining required signers and receive back their signatures
|
||||
* Verify the collected signatures
|
||||
|
||||
``SignTransactionFlow`` responds by:
|
||||
|
||||
* Receiving the partially-signed transaction off the wire
|
||||
* Verifying the existing signatures
|
||||
* Resolving the transaction's dependencies
|
||||
* Verifying the transaction itself
|
||||
* Running any custom validation logic
|
||||
* Sending their signature back to the buyer
|
||||
* Waiting for the transaction to be recorded in their vault
|
||||
|
||||
We cannot instantiate ``SignTransactionFlow`` itself, as it's an abstract class. Instead, we need to subclass it and
|
||||
override ``checkTransaction()`` to add our own custom validation logic:
|
||||
|
||||
.. container:: codeset
|
||||
|
||||
.. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART 1
|
||||
:end-before: DOCEND 1
|
||||
:dedent: 4
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART 5
|
||||
:end-before: DOCEND 5
|
||||
:dedent: 12
|
||||
|
||||
This code is longer but no more complicated. Here are some things to pay attention to:
|
||||
In this case, our custom validation logic ensures that the amount of cash outputs in the transaction equals the
|
||||
price of the asset.
|
||||
|
||||
1. We do some sanity checking on the received message to ensure we're being offered what we expected to be offered.
|
||||
2. We create a cash spend using ``VaultService.generateSpend``. You can read the vault documentation to learn more about this.
|
||||
3. We access the *service hub* when we need it to access things that are transient and may change or be recreated
|
||||
whilst a flow is suspended, things like the wallet or the network map.
|
||||
4. We send the unfinished, invalid transaction to the seller so they can sign it and finalise it.
|
||||
5. Finally, we wait for the finished transaction to arrive in our local storage and vault.
|
||||
Persisting flows
|
||||
----------------
|
||||
|
||||
As you can see, the flow logic is straightforward and does not contain any callbacks or network glue code, despite
|
||||
the fact that it takes minimal resources and can survive node restarts.
|
||||
If you look at the code for ``FinalityFlow``, ``CollectSignaturesFlow`` and ``SignTransactionFlow``, you'll see calls
|
||||
to both ``receive`` and ``sendAndReceive``. Once either of these methods is called, the ``call`` method will be
|
||||
suspended into a continuation and saved to persistent storage. If the node crashes or is restarted, the flow will
|
||||
effectively continue as if nothing had happened. Your code may remain blocked inside such a call for seconds,
|
||||
minutes, hours or even days in the case of a flow that needs human interaction!
|
||||
|
||||
Flow sessions
|
||||
-------------
|
||||
.. note:: There are a couple of rules you need to bear in mind when writing a class that will be used as a continuation.
|
||||
The first is that anything on the stack when the function is suspended will be stored into the heap and kept alive by
|
||||
the garbage collector. So try to avoid keeping enormous data structures alive unless you really have to. You can
|
||||
always use private methods to keep the stack uncluttered with temporary variables, or to avoid objects that
|
||||
Kryo is not able to serialise correctly.
|
||||
|
||||
Before going any further it will be useful to describe how flows communicate with each other. A node may have many flows
|
||||
running at the same time, and perhaps communicating with the same counterparty node but for different purposes. Therefore
|
||||
flows need a way to segregate communication channels so that concurrent conversations between flows on the same set of nodes
|
||||
do not interfere with each other.
|
||||
The second is that as well as being kept on the heap, objects reachable from the stack will be serialised. The state
|
||||
of the function call may be resurrected much later! Kryo doesn't require objects be marked as serialisable, but even so,
|
||||
doing things like creating threads from inside these calls would be a bad idea. They should only contain business
|
||||
logic and only do I/O via the methods exposed by the flow framework.
|
||||
|
||||
To achieve this the flow framework initiates a new flow session each time a flow starts communicating with a ``Party``
|
||||
for the first time. A session is simply a pair of IDs, one for each side, to allow the node to route received messages to
|
||||
the correct flow. If the other side accepts the session request then subsequent sends and receives to that same ``Party``
|
||||
will use the same session. A session ends when either flow ends, whether as expected or pre-maturely. If a flow ends
|
||||
pre-maturely then the other side will be notified of that and they will also end, as the whole point of flows is a known
|
||||
sequence of message transfers. Flows end pre-maturely due to exceptions, and as described above, if that exception is
|
||||
``FlowException`` or a sub-type then it will propagate to the other side. Any other exception will not propagate.
|
||||
It's OK to keep references around to many large internal node services though: these will be serialised using a
|
||||
special token that's recognised by the platform, and wired up to the right instance when the continuation is
|
||||
loaded off disk again.
|
||||
|
||||
Taking a step back, we mentioned that the other side has to accept the session request for there to be a communication
|
||||
channel. A node accepts a session request if it has registered the flow type (the fully-qualified class name) that is
|
||||
making the request - each session initiation includes the initiating flow type. This registration is done automatically
|
||||
by the node at startup by searching for flows which are annotated with ``@InitiatedBy``. This annotation points to the
|
||||
flow that is doing the initiating, and this flow must be annotated with ``@InitiatingFlow``. The ``InitiatedBy`` flow
|
||||
must have a constructor which takes in a single parameter of type ``Party`` - this is the initiating party.
|
||||
``receive`` and ``sendAndReceive`` return a simple wrapper class, ``UntrustworthyData<T>``, which is
|
||||
just a marker class that reminds us that the data came from a potentially malicious external source and may have been
|
||||
tampered with or be unexpected in other ways. It doesn't add any functionality, but acts as a reminder to "scrub"
|
||||
the data before use.
|
||||
|
||||
Going back to our buyer and seller flows, we need a way to initiate communication between the two. This is typically done
|
||||
with one side started manually using the ``startFlowDynamic`` RPC and this initiates the flow on the other side. In our
|
||||
case it doesn't matter which flow is the initiator and which is the initiated, which is why neither ``Buyer`` nor ``Seller``
|
||||
are annotated with ``InitiatedBy`` or ``InitiatingFlow``. If we, for example, choose the seller side as the initiator then
|
||||
we need to create a simple seller starter flow that has the annotation we need:
|
||||
Exception handling
|
||||
------------------
|
||||
|
||||
.. container:: codeset
|
||||
Flows can throw exceptions to prematurely terminate their execution. The flow framework gives special treatment to
|
||||
``FlowException`` and its subtypes. These exceptions are treated as error responses of the flow and are propagated
|
||||
to all counterparties it is communicating with. The receiving flows will throw the same exception the next time they do
|
||||
a ``receive`` or ``sendAndReceive`` and thus end the flow session. If the receiver was invoked via ``subFlow`` (details below)
|
||||
then the exception can be caught there enabling re-invocation of the sub-flow.
|
||||
|
||||
.. sourcecode:: kotlin
|
||||
If the exception thrown by the erroring flow is not a ``FlowException`` it will still terminate but will not propagate to
|
||||
the other counterparties. Instead they will be informed the flow has terminated and will themselves be terminated with a
|
||||
generic exception.
|
||||
|
||||
@InitiatingFlow
|
||||
class SellerInitiator(val buyer: Party,
|
||||
val notary: NodeInfo,
|
||||
val assetToSell: StateAndRef<OwnableState>,
|
||||
val price: Amount<Currency>) : FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
send(buyer, Pair(notary.notaryIdentity, price))
|
||||
return subFlow(Seller(
|
||||
buyer,
|
||||
notary,
|
||||
assetToSell,
|
||||
price,
|
||||
serviceHub.legalIdentityKey))
|
||||
}
|
||||
}
|
||||
.. note:: A future version will extend this to give the node administrator more control on what to do with such erroring
|
||||
flows.
|
||||
|
||||
The buyer side would look something like this. Notice the constructor takes in a single ``Party`` object which represents
|
||||
the seller.
|
||||
|
||||
.. container:: codeset
|
||||
|
||||
.. sourcecode:: kotlin
|
||||
|
||||
@InitiatedBy(SellerInitiator::class)
|
||||
class BuyerAcceptor(val seller: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val (notary, price) = receive<Pair<Party, Amount<Currency>>>(seller).unwrap {
|
||||
require(serviceHub.networkMapCache.isNotary(it.first)) { "${it.first} is not a notary" }
|
||||
it
|
||||
}
|
||||
subFlow(Buyer(seller, notary, price, CommercialPaper.State::class.java))
|
||||
}
|
||||
}
|
||||
Throwing a ``FlowException`` enables a flow to reject a piece of data it has received back to the sender. This is typically
|
||||
done in the ``unwrap`` method of the received ``UntrustworthyData``. In the above example the seller checks the price
|
||||
and throws ``FlowException`` if it's invalid. It's then up to the buyer to either try again with a better price or give up.
|
||||
|
||||
.. _progress-tracking:
|
||||
|
||||
@ -497,22 +513,28 @@ A flow might declare some steps with code inside the flow class like this:
|
||||
.. sourcecode:: java
|
||||
|
||||
private final ProgressTracker progressTracker = new ProgressTracker(
|
||||
CONSTRUCTING_OFFER,
|
||||
SENDING_OFFER_AND_RECEIVING_PARTIAL_TRANSACTION,
|
||||
VERIFYING
|
||||
RECEIVING,
|
||||
VERIFYING,
|
||||
SIGNING,
|
||||
COLLECTING_SIGNATURES,
|
||||
RECORDING
|
||||
);
|
||||
|
||||
private static final ProgressTracker.Step CONSTRUCTING_OFFER = new ProgressTracker.Step(
|
||||
"Constructing proposed purchase order.");
|
||||
private static final ProgressTracker.Step SENDING_OFFER_AND_RECEIVING_PARTIAL_TRANSACTION = new ProgressTracker.Step(
|
||||
"Sending purchase order to seller for review, and receiving partially signed transaction from seller in return.");
|
||||
private static final ProgressTracker.Step RECEIVING = new ProgressTracker.Step(
|
||||
"Waiting for seller trading info");
|
||||
private static final ProgressTracker.Step VERIFYING = new ProgressTracker.Step(
|
||||
"Verifying signatures and contract constraints.");
|
||||
"Verifying seller assets");
|
||||
private static final ProgressTracker.Step SIGNING = new ProgressTracker.Step(
|
||||
"Generating and signing transaction proposal");
|
||||
private static final ProgressTracker.Step COLLECTING_SIGNATURES = new ProgressTracker.Step(
|
||||
"Collecting signatures from other parties");
|
||||
private static final ProgressTracker.Step RECORDING = new ProgressTracker.Step(
|
||||
"Recording completed transaction");
|
||||
|
||||
Each step exposes a label. By default labels are fixed, but by subclassing ``RelabelableStep``
|
||||
you can make a step that can update its label on the fly. That's useful for steps that want to expose non-structured
|
||||
progress information like the current file being downloaded. By defining your own step types, you can export progress
|
||||
in a way that's both human readable and machine readable.
|
||||
Each step exposes a label. By default labels are fixed, but by subclassing ``RelabelableStep`` you can make a step
|
||||
that can update its label on the fly. That's useful for steps that want to expose non-structured progress information
|
||||
like the current file being downloaded. By defining your own step types, you can export progress in a way that's both
|
||||
human readable and machine readable.
|
||||
|
||||
Progress trackers are hierarchical. Each step can be the parent for another tracker. By altering the
|
||||
``ProgressTracker.childrenFor`` map, a tree of steps can be created. It's allowed to alter the hierarchy
|
||||
@ -531,9 +553,9 @@ steps by overriding the ``Step`` class like this:
|
||||
|
||||
.. sourcecode:: java
|
||||
|
||||
private static final ProgressTracker.Step COMMITTING = new ProgressTracker.Step("Committing to the ledger.") {
|
||||
private static final ProgressTracker.Step VERIFYING_AND_SIGNING = new ProgressTracker.Step("Verifying and signing transaction proposal") {
|
||||
@Nullable @Override public ProgressTracker childProgressTracker() {
|
||||
return FinalityFlow.Companion.tracker();
|
||||
return SignTransactionFlow.Companion.tracker();
|
||||
}
|
||||
};
|
||||
|
||||
@ -591,4 +613,4 @@ the features we have planned:
|
||||
* Being able to interact with people, either via some sort of external ticketing system, or email, or a custom UI.
|
||||
For example to implement human transaction authorisations.
|
||||
* A standard library of flows that can be easily sub-classed by local developers in order to integrate internal
|
||||
reporting logic, or anything else that might be required as part of a communications lifecycle.
|
||||
reporting logic, or anything else that might be required as part of a communications lifecycle.
|
@ -3,7 +3,6 @@ package net.corda.flows
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.contracts.asset.sumCashBy
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.identity.AnonymousParty
|
||||
@ -13,9 +12,7 @@ import net.corda.core.seconds
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.core.utilities.unwrap
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
@ -36,8 +33,6 @@ import java.util.*
|
||||
* that represents an atomic asset swap.
|
||||
*
|
||||
* Note that it's the *seller* who initiates contact with the buyer, not vice-versa as you might imagine.
|
||||
*
|
||||
* TODO: Refactor this using the [CollectSignaturesFlow]. Note. It requires a large docsite update!
|
||||
*/
|
||||
object TwoPartyTradeFlow {
|
||||
// TODO: Common elements in multi-party transaction consensus and signing should be refactored into a superclass of this
|
||||
@ -57,10 +52,6 @@ object TwoPartyTradeFlow {
|
||||
val sellerOwnerKey: PublicKey
|
||||
)
|
||||
|
||||
@CordaSerializable
|
||||
data class SignaturesFromSeller(val sellerSig: DigitalSignature.WithKey,
|
||||
val notarySig: DigitalSignature.WithKey)
|
||||
|
||||
open class Seller(val otherParty: PartyAndCertificate,
|
||||
val notaryNode: NodeInfo,
|
||||
val assetToSell: StateAndRef<OwnableState>,
|
||||
@ -70,62 +61,42 @@ object TwoPartyTradeFlow {
|
||||
|
||||
companion object {
|
||||
object AWAITING_PROPOSAL : ProgressTracker.Step("Awaiting transaction proposal")
|
||||
object VERIFYING : ProgressTracker.Step("Verifying transaction proposal")
|
||||
object SIGNING : ProgressTracker.Step("Signing transaction")
|
||||
// DOCSTART 3
|
||||
object COMMITTING : ProgressTracker.Step("Committing transaction to the ledger") {
|
||||
override fun childProgressTracker() = FinalityFlow.tracker()
|
||||
object VERIFYING_AND_SIGNING : ProgressTracker.Step("Verifying and signing transaction proposal") {
|
||||
override fun childProgressTracker() = SignTransactionFlow.tracker()
|
||||
}
|
||||
|
||||
// DOCEND 3
|
||||
object SENDING_FINAL_TX : ProgressTracker.Step("Sending final transaction to buyer")
|
||||
|
||||
fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING, SIGNING, COMMITTING, SENDING_FINAL_TX)
|
||||
fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING_AND_SIGNING)
|
||||
}
|
||||
|
||||
// DOCSTART 4
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val partialSTX: SignedTransaction = receiveAndCheckProposedTransaction()
|
||||
val ourSignature = calculateOurSignature(partialSTX)
|
||||
val unnotarisedSTX: SignedTransaction = partialSTX + ourSignature
|
||||
val finishedSTX = subFlow(FinalityFlow(unnotarisedSTX)).single()
|
||||
return finishedSTX
|
||||
}
|
||||
// DOCEND 4
|
||||
|
||||
// DOCSTART 5
|
||||
@Suspendable
|
||||
private fun receiveAndCheckProposedTransaction(): SignedTransaction {
|
||||
progressTracker.currentStep = AWAITING_PROPOSAL
|
||||
|
||||
// Make the first message we'll send to kick off the flow.
|
||||
val hello = SellerTradeInfo(assetToSell, price, myKey)
|
||||
// What we get back from the other side is a transaction that *might* be valid and acceptable to us,
|
||||
// but we must check it out thoroughly before we sign!
|
||||
val untrustedSTX = sendAndReceive<SignedTransaction>(otherParty, hello)
|
||||
send(otherParty, hello)
|
||||
|
||||
progressTracker.currentStep = VERIFYING
|
||||
return untrustedSTX.unwrap {
|
||||
// Check that the tx proposed by the buyer is valid.
|
||||
val wtx: WireTransaction = it.verifySignatures(myKey, notaryNode.notaryIdentity.owningKey)
|
||||
logger.trace { "Received partially signed transaction: ${it.id}" }
|
||||
|
||||
// Download and check all the things that this transaction depends on and verify it is contract-valid,
|
||||
// even though it is missing signatures.
|
||||
subFlow(ResolveTransactionsFlow(wtx, otherParty))
|
||||
|
||||
if (wtx.outputs.map { it.data }.sumCashBy(AnonymousParty(myKey)).withoutIssuer() != price)
|
||||
throw FlowException("Transaction is not sending us the right amount of cash")
|
||||
|
||||
it
|
||||
// Verify and sign the transaction.
|
||||
progressTracker.currentStep = VERIFYING_AND_SIGNING
|
||||
// DOCSTART 5
|
||||
val signTransactionFlow = object : SignTransactionFlow(otherParty, VERIFYING_AND_SIGNING.childProgressTracker()) {
|
||||
override fun checkTransaction(stx: SignedTransaction) {
|
||||
if (stx.tx.outputs.map { it.data }.sumCashBy(AnonymousParty(myKey)).withoutIssuer() != price)
|
||||
throw FlowException("Transaction is not sending us the right amount of cash")
|
||||
}
|
||||
}
|
||||
return subFlow(signTransactionFlow)
|
||||
// DOCEND 5
|
||||
}
|
||||
// DOCEND 5
|
||||
// DOCEND 4
|
||||
|
||||
// Following comment moved here so that it doesn't appear in the docsite:
|
||||
// There are all sorts of funny games a malicious secondary might play with it sends maybeSTX (in
|
||||
// receiveAndCheckProposedTransaction), we should fix them:
|
||||
// There are all sorts of funny games a malicious secondary might play with it sends maybeSTX,
|
||||
// we should fix them:
|
||||
//
|
||||
// - This tx may attempt to send some assets we aren't intending to sell to the secondary, if
|
||||
// we're reusing keys! So don't reuse keys!
|
||||
@ -134,11 +105,6 @@ object TwoPartyTradeFlow {
|
||||
//
|
||||
// but the goal of this code is not to be fully secure (yet), but rather, just to find good ways to
|
||||
// express flow state machines on top of the messaging layer.
|
||||
|
||||
open fun calculateOurSignature(partialTX: SignedTransaction): DigitalSignature.WithKey {
|
||||
progressTracker.currentStep = SIGNING
|
||||
return serviceHub.createSignature(partialTX, myKey)
|
||||
}
|
||||
}
|
||||
|
||||
open class Buyer(val otherParty: PartyAndCertificate,
|
||||
@ -149,10 +115,15 @@ object TwoPartyTradeFlow {
|
||||
object RECEIVING : ProgressTracker.Step("Waiting for seller trading info")
|
||||
object VERIFYING : ProgressTracker.Step("Verifying seller assets")
|
||||
object SIGNING : ProgressTracker.Step("Generating and signing transaction proposal")
|
||||
object SENDING_SIGNATURES : ProgressTracker.Step("Sending signatures to the seller")
|
||||
object WAITING_FOR_TX : ProgressTracker.Step("Waiting for the transaction to finalise.")
|
||||
object COLLECTING_SIGNATURES : ProgressTracker.Step("Collecting signatures from other parties") {
|
||||
override fun childProgressTracker() = CollectSignaturesFlow.tracker()
|
||||
}
|
||||
object RECORDING : ProgressTracker.Step("Recording completed transaction") {
|
||||
// TODO: Currently triggers a race condition on Team City. See https://github.com/corda/corda/issues/733.
|
||||
// override fun childProgressTracker() = FinalityFlow.tracker()
|
||||
}
|
||||
|
||||
override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SENDING_SIGNATURES, WAITING_FOR_TX)
|
||||
override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, COLLECTING_SIGNATURES, RECORDING)
|
||||
// DOCEND 2
|
||||
|
||||
// DOCSTART 1
|
||||
@ -165,16 +136,16 @@ object TwoPartyTradeFlow {
|
||||
// Put together a proposed transaction that performs the trade, and sign it.
|
||||
progressTracker.currentStep = SIGNING
|
||||
val (ptx, cashSigningPubKeys) = assembleSharedTX(tradeRequest)
|
||||
val stx = signWithOurKeys(cashSigningPubKeys, ptx)
|
||||
val partSignedTx = signWithOurKeys(cashSigningPubKeys, ptx)
|
||||
|
||||
// Send the signed transaction to the seller, who must then sign it themselves and commit
|
||||
// it to the ledger by sending it to the notary.
|
||||
progressTracker.currentStep = SENDING_SIGNATURES
|
||||
send(otherParty, stx)
|
||||
progressTracker.currentStep = COLLECTING_SIGNATURES
|
||||
val twiceSignedTx = subFlow(CollectSignaturesFlow(partSignedTx, COLLECTING_SIGNATURES.childProgressTracker()))
|
||||
|
||||
// Wait for the finished, notarised transaction to arrive in our transaction store.
|
||||
progressTracker.currentStep = WAITING_FOR_TX
|
||||
return waitForLedgerCommit(stx.id)
|
||||
// Notarise and record the transaction.
|
||||
progressTracker.currentStep = RECORDING
|
||||
return subFlow(FinalityFlow(twiceSignedTx, setOf(otherParty, serviceHub.myInfo.legalIdentity))).single()
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@ -186,7 +157,6 @@ object TwoPartyTradeFlow {
|
||||
// What is the seller trying to sell us?
|
||||
val asset = it.assetForSale.state.data
|
||||
val assetTypeName = asset.javaClass.name
|
||||
logger.trace { "Got trade request for a $assetTypeName: ${it.assetForSale}" }
|
||||
|
||||
if (it.price > acceptablePrice)
|
||||
throw UnacceptablePriceException(it.price)
|
||||
@ -233,4 +203,4 @@ object TwoPartyTradeFlow {
|
||||
}
|
||||
// DOCEND 1
|
||||
}
|
||||
}
|
||||
}
|
@ -308,7 +308,7 @@ class TwoPartyTradeFlowTests {
|
||||
attachment(ByteArrayInputStream(stream.toByteArray()))
|
||||
}
|
||||
|
||||
val extraKey = bobNode.keyManagement.freshKey()
|
||||
val extraKey = bobNode.keyManagement.keys.single()
|
||||
val bobsFakeCash = fillUpForBuyer(false, AnonymousParty(extraKey),
|
||||
DUMMY_CASH_ISSUER.party,
|
||||
notaryNode.info.notaryIdentity).second
|
||||
@ -407,7 +407,7 @@ class TwoPartyTradeFlowTests {
|
||||
attachment(ByteArrayInputStream(stream.toByteArray()))
|
||||
}
|
||||
|
||||
val bobsKey = bobNode.keyManagement.freshKey()
|
||||
val bobsKey = bobNode.keyManagement.keys.single()
|
||||
val bobsFakeCash = fillUpForBuyer(false, AnonymousParty(bobsKey),
|
||||
DUMMY_CASH_ISSUER.party,
|
||||
notaryNode.info.notaryIdentity).second
|
||||
|
Loading…
Reference in New Issue
Block a user