diff --git a/build.gradle b/build.gradle index 45d0ce3a92..1f4ac549f1 100644 --- a/build.gradle +++ b/build.gradle @@ -48,8 +48,6 @@ configurations.all() { } dependencies { - testCompile 'junit:junit:4.12' - compile project(':contracts') compile "com.google.code.findbugs:jsr305:3.0.1" @@ -96,6 +94,10 @@ dependencies { compile("commons-logging:commons-logging:1.2") { force = true } + + // Unit testing helpers. + testCompile 'junit:junit:4.12' + testCompile 'com.google.jimfs:jimfs:1.1' // in memory java.nio filesystem. } // These lines tell Gradle to add a couple of JVM command line arguments to unit test and program runs, which set up diff --git a/core/src/main/kotlin/core/Utils.kt b/core/src/main/kotlin/core/Utils.kt index 5754c21b8b..42148d588e 100644 --- a/core/src/main/kotlin/core/Utils.kt +++ b/core/src/main/kotlin/core/Utils.kt @@ -12,6 +12,9 @@ import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.SettableFuture import org.slf4j.Logger +import java.io.InputStream +import java.nio.file.Files +import java.nio.file.Path import java.security.SecureRandom import java.time.Duration import java.time.temporal.Temporal @@ -45,6 +48,7 @@ infix fun ListenableFuture.then(body: () -> Unit): ListenableFuture = infix fun ListenableFuture.success(body: (T) -> Unit): ListenableFuture = apply { success(RunOnCallerThread, body) } infix fun ListenableFuture.failure(body: (Throwable) -> Unit): ListenableFuture = apply { failure(RunOnCallerThread, body) } +fun Path.use(block: (InputStream) -> R): R = Files.newInputStream(this).use(block) /** Executes the given block and sets the future to either the result, or any exception that was thrown. */ fun SettableFuture.setFrom(logger: Logger? = null, block: () -> T): SettableFuture { diff --git a/src/main/kotlin/core/Services.kt b/src/main/kotlin/core/Services.kt index c2049cbba7..3ce6616056 100644 --- a/src/main/kotlin/core/Services.kt +++ b/src/main/kotlin/core/Services.kt @@ -12,6 +12,7 @@ import core.crypto.SecureHash import core.crypto.generateKeyPair import core.messaging.MessagingService import core.messaging.NetworkMap +import java.io.InputStream import java.security.KeyPair import java.security.PrivateKey import java.security.PublicKey @@ -76,6 +77,7 @@ object DummyTimestampingAuthority { * anything like that, this interface is only big enough to support the prototyping work. */ interface StorageService { + /** TODO: Temp scaffolding that will go away eventually. */ fun getMap(tableName: String): MutableMap /** @@ -85,6 +87,9 @@ interface StorageService { */ val validatedTransactions: MutableMap + /** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */ + val attachments: AttachmentStorage + /** * A map of program hash->contract class type, used for verification. */ @@ -98,6 +103,33 @@ interface StorageService { val myLegalIdentityKey: KeyPair } +/** + * An attachment store records potentially large binary objects, identified by their hash. Note that attachments are + * immutable and can never be erased once inserted! + */ +interface AttachmentStorage { + /** + * Returns a newly opened stream for the given locally stored attachment, or null if no such attachment is known. + * The returned stream must be closed when you are done with it to avoid resource leaks. You should probably wrap + * the result in a [JarInputStream] unless you're sending it somewhere, there is a convenience helper for this + * on [Attachment]. + */ + fun openAttachment(id: SecureHash): Attachment? + + /** + * Inserts the given attachment into the store, does *not* close the input stream. This can be an intensive + * operation due to the need to copy the bytes to disk and hash them along the way. + * + * Note that you should not pass a [JarInputStream] into this method and it will throw if you do, because access + * to the raw byte stream is required. + * + * @throws FileAlreadyExistsException if the given byte stream has already been inserted. + * @throws IllegalArgumentException if the given byte stream is empty or a [JarInputStream] + * @throws IOException if something went wrong. + */ + fun importAttachment(jar: InputStream): SecureHash +} + /** * A service hub simply vends references to the other services a node has. Some of those services may be missing or * mocked out. This class is useful to pass to chunks of pluggable code that might have need of many different kinds of diff --git a/src/main/kotlin/core/node/Node.kt b/src/main/kotlin/core/node/Node.kt index 22ec9a6748..2272810d38 100644 --- a/src/main/kotlin/core/node/Node.kt +++ b/src/main/kotlin/core/node/Node.kt @@ -185,6 +185,7 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon override val validatedTransactions: MutableMap get() = getMap("validated-transactions") + override val attachments: AttachmentStorage = NodeAttachmentStorage(dir.resolve("attachments")) override val contractPrograms = contractFactory override val myLegalIdentity = identity override val myLegalIdentityKey = keypair diff --git a/src/main/kotlin/core/node/NodeAttachmentStorage.kt b/src/main/kotlin/core/node/NodeAttachmentStorage.kt new file mode 100644 index 0000000000..ff1b4234fb --- /dev/null +++ b/src/main/kotlin/core/node/NodeAttachmentStorage.kt @@ -0,0 +1,107 @@ +/* + * Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members + * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms + * set forth therein. + * + * All other rights reserved. + */ + +package core.node + +import com.google.common.hash.Hashing +import com.google.common.hash.HashingInputStream +import com.google.common.io.CountingInputStream +import core.Attachment +import core.AttachmentStorage +import core.crypto.SecureHash +import core.utilities.loggerFor +import java.io.FilterInputStream +import java.io.InputStream +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.StandardCopyOption +import java.util.* +import java.util.jar.JarInputStream +import javax.annotation.concurrent.ThreadSafe + +/** + * Stores attachments in the specified local directory, which must exist. Doesn't allow new attachments to be uploaded. + */ +@ThreadSafe +class NodeAttachmentStorage(val storePath: Path) : AttachmentStorage { + private val log = loggerFor() + + init { + require(Files.isDirectory(storePath)) { "$storePath must be a directory" } + } + + class OnDiskHashMismatch(val file: Path, val actual: SecureHash) : Exception() { + override fun toString() = "File $file hashed to $actual: corruption in attachment store?" + } + + /** + * Wraps a stream and hashes data as it is read: if the entire stream is consumed, then at the end the hash of + * the read data is compared to the [expected] hash and [OnDiskHashMismatch] is thrown by [close] if they didn't + * match. The goal of this is to detect cases where attachments in the store have been tampered with or corrupted + * and no longer match their file name. It won't always work: if we read a zip for our own uses and skip around + * inside it, we haven't read the whole file, so we can't check the hash. But when copying it over the network + * this will provide an additional safety check against user error. + */ + private class HashCheckingStream(val expected: SecureHash.SHA256, + val filePath: Path, + input: InputStream, + private val counter: CountingInputStream = CountingInputStream(input), + private val stream: HashingInputStream = HashingInputStream(Hashing.sha256(), counter)) : FilterInputStream(stream) { + + private val expectedSize = Files.size(filePath) + + override fun close() { + super.close() + if (counter.count != expectedSize) return + val actual = SecureHash.SHA256(stream.hash().asBytes()) + if (actual != expected) + throw OnDiskHashMismatch(filePath, actual) + } + } + + override fun openAttachment(id: SecureHash): Attachment? { + val path = storePath.resolve(id.toString()) + if (!Files.exists(path)) return null + var stream = Files.newInputStream(path) + // This is just an optional safety check. If it slows things down too much it can be disabled. + if (id is SecureHash.SHA256) + stream = HashCheckingStream(id, path, stream) + log.debug("Opening attachment $id") + return object : Attachment { + override fun open(): InputStream = stream + override val id: SecureHash = id + } + } + + override fun importAttachment(jar: InputStream): SecureHash { + require(jar !is JarInputStream) + val hs = HashingInputStream(Hashing.sha256(), jar) + val tmp = storePath.resolve("tmp.${UUID.randomUUID()}") + Files.copy(hs, tmp) + checkIsAValidJAR(tmp) + val id = SecureHash.SHA256(hs.hash().asBytes()) + val finalPath = storePath.resolve(id.toString()) + try { + // Move into place atomically or fail if that isn't possible. We don't want a half moved attachment to + // be exposed to parallel threads. This gives us thread safety. + Files.move(tmp, finalPath, StandardCopyOption.ATOMIC_MOVE) + } finally { + Files.deleteIfExists(tmp) + } + log.info("Stored new attachment $id") + return id + } + + private fun checkIsAValidJAR(path: Path) { + // Just iterate over the entries with verification enabled: should be good enough to catch mistakes. + JarInputStream(Files.newInputStream(path), true).use { stream -> + var cursor = stream.nextJarEntry + while (cursor != null) cursor = stream.nextJarEntry + } + } +} diff --git a/src/test/kotlin/core/MockServices.kt b/src/test/kotlin/core/MockServices.kt index 2300f38d04..e6407ef535 100644 --- a/src/test/kotlin/core/MockServices.kt +++ b/src/test/kotlin/core/MockServices.kt @@ -8,10 +8,7 @@ package core -import core.crypto.DigitalSignature -import core.crypto.SecureHash -import core.crypto.generateKeyPair -import core.crypto.signWithECDSA +import core.crypto.* import core.messaging.MessagingService import core.messaging.MockNetworkMap import core.messaging.NetworkMap @@ -24,6 +21,10 @@ import core.testutils.TEST_KEYS_TO_CORP_MAP import core.testutils.TEST_PROGRAM_MAP import core.testutils.TEST_TX_TIME import org.slf4j.LoggerFactory +import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream +import java.io.File +import java.io.InputStream import java.security.KeyPair import java.security.PrivateKey import java.security.PublicKey @@ -31,6 +32,7 @@ import java.time.Clock import java.time.Duration import java.time.ZoneId import java.util.* +import java.util.jar.JarInputStream import javax.annotation.concurrent.ThreadSafe /** @@ -67,6 +69,34 @@ class MockWalletService(val states: List>) : WalletSer override val currentWallet = Wallet(states) } +class MockAttachmentStorage : AttachmentStorage { + val files = HashMap() + + override fun openAttachment(id: SecureHash): Attachment? { + val f = files[id] ?: return null + return object : Attachment { + override fun open(): JarInputStream = JarInputStream(ByteArrayInputStream(f)) + override val id: SecureHash = id + } + } + + override fun importAttachment(jar: InputStream): SecureHash { + // JIS makes read()/readBytes() return bytes of the current file, but we want to hash the entire container here. + require(jar !is JarInputStream) + + val bytes = run { + val s = ByteArrayOutputStream() + jar.copyTo(s) + s.toByteArray() + } + val sha256 = bytes.sha256() + if (files.containsKey(sha256)) + throw FileAlreadyExistsException(File("!! MOCK FILE NAME")) + files[sha256] = bytes + return sha256 + } +} + @ThreadSafe class MockStorageService(val recordingAs: Map? = null) : StorageService { override val myLegalIdentityKey: KeyPair = generateKeyPair() @@ -79,6 +109,8 @@ class MockStorageService(val recordingAs: Map? = null) : Storage override val contractPrograms = MockContractFactory + override val attachments: AttachmentStorage = MockAttachmentStorage() + @Suppress("UNCHECKED_CAST") override fun getMap(tableName: String): MutableMap { synchronized(tables) { diff --git a/src/test/kotlin/core/node/NodeAttachmentStorageTest.kt b/src/test/kotlin/core/node/NodeAttachmentStorageTest.kt new file mode 100644 index 0000000000..b3c67f6d48 --- /dev/null +++ b/src/test/kotlin/core/node/NodeAttachmentStorageTest.kt @@ -0,0 +1,100 @@ +/* + * Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members + * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms + * set forth therein. + * + * All other rights reserved. + */ + +package core.node + +import com.google.common.jimfs.Jimfs +import core.crypto.SecureHash +import core.use +import org.junit.Before +import org.junit.Test +import java.nio.charset.Charset +import java.nio.file.FileSystem +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.StandardOpenOption +import java.util.jar.JarEntry +import java.util.jar.JarOutputStream +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertNull + +class NodeAttachmentStorageTest { + // Use an in memory file system for testing attachment storage. + lateinit var fs: FileSystem + + @Before + fun setUp() { + fs = Jimfs.newFileSystem() + } + + @Test + fun `insert and retrieve`() { + val testJar = makeTestJar() + val expectedHash = SecureHash.sha256(Files.readAllBytes(testJar)) + + val storage = NodeAttachmentStorage(fs.getPath("/")) + val id = testJar.use { storage.importAttachment(it) } + assertEquals(expectedHash, id) + + assertNull(storage.openAttachment(SecureHash.randomSHA256())) + val stream = storage.openAttachment(expectedHash)!!.openAsJAR() + val e1 = stream.nextJarEntry!! + assertEquals("test1.txt", e1.name) + assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "This is some useful content") + val e2 = stream.nextJarEntry!! + assertEquals("test2.txt", e2.name) + assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "Some more useful content") + } + + @Test + fun `duplicates not allowed`() { + val testJar = makeTestJar() + val storage = NodeAttachmentStorage(fs.getPath("/")) + testJar.use { storage.importAttachment(it) } + assertFailsWith { + testJar.use { storage.importAttachment(it) } + } + } + + @Test + fun `corrupt entry throws exception`() { + val testJar = makeTestJar() + val storage = NodeAttachmentStorage(fs.getPath("/")) + val id = testJar.use { storage.importAttachment(it) } + + // Corrupt the file in the store. + Files.write(fs.getPath("/", id.toString()), "arggghhhh".toByteArray(), StandardOpenOption.WRITE) + + val e = assertFailsWith { + storage.openAttachment(id)!!.open().use { it.readBytes() } + } + assertEquals(e.file, storage.storePath.resolve(id.toString())) + + // But if we skip around and read a single entry, no exception is thrown. + storage.openAttachment(id)!!.openAsJAR().use { + it.nextJarEntry + it.readBytes() + } + } + + private var counter = 0 + private fun makeTestJar(): Path { + counter++ + val f = fs.getPath("$counter.jar") + JarOutputStream(Files.newOutputStream(f)).use { + it.putNextEntry(JarEntry("test1.txt")) + it.write("This is some useful content".toByteArray()) + it.closeEntry() + it.putNextEntry(JarEntry("test2.txt")) + it.write("Some more useful content".toByteArray()) + it.closeEntry() + } + return f + } +} \ No newline at end of file