mirror of
https://github.com/corda/corda.git
synced 2025-03-03 12:57:29 +00:00
Merge remote-tracking branch 'remotes/open/master' into merges/may-21-17-24
# Conflicts: # constants.properties # node-api/src/main/kotlin/net/corda/nodeapi/exceptions/RejectedCommandException.kt # node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt # node/src/main/kotlin/net/corda/node/internal/SecureCordaRPCOps.kt
This commit is contained in:
commit
e3b2d3332d
@ -131,8 +131,9 @@ see changes to this list.
|
||||
* Nigel King (R3)
|
||||
* Nuam Athaweth (MUFG)
|
||||
* Oscar Zibordi de Paiva (Bradesco)
|
||||
* OP Financial
|
||||
* Patrick Kuo (R3)
|
||||
* Pekka Kaipio
|
||||
* Pekka Kaipio (OP Financial)
|
||||
* Phillip Griffin
|
||||
* Piotr Piskorski (Nordea)
|
||||
* Przemyslaw Bak (R3)
|
||||
@ -184,4 +185,4 @@ see changes to this list.
|
||||
* Vipin Bharathan
|
||||
* Wawrzek Niewodniczanski (R3)
|
||||
* Wei Wu Zhang (Commonwealth Bank of Australia)
|
||||
* Zabrina Smith (Northern Trust)
|
||||
* Zabrina Smith (Northern Trust)
|
||||
|
@ -11,10 +11,10 @@
|
||||
package net.corda.client.rpc
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.nodeapi.exceptions.RpcSerializableError
|
||||
import net.corda.core.ClientRelevantError
|
||||
|
||||
/**
|
||||
* Thrown to indicate that the calling user does not have permission for something they have requested (for example
|
||||
* calling a method).
|
||||
*/
|
||||
class PermissionException(message: String) : CordaRuntimeException(message), RpcSerializableError
|
||||
class PermissionException(message: String) : CordaRuntimeException(message), ClientRelevantError
|
@ -8,7 +8,7 @@
|
||||
# Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
#
|
||||
|
||||
gradlePluginsVersion=4.0.19
|
||||
gradlePluginsVersion=4.0.20
|
||||
kotlinVersion=1.2.41
|
||||
platformVersion=4
|
||||
guavaVersion=21.0
|
||||
|
@ -8,11 +8,12 @@
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.nodeapi.exceptions
|
||||
package net.corda.core
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
|
||||
/**
|
||||
* Thrown to indicate that the command was rejected by the node, typically due to a special temporary mode.
|
||||
* Allows an implementing [Throwable] to be propagated to clients.
|
||||
*/
|
||||
class RejectedCommandException(message: String) : CordaRuntimeException(message), RpcSerializableError
|
||||
@CordaSerializable
|
||||
interface ClientRelevantError
|
@ -277,6 +277,12 @@ fun <T> Any.declaredField(name: String): DeclaredField<T> = DeclaredField(javaCl
|
||||
*/
|
||||
fun <T> Any.declaredField(clazz: KClass<*>, name: String): DeclaredField<T> = DeclaredField(clazz.java, name, this)
|
||||
|
||||
/**
|
||||
* Returns a [DeclaredField] wrapper around the (possibly non-public) instance field of the receiver object, but declared
|
||||
* in its superclass [clazz].
|
||||
*/
|
||||
fun <T> Any.declaredField(clazz: Class<*>, name: String): DeclaredField<T> = DeclaredField(clazz, name, this)
|
||||
|
||||
/** creates a new instance if not a Kotlin object */
|
||||
fun <T : Any> KClass<T>.objectOrNewInstance(): T {
|
||||
return this.objectInstance ?: this.createInstance()
|
||||
@ -287,10 +293,43 @@ fun <T : Any> KClass<T>.objectOrNewInstance(): T {
|
||||
* visibility.
|
||||
*/
|
||||
class DeclaredField<T>(clazz: Class<*>, name: String, private val receiver: Any?) {
|
||||
private val javaField = clazz.getDeclaredField(name).apply { isAccessible = true }
|
||||
private val javaField = findField(name, clazz)
|
||||
var value: T
|
||||
get() = uncheckedCast<Any?, T>(javaField.get(receiver))
|
||||
set(value) = javaField.set(receiver, value)
|
||||
get() {
|
||||
synchronized(this) {
|
||||
return javaField.accessible { uncheckedCast<Any?, T>(get(receiver)) }
|
||||
}
|
||||
}
|
||||
set(value) {
|
||||
synchronized(this) {
|
||||
javaField.accessible {
|
||||
set(receiver, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
val name: String = javaField.name
|
||||
|
||||
private fun <RESULT> Field.accessible(action: Field.() -> RESULT): RESULT {
|
||||
val accessible = isAccessible
|
||||
isAccessible = true
|
||||
try {
|
||||
return action(this)
|
||||
} finally {
|
||||
isAccessible = accessible
|
||||
}
|
||||
}
|
||||
|
||||
@Throws(NoSuchFieldException::class)
|
||||
private fun findField(fieldName: String, clazz: Class<*>?): Field {
|
||||
if (clazz == null) {
|
||||
throw NoSuchFieldException(fieldName)
|
||||
}
|
||||
return try {
|
||||
return clazz.getDeclaredField(fieldName)
|
||||
} catch (e: NoSuchFieldException) {
|
||||
findField(fieldName, clazz.superclass)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** The annotated object would have a more restricted visibility were it not needed in tests. */
|
||||
|
@ -300,9 +300,11 @@ interface CordaRPCOps : RPCOps {
|
||||
fun openAttachment(id: SecureHash): InputStream
|
||||
|
||||
/** Uploads a jar to the node, returns it's hash. */
|
||||
@Throws(java.nio.file.FileAlreadyExistsException::class)
|
||||
fun uploadAttachment(jar: InputStream): SecureHash
|
||||
|
||||
/** Uploads a jar including metadata to the node, returns it's hash. */
|
||||
@Throws(java.nio.file.FileAlreadyExistsException::class)
|
||||
fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String): SecureHash
|
||||
|
||||
/** Queries attachments metadata */
|
||||
|
@ -13,6 +13,7 @@
|
||||
package net.corda.core.node.services.vault
|
||||
|
||||
import net.corda.core.DoNotImplement
|
||||
import net.corda.core.internal.declaredField
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
@ -449,18 +450,7 @@ class FieldInfo internal constructor(val name: String, val entityClass: Class<*>
|
||||
*/
|
||||
@Throws(NoSuchFieldException::class)
|
||||
fun getField(fieldName: String, entityClass: Class<*>): FieldInfo {
|
||||
return getField(fieldName, entityClass, entityClass)
|
||||
}
|
||||
|
||||
@Throws(NoSuchFieldException::class)
|
||||
private fun getField(fieldName: String, clazz: Class<*>?, invokingClazz: Class<*>): FieldInfo {
|
||||
if (clazz == null) {
|
||||
throw NoSuchFieldException(fieldName)
|
||||
}
|
||||
return try {
|
||||
val field = clazz.getDeclaredField(fieldName)
|
||||
return FieldInfo(field.name, invokingClazz)
|
||||
} catch (e: NoSuchFieldException) {
|
||||
getField(fieldName, clazz.superclass, invokingClazz)
|
||||
}
|
||||
val field = entityClass.declaredField<Any>(entityClass, fieldName)
|
||||
return FieldInfo(field.name, entityClass)
|
||||
}
|
@ -116,7 +116,7 @@ class WireTransaction(componentGroups: List<ComponentGroup>, val privacySalt: Pr
|
||||
resolveIdentity: (PublicKey) -> Party?,
|
||||
resolveAttachment: (SecureHash) -> Attachment?,
|
||||
resolveStateRef: (StateRef) -> TransactionState<*>?,
|
||||
@SuppressWarnings("unused") resolveContractAttachment: (TransactionState<ContractState>) -> AttachmentId?
|
||||
@Suppress("UNUSED_PARAMETER") resolveContractAttachment: (TransactionState<ContractState>) -> AttachmentId?
|
||||
): LedgerTransaction {
|
||||
return toLedgerTransactionInternal(resolveIdentity, resolveAttachment, resolveStateRef, null)
|
||||
}
|
||||
|
@ -11,6 +11,7 @@
|
||||
package net.corda.core.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.Party
|
||||
@ -27,7 +28,6 @@ import net.corda.finance.USD
|
||||
import net.corda.finance.`issued by`
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.node.internal.SecureCordaRPCOps
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.services.Permissions.Companion.startFlow
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
@ -126,7 +126,7 @@ class ContractUpgradeFlowTest {
|
||||
return startRpcClient<CordaRPCOps>(
|
||||
rpcAddress = startRpcServer(
|
||||
rpcUser = user,
|
||||
ops = SecureCordaRPCOps(node.services, node.smm, node.database, node.services, { })
|
||||
ops = node.rpcOps
|
||||
).get().broker.hostAndPort!!,
|
||||
username = user.username,
|
||||
password = user.password
|
||||
@ -163,7 +163,7 @@ class ContractUpgradeFlowTest {
|
||||
DummyContractV2::class.java).returnValue
|
||||
|
||||
mockNet.runNetwork()
|
||||
assertFailsWith(UnexpectedFlowEndException::class) { rejectedFuture.getOrThrow() }
|
||||
assertFailsWith(CordaRuntimeException::class) { rejectedFuture.getOrThrow() }
|
||||
|
||||
// Party B authorise the contract state upgrade, and immediately deauthorise the same.
|
||||
rpcB.startFlow({ stateAndRef, upgrade -> ContractUpgradeFlow.Authorise(stateAndRef, upgrade) },
|
||||
@ -178,7 +178,7 @@ class ContractUpgradeFlowTest {
|
||||
DummyContractV2::class.java).returnValue
|
||||
|
||||
mockNet.runNetwork()
|
||||
assertFailsWith(UnexpectedFlowEndException::class) { deauthorisedFuture.getOrThrow() }
|
||||
assertFailsWith(CordaRuntimeException::class) { deauthorisedFuture.getOrThrow() }
|
||||
|
||||
// Party B authorise the contract state upgrade.
|
||||
rpcB.startFlow({ stateAndRef, upgrade -> ContractUpgradeFlow.Authorise(stateAndRef, upgrade) },
|
||||
|
@ -50,9 +50,10 @@ private fun createAttachmentData(content: String) = ByteArrayOutputStream().appl
|
||||
}
|
||||
}.toByteArray()
|
||||
|
||||
private fun Attachment.extractContent() = ByteArrayOutputStream().apply { extractFile("content", this) }.toString(UTF_8.name())
|
||||
private fun
|
||||
Attachment.extractContent() = ByteArrayOutputStream().apply { extractFile("content", this) }.toString(UTF_8.name())
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Suppress("deprecation")
|
||||
private fun StartedNode<*>.saveAttachment(content: String) = database.transaction {
|
||||
attachments.importAttachment(createAttachmentData(content).inputStream())
|
||||
}
|
||||
|
@ -301,6 +301,91 @@ requires a higher version of the protocol than the server supports, ``Unsupporte
|
||||
Otherwise, if the server implementation throws an exception, that exception is serialised and rethrown on the client
|
||||
side as if it was thrown from inside the called RPC method. These exceptions can be caught as normal.
|
||||
|
||||
Connection management
|
||||
---------------------
|
||||
It is possible to not be able to connect to the server on the first attempt. In that case, the ``CordaRPCCLient.start()``
|
||||
method will throw an exception. The following code snippet is an example of how to write a simple retry mechanism for
|
||||
such situations:
|
||||
|
||||
.. sourcecode:: Kotlin
|
||||
|
||||
fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
|
||||
|
||||
val retryInterval = 5.seconds
|
||||
|
||||
do {
|
||||
val connection = try {
|
||||
logger.info("Connecting to: $nodeHostAndPort")
|
||||
val client = CordaRPCClient(
|
||||
nodeHostAndPort,
|
||||
object : CordaRPCClientConfiguration {
|
||||
override val connectionMaxRetryInterval = retryInterval
|
||||
}
|
||||
)
|
||||
val _connection = client.start(username, password)
|
||||
// Check connection is truly operational before returning it.
|
||||
val nodeInfo = _connection.proxy.nodeInfo()
|
||||
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
||||
_connection
|
||||
} catch(secEx: ActiveMQSecurityException) {
|
||||
// Happens when incorrect credentials provided - no point to retry connecting.
|
||||
throw secEx
|
||||
}
|
||||
catch(ex: RPCException) {
|
||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||
logger.info("Exception upon establishing connection: " + ex.message)
|
||||
null
|
||||
}
|
||||
|
||||
if(connection != null) {
|
||||
logger.info("Connection successfully established with: $nodeHostAndPort")
|
||||
return connection
|
||||
}
|
||||
// Could not connect this time round - pause before giving another try.
|
||||
Thread.sleep(retryInterval.toMillis())
|
||||
} while (connection == null)
|
||||
}
|
||||
|
||||
After a successful connection, it is possible for the server to become unavailable. In this case, all RPC calls will throw
|
||||
an exception and created observables will no longer receive observations. Below is an example of how to reconnect and
|
||||
back-fill any data that might have been missed while the connection was down. This is done by using the ``onError`` handler
|
||||
on the ``Observable`` returned by ``CordaRPCOps``.
|
||||
|
||||
.. sourcecode:: Kotlin
|
||||
|
||||
fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
|
||||
|
||||
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
|
||||
val proxy = connection.proxy
|
||||
|
||||
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
|
||||
|
||||
val retryableStateMachineUpdatesSubscription: AtomicReference<Subscription?> = AtomicReference(null)
|
||||
val subscription: Subscription = stateMachineUpdatesRaw
|
||||
.startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) })
|
||||
.subscribe({ clientCode(it) /* Client code here */ }, {
|
||||
// Terminate subscription such that nothing gets past this point to downstream Observables.
|
||||
retryableStateMachineUpdatesSubscription.get()?.unsubscribe()
|
||||
// It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
|
||||
// client and a new connection, so no going back to this one. Also the server might be down, so we are
|
||||
// force closing the connection to avoid propagation of notification to the server side.
|
||||
connection.forceClose()
|
||||
// Perform re-connect.
|
||||
performRpcReconnect(nodeHostAndPort, username, password)
|
||||
})
|
||||
|
||||
retryableStateMachineUpdatesSubscription.set(subscription)
|
||||
}
|
||||
|
||||
In this code snippet it is possible to see that function ``performRpcReconnect`` creates an RPC connection and implements
|
||||
the error handler upon subscription to an ``Observable``. The call to this ``onError`` handler will be made when failover
|
||||
happens then the code will terminate existing subscription, closes RPC connection and recursively calls ``performRpcReconnect``
|
||||
which will re-subscribe once RPC connection comes back online.
|
||||
|
||||
Client code if fed with instances of ``StateMachineInfo`` using call ``clientCode(it)``. Upon re-connec, this code receives
|
||||
all the items. Some of these items might have already been delivered to client code prior to failover occurred.
|
||||
It is down to client code in this case handle those duplicate items as appropriate.
|
||||
|
||||
Wire protocol
|
||||
-------------
|
||||
The client RPC wire protocol is defined and documented in ``net/corda/client/rpc/RPCApi.kt``.
|
||||
|
@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
|
||||
class KryoHookAgent {
|
||||
companion object {
|
||||
@JvmStatic
|
||||
fun premain(@SuppressWarnings("unused") argumentsString: String?, instrumentation: Instrumentation) {
|
||||
fun premain(@Suppress("UNUSED_PARAMETER") argumentsString: String?, instrumentation: Instrumentation) {
|
||||
Runtime.getRuntime().addShutdownHook(Thread {
|
||||
val statsTrees = KryoHook.events.values.flatMap {
|
||||
readTrees(it, 0).second
|
||||
@ -105,13 +105,13 @@ object KryoHook : ClassFileTransformer {
|
||||
val events = ConcurrentHashMap<Long, ArrayList<StatsEvent>>()
|
||||
|
||||
@JvmStatic
|
||||
fun writeEnter(@SuppressWarnings("unused") kryo: Kryo, output: Output, obj: Any) {
|
||||
fun writeEnter(@Suppress("UNUSED_PARAMETER") kryo: Kryo, output: Output, obj: Any) {
|
||||
events.getOrPut(Strand.currentStrand().id) { ArrayList() }.add(
|
||||
StatsEvent.Enter(obj.javaClass.name, output.total())
|
||||
)
|
||||
}
|
||||
@JvmStatic
|
||||
fun writeExit(@SuppressWarnings("unused") kryo: Kryo, output: Output, obj: Any) {
|
||||
fun writeExit(@Suppress("UNUSED_PARAMETER") kryo: Kryo, output: Output, obj: Any) {
|
||||
events[Strand.currentStrand().id]!!.add(
|
||||
StatsEvent.Exit(obj.javaClass.name, output.total())
|
||||
)
|
||||
|
@ -96,6 +96,12 @@ jar {
|
||||
}
|
||||
}
|
||||
|
||||
cordapp {
|
||||
info {
|
||||
vendor "R3"
|
||||
}
|
||||
}
|
||||
|
||||
publish {
|
||||
name jar.baseName
|
||||
}
|
||||
|
@ -1,37 +0,0 @@
|
||||
package net.corda.nodeapi.exceptions
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.flows.FlowException
|
||||
import java.io.InvalidClassException
|
||||
|
||||
// could change to use package name matching but trying to avoid reflection for now
|
||||
private val whitelisted = setOf(
|
||||
FlowException::class,
|
||||
InvalidClassException::class,
|
||||
RpcSerializableError::class,
|
||||
TransactionVerificationException::class
|
||||
)
|
||||
|
||||
/**
|
||||
* An [Exception] to signal RPC clients that something went wrong within a Corda node.
|
||||
*/
|
||||
class InternalNodeException(message: String) : CordaRuntimeException(message) {
|
||||
|
||||
companion object {
|
||||
|
||||
private const val DEFAULT_MESSAGE = "Something went wrong within the Corda node."
|
||||
|
||||
fun defaultMessage(): String = DEFAULT_MESSAGE
|
||||
|
||||
fun obfuscateIfInternal(wrapped: Throwable): Throwable {
|
||||
(wrapped as? CordaRuntimeException)?.setCause(null)
|
||||
return when {
|
||||
whitelisted.any { it.isInstance(wrapped) } -> wrapped
|
||||
else -> InternalNodeException(DEFAULT_MESSAGE).apply {
|
||||
stackTrace = emptyArray()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,11 +0,0 @@
|
||||
package net.corda.nodeapi.exceptions
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.crypto.SecureHash
|
||||
|
||||
class OutdatedNetworkParameterHashException(old: SecureHash, new: SecureHash) : CordaRuntimeException(TEMPLATE.format(old, new)), RpcSerializableError {
|
||||
|
||||
private companion object {
|
||||
private const val TEMPLATE = "Refused to accept parameters with hash %s because network map advertises update with hash %s. Please check newest version"
|
||||
}
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.nodeapi.exceptions
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.ClientRelevantError
|
||||
import net.corda.core.flows.IdentifiableException
|
||||
|
||||
/**
|
||||
* Thrown to indicate that an attachment was already uploaded to a Corda node.
|
||||
*/
|
||||
class DuplicateAttachmentException(attachmentHash: String) : java.nio.file.FileAlreadyExistsException(attachmentHash), ClientRelevantError
|
||||
|
||||
/**
|
||||
* Thrown to indicate that a flow was not designed for RPC and should be started from an RPC client.
|
||||
*/
|
||||
class NonRpcFlowException(logicType: Class<*>) : IllegalArgumentException("${logicType.name} was not designed for RPC"), ClientRelevantError
|
||||
|
||||
/**
|
||||
* An [Exception] to signal RPC clients that something went wrong within a Corda node.
|
||||
* The message is generic on purpose, as this prevents internal information from reaching RPC clients.
|
||||
* Leaking internal information outside can compromise privacy e.g., party names and security e.g., passwords, stacktraces, etc.
|
||||
*
|
||||
* @param errorIdentifier an optional identifier for tracing problems across parties.
|
||||
*/
|
||||
class InternalNodeException(private val errorIdentifier: Long? = null) : CordaRuntimeException(message), ClientRelevantError, IdentifiableException {
|
||||
|
||||
companion object {
|
||||
/**
|
||||
* Message for the exception.
|
||||
*/
|
||||
const val message = "Something went wrong within the Corda node."
|
||||
}
|
||||
|
||||
override fun getErrorId(): Long? {
|
||||
return errorIdentifier
|
||||
}
|
||||
}
|
||||
|
||||
class OutdatedNetworkParameterHashException(old: SecureHash, new: SecureHash) : CordaRuntimeException(TEMPLATE.format(old, new)), ClientRelevantError {
|
||||
|
||||
private companion object {
|
||||
private const val TEMPLATE = "Refused to accept parameters with hash %s because network map advertises update with hash %s. Please check newest version"
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Thrown to indicate that the command was rejected by the node, typically due to a special temporary mode.
|
||||
*/
|
||||
class RejectedCommandException(message: String) : CordaRuntimeException(message), ClientRelevantError
|
@ -1,9 +0,0 @@
|
||||
package net.corda.nodeapi.exceptions
|
||||
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
|
||||
/**
|
||||
* Allows an implementing [Throwable] to be propagated to RPC clients.
|
||||
*/
|
||||
@CordaSerializable
|
||||
interface RpcSerializableError
|
@ -1,15 +0,0 @@
|
||||
package net.corda.nodeapi.exceptions.adapters
|
||||
|
||||
import net.corda.core.internal.concurrent.mapError
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||
|
||||
/**
|
||||
* Adapter able to mask errors within a Corda node for RPC clients.
|
||||
*/
|
||||
@CordaSerializable
|
||||
data class InternalObfuscatingFlowHandle<RESULT>(val wrapped: FlowHandle<RESULT>) : FlowHandle<RESULT> by wrapped {
|
||||
|
||||
override val returnValue = wrapped.returnValue.mapError(InternalNodeException.Companion::obfuscateIfInternal)
|
||||
}
|
@ -1,22 +0,0 @@
|
||||
package net.corda.nodeapi.exceptions.adapters
|
||||
|
||||
import net.corda.core.internal.concurrent.mapError
|
||||
import net.corda.core.mapErrors
|
||||
import net.corda.core.messaging.FlowProgressHandle
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||
|
||||
/**
|
||||
* Adapter able to mask errors within a Corda node for RPC clients.
|
||||
*/
|
||||
@CordaSerializable
|
||||
class InternalObfuscatingFlowProgressHandle<RESULT>(val wrapped: FlowProgressHandle<RESULT>) : FlowProgressHandle<RESULT> by wrapped {
|
||||
|
||||
override val returnValue = wrapped.returnValue.mapError(InternalNodeException.Companion::obfuscateIfInternal)
|
||||
|
||||
override val progress = wrapped.progress.mapErrors(InternalNodeException.Companion::obfuscateIfInternal)
|
||||
|
||||
override val stepsTreeIndexFeed = wrapped.stepsTreeIndexFeed?.mapErrors(InternalNodeException.Companion::obfuscateIfInternal)
|
||||
|
||||
override val stepsTreeFeed = wrapped.stepsTreeFeed?.mapErrors(InternalNodeException.Companion::obfuscateIfInternal)
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
package net.corda
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.nodeapi.exceptions.RpcSerializableError
|
||||
import net.corda.core.ClientRelevantError
|
||||
|
||||
class ClientRelevantException(message: String?, cause: Throwable?) : CordaRuntimeException(message, cause), RpcSerializableError
|
||||
class ClientRelevantException(message: String?, cause: Throwable?) : CordaRuntimeException(message, cause), ClientRelevantError
|
@ -22,7 +22,6 @@ import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.services.Permissions.Companion.startFlow
|
||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
@ -57,7 +56,6 @@ class BootTests : IntegrationTest() {
|
||||
start(user.username, user.password).proxy.startFlow(::ObjectInputStreamFlow).returnValue
|
||||
assertThatThrownBy { future.getOrThrow() }
|
||||
.isInstanceOf(CordaRuntimeException::class.java)
|
||||
.hasMessageContaining(InternalNodeException.defaultMessage())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -12,7 +12,13 @@ package net.corda.node.services
|
||||
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.PartyAndReference
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.cordapp.CordappProvider
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
@ -32,10 +38,10 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.internal.cordapp.CordappProviderImpl
|
||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.driver.DriverDSL
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.internal.*
|
||||
@ -118,10 +124,10 @@ class AttachmentLoadingTests : IntegrationTest() {
|
||||
|
||||
@Test
|
||||
fun `test that attachments retrieved over the network are not used for code`() = withoutTestSerialization {
|
||||
driver {
|
||||
driver(DriverParameters(startNodesInProcess = true)) {
|
||||
installIsolatedCordappTo(bankAName)
|
||||
val (bankA, bankB) = createTwoNodes()
|
||||
assertFailsWith<InternalNodeException> {
|
||||
assertFailsWith<CordaRuntimeException>("Party C=CH,L=Zurich,O=BankB rejected session request: Don't know net.corda.finance.contracts.isolated.IsolatedDummyFlow\$Initiator") {
|
||||
bankA.rpc.startFlowDynamic(flowInitiatorClass, bankB.nodeInfo.legalIdentities.first()).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
|
@ -2,13 +2,13 @@ package net.corda.node.services.rpc
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.ClientRelevantException
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
@ -21,7 +21,7 @@ import net.corda.testing.internal.IntegrationTest
|
||||
import net.corda.testing.internal.IntegrationTestSchemas
|
||||
import net.corda.testing.internal.toDatabaseSchemaName
|
||||
import net.corda.testing.node.User
|
||||
import org.assertj.core.api.Assertions.assertThatCode
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
|
||||
import org.hibernate.exception.GenericJDBCException
|
||||
import org.junit.ClassRule
|
||||
@ -45,11 +45,10 @@ class RpcExceptionHandlingTest : IntegrationTest() {
|
||||
|
||||
val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
||||
|
||||
assertThatCode { node.rpc.startFlow(::Flow).returnValue.getOrThrow() }.isInstanceOfSatisfying(InternalNodeException::class.java) { exception ->
|
||||
assertThatThrownBy { node.rpc.startFlow(::Flow).returnValue.getOrThrow() }.isInstanceOfSatisfying(CordaRuntimeException::class.java) { exception ->
|
||||
|
||||
assertThat(exception).hasNoCause()
|
||||
assertThat(exception.stackTrace).isEmpty()
|
||||
assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage())
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -61,7 +60,7 @@ class RpcExceptionHandlingTest : IntegrationTest() {
|
||||
val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
||||
val clientRelevantMessage = "This is for the players!"
|
||||
|
||||
assertThatCode { node.rpc.startFlow(::ClientRelevantErrorFlow, clientRelevantMessage).returnValue.getOrThrow() }.isInstanceOfSatisfying(ClientRelevantException::class.java) { exception ->
|
||||
assertThatThrownBy { node.rpc.startFlow(::ClientRelevantErrorFlow, clientRelevantMessage).returnValue.getOrThrow() }.isInstanceOfSatisfying(CordaRuntimeException::class.java) { exception ->
|
||||
|
||||
assertThat(exception).hasNoCause()
|
||||
assertThat(exception.stackTrace).isEmpty()
|
||||
@ -75,7 +74,7 @@ class RpcExceptionHandlingTest : IntegrationTest() {
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
|
||||
val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
||||
val exceptionMessage = "Flow error!"
|
||||
assertThatCode { node.rpc.startFlow(::FlowExceptionFlow, exceptionMessage).returnValue.getOrThrow() }
|
||||
assertThatThrownBy { node.rpc.startFlow(::FlowExceptionFlow, exceptionMessage).returnValue.getOrThrow() }
|
||||
.isInstanceOfSatisfying(FlowException::class.java) { exception ->
|
||||
assertThat(exception).hasNoCause()
|
||||
assertThat(exception.stackTrace).isEmpty()
|
||||
@ -91,12 +90,10 @@ class RpcExceptionHandlingTest : IntegrationTest() {
|
||||
val nodeA = startNode(NodeParameters(providedName = ALICE_NAME, rpcUsers = users)).getOrThrow()
|
||||
val nodeB = startNode(NodeParameters(providedName = BOB_NAME, rpcUsers = users)).getOrThrow()
|
||||
|
||||
assertThatCode { nodeA.rpc.startFlow(::InitFlow, nodeB.nodeInfo.singleIdentity()).returnValue.getOrThrow() }
|
||||
.isInstanceOfSatisfying(InternalNodeException::class.java) { exception ->
|
||||
assertThatThrownBy { nodeA.rpc.startFlow(::InitFlow, nodeB.nodeInfo.singleIdentity()).returnValue.getOrThrow() }.isInstanceOfSatisfying(CordaRuntimeException::class.java) { exception ->
|
||||
|
||||
assertThat(exception).hasNoCause()
|
||||
assertThat(exception.stackTrace).isEmpty()
|
||||
assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -74,6 +74,8 @@ import net.corda.node.internal.cordapp.CordappConfigFileProvider
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.internal.cordapp.CordappProviderImpl
|
||||
import net.corda.node.internal.cordapp.CordappProviderInternal
|
||||
import net.corda.node.internal.rpc.proxies.AuthenticatedRpcOpsProxy
|
||||
import net.corda.node.internal.rpc.proxies.ExceptionSerialisingRpcOpsProxy
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.ContractUpgradeHandler
|
||||
import net.corda.node.services.FinalityHandler
|
||||
@ -245,7 +247,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
/** The implementation of the [CordaRPCOps] interface used by this node. */
|
||||
open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence, smm: StateMachineManager): CordaRPCOps {
|
||||
|
||||
return SecureCordaRPCOps(services, smm, database, flowStarter, { shutdownExecutor.submit { stop() } })
|
||||
val ops: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter, { shutdownExecutor.submit { stop() } })
|
||||
// Mind that order is relevant here.
|
||||
val proxies = listOf(::AuthenticatedRpcOpsProxy, ::ExceptionSerialisingRpcOpsProxy)
|
||||
return proxies.fold(ops) { delegate, decorate -> decorate(delegate) }
|
||||
}
|
||||
|
||||
private fun initCertificate() {
|
||||
|
@ -35,12 +35,12 @@ import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.vault.*
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.api.FlowStarter
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.messaging.context
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.nodeapi.exceptions.NonRpcFlowException
|
||||
import net.corda.nodeapi.exceptions.RejectedCommandException
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import rx.Observable
|
||||
@ -183,7 +183,7 @@ internal class CordaRPCOpsImpl(
|
||||
}
|
||||
|
||||
private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, args: Array<out Any?>): FlowStateMachine<T> {
|
||||
require(logicType.isAnnotationPresent(StartableByRPC::class.java)) { "${logicType.name} was not designed for RPC" }
|
||||
if (!logicType.isAnnotationPresent(StartableByRPC::class.java)) throw NonRpcFlowException(logicType)
|
||||
if (isFlowsDrainingModeEnabled()) {
|
||||
throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.")
|
||||
}
|
||||
@ -335,8 +335,4 @@ internal class CordaRPCOpsImpl(
|
||||
is InvocationOrigin.Scheduled -> FlowInitiator.Scheduled((origin as InvocationOrigin.Scheduled).scheduledState)
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.node.internal
|
||||
|
||||
import java.lang.reflect.InvocationHandler
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Method
|
||||
|
||||
/**
|
||||
* Helps writing correct [InvocationHandler]s.
|
||||
*/
|
||||
internal interface InvocationHandlerTemplate : InvocationHandler {
|
||||
val delegate: Any
|
||||
|
||||
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any? {
|
||||
val args = arguments ?: emptyArray()
|
||||
return try {
|
||||
method.invoke(delegate, *args)
|
||||
} catch (e: InvocationTargetException) {
|
||||
throw e.targetException
|
||||
}
|
||||
}
|
||||
}
|
@ -92,7 +92,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
|
||||
fun printWarning(message: String) {
|
||||
Emoji.renderIfSupported {
|
||||
println("${Emoji.warningSign} ATTENTION: ${message}")
|
||||
println("${Emoji.warningSign} ATTENTION: $message")
|
||||
}
|
||||
staticLog.warn(message)
|
||||
}
|
||||
@ -196,7 +196,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
val serverAddress = configuration.messagingServerAddress
|
||||
?: NetworkHostAndPort("localhost", configuration.p2pAddress.port)
|
||||
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) {
|
||||
BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress)
|
||||
BrokerAddresses(configuration.rpcOptions.address, configuration.rpcOptions.adminAddress)
|
||||
} else {
|
||||
startLocalRpcBroker()
|
||||
}
|
||||
@ -320,10 +320,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
// Start up the MQ clients.
|
||||
internalRpcMessagingClient?.run {
|
||||
runOnStop += this::close
|
||||
when (rpcOps) {
|
||||
is SecureCordaRPCOps -> init(RpcExceptionHandlingProxy(rpcOps), securityManager)
|
||||
else -> init(rpcOps, securityManager)
|
||||
}
|
||||
init(rpcOps, securityManager)
|
||||
}
|
||||
verifierMessagingClient?.run {
|
||||
runOnStop += this::stop
|
||||
@ -415,7 +412,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
SerializationFactoryImpl().apply {
|
||||
registerScheme(AMQPServerSerializationScheme(cordappLoader.cordapps))
|
||||
registerScheme(AMQPClientSerializationScheme(cordappLoader.cordapps))
|
||||
registerScheme(KryoServerSerializationScheme() )
|
||||
registerScheme(KryoServerSerializationScheme())
|
||||
},
|
||||
p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader),
|
||||
rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader),
|
||||
|
@ -1,201 +0,0 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.node.internal
|
||||
|
||||
import net.corda.client.rpc.PermissionException
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.ParametersUpdateInfo
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.vault.*
|
||||
import net.corda.node.services.messaging.RpcAuthContext
|
||||
import java.io.InputStream
|
||||
import java.security.PublicKey
|
||||
|
||||
// TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140
|
||||
class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val context: () -> RpcAuthContext) : CordaRPCOps {
|
||||
override fun networkParametersFeed(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> = guard("networkParametersFeed") {
|
||||
implementation.networkParametersFeed()
|
||||
}
|
||||
|
||||
override fun acceptNewNetworkParameters(parametersHash: SecureHash) = guard("acceptNewNetworkParameters") {
|
||||
implementation.acceptNewNetworkParameters(parametersHash)
|
||||
}
|
||||
|
||||
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String): SecureHash = guard("uploadAttachmentWithMetadata") {
|
||||
implementation.uploadAttachmentWithMetadata(jar, uploader, filename)
|
||||
}
|
||||
|
||||
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> = guard("queryAttachments") {
|
||||
implementation.queryAttachments(query, sorting)
|
||||
}
|
||||
|
||||
override fun stateMachinesSnapshot() = guard("stateMachinesSnapshot") {
|
||||
implementation.stateMachinesSnapshot()
|
||||
}
|
||||
|
||||
override fun stateMachinesFeed() = guard("stateMachinesFeed") {
|
||||
implementation.stateMachinesFeed()
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultQueryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>) = guard("vaultQueryBy") {
|
||||
implementation.vaultQueryBy(criteria, paging, sorting, contractStateType)
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultTrackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>) = guard("vaultTrackBy") {
|
||||
implementation.vaultTrackBy(criteria, paging, sorting, contractStateType)
|
||||
}
|
||||
|
||||
@Suppress("DEPRECATION", "OverridingDeprecatedMember")
|
||||
override fun internalVerifiedTransactionsSnapshot() = guard("internalVerifiedTransactionsSnapshot", implementation::internalVerifiedTransactionsSnapshot)
|
||||
|
||||
@Suppress("DEPRECATION", "OverridingDeprecatedMember")
|
||||
override fun internalVerifiedTransactionsFeed() = guard("internalVerifiedTransactionsFeed", implementation::internalVerifiedTransactionsFeed)
|
||||
|
||||
override fun stateMachineRecordedTransactionMappingSnapshot() = guard("stateMachineRecordedTransactionMappingSnapshot", implementation::stateMachineRecordedTransactionMappingSnapshot)
|
||||
|
||||
override fun stateMachineRecordedTransactionMappingFeed() = guard("stateMachineRecordedTransactionMappingFeed", implementation::stateMachineRecordedTransactionMappingFeed)
|
||||
|
||||
override fun networkMapSnapshot(): List<NodeInfo> = guard("networkMapSnapshot", implementation::networkMapSnapshot)
|
||||
|
||||
override fun networkMapFeed(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange> = guard("networkMapFeed", implementation::networkMapFeed)
|
||||
|
||||
override fun <T> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?) = guard("startFlowDynamic", listOf(logicType)) {
|
||||
implementation.startFlowDynamic(logicType, *args)
|
||||
}
|
||||
|
||||
override fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?) = guard("startTrackedFlowDynamic", listOf(logicType)) {
|
||||
implementation.startTrackedFlowDynamic(logicType, *args)
|
||||
}
|
||||
|
||||
override fun killFlow(id: StateMachineRunId): Boolean = guard("killFlow") {
|
||||
return implementation.killFlow(id)
|
||||
}
|
||||
|
||||
override fun nodeInfo(): NodeInfo = guard("nodeInfo", implementation::nodeInfo)
|
||||
|
||||
override fun notaryIdentities(): List<Party> = guard("notaryIdentities", implementation::notaryIdentities)
|
||||
|
||||
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) = guard("addVaultTransactionNote") {
|
||||
implementation.addVaultTransactionNote(txnId, txnNote)
|
||||
}
|
||||
|
||||
override fun getVaultTransactionNotes(txnId: SecureHash): Iterable<String> = guard("getVaultTransactionNotes") {
|
||||
implementation.getVaultTransactionNotes(txnId)
|
||||
}
|
||||
|
||||
override fun attachmentExists(id: SecureHash) = guard("attachmentExists") {
|
||||
implementation.attachmentExists(id)
|
||||
}
|
||||
|
||||
override fun openAttachment(id: SecureHash) = guard("openAttachment") {
|
||||
implementation.openAttachment(id)
|
||||
}
|
||||
|
||||
override fun uploadAttachment(jar: InputStream) = guard("uploadAttachment") {
|
||||
implementation.uploadAttachment(jar)
|
||||
}
|
||||
|
||||
override fun currentNodeTime() = guard("currentNodeTime", implementation::currentNodeTime)
|
||||
|
||||
override fun waitUntilNetworkReady() = guard("waitUntilNetworkReady", implementation::waitUntilNetworkReady)
|
||||
|
||||
override fun wellKnownPartyFromAnonymous(party: AbstractParty) = guard("wellKnownPartyFromAnonymous") {
|
||||
implementation.wellKnownPartyFromAnonymous(party)
|
||||
}
|
||||
|
||||
override fun partyFromKey(key: PublicKey) = guard("partyFromKey") {
|
||||
implementation.partyFromKey(key)
|
||||
}
|
||||
|
||||
override fun wellKnownPartyFromX500Name(x500Name: CordaX500Name) = guard("wellKnownPartyFromX500Name") {
|
||||
implementation.wellKnownPartyFromX500Name(x500Name)
|
||||
}
|
||||
|
||||
override fun notaryPartyFromX500Name(x500Name: CordaX500Name) = guard("notaryPartyFromX500Name") {
|
||||
implementation.notaryPartyFromX500Name(x500Name)
|
||||
}
|
||||
|
||||
override fun partiesFromName(query: String, exactMatch: Boolean) = guard("partiesFromName") {
|
||||
implementation.partiesFromName(query, exactMatch)
|
||||
}
|
||||
|
||||
override fun registeredFlows() = guard("registeredFlows", implementation::registeredFlows)
|
||||
|
||||
override fun nodeInfoFromParty(party: AbstractParty) = guard("nodeInfoFromParty") {
|
||||
implementation.nodeInfoFromParty(party)
|
||||
}
|
||||
|
||||
override fun clearNetworkMapCache() = guard("clearNetworkMapCache", implementation::clearNetworkMapCache)
|
||||
|
||||
override fun <T : ContractState> vaultQuery(contractStateType: Class<out T>): Vault.Page<T> = guard("vaultQuery") {
|
||||
implementation.vaultQuery(contractStateType)
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>): Vault.Page<T> = guard("vaultQueryByCriteria") {
|
||||
implementation.vaultQueryByCriteria(criteria, contractStateType)
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): Vault.Page<T> = guard("vaultQueryByWithPagingSpec") {
|
||||
implementation.vaultQueryByWithPagingSpec(contractStateType, criteria, paging)
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): Vault.Page<T> = guard("vaultQueryByWithSorting") {
|
||||
implementation.vaultQueryByWithSorting(contractStateType, criteria, sorting)
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultTrack(contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> = guard("vaultTrack") {
|
||||
implementation.vaultTrack(contractStateType)
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria): DataFeed<Vault.Page<T>, Vault.Update<T>> = guard("vaultTrackByCriteria") {
|
||||
implementation.vaultTrackByCriteria(contractStateType, criteria)
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): DataFeed<Vault.Page<T>, Vault.Update<T>> = guard("vaultTrackByWithPagingSpec") {
|
||||
implementation.vaultTrackByWithPagingSpec(contractStateType, criteria, paging)
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): DataFeed<Vault.Page<T>, Vault.Update<T>> = guard("vaultTrackByWithSorting") {
|
||||
implementation.vaultTrackByWithSorting(contractStateType, criteria, sorting)
|
||||
}
|
||||
|
||||
override fun setFlowsDrainingModeEnabled(enabled: Boolean) = guard("setFlowsDrainingModeEnabled") {
|
||||
implementation.setFlowsDrainingModeEnabled(enabled)
|
||||
}
|
||||
|
||||
override fun isFlowsDrainingModeEnabled(): Boolean = guard("isFlowsDrainingModeEnabled", implementation::isFlowsDrainingModeEnabled)
|
||||
|
||||
override fun shutdown() = guard("shutdown", implementation::shutdown)
|
||||
|
||||
// TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140
|
||||
private inline fun <RESULT> guard(methodName: String, action: () -> RESULT) = guard(methodName, emptyList(), action)
|
||||
|
||||
// TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140
|
||||
private inline fun <RESULT> guard(methodName: String, args: List<Class<*>>, action: () -> RESULT) : RESULT {
|
||||
if (!context().isPermitted(methodName, *(args.map { it.name }.toTypedArray()))) {
|
||||
throw PermissionException("User not authorized to perform RPC call $methodName with target $args")
|
||||
}
|
||||
else {
|
||||
return action()
|
||||
}
|
||||
}
|
||||
}
|
@ -1,154 +0,0 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.doOnError
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.concurrent.doOnError
|
||||
import net.corda.core.internal.concurrent.mapError
|
||||
import net.corda.core.mapErrors
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.messaging.FlowProgressHandle
|
||||
import net.corda.core.node.services.vault.*
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||
import net.corda.nodeapi.exceptions.adapters.InternalObfuscatingFlowHandle
|
||||
import net.corda.nodeapi.exceptions.adapters.InternalObfuscatingFlowProgressHandle
|
||||
import java.io.InputStream
|
||||
import java.security.PublicKey
|
||||
|
||||
class RpcExceptionHandlingProxy(private val delegate: SecureCordaRPCOps) : CordaRPCOps {
|
||||
|
||||
private companion object {
|
||||
private val logger = loggerFor<RpcExceptionHandlingProxy>()
|
||||
}
|
||||
|
||||
override val protocolVersion: Int get() = delegate.protocolVersion
|
||||
|
||||
override fun <T> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> = wrap {
|
||||
|
||||
val handle = delegate.startFlowDynamic(logicType, *args)
|
||||
val result = InternalObfuscatingFlowHandle(handle)
|
||||
result.returnValue.doOnError { error -> logger.error(error.message, error) }
|
||||
result
|
||||
}
|
||||
|
||||
override fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> = wrap {
|
||||
|
||||
val handle = delegate.startTrackedFlowDynamic(logicType, *args)
|
||||
val result = InternalObfuscatingFlowProgressHandle(handle)
|
||||
result.returnValue.doOnError { error -> logger.error(error.message, error) }
|
||||
result
|
||||
}
|
||||
|
||||
override fun waitUntilNetworkReady() = wrapFuture(delegate::waitUntilNetworkReady)
|
||||
|
||||
override fun stateMachinesFeed() = wrapFeed(delegate::stateMachinesFeed)
|
||||
|
||||
override fun <T : ContractState> vaultTrackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>) = wrapFeed { delegate.vaultTrackBy(criteria, paging, sorting, contractStateType) }
|
||||
|
||||
override fun <T : ContractState> vaultTrack(contractStateType: Class<out T>) = wrapFeed { delegate.vaultTrack(contractStateType) }
|
||||
|
||||
override fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria) = wrapFeed { delegate.vaultTrackByCriteria(contractStateType, criteria) }
|
||||
|
||||
override fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification) = wrapFeed { delegate.vaultTrackByWithPagingSpec(contractStateType, criteria, paging) }
|
||||
|
||||
override fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort) = wrapFeed { delegate.vaultTrackByWithSorting(contractStateType, criteria, sorting) }
|
||||
|
||||
override fun stateMachineRecordedTransactionMappingFeed() = wrapFeed(delegate::stateMachineRecordedTransactionMappingFeed)
|
||||
|
||||
override fun networkMapFeed() = wrapFeed(delegate::networkMapFeed)
|
||||
|
||||
override fun networkParametersFeed() = wrapFeed(delegate::networkParametersFeed)
|
||||
|
||||
@Suppress("DEPRECATION", "OverridingDeprecatedMember")
|
||||
override fun internalVerifiedTransactionsFeed() = wrapFeed(delegate::internalVerifiedTransactionsFeed)
|
||||
|
||||
override fun stateMachinesSnapshot() = wrap(delegate::stateMachinesSnapshot)
|
||||
|
||||
override fun <T : ContractState> vaultQueryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>) = wrap { delegate.vaultQueryBy(criteria, paging, sorting, contractStateType) }
|
||||
|
||||
override fun <T : ContractState> vaultQuery(contractStateType: Class<out T>) = wrap { delegate.vaultQuery(contractStateType) }
|
||||
|
||||
override fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>) = wrap { delegate.vaultQueryByCriteria(criteria, contractStateType) }
|
||||
|
||||
override fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification) = wrap { delegate.vaultQueryByWithPagingSpec(contractStateType, criteria, paging) }
|
||||
|
||||
override fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort) = wrap { delegate.vaultQueryByWithSorting(contractStateType, criteria, sorting) }
|
||||
|
||||
@Suppress("DEPRECATION", "OverridingDeprecatedMember")
|
||||
override fun internalVerifiedTransactionsSnapshot() = wrap(delegate::internalVerifiedTransactionsSnapshot)
|
||||
|
||||
override fun stateMachineRecordedTransactionMappingSnapshot() = wrap(delegate::stateMachineRecordedTransactionMappingSnapshot)
|
||||
|
||||
override fun networkMapSnapshot() = wrap(delegate::networkMapSnapshot)
|
||||
|
||||
override fun acceptNewNetworkParameters(parametersHash: SecureHash) = wrap { delegate.acceptNewNetworkParameters(parametersHash) }
|
||||
|
||||
override fun killFlow(id: StateMachineRunId) = wrap { delegate.killFlow(id) }
|
||||
|
||||
override fun nodeInfo() = wrap(delegate::nodeInfo)
|
||||
|
||||
override fun notaryIdentities() = wrap(delegate::notaryIdentities)
|
||||
|
||||
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) = wrap { delegate.addVaultTransactionNote(txnId, txnNote) }
|
||||
|
||||
override fun getVaultTransactionNotes(txnId: SecureHash) = wrap { delegate.getVaultTransactionNotes(txnId) }
|
||||
|
||||
override fun attachmentExists(id: SecureHash) = wrap { delegate.attachmentExists(id) }
|
||||
|
||||
override fun openAttachment(id: SecureHash) = wrap { delegate.openAttachment(id) }
|
||||
|
||||
override fun uploadAttachment(jar: InputStream) = wrap { delegate.uploadAttachment(jar) }
|
||||
|
||||
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String) = wrap { delegate.uploadAttachmentWithMetadata(jar, uploader, filename) }
|
||||
|
||||
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?) = wrap { delegate.queryAttachments(query, sorting) }
|
||||
|
||||
override fun currentNodeTime() = wrap(delegate::currentNodeTime)
|
||||
|
||||
override fun wellKnownPartyFromAnonymous(party: AbstractParty) = wrap { delegate.wellKnownPartyFromAnonymous(party) }
|
||||
|
||||
override fun partyFromKey(key: PublicKey) = wrap { delegate.partyFromKey(key) }
|
||||
|
||||
override fun wellKnownPartyFromX500Name(x500Name: CordaX500Name) = wrap { delegate.wellKnownPartyFromX500Name(x500Name) }
|
||||
|
||||
override fun notaryPartyFromX500Name(x500Name: CordaX500Name) = wrap { delegate.notaryPartyFromX500Name(x500Name) }
|
||||
|
||||
override fun partiesFromName(query: String, exactMatch: Boolean) = wrap { delegate.partiesFromName(query, exactMatch) }
|
||||
|
||||
override fun registeredFlows() = wrap(delegate::registeredFlows)
|
||||
|
||||
override fun nodeInfoFromParty(party: AbstractParty) = wrap { delegate.nodeInfoFromParty(party) }
|
||||
|
||||
override fun clearNetworkMapCache() = wrap(delegate::clearNetworkMapCache)
|
||||
|
||||
override fun setFlowsDrainingModeEnabled(enabled: Boolean) = wrap { delegate.setFlowsDrainingModeEnabled(enabled) }
|
||||
|
||||
override fun isFlowsDrainingModeEnabled() = wrap(delegate::isFlowsDrainingModeEnabled)
|
||||
|
||||
override fun shutdown() = wrap(delegate::shutdown)
|
||||
|
||||
private fun <RESULT> wrap(call: () -> RESULT): RESULT {
|
||||
|
||||
return try {
|
||||
call.invoke()
|
||||
} catch (error: Throwable) {
|
||||
logger.error(error.message, error)
|
||||
throw InternalNodeException.obfuscateIfInternal(error)
|
||||
}
|
||||
}
|
||||
|
||||
private fun <SNAPSHOT, ELEMENT> wrapFeed(call: () -> DataFeed<SNAPSHOT, ELEMENT>) = wrap {
|
||||
|
||||
call.invoke().doOnError { error -> logger.error(error.message, error) }.mapErrors(InternalNodeException.Companion::obfuscateIfInternal)
|
||||
}
|
||||
|
||||
private fun <RESULT> wrapFuture(call: () -> CordaFuture<RESULT>): CordaFuture<RESULT> = wrap { call.invoke().mapError(InternalNodeException.Companion::obfuscateIfInternal).doOnError { error -> logger.error(error.message, error) } }
|
||||
}
|
@ -1,35 +0,0 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.node.internal
|
||||
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.node.services.api.FlowStarter
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.messaging.rpcContext
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
|
||||
/**
|
||||
* Implementation of [CordaRPCOps] that checks authorisation.
|
||||
*/
|
||||
class SecureCordaRPCOps(services: ServiceHubInternal,
|
||||
smm: StateMachineManager,
|
||||
database: CordaPersistence,
|
||||
flowStarter: FlowStarter,
|
||||
shutdownNode: () -> Unit,
|
||||
val unsafe: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter, shutdownNode)) : CordaRPCOps by RpcAuthorisationProxy(unsafe, ::rpcContext) {
|
||||
|
||||
/**
|
||||
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
|
||||
* to be present.
|
||||
*/
|
||||
override val protocolVersion: Int get() = unsafe.nodeInfo().platformVersion
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.node.internal.rpc.proxies
|
||||
|
||||
import net.corda.client.rpc.PermissionException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.node.internal.InvocationHandlerTemplate
|
||||
import net.corda.node.services.messaging.RpcAuthContext
|
||||
import net.corda.node.services.messaging.rpcContext
|
||||
import java.lang.reflect.Method
|
||||
import java.lang.reflect.Proxy
|
||||
|
||||
/**
|
||||
* Implementation of [CordaRPCOps] that checks authorisation.
|
||||
*/
|
||||
internal class AuthenticatedRpcOpsProxy(private val delegate: CordaRPCOps) : CordaRPCOps by proxy(delegate, ::rpcContext) {
|
||||
/**
|
||||
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
|
||||
* to be present.
|
||||
*/
|
||||
override val protocolVersion: Int get() = delegate.nodeInfo().platformVersion
|
||||
|
||||
// Need overriding to pass additional `listOf(logicType)` argument for polymorphic `startFlow` permissions.
|
||||
override fun <T> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?) = guard("startFlowDynamic", listOf(logicType), ::rpcContext) {
|
||||
delegate.startFlowDynamic(logicType, *args)
|
||||
}
|
||||
|
||||
// Need overriding to pass additional `listOf(logicType)` argument for polymorphic `startFlow` permissions.
|
||||
override fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?) = guard("startTrackedFlowDynamic", listOf(logicType), ::rpcContext) {
|
||||
delegate.startTrackedFlowDynamic(logicType, *args)
|
||||
}
|
||||
|
||||
private companion object {
|
||||
private fun proxy(delegate: CordaRPCOps, context: () -> RpcAuthContext): CordaRPCOps {
|
||||
|
||||
val handler = PermissionsEnforcingInvocationHandler(delegate, context)
|
||||
return Proxy.newProxyInstance(delegate::class.java.classLoader, arrayOf(CordaRPCOps::class.java), handler) as CordaRPCOps
|
||||
}
|
||||
}
|
||||
|
||||
private class PermissionsEnforcingInvocationHandler(override val delegate: CordaRPCOps, private val context: () -> RpcAuthContext) : InvocationHandlerTemplate {
|
||||
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?) = guard(method.name, context, { super.invoke(proxy, method, arguments) })
|
||||
}
|
||||
}
|
||||
|
||||
private fun <RESULT> guard(methodName: String, context: () -> RpcAuthContext, action: () -> RESULT) = guard(methodName, emptyList(), context, action)
|
||||
|
||||
private fun <RESULT> guard(methodName: String, args: List<Class<*>>, context: () -> RpcAuthContext, action: () -> RESULT): RESULT {
|
||||
if (!context().isPermitted(methodName, *(args.map { it.name }.toTypedArray()))) {
|
||||
throw PermissionException("User not authorized to perform RPC call $methodName with target $args")
|
||||
} else {
|
||||
return action()
|
||||
}
|
||||
}
|
@ -0,0 +1,110 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.node.internal.rpc.proxies
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.CordaThrowable
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.internal.concurrent.mapError
|
||||
import net.corda.core.mapErrors
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.messaging.FlowHandleImpl
|
||||
import net.corda.core.messaging.FlowProgressHandle
|
||||
import net.corda.core.messaging.FlowProgressHandleImpl
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.node.internal.InvocationHandlerTemplate
|
||||
import rx.Observable
|
||||
import java.lang.reflect.Method
|
||||
import java.lang.reflect.Proxy.newProxyInstance
|
||||
|
||||
internal class ExceptionSerialisingRpcOpsProxy(private val delegate: CordaRPCOps) : CordaRPCOps by proxy(delegate) {
|
||||
private companion object {
|
||||
private fun proxy(delegate: CordaRPCOps): CordaRPCOps {
|
||||
val handler = ErrorSerialisingInvocationHandler(delegate)
|
||||
return newProxyInstance(delegate::class.java.classLoader, arrayOf(CordaRPCOps::class.java), handler) as CordaRPCOps
|
||||
}
|
||||
}
|
||||
|
||||
private class ErrorSerialisingInvocationHandler(override val delegate: CordaRPCOps) : InvocationHandlerTemplate {
|
||||
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any? {
|
||||
try {
|
||||
val result = super.invoke(proxy, method, arguments)
|
||||
return result?.let { ensureSerialisable(it) }
|
||||
} catch (exception: Exception) {
|
||||
throw ensureSerialisable(exception)
|
||||
}
|
||||
}
|
||||
|
||||
private fun <RESULT : Any> ensureSerialisable(result: RESULT): Any {
|
||||
return when (result) {
|
||||
is CordaFuture<*> -> wrapFuture(result)
|
||||
is DataFeed<*, *> -> wrapFeed(result)
|
||||
is FlowProgressHandle<*> -> wrapFlowProgressHandle(result)
|
||||
is FlowHandle<*> -> wrapFlowHandle(result)
|
||||
is Observable<*> -> wrapObservable(result)
|
||||
else -> result
|
||||
}
|
||||
}
|
||||
|
||||
private fun wrapFlowProgressHandle(handle: FlowProgressHandle<*>): FlowProgressHandle<*> {
|
||||
val returnValue = wrapFuture(handle.returnValue)
|
||||
val progress = wrapObservable(handle.progress)
|
||||
val stepsTreeIndexFeed = handle.stepsTreeIndexFeed?.let { wrapFeed(it) }
|
||||
val stepsTreeFeed = handle.stepsTreeFeed?.let { wrapFeed(it) }
|
||||
|
||||
return FlowProgressHandleImpl(handle.id, returnValue, progress, stepsTreeIndexFeed, stepsTreeFeed)
|
||||
}
|
||||
|
||||
private fun wrapFlowHandle(handle: FlowHandle<*>): FlowHandle<*> {
|
||||
return FlowHandleImpl(handle.id, wrapFuture(handle.returnValue))
|
||||
}
|
||||
|
||||
private fun <ELEMENT> wrapObservable(observable: Observable<ELEMENT>): Observable<ELEMENT> {
|
||||
return observable.mapErrors(::ensureSerialisable)
|
||||
}
|
||||
|
||||
private fun <SNAPSHOT, ELEMENT> wrapFeed(feed: DataFeed<SNAPSHOT, ELEMENT>): DataFeed<SNAPSHOT, ELEMENT> {
|
||||
return feed.mapErrors(::ensureSerialisable)
|
||||
}
|
||||
|
||||
private fun <RESULT> wrapFuture(future: CordaFuture<RESULT>): CordaFuture<RESULT> {
|
||||
return future.mapError(::ensureSerialisable)
|
||||
}
|
||||
|
||||
private fun ensureSerialisable(error: Throwable): Throwable {
|
||||
val serialisable = (superclasses(error::class.java) + error::class.java).any { it.isAnnotationPresent(CordaSerializable::class.java) || it.interfaces.any { it.isAnnotationPresent(CordaSerializable::class.java) } }
|
||||
val result = if (serialisable) error else CordaRuntimeException(error.message, error)
|
||||
if (result is CordaThrowable) {
|
||||
result.stackTrace = arrayOf<StackTraceElement>()
|
||||
result.setCause(null)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
private fun superclasses(clazz: Class<*>): List<Class<*>> {
|
||||
val superclasses = mutableListOf<Class<*>>()
|
||||
var current: Class<*>?
|
||||
var superclass = clazz.superclass
|
||||
while (superclass != null) {
|
||||
superclasses += superclass
|
||||
current = superclass
|
||||
superclass = current.superclass
|
||||
}
|
||||
return superclasses
|
||||
}
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
return "ExceptionSerialisingRpcOpsProxy"
|
||||
}
|
||||
}
|
@ -36,6 +36,7 @@ import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
|
||||
import net.corda.node.utilities.NonInvalidatingCache
|
||||
import net.corda.node.utilities.NonInvalidatingWeightBasedCache
|
||||
import net.corda.nodeapi.exceptions.DuplicateAttachmentException
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import net.corda.nodeapi.internal.withContractsInJar
|
||||
@ -297,7 +298,7 @@ class NodeAttachmentService(
|
||||
log.info("Stored new attachment $id")
|
||||
id
|
||||
} else {
|
||||
throw java.nio.file.FileAlreadyExistsException(id.toString())
|
||||
throw DuplicateAttachmentException(id.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -41,13 +41,13 @@ import net.corda.finance.USD
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.finance.flows.CashPaymentFlow
|
||||
import net.corda.node.internal.SecureCordaRPCOps
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||
import net.corda.node.services.Permissions.Companion.invokeRpc
|
||||
import net.corda.node.services.Permissions.Companion.startFlow
|
||||
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
|
||||
import net.corda.node.services.messaging.RpcAuthContext
|
||||
import net.corda.nodeapi.exceptions.NonRpcFlowException
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.expect
|
||||
@ -98,7 +98,7 @@ class CordaRPCOpsImplTest {
|
||||
fun setup() {
|
||||
mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.finance.contracts.asset", "net.corda.finance.schemas"))
|
||||
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
|
||||
rpc = SecureCordaRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services, { })
|
||||
rpc = aliceNode.rpcOps
|
||||
CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet())))
|
||||
|
||||
mockNet.runNetwork()
|
||||
@ -276,10 +276,10 @@ class CordaRPCOpsImplTest {
|
||||
@Test
|
||||
fun `can't upload the same attachment`() {
|
||||
withPermissions(invokeRpc(CordaRPCOps::uploadAttachment), invokeRpc(CordaRPCOps::attachmentExists)) {
|
||||
val inputJar1 = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar)
|
||||
val inputJar2 = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar)
|
||||
val secureHash1 = rpc.uploadAttachment(inputJar1)
|
||||
assertThatExceptionOfType(java.nio.file.FileAlreadyExistsException::class.java).isThrownBy {
|
||||
val inputJar1 = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar)
|
||||
val inputJar2 = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar)
|
||||
val secureHash1 = rpc.uploadAttachment(inputJar1)
|
||||
val secureHash2 = rpc.uploadAttachment(inputJar2)
|
||||
}
|
||||
}
|
||||
@ -303,7 +303,7 @@ class CordaRPCOpsImplTest {
|
||||
@Test
|
||||
fun `attempt to start non-RPC flow`() {
|
||||
withPermissions(startFlow<NonRPCFlow>()) {
|
||||
assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy {
|
||||
assertThatExceptionOfType(NonRpcFlowException::class.java).isThrownBy {
|
||||
rpc.startFlow(::NonRPCFlow)
|
||||
}
|
||||
}
|
||||
|
@ -116,6 +116,19 @@ class FlowFrameworkTests {
|
||||
assertThat(flow.lazyTime).isNotNull()
|
||||
}
|
||||
|
||||
class SuspendThrowingActionExecutor(private val exception: Exception, val delegate: ActionExecutor) : ActionExecutor {
|
||||
var thrown = false
|
||||
@Suspendable
|
||||
override fun executeAction(fiber: FlowFiber, action: Action) {
|
||||
if (action is Action.CommitTransaction && !thrown) {
|
||||
thrown = true
|
||||
throw exception
|
||||
} else {
|
||||
delegate.executeAction(fiber, action)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `exception while fiber suspended`() {
|
||||
bobNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
|
||||
@ -123,7 +136,7 @@ class FlowFrameworkTests {
|
||||
val fiber = aliceNode.services.startFlow(flow) as FlowStateMachineImpl
|
||||
// Before the flow runs change the suspend action to throw an exception
|
||||
val exceptionDuringSuspend = Exception("Thrown during suspend")
|
||||
val throwingActionExecutor = ThrowingActionExecutor(exceptionDuringSuspend, fiber.transientValues!!.value.actionExecutor)
|
||||
val throwingActionExecutor = SuspendThrowingActionExecutor(exceptionDuringSuspend, fiber.transientValues!!.value.actionExecutor)
|
||||
fiber.transientValues = TransientReference(fiber.transientValues!!.value.copy(actionExecutor = throwingActionExecutor))
|
||||
mockNet.runNetwork()
|
||||
assertThatThrownBy {
|
||||
|
Loading…
x
Reference in New Issue
Block a user