diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml index ae7a747939..e6da17c0e7 100644 --- a/.idea/codeStyles/Project.xml +++ b/.idea/codeStyles/Project.xml @@ -1,5 +1,8 @@ + \ No newline at end of file diff --git a/docs/source/deploying-a-node.rst b/docs/source/deploying-a-node.rst index 453e1638b0..7b18a10351 100644 --- a/docs/source/deploying-a-node.rst +++ b/docs/source/deploying-a-node.rst @@ -35,9 +35,11 @@ handling, and ensures the Corda service is run at boot. 6. Save the below as ``/opt/corda/node.conf``. See :doc:`corda-configuration-file` for a description of these options:: - basedir : "/opt/corda" p2pAddress : "example.com:10002" - rpcAddress : "example.com:10003" + rpcSettings { + address: "example.com:10003" + adminAddress: "example.com:10004" + } h2port : 11000 emailAddress : "you@example.com" myLegalName : "O=Bank of Breakfast Tea, L=London, C=GB" @@ -56,18 +58,19 @@ handling, and ensures the Corda service is run at boot. 7. Make the following changes to ``/opt/corda/node.conf``: - * Change the ``p2pAddress`` and ``rpcAddress`` values to start with your server's hostname or external IP address. - This is the address other nodes or RPC interfaces will use to communicate with your node - * Change the ports if necessary, for example if you are running multiple nodes on one server (see below) + * Change the ``p2pAddress``, ``rpcSettings.address`` and ``rpcSettings.adminAddress`` values to match + your server's hostname or external IP address. These are the addresses other nodes or RPC interfaces will use to + communicate with your node. + * Change the ports if necessary, for example if you are running multiple nodes on one server (see below). * Enter an email address which will be used as an administrative contact during the registration process. This is - only visible to the permissioning service + only visible to the permissioning service. * Enter your node's desired legal name. 
This will be used during the issuance of your certificate and should rarely - change as it should represent the legal identity of your node + change as it should represent the legal identity of your node. * Organization (``O=``) should be a unique and meaningful identifier (e.g. Bank of Breakfast Tea) * Location (``L=``) is your nearest city * Country (``C=``) is the `ISO 3166-1 alpha-2 code `_ - * Change the RPC username and password + * Change the RPC username and password. .. note:: Ubuntu 16.04 and most current Linux distributions use SystemD, so if you are running one of these distributions follow the steps marked **SystemD**. @@ -202,15 +205,16 @@ at boot, and means the Corda service stays running with no users connected to th 3. Save the below as ``C:\Corda\node.conf``. See :doc:`corda-configuration-file` for a description of these options:: - basedir : "C:\\Corda" p2pAddress : "example.com:10002" - rpcAddress : "example.com:10003" + rpcSettings { + address: "example.com:10003" + adminAddress: "example.com:10004" + } h2port : 11000 emailAddress: "you@example.com" myLegalName : "O=Bank of Breakfast Tea, L=London, C=GB" keyStorePassword : "cordacadevpass" trustStorePassword : "trustpass" - extraAdvertisedServiceIds: [ "" ] devMode : false rpcUsers=[ { @@ -224,18 +228,19 @@ at boot, and means the Corda service stays running with no users connected to th 4. Make the following changes to ``C:\Corda\node.conf``: - * Change the ``p2pAddress`` and ``rpcAddress`` values to start with your server's hostname or external IP address. - This is the address other nodes or RPC interfaces will use to communicate with your node - * Change the ports if necessary, for example if you are running multiple nodes on one server (see below) + * Change the ``p2pAddress``, ``rpcSettings.address`` and ``rpcSettings.adminAddress`` values to match + your server's hostname or external IP address. 
These are the addresses other nodes or RPC interfaces will use to + communicate with your node. + * Change the ports if necessary, for example if you are running multiple nodes on one server (see below). * Enter an email address which will be used as an administrative contact during the registration process. This is - only visible to the permissioning service + only visible to the permissioning service. * Enter your node's desired legal name. This will be used during the issuance of your certificate and should rarely - change as it should represent the legal identity of your node + change as it should represent the legal identity of your node. * Organization (``O=``) should be a unique and meaningful identifier (e.g. Bank of Breakfast Tea) * Location (``L=``) is your nearest city * Country (``C=``) is the `ISO 3166-1 alpha-2 code `_ - * Change the RPC username and password + * Change the RPC username and password. 5. Copy the required Java keystores to the node. See :doc:`permissioning` diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionRejectException.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionRejectException.kt index cea4680411..55282c558c 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionRejectException.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionRejectException.kt @@ -1,8 +1,16 @@ package net.corda.node.services.statemachine import net.corda.core.CordaException +import net.corda.core.flows.FlowLogic /** * An exception propagated and thrown in case a session initiation fails. 
*/ -class SessionRejectException(message: String) : CordaException(message) +open class SessionRejectException(message: String) : CordaException(message) { + class UnknownClass(val initiatorFlowClassName: String) : SessionRejectException("Don't know $initiatorFlowClassName") + + class NotAFlow(val initiatorClass: Class<*>) : SessionRejectException("${initiatorClass.name} is not a flow") + + class NotRegistered(val initiatorFlowClass: Class>) : SessionRejectException("${initiatorFlowClass.name} is not registered") +} + diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 60fb528bcb..112b65ace0 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -18,7 +18,8 @@ import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.DataFeed -import net.corda.core.serialization.* +import net.corda.core.serialization.SerializedBytes +import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.CheckpointSerializationContext import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.core.serialization.internal.checkpointDeserialize @@ -32,7 +33,6 @@ import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler -import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion import net.corda.node.services.statemachine.interceptors.* 
import net.corda.node.services.statemachine.transitions.StateMachine @@ -92,8 +92,6 @@ class SingleThreadedStateMachineManager( val timedFlows = HashMap() } - override val flowHospital: StaffedFlowHospital = StaffedFlowHospital() - private val mutex = ThreadBox(InnerState()) private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor) private val timeoutScheduler = Executors.newScheduledThreadPool(1) @@ -104,12 +102,14 @@ class SingleThreadedStateMachineManager( private val sessionToFlow = ConcurrentHashMap() private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub) private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null - private val transitionExecutor = makeTransitionExecutor() private val ourSenderUUID = serviceHub.networkService.ourSenderUUID private var checkpointSerializationContext: CheckpointSerializationContext? = null private var actionExecutor: ActionExecutor? = null + override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID) + private val transitionExecutor = makeTransitionExecutor() + override val allStateMachines: List> get() = mutex.locked { flows.values.map { it.fiber.logic } } @@ -204,7 +204,7 @@ class SingleThreadedStateMachineManager( invocationContext = context, flowLogic = flowLogic, flowStart = FlowStart.Explicit, - ourIdentity = ourIdentity ?: getOurFirstIdentity(), + ourIdentity = ourIdentity ?: ourFirstIdentity, deduplicationHandler = deduplicationHandler, isStartIdempotent = false ) @@ -402,23 +402,23 @@ class SingleThreadedStateMachineManager( } private fun onSessionMessage(event: ExternalEvent.ExternalMessageEvent) { - val message: ReceivedMessage = event.receivedMessage - val deduplicationHandler: DeduplicationHandler = event.deduplicationHandler - val peer = message.peer + val peer = event.receivedMessage.peer val sessionMessage = try { - message.data.deserialize() + 
event.receivedMessage.data.deserialize() } catch (ex: Exception) { logger.error("Received corrupt SessionMessage data from $peer") - deduplicationHandler.afterDatabaseTransaction() + event.deduplicationHandler.afterDatabaseTransaction() return } val sender = serviceHub.networkMapCache.getPeerByLegalName(peer) if (sender != null) { when (sessionMessage) { - is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, deduplicationHandler, sender) - is InitialSessionMessage -> onSessionInit(sessionMessage, message.platformVersion, deduplicationHandler, sender) + is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, event.deduplicationHandler, sender) + is InitialSessionMessage -> onSessionInit(sessionMessage, sender, event) } } else { + // TODO Send the event to the flow hospital to be retried on network map update + // TODO Test that restarting the node attempts to retry logger.error("Unknown peer $peer in $sessionMessage") } } @@ -448,14 +448,8 @@ class SingleThreadedStateMachineManager( } } - private fun onSessionInit(sessionMessage: InitialSessionMessage, senderPlatformVersion: Int, deduplicationHandler: DeduplicationHandler, sender: Party) { - fun createErrorMessage(initiatorSessionId: SessionId, message: String): ExistingSessionMessage { - val errorId = secureRandom.nextLong() - val payload = RejectSessionMessage(message, errorId) - return ExistingSessionMessage(initiatorSessionId, payload) - } - - val replyError = try { + private fun onSessionInit(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) { + try { val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage) val initiatedSessionId = SessionId.createRandom(secureRandom) val senderSession = FlowSessionImpl(sender, initiatedSessionId) @@ -465,40 +459,34 @@ class SingleThreadedStateMachineManager( is InitiatedFlowFactory.CorDapp -> FlowInfo(initiatedFlowFactory.flowVersion, initiatedFlowFactory.appName) } val 
senderCoreFlowVersion = when (initiatedFlowFactory) { - is InitiatedFlowFactory.Core -> senderPlatformVersion + is InitiatedFlowFactory.Core -> event.receivedMessage.platformVersion is InitiatedFlowFactory.CorDapp -> null } - startInitiatedFlow(flowLogic, deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo) - null - } catch (exception: Exception) { - logger.warn("Exception while creating initiated flow", exception) - createErrorMessage( - sessionMessage.initiatorSessionId, - (exception as? SessionRejectException)?.message ?: "Unable to establish session" - ) - } - - if (replyError != null) { - flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID)) - deduplicationHandler.afterDatabaseTransaction() + startInitiatedFlow(flowLogic, event.deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo) + } catch (t: Throwable) { + logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " + + "flowVersion=${sessionMessage.flowVersion}), sending to the flow hospital", t) + flowHospital.sessionInitErrored(sessionMessage, sender, event, t) } } // TODO this is a temporary hack until we figure out multiple identities - private fun getOurFirstIdentity(): Party { - return serviceHub.myInfo.legalIdentities[0] - } + private val ourFirstIdentity: Party get() = serviceHub.myInfo.legalIdentities[0] private fun getInitiatedFlowFactory(message: InitialSessionMessage): InitiatedFlowFactory<*> { - val initiatingFlowClass = try { - Class.forName(message.initiatorFlowClassName, true, classloader).asSubclass(FlowLogic::class.java) + val initiatorClass = try { + Class.forName(message.initiatorFlowClassName, true, classloader) } catch (e: ClassNotFoundException) { - throw SessionRejectException("Don't know ${message.initiatorFlowClassName}") - } catch (e: ClassCastException) 
{ - throw SessionRejectException("${message.initiatorFlowClassName} is not a flow") + throw SessionRejectException.UnknownClass(message.initiatorFlowClassName) } - return serviceHub.getFlowFactory(initiatingFlowClass) - ?: throw SessionRejectException("$initiatingFlowClass is not registered") + + val initiatorFlowClass = try { + initiatorClass.asSubclass(FlowLogic::class.java) + } catch (e: ClassCastException) { + throw SessionRejectException.NotAFlow(initiatorClass) + } + + return serviceHub.getFlowFactory(initiatorFlowClass) ?: throw SessionRejectException.NotRegistered(initiatorFlowClass) } private fun startInitiatedFlow( @@ -511,7 +499,7 @@ class SingleThreadedStateMachineManager( initiatedFlowInfo: FlowInfo ) { val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo) - val ourIdentity = getOurFirstIdentity() + val ourIdentity = ourFirstIdentity startFlowInternal( InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity, initiatingMessageDeduplicationHandler, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index 512497b6b9..f29749566c 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -1,6 +1,8 @@ package net.corda.node.services.statemachine +import net.corda.core.crypto.newSecureRandom import net.corda.core.flows.StateMachineRunId +import net.corda.core.identity.Party import net.corda.core.internal.ThreadBox import net.corda.core.internal.TimedFlow import net.corda.core.internal.bufferUntilSubscribed @@ -16,65 +18,101 @@ import java.util.* /** * This hospital consults "staff" to see if they can automatically diagnose and treat flows. 
*/ -class StaffedFlowHospital { +class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) { private companion object { private val log = contextLogger() private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, FinalityDoctor) } private val mutex = ThreadBox(object { - val patients = HashMap() + val flowPatients = HashMap() + val treatableSessionInits = HashMap() val recordsPublisher = PublishSubject.create() }) + private val secureRandom = newSecureRandom() - class MedicalHistory { - internal val records: MutableList = mutableListOf() - - fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff): Boolean { - val lastAdmittanceSuspendCount = (records.last() as MedicalRecord.Admitted).suspendCount - return records - .filterIsInstance() - .count { by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max + /** + * The node was unable to initiate the [InitialSessionMessage] from [sender]. + */ + fun sessionInitErrored(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent, error: Throwable) { + val time = Instant.now() + val id = UUID.randomUUID() + val outcome = if (error is SessionRejectException.UnknownClass) { + // We probably don't have the CorDapp installed so let's pause the message in the hopes that the CorDapp is + // installed on restart, at which point the message will be able to proceed as normal. If not then it will need + // to be dropped manually. 
+ Outcome.OVERNIGHT_OBSERVATION + } else { + Outcome.UNTREATABLE } - override fun toString(): String = "${this.javaClass.simpleName}(records = $records)" + val record = sessionMessage.run { MedicalRecord.SessionInit(id, time, outcome, initiatorFlowClassName, flowVersion, appName, sender, error) } + mutex.locked { + if (outcome != Outcome.UNTREATABLE) { + treatableSessionInits[id] = InternalSessionInitRecord(sessionMessage, event, record) + } + recordsPublisher.onNext(record) + } + + if (outcome == Outcome.UNTREATABLE) { + sendBackError(error, sessionMessage, sender, event) + } + } + + private fun sendBackError(error: Throwable, sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) { + val message = (error as? SessionRejectException)?.message ?: "Unable to establish session" + val payload = RejectSessionMessage(message, secureRandom.nextLong()) + val replyError = ExistingSessionMessage(sessionMessage.initiatorSessionId, payload) + + flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID)) + event.deduplicationHandler.afterDatabaseTransaction() + } + + /** + * Drop the errored session-init message with the given ID ([MedicalRecord.SessionInit.id]). This will cause the node + * to send back the relevant session error to the initiator party and acknowledge its receipt from the message broker + * so that it never gets redelivered. + */ + fun dropSessionInit(id: UUID) { + val (sessionMessage, event, publicRecord) = mutex.locked { + requireNotNull(treatableSessionInits.remove(id)) { "$id does not refer to any session init message" } + } + log.info("Errored session-init permanently dropped: $publicRecord") + sendBackError(publicRecord.error, sessionMessage, publicRecord.sender, event) } /** * The flow running in [flowFiber] has errored. 
*/ fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { + val time = Instant.now() log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState") - val suspendCount = currentState.checkpoint.numberOfSuspends val event = mutex.locked { - val medicalHistory = patients.computeIfAbsent(flowFiber.id) { MedicalHistory() } - - val admitted = MedicalRecord.Admitted(flowFiber.id, Instant.now(), suspendCount) - medicalHistory.records += admitted - recordsPublisher.onNext(admitted) + val medicalHistory = flowPatients.computeIfAbsent(flowFiber.id) { FlowMedicalHistory() } val report = consultStaff(flowFiber, currentState, errors, medicalHistory) - val (newRecord, event) = when (report.diagnosis) { + val (outcome, event) = when (report.diagnosis) { Diagnosis.DISCHARGE -> { log.info("Flow ${flowFiber.id} error discharged from hospital by ${report.by}") - Pair(MedicalRecord.Discharged(flowFiber.id, Instant.now(), suspendCount, report.by, errors), Event.RetryFlowFromSafePoint) + Pair(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint) } Diagnosis.OVERNIGHT_OBSERVATION -> { log.info("Flow ${flowFiber.id} error kept for overnight observation by ${report.by}") // We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart - Pair(MedicalRecord.KeptInForObservation(flowFiber.id, Instant.now(), suspendCount, report.by, errors), null) + Pair(Outcome.OVERNIGHT_OBSERVATION, null) } Diagnosis.NOT_MY_SPECIALTY -> { // None of the staff care for these errors so we let them propagate log.info("Flow ${flowFiber.id} error allowed to propagate") - Pair(MedicalRecord.NothingWeCanDo(flowFiber.id, Instant.now(), suspendCount), Event.StartErrorPropagation) + Pair(Outcome.UNTREATABLE, Event.StartErrorPropagation) } } - medicalHistory.records += newRecord - recordsPublisher.onNext(newRecord) + val record = MedicalRecord.Flow(time, flowFiber.id, currentState.checkpoint.numberOfSuspends, errors, report.by, 
outcome) + medicalHistory.records += record + recordsPublisher.onNext(record) event } @@ -86,8 +124,9 @@ class StaffedFlowHospital { private fun consultStaff(flowFiber: FlowFiber, currentState: StateMachineState, errors: List, - medicalHistory: MedicalHistory): ConsultationReport { + medicalHistory: FlowMedicalHistory): ConsultationReport { return errors + .asSequence() .mapIndexed { index, error -> log.info("Flow ${flowFiber.id} has error [$index]", error) val diagnoses: Map> = staff.groupBy { it.consult(flowFiber, currentState, error, medicalHistory) } @@ -105,43 +144,61 @@ class StaffedFlowHospital { * The flow has been removed from the state machine. */ fun flowRemoved(flowId: StateMachineRunId) { - mutex.locked { patients.remove(flowId) } + mutex.locked { flowPatients.remove(flowId) } } // TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC /** Returns a stream of medical records as flows pass through the hospital. 
*/ fun track(): DataFeed, MedicalRecord> { return mutex.locked { - DataFeed(patients.values.flatMap { it.records }, recordsPublisher.bufferUntilSubscribed()) + val snapshot = (flowPatients.values.flatMap { it.records } + treatableSessionInits.values.map { it.publicRecord }).sortedBy { it.time } + DataFeed(snapshot, recordsPublisher.bufferUntilSubscribed()) } } - sealed class MedicalRecord { - abstract val flowId: StateMachineRunId - abstract val at: Instant - abstract val suspendCount: Int + class FlowMedicalHistory { + internal val records: MutableList = mutableListOf() - data class Admitted(override val flowId: StateMachineRunId, - override val at: Instant, - override val suspendCount: Int) : MedicalRecord() + fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff, currentState: StateMachineState): Boolean { + val lastAdmittanceSuspendCount = currentState.checkpoint.numberOfSuspends + return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max + } - data class Discharged(override val flowId: StateMachineRunId, - override val at: Instant, - override val suspendCount: Int, - val by: List, - val errors: List) : MedicalRecord() - - data class KeptInForObservation(override val flowId: StateMachineRunId, - override val at: Instant, - override val suspendCount: Int, - val by: List, - val errors: List) : MedicalRecord() - - data class NothingWeCanDo(override val flowId: StateMachineRunId, - override val at: Instant, - override val suspendCount: Int) : MedicalRecord() + override fun toString(): String = "${this.javaClass.simpleName}(records = $records)" } + private data class InternalSessionInitRecord(val sessionMessage: InitialSessionMessage, + val event: ExternalEvent.ExternalMessageEvent, + val publicRecord: MedicalRecord.SessionInit) + + sealed class MedicalRecord { + abstract val time: Instant + abstract val outcome: Outcome + abstract val errors: List + + /** Medical record for a flow that has 
errored. */ + data class Flow(override val time: Instant, + val flowId: StateMachineRunId, + val suspendCount: Int, + override val errors: List, + val by: List, + override val outcome: Outcome) : MedicalRecord() + + /** Medical record for a session initiation that was unsuccessful. */ + data class SessionInit(val id: UUID, + override val time: Instant, + override val outcome: Outcome, + val initiatorFlowClassName: String, + val flowVersion: Int, + val appName: String, + val sender: Party, + val error: Throwable) : MedicalRecord() { + override val errors: List get() = listOf(error) + } + } + + enum class Outcome { DISCHARGE, OVERNIGHT_OBSERVATION, UNTREATABLE } + /** The order of the enum values are in priority order. */ enum class Diagnosis { /** Retry from last safe point. */ @@ -153,14 +210,14 @@ class StaffedFlowHospital { } interface Staff { - fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis + fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis } /** * SQL Deadlock detection. */ object DeadlockNurse : Staff { - override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis { + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { return if (mentionsDeadlock(newError)) { Diagnosis.DISCHARGE } else { @@ -178,8 +235,8 @@ class StaffedFlowHospital { * Primary key violation detection for duplicate inserts. Will detect other constraint violations too. 
*/ object DuplicateInsertSpecialist : Staff { - override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis { - return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this)) { + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { + return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this, currentState)) { Diagnosis.DISCHARGE } else { Diagnosis.NOT_MY_SPECIALTY @@ -196,9 +253,9 @@ class StaffedFlowHospital { * exceed the limit specified by the [FlowTimeoutException]. */ object DoctorTimeout : Staff { - override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis { + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { if (newError is FlowTimeoutException) { - if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) { + if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this, currentState)) { return Diagnosis.DISCHARGE } else { val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}. " + @@ -216,12 +273,18 @@ class StaffedFlowHospital { * Parks [FinalityHandler]s for observation. */ object FinalityDoctor : Staff { - override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis { - return (currentState.flowLogic as? 
FinalityHandler)?.let { logic -> Diagnosis.OVERNIGHT_OBSERVATION.also { warn(logic, flowFiber, currentState) } } ?: Diagnosis.NOT_MY_SPECIALTY + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { + return if (currentState.flowLogic is FinalityHandler) { + warn(currentState.flowLogic, flowFiber, currentState) + Diagnosis.OVERNIGHT_OBSERVATION + } else { + Diagnosis.NOT_MY_SPECIALTY + } } private fun warn(flowLogic: FinalityHandler, flowFiber: FlowFiber, currentState: StateMachineState) { - log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying the flow by re-starting the node. State machine state: $currentState, initiating party was: ${flowLogic.sender().name}") + log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " + + "the flow by re-starting the node. State machine state: $currentState, initiating party was: ${flowLogic.sender().name}") } } } diff --git a/node/src/test/kotlin/net/corda/node/services/FinalityHandlerTest.kt b/node/src/test/kotlin/net/corda/node/services/FinalityHandlerTest.kt index 9a09548b82..c68a0f2ad6 100644 --- a/node/src/test/kotlin/net/corda/node/services/FinalityHandlerTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/FinalityHandlerTest.kt @@ -10,8 +10,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.finance.POUNDS import net.corda.finance.contracts.asset.Cash import net.corda.finance.issuedBy -import net.corda.node.services.statemachine.StaffedFlowHospital -import net.corda.node.services.statemachine.StaffedFlowHospital.MedicalRecord.KeptInForObservation +import net.corda.node.services.statemachine.StaffedFlowHospital.* import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.singleIdentity @@ -72,11 +71,11 @@ class FinalityHandlerTest { val keptInForObservation = 
smm.flowHospital .track() .let { it.updates.startWith(it.snapshot) } - .filter { it.flowId == runId } - .ofType(KeptInForObservation::class.java) + .ofType(MedicalRecord.Flow::class.java) + .filter { it.flowId == runId && it.outcome == Outcome.OVERNIGHT_OBSERVATION } .toBlocking() .first() - assertThat(keptInForObservation.by).contains(StaffedFlowHospital.FinalityDoctor) + assertThat(keptInForObservation.by).contains(FinalityDoctor) } private fun TestStartedNode.getTransaction(id: SecureHash): SignedTransaction? { diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index d1517dc318..53ec7d9f2a 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.concurrent.Semaphore +import net.corda.client.rpc.notUsed import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef @@ -68,7 +69,7 @@ class FlowFrameworkTests { @Before fun setUpMockNet() { mockNet = InternalMockNetwork( - cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"), + cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"), servicePeerAllocationStrategy = RoundRobin() ) @@ -100,8 +101,8 @@ class FlowFrameworkTests { assertThat(flow.lazyTime).isNotNull() } - class SuspendThrowingActionExecutor(private val exception: Exception, val delegate: ActionExecutor) : ActionExecutor { - var thrown = false + class SuspendThrowingActionExecutor(private val exception: Exception, private val delegate: ActionExecutor) : ActionExecutor { + private var thrown = false 
@Suspendable override fun executeAction(fiber: FlowFiber, action: Action) { if (action is Action.CommitTransaction && !thrown) { @@ -367,10 +368,17 @@ class FlowFrameworkTests { } @Test - fun `unknown class in session init`() { + fun `session init with unknown class is sent to the flow hospital, from where it's dropped`() { aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob) mockNet.runNetwork() - assertThat(receivedSessionMessages).hasSize(2) // Only the session-init and session-reject are expected + assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital + val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot + assertThat(medicalRecords).hasSize(1) + val sessionInitRecord = medicalRecords[0] as StaffedFlowHospital.MedicalRecord.SessionInit + assertThat(sessionInitRecord.initiatorFlowClassName).isEqualTo("not.a.real.Class") + bobNode.smm.flowHospital.dropSessionInit(sessionInitRecord.id) // Drop the message which is processed as an error back to sender + mockNet.runNetwork() + assertThat(receivedSessionMessages).hasSize(2) // Now the session-reject is expected val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage assertThat((lastMessage.payload as RejectSessionMessage).message).isEqualTo("Don't know not.a.real.Class") } @@ -603,10 +611,6 @@ class FlowFrameworkTripartyTests { private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0) - private fun assertSessionTransfers(vararg expected: SessionTransfer) { - assertThat(receivedSessionMessages).containsExactly(*expected) - } - private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List { val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress } 
assertThat(actualForNode).containsExactly(*expected) @@ -749,16 +753,6 @@ class FlowFrameworkPersistenceTests { return newNode.getSingleFlow

().first } - private fun assertSessionTransfers(vararg expected: SessionTransfer) { - assertThat(receivedSessionMessages).containsExactly(*expected) - } - - private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List { - val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress } - assertThat(actualForNode).containsExactly(*expected) - return actualForNode - } - private fun receivedSessionMessagesObservable(): Observable { return mockNet.messagingNetwork.receivedMessages.toSessionTransfers() } @@ -832,19 +826,6 @@ private val FlowLogic<*>.progressSteps: CordaFuture() { diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index 7822e6f136..4255224d16 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -175,8 +175,6 @@ class RetryFlowMockTest { TODO("not implemented") } }), nodeA.services.newContext()).get() - // Should be 2 records, one for admission and one for keep in. - records.next() records.next() // Killing it should remove it. nodeA.smm.killFlow(flow.id) diff --git a/release-tools/testing/README.md b/release-tools/testing/README.md index 0f859967f6..b49ce25426 100644 --- a/release-tools/testing/README.md +++ b/release-tools/testing/README.md @@ -58,7 +58,9 @@ This will create a new sub-task under each of the test tickets for `` ` ## Options -Each command described above has a set of additional options. More specifically, if you want to use a particular JIRA user instead of being prompted for a user name every time, you can specify `--user `. For verbose logging, you can supply `--verbose` or `-v`. And to auto-reply to the prompt of whether to proceed or not, provide `--yes` or `-y`. 
+Each command described above has a set of additional options. More specifically, if you want to use a particular JIRA user instead of being prompted for a user name every time, you can specify `--user `. You can also provide the user name in the environment variable, `JIRA_USER`. + +For verbose logging, you can supply `--verbose` or `-v`. And to auto-reply to the prompt of whether to proceed or not, provide `--yes` or `-y`. There is also a useful dry-run option, `--dry-run` or `-d`, that lets you run through the command without creating any tickets or applying any changes to JIRA. diff --git a/release-tools/testing/login_manager.py b/release-tools/testing/login_manager.py index 08a4e1412a..2fc38b1184 100644 --- a/release-tools/testing/login_manager.py +++ b/release-tools/testing/login_manager.py @@ -1,7 +1,7 @@ # {{{ Dependencies from __future__ import print_function -import sys +import sys, os try: from getpass import getpass @@ -41,9 +41,13 @@ def confirm(message, auto_yes=False): # {{{ login(account, user, password, use_keyring) - Present user with login prompt and return the provided username and password. 
If use_keyring is true, use previously provided password (if any) def login(account, user=None, password=None, use_keyring=True): if not user: - user = prompt('Username: ') - user = u'{}@r3.com'.format(user) if '@' not in user else user - if not user: return (None, None) + if 'JIRA_USER' not in os.environ: + user = prompt('Username: ') + user = u'{}@r3.com'.format(user) if '@' not in user else user + if not user: return (None, None) + else: + user = os.environ['JIRA_USER'] + print('Username: {}'.format(user)) else: user = u'{}@r3.com'.format(user) if '@' not in user else user print('Username: {}'.format(user)) diff --git a/release-tools/testing/test-manager b/release-tools/testing/test-manager index f0471bbb1b..d288817f63 100755 --- a/release-tools/testing/test-manager +++ b/release-tools/testing/test-manager @@ -16,10 +16,10 @@ try: def red(message): return colored(message, 'red') def yellow(message): return colored(message, 'yellow') def faint(message): return colored(message, 'white', attrs=['dark']) - def on_green(message): return colored(message, 'white', 'on_green') - def on_red(message): return colored(message, 'white', 'on_red') - def blue_on_white(message): return colored(message, 'blue', 'on_white') - def yellow_on_white(message): return colored(message, 'yellow', 'on_white') + def on_green(message): return colored(message, 'green') + def on_red(message): return colored(message, 'red') + def blue_on_white(message): return colored(message, 'blue') + def yellow_on_white(message): return colored(message, 'yellow') except: def blue(message): return u'[{}]'.format(message) def green(message): return message @@ -159,7 +159,10 @@ def create_version(args): jira.jira.create_version(name=version, project=project, description=version) print(u' {} - Created version for project {}'.format(green('SUCCESS'), blue(project))) except Exception as error: - print(u' {} - Failed to version: {}'.format(red('FAIL'), error)) + if args.verbose: + print(u' {} - Failed to version: 
{}'.format(red('FAIL'), error)) + else: + print(u' {} - Failed to version: {}'.format(red('FAIL'), error.text)) print()