From 37f1de8a4d12ce62d3cbebb7fd6dc925028a2ac3 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Fri, 26 Feb 2016 13:53:32 +0100 Subject: [PATCH] Refactor FetchTransactionsProtocol into FetchDataProtocol and add support for fetching attachments. --- build.gradle | 6 + scripts/trader-demo.sh | 2 +- .../protocols/FetchAttachmentsProtocol.kt | 44 +++++++ .../contracts/protocols/FetchDataProtocol.kt | 109 ++++++++++++++++++ .../protocols/FetchTransactionsProtocol.kt | 80 ++----------- .../kotlin/core/node/DataVendingService.kt | 28 +++-- src/main/kotlin/core/node/Node.kt | 30 ++++- .../node/servlets/AttachmentUploadServlet.kt | 51 ++++++++ 8 files changed, 268 insertions(+), 82 deletions(-) create mode 100644 src/main/kotlin/contracts/protocols/FetchAttachmentsProtocol.kt create mode 100644 src/main/kotlin/contracts/protocols/FetchDataProtocol.kt create mode 100644 src/main/kotlin/core/node/servlets/AttachmentUploadServlet.kt diff --git a/build.gradle b/build.gradle index 1f4ac549f1..646ec1c2dc 100644 --- a/build.gradle +++ b/build.gradle @@ -15,6 +15,7 @@ buildscript { ext.quasar_version = '0.7.4' ext.asm_version = '0.5.3' ext.artemis_version = '1.2.0' + ext.jetty_version = '9.3.7.v20160115' repositories { mavenCentral() @@ -95,6 +96,11 @@ dependencies { force = true } + // Web stuff: for HTTP[S] servlets + compile "org.eclipse.jetty:jetty-servlet:${jetty_version}" + compile "javax.servlet:javax.servlet-api:3.1.0" + compile "commons-fileupload:commons-fileupload:1.3.1" + // Unit testing helpers. testCompile 'junit:junit:4.12' testCompile 'com.google.jimfs:jimfs:1.1' // in memory java.nio filesystem. diff --git a/scripts/trader-demo.sh b/scripts/trader-demo.sh index 38daf4affb..59c39b2cb6 100755 --- a/scripts/trader-demo.sh +++ b/scripts/trader-demo.sh @@ -24,7 +24,7 @@ elif [[ "$mode" == "seller" ]]; then echo "myLegalName = Bank of Giza" >seller/config fi - build/install/r3prototyping/bin/r3prototyping --dir=seller --fake-trade-with=localhost --network-address=localhost:31338 --timestamper-identity-file=buyer/identity-public --timestamper-address=localhost + build/install/r3prototyping/bin/r3prototyping --dir=seller --fake-trade-with=localhost --network-address=localhost:31340 --timestamper-identity-file=buyer/identity-public --timestamper-address=localhost else echo "Run like this, one in each tab:" echo diff --git a/src/main/kotlin/contracts/protocols/FetchAttachmentsProtocol.kt b/src/main/kotlin/contracts/protocols/FetchAttachmentsProtocol.kt new file mode 100644 index 0000000000..955f0aa9e2 --- /dev/null +++ b/src/main/kotlin/contracts/protocols/FetchAttachmentsProtocol.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members + * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms + * set forth therein. + * + * All other rights reserved. + */ + +package contracts.protocols + +import core.Attachment +import core.crypto.SecureHash +import core.crypto.sha256 +import core.messaging.SingleMessageRecipient +import java.io.ByteArrayInputStream +import java.io.InputStream + +/** + * Given a set of hashes either loads from from local storage or requests them from the other peer. Downloaded + * attachments are saved to local storage automatically. + */ +class FetchAttachmentsProtocol(requests: Set, + otherSide: SingleMessageRecipient) : FetchDataProtocol(requests, otherSide) { + companion object { + const val TOPIC = "platform.fetch.attachment" + } + + override fun load(txid: SecureHash): Attachment? = serviceHub.storageService.attachments.openAttachment(txid) + + override val queryTopic: String = TOPIC + + override fun convert(wire: ByteArray): Attachment { + return object : Attachment { + override fun open(): InputStream = ByteArrayInputStream(wire) + override val id: SecureHash = wire.sha256() + } + } + + override fun maybeWriteToDisk(downloaded: List) { + for (attachment in downloaded) { + serviceHub.storageService.attachments.importAttachment(attachment.open()) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/contracts/protocols/FetchDataProtocol.kt b/src/main/kotlin/contracts/protocols/FetchDataProtocol.kt new file mode 100644 index 0000000000..030a4f3ca6 --- /dev/null +++ b/src/main/kotlin/contracts/protocols/FetchDataProtocol.kt @@ -0,0 +1,109 @@ +/* + * Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members + * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms + * set forth therein. + * + * All other rights reserved. + */ + +package contracts.protocols + +import co.paralleluniverse.fibers.Suspendable +import core.NamedByHash +import core.crypto.SecureHash +import core.messaging.SingleMessageRecipient +import core.node.DataVendingService +import core.protocols.ProtocolLogic +import core.random63BitValue +import core.utilities.UntrustworthyData +import java.util.* + +/** + * An abstract protocol for fetching typed data from a remote peer. + * + * Given a set of hashes (IDs), either loads them from local disk or asks the remote peer to provide them. + * + * A malicious response in which the data provided by the remote peer does not hash to the requested hash results in + * [DownloadedVsRequestedDataMismatch] being thrown. If the remote peer doesn't have an entry, it results in a + * [HashNotFound] exception being thrown. + * + * By default this class does not insert data into any local database, if you want to do that after missing items were + * fetched then override [maybeWriteToDisk]. You *must* override [load] and [queryTopic]. If the wire type is not the + * same as the ultimate type, you must also override [convert]. + * + * @param T The ultimate type of the data being fetched. + * @param W The wire type of the data being fetched, for when it isn't the same as the ultimate type. + */ +abstract class FetchDataProtocol( + protected val requests: Set, + protected val otherSide: SingleMessageRecipient) : ProtocolLogic>() { + + open class BadAnswer : Exception() + class HashNotFound(val requested: SecureHash) : BadAnswer() + class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : BadAnswer() + + data class Result(val fromDisk: List, val downloaded: List) + + protected abstract val queryTopic: String + + @Suspendable + override fun call(): Result { + // Load the items we have from disk and figure out which we're missing. + val (fromDisk, toFetch) = loadWhatWeHave() + + return if (toFetch.isEmpty()) { + Result(fromDisk, emptyList()) + } else { + logger.trace("Requesting ${toFetch.size} dependency(s) for verification") + + val sid = random63BitValue() + val fetchReq = DataVendingService.Request(toFetch, serviceHub.networkService.myAddress, sid) + // TODO: Support "large message" response streaming so response sizes are not limited by RAM. + val maybeItems = sendAndReceive>(queryTopic, otherSide, 0, sid, fetchReq) + // Check for a buggy/malicious peer answering with something that we didn't ask for. + val downloaded = validateFetchResponse(maybeItems, toFetch) + maybeWriteToDisk(downloaded) + Result(fromDisk, downloaded) + } + } + + protected open fun maybeWriteToDisk(downloaded: List) { + // Do nothing by default. + } + + private fun loadWhatWeHave(): Pair, List> { + val fromDisk = ArrayList() + val toFetch = ArrayList() + for (txid in requests) { + val stx = load(txid) + if (stx == null) + toFetch += txid + else + fromDisk += stx + } + return Pair(fromDisk, toFetch) + } + + protected abstract fun load(txid: SecureHash): T? + + @Suppress("UNCHECKED_CAST") + protected open fun convert(wire: W): T = wire as T + + private fun validateFetchResponse(maybeItems: UntrustworthyData>, + requests: List): List = + maybeItems.validate { response -> + if (response.size != requests.size) + throw BadAnswer() + for ((index, resp) in response.withIndex()) { + if (resp == null) throw HashNotFound(requests[index]) + } + val answers = response.requireNoNulls().map { convert(it) } + // Check transactions actually hash to what we requested, if this fails the remote node + // is a malicious protocol violator or buggy. + for ((index, item) in answers.withIndex()) + if (item.id != requests[index]) + throw DownloadedVsRequestedDataMismatch(requests[index], item.id) + + answers + } +} \ No newline at end of file diff --git a/src/main/kotlin/contracts/protocols/FetchTransactionsProtocol.kt b/src/main/kotlin/contracts/protocols/FetchTransactionsProtocol.kt index c934801a40..59f08bdf2b 100644 --- a/src/main/kotlin/contracts/protocols/FetchTransactionsProtocol.kt +++ b/src/main/kotlin/contracts/protocols/FetchTransactionsProtocol.kt @@ -8,82 +8,24 @@ package contracts.protocols -import co.paralleluniverse.fibers.Suspendable import core.SignedTransaction import core.crypto.SecureHash -import core.protocols.ProtocolLogic import core.messaging.SingleMessageRecipient -import core.utilities.UntrustworthyData -import core.node.DataVendingService -import core.random63BitValue -import java.util.* /** * Given a set of tx hashes (IDs), either loads them from local disk or asks the remote peer to provide them. + * * A malicious response in which the data provided by the remote peer does not hash to the requested hash results in - * [DownloadedVsRequestedDataMismatch] being thrown. If the remote peer doesn't have an entry, it results in a - * HashNotFound. Note that returned transactions are not inserted into the database, because it's up to the caller - * to actually verify the transactions are valid. + * [FetchDataProtocol.DownloadedVsRequestedDataMismatch] being thrown. If the remote peer doesn't have an entry, it + * results in a [FetchDataProtocol.HashNotFound] exception. Note that returned transactions are not inserted into + * the database, because it's up to the caller to actually verify the transactions are valid. */ -class FetchTransactionsProtocol(private val requests: Set, - private val otherSide: SingleMessageRecipient) : ProtocolLogic() { - - data class Result(val fromDisk: List, val downloaded: List) - - @Suspendable - override fun call(): Result { - val sid = random63BitValue() - - // Load the transactions we have from disk and figure out which we're missing. - val (fromDisk, toFetch) = loadWhatWeHave() - - return if (toFetch.isEmpty()) { - Result(fromDisk, emptyList()) - } else { - logger.trace("Requesting ${toFetch.size} dependency(s) for verification") - - val fetchReq = DataVendingService.Request(toFetch, serviceHub.networkService.myAddress, sid) - val maybeTxns = sendAndReceive>("platform.fetch.tx", otherSide, 0, sid, fetchReq) - // Check for a buggy/malicious peer answering with something that we didn't ask for. - // Note that we strip the UntrustworthyData marker here, but of course the returned transactions may be - // invalid in other ways! Perhaps we should keep it. - Result(fromDisk, validateTXFetchResponse(maybeTxns, toFetch)) - } +class FetchTransactionsProtocol(requests: Set, otherSide: SingleMessageRecipient) : + FetchDataProtocol(requests, otherSide) { + companion object { + const val TOPIC = "platform.fetch.tx" } - private fun loadWhatWeHave(): Pair, List> { - val fromDisk = ArrayList() - val toFetch = ArrayList() - for (txid in requests) { - val stx = serviceHub.storageService.validatedTransactions[txid] - if (stx == null) - toFetch += txid - else - fromDisk += stx - } - return Pair(fromDisk, toFetch) - } - - private fun validateTXFetchResponse(maybeTxns: UntrustworthyData>, - requests: List): List = - maybeTxns.validate { response -> - if (response.size != requests.size) - throw BadAnswer() - for ((index, resp) in response.withIndex()) { - if (resp == null) throw HashNotFound(requests[index]) - } - val answers = response.requireNoNulls() - // Check transactions actually hash to what we requested, if this fails the remote node - // is a malicious protocol violator or buggy. - for ((index, stx) in answers.withIndex()) - if (stx.id != requests[index]) - throw DownloadedVsRequestedDataMismatch(requests[index], stx.id) - - answers - } - - open class BadAnswer : Exception() - class HashNotFound(val requested: SecureHash) : BadAnswer() - class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : BadAnswer() -} - + override fun load(txid: SecureHash): SignedTransaction? = serviceHub.storageService.validatedTransactions[txid] + override val queryTopic: String = TOPIC +} \ No newline at end of file diff --git a/src/main/kotlin/core/node/DataVendingService.kt b/src/main/kotlin/core/node/DataVendingService.kt index f13e40ef7d..2e39572dea 100644 --- a/src/main/kotlin/core/node/DataVendingService.kt +++ b/src/main/kotlin/core/node/DataVendingService.kt @@ -8,6 +8,8 @@ package core.node +import contracts.protocols.FetchAttachmentsProtocol +import contracts.protocols.FetchTransactionsProtocol import core.StorageService import core.crypto.SecureHash import core.messaging.Message @@ -16,6 +18,7 @@ import core.messaging.SingleMessageRecipient import core.messaging.send import core.serialization.deserialize import core.utilities.loggerFor +import java.io.InputStream import javax.annotation.concurrent.ThreadSafe /** @@ -33,15 +36,12 @@ import javax.annotation.concurrent.ThreadSafe @ThreadSafe class DataVendingService(private val net: MessagingService, private val storage: StorageService) { companion object { - val TX_FETCH_TOPIC = "platform.fetch.tx" - val CONTRACT_FETCH_TOPIC = "platform.fetch.contract" - val logger = loggerFor() } init { - net.addMessageHandler("$TX_FETCH_TOPIC.0") { msg, registration -> handleTXRequest(msg) } - net.addMessageHandler("$CONTRACT_FETCH_TOPIC.0") { msg, registration -> handleContractRequest(msg) } + net.addMessageHandler("${FetchTransactionsProtocol.TOPIC}.0") { msg, registration -> handleTXRequest(msg) } + net.addMessageHandler("${FetchAttachmentsProtocol.TOPIC}.0") { msg, registration -> handleAttachmentRequest(msg) } } // TODO: Give all messages a respond-to address+session ID automatically. @@ -56,10 +56,22 @@ class DataVendingService(private val net: MessagingService, private val storage: logger.info("Got request for unknown tx $it") tx } - net.send("$TX_FETCH_TOPIC.${req.sessionID}", req.responseTo, answers) + net.send("${FetchTransactionsProtocol.TOPIC}.${req.sessionID}", req.responseTo, answers) } - private fun handleContractRequest(msg: Message) { - TODO("PLT-12: Basic module/sandbox system for contracts: $msg") + private fun handleAttachmentRequest(msg: Message) { + // TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer. + val req = msg.data.deserialize() + require(req.hashes.isNotEmpty()) + val answers: List = req.hashes.map { + val jar: InputStream? = storage.attachments.openAttachment(it)?.open() + if (jar == null) { + logger.info("Got request for unknown attachment $it") + null + } else { + jar.readBytes() + } + } + net.send("${FetchAttachmentsProtocol.TOPIC}.${req.sessionID}", req.responseTo, answers) } } diff --git a/src/main/kotlin/core/node/Node.kt b/src/main/kotlin/core/node/Node.kt index 2272810d38..2974cb5e0d 100644 --- a/src/main/kotlin/core/node/Node.kt +++ b/src/main/kotlin/core/node/Node.kt @@ -14,9 +14,12 @@ import core.* import core.crypto.SecureHash import core.crypto.generateKeyPair import core.messaging.* +import core.node.servlets.AttachmentUploadServlet import core.serialization.deserialize import core.serialization.serialize import core.utilities.loggerFor +import org.eclipse.jetty.server.Server +import org.eclipse.jetty.servlet.ServletContextHandler import java.io.RandomAccessFile import java.lang.management.ManagementFactory import java.nio.channels.FileLock @@ -47,12 +50,12 @@ class NodeConfiguration(private val properties: Properties) { * loads important data off disk and starts listening for connections. * * @param dir A [Path] to a location on disk where working files can be found or stored. - * @param myNetAddr The host and port that this server will use. It can't find out its own external hostname, so you + * @param p2pPort The host and port that this server will use. It can't find out its own external hostname, so you * have to specify that yourself. * @param configuration This is typically loaded from a .properties file * @param timestamperAddress If null, this node will become a timestamping node, otherwise, it will use that one. */ -class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeConfiguration, +class Node(val dir: Path, val p2pPort: HostAndPort, val configuration: NodeConfiguration, timestamperAddress: LegallyIdentifiableNode?) { private val log = loggerFor() @@ -92,6 +95,7 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon val keyManagement: E2ETestKeyManagementService val inNodeTimestampingService: TimestamperNodeService? val identity: IdentityService + val webServer: Server // Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us // when our process shuts down, but we try in stop() anyway just to be nice. @@ -102,7 +106,7 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon storage = makeStorageService(dir) smm = StateMachineManager(services, serverThread) - net = ArtemisMessagingService(dir, myNetAddr) + net = ArtemisMessagingService(dir, p2pPort) wallet = E2ETestWalletService(services) keyManagement = E2ETestKeyManagementService() @@ -129,16 +133,34 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon // service and so that keeps it from being collected. DataVendingService(net, storage) + // Start up the MQ service. net.start() + + webServer = initWebServer() + } + + private fun initWebServer(): Server { + val port = p2pPort.port + 1 // TODO: Move this into the node config file. + val server = Server(port) + val handler = ServletContextHandler() + handler.setAttribute("storage", storage) + handler.addServlet(AttachmentUploadServlet::class.java, "/attachments/upload") + server.handler = handler + server.start() + return server } fun stop() { + webServer.stop() net.stop() serverThread.shutdownNow() nodeFileLock!!.release() } fun makeStorageService(dir: Path): StorageService { + val attachmentsDir = dir.resolve("attachments") + try { Files.createDirectory(attachmentsDir) } catch (e: java.nio.file.FileAlreadyExistsException) {} + // Load the private identity key, creating it if necessary. The identity key is a long term well known key that // is distributed to other peers and we use it (or a key signed by it) when we need to do something // "permissioned". The identity file is what gets distributed and contains the node's legal name along with @@ -185,7 +207,7 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon override val validatedTransactions: MutableMap get() = getMap("validated-transactions") - override val attachments: AttachmentStorage = NodeAttachmentStorage(dir.resolve("attachments")) + override val attachments: AttachmentStorage = NodeAttachmentStorage(attachmentsDir) override val contractPrograms = contractFactory override val myLegalIdentity = identity override val myLegalIdentityKey = keypair diff --git a/src/main/kotlin/core/node/servlets/AttachmentUploadServlet.kt b/src/main/kotlin/core/node/servlets/AttachmentUploadServlet.kt new file mode 100644 index 0000000000..45e08f7e11 --- /dev/null +++ b/src/main/kotlin/core/node/servlets/AttachmentUploadServlet.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members + * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms + * set forth therein. + * + * All other rights reserved. + */ + +package core.node.servlets + +import core.StorageService +import core.utilities.loggerFor +import org.apache.commons.fileupload.servlet.ServletFileUpload +import javax.servlet.http.HttpServlet +import javax.servlet.http.HttpServletRequest +import javax.servlet.http.HttpServletResponse + +class AttachmentUploadServlet : HttpServlet() { + private val log = loggerFor() + + override fun doPost(req: HttpServletRequest, resp: HttpServletResponse) { + @Suppress("DEPRECATION") // Bogus warning due to superclass static method being deprecated. + val isMultipart = ServletFileUpload.isMultipartContent(req) + + if (!isMultipart) { + log.error("Got a non-file upload request to the attachments servlet") + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "This end point is for file uploads only.") + return + } + + val upload = ServletFileUpload() + val iterator = upload.getItemIterator(req) + while (iterator.hasNext()) { + val item = iterator.next() + if (!item.name.endsWith(".jar")) { + log.error("Attempted upload of a non-JAR attachment: mime=${item.contentType} filename=${item.name}") + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, + "${item.name}: Must be have a MIME type of application/java-archive and a filename ending in .jar") + return + } + + log.info("Receiving ${item.name}") + + val storage = servletContext.getAttribute("storage") as StorageService + item.openStream().use { + val id = storage.attachments.importAttachment(it) + log.info("${item.name} successfully inserted into the attachment store with id $id") + } + } + } +}