CORDA-2083 Deserialize component groups lazily (#4122)

CORDA-2083 Deserialize component groups lazily
This commit is contained in:
Tudor Malene 2018-11-01 16:54:31 +00:00 committed by GitHub
parent b0771d6f2b
commit 4e0a956e20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 125 additions and 36 deletions

View File

@ -57,3 +57,16 @@ fun Class<out FlowLogic<*>>.isIdempotentFlow(): Boolean {
internal fun SignedTransaction.pushToLoggingContext() {
MDC.put("tx_id", id.toString())
}
/**
* List implementation that applies the expensive [transform] function only when the element is accessed and caches calculated values.
* Size is very cheap as it doesn't call [transform].
*/
class LazyMappedList<T, U>(val originalList: List<T>, val transform: (T, Int) -> U) : AbstractList<U>() {
private val partialResolvedList = MutableList<U?>(originalList.size) { null }
override val size = originalList.size
override fun get(index: Int) = partialResolvedList[index]
?: transform(originalList[index], index).also { computed -> partialResolvedList[index] = computed }
}

View File

@ -6,9 +6,11 @@ import net.corda.core.contracts.*
import net.corda.core.contracts.ComponentGroupEnum.*
import net.corda.core.crypto.*
import net.corda.core.identity.Party
import net.corda.core.internal.LazyMappedList
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.*
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.lazyMapped
import java.security.PublicKey
import java.util.function.Predicate
import kotlin.reflect.KClass
@ -69,21 +71,29 @@ abstract class TraversableTransaction(open val componentGroups: List<ComponentGr
private fun <T : Any> deserialiseComponentGroup(clazz: KClass<T>,
groupEnum: ComponentGroupEnum,
attachmentsContext: Boolean = false): List<T> {
val group = componentGroups.firstOrNull { it.groupIndex == groupEnum.ordinal }
if (group == null || group.components.isEmpty()) {
return emptyList()
}
// If the componentGroup is a [LazyMappedList] it means that the original deserialized version is already available.
val components = group.components
if (components is LazyMappedList<*, OpaqueBytes>) {
return components.originalList as List<T>
}
val factory = SerializationFactory.defaultFactory
val context = factory.defaultContext.let { if (attachmentsContext) it.withAttachmentsClassLoader(attachments) else it }
val group = componentGroups.firstOrNull { it.groupIndex == groupEnum.ordinal }
return if (group != null && group.components.isNotEmpty()) {
group.components.mapIndexed { internalIndex, component ->
try {
factory.deserialize(component, clazz.java, context)
} catch (e: MissingAttachmentsException) {
throw e
} catch (e: Exception) {
throw Exception("Malformed transaction, $groupEnum at index $internalIndex cannot be deserialised", e)
}
return components.lazyMapped { component, internalIndex ->
try {
factory.deserialize(component, clazz.java , context)
} catch (e: MissingAttachmentsException) {
throw e
} catch (e: Exception) {
throw Exception("Malformed transaction, $groupEnum at index $internalIndex cannot be deserialised", e)
}
} else {
emptyList()
}
}
@ -105,14 +115,14 @@ abstract class TraversableTransaction(open val componentGroups: List<ComponentGr
val leafIndices = componentHashes.map { group.partialMerkleTree.leafIndex(it) }
if (leafIndices.isNotEmpty())
check(leafIndices.max()!! < signersList.size) { "Invalid Transaction. A command with no corresponding signer detected" }
commandDataList.mapIndexed { index, commandData -> Command(commandData, signersList[leafIndices[index]]) }
commandDataList.lazyMapped { commandData, index -> Command(commandData, signersList[leafIndices[index]]) }
} else {
// It is a WireTransaction
// or a FilteredTransaction with no Commands (in which case group is null).
check(commandDataList.size == signersList.size) {
"Invalid Transaction. Sizes of CommandData (${commandDataList.size}) and Signers (${signersList.size}) do not match"
}
commandDataList.mapIndexed { index, commandData -> Command(commandData, signersList[index]) }
commandDataList.lazyMapped { commandData, index -> Command(commandData, signersList[index]) }
}
}
}
@ -335,7 +345,7 @@ class FilteredTransaction internal constructor(
private fun expectedNumOfCommands(publicKey: PublicKey, commandSigners: ComponentGroup?): Int {
checkAllComponentsVisible(SIGNERS_GROUP)
if (commandSigners == null) return 0
fun signersKeys (internalIndex: Int, opaqueBytes: OpaqueBytes): List<PublicKey> {
fun signersKeys(internalIndex: Int, opaqueBytes: OpaqueBytes): List<PublicKey> {
try {
return SerializedBytes<List<PublicKey>>(opaqueBytes.bytes).deserialize()
} catch (e: Exception) {
@ -344,7 +354,7 @@ class FilteredTransaction internal constructor(
}
return commandSigners.components
.mapIndexed { internalIndex, opaqueBytes -> signersKeys(internalIndex, opaqueBytes) }
.mapIndexed { internalIndex, opaqueBytes -> signersKeys(internalIndex, opaqueBytes) }
.filter { signers -> publicKey in signers }.size
}

View File

@ -14,6 +14,7 @@ import net.corda.core.node.services.AttachmentId
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.serialize
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.lazyMapped
import java.security.PublicKey
import java.security.SignatureException
import java.util.function.Predicate
@ -50,7 +51,8 @@ class WireTransaction(componentGroups: List<ComponentGroup>, val privacySalt: Pr
@Deprecated("Required only in some unit-tests and for backwards compatibility purposes.", ReplaceWith("WireTransaction(val componentGroups: List<ComponentGroup>, override val privacySalt: PrivacySalt)"), DeprecationLevel.WARNING)
@DeleteForDJVM
@JvmOverloads constructor(
@JvmOverloads
constructor(
inputs: List<StateRef>,
attachments: List<SecureHash>,
outputs: List<TransactionState<ContractState>>,
@ -127,17 +129,19 @@ class WireTransaction(componentGroups: List<ComponentGroup>, val privacySalt: Pr
networkParameters: NetworkParameters?
): LedgerTransaction {
// Look up public keys to authenticated identities.
val authenticatedArgs = commands.map {
val parties = it.signers.mapNotNull { pk -> resolveIdentity(pk) }
CommandWithParties(it.signers, parties, it.value)
val authenticatedArgs = commands.lazyMapped { cmd, _ ->
val parties = cmd.signers.mapNotNull { pk -> resolveIdentity(pk) }
CommandWithParties(cmd.signers, parties, cmd.value)
}
val resolvedInputs = inputs.map { ref ->
val resolvedInputs = inputs.lazyMapped { ref, _ ->
resolveStateRef(ref)?.let { StateAndRef(it, ref) } ?: throw TransactionResolutionException(ref.txhash)
}
val resolvedReferences = references.map { ref ->
val resolvedReferences = references.lazyMapped { ref, _ ->
resolveStateRef(ref)?.let { StateAndRef(it, ref) } ?: throw TransactionResolutionException(ref.txhash)
}
val attachments = attachments.map { resolveAttachment(it) ?: throw AttachmentResolutionException(it) }
val attachments = attachments.lazyMapped { att, _ ->
resolveAttachment(att) ?: throw AttachmentResolutionException(att)
}
val ltx = LedgerTransaction(resolvedInputs, outputs, authenticatedArgs, attachments, id, notary, timeWindow, privacySalt, networkParameters, resolvedReferences)
checkTransactionSize(ltx, networkParameters?.maxTransactionSize ?: 10485760)
return ltx
@ -151,13 +155,22 @@ class WireTransaction(componentGroups: List<ComponentGroup>, val privacySalt: Pr
remainingTransactionSize -= size
}
// This calculates a value that is slightly lower than the actual re-serialized version. But it is stable and does not depend on the classloader.
fun componentGroupSize(componentGroup: ComponentGroupEnum): Int {
return this.componentGroups.firstOrNull { it.groupIndex == componentGroup.ordinal }?.let { cg -> cg.components.sumBy { it.size } + 4 } ?: 0
}
// Check attachments size first as they are most likely to go over the limit. With ContractAttachment instances
// it's likely that the same underlying Attachment CorDapp will occur more than once so we dedup on the attachment id.
ltx.attachments.distinctBy { it.id }.forEach { minus(it.size) }
// TODO - these can be optimized by creating a LazyStateAndRef class, that just stores (a pointer) the serialized output componentGroup from the previous transaction.
minus(ltx.references.serialize().size)
minus(ltx.inputs.serialize().size)
minus(ltx.commands.serialize().size)
minus(ltx.outputs.serialize().size)
// For Commands and outputs we can use the component groups as they are already serialized.
minus(componentGroupSize(COMMANDS_GROUP))
minus(componentGroupSize(OUTPUTS_GROUP))
}
/**
@ -253,18 +266,19 @@ class WireTransaction(componentGroups: List<ComponentGroup>, val privacySalt: Pr
notary: Party?,
timeWindow: TimeWindow?,
references: List<StateRef> = emptyList()): List<ComponentGroup> {
val serialize = { value: Any, _: Int -> value.serialize() }
val componentGroupMap: MutableList<ComponentGroup> = mutableListOf()
if (inputs.isNotEmpty()) componentGroupMap.add(ComponentGroup(INPUTS_GROUP.ordinal, inputs.map { it.serialize() }))
if (references.isNotEmpty()) componentGroupMap.add(ComponentGroup(REFERENCES_GROUP.ordinal, references.map { it.serialize() }))
if (outputs.isNotEmpty()) componentGroupMap.add(ComponentGroup(OUTPUTS_GROUP.ordinal, outputs.map { it.serialize() }))
if (inputs.isNotEmpty()) componentGroupMap.add(ComponentGroup(INPUTS_GROUP.ordinal, inputs.lazyMapped(serialize)))
if (references.isNotEmpty()) componentGroupMap.add(ComponentGroup(REFERENCES_GROUP.ordinal, references.lazyMapped(serialize)))
if (outputs.isNotEmpty()) componentGroupMap.add(ComponentGroup(OUTPUTS_GROUP.ordinal, outputs.lazyMapped(serialize)))
// Adding commandData only to the commands group. Signers are added in their own group.
if (commands.isNotEmpty()) componentGroupMap.add(ComponentGroup(COMMANDS_GROUP.ordinal, commands.map { it.value.serialize() }))
if (attachments.isNotEmpty()) componentGroupMap.add(ComponentGroup(ATTACHMENTS_GROUP.ordinal, attachments.map { it.serialize() }))
if (notary != null) componentGroupMap.add(ComponentGroup(NOTARY_GROUP.ordinal, listOf(notary.serialize())))
if (timeWindow != null) componentGroupMap.add(ComponentGroup(TIMEWINDOW_GROUP.ordinal, listOf(timeWindow.serialize())))
if (commands.isNotEmpty()) componentGroupMap.add(ComponentGroup(COMMANDS_GROUP.ordinal, commands.map { it.value }.lazyMapped(serialize)))
if (attachments.isNotEmpty()) componentGroupMap.add(ComponentGroup(ATTACHMENTS_GROUP.ordinal, attachments.lazyMapped(serialize)))
if (notary != null) componentGroupMap.add(ComponentGroup(NOTARY_GROUP.ordinal, listOf(notary).lazyMapped(serialize)))
if (timeWindow != null) componentGroupMap.add(ComponentGroup(TIMEWINDOW_GROUP.ordinal, listOf(timeWindow).lazyMapped(serialize)))
// Adding signers to their own group. This is required for command visibility purposes: a party receiving
// a FilteredTransaction can now verify it sees all the commands it should sign.
if (commands.isNotEmpty()) componentGroupMap.add(ComponentGroup(SIGNERS_GROUP.ordinal, commands.map { it.signers.serialize() }))
if (commands.isNotEmpty()) componentGroupMap.add(ComponentGroup(SIGNERS_GROUP.ordinal, commands.map { it.signers }.lazyMapped(serialize)))
return componentGroupMap
}
}

View File

@ -1,8 +1,10 @@
@file:KeepForDJVM
package net.corda.core.utilities
import net.corda.core.DeleteForDJVM
import net.corda.core.KeepForDJVM
import net.corda.core.internal.LazyMappedList
import net.corda.core.internal.concurrent.get
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.CordaSerializable
@ -134,3 +136,9 @@ fun <V> Future<V>.getOrThrow(timeout: Duration? = null): V = try {
} catch (e: ExecutionException) {
throw e.cause!!
}
/**
* Returns a [List] implementation that applies the expensive [transform] function only when an element is accessed and then caches the calculated values.
* Size is very cheap as it doesn't call [transform].
*/
fun <T, U> List<T>.lazyMapped(transform: (T, Int) -> U): List<U> = LazyMappedList(this, transform)

View File

@ -138,7 +138,7 @@ class CompatibleTransactionTests {
timeWindowGroup,
signersGroup
)
assertFails { WireTransaction(componentGroupsB, privacySalt) }
assertFails { WireTransaction(componentGroupsB, privacySalt).attachments.toList() }
// Malformed tx - duplicated component group detected.
val componentGroupsDuplicatedCommands = listOf(

View File

@ -0,0 +1,35 @@
package net.corda.core.utilities
import org.junit.Test
import kotlin.test.assertEquals
class LazyMappedListTest {
@Test
fun `LazyMappedList works`() {
val originalList = (1 until 10).toList()
var callCounter = 0
val lazyList = originalList.lazyMapped { value, _ ->
callCounter++
value * value
}
// No transform called when created.
assertEquals(0, callCounter)
// No transform called when calling 'size'.
assertEquals(9, lazyList.size)
assertEquals(0, callCounter)
// Called once when getting an element.
assertEquals(16, lazyList[3])
assertEquals(1, callCounter)
// Not called again when getting the same element.
assertEquals(16, lazyList[3])
assertEquals(1, callCounter)
}
}

View File

@ -18,6 +18,7 @@ import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.AbstractAttachment
import net.corda.core.internal.LazyMappedList
import net.corda.core.internal.readFully
import net.corda.core.serialization.MissingAttachmentsException
import net.corda.core.serialization.SerializationWhitelist
@ -82,6 +83,7 @@ object DefaultKryoCustomizer {
// TODO: re-organise registrations into logical groups before v1.0
register(Arrays.asList("").javaClass, ArraysAsListSerializer())
register(LazyMappedList::class.java, LazyMappedListSerializer)
register(SignedTransaction::class.java, SignedTransactionSerializer)
register(WireTransaction::class.java, WireTransactionSerializer)
register(SerializedBytes::class.java, SerializedBytesSerializer)

View File

@ -12,13 +12,12 @@ import net.corda.core.contracts.PrivacySalt
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.internal.LazyMappedList
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializeAsTokenContext
import net.corda.core.serialization.SerializedBytes
import net.corda.core.transactions.*
import net.corda.core.utilities.OpaqueBytes
import net.corda.serialization.internal.checkUseCase
import net.corda.serialization.internal.serializationContextKey
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@ -487,3 +486,11 @@ class ThrowableSerializer<T>(kryo: Kryo, type: Class<T>) : Serializer<Throwable>
private fun Throwable.setSuppressedToSentinel() = suppressedField.set(this, sentinelValue)
}
/** For serializing the utility [LazyMappedList]. It will serialize the fully resolved object.*/
@ThreadSafe
@SuppressWarnings("ALL")
object LazyMappedListSerializer : Serializer<List<*>>() {
override fun write(kryo: Kryo, output: Output, obj: List<*>) = kryo.writeClassAndObject(output, obj.toList())
override fun read(kryo: Kryo, input: Input, type: Class<List<*>>) = kryo.readClassAndObject(input) as List<*>
}