mirror of
https://github.com/corda/corda.git
synced 2025-06-22 09:08:49 +00:00
Merge remote-tracking branch 'remotes/open/master' into parkri-os-merge-20181126-1
This commit is contained in:
@ -7,12 +7,13 @@ import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.BackpressureAwareTimedFlow
|
||||
import net.corda.core.internal.FetchDataFlow
|
||||
import net.corda.core.internal.TimedFlow
|
||||
import net.corda.core.internal.notary.generateSignature
|
||||
import net.corda.core.internal.notary.validateSignatures
|
||||
import net.corda.core.internal.pushToLoggingContext
|
||||
import net.corda.core.transactions.ContractUpgradeWireTransaction
|
||||
import net.corda.core.transactions.ReferenceStateRef
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
@ -36,7 +37,7 @@ class NotaryFlow {
|
||||
open class Client(
|
||||
private val stx: SignedTransaction,
|
||||
override val progressTracker: ProgressTracker
|
||||
) : FlowLogic<List<TransactionSignature>>(), TimedFlow {
|
||||
) : BackpressureAwareTimedFlow<List<TransactionSignature>>() {
|
||||
constructor(stx: SignedTransaction) : this(stx, tracker())
|
||||
|
||||
companion object {
|
||||
@ -90,7 +91,7 @@ class NotaryFlow {
|
||||
private fun sendAndReceiveValidating(session: FlowSession, signature: NotarisationRequestSignature): UntrustworthyData<NotarisationResponse> {
|
||||
val payload = NotarisationPayload(stx, signature)
|
||||
subFlow(NotarySendTransactionFlow(session, payload))
|
||||
return session.receive()
|
||||
return receiveResultOrTiming(session)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@ -98,10 +99,11 @@ class NotaryFlow {
|
||||
val ctx = stx.coreTransaction
|
||||
val tx = when (ctx) {
|
||||
is ContractUpgradeWireTransaction -> ctx.buildFilteredTransaction()
|
||||
is WireTransaction -> ctx.buildFilteredTransaction(Predicate { it is StateRef || it is TimeWindow || it == notaryParty })
|
||||
is WireTransaction -> ctx.buildFilteredTransaction(Predicate { it is StateRef || it is ReferenceStateRef || it is TimeWindow || it == notaryParty })
|
||||
else -> ctx
|
||||
}
|
||||
return session.sendAndReceiveWithRetry(NotarisationPayload(tx, signature))
|
||||
session.send(NotarisationPayload(tx, signature))
|
||||
return receiveResultOrTiming(session)
|
||||
}
|
||||
|
||||
/** Checks that the notary's signature(s) is/are valid. */
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
* A notarisation request specifies a list of states to consume and the id of the consuming transaction. Its primary
|
||||
@ -80,4 +81,8 @@ data class NotarisationPayload(val transaction: Any, val requestSignature: Notar
|
||||
|
||||
/** Payload returned by the notary service flow to the client. */
|
||||
@CordaSerializable
|
||||
data class NotarisationResponse(val signatures: List<TransactionSignature>)
|
||||
data class NotarisationResponse(val signatures: List<TransactionSignature>)
|
||||
|
||||
/** Sent by the notary when the notary detects it will unlikely respond before the client retries. */
|
||||
@CordaSerializable
|
||||
data class WaitTimeUpdate(val waitTime: Duration)
|
||||
|
@ -0,0 +1,36 @@
|
||||
package net.corda.core.internal
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.WaitTimeUpdate
|
||||
import net.corda.core.utilities.UntrustworthyData
|
||||
|
||||
const val MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE = 4
|
||||
|
||||
/**
|
||||
* Implementation of TimedFlow that can handle WaitTimeUpdate messages. Any flow talking to the notary should implement this and use
|
||||
* explicit send and this class's receiveResultOrTiming to receive the response to handle cases where the notary sends a timeout update.
|
||||
*
|
||||
* This is handling the special case of the notary where the notary service will have an internal queue on the uniqueness provider and we
|
||||
* want to stop retries overwhelming that internal queue. As the TimedFlow mechanism and the notary service back-pressure are very specific
|
||||
* to this use case at the moment, this implementation is internal and not for general use.
|
||||
*/
|
||||
abstract class BackpressureAwareTimedFlow<ResultType> : FlowLogic<ResultType>(), TimedFlow {
|
||||
@Suspendable
|
||||
inline fun <reified ReceiveType> receiveResultOrTiming(session: FlowSession): UntrustworthyData<ReceiveType> {
|
||||
while (true) {
|
||||
val wrappedResult = session.receive<Any>()
|
||||
val unwrapped = wrappedResult.fromUntrustedWorld
|
||||
when {
|
||||
unwrapped is WaitTimeUpdate -> {
|
||||
logger.info("Counterparty [${session.counterparty}] is busy - TimedFlow $runId has been asked to wait for an additional ${unwrapped.waitTime} seconds for completion.")
|
||||
stateMachine.updateTimedFlowTimeout(unwrapped.waitTime.seconds)
|
||||
}
|
||||
unwrapped is ReceiveType -> @Suppress("UNCHECKED_CAST") // The compiler doesn't understand it's checked in the line above
|
||||
return wrappedResult as UntrustworthyData<ReceiveType>
|
||||
else -> throw throw IllegalArgumentException("We were expecting a ${ReceiveType::class.java.name} or WaitTimeUpdate but we instead got a ${unwrapped.javaClass.name} ($unwrapped)")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -15,7 +15,7 @@ import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import org.slf4j.MDC
|
||||
|
||||
// *Internal* Corda-specific utilities
|
||||
// *Internal* Corda-specific utilities.
|
||||
|
||||
const val PLATFORM_VERSION = 4
|
||||
|
||||
@ -33,13 +33,13 @@ fun checkMinimumPlatformVersion(minimumPlatformVersion: Int, requiredMinPlatform
|
||||
}
|
||||
}
|
||||
|
||||
/** Provide access to internal method for AttachmentClassLoaderTests */
|
||||
/** Provide access to internal method for AttachmentClassLoaderTests. */
|
||||
@DeleteForDJVM
|
||||
fun TransactionBuilder.toWireTransaction(services: ServicesForResolution, serializationContext: SerializationContext): WireTransaction {
|
||||
return toWireTransactionWithContext(services, serializationContext)
|
||||
}
|
||||
|
||||
/** Provide access to internal method for AttachmentClassLoaderTests */
|
||||
/** Provide access to internal method for AttachmentClassLoaderTests. */
|
||||
@DeleteForDJVM
|
||||
fun TransactionBuilder.toLedgerTransaction(services: ServicesForResolution, serializationContext: SerializationContext): LedgerTransaction {
|
||||
return toLedgerTransactionWithContext(services, serializationContext)
|
||||
|
@ -36,6 +36,8 @@ interface FlowStateMachine<FLOWRETURN> {
|
||||
@Suspendable
|
||||
fun persistFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>)
|
||||
|
||||
fun updateTimedFlowTimeout(timeoutSeconds: Long)
|
||||
|
||||
val logic: FlowLogic<FLOWRETURN>
|
||||
val serviceHub: ServiceHub
|
||||
val logger: Logger
|
||||
|
@ -4,10 +4,23 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.NotarisationPayload
|
||||
import net.corda.core.flows.NotarisationRequest
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotarisationResponse
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryException
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.flows.WaitTimeUpdate
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.IdempotentFlow
|
||||
import net.corda.core.internal.MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.utilities.unwrap
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
* A flow run by a notary service that handles notarisation requests.
|
||||
@ -16,16 +29,30 @@ import net.corda.core.utilities.unwrap
|
||||
* if any of the input states have been previously committed.
|
||||
*
|
||||
* Additional transaction validation logic can be added when implementing [validateRequest].
|
||||
*
|
||||
* @param otherSideSession The session with the notary client.
|
||||
* @param service The notary service to utilise.
|
||||
* @param etaThreshold If the ETA for processing the request, according to the service, is greater than this, notify the client.
|
||||
*/
|
||||
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
|
||||
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic<Void?>(), IdempotentFlow {
|
||||
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService, private val etaThreshold: Duration) : FlowLogic<Void?>(), IdempotentFlow {
|
||||
companion object {
|
||||
// TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder.
|
||||
private const val maxAllowedInputsAndReferences = 10_000
|
||||
|
||||
/**
|
||||
* This is default wait time estimate for notaries/uniqueness providers that do not estimate wait times.
|
||||
* Also used as default eta message threshold so that a default wait time/default threshold will never
|
||||
* lead to an update message being sent.
|
||||
*/
|
||||
val defaultEstimatedWaitTime: Duration = 10.seconds
|
||||
}
|
||||
|
||||
private var transactionId: SecureHash? = null
|
||||
|
||||
@Suspendable
|
||||
private fun counterpartyCanHandleBackPressure() = otherSideSession.getCounterpartyFlowInfo(true).flowVersion >= MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Void? {
|
||||
check(serviceHub.myInfo.legalIdentities.any { serviceHub.networkMapCache.isNotary(it) }) {
|
||||
@ -40,6 +67,11 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
|
||||
|
||||
verifyTransaction(requestPayload)
|
||||
|
||||
val eta = service.getEstimatedWaitTime(tx.inputs.size + tx.references.size)
|
||||
if (eta > etaThreshold && counterpartyCanHandleBackPressure()) {
|
||||
otherSideSession.send(WaitTimeUpdate(eta))
|
||||
}
|
||||
|
||||
service.commitInputStates(
|
||||
tx.inputs,
|
||||
tx.id,
|
||||
|
@ -4,7 +4,11 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignableData
|
||||
import net.corda.core.crypto.SignatureMetadata
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.identity.Party
|
||||
@ -14,6 +18,7 @@ import net.corda.core.internal.notary.UniquenessProvider.Result
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.slf4j.Logger
|
||||
import java.time.Duration
|
||||
|
||||
/** Base implementation for a notary service operated by a singe party. */
|
||||
abstract class SinglePartyNotaryService : NotaryService() {
|
||||
@ -42,6 +47,7 @@ abstract class SinglePartyNotaryService : NotaryService() {
|
||||
|
||||
val callingFlow = FlowLogic.currentTopLevel
|
||||
?: throw IllegalStateException("This method should be invoked in a flow context.")
|
||||
|
||||
val result = callingFlow.executeAsync(
|
||||
CommitOperation(
|
||||
this,
|
||||
@ -59,6 +65,13 @@ abstract class SinglePartyNotaryService : NotaryService() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Estimate the wait time to be notarised taking into account the new request size.
|
||||
*
|
||||
* @param numStates The number of states we're about to request be notarised.
|
||||
*/
|
||||
fun getEstimatedWaitTime(numStates: Int): Duration = uniquenessProvider.getEta(numStates)
|
||||
|
||||
/**
|
||||
* Required for the flow to be able to suspend until the commit is complete.
|
||||
* This object will be included in the flow checkpoint.
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.Party
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
* A service that records input states of the given transaction and provides conflict information
|
||||
@ -23,6 +24,18 @@ interface UniquenessProvider {
|
||||
references: List<StateRef> = emptyList()
|
||||
): CordaFuture<Result>
|
||||
|
||||
/**
|
||||
* Estimated time of request processing. A uniqueness provider that is aware of their own throughput can return
|
||||
* an estimate how long requests will be queued before they can be processed. Notary services use this information
|
||||
* to potentially update clients with an expected wait time in order to avoid spamming by retries when the notary
|
||||
* gets busy.
|
||||
*
|
||||
* @param numStates The number of states (input + reference) in the new request, to be added to the pending count.
|
||||
*/
|
||||
fun getEta(numStates: Int): Duration {
|
||||
return NotaryServiceFlow.defaultEstimatedWaitTime
|
||||
}
|
||||
|
||||
/** The outcome of committing a transaction. */
|
||||
sealed class Result {
|
||||
/** Indicates that all input states have been committed successfully. */
|
||||
@ -30,4 +43,4 @@ interface UniquenessProvider {
|
||||
/** Indicates that the transaction has not been committed. */
|
||||
data class Failure(val error: NotaryError) : Result()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -147,7 +147,9 @@ class FilteredTransaction internal constructor(
|
||||
wtx.attachments.forEachIndexed { internalIndex, it -> filter(it, ATTACHMENTS_GROUP.ordinal, internalIndex) }
|
||||
if (wtx.notary != null) filter(wtx.notary, NOTARY_GROUP.ordinal, 0)
|
||||
if (wtx.timeWindow != null) filter(wtx.timeWindow, TIMEWINDOW_GROUP.ordinal, 0)
|
||||
wtx.references.forEachIndexed { internalIndex, it -> filter(it, REFERENCES_GROUP.ordinal, internalIndex) }
|
||||
// Note that because [inputs] and [references] share the same type [StateRef], we use a wrapper for references [ReferenceStateRef],
|
||||
// when filtering. Thus, to filter-in all [references] based on type, one should use the wrapper type [ReferenceStateRef] and not [StateRef].
|
||||
wtx.references.forEachIndexed { internalIndex, it -> filter(ReferenceStateRef(it), REFERENCES_GROUP.ordinal, internalIndex) }
|
||||
// It is highlighted that because there is no a signers property in TraversableTransaction,
|
||||
// one cannot specifically filter them in or out.
|
||||
// The above is very important to ensure someone won't filter out the signers component group if at least one
|
||||
@ -344,3 +346,8 @@ class ComponentVisibilityException(val id: SecureHash, val reason: String) : Cor
|
||||
@KeepForDJVM
|
||||
@CordaSerializable
|
||||
class FilteredTransactionVerificationException(val id: SecureHash, val reason: String) : CordaException("Transaction with id:$id cannot be verified. Reason: $reason")
|
||||
|
||||
/** Wrapper over [StateRef] to be used when filtering reference states. */
|
||||
@KeepForDJVM
|
||||
@CordaSerializable
|
||||
data class ReferenceStateRef(val stateRef: StateRef)
|
||||
|
@ -8,11 +8,13 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.ReferenceStateRef
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.`issued by`
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.node.services.api.IdentityServiceInternal
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
@ -59,9 +61,14 @@ class PartialMerkleTreeTest {
|
||||
hashed = nodes.map { it.serialize().sha256() }
|
||||
expectedRoot = MerkleTree.getMerkleTree(hashed.toMutableList() + listOf(zeroHash, zeroHash)).hash
|
||||
merkleTree = MerkleTree.getMerkleTree(hashed)
|
||||
testLedger = MockServices(emptyList(), MEGA_CORP.name, rigorousMock<IdentityServiceInternal>().also {
|
||||
doReturn(MEGA_CORP).whenever(it).partyFromKey(MEGA_CORP_PUBKEY)
|
||||
}).ledger(DUMMY_NOTARY) {
|
||||
|
||||
testLedger = MockServices(
|
||||
cordappPackages = emptyList(),
|
||||
initialIdentity = TestIdentity(MEGA_CORP.name),
|
||||
identityService = rigorousMock<IdentityServiceInternal>().also {
|
||||
doReturn(MEGA_CORP).whenever(it).partyFromKey(MEGA_CORP_PUBKEY) },
|
||||
networkParameters = testNetworkParameters(minimumPlatformVersion = 4)
|
||||
).ledger(DUMMY_NOTARY) {
|
||||
unverifiedTransaction {
|
||||
attachments(Cash.PROGRAM_ID)
|
||||
output(Cash.PROGRAM_ID, "MEGA_CORP cash",
|
||||
@ -76,6 +83,7 @@ class PartialMerkleTreeTest {
|
||||
transaction {
|
||||
attachments(Cash.PROGRAM_ID)
|
||||
input("MEGA_CORP cash")
|
||||
reference("dummy cash 1")
|
||||
output(Cash.PROGRAM_ID, "MEGA_CORP cash".output<Cash.State>().copy(owner = MINI_CORP))
|
||||
command(MEGA_CORP_PUBKEY, Cash.Commands.Move())
|
||||
timeWindow(TEST_TX_TIME)
|
||||
@ -148,6 +156,7 @@ class PartialMerkleTreeTest {
|
||||
// the signers component is also sent (required for visibility purposes).
|
||||
assertEquals(5, ftx.filteredComponentGroups.size)
|
||||
assertEquals(1, ftx.inputs.size)
|
||||
assertEquals(0, ftx.references.size)
|
||||
assertEquals(0, ftx.attachments.size)
|
||||
assertEquals(1, ftx.outputs.size)
|
||||
assertEquals(1, ftx.commands.size)
|
||||
@ -173,6 +182,7 @@ class PartialMerkleTreeTest {
|
||||
assertTrue(ftxNothing.attachments.isEmpty())
|
||||
assertTrue(ftxNothing.commands.isEmpty())
|
||||
assertTrue(ftxNothing.inputs.isEmpty())
|
||||
assertTrue(ftxNothing.references.isEmpty())
|
||||
assertTrue(ftxNothing.outputs.isEmpty())
|
||||
assertNull(ftxNothing.timeWindow)
|
||||
assertTrue(ftxNothing.availableComponentGroups.flatten().isEmpty())
|
||||
@ -321,4 +331,21 @@ class PartialMerkleTreeTest {
|
||||
// The provided hash is not in the tree (using a leaf that didn't exist in the original Merkle tree).
|
||||
assertFailsWith<MerkleTreeException> { pmtAllIncluded.leafIndex(SecureHash.sha256("30")) }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `building Merkle for reference states only`() {
|
||||
fun filtering(elem: Any): Boolean {
|
||||
return when (elem) {
|
||||
is ReferenceStateRef -> true
|
||||
else -> false
|
||||
}
|
||||
}
|
||||
|
||||
val ftx = testTx.buildFilteredTransaction(Predicate(::filtering))
|
||||
|
||||
assertEquals(1, ftx.filteredComponentGroups.size)
|
||||
assertEquals(0, ftx.inputs.size)
|
||||
assertEquals(1, ftx.references.size)
|
||||
ftx.verify()
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
val CONTRACT_ID = "net.corda.core.transactions.ReferenceStateTests\$ExampleContract"
|
||||
const val CONTRACT_ID = "net.corda.core.transactions.ReferenceStateTests\$ExampleContract"
|
||||
|
||||
class ReferenceStateTests {
|
||||
private companion object {
|
||||
|
Reference in New Issue
Block a user