CORDA-540: Fixes to make "node" tests pass in AMQP mode, part 1 (#1455)

This commit is contained in:
Viktor Kolomeyko 2017-09-08 13:46:22 +01:00 committed by GitHub
parent 691d9ea0bc
commit 8710090887
6 changed files with 31 additions and 13 deletions

View File

@ -6,6 +6,7 @@ import net.corda.core.crypto.TransactionSignature
import net.corda.core.utilities.toBase58String
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.CordaSerializable
import java.security.PublicKey
/**
@ -13,6 +14,7 @@ import java.security.PublicKey
* old and new notaries. Output states can be computed by applying the notary modification to corresponding inputs
* on the fly.
*/
@CordaSerializable
data class NotaryChangeWireTransaction(
override val inputs: List<StateRef>,
override val notary: Party,

View File

@ -15,7 +15,7 @@ import kotlin.reflect.jvm.javaConstructor
open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPSerializer<Any> {
override val type: Type get() = clazz
open val kotlinConstructor = constructorForDeserialization(clazz)
val javaConstructor by lazy { kotlinConstructor?.javaConstructor?.apply { isAccessible = true } }
val javaConstructor by lazy { kotlinConstructor?.javaConstructor }
private val logger = loggerFor<ObjectSerializer>()

View File

@ -13,6 +13,7 @@ import kotlin.reflect.KFunction
import kotlin.reflect.KParameter
import kotlin.reflect.full.findAnnotation
import kotlin.reflect.full.primaryConstructor
import kotlin.reflect.jvm.isAccessible
import kotlin.reflect.jvm.javaType
/**
@ -48,7 +49,9 @@ internal fun constructorForDeserialization(type: Type): KFunction<Any>? {
preferredCandidate = kotlinConstructor
}
}
return preferredCandidate ?: throw NotSerializableException("No constructor for deserialization found for $clazz.")
return preferredCandidate?.apply { isAccessible = true}
?: throw NotSerializableException("No constructor for deserialization found for $clazz.")
} else {
return null
}

View File

@ -2,6 +2,7 @@ package net.corda.nodeapi.internal.serialization.amqp.custom
import net.corda.core.CordaRuntimeException
import net.corda.core.CordaThrowable
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
import net.corda.nodeapi.internal.serialization.amqp.constructorForDeserialization
@ -9,6 +10,11 @@ import net.corda.nodeapi.internal.serialization.amqp.propertiesForSerialization
import java.io.NotSerializableException
class ThrowableSerializer(factory: SerializerFactory) : CustomSerializer.Proxy<Throwable, ThrowableSerializer.ThrowableProxy>(Throwable::class.java, ThrowableProxy::class.java, factory) {
companion object {
private val logger = loggerFor<ThrowableSerializer>()
}
override val additionalSerializers: Iterable<CustomSerializer<out Any>> = listOf(StackTraceElementSerializer(factory))
override fun toProxy(obj: Throwable): ThrowableProxy {
@ -33,7 +39,7 @@ class ThrowableSerializer(factory: SerializerFactory) : CustomSerializer.Proxy<T
override fun fromProxy(proxy: ThrowableProxy): Throwable {
try {
// TODO: This will need reworking when we have multiple class loaders
val clazz = Class.forName(proxy.exceptionClass, false, this.javaClass.classLoader)
val clazz = Class.forName(proxy.exceptionClass, false, factory.classloader)
// If it is CordaException or CordaRuntimeException, we can seek any constructor and then set the properties
// Otherwise we just make a CordaRuntimeException
if (CordaThrowable::class.java.isAssignableFrom(clazz) && Throwable::class.java.isAssignableFrom(clazz)) {
@ -50,7 +56,7 @@ class ThrowableSerializer(factory: SerializerFactory) : CustomSerializer.Proxy<T
}
}
} catch (e: Exception) {
// If attempts to rebuild the exact exception fail, we fall through and build a runtime exception.
logger.warn("Unexpected exception de-serializing throwable: ${proxy.exceptionClass}. Converting to CordaRuntimeException.", e)
}
// If the criteria are not met or we experience an exception constructing the exception, we fall back to our own unchecked exception.
return CordaRuntimeException(proxy.exceptionClass).apply {

View File

@ -40,10 +40,10 @@ import net.corda.node.utilities.wrapWithDatabaseTransaction
import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl
import net.corda.nodeapi.internal.serialization.withTokenContext
import org.apache.activemq.artemis.utils.ReusableLatch
import org.bouncycastle.asn1.x500.X500Name
import org.slf4j.Logger
import rx.Observable
import rx.subjects.PublishSubject
import java.io.NotSerializableException
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
@ -609,13 +609,20 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val serialized = try {
message.serialize()
} catch (e: KryoException) {
if (message !is ErrorSessionEnd || message.errorResponse == null) throw e
logger.warn("Something in ${message.errorResponse.javaClass.name} is not serialisable. " +
"Instead sending back an exception which is serialisable to ensure session end occurs properly.", e)
// The subclass may have overridden toString so we use that
val exMessage = message.errorResponse.let { if (it.javaClass != FlowException::class.java) it.toString() else it.message }
message.copy(errorResponse = FlowException(exMessage)).serialize()
} catch (e: Exception) {
when(e) {
// Handling Kryo and AMQP serialization problems. Unfortunately the two exception types do not share much of a common exception interface.
is KryoException,
is NotSerializableException -> {
if (message !is ErrorSessionEnd || message.errorResponse == null) throw e
logger.warn("Something in ${message.errorResponse.javaClass.name} is not serialisable. " +
"Instead sending back an exception which is serialisable to ensure session end occurs properly.", e)
// The subclass may have overridden toString so we use that
val exMessage = message.errorResponse.let { if (it.javaClass != FlowException::class.java) it.toString() else it.message }
message.copy(errorResponse = FlowException(exMessage)).serialize()
}
else -> throw e
}
}
serviceHub.networkService.apply {

View File

@ -122,7 +122,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
resetTestSerialization()
}
class TestState(val flowLogicRef: FlowLogicRef, val instant: Instant, private val myIdentity: Party) : LinearState, SchedulableState {
class TestState(val flowLogicRef: FlowLogicRef, val instant: Instant, val myIdentity: Party) : LinearState, SchedulableState {
override val participants: List<AbstractParty>
get() = listOf(myIdentity)