Added --logging-level cmd line arg to set the logging level, and improved flow logging

This commit is contained in:
Shams Asari
2017-02-13 14:43:27 +00:00
parent 48c65ac5d2
commit 40dde555e7
24 changed files with 285 additions and 260 deletions

View File

@ -11,7 +11,6 @@ import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.node.driver.DriverBasedTest

View File

@ -2,8 +2,10 @@ package net.corda.node
import com.typesafe.config.Config
import joptsimple.OptionParser
import joptsimple.util.EnumConverter
import net.corda.core.div
import net.corda.node.services.config.ConfigHelper
import org.slf4j.event.Level
import java.io.PrintStream
import java.nio.file.Path
import java.nio.file.Paths
@ -20,6 +22,11 @@ class ArgsParser {
.accepts("config-file", "The path to the config file")
.withRequiredArg()
.defaultsTo("node.conf")
private val loggerLevel = optionParser
.accepts("logging-level", "Enable logging at this level and higher")
.withRequiredArg()
.withValuesConvertedBy(object : EnumConverter<Level>(Level::class.java) {})
.defaultsTo(Level.INFO)
private val logToConsoleArg = optionParser.accepts("log-to-console", "If set, prints logging to the console as well as to a file.")
private val isWebserverArg = optionParser.accepts("webserver")
private val helpArg = optionParser.accepts("help").forHelp()
@ -31,14 +38,22 @@ class ArgsParser {
}
val baseDirectory = Paths.get(optionSet.valueOf(baseDirectoryArg)).normalize().toAbsolutePath()
val configFile = baseDirectory / optionSet.valueOf(configFileArg)
val help = optionSet.has(helpArg)
val loggingLevel = optionSet.valueOf(loggerLevel)
val logToConsole = optionSet.has(logToConsoleArg)
val isWebserver = optionSet.has(isWebserverArg)
return CmdLineOptions(baseDirectory, configFile, optionSet.has(helpArg), optionSet.has(logToConsoleArg), isWebserver)
return CmdLineOptions(baseDirectory, configFile, help, loggingLevel, logToConsole, isWebserver)
}
fun printHelp(sink: PrintStream) = optionParser.printHelpOn(sink)
}
data class CmdLineOptions(val baseDirectory: Path, val configFile: Path?, val help: Boolean, val logToConsole: Boolean, val isWebserver: Boolean) {
data class CmdLineOptions(val baseDirectory: Path,
val configFile: Path,
val help: Boolean,
val loggingLevel: Level,
val logToConsole: Boolean,
val isWebserver: Boolean) {
fun loadConfig(allowMissingConfig: Boolean = false, configOverrides: Map<String, Any?> = emptyMap()): Config {
return ConfigHelper.loadConfig(baseDirectory, configFile, allowMissingConfig, configOverrides)
}

View File

@ -2,6 +2,7 @@
package net.corda.node
import com.typesafe.config.ConfigException
import joptsimple.OptionException
import net.corda.core.*
import net.corda.core.utilities.Emoji
import net.corda.node.internal.Node
@ -14,7 +15,6 @@ import org.slf4j.LoggerFactory
import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.nio.file.Paths
import kotlin.concurrent.thread
import kotlin.system.exitProcess
private var renderBasicInfoToConsole = true
@ -38,8 +38,9 @@ fun main(args: Array<String>) {
val cmdlineOptions = try {
argsParser.parse(*args)
} catch (ex: Exception) {
println("Unknown command line arguments: ${ex.message}")
} catch (ex: OptionException) {
println("Invalid command line arguments: ${ex.message}")
argsParser.printHelp(System.out)
exitProcess(1)
}
@ -49,10 +50,10 @@ fun main(args: Array<String>) {
exitProcess(0)
}
// Set up logging.
// Set up logging. These properties are referenced from the XML config file.
System.setProperty("defaultLogLevel", cmdlineOptions.loggingLevel.name.toLowerCase())
if (cmdlineOptions.logToConsole) {
// This property is referenced from the XML config file.
System.setProperty("consoleLogLevel", "info")
System.setProperty("consoleLogLevel", cmdlineOptions.loggingLevel.name.toLowerCase())
renderBasicInfoToConsole = false
}

View File

@ -147,7 +147,7 @@ sealed class PortAllocation {
* @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental.
* @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental.
* @param useTestClock If true the test clock will be used in Node.
* @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode.
* @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode and have debug level logging.
* @param dsl The dsl itself.
* @return The value returned in the [dsl] closure.
*/
@ -502,19 +502,19 @@ open class DriverDSL(
else
""
val additionalKeys = listOf("amq.delivery.delay.ms")
val inheritedProperties = listOf("amq.delivery.delay.ms")
val systemArgs = mutableMapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "Corda"
)
for (key in additionalKeys) {
if (System.getProperty(key) != null) {
systemArgs.set(key, System.getProperty(key))
}
inheritedProperties.forEach {
val property = System.getProperty(it)
if (property != null) systemArgs[it] = property
}
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
val javaArgs = listOf(path) +
systemArgs.map { "-D${it.key}=${it.value}" } +
listOf(
@ -524,7 +524,8 @@ open class DriverDSL(
"-XX:+UseG1GC",
"-cp", classpath,
className,
"--base-directory=${nodeConf.baseDirectory}"
"--base-directory=${nodeConf.baseDirectory}",
"--logging-level=$loggingLevel"
).filter(String::isNotEmpty)
val builder = ProcessBuilder(javaArgs)
builder.redirectError(Paths.get("error.$className.log").toFile())

View File

@ -28,19 +28,16 @@ object ConfigHelper {
private val log = loggerFor<ConfigHelper>()
fun loadConfig(baseDirectory: Path,
configFileOverride: Path? = null,
configFile: Path = baseDirectory / "node.conf",
allowMissingConfig: Boolean = false,
configOverrides: Map<String, Any?> = emptyMap()): Config {
val defaultConfig = ConfigFactory.parseResources("reference.conf", ConfigParseOptions.defaults().setAllowMissing(false))
val configFile = configFileOverride ?: baseDirectory / "node.conf"
val appConfig = ConfigFactory.parseFile(configFile.toFile(), ConfigParseOptions.defaults().setAllowMissing(allowMissingConfig))
val parseOptions = ConfigParseOptions.defaults()
val defaultConfig = ConfigFactory.parseResources("reference.conf", parseOptions.setAllowMissing(false))
val appConfig = ConfigFactory.parseFile(configFile.toFile(), parseOptions.setAllowMissing(allowMissingConfig))
val overrideConfig = ConfigFactory.parseMap(configOverrides + mapOf(
// Add substitution values here
"basedir" to baseDirectory.toString())
)
val finalConfig = overrideConfig
.withFallback(appConfig)
.withFallback(defaultConfig)

View File

@ -67,7 +67,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// confusion.
const val TOPIC_PROPERTY = "platform-topic"
const val SESSION_ID_PROPERTY = "session-id"
val AMQ_DELAY = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0"))
private val AMQ_DELAY: Int = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0"))
}
private class InnerState {
@ -88,12 +88,16 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
/** An executor for sending messages */
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("${config.myLegalName} Messaging", 1)
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
/**
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
*/
override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress.asPeer(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort)
override val myAddress: SingleMessageRecipient = if (myIdentity != null) {
NodeAddress.asPeer(myIdentity, serverHostPort)
} else {
NetworkMapAddress(serverHostPort)
}
private val state = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>()
@ -108,7 +112,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry
}
})
}
)
fun start(rpcOps: RPCOps, userService: RPCUserService) {
state.locked {
@ -257,7 +262,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// Use the magic deduplication property built into Artemis as our message identity too
val uuid = UUID.fromString(message.getStringProperty(HDR_DUPLICATE_DETECTION_ID))
val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
log.info("Received message from: ${message.address} user: $user topic: $topic sessionID: $sessionID uuid: $uuid")
log.trace { "Received message from: ${message.address} user: $user topic: $topic sessionID: $sessionID uuid: $uuid" }
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
@ -372,11 +377,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
if (AMQ_DELAY > 0 && message.topicSession.topic == StateMachineManager.sessionTopic.topic) {
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + AMQ_DELAY);
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + AMQ_DELAY)
}
}
log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} " +
"uuid: ${message.uniqueMessageId}")
log.trace { "Send to: $mqAddress topic: ${message.topicSession.topic} " +
"sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}" }
producer!!.send(mqAddress, artemisMessage)
}
}

