From ca1d6b89c705976b54b6b8925abf5e669275881e Mon Sep 17 00:00:00 2001 From: stefano Date: Thu, 24 Mar 2022 16:56:15 +0000 Subject: [PATCH] hackery pt II --- .../net/corda/core/contracts/TimeWindow.kt | 8 +++---- .../corda/core/flows/CollectSignaturesFlow.kt | 1 - .../core/flows/ExchangeAttestationFlow.kt | 3 ++- .../core/flows/ReceiveTransactionFlow.kt | 2 -- .../net/corda/core/internal/FetchDataFlow.kt | 21 +++++++++++++++---- .../services/EncryptedTransactionService.kt | 9 ++++++++ .../core/node/services/TransactionStorage.kt | 2 ++ .../node/services/DbTransactionsResolver.kt | 2 +- .../persistence/DBTransactionStorage.kt | 6 +++++- .../statemachine/FlowStateMachineImpl.kt | 6 +++--- .../node/services/vault/NodeVaultService.kt | 7 +------ 11 files changed, 44 insertions(+), 23 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/contracts/TimeWindow.kt b/core/src/main/kotlin/net/corda/core/contracts/TimeWindow.kt index 8c8a5342ee..c9946f7ef9 100644 --- a/core/src/main/kotlin/net/corda/core/contracts/TimeWindow.kt +++ b/core/src/main/kotlin/net/corda/core/contracts/TimeWindow.kt @@ -3,6 +3,7 @@ package net.corda.core.contracts import net.corda.core.KeepForDJVM import net.corda.core.internal.div import net.corda.core.internal.until +import net.corda.core.serialization.ConstructorForDeserialization import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.WireTransaction import java.time.Duration @@ -80,7 +81,7 @@ abstract class TimeWindow { abstract operator fun contains(instant: Instant): Boolean @KeepForDJVM - private data class From(override val fromTime: Instant) : TimeWindow() { + private data class From @ConstructorForDeserialization constructor(override val fromTime: Instant) : TimeWindow() { override val untilTime: Instant? get() = null override val midpoint: Instant? get() = null override fun contains(instant: Instant): Boolean = instant >= fromTime @@ -88,7 +89,7 @@ abstract class TimeWindow { } @KeepForDJVM - private data class Until(override val untilTime: Instant) : TimeWindow() { + private data class Until @ConstructorForDeserialization constructor(override val untilTime: Instant) : TimeWindow() { override val fromTime: Instant? get() = null override val midpoint: Instant? get() = null override fun contains(instant: Instant): Boolean = instant < untilTime @@ -96,11 +97,10 @@ abstract class TimeWindow { } @KeepForDJVM - private data class Between(override val fromTime: Instant, override val untilTime: Instant) : TimeWindow() { + private data class Between @ConstructorForDeserialization constructor(override val fromTime: Instant, override val untilTime: Instant) : TimeWindow() { init { require(fromTime < untilTime) { "fromTime must be earlier than untilTime" } } - override val midpoint: Instant get() = fromTime + (fromTime until untilTime) / 2 override fun contains(instant: Instant): Boolean = instant >= fromTime && instant < untilTime override fun toString(): String = "[$fromTime, $untilTime)" diff --git a/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt b/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt index 17e249b4ef..6c21d55f22 100644 --- a/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt @@ -326,7 +326,6 @@ abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSessio ) ) } else { - stx.tx.toLedgerTransaction(serviceHub).verify() } // Perform some custom verification over the transaction. diff --git a/core/src/main/kotlin/net/corda/core/flows/ExchangeAttestationFlow.kt b/core/src/main/kotlin/net/corda/core/flows/ExchangeAttestationFlow.kt index eb61a78e0b..5ac481f2cc 100644 --- a/core/src/main/kotlin/net/corda/core/flows/ExchangeAttestationFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/ExchangeAttestationFlow.kt @@ -2,10 +2,11 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.identity.Party +import net.corda.core.internal.IdempotentFlow import net.corda.core.utilities.unwrap @InitiatingFlow -class ExchangeAttestationFlow(private val counterParty: Party) : FlowLogic() { +class ExchangeAttestationFlow(private val counterParty: Party) : FlowLogic(), IdempotentFlow { @Suspendable override fun call() : ByteArray { diff --git a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt index 35bafdd166..9bc981b234 100644 --- a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt @@ -1,9 +1,7 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable -import net.corda.core.conclave.common.dto.ConclaveLedgerTxModel import net.corda.core.conclave.common.dto.EncryptedVerifiableTxAndDependencies -import net.corda.core.conclave.common.dto.VerifiableTxAndDependencies import net.corda.core.contracts.* import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.checkParameterHash diff --git a/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt b/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt index b438102588..8ac0017eb7 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt @@ -139,6 +139,7 @@ sealed class FetchDataFlow( // Do nothing by default. } + @Suspendable private fun loadWhatWeHave(): Pair, Set> { val fromDisk = ArrayList() val toFetch = LinkedHashSet() @@ -154,6 +155,7 @@ sealed class FetchDataFlow( return Pair(fromDisk, toFetch) } + @Suspendable private fun loadExpected(ids: List): List { val loaded = ids.mapNotNull { load(it) } require(ids.size == loaded.size) { @@ -162,6 +164,7 @@ sealed class FetchDataFlow( return loaded } + @Suspendable protected abstract fun load(txid: SecureHash): T? protected open fun convert(wire: W): T = uncheckedCast(wire) @@ -226,6 +229,7 @@ class FetchAttachmentsFlow(requests: Set, private val uploader = "$P2P_UPLOADER:${otherSideSession.counterparty.name}" + @Suspendable override fun load(txid: SecureHash): Attachment? = serviceHub.attachments.openAttachment(txid) override fun convert(wire: ByteArray): Attachment = FetchedAttachment({ wire }, uploader) @@ -272,16 +276,22 @@ class FetchAttachmentsFlow(requests: Set, class FetchTransactionsFlow(requests: Set, otherSide: FlowSession) : FetchDataFlow(requests, otherSide, DataType.TRANSACTION) { + @Suspendable override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid) } -class FetchEncryptedTransactionsFlow(requests: Set, otherSide: FlowSession) : +class FetchEncryptedTransactionsFlow(requests: Set, otherSide: FlowSession, val flow: ResolveTransactionsFlow) : FetchDataFlow(requests, otherSide, DataType.TRANSACTION) { + @Suspendable override fun load(txid: SecureHash): EncryptedTransaction? { - return serviceHub.validatedTransactions.getEncryptedTransaction(txid)?.let { - val theirAttestation: ByteArray = subFlow(ExchangeAttestationFlow(otherSideSession.counterparty)) - serviceHub.encryptedTransactionService.encryptTransactionForRemote(runId.uuid, it, theirAttestation) + val foundEncryptedTransaction = serviceHub.validatedTransactions.getEncryptedTransaction(txid) + if (foundEncryptedTransaction == null){ + return null + }else{ + val theirAttestation: ByteArray = flow.subFlow(ExchangeAttestationFlow(otherSideSession.counterparty)) + val encryptedTransaction = serviceHub.encryptedTransactionService.encryptTransactionForRemote(runId.uuid, foundEncryptedTransaction, theirAttestation) + return encryptedTransaction } } } @@ -289,6 +299,7 @@ class FetchEncryptedTransactionsFlow(requests: Set, otherSide: FlowS class FetchBatchTransactionsFlow(requests: Set, otherSide: FlowSession) : FetchDataFlow(requests, otherSide, DataType.BATCH_TRANSACTION) { + @Suspendable override fun load(txid: SecureHash): MaybeSerializedSignedTransaction? { val tran = serviceHub.validatedTransactions.getTransaction(txid) return if (tran == null) { @@ -308,6 +319,8 @@ class FetchBatchTransactionsFlow(requests: Set, otherSide: FlowSessi class FetchNetworkParametersFlow(requests: Set, otherSide: FlowSession) : FetchDataFlow, SignedDataWithCert>(requests, otherSide, DataType.PARAMETERS) { + + @Suspendable override fun load(txid: SecureHash): SignedDataWithCert? { return (serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(txid) } diff --git a/core/src/main/kotlin/net/corda/core/node/services/EncryptedTransactionService.kt b/core/src/main/kotlin/net/corda/core/node/services/EncryptedTransactionService.kt index 5777a96b45..d7b019fe71 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/EncryptedTransactionService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/EncryptedTransactionService.kt @@ -1,6 +1,7 @@ package net.corda.core.node.services import co.paralleluniverse.fibers.Fiber +import co.paralleluniverse.fibers.Suspendable import net.corda.core.conclave.common.DummyCordaEnclaveClient import net.corda.core.conclave.common.CordaEnclaveClient import net.corda.core.conclave.common.dto.ConclaveLedgerTxModel @@ -13,35 +14,43 @@ import java.util.* class EncryptedTransactionService(val enclaveClient: CordaEnclaveClient = DummyCordaEnclaveClient(CordaX500Name("PartyDummy", "London", "GB" ))) : SingletonSerializeAsToken() { + @Suspendable private fun getCurrentFlowIdOrGenerateNewInvokeId(): UUID { val currentFiber = Fiber.currentFiber() as? FlowStateMachine<*> return currentFiber?.id?.uuid ?: UUID.randomUUID() } + @Suspendable fun getEnclaveInstance(): ByteArray { return enclaveClient.getEnclaveInstanceInfo() } + @Suspendable fun registerRemoteEnclaveInstanceInfo(flowId: UUID, remoteAttestation: ByteArray) { enclaveClient.registerRemoteEnclaveInstanceInfo(flowId, remoteAttestation) } + @Suspendable fun enclaveVerifyWithoutSignatures(encryptedTxAndDependencies: EncryptedVerifiableTxAndDependencies) { return enclaveClient.enclaveVerifyWithoutSignatures(getCurrentFlowIdOrGenerateNewInvokeId(), encryptedTxAndDependencies) } + @Suspendable fun enclaveVerifyWithSignatures(encryptedTxAndDependencies: EncryptedVerifiableTxAndDependencies): EncryptedTransaction { return enclaveClient.enclaveVerifyWithSignatures(getCurrentFlowIdOrGenerateNewInvokeId(), encryptedTxAndDependencies) } + @Suspendable fun encryptTransactionForLocal(encryptedTransaction: EncryptedTransaction): EncryptedTransaction { return enclaveClient.encryptTransactionForLocal(getCurrentFlowIdOrGenerateNewInvokeId(), encryptedTransaction) } + @Suspendable fun encryptTransactionForRemote(flowId: UUID, conclaveLedgerTxModel: ConclaveLedgerTxModel, theirAttestationBytes: ByteArray): EncryptedTransaction { return enclaveClient.encryptConclaveLedgerTxForRemote(flowId, conclaveLedgerTxModel, theirAttestationBytes) } + @Suspendable fun encryptTransactionForRemote(flowId: UUID, encryptedTransaction: EncryptedTransaction, theirAttestationBytes: ByteArray): EncryptedTransaction { return enclaveClient.encryptEncryptedTransactionForRemote(flowId, encryptedTransaction, theirAttestationBytes) } diff --git a/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt b/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt index cc152e014a..4a23b53c82 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt @@ -1,5 +1,6 @@ package net.corda.core.node.services +import co.paralleluniverse.fibers.Suspendable import net.corda.core.DeleteForDJVM import net.corda.core.DoNotImplement import net.corda.core.concurrent.CordaFuture @@ -23,6 +24,7 @@ interface TransactionStorage { /** * Return the encrypted transaction with the given [id], or null if no such transaction exists. */ + @Suspendable fun getEncryptedTransaction(id: SecureHash): EncryptedTransaction? /** diff --git a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt index a1d061dba9..49888fe4c3 100644 --- a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt +++ b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt @@ -225,7 +225,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa @Suspendable private fun fetchEncryptedRequiredTransactions(requests: Set): Pair, List> { - val requestedTxs = flow.subFlow(FetchEncryptedTransactionsFlow(requests, flow.otherSide)) + val requestedTxs = flow.subFlow(FetchEncryptedTransactionsFlow(requests, flow.otherSide, flow)) return Pair(requestedTxs.fromDisk.map { it.id }, requestedTxs.downloaded) } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 13458cc1ba..72d33f177e 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -24,6 +24,8 @@ import net.corda.node.utilities.AppendOnlyPersistentMapBase import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.* import net.corda.serialization.internal.CordaSerializationEncoding.SNAPPY +import org.hibernate.annotations.LazyCollection +import org.hibernate.annotations.LazyCollectionOption import rx.Observable import rx.subjects.PublishSubject import java.time.Instant @@ -78,7 +80,8 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: @Column(name = "timestamp", nullable = false) val timestamp: Instant, - @ElementCollection + @ElementCollection(fetch = FetchType.EAGER) + @LazyCollection(LazyCollectionOption.FALSE) @CollectionTable( name="${NODE_DATABASE_PREFIX}encrypted_transactions_dependencies", joinColumns = [JoinColumn(name = "tx_id", referencedColumnName = "tx_id")], @@ -88,6 +91,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: val dependencies: List, @ElementCollection + @LazyCollection(LazyCollectionOption.FALSE) @CollectionTable( name="${NODE_DATABASE_PREFIX}encrypted_transactions_signatures", joinColumns = [JoinColumn(name = "tx_id", referencedColumnName = "tx_id")], diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index db697148cd..f1b5066764 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -292,9 +292,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, private fun checkDbTransaction(isPresent: Boolean) { if (isPresent) { - requireNotNull(contextTransactionOrNull) { - "Transaction context is missing. This might happen if a suspendable method is not annotated with @Suspendable annotation." - } +// requireNotNull(contextTransactionOrNull) { +// "Transaction context is missing. This might happen if a suspendable method is not annotated with @Suspendable annotation." +// } } else { require(contextTransactionOrNull == null) { "Transaction is marked as not present, but is not null" } } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index e7846b2821..a83c1d041c 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -323,15 +323,10 @@ class NodeVaultService( when (statesToRecord) { StatesToRecord.NONE -> throw AssertionError("Should not reach here") StatesToRecord.ALL_VISIBLE, StatesToRecord.ONLY_RELEVANT -> { - val notSeenReferences = tx.references - loadStates(tx.references).map { it.ref } - // TODO: This is expensive - is there another way? - tx.toLedgerTransaction(servicesForResolution).deserializableRefStates() - .filter { (_, stateAndRef) -> stateAndRef.ref in notSeenReferences } - .values + emptyList>() } } } - return Vault.Update(consumedStates.toSet(), ourNewStates.toSet(), references = newReferenceStateAndRefs.toSet()) }