Added tests to make sure the platform version is correctly available

This commit is contained in:
Shams Asari
2017-11-24 14:32:32 +00:00
parent 2ceb6283af
commit 4ca54b73fe
12 changed files with 125 additions and 85 deletions

View File

@ -10,7 +10,7 @@ import net.corda.node.internal.NodeStartup
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.nodeapi.User
import net.corda.testing.ALICE
import net.corda.testing.ProjectStructure.projectRootDir
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
import net.corda.testing.driver.driver
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy

View File

@ -190,11 +190,13 @@ fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long): CordaFutu
return messageFuture
}
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
= send(TopicSession(topic, sessionID), payload, to, uuid)
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID()) {
send(TopicSession(topic, sessionID), payload, to, uuid)
}
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID(), retryId: Long? = null)
= send(createMessage(topicSession, payload.serialize().bytes, uuid), to, retryId)
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID(), retryId: Long? = null) {
send(createMessage(topicSession, payload.serialize().bytes, uuid), to, retryId)
}
interface MessageHandlerRegistration

View File

@ -1,9 +1,6 @@
package net.corda.node.services.messaging
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.generateKeyPair
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
@ -27,13 +24,13 @@ import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.net.ServerSocket
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit.MILLISECONDS
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertNull
//TODO This needs to be merged into P2PMessagingTest as that creates a more realistic environment
class ArtemisMessagingTests {
companion object {
const val TOPIC = "platform.self"
@ -54,21 +51,19 @@ class ArtemisMessagingTests {
private lateinit var config: NodeConfiguration
private lateinit var database: CordaPersistence
private lateinit var userService: RPCUserService
private lateinit var networkMapRegistrationFuture: CordaFuture<Unit>
private var messagingClient: P2PMessagingClient? = null
private var messagingServer: ArtemisMessagingServer? = null
private lateinit var networkMapCache: NetworkMapCacheImpl
@Before
fun setUp() {
val baseDirectory = temporaryFolder.root.toPath()
userService = RPCUserServiceImpl(emptyList())
config = testNodeConfiguration(
baseDirectory = baseDirectory,
baseDirectory = temporaryFolder.root.toPath(),
myLegalName = ALICE.name)
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock())
networkMapRegistrationFuture = doneFuture(Unit)
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database), rigorousMock())
}
@ -76,8 +71,6 @@ class ArtemisMessagingTests {
fun cleanUp() {
messagingClient?.stop()
messagingServer?.stop()
messagingClient = null
messagingServer = null
database.close()
LogHelper.reset(PersistentUniquenessProvider::class)
}
@ -120,9 +113,7 @@ class ArtemisMessagingTests {
@Test
fun `client should be able to send message to itself`() {
val receivedMessages = LinkedBlockingQueue<Message>()
val messagingClient = createAndStartClientAndServer(receivedMessages)
val (messagingClient, receivedMessages) = createAndStartClientAndServer()
val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
@ -132,76 +123,45 @@ class ArtemisMessagingTests {
}
@Test
fun `client should be able to send message to itself before network map is available, and receive after`() {
val settableFuture = openFuture<Unit>()
networkMapRegistrationFuture = settableFuture
val receivedMessages = LinkedBlockingQueue<Message>()
val messagingClient = createAndStartClientAndServer(receivedMessages)
fun `platform version is included in the message`() {
val (messagingClient, receivedMessages) = createAndStartClientAndServer(platformVersion = 3)
val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
settableFuture.set(Unit)
val firstActual: Message = receivedMessages.take()
assertEquals("first msg", String(firstActual.data))
assertNull(receivedMessages.poll(200, MILLISECONDS))
}
@Test
fun `client should be able to send large numbers of messages to itself before network map is available and survive restart, then receive messages`() {
// Crank the iteration up as high as you want... just takes longer to run.
val iterations = 100
networkMapRegistrationFuture = openFuture()
val receivedMessages = LinkedBlockingQueue<Message>()
val messagingClient = createAndStartClientAndServer(receivedMessages)
for (iter in 1..iterations) {
val message = messagingClient.createMessage(TOPIC, data = "first msg $iter".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
}
// Stop client and server and create afresh.
messagingClient.stop()
messagingServer?.stop()
networkMapRegistrationFuture = doneFuture(Unit)
createAndStartClientAndServer(receivedMessages)
for (iter in 1..iterations) {
val firstActual: Message = receivedMessages.take()
assertThat(String(firstActual.data)).isEqualTo("first msg $iter")
}
assertNull(receivedMessages.poll(200, MILLISECONDS))
val received = receivedMessages.take()
assertThat(received.platformVersion).isEqualTo(3)
}
private fun startNodeMessagingClient() {
messagingClient!!.start()
}
private fun createAndStartClientAndServer(receivedMessages: LinkedBlockingQueue<Message>): P2PMessagingClient {
private fun createAndStartClientAndServer(platformVersion: Int = 1): Pair<P2PMessagingClient, BlockingQueue<ReceivedMessage>> {
val receivedMessages = LinkedBlockingQueue<ReceivedMessage>()
createMessagingServer().start()
val messagingClient = createMessagingClient()
val messagingClient = createMessagingClient(platformVersion = platformVersion)
startNodeMessagingClient()
messagingClient.addMessageHandler(TOPIC) { message, _ ->
receivedMessages.add(message)
}
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
thread { messagingClient.run() }
return messagingClient
thread(isDaemon = true) { messagingClient.run() }
return Pair(messagingClient, receivedMessages)
}
private fun createMessagingClient(server: NetworkHostAndPort = NetworkHostAndPort("localhost", serverPort)): P2PMessagingClient {
private fun createMessagingClient(server: NetworkHostAndPort = NetworkHostAndPort("localhost", serverPort), platformVersion: Int = 1): P2PMessagingClient {
return database.transaction {
P2PMessagingClient(
config,
MOCK_VERSION_INFO,
MOCK_VERSION_INFO.copy(platformVersion = platformVersion),
server,
identity.public,
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
database).apply {
database
).apply {
config.configureWithDevSSLCertificate()
messagingClient = this
}