mirror of
https://github.com/corda/corda.git
synced 2025-04-18 08:10:47 +00:00
Merge pull request #968 from corda/merges/os-2018-06-11-a590d5e
OS merge on 2018-06-11 up to a590d5e
This commit is contained in:
commit
5ee242de4f
.idea
CONTRIBUTORS.mdcore/src/main/kotlin/net/corda/core
docs/source
finance/src
main/kotlin/net/corda/finance/schemas
test/kotlin/net/corda/finance/schemas
node/src
main/kotlin/net/corda/node
internal
services
events
identity
persistence
statemachine
SingleThreadedStateMachineManager.ktStaffedFlowHospital.ktTransitionExecutor.ktTransitionExecutorImpl.kt
interceptors
transactions
upgrade
vault
test/kotlin/net/corda/node
testing
node-driver/src/main/kotlin/net/corda/testing/node/internal
test-utils/src/main/kotlin/net/corda/testing/internal/vault
tools
2
.idea/compiler.xml
generated
2
.idea/compiler.xml
generated
@ -100,8 +100,6 @@
|
||||
<module name="experimental-behave_main" target="1.8" />
|
||||
<module name="experimental-behave_smokeTest" target="1.8" />
|
||||
<module name="experimental-behave_test" target="1.8" />
|
||||
<module name="experimental-blobinspector_main" target="1.8" />
|
||||
<module name="experimental-blobinspector_test" target="1.8" />
|
||||
<module name="experimental-kryo-hook_main" target="1.8" />
|
||||
<module name="experimental-kryo-hook_test" target="1.8" />
|
||||
<module name="experimental_main" target="1.8" />
|
||||
|
@ -74,7 +74,6 @@ see changes to this list.
|
||||
* gary-rowe
|
||||
* Gavin Thomas (R3)
|
||||
* George Marcel Smetana (Bradesco)
|
||||
* George Smetana (Bradesco)
|
||||
* Giulio Katis (Westpac)
|
||||
* Giuseppe Cardone (Intesa Sanpaolo)
|
||||
* Guy Hochstetler (R3)
|
||||
@ -134,7 +133,7 @@ see changes to this list.
|
||||
* Nigel King (R3)
|
||||
* Nitesh Solanki (Persistent Systems Limited)
|
||||
* Nuam Athaweth (MUFG)
|
||||
* Oscar Zibordi de Paiva (Bradesco)
|
||||
* Oscar Zibordi de Paiva (Scopus Soluções em TI)
|
||||
* OP Financial
|
||||
* Patrick Kuo (R3)
|
||||
* Pekka Kaipio (OP Financial)
|
||||
@ -172,7 +171,7 @@ see changes to this list.
|
||||
* Stanly Johnson (Servntire Global)
|
||||
* Szymon Sztuka (R3)
|
||||
* tb-pq
|
||||
* Thiago Rafael Ferreira (Scorpius IT Solutions)
|
||||
* Thiago Rafael Ferreira (Scopus Soluções em TI)
|
||||
* Thomas O'Donnell (Macquarie)
|
||||
* Thomas Schroeter (R3)
|
||||
* Tim Swanson (R3)
|
||||
@ -185,6 +184,7 @@ see changes to this list.
|
||||
* Tudor Malene (R3)
|
||||
* Tushar Singh Bora
|
||||
* varunkm
|
||||
* Venelin Stoykov (INDUSTRIA)
|
||||
* verymahler
|
||||
* Viktor Kolomeyko (R3)
|
||||
* Vipin Bharathan
|
||||
|
@ -56,13 +56,13 @@ sealed class SecureHash(bytes: ByteArray) : OpaqueBytes(bytes) {
|
||||
* @throws IllegalArgumentException The input string does not contain 64 hexadecimal digits, or it contains incorrectly-encoded characters.
|
||||
*/
|
||||
@JvmStatic
|
||||
fun parse(str: String): SHA256 {
|
||||
return str.toUpperCase().parseAsHex().let {
|
||||
fun parse(str: String?): SHA256 {
|
||||
return str?.toUpperCase()?.parseAsHex()?.let {
|
||||
when (it.size) {
|
||||
32 -> SHA256(it)
|
||||
else -> throw IllegalArgumentException("Provided string is ${it.size} bytes not 32 bytes in hex: $str")
|
||||
}
|
||||
}
|
||||
} ?: throw IllegalArgumentException("Provided string is null")
|
||||
}
|
||||
|
||||
private val sha256MessageDigest = SHA256DigestSupplier()
|
||||
|
@ -158,7 +158,13 @@ class FetchAttachmentsFlow(requests: Set<SecureHash>,
|
||||
|
||||
override fun maybeWriteToDisk(downloaded: List<Attachment>) {
|
||||
for (attachment in downloaded) {
|
||||
serviceHub.attachments.importAttachment(attachment.open(), "$P2P_UPLOADER:${otherSideSession.counterparty.name}", null)
|
||||
with(serviceHub.attachments) {
|
||||
if (!hasAttachment(attachment.id)) {
|
||||
importAttachment(attachment.open(), "$P2P_UPLOADER:${otherSideSession.counterparty.name}", null)
|
||||
} else {
|
||||
logger.info("Attachment ${attachment.id} already exists, skipping.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -61,7 +61,7 @@ object CommonSchemaV1 : MappedSchema(schemaFamily = CommonSchema.javaClass, vers
|
||||
|
||||
/** X500Name of participant parties **/
|
||||
@Transient
|
||||
open var participants: MutableSet<AbstractParty>? = null,
|
||||
open var participants: MutableSet<AbstractParty?>? = null,
|
||||
|
||||
/** [OwnableState] attributes */
|
||||
|
||||
|
@ -492,7 +492,7 @@ whether the change is one of position (i.e. progress), structure (i.e. new subta
|
||||
aspect of rendering (i.e. a step has changed in some way and is requesting a re-render).
|
||||
|
||||
The flow framework is somewhat integrated with this API. Each ``FlowLogic`` may optionally provide a tracker by
|
||||
overriding the ``flowTracker`` property (``getFlowTracker`` method in Java). If the
|
||||
overriding the ``progressTracker`` property (``getProgressTracker`` method in Java). If the
|
||||
``FlowLogic.subFlow`` method is used, then the tracker of the sub-flow will be made a child of the current
|
||||
step in the parent flow automatically, if the parent is using tracking in the first place. The framework will also
|
||||
automatically set the current step to ``DONE`` for you, when the flow is finished.
|
||||
|
@ -43,6 +43,8 @@ in the `Kotlin CorDapp Template <https://github.com/corda/cordapp-template-kotli
|
||||
}
|
||||
// No webport property, so no webserver will be created.
|
||||
h2Port 10004
|
||||
// Starts an internal SSH server providing a management shell on the node.
|
||||
sshdPort 2223
|
||||
// Includes the corda-finance CorDapp on our node.
|
||||
cordapps = ["$corda_release_distribution:corda-finance:$corda_release_version"]
|
||||
// Specify a JVM argument to be used when running the node (in this case, extra heap size).
|
||||
|
@ -167,7 +167,7 @@ Run from the terminal
|
||||
^^^^^^^^^^^^^^^^^^^^^
|
||||
1. From the ``cordapp-example`` folder, deploy the nodes by running ``./gradlew deployNodes``
|
||||
2. Move into the ``cordapp-example`` folder by running ``cd cordapp-example``
|
||||
3. Start the nodes by running ``kotlin-source/build/nodes/runnodes``. Do not click while 8 additional terminal windows start up.
|
||||
3. Start the nodes by running ``kotlin-source/build/nodes/runnodes``. Do not click while 7 additional terminal windows start up.
|
||||
4. Wait until all the terminal windows display either ``Webserver started up in XX.X sec`` or ``Node for "NodeC" started up and registered in XX.XX sec``
|
||||
5. Confirm that the CorDapp is running correctly by visiting the front end at http://localhost:10009/web/example/
|
||||
|
||||
|
@ -39,7 +39,7 @@ object CashSchemaV1 : MappedSchema(
|
||||
class PersistentCashState(
|
||||
/** X500Name of owner party **/
|
||||
@Column(name = "owner_name", nullable = true)
|
||||
var owner: AbstractParty,
|
||||
var owner: AbstractParty?,
|
||||
|
||||
@Column(name = "pennies", nullable = false)
|
||||
var pennies: Long,
|
||||
|
@ -30,7 +30,7 @@ object SampleCashSchemaV2 : MappedSchema(schemaFamily = CashSchema.javaClass, ve
|
||||
/** product type */
|
||||
@Column(name = "ccy_code", length = 3, nullable = false)
|
||||
var currency: String,
|
||||
participants: Set<AbstractParty>,
|
||||
participants: Set<AbstractParty?>,
|
||||
owner: AbstractParty,
|
||||
quantity: Long,
|
||||
issuerParty: AbstractParty,
|
||||
@ -40,6 +40,6 @@ object SampleCashSchemaV2 : MappedSchema(schemaFamily = CashSchema.javaClass, ve
|
||||
@ElementCollection
|
||||
@Column(name = "participants", nullable = true)
|
||||
@CollectionTable(name = "cash_states_v2_participants", joinColumns = [JoinColumn(name = "output_index", referencedColumnName = "output_index"), JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id")])
|
||||
override var participants: MutableSet<AbstractParty>? = null
|
||||
override var participants: MutableSet<AbstractParty?>? = null
|
||||
}
|
||||
}
|
||||
|
@ -36,11 +36,11 @@ object SampleCashSchemaV3 : MappedSchema(schemaFamily = CashSchema.javaClass, ve
|
||||
@CollectionTable(name="cash_state_participants", joinColumns = arrayOf(
|
||||
JoinColumn(name = "output_index", referencedColumnName = "output_index"),
|
||||
JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id")))
|
||||
var participants: MutableSet<AbstractParty>? = null,
|
||||
var participants: MutableSet<AbstractParty?>? = null,
|
||||
|
||||
/** X500Name of owner party **/
|
||||
@Column(name = "owner_name", nullable = true)
|
||||
var owner: AbstractParty,
|
||||
var owner: AbstractParty?,
|
||||
|
||||
@Column(name = "pennies", nullable = false)
|
||||
var pennies: Long,
|
||||
@ -50,7 +50,7 @@ object SampleCashSchemaV3 : MappedSchema(schemaFamily = CashSchema.javaClass, ve
|
||||
|
||||
/** X500Name of issuer party **/
|
||||
@Column(name = "issuer_name", nullable = true)
|
||||
var issuer: AbstractParty,
|
||||
var issuer: AbstractParty?,
|
||||
|
||||
@Column(name = "issuer_ref", length = MAX_ISSUER_REF_SIZE, nullable = false)
|
||||
@Type(type = "corda-wrapper-binary")
|
||||
|
@ -55,6 +55,6 @@ object SampleCommercialPaperSchemaV2 : MappedSchema(schemaFamily = CommercialPap
|
||||
@ElementCollection
|
||||
@Column(name = "participants")
|
||||
@CollectionTable(name = "cp_states_v2_participants", joinColumns = [JoinColumn(name = "output_index", referencedColumnName = "output_index"), JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id")])
|
||||
override var participants: MutableSet<AbstractParty>? = null
|
||||
override var participants: MutableSet<AbstractParty?>? = null
|
||||
}
|
||||
}
|
||||
|
@ -1099,7 +1099,7 @@ fun configureDatabase(hikariProperties: Properties,
|
||||
return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters)
|
||||
} catch (ex: Exception) {
|
||||
when {
|
||||
ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.")
|
||||
ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.", ex)
|
||||
ex.cause is ClassNotFoundException -> throw CouldNotCreateDataSourceException("Could not find the database driver class. Please add it to the 'drivers' folder. See: https://docs.corda.net/corda-configuration-file.html")
|
||||
else -> throw CouldNotCreateDataSourceException("Could not create the DataSource: ${ex.message}", ex)
|
||||
}
|
||||
|
@ -25,8 +25,8 @@ class PersistentScheduledFlowRepository(val database: CordaPersistence) : Schedu
|
||||
}
|
||||
|
||||
private fun fromPersistentEntity(scheduledStateRecord: NodeSchedulerService.PersistentScheduledState): Pair<StateRef, ScheduledStateRef> {
|
||||
val txId = scheduledStateRecord.output.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId")
|
||||
val index = scheduledStateRecord.output.index ?: throw IllegalStateException("DB returned null integer index")
|
||||
val txId = scheduledStateRecord.output.txId
|
||||
val index = scheduledStateRecord.output.index
|
||||
return Pair(StateRef(SecureHash.parse(txId), index), ScheduledStateRef(StateRef(SecureHash.parse(txId), index), scheduledStateRecord.scheduledAt))
|
||||
}
|
||||
|
||||
|
@ -105,8 +105,8 @@ class PersistentIdentityService(override val trustRoot: X509Certificate,
|
||||
@Column(name = "name", length = 128, nullable = false)
|
||||
var name: String = "",
|
||||
|
||||
@Column(name = "pk_hash", length = MAX_HASH_HEX_SIZE, nullable = false)
|
||||
var publicKeyHash: String = ""
|
||||
@Column(name = "pk_hash", length = MAX_HASH_HEX_SIZE, nullable = true)
|
||||
var publicKeyHash: String? = ""
|
||||
) : Serializable
|
||||
|
||||
override val caCertStore: CertStore
|
||||
|
@ -46,8 +46,8 @@ class DBTransactionMappingStorage(private val database: CordaPersistence) : Stat
|
||||
@Column(name = "tx_id", length = 64, nullable = false)
|
||||
var txId: String = "",
|
||||
|
||||
@Column(name = "state_machine_run_id", length = 36, nullable = false)
|
||||
var stateMachineRunId: String = ""
|
||||
@Column(name = "state_machine_run_id", length = 36, nullable = true)
|
||||
var stateMachineRunId: String? = ""
|
||||
) : Serializable
|
||||
|
||||
private companion object {
|
||||
|
@ -23,12 +23,20 @@ import net.corda.core.flows.FlowInfo
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.TimedFlow
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.castIfPossible
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.Try
|
||||
import net.corda.core.utilities.contextLogger
|
||||
@ -40,7 +48,11 @@ import net.corda.node.services.config.shouldCheckCheckpoints
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.messaging.ReceivedMessage
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
|
||||
import net.corda.node.services.statemachine.interceptors.*
|
||||
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
|
||||
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
|
||||
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
|
||||
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
|
||||
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
|
||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
@ -52,7 +64,11 @@ import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.security.SecureRandom
|
||||
import java.util.*
|
||||
import java.util.concurrent.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.ScheduledFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.collections.ArrayList
|
||||
@ -230,6 +246,7 @@ class SingleThreadedStateMachineManager(
|
||||
database.transaction {
|
||||
checkpointStorage.removeCheckpoint(id)
|
||||
}
|
||||
transitionExecutor.forceRemoveFlow(id)
|
||||
}
|
||||
} else {
|
||||
// TODO replace with a clustered delete after we'll support clustered nodes
|
||||
|
@ -2,7 +2,6 @@ package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.TimedFlow
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.utilities.contextLogger
|
||||
@ -110,8 +109,8 @@ class StaffedFlowHospital {
|
||||
/**
|
||||
* The flow has been removed from the state machine.
|
||||
*/
|
||||
fun flowRemoved(flowFiber: FlowFiber) {
|
||||
mutex.locked { patients.remove(flowFiber.id) }
|
||||
fun flowRemoved(flowId: StateMachineRunId) {
|
||||
mutex.locked { patients.remove(flowId) }
|
||||
}
|
||||
|
||||
// TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC
|
||||
@ -204,24 +203,14 @@ class StaffedFlowHospital {
|
||||
object DoctorTimeout : Staff {
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
|
||||
if (newError is FlowTimeoutException) {
|
||||
if (isTimedFlow(flowFiber)) {
|
||||
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
|
||||
return Diagnosis.DISCHARGE
|
||||
} else {
|
||||
log.warn("\"Maximum number of retries reached for timed flow ${flowFiber.javaClass}")
|
||||
}
|
||||
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
|
||||
return Diagnosis.DISCHARGE
|
||||
} else {
|
||||
log.warn("\"Unable to restart flow: ${flowFiber.javaClass}, it is not timed and does not contain any timed sub-flows.")
|
||||
log.warn("\"Maximum number of retries reached for timed flow ${flowFiber.javaClass}")
|
||||
}
|
||||
}
|
||||
return Diagnosis.NOT_MY_SPECIALTY
|
||||
}
|
||||
|
||||
private fun isTimedFlow(flowFiber: FlowFiber): Boolean {
|
||||
return flowFiber.snapshot().checkpoint.subFlowStack.any {
|
||||
TimedFlow::class.java.isAssignableFrom(it.flowClass)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -11,6 +11,7 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||
import net.corda.node.services.statemachine.transitions.TransitionResult
|
||||
|
||||
@ -27,6 +28,13 @@ interface TransitionExecutor {
|
||||
transition: TransitionResult,
|
||||
actionExecutor: ActionExecutor
|
||||
): Pair<FlowContinuation, StateMachineState>
|
||||
|
||||
/**
|
||||
* Called if the normal exit path where the new state is marked as removed via [StateMachineState.isRemoved] is not called.
|
||||
* Currently this only happens via [StateMachineManager.killFlow]. This allows instances of this interface to clean up
|
||||
* any state they are holding for a flow to prevent a memory leak.
|
||||
*/
|
||||
fun forceRemoveFlow(id: StateMachineRunId)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -11,6 +11,7 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||
import net.corda.node.services.statemachine.transitions.TransitionResult
|
||||
@ -30,6 +31,8 @@ class TransitionExecutorImpl(
|
||||
val secureRandom: SecureRandom,
|
||||
val database: CordaPersistence
|
||||
) : TransitionExecutor {
|
||||
override fun forceRemoveFlow(id: StateMachineRunId) {}
|
||||
|
||||
private companion object {
|
||||
val log = contextLogger()
|
||||
}
|
||||
|
@ -13,7 +13,12 @@ package net.corda.node.services.statemachine.interceptors
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.statemachine.*
|
||||
import net.corda.node.services.statemachine.ActionExecutor
|
||||
import net.corda.node.services.statemachine.ErrorState
|
||||
import net.corda.node.services.statemachine.Event
|
||||
import net.corda.node.services.statemachine.FlowFiber
|
||||
import net.corda.node.services.statemachine.StateMachineState
|
||||
import net.corda.node.services.statemachine.TransitionExecutor
|
||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||
import net.corda.node.services.statemachine.transitions.TransitionResult
|
||||
import java.time.Instant
|
||||
@ -58,4 +63,9 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti
|
||||
|
||||
return Pair(continuation, nextState)
|
||||
}
|
||||
|
||||
override fun forceRemoveFlow(id: StateMachineRunId) {
|
||||
records.remove(id)
|
||||
delegate.forceRemoveFlow(id)
|
||||
}
|
||||
}
|
||||
|
@ -11,11 +11,18 @@
|
||||
package net.corda.node.services.statemachine.interceptors
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.statemachine.*
|
||||
import net.corda.node.services.statemachine.ActionExecutor
|
||||
import net.corda.node.services.statemachine.Event
|
||||
import net.corda.node.services.statemachine.FlowFiber
|
||||
import net.corda.node.services.statemachine.FlowState
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.services.statemachine.StateMachineState
|
||||
import net.corda.node.services.statemachine.TransitionExecutor
|
||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||
import net.corda.node.services.statemachine.transitions.TransitionResult
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
@ -28,6 +35,10 @@ class FiberDeserializationCheckingInterceptor(
|
||||
val fiberDeserializationChecker: FiberDeserializationChecker,
|
||||
val delegate: TransitionExecutor
|
||||
) : TransitionExecutor {
|
||||
override fun forceRemoveFlow(id: StateMachineRunId) {
|
||||
delegate.forceRemoveFlow(id)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun executeTransition(
|
||||
fiber: FlowFiber,
|
||||
|
21
node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt
21
node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt
@ -12,7 +12,13 @@ package net.corda.node.services.statemachine.interceptors
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.node.services.statemachine.*
|
||||
import net.corda.node.services.statemachine.ActionExecutor
|
||||
import net.corda.node.services.statemachine.ErrorState
|
||||
import net.corda.node.services.statemachine.Event
|
||||
import net.corda.node.services.statemachine.FlowFiber
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||
import net.corda.node.services.statemachine.StateMachineState
|
||||
import net.corda.node.services.statemachine.TransitionExecutor
|
||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||
import net.corda.node.services.statemachine.transitions.TransitionResult
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
@ -25,6 +31,16 @@ class HospitalisingInterceptor(
|
||||
private val flowHospital: StaffedFlowHospital,
|
||||
private val delegate: TransitionExecutor
|
||||
) : TransitionExecutor {
|
||||
override fun forceRemoveFlow(id: StateMachineRunId) {
|
||||
removeFlow(id)
|
||||
delegate.forceRemoveFlow(id)
|
||||
}
|
||||
|
||||
private fun removeFlow(id: StateMachineRunId) {
|
||||
hospitalisedFlows.remove(id)
|
||||
flowHospital.flowRemoved(id)
|
||||
}
|
||||
|
||||
private val hospitalisedFlows = ConcurrentHashMap<StateMachineRunId, FlowFiber>()
|
||||
|
||||
@Suspendable
|
||||
@ -51,8 +67,7 @@ class HospitalisingInterceptor(
|
||||
}
|
||||
}
|
||||
if (nextState.isRemoved) {
|
||||
hospitalisedFlows.remove(fiber.id)
|
||||
flowHospital.flowRemoved(fiber)
|
||||
removeFlow(fiber.id)
|
||||
}
|
||||
return Pair(continuation, nextState)
|
||||
}
|
||||
|
@ -2,11 +2,21 @@ package net.corda.node.services.statemachine.interceptors
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.node.services.statemachine.*
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.node.services.statemachine.Action
|
||||
import net.corda.node.services.statemachine.ActionExecutor
|
||||
import net.corda.node.services.statemachine.Event
|
||||
import net.corda.node.services.statemachine.FlowFiber
|
||||
import net.corda.node.services.statemachine.StateMachineState
|
||||
import net.corda.node.services.statemachine.TransitionExecutor
|
||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||
import net.corda.node.services.statemachine.transitions.TransitionResult
|
||||
|
||||
class MetricInterceptor(val metrics: MetricRegistry, val delegate: TransitionExecutor) : TransitionExecutor {
|
||||
override fun forceRemoveFlow(id: StateMachineRunId) {
|
||||
delegate.forceRemoveFlow(id)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun executeTransition(fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor): Pair<FlowContinuation, StateMachineState> {
|
||||
val metricActionInterceptor = MetricActionInterceptor(metrics, actionExecutor)
|
||||
|
@ -11,8 +11,13 @@
|
||||
package net.corda.node.services.statemachine.interceptors
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.statemachine.*
|
||||
import net.corda.node.services.statemachine.ActionExecutor
|
||||
import net.corda.node.services.statemachine.Event
|
||||
import net.corda.node.services.statemachine.FlowFiber
|
||||
import net.corda.node.services.statemachine.StateMachineState
|
||||
import net.corda.node.services.statemachine.TransitionExecutor
|
||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||
import net.corda.node.services.statemachine.transitions.TransitionResult
|
||||
import java.time.Instant
|
||||
@ -21,6 +26,10 @@ import java.time.Instant
|
||||
* This interceptor simply prints all state machine transitions. Useful for debugging.
|
||||
*/
|
||||
class PrintingInterceptor(val delegate: TransitionExecutor) : TransitionExecutor {
|
||||
override fun forceRemoveFlow(id: StateMachineRunId) {
|
||||
delegate.forceRemoveFlow(id)
|
||||
}
|
||||
|
||||
companion object {
|
||||
val log = contextLogger()
|
||||
}
|
||||
|
@ -47,8 +47,8 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
|
||||
@EmbeddedId
|
||||
val id: PersistentStateRef,
|
||||
|
||||
@Column(name = "consuming_transaction_id", nullable = false)
|
||||
val consumingTxHash: String
|
||||
@Column(name = "consuming_transaction_id", nullable = true)
|
||||
val consumingTxHash: String?
|
||||
) : Serializable
|
||||
|
||||
@Entity
|
||||
@ -60,11 +60,11 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
|
||||
@Column(nullable = true)
|
||||
val id: Int? = null,
|
||||
|
||||
@Column(name = "consuming_transaction_id", nullable = false)
|
||||
val consumingTxHash: String,
|
||||
@Column(name = "consuming_transaction_id", nullable = true)
|
||||
val consumingTxHash: String?,
|
||||
|
||||
@Column(name = "requesting_party_name", nullable = false)
|
||||
var partyName: String,
|
||||
@Column(name = "requesting_party_name", nullable = true)
|
||||
var partyName: String?,
|
||||
|
||||
@Lob
|
||||
@Column(name = "request_signature", nullable = false)
|
||||
|
@ -100,8 +100,8 @@ class RaftUniquenessProvider(
|
||||
class CommittedState(
|
||||
@EmbeddedId
|
||||
val id: PersistentStateRef,
|
||||
@Column(name = "consuming_transaction_id", nullable = false)
|
||||
var value: String = "",
|
||||
@Column(name = "consuming_transaction_id", nullable = true)
|
||||
var value: String? = "",
|
||||
@Column(name = "raft_log_index", nullable = false)
|
||||
var index: Long = 0
|
||||
) : Serializable
|
||||
|
@ -32,15 +32,15 @@ class ContractUpgradeServiceImpl : ContractUpgradeService, SingletonSerializeAsT
|
||||
var stateRef: String = "",
|
||||
|
||||
/** refers to the UpgradedContract class name*/
|
||||
@Column(name = "contract_class_name", nullable = false)
|
||||
var upgradedContractClassName: String = ""
|
||||
@Column(name = "contract_class_name", nullable = true)
|
||||
var upgradedContractClassName: String? = ""
|
||||
) : Serializable
|
||||
|
||||
private companion object {
|
||||
fun createContractUpgradesMap(): PersistentMap<String, String, DBContractUpgrade, String> {
|
||||
return PersistentMap(
|
||||
toPersistentEntityKey = { it },
|
||||
fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName) },
|
||||
fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName ?: "") },
|
||||
toPersistentEntity = { key: String, value: String ->
|
||||
DBContractUpgrade().apply {
|
||||
stateRef = key
|
||||
|
@ -252,7 +252,7 @@ class NodeVaultService(
|
||||
val txIdPredicate = criteriaBuilder.equal(vaultStates.get<Vault.StateStatus>(VaultSchemaV1.VaultTxnNote::txId.name), txnId.toString())
|
||||
criteriaQuery.where(txIdPredicate)
|
||||
val results = session.createQuery(criteriaQuery).resultList
|
||||
results.asIterable().map { it.note }
|
||||
results.asIterable().map { it.note ?: "" }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,7 +87,7 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
|
||||
joinColumns = [(JoinColumn(name = "output_index", referencedColumnName = "output_index")), (JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"))],
|
||||
foreignKey = ForeignKey(name = "FK__lin_stat_parts__lin_stat"))
|
||||
@Column(name = "participants")
|
||||
var participants: MutableSet<AbstractParty>? = null,
|
||||
var participants: MutableSet<AbstractParty?>? = null,
|
||||
// Reason for not using Set is described here:
|
||||
// https://stackoverflow.com/questions/44213074/kotlin-collection-has-neither-generic-type-or-onetomany-targetentity
|
||||
|
||||
@ -124,7 +124,7 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
|
||||
|
||||
/** X500Name of owner party **/
|
||||
@Column(name = "owner_name", nullable = true)
|
||||
var owner: AbstractParty,
|
||||
var owner: AbstractParty?,
|
||||
|
||||
/** [FungibleAsset] attributes
|
||||
*
|
||||
@ -140,7 +140,7 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
|
||||
|
||||
/** X500Name of issuer party **/
|
||||
@Column(name = "issuer_name", nullable = true)
|
||||
var issuer: AbstractParty,
|
||||
var issuer: AbstractParty?,
|
||||
|
||||
@Column(name = "issuer_ref", length = MAX_ISSUER_REF_SIZE, nullable = false)
|
||||
@Type(type = "corda-wrapper-binary")
|
||||
@ -162,11 +162,11 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
|
||||
@Column(name = "seq_no", nullable = false)
|
||||
var seqNo: Int,
|
||||
|
||||
@Column(name = "transaction_id", length = 64, nullable = false)
|
||||
var txId: String,
|
||||
@Column(name = "transaction_id", length = 64, nullable = true)
|
||||
var txId: String?,
|
||||
|
||||
@Column(name = "note", nullable = false)
|
||||
var note: String
|
||||
@Column(name = "note", nullable = true)
|
||||
var note: String?
|
||||
) : Serializable {
|
||||
constructor(txId: String, note: String) : this(0, txId, note)
|
||||
}
|
||||
|
@ -2,17 +2,17 @@ package net.corda.node.services.statemachine
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.core.internal.concurrent.flatMap
|
||||
import net.corda.core.internal.packageName
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.utilities.UntrustworthyData
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.services.FinalityHandler
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.nodeapi.internal.persistence.contextTransaction
|
||||
@ -102,6 +102,51 @@ class RetryFlowMockTest {
|
||||
assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
|
||||
assertEquals(2, RetryFlow.count)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Patient records do not leak in hospital when using killFlow`() {
|
||||
val flow: FlowStateMachine<Unit> = nodeA.services.startFlow(FinalityHandler(object : FlowSession() {
|
||||
override val counterparty: Party
|
||||
get() = TODO("not implemented")
|
||||
|
||||
override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun getCounterpartyFlowInfo(): FlowInfo {
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <R : Any> sendAndReceive(receiveType: Class<R>, payload: Any, maySkipCheckpoint: Boolean): UntrustworthyData<R> {
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <R : Any> sendAndReceive(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <R : Any> receive(receiveType: Class<R>, maySkipCheckpoint: Boolean): UntrustworthyData<R> {
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <R : Any> receive(receiveType: Class<R>): UntrustworthyData<R> {
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun send(payload: Any, maySkipCheckpoint: Boolean) {
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun send(payload: Any) {
|
||||
TODO("not implemented")
|
||||
}
|
||||
}), nodeA.services.newContext()).get()
|
||||
// Make sure we have seen an update from the hospital, and thus the flow went there.
|
||||
nodeA.smm.flowHospital.track().updates.toBlocking().first()
|
||||
// Killing it should remove it.
|
||||
nodeA.smm.killFlow(flow.id)
|
||||
assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
|
||||
}
|
||||
}
|
||||
|
||||
class LimitedRetryCausingError : ConstraintViolationException("Test message", SQLException(), "Test constraint")
|
||||
@ -126,6 +171,26 @@ class RetryFlow(private val i: Int) : FlowLogic<Unit>() {
|
||||
}
|
||||
}
|
||||
|
||||
class RetryAndSleepFlow(private val i: Int) : FlowLogic<Unit>() {
|
||||
companion object {
|
||||
var count = 0
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
logger.info("Hello $count")
|
||||
if (count++ < i) {
|
||||
if (i == Int.MAX_VALUE) {
|
||||
throw LimitedRetryCausingError()
|
||||
} else {
|
||||
throw RetryCausingError()
|
||||
}
|
||||
} else {
|
||||
sleep(Duration.ofDays(1))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
class SendAndRetryFlow(private val i: Int, private val other: Party) : FlowLogic<Unit>() {
|
||||
companion object {
|
||||
|
@ -17,7 +17,7 @@ class PersistentMapTests {
|
||||
private fun createTestMap(): PersistentMap<String, String, ContractUpgradeServiceImpl.DBContractUpgrade, String> {
|
||||
return PersistentMap(
|
||||
toPersistentEntityKey = { it },
|
||||
fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName) },
|
||||
fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName ?: "") },
|
||||
toPersistentEntity = { key: String, value: String ->
|
||||
ContractUpgradeServiceImpl.DBContractUpgrade().apply {
|
||||
stateRef = key
|
||||
|
@ -35,7 +35,6 @@ include 'experimental:quasar-hook'
|
||||
include 'experimental:kryo-hook'
|
||||
include 'experimental:intellij-plugin'
|
||||
include 'experimental:flow-hook'
|
||||
include 'experimental:blobinspector'
|
||||
include 'experimental:ha-testing'
|
||||
include 'experimental:corda-utils'
|
||||
include 'test-common'
|
||||
|
@ -37,6 +37,7 @@ import net.corda.core.node.services.KeyManagementService
|
||||
import net.corda.core.serialization.SerializationWhitelist
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.hours
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.AbstractNode
|
||||
@ -491,7 +492,8 @@ private fun mockNodeConfiguration(): NodeConfiguration {
|
||||
doReturn(null).whenever(it).compatibilityZoneURL
|
||||
doReturn(null).whenever(it).networkServices
|
||||
doReturn(VerifierType.InMemory).whenever(it).verifierType
|
||||
doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).p2pMessagingRetry
|
||||
// Set to be long enough so retries don't trigger unless we override it
|
||||
doReturn(P2PMessagingRetryConfiguration(1.hours, 3, backoffBase = 2.0)).whenever(it).p2pMessagingRetry
|
||||
doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec
|
||||
doReturn(null).whenever(it).devModeOptions
|
||||
doReturn(EnterpriseConfiguration(
|
||||
|
@ -58,8 +58,8 @@ object DummyLinearStateSchemaV1 : MappedSchema(schemaFamily = DummyLinearStateSc
|
||||
/**
|
||||
* Dummy attributes
|
||||
*/
|
||||
@Column(name = "linear_string", nullable = false)
|
||||
var linearString: String,
|
||||
@Column(name = "linear_string", nullable = true)
|
||||
var linearString: String?,
|
||||
|
||||
@Column(name = "linear_number", nullable = false)
|
||||
var linearNumber: Long,
|
||||
|
@ -34,7 +34,7 @@ object DummyLinearStateSchemaV2 : MappedSchema(schemaFamily = DummyLinearStateSc
|
||||
@CollectionTable(name = "dummy_linear_states_v2_parts", joinColumns = [(JoinColumn(name = "output_index", referencedColumnName = "output_index")), (JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"))])
|
||||
override var participants: MutableSet<AbstractParty>? = null,
|
||||
|
||||
@Column(name = "linear_string", nullable = false) var linearString: String,
|
||||
@Column(name = "linear_string", nullable = true) var linearString: String?,
|
||||
|
||||
@Column(name = "linear_number", nullable = false) var linearNumber: Long,
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
apply plugin: 'java'
|
||||
apply plugin: 'kotlin'
|
||||
apply plugin: 'net.corda.plugins.publish-utils'
|
||||
apply plugin: 'com.jfrog.artifactory'
|
||||
|
||||
dependencies {
|
||||
compile project(':client:jackson')
|
||||
@ -28,11 +29,6 @@ jar {
|
||||
}
|
||||
}
|
||||
|
||||
jar {
|
||||
classifier "ignore"
|
||||
}
|
||||
|
||||
publish {
|
||||
name 'tools-blob-inspector'
|
||||
disableDefaultJar = true
|
||||
}
|
||||
|
@ -10,6 +10,7 @@
|
||||
|
||||
apply plugin: 'us.kirchmeier.capsule'
|
||||
apply plugin: 'net.corda.plugins.publish-utils'
|
||||
apply plugin: 'com.jfrog.artifactory'
|
||||
|
||||
description 'Network bootstrapper'
|
||||
|
||||
@ -46,10 +47,6 @@ artifacts {
|
||||
}
|
||||
}
|
||||
|
||||
jar {
|
||||
classifier "ignore"
|
||||
}
|
||||
|
||||
publish {
|
||||
name 'tools-network-bootstrapper'
|
||||
disableDefaultJar = true
|
||||
|
Loading…
x
Reference in New Issue
Block a user