Merge branch 'release/os/4.3' into merge/2019-10-30-os-43-to-44

This commit is contained in:
szymonsztuka 2019-09-30 16:20:58 +01:00
commit ac2c822dd4
23 changed files with 585 additions and 233 deletions

View File

@ -1,8 +1,8 @@
package net.corda.client.jfx.model
import javafx.beans.property.SimpleObjectProperty
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.core.contracts.ContractState
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
@ -48,7 +48,7 @@ class NodeMonitorModel : AutoCloseable {
val progressTracking: Observable<ProgressTrackingEvent> = progressTrackingSubject
val networkMap: Observable<MapChange> = networkMapSubject
private lateinit var rpc: CordaRPCOps
private lateinit var rpc: CordaRPCConnection
val proxyObservable = SimpleObjectProperty<CordaRPCOps>()
lateinit var notaryIdentities: List<Party>
@ -61,7 +61,7 @@ class NodeMonitorModel : AutoCloseable {
*/
override fun close() {
try {
(rpc as ReconnectingCordaRPCOps).close()
rpc.close()
} catch (e: Exception) {
logger.error("Error closing RPC connection to node", e)
}
@ -72,35 +72,42 @@ class NodeMonitorModel : AutoCloseable {
* TODO provide an unsubscribe mechanism
*/
fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
rpc = ReconnectingCordaRPCOps(nodeHostAndPort, username, password, CordaRPCClientConfiguration.DEFAULT)
proxyObservable.value = rpc
rpc = CordaRPCClient(nodeHostAndPort).start(username, password)
proxyObservable.value = rpc.proxy
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
val (statesSnapshot, vaultUpdates) = rpc.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE))
val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ ->
statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED
val (
statesSnapshot,
vaultUpdates
) = rpc.proxy.vaultTrackBy<ContractState>(
QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE)
)
val unconsumedStates =
statesSnapshot.states.filterIndexed { index, _ ->
statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED
}.toSet()
val consumedStates = statesSnapshot.states.toSet() - unconsumedStates
val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet())
vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject::onNext)
// Transactions
val (transactions, newTransactions) = @Suppress("DEPRECATION") rpc.internalVerifiedTransactionsFeed()
val (transactions, newTransactions) =
@Suppress("DEPRECATION") rpc.proxy.internalVerifiedTransactionsFeed()
newTransactions.startWith(transactions).subscribe(transactionsSubject::onNext)
// SM -> TX mapping
val (smTxMappings, futureSmTxMappings) = rpc.stateMachineRecordedTransactionMappingFeed()
val (smTxMappings, futureSmTxMappings) =
rpc.proxy.stateMachineRecordedTransactionMappingFeed()
futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext)
// Parties on network
val (parties, futurePartyUpdate) = rpc.networkMapFeed()
val (parties, futurePartyUpdate) = rpc.proxy.networkMapFeed()
futurePartyUpdate.startWith(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext)
val stateMachines = rpc.stateMachinesSnapshot()
val stateMachines = rpc.proxy.stateMachinesSnapshot()
notaryIdentities = rpc.notaryIdentities()
notaryIdentities = rpc.proxy.notaryIdentities()
// Extract the flow tracking stream
// TODO is there a nicer way of doing this? Stream of streams in general results in code like this...

View File

