AttachmentCriteriaQuery class and infrastructure (#2022)

* Attachments metadata support
This commit is contained in:
Maksymilian Pawlak
2017-11-14 10:22:02 +00:00
committed by GitHub
parent 2a961b8e2c
commit 1a02c9a74f
13 changed files with 593 additions and 138 deletions

View File

@ -13,13 +13,13 @@ import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.node.services.vault.*
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.rpcContext
@ -177,6 +177,25 @@ internal class CordaRPCOpsImpl(
}
}
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader:String, filename:String): SecureHash {
// TODO: this operation should not require an explicit transaction
return database.transaction {
services.attachments.importAttachment(jar, uploader, filename)
}
}
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
try {
return database.transaction {
services.attachments.queryAttachments(query, sorting)
}
} catch (e: Exception) {
// log and rethrow exception so we keep a copy server side
log.error(e.message)
throw e.cause ?: e
}
}
override fun currentNodeTime(): Instant = Instant.now(services.clock)
override fun waitUntilNetworkReady(): CordaFuture<Void?> = services.networkMapCache.nodeReady
@ -264,5 +283,6 @@ internal class CordaRPCOpsImpl(
is StateMachineManager.Change.Removed -> StateMachineUpdate.Removed(change.logic.runId, change.result)
}
}
private val log = loggerFor<CordaRPCOpsImpl>()
}
}

View File

@ -10,11 +10,10 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.NodeState
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.node.services.vault.*
import net.corda.node.services.messaging.RpcContext
import net.corda.node.services.messaging.requireEitherPermission
import rx.Observable
@ -24,6 +23,14 @@ import java.security.PublicKey
// TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140
class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val context: () -> RpcContext, private val permissionsAllowing: (methodName: String, args: List<Any?>) -> Set<String>) : CordaRPCOps {
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String): SecureHash = guard("uploadAttachmentWithMetadata") {
implementation.uploadAttachmentWithMetadata(jar, uploader, filename)
}
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> = guard("queryAttachments") {
implementation.queryAttachments(query, sorting)
}
override fun stateMachinesSnapshot() = guard("stateMachinesSnapshot") {
implementation.stateMachinesSnapshot()
}

View File

