CORDA-1660: Wiring up the CordaRPCClient class loader to the p2p serialisation context. (#3454)

This is to allow the standalone shell to be able to receive WireTransactions containing Cash.State objects.
This commit is contained in:
Shams Asari 2018-06-27 17:02:35 +01:00 committed by GitHub
parent f51407cd5d
commit eee2563bfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 119 additions and 62 deletions

View File

@ -1,26 +1,33 @@
package net.corda.client.rpc
import net.corda.client.rpc.internal.createCordaRPCClientWithSslAndClassLoader
import net.corda.core.context.*
import net.corda.core.contracts.FungibleAsset
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.packageName
import net.corda.core.internal.location
import net.corda.core.internal.toPath
import net.corda.core.messaging.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.POUNDS
import net.corda.finance.USD
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.getCashBalance
import net.corda.finance.contracts.getCashBalances
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions.Companion.all
import net.corda.testing.common.internal.checkNotOnClasspath
import net.corda.testing.core.*
import net.corda.testing.node.User
import net.corda.testing.node.internal.NodeBasedTest
import net.corda.testing.node.internal.ProcessUtilities
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
@ -28,6 +35,10 @@ import org.junit.After
import org.junit.Before
import org.junit.Test
import rx.subjects.PublishSubject
import java.io.File.pathSeparator
import java.net.URLClassLoader
import java.nio.file.Paths
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
@ -36,9 +47,11 @@ import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) {
private val rpcUser = User("user1", "test", permissions = setOf(all())
)
class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance")) {
companion object {
val rpcUser = User("user1", "test", permissions = setOf(all()))
}
private lateinit var node: StartedNode<Node>
private lateinit var identity: Party
private lateinit var client: CordaRPCClient
@ -51,7 +64,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
@Before
fun setUp() {
node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, CordaRPCClientConfiguration.DEFAULT.copy(
client = CordaRPCClient(node.internals.configuration.rpcOptions.address, CordaRPCClientConfiguration.DEFAULT.copy(
maxReconnectAttempts = 5
))
identity = node.info.identityFromX500Name(ALICE_NAME)
@ -83,7 +96,6 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
@Test
fun `shutdown command stops the node`() {
val nodeIsShut: PublishSubject<Unit> = PublishSubject.create()
val latch = CountDownLatch(1)
var successful = false
@ -130,7 +142,6 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
}
private class CloseableExecutor(private val delegate: ScheduledExecutorService) : AutoCloseable, ScheduledExecutorService by delegate {
override fun close() {
delegate.shutdown()
}
@ -209,19 +220,70 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
)
}
}
}
private fun checkShellNotification(info: StateMachineInfo) {
val context = info.invocationContext
assertThat(context.origin).isInstanceOf(InvocationOrigin.Shell::class.java)
}
// WireTransaction stores its components as blobs which are deserialised in its constructor. This test makes sure
// the extra class loader given to the CordaRPCClient is used in this deserialisation, as otherwise any WireTransaction
// containing Cash.State objects are not receivable by the client.
//
// We run the client in a separate process, without the finance module on its system classpath to ensure that the
// additional class loader that we give it is used. Cash.State objects are used as they can't be synthesised fully
// by the carpenter, and thus avoiding any false-positive results.
@Test
fun `additional class loader used by WireTransaction when it deserialises its components`() {
val financeLocation = Cash::class.java.location.toPath().toString()
val classpathWithoutFinance = ProcessUtilities.defaultClassPath
.split(pathSeparator)
.filter { financeLocation !in it }
.joinToString(pathSeparator)
private fun checkRpcNotification(info: StateMachineInfo, rpcUsername: String, historicalIds: MutableSet<Trace.InvocationId>, externalTrace: Trace?, impersonatedActor: Actor?) {
val context = info.invocationContext
assertThat(context.origin).isInstanceOf(InvocationOrigin.RPC::class.java)
assertThat(context.externalTrace).isEqualTo(externalTrace)
assertThat(context.impersonatedActor).isEqualTo(impersonatedActor)
assertThat(context.actor?.id?.value).isEqualTo(rpcUsername)
assertThat(historicalIds).doesNotContain(context.trace.invocationId)
historicalIds.add(context.trace.invocationId)
// Create a Cash.State object for the StandaloneCashRpcClient to get
node.services.startFlow(CashIssueFlow(100.POUNDS, OpaqueBytes.of(1), identity), InvocationContext.shell())
val outOfProcessRpc = ProcessUtilities.startJavaProcess<StandaloneCashRpcClient>(
classpath = classpathWithoutFinance,
arguments = listOf(node.internals.configuration.rpcOptions.address.toString(), financeLocation)
)
assertThat(outOfProcessRpc.waitFor()).isZero() // i.e. no exceptions were thrown
}
private fun checkShellNotification(info: StateMachineInfo) {
val context = info.invocationContext
assertThat(context.origin).isInstanceOf(InvocationOrigin.Shell::class.java)
}
private fun checkRpcNotification(info: StateMachineInfo,
rpcUsername: String,
historicalIds: MutableSet<Trace.InvocationId>,
externalTrace: Trace?,
impersonatedActor: Actor?) {
val context = info.invocationContext
assertThat(context.origin).isInstanceOf(InvocationOrigin.RPC::class.java)
assertThat(context.externalTrace).isEqualTo(externalTrace)
assertThat(context.impersonatedActor).isEqualTo(impersonatedActor)
assertThat(context.actor?.id?.value).isEqualTo(rpcUsername)
assertThat(historicalIds).doesNotContain(context.trace.invocationId)
historicalIds.add(context.trace.invocationId)
}
private object StandaloneCashRpcClient {
@JvmStatic
fun main(args: Array<String>) {
checkNotOnClasspath("net.corda.finance.contracts.asset.Cash") {
"The finance module cannot be on the system classpath"
}
val address = NetworkHostAndPort.parse(args[0])
val financeClassLoader = URLClassLoader(arrayOf(Paths.get(args[1]).toUri().toURL()))
val rpcUser = CordaRPCClientTest.rpcUser
val client = createCordaRPCClientWithSslAndClassLoader(address, classLoader = financeClassLoader)
val state = client.use(rpcUser.username, rpcUser.password) {
// financeClassLoader should be allowing the Cash.State to materialise
@Suppress("DEPRECATION")
it.proxy.internalVerifiedTransactionsSnapshot()[0].tx.outputsOfType<FungibleAsset<*>>()[0]
}
assertThat(state.javaClass.name).isEqualTo("net.corda.finance.contracts.asset.Cash${'$'}State")
assertThat(state.amount.quantity).isEqualTo(10000)
assertThat(state.amount.token.product).isEqualTo(Currency.getInstance("GBP"))
// This particular check assures us that the Cash.State that we have hasn't been carpented.
assertThat(state.participants).isEqualTo(listOf(state.owner))
}
}
}

