Merged in mnesbit-cor-389-driver-remove-startClient (pull request #409)

Unify messaging services to have a database and not support client type connections.
This commit is contained in:
Matthew Nesbit 2016-10-19 14:09:50 +00:00
commit 8eee4afe7d
14 changed files with 215 additions and 192 deletions

View File

@ -14,7 +14,8 @@ import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.serialization.OpaqueBytes
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.node.driver.driver
import com.r3corda.node.driver.startClient
import com.r3corda.node.services.config.NodeSSLConfiguration
import com.r3corda.node.services.config.configureWithDevSSLCertificate
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.messaging.StateMachineUpdate
import com.r3corda.node.services.transactions.SimpleNotaryService
@ -26,6 +27,8 @@ import org.junit.Before
import org.junit.Test
import rx.Observable
import rx.Observer
import java.nio.file.Files
import java.nio.file.Path
import kotlin.concurrent.thread
class NodeMonitorModelTest {
@ -55,10 +58,19 @@ class NodeMonitorModelTest {
aliceNode = aliceNodeFuture.get()
notaryNode = notaryNodeFuture.get()
aliceClient = startClient(aliceNode).get()
newNode = { nodeName -> startNode(nodeName).get() }
val monitor = NodeMonitorModel()
val sslConfig = object : NodeSSLConfiguration {
override val certificatesPath: Path = Files.createTempDirectory("certs")
override val keyStorePassword = "cordacadevpass"
override val trustStorePassword = "trustpass"
init {
configureWithDevSSLCertificate()
}
}
stateMachineTransactionMapping = monitor.stateMachineTransactionMapping.bufferUntilSubscribed()
stateMachineUpdates = monitor.stateMachineUpdates.bufferUntilSubscribed()
progressTracking = monitor.progressTracking.bufferUntilSubscribed()
@ -67,7 +79,7 @@ class NodeMonitorModelTest {
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
clientToService = monitor.clientToService
monitor.register(aliceNode, aliceClient.config.certificatesPath)
monitor.register(aliceNode, sslConfig.certificatesPath)
driverStarted.set(Unit)
stopDriver.get()

View File

@ -6,10 +6,13 @@ import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.explorer.model.IdentityModel
import com.r3corda.node.driver.PortAllocation
import com.r3corda.node.driver.driver
import com.r3corda.node.driver.startClient
import com.r3corda.node.services.config.NodeSSLConfiguration
import com.r3corda.node.services.config.configureWithDevSSLCertificate
import com.r3corda.node.services.transactions.SimpleNotaryService
import javafx.stage.Stage
import tornadofx.App
import java.nio.file.Files
import java.nio.file.Path
class Main : App() {
override val primaryView = MainWindow::class
@ -35,11 +38,19 @@ class Main : App() {
val aliceNode = aliceNodeFuture.get()
val notaryNode = notaryNodeFuture.get()
val aliceClient = startClient(aliceNode).get()
val sslConfig = object : NodeSSLConfiguration {
override val certificatesPath: Path = Files.createTempDirectory("certs")
override val keyStorePassword = "cordacadevpass"
override val trustStorePassword = "trustpass"
init {
configureWithDevSSLCertificate()
}
}
Models.get<IdentityModel>(Main::class).notary.set(notaryNode.notaryIdentity)
Models.get<IdentityModel>(Main::class).myIdentity.set(aliceNode.legalIdentity)
Models.get<NodeMonitorModel>(Main::class).register(aliceNode, aliceClient.config.certificatesPath)
Models.get<NodeMonitorModel>(Main::class).register(aliceNode, sslConfig.certificatesPath)
startNode("Bob").get()

View File

@ -11,14 +11,8 @@ import org.junit.Test
class DriverTests {
companion object {
fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) {
fun nodeMustBeUp(nodeInfo: NodeInfo, nodeName: String) {
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
// Check that the node is registered in the network map
poll("network map cache for $nodeName") {
networkMapCache.get().firstOrNull {
it.legalIdentity.name == nodeName
}
}
// Check that the port is bound
addressMustBeBound(hostAndPort)
}
@ -36,8 +30,8 @@ class DriverTests {
val notary = startNode("TestNotary", setOf(ServiceInfo(SimpleNotaryService.type)))
val regulator = startNode("Regulator", setOf(ServiceInfo(RegulatorService.type)))
nodeMustBeUp(networkMapCache, notary.get(), "TestNotary")
nodeMustBeUp(networkMapCache, regulator.get(), "Regulator")
nodeMustBeUp(notary.get(), "TestNotary")
nodeMustBeUp(regulator.get(), "Regulator")
Pair(notary.get(), regulator.get())
}
nodeMustBeDown(notary)
@ -48,7 +42,7 @@ class DriverTests {
fun startingNodeWithNoServicesWorks() {
val noService = driver {
val noService = startNode("NoService")
nodeMustBeUp(networkMapCache, noService.get(), "NoService")
nodeMustBeUp(noService.get(), "NoService")
noService.get()
}
nodeMustBeDown(noService)
@ -58,7 +52,7 @@ class DriverTests {
fun randomFreePortAllocationWorks() {
val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) {
val nodeInfo = startNode("NoService")
nodeMustBeUp(networkMapCache, nodeInfo.get(), "NoService")
nodeMustBeUp(nodeInfo.get(), "NoService")
nodeInfo.get()
}
nodeMustBeDown(nodeInfo)

View File

@ -4,6 +4,7 @@ import com.r3corda.core.contracts.*
import com.r3corda.node.api.StatesQuery
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.transactions.WireTransaction
@ -41,6 +42,16 @@ interface APIServer {
@Produces(MediaType.TEXT_PLAIN)
fun status(): Response
/**
* Report this nodes configuration and identities.
* Currently tunnels the NodeInfo as an encoding of the Kryo serialised form.
* TODO this functionality should be available via the RPC
*/
@GET
@Path("info")
@Produces(MediaType.APPLICATION_JSON)
fun info(): NodeInfo
/**
* Query your "local" states (containing only outputs involving you) and return the hashes & indexes associated with them
* to probably be later inflated by fetchLedgerTransactions() or fetchStates() although because immutable you can cache them

View File

@ -1,32 +1,30 @@
package com.r3corda.node.driver
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.node.services.config.ConfigHelper
import com.r3corda.node.services.config.FullNodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.utilities.AffinityExecutor
import com.r3corda.node.utilities.JsonSupport
import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.File
import java.io.InputStreamReader
import java.net.*
import java.nio.file.Path
import java.nio.file.Paths
import java.text.SimpleDateFormat
import java.util.*
import java.util.concurrent.*
import kotlin.concurrent.thread
/**
* This file defines a small "Driver" DSL for starting up nodes.
@ -56,31 +54,9 @@ interface DriverDSLExposedInterface {
*/
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceInfo> = setOf()): Future<NodeInfo>
/**
* Starts an [NodeMessagingClient].
*
* @param providedName name of the client, which will be used for creating its directory.
* @param serverAddress the artemis server to connect to, for example a [Node].
*/
fun startClient(providedName: String, serverAddress: HostAndPort): Future<NodeMessagingClient>
/**
* Starts a local [ArtemisMessagingServer] of which there may only be one.
*/
fun startLocalServer(): Future<ArtemisMessagingServer>
fun waitForAllNodesToFinish()
val networkMapCache: NetworkMapCache
}
fun DriverDSLExposedInterface.startClient(localServer: ArtemisMessagingServer) =
startClient("driver-local-server-client", localServer.myHostPort)
fun DriverDSLExposedInterface.startClient(remoteNodeInfo: NodeInfo, providedName: String? = null) =
startClient(
providedName = providedName ?: "${remoteNodeInfo.legalIdentity.name}-client",
serverAddress = ArtemisMessagingComponent.toHostAndPort(remoteNodeInfo.address)
)
interface DriverDSLInternalInterface : DriverDSLExposedInterface {
fun start()
fun shutdown()
@ -222,11 +198,8 @@ class DriverDSL(
val baseDirectory: String,
val isDebug: Boolean
) : DriverDSLInternalInterface {
override val networkMapCache = InMemoryNetworkMapCache()
private val networkMapName = "NetworkMapService"
private val networkMapAddress = portAllocation.nextHostAndPort()
private var networkMapNodeInfo: NodeInfo? = null
private val identity = generateKeyPair()
class State {
val registeredProcesses = LinkedList<Process>()
@ -284,6 +257,25 @@ class DriverDSL(
addressMustNotBeBound(networkMapAddress)
}
private fun queryNodeInfo(webAddress: HostAndPort): NodeInfo? {
val url = URL("http://${webAddress.toString()}/api/info")
try {
val conn = url.openConnection() as HttpURLConnection
conn.requestMethod = "GET"
if (conn.responseCode != 200) {
return null
}
// For now the NodeInfo is tunneled in its Kryo format over the Node's Web interface.
val om = ObjectMapper()
val module = SimpleModule("NodeInfo")
module.addDeserializer(NodeInfo::class.java, JsonSupport.NodeInfoDeserializer)
om.registerModule(module)
return om.readValue(conn.inputStream, NodeInfo::class.java)
} catch(e: Exception) {
return null
}
}
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>): Future<NodeInfo> {
val messagingAddress = portAllocation.nextHostAndPort()
val apiAddress = portAllocation.nextHostAndPort()
@ -307,88 +299,12 @@ class DriverDSL(
return Executors.newSingleThreadExecutor().submit(Callable<NodeInfo> {
registerProcess(DriverDSL.startNode(config, quasarJarPath, debugPort))
poll("network map cache for $name") {
networkMapCache.partyNodes.forEach {
if (it.legalIdentity.name == name) {
return@poll it
}
}
null
}
queryNodeInfo(apiAddress)!!
})
}
override fun startClient(
providedName: String,
serverAddress: HostAndPort
): Future<NodeMessagingClient> {
val nodeConfiguration = FullNodeConfiguration(
ConfigHelper.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, providedName),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to providedName
)
)
)
val client = NodeMessagingClient(nodeConfiguration,
serverHostPort = serverAddress,
myIdentity = identity.public,
executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1),
persistentInbox = false // Do not create a permanent queue for our transient UI identity
)
return Executors.newSingleThreadExecutor().submit(Callable<NodeMessagingClient> {
client.configureWithDevSSLCertificate()
client.start(null)
thread { client.run() }
state.locked {
clients.add(client)
}
client
})
}
override fun startLocalServer(): Future<ArtemisMessagingServer> {
val name = "driver-local-server"
val config = FullNodeConfiguration(
ConfigHelper.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, name),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to name
)
)
)
val server = ArtemisMessagingServer(config,
portAllocation.nextHostAndPort(),
networkMapCache
)
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingServer> {
server.configureWithDevSSLCertificate()
server.start()
state.locked {
localServer = server
}
server
})
}
override fun start() {
startNetworkMapService()
val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress).get()
val networkMapAddr = NodeMessagingClient.makeNetworkMapAddress(networkMapAddress)
networkMapCache.addMapService(networkMapClient, networkMapAddr, true)
networkMapNodeInfo = poll("network map cache for $networkMapName") {
networkMapCache.partyNodes.forEach {
if (it.legalIdentity.name == networkMapName) {
return@poll it
}
}
null
}
}
private fun startNetworkMapService() {

View File

@ -24,6 +24,8 @@ class APIServerImpl(val node: AbstractNode) : APIServer {
}
}
override fun info() = node.services.myInfo
override fun queryStates(query: StatesQuery): List<StateRef> {
// We're going to hard code two options here for now and assume that all LinearStates are deals
// Would like to maybe move to a model where we take something like a JEXL string, although don't want to develop

View File

@ -435,7 +435,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
protected abstract fun makeMessagingService(): MessagingServiceInternal
protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps?)
protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps)
protected open fun initialiseCheckpointService(dir: Path): CheckpointStorage {
return DBCheckpointStorage()

View File

@ -4,6 +4,7 @@ import com.codahale.metrics.JmxReporter
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.core.then
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MessagingServiceInternal
@ -119,11 +120,10 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
}
val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread,
persistenceTx = { body: () -> Unit -> databaseTransaction(database) { body() } })
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database)
}
override fun startMessagingService(cordaRPCOps: CordaRPCOps?) {
override fun startMessagingService(cordaRPCOps: CordaRPCOps) {
// Start up the embedded MQ server
messageBroker?.apply {
runOnStop += Runnable { messageBroker?.stop() }
@ -268,23 +268,25 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
override fun start(): Node {
alreadyRunningNodeCheck()
super.start()
webServer = initWebServer()
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("com.r3cev.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
// Only start the service API requests once the network map registration is complete
networkMapRegistrationFuture.then {
webServer = initWebServer()
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("com.r3cev.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
}
shutdownThread = thread(start = false) {
stop()
}

View File

@ -13,6 +13,7 @@ import com.r3corda.node.utilities.*
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import java.nio.file.FileSystems
@ -53,8 +54,7 @@ class NodeMessagingClient(config: NodeConfiguration,
val serverHostPort: HostAndPort,
val myIdentity: PublicKey?,
val executor: AffinityExecutor,
val persistentInbox: Boolean = true,
val persistenceTx: (() -> Unit) -> Unit = { it() }) : ArtemisMessagingComponent(config), MessagingServiceInternal {
val database: Database) : ArtemisMessagingComponent(config), MessagingServiceInternal {
companion object {
val log = loggerFor<NodeMessagingClient>()
@ -106,23 +106,20 @@ class NodeMessagingClient(config: NodeConfiguration,
val uuid = uuidString("message_id")
}
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(if (persistentInbox) {
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(
object : AbstractJDBCHashSet<UUID, Table>(Table, loadOnInit = true) {
override fun elementFromRow(row: ResultRow): UUID = row[table.uuid]
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry
}
}
} else {
HashSet<UUID>()
})
})
init {
require(config.basedir.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
}
fun start(rpcOps: CordaRPCOps? = null) {
fun start(rpcOps: CordaRPCOps) {
state.locked {
check(!started) { "start can't be called twice" }
started = true
@ -146,7 +143,7 @@ class NodeMessagingClient(config: NodeConfiguration,
val queueName = toQueueName(myAddress)
val query = session.queueQuery(queueName)
if (!query.isExists) {
session.createQueue(queueName, queueName, persistentInbox)
session.createQueue(queueName, queueName, true)
}
knownQueues.add(queueName)
p2pConsumer = session.createConsumer(queueName)
@ -154,13 +151,11 @@ class NodeMessagingClient(config: NodeConfiguration,
// Create an RPC queue and consumer: this will service locally connected clients only (not via a
// bridge) and those clients must have authenticated. We could use a single consumer for everything
// and perhaps we should, but these queues are not worth persisting.
if (rpcOps != null) {
session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1")
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer("rpc.qremovals")
dispatcher = createRPCDispatcher(state, rpcOps)
}
session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1")
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer("rpc.qremovals")
dispatcher = createRPCDispatcher(state, rpcOps)
}
}
@ -277,7 +272,7 @@ class NodeMessagingClient(config: NodeConfiguration,
// Note that handlers may re-enter this class. We aren't holding any locks and methods like
// start/run/stop have re-entrancy assertions at the top, so it is OK.
executor.fetchFrom {
persistenceTx {
databaseTransaction(database) {
callHandlers(msg, deliverTo)
}
}

View File

@ -11,7 +11,10 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.r3corda.core.contracts.BusinessCalendar
import com.r3corda.core.crypto.*
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.IdentityService
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import net.i2p.crypto.eddsa.EdDSAPublicKey
import java.math.BigDecimal
import java.time.LocalDate
@ -54,6 +57,11 @@ object JsonSupport {
cordaModule.addSerializer(PublicKeyTree::class.java, PublicKeyTreeSerializer)
cordaModule.addDeserializer(PublicKeyTree::class.java, PublicKeyTreeDeserializer)
// For NodeInfo
// TODO this tunnels the Kryo representation as a Base58 encoded string. Replace when RPC supports this.
cordaModule.addSerializer(NodeInfo::class.java, NodeInfoSerializer)
cordaModule.addDeserializer(NodeInfo::class.java, NodeInfoDeserializer)
mapper.registerModule(timeModule)
mapper.registerModule(cordaModule)
mapper.registerModule(KotlinModule())
@ -102,6 +110,25 @@ object JsonSupport {
}
}
object NodeInfoSerializer : JsonSerializer<NodeInfo>() {
override fun serialize(value: NodeInfo, gen: JsonGenerator, serializers: SerializerProvider) {
gen.writeString(Base58.encode(value.serialize().bits))
}
}
object NodeInfoDeserializer : JsonDeserializer<NodeInfo>() {
override fun deserialize(parser: JsonParser, context: DeserializationContext): NodeInfo {
if (parser.currentToken == JsonToken.FIELD_NAME) {
parser.nextToken()
}
try {
return Base58.decode(parser.text).deserialize<NodeInfo>()
} catch (e: Exception) {
throw JsonParseException(parser, "Invalid NodeInfo ${parser.text}: ${e.message}")
}
}
}
object SecureHashSerializer : JsonSerializer<SecureHash>() {
override fun serialize(obj: SecureHash, generator: JsonGenerator, provider: SerializerProvider) {
generator.writeString(obj.toString())

View File

@ -1,22 +1,37 @@
package com.r3corda.node.services
import com.google.common.net.HostAndPort
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.utilities.LogHelper
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.messaging.*
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.transactions.PersistentUniquenessProvider
import com.r3corda.node.utilities.AffinityExecutor
import com.r3corda.node.utilities.configureDatabase
import com.r3corda.node.utilities.databaseTransaction
import com.r3corda.testing.freeLocalHostAndPort
import com.r3corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.jetbrains.exposed.sql.Database
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import rx.Observable
import java.io.Closeable
import java.net.ServerSocket
import java.nio.file.Path
import java.util.concurrent.LinkedBlockingQueue
@ -33,12 +48,44 @@ class ArtemisMessagingTests {
val identity = generateKeyPair()
lateinit var config: NodeConfiguration
lateinit var dataSource: Closeable
lateinit var database: Database
var messagingClient: NodeMessagingClient? = null
var messagingServer: ArtemisMessagingServer? = null
val networkMapCache = InMemoryNetworkMapCache()
val rpcOps = object : CordaRPCOps {
override val protocolVersion: Int
get() = throw UnsupportedOperationException()
override fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
}
@Before
fun setUp() {
// TODO: create a base class that provides a default implementation
@ -52,12 +99,18 @@ class ArtemisMessagingTests {
override val keyStorePassword: String = "testpass"
override val trustStorePassword: String = "trustpass"
}
LogHelper.setLevel(PersistentUniquenessProvider::class)
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
dataSource = dataSourceAndDatabase.first
database = dataSourceAndDatabase.second
}
@After
fun cleanUp() {
messagingClient?.stop()
messagingServer?.stop()
dataSource.close()
LogHelper.reset(PersistentUniquenessProvider::class)
}
@Test
@ -73,7 +126,7 @@ class ArtemisMessagingTests {
val remoteServerAddress = freeLocalHostAndPort()
createMessagingServer(remoteServerAddress).start()
createMessagingClient(server = remoteServerAddress).start()
createMessagingClient(server = remoteServerAddress).start(rpcOps)
}
@Test
@ -84,14 +137,14 @@ class ArtemisMessagingTests {
createMessagingServer(serverAddress).start()
messagingClient = createMessagingClient(server = invalidServerAddress)
assertThatThrownBy { messagingClient!!.start() }
assertThatThrownBy { messagingClient!!.start(rpcOps) }
messagingClient = null
}
@Test
fun `client should connect to local server`() {
createMessagingServer().start()
createMessagingClient().start()
createMessagingClient().start(rpcOps)
}
@Test
@ -101,7 +154,7 @@ class ArtemisMessagingTests {
createMessagingServer().start()
val messagingClient = createMessagingClient()
messagingClient.start()
messagingClient.start(rpcOps)
thread { messagingClient.run() }
messagingClient.addMessageHandler(topic) { message, r ->
@ -117,9 +170,11 @@ class ArtemisMessagingTests {
}
private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient {
return NodeMessagingClient(config, server, identity.public, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), false).apply {
configureWithDevSSLCertificate()
messagingClient = this
return databaseTransaction(database) {
NodeMessagingClient(config, server, identity.public, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database).apply {
configureWithDevSSLCertificate()
messagingClient = this
}
}
}

View File

@ -52,8 +52,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
val factory = ProtocolLogicRefFactory(mapOf(Pair(TestProtocolLogic::class.java.name, setOf(NodeSchedulerServiceTest::class.java.name, Integer::class.java.name))))
val services: MockServiceHubInternal
lateinit var services: MockServiceHubInternal
lateinit var scheduler: NodeSchedulerService
lateinit var smmExecutor: AffinityExecutor.ServiceAffinityExecutor
lateinit var dataSource: Closeable
@ -72,13 +71,6 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
val testReference: NodeSchedulerServiceTest
}
init {
val kms = MockKeyManagementService(ALICE_KEY)
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), persistenceTx = { it() })
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
override val testReference = this@NodeSchedulerServiceTest
}
}
@Before
fun setup() {
@ -89,6 +81,11 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
dataSource = dataSourceAndDatabase.first
database = dataSourceAndDatabase.second
databaseTransaction(database) {
val kms = MockKeyManagementService(ALICE_KEY)
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), database)
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
override val testReference = this@NodeSchedulerServiceTest
}
scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor)
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
val mockSMM = StateMachineManager(services, listOf(services, scheduler), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor, database)

View File

@ -9,7 +9,9 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.MessagingServiceBuilder
import com.r3corda.node.utilities.AffinityExecutor
import com.r3corda.node.utilities.databaseTransaction
import com.r3corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging
import org.jetbrains.exposed.sql.Database
import org.slf4j.LoggerFactory
import rx.Observable
import rx.subjects.PublishSubject
@ -80,10 +82,10 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
@Synchronized
fun createNode(manuallyPumped: Boolean,
executor: AffinityExecutor,
persistenceTx: (() -> Unit) -> Unit)
database: Database)
: Pair<Handle, com.r3corda.node.services.api.MessagingServiceBuilder<InMemoryMessaging>> {
check(counter >= 0) { "In memory network stopped: please recreate." }
val builder = createNodeWithID(manuallyPumped, counter, executor, persistenceTx = persistenceTx) as Builder
val builder = createNodeWithID(manuallyPumped, counter, executor, database = database) as Builder
counter++
val id = builder.id
return Pair(id, builder)
@ -98,9 +100,9 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
* @param persistenceTx a lambda to wrap message handling in a transaction if necessary.
*/
fun createNodeWithID(manuallyPumped: Boolean, id: Int, executor: AffinityExecutor, description: String? = null,
persistenceTx: (() -> Unit) -> Unit)
database: Database)
: MessagingServiceBuilder<InMemoryMessaging> {
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"), executor, persistenceTx)
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"), executor, database = database)
}
interface LatencyCalculator {
@ -140,11 +142,11 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
messageReceiveQueues.clear()
}
inner class Builder(val manuallyPumped: Boolean, val id: Handle, val executor: AffinityExecutor, val persistenceTx: (() -> Unit) -> Unit)
inner class Builder(val manuallyPumped: Boolean, val id: Handle, val executor: AffinityExecutor, val database: Database)
: com.r3corda.node.services.api.MessagingServiceBuilder<InMemoryMessaging> {
override fun start(): ListenableFuture<InMemoryMessaging> {
synchronized(this@InMemoryMessagingNetwork) {
val node = InMemoryMessaging(manuallyPumped, id, executor, persistenceTx)
val node = InMemoryMessaging(manuallyPumped, id, executor, database)
handleEndpointMap[id] = node
return Futures.immediateFuture(node)
}
@ -208,7 +210,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
inner class InMemoryMessaging(private val manuallyPumped: Boolean,
private val handle: Handle,
private val executor: AffinityExecutor,
private val persistenceTx: (() -> Unit) -> Unit)
private val database: Database)
: SingletonSerializeAsToken(), com.r3corda.node.services.api.MessagingServiceInternal {
inner class Handler(val topicSession: TopicSession,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@ -348,7 +350,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
if (transfer.message.uniqueMessageId !in processedMessages) {
executor.execute {
persistenceTx {
databaseTransaction(database) {
for (handler in deliverTo) {
try {
handler.callback(transfer.message, handler)

View File

@ -125,8 +125,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
// through the java.nio API which we are already mocking via Jimfs.
override fun makeMessagingService(): MessagingServiceInternal {
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, serverThread, configuration.myLegalName,
persistenceTx = { body: () -> Unit -> databaseTransaction(database) { body() } }).start().get()
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, serverThread, configuration.myLegalName, database).start().get()
}
override fun initialiseCheckpointService(dir: Path): CheckpointStorage {
@ -143,7 +142,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
override fun makeKeyManagementService(): KeyManagementService = E2ETestKeyManagementService(partyKeys)
override fun startMessagingService(cordaRPCOps: CordaRPCOps?) {
override fun startMessagingService(cordaRPCOps: CordaRPCOps) {
// Nothing to do
}