StateMachine rewrite

This commit is contained in:
Andras Slemmer
2018-04-13 15:17:20 +01:00
parent d3446e213c
commit ce5fb66260
83 changed files with 4719 additions and 2009 deletions

View File

@ -0,0 +1,16 @@
package net.corda.core.flows;
import javax.annotation.Nullable;
/**
* An exception that may be identified with an ID. If an exception originates in a counter-flow this ID will be
* propagated. This allows correlation of error conditions across different flows.
*/
public interface IdentifiableException {
/**
* @return the ID of the error, or null if the error doesn't have it set (yet).
*/
default @Nullable Long getErrorId() {
return null;
}
}

View File

@ -7,16 +7,27 @@ import net.corda.core.CordaRuntimeException
/**
* Exception which can be thrown by a [FlowLogic] at any point in its logic to unexpectedly bring it to a permanent end.
* The exception will propagate to all counterparty flows and will be thrown on their end the next time they wait on a
* [FlowSession.receive] or [FlowSession.sendAndReceive]. Any flow which no longer needs to do a receive, or has already ended,
* will not receive the exception (if this is required then have them wait for a confirmation message).
* [FlowSession.receive] or [FlowSession.sendAndReceive]. Any flow which no longer needs to do a receive, or has already
* ended, will not receive the exception (if this is required then have them wait for a confirmation message).
*
* If the *rethrown* [FlowException] is uncaught in counterparty flows and propagation triggers then the exception is
* downgraded to an [UnexpectedFlowEndException]. This means only immediate counterparty flows will receive information
* about what the exception was.
*
* [FlowException] (or a subclass) can be a valid expected response from a flow, particularly ones which act as a service.
* It is recommended a [FlowLogic] document the [FlowException] types it can throw.
*
* @property originalErrorId the ID backing [getErrorId]. If null it will be set dynamically by the flow framework when
* the exception is handled. This ID is propagated to counterparty flows, even when the [FlowException] is
* downgraded to an [UnexpectedFlowEndException]. This is so the error conditions may be correlated later on.
*/
open class FlowException(message: String?, cause: Throwable?) : CordaException(message, cause) {
open class FlowException(message: String?, cause: Throwable?) :
CordaException(message, cause), IdentifiableException {
constructor(message: String?) : this(message, null)
constructor(cause: Throwable?) : this(cause?.toString(), cause)
constructor() : this(null, null)
var originalErrorId: Long? = null
override fun getErrorId(): Long? = originalErrorId
}
// DOCEND 1
@ -25,6 +36,7 @@ open class FlowException(message: String?, cause: Throwable?) : CordaException(m
* that we were not expecting), or the other side had an internal error, or the other side terminated when we
* were waiting for a response.
*/
class UnexpectedFlowEndException(message: String?, cause: Throwable?) : CordaRuntimeException(message, cause) {
constructor(msg: String) : this(msg, null)
}
class UnexpectedFlowEndException(message: String, cause: Throwable?, val originalErrorId: Long) :
CordaRuntimeException(message, cause), IdentifiableException {
override fun getErrorId(): Long = originalErrorId
}

View File

@ -6,20 +6,17 @@ import net.corda.core.CordaInternal
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.abbreviate
import net.corda.core.internal.uncheckedCast
import net.corda.core.internal.*
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.debug
import net.corda.core.utilities.*
import org.slf4j.Logger
import java.time.Duration
import java.time.Instant
/**
* A sub-class of [FlowLogic<T>] implements a flow using direct, straight line blocking code. Thus you
@ -77,12 +74,19 @@ abstract class FlowLogic<out T> {
*/
@Suspendable
@JvmStatic
@JvmOverloads
@Throws(FlowException::class)
fun sleep(duration: Duration) {
fun sleep(duration: Duration, maySkipCheckpoint: Boolean = false) {
if (duration > Duration.ofMinutes(5)) {
throw FlowException("Attempt to sleep for longer than 5 minutes is not supported. Consider using SchedulableState.")
}
(Strand.currentStrand() as? FlowStateMachine<*>)?.sleepUntil(Instant.now() + duration) ?: Strand.sleep(duration.toMillis())
val fiber = (Strand.currentStrand() as? FlowStateMachine<*>)
if (fiber == null) {
Strand.sleep(duration.toMillis())
} else {
val request = FlowIORequest.Sleep(wakeUpAfter = fiber.serviceHub.clock.instant() + duration)
fiber.suspend(request, maySkipCheckpoint = maySkipCheckpoint)
}
}
}
@ -94,7 +98,7 @@ abstract class FlowLogic<out T> {
/**
* Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts. It is
* only available once the flow has started, which means it cannnot be accessed in the constructor. Either
* only available once the flow has started, which means it cannot be accessed in the constructor. Either
* access this lazily or from inside [call].
*/
val serviceHub: ServiceHub get() = stateMachine.serviceHub
@ -104,7 +108,7 @@ abstract class FlowLogic<out T> {
* that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
*/
@Suspendable
fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party, flowUsedForSessions)
fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party)
/**
* Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that
@ -114,7 +118,10 @@ abstract class FlowLogic<out T> {
* Note: The current implementation returns the single identity of the node. This will change once multiple identities
* is implemented.
*/
val ourIdentityAndCert: PartyAndCertificate get() = stateMachine.ourIdentityAndCert
val ourIdentityAndCert: PartyAndCertificate get() {
return serviceHub.myInfo.legalIdentitiesAndCerts.find { it.party == stateMachine.ourIdentity }
?: throw IllegalStateException("Identity specified by ${stateMachine.id} (${stateMachine.ourIdentity}) is not one of ours!")
}
/**
* Specifies the identity to use for this flow. This will be one of the multiple identities that belong to this node.
@ -124,102 +131,23 @@ abstract class FlowLogic<out T> {
* Note: The current implementation returns the single identity of the node. This will change once multiple identities
* is implemented.
*/
val ourIdentity: Party get() = ourIdentityAndCert.party
/**
* Returns a [FlowInfo] object describing the flow [otherParty] is using. With [FlowInfo.flowVersion] it
* provides the necessary information needed for the evolution of flows and enabling backwards compatibility.
*
* This method can be called before any send or receive has been done with [otherParty]. In such a case this will force
* them to start their flow.
*/
@Deprecated("Use FlowSession.getFlowInfo()", level = DeprecationLevel.WARNING)
@Suspendable
fun getFlowInfo(otherParty: Party): FlowInfo = stateMachine.getFlowInfo(otherParty, flowUsedForSessions, maySkipCheckpoint = false)
/**
* Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
* is received, which must be of the given [R] type.
*
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
* use this when you expect to do a message swap than do use [send] and then [receive] in turn.
*
* @return an [UntrustworthyData] wrapper around the received object.
*/
@Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING)
inline fun <reified R : Any> sendAndReceive(otherParty: Party, payload: Any): UntrustworthyData<R> {
return sendAndReceive(R::class.java, otherParty, payload)
}
/**
* Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
* is received, which must be of the given [receiveType]. Remember that when receiving data from other parties the data
* should not be trusted until it's been thoroughly verified for consistency and that all expectations are
* satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
*
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
* use this when you expect to do a message swap than do use [send] and then [receive] in turn.
*
* @return an [UntrustworthyData] wrapper around the received object.
*/
@Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING)
@Suspendable
open fun <R : Any> sendAndReceive(receiveType: Class<R>, otherParty: Party, payload: Any): UntrustworthyData<R> {
return stateMachine.sendAndReceive(receiveType, otherParty, payload, flowUsedForSessions, retrySend = false, maySkipCheckpoint = false)
}
/**
* Similar to [sendAndReceive] but also instructs the `payload` to be redelivered until the expected message is received.
*
* Note that this method should NOT be used for regular party-to-party communication, use [sendAndReceive] instead.
* It is only intended for the case where the [otherParty] is running a distributed service with an idempotent
* flow which only accepts a single request and sends back a single response e.g. a notary or certain types of
* oracle services. If one or more nodes in the service cluster go down mid-session, the message will be redelivered
* to a different one, so there is no need to wait until the initial node comes back up to obtain a response.
*/
@Deprecated("Use FlowSession.sendAndReceiveWithRetry()", level = DeprecationLevel.WARNING)
internal inline fun <reified R : Any> sendAndReceiveWithRetry(otherParty: Party, payload: Any): UntrustworthyData<R> {
return stateMachine.sendAndReceive(R::class.java, otherParty, payload, flowUsedForSessions, retrySend = true, maySkipCheckpoint = false)
}
val ourIdentity: Party get() = stateMachine.ourIdentity
@Suspendable
internal fun <R : Any> FlowSession.sendAndReceiveWithRetry(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
return stateMachine.sendAndReceive(receiveType, counterparty, payload, flowUsedForSessions, retrySend = true, maySkipCheckpoint = false)
val request = FlowIORequest.SendAndReceive(
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)),
shouldRetrySend = true
)
return stateMachine.suspend(request, maySkipCheckpoint = false)[this]!!.checkPayloadIs(receiveType)
}
@Suspendable
internal inline fun <reified R : Any> FlowSession.sendAndReceiveWithRetry(payload: Any): UntrustworthyData<R> {
return stateMachine.sendAndReceive(R::class.java, counterparty, payload, flowUsedForSessions, retrySend = true, maySkipCheckpoint = false)
return sendAndReceiveWithRetry(R::class.java, payload)
}
/**
* Suspends until the specified [otherParty] sends us a message of type [R].
*
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*/
@Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING)
inline fun <reified R : Any> receive(otherParty: Party): UntrustworthyData<R> = receive(R::class.java, otherParty)
/**
* Suspends until the specified [otherParty] sends us a message of type [receiveType].
*
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*
* @return an [UntrustworthyData] wrapper around the received object.
*/
@Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING)
@Suspendable
open fun <R : Any> receive(receiveType: Class<R>, otherParty: Party): UntrustworthyData<R> {
return stateMachine.receive(receiveType, otherParty, flowUsedForSessions, maySkipCheckpoint = false)
}
/** Suspends until a message has been received for each session in the specified [sessions].
*
@ -232,8 +160,14 @@ abstract class FlowLogic<out T> {
* @returns a [Map] containing the objects received, wrapped in an [UntrustworthyData], by the [FlowSession]s who sent them.
*/
@Suspendable
open fun receiveAllMap(sessions: Map<FlowSession, Class<out Any>>): Map<FlowSession, UntrustworthyData<Any>> {
return stateMachine.receiveAll(sessions, this)
@JvmOverloads
open fun receiveAllMap(sessions: Map<FlowSession, Class<out Any>>, maySkipCheckpoint: Boolean = false): Map<FlowSession, UntrustworthyData<Any>> {
enforceNoPrimitiveInReceive(sessions.values)
val replies = stateMachine.suspend(
ioRequest = FlowIORequest.Receive(sessions.keys.toNonEmptySet()),
maySkipCheckpoint = maySkipCheckpoint
)
return replies.mapValues { (session, payload) -> payload.checkPayloadIs(sessions[session]!!) }
}
/**
@ -248,24 +182,13 @@ abstract class FlowLogic<out T> {
* @returns a [List] containing the objects received, wrapped in an [UntrustworthyData], with the same order of [sessions].
*/
@Suspendable
open fun <R : Any> receiveAll(receiveType: Class<R>, sessions: List<FlowSession>): List<UntrustworthyData<R>> {
@JvmOverloads
open fun <R : Any> receiveAll(receiveType: Class<R>, sessions: List<FlowSession>, maySkipCheckpoint: Boolean = false): List<UntrustworthyData<R>> {
enforceNoPrimitiveInReceive(listOf(receiveType))
enforceNoDuplicates(sessions)
return castMapValuesToKnownType(receiveAllMap(associateSessionsToReceiveType(receiveType, sessions)))
}
/**
* Queues the given [payload] for sending to the [otherParty] and continues without suspending.
*
* Note that the other party may receive the message at some arbitrary later point or not at all: if [otherParty]
* is offline then message delivery will be retried until it comes back or until the message is older than the
* network's event horizon time.
*/
@Deprecated("Use FlowSession.send()", level = DeprecationLevel.WARNING)
@Suspendable
open fun send(otherParty: Party, payload: Any) {
stateMachine.send(otherParty, payload, flowUsedForSessions, maySkipCheckpoint = false)
}
/**
* Invokes the given subflow. This function returns once the subflow completes successfully with the result
* returned by that subflow's [call] method. If the subflow has a progress tracker, it is attached to the
@ -283,11 +206,8 @@ abstract class FlowLogic<out T> {
open fun <R> subFlow(subLogic: FlowLogic<R>): R {
subLogic.stateMachine = stateMachine
maybeWireUpProgressTracking(subLogic)
if (!subLogic.javaClass.isAnnotationPresent(InitiatingFlow::class.java)) {
subLogic.flowUsedForSessions = flowUsedForSessions
}
logger.debug { "Calling subflow: $subLogic" }
val result = subLogic.call()
val result = stateMachine.subFlow(subLogic)
logger.debug { "Subflow finished with result ${result.toString().abbreviate(300)}" }
// It's easy to forget this when writing flows so we just step it to the DONE state when it completes.
subLogic.progressTracker?.currentStep = ProgressTracker.DONE
@ -384,7 +304,8 @@ abstract class FlowLogic<out T> {
@Suspendable
@JvmOverloads
fun waitForLedgerCommit(hash: SecureHash, maySkipCheckpoint: Boolean = false): SignedTransaction {
return stateMachine.waitForLedgerCommit(hash, this, maySkipCheckpoint = maySkipCheckpoint)
val request = FlowIORequest.WaitForLedgerCommit(hash)
return stateMachine.suspend(request, maySkipCheckpoint = maySkipCheckpoint)
}
/**
@ -427,11 +348,6 @@ abstract class FlowLogic<out T> {
_stateMachine = value
}
// This is the flow used for managing sessions. It defaults to the current flow but if this is an inlined sub-flow
// then it will point to the flow it's been inlined to.
@Suppress("LeakingThis")
private var flowUsedForSessions: FlowLogic<*> = this
private fun maybeWireUpProgressTracking(subLogic: FlowLogic<*>) {
val ours = progressTracker
val theirs = subLogic.progressTracker
@ -448,6 +364,11 @@ abstract class FlowLogic<out T> {
require(sessions.size == sessions.toSet().size) { "A flow session can only appear once as argument." }
}
private fun enforceNoPrimitiveInReceive(types: Collection<Class<*>>) {
val primitiveTypes = types.filter { it.isPrimitive }
require(primitiveTypes.isEmpty()) { "Cannot receive primitive type(s) $primitiveTypes" }
}
private fun <R> associateSessionsToReceiveType(receiveType: Class<R>, sessions: List<FlowSession>): Map<FlowSession, Class<R>> {
return sessions.associateByTo(LinkedHashMap(), { it }, { receiveType })
}
@ -472,4 +393,4 @@ data class FlowInfo(
* to deduplicate it from other releases of the same CorDapp, typically a version string. See the
* [CorDapp JAR format](https://docs.corda.net/cordapp-build-systems.html#cordapp-jar-format) for more details.
*/
val appName: String)
val appName: String)

