diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt index 95482e8dfb..5865e7dffd 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt @@ -11,15 +11,15 @@ package net.corda.client.jfx.model import javafx.beans.value.ObservableValue -import javafx.collections.FXCollections -import javafx.collections.ObservableMap -import net.corda.client.jfx.utils.* +import net.corda.client.jfx.utils.distinctBy +import net.corda.client.jfx.utils.lift +import net.corda.client.jfx.utils.map +import net.corda.client.jfx.utils.recordInSequence import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction -import org.fxmisc.easybind.EasyBind /** * [PartiallyResolvedTransaction] holds a [SignedTransaction] that has zero or more inputs resolved. The intent is @@ -53,38 +53,36 @@ data class PartiallyResolvedTransaction( companion object { fun fromSignedTransaction( transaction: SignedTransaction, - stateMap: ObservableMap> - ) = PartiallyResolvedTransaction( - transaction = transaction, - inputs = transaction.inputs.map { stateRef -> - EasyBind.map(stateMap.getObservableValue(stateRef)) { - if (it == null) { + inputTransactions: Map + ): PartiallyResolvedTransaction { + return PartiallyResolvedTransaction( + transaction = transaction, + inputs = transaction.inputs.map { stateRef -> + val tx = inputTransactions.get(stateRef) + if (tx == null) { InputResolution.Unresolved(stateRef) } else { - InputResolution.Resolved(it) + InputResolution.Resolved(tx.coreTransaction.outRef(stateRef.index)) + }.lift() + }, + outputs = if (transaction.coreTransaction is WireTransaction) { + transaction.tx.outRefsOfType().map { + OutputResolution.Resolved(it).lift() } - } - }, - outputs = if (transaction.coreTransaction is WireTransaction) { - transaction.tx.outRefsOfType().map { - OutputResolution.Resolved(it).lift() - } - } else { - // Transaction will have the same number of outputs as inputs - val outputCount = transaction.coreTransaction.inputs.size - val stateRefs = (0 until outputCount).map { StateRef(transaction.id, it) } - stateRefs.map { stateRef -> - EasyBind.map(stateMap.getObservableValue(stateRef)) { - if (it == null) { + } else { + // Transaction will have the same number of outputs as inputs + val outputCount = transaction.coreTransaction.inputs.size + val stateRefs = (0 until outputCount).map { StateRef(transaction.id, it) } + stateRefs.map { stateRef -> + val tx = inputTransactions.get(stateRef) + if (tx == null) { OutputResolution.Unresolved(stateRef) } else { - OutputResolution.Resolved(it) - } + OutputResolution.Resolved(tx.coreTransaction.outRef(stateRef.index)) + }.lift() } - } - } - - ) + }) + } } } @@ -94,13 +92,12 @@ data class PartiallyResolvedTransaction( class TransactionDataModel { private val transactions by observable(NodeMonitorModel::transactions) private val collectedTransactions = transactions.recordInSequence().distinctBy { it.id } - private val vaultUpdates by observable(NodeMonitorModel::vaultUpdates) - private val stateMap = vaultUpdates.fold(FXCollections.observableHashMap>()) { map, (consumed, produced) -> - val states = consumed + produced - states.forEach { map[it.ref] = it } - } + private val rpcProxy by observableValue(NodeMonitorModel::proxyObservable) val partiallyResolvedTransactions = collectedTransactions.map { - PartiallyResolvedTransaction.fromSignedTransaction(it, stateMap) + PartiallyResolvedTransaction.fromSignedTransaction(it, + it.inputs.map { stateRef -> + stateRef to rpcProxy.value!!.cordaRPCOps.internalFindVerifiedTransaction(stateRef.txhash) + }.toMap()) } } diff --git a/constants.properties b/constants.properties index ced3c2015d..71a16a0968 100644 --- a/constants.properties +++ b/constants.properties @@ -1,3 +1,4 @@ +gradlePluginsVersion=4.0.24 # # R3 Proprietary and Confidential # diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index fd40445857..264c80b0b0 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -204,6 +204,14 @@ interface CordaRPCOps : RPCOps { @Deprecated("This method is intended only for internal use and will be removed from the public API soon.") fun internalVerifiedTransactionsSnapshot(): List + /** + * @suppress Returns the full transaction for the provided ID + * + * TODO This method should be removed once SGX work is finalised and the design of the corresponding API using [FilteredTransaction] can be started + */ + @Deprecated("This method is intended only for internal use and will be removed from the public API soon.") + fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? + /** * @suppress Returns a data feed of all recorded transactions and an observable of future recorded ones. * diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index d5293646e9..0d5324a87f 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -8,6 +8,8 @@ Unreleased ========== * Introduced a hierarchy of ``DatabaseMigrationException``s, allowing ``NodeStartup`` to gracefully inform users of problems related to database migrations before exiting with a non-zero code. +* Added a ``FlowMonitor`` to log information about flows that have been waiting for IO more than a configurable threshold. + * H2 database changes: * The node's H2 database now listens on ``localhost`` by default. * The database server address must also be enabled in the node configuration. diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index 813d5f0341..8fd534fc84 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -275,6 +275,10 @@ absolute path to the node's base directory. which indicates that the issuer of the TLS certificate is also the issuer of the CRL. Note: If this parameter is set then the tlsCertCrlDistPoint needs to be set as well. +:flowMonitorPeriodMillis: ``Duration`` of the period suspended flows waiting for IO are logged. Default value is ``60 seconds``. + +:flowMonitorSuspensionLoggingThresholdMillis: Threshold ``Duration`` suspended flows waiting for IO need to exceed before they are logged. Default value is ``60 seconds``. + Examples -------- diff --git a/docs/source/generating-a-node.rst b/docs/source/generating-a-node.rst index ea189a1870..749501fd5d 100644 --- a/docs/source/generating-a-node.rst +++ b/docs/source/generating-a-node.rst @@ -117,10 +117,26 @@ Following the previous example ``PartyB`` node will have additional configuratio [...] // Grants user1 the ability to start the MyFlow flow. rpcUsers = [[ user: "user1", "password": "test", "permissions": ["StartFlow.net.corda.flows.MyFlow"]]] - configFile = "samples/trader-demo/src/main/resources/none-b.conf" + configFile = "samples/trader-demo/src/main/resources/node-b.conf" } } +Cordform parameter `drivers` of the `node` entry lists paths of the files to be copied to the `./drivers` subdirectory of the node. +To copy the same file to all nodes `ext.drivers` can be defined in the top level and reused for each node via `drivers=ext.drivers``. + +.. sourcecode:: groovy + + task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar']) { + ext.drivers = ['lib/my_common_jar.jar'] + [...] + node { + name "O=PartyB,L=New York,C=US" + [...] + drivers = ext.drivers + ['lib/my_specific_jar.jar'] + } + } + + Specifying a custom webserver ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ By default, any node listing a webport will use the default development webserver, which is not production-ready. You diff --git a/docs/source/node-administration.rst b/docs/source/node-administration.rst index 4fbd3a2243..39fe2dcf76 100644 --- a/docs/source/node-administration.rst +++ b/docs/source/node-administration.rst @@ -12,7 +12,7 @@ to time. You can have logging printed to the console as well by passing the ``-- The default logging level is ``INFO`` which can be adjusted by the ``--logging-level`` command line argument. This configuration option will affect all modules. -It may be the case that you require to amend the log level of a particular subset of modules (e.g. if you'd like to take a +It may be the case that you require to amend the log level of a particular subset of modules (e.g., if you'd like to take a closer look at hibernate activity). So, for more bespoke logging configuration, the logger settings can be completely overridden with a `Log4j 2 `_ configuration file assigned to the ``log4j.configurationFile`` system property. @@ -46,7 +46,7 @@ Now start the node as usual but with the additional parameter ``log4j.configurat ``java -Dlog4j.configurationFile=sql.xml -jar corda.jar`` -To determine the name of the logger, for Corda objects, use the fully qualified name (e.g. to look at node output +To determine the name of the logger, for Corda objects, use the fully qualified name (e.g., to look at node output in more detail, use ``net.corda.node.internal.Node`` although be aware that as we have marked this class ``internal`` we reserve the right to move and rename it as it's not part of the public API as yet). For other libraries, refer to their logging name construction. If you can't find what you need to refer to, use the ``--logging-level`` option as above and @@ -158,3 +158,40 @@ node is running out of memory, you can give it more by running the node like thi The example command above would give a 1 gigabyte Java heap. .. note:: Unfortunately the JVM does not let you limit the total memory usage of Java program, just the heap size. + +Backup recommendations +---------------------- + +Various components of the Corda platform read their configuration from the file system, and persist data to a database or into files on disk. +Given that hardware can fail, operators of IT infrastructure must have a sound backup strategy in place. Whilst blockchain platforms can sometimes recover some lost data from their peers, it is rarely the case that a node can recover its full state in this way because real-world blockchain applications invariably contain private information (e.g., customer account information). Moreover, this private information must remain in sync with the ledger state. As such, we strongly recommend implementing a comprehensive backup strategy. + +The following elements of a backup strategy are recommended: + +Database replication +++++++++++++++++++++ + +When properly configured, database replication prevents data loss from occurring in case the database host fails. +In general, the higher the number of replicas, and the further away they are deployed in terms of regions and availability zones, the more a setup is resilient to disasters. +The trade-off is that, ideally, replication should happen synchronously, meaning that a high number of replicas and a considerable network latency will impact the performance of the Corda nodes connecting to the cluster. +Synchronous replication is strongly advised to prevent data loss. + +Database snapshots +++++++++++++++++++ + +Database replication is a powerful technique, but it is very sensitive to destructive SQL updates. Whether malicious or unintentional, a SQL statement might compromise data by getting propagated to all replicas. +Without rolling snapshots, data loss due to such destructive updates will be irreversible. +Using snapshots always implies some data loss in case of a disaster, and the trade-off is between highly frequent backups minimising such a loss, and less frequent backups consuming less resources. +At present, Corda does not offer online updates with regards to transactions. +Should states in the vault ever be lost, partial or total recovery might be achieved by asking third-party companies and/or notaries to provide all data relevant to the affected legal identity. + +File backups +++++++++++++ + +Corda components read and write information from and to the file-system. The advice is to backup the entire root directory of the component, plus any external directories and files optionally specified in the configuration. +Corda assumes the filesystem is reliable. You must ensure that it is configured to provide this assurance, which means you must configure it to synchronously replicate to your backup/DR site. +If the above holds, Corda components will benefit from the following: + +* Guaranteed eventual processing of acknowledged client messages, provided that the backlog of persistent queues is not lost irremediably. +* A timely recovery from deletion or corruption of configuration files (e.g., ``node.conf``, ``node-info`` files, etc.), database drivers, CorDapps binaries and configuration, and certificate directories, provided backups are available to restore from. + +.. warning:: Private keys used to sign transactions should be preserved with the utmost care. The recommendation is to keep at least two separate copies on a storage not connected to the Internet. \ No newline at end of file diff --git a/docs/source/node-structure.rst b/docs/source/node-structure.rst index a098bcdfec..374548f8ee 100644 --- a/docs/source/node-structure.rst +++ b/docs/source/node-structure.rst @@ -15,7 +15,7 @@ A Corda node has the following structure: ├── corda-webserver.jar // The built-in node webserver ├── corda.jar // The core Corda libraries ├── cordapps // The CorDapp JARs installed on the node - ├── drivers // Contains a Jolokia driver used to export JMX metrics + ├── drivers // Contains a Jolokia driver used to export JMX metrics, the node loads any additional JAR files from this directory at startup. ├── logs // The node logs ├── network-parameters // The network parameters automatically downloaded from the network map server ├── node.conf // The node's configuration files diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index ca05bce23c..77660a4ea5 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -412,6 +412,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration, smm.start(tokenizableServices) // Shut down the SMM so no Fibers are scheduled. runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } + (smm as? StateMachineManagerInternal)?.let { + val flowMonitor = FlowMonitor(smm::snapshot, configuration.flowMonitorPeriodMillis, configuration.flowMonitorSuspensionLoggingThresholdMillis) + runOnStop += { flowMonitor.stop() } + flowMonitor.start() + } schedulerService.start() } _started = this diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index f883606ee8..3b8e30bd2d 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -118,6 +118,8 @@ internal class CordaRPCOpsImpl( return snapshot } + override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? = services.validatedTransactions.getTransaction(txnId) + @Suppress("OverridingDeprecatedMember") override fun internalVerifiedTransactionsFeed(): DataFeed, SignedTransaction> { return services.validatedTransactions.track() diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index da6d35a3a3..80c0e4fa24 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -37,6 +37,9 @@ import java.util.* val Int.MB: Long get() = this * 1024L * 1024L +private val DEFAULT_FLOW_MONITOR_PERIOD_MILLIS: Duration = Duration.ofMinutes(1) +private val DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS: Duration = Duration.ofMinutes(1) + interface NodeConfiguration : NodeSSLConfiguration { val myLegalName: CordaX500Name val emailAddress: String @@ -77,6 +80,9 @@ interface NodeConfiguration : NodeSSLConfiguration { val tlsCertCrlDistPoint: URL? val tlsCertCrlIssuer: String? val effectiveH2Settings: NodeH2Settings? + val flowMonitorPeriodMillis: Duration get() = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS + val flowMonitorSuspensionLoggingThresholdMillis: Duration get() = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS + fun validate(): List companion object { @@ -252,7 +258,9 @@ data class NodeConfigurationImpl( private val h2port: Int? = null, private val h2Settings: NodeH2Settings? = null, // do not use or remove (used by Capsule) - private val jarDirs: List = emptyList() + private val jarDirs: List = emptyList(), + override val flowMonitorPeriodMillis: Duration = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS, + override val flowMonitorSuspensionLoggingThresholdMillis: Duration = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS ) : NodeConfiguration { companion object { private val logger = loggerFor() diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt new file mode 100644 index 0000000000..ac1b2a7938 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt @@ -0,0 +1,89 @@ +package net.corda.node.services.statemachine + +import net.corda.core.flows.FlowSession +import net.corda.core.internal.FlowIORequest +import net.corda.core.utilities.loggerFor +import net.corda.node.internal.LifecycleSupport +import java.time.Duration +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneId +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit + +internal class FlowMonitor constructor(private val retrieveFlows: () -> Set>, + private val monitoringPeriod: Duration, + private val suspensionLoggingThreshold: Duration, + private var scheduler: ScheduledExecutorService? = null) : LifecycleSupport { + + private companion object { + private fun defaultScheduler(): ScheduledExecutorService { + return Executors.newSingleThreadScheduledExecutor() + } + + private val logger = loggerFor() + } + + override var started = false + + private var shutdownScheduler = false + + override fun start() { + synchronized(this) { + if (scheduler == null) { + scheduler = defaultScheduler() + shutdownScheduler = true + } + scheduler!!.scheduleAtFixedRate({ logFlowsWaitingForParty(suspensionLoggingThreshold) }, 0, monitoringPeriod.toMillis(), TimeUnit.MILLISECONDS) + started = true + } + } + + override fun stop() { + synchronized(this) { + if (shutdownScheduler) { + scheduler!!.shutdown() + } + started = false + } + } + + private fun logFlowsWaitingForParty(suspensionLoggingThreshold: Duration) { + val now = Instant.now() + val flows = retrieveFlows() + for (flow in flows) { + if (flow.isStarted() && flow.ongoingDuration(now) >= suspensionLoggingThreshold) { + flow.ioRequest()?.let { request -> warningMessageForFlowWaitingOnIo(request, flow, now) }?.let(logger::info) + } + } + } + + private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest<*>, flow: FlowStateMachineImpl<*>, now: Instant): String { + val message = StringBuilder("Flow with id ${flow.id.uuid} has been waiting for ${flow.ongoingDuration(now).toMillis() / 1000} seconds ") + message.append( + when (request) { + is FlowIORequest.Send -> "to send a message to parties ${request.sessionToMessage.keys.partiesInvolved()}" + is FlowIORequest.Receive -> "to receive messages from parties ${request.sessions.partiesInvolved()}" + is FlowIORequest.SendAndReceive -> "to send and receive messages from parties ${request.sessionToMessage.keys.partiesInvolved()}" + is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}" + is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}" + is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}" + FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed" + is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete" + } + ) + message.append(".") + return message.toString() + } + + private fun Iterable.partiesInvolved() = map { it.counterparty }.joinToString(", ", "[", "]") + + private fun FlowStateMachineImpl<*>.ioRequest() = (snapshot().checkpoint.flowState as? FlowState.Started)?.flowIORequest + + private fun FlowStateMachineImpl<*>.ongoingDuration(now: Instant) = Duration.between(createdAt(), now) + + private fun FlowStateMachineImpl<*>.createdAt() = context.trace.invocationId.timestamp + + private fun FlowStateMachineImpl<*>.isStarted() = transientState?.value?.checkpoint?.flowState is FlowState.Started +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index e664ffc73a..8cc9b66dbc 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -152,6 +152,8 @@ class SingleThreadedStateMachineManager( } } + override fun snapshot(): Set> = mutex.content.flows.values.map { it.fiber }.toSet() + override fun > findStateMachines(flowClass: Class): List>> { return mutex.locked { flows.values.mapNotNull { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index c69590f71f..c448c065ce 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -94,6 +94,11 @@ interface StateMachineManager { fun deliverExternalEvent(event: ExternalEvent) val flowHospital: StaffedFlowHospital + + /** + * Returns a snapshot of all [FlowStateMachineImpl]s currently managed. + */ + fun snapshot(): Set> } // These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call