diff --git a/build.gradle b/build.gradle index 8366a58269..92c30ff52a 100644 --- a/build.gradle +++ b/build.gradle @@ -58,6 +58,9 @@ repositories { url 'http://oss.sonatype.org/content/repositories/snapshots' } jcenter() + maven { + url 'https://dl.bintray.com/kotlin/exposed' + } } sourceSets { diff --git a/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt b/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt index dada08e978..bfa70fdaa0 100644 --- a/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt +++ b/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt @@ -280,7 +280,7 @@ data class StateRef(val txhash: SecureHash, val index: Int) { data class StateAndRef(val state: TransactionState, val ref: StateRef) /** Filters a list of [StateAndRef] objects according to the type of the states */ -inline fun List>.filterStatesOfType(): List> { +inline fun Iterable>.filterStatesOfType(): List> { return mapNotNull { if (it.state.data is T) StateAndRef(TransactionState(it.state.data, it.state.notary), it.ref) else null } } diff --git a/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt b/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt index 9ed20cef31..85aa052d22 100644 --- a/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt +++ b/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt @@ -19,7 +19,8 @@ import java.util.concurrent.Callable class TransactionGraphSearch(val transactions: ReadOnlyTransactionStorage, val startPoints: List) : Callable> { class Query( - val withCommandOfType: Class? = null + val withCommandOfType: Class? = null, + val followInputsOfType: Class? = null ) var query: Query = Query() @@ -27,19 +28,22 @@ class TransactionGraphSearch(val transactions: ReadOnlyTransactionStorage, override fun call(): List { val q = query - val next = ArrayList() - next += startPoints.flatMap { it.inputs.map { it.txhash } } + val alreadyVisited = HashSet() + val next = ArrayList(startPoints) val results = ArrayList() while (next.isNotEmpty()) { - val hash = next.removeAt(next.lastIndex) - val tx = transactions.getTransaction(hash)?.tx ?: continue + val tx = next.removeAt(next.lastIndex) if (q.matches(tx)) results += tx - next += tx.inputs.map { it.txhash } + val inputsLeadingToUnvisitedTx: Iterable = tx.inputs.filter { it.txhash !in alreadyVisited } + val unvisitedInputTxs: Map = inputsLeadingToUnvisitedTx.map { it.txhash }.toHashSet().map { transactions.getTransaction(it) }.filterNotNull().associateBy { it.txBits.hash } + val unvisitedInputTxsWithInputIndex: Iterable> = inputsLeadingToUnvisitedTx.filter { it.txhash in unvisitedInputTxs.keys }.map { Pair(unvisitedInputTxs[it.txhash]!!, it.index) } + next += (unvisitedInputTxsWithInputIndex.filter { q.followInputsOfType == null || it.first.tx.outputs[it.second].data.javaClass == q.followInputsOfType } + .map { it.first }.filter { alreadyVisited.add(it.txBits.hash) }.map { it.tx }) } return results diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt index 5a21444757..0bc35c81cc 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt @@ -32,7 +32,7 @@ val DEFAULT_SESSION_ID = 0L * Active means they haven't been consumed yet (or we don't know about it). * Relevant means they contain at least one of our pubkeys. */ -class Wallet(val states: List>) { +class Wallet(val states: Iterable>) { @Suppress("UNCHECKED_CAST") inline fun statesOfType() = states.filter { it.state.data is T } as List> diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt index 477d55fce0..d5c6665dde 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt @@ -128,3 +128,17 @@ class MockStorageService(override val attachments: AttachmentStorage = MockAttac override val myLegalIdentityKey: KeyPair = generateKeyPair(), override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)) : SingletonSerializeAsToken(), TxWritableStorageService + +/** + * Make properties appropriate for creating a DataSource for unit tests. + * + * @param nodeName Reflects the "instance" of the in-memory database. Defaults to a random string. + */ +fun makeTestDataSourceProperties(nodeName: String = SecureHash.randomSHA256().toString()): Properties { + val props = Properties() + props.setProperty("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource") + props.setProperty("dataSource.url", "jdbc:h2:mem:${nodeName}_persistence") + props.setProperty("dataSource.user", "sa") + props.setProperty("dataSource.password", "") + return props +} \ No newline at end of file diff --git a/core/src/main/kotlin/com/r3corda/core/testing/InMemoryWalletService.kt b/core/src/main/kotlin/com/r3corda/core/testing/InMemoryWalletService.kt index edf4ab298b..07f7818e4f 100644 --- a/core/src/main/kotlin/com/r3corda/core/testing/InMemoryWalletService.kt +++ b/core/src/main/kotlin/com/r3corda/core/testing/InMemoryWalletService.kt @@ -21,19 +21,20 @@ import javax.annotation.concurrent.ThreadSafe * states relevant to us into a database and once such a wallet is implemented, this scaffolding can be removed. */ @ThreadSafe -open class InMemoryWalletService(private val services: ServiceHub) : SingletonSerializeAsToken(), WalletService { +open class InMemoryWalletService(protected val services: ServiceHub) : SingletonSerializeAsToken(), WalletService { class ClashingThreads(threads: Set, transactions: Iterable) : Exception("There are multiple linear head states after processing transactions $transactions. The clashing thread(s): $threads") - private val log = loggerFor() + + open protected val log = loggerFor() // Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're // inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference // to wallet somewhere. - private class InnerState { - var wallet = Wallet(emptyList>()) + protected class InnerState { + var wallet = Wallet(emptyList>()) } - private val mutex = ThreadBox(InnerState()) + protected val mutex = ThreadBox(InnerState()) override val currentWallet: Wallet get() = mutex.locked { wallet } @@ -77,6 +78,9 @@ open class InMemoryWalletService(private val services: ServiceHub) : SingletonSe Pair(wallet, combinedDelta) } + // TODO: we need to remove the clashing threads concepts and support potential duplicate threads + // because two different nodes can have two different sets of threads and so currently it's possible + // for only one party to have a clash which interferes with determinism of the transactions. val clashingThreads = walletAndNetDelta.first.clashingThreads if (!clashingThreads.isEmpty()) { throw ClashingThreads(clashingThreads, txns) diff --git a/node/build.gradle b/node/build.gradle index 1a182f8e4b..23342d748e 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -8,6 +8,9 @@ repositories { url 'http://oss.sonatype.org/content/repositories/snapshots' } jcenter() + maven { + url 'https://dl.bintray.com/kotlin/exposed' + } } @@ -101,6 +104,15 @@ dependencies { // Unit testing helpers. testCompile 'junit:junit:4.12' testCompile "org.assertj:assertj-core:${assertj_version}" + + // For H2 database support in persistence + compile "com.h2database:h2:1.3.176" + + // Exposed: Kotlin SQL library - under evaluation + compile "org.jetbrains.exposed:exposed:0.5.0" + + // SQL connection pooling library + compile "com.zaxxer:HikariCP:2.4.7" } quasarScan.dependsOn('classes', ':core:classes', ':contracts:classes') diff --git a/node/src/main/kotlin/com/r3corda/node/Main.kt b/node/src/main/kotlin/com/r3corda/node/Main.kt index a489e00ab9..cee6ae1514 100644 --- a/node/src/main/kotlin/com/r3corda/node/Main.kt +++ b/node/src/main/kotlin/com/r3corda/node/Main.kt @@ -1,16 +1,13 @@ package com.r3corda.node import com.r3corda.node.services.config.FullNodeConfiguration -import com.typesafe.config.ConfigFactory -import com.typesafe.config.ConfigRenderOptions +import com.r3corda.node.services.config.NodeConfiguration import joptsimple.OptionParser import org.slf4j.LoggerFactory -import java.io.File import java.lang.management.ManagementFactory import java.net.InetAddress import java.nio.file.Path import java.nio.file.Paths -import java.util.* val log = LoggerFactory.getLogger("Main") @@ -38,26 +35,8 @@ fun main(args: Array) { } val baseDirectoryPath = if (cmdlineOptions.has(ParamsSpec.baseDirectoryArg)) Paths.get(cmdlineOptions.valueOf(ParamsSpec.baseDirectoryArg)) else Paths.get(".").normalize() - - val defaultConfig = ConfigFactory.parseResources("reference.conf") - - val configFile = if (cmdlineOptions.has(ParamsSpec.configFileArg)) { - File(cmdlineOptions.valueOf(ParamsSpec.configFileArg)) - } else { - baseDirectoryPath.resolve("node.conf").normalize().toFile() - } - val appConfig = ConfigFactory.parseFile(configFile) - - val cmdlineOverrideMap = HashMap() // If we do require a few other command line overrides eg for a nicer development experience they would go inside this map. - if (cmdlineOptions.has(ParamsSpec.baseDirectoryArg)) { - cmdlineOverrideMap.put("basedir", baseDirectoryPath.toString()) - } - val overrideConfig = ConfigFactory.parseMap(cmdlineOverrideMap) - - val mergedAndResolvedConfig = overrideConfig.withFallback(appConfig).withFallback(defaultConfig).resolve() - - log.info("config:\n ${mergedAndResolvedConfig.root().render(ConfigRenderOptions.defaults())}") - val conf = FullNodeConfiguration(mergedAndResolvedConfig) + val configFile = if (cmdlineOptions.has(ParamsSpec.configFileArg)) Paths.get(cmdlineOptions.valueOf(ParamsSpec.configFileArg)) else null + val conf = FullNodeConfiguration(NodeConfiguration.loadConfig(baseDirectoryPath, configFile)) val dir = conf.basedir.toAbsolutePath().normalize() logInfo(args, dir) diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 1eaa3c9b76..c043fdeecd 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -46,7 +46,9 @@ import com.r3corda.node.services.wallet.NodeWalletService import com.r3corda.node.utilities.ANSIProgressObserver import com.r3corda.node.utilities.AddOrRemove import com.r3corda.node.utilities.AffinityExecutor +import com.r3corda.node.utilities.configureDatabase import org.slf4j.Logger +import java.io.Closeable import java.nio.file.FileAlreadyExistsException import java.nio.file.Files import java.nio.file.Path @@ -130,6 +132,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, lateinit var scheduler: SchedulerService lateinit var protocolLogicFactory: ProtocolLogicRefFactory val customServices: ArrayList = ArrayList() + protected val closeOnStop: ArrayList = ArrayList() /** Locates and returns a service of the given type if loaded, or throws an exception if not found. */ inline fun findService() = customServices.filterIsInstance().single() @@ -155,12 +158,13 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, require(!started) { "Node has already been started" } log.info("Node starting up ...") + initialiseDatabasePersistence() val storageServices = initialiseStorageService(dir) storage = storageServices.first checkpointStorage = storageServices.second net = makeMessagingService() netMapCache = InMemoryNetworkMapCache() - wallet = NodeWalletService(services) + wallet = makeWalletService() identity = makeIdentityService() // Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because @@ -199,6 +203,16 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, return this } + private fun initialiseDatabasePersistence() { + val props = configuration.dataSourceProperties + if (props.isNotEmpty()) { + val (toClose, database) = configureDatabase(props) + // Now log the vendor string as this will also cause a connection to be tested eagerly. + log.info("Connected to ${database.vendor} database.") + closeOnStop += toClose + } + } + private fun initialiseProtocolLogicFactory(): ProtocolLogicRefFactory { val protocolWhitelist = HashMap>() for (plugin in pluginRegistries) { @@ -321,14 +335,20 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, return service } + // TODO: sort out ordering of open & protected modifiers of functions in this class. + protected open fun makeWalletService(): WalletService = NodeWalletService(services) + open fun stop() { // TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the // network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop() // to indicate "Please shut down gracefully" vs "Shut down now". // Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it // unsubscribes us forcibly, rather than blocking the shutdown process. - net.stop() + // Stop in opposite order to starting + for (toClose in closeOnStop.reversed()) { + toClose.close() + } } protected abstract fun makeMessagingService(): MessagingServiceInternal diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt index 5cdaee7462..96598ba171 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt @@ -8,7 +8,10 @@ import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.PhysicalLocation import com.r3corda.core.node.services.ServiceType +import com.r3corda.core.node.services.WalletService import com.r3corda.core.node.services.testing.MockIdentityService +import com.r3corda.core.node.services.testing.makeTestDataSourceProperties +import com.r3corda.core.testing.InMemoryWalletService import com.r3corda.core.utilities.loggerFor import com.r3corda.node.internal.AbstractNode import com.r3corda.node.services.api.MessagingServiceInternal @@ -84,6 +87,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override fun makeIdentityService() = MockIdentityService(mockNet.identities) + override fun makeWalletService(): WalletService = InMemoryWalletService(services) + override fun startMessagingService() { // Nothing to do } @@ -109,7 +114,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, /** Returns a node, optionally created by the passed factory method. */ fun createNode(networkMapAddress: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory, - start: Boolean = true, legalName: String? = null, keyPair: KeyPair? = null, vararg advertisedServices: ServiceType): MockNode { + start: Boolean = true, legalName: String? = null, keyPair: KeyPair? = null, + databasePersistence: Boolean = false, vararg advertisedServices: ServiceType): MockNode { val newNode = forcedID == -1 val id = if (newNode) counter++ else forcedID @@ -122,6 +128,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override val nearestCity: String = "Atlantis" override val keyStorePassword: String = "dummy" override val trustStorePassword: String = "trustpass" + override val dataSourceProperties: Properties get() = if (databasePersistence) makeTestDataSourceProperties("node_$id") else Properties() } val node = nodeFactory.create(path, config, this, networkMapAddress, advertisedServices.toSet(), id, keyPair) if (start) { @@ -160,12 +167,12 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, fun createTwoNodes(nodeFactory: Factory = defaultFactory, notaryKeyPair: KeyPair? = null): Pair { require(nodes.isEmpty()) return Pair( - createNode(null, -1, nodeFactory, true, null, notaryKeyPair, NetworkMapService.Type, SimpleNotaryService.Type), + createNode(null, -1, nodeFactory, true, null, notaryKeyPair, false, NetworkMapService.Type, SimpleNotaryService.Type), createNode(nodes[0].info, -1, nodeFactory, true, null) ) } - fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null) = createNode(null, -1, defaultFactory, true, legalName, keyPair, NetworkMapService.Type, SimpleNotaryService.Type) + fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null) = createNode(null, -1, defaultFactory, true, legalName, keyPair, false, NetworkMapService.Type, SimpleNotaryService.Type) fun createPartyNode(networkMapAddr: NodeInfo, legalName: String? = null, keyPair: KeyPair? = null) = createNode(networkMapAddr, -1, defaultFactory, true, legalName, keyPair) @Suppress("unused") // This is used from the network visualiser tool. diff --git a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt index 3cefe85e1e..902ade9943 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt @@ -12,11 +12,15 @@ import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.transactions.SimpleNotaryService import com.typesafe.config.Config import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions +import com.typesafe.config.ConfigRenderOptions +import org.slf4j.LoggerFactory import java.nio.file.Path import java.nio.file.Paths import java.time.Clock import java.time.Instant import java.time.LocalDate +import java.util.* import kotlin.reflect.KProperty import kotlin.reflect.jvm.javaType @@ -26,6 +30,28 @@ interface NodeConfiguration { val nearestCity: String val keyStorePassword: String val trustStorePassword: String + val dataSourceProperties: Properties get() = Properties() + + companion object { + val log = LoggerFactory.getLogger("NodeConfiguration") + + fun loadConfig(baseDirectoryPath: Path, configFileOverride: Path? = null, allowMissingConfig: Boolean = false, configOverrides: Map = emptyMap()): Config { + val defaultConfig = ConfigFactory.parseResources("reference.conf") + + val normalisedBaseDir = baseDirectoryPath.normalize() + val configFile = (configFileOverride?.normalize() ?: normalisedBaseDir.resolve("node.conf")).toFile() + val appConfig = ConfigFactory.parseFile(configFile, ConfigParseOptions.defaults().setAllowMissing(allowMissingConfig)) + + val overridesMap = HashMap() // If we do require a few other command line overrides eg for a nicer development experience they would go inside this map. + overridesMap.putAll(configOverrides) + overridesMap["basedir"] = normalisedBaseDir.toAbsolutePath().toString() + val overrideConfig = ConfigFactory.parseMap(overridesMap) + + val mergedAndResolvedConfig = overrideConfig.withFallback(appConfig).withFallback(defaultConfig).resolve() + log.info("config:\n ${mergedAndResolvedConfig.root().render(ConfigRenderOptions.defaults())}") + return mergedAndResolvedConfig + } + } } @Suppress("UNCHECKED_CAST") @@ -40,16 +66,27 @@ operator fun Config.getValue(receiver: Any, metadata: KProperty<*>): T { Instant::class.java -> Instant.parse(getString(metadata.name)) as T HostAndPort::class.java -> HostAndPort.fromString(getString(metadata.name)) as T Path::class.java -> Paths.get(getString(metadata.name)) as T + Properties::class.java -> getProperties(metadata.name) as T else -> throw IllegalArgumentException("Unsupported type ${metadata.returnType}") } } +fun Config.getProperties(path: String): Properties { + val obj = this.getObject(path) + val props = Properties() + for ((property, objectValue) in obj.entries) { + props.setProperty(property, objectValue.unwrapped().toString()) + } + return props +} + class NodeConfigurationFromConfig(val config: Config = ConfigFactory.load()) : NodeConfiguration { override val myLegalName: String by config override val exportJMXto: String by config override val nearestCity: String by config override val keyStorePassword: String by config override val trustStorePassword: String by config + override val dataSourceProperties: Properties by config } class NameServiceConfig(conf: Config) { @@ -65,6 +102,7 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration { override val exportJMXto: String = "http" override val keyStorePassword: String by conf override val trustStorePassword: String by conf + override val dataSourceProperties: Properties by conf val artemisAddress: HostAndPort by conf val webAddress: HostAndPort by conf val messagingServerAddress: HostAndPort? = if (conf.hasPath("messagingServerAddress")) HostAndPort.fromString(conf.getString("messagingServerAddress")) else null @@ -96,4 +134,5 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration { messagingServerAddress ) } -} \ No newline at end of file +} + diff --git a/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt b/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt index b947e20b21..ae6153eecb 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt @@ -1,10 +1,85 @@ package com.r3corda.node.services.wallet +import com.r3corda.core.contracts.ContractState +import com.r3corda.core.contracts.StateAndRef +import com.r3corda.core.contracts.StateRef +import com.r3corda.core.crypto.SecureHash import com.r3corda.core.node.ServiceHub +import com.r3corda.core.node.services.Wallet import com.r3corda.core.testing.InMemoryWalletService +import com.r3corda.core.utilities.loggerFor +import com.r3corda.core.utilities.trace +import com.r3corda.node.utilities.databaseTransaction +import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.SchemaUtils.create /** - * Currently, the node wallet service is just the in-memory wallet service until we have finished evaluating and - * selecting a persistence layer (probably an ORM over a SQL DB). + * Currently, the node wallet service is a very simple RDBMS backed implementation. It will change significantly when + * we add further functionality as the design for the wallet and wallet service matures. + * + * TODO: move query / filter criteria into the database query. + * TODO: keep an audit trail with time stamps of previously unconsumed states "as of" a particular point in time. + * TODO: have transaction storage do some caching. */ -class NodeWalletService(services: ServiceHub) : InMemoryWalletService(services) \ No newline at end of file +class NodeWalletService(services: ServiceHub) : InMemoryWalletService(services) { + + override val log = loggerFor() + + // For now we are just tracking the current state, with no historical reporting ability. + private object UnconsumedStates : Table("vault_unconsumed_states") { + val txhash = binary("transaction_id", 32).primaryKey() + val index = integer("output_index").primaryKey() + } + + init { + // TODO: at some future point, we'll use some schema creation tool to deploy database artifacts if the database + // is not yet initalised to the right version of the schema. + createTablesIfNecessary() + + // Note that our wallet implementation currently does nothing with respect to attempting to apply criteria in the database. + mutex.locked { wallet = Wallet(allUnconsumedStates()) } + + // Now we need to make sure we listen to updates + updates.subscribe { recordUpdate(it) } + } + + private fun recordUpdate(update: Wallet.Update) { + val producedStateRefs = update.produced.map { it.ref } + val consumedStateRefs = update.consumed + log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." } + databaseTransaction { + // Note we also remove the produced in case we are re-inserting in some form of recovery situation. + for (consumed in (consumedStateRefs + producedStateRefs)) { + UnconsumedStates.deleteWhere { + (UnconsumedStates.txhash eq consumed.txhash.bits) and (UnconsumedStates.index eq consumed.index) + } + } + for (produced in producedStateRefs) { + UnconsumedStates.insert { + it[txhash] = produced.txhash.bits + it[index] = produced.index + } + } + } + } + + private fun createTablesIfNecessary() { + log.trace { "Creating database tables if necessary." } + databaseTransaction { + create(UnconsumedStates) + } + } + + private fun allUnconsumedStates(): Iterable> { + // Order by txhash for if and when transaction storage has some caching. + // Map to StateRef and then to StateAndRef. + return databaseTransaction { + UnconsumedStates.selectAll().orderBy(UnconsumedStates.txhash) + .map { StateRef(SecureHash.SHA256(it[UnconsumedStates.txhash]), it[UnconsumedStates.index]) } + .map { + val storedTx = services.storageService.validatedTransactions.getTransaction(it.txhash) ?: throw Error("Found transaction hash ${it.txhash} in unconsumed contract states that is not in transaction storage.") + StateAndRef(storedTx.tx.outputs[it.index], it) + } + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/DatabaseSupport.kt b/node/src/main/kotlin/com/r3corda/node/utilities/DatabaseSupport.kt new file mode 100644 index 0000000000..e423b01288 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/utilities/DatabaseSupport.kt @@ -0,0 +1,19 @@ +package com.r3corda.node.utilities + +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import org.jetbrains.exposed.sql.Database +import org.jetbrains.exposed.sql.Transaction +import java.io.Closeable +import java.util.* + +fun databaseTransaction(statement: Transaction.() -> T): T = org.jetbrains.exposed.sql.transactions.transaction(statement) + +fun configureDatabase(props: Properties): Pair { + val config = HikariConfig(props) + val dataSource = HikariDataSource(config) + val database = Database.connect(dataSource) + // Check not in read-only mode. + check(!database.metadata.isReadOnly) { "Database should not be readonly." } + return Pair(dataSource, database) +} \ No newline at end of file diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt index eddd1f3f94..39d95eb13d 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt @@ -102,7 +102,7 @@ class AttachmentTests { } } } - }, true, null, null, NetworkMapService.Type, SimpleNotaryService.Type) + }, true, null, null, false, NetworkMapService.Type, SimpleNotaryService.Type) val n1 = network.createNode(n0.info) // Insert an attachment into node zero's store directly. diff --git a/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt index 0f4a20a61a..d1273e6dd6 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt @@ -7,6 +7,7 @@ import com.r3corda.core.node.services.* import com.r3corda.core.node.services.testing.MockStorageService import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogicRefFactory +import com.r3corda.core.testing.InMemoryWalletService import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE import com.r3corda.node.serialization.NodeClock import com.r3corda.node.services.api.MessagingServiceInternal @@ -16,7 +17,6 @@ import com.r3corda.node.services.network.MockNetworkMapCache import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.persistence.DataVending import com.r3corda.node.services.statemachine.StateMachineManager -import com.r3corda.node.services.wallet.NodeWalletService import java.time.Clock @Suppress("LeakingThis") @@ -32,7 +32,7 @@ open class MockServiceHubInternal( val overrideClock: Clock? = NodeClock(), val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory() ) : ServiceHubInternal() { - override val walletService: WalletService = customWallet ?: NodeWalletService(this) + override val walletService: WalletService = customWallet ?: InMemoryWalletService(this) override val keyManagementService: KeyManagementService get() = keyManagement ?: throw UnsupportedOperationException() override val identityService: IdentityService diff --git a/node/src/test/kotlin/com/r3corda/node/services/NodeWalletServiceTest.kt b/node/src/test/kotlin/com/r3corda/node/services/NodeWalletServiceTest.kt new file mode 100644 index 0000000000..57cb593f6b --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/services/NodeWalletServiceTest.kt @@ -0,0 +1,71 @@ +package com.r3corda.node.services + +import com.r3corda.contracts.testing.fillWithSomeTestCash +import com.r3corda.core.contracts.DOLLARS +import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.node.services.TxWritableStorageService +import com.r3corda.core.node.services.WalletService +import com.r3corda.core.node.services.testing.MockServices +import com.r3corda.core.node.services.testing.makeTestDataSourceProperties +import com.r3corda.core.testing.DUMMY_NOTARY +import com.r3corda.core.utilities.BriefLogFormatter +import com.r3corda.node.services.wallet.NodeWalletService +import com.r3corda.node.utilities.configureDatabase +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.io.Closeable +import java.util.* + +class NodeWalletServiceTest { + lateinit var dataSource: Closeable + + @Before + fun setUp() { + BriefLogFormatter.loggingOn(NodeWalletService::class) + dataSource = configureDatabase(makeTestDataSourceProperties()).first + } + + @After + fun tearDown() { + dataSource.close() + BriefLogFormatter.loggingOff(NodeWalletService::class) + } + + @Test + fun `states not local to instance`() { + val services1 = object : MockServices() { + override val walletService: WalletService = NodeWalletService(this) + + override fun recordTransactions(txs: Iterable) { + for (stx in txs) { + storageService.validatedTransactions.addTransaction(stx) + walletService.notify(stx.tx) + } + } + } + services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) + + val w1 = services1.walletService.currentWallet + assertThat(w1.states).hasSize(3) + + val originalStorage = services1.storageService + val services2 = object : MockServices() { + override val walletService: WalletService = NodeWalletService(this) + + // We need to be able to find the same transactions as before, too. + override val storageService: TxWritableStorageService get() = originalStorage + + override fun recordTransactions(txs: Iterable) { + for (stx in txs) { + storageService.validatedTransactions.addTransaction(stx) + walletService.notify(stx.tx) + } + } + } + + val w2 = services2.walletService.currentWallet + assertThat(w2.states).hasSize(3) + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/com/r3corda/node/services/WalletWithCashTest.kt b/node/src/test/kotlin/com/r3corda/node/services/WalletWithCashTest.kt index 6742e64e8d..db370b73c6 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/WalletWithCashTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/WalletWithCashTest.kt @@ -8,13 +8,16 @@ import com.r3corda.core.contracts.* import com.r3corda.core.crypto.SecureHash import com.r3corda.core.node.services.WalletService import com.r3corda.core.node.services.testing.MockServices +import com.r3corda.core.node.services.testing.makeTestDataSourceProperties import com.r3corda.core.testing.* import com.r3corda.core.utilities.BriefLogFormatter import com.r3corda.node.services.wallet.NodeWalletService +import com.r3corda.node.utilities.configureDatabase import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.After import org.junit.Before import org.junit.Test +import java.io.Closeable import java.util.* import kotlin.test.assertEquals import kotlin.test.assertNull @@ -24,10 +27,12 @@ import kotlin.test.assertNull class WalletWithCashTest { lateinit var services: MockServices val wallet: WalletService get() = services.walletService + lateinit var dataSource: Closeable @Before fun setUp() { BriefLogFormatter.loggingOn(NodeWalletService::class) + dataSource = configureDatabase(makeTestDataSourceProperties()).first services = object : MockServices() { override val walletService: WalletService = NodeWalletService(this) @@ -42,6 +47,7 @@ class WalletWithCashTest { @After fun tearDown() { + dataSource.close() BriefLogFormatter.loggingOff(NodeWalletService::class) } @@ -51,14 +57,14 @@ class WalletWithCashTest { services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) val w = wallet.currentWallet - assertEquals(3, w.states.size) + assertEquals(3, w.states.toList().size) - val state = w.states[0].state.data as Cash.State + val state = w.states.toList()[0].state.data as Cash.State assertEquals(29.01.DOLLARS `issued by` DUMMY_CASH_ISSUER, state.amount) assertEquals(services.key.public, state.owner) - assertEquals(35.38.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states[2].state.data as Cash.State).amount) - assertEquals(35.61.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states[1].state.data as Cash.State).amount) + assertEquals(35.38.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[2].state.data as Cash.State).amount) + assertEquals(35.61.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[1].state.data as Cash.State).amount) } @Test @@ -109,7 +115,7 @@ class WalletWithCashTest { }.toSignedTransaction() wallet.notify(dummyIssue.tx) - assertEquals(1, wallet.currentWallet.states.size) + assertEquals(1, wallet.currentWallet.states.toList().size) // Issue another linear state of the same thread (nonce different) val dummyIssue2 = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply { @@ -120,7 +126,7 @@ class WalletWithCashTest { assertThatThrownBy { wallet.notify(dummyIssue2.tx) } - assertEquals(1, wallet.currentWallet.states.size) + assertEquals(1, wallet.currentWallet.states.toList().size) } @Test @@ -136,7 +142,7 @@ class WalletWithCashTest { }.toSignedTransaction() wallet.notify(dummyIssue.tx) - assertEquals(1, wallet.currentWallet.states.size) + assertEquals(1, wallet.currentWallet.states.toList().size) // Move the same state val dummyMove = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply { @@ -146,6 +152,6 @@ class WalletWithCashTest { }.toSignedTransaction() wallet.notify(dummyMove.tx) - assertEquals(1, wallet.currentWallet.states.size) + assertEquals(1, wallet.currentWallet.states.toList().size) } } diff --git a/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt index f09ebb49ce..d80038d14f 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt @@ -45,7 +45,7 @@ class DataVendingServiceTests { // Complete the cash transaction, and then manually relay it ptx.signWith(registerNode.services.storageService.myLegalIdentityKey) val tx = ptx.toSignedTransaction() - assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size) + assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size) val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity, walletServiceNode.info, tx) @@ -77,7 +77,7 @@ class DataVendingServiceTests { // The transaction tries issuing MEGA_CORP cash, but we aren't the issuer, so it's invalid ptx.signWith(registerNode.services.storageService.myLegalIdentityKey) val tx = ptx.toSignedTransaction(false) - assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size) + assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size) val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity, walletServiceNode.info, tx) @@ -87,6 +87,6 @@ class DataVendingServiceTests { assertTrue(ex.cause is DataVending.Service.TransactionRejectedError) // Check the transaction is not in the receiving node - assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size) + assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size) } } \ No newline at end of file diff --git a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt index 51dbf3a2a3..c760b2951e 100644 --- a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt @@ -26,13 +26,11 @@ import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.transactions.SimpleNotaryService -import com.typesafe.config.ConfigFactory import joptsimple.OptionParser import joptsimple.OptionSet import org.apache.commons.io.IOUtils import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.io.File import java.net.URL import java.nio.file.Files import java.nio.file.Path @@ -308,8 +306,8 @@ private fun setup(params: CliParams.SetupNode): Int { dirFile.mkdirs() } - val configFile = params.dir.resolve("config").toFile() - val config = loadConfigFile(configFile, params.defaultLegalName) + val configFile = params.dir.resolve("config") + val config = loadConfigFile(params.dir, configFile, params.defaultLegalName) if (!Files.exists(params.dir.resolve(AbstractNode.PUBLIC_IDENTITY_FILE_NAME))) { createIdentities(params, config) } @@ -445,18 +443,16 @@ private fun getNodeConfig(cliParams: CliParams.RunNode): NodeConfiguration { throw NotSetupException("Missing identity file. Please run node setup before running the node") } - val configFile = cliParams.dir.resolve("config").toFile() - return loadConfigFile(configFile, cliParams.defaultLegalName) + val configFile = cliParams.dir.resolve("config") + return loadConfigFile(cliParams.dir, configFile, cliParams.defaultLegalName) } -private fun loadConfigFile(configFile: File, defaultLegalName: String): NodeConfiguration { - if (!configFile.exists()) { +private fun loadConfigFile(baseDir: Path, configFile: Path, defaultLegalName: String): NodeConfiguration { + if (!Files.exists(configFile)) { createDefaultConfigFile(configFile, defaultLegalName) log.warn("Default config created at $configFile.") } - - val config = ConfigFactory.parseFile(configFile).withFallback(ConfigFactory.load()) - return NodeConfigurationFromConfig(config) + return NodeConfigurationFromConfig(NodeConfiguration.loadConfig(baseDir, configFileOverride = configFile)) } private fun createIdentities(params: CliParams.SetupNode, nodeConf: NodeConfiguration) { @@ -466,8 +462,8 @@ private fun createIdentities(params: CliParams.SetupNode, nodeConf: NodeConfigur node.stop() } -private fun createDefaultConfigFile(configFile: File, legalName: String) { - configFile.writeBytes( +private fun createDefaultConfigFile(configFile: Path, legalName: String) { + Files.write(configFile, """ myLegalName = $legalName """.trimIndent().toByteArray()) diff --git a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt index 2f5da40976..00b373b4b2 100644 --- a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt @@ -23,6 +23,7 @@ import com.r3corda.core.utilities.BriefLogFormatter import com.r3corda.core.utilities.Emoji import com.r3corda.core.utilities.ProgressTracker import com.r3corda.node.internal.Node +import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.network.NetworkMapService @@ -30,7 +31,6 @@ import com.r3corda.node.services.persistence.NodeAttachmentService import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.protocols.NotaryProtocol import com.r3corda.protocols.TwoPartyTradeProtocol -import com.typesafe.config.ConfigFactory import joptsimple.OptionParser import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -126,8 +126,7 @@ fun runTraderDemo(args: Array): Int { Role.BUYER -> "Bank A" Role.SELLER -> "Bank B" } - val override = ConfigFactory.parseString("myLegalName = $myLegalName") - NodeConfigurationFromConfig(override.withFallback(ConfigFactory.load())) + NodeConfigurationFromConfig(NodeConfiguration.loadConfig(directory, allowMissingConfig = true, configOverrides = mapOf("myLegalName" to myLegalName))) } // Which services will this instance of the node provide to the network? @@ -275,7 +274,8 @@ private class TraderDemoProtocolBuyer(val otherSide: Party, private fun logIssuanceAttachment(tradeTX: SignedTransaction) { // Find the original CP issuance. val search = TransactionGraphSearch(serviceHub.storageService.validatedTransactions, listOf(tradeTX.tx)) - search.query = TransactionGraphSearch.Query(withCommandOfType = CommercialPaper.Commands.Issue::class.java) + search.query = TransactionGraphSearch.Query(withCommandOfType = CommercialPaper.Commands.Issue::class.java, + followInputsOfType = CommercialPaper.State::class.java) val cpIssuance = search.call().single() cpIssuance.attachments.first().let { diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 27851335e1..f4a9758acc 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -2,4 +2,10 @@ myLegalName = "Vast Global MegaCorp, Ltd" exportJMXto = "http" nearestCity = "The Moon" keyStorePassword = "cordacadevpass" -trustStorePassword = "trustpass" \ No newline at end of file +trustStorePassword = "trustpass" +dataSourceProperties = { + dataSourceClassName = org.h2.jdbcx.JdbcDataSource + "dataSource.url" = "jdbc:h2:"${basedir}"/persistence" + "dataSource.user" = sa + "dataSource.password" = "" +} \ No newline at end of file