mirror of
https://github.com/corda/corda.git
synced 2025-06-18 15:18:16 +00:00
RPC: make the client library require the platform version it is built for.
Remove an unnecessary override on the CordaRPCOps interface.
This commit is contained in:
@ -2529,7 +2529,6 @@ public interface net.corda.core.messaging.CordaRPCOps extends net.corda.core.mes
|
|||||||
public abstract void clearNetworkMapCache()
|
public abstract void clearNetworkMapCache()
|
||||||
@NotNull
|
@NotNull
|
||||||
public abstract java.time.Instant currentNodeTime()
|
public abstract java.time.Instant currentNodeTime()
|
||||||
public abstract int getProtocolVersion()
|
|
||||||
@NotNull
|
@NotNull
|
||||||
public abstract Iterable<String> getVaultTransactionNotes(net.corda.core.crypto.SecureHash)
|
public abstract Iterable<String> getVaultTransactionNotes(net.corda.core.crypto.SecureHash)
|
||||||
@RPCReturnsObservables
|
@RPCReturnsObservables
|
||||||
|
@ -55,7 +55,7 @@ public class CordaRPCJavaClientTest extends NodeBasedTest {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
node = startNode(ALICE_NAME, 1, singletonList(rpcUser));
|
node = startNode(ALICE_NAME, 1000, singletonList(rpcUser));
|
||||||
client = new CordaRPCClient(requireNonNull(node.getNode().getConfiguration().getRpcOptions().getAddress()));
|
client = new CordaRPCClient(requireNonNull(node.getNode().getConfiguration().getRpcOptions().getAddress()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,7 +47,7 @@ class RPCStabilityTests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
object DummyOps : RPCOps {
|
object DummyOps : RPCOps {
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun waitUntilNumberOfThreadsStable(executorService: ScheduledExecutorService): Map<Thread, List<StackTraceElement>> {
|
private fun waitUntilNumberOfThreadsStable(executorService: ScheduledExecutorService): Map<Thread, List<StackTraceElement>> {
|
||||||
@ -107,7 +107,7 @@ class RPCStabilityTests {
|
|||||||
Try.on {
|
Try.on {
|
||||||
startRpcClient<RPCOps>(
|
startRpcClient<RPCOps>(
|
||||||
server.get().broker.hostAndPort!!,
|
server.get().broker.hostAndPort!!,
|
||||||
configuration = CordaRPCClientConfiguration.DEFAULT.copy(minimumServerProtocolVersion = 1)
|
configuration = CordaRPCClientConfiguration.DEFAULT.copy(minimumServerProtocolVersion = 1000)
|
||||||
).get()
|
).get()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -203,7 +203,7 @@ class RPCStabilityTests {
|
|||||||
rpcDriver {
|
rpcDriver {
|
||||||
val leakObservableOpsImpl = object : LeakObservableOps {
|
val leakObservableOpsImpl = object : LeakObservableOps {
|
||||||
val leakedUnsubscribedCount = AtomicInteger(0)
|
val leakedUnsubscribedCount = AtomicInteger(0)
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
override fun leakObservable(): Observable<Nothing> {
|
override fun leakObservable(): Observable<Nothing> {
|
||||||
return PublishSubject.create<Nothing>().doOnUnsubscribe {
|
return PublishSubject.create<Nothing>().doOnUnsubscribe {
|
||||||
leakedUnsubscribedCount.incrementAndGet()
|
leakedUnsubscribedCount.incrementAndGet()
|
||||||
@ -234,7 +234,7 @@ class RPCStabilityTests {
|
|||||||
fun `client reconnects to rebooted server`() {
|
fun `client reconnects to rebooted server`() {
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val ops = object : ReconnectOps {
|
val ops = object : ReconnectOps {
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
override fun ping() = "pong"
|
override fun ping() = "pong"
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -259,7 +259,7 @@ class RPCStabilityTests {
|
|||||||
fun `connection failover fails, rpc calls throw`() {
|
fun `connection failover fails, rpc calls throw`() {
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val ops = object : ReconnectOps {
|
val ops = object : ReconnectOps {
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
override fun ping() = "pong"
|
override fun ping() = "pong"
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -290,7 +290,7 @@ class RPCStabilityTests {
|
|||||||
fun `observables error when connection breaks`() {
|
fun `observables error when connection breaks`() {
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val ops = object : NoOps {
|
val ops = object : NoOps {
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
override fun subscribe(): Observable<Nothing> {
|
override fun subscribe(): Observable<Nothing> {
|
||||||
return PublishSubject.create<Nothing>()
|
return PublishSubject.create<Nothing>()
|
||||||
}
|
}
|
||||||
@ -350,7 +350,7 @@ class RPCStabilityTests {
|
|||||||
fun `client connects to first available server`() {
|
fun `client connects to first available server`() {
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val ops = object : ServerOps {
|
val ops = object : ServerOps {
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
override fun serverId() = "server"
|
override fun serverId() = "server"
|
||||||
}
|
}
|
||||||
val serverFollower = shutdownManager.follower()
|
val serverFollower = shutdownManager.follower()
|
||||||
@ -371,15 +371,15 @@ class RPCStabilityTests {
|
|||||||
fun `3 server failover`() {
|
fun `3 server failover`() {
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val ops1 = object : ServerOps {
|
val ops1 = object : ServerOps {
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
override fun serverId() = "server1"
|
override fun serverId() = "server1"
|
||||||
}
|
}
|
||||||
val ops2 = object : ServerOps {
|
val ops2 = object : ServerOps {
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
override fun serverId() = "server2"
|
override fun serverId() = "server2"
|
||||||
}
|
}
|
||||||
val ops3 = object : ServerOps {
|
val ops3 = object : ServerOps {
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
override fun serverId() = "server3"
|
override fun serverId() = "server3"
|
||||||
}
|
}
|
||||||
val serverFollower1 = shutdownManager.follower()
|
val serverFollower1 = shutdownManager.follower()
|
||||||
@ -443,7 +443,7 @@ class RPCStabilityTests {
|
|||||||
fun `server cleans up queues after disconnected clients`() {
|
fun `server cleans up queues after disconnected clients`() {
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val trackSubscriberOpsImpl = object : TrackSubscriberOps {
|
val trackSubscriberOpsImpl = object : TrackSubscriberOps {
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
val subscriberCount = AtomicInteger(0)
|
val subscriberCount = AtomicInteger(0)
|
||||||
val trackSubscriberCountObservable = UnicastSubject.create<Unit>().share().
|
val trackSubscriberCountObservable = UnicastSubject.create<Unit>().share().
|
||||||
doOnSubscribe { subscriberCount.incrementAndGet() }.
|
doOnSubscribe { subscriberCount.incrementAndGet() }.
|
||||||
@ -486,7 +486,7 @@ class RPCStabilityTests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
class SlowConsumerRPCOpsImpl : SlowConsumerRPCOps {
|
class SlowConsumerRPCOpsImpl : SlowConsumerRPCOps {
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
|
|
||||||
override fun streamAtInterval(interval: Duration, size: Int): Observable<ByteArray> {
|
override fun streamAtInterval(interval: Duration, size: Int): Observable<ByteArray> {
|
||||||
val chunk = ByteArray(size)
|
val chunk = ByteArray(size)
|
||||||
@ -587,7 +587,7 @@ class RPCStabilityTests {
|
|||||||
val request = RPCApi.ClientToServer.fromClientMessage(it)
|
val request = RPCApi.ClientToServer.fromClientMessage(it)
|
||||||
when (request) {
|
when (request) {
|
||||||
is RPCApi.ClientToServer.RpcRequest -> {
|
is RPCApi.ClientToServer.RpcRequest -> {
|
||||||
val reply = RPCApi.ServerToClient.RpcReply(request.replyId, Try.Success(0), "server")
|
val reply = RPCApi.ServerToClient.RpcReply(request.replyId, Try.Success(1000), "server")
|
||||||
val message = session.createMessage(false)
|
val message = session.createMessage(false)
|
||||||
reply.writeToClientMessage(SerializationDefaults.RPC_SERVER_CONTEXT, message)
|
reply.writeToClientMessage(SerializationDefaults.RPC_SERVER_CONTEXT, message)
|
||||||
message.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, dedupeId.getAndIncrement())
|
message.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, dedupeId.getAndIncrement())
|
||||||
|
@ -34,9 +34,18 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
|
|||||||
open val connectionMaxRetryInterval: Duration = 3.minutes,
|
open val connectionMaxRetryInterval: Duration = 3.minutes,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The minimum protocol version required from the server.
|
* The minimum protocol version required from the server. This is equivalent to the node's platform version
|
||||||
|
* number. If this minimum version is not met, an exception will be thrown at startup. If you use features
|
||||||
|
* introduced in a later version, you can bump this to match the platform version you need and get an early
|
||||||
|
* check that runs before you do anything.
|
||||||
|
*
|
||||||
|
* If you leave it at the default then things will work but attempting to use an RPC added in a version later
|
||||||
|
* than the server supports will throw [UnsupportedOperationException].
|
||||||
|
*
|
||||||
|
* The default value is whatever version of Corda this RPC library was shipped as a part of. Therefore if you
|
||||||
|
* use the RPC library from Corda 4, it will by default only connect to a node of version 4 or above.
|
||||||
*/
|
*/
|
||||||
open val minimumServerProtocolVersion: Int = 0,
|
open val minimumServerProtocolVersion: Int = 4,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a
|
* If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a
|
||||||
|
@ -48,7 +48,7 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
|
|||||||
|
|
||||||
fun makeComplicatedListenableFuture(): CordaFuture<Pair<String, CordaFuture<String>>>
|
fun makeComplicatedListenableFuture(): CordaFuture<Pair<String, CordaFuture<String>>>
|
||||||
|
|
||||||
@RPCSinceVersion(2)
|
@RPCSinceVersion(2000)
|
||||||
fun addedLater()
|
fun addedLater()
|
||||||
|
|
||||||
fun captureUser(): String
|
fun captureUser(): String
|
||||||
@ -58,7 +58,7 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
|
|||||||
private lateinit var complicatedListenableFuturee: CordaFuture<Pair<String, CordaFuture<String>>>
|
private lateinit var complicatedListenableFuturee: CordaFuture<Pair<String, CordaFuture<String>>>
|
||||||
|
|
||||||
inner class TestOpsImpl : TestOps {
|
inner class TestOpsImpl : TestOps {
|
||||||
override val protocolVersion = 1
|
override val protocolVersion = 1000
|
||||||
// do not remove Unit
|
// do not remove Unit
|
||||||
override fun barf(): Unit = throw IllegalArgumentException("Barf!")
|
override fun barf(): Unit = throw IllegalArgumentException("Barf!")
|
||||||
override fun void() {}
|
override fun void() {}
|
||||||
|
@ -33,7 +33,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
|
|||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
data class ObservableRose<out A>(val value: A, val branches: Observable<out ObservableRose<A>>)
|
data class ObservableRose<out A>(val value: A, val branches: Observable<out ObservableRose<A>>)
|
||||||
|
|
||||||
private interface TestOps : RPCOps {
|
interface TestOps : RPCOps {
|
||||||
fun newLatch(numberOfDowns: Int): Long
|
fun newLatch(numberOfDowns: Int): Long
|
||||||
fun waitLatch(id: Long)
|
fun waitLatch(id: Long)
|
||||||
fun downLatch(id: Long)
|
fun downLatch(id: Long)
|
||||||
@ -43,7 +43,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
|
|||||||
|
|
||||||
class TestOpsImpl(private val pool: Executor) : TestOps {
|
class TestOpsImpl(private val pool: Executor) : TestOps {
|
||||||
private val latches = ConcurrentHashMap<Long, CountDownLatch>()
|
private val latches = ConcurrentHashMap<Long, CountDownLatch>()
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
|
|
||||||
override fun newLatch(numberOfDowns: Int): Long {
|
override fun newLatch(numberOfDowns: Int): Long {
|
||||||
val id = random63BitValue()
|
val id = random63BitValue()
|
||||||
|
@ -26,7 +26,7 @@ class RPCFailureTests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
class OpsImpl : Ops {
|
class OpsImpl : Ops {
|
||||||
override val protocolVersion = 1
|
override val protocolVersion = 1000
|
||||||
override fun getUnserializable() = Unserializable()
|
override fun getUnserializable() = Unserializable()
|
||||||
override fun getUnserializableAsync(): CordaFuture<Unserializable> {
|
override fun getUnserializableAsync(): CordaFuture<Unserializable> {
|
||||||
return openFuture<Unserializable>().apply { capture { getUnserializable() } }
|
return openFuture<Unserializable>().apply { capture { getUnserializable() } }
|
||||||
|
@ -24,7 +24,7 @@ class RPCHighThroughputObservableTests : AbstractRPCTest() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
internal class TestOpsImpl : TestOps {
|
internal class TestOpsImpl : TestOps {
|
||||||
override val protocolVersion = 1
|
override val protocolVersion = 1000
|
||||||
|
|
||||||
override fun makeObservable(): Observable<Int> = Observable.interval(0, TimeUnit.MICROSECONDS).map { it.toInt() + 1 }
|
override fun makeObservable(): Observable<Int> = Observable.interval(0, TimeUnit.MICROSECONDS).map { it.toInt() + 1 }
|
||||||
}
|
}
|
||||||
|
@ -5,8 +5,8 @@ import net.corda.core.messaging.RPCOps
|
|||||||
import net.corda.core.utilities.minutes
|
import net.corda.core.utilities.minutes
|
||||||
import net.corda.core.utilities.seconds
|
import net.corda.core.utilities.seconds
|
||||||
import net.corda.node.services.messaging.RPCServerConfiguration
|
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||||
import net.corda.testing.node.internal.RPCDriverDSL
|
|
||||||
import net.corda.testing.internal.performance.div
|
import net.corda.testing.internal.performance.div
|
||||||
|
import net.corda.testing.node.internal.RPCDriverDSL
|
||||||
import net.corda.testing.node.internal.performance.startPublishingFixedRateInjector
|
import net.corda.testing.node.internal.performance.startPublishingFixedRateInjector
|
||||||
import net.corda.testing.node.internal.performance.startReporter
|
import net.corda.testing.node.internal.performance.startReporter
|
||||||
import net.corda.testing.node.internal.performance.startTightLoopInjector
|
import net.corda.testing.node.internal.performance.startTightLoopInjector
|
||||||
@ -34,7 +34,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
class TestOpsImpl : TestOps {
|
class TestOpsImpl : TestOps {
|
||||||
override val protocolVersion = 0
|
override val protocolVersion = 1000
|
||||||
override fun simpleReply(input: ByteArray, sizeOfReply: Int): ByteArray {
|
override fun simpleReply(input: ByteArray, sizeOfReply: Int): ByteArray {
|
||||||
return ByteArray(sizeOfReply)
|
return ByteArray(sizeOfReply)
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,7 @@ class RPCPermissionsTests : AbstractRPCTest() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
class TestOpsImpl : TestOps {
|
class TestOpsImpl : TestOps {
|
||||||
override val protocolVersion = 1
|
override val protocolVersion = 1000
|
||||||
override fun validatePermission(method: String, target: String?) {
|
override fun validatePermission(method: String, target: String?) {
|
||||||
val authorized = if (target == null) {
|
val authorized = if (target == null) {
|
||||||
rpcContext().isPermitted(method)
|
rpcContext().isPermitted(method)
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
gradlePluginsVersion=4.0.29
|
gradlePluginsVersion=4.0.29
|
||||||
kotlinVersion=1.2.51
|
kotlinVersion=1.2.51
|
||||||
|
# When adjusting platformVersion upwards please also modify CordaRPCClientConfiguration.minimumServerProtocolVersion \
|
||||||
|
# if there have been any RPC changes. Also please modify InternalMockNetwork.kt:MOCK_VERSION_INFO and NodeBasedTest.startNode
|
||||||
platformVersion=4
|
platformVersion=4
|
||||||
guavaVersion=25.1-jre
|
guavaVersion=25.1-jre
|
||||||
proguardVersion=6.0.3
|
proguardVersion=6.0.3
|
||||||
|
@ -96,12 +96,6 @@ data class StateMachineTransactionMapping(val stateMachineRunId: StateMachineRun
|
|||||||
|
|
||||||
/** RPC operations that the node exposes to clients. */
|
/** RPC operations that the node exposes to clients. */
|
||||||
interface CordaRPCOps : RPCOps {
|
interface CordaRPCOps : RPCOps {
|
||||||
/**
|
|
||||||
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
|
|
||||||
* to be present.
|
|
||||||
*/
|
|
||||||
override val protocolVersion: Int get() = nodeInfo().platformVersion
|
|
||||||
|
|
||||||
/** Returns a list of currently in-progress state machine infos. */
|
/** Returns a list of currently in-progress state machine infos. */
|
||||||
fun stateMachinesSnapshot(): List<StateMachineInfo>
|
fun stateMachinesSnapshot(): List<StateMachineInfo>
|
||||||
|
|
||||||
|
@ -6,7 +6,11 @@ release, see :doc:`upgrade-notes`.
|
|||||||
|
|
||||||
Unreleased
|
Unreleased
|
||||||
----------
|
----------
|
||||||
* Removed experimental feature `CordformDefinition`
|
* The RPC client library now checks at startup whether the server is of the client libraries major version or higher.
|
||||||
|
Therefore to connect to a Corda 4 node you must use version 4 or lower of the library. This behaviour can be overridden
|
||||||
|
by specifying a lower number in the ``CordaRPCClientConfiguration`` class.
|
||||||
|
|
||||||
|
* Removed experimental feature ``CordformDefinition``
|
||||||
|
|
||||||
* Vault query fix: support query by parent classes of Contract State classes (see https://github.com/corda/corda/issues/3714)
|
* Vault query fix: support query by parent classes of Contract State classes (see https://github.com/corda/corda/issues/3714)
|
||||||
|
|
||||||
|
@ -18,8 +18,8 @@ object as normal, and the marshalling back and forth is handled for you.
|
|||||||
|
|
||||||
.. warning:: The built-in Corda webserver is deprecated and unsuitable for production use. If you want to interact with
|
.. warning:: The built-in Corda webserver is deprecated and unsuitable for production use. If you want to interact with
|
||||||
your node via HTTP, you will need to stand up your own webserver, then create an RPC connection between your node
|
your node via HTTP, you will need to stand up your own webserver, then create an RPC connection between your node
|
||||||
and this webserver using the `CordaRPCClient`_ library. You can find an example of how to do this
|
and this webserver using the `CordaRPCClient`_ library. You can find an example of how to do this using the popular
|
||||||
`here <https://github.com/corda/spring-webserver>`_.
|
Spring Boot server `here <https://github.com/corda/spring-webserver>`_.
|
||||||
|
|
||||||
Connecting to a node via RPC
|
Connecting to a node via RPC
|
||||||
----------------------------
|
----------------------------
|
||||||
@ -291,31 +291,43 @@ would expect.
|
|||||||
|
|
||||||
This feature comes with a cost: the server must queue up objects emitted by the server-side observable until you
|
This feature comes with a cost: the server must queue up objects emitted by the server-side observable until you
|
||||||
download them. Note that the server side observation buffer is bounded, once it fills up the client is considered
|
download them. Note that the server side observation buffer is bounded, once it fills up the client is considered
|
||||||
slow and kicked. You are expected to subscribe to all the observables returned, otherwise client-side memory starts
|
slow and will be disconnected. You are expected to subscribe to all the observables returned, otherwise client-side
|
||||||
filling up as observations come in. If you don't want an observable then subscribe then unsubscribe immediately to
|
memory starts filling up as observations come in. If you don't want an observable then subscribe then unsubscribe
|
||||||
clear the client-side buffers and to stop the server from streaming. If your app quits then server side resources
|
immediately to clear the client-side buffers and to stop the server from streaming. For Kotlin users there is a
|
||||||
will be freed automatically.
|
convenience extension method called ``notUsed()`` which can be called on an observable to automate this step.
|
||||||
|
|
||||||
|
If your app quits then server side resources will be freed automatically.
|
||||||
|
|
||||||
.. warning:: If you leak an observable on the client side and it gets garbage collected, you will get a warning
|
.. warning:: If you leak an observable on the client side and it gets garbage collected, you will get a warning
|
||||||
printed to the logs and the observable will be unsubscribed for you. But don't rely on this, as garbage collection
|
printed to the logs and the observable will be unsubscribed for you. But don't rely on this, as garbage collection
|
||||||
is non-deterministic.
|
is non-deterministic. If you set ``-Dnet.corda.client.rpc.trackRpcCallSites=true`` on the JVM command line then
|
||||||
|
this warning comes with a stack trace showing where the RPC that returned the forgotten observable was called from.
|
||||||
|
This feature is off by default because tracking RPC call sites is moderately slow.
|
||||||
|
|
||||||
.. note:: Observables can only be used as return arguments of an RPC call. It is not currently possible to pass
|
.. note:: Observables can only be used as return arguments of an RPC call. It is not currently possible to pass
|
||||||
Observables as parameters to the RPC methods.
|
Observables as parameters to the RPC methods. In other words the streaming is always server to client and not
|
||||||
|
the other way around.
|
||||||
|
|
||||||
Futures
|
Futures
|
||||||
-------
|
-------
|
||||||
A method can also return a ``CordaFuture`` in its object graph and it will be treated in a similar manner to
|
A method can also return a ``CordaFuture`` in its object graph and it will be treated in a similar manner to
|
||||||
observables. Calling the ``cancel`` method on the future will unsubscribe it from any future value and release any resources.
|
observables. Calling the ``cancel`` method on the future will unsubscribe it from any future value and release
|
||||||
|
any resources.
|
||||||
|
|
||||||
Versioning
|
Versioning
|
||||||
----------
|
----------
|
||||||
The client RPC protocol is versioned using the node's Platform Version (see :doc:`versioning`). When a proxy is created
|
The client RPC protocol is versioned using the node's platform version number (see :doc:`versioning`). When a proxy is created
|
||||||
the server is queried for its version, and you can specify your minimum requirement. Methods added in later versions
|
the server is queried for its version, and you can specify your minimum requirement. Methods added in later versions
|
||||||
are tagged with the ``@RPCSinceVersion`` annotation. If you try to use a method that the server isn't advertising support
|
are tagged with the ``@RPCSinceVersion`` annotation. If you try to use a method that the server isn't advertising support
|
||||||
of, an ``UnsupportedOperationException`` is thrown. If you want to know the version of the server, just use the
|
of, an ``UnsupportedOperationException`` is thrown. If you want to know the version of the server, just use the
|
||||||
``protocolVersion`` property (i.e. ``getProtocolVersion`` in Java).
|
``protocolVersion`` property (i.e. ``getProtocolVersion`` in Java).
|
||||||
|
|
||||||
|
The RPC client library defaults to requiring the platform version it was built with. That means if you use the client
|
||||||
|
library released as part of Corda N, then the node it connects to must be of version N or above. This is checked when
|
||||||
|
the client first connects. If you want to override this behaviour, you can alter the ``minimumServerProtocolVersion``
|
||||||
|
field in the ``CordaRPCClientConfiguration`` object passed to the client. Alternatively, just link your app against
|
||||||
|
an older version of the library.
|
||||||
|
|
||||||
Thread safety
|
Thread safety
|
||||||
-------------
|
-------------
|
||||||
A proxy is thread safe, blocking, and allows multiple RPCs to be in flight at once. Any observables that are returned and
|
A proxy is thread safe, blocking, and allows multiple RPCs to be in flight at once. Any observables that are returned and
|
||||||
@ -338,7 +350,6 @@ such situations:
|
|||||||
.. sourcecode:: Kotlin
|
.. sourcecode:: Kotlin
|
||||||
|
|
||||||
fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
|
fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
|
||||||
|
|
||||||
val retryInterval = 5.seconds
|
val retryInterval = 5.seconds
|
||||||
|
|
||||||
do {
|
do {
|
||||||
@ -382,7 +393,6 @@ on the ``Observable`` returned by ``CordaRPCOps``.
|
|||||||
.. sourcecode:: Kotlin
|
.. sourcecode:: Kotlin
|
||||||
|
|
||||||
fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
|
fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
|
||||||
|
|
||||||
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
|
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
|
||||||
val proxy = connection.proxy
|
val proxy = connection.proxy
|
||||||
|
|
||||||
@ -414,10 +424,6 @@ Client code if fed with instances of ``StateMachineInfo`` using call ``clientCod
|
|||||||
all the items. Some of these items might have already been delivered to client code prior to failover occurred.
|
all the items. Some of these items might have already been delivered to client code prior to failover occurred.
|
||||||
It is down to client code in this case handle those duplicate items as appropriate.
|
It is down to client code in this case handle those duplicate items as appropriate.
|
||||||
|
|
||||||
Wire protocol
|
|
||||||
-------------
|
|
||||||
The client RPC wire protocol is defined and documented in ``net/corda/client/rpc/RPCApi.kt``.
|
|
||||||
|
|
||||||
Wire security
|
Wire security
|
||||||
-------------
|
-------------
|
||||||
``CordaRPCClient`` has an optional constructor parameter of type ``ClientRpcSslOptions``, defaulted to ``null``, which allows
|
``CordaRPCClient`` has an optional constructor parameter of type ``ClientRpcSslOptions``, defaulted to ``null``, which allows
|
||||||
@ -430,7 +436,6 @@ In order for this to work, the client needs to provide a truststore containing a
|
|||||||
|
|
||||||
For the communication to be secure, we recommend using the standard SSL best practices for key management.
|
For the communication to be secure, we recommend using the standard SSL best practices for key management.
|
||||||
|
|
||||||
|
|
||||||
Whitelisting classes with the Corda node
|
Whitelisting classes with the Corda node
|
||||||
----------------------------------------
|
----------------------------------------
|
||||||
CorDapps must whitelist any classes used over RPC with Corda's serialization framework, unless they are whitelisted by
|
CorDapps must whitelist any classes used over RPC with Corda's serialization framework, unless they are whitelisted by
|
||||||
|
@ -13,8 +13,8 @@ import net.corda.node.internal.DataSourceFactory
|
|||||||
import net.corda.node.internal.NodeWithInfo
|
import net.corda.node.internal.NodeWithInfo
|
||||||
import net.corda.node.services.Permissions
|
import net.corda.node.services.Permissions
|
||||||
import net.corda.node.services.config.PasswordEncryption
|
import net.corda.node.services.config.PasswordEncryption
|
||||||
import net.corda.testing.node.internal.NodeBasedTest
|
|
||||||
import net.corda.testing.core.ALICE_NAME
|
import net.corda.testing.core.ALICE_NAME
|
||||||
|
import net.corda.testing.node.internal.NodeBasedTest
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||||
import org.apache.shiro.authc.credential.DefaultPasswordService
|
import org.apache.shiro.authc.credential.DefaultPasswordService
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
@ -33,7 +33,6 @@ import kotlin.test.assertFailsWith
|
|||||||
*/
|
*/
|
||||||
@RunWith(Parameterized::class)
|
@RunWith(Parameterized::class)
|
||||||
class AuthDBTests : NodeBasedTest() {
|
class AuthDBTests : NodeBasedTest() {
|
||||||
|
|
||||||
private lateinit var node: NodeWithInfo
|
private lateinit var node: NodeWithInfo
|
||||||
private lateinit var client: CordaRPCClient
|
private lateinit var client: CordaRPCClient
|
||||||
private lateinit var db: UsersDB
|
private lateinit var db: UsersDB
|
||||||
|
@ -140,7 +140,7 @@ class ArtemisRpcTests {
|
|||||||
class TestRpcOpsImpl : TestRpcOps {
|
class TestRpcOpsImpl : TestRpcOps {
|
||||||
override fun greet(name: String): String = "Oh, hello $name!"
|
override fun greet(name: String): String = "Oh, hello $name!"
|
||||||
|
|
||||||
override val protocolVersion: Int = 1
|
override val protocolVersion: Int = 1000
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun tempFile(name: String): Path = tempFolder.root.toPath() / name
|
private fun tempFile(name: String): Path = tempFolder.root.toPath() / name
|
||||||
|
@ -49,6 +49,12 @@ internal class CordaRPCOpsImpl(
|
|||||||
private val flowStarter: FlowStarter,
|
private val flowStarter: FlowStarter,
|
||||||
private val shutdownNode: () -> Unit
|
private val shutdownNode: () -> Unit
|
||||||
) : CordaRPCOps {
|
) : CordaRPCOps {
|
||||||
|
/**
|
||||||
|
* Returns the RPC protocol version, which is the same the node's platform Version. Exists since version 1 so guaranteed
|
||||||
|
* to be present.
|
||||||
|
*/
|
||||||
|
override val protocolVersion: Int get() = nodeInfo().platformVersion
|
||||||
|
|
||||||
override fun networkMapSnapshot(): List<NodeInfo> {
|
override fun networkMapSnapshot(): List<NodeInfo> {
|
||||||
val (snapshot, updates) = networkMapFeed()
|
val (snapshot, updates) = networkMapFeed()
|
||||||
updates.notUsed()
|
updates.notUsed()
|
||||||
|
@ -16,6 +16,8 @@ internal class AuthenticatedRpcOpsProxy(private val delegate: CordaRPCOps) : Cor
|
|||||||
/**
|
/**
|
||||||
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
|
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
|
||||||
* to be present.
|
* to be present.
|
||||||
|
*
|
||||||
|
* TODO: Why is this logic duplicated vs the actual implementation?
|
||||||
*/
|
*/
|
||||||
override val protocolVersion: Int get() = delegate.nodeInfo().platformVersion
|
override val protocolVersion: Int get() = delegate.nodeInfo().platformVersion
|
||||||
|
|
||||||
@ -31,7 +33,6 @@ internal class AuthenticatedRpcOpsProxy(private val delegate: CordaRPCOps) : Cor
|
|||||||
|
|
||||||
private companion object {
|
private companion object {
|
||||||
private fun proxy(delegate: CordaRPCOps, context: () -> RpcAuthContext): CordaRPCOps {
|
private fun proxy(delegate: CordaRPCOps, context: () -> RpcAuthContext): CordaRPCOps {
|
||||||
|
|
||||||
val handler = PermissionsEnforcingInvocationHandler(delegate, context)
|
val handler = PermissionsEnforcingInvocationHandler(delegate, context)
|
||||||
return Proxy.newProxyInstance(delegate::class.java.classLoader, arrayOf(CordaRPCOps::class.java), handler) as CordaRPCOps
|
return Proxy.newProxyInstance(delegate::class.java.classLoader, arrayOf(CordaRPCOps::class.java), handler) as CordaRPCOps
|
||||||
}
|
}
|
||||||
|
@ -72,7 +72,7 @@ import java.time.Clock
|
|||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
val MOCK_VERSION_INFO = VersionInfo(1, "Mock release", "Mock revision", "Mock Vendor")
|
val MOCK_VERSION_INFO = VersionInfo(4, "Mock release", "Mock revision", "Mock Vendor")
|
||||||
|
|
||||||
data class MockNodeArgs(
|
data class MockNodeArgs(
|
||||||
val config: NodeConfiguration,
|
val config: NodeConfiguration,
|
||||||
@ -209,15 +209,6 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
|||||||
return defaultNotaryNode.info.legalIdentities.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities")
|
return defaultNotaryNode.info.legalIdentities.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the identity of the default notary node.
|
|
||||||
* @see defaultNotaryNode
|
|
||||||
*/
|
|
||||||
val defaultNotaryIdentityAndCert: PartyAndCertificate
|
|
||||||
get() {
|
|
||||||
return defaultNotaryNode.info.legalIdentitiesAndCerts.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities")
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Because this executor is shared, we need to be careful about nodes shutting it down.
|
* Because this executor is shared, we need to be careful about nodes shutting it down.
|
||||||
*/
|
*/
|
||||||
|
@ -10,9 +10,8 @@ import net.corda.core.node.NodeInfo
|
|||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
import net.corda.core.utilities.loggerFor
|
import net.corda.core.utilities.loggerFor
|
||||||
import net.corda.node.VersionInfo
|
import net.corda.node.VersionInfo
|
||||||
import net.corda.node.internal.NodeWithInfo
|
|
||||||
import net.corda.node.internal.Node
|
import net.corda.node.internal.Node
|
||||||
|
import net.corda.node.internal.NodeWithInfo
|
||||||
import net.corda.node.services.config.*
|
import net.corda.node.services.config.*
|
||||||
import net.corda.nodeapi.internal.config.toConfig
|
import net.corda.nodeapi.internal.config.toConfig
|
||||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||||
@ -85,7 +84,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
|
|||||||
|
|
||||||
@JvmOverloads
|
@JvmOverloads
|
||||||
fun startNode(legalName: CordaX500Name,
|
fun startNode(legalName: CordaX500Name,
|
||||||
platformVersion: Int = 1,
|
platformVersion: Int = 4,
|
||||||
rpcUsers: List<User> = emptyList(),
|
rpcUsers: List<User> = emptyList(),
|
||||||
configOverrides: Map<String, Any> = emptyMap()): NodeWithInfo {
|
configOverrides: Map<String, Any> = emptyMap()): NodeWithInfo {
|
||||||
val baseDirectory = baseDirectory(legalName).createDirectories()
|
val baseDirectory = baseDirectory(legalName).createDirectories()
|
||||||
|
Reference in New Issue
Block a user