View File

@ -0,0 +1,23 @@
package net.corda.core.internal
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowLogic
import net.corda.core.serialization.CordaSerializable
/**
* Interface for arbitrary operations that can be invoked in a flow asynchronously - the flow will suspend until the
* operation completes. Operation parameters are expected to be injected via constructor.
*/
@CordaSerializable
interface FlowAsyncOperation<R : Any> {
/** Performs the operation in a non-blocking fashion. */
fun execute(): CordaFuture<R>
}
/** Executes the specified [operation] and suspends until operation completion. */
@Suspendable
fun <T, R : Any> FlowLogic<T>.executeAsync(operation: FlowAsyncOperation<R>, maySkipCheckpoint: Boolean = false): R {
val request = FlowIORequest.ExecuteAsyncOperation(operation)
return stateMachine.suspend(request, maySkipCheckpoint)
}

View File

@ -0,0 +1,89 @@
package net.corda.core.internal
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowSession
import net.corda.core.serialization.SerializedBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NonEmptySet
import java.time.Instant
/**
* A [FlowIORequest] represents an IO request of a flow when it suspends. It is persisted in checkpoints.
*/
sealed class FlowIORequest<out R : Any> {
/**
* Send messages to sessions.
*
* @property sessionToMessage a map from session to message-to-be-sent.
* @property shouldRetrySend specifies whether the send should be retried.
*/
data class Send(
val sessionToMessage: Map<FlowSession, SerializedBytes<Any>>,
val shouldRetrySend: Boolean
) : FlowIORequest<Unit>() {
override fun toString() = "Send(" +
"sessionToMessage=${sessionToMessage.mapValues { it.value.hash }}, " +
"shouldRetrySend=$shouldRetrySend" +
")"
}
/**
* Receive messages from sessions.
*
* @property sessions the sessions to receive messages from.
* @return a map from session to received message.
*/
data class Receive(
val sessions: NonEmptySet<FlowSession>
) : FlowIORequest<Map<FlowSession, SerializedBytes<Any>>>()
/**
* Send and receive messages from the specified sessions.
*
* @property sessionToMessage a map from session to message-to-be-sent. The keys also specify which sessions to
* receive from.
* @property shouldRetrySend specifies whether the send should be retried.
* @return a map from session to received message.
*/
data class SendAndReceive(
val sessionToMessage: Map<FlowSession, SerializedBytes<Any>>,
val shouldRetrySend: Boolean
) : FlowIORequest<Map<FlowSession, SerializedBytes<Any>>>() {
override fun toString() = "SendAndReceive(${sessionToMessage.mapValues { (key, value) ->
"$key=${value.hash}" }}, shouldRetrySend=$shouldRetrySend)"
}
/**
* Wait for a transaction to be committed to the database.
*
* @property hash the hash of the transaction.
* @return the committed transaction.
*/
data class WaitForLedgerCommit(val hash: SecureHash) : FlowIORequest<SignedTransaction>()
/**
* Get the FlowInfo of the specified sessions.
*
* @property sessions the sessions to get the FlowInfo of.
* @return a map from session to FlowInfo.
*/
data class GetFlowInfo(val sessions: NonEmptySet<FlowSession>) : FlowIORequest<Map<FlowSession, FlowInfo>>()
/**
* Suspend the flow until the specified time.
*
* @property wakeUpAfter the time to sleep until.
*/
data class Sleep(val wakeUpAfter: Instant) : FlowIORequest<Unit>()
/**
* Suspend the flow until all Initiating sessions are confirmed.
*/
object WaitForSessionConfirmations : FlowIORequest<Unit>()
/**
* Execute the specified [operation], suspend the flow until completion.
*/
data class ExecuteAsyncOperation<T : Any>(val operation: FlowAsyncOperation<T>) : FlowIORequest<T>()
}

