diff --git a/core/src/main/kotlin/net/corda/core/Utils.kt b/core/src/main/kotlin/net/corda/core/Utils.kt index cc35dcb343..91d8a67bf3 100644 --- a/core/src/main/kotlin/net/corda/core/Utils.kt +++ b/core/src/main/kotlin/net/corda/core/Utils.kt @@ -204,7 +204,7 @@ inline fun elapsedTime(block: () -> Unit): Duration { val start = System.nanoTime() block() val end = System.nanoTime() - return Duration.ofNanos(end-start) + return Duration.ofNanos(end - start) } // TODO: Add inline back when a new Kotlin version is released and check if the java.lang.VerifyError @@ -280,13 +280,16 @@ class TransientProperty(private val initializer: () -> T) { /** * Given a path to a zip file, extracts it to the given directory. */ -fun extractZipFile(zipFile: Path, toDirectory: Path) { - val normalisedDirectory = toDirectory.normalize().createDirectories() +fun extractZipFile(zipFile: Path, toDirectory: Path) = extractZipFile(Files.newInputStream(zipFile), toDirectory) - zipFile.read { - val zip = ZipInputStream(BufferedInputStream(it)) +/** + * Given a zip file input stream, extracts it to the given directory. + */ +fun extractZipFile(inputStream: InputStream, toDirectory: Path) { + val normalisedDirectory = toDirectory.normalize().createDirectories() + ZipInputStream(BufferedInputStream(inputStream)).use { while (true) { - val e = zip.nextEntry ?: break + val e = it.nextEntry ?: break val outPath = (normalisedDirectory / e.name).normalize() // Security checks: we should reject a zip that contains tricksy paths that try to escape toDirectory. @@ -297,9 +300,9 @@ fun extractZipFile(zipFile: Path, toDirectory: Path) { continue } outPath.write { out -> - ByteStreams.copy(zip, out) + ByteStreams.copy(it, out) } - zip.closeEntry() + it.closeEntry() } } } @@ -394,13 +397,16 @@ private class ObservableToFuture(observable: Observable) : AbstractFuture< override fun onNext(value: T) { set(value) } + override fun onError(e: Throwable) { setException(e) } + override fun cancel(mayInterruptIfRunning: Boolean): Boolean { subscription.unsubscribe() return super.cancel(mayInterruptIfRunning) } + override fun onCompleted() {} } diff --git a/core/src/main/kotlin/net/corda/core/node/services/AttachmentStorage.kt b/core/src/main/kotlin/net/corda/core/node/services/AttachmentStorage.kt index 1db661fdb8..bc98f28138 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/AttachmentStorage.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/AttachmentStorage.kt @@ -3,11 +3,20 @@ package net.corda.core.node.services import net.corda.core.contracts.Attachment import net.corda.core.crypto.SecureHash import java.io.InputStream +import java.nio.file.Path /** * An attachment store records potentially large binary objects, identified by their hash. */ interface AttachmentStorage { + /** + * If true, newly inserted attachments will be unzipped to a subdirectory of the [storePath]. This is intended for + * human browsing convenience: the attachment itself will still be the file (that is, edits to the extracted directory + * will not have any effect). + */ + var automaticallyExtractAttachments : Boolean + var storePath : Path + /** * Returns a handle to a locally stored attachment, or null if it's not known. The handle can be used to open * a stream for the data, which will be a zip/jar file. diff --git a/core/src/main/kotlin/net/corda/core/schemas/requery/converters/BlobConverter.kt b/core/src/main/kotlin/net/corda/core/schemas/requery/converters/BlobConverter.kt new file mode 100644 index 0000000000..78e5b9e337 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/schemas/requery/converters/BlobConverter.kt @@ -0,0 +1,28 @@ +package net.corda.core.schemas.requery.converters + +import io.requery.Converter +import java.sql.Blob +import javax.sql.rowset.serial.SerialBlob + +/** + * Converts from a [ByteArray] to a [Blob]. + */ +class BlobConverter : Converter { + + override fun getMappedType(): Class = ByteArray::class.java + + override fun getPersistedType(): Class = Blob::class.java + + /** + * creates BLOB(INT.MAX) = 2 GB + */ + override fun getPersistedSize(): Int? = null + + override fun convertToPersisted(value: ByteArray?): Blob? { + return value?.let { SerialBlob(value) } + } + + override fun convertToMapped(type: Class?, value: Blob?): ByteArray? { + return value?.getBytes(1, value.length().toInt()) + } +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/schemas/requery/converters/SecureHashConverter.kt b/core/src/main/kotlin/net/corda/core/schemas/requery/converters/SecureHashConverter.kt new file mode 100644 index 0000000000..1fcea459cc --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/schemas/requery/converters/SecureHashConverter.kt @@ -0,0 +1,28 @@ +package net.corda.core.schemas.requery.converters + +import io.requery.Converter +import net.corda.core.crypto.SecureHash + +/** + * Convert from a [SecureHash] to a [String] + */ +class SecureHashConverter : Converter { + + override fun getMappedType(): Class = SecureHash::class.java + + override fun getPersistedType(): Class = String::class.java + + /** + * SecureHash consists of 32 bytes which need VARCHAR(64) in hex + * TODO: think about other hash widths + */ + override fun getPersistedSize(): Int? = 64 + + override fun convertToPersisted(value: SecureHash?): String? { + return value?.toString() + } + + override fun convertToMapped(type: Class, value: String?): SecureHash? { + return value?.let { SecureHash.parse(value) } + } +} \ No newline at end of file diff --git a/core/src/test/kotlin/net/corda/core/flows/ResolveTransactionsFlowTest.kt b/core/src/test/kotlin/net/corda/core/flows/ResolveTransactionsFlowTest.kt index 51b7679474..0b75d89a38 100644 --- a/core/src/test/kotlin/net/corda/core/flows/ResolveTransactionsFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ResolveTransactionsFlowTest.kt @@ -134,13 +134,20 @@ class ResolveTransactionsFlowTest { @Test fun attachment() { - val id = a.services.storageService.attachments.importAttachment("Some test file".toByteArray().opaque().open()) + // TODO: this operation should not require an explicit transaction + val id = databaseTransaction(a.database) { + a.services.storageService.attachments.importAttachment("Some test file".toByteArray().opaque().open()) + } val stx2 = makeTransactions(withAttachment = id).second val p = ResolveTransactionsFlow(stx2, a.info.legalIdentity) val future = b.services.startFlow(p).resultFuture net.runNetwork() future.getOrThrow() - assertNotNull(b.services.storageService.attachments.openAttachment(id)) + + // TODO: this operation should not require an explicit transaction + databaseTransaction(b.database) { + assertNotNull(b.services.storageService.attachments.openAttachment(id)) + } } // DOCSTART 2 diff --git a/node-schemas/src/main/kotlin/net/corda/node/services/persistence/schemas/AttachmentsSchema.kt b/node-schemas/src/main/kotlin/net/corda/node/services/persistence/schemas/AttachmentsSchema.kt new file mode 100644 index 0000000000..95ac8b89ca --- /dev/null +++ b/node-schemas/src/main/kotlin/net/corda/node/services/persistence/schemas/AttachmentsSchema.kt @@ -0,0 +1,18 @@ +package net.corda.node.services.persistence.schemas + +import io.requery.* +import net.corda.core.crypto.SecureHash +import net.corda.core.schemas.requery.converters.BlobConverter + +@Table(name = "attachments") +@Entity(model = "persistence") +interface Attachment : Persistable { + + @get:Key + @get:Column(name = "att_id", index = true) + var attId: SecureHash + + @get:Column(name = "content") + @get:Convert(BlobConverter::class) + var content: ByteArray +} \ No newline at end of file diff --git a/node/build.gradle b/node/build.gradle index a8c99a4461..46834b09b0 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -151,6 +151,9 @@ dependencies { compile 'commons-codec:commons-codec:1.10' compile 'com.github.bft-smart:library:master-v1.1-beta-g6215ec8-87' + // Requery: object mapper for Kotlin + compile "io.requery:requery-kotlin:$requery_version" + // Integration test helpers integrationTestCompile "junit:junit:$junit_version" } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 7b09aa960a..297c113c11 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -502,7 +502,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, ) } - protected open fun constructStorageService(attachments: NodeAttachmentService, + protected open fun constructStorageService(attachments: AttachmentStorage, transactionStorage: TransactionStorage, stateMachineRecordedTransactionMappingStorage: StateMachineRecordedTransactionMappingStorage) = StorageServiceImpl(attachments, transactionStorage, stateMachineRecordedTransactionMappingStorage) @@ -547,13 +547,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, protected open fun generateKeyPair() = cryptoGenerateKeyPair() - protected fun makeAttachmentStorage(dir: Path): NodeAttachmentService { + protected fun makeAttachmentStorage(dir: Path): AttachmentStorage { val attachmentsDir = dir / "attachments" try { attachmentsDir.createDirectory() } catch (e: FileAlreadyExistsException) { } - return NodeAttachmentService(attachmentsDir, services.monitoringService.metrics) + return NodeAttachmentService(attachmentsDir, configuration.dataSourceProperties, services.monitoringService.metrics) } protected fun createNodeDir() { diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index cbbfb88104..44606f2cc5 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -109,9 +109,26 @@ class CordaRPCOpsImpl( ) } - override fun attachmentExists(id: SecureHash) = services.storageService.attachments.openAttachment(id) != null - override fun openAttachment(id: SecureHash) = services.storageService.attachments.openAttachment(id)!!.open() - override fun uploadAttachment(jar: InputStream) = services.storageService.attachments.importAttachment(jar) + override fun attachmentExists(id: SecureHash): Boolean { + // TODO: this operation should not require an explicit transaction + return databaseTransaction(database){ + services.storageService.attachments.openAttachment(id) != null + } + } + + override fun openAttachment(id: SecureHash): InputStream { + // TODO: this operation should not require an explicit transaction + return databaseTransaction(database) { + services.storageService.attachments.openAttachment(id)!!.open() + } + } + + override fun uploadAttachment(jar: InputStream): SecureHash { + // TODO: this operation should not require an explicit transaction + return databaseTransaction(database){ + services.storageService.attachments.importAttachment(jar) + } + } override fun authoriseContractUpgrade(state: StateAndRef<*>, upgradedContractClass: Class>) = services.vaultService.authoriseContractUpgrade(state, upgradedContractClass) override fun deauthoriseContractUpgrade(state: StateAndRef<*>) = services.vaultService.deauthoriseContractUpgrade(state) override fun currentNodeTime(): Instant = Instant.now(services.clock) diff --git a/node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt b/node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt index 5c6973242f..20e99c58dd 100644 --- a/node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt +++ b/node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt @@ -9,9 +9,7 @@ import io.requery.sql.* import io.requery.sql.platform.H2 import io.requery.util.function.Function import io.requery.util.function.Supplier -import net.corda.core.schemas.requery.converters.InstantConverter -import net.corda.core.schemas.requery.converters.StateRefConverter -import net.corda.core.schemas.requery.converters.VaultStateStatusConverter +import net.corda.core.schemas.requery.converters.* import org.jetbrains.exposed.sql.transactions.TransactionManager import java.sql.Connection import java.util.* @@ -69,6 +67,8 @@ class KotlinConfigurationTransactionWrapper(private val model: EntityModel, val vaultStateStatusConverter = VaultStateStatusConverter() customMapping.addConverter(vaultStateStatusConverter, vaultStateStatusConverter.mappedType) customMapping.addConverter(StateRefConverter(), StateRefConverter::getMappedType.javaClass) + customMapping.addConverter(SecureHashConverter(), SecureHashConverter::getMappedType.javaClass) + return customMapping } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt index 3402537c81..0bdd5a3254 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt @@ -5,13 +5,20 @@ import com.google.common.annotations.VisibleForTesting import com.google.common.hash.Hashing import com.google.common.hash.HashingInputStream import com.google.common.io.CountingInputStream -import net.corda.core.* import net.corda.core.contracts.Attachment +import net.corda.core.createDirectory import net.corda.core.crypto.SecureHash +import net.corda.core.div +import net.corda.core.extractZipFile +import net.corda.core.isDirectory import net.corda.core.node.services.AttachmentStorage import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.loggerFor import net.corda.node.services.api.AcceptsFileUpload +import net.corda.node.services.database.RequeryConfiguration +import net.corda.node.services.persistence.schemas.AttachmentEntity +import net.corda.node.services.persistence.schemas.Models +import java.io.ByteArrayInputStream import java.io.FilterInputStream import java.io.InputStream import java.nio.file.* @@ -20,74 +27,69 @@ import java.util.jar.JarInputStream import javax.annotation.concurrent.ThreadSafe /** - * Stores attachments in the specified local directory, which must exist. Doesn't allow new attachments to be uploaded. + * Stores attachments in H2 database. */ @ThreadSafe -class NodeAttachmentService(val storePath: Path, metrics: MetricRegistry) : AttachmentStorage, AcceptsFileUpload { +class NodeAttachmentService(override var storePath: Path, dataSourceProperties: Properties, metrics: MetricRegistry) : AttachmentStorage, AcceptsFileUpload { private val log = loggerFor() + val configuration = RequeryConfiguration(dataSourceProperties) + val session = configuration.sessionForModel(Models.PERSISTENCE) + @VisibleForTesting var checkAttachmentsOnLoad = true private val attachmentCount = metrics.counter("Attachments") - - init { - attachmentCount.inc(countAttachments()) - } - - // Just count all non-directories in the attachment store, and assume the admin hasn't dumped any junk there. - private fun countAttachments() = storePath.list { it.filter { it.isRegularFile() }.count() } - - /** - * If true, newly inserted attachments will be unzipped to a subdirectory of the [storePath]. This is intended for - * human browsing convenience: the attachment itself will still be the file (that is, edits to the extracted directory - * will not have any effect). - */ - @Volatile var automaticallyExtractAttachments = false + @Volatile override var automaticallyExtractAttachments = false init { require(storePath.isDirectory()) { "$storePath must be a directory" } + + session.withTransaction { + attachmentCount.inc(session.count(AttachmentEntity::class).get().value().toLong()) + } } @CordaSerializable - class OnDiskHashMismatch(val file: Path, val actual: SecureHash) : Exception() { - override fun toString() = "File $file hashed to $actual: corruption in attachment store?" + class HashMismatchException(val expected: SecureHash, val actual: SecureHash) : Exception() { + override fun toString() = "File $expected hashed to $actual: corruption in attachment store?" } /** * Wraps a stream and hashes data as it is read: if the entire stream is consumed, then at the end the hash of - * the read data is compared to the [expected] hash and [OnDiskHashMismatch] is thrown by [close] if they didn't + * the read data is compared to the [expected] hash and [HashMismatchException] is thrown by [close] if they didn't * match. The goal of this is to detect cases where attachments in the store have been tampered with or corrupted * and no longer match their file name. It won't always work: if we read a zip for our own uses and skip around * inside it, we haven't read the whole file, so we can't check the hash. But when copying it over the network * this will provide an additional safety check against user error. */ private class HashCheckingStream(val expected: SecureHash.SHA256, - val filePath: Path, + val expectedSize: Int, input: InputStream, private val counter: CountingInputStream = CountingInputStream(input), private val stream: HashingInputStream = HashingInputStream(Hashing.sha256(), counter)) : FilterInputStream(stream) { - - private val expectedSize = filePath.size - override fun close() { super.close() - if (counter.count != expectedSize) return + + if (counter.count != expectedSize.toLong()) return + val actual = SecureHash.SHA256(stream.hash().asBytes()) if (actual != expected) - throw OnDiskHashMismatch(filePath, actual) + throw HashMismatchException(expected, actual) } } - // Deliberately not an inner class to avoid holding a reference to the attachments service. private class AttachmentImpl(override val id: SecureHash, - private val path: Path, + private val attachment: ByteArray, private val checkOnLoad: Boolean) : Attachment { override fun open(): InputStream { - var stream = Files.newInputStream(path) + + var stream = ByteArrayInputStream(attachment) + // This is just an optional safety check. If it slows things down too much it can be disabled. if (id is SecureHash.SHA256 && checkOnLoad) - stream = HashCheckingStream(id, path, stream) + return HashCheckingStream(id, attachment.size, stream) + return stream } @@ -96,38 +98,54 @@ class NodeAttachmentService(val storePath: Path, metrics: MetricRegistry) : Atta } override fun openAttachment(id: SecureHash): Attachment? { - val path = storePath / id.toString() - if (!path.exists()) return null - return AttachmentImpl(id, path, checkAttachmentsOnLoad) + val attachment = session.withTransaction { + try { + session.select(AttachmentEntity::class) + .where(AttachmentEntity.ATT_ID.eq(id)) + .get() + .single() + } catch (e: NoSuchElementException) { + null + } + } ?: return null + + return AttachmentImpl(id, attachment.content, checkAttachmentsOnLoad) } // TODO: PLT-147: The attachment should be randomised to prevent brute force guessing and thus privacy leaks. override fun importAttachment(jar: InputStream): SecureHash { require(jar !is JarInputStream) val hs = HashingInputStream(Hashing.sha256(), jar) - val tmp = storePath / "tmp.${UUID.randomUUID()}" - hs.copyTo(tmp) - checkIsAValidJAR(tmp) + val bytes = hs.readBytes() + checkIsAValidJAR(hs) val id = SecureHash.SHA256(hs.hash().asBytes()) - val finalPath = storePath / id.toString() - try { - // Move into place atomically or fail if that isn't possible. We don't want a half moved attachment to - // be exposed to parallel threads. This gives us thread safety. - if (!finalPath.exists()) { - log.info("Stored new attachment $id") - attachmentCount.inc() - } else { - log.info("Replacing attachment $id - only bother doing this if you're trying to repair file corruption") - } - tmp.moveTo(finalPath, StandardCopyOption.ATOMIC_MOVE) - } finally { - tmp.deleteIfExists() + + val count = session.withTransaction { + session.count(AttachmentEntity::class) + .where(AttachmentEntity.ATT_ID.eq(id)) + .get().value() } + + if (count > 0) { + throw FileAlreadyExistsException(id.toString()) + } + + session.withTransaction { + val attachment = AttachmentEntity() + attachment.attId = id + attachment.content = bytes + session.insert(attachment) + } + + attachmentCount.inc() + + log.info("Stored new attachment $id") + if (automaticallyExtractAttachments) { val extractTo = storePath / "$id.jar" try { extractTo.createDirectory() - extractZipFile(finalPath, extractTo) + extractZipFile(hs, extractTo) } catch(e: FileAlreadyExistsException) { log.trace("Did not extract attachment jar to directory because it already exists") } catch(e: Exception) { @@ -135,20 +153,19 @@ class NodeAttachmentService(val storePath: Path, metrics: MetricRegistry) : Atta // TODO: Delete the extractTo directory here. } } + return id } - private fun checkIsAValidJAR(path: Path) { + private fun checkIsAValidJAR(stream: InputStream) { // Just iterate over the entries with verification enabled: should be good enough to catch mistakes. - path.read { - val jar = JarInputStream(it) - while (true) { - val cursor = jar.nextJarEntry ?: break - val entryPath = Paths.get(cursor.name) - // Security check to stop zips trying to escape their rightful place. - if (entryPath.isAbsolute || entryPath.normalize() != entryPath || '\\' in cursor.name) - throw IllegalArgumentException("Path is either absolute or non-normalised: $entryPath") - } + val jar = JarInputStream(stream) + while (true) { + val cursor = jar.nextJarEntry ?: break + val entryPath = Paths.get(cursor.name) + // Security check to stop zips trying to escape their rightful place. + if (entryPath.isAbsolute || entryPath.normalize() != entryPath || '\\' in cursor.name) + throw IllegalArgumentException("Path is either absolute or non-normalised: $entryPath") } } diff --git a/node/src/test/kotlin/net/corda/node/messaging/AttachmentTests.kt b/node/src/test/kotlin/net/corda/node/messaging/AttachmentTests.kt index 8ace2bbc3f..67d078c3ca 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/AttachmentTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/AttachmentTests.kt @@ -6,22 +6,25 @@ import net.corda.core.crypto.sha256 import net.corda.core.getOrThrow import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.services.ServiceInfo -import net.corda.core.write import net.corda.flows.FetchAttachmentsFlow import net.corda.flows.FetchDataFlow import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.database.RequeryConfiguration import net.corda.node.services.network.NetworkMapService import net.corda.node.services.persistence.NodeAttachmentService +import net.corda.node.services.persistence.schemas.AttachmentEntity import net.corda.node.services.transactions.SimpleNotaryService import net.corda.testing.node.MockNetwork -import net.i2p.crypto.eddsa.KeyPairGenerator +import net.corda.node.utilities.databaseTransaction +import net.corda.testing.node.makeTestDataSourceProperties +import org.jetbrains.exposed.sql.Database import org.junit.Before import org.junit.Test import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream +import java.io.Closeable import java.math.BigInteger import java.security.KeyPair -import java.security.KeyPairGeneratorSpi import java.util.jar.JarOutputStream import java.util.zip.ZipEntry import kotlin.test.assertEquals @@ -29,10 +32,17 @@ import kotlin.test.assertFailsWith class AttachmentTests { lateinit var network: MockNetwork + lateinit var dataSource: Closeable + lateinit var database: Database + lateinit var configuration: RequeryConfiguration @Before fun setUp() { network = MockNetwork() + + val dataSourceProperties = makeTestDataSourceProperties() + + configuration = RequeryConfiguration(dataSourceProperties) } fun fakeAttachment(): ByteArray { @@ -50,7 +60,9 @@ class AttachmentTests { val (n0, n1) = network.createTwoNodes() // Insert an attachment into node zero's store directly. - val id = n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment())) + val id = databaseTransaction(n0.database) { + n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment())) + } // Get node one to run a flow to fetch it and insert it. network.runNetwork() @@ -59,7 +71,10 @@ class AttachmentTests { assertEquals(0, f1.resultFuture.getOrThrow().fromDisk.size) // Verify it was inserted into node one's store. - val attachment = n1.storage.attachments.openAttachment(id)!! + val attachment = databaseTransaction(n1.database) { + n1.storage.attachments.openAttachment(id)!! + } + assertEquals(id, attachment.open().readBytes().sha256()) // Shut down node zero and ensure node one can still resolve the attachment. @@ -101,11 +116,23 @@ class AttachmentTests { }, true, null, null, ServiceInfo(NetworkMapService.type), ServiceInfo(SimpleNotaryService.type)) val n1 = network.createNode(n0.info.address) + val attachment = fakeAttachment() // Insert an attachment into node zero's store directly. - val id = n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment())) + val id = databaseTransaction(n0.database) { + n0.storage.attachments.importAttachment(ByteArrayInputStream(attachment)) + } // Corrupt its store. - network.filesystem.getPath("/nodes/0/attachments/$id").write { it.write(byteArrayOf(99, 99, 99, 99)) } + val corruptBytes = "arggghhhh".toByteArray() + System.arraycopy(corruptBytes, 0, attachment, 0, corruptBytes.size) + + val corruptAttachment = AttachmentEntity() + corruptAttachment.attId = id + corruptAttachment.content = attachment + databaseTransaction(n0.database) { + (n0.storage.attachments as NodeAttachmentService).session.update(corruptAttachment) + } + // Get n1 to fetch the attachment. Should receive corrupted bytes. network.runNetwork() diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index 3f11bcda92..796a3b4138 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -24,7 +24,6 @@ import net.corda.flows.TwoPartyTradeFlow.Seller import net.corda.node.internal.AbstractNode import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.persistence.DBTransactionStorage -import net.corda.node.services.persistence.NodeAttachmentService import net.corda.node.services.persistence.StorageServiceImpl import net.corda.node.services.persistence.checkpoints import net.corda.node.utilities.databaseTransaction @@ -90,8 +89,11 @@ class TwoPartyTradeFlowTests { databaseTransaction(bobNode.database) { bobNode.services.fillWithSomeTestCash(2000.DOLLARS, outputNotary = notaryNode.info.notaryIdentity) } - val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey, - 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null, notaryNode.info.notaryIdentity).second + + val alicesFakePaper = databaseTransaction(aliceNode.database) { + fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey, + 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null, notaryNode.info.notaryIdentity).second + } insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey, notaryKey) @@ -135,8 +137,10 @@ class TwoPartyTradeFlowTests { databaseTransaction(bobNode.database) { bobNode.services.fillWithSomeTestCash(2000.DOLLARS, outputNotary = notaryNode.info.notaryIdentity) } - val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey, - 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null, notaryNode.info.notaryIdentity).second + val alicesFakePaper = databaseTransaction(aliceNode.database) { + fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey, + 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null, notaryNode.info.notaryIdentity).second + } insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey, notaryKey) val aliceFuture = runBuyerAndSeller(notaryNode, aliceNode, bobNode, "alice's paper".outputStateAndRef()).sellerResult @@ -221,7 +225,7 @@ class TwoPartyTradeFlowTests { return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) { // That constructs the storage service object in a customised way ... override fun constructStorageService( - attachments: NodeAttachmentService, + attachments: AttachmentStorage, transactionStorage: TransactionStorage, stateMachineRecordedTransactionMappingStorage: StateMachineRecordedTransactionMappingStorage ): StorageServiceImpl { @@ -248,15 +252,19 @@ class TwoPartyTradeFlowTests { it.write("Our commercial paper is top notch stuff".toByteArray()) it.closeEntry() } - val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray())) + val attachmentID = databaseTransaction(aliceNode.database) { + attachment(ByteArrayInputStream(stream.toByteArray())) + } val extraKey = bobNode.keyManagement.freshKey() val bobsFakeCash = fillUpForBuyer(false, extraKey.public.composite, DUMMY_CASH_ISSUER.party, notaryNode.info.notaryIdentity).second val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode, notaryNode, bobNode.services.legalIdentityKey, extraKey) - val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey, - 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID, notaryNode.info.notaryIdentity).second + val alicesFakePaper = databaseTransaction(aliceNode.database) { + fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey, + 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID, notaryNode.info.notaryIdentity).second + } val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey) net.runNetwork() // Clear network map registration messages @@ -283,10 +291,12 @@ class TwoPartyTradeFlowTests { } // Bob has downloaded the attachment. - bobNode.storage.attachments.openAttachment(attachmentID)!!.openAsJAR().use { - it.nextJarEntry - val contents = it.reader().readText() - assertTrue(contents.contains("Our commercial paper is top notch stuff")) + databaseTransaction(bobNode.database) { + bobNode.storage.attachments.openAttachment(attachmentID)!!.openAsJAR().use { + it.nextJarEntry + val contents = it.reader().readText() + assertTrue(contents.contains("Our commercial paper is top notch stuff")) + } } } @@ -327,7 +337,7 @@ class TwoPartyTradeFlowTests { } @Test - fun `track() works`() { + fun `track works`() { val notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name) val aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name) @@ -343,14 +353,20 @@ class TwoPartyTradeFlowTests { it.write("Our commercial paper is top notch stuff".toByteArray()) it.closeEntry() } - val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray())) + val attachmentID = databaseTransaction(aliceNode.database) { + attachment(ByteArrayInputStream(stream.toByteArray())) + } val bobsFakeCash = fillUpForBuyer(false, bobNode.keyManagement.freshKey().public.composite, DUMMY_CASH_ISSUER.party, notaryNode.info.notaryIdentity).second insertFakeTransactions(bobsFakeCash, bobNode, notaryNode) - val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey, - 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID, notaryNode.info.notaryIdentity).second + + val alicesFakePaper = databaseTransaction(aliceNode.database) { + fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey, + 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID, notaryNode.info.notaryIdentity).second + } + insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey) net.runNetwork() // Clear network map registration messages @@ -444,8 +460,10 @@ class TwoPartyTradeFlowTests { val bobsBadCash = fillUpForBuyer(bobError, bobKey.public.composite, DUMMY_CASH_ISSUER.party, notaryNode.info.notaryIdentity).second - val alicesFakePaper = fillUpForSeller(aliceError, aliceNode.info.legalIdentity.owningKey, - 1200.DOLLARS `issued by` issuer, null, notaryNode.info.notaryIdentity).second + val alicesFakePaper = databaseTransaction(aliceNode.database) { + fillUpForSeller(aliceError, aliceNode.info.legalIdentity.owningKey, + 1200.DOLLARS `issued by` issuer, null, notaryNode.info.notaryIdentity).second + } insertFakeTransactions(bobsBadCash, bobNode, notaryNode, bobKey) insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey) @@ -468,6 +486,7 @@ class TwoPartyTradeFlowTests { } } + private fun insertFakeTransactions( wtxToSign: List, node: AbstractNode, diff --git a/node/src/test/kotlin/net/corda/node/services/NodeAttachmentStorageTest.kt b/node/src/test/kotlin/net/corda/node/services/NodeAttachmentStorageTest.kt index 49d0cb0a63..e0f0a355b2 100644 --- a/node/src/test/kotlin/net/corda/node/services/NodeAttachmentStorageTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/NodeAttachmentStorageTest.kt @@ -5,18 +5,27 @@ import com.google.common.jimfs.Configuration import com.google.common.jimfs.Jimfs import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 -import net.corda.core.div import net.corda.core.read import net.corda.core.readAll +import net.corda.core.utilities.LogHelper import net.corda.core.write +import net.corda.node.services.database.RequeryConfiguration import net.corda.node.services.persistence.NodeAttachmentService +import net.corda.node.services.persistence.schemas.AttachmentEntity +import net.corda.node.services.transactions.PersistentUniquenessProvider +import net.corda.node.utilities.configureDatabase +import net.corda.testing.node.makeTestDataSourceProperties +import org.jetbrains.exposed.sql.Database +import org.jetbrains.exposed.sql.transactions.TransactionManager +import org.junit.After import org.junit.Before import org.junit.Test +import java.io.Closeable import java.nio.charset.Charset import java.nio.file.FileAlreadyExistsException import java.nio.file.FileSystem import java.nio.file.Path -import java.nio.file.StandardOpenOption.WRITE +import java.util.* import java.util.jar.JarEntry import java.util.jar.JarOutputStream import kotlin.test.assertEquals @@ -26,18 +35,35 @@ import kotlin.test.assertNull class NodeAttachmentStorageTest { // Use an in memory file system for testing attachment storage. lateinit var fs: FileSystem + lateinit var dataSource: Closeable + lateinit var database: Database + lateinit var dataSourceProperties: Properties + lateinit var configuration: RequeryConfiguration @Before fun setUp() { + LogHelper.setLevel(PersistentUniquenessProvider::class) + + dataSourceProperties = makeTestDataSourceProperties() + val dataSourceAndDatabase = configureDatabase(dataSourceProperties) + dataSource = dataSourceAndDatabase.first + database = dataSourceAndDatabase.second + + configuration = RequeryConfiguration(dataSourceProperties) fs = Jimfs.newFileSystem(Configuration.unix()) } + @After + fun tearDown() { + TransactionManager.current().close() + } + @Test fun `insert and retrieve`() { val testJar = makeTestJar() val expectedHash = testJar.readAll().sha256() - val storage = NodeAttachmentService(fs.getPath("/"), MetricRegistry()) + val storage = NodeAttachmentService(fs.getPath("/"), dataSourceProperties, MetricRegistry()) val id = testJar.read { storage.importAttachment(it) } assertEquals(expectedHash, id) @@ -49,31 +75,49 @@ class NodeAttachmentStorageTest { val e2 = stream.nextJarEntry!! assertEquals("test2.txt", e2.name) assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "Some more useful content") + + stream.close() + + storage.openAttachment(id)!!.openAsJAR().use { + it.nextJarEntry + it.readBytes() + } } @Test fun `duplicates not allowed`() { + val testJar = makeTestJar() - val storage = NodeAttachmentService(fs.getPath("/"), MetricRegistry()) - testJar.read { storage.importAttachment(it) } + val storage = NodeAttachmentService(fs.getPath("/"), dataSourceProperties, MetricRegistry()) + testJar.read { + storage.importAttachment(it) + } assertFailsWith { - testJar.read { storage.importAttachment(it) } + testJar.read { + storage.importAttachment(it) + } } } @Test fun `corrupt entry throws exception`() { val testJar = makeTestJar() - val storage = NodeAttachmentService(fs.getPath("/"), MetricRegistry()) + val storage = NodeAttachmentService(fs.getPath("/"), dataSourceProperties, MetricRegistry()) val id = testJar.read { storage.importAttachment(it) } // Corrupt the file in the store. - fs.getPath("/", id.toString()).write(options = WRITE) { it.write("arggghhhh".toByteArray()) } + val bytes = testJar.readAll(); + val corruptBytes = "arggghhhh".toByteArray() + System.arraycopy(corruptBytes, 0, bytes, 0, corruptBytes.size) + val corruptAttachment = AttachmentEntity() + corruptAttachment.attId = id + corruptAttachment.content = bytes + storage.session.update(corruptAttachment) - val e = assertFailsWith { + val e = assertFailsWith { storage.openAttachment(id)!!.open().use { it.readBytes() } } - assertEquals(e.file, storage.storePath / id.toString()) + assertEquals(e.expected, id) // But if we skip around and read a single entry, no exception is thrown. storage.openAttachment(id)!!.openAsJAR().use { diff --git a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt index 35382d9ba1..86c37b59ae 100644 --- a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt +++ b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt @@ -14,7 +14,6 @@ import net.corda.core.utilities.Emoji import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.unwrap import net.corda.flows.TwoPartyTradeFlow -import net.corda.node.services.persistence.NodeAttachmentService import java.nio.file.Path import java.util.* @@ -28,7 +27,7 @@ class BuyerFlow(val otherParty: Party, init { // Buyer will fetch the attachment from the seller automatically when it resolves the transaction. // For demo purposes just extract attachment jars when saved to disk, so the user can explore them. - val attachmentsPath = (services.storageService.attachments as NodeAttachmentService).let { + val attachmentsPath = (services.storageService.attachments).let { it.automaticallyExtractAttachments = true it.storePath } diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt index 80da704332..5d04793a01 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -23,6 +23,8 @@ import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream import java.io.File import java.io.InputStream +import java.nio.file.Path +import java.nio.file.Paths import java.security.KeyPair import java.security.PrivateKey import java.security.PublicKey @@ -98,6 +100,8 @@ class MockKeyManagementService(vararg initialKeys: KeyPair) : SingletonSerialize class MockAttachmentStorage : AttachmentStorage { val files = HashMap() + override var automaticallyExtractAttachments = false + override var storePath = Paths.get("") override fun openAttachment(id: SecureHash): Attachment? { val f = files[id] ?: return null