Merge branch 'merge-99e9864' into os-merge-99e9864

# Conflicts:
#	.gitignore
#	.idea/codeStyles/Project.xml
This commit is contained in:
Shams Asari 2018-11-09 14:53:26 +00:00
commit 4801942a87
11 changed files with 251 additions and 170 deletions

View File

@ -1,5 +1,8 @@
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<option name="LINE_SEPARATOR" value="&#10;" />
<option name="RIGHT_MARGIN" value="140" />
<option name="SOFT_MARGINS" value="140" />
<JetCodeStyleSettings>
<option name="PACKAGES_TO_USE_STAR_IMPORTS">
<value>
@ -8,6 +11,33 @@
<package name="tornadofx" withSubpackages="false" static="false" />
</value>
</option>
<option name="CONTINUATION_INDENT_IN_PARAMETER_LISTS" value="true" />
<option name="CONTINUATION_INDENT_IN_ARGUMENT_LISTS" value="true" />
<option name="CONTINUATION_INDENT_FOR_EXPRESSION_BODIES" value="true" />
<option name="CONTINUATION_INDENT_FOR_CHAINED_CALLS" value="true" />
<option name="CONTINUATION_INDENT_IN_SUPERTYPE_LISTS" value="true" />
<option name="CONTINUATION_INDENT_IN_IF_CONDITIONS" value="true" />
<option name="CONTINUATION_INDENT_IN_ELVIS" value="true" />
<option name="WRAP_EXPRESSION_BODY_FUNCTIONS" value="0" />
<option name="IF_RPAREN_ON_NEW_LINE" value="false" />
<option name="CODE_STYLE_DEFAULTS" value="KOTLIN_OFFICIAL" />
</JetCodeStyleSettings>
<editorconfig>
<option name="ENABLED" value="false" />
</editorconfig>
<codeStyleSettings language="kotlin">
<option name="CODE_STYLE_DEFAULTS" value="KOTLIN_OFFICIAL" />
<option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1" />
<option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
<option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="0" />
<option name="CALL_PARAMETERS_WRAP" value="0" />
<option name="CALL_PARAMETERS_LPAREN_ON_NEXT_LINE" value="false" />
<option name="CALL_PARAMETERS_RPAREN_ON_NEXT_LINE" value="false" />
<option name="METHOD_PARAMETERS_WRAP" value="0" />
<option name="METHOD_PARAMETERS_LPAREN_ON_NEXT_LINE" value="false" />
<option name="METHOD_PARAMETERS_RPAREN_ON_NEXT_LINE" value="false" />
<option name="EXTENDS_LIST_WRAP" value="0" />
<option name="ASSIGNMENT_WRAP" value="0" />
</codeStyleSettings>
</code_scheme>
</component>

View File