View File

@ -1,64 +1,42 @@
package net.corda.core.internal
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.DoNotImplement
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.context.InvocationContext
import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.UntrustworthyData
import org.slf4j.Logger
import java.time.Instant
/** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */
interface FlowStateMachine<R> {
@DoNotImplement
interface FlowStateMachine<FLOWRETURN> {
@Suspendable
fun getFlowInfo(otherParty: Party, sessionFlow: FlowLogic<*>, maySkipCheckpoint: Boolean): FlowInfo
fun <SUSPENDRETURN : Any> suspend(ioRequest: FlowIORequest<SUSPENDRETURN>, maySkipCheckpoint: Boolean): SUSPENDRETURN
@Suspendable
fun initiateFlow(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSession
@Suspendable
fun <T : Any> sendAndReceive(receiveType: Class<T>,
otherParty: Party,
payload: Any,
sessionFlow: FlowLogic<*>,
retrySend: Boolean,
maySkipCheckpoint: Boolean): UntrustworthyData<T>
@Suspendable
fun <T : Any> receive(receiveType: Class<T>, otherParty: Party, sessionFlow: FlowLogic<*>, maySkipCheckpoint: Boolean): UntrustworthyData<T>
@Suspendable
fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>, maySkipCheckpoint: Boolean)
@Suspendable
fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>, maySkipCheckpoint: Boolean): SignedTransaction
@Suspendable
fun sleepUntil(until: Instant)
fun initiateFlow(party: Party): FlowSession
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>)
fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>)
@Suspendable
fun <SUBFLOWRETURN> subFlow(subFlow: FlowLogic<SUBFLOWRETURN>): SUBFLOWRETURN
@Suspendable
fun flowStackSnapshot(flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot?
@Suspendable
fun persistFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>)
val logic: FlowLogic<R>
val logic: FlowLogic<FLOWRETURN>
val serviceHub: ServiceHub
val logger: Logger
val id: StateMachineRunId
val resultFuture: CordaFuture<R>
val resultFuture: CordaFuture<FLOWRETURN>
val context: InvocationContext
val ourIdentityAndCert: PartyAndCertificate
@Suspendable
fun receiveAll(sessions: Map<FlowSession, Class<out Any>>, sessionFlow: FlowLogic<*>): Map<FlowSession, UntrustworthyData<Any>>
val ourIdentity: Party
}

