Merge commit '4d4253a287c6ddccddeab8ed24f9e16da5e25bc2' into andr3ej-os-merge

# Conflicts:
#	node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
#	node/src/main/kotlin/net/corda/node/internal/Node.kt
This commit is contained in:
Andrzej Cichocki 2018-02-28 14:39:29 +00:00
commit b578b934f7
No known key found for this signature in database
GPG Key ID: 21B3BCB0BD5B0832
7 changed files with 141 additions and 43 deletions

View File

@ -108,7 +108,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
if (connected) { if (connected) {
log.info("Bridge Connected") log.info("Bridge Connected")
val sessionFactory = artemis.started!!.sessionFactory val sessionFactory = artemis.started!!.sessionFactory
val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, false, false, false, DEFAULT_ACK_BATCH_SIZE) val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
this.session = session this.session = session
val consumer = session.createConsumer(queueName) val consumer = session.createConsumer(queueName)
this.consumer = consumer this.consumer = consumer
@ -146,9 +146,11 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
lock.withLock { lock.withLock {
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
artemisMessage.acknowledge() artemisMessage.acknowledge()
session?.commit()
} else { } else {
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
// We need to commit any acknowledged messages before rolling back the failed
// (unacknowledged) message.
session?.commit()
session?.rollback(false) session?.rollback(false)
} }
} }

View File

