mirror of
https://github.com/corda/corda.git
synced 2025-05-29 13:44:25 +00:00
Merge remote-tracking branch 'open/master' into andrius/merge-06-21-2
This commit is contained in:
commit
a9cdb576e6
@ -408,6 +408,7 @@ bintrayConfig {
|
|||||||
'corda-confidential-identities',
|
'corda-confidential-identities',
|
||||||
'corda-launcher',
|
'corda-launcher',
|
||||||
'corda-shell',
|
'corda-shell',
|
||||||
|
'corda-shell-cli',
|
||||||
'corda-serialization',
|
'corda-serialization',
|
||||||
'corda-serialization-deterministic',
|
'corda-serialization-deterministic',
|
||||||
'corda-tools-blob-inspector',
|
'corda-tools-blob-inspector',
|
||||||
|
@ -16,7 +16,7 @@ import net.corda.core.identity.CordaX500Name
|
|||||||
import net.corda.core.internal.VisibleForTesting
|
import net.corda.core.internal.VisibleForTesting
|
||||||
import net.corda.core.node.NodeInfo
|
import net.corda.core.node.NodeInfo
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
|
||||||
@ -34,7 +34,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BA
|
|||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.MDC
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
import java.security.KeyStore
|
import java.security.KeyStore
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
import java.util.concurrent.locks.ReentrantLock
|
||||||
@ -88,9 +88,28 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
|||||||
private val maxMessageSize: Int) {
|
private val maxMessageSize: Int) {
|
||||||
companion object {
|
companion object {
|
||||||
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
|
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
|
||||||
|
private val log = contextLogger()
|
||||||
}
|
}
|
||||||
|
|
||||||
private val log = LoggerFactory.getLogger("$bridgeName:${legalNames.first()}")
|
private fun withMDC(block: () -> Unit) {
|
||||||
|
MDC.put("queueName", queueName)
|
||||||
|
MDC.put("target", target.toString())
|
||||||
|
MDC.put("bridgeName", bridgeName)
|
||||||
|
MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() })
|
||||||
|
MDC.put("maxMessageSize", maxMessageSize.toString())
|
||||||
|
block()
|
||||||
|
MDC.clear()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun logDebugWithMDC(msg: () -> String) {
|
||||||
|
if (log.isDebugEnabled) {
|
||||||
|
withMDC { log.debug(msg()) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
|
||||||
|
|
||||||
|
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
|
||||||
|
|
||||||
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail,
|
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail,
|
||||||
sharedThreadPool = sharedEventGroup, socksProxyConfig = socksProxyConfig, maxMessageSize = maxMessageSize)
|
sharedThreadPool = sharedEventGroup, socksProxyConfig = socksProxyConfig, maxMessageSize = maxMessageSize)
|
||||||
@ -101,13 +120,13 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
|||||||
private var connectedSubscription: Subscription? = null
|
private var connectedSubscription: Subscription? = null
|
||||||
|
|
||||||
fun start() {
|
fun start() {
|
||||||
log.info("Create new AMQP bridge")
|
logInfoWithMDC("Create new AMQP bridge")
|
||||||
connectedSubscription = amqpClient.onConnection.subscribe({ x -> onSocketConnected(x.connected) })
|
connectedSubscription = amqpClient.onConnection.subscribe({ x -> onSocketConnected(x.connected) })
|
||||||
amqpClient.start()
|
amqpClient.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
fun stop() {
|
fun stop() {
|
||||||
log.info("Stopping AMQP bridge")
|
logInfoWithMDC("Stopping AMQP bridge")
|
||||||
lock.withLock {
|
lock.withLock {
|
||||||
synchronized(artemis) {
|
synchronized(artemis) {
|
||||||
consumer?.apply {
|
consumer?.apply {
|
||||||
@ -133,7 +152,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
|||||||
lock.withLock {
|
lock.withLock {
|
||||||
synchronized(artemis) {
|
synchronized(artemis) {
|
||||||
if (connected) {
|
if (connected) {
|
||||||
log.info("Bridge Connected")
|
logInfoWithMDC("Bridge Connected")
|
||||||
val sessionFactory = artemis.started!!.sessionFactory
|
val sessionFactory = artemis.started!!.sessionFactory
|
||||||
val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||||
this.session = session
|
this.session = session
|
||||||
@ -142,7 +161,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
|||||||
consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler)
|
consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler)
|
||||||
session.start()
|
session.start()
|
||||||
} else {
|
} else {
|
||||||
log.info("Bridge Disconnected")
|
logInfoWithMDC("Bridge Disconnected")
|
||||||
consumer?.apply {
|
consumer?.apply {
|
||||||
if (!isClosed) {
|
if (!isClosed) {
|
||||||
close()
|
close()
|
||||||
@ -162,7 +181,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
|||||||
|
|
||||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||||
if (artemisMessage.bodySize > maxMessageSize) {
|
if (artemisMessage.bodySize > maxMessageSize) {
|
||||||
log.warn("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " +
|
logWarnWithMDC("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " +
|
||||||
"dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
"dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||||
// Ack the message to prevent same message being sent to us again.
|
// Ack the message to prevent same message being sent to us again.
|
||||||
artemisMessage.acknowledge()
|
artemisMessage.acknowledge()
|
||||||
@ -179,18 +198,18 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
|||||||
properties[key] = value
|
properties[key] = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
logDebugWithMDC { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
||||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||||
val sendableMessage = amqpClient.createMessage(data, peerInbox,
|
val sendableMessage = amqpClient.createMessage(data, peerInbox,
|
||||||
legalNames.first().toString(),
|
legalNames.first().toString(),
|
||||||
properties)
|
properties)
|
||||||
sendableMessage.onComplete.then {
|
sendableMessage.onComplete.then {
|
||||||
log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" }
|
logDebugWithMDC { "Bridge ACK ${sendableMessage.onComplete.get()}" }
|
||||||
lock.withLock {
|
lock.withLock {
|
||||||
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
|
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
|
||||||
artemisMessage.acknowledge()
|
artemisMessage.acknowledge()
|
||||||
} else {
|
} else {
|
||||||
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
logInfoWithMDC("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||||
// We need to commit any acknowledged messages before rolling back the failed
|
// We need to commit any acknowledged messages before rolling back the failed
|
||||||
// (unacknowledged) message.
|
// (unacknowledged) message.
|
||||||
session?.commit()
|
session?.commit()
|
||||||
|
@ -16,7 +16,7 @@ import io.netty.buffer.Unpooled
|
|||||||
import io.netty.channel.Channel
|
import io.netty.channel.Channel
|
||||||
import io.netty.channel.ChannelHandlerContext
|
import io.netty.channel.ChannelHandlerContext
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||||
@ -33,7 +33,7 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode
|
|||||||
import org.apache.qpid.proton.engine.*
|
import org.apache.qpid.proton.engine.*
|
||||||
import org.apache.qpid.proton.message.Message
|
import org.apache.qpid.proton.message.Message
|
||||||
import org.apache.qpid.proton.message.ProtonJMessage
|
import org.apache.qpid.proton.message.ProtonJMessage
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.MDC
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.util.*
|
import java.util.*
|
||||||
@ -45,7 +45,7 @@ import java.util.*
|
|||||||
* but this threading lock is managed by the EventProcessor class that calls this.
|
* but this threading lock is managed by the EventProcessor class that calls this.
|
||||||
* It ultimately posts application packets to/from from the netty transport pipeline.
|
* It ultimately posts application packets to/from from the netty transport pipeline.
|
||||||
*/
|
*/
|
||||||
internal class ConnectionStateMachine(serverMode: Boolean,
|
internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||||
collector: Collector,
|
collector: Collector,
|
||||||
private val localLegalName: String,
|
private val localLegalName: String,
|
||||||
private val remoteLegalName: String,
|
private val remoteLegalName: String,
|
||||||
@ -53,10 +53,28 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
password: String?) : BaseHandler() {
|
password: String?) : BaseHandler() {
|
||||||
companion object {
|
companion object {
|
||||||
private const val IDLE_TIMEOUT = 10000
|
private const val IDLE_TIMEOUT = 10000
|
||||||
|
private val log = contextLogger()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun withMDC(block: () -> Unit) {
|
||||||
|
MDC.put("serverMode", serverMode.toString())
|
||||||
|
MDC.put("localLegalName", localLegalName)
|
||||||
|
MDC.put("remoteLegalName", remoteLegalName)
|
||||||
|
block()
|
||||||
|
MDC.clear()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun logDebugWithMDC(msg: () -> String) {
|
||||||
|
if (log.isDebugEnabled) {
|
||||||
|
withMDC { log.debug(msg()) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
|
||||||
|
|
||||||
|
private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) }
|
||||||
|
|
||||||
val connection: Connection
|
val connection: Connection
|
||||||
private val log = LoggerFactory.getLogger(localLegalName)
|
|
||||||
private val transport: Transport
|
private val transport: Transport
|
||||||
private val id = UUID.randomUUID().toString()
|
private val id = UUID.randomUUID().toString()
|
||||||
private var session: Session? = null
|
private var session: Session? = null
|
||||||
@ -103,12 +121,12 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
|
|
||||||
override fun onConnectionInit(event: Event) {
|
override fun onConnectionInit(event: Event) {
|
||||||
val connection = event.connection
|
val connection = event.connection
|
||||||
log.debug { "Connection init $connection" }
|
logDebugWithMDC { "Connection init $connection" }
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onConnectionLocalOpen(event: Event) {
|
override fun onConnectionLocalOpen(event: Event) {
|
||||||
val connection = event.connection
|
val connection = event.connection
|
||||||
log.info("Connection local open $connection")
|
logInfoWithMDC("Connection local open $connection")
|
||||||
val session = connection.session()
|
val session = connection.session()
|
||||||
session.open()
|
session.open()
|
||||||
this.session = session
|
this.session = session
|
||||||
@ -119,7 +137,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
|
|
||||||
override fun onConnectionLocalClose(event: Event) {
|
override fun onConnectionLocalClose(event: Event) {
|
||||||
val connection = event.connection
|
val connection = event.connection
|
||||||
log.info("Connection local close $connection")
|
logInfoWithMDC("Connection local close $connection")
|
||||||
connection.close()
|
connection.close()
|
||||||
connection.free()
|
connection.free()
|
||||||
}
|
}
|
||||||
@ -137,7 +155,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
|
|
||||||
override fun onConnectionFinal(event: Event) {
|
override fun onConnectionFinal(event: Event) {
|
||||||
val connection = event.connection
|
val connection = event.connection
|
||||||
log.debug { "Connection final $connection" }
|
logDebugWithMDC { "Connection final $connection" }
|
||||||
if (connection == this.connection) {
|
if (connection == this.connection) {
|
||||||
this.connection.context = null
|
this.connection.context = null
|
||||||
for (queue in messageQueues.values) {
|
for (queue in messageQueues.values) {
|
||||||
@ -177,21 +195,21 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
|
|
||||||
override fun onTransportHeadClosed(event: Event) {
|
override fun onTransportHeadClosed(event: Event) {
|
||||||
val transport = event.transport
|
val transport = event.transport
|
||||||
log.debug { "Transport Head Closed $transport" }
|
logDebugWithMDC { "Transport Head Closed $transport" }
|
||||||
transport.close_tail()
|
transport.close_tail()
|
||||||
onTransportInternal(transport)
|
onTransportInternal(transport)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onTransportTailClosed(event: Event) {
|
override fun onTransportTailClosed(event: Event) {
|
||||||
val transport = event.transport
|
val transport = event.transport
|
||||||
log.debug { "Transport Tail Closed $transport" }
|
logDebugWithMDC { "Transport Tail Closed $transport" }
|
||||||
transport.close_head()
|
transport.close_head()
|
||||||
onTransportInternal(transport)
|
onTransportInternal(transport)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onTransportClosed(event: Event) {
|
override fun onTransportClosed(event: Event) {
|
||||||
val transport = event.transport
|
val transport = event.transport
|
||||||
log.debug { "Transport Closed $transport" }
|
logDebugWithMDC { "Transport Closed $transport" }
|
||||||
if (transport == this.transport) {
|
if (transport == this.transport) {
|
||||||
transport.unbind()
|
transport.unbind()
|
||||||
transport.free()
|
transport.free()
|
||||||
@ -201,19 +219,19 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
|
|
||||||
override fun onTransportError(event: Event) {
|
override fun onTransportError(event: Event) {
|
||||||
val transport = event.transport
|
val transport = event.transport
|
||||||
log.info("Transport Error $transport")
|
logInfoWithMDC("Transport Error $transport")
|
||||||
val condition = event.transport.condition
|
val condition = event.transport.condition
|
||||||
if (condition != null) {
|
if (condition != null) {
|
||||||
log.info("Error: ${condition.description}")
|
logInfoWithMDC("Error: ${condition.description}")
|
||||||
} else {
|
} else {
|
||||||
log.info("Error (no description returned).")
|
logInfoWithMDC("Error (no description returned).")
|
||||||
}
|
}
|
||||||
onTransportInternal(transport)
|
onTransportInternal(transport)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onTransport(event: Event) {
|
override fun onTransport(event: Event) {
|
||||||
val transport = event.transport
|
val transport = event.transport
|
||||||
log.debug { "Transport $transport" }
|
logDebugWithMDC { "Transport $transport" }
|
||||||
onTransportInternal(transport)
|
onTransportInternal(transport)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -230,12 +248,12 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
|
|
||||||
override fun onSessionInit(event: Event) {
|
override fun onSessionInit(event: Event) {
|
||||||
val session = event.session
|
val session = event.session
|
||||||
log.debug { "Session init $session" }
|
logDebugWithMDC { "Session init $session" }
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onSessionLocalOpen(event: Event) {
|
override fun onSessionLocalOpen(event: Event) {
|
||||||
val session = event.session
|
val session = event.session
|
||||||
log.debug { "Session local open $session" }
|
logDebugWithMDC { "Session local open $session" }
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun getSender(target: String): Sender {
|
private fun getSender(target: String): Sender {
|
||||||
@ -261,14 +279,14 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
|
|
||||||
override fun onSessionLocalClose(event: Event) {
|
override fun onSessionLocalClose(event: Event) {
|
||||||
val session = event.session
|
val session = event.session
|
||||||
log.debug { "Session local close $session" }
|
logDebugWithMDC { "Session local close $session" }
|
||||||
session.close()
|
session.close()
|
||||||
session.free()
|
session.free()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onSessionFinal(event: Event) {
|
override fun onSessionFinal(event: Event) {
|
||||||
val session = event.session
|
val session = event.session
|
||||||
log.debug { "Session final $session" }
|
logDebugWithMDC { "Session final $session" }
|
||||||
if (session == this.session) {
|
if (session == this.session) {
|
||||||
this.session = null
|
this.session = null
|
||||||
}
|
}
|
||||||
@ -277,12 +295,12 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
override fun onLinkLocalOpen(event: Event) {
|
override fun onLinkLocalOpen(event: Event) {
|
||||||
val link = event.link
|
val link = event.link
|
||||||
if (link is Sender) {
|
if (link is Sender) {
|
||||||
log.debug { "Sender Link local open ${link.name} ${link.source} ${link.target}" }
|
logDebugWithMDC { "Sender Link local open ${link.name} ${link.source} ${link.target}" }
|
||||||
senders[link.target.address] = link
|
senders[link.target.address] = link
|
||||||
transmitMessages(link)
|
transmitMessages(link)
|
||||||
}
|
}
|
||||||
if (link is Receiver) {
|
if (link is Receiver) {
|
||||||
log.debug { "Receiver Link local open ${link.name} ${link.source} ${link.target}" }
|
logDebugWithMDC { "Receiver Link local open ${link.name} ${link.source} ${link.target}" }
|
||||||
receivers[link.target.address] = link
|
receivers[link.target.address] = link
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -291,7 +309,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
val link = event.link
|
val link = event.link
|
||||||
if (link is Receiver) {
|
if (link is Receiver) {
|
||||||
if (link.remoteTarget is Coordinator) {
|
if (link.remoteTarget is Coordinator) {
|
||||||
log.debug { "Coordinator link received" }
|
logDebugWithMDC { "Coordinator link received" }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -299,11 +317,11 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
override fun onLinkFinal(event: Event) {
|
override fun onLinkFinal(event: Event) {
|
||||||
val link = event.link
|
val link = event.link
|
||||||
if (link is Sender) {
|
if (link is Sender) {
|
||||||
log.debug { "Sender Link final ${link.name} ${link.source} ${link.target}" }
|
logDebugWithMDC { "Sender Link final ${link.name} ${link.source} ${link.target}" }
|
||||||
senders.remove(link.target.address)
|
senders.remove(link.target.address)
|
||||||
}
|
}
|
||||||
if (link is Receiver) {
|
if (link is Receiver) {
|
||||||
log.debug { "Receiver Link final ${link.name} ${link.source} ${link.target}" }
|
logDebugWithMDC { "Receiver Link final ${link.name} ${link.source} ${link.target}" }
|
||||||
receivers.remove(link.target.address)
|
receivers.remove(link.target.address)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -311,12 +329,12 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
override fun onLinkFlow(event: Event) {
|
override fun onLinkFlow(event: Event) {
|
||||||
val link = event.link
|
val link = event.link
|
||||||
if (link is Sender) {
|
if (link is Sender) {
|
||||||
log.debug { "Sender Flow event: ${link.name} ${link.source} ${link.target}" }
|
logDebugWithMDC { "Sender Flow event: ${link.name} ${link.source} ${link.target}" }
|
||||||
if (senders.containsKey(link.target.address)) {
|
if (senders.containsKey(link.target.address)) {
|
||||||
transmitMessages(link)
|
transmitMessages(link)
|
||||||
}
|
}
|
||||||
} else if (link is Receiver) {
|
} else if (link is Receiver) {
|
||||||
log.debug { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" }
|
logDebugWithMDC { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -327,7 +345,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
private fun transmitMessages(sender: Sender) {
|
private fun transmitMessages(sender: Sender) {
|
||||||
val messageQueue = messageQueues.getOrPut(sender.target.address, { LinkedList() })
|
val messageQueue = messageQueues.getOrPut(sender.target.address, { LinkedList() })
|
||||||
while (sender.credit > 0) {
|
while (sender.credit > 0) {
|
||||||
log.debug { "Sender credit: ${sender.credit}" }
|
logDebugWithMDC { "Sender credit: ${sender.credit}" }
|
||||||
val nextMessage = messageQueue.poll()
|
val nextMessage = messageQueue.poll()
|
||||||
if (nextMessage != null) {
|
if (nextMessage != null) {
|
||||||
try {
|
try {
|
||||||
@ -338,7 +356,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
delivery.context = nextMessage
|
delivery.context = nextMessage
|
||||||
sender.send(messageBuf.array(), messageBuf.arrayOffset() + messageBuf.readerIndex(), messageBuf.readableBytes())
|
sender.send(messageBuf.array(), messageBuf.arrayOffset() + messageBuf.readerIndex(), messageBuf.readableBytes())
|
||||||
nextMessage.status = MessageStatus.Sent
|
nextMessage.status = MessageStatus.Sent
|
||||||
log.debug { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" }
|
logDebugWithMDC { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" }
|
||||||
unackedQueue.offer(nextMessage)
|
unackedQueue.offer(nextMessage)
|
||||||
sender.advance()
|
sender.advance()
|
||||||
} finally {
|
} finally {
|
||||||
@ -352,7 +370,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
|
|
||||||
override fun onDelivery(event: Event) {
|
override fun onDelivery(event: Event) {
|
||||||
val delivery = event.delivery
|
val delivery = event.delivery
|
||||||
log.debug { "Delivery $delivery" }
|
logDebugWithMDC { "Delivery $delivery" }
|
||||||
val link = delivery.link
|
val link = delivery.link
|
||||||
if (link is Receiver) {
|
if (link is Receiver) {
|
||||||
if (delivery.isReadable && !delivery.isPartial) {
|
if (delivery.isReadable && !delivery.isPartial) {
|
||||||
@ -376,7 +394,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
appProperties,
|
appProperties,
|
||||||
channel,
|
channel,
|
||||||
delivery)
|
delivery)
|
||||||
log.debug { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" }
|
logDebugWithMDC { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" }
|
||||||
channel.writeAndFlush(receivedMessage)
|
channel.writeAndFlush(receivedMessage)
|
||||||
if (link.current() == delivery) {
|
if (link.current() == delivery) {
|
||||||
link.advance()
|
link.advance()
|
||||||
@ -387,7 +405,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (link is Sender) {
|
} else if (link is Sender) {
|
||||||
log.debug { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" }
|
logDebugWithMDC { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" }
|
||||||
val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance()
|
val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance()
|
||||||
val sourceMessage = delivery.context as? SendableMessageImpl
|
val sourceMessage = delivery.context as? SendableMessageImpl
|
||||||
unackedQueue.remove(sourceMessage)
|
unackedQueue.remove(sourceMessage)
|
||||||
@ -405,7 +423,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
buffer.readBytes(bytes)
|
buffer.readBytes(bytes)
|
||||||
return Unpooled.wrappedBuffer(bytes)
|
return Unpooled.wrappedBuffer(bytes)
|
||||||
} catch (ex: Exception) {
|
} catch (ex: Exception) {
|
||||||
log.error("Unable to encode message as AMQP packet", ex)
|
logErrorWithMDC("Unable to encode message as AMQP packet", ex)
|
||||||
throw ex
|
throw ex
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -13,7 +13,7 @@ package net.corda.nodeapi.internal.protonwrapper.engine
|
|||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
import io.netty.channel.Channel
|
import io.netty.channel.Channel
|
||||||
import io.netty.channel.ChannelHandlerContext
|
import io.netty.channel.ChannelHandlerContext
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||||
@ -26,7 +26,7 @@ import org.apache.qpid.proton.engine.*
|
|||||||
import org.apache.qpid.proton.engine.impl.CollectorImpl
|
import org.apache.qpid.proton.engine.impl.CollectorImpl
|
||||||
import org.apache.qpid.proton.reactor.FlowController
|
import org.apache.qpid.proton.reactor.FlowController
|
||||||
import org.apache.qpid.proton.reactor.Handshaker
|
import org.apache.qpid.proton.reactor.Handshaker
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.MDC
|
||||||
import java.util.concurrent.ScheduledExecutorService
|
import java.util.concurrent.ScheduledExecutorService
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
import java.util.concurrent.locks.ReentrantLock
|
||||||
@ -40,16 +40,30 @@ import kotlin.concurrent.withLock
|
|||||||
* Everything here is single threaded, because the proton-j library has to be run that way.
|
* Everything here is single threaded, because the proton-j library has to be run that way.
|
||||||
*/
|
*/
|
||||||
internal class EventProcessor(channel: Channel,
|
internal class EventProcessor(channel: Channel,
|
||||||
serverMode: Boolean,
|
private val serverMode: Boolean,
|
||||||
localLegalName: String,
|
private val localLegalName: String,
|
||||||
remoteLegalName: String,
|
private val remoteLegalName: String,
|
||||||
userName: String?,
|
userName: String?,
|
||||||
password: String?) : BaseHandler() {
|
password: String?) : BaseHandler() {
|
||||||
companion object {
|
companion object {
|
||||||
private const val FLOW_WINDOW_SIZE = 10
|
private const val FLOW_WINDOW_SIZE = 10
|
||||||
|
private val log = contextLogger()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun withMDC(block: () -> Unit) {
|
||||||
|
MDC.put("serverMode", serverMode.toString())
|
||||||
|
MDC.put("localLegalName", localLegalName)
|
||||||
|
MDC.put("remoteLegalName", remoteLegalName)
|
||||||
|
block()
|
||||||
|
MDC.clear()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun logDebugWithMDC(msg: () -> String) {
|
||||||
|
if (log.isDebugEnabled) {
|
||||||
|
withMDC { log.debug(msg()) }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private val log = LoggerFactory.getLogger(localLegalName)
|
|
||||||
private val lock = ReentrantLock()
|
private val lock = ReentrantLock()
|
||||||
private var pendingExecute: Boolean = false
|
private var pendingExecute: Boolean = false
|
||||||
private val executor: ScheduledExecutorService = channel.eventLoop()
|
private val executor: ScheduledExecutorService = channel.eventLoop()
|
||||||
@ -104,16 +118,16 @@ internal class EventProcessor(channel: Channel,
|
|||||||
fun processEvents() {
|
fun processEvents() {
|
||||||
lock.withLock {
|
lock.withLock {
|
||||||
pendingExecute = false
|
pendingExecute = false
|
||||||
log.debug { "Process Events" }
|
logDebugWithMDC { "Process Events" }
|
||||||
while (true) {
|
while (true) {
|
||||||
val ev = popEvent() ?: break
|
val ev = popEvent() ?: break
|
||||||
log.debug { "Process event: $ev" }
|
logDebugWithMDC { "Process event: $ev" }
|
||||||
for (handler in handlers) {
|
for (handler in handlers) {
|
||||||
handler.handle(ev)
|
handler.handle(ev)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stateMachine.processTransport()
|
stateMachine.processTransport()
|
||||||
log.debug { "Process Events Done" }
|
logDebugWithMDC { "Process Events Done" }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ import io.netty.handler.ssl.SslHandler
|
|||||||
import io.netty.handler.ssl.SslHandshakeCompletionEvent
|
import io.netty.handler.ssl.SslHandshakeCompletionEvent
|
||||||
import io.netty.util.ReferenceCountUtil
|
import io.netty.util.ReferenceCountUtil
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.nodeapi.internal.crypto.x509
|
import net.corda.nodeapi.internal.crypto.x509
|
||||||
import net.corda.nodeapi.internal.protonwrapper.engine.EventProcessor
|
import net.corda.nodeapi.internal.protonwrapper.engine.EventProcessor
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||||
@ -31,7 +31,7 @@ import org.apache.qpid.proton.engine.ProtonJTransport
|
|||||||
import org.apache.qpid.proton.engine.Transport
|
import org.apache.qpid.proton.engine.Transport
|
||||||
import org.apache.qpid.proton.engine.impl.ProtocolTracer
|
import org.apache.qpid.proton.engine.impl.ProtocolTracer
|
||||||
import org.apache.qpid.proton.framing.TransportFrame
|
import org.apache.qpid.proton.framing.TransportFrame
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.MDC
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.nio.channels.ClosedChannelException
|
import java.nio.channels.ClosedChannelException
|
||||||
import java.security.cert.X509Certificate
|
import java.security.cert.X509Certificate
|
||||||
@ -49,7 +49,10 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
|||||||
private val onOpen: (Pair<SocketChannel, ConnectionChange>) -> Unit,
|
private val onOpen: (Pair<SocketChannel, ConnectionChange>) -> Unit,
|
||||||
private val onClose: (Pair<SocketChannel, ConnectionChange>) -> Unit,
|
private val onClose: (Pair<SocketChannel, ConnectionChange>) -> Unit,
|
||||||
private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() {
|
private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() {
|
||||||
private val log = LoggerFactory.getLogger(allowedRemoteLegalNames?.firstOrNull()?.toString() ?: "AMQPChannelHandler")
|
companion object {
|
||||||
|
private val log = contextLogger()
|
||||||
|
}
|
||||||
|
|
||||||
private lateinit var remoteAddress: InetSocketAddress
|
private lateinit var remoteAddress: InetSocketAddress
|
||||||
private var localCert: X509Certificate? = null
|
private var localCert: X509Certificate? = null
|
||||||
private var remoteCert: X509Certificate? = null
|
private var remoteCert: X509Certificate? = null
|
||||||
@ -57,11 +60,34 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
|||||||
private var suppressClose: Boolean = false
|
private var suppressClose: Boolean = false
|
||||||
private var badCert: Boolean = false
|
private var badCert: Boolean = false
|
||||||
|
|
||||||
|
private fun withMDC(block: () -> Unit) {
|
||||||
|
MDC.put("serverMode", serverMode.toString())
|
||||||
|
MDC.put("remoteAddress", remoteAddress.toString())
|
||||||
|
MDC.put("localCert", localCert?.subjectDN?.toString())
|
||||||
|
MDC.put("remoteCert", remoteCert?.subjectDN?.toString())
|
||||||
|
MDC.put("allowedRemoteLegalNames", allowedRemoteLegalNames?.joinToString(separator = ";") { it.toString() })
|
||||||
|
block()
|
||||||
|
MDC.clear()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun logDebugWithMDC(msg: () -> String) {
|
||||||
|
if (log.isDebugEnabled) {
|
||||||
|
withMDC { log.debug(msg()) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
|
||||||
|
|
||||||
|
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
|
||||||
|
|
||||||
|
private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) }
|
||||||
|
|
||||||
|
|
||||||
override fun channelActive(ctx: ChannelHandlerContext) {
|
override fun channelActive(ctx: ChannelHandlerContext) {
|
||||||
val ch = ctx.channel()
|
val ch = ctx.channel()
|
||||||
remoteAddress = ch.remoteAddress() as InetSocketAddress
|
remoteAddress = ch.remoteAddress() as InetSocketAddress
|
||||||
val localAddress = ch.localAddress() as InetSocketAddress
|
val localAddress = ch.localAddress() as InetSocketAddress
|
||||||
log.info("New client connection ${ch.id()} from $remoteAddress to $localAddress")
|
logInfoWithMDC("New client connection ${ch.id()} from $remoteAddress to $localAddress")
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun createAMQPEngine(ctx: ChannelHandlerContext) {
|
private fun createAMQPEngine(ctx: ChannelHandlerContext) {
|
||||||
@ -72,11 +98,11 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
|||||||
if (trace) {
|
if (trace) {
|
||||||
transport.protocolTracer = object : ProtocolTracer {
|
transport.protocolTracer = object : ProtocolTracer {
|
||||||
override fun sentFrame(transportFrame: TransportFrame) {
|
override fun sentFrame(transportFrame: TransportFrame) {
|
||||||
log.info("${transportFrame.body}")
|
logInfoWithMDC("${transportFrame.body}")
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun receivedFrame(transportFrame: TransportFrame) {
|
override fun receivedFrame(transportFrame: TransportFrame) {
|
||||||
log.info("${transportFrame.body}")
|
logInfoWithMDC("${transportFrame.body}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -86,7 +112,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
|||||||
|
|
||||||
override fun channelInactive(ctx: ChannelHandlerContext) {
|
override fun channelInactive(ctx: ChannelHandlerContext) {
|
||||||
val ch = ctx.channel()
|
val ch = ctx.channel()
|
||||||
log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
|
logInfoWithMDC("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
|
||||||
if (!suppressClose) {
|
if (!suppressClose) {
|
||||||
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false, badCert)))
|
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false, badCert)))
|
||||||
}
|
}
|
||||||
@ -107,29 +133,29 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
|||||||
CordaX500Name.build(remoteCert!!.subjectX500Principal)
|
CordaX500Name.build(remoteCert!!.subjectX500Principal)
|
||||||
} catch (ex: IllegalArgumentException) {
|
} catch (ex: IllegalArgumentException) {
|
||||||
badCert = true
|
badCert = true
|
||||||
log.error("Certificate subject not a valid CordaX500Name", ex)
|
logErrorWithMDC("Certificate subject not a valid CordaX500Name", ex)
|
||||||
ctx.close()
|
ctx.close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) {
|
if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) {
|
||||||
badCert = true
|
badCert = true
|
||||||
log.error("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames")
|
logErrorWithMDC("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames")
|
||||||
ctx.close()
|
ctx.close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.info("Handshake completed with subject: $remoteX500Name")
|
logInfoWithMDC("Handshake completed with subject: $remoteX500Name")
|
||||||
createAMQPEngine(ctx)
|
createAMQPEngine(ctx)
|
||||||
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false)))
|
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false)))
|
||||||
} else {
|
} else {
|
||||||
// This happens when the peer node is closed during SSL establishment.
|
// This happens when the peer node is closed during SSL establishment.
|
||||||
if (evt.cause() is ClosedChannelException) {
|
if (evt.cause() is ClosedChannelException) {
|
||||||
log.warn("SSL Handshake closed early.")
|
logWarnWithMDC("SSL Handshake closed early.")
|
||||||
} else {
|
} else {
|
||||||
badCert = true
|
badCert = true
|
||||||
}
|
}
|
||||||
log.error("Handshake failure ${evt.cause().message}")
|
logErrorWithMDC("Handshake failure ${evt.cause().message}")
|
||||||
if (log.isTraceEnabled) {
|
if (log.isTraceEnabled) {
|
||||||
log.trace("Handshake failure", evt.cause())
|
withMDC { log.trace("Handshake failure", evt.cause()) }
|
||||||
}
|
}
|
||||||
ctx.close()
|
ctx.close()
|
||||||
}
|
}
|
||||||
@ -138,9 +164,9 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
|||||||
|
|
||||||
@Suppress("OverridingDeprecatedMember")
|
@Suppress("OverridingDeprecatedMember")
|
||||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||||
log.warn("Closing channel due to nonrecoverable exception ${cause.message}")
|
logWarnWithMDC("Closing channel due to nonrecoverable exception ${cause.message}")
|
||||||
if (log.isTraceEnabled) {
|
if (log.isTraceEnabled) {
|
||||||
log.trace("Pipeline uncaught exception", cause)
|
withMDC { log.trace("Pipeline uncaught exception", cause) }
|
||||||
}
|
}
|
||||||
if (cause is ProxyConnectException) {
|
if (cause is ProxyConnectException) {
|
||||||
log.warn("Proxy connection failed ${cause.message}")
|
log.warn("Proxy connection failed ${cause.message}")
|
||||||
@ -167,13 +193,12 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
|||||||
// Transfers application packet into the AMQP engine.
|
// Transfers application packet into the AMQP engine.
|
||||||
is SendableMessageImpl -> {
|
is SendableMessageImpl -> {
|
||||||
val inetAddress = InetSocketAddress(msg.destinationLink.host, msg.destinationLink.port)
|
val inetAddress = InetSocketAddress(msg.destinationLink.host, msg.destinationLink.port)
|
||||||
require(inetAddress == remoteAddress) {
|
logDebugWithMDC { "Message for endpoint $inetAddress , expected $remoteAddress "}
|
||||||
"Message for incorrect endpoint $inetAddress expected $remoteAddress"
|
|
||||||
}
|
|
||||||
require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) {
|
require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) {
|
||||||
"Message for incorrect legal identity ${msg.destinationLegalName} expected ${remoteCert!!.subjectX500Principal}"
|
"Message for incorrect legal identity ${msg.destinationLegalName} expected ${remoteCert!!.subjectX500Principal}"
|
||||||
}
|
}
|
||||||
log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" }
|
logDebugWithMDC { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" }
|
||||||
eventProcessor!!.transportWriteMessage(msg)
|
eventProcessor!!.transportWriteMessage(msg)
|
||||||
}
|
}
|
||||||
// A received AMQP packet has been completed and this self-posted packet will be signalled out to the
|
// A received AMQP packet has been completed and this self-posted packet will be signalled out to the
|
||||||
@ -191,7 +216,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (ex: Exception) {
|
} catch (ex: Exception) {
|
||||||
log.error("Error in AMQP write processing", ex)
|
logErrorWithMDC("Error in AMQP write processing", ex)
|
||||||
throw ex
|
throw ex
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -29,6 +29,7 @@ import net.corda.core.node.NotaryInfo
|
|||||||
import net.corda.core.node.services.CordaService
|
import net.corda.core.node.services.CordaService
|
||||||
import net.corda.core.transactions.SignedTransaction
|
import net.corda.core.transactions.SignedTransaction
|
||||||
import net.corda.core.transactions.TransactionBuilder
|
import net.corda.core.transactions.TransactionBuilder
|
||||||
|
import net.corda.core.utilities.ProgressTracker
|
||||||
import net.corda.core.utilities.seconds
|
import net.corda.core.utilities.seconds
|
||||||
import net.corda.node.internal.StartedNode
|
import net.corda.node.internal.StartedNode
|
||||||
import net.corda.node.services.config.FlowTimeoutConfiguration
|
import net.corda.node.services.config.FlowTimeoutConfiguration
|
||||||
@ -55,6 +56,8 @@ import org.junit.rules.RuleChain
|
|||||||
import org.slf4j.MDC
|
import org.slf4j.MDC
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
import kotlin.test.assertNotEquals
|
||||||
|
|
||||||
class TimedFlowTestRule(val clusterSize: Int) : ExternalResource() {
|
class TimedFlowTestRule(val clusterSize: Int) : ExternalResource() {
|
||||||
|
|
||||||
@ -154,8 +157,15 @@ class TimedFlowTests {
|
|||||||
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
||||||
}
|
}
|
||||||
val flow = NotaryFlow.Client(issueTx)
|
val flow = NotaryFlow.Client(issueTx)
|
||||||
|
val progressTracker = flow.progressTracker
|
||||||
|
assertNotEquals(ProgressTracker.DONE, progressTracker.currentStep)
|
||||||
val notarySignatures = services.startFlow(flow).resultFuture.get()
|
val notarySignatures = services.startFlow(flow).resultFuture.get()
|
||||||
(issueTx + notarySignatures).verifyRequiredSignatures()
|
(issueTx + notarySignatures).verifyRequiredSignatures()
|
||||||
|
assertEquals(
|
||||||
|
ProgressTracker.DONE,
|
||||||
|
progressTracker.currentStep,
|
||||||
|
"Ensure the same progress tracker object is re-used after flow restart"
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -167,8 +177,15 @@ class TimedFlowTests {
|
|||||||
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
||||||
}
|
}
|
||||||
val flow = FinalityFlow(issueTx)
|
val flow = FinalityFlow(issueTx)
|
||||||
|
val progressTracker = flow.progressTracker
|
||||||
|
|
||||||
val stx = services.startFlow(flow).resultFuture.get()
|
val stx = services.startFlow(flow).resultFuture.get()
|
||||||
stx.verifyRequiredSignatures()
|
stx.verifyRequiredSignatures()
|
||||||
|
assertEquals(
|
||||||
|
ProgressTracker.DONE,
|
||||||
|
progressTracker.currentStep,
|
||||||
|
"Ensure the same progress tracker object is re-used after flow restart"
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,7 +205,7 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
|
|
||||||
if (!configuration.messagingServerExternal) {
|
if (!configuration.messagingServerExternal) {
|
||||||
val brokerBindAddress = configuration.messagingServerAddress ?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port)
|
val brokerBindAddress = configuration.messagingServerAddress ?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port)
|
||||||
messageBroker = ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize)
|
messageBroker = ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize, info.legalIdentities.map { it.owningKey })
|
||||||
}
|
}
|
||||||
|
|
||||||
val serverAddress = configuration.messagingServerAddress
|
val serverAddress = configuration.messagingServerAddress
|
||||||
|
@ -23,14 +23,18 @@ import net.corda.node.services.config.NodeConfiguration
|
|||||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.p2pAcceptorTcpTransport
|
import net.corda.nodeapi.ArtemisTcpTransport.Companion.p2pAcceptorTcpTransport
|
||||||
import net.corda.nodeapi.internal.AmqpMessageSizeChecksInterceptor
|
import net.corda.nodeapi.internal.AmqpMessageSizeChecksInterceptor
|
||||||
import net.corda.nodeapi.internal.ArtemisMessageSizeChecksInterceptor
|
import net.corda.nodeapi.internal.ArtemisMessageSizeChecksInterceptor
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||||
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
|
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||||
import org.apache.activemq.artemis.core.config.Configuration
|
import org.apache.activemq.artemis.core.config.Configuration
|
||||||
|
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration
|
||||||
|
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
|
||||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
||||||
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
|
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
|
||||||
import org.apache.activemq.artemis.core.security.Role
|
import org.apache.activemq.artemis.core.security.Role
|
||||||
@ -39,6 +43,7 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
|||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
import java.security.KeyStoreException
|
import java.security.KeyStoreException
|
||||||
|
import java.security.PublicKey
|
||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
import javax.security.auth.login.AppConfigurationEntry
|
import javax.security.auth.login.AppConfigurationEntry
|
||||||
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED
|
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED
|
||||||
@ -59,7 +64,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
|
|||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
class ArtemisMessagingServer(private val config: NodeConfiguration,
|
class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||||
private val messagingServerAddress: NetworkHostAndPort,
|
private val messagingServerAddress: NetworkHostAndPort,
|
||||||
private val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() {
|
private val maxMessageSize: Int,
|
||||||
|
private val identities: List<PublicKey> = emptyList()) : ArtemisBroker, SingletonSerializeAsToken() {
|
||||||
companion object {
|
companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
}
|
}
|
||||||
@ -115,29 +121,48 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
|||||||
log.info("P2P messaging server listening on $messagingServerAddress")
|
log.info("P2P messaging server listening on $messagingServerAddress")
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun createArtemisConfig() = SecureArtemisConfiguration().apply {
|
private fun createArtemisConfig(): Configuration {
|
||||||
val artemisDir = config.baseDirectory / "artemis"
|
val addressConfigs = identities.map {
|
||||||
bindingsDirectory = (artemisDir / "bindings").toString()
|
val queueName = ArtemisMessagingComponent.RemoteInboxAddress(it).queueName
|
||||||
journalDirectory = (artemisDir / "journal").toString()
|
log.info("Configuring address $queueName")
|
||||||
largeMessagesDirectory = (artemisDir / "large-messages").toString()
|
val queueConfig = CoreQueueConfiguration().apply {
|
||||||
acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config))
|
address = queueName
|
||||||
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
|
name = queueName
|
||||||
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
|
routingType = RoutingType.ANYCAST
|
||||||
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
|
isExclusive = true
|
||||||
isPersistIDCache = true
|
}
|
||||||
isPopulateValidatedUser = true
|
CoreAddressConfiguration().apply {
|
||||||
journalBufferSize_NIO = maxMessageSize + JOURNAL_HEADER_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
|
name = queueName
|
||||||
journalBufferSize_AIO = maxMessageSize + JOURNAL_HEADER_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
|
queueConfigurations = listOf(queueConfig)
|
||||||
journalFileSize = maxMessageSize + JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB.
|
addRoutingType(RoutingType.ANYCAST)
|
||||||
managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS)
|
}
|
||||||
connectionTtlCheckInterval = config.enterpriseConfiguration.tuning.brokerConnectionTtlCheckIntervalMs
|
|
||||||
// JMX enablement
|
|
||||||
if (config.jmxMonitoringHttpPort != null) {
|
|
||||||
isJMXManagementEnabled = true
|
|
||||||
isJMXUseBrokerName = true
|
|
||||||
}
|
}
|
||||||
|
return SecureArtemisConfiguration().apply {
|
||||||
|
val artemisDir = config.baseDirectory / "artemis"
|
||||||
|
bindingsDirectory = (artemisDir / "bindings").toString()
|
||||||
|
journalDirectory = (artemisDir / "journal").toString()
|
||||||
|
largeMessagesDirectory = (artemisDir / "large-messages").toString()
|
||||||
|
acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config))
|
||||||
|
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
|
||||||
|
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
|
||||||
|
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
|
||||||
|
isPersistIDCache = true
|
||||||
|
isPopulateValidatedUser = true
|
||||||
|
journalBufferSize_NIO = maxMessageSize + JOURNAL_HEADER_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
|
||||||
|
journalBufferSize_AIO = maxMessageSize + JOURNAL_HEADER_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
|
||||||
|
journalFileSize = maxMessageSize + JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB.
|
||||||
|
managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS)
|
||||||
|
connectionTtlCheckInterval = config.enterpriseConfiguration.tuning.brokerConnectionTtlCheckIntervalMs
|
||||||
|
addressConfigurations = addressConfigs
|
||||||
|
|
||||||
}.configureAddressSecurity()
|
// JMX enablement
|
||||||
|
if (config.jmxMonitoringHttpPort != null) {
|
||||||
|
isJMXManagementEnabled = true
|
||||||
|
isJMXUseBrokerName = true
|
||||||
|
}
|
||||||
|
|
||||||
|
}.configureAddressSecurity()
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Authenticated clients connecting to us fall in one of the following groups:
|
* Authenticated clients connecting to us fall in one of the following groups:
|
||||||
|
@ -43,6 +43,7 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.creat
|
|||||||
import net.corda.node.services.statemachine.interceptors.*
|
import net.corda.node.services.statemachine.interceptors.*
|
||||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||||
import net.corda.node.utilities.AffinityExecutor
|
import net.corda.node.utilities.AffinityExecutor
|
||||||
|
import net.corda.node.utilities.injectOldProgressTracker
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
||||||
import net.corda.serialization.internal.SerializeAsTokenContextImpl
|
import net.corda.serialization.internal.SerializeAsTokenContextImpl
|
||||||
@ -371,7 +372,10 @@ class SingleThreadedStateMachineManager(
|
|||||||
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
|
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
|
||||||
sessionToFlow.remove(sessionId)
|
sessionToFlow.remove(sessionId)
|
||||||
}
|
}
|
||||||
if (flow != null) addAndStartFlow(flowId, flow)
|
if (flow != null) {
|
||||||
|
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
|
||||||
|
addAndStartFlow(flowId, flow)
|
||||||
|
}
|
||||||
// Deliver all the external events from the old flow instance.
|
// Deliver all the external events from the old flow instance.
|
||||||
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
|
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
|
||||||
do {
|
do {
|
||||||
|
@ -0,0 +1,22 @@
|
|||||||
|
package net.corda.node.utilities
|
||||||
|
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
|
import net.corda.core.utilities.ProgressTracker
|
||||||
|
import net.corda.node.services.statemachine.StateMachineManagerInternal
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that
|
||||||
|
* any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress
|
||||||
|
* updates are correctly sent out after the flow is retried.
|
||||||
|
*/
|
||||||
|
fun StateMachineManagerInternal.injectOldProgressTracker(oldProgressTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) {
|
||||||
|
if (oldProgressTracker != null) {
|
||||||
|
try {
|
||||||
|
val field = newFlowLogic::class.java.getDeclaredField("progressTracker")
|
||||||
|
field.isAccessible = true
|
||||||
|
field.set(newFlowLogic, oldProgressTracker)
|
||||||
|
} catch (e: NoSuchFieldException) {
|
||||||
|
// The flow does not use a progress tracker.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -63,6 +63,7 @@ include 'tools:bootstrapper'
|
|||||||
include 'tools:blobinspector'
|
include 'tools:blobinspector'
|
||||||
include 'tools:dbmigration'
|
include 'tools:dbmigration'
|
||||||
include 'tools:shell'
|
include 'tools:shell'
|
||||||
|
include 'tools:shell-cli'
|
||||||
include 'tools:network-bootstrapper'
|
include 'tools:network-bootstrapper'
|
||||||
include 'example-code'
|
include 'example-code'
|
||||||
project(':example-code').projectDir = file("$settingsDir/docs/source/example-code")
|
project(':example-code').projectDir = file("$settingsDir/docs/source/example-code")
|
||||||
|
103
tools/shell-cli/build.gradle
Normal file
103
tools/shell-cli/build.gradle
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
buildscript {
|
||||||
|
|
||||||
|
repositories {
|
||||||
|
mavenLocal()
|
||||||
|
mavenCentral()
|
||||||
|
jcenter()
|
||||||
|
}
|
||||||
|
|
||||||
|
dependencies {
|
||||||
|
classpath "org.jetbrains.kotlin:kotlin-noarg:$kotlin_version"
|
||||||
|
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
|
||||||
|
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.1'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
repositories {
|
||||||
|
mavenLocal()
|
||||||
|
mavenCentral()
|
||||||
|
jcenter()
|
||||||
|
}
|
||||||
|
|
||||||
|
apply plugin: 'kotlin'
|
||||||
|
apply plugin: 'java'
|
||||||
|
apply plugin: 'application'
|
||||||
|
apply plugin: 'net.corda.plugins.publish-utils'
|
||||||
|
apply plugin: 'com.jfrog.artifactory'
|
||||||
|
apply plugin: 'com.github.johnrengelman.shadow'
|
||||||
|
apply plugin: 'maven-publish'
|
||||||
|
|
||||||
|
description 'Corda Shell CLI'
|
||||||
|
|
||||||
|
configurations {
|
||||||
|
integrationTestCompile.extendsFrom testCompile
|
||||||
|
integrationTestRuntime.extendsFrom testRuntime
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceSets {
|
||||||
|
integrationTest {
|
||||||
|
kotlin {
|
||||||
|
compileClasspath += main.output + test.output
|
||||||
|
runtimeClasspath += main.output + test.output
|
||||||
|
srcDir file('src/integration-test/kotlin')
|
||||||
|
}
|
||||||
|
resources {
|
||||||
|
srcDir file('src/integration-test/resources')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
test {
|
||||||
|
resources {
|
||||||
|
srcDir file('src/test/resources')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
dependencies {
|
||||||
|
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
|
||||||
|
testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
|
||||||
|
|
||||||
|
compile project(':tools:shell')
|
||||||
|
|
||||||
|
// Unit testing helpers.
|
||||||
|
testCompile "junit:junit:$junit_version"
|
||||||
|
testCompile "org.assertj:assertj-core:${assertj_version}"
|
||||||
|
testCompile project(':test-utils')
|
||||||
|
testCompile project(':finance')
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
mainClassName = 'net.corda.tools.shell.StandaloneShellKt'
|
||||||
|
|
||||||
|
jar {
|
||||||
|
baseName 'corda-shell-cli'
|
||||||
|
}
|
||||||
|
|
||||||
|
processResources {
|
||||||
|
from file("$rootDir/config/dev/log4j2.xml")
|
||||||
|
}
|
||||||
|
|
||||||
|
task integrationTest(type: Test) {
|
||||||
|
testClassesDirs = sourceSets.integrationTest.output.classesDirs
|
||||||
|
classpath = sourceSets.integrationTest.runtimeClasspath
|
||||||
|
}
|
||||||
|
|
||||||
|
publishing {
|
||||||
|
publications {
|
||||||
|
shadow(MavenPublication) { publication ->
|
||||||
|
project.shadow.component(publication)
|
||||||
|
artifactId = "corda-tools-shell-cli"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
shadowJar {
|
||||||
|
//transform(de.sebastianboegl.gradle.plugins.shadow.transformers.Log4j2PluginsFileTransformer)
|
||||||
|
archiveName = "corda-shell-cli-${version}.jar"
|
||||||
|
|
||||||
|
baseName = 'corda-shell-cli'
|
||||||
|
classifier = null
|
||||||
|
mainClassName = 'net.corda.tools.shell.StandaloneShellKt'
|
||||||
|
mergeServiceFiles()
|
||||||
|
}
|
||||||
|
|
||||||
|
task buildShellCli(dependsOn: shadowJar)
|
29
tools/shell-cli/src/test/resources/config.conf
Normal file
29
tools/shell-cli/src/test/resources/config.conf
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
node {
|
||||||
|
addresses {
|
||||||
|
rpc {
|
||||||
|
host : "alocalhost"
|
||||||
|
port : 1234
|
||||||
|
}
|
||||||
|
}
|
||||||
|
user : demo
|
||||||
|
password : abcd1234
|
||||||
|
}
|
||||||
|
extensions {
|
||||||
|
cordapps {
|
||||||
|
path : "/x/y/cordapps"
|
||||||
|
}
|
||||||
|
sshd {
|
||||||
|
enabled : "true"
|
||||||
|
port : 2223
|
||||||
|
}
|
||||||
|
commands {
|
||||||
|
path : /x/y/commands
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ssl {
|
||||||
|
truststore {
|
||||||
|
path : "/x/y/truststore.jks"
|
||||||
|
type : "JKS"
|
||||||
|
password : "pass2"
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,5 @@
|
|||||||
apply plugin: 'kotlin'
|
apply plugin: 'kotlin'
|
||||||
apply plugin: 'java'
|
apply plugin: 'java'
|
||||||
apply plugin: 'application'
|
|
||||||
apply plugin: 'net.corda.plugins.quasar-utils'
|
apply plugin: 'net.corda.plugins.quasar-utils'
|
||||||
apply plugin: 'net.corda.plugins.publish-utils'
|
apply plugin: 'net.corda.plugins.publish-utils'
|
||||||
apply plugin: 'com.jfrog.artifactory'
|
apply plugin: 'com.jfrog.artifactory'
|
||||||
@ -80,21 +79,15 @@ dependencies {
|
|||||||
integrationTestCompile project(':node-driver')
|
integrationTestCompile project(':node-driver')
|
||||||
}
|
}
|
||||||
|
|
||||||
mainClassName = 'net.corda.tools.shell.StandaloneShellKt'
|
|
||||||
|
|
||||||
jar {
|
|
||||||
baseName 'corda-shell'
|
|
||||||
}
|
|
||||||
|
|
||||||
processResources {
|
|
||||||
from file("$rootDir/config/dev/log4j2.xml")
|
|
||||||
}
|
|
||||||
|
|
||||||
task integrationTest(type: Test) {
|
task integrationTest(type: Test) {
|
||||||
testClassesDirs = sourceSets.integrationTest.output.classesDirs
|
testClassesDirs = sourceSets.integrationTest.output.classesDirs
|
||||||
classpath = sourceSets.integrationTest.runtimeClasspath
|
classpath = sourceSets.integrationTest.runtimeClasspath
|
||||||
}
|
}
|
||||||
|
|
||||||
|
jar {
|
||||||
|
baseName 'corda-shell'
|
||||||
|
}
|
||||||
|
|
||||||
publish {
|
publish {
|
||||||
name jar.baseName
|
name jar.baseName
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user