Made problematic CordaRPCClient c'tor private (with internal bridge methods) and added correct c'tors for public use. (#1653)

initialiseSerialization param has also been removed.
This commit is contained in:
Shams Asari 2017-09-27 12:17:31 +01:00 committed by josecoll
parent 98e7a8c42a
commit 324804eabc
13 changed files with 75 additions and 55 deletions

View File

@ -27,9 +27,9 @@ import net.corda.finance.flows.CashExitFlow
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.node.services.FlowPermissions.Companion.startFlowPermission
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.nodeapi.User
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.testing.*
import net.corda.testing.driver.driver
import net.corda.testing.node.DriverBasedTest
@ -71,7 +71,7 @@ class NodeMonitorModelTest : DriverBasedTest() {
vaultUpdates = monitor.vaultUpdates.bufferUntilSubscribed()
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
monitor.register(aliceNodeHandle.configuration.rpcAddress!!, cashUser.username, cashUser.password, initialiseSerialization = false)
monitor.register(aliceNodeHandle.configuration.rpcAddress!!, cashUser.username, cashUser.password)
rpc = monitor.proxyObservable.value!!
notaryParty = notaryHandle.nodeInfo.legalIdentities[1]
@ -79,7 +79,7 @@ class NodeMonitorModelTest : DriverBasedTest() {
bobNode = bobNodeHandle.nodeInfo
val monitorBob = NodeMonitorModel()
stateMachineUpdatesBob = monitorBob.stateMachineUpdates.bufferUntilSubscribed()
monitorBob.register(bobNodeHandle.configuration.rpcAddress!!, cashUser.username, cashUser.password, initialiseSerialization = false)
monitorBob.register(bobNodeHandle.configuration.rpcAddress!!, cashUser.username, cashUser.password)
rpcBob = monitorBob.proxyObservable.value!!
runTest()
}

View File

@ -55,13 +55,10 @@ class NodeMonitorModel {
* Register for updates to/from a given vault.
* TODO provide an unsubscribe mechanism
*/
fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, initialiseSerialization: Boolean = true) {
fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
val client = CordaRPCClient(
hostAndPort = nodeHostAndPort,
configuration = CordaRPCClientConfiguration.default.copy(
connectionMaxRetryInterval = 10.seconds
),
initialiseSerialization = initialiseSerialization
nodeHostAndPort,
CordaRPCClientConfiguration.DEFAULT.copy(connectionMaxRetryInterval = 10.seconds)
)
val connection = client.start(username, password)
val proxy = connection.proxy

View File

@ -8,12 +8,12 @@ import net.corda.core.utilities.OpaqueBytes;
import net.corda.finance.flows.AbstractCashFlow;
import net.corda.finance.flows.CashIssueFlow;
import net.corda.finance.flows.CashPaymentFlow;
import net.corda.finance.schemas.*;
import net.corda.finance.schemas.CashSchemaV1;
import net.corda.node.internal.Node;
import net.corda.node.internal.StartedNode;
import net.corda.node.services.transactions.ValidatingNotaryService;
import net.corda.nodeapi.internal.ServiceInfo;
import net.corda.nodeapi.User;
import net.corda.nodeapi.internal.ServiceInfo;
import net.corda.testing.CoreTestUtils;
import net.corda.testing.node.NodeBasedTest;
import org.junit.After;
@ -24,14 +24,15 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import static java.util.Collections.*;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static kotlin.test.AssertionsKt.assertEquals;
import static net.corda.client.rpc.CordaRPCClientConfiguration.getDefault;
import static net.corda.finance.Currencies.DOLLARS;
import static net.corda.finance.contracts.GetBalances.getCashBalance;
import static net.corda.node.services.FlowPermissions.startFlowPermission;
import static net.corda.testing.CoreTestUtils.*;
import static net.corda.testing.CoreTestUtils.setCordappPackages;
import static net.corda.testing.CoreTestUtils.unsetCordappPackages;
import static net.corda.testing.TestConstants.getALICE;
public class CordaRPCJavaClientTest extends NodeBasedTest {
@ -56,7 +57,7 @@ public class CordaRPCJavaClientTest extends NodeBasedTest {
CordaFuture<StartedNode<Node>> nodeFuture = startNode(getALICE().getName(), 1, services, singletonList(rpcUser), emptyMap());
node = nodeFuture.get();
node.getInternals().registerCustomSchemas(Collections.singleton(CashSchemaV1.INSTANCE));
client = new CordaRPCClient(requireNonNull(node.getInternals().getConfiguration().getRpcAddress()), null, getDefault(), false);
client = new CordaRPCClient(requireNonNull(node.getInternals().getConfiguration().getRpcAddress()));
}
@After

View File

@ -19,9 +19,9 @@ import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.services.FlowPermissions.Companion.startFlowPermission
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.nodeapi.User
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.testing.ALICE
import net.corda.testing.chooseIdentity
import net.corda.testing.node.NodeBasedTest
@ -54,7 +54,7 @@ class CordaRPCClientTest : NodeBasedTest() {
setCordappPackages("net.corda.finance.contracts")
node = startNode(ALICE.name, rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).getOrThrow()
node.internals.registerCustomSchemas(setOf(CashSchemaV1))
client = CordaRPCClient(node.internals.configuration.rpcAddress!!, initialiseSerialization = false)
client = CordaRPCClient(node.internals.configuration.rpcAddress!!)
}
@After

View File

@ -35,8 +35,8 @@ data class CordaRPCClientConfiguration(val connectionMaxRetryInterval: Duration)
/**
* Returns the default configuration we recommend you use.
*/
@JvmStatic
val default = CordaRPCClientConfiguration(connectionMaxRetryInterval = RPCClientConfiguration.default.connectionMaxRetryInterval)
@JvmField
val DEFAULT = CordaRPCClientConfiguration(connectionMaxRetryInterval = RPCClientConfiguration.default.connectionMaxRetryInterval)
}
}
@ -66,19 +66,18 @@ data class CordaRPCClientConfiguration(val connectionMaxRetryInterval: Duration)
* @param hostAndPort The network address to connect to.
* @param configuration An optional configuration used to tweak client behaviour.
*/
class CordaRPCClient @JvmOverloads constructor(
hostAndPort: NetworkHostAndPort,
sslConfiguration: SSLConfiguration? = null,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default,
initialiseSerialization: Boolean = true
class CordaRPCClient
private constructor(hostAndPort: NetworkHostAndPort,
sslConfiguration: SSLConfiguration? = null,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
) {
constructor(hostAndPort: NetworkHostAndPort) : this(hostAndPort, sslConfiguration = null)
constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration) :
this(hostAndPort, sslConfiguration = null, configuration = configuration)
init {
// Init serialization. It's plausible there are multiple clients in a single JVM, so be tolerant of
// others having registered first.
// TODO: allow clients to have serialization factory etc injected and align with RPC protocol version?
if (initialiseSerialization) {
KryoClientSerializationScheme.initialiseSerialization()
}
KryoClientSerializationScheme.initialiseSerialization()
}
private val rpcClient = RPCClient<CordaRPCOps>(
@ -108,4 +107,13 @@ class CordaRPCClient @JvmOverloads constructor(
inline fun <A> use(username: String, password: String, block: (CordaRPCConnection) -> A): A {
return start(username, password).use(block)
}
companion object {
// This is workaround for making the default c'tor private
internal fun bridgeCordaRPCClient(hostAndPort: NetworkHostAndPort,
sslConfiguration: SSLConfiguration? = null,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT): CordaRPCClient {
return CordaRPCClient(hostAndPort, sslConfiguration, configuration)
}
}
}

View File

@ -0,0 +1,12 @@
package net.corda.client.rpc.internal
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.config.SSLConfiguration
fun internalCordaRPCClient(hostAndPort: NetworkHostAndPort,
sslConfiguration: SSLConfiguration?,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT): CordaRPCClient {
return CordaRPCClient.bridgeCordaRPCClient(hostAndPort, sslConfiguration, configuration)
}

View File

@ -5,6 +5,7 @@ import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.utilities.ByteSequence
import net.corda.nodeapi.internal.serialization.*
import java.util.concurrent.atomic.AtomicBoolean
class KryoClientSerializationScheme : AbstractKryoSerializationScheme() {
override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean {
@ -23,7 +24,9 @@ class KryoClientSerializationScheme : AbstractKryoSerializationScheme() {
override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
companion object {
val isInitialised = AtomicBoolean(false)
fun initialiseSerialization() {
if (!isInitialised.compareAndSet(false, true)) return
try {
SerializationDefaults.SERIALIZATION_FACTORY = SerializationFactoryImpl().apply {
registerScheme(KryoClientSerializationScheme())
@ -31,10 +34,14 @@ class KryoClientSerializationScheme : AbstractKryoSerializationScheme() {
}
SerializationDefaults.P2P_CONTEXT = KRYO_P2P_CONTEXT
SerializationDefaults.RPC_CLIENT_CONTEXT = KRYO_RPC_CLIENT_CONTEXT
} catch(e: IllegalStateException) {
} catch (e: IllegalStateException) {
// Check that it's registered as we expect
check(SerializationDefaults.SERIALIZATION_FACTORY is SerializationFactoryImpl) { "RPC client encountered conflicting configuration of serialization subsystem." }
check((SerializationDefaults.SERIALIZATION_FACTORY as SerializationFactoryImpl).alreadyRegisteredSchemes.any { it is KryoClientSerializationScheme }) { "RPC client encountered conflicting configuration of serialization subsystem." }
val factory = SerializationDefaults.SERIALIZATION_FACTORY
val checkedFactory = factory as? SerializationFactoryImpl
?: throw IllegalStateException("RPC client encountered conflicting configuration of serialization subsystem: $factory")
check(checkedFactory.alreadyRegisteredSchemes.any { it is KryoClientSerializationScheme }) {
"RPC client encountered conflicting configuration of serialization subsystem."
}
}
}
}

View File

@ -1,7 +1,7 @@
package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.internal.internalCordaRPCClient
import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowLogic
@ -154,7 +154,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
}
fun loginToRPC(target: NetworkHostAndPort, rpcUser: User, sslConfiguration: SSLConfiguration? = null): CordaRPCOps {
return CordaRPCClient(target, sslConfiguration, initialiseSerialization = false).start(rpcUser.username, rpcUser.password).proxy
return internalCordaRPCClient(target, sslConfiguration).start(rpcUser.username, rpcUser.password).proxy
}
fun loginToRPCAndGetClientQueue(): String {

View File

@ -20,15 +20,11 @@ import net.corda.core.utilities.seconds
import net.corda.finance.plugin.registerFinanceJSONMappers
import net.corda.irs.contract.InterestRateSwap
import net.corda.irs.utilities.uploadFile
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.nodeapi.User
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.DUMMY_BANK_B
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.IntegrationTestCategory
import net.corda.testing.chooseIdentity
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.testing.*
import net.corda.testing.driver.driver
import net.corda.testing.http.HttpApi
import org.apache.commons.io.IOUtils
@ -101,7 +97,7 @@ class IRSDemoTest : IntegrationTestCategory {
}
private fun getFixingDateObservable(config: FullNodeConfiguration): Observable<LocalDate?> {
val client = CordaRPCClient(config.rpcAddress!!, initialiseSerialization = false)
val client = CordaRPCClient(config.rpcAddress!!)
val proxy = client.start("user", "password").proxy
val vaultUpdates = proxy.vaultTrackBy<InterestRateSwap.State>().updates

View File

@ -10,13 +10,11 @@ import net.corda.finance.schemas.CashSchemaV1
import net.corda.finance.schemas.CommercialPaperSchemaV1
import net.corda.node.services.FlowPermissions.Companion.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.nodeapi.User
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.testing.*
import net.corda.testing.driver.poll
import net.corda.testing.node.NodeBasedTest
import net.corda.testing.setCordappPackages
import net.corda.testing.unsetCordappPackages
import net.corda.traderdemo.flow.BuyerFlow
import net.corda.traderdemo.flow.CommercialPaperIssueFlow
import net.corda.traderdemo.flow.SellerFlow
@ -56,11 +54,11 @@ class TraderDemoTest : NodeBasedTest() {
nodeB.internals.registerCustomSchemas(setOf(CashSchemaV1, CommercialPaperSchemaV1))
val (nodeARpc, nodeBRpc) = listOf(nodeA, nodeB).map {
val client = CordaRPCClient(it.internals.configuration.rpcAddress!!, initialiseSerialization = false)
val client = CordaRPCClient(it.internals.configuration.rpcAddress!!)
client.start(demoUser.username, demoUser.password).proxy
}
val nodeBankRpc = let {
val client = CordaRPCClient(bankNode.internals.configuration.rpcAddress!!, initialiseSerialization = false)
val client = CordaRPCClient(bankNode.internals.configuration.rpcAddress!!)
client.start(bankUser.username, bankUser.password).proxy
}

View File

@ -6,6 +6,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.internal.internalCordaRPCClient
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformNode
import net.corda.cordform.NodeDefinition
@ -23,8 +24,6 @@ import net.corda.core.utilities.*
import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
import net.corda.node.internal.StartedNode
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.nodeapi.internal.ServiceType
import net.corda.node.services.config.*
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.RaftValidatingNotaryService
@ -33,6 +32,8 @@ import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.User
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.config.parseAs
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.nodeapi.internal.ServiceType
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.testing.*
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
@ -217,7 +218,7 @@ sealed class NodeHandle {
}
}
fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.rpcAddress!!, initialiseSerialization = false)
fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.rpcAddress!!)
/**
* Stops the referenced node.
@ -628,7 +629,7 @@ class DriverDSL(
}
private fun establishRpc(nodeAddress: NetworkHostAndPort, sslConfig: SSLConfiguration, processDeathFuture: CordaFuture<out Process>): CordaFuture<CordaRPCOps> {
val client = CordaRPCClient(nodeAddress, sslConfig, initialiseSerialization = false)
val client = internalCordaRPCClient(nodeAddress, sslConfig)
val connectionFuture = poll(executorService, "RPC connection") {
try {
client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)

View File

@ -17,11 +17,11 @@ inline fun <T> withTestSerialization(block: () -> T): T {
}
fun initialiseTestSerialization() {
// Stop the CordaRPCClient from trying to setup the defaults as we're about to do it now
KryoClientSerializationScheme.isInitialised.set(true)
// Check that everything is configured for testing with mutable delegating instances.
try {
check(SerializationDefaults.SERIALIZATION_FACTORY is TestSerializationFactory) {
"Found non-test serialization configuration: ${SerializationDefaults.SERIALIZATION_FACTORY}"
}
check(SerializationDefaults.SERIALIZATION_FACTORY is TestSerializationFactory)
} catch(e: IllegalStateException) {
SerializationDefaults.SERIALIZATION_FACTORY = TestSerializationFactory()
}

View File

@ -2,7 +2,7 @@ package net.corda.webserver.internal
import com.google.common.html.HtmlEscapers.htmlEscaper
import net.corda.client.jackson.JacksonSupport
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.internal.internalCordaRPCClient
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.ArtemisMessagingComponent
@ -206,7 +206,7 @@ class NodeWebServer(val config: WebServerConfig) {
private fun connectLocalRpcAsNodeUser(): CordaRPCOps {
log.info("Connecting to node at ${config.p2pAddress} as node user")
val client = CordaRPCClient(config.p2pAddress, config)
val client = internalCordaRPCClient(config.p2pAddress, config)
val connection = client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
return connection.proxy
}