Merge remote-tracking branch 'github/master'

This commit is contained in:
Shams Asari
2016-12-09 17:41:17 +00:00
80 changed files with 1798 additions and 364 deletions

View File

@ -132,6 +132,9 @@ sealed class PortAllocation {
* @param dsl The dsl itself.
* @return The value returned in the [dsl] closure.
*/
// TODO: Add an @JvmOverloads annotation
fun <A> driver(
driverDirectory: Path = Paths.get("build", getTimestampAsDirectoryName()),
portAllocation: PortAllocation = PortAllocation.Incremental(10000),

View File

@ -247,7 +247,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
// TODO: this model might change but for now it provides some de-coupling
// Add vault observers
CashBalanceAsMetricsObserver(services)
CashBalanceAsMetricsObserver(services, database)
ScheduledActivityObserver(services)
HibernateObserver(services)

View File

@ -42,7 +42,7 @@ abstract class ServiceHubInternal : PluginServiceHub {
abstract val schemaService: SchemaService
abstract override val networkService: MessagingServiceInternal
/**
* Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then
* sends them to the vault for further processing. This is intended for implementations to call from

View File

@ -13,7 +13,7 @@ import net.corda.node.services.api.ServiceHubInternal
*/
class ScheduledActivityObserver(val services: ServiceHubInternal) {
init {
services.vaultService.updates.subscribe { update ->
services.vaultService.rawUpdates.subscribe { update ->
update.consumed.forEach { services.schedulerService.unscheduleStateActivity(it) }
update.produced.forEach { scheduleStateActivity(it, services.flowLogicRefFactory) }
}

View File

@ -136,6 +136,8 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2",
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true
// TODO: Set up the connector's host name verifier logic to ensure we connect to the expected node even in case of MITM or BGP hijacks
)
)
}

View File

@ -96,6 +96,9 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur
*
* @throws RPCException if the server version is too low or if the server isn't reachable within the given time.
*/
// TODO: Add an @JvmOverloads annotation
@Throws(RPCException::class)
fun proxy(timeout: Duration? = null, minVersion: Int = 0): CordaRPCOps {
return state.locked {

View File

@ -8,6 +8,7 @@ import com.esotericsoftware.kryo.io.Output
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.HashMultimap
import net.corda.core.ErrorOr
import net.corda.core.crypto.commonName
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.RPCReturnsObservables
import net.corda.core.serialization.SerializedBytes
@ -16,14 +17,12 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.debug
import net.corda.node.services.RPCUserService
import net.corda.node.services.User
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.utilities.AffinityExecutor
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x500.style.BCStyle
import rx.Notification
import rx.Observable
import rx.Subscription
@ -168,7 +167,7 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, v
val rpcUser = userService.getUser(validatedUser)
if (rpcUser != null) {
return rpcUser
} else if (X500Name(validatedUser).getRDNs(BCStyle.CN).first().first.value.toString() == nodeLegalName) {
} else if (X500Name(validatedUser).commonName == nodeLegalName) {
return nodeUser
} else {
throw IllegalArgumentException("Validated user '$validatedUser' is not an RPC user nor the NODE user")

View File

@ -24,6 +24,7 @@ import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_
import net.corda.node.services.network.NetworkMapService.FetchMapResponse
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.bufferUntilDatabaseCommit
import rx.Observable
import rx.subjects.PublishSubject
import java.security.SignatureException
@ -42,7 +43,9 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
override val partyNodes: List<NodeInfo> get() = registeredNodes.map { it.value }
override val networkMapNodes: List<NodeInfo> get() = getNodesWithService(NetworkMapService.type)
private val _changed = PublishSubject.create<MapChange>()
override val changed: Observable<MapChange> = _changed
override val changed: Observable<MapChange> get() = _changed
private val changePublisher: rx.Observer<MapChange> get() = _changed.bufferUntilDatabaseCommit()
private val _registrationFuture = SettableFuture.create<Unit>()
override val mapServiceRegistered: ListenableFuture<Unit> get() = _registrationFuture
@ -91,9 +94,9 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
synchronized(_changed) {
val previousNode = registeredNodes.put(node.legalIdentity, node)
if (previousNode == null) {
_changed.onNext(MapChange.Added(node))
changePublisher.onNext(MapChange.Added(node))
} else if (previousNode != node) {
_changed.onNext(MapChange.Modified(node, previousNode))
changePublisher.onNext(MapChange.Modified(node, previousNode))
}
}
}
@ -101,7 +104,7 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
override fun removeNode(node: NodeInfo) {
synchronized(_changed) {
registeredNodes.remove(node.legalIdentity)
_changed.onNext(MapChange.Removed(node))
changePublisher.onNext(MapChange.Removed(node))
}
}

View File

@ -50,7 +50,7 @@ class DBTransactionMappingStorage : StateMachineRecordedTransactionMappingStorag
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
mutex.locked {
stateMachineTransactionMap[transactionId] = stateMachineRunId
updates.onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
updates.bufferUntilDatabaseCommit().onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
}
}

View File

@ -40,7 +40,7 @@ class DBTransactionStorage : TransactionStorage {
val old = txStorage.get(transaction.id)
if (old == null) {
txStorage.put(transaction.id, transaction)
updatesPublisher.onNext(transaction)
updatesPublisher.bufferUntilDatabaseCommit().onNext(transaction)
true
} else {
false

View File

@ -35,7 +35,7 @@ class HibernateObserver(services: ServiceHubInternal) {
val sessionFactories = ConcurrentHashMap<MappedSchema, SessionFactory>()
init {
services.vaultService.updates.subscribe { persist(it.produced) }
services.vaultService.rawUpdates.subscribe { persist(it.produced) }
}
private fun sessionFactoryForSchema(schema: MappedSchema): SessionFactory {

View File

@ -86,15 +86,16 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val result = try {
logic.call()
} catch (t: Throwable) {
processException(t)
actionOnEnd()
commitTransaction()
_resultFuture?.setException(t)
throw ExecutionException(t)
}
// This is to prevent actionOnEnd being called twice if it throws an exception
actionOnEnd()
_resultFuture?.set(result)
commitTransaction()
_resultFuture?.set(result)
return result
}
@ -264,8 +265,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
// This can get called in actionOnSuspend *after* we commit the database transaction, so optionally open a new one here.
databaseTransaction(database) {
actionOnEnd()
_resultFuture?.setException(t)
}
_resultFuture?.setException(t)
}
internal fun resume(scheduler: FiberScheduler) {

View File

@ -12,6 +12,7 @@ import kotlinx.support.jdk8.collections.removeIf
import net.corda.core.ThreadBox
import net.corda.core.abbreviate
import net.corda.core.crypto.Party
import net.corda.core.crypto.commonName
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId
@ -29,6 +30,7 @@ import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.isolatedTransaction
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
@ -93,7 +95,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val changesPublisher = PublishSubject.create<Change>()
fun notifyChangeObservers(psm: FlowStateMachineImpl<*>, addOrRemove: AddOrRemove) {
changesPublisher.onNext(Change(psm.logic, addOrRemove, psm.id))
changesPublisher.bufferUntilDatabaseCommit().onNext(Change(psm.logic, addOrRemove, psm.id))
}
})
@ -219,7 +221,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
// isn't required to be unique
// TODO For now have the doorman block signups with identical names, and names with characters that
// are used in X.500 name textual serialisation
val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peerLegalName)?.legalIdentity
val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peer.commonName)?.legalIdentity
if (otherParty != null) {
onSessionInit(sessionMessage, otherParty)
} else {
@ -393,13 +395,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* restarted with checkpointed state machines in the storage service.
*/
fun <T> add(logic: FlowLogic<T>): FlowStateMachine<T> {
val fiber = createFiber(logic)
// We swap out the parent transaction context as using this frequently leads to a deadlock as we wait
// on the flow completion future inside that context. The problem is that any progress checkpoints are
// unable to acquire the table lock and move forward till the calling transaction finishes.
// Committing in line here on a fresh context ensure we can progress.
isolatedTransaction(database) {
val fiber = isolatedTransaction(database) {
val fiber = createFiber(logic)
updateCheckpoint(fiber)
fiber
}
// If we are not started then our checkpoint will be picked up during start
mutex.locked {

View File

@ -3,12 +3,14 @@ package net.corda.node.services.vault
import com.codahale.metrics.Gauge
import net.corda.core.node.services.VaultService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.utilities.databaseTransaction
import org.jetbrains.exposed.sql.Database
import java.util.*
/**
* This class observes the vault and reflect current cash balances as exposed metrics in the monitoring service.
*/
class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal) {
class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal, val database: Database) {
init {
// TODO: Need to consider failure scenarios. This needs to run if the TX is successfully recorded
serviceHubInternal.vaultService.updates.subscribe { update ->
@ -29,13 +31,15 @@ class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal) {
//
// Note: exported as pennies.
val m = serviceHubInternal.monitoringService.metrics
for ((key, value) in vault.cashBalances) {
val metric = balanceMetrics.getOrPut(key) {
val newMetric = BalanceMetric()
m.register("VaultBalances.${key}Pennies", newMetric)
newMetric
databaseTransaction(database) {
for ((key, value) in vault.cashBalances) {
val metric = balanceMetrics.getOrPut(key) {
val newMetric = BalanceMetric()
m.register("VaultBalances.${key}Pennies", newMetric)
newMetric
}
metric.pennies = value.quantity
}
metric.pennies = value.quantity
}
}
}

View File

@ -12,6 +12,7 @@ import net.corda.core.node.ServiceHub
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.tee
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.loggerFor
@ -80,6 +81,9 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
}
val _updatesPublisher = PublishSubject.create<Vault.Update>()
val _rawUpdatesPublisher = PublishSubject.create<Vault.Update>()
// For use during publishing only.
val updatesPublisher: rx.Observer<Vault.Update> get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher)
fun allUnconsumedStates(): Iterable<StateAndRef<ContractState>> {
// Order by txhash for if and when transaction storage has some caching.
@ -104,6 +108,9 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
override val currentVault: Vault get() = mutex.locked { Vault(allUnconsumedStates()) }
override val rawUpdates: Observable<Vault.Update>
get() = mutex.locked { _rawUpdatesPublisher }
override val updates: Observable<Vault.Update>
get() = mutex.locked { _updatesPublisher }
@ -127,7 +134,7 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
if (netDelta != Vault.NoUpdate) {
mutex.locked {
recordUpdate(netDelta)
_updatesPublisher.onNext(netDelta)
updatesPublisher.onNext(netDelta)
}
}
return currentVault

View File

@ -7,9 +7,13 @@ import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.parsePublicKeyBase58
import net.corda.core.crypto.toBase58String
import net.corda.node.utilities.StrandLocalTransactionManager.Boundary
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.TransactionInterface
import org.jetbrains.exposed.sql.transactions.TransactionManager
import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject
import java.io.Closeable
import java.security.PublicKey
import java.sql.Connection
@ -18,6 +22,7 @@ import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.*
import java.util.concurrent.ConcurrentHashMap
/**
* Table prefix for all tables owned by the node module.
@ -78,12 +83,19 @@ fun <T> isolatedTransaction(database: Database, block: Transaction.() -> T): T {
* over each other. So here we use a companion object to hold them as [ThreadLocal] and [StrandLocalTransactionManager]
* is otherwise effectively stateless so it's replacement does not matter. The [ThreadLocal] is then set correctly and
* explicitly just prior to initiating a transaction in [databaseTransaction] and [createDatabaseTransaction] above.
*
* The [StrandLocalTransactionManager] instances have an [Observable] of the transaction close [Boundary]s which
* facilitates the use of [Observable.afterDatabaseCommit] to create event streams that only emit once the database
* transaction is closed and the data has been persisted and becomes visible to other observers.
*/
class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionManager {
companion object {
private val TX_ID = Key<UUID>()
private val threadLocalDb = ThreadLocal<Database>()
private val threadLocalTx = ThreadLocal<Transaction>()
private val databaseToInstance = ConcurrentHashMap<Database, StrandLocalTransactionManager>()
fun setThreadLocalTx(tx: Transaction?): Pair<Database?, Transaction?> {
val oldTx = threadLocalTx.get()
@ -101,10 +113,21 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan
set(value: Database) {
threadLocalDb.set(value)
}
val transactionId: UUID
get() = threadLocalTx.get()?.getUserData(TX_ID) ?: throw IllegalStateException("Was expecting to find transaction set on current strand: ${Strand.currentStrand()}")
val manager: StrandLocalTransactionManager get() = databaseToInstance[database]!!
val transactionBoundaries: PublishSubject<Boundary> get() = manager._transactionBoundaries
}
data class Boundary(val txId: UUID)
private val _transactionBoundaries = PublishSubject.create<Boundary>()
init {
database = initWithDatabase
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
// databae transaction open. The [databaseTransaction] helper above handles this in a finally clause for you
@ -112,16 +135,23 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan
if (threadLocalTx.get() != null) {
throw IllegalStateException("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, ${threadLocalTx.get()}")
}
database = initWithDatabase
databaseToInstance[database] = this
}
override fun newTransaction(isolation: Int): Transaction = Transaction(StrandLocalTransaction(database, isolation, threadLocalTx)).apply {
threadLocalTx.set(this)
override fun newTransaction(isolation: Int): Transaction {
val impl = StrandLocalTransaction(database, isolation, threadLocalTx, transactionBoundaries)
return Transaction(impl).apply {
threadLocalTx.set(this)
putUserData(TX_ID, impl.id)
}
}
override fun currentOrNull(): Transaction? = threadLocalTx.get()
// Direct copy of [ThreadLocalTransaction].
private class StrandLocalTransaction(override val db: Database, isolation: Int, val threadLocal: ThreadLocal<Transaction>) : TransactionInterface {
private class StrandLocalTransaction(override val db: Database, isolation: Int, val threadLocal: ThreadLocal<Transaction>, val transactionBoundaries: PublishSubject<Boundary>) : TransactionInterface {
val id = UUID.randomUUID()
override val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
db.connector().apply {
@ -145,13 +175,33 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan
override fun close() {
connection.close()
threadLocal.set(outerTransaction)
if (outerTransaction == null) {
transactionBoundaries.onNext(Boundary(id))
}
}
}
}
/**
* Buffer observations until after the current database transaction has been closed. Observations are never
* dropped, simply delayed.
*
* Primarily for use by component authors to publish observations during database transactions without racing against
* closing the database transaction.
*
* For examples, see the call hierarchy of this function.
*/
fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
val currentTxId = StrandLocalTransactionManager.transactionId
val databaseTxBoundary: Observable<StrandLocalTransactionManager.Boundary> = StrandLocalTransactionManager.transactionBoundaries.filter { it.txId == currentTxId }.first()
val subject = UnicastSubject.create<T>()
subject.delaySubscription(databaseTxBoundary).subscribe(this)
databaseTxBoundary.doOnCompleted { subject.onCompleted() }
return subject
}
// Composite columns for use with below Exposed helpers.
data class PartyColumns(val name: Column<String>, val owningKey: Column<CompositeKey>)
data class StateRefColumns(val txId: Column<SecureHash>, val index: Column<Int>)
data class TxnNoteColumns(val txId: Column<SecureHash>, val note: Column<String>)

View File

@ -1,12 +1,13 @@
package net.corda.node.utilities.certsigning
import net.corda.core.crypto.CertificateStream
import org.apache.commons.io.IOUtils
import org.bouncycastle.pkcs.PKCS10CertificationRequest
import java.io.IOException
import java.net.HttpURLConnection
import java.net.HttpURLConnection.*
import java.net.URL
import java.security.cert.Certificate
import java.security.cert.CertificateFactory
import java.util.*
import java.util.zip.ZipInputStream
@ -24,18 +25,17 @@ class HTTPCertificateSigningService(val server: URL) : CertificateSigningService
conn.requestMethod = "GET"
return when (conn.responseCode) {
HttpURLConnection.HTTP_OK -> conn.inputStream.use {
ZipInputStream(it).use {
val certificates = ArrayList<Certificate>()
while (it.nextEntry != null) {
certificates.add(CertificateFactory.getInstance("X.509").generateCertificate(it))
}
certificates.toTypedArray()
HTTP_OK -> ZipInputStream(conn.inputStream).use {
val certificates = ArrayList<Certificate>()
val stream = CertificateStream(it)
while (it.nextEntry != null) {
certificates.add(stream.nextCertificate())
}
certificates.toTypedArray()
}
HttpURLConnection.HTTP_NO_CONTENT -> null
HttpURLConnection.HTTP_UNAUTHORIZED -> throw IOException("Certificate signing request has been rejected, please contact Corda network administrator for more information.")
else -> throw IOException("Unexpected response code ${conn.responseCode} - ${IOUtils.toString(conn.errorStream)}")
HTTP_NO_CONTENT -> null
HTTP_UNAUTHORIZED -> throw IOException("Certificate signing request has been rejected: ${conn.errorMessage}")
else -> throwUnexpectedResponseCode(conn)
}
}
@ -49,10 +49,15 @@ class HTTPCertificateSigningService(val server: URL) : CertificateSigningService
conn.outputStream.write(request.encoded)
return when (conn.responseCode) {
HttpURLConnection.HTTP_OK -> IOUtils.toString(conn.inputStream)
HttpURLConnection.HTTP_FORBIDDEN -> throw IOException("Client version $clientVersion is forbidden from accessing permissioning server, please upgrade to newer version.")
else -> throw IOException("Unexpected response code ${conn.responseCode} - ${IOUtils.toString(conn.errorStream)}")
HTTP_OK -> IOUtils.toString(conn.inputStream)
HTTP_FORBIDDEN -> throw IOException("Client version $clientVersion is forbidden from accessing permissioning server, please upgrade to newer version.")
else -> throwUnexpectedResponseCode(conn)
}
}
private fun throwUnexpectedResponseCode(connection: HttpURLConnection): Nothing {
throw IOException("Unexpected response code ${connection.responseCode} - ${connection.errorMessage}")
}
private val HttpURLConnection.errorMessage: String get() = IOUtils.toString(errorStream)
}

View File

@ -8,6 +8,7 @@ import com.typesafe.config.ConfigFactory
import net.corda.core.crypto.composite
import net.corda.core.crypto.generateKeyPair
import net.corda.core.messaging.Message
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.createMessage
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.utilities.LogHelper
@ -16,7 +17,6 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.core.messaging.RPCOps
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.PersistentUniquenessProvider

View File

@ -5,6 +5,7 @@ import net.corda.core.crypto.generateKeyPair
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.node.services.network.NetworkMapService
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.expect
import net.corda.testing.node.MockNetwork
import org.junit.Test
@ -30,7 +31,9 @@ class InMemoryNetworkMapCacheTest {
// Node A currently knows only about itself, so this returns node A
assertEquals(nodeA.netMapCache.getNodeByCompositeKey(keyPair.public.composite), nodeA.info)
nodeA.netMapCache.addNode(nodeB.info)
databaseTransaction(nodeA.database) {
nodeA.netMapCache.addNode(nodeB.info)
}
// Now both nodes match, so it throws an error
expect<IllegalStateException> {
nodeA.netMapCache.getNodeByCompositeKey(keyPair.public.composite)

View File

@ -16,6 +16,7 @@ import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_FLOW_TOPIC
import net.corda.node.services.network.NodeRegistration
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import org.junit.Before
@ -201,7 +202,9 @@ class InMemoryNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
fun success() {
val (mapServiceNode, registerNode) = network.createTwoNodes()
val service = mapServiceNode.inNodeNetworkMapService!! as InMemoryNetworkMapService
success(mapServiceNode, registerNode, { service }, { })
databaseTransaction(mapServiceNode.database) {
success(mapServiceNode, registerNode, { service }, { })
}
}
@Test

View File

@ -0,0 +1,142 @@
package net.corda.node.utilities
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.tee
import net.corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.junit.Test
import rx.Observable
import rx.subjects.PublishSubject
class ObservablesTests {
private fun isInDatabaseTransaction(): Boolean = (TransactionManager.currentOrNull() != null)
@Test
fun `bufferUntilDatabaseCommit delays until transaction closed`() {
val (toBeClosed, database) = configureDatabase(makeTestDataSourceProperties())
val subject = PublishSubject.create<Int>()
val observable: Observable<Int> = subject
val firstEvent = SettableFuture.create<Pair<Int, Boolean>>()
val secondEvent = SettableFuture.create<Pair<Int, Boolean>>()
observable.first().subscribe { firstEvent.set(it to isInDatabaseTransaction()) }
observable.skip(1).first().subscribe { secondEvent.set(it to isInDatabaseTransaction()) }
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit()
assertThat(subject).isNotEqualTo(delayedSubject)
delayedSubject.onNext(0)
subject.onNext(1)
assertThat(firstEvent.isDone).isTrue()
assertThat(secondEvent.isDone).isFalse()
}
assertThat(secondEvent.isDone).isTrue()
assertThat(firstEvent.get()).isEqualTo(1 to true)
assertThat(secondEvent.get()).isEqualTo(0 to false)
toBeClosed.close()
}
@Test
fun `bufferUntilDatabaseCommit delays until transaction closed repeatable`() {
val (toBeClosed, database) = configureDatabase(makeTestDataSourceProperties())
val subject = PublishSubject.create<Int>()
val observable: Observable<Int> = subject
val firstEvent = SettableFuture.create<Pair<Int, Boolean>>()
val secondEvent = SettableFuture.create<Pair<Int, Boolean>>()
observable.first().subscribe { firstEvent.set(it to isInDatabaseTransaction()) }
observable.skip(1).first().subscribe { secondEvent.set(it to isInDatabaseTransaction()) }
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit()
assertThat(subject).isNotEqualTo(delayedSubject)
delayedSubject.onNext(0)
assertThat(firstEvent.isDone).isFalse()
assertThat(secondEvent.isDone).isFalse()
}
assertThat(firstEvent.isDone).isTrue()
assertThat(firstEvent.get()).isEqualTo(0 to false)
assertThat(secondEvent.isDone).isFalse()
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit()
assertThat(subject).isNotEqualTo(delayedSubject)
delayedSubject.onNext(1)
assertThat(secondEvent.isDone).isFalse()
}
assertThat(secondEvent.isDone).isTrue()
assertThat(secondEvent.get()).isEqualTo(1 to false)
toBeClosed.close()
}
@Test
fun `tee correctly copies observations to multiple observers`() {
val subject1 = PublishSubject.create<Int>()
val subject2 = PublishSubject.create<Int>()
val subject3 = PublishSubject.create<Int>()
val event1 = SettableFuture.create<Int>()
val event2 = SettableFuture.create<Int>()
val event3 = SettableFuture.create<Int>()
subject1.subscribe { event1.set(it) }
subject2.subscribe { event2.set(it) }
subject3.subscribe { event3.set(it) }
val tee = subject1.tee(subject2, subject3)
tee.onNext(0)
assertThat(event1.isDone).isTrue()
assertThat(event2.isDone).isTrue()
assertThat(event3.isDone).isTrue()
assertThat(event1.get()).isEqualTo(0)
assertThat(event2.get()).isEqualTo(0)
assertThat(event3.get()).isEqualTo(0)
tee.onCompleted()
assertThat(subject1.hasCompleted()).isTrue()
assertThat(subject2.hasCompleted()).isTrue()
assertThat(subject3.hasCompleted()).isTrue()
}
@Test
fun `combine tee and bufferUntilDatabaseCommit`() {
val (toBeClosed, database) = configureDatabase(makeTestDataSourceProperties())
val subject = PublishSubject.create<Int>()
val teed = PublishSubject.create<Int>()
val observable: Observable<Int> = subject
val firstEvent = SettableFuture.create<Pair<Int, Boolean>>()
val teedEvent = SettableFuture.create<Pair<Int, Boolean>>()
observable.first().subscribe { firstEvent.set(it to isInDatabaseTransaction()) }
teed.first().subscribe { teedEvent.set(it to isInDatabaseTransaction()) }
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit().tee(teed)
assertThat(subject).isNotEqualTo(delayedSubject)
delayedSubject.onNext(0)
assertThat(firstEvent.isDone).isFalse()
assertThat(teedEvent.isDone).isTrue()
}
assertThat(firstEvent.isDone).isTrue()
assertThat(firstEvent.get()).isEqualTo(0 to false)
assertThat(teedEvent.get()).isEqualTo(0 to true)
toBeClosed.close()
}
}