diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 485fb66e00..12a44713fc 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -3906,6 +3906,7 @@ public static final class net.corda.testing.node.InMemoryMessagingNetwork$InMemo @org.jetbrains.annotations.Nullable public final String component5() @org.jetbrains.annotations.NotNull public final net.corda.testing.node.InMemoryMessagingNetwork$InMemoryMessage copy(String, net.corda.core.utilities.ByteSequence, net.corda.node.services.statemachine.DeduplicationId, java.time.Instant, String) public boolean equals(Object) + @org.jetbrains.annotations.NotNull public Map getAdditionalHeaders() @org.jetbrains.annotations.NotNull public net.corda.core.utilities.ByteSequence getData() @org.jetbrains.annotations.NotNull public java.time.Instant getDebugTimestamp() @org.jetbrains.annotations.Nullable public String getSenderUUID() @@ -3980,7 +3981,7 @@ public class net.corda.testing.node.MessagingServiceSpy extends java.lang.Object public (net.corda.node.services.messaging.MessagingService) @org.jetbrains.annotations.NotNull public net.corda.node.services.messaging.MessageHandlerRegistration addMessageHandler(String, kotlin.jvm.functions.Function3) public void cancelRedelivery(long) - @org.jetbrains.annotations.NotNull public net.corda.node.services.messaging.Message createMessage(String, byte[], net.corda.node.services.statemachine.DeduplicationId) + @org.jetbrains.annotations.NotNull public net.corda.node.services.messaging.Message createMessage(String, byte[], net.corda.node.services.statemachine.DeduplicationId, Map) @org.jetbrains.annotations.NotNull public net.corda.core.messaging.MessageRecipients getAddressOfParty(net.corda.core.node.services.PartyInfo) @org.jetbrains.annotations.NotNull public final net.corda.node.services.messaging.MessagingService getMessagingService() @org.jetbrains.annotations.NotNull public net.corda.core.messaging.SingleMessageRecipient getMyAddress() diff --git a/.idea/compiler.xml b/.idea/compiler.xml index 0216c389d8..b75e9b3b1d 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -10,6 +10,9 @@ + + + @@ -27,6 +30,8 @@ + + @@ -65,6 +70,7 @@ + @@ -130,6 +136,12 @@ + + + + + + diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/FlowsExecutionModeRpcTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/FlowsExecutionModeRpcTest.kt index 011d7d13aa..5cb0945efa 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/FlowsExecutionModeRpcTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/FlowsExecutionModeRpcTest.kt @@ -12,16 +12,27 @@ import net.corda.node.services.Permissions import net.corda.node.services.Permissions.Companion.invokeRpc import net.corda.testing.core.* import net.corda.testing.driver.DriverParameters -import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.driver +import net.corda.testing.internal.IntegrationTest +import net.corda.testing.internal.IntegrationTestSchemas +import net.corda.testing.internal.toDatabaseSchemaName +import net.corda.testing.internal.toDatabaseSchemaNames import net.corda.testing.node.User import net.corda.testing.node.internal.NodeBasedTest import org.assertj.core.api.Assertions.assertThat import org.junit.Assume.assumeFalse import org.junit.Before +import org.junit.ClassRule import org.junit.Test -class FlowsExecutionModeRpcTest { +class FlowsExecutionModeRpcTest : IntegrationTest() { + + companion object { + + @ClassRule + @JvmField + val databaseSchemas = IntegrationTestSchemas(*listOf(ALICE_NAME, BOB_NAME, DUMMY_BANK_A_NAME).map { it.toDatabaseSchemaNames("", "_10000", "_10003", "_10006") }.flatten().toTypedArray(), DUMMY_NOTARY_NAME.toDatabaseSchemaName()) + } @Test fun `persistent state survives node restart`() { diff --git a/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt b/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt index 8197cc2c09..c93cbb898f 100644 --- a/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt @@ -1,7 +1,11 @@ package net.corda.node.modes.draining import co.paralleluniverse.fibers.Suspendable -import net.corda.core.flows.* +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC import net.corda.core.identity.Party import net.corda.core.internal.concurrent.map import net.corda.core.messaging.startFlow @@ -19,18 +23,18 @@ import org.junit.After import org.junit.Before import org.junit.Ignore import org.junit.Test -import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit import kotlin.test.fail -@Ignore("Pending implementation") class P2PFlowsDrainingModeTest { private val portAllocation = PortAllocation.Incremental(10000) private val user = User("mark", "dadada", setOf(Permissions.all())) private val users = listOf(user) - private var executor: ExecutorService? = null + private var executor: ScheduledExecutorService? = null companion object { private val logger = loggerFor() @@ -38,7 +42,7 @@ class P2PFlowsDrainingModeTest { @Before fun setup() { - executor = Executors.newSingleThreadExecutor() + executor = Executors.newSingleThreadScheduledExecutor() } @After @@ -49,7 +53,7 @@ class P2PFlowsDrainingModeTest { @Test fun `flows draining mode suspends consumption of initial session messages`() { - driver(DriverParameters(isDebug = true, startNodesInProcess = false, portAllocation = portAllocation)) { + driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation)) { val initiatedNode = startNode().getOrThrow() val initiating = startNode(rpcUsers = users).getOrThrow().rpc val counterParty = initiatedNode.nodeInfo.chooseIdentity() @@ -61,11 +65,11 @@ class P2PFlowsDrainingModeTest { initiating.apply { val flow = startFlow(::InitiateSessionFlow, counterParty) // this should be really fast, for the flow has already started, so 5 seconds should never be a problem - executor!!.submit({ + executor!!.schedule({ logger.info("Now disabling flows draining mode for $counterParty.") shouldFail = false initiated.setFlowsDrainingModeEnabled(false) - }) + }, 5, TimeUnit.SECONDS) flow.returnValue.map { result -> if (shouldFail) { fail("Shouldn't happen until flows draining mode is switched off.") diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 71d03b8855..203707be8f 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -193,8 +193,9 @@ open class Node(configuration: NodeConfiguration, services.networkMapCache, services.monitoringService.metrics, advertisedAddress, - networkParameters.maxMessageSize - ) + networkParameters.maxMessageSize, + nodeProperties.flowsDrainingMode::isEnabled, + nodeProperties.flowsDrainingMode.values) } private fun startLocalRpcBroker(networkParameters: NetworkParameters): BrokerAddresses? { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt index 8357288fea..8a1789f89d 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt @@ -97,7 +97,7 @@ interface MessagingService { * @param additionalProperties optional additional message headers. * @param topic identifier for the topic the message is sent to. */ - fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId = DeduplicationId.createRandom(newSecureRandom())): Message + fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId = DeduplicationId.createRandom(newSecureRandom()), additionalHeaders: Map = emptyMap()): Message /** Given information about either a specific node or a service returns its corresponding address */ fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients @@ -106,9 +106,8 @@ interface MessagingService { val myAddress: SingleMessageRecipient } - -fun MessagingService.send(topicSession: String, payload: Any, to: MessageRecipients, deduplicationId: DeduplicationId = DeduplicationId.createRandom(newSecureRandom()), retryId: Long? = null) - = send(createMessage(topicSession, payload.serialize().bytes, deduplicationId), to, retryId) +fun MessagingService.send(topicSession: String, payload: Any, to: MessageRecipients, deduplicationId: DeduplicationId = DeduplicationId.createRandom(newSecureRandom()), retryId: Long? = null, additionalHeaders: Map = emptyMap()) + = send(createMessage(topicSession, payload.serialize().bytes, deduplicationId, additionalHeaders), to, retryId) interface MessageHandlerRegistration @@ -129,6 +128,7 @@ interface Message { val debugTimestamp: Instant val uniqueMessageId: DeduplicationId val senderUUID: String? + val additionalHeaders: Map } // TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that. @@ -165,3 +165,11 @@ interface AcknowledgeHandle { } typealias MessageHandler = (ReceivedMessage, MessageHandlerRegistration, AcknowledgeHandle) -> Unit + +object P2PMessagingHeaders { + + object Type { + const val KEY = "corda_p2p_message_type" + const val SESSION_INIT_VALUE = "session_init" + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index 244fc0a73f..901f39dc26 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -223,6 +223,7 @@ class MessagingExecutor( if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) { putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis) } + message.additionalHeaders.forEach { key, value -> putStringProperty(key, value) } } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 8f127f1722..da91235355 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -14,19 +14,31 @@ import net.corda.core.node.services.PartyInfo import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize +import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize -import net.corda.core.utilities.* +import net.corda.core.utilities.ByteSequence +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.trace import net.corda.node.VersionInfo +import net.corda.node.internal.LifecycleSupport +import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.PersistentMap -import net.corda.nodeapi.internal.ArtemisMessagingClient -import net.corda.nodeapi.internal.ArtemisMessagingComponent.* +import net.corda.nodeapi.ArtemisTcpTransport +import net.corda.nodeapi.ConnectionDirection +import net.corda.nodeapi.internal.ArtemisMessagingComponent +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -36,11 +48,16 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientProducer import org.apache.activemq.artemis.api.core.client.ClientSession +import org.apache.activemq.artemis.api.core.client.ServerLocator import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY +import rx.Observable import rx.Subscription +import rx.subjects.PublishSubject import java.security.PublicKey import java.time.Instant import java.util.* @@ -82,7 +99,7 @@ import javax.persistence.Lob @ThreadSafe class P2PMessagingClient(val config: NodeConfiguration, private val versionInfo: VersionInfo, - serverAddress: NetworkHostAndPort, + private val serverAddress: NetworkHostAndPort, private val myIdentity: PublicKey, private val serviceIdentity: PublicKey?, private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, @@ -90,8 +107,10 @@ class P2PMessagingClient(val config: NodeConfiguration, private val networkMap: NetworkMapCacheInternal, private val metricRegistry: MetricRegistry, advertisedAddress: NetworkHostAndPort = serverAddress, - maxMessageSize: Int -) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver { + private val maxMessageSize: Int, + private val isDrainingModeOn: () -> Boolean, + private val drainingModeWasChangedEvents: Observable> +) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable { companion object { private val log = contextLogger() // This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic". @@ -105,7 +124,7 @@ class P2PMessagingClient(val config: NodeConfiguration, val senderUUID = SimpleString("sender-uuid") val senderSeqNo = SimpleString("send-seq-no") - private val messageMaxRetryCount: Int = 3 + private const val messageMaxRetryCount: Int = 3 fun createMessageToRedeliver(): PersistentMap, RetryMessage, Long> { return PersistentMap( @@ -127,18 +146,25 @@ class P2PMessagingClient(val config: NodeConfiguration, ) } - private class NodeClientMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId, override val senderUUID: String?) : Message { + private class NodeClientMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId, override val senderUUID: String?, override val additionalHeaders: Map) : Message { override val debugTimestamp: Instant = Instant.now() override fun toString() = "$topic#${String(data.bytes)}" } } private class InnerState { + var started = false var running = false - var p2pConsumer: ClientConsumer? = null - var serviceConsumer: ClientConsumer? = null + var eventsSubscription: Subscription? = null + var p2pConsumer: P2PMessagingConsumer? = null + var locator: ServerLocator? = null + var producer: ClientProducer? = null + var producerSession: ClientSession? = null + var bridgeSession: ClientSession? = null var bridgeNotifyConsumer: ClientConsumer? = null var networkChangeSubscription: Subscription? = null + + fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message) } private val messagesToRedeliver = database.transaction { @@ -152,14 +178,6 @@ class P2PMessagingClient(val config: NodeConfiguration, override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress) private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong() - private val artemis = ArtemisMessagingClient( - config = config, - serverAddress = serverAddress, - maxMessageSize = maxMessageSize, - autoCommitSends = false, - autoCommitAcks = false, - confirmationWindowSize = config.enterpriseConfiguration.tuning.p2pConfirmationWindowSize - ) private val state = ThreadBox(InnerState()) private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap()) @@ -185,28 +203,46 @@ class P2PMessagingClient(val config: NodeConfiguration, fun start() { state.locked { - val started = artemis.start() - val session = started.session - val inbox = RemoteInboxAddress(myIdentity).queueName - val inboxes = mutableListOf(inbox) + started = true + log.info("Connecting to message broker: $serverAddress") + // TODO Add broker CN to config for host verification in case the embedded broker isn't used + val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config) + locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { + // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this + // would be the default and the two lines below can be deleted. + connectionTTL = -1 + clientFailureCheckPeriod = -1 + minLargeMessageSize = maxMessageSize + isUseGlobalPools = nodeSerializationEnv != null + } + val sessionFactory = locator!!.createSessionFactory() + // Login using the node username. The broker will authenticate us as its node (as opposed to another peer) + // using our TLS certificate. + // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer + // size of 1MB is acknowledged. + val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER, false, true, true, locator!!.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) } + + producerSession = createNewSession() + bridgeSession = createNewSession() + producerSession!!.start() + bridgeSession!!.start() + + val inboxes = mutableSetOf() // Create a queue, consumer and producer for handling P2P network messages. - createQueueIfAbsent(inbox) - p2pConsumer = session.createConsumer(inbox) - if (serviceIdentity != null) { - val serviceAddress = RemoteInboxAddress(serviceIdentity).queueName - inboxes += serviceAddress - createQueueIfAbsent(serviceAddress) - val serviceHandler = session.createConsumer(serviceAddress) - serviceHandler.setMessageHandler { msg -> - val message: ReceivedMessage? = artemisToCordaMessage(msg) - if (message != null) - deliver(msg, message) - } + // Create a general purpose producer. + producer = producerSession!!.createProducer() + + inboxes += RemoteInboxAddress(myIdentity).queueName + serviceIdentity?.let { + inboxes += RemoteInboxAddress(it).queueName } + inboxes.forEach { createQueueIfAbsent(it, producerSession!!) } + p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents) + val messagingExecutor = MessagingExecutor( - session, - started.producer, + producerSession!!, + producer!!, versionInfo, this@P2PMessagingClient, metricRegistry, @@ -216,8 +252,8 @@ class P2PMessagingClient(val config: NodeConfiguration, this@P2PMessagingClient.messagingExecutor = messagingExecutor messagingExecutor.start() - registerBridgeControl(session, inboxes) - enumerateBridges(session, inboxes) + registerBridgeControl(bridgeSession!!, inboxes.toList()) + enumerateBridges(bridgeSession!!, inboxes.toList()) } resumeMessageRedelivery() @@ -237,7 +273,7 @@ class P2PMessagingClient(val config: NodeConfiguration, log.info(notifyMessage.toString()) when (notifyMessage) { is BridgeControl.BridgeToNodeSnapshotRequest -> enumerateBridges(session, inboxes) - else -> log.error("Unexpected Bridge Control message type on notify topc $notifyMessage") + else -> log.error("Unexpected Bridge Control message type on notify topic $notifyMessage") } msg.acknowledge() } @@ -245,22 +281,24 @@ class P2PMessagingClient(val config: NodeConfiguration, networkChangeSubscription = networkMap.changed.subscribe { updateBridgesOnNetworkChange(it) } } - private fun sendBridgeControl(message: BridgeControl) { - val client = artemis.started!! - val controlPacket = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes - val artemisMessage = client.session.createMessage(false) - artemisMessage.writeBodyBufferBytes(controlPacket) - client.producer.send(BRIDGE_CONTROL, artemisMessage) - client.session.commit() + private fun sendBridgeControl(message: BridgeControl) { + state.locked { + val controlPacket = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes + val artemisMessage = producerSession!!.createMessage(false) + artemisMessage.writeBodyBufferBytes(controlPacket) + sendMessage(BRIDGE_CONTROL, artemisMessage) + } } private fun updateBridgesOnNetworkChange(change: NetworkMapCache.MapChange) { log.info("Updating bridges on network map change: ${change.node}") fun gatherAddresses(node: NodeInfo): Sequence { - return node.legalIdentitiesAndCerts.map { - val messagingAddress = NodeAddress(it.party.owningKey, node.addresses.first()) - BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name }) - }.filter { artemis.started!!.session.queueQuery(SimpleString(it.queueName)).isExists }.asSequence() + return state.locked { + node.legalIdentitiesAndCerts.map { + val messagingAddress = NodeAddress(it.party.owningKey, node.addresses.first()) + BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name }) + }.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists }.asSequence() + } } fun deployBridges(node: NodeInfo) { @@ -319,39 +357,31 @@ class P2PMessagingClient(val config: NodeConfiguration, private val shutdownLatch = CountDownLatch(1) - private fun processMessage(consumer: ClientConsumer): Boolean { - // Two possibilities here: - // - // 1. We block waiting for a message and the consumer is closed in another thread. In this case - // receive returns null and we break out of the loop. - // 2. We receive a message and process it, and stop() is called during delivery. In this case, - // calling receive will throw and we break out of the loop. - // - // It's safe to call into receive simultaneous with other threads calling send on a producer. - val artemisMessage: ClientMessage = try { - consumer.receive() - } catch (e: ActiveMQObjectClosedException) { - null - } ?: return false - - deliver(artemisMessage) - return true - } - /** * Starts the p2p event loop: this method only returns once [stop] has been called. */ fun run() { + val latch = CountDownLatch(1) try { val consumer = state.locked { - check(artemis.started != null) { "start must be called first" } + check(started) { "start must be called first" } check(!running) { "run can't be called twice" } running = true // If it's null, it means we already called stop, so return immediately. - p2pConsumer ?: return + if (p2pConsumer == null) { + return + } + eventsSubscription = p2pConsumer!!.messages + .doOnError { error -> throw error } + .doOnNext { message -> deliver(message) } + // this `run()` method is semantically meant to block until the message consumption runs, hence the latch here + .doOnCompleted(latch::countDown) + .doOnError { error -> throw error } + .subscribe() + p2pConsumer!! } - - while (processMessage(consumer)) { } + consumer.start() + latch.await() } finally { shutdownLatch.countDown() } @@ -389,51 +419,51 @@ class P2PMessagingClient(val config: NodeConfiguration, private val message: ClientMessage) : ReceivedMessage { override val data: ByteSequence by lazy { OpaqueBytes(ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }) } override val debugTimestamp: Instant get() = Instant.ofEpochMilli(message.timestamp) + override val additionalHeaders: Map = emptyMap() override fun toString() = "$topic#$data" } internal fun deliver(artemisMessage: ClientMessage) { - val message: ReceivedMessage? = artemisToCordaMessage(artemisMessage) - if (message != null) - deliver(artemisMessage, message) + + artemisToCordaMessage(artemisMessage)?.let { cordaMessage -> + if (!deduplicator.isDuplicate(cordaMessage)) { + deliver(cordaMessage, artemisMessage) + } else { + log.trace { "Discard duplicate message ${cordaMessage.uniqueMessageId} for ${cordaMessage.topic}" } + } + } } - private fun deliver(artemisMessage: ClientMessage, msg: ReceivedMessage) { + private fun deliver(msg: ReceivedMessage, artemisMessage: ClientMessage) { + state.checkNotLocked() val deliverTo = handlers[msg.topic] - try { - // This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will - // be slow, and Artemis can handle that case intelligently. We don't just invoke the handler - // directly in order to ensure that we have the features of the AffinityExecutor class throughout - // the bulk of the codebase and other non-messaging jobs can be scheduled onto the server executor - // easily. - // - // Note that handlers may re-enter this class. We aren't holding any locks and methods like - // start/run/stop have re-entrancy assertions at the top, so it is OK. - if (deliverTo != null) { - if (deduplicator.isDuplicate(msg)) { - log.trace { "Discard duplicate message ${msg.uniqueMessageId} for ${msg.topic}" } - return - } - val acknowledgeHandle = object : AcknowledgeHandle { - override fun persistDeduplicationId() { - deduplicator.persistDeduplicationId(msg) - } - - // ACKing a message calls back into the session which isn't thread safe, so we have to ensure it - // doesn't collide with a send here. Note that stop() could have been called whilst we were - // processing a message but if so, it'll be parked waiting for us to count down the latch, so - // the session itself is still around and we can still ack messages as a result. - override fun acknowledge() { - messagingExecutor!!.acknowledge(artemisMessage) - } - } - deliverTo(msg, HandlerRegistration(msg.topic, deliverTo), acknowledgeHandle) - } else { - log.warn("Received message ${msg.uniqueMessageId} for ${msg.topic} that doesn't have any registered handlers yet") + if (deliverTo != null) { + try { + deliverTo(msg, HandlerRegistration(msg.topic, deliverTo), acknowledgeHandleFor(artemisMessage, msg)) + } catch (e: Exception) { + log.error("Caught exception whilst executing message handler for ${msg.topic}", e) + } + } else { + log.warn("Received message ${msg.uniqueMessageId} for ${msg.topic} that doesn't have any registered handlers yet") + } + } + + private fun acknowledgeHandleFor(artemisMessage: ClientMessage, cordaMessage: ReceivedMessage): AcknowledgeHandle { + + return object : AcknowledgeHandle { + + override fun persistDeduplicationId() { + deduplicator.persistDeduplicationId(cordaMessage) + } + + // ACKing a message calls back into the session which isn't thread safe, so we have to ensure it + // doesn't collide with a send here. Note that stop() could have been called whilst we were + // processing a message but if so, it'll be parked waiting for us to count down the latch, so + // the session itself is still around and we can still ack messages as a result. + override fun acknowledge() { + messagingExecutor!!.acknowledge(artemisMessage) } - } catch (e: Exception) { - log.error("Caught exception whilst executing message handler for ${msg.topic}", e) } } @@ -446,30 +476,24 @@ class P2PMessagingClient(val config: NodeConfiguration, fun stop() { val running = state.locked { // We allow stop() to be called without a run() in between, but it must have at least been started. - check(artemis.started != null) + check(started) val prevRunning = running running = false networkChangeSubscription?.unsubscribe() - val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice") - try { - c.close() - } catch (e: ActiveMQObjectClosedException) { - // Ignore it: this can happen if the server has gone away before we do. - } - try { - bridgeNotifyConsumer!!.close() - } catch (e: ActiveMQObjectClosedException) { - // Ignore it: this can happen if the server has gone away before we do. - } + require(p2pConsumer != null, {"stop can't be called twice"}) + require(producer != null, {"stop can't be called twice"}) + + close(p2pConsumer) p2pConsumer = null - val s = serviceConsumer - try { - s?.close() - } catch (e: ActiveMQObjectClosedException) { - // Ignore it: this can happen if the server has gone away before we do. - } - serviceConsumer = null + + close(producer) + producer = null + producerSession!!.commit() + + close(bridgeNotifyConsumer) knownQueues.clear() + eventsSubscription?.unsubscribe() + eventsSubscription = null prevRunning } if (running && !nodeExecutor.isOnThread) { @@ -478,13 +502,21 @@ class P2PMessagingClient(val config: NodeConfiguration, } // Only first caller to gets running true to protect against double stop, which seems to happen in some integration tests. messagingExecutor?.close() - if (running) { - state.locked { - artemis.stop() - } + state.locked { + locator?.close() } } + private fun close(target: AutoCloseable?) { + try { + target?.close() + } catch (ignored: ActiveMQObjectClosedException) { + // swallow + } + } + + override fun close() = stop() + @Suspendable override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) { messagingExecutor!!.send(message, target) @@ -495,7 +527,6 @@ class P2PMessagingClient(val config: NodeConfiguration, scheduledMessageRedeliveries[it] = nodeExecutor.schedule({ sendWithRetry(0, message, target, retryId) }, messageRedeliveryDelaySeconds, TimeUnit.SECONDS) - } } @@ -544,15 +575,16 @@ class P2PMessagingClient(val config: NodeConfiguration, // Otherwise we send the message to an internal queue for the target residing on our broker. It's then the // broker's job to route the message to the target's P2P queue. val internalTargetQueue = (address as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address") - createQueueIfAbsent(internalTargetQueue) + state.locked { + createQueueIfAbsent(internalTargetQueue, producerSession!!) + } internalTargetQueue } } /** Attempts to create a durable queue on the broker which is bound to an address of the same name. */ - private fun createQueueIfAbsent(queueName: String) { + private fun createQueueIfAbsent(queueName: String, session: ClientSession) { if (!knownQueues.contains(queueName)) { - val session = artemis.started!!.session val queueQuery = session.queueQuery(SimpleString(queueName)) if (!queueQuery.isExists) { log.info("Create fresh queue $queueName bound on same address") @@ -587,8 +619,9 @@ class P2PMessagingClient(val config: NodeConfiguration, handlers.remove(registration.topic) } - override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId): Message { - return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId, deduplicator.ourSenderUUID) + override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId, additionalHeaders: Map): Message { + + return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId, deduplicator.ourSenderUUID, additionalHeaders) } override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients { @@ -598,3 +631,78 @@ class P2PMessagingClient(val config: NodeConfiguration, } } } + +private class P2PMessagingConsumer( + queueNames: Set, + createSession: () -> ClientSession, + private val isDrainingModeOn: () -> Boolean, + private val drainingModeWasChangedEvents: Observable>) : LifecycleSupport { + + private companion object { + private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}='${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'" + private const val existingSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'" + } + + private var startedFlag = false + + val messages: PublishSubject = PublishSubject.create() + + private var initialConsumer = multiplex(queueNames, createSession, initialSessionMessages) + private var existingConsumer = multiplex(queueNames, createSession, existingSessionMessages) + private val subscriptions = mutableSetOf() + + override fun start() { + + synchronized(this) { + require(!startedFlag) + drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { pauseInitial() }.subscribe() + drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { resumeInitial() }.subscribe() + subscriptions += initialConsumer.messages.doOnNext(messages::onNext).subscribe() + subscriptions += existingConsumer.messages.doOnNext(messages::onNext).subscribe() + if (!isDrainingModeOn()) { + initialConsumer.start() + } + existingConsumer.start() + startedFlag = true + } + } + + override fun stop() { + + synchronized(this) { + if (startedFlag) { + initialConsumer.stop() + existingConsumer.stop() + subscriptions.forEach(Subscription::unsubscribe) + subscriptions.clear() + startedFlag = false + } + messages.onCompleted() + } + } + + override val started: Boolean + get() = startedFlag + + + private fun pauseInitial() { + + if (initialConsumer.started && initialConsumer.connected) { + initialConsumer.disconnect() + } + } + + private fun resumeInitial() { + + if(!initialConsumer.started) { + initialConsumer.start() + } + if (!initialConsumer.connected) { + initialConsumer.connect() + } + } + + private fun Pair.switchedOff() = first && !second + + private fun Pair.switchedOn() = !first && second +} diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/NodePropertiesPersistentStore.kt b/node/src/main/kotlin/net/corda/node/services/persistence/NodePropertiesPersistentStore.kt index deaff88abb..afa84c9f2a 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/NodePropertiesPersistentStore.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/NodePropertiesPersistentStore.kt @@ -29,10 +29,10 @@ class NodePropertiesPersistentStore(readPhysicalNodeId: () -> String, persistenc @Table(name = "${NODE_DATABASE_PREFIX}properties") class DBNodeProperty( @Id - @Column(name = "key") + @Column(name = "property_key") val key: String = "", - @Column(name = "value") + @Column(name = "property_value") var value: String? = "" ) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt index 9f149cc0ae..18678c2f9a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt @@ -10,6 +10,7 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.trace import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.AcknowledgeHandle +import net.corda.node.services.messaging.P2PMessagingHeaders import net.corda.node.services.messaging.ReceivedMessage import java.io.NotSerializableException @@ -50,7 +51,7 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging { @Suspendable override fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: DeduplicationId) { log.trace { "Sending message $deduplicationId $message to party $party" } - val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId) + val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId, message.additionalHeaders()) val partyInfo = serviceHub.networkMapCache.getPartyInfo(party) ?: throw IllegalArgumentException("Don't know about $party") val address = serviceHub.networkService.getAddressOfParty(partyInfo) val sequenceKey = when (message) { @@ -60,6 +61,13 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging { serviceHub.networkService.send(networkMessage, address, sequenceKey = sequenceKey) } + private fun SessionMessage.additionalHeaders(): Map { + return when (this) { + is InitialSessionMessage -> mapOf(P2PMessagingHeaders.Type.KEY to P2PMessagingHeaders.Type.SESSION_INIT_VALUE) + else -> emptyMap() + } + } + private fun serializeSessionMessage(message: SessionMessage): SerializedBytes { return try { message.serialize() diff --git a/node/src/main/resources/migration/node-info.changelog-master.xml b/node/src/main/resources/migration/node-info.changelog-master.xml index 01f59ba81b..dcce7894a6 100644 --- a/node/src/main/resources/migration/node-info.changelog-master.xml +++ b/node/src/main/resources/migration/node-info.changelog-master.xml @@ -5,5 +5,6 @@ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"> + diff --git a/node/src/main/resources/migration/node-info.changelog-v1.xml b/node/src/main/resources/migration/node-info.changelog-v1.xml new file mode 100644 index 0000000000..b8173a2c4c --- /dev/null +++ b/node/src/main/resources/migration/node-info.changelog-v1.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/modes/draining/ScheduledFlowsDrainingModeTest.kt b/node/src/test/kotlin/net/corda/node/modes/draining/ScheduledFlowsDrainingModeTest.kt index eed40b383d..90b7d4082a 100644 --- a/node/src/test/kotlin/net/corda/node/modes/draining/ScheduledFlowsDrainingModeTest.kt +++ b/node/src/test/kotlin/net/corda/node/modes/draining/ScheduledFlowsDrainingModeTest.kt @@ -20,7 +20,6 @@ import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.dummyCommand import net.corda.testing.core.singleIdentity -import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNodeParameters import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.startFlow diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 69afbb25b9..e09a95bc70 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -6,20 +6,29 @@ import com.nhaarman.mockito_kotlin.whenever import net.corda.core.crypto.generateKeyPair import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.configureDatabase -import net.corda.node.services.config.* +import net.corda.node.services.config.CertChainPolicyConfig +import net.corda.node.services.config.EnterpriseConfiguration +import net.corda.node.services.config.MutualExclusionConfiguration +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.network.NetworkMapCacheImpl import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig -import net.corda.testing.core.* +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.MAX_MESSAGE_SIZE +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.core.freeLocalHostAndPort +import net.corda.testing.core.freePort import net.corda.testing.internal.LogHelper import net.corda.testing.internal.rigorousMock import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.ClientMessage import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.After @@ -260,7 +269,6 @@ class ArtemisMessagingTest { } } - private fun startNodeMessagingClient() { messagingClient!!.start() } @@ -296,7 +304,9 @@ class ArtemisMessagingTest { database, networkMapCache, MetricRegistry(), - maxMessageSize = maxMessageSize).apply { + maxMessageSize = maxMessageSize, + isDrainingModeOn = { false }, + drainingModeWasChangedEvents = PublishSubject.create()).apply { config.configureWithDevSSLCertificate() messagingClient = this } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 5ee3dc18e1..4fdeb91e63 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -1,6 +1,5 @@ package net.corda.testing.node -import net.corda.core.CordaInternal import net.corda.core.DoNotImplement import net.corda.core.crypto.CompositeKey import net.corda.core.identity.CordaX500Name @@ -19,7 +18,12 @@ import net.corda.core.utilities.ByteSequence import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.trace -import net.corda.node.services.messaging.* +import net.corda.node.services.messaging.AcknowledgeHandle +import net.corda.node.services.messaging.Message +import net.corda.node.services.messaging.MessageHandler +import net.corda.node.services.messaging.MessageHandlerRegistration +import net.corda.node.services.messaging.MessagingService +import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -260,6 +264,9 @@ class InMemoryMessagingNetwork private constructor( override val uniqueMessageId: DeduplicationId, override val debugTimestamp: Instant = Instant.now(), override val senderUUID: String? = null) : Message { + + override val additionalHeaders: Map = emptyMap() + override fun toString() = "$topic#${String(data.bytes)}" } @@ -270,7 +277,10 @@ class InMemoryMessagingNetwork private constructor( override val debugTimestamp: Instant, override val peer: CordaX500Name, override val senderUUID: String? = null, - override val senderSeqNo: Long? = null) : ReceivedMessage + override val senderSeqNo: Long? = null) : ReceivedMessage { + + override val additionalHeaders: Map = emptyMap() + } /** * A [TestMessagingService] that provides a [MessagingService] abstraction that also contains the ability to @@ -368,7 +378,7 @@ class InMemoryMessagingNetwork private constructor( override fun cancelRedelivery(retryId: Long) {} /** Returns the given (topic & session, data) pair as a newly created message object. */ - override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId): Message { + override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId, additionalHeaders: Map): Message { return InMemoryMessage(topic, OpaqueBytes(data), deduplicationId) }