Kryo serialisation whitelisting and misc enhancements. (#267)

Kryo serialisation whitelisting and misc enhancements
This commit is contained in:
Rick Parker
2017-02-28 08:12:18 +00:00
committed by GitHub
parent 3d04c91e61
commit c4c4c51d7d
103 changed files with 995 additions and 376 deletions

View File

@ -8,6 +8,7 @@ import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.createMessage
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.flows.ServiceRequestMessage
@ -118,6 +119,7 @@ class P2PMessagingTest : NodeBasedTest() {
return net.sendRequest<Any>(javaClass.name, request, target)
}
@CordaSerializable
private data class TestRequest(override val sessionID: Long = random63BitValue(),
override val replyTo: SingleMessageRecipient) : ServiceRequestMessage
}

View File

@ -0,0 +1,52 @@
package net.corda.node.serialization
import com.esotericsoftware.kryo.KryoException
import com.google.common.net.HostAndPort
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.serialization.SerializationCustomization
import org.apache.activemq.artemis.api.core.SimpleString
import rx.Notification
import java.math.BigDecimal
import java.time.LocalDate
import java.time.Period
import java.util.*
class DefaultWhitelist : CordaPluginRegistry() {
override fun customizeSerialization(custom: SerializationCustomization): Boolean {
custom.apply {
addToWhitelist(Array<Any>(0, {}).javaClass)
addToWhitelist(Notification::class.java)
addToWhitelist(Notification.Kind::class.java)
addToWhitelist(ArrayList::class.java)
addToWhitelist(listOf<Any>().javaClass) // EmptyList
addToWhitelist(Pair::class.java)
addToWhitelist(ByteArray::class.java)
addToWhitelist(UUID::class.java)
addToWhitelist(LinkedHashSet::class.java)
addToWhitelist(setOf<Unit>().javaClass) // EmptySet
addToWhitelist(Currency::class.java)
addToWhitelist(listOf(Unit).javaClass) // SingletonList
addToWhitelist(setOf(Unit).javaClass) // SingletonSet
addToWhitelist(mapOf(Unit to Unit).javaClass) // SingletonSet
addToWhitelist(HostAndPort::class.java)
addToWhitelist(SimpleString::class.java)
addToWhitelist(KryoException::class.java)
addToWhitelist(StringBuffer::class.java)
addToWhitelist(Unit::class.java)
addToWhitelist(java.io.ByteArrayInputStream::class.java)
addToWhitelist(java.lang.Class::class.java)
addToWhitelist(java.math.BigDecimal::class.java)
addToWhitelist(java.security.KeyPair::class.java)
addToWhitelist(java.time.Duration::class.java)
addToWhitelist(java.time.Instant::class.java)
addToWhitelist(java.time.LocalDate::class.java)
addToWhitelist(java.util.Collections.singletonMap("A", "B").javaClass)
addToWhitelist(java.util.HashMap::class.java)
addToWhitelist(java.util.LinkedHashMap::class.java)
addToWhitelist(BigDecimal::class.java)
addToWhitelist(LocalDate::class.java)
addToWhitelist(Period::class.java)
}
return true
}
}

View File

