mirror of
https://github.com/corda/corda.git
synced 2025-02-20 17:33:15 +00:00
Add more formal support for observer/regulator nodes. This is a simplistic
approach which assumes a dedicated node for observers: states that are reported to the node will appear in the database and update feeds as normal. Apps that expect all updates to be relevant to themselves may need adjusting if they run on an observer node too, but this is likely to be rare.
This commit is contained in:
parent
d5088c600b
commit
4728062455
1
.idea/compiler.xml
generated
1
.idea/compiler.xml
generated
@ -39,6 +39,7 @@
|
||||
<module name="explorer-capsule_test" target="1.6" />
|
||||
<module name="explorer_main" target="1.8" />
|
||||
<module name="explorer_test" target="1.8" />
|
||||
<module name="finance_integrationTest" target="1.8" />
|
||||
<module name="finance_main" target="1.8" />
|
||||
<module name="finance_test" target="1.8" />
|
||||
<module name="gradle-plugins-cordform-common_main" target="1.8" />
|
||||
|
@ -3,6 +3,7 @@ package net.corda.core.flows
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
import java.security.SignatureException
|
||||
@ -13,26 +14,42 @@ import java.security.SignatureException
|
||||
* This flow is a combination of [FlowSession.receive], resolve and [SignedTransaction.verify]. This flow will receive the
|
||||
* [SignedTransaction] and perform the resolution back-and-forth required to check the dependencies and download any missing
|
||||
* attachments. The flow will return the [SignedTransaction] after it is resolved and then verified using [SignedTransaction.verify].
|
||||
*
|
||||
* @param otherSideSession session to the other side which is calling [SendTransactionFlow].
|
||||
* @param checkSufficientSignatures if true checks all required signatures are present. See [SignedTransaction.verify].
|
||||
*
|
||||
* Please note that it will *not* store the transaction to the vault unless that is explicitly requested.
|
||||
*
|
||||
* @property otherSideSession session to the other side which is calling [SendTransactionFlow].
|
||||
* @property checkSufficientSignatures if true checks all required signatures are present. See [SignedTransaction.verify].
|
||||
* @property statesToRecord which transaction states should be recorded in the vault, if any.
|
||||
*/
|
||||
class ReceiveTransactionFlow(private val otherSideSession: FlowSession,
|
||||
private val checkSufficientSignatures: Boolean) : FlowLogic<SignedTransaction>() {
|
||||
/** Receives a [SignedTransaction] from [otherSideSession], verifies it and then records it in the vault. */
|
||||
constructor(otherSideSession: FlowSession) : this(otherSideSession, true)
|
||||
|
||||
class ReceiveTransactionFlow @JvmOverloads constructor(private val otherSideSession: FlowSession,
|
||||
private val checkSufficientSignatures: Boolean = true,
|
||||
private val statesToRecord: StatesToRecord = StatesToRecord.NONE) : FlowLogic<SignedTransaction>() {
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
@Suspendable
|
||||
@Throws(SignatureException::class,
|
||||
AttachmentResolutionException::class,
|
||||
TransactionResolutionException::class,
|
||||
TransactionVerificationException::class)
|
||||
override fun call(): SignedTransaction {
|
||||
return otherSideSession.receive<SignedTransaction>().unwrap {
|
||||
if (checkSufficientSignatures) {
|
||||
logger.trace("Receiving a transaction from ${otherSideSession.counterparty}")
|
||||
} else {
|
||||
logger.trace("Receiving a transaction (but without checking the signatures) from ${otherSideSession.counterparty}")
|
||||
}
|
||||
|
||||
val stx = otherSideSession.receive<SignedTransaction>().unwrap {
|
||||
subFlow(ResolveTransactionsFlow(it, otherSideSession))
|
||||
it.verify(serviceHub, checkSufficientSignatures)
|
||||
it
|
||||
}
|
||||
|
||||
if (checkSufficientSignatures) {
|
||||
// We should only send a transaction to the vault for processing if we did in fact fully verify it, and
|
||||
// there are no missing signatures. We don't want partly signed stuff in the vault.
|
||||
logger.trace("Successfully received fully signed tx ${stx.id}, sending to the vault for processing")
|
||||
serviceHub.recordTransactions(statesToRecord, setOf(stx))
|
||||
}
|
||||
return stx
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@ import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.exactAdd
|
||||
@ -93,7 +94,7 @@ class ResolveTransactionsFlow(private val txHashes: Set<SecureHash>,
|
||||
// half way through, it's no big deal, although it might result in us attempting to re-download data
|
||||
// redundantly next time we attempt verification.
|
||||
it.verify(serviceHub)
|
||||
serviceHub.recordTransactions(false, it)
|
||||
serviceHub.recordTransactions(StatesToRecord.NONE, listOf(it))
|
||||
}
|
||||
|
||||
return signedTransaction?.let {
|
||||
|
@ -47,6 +47,26 @@ interface ServicesForResolution {
|
||||
fun loadState(stateRef: StateRef): TransactionState<*>
|
||||
}
|
||||
|
||||
/**
|
||||
* Controls whether the transaction is sent to the vault at all, and if so whether states have to be relevant
|
||||
* or not in order to be recorded. Used in [ServiceHub.recordTransactions]
|
||||
*/
|
||||
enum class StatesToRecord {
|
||||
/** The received transaction is not sent to the vault at all. This is used within transaction resolution. */
|
||||
NONE,
|
||||
/**
|
||||
* All states that can be seen in the transaction will be recorded by the vault, even if none of the identities
|
||||
* on this node are a participant or owner.
|
||||
*/
|
||||
ALL_VISIBLE,
|
||||
/**
|
||||
* Only states that involve one of our public keys will be stored in the vault. This is the default. A public
|
||||
* key is involved (relevant) if it's in the [OwnableState.owner] field, or appears in the [ContractState.participants]
|
||||
* collection. This is usually equivalent to "can I change the contents of this state by signing a transaction".
|
||||
*/
|
||||
ONLY_RELEVANT
|
||||
}
|
||||
|
||||
/**
|
||||
* A service hub is the starting point for most operations you can do inside the node. You are provided with one
|
||||
* when a class annotated with [CordaService] is constructed, and you have access to one inside flows. Most RPCs
|
||||
@ -129,7 +149,9 @@ interface ServiceHub : ServicesForResolution {
|
||||
* @param txs The transactions to record.
|
||||
* @param notifyVault indicate if the vault should be notified for the update.
|
||||
*/
|
||||
fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>)
|
||||
fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
|
||||
recordTransactions(if (notifyVault) StatesToRecord.ONLY_RELEVANT else StatesToRecord.NONE, txs)
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the given [SignedTransaction]s in the local transaction storage and then sends them to the vault for
|
||||
@ -139,12 +161,22 @@ interface ServiceHub : ServicesForResolution {
|
||||
recordTransactions(notifyVault, listOf(first, *remaining))
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the given [SignedTransaction]s in the local transaction storage and then sends them to the vault for
|
||||
* further processing if [statesToRecord] is not [StatesToRecord.NONE].
|
||||
* This is expected to be run within a database transaction.
|
||||
*
|
||||
* @param txs The transactions to record.
|
||||
* @param statesToRecord how the vault should treat the output states of the transaction.
|
||||
*/
|
||||
fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>)
|
||||
|
||||
/**
|
||||
* Stores the given [SignedTransaction]s in the local transaction storage and then sends them to the vault for
|
||||
* further processing. This is expected to be run within a database transaction.
|
||||
*/
|
||||
fun recordTransactions(first: SignedTransaction, vararg remaining: SignedTransaction) {
|
||||
recordTransactions(true, first, *remaining)
|
||||
recordTransactions(listOf(first, *remaining))
|
||||
}
|
||||
|
||||
/**
|
||||
@ -152,7 +184,7 @@ interface ServiceHub : ServicesForResolution {
|
||||
* further processing. This is expected to be run within a database transaction.
|
||||
*/
|
||||
fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
recordTransactions(true, txs)
|
||||
recordTransactions(StatesToRecord.ONLY_RELEVANT, txs)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3,6 +3,12 @@ Release notes
|
||||
|
||||
Here are release notes for each snapshot release from M9 onwards.
|
||||
|
||||
Release 2.0
|
||||
----------
|
||||
|
||||
Support for observer/regulator nodes has returned. Read :doc:`tutorial-observer-nodes` to learn more or examine the
|
||||
interest rate swaps demo.
|
||||
|
||||
Release 1.0
|
||||
-----------
|
||||
Corda 1.0 is finally here!
|
||||
|
36
docs/source/tutorial-observer-nodes.rst
Normal file
36
docs/source/tutorial-observer-nodes.rst
Normal file
@ -0,0 +1,36 @@
|
||||
.. highlight:: kotlin
|
||||
.. raw:: html
|
||||
|
||||
<script type="text/javascript" src="_static/jquery.js"></script>
|
||||
<script type="text/javascript" src="_static/codesets.js"></script>
|
||||
|
||||
Observer nodes
|
||||
==============
|
||||
|
||||
Posting transactions to an observer node is a common requirement in finance, where regulators often want
|
||||
to receive comprehensive reporting on all actions taken. By running their own node, regulators can receive a stream
|
||||
of digitally signed, de-duplicated reports useful for later processing.
|
||||
|
||||
Adding support for observer nodes to your application is easy. The IRS (interest rate swap) demo shows to do it.
|
||||
|
||||
Just define a new flow that wraps the SendTransactionFlow/ReceiveTransactionFlow, as follows:
|
||||
|
||||
.. container:: codeset
|
||||
|
||||
.. literalinclude:: ../../samples/irs-demo/cordapp/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART 1
|
||||
:end-before: DOCEND 1
|
||||
|
||||
In this example, the ``AutoOfferFlow`` is the business logic, and we define two very short and simple flows to send
|
||||
the transaction to the regulator. There are two important aspects to note here:
|
||||
|
||||
1. The ``ReportToRegulatorFlow`` is marked as an ``@InitiatingFlow`` because it will start a new conversation, context
|
||||
free, with the regulator.
|
||||
2. The ``ReceiveRegulatoryReportFlow`` uses ``ReceiveTransactionFlow`` in a special way - it tells it to send the
|
||||
transaction to the vault for processing, including all states even if not involving our public keys. This is required
|
||||
because otherwise the vault will ignore states that don't list any of the node's public keys, but in this case,
|
||||
we do want to passively observe states we can't change. So overriding this behaviour is required.
|
||||
|
||||
If the states define a relational mapping (see :doc:`api-persistence`) then the regulator will be able to query the
|
||||
reports from their database and observe new transactions coming in via RPC.
|
@ -18,4 +18,5 @@ Tutorials
|
||||
oracles
|
||||
tutorial-tear-offs
|
||||
tutorial-attachments
|
||||
event-scheduling
|
||||
event-scheduling
|
||||
tutorial-observer-nodes
|
@ -727,9 +727,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
return flowFactories[initiatingFlowClass]
|
||||
}
|
||||
|
||||
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
|
||||
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
||||
database.transaction {
|
||||
super.recordTransactions(notifyVault, txs)
|
||||
super.recordTransactions(statesToRecord, txs)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.contracts.requireThat
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.ContractUpgradeUtils
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
|
||||
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
|
||||
@ -16,8 +17,7 @@ import net.corda.core.transactions.SignedTransaction
|
||||
class FinalityHandler(private val sender: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val stx = subFlow(ReceiveTransactionFlow(sender))
|
||||
serviceHub.recordTransactions(stx)
|
||||
subFlow(ReceiveTransactionFlow(sender, true, StatesToRecord.ONLY_RELEVANT))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,7 @@ import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.messaging.StateMachineTransactionMapping
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.TransactionStorage
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
@ -88,8 +89,7 @@ interface ServiceHubInternal : ServiceHub {
|
||||
val networkService: MessagingService
|
||||
val database: CordaPersistence
|
||||
val configuration: NodeConfiguration
|
||||
|
||||
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
|
||||
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
||||
require(txs.any()) { "No transactions passed in for recording" }
|
||||
val recordedTransactions = txs.filter { validatedTransactions.addTransaction(it) }
|
||||
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
|
||||
@ -101,9 +101,9 @@ interface ServiceHubInternal : ServiceHub {
|
||||
log.warn("Transactions recorded from outside of a state machine")
|
||||
}
|
||||
|
||||
if (notifyVault) {
|
||||
if (statesToRecord != StatesToRecord.NONE) {
|
||||
val toNotify = recordedTransactions.map { if (it.isNotaryChangeTransaction()) it.notaryChangeTx else it.tx }
|
||||
(vaultService as NodeVaultService).notifyAll(toNotify)
|
||||
(vaultService as NodeVaultService).notifyAll(statesToRecord, toNotify)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -38,8 +38,7 @@ import java.util.*
|
||||
import javax.persistence.Tuple
|
||||
|
||||
/**
|
||||
* Currently, the node vault service is a very simple RDBMS backed implementation. It will change significantly when
|
||||
* we add further functionality as the design for the vault and vault service matures.
|
||||
* The vault service handles storage, retrieval and querying of states.
|
||||
*
|
||||
* This class needs database transactions to be in-flight during method calls and init, and will throw exceptions if
|
||||
* this is not the case.
|
||||
@ -48,7 +47,6 @@ import javax.persistence.Tuple
|
||||
* TODO: have transaction storage do some caching.
|
||||
*/
|
||||
class NodeVaultService(private val services: ServiceHub, private val hibernateConfig: HibernateConfiguration) : SingletonSerializeAsToken(), VaultService {
|
||||
|
||||
private companion object {
|
||||
val log = loggerFor<NodeVaultService>()
|
||||
}
|
||||
@ -112,7 +110,10 @@ class NodeVaultService(private val services: ServiceHub, private val hibernateCo
|
||||
* indicate whether an update consists entirely of regular or notary change transactions, which may require
|
||||
* different processing logic.
|
||||
*/
|
||||
fun notifyAll(txns: Iterable<CoreTransaction>) {
|
||||
override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>) {
|
||||
if (statesToRecord == StatesToRecord.NONE)
|
||||
return
|
||||
|
||||
// It'd be easier to just group by type, but then we'd lose ordering.
|
||||
val regularTxns = mutableListOf<WireTransaction>()
|
||||
val notaryChangeTxns = mutableListOf<NotaryChangeWireTransaction>()
|
||||
@ -122,33 +123,36 @@ class NodeVaultService(private val services: ServiceHub, private val hibernateCo
|
||||
is WireTransaction -> {
|
||||
regularTxns.add(tx)
|
||||
if (notaryChangeTxns.isNotEmpty()) {
|
||||
notifyNotaryChange(notaryChangeTxns.toList())
|
||||
notifyNotaryChange(notaryChangeTxns.toList(), statesToRecord)
|
||||
notaryChangeTxns.clear()
|
||||
}
|
||||
}
|
||||
is NotaryChangeWireTransaction -> {
|
||||
notaryChangeTxns.add(tx)
|
||||
if (regularTxns.isNotEmpty()) {
|
||||
notifyRegular(regularTxns.toList())
|
||||
notifyRegular(regularTxns.toList(), statesToRecord)
|
||||
regularTxns.clear()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (regularTxns.isNotEmpty()) notifyRegular(regularTxns.toList())
|
||||
if (notaryChangeTxns.isNotEmpty()) notifyNotaryChange(notaryChangeTxns.toList())
|
||||
if (regularTxns.isNotEmpty()) notifyRegular(regularTxns.toList(), statesToRecord)
|
||||
if (notaryChangeTxns.isNotEmpty()) notifyNotaryChange(notaryChangeTxns.toList(), statesToRecord)
|
||||
}
|
||||
|
||||
/** Same as notifyAll but with a single transaction. */
|
||||
fun notify(tx: CoreTransaction) = notifyAll(listOf(tx))
|
||||
|
||||
private fun notifyRegular(txns: Iterable<WireTransaction>) {
|
||||
private fun notifyRegular(txns: Iterable<WireTransaction>, statesToRecord: StatesToRecord) {
|
||||
fun makeUpdate(tx: WireTransaction): Vault.Update<ContractState> {
|
||||
val myKeys = services.keyManagementService.filterMyKeys(tx.outputs.flatMap { it.data.participants.map { it.owningKey } })
|
||||
val ourNewStates = tx.outputs.
|
||||
filter { isRelevant(it.data, myKeys.toSet()) }.
|
||||
map { tx.outRef<ContractState>(it.data) }
|
||||
|
||||
val ourNewStates = when (statesToRecord) {
|
||||
StatesToRecord.NONE -> throw AssertionError("Should not reach here")
|
||||
StatesToRecord.ONLY_RELEVANT -> tx.outputs.filter { isRelevant(it.data, myKeys.toSet()) }
|
||||
StatesToRecord.ALL_VISIBLE -> tx.outputs
|
||||
}.map { tx.outRef<ContractState>(it.data) }
|
||||
|
||||
// Retrieve all unconsumed states for this transaction's inputs
|
||||
val consumedStates = loadStates(tx.inputs)
|
||||
@ -166,7 +170,7 @@ class NodeVaultService(private val services: ServiceHub, private val hibernateCo
|
||||
processAndNotify(netDelta)
|
||||
}
|
||||
|
||||
private fun notifyNotaryChange(txns: Iterable<NotaryChangeWireTransaction>) {
|
||||
private fun notifyNotaryChange(txns: Iterable<NotaryChangeWireTransaction>, statesToRecord: StatesToRecord) {
|
||||
fun makeUpdate(tx: NotaryChangeWireTransaction): Vault.Update<ContractState> {
|
||||
// We need to resolve the full transaction here because outputs are calculated from inputs
|
||||
// We also can't do filtering beforehand, since output encumbrance pointers get recalculated based on
|
||||
@ -175,7 +179,12 @@ class NodeVaultService(private val services: ServiceHub, private val hibernateCo
|
||||
val myKeys = services.keyManagementService.filterMyKeys(ltx.outputs.flatMap { it.data.participants.map { it.owningKey } })
|
||||
val (consumedStateAndRefs, producedStates) = ltx.inputs.
|
||||
zip(ltx.outputs).
|
||||
filter { (_, output) -> isRelevant(output.data, myKeys.toSet()) }.
|
||||
filter { (_, output) ->
|
||||
if (statesToRecord == StatesToRecord.ONLY_RELEVANT)
|
||||
isRelevant(output.data, myKeys.toSet())
|
||||
else
|
||||
true
|
||||
}.
|
||||
unzip()
|
||||
|
||||
val producedStateAndRefs = producedStates.map { ltx.outRef<ContractState>(it.data) }
|
||||
|
@ -10,6 +10,7 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.days
|
||||
|
@ -5,6 +5,7 @@ import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignatureMetadata
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.toFuture
|
||||
@ -14,7 +15,6 @@ import net.corda.finance.schemas.CashSchemaV1
|
||||
import net.corda.finance.schemas.SampleCashSchemaV2
|
||||
import net.corda.finance.schemas.SampleCashSchemaV3
|
||||
import net.corda.node.services.schema.HibernateObserver
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.services.vault.VaultSchemaV1
|
||||
@ -48,7 +48,6 @@ class DBTransactionStorageTests : TestDependencyInjectionBase() {
|
||||
database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(), createSchemaService, ::makeTestIdentityService)
|
||||
|
||||
database.transaction {
|
||||
|
||||
services = object : MockServices(BOB_KEY) {
|
||||
override val vaultService: VaultService get() {
|
||||
val vaultService = NodeVaultService(this, database.hibernateConfig)
|
||||
@ -61,7 +60,7 @@ class DBTransactionStorageTests : TestDependencyInjectionBase() {
|
||||
validatedTransactions.addTransaction(stx)
|
||||
}
|
||||
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
|
||||
(vaultService as NodeVaultService).notifyAll(txs.map { it.tx })
|
||||
(vaultService as NodeVaultService).notifyAll(StatesToRecord.ONLY_RELEVANT, txs.map { it.tx })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.utilities.toBase58String
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.VaultService
|
||||
@ -88,8 +89,7 @@ class HibernateConfigurationTest : TestDependencyInjectionBase() {
|
||||
for (stx in txs) {
|
||||
validatedTransactions.addTransaction(stx)
|
||||
}
|
||||
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
|
||||
(vaultService as NodeVaultService).notifyAll(txs.map { it.tx })
|
||||
(vaultService as NodeVautService).notifyAll(statesToRecord, txs.map { it.tx })
|
||||
}
|
||||
override fun jdbcSession() = database.createSession()
|
||||
}
|
||||
@ -886,4 +886,4 @@ class HibernateConfigurationTest : TestDependencyInjectionBase() {
|
||||
Assert.assertEquals(cashStates.count(), count)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,15 +1,15 @@
|
||||
package net.corda.node.services.vault
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.Amount
|
||||
import net.corda.core.contracts.Issued
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.AnonymousParty
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.packageName
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.node.services.vault.PageSpecification
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.node.services.vault.QueryCriteria.*
|
||||
import net.corda.core.transactions.NotaryChangeWireTransaction
|
||||
@ -29,7 +29,6 @@ import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.contracts.fillWithSomeTestCash
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseAndMockServices
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.junit.After
|
||||
@ -99,10 +98,10 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
val originalVault = vaultService
|
||||
val services2 = object : MockServices() {
|
||||
override val vaultService: NodeVaultService get() = originalVault as NodeVaultService
|
||||
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
|
||||
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
validatedTransactions.addTransaction(stx)
|
||||
vaultService.notify(stx.tx)
|
||||
vaultService.notify(statesToRecord, stx.tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -510,14 +509,14 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
}.toWireTransaction(services)
|
||||
val cashState = StateAndRef(issueTx.outputs.single(), StateRef(issueTx.id, 0))
|
||||
|
||||
database.transaction { service.notify(issueTx) }
|
||||
database.transaction { service.notify(StatesToRecord.ONLY_RELEVANT, issueTx) }
|
||||
val expectedIssueUpdate = Vault.Update(emptySet(), setOf(cashState), null)
|
||||
|
||||
database.transaction {
|
||||
val moveTx = TransactionBuilder(services.myInfo.chooseIdentity()).apply {
|
||||
Cash.generateSpend(services, this, Amount(1000, GBP), thirdPartyIdentity)
|
||||
}.toWireTransaction(services)
|
||||
service.notify(moveTx)
|
||||
service.notify(StatesToRecord.ONLY_RELEVANT, moveTx)
|
||||
}
|
||||
val expectedMoveUpdate = Vault.Update(setOf(cashState), emptySet(), null)
|
||||
|
||||
@ -554,7 +553,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
val cashStateWithNewNotary = StateAndRef(initialCashState.state.copy(notary = newNotary), StateRef(changeNotaryTx.id, 0))
|
||||
|
||||
database.transaction {
|
||||
service.notifyAll(listOf(issueStx.tx, changeNotaryTx))
|
||||
service.notifyAll(StatesToRecord.ONLY_RELEVANT, listOf(issueStx.tx, changeNotaryTx))
|
||||
}
|
||||
|
||||
// Move cash
|
||||
@ -565,7 +564,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
}
|
||||
|
||||
database.transaction {
|
||||
service.notify(moveTx)
|
||||
service.notify(StatesToRecord.ONLY_RELEVANT, moveTx)
|
||||
}
|
||||
|
||||
val expectedIssueUpdate = Vault.Update(emptySet(), setOf(initialCashState), null)
|
||||
@ -575,4 +574,32 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
val observedUpdates = vaultSubscriber.onNextEvents
|
||||
assertEquals(observedUpdates, listOf(expectedIssueUpdate, expectedNotaryChangeUpdate, expectedMoveUpdate))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun observerMode() {
|
||||
fun countCash(): Long {
|
||||
return database.transaction {
|
||||
vaultService.queryBy(Cash.State::class.java, QueryCriteria.VaultQueryCriteria(), PageSpecification(1)).totalStatesAvailable
|
||||
}
|
||||
}
|
||||
val currentCashStates = countCash()
|
||||
|
||||
// Send some minimalist dummy transaction.
|
||||
val txb = TransactionBuilder(DUMMY_NOTARY)
|
||||
txb.addOutputState(Cash.State(MEGA_CORP.ref(0), 100.DOLLARS, MINI_CORP), Cash::class.java.name)
|
||||
txb.addCommand(Cash.Commands.Move(), MEGA_CORP_PUBKEY)
|
||||
val wtx = txb.toWireTransaction(services)
|
||||
database.transaction {
|
||||
vaultService.notify(StatesToRecord.ONLY_RELEVANT, wtx)
|
||||
}
|
||||
|
||||
// Check that it was ignored as irrelevant.
|
||||
assertEquals(currentCashStates, countCash())
|
||||
|
||||
// Now try again and check it was accepted.
|
||||
database.transaction {
|
||||
vaultService.notify(StatesToRecord.ALL_VISIBLE, wtx)
|
||||
}
|
||||
assertEquals(currentCashStates + 1, countCash())
|
||||
}
|
||||
}
|
||||
|
@ -2,8 +2,10 @@ package net.corda.irs.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.excludeHostNode
|
||||
import net.corda.core.identity.groupAbstractPartyByWellKnownParty
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.finance.contracts.DealState
|
||||
@ -23,7 +25,6 @@ object AutoOfferFlow {
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class Requester(val dealToBeOffered: DealState) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
companion object {
|
||||
object RECEIVED : ProgressTracker.Step("Received API call")
|
||||
object DEALING : ProgressTracker.Step("Starting the deal flow") {
|
||||
@ -55,11 +56,45 @@ object AutoOfferFlow {
|
||||
AutoOffer(notary, dealToBeOffered),
|
||||
progressTracker.getChildProgressTracker(DEALING)!!
|
||||
)
|
||||
val stx = subFlow(instigator)
|
||||
return stx
|
||||
return subFlow(instigator)
|
||||
}
|
||||
}
|
||||
|
||||
// DOCSTART 1
|
||||
@InitiatedBy(Requester::class)
|
||||
class AutoOfferAcceptor(otherSideSession: FlowSession) : Acceptor(otherSideSession)
|
||||
class AutoOfferAcceptor(otherSideSession: FlowSession) : Acceptor(otherSideSession) {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val finalTx = super.call()
|
||||
// Our transaction is now committed to the ledger, so report it to our regulator. We use a custom flow
|
||||
// that wraps SendTransactionFlow to allow the receiver to customise how ReceiveTransactionFlow is run,
|
||||
// and because in a real life app you'd probably have more complex logic here e.g. describing why the report
|
||||
// was filed, checking that the reportee is a regulated entity and not some random node from the wrong
|
||||
// country and so on.
|
||||
val regulator = serviceHub.identityService.partiesFromName("Regulator", true).single()
|
||||
subFlow(ReportToRegulatorFlow(regulator, finalTx))
|
||||
return finalTx
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
class ReportToRegulatorFlow(private val regulator: Party, private val finalTx: SignedTransaction) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val session = initiateFlow(regulator)
|
||||
subFlow(SendTransactionFlow(session, finalTx))
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(ReportToRegulatorFlow::class)
|
||||
class ReceiveRegulatoryReportFlow(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
// Start the matching side of SendTransactionFlow above, but tell it to record all visible states even
|
||||
// though they (as far as the node can tell) are nothing to do with us.
|
||||
subFlow(ReceiveTransactionFlow(otherSideSession, true, StatesToRecord.ALL_VISIBLE))
|
||||
}
|
||||
}
|
||||
// DOCEND 1
|
||||
}
|
||||
|
@ -7,6 +7,9 @@ import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.messaging.FlowProgressHandle
|
||||
import net.corda.core.node.*
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
@ -88,7 +91,7 @@ open class MockServices(cordappPackages: List<String> = emptyList(), vararg val
|
||||
* Makes database and mock services appropriate for unit tests.
|
||||
*
|
||||
* @param customSchemas a set of schemas being used by [NodeSchemaService]
|
||||
* @param keys a lis of [KeyPair] instances to be used by [MockServices]. Defualts to [MEGA_CORP_KEY]
|
||||
* @param keys a list of [KeyPair] instances to be used by [MockServices]. Defaults to [MEGA_CORP_KEY]
|
||||
* @param createIdentityService a lambda function returning an instance of [IdentityService]. Defauts to [InMemoryIdentityService].
|
||||
*
|
||||
* @return a pair where the first element is the instance of [CordaPersistence] and the second is [MockServices].
|
||||
@ -108,12 +111,12 @@ open class MockServices(cordappPackages: List<String> = emptyList(), vararg val
|
||||
override val identityService: IdentityService = database.transaction { identityServiceRef }
|
||||
override val vaultService: VaultService = makeVaultService(database.hibernateConfig)
|
||||
|
||||
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
|
||||
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
validatedTransactions.addTransaction(stx)
|
||||
}
|
||||
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
|
||||
(vaultService as NodeVaultService).notifyAll(txs.map { it.tx })
|
||||
(vaultService as NodeVaultService).notifyAll(statesToRecord, txs.map { it.tx })
|
||||
}
|
||||
|
||||
override fun jdbcSession(): Connection = database.createSession()
|
||||
@ -129,7 +132,7 @@ open class MockServices(cordappPackages: List<String> = emptyList(), vararg val
|
||||
|
||||
val key: KeyPair get() = keys.first()
|
||||
|
||||
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
|
||||
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
||||
txs.forEach {
|
||||
stateMachineRecordedTransactionMapping.addMapping(StateMachineRunId.createRandom(), it.id)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user