mirror of
https://github.com/corda/corda.git
synced 2025-01-18 10:46:38 +00:00
REVERT - CORDA-1264 - Needs more thought prior to release (#2952)
* REVERT - CORDA-1264 - Needs more thought prior to release
This reverts commit 33af80ac55
.
Since this causes regressions in the way errors are propogated to
clients it seems best to not break or hold up 3.1 but revert this commit
and move on
* fix revert
This commit is contained in:
parent
4e4901825a
commit
ddec72674c
@ -4356,7 +4356,7 @@ public static final class net.corda.client.rpc.CordaRPCClientConfiguration$Compa
|
|||||||
public int getServerProtocolVersion()
|
public int getServerProtocolVersion()
|
||||||
public void notifyServerAndClose()
|
public void notifyServerAndClose()
|
||||||
##
|
##
|
||||||
public final class net.corda.client.rpc.PermissionException extends net.corda.core.CordaRuntimeException implements net.corda.nodeapi.exceptions.RpcSerializableError
|
public final class net.corda.client.rpc.PermissionException extends net.corda.core.CordaRuntimeException
|
||||||
public <init>(String)
|
public <init>(String)
|
||||||
##
|
##
|
||||||
@net.corda.core.DoNotImplement public interface net.corda.client.rpc.RPCConnection extends java.io.Closeable
|
@net.corda.core.DoNotImplement public interface net.corda.client.rpc.RPCConnection extends java.io.Closeable
|
||||||
|
@ -12,6 +12,7 @@ import net.corda.finance.DOLLARS
|
|||||||
import net.corda.finance.USD
|
import net.corda.finance.USD
|
||||||
import net.corda.finance.contracts.getCashBalance
|
import net.corda.finance.contracts.getCashBalance
|
||||||
import net.corda.finance.contracts.getCashBalances
|
import net.corda.finance.contracts.getCashBalances
|
||||||
|
import net.corda.finance.flows.CashException
|
||||||
import net.corda.finance.flows.CashIssueFlow
|
import net.corda.finance.flows.CashIssueFlow
|
||||||
import net.corda.finance.flows.CashPaymentFlow
|
import net.corda.finance.flows.CashPaymentFlow
|
||||||
import net.corda.finance.schemas.CashSchemaV1
|
import net.corda.finance.schemas.CashSchemaV1
|
||||||
@ -19,7 +20,6 @@ import net.corda.node.internal.Node
|
|||||||
import net.corda.node.internal.StartedNode
|
import net.corda.node.internal.StartedNode
|
||||||
import net.corda.node.services.Permissions.Companion.invokeRpc
|
import net.corda.node.services.Permissions.Companion.invokeRpc
|
||||||
import net.corda.node.services.Permissions.Companion.startFlow
|
import net.corda.node.services.Permissions.Companion.startFlow
|
||||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
|
||||||
import net.corda.testing.core.*
|
import net.corda.testing.core.*
|
||||||
import net.corda.testing.node.User
|
import net.corda.testing.node.User
|
||||||
import net.corda.testing.node.internal.NodeBasedTest
|
import net.corda.testing.node.internal.NodeBasedTest
|
||||||
@ -101,7 +101,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
|
|||||||
fun `sub-type of FlowException thrown by flow`() {
|
fun `sub-type of FlowException thrown by flow`() {
|
||||||
login(rpcUser.username, rpcUser.password)
|
login(rpcUser.username, rpcUser.password)
|
||||||
val handle = connection!!.proxy.startFlow(::CashPaymentFlow, 100.DOLLARS, identity)
|
val handle = connection!!.proxy.startFlow(::CashPaymentFlow, 100.DOLLARS, identity)
|
||||||
assertThatExceptionOfType(InternalNodeException::class.java).isThrownBy {
|
assertThatExceptionOfType(CashException::class.java).isThrownBy {
|
||||||
handle.returnValue.getOrThrow()
|
handle.returnValue.getOrThrow()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,10 +2,9 @@ package net.corda.client.rpc
|
|||||||
|
|
||||||
import net.corda.core.CordaRuntimeException
|
import net.corda.core.CordaRuntimeException
|
||||||
import net.corda.core.serialization.CordaSerializable
|
import net.corda.core.serialization.CordaSerializable
|
||||||
import net.corda.nodeapi.exceptions.RpcSerializableError
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thrown to indicate that the calling user does not have permission for something they have requested (for example
|
* Thrown to indicate that the calling user does not have permission for something they have requested (for example
|
||||||
* calling a method).
|
* calling a method).
|
||||||
*/
|
*/
|
||||||
class PermissionException(message: String) : CordaRuntimeException(message), RpcSerializableError
|
class PermissionException(msg: String) : CordaRuntimeException(msg)
|
||||||
|
@ -5,7 +5,6 @@ package net.corda.core
|
|||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
import net.corda.core.internal.concurrent.thenMatch
|
import net.corda.core.internal.concurrent.thenMatch
|
||||||
import net.corda.core.messaging.DataFeed
|
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.Observer
|
import rx.Observer
|
||||||
|
|
||||||
@ -45,29 +44,3 @@ fun <T> Observable<T>.toFuture(): CordaFuture<T> = openFuture<T>().also {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a [DataFeed] that transforms errors according to the provided [transform] function.
|
|
||||||
*/
|
|
||||||
fun <SNAPSHOT, ELEMENT> DataFeed<SNAPSHOT, ELEMENT>.mapErrors(transform: (Throwable) -> Throwable): DataFeed<SNAPSHOT, ELEMENT> {
|
|
||||||
|
|
||||||
return copy(updates = updates.mapErrors(transform))
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a [DataFeed] that processes errors according to the provided [action].
|
|
||||||
*/
|
|
||||||
fun <SNAPSHOT, ELEMENT> DataFeed<SNAPSHOT, ELEMENT>.doOnError(action: (Throwable) -> Unit): DataFeed<SNAPSHOT, ELEMENT> {
|
|
||||||
|
|
||||||
return copy(updates = updates.doOnError(action))
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an [Observable] that transforms errors according to the provided [transform] function.
|
|
||||||
*/
|
|
||||||
fun <ELEMENT> Observable<ELEMENT>.mapErrors(transform: (Throwable) -> Throwable): Observable<ELEMENT> {
|
|
||||||
|
|
||||||
return onErrorResumeNext { error ->
|
|
||||||
Observable.error(transform(error))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -39,29 +39,6 @@ fun <V, W> CordaFuture<out V>.map(transform: (V) -> W): CordaFuture<W> = CordaFu
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a future that will also apply the passed closure on an error.
|
|
||||||
*/
|
|
||||||
fun <RESULT> CordaFuture<out RESULT>.doOnError(accept: (Throwable) -> Unit): CordaFuture<RESULT> = CordaFutureImpl<RESULT>().also { result ->
|
|
||||||
thenMatch({
|
|
||||||
result.capture { it }
|
|
||||||
}, {
|
|
||||||
accept(it)
|
|
||||||
result.setException(it)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a future that will map an error thrown using the provided [transform] function.
|
|
||||||
*/
|
|
||||||
fun <ELEMENT> CordaFuture<out ELEMENT>.mapError(transform: (Throwable) -> Throwable): CordaFuture<ELEMENT> = CordaFutureImpl<ELEMENT>().also { result ->
|
|
||||||
thenMatch({
|
|
||||||
result.capture { it }
|
|
||||||
}, {
|
|
||||||
result.setException(transform(it))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a future that will have the same outcome as the future returned by the given transform.
|
* Returns a future that will have the same outcome as the future returned by the given transform.
|
||||||
* But if this future or the transform fails, the returned future's outcome is the same throwable.
|
* But if this future or the transform fails, the returned future's outcome is the same throwable.
|
||||||
|
@ -7,8 +7,6 @@ from the previous milestone release.
|
|||||||
Unreleased
|
Unreleased
|
||||||
----------
|
----------
|
||||||
|
|
||||||
* Errors thrown by a Corda node will now reported to a calling RPC client with attention to serialization and obfuscation of internal data.
|
|
||||||
|
|
||||||
* Update the fast-classpath-scanner dependent library version from 2.0.21 to 2.12.3
|
* Update the fast-classpath-scanner dependent library version from 2.0.21 to 2.12.3
|
||||||
|
|
||||||
.. note:: Whilst this is not the latest version of this library, that being 2.18.1 at time of writing, versions later
|
.. note:: Whilst this is not the latest version of this library, that being 2.18.1 at time of writing, versions later
|
||||||
|
@ -1,32 +0,0 @@
|
|||||||
package net.corda.nodeapi.exceptions
|
|
||||||
|
|
||||||
import net.corda.core.CordaRuntimeException
|
|
||||||
import java.io.InvalidClassException
|
|
||||||
|
|
||||||
// could change to use package name matching but trying to avoid reflection for now
|
|
||||||
private val whitelisted = setOf(
|
|
||||||
InvalidClassException::class,
|
|
||||||
RpcSerializableError::class
|
|
||||||
)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An [Exception] to signal RPC clients that something went wrong within a Corda node.
|
|
||||||
*/
|
|
||||||
class InternalNodeException(message: String) : CordaRuntimeException(message) {
|
|
||||||
|
|
||||||
companion object {
|
|
||||||
|
|
||||||
private const val DEFAULT_MESSAGE = "Something went wrong within the Corda node."
|
|
||||||
|
|
||||||
fun defaultMessage(): String = DEFAULT_MESSAGE
|
|
||||||
|
|
||||||
fun obfuscateIfInternal(wrapped: Throwable): Throwable {
|
|
||||||
|
|
||||||
(wrapped as? CordaRuntimeException)?.setCause(null)
|
|
||||||
return when {
|
|
||||||
whitelisted.any { it.isInstance(wrapped) } -> wrapped
|
|
||||||
else -> InternalNodeException(DEFAULT_MESSAGE)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,11 +0,0 @@
|
|||||||
package net.corda.nodeapi.exceptions
|
|
||||||
|
|
||||||
import net.corda.core.CordaRuntimeException
|
|
||||||
import net.corda.core.crypto.SecureHash
|
|
||||||
|
|
||||||
class OutdatedNetworkParameterHashException(old: SecureHash, new: SecureHash) : CordaRuntimeException(TEMPLATE.format(old, new)), RpcSerializableError {
|
|
||||||
|
|
||||||
private companion object {
|
|
||||||
private const val TEMPLATE = "Refused to accept parameters with hash %s because network map advertises update with hash %s. Please check newest version"
|
|
||||||
}
|
|
||||||
}
|
|
@ -5,4 +5,4 @@ import net.corda.core.CordaRuntimeException
|
|||||||
/**
|
/**
|
||||||
* Thrown to indicate that the command was rejected by the node, typically due to a special temporary mode.
|
* Thrown to indicate that the command was rejected by the node, typically due to a special temporary mode.
|
||||||
*/
|
*/
|
||||||
class RejectedCommandException(message: String) : CordaRuntimeException(message), RpcSerializableError
|
class RejectedCommandException(msg: String) : CordaRuntimeException(msg)
|
@ -1,9 +0,0 @@
|
|||||||
package net.corda.nodeapi.exceptions
|
|
||||||
|
|
||||||
import net.corda.core.serialization.CordaSerializable
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Allows an implementing [Throwable] to be propagated to RPC clients.
|
|
||||||
*/
|
|
||||||
@CordaSerializable
|
|
||||||
interface RpcSerializableError
|
|
@ -1,15 +0,0 @@
|
|||||||
package net.corda.nodeapi.exceptions.adapters
|
|
||||||
|
|
||||||
import net.corda.core.internal.concurrent.mapError
|
|
||||||
import net.corda.core.messaging.FlowHandle
|
|
||||||
import net.corda.core.serialization.CordaSerializable
|
|
||||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adapter able to mask errors within a Corda node for RPC clients.
|
|
||||||
*/
|
|
||||||
@CordaSerializable
|
|
||||||
data class InternalObfuscatingFlowHandle<RESULT>(val wrapped: FlowHandle<RESULT>) : FlowHandle<RESULT> by wrapped {
|
|
||||||
|
|
||||||
override val returnValue = wrapped.returnValue.mapError(InternalNodeException.Companion::obfuscateIfInternal)
|
|
||||||
}
|
|
@ -1,22 +0,0 @@
|
|||||||
package net.corda.nodeapi.exceptions.adapters
|
|
||||||
|
|
||||||
import net.corda.core.internal.concurrent.mapError
|
|
||||||
import net.corda.core.mapErrors
|
|
||||||
import net.corda.core.messaging.FlowProgressHandle
|
|
||||||
import net.corda.core.serialization.CordaSerializable
|
|
||||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adapter able to mask errors within a Corda node for RPC clients.
|
|
||||||
*/
|
|
||||||
@CordaSerializable
|
|
||||||
class InternalObfuscatingFlowProgressHandle<RESULT>(val wrapped: FlowProgressHandle<RESULT>) : FlowProgressHandle<RESULT> by wrapped {
|
|
||||||
|
|
||||||
override val returnValue = wrapped.returnValue.mapError(InternalNodeException.Companion::obfuscateIfInternal)
|
|
||||||
|
|
||||||
override val progress = wrapped.progress.mapErrors(InternalNodeException.Companion::obfuscateIfInternal)
|
|
||||||
|
|
||||||
override val stepsTreeIndexFeed = wrapped.stepsTreeIndexFeed?.mapErrors(InternalNodeException.Companion::obfuscateIfInternal)
|
|
||||||
|
|
||||||
override val stepsTreeFeed = wrapped.stepsTreeFeed?.mapErrors(InternalNodeException.Companion::obfuscateIfInternal)
|
|
||||||
}
|
|
@ -1,6 +0,0 @@
|
|||||||
package net.corda
|
|
||||||
|
|
||||||
import net.corda.core.CordaRuntimeException
|
|
||||||
import net.corda.nodeapi.exceptions.RpcSerializableError
|
|
||||||
|
|
||||||
class ClientRelevantException(message: String?, cause: Throwable?) : CordaRuntimeException(message, cause), RpcSerializableError
|
|
@ -5,6 +5,7 @@ import com.nhaarman.mockito_kotlin.whenever
|
|||||||
import net.corda.core.contracts.*
|
import net.corda.core.contracts.*
|
||||||
import net.corda.core.cordapp.CordappProvider
|
import net.corda.core.cordapp.CordappProvider
|
||||||
import net.corda.core.flows.FlowLogic
|
import net.corda.core.flows.FlowLogic
|
||||||
|
import net.corda.core.flows.UnexpectedFlowEndException
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.internal.concurrent.transpose
|
import net.corda.core.internal.concurrent.transpose
|
||||||
@ -21,7 +22,6 @@ import net.corda.core.utilities.contextLogger
|
|||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
import net.corda.node.internal.cordapp.CordappLoader
|
import net.corda.node.internal.cordapp.CordappLoader
|
||||||
import net.corda.node.internal.cordapp.CordappProviderImpl
|
import net.corda.node.internal.cordapp.CordappProviderImpl
|
||||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
|
||||||
import net.corda.testing.common.internal.testNetworkParameters
|
import net.corda.testing.common.internal.testNetworkParameters
|
||||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||||
@ -113,7 +113,7 @@ class AttachmentLoadingTests {
|
|||||||
driver {
|
driver {
|
||||||
installIsolatedCordappTo(bankAName)
|
installIsolatedCordappTo(bankAName)
|
||||||
val (bankA, bankB) = createTwoNodes()
|
val (bankA, bankB) = createTwoNodes()
|
||||||
assertFailsWith<InternalNodeException> {
|
assertFailsWith<UnexpectedFlowEndException>("Party C=CH,L=Zurich,O=BankB rejected session request: Don't know net.corda.finance.contracts.isolated.IsolatedDummyFlow\$Initiator") {
|
||||||
bankA.rpc.startFlowDynamic(flowInitiatorClass, bankB.nodeInfo.legalIdentities.first()).returnValue.getOrThrow()
|
bankA.rpc.startFlowDynamic(flowInitiatorClass, bankB.nodeInfo.legalIdentities.first()).returnValue.getOrThrow()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,120 +0,0 @@
|
|||||||
package net.corda.node.services.rpc
|
|
||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
|
||||||
import net.corda.ClientRelevantException
|
|
||||||
import net.corda.core.flows.*
|
|
||||||
import net.corda.core.identity.Party
|
|
||||||
import net.corda.core.messaging.startFlow
|
|
||||||
import net.corda.core.utilities.getOrThrow
|
|
||||||
import net.corda.core.utilities.unwrap
|
|
||||||
import net.corda.node.services.Permissions
|
|
||||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
|
||||||
import net.corda.testing.core.singleIdentity
|
|
||||||
import net.corda.testing.driver.DriverParameters
|
|
||||||
import net.corda.testing.driver.NodeParameters
|
|
||||||
import net.corda.testing.driver.driver
|
|
||||||
import net.corda.testing.node.User
|
|
||||||
import org.assertj.core.api.Assertions.assertThatCode
|
|
||||||
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
|
|
||||||
import org.hibernate.exception.GenericJDBCException
|
|
||||||
import org.junit.Test
|
|
||||||
import java.sql.SQLException
|
|
||||||
|
|
||||||
class RpcExceptionHandlingTest {
|
|
||||||
|
|
||||||
private val user = User("mark", "dadada", setOf(Permissions.all()))
|
|
||||||
private val users = listOf(user)
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `rpc client handles exceptions thrown on node side`() {
|
|
||||||
|
|
||||||
driver(DriverParameters(startNodesInProcess = true)) {
|
|
||||||
|
|
||||||
val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
|
||||||
|
|
||||||
assertThatCode { node.rpc.startFlow(::Flow).returnValue.getOrThrow() }.isInstanceOfSatisfying(InternalNodeException::class.java) { exception ->
|
|
||||||
|
|
||||||
assertThat(exception).hasNoCause()
|
|
||||||
assertThat(exception.stackTrace).isEmpty()
|
|
||||||
assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `rpc client handles client-relevant exceptions thrown on node side`() {
|
|
||||||
|
|
||||||
driver(DriverParameters(startNodesInProcess = true)) {
|
|
||||||
|
|
||||||
val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
|
||||||
val clientRelevantMessage = "This is for the players!"
|
|
||||||
|
|
||||||
assertThatCode { node.rpc.startFlow(::ClientRelevantErrorFlow, clientRelevantMessage).returnValue.getOrThrow() }.isInstanceOfSatisfying(ClientRelevantException::class.java) { exception ->
|
|
||||||
|
|
||||||
assertThat(exception).hasNoCause()
|
|
||||||
assertThat(exception.stackTrace).isEmpty()
|
|
||||||
assertThat(exception.message).isEqualTo(clientRelevantMessage)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `rpc client handles exceptions thrown on counter-party side`() {
|
|
||||||
|
|
||||||
driver(DriverParameters(startNodesInProcess = true)) {
|
|
||||||
|
|
||||||
val nodeA = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
|
||||||
val nodeB = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
|
||||||
|
|
||||||
assertThatCode { nodeA.rpc.startFlow(::InitFlow, nodeB.nodeInfo.singleIdentity()).returnValue.getOrThrow() }.isInstanceOfSatisfying(InternalNodeException::class.java) { exception ->
|
|
||||||
|
|
||||||
assertThat(exception).hasNoCause()
|
|
||||||
assertThat(exception.stackTrace).isEmpty()
|
|
||||||
assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@StartableByRPC
|
|
||||||
class Flow : FlowLogic<String>() {
|
|
||||||
|
|
||||||
@Suspendable
|
|
||||||
override fun call(): String {
|
|
||||||
|
|
||||||
throw GenericJDBCException("Something went wrong!", SQLException("Oops!"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@StartableByRPC
|
|
||||||
@InitiatingFlow
|
|
||||||
class InitFlow(private val party: Party) : FlowLogic<String>() {
|
|
||||||
|
|
||||||
@Suspendable
|
|
||||||
override fun call(): String {
|
|
||||||
|
|
||||||
val session = initiateFlow(party)
|
|
||||||
return session.sendAndReceive<String>("hey").unwrap { it }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@InitiatedBy(InitFlow::class)
|
|
||||||
class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic<Unit>() {
|
|
||||||
|
|
||||||
@Suspendable
|
|
||||||
override fun call() {
|
|
||||||
|
|
||||||
initiatingSession.receive<String>().unwrap { it }
|
|
||||||
throw GenericJDBCException("Something went wrong!", SQLException("Oops!"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@StartableByRPC
|
|
||||||
class ClientRelevantErrorFlow(private val message: String) : FlowLogic<String>() {
|
|
||||||
|
|
||||||
@Suspendable
|
|
||||||
override fun call(): String {
|
|
||||||
|
|
||||||
throw ClientRelevantException(message, SQLException("Oops!"))
|
|
||||||
}
|
|
||||||
}
|
|
@ -203,9 +203,14 @@ internal class CordaRPCOpsImpl(
|
|||||||
}
|
}
|
||||||
|
|
||||||
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
|
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
|
||||||
// TODO: this operation should not require an explicit transaction
|
try {
|
||||||
return database.transaction {
|
return database.transaction {
|
||||||
services.attachments.queryAttachments(query, sorting)
|
services.attachments.queryAttachments(query, sorting)
|
||||||
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
// log and rethrow exception so we keep a copy server side
|
||||||
|
log.error(e.message)
|
||||||
|
throw e.cause ?: e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -295,11 +295,7 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
// Start up the MQ clients.
|
// Start up the MQ clients.
|
||||||
rpcMessagingClient?.run {
|
rpcMessagingClient?.run {
|
||||||
runOnStop += this::close
|
runOnStop += this::close
|
||||||
when (rpcOps) {
|
start(rpcOps, securityManager)
|
||||||
// not sure what this RPCOps base interface is for
|
|
||||||
is SecureCordaRPCOps -> start(RpcExceptionHandlingProxy(rpcOps), securityManager)
|
|
||||||
else -> start(rpcOps, securityManager)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
verifierMessagingClient?.run {
|
verifierMessagingClient?.run {
|
||||||
runOnStop += this::stop
|
runOnStop += this::stop
|
||||||
|
@ -1,147 +0,0 @@
|
|||||||
package net.corda.node.internal
|
|
||||||
|
|
||||||
import net.corda.core.concurrent.CordaFuture
|
|
||||||
import net.corda.core.contracts.ContractState
|
|
||||||
import net.corda.core.crypto.SecureHash
|
|
||||||
import net.corda.core.doOnError
|
|
||||||
import net.corda.core.flows.FlowLogic
|
|
||||||
import net.corda.core.identity.AbstractParty
|
|
||||||
import net.corda.core.identity.CordaX500Name
|
|
||||||
import net.corda.core.internal.concurrent.doOnError
|
|
||||||
import net.corda.core.internal.concurrent.mapError
|
|
||||||
import net.corda.core.mapErrors
|
|
||||||
import net.corda.core.messaging.CordaRPCOps
|
|
||||||
import net.corda.core.messaging.DataFeed
|
|
||||||
import net.corda.core.messaging.FlowHandle
|
|
||||||
import net.corda.core.messaging.FlowProgressHandle
|
|
||||||
import net.corda.core.node.services.vault.*
|
|
||||||
import net.corda.core.utilities.loggerFor
|
|
||||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
|
||||||
import net.corda.nodeapi.exceptions.adapters.InternalObfuscatingFlowHandle
|
|
||||||
import net.corda.nodeapi.exceptions.adapters.InternalObfuscatingFlowProgressHandle
|
|
||||||
import java.io.InputStream
|
|
||||||
import java.security.PublicKey
|
|
||||||
|
|
||||||
class RpcExceptionHandlingProxy(private val delegate: SecureCordaRPCOps) : CordaRPCOps {
|
|
||||||
|
|
||||||
private companion object {
|
|
||||||
private val logger = loggerFor<RpcExceptionHandlingProxy>()
|
|
||||||
}
|
|
||||||
|
|
||||||
override val protocolVersion: Int get() = delegate.protocolVersion
|
|
||||||
|
|
||||||
override fun <T> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> = wrap {
|
|
||||||
|
|
||||||
val handle = delegate.startFlowDynamic(logicType, *args)
|
|
||||||
val result = InternalObfuscatingFlowHandle(handle)
|
|
||||||
result.returnValue.doOnError { error -> logger.error(error.message, error) }
|
|
||||||
result
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> = wrap {
|
|
||||||
|
|
||||||
val handle = delegate.startTrackedFlowDynamic(logicType, *args)
|
|
||||||
val result = InternalObfuscatingFlowProgressHandle(handle)
|
|
||||||
result.returnValue.doOnError { error -> logger.error(error.message, error) }
|
|
||||||
result
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun waitUntilNetworkReady() = wrapFuture(delegate::waitUntilNetworkReady)
|
|
||||||
|
|
||||||
override fun stateMachinesFeed() = wrapFeed(delegate::stateMachinesFeed)
|
|
||||||
|
|
||||||
override fun <T : ContractState> vaultTrackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>) = wrapFeed { delegate.vaultTrackBy(criteria, paging, sorting, contractStateType) }
|
|
||||||
|
|
||||||
override fun <T : ContractState> vaultTrack(contractStateType: Class<out T>) = wrapFeed { delegate.vaultTrack(contractStateType) }
|
|
||||||
|
|
||||||
override fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria) = wrapFeed { delegate.vaultTrackByCriteria(contractStateType, criteria) }
|
|
||||||
|
|
||||||
override fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification) = wrapFeed { delegate.vaultTrackByWithPagingSpec(contractStateType, criteria, paging) }
|
|
||||||
|
|
||||||
override fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort) = wrapFeed { delegate.vaultTrackByWithSorting(contractStateType, criteria, sorting) }
|
|
||||||
|
|
||||||
override fun stateMachineRecordedTransactionMappingFeed() = wrapFeed(delegate::stateMachineRecordedTransactionMappingFeed)
|
|
||||||
|
|
||||||
override fun networkMapFeed() = wrapFeed(delegate::networkMapFeed)
|
|
||||||
|
|
||||||
override fun networkParametersFeed() = wrapFeed(delegate::networkParametersFeed)
|
|
||||||
|
|
||||||
override fun internalVerifiedTransactionsFeed() = wrapFeed(delegate::internalVerifiedTransactionsFeed)
|
|
||||||
|
|
||||||
override fun stateMachinesSnapshot() = wrap(delegate::stateMachinesSnapshot)
|
|
||||||
|
|
||||||
override fun <T : ContractState> vaultQueryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>) = wrap { delegate.vaultQueryBy(criteria, paging, sorting, contractStateType) }
|
|
||||||
|
|
||||||
override fun <T : ContractState> vaultQuery(contractStateType: Class<out T>) = wrap { delegate.vaultQuery(contractStateType) }
|
|
||||||
|
|
||||||
override fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>) = wrap { delegate.vaultQueryByCriteria(criteria, contractStateType) }
|
|
||||||
|
|
||||||
override fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification) = wrap { delegate.vaultQueryByWithPagingSpec(contractStateType, criteria, paging) }
|
|
||||||
|
|
||||||
override fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort) = wrap { delegate.vaultQueryByWithSorting(contractStateType, criteria, sorting) }
|
|
||||||
|
|
||||||
override fun internalVerifiedTransactionsSnapshot() = wrap(delegate::internalVerifiedTransactionsSnapshot)
|
|
||||||
|
|
||||||
override fun stateMachineRecordedTransactionMappingSnapshot() = wrap(delegate::stateMachineRecordedTransactionMappingSnapshot)
|
|
||||||
|
|
||||||
override fun networkMapSnapshot() = wrap(delegate::networkMapSnapshot)
|
|
||||||
|
|
||||||
override fun acceptNewNetworkParameters(parametersHash: SecureHash) = wrap { delegate.acceptNewNetworkParameters(parametersHash) }
|
|
||||||
|
|
||||||
override fun nodeInfo() = wrap(delegate::nodeInfo)
|
|
||||||
|
|
||||||
override fun notaryIdentities() = wrap(delegate::notaryIdentities)
|
|
||||||
|
|
||||||
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) = wrap { delegate.addVaultTransactionNote(txnId, txnNote) }
|
|
||||||
|
|
||||||
override fun getVaultTransactionNotes(txnId: SecureHash) = wrap { delegate.getVaultTransactionNotes(txnId) }
|
|
||||||
|
|
||||||
override fun attachmentExists(id: SecureHash) = wrap { delegate.attachmentExists(id) }
|
|
||||||
|
|
||||||
override fun openAttachment(id: SecureHash) = wrap { delegate.openAttachment(id) }
|
|
||||||
|
|
||||||
override fun uploadAttachment(jar: InputStream) = wrap { delegate.uploadAttachment(jar) }
|
|
||||||
|
|
||||||
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String) = wrap { delegate.uploadAttachmentWithMetadata(jar, uploader, filename) }
|
|
||||||
|
|
||||||
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?) = wrap { delegate.queryAttachments(query, sorting) }
|
|
||||||
|
|
||||||
override fun currentNodeTime() = wrap(delegate::currentNodeTime)
|
|
||||||
|
|
||||||
override fun wellKnownPartyFromAnonymous(party: AbstractParty) = wrap { delegate.wellKnownPartyFromAnonymous(party) }
|
|
||||||
|
|
||||||
override fun partyFromKey(key: PublicKey) = wrap { delegate.partyFromKey(key) }
|
|
||||||
|
|
||||||
override fun wellKnownPartyFromX500Name(x500Name: CordaX500Name) = wrap { delegate.wellKnownPartyFromX500Name(x500Name) }
|
|
||||||
|
|
||||||
override fun notaryPartyFromX500Name(x500Name: CordaX500Name) = wrap { delegate.notaryPartyFromX500Name(x500Name) }
|
|
||||||
|
|
||||||
override fun partiesFromName(query: String, exactMatch: Boolean) = wrap { delegate.partiesFromName(query, exactMatch) }
|
|
||||||
|
|
||||||
override fun registeredFlows() = wrap(delegate::registeredFlows)
|
|
||||||
|
|
||||||
override fun nodeInfoFromParty(party: AbstractParty) = wrap { delegate.nodeInfoFromParty(party) }
|
|
||||||
|
|
||||||
override fun clearNetworkMapCache() = wrap(delegate::clearNetworkMapCache)
|
|
||||||
|
|
||||||
override fun setFlowsDrainingModeEnabled(enabled: Boolean) = wrap { delegate.setFlowsDrainingModeEnabled(enabled) }
|
|
||||||
|
|
||||||
override fun isFlowsDrainingModeEnabled() = wrap(delegate::isFlowsDrainingModeEnabled)
|
|
||||||
|
|
||||||
private fun <RESULT> wrap(call: () -> RESULT): RESULT {
|
|
||||||
|
|
||||||
return try {
|
|
||||||
call.invoke()
|
|
||||||
} catch (error: Throwable) {
|
|
||||||
logger.error(error.message, error)
|
|
||||||
throw InternalNodeException.obfuscateIfInternal(error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun <SNAPSHOT, ELEMENT> wrapFeed(call: () -> DataFeed<SNAPSHOT, ELEMENT>) = wrap {
|
|
||||||
|
|
||||||
call.invoke().doOnError { error -> logger.error(error.message, error) }.mapErrors(InternalNodeException.Companion::obfuscateIfInternal)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun <RESULT> wrapFuture(call: () -> CordaFuture<RESULT>): CordaFuture<RESULT> = wrap { call.invoke().mapError(InternalNodeException.Companion::obfuscateIfInternal).doOnError { error -> logger.error(error.message, error) } }
|
|
||||||
}
|
|
@ -14,8 +14,11 @@ import net.corda.core.utilities.contextLogger
|
|||||||
import net.corda.core.utilities.minutes
|
import net.corda.core.utilities.minutes
|
||||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||||
import net.corda.node.utilities.NamedThreadFactory
|
import net.corda.node.utilities.NamedThreadFactory
|
||||||
import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
|
|
||||||
import net.corda.nodeapi.internal.network.*
|
import net.corda.nodeapi.internal.network.*
|
||||||
|
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||||
|
import net.corda.nodeapi.internal.network.ParametersUpdate
|
||||||
|
import net.corda.nodeapi.internal.network.SignedNetworkParameters
|
||||||
|
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
@ -154,7 +157,8 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
|||||||
.copyTo(baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME, StandardCopyOption.REPLACE_EXISTING)
|
.copyTo(baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME, StandardCopyOption.REPLACE_EXISTING)
|
||||||
networkMapClient.ackNetworkParametersUpdate(sign(parametersHash))
|
networkMapClient.ackNetworkParametersUpdate(sign(parametersHash))
|
||||||
} else {
|
} else {
|
||||||
throw throw OutdatedNetworkParameterHashException(parametersHash, newParametersHash)
|
throw IllegalArgumentException("Refused to accept parameters with hash $parametersHash because network map " +
|
||||||
|
"advertises update with hash $newParametersHash. Please check newest version")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user