mirror of
https://github.com/corda/corda.git
synced 2025-06-17 22:58:19 +00:00
CORDA-961 Wire up and enforce max transaction size (#2465)
* wire up and enforce max transaction size * fixup after rebase moved network parameter from AbstractNode to NodeProperties * removed TODO * fix broken import * address PR issues * remove API breaking change address PR issue * added max transaction size to driver and mock network. address PR issues * fix failing test * added TODO * fix verifier test * fix spring driver build error
This commit is contained in:
@ -13,13 +13,19 @@ import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.core.internal.GlobalProperties
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.node.*
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.serialization.SerializationWhitelist
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.debug
|
||||
@ -118,7 +124,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
// low-performance prototyping period.
|
||||
protected abstract val serverThread: AffinityExecutor
|
||||
|
||||
protected lateinit var networkParameters: NetworkParameters
|
||||
private val cordappServices = MutableClassToInstanceMap.create<SerializeAsToken>()
|
||||
private val flowFactories = ConcurrentHashMap<Class<out FlowLogic<*>>, InitiatedFlowFactory<*>>()
|
||||
|
||||
@ -166,7 +171,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
initCertificate()
|
||||
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
|
||||
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
|
||||
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)) { database ->
|
||||
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)) { database ->
|
||||
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like
|
||||
// a code smell.
|
||||
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
|
||||
@ -189,13 +194,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
|
||||
val identityService = makeIdentityService(identity.certificate)
|
||||
networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) }
|
||||
networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters
|
||||
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
|
||||
GlobalProperties.networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters
|
||||
check(GlobalProperties.networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
|
||||
"Node's platform version is lower than network's required minimumPlatformVersion"
|
||||
}
|
||||
// Do all of this in a database transaction so anything that might need a connection has one.
|
||||
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
|
||||
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
|
||||
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, GlobalProperties.networkParameters.notaries).start(), identityService)
|
||||
val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair)
|
||||
identityService.loadIdentities(info.legalIdentitiesAndCerts)
|
||||
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
|
||||
@ -234,7 +239,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
|
||||
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
|
||||
networkMapClient,
|
||||
networkParameters.serialize().hash,
|
||||
GlobalProperties.networkParameters.serialize().hash,
|
||||
configuration.baseDirectory)
|
||||
runOnStop += networkMapUpdater::close
|
||||
|
||||
@ -520,7 +525,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
* Builds node internal, advertised, and plugin services.
|
||||
* Returns a list of tokenizable services to be added to the serialisation context.
|
||||
*/
|
||||
private fun makeServices(keyPairs: Set<KeyPair>, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, database: CordaPersistence, info: NodeInfo, identityService: IdentityServiceInternal, networkMapCache: NetworkMapCacheInternal): MutableList<Any> {
|
||||
private fun makeServices(keyPairs: Set<KeyPair>, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, database: CordaPersistence, info: NodeInfo, identityService: IdentityServiceInternal, networkMapCache: NetworkMapCacheInternal): MutableList<Any> {
|
||||
checkpointStorage = DBCheckpointStorage()
|
||||
val metrics = MetricRegistry()
|
||||
attachments = NodeAttachmentService(metrics, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound)
|
||||
|
@ -2,6 +2,7 @@ package net.corda.node.internal
|
||||
|
||||
import com.codahale.metrics.JmxReporter
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.internal.GlobalProperties.networkParameters
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.concurrent.thenMatch
|
||||
import net.corda.core.internal.div
|
||||
|
@ -0,0 +1,135 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.InputStreamAndHash
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.api.StartedNodeServices
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.contracts.DummyState
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.MockNodeParameters
|
||||
import net.corda.testing.node.startFlow
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
class MaxTransactionSizeTests {
|
||||
private lateinit var mockNet: MockNetwork
|
||||
private lateinit var notaryServices: StartedNodeServices
|
||||
private lateinit var aliceServices: StartedNodeServices
|
||||
private lateinit var notary: Party
|
||||
private lateinit var alice: Party
|
||||
private lateinit var bob: Party
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
mockNet = MockNetwork(listOf("net.corda.testing.contracts", "net.corda.node.services.transactions"), maxTransactionSize = 3_000_000)
|
||||
val aliceNode = mockNet.createNode(MockNodeParameters(legalName = ALICE_NAME))
|
||||
val bobNode = mockNet.createNode(MockNodeParameters(legalName = BOB_NAME))
|
||||
notaryServices = mockNet.defaultNotaryNode.services
|
||||
aliceServices = aliceNode.services
|
||||
notary = mockNet.defaultNotaryIdentity
|
||||
alice = aliceNode.info.singleIdentity()
|
||||
bob = bobNode.info.singleIdentity()
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `check transaction will fail when exceed max transaction size limit`() {
|
||||
// These 4 attachments yield a transaction that's got ~ 4mb, which will exceed the 3mb max transaction size limit
|
||||
val bigFile1 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024, 0)
|
||||
val bigFile2 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024, 1)
|
||||
val bigFile3 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024, 2)
|
||||
val bigFile4 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024, 3)
|
||||
val flow = aliceServices.database.transaction {
|
||||
val hash1 = aliceServices.attachments.importAttachment(bigFile1.inputStream)
|
||||
val hash2 = aliceServices.attachments.importAttachment(bigFile2.inputStream)
|
||||
val hash3 = aliceServices.attachments.importAttachment(bigFile3.inputStream)
|
||||
val hash4 = aliceServices.attachments.importAttachment(bigFile4.inputStream)
|
||||
assertEquals(hash1, bigFile1.sha256)
|
||||
SendLargeTransactionFlow(notary, bob, hash1, hash2, hash3, hash4)
|
||||
}
|
||||
val exception = assertFailsWith<IllegalArgumentException> {
|
||||
val future = aliceServices.startFlow(flow)
|
||||
mockNet.runNetwork()
|
||||
future.getOrThrow()
|
||||
}
|
||||
assertThat(exception).hasMessageContaining("Transaction exceeded network's maximum transaction size limit")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `check transaction will be rejected by counterparty when exceed max transaction size limit`() {
|
||||
// These 4 attachments yield a transaction that's got ~ 4mb, which will exceed the 3mb max transaction size limit
|
||||
val bigFile1 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024, 0)
|
||||
val bigFile2 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024, 1)
|
||||
val bigFile3 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024, 2)
|
||||
val bigFile4 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024, 3)
|
||||
val flow = aliceServices.database.transaction {
|
||||
val hash1 = aliceServices.attachments.importAttachment(bigFile1.inputStream)
|
||||
val hash2 = aliceServices.attachments.importAttachment(bigFile2.inputStream)
|
||||
val hash3 = aliceServices.attachments.importAttachment(bigFile3.inputStream)
|
||||
val hash4 = aliceServices.attachments.importAttachment(bigFile4.inputStream)
|
||||
assertEquals(hash1, bigFile1.sha256)
|
||||
SendLargeTransactionFlow(notary, bob, hash1, hash2, hash3, hash4, verify = false)
|
||||
}
|
||||
val ex = assertFailsWith<UnexpectedFlowEndException> {
|
||||
val future = aliceServices.startFlow(flow)
|
||||
mockNet.runNetwork()
|
||||
future.getOrThrow()
|
||||
}
|
||||
assertThat(ex).hasMessageContaining("Counterparty flow on O=Bob Plc, L=Rome, C=IT had an internal error and has terminated")
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class SendLargeTransactionFlow(private val notary: Party,
|
||||
private val otherSide: Party,
|
||||
private val hash1: SecureHash,
|
||||
private val hash2: SecureHash,
|
||||
private val hash3: SecureHash,
|
||||
private val hash4: SecureHash,
|
||||
private val verify: Boolean = true) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val tx = TransactionBuilder(notary = notary)
|
||||
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand(ourIdentity.owningKey))
|
||||
.addAttachment(hash1)
|
||||
.addAttachment(hash2)
|
||||
.addAttachment(hash3)
|
||||
.addAttachment(hash4)
|
||||
val stx = serviceHub.signInitialTransaction(tx, ourIdentity.owningKey)
|
||||
if (verify) stx.verify(serviceHub, checkSufficientSignatures = false)
|
||||
// Send to the other side and wait for it to trigger resolution from us.
|
||||
val otherSideSession = initiateFlow(otherSide)
|
||||
subFlow(SendTransactionFlow(otherSideSession, stx))
|
||||
otherSideSession.receive<Unit>()
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(SendLargeTransactionFlow::class)
|
||||
@Suppress("UNUSED")
|
||||
class ReceiveLargeTransactionFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
subFlow(ReceiveTransactionFlow(otherSide))
|
||||
// Unblock the other side by sending some dummy object (Unit is fine here as it's a singleton).
|
||||
otherSide.send(Unit)
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user