mirror of
https://github.com/corda/corda.git
synced 2024-12-18 12:46:29 +00:00
Merge pull request #7770 from corda/adel/ENT-12008-os
ENT-12008: Upgrade artemis and resolved deprecated methods.
This commit is contained in:
commit
d6c514c86e
@ -552,7 +552,7 @@ class RPCStabilityTests {
|
||||
// Construct an RPC session manually so that we can hang in the message handler
|
||||
val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}"
|
||||
val session = startArtemisSession(server.broker.hostAndPort!!)
|
||||
session.createQueue(QueueConfiguration(myQueue)
|
||||
session.createQueue(QueueConfiguration.of(myQueue)
|
||||
.setRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType())
|
||||
.setAddress(myQueue)
|
||||
.setTemporary(true)
|
||||
@ -569,7 +569,7 @@ class RPCStabilityTests {
|
||||
|
||||
val message = session.createMessage(false)
|
||||
val request = RPCApi.ClientToServer.RpcRequest(
|
||||
clientAddress = SimpleString(myQueue),
|
||||
clientAddress = SimpleString.of(myQueue),
|
||||
methodName = SlowConsumerRPCOps::streamAtInterval.name,
|
||||
serialisedArguments = listOf(100.millis, 1234).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT),
|
||||
replyId = Trace.InvocationId.newInstance(),
|
||||
@ -593,7 +593,7 @@ class RPCStabilityTests {
|
||||
// Construct an RPC client session manually
|
||||
val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}"
|
||||
val session = startArtemisSession(server.broker.hostAndPort!!)
|
||||
session.createQueue(QueueConfiguration(myQueue)
|
||||
session.createQueue(QueueConfiguration.of(myQueue)
|
||||
.setRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType())
|
||||
.setAddress(myQueue)
|
||||
.setTemporary(true)
|
||||
@ -612,7 +612,7 @@ class RPCStabilityTests {
|
||||
|
||||
val message = session.createMessage(false)
|
||||
val request = RPCApi.ClientToServer.RpcRequest(
|
||||
clientAddress = SimpleString(myQueue),
|
||||
clientAddress = SimpleString.of(myQueue),
|
||||
methodName = DummyOps::protocolVersion.name,
|
||||
serialisedArguments = emptyList<Any>().serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT),
|
||||
replyId = Trace.InvocationId.newInstance(),
|
||||
|
@ -652,9 +652,9 @@ internal class RPCClientProxyHandler(
|
||||
producerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
rpcProducer = producerSession!!.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, 16384)
|
||||
clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$rpcUsername.${random63BitValue()}")
|
||||
clientAddress = SimpleString.of("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$rpcUsername.${random63BitValue()}")
|
||||
log.debug { "Client address: $clientAddress" }
|
||||
consumerSession!!.createQueue(QueueConfiguration(clientAddress).setAddress(clientAddress).setRoutingType(RoutingType.ANYCAST)
|
||||
consumerSession!!.createQueue(QueueConfiguration.of(clientAddress).setAddress(clientAddress).setRoutingType(RoutingType.ANYCAST)
|
||||
.setTemporary(true).setDurable(false))
|
||||
rpcConsumer = consumerSession!!.createConsumer(clientAddress)
|
||||
rpcConsumer!!.setMessageHandler(this::artemisMessageHandler)
|
||||
|
@ -45,7 +45,7 @@ commonsTextVersion=1.10.0
|
||||
# We must configure it manually to use the latest capsule version.
|
||||
capsuleVersion=1.0.4_r3
|
||||
asmVersion=9.5
|
||||
artemisVersion=2.32.0
|
||||
artemisVersion=2.35.0
|
||||
# TODO Upgrade Jackson only when corda is using kotlin 1.3.10
|
||||
jacksonVersion=2.17.0
|
||||
jacksonKotlinVersion=2.17.0
|
||||
|
@ -42,18 +42,18 @@ class ArtemisMessagingComponent {
|
||||
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
|
||||
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
|
||||
// confusion.
|
||||
val topicProperty = SimpleString("platform-topic")
|
||||
val cordaVendorProperty = SimpleString("corda-vendor")
|
||||
val releaseVersionProperty = SimpleString("release-version")
|
||||
val platformVersionProperty = SimpleString("platform-version")
|
||||
val senderUUID = SimpleString("sender-uuid")
|
||||
val senderSeqNo = SimpleString("send-seq-no")
|
||||
val topicProperty = SimpleString.of("platform-topic")
|
||||
val cordaVendorProperty = SimpleString.of("corda-vendor")
|
||||
val releaseVersionProperty = SimpleString.of("release-version")
|
||||
val platformVersionProperty = SimpleString.of("platform-version")
|
||||
val senderUUID = SimpleString.of("sender-uuid")
|
||||
val senderSeqNo = SimpleString.of("send-seq-no")
|
||||
/**
|
||||
* In the operation mode where we have an out of process bridge we cannot correctly populate the Artemis validated user header
|
||||
* as the TLS does not terminate directly onto Artemis. We therefore use this internal only header to forward
|
||||
* the equivalent information from the Float.
|
||||
*/
|
||||
val bridgedCertificateSubject = SimpleString("sender-subject-name")
|
||||
val bridgedCertificateSubject = SimpleString.of("sender-subject-name")
|
||||
|
||||
object Type {
|
||||
const val KEY = "corda_p2p_message_type"
|
||||
|
@ -88,7 +88,7 @@ class BridgeControlListener(private val keyStore: CertificateStore,
|
||||
registerBridgeControlListener(artemisSession)
|
||||
registerBridgeDuplicateChecker(artemisSession)
|
||||
// Attempt to read available inboxes directly from Artemis before requesting updates from connected nodes
|
||||
validInboundQueues.addAll(artemisSession.addressQuery(SimpleString("$P2P_PREFIX#")).queueNames.map { it.toString() })
|
||||
validInboundQueues.addAll(artemisSession.addressQuery(SimpleString.of("$P2P_PREFIX#")).queueNames.map { it.toString() })
|
||||
log.info("Found inboxes: $validInboundQueues")
|
||||
if (active) {
|
||||
_activeChange.onNext(true)
|
||||
@ -107,7 +107,7 @@ class BridgeControlListener(private val keyStore: CertificateStore,
|
||||
private fun registerBridgeControlListener(artemisSession: ClientSession) {
|
||||
try {
|
||||
artemisSession.createQueue(
|
||||
QueueConfiguration(bridgeControlQueue).setAddress(BRIDGE_CONTROL).setRoutingType(RoutingType.MULTICAST)
|
||||
QueueConfiguration.of(bridgeControlQueue).setAddress(BRIDGE_CONTROL).setRoutingType(RoutingType.MULTICAST)
|
||||
.setTemporary(true).setDurable(false))
|
||||
} catch (ex: ActiveMQQueueExistsException) {
|
||||
// Ignore if there is a queue still not cleaned up
|
||||
@ -129,7 +129,7 @@ class BridgeControlListener(private val keyStore: CertificateStore,
|
||||
private fun registerBridgeDuplicateChecker(artemisSession: ClientSession) {
|
||||
try {
|
||||
artemisSession.createQueue(
|
||||
QueueConfiguration(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST)
|
||||
QueueConfiguration.of(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST)
|
||||
.setTemporary(true).setDurable(false))
|
||||
} catch (ex: ActiveMQQueueExistsException) {
|
||||
// Ignore if there is a queue still not cleaned up
|
||||
@ -189,11 +189,11 @@ class BridgeControlListener(private val keyStore: CertificateStore,
|
||||
}
|
||||
|
||||
private fun validateInboxQueueName(queueName: String): Boolean {
|
||||
return queueName.startsWith(P2P_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists
|
||||
return queueName.startsWith(P2P_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString.of(queueName)).isExists
|
||||
}
|
||||
|
||||
private fun validateBridgingQueueName(queueName: String): Boolean {
|
||||
return queueName.startsWith(PEERS_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists
|
||||
return queueName.startsWith(PEERS_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString.of(queueName)).isExists
|
||||
}
|
||||
|
||||
private fun processControlMessage(msg: ClientMessage) {
|
||||
|
@ -136,7 +136,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore,
|
||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
logDebugWithMDC { "Loopback Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty(MESSAGE_ID_KEY)}" }
|
||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||
producer?.send(SimpleString(peerInbox), artemisMessage) { artemisMessage.individualAcknowledge() }
|
||||
producer?.send(SimpleString.of(peerInbox), artemisMessage) { artemisMessage.individualAcknowledge() }
|
||||
bridgeMetricsService?.let { metricsService ->
|
||||
val properties = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.mapNotNull { key ->
|
||||
if (artemisMessage.containsProperty(key)) {
|
||||
|
@ -70,7 +70,7 @@ class RoundTripObservableSerializerTests {
|
||||
subscriptionMap(id),
|
||||
clientAddressToObservables = ConcurrentHashMap(),
|
||||
deduplicationIdentity = "thisIsATest",
|
||||
clientAddress = SimpleString("clientAddress"))
|
||||
clientAddress = SimpleString.of("clientAddress"))
|
||||
|
||||
val serverSerializer = serializationScheme.rpcServerSerializerFactory(serverObservableContext)
|
||||
|
||||
|
@ -49,7 +49,7 @@ class RpcServerObservableSerializerTests {
|
||||
subscriptionMap(),
|
||||
clientAddressToObservables = ConcurrentHashMap(),
|
||||
deduplicationIdentity = "thisIsATest",
|
||||
clientAddress = SimpleString("clientAddress"))
|
||||
clientAddress = SimpleString.of("clientAddress"))
|
||||
|
||||
val newContext = RpcServerObservableSerializer.createContext(serializationContext, observable)
|
||||
|
||||
@ -65,7 +65,7 @@ class RpcServerObservableSerializerTests {
|
||||
subscriptionMap(),
|
||||
clientAddressToObservables = ConcurrentHashMap(),
|
||||
deduplicationIdentity = "thisIsATest",
|
||||
clientAddress = SimpleString(testClientAddress))
|
||||
clientAddress = SimpleString.of(testClientAddress))
|
||||
|
||||
val sf = SerializerFactoryBuilder.build(AllWhitelist, javaClass.classLoader).apply {
|
||||
register(RpcServerObservableSerializer())
|
||||
|
@ -62,7 +62,7 @@ class AMQPBridgeTest {
|
||||
putIntProperty(P2PMessagingHeaders.senderUUID, i)
|
||||
writeBodyBufferBytes("Test$i".toByteArray())
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString.of(UUID.randomUUID().toString()))
|
||||
}
|
||||
artemis.producer.send(sourceQueueName, artemisMessage)
|
||||
}
|
||||
@ -139,7 +139,7 @@ class AMQPBridgeTest {
|
||||
putIntProperty(P2PMessagingHeaders.senderUUID, 3)
|
||||
writeBodyBufferBytes("Test3".toByteArray())
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString.of(UUID.randomUUID().toString()))
|
||||
}
|
||||
artemis.producer.send(sourceQueueName, artemisMessage)
|
||||
|
||||
@ -224,7 +224,7 @@ class AMQPBridgeTest {
|
||||
if (sourceQueueName != null) {
|
||||
// Local queue for outgoing messages
|
||||
artemis.session.createQueue(
|
||||
QueueConfiguration(sourceQueueName).setRoutingType(RoutingType.ANYCAST).setAddress(sourceQueueName).setDurable(true))
|
||||
QueueConfiguration.of(sourceQueueName).setRoutingType(RoutingType.ANYCAST).setAddress(sourceQueueName).setDurable(true))
|
||||
bridgeManager.deployBridge(ALICE_NAME.toString(), sourceQueueName, listOf(amqpAddress), setOf(bob.name))
|
||||
}
|
||||
return Triple(artemisServer, artemisClient, bridgeManager)
|
||||
|
@ -499,7 +499,7 @@ class ArtemisServerRevocationTest : AbstractServerRevocationTest() {
|
||||
|
||||
val queueName = "${P2P_PREFIX}Test"
|
||||
artemisNode.client.started!!.session.createQueue(
|
||||
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setDurable(true)
|
||||
QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setDurable(true)
|
||||
)
|
||||
|
||||
val clientConnectionChangeStatus = client.waitForInitialConnectionAndCaptureChanges(expectedConnectedStatus)
|
||||
|
@ -374,7 +374,7 @@ class ProtonWrapperTests {
|
||||
assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal))
|
||||
val artemis = artemisClient.started!!
|
||||
val sendAddress = P2P_PREFIX + "Test"
|
||||
artemis.session.createQueue(QueueConfiguration("queue")
|
||||
artemis.session.createQueue(QueueConfiguration.of("queue")
|
||||
.setRoutingType(RoutingType.ANYCAST).setAddress(sendAddress).setDurable(true))
|
||||
val consumer = artemis.session.createConsumer("queue")
|
||||
val testData = "Test".toByteArray()
|
||||
@ -404,7 +404,7 @@ class ProtonWrapperTests {
|
||||
assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal))
|
||||
val artemis = artemisClient.started!!
|
||||
val sendAddress = P2P_PREFIX + "Test"
|
||||
artemis.session.createQueue(QueueConfiguration("queue")
|
||||
artemis.session.createQueue(QueueConfiguration.of("queue")
|
||||
.setRoutingType(RoutingType.ANYCAST).setAddress(sendAddress).setDurable(true))
|
||||
val consumer = artemis.session.createConsumer("queue")
|
||||
|
||||
|
@ -117,7 +117,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
|
||||
fun loginToRPCAndGetClientQueue(): String {
|
||||
loginToRPC(alice.node.configuration.rpcOptions.address, rpcUser)
|
||||
val clientQueueQuery = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${rpcUser.username}.*")
|
||||
val clientQueueQuery = SimpleString.of("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${rpcUser.username}.*")
|
||||
val client = clientTo(alice.node.configuration.rpcOptions.address)
|
||||
client.start(rpcUser.username, rpcUser.password, false)
|
||||
return client.session.addressQuery(clientQueueQuery).queueNames.single().toString()
|
||||
@ -131,7 +131,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
|
||||
fun assertTempQueueCreationAttackFails(queue: String) {
|
||||
assertAttackFails(queue, "CREATE_NON_DURABLE_QUEUE") {
|
||||
attacker.session.createQueue(QueueConfiguration(queue)
|
||||
attacker.session.createQueue(QueueConfiguration.of(queue)
|
||||
.setRoutingType(RoutingType.MULTICAST)
|
||||
.setAddress(queue)
|
||||
.setTemporary(true)
|
||||
@ -153,7 +153,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
val permission = if (durable) "CREATE_DURABLE_QUEUE" else "CREATE_NON_DURABLE_QUEUE"
|
||||
assertAttackFails(queue, permission) {
|
||||
attacker.session.createQueue(
|
||||
QueueConfiguration(queue).setAddress(queue).setRoutingType(RoutingType.MULTICAST).setDurable(durable))
|
||||
QueueConfiguration.of(queue).setAddress(queue).setRoutingType(RoutingType.MULTICAST).setDurable(durable))
|
||||
}
|
||||
// Double-check
|
||||
assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy {
|
||||
|
@ -169,7 +169,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
journalBufferTimeout_NIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio()
|
||||
journalBufferTimeout_AIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutAio()
|
||||
journalFileSize = maxMessageSize + JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB.
|
||||
managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS)
|
||||
managementNotificationAddress = SimpleString.of(NOTIFICATIONS_ADDRESS)
|
||||
|
||||
// JMX enablement
|
||||
if (config.jmxMonitoringHttpPort != null) {
|
||||
@ -189,7 +189,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
* 4. Verifiers. These are given read access to the verification request queue and write access to the response queue.
|
||||
*/
|
||||
private fun ConfigurationImpl.configureAddressSecurity(): Configuration {
|
||||
val nodeInternalRole = Role(NODE_P2P_ROLE, true, true, true, true, true, true, true, true, true, true)
|
||||
val nodeInternalRole = Role(NODE_P2P_ROLE, true, true, true, true, true, true, true, true, true, true, false, false)
|
||||
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
|
||||
securityRoles["$P2P_PREFIX#"] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
|
||||
securityInvalidationInterval = SECURITY_INVALIDATION_INTERVAL
|
||||
@ -200,7 +200,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false,
|
||||
deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false): Role {
|
||||
return Role(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue,
|
||||
deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue)
|
||||
deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue, false, false)
|
||||
}
|
||||
|
||||
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
|
||||
|
@ -33,8 +33,8 @@ class MessagingExecutor(
|
||||
val resolver: AddressToArtemisQueueResolver,
|
||||
val ourSenderUUID: String
|
||||
) {
|
||||
private val cordaVendor = SimpleString(versionInfo.vendor)
|
||||
private val releaseVersion = SimpleString(versionInfo.releaseVersion)
|
||||
private val cordaVendor = SimpleString.of(versionInfo.vendor)
|
||||
private val releaseVersion = SimpleString.of(versionInfo.releaseVersion)
|
||||
private val ourSenderSeqNo = AtomicLong()
|
||||
|
||||
private companion object {
|
||||
@ -50,7 +50,7 @@ class MessagingExecutor(
|
||||
"Send to: $mqAddress topic: ${message.topic} " +
|
||||
"sessionID: ${message.topic} id: ${message.uniqueMessageId}"
|
||||
}
|
||||
producer.send(SimpleString(mqAddress), artemisMessage)
|
||||
producer.send(SimpleString.of(mqAddress), artemisMessage)
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
@ -72,13 +72,13 @@ class MessagingExecutor(
|
||||
putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor)
|
||||
putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion)
|
||||
putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion)
|
||||
putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic))
|
||||
putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString.of(message.topic))
|
||||
writeBodyBufferBytes(message.data.bytes)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString))
|
||||
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.of(message.uniqueMessageId.toString))
|
||||
// If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut.
|
||||
if (ourSenderUUID == message.senderUUID) {
|
||||
putStringProperty(P2PMessagingHeaders.senderUUID, SimpleString(ourSenderUUID))
|
||||
putStringProperty(P2PMessagingHeaders.senderUUID, SimpleString.of(ourSenderUUID))
|
||||
putLongProperty(P2PMessagingHeaders.senderSeqNo, ourSenderSeqNo.getAndIncrement())
|
||||
}
|
||||
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
|
||||
|
@ -279,8 +279,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
|
||||
private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List<String>) {
|
||||
val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}"
|
||||
if (!session.queueQuery(SimpleString(bridgeNotifyQueue)).isExists) {
|
||||
session.createQueue(QueueConfiguration(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST)
|
||||
if (!session.queueQuery(SimpleString.of(bridgeNotifyQueue)).isExists) {
|
||||
session.createQueue(QueueConfiguration.of(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST)
|
||||
.setTemporary(true).setDurable(false))
|
||||
}
|
||||
val bridgeConsumer = session.createConsumer(bridgeNotifyQueue)
|
||||
@ -316,7 +316,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
node.legalIdentitiesAndCerts.map { partyAndCertificate ->
|
||||
val messagingAddress = NodeAddress(partyAndCertificate.party.owningKey)
|
||||
BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name }, serviceAddress = false)
|
||||
}.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists }.asSequence()
|
||||
}.filter { producerSession!!.queueQuery(SimpleString.of(it.queueName)).isExists }.asSequence()
|
||||
}
|
||||
}
|
||||
|
||||
@ -360,7 +360,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
val queues = session.addressQuery(SimpleString("$PEERS_PREFIX#")).queueNames
|
||||
val queues = session.addressQuery(SimpleString.of("$PEERS_PREFIX#")).queueNames
|
||||
knownQueues.clear()
|
||||
for (queue in queues) {
|
||||
val queueQuery = session.queueQuery(queue)
|
||||
@ -604,10 +604,10 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
sendBridgeCreateMessage()
|
||||
delayStartQueues -= queueName
|
||||
} else {
|
||||
val queueQuery = session.queueQuery(SimpleString(queueName))
|
||||
val queueQuery = session.queueQuery(SimpleString.of(queueName))
|
||||
if (!queueQuery.isExists) {
|
||||
log.info("Create fresh queue $queueName bound on same address")
|
||||
session.createQueue(QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName)
|
||||
session.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName)
|
||||
.setDurable(true).setAutoCreated(false)
|
||||
.setMaxConsumers(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers())
|
||||
.setPurgeOnNoConsumers(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers())
|
||||
|
@ -321,14 +321,14 @@ class RPCServer(
|
||||
require(notificationType == CoreNotificationType.BINDING_REMOVED.name){"Message contained notification type of $notificationType instead of expected ${CoreNotificationType.BINDING_REMOVED.name}"}
|
||||
val clientAddress = artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME)
|
||||
log.info("Detected RPC client disconnect on address $clientAddress, scheduling for reaping")
|
||||
invalidateClient(SimpleString(clientAddress))
|
||||
invalidateClient(SimpleString.of(clientAddress))
|
||||
}
|
||||
|
||||
private fun bindingAdditionArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
lifeCycle.requireState(State.STARTED)
|
||||
val notificationType = artemisMessage.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
|
||||
require(notificationType == CoreNotificationType.BINDING_ADDED.name){"Message contained notification type of $notificationType instead of expected ${CoreNotificationType.BINDING_ADDED.name}"}
|
||||
val clientAddress = SimpleString(artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME))
|
||||
val clientAddress = SimpleString.of(artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME))
|
||||
log.debug("RPC client queue created on address $clientAddress")
|
||||
|
||||
val buffer = stopBuffering(clientAddress)
|
||||
|
@ -39,7 +39,7 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
|
||||
|
||||
queueConfigs = queueConfigurations()
|
||||
|
||||
managementNotificationAddress = SimpleString(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS)
|
||||
managementNotificationAddress = SimpleString.of(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS)
|
||||
addressSettings = mapOf(
|
||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
|
||||
maxSizeBytes = 5L * maxMessageSize
|
||||
@ -51,7 +51,7 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
|
||||
globalMaxSize = Runtime.getRuntime().maxMemory() / 8
|
||||
initialiseSettings(maxMessageSize, journalBufferTimeout)
|
||||
|
||||
val nodeInternalRole = Role(BrokerJaasLoginModule.NODE_RPC_ROLE, true, true, true, true, true, true, true, true, true, true)
|
||||
val nodeInternalRole = Role(BrokerJaasLoginModule.NODE_RPC_ROLE, true, true, true, true, true, true, true, true, true, true, false, false)
|
||||
|
||||
val addRPCRoleToUsers = if (shouldStartLocalShell) listOf(INTERNAL_SHELL_USER) else emptyList()
|
||||
val rolesAdderOnLogin = RolesAdderOnLogin(addRPCRoleToUsers) { username ->
|
||||
@ -127,12 +127,12 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
|
||||
}
|
||||
|
||||
private fun queueConfiguration(name: String, address: String = name, filter: String? = null, durable: Boolean): QueueConfiguration {
|
||||
return QueueConfiguration(name).setAddress(address).setFilterString(filter).setDurable(durable)
|
||||
return QueueConfiguration.of(name).setAddress(address).setFilterString(filter).setDurable(durable)
|
||||
}
|
||||
|
||||
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
|
||||
deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false,
|
||||
deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false): Role {
|
||||
return Role(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue, deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue)
|
||||
return Role(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue, deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue, false, false)
|
||||
}
|
||||
}
|
@ -1276,7 +1276,7 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
)
|
||||
factory2.register(net.corda.serialization.internal.amqp.custom.SimpleStringSerializer)
|
||||
|
||||
val obj = SimpleString("Bob")
|
||||
val obj = SimpleString.of("Bob")
|
||||
serdes(obj, factory, factory2)
|
||||
}
|
||||
|
||||
|
@ -192,16 +192,16 @@ data class RPCDriverDSL(
|
||||
|
||||
private fun ConfigurationImpl.configureCommonSettings(maxFileSize: Int, maxBufferedBytesPerClient: Long) {
|
||||
name = "RPCDriver"
|
||||
managementNotificationAddress = SimpleString(NOTIFICATION_ADDRESS)
|
||||
managementNotificationAddress = SimpleString.of(NOTIFICATION_ADDRESS)
|
||||
isPopulateValidatedUser = true
|
||||
journalBufferSize_NIO = maxFileSize
|
||||
journalBufferSize_AIO = maxFileSize
|
||||
journalFileSize = maxFileSize
|
||||
queueConfigs = listOf(
|
||||
QueueConfiguration(RPCApi.RPC_SERVER_QUEUE_NAME).setAddress(RPCApi.RPC_SERVER_QUEUE_NAME).setDurable(false),
|
||||
QueueConfiguration(RPCApi.RPC_CLIENT_BINDING_REMOVALS).setAddress(NOTIFICATION_ADDRESS)
|
||||
QueueConfiguration.of(RPCApi.RPC_SERVER_QUEUE_NAME).setAddress(RPCApi.RPC_SERVER_QUEUE_NAME).setDurable(false),
|
||||
QueueConfiguration.of(RPCApi.RPC_CLIENT_BINDING_REMOVALS).setAddress(NOTIFICATION_ADDRESS)
|
||||
.setFilterString(RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION).setDurable(false),
|
||||
QueueConfiguration(RPCApi.RPC_CLIENT_BINDING_ADDITIONS).setAddress(NOTIFICATION_ADDRESS)
|
||||
QueueConfiguration.of(RPCApi.RPC_CLIENT_BINDING_ADDITIONS).setAddress(NOTIFICATION_ADDRESS)
|
||||
.setFilterString(RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION).setDurable(false)
|
||||
)
|
||||
addressSettings = mapOf(
|
||||
|
Loading…
Reference in New Issue
Block a user