diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 41f5a7d72b..94ddb681ac 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -108,7 +108,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa if (connected) { log.info("Bridge Connected") val sessionFactory = artemis.started!!.sessionFactory - val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, false, false, false, DEFAULT_ACK_BATCH_SIZE) + val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) this.session = session val consumer = session.createConsumer(queueName) this.consumer = consumer @@ -146,9 +146,11 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa lock.withLock { if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { artemisMessage.acknowledge() - session?.commit() } else { log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") + // We need to commit any acknowledged messages before rolling back the failed + // (unacknowledged) message. + session?.commit() session?.rollback(false) } } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index c6408bb420..0baa7e8897 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -69,23 +69,33 @@ class AMQPBridgeTest { val receive = amqpServer.onReceive.toBlocking().iterator amqpServer.start() + val receivedSequence = mutableListOf() + + fun formatMessage(expected: String, actual: Int, received: List): String { + return "Expected message with id $expected, got $actual, previous message receive sequence: " + "${received.joinToString(", ", "[", "]")}." + } + val received1 = receive.next() val messageID1 = received1.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID1".toByteArray(), received1.payload) assertEquals(0, messageID1) received1.complete(true) // Accept first message + receivedSequence.add(messageID1) val received2 = receive.next() val messageID2 = received2.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID2".toByteArray(), received2.payload) - assertEquals(1, messageID2) + assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence)) received2.complete(false) // Reject message + receivedSequence.add(messageID2) while (true) { val received3 = receive.next() val messageID3 = received3.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID3".toByteArray(), received3.payload) - assertNotEquals(0, messageID3) + assertNotEquals(0, messageID3, formatMessage("< 1", messageID3, receivedSequence)) + receivedSequence.add(messageID3) if (messageID3 != 1) { // keep rejecting any batched items following rejection received3.complete(false) } else { // beginnings of replay so accept again @@ -98,6 +108,7 @@ class AMQPBridgeTest { val received4 = receive.next() val messageID4 = received4.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID4".toByteArray(), received4.payload) + receivedSequence.add(messageID4) if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip assertEquals(2, messageID4) // next message should be in order though break @@ -119,13 +130,16 @@ class AMQPBridgeTest { val received5 = receive.next() val messageID5 = received5.applicationProperties["CountProp"] as Int if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip - assertEquals(-1, messageID5) // next message should be in order though + assertEquals(-1, messageID5, formatMessage("-1", messageID5, receivedSequence)) // next message should be in order though assertArrayEquals("Test_end".toByteArray(), received5.payload) + receivedSequence.add(messageID5) break } + receivedSequence.add(messageID5) received5.complete(true) } + println("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}") bridgeManager.stop() amqpServer.stop() artemisClient.stop() diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index a79c77c4b7..4138f7a92b 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -175,18 +175,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration, initCertificate() val schemaService = NodeSchemaService(cordappLoader.cordappSchemas) val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) - return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)) { database -> - // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like - // a code smell. - val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) - persistentNetworkMapCache.start() - val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair) - val signedNodeInfo = nodeInfo.sign { publicKey, serialised -> - val privateKey = keyPairs.single { it.public == publicKey }.private - privateKey.sign(serialised.bytes) + return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use { + it.transaction { + // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a code smell. + val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) + persistentNetworkMapCache.start() + val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair) + val signedNodeInfo = nodeInfo.sign { publicKey, serialised -> + val privateKey = keyPairs.single { it.public == publicKey }.private + privateKey.sign(serialised.bytes) + } + NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo) + nodeInfo } - NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo) - nodeInfo } } @@ -204,7 +205,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, "Node's platform version is lower than network's required minimumPlatformVersion" } // Do all of this in a database transaction so anything that might need a connection has one. - val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database -> + val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction { val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService) val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair) identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts) @@ -255,7 +256,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, registerCordappFlows(smm) _services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows } startShell(rpcOps) - Pair(StartedNodeImpl(this, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) + Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) } networkMapUpdater = NetworkMapUpdater(services.networkMapCache, NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)), @@ -629,24 +630,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // Specific class so that MockNode can catch it. class DatabaseConfigurationException(msg: String) : CordaException(msg) - protected open fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService, insideTransaction: (CordaPersistence) -> T): T { + protected open fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence { log.debug { val driverClasses = DriverManager.getDrivers().asSequence().map { it.javaClass.name } "Available JDBC drivers: $driverClasses" } val props = configuration.dataSourceProperties - if (props.isNotEmpty()) { - val database = configureDatabase(props, configuration.database, identityService, schemaService) - // Now log the vendor string as this will also cause a connection to be tested eagerly. - logVendorString(database, log) - runOnStop += database::close - return database.transaction { - insideTransaction(database) - } - } else { - throw DatabaseConfigurationException("There must be a database configured.") - } + if (props.isEmpty()) throw DatabaseConfigurationException("There must be a database configured.") + val database = configureDatabase(props, configuration.database, identityService, schemaService) + // Now log the vendor string as this will also cause a connection to be tested eagerly. + logVendorString(database, log) + runOnStop += database::close + return database } private fun makeNotaryService(tokenizableServices: MutableList, database: CordaPersistence): NotaryService? { diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index fc67d2e1ae..1cce869de9 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -2,7 +2,6 @@ package net.corda.node.internal import com.codahale.metrics.JmxReporter import net.corda.core.concurrent.CordaFuture -import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.thenMatch import net.corda.core.internal.div @@ -324,7 +323,7 @@ open class Node(configuration: NodeConfiguration, * This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details * on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url */ - override fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService, insideTransaction: (CordaPersistence) -> T): T { + override fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence { val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url") val h2Prefix = "jdbc:h2:file:" if (databaseUrl != null && databaseUrl.startsWith(h2Prefix)) { @@ -344,7 +343,7 @@ open class Node(configuration: NodeConfiguration, else if (databaseUrl != null) { printBasicNodeInfo("Database connection url is", databaseUrl) } - return super.initialiseDatabasePersistence(schemaService, identityService, insideTransaction) + return super.initialiseDatabasePersistence(schemaService, identityService) } private val _startupComplete = openFuture() diff --git a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt new file mode 100644 index 0000000000..e47ab67a0d --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt @@ -0,0 +1,81 @@ +package net.corda.node.internal + +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.readObject +import net.corda.core.node.NodeInfo +import net.corda.core.serialization.deserialize +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.VersionInfo +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.network.PersistentNetworkMapCache +import net.corda.nodeapi.internal.SignedNodeInfo +import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX +import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.internal.rigorousMock +import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import java.nio.file.Files +import kotlin.test.assertEquals +import kotlin.test.assertNotEquals +import kotlin.test.assertNull + +class NodeTest { + private abstract class AbstractNodeConfiguration : NodeConfiguration + + @Rule + @JvmField + val temporaryFolder = TemporaryFolder() + @Rule + @JvmField + val testSerialization = SerializationEnvironmentRule() + + private fun nodeInfoFile() = temporaryFolder.root.listFiles().singleOrNull { it.name.startsWith(NODE_INFO_FILE_NAME_PREFIX) } + private fun AbstractNode.generateNodeInfo(): NodeInfo { + assertNull(nodeInfoFile()) + generateAndSaveNodeInfo() + val path = nodeInfoFile()!!.toPath() + val nodeInfo = path.readObject().raw.deserialize() + Files.delete(path) + return nodeInfo + } + + @Test + fun `generateAndSaveNodeInfo works`() { + val nodeAddress = NetworkHostAndPort("0.1.2.3", 456) + val nodeName = CordaX500Name("Manx Blockchain Corp", "Douglas", "IM") + val platformVersion = 789 + val dataSourceProperties = makeTestDataSourceProperties() + val databaseConfig = DatabaseConfig() + val configuration = rigorousMock().also { + doReturn(nodeAddress).whenever(it).p2pAddress + doReturn(nodeName).whenever(it).myLegalName + doReturn(null).whenever(it).notary // Don't add notary identity. + doReturn(dataSourceProperties).whenever(it).dataSourceProperties + doReturn(databaseConfig).whenever(it).database + doReturn(temporaryFolder.root.toPath()).whenever(it).baseDirectory + doReturn(true).whenever(it).devMode // Needed for identity cert. + doReturn("tsp").whenever(it).trustStorePassword + doReturn("ksp").whenever(it).keyStorePassword + } + configureDatabase(dataSourceProperties, databaseConfig, rigorousMock()).use { database -> + val node = Node(configuration, rigorousMock().also { + doReturn(platformVersion).whenever(it).platformVersion + }, initialiseSerialization = false) + val nodeInfo = node.generateNodeInfo() + assertEquals(listOf(nodeAddress), nodeInfo.addresses) + assertEquals(listOf(nodeName), nodeInfo.legalIdentitiesAndCerts.map { it.name }) + assertEquals(platformVersion, nodeInfo.platformVersion) + node.generateNodeInfo().let { + assertNotEquals(nodeInfo, it) // Different serial. + assertEquals(nodeInfo, it.copy(serial = nodeInfo.serial)) + } + PersistentNetworkMapCache(database, emptyList()).addNode(nodeInfo) + assertEquals(nodeInfo, node.generateNodeInfo()) + } + } +} diff --git a/samples/simm-valuation-demo/README.md b/samples/simm-valuation-demo/README.md index b6160dc5d5..8a5bd84144 100644 --- a/samples/simm-valuation-demo/README.md +++ b/samples/simm-valuation-demo/README.md @@ -103,7 +103,7 @@ party to Bank A and Bank A will appear as a counter party In what follows, we assume we are Bank A (which is listening on port 10005) -Notice the id field in the output of the ``whoami`` command. We are going to use the id assocatied +Notice the id field in the output of the ``whoami`` command. We are going to use the id associated with Bank C, one of our counter parties, to create a trade. The general command for this is: curl -i -H "Content-Type: application/json" -X PUT -d <<>> http://localhost:10005/api/simmvaluationdemo/<<>>/trades @@ -123,14 +123,23 @@ where the representation of the trade is "fixedRate" : "0.1" } -Continuing our example, the specific command we would run is +Continuing our example, the specific command would look as follows curl -i -H "Content-Type: application/json" \ -X PUT \ -d '{"id":"trade1","description" : "desc","tradeDate" : [ 2016, 6, 6 ], "convention" : "EUR_FIXED_1Y_EURIBOR_3M", "startDate" : [ 2016, 6, 6 ], "endDate" : [ 2020, 1, 2 ], "buySell" : "BUY", "notional" : "1000", "fixedRate" : "0.1"}' \ http://localhost:10005/api/simmvaluationdemo/8Kqd4oWdx4KQGHGL1DzULumUmZyyokeSGJDY1n5M6neUfAj2sjbf65wYwQM/trades -With an expected response of +Note: you should replace the node id 8Kqd4oWdx4KQGHGL1DzULumUmZyyokeSGJDY1n5M6neUfAj2sjbf65wYwQM with the node id returned by the +whoami call above for one of the counterparties. In our worked example we selected "Bank C" and used the generated id for that node. +Thus, the actual command would be: + + curl -i -H "Content-Type: application/json" \ + -X PUT \ + -d '{"id":"trade1","description" : "desc","tradeDate" : [ 2016, 6, 6 ], "convention" : "EUR_FIXED_1Y_EURIBOR_3M", "startDate" : [ 2016, 6, 6 ], "endDate" : [ 2020, 1, 2 ], "buySell" : "BUY", "notional" : "1000", "fixedRate" : "0.1"}' \ + http://localhost:10005/api/simmvaluationdemo/<<>/trades + +Once executed, the expected response is: HTTP/1.1 202 Accepted Date: Thu, 28 Sep 2017 17:19:39 GMT @@ -141,7 +150,7 @@ With an expected response of **Verifying trade completion** -With the trade completed and stored by both parties, the complete list of trades with our couterparty can be seen with the following command +With the trade completed and stored by both parties, the complete list of trades with our counterparty can be seen with the following command curl -X GET http://localhost:10005/api/simmvaluationdemo/<<>>/trades @@ -188,7 +197,7 @@ The specific command for out Bank A example is This demo does not, however, include real SIMM valuation code but a stub for the OpenGamma set of libraries, so please do not base any financial decisions on results generated by this demo. -This demo was built in partnership with OpenGamma and used their SIMM library. However, due to licensing constraints we cannot distribute their library with this code. For this reason, we have stubbed out the relevant parts and replaced it with a very simplistic template that returns fake (but correctly structured) data. However, if you wish to use a realistic library, then please do get in touch with OpenGamma directly for access to their libraries and we will be happy to demonstrate how to replace the stub code. +This demo was built in partnership with OpenGamma and used their SIMM library. However, due to licensing constraints we cannot distribute their library with this code. For this reason, we have stubbed out the relevant parts and replaced it with a very simplistic template that returns fake (but correctly structured) data. However, if you wish to use a realistic library, then please do get in touch with OpenGamma directly for access to their libraries and we will be happy to demonstrate how to replace the stub code. ## Troubleshooting diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 0a5d250856..effa894298 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -297,11 +297,8 @@ open class InternalMockNetwork(private val cordappPackages: List, override val serializationWhitelists: List get() = _serializationWhitelists private var dbCloser: (() -> Any?)? = null - override fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService, insideTransaction: (CordaPersistence) -> T): T { - return super.initialiseDatabasePersistence(schemaService, identityService) { database -> - dbCloser = database::close - insideTransaction(database) - } + override fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence { + return super.initialiseDatabasePersistence(schemaService, identityService).also { dbCloser = it::close } } fun disableDBCloseOnStop() {