Resolved merge conflicts.

This commit is contained in:
szymonsztuka 2018-06-11 13:40:38 +01:00
commit 98e58b282c
38 changed files with 228 additions and 91 deletions

2
.idea/compiler.xml generated
View File

@ -100,8 +100,6 @@
<module name="experimental-behave_main" target="1.8" />
<module name="experimental-behave_smokeTest" target="1.8" />
<module name="experimental-behave_test" target="1.8" />
<module name="experimental-blobinspector_main" target="1.8" />
<module name="experimental-blobinspector_test" target="1.8" />
<module name="experimental-kryo-hook_main" target="1.8" />
<module name="experimental-kryo-hook_test" target="1.8" />
<module name="experimental_main" target="1.8" />

View File

@ -74,7 +74,6 @@ see changes to this list.
* gary-rowe
* Gavin Thomas (R3)
* George Marcel Smetana (Bradesco)
* George Smetana (Bradesco)
* Giulio Katis (Westpac)
* Giuseppe Cardone (Intesa Sanpaolo)
* Guy Hochstetler (R3)
@ -134,7 +133,7 @@ see changes to this list.
* Nigel King (R3)
* Nitesh Solanki (Persistent Systems Limited)
* Nuam Athaweth (MUFG)
* Oscar Zibordi de Paiva (Bradesco)
* Oscar Zibordi de Paiva (Scopus Soluções em TI)
* OP Financial
* Patrick Kuo (R3)
* Pekka Kaipio (OP Financial)
@ -172,7 +171,7 @@ see changes to this list.
* Stanly Johnson (Servntire Global)
* Szymon Sztuka (R3)
* tb-pq
* Thiago Rafael Ferreira (Scorpius IT Solutions)
* Thiago Rafael Ferreira (Scopus Soluções em TI)
* Thomas O'Donnell (Macquarie)
* Thomas Schroeter (R3)
* Tim Swanson (R3)
@ -185,6 +184,7 @@ see changes to this list.
* Tudor Malene (R3)
* Tushar Singh Bora
* varunkm
* Venelin Stoykov (INDUSTRIA)
* verymahler
* Viktor Kolomeyko (R3)
* Vipin Bharathan

View File

