mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
Replace data vending service with SendTransactionFlow (#964)
* WIP - Removed data Vending services, fixed all flow test * * separated out extra data, extra data are sent after the SendTransactionFlow if required * New SendProposalFlow for sending TradeProposal, which contains StateAndRef. * WIP * * removed TradeProposal interface. * changed SendProposalFlow to SendStateAndRefFlow, same for receive side. * fixup after rebase. * * undo changes in .idea folder * * remove unintended changes * * Addressed PR issues * * doc changes * * addressed pr issues * moved ResolveTransactionsFlow to internal * changed FlowLogic<Unit> to FlowLogic<Void?> for java use case * * addressed PR issues * renamed DataVendingFlow in TestUtill to TestDataVendingFlow to avoid name confusion, and moved it to core/test * * removed reference to ResolveTransactionsFlow
This commit is contained in:
@ -34,7 +34,9 @@ class LargeTransactionsTest {
|
||||
.addAttachment(hash4)
|
||||
val stx = serviceHub.signInitialTransaction(tx, serviceHub.legalIdentityKey)
|
||||
// Send to the other side and wait for it to trigger resolution from us.
|
||||
sendAndReceive<Unit>(serviceHub.networkMapCache.getNodeByLegalName(BOB.name)!!.legalIdentity, stx)
|
||||
val bob = serviceHub.networkMapCache.getNodeByLegalName(BOB.name)!!.legalIdentity
|
||||
subFlow(SendTransactionFlow(bob, stx))
|
||||
receive<Unit>(bob)
|
||||
}
|
||||
}
|
||||
|
||||
@ -42,8 +44,7 @@ class LargeTransactionsTest {
|
||||
class ReceiveLargeTransactionFlow(private val counterParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val stx = receive<SignedTransaction>(counterParty).unwrap { it }
|
||||
subFlow(ResolveTransactionsFlow(stx, counterParty))
|
||||
subFlow(ReceiveTransactionFlow(counterParty))
|
||||
// Unblock the other side by sending some dummy object (Unit is fine here as it's a singleton).
|
||||
send(counterParty, Unit)
|
||||
}
|
||||
@ -53,10 +54,10 @@ class LargeTransactionsTest {
|
||||
fun checkCanSendLargeTransactions() {
|
||||
// These 4 attachments yield a transaction that's got >10mb attached, so it'd push us over the Artemis
|
||||
// max message size.
|
||||
val bigFile1 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 0)
|
||||
val bigFile2 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 1)
|
||||
val bigFile3 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 2)
|
||||
val bigFile4 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 3)
|
||||
val bigFile1 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024 * 3, 0)
|
||||
val bigFile2 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024 * 3, 1)
|
||||
val bigFile3 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024 * 3, 2)
|
||||
val bigFile4 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024 * 3, 3)
|
||||
driver(startNodesInProcess = true) {
|
||||
val (alice, _, _) = aliceBobAndNotary()
|
||||
alice.useRPC {
|
||||
|
@ -401,8 +401,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
private fun installCoreFlows() {
|
||||
installCoreFlow(FetchTransactionsFlow::class) { otherParty, _ -> FetchTransactionsHandler(otherParty) }
|
||||
installCoreFlow(FetchAttachmentsFlow::class) { otherParty, _ -> FetchAttachmentsHandler(otherParty) }
|
||||
installCoreFlow(BroadcastTransactionFlow::class) { otherParty, _ -> NotifyTransactionHandler(otherParty) }
|
||||
installCoreFlow(NotaryChangeFlow::class) { otherParty, _ -> NotaryChangeHandler(otherParty) }
|
||||
installCoreFlow(ContractUpgradeFlow::class) { otherParty, _ -> ContractUpgradeHandler(otherParty) }
|
||||
|
@ -5,7 +5,6 @@ import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.UpgradeCommand
|
||||
import net.corda.core.contracts.UpgradedContract
|
||||
import net.corda.core.contracts.requireThat
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.AnonymousPartyAndPath
|
||||
import net.corda.core.identity.Party
|
||||
@ -13,50 +12,6 @@ import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.unwrap
|
||||
|
||||
/**
|
||||
* This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
|
||||
* glue that sits between the network layer and the database layer.
|
||||
*
|
||||
* Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
|
||||
* are no access control lists. If you want to keep some data private, then you must be careful who you give its name
|
||||
* to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
|
||||
* its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
|
||||
* such the hash of a piece of data can be seen as a type of password allowing access to it.
|
||||
*
|
||||
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
|
||||
*/
|
||||
class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {
|
||||
override fun getData(id: SecureHash): SignedTransaction? {
|
||||
return serviceHub.validatedTransactions.getTransaction(id)
|
||||
}
|
||||
}
|
||||
|
||||
class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
|
||||
override fun getData(id: SecureHash): ByteArray? {
|
||||
return serviceHub.attachments.openAttachment(id)?.open()?.readBytes()
|
||||
}
|
||||
}
|
||||
|
||||
abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
@Throws(FetchDataFlow.HashNotFound::class)
|
||||
override fun call() {
|
||||
val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
|
||||
if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
|
||||
it
|
||||
}
|
||||
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
|
||||
// See the discussion in FetchDataFlow. We send each item individually here in a separate asynchronous send
|
||||
// call, and the other side picks them up with a straight receive call, because we batching would push us over
|
||||
// the (current) Artemis message size limit.
|
||||
request.hashes.forEach {
|
||||
send(otherParty, getData(it) ?: throw FetchDataFlow.HashNotFound(it))
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract fun getData(id: SecureHash): T?
|
||||
}
|
||||
|
||||
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
|
||||
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
|
||||
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
|
||||
@ -64,10 +19,8 @@ abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>(
|
||||
class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val request = receive<BroadcastTransactionFlow.NotifyTxRequest>(otherParty).unwrap { it }
|
||||
subFlow(ResolveTransactionsFlow(request.tx, otherParty))
|
||||
request.tx.verify(serviceHub)
|
||||
serviceHub.recordTransactions(request.tx)
|
||||
val stx = subFlow(ReceiveTransactionFlow(otherParty))
|
||||
serviceHub.recordTransactions(stx)
|
||||
}
|
||||
}
|
||||
|
||||
@ -79,9 +32,9 @@ class NotaryChangeHandler(otherSide: Party) : AbstractStateReplacementFlow.Accep
|
||||
* and is also in a geographically convenient location we can just automatically approve the change.
|
||||
* TODO: In more difficult cases this should call for human attention to manually verify and approve the proposal
|
||||
*/
|
||||
override fun verifyProposal(proposal: AbstractStateReplacementFlow.Proposal<Party>): Unit {
|
||||
override fun verifyProposal(stx: SignedTransaction, proposal: AbstractStateReplacementFlow.Proposal<Party>): Unit {
|
||||
val state = proposal.stateRef
|
||||
val proposedTx = proposal.stx.resolveNotaryChangeTransaction(serviceHub)
|
||||
val proposedTx = stx.resolveNotaryChangeTransaction(serviceHub)
|
||||
val newNotary = proposal.modification
|
||||
|
||||
if (state !in proposedTx.inputs.map { it.ref }) {
|
||||
@ -99,15 +52,15 @@ class NotaryChangeHandler(otherSide: Party) : AbstractStateReplacementFlow.Accep
|
||||
class ContractUpgradeHandler(otherSide: Party) : AbstractStateReplacementFlow.Acceptor<Class<out UpgradedContract<ContractState, *>>>(otherSide) {
|
||||
@Suspendable
|
||||
@Throws(StateReplacementException::class)
|
||||
override fun verifyProposal(proposal: AbstractStateReplacementFlow.Proposal<Class<out UpgradedContract<ContractState, *>>>) {
|
||||
override fun verifyProposal(stx: SignedTransaction, proposal: AbstractStateReplacementFlow.Proposal<Class<out UpgradedContract<ContractState, *>>>) {
|
||||
// Retrieve signed transaction from our side, we will apply the upgrade logic to the transaction on our side, and
|
||||
// verify outputs matches the proposed upgrade.
|
||||
val stx = subFlow(FetchTransactionsFlow(setOf(proposal.stateRef.txhash), otherSide)).fromDisk.singleOrNull()
|
||||
requireNotNull(stx) { "We don't have a copy of the referenced state" }
|
||||
val oldStateAndRef = stx!!.tx.outRef<ContractState>(proposal.stateRef.index)
|
||||
val ourSTX = serviceHub.validatedTransactions.getTransaction(proposal.stateRef.txhash)
|
||||
requireNotNull(ourSTX) { "We don't have a copy of the referenced state" }
|
||||
val oldStateAndRef = ourSTX!!.tx.outRef<ContractState>(proposal.stateRef.index)
|
||||
val authorisedUpgrade = serviceHub.vaultService.getAuthorisedContractUpgrade(oldStateAndRef.ref) ?:
|
||||
throw IllegalStateException("Contract state upgrade is unauthorised. State hash : ${oldStateAndRef.ref}")
|
||||
val proposedTx = proposal.stx.tx
|
||||
val proposedTx = stx.tx
|
||||
val expectedTx = ContractUpgradeFlow.assembleBareTx(oldStateAndRef, proposal.modification, proposedTx.privacySalt).toWireTransaction()
|
||||
requireThat {
|
||||
"The instigator is one of the participants" using (otherSide in oldStateAndRef.state.data.participants)
|
||||
|
@ -6,8 +6,6 @@ import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
import java.security.SignatureException
|
||||
|
||||
/**
|
||||
@ -24,26 +22,11 @@ class ValidatingNotaryFlow(otherSide: Party, service: TrustedAuthorityNotaryServ
|
||||
*/
|
||||
@Suspendable
|
||||
override fun receiveAndVerifyTx(): TransactionParts {
|
||||
val stx = receive<SignedTransaction>(otherSide).unwrap { it }
|
||||
checkSignatures(stx)
|
||||
validateTransaction(stx)
|
||||
val wtx = stx.tx
|
||||
return TransactionParts(wtx.id, wtx.inputs, wtx.timeWindow)
|
||||
}
|
||||
|
||||
private fun checkSignatures(stx: SignedTransaction) {
|
||||
try {
|
||||
stx.verifySignaturesExcept(serviceHub.myInfo.notaryIdentity.owningKey)
|
||||
} catch(e: SignatureException) {
|
||||
throw NotaryException(NotaryError.TransactionInvalid(e))
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
fun validateTransaction(stx: SignedTransaction) {
|
||||
try {
|
||||
resolveTransaction(stx)
|
||||
stx.verify(serviceHub, false)
|
||||
val stx = subFlow(ReceiveTransactionFlow(otherSide, checkSufficientSignatures = false))
|
||||
checkSignatures(stx)
|
||||
val wtx = stx.tx
|
||||
return TransactionParts(wtx.id, wtx.inputs, wtx.timeWindow)
|
||||
} catch (e: Exception) {
|
||||
throw when (e) {
|
||||
is TransactionVerificationException,
|
||||
@ -53,6 +36,11 @@ class ValidatingNotaryFlow(otherSide: Party, service: TrustedAuthorityNotaryServ
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun resolveTransaction(stx: SignedTransaction) = subFlow(ResolveTransactionsFlow(stx, otherSide))
|
||||
private fun checkSignatures(stx: SignedTransaction) {
|
||||
try {
|
||||
stx.verifySignaturesExcept(serviceHub.myInfo.notaryIdentity.owningKey)
|
||||
} catch(e: SignatureException) {
|
||||
throw NotaryException(NotaryError.TransactionInvalid(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,145 +0,0 @@
|
||||
package net.corda.node.messaging
|
||||
|
||||
import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.flows.FetchAttachmentsFlow
|
||||
import net.corda.core.flows.FetchDataFlow
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.makeTestDatabaseProperties
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.math.BigInteger
|
||||
import java.security.KeyPair
|
||||
import java.util.jar.JarOutputStream
|
||||
import java.util.zip.ZipEntry
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
class AttachmentTests {
|
||||
lateinit var mockNet: MockNetwork
|
||||
lateinit var configuration: RequeryConfiguration
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
mockNet = MockNetwork()
|
||||
val dataSourceProperties = makeTestDataSourceProperties()
|
||||
configuration = RequeryConfiguration(dataSourceProperties, databaseProperties = makeTestDatabaseProperties())
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
fun fakeAttachment(): ByteArray {
|
||||
val bs = ByteArrayOutputStream()
|
||||
val js = JarOutputStream(bs)
|
||||
js.putNextEntry(ZipEntry("file1.txt"))
|
||||
js.writer().apply { append("Some useful content"); flush() }
|
||||
js.closeEntry()
|
||||
js.close()
|
||||
return bs.toByteArray()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `download and store`() {
|
||||
val nodes = mockNet.createSomeNodes(2)
|
||||
val n0 = nodes.partyNodes[0]
|
||||
val n1 = nodes.partyNodes[1]
|
||||
|
||||
// Insert an attachment into node zero's store directly.
|
||||
val id = n0.database.transaction {
|
||||
n0.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
|
||||
}
|
||||
|
||||
// Get node one to run a flow to fetch it and insert it.
|
||||
mockNet.runNetwork()
|
||||
val f1 = n1.services.startFlow(FetchAttachmentsFlow(setOf(id), n0.info.legalIdentity))
|
||||
mockNet.runNetwork()
|
||||
assertEquals(0, f1.resultFuture.getOrThrow().fromDisk.size)
|
||||
|
||||
// Verify it was inserted into node one's store.
|
||||
val attachment = n1.database.transaction {
|
||||
n1.attachments.openAttachment(id)!!
|
||||
}
|
||||
|
||||
assertEquals(id, attachment.open().readBytes().sha256())
|
||||
|
||||
// Shut down node zero and ensure node one can still resolve the attachment.
|
||||
n0.stop()
|
||||
|
||||
val response: FetchDataFlow.Result<Attachment> = n1.services.startFlow(FetchAttachmentsFlow(setOf(id), n0.info.legalIdentity)).resultFuture.getOrThrow()
|
||||
assertEquals(attachment, response.fromDisk[0])
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `missing`() {
|
||||
val nodes = mockNet.createSomeNodes(2)
|
||||
val n0 = nodes.partyNodes[0]
|
||||
val n1 = nodes.partyNodes[1]
|
||||
|
||||
// Get node one to fetch a non-existent attachment.
|
||||
val hash = SecureHash.randomSHA256()
|
||||
mockNet.runNetwork()
|
||||
val f1 = n1.services.startFlow(FetchAttachmentsFlow(setOf(hash), n0.info.legalIdentity))
|
||||
mockNet.runNetwork()
|
||||
val e = assertFailsWith<FetchDataFlow.HashNotFound> { f1.resultFuture.getOrThrow() }
|
||||
assertEquals(hash, e.requested)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `malicious response`() {
|
||||
// Make a node that doesn't do sanity checking at load time.
|
||||
val n0 = mockNet.createNode(nodeFactory = object : MockNetwork.Factory<MockNetwork.MockNode> {
|
||||
override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?,
|
||||
advertisedServices: Set<ServiceInfo>, id: Int,
|
||||
overrideServices: Map<ServiceInfo, KeyPair>?,
|
||||
entropyRoot: BigInteger): MockNetwork.MockNode {
|
||||
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
|
||||
override fun start() {
|
||||
super.start()
|
||||
attachments.checkAttachmentsOnLoad = false
|
||||
}
|
||||
}
|
||||
}
|
||||
}, advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(SimpleNotaryService.type)))
|
||||
val n1 = mockNet.createNode(n0.network.myAddress)
|
||||
|
||||
val attachment = fakeAttachment()
|
||||
// Insert an attachment into node zero's store directly.
|
||||
val id = n0.database.transaction {
|
||||
n0.attachments.importAttachment(ByteArrayInputStream(attachment))
|
||||
}
|
||||
|
||||
// Corrupt its store.
|
||||
val corruptBytes = "arggghhhh".toByteArray()
|
||||
System.arraycopy(corruptBytes, 0, attachment, 0, corruptBytes.size)
|
||||
|
||||
val corruptAttachment = AttachmentEntity()
|
||||
corruptAttachment.attId = id
|
||||
corruptAttachment.content = attachment
|
||||
n0.database.transaction {
|
||||
n0.attachments.session.update(corruptAttachment)
|
||||
}
|
||||
|
||||
|
||||
// Get n1 to fetch the attachment. Should receive corrupted bytes.
|
||||
mockNet.runNetwork()
|
||||
val f1 = n1.services.startFlow(FetchAttachmentsFlow(setOf(id), n0.info.legalIdentity))
|
||||
mockNet.runNetwork()
|
||||
assertFailsWith<FetchDataFlow.DownloadedVsRequestedDataMismatch> { f1.resultFuture.getOrThrow() }
|
||||
}
|
||||
}
|
@ -5,10 +5,10 @@ import net.corda.contracts.asset.Cash
|
||||
import net.corda.core.contracts.Amount
|
||||
import net.corda.core.contracts.Issued
|
||||
import net.corda.core.contracts.USD
|
||||
import net.corda.core.flows.BroadcastTransactionFlow.NotifyTxRequest
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.SendTransactionFlow
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.queryBy
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
@ -102,9 +102,9 @@ class DataVendingServiceTests {
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class NotifyTxFlow(val otherParty: Party, val stx: SignedTransaction) : FlowLogic<Unit>() {
|
||||
private class NotifyTxFlow(val otherParty: Party, val stx: SignedTransaction) : FlowLogic<Void?>() {
|
||||
@Suspendable
|
||||
override fun call() = send(otherParty, NotifyTxRequest(stx))
|
||||
override fun call() = subFlow(SendTransactionFlow(otherParty, stx))
|
||||
}
|
||||
|
||||
@InitiatedBy(NotifyTxFlow::class)
|
||||
|
Reference in New Issue
Block a user