mirror of
https://github.com/corda/corda.git
synced 2024-12-28 00:38:55 +00:00
Merge remote-tracking branch 'remotes/open/master' into mnesbit-merge-20180307
# Conflicts: # node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt # node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt # node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt
This commit is contained in:
parent
2653da9f12
commit
b6831eed6a
@ -15,11 +15,8 @@ import com.nhaarman.mockito_kotlin.whenever
|
|||||||
import net.corda.core.crypto.toStringShort
|
import net.corda.core.crypto.toStringShort
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.services.config.*
|
|
||||||
import net.corda.core.utilities.loggerFor
|
import net.corda.core.utilities.loggerFor
|
||||||
import net.corda.node.services.config.CertChainPolicyConfig
|
import net.corda.node.services.config.*
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
|
||||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
|
||||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
|
@ -18,13 +18,12 @@ import net.corda.core.utilities.contextLogger
|
|||||||
import net.corda.core.utilities.trace
|
import net.corda.core.utilities.trace
|
||||||
import net.corda.node.VersionInfo
|
import net.corda.node.VersionInfo
|
||||||
import net.corda.node.services.statemachine.FlowMessagingImpl
|
import net.corda.node.services.statemachine.FlowMessagingImpl
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException
|
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||||
import java.util.*
|
|
||||||
import java.util.concurrent.ArrayBlockingQueue
|
import java.util.concurrent.ArrayBlockingQueue
|
||||||
import java.util.concurrent.ExecutionException
|
import java.util.concurrent.ExecutionException
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
@ -161,18 +160,18 @@ class MessagingExecutor(
|
|||||||
|
|
||||||
internal fun cordaToArtemisMessage(message: Message): ClientMessage? {
|
internal fun cordaToArtemisMessage(message: Message): ClientMessage? {
|
||||||
return session.createMessage(true).apply {
|
return session.createMessage(true).apply {
|
||||||
putStringProperty(P2PMessagingClient.cordaVendorProperty, cordaVendor)
|
putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor)
|
||||||
putStringProperty(P2PMessagingClient.releaseVersionProperty, releaseVersion)
|
putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion)
|
||||||
putIntProperty(P2PMessagingClient.platformVersionProperty, versionInfo.platformVersion)
|
putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion)
|
||||||
putStringProperty(P2PMessagingClient.topicProperty, SimpleString(message.topic))
|
putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic))
|
||||||
sendMessageSizeMetric.update(message.data.bytes.size)
|
sendMessageSizeMetric.update(message.data.bytes.size)
|
||||||
writeBodyBufferBytes(message.data.bytes)
|
writeBodyBufferBytes(message.data.bytes)
|
||||||
// Use the magic deduplication property built into Artemis as our message identity too
|
// Use the magic deduplication property built into Artemis as our message identity too
|
||||||
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString))
|
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString))
|
||||||
// If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut.
|
// If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut.
|
||||||
if (ourSenderUUID == message.senderUUID) {
|
if (ourSenderUUID == message.senderUUID) {
|
||||||
putStringProperty(P2PMessagingClient.senderUUID, SimpleString(ourSenderUUID))
|
putStringProperty(P2PMessagingHeaders.senderUUID, SimpleString(ourSenderUUID))
|
||||||
putLongProperty(P2PMessagingClient.senderSeqNo, ourSenderSeqNo.getAndIncrement())
|
putLongProperty(P2PMessagingHeaders.senderSeqNo, ourSenderSeqNo.getAndIncrement())
|
||||||
}
|
}
|
||||||
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
|
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
|
||||||
if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) {
|
if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) {
|
||||||
|
@ -27,11 +27,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
|||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.ByteSequence
|
import net.corda.core.utilities.*
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
|
||||||
import net.corda.core.utilities.OpaqueBytes
|
|
||||||
import net.corda.core.utilities.contextLogger
|
|
||||||
import net.corda.core.utilities.trace
|
|
||||||
import net.corda.node.VersionInfo
|
import net.corda.node.VersionInfo
|
||||||
import net.corda.node.internal.LifecycleSupport
|
import net.corda.node.internal.LifecycleSupport
|
||||||
import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex
|
import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex
|
||||||
@ -43,14 +39,11 @@ import net.corda.node.utilities.PersistentMap
|
|||||||
import net.corda.nodeapi.ArtemisTcpTransport
|
import net.corda.nodeapi.ArtemisTcpTransport
|
||||||
import net.corda.nodeapi.ConnectionDirection
|
import net.corda.nodeapi.ConnectionDirection
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
|
|
||||||
import net.corda.nodeapi.internal.bridging.BridgeControl
|
import net.corda.nodeapi.internal.bridging.BridgeControl
|
||||||
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
@ -60,12 +53,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
|||||||
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
|
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
import org.apache.activemq.artemis.api.core.client.*
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
|
||||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
@ -399,8 +387,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
val platformVersion = message.required(P2PMessagingHeaders.platformVersionProperty) { getIntProperty(it) }
|
val platformVersion = message.required(P2PMessagingHeaders.platformVersionProperty) { getIntProperty(it) }
|
||||||
// Use the magic deduplication property built into Artemis as our message identity too
|
// Use the magic deduplication property built into Artemis as our message identity too
|
||||||
val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) }
|
val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) }
|
||||||
val receivedSenderUUID = message.getStringProperty(senderUUID)
|
val receivedSenderUUID = message.getStringProperty(P2PMessagingHeaders.senderUUID)
|
||||||
val receivedSenderSeqNo = if (message.containsProperty(senderSeqNo)) message.getLongProperty(senderSeqNo) else null
|
val receivedSenderSeqNo = if (message.containsProperty(P2PMessagingHeaders.senderSeqNo)) message.getLongProperty(P2PMessagingHeaders.senderSeqNo) else null
|
||||||
log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo" }
|
log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo" }
|
||||||
|
|
||||||
return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, message)
|
return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, message)
|
||||||
|
@ -20,8 +20,8 @@ import net.corda.core.utilities.contextLogger
|
|||||||
import net.corda.core.utilities.trace
|
import net.corda.core.utilities.trace
|
||||||
import net.corda.node.services.api.ServiceHubInternal
|
import net.corda.node.services.api.ServiceHubInternal
|
||||||
import net.corda.node.services.messaging.AcknowledgeHandle
|
import net.corda.node.services.messaging.AcknowledgeHandle
|
||||||
import net.corda.node.services.messaging.P2PMessagingHeaders
|
|
||||||
import net.corda.node.services.messaging.ReceivedMessage
|
import net.corda.node.services.messaging.ReceivedMessage
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||||
import java.io.NotSerializableException
|
import java.io.NotSerializableException
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user