mirror of
https://github.com/corda/corda.git
synced 2025-05-08 11:38:09 +00:00
In checkpoints, serialize hash of attachment instead of its data (#543)
This commit is contained in:
parent
e75a24937d
commit
cb3522588f
@ -7,9 +7,7 @@ import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogicRef
|
||||
import net.corda.core.flows.FlowLogicRefFactory
|
||||
import net.corda.core.node.services.ServiceType
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.OpaqueBytes
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import java.io.FileNotFoundException
|
||||
import java.io.InputStream
|
||||
@ -513,3 +511,21 @@ interface Attachment : NamedByHash {
|
||||
throw FileNotFoundException(path)
|
||||
}
|
||||
}
|
||||
|
||||
abstract class AbstractAttachment(dataLoader: () -> ByteArray) : Attachment {
|
||||
companion object {
|
||||
fun SerializeAsTokenContext.attachmentDataLoader(id: SecureHash): () -> ByteArray {
|
||||
val storage = serviceHub.storageService.attachments
|
||||
return {
|
||||
val a = storage.openAttachment(id) ?: throw MissingAttachmentsException(listOf(id))
|
||||
if (a is AbstractAttachment) a.attachmentData else a.open().use { it.readBytes() }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected val attachmentData: ByteArray by lazy(dataLoader)
|
||||
override fun open(): InputStream = attachmentData.inputStream()
|
||||
override fun equals(other: Any?) = other === this || other is Attachment && other.id == this.id
|
||||
override fun hashCode() = id.hashCode()
|
||||
override fun toString() = "${javaClass.simpleName}(id=$id)"
|
||||
}
|
||||
|
@ -6,6 +6,8 @@ import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.serialization.SingletonSerializationToken.Companion.singletonSerializationToken
|
||||
|
||||
/**
|
||||
* The interfaces and classes in this file allow large, singleton style classes to
|
||||
@ -74,15 +76,15 @@ class SerializeAsTokenSerializer<T : SerializeAsToken> : Serializer<T>() {
|
||||
* Then it is a case of using the companion object methods on [SerializeAsTokenSerializer] to set and clear context as necessary
|
||||
* on the Kryo instance when serializing to enable/disable tokenization.
|
||||
*/
|
||||
class SerializeAsTokenContext(toBeTokenized: Any, kryoPool: KryoPool) {
|
||||
internal val tokenToTokenized = mutableMapOf<SerializationToken, SerializeAsToken>()
|
||||
internal var readOnly = false
|
||||
class SerializeAsTokenContext(toBeTokenized: Any, kryoPool: KryoPool, val serviceHub: ServiceHub) {
|
||||
private val classNameToSingleton = mutableMapOf<String, SerializeAsToken>()
|
||||
private var readOnly = false
|
||||
|
||||
init {
|
||||
/*
|
||||
/**
|
||||
* Go ahead and eagerly serialize the object to register all of the tokens in the context.
|
||||
*
|
||||
* This results in the toToken() method getting called for any [SerializeAsStringToken] instances which
|
||||
* This results in the toToken() method getting called for any [SingletonSerializeAsToken] instances which
|
||||
* are encountered in the object graph as they are serialized by Kryo and will therefore register the token to
|
||||
* object mapping for those instances. We then immediately set the readOnly flag to stop further adhoc or
|
||||
* accidental registrations from occuring as these could not be deserialized in a deserialization-first
|
||||
@ -95,30 +97,33 @@ class SerializeAsTokenContext(toBeTokenized: Any, kryoPool: KryoPool) {
|
||||
}
|
||||
readOnly = true
|
||||
}
|
||||
|
||||
internal fun putSingleton(toBeTokenized: SerializeAsToken) {
|
||||
val className = toBeTokenized.javaClass.name
|
||||
if (className !in classNameToSingleton) {
|
||||
// Only allowable if we are in SerializeAsTokenContext init (readOnly == false)
|
||||
if (readOnly) {
|
||||
throw UnsupportedOperationException("Attempt to write token for lazy registered ${className}. All tokens should be registered during context construction.")
|
||||
}
|
||||
classNameToSingleton[className] = toBeTokenized
|
||||
}
|
||||
}
|
||||
|
||||
internal fun getSingleton(className: String) = classNameToSingleton[className] ?: throw IllegalStateException("Unable to find tokenized instance of $className in context $this")
|
||||
}
|
||||
|
||||
/**
|
||||
* A class representing a [SerializationToken] for some object that is not serializable but can be looked up
|
||||
* (when deserialized) via just the class name.
|
||||
*/
|
||||
@CordaSerializable
|
||||
data class SingletonSerializationToken private constructor(private val className: String) : SerializationToken {
|
||||
constructor(toBeTokenized: SerializeAsToken) : this(toBeTokenized.javaClass.name)
|
||||
class SingletonSerializationToken private constructor(private val className: String) : SerializationToken {
|
||||
|
||||
override fun fromToken(context: SerializeAsTokenContext): Any = context.tokenToTokenized[this] ?:
|
||||
throw IllegalStateException("Unable to find tokenized instance of $className in context $context")
|
||||
override fun fromToken(context: SerializeAsTokenContext) = context.getSingleton(className)
|
||||
|
||||
fun registerWithContext(context: SerializeAsTokenContext, toBeTokenized: SerializeAsToken) = also { context.putSingleton(toBeTokenized) }
|
||||
|
||||
companion object {
|
||||
fun registerWithContext(token: SingletonSerializationToken, toBeTokenized: SerializeAsToken, context: SerializeAsTokenContext): SerializationToken =
|
||||
if (token in context.tokenToTokenized) token else registerNewToken(token, toBeTokenized, context)
|
||||
|
||||
// Only allowable if we are in SerializeAsTokenContext init (readOnly == false)
|
||||
private fun registerNewToken(token: SingletonSerializationToken, toBeTokenized: SerializeAsToken, context: SerializeAsTokenContext): SerializationToken {
|
||||
if (context.readOnly) throw UnsupportedOperationException("Attempt to write token for lazy registered ${toBeTokenized.javaClass.name}. " +
|
||||
"All tokens should be registered during context construction.")
|
||||
context.tokenToTokenized[token] = toBeTokenized
|
||||
return token
|
||||
}
|
||||
fun <T : SerializeAsToken> singletonSerializationToken(toBeTokenized: Class<T>) = SingletonSerializationToken(toBeTokenized.name)
|
||||
}
|
||||
}
|
||||
|
||||
@ -127,8 +132,7 @@ data class SingletonSerializationToken private constructor(private val className
|
||||
* to indicate which instance the token is a serialized form of.
|
||||
*/
|
||||
abstract class SingletonSerializeAsToken : SerializeAsToken {
|
||||
@Suppress("LeakingThis")
|
||||
private val token = SingletonSerializationToken(this)
|
||||
private val token = singletonSerializationToken(javaClass)
|
||||
|
||||
override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context)
|
||||
override fun toToken(context: SerializeAsTokenContext) = token.registerWithContext(context, this)
|
||||
}
|
||||
|
@ -1,10 +1,13 @@
|
||||
package net.corda.flows
|
||||
|
||||
import net.corda.core.contracts.AbstractAttachment
|
||||
import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import java.io.InputStream
|
||||
import net.corda.core.serialization.SerializationToken
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SerializeAsTokenContext
|
||||
|
||||
/**
|
||||
* Given a set of hashes either loads from from local storage or requests them from the other peer. Downloaded
|
||||
@ -15,7 +18,7 @@ class FetchAttachmentsFlow(requests: Set<SecureHash>,
|
||||
|
||||
override fun load(txid: SecureHash): Attachment? = serviceHub.storageService.attachments.openAttachment(txid)
|
||||
|
||||
override fun convert(wire: ByteArray): Attachment = ByteArrayAttachment(wire)
|
||||
override fun convert(wire: ByteArray): Attachment = FetchedAttachment({ wire })
|
||||
|
||||
override fun maybeWriteToDisk(downloaded: List<Attachment>) {
|
||||
for (attachment in downloaded) {
|
||||
@ -23,11 +26,13 @@ class FetchAttachmentsFlow(requests: Set<SecureHash>,
|
||||
}
|
||||
}
|
||||
|
||||
private class ByteArrayAttachment(private val wire: ByteArray) : Attachment {
|
||||
override val id: SecureHash by lazy { wire.sha256() }
|
||||
override fun open(): InputStream = wire.inputStream()
|
||||
override fun equals(other: Any?) = other === this || other is Attachment && other.id == this.id
|
||||
override fun hashCode(): Int = id.hashCode()
|
||||
override fun toString(): String = "${javaClass.simpleName}(id=$id)"
|
||||
private class FetchedAttachment(dataLoader: () -> ByteArray) : AbstractAttachment(dataLoader), SerializeAsToken {
|
||||
override val id: SecureHash by lazy { attachmentData.sha256() }
|
||||
|
||||
private class Token(private val id: SecureHash) : SerializationToken {
|
||||
override fun fromToken(context: SerializeAsTokenContext) = FetchedAttachment(context.attachmentDataLoader(id))
|
||||
}
|
||||
|
||||
override fun toToken(context: SerializeAsTokenContext) = Token(id)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,188 @@
|
||||
package net.corda.core.serialization
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.flows.FetchAttachmentsFlow
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.persistence.schemas.AttachmentEntity
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.math.BigInteger
|
||||
import java.nio.charset.StandardCharsets.UTF_8
|
||||
import java.security.KeyPair
|
||||
import java.util.zip.ZipEntry
|
||||
import java.util.zip.ZipOutputStream
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
private fun createAttachmentData(content: String) = ByteArrayOutputStream().apply {
|
||||
ZipOutputStream(this).use {
|
||||
with(it) {
|
||||
putNextEntry(ZipEntry("content"))
|
||||
write(content.toByteArray(UTF_8))
|
||||
}
|
||||
}
|
||||
}.toByteArray()
|
||||
|
||||
private fun Attachment.extractContent() = ByteArrayOutputStream().apply { extractFile("content", this) }.toString(UTF_8.name())
|
||||
|
||||
private fun MockNetwork.MockNode.attachments() = services.storageService.attachments as NodeAttachmentService
|
||||
private fun MockNetwork.MockNode.saveAttachment(content: String) = database.transaction { attachments().importAttachment(createAttachmentData(content).inputStream()) }
|
||||
private fun MockNetwork.MockNode.hackAttachment(attachmentId: SecureHash, content: String) = database.transaction { attachments().updateAttachment(attachmentId, createAttachmentData(content)) }
|
||||
|
||||
/**
|
||||
* @see NodeAttachmentService.importAttachment
|
||||
*/
|
||||
private fun NodeAttachmentService.updateAttachment(attachmentId: SecureHash, data: ByteArray) {
|
||||
with(session) {
|
||||
withTransaction {
|
||||
update(AttachmentEntity().apply {
|
||||
attId = attachmentId
|
||||
content = data
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class AttachmentSerializationTest {
|
||||
private lateinit var network: MockNetwork
|
||||
private lateinit var server: MockNetwork.MockNode
|
||||
private lateinit var client: MockNetwork.MockNode
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
network = MockNetwork()
|
||||
server = network.createNode(advertisedServices = ServiceInfo(NetworkMapService.type))
|
||||
client = network.createNode(server.info.address)
|
||||
client.disableDBCloseOnStop() // Otherwise the in-memory database may disappear (taking the checkpoint with it) while we reboot the client.
|
||||
network.runNetwork()
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
network.stopNodes()
|
||||
}
|
||||
|
||||
private class ServerLogic(private val client: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
receive<String>(client).unwrap { assertEquals("ping one", it) }
|
||||
sendAndReceive<String>(client, "pong").unwrap { assertEquals("ping two", it) }
|
||||
}
|
||||
}
|
||||
|
||||
private class ClientResult(internal val attachmentContent: String)
|
||||
|
||||
private abstract class ClientLogic(server: MockNetwork.MockNode) : FlowLogic<ClientResult>() {
|
||||
internal val server = server.info.legalIdentity
|
||||
|
||||
@Suspendable
|
||||
internal fun communicate() {
|
||||
sendAndReceive<String>(server, "ping one").unwrap { assertEquals("pong", it) }
|
||||
send(server, "ping two")
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun call() = ClientResult(getAttachmentContent())
|
||||
|
||||
@Suspendable // This annotation is required by the instrumentation verifier.
|
||||
internal abstract fun getAttachmentContent(): String
|
||||
}
|
||||
|
||||
private class CustomAttachment(override val id: SecureHash, internal val customContent: String) : Attachment {
|
||||
override fun open() = throw UnsupportedOperationException("Not implemented.")
|
||||
}
|
||||
|
||||
private class CustomAttachmentLogic(server: MockNetwork.MockNode, private val attachmentId: SecureHash, private val customContent: String) : ClientLogic(server) {
|
||||
@Suspendable
|
||||
override fun getAttachmentContent(): String {
|
||||
val customAttachment = CustomAttachment(attachmentId, customContent)
|
||||
communicate()
|
||||
return customAttachment.customContent
|
||||
}
|
||||
}
|
||||
|
||||
private class OpenAttachmentLogic(server: MockNetwork.MockNode, private val attachmentId: SecureHash) : ClientLogic(server) {
|
||||
@Suspendable
|
||||
override fun getAttachmentContent(): String {
|
||||
val localAttachment = serviceHub.storageService.attachments.openAttachment(attachmentId)!!
|
||||
communicate()
|
||||
return localAttachment.extractContent()
|
||||
}
|
||||
}
|
||||
|
||||
private class FetchAttachmentLogic(server: MockNetwork.MockNode, private val attachmentId: SecureHash) : ClientLogic(server) {
|
||||
@Suspendable
|
||||
override fun getAttachmentContent(): String {
|
||||
val (downloadedAttachment) = subFlow(FetchAttachmentsFlow(setOf(attachmentId), server)).downloaded
|
||||
communicate()
|
||||
return downloadedAttachment.extractContent()
|
||||
}
|
||||
}
|
||||
|
||||
private fun launchFlow(clientLogic: ClientLogic, rounds: Int) {
|
||||
server.services.registerFlowInitiator(clientLogic.javaClass, ::ServerLogic)
|
||||
client.services.startFlow(clientLogic)
|
||||
network.runNetwork(rounds)
|
||||
}
|
||||
|
||||
private fun rebootClientAndGetAttachmentContent(checkAttachmentsOnLoad: Boolean = true): String {
|
||||
client.stop()
|
||||
client = network.createNode(server.info.address, client.id, object : MockNetwork.Factory {
|
||||
override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?, advertisedServices: Set<ServiceInfo>, id: Int, overrideServices: Map<ServiceInfo, KeyPair>?, entropyRoot: BigInteger): MockNetwork.MockNode {
|
||||
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
|
||||
override fun startMessagingService(rpcOps: RPCOps) {
|
||||
attachments().checkAttachmentsOnLoad = checkAttachmentsOnLoad
|
||||
super.startMessagingService(rpcOps)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
return (client.smm.allStateMachines[0].stateMachine.resultFuture.apply { network.runNetwork() }.getOrThrow() as ClientResult).attachmentContent
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `custom (and non-persisted) attachment should be saved in checkpoint`() {
|
||||
val attachmentId = SecureHash.sha256("any old data")
|
||||
launchFlow(CustomAttachmentLogic(server, attachmentId, "custom"), 1)
|
||||
assertEquals("custom", rebootClientAndGetAttachmentContent())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `custom attachment should be saved in checkpoint even if its data was persisted`() {
|
||||
val attachmentId = client.saveAttachment("genuine")
|
||||
launchFlow(CustomAttachmentLogic(server, attachmentId, "custom"), 1)
|
||||
client.hackAttachment(attachmentId, "hacked") // Should not be reloaded, checkAttachmentsOnLoad may cause next line to blow up if client attempts it.
|
||||
assertEquals("custom", rebootClientAndGetAttachmentContent())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `only the hash of a regular attachment should be saved in checkpoint`() {
|
||||
val attachmentId = client.saveAttachment("genuine")
|
||||
client.attachments().checkAttachmentsOnLoad = false // Cached by AttachmentImpl.
|
||||
launchFlow(OpenAttachmentLogic(server, attachmentId), 1)
|
||||
client.hackAttachment(attachmentId, "hacked")
|
||||
assertEquals("hacked", rebootClientAndGetAttachmentContent(false)) // Pass in false to allow non-genuine data to be loaded.
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `only the hash of a FetchAttachmentsFlow attachment should be saved in checkpoint`() {
|
||||
val attachmentId = server.saveAttachment("genuine")
|
||||
launchFlow(FetchAttachmentLogic(server, attachmentId), 2)
|
||||
client.hackAttachment(attachmentId, "hacked")
|
||||
assertEquals("hacked", rebootClientAndGetAttachmentContent(false))
|
||||
}
|
||||
}
|
@ -3,6 +3,8 @@ package net.corda.core.serialization
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.KryoException
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import net.corda.core.node.ServiceHub
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
@ -36,10 +38,14 @@ class SerializationTokenTest {
|
||||
override fun equals(other: Any?) = other is LargeTokenizable && other.bytes.size == this.bytes.size
|
||||
}
|
||||
|
||||
companion object {
|
||||
private fun serializeAsTokenContext(toBeTokenized: Any) = SerializeAsTokenContext(toBeTokenized, storageKryo(), mock<ServiceHub>())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `write token and read tokenizable`() {
|
||||
val tokenizableBefore = LargeTokenizable()
|
||||
val context = SerializeAsTokenContext(tokenizableBefore, storageKryo())
|
||||
val context = serializeAsTokenContext(tokenizableBefore)
|
||||
SerializeAsTokenSerializer.setContext(kryo, context)
|
||||
val serializedBytes = tokenizableBefore.serialize(kryo)
|
||||
assertThat(serializedBytes.size).isLessThan(tokenizableBefore.numBytes)
|
||||
@ -52,7 +58,7 @@ class SerializationTokenTest {
|
||||
@Test
|
||||
fun `write and read singleton`() {
|
||||
val tokenizableBefore = UnitSerializeAsToken()
|
||||
val context = SerializeAsTokenContext(tokenizableBefore, storageKryo())
|
||||
val context = serializeAsTokenContext(tokenizableBefore)
|
||||
SerializeAsTokenSerializer.setContext(kryo, context)
|
||||
val serializedBytes = tokenizableBefore.serialize(kryo)
|
||||
val tokenizableAfter = serializedBytes.deserialize(kryo)
|
||||
@ -62,7 +68,7 @@ class SerializationTokenTest {
|
||||
@Test(expected = UnsupportedOperationException::class)
|
||||
fun `new token encountered after context init`() {
|
||||
val tokenizableBefore = UnitSerializeAsToken()
|
||||
val context = SerializeAsTokenContext(emptyList<Any>(), storageKryo())
|
||||
val context = serializeAsTokenContext(emptyList<Any>())
|
||||
SerializeAsTokenSerializer.setContext(kryo, context)
|
||||
tokenizableBefore.serialize(kryo)
|
||||
}
|
||||
@ -70,9 +76,9 @@ class SerializationTokenTest {
|
||||
@Test(expected = UnsupportedOperationException::class)
|
||||
fun `deserialize unregistered token`() {
|
||||
val tokenizableBefore = UnitSerializeAsToken()
|
||||
val context = SerializeAsTokenContext(emptyList<Any>(), storageKryo())
|
||||
val context = serializeAsTokenContext(emptyList<Any>())
|
||||
SerializeAsTokenSerializer.setContext(kryo, context)
|
||||
val serializedBytes = tokenizableBefore.toToken(SerializeAsTokenContext(emptyList<Any>(), storageKryo())).serialize(kryo)
|
||||
val serializedBytes = tokenizableBefore.toToken(serializeAsTokenContext(emptyList<Any>())).serialize(kryo)
|
||||
serializedBytes.deserialize(kryo)
|
||||
}
|
||||
|
||||
@ -85,7 +91,7 @@ class SerializationTokenTest {
|
||||
@Test(expected = KryoException::class)
|
||||
fun `deserialize non-token`() {
|
||||
val tokenizableBefore = UnitSerializeAsToken()
|
||||
val context = SerializeAsTokenContext(tokenizableBefore, storageKryo())
|
||||
val context = serializeAsTokenContext(tokenizableBefore)
|
||||
SerializeAsTokenSerializer.setContext(kryo, context)
|
||||
val stream = ByteArrayOutputStream()
|
||||
Output(stream).use {
|
||||
@ -107,7 +113,7 @@ class SerializationTokenTest {
|
||||
@Test(expected = KryoException::class)
|
||||
fun `token returns unexpected type`() {
|
||||
val tokenizableBefore = WrongTypeSerializeAsToken()
|
||||
val context = SerializeAsTokenContext(tokenizableBefore, storageKryo())
|
||||
val context = serializeAsTokenContext(tokenizableBefore)
|
||||
SerializeAsTokenSerializer.setContext(kryo, context)
|
||||
val serializedBytes = tokenizableBefore.serialize(kryo)
|
||||
serializedBytes.deserialize(kryo)
|
||||
|
@ -3,6 +3,7 @@ package net.corda.node.serialization
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SerializeAsTokenContext
|
||||
import net.corda.core.serialization.SingletonSerializationToken
|
||||
import net.corda.core.serialization.SingletonSerializationToken.Companion.singletonSerializationToken
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.time.ZoneId
|
||||
@ -15,9 +16,9 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
@ThreadSafe
|
||||
class NodeClock(private val delegateClock: Clock = Clock.systemUTC()) : Clock(), SerializeAsToken {
|
||||
|
||||
private val token = SingletonSerializationToken(this)
|
||||
private val token = singletonSerializationToken(javaClass)
|
||||
|
||||
override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context)
|
||||
override fun toToken(context: SerializeAsTokenContext) = token.registerWithContext(context, this)
|
||||
|
||||
override fun instant(): Instant {
|
||||
return delegateClock.instant()
|
||||
|
@ -5,6 +5,7 @@ import com.google.common.annotations.VisibleForTesting
|
||||
import com.google.common.hash.Hashing
|
||||
import com.google.common.hash.HashingInputStream
|
||||
import com.google.common.io.CountingInputStream
|
||||
import net.corda.core.contracts.AbstractAttachment
|
||||
import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.createDirectory
|
||||
import net.corda.core.crypto.SecureHash
|
||||
@ -13,6 +14,9 @@ import net.corda.core.extractZipFile
|
||||
import net.corda.core.isDirectory
|
||||
import net.corda.core.node.services.AttachmentStorage
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SerializationToken
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SerializeAsTokenContext
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.AcceptsFileUpload
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
@ -82,38 +86,31 @@ class NodeAttachmentService(override var storePath: Path, dataSourceProperties:
|
||||
}
|
||||
}
|
||||
|
||||
private class AttachmentImpl(override val id: SecureHash,
|
||||
private val attachment: ByteArray,
|
||||
private val checkOnLoad: Boolean) : Attachment {
|
||||
private class AttachmentImpl(override val id: SecureHash, dataLoader: () -> ByteArray, private val checkOnLoad: Boolean) : AbstractAttachment(dataLoader), SerializeAsToken {
|
||||
override fun open(): InputStream {
|
||||
|
||||
val stream = ByteArrayInputStream(attachment)
|
||||
|
||||
val stream = super.open()
|
||||
// This is just an optional safety check. If it slows things down too much it can be disabled.
|
||||
if (id is SecureHash.SHA256 && checkOnLoad)
|
||||
return HashCheckingStream(id, attachment.size, stream)
|
||||
|
||||
return stream
|
||||
return if (checkOnLoad && id is SecureHash.SHA256) HashCheckingStream(id, attachmentData.size, stream) else stream
|
||||
}
|
||||
|
||||
override fun equals(other: Any?) = other is Attachment && other.id == id
|
||||
override fun hashCode(): Int = id.hashCode()
|
||||
private class Token(private val id: SecureHash, private val checkOnLoad: Boolean) : SerializationToken {
|
||||
override fun fromToken(context: SerializeAsTokenContext) = AttachmentImpl(id, context.attachmentDataLoader(id), checkOnLoad)
|
||||
}
|
||||
|
||||
override fun toToken(context: SerializeAsTokenContext) = Token(id, checkOnLoad)
|
||||
|
||||
}
|
||||
|
||||
override fun openAttachment(id: SecureHash): Attachment? {
|
||||
val attachment = session.withTransaction {
|
||||
try {
|
||||
session.select(AttachmentEntity::class)
|
||||
.where(AttachmentEntity.ATT_ID.eq(id))
|
||||
.get()
|
||||
.single()
|
||||
} catch (e: NoSuchElementException) {
|
||||
null
|
||||
}
|
||||
} ?: return null
|
||||
|
||||
return AttachmentImpl(id, attachment.content, checkAttachmentsOnLoad)
|
||||
}
|
||||
override fun openAttachment(id: SecureHash): Attachment? = session.withTransaction {
|
||||
try {
|
||||
session.select(AttachmentEntity::class)
|
||||
.where(AttachmentEntity.ATT_ID.eq(id))
|
||||
.get()
|
||||
.single()
|
||||
} catch (e: NoSuchElementException) {
|
||||
null
|
||||
}
|
||||
}?.run { AttachmentImpl(id, { content }, checkAttachmentsOnLoad) }
|
||||
|
||||
// TODO: PLT-147: The attachment should be randomised to prevent brute force guessing and thus privacy leaks.
|
||||
override fun importAttachment(jar: InputStream): SecureHash {
|
||||
|
@ -151,7 +151,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
private val recentlyClosedSessions = ConcurrentHashMap<Long, Party>()
|
||||
|
||||
// Context for tokenized services in checkpoints
|
||||
private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryo())
|
||||
private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryo(), serviceHub)
|
||||
|
||||
/** Returns a list of all state machines executing the given flow logic at the top level (subflows do not count) */
|
||||
fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
|
||||
|
@ -3,6 +3,7 @@ package net.corda.node.utilities
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SerializeAsTokenContext
|
||||
import net.corda.core.serialization.SingletonSerializationToken
|
||||
import net.corda.core.serialization.SingletonSerializationToken.Companion.singletonSerializationToken
|
||||
import java.time.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
@ -12,9 +13,9 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
@ThreadSafe
|
||||
class TestClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock(), SerializeAsToken {
|
||||
|
||||
private val token = SingletonSerializationToken(this)
|
||||
private val token = singletonSerializationToken(javaClass)
|
||||
|
||||
override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context)
|
||||
override fun toToken(context: SerializeAsTokenContext) = token.registerWithContext(context, this)
|
||||
|
||||
@Synchronized fun updateDate(date: LocalDate): Boolean {
|
||||
val currentDate = LocalDate.now(this)
|
||||
|
@ -3,6 +3,7 @@ package net.corda.testing.node
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SerializeAsTokenContext
|
||||
import net.corda.core.serialization.SingletonSerializationToken
|
||||
import net.corda.core.serialization.SingletonSerializationToken.Companion.singletonSerializationToken
|
||||
import net.corda.node.utilities.MutableClock
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
@ -17,9 +18,9 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
@ThreadSafe
|
||||
class TestClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock(), SerializeAsToken {
|
||||
|
||||
private val token = SingletonSerializationToken(this)
|
||||
private val token = singletonSerializationToken(javaClass)
|
||||
|
||||
override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context)
|
||||
override fun toToken(context: SerializeAsTokenContext) = token.registerWithContext(context, this)
|
||||
|
||||
/**
|
||||
* Advance this [Clock] by the specified [Duration] for testing purposes.
|
||||
|
Loading…
x
Reference in New Issue
Block a user