View File

@ -1,7 +1,6 @@
package net.corda.node.services.statemachine
import net.corda.core.crypto.SecureHash
import net.corda.node.services.statemachine.StateMachineManager.FlowSession
// TODO revisit when Kotlin 1.1 is released and data classes can extend other classes
interface FlowIORequest {
@ -22,17 +21,20 @@ interface SendRequest : SessionedFlowIORequest {
interface ReceiveRequest<T : SessionMessage> : SessionedFlowIORequest, WaitingRequest {
val receiveType: Class<T>
val userReceiveType: Class<*>?
}
data class SendAndReceive<T : SessionMessage>(override val session: FlowSession,
override val message: SessionMessage,
override val receiveType: Class<T>) : SendRequest, ReceiveRequest<T> {
override val receiveType: Class<T>,
override val userReceiveType: Class<*>?) : SendRequest, ReceiveRequest<T> {
@Transient
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
}
data class ReceiveOnly<T : SessionMessage>(override val session: FlowSession,
override val receiveType: Class<T>) : ReceiveRequest<T> {
override val receiveType: Class<T>,
override val userReceiveType: Class<*>?) : ReceiveRequest<T> {
@Transient
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
}

View File

@ -0,0 +1,44 @@
package net.corda.node.services.statemachine
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.node.services.statemachine.FlowSessionState.Initiated
import net.corda.node.services.statemachine.FlowSessionState.Initiating
import java.util.concurrent.ConcurrentLinkedQueue
class FlowSession(
val flow: FlowLogic<*>,
val ourSessionId: Long,
val initiatingParty: Party?,
var state: FlowSessionState)
{
val receivedMessages = ConcurrentLinkedQueue<ReceivedSessionMessage<*>>()
val fiber: FlowStateMachineImpl<*> get() = flow.stateMachine as FlowStateMachineImpl<*>
override fun toString(): String {
return "${javaClass.simpleName}(flow=$flow, ourSessionId=$ourSessionId, initiatingParty=$initiatingParty, state=$state)"
}
}
/**
* [FlowSessionState] describes the session's state.
*
* [Initiating] is pre-handshake. [Initiating.otherParty] at this point holds a [Party] corresponding to either a
* specific peer or a service.
* [Initiated] is post-handshake. At this point [Initiating.otherParty] will have been resolved to a specific peer
* [Initiated.peerParty], and the peer's sessionId has been initialised.
*/
sealed class FlowSessionState {
abstract val sendToParty: Party
/** [otherParty] may be a specific peer or a service party */
class Initiating(val otherParty: Party) : FlowSessionState() {
override val sendToParty: Party get() = otherParty
override fun toString(): String = "${javaClass.simpleName}($otherParty)"
}
class Initiated(val peerParty: Party, val peerSessionId: Long) : FlowSessionState() {
override val sendToParty: Party get() = peerParty
override fun toString(): String = "${javaClass.simpleName}($peerParty, $peerSessionId)"
}
}

View File

@ -6,6 +6,7 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.abbreviate
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowException
@ -15,10 +16,9 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.random63BitValue
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.StateMachineManager.FlowSession
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState
import net.corda.node.utilities.StrandLocalTransactionManager
import net.corda.node.utilities.createDatabaseTransaction
import org.jetbrains.exposed.sql.Database
@ -28,11 +28,10 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.sql.SQLException
import java.util.*
import java.util.concurrent.ExecutionException
class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val logic: FlowLogic<R>,
scheduler: FiberScheduler) : Fiber<Unit>("flow", scheduler), FlowStateMachine<R> {
scheduler: FiberScheduler) : Fiber<Unit>(id.toString(), scheduler), FlowStateMachine<R> {
companion object {
// Used to work around a small limitation in Quasar.
private val QUASAR_UNBLOCKER = run {
@ -56,9 +55,13 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Transient private var txTrampoline: Transaction? = null
@Transient private var _logger: Logger? = null
/**
* Return the logger for this state machine. The logger name incorporates [id] and so including this in the log
* message is not necessary.
*/
override val logger: Logger get() {
return _logger ?: run {
val l = LoggerFactory.getLogger(id.toString())
val l = LoggerFactory.getLogger("net.corda.flow.$id")
_logger = l
return l
}
@ -80,12 +83,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
init {
logic.stateMachine = this
name = id.toString()
}
@Suspendable
override fun run() {
createTransaction()
logger.debug { "Calling flow: $logic" }
val result = try {
logic.call()
} catch (e: FlowException) {
@ -93,11 +96,13 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val propagated = e.stackTrace[0].className == javaClass.name
actionOnEnd(e, propagated)
_resultFuture?.setException(e)
logger.debug(if (propagated) "Flow ended due to receiving exception" else "Flow finished with exception", e)
return
} catch (t: Throwable) {
logger.warn("Terminated by unexpected exception", t)
actionOnEnd(t, false)
_resultFuture?.setException(t)
throw ExecutionException(t)
return
}
// Only sessions which have a single send and nothing else will block here
@ -107,6 +112,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
// This is to prevent actionOnEnd being called twice if it throws an exception
actionOnEnd(null, false)
_resultFuture?.set(result)
logger.debug { "Flow finished with result $result" }
}
private fun createTransaction() {
@ -134,27 +140,35 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
otherParty: Party,
payload: Any,
sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
logger.debug { "sendAndReceive(${receiveType.name}, $otherParty, ${payload.toString().abbreviate(300)}) ..." }
val session = getConfirmedSession(otherParty, sessionFlow)
return if (session == null) {
val sessionData = if (session == null) {
val newSession = startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = true)
// Only do a receive here as the session init has carried the payload
receiveInternal<SessionData>(newSession, receiveType)
} else {
sendAndReceiveInternal<SessionData>(session, createSessionData(session, payload), receiveType)
}.checkPayloadIs(receiveType)
val sendData = createSessionData(session, payload)
sendAndReceiveInternal<SessionData>(session, sendData, receiveType)
}
logger.debug { "Received ${sessionData.message.payload.toString().abbreviate(300)}" }
return sessionData.checkPayloadIs(receiveType)
}
@Suspendable
override fun <T : Any> receive(receiveType: Class<T>,
otherParty: Party,
sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
logger.debug { "receive(${receiveType.name}, $otherParty) ..." }
val session = getConfirmedSession(otherParty, sessionFlow) ?:
startNewSession(otherParty, sessionFlow, null, waitForConfirmation = true)
return receiveInternal<SessionData>(session, receiveType).checkPayloadIs(receiveType)
val sessionData = receiveInternal<SessionData>(session, receiveType)
logger.debug { "Received ${sessionData.message.payload.toString().abbreviate(300)}" }
return sessionData.checkPayloadIs(receiveType)
}
@Suspendable
override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) {
logger.debug { "send($otherParty, ${payload.toString().abbreviate(300)})" }
val session = getConfirmedSession(otherParty, sessionFlow)
if (session == null) {
// Don't send the payload again if it was already piggy-backed on a session init
@ -164,6 +178,27 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
}
@Suspendable
override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction {
logger.debug { "waitForLedgerCommit($hash) ..." }
suspend(WaitForLedgerCommit(hash, sessionFlow.stateMachine as FlowStateMachineImpl<*>))
val stx = serviceHub.storageService.validatedTransactions.getTransaction(hash)
if (stx != null) {
logger.debug { "Transaction $hash committed to ledger" }
return stx
}
// If the tx isn't committed then we may have been resumed due to an session ending in an error
for (session in openSessions.values) {
for (receivedMessage in session.receivedMessages) {
if (receivedMessage.message is ErrorSessionEnd) {
session.erroredEnd(receivedMessage.message)
}
}
}
throw IllegalStateException("We were resumed after waiting for $hash but it wasn't found in our local storage")
}
/**
* This method will suspend the state machine and wait for incoming session init response from other party.
*/
@ -178,23 +213,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
}
@Suspendable
override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction {
logger.info("Waiting for transaction $hash to commit")
suspend(WaitForLedgerCommit(hash, sessionFlow.stateMachine as FlowStateMachineImpl<*>))
val stx = serviceHub.storageService.validatedTransactions.getTransaction(hash)
if (stx != null) return stx
// If the tx isn't committed then we may have been resumed due to an session ending in an error
for (session in openSessions.values) {
for (receivedMessage in session.receivedMessages) {
if (receivedMessage.message is ErrorSessionEnd) {
session.erroredEnd(receivedMessage.message)
}
}
}
throw IllegalStateException("We were resumed after waiting for $hash but it wasn't found in our local storage")
}
private fun createSessionData(session: FlowSession, payload: Any): SessionData {
val sessionState = session.state
val peerSessionId = when (sessionState) {
@ -212,14 +230,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
private inline fun <reified M : ExistingSessionMessage> receiveInternal(
session: FlowSession,
userReceiveType: Class<*>?): ReceivedSessionMessage<M> {
return waitForMessage(ReceiveOnly(session, M::class.java), userReceiveType)
return waitForMessage(ReceiveOnly(session, M::class.java, userReceiveType))
}
private inline fun <reified M : ExistingSessionMessage> sendAndReceiveInternal(
session: FlowSession,
message: SessionMessage,
userReceiveType: Class<*>?): ReceivedSessionMessage<M> {
return waitForMessage(SendAndReceive(session, message, M::class.java), userReceiveType)
return waitForMessage(SendAndReceive(session, message, M::class.java, userReceiveType))
}
@Suspendable
@ -253,11 +271,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
private fun <M : ExistingSessionMessage> waitForMessage(
receiveRequest: ReceiveRequest<M>,
userReceiveType: Class<*>?): ReceivedSessionMessage<M> {
val receivedMessage = receiveRequest.suspendAndExpectReceive()
return receivedMessage.confirmReceiveType(receiveRequest, userReceiveType)
private fun <M : ExistingSessionMessage> waitForMessage(receiveRequest: ReceiveRequest<M>): ReceivedSessionMessage<M> {
return receiveRequest.suspendAndExpectReceive().confirmReceiveType(receiveRequest)
}
@Suspendable
@ -280,8 +296,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
private fun <M : ExistingSessionMessage> ReceivedSessionMessage<*>.confirmReceiveType(
receiveRequest: ReceiveRequest<M>,
userReceiveType: Class<*>?): ReceivedSessionMessage<M> {
receiveRequest: ReceiveRequest<M>): ReceivedSessionMessage<M> {
val session = receiveRequest.session
val receiveType = receiveRequest.receiveType
if (receiveType.isInstance(message)) {
@ -292,7 +307,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
if (message is ErrorSessionEnd) {
session.erroredEnd(message)
} else {
val expectedType = userReceiveType?.name ?: receiveType.simpleName
val expectedType = receiveRequest.userReceiveType?.name ?: receiveType.simpleName
throw FlowSessionException("Counterparty flow on ${session.state.sendToParty} has completed without " +
"sending a $expectedType")
}
@ -310,7 +325,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
throw FlowSessionException("Counterparty flow on ${state.sendToParty} had an internal error and has terminated")
}
}
@Suspendable
private fun suspend(ioRequest: FlowIORequest) {
// We have to pass the thread local database transaction across via a transient field as the fiber park
@ -321,7 +336,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
waitingForResponse = ioRequest
var exceptionDuringSuspend: Throwable? = null
parkAndSerialize { fiber, serializer ->
parkAndSerialize { f, s ->
logger.trace { "Suspended on $ioRequest" }
// restore the Tx onto the ThreadLocal so that we can commit the ensuing checkpoint to the DB
try {
@ -332,6 +347,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
// Quasar does not terminate the fiber properly if an exception occurs during a suspend. We have to
// resume the fiber just so that we can throw it when it's running.
exceptionDuringSuspend = t
logger.trace("Resuming so fiber can it terminate with the exception thrown during suspend process", t)
resume(scheduler)
}
}
@ -353,7 +369,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
logger.trace("Started")
start()
} else {
logger.trace("Resumed")
Fiber.unpark(this, QUASAR_UNBLOCKER)
}
} catch (t: Throwable) {

View File

@ -1,6 +1,5 @@
package net.corda.node.services.statemachine
import net.corda.core.abbreviate
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowException
import net.corda.core.utilities.UntrustworthyData
@ -14,9 +13,7 @@ interface ExistingSessionMessage : SessionMessage {
}
data class SessionData(override val recipientSessionId: Long, val payload: Any) : ExistingSessionMessage {
override fun toString(): String {
return "${javaClass.simpleName}(recipientSessionId=$recipientSessionId, payload=${payload.toString().abbreviate(100)})"
}
override fun toString(): String = "${javaClass.simpleName}(recipientSessionId=$recipientSessionId, payload=$payload)"
}
interface SessionInitResponse : ExistingSessionMessage {

View File

@ -31,8 +31,6 @@ import net.corda.core.utilities.trace
import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState.Initiated
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState.Initiating
import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
@ -40,8 +38,6 @@ import rx.Observable
import rx.subjects.PublishSubject
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutionException
import javax.annotation.concurrent.ThreadSafe
/**
@ -79,6 +75,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
companion object {
private val logger = loggerFor<StateMachineManager>()
internal val sessionTopic = TopicSession("platform.session")
init {
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as FlowStateMachineImpl<*>).logger.error("Caught exception from flow", throwable)
}
}
}
val scheduler = FiberScheduler()
@ -148,12 +150,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
*/
val changes: Observable<Change> = mutex.content.changesPublisher.wrapWithDatabaseTransaction()
init {
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as FlowStateMachineImpl<*>).logger.error("Caught exception from flow", throwable)
}
}
fun start() {
restoreFibersFromCheckpoints()
listenToLedgerTransactions()
@ -168,7 +164,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
if (fibers.isNotEmpty()) {
executor.executeASAP {
for (fiber in fibers) {
fiber.logger.info("Transaction $hash has committed to the ledger, resuming")
fiber.logger.trace { "Transaction $hash has committed to the ledger, resuming" }
fiber.waitingForResponse = null
resumeFiber(fiber)
}
@ -246,7 +242,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
serviceHub.storageService.validatedTransactions.getTransaction(waitingForResponse.hash)
}
if (stx != null) {
fiber.logger.info("Resuming fiber as tx ${waitingForResponse.hash} has committed.")
fiber.logger.info("Resuming fiber as tx ${waitingForResponse.hash} has committed")
fiber.waitingForResponse = null
resumeFiber(fiber)
} else {
@ -288,14 +284,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
// before then.
session.fiber.waitingForResponse = null
updateCheckpoint(session.fiber)
session.fiber.logger.debug { "About to resume due to $message" }
session.fiber.logger.trace { "Resuming due to $message" }
resumeFiber(session.fiber)
}
} else {
val peerParty = recentlyClosedSessions.remove(message.recipientSessionId)
if (peerParty != null) {
if (message is SessionConfirm) {
logger.debug { "Received session confirmation but associated fiber has already terminated, so sending session end" }
logger.trace { "Received session confirmation but associated fiber has already terminated, so sending session end" }
sendSessionMessage(peerParty, NormalSessionEnd(message.initiatedSessionId))
} else {
logger.trace { "Ignoring session end message for already closed session: $message" }
@ -315,7 +311,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
private fun onSessionInit(sessionInit: SessionInit, sender: Party) {
logger.trace { "Received $sessionInit $sender" }
logger.trace { "Received $sessionInit from $sender" }
val otherPartySessionId = sessionInit.initiatorSessionId
fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(otherPartySessionId, message))
@ -347,14 +343,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
updateCheckpoint(fiber)
session
} catch (e: Exception) {
logger.warn("Couldn't start session for $sessionInit", e)
logger.warn("Couldn't start flow session from $sessionInit", e)
sendSessionReject("Unable to establish session")
return
}
sendSessionMessage(sender, SessionConfirm(otherPartySessionId, session.ourSessionId), session.fiber)
session.fiber.logger.debug { "Initiated from $sessionInit on $session" }
startFiber(session.fiber)
session.fiber.logger.debug { "Initiated by $sender using marker ${markerClass.name}" }
session.fiber.logger.trace { "Initiated from $sessionInit on $session" }
resumeFiber(session.fiber)
}
private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes<FlowStateMachineImpl<*>> {
@ -426,7 +423,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
private fun FlowSession.endSession(exception: Throwable?, propagated: Boolean) {
val initiatedState = state as? Initiated ?: return
val initiatedState = state as? FlowSessionState.Initiated ?: return
val sessionEnd = if (exception == null) {
NormalSessionEnd(initiatedState.peerSessionId)
} else {
@ -443,25 +440,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
recentlyClosedSessions[ourSessionId] = initiatedState.peerParty
}
private fun startFiber(fiber: FlowStateMachineImpl<*>) {
try {
resumeFiber(fiber)
} catch (e: ExecutionException) {
// There are two ways we can take exceptions in this method:
//
// 1) A bug in the SMM code itself whilst setting up the new flow. In that case the exception will
// propagate out of this method as it would for any method.
//
// 2) An exception in the first part of the fiber after it's been invoked for the first time via
// fiber.start(). In this case the exception will be caught and stashed in the flow logic future,
// then sent to the unhandled exception handler above which logs it, and is then rethrown wrapped
// in an ExecutionException or RuntimeException+EE so we can just catch it here and ignore it.
} catch (e: RuntimeException) {
if (e.cause !is ExecutionException)
throw e
}
}
/**
* Kicks off a brand new state machine of the given class.
* The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is
@ -484,7 +462,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
// If we are not started then our checkpoint will be picked up during start
mutex.locked {
if (started) {
startFiber(fiber)
resumeFiber(fiber)
}
}
return fiber
@ -509,7 +487,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
fiber.resume(scheduler)
}
} else {
fiber.logger.debug("Not resuming as SMM is stopping.")
fiber.logger.trace("Not resuming as SMM is stopping.")
decrementLiveFibers()
}
}
@ -551,40 +529,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
?: throw IllegalArgumentException("Don't know about party $party")
val address = serviceHub.networkService.getAddressOfParty(partyInfo)
val logger = fiber?.logger ?: logger
logger.debug { "Sending $message to party $party, address: $address" }
logger.trace { "Sending $message to party $party @ $address" }
serviceHub.networkService.send(sessionTopic, message, address)
}
/**
* [FlowSessionState] describes the session's state.
*
* [Initiating] is pre-handshake. [Initiating.otherParty] at this point holds a [Party] corresponding to either a
* specific peer or a service.
* [Initiated] is post-handshake. At this point [Initiating.otherParty] will have been resolved to a specific peer
* [Initiated.peerParty], and the peer's sessionId has been initialised.
*/
sealed class FlowSessionState {
abstract val sendToParty: Party
class Initiating(
val otherParty: Party /** This may be a specific peer or a service party */
) : FlowSessionState() {
override val sendToParty: Party get() = otherParty
}
class Initiated(
val peerParty: Party, /** This must be a peer party */
val peerSessionId: Long
) : FlowSessionState() {
override val sendToParty: Party get() = peerParty
}
}
data class FlowSession(
val flow: FlowLogic<*>,
val ourSessionId: Long,
val initiatingParty: Party?,
var state: FlowSessionState)
{
val receivedMessages = ConcurrentLinkedQueue<ReceivedSessionMessage<*>>()
val fiber: FlowStateMachineImpl<*> get() = flow.stateMachine as FlowStateMachineImpl<*>
}
}

View File

@ -5,6 +5,7 @@ import net.corda.core.div
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Test
import org.slf4j.event.Level
import java.nio.file.Paths
class ArgsParserTest {
@ -18,65 +19,59 @@ class ArgsParserTest {
configFile = workingDirectory / "node.conf",
help = false,
logToConsole = false,
loggingLevel = Level.INFO,
isWebserver = false))
}
@Test
fun `just base-directory with relative path`() {
fun `base-directory with relative path`() {
val expectedBaseDir = Paths.get("tmp").normalize().toAbsolutePath()
val cmdLineOptions = parser.parse("--base-directory", "tmp")
assertThat(cmdLineOptions).isEqualTo(CmdLineOptions(
baseDirectory = expectedBaseDir,
configFile = expectedBaseDir / "node.conf",
help = false,
logToConsole = false,
isWebserver = false))
assertThat(cmdLineOptions.baseDirectory).isEqualTo(expectedBaseDir)
assertThat(cmdLineOptions.configFile).isEqualTo(expectedBaseDir / "node.conf")
}
@Test
fun `just base-directory with absolute path`() {
fun `base-directory with absolute path`() {
val baseDirectory = Paths.get("tmp").normalize().toAbsolutePath()
val cmdLineOptions = parser.parse("--base-directory", baseDirectory.toString())
assertThat(cmdLineOptions).isEqualTo(CmdLineOptions(
baseDirectory = baseDirectory,
configFile = baseDirectory / "node.conf",
help = false,
logToConsole = false,
isWebserver = false))
assertThat(cmdLineOptions.baseDirectory).isEqualTo(baseDirectory)
assertThat(cmdLineOptions.configFile).isEqualTo(baseDirectory / "node.conf")
}
@Test
fun `just config-file with relative path`() {
fun `config-file with relative path`() {
val cmdLineOptions = parser.parse("--config-file", "different.conf")
assertThat(cmdLineOptions).isEqualTo(CmdLineOptions(
baseDirectory = workingDirectory,
configFile = workingDirectory / "different.conf",
help = false,
logToConsole = false,
isWebserver = false))
assertThat(cmdLineOptions.baseDirectory).isEqualTo(workingDirectory)
assertThat(cmdLineOptions.configFile).isEqualTo(workingDirectory / "different.conf")
}
@Test
fun `just config-file with absolute path`() {
fun `config-file with absolute path`() {
val configFile = Paths.get("tmp", "a.conf").normalize().toAbsolutePath()
val cmdLineOptions = parser.parse("--config-file", configFile.toString())
assertThat(cmdLineOptions).isEqualTo(CmdLineOptions(
baseDirectory = workingDirectory,
configFile = configFile,
help = false,
logToConsole = false,
isWebserver = false))
assertThat(cmdLineOptions.baseDirectory).isEqualTo(workingDirectory)
assertThat(cmdLineOptions.configFile).isEqualTo(configFile)
}
@Test
fun `just webserver `() {
fun `log-to-console`() {
val cmdLineOptions = parser.parse("--log-to-console")
assertThat(cmdLineOptions.logToConsole).isTrue()
}
@Test
fun `logging-level`() {
for (level in Level.values()) {
val cmdLineOptions = parser.parse("--logging-level", level.name)
assertThat(cmdLineOptions.loggingLevel).isEqualTo(level)
}
}
@Test
fun `webserver`() {
val cmdLineOptions = parser.parse("--webserver")
assertThat(cmdLineOptions).isEqualTo(CmdLineOptions(
baseDirectory = workingDirectory,
configFile = workingDirectory / "node.conf",
help = false,
logToConsole = false,
isWebserver = true))
assertThat(cmdLineOptions.isWebserver).isTrue()
}
@Test
@ -99,4 +94,18 @@ class ArgsParserTest {
parser.parse("--config-file")
}.withMessageContaining("config-file")
}
@Test
fun `logging-level without argument`() {
assertThatExceptionOfType(OptionException::class.java).isThrownBy {
parser.parse("--logging-level")
}.withMessageContaining("logging-level")
}
@Test
fun `logging-level with invalid argument`() {
assertThatExceptionOfType(OptionException::class.java).isThrownBy {
parser.parse("--logging-level", "not-a-level")
}.withMessageContaining("logging-level")
}
}

View File

@ -2,7 +2,6 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand.UncaughtExceptionHandler
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.DummyState
@ -17,11 +16,11 @@ import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.random63BitValue
import net.corda.core.rootCause
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.serialization.deserialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.LogHelper
import net.corda.core.utilities.unwrap
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
@ -53,6 +52,12 @@ import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class StateMachineManagerTests {
companion object {
init {
LogHelper.setLevel("+net.corda.flow")
}
}
private val net = MockNetwork(servicePeerAllocationStrategy = RoundRobin())
private val sessionTransfers = ArrayList<SessionTransfer>()
private lateinit var node1: MockNode
@ -111,17 +116,11 @@ class StateMachineManagerTests {
fiber.actionOnSuspend = {
throw exceptionDuringSuspend
}
var uncaughtException: Throwable? = null
fiber.uncaughtExceptionHandler = UncaughtExceptionHandler { f, e ->
uncaughtException = e
}
net.runNetwork()
assertThatThrownBy {
fiber.resultFuture.getOrThrow()
}.isSameAs(exceptionDuringSuspend)
assertThat(node1.smm.allStateMachines).isEmpty()
// Make sure it doesn't get swallowed up
assertThat(uncaughtException?.rootCause).isSameAs(exceptionDuringSuspend)
// Make sure the fiber does actually terminate
assertThat(fiber.isTerminated).isTrue()
}