@ -56,8 +56,8 @@ class CordaRPCClientReconnectionTest {
maxReconnectAttempts = 5
))
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use {
val rpcOps = it
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val networkParameters = rpcOps.networkParameters
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
cashStatesFeed.updates.subscribe { latch.countDown() }
@ -96,8 +96,8 @@ class CordaRPCClientReconnectionTest {
maxReconnectAttempts = 5
))
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use {
val rpcOps = it
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
val subscription = cashStatesFeed.updates.subscribe { latch.countDown() }
rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
@ -113,6 +113,7 @@ class CordaRPCClientReconnectionTest {
latch.await(4, TimeUnit.SECONDS)
}
}
}
}
@ -136,8 +137,8 @@ class CordaRPCClientReconnectionTest {
maxReconnectAttempts = 5
))
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use {
val rpcOps = it
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val networkParameters = rpcOps.networkParameters
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
cashStatesFeed.updates.subscribe { latch.countDown() }

View File

@ -27,6 +27,9 @@ import net.corda.serialization.internal.amqp.SerializationFactoryCacheKey
import net.corda.serialization.internal.amqp.SerializerFactory
import java.time.Duration
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
/**
* This class is essentially just a wrapper for an RPCConnection<CordaRPCOps> and can be treated identically.
@ -35,9 +38,10 @@ import java.util.*
*/
class CordaRPCConnection private constructor(
private val oneTimeConnection: RPCConnection<CordaRPCOps>?,
private val observersPool: ExecutorService?,
private val reconnectingCordaRPCOps: ReconnectingCordaRPCOps?
) : RPCConnection<CordaRPCOps> {
internal constructor(connection: RPCConnection<CordaRPCOps>?) : this(connection, null)
internal constructor(connection: RPCConnection<CordaRPCOps>?) : this(connection, null, null)
companion object {
@CordaInternal
@ -46,14 +50,20 @@ class CordaRPCConnection private constructor(
password: String,
addresses: List<NetworkHostAndPort>,
rpcConfiguration: CordaRPCClientConfiguration,
gracefulReconnect: GracefulReconnect
gracefulReconnect: GracefulReconnect,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
): CordaRPCConnection {
return CordaRPCConnection(null, ReconnectingCordaRPCOps(
val observersPool: ExecutorService = Executors.newCachedThreadPool()
return CordaRPCConnection(null, observersPool, ReconnectingCordaRPCOps(
addresses,
username,
password,
rpcConfiguration,
gracefulReconnect
gracefulReconnect,
sslConfiguration,
classLoader,
observersPool
))
}
}
@ -69,7 +79,18 @@ class CordaRPCConnection private constructor(
override fun forceClose() = actualConnection.forceClose()
override fun close() = actualConnection.close()
override fun close() {
try {
actualConnection.close()
} finally {
observersPool?.apply {
shutdown()
if (!awaitTermination(@Suppress("MagicNumber")30, TimeUnit.SECONDS)) {
shutdownNow()
}
}
}
}
}
/**
@ -322,19 +343,36 @@ class CordaRPCClient private constructor(
) {
@JvmOverloads
constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this(
hostAndPort = hostAndPort, haAddressPool = emptyList(), configuration = configuration
constructor(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
) : this(
hostAndPort = hostAndPort,
haAddressPool = emptyList(),
configuration = configuration
)
constructor(hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
classLoader: ClassLoader): this(hostAndPort, configuration, null, classLoader = classLoader)
constructor(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
classLoader: ClassLoader
): this(
hostAndPort,
configuration,
null,
classLoader = classLoader
)
constructor(
hostAndPort: NetworkHostAndPort,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
) : this(hostAndPort = hostAndPort, haAddressPool = emptyList(), sslConfiguration = sslConfiguration, classLoader = classLoader)
) : this(
hostAndPort = hostAndPort,
haAddressPool = emptyList(),
sslConfiguration = sslConfiguration,
classLoader = classLoader
)
@JvmOverloads
constructor(
@ -342,7 +380,13 @@ class CordaRPCClient private constructor(
configuration: CordaRPCClientConfiguration,
sslConfiguration: ClientRpcSslOptions?,
classLoader: ClassLoader? = null
) : this(hostAndPort = hostAndPort, haAddressPool = emptyList(), configuration = configuration, sslConfiguration = sslConfiguration, classLoader = classLoader)
) : this(
hostAndPort = hostAndPort,
haAddressPool = emptyList(),
configuration = configuration,
sslConfiguration = sslConfiguration,
classLoader = classLoader
)
@JvmOverloads
constructor(
@ -350,7 +394,13 @@ class CordaRPCClient private constructor(
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
) : this(hostAndPort = null, haAddressPool = haAddressPool, configuration = configuration, sslConfiguration = sslConfiguration, classLoader = classLoader)
) : this(
hostAndPort = null,
haAddressPool = haAddressPool,
configuration = configuration,
sslConfiguration = sslConfiguration,
classLoader = classLoader
)
// Here to keep the keep ABI compatibility happy
companion object {}
@ -362,11 +412,23 @@ class CordaRPCClient private constructor(
try {
val cache = Caffeine.newBuilder().maximumSize(128).build<SerializationFactoryCacheKey, SerializerFactory>().asMap()
// If the client has explicitly provided a classloader use this one to scan for custom serializers, otherwise use the current one.
// If the client has explicitly provided a classloader use this one to scan for custom serializers,
// otherwise use the current one.
val serializationClassLoader = this.classLoader ?: this.javaClass.classLoader
val customSerializers = createInstancesOfClassesImplementing(serializationClassLoader, SerializationCustomSerializer::class.java)
val serializationWhitelists = ServiceLoader.load(SerializationWhitelist::class.java, serializationClassLoader).toSet()
AMQPClientSerializationScheme.initialiseSerialization(serializationClassLoader, customSerializers, serializationWhitelists, cache)
val customSerializers = createInstancesOfClassesImplementing(
serializationClassLoader,
SerializationCustomSerializer::class.java
)
val serializationWhitelists = ServiceLoader.load(
SerializationWhitelist::class.java,
serializationClassLoader
).toSet()
AMQPClientSerializationScheme.initialiseSerialization(
serializationClassLoader,
customSerializers,
serializationWhitelists,
cache
)
} catch (e: IllegalStateException) {
// Race e.g. two of these constructed in parallel, ignore.
}
@ -401,7 +463,11 @@ class CordaRPCClient private constructor(
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
@JvmOverloads
fun start(username: String, password: String, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
fun start(
username: String,
password: String,
gracefulReconnect: GracefulReconnect? = null
): CordaRPCConnection {
return start(username, password, null, null, gracefulReconnect)
}
@ -418,7 +484,12 @@ class CordaRPCClient private constructor(
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
@JvmOverloads
fun start(username: String, password: String, targetLegalIdentity: CordaX500Name, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
fun start(
username: String,
password: String,
targetLegalIdentity: CordaX500Name,
gracefulReconnect: GracefulReconnect? = null
): CordaRPCConnection {
return start(username, password, null, null, targetLegalIdentity, gracefulReconnect)
}
@ -436,7 +507,13 @@ class CordaRPCClient private constructor(
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
@JvmOverloads
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
fun start(
username: String,
password: String,
externalTrace: Trace?,
impersonatedActor: Actor?,
gracefulReconnect: GracefulReconnect? = null
): CordaRPCConnection {
return start(username, password, externalTrace, impersonatedActor, null, gracefulReconnect)
}
@ -457,7 +534,14 @@ class CordaRPCClient private constructor(
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
@JvmOverloads
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
fun start(
username: String,
password: String,
externalTrace: Trace?,
impersonatedActor: Actor?,
targetLegalIdentity: CordaX500Name?,
gracefulReconnect: GracefulReconnect? = null
): CordaRPCConnection {
val addresses = if (haAddressPool.isEmpty()) {
listOf(hostAndPort!!)
} else {
@ -465,9 +549,23 @@ class CordaRPCClient private constructor(
}
return if (gracefulReconnect != null) {
CordaRPCConnection.createWithGracefulReconnection(username, password, addresses, configuration, gracefulReconnect)
CordaRPCConnection.createWithGracefulReconnection(
username,
password,
addresses,
configuration,
gracefulReconnect,
sslConfiguration
)
} else {
CordaRPCConnection(getRpcClient().start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity))
CordaRPCConnection(getRpcClient().start(
InternalCordaRPCOps::class.java,
username,
password,
externalTrace,
impersonatedActor,
targetLegalIdentity
))
}
}

View File

@ -32,37 +32,24 @@ import java.util.concurrent.TimeUnit
/**
* Wrapper over [CordaRPCOps] that handles exceptions when the node or the connection to the node fail.
*
* All operations are retried on failure, except flow start operations that die before receiving a valid [FlowHandle], in which case a [CouldNotStartFlowException] is thrown.
* All operations are retried on failure, except flow start operations that die before receiving a valid [FlowHandle], in which case a
* [CouldNotStartFlowException] is thrown.
*
* When calling methods that return a [DataFeed] like [CordaRPCOps.vaultTrackBy], the returned [DataFeed.updates] object will no longer
* be a usable [rx.Observable] but an instance of [ReconnectingObservable].
* The caller has to explicitly cast to [ReconnectingObservable] and call [ReconnectingObservable.subscribe]. If used as an [rx.Observable] it will just fail.
* The caller has to explicitly cast to [ReconnectingObservable] and call [ReconnectingObservable.subscribe]. If used as an [rx.Observable]
* it will just fail.
* The returned [DataFeed.snapshot] is the snapshot as it was when the feed was first retrieved.
*
* Note: There is no guarantee that observations will not be lost.
*
* *This class is not a stable API. Any project that wants to use it, must copy and paste it.*
*/
// TODO The executor service is not needed. All we need is a single thread that deals with reconnecting and onto which ReconnectingObservables
// and other things can attach themselves as listeners for reconnect events.
// TODO The executor service is not needed. All we need is a single thread that deals with reconnecting and onto which
// ReconnectingObservables and other things can attach themselves as listeners for reconnect events.
class ReconnectingCordaRPCOps private constructor(
val reconnectingRPCConnection: ReconnectingRPCConnection,
private val observersPool: ExecutorService,
private val userPool: Boolean
) : AutoCloseable, InternalCordaRPCOps by proxy(reconnectingRPCConnection, observersPool) {
// Constructors that mirror CordaRPCClient.
constructor(
nodeHostAndPort: NetworkHostAndPort,
username: String,
password: String,
rpcConfiguration: CordaRPCClientConfiguration,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null,
observersPool: ExecutorService? = null
) : this(
ReconnectingRPCConnection(listOf(nodeHostAndPort), username, password, rpcConfiguration, sslConfiguration, classLoader),
observersPool ?: Executors.newCachedThreadPool(),
observersPool != null)
val reconnectingRPCConnection: ReconnectingRPCConnection
) : InternalCordaRPCOps by proxy(reconnectingRPCConnection) {
constructor(
nodeHostAndPorts: List<NetworkHostAndPort>,
username: String,
@ -71,18 +58,23 @@ class ReconnectingCordaRPCOps private constructor(
gracefulReconnect: GracefulReconnect? = null,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null,
observersPool: ExecutorService? = null
) : this(
ReconnectingRPCConnection(nodeHostAndPorts, username, password, rpcConfiguration, sslConfiguration, classLoader, gracefulReconnect),
observersPool ?: Executors.newCachedThreadPool(),
observersPool != null)
observersPool: ExecutorService
) : this(ReconnectingRPCConnection(
nodeHostAndPorts,
username,
password,
rpcConfiguration,
sslConfiguration,
classLoader,
gracefulReconnect,
observersPool))
private companion object {
private val log = contextLogger()
private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection, observersPool: ExecutorService): InternalCordaRPCOps {
private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection): InternalCordaRPCOps {
return Proxy.newProxyInstance(
this::class.java.classLoader,
arrayOf(InternalCordaRPCOps::class.java),
ErrorInterceptingHandler(reconnectingRPCConnection, observersPool)) as InternalCordaRPCOps
ErrorInterceptingHandler(reconnectingRPCConnection)) as InternalCordaRPCOps
}
}
private val retryFlowsPool = Executors.newScheduledThreadPool(1)
@ -125,7 +117,8 @@ class ReconnectingCordaRPCOps private constructor(
val rpcConfiguration: CordaRPCClientConfiguration,
val sslConfiguration: ClientRpcSslOptions? = null,
val classLoader: ClassLoader?,
val gracefulReconnect: GracefulReconnect? = null
val gracefulReconnect: GracefulReconnect? = null,
val observersPool: ExecutorService
) : RPCConnection<CordaRPCOps> {
private var currentRPCConnection: CordaRPCConnection? = null
enum class CurrentState {
@ -178,12 +171,18 @@ class ReconnectingCordaRPCOps private constructor(
return currentRPCConnection!!
}
private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, roundRobinIndex: Int = 0): CordaRPCConnection {
private tailrec fun establishConnectionWithRetry(
retryInterval: Duration = 1.seconds,
roundRobinIndex: Int = 0
): CordaRPCConnection {
val attemptedAddress = nodeHostAndPorts[roundRobinIndex]
log.info("Connecting to: $attemptedAddress")
try {
return CordaRPCClient(
attemptedAddress, rpcConfiguration.copy(connectionMaxRetryInterval = retryInterval, maxReconnectAttempts = 1), sslConfiguration, classLoader
attemptedAddress,
rpcConfiguration.copy(connectionMaxRetryInterval = retryInterval, maxReconnectAttempts = 1),
sslConfiguration,
classLoader
).start(username, password).also {
// Check connection is truly operational before returning it.
require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) {
@ -240,7 +239,7 @@ class ReconnectingCordaRPCOps private constructor(
currentRPCConnection?.close()
}
}
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection, val observersPool: ExecutorService) : InvocationHandler {
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
private fun checkIfIsStartFlow(method: Method, e: InvocationTargetException) {
@ -290,7 +289,7 @@ class ReconnectingCordaRPCOps private constructor(
DataFeed::class.java -> {
// Intercept the data feed methods and return a ReconnectingObservable instance
val initialFeed: DataFeed<Any, Any?> = uncheckedCast(doInvoke(method, args))
val observable = ReconnectingObservable(reconnectingRPCConnection, observersPool, initialFeed) {
val observable = ReconnectingObservable(reconnectingRPCConnection, initialFeed) {
// This handles reconnecting and creates new feeds.
uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args))
}
@ -302,8 +301,7 @@ class ReconnectingCordaRPCOps private constructor(
}
}
override fun close() {
if (!userPool) observersPool.shutdown()
fun close() {
retryFlowsPool.shutdown()
reconnectingRPCConnection.forceClose()
}

View File

@ -4,21 +4,18 @@ import net.corda.core.messaging.DataFeed
import rx.Observable
import rx.Subscriber
import rx.Subscription
import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicReference
class ReconnectingObservable<T> private constructor(subscriber: ReconnectingSubscriber<T>) : Observable<T>(subscriber) {
constructor(
reconnectingRPCConnection: ReconnectingCordaRPCOps.ReconnectingRPCConnection,
executor: ExecutorService,
initialDataFeed: DataFeed<*, T>,
createDataFeed: () -> DataFeed<*, T>
) : this(ReconnectingSubscriber(reconnectingRPCConnection, executor, initialDataFeed, createDataFeed))
) : this(ReconnectingSubscriber(reconnectingRPCConnection, initialDataFeed, createDataFeed))
private class ReconnectingSubscriber<T>(
private val reconnectingRPCConnection: ReconnectingCordaRPCOps.ReconnectingRPCConnection,
private val executor: ExecutorService,
private val initialDataFeed: DataFeed<*, T>,
private val createDataFeed: () -> DataFeed<*, T>
) : OnSubscribe<T>, Subscription {
@ -59,7 +56,7 @@ class ReconnectingObservable<T> private constructor(subscriber: ReconnectingSubs
private fun scheduleResubscribe(error: Throwable) {
if (unsubscribed) return
executor.execute {
reconnectingRPCConnection.observersPool.execute {
if (unsubscribed) return@execute
reconnectingRPCConnection.reconnectOnError(error)
val newDataFeed = createDataFeed()

View File

@ -366,7 +366,14 @@ class Verifier(val ltx: LedgerTransaction, private val transactionClassLoader: C
val contractInstances: List<Contract> = contractClasses.map { (contractClassName, contractClass) ->
try {
contractClass.getDeclaredConstructor().newInstance()
/**
* This function must execute with the DJVM's sandbox, which does not
* permit user code to access [java.lang.reflect.Constructor] objects.
*
* [Class.newInstance] is deprecated as of Java 9.
*/
@Suppress("deprecation")
contractClass.newInstance()
} catch (e: Exception) {
throw TransactionVerificationException.ContractCreationError(ltx.id, contractClassName, e)
}

View File

@ -8,6 +8,7 @@ import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.node.services.Vault
import net.corda.core.schemas.StatePersistable
import net.corda.core.serialization.CordaSerializable
@ -82,6 +83,7 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
open val participants: List<AbstractParty>? = null
abstract val contractStateTypes: Set<Class<out ContractState>>?
open val externalIds: List<UUID> = emptyList()
open val exactParticipants: List<AbstractParty>? = null
override fun visit(parser: IQueryCriteriaParser): Collection<Predicate> {
return parser.parseCriteria(this)
}
@ -101,8 +103,23 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
override val constraintTypes: Set<Vault.ConstraintInfo.Type> = emptySet(),
override val constraints: Set<Vault.ConstraintInfo> = emptySet(),
override val participants: List<AbstractParty>? = null,
override val externalIds: List<UUID> = emptyList()
override val externalIds: List<UUID> = emptyList(),
override val exactParticipants: List<AbstractParty>? = null
) : CommonQueryCriteria() {
@DeprecatedConstructorForDeserialization(version = 7)
constructor(
status: Vault.StateStatus = Vault.StateStatus.UNCONSUMED,
contractStateTypes: Set<Class<out ContractState>>? = null,
stateRefs: List<StateRef>? = null,
notary: List<AbstractParty>? = null,
softLockingCondition: SoftLockingCondition? = null,
timeCondition: TimeCondition? = null,
relevancyStatus: Vault.RelevancyStatus = Vault.RelevancyStatus.ALL,
constraintTypes: Set<Vault.ConstraintInfo.Type> = emptySet(),
constraints: Set<Vault.ConstraintInfo> = emptySet(),
participants: List<AbstractParty>? = null,
externalIds: List<UUID> = emptyList()
) : this(status, contractStateTypes, stateRefs, notary, softLockingCondition, timeCondition, relevancyStatus, constraintTypes, constraints, participants, externalIds, null)
// V4 constructors.
@DeprecatedConstructorForDeserialization(version = 7)
constructor(
@ -162,6 +179,36 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
fun withConstraints(constraints: Set<Vault.ConstraintInfo>): VaultQueryCriteria = copy(constraints = constraints)
fun withParticipants(participants: List<AbstractParty>): VaultQueryCriteria = copy(participants = participants)
fun withExternalIds(externalIds: List<UUID>): VaultQueryCriteria = copy(externalIds = externalIds)
fun withExactParticipants(exactParticipants: List<AbstractParty>): VaultQueryCriteria = copy(exactParticipants = exactParticipants)
fun copy(
status: Vault.StateStatus = Vault.StateStatus.UNCONSUMED,
contractStateTypes: Set<Class<out ContractState>>? = null,
stateRefs: List<StateRef>? = null,
notary: List<AbstractParty>? = null,
softLockingCondition: SoftLockingCondition? = null,
timeCondition: TimeCondition? = null,
relevancyStatus: Vault.RelevancyStatus = Vault.RelevancyStatus.ALL,
constraintTypes: Set<Vault.ConstraintInfo.Type> = emptySet(),
constraints: Set<Vault.ConstraintInfo> = emptySet(),
participants: List<AbstractParty>? = null,
externalIds: List<UUID> = emptyList()
): VaultQueryCriteria {
return VaultQueryCriteria(
status,
contractStateTypes,
stateRefs,
notary,
softLockingCondition,
timeCondition,
relevancyStatus,
constraintTypes,
constraints,
participants,
externalIds,
exactParticipants
)
}
fun copy(
status: Vault.StateStatus = Vault.StateStatus.UNCONSUMED,
@ -218,13 +265,24 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
* LinearStateQueryCriteria: provides query by attributes defined in [VaultSchema.VaultLinearState]
*/
data class LinearStateQueryCriteria(
override val participants: List<AbstractParty>?,
override val participants: List<AbstractParty>? = null,
val uuid: List<UUID>? = null,
val externalId: List<String>? = null,
override val status: Vault.StateStatus = Vault.StateStatus.UNCONSUMED,
override val contractStateTypes: Set<Class<out ContractState>>? = null,
override val relevancyStatus: Vault.RelevancyStatus = Vault.RelevancyStatus.ALL
override val relevancyStatus: Vault.RelevancyStatus = Vault.RelevancyStatus.ALL,
override val exactParticipants: List<AbstractParty>?
) : CommonQueryCriteria() {
// V4 c'tor
@DeprecatedConstructorForDeserialization(version = 4)
constructor(
participants: List<AbstractParty>? = null,
uuid: List<UUID>? = null,
externalId: List<String>? = null,
status: Vault.StateStatus = Vault.StateStatus.UNCONSUMED,
contractStateTypes: Set<Class<out ContractState>>? = null,
relevancyStatus: Vault.RelevancyStatus = Vault.RelevancyStatus.ALL
) : this(participants, uuid, externalId, status, contractStateTypes, relevancyStatus, null)
// V3 c'tor
@JvmOverloads
@DeprecatedConstructorForDeserialization(version = 2)
@ -265,6 +323,8 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
fun withStatus(status: Vault.StateStatus): LinearStateQueryCriteria = copy(status = status)
fun withContractStateTypes(contractStateTypes: Set<Class<out ContractState>>): LinearStateQueryCriteria = copy(contractStateTypes = contractStateTypes)
fun withRelevancyStatus(relevancyStatus: Vault.RelevancyStatus): LinearStateQueryCriteria = copy(relevancyStatus = relevancyStatus)
fun withExactParticipants(exactParticipants: List<AbstractParty>): LinearStateQueryCriteria =
copy(exactParticipants = exactParticipants)
fun copy(
participants: List<AbstractParty>? = this.participants,
@ -272,6 +332,23 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
externalId: List<String>? = this.externalId,
status: Vault.StateStatus = this.status,
contractStateTypes: Set<Class<out ContractState>>? = this.contractStateTypes
): LinearStateQueryCriteria {
return LinearStateQueryCriteria(
participants,
uuid,
externalId,
status,
contractStateTypes
)
}
fun copy(
participants: List<AbstractParty>? = this.participants,
uuid: List<UUID>? = this.uuid,
externalId: List<String>? = this.externalId,
status: Vault.StateStatus = this.status,
contractStateTypes: Set<Class<out ContractState>>? = this.contractStateTypes,
relevancyStatus: Vault.RelevancyStatus = this.relevancyStatus
): LinearStateQueryCriteria {
return LinearStateQueryCriteria(
participants,
@ -317,8 +394,21 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
val issuerRef: List<OpaqueBytes>? = null,
override val status: Vault.StateStatus = Vault.StateStatus.UNCONSUMED,
override val contractStateTypes: Set<Class<out ContractState>>? = null,
override val relevancyStatus: Vault.RelevancyStatus
override val relevancyStatus: Vault.RelevancyStatus,
override val exactParticipants: List<AbstractParty>? = null
) : CommonQueryCriteria() {
// V4 c'tor
@DeprecatedConstructorForDeserialization(version = 1)
constructor(
participants: List<AbstractParty>? = null,
owner: List<AbstractParty>? = null,
quantity: ColumnPredicate<Long>? = null,
issuer: List<AbstractParty>? = null,
issuerRef: List<OpaqueBytes>? = null,
status: Vault.StateStatus = Vault.StateStatus.UNCONSUMED,
contractStateTypes: Set<Class<out ContractState>>? = null,
relevancyStatus: Vault.RelevancyStatus
) : this(participants, owner, quantity, issuer, issuerRef, status, contractStateTypes, relevancyStatus, null)
@JvmOverloads
@DeprecatedConstructorForDeserialization(version = 1)
constructor(
@ -344,6 +434,30 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
fun withStatus(status: Vault.StateStatus): FungibleAssetQueryCriteria = copy(status = status)
fun withContractStateTypes(contractStateTypes: Set<Class<out ContractState>>): FungibleAssetQueryCriteria = copy(contractStateTypes = contractStateTypes)
fun withRelevancyStatus(relevancyStatus: Vault.RelevancyStatus): FungibleAssetQueryCriteria = copy(relevancyStatus = relevancyStatus)
fun withExactParticipants(exactParticipants: List<AbstractParty>): FungibleAssetQueryCriteria
= copy(exactParticipants = exactParticipants)
fun copy(
participants: List<AbstractParty>? = this.participants,
owner: List<AbstractParty>? = this.owner,
quantity: ColumnPredicate<Long>? = this.quantity,
issuer: List<AbstractParty>? = this.issuer,
issuerRef: List<OpaqueBytes>? = this.issuerRef,
status: Vault.StateStatus = this.status,
contractStateTypes: Set<Class<out ContractState>>? = this.contractStateTypes,
relevancyStatus: Vault.RelevancyStatus = this.relevancyStatus
): FungibleAssetQueryCriteria {
return FungibleAssetQueryCriteria(
participants,
owner,
quantity,
issuer,
issuerRef,
status,
contractStateTypes,
relevancyStatus
)
}
fun copy(
participants: List<AbstractParty>? = this.participants,

File diff suppressed because one or more lines are too long

View File

@ -86,7 +86,8 @@ There are four implementations of this interface which can be chained together t
1. ``VaultQueryCriteria`` provides filterable criteria on attributes within the Vault states table: status (UNCONSUMED,
CONSUMED), state reference(s), contract state type(s), notaries, soft locked states, timestamps (RECORDED, CONSUMED),
state constraints (see :ref:`Constraint Types <implicit_constraint_types>`), relevancy (ALL, RELEVANT, NON_RELEVANT).
state constraints (see :ref:`Constraint Types <implicit_constraint_types>`), relevancy (ALL, RELEVANT, NON_RELEVANT),
participants (exact or any match).
.. note:: Sensible defaults are defined for frequently used attributes (status = UNCONSUMED, always include soft
locked states).
@ -94,7 +95,7 @@ There are four implementations of this interface which can be chained together t
2. ``FungibleAssetQueryCriteria`` provides filterable criteria on attributes defined in the Corda Core
``FungibleAsset`` contract state interface, used to represent assets that are fungible, countable and issued by a
specific party (eg. ``Cash.State`` and ``CommodityContract.State`` in the Corda finance module). Filterable
attributes include: participants(s), owner(s), quantity, issuer party(s) and issuer reference(s).
attributes include: participants (exact or any match), owner(s), quantity, issuer party(s) and issuer reference(s).
.. note:: All contract states that extend the ``FungibleAsset`` now automatically persist that interfaces common
state attributes to the **vault_fungible_states** table.
@ -102,7 +103,7 @@ There are four implementations of this interface which can be chained together t
3. ``LinearStateQueryCriteria`` provides filterable criteria on attributes defined in the Corda Core ``LinearState``
and ``DealState`` contract state interfaces, used to represent entities that continuously supersede themselves, all
of which share the same ``linearId`` (e.g. trade entity states such as the ``IRSState`` defined in the SIMM
valuation demo). Filterable attributes include: participant(s), linearId(s), uuid(s), and externalId(s).
valuation demo). Filterable attributes include: participants (exact or any match), linearId(s), uuid(s), and externalId(s).
.. note:: All contract states that extend ``LinearState`` or ``DealState`` now automatically persist those
interfaces common state attributes to the **vault_linear_states** table.
@ -292,7 +293,7 @@ Query for unconsumed states for a given notary:
:end-before: DOCEND VaultQueryExample4
:dedent: 12
Query for unconsumed states for a given set of participants:
Query for unconsumed states for a given set of participants (matches any state that contains at least one of the specified participants):
.. literalinclude:: ../../node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt
:language: kotlin
@ -300,6 +301,14 @@ Query for unconsumed states for a given set of participants:
:end-before: DOCEND VaultQueryExample5
:dedent: 12
Query for unconsumed states for a given set of participants (exactly matches only states that contain all specified participants):
.. literalinclude:: ../../node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt
:language: kotlin
:start-after: DOCSTART VaultQueryExample51
:end-before: DOCEND VaultQueryExample51
:dedent: 12
Query for unconsumed states recorded between two time intervals:
.. literalinclude:: ../../node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt
@ -364,7 +373,7 @@ Query for unconsumed deal states with deals references:
:end-before: DOCEND VaultQueryExample10
:dedent: 12
Query for unconsumed deal states with deals parties:
Query for unconsumed deal states with deals parties (any match):
.. literalinclude:: ../../node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt
:language: kotlin
@ -372,6 +381,14 @@ Query for unconsumed deal states with deals parties:
:end-before: DOCEND VaultQueryExample11
:dedent: 12
Query for unconsumed deal states with deals parties (exact match):
.. literalinclude:: ../../node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt
:language: kotlin
:start-after: DOCSTART VaultQueryExample52
:end-before: DOCEND VaultQueryExample52
:dedent: 12
Query for only relevant linear states in the vault:
.. literalinclude:: ../../node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt

View File

@ -8,6 +8,8 @@ Unreleased
----------
* Moved and renamed the testing web server to the ``testing`` subproject. Also renamed the published artifact to ``corda-testserver.jar``.
* New Vault Query criteria to specify exact matches for specified participants.
* Support for Java 11 (compatibility mode). Please read https://github.com/corda/corda/pull/5356.
* Updating FinalityFlow with functionality to indicate the appropriate StatesToRecord. This allows the initiating party to record states

View File

@ -5,8 +5,9 @@ This document explains the coding style used in the Corda repository. You will b
recommendations when submitting patches for review. Please take the time to read them and internalise them, to save
time during code review.
What follows are *recommendations* and not *rules*. They are in places intentionally vague, so use your good judgement
when interpreting them.
What follows are mostly *recommendations* and not *rules*. They are in places intentionally vague, so use your good judgement
when interpreting them. The rules that are currently being enforced via the Detekt PR gateway can be found `here
<https://github.com/corda/corda/blob/release/os/4.3/detekt-config.yml>`_.
1. General style
################
@ -35,10 +36,9 @@ that doesn't mean it's always better. In particular:
1.1 Line Length and Spacing
---------------------------
We aim for line widths of no more than 120 characters. That is wide enough to avoid lots of pointless wrapping but
We aim for line widths of no more than 140 characters. That is wide enough to avoid lots of pointless wrapping but
narrow enough that with a widescreen monitor and a 12 point fixed width font (like Menlo) you can fit two files
next to each other. This is not a rigidly enforced rule and if wrapping a line would be excessively awkward, let it
overflow. Overflow of a few characters here and there isn't a big deal: the goal is general convenience.
next to each other. This is a rule that we enforce.
Where the number of parameters in a function, class, etc. causes an overflow past the end of the first line, they should
be structured one parameter per line.

View File

@ -133,7 +133,8 @@ class RpcReconnectTests {
}
val reconnect = GracefulReconnect(onDisconnect = { numDisconnects++ }, onReconnect = onReconnect)
val client = CordaRPCClient(addressesForRpc)
val bankAReconnectingRpc = client.start(demoUser.username, demoUser.password, gracefulReconnect = reconnect).proxy as ReconnectingCordaRPCOps
val bankAReconnectingRPCConnection = client.start(demoUser.username, demoUser.password, gracefulReconnect = reconnect)
val bankAReconnectingRpc = bankAReconnectingRPCConnection.proxy as ReconnectingCordaRPCOps
// DOCEND rpcReconnectingRPC
// Observe the vault and collect the observations.
@ -330,7 +331,7 @@ class RpcReconnectTests {
// Stop the observers.
vaultSubscription.unsubscribe()
stateMachineSubscription.unsubscribe()
bankAReconnectingRpc.close()
bankAReconnectingRPCConnection.close()
}
proxy.close()

View File

@ -471,8 +471,12 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
predicateSet.add(criteriaBuilder.and(vaultFungibleStatesRoot.get<ByteArray>("issuerRef").`in`(issuerRefs)))
}
if (criteria.participants != null && criteria.exactParticipants != null)
throw VaultQueryException("Cannot specify both participants (${criteria.participants}) and exactParticipants " +
"(${criteria.exactParticipants}).")
// Participants.
criteria.participants?.let {
if (criteria.participants != null || criteria.exactParticipants != null) {
// Join VaultFungibleState and PersistentParty tables (participant values are added to the common query criteria predicate)
val statePartyToFungibleStatesJoin = criteriaBuilder.and(
criteriaBuilder.equal(vaultFungibleStatesRoot.get<VaultSchemaV1.VaultFungibleStates>("stateRef"),
@ -508,8 +512,12 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
predicateSet.add(criteriaBuilder.and(vaultLinearStatesRoot.get<String>("externalId").`in`(externalIds)))
}
if (criteria.participants != null && criteria.exactParticipants != null)
throw VaultQueryException("Cannot specify both participants (${criteria.participants}) " +
"and exactParticipants (${criteria.exactParticipants}).")
// Participants.
criteria.participants?.let {
if (criteria.participants != null || criteria.exactParticipants != null) {
// Join VaultLinearState and PersistentParty tables (participant values are added to the common query criteria predicate)
val statePartyToLinearStatesJoin = criteriaBuilder.and(
criteriaBuilder.equal(vaultLinearStatesRoot.get<VaultSchemaV1.VaultLinearStates>("stateRef"),
@ -702,6 +710,38 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
}
}
// Exact participants
// Requires a tricky SQL query to ensure *only* exact matches are selected (eg. a transaction cannot have more nor less than the
// exact participants specified in the query criteria).
criteria.exactParticipants?.let {
val exactParticipants = criteria.exactParticipants!!
// obtain all transactions where other participants are not present
val subQueryNotExists = criteriaQuery.subquery(Tuple::class.java)
val subRoot = subQueryNotExists.from(VaultSchemaV1.PersistentParty::class.java)
subQueryNotExists.select(subRoot.get("x500Name"))
subQueryNotExists.where(criteriaBuilder.and(
criteriaBuilder.equal(vaultStates.get<VaultSchemaV1.VaultStates>("stateRef"),
subRoot.get<VaultSchemaV1.PersistentParty>("compositeKey").get<PersistentStateRef>("stateRef"))),
criteriaBuilder.not(subRoot.get<VaultSchemaV1.PersistentParty>("x500Name").`in`(exactParticipants)))
val subQueryNotExistsPredicate = criteriaBuilder.and(criteriaBuilder.not(criteriaBuilder.exists(subQueryNotExists)))
constraintPredicates.add(subQueryNotExistsPredicate)
// join with transactions for each matching participant (only required where more than one)
if (exactParticipants.size > 1)
exactParticipants.forEach { participant ->
val subQueryExists = criteriaQuery.subquery(Tuple::class.java)
val subRootExists = subQueryExists.from(VaultSchemaV1.PersistentParty::class.java)
subQueryExists.select(subRootExists.get("x500Name"))
subQueryExists.where(criteriaBuilder.and(
criteriaBuilder.equal(vaultStates.get<VaultSchemaV1.VaultStates>("stateRef"),
subRootExists.get<VaultSchemaV1.PersistentParty>("compositeKey").get<PersistentStateRef>("stateRef"))),
criteriaBuilder.equal(subRootExists.get<VaultSchemaV1.PersistentParty>("x500Name"), participant))
val subQueryExistsPredicate = criteriaBuilder.and(criteriaBuilder.exists(subQueryExists))
constraintPredicates.add(subQueryExistsPredicate)
}
}
return emptySet()
}

View File

@ -12,23 +12,19 @@
<include file="migration/node-core.changelog-v8.xml"/>
<include file="migration/node-core.changelog-tx-mapping.xml"/>
<include file="migration/node-core.changelog-v9.xml"/>
<!-- This migration was originally written within the same script as node-core.changelog-v14-data. However, node-core.changelog-v10
runs a service with mapped schema which now has the new table so the table should be created before hand. The transfer of data
can then happen later in node-core.changelog-v14-data. -->
<include file="migration/node-core.changelog-v14-table.xml"/>
<include file="migration/node-core.changelog-v10.xml"/>
<include file="migration/node-core.changelog-v11.xml"/>
<!-- This migration was originally written within the same script as node-core.changelog-v14-data. However, node-core.changelog-v12
runs a service with mapped schema which now has the new table so the table should be created before hand. The transfer of data
can then happen later in node-core.changelog-v14-data. -->
<include file="migration/node-core.changelog-v14-table.xml"/>
<include file="migration/node-core.changelog-v12.xml"/>
<!-- This changeset (which creates extra columns in the transactions tables), must be run before the vault state migration (in
vault-schema.changelog-v9.xml), as that will use the current hibernate mappings, and those require all DB columns to be
created. -->
<!-- This migration (which creates extra columns in the transactions tables), must be run before the vault state migration (in
vault-schema.changelog-v9.xml), as that will use the current hibernate mappings, and those require all DB columns to be created. -->
<include file="migration/node-core.changelog-v13.xml"/>
<!-- This change should be done before the v14-data migration. -->
<include file="migration/node-core.changelog-v15-table.xml"/>
<include file="migration/node-core.changelog-v15.xml"/>
<include file="migration/node-core.changelog-v16.xml"/>
<!-- This must run after node-core.changelog-init.xml, to prevent database columns being created twice. -->
<include file="migration/vault-schema.changelog-v9.xml"/>

View File

@ -20,6 +20,18 @@
</changeSet>
<changeSet author="R3.Corda" id="modify identity_value column type" dbms="postgresql">
<addColumn tableName="node_identities">
<column name="temp_identity_value" type="varbinary(64000)">
<constraints nullable="true"/>
</column>
</addColumn>
<sql>UPDATE node_identities SET temp_identity_value = lo_get(identity_value)</sql>
<addNotNullConstraint tableName="node_identities" columnName="temp_identity_value"/>
<dropColumn tableName="node_identities" columnName="identity_value"/>
<renameColumn tableName="node_identities" oldColumnName="temp_identity_value" newColumnName="identity_value"/>
</changeSet>
<changeSet author="R3.Corda" id="migrate_identity_service_to_use_publicKey.toShortString()">
<customChange class="net.corda.node.migration.PersistentIdentityMigration">
</customChange>

View File

@ -1,19 +0,0 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="add-new-pk-hash-to-pk-table">
<createTable tableName="node_hash_to_key">
<column name="pk_hash" type="NVARCHAR(130)">
<constraints nullable="false"/>
</column>
<column name="public_key" type="blob">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="pk_hash" constraintName="node_hash_to_key_pk_hash" tableName="node_hash_to_key"/>
</changeSet>
</databaseChangeLog>

View File

@ -1,25 +1,31 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="modify identity_value column type" dbms="postgresql">
<preConditions onFail="MARK_RAN">
<not>
<sqlCheck expectedResult="bytea">
SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'node_identities' AND COLUMN_NAME = 'identity_value'
</sqlCheck>
</not>
</preConditions>
<dropColumn tableName="node_identities" columnName="identity_value"/>
<addColumn tableName="node_identities">
<column name="identity_value" type="varbinary(64000)">
<changeSet author="R3.Corda" id="add-new-pk-hash-to-pk-table" dbms="h2,mssql">
<createTable tableName="node_hash_to_key">
<column name="pk_hash" type="NVARCHAR(130)">
<constraints nullable="false"/>
</column>
</addColumn>
<column name="public_key" type="blob">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="pk_hash" constraintName="node_hash_to_key_pk_hash" tableName="node_hash_to_key"/>
</changeSet>
<changeSet author="R3.Corda" id="add-new-pk-hash-to-pk-table-postgresql" dbms="postgresql">
<createTable tableName="node_hash_to_key">
<column name="pk_hash" type="NVARCHAR(130)">
<constraints nullable="false"/>
</column>
<column name="public_key" type="varbinary(64000)">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="pk_hash" constraintName="node_hash_to_key_pk_hash" tableName="node_hash_to_key"/>
</changeSet>
</databaseChangeLog>

View File

@ -1,21 +0,0 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<changeSet author="R3.Corda" id="modify public_key column type" dbms="postgresql">
<preConditions onFail="MARK_RAN">
<not>
<sqlCheck expectedResult="bytea">
SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'node_hash_to_key' AND COLUMN_NAME = 'public_key'
</sqlCheck>
</not>
</preConditions>
<dropColumn tableName="node_hash_to_key" columnName="public_key"/>
<addColumn tableName="node_hash_to_key">
<column name="public_key" type="varbinary(64000)">
<constraints nullable="false"/>
</column>
</addColumn>
</changeSet>
</databaseChangeLog>

View File

@ -241,6 +241,11 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
val criteria = VaultQueryCriteria(participants = listOf(BIG_CORP))
val results = vaultService.queryBy<ContractState>(criteria)
assertThat(results.states).hasSize(1)
// same query using strict participant matching
val strictCriteria = VaultQueryCriteria().withExactParticipants(listOf(BIG_CORP))
val strictResults = vaultService.queryBy<ContractState>(strictCriteria)
assertThat(strictResults.states).hasSize(0) // all states include node identity (MEGA_CORP)
}
}
@ -254,6 +259,11 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
val criteria = VaultQueryCriteria(participants = listOf(MINI_CORP, BIG_CORP))
val results = vaultService.queryBy<ContractState>(criteria)
assertThat(results.states).hasSize(2)
// same query using strict participant matching
val strictCriteria = VaultQueryCriteria().withExactParticipants(listOf(MEGA_CORP, BIG_CORP))
val strictResults = vaultService.queryBy<ContractState>(strictCriteria)
assertThat(strictResults.states).hasSize(1)
}
}
@ -795,10 +805,48 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
identitySvc.verifyAndRegisterIdentity(BIG_CORP_IDENTITY)
vaultFiller.fillWithSomeTestLinearStates(2, "TEST", participants = listOf(MEGA_CORP, MINI_CORP))
vaultFiller.fillWithSomeTestDeals(listOf("456"), participants = listOf(MEGA_CORP, BIG_CORP))
vaultFiller.fillWithSomeTestDeals(listOf("123", "789"), participants = listOf(BIG_CORP))
val criteria = LinearStateQueryCriteria(participants = listOf(BIG_CORP))
vaultFiller.fillWithSomeTestDeals(listOf("123", "789"), participants = listOf(MEGA_CORP))
val criteria = LinearStateQueryCriteria(participants = listOf(MEGA_CORP))
val results = vaultService.queryBy<ContractState>(criteria)
assertThat(results.states).hasSize(3)
assertThat(results.states).hasSize(5)
// same query using strict participant matching
val strictCriteria = LinearStateQueryCriteria().withExactParticipants(listOf(MEGA_CORP))
val strictResults = vaultService.queryBy<ContractState>(strictCriteria)
assertThat(strictResults.states).hasSize(2)
}
}
@Test
fun `unconsumed dummy states for exact single participant`() {
database.transaction {
identitySvc.verifyAndRegisterIdentity(BIG_CORP_IDENTITY)
vaultFiller.fillWithDummyState(participants = listOf(MEGA_CORP, MINI_CORP))
vaultFiller.fillWithDummyState(participants = listOf(MEGA_CORP, BIG_CORP))
vaultFiller.fillWithDummyState(participants = listOf(MEGA_CORP)) // exact match
val strictCriteria = VaultQueryCriteria(exactParticipants = listOf(MEGA_CORP))
val strictResults = vaultService.queryBy<ContractState>(strictCriteria)
assertThat(strictResults.states).hasSize(1)
}
}
@Test
fun `unconsumed dummy states for exact two participants`() {
database.transaction {
identitySvc.verifyAndRegisterIdentity(BIG_CORP_IDENTITY)
vaultFiller.fillWithDummyState(participants = listOf(MEGA_CORP, MINI_CORP))
vaultFiller.fillWithDummyState(participants = listOf(MEGA_CORP, BIG_CORP)) // exact match
vaultFiller.fillWithDummyState(participants = listOf(MEGA_CORP))
val strictCriteria = VaultQueryCriteria(exactParticipants = listOf(MEGA_CORP, BIG_CORP))
val strictResults = vaultService.queryBy<ContractState>(strictCriteria)
assertThat(strictResults.states).hasSize(1)
// same query using strict participant matching (unordered list of participants)
val strictCriteriaUnordered = VaultQueryCriteria(exactParticipants = listOf(BIG_CORP, MEGA_CORP))
val strictResultsUnordered = vaultService.queryBy<ContractState>(strictCriteriaUnordered)
assertThat(strictResultsUnordered.states).hasSize(1)
}
}
@ -813,8 +861,19 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
val criteria = LinearStateQueryCriteria(participants = listOf(BIG_CORP, MINI_CORP))
val results = vaultService.queryBy<ContractState>(criteria)
// DOCEND VaultQueryExample5
assertThat(results.states).hasSize(3)
// same query using strict participant matching
// DOCSTART VaultQueryExample51
val strictCriteria = LinearStateQueryCriteria(exactParticipants = listOf(MEGA_CORP, BIG_CORP))
val strictResults = vaultService.queryBy<ContractState>(strictCriteria)
// DOCEND VaultQueryExample51
assertThat(strictResults.states).hasSize(1)
// same query using strict participant matching (unordered list of participants)
val strictCriteriaUnordered = LinearStateQueryCriteria(exactParticipants = listOf(BIG_CORP, MEGA_CORP))
val strictResultsUnordered = vaultService.queryBy<ContractState>(strictCriteriaUnordered)
assertThat(strictResultsUnordered.states).hasSize(1)
}
}
@ -1959,6 +2018,13 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
// DOCEND VaultQueryExample11
assertThat(results.states).hasSize(1)
// same query using strict participant matching
// DOCSTART VaultQueryExample52
val strictCriteria = LinearStateQueryCriteria().withExactParticipants(parties)
val strictResults = vaultService.queryBy<ContractState>(strictCriteria)
// DOCEND VaultQueryExample52
assertThat(strictResults.states).hasSize(0) // node identity included (MEGA_CORP)
}
}
@ -2352,6 +2418,11 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
assertThat(results.states).hasSize(1)
assertThat(results.states[0].state.data.linearId.externalId).isEqualTo("TEST1")
// same query using strict participant matching
val strictCriteria = LinearStateQueryCriteria().withExactParticipants(listOf(ALICE))
val strictResults = vaultService.queryBy<ContractState>(strictCriteria)
assertThat(strictResults.states).hasSize(0) // all states include node identity (MEGA_CORP)
}
}
@ -2369,6 +2440,11 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
assertThat(results.states).hasSize(1)
assertThat(results.states[0].state.data.linearId.externalId).isEqualTo("TEST1")
// same query using strict participant matching
val strictCriteria = LinearStateQueryCriteria().withExactParticipants(listOf(MEGA_CORP, ALICE, BOB, CHARLIE))
val strictResults = vaultService.queryBy<ContractState>(strictCriteria)
assertThat(strictResults.states).hasSize(1)
}
}
@ -2401,7 +2477,7 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
}
@Test
fun `composite query for fungible and linear states for multiple participants`() {
fun `composite query for fungible, linear and dummy states for multiple participants`() {
database.transaction {
identitySvc.verifyAndRegisterIdentity(ALICE_IDENTITY)
identitySvc.verifyAndRegisterIdentity(BOB_IDENTITY)

View File

@ -1,8 +1,8 @@
package net.corda.bank.api
import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.GracefulReconnect
import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
@ -35,7 +35,9 @@ object BankOfCordaClientApi {
*
* @return a payment transaction (following successful issuance of cash to self).
*/
fun requestRPCIssue(rpcAddress: NetworkHostAndPort, params: IssueRequestParams): SignedTransaction = requestRPCIssueHA(listOf(rpcAddress), params)
fun requestRPCIssue(rpcAddress: NetworkHostAndPort, params: IssueRequestParams): SignedTransaction {
return requestRPCIssueHA(listOf(rpcAddress), params)
}
/**
* RPC API
@ -44,20 +46,28 @@ object BankOfCordaClientApi {
*/
fun requestRPCIssueHA(availableRpcServers: List<NetworkHostAndPort>, params: IssueRequestParams): SignedTransaction {
// TODO: privileged security controls required
ReconnectingCordaRPCOps(availableRpcServers, BOC_RPC_USER, BOC_RPC_PWD, CordaRPCClientConfiguration.DEFAULT).use { rpc->
rpc.waitUntilNetworkReady().getOrThrow()
CordaRPCClient(availableRpcServers)
.start(BOC_RPC_USER, BOC_RPC_PWD, gracefulReconnect = GracefulReconnect()).use { rpc->
rpc.proxy.waitUntilNetworkReady().getOrThrow()
// Resolve parties via RPC
val issueToParty = rpc.wellKnownPartyFromX500Name(params.issueToPartyName)
val issueToParty = rpc.proxy.wellKnownPartyFromX500Name(params.issueToPartyName)
?: throw IllegalStateException("Unable to locate ${params.issueToPartyName} in Network Map Service")
val notaryLegalIdentity = rpc.notaryIdentities().firstOrNull { it.name == params.notaryName }
val notaryLegalIdentity = rpc.proxy.notaryIdentities().firstOrNull { it.name == params.notaryName }
?: throw IllegalStateException("Couldn't locate notary ${params.notaryName} in NetworkMapCache")
val anonymous = true
val issuerBankPartyRef = OpaqueBytes.of(params.issuerBankPartyRef.toByte())
logger.info("${rpc.nodeInfo()} issuing ${params.amount} to transfer to $issueToParty ...")
return rpc.startFlow(::CashIssueAndPaymentFlow, params.amount, issuerBankPartyRef, issueToParty, anonymous, notaryLegalIdentity)
logger.info("${rpc.proxy.nodeInfo()} issuing ${params.amount} to transfer to $issueToParty ...")
return rpc.proxy.startFlow(
::CashIssueAndPaymentFlow,
params.amount,
issuerBankPartyRef,
issueToParty,
anonymous,
notaryLegalIdentity
)
.returnValue.getOrThrow().stx
}
}

View File

@ -3,8 +3,9 @@ package net.corda.webserver.internal
import com.google.common.html.HtmlEscapers.htmlEscaper
import io.netty.channel.unix.Errors
import net.corda.client.jackson.JacksonSupport
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.GracefulReconnect
import net.corda.core.internal.errors.AddressBindingException
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.contextLogger
@ -46,8 +47,12 @@ class NodeWebServer(val config: WebServerConfig) {
}
fun run() {
while (server.isRunning) {
Thread.sleep(100) // TODO: Redesign
try {
while (server.isRunning) {
Thread.sleep(100) // TODO: Redesign
}
} finally {
rpc.close()
}
}
@ -75,7 +80,11 @@ class NodeWebServer(val config: WebServerConfig) {
sslContextFactory.setIncludeProtocols("TLSv1.2")
sslContextFactory.setExcludeCipherSuites(".*NULL.*", ".*RC4.*", ".*MD5.*", ".*DES.*", ".*DSS.*")
sslContextFactory.setIncludeCipherSuites(".*AES.*GCM.*")
val sslConnector = ServerConnector(server, SslConnectionFactory(sslContextFactory, "http/1.1"), HttpConnectionFactory(httpsConfiguration))
val sslConnector = ServerConnector(
server,
SslConnectionFactory(sslContextFactory, "http/1.1"),
HttpConnectionFactory(httpsConfiguration)
)
sslConnector.port = address.port
sslConnector
} else {
@ -175,9 +184,21 @@ class NodeWebServer(val config: WebServerConfig) {
}
}
private fun reconnectingCordaRPCOps() = ReconnectingCordaRPCOps(config.rpcAddress, config.runAs.username , config.runAs.password, CordaRPCClientConfiguration.DEFAULT, null, javaClass.classLoader)
private lateinit var rpc: CordaRPCConnection
private fun reconnectingCordaRPCOps(): CordaRPCOps {
rpc = CordaRPCClient(config.rpcAddress, null, javaClass.classLoader)
.start(
config.runAs.username,
config.runAs.password,
GracefulReconnect()
)
return rpc.proxy
}
/** Fetch WebServerPluginRegistry classes registered in META-INF/services/net.corda.webserver.services.WebServerPluginRegistry files that exist in the classpath */
/**
* Fetch WebServerPluginRegistry classes registered in META-INF/services/net.corda.webserver.services.WebServerPluginRegistry
* files that exist in the classpath
*/
val pluginRegistries: List<WebServerPluginRegistry> by lazy {
ServiceLoader.load(WebServerPluginRegistry::class.java).toList()
}

View File

@ -20,7 +20,6 @@ import net.corda.core.contracts.withoutIssuer
import net.corda.core.flows.FlowException
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.OpaqueBytes
@ -94,15 +93,14 @@ class NewTransaction : Fragment() {
initOwner(window)
show()
}
val handle: FlowHandle<AbstractCashFlow.Result> = when (request) {
is IssueAndPaymentRequest -> rpcProxy.value!!.startFlow(::CashIssueAndPaymentFlow, request)
is PaymentRequest -> rpcProxy.value!!.startFlow(::CashPaymentFlow, request)
is ExitRequest -> rpcProxy.value!!.startFlow(::CashExitFlow, request)
else -> throw IllegalArgumentException("Unexpected request type: $request")
}
runAsync {
try {
handle.returnValue.getOrThrow()
when (request) {
is IssueAndPaymentRequest -> rpcProxy.value!!.startFlow(::CashIssueAndPaymentFlow, request)
is PaymentRequest -> rpcProxy.value!!.startFlow(::CashPaymentFlow, request)
is ExitRequest -> rpcProxy.value!!.startFlow(::CashExitFlow, request)
else -> throw IllegalArgumentException("Unexpected request type: $request")
}.returnValue.getOrThrow()
} finally {
dialog.dialogPane.isDisable = false
}

View File

@ -11,8 +11,8 @@ import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.StringToMethodCallParser
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
@ -22,7 +22,6 @@ import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.internal.packageName_
import net.corda.core.messaging.*
import net.corda.tools.shell.utlities.ANSIProgressRenderer
import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer
@ -91,21 +90,24 @@ object InteractiveShell {
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) {
rpcOps = { username: String, password: String ->
if (standalone) {
ReconnectingCordaRPCOps(configuration.hostAndPort, username, password, CordaRPCClientConfiguration.DEFAULT, configuration.ssl, classLoader).also {
rpcConn = it
}
val connection = if (standalone) {
CordaRPCClient(
configuration.hostAndPort,
configuration.ssl,
classLoader
).start(username, password, gracefulReconnect = GracefulReconnect())
} else {
val client = CordaRPCClient(hostAndPort = configuration.hostAndPort,
CordaRPCClient(
hostAndPort = configuration.hostAndPort,
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
maxReconnectAttempts = 1
),
sslConfiguration = configuration.ssl,
classLoader = classLoader)
val connection = client.start(username, password)
rpcConn = connection
connection.proxy as InternalCordaRPCOps
classLoader = classLoader
).start(username, password)
}
rpcConn = connection
connection.proxy as InternalCordaRPCOps
}
_startShell(configuration, classLoader)
}