diff --git a/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorModelTest.kt b/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorModelTest.kt index 21366eea14..fc115bfbe9 100644 --- a/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorModelTest.kt +++ b/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorModelTest.kt @@ -14,7 +14,8 @@ import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.core.serialization.OpaqueBytes import com.r3corda.core.transactions.SignedTransaction import com.r3corda.node.driver.driver -import com.r3corda.node.driver.startClient +import com.r3corda.node.services.config.NodeSSLConfiguration +import com.r3corda.node.services.config.configureWithDevSSLCertificate import com.r3corda.node.services.messaging.NodeMessagingClient import com.r3corda.node.services.messaging.StateMachineUpdate import com.r3corda.node.services.transactions.SimpleNotaryService @@ -26,6 +27,8 @@ import org.junit.Before import org.junit.Test import rx.Observable import rx.Observer +import java.nio.file.Files +import java.nio.file.Path import kotlin.concurrent.thread class NodeMonitorModelTest { @@ -55,10 +58,19 @@ class NodeMonitorModelTest { aliceNode = aliceNodeFuture.get() notaryNode = notaryNodeFuture.get() - aliceClient = startClient(aliceNode).get() newNode = { nodeName -> startNode(nodeName).get() } val monitor = NodeMonitorModel() + val sslConfig = object : NodeSSLConfiguration { + override val certificatesPath: Path = Files.createTempDirectory("certs") + override val keyStorePassword = "cordacadevpass" + override val trustStorePassword = "trustpass" + + init { + configureWithDevSSLCertificate() + } + } + stateMachineTransactionMapping = monitor.stateMachineTransactionMapping.bufferUntilSubscribed() stateMachineUpdates = monitor.stateMachineUpdates.bufferUntilSubscribed() progressTracking = monitor.progressTracking.bufferUntilSubscribed() @@ -67,7 +79,7 @@ class NodeMonitorModelTest { networkMapUpdates = monitor.networkMap.bufferUntilSubscribed() clientToService = monitor.clientToService - monitor.register(aliceNode, aliceClient.config.certificatesPath) + monitor.register(aliceNode, sslConfig.certificatesPath) driverStarted.set(Unit) stopDriver.get() diff --git a/explorer/src/main/kotlin/com/r3corda/explorer/Main.kt b/explorer/src/main/kotlin/com/r3corda/explorer/Main.kt index d4702fcd38..67bb8cf6de 100644 --- a/explorer/src/main/kotlin/com/r3corda/explorer/Main.kt +++ b/explorer/src/main/kotlin/com/r3corda/explorer/Main.kt @@ -6,10 +6,13 @@ import com.r3corda.core.node.services.ServiceInfo import com.r3corda.explorer.model.IdentityModel import com.r3corda.node.driver.PortAllocation import com.r3corda.node.driver.driver -import com.r3corda.node.driver.startClient +import com.r3corda.node.services.config.NodeSSLConfiguration +import com.r3corda.node.services.config.configureWithDevSSLCertificate import com.r3corda.node.services.transactions.SimpleNotaryService import javafx.stage.Stage import tornadofx.App +import java.nio.file.Files +import java.nio.file.Path class Main : App() { override val primaryView = MainWindow::class @@ -35,11 +38,19 @@ class Main : App() { val aliceNode = aliceNodeFuture.get() val notaryNode = notaryNodeFuture.get() - val aliceClient = startClient(aliceNode).get() + val sslConfig = object : NodeSSLConfiguration { + override val certificatesPath: Path = Files.createTempDirectory("certs") + override val keyStorePassword = "cordacadevpass" + override val trustStorePassword = "trustpass" + + init { + configureWithDevSSLCertificate() + } + } Models.get(Main::class).notary.set(notaryNode.notaryIdentity) Models.get(Main::class).myIdentity.set(aliceNode.legalIdentity) - Models.get(Main::class).register(aliceNode, aliceClient.config.certificatesPath) + Models.get(Main::class).register(aliceNode, sslConfig.certificatesPath) startNode("Bob").get() diff --git a/node/src/integration-test/kotlin/com/r3corda/node/driver/DriverTests.kt b/node/src/integration-test/kotlin/com/r3corda/node/driver/DriverTests.kt index 28799d4b7e..5bc4efe046 100644 --- a/node/src/integration-test/kotlin/com/r3corda/node/driver/DriverTests.kt +++ b/node/src/integration-test/kotlin/com/r3corda/node/driver/DriverTests.kt @@ -11,14 +11,8 @@ import org.junit.Test class DriverTests { companion object { - fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) { + fun nodeMustBeUp(nodeInfo: NodeInfo, nodeName: String) { val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address) - // Check that the node is registered in the network map - poll("network map cache for $nodeName") { - networkMapCache.get().firstOrNull { - it.legalIdentity.name == nodeName - } - } // Check that the port is bound addressMustBeBound(hostAndPort) } @@ -36,8 +30,8 @@ class DriverTests { val notary = startNode("TestNotary", setOf(ServiceInfo(SimpleNotaryService.type))) val regulator = startNode("Regulator", setOf(ServiceInfo(RegulatorService.type))) - nodeMustBeUp(networkMapCache, notary.get(), "TestNotary") - nodeMustBeUp(networkMapCache, regulator.get(), "Regulator") + nodeMustBeUp(notary.get(), "TestNotary") + nodeMustBeUp(regulator.get(), "Regulator") Pair(notary.get(), regulator.get()) } nodeMustBeDown(notary) @@ -48,7 +42,7 @@ class DriverTests { fun startingNodeWithNoServicesWorks() { val noService = driver { val noService = startNode("NoService") - nodeMustBeUp(networkMapCache, noService.get(), "NoService") + nodeMustBeUp(noService.get(), "NoService") noService.get() } nodeMustBeDown(noService) @@ -58,7 +52,7 @@ class DriverTests { fun randomFreePortAllocationWorks() { val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) { val nodeInfo = startNode("NoService") - nodeMustBeUp(networkMapCache, nodeInfo.get(), "NoService") + nodeMustBeUp(nodeInfo.get(), "NoService") nodeInfo.get() } nodeMustBeDown(nodeInfo) diff --git a/node/src/main/kotlin/com/r3corda/node/api/APIServer.kt b/node/src/main/kotlin/com/r3corda/node/api/APIServer.kt index ef87de66cf..1245d7c846 100644 --- a/node/src/main/kotlin/com/r3corda/node/api/APIServer.kt +++ b/node/src/main/kotlin/com/r3corda/node/api/APIServer.kt @@ -4,6 +4,7 @@ import com.r3corda.core.contracts.* import com.r3corda.node.api.StatesQuery import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.node.NodeInfo import com.r3corda.core.serialization.SerializedBytes import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.WireTransaction @@ -41,6 +42,16 @@ interface APIServer { @Produces(MediaType.TEXT_PLAIN) fun status(): Response + /** + * Report this nodes configuration and identities. + * Currently tunnels the NodeInfo as an encoding of the Kryo serialised form. + * TODO this functionality should be available via the RPC + */ + @GET + @Path("info") + @Produces(MediaType.APPLICATION_JSON) + fun info(): NodeInfo + /** * Query your "local" states (containing only outputs involving you) and return the hashes & indexes associated with them * to probably be later inflated by fetchLedgerTransactions() or fetchStates() although because immutable you can cache them diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index ac228d1c61..f19c168a5a 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -1,32 +1,30 @@ package com.r3corda.node.driver +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.module.SimpleModule import com.google.common.net.HostAndPort import com.r3corda.core.ThreadBox import com.r3corda.core.crypto.Party -import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.node.NodeInfo -import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.ServiceInfo import com.r3corda.node.services.config.ConfigHelper import com.r3corda.node.services.config.FullNodeConfiguration -import com.r3corda.node.services.messaging.ArtemisMessagingComponent import com.r3corda.node.services.messaging.ArtemisMessagingServer import com.r3corda.node.services.messaging.NodeMessagingClient -import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService -import com.r3corda.node.utilities.AffinityExecutor +import com.r3corda.node.utilities.JsonSupport import com.typesafe.config.Config import com.typesafe.config.ConfigRenderOptions import org.slf4j.Logger import org.slf4j.LoggerFactory import java.io.File +import java.io.InputStreamReader import java.net.* import java.nio.file.Path import java.nio.file.Paths import java.text.SimpleDateFormat import java.util.* import java.util.concurrent.* -import kotlin.concurrent.thread /** * This file defines a small "Driver" DSL for starting up nodes. @@ -56,31 +54,9 @@ interface DriverDSLExposedInterface { */ fun startNode(providedName: String? = null, advertisedServices: Set = setOf()): Future - /** - * Starts an [NodeMessagingClient]. - * - * @param providedName name of the client, which will be used for creating its directory. - * @param serverAddress the artemis server to connect to, for example a [Node]. - */ - fun startClient(providedName: String, serverAddress: HostAndPort): Future - - /** - * Starts a local [ArtemisMessagingServer] of which there may only be one. - */ - fun startLocalServer(): Future fun waitForAllNodesToFinish() - val networkMapCache: NetworkMapCache } -fun DriverDSLExposedInterface.startClient(localServer: ArtemisMessagingServer) = - startClient("driver-local-server-client", localServer.myHostPort) - -fun DriverDSLExposedInterface.startClient(remoteNodeInfo: NodeInfo, providedName: String? = null) = - startClient( - providedName = providedName ?: "${remoteNodeInfo.legalIdentity.name}-client", - serverAddress = ArtemisMessagingComponent.toHostAndPort(remoteNodeInfo.address) - ) - interface DriverDSLInternalInterface : DriverDSLExposedInterface { fun start() fun shutdown() @@ -222,11 +198,8 @@ class DriverDSL( val baseDirectory: String, val isDebug: Boolean ) : DriverDSLInternalInterface { - override val networkMapCache = InMemoryNetworkMapCache() private val networkMapName = "NetworkMapService" private val networkMapAddress = portAllocation.nextHostAndPort() - private var networkMapNodeInfo: NodeInfo? = null - private val identity = generateKeyPair() class State { val registeredProcesses = LinkedList() @@ -284,6 +257,25 @@ class DriverDSL( addressMustNotBeBound(networkMapAddress) } + private fun queryNodeInfo(webAddress: HostAndPort): NodeInfo? { + val url = URL("http://${webAddress.toString()}/api/info") + try { + val conn = url.openConnection() as HttpURLConnection + conn.requestMethod = "GET" + if (conn.responseCode != 200) { + return null + } + // For now the NodeInfo is tunneled in its Kryo format over the Node's Web interface. + val om = ObjectMapper() + val module = SimpleModule("NodeInfo") + module.addDeserializer(NodeInfo::class.java, JsonSupport.NodeInfoDeserializer) + om.registerModule(module) + return om.readValue(conn.inputStream, NodeInfo::class.java) + } catch(e: Exception) { + return null + } + } + override fun startNode(providedName: String?, advertisedServices: Set): Future { val messagingAddress = portAllocation.nextHostAndPort() val apiAddress = portAllocation.nextHostAndPort() @@ -307,88 +299,12 @@ class DriverDSL( return Executors.newSingleThreadExecutor().submit(Callable { registerProcess(DriverDSL.startNode(config, quasarJarPath, debugPort)) - poll("network map cache for $name") { - networkMapCache.partyNodes.forEach { - if (it.legalIdentity.name == name) { - return@poll it - } - } - null - } + queryNodeInfo(apiAddress)!! }) } - override fun startClient( - providedName: String, - serverAddress: HostAndPort - ): Future { - - val nodeConfiguration = FullNodeConfiguration( - ConfigHelper.loadConfig( - baseDirectoryPath = Paths.get(baseDirectory, providedName), - allowMissingConfig = true, - configOverrides = mapOf( - "myLegalName" to providedName - ) - ) - ) - val client = NodeMessagingClient(nodeConfiguration, - serverHostPort = serverAddress, - myIdentity = identity.public, - executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1), - persistentInbox = false // Do not create a permanent queue for our transient UI identity - ) - - return Executors.newSingleThreadExecutor().submit(Callable { - client.configureWithDevSSLCertificate() - client.start(null) - thread { client.run() } - state.locked { - clients.add(client) - } - client - }) - } - - override fun startLocalServer(): Future { - val name = "driver-local-server" - val config = FullNodeConfiguration( - ConfigHelper.loadConfig( - baseDirectoryPath = Paths.get(baseDirectory, name), - allowMissingConfig = true, - configOverrides = mapOf( - "myLegalName" to name - ) - ) - ) - val server = ArtemisMessagingServer(config, - portAllocation.nextHostAndPort(), - networkMapCache - ) - return Executors.newSingleThreadExecutor().submit(Callable { - server.configureWithDevSSLCertificate() - server.start() - state.locked { - localServer = server - } - server - }) - } - - override fun start() { startNetworkMapService() - val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress).get() - val networkMapAddr = NodeMessagingClient.makeNetworkMapAddress(networkMapAddress) - networkMapCache.addMapService(networkMapClient, networkMapAddr, true) - networkMapNodeInfo = poll("network map cache for $networkMapName") { - networkMapCache.partyNodes.forEach { - if (it.legalIdentity.name == networkMapName) { - return@poll it - } - } - null - } } private fun startNetworkMapService() { diff --git a/node/src/main/kotlin/com/r3corda/node/internal/APIServerImpl.kt b/node/src/main/kotlin/com/r3corda/node/internal/APIServerImpl.kt index 1b90f63ff7..1d78ab6263 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/APIServerImpl.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/APIServerImpl.kt @@ -24,6 +24,8 @@ class APIServerImpl(val node: AbstractNode) : APIServer { } } + override fun info() = node.services.myInfo + override fun queryStates(query: StatesQuery): List { // We're going to hard code two options here for now and assume that all LinearStates are deals // Would like to maybe move to a model where we take something like a JEXL string, although don't want to develop diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 28bbd44fff..d2cc9c71cf 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -435,7 +435,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo protected abstract fun makeMessagingService(): MessagingServiceInternal - protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps?) + protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps) protected open fun initialiseCheckpointService(dir: Path): CheckpointStorage { return DBCheckpointStorage() diff --git a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt index b86dd5b92d..7dcd08d96d 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt @@ -4,6 +4,7 @@ import com.codahale.metrics.JmxReporter import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.ServiceInfo +import com.r3corda.core.then import com.r3corda.core.utilities.loggerFor import com.r3corda.node.serialization.NodeClock import com.r3corda.node.services.api.MessagingServiceInternal @@ -119,11 +120,10 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: } val legalIdentity = obtainLegalIdentity() val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null - return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, - persistenceTx = { body: () -> Unit -> databaseTransaction(database) { body() } }) + return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database) } - override fun startMessagingService(cordaRPCOps: CordaRPCOps?) { + override fun startMessagingService(cordaRPCOps: CordaRPCOps) { // Start up the embedded MQ server messageBroker?.apply { runOnStop += Runnable { messageBroker?.stop() } @@ -268,23 +268,25 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: override fun start(): Node { alreadyRunningNodeCheck() super.start() - webServer = initWebServer() - // Begin exporting our own metrics via JMX. - JmxReporter. - forRegistry(services.monitoringService.metrics). - inDomain("com.r3cev.corda"). - createsObjectNamesWith { type, domain, name -> - // Make the JMX hierarchy a bit better organised. - val category = name.substringBefore('.') - val subName = name.substringAfter('.', "") - if (subName == "") - ObjectName("$domain:name=$category") - else - ObjectName("$domain:type=$category,name=$subName") - }. - build(). - start() - + // Only start the service API requests once the network map registration is complete + networkMapRegistrationFuture.then { + webServer = initWebServer() + // Begin exporting our own metrics via JMX. + JmxReporter. + forRegistry(services.monitoringService.metrics). + inDomain("com.r3cev.corda"). + createsObjectNamesWith { type, domain, name -> + // Make the JMX hierarchy a bit better organised. + val category = name.substringBefore('.') + val subName = name.substringAfter('.', "") + if (subName == "") + ObjectName("$domain:name=$category") + else + ObjectName("$domain:type=$category,name=$subName") + }. + build(). + start() + } shutdownThread = thread(start = false) { stop() } diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt index 3fce883218..d69663bc53 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt @@ -13,6 +13,7 @@ import com.r3corda.node.utilities.* import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.* +import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.statements.InsertStatement import java.nio.file.FileSystems @@ -53,8 +54,7 @@ class NodeMessagingClient(config: NodeConfiguration, val serverHostPort: HostAndPort, val myIdentity: PublicKey?, val executor: AffinityExecutor, - val persistentInbox: Boolean = true, - val persistenceTx: (() -> Unit) -> Unit = { it() }) : ArtemisMessagingComponent(config), MessagingServiceInternal { + val database: Database) : ArtemisMessagingComponent(config), MessagingServiceInternal { companion object { val log = loggerFor() @@ -106,23 +106,20 @@ class NodeMessagingClient(config: NodeConfiguration, val uuid = uuidString("message_id") } - private val processedMessages: MutableSet = Collections.synchronizedSet(if (persistentInbox) { + private val processedMessages: MutableSet = Collections.synchronizedSet( object : AbstractJDBCHashSet(Table, loadOnInit = true) { override fun elementFromRow(row: ResultRow): UUID = row[table.uuid] override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) { insert[table.uuid] = entry } - } - } else { - HashSet() - }) + }) init { require(config.basedir.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" } } - fun start(rpcOps: CordaRPCOps? = null) { + fun start(rpcOps: CordaRPCOps) { state.locked { check(!started) { "start can't be called twice" } started = true @@ -146,7 +143,7 @@ class NodeMessagingClient(config: NodeConfiguration, val queueName = toQueueName(myAddress) val query = session.queueQuery(queueName) if (!query.isExists) { - session.createQueue(queueName, queueName, persistentInbox) + session.createQueue(queueName, queueName, true) } knownQueues.add(queueName) p2pConsumer = session.createConsumer(queueName) @@ -154,13 +151,11 @@ class NodeMessagingClient(config: NodeConfiguration, // Create an RPC queue and consumer: this will service locally connected clients only (not via a // bridge) and those clients must have authenticated. We could use a single consumer for everything // and perhaps we should, but these queues are not worth persisting. - if (rpcOps != null) { - session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE) - session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1") - rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE) - rpcNotificationConsumer = session.createConsumer("rpc.qremovals") - dispatcher = createRPCDispatcher(state, rpcOps) - } + session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE) + session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1") + rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE) + rpcNotificationConsumer = session.createConsumer("rpc.qremovals") + dispatcher = createRPCDispatcher(state, rpcOps) } } @@ -277,7 +272,7 @@ class NodeMessagingClient(config: NodeConfiguration, // Note that handlers may re-enter this class. We aren't holding any locks and methods like // start/run/stop have re-entrancy assertions at the top, so it is OK. executor.fetchFrom { - persistenceTx { + databaseTransaction(database) { callHandlers(msg, deliverTo) } } diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/JsonSupport.kt b/node/src/main/kotlin/com/r3corda/node/utilities/JsonSupport.kt index 07c492b0f2..5ec7b65902 100644 --- a/node/src/main/kotlin/com/r3corda/node/utilities/JsonSupport.kt +++ b/node/src/main/kotlin/com/r3corda/node/utilities/JsonSupport.kt @@ -11,7 +11,10 @@ import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.module.kotlin.KotlinModule import com.r3corda.core.contracts.BusinessCalendar import com.r3corda.core.crypto.* +import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.IdentityService +import com.r3corda.core.serialization.deserialize +import com.r3corda.core.serialization.serialize import net.i2p.crypto.eddsa.EdDSAPublicKey import java.math.BigDecimal import java.time.LocalDate @@ -54,6 +57,11 @@ object JsonSupport { cordaModule.addSerializer(PublicKeyTree::class.java, PublicKeyTreeSerializer) cordaModule.addDeserializer(PublicKeyTree::class.java, PublicKeyTreeDeserializer) + // For NodeInfo + // TODO this tunnels the Kryo representation as a Base58 encoded string. Replace when RPC supports this. + cordaModule.addSerializer(NodeInfo::class.java, NodeInfoSerializer) + cordaModule.addDeserializer(NodeInfo::class.java, NodeInfoDeserializer) + mapper.registerModule(timeModule) mapper.registerModule(cordaModule) mapper.registerModule(KotlinModule()) @@ -102,6 +110,25 @@ object JsonSupport { } } + object NodeInfoSerializer : JsonSerializer() { + override fun serialize(value: NodeInfo, gen: JsonGenerator, serializers: SerializerProvider) { + gen.writeString(Base58.encode(value.serialize().bits)) + } + } + + object NodeInfoDeserializer : JsonDeserializer() { + override fun deserialize(parser: JsonParser, context: DeserializationContext): NodeInfo { + if (parser.currentToken == JsonToken.FIELD_NAME) { + parser.nextToken() + } + try { + return Base58.decode(parser.text).deserialize() + } catch (e: Exception) { + throw JsonParseException(parser, "Invalid NodeInfo ${parser.text}: ${e.message}") + } + } + } + object SecureHashSerializer : JsonSerializer() { override fun serialize(obj: SecureHash, generator: JsonGenerator, provider: SerializerProvider) { generator.writeString(obj.toString()) diff --git a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt index 4814c44fcf..9d23a2b21e 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt @@ -1,22 +1,37 @@ package com.r3corda.node.services import com.google.common.net.HostAndPort +import com.r3corda.core.contracts.ClientToServiceCommand +import com.r3corda.core.contracts.ContractState +import com.r3corda.core.contracts.StateAndRef import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.messaging.Message import com.r3corda.core.messaging.createMessage +import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.DEFAULT_SESSION_ID +import com.r3corda.core.node.services.NetworkMapCache +import com.r3corda.core.node.services.StateMachineTransactionMapping +import com.r3corda.core.node.services.Vault +import com.r3corda.core.transactions.SignedTransaction +import com.r3corda.core.utilities.LogHelper import com.r3corda.node.services.config.NodeConfiguration -import com.r3corda.node.services.messaging.ArtemisMessagingServer -import com.r3corda.node.services.messaging.NodeMessagingClient +import com.r3corda.node.services.messaging.* import com.r3corda.node.services.network.InMemoryNetworkMapCache +import com.r3corda.node.services.transactions.PersistentUniquenessProvider import com.r3corda.node.utilities.AffinityExecutor +import com.r3corda.node.utilities.configureDatabase +import com.r3corda.node.utilities.databaseTransaction import com.r3corda.testing.freeLocalHostAndPort +import com.r3corda.testing.node.makeTestDataSourceProperties import org.assertj.core.api.Assertions.assertThatThrownBy +import org.jetbrains.exposed.sql.Database import org.junit.After import org.junit.Before import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder +import rx.Observable +import java.io.Closeable import java.net.ServerSocket import java.nio.file.Path import java.util.concurrent.LinkedBlockingQueue @@ -33,12 +48,44 @@ class ArtemisMessagingTests { val identity = generateKeyPair() lateinit var config: NodeConfiguration + lateinit var dataSource: Closeable + lateinit var database: Database var messagingClient: NodeMessagingClient? = null var messagingServer: ArtemisMessagingServer? = null val networkMapCache = InMemoryNetworkMapCache() + val rpcOps = object : CordaRPCOps { + override val protocolVersion: Int + get() = throw UnsupportedOperationException() + + override fun stateMachinesAndUpdates(): Pair, Observable> { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun vaultAndUpdates(): Pair>, Observable> { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun verifiedTransactions(): Pair, Observable> { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun stateMachineRecordedTransactionMapping(): Pair, Observable> { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun networkMapUpdates(): Pair, Observable> { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + } + @Before fun setUp() { // TODO: create a base class that provides a default implementation @@ -52,12 +99,18 @@ class ArtemisMessagingTests { override val keyStorePassword: String = "testpass" override val trustStorePassword: String = "trustpass" } + LogHelper.setLevel(PersistentUniquenessProvider::class) + val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties()) + dataSource = dataSourceAndDatabase.first + database = dataSourceAndDatabase.second } @After fun cleanUp() { messagingClient?.stop() messagingServer?.stop() + dataSource.close() + LogHelper.reset(PersistentUniquenessProvider::class) } @Test @@ -73,7 +126,7 @@ class ArtemisMessagingTests { val remoteServerAddress = freeLocalHostAndPort() createMessagingServer(remoteServerAddress).start() - createMessagingClient(server = remoteServerAddress).start() + createMessagingClient(server = remoteServerAddress).start(rpcOps) } @Test @@ -84,14 +137,14 @@ class ArtemisMessagingTests { createMessagingServer(serverAddress).start() messagingClient = createMessagingClient(server = invalidServerAddress) - assertThatThrownBy { messagingClient!!.start() } + assertThatThrownBy { messagingClient!!.start(rpcOps) } messagingClient = null } @Test fun `client should connect to local server`() { createMessagingServer().start() - createMessagingClient().start() + createMessagingClient().start(rpcOps) } @Test @@ -101,7 +154,7 @@ class ArtemisMessagingTests { createMessagingServer().start() val messagingClient = createMessagingClient() - messagingClient.start() + messagingClient.start(rpcOps) thread { messagingClient.run() } messagingClient.addMessageHandler(topic) { message, r -> @@ -117,9 +170,11 @@ class ArtemisMessagingTests { } private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient { - return NodeMessagingClient(config, server, identity.public, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), false).apply { - configureWithDevSSLCertificate() - messagingClient = this + return databaseTransaction(database) { + NodeMessagingClient(config, server, identity.public, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database).apply { + configureWithDevSSLCertificate() + messagingClient = this + } } } diff --git a/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt index e30423c92d..48c7b24d7b 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt @@ -52,8 +52,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") val factory = ProtocolLogicRefFactory(mapOf(Pair(TestProtocolLogic::class.java.name, setOf(NodeSchedulerServiceTest::class.java.name, Integer::class.java.name)))) - val services: MockServiceHubInternal - + lateinit var services: MockServiceHubInternal lateinit var scheduler: NodeSchedulerService lateinit var smmExecutor: AffinityExecutor.ServiceAffinityExecutor lateinit var dataSource: Closeable @@ -72,13 +71,6 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { val testReference: NodeSchedulerServiceTest } - init { - val kms = MockKeyManagementService(ALICE_KEY) - val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), persistenceTx = { it() }) - services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference { - override val testReference = this@NodeSchedulerServiceTest - } - } @Before fun setup() { @@ -89,6 +81,11 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { dataSource = dataSourceAndDatabase.first database = dataSourceAndDatabase.second databaseTransaction(database) { + val kms = MockKeyManagementService(ALICE_KEY) + val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), database) + services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference { + override val testReference = this@NodeSchedulerServiceTest + } scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor) smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1) val mockSMM = StateMachineManager(services, listOf(services, scheduler), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor, database) diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt index 92f946c307..66c5a3a248 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt @@ -9,7 +9,9 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.trace import com.r3corda.node.services.api.MessagingServiceBuilder import com.r3corda.node.utilities.AffinityExecutor +import com.r3corda.node.utilities.databaseTransaction import com.r3corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging +import org.jetbrains.exposed.sql.Database import org.slf4j.LoggerFactory import rx.Observable import rx.subjects.PublishSubject @@ -80,10 +82,10 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria @Synchronized fun createNode(manuallyPumped: Boolean, executor: AffinityExecutor, - persistenceTx: (() -> Unit) -> Unit) + database: Database) : Pair> { check(counter >= 0) { "In memory network stopped: please recreate." } - val builder = createNodeWithID(manuallyPumped, counter, executor, persistenceTx = persistenceTx) as Builder + val builder = createNodeWithID(manuallyPumped, counter, executor, database = database) as Builder counter++ val id = builder.id return Pair(id, builder) @@ -98,9 +100,9 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria * @param persistenceTx a lambda to wrap message handling in a transaction if necessary. */ fun createNodeWithID(manuallyPumped: Boolean, id: Int, executor: AffinityExecutor, description: String? = null, - persistenceTx: (() -> Unit) -> Unit) + database: Database) : MessagingServiceBuilder { - return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"), executor, persistenceTx) + return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"), executor, database = database) } interface LatencyCalculator { @@ -140,11 +142,11 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria messageReceiveQueues.clear() } - inner class Builder(val manuallyPumped: Boolean, val id: Handle, val executor: AffinityExecutor, val persistenceTx: (() -> Unit) -> Unit) + inner class Builder(val manuallyPumped: Boolean, val id: Handle, val executor: AffinityExecutor, val database: Database) : com.r3corda.node.services.api.MessagingServiceBuilder { override fun start(): ListenableFuture { synchronized(this@InMemoryMessagingNetwork) { - val node = InMemoryMessaging(manuallyPumped, id, executor, persistenceTx) + val node = InMemoryMessaging(manuallyPumped, id, executor, database) handleEndpointMap[id] = node return Futures.immediateFuture(node) } @@ -208,7 +210,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle, private val executor: AffinityExecutor, - private val persistenceTx: (() -> Unit) -> Unit) + private val database: Database) : SingletonSerializeAsToken(), com.r3corda.node.services.api.MessagingServiceInternal { inner class Handler(val topicSession: TopicSession, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration @@ -348,7 +350,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria if (transfer.message.uniqueMessageId !in processedMessages) { executor.execute { - persistenceTx { + databaseTransaction(database) { for (handler in deliverTo) { try { handler.callback(transfer.message, handler) diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt index 537a9c192f..18b08d6c1e 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt @@ -125,8 +125,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, // through the java.nio API which we are already mocking via Jimfs. override fun makeMessagingService(): MessagingServiceInternal { require(id >= 0) { "Node ID must be zero or positive, was passed: " + id } - return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, serverThread, configuration.myLegalName, - persistenceTx = { body: () -> Unit -> databaseTransaction(database) { body() } }).start().get() + return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, serverThread, configuration.myLegalName, database).start().get() } override fun initialiseCheckpointService(dir: Path): CheckpointStorage { @@ -143,7 +142,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override fun makeKeyManagementService(): KeyManagementService = E2ETestKeyManagementService(partyKeys) - override fun startMessagingService(cordaRPCOps: CordaRPCOps?) { + override fun startMessagingService(cordaRPCOps: CordaRPCOps) { // Nothing to do }