Break out message handler changes

Special serializer for kotlin object definitions
This commit is contained in:
rick.parker 2016-06-21 11:18:23 +01:00
parent 859ee053d2
commit 717a5ab197
8 changed files with 56 additions and 18 deletions

View File

@ -1,6 +1,7 @@
package com.r3corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.serialization.DeserializeAsKotlinObjectDef
import com.r3corda.core.serialization.serialize
import java.time.Instant
import java.util.concurrent.Executor
@ -133,3 +134,9 @@ interface MessageRecipientGroup : MessageRecipients
/** A special base class for the set of all possible recipients, without having to identify who they all are. */
interface AllPossibleRecipients : MessageRecipients
/**
* A general Ack message that conveys no content other than it's presence for use when you want an acknowledgement
* from a recipient. Using [Unit] can be ambiguous as it is similar to [Void] and so could mean no response.
*/
object Ack : DeserializeAsKotlinObjectDef

View File

@ -37,7 +37,6 @@ import java.util.*
import javax.annotation.concurrent.ThreadSafe
import kotlin.reflect.*
import kotlin.reflect.jvm.javaType
import java.security.PrivateKey
/**
* Serialization utilities, using the Kryo framework with a custom serialiser for immutable data classes and a dead
@ -305,6 +304,19 @@ object Ed25519PublicKeySerializer : Serializer<EdDSAPublicKey>() {
}
}
/** Marker interface for kotlin object definitions so that they are deserialized as the singleton instance. */
interface DeserializeAsKotlinObjectDef
/** Serializer to deserialize kotlin object definitions marked with [DeserializeAsKotlinObjectDef]. */
object KotlinObjectSerializer : Serializer<DeserializeAsKotlinObjectDef>() {
override fun read(kryo: Kryo, input: Input, type: Class<DeserializeAsKotlinObjectDef>): DeserializeAsKotlinObjectDef {
// read the public static INSTANCE field that kotlin compiler generates.
return type.getField("INSTANCE").get(null) as DeserializeAsKotlinObjectDef
}
override fun write(kryo: Kryo, output: Output, obj: DeserializeAsKotlinObjectDef) {
}
}
fun createKryo(k: Kryo = Kryo()): Kryo {
return k.apply {
@ -354,6 +366,9 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
// This ensures a NonEmptySetSerializer is constructed with an initial value.
register(NonEmptySet::class.java, NonEmptySetSerializer)
/** This ensures any kotlin objects that implement [DeserializeAsKotlinObjectDef] are read back in as singletons. */
addDefaultSerializer(DeserializeAsKotlinObjectDef::class.java, KotlinObjectSerializer)
noReferencesWithin<WireTransaction>()
}

View File

@ -7,6 +7,7 @@ import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SignedData
import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.messaging.Ack
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.TimestampChecker
@ -55,7 +56,7 @@ object NotaryProtocol {
val receiveSessionID = random63BitValue()
val handshake = Handshake(serviceHub.networkService.myAddress, sendSessionID, receiveSessionID)
sendAndReceive<Unit>(TOPIC_INITIATE, notaryNode.address, 0, receiveSessionID, handshake)
sendAndReceive<Ack>(TOPIC_INITIATE, notaryNode.address, 0, receiveSessionID, handshake)
val request = SignRequest(wtx.serialized, serviceHub.storageService.myLegalIdentity)
val response = sendAndReceive<Result>(TOPIC, notaryNode.address, sendSessionID, receiveSessionID, request)

View File

@ -5,6 +5,7 @@ import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.messaging.Ack
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.protocols.ProtocolLogic
@ -104,7 +105,7 @@ object NotaryChangeProtocol {
val proposal = Proposal(originalState.ref, newNotary, stx)
val handshake = Handshake(sessionIdForSend, serviceHub.networkService.myAddress, sessionIdForReceive)
sendAndReceive<Unit>(TOPIC_INITIATE, node.address, 0, sessionIdForReceive, handshake)
sendAndReceive<Ack>(TOPIC_INITIATE, node.address, 0, sessionIdForReceive, handshake)
val response = sendAndReceive<Result>(TOPIC_CHANGE, node.address, sessionIdForSend, sessionIdForReceive, proposal)
val participantSignature = response.validate {

View File

@ -3,6 +3,7 @@ package com.r3corda.core.serialization
import com.google.common.primitives.Ints
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.messaging.Ack
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
@ -73,6 +74,14 @@ class KryoTests {
assertThatThrownBy { deserialisedSignature.verifyWithECDSA(wrongBits) }
}
@Test
fun `write and read Ack`() {
val tokenizableBefore = Ack
val serializedBytes = tokenizableBefore.serialize(kryo)
val tokenizableAfter = serializedBytes.deserialize(kryo)
assertThat(tokenizableAfter).isSameAs(tokenizableBefore)
}
private data class Person(val name: String, val birthday: Instant?)
@Suppress("unused")

View File

@ -1,5 +1,6 @@
package com.r3corda.node.services
import com.r3corda.core.messaging.Ack
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.node.services.api.AbstractNodeService
@ -17,11 +18,12 @@ class NotaryChangeService(net: MessagingService, val smm: StateMachineManager) :
)
}
private fun handleChangeNotaryRequest(req: NotaryChangeProtocol.Handshake) {
private fun handleChangeNotaryRequest(req: NotaryChangeProtocol.Handshake): Ack {
val protocol = NotaryChangeProtocol.Acceptor(
req.replyTo as SingleMessageRecipient,
req.sessionID!!,
req.sessionIdForSend)
smm.add(NotaryChangeProtocol.TOPIC_CHANGE, protocol)
return Ack
}
}

View File

@ -3,6 +3,7 @@ package com.r3corda.node.services.api
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.protocols.AbstractRequestMessage
@ -12,14 +13,15 @@ import javax.annotation.concurrent.ThreadSafe
* Abstract superclass for services that a node can host, which provides helper functions.
*/
@ThreadSafe
abstract class AbstractNodeService(val net: MessagingService) {
abstract class AbstractNodeService(val net: MessagingService) : SingletonSerializeAsToken() {
/**
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
* common boilerplate code. Exceptions are caught and passed to the provided consumer.
* common boilerplate code. Exceptions are caught and passed to the provided consumer. If you just want a simple
* acknowledgement response with no content, use [com.r3corda.core.messaging.Ack]
*
* @param topic the topic, without the default session ID postfix (".0)
* @param handler a function to handle the deserialised request and return a response
* @param handler a function to handle the deserialised request and return an optional response (if return type not Unit)
* @param exceptionConsumer a function to which any thrown exception is passed.
*/
protected inline fun <reified Q : AbstractRequestMessage, reified R : Any>
@ -30,8 +32,11 @@ abstract class AbstractNodeService(val net: MessagingService) {
try {
val req = message.data.deserialize<Q>()
val data = handler(req)
val msg = net.createMessage(topic + "." + req.sessionID, data.serialize().bits)
net.send(msg, req.replyTo)
// If the return type R is Unit, then do not send a response
if (data.javaClass != Unit.javaClass) {
val msg = net.createMessage("$topic.${req.sessionID}", data.serialize().bits)
net.send(msg, req.replyTo)
}
} catch(e: Exception) {
exceptionConsumer(message, e)
}
@ -40,19 +45,15 @@ abstract class AbstractNodeService(val net: MessagingService) {
/**
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
* common boilerplate code. Exceptions are propagated to the messaging layer.
* common boilerplate code. Exceptions are propagated to the messaging layer. If you just want a simple
* acknowledgement response with no content, use [com.r3corda.core.messaging.Ack]
*
* @param topic the topic, without the default session ID postfix (".0)
* @param handler a function to handle the deserialised request and return a response
* @param handler a function to handle the deserialised request and return an optional response (if return type not Unit)
*/
protected inline fun <reified Q : AbstractRequestMessage, reified R : Any>
addMessageHandler(topic: String,
crossinline handler: (Q) -> R) {
net.addMessageHandler(topic + TOPIC_DEFAULT_POSTFIX, null) { message, r ->
val req = message.data.deserialize<Q>()
val data = handler(req)
val msg = net.createMessage(topic + "." + req.sessionID, data.serialize().bits)
net.send(msg, req.replyTo)
}
addMessageHandler(topic, handler, { message: Message, exception: Exception -> throw exception })
}
}

View File

@ -1,5 +1,6 @@
package com.r3corda.node.services.transactions
import com.r3corda.core.messaging.Ack
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.services.ServiceType
@ -35,12 +36,13 @@ abstract class NotaryService(val smm: StateMachineManager,
)
}
private fun processRequest(req: NotaryProtocol.Handshake) {
private fun processRequest(req: NotaryProtocol.Handshake): Ack {
val protocol = protocolFactory.create(req.replyTo as SingleMessageRecipient,
req.sessionID!!,
req.sendSessionID,
timestampChecker,
uniquenessProvider)
smm.add(NotaryProtocol.TOPIC, protocol)
return Ack
}
}