mirror of
synced 2025-03-19 18:45:28 +00:00
Merge opensource into master
This commit is contained in:
@ -1533,7 +1533,6 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec
@org.jetbrains.annotations.NotNull public abstract List networkMapSnapshot()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.NodeInfo nodeInfo()
@org.jetbrains.annotations.Nullable public abstract net.corda.core.node.NodeInfo nodeInfoFromParty(net.corda.core.identity.AbstractParty)
@net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract rx.Observable nodeStateObservable()
@org.jetbrains.annotations.NotNull public abstract List notaryIdentities()
@org.jetbrains.annotations.Nullable public abstract net.corda.core.identity.Party notaryPartyFromX500Name(net.corda.core.identity.CordaX500Name)
@org.jetbrains.annotations.NotNull public abstract java.io.InputStream openAttachment(net.corda.core.crypto.SecureHash)
@ -1613,11 +1612,6 @@ public final class net.corda.core.messaging.CordaRPCOpsKt extends java.lang.Obje
@net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.MessageRecipients
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.messaging.NodeState extends java.lang.Enum
protected <init>(String, int)
public static net.corda.core.messaging.NodeState valueOf(String)
public static net.corda.core.messaging.NodeState[] values()
@net.corda.core.DoNotImplement public interface net.corda.core.messaging.RPCOps
public abstract int getProtocolVersion()
@ -1714,7 +1708,6 @@ public @interface net.corda.core.messaging.RPCReturnsObservables
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.ContractUpgradeService getContractUpgradeService()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.KeyManagementService getKeyManagementService()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.NodeInfo getMyInfo()
@org.jetbrains.annotations.NotNull public abstract rx.Observable getMyNodeStateObservable()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.NetworkMapCache getNetworkMapCache()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.TransactionVerifierService getTransactionVerifierService()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.TransactionStorage getValidatedTransactions()
@ -48,6 +48,7 @@ buildscript {
ext.dependency_checker_version = '3.0.1'
ext.commons_collections_version = '4.1'
ext.beanutils_version = '1.9.3'
ext.crash_version = 'faba68332800f21278c5b600bf14ad55cef5989e'
ext.spring_jdbc_version ='5.0.0.RELEASE'
// Update 121 is required for ObjectInputFilter and at time of writing 131 was latest:
@ -7,7 +7,7 @@ import com.google.common.collect.HashMultimap
import com.google.common.collect.Multimap
import net.corda.client.jackson.StringToMethodCallParser.ParsedMethodCall
import net.corda.core.CordaException
import org.slf4j.LoggerFactory
import net.corda.core.utilities.contextLogger
import java.lang.reflect.Constructor
import java.lang.reflect.Method
import java.util.concurrent.Callable
@ -90,7 +90,7 @@ open class StringToMethodCallParser<in T : Any> @JvmOverloads constructor(
return result
private val log = LoggerFactory.getLogger(StringToMethodCallParser::class.java)!!
private val log = contextLogger()
/** The methods that can be invoked via this parser. */
@ -6,16 +6,12 @@ import net.corda.core.context.Trace
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.NodeState
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize
import net.corda.core.utilities.*
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.User
import net.corda.testing.IntegrationTest
import net.corda.testing.driver.poll
import net.corda.testing.internal.*
@ -242,30 +238,6 @@ class RPCStabilityTests : IntegrationTest() {
fun `clients receive notifications that node is shutting down`() {
val alice = User("Alice", "Alice", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
val bob = User("Bob", "Bob", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
val slagathor = User("Slagathor", "Slagathor", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
val userList = listOf(alice, bob, slagathor)
val expectedMessages = ArrayList<NodeState>()
rpcDriver(startNodesInProcess = true) {
val node = startNode(rpcUsers = listOf(alice, bob, slagathor)).getOrThrow()
userList.forEach {
val connection = node.rpcClientToNode().start(it.username, it.password)
val nodeStateObservable = connection.proxy.nodeStateObservable()
nodeStateObservable.subscribe { update ->
assertEquals(userList.size, expectedMessages.size)
assertEquals(NodeState.SHUTTING_DOWN, expectedMessages.first())
interface TrackSubscriberOps : RPCOps {
fun subscribe(): Observable<Unit>
@ -11,10 +11,7 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.core.utilities.*
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.RPCApi
@ -96,7 +93,7 @@ class RPCClient<I : RPCOps>(
) : this(tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfiguration), configuration, serializationContext)
companion object {
private val log = loggerFor<RPCClient<*>>()
private val log = contextLogger()
fun start(
@ -22,10 +22,7 @@ import net.corda.core.internal.ThreadBox
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.serialize
import net.corda.core.utilities.Try
import net.corda.core.utilities.debug
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.*
import net.corda.nodeapi.ArtemisConsumer
import net.corda.nodeapi.ArtemisProducer
import net.corda.nodeapi.RPCApi
@ -91,7 +88,7 @@ class RPCClientProxyHandler(
private val lifeCycle = LifeCycle(State.UNSTARTED)
private companion object {
val log = loggerFor<RPCClientProxyHandler>()
private val log = contextLogger()
// To check whether toString() is being invoked
val toStringMethod: Method = Object::toString.javaMethod!!
@ -11,10 +11,7 @@ import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.*
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.seconds
import net.corda.core.utilities.*
import net.corda.finance.DOLLARS
import net.corda.finance.POUNDS
import net.corda.finance.SWISS_FRANCS
@ -45,7 +42,7 @@ import kotlin.test.assertTrue
class StandaloneCordaRPClientTest {
private companion object {
val log = loggerFor<StandaloneCordaRPClientTest>()
private val log = contextLogger()
val user = User("user1", "test", permissions = setOf("ALL"))
val port = AtomicInteger(15200)
const val attachmentSize = 2116
@ -1,5 +1,5 @@
@ -348,6 +348,20 @@ abstract class FlowLogic<out T> {
fun trackStepsTreeIndex(): DataFeed<Int, Int>? {
// TODO this is not threadsafe, needs an atomic get-step-and-subscribe
return progressTracker?.let {
DataFeed(it.stepsTreeIndex, it.stepsTreeIndexChanges)
fun trackStepsTree(): DataFeed<List<Pair<Int,String>>, List<Pair<Int,String>>>? {
// TODO this is not threadsafe, needs an atomic get-step-and-subscribe
return progressTracker?.let {
DataFeed(it.allStepsLabels, it.stepsTreeChanges)
* Suspends the flow until the transaction with the specified ID is received, successfully verified and
* sent to the vault for processing. Note that this call suspends until the transaction is considered
@ -1,6 +1,6 @@
package net.corda.core.internal
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import org.slf4j.Logger
import java.util.concurrent.atomic.AtomicReference
import kotlin.reflect.KProperty
@ -47,8 +47,12 @@ class ThreadLeakException : RuntimeException("Leaked thread detected: ${Thread.c
/** @param isAGlobalThreadBeingCreated whether a global thread (that should not inherit any value) is being created. */
class InheritableThreadLocalToggleField<T>(name: String,
private val log: Logger = loggerFor<InheritableThreadLocalToggleField<*>>(),
private val log: Logger = staticLog,
private val isAGlobalThreadBeingCreated: (Array<StackTraceElement>) -> Boolean) : ToggleField<T>(name) {
companion object {
private val staticLog = contextLogger()
private inner class Holder(value: T) : AtomicReference<T?>(value) {
fun valueOrDeclareLeak() = get() ?: throw ThreadLeakException()
fun childValue(): Holder? {
@ -3,8 +3,8 @@ package net.corda.core.internal.concurrent
import net.corda.core.internal.VisibleForTesting
import net.corda.core.concurrent.CordaFuture
import net.corda.core.concurrent.match
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import org.slf4j.Logger
import java.time.Duration
import java.util.concurrent.CompletableFuture
@ -118,7 +118,7 @@ interface OpenFuture<V> : ValueOrException<V>, CordaFuture<V>
internal class CordaFutureImpl<V>(private val impl: CompletableFuture<V> = CompletableFuture()) : Future<V> by impl, OpenFuture<V> {
companion object {
private val defaultLog = loggerFor<CordaFutureImpl<*>>()
private val defaultLog = contextLogger()
internal val listenerFailedMessage = "Future listener failed:"
@ -226,10 +226,6 @@ interface CordaRPCOps : RPCOps {
/** Returns Node's NodeInfo, assuming this will not change while the node is running. */
fun nodeInfo(): NodeInfo
/** Returns and [Observable] object with future states of the node. */
fun nodeStateObservable(): Observable<NodeState>
* Returns network's notary identities, assuming this will not change while the node is running.
@ -468,8 +464,3 @@ inline fun <T, A, B, C, D, E, F, reified R : FlowLogic<T>> CordaRPCOps.startTrac
data class DataFeed<out A, B>(val snapshot: A, val updates: Observable<B>)
enum class NodeState {
@ -31,6 +31,9 @@ interface FlowHandle<A> : AutoCloseable {
interface FlowProgressHandle<A> : FlowHandle<A> {
val progress: Observable<String>
val stepsTreeIndexFeed: DataFeed<Int, Int>?
val stepsTreeFeed: DataFeed<List<Pair<Int, String>>, List<Pair<Int, String>>>?
* Use this function for flows whose returnValue and progress are not going to be used or tracked, so as to free up
* server resources.
@ -52,10 +55,17 @@ data class FlowHandleImpl<A>(
data class FlowProgressHandleImpl<A>(
data class FlowProgressHandleImpl<A> @JvmOverloads constructor(
override val id: StateMachineRunId,
override val returnValue: CordaFuture<A>,
override val progress: Observable<String>) : FlowProgressHandle<A> {
override val progress: Observable<String>,
override val stepsTreeIndexFeed: DataFeed<Int, Int>? = null,
override val stepsTreeFeed: DataFeed<List<Pair<Int, String>>, List<Pair<Int, String>>>? = null) : FlowProgressHandle<A> {
// For API compatibility
fun copy(id: StateMachineRunId, returnValue: CordaFuture<A>, progress: Observable<String>): FlowProgressHandleImpl<A> {
return copy(id = id, returnValue = returnValue, progress = progress, stepsTreeFeed = null, stepsTreeIndexFeed = null)
// Remember to add @Throws to FlowProgressHandle.close() if this throws an exception.
override fun close() {
@ -8,13 +8,11 @@ import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.ContractUpgradeFlow
import net.corda.core.messaging.NodeState
import net.corda.core.node.services.*
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import rx.Observable
import java.security.PublicKey
import java.sql.Connection
import java.time.Clock
@ -150,9 +148,6 @@ interface ServiceHub : ServicesForResolution {
/** The [NodeInfo] object corresponding to our own entry in the network map. */
val myInfo: NodeInfo
/** The [Observable] object used to communicate to RPC clients the state of the node. */
val myNodeStateObservable: Observable<NodeState>
* Return the singleton instance of the given Corda service type. This is a class that is annotated with
* [CordaService] and will have automatically been registered by the node.
@ -45,6 +45,13 @@ interface AttachmentStorage {
@Throws(FileAlreadyExistsException::class, IOException::class)
fun importAttachment(jar: InputStream, uploader: String, filename: String): AttachmentId
* Inserts or returns Attachment Id of attachment. Does not throw an exception if already uploaded.
* @param jar [InputStream] of Jar file
* @return [AttachmentId] of uploaded attachment
fun importOrGetAttachment(jar: InputStream): AttachmentId
* Searches attachment using given criteria and optional sort rules
* @param criteria Query criteria to use as a filter
@ -53,5 +60,12 @@ interface AttachmentStorage {
* @return List of AttachmentId of attachment matching criteria, sorted according to given sorting parameter
fun queryAttachments(criteria: AttachmentQueryCriteria, sorting: AttachmentSort? = null): List<AttachmentId>
* Searches for an attachment already in the store
* @param attachmentId The attachment Id
* @return true if it's in there
fun hasAttachment(attachmentId: AttachmentId): Boolean
@ -9,7 +9,7 @@ import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import org.slf4j.Logger
import java.security.PublicKey
@ -45,8 +45,11 @@ abstract class NotaryService : SingletonSerializeAsToken() {
* of the cluster is sufficient for transaction notarisation. For example, a single-node or a Raft notary.
abstract class TrustedAuthorityNotaryService : NotaryService() {
protected open val log: Logger = loggerFor<TrustedAuthorityNotaryService>()
companion object {
private val staticLog = contextLogger()
protected open val log: Logger get() = staticLog
// TODO: specify the valid time window in config, and convert TimeWindowChecker to a utility method
protected abstract val timeWindowChecker: TimeWindowChecker
protected abstract val uniquenessProvider: UniquenessProvider
@ -24,12 +24,19 @@ infix fun Int.exactAdd(b: Int): Int = Math.addExact(this, b)
infix fun Long.exactAdd(b: Long): Long = Math.addExact(this, b)
* Get the [Logger] for a class using the syntax
* Usually you won't need this method:
* * If you're in a companion object, use [contextLogger]
* * If you're in an object singleton, use [LoggerFactory.getLogger] directly on javaClass
* `val logger = loggerFor<MyClass>()`
* Otherwise, this gets the [Logger] for a class using the syntax
* `private val log = loggerFor<MyClass>()`
inline fun <reified T : Any> loggerFor(): Logger = LoggerFactory.getLogger(T::class.java)
/** When called from a companion object, returns the logger for the enclosing class. */
fun Any.contextLogger(): Logger = LoggerFactory.getLogger(javaClass.enclosingClass)
/** Log a TRACE level message produced by evaluating the given lamdba, but only if TRACE logging is enabled. */
inline fun Logger.trace(msg: () -> String) {
if (isTraceEnabled) trace(msg())
@ -6,9 +6,6 @@ import rx.Subscription
import rx.subjects.PublishSubject
import java.util.*
// TODO: Expose the concept of errors.
// TODO: It'd be helpful if this class was at least partly thread safe.
* A progress tracker helps surface information about the progress of an operation to a user interface or API of some
* kind. It lets you define a set of _steps_ that represent an operation. A step is represented by an object (typically
@ -34,16 +31,16 @@ import java.util.*
class ProgressTracker(vararg steps: Step) {
sealed class Change {
data class Position(val tracker: ProgressTracker, val newStep: Step) : Change() {
sealed class Change(val progressTracker: ProgressTracker) {
data class Position(val tracker: ProgressTracker, val newStep: Step) : Change(tracker) {
override fun toString() = newStep.label
data class Rendering(val tracker: ProgressTracker, val ofStep: Step) : Change() {
data class Rendering(val tracker: ProgressTracker, val ofStep: Step) : Change(tracker) {
override fun toString() = ofStep.label
data class Structural(val tracker: ProgressTracker, val parent: Step) : Change() {
data class Structural(val tracker: ProgressTracker, val parent: Step) : Change(tracker) {
override fun toString() = "Structural step change in child of ${parent.label}"
@ -70,17 +67,23 @@ class ProgressTracker(vararg steps: Step) {
override fun equals(other: Any?) = other is DONE
/** The steps in this tracker, same as the steps passed to the constructor but with UNSTARTED and DONE inserted. */
val steps = arrayOf(UNSTARTED, *steps, DONE)
// This field won't be serialized.
private val _changes by transient { PublishSubject.create<Change>() }
private data class Child(val tracker: ProgressTracker, @Transient val subscription: Subscription?)
private val childProgressTrackers = mutableMapOf<Step, Child>()
/** The steps in this tracker, same as the steps passed to the constructor but with UNSTARTED and DONE inserted. */
val steps = arrayOf(UNSTARTED, *steps, DONE)
private var _allStepsCache: List<Pair<Int, Step>> = _allSteps()
// This field won't be serialized.
private val _changes by transient { PublishSubject.create<Change>() }
private val _stepsTreeChanges by transient { PublishSubject.create<List<Pair<Int, String>>>() }
private val _stepsTreeIndexChanges by transient { PublishSubject.create<Int>() }
init {
steps.forEach {
val childTracker = it.childProgressTracker()
@ -92,7 +95,15 @@ class ProgressTracker(vararg steps: Step) {
/** The zero-based index of the current step in the [steps] array (i.e. with UNSTARTED and DONE) */
var stepIndex: Int = 0
private set
private set(value) {
field = value
var stepsTreeIndex: Int = -1
private set(value) {
field = value
* Reading returns the value of steps[stepIndex], writing moves the position of the current tracker. Once moved to
@ -118,22 +129,39 @@ class ProgressTracker(vararg steps: Step) {
stepIndex = index
_changes.onNext(Change.Position(this, steps[index]))
curChangeSubscription = currentStep.changes.subscribe({ _changes.onNext(it) }, { _changes.onError(it) })
curChangeSubscription = currentStep.changes.subscribe({
if (it is Change.Structural || it is Change.Rendering) rebuildStepsTree() else recalculateStepsTreeIndex()
}, { _changes.onError(it) })
if (currentStep == DONE) _changes.onCompleted()
if (currentStep == DONE) {
/** Returns the current step, descending into children to find the deepest step we are up to. */
val currentStepRecursive: Step
get() = getChildProgressTracker(currentStep)?.currentStepRecursive ?: currentStep
private fun currentStepRecursiveWithoutUnstarted(): Step {
val stepRecursive = getChildProgressTracker(currentStep)?.currentStepRecursive
return if (stepRecursive == null || stepRecursive == UNSTARTED) currentStep else stepRecursive
fun getChildProgressTracker(step: Step): ProgressTracker? = childProgressTrackers[step]?.tracker
fun setChildProgressTracker(step: ProgressTracker.Step, childProgressTracker: ProgressTracker) {
val subscription = childProgressTracker.changes.subscribe({ _changes.onNext(it) }, { _changes.onError(it) })
val subscription = childProgressTracker.changes.subscribe({
if (it is Change.Structural || it is Change.Rendering) rebuildStepsTree() else recalculateStepsTreeIndex()
}, { _changes.onError(it) })
childProgressTrackers[step] = Child(childProgressTracker, subscription)
childProgressTracker.parent = this
_changes.onNext(Change.Structural(this, step))
private fun removeChildProgressTracker(step: ProgressTracker.Step) {
@ -142,6 +170,7 @@ class ProgressTracker(vararg steps: Step) {
_changes.onNext(Change.Structural(this, step))
@ -166,6 +195,18 @@ class ProgressTracker(vararg steps: Step) {
return cursor
private fun rebuildStepsTree() {
_allStepsCache = _allSteps()
private fun recalculateStepsTreeIndex() {
val step = currentStepRecursiveWithoutUnstarted()
stepsTreeIndex = _allStepsCache.indexOfFirst { it.second == step }
private fun _allSteps(level: Int = 0): List<Pair<Int, Step>> {
val result = ArrayList<Pair<Int, Step>>()
for (step in steps) {
@ -177,11 +218,15 @@ class ProgressTracker(vararg steps: Step) {
return result
private fun _allStepsLabels(level: Int = 0): List<Pair<Int, String>> = _allSteps(level).map { Pair(it.first, it.second.label) }
* A list of all steps in this ProgressTracker and the children, with the indent level provided starting at zero.
* Note that UNSTARTED is never counted, and DONE is only counted at the calling level.
val allSteps: List<Pair<Int, Step>> get() = _allSteps()
val allSteps: List<Pair<Int, Step>> get() = _allStepsCache
val allStepsLabels: List<Pair<Int, String>> get() = _allStepsLabels()
private var curChangeSubscription: Subscription? = null
@ -200,8 +245,15 @@ class ProgressTracker(vararg steps: Step) {
val changes: Observable<Change> get() = _changes
val stepsTreeChanges: Observable<List<Pair<Int,String>>> get() = _stepsTreeChanges
val stepsTreeIndexChanges: Observable<Int> get() = _stepsTreeIndexChanges
/** Returns true if the progress tracker has ended, either by reaching the [DONE] step or prematurely with an error */
val hasEnded: Boolean get() = _changes.hasCompleted() || _changes.hasThrowable()
// TODO: Expose the concept of errors.
// TODO: It'd be helpful if this class was at least partly thread safe.
@ -0,0 +1,13 @@
package net.corda.core.contracts
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Test
class PrivacySaltTest {
fun `all-zero PrivacySalt not allowed`() {
assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy {
}.withMessage("Privacy salt should not be all zeros.")
@ -5,46 +5,45 @@ import net.corda.core.identity.Party
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.testing.node.network
import net.corda.testing.node.MockNetwork
import net.corda.testing.singleIdentity
import net.corda.testing.startFlow
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
class ReceiveMultipleFlowTests {
private val mockNet = MockNetwork()
private val nodes = (0..2).map { mockNet.createPartyNode() }
fun stopNodes() {
fun `receive all messages in parallel using map style`() {
network(3) { nodes ->
val doubleValue = 5.0
nodes[1].registerAnswer(AlgorithmDefinition::class, doubleValue)
val stringValue = "Thriller"
nodes[2].registerAnswer(AlgorithmDefinition::class, stringValue)
val flow = nodes[0].services.startFlow(ParallelAlgorithmMap(nodes[1].info.singleIdentity(), nodes[2].info.singleIdentity()))
val result = flow.resultFuture.getOrThrow()
assertThat(result).isEqualTo(doubleValue * stringValue.length)
val doubleValue = 5.0
nodes[1].registerAnswer(AlgorithmDefinition::class, doubleValue)
val stringValue = "Thriller"
nodes[2].registerAnswer(AlgorithmDefinition::class, stringValue)
val flow = nodes[0].services.startFlow(ParallelAlgorithmMap(nodes[1].info.singleIdentity(), nodes[2].info.singleIdentity()))
val result = flow.resultFuture.getOrThrow()
assertThat(result).isEqualTo(doubleValue * stringValue.length)
fun `receive all messages in parallel using list style`() {
network(3) { nodes ->
val value1 = 5.0
nodes[1].registerAnswer(ParallelAlgorithmList::class, value1)
val value2 = 6.0
nodes[2].registerAnswer(ParallelAlgorithmList::class, value2)
val flow = nodes[0].services.startFlow(ParallelAlgorithmList(nodes[1].info.singleIdentity(), nodes[2].info.singleIdentity()))
val data = flow.resultFuture.getOrThrow()
assertThat(data.fold(1.0) { a, b -> a * b }).isEqualTo(value1 * value2)
val value1 = 5.0
nodes[1].registerAnswer(ParallelAlgorithmList::class, value1)
val value2 = 6.0
nodes[2].registerAnswer(ParallelAlgorithmList::class, value2)
val flow = nodes[0].services.startFlow(ParallelAlgorithmList(nodes[1].info.singleIdentity(), nodes[2].info.singleIdentity()))
val data = flow.resultFuture.getOrThrow()
assertThat(data.fold(1.0) { a, b -> a * b }).isEqualTo(value1 * value2)
class ParallelAlgorithmMap(doubleMember: Party, stringMember: Party) : AlgorithmDefinition(doubleMember, stringMember) {
@ -5,6 +5,7 @@ import org.junit.Test
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFails
import org.assertj.core.api.Assertions.*
class ProgressTrackerTest {
object SimpleSteps {
@ -24,13 +25,23 @@ class ProgressTrackerTest {
fun tracker() = ProgressTracker(AYY, BEE, SEA)
object BabySteps {
object UNOS : ProgressTracker.Step("unos")
object DOES : ProgressTracker.Step("does")
object TRES : ProgressTracker.Step("tres")
fun tracker() = ProgressTracker(UNOS, DOES, TRES)
lateinit var pt: ProgressTracker
lateinit var pt2: ProgressTracker
lateinit var pt3: ProgressTracker
fun before() {
pt = SimpleSteps.tracker()
pt2 = ChildSteps.tracker()
pt3 = BabySteps.tracker()
@ -81,6 +92,118 @@ class ProgressTrackerTest {
assertEquals(ChildSteps.BEE, pt2.nextStep())
fun `steps tree index counts children steps`() {
pt.setChildProgressTracker(SimpleSteps.TWO, pt2)
val allSteps = pt.allSteps
//capture notifications
val stepsIndexNotifications = LinkedList<Int>()
pt.stepsTreeIndexChanges.subscribe {
stepsIndexNotifications += it
val stepsTreeNotification = LinkedList<List<Pair<Int, String>>>()
pt.stepsTreeChanges.subscribe {
stepsTreeNotification += it
fun assertCurrentStepsTree(index:Int, step: ProgressTracker.Step) {
assertEquals(index, pt.stepsTreeIndex)
assertEquals(step, allSteps[pt.stepsTreeIndex].second)
//travel tree
pt.currentStep = SimpleSteps.ONE
assertCurrentStepsTree(0, SimpleSteps.ONE)
pt.currentStep = SimpleSteps.TWO
assertCurrentStepsTree(1, SimpleSteps.TWO)
pt2.currentStep = ChildSteps.BEE
assertCurrentStepsTree(3, ChildSteps.BEE)
pt.currentStep = SimpleSteps.THREE
assertCurrentStepsTree(5, SimpleSteps.THREE)
//assert no structure changes and proper steps propagation
assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(0, 1, 3, 5))
fun `structure changes are pushed down when progress trackers are added`() {
pt.setChildProgressTracker(SimpleSteps.TWO, pt2)
//capture notifications
val stepsIndexNotifications = LinkedList<Int>()
pt.stepsTreeIndexChanges.subscribe {
stepsIndexNotifications += it
//put current state as a first change for simplicity when asserting
val stepsTreeNotification = mutableListOf(pt.allStepsLabels)
pt.stepsTreeChanges.subscribe {
stepsTreeNotification += it
fun assertCurrentStepsTree(index:Int, step: ProgressTracker.Step) {
assertEquals(index, pt.stepsTreeIndex)
assertEquals(step.label, stepsTreeNotification.last()[pt.stepsTreeIndex].second)
pt.currentStep = SimpleSteps.TWO
assertCurrentStepsTree(1, SimpleSteps.TWO)
pt.currentStep = SimpleSteps.FOUR
assertCurrentStepsTree(6, SimpleSteps.FOUR)
pt.setChildProgressTracker(SimpleSteps.THREE, pt3)
assertCurrentStepsTree(9, SimpleSteps.FOUR)
//assert no structure changes and proper steps propagation
assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 6, 9))
assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state
fun `structure changes are pushed down when progress trackers are removed`() {
pt.setChildProgressTracker(SimpleSteps.TWO, pt2)
//capture notifications
val stepsIndexNotifications = LinkedList<Int>()
pt.stepsTreeIndexChanges.subscribe {
stepsIndexNotifications += it
//put current state as a first change for simplicity when asserting
val stepsTreeNotification = mutableListOf(pt.allStepsLabels)
pt.stepsTreeChanges.subscribe {
stepsTreeNotification += it
fun assertCurrentStepsTree(index:Int, step: ProgressTracker.Step) {
assertEquals(index, pt.stepsTreeIndex)
assertEquals(step.label, stepsTreeNotification.last()[pt.stepsTreeIndex].second)
pt.currentStep = SimpleSteps.TWO
pt2.currentStep = ChildSteps.SEA
pt3.currentStep = BabySteps.UNOS
assertCurrentStepsTree(4, ChildSteps.SEA)
pt.setChildProgressTracker(SimpleSteps.TWO, pt3)
assertCurrentStepsTree(2, BabySteps.UNOS)
//assert no structure changes and proper steps propagation
assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 4, 2))
assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state
fun `can be rewound`() {
pt.setChildProgressTracker(SimpleSteps.TWO, pt2)
@ -9,5 +9,6 @@
"https://docs.corda.net/releases/release-M13.0": "M13.0",
"https://docs.corda.net/releases/release-M14.0": "M14.0",
"https://docs.corda.net/releases/release-V1.0": "V1.0",
"https://docs.corda.net/releases/release-V2.0": "V2.0",
"https://docs.corda.net/head/": "Master"
@ -6,11 +6,14 @@ from the previous milestone release.
* ``ConfigUtilities`` now read system properties for a node. This allow to specify data source properties at runtime.
* ``AttachmentStorage`` now allows providing metadata on attachments upload - username and filename, currently as plain
strings. Those can be then used for querying, utilizing ``queryAttachments`` method of the same interface.
* ``SSH Server`` - The node can now expose shell via SSH server with proper authorization and permissioning built in.
* ``CordaRPCOps`` implementation now checks permissions for any function invocation, rather than just when starting flows.
* ``wellKnownPartyFromAnonymous()`` now always resolve the key to a ``Party``, then the party to the well known party.
@ -153,6 +153,11 @@ path to the node's base directory.
Each should be a string. Only the JARs in the directories are added, not the directories themselves. This is useful
for including JDBC drivers and the like. e.g. ``jarDirs = [ 'lib' ]``
:sshd: If provided, node will start internal SSH server which will provide a management shell. It uses the same credentials
and permissions as RPC subsystem. It has one required parameter.
:port: - the port to start SSH server on
:relay: If provided, the node will attempt to tunnel inbound connections via an external relay. The relay's address will be
advertised to the network map service instead of the provided ``p2pAddress``.
@ -9,10 +9,7 @@ import net.corda.core.node.services.CordaService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.unwrap
import net.corda.core.utilities.*
import net.corda.finance.flows.AbstractCashFlow
import net.corda.finance.flows.CashException
import net.corda.finance.flows.CashIssueFlow
@ -25,7 +22,7 @@ object CustomVaultQuery {
class Service(val services: AppServiceHub) : SingletonSerializeAsToken() {
private companion object {
val log = loggerFor<Service>()
private val log = contextLogger()
fun rebalanceCurrencyReserves(): List<Amount<Currency>> {
val nativeQuery = """
@ -33,15 +33,16 @@ service.
rpcPort 10006
webPort 10007
cordapps = ["net.corda:corda-finance:$corda_release_version"]
rpcUsers = [[ user: "user1", "password": "test", "permissions": []]]
rpcUsers = [[ user: "user1", "password": "test", "permissions": ["ALL]]]
node {
name "O=PartyB,L=New York,C=US"
p2pPort 10008
rpcPort 10009
webPort 10010
sshdPort 10024
cordapps = ["net.corda:corda-finance:$corda_release_version"]
rpcUsers = [[ user: "user1", "password": "test", "permissions": []]]
rpcUsers = [[ user: "user1", "password": "test", "permissions": ["ALL"]]]
@ -101,6 +102,9 @@ node via its built-in CRaSH shell.
Go to the terminal window displaying the CRaSH shell of PartyA. Typing ``help`` will display a list of the available
.. note:: Local terminal shell is available only in a development mode. In production environment SSH server can be enabled.
More about SSH and how to connect can be found on :doc:`Shell` page.
We want to create an IOU of 100 with PartyB. We start the ``IOUFlow`` by typing:
.. code:: bash
@ -53,6 +53,11 @@ reserve the right to move and rename it as it's not part of the public API as ye
logging name construction. If you can't find what you need to refer to, use the ``--logging-level`` option as above and
then determine the logging module name from the console output.
SSH access
Node can be configured to run SSH server. See :doc:`shell` for details.
Database access
@ -6,8 +6,24 @@ Here are release notes for each snapshot release from M9 onwards.
Support for observer/regulator nodes has returned. Read :doc:`tutorial-observer-nodes` to learn more or examine the
interest rate swaps demo.
Release 2.0
Following quickly on the heels of the release of Corda 1.0, Corda version 2.0 consolidates
a number of security updates for our dependent libraries alongside the reintroduction of the Observer node functionality.
This was absent from version 1 but based on user feedback its re-introduction removes the need for complicated "isRelevant()" checks.
In addition the fix for a small bug present in the coin selection code of V1.0 is integrated from master.
* **Version Bump**
Due to the introduction of new APIs, Corda 2.0 has a platform version of 2. This will be advertised in the network map structures
and via the versioning APIs.
* **Observer Nodes**
Adds the facility for transparent forwarding of transactions to some third party observer, such as a regulator. By having
that entity simply run an Observer node they can simply recieve a stream of digitally signed, de-duplicated reports that
can be used for reporting.
Release 1.0
@ -32,8 +48,8 @@ unless an incompatible change is required for security reasons:
Utilities and serialisers for working with JSON representations of basic types.
Our extensive testing frameworks will continue to evolve alongside future Corda APIs. As part of our commitment to ease of use and modularity
we have introduced a new test node driver module to encapsulate all test functionality in support of building standalone node integration
tests using our DSL driver.
we have introduced a new test node driver module to encapsulate all test functionality in support of building standalone node integration
tests using our DSL driver.
Please read :doc:`corda-api` for complete details.
@ -18,11 +18,47 @@ Some of its features include:
* View JMX metrics and monitoring exports.
* UNIX style pipes for both text and objects, an ``egrep`` command and a command for working with columnular data.
.. note:: A future version of Corda will add SSH access to the node.
It is based on the popular `CRaSH`_ shell used in various other projects and supports many of the same features.
The shell may be disabled by passing the ``--no-local-shell`` flag to the node.
Local terminal shell runs only in development mode. It may be disabled by passing the ``--no-local-shell`` flag to the node.
SSH server
Shell can also be accessible via SSH. By default SSH server is *disabled*. To enable it port must be configured - in ``node.conf`` file
.. code:: bash
sshd {
port = 2222
Authentication and authorization
SSH require user to login first - using the same users as RPC system. In fact, shell serves as a proxy to RPC and communicates
with node using RPC calls. This also means that RPC permissions are enforced. No permissions are required to allow the connection
and login in.
Watching flows (``flow watch``) requires ``InvokeRpc.stateMachinesFeed`` while starting flows requires
``InvokeRpc.startTrackedFlowDynamic`` and ``InvokeRpc.registeredFlows`` in addition to a permission for a particular flow.
Host key
The host key is loaded from ``sshkey/hostkey.pem`` file. If the file does not exist, it will be generated randomly, however
in the development mode seed may be tuned to give the same results on the same computer - in order to avoid host checking
Linux and MacOS computers usually come with SSH client preinstalled. On Windows it usually require extra download.
Usual connection syntax is ``ssh user@host -p 2222`` - where ``user`` is a RPC username, and ``-p`` specifies a port parameters -
it's the same as setup in ``node.conf`` file. ``host`` should point to a node hostname, usually ``localhost`` if connecting and
running node on the same computer. Password will be asked after establishing connection.
:note: While developing, checking multiple samples or simply restarting a node frequently host key may be regenerated. SSH usually
saved once trusted hosts and will refuse to connect in case of a change. Then check may be disabled with extra options
``ssh -o StrictHostKeyChecking=no user@host -p2222``. This option should never be used in production environment!
Getting help
@ -4,7 +4,7 @@ import net.corda.core.contracts.*
import net.corda.core.contracts.Amount.Companion.sumOrThrow
import net.corda.core.identity.AbstractParty
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import java.security.PublicKey
import java.util.*
@ -32,8 +32,7 @@ data class PartyAndAmount<T : Any>(val party: AbstractParty, val amount: Amount<
abstract class OnLedgerAsset<T : Any, C : CommandData, S : FungibleAsset<T>> : Contract {
companion object {
val log = loggerFor<OnLedgerAsset<*, *, *>>()
private val log = contextLogger()
* Generate a transaction that moves an amount of currency to the given pubkey.
@ -4,15 +4,12 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Amount
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.*
import net.corda.finance.contracts.asset.Cash
import java.sql.*
@ -46,7 +43,7 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v
val log = loggerFor<AbstractCashSelection>()
private val log = contextLogger()
// coin selection retry loop counter, sleep (msecs) and lock for selecting states
@ -5,8 +5,8 @@ import net.corda.core.crypto.toStringShort
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import java.sql.Connection
import java.sql.DatabaseMetaData
import java.sql.ResultSet
@ -15,7 +15,7 @@ import java.util.*
class CashSelectionH2Impl : AbstractCashSelection() {
companion object {
const val JDBC_DRIVER_NAME = "H2 JDBC Driver"
val log = loggerFor<CashSelectionH2Impl>()
private val log = contextLogger()
override fun isCompatible(metadata: DatabaseMetaData): Boolean {
@ -3,10 +3,7 @@ package net.corda.finance.contracts.asset.cash.selection
import net.corda.core.contracts.Amount
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.toBase58String
import net.corda.core.utilities.*
import java.sql.Connection
import java.sql.DatabaseMetaData
import java.sql.ResultSet
@ -16,7 +13,7 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() {
companion object {
val JDBC_DRIVER_NAME = "PostgreSQL JDBC Driver"
val log = loggerFor<CashSelectionPostgreSQLImpl>()
private val log = contextLogger()
override fun isCompatible(metadata: DatabaseMetaData): Boolean {
@ -63,13 +60,13 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() {
paramOffset += 1
if (onlyFromIssuerParties.isNotEmpty()) {
val issuerKeys = connection.createArrayOf("BYTEA", onlyFromIssuerParties.map
val issuerKeys = connection.createArrayOf("VARCHAR", onlyFromIssuerParties.map
{ it.owningKey.toBase58String() }.toTypedArray())
statement.setArray(3 + paramOffset, issuerKeys)
paramOffset += 1
if (withIssuerRefs.isNotEmpty()) {
val issuerRefs = connection.createArrayOf("VARCHAR", withIssuerRefs.map
val issuerRefs = connection.createArrayOf("BYTEA", withIssuerRefs.map
{ it.bytes }.toTypedArray())
statement.setArray(3 + paramOffset, issuerRefs)
paramOffset += 1
@ -79,5 +76,4 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() {
return statement.executeQuery()
@ -27,7 +27,7 @@ class CashIssueAndPaymentFlow(val amount: Amount<Currency>,
val anonymous: Boolean,
val notary: Party,
progressTracker: ProgressTracker) : AbstractCashFlow<AbstractCashFlow.Result>(progressTracker) {
constructor(amount: Amount<Currency>,
constructor(amount: Amount<Currency>,
issueRef: OpaqueBytes,
recipient: Party,
anonymous: Boolean,
@ -96,15 +96,15 @@ class Node(private val project: Project) : CordformNode() {
* Set the SSHD port for this node.
* Enables SSH access on given port
* @param sshdPort The SSHD port.
* @param sshdPort The port for SSH server to listen on
fun sshdPort(sshdPort: Int) {
config = config.withValue("sshdAddress",
fun sshdPort(sshdPort: Int?) {
config = config.withValue("sshd.port", ConfigValueFactory.fromAnyRef(sshdPort))
internal fun build() {
@ -5,7 +5,7 @@ import net.corda.core.internal.ThreadBox
import net.corda.core.internal.createDirectories
import net.corda.core.internal.isRegularFile
import net.corda.core.internal.list
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import rx.Observable
import rx.Scheduler
import rx.Subscription
@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit
class NodeInfoFilesCopier(scheduler: Scheduler = Schedulers.io()) : AutoCloseable {
companion object {
private val log = loggerFor<NodeInfoFilesCopier>()
private val log = contextLogger()
const val NODE_INFO_FILE_NAME_PREFIX = "nodeInfo-"
@ -10,7 +10,7 @@ import net.corda.nodeapi.internal.AttachmentsClassLoader
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationContext
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.serialization.amqp.hasAnnotationInHierarchy
import net.corda.nodeapi.internal.serialization.kryo.ThrowableSerializer
import java.io.PrintWriter
@ -187,7 +187,7 @@ class TransientClassWhiteList(delegate: ClassWhitelist) : AbstractMutableClassWh
class LoggingWhitelist(val delegate: ClassWhitelist, val global: Boolean = true) : MutableClassWhitelist {
companion object {
val log = loggerFor<LoggingWhitelist>()
private val log = contextLogger()
val globallySeen: MutableSet<String> = Collections.synchronizedSet(mutableSetOf())
val journalWriter: PrintWriter? = openOptionalDynamicWhitelistJournal()
@ -36,7 +36,6 @@ abstract class AbstractAMQPSerializationScheme : SerializationScheme {
@ -54,8 +53,8 @@ abstract class AbstractAMQPSerializationScheme : SerializationScheme {
@ -1,7 +1,7 @@
package net.corda.nodeapi.internal.serialization.amqp
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory.Companion.nameForType
import org.apache.qpid.proton.amqp.Symbol
import org.apache.qpid.proton.codec.Data
@ -17,7 +17,9 @@ open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPS
open val kotlinConstructor = constructorForDeserialization(clazz)
val javaConstructor by lazy { kotlinConstructor?.javaConstructor }
private val logger = loggerFor<ObjectSerializer>()
companion object {
private val logger = contextLogger()
open internal val propertySerializers: Collection<PropertySerializer> by lazy {
propertiesForSerialization(kotlinConstructor, clazz, factory)
@ -1,6 +1,6 @@
package net.corda.nodeapi.internal.serialization.amqp
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import org.apache.qpid.proton.amqp.Binary
import org.apache.qpid.proton.codec.Data
import java.lang.reflect.Method
@ -61,8 +61,7 @@ sealed class PropertySerializer(val name: String, val readMethod: Method?, val r
companion object {
private val logger = loggerFor<PropertySerializer>()
private val logger = contextLogger()
fun make(name: String, readMethod: Method?, resolvedType: Type, factory: SerializerFactory): PropertySerializer {
readMethod?.isAccessible = true
if (SerializerFactory.isPrimitive(resolvedType)) {
@ -0,0 +1,27 @@
package net.corda.nodeapi.internal.serialization.amqp.custom
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
import java.io.NotSerializableException
import java.security.cert.CertPath
import java.security.cert.CertificateException
import java.security.cert.CertificateFactory
class CertPathSerializer(factory: SerializerFactory)
: CustomSerializer.Proxy<CertPath, CertPathSerializer.CertPathProxy>(CertPath::class.java, CertPathProxy::class.java, factory) {
override fun toProxy(obj: CertPath): CertPathProxy = CertPathProxy(obj.type, obj.encoded)
override fun fromProxy(proxy: CertPathProxy): CertPath {
try {
val cf = CertificateFactory.getInstance(proxy.type)
return cf.generateCertPath(proxy.encoded.inputStream())
} catch (ce: CertificateException) {
val nse = NotSerializableException("java.security.cert.CertPath: $type")
throw nse
data class CertPathProxy(val type: String, val encoded: ByteArray)
@ -1,30 +0,0 @@
package net.corda.nodeapi.internal.serialization.amqp.custom
import net.corda.core.crypto.Crypto
import net.corda.core.identity.PartyAndCertificate
import net.corda.nodeapi.internal.serialization.amqp.*
import java.io.ByteArrayInputStream
import java.io.NotSerializableException
import java.security.cert.CertPath
import java.security.cert.CertificateException
import java.security.cert.CertificateFactory
* A serializer that writes out a party and certificate in encoded format.
class PartyAndCertificateSerializer(factory: SerializerFactory) : CustomSerializer.Proxy<PartyAndCertificate, PartyAndCertificateSerializer.PartyAndCertificateProxy>(PartyAndCertificate::class.java, PartyAndCertificateProxy::class.java, factory) {
override fun toProxy(obj: PartyAndCertificate): PartyAndCertificateProxy = PartyAndCertificateProxy(obj.certPath.type, obj.certPath.encoded)
override fun fromProxy(proxy: PartyAndCertificateProxy): PartyAndCertificate {
try {
val cf = CertificateFactory.getInstance(proxy.type)
return PartyAndCertificate(cf.generateCertPath(ByteArrayInputStream(proxy.encoded)))
} catch (ce: CertificateException) {
val nse = NotSerializableException("java.security.cert.CertPath: " + type)
throw nse
data class PartyAndCertificateProxy(val type: String, val encoded: ByteArray)
@ -3,14 +3,14 @@ package net.corda.nodeapi.internal.serialization.amqp.custom
import net.corda.core.CordaRuntimeException
import net.corda.core.CordaThrowable
import net.corda.core.serialization.SerializationFactory
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.serialization.amqp.*
import java.io.NotSerializableException
class ThrowableSerializer(factory: SerializerFactory) : CustomSerializer.Proxy<Throwable, ThrowableSerializer.ThrowableProxy>(Throwable::class.java, ThrowableProxy::class.java, factory) {
companion object {
private val logger = loggerFor<ThrowableSerializer>()
private val logger = contextLogger()
override val revealSubclassesInSchema: Boolean = true
@ -1,23 +0,0 @@
package net.corda.nodeapi.internal.serialization.amqp.custom
import net.corda.nodeapi.internal.serialization.amqp.*
import org.apache.qpid.proton.codec.Data
import org.bouncycastle.asn1.ASN1InputStream
import org.bouncycastle.asn1.x500.X500Name
import java.lang.reflect.Type
* Custom serializer for X500 names that utilizes their ASN.1 encoding on the wire.
object X500NameSerializer : CustomSerializer.Implements<X500Name>(X500Name::class.java) {
override val schemaForDocumentation = Schema(listOf(RestrictedType(type.toString(), "", listOf(type.toString()), SerializerFactory.primitiveTypeName(ByteArray::class.java)!!, descriptor, emptyList())))
override fun writeDescribedObject(obj: X500Name, data: Data, type: Type, output: SerializationOutput) {
output.writeObject(obj.encoded, data, clazz)
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): X500Name {
val binary = input.readObject(obj, schema, ByteArray::class.java) as ByteArray
return X500Name.getInstance(ASN1InputStream(binary).readObject())
@ -1,23 +0,0 @@
package net.corda.nodeapi.internal.serialization.amqp.custom
import net.corda.core.crypto.Crypto
import net.corda.nodeapi.internal.serialization.amqp.*
import org.apache.qpid.proton.codec.Data
import org.bouncycastle.cert.X509CertificateHolder
import java.lang.reflect.Type
* A serializer that writes out a certificate in X.509 format.
object X509CertificateHolderSerializer : CustomSerializer.Implements<X509CertificateHolder>(X509CertificateHolder::class.java) {
override val schemaForDocumentation = Schema(listOf(RestrictedType(type.toString(), "", listOf(type.toString()), SerializerFactory.primitiveTypeName(ByteArray::class.java)!!, descriptor, emptyList())))
override fun writeDescribedObject(obj: X509CertificateHolder, data: Data, type: Type, output: SerializationOutput) {
output.writeObject(obj.encoded, data, clazz)
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): X509CertificateHolder {
val bits = input.readObject(obj, schema, ByteArray::class.java) as ByteArray
return X509CertificateHolder(bits)
@ -0,0 +1,27 @@
package net.corda.nodeapi.internal.serialization.amqp.custom
import net.corda.nodeapi.internal.serialization.amqp.*
import org.apache.qpid.proton.codec.Data
import java.lang.reflect.Type
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
object X509CertificateSerializer : CustomSerializer.Implements<X509Certificate>(X509Certificate::class.java) {
override val schemaForDocumentation = Schema(listOf(RestrictedType(
override fun writeDescribedObject(obj: X509Certificate, data: Data, type: Type, output: SerializationOutput) {
output.writeObject(obj.encoded, data, clazz)
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): X509Certificate {
val bits = input.readObject(obj, schema, ByteArray::class.java) as ByteArray
return CertificateFactory.getInstance("X.509").generateCertificate(bits.inputStream()) as X509Certificate
@ -32,8 +32,6 @@ import net.corda.nodeapi.internal.serialization.GeneratedAttachment
import net.corda.nodeapi.internal.serialization.MutableClassWhitelist
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.cert.X509CertificateHolder
import org.bouncycastle.jcajce.provider.asymmetric.ec.BCECPrivateKey
import org.bouncycastle.jcajce.provider.asymmetric.ec.BCECPublicKey
import org.bouncycastle.jcajce.provider.asymmetric.rsa.BCRSAPrivateCrtKey
@ -53,6 +51,7 @@ import java.io.InputStream
import java.lang.reflect.Modifier.isPublic
import java.security.PublicKey
import java.security.cert.CertPath
import java.security.cert.X509Certificate
import java.util.*
import kotlin.collections.ArrayList
@ -109,8 +108,7 @@ object DefaultKryoCustomizer {
register(FileInputStream::class.java, InputStreamSerializer)
register(CertPath::class.java, CertPathSerializer)
register(X509CertPath::class.java, CertPathSerializer)
register(X500Name::class.java, X500NameSerializer)
register(X509CertificateHolder::class.java, X509CertificateSerializer)
register(X509Certificate::class.java, X509CertificateSerializer)
register(BCECPrivateKey::class.java, PrivateKeySerializer)
register(BCECPublicKey::class.java, publicKeySerializer)
register(BCRSAPrivateCrtKey::class.java, PrivateKeySerializer)
@ -15,7 +15,8 @@ import net.corda.core.crypto.TransactionSignature
import net.corda.core.identity.Party
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationContext.UseCase.*
import net.corda.core.serialization.SerializationContext.UseCase.Checkpoint
import net.corda.core.serialization.SerializationContext.UseCase.Storage
import net.corda.core.serialization.SerializeAsTokenContext
import net.corda.core.serialization.SerializedBytes
import net.corda.core.toFuture
@ -24,9 +25,6 @@ import net.corda.core.transactions.*
import net.corda.core.utilities.SgxSupport
import net.corda.nodeapi.internal.serialization.CordaClassResolver
import net.corda.nodeapi.internal.serialization.serializationContextKey
import org.bouncycastle.asn1.ASN1InputStream
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.cert.X509CertificateHolder
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Observable
@ -37,6 +35,7 @@ import java.security.PrivateKey
import java.security.PublicKey
import java.security.cert.CertPath
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import kotlin.reflect.KClass
@ -475,46 +474,28 @@ object ClassSerializer : Serializer<Class<*>>() {
* For serialising an [X500Name] without touching Sun internal classes.
object X500NameSerializer : Serializer<X500Name>() {
override fun read(kryo: Kryo, input: Input, type: Class<X500Name>): X500Name {
return X500Name.getInstance(ASN1InputStream(input.readBytes()).readObject())
override fun write(kryo: Kryo, output: Output, obj: X500Name) {
* For serialising an [CertPath] in an X.500 standard format.
object CertPathSerializer : Serializer<CertPath>() {
val factory: CertificateFactory = CertificateFactory.getInstance("X.509")
override fun read(kryo: Kryo, input: Input, type: Class<CertPath>): CertPath {
return factory.generateCertPath(input)
val factory = CertificateFactory.getInstance(input.readString())
return factory.generateCertPath(input.readBytesWithLength().inputStream())
override fun write(kryo: Kryo, output: Output, obj: CertPath) {
* For serialising an [X509CertificateHolder] in an X.500 standard format.
object X509CertificateSerializer : Serializer<X509CertificateHolder>() {
override fun read(kryo: Kryo, input: Input, type: Class<X509CertificateHolder>): X509CertificateHolder {
return X509CertificateHolder(input.readBytes())
object X509CertificateSerializer : Serializer<X509Certificate>() {
override fun read(kryo: Kryo, input: Input, type: Class<X509Certificate>): X509Certificate {
val factory = CertificateFactory.getInstance("X.509")
return factory.generateCertificate(input.readBytesWithLength().inputStream()) as X509Certificate
override fun write(kryo: Kryo, output: Output, obj: X509CertificateHolder) {
override fun write(kryo: Kryo, output: Output, obj: X509Certificate) {
@ -21,13 +21,14 @@ import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.ExpectedException
import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream
import java.io.InputStream
import java.time.Instant
import java.util.Collections
import kotlin.test.*
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
class KryoTests {
@ -36,9 +37,6 @@ class KryoTests {
private lateinit var factory: SerializationFactory
private lateinit var context: SerializationContext
val expectedEx: ExpectedException = ExpectedException.none()
fun setup() {
factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) }
@ -51,7 +49,7 @@ class KryoTests {
fun ok() {
fun `simple data class`() {
val birthday = Instant.parse("1984-04-17T00:30:00.00Z")
val mike = Person("mike", birthday)
val bits = mike.serialize(factory, context)
@ -59,7 +57,7 @@ class KryoTests {
fun nullables() {
fun `null values`() {
val bob = Person("bob", null)
val bits = bob.serialize(factory, context)
assertThat(bits.deserialize(factory, context)).isEqualTo(Person("bob", null))
@ -202,13 +200,6 @@ class KryoTests {
assertEquals(expected, actual)
fun `all-zero PrivacySalt not allowed`() {
expectedEx.expectMessage("Privacy salt should not be all zeros.")
private object TestSingleton
@ -2,16 +2,16 @@ package net.corda.nodeapi.internal.serialization
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.util.DefaultClassResolver
import net.corda.core.identity.CordaX500Name
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.node.services.statemachine.SessionData
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.amqpSpecific
import net.corda.testing.kryoSpecific
import net.corda.testing.SerializationEnvironmentRule
import org.assertj.core.api.Assertions
import org.bouncycastle.asn1.x500.X500Name
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Assert.assertArrayEquals
import org.junit.Rule
import org.junit.Test
@ -53,7 +53,7 @@ class MapsSerializationTest {
fun `check throws for forbidden declared type`() = amqpSpecific("Such exceptions are not expected in Kryo mode.") {
val payload = HashMap<String, String>(smallMap)
val wrongPayloadType = WrongPayloadType(payload)
Assertions.assertThatThrownBy { wrongPayloadType.serialize() }
assertThatThrownBy { wrongPayloadType.serialize() }
"Map type class java.util.HashMap is unstable under iteration. Suggested fix: use java.util.LinkedHashMap instead.")
@ -62,27 +62,29 @@ class MapsSerializationTest {
data class MyKey(val keyContent: Double)
data class MyValue(val valueContent: X500Name)
data class MyValue(val valueContent: CordaX500Name)
fun `check map serialization works with custom types`() {
val myMap = mapOf(
MyKey(1.0) to MyValue(X500Name("CN=one")),
MyKey(10.0) to MyValue(X500Name("CN=ten")))
MyKey(1.0) to MyValue(CordaX500Name("OOO", "LLL", "CC")),
MyKey(10.0) to MyValue(CordaX500Name("OO", "LL", "CC")))
fun `check empty map serialises as Java emptyMap`() = kryoSpecific("Specifically checks Kryo serialization") {
val nameID = 0
val serializedForm = emptyMap<Int, Int>().serialize()
val output = ByteArrayOutputStream().apply {
write(DefaultClassResolver.NAME + 2)
fun `check empty map serialises as Java emptyMap`() {
kryoSpecific("Specifically checks Kryo serialization") {
val nameID = 0
val serializedForm = emptyMap<Int, Int>().serialize()
val output = ByteArrayOutputStream().apply {
write(DefaultClassResolver.NAME + 2)
assertArrayEquals(output.toByteArray(), serializedForm.bytes)
assertArrayEquals(output.toByteArray(), serializedForm.bytes)
@ -1,17 +1,20 @@
@file:Suppress("unused", "MemberVisibilityCanPrivate")
package net.corda.nodeapi.internal.serialization.amqp
import net.corda.client.rpc.RPCException
import net.corda.core.CordaRuntimeException
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.secureRandomBytes
import net.corda.core.flows.FlowException
import net.corda.core.identity.AbstractParty
import net.corda.core.internal.toX509CertHolder
import net.corda.core.internal.AbstractAttachment
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.MissingAttachmentsException
import net.corda.core.serialization.SerializationFactory
import net.corda.core.transactions.LedgerTransaction
import net.corda.client.rpc.RPCException
import net.corda.core.contracts.*
import net.corda.core.internal.AbstractAttachment
import net.corda.core.serialization.MissingAttachmentsException
import net.corda.core.utilities.OpaqueBytes
import net.corda.nodeapi.internal.serialization.AllWhitelist
import net.corda.nodeapi.internal.serialization.EmptyWhitelist
import net.corda.nodeapi.internal.serialization.GeneratedAttachment
@ -25,10 +28,8 @@ import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.qpid.proton.amqp.*
import org.apache.qpid.proton.codec.DecoderImpl
import org.apache.qpid.proton.codec.EncoderImpl
import org.junit.Assert.assertArrayEquals
import org.junit.Assert.assertNotSame
import org.junit.Assert.assertSame
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.Assertions.*
import org.junit.Assert.*
import org.junit.Ignore
import org.junit.Test
import java.io.ByteArrayInputStream
@ -39,10 +40,7 @@ import java.nio.ByteBuffer
import java.time.*
import java.time.temporal.ChronoUnit
import java.util.*
import kotlin.reflect.full.declaredFunctions
import kotlin.reflect.full.declaredMemberFunctions
import kotlin.reflect.full.superclasses
import kotlin.reflect.jvm.javaMethod
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
@ -79,7 +77,6 @@ class SerializationOutputTests {
data class Woo(val fred: Int) {
val bob = "Bob"
@ -89,7 +86,6 @@ class SerializationOutputTests {
data class AnnotatedWoo(val fred: Int) {
val bob = "Bob"
@ -151,6 +147,13 @@ class SerializationOutputTests {
data class PolymorphicProperty(val foo: FooInterface?)
class NonZeroByte(val value: Byte) {
init {
require(value.toInt() != 0) { "Zero not allowed" }
private inline fun <reified T : Any> serdes(obj: T,
factory: SerializerFactory = SerializerFactory(
AllWhitelist, ClassLoader.getSystemClassLoader()),
@ -406,6 +409,32 @@ class SerializationOutputTests {
fun `class constructor is invoked on deserialisation`() {
val ser = SerializationOutput(SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader()))
val des = DeserializationInput(ser.serializerFactory)
val serialisedOne = ser.serialize(NonZeroByte(1)).bytes
val serialisedTwo = ser.serialize(NonZeroByte(2)).bytes
// Find the index that holds the value byte
val valueIndex = serialisedOne.zip(serialisedTwo).mapIndexedNotNull { index, (oneByte, twoByte) ->
if (oneByte.toInt() == 1 && twoByte.toInt() == 2) index else null
val copy = serialisedTwo.clone()
// Double check
copy[valueIndex] = 0x03
assertThat(des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java).value).isEqualTo(3)
// Now use the forbidden value
copy[valueIndex] = 0x00
assertThatExceptionOfType(NotSerializableException::class.java).isThrownBy {
des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java)
}.withMessageContaining("Zero not allowed")
fun `test custom serializers on public key`() {
val factory = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
@ -762,26 +791,32 @@ class SerializationOutputTests {
fun `test certificate holder serialize`() {
fun `test privacy salt serialize`() {
fun `test X509 certificate serialize`() {
val factory = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
val factory2 = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
val obj = BOB_IDENTITY.certificate.toX509CertHolder()
val obj = BOB_IDENTITY.certificate
serdes(obj, factory, factory2)
fun `test party and certificate serialize`() {
fun `test cert path serialize`() {
val factory = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
val factory2 = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
val obj = BOB_IDENTITY
val obj = BOB_IDENTITY.certPath
serdes(obj, factory, factory2)
@ -139,8 +139,14 @@ dependencies {
compile "io.netty:netty-all:$netty_version"
// CRaSH: An embeddable monitoring and admin shell with support for adding new commands written in Groovy.
compile("com.github.corda.crash:crash.shell:d5da86ba1b38e9c33af2a621dd15ba286307bec4") {
compile("com.github.corda.crash:crash.shell:$crash_version") {
exclude group: "org.slf4j", module: "slf4j-jdk14"
exclude group: "org.bouncycastle"
compile("com.github.corda.crash:crash.connectors.ssh:$crash_version") {
exclude group: "org.slf4j", module: "slf4j-jdk14"
exclude group: "org.bouncycastle"
// OkHTTP: Simple HTTP library.
@ -160,6 +166,9 @@ dependencies {
integrationTestCompile "junit:junit:$junit_version"
integrationTestCompile "org.assertj:assertj-core:${assertj_version}"
// Jsh: Testing SSH server
integrationTestCompile group: 'com.jcraft', name: 'jsch', version: '0.1.54'
// Jetty dependencies for NetworkMapClient test.
// Web stuff: for HTTP[S] servlets
testCompile "org.eclipse.jetty:jetty-servlet:${jetty_version}"
Normal file
Normal file
@ -0,0 +1,170 @@
package net.corda.node
import co.paralleluniverse.fibers.Suspendable
import com.jcraft.jsch.ChannelExec
import com.jcraft.jsch.JSch
import com.jcraft.jsch.JSchException
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.nodeapi.User
import net.corda.testing.ALICE
import net.corda.testing.driver.driver
import org.bouncycastle.util.io.Streams
import org.junit.Test
import net.corda.node.services.Permissions.Companion.startFlow
import java.net.ConnectException
import kotlin.test.assertTrue
import kotlin.test.fail
import org.assertj.core.api.Assertions.assertThat
import java.util.regex.Pattern
class SSHServerTest {
fun `ssh server does not start be default`() {
val user = User("u", "p", setOf())
// The driver will automatically pick up the annotated flows below
driver() {
val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user))
val session = JSch().getSession("u", "localhost", 2222)
session.setConfig("StrictHostKeyChecking", "no")
try {
} catch (e:JSchException) {
assertTrue(e.cause is ConnectException)
fun `ssh server starts when configured`() {
val user = User("u", "p", setOf())
// The driver will automatically pick up the annotated flows below
driver {
val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user),
customOverrides = mapOf("sshd" to mapOf("port" to 2222)))
val session = JSch().getSession("u", "localhost", 2222)
session.setConfig("StrictHostKeyChecking", "no")
fun `ssh server verify credentials`() {
val user = User("u", "p", setOf())
// The driver will automatically pick up the annotated flows below
driver {
val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user),
customOverrides = mapOf("sshd" to mapOf("port" to 2222)))
val session = JSch().getSession("u", "localhost", 2222)
session.setConfig("StrictHostKeyChecking", "no")
try {
fail("Server should reject invalid credentials")
} catch (e: JSchException) {
//There is no specialized exception for this
assertTrue(e.message == "Auth fail")
fun `ssh respects permissions`() {
val user = User("u", "p", setOf(startFlow<FlowICanRun>()))
// The driver will automatically pick up the annotated flows below
driver(isDebug = true) {
val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user),
customOverrides = mapOf("sshd" to mapOf("port" to 2222)))
val session = JSch().getSession("u", "localhost", 2222)
session.setConfig("StrictHostKeyChecking", "no")
val channel = session.openChannel("exec") as ChannelExec
channel.setCommand("start FlowICannotRun otherParty: \"O=Alice Corp,L=Madrid,C=ES\"")
val response = String(Streams.readAll(channel.inputStream))
val flowNameEscaped = Pattern.quote("StartFlow.${SSHServerTest::class.qualifiedName}$${FlowICannotRun::class.simpleName}")
assertThat(response).matches("(?s)User not permissioned with any of \\[[^]]*${flowNameEscaped}.*")
fun `ssh runs flows`() {
val user = User("u", "p", setOf(startFlow<FlowICanRun>()))
// The driver will automatically pick up the annotated flows below
driver(isDebug = true) {
val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user),
customOverrides = mapOf("sshd" to mapOf("port" to 2222)))
val session = JSch().getSession("u", "localhost", 2222)
session.setConfig("StrictHostKeyChecking", "no")
val channel = session.openChannel("exec") as ChannelExec
channel.setCommand("start FlowICanRun")
val response = String(Streams.readAll(channel.inputStream))
//There are ANSI control characters involved, so we want to avoid direct byte to byte matching
assertThat(response.lines()).filteredOn( { it.contains("✓") && it.contains("Done")}).hasSize(1)
class FlowICanRun : FlowLogic<String>() {
private val HELLO_STEP = ProgressTracker.Step("Hello")
override fun call(): String {
progressTracker?.currentStep = HELLO_STEP
return "bambam"
override val progressTracker: ProgressTracker? = ProgressTracker(HELLO_STEP)
class FlowICannotRun(val otherParty: Party) : FlowLogic<String>() {
override fun call(): String = initiateFlow(otherParty).receive<String>().unwrap { it }
override val progressTracker: ProgressTracker? = ProgressTracker()
@ -13,8 +13,8 @@ import net.corda.core.internal.div
import net.corda.core.internal.toLedgerTransaction
import net.corda.core.serialization.SerializationFactory
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.cordapp.CordappProviderImpl
import net.corda.testing.*
@ -43,7 +43,7 @@ class AttachmentLoadingTests : IntegrationTest() {
private companion object {
val logger = loggerFor<AttachmentLoadingTests>()
private val logger = contextLogger()
val isolatedJAR = AttachmentLoadingTests::class.java.getResource("isolated.jar")!!
val ISOLATED_CONTRACT_ID = "net.corda.finance.contracts.isolated.AnotherDummyContract"
@ -2,6 +2,9 @@ package net.corda.node.shell;
// See the comments at the top of run.java
import net.corda.core.messaging.CordaRPCOps;
import net.corda.node.utilities.ANSIProgressRenderer;
import net.corda.node.utilities.CRaSHNSIProgressRenderer;
import org.crsh.cli.*;
import org.crsh.command.*;
import org.crsh.text.*;
@ -9,6 +12,7 @@ import org.crsh.text.ui.TableElement;
import java.util.*;
import static net.corda.node.services.messaging.RPCServerKt.CURRENT_RPC_CONTEXT;
import static net.corda.node.shell.InteractiveShell.*;
@ -25,25 +29,27 @@ public class FlowShellCommand extends InteractiveShellCommand {
@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name,
@Usage("The data to pass as input") @Argument(unquote = false) List<String> input
) {
startFlow(name, input, out);
startFlow(name, input, out, ops(), ansiProgressRenderer());
// TODO Limit number of flows shown option?
@Usage("watch information about state machines running on the node with result information")
public void watch(InvocationContext<TableElement> context) throws Exception {
runStateMachinesView(out, ops());
static void startFlow(@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name,
@Usage("The data to pass as input") @Argument(unquote = false) List<String> input,
RenderPrintWriter out) {
RenderPrintWriter out,
CordaRPCOps rpcOps,
ANSIProgressRenderer ansiProgressRenderer) {
if (name == null) {
out.println("You must pass a name for the flow, see 'man flow'", Color.red);
String inp = input == null ? "" : String.join(" ", input).trim();
runFlowByNameFragment(name, inp, out);
runFlowByNameFragment(name, inp, out, rpcOps, ansiProgressRenderer != null ? ansiProgressRenderer : new CRaSHNSIProgressRenderer(out) );
@ -2,6 +2,8 @@ package net.corda.node.shell;
// A simple forwarder to the "flow start" command, for easier typing.
import net.corda.node.utilities.ANSIProgressRenderer;
import net.corda.node.utilities.CRaSHNSIProgressRenderer;
import org.crsh.cli.*;
import java.util.*;
@ -11,6 +13,7 @@ public class StartShellCommand extends InteractiveShellCommand {
@Man("An alias for 'flow start'. Example: \"start Yo target: Some other company\"")
public void main(@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name,
@Usage("The data to pass as input") @Argument(unquote = false) List<String> input) {
FlowShellCommand.startFlow(name, input, out);
ANSIProgressRenderer ansiProgressRenderer = ansiProgressRenderer();
FlowShellCommand.startFlow(name, input, out, ops(), ansiProgressRenderer != null ? ansiProgressRenderer : new CRaSHNSIProgressRenderer(out));
@ -36,6 +36,7 @@ import net.corda.node.internal.cordapp.CordappProviderInternal
import net.corda.node.services.ContractUpgradeHandler
import net.corda.node.services.FinalityHandler
import net.corda.node.services.NotaryChangeHandler
import net.corda.node.services.RPCUserService
import net.corda.node.services.api.*
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.config.NodeConfiguration
@ -55,11 +56,11 @@ import net.corda.node.services.transactions.*
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager
import net.corda.node.shell.InteractiveShell
import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.slf4j.Logger
import rx.Observable
import rx.subjects.PublishSubject
import java.io.IOException
import java.lang.reflect.InvocationTargetException
import java.security.KeyPair
@ -119,7 +120,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val services: ServiceHubInternal get() = _services
private lateinit var _services: ServiceHubInternalImpl
protected val nodeStateObservable: PublishSubject<NodeState> = PublishSubject.create<NodeState>()
protected var myNotaryIdentity: PartyAndCertificate? = null
protected lateinit var checkpointStorage: CheckpointStorage
protected lateinit var smm: StateMachineManager
@ -130,6 +130,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val _nodeReadyFuture = openFuture<Unit>()
protected val networkMapClient: NetworkMapClient? by lazy { configuration.compatibilityZoneURL?.let(::NetworkMapClient) }
lateinit var userService: RPCUserService get
/** Completes once the node has successfully registered with the network map service
* or has loaded network map data from local database */
val nodeReadyFuture: CordaFuture<Unit>
@ -211,8 +213,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader
runOnStop += network::stop
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
@ -243,6 +244,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
open fun startShell(rpcOps: CordaRPCOps) {
InteractiveShell.startShell(configuration, rpcOps, userService, _services.identityService, _services.database)
private fun initNodeInfo(): Pair<Set<KeyPair>, NodeInfo> {
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
val keyPairs = mutableSetOf(identityKeyPair)
@ -624,9 +629,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it
// unsubscribes us forcibly, rather than blocking the shutdown process.
// Notify observers that the node is shutting down
// Run shutdown hooks in opposite order to starting
for (toRun in runOnStop.reversed()) {
@ -727,7 +729,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
override val attachments: AttachmentStorage get() = this@AbstractNode.attachments
override val networkService: MessagingService get() = network
override val clock: Clock get() = platformClock
override val myNodeStateObservable: Observable<NodeState> get() = nodeStateObservable
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" }
@ -20,8 +20,8 @@ import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.*
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.context
@ -117,10 +117,6 @@ internal class CordaRPCOpsImpl(
return services.myInfo
override fun nodeStateObservable(): Observable<NodeState> {
return services.myNodeStateObservable
override fun notaryIdentities(): List<Party> {
return services.networkMapCache.notaryIdentities
@ -142,7 +138,9 @@ internal class CordaRPCOpsImpl(
return FlowProgressHandleImpl(
id = stateMachine.id,
returnValue = stateMachine.resultFuture,
progress = stateMachine.logic.track()?.updates ?: Observable.empty()
progress = stateMachine.logic.track()?.updates ?: Observable.empty(),
stepsTreeIndexFeed = stateMachine.logic.trackStepsTreeIndex(),
stepsTreeFeed = stateMachine.logic.trackStepsTree()
@ -296,6 +294,6 @@ internal class CordaRPCOpsImpl(
companion object {
private val log = loggerFor<CordaRPCOpsImpl>()
private val log = contextLogger()
@ -8,20 +8,20 @@ import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.RPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.contextLogger
import net.corda.node.VersionInfo
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.serialization.KryoServerSerializationScheme
import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.config.VerifierType
import net.corda.node.services.messaging.*
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.utilities.AddressUtils
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
@ -49,7 +49,7 @@ open class Node(configuration: NodeConfiguration,
cordappLoader: CordappLoader = makeCordappLoader(configuration)
) : AbstractNode(configuration, createClock(configuration), versionInfo, cordappLoader) {
companion object {
private val logger = loggerFor<Node>()
private val staticLog = contextLogger()
var renderBasicInfoToConsole = true
/** Used for useful info that we always want to show, even when not logging to the console */
@ -79,8 +79,11 @@ open class Node(configuration: NodeConfiguration,
override val log: Logger get() = logger
override fun makeTransactionVerifierService() = (network as NodeMessagingClient).verifierService
override val log: Logger get() = staticLog
override fun makeTransactionVerifierService(): TransactionVerifierService = when (configuration.verifierType) {
VerifierType.OutOfProcess -> verifierMessagingClient!!.verifierService
VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4)
private val sameVmNodeNumber = sameVmNodeCounter.incrementAndGet() // Under normal (non-test execution) it will always be "1"
@ -127,7 +130,6 @@ open class Node(configuration: NodeConfiguration,
private var shutdownHook: ShutdownHook? = null
private lateinit var userService: RPCUserService
override fun makeMessagingService(database: CordaPersistence, info: NodeInfo): MessagingService {
userService = RPCUserServiceImpl(configuration.rpcUsers)
@ -135,15 +137,18 @@ open class Node(configuration: NodeConfiguration,
val advertisedAddress = info.addresses.single()
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
return NodeMessagingClient(
rpcMessagingClient = RPCMessagingClient(configuration, serverAddress)
verifierMessagingClient = when (configuration.verifierType) {
VerifierType.OutOfProcess -> VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics)
VerifierType.InMemory -> null
return P2PMessagingClient(
@ -202,9 +207,19 @@ open class Node(configuration: NodeConfiguration,
runOnStop += this::stop
// Start up the MQ client.
(network as NodeMessagingClient).start(rpcOps, userService)
// Start up the MQ clients.
rpcMessagingClient.run {
runOnStop += this::stop
start(rpcOps, userService)
verifierMessagingClient?.run {
runOnStop += this::stop
(network as P2PMessagingClient).apply {
runOnStop += this::stop
@ -273,7 +288,7 @@ open class Node(configuration: NodeConfiguration,
{ th -> logger.error("Unexpected exception", th) }
{ th -> staticLog.error("Unexpected exception", th) } // XXX: Why not use log?
shutdownHook = addShutdownHook {
@ -294,9 +309,13 @@ open class Node(configuration: NodeConfiguration,
checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader))
private lateinit var rpcMessagingClient: RPCMessagingClient
private var verifierMessagingClient: VerifierMessagingClient? = null
/** Starts a blocking event loop for message dispatch. */
fun run() {
(network as NodeMessagingClient).run(messageBroker!!.serverControl)
(network as P2PMessagingClient).run()
private var shutdown = false
@ -23,14 +23,13 @@ import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.nio.file.Path
import java.nio.file.Paths
import java.time.LocalDate
import java.util.*
import kotlin.system.exitProcess
/** This class is responsible for starting a Node from command line arguments. */
open class NodeStartup(val args: Array<String>) {
companion object {
private val logger by lazy { loggerFor<Node>() }
private val logger by lazy { loggerFor<Node>() } // I guess this is lazy to allow for logging init, but why Node?
val LOGS_CAN_BE_FOUND_IN_STRING = "Logs can be found in"
@ -118,12 +117,13 @@ open class NodeStartup(val args: Array<String>) {
Node.printBasicNodeInfo("Node for \"$name\" started up and registered in $elapsed sec")
// Don't start the shell if there's no console attached.
val runShell = !cmdlineOptions.noLocalShell && System.console() != null
startedNode.internals.startupComplete.then {
try {
InteractiveShell.startShell(cmdlineOptions.baseDirectory, runShell, cmdlineOptions.sshdServer, startedNode)
} catch (e: Throwable) {
logger.error("Shell failed to start", e)
if (!cmdlineOptions.noLocalShell && System.console() != null && conf.devMode) {
startedNode.internals.startupComplete.then {
try {
} catch (e: Throwable) {
logger.error("Shell failed to start", e)
@ -305,11 +305,6 @@ open class NodeStartup(val args: Array<String>) {
"Computers are useless. They can only\ngive you answers. -- Picasso"
// TODO: Delete this after CordaCon.
val cordaCon2017date = LocalDate.of(2017, 9, 12)
val cordaConBanner = if (LocalDate.now() < cordaCon2017date)
"${Emoji.soon} Register for our Free CordaCon event : see https://goo.gl/Z15S8W" else ""
if (Emoji.hasEmojiTerminal)
messages += "Kind of like a regular database but\nwith emojis, colours and ascii art. ${Emoji.coolGuy}"
val (msg1, msg2) = messages.randomOrNull()!!.split('\n')
@ -323,8 +318,6 @@ open class NodeStartup(val args: Array<String>) {
a("--- ${versionInfo.vendor} ${versionInfo.releaseVersion} (${versionInfo.revision.take(7)}) -----------------------------------------------").
@ -8,14 +8,12 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.NodeState
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.*
import net.corda.node.services.messaging.RpcAuthContext
import rx.Observable
import java.io.InputStream
import java.security.PublicKey
@ -68,8 +66,6 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val
override fun nodeInfo(): NodeInfo = guard("nodeInfo", implementation::nodeInfo)
override fun nodeStateObservable(): Observable<NodeState> = guard("nodeStateObservable", implementation::nodeStateObservable)
override fun notaryIdentities(): List<Party> = guard("notaryIdentities", implementation::notaryIdentities)
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) = guard("addVaultTransactionNote") {
@ -12,7 +12,7 @@ import net.corda.core.node.services.CordaService
import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.node.internal.classloading.requireAnnotation
import net.corda.node.services.config.NodeConfiguration
import net.corda.nodeapi.internal.serialization.DefaultWhitelist
@ -53,8 +53,7 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
companion object {
private val logger = loggerFor<CordappLoader>()
private val logger = contextLogger()
* Default cordapp dir name
@ -2,18 +2,24 @@ package net.corda.node.internal.cordapp
import com.google.common.collect.HashBiMap
import net.corda.core.contracts.ContractClassName
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.cordapp.Cordapp
import net.corda.core.cordapp.CordappContext
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor
import java.net.URL
* Cordapp provider and store. For querying CorDapps for their attachment and vice versa.
open class CordappProviderImpl(private val cordappLoader: CordappLoader, attachmentStorage: AttachmentStorage) : SingletonSerializeAsToken(), CordappProviderInternal {
companion object {
private val log = loggerFor<CordappProviderImpl>()
override fun getAppContext(): CordappContext {
// TODO: Use better supported APIs in Java 9
Exception().stackTrace.forEach { stackFrame ->
@ -45,7 +51,7 @@ open class CordappProviderImpl(private val cordappLoader: CordappLoader, attachm
private fun loadContractsIntoAttachmentStore(attachmentStorage: AttachmentStorage): Map<SecureHash, URL> {
val cordappsWithAttachments = cordapps.filter { !it.contractClassNames.isEmpty() }.map { it.jarPath }
val attachmentIds = cordappsWithAttachments.map { it.openStream().use { attachmentStorage.importAttachment(it) } }
val attachmentIds = cordappsWithAttachments.map { it.openStream().use { attachmentStorage.importOrGetAttachment(it) }}
return attachmentIds.zip(cordappsWithAttachments).toMap()
@ -16,7 +16,7 @@ import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCacheBase
import net.corda.core.node.services.TransactionStorage
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.internal.cordapp.CordappProviderInternal
import net.corda.node.services.config.NodeConfiguration
@ -43,7 +43,7 @@ interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {
interface ServiceHubInternal : ServiceHub {
companion object {
private val log = loggerFor<ServiceHubInternal>()
private val log = contextLogger()
override val vaultService: VaultServiceInternal
@ -5,13 +5,13 @@ import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SignatureScheme
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.*
import net.corda.core.utilities.loggerFor
import net.corda.node.utilities.*
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.config.toProperties
import org.bouncycastle.asn1.x509.GeneralName
import org.bouncycastle.asn1.x509.GeneralSubtree
import org.bouncycastle.asn1.x509.NameConstraints
import org.slf4j.LoggerFactory
import java.nio.file.Path
import java.security.KeyStore
@ -19,8 +19,7 @@ fun configOf(vararg pairs: Pair<String, Any?>): Config = ConfigFactory.parseMap(
operator fun Config.plus(overrides: Map<String, Any?>): Config = ConfigFactory.parseMap(overrides).withFallback(this)
object ConfigHelper {
private val log = loggerFor<ConfigHelper>()
private val log = LoggerFactory.getLogger(javaClass)
fun loadConfig(baseDirectory: Path,
configFile: Path = baseDirectory / "node.conf",
allowMissingConfig: Boolean = false,
@ -39,6 +39,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
// TODO Move into DevModeOptions
val useTestClock: Boolean get() = false
val detectPublicIp: Boolean get() = true
val sshd: SSHDConfiguration?
val relay: RelayConfiguration?
@ -111,7 +112,9 @@ data class NodeConfigurationImpl(
override val detectPublicIp: Boolean = true,
override val activeMQServer: ActiveMqServerConfiguration,
// TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis()
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),
override val sshd: SSHDConfiguration? = null
) : NodeConfiguration {
override val exportJMXto: String get() = "http"
@ -147,6 +150,7 @@ data class CertChainPolicyConfig(val role: String, private val policy: CertChain
data class SSHDConfiguration(val port: Int)
data class RelayConfiguration(val relayHost: String,
val remoteInboundPort: Int,
val username: String,
@ -17,7 +17,7 @@ import net.corda.core.internal.until
import net.corda.core.node.StateLoader
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import net.corda.node.internal.MutableClock
import net.corda.node.services.api.FlowStarter
@ -65,8 +65,7 @@ class NodeSchedulerService(private val clock: Clock,
: SchedulerService, SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<NodeSchedulerService>()
private val log = contextLogger()
* Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations
* used in demos or testing. This will substitute a Fiber compatible Future so the current
@ -8,7 +8,7 @@ import net.corda.core.internal.toX509CertHolder
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.UnknownAnonymousPartyException
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import org.bouncycastle.cert.X509CertificateHolder
import java.security.InvalidAlgorithmParameterException
@ -16,7 +16,6 @@ import java.security.PublicKey
import java.security.cert.*
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.ThreadSafe
import javax.security.auth.x500.X500Principal
* Simple identity service which caches parties and provides functionality for efficient lookup.
@ -33,7 +32,7 @@ class InMemoryIdentityService(identities: Iterable<PartyAndCertificate> = emptyS
trustRoot: X509CertificateHolder) : this(wellKnownIdentities, confidentialIdentities, trustRoot.cert)
companion object {
private val log = loggerFor<InMemoryIdentityService>()
private val log = contextLogger()
@ -10,11 +10,10 @@ import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.UnknownAnonymousPartyException
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.core.utilities.contextLogger
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.node.utilities.X509Utilities
import org.bouncycastle.cert.X509CertificateHolder
import java.io.ByteArrayInputStream
import java.security.InvalidAlgorithmParameterException
@ -36,7 +35,7 @@ class PersistentIdentityService(identities: Iterable<PartyAndCertificate> = empt
trustRoot: X509CertificateHolder) : this(wellKnownIdentities, confidentialIdentities, trustRoot.cert)
companion object {
private val log = loggerFor<PersistentIdentityService>()
private val log = contextLogger()
private val certFactory: CertificateFactory = CertificateFactory.getInstance("X.509")
fun createPKMap(): AppendOnlyPersistentMap<SecureHash, PartyAndCertificate, PersistentIdentity, String> {
@ -0,0 +1,58 @@
package net.corda.node.services.messaging
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.config.SSLConfiguration
import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
class ArtemisMessagingClient(private val config: SSLConfiguration, private val serverAddress: NetworkHostAndPort) {
companion object {
private val log = loggerFor<ArtemisMessagingClient>()
class Started(val sessionFactory: ClientSessionFactory, val session: ClientSession, val producer: ClientProducer)
var started: Started? = null
private set
fun start(): Started = synchronized(this) {
check(started == null) { "start can't be called twice" }
log.info("Connecting to message broker: $serverAddress")
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted.
connectionTTL = -1
clientFailureCheckPeriod = -1
minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE
isUseGlobalPools = nodeSerializationEnv != null
val sessionFactory = locator.createSessionFactory()
// Login using the node username. The broker will authentiate us as its node (as opposed to another peer)
// using our TLS certificate.
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
// size of 1MB is acknowledged.
val session = sessionFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE)
// Create a general purpose producer.
val producer = session.createProducer()
return Started(sessionFactory, session, producer).also { started = it }
fun stop() = synchronized(this) {
started!!.run {
// Ensure any trailing messages are committed to the journal
// Closing the factory closes all the sessions it produced as well.
started = null
@ -12,10 +12,7 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.parsePublicKeyBase58
import net.corda.core.utilities.*
import net.corda.node.internal.Node
import net.corda.node.services.RPCUserService
import net.corda.node.services.config.NodeConfiguration
@ -99,7 +96,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
val networkMapCache: NetworkMapCache,
val userService: RPCUserService) : SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<ArtemisMessagingServer>()
private val log = contextLogger()
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
val MAX_FILE_SIZE = 10485760
@ -235,8 +232,10 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
securityRoles[P2P_QUEUE] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
// TODO remove the NODE_USER role once the webserver doesn't need it
// TODO: remove the NODE_USER role below once the webserver doesn't need it anymore.
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$NODE_USER.#"] = setOf(nodeInternalRole)
// Each RPC user must have its own role and its own queue. This prevents users accessing each other's queues
// and stealing RPC responses.
for ((username) in userService.users) {
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#"] = setOf(
@ -417,7 +416,7 @@ private class VerifyingNettyConnector(configuration: MutableMap<String, Any>,
protocolManager: ClientProtocolManager?) :
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) {
companion object {
private val log = loggerFor<VerifyingNettyConnector>()
private val log = contextLogger()
private val sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration)
@ -540,8 +539,7 @@ class NodeLoginModule : LoginModule {
const val VERIFIER_ROLE = "SystemRoles/Verifier"
const val CERT_CHAIN_CHECKS_OPTION_NAME = "CertChainChecks"
val log = loggerFor<NodeLoginModule>()
private val log = contextLogger()
private var loginSucceeded: Boolean = false
@ -1,6 +1,5 @@
package net.corda.node.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.concurrent.openFuture
@ -133,14 +132,6 @@ interface MessagingService {
/** Returns an address that refers to this node. */
val myAddress: SingleMessageRecipient
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
* then this will block until all in-flight messages have finished being handled and acknowledged. If called
* from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor,
* it returns immediately and shutdown is asynchronous.
fun stop()
@ -1,50 +1,32 @@
package net.corda.node.services.messaging
import com.codahale.metrics.MetricRegistry
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.serialization.serialize
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.sequence
import net.corda.core.utilities.trace
import net.corda.node.VersionInfo
import net.corda.node.services.RPCUserService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.VerifierType
import net.corda.node.services.statemachine.StateMachineManagerImpl
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
import net.corda.node.utilities.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.*
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import java.security.PublicKey
import java.time.Instant
import java.util.*
@ -77,18 +59,16 @@ import javax.persistence.Lob
* If not provided, will default to [serverAddress].
class NodeMessagingClient(private val config: NodeConfiguration,
private val versionInfo: VersionInfo,
private val serverAddress: NetworkHostAndPort,
private val myIdentity: PublicKey,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
private val database: CordaPersistence,
private val metrics: MetricRegistry,
advertisedAddress: NetworkHostAndPort = serverAddress
class P2PMessagingClient(config: NodeConfiguration,
private val versionInfo: VersionInfo,
serverAddress: NetworkHostAndPort,
private val myIdentity: PublicKey,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
private val database: CordaPersistence,
advertisedAddress: NetworkHostAndPort = serverAddress
) : SingletonSerializeAsToken(), MessagingService {
companion object {
private val log = loggerFor<NodeMessagingClient>()
private val log = contextLogger()
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
@ -99,8 +79,6 @@ class NodeMessagingClient(private val config: NodeConfiguration,
private val releaseVersionProperty = SimpleString("release-version")
private val platformVersionProperty = SimpleString("platform-version")
private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
private val messageMaxRetryCount: Int = 3
fun createProcessedMessage(): AppendOnlyPersistentMap<UUID, Instant, ProcessedMessage, String> {
@ -144,15 +122,8 @@ class NodeMessagingClient(private val config: NodeConfiguration,
private class InnerState {
var started = false
var running = false
var producer: ClientProducer? = null
var p2pConsumer: ClientConsumer? = null
var session: ClientSession? = null
var sessionFactory: ClientSessionFactory? = null
var rpcServer: RPCServer? = null
// Consumer for inbound client RPC messages.
var verificationResponseConsumer: ClientConsumer? = null
private val messagesToRedeliver = database.transaction {
@ -161,11 +132,6 @@ class NodeMessagingClient(private val config: NodeConfiguration,
private val scheduledMessageRedeliveries = ConcurrentHashMap<Long, ScheduledFuture<*>>()
val verifierService = when (config.verifierType) {
VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4)
VerifierType.OutOfProcess -> createOutOfProcessVerifierService()
/** A registration to handle messages of different types */
data class Handler(val topicSession: TopicSession,
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@ -176,7 +142,8 @@ class NodeMessagingClient(private val config: NodeConfiguration,
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
private val artemis = ArtemisMessagingClient(config, serverAddress)
private val state = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>()
@ -209,54 +176,11 @@ class NodeMessagingClient(private val config: NodeConfiguration,
var recipients: ByteArray = ByteArray(0)
fun start(rpcOps: RPCOps, userService: RPCUserService) {
fun start() {
state.locked {
check(!started) { "start can't be called twice" }
started = true
log.info("Connecting to message broker: $serverAddress")
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted.
connectionTTL = -1
clientFailureCheckPeriod = -1
minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE
isUseGlobalPools = nodeSerializationEnv != null
sessionFactory = locator.createSessionFactory()
// Login using the node username. The broker will authentiate us as its node (as opposed to another peer)
// using our TLS certificate.
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
// size of 1MB is acknowledged.
val session = sessionFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE)
this.session = session
// Create a general purpose producer.
val producer = session.createProducer()
this.producer = producer
val session = artemis.start().session
// Create a queue, consumer and producer for handling P2P network messages.
p2pConsumer = session.createConsumer(P2P_QUEUE)
val myCert = loadKeyStore(config.sslKeystore, config.keyStorePassword).getX509Certificate(X509Utilities.CORDA_CLIENT_TLS)
rpcServer = RPCServer(rpcOps, NODE_USER, NODE_USER, locator, userService, CordaX500Name.build(myCert.subjectX500Principal))
fun checkVerifierCount() {
if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) {
log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!")
if (config.verifierType == VerifierType.OutOfProcess) {
verificationResponseConsumer = session.createConsumer(verifierResponseAddress)
messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS)
@ -309,14 +233,12 @@ class NodeMessagingClient(private val config: NodeConfiguration,
* Starts the p2p event loop: this method only returns once [stop] has been called.
fun run(serverControl: ActiveMQServerControl) {
fun run() {
try {
val consumer = state.locked {
check(started) { "start must be called first" }
check(artemis.started != null) { "start must be called first" }
check(!running) { "run can't be called twice" }
running = true
(verifierService as? OutOfProcessTransactionVerifierService)?.start(verificationResponseConsumer!!)
// If it's null, it means we already called stop, so return immediately.
p2pConsumer ?: return
@ -401,10 +323,16 @@ class NodeMessagingClient(private val config: NodeConfiguration,
override fun stop() {
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
* then this will block until all in-flight messages have finished being handled and acknowledged. If called
* from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor,
* it returns immediately and shutdown is asynchronous.
fun stop() {
val running = state.locked {
// We allow stop() to be called without a run() in between, but it must have at least been started.
check(artemis.started != null)
val prevRunning = running
running = false
val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice")
@ -423,13 +351,7 @@ class NodeMessagingClient(private val config: NodeConfiguration,
// Only first caller to gets running true to protect against double stop, which seems to happen in some integration tests.
if (running) {
state.locked {
producer = null
// Ensure any trailing messages are committed to the journal
// Closing the factory closes all the sessions it produced as well.
sessionFactory = null
@ -440,7 +362,8 @@ class NodeMessagingClient(private val config: NodeConfiguration,
messagingExecutor.fetchFrom {
state.locked {
val mqAddress = getMQAddress(target)
val artemisMessage = session!!.createMessage(true).apply {
val artemis = artemis.started!!
val artemisMessage = artemis.session.createMessage(true).apply {
putStringProperty(cordaVendorProperty, cordaVendor)
putStringProperty(releaseVersionProperty, releaseVersion)
putIntProperty(platformVersionProperty, versionInfo.platformVersion)
@ -459,15 +382,14 @@ class NodeMessagingClient(private val config: NodeConfiguration,
"Send to: $mqAddress topic: ${message.topicSession.topic} " +
"sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}"
producer!!.send(mqAddress, artemisMessage)
artemis.producer.send(mqAddress, artemisMessage)
retryId?.let {
database.transaction {
messagesToRedeliver.computeIfAbsent(it, { Pair(message, target) })
scheduledMessageRedeliveries[it] = messagingExecutor.schedule({
sendWithRetry(0, mqAddress, artemisMessage, it)
}, config.messageRedeliveryDelaySeconds.toLong(), TimeUnit.SECONDS)
}, messageRedeliveryDelaySeconds, TimeUnit.SECONDS)
@ -498,12 +420,12 @@ class NodeMessagingClient(private val config: NodeConfiguration,
state.locked {
log.trace { "Retry #$retryCount sending message $message to $address for $retryId" }
producer!!.send(address, message)
artemis.started!!.producer.send(address, message)
scheduledMessageRedeliveries[retryId] = messagingExecutor.schedule({
sendWithRetry(retryCount + 1, address, message, retryId)
}, config.messageRedeliveryDelaySeconds.toLong(), TimeUnit.SECONDS)
}, messageRedeliveryDelaySeconds, TimeUnit.SECONDS)
override fun cancelRedelivery(retryId: Long) {
@ -533,10 +455,11 @@ class NodeMessagingClient(private val config: NodeConfiguration,
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
private fun createQueueIfAbsent(queueName: String) {
state.alreadyLocked {
val queueQuery = session!!.queueQuery(SimpleString(queueName))
val session = artemis.started!!.session
val queueQuery = session.queueQuery(SimpleString(queueName))
if (!queueQuery.isExists) {
log.info("Create fresh queue $queueName bound on same address")
session!!.createQueue(queueName, RoutingType.MULTICAST, queueName, true)
session.createQueue(queueName, RoutingType.MULTICAST, queueName, true)
@ -564,22 +487,6 @@ class NodeMessagingClient(private val config: NodeConfiguration,
return NodeClientMessage(topicSession, data, uuid)
private fun createOutOfProcessVerifierService(): TransactionVerifierService {
return object : OutOfProcessTransactionVerifierService(metrics) {
override fun sendRequest(nonce: Long, transaction: LedgerTransaction) {
messagingExecutor.fetchFrom {
state.locked {
val message = session!!.createMessage(false)
val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress))
producer!!.send(VERIFICATION_REQUESTS_QUEUE_NAME, message)
// TODO Rethink PartyInfo idea and merging PeerAddress/ServiceAddress (the only difference is that Service address doesn't hold host and port)
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
return when (partyInfo) {
@ -0,0 +1,29 @@
package net.corda.node.services.messaging
import net.corda.core.identity.CordaX500Name
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.RPCUserService
import net.corda.node.utilities.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.config.SSLConfiguration
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
class RPCMessagingClient(private val config: SSLConfiguration, serverAddress: NetworkHostAndPort) : SingletonSerializeAsToken() {
private val artemis = ArtemisMessagingClient(config, serverAddress)
private var rpcServer: RPCServer? = null
fun start(rpcOps: RPCOps, userService: RPCUserService) = synchronized(this) {
val locator = artemis.start().sessionFactory.serverLocator
val myCert = loadKeyStore(config.sslKeystore, config.keyStorePassword).getX509Certificate(X509Utilities.CORDA_CLIENT_TLS)
rpcServer = RPCServer(rpcOps, NODE_USER, NODE_USER, locator, userService, CordaX500Name.build(myCert.subjectX500Principal))
fun start2(serverControl: ActiveMQServerControl) = synchronized(this) {
fun stop() = synchronized(this) {
@ -24,10 +24,7 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.Try
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.seconds
import net.corda.core.utilities.*
import net.corda.node.services.RPCUserService
import net.corda.node.services.logging.pushToLoggingContext
import net.corda.nodeapi.*
@ -42,6 +39,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
import org.apache.activemq.artemis.api.core.management.ManagementHelper
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import rx.Notification
import rx.Observable
@ -90,7 +88,7 @@ class RPCServer(
private val rpcConfiguration: RPCServerConfiguration = RPCServerConfiguration.default
) {
private companion object {
val log = loggerFor<RPCServer>()
private val log = contextLogger()
private enum class State {
@ -442,7 +440,7 @@ class ObservableContext(
val observationSendExecutor: ExecutorService
) {
private companion object {
val log = loggerFor<ObservableContext>()
private val log = contextLogger()
private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this)
@ -465,8 +463,7 @@ class ObservableContext(
object RpcServerObservableSerializer : Serializer<Observable<*>>() {
private object RpcObservableContextKey
private val log = loggerFor<RpcServerObservableSerializer>()
private val log = LoggerFactory.getLogger(javaClass)
fun createContext(observableContext: ObservableContext): SerializationContext {
return RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
@ -23,6 +23,7 @@ data class RpcPermissions(private val values: Set<String> = emptySet()) {
companion object {
val NONE = RpcPermissions()
val ALL = RpcPermissions(setOf("ALL"))
fun coverAny(permissions: Set<String>) = !values.intersect(permissions + Permissions.all()).isEmpty()
@ -0,0 +1,73 @@
package net.corda.node.services.messaging
import com.codahale.metrics.MetricRegistry
import net.corda.core.crypto.random63BitValue
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
import net.corda.node.utilities.*
import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
import net.corda.nodeapi.config.SSLConfiguration
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import java.util.concurrent.*
class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHostAndPort, metrics: MetricRegistry) : SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<VerifierMessagingClient>()
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
private val artemis = ArtemisMessagingClient(config, serverAddress)
/** An executor for sending messages */
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
private var verificationResponseConsumer: ClientConsumer? = null
fun start(): Unit = synchronized(this) {
val session = artemis.start().session
fun checkVerifierCount() {
if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) {
log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!")
// Attempts to create a durable queue on the broker which is bound to an address of the same name.
fun createQueueIfAbsent(queueName: String) {
val queueQuery = session.queueQuery(SimpleString(queueName))
if (!queueQuery.isExists) {
log.info("Create fresh queue $queueName bound on same address")
session.createQueue(queueName, RoutingType.MULTICAST, queueName, true)
verificationResponseConsumer = session.createConsumer(verifierResponseAddress)
messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS)
fun start2() = synchronized(this) {
fun stop() = synchronized(this) {
internal val verifierService = OutOfProcessTransactionVerifierService(metrics) { nonce, transaction ->
messagingExecutor.fetchFrom {
sendRequest(nonce, transaction)
private fun sendRequest(nonce: Long, transaction: LedgerTransaction) = synchronized(this) {
val started = artemis.started!!
val message = started.session.createMessage(false)
val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress))
started.producer.send(VERIFICATION_REQUESTS_QUEUE_NAME, message)
@ -8,7 +8,7 @@ import net.corda.core.internal.openHttpConnection
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.node.services.api.NetworkMapCacheInternal
@ -25,10 +25,6 @@ import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
class NetworkMapClient(compatibilityZoneURL: URL) {
companion object {
val logger = loggerFor<NetworkMapClient>()
private val networkMapUrl = URL("$compatibilityZoneURL/network-map")
fun publish(signedNodeInfo: SignedData<NodeInfo>) {
@ -73,7 +69,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val fileWatcher: NodeInfoWatcher,
private val networkMapClient: NetworkMapClient?) : Closeable {
companion object {
private val logger = loggerFor<NetworkMapUpdater>()
private val logger = contextLogger()
private val retryInterval = 1.minutes
@ -7,7 +7,7 @@ import net.corda.core.internal.*
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
import net.corda.nodeapi.NodeInfoFilesCopier
import rx.Observable
@ -40,8 +40,7 @@ class NodeInfoWatcher(private val nodePath: Path,
val processedNodeInfoHashes: Set<SecureHash> get() = _processedNodeInfoHashes.toSet()
companion object {
private val logger = loggerFor<NodeInfoWatcher>()
private val logger = contextLogger()
* Saves the given [NodeInfo] to a path.
* The node is 'encoded' as a SignedData<NodeInfo>, signed with the owning key of its first identity.
@ -19,7 +19,7 @@ import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.CordaPersistence
@ -63,7 +63,7 @@ class NetworkMapCacheImpl(
open class PersistentNetworkMapCache(private val database: CordaPersistence) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal {
companion object {
val logger = loggerFor<PersistentNetworkMapCache>()
private val logger = contextLogger()
// TODO Small explanation, partyNodes and registeredNodes is left in memory as it was before, because it will be removed in
@ -3,7 +3,7 @@ package net.corda.node.services.persistence
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.node.services.IdentityService
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import org.hibernate.type.descriptor.WrapperOptions
import org.hibernate.type.descriptor.java.AbstractTypeDescriptor
import org.hibernate.type.descriptor.java.ImmutableMutabilityPlan
@ -11,7 +11,7 @@ import org.hibernate.type.descriptor.java.MutabilityPlan
class AbstractPartyDescriptor(identitySvc: () -> IdentityService) : AbstractTypeDescriptor<AbstractParty>(AbstractParty::class.java) {
companion object {
private val log = loggerFor<AbstractPartyDescriptor>()
private val log = contextLogger()
private val identityService: IdentityService by lazy(identitySvc)
@ -3,7 +3,7 @@ package net.corda.node.services.persistence
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.node.services.IdentityService
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import javax.persistence.AttributeConverter
import javax.persistence.Converter
@ -14,7 +14,7 @@ import javax.persistence.Converter
@Converter(autoApply = true)
class AbstractPartyToX500NameAsStringConverter(identitySvc: () -> IdentityService) : AttributeConverter<AbstractParty, String> {
companion object {
private val log = loggerFor<AbstractPartyToX500NameAsStringConverter>()
private val log = contextLogger()
private val identityService: IdentityService by lazy(identitySvc)
@ -3,8 +3,7 @@ package net.corda.node.services.persistence
import net.corda.core.internal.castIfPossible
import net.corda.core.node.services.IdentityService
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.toHexString
import net.corda.node.services.api.SchemaService
import net.corda.node.utilities.DatabaseTransactionManager
@ -29,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap
class HibernateConfiguration(val schemaService: SchemaService, private val databaseProperties: Properties, private val createIdentityService: () -> IdentityService) {
companion object {
val logger = loggerFor<HibernateConfiguration>()
private val logger = contextLogger()
// TODO: make this a guava cache or similar to limit ability for this to grow forever.
@ -1,32 +1,31 @@
package net.corda.node.services.persistence
import com.codahale.metrics.MetricRegistry
import net.corda.core.internal.VisibleForTesting
import com.google.common.hash.HashCode
import com.google.common.hash.Hashing
import com.google.common.hash.HashingInputStream
import com.google.common.io.CountingInputStream
import net.corda.core.CordaRuntimeException
import net.corda.core.contracts.*
import net.corda.core.internal.AbstractAttachment
import net.corda.core.contracts.Attachment
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.AbstractAttachment
import net.corda.core.internal.VisibleForTesting
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.vault.*
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.serialization.*
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
import net.corda.node.utilities.DatabaseTransactionManager
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.node.utilities.currentDBSession
import java.io.*
import java.lang.Exception
import java.nio.file.Paths
import java.time.Instant
import java.util.jar.JarInputStream
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
import javax.persistence.Column
* Stores attachments using Hibernate to database.
@ -34,6 +33,29 @@ import javax.persistence.Column
class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
// Just iterate over the entries with verification enabled: should be good enough to catch mistakes.
// Note that JarInputStream won't throw any kind of error at all if the file stream is in fact not
// a ZIP! It'll just pretend it's an empty archive, which is kind of stupid but that's how it works.
// So we have to check to ensure we found at least one item.
private fun checkIsAValidJAR(stream: InputStream) {
val jar = JarInputStream(stream, true)
var count = 0
while (true) {
val cursor = jar.nextJarEntry ?: break
val entryPath = Paths.get(cursor.name)
// Security check to stop zips trying to escape their rightful place.
require(!entryPath.isAbsolute) { "Path $entryPath is absolute" }
require(entryPath.normalize() == entryPath) { "Path $entryPath is not normalised" }
require(!('\\' in cursor.name || cursor.name == "." || cursor.name == "..")) { "Bad character in $entryPath" }
require(count > 0) { "Stream is either empty or not a JAR/ZIP" }
@Table(name = "${NODE_DATABASE_PREFIX}attachments",
indexes = arrayOf(Index(name = "att_id_idx", columnList = "att_id")))
@ -56,10 +78,6 @@ class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, Single
var filename: String? = null
) : Serializable
companion object {
private val log = loggerFor<NodeAttachmentService>()
var checkAttachmentsOnLoad = true
@ -171,6 +189,24 @@ class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, Single
return import(jar, uploader, filename)
fun getAttachmentIdAndBytes(jar: InputStream): Pair<AttachmentId, ByteArray> {
val hs = HashingInputStream(Hashing.sha256(), jar)
val bytes = hs.readBytes()
val id = SecureHash.SHA256(hs.hash().asBytes())
return Pair(id, bytes)
override fun hasAttachment(attachmentId: AttachmentId): Boolean {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(Long::class.java)
val attachments = criteriaQuery.from(NodeAttachmentService.DBAttachment::class.java)
criteriaQuery.where(criteriaBuilder.equal(attachments.get<String>(DBAttachment::attId.name), attachmentId.toString()))
return (session.createQuery(criteriaQuery).singleResult > 0)
// TODO: PLT-147: The attachment should be randomised to prevent brute force guessing and thus privacy leaks.
private fun import(jar: InputStream, uploader: String?, filename: String?): AttachmentId {
require(jar !is JarInputStream)
@ -180,27 +216,28 @@ class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, Single
// To do this we must pipe stream into the database without knowing its hash, which we will learn only once
// the insert/upload is complete. We can then query to see if it's a duplicate and if so, erase, and if not
// set the hash field of the new attachment record.
val hs = HashingInputStream(Hashing.sha256(), jar)
val bytes = hs.readBytes()
val id = SecureHash.SHA256(hs.hash().asBytes())
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(Long::class.java)
val attachments = criteriaQuery.from(NodeAttachmentService.DBAttachment::class.java)
criteriaQuery.where(criteriaBuilder.equal(attachments.get<String>(DBAttachment::attId.name), id.toString()))
val count = session.createQuery(criteriaQuery).singleResult
if (count == 0L) {
val (id, bytes) = getAttachmentIdAndBytes(jar)
if (!hasAttachment(id)) {
val session = currentDBSession()
val attachment = NodeAttachmentService.DBAttachment(attId = id.toString(), content = bytes, uploader = uploader, filename = filename)
log.info("Stored new attachment $id")
return id
} else {
throw java.nio.file.FileAlreadyExistsException(id.toString())
return id
override fun importOrGetAttachment(jar: InputStream): AttachmentId {
try {
return importAttachment(jar)
catch (faee: java.nio.file.FileAlreadyExistsException) {
return AttachmentId.parse(faee.message!!)
override fun queryAttachments(criteria: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
@ -227,22 +264,4 @@ class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, Single
private fun checkIsAValidJAR(stream: InputStream) {
// Just iterate over the entries with verification enabled: should be good enough to catch mistakes.
// Note that JarInputStream won't throw any kind of error at all if the file stream is in fact not
// a ZIP! It'll just pretend it's an empty archive, which is kind of stupid but that's how it works.
// So we have to check to ensure we found at least one item.
val jar = JarInputStream(stream, true)
var count = 0
while (true) {
val cursor = jar.nextJarEntry ?: break
val entryPath = Paths.get(cursor.name)
// Security check to stop zips trying to escape their rightful place.
require(!entryPath.isAbsolute) { "Path $entryPath is absolute" }
require(entryPath.normalize() == entryPath) { "Path $entryPath is not normalised" }
require(!('\\' in cursor.name || cursor.name == "." || cursor.name == "..")) { "Bad character in $entryPath" }
require(count > 0) { "Stream is either empty or not a JAR/ZIP" }
@ -7,8 +7,8 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.node.services.Vault
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.services.persistence.HibernateConfiguration
import net.corda.node.utilities.DatabaseTransactionManager
import org.hibernate.FlushMode
@ -20,7 +20,7 @@ import rx.Observable
// TODO: Manage version evolution of the schemas via additional tooling.
class HibernateObserver private constructor(private val config: HibernateConfiguration) {
companion object {
private val log = loggerFor<HibernateObserver>()
private val log = contextLogger()
fun install(vaultUpdates: Observable<Vault.Update<ContractState>>, config: HibernateConfiguration): HibernateObserver {
val observer = HibernateObserver(config)
@ -14,7 +14,7 @@ import net.corda.node.services.api.SchemaService
import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
@ -47,8 +47,8 @@ class NodeSchemaService(cordappLoader: CordappLoader?) : SchemaService, Singleto
@ -3,7 +3,7 @@ package net.corda.node.services.statemachine
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StateMachineRunId
import net.corda.core.utilities.loggerFor
import org.slf4j.LoggerFactory
import java.nio.file.Path
import java.util.*
@ -26,8 +26,7 @@ interface FlowStackSnapshotFactory {
fun persistAsJsonFile(flowClass: Class<out FlowLogic<*>>, baseDir: Path, flowId: StateMachineRunId)
private object DefaultFlowStackSnapshotFactory : FlowStackSnapshotFactory {
private val log = loggerFor<DefaultFlowStackSnapshotFactory>()
private val log = LoggerFactory.getLogger(javaClass)
override fun getFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot? {
log.warn("Flow stack snapshot are not supposed to be used in a production deployment")
return null
@ -27,10 +27,7 @@ import net.corda.core.serialization.SerializationDefaults.SERIALIZATION_FACTORY
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.Try
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.core.utilities.*
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
@ -67,7 +64,7 @@ class StateMachineManagerImpl(
inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor)
companion object {
private val logger = loggerFor<StateMachineManagerImpl>()
private val logger = contextLogger()
internal val sessionTopic = TopicSession("platform.session")
init {
@ -16,7 +16,6 @@ import net.corda.core.node.services.NotaryService
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.FilteredTransaction
@ -41,7 +40,7 @@ class BFTNonValidatingNotaryService(override val services: ServiceHubInternal,
cluster: BFTSMaRt.Cluster) : NotaryService() {
companion object {
val id = constructId(validating = false, bft = true)
private val log = loggerFor<BFTNonValidatingNotaryService>()
private val log = contextLogger()
private val client: BFTSMaRt.Client
@ -29,8 +29,8 @@ import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.BFTSMaRt.Client
import net.corda.node.services.transactions.BFTSMaRt.Replica
@ -77,7 +77,7 @@ object BFTSMaRt {
class Client(config: BFTSMaRtConfig, private val clientId: Int, private val cluster: Cluster, private val notaryService: BFTNonValidatingNotaryService) : SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<Client>()
private val log = contextLogger()
/** A proxy for communicating with the BFT cluster */
@ -181,7 +181,7 @@ object BFTSMaRt {
protected val notaryIdentityKey: PublicKey,
private val timeWindowChecker: TimeWindowChecker) : DefaultRecoverable() {
companion object {
private val log = loggerFor<Replica>()
private val log = contextLogger()
private val stateManagerOverride = run {
@ -2,8 +2,8 @@ package net.corda.node.services.transactions
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import java.io.FileWriter
import java.io.PrintWriter
import java.net.InetAddress
@ -19,7 +19,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
class BFTSMaRtConfig(private val replicaAddresses: List<NetworkHostAndPort>, debug: Boolean, val exposeRaces: Boolean) : PathManager<BFTSMaRtConfig>(Files.createTempDirectory("bft-smart-config")) {
companion object {
private val log = loggerFor<BFTSMaRtConfig>()
private val log = contextLogger()
internal val portIsClaimedFormat = "Port %s is claimed by another replica: %s"
@ -4,7 +4,7 @@ import io.atomix.copycat.Command
import io.atomix.copycat.Query
import io.atomix.copycat.server.Commit
import io.atomix.copycat.server.StateMachine
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.node.utilities.*
import java.util.LinkedHashMap
@ -17,7 +17,7 @@ import java.util.LinkedHashMap
class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap<K, Pair<Long, V>, E, EK>) : StateMachine() {
companion object {
private val log = loggerFor<DistributedImmutableMap<*, *, *, *>>()
private val log = contextLogger()
object Commands {
@ -11,16 +11,17 @@ import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.VerifierApi
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import java.util.concurrent.ConcurrentHashMap
abstract class OutOfProcessTransactionVerifierService(
private val metrics: MetricRegistry
class OutOfProcessTransactionVerifierService(
private val metrics: MetricRegistry,
private val sendRequest: (Long, LedgerTransaction) -> Unit
) : SingletonSerializeAsToken(), TransactionVerifierService {
companion object {
val log = loggerFor<OutOfProcessTransactionVerifierService>()
private val log = contextLogger()
private data class VerificationHandle(
@ -60,8 +61,6 @@ abstract class OutOfProcessTransactionVerifierService(
abstract fun sendRequest(nonce: Long, transaction: LedgerTransaction)
override fun verify(transaction: LedgerTransaction): CordaFuture<*> {
log.info("Verifying ${transaction.id}")
val future = openFuture<Unit>()
@ -9,11 +9,8 @@ import net.corda.core.internal.ThreadBox
import net.corda.core.node.services.UniquenessException
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import java.io.Serializable
@ -62,8 +59,7 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok
private val mutex = ThreadBox(InnerState())
companion object {
private val log = loggerFor<PersistentUniquenessProvider>()
private val log = contextLogger()
fun createMap(): AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistentNotaryCommit, PersistentStateRef> =
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
@ -26,7 +26,7 @@ import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.node.services.config.RaftConfig
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.CordaPersistence
@ -49,8 +49,7 @@ import javax.persistence.*
class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfiguration, private val db: CordaPersistence, private val metrics: MetricRegistry, private val raftConfig: RaftConfig) : UniquenessProvider, SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<RaftUniquenessProvider>()
private val log = contextLogger()
fun createMap(): AppendOnlyPersistentMap<String, Pair<Long, Any>, RaftState, String> =
toPersistentEntityKey = { it },
@ -76,10 +75,10 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur
var key: String = "",
@Column(name = "state_value")
var value: ByteArray = ByteArray(0),
@Column(name = "state_index")
var index: Long = 0
@ -11,7 +11,7 @@ import net.corda.core.node.services.vault.QueryCriteria.CommonQueryCriteria
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import net.corda.node.services.persistence.NodeAttachmentService
import org.hibernate.query.criteria.internal.expression.LiteralExpression
@ -107,7 +107,7 @@ class HibernateAttachmentQueryCriteriaParser(override val criteriaBuilder: Crite
AbstractQueryCriteriaParser<AttachmentQueryCriteria, AttachmentsQueryCriteriaParser, AttachmentSort>(), AttachmentsQueryCriteriaParser {
private companion object {
val log = loggerFor<HibernateAttachmentQueryCriteriaParser>()
private val log = contextLogger()
init {
@ -170,7 +170,7 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
val criteriaQuery: CriteriaQuery<Tuple>,
val vaultStates: Root<VaultSchemaV1.VaultStates>) : AbstractQueryCriteriaParser<QueryCriteria, IQueryCriteriaParser, Sort>(), IQueryCriteriaParser {
private companion object {
val log = loggerFor<HibernateQueryCriteriaParser>()
private val log = contextLogger()
// incrementally build list of join predicates
@ -57,7 +57,7 @@ class NodeVaultService(
hibernateConfig: HibernateConfiguration
) : SingletonSerializeAsToken(), VaultServiceInternal {
private companion object {
val log = loggerFor<NodeVaultService>()
private val log = contextLogger()
private class InnerState {
@ -29,7 +29,8 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
mappedTypes = listOf(VaultStates::class.java, VaultLinearStates::class.java, VaultFungibleStates::class.java, VaultTxnNote::class.java)) {
@Table(name = "vault_states",
indexes = arrayOf(Index(name = "state_status_idx", columnList = "state_status")))
indexes = arrayOf(Index(name = "state_status_idx", columnList = "state_status"),
Index(name = "lock_id_idx", columnList = "lock_id, state_status")))
class VaultStates(
/** NOTE: serialized transaction state (including contract state) is now resolved from transaction store */
// TODO: create a distinct table to hold serialized state data (once DBTransactionStore is encrypted)
@ -4,17 +4,14 @@ import net.corda.core.contracts.FungibleAsset
import net.corda.core.contracts.StateRef
import net.corda.core.flows.FlowLogic
import net.corda.core.node.services.VaultService
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.toNonEmptySet
import net.corda.core.utilities.trace
import net.corda.core.utilities.*
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import java.util.*
class VaultSoftLockManager private constructor(private val vault: VaultService) {
companion object {
private val log = loggerFor<VaultSoftLockManager>()
private val log = contextLogger()
fun install(vault: VaultService, smm: StateMachineManager) {
val manager = VaultSoftLockManager(vault)
@ -0,0 +1,35 @@
package net.corda.node.shell
import net.corda.core.context.Actor
import net.corda.core.context.InvocationContext
import net.corda.core.identity.CordaX500Name
import net.corda.core.messaging.CordaRPCOps
import net.corda.node.services.RPCUserService
import net.corda.node.services.messaging.RpcPermissions
import org.crsh.auth.AuthInfo
import org.crsh.auth.AuthenticationPlugin
import org.crsh.plugin.CRaSHPlugin
class CordaAuthenticationPlugin(val rpcOps:CordaRPCOps, val userService:RPCUserService, val nodeLegalName:CordaX500Name) : CRaSHPlugin<AuthenticationPlugin<String>>(), AuthenticationPlugin<String> {
override fun getImplementation(): AuthenticationPlugin<String> = this
override fun getName(): String = "corda"
override fun authenticate(username: String?, credential: String?): AuthInfo {
if (username == null || credential == null) {
return AuthInfo.UNSUCCESSFUL
val user = userService.getUser(username)
if (user != null && user.password == credential) {
val actor = Actor(Actor.Id(username), userService.id, nodeLegalName)
return CordaSSHAuthInfo(true, RPCOpsWithContext(rpcOps, InvocationContext.rpc(actor), RpcPermissions(user.permissions)))
return AuthInfo.UNSUCCESSFUL;
override fun getCredentialType(): Class<String> = String::class.java
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user