mirror of
https://github.com/corda/corda.git
synced 2025-06-16 22:28:15 +00:00
Moved the initiated-side of core flows in net.corda.flows to net.corda.node.services. They are not meant to be visible to end-users.
This commit is contained in:
@ -24,6 +24,7 @@ import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.flows.*
|
||||
import net.corda.node.services.*
|
||||
import net.corda.node.services.api.*
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
@ -92,7 +93,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
CashIssueFlow::class.java to setOf(Amount::class.java, OpaqueBytes::class.java, Party::class.java),
|
||||
CashPaymentFlow::class.java to setOf(Amount::class.java, Party::class.java),
|
||||
FinalityFlow::class.java to emptySet(),
|
||||
ContractUpgradeFlow.Instigator::class.java to emptySet()
|
||||
ContractUpgradeFlow::class.java to emptySet()
|
||||
)
|
||||
}
|
||||
|
||||
@ -270,8 +271,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
installCoreFlow(FetchTransactionsFlow::class) { otherParty, _ -> FetchTransactionsHandler(otherParty) }
|
||||
installCoreFlow(FetchAttachmentsFlow::class) { otherParty, _ -> FetchAttachmentsHandler(otherParty) }
|
||||
installCoreFlow(BroadcastTransactionFlow::class) { otherParty, _ -> NotifyTransactionHandler(otherParty) }
|
||||
installCoreFlow(NotaryChangeFlow.Instigator::class) { otherParty, _ -> NotaryChangeFlow.Acceptor(otherParty) }
|
||||
installCoreFlow(ContractUpgradeFlow.Instigator::class) { otherParty, _ -> ContractUpgradeFlow.Acceptor(otherParty) }
|
||||
installCoreFlow(NotaryChangeFlow::class) { otherParty, _ -> NotaryChangeHandler(otherParty) }
|
||||
installCoreFlow(ContractUpgradeFlow::class) { otherParty, _ -> ContractUpgradeHandler(otherParty) }
|
||||
}
|
||||
|
||||
/**
|
||||
|
124
node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt
Normal file
124
node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt
Normal file
@ -0,0 +1,124 @@
|
||||
package net.corda.node.services
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.TransactionType
|
||||
import net.corda.core.contracts.UpgradedContract
|
||||
import net.corda.core.contracts.requireThat
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.flows.*
|
||||
|
||||
/**
|
||||
* This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
|
||||
* glue that sits between the network layer and the database layer.
|
||||
*
|
||||
* Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
|
||||
* are no access control lists. If you want to keep some data private, then you must be careful who you give its name
|
||||
* to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
|
||||
* its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
|
||||
* such the hash of a piece of data can be seen as a type of password allowing access to it.
|
||||
*
|
||||
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
|
||||
*/
|
||||
class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {
|
||||
override fun getData(id: SecureHash): SignedTransaction? {
|
||||
return serviceHub.storageService.validatedTransactions.getTransaction(id)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
|
||||
class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
|
||||
override fun getData(id: SecureHash): ByteArray? {
|
||||
return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes()
|
||||
}
|
||||
}
|
||||
|
||||
abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
@Throws(FetchDataFlow.HashNotFound::class)
|
||||
override fun call() {
|
||||
val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
|
||||
if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
|
||||
it
|
||||
}
|
||||
val response = request.hashes.map {
|
||||
getData(it) ?: throw FetchDataFlow.HashNotFound(it)
|
||||
}
|
||||
send(otherParty, response)
|
||||
}
|
||||
|
||||
protected abstract fun getData(id: SecureHash): T?
|
||||
}
|
||||
|
||||
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
|
||||
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
|
||||
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
|
||||
// cash without from unknown parties?
|
||||
class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val request = receive<BroadcastTransactionFlow.NotifyTxRequest>(otherParty).unwrap { it }
|
||||
subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true)
|
||||
serviceHub.recordTransactions(request.tx)
|
||||
}
|
||||
}
|
||||
|
||||
class NotaryChangeHandler(otherSide: Party) : AbstractStateReplacementFlow.Acceptor<Party>(otherSide) {
|
||||
/**
|
||||
* Check the notary change proposal.
|
||||
*
|
||||
* For example, if the proposed new notary has the same behaviour (e.g. both are non-validating)
|
||||
* and is also in a geographically convenient location we can just automatically approve the change.
|
||||
* TODO: In more difficult cases this should call for human attention to manually verify and approve the proposal
|
||||
*/
|
||||
override fun verifyProposal(proposal: AbstractStateReplacementFlow.Proposal<Party>): Unit {
|
||||
val state = proposal.stateRef
|
||||
val proposedTx = proposal.stx.tx
|
||||
|
||||
if (proposedTx.type !is TransactionType.NotaryChange) {
|
||||
throw StateReplacementException("The proposed transaction is not a notary change transaction.")
|
||||
}
|
||||
|
||||
val newNotary = proposal.modification
|
||||
val isNotary = serviceHub.networkMapCache.notaryNodes.any { it.notaryIdentity == newNotary }
|
||||
if (!isNotary) {
|
||||
throw StateReplacementException("The proposed node $newNotary does not run a Notary service")
|
||||
}
|
||||
if (state !in proposedTx.inputs) {
|
||||
throw StateReplacementException("The proposed state $state is not in the proposed transaction inputs")
|
||||
}
|
||||
|
||||
// // An example requirement
|
||||
// val blacklist = listOf("Evil Notary")
|
||||
// checkProposal(newNotary.name !in blacklist) {
|
||||
// "The proposed new notary $newNotary is not trusted by the party"
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
class ContractUpgradeHandler(otherSide: Party) : AbstractStateReplacementFlow.Acceptor<Class<out UpgradedContract<ContractState, *>>>(otherSide) {
|
||||
@Suspendable
|
||||
@Throws(StateReplacementException::class)
|
||||
override fun verifyProposal(proposal: AbstractStateReplacementFlow.Proposal<Class<out UpgradedContract<ContractState, *>>>) {
|
||||
// Retrieve signed transaction from our side, we will apply the upgrade logic to the transaction on our side, and
|
||||
// verify outputs matches the proposed upgrade.
|
||||
val stx = subFlow(FetchTransactionsFlow(setOf(proposal.stateRef.txhash), otherSide)).fromDisk.singleOrNull()
|
||||
requireNotNull(stx) { "We don't have a copy of the referenced state" }
|
||||
val oldStateAndRef = stx!!.tx.outRef<ContractState>(proposal.stateRef.index)
|
||||
val authorisedUpgrade = serviceHub.vaultService.getAuthorisedContractUpgrade(oldStateAndRef.ref) ?:
|
||||
throw IllegalStateException("Contract state upgrade is unauthorised. State hash : ${oldStateAndRef.ref}")
|
||||
val proposedTx = proposal.stx.tx
|
||||
val expectedTx = ContractUpgradeFlow.assembleBareTx(oldStateAndRef, proposal.modification).toWireTransaction()
|
||||
requireThat {
|
||||
"The instigator is one of the participants" using (otherSide.owningKey in oldStateAndRef.state.data.participants)
|
||||
"The proposed upgrade ${proposal.modification.javaClass} is a trusted upgrade path" using (proposal.modification == authorisedUpgrade)
|
||||
"The proposed tx matches the expected tx for this upgrade" using (proposedTx == expectedTx)
|
||||
}
|
||||
ContractUpgradeFlow.verify(oldStateAndRef.state.data, expectedTx.outRef<ContractState>(0).state.data, expectedTx.commands.single())
|
||||
}
|
||||
}
|
@ -1,65 +0,0 @@
|
||||
package net.corda.node.services.persistence
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.flows.*
|
||||
|
||||
/**
|
||||
* This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
|
||||
* glue that sits between the network layer and the database layer.
|
||||
*
|
||||
* Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
|
||||
* are no access control lists. If you want to keep some data private, then you must be careful who you give its name
|
||||
* to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
|
||||
* its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
|
||||
* such the hash of a piece of data can be seen as a type of password allowing access to it.
|
||||
*
|
||||
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
|
||||
*/
|
||||
class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {
|
||||
override fun getData(id: SecureHash): SignedTransaction? {
|
||||
return serviceHub.storageService.validatedTransactions.getTransaction(id)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
|
||||
class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
|
||||
override fun getData(id: SecureHash): ByteArray? {
|
||||
return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes()
|
||||
}
|
||||
}
|
||||
|
||||
abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
@Throws(FetchDataFlow.HashNotFound::class)
|
||||
override fun call() {
|
||||
val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
|
||||
if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
|
||||
it
|
||||
}
|
||||
val response = request.hashes.map {
|
||||
getData(it) ?: throw FetchDataFlow.HashNotFound(it)
|
||||
}
|
||||
send(otherParty, response)
|
||||
}
|
||||
|
||||
protected abstract fun getData(id: SecureHash): T?
|
||||
}
|
||||
|
||||
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
|
||||
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
|
||||
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
|
||||
// cash without from unknown parties?
|
||||
class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val request = receive<BroadcastTransactionFlow.NotifyTxRequest>(otherParty).unwrap { it }
|
||||
subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true)
|
||||
serviceHub.recordTransactions(request.tx)
|
||||
}
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.node.services.TimestampChecker
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.transactions.FilteredTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.flows.NotaryFlow
|
||||
import net.corda.flows.TransactionParts
|
||||
|
||||
class NonValidatingNotaryFlow(otherSide: Party,
|
||||
timestampChecker: TimestampChecker,
|
||||
uniquenessProvider: UniquenessProvider) : NotaryFlow.Service(otherSide, timestampChecker, uniquenessProvider) {
|
||||
/**
|
||||
* The received transaction is not checked for contract-validity, as that would require fully
|
||||
* resolving it into a [TransactionForVerification], for which the caller would have to reveal the whole transaction
|
||||
* history chain.
|
||||
* As a result, the Notary _will commit invalid transactions_ as well, but as it also records the identity of
|
||||
* the caller, it is possible to raise a dispute and verify the validity of the transaction and subsequently
|
||||
* undo the commit of the input states (the exact mechanism still needs to be worked out).
|
||||
*/
|
||||
@Suspendable
|
||||
override fun receiveAndVerifyTx(): TransactionParts {
|
||||
val ftx = receive<FilteredTransaction>(otherSide).unwrap {
|
||||
it.verify()
|
||||
it
|
||||
}
|
||||
return TransactionParts(ftx.rootHash, ftx.filteredLeaves.inputs, ftx.filteredLeaves.timestamp)
|
||||
}
|
||||
}
|
@ -3,7 +3,6 @@ package net.corda.node.services.transactions
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.node.services.TimestampChecker
|
||||
import net.corda.flows.NonValidatingNotaryFlow
|
||||
|
||||
/** A non-validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
|
||||
class RaftNonValidatingNotaryService(val timestampChecker: TimestampChecker,
|
||||
|
@ -3,7 +3,6 @@ package net.corda.node.services.transactions
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.node.services.TimestampChecker
|
||||
import net.corda.flows.ValidatingNotaryFlow
|
||||
|
||||
/** A validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
|
||||
class RaftValidatingNotaryService(val timestampChecker: TimestampChecker,
|
||||
|
@ -5,7 +5,6 @@ import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.node.services.ServiceType
|
||||
import net.corda.core.node.services.TimestampChecker
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.flows.NonValidatingNotaryFlow
|
||||
|
||||
/** A simple Notary service that does not perform transaction validation */
|
||||
class SimpleNotaryService(val timestampChecker: TimestampChecker,
|
||||
|
@ -0,0 +1,62 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.node.services.TimestampChecker
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.flows.*
|
||||
import java.security.SignatureException
|
||||
|
||||
/**
|
||||
* A notary commit flow that makes sure a given transaction is valid before committing it. This does mean that the calling
|
||||
* party has to reveal the whole transaction history; however, we avoid complex conflict resolution logic where a party
|
||||
* has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was
|
||||
* indeed valid.
|
||||
*/
|
||||
class ValidatingNotaryFlow(otherSide: Party,
|
||||
timestampChecker: TimestampChecker,
|
||||
uniquenessProvider: UniquenessProvider) :
|
||||
NotaryFlow.Service(otherSide, timestampChecker, uniquenessProvider) {
|
||||
/**
|
||||
* The received transaction is checked for contract-validity, which requires fully resolving it into a
|
||||
* [TransactionForVerification], for which the caller also has to to reveal the whole transaction
|
||||
* dependency chain.
|
||||
*/
|
||||
@Suspendable
|
||||
override fun receiveAndVerifyTx(): TransactionParts {
|
||||
val stx = receive<SignedTransaction>(otherSide).unwrap { it }
|
||||
checkSignatures(stx)
|
||||
val wtx = stx.tx
|
||||
validateTransaction(wtx)
|
||||
return TransactionParts(wtx.id, wtx.inputs, wtx.timestamp)
|
||||
}
|
||||
|
||||
private fun checkSignatures(stx: SignedTransaction) {
|
||||
try {
|
||||
stx.verifySignatures(serviceHub.myInfo.notaryIdentity.owningKey)
|
||||
} catch(e: SignedTransaction.SignaturesMissingException) {
|
||||
throw NotaryException(NotaryError.SignaturesMissing(e))
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
fun validateTransaction(wtx: WireTransaction) {
|
||||
try {
|
||||
resolveTransaction(wtx)
|
||||
wtx.toLedgerTransaction(serviceHub).verify()
|
||||
} catch (e: Exception) {
|
||||
throw when (e) {
|
||||
is TransactionVerificationException -> NotaryException(NotaryError.TransactionInvalid(e.toString()))
|
||||
is SignatureException -> NotaryException(NotaryError.SignaturesInvalid(e.toString()))
|
||||
else -> e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun resolveTransaction(wtx: WireTransaction) = subFlow(ResolveTransactionsFlow(wtx, otherSide))
|
||||
}
|
@ -5,7 +5,6 @@ import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.node.services.ServiceType
|
||||
import net.corda.core.node.services.TimestampChecker
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.flows.ValidatingNotaryFlow
|
||||
|
||||
/** A Notary service that validates the transaction chain of the submitted transaction before committing it */
|
||||
class ValidatingNotaryService(val timestampChecker: TimestampChecker,
|
||||
|
@ -2,14 +2,13 @@ package net.corda.node.services
|
||||
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.X509Utilities
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.seconds
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.flows.NotaryChangeFlow.Instigator
|
||||
import net.corda.flows.NotaryChangeFlow
|
||||
import net.corda.flows.StateReplacementException
|
||||
import net.corda.node.internal.AbstractNode
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
@ -48,7 +47,7 @@ class NotaryChangeTests {
|
||||
fun `should change notary for a state with single participant`() {
|
||||
val state = issueState(clientNodeA, oldNotaryNode)
|
||||
val newNotary = newNotaryNode.info.notaryIdentity
|
||||
val flow = Instigator(state, newNotary)
|
||||
val flow = NotaryChangeFlow(state, newNotary)
|
||||
val future = clientNodeA.services.startFlow(flow)
|
||||
|
||||
net.runNetwork()
|
||||
@ -61,7 +60,7 @@ class NotaryChangeTests {
|
||||
fun `should change notary for a state with multiple participants`() {
|
||||
val state = issueMultiPartyState(clientNodeA, clientNodeB, oldNotaryNode)
|
||||
val newNotary = newNotaryNode.info.notaryIdentity
|
||||
val flow = Instigator(state, newNotary)
|
||||
val flow = NotaryChangeFlow(state, newNotary)
|
||||
val future = clientNodeA.services.startFlow(flow)
|
||||
|
||||
net.runNetwork()
|
||||
@ -77,7 +76,7 @@ class NotaryChangeTests {
|
||||
fun `should throw when a participant refuses to change Notary`() {
|
||||
val state = issueMultiPartyState(clientNodeA, clientNodeB, oldNotaryNode)
|
||||
val newEvilNotary = Party(X500Name("CN=Evil Notary,O=Evil R3,OU=corda,L=London,C=UK"), generateKeyPair().public)
|
||||
val flow = Instigator(state, newEvilNotary)
|
||||
val flow = NotaryChangeFlow(state, newEvilNotary)
|
||||
val future = clientNodeA.services.startFlow(flow)
|
||||
|
||||
net.runNetwork()
|
||||
@ -93,7 +92,7 @@ class NotaryChangeTests {
|
||||
|
||||
val state = StateAndRef(issueTx.outputs.first(), StateRef(issueTx.id, 0))
|
||||
val newNotary = newNotaryNode.info.notaryIdentity
|
||||
val flow = Instigator(state, newNotary)
|
||||
val flow = NotaryChangeFlow(state, newNotary)
|
||||
val future = clientNodeA.services.startFlow(flow)
|
||||
net.runNetwork()
|
||||
val newState = future.resultFuture.getOrThrow()
|
||||
|
@ -12,6 +12,7 @@ import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.flows.BroadcastTransactionFlow.NotifyTxRequest
|
||||
import net.corda.node.services.NotifyTransactionHandler
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.MEGA_CORP
|
||||
import net.corda.testing.node.MockNetwork
|
||||
|
Reference in New Issue
Block a user