mirror of
https://github.com/corda/corda.git
synced 2025-01-20 11:39:09 +00:00
Merged in cor-258-wallet-persistence (pull request #265)
Basic wallet persistence to embedded H2 disk-backed database
This commit is contained in:
commit
8bc3bdd430
@ -58,6 +58,9 @@ repositories {
|
||||
url 'http://oss.sonatype.org/content/repositories/snapshots'
|
||||
}
|
||||
jcenter()
|
||||
maven {
|
||||
url 'https://dl.bintray.com/kotlin/exposed'
|
||||
}
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
|
@ -280,7 +280,7 @@ data class StateRef(val txhash: SecureHash, val index: Int) {
|
||||
data class StateAndRef<out T : ContractState>(val state: TransactionState<T>, val ref: StateRef)
|
||||
|
||||
/** Filters a list of [StateAndRef] objects according to the type of the states */
|
||||
inline fun <reified T : ContractState> List<StateAndRef<ContractState>>.filterStatesOfType(): List<StateAndRef<T>> {
|
||||
inline fun <reified T : ContractState> Iterable<StateAndRef<ContractState>>.filterStatesOfType(): List<StateAndRef<T>> {
|
||||
return mapNotNull { if (it.state.data is T) StateAndRef(TransactionState(it.state.data, it.state.notary), it.ref) else null }
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,8 @@ import java.util.concurrent.Callable
|
||||
class TransactionGraphSearch(val transactions: ReadOnlyTransactionStorage,
|
||||
val startPoints: List<WireTransaction>) : Callable<List<WireTransaction>> {
|
||||
class Query(
|
||||
val withCommandOfType: Class<out CommandData>? = null
|
||||
val withCommandOfType: Class<out CommandData>? = null,
|
||||
val followInputsOfType: Class<out ContractState>? = null
|
||||
)
|
||||
|
||||
var query: Query = Query()
|
||||
@ -27,19 +28,22 @@ class TransactionGraphSearch(val transactions: ReadOnlyTransactionStorage,
|
||||
override fun call(): List<WireTransaction> {
|
||||
val q = query
|
||||
|
||||
val next = ArrayList<SecureHash>()
|
||||
next += startPoints.flatMap { it.inputs.map { it.txhash } }
|
||||
val alreadyVisited = HashSet<SecureHash>()
|
||||
val next = ArrayList<WireTransaction>(startPoints)
|
||||
|
||||
val results = ArrayList<WireTransaction>()
|
||||
|
||||
while (next.isNotEmpty()) {
|
||||
val hash = next.removeAt(next.lastIndex)
|
||||
val tx = transactions.getTransaction(hash)?.tx ?: continue
|
||||
val tx = next.removeAt(next.lastIndex)
|
||||
|
||||
if (q.matches(tx))
|
||||
results += tx
|
||||
|
||||
next += tx.inputs.map { it.txhash }
|
||||
val inputsLeadingToUnvisitedTx: Iterable<StateRef> = tx.inputs.filter { it.txhash !in alreadyVisited }
|
||||
val unvisitedInputTxs: Map<SecureHash, SignedTransaction> = inputsLeadingToUnvisitedTx.map { it.txhash }.toHashSet().map { transactions.getTransaction(it) }.filterNotNull().associateBy { it.txBits.hash }
|
||||
val unvisitedInputTxsWithInputIndex: Iterable<Pair<SignedTransaction, Int>> = inputsLeadingToUnvisitedTx.filter { it.txhash in unvisitedInputTxs.keys }.map { Pair(unvisitedInputTxs[it.txhash]!!, it.index) }
|
||||
next += (unvisitedInputTxsWithInputIndex.filter { q.followInputsOfType == null || it.first.tx.outputs[it.second].data.javaClass == q.followInputsOfType }
|
||||
.map { it.first }.filter { alreadyVisited.add(it.txBits.hash) }.map { it.tx })
|
||||
}
|
||||
|
||||
return results
|
||||
|
@ -32,7 +32,7 @@ val DEFAULT_SESSION_ID = 0L
|
||||
* Active means they haven't been consumed yet (or we don't know about it).
|
||||
* Relevant means they contain at least one of our pubkeys.
|
||||
*/
|
||||
class Wallet(val states: List<StateAndRef<ContractState>>) {
|
||||
class Wallet(val states: Iterable<StateAndRef<ContractState>>) {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
inline fun <reified T : OwnableState> statesOfType() = states.filter { it.state.data is T } as List<StateAndRef<T>>
|
||||
|
||||
|
@ -128,3 +128,17 @@ class MockStorageService(override val attachments: AttachmentStorage = MockAttac
|
||||
override val myLegalIdentityKey: KeyPair = generateKeyPair(),
|
||||
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public))
|
||||
: SingletonSerializeAsToken(), TxWritableStorageService
|
||||
|
||||
/**
|
||||
* Make properties appropriate for creating a DataSource for unit tests.
|
||||
*
|
||||
* @param nodeName Reflects the "instance" of the in-memory database. Defaults to a random string.
|
||||
*/
|
||||
fun makeTestDataSourceProperties(nodeName: String = SecureHash.randomSHA256().toString()): Properties {
|
||||
val props = Properties()
|
||||
props.setProperty("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource")
|
||||
props.setProperty("dataSource.url", "jdbc:h2:mem:${nodeName}_persistence")
|
||||
props.setProperty("dataSource.user", "sa")
|
||||
props.setProperty("dataSource.password", "")
|
||||
return props
|
||||
}
|
@ -21,19 +21,20 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* states relevant to us into a database and once such a wallet is implemented, this scaffolding can be removed.
|
||||
*/
|
||||
@ThreadSafe
|
||||
open class InMemoryWalletService(private val services: ServiceHub) : SingletonSerializeAsToken(), WalletService {
|
||||
open class InMemoryWalletService(protected val services: ServiceHub) : SingletonSerializeAsToken(), WalletService {
|
||||
class ClashingThreads(threads: Set<SecureHash>, transactions: Iterable<WireTransaction>) :
|
||||
Exception("There are multiple linear head states after processing transactions $transactions. The clashing thread(s): $threads")
|
||||
private val log = loggerFor<InMemoryWalletService>()
|
||||
|
||||
open protected val log = loggerFor<InMemoryWalletService>()
|
||||
|
||||
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
|
||||
// inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference
|
||||
// to wallet somewhere.
|
||||
private class InnerState {
|
||||
var wallet = Wallet(emptyList<StateAndRef<OwnableState>>())
|
||||
protected class InnerState {
|
||||
var wallet = Wallet(emptyList<StateAndRef<ContractState>>())
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
protected val mutex = ThreadBox(InnerState())
|
||||
|
||||
override val currentWallet: Wallet get() = mutex.locked { wallet }
|
||||
|
||||
@ -77,6 +78,9 @@ open class InMemoryWalletService(private val services: ServiceHub) : SingletonSe
|
||||
Pair(wallet, combinedDelta)
|
||||
}
|
||||
|
||||
// TODO: we need to remove the clashing threads concepts and support potential duplicate threads
|
||||
// because two different nodes can have two different sets of threads and so currently it's possible
|
||||
// for only one party to have a clash which interferes with determinism of the transactions.
|
||||
val clashingThreads = walletAndNetDelta.first.clashingThreads
|
||||
if (!clashingThreads.isEmpty()) {
|
||||
throw ClashingThreads(clashingThreads, txns)
|
||||
|
@ -8,6 +8,9 @@ repositories {
|
||||
url 'http://oss.sonatype.org/content/repositories/snapshots'
|
||||
}
|
||||
jcenter()
|
||||
maven {
|
||||
url 'https://dl.bintray.com/kotlin/exposed'
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -101,6 +104,15 @@ dependencies {
|
||||
// Unit testing helpers.
|
||||
testCompile 'junit:junit:4.12'
|
||||
testCompile "org.assertj:assertj-core:${assertj_version}"
|
||||
|
||||
// For H2 database support in persistence
|
||||
compile "com.h2database:h2:1.3.176"
|
||||
|
||||
// Exposed: Kotlin SQL library - under evaluation
|
||||
compile "org.jetbrains.exposed:exposed:0.5.0"
|
||||
|
||||
// SQL connection pooling library
|
||||
compile "com.zaxxer:HikariCP:2.4.7"
|
||||
}
|
||||
|
||||
quasarScan.dependsOn('classes', ':core:classes', ':contracts:classes')
|
||||
|
@ -1,16 +1,13 @@
|
||||
package com.r3corda.node
|
||||
|
||||
import com.r3corda.node.services.config.FullNodeConfiguration
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import com.typesafe.config.ConfigRenderOptions
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import joptsimple.OptionParser
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.File
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.net.InetAddress
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.util.*
|
||||
|
||||
val log = LoggerFactory.getLogger("Main")
|
||||
|
||||
@ -38,26 +35,8 @@ fun main(args: Array<String>) {
|
||||
}
|
||||
|
||||
val baseDirectoryPath = if (cmdlineOptions.has(ParamsSpec.baseDirectoryArg)) Paths.get(cmdlineOptions.valueOf(ParamsSpec.baseDirectoryArg)) else Paths.get(".").normalize()
|
||||
|
||||
val defaultConfig = ConfigFactory.parseResources("reference.conf")
|
||||
|
||||
val configFile = if (cmdlineOptions.has(ParamsSpec.configFileArg)) {
|
||||
File(cmdlineOptions.valueOf(ParamsSpec.configFileArg))
|
||||
} else {
|
||||
baseDirectoryPath.resolve("node.conf").normalize().toFile()
|
||||
}
|
||||
val appConfig = ConfigFactory.parseFile(configFile)
|
||||
|
||||
val cmdlineOverrideMap = HashMap<String, Any?>() // If we do require a few other command line overrides eg for a nicer development experience they would go inside this map.
|
||||
if (cmdlineOptions.has(ParamsSpec.baseDirectoryArg)) {
|
||||
cmdlineOverrideMap.put("basedir", baseDirectoryPath.toString())
|
||||
}
|
||||
val overrideConfig = ConfigFactory.parseMap(cmdlineOverrideMap)
|
||||
|
||||
val mergedAndResolvedConfig = overrideConfig.withFallback(appConfig).withFallback(defaultConfig).resolve()
|
||||
|
||||
log.info("config:\n ${mergedAndResolvedConfig.root().render(ConfigRenderOptions.defaults())}")
|
||||
val conf = FullNodeConfiguration(mergedAndResolvedConfig)
|
||||
val configFile = if (cmdlineOptions.has(ParamsSpec.configFileArg)) Paths.get(cmdlineOptions.valueOf(ParamsSpec.configFileArg)) else null
|
||||
val conf = FullNodeConfiguration(NodeConfiguration.loadConfig(baseDirectoryPath, configFile))
|
||||
val dir = conf.basedir.toAbsolutePath().normalize()
|
||||
logInfo(args, dir)
|
||||
|
||||
|
@ -46,7 +46,9 @@ import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.node.utilities.ANSIProgressObserver
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import com.r3corda.node.utilities.configureDatabase
|
||||
import org.slf4j.Logger
|
||||
import java.io.Closeable
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
@ -130,6 +132,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
lateinit var scheduler: SchedulerService
|
||||
lateinit var protocolLogicFactory: ProtocolLogicRefFactory
|
||||
val customServices: ArrayList<Any> = ArrayList()
|
||||
protected val closeOnStop: ArrayList<Closeable> = ArrayList()
|
||||
|
||||
/** Locates and returns a service of the given type if loaded, or throws an exception if not found. */
|
||||
inline fun <reified T: Any> findService() = customServices.filterIsInstance<T>().single()
|
||||
@ -155,12 +158,13 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
require(!started) { "Node has already been started" }
|
||||
log.info("Node starting up ...")
|
||||
|
||||
initialiseDatabasePersistence()
|
||||
val storageServices = initialiseStorageService(dir)
|
||||
storage = storageServices.first
|
||||
checkpointStorage = storageServices.second
|
||||
net = makeMessagingService()
|
||||
netMapCache = InMemoryNetworkMapCache()
|
||||
wallet = NodeWalletService(services)
|
||||
wallet = makeWalletService()
|
||||
|
||||
identity = makeIdentityService()
|
||||
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
|
||||
@ -199,6 +203,16 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
return this
|
||||
}
|
||||
|
||||
private fun initialiseDatabasePersistence() {
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isNotEmpty()) {
|
||||
val (toClose, database) = configureDatabase(props)
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
log.info("Connected to ${database.vendor} database.")
|
||||
closeOnStop += toClose
|
||||
}
|
||||
}
|
||||
|
||||
private fun initialiseProtocolLogicFactory(): ProtocolLogicRefFactory {
|
||||
val protocolWhitelist = HashMap<String, Set<String>>()
|
||||
for (plugin in pluginRegistries) {
|
||||
@ -321,14 +335,20 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
return service
|
||||
}
|
||||
|
||||
// TODO: sort out ordering of open & protected modifiers of functions in this class.
|
||||
protected open fun makeWalletService(): WalletService = NodeWalletService(services)
|
||||
|
||||
open fun stop() {
|
||||
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the
|
||||
// network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop()
|
||||
// to indicate "Please shut down gracefully" vs "Shut down now".
|
||||
// Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it
|
||||
// unsubscribes us forcibly, rather than blocking the shutdown process.
|
||||
|
||||
net.stop()
|
||||
// Stop in opposite order to starting
|
||||
for (toClose in closeOnStop.reversed()) {
|
||||
toClose.close()
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract fun makeMessagingService(): MessagingServiceInternal
|
||||
|
@ -8,7 +8,10 @@ import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.PhysicalLocation
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.WalletService
|
||||
import com.r3corda.core.node.services.testing.MockIdentityService
|
||||
import com.r3corda.core.node.services.testing.makeTestDataSourceProperties
|
||||
import com.r3corda.core.testing.InMemoryWalletService
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.internal.AbstractNode
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
@ -84,6 +87,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
|
||||
override fun makeIdentityService() = MockIdentityService(mockNet.identities)
|
||||
|
||||
override fun makeWalletService(): WalletService = InMemoryWalletService(services)
|
||||
|
||||
override fun startMessagingService() {
|
||||
// Nothing to do
|
||||
}
|
||||
@ -109,7 +114,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
|
||||
/** Returns a node, optionally created by the passed factory method. */
|
||||
fun createNode(networkMapAddress: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory,
|
||||
start: Boolean = true, legalName: String? = null, keyPair: KeyPair? = null, vararg advertisedServices: ServiceType): MockNode {
|
||||
start: Boolean = true, legalName: String? = null, keyPair: KeyPair? = null,
|
||||
databasePersistence: Boolean = false, vararg advertisedServices: ServiceType): MockNode {
|
||||
val newNode = forcedID == -1
|
||||
val id = if (newNode) counter++ else forcedID
|
||||
|
||||
@ -122,6 +128,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
override val nearestCity: String = "Atlantis"
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val dataSourceProperties: Properties get() = if (databasePersistence) makeTestDataSourceProperties("node_$id") else Properties()
|
||||
}
|
||||
val node = nodeFactory.create(path, config, this, networkMapAddress, advertisedServices.toSet(), id, keyPair)
|
||||
if (start) {
|
||||
@ -160,12 +167,12 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
fun createTwoNodes(nodeFactory: Factory = defaultFactory, notaryKeyPair: KeyPair? = null): Pair<MockNode, MockNode> {
|
||||
require(nodes.isEmpty())
|
||||
return Pair(
|
||||
createNode(null, -1, nodeFactory, true, null, notaryKeyPair, NetworkMapService.Type, SimpleNotaryService.Type),
|
||||
createNode(null, -1, nodeFactory, true, null, notaryKeyPair, false, NetworkMapService.Type, SimpleNotaryService.Type),
|
||||
createNode(nodes[0].info, -1, nodeFactory, true, null)
|
||||
)
|
||||
}
|
||||
|
||||
fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null) = createNode(null, -1, defaultFactory, true, legalName, keyPair, NetworkMapService.Type, SimpleNotaryService.Type)
|
||||
fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null) = createNode(null, -1, defaultFactory, true, legalName, keyPair, false, NetworkMapService.Type, SimpleNotaryService.Type)
|
||||
fun createPartyNode(networkMapAddr: NodeInfo, legalName: String? = null, keyPair: KeyPair? = null) = createNode(networkMapAddr, -1, defaultFactory, true, legalName, keyPair)
|
||||
|
||||
@Suppress("unused") // This is used from the network visualiser tool.
|
||||
|
@ -12,11 +12,15 @@ import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import com.typesafe.config.ConfigParseOptions
|
||||
import com.typesafe.config.ConfigRenderOptions
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.time.LocalDate
|
||||
import java.util.*
|
||||
import kotlin.reflect.KProperty
|
||||
import kotlin.reflect.jvm.javaType
|
||||
|
||||
@ -26,6 +30,28 @@ interface NodeConfiguration {
|
||||
val nearestCity: String
|
||||
val keyStorePassword: String
|
||||
val trustStorePassword: String
|
||||
val dataSourceProperties: Properties get() = Properties()
|
||||
|
||||
companion object {
|
||||
val log = LoggerFactory.getLogger("NodeConfiguration")
|
||||
|
||||
fun loadConfig(baseDirectoryPath: Path, configFileOverride: Path? = null, allowMissingConfig: Boolean = false, configOverrides: Map<String, Any?> = emptyMap()): Config {
|
||||
val defaultConfig = ConfigFactory.parseResources("reference.conf")
|
||||
|
||||
val normalisedBaseDir = baseDirectoryPath.normalize()
|
||||
val configFile = (configFileOverride?.normalize() ?: normalisedBaseDir.resolve("node.conf")).toFile()
|
||||
val appConfig = ConfigFactory.parseFile(configFile, ConfigParseOptions.defaults().setAllowMissing(allowMissingConfig))
|
||||
|
||||
val overridesMap = HashMap<String, Any?>() // If we do require a few other command line overrides eg for a nicer development experience they would go inside this map.
|
||||
overridesMap.putAll(configOverrides)
|
||||
overridesMap["basedir"] = normalisedBaseDir.toAbsolutePath().toString()
|
||||
val overrideConfig = ConfigFactory.parseMap(overridesMap)
|
||||
|
||||
val mergedAndResolvedConfig = overrideConfig.withFallback(appConfig).withFallback(defaultConfig).resolve()
|
||||
log.info("config:\n ${mergedAndResolvedConfig.root().render(ConfigRenderOptions.defaults())}")
|
||||
return mergedAndResolvedConfig
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
@ -40,16 +66,27 @@ operator fun <T> Config.getValue(receiver: Any, metadata: KProperty<*>): T {
|
||||
Instant::class.java -> Instant.parse(getString(metadata.name)) as T
|
||||
HostAndPort::class.java -> HostAndPort.fromString(getString(metadata.name)) as T
|
||||
Path::class.java -> Paths.get(getString(metadata.name)) as T
|
||||
Properties::class.java -> getProperties(metadata.name) as T
|
||||
else -> throw IllegalArgumentException("Unsupported type ${metadata.returnType}")
|
||||
}
|
||||
}
|
||||
|
||||
fun Config.getProperties(path: String): Properties {
|
||||
val obj = this.getObject(path)
|
||||
val props = Properties()
|
||||
for ((property, objectValue) in obj.entries) {
|
||||
props.setProperty(property, objectValue.unwrapped().toString())
|
||||
}
|
||||
return props
|
||||
}
|
||||
|
||||
class NodeConfigurationFromConfig(val config: Config = ConfigFactory.load()) : NodeConfiguration {
|
||||
override val myLegalName: String by config
|
||||
override val exportJMXto: String by config
|
||||
override val nearestCity: String by config
|
||||
override val keyStorePassword: String by config
|
||||
override val trustStorePassword: String by config
|
||||
override val dataSourceProperties: Properties by config
|
||||
}
|
||||
|
||||
class NameServiceConfig(conf: Config) {
|
||||
@ -65,6 +102,7 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration {
|
||||
override val exportJMXto: String = "http"
|
||||
override val keyStorePassword: String by conf
|
||||
override val trustStorePassword: String by conf
|
||||
override val dataSourceProperties: Properties by conf
|
||||
val artemisAddress: HostAndPort by conf
|
||||
val webAddress: HostAndPort by conf
|
||||
val messagingServerAddress: HostAndPort? = if (conf.hasPath("messagingServerAddress")) HostAndPort.fromString(conf.getString("messagingServerAddress")) else null
|
||||
@ -96,4 +134,5 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration {
|
||||
messagingServerAddress
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,10 +1,85 @@
|
||||
package com.r3corda.node.services.wallet
|
||||
|
||||
import com.r3corda.core.contracts.ContractState
|
||||
import com.r3corda.core.contracts.StateAndRef
|
||||
import com.r3corda.core.contracts.StateRef
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.services.Wallet
|
||||
import com.r3corda.core.testing.InMemoryWalletService
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import org.jetbrains.exposed.sql.SchemaUtils.create
|
||||
|
||||
/**
|
||||
* Currently, the node wallet service is just the in-memory wallet service until we have finished evaluating and
|
||||
* selecting a persistence layer (probably an ORM over a SQL DB).
|
||||
* Currently, the node wallet service is a very simple RDBMS backed implementation. It will change significantly when
|
||||
* we add further functionality as the design for the wallet and wallet service matures.
|
||||
*
|
||||
* TODO: move query / filter criteria into the database query.
|
||||
* TODO: keep an audit trail with time stamps of previously unconsumed states "as of" a particular point in time.
|
||||
* TODO: have transaction storage do some caching.
|
||||
*/
|
||||
class NodeWalletService(services: ServiceHub) : InMemoryWalletService(services)
|
||||
class NodeWalletService(services: ServiceHub) : InMemoryWalletService(services) {
|
||||
|
||||
override val log = loggerFor<NodeWalletService>()
|
||||
|
||||
// For now we are just tracking the current state, with no historical reporting ability.
|
||||
private object UnconsumedStates : Table("vault_unconsumed_states") {
|
||||
val txhash = binary("transaction_id", 32).primaryKey()
|
||||
val index = integer("output_index").primaryKey()
|
||||
}
|
||||
|
||||
init {
|
||||
// TODO: at some future point, we'll use some schema creation tool to deploy database artifacts if the database
|
||||
// is not yet initalised to the right version of the schema.
|
||||
createTablesIfNecessary()
|
||||
|
||||
// Note that our wallet implementation currently does nothing with respect to attempting to apply criteria in the database.
|
||||
mutex.locked { wallet = Wallet(allUnconsumedStates()) }
|
||||
|
||||
// Now we need to make sure we listen to updates
|
||||
updates.subscribe { recordUpdate(it) }
|
||||
}
|
||||
|
||||
private fun recordUpdate(update: Wallet.Update) {
|
||||
val producedStateRefs = update.produced.map { it.ref }
|
||||
val consumedStateRefs = update.consumed
|
||||
log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." }
|
||||
databaseTransaction {
|
||||
// Note we also remove the produced in case we are re-inserting in some form of recovery situation.
|
||||
for (consumed in (consumedStateRefs + producedStateRefs)) {
|
||||
UnconsumedStates.deleteWhere {
|
||||
(UnconsumedStates.txhash eq consumed.txhash.bits) and (UnconsumedStates.index eq consumed.index)
|
||||
}
|
||||
}
|
||||
for (produced in producedStateRefs) {
|
||||
UnconsumedStates.insert {
|
||||
it[txhash] = produced.txhash.bits
|
||||
it[index] = produced.index
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun createTablesIfNecessary() {
|
||||
log.trace { "Creating database tables if necessary." }
|
||||
databaseTransaction {
|
||||
create(UnconsumedStates)
|
||||
}
|
||||
}
|
||||
|
||||
private fun allUnconsumedStates(): Iterable<StateAndRef<ContractState>> {
|
||||
// Order by txhash for if and when transaction storage has some caching.
|
||||
// Map to StateRef and then to StateAndRef.
|
||||
return databaseTransaction {
|
||||
UnconsumedStates.selectAll().orderBy(UnconsumedStates.txhash)
|
||||
.map { StateRef(SecureHash.SHA256(it[UnconsumedStates.txhash]), it[UnconsumedStates.index]) }
|
||||
.map {
|
||||
val storedTx = services.storageService.validatedTransactions.getTransaction(it.txhash) ?: throw Error("Found transaction hash ${it.txhash} in unconsumed contract states that is not in transaction storage.")
|
||||
StateAndRef(storedTx.tx.outputs[it.index], it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package com.r3corda.node.utilities
|
||||
|
||||
import com.zaxxer.hikari.HikariConfig
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import java.io.Closeable
|
||||
import java.util.*
|
||||
|
||||
fun <T> databaseTransaction(statement: Transaction.() -> T): T = org.jetbrains.exposed.sql.transactions.transaction(statement)
|
||||
|
||||
fun configureDatabase(props: Properties): Pair<Closeable, Database> {
|
||||
val config = HikariConfig(props)
|
||||
val dataSource = HikariDataSource(config)
|
||||
val database = Database.connect(dataSource)
|
||||
// Check not in read-only mode.
|
||||
check(!database.metadata.isReadOnly) { "Database should not be readonly." }
|
||||
return Pair(dataSource, database)
|
||||
}
|
@ -102,7 +102,7 @@ class AttachmentTests {
|
||||
}
|
||||
}
|
||||
}
|
||||
}, true, null, null, NetworkMapService.Type, SimpleNotaryService.Type)
|
||||
}, true, null, null, false, NetworkMapService.Type, SimpleNotaryService.Type)
|
||||
val n1 = network.createNode(n0.info)
|
||||
|
||||
// Insert an attachment into node zero's store directly.
|
||||
|
@ -7,6 +7,7 @@ import com.r3corda.core.node.services.*
|
||||
import com.r3corda.core.node.services.testing.MockStorageService
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.ProtocolLogicRefFactory
|
||||
import com.r3corda.core.testing.InMemoryWalletService
|
||||
import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE
|
||||
import com.r3corda.node.serialization.NodeClock
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
@ -16,7 +17,6 @@ import com.r3corda.node.services.network.MockNetworkMapCache
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.persistence.DataVending
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import java.time.Clock
|
||||
|
||||
@Suppress("LeakingThis")
|
||||
@ -32,7 +32,7 @@ open class MockServiceHubInternal(
|
||||
val overrideClock: Clock? = NodeClock(),
|
||||
val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory()
|
||||
) : ServiceHubInternal() {
|
||||
override val walletService: WalletService = customWallet ?: NodeWalletService(this)
|
||||
override val walletService: WalletService = customWallet ?: InMemoryWalletService(this)
|
||||
override val keyManagementService: KeyManagementService
|
||||
get() = keyManagement ?: throw UnsupportedOperationException()
|
||||
override val identityService: IdentityService
|
||||
|
@ -0,0 +1,71 @@
|
||||
package com.r3corda.node.services
|
||||
|
||||
import com.r3corda.contracts.testing.fillWithSomeTestCash
|
||||
import com.r3corda.core.contracts.DOLLARS
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.r3corda.core.node.services.TxWritableStorageService
|
||||
import com.r3corda.core.node.services.WalletService
|
||||
import com.r3corda.core.node.services.testing.MockServices
|
||||
import com.r3corda.core.node.services.testing.makeTestDataSourceProperties
|
||||
import com.r3corda.core.testing.DUMMY_NOTARY
|
||||
import com.r3corda.core.utilities.BriefLogFormatter
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.node.utilities.configureDatabase
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.util.*
|
||||
|
||||
class NodeWalletServiceTest {
|
||||
lateinit var dataSource: Closeable
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
BriefLogFormatter.loggingOn(NodeWalletService::class)
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
dataSource.close()
|
||||
BriefLogFormatter.loggingOff(NodeWalletService::class)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `states not local to instance`() {
|
||||
val services1 = object : MockServices() {
|
||||
override val walletService: WalletService = NodeWalletService(this)
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
storageService.validatedTransactions.addTransaction(stx)
|
||||
walletService.notify(stx.tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
|
||||
val w1 = services1.walletService.currentWallet
|
||||
assertThat(w1.states).hasSize(3)
|
||||
|
||||
val originalStorage = services1.storageService
|
||||
val services2 = object : MockServices() {
|
||||
override val walletService: WalletService = NodeWalletService(this)
|
||||
|
||||
// We need to be able to find the same transactions as before, too.
|
||||
override val storageService: TxWritableStorageService get() = originalStorage
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
storageService.validatedTransactions.addTransaction(stx)
|
||||
walletService.notify(stx.tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val w2 = services2.walletService.currentWallet
|
||||
assertThat(w2.states).hasSize(3)
|
||||
}
|
||||
}
|
@ -8,13 +8,16 @@ import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.node.services.WalletService
|
||||
import com.r3corda.core.node.services.testing.MockServices
|
||||
import com.r3corda.core.node.services.testing.makeTestDataSourceProperties
|
||||
import com.r3corda.core.testing.*
|
||||
import com.r3corda.core.utilities.BriefLogFormatter
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.node.utilities.configureDatabase
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNull
|
||||
@ -24,10 +27,12 @@ import kotlin.test.assertNull
|
||||
class WalletWithCashTest {
|
||||
lateinit var services: MockServices
|
||||
val wallet: WalletService get() = services.walletService
|
||||
lateinit var dataSource: Closeable
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
BriefLogFormatter.loggingOn(NodeWalletService::class)
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
services = object : MockServices() {
|
||||
override val walletService: WalletService = NodeWalletService(this)
|
||||
|
||||
@ -42,6 +47,7 @@ class WalletWithCashTest {
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
dataSource.close()
|
||||
BriefLogFormatter.loggingOff(NodeWalletService::class)
|
||||
}
|
||||
|
||||
@ -51,14 +57,14 @@ class WalletWithCashTest {
|
||||
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
|
||||
val w = wallet.currentWallet
|
||||
assertEquals(3, w.states.size)
|
||||
assertEquals(3, w.states.toList().size)
|
||||
|
||||
val state = w.states[0].state.data as Cash.State
|
||||
val state = w.states.toList()[0].state.data as Cash.State
|
||||
assertEquals(29.01.DOLLARS `issued by` DUMMY_CASH_ISSUER, state.amount)
|
||||
assertEquals(services.key.public, state.owner)
|
||||
|
||||
assertEquals(35.38.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states[2].state.data as Cash.State).amount)
|
||||
assertEquals(35.61.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states[1].state.data as Cash.State).amount)
|
||||
assertEquals(35.38.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[2].state.data as Cash.State).amount)
|
||||
assertEquals(35.61.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[1].state.data as Cash.State).amount)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -109,7 +115,7 @@ class WalletWithCashTest {
|
||||
}.toSignedTransaction()
|
||||
|
||||
wallet.notify(dummyIssue.tx)
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
assertEquals(1, wallet.currentWallet.states.toList().size)
|
||||
|
||||
// Issue another linear state of the same thread (nonce different)
|
||||
val dummyIssue2 = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
@ -120,7 +126,7 @@ class WalletWithCashTest {
|
||||
assertThatThrownBy {
|
||||
wallet.notify(dummyIssue2.tx)
|
||||
}
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
assertEquals(1, wallet.currentWallet.states.toList().size)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -136,7 +142,7 @@ class WalletWithCashTest {
|
||||
}.toSignedTransaction()
|
||||
|
||||
wallet.notify(dummyIssue.tx)
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
assertEquals(1, wallet.currentWallet.states.toList().size)
|
||||
|
||||
// Move the same state
|
||||
val dummyMove = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
@ -146,6 +152,6 @@ class WalletWithCashTest {
|
||||
}.toSignedTransaction()
|
||||
|
||||
wallet.notify(dummyMove.tx)
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
assertEquals(1, wallet.currentWallet.states.toList().size)
|
||||
}
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ class DataVendingServiceTests {
|
||||
// Complete the cash transaction, and then manually relay it
|
||||
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
|
||||
val tx = ptx.toSignedTransaction()
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)
|
||||
val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
|
||||
walletServiceNode.info, tx)
|
||||
|
||||
@ -77,7 +77,7 @@ class DataVendingServiceTests {
|
||||
// The transaction tries issuing MEGA_CORP cash, but we aren't the issuer, so it's invalid
|
||||
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
|
||||
val tx = ptx.toSignedTransaction(false)
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)
|
||||
val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
|
||||
walletServiceNode.info, tx)
|
||||
|
||||
@ -87,6 +87,6 @@ class DataVendingServiceTests {
|
||||
assertTrue(ex.cause is DataVending.Service.TransactionRejectedError)
|
||||
|
||||
// Check the transaction is not in the receiving node
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)
|
||||
}
|
||||
}
|
@ -26,13 +26,11 @@ import com.r3corda.node.services.config.NodeConfigurationFromConfig
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import joptsimple.OptionParser
|
||||
import joptsimple.OptionSet
|
||||
import org.apache.commons.io.IOUtils
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.File
|
||||
import java.net.URL
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
@ -308,8 +306,8 @@ private fun setup(params: CliParams.SetupNode): Int {
|
||||
dirFile.mkdirs()
|
||||
}
|
||||
|
||||
val configFile = params.dir.resolve("config").toFile()
|
||||
val config = loadConfigFile(configFile, params.defaultLegalName)
|
||||
val configFile = params.dir.resolve("config")
|
||||
val config = loadConfigFile(params.dir, configFile, params.defaultLegalName)
|
||||
if (!Files.exists(params.dir.resolve(AbstractNode.PUBLIC_IDENTITY_FILE_NAME))) {
|
||||
createIdentities(params, config)
|
||||
}
|
||||
@ -445,18 +443,16 @@ private fun getNodeConfig(cliParams: CliParams.RunNode): NodeConfiguration {
|
||||
throw NotSetupException("Missing identity file. Please run node setup before running the node")
|
||||
}
|
||||
|
||||
val configFile = cliParams.dir.resolve("config").toFile()
|
||||
return loadConfigFile(configFile, cliParams.defaultLegalName)
|
||||
val configFile = cliParams.dir.resolve("config")
|
||||
return loadConfigFile(cliParams.dir, configFile, cliParams.defaultLegalName)
|
||||
}
|
||||
|
||||
private fun loadConfigFile(configFile: File, defaultLegalName: String): NodeConfiguration {
|
||||
if (!configFile.exists()) {
|
||||
private fun loadConfigFile(baseDir: Path, configFile: Path, defaultLegalName: String): NodeConfiguration {
|
||||
if (!Files.exists(configFile)) {
|
||||
createDefaultConfigFile(configFile, defaultLegalName)
|
||||
log.warn("Default config created at $configFile.")
|
||||
}
|
||||
|
||||
val config = ConfigFactory.parseFile(configFile).withFallback(ConfigFactory.load())
|
||||
return NodeConfigurationFromConfig(config)
|
||||
return NodeConfigurationFromConfig(NodeConfiguration.loadConfig(baseDir, configFileOverride = configFile))
|
||||
}
|
||||
|
||||
private fun createIdentities(params: CliParams.SetupNode, nodeConf: NodeConfiguration) {
|
||||
@ -466,8 +462,8 @@ private fun createIdentities(params: CliParams.SetupNode, nodeConf: NodeConfigur
|
||||
node.stop()
|
||||
}
|
||||
|
||||
private fun createDefaultConfigFile(configFile: File, legalName: String) {
|
||||
configFile.writeBytes(
|
||||
private fun createDefaultConfigFile(configFile: Path, legalName: String) {
|
||||
Files.write(configFile,
|
||||
"""
|
||||
myLegalName = $legalName
|
||||
""".trimIndent().toByteArray())
|
||||
|
@ -23,6 +23,7 @@ import com.r3corda.core.utilities.BriefLogFormatter
|
||||
import com.r3corda.core.utilities.Emoji
|
||||
import com.r3corda.core.utilities.ProgressTracker
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.config.NodeConfigurationFromConfig
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
@ -30,7 +31,6 @@ import com.r3corda.node.services.persistence.NodeAttachmentService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.protocols.NotaryProtocol
|
||||
import com.r3corda.protocols.TwoPartyTradeProtocol
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import joptsimple.OptionParser
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
@ -126,8 +126,7 @@ fun runTraderDemo(args: Array<String>): Int {
|
||||
Role.BUYER -> "Bank A"
|
||||
Role.SELLER -> "Bank B"
|
||||
}
|
||||
val override = ConfigFactory.parseString("myLegalName = $myLegalName")
|
||||
NodeConfigurationFromConfig(override.withFallback(ConfigFactory.load()))
|
||||
NodeConfigurationFromConfig(NodeConfiguration.loadConfig(directory, allowMissingConfig = true, configOverrides = mapOf("myLegalName" to myLegalName)))
|
||||
}
|
||||
|
||||
// Which services will this instance of the node provide to the network?
|
||||
@ -275,7 +274,8 @@ private class TraderDemoProtocolBuyer(val otherSide: Party,
|
||||
private fun logIssuanceAttachment(tradeTX: SignedTransaction) {
|
||||
// Find the original CP issuance.
|
||||
val search = TransactionGraphSearch(serviceHub.storageService.validatedTransactions, listOf(tradeTX.tx))
|
||||
search.query = TransactionGraphSearch.Query(withCommandOfType = CommercialPaper.Commands.Issue::class.java)
|
||||
search.query = TransactionGraphSearch.Query(withCommandOfType = CommercialPaper.Commands.Issue::class.java,
|
||||
followInputsOfType = CommercialPaper.State::class.java)
|
||||
val cpIssuance = search.call().single()
|
||||
|
||||
cpIssuance.attachments.first().let {
|
||||
|
@ -2,4 +2,10 @@ myLegalName = "Vast Global MegaCorp, Ltd"
|
||||
exportJMXto = "http"
|
||||
nearestCity = "The Moon"
|
||||
keyStorePassword = "cordacadevpass"
|
||||
trustStorePassword = "trustpass"
|
||||
trustStorePassword = "trustpass"
|
||||
dataSourceProperties = {
|
||||
dataSourceClassName = org.h2.jdbcx.JdbcDataSource
|
||||
"dataSource.url" = "jdbc:h2:"${basedir}"/persistence"
|
||||
"dataSource.user" = sa
|
||||
"dataSource.password" = ""
|
||||
}
|
Loading…
Reference in New Issue
Block a user