mirror of
https://github.com/corda/corda.git
synced 2025-04-20 17:11:26 +00:00
Revert "Remove non-compiling tests"
This reverts commit 935c4d2. To make the test compile two internal fields in node where made public. It seems that the "internal" modifier does not see integration tests as part of the same module.
This commit is contained in:
parent
fae30c8703
commit
365c1e6ad2
@ -62,7 +62,6 @@ class ArtemisMessagingTest {
|
||||
@JvmField
|
||||
val temporaryFolder = TemporaryFolder()
|
||||
|
||||
// THe
|
||||
private val portAllocation = PortAllocation.Incremental(10000)
|
||||
private val serverPort = portAllocation.nextPort()
|
||||
private val identity = generateKeyPair()
|
||||
@ -193,6 +192,169 @@ class ArtemisMessagingTest {
|
||||
assertThat(received.platformVersion).isEqualTo(3)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `we can fake send and receive`() {
|
||||
val (messagingClient, receivedMessages) = createAndStartClientAndServer()
|
||||
val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val fakeMsg = messagingClient.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
messagingClient.deliver(fakeMsg)
|
||||
val received = receivedMessages.take()
|
||||
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `redelivery from same client is ignored`() {
|
||||
val (messagingClient, receivedMessages) = createAndStartClientAndServer()
|
||||
val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val fakeMsg = messagingClient.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
messagingClient.deliver(fakeMsg)
|
||||
messagingClient.deliver(fakeMsg)
|
||||
val received = receivedMessages.take()
|
||||
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
|
||||
val received2 = receivedMessages.poll()
|
||||
assertThat(received2).isNull()
|
||||
}
|
||||
|
||||
// Redelivery from a sender who stops and restarts (some re-sends from the sender, with sender state reset with exception of recovered checkpoints)
|
||||
@Test
|
||||
fun `re-send from different client is ignored`() {
|
||||
val (messagingClient1, receivedMessages) = createAndStartClientAndServer()
|
||||
val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
messagingClient1.deliver(fakeMsg)
|
||||
|
||||
// Now change the sender
|
||||
try {
|
||||
val messagingClient2 = createMessagingClient()
|
||||
startNodeMessagingClient()
|
||||
|
||||
val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
|
||||
messagingClient1.deliver(fakeMsg2)
|
||||
val received = receivedMessages.take()
|
||||
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
|
||||
val received2 = receivedMessages.poll()
|
||||
assertThat(received2).isNull()
|
||||
} finally {
|
||||
messagingClient1.stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Redelivery to a receiver who stops and restarts (some re-deliveries from Artemis, but with receiver state reset)
|
||||
@Test
|
||||
fun `re-receive from different client is ignored`() {
|
||||
val (messagingClient1, receivedMessages) = createAndStartClientAndServer()
|
||||
val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
messagingClient1.deliver(fakeMsg)
|
||||
|
||||
// Now change the receiver
|
||||
try {
|
||||
val messagingClient2 = createMessagingClient()
|
||||
messagingClient2.addMessageHandler(TOPIC) { msg, _, handle ->
|
||||
database.transaction { handle.insideDatabaseTransaction() }
|
||||
handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
|
||||
receivedMessages.add(msg)
|
||||
}
|
||||
startNodeMessagingClient()
|
||||
|
||||
messagingClient2.deliver(fakeMsg)
|
||||
|
||||
val received = receivedMessages.take()
|
||||
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
|
||||
val received2 = receivedMessages.poll()
|
||||
assertThat(received2).isNull()
|
||||
} finally {
|
||||
messagingClient1.stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Redelivery to a receiver who stops and restarts (some re-deliveries from Artemis, but with receiver state reset), but the original
|
||||
// messages were recorded as consumed out of order, and only the *second* message was acked.
|
||||
@Test
|
||||
fun `re-receive from different client is not ignored when acked out of order`() {
|
||||
// Don't ack first message, pretend we exit before that happens (but after second message is acked).
|
||||
val (messagingClient1, receivedMessages) = createAndStartClientAndServer(dontAckCondition = { received -> String(received.data.bytes, Charsets.UTF_8) == "first msg" })
|
||||
val message1 = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val message2 = messagingClient1.createMessage(TOPIC, data = "second msg".toByteArray())
|
||||
val fakeMsg1 = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message1)
|
||||
val fakeMsg2 = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message2)
|
||||
fakeMsg1!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
|
||||
messagingClient1.deliver(fakeMsg1)
|
||||
messagingClient1.deliver(fakeMsg2)
|
||||
|
||||
// Now change the receiver
|
||||
try {
|
||||
val messagingClient2 = createMessagingClient()
|
||||
messagingClient2.addMessageHandler(TOPIC) { msg, _, handle ->
|
||||
// The try-finally causes the test to fail if there's a duplicate insert (which, naturally, is an error but otherwise gets swallowed).
|
||||
try {
|
||||
database.transaction { handle.insideDatabaseTransaction() }
|
||||
} finally {
|
||||
handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
|
||||
receivedMessages.add(msg)
|
||||
}
|
||||
}
|
||||
startNodeMessagingClient()
|
||||
|
||||
messagingClient2.deliver(fakeMsg1)
|
||||
messagingClient2.deliver(fakeMsg2)
|
||||
|
||||
// Should receive 2 and then 1 (and not 2 again).
|
||||
val received = receivedMessages.take()
|
||||
assertThat(received.senderSeqNo).isEqualTo(1)
|
||||
val received2 = receivedMessages.poll()
|
||||
assertThat(received2.senderSeqNo).isEqualTo(0)
|
||||
val received3 = receivedMessages.poll()
|
||||
assertThat(received3).isNull()
|
||||
} finally {
|
||||
messagingClient1.stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Re-receive on different client from re-started sender
|
||||
@Test
|
||||
fun `re-send from different client and re-receive from different client is ignored`() {
|
||||
val (messagingClient1, receivedMessages) = createAndStartClientAndServer()
|
||||
val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
messagingClient1.deliver(fakeMsg)
|
||||
|
||||
// Now change the send *and* receiver
|
||||
val messagingClient2 = createMessagingClient()
|
||||
try {
|
||||
startNodeMessagingClient()
|
||||
val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
|
||||
val messagingClient3 = createMessagingClient()
|
||||
messagingClient3.addMessageHandler(TOPIC) { msg, _, handle ->
|
||||
database.transaction { handle.insideDatabaseTransaction() }
|
||||
handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
|
||||
receivedMessages.add(msg)
|
||||
}
|
||||
startNodeMessagingClient()
|
||||
|
||||
messagingClient3.deliver(fakeMsg2)
|
||||
|
||||
val received = receivedMessages.take()
|
||||
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
|
||||
val received2 = receivedMessages.poll()
|
||||
assertThat(received2).isNull()
|
||||
} finally {
|
||||
messagingClient1.stop()
|
||||
messagingClient2.stop()
|
||||
}
|
||||
}
|
||||
|
||||
private fun startNodeMessagingClient() {
|
||||
messagingClient!!.start()
|
||||
}
|
||||
|
@ -165,7 +165,7 @@ class MessagingExecutor(
|
||||
producer.send(SimpleString(mqAddress), artemisMessage, { job.sentFuture.set(Unit) })
|
||||
}
|
||||
|
||||
internal fun cordaToArtemisMessage(message: Message): ClientMessage? {
|
||||
fun cordaToArtemisMessage(message: Message): ClientMessage? {
|
||||
return session.createMessage(true).apply {
|
||||
putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor)
|
||||
putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion)
|
||||
|
@ -145,7 +145,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
private val handlers = ConcurrentHashMap<String, MessageHandler>()
|
||||
|
||||
private val deduplicator = P2PMessageDeduplicator(database)
|
||||
internal var messagingExecutor: MessagingExecutor? = null
|
||||
var messagingExecutor: MessagingExecutor? = null
|
||||
|
||||
fun start() {
|
||||
state.locked {
|
||||
@ -378,7 +378,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
override fun toString() = "$topic#$data"
|
||||
}
|
||||
|
||||
internal fun deliver(artemisMessage: ClientMessage) {
|
||||
fun deliver(artemisMessage: ClientMessage) {
|
||||
artemisToCordaMessage(artemisMessage)?.let { cordaMessage ->
|
||||
if (!deduplicator.isDuplicate(cordaMessage)) {
|
||||
deduplicator.signalMessageProcessStart(cordaMessage)
|
||||
|
Loading…
x
Reference in New Issue
Block a user