mirror of
synced 2025-03-19 18:45:28 +00:00
Merge remote-tracking branch 'remotes/open/master' into merges/os-2018-06-19-2dded2a
This commit is contained in:
@ -11,15 +11,15 @@
package net.corda.client.jfx.model
import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections
import javafx.collections.ObservableMap
import net.corda.client.jfx.utils.*
import net.corda.client.jfx.utils.distinctBy
import net.corda.client.jfx.utils.lift
import net.corda.client.jfx.utils.map
import net.corda.client.jfx.utils.recordInSequence
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import org.fxmisc.easybind.EasyBind
* [PartiallyResolvedTransaction] holds a [SignedTransaction] that has zero or more inputs resolved. The intent is
@ -53,38 +53,36 @@ data class PartiallyResolvedTransaction(
companion object {
fun fromSignedTransaction(
transaction: SignedTransaction,
stateMap: ObservableMap<StateRef, StateAndRef<ContractState>>
) = PartiallyResolvedTransaction(
transaction = transaction,
inputs = transaction.inputs.map { stateRef ->
EasyBind.map(stateMap.getObservableValue(stateRef)) {
if (it == null) {
inputTransactions: Map<StateRef, SignedTransaction?>
): PartiallyResolvedTransaction {
return PartiallyResolvedTransaction(
transaction = transaction,
inputs = transaction.inputs.map { stateRef ->
val tx = inputTransactions.get(stateRef)
if (tx == null) {
} else {
outputs = if (transaction.coreTransaction is WireTransaction) {
transaction.tx.outRefsOfType<ContractState>().map {
outputs = if (transaction.coreTransaction is WireTransaction) {
transaction.tx.outRefsOfType<ContractState>().map {
} else {
// Transaction will have the same number of outputs as inputs
val outputCount = transaction.coreTransaction.inputs.size
val stateRefs = (0 until outputCount).map { StateRef(transaction.id, it) }
stateRefs.map { stateRef ->
EasyBind.map(stateMap.getObservableValue(stateRef)) {
if (it == null) {
} else {
// Transaction will have the same number of outputs as inputs
val outputCount = transaction.coreTransaction.inputs.size
val stateRefs = (0 until outputCount).map { StateRef(transaction.id, it) }
stateRefs.map { stateRef ->
val tx = inputTransactions.get(stateRef)
if (tx == null) {
} else {
@ -94,13 +92,12 @@ data class PartiallyResolvedTransaction(
class TransactionDataModel {
private val transactions by observable(NodeMonitorModel::transactions)
private val collectedTransactions = transactions.recordInSequence().distinctBy { it.id }
private val vaultUpdates by observable(NodeMonitorModel::vaultUpdates)
private val stateMap = vaultUpdates.fold(FXCollections.observableHashMap<StateRef, StateAndRef<ContractState>>()) { map, (consumed, produced) ->
val states = consumed + produced
states.forEach { map[it.ref] = it }
private val rpcProxy by observableValue(NodeMonitorModel::proxyObservable)
val partiallyResolvedTransactions = collectedTransactions.map {
PartiallyResolvedTransaction.fromSignedTransaction(it, stateMap)
it.inputs.map { stateRef ->
stateRef to rpcProxy.value!!.cordaRPCOps.internalFindVerifiedTransaction(stateRef.txhash)
@ -1,3 +1,4 @@
# R3 Proprietary and Confidential
@ -204,6 +204,14 @@ interface CordaRPCOps : RPCOps {
@Deprecated("This method is intended only for internal use and will be removed from the public API soon.")
fun internalVerifiedTransactionsSnapshot(): List<SignedTransaction>
* @suppress Returns the full transaction for the provided ID
* TODO This method should be removed once SGX work is finalised and the design of the corresponding API using [FilteredTransaction] can be started
@Deprecated("This method is intended only for internal use and will be removed from the public API soon.")
fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction?
* @suppress Returns a data feed of all recorded transactions and an observable of future recorded ones.
@ -8,6 +8,8 @@ Unreleased
* Introduced a hierarchy of ``DatabaseMigrationException``s, allowing ``NodeStartup`` to gracefully inform users of problems related to database migrations before exiting with a non-zero code.
* Added a ``FlowMonitor`` to log information about flows that have been waiting for IO more than a configurable threshold.
* H2 database changes:
* The node's H2 database now listens on ``localhost`` by default.
* The database server address must also be enabled in the node configuration.
@ -275,6 +275,10 @@ absolute path to the node's base directory.
which indicates that the issuer of the TLS certificate is also the issuer of the CRL.
Note: If this parameter is set then the tlsCertCrlDistPoint needs to be set as well.
:flowMonitorPeriodMillis: ``Duration`` of the period suspended flows waiting for IO are logged. Default value is ``60 seconds``.
:flowMonitorSuspensionLoggingThresholdMillis: Threshold ``Duration`` suspended flows waiting for IO need to exceed before they are logged. Default value is ``60 seconds``.
@ -117,10 +117,26 @@ Following the previous example ``PartyB`` node will have additional configuratio
// Grants user1 the ability to start the MyFlow flow.
rpcUsers = [[ user: "user1", "password": "test", "permissions": ["StartFlow.net.corda.flows.MyFlow"]]]
configFile = "samples/trader-demo/src/main/resources/none-b.conf"
configFile = "samples/trader-demo/src/main/resources/node-b.conf"
Cordform parameter `drivers` of the `node` entry lists paths of the files to be copied to the `./drivers` subdirectory of the node.
To copy the same file to all nodes `ext.drivers` can be defined in the top level and reused for each node via `drivers=ext.drivers``.
.. sourcecode:: groovy
task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar']) {
ext.drivers = ['lib/my_common_jar.jar']
node {
name "O=PartyB,L=New York,C=US"
drivers = ext.drivers + ['lib/my_specific_jar.jar']
Specifying a custom webserver
By default, any node listing a webport will use the default development webserver, which is not production-ready. You
@ -12,7 +12,7 @@ to time. You can have logging printed to the console as well by passing the ``--
The default logging level is ``INFO`` which can be adjusted by the ``--logging-level`` command line argument. This configuration
option will affect all modules.
It may be the case that you require to amend the log level of a particular subset of modules (e.g. if you'd like to take a
It may be the case that you require to amend the log level of a particular subset of modules (e.g., if you'd like to take a
closer look at hibernate activity). So, for more bespoke logging configuration, the logger settings can be completely overridden
with a `Log4j 2 <https://logging.apache.org/log4j/2.x>`_ configuration file assigned to the ``log4j.configurationFile`` system property.
@ -46,7 +46,7 @@ Now start the node as usual but with the additional parameter ``log4j.configurat
``java <Your existing startup options here> -Dlog4j.configurationFile=sql.xml -jar corda.jar``
To determine the name of the logger, for Corda objects, use the fully qualified name (e.g. to look at node output
To determine the name of the logger, for Corda objects, use the fully qualified name (e.g., to look at node output
in more detail, use ``net.corda.node.internal.Node`` although be aware that as we have marked this class ``internal`` we
reserve the right to move and rename it as it's not part of the public API as yet). For other libraries, refer to their
logging name construction. If you can't find what you need to refer to, use the ``--logging-level`` option as above and
@ -158,3 +158,40 @@ node is running out of memory, you can give it more by running the node like thi
The example command above would give a 1 gigabyte Java heap.
.. note:: Unfortunately the JVM does not let you limit the total memory usage of Java program, just the heap size.
Backup recommendations
Various components of the Corda platform read their configuration from the file system, and persist data to a database or into files on disk.
Given that hardware can fail, operators of IT infrastructure must have a sound backup strategy in place. Whilst blockchain platforms can sometimes recover some lost data from their peers, it is rarely the case that a node can recover its full state in this way because real-world blockchain applications invariably contain private information (e.g., customer account information). Moreover, this private information must remain in sync with the ledger state. As such, we strongly recommend implementing a comprehensive backup strategy.
The following elements of a backup strategy are recommended:
Database replication
When properly configured, database replication prevents data loss from occurring in case the database host fails.
In general, the higher the number of replicas, and the further away they are deployed in terms of regions and availability zones, the more a setup is resilient to disasters.
The trade-off is that, ideally, replication should happen synchronously, meaning that a high number of replicas and a considerable network latency will impact the performance of the Corda nodes connecting to the cluster.
Synchronous replication is strongly advised to prevent data loss.
Database snapshots
Database replication is a powerful technique, but it is very sensitive to destructive SQL updates. Whether malicious or unintentional, a SQL statement might compromise data by getting propagated to all replicas.
Without rolling snapshots, data loss due to such destructive updates will be irreversible.
Using snapshots always implies some data loss in case of a disaster, and the trade-off is between highly frequent backups minimising such a loss, and less frequent backups consuming less resources.
At present, Corda does not offer online updates with regards to transactions.
Should states in the vault ever be lost, partial or total recovery might be achieved by asking third-party companies and/or notaries to provide all data relevant to the affected legal identity.
File backups
Corda components read and write information from and to the file-system. The advice is to backup the entire root directory of the component, plus any external directories and files optionally specified in the configuration.
Corda assumes the filesystem is reliable. You must ensure that it is configured to provide this assurance, which means you must configure it to synchronously replicate to your backup/DR site.
If the above holds, Corda components will benefit from the following:
* Guaranteed eventual processing of acknowledged client messages, provided that the backlog of persistent queues is not lost irremediably.
* A timely recovery from deletion or corruption of configuration files (e.g., ``node.conf``, ``node-info`` files, etc.), database drivers, CorDapps binaries and configuration, and certificate directories, provided backups are available to restore from.
.. warning:: Private keys used to sign transactions should be preserved with the utmost care. The recommendation is to keep at least two separate copies on a storage not connected to the Internet.
@ -15,7 +15,7 @@ A Corda node has the following structure:
├── corda-webserver.jar // The built-in node webserver
├── corda.jar // The core Corda libraries
├── cordapps // The CorDapp JARs installed on the node
├── drivers // Contains a Jolokia driver used to export JMX metrics
├── drivers // Contains a Jolokia driver used to export JMX metrics, the node loads any additional JAR files from this directory at startup.
├── logs // The node logs
├── network-parameters // The network parameters automatically downloaded from the network map server
├── node.conf // The node's configuration files
@ -412,6 +412,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
(smm as? StateMachineManagerInternal)?.let {
val flowMonitor = FlowMonitor(smm::snapshot, configuration.flowMonitorPeriodMillis, configuration.flowMonitorSuspensionLoggingThresholdMillis)
runOnStop += { flowMonitor.stop() }
_started = this
@ -118,6 +118,8 @@ internal class CordaRPCOpsImpl(
return snapshot
override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? = services.validatedTransactions.getTransaction(txnId)
override fun internalVerifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return services.validatedTransactions.track()
@ -37,6 +37,9 @@ import java.util.*
val Int.MB: Long get() = this * 1024L * 1024L
private val DEFAULT_FLOW_MONITOR_PERIOD_MILLIS: Duration = Duration.ofMinutes(1)
interface NodeConfiguration : NodeSSLConfiguration {
val myLegalName: CordaX500Name
val emailAddress: String
@ -77,6 +80,9 @@ interface NodeConfiguration : NodeSSLConfiguration {
val tlsCertCrlDistPoint: URL?
val tlsCertCrlIssuer: String?
val effectiveH2Settings: NodeH2Settings?
val flowMonitorPeriodMillis: Duration get() = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS
val flowMonitorSuspensionLoggingThresholdMillis: Duration get() = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS
fun validate(): List<String>
companion object {
@ -252,7 +258,9 @@ data class NodeConfigurationImpl(
private val h2port: Int? = null,
private val h2Settings: NodeH2Settings? = null,
// do not use or remove (used by Capsule)
private val jarDirs: List<String> = emptyList()
private val jarDirs: List<String> = emptyList(),
override val flowMonitorPeriodMillis: Duration = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS,
override val flowMonitorSuspensionLoggingThresholdMillis: Duration = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS
) : NodeConfiguration {
companion object {
private val logger = loggerFor<NodeConfigurationImpl>()
@ -0,0 +1,89 @@
package net.corda.node.services.statemachine
import net.corda.core.flows.FlowSession
import net.corda.core.internal.FlowIORequest
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.LifecycleSupport
import java.time.Duration
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneId
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
internal class FlowMonitor constructor(private val retrieveFlows: () -> Set<FlowStateMachineImpl<*>>,
private val monitoringPeriod: Duration,
private val suspensionLoggingThreshold: Duration,
private var scheduler: ScheduledExecutorService? = null) : LifecycleSupport {
private companion object {
private fun defaultScheduler(): ScheduledExecutorService {
return Executors.newSingleThreadScheduledExecutor()
private val logger = loggerFor<FlowMonitor>()
override var started = false
private var shutdownScheduler = false
override fun start() {
synchronized(this) {
if (scheduler == null) {
scheduler = defaultScheduler()
shutdownScheduler = true
scheduler!!.scheduleAtFixedRate({ logFlowsWaitingForParty(suspensionLoggingThreshold) }, 0, monitoringPeriod.toMillis(), TimeUnit.MILLISECONDS)
started = true
override fun stop() {
synchronized(this) {
if (shutdownScheduler) {
started = false
private fun logFlowsWaitingForParty(suspensionLoggingThreshold: Duration) {
val now = Instant.now()
val flows = retrieveFlows()
for (flow in flows) {
if (flow.isStarted() && flow.ongoingDuration(now) >= suspensionLoggingThreshold) {
flow.ioRequest()?.let { request -> warningMessageForFlowWaitingOnIo(request, flow, now) }?.let(logger::info)
private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest<*>, flow: FlowStateMachineImpl<*>, now: Instant): String {
val message = StringBuilder("Flow with id ${flow.id.uuid} has been waiting for ${flow.ongoingDuration(now).toMillis() / 1000} seconds ")
when (request) {
is FlowIORequest.Send -> "to send a message to parties ${request.sessionToMessage.keys.partiesInvolved()}"
is FlowIORequest.Receive -> "to receive messages from parties ${request.sessions.partiesInvolved()}"
is FlowIORequest.SendAndReceive -> "to send and receive messages from parties ${request.sessionToMessage.keys.partiesInvolved()}"
is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}"
is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}"
is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}"
FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed"
is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete"
return message.toString()
private fun Iterable<FlowSession>.partiesInvolved() = map { it.counterparty }.joinToString(", ", "[", "]")
private fun FlowStateMachineImpl<*>.ioRequest() = (snapshot().checkpoint.flowState as? FlowState.Started)?.flowIORequest
private fun FlowStateMachineImpl<*>.ongoingDuration(now: Instant) = Duration.between(createdAt(), now)
private fun FlowStateMachineImpl<*>.createdAt() = context.trace.invocationId.timestamp
private fun FlowStateMachineImpl<*>.isStarted() = transientState?.value?.checkpoint?.flowState is FlowState.Started
@ -152,6 +152,8 @@ class SingleThreadedStateMachineManager(
override fun snapshot(): Set<FlowStateMachineImpl<*>> = mutex.content.flows.values.map { it.fiber }.toSet()
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
return mutex.locked {
flows.values.mapNotNull {
@ -94,6 +94,11 @@ interface StateMachineManager {
fun deliverExternalEvent(event: ExternalEvent)
val flowHospital: StaffedFlowHospital
* Returns a snapshot of all [FlowStateMachineImpl]s currently managed.
fun snapshot(): Set<FlowStateMachineImpl<*>>
// These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call
Reference in New Issue
Block a user