mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
[CORDA-1264]: Ensure correct serialisation and masking for throwables raised by a node and propagated through RPC. (#2892)
This commit is contained in:
committed by
GitHub
parent
196a24f030
commit
0d1d7daedc
@ -0,0 +1,6 @@
|
||||
package net.corda
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.nodeapi.exceptions.RpcSerializableError
|
||||
|
||||
class ClientRelevantException(message: String?, cause: Throwable?) : CordaRuntimeException(message, cause), RpcSerializableError
|
@ -5,7 +5,6 @@ import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.cordapp.CordappProvider
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
@ -22,6 +21,7 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.internal.cordapp.CordappProviderImpl
|
||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
@ -114,7 +114,7 @@ class AttachmentLoadingTests {
|
||||
driver {
|
||||
installIsolatedCordappTo(bankAName)
|
||||
val (bankA, bankB) = createTwoNodes()
|
||||
assertFailsWith<UnexpectedFlowEndException>("Party C=CH,L=Zurich,O=BankB rejected session request: Don't know net.corda.finance.contracts.isolated.IsolatedDummyFlow\$Initiator") {
|
||||
assertFailsWith<InternalNodeException> {
|
||||
bankA.rpc.startFlowDynamic(flowInitiatorClass, bankB.nodeInfo.legalIdentities.first()).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,120 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.ClientRelevantException
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.User
|
||||
import org.assertj.core.api.Assertions.assertThatCode
|
||||
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
|
||||
import org.hibernate.exception.GenericJDBCException
|
||||
import org.junit.Test
|
||||
import java.sql.SQLException
|
||||
|
||||
class RpcExceptionHandlingTest {
|
||||
|
||||
private val user = User("mark", "dadada", setOf(Permissions.all()))
|
||||
private val users = listOf(user)
|
||||
|
||||
@Test
|
||||
fun `rpc client handles exceptions thrown on node side`() {
|
||||
|
||||
driver(DriverParameters(startNodesInProcess = true)) {
|
||||
|
||||
val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
||||
|
||||
assertThatCode { node.rpc.startFlow(::Flow).returnValue.getOrThrow() }.isInstanceOfSatisfying(InternalNodeException::class.java) { exception ->
|
||||
|
||||
assertThat(exception).hasNoCause()
|
||||
assertThat(exception.stackTrace).isEmpty()
|
||||
assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `rpc client handles client-relevant exceptions thrown on node side`() {
|
||||
|
||||
driver(DriverParameters(startNodesInProcess = true)) {
|
||||
|
||||
val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
||||
val clientRelevantMessage = "This is for the players!"
|
||||
|
||||
assertThatCode { node.rpc.startFlow(::ClientRelevantErrorFlow, clientRelevantMessage).returnValue.getOrThrow() }.isInstanceOfSatisfying(ClientRelevantException::class.java) { exception ->
|
||||
|
||||
assertThat(exception).hasNoCause()
|
||||
assertThat(exception.stackTrace).isEmpty()
|
||||
assertThat(exception.message).isEqualTo(clientRelevantMessage)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `rpc client handles exceptions thrown on counter-party side`() {
|
||||
|
||||
driver(DriverParameters(startNodesInProcess = true)) {
|
||||
|
||||
val nodeA = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
||||
val nodeB = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
||||
|
||||
assertThatCode { nodeA.rpc.startFlow(::InitFlow, nodeB.nodeInfo.singleIdentity()).returnValue.getOrThrow() }.isInstanceOfSatisfying(InternalNodeException::class.java) { exception ->
|
||||
|
||||
assertThat(exception).hasNoCause()
|
||||
assertThat(exception.stackTrace).isEmpty()
|
||||
assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class Flow : FlowLogic<String>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
|
||||
throw GenericJDBCException("Something went wrong!", SQLException("Oops!"))
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class InitFlow(private val party: Party) : FlowLogic<String>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
|
||||
val session = initiateFlow(party)
|
||||
return session.sendAndReceive<String>("hey").unwrap { it }
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(InitFlow::class)
|
||||
class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
|
||||
initiatingSession.receive<String>().unwrap { it }
|
||||
throw GenericJDBCException("Something went wrong!", SQLException("Oops!"))
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class ClientRelevantErrorFlow(private val message: String) : FlowLogic<String>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
|
||||
throw ClientRelevantException(message, SQLException("Oops!"))
|
||||
}
|
||||
}
|
@ -204,14 +204,9 @@ internal class CordaRPCOpsImpl(
|
||||
}
|
||||
|
||||
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
|
||||
try {
|
||||
return database.transaction {
|
||||
// TODO: this operation should not require an explicit transaction
|
||||
return database.transaction {
|
||||
services.attachments.queryAttachments(query, sorting)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
// log and rethrow exception so we keep a copy server side
|
||||
log.error(e.message)
|
||||
throw e.cause ?: e
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,8 +27,11 @@ import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.services.api.NodePropertiesStore
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.SecurityConfiguration
|
||||
import net.corda.node.services.config.VerifierType
|
||||
import net.corda.node.services.config.shell.shellUser
|
||||
import net.corda.node.services.config.shouldInitCrashShell
|
||||
import net.corda.node.services.messaging.*
|
||||
import net.corda.node.services.rpc.ArtemisRpcBroker
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
@ -281,7 +284,11 @@ open class Node(configuration: NodeConfiguration,
|
||||
// Start up the MQ clients.
|
||||
rpcMessagingClient?.run {
|
||||
runOnStop += this::close
|
||||
start(rpcOps, securityManager)
|
||||
when (rpcOps) {
|
||||
// not sure what this RPCOps base interface is for
|
||||
is SecureCordaRPCOps -> start(RpcExceptionHandlingProxy(rpcOps), securityManager)
|
||||
else -> start(rpcOps, securityManager)
|
||||
}
|
||||
}
|
||||
verifierMessagingClient?.run {
|
||||
runOnStop += this::stop
|
||||
|
@ -0,0 +1,149 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.doOnError
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.concurrent.doOnError
|
||||
import net.corda.core.internal.concurrent.mapError
|
||||
import net.corda.core.mapErrors
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.messaging.FlowProgressHandle
|
||||
import net.corda.core.node.services.vault.*
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||
import net.corda.nodeapi.exceptions.adapters.InternalObfuscatingFlowHandle
|
||||
import net.corda.nodeapi.exceptions.adapters.InternalObfuscatingFlowProgressHandle
|
||||
import java.io.InputStream
|
||||
import java.security.PublicKey
|
||||
|
||||
class RpcExceptionHandlingProxy(private val delegate: SecureCordaRPCOps) : CordaRPCOps {
|
||||
|
||||
private companion object {
|
||||
private val logger = loggerFor<RpcExceptionHandlingProxy>()
|
||||
}
|
||||
|
||||
override val protocolVersion: Int get() = delegate.protocolVersion
|
||||
|
||||
override fun <T> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> = wrap {
|
||||
|
||||
val handle = delegate.startFlowDynamic(logicType, *args)
|
||||
val result = InternalObfuscatingFlowHandle(handle)
|
||||
result.returnValue.doOnError { error -> logger.error(error.message, error) }
|
||||
result
|
||||
}
|
||||
|
||||
override fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> = wrap {
|
||||
|
||||
val handle = delegate.startTrackedFlowDynamic(logicType, *args)
|
||||
val result = InternalObfuscatingFlowProgressHandle(handle)
|
||||
result.returnValue.doOnError { error -> logger.error(error.message, error) }
|
||||
result
|
||||
}
|
||||
|
||||
override fun waitUntilNetworkReady() = wrapFuture(delegate::waitUntilNetworkReady)
|
||||
|
||||
override fun stateMachinesFeed() = wrapFeed(delegate::stateMachinesFeed)
|
||||
|
||||
override fun <T : ContractState> vaultTrackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>) = wrapFeed { delegate.vaultTrackBy(criteria, paging, sorting, contractStateType) }
|
||||
|
||||
override fun <T : ContractState> vaultTrack(contractStateType: Class<out T>) = wrapFeed { delegate.vaultTrack(contractStateType) }
|
||||
|
||||
override fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria) = wrapFeed { delegate.vaultTrackByCriteria(contractStateType, criteria) }
|
||||
|
||||
override fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification) = wrapFeed { delegate.vaultTrackByWithPagingSpec(contractStateType, criteria, paging) }
|
||||
|
||||
override fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort) = wrapFeed { delegate.vaultTrackByWithSorting(contractStateType, criteria, sorting) }
|
||||
|
||||
override fun stateMachineRecordedTransactionMappingFeed() = wrapFeed(delegate::stateMachineRecordedTransactionMappingFeed)
|
||||
|
||||
override fun networkMapFeed() = wrapFeed(delegate::networkMapFeed)
|
||||
|
||||
override fun networkParametersFeed() = wrapFeed(delegate::networkParametersFeed)
|
||||
|
||||
override fun internalVerifiedTransactionsFeed() = wrapFeed(delegate::internalVerifiedTransactionsFeed)
|
||||
|
||||
override fun stateMachinesSnapshot() = wrap(delegate::stateMachinesSnapshot)
|
||||
|
||||
override fun <T : ContractState> vaultQueryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>) = wrap { delegate.vaultQueryBy(criteria, paging, sorting, contractStateType) }
|
||||
|
||||
override fun <T : ContractState> vaultQuery(contractStateType: Class<out T>) = wrap { delegate.vaultQuery(contractStateType) }
|
||||
|
||||
override fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>) = wrap { delegate.vaultQueryByCriteria(criteria, contractStateType) }
|
||||
|
||||
override fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification) = wrap { delegate.vaultQueryByWithPagingSpec(contractStateType, criteria, paging) }
|
||||
|
||||
override fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort) = wrap { delegate.vaultQueryByWithSorting(contractStateType, criteria, sorting) }
|
||||
|
||||
override fun internalVerifiedTransactionsSnapshot() = wrap(delegate::internalVerifiedTransactionsSnapshot)
|
||||
|
||||
override fun stateMachineRecordedTransactionMappingSnapshot() = wrap(delegate::stateMachineRecordedTransactionMappingSnapshot)
|
||||
|
||||
override fun networkMapSnapshot() = wrap(delegate::networkMapSnapshot)
|
||||
|
||||
override fun acceptNewNetworkParameters(parametersHash: SecureHash) = wrap { delegate.acceptNewNetworkParameters(parametersHash) }
|
||||
|
||||
override fun nodeInfo() = wrap(delegate::nodeInfo)
|
||||
|
||||
override fun notaryIdentities() = wrap(delegate::notaryIdentities)
|
||||
|
||||
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) = wrap { delegate.addVaultTransactionNote(txnId, txnNote) }
|
||||
|
||||
override fun getVaultTransactionNotes(txnId: SecureHash) = wrap { delegate.getVaultTransactionNotes(txnId) }
|
||||
|
||||
override fun attachmentExists(id: SecureHash) = wrap { delegate.attachmentExists(id) }
|
||||
|
||||
override fun openAttachment(id: SecureHash) = wrap { delegate.openAttachment(id) }
|
||||
|
||||
override fun uploadAttachment(jar: InputStream) = wrap { delegate.uploadAttachment(jar) }
|
||||
|
||||
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String) = wrap { delegate.uploadAttachmentWithMetadata(jar, uploader, filename) }
|
||||
|
||||
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?) = wrap { delegate.queryAttachments(query, sorting) }
|
||||
|
||||
override fun currentNodeTime() = wrap(delegate::currentNodeTime)
|
||||
|
||||
override fun wellKnownPartyFromAnonymous(party: AbstractParty) = wrap { delegate.wellKnownPartyFromAnonymous(party) }
|
||||
|
||||
override fun partyFromKey(key: PublicKey) = wrap { delegate.partyFromKey(key) }
|
||||
|
||||
override fun wellKnownPartyFromX500Name(x500Name: CordaX500Name) = wrap { delegate.wellKnownPartyFromX500Name(x500Name) }
|
||||
|
||||
override fun notaryPartyFromX500Name(x500Name: CordaX500Name) = wrap { delegate.notaryPartyFromX500Name(x500Name) }
|
||||
|
||||
override fun partiesFromName(query: String, exactMatch: Boolean) = wrap { delegate.partiesFromName(query, exactMatch) }
|
||||
|
||||
override fun registeredFlows() = wrap(delegate::registeredFlows)
|
||||
|
||||
override fun nodeInfoFromParty(party: AbstractParty) = wrap { delegate.nodeInfoFromParty(party) }
|
||||
|
||||
override fun clearNetworkMapCache() = wrap(delegate::clearNetworkMapCache)
|
||||
|
||||
override fun setFlowsDrainingModeEnabled(enabled: Boolean) = wrap { delegate.setFlowsDrainingModeEnabled(enabled) }
|
||||
|
||||
override fun isFlowsDrainingModeEnabled() = wrap(delegate::isFlowsDrainingModeEnabled)
|
||||
|
||||
override fun shutdown() = wrap(delegate::shutdown)
|
||||
|
||||
private fun <RESULT> wrap(call: () -> RESULT): RESULT {
|
||||
|
||||
return try {
|
||||
call.invoke()
|
||||
} catch (error: Throwable) {
|
||||
logger.error(error.message, error)
|
||||
throw InternalNodeException.obfuscateIfInternal(error)
|
||||
}
|
||||
}
|
||||
|
||||
private fun <SNAPSHOT, ELEMENT> wrapFeed(call: () -> DataFeed<SNAPSHOT, ELEMENT>) = wrap {
|
||||
|
||||
call.invoke().doOnError { error -> logger.error(error.message, error) }.mapErrors(InternalNodeException.Companion::obfuscateIfInternal)
|
||||
}
|
||||
|
||||
private fun <RESULT> wrapFuture(call: () -> CordaFuture<RESULT>): CordaFuture<RESULT> = wrap { call.invoke().mapError(InternalNodeException.Companion::obfuscateIfInternal).doOnError { error -> logger.error(error.message, error) } }
|
||||
}
|
@ -14,6 +14,7 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.utilities.NamedThreadFactory
|
||||
import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
@ -183,8 +184,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
networkMapClient.ackNetworkParametersUpdate(sign(parametersHash))
|
||||
logger.info("Accepted network parameter update $update: $newNetParams")
|
||||
} else {
|
||||
throw IllegalArgumentException("Refused to accept parameters with hash $parametersHash because network map " +
|
||||
"advertises update with hash $newParametersHash. Please check newest version")
|
||||
throw OutdatedNetworkParameterHashException(parametersHash, newParametersHash)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user