Adding optional delay to AMQ message sending via system properties (#185)

This commit is contained in:
Richard Green 2017-01-30 14:39:40 +00:00 committed by GitHub
parent 7b57cbe351
commit e54d6388fd
2 changed files with 39 additions and 20 deletions

View File

@ -468,18 +468,30 @@ open class DriverDSL(
else
""
val javaArgs = listOf(
path,
"-Dname=${nodeConf.myLegalName}",
"-javaagent:$quasarJarPath",
debugPortArg,
"-Dvisualvm.display.name=Corda",
"-Xmx200m",
"-XX:+UseG1GC",
"-cp", classpath,
className,
"--base-directory=${nodeConf.baseDirectory}"
).filter(String::isNotEmpty)
val additionalKeys = listOf("amq.delivery.delay.ms")
val systemArgs = mutableMapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "Corda"
)
for (key in additionalKeys) {
if (System.getProperty(key) != null) {
systemArgs.set(key, System.getProperty(key))
}
}
val javaArgs = listOf(path) +
systemArgs.map { "-D${it.key}=${it.value}" } +
listOf(
"-javaagent:$quasarJarPath",
debugPortArg,
"-Xmx200m",
"-XX:+UseG1GC",
"-cp", classpath,
className,
"--base-directory=${nodeConf.baseDirectory}"
).filter(String::isNotEmpty)
val builder = ProcessBuilder(javaArgs)
builder.redirectError(Paths.get("error.$className.log").toFile())
builder.inheritIO()

View File

@ -15,10 +15,10 @@ import net.corda.node.services.RPCUserService
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.*
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
import org.apache.activemq.artemis.api.core.Message.*
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.bouncycastle.asn1.x500.X500Name
@ -67,6 +67,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// confusion.
const val TOPIC_PROPERTY = "platform-topic"
const val SESSION_ID_PROPERTY = "session-id"
val AMQ_DELAY = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0"))
}
private class InnerState {
@ -102,12 +103,12 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(
object : AbstractJDBCHashSet<UUID, Table>(Table, loadOnInit = true) {
override fun elementFromRow(row: ResultRow): UUID = row[table.uuid]
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry
}
})
object : AbstractJDBCHashSet<UUID, Table>(Table, loadOnInit = true) {
override fun elementFromRow(row: ResultRow): UUID = row[table.uuid]
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry
}
})
fun start(rpcOps: RPCOps, userService: RPCUserService) {
state.locked {
@ -368,6 +369,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
writeBodyBufferBytes(message.data)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString()))
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
if (AMQ_DELAY > 0 && message.topicSession.topic == StateMachineManager.sessionTopic.topic) {
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + AMQ_DELAY);
}
}
log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} " +
"uuid: ${message.uniqueMessageId}")
@ -376,6 +382,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
private fun getMQAddress(target: MessageRecipients): String {
return if (target == myAddress) {
// If we are sending to ourselves then route the message directly to our P2P queue.