mirror of
https://github.com/corda/corda.git
synced 2025-01-18 02:39:51 +00:00
Merged in persistence-support-in-mock-node (pull request #371)
Persistence support in MockNode
This commit is contained in:
commit
46d6749616
@ -3,11 +3,10 @@ package com.r3corda.node.utilities
|
||||
import com.r3corda.testing.node.makeTestDataSourceProperties
|
||||
import junit.framework.TestSuite
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.junit.AfterClass
|
||||
import org.junit.BeforeClass
|
||||
import org.junit.Test
|
||||
import org.junit.*
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.Suite
|
||||
import java.io.Closeable
|
||||
@ -19,22 +18,26 @@ import java.util.*
|
||||
JDBCHashMapTestSuite.MapLoadOnInitFalse::class,
|
||||
JDBCHashMapTestSuite.MapLoadOnInitTrue::class,
|
||||
JDBCHashMapTestSuite.SetLoadOnInitFalse::class,
|
||||
JDBCHashMapTestSuite.SetLoadOnInitTrue::class,
|
||||
JDBCHashMapTestSuite.MapCanBeReloaded::class)
|
||||
JDBCHashMapTestSuite.SetLoadOnInitTrue::class)
|
||||
class JDBCHashMapTestSuite {
|
||||
companion object {
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var transaction: Transaction
|
||||
lateinit var database: Database
|
||||
|
||||
@JvmStatic
|
||||
@BeforeClass
|
||||
fun before() {
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
setUpDatabaseTx()
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
@AfterClass
|
||||
fun after() {
|
||||
closeDatabaseTx()
|
||||
dataSource.close()
|
||||
}
|
||||
|
||||
@ -51,8 +54,6 @@ class JDBCHashMapTestSuite {
|
||||
)
|
||||
// putAll(null) not supported by Kotlin MutableMap interface
|
||||
.suppressing(com.google.common.collect.testing.testers.MapPutAllTester::class.java.getMethod("testPutAll_nullCollectionReference"))
|
||||
.withSetUp { setUpDatabaseTx() }
|
||||
.withTearDown { closeDatabaseTx() }
|
||||
.createTestSuite()
|
||||
|
||||
@JvmStatic
|
||||
@ -74,8 +75,6 @@ class JDBCHashMapTestSuite {
|
||||
.suppressing(com.google.common.collect.testing.testers.CollectionRemoveAllTester::class.java.getMethod("testRemoveAll_nullCollectionReferenceEmptySubject"))
|
||||
.suppressing(com.google.common.collect.testing.testers.CollectionRetainAllTester::class.java.getMethod("testRetainAll_nullCollectionReferenceNonEmptySubject"))
|
||||
.suppressing(com.google.common.collect.testing.testers.CollectionRetainAllTester::class.java.getMethod("testRetainAll_nullCollectionReferenceEmptySubject"))
|
||||
.withSetUp { setUpDatabaseTx() }
|
||||
.withTearDown { closeDatabaseTx() }
|
||||
.createTestSuite()
|
||||
|
||||
private fun setUpDatabaseTx() {
|
||||
@ -84,6 +83,7 @@ class JDBCHashMapTestSuite {
|
||||
|
||||
private fun closeDatabaseTx() {
|
||||
transaction.commit()
|
||||
transaction.close()
|
||||
}
|
||||
}
|
||||
|
||||
@ -179,36 +179,35 @@ class JDBCHashMapTestSuite {
|
||||
|
||||
private val transientMapForComparison = applyOpsToMap(LinkedHashMap())
|
||||
|
||||
companion object {
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
|
||||
@JvmStatic
|
||||
@BeforeClass
|
||||
fun before() {
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
}
|
||||
@Before
|
||||
fun before() {
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
@AfterClass
|
||||
fun after() {
|
||||
dataSource.close()
|
||||
}
|
||||
@After
|
||||
fun after() {
|
||||
dataSource.close()
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
fun `fill map and check content after reconstruction`() {
|
||||
databaseTransaction {
|
||||
databaseTransaction(database) {
|
||||
val persistentMap = JDBCHashMap<String, String>("the_table")
|
||||
// Populate map the first time.
|
||||
applyOpsToMap(persistentMap)
|
||||
assertThat(persistentMap.entries).containsExactly(*transientMapForComparison.entries.toTypedArray())
|
||||
}
|
||||
databaseTransaction {
|
||||
databaseTransaction(database) {
|
||||
val persistentMap = JDBCHashMap<String, String>("the_table", loadOnInit = false)
|
||||
assertThat(persistentMap.entries).containsExactly(*transientMapForComparison.entries.toTypedArray())
|
||||
}
|
||||
databaseTransaction {
|
||||
databaseTransaction(database) {
|
||||
val persistentMap = JDBCHashMap<String, String>("the_table", loadOnInit = true)
|
||||
assertThat(persistentMap.entries).containsExactly(*transientMapForComparison.entries.toTypedArray())
|
||||
}
|
||||
|
@ -44,6 +44,7 @@ import com.r3corda.node.services.transactions.ValidatingNotaryService
|
||||
import com.r3corda.node.services.vault.CashBalanceAsMetricsObserver
|
||||
import com.r3corda.node.services.vault.NodeVaultService
|
||||
import com.r3corda.node.utilities.*
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.slf4j.Logger
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.Files
|
||||
@ -65,7 +66,7 @@ import java.util.concurrent.TimeUnit
|
||||
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
|
||||
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
|
||||
abstract class AbstractNode(val configuration: NodeConfiguration, val networkMapService: SingleMessageRecipient?,
|
||||
val advertisedServices: Set<ServiceType>, val platformClock: Clock): SingletonSerializeAsToken() {
|
||||
val advertisedServices: Set<ServiceType>, val platformClock: Clock) : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
|
||||
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
|
||||
@ -133,9 +134,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
|
||||
lateinit var protocolLogicFactory: ProtocolLogicRefFactory
|
||||
val customServices: ArrayList<Any> = ArrayList()
|
||||
protected val runOnStop: ArrayList<Runnable> = ArrayList()
|
||||
lateinit var database: Database
|
||||
|
||||
/** Locates and returns a service of the given type if loaded, or throws an exception if not found. */
|
||||
inline fun <reified T: Any> findService() = customServices.filterIsInstance<T>().single()
|
||||
inline fun <reified T : Any> findService() = customServices.filterIsInstance<T>().single()
|
||||
|
||||
var isPreviousCheckpointsPresent = false
|
||||
private set
|
||||
@ -193,7 +195,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
|
||||
smm = StateMachineManager(services,
|
||||
listOf(tokenizableServices),
|
||||
checkpointStorage,
|
||||
serverThread)
|
||||
serverThread,
|
||||
database)
|
||||
if (serverThread is ExecutorService) {
|
||||
runOnStop += Runnable {
|
||||
// We wait here, even though any in-flight messages should have been drained away because the
|
||||
@ -230,10 +233,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isNotEmpty()) {
|
||||
val (toClose, database) = configureDatabase(props)
|
||||
this.database = database
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
log.info("Connected to ${database.vendor} database.")
|
||||
runOnStop += Runnable { toClose.close() }
|
||||
databaseTransaction {
|
||||
databaseTransaction(database) {
|
||||
insideTransaction()
|
||||
}
|
||||
} else {
|
||||
@ -259,7 +263,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
|
||||
val service = serviceClass.getConstructor(ServiceHubInternal::class.java).newInstance(services)
|
||||
serviceList.add(service)
|
||||
tokenizableServices.add(service)
|
||||
if(service is AcceptsFileUpload) {
|
||||
if (service is AcceptsFileUpload) {
|
||||
_servicesThatAcceptUploads += service
|
||||
}
|
||||
}
|
||||
@ -361,7 +365,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
|
||||
services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.identity) }
|
||||
|
||||
netMapCache.changed.subscribe { mapChange ->
|
||||
if(mapChange.type == MapChangeType.Added) {
|
||||
if (mapChange.type == MapChangeType.Added) {
|
||||
service.registerIdentity(mapChange.node.identity)
|
||||
}
|
||||
}
|
||||
@ -398,7 +402,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
|
||||
val transactionStorage = PerFileTransactionStorage(dir.resolve("transactions"))
|
||||
_servicesThatAcceptUploads += attachments
|
||||
val (identity, keyPair) = obtainKeyPair(dir)
|
||||
return Pair(constructStorageService(attachments, transactionStorage, keyPair, identity),checkpointStorage)
|
||||
return Pair(constructStorageService(attachments, transactionStorage, keyPair, identity), checkpointStorage)
|
||||
}
|
||||
|
||||
protected open fun constructStorageService(attachments: NodeAttachmentService,
|
||||
|
@ -30,6 +30,7 @@ import org.eclipse.jetty.webapp.WebAppContext
|
||||
import org.glassfish.jersey.server.ResourceConfig
|
||||
import org.glassfish.jersey.server.ServerProperties
|
||||
import org.glassfish.jersey.servlet.ServletContainer
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import java.io.RandomAccessFile
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.nio.channels.FileLock
|
||||
@ -123,11 +124,10 @@ class Node(val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
p2pAddr
|
||||
}()
|
||||
val ops = ServerRPCOps(services)
|
||||
if (networkMapService != null) {
|
||||
return NodeMessagingClient(configuration, serverAddr, services.storageService.myLegalIdentityKey.public, serverThread, rpcOps = ops)
|
||||
} else {
|
||||
return NodeMessagingClient(configuration, serverAddr, null, serverThread, rpcOps = ops)
|
||||
}
|
||||
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) services.storageService.myLegalIdentityKey.public else null
|
||||
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread,
|
||||
persistenceTx = { body: () -> Unit -> databaseTransaction(database) { body() } },
|
||||
rpcOps = ops)
|
||||
}
|
||||
|
||||
override fun startMessagingService() {
|
||||
@ -237,7 +237,7 @@ class Node(val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
jerseyServlet.initOrder = 0 // Initialise at server start
|
||||
|
||||
// Wrap all API calls in a database transaction.
|
||||
val filterHolder = FilterHolder(DatabaseTransactionFilter())
|
||||
val filterHolder = FilterHolder(DatabaseTransactionFilter(database))
|
||||
addFilter(filterHolder, "/api/*", EnumSet.of(DispatcherType.REQUEST))
|
||||
}
|
||||
}
|
||||
@ -292,7 +292,7 @@ class Node(val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
shutdown = true
|
||||
|
||||
// Unregister shutdown hook to prevent any unnecessary second calls to stop
|
||||
if((shutdownThread != null) && (Thread.currentThread() != shutdownThread)){
|
||||
if ((shutdownThread != null) && (Thread.currentThread() != shutdownThread)) {
|
||||
Runtime.getRuntime().removeShutdownHook(shutdownThread)
|
||||
shutdownThread = null
|
||||
}
|
||||
@ -334,7 +334,7 @@ class Node(val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
}
|
||||
|
||||
// Servlet filter to wrap API requests with a database transaction.
|
||||
private class DatabaseTransactionFilter : Filter {
|
||||
private class DatabaseTransactionFilter(val database: Database) : Filter {
|
||||
override fun init(filterConfig: FilterConfig?) {
|
||||
}
|
||||
|
||||
@ -342,7 +342,7 @@ class Node(val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
}
|
||||
|
||||
override fun doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain) {
|
||||
databaseTransaction {
|
||||
databaseTransaction(database) {
|
||||
chain.doFilter(request, response)
|
||||
}
|
||||
}
|
||||
|
@ -11,7 +11,6 @@ import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import com.r3corda.node.utilities.JDBCHashSet
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
@ -45,6 +44,9 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* @param executor An executor to run received message tasks upon.
|
||||
* @param persistentInbox If true the inbox will be created persistent if not already created.
|
||||
* If false the inbox queue will be transient, which is appropriate for UI clients for example.
|
||||
* @param persistenceTx A lambda to wrap message processing in any transaction required by the persistence layer (e.g.
|
||||
* a database transaction) without introducing a dependency on the actual solution and any parameters it requires
|
||||
* in this class.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeMessagingClient(config: NodeConfiguration,
|
||||
@ -52,6 +54,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
val myIdentity: PublicKey?,
|
||||
val executor: AffinityExecutor,
|
||||
val persistentInbox: Boolean = true,
|
||||
val persistenceTx: (() -> Unit) -> Unit = { it() },
|
||||
private val rpcOps: CordaRPCOps? = null) : ArtemisMessagingComponent(config), MessagingServiceInternal {
|
||||
companion object {
|
||||
val log = loggerFor<NodeMessagingClient>()
|
||||
@ -264,13 +267,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
// Note that handlers may re-enter this class. We aren't holding any locks and methods like
|
||||
// start/run/stop have re-entrancy assertions at the top, so it is OK.
|
||||
executor.fetchFrom {
|
||||
// TODO: we should be able to clean this up if we separate client and server code, but for now
|
||||
// interpret persistent as "server" and non-persistent as "client".
|
||||
if (persistentInbox) {
|
||||
databaseTransaction {
|
||||
callHandlers(msg, deliverTo)
|
||||
}
|
||||
} else {
|
||||
persistenceTx {
|
||||
callHandlers(msg, deliverTo)
|
||||
}
|
||||
}
|
||||
|
@ -13,10 +13,11 @@ import com.r3corda.core.protocols.StateMachineRunId
|
||||
import com.r3corda.core.utilities.UntrustworthyData
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import com.r3corda.node.utilities.createDatabaseTransaction
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLException
|
||||
import java.util.*
|
||||
import java.util.concurrent.ExecutionException
|
||||
@ -40,6 +41,7 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
@Transient internal lateinit var suspendAction: (ProtocolIORequest) -> Unit
|
||||
@Transient internal lateinit var actionOnEnd: () -> Unit
|
||||
@Transient internal var receivedPayload: Any? = null
|
||||
@Transient internal lateinit var database: Database
|
||||
|
||||
@Transient private var _logger: Logger? = null
|
||||
override val logger: Logger get() {
|
||||
@ -85,7 +87,7 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
|
||||
private fun createTransaction() {
|
||||
// Make sure we have a database transaction
|
||||
TransactionManager.currentOrNew(Connection.TRANSACTION_REPEATABLE_READ)
|
||||
createDatabaseTransaction(database)
|
||||
logger.trace { "Starting database transaction ${TransactionManager.currentOrNull()} on ${Strand.currentStrand()}." }
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,7 @@ import com.r3corda.node.services.api.CheckpointStorage
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import rx.subjects.UnicastSubject
|
||||
@ -57,7 +58,10 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* TODO: Implement stub/skel classes that provide a basic RPC framework on top of this.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableServices: List<Any>, val checkpointStorage: CheckpointStorage, val executor: AffinityExecutor) {
|
||||
class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableServices: List<Any>,
|
||||
val checkpointStorage: CheckpointStorage,
|
||||
val executor: AffinityExecutor,
|
||||
val database: Database) {
|
||||
inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor)
|
||||
|
||||
val scheduler = FiberScheduler()
|
||||
@ -219,6 +223,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
}
|
||||
|
||||
private fun initFiber(psm: ProtocolStateMachineImpl<*>, startingCheckpoint: () -> Checkpoint): Checkpoint {
|
||||
psm.database = database
|
||||
psm.serviceHub = serviceHub
|
||||
psm.suspendAction = { request ->
|
||||
psm.logger.trace { "Suspended fiber ${psm.id} ${psm.logic}" }
|
||||
|
@ -9,7 +9,6 @@ import com.r3corda.core.node.services.UniquenessProvider
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.utilities.JDBCHashMap
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
@ -23,31 +22,25 @@ class PersistentUniquenessProvider() : UniquenessProvider, SingletonSerializeAsT
|
||||
|
||||
/**
|
||||
* For each input state store the consuming transaction information.
|
||||
* TODO: remove databaseTransaction here once node initialisation is wrapped in it
|
||||
*/
|
||||
val committedStates = ThreadBox(databaseTransaction {
|
||||
JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(TABLE_NAME, loadOnInit = false)
|
||||
})
|
||||
val committedStates = ThreadBox(JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(TABLE_NAME, loadOnInit = false))
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
val conflict = committedStates.locked {
|
||||
// TODO: remove databaseTransaction here once protocols are wrapped in it
|
||||
databaseTransaction {
|
||||
val conflictingStates = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
|
||||
for (inputState in states) {
|
||||
val consumingTx = get(inputState)
|
||||
if (consumingTx != null) conflictingStates[inputState] = consumingTx
|
||||
}
|
||||
if (conflictingStates.isNotEmpty()) {
|
||||
log.debug("Failure, input states already committed: ${conflictingStates.keys.toString()}")
|
||||
UniquenessProvider.Conflict(conflictingStates)
|
||||
} else {
|
||||
states.forEachIndexed { i, stateRef ->
|
||||
put(stateRef, UniquenessProvider.ConsumingTx(txId, i, callerIdentity))
|
||||
}
|
||||
log.debug("Successfully committed all input states: $states")
|
||||
null
|
||||
val conflictingStates = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
|
||||
for (inputState in states) {
|
||||
val consumingTx = get(inputState)
|
||||
if (consumingTx != null) conflictingStates[inputState] = consumingTx
|
||||
}
|
||||
if (conflictingStates.isNotEmpty()) {
|
||||
log.debug("Failure, input states already committed: ${conflictingStates.keys.toString()}")
|
||||
UniquenessProvider.Conflict(conflictingStates)
|
||||
} else {
|
||||
states.forEachIndexed { i, stateRef ->
|
||||
put(stateRef, UniquenessProvider.ConsumingTx(txId, i, callerIdentity))
|
||||
}
|
||||
log.debug("Successfully committed all input states: $states")
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,21 +1,106 @@
|
||||
package com.r3corda.node.utilities
|
||||
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import com.zaxxer.hikari.HikariConfig
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionInterface
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import java.io.Closeable
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
|
||||
// TODO: Handle commit failure due to database unavailable. Better to shutdown and await database reconnect/recovery.
|
||||
fun <T> databaseTransaction(statement: Transaction.() -> T): T = org.jetbrains.exposed.sql.transactions.transaction(Connection.TRANSACTION_REPEATABLE_READ, 1, statement)
|
||||
fun <T> databaseTransaction(db: Database, statement: Transaction.() -> T): T {
|
||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||
StrandLocalTransactionManager.database = db
|
||||
return org.jetbrains.exposed.sql.transactions.transaction(Connection.TRANSACTION_REPEATABLE_READ, 1, statement)
|
||||
}
|
||||
|
||||
fun createDatabaseTransaction(db: Database): Transaction {
|
||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||
StrandLocalTransactionManager.database = db
|
||||
return TransactionManager.currentOrNew(Connection.TRANSACTION_REPEATABLE_READ)
|
||||
}
|
||||
|
||||
fun configureDatabase(props: Properties): Pair<Closeable, Database> {
|
||||
val config = HikariConfig(props)
|
||||
val dataSource = HikariDataSource(config)
|
||||
val database = Database.connect(dataSource)
|
||||
val database = Database.connect(dataSource) { db -> StrandLocalTransactionManager(db) }
|
||||
// Check not in read-only mode.
|
||||
check(!database.metadata.isReadOnly) { "Database should not be readonly." }
|
||||
databaseTransaction(database) {
|
||||
check(!database.metadata.isReadOnly) { "Database should not be readonly." }
|
||||
}
|
||||
return Pair(dataSource, database)
|
||||
}
|
||||
|
||||
/**
|
||||
* A relatively close copy of the [ThreadLocalTransactionManager] in Exposed but with the following adjustments to suit
|
||||
* our environment:
|
||||
*
|
||||
* Because the construction of a [Database] instance results in replacing the singleton [TransactionManager] instance,
|
||||
* our tests involving two [MockNode]s effectively replace the database instances of each other and continue to trample
|
||||
* over each other. So here we use a companion object to hold them as [ThreadLocal] and [StrandLocalTransactionManager]
|
||||
* is otherwise effectively stateless so it's replacement does not matter. The [ThreadLocal] is then set correctly and
|
||||
* explicitly just prior to initiating a transaction in [databaseTransaction] and [createDatabaseTransaction] above.
|
||||
*/
|
||||
class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionManager {
|
||||
|
||||
companion object {
|
||||
private val threadLocalDb = ThreadLocal<Database>()
|
||||
private val threadLocalTx = ThreadLocal<Transaction>()
|
||||
|
||||
var database: Database
|
||||
get() = threadLocalDb.get() ?: throw IllegalStateException("Was expecting to find database set on current strand: ${Strand.currentStrand()}")
|
||||
set(value: Database) {
|
||||
threadLocalDb.set(value)
|
||||
}
|
||||
}
|
||||
|
||||
init {
|
||||
database = initWithDatabase
|
||||
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
|
||||
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
|
||||
// databae transaction open. The [databaseTransaction] helper above handles this in a finally clause for you
|
||||
// but any manual database transaction management is liable to have this problem.
|
||||
if (threadLocalTx.get() != null) {
|
||||
throw IllegalStateException("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, ${threadLocalTx.get()}")
|
||||
}
|
||||
}
|
||||
|
||||
override fun newTransaction(isolation: Int): Transaction = Transaction(StrandLocalTransaction(database, isolation, threadLocalTx)).apply {
|
||||
threadLocalTx.set(this)
|
||||
}
|
||||
|
||||
override fun currentOrNull(): Transaction? = threadLocalTx.get()
|
||||
|
||||
// Direct copy of [ThreadLocalTransaction].
|
||||
private class StrandLocalTransaction(override val db: Database, isolation: Int, val threadLocal: ThreadLocal<Transaction>) : TransactionInterface {
|
||||
|
||||
override val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
|
||||
db.connector().apply {
|
||||
autoCommit = false
|
||||
transactionIsolation = isolation
|
||||
}
|
||||
}
|
||||
|
||||
override val outerTransaction = threadLocal.get()
|
||||
|
||||
override fun commit() {
|
||||
connection.commit()
|
||||
}
|
||||
|
||||
override fun rollback() {
|
||||
if (!connection.isClosed) {
|
||||
connection.rollback()
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
connection.close()
|
||||
threadLocal.set(outerTransaction)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -98,7 +98,7 @@ class AttachmentTests {
|
||||
}
|
||||
}
|
||||
}
|
||||
}, true, null, null, false, NetworkMapService.Type, SimpleNotaryService.Type)
|
||||
}, true, null, null, NetworkMapService.Type, SimpleNotaryService.Type)
|
||||
val n1 = network.createNode(n0.info.address)
|
||||
|
||||
// Insert an attachment into node zero's store directly.
|
||||
|
@ -71,7 +71,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
|
||||
init {
|
||||
val kms = MockKeyManagementService(ALICE_KEY)
|
||||
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"))
|
||||
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), persistenceTx = { it() })
|
||||
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
|
||||
override val testReference = this@NodeSchedulerServiceTest
|
||||
}
|
||||
@ -82,10 +82,12 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
countDown = CountDownLatch(1)
|
||||
smmHasRemovedAllProtocols = CountDownLatch(1)
|
||||
calls = 0
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
val database = dataSourceAndDatabase.second
|
||||
scheduler = NodeSchedulerService(services, factory, schedulerGatedExecutor)
|
||||
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
|
||||
val mockSMM = StateMachineManager(services, listOf(services), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor)
|
||||
val mockSMM = StateMachineManager(services, listOf(services), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor, database)
|
||||
mockSMM.changes.subscribe { change ->
|
||||
if (change.addOrRemove == AddOrRemove.REMOVE && mockSMM.allStateMachines.isEmpty()) {
|
||||
smmHasRemovedAllProtocols.countDown()
|
||||
@ -98,7 +100,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
@After
|
||||
fun tearDown() {
|
||||
// We need to make sure the StateMachineManager is done before shutting down executors.
|
||||
if(services.smm.allStateMachines.isNotEmpty()) {
|
||||
if (services.smm.allStateMachines.isNotEmpty()) {
|
||||
smmHasRemovedAllProtocols.await()
|
||||
}
|
||||
smmExecutor.shutdown()
|
||||
@ -125,6 +127,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
(serviceHub as TestReference).testReference.calls += increment
|
||||
(serviceHub as TestReference).testReference.countDown.countDown()
|
||||
}
|
||||
|
||||
override val topic: String get() = throw UnsupportedOperationException()
|
||||
}
|
||||
|
||||
|
@ -13,6 +13,7 @@ import com.r3corda.node.utilities.databaseTransaction
|
||||
import com.r3corda.testing.node.MockServices
|
||||
import com.r3corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@ -21,11 +22,14 @@ import java.util.*
|
||||
|
||||
class NodeVaultServiceTest {
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(NodeVaultService::class)
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
}
|
||||
|
||||
@After
|
||||
@ -36,7 +40,7 @@ class NodeVaultServiceTest {
|
||||
|
||||
@Test
|
||||
fun `states not local to instance`() {
|
||||
databaseTransaction {
|
||||
databaseTransaction(database) {
|
||||
val services1 = object : MockServices() {
|
||||
override val vaultService: VaultService = NodeVaultService(this)
|
||||
|
||||
|
@ -26,7 +26,6 @@ import java.security.KeyPair
|
||||
*/
|
||||
class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
|
||||
lateinit var network: MockNetwork
|
||||
lateinit var dataSource: Closeable
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
@ -35,7 +34,7 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
dataSource.close()
|
||||
network.stopNodes()
|
||||
}
|
||||
|
||||
/**
|
||||
@ -50,7 +49,7 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
|
||||
|
||||
fun swizzle() {
|
||||
delegate.unregisterNetworkHandlers()
|
||||
delegate=makeNetworkMapService(delegate.services)
|
||||
delegate = makeNetworkMapService(delegate.services)
|
||||
}
|
||||
|
||||
private fun makeNetworkMapService(services: ServiceHubInternal): AbstractNetworkMapService {
|
||||
@ -72,17 +71,17 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
|
||||
|
||||
/**
|
||||
* Perform basic tests of registering, de-registering and fetching the full network map.
|
||||
*
|
||||
* TODO: make the names of these and those in [AbstractNetworkMapServiceTest] and [InMemoryNetworkMapServiceTest] more
|
||||
* meaningful.
|
||||
*/
|
||||
@Test
|
||||
fun success() {
|
||||
val (mapServiceNode, registerNode) = network.createTwoNodes(NodeFactory)
|
||||
val service = mapServiceNode.inNodeNetworkMapService!! as SwizzleNetworkMapService
|
||||
|
||||
// We have to set this up after the non-persistent nodes as they install a dummy transaction manager.
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
|
||||
databaseTransaction {
|
||||
success(mapServiceNode, registerNode, { service.delegate }, {service.swizzle()})
|
||||
databaseTransaction(mapServiceNode.database) {
|
||||
success(mapServiceNode, registerNode, { service.delegate }, { service.swizzle() })
|
||||
}
|
||||
}
|
||||
|
||||
@ -93,10 +92,7 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
|
||||
// Confirm there's a network map service on node 0
|
||||
val service = mapServiceNode.inNodeNetworkMapService!! as SwizzleNetworkMapService
|
||||
|
||||
// We have to set this up after the non-persistent nodes as they install a dummy transaction manager.
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
|
||||
databaseTransaction {
|
||||
databaseTransaction(mapServiceNode.database) {
|
||||
`success with network`(network, mapServiceNode, registerNode, { service.swizzle() })
|
||||
}
|
||||
}
|
||||
@ -108,10 +104,7 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
|
||||
// Confirm there's a network map service on node 0
|
||||
val service = mapServiceNode.inNodeNetworkMapService!! as SwizzleNetworkMapService
|
||||
|
||||
// We have to set this up after the non-persistent nodes as they install a dummy transaction manager.
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
|
||||
databaseTransaction {
|
||||
databaseTransaction(mapServiceNode.database) {
|
||||
`subscribe with network`(network, mapServiceNode, registerNode, { service.delegate }, { service.swizzle() })
|
||||
}
|
||||
}
|
||||
|
@ -5,9 +5,11 @@ import com.r3corda.core.node.services.UniquenessException
|
||||
import com.r3corda.core.utilities.LogHelper
|
||||
import com.r3corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import com.r3corda.node.utilities.configureDatabase
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import com.r3corda.testing.MEGA_CORP
|
||||
import com.r3corda.testing.generateStateRef
|
||||
import com.r3corda.testing.node.makeTestDataSourceProperties
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@ -20,11 +22,14 @@ class PersistentUniquenessProviderTests {
|
||||
val txID = SecureHash.randomSHA256()
|
||||
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
}
|
||||
|
||||
@After
|
||||
@ -34,24 +39,28 @@ class PersistentUniquenessProviderTests {
|
||||
}
|
||||
|
||||
@Test fun `should commit a transaction with unused inputs without exception`() {
|
||||
val provider = PersistentUniquenessProvider()
|
||||
val inputState = generateStateRef()
|
||||
databaseTransaction(database) {
|
||||
val provider = PersistentUniquenessProvider()
|
||||
val inputState = generateStateRef()
|
||||
|
||||
provider.commit(listOf(inputState), txID, identity)
|
||||
provider.commit(listOf(inputState), txID, identity)
|
||||
}
|
||||
}
|
||||
|
||||
@Test fun `should report a conflict for a transaction with previously used inputs`() {
|
||||
val provider = PersistentUniquenessProvider()
|
||||
val inputState = generateStateRef()
|
||||
databaseTransaction(database) {
|
||||
val provider = PersistentUniquenessProvider()
|
||||
val inputState = generateStateRef()
|
||||
|
||||
val inputs = listOf(inputState)
|
||||
provider.commit(inputs, txID, identity)
|
||||
val inputs = listOf(inputState)
|
||||
provider.commit(inputs, txID, identity)
|
||||
|
||||
val ex = assertFailsWith<UniquenessException> { provider.commit(inputs, txID, identity) }
|
||||
val ex = assertFailsWith<UniquenessException> { provider.commit(inputs, txID, identity) }
|
||||
|
||||
val consumingTx = ex.error.stateHistory[inputState]!!
|
||||
assertEquals(consumingTx.id, txID)
|
||||
assertEquals(consumingTx.inputIndex, inputs.indexOf(inputState))
|
||||
assertEquals(consumingTx.requestingParty, identity)
|
||||
val consumingTx = ex.error.stateHistory[inputState]!!
|
||||
assertEquals(consumingTx.id, txID)
|
||||
assertEquals(consumingTx.inputIndex, inputs.indexOf(inputState))
|
||||
assertEquals(consumingTx.requestingParty, identity)
|
||||
}
|
||||
}
|
||||
}
|
@ -17,6 +17,7 @@ import com.r3corda.testing.*
|
||||
import com.r3corda.testing.node.MockServices
|
||||
import com.r3corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@ -31,12 +32,15 @@ class VaultWithCashTest {
|
||||
lateinit var services: MockServices
|
||||
val vault: VaultService get() = services.vaultService
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(NodeVaultService::class)
|
||||
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||
databaseTransaction {
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
databaseTransaction(database) {
|
||||
services = object : MockServices() {
|
||||
override val vaultService: VaultService = NodeVaultService(this)
|
||||
|
||||
@ -59,7 +63,7 @@ class VaultWithCashTest {
|
||||
|
||||
@Test
|
||||
fun splits() {
|
||||
databaseTransaction {
|
||||
databaseTransaction(database) {
|
||||
// Fix the PRNG so that we get the same splits every time.
|
||||
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
|
||||
@ -77,7 +81,7 @@ class VaultWithCashTest {
|
||||
|
||||
@Test
|
||||
fun `issue and spend total correctly and irrelevant ignored`() {
|
||||
databaseTransaction {
|
||||
databaseTransaction(database) {
|
||||
// A tx that sends us money.
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
val usefulTX = TransactionType.General.Builder(null).apply {
|
||||
@ -116,7 +120,7 @@ class VaultWithCashTest {
|
||||
|
||||
@Test
|
||||
fun `branching LinearStates fails to verify`() {
|
||||
databaseTransaction {
|
||||
databaseTransaction(database) {
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
val linearId = UniqueIdentifier()
|
||||
|
||||
@ -136,7 +140,7 @@ class VaultWithCashTest {
|
||||
|
||||
@Test
|
||||
fun `sequencing LinearStates works`() {
|
||||
databaseTransaction {
|
||||
databaseTransaction(database) {
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
|
||||
val linearId = UniqueIdentifier()
|
||||
|
@ -199,7 +199,7 @@ private fun runBuyer(node: Node, amount: Amount<Currency>) {
|
||||
// Self issue some cash.
|
||||
//
|
||||
// TODO: At some point this demo should be extended to have a central bank node.
|
||||
databaseTransaction {
|
||||
databaseTransaction(node.database) {
|
||||
node.services.fillWithSomeTestCash(300000.DOLLARS,
|
||||
outputNotary = node.info.identity, // In this demo, the buyer and notary are the same.
|
||||
ownedBy = node.services.keyManagementService.freshKey().public)
|
||||
|
@ -15,10 +15,7 @@ import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.testing.node.InMemoryMessagingNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.testing.node.TestClock
|
||||
import com.r3corda.testing.node.setTo
|
||||
import com.r3corda.testing.node.*
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.nio.file.Path
|
||||
@ -71,6 +68,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val certificateSigningService: HostAndPort = HostAndPort.fromParts("localhost", 0)
|
||||
override val dataSourceProperties = makeTestDataSourceProperties()
|
||||
}
|
||||
return SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, keyPair)
|
||||
}
|
||||
@ -97,6 +95,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val certificateSigningService: HostAndPort = HostAndPort.fromParts("localhost", 0)
|
||||
override val dataSourceProperties = makeTestDataSourceProperties()
|
||||
}
|
||||
|
||||
return object : SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, keyPair) {}
|
||||
@ -119,6 +118,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val certificateSigningService: HostAndPort = HostAndPort.fromParts("localhost", 0)
|
||||
override val dataSourceProperties = makeTestDataSourceProperties()
|
||||
}
|
||||
return SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, keyPair)
|
||||
}
|
||||
@ -140,6 +140,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val certificateSigningService: HostAndPort = HostAndPort.fromParts("localhost", 0)
|
||||
override val dataSourceProperties = makeTestDataSourceProperties()
|
||||
}
|
||||
|
||||
return object : SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, keyPair) {
|
||||
@ -167,6 +168,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val certificateSigningService: HostAndPort = HostAndPort.fromParts("localhost", 0)
|
||||
override val dataSourceProperties = makeTestDataSourceProperties()
|
||||
}
|
||||
|
||||
val n = object : SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, keyPair) {
|
||||
@ -293,7 +295,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("unused") // Used from the network visualiser tool.
|
||||
@Suppress("unused") // Used from the network visualiser tool.
|
||||
val networkInitialisationFinished: ListenableFuture<*> =
|
||||
Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture })
|
||||
|
||||
|
@ -75,11 +75,15 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
* in order to cause the delivery of a single message, which will occur on the thread of the caller. If set to false
|
||||
* then this class will set up a background thread to deliver messages asynchronously, if the handler specifies no
|
||||
* executor.
|
||||
*
|
||||
* @param persistenceTx a lambda to wrap message handling in a transaction if necessary. Defaults to a no-op.
|
||||
*/
|
||||
@Synchronized
|
||||
fun createNode(manuallyPumped: Boolean): Pair<Handle, com.r3corda.node.services.api.MessagingServiceBuilder<InMemoryMessaging>> {
|
||||
fun createNode(manuallyPumped: Boolean,
|
||||
persistenceTx: (() -> Unit) -> Unit)
|
||||
: Pair<Handle, com.r3corda.node.services.api.MessagingServiceBuilder<InMemoryMessaging>> {
|
||||
check(counter >= 0) { "In memory network stopped: please recreate." }
|
||||
val builder = createNodeWithID(manuallyPumped, counter) as Builder
|
||||
val builder = createNodeWithID(manuallyPumped, counter, persistenceTx = persistenceTx) as Builder
|
||||
counter++
|
||||
val id = builder.id
|
||||
return Pair(id, builder)
|
||||
@ -91,9 +95,12 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
* @param manuallyPumped see [createNode].
|
||||
* @param id the numeric ID to use, e.g. set to whatever ID the node used last time.
|
||||
* @param description text string that identifies this node for message logging (if is enabled) or null to autogenerate.
|
||||
* @param persistenceTx a lambda to wrap message handling in a transaction if necessary.
|
||||
*/
|
||||
fun createNodeWithID(manuallyPumped: Boolean, id: Int, description: String? = null): com.r3corda.node.services.api.MessagingServiceBuilder<InMemoryMessaging> {
|
||||
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"))
|
||||
fun createNodeWithID(manuallyPumped: Boolean, id: Int, description: String? = null,
|
||||
persistenceTx: (() -> Unit) -> Unit)
|
||||
: com.r3corda.node.services.api.MessagingServiceBuilder<InMemoryMessaging> {
|
||||
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"), persistenceTx)
|
||||
}
|
||||
|
||||
interface LatencyCalculator {
|
||||
@ -133,10 +140,11 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
messageReceiveQueues.clear()
|
||||
}
|
||||
|
||||
inner class Builder(val manuallyPumped: Boolean, val id: Handle) : com.r3corda.node.services.api.MessagingServiceBuilder<InMemoryMessaging> {
|
||||
inner class Builder(val manuallyPumped: Boolean, val id: Handle, val persistenceTx: (() -> Unit) -> Unit)
|
||||
: com.r3corda.node.services.api.MessagingServiceBuilder<InMemoryMessaging> {
|
||||
override fun start(): ListenableFuture<InMemoryMessaging> {
|
||||
synchronized(this@InMemoryMessagingNetwork) {
|
||||
val node = InMemoryMessaging(manuallyPumped, id)
|
||||
val node = InMemoryMessaging(manuallyPumped, id, persistenceTx)
|
||||
handleEndpointMap[id] = node
|
||||
return Futures.immediateFuture(node)
|
||||
}
|
||||
@ -197,7 +205,10 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
* An instance can be obtained by creating a builder and then using the start method.
|
||||
*/
|
||||
@ThreadSafe
|
||||
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), com.r3corda.node.services.api.MessagingServiceInternal {
|
||||
inner class InMemoryMessaging(private val manuallyPumped: Boolean,
|
||||
private val handle: Handle,
|
||||
private val persistenceTx: (() -> Unit) -> Unit)
|
||||
: SingletonSerializeAsToken(), com.r3corda.node.services.api.MessagingServiceInternal {
|
||||
inner class Handler(val executor: Executor?, val topicSession: TopicSession,
|
||||
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
|
||||
@ -334,7 +345,9 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
|
||||
(handler.executor ?: MoreExecutors.directExecutor()).execute {
|
||||
try {
|
||||
handler.callback(transfer.message, handler)
|
||||
persistenceTx {
|
||||
handler.callback(transfer.message, handler)
|
||||
}
|
||||
} catch(e: Exception) {
|
||||
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topicSession}", e)
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import com.r3corda.core.node.PhysicalLocation
|
||||
import com.r3corda.core.node.services.KeyManagementService
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.VaultService
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.testing.InMemoryVaultService
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
@ -23,8 +24,8 @@ import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.keys.E2ETestKeyManagementService
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapService
|
||||
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import com.r3corda.protocols.ServiceRequestMessage
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.slf4j.Logger
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
@ -33,7 +34,8 @@ import java.util.*
|
||||
|
||||
/**
|
||||
* A mock node brings up a suite of in-memory services in a fast manner suitable for unit testing.
|
||||
* Components that do IO are either swapped out for mocks, or pointed to a [Jimfs] in memory filesystem.
|
||||
* Components that do IO are either swapped out for mocks, or pointed to a [Jimfs] in memory filesystem or an in
|
||||
* memory H2 database instance.
|
||||
*
|
||||
* Mock network nodes require manual pumping by default: they will not run asynchronous. This means that
|
||||
* for message exchanges to take place (and associated handlers to run), you must call the [runNetwork]
|
||||
@ -50,6 +52,9 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
val filesystem = com.google.common.jimfs.Jimfs.newFileSystem(com.google.common.jimfs.Configuration.unix())
|
||||
val messagingNetwork = InMemoryMessagingNetwork(networkSendManuallyPumped)
|
||||
|
||||
// A unique identifier for this network to segregate databases with the same nodeID but different networks.
|
||||
private val networkId = random63BitValue()
|
||||
|
||||
val identities = ArrayList<Party>()
|
||||
|
||||
private val _nodes = ArrayList<MockNode>()
|
||||
@ -87,7 +92,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
|
||||
override fun makeMessagingService(): com.r3corda.node.services.api.MessagingServiceInternal {
|
||||
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
|
||||
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, configuration.myLegalName).start().get()
|
||||
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, configuration.myLegalName,
|
||||
persistenceTx = { body: () -> Unit -> databaseTransaction(database) { body() } }).start().get()
|
||||
}
|
||||
|
||||
override fun makeIdentityService() = MockIdentityService(mockNet.identities)
|
||||
@ -100,17 +106,6 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
// Nothing to do
|
||||
}
|
||||
|
||||
// If the in-memory H2 instance is configured, use that, otherwise mock out the transaction manager.
|
||||
override fun initialiseDatabasePersistence(insideTransaction: () -> Unit) {
|
||||
try {
|
||||
super.initialiseDatabasePersistence(insideTransaction)
|
||||
} catch(fallback: DatabaseConfigurationException) {
|
||||
log.info("Using mocked database features.")
|
||||
TransactionManager.manager = TestTransactionManager()
|
||||
insideTransaction()
|
||||
}
|
||||
}
|
||||
|
||||
override fun makeNetworkMapService() {
|
||||
inNodeNetworkMapService = InMemoryNetworkMapService(services)
|
||||
}
|
||||
@ -158,7 +153,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
/** Returns a node, optionally created by the passed factory method. */
|
||||
fun createNode(networkMapAddress: SingleMessageRecipient? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory,
|
||||
start: Boolean = true, legalName: String? = null, keyPair: KeyPair? = null,
|
||||
databasePersistence: Boolean = false, vararg advertisedServices: ServiceType): MockNode {
|
||||
vararg advertisedServices: ServiceType): MockNode {
|
||||
val newNode = forcedID == -1
|
||||
val id = if (newNode) counter++ else forcedID
|
||||
|
||||
@ -176,7 +171,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
override val exportJMXto: String = ""
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val dataSourceProperties: Properties get() = if (databasePersistence) makeTestDataSourceProperties("node_$id") else Properties()
|
||||
override val dataSourceProperties: Properties get() = makeTestDataSourceProperties("node_${id}_net_$networkId")
|
||||
override val certificateSigningService: HostAndPort = HostAndPort.fromParts("localhost", 0)
|
||||
}
|
||||
val node = nodeFactory.create(config, this, networkMapAddress, advertisedServices.toSet(), id, keyPair)
|
||||
@ -217,7 +212,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
fun createTwoNodes(nodeFactory: Factory = defaultFactory, notaryKeyPair: KeyPair? = null): Pair<MockNode, MockNode> {
|
||||
require(nodes.isEmpty())
|
||||
return Pair(
|
||||
createNode(null, -1, nodeFactory, true, null, notaryKeyPair, false, com.r3corda.node.services.network.NetworkMapService.Type, com.r3corda.node.services.transactions.SimpleNotaryService.Type),
|
||||
createNode(null, -1, nodeFactory, true, null, notaryKeyPair, com.r3corda.node.services.network.NetworkMapService.Type, com.r3corda.node.services.transactions.SimpleNotaryService.Type),
|
||||
createNode(nodes[0].info.address, -1, nodeFactory, true, null)
|
||||
)
|
||||
}
|
||||
@ -245,14 +240,14 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
}
|
||||
|
||||
fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null): MockNode {
|
||||
return createNode(null, -1, defaultFactory, true, legalName, keyPair, false, com.r3corda.node.services.network.NetworkMapService.Type, com.r3corda.node.services.transactions.SimpleNotaryService.Type)
|
||||
return createNode(null, -1, defaultFactory, true, legalName, keyPair, com.r3corda.node.services.network.NetworkMapService.Type, com.r3corda.node.services.transactions.SimpleNotaryService.Type)
|
||||
}
|
||||
|
||||
fun createPartyNode(networkMapAddr: SingleMessageRecipient, legalName: String? = null, keyPair: KeyPair? = null): MockNode {
|
||||
return createNode(networkMapAddr, -1, defaultFactory, true, legalName, keyPair)
|
||||
}
|
||||
|
||||
@Suppress("unused") // This is used from the network visualiser tool.
|
||||
@Suppress("unused") // This is used from the network visualiser tool.
|
||||
fun addressToNode(address: SingleMessageRecipient): MockNode = nodes.single { it.net.myAddress == address }
|
||||
|
||||
fun startNodes() {
|
||||
|
@ -1,43 +0,0 @@
|
||||
package com.r3corda.testing.node
|
||||
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionInterface
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import java.sql.Connection
|
||||
|
||||
/**
|
||||
* A dummy transaction manager used by [MockNode] to avoid uninitialised lateinit var. Any attempt to use this results in an exception.
|
||||
*/
|
||||
class TestTransactionManager : TransactionManager {
|
||||
|
||||
var current = ThreadLocal<Transaction>()
|
||||
|
||||
override fun currentOrNull() = current.get()
|
||||
|
||||
override fun newTransaction(isolation: Int): Transaction {
|
||||
val newTx = Transaction(TestTransactionImpl(this))
|
||||
current.set(newTx)
|
||||
return newTx
|
||||
}
|
||||
|
||||
class TestTransactionImpl(val manager: TestTransactionManager) : TransactionInterface {
|
||||
override val connection: Connection
|
||||
get() = throw UnsupportedOperationException()
|
||||
override val db: Database
|
||||
get() = throw UnsupportedOperationException()
|
||||
override val outerTransaction: Transaction?
|
||||
get() = throw UnsupportedOperationException()
|
||||
|
||||
override fun close() {
|
||||
manager.current.set(null)
|
||||
}
|
||||
|
||||
override fun commit() {
|
||||
}
|
||||
|
||||
override fun rollback() {
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user