@ -35,9 +35,11 @@ handling, and ensures the Corda service is run at boot.
6. Save the below as ``/opt/corda/node.conf``. See :doc:`corda-configuration-file` for a description of these options::
basedir : "/opt/corda"
p2pAddress : "example.com:10002"
rpcAddress : "example.com:10003"
rpcSettings {
address: "example.com:10003"
adminAddress: "example.com:10004"
}
h2port : 11000
emailAddress : "you@example.com"
myLegalName : "O=Bank of Breakfast Tea, L=London, C=GB"
@ -56,18 +58,19 @@ handling, and ensures the Corda service is run at boot.
7. Make the following changes to ``/opt/corda/node.conf``:
* Change the ``p2pAddress`` and ``rpcAddress`` values to start with your server's hostname or external IP address.
This is the address other nodes or RPC interfaces will use to communicate with your node
* Change the ports if necessary, for example if you are running multiple nodes on one server (see below)
* Change the ``p2pAddress``, ``rpcSettings.address`` and ``rpcSettings.adminAddress`` values to match
your server's hostname or external IP address. These are the addresses other nodes or RPC interfaces will use to
communicate with your node.
* Change the ports if necessary, for example if you are running multiple nodes on one server (see below).
* Enter an email address which will be used as an administrative contact during the registration process. This is
only visible to the permissioning service
only visible to the permissioning service.
* Enter your node's desired legal name. This will be used during the issuance of your certificate and should rarely
change as it should represent the legal identity of your node
change as it should represent the legal identity of your node.
* Organization (``O=``) should be a unique and meaningful identifier (e.g. Bank of Breakfast Tea)
* Location (``L=``) is your nearest city
* Country (``C=``) is the `ISO 3166-1 alpha-2 code <https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2>`_
* Change the RPC username and password
* Change the RPC username and password.
.. note:: Ubuntu 16.04 and most current Linux distributions use SystemD, so if you are running one of these
distributions follow the steps marked **SystemD**.
@ -202,15 +205,16 @@ at boot, and means the Corda service stays running with no users connected to th
3. Save the below as ``C:\Corda\node.conf``. See :doc:`corda-configuration-file` for a description of these options::
basedir : "C:\\Corda"
p2pAddress : "example.com:10002"
rpcAddress : "example.com:10003"
rpcSettings {
address: "example.com:10003"
adminAddress: "example.com:10004"
}
h2port : 11000
emailAddress: "you@example.com"
myLegalName : "O=Bank of Breakfast Tea, L=London, C=GB"
keyStorePassword : "cordacadevpass"
trustStorePassword : "trustpass"
extraAdvertisedServiceIds: [ "" ]
devMode : false
rpcUsers=[
{
@ -224,18 +228,19 @@ at boot, and means the Corda service stays running with no users connected to th
4. Make the following changes to ``C:\Corda\node.conf``:
* Change the ``p2pAddress`` and ``rpcAddress`` values to start with your server's hostname or external IP address.
This is the address other nodes or RPC interfaces will use to communicate with your node
* Change the ports if necessary, for example if you are running multiple nodes on one server (see below)
* Change the ``p2pAddress``, ``rpcSettings.address`` and ``rpcSettings.adminAddress`` values to match
your server's hostname or external IP address. These are the addresses other nodes or RPC interfaces will use to
communicate with your node.
* Change the ports if necessary, for example if you are running multiple nodes on one server (see below).
* Enter an email address which will be used as an administrative contact during the registration process. This is
only visible to the permissioning service
only visible to the permissioning service.
* Enter your node's desired legal name. This will be used during the issuance of your certificate and should rarely
change as it should represent the legal identity of your node
change as it should represent the legal identity of your node.
* Organization (``O=``) should be a unique and meaningful identifier (e.g. Bank of Breakfast Tea)
* Location (``L=``) is your nearest city
* Country (``C=``) is the `ISO 3166-1 alpha-2 code <https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2>`_
* Change the RPC username and password
* Change the RPC username and password.
5. Copy the required Java keystores to the node. See :doc:`permissioning`

View File

@ -1,8 +1,16 @@
package net.corda.node.services.statemachine
import net.corda.core.CordaException
import net.corda.core.flows.FlowLogic
/**
* An exception propagated and thrown in case a session initiation fails.
*/
class SessionRejectException(message: String) : CordaException(message)
open class SessionRejectException(message: String) : CordaException(message) {
class UnknownClass(val initiatorFlowClassName: String) : SessionRejectException("Don't know $initiatorFlowClassName")
class NotAFlow(val initiatorClass: Class<*>) : SessionRejectException("${initiatorClass.name} is not a flow")
class NotRegistered(val initiatorFlowClass: Class<out FlowLogic<*>>) : SessionRejectException("${initiatorFlowClass.name} is not registered")
}

View File

@ -18,7 +18,8 @@ import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.*
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointDeserialize
@ -32,7 +33,6 @@ import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.transitions.StateMachine
@ -92,8 +92,6 @@ class SingleThreadedStateMachineManager(
val timedFlows = HashMap<StateMachineRunId, ScheduledTimeout>()
}
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital()
private val mutex = ThreadBox(InnerState())
private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor)
private val timeoutScheduler = Executors.newScheduledThreadPool(1)
@ -104,12 +102,14 @@ class SingleThreadedStateMachineManager(
private val sessionToFlow = ConcurrentHashMap<SessionId, StateMachineRunId>()
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
private val transitionExecutor = makeTransitionExecutor()
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
private var checkpointSerializationContext: CheckpointSerializationContext? = null
private var actionExecutor: ActionExecutor? = null
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID)
private val transitionExecutor = makeTransitionExecutor()
override val allStateMachines: List<FlowLogic<*>>
get() = mutex.locked { flows.values.map { it.fiber.logic } }
@ -204,7 +204,7 @@ class SingleThreadedStateMachineManager(
invocationContext = context,
flowLogic = flowLogic,
flowStart = FlowStart.Explicit,
ourIdentity = ourIdentity ?: getOurFirstIdentity(),
ourIdentity = ourIdentity ?: ourFirstIdentity,
deduplicationHandler = deduplicationHandler,
isStartIdempotent = false
)
@ -402,23 +402,23 @@ class SingleThreadedStateMachineManager(
}
private fun onSessionMessage(event: ExternalEvent.ExternalMessageEvent) {
val message: ReceivedMessage = event.receivedMessage
val deduplicationHandler: DeduplicationHandler = event.deduplicationHandler
val peer = message.peer
val peer = event.receivedMessage.peer
val sessionMessage = try {
message.data.deserialize<SessionMessage>()
event.receivedMessage.data.deserialize<SessionMessage>()
} catch (ex: Exception) {
logger.error("Received corrupt SessionMessage data from $peer")
deduplicationHandler.afterDatabaseTransaction()
event.deduplicationHandler.afterDatabaseTransaction()
return
}
val sender = serviceHub.networkMapCache.getPeerByLegalName(peer)
if (sender != null) {
when (sessionMessage) {
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, deduplicationHandler, sender)
is InitialSessionMessage -> onSessionInit(sessionMessage, message.platformVersion, deduplicationHandler, sender)
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, event.deduplicationHandler, sender)
is InitialSessionMessage -> onSessionInit(sessionMessage, sender, event)
}
} else {
// TODO Send the event to the flow hospital to be retried on network map update
// TODO Test that restarting the node attempts to retry
logger.error("Unknown peer $peer in $sessionMessage")
}
}
@ -448,14 +448,8 @@ class SingleThreadedStateMachineManager(
}
}
private fun onSessionInit(sessionMessage: InitialSessionMessage, senderPlatformVersion: Int, deduplicationHandler: DeduplicationHandler, sender: Party) {
fun createErrorMessage(initiatorSessionId: SessionId, message: String): ExistingSessionMessage {
val errorId = secureRandom.nextLong()
val payload = RejectSessionMessage(message, errorId)
return ExistingSessionMessage(initiatorSessionId, payload)
}
val replyError = try {
private fun onSessionInit(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) {
try {
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
val initiatedSessionId = SessionId.createRandom(secureRandom)
val senderSession = FlowSessionImpl(sender, initiatedSessionId)
@ -465,40 +459,34 @@ class SingleThreadedStateMachineManager(
is InitiatedFlowFactory.CorDapp -> FlowInfo(initiatedFlowFactory.flowVersion, initiatedFlowFactory.appName)
}
val senderCoreFlowVersion = when (initiatedFlowFactory) {
is InitiatedFlowFactory.Core -> senderPlatformVersion
is InitiatedFlowFactory.Core -> event.receivedMessage.platformVersion
is InitiatedFlowFactory.CorDapp -> null
}
startInitiatedFlow(flowLogic, deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo)
null
} catch (exception: Exception) {
logger.warn("Exception while creating initiated flow", exception)
createErrorMessage(
sessionMessage.initiatorSessionId,
(exception as? SessionRejectException)?.message ?: "Unable to establish session"
)
}
if (replyError != null) {
flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID))
deduplicationHandler.afterDatabaseTransaction()
startInitiatedFlow(flowLogic, event.deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo)
} catch (t: Throwable) {
logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " +
"flowVersion=${sessionMessage.flowVersion}), sending to the flow hospital", t)
flowHospital.sessionInitErrored(sessionMessage, sender, event, t)
}
}
// TODO this is a temporary hack until we figure out multiple identities
private fun getOurFirstIdentity(): Party {
return serviceHub.myInfo.legalIdentities[0]
}
private val ourFirstIdentity: Party get() = serviceHub.myInfo.legalIdentities[0]
private fun getInitiatedFlowFactory(message: InitialSessionMessage): InitiatedFlowFactory<*> {
val initiatingFlowClass = try {
Class.forName(message.initiatorFlowClassName, true, classloader).asSubclass(FlowLogic::class.java)
val initiatorClass = try {
Class.forName(message.initiatorFlowClassName, true, classloader)
} catch (e: ClassNotFoundException) {
throw SessionRejectException("Don't know ${message.initiatorFlowClassName}")
} catch (e: ClassCastException) {
throw SessionRejectException("${message.initiatorFlowClassName} is not a flow")
throw SessionRejectException.UnknownClass(message.initiatorFlowClassName)
}
return serviceHub.getFlowFactory(initiatingFlowClass)
?: throw SessionRejectException("$initiatingFlowClass is not registered")
val initiatorFlowClass = try {
initiatorClass.asSubclass(FlowLogic::class.java)
} catch (e: ClassCastException) {
throw SessionRejectException.NotAFlow(initiatorClass)
}
return serviceHub.getFlowFactory(initiatorFlowClass) ?: throw SessionRejectException.NotRegistered(initiatorFlowClass)
}
private fun <A> startInitiatedFlow(
@ -511,7 +499,7 @@ class SingleThreadedStateMachineManager(
initiatedFlowInfo: FlowInfo
) {
val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo)
val ourIdentity = getOurFirstIdentity()
val ourIdentity = ourFirstIdentity
startFlowInternal(
InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity,
initiatingMessageDeduplicationHandler,

View File

@ -1,6 +1,8 @@
package net.corda.node.services.statemachine
import net.corda.core.crypto.newSecureRandom
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.bufferUntilSubscribed
@ -16,65 +18,101 @@ import java.util.*
/**
* This hospital consults "staff" to see if they can automatically diagnose and treat flows.
*/
class StaffedFlowHospital {
class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) {
private companion object {
private val log = contextLogger()
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, FinalityDoctor)
}
private val mutex = ThreadBox(object {
val patients = HashMap<StateMachineRunId, MedicalHistory>()
val flowPatients = HashMap<StateMachineRunId, FlowMedicalHistory>()
val treatableSessionInits = HashMap<UUID, InternalSessionInitRecord>()
val recordsPublisher = PublishSubject.create<MedicalRecord>()
})
private val secureRandom = newSecureRandom()
class MedicalHistory {
internal val records: MutableList<MedicalRecord> = mutableListOf()
fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff): Boolean {
val lastAdmittanceSuspendCount = (records.last() as MedicalRecord.Admitted).suspendCount
return records
.filterIsInstance<MedicalRecord.Discharged>()
.count { by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max
/**
* The node was unable to initiate the [InitialSessionMessage] from [sender].
*/
fun sessionInitErrored(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent, error: Throwable) {
val time = Instant.now()
val id = UUID.randomUUID()
val outcome = if (error is SessionRejectException.UnknownClass) {
// We probably don't have the CorDapp installed so let's pause the message in the hopes that the CorDapp is
// installed on restart, at which point the message will be able proceed as normal. If not then it will need
// to be dropped manually.
Outcome.OVERNIGHT_OBSERVATION
} else {
Outcome.UNTREATABLE
}
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
val record = sessionMessage.run { MedicalRecord.SessionInit(id, time, outcome, initiatorFlowClassName, flowVersion, appName, sender, error) }
mutex.locked {
if (outcome != Outcome.UNTREATABLE) {
treatableSessionInits[id] = InternalSessionInitRecord(sessionMessage, event, record)
}
recordsPublisher.onNext(record)
}
if (outcome == Outcome.UNTREATABLE) {
sendBackError(error, sessionMessage, sender, event)
}
}
private fun sendBackError(error: Throwable, sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) {
val message = (error as? SessionRejectException)?.message ?: "Unable to establish session"
val payload = RejectSessionMessage(message, secureRandom.nextLong())
val replyError = ExistingSessionMessage(sessionMessage.initiatorSessionId, payload)
flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID))
event.deduplicationHandler.afterDatabaseTransaction()
}
/**
* Drop the errored session-init message with the given ID ([MedicalRecord.SessionInit.id]). This will cause the node
* to send back the relevant session error to the initiator party and acknowledge its receipt from the message broker
* so that it never gets redelivered.
*/
fun dropSessionInit(id: UUID) {
val (sessionMessage, event, publicRecord) = mutex.locked {
requireNotNull(treatableSessionInits.remove(id)) { "$id does not refer to any session init message" }
}
log.info("Errored session-init permanently dropped: $publicRecord")
sendBackError(publicRecord.error, sessionMessage, publicRecord.sender, event)
}
/**
* The flow running in [flowFiber] has errored.
*/
fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
val time = Instant.now()
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
val suspendCount = currentState.checkpoint.numberOfSuspends
val event = mutex.locked {
val medicalHistory = patients.computeIfAbsent(flowFiber.id) { MedicalHistory() }
val admitted = MedicalRecord.Admitted(flowFiber.id, Instant.now(), suspendCount)
medicalHistory.records += admitted
recordsPublisher.onNext(admitted)
val medicalHistory = flowPatients.computeIfAbsent(flowFiber.id) { FlowMedicalHistory() }
val report = consultStaff(flowFiber, currentState, errors, medicalHistory)
val (newRecord, event) = when (report.diagnosis) {
val (outcome, event) = when (report.diagnosis) {
Diagnosis.DISCHARGE -> {
log.info("Flow ${flowFiber.id} error discharged from hospital by ${report.by}")
Pair(MedicalRecord.Discharged(flowFiber.id, Instant.now(), suspendCount, report.by, errors), Event.RetryFlowFromSafePoint)
Pair(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint)
}
Diagnosis.OVERNIGHT_OBSERVATION -> {
log.info("Flow ${flowFiber.id} error kept for overnight observation by ${report.by}")
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
Pair(MedicalRecord.KeptInForObservation(flowFiber.id, Instant.now(), suspendCount, report.by, errors), null)
Pair(Outcome.OVERNIGHT_OBSERVATION, null)
}
Diagnosis.NOT_MY_SPECIALTY -> {
// None of the staff care for these errors so we let them propagate
log.info("Flow ${flowFiber.id} error allowed to propagate")
Pair(MedicalRecord.NothingWeCanDo(flowFiber.id, Instant.now(), suspendCount), Event.StartErrorPropagation)
Pair(Outcome.UNTREATABLE, Event.StartErrorPropagation)
}
}
medicalHistory.records += newRecord
recordsPublisher.onNext(newRecord)
val record = MedicalRecord.Flow(time, flowFiber.id, currentState.checkpoint.numberOfSuspends, errors, report.by, outcome)
medicalHistory.records += record
recordsPublisher.onNext(record)
event
}
@ -86,8 +124,9 @@ class StaffedFlowHospital {
private fun consultStaff(flowFiber: FlowFiber,
currentState: StateMachineState,
errors: List<Throwable>,
medicalHistory: MedicalHistory): ConsultationReport {
medicalHistory: FlowMedicalHistory): ConsultationReport {
return errors
.asSequence()
.mapIndexed { index, error ->
log.info("Flow ${flowFiber.id} has error [$index]", error)
val diagnoses: Map<Diagnosis, List<Staff>> = staff.groupBy { it.consult(flowFiber, currentState, error, medicalHistory) }
@ -105,43 +144,61 @@ class StaffedFlowHospital {
* The flow has been removed from the state machine.
*/
fun flowRemoved(flowId: StateMachineRunId) {
mutex.locked { patients.remove(flowId) }
mutex.locked { flowPatients.remove(flowId) }
}
// TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC
/** Returns a stream of medical records as flows pass through the hospital. */
fun track(): DataFeed<List<MedicalRecord>, MedicalRecord> {
return mutex.locked {
DataFeed(patients.values.flatMap { it.records }, recordsPublisher.bufferUntilSubscribed())
val snapshot = (flowPatients.values.flatMap { it.records } + treatableSessionInits.values.map { it.publicRecord }).sortedBy { it.time }
DataFeed(snapshot, recordsPublisher.bufferUntilSubscribed())
}
}
sealed class MedicalRecord {
abstract val flowId: StateMachineRunId
abstract val at: Instant
abstract val suspendCount: Int
class FlowMedicalHistory {
internal val records: MutableList<MedicalRecord.Flow> = mutableListOf()
data class Admitted(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int) : MedicalRecord()
fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff, currentState: StateMachineState): Boolean {
val lastAdmittanceSuspendCount = currentState.checkpoint.numberOfSuspends
return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max
}
data class Discharged(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int,
val by: List<Staff>,
val errors: List<Throwable>) : MedicalRecord()
data class KeptInForObservation(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int,
val by: List<Staff>,
val errors: List<Throwable>) : MedicalRecord()
data class NothingWeCanDo(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int) : MedicalRecord()
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
}
private data class InternalSessionInitRecord(val sessionMessage: InitialSessionMessage,
val event: ExternalEvent.ExternalMessageEvent,
val publicRecord: MedicalRecord.SessionInit)
sealed class MedicalRecord {
abstract val time: Instant
abstract val outcome: Outcome
abstract val errors: List<Throwable>
/** Medical record for a flow that has errored. */
data class Flow(override val time: Instant,
val flowId: StateMachineRunId,
val suspendCount: Int,
override val errors: List<Throwable>,
val by: List<Staff>,
override val outcome: Outcome) : MedicalRecord()
/** Medical record for a session initiation that was unsuccessful. */
data class SessionInit(val id: UUID,
override val time: Instant,
override val outcome: Outcome,
val initiatorFlowClassName: String,
val flowVersion: Int,
val appName: String,
val sender: Party,
val error: Throwable) : MedicalRecord() {
override val errors: List<Throwable> get() = listOf(error)
}
}
enum class Outcome { DISCHARGE, OVERNIGHT_OBSERVATION, UNTREATABLE }
/** The order of the enum values are in priority order. */
enum class Diagnosis {
/** Retry from last safe point. */
@ -153,14 +210,14 @@ class StaffedFlowHospital {
}
interface Staff {
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis
}
/**
* SQL Deadlock detection.
*/
object DeadlockNurse : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
return if (mentionsDeadlock(newError)) {
Diagnosis.DISCHARGE
} else {
@ -178,8 +235,8 @@ class StaffedFlowHospital {
* Primary key violation detection for duplicate inserts. Will detect other constraint violations too.
*/
object DuplicateInsertSpecialist : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this)) {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this, currentState)) {
Diagnosis.DISCHARGE
} else {
Diagnosis.NOT_MY_SPECIALTY
@ -196,9 +253,9 @@ class StaffedFlowHospital {
* exceed the limit specified by the [FlowTimeoutException].
*/
object DoctorTimeout : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
if (newError is FlowTimeoutException) {
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this, currentState)) {
return Diagnosis.DISCHARGE
} else {
val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}. " +
@ -216,12 +273,18 @@ class StaffedFlowHospital {
* Parks [FinalityHandler]s for observation.
*/
object FinalityDoctor : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
return (currentState.flowLogic as? FinalityHandler)?.let { logic -> Diagnosis.OVERNIGHT_OBSERVATION.also { warn(logic, flowFiber, currentState) } } ?: Diagnosis.NOT_MY_SPECIALTY
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
return if (currentState.flowLogic is FinalityHandler) {
warn(currentState.flowLogic, flowFiber, currentState)
Diagnosis.OVERNIGHT_OBSERVATION
} else {
Diagnosis.NOT_MY_SPECIALTY
}
}
private fun warn(flowLogic: FinalityHandler, flowFiber: FlowFiber, currentState: StateMachineState) {
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying the flow by re-starting the node. State machine state: $currentState, initiating party was: ${flowLogic.sender().name}")
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
"the flow by re-starting the node. State machine state: $currentState, initiating party was: ${flowLogic.sender().name}")
}
}
}

View File

@ -10,8 +10,7 @@ import net.corda.core.utilities.getOrThrow
import net.corda.finance.POUNDS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.issuedBy
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.node.services.statemachine.StaffedFlowHospital.MedicalRecord.KeptInForObservation
import net.corda.node.services.statemachine.StaffedFlowHospital.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
@ -72,11 +71,11 @@ class FinalityHandlerTest {
val keptInForObservation = smm.flowHospital
.track()
.let { it.updates.startWith(it.snapshot) }
.filter { it.flowId == runId }
.ofType(KeptInForObservation::class.java)
.ofType(MedicalRecord.Flow::class.java)
.filter { it.flowId == runId && it.outcome == Outcome.OVERNIGHT_OBSERVATION }
.toBlocking()
.first()
assertThat(keptInForObservation.by).contains(StaffedFlowHospital.FinalityDoctor)
assertThat(keptInForObservation.by).contains(FinalityDoctor)
}
private fun TestStartedNode.getTransaction(id: SecureHash): SignedTransaction? {

View File

@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import co.paralleluniverse.strands.concurrent.Semaphore
import net.corda.client.rpc.notUsed
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
@ -68,7 +69,7 @@ class FlowFrameworkTests {
@Before
fun setUpMockNet() {
mockNet = InternalMockNetwork(
cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"),
cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"),
servicePeerAllocationStrategy = RoundRobin()
)
@ -100,8 +101,8 @@ class FlowFrameworkTests {
assertThat(flow.lazyTime).isNotNull()
}
class SuspendThrowingActionExecutor(private val exception: Exception, val delegate: ActionExecutor) : ActionExecutor {
var thrown = false
class SuspendThrowingActionExecutor(private val exception: Exception, private val delegate: ActionExecutor) : ActionExecutor {
private var thrown = false
@Suspendable
override fun executeAction(fiber: FlowFiber, action: Action) {
if (action is Action.CommitTransaction && !thrown) {
@ -367,10 +368,17 @@ class FlowFrameworkTests {
}
@Test
fun `unknown class in session init`() {
fun `session init with unknown class is sent to the flow hospital, from where it's dropped`() {
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob)
mockNet.runNetwork()
assertThat(receivedSessionMessages).hasSize(2) // Only the session-init and session-reject are expected
assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital
val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot
assertThat(medicalRecords).hasSize(1)
val sessionInitRecord = medicalRecords[0] as StaffedFlowHospital.MedicalRecord.SessionInit
assertThat(sessionInitRecord.initiatorFlowClassName).isEqualTo("not.a.real.Class")
bobNode.smm.flowHospital.dropSessionInit(sessionInitRecord.id) // Drop the message which is processed as an error back to sender
mockNet.runNetwork()
assertThat(receivedSessionMessages).hasSize(2) // Now the session-reject is expected
val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage
assertThat((lastMessage.payload as RejectSessionMessage).message).isEqualTo("Don't know not.a.real.Class")
}
@ -603,10 +611,6 @@ class FlowFrameworkTripartyTests {
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
assertThat(receivedSessionMessages).containsExactly(*expected)
}
private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List<SessionTransfer> {
val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress }
assertThat(actualForNode).containsExactly(*expected)
@ -749,16 +753,6 @@ class FlowFrameworkPersistenceTests {
return newNode.getSingleFlow<P>().first
}
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
assertThat(receivedSessionMessages).containsExactly(*expected)
}
private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List<SessionTransfer> {
val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress }
assertThat(actualForNode).containsExactly(*expected)
return actualForNode
}
private fun receivedSessionMessagesObservable(): Observable<SessionTransfer> {
return mockNet.messagingNetwork.receivedMessages.toSessionTransfers()
}
@ -832,19 +826,6 @@ private val FlowLogic<*>.progressSteps: CordaFuture<List<Notification<ProgressTr
.toFuture()
}
class ThrowingActionExecutor(private val exception: Exception, val delegate: ActionExecutor) : ActionExecutor {
var thrown = false
@Suspendable
override fun executeAction(fiber: FlowFiber, action: Action) {
if (thrown) {
delegate.executeAction(fiber, action)
} else {
thrown = true
throw exception
}
}
}
@InitiatingFlow
private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party,
@Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic<Unit>() {

View File

@ -175,8 +175,6 @@ class RetryFlowMockTest {
TODO("not implemented")
}
}), nodeA.services.newContext()).get()
// Should be 2 records, one for admission and one for keep in.
records.next()
records.next()
// Killing it should remove it.
nodeA.smm.killFlow(flow.id)

View File

@ -58,7 +58,9 @@ This will create a new sub-task under each of the test tickets for `<PRODUCT>` `
## Options
Each command described above has a set of additional options. More specifically, if you want to use a particular JIRA user instead of being prompted for a user name every time, you can specify `--user <USER>`. For verbose logging, you can supply `--verbose` or `-v`. And to auto-reply to the prompt of whether to proceed or not, provide `--yes` or `-y`.
Each command described above has a set of additional options. More specifically, if you want to use a particular JIRA user instead of being prompted for a user name every time, you can specify `--user <USER>`. You can also provide the user name in the environment variable, `JIRA_USER`.
For verbose logging, you can supply `--verbose` or `-v`. And to auto-reply to the prompt of whether to proceed or not, provide `--yes` or `-y`.
There is also a useful dry-run option, `--dry-run` or `-d`, that lets you run through the command without creating any tickets or applying any changes to JIRA.

View File

@ -1,7 +1,7 @@
# {{{ Dependencies
from __future__ import print_function
import sys
import sys, os
try:
from getpass import getpass
@ -41,9 +41,13 @@ def confirm(message, auto_yes=False):
# {{{ login(account, user, password, use_keyring) - Present user with login prompt and return the provided username and password. If use_keyring is true, use previously provided password (if any)
def login(account, user=None, password=None, use_keyring=True):
if not user:
user = prompt('Username: ')
user = u'{}@r3.com'.format(user) if '@' not in user else user
if not user: return (None, None)
if 'JIRA_USER' not in os.environ:
user = prompt('Username: ')
user = u'{}@r3.com'.format(user) if '@' not in user else user
if not user: return (None, None)
else:
user = os.environ['JIRA_USER']
print('Username: {}'.format(user))
else:
user = u'{}@r3.com'.format(user) if '@' not in user else user
print('Username: {}'.format(user))

View File

@ -16,10 +16,10 @@ try:
def red(message): return colored(message, 'red')
def yellow(message): return colored(message, 'yellow')
def faint(message): return colored(message, 'white', attrs=['dark'])
def on_green(message): return colored(message, 'white', 'on_green')
def on_red(message): return colored(message, 'white', 'on_red')
def blue_on_white(message): return colored(message, 'blue', 'on_white')
def yellow_on_white(message): return colored(message, 'yellow', 'on_white')
def on_green(message): return colored(message, 'green')
def on_red(message): return colored(message, 'red')
def blue_on_white(message): return colored(message, 'blue')
def yellow_on_white(message): return colored(message, 'yellow')
except:
def blue(message): return u'[{}]'.format(message)
def green(message): return message
@ -159,7 +159,10 @@ def create_version(args):
jira.jira.create_version(name=version, project=project, description=version)
print(u' {} - Created version for project {}'.format(green('SUCCESS'), blue(project)))
except Exception as error:
print(u' {} - Failed to version: {}'.format(red('FAIL'), error))
if args.verbose:
print(u' {} - Failed to version: {}'.format(red('FAIL'), error))
else:
print(u' {} - Failed to version: {}'.format(red('FAIL'), error.text))
print()