CORDA-3596 Record flow metadata (#6067)

* CORDA-3596 Record flow metadata

Record flow metadata during the zero'th checkpoint that occurs before
calling the flow's `call` function.

This required adding an RPC call's arguments to the `InvocationContext`
that gets created. These arguments are then accessible within the
statemachine and from the `Checkpoint` class. The arguments are then
extracted when recording a flow's metadata inside of
`DBCheckpointStorage`.

Updated the size of the started by column to 128 since it was not long
enough to hold the fully qualified class of a service that started a
flow.

* CORDA-3596 Remove arguments from in-memory checkpoint

When executing a flows first real suspend (from flow code) the arguments
 contained in the `InvocationContext` are removed. This saves holding
 these arguments for the whole lifecyle of a flow.

* CORDA-3596 Increase `cordapp_name` column to 128

* CORDA-3596 Join metadata by `flow_id`

Due to changes in where metadata is recorded, there is no need for
having `invocation_id` as the metadata table's primary key. The
`flow_id` is now the primary key of the table and is used to join to the
 main checkpoints table.

The `invocation_id` has been removed from the checkpoints table since it
 is not needed for the join anymore.

* CORDA-3596 Remove `received_time` from metadata table

* CORDA-3596 Remove unused `StartReason` enum

* CORDA-3596 Simple `DBCheckpointStorageTests` for metadata

* CORDA-3596 Truncate really long flow names
This commit is contained in:
Dan Newton 2020-03-17 17:28:32 +00:00 committed by GitHub
parent 4c6d87d9fb
commit ca23612fe1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 733 additions and 120 deletions

View File

@ -18,21 +18,53 @@ import java.security.Principal
* @property impersonatedActor Optional impersonated actor, used for logging but not for authorisation. * @property impersonatedActor Optional impersonated actor, used for logging but not for authorisation.
*/ */
@CordaSerializable @CordaSerializable
data class InvocationContext(val origin: InvocationOrigin, val trace: Trace, val actor: Actor?, val externalTrace: Trace? = null, val impersonatedActor: Actor? = null) { data class InvocationContext(
val origin: InvocationOrigin,
val trace: Trace,
val actor: Actor?,
val externalTrace: Trace? = null,
val impersonatedActor: Actor? = null,
val arguments: List<Any?> = emptyList()
) {
constructor(
origin: InvocationOrigin,
trace: Trace,
actor: Actor?,
externalTrace: Trace? = null,
impersonatedActor: Actor? = null
) : this(origin, trace, actor, externalTrace, impersonatedActor, emptyList())
companion object { companion object {
/** /**
* Creates an [InvocationContext] with a [Trace] that defaults to a [java.util.UUID] as value and [java.time.Instant.now] timestamp. * Creates an [InvocationContext] with a [Trace] that defaults to a [java.util.UUID] as value and [java.time.Instant.now] timestamp.
*/ */
@DeleteForDJVM @DeleteForDJVM
@JvmStatic @JvmStatic
fun newInstance(origin: InvocationOrigin, trace: Trace = Trace.newInstance(), actor: Actor? = null, externalTrace: Trace? = null, impersonatedActor: Actor? = null) = InvocationContext(origin, trace, actor, externalTrace, impersonatedActor) @JvmOverloads
@Suppress("LongParameterList")
fun newInstance(
origin: InvocationOrigin,
trace: Trace = Trace.newInstance(),
actor: Actor? = null,
externalTrace: Trace? = null,
impersonatedActor: Actor? = null,
arguments: List<Any?> = emptyList()
) = InvocationContext(origin, trace, actor, externalTrace, impersonatedActor, arguments)
/** /**
* Creates an [InvocationContext] with [InvocationOrigin.RPC] origin. * Creates an [InvocationContext] with [InvocationOrigin.RPC] origin.
*/ */
@DeleteForDJVM @DeleteForDJVM
@JvmStatic @JvmStatic
fun rpc(actor: Actor, trace: Trace = Trace.newInstance(), externalTrace: Trace? = null, impersonatedActor: Actor? = null): InvocationContext = newInstance(InvocationOrigin.RPC(actor), trace, actor, externalTrace, impersonatedActor) @JvmOverloads
fun rpc(
actor: Actor,
trace: Trace = Trace.newInstance(),
externalTrace: Trace? = null,
impersonatedActor: Actor? = null,
arguments: List<Any?> = emptyList()
): InvocationContext = newInstance(InvocationOrigin.RPC(actor), trace, actor, externalTrace, impersonatedActor, arguments)
/** /**
* Creates an [InvocationContext] with [InvocationOrigin.Peer] origin. * Creates an [InvocationContext] with [InvocationOrigin.Peer] origin.
@ -67,6 +99,23 @@ data class InvocationContext(val origin: InvocationOrigin, val trace: Trace, val
* Associated security principal. * Associated security principal.
*/ */
fun principal(): Principal = origin.principal() fun principal(): Principal = origin.principal()
fun copy(
origin: InvocationOrigin = this.origin,
trace: Trace = this.trace,
actor: Actor? = this.actor,
externalTrace: Trace? = this.externalTrace,
impersonatedActor: Actor? = this.impersonatedActor
): InvocationContext {
return copy(
origin = origin,
trace = trace,
actor = actor,
externalTrace = externalTrace,
impersonatedActor = impersonatedActor,
arguments = arguments
)
}
} }
/** /**

View File

@ -11,7 +11,6 @@ import net.corda.core.node.services.CordaService
import net.corda.core.node.services.vault.SessionScope import net.corda.core.node.services.vault.SessionScope
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.Checkpoint.FlowStatus import net.corda.node.services.statemachine.Checkpoint.FlowStatus
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.DriverParameters
@ -63,9 +62,10 @@ class CordaPersistenceServiceTests {
(1..count).toList().parallelStream().forEach { (1..count).toList().parallelStream().forEach {
val now = Instant.now() val now = Instant.now()
services.database.transaction { services.database.transaction {
val flowId = it.toString()
session.save( session.save(
DBCheckpointStorage.DBFlowCheckpoint( DBCheckpointStorage.DBFlowCheckpoint(
id = it.toString(), id = flowId,
blob = DBCheckpointStorage.DBFlowCheckpointBlob( blob = DBCheckpointStorage.DBFlowCheckpointBlob(
checkpoint = ByteArray(8192), checkpoint = ByteArray(8192),
flowStack = ByteArray(8192), flowStack = ByteArray(8192),
@ -78,8 +78,8 @@ class CordaPersistenceServiceTests {
compatible = false, compatible = false,
progressStep = "", progressStep = "",
ioRequestType = FlowIORequest.ForceCheckpoint::class.java.simpleName, ioRequestType = FlowIORequest.ForceCheckpoint::class.java.simpleName,
checkpointInstant = Instant.now(), checkpointInstant = now,
flowMetadata = createMetadataRecord(UUID.randomUUID(), now) flowMetadata = createMetadataRecord(flowId, now)
) )
) )
} }
@ -88,18 +88,17 @@ class CordaPersistenceServiceTests {
return count return count
} }
private fun SessionScope.createMetadataRecord(invocationId: UUID, timestamp: Instant): DBCheckpointStorage.DBFlowMetadata { private fun SessionScope.createMetadataRecord(flowId: String, timestamp: Instant): DBCheckpointStorage.DBFlowMetadata {
val metadata = DBCheckpointStorage.DBFlowMetadata( val metadata = DBCheckpointStorage.DBFlowMetadata(
invocationId = invocationId.toString(), invocationId = UUID.randomUUID().toString(),
flowId = null, flowId = flowId,
flowName = "random.flow", flowName = "random.flow",
userSuppliedIdentifier = null, userSuppliedIdentifier = null,
startType = DBCheckpointStorage.StartReason.RPC, startType = DBCheckpointStorage.StartReason.RPC,
launchingCordapp = "this cordapp", launchingCordapp = "this cordapp",
platformVersion = PLATFORM_VERSION, platformVersion = PLATFORM_VERSION,
rpcUsername = "Batman", rpcUsername = "Batman",
invocationInstant = Instant.now(), invocationInstant = timestamp,
receivedInstant = Instant.now(),
startInstant = timestamp, startInstant = timestamp,
finishInstant = null finishInstant = null
) )

View File

@ -1,7 +1,10 @@
package net.corda.node.services.persistence package net.corda.node.services.persistence
import net.corda.core.context.InvocationContext
import net.corda.core.context.InvocationOrigin
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
@ -12,6 +15,7 @@ import net.corda.node.services.statemachine.Checkpoint.FlowStatus
import net.corda.node.services.statemachine.CheckpointState import net.corda.node.services.statemachine.CheckpointState
import net.corda.node.services.statemachine.ErrorState import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.FlowState import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.SubFlowVersion
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession import net.corda.nodeapi.internal.persistence.currentDBSession
import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY
@ -43,6 +47,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
private const val HMAC_SIZE_BYTES = 16 private const val HMAC_SIZE_BYTES = 16
private const val MAX_PROGRESS_STEP_LENGTH = 256 private const val MAX_PROGRESS_STEP_LENGTH = 256
private const val MAX_FLOW_NAME_LENGTH = 128
private val NOT_RUNNABLE_CHECKPOINTS = listOf(FlowStatus.COMPLETED, FlowStatus.FAILED, FlowStatus.KILLED) private val NOT_RUNNABLE_CHECKPOINTS = listOf(FlowStatus.COMPLETED, FlowStatus.FAILED, FlowStatus.KILLED)
@ -71,7 +76,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
} }
enum class StartReason { enum class StartReason {
RPC, FLOW, SERVICE, SCHEDULED, INITIATED RPC, SERVICE, SCHEDULED, INITIATED
} }
@Entity @Entity
@ -94,7 +99,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
var exceptionDetails: DBFlowException?, var exceptionDetails: DBFlowException?,
@OneToOne(fetch = FetchType.LAZY) @OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "invocation_id", referencedColumnName = "invocation_id") @JoinColumn(name = "flow_id", referencedColumnName = "flow_id")
var flowMetadata: DBFlowMetadata, var flowMetadata: DBFlowMetadata,
@Column(name = "status", nullable = false) @Column(name = "status", nullable = false)
@ -180,12 +185,12 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
class DBFlowMetadata( class DBFlowMetadata(
@Id @Id
@Column(name = "flow_id", nullable = false)
var flowId: String,
@Column(name = "invocation_id", nullable = false) @Column(name = "invocation_id", nullable = false)
var invocationId: String, var invocationId: String,
@Column(name = "flow_id", nullable = true)
var flowId: String?,
@Column(name = "flow_name", nullable = false) @Column(name = "flow_name", nullable = false)
var flowName: String, var flowName: String,
@ -210,16 +215,51 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
@Column(name = "invocation_time", nullable = false) @Column(name = "invocation_time", nullable = false)
var invocationInstant: Instant, var invocationInstant: Instant,
@Column(name = "received_time", nullable = false)
var receivedInstant: Instant,
@Column(name = "start_time", nullable = true) @Column(name = "start_time", nullable = true)
var startInstant: Instant?, var startInstant: Instant,
@Column(name = "finish_time", nullable = true) @Column(name = "finish_time", nullable = true)
var finishInstant: Instant? var finishInstant: Instant?
) {
@Suppress("ComplexMethod")
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
) other as DBFlowMetadata
if (flowId != other.flowId) return false
if (invocationId != other.invocationId) return false
if (flowName != other.flowName) return false
if (userSuppliedIdentifier != other.userSuppliedIdentifier) return false
if (startType != other.startType) return false
if (!initialParameters.contentEquals(other.initialParameters)) return false
if (launchingCordapp != other.launchingCordapp) return false
if (platformVersion != other.platformVersion) return false
if (rpcUsername != other.rpcUsername) return false
if (invocationInstant != other.invocationInstant) return false
if (startInstant != other.startInstant) return false
if (finishInstant != other.finishInstant) return false
return true
}
override fun hashCode(): Int {
var result = flowId.hashCode()
result = 31 * result + invocationId.hashCode()
result = 31 * result + flowName.hashCode()
result = 31 * result + (userSuppliedIdentifier?.hashCode() ?: 0)
result = 31 * result + startType.hashCode()
result = 31 * result + initialParameters.contentHashCode()
result = 31 * result + launchingCordapp.hashCode()
result = 31 * result + platformVersion
result = 31 * result + rpcUsername.hashCode()
result = 31 * result + invocationInstant.hashCode()
result = 31 * result + startInstant.hashCode()
result = 31 * result + (finishInstant?.hashCode() ?: 0)
return result
}
}
override fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>) { override fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>) {
currentDBSession().save(createDBCheckpoint(id, checkpoint, serializedFlowState)) currentDBSession().save(createDBCheckpoint(id, checkpoint, serializedFlowState))
@ -275,25 +315,13 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
): DBFlowCheckpoint { ): DBFlowCheckpoint {
val flowId = id.uuid.toString() val flowId = id.uuid.toString()
val now = Instant.now() val now = Instant.now()
val invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value
val serializedCheckpointState = checkpoint.checkpointState.storageSerialize() val serializedCheckpointState = checkpoint.checkpointState.storageSerialize()
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState) checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)
val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now) val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now)
// Need to update the metadata record to join it to the main checkpoint record
// This code needs to be added back in once the metadata record is properly created (remove the code below it) val metadata = createMetadata(flowId, checkpoint)
// val metadata = requireNotNull(currentDBSession().find(
// DBFlowMetadata::class.java,
// invocationId
// )) { "The flow metadata record for flow [$flowId] with invocation id [$invocationId] does not exist"}
val metadata = (currentDBSession().find(
DBFlowMetadata::class.java,
invocationId
)) ?: createTemporaryMetadata(checkpoint)
metadata.flowId = flowId
currentDBSession().update(metadata)
// Most fields are null as they cannot have been set when creating the initial checkpoint // Most fields are null as they cannot have been set when creating the initial checkpoint
return DBFlowCheckpoint( return DBFlowCheckpoint(
id = flowId, id = flowId,
@ -309,20 +337,24 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
) )
} }
// Remove this when saving of metadata is properly handled private fun createMetadata(flowId: String, checkpoint: Checkpoint): DBFlowMetadata {
private fun createTemporaryMetadata(checkpoint: Checkpoint): DBFlowMetadata { val context = checkpoint.checkpointState.invocationContext
val flowInfo = checkpoint.checkpointState.subFlowStack.first()
return DBFlowMetadata( return DBFlowMetadata(
invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value, flowId = flowId,
flowId = null, invocationId = context.trace.invocationId.value,
flowName = "random.flow", // Truncate the flow name to fit into the database column
// Flow names are unlikely to be this long
flowName = flowInfo.flowClass.name.take(MAX_FLOW_NAME_LENGTH),
// will come from the context
userSuppliedIdentifier = null, userSuppliedIdentifier = null,
startType = DBCheckpointStorage.StartReason.RPC, startType = context.getStartedType(),
launchingCordapp = "this cordapp", initialParameters = context.getFlowParameters().storageSerialize().bytes,
launchingCordapp = (flowInfo.subFlowVersion as? SubFlowVersion.CorDappFlow)?.corDappName ?: "Core flow",
platformVersion = PLATFORM_VERSION, platformVersion = PLATFORM_VERSION,
rpcUsername = "Batman", rpcUsername = context.principal().name,
invocationInstant = checkpoint.checkpointState.invocationContext.trace.invocationId.timestamp, invocationInstant = context.trace.invocationId.timestamp,
receivedInstant = Instant.now(), startInstant = Instant.now(),
startInstant = null,
finishInstant = null finishInstant = null
).apply { ).apply {
currentDBSession().save(this) currentDBSession().save(this)
@ -444,6 +476,24 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
} }
} }
private fun InvocationContext.getStartedType(): StartReason {
return when (origin) {
is InvocationOrigin.RPC, is InvocationOrigin.Shell -> StartReason.RPC
is InvocationOrigin.Peer -> StartReason.INITIATED
is InvocationOrigin.Service -> StartReason.SERVICE
is InvocationOrigin.Scheduled -> StartReason.SCHEDULED
}
}
private fun InvocationContext.getFlowParameters(): List<Any?> {
// Only RPC flows have parameters which are found in index 1
return if(arguments.isNotEmpty()) {
uncheckedCast<Any?, Array<Any?>>(arguments[1]).toList()
} else {
emptyList()
}
}
private fun DBFlowCheckpoint.toSerializedCheckpoint(): Checkpoint.Serialized { private fun DBFlowCheckpoint.toSerializedCheckpoint(): Checkpoint.Serialized {
return Checkpoint.Serialized( return Checkpoint.Serialized(
serializedCheckpointState = SerializedBytes(blob.checkpoint), serializedCheckpointState = SerializedBytes(blob.checkpoint),

View File

@ -388,10 +388,11 @@ class RPCServer(
val arguments = Try.on { val arguments = Try.on {
clientToServer.serialisedArguments.deserialize<List<Any?>>(context = RPC_SERVER_CONTEXT) clientToServer.serialisedArguments.deserialize<List<Any?>>(context = RPC_SERVER_CONTEXT)
} }
val context = artemisMessage.context(clientToServer.sessionId) val context: RpcAuthContext
context.invocation.pushToLoggingContext()
when (arguments) { when (arguments) {
is Try.Success -> { is Try.Success -> {
context = artemisMessage.context(clientToServer.sessionId, arguments.value)
context.invocation.pushToLoggingContext()
log.debug { "Arguments: ${arguments.value.toTypedArray().contentDeepToString()}" } log.debug { "Arguments: ${arguments.value.toTypedArray().contentDeepToString()}" }
rpcExecutor!!.submit { rpcExecutor!!.submit {
val result = invokeRpc(context, clientToServer.methodName, arguments.value) val result = invokeRpc(context, clientToServer.methodName, arguments.value)
@ -399,6 +400,8 @@ class RPCServer(
} }
} }
is Try.Failure -> { is Try.Failure -> {
context = artemisMessage.context(clientToServer.sessionId, emptyList())
context.invocation.pushToLoggingContext()
// We failed to deserialise the arguments, route back the error // We failed to deserialise the arguments, route back the error
log.warn("Inbound RPC failed", arguments.exception) log.warn("Inbound RPC failed", arguments.exception)
sendReply(clientToServer.replyId, clientToServer.clientAddress, arguments) sendReply(clientToServer.replyId, clientToServer.clientAddress, arguments)
@ -476,12 +479,12 @@ class RPCServer(
observableMap.cleanUp() observableMap.cleanUp()
} }
private fun ClientMessage.context(sessionId: Trace.SessionId): RpcAuthContext { private fun ClientMessage.context(sessionId: Trace.SessionId, arguments: List<Any?>): RpcAuthContext {
val trace = Trace.newInstance(sessionId = sessionId) val trace = Trace.newInstance(sessionId = sessionId)
val externalTrace = externalTrace() val externalTrace = externalTrace()
val rpcActor = actorFrom(this) val rpcActor = actorFrom(this)
val impersonatedActor = impersonatedActor() val impersonatedActor = impersonatedActor()
return RpcAuthContext(InvocationContext.rpc(rpcActor.first, trace, externalTrace, impersonatedActor), rpcActor.second) return RpcAuthContext(InvocationContext.rpc(rpcActor.first, trace, externalTrace, impersonatedActor, arguments), rpcActor.second)
} }
private fun actorFrom(message: ClientMessage): Pair<Actor, AuthorizingSubject> { private fun actorFrom(message: ClientMessage): Pair<Actor, AuthorizingSubject> {

View File

@ -154,14 +154,22 @@ class TopLevelTransition(
private fun suspendTransition(event: Event.Suspend): TransitionResult { private fun suspendTransition(event: Event.Suspend): TransitionResult {
return builder { return builder {
val newCheckpoint = currentState.checkpoint.copy( val newCheckpoint = currentState.checkpoint.run {
val newCheckpointState = if (checkpointState.invocationContext.arguments.isNotEmpty()) {
checkpointState.copy(
invocationContext = checkpointState.invocationContext.copy(arguments = emptyList()),
numberOfSuspends = checkpointState.numberOfSuspends + 1
)
} else {
checkpointState.copy(numberOfSuspends = checkpointState.numberOfSuspends + 1)
}
copy(
flowState = FlowState.Started(event.ioRequest, event.fiber), flowState = FlowState.Started(event.ioRequest, event.fiber),
checkpointState = currentState.checkpoint.checkpointState.copy( checkpointState = newCheckpointState,
numberOfSuspends = currentState.checkpoint.checkpointState.numberOfSuspends + 1
),
flowIoRequest = event.ioRequest::class.java.simpleName, flowIoRequest = event.ioRequest::class.java.simpleName,
progressStep = event.progressStep?.label progressStep = event.progressStep?.label
) )
}
if (event.maySkipCheckpoint) { if (event.maySkipCheckpoint) {
actions.addAll(arrayOf( actions.addAll(arrayOf(
Action.CommitTransaction, Action.CommitTransaction,

View File

@ -9,7 +9,7 @@
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_blobs_pk" tableName="node_checkpoint_blobs"/> <addPrimaryKey columnNames="id" constraintName="node_checkpoint_blobs_pk" tableName="node_checkpoint_blobs"/>
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_exceptions_pk" tableName="node_flow_exceptions"/> <addPrimaryKey columnNames="id" constraintName="node_checkpoint_exceptions_pk" tableName="node_flow_exceptions"/>
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_results_pk" tableName="node_flow_results"/> <addPrimaryKey columnNames="id" constraintName="node_checkpoint_results_pk" tableName="node_flow_results"/>
<addPrimaryKey columnNames="invocation_id" constraintName="node_flow_metadata_pk" tableName="node_flow_metadata"/> <addPrimaryKey columnNames="flow_id" constraintName="node_flow_metadata_pk" tableName="node_flow_metadata"/>
</changeSet> </changeSet>
@ -26,9 +26,9 @@
constraintName="node_checkpoint_to_result_fk" constraintName="node_checkpoint_to_result_fk"
referencedColumnNames="id" referencedTableName="node_flow_results"/> referencedColumnNames="id" referencedTableName="node_flow_results"/>
<addForeignKeyConstraint baseColumnNames="invocation_id" baseTableName="node_checkpoints" <addForeignKeyConstraint baseColumnNames="flow_id" baseTableName="node_checkpoints"
constraintName="node_metadata_to_checkpoints_fk" constraintName="node_metadata_to_checkpoints_fk"
referencedColumnNames="invocation_id" referencedTableName="node_flow_metadata"/> referencedColumnNames="flow_id" referencedTableName="node_flow_metadata"/>
</changeSet> </changeSet>

View File

@ -19,9 +19,6 @@
<column name="error_id" type="BIGINT"> <column name="error_id" type="BIGINT">
<constraints nullable="true"/> <constraints nullable="true"/>
</column> </column>
<column name="invocation_id" type="NVARCHAR(128)">
<constraints nullable="false"/>
</column>
<column name="status" type="TINYINT"> <column name="status" type="TINYINT">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
@ -98,11 +95,11 @@
<changeSet author="R3.Corda" id="add_new_flow_metadata_table-pg" dbms="postgresql"> <changeSet author="R3.Corda" id="add_new_flow_metadata_table-pg" dbms="postgresql">
<createTable tableName="node_flow_metadata"> <createTable tableName="node_flow_metadata">
<column name="invocation_id" type="NVARCHAR(128)"> <column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="flow_id" type="NVARCHAR(64)"> <column name="invocation_id" type="NVARCHAR(128)">
<constraints nullable="true"/> <constraints nullable="false"/>
</column> </column>
<column name="flow_name" type="NVARCHAR(128)"> <column name="flow_name" type="NVARCHAR(128)">
<constraints nullable="false"/> <constraints nullable="false"/>
@ -116,23 +113,20 @@
<column name="flow_parameters" type="varbinary(33554432)"> <column name="flow_parameters" type="varbinary(33554432)">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="cordapp_name" type="NVARCHAR(64)"> <column name="cordapp_name" type="NVARCHAR(128)">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="platform_version" type="TINYINT"> <column name="platform_version" type="TINYINT">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="rpc_user" type="NVARCHAR(64)"> <column name="rpc_user" type="NVARCHAR(128)">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="invocation_time" type="java.sql.Types.TIMESTAMP"> <column name="invocation_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="received_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="start_time" type="java.sql.Types.TIMESTAMP"> <column name="start_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/> <constraints nullable="false"/>
</column> </column>
<column name="finish_time" type="java.sql.Types.TIMESTAMP"> <column name="finish_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/> <constraints nullable="true"/>

View File

@ -19,9 +19,6 @@
<column name="error_id" type="BIGINT"> <column name="error_id" type="BIGINT">
<constraints nullable="true"/> <constraints nullable="true"/>
</column> </column>
<column name="invocation_id" type="NVARCHAR(128)">
<constraints nullable="false"/>
</column>
<column name="status" type="TINYINT"> <column name="status" type="TINYINT">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
@ -98,11 +95,11 @@
<changeSet author="R3.Corda" id="add_new_flow_metadata_table" dbms="!postgresql"> <changeSet author="R3.Corda" id="add_new_flow_metadata_table" dbms="!postgresql">
<createTable tableName="node_flow_metadata"> <createTable tableName="node_flow_metadata">
<column name="invocation_id" type="NVARCHAR(128)"> <column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="flow_id" type="NVARCHAR(64)"> <column name="invocation_id" type="NVARCHAR(128)">
<constraints nullable="true"/> <constraints nullable="false"/>
</column> </column>
<column name="flow_name" type="NVARCHAR(128)"> <column name="flow_name" type="NVARCHAR(128)">
<constraints nullable="false"/> <constraints nullable="false"/>
@ -116,23 +113,20 @@
<column name="flow_parameters" type="blob"> <column name="flow_parameters" type="blob">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="cordapp_name" type="NVARCHAR(64)"> <column name="cordapp_name" type="NVARCHAR(128)">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="platform_version" type="TINYINT"> <column name="platform_version" type="TINYINT">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="rpc_user" type="NVARCHAR(64)"> <column name="rpc_user" type="NVARCHAR(128)">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="invocation_time" type="java.sql.Types.TIMESTAMP"> <column name="invocation_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="received_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="start_time" type="java.sql.Types.TIMESTAMP"> <column name="start_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/> <constraints nullable="false"/>
</column> </column>
<column name="finish_time" type="java.sql.Types.TIMESTAMP"> <column name="finish_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/> <constraints nullable="true"/>

View File

@ -1,10 +1,10 @@
package net.corda.node.services.persistence package net.corda.node.services.persistence
import net.corda.core.context.InvocationContext import net.corda.core.context.InvocationContext
import net.corda.core.context.InvocationOrigin
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointSerialize import net.corda.core.serialization.internal.checkpointSerialize
@ -21,7 +21,6 @@ import net.corda.node.services.statemachine.SubFlowVersion
import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity import net.corda.testing.core.TestIdentity
@ -38,7 +37,6 @@ import org.junit.Before
import org.junit.Ignore import org.junit.Ignore
import org.junit.Rule import org.junit.Rule
import org.junit.Test import org.junit.Test
import java.time.Instant
import kotlin.streams.toList import kotlin.streams.toList
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertTrue import kotlin.test.assertTrue
@ -79,7 +77,6 @@ class DBCheckpointStorageTests {
val (id, checkpoint) = newCheckpoint() val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState() val serializedFlowState = checkpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
} }
database.transaction { database.transaction {
@ -107,7 +104,6 @@ class DBCheckpointStorageTests {
val (id, checkpoint) = newCheckpoint() val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState() val serializedFlowState = checkpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
} }
val logic: FlowLogic<*> = object : FlowLogic<String>() { val logic: FlowLogic<*> = object : FlowLogic<String>() {
@ -141,7 +137,6 @@ class DBCheckpointStorageTests {
val (id, checkpoint) = newCheckpoint() val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState() val serializedFlowState = checkpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
} }
database.transaction { database.transaction {
@ -163,8 +158,6 @@ class DBCheckpointStorageTests {
val (id2, checkpoint2) = newCheckpoint() val (id2, checkpoint2) = newCheckpoint()
val serializedFlowState2 = checkpoint.serializeFlowState() val serializedFlowState2 = checkpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(checkpoint)
createMetadataRecord(checkpoint2)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
checkpointStorage.addCheckpoint(id2, checkpoint2, serializedFlowState2) checkpointStorage.addCheckpoint(id2, checkpoint2, serializedFlowState2)
checkpointStorage.removeCheckpoint(id) checkpointStorage.removeCheckpoint(id)
@ -190,13 +183,11 @@ class DBCheckpointStorageTests {
val serializedFirstFlowState = firstCheckpoint.serializeFlowState() val serializedFirstFlowState = firstCheckpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(firstCheckpoint)
checkpointStorage.addCheckpoint(id, firstCheckpoint, serializedFirstFlowState) checkpointStorage.addCheckpoint(id, firstCheckpoint, serializedFirstFlowState)
} }
val (id2, secondCheckpoint) = newCheckpoint() val (id2, secondCheckpoint) = newCheckpoint()
val serializedSecondFlowState = secondCheckpoint.serializeFlowState() val serializedSecondFlowState = secondCheckpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(secondCheckpoint)
checkpointStorage.addCheckpoint(id2, secondCheckpoint, serializedSecondFlowState) checkpointStorage.addCheckpoint(id2, secondCheckpoint, serializedSecondFlowState)
} }
database.transaction { database.transaction {
@ -222,7 +213,6 @@ class DBCheckpointStorageTests {
val (id, originalCheckpoint) = newCheckpoint() val (id, originalCheckpoint) = newCheckpoint()
val serializedOriginalFlowState = originalCheckpoint.serializeFlowState() val serializedOriginalFlowState = originalCheckpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(originalCheckpoint)
checkpointStorage.addCheckpoint(id, originalCheckpoint, serializedOriginalFlowState) checkpointStorage.addCheckpoint(id, originalCheckpoint, serializedOriginalFlowState)
} }
newCheckpointStorage() newCheckpointStorage()
@ -242,13 +232,52 @@ class DBCheckpointStorageTests {
} }
} }
@Test(timeout = 300_000)
fun `adding a new checkpoint creates a metadata record`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
database.transaction {
session.get(DBCheckpointStorage.DBFlowMetadata::class.java, id.uuid.toString()).also {
assertNotNull(it)
}
}
}
@Test(timeout = 300_000)
fun `updating a checkpoint does not change the metadata record`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val metadata = database.transaction {
session.get(DBCheckpointStorage.DBFlowMetadata::class.java, id.uuid.toString()).also {
assertNotNull(it)
}
}
val updatedCheckpoint = checkpoint.copy(
checkpointState = checkpoint.checkpointState.copy(
invocationContext = InvocationContext.newInstance(InvocationOrigin.Peer(ALICE_NAME))
)
)
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint, serializedFlowState)
}
val potentiallyUpdatedMetadata = database.transaction {
session.get(DBCheckpointStorage.DBFlowMetadata::class.java, id.uuid.toString())
}
assertEquals(metadata, potentiallyUpdatedMetadata)
}
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `verify checkpoints compatible`() { fun `verify checkpoints compatible`() {
val mockServices = MockServices(emptyList(), ALICE.name) val mockServices = MockServices(emptyList(), ALICE.name)
database.transaction { database.transaction {
val (id, checkpoint) = newCheckpoint(1) val (id, checkpoint) = newCheckpoint(1)
val serializedFlowState = checkpoint.serializeFlowState() val serializedFlowState = checkpoint.serializeFlowState()
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
} }
@ -259,7 +288,6 @@ class DBCheckpointStorageTests {
database.transaction { database.transaction {
val (id1, checkpoint1) = newCheckpoint(2) val (id1, checkpoint1) = newCheckpoint(2)
val serializedFlowState1 = checkpoint1.serializeFlowState() val serializedFlowState1 = checkpoint1.serializeFlowState()
createMetadataRecord(checkpoint1)
checkpointStorage.addCheckpoint(id1, checkpoint1, serializedFlowState1) checkpointStorage.addCheckpoint(id1, checkpoint1, serializedFlowState1)
} }
@ -278,7 +306,6 @@ class DBCheckpointStorageTests {
val serializedFlowState = val serializedFlowState =
checkpoint.serializeFlowState() checkpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
} }
val updatedCheckpoint = checkpoint.copy(result = result) val updatedCheckpoint = checkpoint.copy(result = result)
@ -307,7 +334,6 @@ class DBCheckpointStorageTests {
val serializedFlowState = val serializedFlowState =
checkpoint.serializeFlowState() checkpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
} }
val updatedCheckpoint = checkpoint.copy(result = result) val updatedCheckpoint = checkpoint.copy(result = result)
@ -339,7 +365,6 @@ class DBCheckpointStorageTests {
val serializedFlowState = val serializedFlowState =
checkpoint.serializeFlowState() checkpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
} }
val updatedCheckpoint = checkpoint.copy(result = result) val updatedCheckpoint = checkpoint.copy(result = result)
@ -367,7 +392,6 @@ class DBCheckpointStorageTests {
val (id, checkpoint) = newCheckpoint() val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState() val serializedFlowState = checkpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
} }
val updatedCheckpoint = checkpoint.addError(exception) val updatedCheckpoint = checkpoint.addError(exception)
@ -393,7 +417,6 @@ class DBCheckpointStorageTests {
val (id, checkpoint) = newCheckpoint() val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState() val serializedFlowState = checkpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
} }
val updatedCheckpoint1 = checkpoint.addError(illegalStateException) val updatedCheckpoint1 = checkpoint.addError(illegalStateException)
@ -421,7 +444,6 @@ class DBCheckpointStorageTests {
val (id, checkpoint) = newCheckpoint() val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState() val serializedFlowState = checkpoint.serializeFlowState()
database.transaction { database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
} }
val updatedCheckpoint = checkpoint.addError(exception) val updatedCheckpoint = checkpoint.addError(exception)
@ -454,7 +476,7 @@ class DBCheckpointStorageTests {
database.transaction { database.transaction {
val newCheckpoint = checkpoint.copy(flowIoRequest = FlowIORequest.Sleep::class.java.simpleName) val newCheckpoint = checkpoint.copy(flowIoRequest = FlowIORequest.Sleep::class.java.simpleName)
val serializedFlowState = newCheckpoint.flowState.checkpointSerialize( val serializedFlowState = newCheckpoint.flowState.checkpointSerialize(
context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT
) )
checkpointStorage.updateCheckpoint(id, newCheckpoint, serializedFlowState) checkpointStorage.updateCheckpoint(id, newCheckpoint, serializedFlowState)
} }
@ -574,22 +596,4 @@ class DBCheckpointStorageTests {
) )
) )
} }
private fun DatabaseTransaction.createMetadataRecord(checkpoint: Checkpoint) {
val metadata = DBCheckpointStorage.DBFlowMetadata(
invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value,
flowId = null,
flowName = "random.flow",
userSuppliedIdentifier = null,
startType = DBCheckpointStorage.StartReason.RPC,
launchingCordapp = "this cordapp",
platformVersion = PLATFORM_VERSION,
rpcUsername = "Batman",
invocationInstant = checkpoint.checkpointState.invocationContext.trace.invocationId.timestamp,
receivedInstant = Instant.now(),
startInstant = null,
finishInstant = null
)
session.save(metadata)
}
} }

View File

@ -0,0 +1,512 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.context.InvocationContext
import net.corda.core.contracts.BelongsToContract
import net.corda.core.contracts.LinearState
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledActivity
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowExternalAsyncOperation
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.SchedulableFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StartableByService
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.Permissions
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
import org.junit.Test
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.function.Supplier
import kotlin.reflect.jvm.jvmName
import kotlin.test.assertEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue
class FlowMetadataRecordingTest {
private val user = User("mark", "dadada", setOf(Permissions.all()))
private val string = "I must be delivered for 4.5"
private val someObject = SomeObject("Store me in the database please", 1234)
@Before
fun before() {
MyFlow.hookAfterInitialCheckpoint = null
MyFlow.hookAfterSuspend = null
MyResponder.hookAfterInitialCheckpoint = null
MyFlowWithoutParameters.hookAfterInitialCheckpoint = null
}
@Test(timeout = 300_000)
fun `rpc started flows have metadata recorded`() {
driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
var flowId: StateMachineRunId? = null
var context: InvocationContext? = null
var metadata: DBCheckpointStorage.DBFlowMetadata? = null
MyFlow.hookAfterInitialCheckpoint =
{ flowIdFromHook: StateMachineRunId, contextFromHook: InvocationContext, metadataFromHook: DBCheckpointStorage.DBFlowMetadata ->
flowId = flowIdFromHook
context = contextFromHook
metadata = metadataFromHook
}
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(
::MyFlow,
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
metadata!!.let {
assertEquals(context!!.trace.invocationId.value, it.invocationId)
assertEquals(flowId!!.uuid.toString(), it.flowId)
assertEquals(MyFlow::class.java.name, it.flowName)
// Should be changed when [userSuppliedIdentifier] gets filled in future changes
assertNull(it.userSuppliedIdentifier)
assertEquals(DBCheckpointStorage.StartReason.RPC, it.startType)
assertEquals(
listOf(nodeBHandle.nodeInfo.singleIdentity(), string, someObject),
it.initialParameters.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
)
assertThat(it.launchingCordapp).contains("custom-cordapp")
assertEquals(PLATFORM_VERSION, it.platformVersion)
assertEquals(user.username, it.rpcUsername)
assertEquals(context!!.trace.invocationId.timestamp, it.invocationInstant)
assertTrue(it.startInstant >= it.invocationInstant)
assertNull(it.finishInstant)
}
}
}
@Test(timeout = 300_000)
fun `rpc started flows have metadata recorded - no parameters`() {
driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
var flowId: StateMachineRunId? = null
var context: InvocationContext? = null
var metadata: DBCheckpointStorage.DBFlowMetadata? = null
MyFlowWithoutParameters.hookAfterInitialCheckpoint =
{ flowIdFromHook: StateMachineRunId, contextFromHook: InvocationContext, metadataFromHook: DBCheckpointStorage.DBFlowMetadata ->
flowId = flowIdFromHook
context = contextFromHook
metadata = metadataFromHook
}
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(::MyFlowWithoutParameters).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
metadata!!.let {
assertEquals(context!!.trace.invocationId.value, it.invocationId)
assertEquals(flowId!!.uuid.toString(), it.flowId)
assertEquals(MyFlowWithoutParameters::class.java.name, it.flowName)
// Should be changed when [userSuppliedIdentifier] gets filled in future changes
assertNull(it.userSuppliedIdentifier)
assertEquals(DBCheckpointStorage.StartReason.RPC, it.startType)
assertEquals(
emptyList<Any?>(),
it.initialParameters.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
)
assertThat(it.launchingCordapp).contains("custom-cordapp")
assertEquals(PLATFORM_VERSION, it.platformVersion)
assertEquals(user.username, it.rpcUsername)
assertEquals(context!!.trace.invocationId.timestamp, it.invocationInstant)
assertTrue(it.startInstant >= it.invocationInstant)
assertNull(it.finishInstant)
}
}
}
@Test(timeout = 300_000)
fun `rpc started flows have their arguments removed from in-memory checkpoint after zero'th checkpoint`() {
driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
var context: InvocationContext? = null
var metadata: DBCheckpointStorage.DBFlowMetadata? = null
MyFlow.hookAfterInitialCheckpoint =
{ _, contextFromHook: InvocationContext, metadataFromHook: DBCheckpointStorage.DBFlowMetadata ->
context = contextFromHook
metadata = metadataFromHook
}
var context2: InvocationContext? = null
var metadata2: DBCheckpointStorage.DBFlowMetadata? = null
MyFlow.hookAfterSuspend =
{ contextFromHook: InvocationContext, metadataFromHook: DBCheckpointStorage.DBFlowMetadata ->
context2 = contextFromHook
metadata2 = metadataFromHook
}
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(
::MyFlow,
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
assertEquals(
listOf(nodeBHandle.nodeInfo.singleIdentity(), string, someObject),
uncheckedCast<Any?, Array<Any?>>(context!!.arguments[1]).toList()
)
assertEquals(
listOf(nodeBHandle.nodeInfo.singleIdentity(), string, someObject),
metadata!!.initialParameters.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
)
assertEquals(
emptyList(),
context2!!.arguments
)
assertEquals(
listOf(nodeBHandle.nodeInfo.singleIdentity(), string, someObject),
metadata2!!.initialParameters.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
)
}
}
@Test(timeout = 300_000)
fun `initiated flows have metadata recorded`() {
driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
var flowId: StateMachineRunId? = null
var context: InvocationContext? = null
var metadata: DBCheckpointStorage.DBFlowMetadata? = null
MyResponder.hookAfterInitialCheckpoint =
{ flowIdFromHook: StateMachineRunId, contextFromHook: InvocationContext, metadataFromHook: DBCheckpointStorage.DBFlowMetadata ->
flowId = flowIdFromHook
context = contextFromHook
metadata = metadataFromHook
}
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(
::MyFlow,
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
metadata!!.let {
assertEquals(context!!.trace.invocationId.value, it.invocationId)
assertEquals(flowId!!.uuid.toString(), it.flowId)
assertEquals(MyResponder::class.java.name, it.flowName)
assertNull(it.userSuppliedIdentifier)
assertEquals(DBCheckpointStorage.StartReason.INITIATED, it.startType)
assertEquals(
emptyList<Any?>(),
it.initialParameters.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
)
assertThat(it.launchingCordapp).contains("custom-cordapp")
assertEquals(6, it.platformVersion)
assertEquals(nodeAHandle.nodeInfo.singleIdentity().name.toString(), it.rpcUsername)
assertEquals(context!!.trace.invocationId.timestamp, it.invocationInstant)
assertTrue(it.startInstant >= it.invocationInstant)
assertNull(it.finishInstant)
}
}
}
@Test(timeout = 300_000)
fun `service started flows have metadata recorded`() {
driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
var flowId: StateMachineRunId? = null
var context: InvocationContext? = null
var metadata: DBCheckpointStorage.DBFlowMetadata? = null
MyFlow.hookAfterInitialCheckpoint =
{ flowIdFromHook: StateMachineRunId, contextFromHook: InvocationContext, metadataFromHook: DBCheckpointStorage.DBFlowMetadata ->
flowId = flowIdFromHook
context = contextFromHook
metadata = metadataFromHook
}
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(
::MyServiceStartingFlow,
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
metadata!!.let {
assertEquals(context!!.trace.invocationId.value, it.invocationId)
assertEquals(flowId!!.uuid.toString(), it.flowId)
assertEquals(MyFlow::class.java.name, it.flowName)
assertNull(it.userSuppliedIdentifier)
assertEquals(DBCheckpointStorage.StartReason.SERVICE, it.startType)
assertEquals(
emptyList<Any?>(),
it.initialParameters.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
)
assertThat(it.launchingCordapp).contains("custom-cordapp")
assertEquals(PLATFORM_VERSION, it.platformVersion)
assertEquals(MyService::class.java.name, it.rpcUsername)
assertEquals(context!!.trace.invocationId.timestamp, it.invocationInstant)
assertTrue(it.startInstant >= it.invocationInstant)
assertNull(it.finishInstant)
}
}
}
@Test(timeout = 300_000)
fun `scheduled flows have metadata recorded`() {
driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val lock = Semaphore(1)
var flowId: StateMachineRunId? = null
var context: InvocationContext? = null
var metadata: DBCheckpointStorage.DBFlowMetadata? = null
MyFlow.hookAfterInitialCheckpoint =
{ flowIdFromHook: StateMachineRunId, contextFromHook: InvocationContext, metadataFromHook: DBCheckpointStorage.DBFlowMetadata ->
flowId = flowIdFromHook
context = contextFromHook
metadata = metadataFromHook
// Release the lock so the asserts can be processed
lock.release()
}
// Acquire the lock to prevent the asserts from being processed too early
lock.acquire()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(
::MyStartedScheduledFlow,
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
// Block here until released in the hook
lock.acquire()
metadata!!.let {
assertEquals(context!!.trace.invocationId.value, it.invocationId)
assertEquals(flowId!!.uuid.toString(), it.flowId)
assertEquals(MyFlow::class.java.name, it.flowName)
assertNull(it.userSuppliedIdentifier)
assertEquals(DBCheckpointStorage.StartReason.SCHEDULED, it.startType)
assertEquals(
emptyList<Any?>(),
it.initialParameters.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
)
assertThat(it.launchingCordapp).contains("custom-cordapp")
assertEquals(PLATFORM_VERSION, it.platformVersion)
assertEquals("Scheduler", it.rpcUsername)
assertEquals(context!!.trace.invocationId.timestamp, it.invocationInstant)
assertTrue(it.startInstant >= it.invocationInstant)
assertNull(it.finishInstant)
}
}
}
@InitiatingFlow
@StartableByRPC
@StartableByService
@SchedulableFlow
@Suppress("UNUSED_PARAMETER")
class MyFlow(private val party: Party, string: String, someObject: SomeObject) :
FlowLogic<Unit>() {
companion object {
var hookAfterInitialCheckpoint: ((
flowId: StateMachineRunId,
context: InvocationContext,
metadata: DBCheckpointStorage.DBFlowMetadata
) -> Unit)? = null
var hookAfterSuspend: ((
context: InvocationContext,
metadata: DBCheckpointStorage.DBFlowMetadata
) -> Unit)? = null
}
@Suspendable
override fun call() {
hookAfterInitialCheckpoint?.let {
it(
stateMachine.id,
stateMachine.context,
serviceHub.cordaService(MyService::class.java).findMetadata(stateMachine.id)
)
}
initiateFlow(party).sendAndReceive<String>("Hello there")
hookAfterSuspend?.let {
it(
stateMachine.context,
serviceHub.cordaService(MyService::class.java).findMetadata(stateMachine.id)
)
}
}
}
@InitiatedBy(MyFlow::class)
class MyResponder(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
var hookAfterInitialCheckpoint: ((
flowId: StateMachineRunId,
context: InvocationContext,
metadata: DBCheckpointStorage.DBFlowMetadata
) -> Unit)? = null
}
@Suspendable
override fun call() {
session.receive<String>()
hookAfterInitialCheckpoint?.let {
it(
stateMachine.id,
stateMachine.context,
serviceHub.cordaService(MyService::class.java).findMetadata(stateMachine.id)
)
}
session.send("Hello there")
}
}
@StartableByRPC
class MyFlowWithoutParameters : FlowLogic<Unit>() {
companion object {
var hookAfterInitialCheckpoint: ((
flowId: StateMachineRunId,
context: InvocationContext,
metadata: DBCheckpointStorage.DBFlowMetadata
) -> Unit)? = null
}
@Suspendable
override fun call() {
hookAfterInitialCheckpoint?.let {
it(
stateMachine.id,
stateMachine.context,
serviceHub.cordaService(MyService::class.java).findMetadata(stateMachine.id)
)
}
}
}
@StartableByRPC
class MyServiceStartingFlow(private val party: Party, private val string: String, private val someObject: SomeObject) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
await(object : FlowExternalAsyncOperation<Unit> {
override fun execute(deduplicationId: String): CompletableFuture<Unit> {
return serviceHub.cordaService(MyService::class.java).startFlow(party, string, someObject)
}
})
}
}
@StartableByRPC
class MyStartedScheduledFlow(private val party: Party, private val string: String, private val someObject: SomeObject) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addOutputState(ScheduledState(party, string, someObject, listOf(ourIdentity)))
addCommand(DummyContract.Commands.Create(), ourIdentity.owningKey)
}
val stx = serviceHub.signInitialTransaction(tx)
serviceHub.recordTransactions(stx)
}
}
@CordaService
class MyService(private val services: AppServiceHub) : SingletonSerializeAsToken() {
private val executorService = Executors.newFixedThreadPool(1)
fun findMetadata(flowId: StateMachineRunId): DBCheckpointStorage.DBFlowMetadata {
return services.database.transaction {
session.find(DBCheckpointStorage.DBFlowMetadata::class.java, flowId.uuid.toString())
}
}
fun startFlow(party: Party, string: String, someObject: SomeObject): CompletableFuture<Unit> {
return CompletableFuture.supplyAsync(
Supplier<Unit> { services.startFlow(MyFlow(party, string, someObject)).returnValue.getOrThrow() },
executorService
)
}
}
@CordaSerializable
data class SomeObject(private val string: String, private val number: Int)
@BelongsToContract(DummyContract::class)
data class ScheduledState(
val party: Party,
val string: String,
val someObject: SomeObject,
override val participants: List<Party>,
override val linearId: UniqueIdentifier = UniqueIdentifier()
) : SchedulableState, LinearState {
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
val logicRef = flowLogicRefFactory.create(MyFlow::class.jvmName, party, string, someObject)
return ScheduledActivity(logicRef, Instant.now())
}
}
}