mirror of
https://github.com/corda/corda.git
synced 2025-04-07 11:27:01 +00:00
First working hand-rolled persistent wallet
First working Exposed-assisted persistent wallet Cleaned up Exposed-based persistent wallet Cleaned up warnings Fixed up some generic types Improved comments Fix up TODO comment Hikari and config integration Fix existing tests Clean up after looking at PR Clean up commented out lines Fix initialisation of IRS demo leaving database open Fix up after rebase Review feedback. Main change is lazy wallet iteration. Rebased and incorporated config changes. Use standardised config loading. Make wallet cash test use persistent wallet. Added test to ensure wallet retains state in database across instance creation. Tidy up whitespace and fix bug in test.
This commit is contained in:
parent
f20322136f
commit
d883b3f134
@ -58,6 +58,9 @@ repositories {
|
||||
url 'http://oss.sonatype.org/content/repositories/snapshots'
|
||||
}
|
||||
jcenter()
|
||||
maven {
|
||||
url 'https://dl.bintray.com/kotlin/exposed'
|
||||
}
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
|
@ -280,7 +280,7 @@ data class StateRef(val txhash: SecureHash, val index: Int) {
|
||||
data class StateAndRef<out T : ContractState>(val state: TransactionState<T>, val ref: StateRef)
|
||||
|
||||
/** Filters a list of [StateAndRef] objects according to the type of the states */
|
||||
inline fun <reified T : ContractState> List<StateAndRef<ContractState>>.filterStatesOfType(): List<StateAndRef<T>> {
|
||||
inline fun <reified T : ContractState> Iterable<StateAndRef<ContractState>>.filterStatesOfType(): List<StateAndRef<T>> {
|
||||
return mapNotNull { if (it.state.data is T) StateAndRef(TransactionState(it.state.data, it.state.notary), it.ref) else null }
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,8 @@ import java.util.concurrent.Callable
|
||||
class TransactionGraphSearch(val transactions: ReadOnlyTransactionStorage,
|
||||
val startPoints: List<WireTransaction>) : Callable<List<WireTransaction>> {
|
||||
class Query(
|
||||
val withCommandOfType: Class<out CommandData>? = null
|
||||
val withCommandOfType: Class<out CommandData>? = null,
|
||||
val followInputsOfType: Class<out ContractState>? = null
|
||||
)
|
||||
|
||||
var query: Query = Query()
|
||||
@ -27,19 +28,22 @@ class TransactionGraphSearch(val transactions: ReadOnlyTransactionStorage,
|
||||
override fun call(): List<WireTransaction> {
|
||||
val q = query
|
||||
|
||||
val next = ArrayList<SecureHash>()
|
||||
next += startPoints.flatMap { it.inputs.map { it.txhash } }
|
||||
val alreadyVisited = HashSet<SecureHash>()
|
||||
val next = ArrayList<WireTransaction>(startPoints)
|
||||
|
||||
val results = ArrayList<WireTransaction>()
|
||||
|
||||
while (next.isNotEmpty()) {
|
||||
val hash = next.removeAt(next.lastIndex)
|
||||
val tx = transactions.getTransaction(hash)?.tx ?: continue
|
||||
val tx = next.removeAt(next.lastIndex)
|
||||
|
||||
if (q.matches(tx))
|
||||
results += tx
|
||||
|
||||
next += tx.inputs.map { it.txhash }
|
||||
val inputsLeadingToUnvisitedTx: Iterable<StateRef> = tx.inputs.filter { it.txhash !in alreadyVisited }
|
||||
val unvisitedInputTxs: Map<SecureHash, SignedTransaction> = inputsLeadingToUnvisitedTx.map { it.txhash }.toHashSet().map { transactions.getTransaction(it) }.filterNotNull().associateBy { it.txBits.hash }
|
||||
val unvisitedInputTxsWithInputIndex: Iterable<Pair<SignedTransaction, Int>> = inputsLeadingToUnvisitedTx.filter { it.txhash in unvisitedInputTxs.keys }.map { Pair(unvisitedInputTxs[it.txhash]!!, it.index) }
|
||||
next += (unvisitedInputTxsWithInputIndex.filter { q.followInputsOfType == null || it.first.tx.outputs[it.second].data.javaClass == q.followInputsOfType }
|
||||
.map { it.first }.filter { alreadyVisited.add(it.txBits.hash) }.map { it.tx })
|
||||
}
|
||||
|
||||
return results
|
||||
|
@ -32,7 +32,7 @@ val DEFAULT_SESSION_ID = 0L
|
||||
* Active means they haven't been consumed yet (or we don't know about it).
|
||||
* Relevant means they contain at least one of our pubkeys.
|
||||
*/
|
||||
class Wallet(val states: List<StateAndRef<ContractState>>) {
|
||||
class Wallet(val states: Iterable<StateAndRef<ContractState>>) {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
inline fun <reified T : OwnableState> statesOfType() = states.filter { it.state.data is T } as List<StateAndRef<T>>
|
||||
|
||||
|
@ -128,3 +128,17 @@ class MockStorageService(override val attachments: AttachmentStorage = MockAttac
|
||||
override val myLegalIdentityKey: KeyPair = generateKeyPair(),
|
||||
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public))
|
||||
: SingletonSerializeAsToken(), TxWritableStorageService
|
||||
|
||||
/**
|
||||
* Make properties appropriate for creating a DataSource for unit tests.
|
||||
*
|
||||
* @param nodeName Reflects the "instance" of the in-memory database. Defaults to a random string.
|
||||
*/
|
||||
fun makeTestDataSourceProperties(nodeName: String = SecureHash.randomSHA256().toString()): Properties {
|
||||
val props = Properties()
|
||||
props.setProperty("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource")
|
||||
props.setProperty("dataSource.url", "jdbc:h2:mem:${nodeName}_persistence")
|
||||
props.setProperty("dataSource.user", "sa")
|
||||
props.setProperty("dataSource.password", "")
|
||||
return props
|
||||
}
|
@ -21,19 +21,20 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* states relevant to us into a database and once such a wallet is implemented, this scaffolding can be removed.
|
||||
*/
|
||||
@ThreadSafe
|
||||
open class InMemoryWalletService(private val services: ServiceHub) : SingletonSerializeAsToken(), WalletService {
|
||||
open class InMemoryWalletService(protected val services: ServiceHub) : SingletonSerializeAsToken(), WalletService {
|
||||
class ClashingThreads(threads: Set<SecureHash>, transactions: Iterable<WireTransaction>) :
|
||||
Exception("There are multiple linear head states after processing transactions $transactions. The clashing thread(s): $threads")
|
||||
private val log = loggerFor<InMemoryWalletService>()
|
||||
|
||||
open protected val log = loggerFor<InMemoryWalletService>()
|
||||
|
||||
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
|
||||
// inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference
|
||||
// to wallet somewhere.
|
||||
private class InnerState {
|
||||
var wallet = Wallet(emptyList<StateAndRef<OwnableState>>())
|
||||
protected class InnerState {
|
||||
var wallet = Wallet(emptyList<StateAndRef<ContractState>>())
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
protected val mutex = ThreadBox(InnerState())
|
||||
|
||||
override val currentWallet: Wallet get() = mutex.locked { wallet }
|
||||
|
||||
@ -77,6 +78,9 @@ open class InMemoryWalletService(private val services: ServiceHub) : SingletonSe
|
||||
Pair(wallet, combinedDelta)
|
||||
}
|
||||
|
||||
// TODO: we need to remove the clashing threads concepts and support potential duplicate threads
|
||||
// because two different nodes can have two different sets of threads and so currently it's possible
|
||||
// for only one party to have a clash which interferes with determinism of the transactions.
|
||||
val clashingThreads = walletAndNetDelta.first.clashingThreads
|
||||
if (!clashingThreads.isEmpty()) {
|
||||
throw ClashingThreads(clashingThreads, txns)
|
||||
|
@ -8,6 +8,9 @@ repositories {
|
||||
url 'http://oss.sonatype.org/content/repositories/snapshots'
|
||||
}
|
||||
jcenter()
|
||||
maven {
|
||||
url 'https://dl.bintray.com/kotlin/exposed'
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -101,6 +104,15 @@ dependencies {
|
||||
// Unit testing helpers.
|
||||
testCompile 'junit:junit:4.12'
|
||||
testCompile "org.assertj:assertj-core:${assertj_version}"
|
||||
|
||||
// For H2 database support in persistence
|
||||
compile "com.h2database:h2:1.3.176"
|
||||
|
||||
// Exposed: Kotlin SQL library - under evaluation
|
||||
compile "org.jetbrains.exposed:exposed:0.5.0"
|
||||
|
||||
// SQL connection pooling library
|
||||
compile "com.zaxxer:HikariCP:2.4.7"
|
||||
}
|
||||
|
||||
quasarScan.dependsOn('classes', ':core:classes', ':contracts:classes')
|
||||
|
@ -1,16 +1,13 @@
|
||||
package com.r3corda.node
|
||||
|
||||
import com.r3corda.node.services.config.FullNodeConfiguration
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import com.typesafe.config.ConfigRenderOptions
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import joptsimple.OptionParser
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.File
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.net.InetAddress
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.util.*
|
||||
|
||||
val log = LoggerFactory.getLogger("Main")
|
||||
|
||||
@ -38,26 +35,8 @@ fun main(args: Array<String>) {
|
||||
}
|
||||
|
||||
val baseDirectoryPath = if (cmdlineOptions.has(ParamsSpec.baseDirectoryArg)) Paths.get(cmdlineOptions.valueOf(ParamsSpec.baseDirectoryArg)) else Paths.get(".").normalize()
|
||||
|
||||
val defaultConfig = ConfigFactory.parseResources("reference.conf")
|
||||
|
||||
val configFile = if (cmdlineOptions.has(ParamsSpec.configFileArg)) {
|
||||
File(cmdlineOptions.valueOf(ParamsSpec.configFileArg))
|
||||
} else {
|
||||
baseDirectoryPath.resolve("node.conf").normalize().toFile()
|
||||
}
|
||||
val appConfig = ConfigFactory.parseFile(configFile)
|
||||
|
||||
val cmdlineOverrideMap = HashMap<String, Any?>() // If we do require a few other command line overrides eg for a nicer development experience they would go inside this map.
|
||||
if (cmdlineOptions.has(ParamsSpec.baseDirectoryArg)) {
|
||||
cmdlineOverrideMap.put("basedir", baseDirectoryPath.toString())
|
||||
}
|
||||
val overrideConfig = ConfigFactory.parseMap(cmdlineOverrideMap)
|
||||
|
||||
val mergedAndResolvedConfig = overrideConfig.withFallback(appConfig).withFallback(defaultConfig).resolve()
|
||||
|
||||
log.info("config:\n ${mergedAndResolvedConfig.root().render(ConfigRenderOptions.defaults())}")
|
||||
val conf = FullNodeConfiguration(mergedAndResolvedConfig)
|
||||
val configFile = if (cmdlineOptions.has(ParamsSpec.configFileArg)) Paths.get(cmdlineOptions.valueOf(ParamsSpec.configFileArg)) else null
|
||||
val conf = FullNodeConfiguration(NodeConfiguration.loadConfig(baseDirectoryPath, configFile))
|
||||
val dir = conf.basedir.toAbsolutePath().normalize()
|
||||
logInfo(args, dir)
|
||||
|
||||
|
@ -46,7 +46,9 @@ import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.node.utilities.ANSIProgressObserver
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import com.r3corda.node.utilities.configureDatabase
|
||||
import org.slf4j.Logger
|
||||
import java.io.Closeable
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
@ -130,6 +132,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
lateinit var scheduler: SchedulerService
|
||||
lateinit var protocolLogicFactory: ProtocolLogicRefFactory
|
||||
val customServices: ArrayList<Any> = ArrayList()
|
||||
protected val closeOnStop: ArrayList<Closeable> = ArrayList()
|
||||
|
||||
/** Locates and returns a service of the given type if loaded, or throws an exception if not found. */
|
||||
inline fun <reified T: Any> findService() = customServices.filterIsInstance<T>().single()
|
||||
@ -155,12 +158,13 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
require(!started) { "Node has already been started" }
|
||||
log.info("Node starting up ...")
|
||||
|
||||
initialiseDatabasePersistence()
|
||||
val storageServices = initialiseStorageService(dir)
|
||||
storage = storageServices.first
|
||||
checkpointStorage = storageServices.second
|
||||
net = makeMessagingService()
|
||||
netMapCache = InMemoryNetworkMapCache()
|
||||
wallet = NodeWalletService(services)
|
||||
wallet = makeWalletService()
|
||||
|
||||
identity = makeIdentityService()
|
||||
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
|
||||
@ -199,6 +203,16 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
return this
|
||||
}
|
||||
|
||||
private fun initialiseDatabasePersistence() {
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isNotEmpty()) {
|
||||
val (toClose, database) = configureDatabase(props)
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
log.info("Connected to ${database.vendor} database.")
|
||||
closeOnStop += toClose
|
||||
}
|
||||
}
|
||||
|
||||
private fun initialiseProtocolLogicFactory(): ProtocolLogicRefFactory {
|
||||
val protocolWhitelist = HashMap<String, Set<String>>()
|
||||
for (plugin in pluginRegistries) {
|
||||
@ -321,14 +335,20 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
return service
|
||||
}
|
||||
|
||||
// TODO: sort out ordering of open & protected modifiers of functions in this class.
|
||||
protected open fun makeWalletService(): WalletService = NodeWalletService(services)
|
||||
|
||||
open fun stop() {
|
||||
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the
|
||||
// network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop()
|
||||
// to indicate "Please shut down gracefully" vs "Shut down now".
|
||||
// Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it
|
||||
// unsubscribes us forcibly, rather than blocking the shutdown process.
|
||||
|
||||
net.stop()
|
||||
// Stop in opposite order to starting
|
||||
for (toClose in closeOnStop.reversed()) {
|
||||
toClose.close()
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract fun makeMessagingService(): MessagingServiceInternal
|
||||
|
@ -8,7 +8,10 @@ import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.PhysicalLocation
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.WalletService
|
||||
import com.r3corda.core.node.services.testing.MockIdentityService
|
||||
import com.r3corda.core.node.services.testing.makeTestDataSourceProperties
|
||||
import com.r3corda.core.testing.InMemoryWalletService
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.internal.AbstractNode
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
@ -84,6 +87,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
|
||||
override fun makeIdentityService() = MockIdentityService(mockNet.identities)
|
||||
|
||||
override fun makeWalletService(): WalletService = InMemoryWalletService(services)
|
||||
|
||||
override fun startMessagingService() {
|
||||
// Nothing to do
|
||||
}
|
||||
@ -109,7 +114,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
|
||||
/** Returns a node, optionally created by the passed factory method. */
|
||||
fun createNode(networkMapAddress: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory,
|
||||
start: Boolean = true, legalName: String? = null, keyPair: KeyPair? = null, vararg advertisedServices: ServiceType): MockNode {
|
||||
start: Boolean = true, legalName: String? = null, keyPair: KeyPair? = null,
|
||||
databasePersistence: Boolean = false, vararg advertisedServices: ServiceType): MockNode {
|
||||
val newNode = forcedID == -1
|
||||
val id = if (newNode) counter++ else forcedID
|
||||
|
||||
@ -122,6 +128,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
override val nearestCity: String = "Atlantis"
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val dataSourceProperties: Properties get() = if (databasePersistence) makeTestDataSourceProperties("node_$id") else Properties()
|
||||
}
|
||||
val node = nodeFactory.create(path, config, this, networkMapAddress, advertisedServices.toSet(), id, keyPair)
|
||||
if (start) {
|
||||
@ -160,12 +167,12 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
fun createTwoNodes(nodeFactory: Factory = defaultFactory, notaryKeyPair: KeyPair? = null): Pair<MockNode, MockNode> {
|
||||
require(nodes.isEmpty())
|
||||
return Pair(
|
||||
createNode(null, -1, nodeFactory, true, null, notaryKeyPair, NetworkMapService.Type, SimpleNotaryService.Type),
|
||||
createNode(null, -1, nodeFactory, true, null, notaryKeyPair, false, NetworkMapService.Type, SimpleNotaryService.Type),
|
||||
createNode(nodes[0].info, -1, nodeFactory, true, null)
|
||||
)
|
||||
}
|
||||
|
||||
fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null) = createNode(null, -1, defaultFactory, true, legalName, keyPair, NetworkMapService.Type, SimpleNotaryService.Type)
|
||||
fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null) = createNode(null, -1, defaultFactory, true, legalName, keyPair, false, NetworkMapService.Type, SimpleNotaryService.Type)
|
||||
fun createPartyNode(networkMapAddr: NodeInfo, legalName: String? = null, keyPair: KeyPair? = null) = createNode(networkMapAddr, -1, defaultFactory, true, legalName, keyPair)
|
||||
|
||||
@Suppress("unused") // This is used from the network visualiser tool.
|
||||
|
@ -12,11 +12,15 @@ import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import com.typesafe.config.ConfigParseOptions
|
||||
import com.typesafe.config.ConfigRenderOptions
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.time.LocalDate
|
||||
import java.util.*
|
||||
import kotlin.reflect.KProperty
|
||||
import kotlin.reflect.jvm.javaType
|
||||
|
||||
@ -26,6 +30,28 @@ interface NodeConfiguration {
|
||||
val nearestCity: String
|
||||
val keyStorePassword: String
|
||||
val trustStorePassword: String
|
||||
val dataSourceProperties: Properties get() = Properties()
|
||||
|
||||
companion object {
|
||||
val log = LoggerFactory.getLogger("NodeConfiguration")
|
||||
|
||||
fun loadConfig(baseDirectoryPath: Path, configFileOverride: Path? = null, allowMissingConfig: Boolean = false, configOverrides: Map<String, Any?> = emptyMap()): Config {
|
||||
val defaultConfig = ConfigFactory.parseResources("reference.conf")
|
||||
|
||||
val normalisedBaseDir = baseDirectoryPath.normalize()
|
||||
val configFile = (configFileOverride?.normalize() ?: normalisedBaseDir.resolve("node.conf")).toFile()
|
||||
val appConfig = ConfigFactory.parseFile(configFile, ConfigParseOptions.defaults().setAllowMissing(allowMissingConfig))
|
||||
|
||||
val overridesMap = HashMap<String, Any?>() // If we do require a few other command line overrides eg for a nicer development experience they would go inside this map.
|
||||
overridesMap.putAll(configOverrides)
|
||||
overridesMap["basedir"] = normalisedBaseDir.toAbsolutePath().toString()
|
||||
val overrideConfig = ConfigFactory.parseMap(overridesMap)
|
||||
|
||||
val mergedAndResolvedConfig = overrideConfig.withFallback(appConfig).withFallback(defaultConfig).resolve()
|
||||
log.info("config:\n ${mergedAndResolvedConfig.root().render(ConfigRenderOptions.defaults())}")
|
||||
return mergedAndResolvedConfig
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
@ -40,16 +66,27 @@ operator fun <T> Config.getValue(receiver: Any, metadata: KProperty<*>): T {
|
||||
Instant::class.java -> Instant.parse(getString(metadata.name)) as T
|
||||
HostAndPort::class.java -> HostAndPort.fromString(getString(metadata.name)) as T
|
||||
Path::class.java -> Paths.get(getString(metadata.name)) as T
|
||||
Properties::class.java -> getProperties(metadata.name) as T
|
||||
else -> throw IllegalArgumentException("Unsupported type ${metadata.returnType}")
|
||||
}
|
||||
}
|
||||
|
||||
fun Config.getProperties(path: String): Properties {
|
||||
val obj = this.getObject(path)
|
||||
val props = Properties()
|
||||
for ((property, objectValue) in obj.entries) {
|
||||
props.setProperty(property, objectValue.unwrapped().toString())
|
||||
}
|
||||
return props
|
||||
}
|
||||
|
||||
class NodeConfigurationFromConfig(val config: Config = ConfigFactory.load()) : NodeConfiguration {
|
||||
override val myLegalName: String by config
|
||||
override val exportJMXto: String by config
|
||||
override val nearestCity: String by config
|
||||
override val keyStorePassword: String by config
|
||||
override val trustStorePassword: String by config
|
||||
override val dataSourceProperties: Properties by config
|
||||
}
|
||||
|
||||
class NameServiceConfig(conf: Config) {
|
||||
@ -65,6 +102,7 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration {
|
||||
override val exportJMXto: String = "http"
|
||||
override val keyStorePassword: String by conf
|
||||
override val trustStorePassword: String by conf
|
||||
override val dataSourceProperties: Properties by conf
|
||||
val artemisAddress: HostAndPort by conf
|
||||
val webAddress: HostAndPort by conf
|
||||
val messagingServerAddress: HostAndPort? = if (conf.hasPath("messagingServerAddress")) HostAndPort.fromString(conf.getString("messagingServerAddress")) else null
|
||||
@ -96,4 +134,5 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration {
|
||||
messagingServerAddress
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,10 +1,85 @@
|
||||
package com.r3corda.node.services.wallet
|
||||
|
||||
import com.r3corda.core.contracts.ContractState
|
||||
import com.r3corda.core.contracts.StateAndRef
|
||||
import com.r3corda.core.contracts.StateRef
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.services.Wallet
|
||||
import com.r3corda.core.testing.InMemoryWalletService
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import org.jetbrains.exposed.sql.SchemaUtils.create
|
||||
|
||||
/**
|
||||
* Currently, the node wallet service is just the in-memory wallet service until we have finished evaluating and
|
||||
* selecting a persistence layer (probably an ORM over a SQL DB).
|
||||
* Currently, the node wallet service is a very simple RDBMS backed implementation. It will change significantly when
|
||||
* we add further functionality as the design for the wallet and wallet service matures.
|
||||
*
|
||||
* TODO: move query / filter criteria into the database query.
|
||||
* TODO: keep an audit trail with time stamps of previously unconsumed states "as of" a particular point in time.
|
||||
* TODO: have transaction storage do some caching.
|
||||
*/
|
||||
class NodeWalletService(services: ServiceHub) : InMemoryWalletService(services)
|
||||
class NodeWalletService(services: ServiceHub) : InMemoryWalletService(services) {
|
||||
|
||||
override val log = loggerFor<NodeWalletService>()
|
||||
|
||||
// For now we are just tracking the current state, with no historical reporting ability.
|
||||
private object UnconsumedStates : Table("vault_unconsumed_states") {
|
||||
val txhash = binary("transaction_id", 32).primaryKey()
|
||||
val index = integer("output_index").primaryKey()
|
||||
}
|
||||
|
||||
init {
|
||||
// TODO: at some future point, we'll use some schema creation tool to deploy database artifacts if the database
|
||||
// is not yet initalised to the right version of the schema.
|
||||
createTablesIfNecessary()
|
||||
|
||||
// Note that our wallet implementation currently does nothing with respect to attempting to apply criteria in the database.
|
||||
mutex.locked { wallet = Wallet(allUnconsumedStates()) }
|
||||
|
||||
// Now we need to make sure we listen to updates
|
||||
updates.subscribe { recordUpdate(it) }
|
||||
}
|
||||
|
||||
private fun recordUpdate(update: Wallet.Update) {
|
||||
val producedStateRefs = update.produced.map { it.ref }
|
||||
val consumedStateRefs = update.consumed
|
||||
log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." }
|
||||
databaseTransaction {
|
||||
// Note we also remove the produced in case we are re-inserting in some form of recovery situation.
|
||||
for (consumed in (consumedStateRefs + producedStateRefs)) {
|
||||
UnconsumedStates.deleteWhere {
|
||||
(UnconsumedStates.txhash eq consumed.txhash.bits) and (UnconsumedStates.index eq consumed.index)
|
||||
}
|
||||
}
|
||||
for (produced in producedStateRefs) {
|
||||
UnconsumedStates.insert {
|
||||
it[txhash] = produced.txhash.bits
|
||||
it[index] = produced.index
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun createTablesIfNecessary() {
|
||||
log.trace { "Creating database tables if necessary." }
|
||||
databaseTransaction {
|
||||
create(UnconsumedStates)
|
||||
}
|
||||
}
|
||||
|
||||
private fun allUnconsumedStates(): Iterable<StateAndRef<ContractState>> {
|
||||
// Order by txhash for if and when transaction storage has some caching.
|
||||
// Map to StateRef and then to StateAndRef.
|
||||
return databaseTransaction {
|
||||
UnconsumedStates.selectAll().orderBy(UnconsumedStates.txhash)
|
||||
.map { StateRef(SecureHash.SHA256(it[UnconsumedStates.txhash]), it[UnconsumedStates.index]) }
|
||||
.map {
|
||||
val storedTx = services.storageService.validatedTransactions.getTransaction(it.txhash) ?: throw Error("Found transaction hash ${it.txhash} in unconsumed contract states that is not in transaction storage.")
|
||||
StateAndRef(storedTx.tx.outputs[it.index], it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package com.r3corda.node.utilities
|
||||
|
||||
import com.zaxxer.hikari.HikariConfig
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import java.io.Closeable
|
||||
import java.util.*
|
||||
|
||||
fun <T> databaseTransaction(statement: Transaction.() -> T): T = org.jetbrains.exposed.sql.transactions.transaction(statement)
|
||||
|
||||
fun configureDatabase(props: Properties): Pair<Closeable, Database> {
|
||||
val config = HikariConfig(props)
|
||||
val dataSource = HikariDataSource(config)
|
||||
val database = Database.connect(dataSource)
|
||||
// Check not in read-only mode.
|
||||
check(!database.metadata.isReadOnly) { "Database should not be readonly." }
|
||||
return Pair(dataSource, database)
|
||||
}
|
@ -102,7 +102,7 @@ class AttachmentTests {
|
||||
}
|
||||
}
|
||||
}
|
||||
}, true, null, null, NetworkMapService.Type, SimpleNotaryService.Type)
|
||||
}, true, null, null, false, NetworkMapService.Type, SimpleNotaryService.Type)
|
||||
val n1 = network.createNode(n0.info)
|
||||
|
||||
// Insert an attachment into node zero's store directly.
|
||||
|
@ -7,6 +7,7 @@ import com.r3corda.core.node.services.*
|
||||
import com.r3corda.core.node.services.testing.MockStorageService
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.ProtocolLogicRefFactory
|
||||
import com.r3corda.core.testing.InMemoryWalletService
|
||||
import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE
|
||||
import com.r3corda.node.serialization.NodeClock
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
@ -16,7 +17,6 @@ import com.r3corda.node.services.network.MockNetworkMapCache
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.persistence.DataVending
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import java.time.Clock
|
||||
|
||||
@Suppress("LeakingThis")
|
||||
@ -32,7 +32,7 @@ open class MockServiceHubInternal(
|
||||
val overrideClock: Clock? = NodeClock(),
|
||||
val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory()
|
||||
) : ServiceHubInternal() {
|
||||
override val walletService: WalletService = customWallet ?: NodeWalletService(this)
|
||||
override val walletService: WalletService = customWallet ?: InMemoryWalletService(this)
|
||||
override val keyManagementService: KeyManagementService
|
||||
get() = keyManagement ?: throw UnsupportedOperationException()
|
||||
override val identityService: IdentityService
|
||||
|
@ -0,0 +1,71 @@
|
||||
package com.r3corda.node.services
|
||||
|
||||
import com.r3corda.contracts.testing.fillWithSomeTestCash
|
||||
import com.r3corda.core.contracts.DOLLARS
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.r3corda.core.node.services.TxWritableStorageService
|
||||
import com.r3corda.core.node.services.WalletService
|
||||
import com.r3corda.core.node.services.testing.MockServices
|
||||
import com.r3corda.core.node.services.testing.makeTestDataSourceProperties
|
||||
import com.r3corda.core.testing.DUMMY_NOTARY
|
||||
import com.r3corda.core.utilities.BriefLogFormatter
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.node.utilities.configureDatabase
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.util.*
|
||||
|
||||
class NodeWalletServiceTest {
|
||||
lateinit var dataSource: Closeable
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
BriefLogFormatter.loggingOn(NodeWalletService::class)
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
dataSource.close()
|
||||
BriefLogFormatter.loggingOff(NodeWalletService::class)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `states not local to instance`() {
|
||||
val services1 = object : MockServices() {
|
||||
override val walletService: WalletService = NodeWalletService(this)
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
storageService.validatedTransactions.addTransaction(stx)
|
||||
walletService.notify(stx.tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
|
||||
val w1 = services1.walletService.currentWallet
|
||||
assertThat(w1.states).hasSize(3)
|
||||
|
||||
val originalStorage = services1.storageService
|
||||
val services2 = object : MockServices() {
|
||||
override val walletService: WalletService = NodeWalletService(this)
|
||||
|
||||
// We need to be able to find the same transactions as before, too.
|
||||
override val storageService: TxWritableStorageService get() = originalStorage
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
storageService.validatedTransactions.addTransaction(stx)
|
||||
walletService.notify(stx.tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val w2 = services2.walletService.currentWallet
|
||||
assertThat(w2.states).hasSize(3)
|
||||
}
|
||||
}
|
@ -8,13 +8,16 @@ import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.node.services.WalletService
|
||||
import com.r3corda.core.node.services.testing.MockServices
|
||||
import com.r3corda.core.node.services.testing.makeTestDataSourceProperties
|
||||
import com.r3corda.core.testing.*
|
||||
import com.r3corda.core.utilities.BriefLogFormatter
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.node.utilities.configureDatabase
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNull
|
||||
@ -24,10 +27,12 @@ import kotlin.test.assertNull
|
||||
class WalletWithCashTest {
|
||||
lateinit var services: MockServices
|
||||
val wallet: WalletService get() = services.walletService
|
||||
lateinit var dataSource: Closeable
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
BriefLogFormatter.loggingOn(NodeWalletService::class)
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
services = object : MockServices() {
|
||||
override val walletService: WalletService = NodeWalletService(this)
|
||||
|
||||
@ -42,6 +47,7 @@ class WalletWithCashTest {
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
dataSource.close()
|
||||
BriefLogFormatter.loggingOff(NodeWalletService::class)
|
||||
}
|
||||
|
||||
@ -51,14 +57,14 @@ class WalletWithCashTest {
|
||||
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
|
||||
val w = wallet.currentWallet
|
||||
assertEquals(3, w.states.size)
|
||||
assertEquals(3, w.states.toList().size)
|
||||
|
||||
val state = w.states[0].state.data as Cash.State
|
||||
val state = w.states.toList()[0].state.data as Cash.State
|
||||
assertEquals(29.01.DOLLARS `issued by` DUMMY_CASH_ISSUER, state.amount)
|
||||
assertEquals(services.key.public, state.owner)
|
||||
|
||||
assertEquals(35.38.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states[2].state.data as Cash.State).amount)
|
||||
assertEquals(35.61.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states[1].state.data as Cash.State).amount)
|
||||
assertEquals(35.38.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[2].state.data as Cash.State).amount)
|
||||
assertEquals(35.61.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[1].state.data as Cash.State).amount)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -109,7 +115,7 @@ class WalletWithCashTest {
|
||||
}.toSignedTransaction()
|
||||
|
||||
wallet.notify(dummyIssue.tx)
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
assertEquals(1, wallet.currentWallet.states.toList().size)
|
||||
|
||||
// Issue another linear state of the same thread (nonce different)
|
||||
val dummyIssue2 = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
@ -120,7 +126,7 @@ class WalletWithCashTest {
|
||||
assertThatThrownBy {
|
||||
wallet.notify(dummyIssue2.tx)
|
||||
}
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
assertEquals(1, wallet.currentWallet.states.toList().size)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -136,7 +142,7 @@ class WalletWithCashTest {
|
||||
}.toSignedTransaction()
|
||||
|
||||
wallet.notify(dummyIssue.tx)
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
assertEquals(1, wallet.currentWallet.states.toList().size)
|
||||
|
||||
// Move the same state
|
||||
val dummyMove = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
@ -146,6 +152,6 @@ class WalletWithCashTest {
|
||||
}.toSignedTransaction()
|
||||
|
||||
wallet.notify(dummyMove.tx)
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
assertEquals(1, wallet.currentWallet.states.toList().size)
|
||||
}
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ class DataVendingServiceTests {
|
||||
// Complete the cash transaction, and then manually relay it
|
||||
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
|
||||
val tx = ptx.toSignedTransaction()
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)
|
||||
val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
|
||||
walletServiceNode.info, tx)
|
||||
|
||||
@ -77,7 +77,7 @@ class DataVendingServiceTests {
|
||||
// The transaction tries issuing MEGA_CORP cash, but we aren't the issuer, so it's invalid
|
||||
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
|
||||
val tx = ptx.toSignedTransaction(false)
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)
|
||||
val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
|
||||
walletServiceNode.info, tx)
|
||||
|
||||
@ -87,6 +87,6 @@ class DataVendingServiceTests {
|
||||
assertTrue(ex.cause is DataVending.Service.TransactionRejectedError)
|
||||
|
||||
// Check the transaction is not in the receiving node
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
|
||||
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)
|
||||
}
|
||||
}
|
@ -26,13 +26,11 @@ import com.r3corda.node.services.config.NodeConfigurationFromConfig
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import joptsimple.OptionParser
|
||||
import joptsimple.OptionSet
|
||||
import org.apache.commons.io.IOUtils
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.File
|
||||
import java.net.URL
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
@ -308,8 +306,8 @@ private fun setup(params: CliParams.SetupNode): Int {
|
||||
dirFile.mkdirs()
|
||||
}
|
||||
|
||||
val configFile = params.dir.resolve("config").toFile()
|
||||
val config = loadConfigFile(configFile, params.defaultLegalName)
|
||||
val configFile = params.dir.resolve("config")
|
||||
val config = loadConfigFile(params.dir, configFile, params.defaultLegalName)
|
||||
if (!Files.exists(params.dir.resolve(AbstractNode.PUBLIC_IDENTITY_FILE_NAME))) {
|
||||
createIdentities(params, config)
|
||||
}
|
||||
@ -445,18 +443,16 @@ private fun getNodeConfig(cliParams: CliParams.RunNode): NodeConfiguration {
|
||||
throw NotSetupException("Missing identity file. Please run node setup before running the node")
|
||||
}
|
||||
|
||||
val configFile = cliParams.dir.resolve("config").toFile()
|
||||
return loadConfigFile(configFile, cliParams.defaultLegalName)
|
||||
val configFile = cliParams.dir.resolve("config")
|
||||
return loadConfigFile(cliParams.dir, configFile, cliParams.defaultLegalName)
|
||||
}
|
||||
|
||||
private fun loadConfigFile(configFile: File, defaultLegalName: String): NodeConfiguration {
|
||||
if (!configFile.exists()) {
|
||||
private fun loadConfigFile(baseDir: Path, configFile: Path, defaultLegalName: String): NodeConfiguration {
|
||||
if (!Files.exists(configFile)) {
|
||||
createDefaultConfigFile(configFile, defaultLegalName)
|
||||
log.warn("Default config created at $configFile.")
|
||||
}
|
||||
|
||||
val config = ConfigFactory.parseFile(configFile).withFallback(ConfigFactory.load())
|
||||
return NodeConfigurationFromConfig(config)
|
||||
return NodeConfigurationFromConfig(NodeConfiguration.loadConfig(baseDir, configFileOverride = configFile))
|
||||
}
|
||||
|
||||
private fun createIdentities(params: CliParams.SetupNode, nodeConf: NodeConfiguration) {
|
||||
@ -466,8 +462,8 @@ private fun createIdentities(params: CliParams.SetupNode, nodeConf: NodeConfigur
|
||||
node.stop()
|
||||
}
|
||||
|
||||
private fun createDefaultConfigFile(configFile: File, legalName: String) {
|
||||
configFile.writeBytes(
|
||||
private fun createDefaultConfigFile(configFile: Path, legalName: String) {
|
||||
Files.write(configFile,
|
||||
"""
|
||||
myLegalName = $legalName
|
||||
""".trimIndent().toByteArray())
|
||||
|
@ -23,6 +23,7 @@ import com.r3corda.core.utilities.BriefLogFormatter
|
||||
import com.r3corda.core.utilities.Emoji
|
||||
import com.r3corda.core.utilities.ProgressTracker
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.config.NodeConfigurationFromConfig
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
@ -30,7 +31,6 @@ import com.r3corda.node.services.persistence.NodeAttachmentService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.protocols.NotaryProtocol
|
||||
import com.r3corda.protocols.TwoPartyTradeProtocol
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import joptsimple.OptionParser
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
@ -126,8 +126,7 @@ fun runTraderDemo(args: Array<String>): Int {
|
||||
Role.BUYER -> "Bank A"
|
||||
Role.SELLER -> "Bank B"
|
||||
}
|
||||
val override = ConfigFactory.parseString("myLegalName = $myLegalName")
|
||||
NodeConfigurationFromConfig(override.withFallback(ConfigFactory.load()))
|
||||
NodeConfigurationFromConfig(NodeConfiguration.loadConfig(directory, allowMissingConfig = true, configOverrides = mapOf("myLegalName" to myLegalName)))
|
||||
}
|
||||
|
||||
// Which services will this instance of the node provide to the network?
|
||||
@ -275,7 +274,8 @@ private class TraderDemoProtocolBuyer(val otherSide: Party,
|
||||
private fun logIssuanceAttachment(tradeTX: SignedTransaction) {
|
||||
// Find the original CP issuance.
|
||||
val search = TransactionGraphSearch(serviceHub.storageService.validatedTransactions, listOf(tradeTX.tx))
|
||||
search.query = TransactionGraphSearch.Query(withCommandOfType = CommercialPaper.Commands.Issue::class.java)
|
||||
search.query = TransactionGraphSearch.Query(withCommandOfType = CommercialPaper.Commands.Issue::class.java,
|
||||
followInputsOfType = CommercialPaper.State::class.java)
|
||||
val cpIssuance = search.call().single()
|
||||
|
||||
cpIssuance.attachments.first().let {
|
||||
|
@ -2,4 +2,10 @@ myLegalName = "Vast Global MegaCorp, Ltd"
|
||||
exportJMXto = "http"
|
||||
nearestCity = "The Moon"
|
||||
keyStorePassword = "cordacadevpass"
|
||||
trustStorePassword = "trustpass"
|
||||
trustStorePassword = "trustpass"
|
||||
dataSourceProperties = {
|
||||
dataSourceClassName = org.h2.jdbcx.JdbcDataSource
|
||||
"dataSource.url" = "jdbc:h2:"${basedir}"/persistence"
|
||||
"dataSource.user" = sa
|
||||
"dataSource.password" = ""
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user