Refactor FetchTransactionsProtocol into FetchDataProtocol and add support for fetching attachments.

This commit is contained in:
Mike Hearn 2016-02-26 13:53:32 +01:00
parent 0224bca1a9
commit 37f1de8a4d
8 changed files with 268 additions and 82 deletions

View File

@ -15,6 +15,7 @@ buildscript {
ext.quasar_version = '0.7.4'
ext.asm_version = '0.5.3'
ext.artemis_version = '1.2.0'
ext.jetty_version = '9.3.7.v20160115'
repositories {
mavenCentral()
@ -95,6 +96,11 @@ dependencies {
force = true
}
// Web stuff: for HTTP[S] servlets
compile "org.eclipse.jetty:jetty-servlet:${jetty_version}"
compile "javax.servlet:javax.servlet-api:3.1.0"
compile "commons-fileupload:commons-fileupload:1.3.1"
// Unit testing helpers.
testCompile 'junit:junit:4.12'
testCompile 'com.google.jimfs:jimfs:1.1' // in memory java.nio filesystem.

View File

@ -24,7 +24,7 @@ elif [[ "$mode" == "seller" ]]; then
echo "myLegalName = Bank of Giza" >seller/config
fi
build/install/r3prototyping/bin/r3prototyping --dir=seller --fake-trade-with=localhost --network-address=localhost:31338 --timestamper-identity-file=buyer/identity-public --timestamper-address=localhost
build/install/r3prototyping/bin/r3prototyping --dir=seller --fake-trade-with=localhost --network-address=localhost:31340 --timestamper-identity-file=buyer/identity-public --timestamper-address=localhost
else
echo "Run like this, one in each tab:"
echo

View File

@ -0,0 +1,44 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package contracts.protocols
import core.Attachment
import core.crypto.SecureHash
import core.crypto.sha256
import core.messaging.SingleMessageRecipient
import java.io.ByteArrayInputStream
import java.io.InputStream
/**
* Given a set of hashes either loads from from local storage or requests them from the other peer. Downloaded
* attachments are saved to local storage automatically.
*/
class FetchAttachmentsProtocol(requests: Set<SecureHash>,
otherSide: SingleMessageRecipient) : FetchDataProtocol<Attachment, ByteArray>(requests, otherSide) {
companion object {
const val TOPIC = "platform.fetch.attachment"
}
override fun load(txid: SecureHash): Attachment? = serviceHub.storageService.attachments.openAttachment(txid)
override val queryTopic: String = TOPIC
override fun convert(wire: ByteArray): Attachment {
return object : Attachment {
override fun open(): InputStream = ByteArrayInputStream(wire)
override val id: SecureHash = wire.sha256()
}
}
override fun maybeWriteToDisk(downloaded: List<Attachment>) {
for (attachment in downloaded) {
serviceHub.storageService.attachments.importAttachment(attachment.open())
}
}
}

View File

@ -0,0 +1,109 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package contracts.protocols
import co.paralleluniverse.fibers.Suspendable
import core.NamedByHash
import core.crypto.SecureHash
import core.messaging.SingleMessageRecipient
import core.node.DataVendingService
import core.protocols.ProtocolLogic
import core.random63BitValue
import core.utilities.UntrustworthyData
import java.util.*
/**
* An abstract protocol for fetching typed data from a remote peer.
*
* Given a set of hashes (IDs), either loads them from local disk or asks the remote peer to provide them.
*
* A malicious response in which the data provided by the remote peer does not hash to the requested hash results in
* [DownloadedVsRequestedDataMismatch] being thrown. If the remote peer doesn't have an entry, it results in a
* [HashNotFound] exception being thrown.
*
* By default this class does not insert data into any local database, if you want to do that after missing items were
* fetched then override [maybeWriteToDisk]. You *must* override [load] and [queryTopic]. If the wire type is not the
* same as the ultimate type, you must also override [convert].
*
* @param T The ultimate type of the data being fetched.
* @param W The wire type of the data being fetched, for when it isn't the same as the ultimate type.
*/
abstract class FetchDataProtocol<T : NamedByHash, W : Any>(
protected val requests: Set<SecureHash>,
protected val otherSide: SingleMessageRecipient) : ProtocolLogic<FetchDataProtocol.Result<T>>() {
open class BadAnswer : Exception()
class HashNotFound(val requested: SecureHash) : BadAnswer()
class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : BadAnswer()
data class Result<T : NamedByHash>(val fromDisk: List<T>, val downloaded: List<T>)
protected abstract val queryTopic: String
@Suspendable
override fun call(): Result<T> {
// Load the items we have from disk and figure out which we're missing.
val (fromDisk, toFetch) = loadWhatWeHave()
return if (toFetch.isEmpty()) {
Result(fromDisk, emptyList())
} else {
logger.trace("Requesting ${toFetch.size} dependency(s) for verification")
val sid = random63BitValue()
val fetchReq = DataVendingService.Request(toFetch, serviceHub.networkService.myAddress, sid)
// TODO: Support "large message" response streaming so response sizes are not limited by RAM.
val maybeItems = sendAndReceive<ArrayList<W?>>(queryTopic, otherSide, 0, sid, fetchReq)
// Check for a buggy/malicious peer answering with something that we didn't ask for.
val downloaded = validateFetchResponse(maybeItems, toFetch)
maybeWriteToDisk(downloaded)
Result(fromDisk, downloaded)
}
}
protected open fun maybeWriteToDisk(downloaded: List<T>) {
// Do nothing by default.
}
private fun loadWhatWeHave(): Pair<List<T>, List<SecureHash>> {
val fromDisk = ArrayList<T>()
val toFetch = ArrayList<SecureHash>()
for (txid in requests) {
val stx = load(txid)
if (stx == null)
toFetch += txid
else
fromDisk += stx
}
return Pair(fromDisk, toFetch)
}
protected abstract fun load(txid: SecureHash): T?
@Suppress("UNCHECKED_CAST")
protected open fun convert(wire: W): T = wire as T
private fun validateFetchResponse(maybeItems: UntrustworthyData<ArrayList<W?>>,
requests: List<SecureHash>): List<T> =
maybeItems.validate { response ->
if (response.size != requests.size)
throw BadAnswer()
for ((index, resp) in response.withIndex()) {
if (resp == null) throw HashNotFound(requests[index])
}
val answers = response.requireNoNulls().map { convert(it) }
// Check transactions actually hash to what we requested, if this fails the remote node
// is a malicious protocol violator or buggy.
for ((index, item) in answers.withIndex())
if (item.id != requests[index])
throw DownloadedVsRequestedDataMismatch(requests[index], item.id)
answers
}
}

View File

@ -8,82 +8,24 @@
package contracts.protocols
import co.paralleluniverse.fibers.Suspendable
import core.SignedTransaction
import core.crypto.SecureHash
import core.protocols.ProtocolLogic
import core.messaging.SingleMessageRecipient
import core.utilities.UntrustworthyData
import core.node.DataVendingService
import core.random63BitValue
import java.util.*
/**
* Given a set of tx hashes (IDs), either loads them from local disk or asks the remote peer to provide them.
*
* A malicious response in which the data provided by the remote peer does not hash to the requested hash results in
* [DownloadedVsRequestedDataMismatch] being thrown. If the remote peer doesn't have an entry, it results in a
* HashNotFound. Note that returned transactions are not inserted into the database, because it's up to the caller
* to actually verify the transactions are valid.
* [FetchDataProtocol.DownloadedVsRequestedDataMismatch] being thrown. If the remote peer doesn't have an entry, it
* results in a [FetchDataProtocol.HashNotFound] exception. Note that returned transactions are not inserted into
* the database, because it's up to the caller to actually verify the transactions are valid.
*/
class FetchTransactionsProtocol(private val requests: Set<SecureHash>,
private val otherSide: SingleMessageRecipient) : ProtocolLogic<FetchTransactionsProtocol.Result>() {
data class Result(val fromDisk: List<SignedTransaction>, val downloaded: List<SignedTransaction>)
@Suspendable
override fun call(): Result {
val sid = random63BitValue()
// Load the transactions we have from disk and figure out which we're missing.
val (fromDisk, toFetch) = loadWhatWeHave()
return if (toFetch.isEmpty()) {
Result(fromDisk, emptyList())
} else {
logger.trace("Requesting ${toFetch.size} dependency(s) for verification")
val fetchReq = DataVendingService.Request(toFetch, serviceHub.networkService.myAddress, sid)
val maybeTxns = sendAndReceive<ArrayList<SignedTransaction?>>("platform.fetch.tx", otherSide, 0, sid, fetchReq)
// Check for a buggy/malicious peer answering with something that we didn't ask for.
// Note that we strip the UntrustworthyData marker here, but of course the returned transactions may be
// invalid in other ways! Perhaps we should keep it.
Result(fromDisk, validateTXFetchResponse(maybeTxns, toFetch))
}
class FetchTransactionsProtocol(requests: Set<SecureHash>, otherSide: SingleMessageRecipient) :
FetchDataProtocol<SignedTransaction, SignedTransaction>(requests, otherSide) {
companion object {
const val TOPIC = "platform.fetch.tx"
}
private fun loadWhatWeHave(): Pair<List<SignedTransaction>, List<SecureHash>> {
val fromDisk = ArrayList<SignedTransaction>()
val toFetch = ArrayList<SecureHash>()
for (txid in requests) {
val stx = serviceHub.storageService.validatedTransactions[txid]
if (stx == null)
toFetch += txid
else
fromDisk += stx
}
return Pair(fromDisk, toFetch)
}
private fun validateTXFetchResponse(maybeTxns: UntrustworthyData<ArrayList<SignedTransaction?>>,
requests: List<SecureHash>): List<SignedTransaction> =
maybeTxns.validate { response ->
if (response.size != requests.size)
throw BadAnswer()
for ((index, resp) in response.withIndex()) {
if (resp == null) throw HashNotFound(requests[index])
}
val answers = response.requireNoNulls()
// Check transactions actually hash to what we requested, if this fails the remote node
// is a malicious protocol violator or buggy.
for ((index, stx) in answers.withIndex())
if (stx.id != requests[index])
throw DownloadedVsRequestedDataMismatch(requests[index], stx.id)
answers
}
open class BadAnswer : Exception()
class HashNotFound(val requested: SecureHash) : BadAnswer()
class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : BadAnswer()
}
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.storageService.validatedTransactions[txid]
override val queryTopic: String = TOPIC
}

View File

@ -8,6 +8,8 @@
package core.node
import contracts.protocols.FetchAttachmentsProtocol
import contracts.protocols.FetchTransactionsProtocol
import core.StorageService
import core.crypto.SecureHash
import core.messaging.Message
@ -16,6 +18,7 @@ import core.messaging.SingleMessageRecipient
import core.messaging.send
import core.serialization.deserialize
import core.utilities.loggerFor
import java.io.InputStream
import javax.annotation.concurrent.ThreadSafe
/**
@ -33,15 +36,12 @@ import javax.annotation.concurrent.ThreadSafe
@ThreadSafe
class DataVendingService(private val net: MessagingService, private val storage: StorageService) {
companion object {
val TX_FETCH_TOPIC = "platform.fetch.tx"
val CONTRACT_FETCH_TOPIC = "platform.fetch.contract"
val logger = loggerFor<DataVendingService>()
}
init {
net.addMessageHandler("$TX_FETCH_TOPIC.0") { msg, registration -> handleTXRequest(msg) }
net.addMessageHandler("$CONTRACT_FETCH_TOPIC.0") { msg, registration -> handleContractRequest(msg) }
net.addMessageHandler("${FetchTransactionsProtocol.TOPIC}.0") { msg, registration -> handleTXRequest(msg) }
net.addMessageHandler("${FetchAttachmentsProtocol.TOPIC}.0") { msg, registration -> handleAttachmentRequest(msg) }
}
// TODO: Give all messages a respond-to address+session ID automatically.
@ -56,10 +56,22 @@ class DataVendingService(private val net: MessagingService, private val storage:
logger.info("Got request for unknown tx $it")
tx
}
net.send("$TX_FETCH_TOPIC.${req.sessionID}", req.responseTo, answers)
net.send("${FetchTransactionsProtocol.TOPIC}.${req.sessionID}", req.responseTo, answers)
}
private fun handleContractRequest(msg: Message) {
TODO("PLT-12: Basic module/sandbox system for contracts: $msg")
private fun handleAttachmentRequest(msg: Message) {
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
val req = msg.data.deserialize<Request>()
require(req.hashes.isNotEmpty())
val answers: List<ByteArray?> = req.hashes.map {
val jar: InputStream? = storage.attachments.openAttachment(it)?.open()
if (jar == null) {
logger.info("Got request for unknown attachment $it")
null
} else {
jar.readBytes()
}
}
net.send("${FetchAttachmentsProtocol.TOPIC}.${req.sessionID}", req.responseTo, answers)
}
}

View File

@ -14,9 +14,12 @@ import core.*
import core.crypto.SecureHash
import core.crypto.generateKeyPair
import core.messaging.*
import core.node.servlets.AttachmentUploadServlet
import core.serialization.deserialize
import core.serialization.serialize
import core.utilities.loggerFor
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.nio.channels.FileLock
@ -47,12 +50,12 @@ class NodeConfiguration(private val properties: Properties) {
* loads important data off disk and starts listening for connections.
*
* @param dir A [Path] to a location on disk where working files can be found or stored.
* @param myNetAddr The host and port that this server will use. It can't find out its own external hostname, so you
* @param p2pPort The host and port that this server will use. It can't find out its own external hostname, so you
* have to specify that yourself.
* @param configuration This is typically loaded from a .properties file
* @param timestamperAddress If null, this node will become a timestamping node, otherwise, it will use that one.
*/
class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeConfiguration,
class Node(val dir: Path, val p2pPort: HostAndPort, val configuration: NodeConfiguration,
timestamperAddress: LegallyIdentifiableNode?) {
private val log = loggerFor<Node>()
@ -92,6 +95,7 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
val keyManagement: E2ETestKeyManagementService
val inNodeTimestampingService: TimestamperNodeService?
val identity: IdentityService
val webServer: Server
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
// when our process shuts down, but we try in stop() anyway just to be nice.
@ -102,7 +106,7 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
storage = makeStorageService(dir)
smm = StateMachineManager(services, serverThread)
net = ArtemisMessagingService(dir, myNetAddr)
net = ArtemisMessagingService(dir, p2pPort)
wallet = E2ETestWalletService(services)
keyManagement = E2ETestKeyManagementService()
@ -129,16 +133,34 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
// service and so that keeps it from being collected.
DataVendingService(net, storage)
// Start up the MQ service.
net.start()
webServer = initWebServer()
}
private fun initWebServer(): Server {
val port = p2pPort.port + 1 // TODO: Move this into the node config file.
val server = Server(port)
val handler = ServletContextHandler()
handler.setAttribute("storage", storage)
handler.addServlet(AttachmentUploadServlet::class.java, "/attachments/upload")
server.handler = handler
server.start()
return server
}
fun stop() {
webServer.stop()
net.stop()
serverThread.shutdownNow()
nodeFileLock!!.release()
}
fun makeStorageService(dir: Path): StorageService {
val attachmentsDir = dir.resolve("attachments")
try { Files.createDirectory(attachmentsDir) } catch (e: java.nio.file.FileAlreadyExistsException) {}
// Load the private identity key, creating it if necessary. The identity key is a long term well known key that
// is distributed to other peers and we use it (or a key signed by it) when we need to do something
// "permissioned". The identity file is what gets distributed and contains the node's legal name along with
@ -185,7 +207,7 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
override val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
get() = getMap("validated-transactions")
override val attachments: AttachmentStorage = NodeAttachmentStorage(dir.resolve("attachments"))
override val attachments: AttachmentStorage = NodeAttachmentStorage(attachmentsDir)
override val contractPrograms = contractFactory
override val myLegalIdentity = identity
override val myLegalIdentityKey = keypair

View File

@ -0,0 +1,51 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.node.servlets
import core.StorageService
import core.utilities.loggerFor
import org.apache.commons.fileupload.servlet.ServletFileUpload
import javax.servlet.http.HttpServlet
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
class AttachmentUploadServlet : HttpServlet() {
private val log = loggerFor<AttachmentUploadServlet>()
override fun doPost(req: HttpServletRequest, resp: HttpServletResponse) {
@Suppress("DEPRECATION") // Bogus warning due to superclass static method being deprecated.
val isMultipart = ServletFileUpload.isMultipartContent(req)
if (!isMultipart) {
log.error("Got a non-file upload request to the attachments servlet")
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "This end point is for file uploads only.")
return
}
val upload = ServletFileUpload()
val iterator = upload.getItemIterator(req)
while (iterator.hasNext()) {
val item = iterator.next()
if (!item.name.endsWith(".jar")) {
log.error("Attempted upload of a non-JAR attachment: mime=${item.contentType} filename=${item.name}")
resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
"${item.name}: Must be have a MIME type of application/java-archive and a filename ending in .jar")
return
}
log.info("Receiving ${item.name}")
val storage = servletContext.getAttribute("storage") as StorageService
item.openStream().use {
val id = storage.attachments.importAttachment(it)
log.info("${item.name} successfully inserted into the attachment store with id $id")
}
}
}
}