diff --git a/.idea/compiler.xml b/.idea/compiler.xml index e16d02f15e..8dfe4ee76b 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -100,8 +100,6 @@ - - diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index f5814f22e6..c77a2ba99d 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -74,7 +74,6 @@ see changes to this list. * gary-rowe * Gavin Thomas (R3) * George Marcel Smetana (Bradesco) -* George Smetana (Bradesco) * Giulio Katis (Westpac) * Giuseppe Cardone (Intesa Sanpaolo) * Guy Hochstetler (R3) @@ -134,7 +133,7 @@ see changes to this list. * Nigel King (R3) * Nitesh Solanki (Persistent Systems Limited) * Nuam Athaweth (MUFG) -* Oscar Zibordi de Paiva (Bradesco) +* Oscar Zibordi de Paiva (Scopus Soluções em TI) * OP Financial * Patrick Kuo (R3) * Pekka Kaipio (OP Financial) @@ -172,7 +171,7 @@ see changes to this list. * Stanly Johnson (Servntire Global) * Szymon Sztuka (R3) * tb-pq -* Thiago Rafael Ferreira (Scorpius IT Solutions) +* Thiago Rafael Ferreira (Scopus Soluções em TI) * Thomas O'Donnell (Macquarie) * Thomas Schroeter (R3) * Tim Swanson (R3) @@ -185,6 +184,7 @@ see changes to this list. * Tudor Malene (R3) * Tushar Singh Bora * varunkm +* Venelin Stoykov (INDUSTRIA) * verymahler * Viktor Kolomeyko (R3) * Vipin Bharathan diff --git a/core/src/main/kotlin/net/corda/core/crypto/SecureHash.kt b/core/src/main/kotlin/net/corda/core/crypto/SecureHash.kt index 3e505a2f02..091dff47c1 100644 --- a/core/src/main/kotlin/net/corda/core/crypto/SecureHash.kt +++ b/core/src/main/kotlin/net/corda/core/crypto/SecureHash.kt @@ -56,13 +56,13 @@ sealed class SecureHash(bytes: ByteArray) : OpaqueBytes(bytes) { * @throws IllegalArgumentException The input string does not contain 64 hexadecimal digits, or it contains incorrectly-encoded characters. */ @JvmStatic - fun parse(str: String): SHA256 { - return str.toUpperCase().parseAsHex().let { + fun parse(str: String?): SHA256 { + return str?.toUpperCase()?.parseAsHex()?.let { when (it.size) { 32 -> SHA256(it) else -> throw IllegalArgumentException("Provided string is ${it.size} bytes not 32 bytes in hex: $str") } - } + } ?: throw IllegalArgumentException("Provided string is null") } private val sha256MessageDigest = SHA256DigestSupplier() diff --git a/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt b/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt index f007d56767..2cca161d9f 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt @@ -158,7 +158,13 @@ class FetchAttachmentsFlow(requests: Set, override fun maybeWriteToDisk(downloaded: List) { for (attachment in downloaded) { - serviceHub.attachments.importAttachment(attachment.open(), "$P2P_UPLOADER:${otherSideSession.counterparty.name}", null) + with(serviceHub.attachments) { + if (!hasAttachment(attachment.id)) { + importAttachment(attachment.open(), "$P2P_UPLOADER:${otherSideSession.counterparty.name}", null) + } else { + logger.info("Attachment ${attachment.id} already exists, skipping.") + } + } } } diff --git a/core/src/main/kotlin/net/corda/core/schemas/CommonSchema.kt b/core/src/main/kotlin/net/corda/core/schemas/CommonSchema.kt index 2ee20228c7..d0547d1985 100644 --- a/core/src/main/kotlin/net/corda/core/schemas/CommonSchema.kt +++ b/core/src/main/kotlin/net/corda/core/schemas/CommonSchema.kt @@ -61,7 +61,7 @@ object CommonSchemaV1 : MappedSchema(schemaFamily = CommonSchema.javaClass, vers /** X500Name of participant parties **/ @Transient - open var participants: MutableSet? = null, + open var participants: MutableSet? = null, /** [OwnableState] attributes */ diff --git a/docs/source/flow-state-machines.rst b/docs/source/flow-state-machines.rst index b811b21b0f..5f7f2a8caa 100644 --- a/docs/source/flow-state-machines.rst +++ b/docs/source/flow-state-machines.rst @@ -492,7 +492,7 @@ whether the change is one of position (i.e. progress), structure (i.e. new subta aspect of rendering (i.e. a step has changed in some way and is requesting a re-render). The flow framework is somewhat integrated with this API. Each ``FlowLogic`` may optionally provide a tracker by -overriding the ``flowTracker`` property (``getFlowTracker`` method in Java). If the +overriding the ``progressTracker`` property (``getProgressTracker`` method in Java). If the ``FlowLogic.subFlow`` method is used, then the tracker of the sub-flow will be made a child of the current step in the parent flow automatically, if the parent is using tracking in the first place. The framework will also automatically set the current step to ``DONE`` for you, when the flow is finished. diff --git a/docs/source/generating-a-node.rst b/docs/source/generating-a-node.rst index 300b62540a..a457835924 100644 --- a/docs/source/generating-a-node.rst +++ b/docs/source/generating-a-node.rst @@ -43,6 +43,8 @@ in the `Kotlin CorDapp Template , + participants: Set, owner: AbstractParty, quantity: Long, issuerParty: AbstractParty, @@ -40,6 +40,6 @@ object SampleCashSchemaV2 : MappedSchema(schemaFamily = CashSchema.javaClass, ve @ElementCollection @Column(name = "participants", nullable = true) @CollectionTable(name = "cash_states_v2_participants", joinColumns = [JoinColumn(name = "output_index", referencedColumnName = "output_index"), JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id")]) - override var participants: MutableSet? = null + override var participants: MutableSet? = null } } diff --git a/finance/src/test/kotlin/net/corda/finance/schemas/SampleCashSchemaV3.kt b/finance/src/test/kotlin/net/corda/finance/schemas/SampleCashSchemaV3.kt index b484cb1089..582dae5e78 100644 --- a/finance/src/test/kotlin/net/corda/finance/schemas/SampleCashSchemaV3.kt +++ b/finance/src/test/kotlin/net/corda/finance/schemas/SampleCashSchemaV3.kt @@ -36,11 +36,11 @@ object SampleCashSchemaV3 : MappedSchema(schemaFamily = CashSchema.javaClass, ve @CollectionTable(name="cash_state_participants", joinColumns = arrayOf( JoinColumn(name = "output_index", referencedColumnName = "output_index"), JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"))) - var participants: MutableSet? = null, + var participants: MutableSet? = null, /** X500Name of owner party **/ @Column(name = "owner_name", nullable = true) - var owner: AbstractParty, + var owner: AbstractParty?, @Column(name = "pennies", nullable = false) var pennies: Long, @@ -50,7 +50,7 @@ object SampleCashSchemaV3 : MappedSchema(schemaFamily = CashSchema.javaClass, ve /** X500Name of issuer party **/ @Column(name = "issuer_name", nullable = true) - var issuer: AbstractParty, + var issuer: AbstractParty?, @Column(name = "issuer_ref", length = MAX_ISSUER_REF_SIZE, nullable = false) @Type(type = "corda-wrapper-binary") diff --git a/finance/src/test/kotlin/net/corda/finance/schemas/SampleCommercialPaperSchemaV2.kt b/finance/src/test/kotlin/net/corda/finance/schemas/SampleCommercialPaperSchemaV2.kt index 323a743843..6565df5d94 100644 --- a/finance/src/test/kotlin/net/corda/finance/schemas/SampleCommercialPaperSchemaV2.kt +++ b/finance/src/test/kotlin/net/corda/finance/schemas/SampleCommercialPaperSchemaV2.kt @@ -55,6 +55,6 @@ object SampleCommercialPaperSchemaV2 : MappedSchema(schemaFamily = CommercialPap @ElementCollection @Column(name = "participants") @CollectionTable(name = "cp_states_v2_participants", joinColumns = [JoinColumn(name = "output_index", referencedColumnName = "output_index"), JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id")]) - override var participants: MutableSet? = null + override var participants: MutableSet? = null } } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 3519fe386f..d819bd29f8 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -1099,7 +1099,7 @@ fun configureDatabase(hikariProperties: Properties, return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters) } catch (ex: Exception) { when { - ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.") + ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.", ex) ex.cause is ClassNotFoundException -> throw CouldNotCreateDataSourceException("Could not find the database driver class. Please add it to the 'drivers' folder. See: https://docs.corda.net/corda-configuration-file.html") else -> throw CouldNotCreateDataSourceException("Could not create the DataSource: ${ex.message}", ex) } diff --git a/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt b/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt index 10624f9c96..cff3ac39cb 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt @@ -25,8 +25,8 @@ class PersistentScheduledFlowRepository(val database: CordaPersistence) : Schedu } private fun fromPersistentEntity(scheduledStateRecord: NodeSchedulerService.PersistentScheduledState): Pair { - val txId = scheduledStateRecord.output.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId") - val index = scheduledStateRecord.output.index ?: throw IllegalStateException("DB returned null integer index") + val txId = scheduledStateRecord.output.txId + val index = scheduledStateRecord.output.index return Pair(StateRef(SecureHash.parse(txId), index), ScheduledStateRef(StateRef(SecureHash.parse(txId), index), scheduledStateRecord.scheduledAt)) } diff --git a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt index f74e890c04..a8ce8c8a7e 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt @@ -105,8 +105,8 @@ class PersistentIdentityService(override val trustRoot: X509Certificate, @Column(name = "name", length = 128, nullable = false) var name: String = "", - @Column(name = "pk_hash", length = MAX_HASH_HEX_SIZE, nullable = false) - var publicKeyHash: String = "" + @Column(name = "pk_hash", length = MAX_HASH_HEX_SIZE, nullable = true) + var publicKeyHash: String? = "" ) : Serializable override val caCertStore: CertStore diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt index 3ac3bd8d52..2af6d459bd 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt @@ -46,8 +46,8 @@ class DBTransactionMappingStorage(private val database: CordaPersistence) : Stat @Column(name = "tx_id", length = 64, nullable = false) var txId: String = "", - @Column(name = "state_machine_run_id", length = 36, nullable = false) - var stateMachineRunId: String = "" + @Column(name = "state_machine_run_id", length = 36, nullable = true) + var stateMachineRunId: String? = "" ) : Serializable private companion object { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 52d326fa21..feeb0329cc 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -23,12 +23,20 @@ import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party -import net.corda.core.internal.* +import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.ThreadBox +import net.corda.core.internal.TimedFlow +import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.internal.castIfPossible import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.DataFeed -import net.corda.core.serialization.* +import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.SerializedBytes +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.Try import net.corda.core.utilities.contextLogger @@ -40,7 +48,11 @@ import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion -import net.corda.node.services.statemachine.interceptors.* +import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor +import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker +import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor +import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor +import net.corda.node.services.statemachine.interceptors.PrintingInterceptor import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -52,7 +64,11 @@ import rx.Observable import rx.subjects.PublishSubject import java.security.SecureRandom import java.util.* -import java.util.concurrent.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import javax.annotation.concurrent.ThreadSafe import kotlin.collections.ArrayList @@ -230,6 +246,7 @@ class SingleThreadedStateMachineManager( database.transaction { checkpointStorage.removeCheckpoint(id) } + transitionExecutor.forceRemoveFlow(id) } } else { // TODO replace with a clustered delete after we'll support clustered nodes diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index a08ebada2e..4228a73367 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -2,7 +2,6 @@ package net.corda.node.services.statemachine import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.ThreadBox -import net.corda.core.internal.TimedFlow import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.messaging.DataFeed import net.corda.core.utilities.contextLogger @@ -110,8 +109,8 @@ class StaffedFlowHospital { /** * The flow has been removed from the state machine. */ - fun flowRemoved(flowFiber: FlowFiber) { - mutex.locked { patients.remove(flowFiber.id) } + fun flowRemoved(flowId: StateMachineRunId) { + mutex.locked { patients.remove(flowId) } } // TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC @@ -204,24 +203,14 @@ class StaffedFlowHospital { object DoctorTimeout : Staff { override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis { if (newError is FlowTimeoutException) { - if (isTimedFlow(flowFiber)) { - if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) { - return Diagnosis.DISCHARGE - } else { - log.warn("\"Maximum number of retries reached for timed flow ${flowFiber.javaClass}") - } + if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) { + return Diagnosis.DISCHARGE } else { - log.warn("\"Unable to restart flow: ${flowFiber.javaClass}, it is not timed and does not contain any timed sub-flows.") + log.warn("\"Maximum number of retries reached for timed flow ${flowFiber.javaClass}") } } return Diagnosis.NOT_MY_SPECIALTY } - - private fun isTimedFlow(flowFiber: FlowFiber): Boolean { - return flowFiber.snapshot().checkpoint.subFlowStack.any { - TimedFlow::class.java.isAssignableFrom(it.flowClass) - } - } } /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutor.kt index 41b8c9fc85..cdd9476afe 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutor.kt @@ -11,6 +11,7 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.StateMachineRunId import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult @@ -27,6 +28,13 @@ interface TransitionExecutor { transition: TransitionResult, actionExecutor: ActionExecutor ): Pair + + /** + * Called if the normal exit path where the new state is marked as removed via [StateMachineState.isRemoved] is not called. + * Currently this only happens via [StateMachineManager.killFlow]. This allows instances of this interface to clean up + * any state they are holding for a flow to prevent a memory leak. + */ + fun forceRemoveFlow(id: StateMachineRunId) } /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt index 5b896ee45c..4f7ad640bf 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt @@ -11,6 +11,7 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.StateMachineRunId import net.corda.core.utilities.contextLogger import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult @@ -30,6 +31,8 @@ class TransitionExecutorImpl( val secureRandom: SecureRandom, val database: CordaPersistence ) : TransitionExecutor { + override fun forceRemoveFlow(id: StateMachineRunId) {} + private companion object { val log = contextLogger() } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt index b543757df8..8b47f30201 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt @@ -13,7 +13,12 @@ package net.corda.node.services.statemachine.interceptors import co.paralleluniverse.fibers.Suspendable import net.corda.core.flows.StateMachineRunId import net.corda.core.utilities.contextLogger -import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.ActionExecutor +import net.corda.node.services.statemachine.ErrorState +import net.corda.node.services.statemachine.Event +import net.corda.node.services.statemachine.FlowFiber +import net.corda.node.services.statemachine.StateMachineState +import net.corda.node.services.statemachine.TransitionExecutor import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult import java.time.Instant @@ -58,4 +63,9 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti return Pair(continuation, nextState) } + + override fun forceRemoveFlow(id: StateMachineRunId) { + records.remove(id) + delegate.forceRemoveFlow(id) + } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt index c7ded2c353..3911afa067 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt @@ -11,11 +11,18 @@ package net.corda.node.services.statemachine.interceptors import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.StateMachineRunId import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.utilities.contextLogger -import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.ActionExecutor +import net.corda.node.services.statemachine.Event +import net.corda.node.services.statemachine.FlowFiber +import net.corda.node.services.statemachine.FlowState +import net.corda.node.services.statemachine.FlowStateMachineImpl +import net.corda.node.services.statemachine.StateMachineState +import net.corda.node.services.statemachine.TransitionExecutor import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult import java.util.concurrent.LinkedBlockingQueue @@ -28,6 +35,10 @@ class FiberDeserializationCheckingInterceptor( val fiberDeserializationChecker: FiberDeserializationChecker, val delegate: TransitionExecutor ) : TransitionExecutor { + override fun forceRemoveFlow(id: StateMachineRunId) { + delegate.forceRemoveFlow(id) + } + @Suspendable override fun executeTransition( fiber: FlowFiber, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt index 231a3b2467..a94c865440 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt @@ -12,7 +12,13 @@ package net.corda.node.services.statemachine.interceptors import co.paralleluniverse.fibers.Suspendable import net.corda.core.flows.StateMachineRunId -import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.ActionExecutor +import net.corda.node.services.statemachine.ErrorState +import net.corda.node.services.statemachine.Event +import net.corda.node.services.statemachine.FlowFiber +import net.corda.node.services.statemachine.StaffedFlowHospital +import net.corda.node.services.statemachine.StateMachineState +import net.corda.node.services.statemachine.TransitionExecutor import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult import java.util.concurrent.ConcurrentHashMap @@ -25,6 +31,16 @@ class HospitalisingInterceptor( private val flowHospital: StaffedFlowHospital, private val delegate: TransitionExecutor ) : TransitionExecutor { + override fun forceRemoveFlow(id: StateMachineRunId) { + removeFlow(id) + delegate.forceRemoveFlow(id) + } + + private fun removeFlow(id: StateMachineRunId) { + hospitalisedFlows.remove(id) + flowHospital.flowRemoved(id) + } + private val hospitalisedFlows = ConcurrentHashMap() @Suspendable @@ -51,8 +67,7 @@ class HospitalisingInterceptor( } } if (nextState.isRemoved) { - hospitalisedFlows.remove(fiber.id) - flowHospital.flowRemoved(fiber) + removeFlow(fiber.id) } return Pair(continuation, nextState) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt index 4f7f140673..bbd4e7472e 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt @@ -2,11 +2,21 @@ package net.corda.node.services.statemachine.interceptors import co.paralleluniverse.fibers.Suspendable import com.codahale.metrics.MetricRegistry -import net.corda.node.services.statemachine.* +import net.corda.core.flows.StateMachineRunId +import net.corda.node.services.statemachine.Action +import net.corda.node.services.statemachine.ActionExecutor +import net.corda.node.services.statemachine.Event +import net.corda.node.services.statemachine.FlowFiber +import net.corda.node.services.statemachine.StateMachineState +import net.corda.node.services.statemachine.TransitionExecutor import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult class MetricInterceptor(val metrics: MetricRegistry, val delegate: TransitionExecutor) : TransitionExecutor { + override fun forceRemoveFlow(id: StateMachineRunId) { + delegate.forceRemoveFlow(id) + } + @Suspendable override fun executeTransition(fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor): Pair { val metricActionInterceptor = MetricActionInterceptor(metrics, actionExecutor) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt index 698ac106fa..5a84b8cb00 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt @@ -11,8 +11,13 @@ package net.corda.node.services.statemachine.interceptors import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.StateMachineRunId import net.corda.core.utilities.contextLogger -import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.ActionExecutor +import net.corda.node.services.statemachine.Event +import net.corda.node.services.statemachine.FlowFiber +import net.corda.node.services.statemachine.StateMachineState +import net.corda.node.services.statemachine.TransitionExecutor import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult import java.time.Instant @@ -21,6 +26,10 @@ import java.time.Instant * This interceptor simply prints all state machine transitions. Useful for debugging. */ class PrintingInterceptor(val delegate: TransitionExecutor) : TransitionExecutor { + override fun forceRemoveFlow(id: StateMachineRunId) { + delegate.forceRemoveFlow(id) + } + companion object { val log = contextLogger() } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt index 748ad581b5..b935c898fd 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -47,8 +47,8 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl @EmbeddedId val id: PersistentStateRef, - @Column(name = "consuming_transaction_id", nullable = false) - val consumingTxHash: String + @Column(name = "consuming_transaction_id", nullable = true) + val consumingTxHash: String? ) : Serializable @Entity @@ -60,11 +60,11 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl @Column(nullable = true) val id: Int? = null, - @Column(name = "consuming_transaction_id", nullable = false) - val consumingTxHash: String, + @Column(name = "consuming_transaction_id", nullable = true) + val consumingTxHash: String?, - @Column(name = "requesting_party_name", nullable = false) - var partyName: String, + @Column(name = "requesting_party_name", nullable = true) + var partyName: String?, @Lob @Column(name = "request_signature", nullable = false) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt index 02bae2c566..74489a8f14 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt @@ -100,8 +100,8 @@ class RaftUniquenessProvider( class CommittedState( @EmbeddedId val id: PersistentStateRef, - @Column(name = "consuming_transaction_id", nullable = false) - var value: String = "", + @Column(name = "consuming_transaction_id", nullable = true) + var value: String? = "", @Column(name = "raft_log_index", nullable = false) var index: Long = 0 ) : Serializable diff --git a/node/src/main/kotlin/net/corda/node/services/upgrade/ContractUpgradeServiceImpl.kt b/node/src/main/kotlin/net/corda/node/services/upgrade/ContractUpgradeServiceImpl.kt index 50e7995c1c..dddaf4221d 100644 --- a/node/src/main/kotlin/net/corda/node/services/upgrade/ContractUpgradeServiceImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/upgrade/ContractUpgradeServiceImpl.kt @@ -32,15 +32,15 @@ class ContractUpgradeServiceImpl : ContractUpgradeService, SingletonSerializeAsT var stateRef: String = "", /** refers to the UpgradedContract class name*/ - @Column(name = "contract_class_name", nullable = false) - var upgradedContractClassName: String = "" + @Column(name = "contract_class_name", nullable = true) + var upgradedContractClassName: String? = "" ) : Serializable private companion object { fun createContractUpgradesMap(): PersistentMap { return PersistentMap( toPersistentEntityKey = { it }, - fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName) }, + fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName ?: "") }, toPersistentEntity = { key: String, value: String -> DBContractUpgrade().apply { stateRef = key diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 119febfc37..7cb92ed609 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -252,7 +252,7 @@ class NodeVaultService( val txIdPredicate = criteriaBuilder.equal(vaultStates.get(VaultSchemaV1.VaultTxnNote::txId.name), txnId.toString()) criteriaQuery.where(txIdPredicate) val results = session.createQuery(criteriaQuery).resultList - results.asIterable().map { it.note } + results.asIterable().map { it.note ?: "" } } } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt index 8acc247892..17e92153ee 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt @@ -87,7 +87,7 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio joinColumns = [(JoinColumn(name = "output_index", referencedColumnName = "output_index")), (JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"))], foreignKey = ForeignKey(name = "FK__lin_stat_parts__lin_stat")) @Column(name = "participants") - var participants: MutableSet? = null, + var participants: MutableSet? = null, // Reason for not using Set is described here: // https://stackoverflow.com/questions/44213074/kotlin-collection-has-neither-generic-type-or-onetomany-targetentity @@ -124,7 +124,7 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio /** X500Name of owner party **/ @Column(name = "owner_name", nullable = true) - var owner: AbstractParty, + var owner: AbstractParty?, /** [FungibleAsset] attributes * @@ -140,7 +140,7 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio /** X500Name of issuer party **/ @Column(name = "issuer_name", nullable = true) - var issuer: AbstractParty, + var issuer: AbstractParty?, @Column(name = "issuer_ref", length = MAX_ISSUER_REF_SIZE, nullable = false) @Type(type = "corda-wrapper-binary") @@ -162,11 +162,11 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio @Column(name = "seq_no", nullable = false) var seqNo: Int, - @Column(name = "transaction_id", length = 64, nullable = false) - var txId: String, + @Column(name = "transaction_id", length = 64, nullable = true) + var txId: String?, - @Column(name = "note", nullable = false) - var note: String + @Column(name = "note", nullable = true) + var note: String? ) : Serializable { constructor(txId: String, note: String) : this(0, txId, note) } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index fca7e8687b..56d009eeef 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -2,17 +2,17 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable import net.corda.core.concurrent.CordaFuture -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowSession -import net.corda.core.flows.InitiatedBy -import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.* import net.corda.core.identity.Party +import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.packageName import net.corda.core.messaging.MessageRecipients +import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.internal.StartedNode +import net.corda.node.services.FinalityHandler import net.corda.node.services.messaging.Message import net.corda.node.services.persistence.DBTransactionStorage import net.corda.nodeapi.internal.persistence.contextTransaction @@ -102,6 +102,51 @@ class RetryFlowMockTest { assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty() assertEquals(2, RetryFlow.count) } + + @Test + fun `Patient records do not leak in hospital when using killFlow`() { + val flow: FlowStateMachine = nodeA.services.startFlow(FinalityHandler(object : FlowSession() { + override val counterparty: Party + get() = TODO("not implemented") + + override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo { + TODO("not implemented") + } + + override fun getCounterpartyFlowInfo(): FlowInfo { + TODO("not implemented") + } + + override fun sendAndReceive(receiveType: Class, payload: Any, maySkipCheckpoint: Boolean): UntrustworthyData { + TODO("not implemented") + } + + override fun sendAndReceive(receiveType: Class, payload: Any): UntrustworthyData { + TODO("not implemented") + } + + override fun receive(receiveType: Class, maySkipCheckpoint: Boolean): UntrustworthyData { + TODO("not implemented") + } + + override fun receive(receiveType: Class): UntrustworthyData { + TODO("not implemented") + } + + override fun send(payload: Any, maySkipCheckpoint: Boolean) { + TODO("not implemented") + } + + override fun send(payload: Any) { + TODO("not implemented") + } + }), nodeA.services.newContext()).get() + // Make sure we have seen an update from the hospital, and thus the flow went there. + nodeA.smm.flowHospital.track().updates.toBlocking().first() + // Killing it should remove it. + nodeA.smm.killFlow(flow.id) + assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty() + } } class LimitedRetryCausingError : ConstraintViolationException("Test message", SQLException(), "Test constraint") @@ -126,6 +171,26 @@ class RetryFlow(private val i: Int) : FlowLogic() { } } +class RetryAndSleepFlow(private val i: Int) : FlowLogic() { + companion object { + var count = 0 + } + + @Suspendable + override fun call() { + logger.info("Hello $count") + if (count++ < i) { + if (i == Int.MAX_VALUE) { + throw LimitedRetryCausingError() + } else { + throw RetryCausingError() + } + } else { + sleep(Duration.ofDays(1)) + } + } +} + @InitiatingFlow class SendAndRetryFlow(private val i: Int, private val other: Party) : FlowLogic() { companion object { diff --git a/node/src/test/kotlin/net/corda/node/utilities/PersistentMapTests.kt b/node/src/test/kotlin/net/corda/node/utilities/PersistentMapTests.kt index 671e9c1fc8..2ee96bae91 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/PersistentMapTests.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/PersistentMapTests.kt @@ -17,7 +17,7 @@ class PersistentMapTests { private fun createTestMap(): PersistentMap { return PersistentMap( toPersistentEntityKey = { it }, - fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName) }, + fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName ?: "") }, toPersistentEntity = { key: String, value: String -> ContractUpgradeServiceImpl.DBContractUpgrade().apply { stateRef = key diff --git a/settings.gradle b/settings.gradle index e19af34b00..b3387a2a1b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -35,7 +35,6 @@ include 'experimental:quasar-hook' include 'experimental:kryo-hook' include 'experimental:intellij-plugin' include 'experimental:flow-hook' -include 'experimental:blobinspector' include 'experimental:ha-testing' include 'experimental:corda-utils' include 'test-common' diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index f43b356e6b..8503cf0c77 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -37,6 +37,7 @@ import net.corda.core.node.services.KeyManagementService import net.corda.core.serialization.SerializationWhitelist import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.hours import net.corda.core.utilities.seconds import net.corda.node.VersionInfo import net.corda.node.internal.AbstractNode @@ -491,7 +492,8 @@ private fun mockNodeConfiguration(): NodeConfiguration { doReturn(null).whenever(it).compatibilityZoneURL doReturn(null).whenever(it).networkServices doReturn(VerifierType.InMemory).whenever(it).verifierType - doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).p2pMessagingRetry + // Set to be long enough so retries don't trigger unless we override it + doReturn(P2PMessagingRetryConfiguration(1.hours, 3, backoffBase = 2.0)).whenever(it).p2pMessagingRetry doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(null).whenever(it).devModeOptions doReturn(EnterpriseConfiguration( diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/vault/DummyLinearStateSchemaV1.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/vault/DummyLinearStateSchemaV1.kt index 4aee250fdc..05e776acdf 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/vault/DummyLinearStateSchemaV1.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/vault/DummyLinearStateSchemaV1.kt @@ -58,8 +58,8 @@ object DummyLinearStateSchemaV1 : MappedSchema(schemaFamily = DummyLinearStateSc /** * Dummy attributes */ - @Column(name = "linear_string", nullable = false) - var linearString: String, + @Column(name = "linear_string", nullable = true) + var linearString: String?, @Column(name = "linear_number", nullable = false) var linearNumber: Long, diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/vault/DummyLinearStateSchemaV2.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/vault/DummyLinearStateSchemaV2.kt index 13f34b6496..b442218276 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/vault/DummyLinearStateSchemaV2.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/vault/DummyLinearStateSchemaV2.kt @@ -34,7 +34,7 @@ object DummyLinearStateSchemaV2 : MappedSchema(schemaFamily = DummyLinearStateSc @CollectionTable(name = "dummy_linear_states_v2_parts", joinColumns = [(JoinColumn(name = "output_index", referencedColumnName = "output_index")), (JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"))]) override var participants: MutableSet? = null, - @Column(name = "linear_string", nullable = false) var linearString: String, + @Column(name = "linear_string", nullable = true) var linearString: String?, @Column(name = "linear_number", nullable = false) var linearNumber: Long, diff --git a/tools/blobinspector/build.gradle b/tools/blobinspector/build.gradle index 9c4328b8e3..06b4f21e52 100644 --- a/tools/blobinspector/build.gradle +++ b/tools/blobinspector/build.gradle @@ -1,6 +1,7 @@ apply plugin: 'java' apply plugin: 'kotlin' apply plugin: 'net.corda.plugins.publish-utils' +apply plugin: 'com.jfrog.artifactory' dependencies { compile project(':client:jackson') @@ -28,11 +29,6 @@ jar { } } -jar { - classifier "ignore" -} - publish { name 'tools-blob-inspector' - disableDefaultJar = true } diff --git a/tools/bootstrapper/build.gradle b/tools/bootstrapper/build.gradle index 45425327b0..057229c099 100644 --- a/tools/bootstrapper/build.gradle +++ b/tools/bootstrapper/build.gradle @@ -10,6 +10,7 @@ apply plugin: 'us.kirchmeier.capsule' apply plugin: 'net.corda.plugins.publish-utils' +apply plugin: 'com.jfrog.artifactory' description 'Network bootstrapper' @@ -46,10 +47,6 @@ artifacts { } } -jar { - classifier "ignore" -} - publish { name 'tools-network-bootstrapper' disableDefaultJar = true