mirror of
https://github.com/corda/corda.git
synced 2025-01-20 19:49:25 +00:00
hackery pt II
This commit is contained in:
parent
2a165205c6
commit
ca1d6b89c7
@ -3,6 +3,7 @@ package net.corda.core.contracts
|
||||
import net.corda.core.KeepForDJVM
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.until
|
||||
import net.corda.core.serialization.ConstructorForDeserialization
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import java.time.Duration
|
||||
@ -80,7 +81,7 @@ abstract class TimeWindow {
|
||||
abstract operator fun contains(instant: Instant): Boolean
|
||||
|
||||
@KeepForDJVM
|
||||
private data class From(override val fromTime: Instant) : TimeWindow() {
|
||||
private data class From @ConstructorForDeserialization constructor(override val fromTime: Instant) : TimeWindow() {
|
||||
override val untilTime: Instant? get() = null
|
||||
override val midpoint: Instant? get() = null
|
||||
override fun contains(instant: Instant): Boolean = instant >= fromTime
|
||||
@ -88,7 +89,7 @@ abstract class TimeWindow {
|
||||
}
|
||||
|
||||
@KeepForDJVM
|
||||
private data class Until(override val untilTime: Instant) : TimeWindow() {
|
||||
private data class Until @ConstructorForDeserialization constructor(override val untilTime: Instant) : TimeWindow() {
|
||||
override val fromTime: Instant? get() = null
|
||||
override val midpoint: Instant? get() = null
|
||||
override fun contains(instant: Instant): Boolean = instant < untilTime
|
||||
@ -96,11 +97,10 @@ abstract class TimeWindow {
|
||||
}
|
||||
|
||||
@KeepForDJVM
|
||||
private data class Between(override val fromTime: Instant, override val untilTime: Instant) : TimeWindow() {
|
||||
private data class Between @ConstructorForDeserialization constructor(override val fromTime: Instant, override val untilTime: Instant) : TimeWindow() {
|
||||
init {
|
||||
require(fromTime < untilTime) { "fromTime must be earlier than untilTime" }
|
||||
}
|
||||
|
||||
override val midpoint: Instant get() = fromTime + (fromTime until untilTime) / 2
|
||||
override fun contains(instant: Instant): Boolean = instant >= fromTime && instant < untilTime
|
||||
override fun toString(): String = "[$fromTime, $untilTime)"
|
||||
|
@ -326,7 +326,6 @@ abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSessio
|
||||
)
|
||||
)
|
||||
} else {
|
||||
|
||||
stx.tx.toLedgerTransaction(serviceHub).verify()
|
||||
}
|
||||
// Perform some custom verification over the transaction.
|
||||
|
@ -2,10 +2,11 @@ package net.corda.core.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.IdempotentFlow
|
||||
import net.corda.core.utilities.unwrap
|
||||
|
||||
@InitiatingFlow
|
||||
class ExchangeAttestationFlow(private val counterParty: Party) : FlowLogic<ByteArray>() {
|
||||
class ExchangeAttestationFlow(private val counterParty: Party) : FlowLogic<ByteArray>(), IdempotentFlow {
|
||||
|
||||
@Suspendable
|
||||
override fun call() : ByteArray {
|
||||
|
@ -1,9 +1,7 @@
|
||||
package net.corda.core.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.conclave.common.dto.ConclaveLedgerTxModel
|
||||
import net.corda.core.conclave.common.dto.EncryptedVerifiableTxAndDependencies
|
||||
import net.corda.core.conclave.common.dto.VerifiableTxAndDependencies
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.checkParameterHash
|
||||
|
@ -139,6 +139,7 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
|
||||
// Do nothing by default.
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun loadWhatWeHave(): Pair<List<SecureHash>, Set<SecureHash>> {
|
||||
val fromDisk = ArrayList<SecureHash>()
|
||||
val toFetch = LinkedHashSet<SecureHash>()
|
||||
@ -154,6 +155,7 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
|
||||
return Pair(fromDisk, toFetch)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun loadExpected(ids: List<SecureHash>): List<T> {
|
||||
val loaded = ids.mapNotNull { load(it) }
|
||||
require(ids.size == loaded.size) {
|
||||
@ -162,6 +164,7 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
|
||||
return loaded
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
protected abstract fun load(txid: SecureHash): T?
|
||||
|
||||
protected open fun convert(wire: W): T = uncheckedCast(wire)
|
||||
@ -226,6 +229,7 @@ class FetchAttachmentsFlow(requests: Set<SecureHash>,
|
||||
|
||||
private val uploader = "$P2P_UPLOADER:${otherSideSession.counterparty.name}"
|
||||
|
||||
@Suspendable
|
||||
override fun load(txid: SecureHash): Attachment? = serviceHub.attachments.openAttachment(txid)
|
||||
|
||||
override fun convert(wire: ByteArray): Attachment = FetchedAttachment({ wire }, uploader)
|
||||
@ -272,16 +276,22 @@ class FetchAttachmentsFlow(requests: Set<SecureHash>,
|
||||
class FetchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :
|
||||
FetchDataFlow<SignedTransaction, SignedTransaction>(requests, otherSide, DataType.TRANSACTION) {
|
||||
|
||||
@Suspendable
|
||||
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid)
|
||||
}
|
||||
|
||||
class FetchEncryptedTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :
|
||||
class FetchEncryptedTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession, val flow: ResolveTransactionsFlow) :
|
||||
FetchDataFlow<EncryptedTransaction, EncryptedTransaction>(requests, otherSide, DataType.TRANSACTION) {
|
||||
|
||||
@Suspendable
|
||||
override fun load(txid: SecureHash): EncryptedTransaction? {
|
||||
return serviceHub.validatedTransactions.getEncryptedTransaction(txid)?.let {
|
||||
val theirAttestation: ByteArray = subFlow(ExchangeAttestationFlow(otherSideSession.counterparty))
|
||||
serviceHub.encryptedTransactionService.encryptTransactionForRemote(runId.uuid, it, theirAttestation)
|
||||
val foundEncryptedTransaction = serviceHub.validatedTransactions.getEncryptedTransaction(txid)
|
||||
if (foundEncryptedTransaction == null){
|
||||
return null
|
||||
}else{
|
||||
val theirAttestation: ByteArray = flow.subFlow(ExchangeAttestationFlow(otherSideSession.counterparty))
|
||||
val encryptedTransaction = serviceHub.encryptedTransactionService.encryptTransactionForRemote(runId.uuid, foundEncryptedTransaction, theirAttestation)
|
||||
return encryptedTransaction
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -289,6 +299,7 @@ class FetchEncryptedTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowS
|
||||
class FetchBatchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :
|
||||
FetchDataFlow<MaybeSerializedSignedTransaction, MaybeSerializedSignedTransaction>(requests, otherSide, DataType.BATCH_TRANSACTION) {
|
||||
|
||||
@Suspendable
|
||||
override fun load(txid: SecureHash): MaybeSerializedSignedTransaction? {
|
||||
val tran = serviceHub.validatedTransactions.getTransaction(txid)
|
||||
return if (tran == null) {
|
||||
@ -308,6 +319,8 @@ class FetchBatchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSessi
|
||||
class FetchNetworkParametersFlow(requests: Set<SecureHash>,
|
||||
otherSide: FlowSession) : FetchDataFlow<SignedDataWithCert<NetworkParameters>,
|
||||
SignedDataWithCert<NetworkParameters>>(requests, otherSide, DataType.PARAMETERS) {
|
||||
|
||||
@Suspendable
|
||||
override fun load(txid: SecureHash): SignedDataWithCert<NetworkParameters>? {
|
||||
return (serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(txid)
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.core.node.services
|
||||
|
||||
import co.paralleluniverse.fibers.Fiber
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.conclave.common.DummyCordaEnclaveClient
|
||||
import net.corda.core.conclave.common.CordaEnclaveClient
|
||||
import net.corda.core.conclave.common.dto.ConclaveLedgerTxModel
|
||||
@ -13,35 +14,43 @@ import java.util.*
|
||||
|
||||
class EncryptedTransactionService(val enclaveClient: CordaEnclaveClient = DummyCordaEnclaveClient(CordaX500Name("PartyDummy", "London", "GB" ))) : SingletonSerializeAsToken() {
|
||||
|
||||
@Suspendable
|
||||
private fun getCurrentFlowIdOrGenerateNewInvokeId(): UUID {
|
||||
val currentFiber = Fiber.currentFiber() as? FlowStateMachine<*>
|
||||
return currentFiber?.id?.uuid ?: UUID.randomUUID()
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
fun getEnclaveInstance(): ByteArray {
|
||||
return enclaveClient.getEnclaveInstanceInfo()
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
fun registerRemoteEnclaveInstanceInfo(flowId: UUID, remoteAttestation: ByteArray) {
|
||||
enclaveClient.registerRemoteEnclaveInstanceInfo(flowId, remoteAttestation)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
fun enclaveVerifyWithoutSignatures(encryptedTxAndDependencies: EncryptedVerifiableTxAndDependencies) {
|
||||
return enclaveClient.enclaveVerifyWithoutSignatures(getCurrentFlowIdOrGenerateNewInvokeId(), encryptedTxAndDependencies)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
fun enclaveVerifyWithSignatures(encryptedTxAndDependencies: EncryptedVerifiableTxAndDependencies): EncryptedTransaction {
|
||||
return enclaveClient.enclaveVerifyWithSignatures(getCurrentFlowIdOrGenerateNewInvokeId(), encryptedTxAndDependencies)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
fun encryptTransactionForLocal(encryptedTransaction: EncryptedTransaction): EncryptedTransaction {
|
||||
return enclaveClient.encryptTransactionForLocal(getCurrentFlowIdOrGenerateNewInvokeId(), encryptedTransaction)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
fun encryptTransactionForRemote(flowId: UUID, conclaveLedgerTxModel: ConclaveLedgerTxModel, theirAttestationBytes: ByteArray): EncryptedTransaction {
|
||||
return enclaveClient.encryptConclaveLedgerTxForRemote(flowId, conclaveLedgerTxModel, theirAttestationBytes)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
fun encryptTransactionForRemote(flowId: UUID, encryptedTransaction: EncryptedTransaction, theirAttestationBytes: ByteArray): EncryptedTransaction {
|
||||
return enclaveClient.encryptEncryptedTransactionForRemote(flowId, encryptedTransaction, theirAttestationBytes)
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.core.node.services
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.DeleteForDJVM
|
||||
import net.corda.core.DoNotImplement
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
@ -23,6 +24,7 @@ interface TransactionStorage {
|
||||
/**
|
||||
* Return the encrypted transaction with the given [id], or null if no such transaction exists.
|
||||
*/
|
||||
@Suspendable
|
||||
fun getEncryptedTransaction(id: SecureHash): EncryptedTransaction?
|
||||
|
||||
/**
|
||||
|
@ -225,7 +225,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
||||
|
||||
@Suspendable
|
||||
private fun fetchEncryptedRequiredTransactions(requests: Set<SecureHash>): Pair<List<SecureHash>, List<EncryptedTransaction>> {
|
||||
val requestedTxs = flow.subFlow(FetchEncryptedTransactionsFlow(requests, flow.otherSide))
|
||||
val requestedTxs = flow.subFlow(FetchEncryptedTransactionsFlow(requests, flow.otherSide, flow))
|
||||
return Pair(requestedTxs.fromDisk.map { it.id }, requestedTxs.downloaded)
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,8 @@ import net.corda.node.utilities.AppendOnlyPersistentMapBase
|
||||
import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.*
|
||||
import net.corda.serialization.internal.CordaSerializationEncoding.SNAPPY
|
||||
import org.hibernate.annotations.LazyCollection
|
||||
import org.hibernate.annotations.LazyCollectionOption
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.time.Instant
|
||||
@ -78,7 +80,8 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
@Column(name = "timestamp", nullable = false)
|
||||
val timestamp: Instant,
|
||||
|
||||
@ElementCollection
|
||||
@ElementCollection(fetch = FetchType.EAGER)
|
||||
@LazyCollection(LazyCollectionOption.FALSE)
|
||||
@CollectionTable(
|
||||
name="${NODE_DATABASE_PREFIX}encrypted_transactions_dependencies",
|
||||
joinColumns = [JoinColumn(name = "tx_id", referencedColumnName = "tx_id")],
|
||||
@ -88,6 +91,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
val dependencies: List<String>,
|
||||
|
||||
@ElementCollection
|
||||
@LazyCollection(LazyCollectionOption.FALSE)
|
||||
@CollectionTable(
|
||||
name="${NODE_DATABASE_PREFIX}encrypted_transactions_signatures",
|
||||
joinColumns = [JoinColumn(name = "tx_id", referencedColumnName = "tx_id")],
|
||||
|
@ -292,9 +292,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
|
||||
private fun checkDbTransaction(isPresent: Boolean) {
|
||||
if (isPresent) {
|
||||
requireNotNull(contextTransactionOrNull) {
|
||||
"Transaction context is missing. This might happen if a suspendable method is not annotated with @Suspendable annotation."
|
||||
}
|
||||
// requireNotNull(contextTransactionOrNull) {
|
||||
// "Transaction context is missing. This might happen if a suspendable method is not annotated with @Suspendable annotation."
|
||||
// }
|
||||
} else {
|
||||
require(contextTransactionOrNull == null) { "Transaction is marked as not present, but is not null" }
|
||||
}
|
||||
|
@ -323,15 +323,10 @@ class NodeVaultService(
|
||||
when (statesToRecord) {
|
||||
StatesToRecord.NONE -> throw AssertionError("Should not reach here")
|
||||
StatesToRecord.ALL_VISIBLE, StatesToRecord.ONLY_RELEVANT -> {
|
||||
val notSeenReferences = tx.references - loadStates(tx.references).map { it.ref }
|
||||
// TODO: This is expensive - is there another way?
|
||||
tx.toLedgerTransaction(servicesForResolution).deserializableRefStates()
|
||||
.filter { (_, stateAndRef) -> stateAndRef.ref in notSeenReferences }
|
||||
.values
|
||||
emptyList<StateAndRef<ContractState>>()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Vault.Update(consumedStates.toSet(), ourNewStates.toSet(), references = newReferenceStateAndRefs.toSet())
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user