From 365c1e6ad2e423b7b0c07f176edef5ff04321ed0 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Fri, 20 Jul 2018 14:18:28 +0100 Subject: [PATCH] Revert "Remove non-compiling tests" This reverts commit 935c4d2. To make the test compile two internal fields in node where made public. It seems that the "internal" modifier does not see integration tests as part of the same module. --- .../messaging/ArtemisMessagingTest.kt | 164 +++++++++++++++++- .../services/messaging/MessagingExecutor.kt | 2 +- .../services/messaging/P2PMessagingClient.kt | 4 +- 3 files changed, 166 insertions(+), 4 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 1d230dc672..e8f089a4b0 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -62,7 +62,6 @@ class ArtemisMessagingTest { @JvmField val temporaryFolder = TemporaryFolder() - // THe private val portAllocation = PortAllocation.Incremental(10000) private val serverPort = portAllocation.nextPort() private val identity = generateKeyPair() @@ -193,6 +192,169 @@ class ArtemisMessagingTest { assertThat(received.platformVersion).isEqualTo(3) } + @Test + fun `we can fake send and receive`() { + val (messagingClient, receivedMessages) = createAndStartClientAndServer() + val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray()) + val fakeMsg = messagingClient.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + messagingClient.deliver(fakeMsg) + val received = receivedMessages.take() + assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg") + } + + @Test + fun `redelivery from same client is ignored`() { + val (messagingClient, receivedMessages) = createAndStartClientAndServer() + val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray()) + val fakeMsg = messagingClient.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + messagingClient.deliver(fakeMsg) + messagingClient.deliver(fakeMsg) + val received = receivedMessages.take() + assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg") + val received2 = receivedMessages.poll() + assertThat(received2).isNull() + } + + // Redelivery from a sender who stops and restarts (some re-sends from the sender, with sender state reset with exception of recovered checkpoints) + @Test + fun `re-send from different client is ignored`() { + val (messagingClient1, receivedMessages) = createAndStartClientAndServer() + val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray()) + val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + messagingClient1.deliver(fakeMsg) + + // Now change the sender + try { + val messagingClient2 = createMessagingClient() + startNodeMessagingClient() + + val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + + messagingClient1.deliver(fakeMsg2) + val received = receivedMessages.take() + assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg") + val received2 = receivedMessages.poll() + assertThat(received2).isNull() + } finally { + messagingClient1.stop() + } + } + + // Redelivery to a receiver who stops and restarts (some re-deliveries from Artemis, but with receiver state reset) + @Test + fun `re-receive from different client is ignored`() { + val (messagingClient1, receivedMessages) = createAndStartClientAndServer() + val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray()) + val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + messagingClient1.deliver(fakeMsg) + + // Now change the receiver + try { + val messagingClient2 = createMessagingClient() + messagingClient2.addMessageHandler(TOPIC) { msg, _, handle -> + database.transaction { handle.insideDatabaseTransaction() } + handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages] + receivedMessages.add(msg) + } + startNodeMessagingClient() + + messagingClient2.deliver(fakeMsg) + + val received = receivedMessages.take() + assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg") + val received2 = receivedMessages.poll() + assertThat(received2).isNull() + } finally { + messagingClient1.stop() + } + } + + // Redelivery to a receiver who stops and restarts (some re-deliveries from Artemis, but with receiver state reset), but the original + // messages were recorded as consumed out of order, and only the *second* message was acked. + @Test + fun `re-receive from different client is not ignored when acked out of order`() { + // Don't ack first message, pretend we exit before that happens (but after second message is acked). + val (messagingClient1, receivedMessages) = createAndStartClientAndServer(dontAckCondition = { received -> String(received.data.bytes, Charsets.UTF_8) == "first msg" }) + val message1 = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray()) + val message2 = messagingClient1.createMessage(TOPIC, data = "second msg".toByteArray()) + val fakeMsg1 = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message1) + val fakeMsg2 = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message2) + fakeMsg1!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + + messagingClient1.deliver(fakeMsg1) + messagingClient1.deliver(fakeMsg2) + + // Now change the receiver + try { + val messagingClient2 = createMessagingClient() + messagingClient2.addMessageHandler(TOPIC) { msg, _, handle -> + // The try-finally causes the test to fail if there's a duplicate insert (which, naturally, is an error but otherwise gets swallowed). + try { + database.transaction { handle.insideDatabaseTransaction() } + } finally { + handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages] + receivedMessages.add(msg) + } + } + startNodeMessagingClient() + + messagingClient2.deliver(fakeMsg1) + messagingClient2.deliver(fakeMsg2) + + // Should receive 2 and then 1 (and not 2 again). + val received = receivedMessages.take() + assertThat(received.senderSeqNo).isEqualTo(1) + val received2 = receivedMessages.poll() + assertThat(received2.senderSeqNo).isEqualTo(0) + val received3 = receivedMessages.poll() + assertThat(received3).isNull() + } finally { + messagingClient1.stop() + } + } + + // Re-receive on different client from re-started sender + @Test + fun `re-send from different client and re-receive from different client is ignored`() { + val (messagingClient1, receivedMessages) = createAndStartClientAndServer() + val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray()) + val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + messagingClient1.deliver(fakeMsg) + + // Now change the send *and* receiver + val messagingClient2 = createMessagingClient() + try { + startNodeMessagingClient() + val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + + val messagingClient3 = createMessagingClient() + messagingClient3.addMessageHandler(TOPIC) { msg, _, handle -> + database.transaction { handle.insideDatabaseTransaction() } + handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages] + receivedMessages.add(msg) + } + startNodeMessagingClient() + + messagingClient3.deliver(fakeMsg2) + + val received = receivedMessages.take() + assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg") + val received2 = receivedMessages.poll() + assertThat(received2).isNull() + } finally { + messagingClient1.stop() + messagingClient2.stop() + } + } + private fun startNodeMessagingClient() { messagingClient!!.start() } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index 9c554f08a0..2c4e3097d7 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -165,7 +165,7 @@ class MessagingExecutor( producer.send(SimpleString(mqAddress), artemisMessage, { job.sentFuture.set(Unit) }) } - internal fun cordaToArtemisMessage(message: Message): ClientMessage? { + fun cordaToArtemisMessage(message: Message): ClientMessage? { return session.createMessage(true).apply { putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor) putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 8bb0c2a716..1ec6d171a8 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -145,7 +145,7 @@ class P2PMessagingClient(val config: NodeConfiguration, private val handlers = ConcurrentHashMap() private val deduplicator = P2PMessageDeduplicator(database) - internal var messagingExecutor: MessagingExecutor? = null + var messagingExecutor: MessagingExecutor? = null fun start() { state.locked { @@ -378,7 +378,7 @@ class P2PMessagingClient(val config: NodeConfiguration, override fun toString() = "$topic#$data" } - internal fun deliver(artemisMessage: ClientMessage) { + fun deliver(artemisMessage: ClientMessage) { artemisToCordaMessage(artemisMessage)?.let { cordaMessage -> if (!deduplicator.isDuplicate(cordaMessage)) { deduplicator.signalMessageProcessStart(cordaMessage)