View File

@ -1,6 +1,7 @@
package net.corda.core.node.services
import net.corda.core.DoNotImplement
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.messaging.DataFeed
import net.corda.core.transactions.SignedTransaction
@ -26,4 +27,9 @@ interface TransactionStorage {
* Returns all currently stored transactions and further fresh ones.
*/
fun track(): DataFeed<List<SignedTransaction>, SignedTransaction>
/**
* Returns a future that completes with the transaction corresponding to [id] once it has been committed
*/
fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction>
}

View File

@ -2,6 +2,9 @@ package net.corda.core.utilities
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowException
import net.corda.core.internal.castIfPossible
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import java.io.Serializable
/**
@ -29,3 +32,15 @@ class UntrustworthyData<out T>(@PublishedApi internal val fromUntrustedWorld: T)
}
inline fun <T, R> UntrustworthyData<T>.unwrap(validator: (T) -> R): R = validator(fromUntrustedWorld)
fun <T : Any> SerializedBytes<Any>.checkPayloadIs(type: Class<T>): UntrustworthyData<T> {
val payloadData: T = try {
val serializer = SerializationDefaults.SERIALIZATION_FACTORY
serializer.deserialize(this, type, SerializationDefaults.P2P_CONTEXT)
} catch (ex: Exception) {
throw IllegalArgumentException("Payload invalid", ex)
}
return type.castIfPossible(payloadData)?.let { UntrustworthyData(it) } ?:
throw IllegalArgumentException("We were expecting a ${type.name} but we instead got a " +
"${payloadData.javaClass.name} (${payloadData})")
}

View File

@ -61,9 +61,8 @@ public class FlowsInJavaTest {
fail("ExecutionException should have been thrown");
} catch (ExecutionException e) {
assertThat(e.getCause())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("primitive")
.hasMessageContaining(receiveType.getName());
.hasMessageContaining(Primitives.unwrap(receiveType).getName());
}
}