mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
Simple Attachment Storage implementation using Requery/H2 database
This commit is contained in:
parent
0600bfa061
commit
1e78d6a3a7
@ -204,7 +204,7 @@ inline fun elapsedTime(block: () -> Unit): Duration {
|
||||
val start = System.nanoTime()
|
||||
block()
|
||||
val end = System.nanoTime()
|
||||
return Duration.ofNanos(end-start)
|
||||
return Duration.ofNanos(end - start)
|
||||
}
|
||||
|
||||
// TODO: Add inline back when a new Kotlin version is released and check if the java.lang.VerifyError
|
||||
@ -280,13 +280,16 @@ class TransientProperty<out T>(private val initializer: () -> T) {
|
||||
/**
|
||||
* Given a path to a zip file, extracts it to the given directory.
|
||||
*/
|
||||
fun extractZipFile(zipFile: Path, toDirectory: Path) {
|
||||
val normalisedDirectory = toDirectory.normalize().createDirectories()
|
||||
fun extractZipFile(zipFile: Path, toDirectory: Path) = extractZipFile(Files.newInputStream(zipFile), toDirectory)
|
||||
|
||||
zipFile.read {
|
||||
val zip = ZipInputStream(BufferedInputStream(it))
|
||||
/**
|
||||
* Given a zip file input stream, extracts it to the given directory.
|
||||
*/
|
||||
fun extractZipFile(inputStream: InputStream, toDirectory: Path) {
|
||||
val normalisedDirectory = toDirectory.normalize().createDirectories()
|
||||
ZipInputStream(BufferedInputStream(inputStream)).use {
|
||||
while (true) {
|
||||
val e = zip.nextEntry ?: break
|
||||
val e = it.nextEntry ?: break
|
||||
val outPath = (normalisedDirectory / e.name).normalize()
|
||||
|
||||
// Security checks: we should reject a zip that contains tricksy paths that try to escape toDirectory.
|
||||
@ -297,9 +300,9 @@ fun extractZipFile(zipFile: Path, toDirectory: Path) {
|
||||
continue
|
||||
}
|
||||
outPath.write { out ->
|
||||
ByteStreams.copy(zip, out)
|
||||
ByteStreams.copy(it, out)
|
||||
}
|
||||
zip.closeEntry()
|
||||
it.closeEntry()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -394,13 +397,16 @@ private class ObservableToFuture<T>(observable: Observable<T>) : AbstractFuture<
|
||||
override fun onNext(value: T) {
|
||||
set(value)
|
||||
}
|
||||
|
||||
override fun onError(e: Throwable) {
|
||||
setException(e)
|
||||
}
|
||||
|
||||
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
|
||||
subscription.unsubscribe()
|
||||
return super.cancel(mayInterruptIfRunning)
|
||||
}
|
||||
|
||||
override fun onCompleted() {}
|
||||
}
|
||||
|
||||
|
@ -3,11 +3,20 @@ package net.corda.core.node.services
|
||||
import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import java.io.InputStream
|
||||
import java.nio.file.Path
|
||||
|
||||
/**
|
||||
* An attachment store records potentially large binary objects, identified by their hash.
|
||||
*/
|
||||
interface AttachmentStorage {
|
||||
/**
|
||||
* If true, newly inserted attachments will be unzipped to a subdirectory of the [storePath]. This is intended for
|
||||
* human browsing convenience: the attachment itself will still be the file (that is, edits to the extracted directory
|
||||
* will not have any effect).
|
||||
*/
|
||||
var automaticallyExtractAttachments : Boolean
|
||||
var storePath : Path
|
||||
|
||||
/**
|
||||
* Returns a handle to a locally stored attachment, or null if it's not known. The handle can be used to open
|
||||
* a stream for the data, which will be a zip/jar file.
|
||||
|
@ -0,0 +1,28 @@
|
||||
package net.corda.core.schemas.requery.converters
|
||||
|
||||
import io.requery.Converter
|
||||
import java.sql.Blob
|
||||
import javax.sql.rowset.serial.SerialBlob
|
||||
|
||||
/**
|
||||
* Converts from a [ByteArray] to a [Blob].
|
||||
*/
|
||||
class BlobConverter : Converter<ByteArray, Blob> {
|
||||
|
||||
override fun getMappedType(): Class<ByteArray> = ByteArray::class.java
|
||||
|
||||
override fun getPersistedType(): Class<Blob> = Blob::class.java
|
||||
|
||||
/**
|
||||
* creates BLOB(INT.MAX) = 2 GB
|
||||
*/
|
||||
override fun getPersistedSize(): Int? = null
|
||||
|
||||
override fun convertToPersisted(value: ByteArray?): Blob? {
|
||||
return value?.let { SerialBlob(value) }
|
||||
}
|
||||
|
||||
override fun convertToMapped(type: Class<out ByteArray>?, value: Blob?): ByteArray? {
|
||||
return value?.getBytes(1, value.length().toInt())
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package net.corda.core.schemas.requery.converters
|
||||
|
||||
import io.requery.Converter
|
||||
import net.corda.core.crypto.SecureHash
|
||||
|
||||
/**
|
||||
* Convert from a [SecureHash] to a [String]
|
||||
*/
|
||||
class SecureHashConverter : Converter<SecureHash, String> {
|
||||
|
||||
override fun getMappedType(): Class<SecureHash> = SecureHash::class.java
|
||||
|
||||
override fun getPersistedType(): Class<String> = String::class.java
|
||||
|
||||
/**
|
||||
* SecureHash consists of 32 bytes which need VARCHAR(64) in hex
|
||||
* TODO: think about other hash widths
|
||||
*/
|
||||
override fun getPersistedSize(): Int? = 64
|
||||
|
||||
override fun convertToPersisted(value: SecureHash?): String? {
|
||||
return value?.toString()
|
||||
}
|
||||
|
||||
override fun convertToMapped(type: Class<out SecureHash>, value: String?): SecureHash? {
|
||||
return value?.let { SecureHash.parse(value) }
|
||||
}
|
||||
}
|
@ -134,13 +134,20 @@ class ResolveTransactionsFlowTest {
|
||||
|
||||
@Test
|
||||
fun attachment() {
|
||||
val id = a.services.storageService.attachments.importAttachment("Some test file".toByteArray().opaque().open())
|
||||
// TODO: this operation should not require an explicit transaction
|
||||
val id = databaseTransaction(a.database) {
|
||||
a.services.storageService.attachments.importAttachment("Some test file".toByteArray().opaque().open())
|
||||
}
|
||||
val stx2 = makeTransactions(withAttachment = id).second
|
||||
val p = ResolveTransactionsFlow(stx2, a.info.legalIdentity)
|
||||
val future = b.services.startFlow(p).resultFuture
|
||||
net.runNetwork()
|
||||
future.getOrThrow()
|
||||
assertNotNull(b.services.storageService.attachments.openAttachment(id))
|
||||
|
||||
// TODO: this operation should not require an explicit transaction
|
||||
databaseTransaction(b.database) {
|
||||
assertNotNull(b.services.storageService.attachments.openAttachment(id))
|
||||
}
|
||||
}
|
||||
|
||||
// DOCSTART 2
|
||||
|
@ -0,0 +1,18 @@
|
||||
package net.corda.node.services.persistence.schemas
|
||||
|
||||
import io.requery.*
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.schemas.requery.converters.BlobConverter
|
||||
|
||||
@Table(name = "attachments")
|
||||
@Entity(model = "persistence")
|
||||
interface Attachment : Persistable {
|
||||
|
||||
@get:Key
|
||||
@get:Column(name = "att_id", index = true)
|
||||
var attId: SecureHash
|
||||
|
||||
@get:Column(name = "content")
|
||||
@get:Convert(BlobConverter::class)
|
||||
var content: ByteArray
|
||||
}
|
@ -151,6 +151,9 @@ dependencies {
|
||||
compile 'commons-codec:commons-codec:1.10'
|
||||
compile 'com.github.bft-smart:library:master-v1.1-beta-g6215ec8-87'
|
||||
|
||||
// Requery: object mapper for Kotlin
|
||||
compile "io.requery:requery-kotlin:$requery_version"
|
||||
|
||||
// Integration test helpers
|
||||
integrationTestCompile "junit:junit:$junit_version"
|
||||
}
|
||||
|
@ -502,7 +502,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
)
|
||||
}
|
||||
|
||||
protected open fun constructStorageService(attachments: NodeAttachmentService,
|
||||
protected open fun constructStorageService(attachments: AttachmentStorage,
|
||||
transactionStorage: TransactionStorage,
|
||||
stateMachineRecordedTransactionMappingStorage: StateMachineRecordedTransactionMappingStorage) =
|
||||
StorageServiceImpl(attachments, transactionStorage, stateMachineRecordedTransactionMappingStorage)
|
||||
@ -547,13 +547,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
|
||||
protected open fun generateKeyPair() = cryptoGenerateKeyPair()
|
||||
|
||||
protected fun makeAttachmentStorage(dir: Path): NodeAttachmentService {
|
||||
protected fun makeAttachmentStorage(dir: Path): AttachmentStorage {
|
||||
val attachmentsDir = dir / "attachments"
|
||||
try {
|
||||
attachmentsDir.createDirectory()
|
||||
} catch (e: FileAlreadyExistsException) {
|
||||
}
|
||||
return NodeAttachmentService(attachmentsDir, services.monitoringService.metrics)
|
||||
return NodeAttachmentService(attachmentsDir, configuration.dataSourceProperties, services.monitoringService.metrics)
|
||||
}
|
||||
|
||||
protected fun createNodeDir() {
|
||||
|
@ -109,9 +109,26 @@ class CordaRPCOpsImpl(
|
||||
)
|
||||
}
|
||||
|
||||
override fun attachmentExists(id: SecureHash) = services.storageService.attachments.openAttachment(id) != null
|
||||
override fun openAttachment(id: SecureHash) = services.storageService.attachments.openAttachment(id)!!.open()
|
||||
override fun uploadAttachment(jar: InputStream) = services.storageService.attachments.importAttachment(jar)
|
||||
override fun attachmentExists(id: SecureHash): Boolean {
|
||||
// TODO: this operation should not require an explicit transaction
|
||||
return databaseTransaction(database){
|
||||
services.storageService.attachments.openAttachment(id) != null
|
||||
}
|
||||
}
|
||||
|
||||
override fun openAttachment(id: SecureHash): InputStream {
|
||||
// TODO: this operation should not require an explicit transaction
|
||||
return databaseTransaction(database) {
|
||||
services.storageService.attachments.openAttachment(id)!!.open()
|
||||
}
|
||||
}
|
||||
|
||||
override fun uploadAttachment(jar: InputStream): SecureHash {
|
||||
// TODO: this operation should not require an explicit transaction
|
||||
return databaseTransaction(database){
|
||||
services.storageService.attachments.importAttachment(jar)
|
||||
}
|
||||
}
|
||||
override fun authoriseContractUpgrade(state: StateAndRef<*>, upgradedContractClass: Class<out UpgradedContract<*, *>>) = services.vaultService.authoriseContractUpgrade(state, upgradedContractClass)
|
||||
override fun deauthoriseContractUpgrade(state: StateAndRef<*>) = services.vaultService.deauthoriseContractUpgrade(state)
|
||||
override fun currentNodeTime(): Instant = Instant.now(services.clock)
|
||||
|
@ -9,9 +9,7 @@ import io.requery.sql.*
|
||||
import io.requery.sql.platform.H2
|
||||
import io.requery.util.function.Function
|
||||
import io.requery.util.function.Supplier
|
||||
import net.corda.core.schemas.requery.converters.InstantConverter
|
||||
import net.corda.core.schemas.requery.converters.StateRefConverter
|
||||
import net.corda.core.schemas.requery.converters.VaultStateStatusConverter
|
||||
import net.corda.core.schemas.requery.converters.*
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
@ -69,6 +67,8 @@ class KotlinConfigurationTransactionWrapper(private val model: EntityModel,
|
||||
val vaultStateStatusConverter = VaultStateStatusConverter()
|
||||
customMapping.addConverter(vaultStateStatusConverter, vaultStateStatusConverter.mappedType)
|
||||
customMapping.addConverter(StateRefConverter(), StateRefConverter::getMappedType.javaClass)
|
||||
customMapping.addConverter(SecureHashConverter(), SecureHashConverter::getMappedType.javaClass)
|
||||
|
||||
return customMapping
|
||||
}
|
||||
|
||||
|
@ -5,13 +5,20 @@ import com.google.common.annotations.VisibleForTesting
|
||||
import com.google.common.hash.Hashing
|
||||
import com.google.common.hash.HashingInputStream
|
||||
import com.google.common.io.CountingInputStream
|
||||
import net.corda.core.*
|
||||
import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.createDirectory
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.div
|
||||
import net.corda.core.extractZipFile
|
||||
import net.corda.core.isDirectory
|
||||
import net.corda.core.node.services.AttachmentStorage
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.AcceptsFileUpload
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.persistence.schemas.AttachmentEntity
|
||||
import net.corda.node.services.persistence.schemas.Models
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.FilterInputStream
|
||||
import java.io.InputStream
|
||||
import java.nio.file.*
|
||||
@ -20,74 +27,69 @@ import java.util.jar.JarInputStream
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* Stores attachments in the specified local directory, which must exist. Doesn't allow new attachments to be uploaded.
|
||||
* Stores attachments in H2 database.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeAttachmentService(val storePath: Path, metrics: MetricRegistry) : AttachmentStorage, AcceptsFileUpload {
|
||||
class NodeAttachmentService(override var storePath: Path, dataSourceProperties: Properties, metrics: MetricRegistry) : AttachmentStorage, AcceptsFileUpload {
|
||||
private val log = loggerFor<NodeAttachmentService>()
|
||||
|
||||
val configuration = RequeryConfiguration(dataSourceProperties)
|
||||
val session = configuration.sessionForModel(Models.PERSISTENCE)
|
||||
|
||||
@VisibleForTesting
|
||||
var checkAttachmentsOnLoad = true
|
||||
|
||||
private val attachmentCount = metrics.counter("Attachments")
|
||||
|
||||
init {
|
||||
attachmentCount.inc(countAttachments())
|
||||
}
|
||||
|
||||
// Just count all non-directories in the attachment store, and assume the admin hasn't dumped any junk there.
|
||||
private fun countAttachments() = storePath.list { it.filter { it.isRegularFile() }.count() }
|
||||
|
||||
/**
|
||||
* If true, newly inserted attachments will be unzipped to a subdirectory of the [storePath]. This is intended for
|
||||
* human browsing convenience: the attachment itself will still be the file (that is, edits to the extracted directory
|
||||
* will not have any effect).
|
||||
*/
|
||||
@Volatile var automaticallyExtractAttachments = false
|
||||
@Volatile override var automaticallyExtractAttachments = false
|
||||
|
||||
init {
|
||||
require(storePath.isDirectory()) { "$storePath must be a directory" }
|
||||
|
||||
session.withTransaction {
|
||||
attachmentCount.inc(session.count(AttachmentEntity::class).get().value().toLong())
|
||||
}
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
class OnDiskHashMismatch(val file: Path, val actual: SecureHash) : Exception() {
|
||||
override fun toString() = "File $file hashed to $actual: corruption in attachment store?"
|
||||
class HashMismatchException(val expected: SecureHash, val actual: SecureHash) : Exception() {
|
||||
override fun toString() = "File $expected hashed to $actual: corruption in attachment store?"
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a stream and hashes data as it is read: if the entire stream is consumed, then at the end the hash of
|
||||
* the read data is compared to the [expected] hash and [OnDiskHashMismatch] is thrown by [close] if they didn't
|
||||
* the read data is compared to the [expected] hash and [HashMismatchException] is thrown by [close] if they didn't
|
||||
* match. The goal of this is to detect cases where attachments in the store have been tampered with or corrupted
|
||||
* and no longer match their file name. It won't always work: if we read a zip for our own uses and skip around
|
||||
* inside it, we haven't read the whole file, so we can't check the hash. But when copying it over the network
|
||||
* this will provide an additional safety check against user error.
|
||||
*/
|
||||
private class HashCheckingStream(val expected: SecureHash.SHA256,
|
||||
val filePath: Path,
|
||||
val expectedSize: Int,
|
||||
input: InputStream,
|
||||
private val counter: CountingInputStream = CountingInputStream(input),
|
||||
private val stream: HashingInputStream = HashingInputStream(Hashing.sha256(), counter)) : FilterInputStream(stream) {
|
||||
|
||||
private val expectedSize = filePath.size
|
||||
|
||||
override fun close() {
|
||||
super.close()
|
||||
if (counter.count != expectedSize) return
|
||||
|
||||
if (counter.count != expectedSize.toLong()) return
|
||||
|
||||
val actual = SecureHash.SHA256(stream.hash().asBytes())
|
||||
if (actual != expected)
|
||||
throw OnDiskHashMismatch(filePath, actual)
|
||||
throw HashMismatchException(expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
// Deliberately not an inner class to avoid holding a reference to the attachments service.
|
||||
private class AttachmentImpl(override val id: SecureHash,
|
||||
private val path: Path,
|
||||
private val attachment: ByteArray,
|
||||
private val checkOnLoad: Boolean) : Attachment {
|
||||
override fun open(): InputStream {
|
||||
var stream = Files.newInputStream(path)
|
||||
|
||||
var stream = ByteArrayInputStream(attachment)
|
||||
|
||||
// This is just an optional safety check. If it slows things down too much it can be disabled.
|
||||
if (id is SecureHash.SHA256 && checkOnLoad)
|
||||
stream = HashCheckingStream(id, path, stream)
|
||||
return HashCheckingStream(id, attachment.size, stream)
|
||||
|
||||
return stream
|
||||
}
|
||||
|
||||
@ -96,38 +98,54 @@ class NodeAttachmentService(val storePath: Path, metrics: MetricRegistry) : Atta
|
||||
}
|
||||
|
||||
override fun openAttachment(id: SecureHash): Attachment? {
|
||||
val path = storePath / id.toString()
|
||||
if (!path.exists()) return null
|
||||
return AttachmentImpl(id, path, checkAttachmentsOnLoad)
|
||||
val attachment = session.withTransaction {
|
||||
try {
|
||||
session.select(AttachmentEntity::class)
|
||||
.where(AttachmentEntity.ATT_ID.eq(id))
|
||||
.get()
|
||||
.single()
|
||||
} catch (e: NoSuchElementException) {
|
||||
null
|
||||
}
|
||||
} ?: return null
|
||||
|
||||
return AttachmentImpl(id, attachment.content, checkAttachmentsOnLoad)
|
||||
}
|
||||
|
||||
// TODO: PLT-147: The attachment should be randomised to prevent brute force guessing and thus privacy leaks.
|
||||
override fun importAttachment(jar: InputStream): SecureHash {
|
||||
require(jar !is JarInputStream)
|
||||
val hs = HashingInputStream(Hashing.sha256(), jar)
|
||||
val tmp = storePath / "tmp.${UUID.randomUUID()}"
|
||||
hs.copyTo(tmp)
|
||||
checkIsAValidJAR(tmp)
|
||||
val bytes = hs.readBytes()
|
||||
checkIsAValidJAR(hs)
|
||||
val id = SecureHash.SHA256(hs.hash().asBytes())
|
||||
val finalPath = storePath / id.toString()
|
||||
try {
|
||||
// Move into place atomically or fail if that isn't possible. We don't want a half moved attachment to
|
||||
// be exposed to parallel threads. This gives us thread safety.
|
||||
if (!finalPath.exists()) {
|
||||
log.info("Stored new attachment $id")
|
||||
attachmentCount.inc()
|
||||
} else {
|
||||
log.info("Replacing attachment $id - only bother doing this if you're trying to repair file corruption")
|
||||
}
|
||||
tmp.moveTo(finalPath, StandardCopyOption.ATOMIC_MOVE)
|
||||
} finally {
|
||||
tmp.deleteIfExists()
|
||||
|
||||
val count = session.withTransaction {
|
||||
session.count(AttachmentEntity::class)
|
||||
.where(AttachmentEntity.ATT_ID.eq(id))
|
||||
.get().value()
|
||||
}
|
||||
|
||||
if (count > 0) {
|
||||
throw FileAlreadyExistsException(id.toString())
|
||||
}
|
||||
|
||||
session.withTransaction {
|
||||
val attachment = AttachmentEntity()
|
||||
attachment.attId = id
|
||||
attachment.content = bytes
|
||||
session.insert(attachment)
|
||||
}
|
||||
|
||||
attachmentCount.inc()
|
||||
|
||||
log.info("Stored new attachment $id")
|
||||
|
||||
if (automaticallyExtractAttachments) {
|
||||
val extractTo = storePath / "$id.jar"
|
||||
try {
|
||||
extractTo.createDirectory()
|
||||
extractZipFile(finalPath, extractTo)
|
||||
extractZipFile(hs, extractTo)
|
||||
} catch(e: FileAlreadyExistsException) {
|
||||
log.trace("Did not extract attachment jar to directory because it already exists")
|
||||
} catch(e: Exception) {
|
||||
@ -135,20 +153,19 @@ class NodeAttachmentService(val storePath: Path, metrics: MetricRegistry) : Atta
|
||||
// TODO: Delete the extractTo directory here.
|
||||
}
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
private fun checkIsAValidJAR(path: Path) {
|
||||
private fun checkIsAValidJAR(stream: InputStream) {
|
||||
// Just iterate over the entries with verification enabled: should be good enough to catch mistakes.
|
||||
path.read {
|
||||
val jar = JarInputStream(it)
|
||||
while (true) {
|
||||
val cursor = jar.nextJarEntry ?: break
|
||||
val entryPath = Paths.get(cursor.name)
|
||||
// Security check to stop zips trying to escape their rightful place.
|
||||
if (entryPath.isAbsolute || entryPath.normalize() != entryPath || '\\' in cursor.name)
|
||||
throw IllegalArgumentException("Path is either absolute or non-normalised: $entryPath")
|
||||
}
|
||||
val jar = JarInputStream(stream)
|
||||
while (true) {
|
||||
val cursor = jar.nextJarEntry ?: break
|
||||
val entryPath = Paths.get(cursor.name)
|
||||
// Security check to stop zips trying to escape their rightful place.
|
||||
if (entryPath.isAbsolute || entryPath.normalize() != entryPath || '\\' in cursor.name)
|
||||
throw IllegalArgumentException("Path is either absolute or non-normalised: $entryPath")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,22 +6,25 @@ import net.corda.core.crypto.sha256
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.write
|
||||
import net.corda.flows.FetchAttachmentsFlow
|
||||
import net.corda.flows.FetchDataFlow
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.persistence.schemas.AttachmentEntity
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.i2p.crypto.eddsa.KeyPairGenerator
|
||||
import net.corda.node.utilities.databaseTransaction
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.Closeable
|
||||
import java.math.BigInteger
|
||||
import java.security.KeyPair
|
||||
import java.security.KeyPairGeneratorSpi
|
||||
import java.util.jar.JarOutputStream
|
||||
import java.util.zip.ZipEntry
|
||||
import kotlin.test.assertEquals
|
||||
@ -29,10 +32,17 @@ import kotlin.test.assertFailsWith
|
||||
|
||||
class AttachmentTests {
|
||||
lateinit var network: MockNetwork
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var configuration: RequeryConfiguration
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
network = MockNetwork()
|
||||
|
||||
val dataSourceProperties = makeTestDataSourceProperties()
|
||||
|
||||
configuration = RequeryConfiguration(dataSourceProperties)
|
||||
}
|
||||
|
||||
fun fakeAttachment(): ByteArray {
|
||||
@ -50,7 +60,9 @@ class AttachmentTests {
|
||||
val (n0, n1) = network.createTwoNodes()
|
||||
|
||||
// Insert an attachment into node zero's store directly.
|
||||
val id = n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
|
||||
val id = databaseTransaction(n0.database) {
|
||||
n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
|
||||
}
|
||||
|
||||
// Get node one to run a flow to fetch it and insert it.
|
||||
network.runNetwork()
|
||||
@ -59,7 +71,10 @@ class AttachmentTests {
|
||||
assertEquals(0, f1.resultFuture.getOrThrow().fromDisk.size)
|
||||
|
||||
// Verify it was inserted into node one's store.
|
||||
val attachment = n1.storage.attachments.openAttachment(id)!!
|
||||
val attachment = databaseTransaction(n1.database) {
|
||||
n1.storage.attachments.openAttachment(id)!!
|
||||
}
|
||||
|
||||
assertEquals(id, attachment.open().readBytes().sha256())
|
||||
|
||||
// Shut down node zero and ensure node one can still resolve the attachment.
|
||||
@ -101,11 +116,23 @@ class AttachmentTests {
|
||||
}, true, null, null, ServiceInfo(NetworkMapService.type), ServiceInfo(SimpleNotaryService.type))
|
||||
val n1 = network.createNode(n0.info.address)
|
||||
|
||||
val attachment = fakeAttachment()
|
||||
// Insert an attachment into node zero's store directly.
|
||||
val id = n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
|
||||
val id = databaseTransaction(n0.database) {
|
||||
n0.storage.attachments.importAttachment(ByteArrayInputStream(attachment))
|
||||
}
|
||||
|
||||
// Corrupt its store.
|
||||
network.filesystem.getPath("/nodes/0/attachments/$id").write { it.write(byteArrayOf(99, 99, 99, 99)) }
|
||||
val corruptBytes = "arggghhhh".toByteArray()
|
||||
System.arraycopy(corruptBytes, 0, attachment, 0, corruptBytes.size)
|
||||
|
||||
val corruptAttachment = AttachmentEntity()
|
||||
corruptAttachment.attId = id
|
||||
corruptAttachment.content = attachment
|
||||
databaseTransaction(n0.database) {
|
||||
(n0.storage.attachments as NodeAttachmentService).session.update(corruptAttachment)
|
||||
}
|
||||
|
||||
|
||||
// Get n1 to fetch the attachment. Should receive corrupted bytes.
|
||||
network.runNetwork()
|
||||
|
@ -24,7 +24,6 @@ import net.corda.flows.TwoPartyTradeFlow.Seller
|
||||
import net.corda.node.internal.AbstractNode
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.persistence.StorageServiceImpl
|
||||
import net.corda.node.services.persistence.checkpoints
|
||||
import net.corda.node.utilities.databaseTransaction
|
||||
@ -90,8 +89,11 @@ class TwoPartyTradeFlowTests {
|
||||
databaseTransaction(bobNode.database) {
|
||||
bobNode.services.fillWithSomeTestCash(2000.DOLLARS, outputNotary = notaryNode.info.notaryIdentity)
|
||||
}
|
||||
val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
|
||||
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null, notaryNode.info.notaryIdentity).second
|
||||
|
||||
val alicesFakePaper = databaseTransaction(aliceNode.database) {
|
||||
fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
|
||||
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null, notaryNode.info.notaryIdentity).second
|
||||
}
|
||||
|
||||
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey, notaryKey)
|
||||
|
||||
@ -135,8 +137,10 @@ class TwoPartyTradeFlowTests {
|
||||
databaseTransaction(bobNode.database) {
|
||||
bobNode.services.fillWithSomeTestCash(2000.DOLLARS, outputNotary = notaryNode.info.notaryIdentity)
|
||||
}
|
||||
val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
|
||||
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null, notaryNode.info.notaryIdentity).second
|
||||
val alicesFakePaper = databaseTransaction(aliceNode.database) {
|
||||
fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
|
||||
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null, notaryNode.info.notaryIdentity).second
|
||||
}
|
||||
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey, notaryKey)
|
||||
val aliceFuture = runBuyerAndSeller(notaryNode, aliceNode, bobNode, "alice's paper".outputStateAndRef()).sellerResult
|
||||
|
||||
@ -221,7 +225,7 @@ class TwoPartyTradeFlowTests {
|
||||
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
|
||||
// That constructs the storage service object in a customised way ...
|
||||
override fun constructStorageService(
|
||||
attachments: NodeAttachmentService,
|
||||
attachments: AttachmentStorage,
|
||||
transactionStorage: TransactionStorage,
|
||||
stateMachineRecordedTransactionMappingStorage: StateMachineRecordedTransactionMappingStorage
|
||||
): StorageServiceImpl {
|
||||
@ -248,15 +252,19 @@ class TwoPartyTradeFlowTests {
|
||||
it.write("Our commercial paper is top notch stuff".toByteArray())
|
||||
it.closeEntry()
|
||||
}
|
||||
val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray()))
|
||||
val attachmentID = databaseTransaction(aliceNode.database) {
|
||||
attachment(ByteArrayInputStream(stream.toByteArray()))
|
||||
}
|
||||
|
||||
val extraKey = bobNode.keyManagement.freshKey()
|
||||
val bobsFakeCash = fillUpForBuyer(false, extraKey.public.composite,
|
||||
DUMMY_CASH_ISSUER.party,
|
||||
notaryNode.info.notaryIdentity).second
|
||||
val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode, notaryNode, bobNode.services.legalIdentityKey, extraKey)
|
||||
val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
|
||||
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID, notaryNode.info.notaryIdentity).second
|
||||
val alicesFakePaper = databaseTransaction(aliceNode.database) {
|
||||
fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
|
||||
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID, notaryNode.info.notaryIdentity).second
|
||||
}
|
||||
val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey)
|
||||
|
||||
net.runNetwork() // Clear network map registration messages
|
||||
@ -283,10 +291,12 @@ class TwoPartyTradeFlowTests {
|
||||
}
|
||||
|
||||
// Bob has downloaded the attachment.
|
||||
bobNode.storage.attachments.openAttachment(attachmentID)!!.openAsJAR().use {
|
||||
it.nextJarEntry
|
||||
val contents = it.reader().readText()
|
||||
assertTrue(contents.contains("Our commercial paper is top notch stuff"))
|
||||
databaseTransaction(bobNode.database) {
|
||||
bobNode.storage.attachments.openAttachment(attachmentID)!!.openAsJAR().use {
|
||||
it.nextJarEntry
|
||||
val contents = it.reader().readText()
|
||||
assertTrue(contents.contains("Our commercial paper is top notch stuff"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -327,7 +337,7 @@ class TwoPartyTradeFlowTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `track() works`() {
|
||||
fun `track works`() {
|
||||
|
||||
val notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name)
|
||||
val aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name)
|
||||
@ -343,14 +353,20 @@ class TwoPartyTradeFlowTests {
|
||||
it.write("Our commercial paper is top notch stuff".toByteArray())
|
||||
it.closeEntry()
|
||||
}
|
||||
val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray()))
|
||||
val attachmentID = databaseTransaction(aliceNode.database) {
|
||||
attachment(ByteArrayInputStream(stream.toByteArray()))
|
||||
}
|
||||
|
||||
val bobsFakeCash = fillUpForBuyer(false, bobNode.keyManagement.freshKey().public.composite,
|
||||
DUMMY_CASH_ISSUER.party,
|
||||
notaryNode.info.notaryIdentity).second
|
||||
insertFakeTransactions(bobsFakeCash, bobNode, notaryNode)
|
||||
val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
|
||||
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID, notaryNode.info.notaryIdentity).second
|
||||
|
||||
val alicesFakePaper = databaseTransaction(aliceNode.database) {
|
||||
fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
|
||||
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID, notaryNode.info.notaryIdentity).second
|
||||
}
|
||||
|
||||
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey)
|
||||
|
||||
net.runNetwork() // Clear network map registration messages
|
||||
@ -444,8 +460,10 @@ class TwoPartyTradeFlowTests {
|
||||
|
||||
val bobsBadCash = fillUpForBuyer(bobError, bobKey.public.composite, DUMMY_CASH_ISSUER.party,
|
||||
notaryNode.info.notaryIdentity).second
|
||||
val alicesFakePaper = fillUpForSeller(aliceError, aliceNode.info.legalIdentity.owningKey,
|
||||
1200.DOLLARS `issued by` issuer, null, notaryNode.info.notaryIdentity).second
|
||||
val alicesFakePaper = databaseTransaction(aliceNode.database) {
|
||||
fillUpForSeller(aliceError, aliceNode.info.legalIdentity.owningKey,
|
||||
1200.DOLLARS `issued by` issuer, null, notaryNode.info.notaryIdentity).second
|
||||
}
|
||||
|
||||
insertFakeTransactions(bobsBadCash, bobNode, notaryNode, bobKey)
|
||||
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey)
|
||||
@ -468,6 +486,7 @@ class TwoPartyTradeFlowTests {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private fun insertFakeTransactions(
|
||||
wtxToSign: List<WireTransaction>,
|
||||
node: AbstractNode,
|
||||
|
@ -5,18 +5,27 @@ import com.google.common.jimfs.Configuration
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.div
|
||||
import net.corda.core.read
|
||||
import net.corda.core.readAll
|
||||
import net.corda.core.utilities.LogHelper
|
||||
import net.corda.core.write
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.persistence.schemas.AttachmentEntity
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.nio.charset.Charset
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.FileSystem
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption.WRITE
|
||||
import java.util.*
|
||||
import java.util.jar.JarEntry
|
||||
import java.util.jar.JarOutputStream
|
||||
import kotlin.test.assertEquals
|
||||
@ -26,18 +35,35 @@ import kotlin.test.assertNull
|
||||
class NodeAttachmentStorageTest {
|
||||
// Use an in memory file system for testing attachment storage.
|
||||
lateinit var fs: FileSystem
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var dataSourceProperties: Properties
|
||||
lateinit var configuration: RequeryConfiguration
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
|
||||
dataSourceProperties = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProperties)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
|
||||
configuration = RequeryConfiguration(dataSourceProperties)
|
||||
fs = Jimfs.newFileSystem(Configuration.unix())
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
TransactionManager.current().close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `insert and retrieve`() {
|
||||
val testJar = makeTestJar()
|
||||
val expectedHash = testJar.readAll().sha256()
|
||||
|
||||
val storage = NodeAttachmentService(fs.getPath("/"), MetricRegistry())
|
||||
val storage = NodeAttachmentService(fs.getPath("/"), dataSourceProperties, MetricRegistry())
|
||||
val id = testJar.read { storage.importAttachment(it) }
|
||||
assertEquals(expectedHash, id)
|
||||
|
||||
@ -49,31 +75,49 @@ class NodeAttachmentStorageTest {
|
||||
val e2 = stream.nextJarEntry!!
|
||||
assertEquals("test2.txt", e2.name)
|
||||
assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "Some more useful content")
|
||||
|
||||
stream.close()
|
||||
|
||||
storage.openAttachment(id)!!.openAsJAR().use {
|
||||
it.nextJarEntry
|
||||
it.readBytes()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `duplicates not allowed`() {
|
||||
|
||||
val testJar = makeTestJar()
|
||||
val storage = NodeAttachmentService(fs.getPath("/"), MetricRegistry())
|
||||
testJar.read { storage.importAttachment(it) }
|
||||
val storage = NodeAttachmentService(fs.getPath("/"), dataSourceProperties, MetricRegistry())
|
||||
testJar.read {
|
||||
storage.importAttachment(it)
|
||||
}
|
||||
assertFailsWith<FileAlreadyExistsException> {
|
||||
testJar.read { storage.importAttachment(it) }
|
||||
testJar.read {
|
||||
storage.importAttachment(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `corrupt entry throws exception`() {
|
||||
val testJar = makeTestJar()
|
||||
val storage = NodeAttachmentService(fs.getPath("/"), MetricRegistry())
|
||||
val storage = NodeAttachmentService(fs.getPath("/"), dataSourceProperties, MetricRegistry())
|
||||
val id = testJar.read { storage.importAttachment(it) }
|
||||
|
||||
// Corrupt the file in the store.
|
||||
fs.getPath("/", id.toString()).write(options = WRITE) { it.write("arggghhhh".toByteArray()) }
|
||||
val bytes = testJar.readAll();
|
||||
val corruptBytes = "arggghhhh".toByteArray()
|
||||
System.arraycopy(corruptBytes, 0, bytes, 0, corruptBytes.size)
|
||||
val corruptAttachment = AttachmentEntity()
|
||||
corruptAttachment.attId = id
|
||||
corruptAttachment.content = bytes
|
||||
storage.session.update(corruptAttachment)
|
||||
|
||||
val e = assertFailsWith<NodeAttachmentService.OnDiskHashMismatch> {
|
||||
val e = assertFailsWith<NodeAttachmentService.HashMismatchException> {
|
||||
storage.openAttachment(id)!!.open().use { it.readBytes() }
|
||||
}
|
||||
assertEquals(e.file, storage.storePath / id.toString())
|
||||
assertEquals(e.expected, id)
|
||||
|
||||
// But if we skip around and read a single entry, no exception is thrown.
|
||||
storage.openAttachment(id)!!.openAsJAR().use {
|
||||
|
@ -14,7 +14,6 @@ import net.corda.core.utilities.Emoji
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.flows.TwoPartyTradeFlow
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import java.nio.file.Path
|
||||
import java.util.*
|
||||
|
||||
@ -28,7 +27,7 @@ class BuyerFlow(val otherParty: Party,
|
||||
init {
|
||||
// Buyer will fetch the attachment from the seller automatically when it resolves the transaction.
|
||||
// For demo purposes just extract attachment jars when saved to disk, so the user can explore them.
|
||||
val attachmentsPath = (services.storageService.attachments as NodeAttachmentService).let {
|
||||
val attachmentsPath = (services.storageService.attachments).let {
|
||||
it.automaticallyExtractAttachments = true
|
||||
it.storePath
|
||||
}
|
||||
|
@ -23,6 +23,8 @@ import java.io.ByteArrayInputStream
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.File
|
||||
import java.io.InputStream
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.security.KeyPair
|
||||
import java.security.PrivateKey
|
||||
import java.security.PublicKey
|
||||
@ -98,6 +100,8 @@ class MockKeyManagementService(vararg initialKeys: KeyPair) : SingletonSerialize
|
||||
|
||||
class MockAttachmentStorage : AttachmentStorage {
|
||||
val files = HashMap<SecureHash, ByteArray>()
|
||||
override var automaticallyExtractAttachments = false
|
||||
override var storePath = Paths.get("")
|
||||
|
||||
override fun openAttachment(id: SecureHash): Attachment? {
|
||||
val f = files[id] ?: return null
|
||||
|
Loading…
Reference in New Issue
Block a user