Introduce bulk get methods on IdentityService.kt, AttachmentStorage.kt and AttachmentStorage.kt and then use in FetchDataFlow.kt and DataVendingFlow

This commit is contained in:
rick.parker 2023-10-21 21:13:49 +01:00
parent c626d3a435
commit a3d54f1370
6 changed files with 75 additions and 20 deletions

View File

@ -224,13 +224,13 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
var batchFetchCountExceeded = false
var numSent = 0
payload = when (dataRequest.dataType) {
FetchDataFlow.DataType.TRANSACTION -> dataRequest.hashes.map { txId ->
logger.trace { "Sending: TRANSACTION (dataRequest.hashes.size=${dataRequest.hashes.size})" }
FetchDataFlow.DataType.TRANSACTION -> serviceHub.validatedTransactions.getTransactions(dataRequest.hashes.map { txId ->
if (!authorisedTransactions.isAuthorised(txId)) {
throw FetchDataFlow.IllegalTransactionRequest(txId)
}
val tx = serviceHub.validatedTransactions.getTransaction(txId)
?: throw FetchDataFlow.HashNotFound(txId)
txId
}).map { (txId, stx) ->
val tx = stx ?: throw FetchDataFlow.HashNotFound(txId)
if (idx == otherSessions.size - 1)
authorisedTransactions.removeAuthorised(tx.id)
authorisedTransactions.addAuthorised(getInputTransactions(tx))
@ -240,19 +240,18 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
}
FetchDataFlow.DataType.TRANSACTION_RECOVERY -> throw NotImplementedError("Enterprise only feature")
// Loop on all items returned using dataRequest.hashes.map:
FetchDataFlow.DataType.BATCH_TRANSACTION -> dataRequest.hashes.map { txId ->
FetchDataFlow.DataType.BATCH_TRANSACTION -> serviceHub.validatedTransactions.getTransactions(dataRequest.hashes.map { txId ->
if (!authorisedTransactions.isAuthorised(txId)) {
throw FetchDataFlow.IllegalTransactionRequest(txId)
}
txId
}).map { (txId, stx) ->
logger.trace { "Transaction authorised OK: '$txId'" }
val tx = stx ?: throw FetchDataFlow.HashNotFound(txId)
// Maybe we should not just throw here as it's not recoverable on the client side. Might be better to send a reason code or
// remove the restriction on sending once.
logger.trace { "Transaction authorised OK: '$txId'" }
var serialized: SerializedBytes<SignedTransaction>? = null
if (!batchFetchCountExceeded) {
// Only fetch and serialize if we have not already exceeded the maximum byte count. Once we have, no more fetching
// is required, just reject all additional items.
val tx = serviceHub.validatedTransactions.getTransaction(txId)
?: throw FetchDataFlow.HashNotFound(txId)
logger.trace { "Transaction get OK: '$txId'" }
serialized = tx.serialize()
@ -286,11 +285,11 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
firstItem = false
maybeserialized
} // Batch response loop end
FetchDataFlow.DataType.ATTACHMENT -> dataRequest.hashes.map {
logger.trace { "Sending: Attachments for '$it'" }
serviceHub.attachments.openAttachment(it)?.open()?.readFully()
?: throw FetchDataFlow.HashNotFound(it)
}
FetchDataFlow.DataType.ATTACHMENT -> serviceHub.attachments.openAttachments(dataRequest.hashes)
.map { (id, attachment) ->
logger.trace { "Sending: Attachments for '$id'" }
attachment?.open()?.readFully() ?: throw FetchDataFlow.HashNotFound(id)
}
FetchDataFlow.DataType.PARAMETERS -> dataRequest.hashes.map {
logger.trace { "Sending: Parameters for '$it'" }
(serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(it)

View File

@ -12,6 +12,7 @@ import net.corda.core.flows.MaybeSerializedSignedTransaction
import net.corda.core.internal.FetchDataFlow.DownloadedVsRequestedDataMismatch
import net.corda.core.internal.FetchDataFlow.HashNotFound
import net.corda.core.node.NetworkParameters
import net.corda.core.node.services.SignedTransactionWithStatus
import net.corda.core.node.services.TransactionStatus
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.CordaSerializationTransformEnumDefault
@ -142,8 +143,7 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
private fun loadWhatWeHave(): Pair<List<SecureHash>, Set<SecureHash>> {
val fromDisk = ArrayList<SecureHash>()
val toFetch = LinkedHashSet<SecureHash>()
for (txid in requests) {
val stx = load(txid)
loadAll(requests).forEach { (txid, stx) ->
if (stx == null)
toFetch += txid
else
@ -155,7 +155,7 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
}
private fun loadExpected(ids: List<SecureHash>): List<T> {
val loaded = ids.mapNotNull { load(it) }
val loaded = loadAll(ids).values.filterNotNull()
require(ids.size == loaded.size) {
"Expected to find ${ids.size} items in database but only found ${loaded.size} items"
}
@ -164,6 +164,10 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
protected abstract fun load(txid: SecureHash): T?
protected open fun loadAll(ids: Iterable<SecureHash>): Map<SecureHash, T?> {
return ids.map { it to load(it) }.toMap()
}
protected open fun convert(wire: W): T = uncheckedCast(wire)
@Suppress("ComplexMethod")
@ -228,6 +232,10 @@ class FetchAttachmentsFlow(requests: Set<SecureHash>,
override fun load(txid: SecureHash): Attachment? = serviceHub.attachments.openAttachment(txid)
override fun loadAll(ids: Iterable<SecureHash>): Map<SecureHash, Attachment?> {
return serviceHub.attachments.openAttachments(ids)
}
override fun convert(wire: ByteArray): Attachment = FetchedAttachment({ wire }, uploader)
override fun maybeWriteToDisk(downloaded: List<Attachment>) {
@ -273,6 +281,10 @@ class FetchTransactionsFlow @JvmOverloads constructor(requests: Set<SecureHash>,
FetchDataFlow<SignedTransaction, SignedTransaction>(requests, otherSide, dataType) {
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid)
override fun loadAll(ids: Iterable<SecureHash>): Map<SecureHash, SignedTransaction?> {
return serviceHub.validatedTransactions.getTransactions(ids)
}
}
class FetchBatchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession, private val recoveryMode: Boolean = false) :
@ -281,6 +293,16 @@ class FetchBatchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSessi
override fun load(txid: SecureHash): MaybeSerializedSignedTransaction? {
val tranAndStatus = serviceHub.validatedTransactions.getTransactionWithStatus(txid)
return maybeSerializedSignedTransaction(tranAndStatus, txid)
}
override fun loadAll(ids: Iterable<SecureHash>): Map<SecureHash, MaybeSerializedSignedTransaction?> {
return serviceHub.validatedTransactions.getTransactionsWithStatus(ids).mapValues { (txid, tranAndStatus) ->
maybeSerializedSignedTransaction(tranAndStatus, txid)
}
}
private fun maybeSerializedSignedTransaction(tranAndStatus: SignedTransactionWithStatus?, txid: SecureHash): MaybeSerializedSignedTransaction? {
@Suppress("ComplexCondition")
return if (tranAndStatus == null || tranAndStatus.status == TransactionStatus.UNVERIFIED || (!recoveryMode && tranAndStatus.status == TransactionStatus.IN_FLIGHT)) {
null

View File

@ -25,6 +25,15 @@ interface AttachmentStorage {
*/
fun openAttachment(id: AttachmentId): Attachment?
/**
* Bulk form of [openAttachment]. Every input [AttachmentId] is present in the returned map.
*
* From 4.11 onwards.
*/
fun openAttachments(ids: Iterable<AttachmentId>): Map<AttachmentId, Attachment?> {
return ids.map { it to openAttachment(it) }.toMap()
}
/**
* Inserts the given attachment into the store, does *not* close the input stream. This can be an intensive
* operation due to the need to copy the bytes to disk and hash them along the way.

View File

@ -116,6 +116,15 @@ interface IdentityService {
}
}
/**
* Bulk version of [wellKnownPartyFromAnonymous]. Every input [AbstractParty] is present in the returned map.
*
* From 4.11 onwards.
*/
fun wellKnownPartiesFromAnonumous(parties: Iterable<AbstractParty>): Map<AbstractParty, Party?> {
return parties.map { it to wellKnownPartyFromAnonymous(it) }.toMap()
}
/**
* Resolves a (optionally) confidential identity to the corresponding well known identity [Party].
* Convenience method which unwraps the [Party] from the [PartyAndReference] and then resolves the

View File

@ -19,6 +19,15 @@ interface TransactionStorage {
*/
fun getTransaction(id: SecureHash): SignedTransaction?
/**
* Bulk load as per [getTransaction]. All input [SecureHash] will be present in the returned map.
*
* From 4.11 onwards.
*/
fun getTransactions(ids: Iterable<SecureHash>): Map<SecureHash, SignedTransaction?> {
return ids.map { id -> id to getTransaction(id) }.toMap()
}
/**
* Return the transaction with its status for the given [id], or null if no such transaction exists.
*/

View File

@ -33,8 +33,12 @@ import java.math.BigInteger
import java.security.KeyPairGenerator
import java.security.SecureRandom
import java.security.Security
import java.util.*
import kotlin.test.*
import java.util.Random
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
import kotlin.test.fail
/**
* Run tests for cryptographic algorithms.
@ -936,6 +940,7 @@ class CryptoUtilsTest {
@Test(timeout=300_000)
fun `test default SecureRandom uses platformSecureRandom`() {
Assume.assumeFalse(IS_OPENJ9) // See CORDA-4055
Assume.assumeFalse(IS_MAC)
// Note than in Corda, [CordaSecurityProvider] is registered as the first provider.
// Remove [CordaSecurityProvider] in case it is already registered.
@ -955,5 +960,7 @@ class CryptoUtilsTest {
val secureRandomRegisteredFirstCordaProvider = SecureRandom()
assertEquals(PlatformSecureRandomService.algorithm, secureRandomRegisteredFirstCordaProvider.algorithm)
}
private val IS_OPENJ9 = System.getProperty("java.vm.name").toLowerCase().contains("openj9")
private val IS_MAC = System.getProperty("os.name").toLowerCase().contains("mac")
}