Merge branches 'colljos-vault-transaction-notes' and 'master' of https://bitbucket.org/R3-CEV/r3prototyping into colljos-vault-transaction-notes

This commit is contained in:
Jose Coll
2016-10-27 14:59:37 +01:00
47 changed files with 671 additions and 2619 deletions

View File

@ -11,14 +11,8 @@ import org.junit.Test
class DriverTests {
companion object {
fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) {
fun nodeMustBeUp(nodeInfo: NodeInfo, nodeName: String) {
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
// Check that the node is registered in the network map
poll("network map cache for $nodeName") {
networkMapCache.get().firstOrNull {
it.legalIdentity.name == nodeName
}
}
// Check that the port is bound
addressMustBeBound(hostAndPort)
}
@ -36,31 +30,31 @@ class DriverTests {
val notary = startNode("TestNotary", setOf(ServiceInfo(SimpleNotaryService.type)))
val regulator = startNode("Regulator", setOf(ServiceInfo(RegulatorService.type)))
nodeMustBeUp(networkMapCache, notary.get(), "TestNotary")
nodeMustBeUp(networkMapCache, regulator.get(), "Regulator")
nodeMustBeUp(notary.get().nodeInfo, "TestNotary")
nodeMustBeUp(regulator.get().nodeInfo, "Regulator")
Pair(notary.get(), regulator.get())
}
nodeMustBeDown(notary)
nodeMustBeDown(regulator)
nodeMustBeDown(notary.nodeInfo)
nodeMustBeDown(regulator.nodeInfo)
}
@Test
fun startingNodeWithNoServicesWorks() {
val noService = driver {
val noService = startNode("NoService")
nodeMustBeUp(networkMapCache, noService.get(), "NoService")
nodeMustBeUp(noService.get().nodeInfo, "NoService")
noService.get()
}
nodeMustBeDown(noService)
nodeMustBeDown(noService.nodeInfo)
}
@Test
fun randomFreePortAllocationWorks() {
val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) {
val nodeInfo = startNode("NoService")
nodeMustBeUp(networkMapCache, nodeInfo.get(), "NoService")
nodeMustBeUp(nodeInfo.get().nodeInfo, "NoService")
nodeInfo.get()
}
nodeMustBeDown(nodeInfo)
nodeMustBeDown(nodeInfo.nodeInfo)
}
}

View File

@ -0,0 +1,54 @@
// Due to Capsule being in the default package, which cannot be imported, this caplet
// must also be in the default package. When using Kotlin there are a whole host of exceptions
// trying to construct this from Capsule, so it is written in Java.
import org.apache.commons.io.FilenameUtils;
import java.io.File;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
public class CordaCaplet extends Capsule {
protected CordaCaplet(Capsule pred) {
super(pred);
}
/**
* Overriding the Caplet classpath generation via the intended interface in Capsule.
*/
@Override
@SuppressWarnings("unchecked")
protected <T> T attribute(Map.Entry<String, T> attr) {
// Equality is used here because Capsule never instantiates these attributes but instead reuses the ones
// defined as public static final fields on the Capsule class, therefore referential equality is safe.
if(ATTR_APP_CLASS_PATH == attr) {
T cp = super.attribute(attr);
List<Path> classpath = augmentClasspath((List<Path>) cp, "plugins");
return (T) augmentClasspath(classpath, "dependencies");
}
return super.attribute(attr);
}
// TODO: Make directory configurable via the capsule manifest.
// TODO: Add working directory variable to capsules string replacement variables.
private List<Path> augmentClasspath(List<Path> classpath, String dirName) {
File dir = new File("plugins");
if(!dir.exists()) {
dir.mkdir();
}
File[] files = dir.listFiles();
for (File file : files) {
if (file.isFile() && isJAR(file)) {
classpath.add(file.toPath().toAbsolutePath());
}
}
return classpath;
}
private Boolean isJAR(File file) {
return file.getName().toLowerCase().endsWith(".jar");
}
}

View File

