Fixed flows draining mode after regression introduced by OS -> ENT merge (#474)

This commit is contained in:
Michele Sollecito 2018-02-23 15:19:37 +00:00 committed by GitHub
parent 089916d350
commit 191f412aba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 355 additions and 167 deletions

View File

@ -3906,6 +3906,7 @@ public static final class net.corda.testing.node.InMemoryMessagingNetwork$InMemo
@org.jetbrains.annotations.Nullable public final String component5()
@org.jetbrains.annotations.NotNull public final net.corda.testing.node.InMemoryMessagingNetwork$InMemoryMessage copy(String, net.corda.core.utilities.ByteSequence, net.corda.node.services.statemachine.DeduplicationId, java.time.Instant, String)
public boolean equals(Object)
@org.jetbrains.annotations.NotNull public Map getAdditionalHeaders()
@org.jetbrains.annotations.NotNull public net.corda.core.utilities.ByteSequence getData()
@org.jetbrains.annotations.NotNull public java.time.Instant getDebugTimestamp()
@org.jetbrains.annotations.Nullable public String getSenderUUID()
@ -3980,7 +3981,7 @@ public class net.corda.testing.node.MessagingServiceSpy extends java.lang.Object
public <init>(net.corda.node.services.messaging.MessagingService)
@org.jetbrains.annotations.NotNull public net.corda.node.services.messaging.MessageHandlerRegistration addMessageHandler(String, kotlin.jvm.functions.Function3)
public void cancelRedelivery(long)
@org.jetbrains.annotations.NotNull public net.corda.node.services.messaging.Message createMessage(String, byte[], net.corda.node.services.statemachine.DeduplicationId)
@org.jetbrains.annotations.NotNull public net.corda.node.services.messaging.Message createMessage(String, byte[], net.corda.node.services.statemachine.DeduplicationId, Map)
@org.jetbrains.annotations.NotNull public net.corda.core.messaging.MessageRecipients getAddressOfParty(net.corda.core.node.services.PartyInfo)
@org.jetbrains.annotations.NotNull public final net.corda.node.services.messaging.MessagingService getMessagingService()
@org.jetbrains.annotations.NotNull public net.corda.core.messaging.SingleMessageRecipient getMyAddress()

12
.idea/compiler.xml generated
View File

@ -10,6 +10,9 @@
<module name="bank-of-corda-demo_integrationTest" target="1.8" />
<module name="bank-of-corda-demo_main" target="1.8" />
<module name="bank-of-corda-demo_test" target="1.8" />
<module name="behave_main" target="1.8" />
<module name="behave_scenario" target="1.8" />
<module name="behave_test" target="1.8" />
<module name="bootstrapper_main" target="1.8" />
<module name="bootstrapper_test" target="1.8" />
<module name="buildSrc_main" target="1.8" />
@ -27,6 +30,8 @@
<module name="confidential-identities_test" target="1.8" />
<module name="corda-project_main" target="1.8" />
<module name="corda-project_test" target="1.8" />
<module name="cordapp-configuration_main" target="1.8" />
<module name="cordapp-configuration_test" target="1.8" />
<module name="cordapp_integrationTest" target="1.8" />
<module name="cordapp_main" target="1.8" />
<module name="cordapp_test" target="1.8" />
@ -65,6 +70,7 @@
<module name="intellij-plugin_test" target="1.8" />
<module name="irs-demo_integrationTest" target="1.8" />
<module name="irs-demo_main" target="1.8" />
<module name="irs-demo_systemTest" target="1.8" />
<module name="irs-demo_test" target="1.8" />
<module name="isolated_main" target="1.8" />
<module name="isolated_test" target="1.8" />
@ -130,6 +136,12 @@
<module name="simm-valuation-demo_integrationTest" target="1.8" />
<module name="simm-valuation-demo_main" target="1.8" />
<module name="simm-valuation-demo_test" target="1.8" />
<module name="smoke-test-utils_main" target="1.8" />
<module name="smoke-test-utils_test" target="1.8" />
<module name="test-common_main" target="1.8" />
<module name="test-common_test" target="1.8" />
<module name="test-utils_main" target="1.8" />
<module name="test-utils_test" target="1.8" />
<module name="testing-smoke-test-utils_main" target="1.8" />
<module name="testing-smoke-test-utils_test" target="1.8" />
<module name="testing-test-common_main" target="1.8" />

View File

@ -12,16 +12,27 @@ import net.corda.node.services.Permissions
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.testing.core.*
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.internal.toDatabaseSchemaName
import net.corda.testing.internal.toDatabaseSchemaNames
import net.corda.testing.node.User
import net.corda.testing.node.internal.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Assume.assumeFalse
import org.junit.Before
import org.junit.ClassRule
import org.junit.Test
class FlowsExecutionModeRpcTest {
class FlowsExecutionModeRpcTest : IntegrationTest() {
companion object {
@ClassRule
@JvmField
val databaseSchemas = IntegrationTestSchemas(*listOf(ALICE_NAME, BOB_NAME, DUMMY_BANK_A_NAME).map { it.toDatabaseSchemaNames("", "_10000", "_10003", "_10006") }.flatten().toTypedArray(), DUMMY_NOTARY_NAME.toDatabaseSchemaName())
}
@Test
fun `persistent state survives node restart`() {

View File

@ -1,7 +1,11 @@
package net.corda.node.modes.draining
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.*
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.startFlow
@ -19,18 +23,18 @@ import org.junit.After
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.test.fail
@Ignore("Pending implementation")
class P2PFlowsDrainingModeTest {
private val portAllocation = PortAllocation.Incremental(10000)
private val user = User("mark", "dadada", setOf(Permissions.all()))
private val users = listOf(user)
private var executor: ExecutorService? = null
private var executor: ScheduledExecutorService? = null
companion object {
private val logger = loggerFor<P2PFlowsDrainingModeTest>()
@ -38,7 +42,7 @@ class P2PFlowsDrainingModeTest {
@Before
fun setup() {
executor = Executors.newSingleThreadExecutor()
executor = Executors.newSingleThreadScheduledExecutor()
}
@After
@ -49,7 +53,7 @@ class P2PFlowsDrainingModeTest {
@Test
fun `flows draining mode suspends consumption of initial session messages`() {
driver(DriverParameters(isDebug = true, startNodesInProcess = false, portAllocation = portAllocation)) {
driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation)) {
val initiatedNode = startNode().getOrThrow()
val initiating = startNode(rpcUsers = users).getOrThrow().rpc
val counterParty = initiatedNode.nodeInfo.chooseIdentity()
@ -61,11 +65,11 @@ class P2PFlowsDrainingModeTest {
initiating.apply {
val flow = startFlow(::InitiateSessionFlow, counterParty)
// this should be really fast, for the flow has already started, so 5 seconds should never be a problem
executor!!.submit({
executor!!.schedule({
logger.info("Now disabling flows draining mode for $counterParty.")
shouldFail = false
initiated.setFlowsDrainingModeEnabled(false)
})
}, 5, TimeUnit.SECONDS)
flow.returnValue.map { result ->
if (shouldFail) {
fail("Shouldn't happen until flows draining mode is switched off.")

View File

@ -193,8 +193,9 @@ open class Node(configuration: NodeConfiguration,
services.networkMapCache,
services.monitoringService.metrics,
advertisedAddress,
networkParameters.maxMessageSize
)
networkParameters.maxMessageSize,
nodeProperties.flowsDrainingMode::isEnabled,
nodeProperties.flowsDrainingMode.values)
}
private fun startLocalRpcBroker(networkParameters: NetworkParameters): BrokerAddresses? {

View File

@ -97,7 +97,7 @@ interface MessagingService {
* @param additionalProperties optional additional message headers.
* @param topic identifier for the topic the message is sent to.
*/
fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId = DeduplicationId.createRandom(newSecureRandom())): Message
fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId = DeduplicationId.createRandom(newSecureRandom()), additionalHeaders: Map<String, String> = emptyMap()): Message
/** Given information about either a specific node or a service returns its corresponding address */
fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients
@ -106,9 +106,8 @@ interface MessagingService {
val myAddress: SingleMessageRecipient
}
fun MessagingService.send(topicSession: String, payload: Any, to: MessageRecipients, deduplicationId: DeduplicationId = DeduplicationId.createRandom(newSecureRandom()), retryId: Long? = null)
= send(createMessage(topicSession, payload.serialize().bytes, deduplicationId), to, retryId)
fun MessagingService.send(topicSession: String, payload: Any, to: MessageRecipients, deduplicationId: DeduplicationId = DeduplicationId.createRandom(newSecureRandom()), retryId: Long? = null, additionalHeaders: Map<String, String> = emptyMap())
= send(createMessage(topicSession, payload.serialize().bytes, deduplicationId, additionalHeaders), to, retryId)
interface MessageHandlerRegistration
@ -129,6 +128,7 @@ interface Message {
val debugTimestamp: Instant
val uniqueMessageId: DeduplicationId
val senderUUID: String?
val additionalHeaders: Map<String, String>
}
// TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that.
@ -165,3 +165,11 @@ interface AcknowledgeHandle {
}
typealias MessageHandler = (ReceivedMessage, MessageHandlerRegistration, AcknowledgeHandle) -> Unit
object P2PMessagingHeaders {
object Type {
const val KEY = "corda_p2p_message_type"
const val SESSION_INIT_VALUE = "session_init"
}
}

View File

@ -223,6 +223,7 @@ class MessagingExecutor(
if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) {
putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis)
}
message.additionalHeaders.forEach { key, value -> putStringProperty(key, value) }
}
}

View File

@ -14,19 +14,31 @@ import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.serialization.serialize
import net.corda.core.utilities.*
import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import net.corda.node.VersionInfo
import net.corda.node.internal.LifecycleSupport
import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.bridging.BridgeEntry
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -36,11 +48,16 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ServerLocator
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
import rx.Observable
import rx.Subscription
import rx.subjects.PublishSubject
import java.security.PublicKey
import java.time.Instant
import java.util.*
@ -82,7 +99,7 @@ import javax.persistence.Lob
@ThreadSafe
class P2PMessagingClient(val config: NodeConfiguration,
private val versionInfo: VersionInfo,
serverAddress: NetworkHostAndPort,
private val serverAddress: NetworkHostAndPort,
private val myIdentity: PublicKey,
private val serviceIdentity: PublicKey?,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
@ -90,8 +107,10 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val networkMap: NetworkMapCacheInternal,
private val metricRegistry: MetricRegistry,
advertisedAddress: NetworkHostAndPort = serverAddress,
maxMessageSize: Int
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver {
private val maxMessageSize: Int,
private val isDrainingModeOn: () -> Boolean,
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable {
companion object {
private val log = contextLogger()
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
@ -105,7 +124,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
val senderUUID = SimpleString("sender-uuid")
val senderSeqNo = SimpleString("send-seq-no")
private val messageMaxRetryCount: Int = 3
private const val messageMaxRetryCount: Int = 3
fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> {
return PersistentMap(
@ -127,18 +146,25 @@ class P2PMessagingClient(val config: NodeConfiguration,
)
}
private class NodeClientMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId, override val senderUUID: String?) : Message {
private class NodeClientMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId, override val senderUUID: String?, override val additionalHeaders: Map<String, String>) : Message {
override val debugTimestamp: Instant = Instant.now()
override fun toString() = "$topic#${String(data.bytes)}"
}
}
private class InnerState {
var started = false
var running = false
var p2pConsumer: ClientConsumer? = null
var serviceConsumer: ClientConsumer? = null
var eventsSubscription: Subscription? = null
var p2pConsumer: P2PMessagingConsumer? = null
var locator: ServerLocator? = null
var producer: ClientProducer? = null
var producerSession: ClientSession? = null
var bridgeSession: ClientSession? = null
var bridgeNotifyConsumer: ClientConsumer? = null
var networkChangeSubscription: Subscription? = null
fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message)
}
private val messagesToRedeliver = database.transaction {
@ -152,14 +178,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
private val artemis = ArtemisMessagingClient(
config = config,
serverAddress = serverAddress,
maxMessageSize = maxMessageSize,
autoCommitSends = false,
autoCommitAcks = false,
confirmationWindowSize = config.enterpriseConfiguration.tuning.p2pConfirmationWindowSize
)
private val state = ThreadBox(InnerState())
private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
@ -185,28 +203,46 @@ class P2PMessagingClient(val config: NodeConfiguration,
fun start() {
state.locked {
val started = artemis.start()
val session = started.session
val inbox = RemoteInboxAddress(myIdentity).queueName
val inboxes = mutableListOf(inbox)
started = true
log.info("Connecting to message broker: $serverAddress")
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config)
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted.
connectionTTL = -1
clientFailureCheckPeriod = -1
minLargeMessageSize = maxMessageSize
isUseGlobalPools = nodeSerializationEnv != null
}
val sessionFactory = locator!!.createSessionFactory()
// Login using the node username. The broker will authenticate us as its node (as opposed to another peer)
// using our TLS certificate.
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
// size of 1MB is acknowledged.
val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER, false, true, true, locator!!.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) }
producerSession = createNewSession()
bridgeSession = createNewSession()
producerSession!!.start()
bridgeSession!!.start()
val inboxes = mutableSetOf<String>()
// Create a queue, consumer and producer for handling P2P network messages.
createQueueIfAbsent(inbox)
p2pConsumer = session.createConsumer(inbox)
if (serviceIdentity != null) {
val serviceAddress = RemoteInboxAddress(serviceIdentity).queueName
inboxes += serviceAddress
createQueueIfAbsent(serviceAddress)
val serviceHandler = session.createConsumer(serviceAddress)
serviceHandler.setMessageHandler { msg ->
val message: ReceivedMessage? = artemisToCordaMessage(msg)
if (message != null)
deliver(msg, message)
}
// Create a general purpose producer.
producer = producerSession!!.createProducer()
inboxes += RemoteInboxAddress(myIdentity).queueName
serviceIdentity?.let {
inboxes += RemoteInboxAddress(it).queueName
}
inboxes.forEach { createQueueIfAbsent(it, producerSession!!) }
p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents)
val messagingExecutor = MessagingExecutor(
session,
started.producer,
producerSession!!,
producer!!,
versionInfo,
this@P2PMessagingClient,
metricRegistry,
@ -216,8 +252,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
this@P2PMessagingClient.messagingExecutor = messagingExecutor
messagingExecutor.start()
registerBridgeControl(session, inboxes)
enumerateBridges(session, inboxes)
registerBridgeControl(bridgeSession!!, inboxes.toList())
enumerateBridges(bridgeSession!!, inboxes.toList())
}
resumeMessageRedelivery()
@ -237,7 +273,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
log.info(notifyMessage.toString())
when (notifyMessage) {
is BridgeControl.BridgeToNodeSnapshotRequest -> enumerateBridges(session, inboxes)
else -> log.error("Unexpected Bridge Control message type on notify topc $notifyMessage")
else -> log.error("Unexpected Bridge Control message type on notify topic $notifyMessage")
}
msg.acknowledge()
}
@ -245,22 +281,24 @@ class P2PMessagingClient(val config: NodeConfiguration,
networkChangeSubscription = networkMap.changed.subscribe { updateBridgesOnNetworkChange(it) }
}
private fun sendBridgeControl(message: BridgeControl) {
val client = artemis.started!!
val controlPacket = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val artemisMessage = client.session.createMessage(false)
artemisMessage.writeBodyBufferBytes(controlPacket)
client.producer.send(BRIDGE_CONTROL, artemisMessage)
client.session.commit()
private fun sendBridgeControl(message: BridgeControl) {
state.locked {
val controlPacket = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val artemisMessage = producerSession!!.createMessage(false)
artemisMessage.writeBodyBufferBytes(controlPacket)
sendMessage(BRIDGE_CONTROL, artemisMessage)
}
}
private fun updateBridgesOnNetworkChange(change: NetworkMapCache.MapChange) {
log.info("Updating bridges on network map change: ${change.node}")
fun gatherAddresses(node: NodeInfo): Sequence<BridgeEntry> {
return node.legalIdentitiesAndCerts.map {
val messagingAddress = NodeAddress(it.party.owningKey, node.addresses.first())
BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name })
}.filter { artemis.started!!.session.queueQuery(SimpleString(it.queueName)).isExists }.asSequence()
return state.locked {
node.legalIdentitiesAndCerts.map {
val messagingAddress = NodeAddress(it.party.owningKey, node.addresses.first())
BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name })
}.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists }.asSequence()
}
}
fun deployBridges(node: NodeInfo) {
@ -319,39 +357,31 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val shutdownLatch = CountDownLatch(1)
private fun processMessage(consumer: ClientConsumer): Boolean {
// Two possibilities here:
//
// 1. We block waiting for a message and the consumer is closed in another thread. In this case
// receive returns null and we break out of the loop.
// 2. We receive a message and process it, and stop() is called during delivery. In this case,
// calling receive will throw and we break out of the loop.
//
// It's safe to call into receive simultaneous with other threads calling send on a producer.
val artemisMessage: ClientMessage = try {
consumer.receive()
} catch (e: ActiveMQObjectClosedException) {
null
} ?: return false
deliver(artemisMessage)
return true
}
/**
* Starts the p2p event loop: this method only returns once [stop] has been called.
*/
fun run() {
val latch = CountDownLatch(1)
try {
val consumer = state.locked {
check(artemis.started != null) { "start must be called first" }
check(started) { "start must be called first" }
check(!running) { "run can't be called twice" }
running = true
// If it's null, it means we already called stop, so return immediately.
p2pConsumer ?: return
if (p2pConsumer == null) {
return
}
eventsSubscription = p2pConsumer!!.messages
.doOnError { error -> throw error }
.doOnNext { message -> deliver(message) }
// this `run()` method is semantically meant to block until the message consumption runs, hence the latch here
.doOnCompleted(latch::countDown)
.doOnError { error -> throw error }
.subscribe()
p2pConsumer!!
}
while (processMessage(consumer)) { }
consumer.start()
latch.await()
} finally {
shutdownLatch.countDown()
}
@ -389,51 +419,51 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val message: ClientMessage) : ReceivedMessage {
override val data: ByteSequence by lazy { OpaqueBytes(ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }) }
override val debugTimestamp: Instant get() = Instant.ofEpochMilli(message.timestamp)
override val additionalHeaders: Map<String, String> = emptyMap()
override fun toString() = "$topic#$data"
}
internal fun deliver(artemisMessage: ClientMessage) {
val message: ReceivedMessage? = artemisToCordaMessage(artemisMessage)
if (message != null)
deliver(artemisMessage, message)
artemisToCordaMessage(artemisMessage)?.let { cordaMessage ->
if (!deduplicator.isDuplicate(cordaMessage)) {
deliver(cordaMessage, artemisMessage)
} else {
log.trace { "Discard duplicate message ${cordaMessage.uniqueMessageId} for ${cordaMessage.topic}" }
}
}
}
private fun deliver(artemisMessage: ClientMessage, msg: ReceivedMessage) {
private fun deliver(msg: ReceivedMessage, artemisMessage: ClientMessage) {
state.checkNotLocked()
val deliverTo = handlers[msg.topic]
try {
// This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will
// be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
// directly in order to ensure that we have the features of the AffinityExecutor class throughout
// the bulk of the codebase and other non-messaging jobs can be scheduled onto the server executor
// easily.
//
// Note that handlers may re-enter this class. We aren't holding any locks and methods like
// start/run/stop have re-entrancy assertions at the top, so it is OK.
if (deliverTo != null) {
if (deduplicator.isDuplicate(msg)) {
log.trace { "Discard duplicate message ${msg.uniqueMessageId} for ${msg.topic}" }
return
}
val acknowledgeHandle = object : AcknowledgeHandle {
override fun persistDeduplicationId() {
deduplicator.persistDeduplicationId(msg)
}
// ACKing a message calls back into the session which isn't thread safe, so we have to ensure it
// doesn't collide with a send here. Note that stop() could have been called whilst we were
// processing a message but if so, it'll be parked waiting for us to count down the latch, so
// the session itself is still around and we can still ack messages as a result.
override fun acknowledge() {
messagingExecutor!!.acknowledge(artemisMessage)
}
}
deliverTo(msg, HandlerRegistration(msg.topic, deliverTo), acknowledgeHandle)
} else {
log.warn("Received message ${msg.uniqueMessageId} for ${msg.topic} that doesn't have any registered handlers yet")
if (deliverTo != null) {
try {
deliverTo(msg, HandlerRegistration(msg.topic, deliverTo), acknowledgeHandleFor(artemisMessage, msg))
} catch (e: Exception) {
log.error("Caught exception whilst executing message handler for ${msg.topic}", e)
}
} else {
log.warn("Received message ${msg.uniqueMessageId} for ${msg.topic} that doesn't have any registered handlers yet")
}
}
private fun acknowledgeHandleFor(artemisMessage: ClientMessage, cordaMessage: ReceivedMessage): AcknowledgeHandle {
return object : AcknowledgeHandle {
override fun persistDeduplicationId() {
deduplicator.persistDeduplicationId(cordaMessage)
}
// ACKing a message calls back into the session which isn't thread safe, so we have to ensure it
// doesn't collide with a send here. Note that stop() could have been called whilst we were
// processing a message but if so, it'll be parked waiting for us to count down the latch, so
// the session itself is still around and we can still ack messages as a result.
override fun acknowledge() {
messagingExecutor!!.acknowledge(artemisMessage)
}
} catch (e: Exception) {
log.error("Caught exception whilst executing message handler for ${msg.topic}", e)
}
}
@ -446,30 +476,24 @@ class P2PMessagingClient(val config: NodeConfiguration,
fun stop() {
val running = state.locked {
// We allow stop() to be called without a run() in between, but it must have at least been started.
check(artemis.started != null)
check(started)
val prevRunning = running
running = false
networkChangeSubscription?.unsubscribe()
val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice")
try {
c.close()
} catch (e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
}
try {
bridgeNotifyConsumer!!.close()
} catch (e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
}
require(p2pConsumer != null, {"stop can't be called twice"})
require(producer != null, {"stop can't be called twice"})
close(p2pConsumer)
p2pConsumer = null
val s = serviceConsumer
try {
s?.close()
} catch (e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
}
serviceConsumer = null
close(producer)
producer = null
producerSession!!.commit()
close(bridgeNotifyConsumer)
knownQueues.clear()
eventsSubscription?.unsubscribe()
eventsSubscription = null
prevRunning
}
if (running && !nodeExecutor.isOnThread) {
@ -478,13 +502,21 @@ class P2PMessagingClient(val config: NodeConfiguration,
}
// Only first caller to gets running true to protect against double stop, which seems to happen in some integration tests.
messagingExecutor?.close()
if (running) {
state.locked {
artemis.stop()
}
state.locked {
locator?.close()
}
}
private fun close(target: AutoCloseable?) {
try {
target?.close()
} catch (ignored: ActiveMQObjectClosedException) {
// swallow
}
}
override fun close() = stop()
@Suspendable
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) {
messagingExecutor!!.send(message, target)
@ -495,7 +527,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
scheduledMessageRedeliveries[it] = nodeExecutor.schedule({
sendWithRetry(0, message, target, retryId)
}, messageRedeliveryDelaySeconds, TimeUnit.SECONDS)
}
}
@ -544,15 +575,16 @@ class P2PMessagingClient(val config: NodeConfiguration,
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
// broker's job to route the message to the target's P2P queue.
val internalTargetQueue = (address as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address")
createQueueIfAbsent(internalTargetQueue)
state.locked {
createQueueIfAbsent(internalTargetQueue, producerSession!!)
}
internalTargetQueue
}
}
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
private fun createQueueIfAbsent(queueName: String) {
private fun createQueueIfAbsent(queueName: String, session: ClientSession) {
if (!knownQueues.contains(queueName)) {
val session = artemis.started!!.session
val queueQuery = session.queueQuery(SimpleString(queueName))
if (!queueQuery.isExists) {
log.info("Create fresh queue $queueName bound on same address")
@ -587,8 +619,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
handlers.remove(registration.topic)
}
override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId): Message {
return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId, deduplicator.ourSenderUUID)
override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId, additionalHeaders: Map<String, String>): Message {
return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId, deduplicator.ourSenderUUID, additionalHeaders)
}
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
@ -598,3 +631,78 @@ class P2PMessagingClient(val config: NodeConfiguration,
}
}
}
private class P2PMessagingConsumer(
queueNames: Set<String>,
createSession: () -> ClientSession,
private val isDrainingModeOn: () -> Boolean,
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>) : LifecycleSupport {
private companion object {
private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}='${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
private const val existingSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
}
private var startedFlag = false
val messages: PublishSubject<ClientMessage> = PublishSubject.create<ClientMessage>()
private var initialConsumer = multiplex(queueNames, createSession, initialSessionMessages)
private var existingConsumer = multiplex(queueNames, createSession, existingSessionMessages)
private val subscriptions = mutableSetOf<Subscription>()
override fun start() {
synchronized(this) {
require(!startedFlag)
drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { pauseInitial() }.subscribe()
drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { resumeInitial() }.subscribe()
subscriptions += initialConsumer.messages.doOnNext(messages::onNext).subscribe()
subscriptions += existingConsumer.messages.doOnNext(messages::onNext).subscribe()
if (!isDrainingModeOn()) {
initialConsumer.start()
}
existingConsumer.start()
startedFlag = true
}
}
override fun stop() {
synchronized(this) {
if (startedFlag) {
initialConsumer.stop()
existingConsumer.stop()
subscriptions.forEach(Subscription::unsubscribe)
subscriptions.clear()
startedFlag = false
}
messages.onCompleted()
}
}
override val started: Boolean
get() = startedFlag
private fun pauseInitial() {
if (initialConsumer.started && initialConsumer.connected) {
initialConsumer.disconnect()
}
}
private fun resumeInitial() {
if(!initialConsumer.started) {
initialConsumer.start()
}
if (!initialConsumer.connected) {
initialConsumer.connect()
}
}
private fun Pair<Boolean, Boolean>.switchedOff() = first && !second
private fun Pair<Boolean, Boolean>.switchedOn() = !first && second
}

View File

@ -29,10 +29,10 @@ class NodePropertiesPersistentStore(readPhysicalNodeId: () -> String, persistenc
@Table(name = "${NODE_DATABASE_PREFIX}properties")
class DBNodeProperty(
@Id
@Column(name = "key")
@Column(name = "property_key")
val key: String = "",
@Column(name = "value")
@Column(name = "property_value")
var value: String? = ""
)
}

View File

@ -10,6 +10,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.AcknowledgeHandle
import net.corda.node.services.messaging.P2PMessagingHeaders
import net.corda.node.services.messaging.ReceivedMessage
import java.io.NotSerializableException
@ -50,7 +51,7 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
@Suspendable
override fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: DeduplicationId) {
log.trace { "Sending message $deduplicationId $message to party $party" }
val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId)
val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId, message.additionalHeaders())
val partyInfo = serviceHub.networkMapCache.getPartyInfo(party) ?: throw IllegalArgumentException("Don't know about $party")
val address = serviceHub.networkService.getAddressOfParty(partyInfo)
val sequenceKey = when (message) {
@ -60,6 +61,13 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
serviceHub.networkService.send(networkMessage, address, sequenceKey = sequenceKey)
}
private fun SessionMessage.additionalHeaders(): Map<String, String> {
return when (this) {
is InitialSessionMessage -> mapOf(P2PMessagingHeaders.Type.KEY to P2PMessagingHeaders.Type.SESSION_INIT_VALUE)
else -> emptyMap()
}
}
private fun serializeSessionMessage(message: SessionMessage): SerializedBytes<SessionMessage> {
return try {
message.serialize()

View File

@ -5,5 +5,6 @@
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<include file="migration/node-info.changelog-init.xml"/>
<include file="migration/node-info.changelog-v1.xml"/>
</databaseChangeLog>

View File

@ -0,0 +1,14 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<changeSet author="R3.Corda" id="key/value node properties table">
<createTable tableName="node_properties">
<column name="property_key" type="NVARCHAR(255)">
<constraints nullable="false"/>
</column>
<column name="property_value" type="NVARCHAR(255)"/>
</createTable>
</changeSet>
</databaseChangeLog>

View File

@ -20,7 +20,6 @@ import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNodeParameters
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.startFlow

View File

@ -6,20 +6,29 @@ import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.crypto.generateKeyPair
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.configureDatabase
import net.corda.node.services.config.*
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.EnterpriseConfiguration
import net.corda.node.services.config.MutualExclusionConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.freeLocalHostAndPort
import net.corda.testing.core.freePort
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
@ -260,7 +269,6 @@ class ArtemisMessagingTest {
}
}
private fun startNodeMessagingClient() {
messagingClient!!.start()
}
@ -296,7 +304,9 @@ class ArtemisMessagingTest {
database,
networkMapCache,
MetricRegistry(),
maxMessageSize = maxMessageSize).apply {
maxMessageSize = maxMessageSize,
isDrainingModeOn = { false },
drainingModeWasChangedEvents = PublishSubject.create()).apply {
config.configureWithDevSSLCertificate()
messagingClient = this
}

View File

@ -1,6 +1,5 @@
package net.corda.testing.node
import net.corda.core.CordaInternal
import net.corda.core.DoNotImplement
import net.corda.core.crypto.CompositeKey
import net.corda.core.identity.CordaX500Name
@ -19,7 +18,12 @@ import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.trace
import net.corda.node.services.messaging.*
import net.corda.node.services.messaging.AcknowledgeHandle
import net.corda.node.services.messaging.Message
import net.corda.node.services.messaging.MessageHandler
import net.corda.node.services.messaging.MessageHandlerRegistration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -260,6 +264,9 @@ class InMemoryMessagingNetwork private constructor(
override val uniqueMessageId: DeduplicationId,
override val debugTimestamp: Instant = Instant.now(),
override val senderUUID: String? = null) : Message {
override val additionalHeaders: Map<String, String> = emptyMap()
override fun toString() = "$topic#${String(data.bytes)}"
}
@ -270,7 +277,10 @@ class InMemoryMessagingNetwork private constructor(
override val debugTimestamp: Instant,
override val peer: CordaX500Name,
override val senderUUID: String? = null,
override val senderSeqNo: Long? = null) : ReceivedMessage
override val senderSeqNo: Long? = null) : ReceivedMessage {
override val additionalHeaders: Map<String, String> = emptyMap()
}
/**
* A [TestMessagingService] that provides a [MessagingService] abstraction that also contains the ability to
@ -368,7 +378,7 @@ class InMemoryMessagingNetwork private constructor(
override fun cancelRedelivery(retryId: Long) {}
/** Returns the given (topic & session, data) pair as a newly created message object. */
override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId): Message {
override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId, additionalHeaders: Map<String, String>): Message {
return InMemoryMessage(topic, OpaqueBytes(data), deduplicationId)
}