Merge remote-tracking branch 'open/master' into mike-merge-f6ee263db10

This commit is contained in:
Mike Hearn 2018-09-03 20:13:38 +02:00
commit 97aef9c8a1
50 changed files with 572 additions and 222 deletions

View File

@ -2532,7 +2532,6 @@ public interface net.corda.core.messaging.CordaRPCOps extends net.corda.core.mes
public abstract void clearNetworkMapCache()
@NotNull
public abstract java.time.Instant currentNodeTime()
public abstract int getProtocolVersion()
@NotNull
public abstract Iterable<String> getVaultTransactionNotes(net.corda.core.crypto.SecureHash)
@RPCReturnsObservables

View File

@ -62,7 +62,7 @@ public class CordaRPCJavaClientTest extends NodeBasedTest {
@Before
public void setUp() throws Exception {
super.setUp();
node = startNode(ALICE_NAME, 1, singletonList(rpcUser));
node = startNode(ALICE_NAME, 1000, singletonList(rpcUser));
client = new CordaRPCClient(requireNonNull(node.getNode().getConfiguration().getRpcOptions().getAddress()));
}

View File

@ -47,7 +47,7 @@ class RPCStabilityTests {
}
object DummyOps : RPCOps {
override val protocolVersion = 0
override val protocolVersion = 1000
}
private fun waitUntilNumberOfThreadsStable(executorService: ScheduledExecutorService): Map<Thread, List<StackTraceElement>> {
@ -107,7 +107,7 @@ class RPCStabilityTests {
Try.on {
startRpcClient<RPCOps>(
server.get().broker.hostAndPort!!,
configuration = CordaRPCClientConfiguration.DEFAULT.copy(minimumServerProtocolVersion = 1)
configuration = CordaRPCClientConfiguration.DEFAULT.copy(minimumServerProtocolVersion = 1000)
).get()
}
}
@ -203,7 +203,7 @@ class RPCStabilityTests {
rpcDriver {
val leakObservableOpsImpl = object : LeakObservableOps {
val leakedUnsubscribedCount = AtomicInteger(0)
override val protocolVersion = 0
override val protocolVersion = 1000
override fun leakObservable(): Observable<Nothing> {
return PublishSubject.create<Nothing>().doOnUnsubscribe {
leakedUnsubscribedCount.incrementAndGet()
@ -234,7 +234,7 @@ class RPCStabilityTests {
fun `client reconnects to rebooted server`() {
rpcDriver {
val ops = object : ReconnectOps {
override val protocolVersion = 0
override val protocolVersion = 1000
override fun ping() = "pong"
}
@ -259,7 +259,7 @@ class RPCStabilityTests {
fun `connection failover fails, rpc calls throw`() {
rpcDriver {
val ops = object : ReconnectOps {
override val protocolVersion = 0
override val protocolVersion = 1000
override fun ping() = "pong"
}
@ -290,7 +290,7 @@ class RPCStabilityTests {
fun `observables error when connection breaks`() {
rpcDriver {
val ops = object : NoOps {
override val protocolVersion = 0
override val protocolVersion = 1000
override fun subscribe(): Observable<Nothing> {
return PublishSubject.create<Nothing>()
}
@ -350,7 +350,7 @@ class RPCStabilityTests {
fun `client connects to first available server`() {
rpcDriver {
val ops = object : ServerOps {
override val protocolVersion = 0
override val protocolVersion = 1000
override fun serverId() = "server"
}
val serverFollower = shutdownManager.follower()
@ -371,15 +371,15 @@ class RPCStabilityTests {
fun `3 server failover`() {
rpcDriver {
val ops1 = object : ServerOps {
override val protocolVersion = 0
override val protocolVersion = 1000
override fun serverId() = "server1"
}
val ops2 = object : ServerOps {
override val protocolVersion = 0
override val protocolVersion = 1000
override fun serverId() = "server2"
}
val ops3 = object : ServerOps {
override val protocolVersion = 0
override val protocolVersion = 1000
override fun serverId() = "server3"
}
val serverFollower1 = shutdownManager.follower()
@ -443,7 +443,7 @@ class RPCStabilityTests {
fun `server cleans up queues after disconnected clients`() {
rpcDriver {
val trackSubscriberOpsImpl = object : TrackSubscriberOps {
override val protocolVersion = 0
override val protocolVersion = 1000
val subscriberCount = AtomicInteger(0)
val trackSubscriberCountObservable = UnicastSubject.create<Unit>().share().
doOnSubscribe { subscriberCount.incrementAndGet() }.
@ -486,7 +486,7 @@ class RPCStabilityTests {
}
class SlowConsumerRPCOpsImpl : SlowConsumerRPCOps {
override val protocolVersion = 0
override val protocolVersion = 1000
override fun streamAtInterval(interval: Duration, size: Int): Observable<ByteArray> {
val chunk = ByteArray(size)
@ -587,7 +587,7 @@ class RPCStabilityTests {
val request = RPCApi.ClientToServer.fromClientMessage(it)
when (request) {
is RPCApi.ClientToServer.RpcRequest -> {
val reply = RPCApi.ServerToClient.RpcReply(request.replyId, Try.Success(0), "server")
val reply = RPCApi.ServerToClient.RpcReply(request.replyId, Try.Success(1000), "server")
val message = session.createMessage(false)
reply.writeToClientMessage(SerializationDefaults.RPC_SERVER_CONTEXT, message)
message.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, dedupeId.getAndIncrement())

View File

@ -29,65 +29,76 @@ class CordaRPCConnection internal constructor(connection: RPCConnection<CordaRPC
open class CordaRPCClientConfiguration @JvmOverloads constructor(
/**
* Maximum retry interval.
* The maximum retry interval for re-connections. The client will retry connections if the host is lost with
* ever increasing spacing until the max is reached. The default is 3 minutes.
*/
open val connectionMaxRetryInterval: Duration = 3.minutes,
/**
* The minimum protocol version required from the server.
* The minimum protocol version required from the server. This is equivalent to the node's platform version
* number. If this minimum version is not met, an exception will be thrown at startup. If you use features
* introduced in a later version, you can bump this to match the platform version you need and get an early
* check that runs before you do anything.
*
* If you leave it at the default then things will work but attempting to use an RPC added in a version later
* than the server supports will throw [UnsupportedOperationException].
*
* The default value is whatever version of Corda this RPC library was shipped as a part of. Therefore if you
* use the RPC library from Corda 4, it will by default only connect to a node of version 4 or above.
*/
open val minimumServerProtocolVersion: Int = 0,
open val minimumServerProtocolVersion: Int = 4,
/**
* If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a
* returned Observable stream the stack trace of the originating RPC will be shown as well. Note that
* constructing call stacks is a moderately expensive operation.
* If set to true the client will track RPC call sites (default is false). If an error occurs subsequently
* during the RPC or in a returned Observable stream the stack trace of the originating RPC will be shown as
* well. Note that constructing call stacks is a moderately expensive operation.
*/
open val trackRpcCallSites: Boolean = false,
open val trackRpcCallSites: Boolean = java.lang.Boolean.getBoolean("net.corda.client.rpc.trackRpcCallSites"),
/**
* The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references
* and are cleaned up in batches in this interval. If set too large it will waste server side resources for this
* duration. If set too low it wastes client side cycles.
* duration. If set too low it wastes client side cycles. The default is to check once per second.
*/
open val reapInterval: Duration = 1.seconds,
/**
* The number of threads to use for observations (for executing [Observable.onNext]).
* The number of threads to use for observations for executing [Observable.onNext]. This only has any effect
* if [observableExecutor] is null (which is the default). The default is 4.
*/
open val observationExecutorPoolSize: Int = 4,
/**
* Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
* the limit on the number of leaked observables reaped because of garbage collection per reaping.
* See the implementation of [com.google.common.cache.LocalCache] for details.
* This property is no longer used and has no effect.
* @suppress
*/
@Deprecated("This field is no longer used and has no effect.")
open val cacheConcurrencyLevel: Int = 1,
/**
* The retry interval of Artemis connections in milliseconds.
* The base retry interval for reconnection attempts. The default is 5 seconds.
*/
open val connectionRetryInterval: Duration = 5.seconds,
/**
* The retry interval multiplier for exponential backoff.
* The retry interval multiplier for exponential backoff. The default is 1.5
*/
open val connectionRetryIntervalMultiplier: Double = 1.5,
/**
* Maximum reconnect attempts on failover>
* Maximum reconnect attempts on failover or disconnection. The default is -1 which means unlimited.
*/
open val maxReconnectAttempts: Int = unlimitedReconnectAttempts,
/**
* Maximum file size, in bytes.
* Maximum size of RPC responses, in bytes. Default is 10mb.
*/
open val maxFileSize: Int = 10485760,
// 10 MiB maximum allowed file size for attachments, including message headers.
// TODO: acquire this value from Network Map when supported.
/**
* The cache expiry of a deduplication watermark per client.
* The cache expiry of a deduplication watermark per client. Default is 1 day.
*/
open val deduplicationCacheExpiry: Duration = 1.days
@ -97,6 +108,7 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
private const val unlimitedReconnectAttempts = -1
/** Provides an instance of this class with the parameters set to our recommended defaults. */
@JvmField
val DEFAULT: CordaRPCClientConfiguration = CordaRPCClientConfiguration()
@ -104,7 +116,10 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
/**
* Create a new copy of a configuration object with zero or more parameters modified.
*
* @suppress
*/
@Suppress("DEPRECATION")
@JvmOverloads
fun copy(
connectionMaxRetryInterval: Duration = this.connectionMaxRetryInterval,
@ -169,6 +184,7 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
return result
}
@Suppress("DEPRECATION")
override fun toString(): String {
return "CordaRPCClientConfiguration(" +
"connectionMaxRetryInterval=$connectionMaxRetryInterval, " +
@ -180,7 +196,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
"deduplicationCacheExpiry=$deduplicationCacheExpiry)"
}
// Left is for backwards compatibility with version 3.1
// Left in for backwards compatibility with version 3.1
@Deprecated("Binary compatibility stub")
operator fun component1() = connectionMaxRetryInterval
}

View File

@ -97,12 +97,18 @@ class RPCClientProxyHandler(
// To check whether toString() is being invoked
val toStringMethod: Method = Object::toString.javaMethod!!
private fun addRpcCallSiteToThrowable(throwable: Throwable, callSite: Throwable) {
private fun addRpcCallSiteToThrowable(throwable: Throwable, callSite: CallSite) {
var currentThrowable = throwable
while (true) {
val cause = currentThrowable.cause
if (cause == null) {
currentThrowable.initCause(callSite)
try {
currentThrowable.initCause(callSite)
} catch (e: IllegalStateException) {
// OK, we did our best, but the first throwable with a null cause was instantiated using
// Throwable(Throwable) or Throwable(String, Throwable) which means initCause can't ever
// be called even if it was passed null.
}
break
} else {
currentThrowable = cause
@ -146,15 +152,17 @@ class RPCClientProxyHandler(
private fun createRpcObservableMap(): RpcObservableMap {
val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause ->
val observableId = key!!
val rpcCallSite = callSiteMap?.remove(observableId)
val rpcCallSite: CallSite? = callSiteMap?.remove(observableId)
if (cause == RemovalCause.COLLECTED) {
log.warn(listOf(
"A hot observable returned from an RPC was never subscribed to.",
"This wastes server-side resources because it was queueing observations for retrieval.",
"It is being closed now, but please adjust your code to call .notUsed() on the observable",
"to close it explicitly. (Java users: subscribe to it then unsubscribe). This warning",
"will appear less frequently in future versions of the platform and you can ignore it",
"if you want to.").joinToString(" "), rpcCallSite)
"to close it explicitly. (Java users: subscribe to it then unsubscribe). If you aren't sure",
"where the leak is coming from, set -Dnet.corda.client.rpc.trackRpcCallSites=true on the JVM",
"command line and you will get a stack trace with this warning."
).joinToString(" "), rpcCallSite)
rpcCallSite?.printStackTrace()
}
observablesToReap.locked { observables.add(observableId) }
}
@ -215,6 +223,9 @@ class RPCClientProxyHandler(
startSessions()
}
/** A throwable that doesn't represent a real error - it's just here to wrap a stack trace. */
class CallSite(val rpcName: String) : Throwable("<Call site of root RPC '$rpcName'>")
// This is the general function that transforms a client side RPC to internal Artemis messages.
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any? {
lifeCycle.requireState { it == State.STARTED || it == State.SERVER_VERSION_NOT_SET }
@ -230,7 +241,7 @@ class RPCClientProxyHandler(
throw RPCException("RPC server is not available.")
val replyId = InvocationId.newInstance()
callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>"))
callSiteMap?.set(replyId, CallSite(method.name))
try {
val serialisedArguments = (arguments?.toList() ?: emptyList()).serialize(context = serializationContextWithObservableContext)
val request = RPCApi.ClientToServer.RpcRequest(
@ -273,7 +284,7 @@ class RPCClientProxyHandler(
// The handler for Artemis messages.
private fun artemisMessageHandler(message: ClientMessage) {
fun completeExceptionally(id: InvocationId, e: Throwable, future: SettableFuture<Any?>?) {
val rpcCallSite: Throwable? = callSiteMap?.get(id)
val rpcCallSite: CallSite? = callSiteMap?.get(id)
if (rpcCallSite != null) addRpcCallSiteToThrowable(e, rpcCallSite)
future?.setException(e.cause ?: e)
}
@ -555,13 +566,14 @@ class RPCClientProxyHandler(
private typealias RpcObservableMap = Cache<InvocationId, UnicastSubject<Notification<*>>>
private typealias RpcReplyMap = ConcurrentHashMap<InvocationId, SettableFuture<Any?>>
private typealias CallSiteMap = ConcurrentHashMap<InvocationId, Throwable?>
private typealias CallSiteMap = ConcurrentHashMap<InvocationId, RPCClientProxyHandler.CallSite?>
/**
* Holds a context available during de-serialisation of messages that are expected to contain Observables.
*
* @param observableMap holds the Observables that are ultimately exposed to the user.
* @param hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to.
* @property observableMap holds the Observables that are ultimately exposed to the user.
* @property hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to.
* @property callSiteMap keeps stack traces captured when an RPC was invoked, useful for debugging when an observable leaks.
*/
data class ObservableContext(
val callSiteMap: CallSiteMap?,

View File

@ -2,8 +2,10 @@ package net.corda.client.rpc.internal.serialization.amqp
import net.corda.client.rpc.internal.ObservableContext
import net.corda.client.rpc.internal.RPCClientProxyHandler
import net.corda.core.context.Trace
import net.corda.core.serialization.SerializationContext
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.RPCApi
import net.corda.serialization.internal.amqp.*
import org.apache.qpid.proton.codec.Data
@ -17,11 +19,12 @@ import java.util.concurrent.atomic.AtomicInteger
import javax.transaction.NotSupportedException
/**
* De-serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects,
* just as the corresponding RPC server side code ([RpcServerObservableSerializer]) can only serialize them. Observables are only notionally serialized,
* what is actually sent is a reference to the observable that can then be subscribed to.
* De-serializer for Rx [Observable] instances for the RPC Client library. Can only be used to deserialize such objects,
* just as the corresponding RPC server side class [RpcServerObservableSerializer] can only serialize them. Observables
* are only notionally serialized, what is actually sent is a reference to the observable that can then be subscribed to.
*/
object RpcClientObservableDeSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) {
private val log = loggerFor<RpcClientObservableDeSerializer>()
private object RpcObservableContextKey
fun createContext(
@ -96,22 +99,23 @@ object RpcClientObservableDeSerializer : CustomSerializer.Implements<Observable<
}
val rpcCallSite = getRpcCallSite(context, observableContext)
observableContext.observableMap.put(observableId, observable)
observableContext.callSiteMap?.put(observableId, rpcCallSite)
log.trace("Deserialising observable $observableId", rpcCallSite)
// We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users
// don't need to store a reference to the Observables themselves.
return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe {
// This causes Future completions to give warnings because the corresponding OnComplete sent from the server
// will arrive after the client unsubscribes from the observable and consequently invalidates the mapping.
// The unsubscribe is due to [ObservableToFuture]'s use of first().
// The unsubscribe is due to ObservableToFuture's use of first().
observableContext.observableMap.invalidate(observableId)
}.dematerialize<Any>()
}
private fun getRpcCallSite(context: SerializationContext, observableContext: ObservableContext): Throwable? {
private fun getRpcCallSite(context: SerializationContext, observableContext: ObservableContext): RPCClientProxyHandler.CallSite? {
val rpcRequestOrObservableId = context.properties[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId
// Will only return non-null if the trackRpcCallSites option in the RPC configuration has been specified.
return observableContext.callSiteMap?.get(rpcRequestOrObservableId)
}

View File

@ -48,7 +48,7 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
fun makeComplicatedListenableFuture(): CordaFuture<Pair<String, CordaFuture<String>>>
@RPCSinceVersion(2)
@RPCSinceVersion(2000)
fun addedLater()
fun captureUser(): String
@ -58,7 +58,7 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
private lateinit var complicatedListenableFuturee: CordaFuture<Pair<String, CordaFuture<String>>>
inner class TestOpsImpl : TestOps {
override val protocolVersion = 1
override val protocolVersion = 1000
// do not remove Unit
override fun barf(): Unit = throw IllegalArgumentException("Barf!")
override fun void() {}

View File

@ -33,7 +33,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
@CordaSerializable
data class ObservableRose<out A>(val value: A, val branches: Observable<out ObservableRose<A>>)
private interface TestOps : RPCOps {
interface TestOps : RPCOps {
fun newLatch(numberOfDowns: Int): Long
fun waitLatch(id: Long)
fun downLatch(id: Long)
@ -43,7 +43,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
class TestOpsImpl(private val pool: Executor) : TestOps {
private val latches = ConcurrentHashMap<Long, CountDownLatch>()
override val protocolVersion = 0
override val protocolVersion = 1000
override fun newLatch(numberOfDowns: Int): Long {
val id = random63BitValue()

View File

@ -26,7 +26,7 @@ class RPCFailureTests {
}
class OpsImpl : Ops {
override val protocolVersion = 1
override val protocolVersion = 1000
override fun getUnserializable() = Unserializable()
override fun getUnserializableAsync(): CordaFuture<Unserializable> {
return openFuture<Unserializable>().apply { capture { getUnserializable() } }

View File

@ -24,7 +24,7 @@ class RPCHighThroughputObservableTests : AbstractRPCTest() {
}
internal class TestOpsImpl : TestOps {
override val protocolVersion = 1
override val protocolVersion = 1000
override fun makeObservable(): Observable<Int> = Observable.interval(0, TimeUnit.MICROSECONDS).map { it.toInt() + 1 }
}

View File

@ -6,8 +6,8 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.node.internal.RPCDriverDSL
import net.corda.testing.internal.performance.div
import net.corda.testing.node.internal.RPCDriverDSL
import net.corda.testing.node.internal.performance.startPublishingFixedRateInjector
import net.corda.testing.node.internal.performance.startReporter
import net.corda.testing.node.internal.performance.startTightLoopInjector
@ -35,7 +35,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
}
class TestOpsImpl : TestOps {
override val protocolVersion = 0
override val protocolVersion = 1000
override fun simpleReply(input: ByteArray, sizeOfReply: Int): ByteArray {
return ByteArray(sizeOfReply)
}

View File

@ -25,7 +25,7 @@ class RPCPermissionsTests : AbstractRPCTest() {
}
class TestOpsImpl : TestOps {
override val protocolVersion = 1
override val protocolVersion = 1000
override fun validatePermission(method: String, target: String?) {
val authorized = if (target == null) {
rpcContext().isPermitted(method)

View File

@ -1,5 +1,7 @@
gradlePluginsVersion=4.0.29
kotlinVersion=1.2.51
# When adjusting platformVersion upwards please also modify CordaRPCClientConfiguration.minimumServerProtocolVersion \
# if there have been any RPC changes. Also please modify InternalMockNetwork.kt:MOCK_VERSION_INFO and NodeBasedTest.startNode
platformVersion=4
guavaVersion=25.1-jre
proguardVersion=6.0.3

View File

@ -24,22 +24,22 @@ import java.security.cert.X509Certificate
// also note that IDs are numbered from 1 upwards, matching numbering of other enum types in ASN.1 specifications.
// TODO: Link to the specification once it has a permanent URL
enum class CertRole(val validParents: NonEmptySet<CertRole?>, val isIdentity: Boolean, val isWellKnown: Boolean) : ASN1Encodable {
/** Intermediate CA (Doorman service). */
INTERMEDIATE_CA(NonEmptySet.of(null), false, false),
/** Signing certificate for the Doorman CA. */
DOORMAN_CA(NonEmptySet.of(null), false, false),
/** Signing certificate for the network map. */
NETWORK_MAP(NonEmptySet.of(null), false, false),
/** Well known (publicly visible) identity of a service (such as notary). */
SERVICE_IDENTITY(NonEmptySet.of(INTERMEDIATE_CA), true, true),
SERVICE_IDENTITY(NonEmptySet.of(DOORMAN_CA), true, true),
/** Node level CA from which the TLS and well known identity certificates are issued. */
NODE_CA(NonEmptySet.of(INTERMEDIATE_CA), false, false),
NODE_CA(NonEmptySet.of(DOORMAN_CA), false, false),
/** Transport layer security certificate for a node. */
TLS(NonEmptySet.of(NODE_CA), false, false),
/** Well known (publicly visible) identity of a legal entity. */
// TODO: at the moment, Legal Identity certs are issued by Node CA only. However, [INTERMEDIATE_CA] is also added
// TODO: at the moment, Legal Identity certs are issued by Node CA only. However, [DOORMAN_CA] is also added
// as a valid parent of [LEGAL_IDENTITY] for backwards compatibility purposes (eg. if we decide TLS has its
// own Root CA and Intermediate CA directly issues Legal Identities; thus, there won't be a requirement for
// Node CA). Consider removing [INTERMEDIATE_CA] from [validParents] when the model is finalised.
LEGAL_IDENTITY(NonEmptySet.of(INTERMEDIATE_CA, NODE_CA), true, true),
// own Root CA and Doorman CA directly issues Legal Identities; thus, there won't be a requirement for
// Node CA). Consider removing [DOORMAN_CA] from [validParents] when the model is finalised.
LEGAL_IDENTITY(NonEmptySet.of(DOORMAN_CA, NODE_CA), true, true),
/** Confidential (limited visibility) identity of a legal entity. */
CONFIDENTIAL_LEGAL_IDENTITY(NonEmptySet.of(LEGAL_IDENTITY), true, false);

View File

@ -7,16 +7,24 @@ import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.OpaqueBytes
import java.security.cert.CertPath
import java.security.cert.X509Certificate
// TODO: Rename this to DigitalSignature.WithCert once we're happy for it to be public API. The methods will need documentation
// and the correct exceptions will be need to be annotated
/** A digital signature with attached certificate of the public key. */
class DigitalSignatureWithCert(val by: X509Certificate, bytes: ByteArray) : DigitalSignature(bytes) {
open class DigitalSignatureWithCert(val by: X509Certificate, bytes: ByteArray) : DigitalSignature(bytes) {
fun verify(content: ByteArray): Boolean = by.publicKey.verify(content, this)
fun verify(content: OpaqueBytes): Boolean = verify(content.bytes)
}
/**
* A digital signature with attached certificate path. The first certificate in the path corresponds to the data signer key.
* @param path certificate path associated with this signature
* @param bytes signature bytes
*/
class DigitalSignatureWithCertPath(val path: List<X509Certificate>, bytes: ByteArray): DigitalSignatureWithCert(path.first(), bytes)
/** Similar to [SignedData] but instead of just attaching the public key, the certificate for the key is attached instead. */
@CordaSerializable
class SignedDataWithCert<T : Any>(val raw: SerializedBytes<T>, val sig: DigitalSignatureWithCert) {

View File

@ -96,12 +96,6 @@ data class StateMachineTransactionMapping(val stateMachineRunId: StateMachineRun
/** RPC operations that the node exposes to clients. */
interface CordaRPCOps : RPCOps {
/**
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
* to be present.
*/
override val protocolVersion: Int get() = nodeInfo().platformVersion
/** Returns a list of currently in-progress state machine infos. */
fun stateMachinesSnapshot(): List<StateMachineInfo>

View File

@ -3,12 +3,15 @@ package net.corda.core.transactions
import net.corda.core.CordaException
import net.corda.core.KeepForDJVM
import net.corda.core.contracts.*
import net.corda.core.contracts.ComponentGroupEnum.*
import net.corda.core.crypto.*
import net.corda.core.identity.Party
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.*
import net.corda.core.utilities.OpaqueBytes
import java.security.PublicKey
import java.util.function.Predicate
import kotlin.reflect.KClass
/**
* Implemented by [WireTransaction] and [FilteredTransaction]. A TraversableTransaction allows you to iterate
@ -18,29 +21,29 @@ import java.util.function.Predicate
*/
abstract class TraversableTransaction(open val componentGroups: List<ComponentGroup>) : CoreTransaction() {
/** Hashes of the ZIP/JAR files that are needed to interpret the contents of this wire transaction. */
val attachments: List<SecureHash> = deserialiseComponentGroup(ComponentGroupEnum.ATTACHMENTS_GROUP, { SerializedBytes<SecureHash>(it).deserialize() })
val attachments: List<SecureHash> = deserialiseComponentGroup(SecureHash::class, ATTACHMENTS_GROUP)
/** Pointers to the input states on the ledger, identified by (tx identity hash, output index). */
override val inputs: List<StateRef> = deserialiseComponentGroup(ComponentGroupEnum.INPUTS_GROUP, { SerializedBytes<StateRef>(it).deserialize() })
override val inputs: List<StateRef> = deserialiseComponentGroup(StateRef::class, INPUTS_GROUP)
/** Pointers to reference states, identified by (tx identity hash, output index). */
override val references: List<StateRef> = deserialiseComponentGroup(ComponentGroupEnum.REFERENCES_GROUP, { SerializedBytes<StateRef>(it).deserialize() })
override val references: List<StateRef> = deserialiseComponentGroup(StateRef::class, REFERENCES_GROUP)
override val outputs: List<TransactionState<ContractState>> = deserialiseComponentGroup(ComponentGroupEnum.OUTPUTS_GROUP, { SerializedBytes<TransactionState<ContractState>>(it).deserialize(context = SerializationFactory.defaultFactory.defaultContext.withAttachmentsClassLoader(attachments)) })
override val outputs: List<TransactionState<ContractState>> = deserialiseComponentGroup(TransactionState::class, OUTPUTS_GROUP, attachmentsContext = true)
/** Ordered list of ([CommandData], [PublicKey]) pairs that instruct the contracts what to do. */
val commands: List<Command<*>> = deserialiseCommands()
override val notary: Party? = let {
val notaries: List<Party> = deserialiseComponentGroup(ComponentGroupEnum.NOTARY_GROUP, { SerializedBytes<Party>(it).deserialize() })
val notaries: List<Party> = deserialiseComponentGroup(Party::class, NOTARY_GROUP)
check(notaries.size <= 1) { "Invalid Transaction. More than 1 notary party detected." }
if (notaries.isNotEmpty()) notaries[0] else null
notaries.firstOrNull()
}
val timeWindow: TimeWindow? = let {
val timeWindows: List<TimeWindow> = deserialiseComponentGroup(ComponentGroupEnum.TIMEWINDOW_GROUP, { SerializedBytes<TimeWindow>(it).deserialize() })
val timeWindows: List<TimeWindow> = deserialiseComponentGroup(TimeWindow::class, TIMEWINDOW_GROUP)
check(timeWindows.size <= 1) { "Invalid Transaction. More than 1 time-window detected." }
if (timeWindows.isNotEmpty()) timeWindows[0] else null
timeWindows.firstOrNull()
}
/**
@ -63,12 +66,16 @@ abstract class TraversableTransaction(open val componentGroups: List<ComponentGr
}
// Helper function to return a meaningful exception if deserialisation of a component fails.
private fun <T> deserialiseComponentGroup(groupEnum: ComponentGroupEnum, deserialiseBody: (ByteArray) -> T): List<T> {
private fun <T : Any> deserialiseComponentGroup(clazz: KClass<T>,
groupEnum: ComponentGroupEnum,
attachmentsContext: Boolean = false): List<T> {
val factory = SerializationFactory.defaultFactory
val context = factory.defaultContext.let { if (attachmentsContext) it.withAttachmentsClassLoader(attachments) else it }
val group = componentGroups.firstOrNull { it.groupIndex == groupEnum.ordinal }
return if (group != null && group.components.isNotEmpty()) {
group.components.mapIndexed { internalIndex, component ->
try {
deserialiseBody(component.bytes)
factory.deserialize(component, clazz.java, context)
} catch (e: MissingAttachmentsException) {
throw e
} catch (e: Exception) {
@ -87,11 +94,13 @@ abstract class TraversableTransaction(open val componentGroups: List<ComponentGr
// TODO: we could avoid deserialising unrelated signers.
// However, current approach ensures the transaction is not malformed
// and it will throw if any of the signers objects is not List of public keys).
val signersList = deserialiseComponentGroup(ComponentGroupEnum.SIGNERS_GROUP, { SerializedBytes<List<PublicKey>>(it).deserialize() })
val commandDataList = deserialiseComponentGroup(ComponentGroupEnum.COMMANDS_GROUP, { SerializedBytes<CommandData>(it).deserialize(context = SerializationFactory.defaultFactory.defaultContext.withAttachmentsClassLoader(attachments)) })
val group = componentGroups.firstOrNull { it.groupIndex == ComponentGroupEnum.COMMANDS_GROUP.ordinal }
val signersList: List<List<PublicKey>> = uncheckedCast(deserialiseComponentGroup(List::class, SIGNERS_GROUP))
val commandDataList: List<CommandData> = deserialiseComponentGroup(CommandData::class, COMMANDS_GROUP, attachmentsContext = true)
val group = componentGroups.firstOrNull { it.groupIndex == COMMANDS_GROUP.ordinal }
return if (group is FilteredComponentGroup) {
check(commandDataList.size <= signersList.size) { "Invalid Transaction. Less Signers (${signersList.size}) than CommandData (${commandDataList.size}) objects" }
check(commandDataList.size <= signersList.size) {
"Invalid Transaction. Less Signers (${signersList.size}) than CommandData (${commandDataList.size}) objects"
}
val componentHashes = group.components.mapIndexed { index, component -> componentHash(group.nonces[index], component) }
val leafIndices = componentHashes.map { group.partialMerkleTree.leafIndex(it) }
if (leafIndices.isNotEmpty())
@ -100,7 +109,9 @@ abstract class TraversableTransaction(open val componentGroups: List<ComponentGr
} else {
// It is a WireTransaction
// or a FilteredTransaction with no Commands (in which case group is null).
check(commandDataList.size == signersList.size) { "Invalid Transaction. Sizes of CommandData (${commandDataList.size}) and Signers (${signersList.size}) do not match" }
check(commandDataList.size == signersList.size) {
"Invalid Transaction. Sizes of CommandData (${commandDataList.size}) and Signers (${signersList.size}) do not match"
}
commandDataList.mapIndexed { index, commandData -> Command(commandData, signersList[index]) }
}
}
@ -145,47 +156,47 @@ class FilteredTransaction internal constructor(
var signersIncluded = false
fun <T : Any> filter(t: T, componentGroupIndex: Int, internalIndex: Int) {
if (filtering.test(t)) {
val group = filteredSerialisedComponents[componentGroupIndex]
// Because the filter passed, we know there is a match. We also use first Vs single as the init function
// of WireTransaction ensures there are no duplicated groups.
val serialisedComponent = wtx.componentGroups.first { it.groupIndex == componentGroupIndex }.components[internalIndex]
if (group == null) {
// As all of the helper Map structures, like availableComponentNonces, availableComponentHashes
// and groupsMerkleRoots, are computed lazily via componentGroups.forEach, there should always be
// a match on Map.get ensuring it will never return null.
filteredSerialisedComponents[componentGroupIndex] = mutableListOf(serialisedComponent)
filteredComponentNonces[componentGroupIndex] = mutableListOf(wtx.availableComponentNonces[componentGroupIndex]!![internalIndex])
filteredComponentHashes[componentGroupIndex] = mutableListOf(wtx.availableComponentHashes[componentGroupIndex]!![internalIndex])
} else {
group.add(serialisedComponent)
// If the group[componentGroupIndex] existed, then we guarantee that
// filteredComponentNonces[componentGroupIndex] and filteredComponentHashes[componentGroupIndex] are not null.
filteredComponentNonces[componentGroupIndex]!!.add(wtx.availableComponentNonces[componentGroupIndex]!![internalIndex])
filteredComponentHashes[componentGroupIndex]!!.add(wtx.availableComponentHashes[componentGroupIndex]!![internalIndex])
}
// If at least one command is visible, then all command-signers should be visible as well.
// This is required for visibility purposes, see FilteredTransaction.checkAllCommandsVisible() for more details.
if (componentGroupIndex == ComponentGroupEnum.COMMANDS_GROUP.ordinal && !signersIncluded) {
signersIncluded = true
val signersGroupIndex = ComponentGroupEnum.SIGNERS_GROUP.ordinal
// There exist commands, thus the signers group is not empty.
val signersGroupComponents = wtx.componentGroups.first { it.groupIndex == signersGroupIndex }
filteredSerialisedComponents[signersGroupIndex] = signersGroupComponents.components.toMutableList()
filteredComponentNonces[signersGroupIndex] = wtx.availableComponentNonces[signersGroupIndex]!!.toMutableList()
filteredComponentHashes[signersGroupIndex] = wtx.availableComponentHashes[signersGroupIndex]!!.toMutableList()
}
if (!filtering.test(t)) return
val group = filteredSerialisedComponents[componentGroupIndex]
// Because the filter passed, we know there is a match. We also use first Vs single as the init function
// of WireTransaction ensures there are no duplicated groups.
val serialisedComponent = wtx.componentGroups.first { it.groupIndex == componentGroupIndex }.components[internalIndex]
if (group == null) {
// As all of the helper Map structures, like availableComponentNonces, availableComponentHashes
// and groupsMerkleRoots, are computed lazily via componentGroups.forEach, there should always be
// a match on Map.get ensuring it will never return null.
filteredSerialisedComponents[componentGroupIndex] = mutableListOf(serialisedComponent)
filteredComponentNonces[componentGroupIndex] = mutableListOf(wtx.availableComponentNonces[componentGroupIndex]!![internalIndex])
filteredComponentHashes[componentGroupIndex] = mutableListOf(wtx.availableComponentHashes[componentGroupIndex]!![internalIndex])
} else {
group.add(serialisedComponent)
// If the group[componentGroupIndex] existed, then we guarantee that
// filteredComponentNonces[componentGroupIndex] and filteredComponentHashes[componentGroupIndex] are not null.
filteredComponentNonces[componentGroupIndex]!!.add(wtx.availableComponentNonces[componentGroupIndex]!![internalIndex])
filteredComponentHashes[componentGroupIndex]!!.add(wtx.availableComponentHashes[componentGroupIndex]!![internalIndex])
}
// If at least one command is visible, then all command-signers should be visible as well.
// This is required for visibility purposes, see FilteredTransaction.checkAllCommandsVisible() for more details.
if (componentGroupIndex == COMMANDS_GROUP.ordinal && !signersIncluded) {
signersIncluded = true
val signersGroupIndex = SIGNERS_GROUP.ordinal
// There exist commands, thus the signers group is not empty.
val signersGroupComponents = wtx.componentGroups.first { it.groupIndex == signersGroupIndex }
filteredSerialisedComponents[signersGroupIndex] = signersGroupComponents.components.toMutableList()
filteredComponentNonces[signersGroupIndex] = wtx.availableComponentNonces[signersGroupIndex]!!.toMutableList()
filteredComponentHashes[signersGroupIndex] = wtx.availableComponentHashes[signersGroupIndex]!!.toMutableList()
}
}
fun updateFilteredComponents() {
wtx.inputs.forEachIndexed { internalIndex, it -> filter(it, ComponentGroupEnum.INPUTS_GROUP.ordinal, internalIndex) }
wtx.outputs.forEachIndexed { internalIndex, it -> filter(it, ComponentGroupEnum.OUTPUTS_GROUP.ordinal, internalIndex) }
wtx.commands.forEachIndexed { internalIndex, it -> filter(it, ComponentGroupEnum.COMMANDS_GROUP.ordinal, internalIndex) }
wtx.attachments.forEachIndexed { internalIndex, it -> filter(it, ComponentGroupEnum.ATTACHMENTS_GROUP.ordinal, internalIndex) }
if (wtx.notary != null) filter(wtx.notary, ComponentGroupEnum.NOTARY_GROUP.ordinal, 0)
if (wtx.timeWindow != null) filter(wtx.timeWindow, ComponentGroupEnum.TIMEWINDOW_GROUP.ordinal, 0)
wtx.references.forEachIndexed { internalIndex, it -> filter(it, ComponentGroupEnum.REFERENCES_GROUP.ordinal, internalIndex) }
wtx.inputs.forEachIndexed { internalIndex, it -> filter(it, INPUTS_GROUP.ordinal, internalIndex) }
wtx.outputs.forEachIndexed { internalIndex, it -> filter(it, OUTPUTS_GROUP.ordinal, internalIndex) }
wtx.commands.forEachIndexed { internalIndex, it -> filter(it, COMMANDS_GROUP.ordinal, internalIndex) }
wtx.attachments.forEachIndexed { internalIndex, it -> filter(it, ATTACHMENTS_GROUP.ordinal, internalIndex) }
if (wtx.notary != null) filter(wtx.notary, NOTARY_GROUP.ordinal, 0)
if (wtx.timeWindow != null) filter(wtx.timeWindow, TIMEWINDOW_GROUP.ordinal, 0)
wtx.references.forEachIndexed { internalIndex, it -> filter(it, REFERENCES_GROUP.ordinal, internalIndex) }
// It is highlighted that because there is no a signers property in TraversableTransaction,
// one cannot specifically filter them in or out.
// The above is very important to ensure someone won't filter out the signers component group if at least one
@ -195,10 +206,17 @@ class FilteredTransaction internal constructor(
// we decide to filter and attach this field to a FilteredTransaction.
// An example would be to redact certain contract state types, but otherwise leave a transaction alone,
// including the unknown new components.
wtx.componentGroups.filter { it.groupIndex >= ComponentGroupEnum.values().size }.forEach { componentGroup -> componentGroup.components.forEachIndexed { internalIndex, component -> filter(component, componentGroup.groupIndex, internalIndex) } }
wtx.componentGroups
.filter { it.groupIndex >= values().size }
.forEach { componentGroup -> componentGroup.components.forEachIndexed { internalIndex, component -> filter(component, componentGroup.groupIndex, internalIndex) } }
}
fun createPartialMerkleTree(componentGroupIndex: Int) = PartialMerkleTree.build(MerkleTree.getMerkleTree(wtx.availableComponentHashes[componentGroupIndex]!!), filteredComponentHashes[componentGroupIndex]!!)
fun createPartialMerkleTree(componentGroupIndex: Int): PartialMerkleTree {
return PartialMerkleTree.build(
MerkleTree.getMerkleTree(wtx.availableComponentHashes[componentGroupIndex]!!),
filteredComponentHashes[componentGroupIndex]!!
)
}
fun createFilteredComponentGroups(): List<FilteredComponentGroup> {
updateFilteredComponents()
@ -223,8 +241,11 @@ class FilteredTransaction internal constructor(
@Throws(FilteredTransactionVerificationException::class)
fun verify() {
verificationCheck(groupHashes.isNotEmpty()) { "At least one component group hash is required" }
// Verify the top level Merkle tree (group hashes are its leaves, including allOnesHash for empty list or null components in WireTransaction).
verificationCheck(MerkleTree.getMerkleTree(groupHashes).hash == id) { "Top level Merkle tree cannot be verified against transaction's id" }
// Verify the top level Merkle tree (group hashes are its leaves, including allOnesHash for empty list or null
// components in WireTransaction).
verificationCheck(MerkleTree.getMerkleTree(groupHashes).hash == id) {
"Top level Merkle tree cannot be verified against transaction's id"
}
// For completely blind verification (no components are included).
if (filteredComponentGroups.isEmpty()) return
@ -233,8 +254,12 @@ class FilteredTransaction internal constructor(
filteredComponentGroups.forEach { (groupIndex, components, nonces, groupPartialTree) ->
verificationCheck(groupIndex < groupHashes.size) { "There is no matching component group hash for group $groupIndex" }
val groupMerkleRoot = groupHashes[groupIndex]
verificationCheck(groupMerkleRoot == PartialMerkleTree.rootAndUsedHashes(groupPartialTree.root, mutableListOf())) { "Partial Merkle tree root and advertised full Merkle tree root for component group $groupIndex do not match" }
verificationCheck(groupPartialTree.verify(groupMerkleRoot, components.mapIndexed { index, component -> componentHash(nonces[index], component) })) { "Visible components in group $groupIndex cannot be verified against their partial Merkle tree" }
verificationCheck(groupMerkleRoot == PartialMerkleTree.rootAndUsedHashes(groupPartialTree.root, mutableListOf())) {
"Partial Merkle tree root and advertised full Merkle tree root for component group $groupIndex do not match"
}
verificationCheck(groupPartialTree.verify(groupMerkleRoot, components.mapIndexed { index, component -> componentHash(nonces[index], component) })) {
"Visible components in group $groupIndex cannot be verified against their partial Merkle tree"
}
}
}
@ -281,7 +306,9 @@ class FilteredTransaction internal constructor(
val groupFullRoot = MerkleTree.getMerkleTree(group.components.mapIndexed { index, component -> componentHash(group.nonces[index], component) }).hash
visibilityCheck(groupPartialRoot == groupFullRoot) { "Some components for group ${group.groupIndex} are not visible" }
// Verify the top level Merkle tree from groupHashes.
visibilityCheck(MerkleTree.getMerkleTree(groupHashes).hash == id) { "Transaction is malformed. Top level Merkle tree cannot be verified against transaction's id" }
visibilityCheck(MerkleTree.getMerkleTree(groupHashes).hash == id) {
"Transaction is malformed. Top level Merkle tree cannot be verified against transaction's id"
}
}
}
@ -296,15 +323,17 @@ class FilteredTransaction internal constructor(
*/
@Throws(ComponentVisibilityException::class)
fun checkCommandVisibility(publicKey: PublicKey) {
val commandSigners = componentGroups.firstOrNull { it.groupIndex == ComponentGroupEnum.SIGNERS_GROUP.ordinal }
val commandSigners = componentGroups.firstOrNull { it.groupIndex == SIGNERS_GROUP.ordinal }
val expectedNumOfCommands = expectedNumOfCommands(publicKey, commandSigners)
val receivedForThisKeyNumOfCommands = commands.filter { publicKey in it.signers }.size
visibilityCheck(expectedNumOfCommands == receivedForThisKeyNumOfCommands) { "$expectedNumOfCommands commands were expected, but received $receivedForThisKeyNumOfCommands" }
visibilityCheck(expectedNumOfCommands == receivedForThisKeyNumOfCommands) {
"$expectedNumOfCommands commands were expected, but received $receivedForThisKeyNumOfCommands"
}
}
// Function to return number of expected commands to sign.
private fun expectedNumOfCommands(publicKey: PublicKey, commandSigners: ComponentGroup?): Int {
checkAllComponentsVisible(ComponentGroupEnum.SIGNERS_GROUP)
checkAllComponentsVisible(SIGNERS_GROUP)
if (commandSigners == null) return 0
fun signersKeys (internalIndex: Int, opaqueBytes: OpaqueBytes): List<PublicKey> {
try {
@ -340,7 +369,10 @@ class FilteredTransaction internal constructor(
*/
@KeepForDJVM
@CordaSerializable
data class FilteredComponentGroup(override val groupIndex: Int, override val components: List<OpaqueBytes>, val nonces: List<SecureHash>, val partialMerkleTree: PartialMerkleTree) : ComponentGroup(groupIndex, components) {
data class FilteredComponentGroup(override val groupIndex: Int,
override val components: List<OpaqueBytes>,
val nonces: List<SecureHash>,
val partialMerkleTree: PartialMerkleTree) : ComponentGroup(groupIndex, components) {
init {
check(components.size == nonces.size) { "Size of transaction components and nonces do not match" }
}

View File

@ -8,7 +8,7 @@ import kotlin.test.assertFailsWith
class CertRoleTests {
@Test
fun `should deserialize valid value`() {
val expected = CertRole.INTERMEDIATE_CA
val expected = CertRole.DOORMAN_CA
val actual = CertRole.getInstance(ASN1Integer(1L))
assertEquals(expected, actual)
}

View File

@ -6,7 +6,11 @@ release, see :doc:`upgrade-notes`.
Unreleased
----------
* Removed experimental feature `CordformDefinition`
* The RPC client library now checks at startup whether the server is of the client libraries major version or higher.
Therefore to connect to a Corda 4 node you must use version 4 or lower of the library. This behaviour can be overridden
by specifying a lower number in the ``CordaRPCClientConfiguration`` class.
* Removed experimental feature ``CordformDefinition``
* Vault query fix: support query by parent classes of Contract State classes (see https://github.com/corda/corda/issues/3714)

View File

@ -18,8 +18,8 @@ object as normal, and the marshalling back and forth is handled for you.
.. warning:: The built-in Corda webserver is deprecated and unsuitable for production use. If you want to interact with
your node via HTTP, you will need to stand up your own webserver, then create an RPC connection between your node
and this webserver using the `CordaRPCClient`_ library. You can find an example of how to do this
`here <https://github.com/corda/spring-webserver>`_.
and this webserver using the `CordaRPCClient`_ library. You can find an example of how to do this using the popular
Spring Boot server `here <https://github.com/corda/spring-webserver>`_.
Connecting to a node via RPC
----------------------------
@ -291,31 +291,43 @@ would expect.
This feature comes with a cost: the server must queue up objects emitted by the server-side observable until you
download them. Note that the server side observation buffer is bounded, once it fills up the client is considered
slow and kicked. You are expected to subscribe to all the observables returned, otherwise client-side memory starts
filling up as observations come in. If you don't want an observable then subscribe then unsubscribe immediately to
clear the client-side buffers and to stop the server from streaming. If your app quits then server side resources
will be freed automatically.
slow and will be disconnected. You are expected to subscribe to all the observables returned, otherwise client-side
memory starts filling up as observations come in. If you don't want an observable then subscribe then unsubscribe
immediately to clear the client-side buffers and to stop the server from streaming. For Kotlin users there is a
convenience extension method called ``notUsed()`` which can be called on an observable to automate this step.
If your app quits then server side resources will be freed automatically.
.. warning:: If you leak an observable on the client side and it gets garbage collected, you will get a warning
printed to the logs and the observable will be unsubscribed for you. But don't rely on this, as garbage collection
is non-deterministic.
is non-deterministic. If you set ``-Dnet.corda.client.rpc.trackRpcCallSites=true`` on the JVM command line then
this warning comes with a stack trace showing where the RPC that returned the forgotten observable was called from.
This feature is off by default because tracking RPC call sites is moderately slow.
.. note:: Observables can only be used as return arguments of an RPC call. It is not currently possible to pass
Observables as parameters to the RPC methods.
Observables as parameters to the RPC methods. In other words the streaming is always server to client and not
the other way around.
Futures
-------
A method can also return a ``CordaFuture`` in its object graph and it will be treated in a similar manner to
observables. Calling the ``cancel`` method on the future will unsubscribe it from any future value and release any resources.
observables. Calling the ``cancel`` method on the future will unsubscribe it from any future value and release
any resources.
Versioning
----------
The client RPC protocol is versioned using the node's Platform Version (see :doc:`versioning`). When a proxy is created
The client RPC protocol is versioned using the node's platform version number (see :doc:`versioning`). When a proxy is created
the server is queried for its version, and you can specify your minimum requirement. Methods added in later versions
are tagged with the ``@RPCSinceVersion`` annotation. If you try to use a method that the server isn't advertising support
of, an ``UnsupportedOperationException`` is thrown. If you want to know the version of the server, just use the
``protocolVersion`` property (i.e. ``getProtocolVersion`` in Java).
The RPC client library defaults to requiring the platform version it was built with. That means if you use the client
library released as part of Corda N, then the node it connects to must be of version N or above. This is checked when
the client first connects. If you want to override this behaviour, you can alter the ``minimumServerProtocolVersion``
field in the ``CordaRPCClientConfiguration`` object passed to the client. Alternatively, just link your app against
an older version of the library.
Thread safety
-------------
A proxy is thread safe, blocking, and allows multiple RPCs to be in flight at once. Any observables that are returned and
@ -343,7 +355,6 @@ such situations:
.. sourcecode:: Kotlin
fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
val retryInterval = 5.seconds
do {
@ -387,7 +398,6 @@ on the ``Observable`` returned by ``CordaRPCOps``.
.. sourcecode:: Kotlin
fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
val proxy = connection.proxy
@ -419,10 +429,6 @@ Client code if fed with instances of ``StateMachineInfo`` using call ``clientCod
all the items. Some of these items might have already been delivered to client code prior to failover occurred.
It is down to client code in this case handle those duplicate items as appropriate.
Wire protocol
-------------
The client RPC wire protocol is defined and documented in ``net/corda/client/rpc/RPCApi.kt``.
Wire security
-------------
``CordaRPCClient`` has an optional constructor parameter of type ``ClientRpcSslOptions``, defaulted to ``null``, which allows
@ -435,7 +441,6 @@ In order for this to work, the client needs to provide a truststore containing a
For the communication to be secure, we recommend using the standard SSL best practices for key management.
Whitelisting classes with the Corda node
----------------------------------------
CorDapps must whitelist any classes used over RPC with Corda's serialization framework, unless they are whitelisted by

View File

@ -16,10 +16,11 @@ handling, and ensures the Corda service is run at boot.
* Oracle Java 8. The supported versions are listed in :doc:`getting-set-up`
1. Add a system user which will be used to run Corda:
1. As root/sys admin user - add a system user which will be used to run Corda:
``sudo adduser --system --no-create-home --group corda``
2. Create a directory called ``/opt/corda`` and change its ownership to the user you want to use to run Corda:
``mkdir /opt/corda; chown corda:corda /opt/corda``

View File

@ -259,7 +259,7 @@ The protocol is:
* If $URL = ``https://some.server.com/some/path``
* Node submits a PKCS#10 certificate signing request using HTTP POST to ``$URL/certificate``. It will have a MIME
type of ``application/octet-stream``. The ``Client-Version`` header is set to be "1.0".
type of ``application/octet-stream``. The ``Platform-Version`` header is set to be "1.0" and the ``Client-Version`` header to reflect the node software version.
* The server returns an opaque string that references this request (let's call it ``$requestid``, or an HTTP error if something went wrong.
* The returned request ID should be persisted to disk, to handle zones where approval may take a long time due to manual
intervention being required.

View File

@ -56,6 +56,7 @@ class CordaRpcWorkerOps(
const val RPC_WORKER_QUEUE_ADDRESS_PREFIX = "${ArtemisMessagingComponent.INTERNAL_PREFIX}rpc.worker."
}
override val protocolVersion: Int = 1000
private val flowWorkerQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}${services.myInfo.legalIdentities[0].owningKey.toStringShort()}"
private val rpcWorkerQueueAddress = "$RPC_WORKER_QUEUE_ADDRESS_PREFIX${services.myInfo.legalIdentities[0].owningKey.toStringShort()}"

View File

@ -88,7 +88,7 @@ class RpcWorkerServiceHub(override val configuration: NodeConfiguration, overrid
override val networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, identityService, database)
@Suppress("LeakingThis")
override val validatedTransactions: WritableTransactionStorage = DBTransactionStorage(configuration.transactionCacheSizeBytes, database)
private val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL) }
private val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, versionInfo) }
private val metricRegistry = MetricRegistry()
override val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound)

View File

@ -402,7 +402,7 @@ enum class CertificateType(val keyUsage: KeyUsage, vararg val purposes: KeyPurpo
KeyPurposeId.id_kp_clientAuth,
KeyPurposeId.anyExtendedKeyUsage,
isCA = true,
role = CertRole.INTERMEDIATE_CA
role = CertRole.DOORMAN_CA
),
NETWORK_MAP(

View File

@ -3,12 +3,11 @@ package net.corda.nodeapi.internal.network
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.CertRole
import net.corda.core.internal.DigitalSignatureWithCert
import net.corda.core.internal.DigitalSignatureWithCertPath
import net.corda.core.internal.SignedDataWithCert
import net.corda.core.internal.signWithCert
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializedBytes
import net.corda.nodeapi.internal.crypto.X509Utilities
import java.security.cert.X509Certificate
import java.time.Instant
@ -57,20 +56,13 @@ data class ParametersUpdate(
val updateDeadline: Instant
)
/** Verify that a Network Map certificate is issued by Root CA and its [CertRole] is correct. */
// TODO: Current implementation works under the assumption that there are no intermediate CAs between Root and
// Network Map. Consider a more flexible implementation without the above assumption.
/** Verify that a Network Map certificate path and its [CertRole] is correct. */
fun <T : Any> SignedDataWithCert<T>.verifiedNetworkMapCert(rootCert: X509Certificate): T {
require(CertRole.extract(sig.by) == CertRole.NETWORK_MAP) { "Incorrect cert role: ${CertRole.extract(sig.by)}" }
X509Utilities.validateCertificateChain(rootCert, sig.by, rootCert)
val path = when (this.sig) {
is DigitalSignatureWithCertPath -> (sig as DigitalSignatureWithCertPath).path
else -> listOf(sig.by, rootCert)
}
X509Utilities.validateCertificateChain(rootCert, path)
return verified()
}
class NetworkMapAndSigned private constructor(val networkMap: NetworkMap, val signed: SignedNetworkMap) {
constructor(networkMap: NetworkMap, signer: (SerializedBytes<NetworkMap>) -> DigitalSignatureWithCert) : this(networkMap, networkMap.signWithCert(signer))
constructor(signed: SignedNetworkMap) : this(signed.verified(), signed)
operator fun component1(): NetworkMap = networkMap
operator fun component2(): SignedNetworkMap = signed
}

View File

@ -36,7 +36,6 @@ import kotlin.test.assertFailsWith
*/
@RunWith(Parameterized::class)
class AuthDBTests : NodeBasedTest() {
private lateinit var node: NodeWithInfo
private lateinit var client: CordaRPCClient
private lateinit var db: UsersDB

View File

@ -140,7 +140,7 @@ class ArtemisRpcTests {
class TestRpcOpsImpl : TestRpcOps {
override fun greet(name: String): String = "Oh, hello $name!"
override val protocolVersion: Int = 1
override val protocolVersion: Int = 1000
}
private fun tempFile(name: String): Path = tempFolder.root.toPath() / name

View File

@ -150,7 +150,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
val checkpointStorage = DBCheckpointStorage()
@Suppress("LeakingThis")
val transactionStorage = makeTransactionStorage(configuration.transactionCacheSizeBytes).tokenize()
val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL) }
val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, versionInfo) }
val metricRegistry = MetricRegistry()
val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound).tokenize()
val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments).tokenize()

View File

@ -50,6 +50,12 @@ internal class CordaRPCOpsImpl(
private val flowStarter: FlowStarter,
private val shutdownNode: () -> Unit
) : CordaRPCOps {
/**
* Returns the RPC protocol version, which is the same the node's platform Version. Exists since version 1 so guaranteed
* to be present.
*/
override val protocolVersion: Int get() = nodeInfo().platformVersion
override fun networkMapSnapshot(): List<NodeInfo> {
val (snapshot, updates) = networkMapFeed()
updates.notUsed()

View File

@ -151,8 +151,8 @@ open class NodeStartup(val args: Array<String>) {
val cause = this.cause
return when {
cause != null && !visited.contains(cause) -> Objects.hash(this::class.java.name, stackTrace, cause.staticLocationBasedHash(visited + cause))
else -> Objects.hash(this::class.java.name, stackTrace)
cause != null && !visited.contains(cause) -> Objects.hash(this::class.java.name, stackTrace.customHashCode(), cause.staticLocationBasedHash(visited + cause))
else -> Objects.hash(this::class.java.name, stackTrace.customHashCode())
}
}
@ -181,6 +181,19 @@ open class NodeStartup(val args: Array<String>) {
}
}
private fun Array<StackTraceElement?>?.customHashCode(): Int {
if (this == null) {
return 0
}
return Arrays.hashCode(map { it?.customHashCode() ?: 0 }.toIntArray())
}
private fun StackTraceElement.customHashCode(): Int {
return Objects.hash(StackTraceElement::class.java.name, methodName, lineNumber)
}
private fun configFileNotFoundMessage(configFile: Path): String {
return """
Unable to load the node config file from '$configFile'.
@ -610,3 +623,5 @@ open class NodeStartup(val args: Array<String>) {
}
}
}

View File

@ -16,6 +16,8 @@ internal class AuthenticatedRpcOpsProxy(private val delegate: CordaRPCOps) : Cor
/**
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
* to be present.
*
* TODO: Why is this logic duplicated vs the actual implementation?
*/
override val protocolVersion: Int get() = delegate.nodeInfo().platformVersion
@ -31,7 +33,6 @@ internal class AuthenticatedRpcOpsProxy(private val delegate: CordaRPCOps) : Cor
private companion object {
private fun proxy(delegate: CordaRPCOps, context: () -> RpcAuthContext): CordaRPCOps {
val handler = PermissionsEnforcingInvocationHandler(delegate, context)
return Proxy.newProxyInstance(delegate::class.java.classLoader, arrayOf(CordaRPCOps::class.java), handler) as CordaRPCOps
}

View File

@ -2,6 +2,7 @@ package net.corda.node.serialization.amqp
import net.corda.core.context.Trace
import net.corda.core.serialization.SerializationContext
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.loggerFor
import net.corda.node.services.messaging.ObservableContextInterface
import net.corda.node.services.messaging.ObservableSubscription
@ -30,8 +31,9 @@ class RpcServerObservableSerializer : CustomSerializer.Implements<Observable<*>>
fun createContext(
serializationContext: SerializationContext,
observableContext: ObservableContextInterface
) = serializationContext.withProperty(
RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
) = serializationContext.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
val log = contextLogger()
}
override val schemaForDocumentation = Schema(
@ -136,5 +138,6 @@ class RpcServerObservableSerializer : CustomSerializer.Implements<Observable<*>>
}
}
observableContext.observableMap.put(observableId, observableWithSubscription)
log.trace("Serialized observable $observableId of type $obj")
}
}

View File

@ -11,6 +11,7 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
import net.corda.core.utilities.trace
import net.corda.node.VersionInfo
import net.corda.node.utilities.registration.cacheControl
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NetworkMap
@ -23,7 +24,7 @@ import java.security.cert.X509Certificate
import java.time.Duration
import java.util.*
class NetworkMapClient(compatibilityZoneURL: URL) {
class NetworkMapClient(compatibilityZoneURL: URL, private val versionInfo: VersionInfo) {
companion object {
private val logger = contextLogger()
}
@ -38,14 +39,18 @@ class NetworkMapClient(compatibilityZoneURL: URL) {
fun publish(signedNodeInfo: SignedNodeInfo) {
val publishURL = URL("$networkMapUrl/publish")
logger.trace { "Publishing NodeInfo to $publishURL." }
publishURL.post(signedNodeInfo.serialize())
publishURL.post(signedNodeInfo.serialize(),
"Platform-Version" to "${versionInfo.platformVersion}",
"Client-Version" to versionInfo.releaseVersion)
logger.trace { "Published NodeInfo to $publishURL successfully." }
}
fun ackNetworkParametersUpdate(signedParametersHash: SignedData<SecureHash>) {
val ackURL = URL("$networkMapUrl/ack-parameters")
logger.trace { "Sending network parameters with hash ${signedParametersHash.raw.deserialize()} approval to $ackURL." }
ackURL.post(signedParametersHash.serialize())
ackURL.post(signedParametersHash.serialize(),
"Platform-Version" to "${versionInfo.platformVersion}",
"Client-Version" to versionInfo.releaseVersion)
logger.trace { "Sent network parameters approval to $ackURL successfully." }
}

View File

@ -52,7 +52,9 @@ class HTTPNetworkRegistrationService(compatibilityZoneURL: URL, val versionInfo:
}
override fun submitRequest(request: PKCS10CertificationRequest): String {
return String(registrationURL.post(OpaqueBytes(request.encoded), "Client-Version" to "${versionInfo.platformVersion}"))
return String(registrationURL.post(OpaqueBytes(request.encoded),
"Platform-Version" to "${versionInfo.platformVersion}",
"Client-Version" to versionInfo.releaseVersion))
}
}

View File

@ -5,6 +5,7 @@ import net.corda.core.crypto.sha256
import net.corda.core.internal.sign
import net.corda.core.serialization.serialize
import net.corda.core.utilities.seconds
import net.corda.node.VersionInfo
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
@ -40,7 +41,8 @@ class NetworkMapClientTest {
fun setUp() {
server = NetworkMapServer(cacheTimeout)
val address = server.start()
networkMapClient = NetworkMapClient(URL("http://$address")).apply { start(DEV_ROOT_CA.certificate) }
networkMapClient = NetworkMapClient(URL("http://$address"),
VersionInfo(1, "TEST", "TEST", "TEST")).apply { start(DEV_ROOT_CA.certificate) }
}
@After

View File

@ -13,6 +13,7 @@ import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.millis
import net.corda.node.VersionInfo
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
import net.corda.nodeapi.internal.NodeInfoAndSigned
@ -63,7 +64,8 @@ class NetworkMapUpdaterTest {
fun setUp() {
server = NetworkMapServer(cacheExpiryMs.millis)
val address = server.start()
networkMapClient = NetworkMapClient(URL("http://$address")).apply { start(DEV_ROOT_CA.certificate) }
networkMapClient = NetworkMapClient(URL("http://$address"),
VersionInfo(1, "TEST", "TEST", "TEST")).apply { start(DEV_ROOT_CA.certificate) }
}
@After

View File

@ -9,6 +9,7 @@ import net.corda.core.internal.readObject
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.days
import net.corda.core.utilities.seconds
import net.corda.node.VersionInfo
import net.corda.node.internal.NetworkParametersReader
import net.corda.nodeapi.internal.network.*
import net.corda.testing.common.internal.testNetworkParameters
@ -41,7 +42,7 @@ class NetworkParametersReaderTest {
fun setUp() {
server = NetworkMapServer(cacheTimeout)
val address = server.start()
networkMapClient = NetworkMapClient(URL("http://$address"))
networkMapClient = NetworkMapClient(URL("http://$address"), VersionInfo(1, "TEST", "TEST", "TEST"))
networkMapClient.start(DEV_ROOT_CA.certificate)
}

View File

@ -34,7 +34,7 @@ import net.corda.testing.contracts.DummyState
import net.corda.testing.core.*
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.vault.VaultFiller
import net.corda.testing.internal.vault.*
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestIdentityService
import org.assertj.core.api.Assertions.assertThat
@ -48,13 +48,15 @@ import java.math.BigDecimal
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import javax.persistence.*
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class NodeVaultServiceTest {
private companion object {
val cordappPackages = listOf("net.corda.finance.contracts.asset", CashSchemaV1::class.packageName, "net.corda.testing.contracts")
val cordappPackages = listOf("net.corda.finance.contracts.asset", CashSchemaV1::class.packageName, "net.corda.testing.contracts",
"net.corda.testing.internal.vault")
val dummyCashIssuer = TestIdentity(CordaX500Name("Snake Oil Issuer", "London", "GB"), 10)
val DUMMY_CASH_ISSUER = dummyCashIssuer.ref(1)
val bankOfCorda = TestIdentity(BOC_NAME)
@ -769,4 +771,65 @@ class NodeVaultServiceTest {
// We should never see 2 or 7.
}
@Test
fun `Unique column constraint failing causes linear state to not persist to vault`() {
fun createTx(): SignedTransaction {
return services.signInitialTransaction(TransactionBuilder(DUMMY_NOTARY).apply {
addOutputState(UniqueDummyLinearContract.State(listOf(megaCorp.party), "Dummy linear id"), UNIQUE_DUMMY_LINEAR_CONTRACT_PROGRAM_ID)
addCommand(DummyCommandData, listOf(megaCorp.publicKey))
})
}
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx()))
assertThatExceptionOfType(PersistenceException::class.java).isThrownBy {
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx()))
}
assertEquals(1, database.transaction {
vaultService.queryBy<UniqueDummyLinearContract.State>().states.size
})
}
@Test
fun `Unique column constraint failing causes fungible state to not persist to vault`() {
fun createTx(): SignedTransaction {
return services.signInitialTransaction(TransactionBuilder(DUMMY_NOTARY).apply {
addOutputState(UniqueDummyFungibleContract.State(10.DOLLARS `issued by` DUMMY_CASH_ISSUER, megaCorp.party), UNIQUE_DUMMY_FUNGIBLE_CONTRACT_PROGRAM_ID)
addCommand(DummyCommandData, listOf(megaCorp.publicKey))
})
}
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx()))
assertThatExceptionOfType(PersistenceException::class.java).isThrownBy {
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx()))
}
assertEquals(1, database.transaction {
vaultService.queryBy<UniqueDummyFungibleContract.State>().states.size
})
assertEquals(10.DOLLARS.quantity, database.transaction {
vaultService.queryBy<UniqueDummyFungibleContract.State>().states.first().state.data.amount.quantity
})
}
@Test
fun `Unique column constraint failing causes all states in transaction to fail`() {
fun createTx(): SignedTransaction {
return services.signInitialTransaction(TransactionBuilder(DUMMY_NOTARY).apply {
addOutputState(UniqueDummyLinearContract.State(listOf(megaCorp.party), "Dummy linear id"), UNIQUE_DUMMY_LINEAR_CONTRACT_PROGRAM_ID)
addOutputState(DummyDealContract.State(listOf(megaCorp.party), "Dummy linear id"), DUMMY_DEAL_PROGRAM_ID)
addCommand(DummyCommandData, listOf(megaCorp.publicKey))
})
}
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx()))
assertThatExceptionOfType(PersistenceException::class.java).isThrownBy {
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx()))
}
assertEquals(1, database.transaction {
vaultService.queryBy<UniqueDummyLinearContract.State>().states.size
})
assertEquals(1, database.transaction {
vaultService.queryBy<DummyDealContract.State>().states.size
})
}
}

View File

@ -0,0 +1,88 @@
package net.corda.node.services.vault
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.services.queryBy
import net.corda.core.transactions.TransactionBuilder
import net.corda.testing.core.DummyCommandData
import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.vault.DUMMY_DEAL_PROGRAM_ID
import net.corda.testing.internal.vault.DummyDealContract
import net.corda.testing.internal.vault.UNIQUE_DUMMY_LINEAR_CONTRACT_PROGRAM_ID
import net.corda.testing.internal.vault.UniqueDummyLinearContract
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetworkNotarySpec
import net.corda.testing.node.MockNodeParameters
import net.corda.testing.node.StartedMockNode
import org.assertj.core.api.Assertions
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.ExecutionException
import kotlin.test.assertEquals
class VaultFlowTest {
private lateinit var mockNetwork: MockNetwork
private lateinit var partyA: StartedMockNode
private lateinit var partyB: StartedMockNode
private lateinit var notaryNode: MockNetworkNotarySpec
@Before
fun setup() {
notaryNode = MockNetworkNotarySpec(CordaX500Name("Notary", "London", "GB"))
mockNetwork = MockNetwork(
listOf(
"net.corda.node.services.vault", "net.corda.testing.internal.vault"
),
notarySpecs = listOf(notaryNode),
threadPerNode = true,
networkSendManuallyPumped = false
)
partyA = mockNetwork.createNode(MockNodeParameters(legalName = CordaX500Name("PartyA", "Berlin", "DE")))
partyB = mockNetwork.createNode(MockNodeParameters(legalName = CordaX500Name("PartyB", "Berlin", "DE")))
mockNetwork.startNodes()
}
@After
fun tearDown() {
mockNetwork.stopNodes()
}
@Test
fun `Unique column constraint failing causes states to not persist to vaults`() {
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
Assertions.assertThatExceptionOfType(ExecutionException::class.java).isThrownBy {
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
}
assertEquals(1, partyA.transaction {
partyA.services.vaultService.queryBy<UniqueDummyLinearContract.State>().states.size
})
assertEquals(1, partyB.transaction {
partyB.services.vaultService.queryBy<UniqueDummyLinearContract.State>().states.size
})
assertEquals(1, partyA.transaction {
partyA.services.vaultService.queryBy<DummyDealContract.State>().states.size
})
assertEquals(1, partyB.transaction {
partyB.services.vaultService.queryBy<DummyDealContract.State>().states.size
})
}
}
@InitiatingFlow
class Initiator(private val participants: List<Party>) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val stx = serviceHub.signInitialTransaction(TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addOutputState(UniqueDummyLinearContract.State(participants, "Dummy linear id"), UNIQUE_DUMMY_LINEAR_CONTRACT_PROGRAM_ID)
addOutputState(DummyDealContract.State(participants, "linear id"), DUMMY_DEAL_PROGRAM_ID)
addCommand(DummyCommandData, listOf(ourIdentity.owningKey))
})
subFlow(FinalityFlow(stx))
}
}

View File

@ -72,7 +72,7 @@ import java.time.Clock
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
val MOCK_VERSION_INFO = VersionInfo(1, "Mock release", "Mock revision", "Mock Vendor")
val MOCK_VERSION_INFO = VersionInfo(4, "Mock release", "Mock revision", "Mock Vendor")
data class MockNodeArgs(
val config: NodeConfiguration,
@ -209,15 +209,6 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
return defaultNotaryNode.info.legalIdentities.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities")
}
/**
* Return the identity of the default notary node.
* @see defaultNotaryNode
*/
val defaultNotaryIdentityAndCert: PartyAndCertificate
get() {
return defaultNotaryNode.info.legalIdentitiesAndCerts.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities")
}
/**
* Because this executor is shared, we need to be careful about nodes shutting it down.
*/

View File

@ -17,8 +17,8 @@ import net.corda.nodeapi.internal.config.toConfig
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.User
import org.apache.logging.log4j.Level
@ -86,7 +86,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
@JvmOverloads
fun initNode(legalName: CordaX500Name,
platformVersion: Int = 1,
platformVersion: Int = 4,
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): InProcessNode {
val baseDirectory = baseDirectory(legalName).createDirectories()

View File

@ -29,11 +29,15 @@ import java.security.PublicKey
import java.time.Instant
import javax.ws.rs.core.MediaType.APPLICATION_OCTET_STREAM
// TODO: Make a shared implementation of CordaRPCOps where every method is unimplemented?
class CordaRPCProxyClient(private val targetHostAndPort: NetworkHostAndPort) : CordaRPCOps {
companion object {
val log = contextLogger()
}
override val protocolVersion: Int = 1000
init {
try {
AMQPClientSerializationScheme.initialiseSerialization()

View File

@ -0,0 +1,45 @@
package net.corda.testing.internal.vault
import net.corda.core.contracts.*
import net.corda.core.identity.AbstractParty
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import net.corda.core.transactions.LedgerTransaction
import net.corda.testing.core.DummyCommandData
import java.util.*
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Table
const val UNIQUE_DUMMY_FUNGIBLE_CONTRACT_PROGRAM_ID = "net.corda.testing.internal.vault.UniqueDummyFungibleContract"
class UniqueDummyFungibleContract : Contract {
override fun verify(tx: LedgerTransaction) {}
data class State(override val amount: Amount<Issued<Currency>>,
override val owner: AbstractParty) : FungibleAsset<Currency>, QueryableState {
override val exitKeys = setOf(owner.owningKey, amount.token.issuer.party.owningKey)
override val participants = listOf(owner)
override fun withNewOwnerAndAmount(newAmount: Amount<Issued<Currency>>, newOwner: AbstractParty): FungibleAsset<Currency> = copy(amount = amount.copy(newAmount.quantity), owner = newOwner)
override fun withNewOwner(newOwner: AbstractParty) = CommandAndState(DummyCommandData, copy(owner = newOwner))
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(UniqueDummyFungibleStateSchema)
override fun generateMappedObject(schema: MappedSchema): PersistentState {
return UniqueDummyFungibleStateSchema.UniquePersistentDummyFungibleState(currency = amount.token.product.currencyCode)
}
}
}
object UniqueDummyFungibleStateSchema : MappedSchema(schemaFamily = UniqueDummyFungibleStateSchema::class.java, version = 1, mappedTypes = listOf(UniquePersistentDummyFungibleState::class.java)) {
@Entity
@Table(name = "unique_dummy_fungible_state")
class UniquePersistentDummyFungibleState(
@Column(unique = true)
val currency: String
) : PersistentState()
}

View File

@ -0,0 +1,41 @@
package net.corda.testing.internal.vault
import net.corda.core.contracts.Contract
import net.corda.core.contracts.LinearState
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.identity.AbstractParty
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import net.corda.core.transactions.LedgerTransaction
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Table
const val UNIQUE_DUMMY_LINEAR_CONTRACT_PROGRAM_ID = "net.corda.testing.internal.vault.UniqueDummyLinearContract"
class UniqueDummyLinearContract : Contract {
override fun verify(tx: LedgerTransaction) {}
data class State(
override val participants: List<AbstractParty>,
override val linearId: UniqueIdentifier) : LinearState, QueryableState {
constructor(participants: List<AbstractParty> = listOf(),
ref: String) : this(participants, UniqueIdentifier(ref))
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(UniqueDummyLinearStateSchema)
override fun generateMappedObject(schema: MappedSchema): PersistentState {
return UniqueDummyLinearStateSchema.UniquePersistentLinearDummyState(id = linearId.externalId!!)
}
}
}
object UniqueDummyLinearStateSchema : MappedSchema(schemaFamily = UniqueDummyLinearStateSchema::class.java, version = 1, mappedTypes = listOf(UniquePersistentLinearDummyState::class.java)) {
@Entity
@Table(name = "unique_dummy_linear_state")
class UniquePersistentLinearDummyState(
@Column(unique = true)
val id: String
) : PersistentState()
}

View File

@ -146,7 +146,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
notaries = listOf(NotaryInfo(identity, config.nodeConfig.notary!!.validating)),
modifiedTime = Instant.now(),
maxMessageSize = 10485760,
maxTransactionSize = Int.MAX_VALUE,
maxTransactionSize = 10485760,
epoch = 1,
whitelistedContractImplementations = emptyMap()
))

View File

@ -1,30 +1,41 @@
package net.corda.demobench.ui
import javafx.scene.control.ContentDisplay
import javafx.scene.control.Label
import javafx.scene.control.Tooltip
import javafx.scene.layout.HBox
class PropertyLabel : HBox() {
private val nameLabel = Label()
private val myTooltip = Tooltip()
val nameLabel = Label()
val valueLabel = Label()
private var nameText = ""
private var valueText = ""
var name: String
get() = nameLabel.text
get() = nameText
set(value) {
nameLabel.text = value
nameText = value
updateText()
}
var value: String
get() = valueLabel.text
get() = valueText
set(value) {
valueLabel.text = value
valueText = value
updateText()
}
private fun updateText() {
nameLabel.text = "$nameText $valueText"
myTooltip.text = "$nameText $valueText"
}
init {
nameLabel.styleClass.add("property-name")
valueLabel.styleClass.add("property-value")
children.addAll(nameLabel, valueLabel)
myTooltip.contentDisplay = ContentDisplay.CENTER
Tooltip.install(nameLabel, myTooltip)
children.addAll(nameLabel)
styleClass.add("property-label")
}
}

View File

@ -5,10 +5,10 @@
<?import net.corda.demobench.ui.*?>
<VBox visible="false" prefHeight="953.0" prefWidth="1363.0" xmlns="http://javafx.com/javafx/8.0.111" xmlns:fx="http://javafx.com/fxml/1" styleClass="terminal-vbox">
<HBox fx:id="header" disable="true" prefHeight="95.0" prefWidth="800.0" styleClass="header">
<VBox prefHeight="66.0" HBox.hgrow="ALWAYS">
<VBox prefHeight="66.0" HBox.hgrow="SOMETIMES">
<Label fx:id="nodeName" style="-fx-font-size: 40; -fx-text-fill: red;"/>
</VBox>
<VBox prefHeight="93.0" prefWidth="267.0">
<VBox prefHeight="93.0" prefWidth="267.0" HBox.hgrow="SOMETIMES">
<PropertyLabel fx:id="states" name="States in vault: "/>
<PropertyLabel fx:id="transactions" name="Known transactions: "/>
<PropertyLabel fx:id="balance" name="Balance: "/>

View File

@ -31,11 +31,11 @@ dependencies {
compile project(':node-api')
compile project(':node')
compile group: "com.typesafe", name: "config", version: typesafe_config_version
compile group: "com.fasterxml.jackson.dataformat", name: "jackson-dataformat-yaml", version: "2.9.0"
compile group: "com.fasterxml.jackson.core", name: "jackson-databind", version: "2.9.0"
compile "com.fasterxml.jackson.module:jackson-module-kotlin:2.9.+"
compile group: 'info.picocli', name: 'picocli', version: '3.0.1'
compile "com.typesafe:config:$typesafe_config_version"
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jackson_version"
compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
compile "com.fasterxml.jackson.module:jackson-module-kotlin:$jackson_version"
compile 'info.picocli:picocli:3.0.1'
// TornadoFX: A lightweight Kotlin framework for working with JavaFX UI's.
compile "no.tornado:tornadofx:$tornadofx_version"

View File

@ -46,7 +46,7 @@ interface Volume {
minimumPlatformVersion = 1,
notaries = it,
maxMessageSize = 10485760,
maxTransactionSize = Int.MAX_VALUE,
maxTransactionSize = 10485760,
modifiedTime = Instant.now(),
epoch = 10,
whitelistedContractImplementations = emptyMap())