mirror of
https://github.com/corda/corda.git
synced 2024-12-19 21:17:58 +00:00
cleanup of use of messaging service
This commit is contained in:
parent
8fdfed9392
commit
66eec9c2b2
@ -74,7 +74,7 @@ fun MessagingService.runOnNextMessage(topic: String = "", executor: Executor? =
|
||||
}
|
||||
}
|
||||
|
||||
fun MessagingService.send(topic: String, to: MessageRecipients, obj: Any) {
|
||||
fun MessagingService.send(topic: String, obj: Any, to: MessageRecipients) {
|
||||
send(createMessage(topic, obj.serialize().bits), to)
|
||||
}
|
||||
|
||||
|
@ -127,7 +127,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
|
||||
if (!awaitingObjectOfType.isInstance(obj))
|
||||
throw ClassCastException("Received message of unexpected type: ${obj.javaClass.name} vs ${awaitingObjectOfType.name}")
|
||||
logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
|
||||
iterateStateMachine(psm, serviceHub.networkService, logger, obj, checkpointKey) {
|
||||
iterateStateMachine(psm, logger, obj, checkpointKey) {
|
||||
try {
|
||||
Fiber.unparkDeserialized(it, scheduler)
|
||||
} catch(e: Throwable) {
|
||||
@ -167,7 +167,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
|
||||
// Need to add before iterating in case of immediate completion
|
||||
stateMachines.add(logic)
|
||||
executor.executeASAP {
|
||||
iterateStateMachine(fiber, serviceHub.networkService, logger, null, null) {
|
||||
iterateStateMachine(fiber, logger, null, null) {
|
||||
it.start()
|
||||
}
|
||||
totalStartedProtocols.inc()
|
||||
@ -190,24 +190,27 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
|
||||
return key
|
||||
}
|
||||
|
||||
private fun iterateStateMachine(psm: ProtocolStateMachine<*>, net: MessagingService, logger: Logger,
|
||||
obj: Any?, prevCheckpointKey: SecureHash?, resumeFunc: (ProtocolStateMachine<*>) -> Unit) {
|
||||
private fun iterateStateMachine(psm: ProtocolStateMachine<*>,
|
||||
logger: Logger,
|
||||
obj: Any?,
|
||||
prevCheckpointKey: SecureHash?,
|
||||
resumeFunc: (ProtocolStateMachine<*>) -> Unit) {
|
||||
executor.checkOnThread()
|
||||
val onSuspend = fun(request: FiberRequest, serFiber: ByteArray) {
|
||||
// We have a request to do something: send, receive, or send-and-receive.
|
||||
if (request is FiberRequest.ExpectingResponse<*>) {
|
||||
// Prepare a listener on the network that runs in the background thread when we received a message.
|
||||
checkpointAndSetupMessageHandler(logger, net, psm, request, prevCheckpointKey, serFiber)
|
||||
checkpointAndSetupMessageHandler(logger, psm, request, prevCheckpointKey, serFiber)
|
||||
}
|
||||
// If an object to send was provided (not null), send it now.
|
||||
request.obj?.let {
|
||||
val topic = "${request.topic}.${request.sessionIDForSend}"
|
||||
logger.trace { "-> ${request.destination}/$topic : message of type ${it.javaClass.name}" }
|
||||
net.send(net.createMessage(topic, it.serialize().bits), request.destination!!)
|
||||
serviceHub.networkService.send(topic, it, request.destination!!)
|
||||
}
|
||||
if (request is FiberRequest.NotExpectingResponse) {
|
||||
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
|
||||
iterateStateMachine(psm, net, logger, null, prevCheckpointKey) {
|
||||
iterateStateMachine(psm, logger, null, prevCheckpointKey) {
|
||||
try {
|
||||
Fiber.unpark(it, QUASAR_UNBLOCKER)
|
||||
} catch(e: Throwable) {
|
||||
@ -230,8 +233,10 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
|
||||
}
|
||||
}
|
||||
|
||||
private fun checkpointAndSetupMessageHandler(logger: Logger, net: MessagingService, psm: ProtocolStateMachine<*>,
|
||||
request: FiberRequest.ExpectingResponse<*>, prevCheckpointKey: SecureHash?,
|
||||
private fun checkpointAndSetupMessageHandler(logger: Logger,
|
||||
psm: ProtocolStateMachine<*>,
|
||||
request: FiberRequest.ExpectingResponse<*>,
|
||||
prevCheckpointKey: SecureHash?,
|
||||
serialisedFiber: ByteArray) {
|
||||
executor.checkOnThread()
|
||||
val topic = "${request.topic}.${request.sessionIDForReceive}"
|
||||
@ -241,7 +246,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
|
||||
val newCheckpointKey = curPersistedBytes.sha256()
|
||||
logger.trace { "Waiting for message of type ${request.responseType.name} on $topic" }
|
||||
val consumed = AtomicBoolean()
|
||||
net.runOnNextMessage(topic, executor) { netMsg ->
|
||||
serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg ->
|
||||
// Some assertions to ensure we don't execute on the wrong thread or get executed more than once.
|
||||
executor.checkOnThread()
|
||||
check(netMsg.topic == topic) { "Topic mismatch: ${netMsg.topic} vs $topic" }
|
||||
@ -256,7 +261,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
|
||||
val obj: Any = THREAD_LOCAL_KRYO.get().readClassAndObject(Input(netMsg.data))
|
||||
if (!request.responseType.isInstance(obj))
|
||||
throw IllegalStateException("Expected message of type ${request.responseType.name} but got ${obj.javaClass.name}", request.stackTraceInCaseOfProblems)
|
||||
iterateStateMachine(psm, net, logger, obj, newCheckpointKey) {
|
||||
iterateStateMachine(psm, logger, obj, newCheckpointKey) {
|
||||
try {
|
||||
Fiber.unpark(it, QUASAR_UNBLOCKER)
|
||||
} catch(e: Throwable) {
|
||||
|
@ -9,8 +9,11 @@ import core.crypto.SecureHash
|
||||
import core.messaging.MessagingService
|
||||
import core.messaging.StateMachineManager
|
||||
import core.messaging.runOnNextMessage
|
||||
import core.messaging.send
|
||||
import core.node.NodeInfo
|
||||
import core.node.services.*
|
||||
import core.node.services.NetworkMapService.Companion.FETCH_PROTOCOL_TOPIC
|
||||
import core.node.services.NetworkMapService.Companion.SUBSCRIPTION_PROTOCOL_TOPIC
|
||||
import core.random63BitValue
|
||||
import core.serialization.deserialize
|
||||
import core.serialization.serialize
|
||||
@ -152,13 +155,13 @@ open class InMemoryNetworkMapCache() : NetworkMapCache {
|
||||
// Add a message handler for the response, and prepare a future to put the data into.
|
||||
// Note that the message handler will run on the network thread (not this one).
|
||||
val future = SettableFuture.create<Unit>()
|
||||
net.runOnNextMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC + "." + sessionID, MoreExecutors.directExecutor()) { message ->
|
||||
net.runOnNextMessage("$FETCH_PROTOCOL_TOPIC.$sessionID", MoreExecutors.directExecutor()) { message ->
|
||||
val resp = message.data.deserialize<NetworkMapService.FetchMapResponse>()
|
||||
// We may not receive any nodes back, if the map hasn't changed since the version specified
|
||||
resp.nodes?.forEach { processRegistration(it) }
|
||||
future.set(Unit)
|
||||
}
|
||||
net.send(net.createMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC + ".0", req.serialize().bits), service.address)
|
||||
net.send("$FETCH_PROTOCOL_TOPIC.0", req, service.address)
|
||||
|
||||
return future
|
||||
}
|
||||
@ -184,7 +187,7 @@ open class InMemoryNetworkMapCache() : NetworkMapCache {
|
||||
// Add a message handler for the response, and prepare a future to put the data into.
|
||||
// Note that the message handler will run on the network thread (not this one).
|
||||
val future = SettableFuture.create<Unit>()
|
||||
net.runOnNextMessage(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC + "." + sessionID, MoreExecutors.directExecutor()) { message ->
|
||||
net.runOnNextMessage("$SUBSCRIPTION_PROTOCOL_TOPIC.$sessionID", MoreExecutors.directExecutor()) { message ->
|
||||
val resp = message.data.deserialize<NetworkMapService.SubscribeResponse>()
|
||||
if (resp.confirmed) {
|
||||
future.set(Unit)
|
||||
@ -192,7 +195,7 @@ open class InMemoryNetworkMapCache() : NetworkMapCache {
|
||||
future.setException(NetworkCacheError.DeregistrationFailed())
|
||||
}
|
||||
}
|
||||
net.send(net.createMessage(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC + ".0", req.serialize().bits), service.address)
|
||||
net.send("$SUBSCRIPTION_PROTOCOL_TOPIC.0", req, service.address)
|
||||
|
||||
return future
|
||||
}
|
||||
|
@ -90,7 +90,7 @@ class InMemoryMessagingTests {
|
||||
val node1 = network.createNode()
|
||||
var node2 = network.createNode()
|
||||
|
||||
node1.net.send("test.topic", node2.info.address, "hello!")
|
||||
node1.net.send("test.topic", "hello!", node2.info.address)
|
||||
network.runNetwork(rounds = 1) // No handler registered, so the message goes into a holding area.
|
||||
var runCount = 0
|
||||
node2.net.addMessageHandler("test.topic") { msg, registration ->
|
||||
@ -103,8 +103,8 @@ class InMemoryMessagingTests {
|
||||
// Shut node2 down for a while. Node 1 keeps sending it messages though.
|
||||
node2.stop()
|
||||
|
||||
node1.net.send("test.topic", node2.info.address, "are you there?")
|
||||
node1.net.send("test.topic", node2.info.address, "wake up!")
|
||||
node1.net.send("test.topic", "are you there?", node2.info.address)
|
||||
node1.net.send("test.topic", "wake up!", node2.info.address)
|
||||
|
||||
// Now re-create node2 with the same address as last time, and re-register a message handler.
|
||||
// Check that the messages that were sent whilst it was gone are still there, waiting for it.
|
||||
|
Loading…
Reference in New Issue
Block a user