From 90284a614337aa8d3632b0f8fe2c7eb33e8119f8 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Tue, 27 Aug 2019 13:06:28 +0100 Subject: [PATCH] CORDA-2919: JacksonSupport, for CordaSerializable classes, improved to only uses those properties that are part of Corda serialisation (#5397) --- .idea/compiler.xml | 2 + .../client/jackson/internal/CordaModule.kt | 45 +- .../client/jackson/JacksonSupportTest.kt | 15 + .../node/services/rpc/CheckpointDumper.kt | 10 +- .../shell/InteractiveShellIntegrationTest.kt | 386 ++++++++---------- 5 files changed, 223 insertions(+), 235 deletions(-) diff --git a/.idea/compiler.xml b/.idea/compiler.xml index e311d8edce..2cbf1d59fe 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -23,6 +23,8 @@ + + diff --git a/client/jackson/src/main/kotlin/net/corda/client/jackson/internal/CordaModule.kt b/client/jackson/src/main/kotlin/net/corda/client/jackson/internal/CordaModule.kt index 910f679d67..947a88cd38 100644 --- a/client/jackson/src/main/kotlin/net/corda/client/jackson/internal/CordaModule.kt +++ b/client/jackson/src/main/kotlin/net/corda/client/jackson/internal/CordaModule.kt @@ -3,6 +3,8 @@ package net.corda.client.jackson.internal import com.fasterxml.jackson.annotation.* +import com.fasterxml.jackson.annotation.JsonAutoDetect.Value +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility import com.fasterxml.jackson.annotation.JsonCreator.Mode.DISABLED import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.core.JsonGenerator @@ -12,10 +14,14 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.databind.* import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.fasterxml.jackson.databind.annotation.JsonSerialize +import com.fasterxml.jackson.databind.cfg.MapperConfig import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier import com.fasterxml.jackson.databind.deser.ContextualDeserializer import com.fasterxml.jackson.databind.deser.std.DelegatingDeserializer import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer +import com.fasterxml.jackson.databind.introspect.AnnotatedClass +import com.fasterxml.jackson.databind.introspect.BasicClassIntrospector +import com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.databind.node.IntNode import com.fasterxml.jackson.databind.node.ObjectNode @@ -32,7 +38,6 @@ import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.* import net.corda.core.internal.DigitalSignatureWithCert import net.corda.core.internal.createComponentGroups -import net.corda.core.internal.kotlinObjectInstance import net.corda.core.node.NodeInfo import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializedBytes @@ -56,7 +61,13 @@ class CordaModule : SimpleModule("corda-core") { override fun setupModule(context: SetupContext) { super.setupModule(context) + // For classes which are annotated with CordaSerializable we want to use the same set of properties as the Corda serilasation scheme. + // To do that we use CordaSerializableClassIntrospector to first turn on field visibility for these classes (the Jackson default is + // private fields are not included) and then we use CordaSerializableBeanSerializerModifier to remove any extra properties that Jackson + // might pick up. + context.setClassIntrospector(CordaSerializableClassIntrospector(context)) context.addBeanSerializerModifier(CordaSerializableBeanSerializerModifier()) + context.addBeanDeserializerModifier(AmountBeanDeserializerModifier()) context.setMixInAnnotations(PartyAndCertificate::class.java, PartyAndCertificateMixin::class.java) @@ -88,9 +99,22 @@ class CordaModule : SimpleModule("corda-core") { } } -/** - * Use the same properties that AMQP serialization uses if the POJO is @CordaSerializable - */ +private class CordaSerializableClassIntrospector(private val context: Module.SetupContext) : BasicClassIntrospector() { + override fun constructPropertyCollector( + config: MapperConfig<*>?, + ac: AnnotatedClass?, + type: JavaType, + forSerialization: Boolean, + mutatorPrefix: String? + ): POJOPropertiesCollector { + if (hasCordaSerializable(type.rawClass)) { + // Adjust the field visibility of CordaSerializable classes on the fly as they are encountered. + context.configOverride(type.rawClass).visibility = Value.defaultVisibility().withFieldVisibility(Visibility.ANY) + } + return super.constructPropertyCollector(config, ac, type, forSerialization, mutatorPrefix) + } +} + private class CordaSerializableBeanSerializerModifier : BeanSerializerModifier() { // We need to pass in a SerializerFactory when scanning for properties, but don't actually do any serialisation so any will do. private val serializerFactory = SerializerFactoryBuilder.build(AllWhitelist, javaClass.classLoader) @@ -99,17 +123,10 @@ private class CordaSerializableBeanSerializerModifier : BeanSerializerModifier() beanDesc: BeanDescription, beanProperties: MutableList): MutableList { val beanClass = beanDesc.beanClass - if (hasCordaSerializable(beanClass) && beanClass.kotlinObjectInstance == null && !SerializeAsToken::class.java.isAssignableFrom(beanClass)) { + if (hasCordaSerializable(beanClass) && !SerializeAsToken::class.java.isAssignableFrom(beanClass)) { val typeInformation = serializerFactory.getTypeInformation(beanClass) - val properties = typeInformation.propertiesOrEmptyMap - val amqpProperties = properties.mapNotNull { (name, property) -> - if (property.isCalculated) null else name - } - val propertyRenames = beanDesc.findProperties().associateBy({ it.name }, { it.internalName }) - (amqpProperties - propertyRenames.values).let { - check(it.isEmpty()) { "Jackson didn't provide serialisers for $it" } - } - beanProperties.removeIf { propertyRenames[it.name] !in amqpProperties } + val propertyNames = typeInformation.propertiesOrEmptyMap.mapNotNull { if (it.value.isCalculated) null else it.key } + beanProperties.removeIf { it.name !in propertyNames } } return beanProperties } diff --git a/client/jackson/src/test/kotlin/net/corda/client/jackson/JacksonSupportTest.kt b/client/jackson/src/test/kotlin/net/corda/client/jackson/JacksonSupportTest.kt index c047e7f8fd..8f71de6b5d 100644 --- a/client/jackson/src/test/kotlin/net/corda/client/jackson/JacksonSupportTest.kt +++ b/client/jackson/src/test/kotlin/net/corda/client/jackson/JacksonSupportTest.kt @@ -29,6 +29,7 @@ import net.corda.core.node.services.NetworkParametersService import net.corda.core.node.services.TransactionStorage import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SerializedBytes +import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.transactions.CoreTransaction import net.corda.core.transactions.SignedTransaction @@ -658,6 +659,15 @@ class JacksonSupportTest(@Suppress("unused") private val name: String, factory: assertThat(mapper.convertValue(json)).isEqualTo(data) } + @Test + fun `LinearState where the linearId property does not match the backing field`() { + val funkyLinearState = FunkyLinearState(UniqueIdentifier()) + // As a sanity check, show that this is a valid CordaSerializable class + assertThat(funkyLinearState.serialize().deserialize()).isEqualTo(funkyLinearState) + val json = mapper.valueToTree(funkyLinearState) + assertThat(mapper.convertValue(json)).isEqualTo(funkyLinearState) + } + @Test fun `kotlin object`() { val json = mapper.valueToTree(KotlinObject) @@ -713,6 +723,11 @@ class JacksonSupportTest(@Suppress("unused") private val name: String, factory: val nonCtor: Int get() = value } + private data class FunkyLinearState(private val linearID: UniqueIdentifier) : LinearState { + override val linearId: UniqueIdentifier get() = linearID + override val participants: List get() = emptyList() + } + private object KotlinObject @CordaSerializable diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt index 8e14b134bc..6d37c48776 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt @@ -1,7 +1,6 @@ package net.corda.node.services.rpc import co.paralleluniverse.fibers.Stack -import co.paralleluniverse.strands.Strand import com.fasterxml.jackson.annotation.JsonAutoDetect import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility import com.fasterxml.jackson.annotation.JsonFormat @@ -363,9 +362,12 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private override fun changeProperties(config: SerializationConfig, beanDesc: BeanDescription, beanProperties: MutableList): MutableList { - // Remove references to any node singletons - beanProperties.removeIf { it.type.isTypeOrSubTypeOf(SerializeAsToken::class.java) } - if (FlowLogic::class.java.isAssignableFrom(beanDesc.beanClass)) { + if (SerializeAsToken::class.java.isAssignableFrom(beanDesc.beanClass)) { + // Do not serialise node singletons + // TODO This will cause the singleton to appear as an empty object. Ideally we don't want it to appear at all but this will + // have to do for now. + beanProperties.clear() + } else if (FlowLogic::class.java.isAssignableFrom(beanDesc.beanClass)) { beanProperties.removeIf { it.type.isTypeOrSubTypeOf(ProgressTracker::class.java) || it.name == "_stateMachine" || it.name == "deprecatedPartySessionMap" } diff --git a/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/InteractiveShellIntegrationTest.kt b/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/InteractiveShellIntegrationTest.kt index ca2ff63d58..8aadc6957a 100644 --- a/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/InteractiveShellIntegrationTest.kt +++ b/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/InteractiveShellIntegrationTest.kt @@ -3,7 +3,6 @@ package net.corda.tools.shell import co.paralleluniverse.fibers.Suspendable import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.type.TypeFactory -import com.google.common.io.Files import com.jcraft.jsch.ChannelExec import com.jcraft.jsch.JSch import com.nhaarman.mockito_kotlin.any @@ -11,17 +10,25 @@ import com.nhaarman.mockito_kotlin.doAnswer import com.nhaarman.mockito_kotlin.mock import net.corda.client.jackson.JacksonSupport import net.corda.client.rpc.RPCException +import net.corda.core.contracts.* import net.corda.core.flows.* +import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party +import net.corda.core.internal.createDirectories import net.corda.core.internal.div +import net.corda.core.internal.inputStream import net.corda.core.internal.list import net.corda.core.internal.messaging.InternalCordaRPCOps import net.corda.core.messaging.ClientRpcSslOptions import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow +import net.corda.core.node.ServiceHub +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.getOrThrow -import net.corda.core.utilities.unwrap import net.corda.node.internal.NodeStartup import net.corda.node.services.Permissions import net.corda.node.services.Permissions.Companion.all @@ -34,6 +41,7 @@ import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.driver import net.corda.testing.driver.internal.NodeHandleInternal import net.corda.testing.internal.useSslRpcOverrides @@ -49,9 +57,10 @@ import org.junit.Ignore import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder -import java.util.zip.ZipFile +import java.util.* +import java.util.zip.ZipInputStream import javax.security.auth.x500.X500Principal -import kotlin.test.assertNotEquals +import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue @@ -73,14 +82,8 @@ class InteractiveShellIntegrationTest { fun `shell should not log in with invalid credentials`() { val user = User("u", "p", setOf()) driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { - val nodeFuture = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), startInSameProcess = true) - val node = nodeFuture.getOrThrow() - - val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(), - user = "fake", password = "fake", - hostAndPort = node.rpcAddress) - InteractiveShell.startShell(conf) - + val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + startShell("fake", "fake", node.rpcAddress) assertThatThrownBy { InteractiveShell.nodeInfo() }.isInstanceOf(ActiveMQSecurityException::class.java) } } @@ -88,15 +91,9 @@ class InteractiveShellIntegrationTest { @Test fun `shell should log in with valid credentials`() { val user = User("u", "p", setOf()) - driver(DriverParameters(notarySpecs = emptyList())) { - val nodeFuture = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), startInSameProcess = true) - val node = nodeFuture.getOrThrow() - - val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(), - user = user.username, password = user.password, - hostAndPort = node.rpcAddress) - - InteractiveShell.startShell(conf) + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { + val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + startShell(node) InteractiveShell.nodeInfo() } } @@ -104,7 +101,6 @@ class InteractiveShellIntegrationTest { @Test fun `shell should log in with ssl`() { val user = User("mark", "dadada", setOf(all())) - var successful = false val (keyPair, cert) = createKeyPairAndSelfSignedTLSCertificate(testName) val keyStorePath = saveToKeyStore(tempFolder.root.toPath() / "keystore.jks", keyPair, cert) @@ -114,20 +110,10 @@ class InteractiveShellIntegrationTest { val clientSslOptions = ClientRpcSslOptions(trustStorePath, "password") driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { - startNode(rpcUsers = listOf(user), customOverrides = brokerSslOptions.useSslRpcOverrides()).getOrThrow().use { node -> - - val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(), - user = user.username, password = user.password, - hostAndPort = node.rpcAddress, - ssl = clientSslOptions) - - InteractiveShell.startShell(conf) - - InteractiveShell.nodeInfo() - successful = true - } + val node = startNode(rpcUsers = listOf(user), customOverrides = brokerSslOptions.useSslRpcOverrides()).getOrThrow() + startShell(node, clientSslOptions) + InteractiveShell.nodeInfo() } - assertThat(successful).isTrue() } @Test @@ -142,47 +128,33 @@ class InteractiveShellIntegrationTest { val clientSslOptions = ClientRpcSslOptions(trustStorePath, "password") driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { - startNode(rpcUsers = listOf(user), customOverrides = brokerSslOptions.useSslRpcOverrides()).getOrThrow().use { node -> - - val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(), - user = user.username, password = user.password, - hostAndPort = node.rpcAddress, - ssl = clientSslOptions) - - InteractiveShell.startShell(conf) - - assertThatThrownBy { InteractiveShell.nodeInfo() }.isInstanceOf(RPCException::class.java) - } + val node = startNode(rpcUsers = listOf(user), customOverrides = brokerSslOptions.useSslRpcOverrides()).getOrThrow() + startShell(node, clientSslOptions) + assertThatThrownBy { InteractiveShell.nodeInfo() }.isInstanceOf(RPCException::class.java) } } @Test fun `internal shell user should not be able to connect if node started with devMode=false`() { driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { - startNode().getOrThrow().use { node -> - val conf = (node as NodeHandleInternal).configuration.toShellConfig() - InteractiveShell.startShell(conf) - assertThatThrownBy { InteractiveShell.nodeInfo() }.isInstanceOf(ActiveMQSecurityException::class.java) - } + val node = startNode().getOrThrow() + val conf = (node as NodeHandleInternal).configuration.toShellConfig() + InteractiveShell.startShell(conf) + assertThatThrownBy { InteractiveShell.nodeInfo() }.isInstanceOf(ActiveMQSecurityException::class.java) } } @Ignore @Test fun `ssh runs flows via standalone shell`() { - val user = User("u", "p", setOf(Permissions.startFlow(), + val user = User("u", "p", setOf( + Permissions.startFlow(), Permissions.invokeRpc(CordaRPCOps::registeredFlows), - Permissions.invokeRpc(CordaRPCOps::nodeInfo))) - driver(DriverParameters(notarySpecs = emptyList())) { - val nodeFuture = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), startInSameProcess = true) - val node = nodeFuture.getOrThrow() - - val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(), - user = user.username, password = user.password, - hostAndPort = node.rpcAddress, - sshdPort = 2224) - - InteractiveShell.startShell(conf) + Permissions.invokeRpc(CordaRPCOps::nodeInfo) + )) + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { + val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + startShell(node, sshdPort = 2224) InteractiveShell.nodeInfo() val session = JSch().getSession("u", "localhost", 2224) @@ -200,7 +172,7 @@ class InteractiveShellIntegrationTest { val response = String(Streams.readAll(channel.inputStream)) - val linesWithDoneCount = response.lines().filter { line -> line.contains("Done") } + val linesWithDoneCount = response.lines().filter { line -> "Done" in line } channel.disconnect() session.disconnect() @@ -213,9 +185,11 @@ class InteractiveShellIntegrationTest { @Ignore @Test fun `ssh run flows via standalone shell over ssl to node`() { - val user = User("mark", "dadada", setOf(Permissions.startFlow(), + val user = User("mark", "dadada", setOf( + Permissions.startFlow(), Permissions.invokeRpc(CordaRPCOps::registeredFlows), - Permissions.invokeRpc(CordaRPCOps::nodeInfo)/*all()*/)) + Permissions.invokeRpc(CordaRPCOps::nodeInfo)/*all()*/ + )) val (keyPair, cert) = createKeyPairAndSelfSignedTLSCertificate(testName) val keyStorePath = saveToKeyStore(tempFolder.root.toPath() / "keystore.jks", keyPair, cert) @@ -226,14 +200,7 @@ class InteractiveShellIntegrationTest { var successful = false driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { startNode(rpcUsers = listOf(user), customOverrides = brokerSslOptions.useSslRpcOverrides()).getOrThrow().use { node -> - - val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(), - user = user.username, password = user.password, - hostAndPort = node.rpcAddress, - ssl = clientSslOptions, - sshdPort = 2223) - - InteractiveShell.startShell(conf) + startShell(node, clientSslOptions, sshdPort = 2223) InteractiveShell.nodeInfo() val session = JSch().getSession("mark", "localhost", 2223) @@ -251,7 +218,7 @@ class InteractiveShellIntegrationTest { val response = String(Streams.readAll(channel.inputStream)) - val linesWithDoneCount = response.lines().filter { line -> line.contains("Done") } + val linesWithDoneCount = response.lines().filter { line -> "Done" in line } channel.disconnect() session.disconnect() // TODO Simon make sure to close them @@ -263,174 +230,136 @@ class InteractiveShellIntegrationTest { } assertThat(successful).isTrue() - } } @Test fun `shell should start flow with fully qualified class name`() { val user = User("u", "p", setOf(all())) - var successful = false - driver(DriverParameters(notarySpecs = emptyList())) { - val nodeFuture = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), startInSameProcess = true) - val node = nodeFuture.getOrThrow() - - val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(), - user = user.username, password = user.password, - hostAndPort = node.rpcAddress) - InteractiveShell.startShell(conf) - - // setup and configure some mocks required by InteractiveShell.runFlowByNameFragment() - val output = mock { - on { println(any()) } doAnswer { - val line = it.arguments[0] - println("$line") - if ((line is String) && (line.startsWith("Flow completed with result:"))) - successful = true - } - } - val ansiProgressRenderer = mock { - on { render(any(), any()) } doAnswer { InteractiveShell.latch.countDown() } - } - InteractiveShell.runFlowByNameFragment( - "NoOpFlow", - "", output, node.rpc, ansiProgressRenderer) + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { + val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + startShell(node) + val (output, lines) = mockRenderPrintWriter() + InteractiveShell.runFlowByNameFragment(NoOpFlow::class.java.name, "", output, node.rpc, mockAnsiProgressRenderer()) + assertThat(lines.last()).startsWith("Flow completed with result:") } - assertThat(successful).isTrue() } @Test fun `shell should start flow with unique un-qualified class name`() { val user = User("u", "p", setOf(all())) - var successful = false - driver(DriverParameters(notarySpecs = emptyList())) { - val nodeFuture = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), startInSameProcess = true) - val node = nodeFuture.getOrThrow() - - val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(), - user = user.username, password = user.password, - hostAndPort = node.rpcAddress) - InteractiveShell.startShell(conf) - - // setup and configure some mocks required by InteractiveShell.runFlowByNameFragment() - val output = mock { - on { println(any()) } doAnswer { - val line = it.arguments[0] - println("$line") - if ((line is String) && (line.startsWith("Flow completed with result:"))) - successful = true - } - } - val ansiProgressRenderer = mock { - on { render(any(), any()) } doAnswer { InteractiveShell.latch.countDown() } - } - InteractiveShell.runFlowByNameFragment( - "NoOpFlowA", - "", output, node.rpc, ansiProgressRenderer) + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { + val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + startShell(node) + val (output, lines) = mockRenderPrintWriter() + InteractiveShell.runFlowByNameFragment("NoOpFlowA", "", output, node.rpc, mockAnsiProgressRenderer()) + assertThat(lines.last()).startsWith("Flow completed with result:") } - assertThat(successful).isTrue() } @Test fun `shell should fail to start flow with ambiguous class name`() { val user = User("u", "p", setOf(all())) - var successful = false - driver(DriverParameters(notarySpecs = emptyList())) { - val nodeFuture = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), startInSameProcess = true) - val node = nodeFuture.getOrThrow() - - val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(), - user = user.username, password = user.password, - hostAndPort = node.rpcAddress) - InteractiveShell.startShell(conf) - - // setup and configure some mocks required by InteractiveShell.runFlowByNameFragment() - val output = mock { - on { println(any()) } doAnswer { - val line = it.arguments[0] - println("$line") - if ((line is String) && (line.startsWith("Ambiguous name provided, please be more specific."))) - successful = true - } - } - val ansiProgressRenderer = mock { - on { render(any(), any()) } doAnswer { InteractiveShell.latch.countDown() } - } - InteractiveShell.runFlowByNameFragment( - "NoOpFlo", - "", output, node.rpc, ansiProgressRenderer) + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { + val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + startShell(node) + val (output, lines) = mockRenderPrintWriter() + InteractiveShell.runFlowByNameFragment("NoOpFlo", "", output, node.rpc, mockAnsiProgressRenderer()) + assertThat(lines.any { it.startsWith("Ambiguous name provided, please be more specific.") }).isTrue() } - assertThat(successful).isTrue() } @Test fun `shell should start flow with partially matching class name`() { val user = User("u", "p", setOf(all())) - var successful = false - driver(DriverParameters(notarySpecs = emptyList())) { - val nodeFuture = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), startInSameProcess = true) - val node = nodeFuture.getOrThrow() - - val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(), - user = user.username, password = user.password, - hostAndPort = node.rpcAddress) - InteractiveShell.startShell(conf) - - // setup and configure some mocks required by InteractiveShell.runFlowByNameFragment() - val output = mock { - on { println(any()) } doAnswer { - val line = it.arguments[0] - println("$line") - if ((line is String) && (line.startsWith("Flow completed with result"))) - successful = true - } - } - val ansiProgressRenderer = mock { - on { render(any(), any()) } doAnswer { InteractiveShell.latch.countDown() } - } - InteractiveShell.runFlowByNameFragment( - "Burble", - "", output, node.rpc, ansiProgressRenderer) + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { + val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + startShell(node) + val (output, lines) = mockRenderPrintWriter() + InteractiveShell.runFlowByNameFragment("Burble", "", output, node.rpc, mockAnsiProgressRenderer()) + assertThat(lines.last()).startsWith("Flow completed with result") } - assertThat(successful).isTrue() } @Test fun `dumpCheckpoints creates zip with json file for suspended flow`() { val user = User("u", "p", setOf(all())) - driver(DriverParameters(notarySpecs = emptyList())) { - val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), startInSameProcess = true).getOrThrow() - val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user), startInSameProcess = true).getOrThrow() + driver(DriverParameters(startNodesInProcess = true)) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() bobNode.stop() - // create logs directory since the driver is not creating it - (aliceNode.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).toFile().mkdir() + // Create logs directory since the driver is not creating it + (aliceNode.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).createDirectories() - val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(), - user = user.username, password = user.password, - hostAndPort = aliceNode.rpcAddress) - InteractiveShell.startShell(conf) - // setup and configure some mocks required by InteractiveShell.runFlowByNameFragment() - val output = mock { - on { println(any()) } doAnswer { - val line = it.arguments[0] - assertNotEquals("Please try 'man run' to learn what syntax is acceptable", line) - } + startShell(aliceNode) + + val linearId = UniqueIdentifier(id = UUID.fromString("7c0719f0-e489-46e8-bf3b-ee203156fc7c")) + aliceNode.rpc.startFlow( + ::FlowForCheckpointDumping, + MyState( + "some random string", + linearId, + listOf(aliceNode.nodeInfo.singleIdentity(), bobNode.nodeInfo.singleIdentity()) + ), + bobNode.nodeInfo.singleIdentity() + ) + + Thread.sleep(5000) + + val (output) = mockRenderPrintWriter() + InteractiveShell.runRPCFromString(listOf("dumpCheckpoints"), output, mock(), aliceNode.rpc as InternalCordaRPCOps, inputObjectMapper) + + val zipFile = (aliceNode.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list().first { "checkpoints_dump-" in it.toString() } + val json = ZipInputStream(zipFile.inputStream()).use { zip -> + zip.nextEntry + ObjectMapper().readTree(zip) } - aliceNode.rpc.startFlow(::SendFlow, bobNode.nodeInfo.singleIdentity()) + assertNotNull(json["flowId"].asText()) + assertEquals(FlowForCheckpointDumping::class.java.name, json["topLevelFlowClass"].asText()) + assertEquals(linearId.id.toString(), json["topLevelFlowLogic"]["myState"]["linearId"]["id"].asText()) + assertEquals(4, json["flowCallStackSummary"].size()) + assertEquals(4, json["flowCallStack"].size()) + val sendAndReceiveJson = json["suspendedOn"]["sendAndReceive"][0] + assertEquals(bobNode.nodeInfo.singleIdentity().toString(), sendAndReceiveJson["session"]["peer"].asText()) + assertEquals(SignedTransaction::class.qualifiedName, sendAndReceiveJson["sentPayloadType"].asText()) + } + } - InteractiveShell.runRPCFromString( - listOf("dumpCheckpoints"), output, mock(), aliceNode.rpc as InternalCordaRPCOps, inputObjectMapper) + private fun startShell(node: NodeHandle, ssl: ClientRpcSslOptions? = null, sshdPort: Int? = null) { + val user = node.rpcUsers[0] + startShell(user.username, user.password, node.rpcAddress, ssl, sshdPort) + } - // assert that the checkpoint dump zip has been created - val zip = (aliceNode.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list() - .find { it.toString().contains("checkpoints_dump-") } - assertNotNull(zip) - // assert that a json file has been created for the suspended flow - val json = ZipFile((zip!!).toFile()).entries().asSequence() - .find { it.name.contains(SendFlow::class.simpleName!!) } - assertNotNull(json) + private fun startShell(user: String, password: String, address: NetworkHostAndPort, ssl: ClientRpcSslOptions? = null, sshdPort: Int? = null) { + val conf = ShellConfiguration( + commandsDirectory = tempFolder.newFolder().toPath(), + user = user, + password = password, + hostAndPort = address, + ssl = ssl, + sshdPort = sshdPort + ) + InteractiveShell.startShell(conf) + } + + private fun mockRenderPrintWriter(): Pair> { + val lines = ArrayList() + val writer = mock { + on { println(any()) } doAnswer { + val line = it.getArgument(0, String::class.java) + println(">>> $line") + lines += line + Unit + } + } + return Pair(writer, lines) + } + + private fun mockAnsiProgressRenderer(): ANSIProgressRenderer { + return mock { + on { render(any(), any()) } doAnswer { InteractiveShell.latch.countDown() } } } @@ -438,7 +367,6 @@ class InteractiveShellIntegrationTest { val objectMapper = JacksonSupport.createNonRpcMapper() val tf = TypeFactory.defaultInstance().withClassLoader(classLoader) objectMapper.typeFactory = tf - return objectMapper } } @@ -470,23 +398,47 @@ class BurbleFlow : FlowLogic() { } } -@StartableByRPC @InitiatingFlow -class SendFlow(private val party: Party) : FlowLogic() { - override val progressTracker = ProgressTracker() +@StartableByRPC +class FlowForCheckpointDumping(private val myState: MyState, private val party: Party): FlowLogic() { + // Make sure any SerializeAsToken instances are not serialised + private var services: ServiceHub? = null + @Suspendable override fun call() { - initiateFlow(party).sendAndReceive("hi").unwrap { it } + services = serviceHub + val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply { + addOutputState(myState) + addCommand(MyContract.Create(), listOf(ourIdentity, party).map(Party::owningKey)) + } + val sessions = listOf(initiateFlow(party)) + val stx = serviceHub.signInitialTransaction(tx) + subFlow(CollectSignaturesFlow(stx, sessions)) + throw IllegalStateException("The test should not get here") } } -@InitiatedBy(SendFlow::class) -class ReceiveFlow(private val session: FlowSession) : FlowLogic() { - override val progressTracker = ProgressTracker() - @Suspendable +@InitiatedBy(FlowForCheckpointDumping::class) +class FlowForCheckpointDumpingResponder(private val session: FlowSession): FlowLogic() { override fun call() { - session.receive().unwrap { it } - session.send("hi") + val signTxFlow = object : SignTransactionFlow(session) { + override fun checkTransaction(stx: SignedTransaction) { + + } + } + subFlow(signTxFlow) + throw IllegalStateException("The test should not get here") } } +class MyContract : Contract { + class Create : CommandData + override fun verify(tx: LedgerTransaction) {} +} + +@BelongsToContract(MyContract::class) +data class MyState( + val data: String, + override val linearId: UniqueIdentifier, + override val participants: List +) : LinearState