@ -4,6 +4,7 @@ import com.r3corda.core.contracts.*
import com.r3corda.node.api.StatesQuery
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.transactions.WireTransaction
@ -41,6 +42,16 @@ interface APIServer {
@Produces(MediaType.TEXT_PLAIN)
fun status(): Response
/**
* Report this node's configuration and identities.
* Currently tunnels the NodeInfo as an encoding of the Kryo serialised form.
* TODO this functionality should be available via the RPC
*/
@GET
@Path("info")
@Produces(MediaType.APPLICATION_JSON)
fun info(): NodeInfo
/**
* Query your "local" states (containing only outputs involving you) and return the hashes & indexes associated with them
* to probably be later inflated by fetchLedgerTransactions() or fetchStates() although because immutable you can cache them

View File

@ -1,5 +1,7 @@
package com.r3corda.node.driver
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.Party
@ -7,6 +9,7 @@ import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.config.ConfigHelper
import com.r3corda.node.services.config.FullNodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
@ -14,22 +17,24 @@ import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.utilities.AffinityExecutor
import com.r3corda.node.utilities.JsonSupport
import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.File
import java.io.InputStreamReader
import java.net.*
import java.nio.file.Path
import java.nio.file.Paths
import java.text.SimpleDateFormat
import java.time.Clock
import java.util.*
import java.util.concurrent.*
import kotlin.concurrent.thread
/**
* This file defines a small "Driver" DSL for starting up nodes.
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
*
* The process the driver is run in behaves as an Artemis client and starts up other processes. Namely it first
* bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes.
@ -54,38 +59,18 @@ interface DriverDSLExposedInterface {
* @param advertisedServices The set of services to be advertised by the node. Defaults to empty set.
* @return The [NodeInfo] of the started up node retrieved from the network map service.
*/
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceInfo> = setOf()): Future<NodeInfo>
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceInfo> = setOf()): Future<NodeInfoAndConfig>
/**
* Starts an [NodeMessagingClient].
*
* @param providedName name of the client, which will be used for creating its directory.
* @param serverAddress the artemis server to connect to, for example a [Node].
*/
fun startClient(providedName: String, serverAddress: HostAndPort): Future<NodeMessagingClient>
/**
* Starts a local [ArtemisMessagingServer] of which there may only be one.
*/
fun startLocalServer(): Future<ArtemisMessagingServer>
fun waitForAllNodesToFinish()
val networkMapCache: NetworkMapCache
}
fun DriverDSLExposedInterface.startClient(localServer: ArtemisMessagingServer) =
startClient("driver-local-server-client", localServer.myHostPort)
fun DriverDSLExposedInterface.startClient(remoteNodeInfo: NodeInfo, providedName: String? = null) =
startClient(
providedName = providedName ?: "${remoteNodeInfo.legalIdentity.name}-client",
serverAddress = ArtemisMessagingComponent.toHostAndPort(remoteNodeInfo.address)
)
interface DriverDSLInternalInterface : DriverDSLExposedInterface {
fun start()
fun shutdown()
}
data class NodeInfoAndConfig(val nodeInfo: NodeInfo, val config: Config)
sealed class PortAllocation {
abstract fun nextPort(): Int
fun nextHostAndPort(): HostAndPort = HostAndPort.fromParts("localhost", nextPort())
@ -122,6 +107,7 @@ sealed class PortAllocation {
* and may be specified in [DriverDSL.startNode].
* @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental.
* @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental.
* @param useTestClock If true the test clock will be used in Node.
* @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode.
* @param dsl The dsl itself.
* @return The value returned in the [dsl] closure.
@ -130,6 +116,7 @@ fun <A> driver(
baseDirectory: String = "build/${getTimestampAsDirectoryName()}",
portAllocation: PortAllocation = PortAllocation.Incremental(10000),
debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005),
useTestClock: Boolean = false,
isDebug: Boolean = false,
dsl: DriverDSLExposedInterface.() -> A
) = genericDriver(
@ -137,6 +124,7 @@ fun <A> driver(
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
baseDirectory = baseDirectory,
useTestClock = useTestClock,
isDebug = isDebug
),
coerce = { it },
@ -216,17 +204,15 @@ fun <A> poll(pollName: String, pollIntervalMs: Long = 500, warnCount: Int = 120,
return result
}
class DriverDSL(
open class DriverDSL(
val portAllocation: PortAllocation,
val debugPortAllocation: PortAllocation,
val baseDirectory: String,
val useTestClock: Boolean,
val isDebug: Boolean
) : DriverDSLInternalInterface {
override val networkMapCache = InMemoryNetworkMapCache()
private val networkMapName = "NetworkMapService"
private val networkMapAddress = portAllocation.nextHostAndPort()
private var networkMapNodeInfo: NodeInfo? = null
private val identity = generateKeyPair()
class State {
val registeredProcesses = LinkedList<Process>()
@ -284,7 +270,26 @@ class DriverDSL(
addressMustNotBeBound(networkMapAddress)
}
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>): Future<NodeInfo> {
private fun queryNodeInfo(webAddress: HostAndPort): NodeInfo? {
val url = URL("http://${webAddress.toString()}/api/info")
try {
val conn = url.openConnection() as HttpURLConnection
conn.requestMethod = "GET"
if (conn.responseCode != 200) {
return null
}
// For now the NodeInfo is tunneled in its Kryo format over the Node's Web interface.
val om = ObjectMapper()
val module = SimpleModule("NodeInfo")
module.addDeserializer(NodeInfo::class.java, JsonSupport.NodeInfoDeserializer)
om.registerModule(module)
return om.readValue(conn.inputStream, NodeInfo::class.java)
} catch(e: Exception) {
return null
}
}
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>): Future<NodeInfoAndConfig> {
val messagingAddress = portAllocation.nextHostAndPort()
val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
@ -301,94 +306,19 @@ class DriverDSL(
"artemisAddress" to messagingAddress.toString(),
"webAddress" to apiAddress.toString(),
"extraAdvertisedServiceIds" to advertisedServices.joinToString(","),
"networkMapAddress" to networkMapAddress.toString()
"networkMapAddress" to networkMapAddress.toString(),
"useTestClock" to useTestClock
)
)
return Executors.newSingleThreadExecutor().submit(Callable<NodeInfo> {
return Executors.newSingleThreadExecutor().submit(Callable<NodeInfoAndConfig> {
registerProcess(DriverDSL.startNode(config, quasarJarPath, debugPort))
poll("network map cache for $name") {
networkMapCache.partyNodes.forEach {
if (it.legalIdentity.name == name) {
return@poll it
}
}
null
}
NodeInfoAndConfig(queryNodeInfo(apiAddress)!!, config)
})
}
override fun startClient(
providedName: String,
serverAddress: HostAndPort
): Future<NodeMessagingClient> {
val nodeConfiguration = FullNodeConfiguration(
ConfigHelper.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, providedName),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to providedName
)
)
)
val client = NodeMessagingClient(nodeConfiguration,
serverHostPort = serverAddress,
myIdentity = identity.public,
executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1),
persistentInbox = false // Do not create a permanent queue for our transient UI identity
)
return Executors.newSingleThreadExecutor().submit(Callable<NodeMessagingClient> {
client.configureWithDevSSLCertificate()
client.start(null)
thread { client.run() }
state.locked {
clients.add(client)
}
client
})
}
override fun startLocalServer(): Future<ArtemisMessagingServer> {
val name = "driver-local-server"
val config = FullNodeConfiguration(
ConfigHelper.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, name),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to name
)
)
)
val server = ArtemisMessagingServer(config,
portAllocation.nextHostAndPort(),
networkMapCache
)
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingServer> {
server.configureWithDevSSLCertificate()
server.start()
state.locked {
localServer = server
}
server
})
}
override fun start() {
startNetworkMapService()
val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress).get()
val networkMapAddr = NodeMessagingClient.makeNetworkMapAddress(networkMapAddress)
networkMapCache.addMapService(networkMapClient, networkMapAddr, true)
networkMapNodeInfo = poll("network map cache for $networkMapName") {
networkMapCache.partyNodes.forEach {
if (it.legalIdentity.name == networkMapName) {
return@poll it
}
}
null
}
}
private fun startNetworkMapService() {
@ -396,7 +326,6 @@ class DriverDSL(
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val nodeDirectory = "$baseDirectory/$networkMapName"
val config = ConfigHelper.loadConfig(
baseDirectoryPath = Paths.get(nodeDirectory),
allowMissingConfig = true,
@ -405,7 +334,8 @@ class DriverDSL(
"basedir" to Paths.get(nodeDirectory).normalize().toString(),
"artemisAddress" to networkMapAddress.toString(),
"webAddress" to apiAddress.toString(),
"extraAdvertisedServiceIds" to ""
"extraAdvertisedServiceIds" to "",
"useTestClock" to useTestClock
)
)

View File

@ -24,6 +24,8 @@ class APIServerImpl(val node: AbstractNode) : APIServer {
}
}
override fun info() = node.services.myInfo
override fun queryStates(query: StatesQuery): List<StateRef> {
// We're going to hard code two options here for now and assume that all LinearStates are deals
// Would like to maybe move to a model where we take something like a JEXL string, although don't want to develop

View File

@ -209,9 +209,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
// the identity key. But the infrastructure to make that easy isn't here yet.
keyManagement = makeKeyManagementService()
api = APIServerImpl(this@AbstractNode)
scheduler = NodeSchedulerService(database, services)
protocolLogicFactory = initialiseProtocolLogicFactory()
scheduler = NodeSchedulerService(database, services, protocolLogicFactory)
val tokenizableServices = mutableListOf(storage, net, vault, keyManagement, identity, platformClock, scheduler)
@ -435,15 +434,11 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
protected abstract fun makeMessagingService(): MessagingServiceInternal
protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps?)
protected open fun initialiseCheckpointService(dir: Path): CheckpointStorage {
return DBCheckpointStorage()
}
protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps)
protected open fun initialiseStorageService(dir: Path): Pair<TxWritableStorageService, CheckpointStorage> {
val attachments = makeAttachmentStorage(dir)
val checkpointStorage = initialiseCheckpointService(dir)
val checkpointStorage = DBCheckpointStorage()
val transactionStorage = DBTransactionStorage()
_servicesThatAcceptUploads += attachments
// Populate the partyKeys set.

View File

@ -4,6 +4,7 @@ import com.codahale.metrics.JmxReporter
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.core.then
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MessagingServiceInternal
@ -119,11 +120,10 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
}
val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread,
persistenceTx = { body: () -> Unit -> databaseTransaction(database) { body() } })
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database)
}
override fun startMessagingService(cordaRPCOps: CordaRPCOps?) {
override fun startMessagingService(cordaRPCOps: CordaRPCOps) {
// Start up the embedded MQ server
messageBroker?.apply {
runOnStop += Runnable { messageBroker?.stop() }
@ -268,23 +268,25 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
override fun start(): Node {
alreadyRunningNodeCheck()
super.start()
webServer = initWebServer()
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("com.r3cev.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
// Only start the service API requests once the network map registration is complete
networkMapRegistrationFuture.then {
webServer = initWebServer()
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("com.r3cev.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
}
shutdownThread = thread(start = false) {
stop()
}

View File

@ -1,25 +0,0 @@
package com.r3corda.node.services.clientapi
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.TwoPartyDealProtocol.Fixer
import com.r3corda.protocols.TwoPartyDealProtocol.Floater
/**
* This is a temporary handler required for establishing random sessionIDs for the [Fixer] and [Floater] as part of
* running scheduled fixings for the [InterestRateSwap] contract.
*
* TODO: This will be replaced with the symmetric session work
*/
object FixingSessionInitiation {
class Plugin: CordaPluginRegistry() {
override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
}
class Service(services: ServiceHubInternal) : SingletonSerializeAsToken() {
init {
services.registerProtocolInitiator(Floater::class) { Fixer(it) }
}
}
}

View File

@ -2,6 +2,7 @@ package com.r3corda.node.services.config
import com.google.common.net.HostAndPort
import com.r3corda.core.crypto.X509Utilities
import com.r3corda.core.exists
import com.r3corda.core.utilities.loggerFor
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
@ -89,14 +90,24 @@ fun Config.getProperties(path: String): Properties {
*/
fun NodeSSLConfiguration.configureWithDevSSLCertificate() {
Files.createDirectories(certificatesPath)
if (!Files.exists(trustStorePath)) {
if (!trustStorePath.exists()) {
Files.copy(javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordatruststore.jks"),
trustStorePath)
}
if (!Files.exists(keyStorePath)) {
if (!keyStorePath.exists()) {
val caKeyStore = X509Utilities.loadKeyStore(
javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordadevcakeys.jks"),
"cordacadevpass")
X509Utilities.createKeystoreForSSL(keyStorePath, keyStorePassword, keyStorePassword, caKeyStore, "cordacadevkeypass")
}
}
// TODO Move this to CoreTestUtils.kt once we can pry this from the explorer
fun configureTestSSL(): NodeSSLConfiguration = object : NodeSSLConfiguration {
override val certificatesPath = Files.createTempDirectory("certs")
override val keyStorePassword: String get() = "cordacadevpass"
override val trustStorePassword: String get() = "trustpass"
init {
configureWithDevSSLCertificate()
}
}

View File

@ -5,10 +5,13 @@ import com.r3corda.core.div
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.node.internal.Node
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.utilities.TestClock
import com.typesafe.config.Config
import java.nio.file.Path
import java.time.Clock
import java.util.*
interface NodeSSLConfiguration {
@ -46,8 +49,12 @@ class FullNodeConfiguration(config: Config) : NodeConfiguration {
val webAddress: HostAndPort by config
val messagingServerAddress: HostAndPort? by config.getOrElse { null }
val extraAdvertisedServiceIds: String by config
val useTestClock: Boolean by config.getOrElse { false }
fun createNode(): Node {
// This is a sanity feature do not remove.
require(!useTestClock || devMode) { "Cannot use test clock outside of dev mode" }
val advertisedServices = mutableSetOf<ServiceInfo>()
if (!extraAdvertisedServiceIds.isNullOrEmpty()) {
for (serviceId in extraAdvertisedServiceIds.split(",")) {
@ -56,7 +63,7 @@ class FullNodeConfiguration(config: Config) : NodeConfiguration {
}
if (networkMapAddress == null) advertisedServices.add(ServiceInfo(NetworkMapService.type))
val networkMapMessageAddress: SingleMessageRecipient? = if (networkMapAddress == null) null else NodeMessagingClient.makeNetworkMapAddress(networkMapAddress!!)
return Node(this, networkMapMessageAddress, advertisedServices)
return Node(this, networkMapMessageAddress, advertisedServices, if(useTestClock == true) TestClock() else NodeClock())
}
}

View File

@ -45,7 +45,7 @@ import javax.annotation.concurrent.ThreadSafe
@ThreadSafe
class NodeSchedulerService(private val database: Database,
private val services: ServiceHubInternal,
private val protocolLogicRefFactory: ProtocolLogicRefFactory = ProtocolLogicRefFactory(),
private val protocolLogicRefFactory: ProtocolLogicRefFactory,
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor())
: SchedulerService, SingletonSerializeAsToken() {

View File

@ -15,16 +15,15 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import java.nio.file.FileSystems
import java.nio.file.Path
import java.security.KeyStore
import java.security.PublicKey
/**
* The base class for Artemis services that defines shared data structures and transport configuration
*
* @param certificatePath A place where Artemis can stash its message journal and other files.
* @param config The config object is used to pass in the passwords for the certificate KeyStore and TrustStore
*/
abstract class ArtemisMessagingComponent(val config: NodeSSLConfiguration) : SingletonSerializeAsToken() {
abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
companion object {
init {
@ -36,7 +35,7 @@ abstract class ArtemisMessagingComponent(val config: NodeSSLConfiguration) : Sin
const val RPC_REQUESTS_QUEUE = "rpc.requests"
@JvmStatic
protected val NETWORK_MAP_ADDRESS = SimpleString(PEERS_PREFIX +"networkmap")
protected val NETWORK_MAP_ADDRESS = SimpleString("${PEERS_PREFIX}networkmap")
/**
* Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should
@ -70,7 +69,7 @@ abstract class ArtemisMessagingComponent(val config: NodeSSLConfiguration) : Sin
}
protected data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
override val queueName: SimpleString = NETWORK_MAP_ADDRESS
override val queueName: SimpleString get() = NETWORK_MAP_ADDRESS
}
/**
@ -80,12 +79,12 @@ abstract class ArtemisMessagingComponent(val config: NodeSSLConfiguration) : Sin
*/
data class NodeAddress(val identity: PublicKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
override val queueName: SimpleString by lazy { SimpleString(PEERS_PREFIX+identity.toBase58String()) }
override fun toString(): String {
return "NodeAddress(identity = $queueName, $hostAndPort"
}
override fun toString(): String = "${javaClass.simpleName}(identity = $queueName, $hostAndPort)"
}
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */
abstract val config: NodeSSLConfiguration
protected fun parseKeyFromQueueName(name: String): PublicKey {
require(name.startsWith(PEERS_PREFIX))
return parsePublicKeyBase58(name.substring(PEERS_PREFIX.length))
@ -119,39 +118,46 @@ abstract class ArtemisMessagingComponent(val config: NodeSSLConfiguration) : Sin
}
}
protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int) =
TransportConfiguration(
when (direction) {
ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name
ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name
},
mapOf(
// Basic TCP target details
TransportConstants.HOST_PROP_NAME to host,
TransportConstants.PORT_PROP_NAME to port.toInt(),
protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int): TransportConfiguration {
config.keyStorePath.expectedOnDefaultFileSystem()
config.trustStorePath.expectedOnDefaultFileSystem()
return TransportConfiguration(
when (direction) {
ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name
ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name
},
mapOf(
// Basic TCP target details
TransportConstants.HOST_PROP_NAME to host,
TransportConstants.PORT_PROP_NAME to port.toInt(),
// Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop
// It does not use AMQP messages for its own messages e.g. topology and heartbeats
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP",
// Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop
// It does not use AMQP messages for its own messages e.g. topology and heartbeats
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP",
// Enable TLS transport layer with client certs and restrict to at least SHA256 in handshake
// and AES encryption
TransportConstants.SSL_ENABLED_PROP_NAME to true,
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS",
TransportConstants.KEYSTORE_PATH_PROP_NAME to config.keyStorePath,
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS",
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to config.trustStorePath,
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword,
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2",
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true
)
)
// Enable TLS transport layer with client certs and restrict to at least SHA256 in handshake
// and AES encryption
TransportConstants.SSL_ENABLED_PROP_NAME to true,
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS",
TransportConstants.KEYSTORE_PATH_PROP_NAME to config.keyStorePath,
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS",
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to config.trustStorePath,
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword,
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2",
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true
)
)
}
fun configureWithDevSSLCertificate() {
config.configureWithDevSSLCertificate()
}
protected fun Path.expectedOnDefaultFileSystem() {
require(fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
}
}

View File

@ -3,11 +3,15 @@ package com.r3corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.AddressFormatException
import com.r3corda.core.crypto.newSecureRandom
import com.r3corda.core.div
import com.r3corda.core.exists
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.use
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_ROLE_NAME
import com.r3corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.RPC_ROLE_NAME
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
@ -17,11 +21,25 @@ import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
import rx.Subscription
import java.math.BigInteger
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import java.security.Principal
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import javax.security.auth.Subject
import javax.security.auth.callback.CallbackHandler
import javax.security.auth.callback.NameCallback
import javax.security.auth.callback.PasswordCallback
import javax.security.auth.callback.UnsupportedCallbackException
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED
import javax.security.auth.login.FailedLoginException
import javax.security.auth.login.LoginException
import javax.security.auth.spi.LoginModule
// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman.
// TODO: Implement a discovery engine that can trigger builds of new connections when another node registers? (later)
@ -37,9 +55,9 @@ import javax.annotation.concurrent.ThreadSafe
* a fully connected network, trusted network or on localhost.
*/
@ThreadSafe
class ArtemisMessagingServer(config: NodeConfiguration,
class ArtemisMessagingServer(override val config: NodeConfiguration,
val myHostPort: HostAndPort,
val networkMapCache: NetworkMapCache) : ArtemisMessagingComponent(config) {
val networkMapCache: NetworkMapCache) : ArtemisMessagingComponent() {
companion object {
val log = loggerFor<ArtemisMessagingServer>()
}
@ -52,6 +70,10 @@ class ArtemisMessagingServer(config: NodeConfiguration,
private lateinit var activeMQServer: ActiveMQServer
private var networkChangeHandle: Subscription? = null
init {
config.basedir.expectedOnDefaultFileSystem()
}
fun start() = mutex.locked {
if (!running) {
configureAndStartServer()
@ -116,12 +138,7 @@ class ArtemisMessagingServer(config: NodeConfiguration,
}
private fun configureAndStartServer() {
val config = createArtemisConfig(config.certificatesPath, myHostPort).apply {
securityRoles = mapOf(
"#" to setOf(Role("internal", true, true, true, true, true, true, true))
)
}
val config = createArtemisConfig()
val securityManager = createArtemisSecurityManager()
activeMQServer = ActiveMQServerImpl(config, securityManager).apply {
@ -157,28 +174,61 @@ class ArtemisMessagingServer(config: NodeConfiguration,
activeMQServer.start()
}
private fun createArtemisConfig(directory: Path, hp: HostAndPort): Configuration {
val config = ConfigurationImpl()
setConfigDirectories(config, directory)
config.acceptorConfigurations = setOf(
tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", hp.port)
private fun createArtemisConfig(): Configuration = ConfigurationImpl().apply {
val artemisDir = config.basedir / "artemis"
bindingsDirectory = (artemisDir / "bindings").toString()
journalDirectory = (artemisDir / "journal").toString()
largeMessagesDirectory = (artemisDir / "largemessages").toString()
acceptorConfigurations = setOf(
tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", myHostPort.port)
)
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
config.idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
config.isPersistIDCache = true
return config
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
isPersistIDCache = true
isPopulateValidatedUser = true
setupUserRoles()
}
// This gives nodes full access and RPC clients only enough to do RPC
private fun ConfigurationImpl.setupUserRoles() {
// TODO COR-307
val nodeRole = Role(NODE_ROLE_NAME, true, true, true, true, true, true, true, true)
val clientRpcRole = restrictedRole(RPC_ROLE_NAME, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)
securityRoles = mapOf(
"#" to setOf(nodeRole),
"clients.*.rpc.responses.*" to setOf(nodeRole, clientRpcRole),
"clients.*.rpc.observations.*" to setOf(nodeRole, clientRpcRole),
RPC_REQUESTS_QUEUE to setOf(nodeRole, restrictedRole(RPC_ROLE_NAME, send = true))
)
}
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false,
deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false): Role {
return Role(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue,
deleteNonDurableQueue, manage, browse)
}
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
// TODO: set up proper security configuration https://r3-cev.atlassian.net/browse/COR-307
val securityConfig = SecurityConfiguration().apply {
addUser("internal", BigInteger(128, newSecureRandom()).toString(16))
addRole("internal", "internal")
defaultUser = "internal"
val rpcUsersFile = config.basedir / "rpc-users.properties"
if (!rpcUsersFile.exists()) {
val users = Properties()
users["user1"] = "test"
Files.newOutputStream(rpcUsersFile).use {
users.store(it, null)
}
}
return ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig)
val securityConfig = object : SecurityConfiguration() {
// Override to make it work with our login module
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
val options = mapOf(NodeLoginModule.FILE_KEY to rpcUsersFile)
return arrayOf(AppConfigurationEntry(name, REQUIRED, options))
}
}
return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig)
}
private fun connectorExists(hostAndPort: HostAndPort) = hostAndPort.toString() in activeMQServer.configuration.connectorConfigurations
@ -194,12 +244,11 @@ class ArtemisMessagingServer(config: NodeConfiguration,
private fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString())
private fun deployBridge(hostAndPort: HostAndPort, name: SimpleString) {
private fun deployBridge(hostAndPort: HostAndPort, name: String) {
activeMQServer.deployBridge(BridgeConfiguration().apply {
val nameStr = name.toString()
setName(nameStr)
queueName = nameStr
forwardingAddress = nameStr
setName(name)
queueName = name
forwardingAddress = name
staticConnectors = listOf(hostAndPort.toString())
confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridges automatic deduplication logic
@ -218,7 +267,7 @@ class ArtemisMessagingServer(config: NodeConfiguration,
if (!connectorExists(hostAndPort))
addConnector(hostAndPort)
if (!bridgeExists(name))
deployBridge(hostAndPort, name)
deployBridge(hostAndPort, name.toString())
}
private fun maybeDestroyBridge(name: SimpleString) {
@ -227,11 +276,81 @@ class ArtemisMessagingServer(config: NodeConfiguration,
}
}
private fun setConfigDirectories(config: Configuration, dir: Path) {
config.apply {
bindingsDirectory = dir.resolve("bindings").toString()
journalDirectory = dir.resolve("journal").toString()
largeMessagesDirectory = dir.resolve("largemessages").toString()
class NodeLoginModule : LoginModule {
companion object {
const val FILE_KEY = "rpc-users-file"
const val NODE_ROLE_NAME = "NodeRole"
const val RPC_ROLE_NAME = "RpcRole"
}
private val users = Properties()
private var loginSucceeded: Boolean = false
private lateinit var subject: Subject
private lateinit var callbackHandler: CallbackHandler
private lateinit var principals: List<Principal>
override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map<String, *>, options: Map<String, *>) {
this.subject = subject
this.callbackHandler = callbackHandler
val rpcUsersFile = options[FILE_KEY] as Path
if (rpcUsersFile.exists()) {
rpcUsersFile.use {
users.load(it)
}
}
}
override fun login(): Boolean {
val nameCallback = NameCallback("Username: ")
val passwordCallback = PasswordCallback("Password: ", false)
try {
callbackHandler.handle(arrayOf(nameCallback, passwordCallback))
} catch (e: IOException) {
throw LoginException(e.message)
} catch (e: UnsupportedCallbackException) {
throw LoginException("${e.message} not available to obtain information from user")
}
val username = nameCallback.name ?: throw FailedLoginException("User name is null")
val receivedPassword = passwordCallback.password ?: throw FailedLoginException("Password is null")
val password = if (username == "Node") "Node" else users[username] ?: throw FailedLoginException("User does not exist")
if (password != String(receivedPassword)) {
throw FailedLoginException("Password does not match")
}
principals = listOf(
UserPrincipal(username),
RolePrincipal(if (username == "Node") NODE_ROLE_NAME else RPC_ROLE_NAME))
loginSucceeded = true
return loginSucceeded
}
override fun commit(): Boolean {
val result = loginSucceeded
if (result) {
subject.principals.addAll(principals)
}
clear()
return result
}
override fun abort(): Boolean {
clear()
return true
}
override fun logout(): Boolean {
subject.principals.removeAll(principals)
return true
}
private fun clear() {
loginSucceeded = false
}
}
}

View File

@ -11,11 +11,13 @@ import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.utilities.*
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import java.nio.file.FileSystems
import java.security.PublicKey
import java.time.Instant
import java.util.*
@ -49,12 +51,11 @@ import javax.annotation.concurrent.ThreadSafe
* in this class.
*/
@ThreadSafe
class NodeMessagingClient(config: NodeConfiguration,
class NodeMessagingClient(override val config: NodeConfiguration,
val serverHostPort: HostAndPort,
val myIdentity: PublicKey?,
val executor: AffinityExecutor,
val persistentInbox: Boolean = true,
val persistenceTx: (() -> Unit) -> Unit = { it() }) : ArtemisMessagingComponent(config), MessagingServiceInternal {
val database: Database) : ArtemisMessagingComponent(), MessagingServiceInternal {
companion object {
val log = loggerFor<NodeMessagingClient>()
@ -86,8 +87,7 @@ class NodeMessagingClient(config: NodeConfiguration,
var rpcConsumer: ClientConsumer? = null
var rpcNotificationConsumer: ClientConsumer? = null
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
var undeliveredMessages = listOf<Message>()
var pendingRedelivery = JDBCHashSet<Message>("pending_messages",loadOnInit = true)
}
/** A registration to handle messages of different types */
@ -106,23 +106,16 @@ class NodeMessagingClient(config: NodeConfiguration,
val uuid = uuidString("message_id")
}
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(if (persistentInbox) {
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(
object : AbstractJDBCHashSet<UUID, Table>(Table, loadOnInit = true) {
override fun elementFromRow(row: ResultRow): UUID = row[table.uuid]
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry
}
}
} else {
HashSet<UUID>()
})
})
init {
require(config.basedir.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
}
fun start(rpcOps: CordaRPCOps? = null) {
fun start(rpcOps: CordaRPCOps) {
state.locked {
check(!started) { "start can't be called twice" }
started = true
@ -135,7 +128,7 @@ class NodeMessagingClient(config: NodeConfiguration,
// Create a session. Note that the acknowledgement of messages is not flushed to
// the Artermis journal until the default buffer size of 1MB is acknowledged.
val session = clientFactory!!.createSession(true, true, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
val session = clientFactory!!.createSession("Node", "Node", false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
this.session = session
session.start()
@ -146,7 +139,7 @@ class NodeMessagingClient(config: NodeConfiguration,
val queueName = toQueueName(myAddress)
val query = session.queueQuery(queueName)
if (!query.isExists) {
session.createQueue(queueName, queueName, persistentInbox)
session.createQueue(queueName, queueName, true)
}
knownQueues.add(queueName)
p2pConsumer = session.createConsumer(queueName)
@ -154,13 +147,11 @@ class NodeMessagingClient(config: NodeConfiguration,
// Create an RPC queue and consumer: this will service locally connected clients only (not via a
// bridge) and those clients must have authenticated. We could use a single consumer for everything
// and perhaps we should, but these queues are not worth persisting.
if (rpcOps != null) {
session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1")
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer("rpc.qremovals")
dispatcher = createRPCDispatcher(state, rpcOps)
}
session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1")
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer("rpc.qremovals")
dispatcher = createRPCDispatcher(state, rpcOps)
}
}
@ -227,8 +218,9 @@ class NodeMessagingClient(config: NodeConfiguration,
val topic = message.getStringProperty(TOPIC_PROPERTY)
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
// Use the magic deduplication property built into Artemis as our message identity too
val uuid = UUID.fromString(message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID))
log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID uuid: $uuid")
val uuid = UUID.fromString(message.getStringProperty(HDR_DUPLICATE_DETECTION_ID))
val user = message.getStringProperty(HDR_VALIDATED_USER)
log.info("Received message from: ${message.address} user: $user topic: $topic sessionID: $sessionID uuid: $uuid")
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
@ -259,10 +251,10 @@ class NodeMessagingClient(config: NodeConfiguration,
// without causing log spam.
log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet")
// This is a hack; transient messages held in memory isn't crash resistant.
// TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use.
state.locked {
undeliveredMessages += msg
databaseTransaction(database) {
pendingRedelivery.add(msg)
}
}
return false
}
@ -277,7 +269,7 @@ class NodeMessagingClient(config: NodeConfiguration,
// Note that handlers may re-enter this class. We aren't holding any locks and methods like
// start/run/stop have re-entrancy assertions at the top, so it is OK.
executor.fetchFrom {
persistenceTx {
databaseTransaction(database) {
callHandlers(msg, deliverTo)
}
}
@ -346,7 +338,7 @@ class NodeMessagingClient(config: NodeConfiguration,
putLongProperty(SESSION_ID_PROPERTY, sessionID)
writeBodyBufferBytes(message.data)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
if (knownQueues.add(queueName)) {
@ -376,9 +368,12 @@ class NodeMessagingClient(config: NodeConfiguration,
val handler = Handler(topicSession, callback)
handlers.add(handler)
val messagesToRedeliver = state.locked {
val messagesToRedeliver = undeliveredMessages
undeliveredMessages = listOf()
messagesToRedeliver
val pending = ArrayList<Message>()
databaseTransaction(database) {
pending.addAll(pendingRedelivery)
pendingRedelivery.clear()
}
pending
}
messagesToRedeliver.forEach { deliver(it) }
return handler
@ -391,8 +386,8 @@ class NodeMessagingClient(config: NodeConfiguration,
override fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID): Message {
// TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying.
return object : Message {
override val topicSession: TopicSession get() = topicSession
override val data: ByteArray get() = data
override val topicSession: TopicSession = topicSession
override val data: ByteArray = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val uniqueMessageId: UUID = uuid
@ -408,7 +403,7 @@ class NodeMessagingClient(config: NodeConfiguration,
val msg = session!!.createMessage(false).apply {
writeBodyBufferBytes(bits.bits)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
producer!!.send(toAddress, msg)
}

View File

@ -1,73 +0,0 @@
package com.r3corda.node.services.persistence
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.Checkpoint
import com.r3corda.node.services.api.CheckpointStorage
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.util.*
import java.util.Collections.synchronizedMap
import javax.annotation.concurrent.ThreadSafe
/**
* File-based checkpoint storage, storing checkpoints per file.
*/
@ThreadSafe
class PerFileCheckpointStorage(val storeDir: Path) : CheckpointStorage {
companion object {
private val logger = loggerFor<PerFileCheckpointStorage>()
private val fileExtension = ".checkpoint"
}
private val checkpointFiles = synchronizedMap(IdentityHashMap<Checkpoint, Path>())
init {
logger.trace { "Initialising per file checkpoint storage on $storeDir" }
Files.createDirectories(storeDir)
Files.list(storeDir)
.filter { it.toString().toLowerCase().endsWith(fileExtension) }
.forEach {
val checkpoint = Files.readAllBytes(it).deserialize<Checkpoint>()
checkpointFiles[checkpoint] = it
}
}
override fun addCheckpoint(checkpoint: Checkpoint) {
val fileName = "${checkpoint.id.toString().toLowerCase()}$fileExtension"
val checkpointFile = storeDir.resolve(fileName)
atomicWrite(checkpointFile, checkpoint.serialize())
logger.trace { "Stored $checkpoint to $checkpointFile" }
checkpointFiles[checkpoint] = checkpointFile
}
private fun atomicWrite(checkpointFile: Path, serialisedCheckpoint: SerializedBytes<Checkpoint>) {
val tempCheckpointFile = checkpointFile.parent.resolve("${checkpointFile.fileName}.tmp")
serialisedCheckpoint.writeToFile(tempCheckpointFile)
Files.move(tempCheckpointFile, checkpointFile, StandardCopyOption.ATOMIC_MOVE)
}
override fun removeCheckpoint(checkpoint: Checkpoint) {
val checkpointFile = checkpointFiles.remove(checkpoint)
require(checkpointFile != null) { "Trying to removing unknown checkpoint: $checkpoint" }
Files.delete(checkpointFile)
logger.trace { "Removed $checkpoint ($checkpointFile)" }
}
override fun forEach(block: (Checkpoint)->Boolean) {
synchronized(checkpointFiles) {
for(checkpoint in checkpointFiles.keys) {
if (!block(checkpoint)) {
break
}
}
}
}
}

View File

@ -1,70 +0,0 @@
package com.r3corda.node.services.persistence
import com.r3corda.core.ThreadBox
import com.r3corda.core.bufferUntilSubscribed
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.TransactionStorage
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import rx.Observable
import rx.subjects.PublishSubject
import java.nio.file.Files
import java.nio.file.Path
import java.util.*
import javax.annotation.concurrent.ThreadSafe
/**
* File-based transaction storage, storing transactions per file.
*/
@ThreadSafe
class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage {
companion object {
private val logger = loggerFor<PerFileCheckpointStorage>()
private val fileExtension = ".transaction"
}
private val mutex = ThreadBox(object {
val transactionsMap = HashMap<SecureHash, SignedTransaction>()
val updatesPublisher = PublishSubject.create<SignedTransaction>()
fun notify(transaction: SignedTransaction) = updatesPublisher.onNext(transaction)
})
override val updates: Observable<SignedTransaction>
get() = mutex.content.updatesPublisher
init {
logger.trace { "Initialising per file transaction storage on $storeDir" }
Files.createDirectories(storeDir)
mutex.locked {
Files.list(storeDir)
.filter { it.toString().toLowerCase().endsWith(fileExtension) }
.map { Files.readAllBytes(it).deserialize<SignedTransaction>() }
.forEach { transactionsMap[it.id] = it }
}
}
override fun addTransaction(transaction: SignedTransaction) {
val transactionFile = storeDir.resolve("${transaction.id.toString().toLowerCase()}$fileExtension")
transaction.serialize().writeToFile(transactionFile)
mutex.locked {
transactionsMap[transaction.id] = transaction
notify(transaction)
}
logger.trace { "Stored $transaction to $transactionFile" }
}
override fun getTransaction(id: SecureHash): SignedTransaction? = mutex.locked { transactionsMap[id] }
val transactions: Iterable<SignedTransaction> get() = mutex.locked { transactionsMap.values.toList() }
override fun track(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> {
return mutex.locked {
Pair(transactionsMap.values.toList(), updates.bufferUntilSubscribed())
}
}
}

View File

@ -11,7 +11,10 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.r3corda.core.contracts.BusinessCalendar
import com.r3corda.core.crypto.*
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.IdentityService
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import net.i2p.crypto.eddsa.EdDSAPublicKey
import java.math.BigDecimal
import java.time.LocalDate
@ -54,6 +57,11 @@ object JsonSupport {
cordaModule.addSerializer(PublicKeyTree::class.java, PublicKeyTreeSerializer)
cordaModule.addDeserializer(PublicKeyTree::class.java, PublicKeyTreeDeserializer)
// For NodeInfo
// TODO this tunnels the Kryo representation as a Base58 encoded string. Replace when RPC supports this.
cordaModule.addSerializer(NodeInfo::class.java, NodeInfoSerializer)
cordaModule.addDeserializer(NodeInfo::class.java, NodeInfoDeserializer)
mapper.registerModule(timeModule)
mapper.registerModule(cordaModule)
mapper.registerModule(KotlinModule())
@ -102,6 +110,25 @@ object JsonSupport {
}
}
object NodeInfoSerializer : JsonSerializer<NodeInfo>() {
override fun serialize(value: NodeInfo, gen: JsonGenerator, serializers: SerializerProvider) {
gen.writeString(Base58.encode(value.serialize().bits))
}
}
object NodeInfoDeserializer : JsonDeserializer<NodeInfo>() {
override fun deserialize(parser: JsonParser, context: DeserializationContext): NodeInfo {
if (parser.currentToken == JsonToken.FIELD_NAME) {
parser.nextToken()
}
try {
return Base58.decode(parser.text).deserialize<NodeInfo>()
} catch (e: Exception) {
throw JsonParseException(parser, "Invalid NodeInfo ${parser.text}: ${e.message}")
}
}
}
object SecureHashSerializer : JsonSerializer<SecureHash>() {
override fun serialize(obj: SecureHash, generator: JsonGenerator, provider: SerializerProvider) {
generator.writeString(obj.toString())

View File

@ -0,0 +1,43 @@
package com.r3corda.node.utilities
import com.r3corda.core.serialization.SerializeAsToken
import com.r3corda.core.serialization.SerializeAsTokenContext
import com.r3corda.core.serialization.SingletonSerializationToken
import java.time.*
import javax.annotation.concurrent.ThreadSafe
/**
* A [Clock] that can have the date advanced for use in demos.
*/
@ThreadSafe
class TestClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock(), SerializeAsToken {
private val token = SingletonSerializationToken(this)
override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context)
@Synchronized fun updateDate(date: LocalDate): Boolean {
val currentDate = LocalDate.now(this)
if (currentDate.isBefore(date)) {
// It's ok to increment
delegateClock = Clock.offset(delegateClock, Duration.between(currentDate.atStartOfDay(), date.atStartOfDay()))
notifyMutationObservers()
return true
}
return false
}
@Synchronized override fun instant(): Instant {
return delegateClock.instant()
}
// Do not use this. Instead seek to use ZonedDateTime methods.
override fun withZone(zone: ZoneId): Clock {
throw UnsupportedOperationException("Tokenized clock does not support withZone()")
}
@Synchronized override fun getZone(): ZoneId {
return delegateClock.zone
}
}

View File

@ -1,4 +1,3 @@
# Register a ServiceLoader service extending from com.r3corda.node.CordaPluginRegistry
com.r3corda.node.services.clientapi.FixingSessionInitiation$Plugin
com.r3corda.node.services.NotaryChange$Plugin
com.r3corda.node.services.persistence.DataVending$Plugin

View File

@ -13,4 +13,5 @@ dataSourceProperties = {
devMode = true
certificateSigningService = "https://cordaci-netperm.corda.r3cev.com"
useHTTPS = false
h2port = 0
h2port = 0
useTestClock = false

View File

@ -155,16 +155,14 @@ class TwoPartyTradeProtocolTests {
bobNode.pumpReceive()
// OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature.
assertThat(bobNode.checkpointStorage.checkpoints()).hasSize(1)
databaseTransaction(bobNode.database) {
assertThat(bobNode.checkpointStorage.checkpoints()).hasSize(1)
}
val storage = bobNode.storage.validatedTransactions
val bobTransactionsBeforeCrash = if (storage is PerFileTransactionStorage) {
storage.transactions
} else if (storage is DBTransactionStorage) {
databaseTransaction(bobNode.database) {
storage.transactions
}
} else throw IllegalArgumentException("Unknown storage implementation")
val bobTransactionsBeforeCrash = databaseTransaction(bobNode.database) {
(storage as DBTransactionStorage).transactions
}
assertThat(bobTransactionsBeforeCrash).isNotEmpty()
// .. and let's imagine that Bob's computer has a power cut. He now has nothing now beyond what was on disk.

View File

@ -1,22 +1,37 @@
package com.r3corda.node.services
import com.google.common.net.HostAndPort
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.utilities.LogHelper
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.messaging.*
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.transactions.PersistentUniquenessProvider
import com.r3corda.node.utilities.AffinityExecutor
import com.r3corda.node.utilities.configureDatabase
import com.r3corda.node.utilities.databaseTransaction
import com.r3corda.testing.freeLocalHostAndPort
import com.r3corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.jetbrains.exposed.sql.Database
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import rx.Observable
import java.io.Closeable
import java.net.ServerSocket
import java.nio.file.Path
import java.util.concurrent.LinkedBlockingQueue
@ -33,12 +48,44 @@ class ArtemisMessagingTests {
val identity = generateKeyPair()
lateinit var config: NodeConfiguration
lateinit var dataSource: Closeable
lateinit var database: Database
var messagingClient: NodeMessagingClient? = null
var messagingServer: ArtemisMessagingServer? = null
val networkMapCache = InMemoryNetworkMapCache()
val rpcOps = object : CordaRPCOps {
override val protocolVersion: Int
get() = throw UnsupportedOperationException()
override fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
}
@Before
fun setUp() {
// TODO: create a base class that provides a default implementation
@ -52,12 +99,18 @@ class ArtemisMessagingTests {
override val keyStorePassword: String = "testpass"
override val trustStorePassword: String = "trustpass"
}
LogHelper.setLevel(PersistentUniquenessProvider::class)
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
dataSource = dataSourceAndDatabase.first
database = dataSourceAndDatabase.second
}
@After
fun cleanUp() {
messagingClient?.stop()
messagingServer?.stop()
dataSource.close()
LogHelper.reset(PersistentUniquenessProvider::class)
}
@Test
@ -73,7 +126,7 @@ class ArtemisMessagingTests {
val remoteServerAddress = freeLocalHostAndPort()
createMessagingServer(remoteServerAddress).start()
createMessagingClient(server = remoteServerAddress).start()
createMessagingClient(server = remoteServerAddress).start(rpcOps)
}
@Test
@ -84,14 +137,14 @@ class ArtemisMessagingTests {
createMessagingServer(serverAddress).start()
messagingClient = createMessagingClient(server = invalidServerAddress)
assertThatThrownBy { messagingClient!!.start() }
assertThatThrownBy { messagingClient!!.start(rpcOps) }
messagingClient = null
}
@Test
fun `client should connect to local server`() {
createMessagingServer().start()
createMessagingClient().start()
createMessagingClient().start(rpcOps)
}
@Test
@ -101,7 +154,7 @@ class ArtemisMessagingTests {
createMessagingServer().start()
val messagingClient = createMessagingClient()
messagingClient.start()
messagingClient.start(rpcOps)
thread { messagingClient.run() }
messagingClient.addMessageHandler(topic) { message, r ->
@ -117,9 +170,11 @@ class ArtemisMessagingTests {
}
private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient {
return NodeMessagingClient(config, server, identity.public, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), false).apply {
configureWithDevSSLCertificate()
messagingClient = this
return databaseTransaction(database) {
NodeMessagingClient(config, server, identity.public, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database).apply {
configureWithDevSSLCertificate()
messagingClient = this
}
}
}

View File

@ -12,7 +12,7 @@ import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.DUMMY_NOTARY
import com.r3corda.node.services.events.NodeSchedulerService
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
import com.r3corda.node.services.persistence.DBCheckpointStorage
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.node.utilities.AffinityExecutor
@ -52,8 +52,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
val factory = ProtocolLogicRefFactory(mapOf(Pair(TestProtocolLogic::class.java.name, setOf(NodeSchedulerServiceTest::class.java.name, Integer::class.java.name))))
val services: MockServiceHubInternal
lateinit var services: MockServiceHubInternal
lateinit var scheduler: NodeSchedulerService
lateinit var smmExecutor: AffinityExecutor.ServiceAffinityExecutor
lateinit var dataSource: Closeable
@ -72,13 +71,6 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
val testReference: NodeSchedulerServiceTest
}
init {
val kms = MockKeyManagementService(ALICE_KEY)
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), persistenceTx = { it() })
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
override val testReference = this@NodeSchedulerServiceTest
}
}
@Before
fun setup() {
@ -89,9 +81,14 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
dataSource = dataSourceAndDatabase.first
database = dataSourceAndDatabase.second
databaseTransaction(database) {
val kms = MockKeyManagementService(ALICE_KEY)
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), database)
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
override val testReference = this@NodeSchedulerServiceTest
}
scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor)
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
val mockSMM = StateMachineManager(services, listOf(services, scheduler), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor, database)
val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database)
mockSMM.changes.subscribe { change ->
if (change.addOrRemove == AddOrRemove.REMOVE && mockSMM.allStateMachines.isEmpty()) {
smmHasRemovedAllProtocols.countDown()

View File

@ -1,99 +0,0 @@
package com.r3corda.node.services.persistence
import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs
import com.google.common.primitives.Ints
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.node.services.api.Checkpoint
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.nio.file.FileSystem
import java.nio.file.Files
import java.nio.file.Path
class PerFileCheckpointStorageTests {
val fileSystem: FileSystem = Jimfs.newFileSystem(unix())
val storeDir: Path = fileSystem.getPath("store")
lateinit var checkpointStorage: PerFileCheckpointStorage
@Before
fun setUp() {
newCheckpointStorage()
}
@After
fun cleanUp() {
fileSystem.close()
}
@Test
fun `add new checkpoint`() {
val checkpoint = newCheckpoint()
checkpointStorage.addCheckpoint(checkpoint)
assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint)
newCheckpointStorage()
assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint)
}
@Test
fun `remove checkpoint`() {
val checkpoint = newCheckpoint()
checkpointStorage.addCheckpoint(checkpoint)
checkpointStorage.removeCheckpoint(checkpoint)
assertThat(checkpointStorage.checkpoints()).isEmpty()
newCheckpointStorage()
assertThat(checkpointStorage.checkpoints()).isEmpty()
}
@Test
fun `remove unknown checkpoint`() {
val checkpoint = newCheckpoint()
assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy {
checkpointStorage.removeCheckpoint(checkpoint)
}
}
@Test
fun `add two checkpoints then remove first one`() {
val firstCheckpoint = newCheckpoint()
checkpointStorage.addCheckpoint(firstCheckpoint)
val secondCheckpoint = newCheckpoint()
checkpointStorage.addCheckpoint(secondCheckpoint)
checkpointStorage.removeCheckpoint(firstCheckpoint)
assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint)
newCheckpointStorage()
assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint)
}
@Test
fun `add checkpoint and then remove after 'restart'`() {
val originalCheckpoint = newCheckpoint()
checkpointStorage.addCheckpoint(originalCheckpoint)
newCheckpointStorage()
val reconstructedCheckpoint = checkpointStorage.checkpoints().single()
assertThat(reconstructedCheckpoint).isEqualTo(originalCheckpoint).isNotSameAs(originalCheckpoint)
checkpointStorage.removeCheckpoint(reconstructedCheckpoint)
assertThat(checkpointStorage.checkpoints()).isEmpty()
}
@Test
fun `non-checkpoint files are ignored`() {
val checkpoint = newCheckpoint()
checkpointStorage.addCheckpoint(checkpoint)
Files.write(storeDir.resolve("random-non-checkpoint-file"), "this is not a checkpoint!!".toByteArray())
newCheckpointStorage()
assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint)
}
private fun newCheckpointStorage() {
checkpointStorage = PerFileCheckpointStorage(storeDir)
}
private var checkpointCount = 1
private fun newCheckpoint() = Checkpoint(SerializedBytes(Ints.toByteArray(checkpointCount++)))
}

View File

@ -1,100 +0,0 @@
package com.r3corda.node.services.persistence
import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs
import com.google.common.primitives.Ints
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.NullPublicKey
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.transactions.SignedTransaction
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.nio.file.FileSystem
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
class PerFileTransactionStorageTests {
val fileSystem: FileSystem = Jimfs.newFileSystem(unix())
val storeDir: Path = fileSystem.getPath("store")
lateinit var transactionStorage: PerFileTransactionStorage
@Before
fun setUp() {
newTransactionStorage()
}
@After
fun cleanUp() {
fileSystem.close()
}
@Test
fun `empty store`() {
assertThat(transactionStorage.getTransaction(newTransaction().id)).isNull()
assertThat(transactionStorage.transactions).isEmpty()
newTransactionStorage()
assertThat(transactionStorage.transactions).isEmpty()
}
@Test
fun `one transaction`() {
val transaction = newTransaction()
transactionStorage.addTransaction(transaction)
assertTransactionIsRetrievable(transaction)
assertThat(transactionStorage.transactions).containsExactly(transaction)
newTransactionStorage()
assertTransactionIsRetrievable(transaction)
assertThat(transactionStorage.transactions).containsExactly(transaction)
}
@Test
fun `two transactions across restart`() {
val firstTransaction = newTransaction()
val secondTransaction = newTransaction()
transactionStorage.addTransaction(firstTransaction)
newTransactionStorage()
transactionStorage.addTransaction(secondTransaction)
assertTransactionIsRetrievable(firstTransaction)
assertTransactionIsRetrievable(secondTransaction)
assertThat(transactionStorage.transactions).containsOnly(firstTransaction, secondTransaction)
}
@Test
fun `non-transaction files are ignored`() {
val transactions = newTransaction()
transactionStorage.addTransaction(transactions)
Files.write(storeDir.resolve("random-non-tx-file"), "this is not a transaction!!".toByteArray())
newTransactionStorage()
assertThat(transactionStorage.transactions).containsExactly(transactions)
}
@Test
fun `updates are fired`() {
val future = SettableFuture.create<SignedTransaction>()
transactionStorage.updates.subscribe { tx -> future.set(tx) }
val expected = newTransaction()
transactionStorage.addTransaction(expected)
val actual = future.get(1, TimeUnit.SECONDS)
assertEquals(expected, actual)
}
private fun newTransactionStorage() {
transactionStorage = PerFileTransactionStorage(storeDir)
}
private fun assertTransactionIsRetrievable(transaction: SignedTransaction) {
assertThat(transactionStorage.getTransaction(transaction.id)).isEqualTo(transaction)
}
private var txCount = 0
private fun newTransaction() = SignedTransaction(
SerializedBytes(Ints.toByteArray(++txCount)),
listOf(DigitalSignature.WithKey(NullPublicKey, ByteArray(1))))
}

View File

@ -10,6 +10,7 @@ import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize
import com.r3corda.node.services.persistence.checkpoints
import com.r3corda.node.services.statemachine.StateMachineManager.*
import com.r3corda.node.utilities.databaseTransaction
import com.r3corda.testing.initiateSingleShotProtocol
import com.r3corda.testing.node.InMemoryMessagingNetwork
import com.r3corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
@ -73,6 +74,7 @@ class StateMachineManagerTests {
// We push through just enough messages to get only the payload sent
node2.pumpReceive()
node2.disableDBCloseOnStop()
node2.stop()
net.runNetwork()
val restoredProtocol = node2.restartAndGetRestoredProtocol<ReceiveThenSuspendProtocol>(node1)
@ -95,6 +97,7 @@ class StateMachineManagerTests {
val protocol = NoOpProtocol()
node3.smm.add(protocol)
assertEquals(false, protocol.protocolStarted) // Not started yet as no network activity has been allowed yet
node3.disableDBCloseOnStop()
node3.stop()
node3 = net.createNode(node1.info.address, forcedID = node3.id)
@ -103,6 +106,7 @@ class StateMachineManagerTests {
net.runNetwork() // Allow network map messages to flow
node3.smm.executor.flush()
assertEquals(true, restoredProtocol.protocolStarted) // Now we should have run the protocol and hopefully cleared the init checkpoint
node3.disableDBCloseOnStop()
node3.stop()
// Now it is completed the protocol should leave no Checkpoint.
@ -119,6 +123,7 @@ class StateMachineManagerTests {
node2.smm.add(ReceiveThenSuspendProtocol(node1.info.legalIdentity)) // Prepare checkpointed receive protocol
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
node2.disableDBCloseOnStop()
node2.stop() // kill receiver
val restoredProtocol = node2.restartAndGetRestoredProtocol<ReceiveThenSuspendProtocol>(node1)
assertThat(restoredProtocol.receivedPayloads[0]).isEqualTo(payload)
@ -138,16 +143,22 @@ class StateMachineManagerTests {
// Kick off first send and receive
node2.smm.add(PingPongProtocol(node3.info.legalIdentity, payload))
assertEquals(1, node2.checkpointStorage.checkpoints().size)
databaseTransaction(node2.database) {
assertEquals(1, node2.checkpointStorage.checkpoints().size)
}
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
node2.disableDBCloseOnStop()
// Restart node and thus reload the checkpoint and resend the message with same UUID
node2.stop()
databaseTransaction(node2.database) {
assertEquals(1, node2.checkpointStorage.checkpoints().size) // confirm checkpoint
}
val node2b = net.createNode(node1.info.address, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray())
node2.manuallyCloseDB()
val (firstAgain, fut1) = node2b.getSingleProtocol<PingPongProtocol>()
// Run the network which will also fire up the second protocol. First message should get deduped. So message data stays in sync.
net.runNetwork()
assertEquals(1, node2.checkpointStorage.checkpoints().size)
node2b.smm.executor.flush()
fut1.get()
@ -156,8 +167,12 @@ class StateMachineManagerTests {
assertEquals(4, receivedCount, "Protocol should have exchanged 4 unique messages")// Two messages each way
// can't give a precise value as every addMessageHandler re-runs the undelivered messages
assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages")
assertEquals(0, node2b.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended")
assertEquals(0, node3.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended")
databaseTransaction(node2b.database) {
assertEquals(0, node2b.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended")
}
databaseTransaction(node3.database) {
assertEquals(0, node3.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended")
}
assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3")
assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3")
assertEquals(payload, secondProtocol.get().receivedPayload, "Received payload does not match the (restarted) first value on Node 2")
@ -253,8 +268,10 @@ class StateMachineManagerTests {
private inline fun <reified P : ProtocolLogic<*>> MockNode.restartAndGetRestoredProtocol(
networkMapNode: MockNode? = null): P {
disableDBCloseOnStop() //Handover DB to new node copy
stop()
val newNode = mockNet.createNode(networkMapNode?.info?.address, id, advertisedServices = *advertisedServices.toTypedArray())
manuallyCloseDB()
mockNet.runNetwork() // allow NetworkMapService messages to stabilise and thus start the state machine
return newNode.getSingleProtocol<P>().first
}