CORDA-1098 - Fixed Artemis filters for flows draining mode. (#2612)

This commit is contained in:
Michele Sollecito 2018-02-22 17:55:50 +00:00 committed by Katelyn Baker
parent de577904eb
commit a4f67602dd
2 changed files with 8 additions and 7 deletions

View File

@ -18,8 +18,9 @@ import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.test.fail import kotlin.test.fail
class P2PFlowsDrainingModeTest { class P2PFlowsDrainingModeTest {
@ -28,7 +29,7 @@ class P2PFlowsDrainingModeTest {
private val user = User("mark", "dadada", setOf(Permissions.all())) private val user = User("mark", "dadada", setOf(Permissions.all()))
private val users = listOf(user) private val users = listOf(user)
private var executor: ExecutorService? = null private var executor: ScheduledExecutorService? = null
companion object { companion object {
private val logger = loggerFor<P2PFlowsDrainingModeTest>() private val logger = loggerFor<P2PFlowsDrainingModeTest>()
@ -36,7 +37,7 @@ class P2PFlowsDrainingModeTest {
@Before @Before
fun setup() { fun setup() {
executor = Executors.newSingleThreadExecutor() executor = Executors.newSingleThreadScheduledExecutor()
} }
@After @After
@ -59,11 +60,11 @@ class P2PFlowsDrainingModeTest {
initiating.apply { initiating.apply {
val flow = startFlow(::InitiateSessionFlow, counterParty) val flow = startFlow(::InitiateSessionFlow, counterParty)
// this should be really fast, for the flow has already started, so 5 seconds should never be a problem // this should be really fast, for the flow has already started, so 5 seconds should never be a problem
executor!!.submit({ executor!!.schedule({
logger.info("Now disabling flows draining mode for $counterParty.") logger.info("Now disabling flows draining mode for $counterParty.")
shouldFail = false shouldFail = false
initiated.setFlowsDrainingModeEnabled(false) initiated.setFlowsDrainingModeEnabled(false)
}) }, 5, TimeUnit.SECONDS)
flow.returnValue.map { result -> flow.returnValue.map { result ->
if (shouldFail) { if (shouldFail) {
fail("Shouldn't happen until flows draining mode is switched off.") fail("Shouldn't happen until flows draining mode is switched off.")

View File

@ -668,8 +668,8 @@ private class P2PMessagingConsumer(
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>) : LifecycleSupport { private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>) : LifecycleSupport {
private companion object { private companion object {
private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}=${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}" private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}='${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
private const val existingSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}" private const val existingSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
} }
private var startedFlag = false private var startedFlag = false