diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 09c8c5a4a9..4ae3373aac 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -552,7 +552,7 @@ class RPCStabilityTests { // Construct an RPC session manually so that we can hang in the message handler val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}" val session = startArtemisSession(server.broker.hostAndPort!!) - session.createQueue(QueueConfiguration(myQueue) + session.createQueue(QueueConfiguration.of(myQueue) .setRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType()) .setAddress(myQueue) .setTemporary(true) @@ -569,7 +569,7 @@ class RPCStabilityTests { val message = session.createMessage(false) val request = RPCApi.ClientToServer.RpcRequest( - clientAddress = SimpleString(myQueue), + clientAddress = SimpleString.of(myQueue), methodName = SlowConsumerRPCOps::streamAtInterval.name, serialisedArguments = listOf(100.millis, 1234).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT), replyId = Trace.InvocationId.newInstance(), @@ -593,7 +593,7 @@ class RPCStabilityTests { // Construct an RPC client session manually val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}" val session = startArtemisSession(server.broker.hostAndPort!!) - session.createQueue(QueueConfiguration(myQueue) + session.createQueue(QueueConfiguration.of(myQueue) .setRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType()) .setAddress(myQueue) .setTemporary(true) @@ -612,7 +612,7 @@ class RPCStabilityTests { val message = session.createMessage(false) val request = RPCApi.ClientToServer.RpcRequest( - clientAddress = SimpleString(myQueue), + clientAddress = SimpleString.of(myQueue), methodName = DummyOps::protocolVersion.name, serialisedArguments = emptyList().serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT), replyId = Trace.InvocationId.newInstance(), diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index ba8e70786d..141ad76b79 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -652,9 +652,9 @@ internal class RPCClientProxyHandler( producerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) rpcProducer = producerSession!!.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME) consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, 16384) - clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$rpcUsername.${random63BitValue()}") + clientAddress = SimpleString.of("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$rpcUsername.${random63BitValue()}") log.debug { "Client address: $clientAddress" } - consumerSession!!.createQueue(QueueConfiguration(clientAddress).setAddress(clientAddress).setRoutingType(RoutingType.ANYCAST) + consumerSession!!.createQueue(QueueConfiguration.of(clientAddress).setAddress(clientAddress).setRoutingType(RoutingType.ANYCAST) .setTemporary(true).setDurable(false)) rpcConsumer = consumerSession!!.createConsumer(clientAddress) rpcConsumer!!.setMessageHandler(this::artemisMessageHandler) diff --git a/constants.properties b/constants.properties index 8d82d0fbe5..b6af48b7ca 100644 --- a/constants.properties +++ b/constants.properties @@ -45,7 +45,7 @@ commonsTextVersion=1.10.0 # We must configure it manually to use the latest capsule version. capsuleVersion=1.0.4_r3 asmVersion=9.5 -artemisVersion=2.32.0 +artemisVersion=2.35.0 # TODO Upgrade Jackson only when corda is using kotlin 1.3.10 jacksonVersion=2.17.0 jacksonKotlinVersion=2.17.0 diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt index 7e61e90630..25172487ac 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt @@ -42,18 +42,18 @@ class ArtemisMessagingComponent { // We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint // that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid // confusion. - val topicProperty = SimpleString("platform-topic") - val cordaVendorProperty = SimpleString("corda-vendor") - val releaseVersionProperty = SimpleString("release-version") - val platformVersionProperty = SimpleString("platform-version") - val senderUUID = SimpleString("sender-uuid") - val senderSeqNo = SimpleString("send-seq-no") + val topicProperty = SimpleString.of("platform-topic") + val cordaVendorProperty = SimpleString.of("corda-vendor") + val releaseVersionProperty = SimpleString.of("release-version") + val platformVersionProperty = SimpleString.of("platform-version") + val senderUUID = SimpleString.of("sender-uuid") + val senderSeqNo = SimpleString.of("send-seq-no") /** * In the operation mode where we have an out of process bridge we cannot correctly populate the Artemis validated user header * as the TLS does not terminate directly onto Artemis. We therefore use this internal only header to forward * the equivalent information from the Float. */ - val bridgedCertificateSubject = SimpleString("sender-subject-name") + val bridgedCertificateSubject = SimpleString.of("sender-subject-name") object Type { const val KEY = "corda_p2p_message_type" diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt index 84974450d4..9c60a9885f 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt @@ -88,7 +88,7 @@ class BridgeControlListener(private val keyStore: CertificateStore, registerBridgeControlListener(artemisSession) registerBridgeDuplicateChecker(artemisSession) // Attempt to read available inboxes directly from Artemis before requesting updates from connected nodes - validInboundQueues.addAll(artemisSession.addressQuery(SimpleString("$P2P_PREFIX#")).queueNames.map { it.toString() }) + validInboundQueues.addAll(artemisSession.addressQuery(SimpleString.of("$P2P_PREFIX#")).queueNames.map { it.toString() }) log.info("Found inboxes: $validInboundQueues") if (active) { _activeChange.onNext(true) @@ -107,7 +107,7 @@ class BridgeControlListener(private val keyStore: CertificateStore, private fun registerBridgeControlListener(artemisSession: ClientSession) { try { artemisSession.createQueue( - QueueConfiguration(bridgeControlQueue).setAddress(BRIDGE_CONTROL).setRoutingType(RoutingType.MULTICAST) + QueueConfiguration.of(bridgeControlQueue).setAddress(BRIDGE_CONTROL).setRoutingType(RoutingType.MULTICAST) .setTemporary(true).setDurable(false)) } catch (ex: ActiveMQQueueExistsException) { // Ignore if there is a queue still not cleaned up @@ -129,7 +129,7 @@ class BridgeControlListener(private val keyStore: CertificateStore, private fun registerBridgeDuplicateChecker(artemisSession: ClientSession) { try { artemisSession.createQueue( - QueueConfiguration(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST) + QueueConfiguration.of(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST) .setTemporary(true).setDurable(false)) } catch (ex: ActiveMQQueueExistsException) { // Ignore if there is a queue still not cleaned up @@ -189,11 +189,11 @@ class BridgeControlListener(private val keyStore: CertificateStore, } private fun validateInboxQueueName(queueName: String): Boolean { - return queueName.startsWith(P2P_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists + return queueName.startsWith(P2P_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString.of(queueName)).isExists } private fun validateBridgingQueueName(queueName: String): Boolean { - return queueName.startsWith(PEERS_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists + return queueName.startsWith(PEERS_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString.of(queueName)).isExists } private fun processControlMessage(msg: ClientMessage) { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt index 2dd9f8bff0..ce68cfbd96 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt @@ -136,7 +136,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore, private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { logDebugWithMDC { "Loopback Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty(MESSAGE_ID_KEY)}" } val peerInbox = translateLocalQueueToInboxAddress(queueName) - producer?.send(SimpleString(peerInbox), artemisMessage) { artemisMessage.individualAcknowledge() } + producer?.send(SimpleString.of(peerInbox), artemisMessage) { artemisMessage.individualAcknowledge() } bridgeMetricsService?.let { metricsService -> val properties = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.mapNotNull { key -> if (artemisMessage.containsProperty(key)) { diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/RoundTripObservableSerializerTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/RoundTripObservableSerializerTests.kt index df0383d642..e7a89bf0a3 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/RoundTripObservableSerializerTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/RoundTripObservableSerializerTests.kt @@ -70,7 +70,7 @@ class RoundTripObservableSerializerTests { subscriptionMap(id), clientAddressToObservables = ConcurrentHashMap(), deduplicationIdentity = "thisIsATest", - clientAddress = SimpleString("clientAddress")) + clientAddress = SimpleString.of("clientAddress")) val serverSerializer = serializationScheme.rpcServerSerializerFactory(serverObservableContext) diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/RpcServerObservableSerializerTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/RpcServerObservableSerializerTests.kt index b48a8b1d0e..2f0025f569 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/RpcServerObservableSerializerTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/RpcServerObservableSerializerTests.kt @@ -49,7 +49,7 @@ class RpcServerObservableSerializerTests { subscriptionMap(), clientAddressToObservables = ConcurrentHashMap(), deduplicationIdentity = "thisIsATest", - clientAddress = SimpleString("clientAddress")) + clientAddress = SimpleString.of("clientAddress")) val newContext = RpcServerObservableSerializer.createContext(serializationContext, observable) @@ -65,7 +65,7 @@ class RpcServerObservableSerializerTests { subscriptionMap(), clientAddressToObservables = ConcurrentHashMap(), deduplicationIdentity = "thisIsATest", - clientAddress = SimpleString(testClientAddress)) + clientAddress = SimpleString.of(testClientAddress)) val sf = SerializerFactoryBuilder.build(AllWhitelist, javaClass.classLoader).apply { register(RpcServerObservableSerializer()) diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 859e3cdf95..3e1fb603e0 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -62,7 +62,7 @@ class AMQPBridgeTest { putIntProperty(P2PMessagingHeaders.senderUUID, i) writeBodyBufferBytes("Test$i".toByteArray()) // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString.of(UUID.randomUUID().toString())) } artemis.producer.send(sourceQueueName, artemisMessage) } @@ -139,7 +139,7 @@ class AMQPBridgeTest { putIntProperty(P2PMessagingHeaders.senderUUID, 3) writeBodyBufferBytes("Test3".toByteArray()) // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString.of(UUID.randomUUID().toString())) } artemis.producer.send(sourceQueueName, artemisMessage) @@ -224,7 +224,7 @@ class AMQPBridgeTest { if (sourceQueueName != null) { // Local queue for outgoing messages artemis.session.createQueue( - QueueConfiguration(sourceQueueName).setRoutingType(RoutingType.ANYCAST).setAddress(sourceQueueName).setDurable(true)) + QueueConfiguration.of(sourceQueueName).setRoutingType(RoutingType.ANYCAST).setAddress(sourceQueueName).setDurable(true)) bridgeManager.deployBridge(ALICE_NAME.toString(), sourceQueueName, listOf(amqpAddress), setOf(bob.name)) } return Triple(artemisServer, artemisClient, bridgeManager) diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt index 3970c13add..6536742a25 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt @@ -499,7 +499,7 @@ class ArtemisServerRevocationTest : AbstractServerRevocationTest() { val queueName = "${P2P_PREFIX}Test" artemisNode.client.started!!.session.createQueue( - QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setDurable(true) + QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setDurable(true) ) val clientConnectionChangeStatus = client.waitForInitialConnectionAndCaptureChanges(expectedConnectedStatus) diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index 6b573fe2d3..4ff3a1b26d 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -374,7 +374,7 @@ class ProtonWrapperTests { assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal)) val artemis = artemisClient.started!! val sendAddress = P2P_PREFIX + "Test" - artemis.session.createQueue(QueueConfiguration("queue") + artemis.session.createQueue(QueueConfiguration.of("queue") .setRoutingType(RoutingType.ANYCAST).setAddress(sendAddress).setDurable(true)) val consumer = artemis.session.createConsumer("queue") val testData = "Test".toByteArray() @@ -404,7 +404,7 @@ class ProtonWrapperTests { assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal)) val artemis = artemisClient.started!! val sendAddress = P2P_PREFIX + "Test" - artemis.session.createQueue(QueueConfiguration("queue") + artemis.session.createQueue(QueueConfiguration.of("queue") .setRoutingType(RoutingType.ANYCAST).setAddress(sendAddress).setDurable(true)) val consumer = artemis.session.createConsumer("queue") diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index 3b012b7672..5290d4fd08 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -117,7 +117,7 @@ abstract class MQSecurityTest : NodeBasedTest() { fun loginToRPCAndGetClientQueue(): String { loginToRPC(alice.node.configuration.rpcOptions.address, rpcUser) - val clientQueueQuery = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${rpcUser.username}.*") + val clientQueueQuery = SimpleString.of("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${rpcUser.username}.*") val client = clientTo(alice.node.configuration.rpcOptions.address) client.start(rpcUser.username, rpcUser.password, false) return client.session.addressQuery(clientQueueQuery).queueNames.single().toString() @@ -131,7 +131,7 @@ abstract class MQSecurityTest : NodeBasedTest() { fun assertTempQueueCreationAttackFails(queue: String) { assertAttackFails(queue, "CREATE_NON_DURABLE_QUEUE") { - attacker.session.createQueue(QueueConfiguration(queue) + attacker.session.createQueue(QueueConfiguration.of(queue) .setRoutingType(RoutingType.MULTICAST) .setAddress(queue) .setTemporary(true) @@ -153,7 +153,7 @@ abstract class MQSecurityTest : NodeBasedTest() { val permission = if (durable) "CREATE_DURABLE_QUEUE" else "CREATE_NON_DURABLE_QUEUE" assertAttackFails(queue, permission) { attacker.session.createQueue( - QueueConfiguration(queue).setAddress(queue).setRoutingType(RoutingType.MULTICAST).setDurable(durable)) + QueueConfiguration.of(queue).setAddress(queue).setRoutingType(RoutingType.MULTICAST).setDurable(durable)) } // Double-check assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index f1b62d527e..0521c97ebf 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -169,7 +169,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, journalBufferTimeout_NIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio() journalBufferTimeout_AIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutAio() journalFileSize = maxMessageSize + JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB. - managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS) + managementNotificationAddress = SimpleString.of(NOTIFICATIONS_ADDRESS) // JMX enablement if (config.jmxMonitoringHttpPort != null) { @@ -189,7 +189,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, * 4. Verifiers. These are given read access to the verification request queue and write access to the response queue. */ private fun ConfigurationImpl.configureAddressSecurity(): Configuration { - val nodeInternalRole = Role(NODE_P2P_ROLE, true, true, true, true, true, true, true, true, true, true) + val nodeInternalRole = Role(NODE_P2P_ROLE, true, true, true, true, true, true, true, true, true, true, false, false) securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node securityRoles["$P2P_PREFIX#"] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true)) securityInvalidationInterval = SECURITY_INVALIDATION_INTERVAL @@ -200,7 +200,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false, deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false): Role { return Role(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue, - deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue) + deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue, false, false) } private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index 0734c958e1..148d692aba 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -33,8 +33,8 @@ class MessagingExecutor( val resolver: AddressToArtemisQueueResolver, val ourSenderUUID: String ) { - private val cordaVendor = SimpleString(versionInfo.vendor) - private val releaseVersion = SimpleString(versionInfo.releaseVersion) + private val cordaVendor = SimpleString.of(versionInfo.vendor) + private val releaseVersion = SimpleString.of(versionInfo.releaseVersion) private val ourSenderSeqNo = AtomicLong() private companion object { @@ -50,7 +50,7 @@ class MessagingExecutor( "Send to: $mqAddress topic: ${message.topic} " + "sessionID: ${message.topic} id: ${message.uniqueMessageId}" } - producer.send(SimpleString(mqAddress), artemisMessage) + producer.send(SimpleString.of(mqAddress), artemisMessage) } @Synchronized @@ -72,13 +72,13 @@ class MessagingExecutor( putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor) putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion) putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion) - putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic)) + putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString.of(message.topic)) writeBodyBufferBytes(message.data.bytes) // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString)) + putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.of(message.uniqueMessageId.toString)) // If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut. if (ourSenderUUID == message.senderUUID) { - putStringProperty(P2PMessagingHeaders.senderUUID, SimpleString(ourSenderUUID)) + putStringProperty(P2PMessagingHeaders.senderUUID, SimpleString.of(ourSenderUUID)) putLongProperty(P2PMessagingHeaders.senderSeqNo, ourSenderSeqNo.getAndIncrement()) } // For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 4d2e875573..33a5a8786d 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -279,8 +279,8 @@ class P2PMessagingClient(val config: NodeConfiguration, private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List) { val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}" - if (!session.queueQuery(SimpleString(bridgeNotifyQueue)).isExists) { - session.createQueue(QueueConfiguration(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST) + if (!session.queueQuery(SimpleString.of(bridgeNotifyQueue)).isExists) { + session.createQueue(QueueConfiguration.of(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST) .setTemporary(true).setDurable(false)) } val bridgeConsumer = session.createConsumer(bridgeNotifyQueue) @@ -316,7 +316,7 @@ class P2PMessagingClient(val config: NodeConfiguration, node.legalIdentitiesAndCerts.map { partyAndCertificate -> val messagingAddress = NodeAddress(partyAndCertificate.party.owningKey) BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name }, serviceAddress = false) - }.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists }.asSequence() + }.filter { producerSession!!.queueQuery(SimpleString.of(it.queueName)).isExists }.asSequence() } } @@ -360,7 +360,7 @@ class P2PMessagingClient(val config: NodeConfiguration, } } - val queues = session.addressQuery(SimpleString("$PEERS_PREFIX#")).queueNames + val queues = session.addressQuery(SimpleString.of("$PEERS_PREFIX#")).queueNames knownQueues.clear() for (queue in queues) { val queueQuery = session.queueQuery(queue) @@ -604,10 +604,10 @@ class P2PMessagingClient(val config: NodeConfiguration, sendBridgeCreateMessage() delayStartQueues -= queueName } else { - val queueQuery = session.queueQuery(SimpleString(queueName)) + val queueQuery = session.queueQuery(SimpleString.of(queueName)) if (!queueQuery.isExists) { log.info("Create fresh queue $queueName bound on same address") - session.createQueue(QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName) + session.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName) .setDurable(true).setAutoCreated(false) .setMaxConsumers(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers()) .setPurgeOnNoConsumers(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers()) diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/rpc/RPCServer.kt index 9d50bc72d3..20d16d996a 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/RPCServer.kt @@ -321,14 +321,14 @@ class RPCServer( require(notificationType == CoreNotificationType.BINDING_REMOVED.name){"Message contained notification type of $notificationType instead of expected ${CoreNotificationType.BINDING_REMOVED.name}"} val clientAddress = artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME) log.info("Detected RPC client disconnect on address $clientAddress, scheduling for reaping") - invalidateClient(SimpleString(clientAddress)) + invalidateClient(SimpleString.of(clientAddress)) } private fun bindingAdditionArtemisMessageHandler(artemisMessage: ClientMessage) { lifeCycle.requireState(State.STARTED) val notificationType = artemisMessage.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) require(notificationType == CoreNotificationType.BINDING_ADDED.name){"Message contained notification type of $notificationType instead of expected ${CoreNotificationType.BINDING_ADDED.name}"} - val clientAddress = SimpleString(artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME)) + val clientAddress = SimpleString.of(artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME)) log.debug("RPC client queue created on address $clientAddress") val buffer = stopBuffering(clientAddress) diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt index 14da13763b..66446e76d1 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt @@ -39,7 +39,7 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, queueConfigs = queueConfigurations() - managementNotificationAddress = SimpleString(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS) + managementNotificationAddress = SimpleString.of(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS) addressSettings = mapOf( "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply { maxSizeBytes = 5L * maxMessageSize @@ -51,7 +51,7 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, globalMaxSize = Runtime.getRuntime().maxMemory() / 8 initialiseSettings(maxMessageSize, journalBufferTimeout) - val nodeInternalRole = Role(BrokerJaasLoginModule.NODE_RPC_ROLE, true, true, true, true, true, true, true, true, true, true) + val nodeInternalRole = Role(BrokerJaasLoginModule.NODE_RPC_ROLE, true, true, true, true, true, true, true, true, true, true, false, false) val addRPCRoleToUsers = if (shouldStartLocalShell) listOf(INTERNAL_SHELL_USER) else emptyList() val rolesAdderOnLogin = RolesAdderOnLogin(addRPCRoleToUsers) { username -> @@ -127,12 +127,12 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, } private fun queueConfiguration(name: String, address: String = name, filter: String? = null, durable: Boolean): QueueConfiguration { - return QueueConfiguration(name).setAddress(address).setFilterString(filter).setDurable(durable) + return QueueConfiguration.of(name).setAddress(address).setFilterString(filter).setDurable(durable) } private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false, deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false, deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false): Role { - return Role(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue, deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue) + return Role(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue, deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue, false, false) } } \ No newline at end of file diff --git a/serialization-tests/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt b/serialization-tests/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt index d5a09d413a..07507ae219 100644 --- a/serialization-tests/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt +++ b/serialization-tests/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt @@ -1276,7 +1276,7 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi ) factory2.register(net.corda.serialization.internal.amqp.custom.SimpleStringSerializer) - val obj = SimpleString("Bob") + val obj = SimpleString.of("Bob") serdes(obj, factory, factory2) } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 0abc936e6d..d4aaf87130 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -192,16 +192,16 @@ data class RPCDriverDSL( private fun ConfigurationImpl.configureCommonSettings(maxFileSize: Int, maxBufferedBytesPerClient: Long) { name = "RPCDriver" - managementNotificationAddress = SimpleString(NOTIFICATION_ADDRESS) + managementNotificationAddress = SimpleString.of(NOTIFICATION_ADDRESS) isPopulateValidatedUser = true journalBufferSize_NIO = maxFileSize journalBufferSize_AIO = maxFileSize journalFileSize = maxFileSize queueConfigs = listOf( - QueueConfiguration(RPCApi.RPC_SERVER_QUEUE_NAME).setAddress(RPCApi.RPC_SERVER_QUEUE_NAME).setDurable(false), - QueueConfiguration(RPCApi.RPC_CLIENT_BINDING_REMOVALS).setAddress(NOTIFICATION_ADDRESS) + QueueConfiguration.of(RPCApi.RPC_SERVER_QUEUE_NAME).setAddress(RPCApi.RPC_SERVER_QUEUE_NAME).setDurable(false), + QueueConfiguration.of(RPCApi.RPC_CLIENT_BINDING_REMOVALS).setAddress(NOTIFICATION_ADDRESS) .setFilterString(RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION).setDurable(false), - QueueConfiguration(RPCApi.RPC_CLIENT_BINDING_ADDITIONS).setAddress(NOTIFICATION_ADDRESS) + QueueConfiguration.of(RPCApi.RPC_CLIENT_BINDING_ADDITIONS).setAddress(NOTIFICATION_ADDRESS) .setFilterString(RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION).setDurable(false) ) addressSettings = mapOf(