Unify messaging services to have a database and not support client type connections when they should use the RPC connections. Also, push NodeInfo across to the driver via the web interface to remove that use of startClient.

Fix typo
This commit is contained in:
Matthew Nesbit 2016-10-18 13:14:01 +01:00
parent aac64ecb37
commit 4db1836996
14 changed files with 215 additions and 192 deletions

View File

@ -14,7 +14,8 @@ import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.serialization.OpaqueBytes
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.node.driver.driver
import com.r3corda.node.driver.startClient
import com.r3corda.node.services.config.NodeSSLConfiguration
import com.r3corda.node.services.config.configureWithDevSSLCertificate
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.messaging.StateMachineUpdate
import com.r3corda.node.services.transactions.SimpleNotaryService
@ -26,6 +27,8 @@ import org.junit.Before
import org.junit.Test
import rx.Observable
import rx.Observer
import java.nio.file.Files
import java.nio.file.Path
import kotlin.concurrent.thread
class NodeMonitorModelTest {
@ -55,10 +58,19 @@ class NodeMonitorModelTest {
aliceNode = aliceNodeFuture.get()
notaryNode = notaryNodeFuture.get()
aliceClient = startClient(aliceNode).get()
newNode = { nodeName -> startNode(nodeName).get() }
val monitor = NodeMonitorModel()
val sslConfig = object : NodeSSLConfiguration {
override val certificatesPath: Path = Files.createTempDirectory("certs")
override val keyStorePassword = "cordacadevpass"
override val trustStorePassword = "trustpass"
init {
configureWithDevSSLCertificate()
}
}
stateMachineTransactionMapping = monitor.stateMachineTransactionMapping.bufferUntilSubscribed()
stateMachineUpdates = monitor.stateMachineUpdates.bufferUntilSubscribed()
progressTracking = monitor.progressTracking.bufferUntilSubscribed()
@ -67,7 +79,7 @@ class NodeMonitorModelTest {
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
clientToService = monitor.clientToService
monitor.register(aliceNode, aliceClient.config.certificatesPath)
monitor.register(aliceNode, sslConfig.certificatesPath)
driverStarted.set(Unit)
stopDriver.get()

View File

@ -6,10 +6,13 @@ import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.explorer.model.IdentityModel
import com.r3corda.node.driver.PortAllocation
import com.r3corda.node.driver.driver
import com.r3corda.node.driver.startClient
import com.r3corda.node.services.config.NodeSSLConfiguration
import com.r3corda.node.services.config.configureWithDevSSLCertificate
import com.r3corda.node.services.transactions.SimpleNotaryService
import javafx.stage.Stage
import tornadofx.App
import java.nio.file.Files
import java.nio.file.Path
class Main : App() {
override val primaryView = MainWindow::class
@ -35,11 +38,19 @@ class Main : App() {
val aliceNode = aliceNodeFuture.get()
val notaryNode = notaryNodeFuture.get()
val aliceClient = startClient(aliceNode).get()
val sslConfig = object : NodeSSLConfiguration {
override val certificatesPath: Path = Files.createTempDirectory("certs")
override val keyStorePassword = "cordacadevpass"
override val trustStorePassword = "trustpass"
init {
configureWithDevSSLCertificate()
}
}
Models.get<IdentityModel>(Main::class).notary.set(notaryNode.notaryIdentity)
Models.get<IdentityModel>(Main::class).myIdentity.set(aliceNode.legalIdentity)
Models.get<NodeMonitorModel>(Main::class).register(aliceNode, aliceClient.config.certificatesPath)
Models.get<NodeMonitorModel>(Main::class).register(aliceNode, sslConfig.certificatesPath)
startNode("Bob").get()

View File

@ -11,14 +11,8 @@ import org.junit.Test
class DriverTests {
companion object {
fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) {
fun nodeMustBeUp(nodeInfo: NodeInfo, nodeName: String) {
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
// Check that the node is registered in the network map
poll("network map cache for $nodeName") {
networkMapCache.get().firstOrNull {
it.legalIdentity.name == nodeName
}
}
// Check that the port is bound
addressMustBeBound(hostAndPort)
}
@ -36,8 +30,8 @@ class DriverTests {
val notary = startNode("TestNotary", setOf(ServiceInfo(SimpleNotaryService.type)))
val regulator = startNode("Regulator", setOf(ServiceInfo(RegulatorService.type)))
nodeMustBeUp(networkMapCache, notary.get(), "TestNotary")
nodeMustBeUp(networkMapCache, regulator.get(), "Regulator")
nodeMustBeUp(notary.get(), "TestNotary")
nodeMustBeUp(regulator.get(), "Regulator")
Pair(notary.get(), regulator.get())
}
nodeMustBeDown(notary)
@ -48,7 +42,7 @@ class DriverTests {
fun startingNodeWithNoServicesWorks() {
val noService = driver {
val noService = startNode("NoService")
nodeMustBeUp(networkMapCache, noService.get(), "NoService")
nodeMustBeUp(noService.get(), "NoService")
noService.get()
}
nodeMustBeDown(noService)
@ -58,7 +52,7 @@ class DriverTests {
fun randomFreePortAllocationWorks() {
val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) {
val nodeInfo = startNode("NoService")
nodeMustBeUp(networkMapCache, nodeInfo.get(), "NoService")
nodeMustBeUp(nodeInfo.get(), "NoService")
nodeInfo.get()
}
nodeMustBeDown(nodeInfo)

View File

@ -4,6 +4,7 @@ import com.r3corda.core.contracts.*
import com.r3corda.node.api.StatesQuery
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.transactions.WireTransaction
@ -41,6 +42,16 @@ interface APIServer {
@Produces(MediaType.TEXT_PLAIN)
fun status(): Response
/**
* Report this nodes configuration and identities.
* Currently tunnels the NodeInfo as an encoding of the Kryo serialised form.
* TODO this functionality should be available via the RPC
*/
@GET
@Path("info")
@Produces(MediaType.APPLICATION_JSON)
fun info(): NodeInfo
/**
* Query your "local" states (containing only outputs involving you) and return the hashes & indexes associated with them
* to probably be later inflated by fetchLedgerTransactions() or fetchStates() although because immutable you can cache them

View File

@ -1,32 +1,30 @@
package com.r3corda.node.driver
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.node.services.config.ConfigHelper
import com.r3corda.node.services.config.FullNodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.utilities.AffinityExecutor
import com.r3corda.node.utilities.JsonSupport
import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.File
import java.io.InputStreamReader
import java.net.*
import java.nio.file.Path
import java.nio.file.Paths
import java.text.SimpleDateFormat
import java.util.*
import java.util.concurrent.*
import kotlin.concurrent.thread
/**
* This file defines a small "Driver" DSL for starting up nodes.
@ -56,31 +54,9 @@ interface DriverDSLExposedInterface {
*/
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceInfo> = setOf()): Future<NodeInfo>
/**
* Starts an [NodeMessagingClient].
*
* @param providedName name of the client, which will be used for creating its directory.
* @param serverAddress the artemis server to connect to, for example a [Node].
*/
fun startClient(providedName: String, serverAddress: HostAndPort): Future<NodeMessagingClient>
/**
* Starts a local [ArtemisMessagingServer] of which there may only be one.
*/
fun startLocalServer(): Future<ArtemisMessagingServer>
fun waitForAllNodesToFinish()
val networkMapCache: NetworkMapCache
}
fun DriverDSLExposedInterface.startClient(localServer: ArtemisMessagingServer) =
startClient("driver-local-server-client", localServer.myHostPort)
fun DriverDSLExposedInterface.startClient(remoteNodeInfo: NodeInfo, providedName: String? = null) =
startClient(
providedName = providedName ?: "${remoteNodeInfo.legalIdentity.name}-client",
serverAddress = ArtemisMessagingComponent.toHostAndPort(remoteNodeInfo.address)
)
interface DriverDSLInternalInterface : DriverDSLExposedInterface {
fun start()
fun shutdown()
@ -222,11 +198,8 @@ class DriverDSL(
val baseDirectory: String,
val isDebug: Boolean
) : DriverDSLInternalInterface {
override val networkMapCache = InMemoryNetworkMapCache()
private val networkMapName = "NetworkMapService"
private val networkMapAddress = portAllocation.nextHostAndPort()
private var networkMapNodeInfo: NodeInfo? = null
private val identity = generateKeyPair()
class State {
val registeredProcesses = LinkedList<Process>()
@ -284,6 +257,25 @@ class DriverDSL(
addressMustNotBeBound(networkMapAddress)
}
private fun queryNodeInfo(webAddress: HostAndPort): NodeInfo? {
val url = URL("http://${webAddress.toString()}/api/info")
try {
val conn = url.openConnection() as HttpURLConnection
conn.requestMethod = "GET"
if (conn.responseCode != 200) {
return null
}
// For now the NodeInfo is tunneled in its Kryo format over the Node's Web interface.
val om = ObjectMapper()
val module = SimpleModule("NodeInfo")
module.addDeserializer(NodeInfo::class.java, JsonSupport.NodeInfoDeserializer)
om.registerModule(module)
return om.readValue(conn.inputStream, NodeInfo::class.java)
} catch(e: Exception) {
return null
}
}
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>): Future<NodeInfo> {
val messagingAddress = portAllocation.nextHostAndPort()
val apiAddress = portAllocation.nextHostAndPort()
@ -307,88 +299,12 @@ class DriverDSL(
return Executors.newSingleThreadExecutor().submit(Callable<NodeInfo> {
registerProcess(DriverDSL.startNode(config, quasarJarPath, debugPort))
poll("network map cache for $name") {
networkMapCache.partyNodes.forEach {
if (it.legalIdentity.name == name) {
return@poll it
}
}
null
}
queryNodeInfo(apiAddress)!!
})
}
override fun startClient(
providedName: String,
serverAddress: HostAndPort
): Future<NodeMessagingClient> {
val nodeConfiguration = FullNodeConfiguration(
ConfigHelper.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, providedName),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to providedName
)
)
)
val client = NodeMessagingClient(nodeConfiguration,
serverHostPort = serverAddress,
myIdentity = identity.public,
executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1),
persistentInbox = false // Do not create a permanent queue for our transient UI identity
)
return Executors.newSingleThreadExecutor().submit(Callable<NodeMessagingClient> {
client.configureWithDevSSLCertificate()
client.start(null)
thread { client.run() }
state.locked {
clients.add(client)
}
client
})
}
override fun startLocalServer(): Future<ArtemisMessagingServer> {
val name = "driver-local-server"
val config = FullNodeConfiguration(
ConfigHelper.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, name),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to name
)
)
)
val server = ArtemisMessagingServer(config,
portAllocation.nextHostAndPort(),
networkMapCache
)
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingServer> {
server.configureWithDevSSLCertificate()
server.start()
state.locked {
localServer = server
}
server
})
}
override fun start() {
startNetworkMapService()
val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress).get()
val networkMapAddr = NodeMessagingClient.makeNetworkMapAddress(networkMapAddress)
networkMapCache.addMapService(networkMapClient, networkMapAddr, true)
networkMapNodeInfo = poll("network map cache for $networkMapName") {
networkMapCache.partyNodes.forEach {
if (it.legalIdentity.name == networkMapName) {
return@poll it
}
}
null
}
}
private fun startNetworkMapService() {

View File

@ -24,6 +24,8 @@ class APIServerImpl(val node: AbstractNode) : APIServer {
}
}
override fun info() = node.services.myInfo
override fun queryStates(query: StatesQuery): List<StateRef> {
// We're going to hard code two options here for now and assume that all LinearStates are deals
// Would like to maybe move to a model where we take something like a JEXL string, although don't want to develop

View File

@ -435,7 +435,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
protected abstract fun makeMessagingService(): MessagingServiceInternal
protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps?)
protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps)
protected open fun initialiseCheckpointService(dir: Path): CheckpointStorage {
return DBCheckpointStorage()

View File

@ -4,6 +4,7 @@ import com.codahale.metrics.JmxReporter
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.core.then
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MessagingServiceInternal
@ -119,11 +120,10 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
}
val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread,
persistenceTx = { body: () -> Unit -> databaseTransaction(database) { body() } })
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database)
}
override fun startMessagingService(cordaRPCOps: CordaRPCOps?) {
override fun startMessagingService(cordaRPCOps: CordaRPCOps) {
// Start up the embedded MQ server
messageBroker?.apply {
runOnStop += Runnable { messageBroker?.stop() }
@ -268,23 +268,25 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
override fun start(): Node {
alreadyRunningNodeCheck()
super.start()
webServer = initWebServer()
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("com.r3cev.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
// Only start the service API requests once the network map registration is complete
networkMapRegistrationFuture.then {
webServer = initWebServer()
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("com.r3cev.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
}
shutdownThread = thread(start = false) {
stop()
}

View File

@ -13,6 +13,7 @@ import com.r3corda.node.utilities.*
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import java.nio.file.FileSystems
@ -53,8 +54,7 @@ class NodeMessagingClient(config: NodeConfiguration,
val serverHostPort: HostAndPort,
val myIdentity: PublicKey?,
val executor: AffinityExecutor,
val persistentInbox: Boolean = true,
val persistenceTx: (() -> Unit) -> Unit = { it() }) : ArtemisMessagingComponent(config), MessagingServiceInternal {
val database: Database) : ArtemisMessagingComponent(config), MessagingServiceInternal {
companion object {
val log = loggerFor<NodeMessagingClient>()
@ -106,23 +106,20 @@ class NodeMessagingClient(config: NodeConfiguration,
val uuid = uuidString("message_id")
}
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(if (persistentInbox) {
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(
object : AbstractJDBCHashSet<UUID, Table>(Table, loadOnInit = true) {
override fun elementFromRow(row: ResultRow): UUID = row[table.uuid]
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry
}
}
} else {
HashSet<UUID>()
})
})
init {
require(config.basedir.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
}
fun start(rpcOps: CordaRPCOps? = null) {
fun start(rpcOps: CordaRPCOps) {
state.locked {
check(!started) { "start can't be called twice" }
started = true
@ -146,7 +143,7 @@ class NodeMessagingClient(config: NodeConfiguration,
val queueName = toQueueName(myAddress)
val query = session.queueQuery(queueName)
if (!query.isExists) {
session.createQueue(queueName, queueName, persistentInbox)
session.createQueue(queueName, queueName, true)
}
knownQueues.add(queueName)
p2pConsumer = session.createConsumer(queueName)
@ -154,13 +151,11 @@ class NodeMessagingClient(config: NodeConfiguration,
// Create an RPC queue and consumer: this will service locally connected clients only (not via a
// bridge) and those clients must have authenticated. We could use a single consumer for everything
// and perhaps we should, but these queues are not worth persisting.
if (rpcOps != null) {
session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1")
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer("rpc.qremovals")
dispatcher = createRPCDispatcher(state, rpcOps)
}
session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1")
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer("rpc.qremovals")
dispatcher = createRPCDispatcher(state, rpcOps)
}
}
@ -277,7 +272,7 @@ class NodeMessagingClient(config: NodeConfiguration,
// Note that handlers may re-enter this class. We aren't holding any locks and methods like
// start/run/stop have re-entrancy assertions at the top, so it is OK.
executor.fetchFrom {
persistenceTx {
databaseTransaction(database) {
callHandlers(msg, deliverTo)
}
}

View File

@ -11,7 +11,10 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.r3corda.core.contracts.BusinessCalendar
import com.r3corda.core.crypto.*
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.IdentityService
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import net.i2p.crypto.eddsa.EdDSAPublicKey
import java.math.BigDecimal
import java.time.LocalDate
@ -54,6 +57,11 @@ object JsonSupport {
cordaModule.addSerializer(PublicKeyTree::class.java, PublicKeyTreeSerializer)
cordaModule.addDeserializer(PublicKeyTree::class.java, PublicKeyTreeDeserializer)
// For NodeInfo
// TODO this tunnels the Kryo representation as a Base58 encoded string. Replace when RPC supports this.
cordaModule.addSerializer(NodeInfo::class.java, NodeInfoSerializer)
cordaModule.addDeserializer(NodeInfo::class.java, NodeInfoDeserializer)
mapper.registerModule(timeModule)
mapper.registerModule(cordaModule)
mapper.registerModule(KotlinModule())
@ -102,6 +110,25 @@ object JsonSupport {
}
}
object NodeInfoSerializer : JsonSerializer<NodeInfo>() {
override fun serialize(value: NodeInfo, gen: JsonGenerator, serializers: SerializerProvider) {
gen.writeString(Base58.encode(value.serialize().bits))
}
}
object NodeInfoDeserializer : JsonDeserializer<NodeInfo>() {
override fun deserialize(parser: JsonParser, context: DeserializationContext): NodeInfo {
if (parser.currentToken == JsonToken.FIELD_NAME) {
parser.nextToken()
}
try {
return Base58.decode(parser.text).deserialize<NodeInfo>()
} catch (e: Exception) {
throw JsonParseException(parser, "Invalid NodeInfo ${parser.text}: ${e.message}")
}
}
}
object SecureHashSerializer : JsonSerializer<SecureHash>() {
override fun serialize(obj: SecureHash, generator: JsonGenerator, provider: SerializerProvider) {
generator.writeString(obj.toString())

View File

@ -1,22 +1,37 @@
package com.r3corda.node.services
import com.google.common.net.HostAndPort
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.utilities.LogHelper
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.messaging.*
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.transactions.PersistentUniquenessProvider
import com.r3corda.node.utilities.AffinityExecutor
import com.r3corda.node.utilities.configureDatabase
import com.r3corda.node.utilities.databaseTransaction
import com.r3corda.testing.freeLocalHostAndPort
import com.r3corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.jetbrains.exposed.sql.Database
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import rx.Observable
import java.io.Closeable
import java.net.ServerSocket
import java.nio.file.Path
import java.util.concurrent.LinkedBlockingQueue
@ -33,12 +48,44 @@ class ArtemisMessagingTests {
val identity = generateKeyPair()
lateinit var config: NodeConfiguration
lateinit var dataSource: Closeable
lateinit var database: Database
var messagingClient: NodeMessagingClient? = null
var messagingServer: ArtemisMessagingServer? = null
val networkMapCache = InMemoryNetworkMapCache()
val rpcOps = object : CordaRPCOps {
override val protocolVersion: Int
get() = throw UnsupportedOperationException()
override fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
}
@Before
fun setUp() {
// TODO: create a base class that provides a default implementation
@ -52,12 +99,18 @@ class ArtemisMessagingTests {
override val keyStorePassword: String = "testpass"
override val trustStorePassword: String = "trustpass"
}
LogHelper.setLevel(PersistentUniquenessProvider::class)
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
dataSource = dataSourceAndDatabase.first
database = dataSourceAndDatabase.second
}
@After
fun cleanUp() {
messagingClient?.stop()
messagingServer?.stop()
dataSource.close()
LogHelper.reset(PersistentUniquenessProvider::class)
}
@Test
@ -73,7 +126,7 @@ class ArtemisMessagingTests {
val remoteServerAddress = freeLocalHostAndPort()
createMessagingServer(remoteServerAddress).start()
createMessagingClient(server = remoteServerAddress).start()
createMessagingClient(server = remoteServerAddress).start(rpcOps)
}
@Test
@ -84,14 +137,14 @@ class ArtemisMessagingTests {
createMessagingServer(serverAddress).start()
messagingClient = createMessagingClient(server = invalidServerAddress)
assertThatThrownBy { messagingClient!!.start() }
assertThatThrownBy { messagingClient!!.start(rpcOps) }
messagingClient = null
}
@Test
fun `client should connect to local server`() {
createMessagingServer().start()
createMessagingClient().start()
createMessagingClient().start(rpcOps)
}
@Test
@ -101,7 +154,7 @@ class ArtemisMessagingTests {
createMessagingServer().start()
val messagingClient = createMessagingClient()
messagingClient.start()
messagingClient.start(rpcOps)
thread { messagingClient.run() }
messagingClient.addMessageHandler(topic) { message, r ->
@ -117,9 +170,11 @@ class ArtemisMessagingTests {
}
private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient {
return NodeMessagingClient(config, server, identity.public, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), false).apply {
configureWithDevSSLCertificate()
messagingClient = this
return databaseTransaction(database) {
NodeMessagingClient(config, server, identity.public, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database).apply {
configureWithDevSSLCertificate()
messagingClient = this
}
}
}

View File

@ -52,8 +52,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
val factory = ProtocolLogicRefFactory(mapOf(Pair(TestProtocolLogic::class.java.name, setOf(NodeSchedulerServiceTest::class.java.name, Integer::class.java.name))))
val services: MockServiceHubInternal
lateinit var services: MockServiceHubInternal
lateinit var scheduler: NodeSchedulerService
lateinit var smmExecutor: AffinityExecutor.ServiceAffinityExecutor
lateinit var dataSource: Closeable
@ -72,13 +71,6 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
val testReference: NodeSchedulerServiceTest
}
init {
val kms = MockKeyManagementService(ALICE_KEY)
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), persistenceTx = { it() })
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
override val testReference = this@NodeSchedulerServiceTest
}
}
@Before
fun setup() {
@ -89,6 +81,11 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
dataSource = dataSourceAndDatabase.first
database = dataSourceAndDatabase.second
databaseTransaction(database) {
val kms = MockKeyManagementService(ALICE_KEY)
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), database)
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
override val testReference = this@NodeSchedulerServiceTest
}
scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor)
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
val mockSMM = StateMachineManager(services, listOf(services, scheduler), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor, database)

View File

@ -9,7 +9,9 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.MessagingServiceBuilder
import com.r3corda.node.utilities.AffinityExecutor
import com.r3corda.node.utilities.databaseTransaction
import com.r3corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging
import org.jetbrains.exposed.sql.Database
import org.slf4j.LoggerFactory
import rx.Observable
import rx.subjects.PublishSubject
@ -80,10 +82,10 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
@Synchronized
fun createNode(manuallyPumped: Boolean,
executor: AffinityExecutor,
persistenceTx: (() -> Unit) -> Unit)
database: Database)
: Pair<Handle, com.r3corda.node.services.api.MessagingServiceBuilder<InMemoryMessaging>> {
check(counter >= 0) { "In memory network stopped: please recreate." }
val builder = createNodeWithID(manuallyPumped, counter, executor, persistenceTx = persistenceTx) as Builder
val builder = createNodeWithID(manuallyPumped, counter, executor, database = database) as Builder
counter++
val id = builder.id
return Pair(id, builder)
@ -98,9 +100,9 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
* @param persistenceTx a lambda to wrap message handling in a transaction if necessary.
*/
fun createNodeWithID(manuallyPumped: Boolean, id: Int, executor: AffinityExecutor, description: String? = null,
persistenceTx: (() -> Unit) -> Unit)
database: Database)
: MessagingServiceBuilder<InMemoryMessaging> {
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"), executor, persistenceTx)
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"), executor, database = database)
}
interface LatencyCalculator {
@ -140,11 +142,11 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
messageReceiveQueues.clear()
}
inner class Builder(val manuallyPumped: Boolean, val id: Handle, val executor: AffinityExecutor, val persistenceTx: (() -> Unit) -> Unit)
inner class Builder(val manuallyPumped: Boolean, val id: Handle, val executor: AffinityExecutor, val database: Database)
: com.r3corda.node.services.api.MessagingServiceBuilder<InMemoryMessaging> {
override fun start(): ListenableFuture<InMemoryMessaging> {
synchronized(this@InMemoryMessagingNetwork) {
val node = InMemoryMessaging(manuallyPumped, id, executor, persistenceTx)
val node = InMemoryMessaging(manuallyPumped, id, executor, database)
handleEndpointMap[id] = node
return Futures.immediateFuture(node)
}
@ -208,7 +210,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
inner class InMemoryMessaging(private val manuallyPumped: Boolean,
private val handle: Handle,
private val executor: AffinityExecutor,
private val persistenceTx: (() -> Unit) -> Unit)
private val database: Database)
: SingletonSerializeAsToken(), com.r3corda.node.services.api.MessagingServiceInternal {
inner class Handler(val topicSession: TopicSession,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@ -348,7 +350,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
if (transfer.message.uniqueMessageId !in processedMessages) {
executor.execute {
persistenceTx {
databaseTransaction(database) {
for (handler in deliverTo) {
try {
handler.callback(transfer.message, handler)

View File

@ -125,8 +125,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
// through the java.nio API which we are already mocking via Jimfs.
override fun makeMessagingService(): MessagingServiceInternal {
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, serverThread, configuration.myLegalName,
persistenceTx = { body: () -> Unit -> databaseTransaction(database) { body() } }).start().get()
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, serverThread, configuration.myLegalName, database).start().get()
}
override fun initialiseCheckpointService(dir: Path): CheckpointStorage {
@ -143,7 +142,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
override fun makeKeyManagementService(): KeyManagementService = E2ETestKeyManagementService(partyKeys)
override fun startMessagingService(cordaRPCOps: CordaRPCOps?) {
override fun startMessagingService(cordaRPCOps: CordaRPCOps) {
// Nothing to do
}