@ -56,13 +56,13 @@ sealed class SecureHash(bytes: ByteArray) : OpaqueBytes(bytes) {
* @throws IllegalArgumentException The input string does not contain 64 hexadecimal digits, or it contains incorrectly-encoded characters.
*/
@JvmStatic
fun parse(str: String): SHA256 {
return str.toUpperCase().parseAsHex().let {
fun parse(str: String?): SHA256 {
return str?.toUpperCase()?.parseAsHex()?.let {
when (it.size) {
32 -> SHA256(it)
else -> throw IllegalArgumentException("Provided string is ${it.size} bytes not 32 bytes in hex: $str")
}
}
} ?: throw IllegalArgumentException("Provided string is null")
}
private val sha256MessageDigest = SHA256DigestSupplier()

View File

@ -158,7 +158,13 @@ class FetchAttachmentsFlow(requests: Set<SecureHash>,
override fun maybeWriteToDisk(downloaded: List<Attachment>) {
for (attachment in downloaded) {
serviceHub.attachments.importAttachment(attachment.open(), "$P2P_UPLOADER:${otherSideSession.counterparty.name}", null)
with(serviceHub.attachments) {
if (!hasAttachment(attachment.id)) {
importAttachment(attachment.open(), "$P2P_UPLOADER:${otherSideSession.counterparty.name}", null)
} else {
logger.info("Attachment ${attachment.id} already exists, skipping.")
}
}
}
}

View File

@ -61,7 +61,7 @@ object CommonSchemaV1 : MappedSchema(schemaFamily = CommonSchema.javaClass, vers
/** X500Name of participant parties **/
@Transient
open var participants: MutableSet<AbstractParty>? = null,
open var participants: MutableSet<AbstractParty?>? = null,
/** [OwnableState] attributes */

View File

@ -492,7 +492,7 @@ whether the change is one of position (i.e. progress), structure (i.e. new subta
aspect of rendering (i.e. a step has changed in some way and is requesting a re-render).
The flow framework is somewhat integrated with this API. Each ``FlowLogic`` may optionally provide a tracker by
overriding the ``flowTracker`` property (``getFlowTracker`` method in Java). If the
overriding the ``progressTracker`` property (``getProgressTracker`` method in Java). If the
``FlowLogic.subFlow`` method is used, then the tracker of the sub-flow will be made a child of the current
step in the parent flow automatically, if the parent is using tracking in the first place. The framework will also
automatically set the current step to ``DONE`` for you, when the flow is finished.

View File

@ -43,6 +43,8 @@ in the `Kotlin CorDapp Template <https://github.com/corda/cordapp-template-kotli
}
// No webport property, so no webserver will be created.
h2Port 10004
// Starts an internal SSH server providing a management shell on the node.
sshdPort 2223
// Includes the corda-finance CorDapp on our node.
cordapps = ["$corda_release_distribution:corda-finance:$corda_release_version"]
// Specify a JVM argument to be used when running the node (in this case, extra heap size).

View File

@ -139,7 +139,7 @@ Download a sample project
Run from the terminal
^^^^^^^^^^^^^^^^^^^^^
1. From the cordapp-example folder, deploy the nodes by running ``./gradlew deployNodes``
2. Start the nodes by running ``kotlin-source/build/nodes/runnodes``. Do not click while 8 additional terminal windows start up.
2. Start the nodes by running ``kotlin-source/build/nodes/runnodes``. Do not click while 7 additional terminal windows start up.
3. Wait until all the terminal windows display either "Webserver started up in XX.X sec" or "Node for "NodeC" started up and registered in XX.XX sec"
4. Test the CorDapp is running correctly by visiting the front end at http://localhost:10009/web/example/

View File

@ -39,7 +39,7 @@ object CashSchemaV1 : MappedSchema(
class PersistentCashState(
/** X500Name of owner party **/
@Column(name = "owner_name", nullable = true)
var owner: AbstractParty,
var owner: AbstractParty?,
@Column(name = "pennies", nullable = false)
var pennies: Long,

View File

@ -30,7 +30,7 @@ object SampleCashSchemaV2 : MappedSchema(schemaFamily = CashSchema.javaClass, ve
/** product type */
@Column(name = "ccy_code", length = 3, nullable = false)
var currency: String,
participants: Set<AbstractParty>,
participants: Set<AbstractParty?>,
owner: AbstractParty,
quantity: Long,
issuerParty: AbstractParty,
@ -40,6 +40,6 @@ object SampleCashSchemaV2 : MappedSchema(schemaFamily = CashSchema.javaClass, ve
@ElementCollection
@Column(name = "participants", nullable = true)
@CollectionTable(name = "cash_states_v2_participants", joinColumns = [JoinColumn(name = "output_index", referencedColumnName = "output_index"), JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id")])
override var participants: MutableSet<AbstractParty>? = null
override var participants: MutableSet<AbstractParty?>? = null
}
}

View File

@ -36,11 +36,11 @@ object SampleCashSchemaV3 : MappedSchema(schemaFamily = CashSchema.javaClass, ve
@CollectionTable(name="cash_state_participants", joinColumns = arrayOf(
JoinColumn(name = "output_index", referencedColumnName = "output_index"),
JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id")))
var participants: MutableSet<AbstractParty>? = null,
var participants: MutableSet<AbstractParty?>? = null,
/** X500Name of owner party **/
@Column(name = "owner_name", nullable = true)
var owner: AbstractParty,
var owner: AbstractParty?,
@Column(name = "pennies", nullable = false)
var pennies: Long,
@ -50,7 +50,7 @@ object SampleCashSchemaV3 : MappedSchema(schemaFamily = CashSchema.javaClass, ve
/** X500Name of issuer party **/
@Column(name = "issuer_name", nullable = true)
var issuer: AbstractParty,
var issuer: AbstractParty?,
@Column(name = "issuer_ref", length = MAX_ISSUER_REF_SIZE, nullable = false)
@Type(type = "corda-wrapper-binary")

View File

@ -55,6 +55,6 @@ object SampleCommercialPaperSchemaV2 : MappedSchema(schemaFamily = CommercialPap
@ElementCollection
@Column(name = "participants")
@CollectionTable(name = "cp_states_v2_participants", joinColumns = [JoinColumn(name = "output_index", referencedColumnName = "output_index"), JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id")])
override var participants: MutableSet<AbstractParty>? = null
override var participants: MutableSet<AbstractParty?>? = null
}
}

View File

@ -1099,7 +1099,7 @@ fun configureDatabase(hikariProperties: Properties,
return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters)
} catch (ex: Exception) {
when {
ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.")
ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.", ex)
ex.cause is ClassNotFoundException -> throw CouldNotCreateDataSourceException("Could not find the database driver class. Please add it to the 'drivers' folder. See: https://docs.corda.net/corda-configuration-file.html")
else -> throw CouldNotCreateDataSourceException("Could not create the DataSource: ${ex.message}", ex)
}

View File

@ -25,8 +25,8 @@ class PersistentScheduledFlowRepository(val database: CordaPersistence) : Schedu
}
private fun fromPersistentEntity(scheduledStateRecord: NodeSchedulerService.PersistentScheduledState): Pair<StateRef, ScheduledStateRef> {
val txId = scheduledStateRecord.output.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId")
val index = scheduledStateRecord.output.index ?: throw IllegalStateException("DB returned null integer index")
val txId = scheduledStateRecord.output.txId
val index = scheduledStateRecord.output.index
return Pair(StateRef(SecureHash.parse(txId), index), ScheduledStateRef(StateRef(SecureHash.parse(txId), index), scheduledStateRecord.scheduledAt))
}

View File

@ -105,8 +105,8 @@ class PersistentIdentityService(override val trustRoot: X509Certificate,
@Column(name = "name", length = 128, nullable = false)
var name: String = "",
@Column(name = "pk_hash", length = MAX_HASH_HEX_SIZE, nullable = false)
var publicKeyHash: String = ""
@Column(name = "pk_hash", length = MAX_HASH_HEX_SIZE, nullable = true)
var publicKeyHash: String? = ""
) : Serializable
override val caCertStore: CertStore

View File

@ -46,8 +46,8 @@ class DBTransactionMappingStorage(private val database: CordaPersistence) : Stat
@Column(name = "tx_id", length = 64, nullable = false)
var txId: String = "",
@Column(name = "state_machine_run_id", length = 36, nullable = false)
var stateMachineRunId: String = ""
@Column(name = "state_machine_run_id", length = 36, nullable = true)
var stateMachineRunId: String? = ""
) : Serializable
private companion object {

View File

@ -23,12 +23,20 @@ import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.castIfPossible
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.*
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
@ -40,7 +48,11 @@ import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -52,7 +64,11 @@ import rx.Observable
import rx.subjects.PublishSubject
import java.security.SecureRandom
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
@ -230,6 +246,7 @@ class SingleThreadedStateMachineManager(
database.transaction {
checkpointStorage.removeCheckpoint(id)
}
transitionExecutor.forceRemoveFlow(id)
}
} else {
// TODO replace with a clustered delete after we'll support clustered nodes

View File

@ -2,7 +2,6 @@ package net.corda.node.services.statemachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.messaging.DataFeed
import net.corda.core.utilities.contextLogger
@ -110,8 +109,8 @@ class StaffedFlowHospital {
/**
* The flow has been removed from the state machine.
*/
fun flowRemoved(flowFiber: FlowFiber) {
mutex.locked { patients.remove(flowFiber.id) }
fun flowRemoved(flowId: StateMachineRunId) {
mutex.locked { patients.remove(flowId) }
}
// TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC
@ -204,24 +203,14 @@ class StaffedFlowHospital {
object DoctorTimeout : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
if (newError is FlowTimeoutException) {
if (isTimedFlow(flowFiber)) {
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
return Diagnosis.DISCHARGE
} else {
log.warn("\"Maximum number of retries reached for timed flow ${flowFiber.javaClass}")
}
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
return Diagnosis.DISCHARGE
} else {
log.warn("\"Unable to restart flow: ${flowFiber.javaClass}, it is not timed and does not contain any timed sub-flows.")
log.warn("\"Maximum number of retries reached for timed flow ${flowFiber.javaClass}")
}
}
return Diagnosis.NOT_MY_SPECIALTY
}
private fun isTimedFlow(flowFiber: FlowFiber): Boolean {
return flowFiber.snapshot().checkpoint.subFlowStack.any {
TimedFlow::class.java.isAssignableFrom(it.flowClass)
}
}
}
/**

View File

@ -11,6 +11,7 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
@ -27,6 +28,13 @@ interface TransitionExecutor {
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState>
/**
* Called if the normal exit path where the new state is marked as removed via [StateMachineState.isRemoved] is not called.
* Currently this only happens via [StateMachineManager.killFlow]. This allows instances of this interface to clean up
* any state they are holding for a flow to prevent a memory leak.
*/
fun forceRemoveFlow(id: StateMachineRunId)
}
/**

View File

@ -11,6 +11,7 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
@ -30,6 +31,8 @@ class TransitionExecutorImpl(
val secureRandom: SecureRandom,
val database: CordaPersistence
) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {}
private companion object {
val log = contextLogger()
}

View File

@ -13,7 +13,12 @@ package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.time.Instant
@ -58,4 +63,9 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti
return Pair(continuation, nextState)
}
override fun forceRemoveFlow(id: StateMachineRunId) {
records.remove(id)
delegate.forceRemoveFlow(id)
}
}

View File

@ -11,11 +11,18 @@
package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.util.concurrent.LinkedBlockingQueue
@ -28,6 +35,10 @@ class FiberDeserializationCheckingInterceptor(
val fiberDeserializationChecker: FiberDeserializationChecker,
val delegate: TransitionExecutor
) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
delegate.forceRemoveFlow(id)
}
@Suspendable
override fun executeTransition(
fiber: FlowFiber,

View File

@ -12,7 +12,13 @@ package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.util.concurrent.ConcurrentHashMap
@ -25,6 +31,16 @@ class HospitalisingInterceptor(
private val flowHospital: StaffedFlowHospital,
private val delegate: TransitionExecutor
) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
removeFlow(id)
delegate.forceRemoveFlow(id)
}
private fun removeFlow(id: StateMachineRunId) {
hospitalisedFlows.remove(id)
flowHospital.flowRemoved(id)
}
private val hospitalisedFlows = ConcurrentHashMap<StateMachineRunId, FlowFiber>()
@Suspendable
@ -51,8 +67,7 @@ class HospitalisingInterceptor(
}
}
if (nextState.isRemoved) {
hospitalisedFlows.remove(fiber.id)
flowHospital.flowRemoved(fiber)
removeFlow(fiber.id)
}
return Pair(continuation, nextState)
}

View File

@ -2,11 +2,21 @@ package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.MetricRegistry
import net.corda.node.services.statemachine.*
import net.corda.core.flows.StateMachineRunId
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
class MetricInterceptor(val metrics: MetricRegistry, val delegate: TransitionExecutor) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
delegate.forceRemoveFlow(id)
}
@Suspendable
override fun executeTransition(fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor): Pair<FlowContinuation, StateMachineState> {
val metricActionInterceptor = MetricActionInterceptor(metrics, actionExecutor)

View File

@ -11,8 +11,13 @@
package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.time.Instant
@ -21,6 +26,10 @@ import java.time.Instant
* This interceptor simply prints all state machine transitions. Useful for debugging.
*/
class PrintingInterceptor(val delegate: TransitionExecutor) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
delegate.forceRemoveFlow(id)
}
companion object {
val log = contextLogger()
}

View File

@ -47,8 +47,8 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
@EmbeddedId
val id: PersistentStateRef,
@Column(name = "consuming_transaction_id", nullable = false)
val consumingTxHash: String
@Column(name = "consuming_transaction_id", nullable = true)
val consumingTxHash: String?
) : Serializable
@Entity
@ -60,11 +60,11 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
@Column(nullable = true)
val id: Int? = null,
@Column(name = "consuming_transaction_id", nullable = false)
val consumingTxHash: String,
@Column(name = "consuming_transaction_id", nullable = true)
val consumingTxHash: String?,
@Column(name = "requesting_party_name", nullable = false)
var partyName: String,
@Column(name = "requesting_party_name", nullable = true)
var partyName: String?,
@Lob
@Column(name = "request_signature", nullable = false)

View File

@ -100,8 +100,8 @@ class RaftUniquenessProvider(
class CommittedState(
@EmbeddedId
val id: PersistentStateRef,
@Column(name = "consuming_transaction_id", nullable = false)
var value: String = "",
@Column(name = "consuming_transaction_id", nullable = true)
var value: String? = "",
@Column(name = "raft_log_index", nullable = false)
var index: Long = 0
) : Serializable

View File

@ -32,15 +32,15 @@ class ContractUpgradeServiceImpl : ContractUpgradeService, SingletonSerializeAsT
var stateRef: String = "",
/** refers to the UpgradedContract class name*/
@Column(name = "contract_class_name", nullable = false)
var upgradedContractClassName: String = ""
@Column(name = "contract_class_name", nullable = true)
var upgradedContractClassName: String? = ""
) : Serializable
private companion object {
fun createContractUpgradesMap(): PersistentMap<String, String, DBContractUpgrade, String> {
return PersistentMap(
toPersistentEntityKey = { it },
fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName) },
fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName ?: "") },
toPersistentEntity = { key: String, value: String ->
DBContractUpgrade().apply {
stateRef = key

View File

@ -252,7 +252,7 @@ class NodeVaultService(
val txIdPredicate = criteriaBuilder.equal(vaultStates.get<Vault.StateStatus>(VaultSchemaV1.VaultTxnNote::txId.name), txnId.toString())
criteriaQuery.where(txIdPredicate)
val results = session.createQuery(criteriaQuery).resultList
results.asIterable().map { it.note }
results.asIterable().map { it.note ?: "" }
}
}

View File

@ -87,7 +87,7 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
joinColumns = [(JoinColumn(name = "output_index", referencedColumnName = "output_index")), (JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"))],
foreignKey = ForeignKey(name = "FK__lin_stat_parts__lin_stat"))
@Column(name = "participants")
var participants: MutableSet<AbstractParty>? = null,
var participants: MutableSet<AbstractParty?>? = null,
// Reason for not using Set is described here:
// https://stackoverflow.com/questions/44213074/kotlin-collection-has-neither-generic-type-or-onetomany-targetentity
@ -124,7 +124,7 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
/** X500Name of owner party **/
@Column(name = "owner_name", nullable = true)
var owner: AbstractParty,
var owner: AbstractParty?,
/** [FungibleAsset] attributes
*
@ -140,7 +140,7 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
/** X500Name of issuer party **/
@Column(name = "issuer_name", nullable = true)
var issuer: AbstractParty,
var issuer: AbstractParty?,
@Column(name = "issuer_ref", length = MAX_ISSUER_REF_SIZE, nullable = false)
@Type(type = "corda-wrapper-binary")
@ -162,11 +162,11 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
@Column(name = "seq_no", nullable = false)
var seqNo: Int,
@Column(name = "transaction_id", length = 64, nullable = false)
var txId: String,
@Column(name = "transaction_id", length = 64, nullable = true)
var txId: String?,
@Column(name = "note", nullable = false)
var note: String
@Column(name = "note", nullable = true)
var note: String?
) : Serializable {
constructor(txId: String, note: String) : this(0, txId, note)
}

View File

@ -2,17 +2,17 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.packageName
import net.corda.core.messaging.MessageRecipients
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.StartedNode
import net.corda.node.services.FinalityHandler
import net.corda.node.services.messaging.Message
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.nodeapi.internal.persistence.contextTransaction
@ -102,6 +102,51 @@ class RetryFlowMockTest {
assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
assertEquals(2, RetryFlow.count)
}
@Test
fun `Patient records do not leak in hospital when using killFlow`() {
val flow: FlowStateMachine<Unit> = nodeA.services.startFlow(FinalityHandler(object : FlowSession() {
override val counterparty: Party
get() = TODO("not implemented")
override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {
TODO("not implemented")
}
override fun getCounterpartyFlowInfo(): FlowInfo {
TODO("not implemented")
}
override fun <R : Any> sendAndReceive(receiveType: Class<R>, payload: Any, maySkipCheckpoint: Boolean): UntrustworthyData<R> {
TODO("not implemented")
}
override fun <R : Any> sendAndReceive(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
TODO("not implemented")
}
override fun <R : Any> receive(receiveType: Class<R>, maySkipCheckpoint: Boolean): UntrustworthyData<R> {
TODO("not implemented")
}
override fun <R : Any> receive(receiveType: Class<R>): UntrustworthyData<R> {
TODO("not implemented")
}
override fun send(payload: Any, maySkipCheckpoint: Boolean) {
TODO("not implemented")
}
override fun send(payload: Any) {
TODO("not implemented")
}
}), nodeA.services.newContext()).get()
// Make sure we have seen an update from the hospital, and thus the flow went there.
nodeA.smm.flowHospital.track().updates.toBlocking().first()
// Killing it should remove it.
nodeA.smm.killFlow(flow.id)
assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
}
}
class LimitedRetryCausingError : ConstraintViolationException("Test message", SQLException(), "Test constraint")
@ -126,6 +171,26 @@ class RetryFlow(private val i: Int) : FlowLogic<Unit>() {
}
}
class RetryAndSleepFlow(private val i: Int) : FlowLogic<Unit>() {
companion object {
var count = 0
}
@Suspendable
override fun call() {
logger.info("Hello $count")
if (count++ < i) {
if (i == Int.MAX_VALUE) {
throw LimitedRetryCausingError()
} else {
throw RetryCausingError()
}
} else {
sleep(Duration.ofDays(1))
}
}
}
@InitiatingFlow
class SendAndRetryFlow(private val i: Int, private val other: Party) : FlowLogic<Unit>() {
companion object {

View File

@ -17,7 +17,7 @@ class PersistentMapTests {
private fun createTestMap(): PersistentMap<String, String, ContractUpgradeServiceImpl.DBContractUpgrade, String> {
return PersistentMap(
toPersistentEntityKey = { it },
fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName) },
fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName ?: "") },
toPersistentEntity = { key: String, value: String ->
ContractUpgradeServiceImpl.DBContractUpgrade().apply {
stateRef = key

View File

@ -35,7 +35,6 @@ include 'experimental:quasar-hook'
include 'experimental:kryo-hook'
include 'experimental:intellij-plugin'
include 'experimental:flow-hook'
include 'experimental:blobinspector'
include 'experimental:ha-testing'
include 'experimental:corda-utils'
include 'test-common'

View File

@ -37,6 +37,7 @@ import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.hours
import net.corda.core.utilities.seconds
import net.corda.node.VersionInfo
import net.corda.node.internal.AbstractNode
@ -491,7 +492,8 @@ private fun mockNodeConfiguration(): NodeConfiguration {
doReturn(null).whenever(it).compatibilityZoneURL
doReturn(null).whenever(it).networkServices
doReturn(VerifierType.InMemory).whenever(it).verifierType
doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).p2pMessagingRetry
// Set to be long enough so retries don't trigger unless we override it
doReturn(P2PMessagingRetryConfiguration(1.hours, 3, backoffBase = 2.0)).whenever(it).p2pMessagingRetry
doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec
doReturn(null).whenever(it).devModeOptions
doReturn(EnterpriseConfiguration(

View File

@ -58,8 +58,8 @@ object DummyLinearStateSchemaV1 : MappedSchema(schemaFamily = DummyLinearStateSc
/**
* Dummy attributes
*/
@Column(name = "linear_string", nullable = false)
var linearString: String,
@Column(name = "linear_string", nullable = true)
var linearString: String?,
@Column(name = "linear_number", nullable = false)
var linearNumber: Long,

View File

@ -34,7 +34,7 @@ object DummyLinearStateSchemaV2 : MappedSchema(schemaFamily = DummyLinearStateSc
@CollectionTable(name = "dummy_linear_states_v2_parts", joinColumns = [(JoinColumn(name = "output_index", referencedColumnName = "output_index")), (JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"))])
override var participants: MutableSet<AbstractParty>? = null,
@Column(name = "linear_string", nullable = false) var linearString: String,
@Column(name = "linear_string", nullable = true) var linearString: String?,
@Column(name = "linear_number", nullable = false) var linearNumber: Long,

View File

@ -1,6 +1,7 @@
apply plugin: 'java'
apply plugin: 'kotlin'
apply plugin: 'net.corda.plugins.publish-utils'
apply plugin: 'com.jfrog.artifactory'
dependencies {
compile project(':client:jackson')
@ -28,11 +29,6 @@ jar {
}
}
jar {
classifier "ignore"
}
publish {
name 'tools-blob-inspector'
disableDefaultJar = true
}

View File

@ -10,6 +10,7 @@
apply plugin: 'us.kirchmeier.capsule'
apply plugin: 'net.corda.plugins.publish-utils'
apply plugin: 'com.jfrog.artifactory'
description 'Network bootstrapper'
@ -46,10 +47,6 @@ artifacts {
}
}
jar {
classifier "ignore"
}
publish {
name 'tools-network-bootstrapper'
disableDefaultJar = true