CORDA-540: Make Verifier work in AMQP mode (#1870)

This commit is contained in:
Viktor Kolomeyko 2017-10-17 10:44:27 +01:00 committed by GitHub
parent 63b7eb3f70
commit cac3057877
11 changed files with 123 additions and 41 deletions

View File

@ -2435,6 +2435,17 @@ public final class net.corda.core.serialization.MissingAttachmentsException exte
public <init>(List)
@org.jetbrains.annotations.NotNull public final List getIds()
##
public final class net.corda.core.serialization.ObjectWithCompatibleContext extends java.lang.Object
public <init>(Object, net.corda.core.serialization.SerializationContext)
@org.jetbrains.annotations.NotNull public final Object component1()
@org.jetbrains.annotations.NotNull public final net.corda.core.serialization.SerializationContext component2()
@org.jetbrains.annotations.NotNull public final net.corda.core.serialization.ObjectWithCompatibleContext copy(Object, net.corda.core.serialization.SerializationContext)
public boolean equals(Object)
@org.jetbrains.annotations.NotNull public final net.corda.core.serialization.SerializationContext getContext()
@org.jetbrains.annotations.NotNull public final Object getObj()
public int hashCode()
public String toString()
##
public final class net.corda.core.serialization.SerializationAPIKt extends java.lang.Object
@org.jetbrains.annotations.NotNull public static final net.corda.core.serialization.SerializedBytes serialize(Object, net.corda.core.serialization.SerializationFactory, net.corda.core.serialization.SerializationContext)
##
@ -2476,6 +2487,7 @@ public abstract class net.corda.core.serialization.SerializationFactory extends
public <init>()
public final Object asCurrent(kotlin.jvm.functions.Function1)
@org.jetbrains.annotations.NotNull public abstract Object deserialize(net.corda.core.utilities.ByteSequence, Class, net.corda.core.serialization.SerializationContext)
@org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.ObjectWithCompatibleContext deserializeWithCompatibleContext(net.corda.core.utilities.ByteSequence, Class, net.corda.core.serialization.SerializationContext)
@org.jetbrains.annotations.Nullable public final net.corda.core.serialization.SerializationContext getCurrentContext()
@org.jetbrains.annotations.NotNull public final net.corda.core.serialization.SerializationContext getDefaultContext()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializedBytes serialize(Object, net.corda.core.serialization.SerializationContext)

View File

@ -2,6 +2,7 @@ package net.corda.core.contracts
import net.corda.core.identity.Party
import net.corda.core.internal.extractFile
import net.corda.core.serialization.CordaSerializable
import java.io.FileNotFoundException
import java.io.InputStream
import java.io.OutputStream
@ -17,6 +18,7 @@ import java.util.jar.JarInputStream
* - Legal documents
* - Facts generated by oracles which might be reused a lot
*/
@CordaSerializable
interface Attachment : NamedByHash {
fun open(): InputStream
fun openAsJAR(): JarInputStream {

View File

@ -19,7 +19,7 @@ sealed class TransactionVerificationException(val txId: SecureHash, message: Str
class ContractConstraintRejection(txId: SecureHash, contractClass: String)
: TransactionVerificationException(txId, "Contract constraints failed for $contractClass", null)
class MissingAttachmentRejection(txId: SecureHash, contractClass: String)
class MissingAttachmentRejection(txId: SecureHash, val contractClass: String)
: TransactionVerificationException(txId, "Contract constraints failed, could not find attachment for: $contractClass", null)
class ContractCreationError(txId: SecureHash, contractClass: String, cause: Throwable)

View File

@ -8,6 +8,8 @@ import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.sequence
import java.sql.Blob
data class ObjectWithCompatibleContext<out T : Any>(val obj: T, val context: SerializationContext)
/**
* An abstraction for serializing and deserializing objects, with support for versioning of the wire format via
* a header / prefix in the bytes.
@ -22,6 +24,16 @@ abstract class SerializationFactory {
*/
abstract fun <T : Any> deserialize(byteSequence: ByteSequence, clazz: Class<T>, context: SerializationContext): T
/**
* Deserialize the bytes in to an object, using the prefixed bytes to determine the format.
*
* @param byteSequence The bytes to deserialize, including a format header prefix.
* @param clazz The class or superclass or the object to be deserialized, or [Any] or [Object] if unknown.
* @param context A context that configures various parameters to deserialization.
* @return deserialized object along with [SerializationContext] to identify encoding used.
*/
abstract fun <T : Any> deserializeWithCompatibleContext(byteSequence: ByteSequence, clazz: Class<T>, context: SerializationContext): ObjectWithCompatibleContext<T>
/**
* Serialize an object to bytes using the preferred serialization format version from the context.
*
@ -87,6 +99,8 @@ abstract class SerializationFactory {
}
}
typealias VersionHeader = ByteSequence
/**
* Parameters to serialization and deserialization.
*/
@ -94,7 +108,7 @@ interface SerializationContext {
/**
* When serializing, use the format this header sequence represents.
*/
val preferredSerializationVersion: ByteSequence
val preferredSerializationVersion: VersionHeader
/**
* The class loader to use for deserialization.
*/
@ -147,7 +161,7 @@ interface SerializationContext {
/**
* Helper method to return a new context based on this context but with serialization using the format this header sequence represents.
*/
fun withPreferredSerializationVersion(versionHeader: ByteSequence): SerializationContext
fun withPreferredSerializationVersion(versionHeader: VersionHeader): SerializationContext
/**
* The use case that we are serializing for, since it influences the implementations chosen.
@ -174,6 +188,15 @@ inline fun <reified T : Any> ByteSequence.deserialize(serializationFactory: Seri
return serializationFactory.deserialize(this, T::class.java, context)
}
/**
* Additionally returns [SerializationContext] which was used for encoding.
* It might be helpful to know [SerializationContext] to use the same encoding in the reply.
*/
inline fun <reified T : Any> ByteSequence.deserializeWithCompatibleContext(serializationFactory: SerializationFactory = SerializationFactory.defaultFactory,
context: SerializationContext = serializationFactory.defaultContext): ObjectWithCompatibleContext<T> {
return serializationFactory.deserializeWithCompatibleContext(this, T::class.java, context)
}
/**
* Convenience extension method for deserializing SerializedBytes with type matching, utilising the defaults.
*/

View File

@ -1,8 +1,8 @@
package net.corda.nodeapi
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.serialization.*
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.sequence
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.reader.MessageUtil
@ -20,12 +20,15 @@ object VerifierApi {
val responseAddress: SimpleString
) {
companion object {
fun fromClientMessage(message: ClientMessage): VerificationRequest {
return VerificationRequest(
fun fromClientMessage(message: ClientMessage): ObjectWithCompatibleContext<VerificationRequest> {
val bytes = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
val bytesSequence = bytes.sequence()
val (transaction, context) = bytesSequence.deserializeWithCompatibleContext<LedgerTransaction>()
val request = VerificationRequest(
message.getLongProperty(VERIFICATION_ID_FIELD_NAME),
ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }.deserialize(),
MessageUtil.getJMSReplyTo(message)
)
transaction,
MessageUtil.getJMSReplyTo(message))
return ObjectWithCompatibleContext(request, context)
}
}
@ -49,11 +52,11 @@ object VerifierApi {
}
}
fun writeToClientMessage(message: ClientMessage) {
fun writeToClientMessage(message: ClientMessage, context: SerializationContext) {
message.putLongProperty(VERIFICATION_ID_FIELD_NAME, verificationId)
if (exception != null) {
message.putBytesProperty(RESULT_EXCEPTION_FIELD_NAME, exception.serialize().bytes)
message.putBytesProperty(RESULT_EXCEPTION_FIELD_NAME, exception.serialize(context = context).bytes)
}
}
}
}
}

View File

@ -3,6 +3,6 @@ package net.corda.nodeapi.internal.serialization
import net.corda.core.crypto.sha256
import net.corda.core.internal.AbstractAttachment
class GeneratedAttachment(bytes: ByteArray) : AbstractAttachment({ bytes }) {
class GeneratedAttachment(val bytes: ByteArray) : AbstractAttachment({ bytes }) {
override val id = bytes.sha256()
}

View File

@ -19,6 +19,7 @@ import net.corda.core.serialization.*
import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.OpaqueBytes
import net.corda.nodeapi.internal.AttachmentsClassLoader
import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream
import java.io.NotSerializableException
import java.util.*
@ -37,7 +38,7 @@ object NotSupportedSerializationScheme : SerializationScheme {
override fun <T : Any> serialize(obj: T, context: SerializationContext): SerializedBytes<T> = doThrow()
}
data class SerializationContextImpl(override val preferredSerializationVersion: ByteSequence,
data class SerializationContextImpl(override val preferredSerializationVersion: VersionHeader,
override val deserializationClassLoader: ClassLoader,
override val whitelist: ClassWhitelist,
override val properties: Map<Any, Any>,
@ -88,36 +89,54 @@ data class SerializationContextImpl(override val preferredSerializationVersion:
})
}
override fun withPreferredSerializationVersion(versionHeader: ByteSequence) = copy(preferredSerializationVersion = versionHeader)
override fun withPreferredSerializationVersion(versionHeader: VersionHeader) = copy(preferredSerializationVersion = versionHeader)
}
private const val HEADER_SIZE: Int = 8
fun ByteSequence.obtainHeaderSignature(): VersionHeader = take(HEADER_SIZE).copy()
open class SerializationFactoryImpl : SerializationFactory() {
private val creator: List<StackTraceElement> = Exception().stackTrace.asList()
private val registeredSchemes: MutableCollection<SerializationScheme> = Collections.synchronizedCollection(mutableListOf())
private val logger = LoggerFactory.getLogger(javaClass)
// TODO: This is read-mostly. Probably a faster implementation to be found.
private val schemes: ConcurrentHashMap<Pair<ByteSequence, SerializationContext.UseCase>, SerializationScheme> = ConcurrentHashMap()
private fun schemeFor(byteSequence: ByteSequence, target: SerializationContext.UseCase): SerializationScheme {
private fun schemeFor(byteSequence: ByteSequence, target: SerializationContext.UseCase): Pair<SerializationScheme, VersionHeader> {
// truncate sequence to 8 bytes, and make sure it's a copy to avoid holding onto large ByteArrays
return schemes.computeIfAbsent(byteSequence.take(HEADER_SIZE).copy() to target) {
val lookupKey = byteSequence.obtainHeaderSignature() to target
val scheme = schemes.computeIfAbsent(lookupKey) {
registeredSchemes
.filter { scheme -> scheme.canDeserializeVersion(it.first, it.second) }
.forEach { return@computeIfAbsent it }
logger.warn("Cannot find serialization scheme for: $lookupKey, registeredSchemes are: $registeredSchemes")
NotSupportedSerializationScheme
}
return scheme to lookupKey.first
}
@Throws(NotSerializableException::class)
override fun <T : Any> deserialize(byteSequence: ByteSequence, clazz: Class<T>, context: SerializationContext): T {
return asCurrent { withCurrentContext(context) { schemeFor(byteSequence, context.useCase).deserialize(byteSequence, clazz, context) } }
return asCurrent { withCurrentContext(context) { schemeFor(byteSequence, context.useCase).first.deserialize(byteSequence, clazz, context) } }
}
@Throws(NotSerializableException::class)
override fun <T : Any> deserializeWithCompatibleContext(byteSequence: ByteSequence, clazz: Class<T>, context: SerializationContext): ObjectWithCompatibleContext<T> {
return asCurrent {
withCurrentContext(context) {
val (scheme, versionHeader) = schemeFor(byteSequence, context.useCase)
val deserializedObject = scheme.deserialize(byteSequence, clazz, context)
ObjectWithCompatibleContext(deserializedObject, context.withPreferredSerializationVersion(versionHeader))
}
}
}
override fun <T : Any> serialize(obj: T, context: SerializationContext): SerializedBytes<T> {
return asCurrent { withCurrentContext(context) { schemeFor(context.preferredSerializationVersion, context.useCase).serialize(obj, context) } }
return asCurrent { withCurrentContext(context) { schemeFor(context.preferredSerializationVersion, context.useCase).first.serialize(obj, context) } }
}
fun registerScheme(scheme: SerializationScheme) {

View File

@ -10,7 +10,6 @@ import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.serialization.*
import net.corda.core.utilities.ByteSequence
import net.corda.nodeapi.internal.AttachmentsClassLoader
import net.corda.nodeapi.internal.AttachmentsClassLoaderTests
import net.corda.testing.node.MockAttachmentStorage
@ -108,16 +107,6 @@ class CordaClassResolverTests {
val emptyMapClass = mapOf<Any, Any>().javaClass
}
val factory: SerializationFactory = object : SerializationFactory() {
override fun <T : Any> deserialize(byteSequence: ByteSequence, clazz: Class<T>, context: SerializationContext): T {
TODO("not implemented")
}
override fun <T : Any> serialize(obj: T, context: SerializationContext): SerializedBytes<T> {
TODO("not implemented")
}
}
private val emptyWhitelistContext: SerializationContext = SerializationContextImpl(KryoHeaderV0_1, this.javaClass.classLoader, EmptyWhitelist, emptyMap(), true, SerializationContext.UseCase.P2P)
private val allButBlacklistedContext: SerializationContext = SerializationContextImpl(KryoHeaderV0_1, this.javaClass.classLoader, AllButBlacklisted, emptyMap(), true, SerializationContext.UseCase.P2P)

View File

@ -103,6 +103,10 @@ class TestSerializationFactory : SerializationFactory() {
return delegate!!.deserialize(byteSequence, clazz, context)
}
override fun <T : Any> deserializeWithCompatibleContext(byteSequence: ByteSequence, clazz: Class<T>, context: SerializationContext): ObjectWithCompatibleContext<T> {
return delegate!!.deserializeWithCompatibleContext(byteSequence, clazz, context)
}
override fun <T : Any> serialize(obj: T, context: SerializationContext): SerializedBytes<T> {
return delegate!!.serialize(obj, context)
}
@ -147,7 +151,7 @@ class TestSerializationContext : SerializationContext {
return TestSerializationContext().apply { delegate = this@TestSerializationContext.delegate!!.withWhitelisted(clazz) }
}
override fun withPreferredSerializationVersion(versionHeader: ByteSequence): SerializationContext {
override fun withPreferredSerializationVersion(versionHeader: VersionHeader): SerializationContext {
return TestSerializationContext().apply { delegate = this@TestSerializationContext.delegate!!.withPreferredSerializationVersion(versionHeader) }
}

View File

@ -1,5 +1,6 @@
package net.corda.verifier
import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow
@ -18,6 +19,7 @@ import net.corda.testing.driver.NetworkMapStartStrategy
import org.junit.Test
import java.util.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertTrue
import kotlin.test.assertNotNull
class VerifierTests {
@ -50,6 +52,21 @@ class VerifierTests {
}
}
@Test
fun `single verification fails`() {
verifierDriver(extraCordappPackagesToScan = listOf("net.corda.finance.contracts")) {
val aliceFuture = startVerificationRequestor(ALICE.name)
// Generate transactions as per usual, but then remove attachments making transaction invalid.
val transactions = generateTransactions(1).map { it.copy(attachments = emptyList()) }
val alice = aliceFuture.get()
startVerifier(alice)
alice.waitUntilNumberOfVerifiers(1)
val verificationRejection = transactions.map { alice.verifyTransaction(it) }.transpose().get().single()
assertTrue { verificationRejection is TransactionVerificationException.MissingAttachmentRejection}
assertTrue { verificationRejection!!.message!!.contains("Contract constraints failed, could not find attachment") }
}
}
@Test
fun `multiple verifiers work with requestor`() {
verifierDriver {

View File

@ -17,17 +17,16 @@ import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
import net.corda.nodeapi.config.NodeSSLConfiguration
import net.corda.nodeapi.config.getValue
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.nodeapi.internal.serialization.AbstractKryoSerializationScheme
import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT
import net.corda.nodeapi.internal.serialization.KryoHeaderV0_1
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import net.corda.nodeapi.internal.serialization.*
import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import java.nio.file.Path
import java.nio.file.Paths
data class VerifierConfiguration(
override val baseDirectory: Path,
val config: Config
val config: Config // NB: This property is being used via reflection.
) : NodeSSLConfiguration {
val nodeHostAndPort: NetworkHostAndPort by config
override val keyStorePassword: String by config
@ -66,7 +65,7 @@ class Verifier {
val consumer = session.createConsumer(VERIFICATION_REQUESTS_QUEUE_NAME)
val replyProducer = session.createProducer()
consumer.setMessageHandler {
val request = VerifierApi.VerificationRequest.fromClientMessage(it)
val (request, context) = VerifierApi.VerificationRequest.fromClientMessage(it)
log.debug { "Received verification request with id ${request.verificationId}" }
val error = try {
request.transaction.verify()
@ -77,7 +76,7 @@ class Verifier {
}
val reply = session.createMessage(false)
val response = VerifierApi.VerificationResponse(request.verificationId, error)
response.writeToClientMessage(reply)
response.writeToClientMessage(reply, context)
replyProducer.send(request.responseAddress, reply)
it.acknowledge()
}
@ -88,13 +87,18 @@ class Verifier {
private fun initialiseSerialization() {
SerializationDefaults.SERIALIZATION_FACTORY = SerializationFactoryImpl().apply {
registerScheme(KryoVerifierSerializationScheme())
registerScheme(KryoVerifierSerializationScheme)
registerScheme(AMQPVerifierSerializationScheme)
}
/**
* Even though default context is set to Kryo P2P, the encoding will be adjusted depending on the incoming
* request received, see use of [context] in [main] method.
*/
SerializationDefaults.P2P_CONTEXT = KRYO_P2P_CONTEXT
}
}
class KryoVerifierSerializationScheme : AbstractKryoSerializationScheme() {
private object KryoVerifierSerializationScheme : AbstractKryoSerializationScheme() {
override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean {
return byteSequence == KryoHeaderV0_1 && target == SerializationContext.UseCase.P2P
}
@ -102,4 +106,13 @@ class Verifier {
override fun rpcClientKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
override fun rpcServerKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
}
private object AMQPVerifierSerializationScheme : AbstractAMQPSerializationScheme() {
override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean {
return (byteSequence == AmqpHeaderV1_0 && (target == SerializationContext.UseCase.P2P))
}
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory = throw UnsupportedOperationException()
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory = throw UnsupportedOperationException()
}
}