View File

@ -286,7 +286,7 @@ class CordaRPCClient private constructor(
effectiveSerializationEnv
} catch (e: IllegalStateException) {
try {
AMQPClientSerializationScheme.initialiseSerialization()
AMQPClientSerializationScheme.initialiseSerialization(classLoader)
} catch (e: IllegalStateException) {
// Race e.g. two of these constructed in parallel, ignore.
}

View File

@ -3,6 +3,7 @@ package net.corda.client.rpc.internal.serialization.amqp
import net.corda.core.cordapp.Cordapp
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationContext.*
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.internal.SerializationEnvironment
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
@ -29,25 +30,26 @@ class AMQPClientSerializationScheme(
companion object {
/** Call from main only. */
fun initialiseSerialization() {
nodeSerializationEnv = createSerializationEnv()
fun initialiseSerialization(classLoader: ClassLoader? = null) {
nodeSerializationEnv = createSerializationEnv(classLoader)
}
fun createSerializationEnv(): SerializationEnvironment {
fun createSerializationEnv(classLoader: ClassLoader? = null): SerializationEnvironment {
return SerializationEnvironmentImpl(
SerializationFactoryImpl().apply {
registerScheme(AMQPClientSerializationScheme(emptyList()))
},
storageContext = AMQP_STORAGE_CONTEXT,
p2pContext = AMQP_P2P_CONTEXT,
p2pContext = if (classLoader != null) AMQP_P2P_CONTEXT.withClassLoader(classLoader) else AMQP_P2P_CONTEXT,
rpcClientContext = AMQP_RPC_CLIENT_CONTEXT,
rpcServerContext = AMQP_RPC_SERVER_CONTEXT)
rpcServerContext = AMQP_RPC_SERVER_CONTEXT
)
}
}
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase) =
magic == amqpMagic && (
target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P)
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
return magic == amqpMagic && (target == UseCase.RPCClient || target == UseCase.P2P)
}
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
return SerializerFactory(context.whitelist, ClassLoader.getSystemClassLoader(), context.lenientCarpenterEnabled).apply {
@ -60,4 +62,4 @@ class AMQPClientSerializationScheme(
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
throw UnsupportedOperationException()
}
}
}

View File

