From d883b3f134cddb12275911340c52eb046c12dd57 Mon Sep 17 00:00:00 2001 From: "rick.parker" <rick.parker@r3cev.com> Date: Tue, 2 Aug 2016 17:06:36 +0100 Subject: [PATCH] First working hand-rolled persistent wallet First working Exposed-assisted persistent wallet Cleaned up Exposed-based persistent wallet Cleaned up warnings Fixed up some generic types Improved comments Fix up TODO comment Hikari and config integration Fix existing tests Clean up after looking at PR Clean up commented out lines Fix initialisation of IRS demo leaving database open Fix up after rebase Review feedback. Main change is lazy wallet iteration. Rebased and incorporated config changes. Use standardised config loading. Make wallet cash test use persistent wallet. Added test to ensure wallet retains state in database across instance creation. Tidy up whitespace and fix bug in test. --- build.gradle | 3 + .../com/r3corda/core/contracts/Structures.kt | 2 +- .../core/contracts/TransactionGraphSearch.kt | 16 ++-- .../r3corda/core/node/services/Services.kt | 2 +- .../node/services/testing/MockServices.kt | 14 ++++ .../core/testing/InMemoryWalletService.kt | 14 ++-- node/build.gradle | 12 +++ node/src/main/kotlin/com/r3corda/node/Main.kt | 27 +------ .../com/r3corda/node/internal/AbstractNode.kt | 24 +++++- .../r3corda/node/internal/testing/MockNode.kt | 13 ++- .../node/services/config/NodeConfiguration.kt | 41 +++++++++- .../node/services/wallet/NodeWalletService.kt | 81 ++++++++++++++++++- .../r3corda/node/utilities/DatabaseSupport.kt | 19 +++++ .../r3corda/node/messaging/AttachmentTests.kt | 2 +- .../node/services/MockServiceHubInternal.kt | 4 +- .../node/services/NodeWalletServiceTest.kt | 71 ++++++++++++++++ .../node/services/WalletWithCashTest.kt | 22 +++-- .../persistence/DataVendingServiceTests.kt | 6 +- src/main/kotlin/com/r3corda/demos/IRSDemo.kt | 22 +++-- .../kotlin/com/r3corda/demos/TraderDemo.kt | 8 +- src/main/resources/reference.conf | 8 +- 21 files changed, 333 insertions(+), 78 deletions(-) create mode 100644 node/src/main/kotlin/com/r3corda/node/utilities/DatabaseSupport.kt create mode 100644 node/src/test/kotlin/com/r3corda/node/services/NodeWalletServiceTest.kt diff --git a/build.gradle b/build.gradle index 8366a58269..92c30ff52a 100644 --- a/build.gradle +++ b/build.gradle @@ -58,6 +58,9 @@ repositories { url 'http://oss.sonatype.org/content/repositories/snapshots' } jcenter() + maven { + url 'https://dl.bintray.com/kotlin/exposed' + } } sourceSets { diff --git a/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt b/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt index dada08e978..bfa70fdaa0 100644 --- a/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt +++ b/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt @@ -280,7 +280,7 @@ data class StateRef(val txhash: SecureHash, val index: Int) { data class StateAndRef<out T : ContractState>(val state: TransactionState<T>, val ref: StateRef) /** Filters a list of [StateAndRef] objects according to the type of the states */ -inline fun <reified T : ContractState> List<StateAndRef<ContractState>>.filterStatesOfType(): List<StateAndRef<T>> { +inline fun <reified T : ContractState> Iterable<StateAndRef<ContractState>>.filterStatesOfType(): List<StateAndRef<T>> { return mapNotNull { if (it.state.data is T) StateAndRef(TransactionState(it.state.data, it.state.notary), it.ref) else null } } diff --git a/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt b/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt index 9ed20cef31..85aa052d22 100644 --- a/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt +++ b/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt @@ -19,7 +19,8 @@ import java.util.concurrent.Callable class TransactionGraphSearch(val transactions: ReadOnlyTransactionStorage, val startPoints: List<WireTransaction>) : Callable<List<WireTransaction>> { class Query( - val withCommandOfType: Class<out CommandData>? = null + val withCommandOfType: Class<out CommandData>? = null, + val followInputsOfType: Class<out ContractState>? = null ) var query: Query = Query() @@ -27,19 +28,22 @@ class TransactionGraphSearch(val transactions: ReadOnlyTransactionStorage, override fun call(): List<WireTransaction> { val q = query - val next = ArrayList<SecureHash>() - next += startPoints.flatMap { it.inputs.map { it.txhash } } + val alreadyVisited = HashSet<SecureHash>() + val next = ArrayList<WireTransaction>(startPoints) val results = ArrayList<WireTransaction>() while (next.isNotEmpty()) { - val hash = next.removeAt(next.lastIndex) - val tx = transactions.getTransaction(hash)?.tx ?: continue + val tx = next.removeAt(next.lastIndex) if (q.matches(tx)) results += tx - next += tx.inputs.map { it.txhash } + val inputsLeadingToUnvisitedTx: Iterable<StateRef> = tx.inputs.filter { it.txhash !in alreadyVisited } + val unvisitedInputTxs: Map<SecureHash, SignedTransaction> = inputsLeadingToUnvisitedTx.map { it.txhash }.toHashSet().map { transactions.getTransaction(it) }.filterNotNull().associateBy { it.txBits.hash } + val unvisitedInputTxsWithInputIndex: Iterable<Pair<SignedTransaction, Int>> = inputsLeadingToUnvisitedTx.filter { it.txhash in unvisitedInputTxs.keys }.map { Pair(unvisitedInputTxs[it.txhash]!!, it.index) } + next += (unvisitedInputTxsWithInputIndex.filter { q.followInputsOfType == null || it.first.tx.outputs[it.second].data.javaClass == q.followInputsOfType } + .map { it.first }.filter { alreadyVisited.add(it.txBits.hash) }.map { it.tx }) } return results diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt index 5a21444757..0bc35c81cc 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt @@ -32,7 +32,7 @@ val DEFAULT_SESSION_ID = 0L * Active means they haven't been consumed yet (or we don't know about it). * Relevant means they contain at least one of our pubkeys. */ -class Wallet(val states: List<StateAndRef<ContractState>>) { +class Wallet(val states: Iterable<StateAndRef<ContractState>>) { @Suppress("UNCHECKED_CAST") inline fun <reified T : OwnableState> statesOfType() = states.filter { it.state.data is T } as List<StateAndRef<T>> diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt index 477d55fce0..d5c6665dde 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt @@ -128,3 +128,17 @@ class MockStorageService(override val attachments: AttachmentStorage = MockAttac override val myLegalIdentityKey: KeyPair = generateKeyPair(), override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)) : SingletonSerializeAsToken(), TxWritableStorageService + +/** + * Make properties appropriate for creating a DataSource for unit tests. + * + * @param nodeName Reflects the "instance" of the in-memory database. Defaults to a random string. + */ +fun makeTestDataSourceProperties(nodeName: String = SecureHash.randomSHA256().toString()): Properties { + val props = Properties() + props.setProperty("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource") + props.setProperty("dataSource.url", "jdbc:h2:mem:${nodeName}_persistence") + props.setProperty("dataSource.user", "sa") + props.setProperty("dataSource.password", "") + return props +} \ No newline at end of file diff --git a/core/src/main/kotlin/com/r3corda/core/testing/InMemoryWalletService.kt b/core/src/main/kotlin/com/r3corda/core/testing/InMemoryWalletService.kt index edf4ab298b..07f7818e4f 100644 --- a/core/src/main/kotlin/com/r3corda/core/testing/InMemoryWalletService.kt +++ b/core/src/main/kotlin/com/r3corda/core/testing/InMemoryWalletService.kt @@ -21,19 +21,20 @@ import javax.annotation.concurrent.ThreadSafe * states relevant to us into a database and once such a wallet is implemented, this scaffolding can be removed. */ @ThreadSafe -open class InMemoryWalletService(private val services: ServiceHub) : SingletonSerializeAsToken(), WalletService { +open class InMemoryWalletService(protected val services: ServiceHub) : SingletonSerializeAsToken(), WalletService { class ClashingThreads(threads: Set<SecureHash>, transactions: Iterable<WireTransaction>) : Exception("There are multiple linear head states after processing transactions $transactions. The clashing thread(s): $threads") - private val log = loggerFor<InMemoryWalletService>() + + open protected val log = loggerFor<InMemoryWalletService>() // Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're // inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference // to wallet somewhere. - private class InnerState { - var wallet = Wallet(emptyList<StateAndRef<OwnableState>>()) + protected class InnerState { + var wallet = Wallet(emptyList<StateAndRef<ContractState>>()) } - private val mutex = ThreadBox(InnerState()) + protected val mutex = ThreadBox(InnerState()) override val currentWallet: Wallet get() = mutex.locked { wallet } @@ -77,6 +78,9 @@ open class InMemoryWalletService(private val services: ServiceHub) : SingletonSe Pair(wallet, combinedDelta) } + // TODO: we need to remove the clashing threads concepts and support potential duplicate threads + // because two different nodes can have two different sets of threads and so currently it's possible + // for only one party to have a clash which interferes with determinism of the transactions. val clashingThreads = walletAndNetDelta.first.clashingThreads if (!clashingThreads.isEmpty()) { throw ClashingThreads(clashingThreads, txns) diff --git a/node/build.gradle b/node/build.gradle index 1a182f8e4b..23342d748e 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -8,6 +8,9 @@ repositories { url 'http://oss.sonatype.org/content/repositories/snapshots' } jcenter() + maven { + url 'https://dl.bintray.com/kotlin/exposed' + } } @@ -101,6 +104,15 @@ dependencies { // Unit testing helpers. testCompile 'junit:junit:4.12' testCompile "org.assertj:assertj-core:${assertj_version}" + + // For H2 database support in persistence + compile "com.h2database:h2:1.3.176" + + // Exposed: Kotlin SQL library - under evaluation + compile "org.jetbrains.exposed:exposed:0.5.0" + + // SQL connection pooling library + compile "com.zaxxer:HikariCP:2.4.7" } quasarScan.dependsOn('classes', ':core:classes', ':contracts:classes') diff --git a/node/src/main/kotlin/com/r3corda/node/Main.kt b/node/src/main/kotlin/com/r3corda/node/Main.kt index a489e00ab9..cee6ae1514 100644 --- a/node/src/main/kotlin/com/r3corda/node/Main.kt +++ b/node/src/main/kotlin/com/r3corda/node/Main.kt @@ -1,16 +1,13 @@ package com.r3corda.node import com.r3corda.node.services.config.FullNodeConfiguration -import com.typesafe.config.ConfigFactory -import com.typesafe.config.ConfigRenderOptions +import com.r3corda.node.services.config.NodeConfiguration import joptsimple.OptionParser import org.slf4j.LoggerFactory -import java.io.File import java.lang.management.ManagementFactory import java.net.InetAddress import java.nio.file.Path import java.nio.file.Paths -import java.util.* val log = LoggerFactory.getLogger("Main") @@ -38,26 +35,8 @@ fun main(args: Array<String>) { } val baseDirectoryPath = if (cmdlineOptions.has(ParamsSpec.baseDirectoryArg)) Paths.get(cmdlineOptions.valueOf(ParamsSpec.baseDirectoryArg)) else Paths.get(".").normalize() - - val defaultConfig = ConfigFactory.parseResources("reference.conf") - - val configFile = if (cmdlineOptions.has(ParamsSpec.configFileArg)) { - File(cmdlineOptions.valueOf(ParamsSpec.configFileArg)) - } else { - baseDirectoryPath.resolve("node.conf").normalize().toFile() - } - val appConfig = ConfigFactory.parseFile(configFile) - - val cmdlineOverrideMap = HashMap<String, Any?>() // If we do require a few other command line overrides eg for a nicer development experience they would go inside this map. - if (cmdlineOptions.has(ParamsSpec.baseDirectoryArg)) { - cmdlineOverrideMap.put("basedir", baseDirectoryPath.toString()) - } - val overrideConfig = ConfigFactory.parseMap(cmdlineOverrideMap) - - val mergedAndResolvedConfig = overrideConfig.withFallback(appConfig).withFallback(defaultConfig).resolve() - - log.info("config:\n ${mergedAndResolvedConfig.root().render(ConfigRenderOptions.defaults())}") - val conf = FullNodeConfiguration(mergedAndResolvedConfig) + val configFile = if (cmdlineOptions.has(ParamsSpec.configFileArg)) Paths.get(cmdlineOptions.valueOf(ParamsSpec.configFileArg)) else null + val conf = FullNodeConfiguration(NodeConfiguration.loadConfig(baseDirectoryPath, configFile)) val dir = conf.basedir.toAbsolutePath().normalize() logInfo(args, dir) diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 1eaa3c9b76..c043fdeecd 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -46,7 +46,9 @@ import com.r3corda.node.services.wallet.NodeWalletService import com.r3corda.node.utilities.ANSIProgressObserver import com.r3corda.node.utilities.AddOrRemove import com.r3corda.node.utilities.AffinityExecutor +import com.r3corda.node.utilities.configureDatabase import org.slf4j.Logger +import java.io.Closeable import java.nio.file.FileAlreadyExistsException import java.nio.file.Files import java.nio.file.Path @@ -130,6 +132,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, lateinit var scheduler: SchedulerService lateinit var protocolLogicFactory: ProtocolLogicRefFactory val customServices: ArrayList<Any> = ArrayList() + protected val closeOnStop: ArrayList<Closeable> = ArrayList() /** Locates and returns a service of the given type if loaded, or throws an exception if not found. */ inline fun <reified T: Any> findService() = customServices.filterIsInstance<T>().single() @@ -155,12 +158,13 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, require(!started) { "Node has already been started" } log.info("Node starting up ...") + initialiseDatabasePersistence() val storageServices = initialiseStorageService(dir) storage = storageServices.first checkpointStorage = storageServices.second net = makeMessagingService() netMapCache = InMemoryNetworkMapCache() - wallet = NodeWalletService(services) + wallet = makeWalletService() identity = makeIdentityService() // Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because @@ -199,6 +203,16 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, return this } + private fun initialiseDatabasePersistence() { + val props = configuration.dataSourceProperties + if (props.isNotEmpty()) { + val (toClose, database) = configureDatabase(props) + // Now log the vendor string as this will also cause a connection to be tested eagerly. + log.info("Connected to ${database.vendor} database.") + closeOnStop += toClose + } + } + private fun initialiseProtocolLogicFactory(): ProtocolLogicRefFactory { val protocolWhitelist = HashMap<String, Set<String>>() for (plugin in pluginRegistries) { @@ -321,14 +335,20 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, return service } + // TODO: sort out ordering of open & protected modifiers of functions in this class. + protected open fun makeWalletService(): WalletService = NodeWalletService(services) + open fun stop() { // TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the // network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop() // to indicate "Please shut down gracefully" vs "Shut down now". // Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it // unsubscribes us forcibly, rather than blocking the shutdown process. - net.stop() + // Stop in opposite order to starting + for (toClose in closeOnStop.reversed()) { + toClose.close() + } } protected abstract fun makeMessagingService(): MessagingServiceInternal diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt index 5cdaee7462..96598ba171 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt @@ -8,7 +8,10 @@ import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.PhysicalLocation import com.r3corda.core.node.services.ServiceType +import com.r3corda.core.node.services.WalletService import com.r3corda.core.node.services.testing.MockIdentityService +import com.r3corda.core.node.services.testing.makeTestDataSourceProperties +import com.r3corda.core.testing.InMemoryWalletService import com.r3corda.core.utilities.loggerFor import com.r3corda.node.internal.AbstractNode import com.r3corda.node.services.api.MessagingServiceInternal @@ -84,6 +87,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override fun makeIdentityService() = MockIdentityService(mockNet.identities) + override fun makeWalletService(): WalletService = InMemoryWalletService(services) + override fun startMessagingService() { // Nothing to do } @@ -109,7 +114,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, /** Returns a node, optionally created by the passed factory method. */ fun createNode(networkMapAddress: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory, - start: Boolean = true, legalName: String? = null, keyPair: KeyPair? = null, vararg advertisedServices: ServiceType): MockNode { + start: Boolean = true, legalName: String? = null, keyPair: KeyPair? = null, + databasePersistence: Boolean = false, vararg advertisedServices: ServiceType): MockNode { val newNode = forcedID == -1 val id = if (newNode) counter++ else forcedID @@ -122,6 +128,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override val nearestCity: String = "Atlantis" override val keyStorePassword: String = "dummy" override val trustStorePassword: String = "trustpass" + override val dataSourceProperties: Properties get() = if (databasePersistence) makeTestDataSourceProperties("node_$id") else Properties() } val node = nodeFactory.create(path, config, this, networkMapAddress, advertisedServices.toSet(), id, keyPair) if (start) { @@ -160,12 +167,12 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, fun createTwoNodes(nodeFactory: Factory = defaultFactory, notaryKeyPair: KeyPair? = null): Pair<MockNode, MockNode> { require(nodes.isEmpty()) return Pair( - createNode(null, -1, nodeFactory, true, null, notaryKeyPair, NetworkMapService.Type, SimpleNotaryService.Type), + createNode(null, -1, nodeFactory, true, null, notaryKeyPair, false, NetworkMapService.Type, SimpleNotaryService.Type), createNode(nodes[0].info, -1, nodeFactory, true, null) ) } - fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null) = createNode(null, -1, defaultFactory, true, legalName, keyPair, NetworkMapService.Type, SimpleNotaryService.Type) + fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null) = createNode(null, -1, defaultFactory, true, legalName, keyPair, false, NetworkMapService.Type, SimpleNotaryService.Type) fun createPartyNode(networkMapAddr: NodeInfo, legalName: String? = null, keyPair: KeyPair? = null) = createNode(networkMapAddr, -1, defaultFactory, true, legalName, keyPair) @Suppress("unused") // This is used from the network visualiser tool. diff --git a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt index 3cefe85e1e..902ade9943 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt @@ -12,11 +12,15 @@ import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.transactions.SimpleNotaryService import com.typesafe.config.Config import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions +import com.typesafe.config.ConfigRenderOptions +import org.slf4j.LoggerFactory import java.nio.file.Path import java.nio.file.Paths import java.time.Clock import java.time.Instant import java.time.LocalDate +import java.util.* import kotlin.reflect.KProperty import kotlin.reflect.jvm.javaType @@ -26,6 +30,28 @@ interface NodeConfiguration { val nearestCity: String val keyStorePassword: String val trustStorePassword: String + val dataSourceProperties: Properties get() = Properties() + + companion object { + val log = LoggerFactory.getLogger("NodeConfiguration") + + fun loadConfig(baseDirectoryPath: Path, configFileOverride: Path? = null, allowMissingConfig: Boolean = false, configOverrides: Map<String, Any?> = emptyMap()): Config { + val defaultConfig = ConfigFactory.parseResources("reference.conf") + + val normalisedBaseDir = baseDirectoryPath.normalize() + val configFile = (configFileOverride?.normalize() ?: normalisedBaseDir.resolve("node.conf")).toFile() + val appConfig = ConfigFactory.parseFile(configFile, ConfigParseOptions.defaults().setAllowMissing(allowMissingConfig)) + + val overridesMap = HashMap<String, Any?>() // If we do require a few other command line overrides eg for a nicer development experience they would go inside this map. + overridesMap.putAll(configOverrides) + overridesMap["basedir"] = normalisedBaseDir.toAbsolutePath().toString() + val overrideConfig = ConfigFactory.parseMap(overridesMap) + + val mergedAndResolvedConfig = overrideConfig.withFallback(appConfig).withFallback(defaultConfig).resolve() + log.info("config:\n ${mergedAndResolvedConfig.root().render(ConfigRenderOptions.defaults())}") + return mergedAndResolvedConfig + } + } } @Suppress("UNCHECKED_CAST") @@ -40,16 +66,27 @@ operator fun <T> Config.getValue(receiver: Any, metadata: KProperty<*>): T { Instant::class.java -> Instant.parse(getString(metadata.name)) as T HostAndPort::class.java -> HostAndPort.fromString(getString(metadata.name)) as T Path::class.java -> Paths.get(getString(metadata.name)) as T + Properties::class.java -> getProperties(metadata.name) as T else -> throw IllegalArgumentException("Unsupported type ${metadata.returnType}") } } +fun Config.getProperties(path: String): Properties { + val obj = this.getObject(path) + val props = Properties() + for ((property, objectValue) in obj.entries) { + props.setProperty(property, objectValue.unwrapped().toString()) + } + return props +} + class NodeConfigurationFromConfig(val config: Config = ConfigFactory.load()) : NodeConfiguration { override val myLegalName: String by config override val exportJMXto: String by config override val nearestCity: String by config override val keyStorePassword: String by config override val trustStorePassword: String by config + override val dataSourceProperties: Properties by config } class NameServiceConfig(conf: Config) { @@ -65,6 +102,7 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration { override val exportJMXto: String = "http" override val keyStorePassword: String by conf override val trustStorePassword: String by conf + override val dataSourceProperties: Properties by conf val artemisAddress: HostAndPort by conf val webAddress: HostAndPort by conf val messagingServerAddress: HostAndPort? = if (conf.hasPath("messagingServerAddress")) HostAndPort.fromString(conf.getString("messagingServerAddress")) else null @@ -96,4 +134,5 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration { messagingServerAddress ) } -} \ No newline at end of file +} + diff --git a/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt b/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt index b947e20b21..ae6153eecb 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt @@ -1,10 +1,85 @@ package com.r3corda.node.services.wallet +import com.r3corda.core.contracts.ContractState +import com.r3corda.core.contracts.StateAndRef +import com.r3corda.core.contracts.StateRef +import com.r3corda.core.crypto.SecureHash import com.r3corda.core.node.ServiceHub +import com.r3corda.core.node.services.Wallet import com.r3corda.core.testing.InMemoryWalletService +import com.r3corda.core.utilities.loggerFor +import com.r3corda.core.utilities.trace +import com.r3corda.node.utilities.databaseTransaction +import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.SchemaUtils.create /** - * Currently, the node wallet service is just the in-memory wallet service until we have finished evaluating and - * selecting a persistence layer (probably an ORM over a SQL DB). + * Currently, the node wallet service is a very simple RDBMS backed implementation. It will change significantly when + * we add further functionality as the design for the wallet and wallet service matures. + * + * TODO: move query / filter criteria into the database query. + * TODO: keep an audit trail with time stamps of previously unconsumed states "as of" a particular point in time. + * TODO: have transaction storage do some caching. */ -class NodeWalletService(services: ServiceHub) : InMemoryWalletService(services) \ No newline at end of file +class NodeWalletService(services: ServiceHub) : InMemoryWalletService(services) { + + override val log = loggerFor<NodeWalletService>() + + // For now we are just tracking the current state, with no historical reporting ability. + private object UnconsumedStates : Table("vault_unconsumed_states") { + val txhash = binary("transaction_id", 32).primaryKey() + val index = integer("output_index").primaryKey() + } + + init { + // TODO: at some future point, we'll use some schema creation tool to deploy database artifacts if the database + // is not yet initalised to the right version of the schema. + createTablesIfNecessary() + + // Note that our wallet implementation currently does nothing with respect to attempting to apply criteria in the database. + mutex.locked { wallet = Wallet(allUnconsumedStates()) } + + // Now we need to make sure we listen to updates + updates.subscribe { recordUpdate(it) } + } + + private fun recordUpdate(update: Wallet.Update) { + val producedStateRefs = update.produced.map { it.ref } + val consumedStateRefs = update.consumed + log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." } + databaseTransaction { + // Note we also remove the produced in case we are re-inserting in some form of recovery situation. + for (consumed in (consumedStateRefs + producedStateRefs)) { + UnconsumedStates.deleteWhere { + (UnconsumedStates.txhash eq consumed.txhash.bits) and (UnconsumedStates.index eq consumed.index) + } + } + for (produced in producedStateRefs) { + UnconsumedStates.insert { + it[txhash] = produced.txhash.bits + it[index] = produced.index + } + } + } + } + + private fun createTablesIfNecessary() { + log.trace { "Creating database tables if necessary." } + databaseTransaction { + create(UnconsumedStates) + } + } + + private fun allUnconsumedStates(): Iterable<StateAndRef<ContractState>> { + // Order by txhash for if and when transaction storage has some caching. + // Map to StateRef and then to StateAndRef. + return databaseTransaction { + UnconsumedStates.selectAll().orderBy(UnconsumedStates.txhash) + .map { StateRef(SecureHash.SHA256(it[UnconsumedStates.txhash]), it[UnconsumedStates.index]) } + .map { + val storedTx = services.storageService.validatedTransactions.getTransaction(it.txhash) ?: throw Error("Found transaction hash ${it.txhash} in unconsumed contract states that is not in transaction storage.") + StateAndRef(storedTx.tx.outputs[it.index], it) + } + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/DatabaseSupport.kt b/node/src/main/kotlin/com/r3corda/node/utilities/DatabaseSupport.kt new file mode 100644 index 0000000000..e423b01288 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/utilities/DatabaseSupport.kt @@ -0,0 +1,19 @@ +package com.r3corda.node.utilities + +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import org.jetbrains.exposed.sql.Database +import org.jetbrains.exposed.sql.Transaction +import java.io.Closeable +import java.util.* + +fun <T> databaseTransaction(statement: Transaction.() -> T): T = org.jetbrains.exposed.sql.transactions.transaction(statement) + +fun configureDatabase(props: Properties): Pair<Closeable, Database> { + val config = HikariConfig(props) + val dataSource = HikariDataSource(config) + val database = Database.connect(dataSource) + // Check not in read-only mode. + check(!database.metadata.isReadOnly) { "Database should not be readonly." } + return Pair(dataSource, database) +} \ No newline at end of file diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt index eddd1f3f94..39d95eb13d 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt @@ -102,7 +102,7 @@ class AttachmentTests { } } } - }, true, null, null, NetworkMapService.Type, SimpleNotaryService.Type) + }, true, null, null, false, NetworkMapService.Type, SimpleNotaryService.Type) val n1 = network.createNode(n0.info) // Insert an attachment into node zero's store directly. diff --git a/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt index 0f4a20a61a..d1273e6dd6 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt @@ -7,6 +7,7 @@ import com.r3corda.core.node.services.* import com.r3corda.core.node.services.testing.MockStorageService import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogicRefFactory +import com.r3corda.core.testing.InMemoryWalletService import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE import com.r3corda.node.serialization.NodeClock import com.r3corda.node.services.api.MessagingServiceInternal @@ -16,7 +17,6 @@ import com.r3corda.node.services.network.MockNetworkMapCache import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.persistence.DataVending import com.r3corda.node.services.statemachine.StateMachineManager -import com.r3corda.node.services.wallet.NodeWalletService import java.time.Clock @Suppress("LeakingThis") @@ -32,7 +32,7 @@ open class MockServiceHubInternal( val overrideClock: Clock? = NodeClock(), val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory() ) : ServiceHubInternal() { - override val walletService: WalletService = customWallet ?: NodeWalletService(this) + override val walletService: WalletService = customWallet ?: InMemoryWalletService(this) override val keyManagementService: KeyManagementService get() = keyManagement ?: throw UnsupportedOperationException() override val identityService: IdentityService diff --git a/node/src/test/kotlin/com/r3corda/node/services/NodeWalletServiceTest.kt b/node/src/test/kotlin/com/r3corda/node/services/NodeWalletServiceTest.kt new file mode 100644 index 0000000000..57cb593f6b --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/services/NodeWalletServiceTest.kt @@ -0,0 +1,71 @@ +package com.r3corda.node.services + +import com.r3corda.contracts.testing.fillWithSomeTestCash +import com.r3corda.core.contracts.DOLLARS +import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.node.services.TxWritableStorageService +import com.r3corda.core.node.services.WalletService +import com.r3corda.core.node.services.testing.MockServices +import com.r3corda.core.node.services.testing.makeTestDataSourceProperties +import com.r3corda.core.testing.DUMMY_NOTARY +import com.r3corda.core.utilities.BriefLogFormatter +import com.r3corda.node.services.wallet.NodeWalletService +import com.r3corda.node.utilities.configureDatabase +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.io.Closeable +import java.util.* + +class NodeWalletServiceTest { + lateinit var dataSource: Closeable + + @Before + fun setUp() { + BriefLogFormatter.loggingOn(NodeWalletService::class) + dataSource = configureDatabase(makeTestDataSourceProperties()).first + } + + @After + fun tearDown() { + dataSource.close() + BriefLogFormatter.loggingOff(NodeWalletService::class) + } + + @Test + fun `states not local to instance`() { + val services1 = object : MockServices() { + override val walletService: WalletService = NodeWalletService(this) + + override fun recordTransactions(txs: Iterable<SignedTransaction>) { + for (stx in txs) { + storageService.validatedTransactions.addTransaction(stx) + walletService.notify(stx.tx) + } + } + } + services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) + + val w1 = services1.walletService.currentWallet + assertThat(w1.states).hasSize(3) + + val originalStorage = services1.storageService + val services2 = object : MockServices() { + override val walletService: WalletService = NodeWalletService(this) + + // We need to be able to find the same transactions as before, too. + override val storageService: TxWritableStorageService get() = originalStorage + + override fun recordTransactions(txs: Iterable<SignedTransaction>) { + for (stx in txs) { + storageService.validatedTransactions.addTransaction(stx) + walletService.notify(stx.tx) + } + } + } + + val w2 = services2.walletService.currentWallet + assertThat(w2.states).hasSize(3) + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/com/r3corda/node/services/WalletWithCashTest.kt b/node/src/test/kotlin/com/r3corda/node/services/WalletWithCashTest.kt index 6742e64e8d..db370b73c6 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/WalletWithCashTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/WalletWithCashTest.kt @@ -8,13 +8,16 @@ import com.r3corda.core.contracts.* import com.r3corda.core.crypto.SecureHash import com.r3corda.core.node.services.WalletService import com.r3corda.core.node.services.testing.MockServices +import com.r3corda.core.node.services.testing.makeTestDataSourceProperties import com.r3corda.core.testing.* import com.r3corda.core.utilities.BriefLogFormatter import com.r3corda.node.services.wallet.NodeWalletService +import com.r3corda.node.utilities.configureDatabase import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.After import org.junit.Before import org.junit.Test +import java.io.Closeable import java.util.* import kotlin.test.assertEquals import kotlin.test.assertNull @@ -24,10 +27,12 @@ import kotlin.test.assertNull class WalletWithCashTest { lateinit var services: MockServices val wallet: WalletService get() = services.walletService + lateinit var dataSource: Closeable @Before fun setUp() { BriefLogFormatter.loggingOn(NodeWalletService::class) + dataSource = configureDatabase(makeTestDataSourceProperties()).first services = object : MockServices() { override val walletService: WalletService = NodeWalletService(this) @@ -42,6 +47,7 @@ class WalletWithCashTest { @After fun tearDown() { + dataSource.close() BriefLogFormatter.loggingOff(NodeWalletService::class) } @@ -51,14 +57,14 @@ class WalletWithCashTest { services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) val w = wallet.currentWallet - assertEquals(3, w.states.size) + assertEquals(3, w.states.toList().size) - val state = w.states[0].state.data as Cash.State + val state = w.states.toList()[0].state.data as Cash.State assertEquals(29.01.DOLLARS `issued by` DUMMY_CASH_ISSUER, state.amount) assertEquals(services.key.public, state.owner) - assertEquals(35.38.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states[2].state.data as Cash.State).amount) - assertEquals(35.61.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states[1].state.data as Cash.State).amount) + assertEquals(35.38.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[2].state.data as Cash.State).amount) + assertEquals(35.61.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[1].state.data as Cash.State).amount) } @Test @@ -109,7 +115,7 @@ class WalletWithCashTest { }.toSignedTransaction() wallet.notify(dummyIssue.tx) - assertEquals(1, wallet.currentWallet.states.size) + assertEquals(1, wallet.currentWallet.states.toList().size) // Issue another linear state of the same thread (nonce different) val dummyIssue2 = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply { @@ -120,7 +126,7 @@ class WalletWithCashTest { assertThatThrownBy { wallet.notify(dummyIssue2.tx) } - assertEquals(1, wallet.currentWallet.states.size) + assertEquals(1, wallet.currentWallet.states.toList().size) } @Test @@ -136,7 +142,7 @@ class WalletWithCashTest { }.toSignedTransaction() wallet.notify(dummyIssue.tx) - assertEquals(1, wallet.currentWallet.states.size) + assertEquals(1, wallet.currentWallet.states.toList().size) // Move the same state val dummyMove = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply { @@ -146,6 +152,6 @@ class WalletWithCashTest { }.toSignedTransaction() wallet.notify(dummyMove.tx) - assertEquals(1, wallet.currentWallet.states.size) + assertEquals(1, wallet.currentWallet.states.toList().size) } } diff --git a/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt index f09ebb49ce..d80038d14f 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt @@ -45,7 +45,7 @@ class DataVendingServiceTests { // Complete the cash transaction, and then manually relay it ptx.signWith(registerNode.services.storageService.myLegalIdentityKey) val tx = ptx.toSignedTransaction() - assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size) + assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size) val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity, walletServiceNode.info, tx) @@ -77,7 +77,7 @@ class DataVendingServiceTests { // The transaction tries issuing MEGA_CORP cash, but we aren't the issuer, so it's invalid ptx.signWith(registerNode.services.storageService.myLegalIdentityKey) val tx = ptx.toSignedTransaction(false) - assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size) + assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size) val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity, walletServiceNode.info, tx) @@ -87,6 +87,6 @@ class DataVendingServiceTests { assertTrue(ex.cause is DataVending.Service.TransactionRejectedError) // Check the transaction is not in the receiving node - assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size) + assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size) } } \ No newline at end of file diff --git a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt index 51dbf3a2a3..c760b2951e 100644 --- a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt @@ -26,13 +26,11 @@ import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.transactions.SimpleNotaryService -import com.typesafe.config.ConfigFactory import joptsimple.OptionParser import joptsimple.OptionSet import org.apache.commons.io.IOUtils import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.io.File import java.net.URL import java.nio.file.Files import java.nio.file.Path @@ -308,8 +306,8 @@ private fun setup(params: CliParams.SetupNode): Int { dirFile.mkdirs() } - val configFile = params.dir.resolve("config").toFile() - val config = loadConfigFile(configFile, params.defaultLegalName) + val configFile = params.dir.resolve("config") + val config = loadConfigFile(params.dir, configFile, params.defaultLegalName) if (!Files.exists(params.dir.resolve(AbstractNode.PUBLIC_IDENTITY_FILE_NAME))) { createIdentities(params, config) } @@ -445,18 +443,16 @@ private fun getNodeConfig(cliParams: CliParams.RunNode): NodeConfiguration { throw NotSetupException("Missing identity file. Please run node setup before running the node") } - val configFile = cliParams.dir.resolve("config").toFile() - return loadConfigFile(configFile, cliParams.defaultLegalName) + val configFile = cliParams.dir.resolve("config") + return loadConfigFile(cliParams.dir, configFile, cliParams.defaultLegalName) } -private fun loadConfigFile(configFile: File, defaultLegalName: String): NodeConfiguration { - if (!configFile.exists()) { +private fun loadConfigFile(baseDir: Path, configFile: Path, defaultLegalName: String): NodeConfiguration { + if (!Files.exists(configFile)) { createDefaultConfigFile(configFile, defaultLegalName) log.warn("Default config created at $configFile.") } - - val config = ConfigFactory.parseFile(configFile).withFallback(ConfigFactory.load()) - return NodeConfigurationFromConfig(config) + return NodeConfigurationFromConfig(NodeConfiguration.loadConfig(baseDir, configFileOverride = configFile)) } private fun createIdentities(params: CliParams.SetupNode, nodeConf: NodeConfiguration) { @@ -466,8 +462,8 @@ private fun createIdentities(params: CliParams.SetupNode, nodeConf: NodeConfigur node.stop() } -private fun createDefaultConfigFile(configFile: File, legalName: String) { - configFile.writeBytes( +private fun createDefaultConfigFile(configFile: Path, legalName: String) { + Files.write(configFile, """ myLegalName = $legalName """.trimIndent().toByteArray()) diff --git a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt index 2f5da40976..00b373b4b2 100644 --- a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt @@ -23,6 +23,7 @@ import com.r3corda.core.utilities.BriefLogFormatter import com.r3corda.core.utilities.Emoji import com.r3corda.core.utilities.ProgressTracker import com.r3corda.node.internal.Node +import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.network.NetworkMapService @@ -30,7 +31,6 @@ import com.r3corda.node.services.persistence.NodeAttachmentService import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.protocols.NotaryProtocol import com.r3corda.protocols.TwoPartyTradeProtocol -import com.typesafe.config.ConfigFactory import joptsimple.OptionParser import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -126,8 +126,7 @@ fun runTraderDemo(args: Array<String>): Int { Role.BUYER -> "Bank A" Role.SELLER -> "Bank B" } - val override = ConfigFactory.parseString("myLegalName = $myLegalName") - NodeConfigurationFromConfig(override.withFallback(ConfigFactory.load())) + NodeConfigurationFromConfig(NodeConfiguration.loadConfig(directory, allowMissingConfig = true, configOverrides = mapOf("myLegalName" to myLegalName))) } // Which services will this instance of the node provide to the network? @@ -275,7 +274,8 @@ private class TraderDemoProtocolBuyer(val otherSide: Party, private fun logIssuanceAttachment(tradeTX: SignedTransaction) { // Find the original CP issuance. val search = TransactionGraphSearch(serviceHub.storageService.validatedTransactions, listOf(tradeTX.tx)) - search.query = TransactionGraphSearch.Query(withCommandOfType = CommercialPaper.Commands.Issue::class.java) + search.query = TransactionGraphSearch.Query(withCommandOfType = CommercialPaper.Commands.Issue::class.java, + followInputsOfType = CommercialPaper.State::class.java) val cpIssuance = search.call().single() cpIssuance.attachments.first().let { diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 27851335e1..f4a9758acc 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -2,4 +2,10 @@ myLegalName = "Vast Global MegaCorp, Ltd" exportJMXto = "http" nearestCity = "The Moon" keyStorePassword = "cordacadevpass" -trustStorePassword = "trustpass" \ No newline at end of file +trustStorePassword = "trustpass" +dataSourceProperties = { + dataSourceClassName = org.h2.jdbcx.JdbcDataSource + "dataSource.url" = "jdbc:h2:"${basedir}"/persistence" + "dataSource.user" = sa + "dataSource.password" = "" +} \ No newline at end of file