Add a simple file backed attachment store, with tests.

This commit is contained in:
Mike Hearn 2016-02-25 15:25:14 +01:00
parent 105f9e1069
commit 0224bca1a9
7 changed files with 284 additions and 6 deletions

View File

@ -48,8 +48,6 @@ configurations.all() {
}
dependencies {
testCompile 'junit:junit:4.12'
compile project(':contracts')
compile "com.google.code.findbugs:jsr305:3.0.1"
@ -96,6 +94,10 @@ dependencies {
compile("commons-logging:commons-logging:1.2") {
force = true
}
// Unit testing helpers.
testCompile 'junit:junit:4.12'
testCompile 'com.google.jimfs:jimfs:1.1' // in memory java.nio filesystem.
}
// These lines tell Gradle to add a couple of JVM command line arguments to unit test and program runs, which set up

View File

@ -12,6 +12,9 @@ import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import org.slf4j.Logger
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
import java.security.SecureRandom
import java.time.Duration
import java.time.temporal.Temporal
@ -45,6 +48,7 @@ infix fun <T> ListenableFuture<T>.then(body: () -> Unit): ListenableFuture<T> =
infix fun <T> ListenableFuture<T>.success(body: (T) -> Unit): ListenableFuture<T> = apply { success(RunOnCallerThread, body) }
infix fun <T> ListenableFuture<T>.failure(body: (Throwable) -> Unit): ListenableFuture<T> = apply { failure(RunOnCallerThread, body) }
fun <R> Path.use(block: (InputStream) -> R): R = Files.newInputStream(this).use(block)
/** Executes the given block and sets the future to either the result, or any exception that was thrown. */
fun <T> SettableFuture<T>.setFrom(logger: Logger? = null, block: () -> T): SettableFuture<T> {

View File

@ -12,6 +12,7 @@ import core.crypto.SecureHash
import core.crypto.generateKeyPair
import core.messaging.MessagingService
import core.messaging.NetworkMap
import java.io.InputStream
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
@ -76,6 +77,7 @@ object DummyTimestampingAuthority {
* anything like that, this interface is only big enough to support the prototyping work.
*/
interface StorageService {
/** TODO: Temp scaffolding that will go away eventually. */
fun <K,V> getMap(tableName: String): MutableMap<K, V>
/**
@ -85,6 +87,9 @@ interface StorageService {
*/
val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
/** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */
val attachments: AttachmentStorage
/**
* A map of program hash->contract class type, used for verification.
*/
@ -98,6 +103,33 @@ interface StorageService {
val myLegalIdentityKey: KeyPair
}
/**
* An attachment store records potentially large binary objects, identified by their hash. Note that attachments are
* immutable and can never be erased once inserted!
*/
interface AttachmentStorage {
/**
* Returns a newly opened stream for the given locally stored attachment, or null if no such attachment is known.
* The returned stream must be closed when you are done with it to avoid resource leaks. You should probably wrap
* the result in a [JarInputStream] unless you're sending it somewhere, there is a convenience helper for this
* on [Attachment].
*/
fun openAttachment(id: SecureHash): Attachment?
/**
* Inserts the given attachment into the store, does *not* close the input stream. This can be an intensive
* operation due to the need to copy the bytes to disk and hash them along the way.
*
* Note that you should not pass a [JarInputStream] into this method and it will throw if you do, because access
* to the raw byte stream is required.
*
* @throws FileAlreadyExistsException if the given byte stream has already been inserted.
* @throws IllegalArgumentException if the given byte stream is empty or a [JarInputStream]
* @throws IOException if something went wrong.
*/
fun importAttachment(jar: InputStream): SecureHash
}
/**
* A service hub simply vends references to the other services a node has. Some of those services may be missing or
* mocked out. This class is useful to pass to chunks of pluggable code that might have need of many different kinds of

View File

@ -185,6 +185,7 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
override val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
get() = getMap("validated-transactions")
override val attachments: AttachmentStorage = NodeAttachmentStorage(dir.resolve("attachments"))
override val contractPrograms = contractFactory
override val myLegalIdentity = identity
override val myLegalIdentityKey = keypair

View File

@ -0,0 +1,107 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.node
import com.google.common.hash.Hashing
import com.google.common.hash.HashingInputStream
import com.google.common.io.CountingInputStream
import core.Attachment
import core.AttachmentStorage
import core.crypto.SecureHash
import core.utilities.loggerFor
import java.io.FilterInputStream
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.util.*
import java.util.jar.JarInputStream
import javax.annotation.concurrent.ThreadSafe
/**
* Stores attachments in the specified local directory, which must exist. Doesn't allow new attachments to be uploaded.
*/
@ThreadSafe
class NodeAttachmentStorage(val storePath: Path) : AttachmentStorage {
private val log = loggerFor<NodeAttachmentStorage>()
init {
require(Files.isDirectory(storePath)) { "$storePath must be a directory" }
}
class OnDiskHashMismatch(val file: Path, val actual: SecureHash) : Exception() {
override fun toString() = "File $file hashed to $actual: corruption in attachment store?"
}
/**
* Wraps a stream and hashes data as it is read: if the entire stream is consumed, then at the end the hash of
* the read data is compared to the [expected] hash and [OnDiskHashMismatch] is thrown by [close] if they didn't
* match. The goal of this is to detect cases where attachments in the store have been tampered with or corrupted
* and no longer match their file name. It won't always work: if we read a zip for our own uses and skip around
* inside it, we haven't read the whole file, so we can't check the hash. But when copying it over the network
* this will provide an additional safety check against user error.
*/
private class HashCheckingStream(val expected: SecureHash.SHA256,
val filePath: Path,
input: InputStream,
private val counter: CountingInputStream = CountingInputStream(input),
private val stream: HashingInputStream = HashingInputStream(Hashing.sha256(), counter)) : FilterInputStream(stream) {
private val expectedSize = Files.size(filePath)
override fun close() {
super.close()
if (counter.count != expectedSize) return
val actual = SecureHash.SHA256(stream.hash().asBytes())
if (actual != expected)
throw OnDiskHashMismatch(filePath, actual)
}
}
override fun openAttachment(id: SecureHash): Attachment? {
val path = storePath.resolve(id.toString())
if (!Files.exists(path)) return null
var stream = Files.newInputStream(path)
// This is just an optional safety check. If it slows things down too much it can be disabled.
if (id is SecureHash.SHA256)
stream = HashCheckingStream(id, path, stream)
log.debug("Opening attachment $id")
return object : Attachment {
override fun open(): InputStream = stream
override val id: SecureHash = id
}
}
override fun importAttachment(jar: InputStream): SecureHash {
require(jar !is JarInputStream)
val hs = HashingInputStream(Hashing.sha256(), jar)
val tmp = storePath.resolve("tmp.${UUID.randomUUID()}")
Files.copy(hs, tmp)
checkIsAValidJAR(tmp)
val id = SecureHash.SHA256(hs.hash().asBytes())
val finalPath = storePath.resolve(id.toString())
try {
// Move into place atomically or fail if that isn't possible. We don't want a half moved attachment to
// be exposed to parallel threads. This gives us thread safety.
Files.move(tmp, finalPath, StandardCopyOption.ATOMIC_MOVE)
} finally {
Files.deleteIfExists(tmp)
}
log.info("Stored new attachment $id")
return id
}
private fun checkIsAValidJAR(path: Path) {
// Just iterate over the entries with verification enabled: should be good enough to catch mistakes.
JarInputStream(Files.newInputStream(path), true).use { stream ->
var cursor = stream.nextJarEntry
while (cursor != null) cursor = stream.nextJarEntry
}
}
}

View File

@ -8,10 +8,7 @@
package core
import core.crypto.DigitalSignature
import core.crypto.SecureHash
import core.crypto.generateKeyPair
import core.crypto.signWithECDSA
import core.crypto.*
import core.messaging.MessagingService
import core.messaging.MockNetworkMap
import core.messaging.NetworkMap
@ -24,6 +21,10 @@ import core.testutils.TEST_KEYS_TO_CORP_MAP
import core.testutils.TEST_PROGRAM_MAP
import core.testutils.TEST_TX_TIME
import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.File
import java.io.InputStream
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
@ -31,6 +32,7 @@ import java.time.Clock
import java.time.Duration
import java.time.ZoneId
import java.util.*
import java.util.jar.JarInputStream
import javax.annotation.concurrent.ThreadSafe
/**
@ -67,6 +69,34 @@ class MockWalletService(val states: List<StateAndRef<OwnableState>>) : WalletSer
override val currentWallet = Wallet(states)
}
class MockAttachmentStorage : AttachmentStorage {
val files = HashMap<SecureHash, ByteArray>()
override fun openAttachment(id: SecureHash): Attachment? {
val f = files[id] ?: return null
return object : Attachment {
override fun open(): JarInputStream = JarInputStream(ByteArrayInputStream(f))
override val id: SecureHash = id
}
}
override fun importAttachment(jar: InputStream): SecureHash {
// JIS makes read()/readBytes() return bytes of the current file, but we want to hash the entire container here.
require(jar !is JarInputStream)
val bytes = run {
val s = ByteArrayOutputStream()
jar.copyTo(s)
s.toByteArray()
}
val sha256 = bytes.sha256()
if (files.containsKey(sha256))
throw FileAlreadyExistsException(File("!! MOCK FILE NAME"))
files[sha256] = bytes
return sha256
}
}
@ThreadSafe
class MockStorageService(val recordingAs: Map<String, String>? = null) : StorageService {
override val myLegalIdentityKey: KeyPair = generateKeyPair()
@ -79,6 +109,8 @@ class MockStorageService(val recordingAs: Map<String, String>? = null) : Storage
override val contractPrograms = MockContractFactory
override val attachments: AttachmentStorage = MockAttachmentStorage()
@Suppress("UNCHECKED_CAST")
override fun <K, V> getMap(tableName: String): MutableMap<K, V> {
synchronized(tables) {

View File

@ -0,0 +1,100 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.node
import com.google.common.jimfs.Jimfs
import core.crypto.SecureHash
import core.use
import org.junit.Before
import org.junit.Test
import java.nio.charset.Charset
import java.nio.file.FileSystem
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.util.jar.JarEntry
import java.util.jar.JarOutputStream
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNull
class NodeAttachmentStorageTest {
// Use an in memory file system for testing attachment storage.
lateinit var fs: FileSystem
@Before
fun setUp() {
fs = Jimfs.newFileSystem()
}
@Test
fun `insert and retrieve`() {
val testJar = makeTestJar()
val expectedHash = SecureHash.sha256(Files.readAllBytes(testJar))
val storage = NodeAttachmentStorage(fs.getPath("/"))
val id = testJar.use { storage.importAttachment(it) }
assertEquals(expectedHash, id)
assertNull(storage.openAttachment(SecureHash.randomSHA256()))
val stream = storage.openAttachment(expectedHash)!!.openAsJAR()
val e1 = stream.nextJarEntry!!
assertEquals("test1.txt", e1.name)
assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "This is some useful content")
val e2 = stream.nextJarEntry!!
assertEquals("test2.txt", e2.name)
assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "Some more useful content")
}
@Test
fun `duplicates not allowed`() {
val testJar = makeTestJar()
val storage = NodeAttachmentStorage(fs.getPath("/"))
testJar.use { storage.importAttachment(it) }
assertFailsWith<java.nio.file.FileAlreadyExistsException> {
testJar.use { storage.importAttachment(it) }
}
}
@Test
fun `corrupt entry throws exception`() {
val testJar = makeTestJar()
val storage = NodeAttachmentStorage(fs.getPath("/"))
val id = testJar.use { storage.importAttachment(it) }
// Corrupt the file in the store.
Files.write(fs.getPath("/", id.toString()), "arggghhhh".toByteArray(), StandardOpenOption.WRITE)
val e = assertFailsWith<NodeAttachmentStorage.OnDiskHashMismatch> {
storage.openAttachment(id)!!.open().use { it.readBytes() }
}
assertEquals(e.file, storage.storePath.resolve(id.toString()))
// But if we skip around and read a single entry, no exception is thrown.
storage.openAttachment(id)!!.openAsJAR().use {
it.nextJarEntry
it.readBytes()
}
}
private var counter = 0
private fun makeTestJar(): Path {
counter++
val f = fs.getPath("$counter.jar")
JarOutputStream(Files.newOutputStream(f)).use {
it.putNextEntry(JarEntry("test1.txt"))
it.write("This is some useful content".toByteArray())
it.closeEntry()
it.putNextEntry(JarEntry("test2.txt"))
it.write("Some more useful content".toByteArray())
it.closeEntry()
}
return f
}
}