@ -69,23 +69,33 @@ class AMQPBridgeTest {
val receive = amqpServer.onReceive.toBlocking().iterator val receive = amqpServer.onReceive.toBlocking().iterator
amqpServer.start() amqpServer.start()
val receivedSequence = mutableListOf<Int>()
fun formatMessage(expected: String, actual: Int, received: List<Int>): String {
return "Expected message with id $expected, got $actual, previous message receive sequence: "
"${received.joinToString(", ", "[", "]")}."
}
val received1 = receive.next() val received1 = receive.next()
val messageID1 = received1.applicationProperties["CountProp"] as Int val messageID1 = received1.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID1".toByteArray(), received1.payload) assertArrayEquals("Test$messageID1".toByteArray(), received1.payload)
assertEquals(0, messageID1) assertEquals(0, messageID1)
received1.complete(true) // Accept first message received1.complete(true) // Accept first message
receivedSequence.add(messageID1)
val received2 = receive.next() val received2 = receive.next()
val messageID2 = received2.applicationProperties["CountProp"] as Int val messageID2 = received2.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID2".toByteArray(), received2.payload) assertArrayEquals("Test$messageID2".toByteArray(), received2.payload)
assertEquals(1, messageID2) assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence))
received2.complete(false) // Reject message received2.complete(false) // Reject message
receivedSequence.add(messageID2)
while (true) { while (true) {
val received3 = receive.next() val received3 = receive.next()
val messageID3 = received3.applicationProperties["CountProp"] as Int val messageID3 = received3.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID3".toByteArray(), received3.payload) assertArrayEquals("Test$messageID3".toByteArray(), received3.payload)
assertNotEquals(0, messageID3) assertNotEquals(0, messageID3, formatMessage("< 1", messageID3, receivedSequence))
receivedSequence.add(messageID3)
if (messageID3 != 1) { // keep rejecting any batched items following rejection if (messageID3 != 1) { // keep rejecting any batched items following rejection
received3.complete(false) received3.complete(false)
} else { // beginnings of replay so accept again } else { // beginnings of replay so accept again
@ -98,6 +108,7 @@ class AMQPBridgeTest {
val received4 = receive.next() val received4 = receive.next()
val messageID4 = received4.applicationProperties["CountProp"] as Int val messageID4 = received4.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID4".toByteArray(), received4.payload) assertArrayEquals("Test$messageID4".toByteArray(), received4.payload)
receivedSequence.add(messageID4)
if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip
assertEquals(2, messageID4) // next message should be in order though assertEquals(2, messageID4) // next message should be in order though
break break
@ -119,13 +130,16 @@ class AMQPBridgeTest {
val received5 = receive.next() val received5 = receive.next()
val messageID5 = received5.applicationProperties["CountProp"] as Int val messageID5 = received5.applicationProperties["CountProp"] as Int
if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip
assertEquals(-1, messageID5) // next message should be in order though assertEquals(-1, messageID5, formatMessage("-1", messageID5, receivedSequence)) // next message should be in order though
assertArrayEquals("Test_end".toByteArray(), received5.payload) assertArrayEquals("Test_end".toByteArray(), received5.payload)
receivedSequence.add(messageID5)
break break
} }
receivedSequence.add(messageID5)
received5.complete(true) received5.complete(true)
} }
println("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}")
bridgeManager.stop() bridgeManager.stop()
amqpServer.stop() amqpServer.stop()
artemisClient.stop() artemisClient.stop()

View File

@ -175,18 +175,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
initCertificate() initCertificate()
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas) val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)) { database -> return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use {
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like it.transaction {
// a code smell. // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a code smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start() persistentNetworkMapCache.start()
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair) val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val signedNodeInfo = nodeInfo.sign { publicKey, serialised -> val signedNodeInfo = nodeInfo.sign { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes) privateKey.sign(serialised.bytes)
}
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
nodeInfo
} }
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
nodeInfo
} }
} }
@ -204,7 +205,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
"Node's platform version is lower than network's required minimumPlatformVersion" "Node's platform version is lower than network's required minimumPlatformVersion"
} }
// Do all of this in a database transaction so anything that might need a connection has one. // Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database -> val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction {
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService) val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair) val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair)
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts) identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
@ -255,7 +256,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
registerCordappFlows(smm) registerCordappFlows(smm)
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows } _services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
startShell(rpcOps) startShell(rpcOps)
Pair(StartedNodeImpl(this, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
} }
networkMapUpdater = NetworkMapUpdater(services.networkMapCache, networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)), NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
@ -629,24 +630,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Specific class so that MockNode can catch it. // Specific class so that MockNode can catch it.
class DatabaseConfigurationException(msg: String) : CordaException(msg) class DatabaseConfigurationException(msg: String) : CordaException(msg)
protected open fun <T> initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService, insideTransaction: (CordaPersistence) -> T): T { protected open fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence {
log.debug { log.debug {
val driverClasses = DriverManager.getDrivers().asSequence().map { it.javaClass.name } val driverClasses = DriverManager.getDrivers().asSequence().map { it.javaClass.name }
"Available JDBC drivers: $driverClasses" "Available JDBC drivers: $driverClasses"
} }
val props = configuration.dataSourceProperties val props = configuration.dataSourceProperties
if (props.isNotEmpty()) { if (props.isEmpty()) throw DatabaseConfigurationException("There must be a database configured.")
val database = configureDatabase(props, configuration.database, identityService, schemaService) val database = configureDatabase(props, configuration.database, identityService, schemaService)
// Now log the vendor string as this will also cause a connection to be tested eagerly. // Now log the vendor string as this will also cause a connection to be tested eagerly.
logVendorString(database, log) logVendorString(database, log)
runOnStop += database::close runOnStop += database::close
return database.transaction { return database
insideTransaction(database)
}
} else {
throw DatabaseConfigurationException("There must be a database configured.")
}
} }
private fun makeNotaryService(tokenizableServices: MutableList<Any>, database: CordaPersistence): NotaryService? { private fun makeNotaryService(tokenizableServices: MutableList<Any>, database: CordaPersistence): NotaryService? {

View File

@ -2,7 +2,6 @@ package net.corda.node.internal
import com.codahale.metrics.JmxReporter import com.codahale.metrics.JmxReporter
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.div import net.corda.core.internal.div
@ -324,7 +323,7 @@ open class Node(configuration: NodeConfiguration,
* This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details * This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details
* on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url * on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url
*/ */
override fun <T> initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService, insideTransaction: (CordaPersistence) -> T): T { override fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence {
val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url") val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url")
val h2Prefix = "jdbc:h2:file:" val h2Prefix = "jdbc:h2:file:"
if (databaseUrl != null && databaseUrl.startsWith(h2Prefix)) { if (databaseUrl != null && databaseUrl.startsWith(h2Prefix)) {
@ -344,7 +343,7 @@ open class Node(configuration: NodeConfiguration,
else if (databaseUrl != null) { else if (databaseUrl != null) {
printBasicNodeInfo("Database connection url is", databaseUrl) printBasicNodeInfo("Database connection url is", databaseUrl)
} }
return super.initialiseDatabasePersistence(schemaService, identityService, insideTransaction) return super.initialiseDatabasePersistence(schemaService, identityService)
} }
private val _startupComplete = openFuture<Unit>() private val _startupComplete = openFuture<Unit>()

View File

@ -0,0 +1,81 @@
package net.corda.node.internal
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.readObject
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.VersionInfo
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.nio.file.Files
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
import kotlin.test.assertNull
class NodeTest {
private abstract class AbstractNodeConfiguration : NodeConfiguration
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
private fun nodeInfoFile() = temporaryFolder.root.listFiles().singleOrNull { it.name.startsWith(NODE_INFO_FILE_NAME_PREFIX) }
private fun AbstractNode.generateNodeInfo(): NodeInfo {
assertNull(nodeInfoFile())
generateAndSaveNodeInfo()
val path = nodeInfoFile()!!.toPath()
val nodeInfo = path.readObject<SignedNodeInfo>().raw.deserialize()
Files.delete(path)
return nodeInfo
}
@Test
fun `generateAndSaveNodeInfo works`() {
val nodeAddress = NetworkHostAndPort("0.1.2.3", 456)
val nodeName = CordaX500Name("Manx Blockchain Corp", "Douglas", "IM")
val platformVersion = 789
val dataSourceProperties = makeTestDataSourceProperties()
val databaseConfig = DatabaseConfig()
val configuration = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(nodeAddress).whenever(it).p2pAddress
doReturn(nodeName).whenever(it).myLegalName
doReturn(null).whenever(it).notary // Don't add notary identity.
doReturn(dataSourceProperties).whenever(it).dataSourceProperties
doReturn(databaseConfig).whenever(it).database
doReturn(temporaryFolder.root.toPath()).whenever(it).baseDirectory
doReturn(true).whenever(it).devMode // Needed for identity cert.
doReturn("tsp").whenever(it).trustStorePassword
doReturn("ksp").whenever(it).keyStorePassword
}
configureDatabase(dataSourceProperties, databaseConfig, rigorousMock()).use { database ->
val node = Node(configuration, rigorousMock<VersionInfo>().also {
doReturn(platformVersion).whenever(it).platformVersion
}, initialiseSerialization = false)
val nodeInfo = node.generateNodeInfo()
assertEquals(listOf(nodeAddress), nodeInfo.addresses)
assertEquals(listOf(nodeName), nodeInfo.legalIdentitiesAndCerts.map { it.name })
assertEquals(platformVersion, nodeInfo.platformVersion)
node.generateNodeInfo().let {
assertNotEquals(nodeInfo, it) // Different serial.
assertEquals(nodeInfo, it.copy(serial = nodeInfo.serial))
}
PersistentNetworkMapCache(database, emptyList()).addNode(nodeInfo)
assertEquals(nodeInfo, node.generateNodeInfo())
}
}
}

View File

@ -103,7 +103,7 @@ party to Bank A and Bank A will appear as a counter party
In what follows, we assume we are Bank A (which is listening on port 10005) In what follows, we assume we are Bank A (which is listening on port 10005)
Notice the id field in the output of the ``whoami`` command. We are going to use the id assocatied Notice the id field in the output of the ``whoami`` command. We are going to use the id associated
with Bank C, one of our counter parties, to create a trade. The general command for this is: with Bank C, one of our counter parties, to create a trade. The general command for this is:
curl -i -H "Content-Type: application/json" -X PUT -d <<<JSON representation of the trade>>> http://localhost:10005/api/simmvaluationdemo/<<<counter party id>>>/trades curl -i -H "Content-Type: application/json" -X PUT -d <<<JSON representation of the trade>>> http://localhost:10005/api/simmvaluationdemo/<<<counter party id>>>/trades
@ -123,14 +123,23 @@ where the representation of the trade is
"fixedRate" : "0.1" "fixedRate" : "0.1"
} }
Continuing our example, the specific command we would run is Continuing our example, the specific command would look as follows
curl -i -H "Content-Type: application/json" \ curl -i -H "Content-Type: application/json" \
-X PUT \ -X PUT \
-d '{"id":"trade1","description" : "desc","tradeDate" : [ 2016, 6, 6 ], "convention" : "EUR_FIXED_1Y_EURIBOR_3M", "startDate" : [ 2016, 6, 6 ], "endDate" : [ 2020, 1, 2 ], "buySell" : "BUY", "notional" : "1000", "fixedRate" : "0.1"}' \ -d '{"id":"trade1","description" : "desc","tradeDate" : [ 2016, 6, 6 ], "convention" : "EUR_FIXED_1Y_EURIBOR_3M", "startDate" : [ 2016, 6, 6 ], "endDate" : [ 2020, 1, 2 ], "buySell" : "BUY", "notional" : "1000", "fixedRate" : "0.1"}' \
http://localhost:10005/api/simmvaluationdemo/8Kqd4oWdx4KQGHGL1DzULumUmZyyokeSGJDY1n5M6neUfAj2sjbf65wYwQM/trades http://localhost:10005/api/simmvaluationdemo/8Kqd4oWdx4KQGHGL1DzULumUmZyyokeSGJDY1n5M6neUfAj2sjbf65wYwQM/trades
With an expected response of Note: you should replace the node id 8Kqd4oWdx4KQGHGL1DzULumUmZyyokeSGJDY1n5M6neUfAj2sjbf65wYwQM with the node id returned by the
whoami call above for one of the counterparties. In our worked example we selected "Bank C" and used the generated id for that node.
Thus, the actual command would be:
curl -i -H "Content-Type: application/json" \
-X PUT \
-d '{"id":"trade1","description" : "desc","tradeDate" : [ 2016, 6, 6 ], "convention" : "EUR_FIXED_1Y_EURIBOR_3M", "startDate" : [ 2016, 6, 6 ], "endDate" : [ 2020, 1, 2 ], "buySell" : "BUY", "notional" : "1000", "fixedRate" : "0.1"}' \
http://localhost:10005/api/simmvaluationdemo/<<<INSERT BANK C ID HERE>>/trades
Once executed, the expected response is:
HTTP/1.1 202 Accepted HTTP/1.1 202 Accepted
Date: Thu, 28 Sep 2017 17:19:39 GMT Date: Thu, 28 Sep 2017 17:19:39 GMT
@ -141,7 +150,7 @@ With an expected response of
**Verifying trade completion** **Verifying trade completion**
With the trade completed and stored by both parties, the complete list of trades with our couterparty can be seen with the following command With the trade completed and stored by both parties, the complete list of trades with our counterparty can be seen with the following command
curl -X GET http://localhost:10005/api/simmvaluationdemo/<<<counter party id>>>/trades curl -X GET http://localhost:10005/api/simmvaluationdemo/<<<counter party id>>>/trades
@ -188,7 +197,7 @@ The specific command for out Bank A example is
This demo does not, however, include real SIMM valuation code but a stub for the OpenGamma set of libraries, so please do not base any financial decisions on results generated by this demo. This demo does not, however, include real SIMM valuation code but a stub for the OpenGamma set of libraries, so please do not base any financial decisions on results generated by this demo.
This demo was built in partnership with OpenGamma and used their SIMM library. However, due to licensing constraints we cannot distribute their library with this code. For this reason, we have stubbed out the relevant parts and replaced it with a very simplistic template that returns fake (but correctly structured) data. However, if you wish to use a realistic library, then please do get in touch with OpenGamma directly for access to their libraries and we will be happy to demonstrate how to replace the stub code. This demo was built in partnership with OpenGamma and used their SIMM library. However, due to licensing constraints we cannot distribute their library with this code. For this reason, we have stubbed out the relevant parts and replaced it with a very simplistic template that returns fake (but correctly structured) data. However, if you wish to use a realistic library, then please do get in touch with OpenGamma directly for access to their libraries and we will be happy to demonstrate how to replace the stub code.
## Troubleshooting ## Troubleshooting

View File

@ -297,11 +297,8 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
override val serializationWhitelists: List<SerializationWhitelist> override val serializationWhitelists: List<SerializationWhitelist>
get() = _serializationWhitelists get() = _serializationWhitelists
private var dbCloser: (() -> Any?)? = null private var dbCloser: (() -> Any?)? = null
override fun <T> initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService, insideTransaction: (CordaPersistence) -> T): T { override fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence {
return super.initialiseDatabasePersistence(schemaService, identityService) { database -> return super.initialiseDatabasePersistence(schemaService, identityService).also { dbCloser = it::close }
dbCloser = database::close
insideTransaction(database)
}
} }
fun disableDBCloseOnStop() { fun disableDBCloseOnStop() {