mirror of
https://github.com/corda/corda.git
synced 2025-06-17 06:38:21 +00:00
Merge remote-tracking branch 'open/master' into aslemmer-merge-19-Feb
This commit is contained in:
@ -10,6 +10,7 @@ import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.Id
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.Try
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
@ -72,6 +73,8 @@ object RPCApi {
|
||||
const val RPC_CLIENT_BINDING_ADDITIONS = "rpc.clientqueueadditions"
|
||||
const val RPC_TARGET_LEGAL_IDENTITY = "rpc-target-legal-identity"
|
||||
|
||||
const val DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME = "deduplication-sequence-number"
|
||||
|
||||
val RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION =
|
||||
"${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_REMOVED.name}' AND " +
|
||||
"${ManagementHelper.HDR_ROUTING_NAME} LIKE '$RPC_CLIENT_QUEUE_NAME_PREFIX.%'"
|
||||
@ -94,6 +97,8 @@ object RPCApi {
|
||||
OBSERVABLES_CLOSED
|
||||
}
|
||||
|
||||
abstract fun writeToClientMessage(message: ClientMessage)
|
||||
|
||||
/**
|
||||
* Request to a server to trigger the specified method with the provided arguments.
|
||||
*
|
||||
@ -105,13 +110,13 @@ object RPCApi {
|
||||
data class RpcRequest(
|
||||
val clientAddress: SimpleString,
|
||||
val methodName: String,
|
||||
val serialisedArguments: ByteArray,
|
||||
val serialisedArguments: OpaqueBytes,
|
||||
val replyId: InvocationId,
|
||||
val sessionId: SessionId,
|
||||
val externalTrace: Trace? = null,
|
||||
val impersonatedActor: Actor? = null
|
||||
) : ClientToServer() {
|
||||
fun writeToClientMessage(message: ClientMessage) {
|
||||
override fun writeToClientMessage(message: ClientMessage) {
|
||||
MessageUtil.setJMSReplyTo(message, clientAddress)
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.RPC_REQUEST.ordinal)
|
||||
|
||||
@ -122,12 +127,12 @@ object RPCApi {
|
||||
impersonatedActor?.mapToImpersonated(message)
|
||||
|
||||
message.putStringProperty(METHOD_NAME_FIELD_NAME, methodName)
|
||||
message.bodyBuffer.writeBytes(serialisedArguments)
|
||||
message.bodyBuffer.writeBytes(serialisedArguments.bytes)
|
||||
}
|
||||
}
|
||||
|
||||
data class ObservablesClosed(val ids: List<InvocationId>) : ClientToServer() {
|
||||
fun writeToClientMessage(message: ClientMessage) {
|
||||
override fun writeToClientMessage(message: ClientMessage) {
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.OBSERVABLES_CLOSED.ordinal)
|
||||
val buffer = message.bodyBuffer
|
||||
buffer.writeInt(ids.size)
|
||||
@ -144,7 +149,7 @@ object RPCApi {
|
||||
RPCApi.ClientToServer.Tag.RPC_REQUEST -> RpcRequest(
|
||||
clientAddress = MessageUtil.getJMSReplyTo(message),
|
||||
methodName = message.getStringProperty(METHOD_NAME_FIELD_NAME),
|
||||
serialisedArguments = message.getBodyAsByteArray(),
|
||||
serialisedArguments = OpaqueBytes(message.getBodyAsByteArray()),
|
||||
replyId = message.replyId(),
|
||||
sessionId = message.sessionId(),
|
||||
externalTrace = message.externalTrace(),
|
||||
@ -175,13 +180,20 @@ object RPCApi {
|
||||
|
||||
abstract fun writeToClientMessage(context: SerializationContext, message: ClientMessage)
|
||||
|
||||
/** Reply in response to an [ClientToServer.RpcRequest]. */
|
||||
/** The identity used to identify the deduplication ID sequence. This should be unique per server JVM run */
|
||||
abstract val deduplicationIdentity: String
|
||||
|
||||
/**
|
||||
* Reply in response to an [ClientToServer.RpcRequest].
|
||||
*/
|
||||
data class RpcReply(
|
||||
val id: InvocationId,
|
||||
val result: Try<Any?>
|
||||
val result: Try<Any?>,
|
||||
override val deduplicationIdentity: String
|
||||
) : ServerToClient() {
|
||||
override fun writeToClientMessage(context: SerializationContext, message: ClientMessage) {
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.RPC_REPLY.ordinal)
|
||||
message.putStringProperty(DEDUPLICATION_IDENTITY_FIELD_NAME, deduplicationIdentity)
|
||||
id.mapTo(message, RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME)
|
||||
message.bodyBuffer.writeBytes(result.safeSerialize(context) { Try.Failure<Any>(it) }.bytes)
|
||||
}
|
||||
@ -189,10 +201,12 @@ object RPCApi {
|
||||
|
||||
data class Observation(
|
||||
val id: InvocationId,
|
||||
val content: Notification<*>
|
||||
val content: Notification<*>,
|
||||
override val deduplicationIdentity: String
|
||||
) : ServerToClient() {
|
||||
override fun writeToClientMessage(context: SerializationContext, message: ClientMessage) {
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.OBSERVATION.ordinal)
|
||||
message.putStringProperty(DEDUPLICATION_IDENTITY_FIELD_NAME, deduplicationIdentity)
|
||||
id.mapTo(message, OBSERVABLE_ID_FIELD_NAME, OBSERVABLE_ID_TIMESTAMP_FIELD_NAME)
|
||||
message.bodyBuffer.writeBytes(content.safeSerialize(context) { Notification.createOnError<Void?>(it) }.bytes)
|
||||
}
|
||||
@ -207,17 +221,26 @@ object RPCApi {
|
||||
|
||||
fun fromClientMessage(context: SerializationContext, message: ClientMessage): ServerToClient {
|
||||
val tag = Tag.values()[message.getIntProperty(TAG_FIELD_NAME)]
|
||||
val deduplicationIdentity = message.getStringProperty(DEDUPLICATION_IDENTITY_FIELD_NAME)
|
||||
return when (tag) {
|
||||
RPCApi.ServerToClient.Tag.RPC_REPLY -> {
|
||||
val id = message.invocationId(RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot parse invocation id from client message.")
|
||||
val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, id)
|
||||
RpcReply(id, message.getBodyAsByteArray().deserialize(context = poolWithIdContext))
|
||||
RpcReply(
|
||||
id = id,
|
||||
deduplicationIdentity = deduplicationIdentity,
|
||||
result = message.getBodyAsByteArray().deserialize(context = poolWithIdContext)
|
||||
)
|
||||
}
|
||||
RPCApi.ServerToClient.Tag.OBSERVATION -> {
|
||||
val observableId = message.invocationId(OBSERVABLE_ID_FIELD_NAME, OBSERVABLE_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot parse invocation id from client message.")
|
||||
val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, observableId)
|
||||
val payload = message.getBodyAsByteArray().deserialize<Notification<*>>(context = poolWithIdContext)
|
||||
Observation(observableId, payload)
|
||||
Observation(
|
||||
id = observableId,
|
||||
deduplicationIdentity = deduplicationIdentity,
|
||||
content = payload
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -225,18 +248,6 @@ object RPCApi {
|
||||
}
|
||||
}
|
||||
|
||||
data class ArtemisProducer(
|
||||
val sessionFactory: ClientSessionFactory,
|
||||
val session: ClientSession,
|
||||
val producer: ClientProducer
|
||||
)
|
||||
|
||||
data class ArtemisConsumer(
|
||||
val sessionFactory: ClientSessionFactory,
|
||||
val session: ClientSession,
|
||||
val consumer: ClientConsumer
|
||||
)
|
||||
|
||||
private val TAG_FIELD_NAME = "tag"
|
||||
private val RPC_ID_FIELD_NAME = "rpc-id"
|
||||
private val RPC_ID_TIMESTAMP_FIELD_NAME = "rpc-id-timestamp"
|
||||
@ -249,6 +260,7 @@ private val RPC_EXTERNAL_SESSION_ID_TIMESTAMP_FIELD_NAME = "rpc-external-session
|
||||
private val RPC_IMPERSONATED_ACTOR_ID = "rpc-impersonated-actor-id"
|
||||
private val RPC_IMPERSONATED_ACTOR_STORE_ID = "rpc-impersonated-actor-store-id"
|
||||
private val RPC_IMPERSONATED_ACTOR_OWNING_LEGAL_IDENTITY = "rpc-impersonated-actor-owningLegalIdentity"
|
||||
private val DEDUPLICATION_IDENTITY_FIELD_NAME = "deduplication-identity"
|
||||
private val OBSERVABLE_ID_FIELD_NAME = "observable-id"
|
||||
private val OBSERVABLE_ID_TIMESTAMP_FIELD_NAME = "observable-id-timestamp"
|
||||
private val METHOD_NAME_FIELD_NAME = "method-name"
|
||||
|
@ -10,6 +10,12 @@ import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
|
||||
interface ArtemisSessionProvider {
|
||||
fun start(): ArtemisMessagingClient.Started
|
||||
fun stop()
|
||||
val started: ArtemisMessagingClient.Started?
|
||||
}
|
||||
|
||||
class ArtemisMessagingClient(
|
||||
private val config: SSLConfiguration,
|
||||
private val serverAddress: NetworkHostAndPort,
|
||||
@ -17,17 +23,17 @@ class ArtemisMessagingClient(
|
||||
private val autoCommitSends: Boolean = true,
|
||||
private val autoCommitAcks: Boolean = true,
|
||||
private val confirmationWindowSize: Int = -1
|
||||
) {
|
||||
): ArtemisSessionProvider {
|
||||
companion object {
|
||||
private val log = loggerFor<ArtemisMessagingClient>()
|
||||
}
|
||||
|
||||
class Started(val serverLocator: ServerLocator, val sessionFactory: ClientSessionFactory, val session: ClientSession, val producer: ClientProducer)
|
||||
|
||||
var started: Started? = null
|
||||
override var started: Started? = null
|
||||
private set
|
||||
|
||||
fun start(): Started = synchronized(this) {
|
||||
override fun start(): Started = synchronized(this) {
|
||||
check(started == null) { "start can't be called twice" }
|
||||
log.info("Connecting to message broker: $serverAddress")
|
||||
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
|
||||
@ -53,7 +59,7 @@ class ArtemisMessagingClient(
|
||||
return Started(locator, sessionFactory, session, producer).also { started = it }
|
||||
}
|
||||
|
||||
fun stop() = synchronized(this) {
|
||||
override fun stop() = synchronized(this) {
|
||||
started?.run {
|
||||
producer.close()
|
||||
// Ensure any trailing messages are committed to the journal
|
||||
|
@ -0,0 +1,30 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import com.google.common.cache.CacheBuilder
|
||||
import com.google.common.cache.CacheLoader
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
/**
|
||||
* A class allowing the deduplication of a strictly incrementing sequence number.
|
||||
*/
|
||||
class DeduplicationChecker(cacheExpiry: Duration) {
|
||||
// dedupe identity -> watermark cache
|
||||
private val watermarkCache = CacheBuilder.newBuilder()
|
||||
.expireAfterAccess(cacheExpiry.toNanos(), TimeUnit.NANOSECONDS)
|
||||
.build(WatermarkCacheLoader)
|
||||
|
||||
private object WatermarkCacheLoader : CacheLoader<Any, AtomicLong>() {
|
||||
override fun load(key: Any) = AtomicLong(-1)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param identity the identity that generates the sequence numbers.
|
||||
* @param sequenceNumber the sequence number to check.
|
||||
* @return true if the message is unique, false if it's a duplicate.
|
||||
*/
|
||||
fun checkDuplicateMessageId(identity: Any, sequenceNumber: Long): Boolean {
|
||||
return watermarkCache[identity].getAndUpdate { maxOf(sequenceNumber, it) } >= sequenceNumber
|
||||
}
|
||||
}
|
@ -12,6 +12,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
||||
import net.corda.nodeapi.internal.ArtemisSessionProvider
|
||||
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
@ -35,7 +36,7 @@ import kotlin.concurrent.withLock
|
||||
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager {
|
||||
class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
|
||||
|
||||
private val lock = ReentrantLock()
|
||||
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
|
||||
@ -43,7 +44,9 @@ class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: Networ
|
||||
private val keyStore = config.loadSslKeyStore().internal
|
||||
private val keyStorePrivateKeyPassword: String = config.keyStorePassword
|
||||
private val trustStore = config.loadTrustStore().internal
|
||||
private var artemis: ArtemisMessagingClient? = null
|
||||
private var artemis: ArtemisSessionProvider? = null
|
||||
|
||||
constructor(config: NodeSSLConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int) : this(config, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
|
||||
companion object {
|
||||
private const val NUM_BRIDGE_THREADS = 0 // Default sized pool
|
||||
@ -64,7 +67,7 @@ class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: Networ
|
||||
keyStorePrivateKeyPassword: String,
|
||||
trustStore: KeyStore,
|
||||
sharedEventGroup: EventLoopGroup,
|
||||
private val artemis: ArtemisMessagingClient) {
|
||||
private val artemis: ArtemisSessionProvider) {
|
||||
companion object {
|
||||
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
|
||||
}
|
||||
@ -155,9 +158,8 @@ class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: Networ
|
||||
}
|
||||
}
|
||||
|
||||
private fun gatherAddresses(node: NodeInfo): Sequence<ArtemisMessagingComponent.ArtemisPeerAddress> {
|
||||
val address = node.addresses.single()
|
||||
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence()
|
||||
private fun gatherAddresses(node: NodeInfo): List<ArtemisMessagingComponent.NodeAddress> {
|
||||
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, node.addresses[0]) }
|
||||
}
|
||||
|
||||
override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) {
|
||||
@ -191,7 +193,7 @@ class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: Networ
|
||||
|
||||
override fun start() {
|
||||
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS)
|
||||
val artemis = ArtemisMessagingClient(config, p2pAddress, maxMessageSize)
|
||||
val artemis = artemisMessageClientFactory()
|
||||
this.artemis = artemis
|
||||
artemis.start()
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisSessionProvider
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
@ -18,14 +19,17 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import java.util.*
|
||||
|
||||
class BridgeControlListener(val config: NodeSSLConfiguration,
|
||||
val p2pAddress: NetworkHostAndPort,
|
||||
val maxMessageSize: Int) : AutoCloseable {
|
||||
val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable {
|
||||
private val bridgeId: String = UUID.randomUUID().toString()
|
||||
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, p2pAddress, maxMessageSize)
|
||||
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, artemisMessageClientFactory)
|
||||
private val validInboundQueues = mutableSetOf<String>()
|
||||
private var artemis: ArtemisMessagingClient? = null
|
||||
private var artemis: ArtemisSessionProvider? = null
|
||||
private var controlConsumer: ClientConsumer? = null
|
||||
|
||||
constructor(config: NodeSSLConfiguration,
|
||||
p2pAddress: NetworkHostAndPort,
|
||||
maxMessageSize: Int) : this(config, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
@ -33,7 +37,7 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
|
||||
fun start() {
|
||||
stop()
|
||||
bridgeManager.start()
|
||||
val artemis = ArtemisMessagingClient(config, p2pAddress, maxMessageSize)
|
||||
val artemis = artemisMessageClientFactory()
|
||||
this.artemis = artemis
|
||||
artemis.start()
|
||||
val artemisClient = artemis.started!!
|
||||
@ -56,6 +60,7 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
validInboundQueues.clear()
|
||||
controlConsumer?.close()
|
||||
controlConsumer = null
|
||||
artemis?.stop()
|
||||
@ -65,6 +70,10 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
|
||||
|
||||
override fun close() = stop()
|
||||
|
||||
fun validateReceiveTopic(topic: String): Boolean {
|
||||
return topic in validInboundQueues
|
||||
}
|
||||
|
||||
private fun validateInboxQueueName(queueName: String): Boolean {
|
||||
return queueName.startsWith(P2P_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists
|
||||
}
|
||||
@ -90,7 +99,6 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
|
||||
for (outQueue in controlMessage.sendQueues) {
|
||||
bridgeManager.deployBridge(outQueue.queueName, outQueue.targets.first(), outQueue.legalNames.toSet())
|
||||
}
|
||||
// TODO For now we just record the inboxes, but we don't use the information, but eventually out of process bridges will use this for validating inbound messages.
|
||||
validInboundQueues.addAll(controlMessage.inboxQueues)
|
||||
}
|
||||
is BridgeControl.BridgeToNodeSnapshotRequest -> {
|
||||
|
@ -47,6 +47,7 @@ private val _contextDatabase = ThreadLocal<CordaPersistence>()
|
||||
var contextDatabase: CordaPersistence
|
||||
get() = _contextDatabase.get() ?: error("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}")
|
||||
set(database) = _contextDatabase.set(database)
|
||||
val contextDatabaseOrNull: CordaPersistence? get() = _contextDatabase.get()
|
||||
|
||||
class CordaPersistence(
|
||||
val dataSource: DataSource,
|
||||
|
@ -168,12 +168,14 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
||||
val transport = event.transport
|
||||
log.debug { "Transport Head Closed $transport" }
|
||||
transport.close_tail()
|
||||
onTransportInternal(transport)
|
||||
}
|
||||
|
||||
override fun onTransportTailClosed(event: Event) {
|
||||
val transport = event.transport
|
||||
log.debug { "Transport Tail Closed $transport" }
|
||||
transport.close_head()
|
||||
onTransportInternal(transport)
|
||||
}
|
||||
|
||||
override fun onTransportClosed(event: Event) {
|
||||
@ -195,6 +197,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
||||
} else {
|
||||
log.info("Error (no description returned).")
|
||||
}
|
||||
onTransportInternal(transport)
|
||||
}
|
||||
|
||||
override fun onTransport(event: Event) {
|
||||
|
@ -79,7 +79,10 @@ internal class EventProcessor(channel: Channel,
|
||||
if ((connection.localState != EndpointState.CLOSED) && !connection.transport.isClosed) {
|
||||
val now = System.currentTimeMillis()
|
||||
val tickDelay = Math.max(0L, connection.transport.tick(now) - now)
|
||||
executor.schedule({ tick(connection) }, tickDelay, TimeUnit.MILLISECONDS)
|
||||
executor.schedule({
|
||||
tick(connection)
|
||||
processEvents()
|
||||
}, tickDelay, TimeUnit.MILLISECONDS)
|
||||
}
|
||||
} catch (ex: Exception) {
|
||||
connection.transport.close()
|
||||
|
@ -38,8 +38,8 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() {
|
||||
private val log = LoggerFactory.getLogger(allowedRemoteLegalNames?.firstOrNull()?.toString() ?: "AMQPChannelHandler")
|
||||
private lateinit var remoteAddress: InetSocketAddress
|
||||
private lateinit var localCert: X509Certificate
|
||||
private lateinit var remoteCert: X509Certificate
|
||||
private var localCert: X509Certificate? = null
|
||||
private var remoteCert: X509Certificate? = null
|
||||
private var eventProcessor: EventProcessor? = null
|
||||
|
||||
override fun channelActive(ctx: ChannelHandlerContext) {
|
||||
@ -51,7 +51,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
|
||||
private fun createAMQPEngine(ctx: ChannelHandlerContext) {
|
||||
val ch = ctx.channel()
|
||||
eventProcessor = EventProcessor(ch, serverMode, localCert.subjectX500Principal.toString(), remoteCert.subjectX500Principal.toString(), userName, password)
|
||||
eventProcessor = EventProcessor(ch, serverMode, localCert!!.subjectX500Principal.toString(), remoteCert!!.subjectX500Principal.toString(), userName, password)
|
||||
val connection = eventProcessor!!.connection
|
||||
val transport = connection.transport as ProtonJTransport
|
||||
if (trace) {
|
||||
@ -72,7 +72,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
override fun channelInactive(ctx: ChannelHandlerContext) {
|
||||
val ch = ctx.channel()
|
||||
log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
|
||||
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, null, false)))
|
||||
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false)))
|
||||
eventProcessor?.close()
|
||||
ctx.fireChannelInactive()
|
||||
}
|
||||
@ -84,7 +84,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
localCert = sslHandler.engine().session.localCertificates[0].x509
|
||||
remoteCert = sslHandler.engine().session.peerCertificates[0].x509
|
||||
try {
|
||||
val remoteX500Name = CordaX500Name.build(remoteCert.subjectX500Principal)
|
||||
val remoteX500Name = CordaX500Name.build(remoteCert!!.subjectX500Principal)
|
||||
require(allowedRemoteLegalNames == null || remoteX500Name in allowedRemoteLegalNames)
|
||||
log.info("handshake completed subject: $remoteX500Name")
|
||||
} catch (ex: IllegalArgumentException) {
|
||||
@ -124,7 +124,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
require(inetAddress == remoteAddress) {
|
||||
"Message for incorrect endpoint"
|
||||
}
|
||||
require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert.subjectX500Principal)) {
|
||||
require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) {
|
||||
"Message for incorrect legal identity"
|
||||
}
|
||||
log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" }
|
||||
|
@ -38,7 +38,7 @@ class AMQPServer(val hostName: String,
|
||||
private val userName: String?,
|
||||
private val password: String?,
|
||||
private val keyStore: KeyStore,
|
||||
private val keyStorePrivateKeyPassword: String,
|
||||
private val keyStorePrivateKeyPassword: CharArray,
|
||||
private val trustStore: KeyStore,
|
||||
private val trace: Boolean = false) : AutoCloseable {
|
||||
|
||||
@ -59,15 +59,21 @@ class AMQPServer(val hostName: String,
|
||||
private var serverChannel: Channel? = null
|
||||
private val clientChannels = ConcurrentHashMap<InetSocketAddress, SocketChannel>()
|
||||
|
||||
init {
|
||||
}
|
||||
constructor(hostName: String,
|
||||
port: Int,
|
||||
userName: String?,
|
||||
password: String?,
|
||||
keyStore: KeyStore,
|
||||
keyStorePrivateKeyPassword: String,
|
||||
trustStore: KeyStore,
|
||||
trace: Boolean = false) : this(hostName, port, userName, password, keyStore, keyStorePrivateKeyPassword.toCharArray(), trustStore, trace)
|
||||
|
||||
private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer<SocketChannel>() {
|
||||
private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
|
||||
private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
|
||||
|
||||
init {
|
||||
keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword.toCharArray())
|
||||
keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword)
|
||||
trustManagerFactory.init(parent.trustStore)
|
||||
}
|
||||
|
||||
@ -169,6 +175,13 @@ class AMQPServer(val hostName: String,
|
||||
}
|
||||
}
|
||||
|
||||
fun dropConnection(connectionRemoteHost: InetSocketAddress) {
|
||||
val channel = clientChannels[connectionRemoteHost]
|
||||
if (channel != null) {
|
||||
channel.close()
|
||||
}
|
||||
}
|
||||
|
||||
fun complete(delivery: Delivery, target: InetSocketAddress) {
|
||||
val channel = clientChannels[target]
|
||||
channel?.apply {
|
||||
|
@ -213,8 +213,13 @@ internal fun <T : Any> propertiesForSerializationFromConstructor(
|
||||
|
||||
return mutableListOf<PropertyAccessor>().apply {
|
||||
kotlinConstructor.parameters.withIndex().forEach { param ->
|
||||
val name = param.value.name ?: throw NotSerializableException(
|
||||
"Constructor parameter of $clazz has no name.")
|
||||
// If a parameter doesn't have a name *at all* then chances are it's a synthesised
|
||||
// one. A good example of this is non static nested classes in Java where instances
|
||||
// of the nested class require access to the outer class without breaking
|
||||
// encapsulation. Thus a parameter is inserted into the constructor that passes a
|
||||
// reference to the enclosing class. In this case we can't do anything with
|
||||
// it so just ignore it as it'll be supplied at runtime anyway on invocation
|
||||
val name = param.value.name ?: return@forEach
|
||||
|
||||
val propertyReader = if (name in classProperties) {
|
||||
if (classProperties[name]!!.getter != null) {
|
||||
|
@ -1,5 +1,8 @@
|
||||
package net.corda.nodeapi.internal.serialization.amqp;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import net.corda.core.contracts.ContractState;
|
||||
import net.corda.core.identity.AbstractParty;
|
||||
import net.corda.nodeapi.internal.serialization.AllWhitelist;
|
||||
import net.corda.core.serialization.SerializedBytes;
|
||||
import org.apache.qpid.proton.codec.DecoderImpl;
|
||||
@ -9,6 +12,7 @@ import org.junit.Test;
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.NotSerializableException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
@ -237,4 +241,24 @@ public class JavaSerializationOutputTests {
|
||||
BoxedFooNotNull obj = new BoxedFooNotNull("Hello World!", 123);
|
||||
serdes(obj);
|
||||
}
|
||||
|
||||
protected class DummyState implements ContractState {
|
||||
@Override
|
||||
public List<AbstractParty> getParticipants() {
|
||||
return ImmutableList.of();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dummyStateSerialize() throws NotSerializableException {
|
||||
SerializerFactory factory1 = new SerializerFactory(
|
||||
AllWhitelist.INSTANCE,
|
||||
ClassLoader.getSystemClassLoader(),
|
||||
new EvolutionSerializerGetter(),
|
||||
new SerializerFingerPrinter());
|
||||
|
||||
SerializationOutput serializer = new SerializationOutput(factory1);
|
||||
|
||||
serializer.serialize(new DummyState());
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.internal.MockCordappConfigProvider
|
||||
import net.corda.testing.services.MockAttachmentStorage
|
||||
import org.junit.Assert.*
|
||||
import org.junit.Rule
|
||||
@ -58,7 +59,7 @@ class AttachmentsClassLoaderStaticContractTests {
|
||||
}
|
||||
|
||||
private val serviceHub = rigorousMock<ServicesForResolution>().also {
|
||||
doReturn(CordappProviderImpl(CordappLoader.createWithTestPackages(listOf("net.corda.nodeapi.internal")), MockAttachmentStorage())).whenever(it).cordappProvider
|
||||
doReturn(CordappProviderImpl(CordappLoader.createWithTestPackages(listOf("net.corda.nodeapi.internal")), MockCordappConfigProvider(), MockAttachmentStorage())).whenever(it).cordappProvider
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -23,6 +23,7 @@ import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.kryoSpecific
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.internal.MockCordappConfigProvider
|
||||
import net.corda.testing.services.MockAttachmentStorage
|
||||
import org.apache.commons.io.IOUtils
|
||||
import org.junit.Assert.*
|
||||
@ -57,7 +58,7 @@ class AttachmentsClassLoaderTests {
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
private val attachments = MockAttachmentStorage()
|
||||
private val cordappProvider = CordappProviderImpl(CordappLoader.createDevMode(listOf(ISOLATED_CONTRACTS_JAR_PATH)), attachments)
|
||||
private val cordappProvider = CordappProviderImpl(CordappLoader.createDevMode(listOf(ISOLATED_CONTRACTS_JAR_PATH)), MockCordappConfigProvider(), attachments)
|
||||
private val cordapp get() = cordappProvider.cordapps.first()
|
||||
private val attachmentId get() = cordappProvider.getCordappAttachmentId(cordapp)!!
|
||||
private val appContext get() = cordappProvider.getAppContext(cordapp)
|
||||
|
@ -25,7 +25,7 @@ class ContractAttachmentSerializerTest {
|
||||
private lateinit var factory: SerializationFactory
|
||||
private lateinit var context: SerializationContext
|
||||
private lateinit var contextWithToken: SerializationContext
|
||||
private val mockServices = MockServices(emptyList(), rigorousMock(), CordaX500Name("MegaCorp", "London", "GB"))
|
||||
private val mockServices = MockServices(emptyList(), CordaX500Name("MegaCorp", "London", "GB"), rigorousMock())
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
|
Reference in New Issue
Block a user