@ -24,9 +24,8 @@ data class TransactionState<out T : ContractState> @JvmOverloads constructor(
* Currently these are loaded from the classpath of the node which includes the cordapp directory - at some
* point these will also be loaded and run from the attachment store directly, allowing contracts to be
* sent across, and run, from the network from within a sandbox environment.
*
* TODO: Implement the contract sandbox loading of the contract attachments
* */
*/
// TODO: Implement the contract sandbox loading of the contract attachments
val contract: ContractClassName,
/** Identity of the notary that ensures the state is not used as an input to a transaction more than once */
val notary: Party,

View File

@ -74,11 +74,11 @@ open class SerializerFactory(
@DeleteForDJVM
constructor(whitelist: ClassWhitelist,
classLoader: ClassLoader,
carpenterClassLoader: ClassLoader,
lenientCarpenter: Boolean = false,
evolutionSerializerGetter: EvolutionSerializerGetterBase = EvolutionSerializerGetter(),
fingerPrinter: FingerPrinter = SerializerFingerPrinter()
) : this(whitelist, ClassCarpenterImpl(whitelist, classLoader, lenientCarpenter), evolutionSerializerGetter, fingerPrinter)
) : this(whitelist, ClassCarpenterImpl(whitelist, carpenterClassLoader, lenientCarpenter), evolutionSerializerGetter, fingerPrinter)
init {
fingerPrinter.setOwner(this)

View File

@ -1,17 +1,16 @@
package net.corda.testing.node.internal
import net.corda.core.internal.div
import net.corda.core.internal.exists
import java.io.File.pathSeparator
import java.nio.file.Path
object ProcessUtilities {
inline fun <reified C : Any> startJavaProcess(
arguments: List<String>,
classpath: String = defaultClassPath,
jdwpPort: Int? = null,
extraJvmArguments: List<String> = emptyList()
): Process {
return startJavaProcessImpl(C::class.java.name, arguments, defaultClassPath, jdwpPort, extraJvmArguments, null, null)
return startJavaProcessImpl(C::class.java.name, arguments, classpath, jdwpPort, extraJvmArguments, null, null)
}
fun startCordaProcess(

View File

@ -8,6 +8,7 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.common.internal.asContextEnv
import net.corda.testing.common.internal.checkNotOnClasspath
import net.corda.testing.common.internal.testNetworkParameters
import java.nio.file.Path
import java.nio.file.Paths
@ -67,11 +68,8 @@ class NodeProcess(
}
init {
try {
Class.forName("net.corda.node.Corda")
throw Error("Smoke test has the node in its classpath. Please remove the offending dependency.")
} catch (e: ClassNotFoundException) {
// If the class can't be found then we're good!
checkNotOnClasspath("net.corda.node.Corda") {
"Smoke test has the node in its classpath. Please remove the offending dependency."
}
}
}

View File

@ -0,0 +1,10 @@
package net.corda.testing.common.internal
inline fun checkNotOnClasspath(className: String, errorMessage: () -> Any) {
try {
Class.forName(className)
throw IllegalStateException(errorMessage().toString())
} catch (e: ClassNotFoundException) {
// If the class can't be found then we're good!
}
}

View File

@ -15,18 +15,10 @@ import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.internal.Emoji
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.rootCause
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.pendingFlowsCount
import net.corda.core.messaging.*
import net.corda.tools.shell.utlities.ANSIProgressRenderer
import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer
import org.crsh.command.InvocationContext
@ -131,8 +123,7 @@ object InteractiveShell {
config["crash.ssh.port"] = configuration.sshdPort?.toString()
config["crash.auth"] = "corda"
configuration.sshHostKeyDirectory?.apply {
val sshKeysDir = configuration.sshHostKeyDirectory
sshKeysDir.createDirectories()
val sshKeysDir = configuration.sshHostKeyDirectory.createDirectories()
config["crash.ssh.keypath"] = (sshKeysDir / "hostkey.pem").toString()
config["crash.ssh.keygen"] = "true"
}
@ -275,7 +266,7 @@ object InteractiveShell {
val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, flowClazz, om)
val latch = CountDownLatch(1)
ansiProgressRenderer.render(stateObservable, { latch.countDown() })
ansiProgressRenderer.render(stateObservable, latch::countDown)
// Wait for the flow to end and the progress tracker to notice. By the time the latch is released
// the tracker is done with the screen.
while (!Thread.currentThread().isInterrupted) {
@ -291,11 +282,7 @@ object InteractiveShell {
}
}
}
stateObservable.returnValue.get()?.apply {
if (this !is Throwable) {
output.println("Flow completed with result: $this")
}
}
output.println("Flow completed with result: ${stateObservable.returnValue.get()}")
} catch (e: NoApplicableConstructor) {
output.println("No matching constructor found:", Color.red)
e.errors.forEach { output.println("- $it", Color.red) }