@ -7,19 +7,26 @@ import com.google.common.hash.Hashing
import com.google.common.hash.HashingInputStream
import com.google.common.io.CountingInputStream
import net.corda.core.CordaRuntimeException
import net.corda.core.contracts.*
import net.corda.core.internal.AbstractAttachment
import net.corda.core.contracts.Attachment
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.vault.*
import net.corda.core.serialization.*
import net.corda.core.utilities.loggerFor
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
import net.corda.node.utilities.DatabaseTransactionManager
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.node.utilities.currentDBSession
import java.io.*
import java.lang.Exception
import java.nio.file.Paths
import java.time.Instant
import java.util.jar.JarInputStream
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
import javax.persistence.Column
/**
* Stores attachments using Hibernate to database.
@ -37,7 +44,16 @@ class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, Single
@Column(name = "content")
@Lob
var content: ByteArray
var content: ByteArray,
@Column(name = "insertion_date", nullable = false, updatable = false)
var insertionDate: Instant = Instant.now(),
@Column(name = "uploader", updatable = false)
var uploader: String? = null,
@Column(name = "filename", updatable = false)
var filename: String? = null
) : Serializable
companion object {
@ -147,8 +163,16 @@ class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, Single
return null
}
override fun importAttachment(jar: InputStream): AttachmentId {
return import(jar, null, null)
}
override fun importAttachment(jar: InputStream, uploader: String, filename: String): AttachmentId {
return import(jar, uploader, filename)
}
// TODO: PLT-147: The attachment should be randomised to prevent brute force guessing and thus privacy leaks.
override fun importAttachment(jar: InputStream): SecureHash {
private fun import(jar: InputStream, uploader: String?, filename: String?): AttachmentId {
require(jar !is JarInputStream)
// Read the file into RAM, hashing it to find the ID as we go. The attachment must fit into memory.
@ -169,7 +193,7 @@ class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, Single
criteriaQuery.where(criteriaBuilder.equal(attachments.get<String>(DBAttachment::attId.name), id.toString()))
val count = session.createQuery(criteriaQuery).singleResult
if (count == 0L) {
val attachment = NodeAttachmentService.DBAttachment(attId = id.toString(), content = bytes)
val attachment = NodeAttachmentService.DBAttachment(attId = id.toString(), content = bytes, uploader = uploader, filename = filename)
session.save(attachment)
attachmentCount.inc()
@ -179,6 +203,30 @@ class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, Single
return id
}
override fun queryAttachments(criteria: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
log.info("Attachment query criteria: $criteria, sorting: $sorting")
val session = DatabaseTransactionManager.current().session
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(DBAttachment::class.java)
val root = criteriaQuery.from(DBAttachment::class.java)
val criteriaParser = HibernateAttachmentQueryCriteriaParser(criteriaBuilder, criteriaQuery, root)
// parse criteria and build where predicates
criteriaParser.parse(criteria, sorting)
// prepare query for execution
val query = session.createQuery(criteriaQuery)
// execution
val results = query.resultList
return results.map { AttachmentId.parse(it.attId) }
}
private fun checkIsAValidJAR(stream: InputStream) {
// Just iterate over the entries with verification enabled: should be good enough to catch mistakes.
// Note that JarInputStream won't throw any kind of error at all if the file stream is in fact not

View File

@ -14,19 +14,161 @@ import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.toHexString
import net.corda.core.utilities.trace
import net.corda.node.services.persistence.NodeAttachmentService
import org.hibernate.query.criteria.internal.expression.LiteralExpression
import org.hibernate.query.criteria.internal.predicate.ComparisonPredicate
import org.hibernate.query.criteria.internal.predicate.InPredicate
import java.time.Instant
import java.util.*
import javax.persistence.Tuple
import javax.persistence.criteria.*
abstract class AbstractQueryCriteriaParser<Q : GenericQueryCriteria<Q,P>, in P: BaseQueryCriteriaParser<Q, P, S>, in S: BaseSort> : BaseQueryCriteriaParser<Q, P, S> {
abstract val criteriaBuilder: CriteriaBuilder
override fun parseOr(left: Q, right: Q): Collection<Predicate> {
val predicateSet = mutableSetOf<Predicate>()
val leftPredicates = parse(left)
val rightPredicates = parse(right)
val orPredicate = criteriaBuilder.or(*leftPredicates.toTypedArray(), *rightPredicates.toTypedArray())
predicateSet.add(orPredicate)
return predicateSet
}
override fun parseAnd(left: Q, right: Q): Collection<Predicate> {
val predicateSet = mutableSetOf<Predicate>()
val leftPredicates = parse(left)
val rightPredicates = parse(right)
val andPredicate = criteriaBuilder.and(*leftPredicates.toTypedArray(), *rightPredicates.toTypedArray())
predicateSet.add(andPredicate)
return predicateSet
}
protected fun columnPredicateToPredicate(column: Path<out Any?>, columnPredicate: ColumnPredicate<*>): Predicate {
return when (columnPredicate) {
is ColumnPredicate.EqualityComparison -> {
val literal = columnPredicate.rightLiteral
when (columnPredicate.operator) {
EqualityComparisonOperator.EQUAL -> criteriaBuilder.equal(column, literal)
EqualityComparisonOperator.NOT_EQUAL -> criteriaBuilder.notEqual(column, literal)
}
}
is ColumnPredicate.BinaryComparison -> {
val literal: Comparable<Any?>? = uncheckedCast(columnPredicate.rightLiteral)
@Suppress("UNCHECKED_CAST")
column as Path<Comparable<Any?>?>
when (columnPredicate.operator) {
BinaryComparisonOperator.GREATER_THAN -> criteriaBuilder.greaterThan(column, literal)
BinaryComparisonOperator.GREATER_THAN_OR_EQUAL -> criteriaBuilder.greaterThanOrEqualTo(column, literal)
BinaryComparisonOperator.LESS_THAN -> criteriaBuilder.lessThan(column, literal)
BinaryComparisonOperator.LESS_THAN_OR_EQUAL -> criteriaBuilder.lessThanOrEqualTo(column, literal)
}
}
is ColumnPredicate.Likeness -> {
@Suppress("UNCHECKED_CAST")
column as Path<String?>
when (columnPredicate.operator) {
LikenessOperator.LIKE -> criteriaBuilder.like(column, columnPredicate.rightLiteral)
LikenessOperator.NOT_LIKE -> criteriaBuilder.notLike(column, columnPredicate.rightLiteral)
}
}
is ColumnPredicate.CollectionExpression -> {
when (columnPredicate.operator) {
CollectionOperator.IN -> column.`in`(columnPredicate.rightLiteral)
CollectionOperator.NOT_IN -> criteriaBuilder.not(column.`in`(columnPredicate.rightLiteral))
}
}
is ColumnPredicate.Between -> {
@Suppress("UNCHECKED_CAST")
column as Path<Comparable<Any?>?>
val fromLiteral: Comparable<Any?>? = uncheckedCast(columnPredicate.rightFromLiteral)
val toLiteral: Comparable<Any?>? = uncheckedCast(columnPredicate.rightToLiteral)
criteriaBuilder.between(column, fromLiteral, toLiteral)
}
is ColumnPredicate.NullExpression -> {
when (columnPredicate.operator) {
NullOperator.IS_NULL -> criteriaBuilder.isNull(column)
NullOperator.NOT_NULL -> criteriaBuilder.isNotNull(column)
}
}
else -> throw VaultQueryException("Not expecting $columnPredicate")
}
}
}
class HibernateAttachmentQueryCriteriaParser(override val criteriaBuilder: CriteriaBuilder,
private val criteriaQuery: CriteriaQuery<NodeAttachmentService.DBAttachment>, val root: Root<NodeAttachmentService.DBAttachment>) :
AbstractQueryCriteriaParser<AttachmentQueryCriteria, AttachmentsQueryCriteriaParser, AttachmentSort>(), AttachmentsQueryCriteriaParser {
private companion object {
val log = loggerFor<HibernateAttachmentQueryCriteriaParser>()
}
init {
criteriaQuery.select(root)
}
override fun parse(criteria: AttachmentQueryCriteria, sorting: AttachmentSort?): Collection<Predicate> {
val predicateSet = criteria.visit(this)
sorting?.let {
if (sorting.columns.isNotEmpty())
parse(sorting)
}
criteriaQuery.where(*predicateSet.toTypedArray())
return predicateSet
}
private fun parse(sorting: AttachmentSort) {
log.trace { "Parsing sorting specification: $sorting" }
val orderCriteria = mutableListOf<Order>()
sorting.columns.map { (sortAttribute, direction) ->
when (direction) {
Sort.Direction.ASC -> orderCriteria.add(criteriaBuilder.asc(root.get<String>(sortAttribute.columnName)))
Sort.Direction.DESC -> orderCriteria.add(criteriaBuilder.desc(root.get<String>(sortAttribute.columnName)))
}
}
if (orderCriteria.isNotEmpty()) {
criteriaQuery.orderBy(orderCriteria)
}
}
override fun parseCriteria(criteria: AttachmentQueryCriteria.AttachmentsQueryCriteria): Collection<Predicate> {
log.trace { "Parsing AttachmentsQueryCriteria: $criteria" }
val predicateSet = mutableSetOf<Predicate>()
criteria.filenameCondition?.let {
predicateSet.add(columnPredicateToPredicate(root.get<String>("filename"), it))
}
criteria.uploaderCondition?.let {
predicateSet.add(columnPredicateToPredicate(root.get<String>("uploader"), it))
}
criteria.uploadDateCondition?.let {
predicateSet.add(columnPredicateToPredicate(root.get<Instant>("upload_date"), it))
}
return predicateSet
}
}
class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractState>,
val contractStateTypeMappings: Map<String, Set<String>>,
val criteriaBuilder: CriteriaBuilder,
override val criteriaBuilder: CriteriaBuilder,
val criteriaQuery: CriteriaQuery<Tuple>,
val vaultStates: Root<VaultSchemaV1.VaultStates>) : IQueryCriteriaParser {
val vaultStates: Root<VaultSchemaV1.VaultStates>) : AbstractQueryCriteriaParser<QueryCriteria, IQueryCriteriaParser, Sort>(), IQueryCriteriaParser {
private companion object {
val log = loggerFor<HibernateQueryCriteriaParser>()
}
@ -102,56 +244,6 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
}
}
private fun columnPredicateToPredicate(column: Path<out Any?>, columnPredicate: ColumnPredicate<*>): Predicate {
return when (columnPredicate) {
is ColumnPredicate.EqualityComparison -> {
val literal = columnPredicate.rightLiteral
when (columnPredicate.operator) {
EqualityComparisonOperator.EQUAL -> criteriaBuilder.equal(column, literal)
EqualityComparisonOperator.NOT_EQUAL -> criteriaBuilder.notEqual(column, literal)
}
}
is ColumnPredicate.BinaryComparison -> {
val literal: Comparable<Any?>? = uncheckedCast(columnPredicate.rightLiteral)
@Suppress("UNCHECKED_CAST")
column as Path<Comparable<Any?>?>
when (columnPredicate.operator) {
BinaryComparisonOperator.GREATER_THAN -> criteriaBuilder.greaterThan(column, literal)
BinaryComparisonOperator.GREATER_THAN_OR_EQUAL -> criteriaBuilder.greaterThanOrEqualTo(column, literal)
BinaryComparisonOperator.LESS_THAN -> criteriaBuilder.lessThan(column, literal)
BinaryComparisonOperator.LESS_THAN_OR_EQUAL -> criteriaBuilder.lessThanOrEqualTo(column, literal)
}
}
is ColumnPredicate.Likeness -> {
@Suppress("UNCHECKED_CAST")
column as Path<String?>
when (columnPredicate.operator) {
LikenessOperator.LIKE -> criteriaBuilder.like(column, columnPredicate.rightLiteral)
LikenessOperator.NOT_LIKE -> criteriaBuilder.notLike(column, columnPredicate.rightLiteral)
}
}
is ColumnPredicate.CollectionExpression -> {
when (columnPredicate.operator) {
CollectionOperator.IN -> column.`in`(columnPredicate.rightLiteral)
CollectionOperator.NOT_IN -> criteriaBuilder.not(column.`in`(columnPredicate.rightLiteral))
}
}
is ColumnPredicate.Between -> {
@Suppress("UNCHECKED_CAST")
column as Path<Comparable<Any?>?>
val fromLiteral: Comparable<Any?>? = uncheckedCast(columnPredicate.rightFromLiteral)
val toLiteral: Comparable<Any?>? = uncheckedCast(columnPredicate.rightToLiteral)
criteriaBuilder.between(column, fromLiteral, toLiteral)
}
is ColumnPredicate.NullExpression -> {
when (columnPredicate.operator) {
NullOperator.IS_NULL -> criteriaBuilder.isNull(column)
NullOperator.NOT_NULL -> criteriaBuilder.isNotNull(column)
}
}
else -> throw VaultQueryException("Not expecting $columnPredicate")
}
}
private fun <O> parseExpression(entityRoot: Root<O>, expression: CriteriaExpression<O, Boolean>, predicateSet: MutableSet<Predicate>) {
if (expression is CriteriaExpression.AggregateFunctionExpression<O, *>) {
@ -326,32 +418,6 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
return predicateSet
}
override fun parseOr(left: QueryCriteria, right: QueryCriteria): Collection<Predicate> {
log.trace { "Parsing OR QueryCriteria composition: $left OR $right" }
val predicateSet = mutableSetOf<Predicate>()
val leftPredicates = parse(left)
val rightPredicates = parse(right)
val orPredicate = criteriaBuilder.or(*leftPredicates.toTypedArray(), *rightPredicates.toTypedArray())
predicateSet.add(orPredicate)
return predicateSet
}
override fun parseAnd(left: QueryCriteria, right: QueryCriteria): Collection<Predicate> {
log.trace { "Parsing AND QueryCriteria composition: $left AND $right" }
val predicateSet = mutableSetOf<Predicate>()
val leftPredicates = parse(left)
val rightPredicates = parse(right)
val andPredicate = criteriaBuilder.and(*leftPredicates.toTypedArray(), *rightPredicates.toTypedArray())
predicateSet.add(andPredicate)
return predicateSet
}
override fun parse(criteria: QueryCriteria, sorting: Sort?): Collection<Predicate> {
val predicateSet = criteria.visit(this)

View File

@ -9,6 +9,10 @@ import net.corda.core.internal.read
import net.corda.core.internal.readAll
import net.corda.core.internal.write
import net.corda.core.internal.writeLines
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.node.services.vault.Builder
import net.corda.core.node.services.vault.Sort
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
@ -51,8 +55,7 @@ class NodeAttachmentStorageTest {
@Test
fun `insert and retrieve`() {
val testJar = makeTestJar()
val expectedHash = testJar.readAll().sha256()
val (testJar,expectedHash) = makeTestJar()
database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
@ -77,10 +80,87 @@ class NodeAttachmentStorageTest {
}
}
@Test
fun `metadata can be used to search`() {
val (jarA,hashA) = makeTestJar()
val (jarB,hashB) = makeTestJar(listOf(Pair("file","content")))
val (jarC,hashC) = makeTestJar(listOf(Pair("magic_file","magic_content_puff")))
database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
jarA.read { storage.importAttachment(it) }
jarB.read { storage.importAttachment(it, "uploaderB", "fileB.zip") }
jarC.read { storage.importAttachment(it, "uploaderC", "fileC.zip") }
assertEquals(
listOf(hashB),
storage.queryAttachments( AttachmentQueryCriteria.AttachmentsQueryCriteria( Builder.equal("uploaderB")))
)
assertEquals (
listOf(hashB, hashC),
storage.queryAttachments( AttachmentQueryCriteria.AttachmentsQueryCriteria( Builder.like ("%uploader%")))
)
}
}
@Test
fun `sorting and compound conditions work`() {
val (jarA,hashA) = makeTestJar(listOf(Pair("a","a")))
val (jarB,hashB) = makeTestJar(listOf(Pair("b","b")))
val (jarC,hashC) = makeTestJar(listOf(Pair("c","c")))
fun uploaderCondition(s:String) = AttachmentQueryCriteria.AttachmentsQueryCriteria(uploaderCondition = Builder.equal(s))
fun filenamerCondition(s:String) = AttachmentQueryCriteria.AttachmentsQueryCriteria(filenameCondition = Builder.equal(s))
fun filenameSort(direction: Sort.Direction) = AttachmentSort(listOf(AttachmentSort.AttachmentSortColumn(AttachmentSort.AttachmentSortAttribute.FILENAME, direction)))
database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
jarA.read { storage.importAttachment(it, "complexA", "archiveA.zip") }
jarB.read { storage.importAttachment(it, "complexB", "archiveB.zip") }
jarC.read { storage.importAttachment(it, "complexC", "archiveC.zip") }
// DOCSTART AttachmentQueryExample1
assertEquals(
emptyList(),
storage.queryAttachments(
AttachmentQueryCriteria.AttachmentsQueryCriteria(uploaderCondition = Builder.equal("complexA"))
.and(AttachmentQueryCriteria.AttachmentsQueryCriteria(uploaderCondition = Builder.equal("complexB"))))
)
assertEquals(
listOf(hashA, hashB),
storage.queryAttachments(
AttachmentQueryCriteria.AttachmentsQueryCriteria(uploaderCondition = Builder.equal("complexA"))
.or(AttachmentQueryCriteria.AttachmentsQueryCriteria(uploaderCondition = Builder.equal("complexB"))))
)
val complexCondition =
(uploaderCondition("complexB").and(filenamerCondition("archiveB.zip"))).or(filenamerCondition("archiveC.zip"))
// DOCEND AttachmentQueryExample1
assertEquals (
listOf(hashB, hashC),
storage.queryAttachments(complexCondition, sorting = filenameSort(Sort.Direction.ASC))
)
assertEquals (
listOf(hashC, hashB),
storage.queryAttachments(complexCondition, sorting = filenameSort(Sort.Direction.DESC))
)
}
}
@Ignore("We need to be able to restart nodes - make importing attachments idempotent?")
@Test
fun `duplicates not allowed`() {
val testJar = makeTestJar()
val (testJar,_) = makeTestJar()
database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
testJar.read {
@ -96,7 +176,7 @@ class NodeAttachmentStorageTest {
@Test
fun `corrupt entry throws exception`() {
val testJar = makeTestJar()
val (testJar,_) = makeTestJar()
val id = database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
val id = testJar.read { storage.importAttachment(it) }
@ -139,7 +219,7 @@ class NodeAttachmentStorageTest {
}
private var counter = 0
private fun makeTestJar(): Path {
private fun makeTestJar(extraEntries: List<Pair<String,String>> = emptyList()): Pair<Path, SecureHash> {
counter++
val file = fs.getPath("$counter.jar")
file.write {
@ -149,8 +229,12 @@ class NodeAttachmentStorageTest {
jar.closeEntry()
jar.putNextEntry(JarEntry("test2.txt"))
jar.write("Some more useful content".toByteArray())
extraEntries.forEach {
jar.putNextEntry(JarEntry(it.first))
jar.write(it.second.toByteArray())
}
jar.closeEntry()
}
return file
return Pair(file, file.readAll().sha256())
}
}