ENT-10100: fix batch support during recovery of in flight transactions (#7549)

This commit is contained in:
Rick Parker 2023-10-27 13:26:07 +01:00 committed by GitHub
parent e52f086d11
commit c626d3a435
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 42 additions and 21 deletions

View File

@ -4,11 +4,18 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.NamedByHash
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.DistributionList.SenderDistributionList
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.*
import net.corda.core.internal.FetchDataFlow
import net.corda.core.internal.NetworkParametersStorage
import net.corda.core.internal.PlatformVersionSwitches
import net.corda.core.internal.RetrieveAnyTransactionPayload
import net.corda.core.internal.ServiceHubCoreInternal
import net.corda.core.internal.readFully
import net.corda.core.node.ServicesForResolution
import net.corda.core.node.StatesToRecord
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.DeprecatedConstructorForDeserialization
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -16,7 +23,6 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.trace
import net.corda.core.utilities.unwrap
import kotlin.collections.toSet
import net.corda.core.flows.DistributionList.SenderDistributionList
/**
* In the words of Matt working code is more important then pretty code. This class that contains code that may
@ -27,7 +33,12 @@ import net.corda.core.flows.DistributionList.SenderDistributionList
*/
@CordaSerializable
class MaybeSerializedSignedTransaction(override val id: SecureHash, val serialized: SerializedBytes<SignedTransaction>?,
val nonSerialised: SignedTransaction?) : NamedByHash {
val nonSerialised: SignedTransaction?,
val inFlight: Boolean) : NamedByHash {
@DeprecatedConstructorForDeserialization(version = 1)
constructor(id: SecureHash, serialized: SerializedBytes<SignedTransaction>?, nonSerialised: SignedTransaction?) : this(id, serialized, nonSerialised, false)
init {
check(serialized == null || nonSerialised == null) {
"MaybeSerializedSignedTransaction: Serialized and non-serialized may not both be non-null."
@ -227,7 +238,7 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
numSent++
tx
}
FetchDataFlow.DataType.TRANSACTION_RECOVERY -> NotImplementedError("Enterprise only feature")
FetchDataFlow.DataType.TRANSACTION_RECOVERY -> throw NotImplementedError("Enterprise only feature")
// Loop on all items returned using dataRequest.hashes.map:
FetchDataFlow.DataType.BATCH_TRANSACTION -> dataRequest.hashes.map { txId ->
if (!authorisedTransactions.isAuthorised(txId)) {

View File

@ -12,7 +12,6 @@ import net.corda.core.flows.MaybeSerializedSignedTransaction
import net.corda.core.internal.FetchDataFlow.DownloadedVsRequestedDataMismatch
import net.corda.core.internal.FetchDataFlow.HashNotFound
import net.corda.core.node.NetworkParameters
import net.corda.core.node.services.SignedTransactionWithStatus
import net.corda.core.node.services.TransactionStatus
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.CordaSerializationTransformEnumDefault
@ -24,10 +23,11 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.debug
import net.corda.core.utilities.unwrap
import net.corda.core.utilities.trace
import net.corda.core.utilities.unwrap
import java.nio.file.FileAlreadyExistsException
import java.util.*
import java.util.ArrayList
import java.util.LinkedHashSet
/**
* An abstract flow for fetching typed data from a remote peer.
@ -275,24 +275,17 @@ class FetchTransactionsFlow @JvmOverloads constructor(requests: Set<SecureHash>,
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid)
}
// Used by Enterprise Ledger Recovery
class FetchRecoverableTransactionsFlow @JvmOverloads constructor(requests: Set<SecureHash>, otherSide: FlowSession, dataType: DataType = DataType.TRANSACTION_RECOVERY) :
FetchDataFlow<SignedTransactionWithStatus, SignedTransactionWithStatus>(requests, otherSide, dataType) {
override fun load(txid: SecureHash): SignedTransactionWithStatus? = serviceHub.validatedTransactions.getTransactionWithStatus(txid)?.let {
if (it.status != TransactionStatus.UNVERIFIED) it else null
}
}
class FetchBatchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :
FetchDataFlow<MaybeSerializedSignedTransaction, MaybeSerializedSignedTransaction>(requests, otherSide, DataType.BATCH_TRANSACTION) {
class FetchBatchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession, private val recoveryMode: Boolean = false) :
FetchDataFlow<MaybeSerializedSignedTransaction, MaybeSerializedSignedTransaction>(requests, otherSide,
if (recoveryMode) DataType.TRANSACTION_RECOVERY else DataType.BATCH_TRANSACTION) {
override fun load(txid: SecureHash): MaybeSerializedSignedTransaction? {
val tran = serviceHub.validatedTransactions.getTransaction(txid)
return if (tran == null) {
val tranAndStatus = serviceHub.validatedTransactions.getTransactionWithStatus(txid)
@Suppress("ComplexCondition")
return if (tranAndStatus == null || tranAndStatus.status == TransactionStatus.UNVERIFIED || (!recoveryMode && tranAndStatus.status == TransactionStatus.IN_FLIGHT)) {
null
} else {
MaybeSerializedSignedTransaction(txid, null, tran)
MaybeSerializedSignedTransaction(txid, null, tranAndStatus.stx, tranAndStatus.status == TransactionStatus.IN_FLIGHT)
}
}
}

View File

@ -4,6 +4,7 @@ import net.corda.core.crypto.Crypto
import net.corda.core.crypto.Crypto.generateKeyPair
import net.corda.core.crypto.SignedData
import net.corda.core.crypto.sign
import net.corda.core.flows.MaybeSerializedSignedTransaction
import net.corda.core.flows.NotarisationRequest
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
@ -21,6 +22,7 @@ import net.corda.serialization.internal.amqp.testutils.serialize
import net.corda.serialization.internal.amqp.testutils.serializeAndReturnSchema
import net.corda.serialization.internal.amqp.testutils.testDefaultFactory
import net.corda.serialization.internal.amqp.testutils.testName
import org.assertj.core.api.Assertions.assertThat
import org.junit.Ignore
import org.junit.Test
import org.junit.jupiter.api.Assertions.assertNotSame
@ -921,4 +923,19 @@ class EvolvabilityTests {
assertNotSame(deserialized2.statesToConsume[0].txhash, deserialized2.statesToConsume[1].txhash)
assertNotSame(deserialized2.statesToConsume[2].txhash, deserialized2.statesToConsume[3].txhash)
}
@Test(timeout = 300_000)
fun maybeSerializedTransaction() {
val sf = testDefaultFactory()
val resource = "EvolvabilityTests.maybeSerializedTransaction"
//val A = MaybeSerializedSignedTransaction(SecureHash.randomSHA256(), null, null)
//File(URI("$localPath/$resource")).writeBytes(SerializationOutput(sf).serialize(A).bytes)
val url = EvolvabilityTests::class.java.getResource(resource)
val sc2 = url.readBytes()
val deserializedA = DeserializationInput(sf).deserialize(SerializedBytes<MaybeSerializedSignedTransaction>(sc2))
assertThat(deserializedA).isInstanceOf(MaybeSerializedSignedTransaction::class.java)
}
}