mirror of
https://github.com/corda/corda.git
synced 2025-04-19 08:36:39 +00:00
Merge pull request #1288 from corda/shams-fix-broken-test
Revert "Remove non-compiling tests"
This commit is contained in:
commit
b44fc94997
@ -62,7 +62,6 @@ class ArtemisMessagingTest {
|
||||
@JvmField
|
||||
val temporaryFolder = TemporaryFolder()
|
||||
|
||||
// THe
|
||||
private val portAllocation = PortAllocation.Incremental(10000)
|
||||
private val serverPort = portAllocation.nextPort()
|
||||
private val identity = generateKeyPair()
|
||||
@ -193,6 +192,169 @@ class ArtemisMessagingTest {
|
||||
assertThat(received.platformVersion).isEqualTo(3)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `we can fake send and receive`() {
|
||||
val (messagingClient, receivedMessages) = createAndStartClientAndServer()
|
||||
val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val fakeMsg = messagingClient.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
messagingClient.deliver(fakeMsg)
|
||||
val received = receivedMessages.take()
|
||||
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `redelivery from same client is ignored`() {
|
||||
val (messagingClient, receivedMessages) = createAndStartClientAndServer()
|
||||
val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val fakeMsg = messagingClient.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
messagingClient.deliver(fakeMsg)
|
||||
messagingClient.deliver(fakeMsg)
|
||||
val received = receivedMessages.take()
|
||||
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
|
||||
val received2 = receivedMessages.poll()
|
||||
assertThat(received2).isNull()
|
||||
}
|
||||
|
||||
// Redelivery from a sender who stops and restarts (some re-sends from the sender, with sender state reset with exception of recovered checkpoints)
|
||||
@Test
|
||||
fun `re-send from different client is ignored`() {
|
||||
val (messagingClient1, receivedMessages) = createAndStartClientAndServer()
|
||||
val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
messagingClient1.deliver(fakeMsg)
|
||||
|
||||
// Now change the sender
|
||||
try {
|
||||
val messagingClient2 = createMessagingClient()
|
||||
startNodeMessagingClient()
|
||||
|
||||
val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
|
||||
messagingClient1.deliver(fakeMsg2)
|
||||
val received = receivedMessages.take()
|
||||
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
|
||||
val received2 = receivedMessages.poll()
|
||||
assertThat(received2).isNull()
|
||||
} finally {
|
||||
messagingClient1.stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Redelivery to a receiver who stops and restarts (some re-deliveries from Artemis, but with receiver state reset)
|
||||
@Test
|
||||
fun `re-receive from different client is ignored`() {
|
||||
val (messagingClient1, receivedMessages) = createAndStartClientAndServer()
|
||||
val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
messagingClient1.deliver(fakeMsg)
|
||||
|
||||
// Now change the receiver
|
||||
try {
|
||||
val messagingClient2 = createMessagingClient()
|
||||
messagingClient2.addMessageHandler(TOPIC) { msg, _, handle ->
|
||||
database.transaction { handle.insideDatabaseTransaction() }
|
||||
handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
|
||||
receivedMessages.add(msg)
|
||||
}
|
||||
startNodeMessagingClient()
|
||||
|
||||
messagingClient2.deliver(fakeMsg)
|
||||
|
||||
val received = receivedMessages.take()
|
||||
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
|
||||
val received2 = receivedMessages.poll()
|
||||
assertThat(received2).isNull()
|
||||
} finally {
|
||||
messagingClient1.stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Redelivery to a receiver who stops and restarts (some re-deliveries from Artemis, but with receiver state reset), but the original
|
||||
// messages were recorded as consumed out of order, and only the *second* message was acked.
|
||||
@Test
|
||||
fun `re-receive from different client is not ignored when acked out of order`() {
|
||||
// Don't ack first message, pretend we exit before that happens (but after second message is acked).
|
||||
val (messagingClient1, receivedMessages) = createAndStartClientAndServer(dontAckCondition = { received -> String(received.data.bytes, Charsets.UTF_8) == "first msg" })
|
||||
val message1 = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val message2 = messagingClient1.createMessage(TOPIC, data = "second msg".toByteArray())
|
||||
val fakeMsg1 = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message1)
|
||||
val fakeMsg2 = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message2)
|
||||
fakeMsg1!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
|
||||
messagingClient1.deliver(fakeMsg1)
|
||||
messagingClient1.deliver(fakeMsg2)
|
||||
|
||||
// Now change the receiver
|
||||
try {
|
||||
val messagingClient2 = createMessagingClient()
|
||||
messagingClient2.addMessageHandler(TOPIC) { msg, _, handle ->
|
||||
// The try-finally causes the test to fail if there's a duplicate insert (which, naturally, is an error but otherwise gets swallowed).
|
||||
try {
|
||||
database.transaction { handle.insideDatabaseTransaction() }
|
||||
} finally {
|
||||
handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
|
||||
receivedMessages.add(msg)
|
||||
}
|
||||
}
|
||||
startNodeMessagingClient()
|
||||
|
||||
messagingClient2.deliver(fakeMsg1)
|
||||
messagingClient2.deliver(fakeMsg2)
|
||||
|
||||
// Should receive 2 and then 1 (and not 2 again).
|
||||
val received = receivedMessages.take()
|
||||
assertThat(received.senderSeqNo).isEqualTo(1)
|
||||
val received2 = receivedMessages.poll()
|
||||
assertThat(received2.senderSeqNo).isEqualTo(0)
|
||||
val received3 = receivedMessages.poll()
|
||||
assertThat(received3).isNull()
|
||||
} finally {
|
||||
messagingClient1.stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Re-receive on different client from re-started sender
|
||||
@Test
|
||||
fun `re-send from different client and re-receive from different client is ignored`() {
|
||||
val (messagingClient1, receivedMessages) = createAndStartClientAndServer()
|
||||
val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
messagingClient1.deliver(fakeMsg)
|
||||
|
||||
// Now change the send *and* receiver
|
||||
val messagingClient2 = createMessagingClient()
|
||||
try {
|
||||
startNodeMessagingClient()
|
||||
val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
|
||||
val messagingClient3 = createMessagingClient()
|
||||
messagingClient3.addMessageHandler(TOPIC) { msg, _, handle ->
|
||||
database.transaction { handle.insideDatabaseTransaction() }
|
||||
handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
|
||||
receivedMessages.add(msg)
|
||||
}
|
||||
startNodeMessagingClient()
|
||||
|
||||
messagingClient3.deliver(fakeMsg2)
|
||||
|
||||
val received = receivedMessages.take()
|
||||
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
|
||||
val received2 = receivedMessages.poll()
|
||||
assertThat(received2).isNull()
|
||||
} finally {
|
||||
messagingClient1.stop()
|
||||
messagingClient2.stop()
|
||||
}
|
||||
}
|
||||
|
||||
private fun startNodeMessagingClient() {
|
||||
messagingClient!!.start()
|
||||
}
|
||||
|
@ -165,7 +165,7 @@ class MessagingExecutor(
|
||||
producer.send(SimpleString(mqAddress), artemisMessage, { job.sentFuture.set(Unit) })
|
||||
}
|
||||
|
||||
internal fun cordaToArtemisMessage(message: Message): ClientMessage? {
|
||||
fun cordaToArtemisMessage(message: Message): ClientMessage? {
|
||||
return session.createMessage(true).apply {
|
||||
putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor)
|
||||
putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion)
|
||||
|
@ -145,7 +145,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
private val handlers = ConcurrentHashMap<String, MessageHandler>()
|
||||
|
||||
private val deduplicator = P2PMessageDeduplicator(database)
|
||||
internal var messagingExecutor: MessagingExecutor? = null
|
||||
var messagingExecutor: MessagingExecutor? = null
|
||||
|
||||
fun start() {
|
||||
state.locked {
|
||||
@ -378,7 +378,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
override fun toString() = "$topic#$data"
|
||||
}
|
||||
|
||||
internal fun deliver(artemisMessage: ClientMessage) {
|
||||
fun deliver(artemisMessage: ClientMessage) {
|
||||
artemisToCordaMessage(artemisMessage)?.let { cordaMessage ->
|
||||
if (!deduplicator.isDuplicate(cordaMessage)) {
|
||||
deduplicator.signalMessageProcessStart(cordaMessage)
|
||||
|
Loading…
x
Reference in New Issue
Block a user