@ -7,6 +7,7 @@ import net.corda.core.messaging.MessageRecipientGroup
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.read
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.services.config.SSLConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Inbound
@ -65,6 +66,7 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
val hostAndPort: HostAndPort
}
@CordaSerializable
data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisPeerAddress {
override val queueName: String get() = NETWORK_MAP_QUEUE
}
@ -80,6 +82,7 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
* @param queueName The name of the queue this address is associated with.
* @param hostAndPort The address of the node.
*/
@CordaSerializable
data class NodeAddress(override val queueName: String, override val hostAndPort: HostAndPort) : ArtemisPeerAddress {
companion object {
fun asPeer(peerIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {

View File

@ -3,50 +3,22 @@
package net.corda.node.services.messaging
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Registration
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import de.javakaffee.kryoserializers.ArraysAsListSerializer
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer
import de.javakaffee.kryoserializers.guava.*
import net.corda.contracts.asset.Cash
import net.corda.core.ErrorOr
import net.corda.core.contracts.*
import net.corda.core.crypto.*
import net.corda.core.flows.FlowException
import net.corda.core.flows.IllegalFlowLogicException
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.StateMachineInfo
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.serialization.*
import net.corda.core.toFuture
import net.corda.core.toObservable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.node.internal.AbstractNode
import net.corda.node.services.User
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.node.services.statemachine.FlowSessionException
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.commons.fileupload.MultipartStream
import org.objenesis.strategy.StdInstantiatorStrategy
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Notification
import rx.Observable
import java.io.BufferedInputStream
import java.time.Instant
import java.util.*
/** Global RPC logger */
val rpcLog: Logger by lazy { LoggerFactory.getLogger("net.corda.rpc") }
@ -95,6 +67,7 @@ fun requirePermission(permission: String) {
* Thrown to indicate a fatal error in the RPC system itself, as opposed to an error generated by the invoked
* method.
*/
@CordaSerializable
open class RPCException(msg: String, cause: Throwable?) : RuntimeException(msg, cause) {
constructor(msg: String) : this(msg, null)
@ -112,129 +85,20 @@ object ClassSerializer : Serializer<Class<*>>() {
}
}
@CordaSerializable
class PermissionException(msg: String) : RuntimeException(msg)
// The Kryo used for the RPC wire protocol. Every type in the wire protocol is listed here explicitly.
// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes,
// because we can see everything we're using in one place.
private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null) : Kryo() {
companion object {
private val pluginRegistries: List<CordaPluginRegistry> by lazy {
val unusedKryo = Kryo()
// Sorting required to give a stable ordering, as Kryo allocates integer tokens for each registered class.
ServiceLoader.load(CordaPluginRegistry::class.java).toList().filter { it.registerRPCKryoTypes(unusedKryo) }.sortedBy { it.javaClass.name }
}
}
private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null) : CordaKryo(makeStandardClassResolver()) {
init {
isRegistrationRequired = true
// Allow construction of objects using a JVM backdoor that skips invoking the constructors, if there is no
// no-arg constructor available.
instantiatorStrategy = Kryo.DefaultInstantiatorStrategy(StdInstantiatorStrategy())
DefaultKryoCustomizer.customize(this)
register(Arrays.asList("").javaClass, ArraysAsListSerializer())
register(Instant::class.java, ReferencesAwareJavaSerializer)
register(SignedTransaction::class.java, ImmutableClassSerializer(SignedTransaction::class))
register(WireTransaction::class.java, WireTransactionSerializer)
register(SerializedBytes::class.java, SerializedBytesSerializer)
register(AnonymousParty::class.java)
register(Party::class.java)
register(Array<Any>(0,{}).javaClass)
// RPC specific classes
register(Class::class.java, ClassSerializer)
UnmodifiableCollectionsSerializer.registerSerializers(this)
ImmutableListSerializer.registerSerializers(this)
ImmutableSetSerializer.registerSerializers(this)
ImmutableSortedSetSerializer.registerSerializers(this)
ImmutableMapSerializer.registerSerializers(this)
ImmutableMultimapSerializer.registerSerializers(this)
register(BufferedInputStream::class.java, InputStreamSerializer)
register(Class.forName("sun.net.www.protocol.jar.JarURLConnection\$JarURLInputStream"), InputStreamSerializer)
register(MultipartStream.ItemInputStream::class.java, InputStreamSerializer)
noReferencesWithin<WireTransaction>()
register(ErrorOr::class.java)
register(MarshalledObservation::class.java, ImmutableClassSerializer(MarshalledObservation::class))
register(Notification::class.java)
register(Notification.Kind::class.java)
register(ArrayList::class.java)
register(listOf<Any>().javaClass) // EmptyList
register(IllegalStateException::class.java)
register(Pair::class.java)
register(StateMachineUpdate.Added::class.java)
register(StateMachineUpdate.Removed::class.java)
register(StateMachineInfo::class.java)
register(DigitalSignature.WithKey::class.java)
register(DigitalSignature.LegallyIdentifiable::class.java)
register(ByteArray::class.java)
register(EdDSAPublicKey::class.java, Ed25519PublicKeySerializer)
register(EdDSAPrivateKey::class.java, Ed25519PrivateKeySerializer)
register(CompositeKey.Leaf::class.java)
register(CompositeKey.Node::class.java)
register(Vault::class.java)
register(Vault.Update::class.java)
register(StateMachineRunId::class.java)
register(StateMachineTransactionMapping::class.java)
register(UUID::class.java)
register(UniqueIdentifier::class.java)
register(LinkedHashSet::class.java)
register(LinkedHashMap::class.java)
register(StateAndRef::class.java)
register(setOf<Unit>().javaClass) // EmptySet
register(StateRef::class.java)
register(SecureHash.SHA256::class.java)
register(TransactionState::class.java)
register(Cash.State::class.java)
register(Amount::class.java)
register(Issued::class.java)
register(PartyAndReference::class.java)
register(OpaqueBytes::class.java)
register(Currency::class.java)
register(Cash::class.java)
register(Cash.Clauses.ConserveAmount::class.java)
register(listOf(Unit).javaClass) // SingletonList
register(setOf(Unit).javaClass) // SingletonSet
register(ServiceEntry::class.java)
register(NodeInfo::class.java)
register(PhysicalLocation::class.java)
register(NetworkMapCache.MapChange.Added::class.java)
register(NetworkMapCache.MapChange.Removed::class.java)
register(NetworkMapCache.MapChange.Modified::class.java)
register(ArtemisMessagingComponent.NodeAddress::class.java)
register(NetworkMapAddress::class.java)
register(ServiceInfo::class.java)
register(ServiceType.getServiceType("ab", "ab").javaClass)
register(ServiceType.parse("ab").javaClass)
register(WorldCoordinate::class.java)
register(HostAndPort::class.java)
register(SimpleString::class.java)
register(ServiceEntry::class.java)
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
register(Array<StackTraceElement>::class, read = { kryo, input -> emptyArray() }, write = { kryo, output, obj -> })
register(FlowException::class.java)
register(FlowSessionException::class.java)
register(IllegalFlowLogicException::class.java)
register(RuntimeException::class.java)
register(IllegalArgumentException::class.java)
register(ArrayIndexOutOfBoundsException::class.java)
register(IndexOutOfBoundsException::class.java)
register(NoSuchElementException::class.java)
register(RPCException::class.java)
register(PermissionException::class.java)
register(Throwable::class.java)
register(FlowHandle::class.java)
register(KryoException::class.java)
register(StringBuffer::class.java)
register(Unit::class.java)
for ((_flow, argumentTypes) in AbstractNode.defaultFlowWhiteList) {
for (type in argumentTypes) {
register(type)
}
}
pluginRegistries.forEach { it.registerRPCKryoTypes(this) }
}
// TODO: workaround to prevent Observable registration conflict when using plugin registered kyro classes

View File

@ -16,6 +16,7 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.ServiceType
import net.corda.core.random63BitValue
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -73,27 +74,33 @@ interface NetworkMapService {
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
@CordaSerializable
data class FetchMapResponse(val nodes: Collection<NodeRegistration>?, val version: Int)
class QueryIdentityRequest(val identity: Party,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long) : ServiceRequestMessage
@CordaSerializable
data class QueryIdentityResponse(val node: NodeInfo?)
class RegistrationRequest(val wireReg: WireNodeRegistration,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
@CordaSerializable
data class RegistrationResponse(val success: Boolean)
class SubscribeRequest(val subscribe: Boolean,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
@CordaSerializable
data class SubscribeResponse(val confirmed: Boolean)
@CordaSerializable
data class Update(val wireReg: WireNodeRegistration, val mapVersion: Int, val replyTo: MessageRecipients)
@CordaSerializable
data class UpdateAcknowledge(val mapVersion: Int, val replyTo: MessageRecipients)
}
@ -331,6 +338,7 @@ abstract class AbstractNetworkMapService
*/
// TODO: This might alternatively want to have a node and party, with the node being optional, so registering a node
// involves providing both node and paerty, and deregistering a node involves a request with party but no node.
@CordaSerializable
class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemove, var expires: Instant) {
/**
* Build a node registration in wire format.
@ -348,6 +356,7 @@ class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemo
/**
* A node registration and its signature as a pair.
*/
@CordaSerializable
class WireNodeRegistration(raw: SerializedBytes<NodeRegistration>, sig: DigitalSignature.WithKey) : SignedData<NodeRegistration>(raw, sig) {
@Throws(IllegalArgumentException::class)
override fun verifyData(data: NodeRegistration) {
@ -355,6 +364,7 @@ class WireNodeRegistration(raw: SerializedBytes<NodeRegistration>, sig: DigitalS
}
}
@CordaSerializable
sealed class NodeMapError : Exception() {
/** Thrown if the signature on the node info does not match the public key for the identity */
@ -367,5 +377,8 @@ sealed class NodeMapError : Exception() {
class UnknownChangeType : NodeMapError()
}
@CordaSerializable
data class LastAcknowledgeInfo(val mapVersion: Int)
@CordaSerializable
data class NodeRegistrationInfo(val reg: NodeRegistration, val mapVersion: Int)

View File

@ -4,6 +4,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.serialization.threadLocalStorageKryo
import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.utilities.*
@ -38,7 +39,7 @@ class DBCheckpointStorage : CheckpointStorage {
private val checkpointStorage = synchronizedMap(CheckpointMap())
override fun addCheckpoint(checkpoint: Checkpoint) {
checkpointStorage.put(checkpoint.id, checkpoint.serialize())
checkpointStorage.put(checkpoint.id, checkpoint.serialize(threadLocalStorageKryo(), true))
}
override fun removeCheckpoint(checkpoint: Checkpoint) {

View File

@ -9,6 +9,7 @@ import net.corda.core.*
import net.corda.core.contracts.Attachment
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.AcceptsFileUpload
import java.io.FilterInputStream
@ -48,6 +49,7 @@ class NodeAttachmentService(val storePath: Path, metrics: MetricRegistry) : Atta
require(storePath.isDirectory()) { "$storePath must be a directory" }
}
@CordaSerializable
class OnDiskHashMismatch(val file: Path, val actual: SecureHash) : Exception() {
override fun toString() = "File $file hashed to $actual: corruption in attachment store?"
}

View File

@ -2,8 +2,10 @@ package net.corda.node.services.statemachine
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowException
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.UntrustworthyData
@CordaSerializable
interface SessionMessage
data class SessionInit(val initiatorSessionId: Long, val flowName: String, val firstPayload: Any?) : SessionMessage

View File

@ -369,7 +369,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun quasarKryo(): Kryo {
val serializer = Fiber.getFiberSerializer(false) as KryoSerializer
return createKryo(serializer.kryo)
return createKryo(serializer.kryo).apply {
// Because we like to stick a Kryo object in a ThreadLocal to speed things up a bit, we can end up trying to
// serialise the Kryo object itself when suspending a fiber. That's dumb, useless AND can cause crashes, so
// we avoid it here. This is checkpointing specific.
register(Kryo::class,
read = { kryo, input -> createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo) },
write = { kryo, output, obj -> }
)
}
}
private fun <T> createFiber(logic: FlowLogic<T>): FlowStateMachineImpl<T> {

View File

@ -1,23 +1,26 @@
package net.corda.node.services.transactions
import bftsmart.tom.ServiceProxy
import bftsmart.tom.MessageContext
import bftsmart.tom.ServiceProxy
import bftsmart.tom.ServiceReplica
import bftsmart.tom.server.defaultservices.DefaultRecoverable
import bftsmart.tom.server.defaultservices.DefaultReplier
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.node.utilities.JDBCHashMap
import net.corda.node.utilities.databaseTransaction
import org.jetbrains.exposed.sql.Database
import java.util.LinkedHashMap
import java.util.*
@CordaSerializable
enum class RequestType {
Get,
Put
}
/** Sent from [BFTSmartClient] to [BFTSmartServer] */
@CordaSerializable
data class Request(val type: RequestType, val data: Any)
class BFTSmartClient<K: Any, V: Any>(id: Int) {

View File

@ -1,8 +1,11 @@
package net.corda.node.utilities
import net.corda.core.serialization.CordaSerializable
/**
* Enum for when adding/removing something, for example adding or removing an entry in a directory.
*/
@CordaSerializable
enum class AddOrRemove {
ADD,
REMOVE

View File

@ -3,6 +3,7 @@ package net.corda.node.utilities
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.serialization.threadLocalStorageKryo
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import org.jetbrains.exposed.sql.*
@ -64,11 +65,11 @@ fun bytesToBlob(value: SerializedBytes<*>, finalizables: MutableList<() -> Unit>
return blob
}
fun serializeToBlob(value: Any, finalizables: MutableList<() -> Unit>): Blob = bytesToBlob(value.serialize(), finalizables)
fun serializeToBlob(value: Any, finalizables: MutableList<() -> Unit>): Blob = bytesToBlob(value.serialize(threadLocalStorageKryo(), true), finalizables)
fun <T : Any> bytesFromBlob(blob: Blob): SerializedBytes<T> {
try {
return SerializedBytes(blob.getBytes(0, blob.length().toInt()))
return SerializedBytes(blob.getBytes(0, blob.length().toInt()), true)
} finally {
blob.free()
}

View File

@ -1,5 +1,6 @@
package net.corda.node.utilities.registration
import net.corda.core.serialization.CordaSerializable
import org.bouncycastle.pkcs.PKCS10CertificationRequest
import java.security.cert.Certificate
@ -12,4 +13,5 @@ interface NetworkRegistrationService {
fun retrieveCertificates(requestId: String): Array<Certificate>?
}
@CordaSerializable
class CertificateRequestException(message: String) : Exception(message)

View File

@ -1,3 +1,4 @@
# Register a ServiceLoader service extending from net.corda.core.node.CordaPluginRegistry
net.corda.node.services.NotaryChange$Plugin
net.corda.node.services.persistence.DataVending$Plugin
net.corda.node.serialization.DefaultWhitelist

View File

@ -12,10 +12,10 @@ import net.corda.core.node.recordTransactions
import net.corda.core.node.services.VaultService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.configureDatabase

View File

@ -10,7 +10,6 @@ import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.linearHeadsOfType
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.flows.FinalityFlow
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.ValidatingNotaryService