Merged in PLT-52-attachments (pull request #24)

Attachments support: part one
This commit is contained in:
Mike Hearn 2016-02-29 22:03:49 +01:00
commit a7e407bd74
24 changed files with 1066 additions and 288 deletions

View File

@ -15,6 +15,7 @@ buildscript {
ext.quasar_version = '0.7.4'
ext.asm_version = '0.5.3'
ext.artemis_version = '1.2.0'
ext.jetty_version = '9.3.7.v20160115'
repositories {
mavenCentral()
@ -48,8 +49,6 @@ configurations.all() {
}
dependencies {
testCompile 'junit:junit:4.12'
compile project(':contracts')
compile "com.google.code.findbugs:jsr305:3.0.1"
@ -96,6 +95,15 @@ dependencies {
compile("commons-logging:commons-logging:1.2") {
force = true
}
// Web stuff: for HTTP[S] servlets
compile "org.eclipse.jetty:jetty-servlet:${jetty_version}"
compile "javax.servlet:javax.servlet-api:3.1.0"
compile "commons-fileupload:commons-fileupload:1.3.1"
// Unit testing helpers.
testCompile 'junit:junit:4.12'
testCompile 'com.google.jimfs:jimfs:1.1' // in memory java.nio filesystem.
}
// These lines tell Gradle to add a couple of JVM command line arguments to unit test and program runs, which set up

View File

@ -66,7 +66,7 @@ data class WireTransaction(val inputs: List<StateRef>,
companion object {
fun deserialize(bits: SerializedBytes<WireTransaction>): WireTransaction {
val wtx = bits.deserialize()
val wtx = bits.bits.deserialize<WireTransaction>()
wtx.cachedBits = bits
return wtx
}

View File

@ -12,6 +12,9 @@ import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import org.slf4j.Logger
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
import java.security.SecureRandom
import java.time.Duration
import java.time.temporal.Temporal
@ -45,6 +48,7 @@ infix fun <T> ListenableFuture<T>.then(body: () -> Unit): ListenableFuture<T> =
infix fun <T> ListenableFuture<T>.success(body: (T) -> Unit): ListenableFuture<T> = apply { success(RunOnCallerThread, body) }
infix fun <T> ListenableFuture<T>.failure(body: (Throwable) -> Unit): ListenableFuture<T> = apply { failure(RunOnCallerThread, body) }
fun <R> Path.use(block: (InputStream) -> R): R = Files.newInputStream(this).use(block)
/** Executes the given block and sets the future to either the result, or any exception that was thrown. */
fun <T> SettableFuture<T>.setFrom(logger: Logger? = null, block: () -> T): SettableFuture<T> {

View File

@ -17,6 +17,7 @@ import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.JavaSerializer
import core.SignedTransaction
import core.WireTransaction
import core.crypto.SecureHash
import core.crypto.generateKeyPair
import core.crypto.sha256
@ -82,6 +83,9 @@ inline fun <reified T : Any> ByteArray.deserialize(kryo: Kryo = THREAD_LOCAL_KRY
inline fun <reified T : Any> OpaqueBytes.deserialize(kryo: Kryo = THREAD_LOCAL_KRYO.get(), includeClassName: Boolean = false): T {
return this.bits.deserialize(kryo, includeClassName)
}
// The more specific deserialize version results in the bytes being cached, which is faster.
@JvmName("SerializedBytesWireTransaction")
fun SerializedBytes<WireTransaction>.deserialize(): WireTransaction = WireTransaction.deserialize(this)
inline fun <reified T : Any> SerializedBytes<T>.deserialize(): T = bits.deserialize()
/**

View File

@ -24,7 +24,7 @@ elif [[ "$mode" == "seller" ]]; then
echo "myLegalName = Bank of Giza" >seller/config
fi
build/install/r3prototyping/bin/r3prototyping --dir=seller --fake-trade-with=localhost --network-address=localhost:31338 --timestamper-identity-file=buyer/identity-public --timestamper-address=localhost
build/install/r3prototyping/bin/r3prototyping --dir=seller --fake-trade-with=localhost --network-address=localhost:31340 --timestamper-identity-file=buyer/identity-public --timestamper-address=localhost
else
echo "Run like this, one in each tab:"
echo

View File

@ -0,0 +1,46 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package contracts.protocols
import core.Attachment
import core.crypto.SecureHash
import core.crypto.sha256
import core.messaging.SingleMessageRecipient
import java.io.ByteArrayInputStream
import java.io.InputStream
/**
* Given a set of hashes either loads from from local storage or requests them from the other peer. Downloaded
* attachments are saved to local storage automatically.
*/
class FetchAttachmentsProtocol(requests: Set<SecureHash>,
otherSide: SingleMessageRecipient) : FetchDataProtocol<Attachment, ByteArray>(requests, otherSide) {
companion object {
const val TOPIC = "platform.fetch.attachment"
}
override fun load(txid: SecureHash): Attachment? = serviceHub.storageService.attachments.openAttachment(txid)
override val queryTopic: String = TOPIC
override fun convert(wire: ByteArray): Attachment {
return object : Attachment {
override fun open(): InputStream = ByteArrayInputStream(wire)
override val id: SecureHash = wire.sha256()
override fun equals(other: Any?) = (other is Attachment) && other.id == id
override fun hashCode(): Int = id.hashCode()
}
}
override fun maybeWriteToDisk(downloaded: List<Attachment>) {
for (attachment in downloaded) {
serviceHub.storageService.attachments.importAttachment(attachment.open())
}
}
}

View File

@ -0,0 +1,109 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package contracts.protocols
import co.paralleluniverse.fibers.Suspendable
import core.NamedByHash
import core.crypto.SecureHash
import core.messaging.SingleMessageRecipient
import core.node.DataVendingService
import core.protocols.ProtocolLogic
import core.random63BitValue
import core.utilities.UntrustworthyData
import java.util.*
/**
* An abstract protocol for fetching typed data from a remote peer.
*
* Given a set of hashes (IDs), either loads them from local disk or asks the remote peer to provide them.
*
* A malicious response in which the data provided by the remote peer does not hash to the requested hash results in
* [DownloadedVsRequestedDataMismatch] being thrown. If the remote peer doesn't have an entry, it results in a
* [HashNotFound] exception being thrown.
*
* By default this class does not insert data into any local database, if you want to do that after missing items were
* fetched then override [maybeWriteToDisk]. You *must* override [load] and [queryTopic]. If the wire type is not the
* same as the ultimate type, you must also override [convert].
*
* @param T The ultimate type of the data being fetched.
* @param W The wire type of the data being fetched, for when it isn't the same as the ultimate type.
*/
abstract class FetchDataProtocol<T : NamedByHash, W : Any>(
protected val requests: Set<SecureHash>,
protected val otherSide: SingleMessageRecipient) : ProtocolLogic<FetchDataProtocol.Result<T>>() {
open class BadAnswer : Exception()
class HashNotFound(val requested: SecureHash) : BadAnswer()
class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : BadAnswer()
data class Result<T : NamedByHash>(val fromDisk: List<T>, val downloaded: List<T>)
protected abstract val queryTopic: String
@Suspendable
override fun call(): Result<T> {
// Load the items we have from disk and figure out which we're missing.
val (fromDisk, toFetch) = loadWhatWeHave()
return if (toFetch.isEmpty()) {
Result(fromDisk, emptyList())
} else {
logger.trace("Requesting ${toFetch.size} dependency(s) for verification")
val sid = random63BitValue()
val fetchReq = DataVendingService.Request(toFetch, serviceHub.networkService.myAddress, sid)
// TODO: Support "large message" response streaming so response sizes are not limited by RAM.
val maybeItems = sendAndReceive<ArrayList<W?>>(queryTopic, otherSide, 0, sid, fetchReq)
// Check for a buggy/malicious peer answering with something that we didn't ask for.
val downloaded = validateFetchResponse(maybeItems, toFetch)
maybeWriteToDisk(downloaded)
Result(fromDisk, downloaded)
}
}
protected open fun maybeWriteToDisk(downloaded: List<T>) {
// Do nothing by default.
}
private fun loadWhatWeHave(): Pair<List<T>, List<SecureHash>> {
val fromDisk = ArrayList<T>()
val toFetch = ArrayList<SecureHash>()
for (txid in requests) {
val stx = load(txid)
if (stx == null)
toFetch += txid
else
fromDisk += stx
}
return Pair(fromDisk, toFetch)
}
protected abstract fun load(txid: SecureHash): T?
@Suppress("UNCHECKED_CAST")
protected open fun convert(wire: W): T = wire as T
private fun validateFetchResponse(maybeItems: UntrustworthyData<ArrayList<W?>>,
requests: List<SecureHash>): List<T> =
maybeItems.validate { response ->
if (response.size != requests.size)
throw BadAnswer()
for ((index, resp) in response.withIndex()) {
if (resp == null) throw HashNotFound(requests[index])
}
val answers = response.requireNoNulls().map { convert(it) }
// Check transactions actually hash to what we requested, if this fails the remote node
// is a malicious protocol violator or buggy.
for ((index, item) in answers.withIndex())
if (item.id != requests[index])
throw DownloadedVsRequestedDataMismatch(requests[index], item.id)
answers
}
}

View File

@ -8,82 +8,24 @@
package contracts.protocols
import co.paralleluniverse.fibers.Suspendable
import core.SignedTransaction
import core.crypto.SecureHash
import core.protocols.ProtocolLogic
import core.messaging.SingleMessageRecipient
import core.utilities.UntrustworthyData
import core.node.DataVendingService
import core.random63BitValue
import java.util.*
/**
* Given a set of tx hashes (IDs), either loads them from local disk or asks the remote peer to provide them.
*
* A malicious response in which the data provided by the remote peer does not hash to the requested hash results in
* [DownloadedVsRequestedDataMismatch] being thrown. If the remote peer doesn't have an entry, it results in a
* HashNotFound. Note that returned transactions are not inserted into the database, because it's up to the caller
* to actually verify the transactions are valid.
* [FetchDataProtocol.DownloadedVsRequestedDataMismatch] being thrown. If the remote peer doesn't have an entry, it
* results in a [FetchDataProtocol.HashNotFound] exception. Note that returned transactions are not inserted into
* the database, because it's up to the caller to actually verify the transactions are valid.
*/
class FetchTransactionsProtocol(private val requests: Set<SecureHash>,
private val otherSide: SingleMessageRecipient) : ProtocolLogic<FetchTransactionsProtocol.Result>() {
data class Result(val fromDisk: List<SignedTransaction>, val downloaded: List<SignedTransaction>)
@Suspendable
override fun call(): Result {
val sid = random63BitValue()
// Load the transactions we have from disk and figure out which we're missing.
val (fromDisk, toFetch) = loadWhatWeHave()
return if (toFetch.isEmpty()) {
Result(fromDisk, emptyList())
} else {
logger.trace("Requesting ${toFetch.size} dependency(s) for verification")
val fetchReq = DataVendingService.Request(toFetch, serviceHub.networkService.myAddress, sid)
val maybeTxns = sendAndReceive<ArrayList<SignedTransaction?>>("platform.fetch.tx", otherSide, 0, sid, fetchReq)
// Check for a buggy/malicious peer answering with something that we didn't ask for.
// Note that we strip the UntrustworthyData marker here, but of course the returned transactions may be
// invalid in other ways! Perhaps we should keep it.
Result(fromDisk, validateTXFetchResponse(maybeTxns, toFetch))
}
class FetchTransactionsProtocol(requests: Set<SecureHash>, otherSide: SingleMessageRecipient) :
FetchDataProtocol<SignedTransaction, SignedTransaction>(requests, otherSide) {
companion object {
const val TOPIC = "platform.fetch.tx"
}
private fun loadWhatWeHave(): Pair<List<SignedTransaction>, List<SecureHash>> {
val fromDisk = ArrayList<SignedTransaction>()
val toFetch = ArrayList<SecureHash>()
for (txid in requests) {
val stx = serviceHub.storageService.validatedTransactions[txid]
if (stx == null)
toFetch += txid
else
fromDisk += stx
}
return Pair(fromDisk, toFetch)
}
private fun validateTXFetchResponse(maybeTxns: UntrustworthyData<ArrayList<SignedTransaction?>>,
requests: List<SecureHash>): List<SignedTransaction> =
maybeTxns.validate { response ->
if (response.size != requests.size)
throw BadAnswer()
for ((index, resp) in response.withIndex()) {
if (resp == null) throw HashNotFound(requests[index])
}
val answers = response.requireNoNulls()
// Check transactions actually hash to what we requested, if this fails the remote node
// is a malicious protocol violator or buggy.
for ((index, stx) in answers.withIndex())
if (stx.id != requests[index])
throw DownloadedVsRequestedDataMismatch(requests[index], stx.id)
answers
}
open class BadAnswer : Exception()
class HashNotFound(val requested: SecureHash) : BadAnswer()
class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : BadAnswer()
}
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.storageService.validatedTransactions[txid]
override val queryTopic: String = TOPIC
}

View File

@ -12,6 +12,7 @@ import core.crypto.SecureHash
import core.crypto.generateKeyPair
import core.messaging.MessagingService
import core.messaging.NetworkMap
import java.io.InputStream
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
@ -76,6 +77,7 @@ object DummyTimestampingAuthority {
* anything like that, this interface is only big enough to support the prototyping work.
*/
interface StorageService {
/** TODO: Temp scaffolding that will go away eventually. */
fun <K,V> getMap(tableName: String): MutableMap<K, V>
/**
@ -85,6 +87,9 @@ interface StorageService {
*/
val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
/** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */
val attachments: AttachmentStorage
/**
* A map of program hash->contract class type, used for verification.
*/
@ -98,6 +103,33 @@ interface StorageService {
val myLegalIdentityKey: KeyPair
}
/**
* An attachment store records potentially large binary objects, identified by their hash. Note that attachments are
* immutable and can never be erased once inserted!
*/
interface AttachmentStorage {
/**
* Returns a newly opened stream for the given locally stored attachment, or null if no such attachment is known.
* The returned stream must be closed when you are done with it to avoid resource leaks. You should probably wrap
* the result in a [JarInputStream] unless you're sending it somewhere, there is a convenience helper for this
* on [Attachment].
*/
fun openAttachment(id: SecureHash): Attachment?
/**
* Inserts the given attachment into the store, does *not* close the input stream. This can be an intensive
* operation due to the need to copy the bytes to disk and hash them along the way.
*
* Note that you should not pass a [JarInputStream] into this method and it will throw if you do, because access
* to the raw byte stream is required.
*
* @throws FileAlreadyExistsException if the given byte stream has already been inserted.
* @throws IllegalArgumentException if the given byte stream is empty or a [JarInputStream]
* @throws IOException if something went wrong.
*/
fun importAttachment(jar: InputStream): SecureHash
}
/**
* A service hub simply vends references to the other services a node has. Some of those services may be missing or
* mocked out. This class is useful to pass to chunks of pluggable code that might have need of many different kinds of

View File

@ -0,0 +1,192 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.node
import contracts.*
import core.*
import core.crypto.SecureHash
import core.crypto.generateKeyPair
import core.messaging.*
import core.serialization.deserialize
import core.serialization.serialize
import org.slf4j.Logger
import java.nio.file.FileAlreadyExistsException
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyPair
import java.util.*
import java.util.concurrent.Executors
/**
* A base node implementation that can be customised either for production (with real implementations that do real
* I/O), or a mock implementation suitable for unit test environments.
*/
abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val timestamperAddress: LegallyIdentifiableNode?) {
companion object {
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
}
protected abstract val log: Logger
// We will run as much stuff in this thread as possible to keep the risk of thread safety bugs low during the
// low-performance prototyping period.
protected open val serverThread = Executors.newSingleThreadExecutor()
val services = object : ServiceHub {
override val networkService: MessagingService get() = net
override val networkMapService: NetworkMap = MockNetworkMap()
override val storageService: StorageService get() = storage
override val walletService: WalletService get() = wallet
override val keyManagementService: KeyManagementService get() = keyManagement
override val identityService: IdentityService get() = identity
}
val legallyIdentifableAddress: LegallyIdentifiableNode get() = LegallyIdentifiableNode(net.myAddress, storage.myLegalIdentity)
// TODO: This will be obsoleted by "PLT-12: Basic module/sandbox system for contracts"
protected val contractFactory = object : ContractFactory {
private val contracts = mapOf(
CASH_PROGRAM_ID to Cash::class.java,
CP_PROGRAM_ID to CommercialPaper::class.java,
CROWDFUND_PROGRAM_ID to CrowdFund::class.java,
DUMMY_PROGRAM_ID to DummyContract::class.java
)
override fun <T : Contract> get(hash: SecureHash): T {
val c = contracts[hash] ?: throw UnknownContractException()
@Suppress("UNCHECKED_CAST")
return c.newInstance() as T
}
}
lateinit var storage: StorageService
lateinit var smm: StateMachineManager
lateinit var wallet: WalletService
lateinit var keyManagement: E2ETestKeyManagementService
var inNodeTimestampingService: TimestamperNodeService? = null
lateinit var identity: IdentityService
lateinit var net: MessagingService
open fun start(): AbstractNode {
storage = makeStorageService(dir)
net = makeMessagingService()
smm = StateMachineManager(services, serverThread)
wallet = E2ETestWalletService(services)
keyManagement = E2ETestKeyManagementService()
// Insert a network map entry for the timestamper: this is all temp scaffolding and will go away. If we are
// given the details, the timestamping node is somewhere else. Otherwise, we do our own timestamping.
val tsid = if (timestamperAddress != null) {
inNodeTimestampingService = null
timestamperAddress
} else {
inNodeTimestampingService = TimestamperNodeService(net, storage.myLegalIdentity, storage.myLegalIdentityKey)
LegallyIdentifiableNode(net.myAddress, storage.myLegalIdentity)
}
(services.networkMapService as MockNetworkMap).timestampingNodes.add(tsid)
// We don't have any identity infrastructure right now, so we just throw together the only two identities we
// know about: our own, and the identity of the remote timestamper node (if any).
val knownIdentities = if (timestamperAddress != null)
listOf(storage.myLegalIdentity, timestamperAddress.identity)
else
listOf(storage.myLegalIdentity)
identity = FixedIdentityService(knownIdentities)
// This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected.
DataVendingService(net, storage)
return this
}
open fun stop() {
net.stop()
serverThread.shutdownNow()
}
protected abstract fun makeMessagingService(): MessagingService
protected fun makeStorageService(dir: Path): StorageService {
val attachments = makeAttachmentStorage(dir)
// Load the private identity key, creating it if necessary. The identity key is a long term well known key that
// is distributed to other peers and we use it (or a key signed by it) when we need to do something
// "permissioned". The identity file is what gets distributed and contains the node's legal name along with
// the public key. Obviously in a real system this would need to be a certificate chain of some kind to ensure
// the legal name is actually validated in some way.
val privKeyFile = dir.resolve(PRIVATE_KEY_FILE_NAME)
val pubIdentityFile = dir.resolve(PUBLIC_IDENTITY_FILE_NAME)
val (identity, keypair) = if (!Files.exists(privKeyFile)) {
log.info("Identity key not found, generating fresh key!")
val keypair: KeyPair = generateKeyPair()
keypair.serialize().writeToFile(privKeyFile)
val myIdentity = Party(configuration.myLegalName, keypair.public)
// We include the Party class with the file here to help catch mixups when admins provide files of the
// wrong type by mistake.
myIdentity.serialize(includeClassName = true).writeToFile(pubIdentityFile)
Pair(myIdentity, keypair)
} else {
// Check that the identity in the config file matches the identity file we have stored to disk.
// This is just a sanity check. It shouldn't fail unless the admin has fiddled with the files and messed
// things up for us.
val myIdentity = Files.readAllBytes(pubIdentityFile).deserialize<Party>(includeClassName = true)
if (myIdentity.name != configuration.myLegalName)
throw ConfigurationException("The legal name in the config file doesn't match the stored identity file:" +
"${configuration.myLegalName} vs ${myIdentity.name}")
// Load the private key.
val keypair = Files.readAllBytes(privKeyFile).deserialize<KeyPair>()
Pair(myIdentity, keypair)
}
log.info("Node owned by ${identity.name} starting up ...")
val ss = object : StorageService {
private val tables = HashMap<String, MutableMap<Any, Any>>()
@Suppress("UNCHECKED_CAST")
override fun <K, V> getMap(tableName: String): MutableMap<K, V> {
// TODO: This should become a database.
synchronized(tables) {
return tables.getOrPut(tableName) { Collections.synchronizedMap(HashMap<Any, Any>()) } as MutableMap<K, V>
}
}
override val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
get() = getMap("validated-transactions")
override val attachments: AttachmentStorage = attachments
override val contractPrograms = contractFactory
override val myLegalIdentity = identity
override val myLegalIdentityKey = keypair
}
return ss
}
private fun makeAttachmentStorage(dir: Path): NodeAttachmentStorage {
val attachmentsDir = dir.resolve("attachments")
try {
Files.createDirectory(attachmentsDir)
} catch (e: FileAlreadyExistsException) {
}
val attachments = NodeAttachmentStorage(attachmentsDir)
return attachments
}
}

View File

@ -8,6 +8,8 @@
package core.node
import contracts.protocols.FetchAttachmentsProtocol
import contracts.protocols.FetchTransactionsProtocol
import core.StorageService
import core.crypto.SecureHash
import core.messaging.Message
@ -16,6 +18,7 @@ import core.messaging.SingleMessageRecipient
import core.messaging.send
import core.serialization.deserialize
import core.utilities.loggerFor
import java.io.InputStream
import javax.annotation.concurrent.ThreadSafe
/**
@ -33,15 +36,12 @@ import javax.annotation.concurrent.ThreadSafe
@ThreadSafe
class DataVendingService(private val net: MessagingService, private val storage: StorageService) {
companion object {
val TX_FETCH_TOPIC = "platform.fetch.tx"
val CONTRACT_FETCH_TOPIC = "platform.fetch.contract"
val logger = loggerFor<DataVendingService>()
}
init {
net.addMessageHandler("$TX_FETCH_TOPIC.0") { msg, registration -> handleTXRequest(msg) }
net.addMessageHandler("$CONTRACT_FETCH_TOPIC.0") { msg, registration -> handleContractRequest(msg) }
net.addMessageHandler("${FetchTransactionsProtocol.TOPIC}.0") { msg, registration -> handleTXRequest(msg) }
net.addMessageHandler("${FetchAttachmentsProtocol.TOPIC}.0") { msg, registration -> handleAttachmentRequest(msg) }
}
// TODO: Give all messages a respond-to address+session ID automatically.
@ -56,10 +56,22 @@ class DataVendingService(private val net: MessagingService, private val storage:
logger.info("Got request for unknown tx $it")
tx
}
net.send("$TX_FETCH_TOPIC.${req.sessionID}", req.responseTo, answers)
net.send("${FetchTransactionsProtocol.TOPIC}.${req.sessionID}", req.responseTo, answers)
}
private fun handleContractRequest(msg: Message) {
TODO("PLT-12: Basic module/sandbox system for contracts: $msg")
private fun handleAttachmentRequest(msg: Message) {
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
val req = msg.data.deserialize<Request>()
require(req.hashes.isNotEmpty())
val answers: List<ByteArray?> = req.hashes.map {
val jar: InputStream? = storage.attachments.openAttachment(it)?.open()
if (jar == null) {
logger.info("Got request for unknown attachment $it")
null
} else {
jar.readBytes()
}
}
net.send("${FetchAttachmentsProtocol.TOPIC}.${req.sessionID}", req.responseTo, answers)
}
}

View File

@ -9,190 +9,76 @@
package core.node
import com.google.common.net.HostAndPort
import contracts.*
import core.*
import core.crypto.SecureHash
import core.crypto.generateKeyPair
import core.messaging.*
import core.serialization.deserialize
import core.serialization.serialize
import core.messaging.LegallyIdentifiableNode
import core.messaging.MessagingService
import core.node.servlets.AttachmentUploadServlet
import core.utilities.loggerFor
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.nio.channels.FileLock
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.security.KeyPair
import java.util.*
import java.util.concurrent.Executors
class ConfigurationException(message: String) : Exception(message)
// TODO: Split this into a regression testing environment
/**
* A simple wrapper around a plain old Java .properties file. The keys have the same name as in the source code.
*
* TODO: Replace Java properties file with a better config file format (maybe yaml).
* We want to be able to configure via a GUI too, so an ability to round-trip whitespace, comments etc when machine
* editing the file is a must-have.
*/
class NodeConfiguration(private val properties: Properties) {
val myLegalName: String by properties
}
/**
* A Node manages a standalone server that takes part in the P2P network. It creates the services found in [ServiceHub],
* loads important data off disk and starts listening for connections.
*
* @param dir A [Path] to a location on disk where working files can be found or stored.
* @param myNetAddr The host and port that this server will use. It can't find out its own external hostname, so you
* have to specify that yourself.
* @param p2pAddr The host and port that this server will use. It can't find out its own external hostname, so you
* have to specify that yourself.
* @param configuration This is typically loaded from a .properties file
* @param timestamperAddress If null, this node will become a timestamping node, otherwise, it will use that one.
*/
class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeConfiguration,
timestamperAddress: LegallyIdentifiableNode?) {
private val log = loggerFor<Node>()
// We will run as much stuff in this thread as possible to keep the risk of thread safety bugs low during the
// low-performance prototyping period.
val serverThread = Executors.newSingleThreadExecutor()
val services = object : ServiceHub {
override val networkService: MessagingService get() = net
override val networkMapService: NetworkMap = MockNetworkMap()
override val storageService: StorageService get() = storage
override val walletService: WalletService get() = wallet
override val keyManagementService: KeyManagementService get() = keyManagement
override val identityService: IdentityService get() = identity
class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration,
timestamperAddress: LegallyIdentifiableNode?) : AbstractNode(dir, configuration, timestamperAddress) {
companion object {
/** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */
val DEFAULT_PORT = 31337
}
// TODO: This will be obsoleted by "PLT-12: Basic module/sandbox system for contracts"
private val contractFactory = object : ContractFactory {
private val contracts = mapOf(
CASH_PROGRAM_ID to Cash::class.java,
CP_PROGRAM_ID to CommercialPaper::class.java,
CROWDFUND_PROGRAM_ID to CrowdFund::class.java,
DUMMY_PROGRAM_ID to DummyContract::class.java
)
override val log = loggerFor<Node>()
override fun <T : Contract> get(hash: SecureHash): T {
val c = contracts[hash] ?: throw UnknownContractException()
@Suppress("UNCHECKED_CAST")
return c.newInstance() as T
}
}
val storage: StorageService
val smm: StateMachineManager
val net: ArtemisMessagingService
val wallet: WalletService
val keyManagement: E2ETestKeyManagementService
val inNodeTimestampingService: TimestamperNodeService?
val identity: IdentityService
lateinit var webServer: Server
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
// when our process shuts down, but we try in stop() anyway just to be nice.
private var nodeFileLock: FileLock? = null
init {
override fun makeMessagingService(): MessagingService = ArtemisMessagingService(dir, p2pAddr)
private fun initWebServer(): Server {
val port = p2pAddr.port + 1 // TODO: Move this into the node config file.
val server = Server(port)
val handler = ServletContextHandler()
handler.setAttribute("storage", storage)
handler.addServlet(AttachmentUploadServlet::class.java, "/attachments/upload")
server.handler = handler
server.start()
return server
}
override fun start(): Node {
alreadyRunningNodeCheck()
storage = makeStorageService(dir)
smm = StateMachineManager(services, serverThread)
net = ArtemisMessagingService(dir, myNetAddr)
wallet = E2ETestWalletService(services)
keyManagement = E2ETestKeyManagementService()
// Insert a network map entry for the timestamper: this is all temp scaffolding and will go away. If we are
// given the details, the timestamping node is somewhere else. Otherwise, we do our own timestamping.
val tsid = if (timestamperAddress != null) {
inNodeTimestampingService = null
timestamperAddress
} else {
inNodeTimestampingService = TimestamperNodeService(net, storage.myLegalIdentity, storage.myLegalIdentityKey)
LegallyIdentifiableNode(net.myAddress, storage.myLegalIdentity)
}
(services.networkMapService as MockNetworkMap).timestampingNodes.add(tsid)
// We don't have any identity infrastructure right now, so we just throw together the only two identities we
// know about: our own, and the identity of the remote timestamper node (if any).
val knownIdentities = if (timestamperAddress != null)
listOf(storage.myLegalIdentity, timestamperAddress.identity)
else
listOf(storage.myLegalIdentity)
identity = FixedIdentityService(knownIdentities)
// This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected.
DataVendingService(net, storage)
net.start()
super.start()
webServer = initWebServer()
// Start up the MQ service.
(net as ArtemisMessagingService).start()
return this
}
fun stop() {
net.stop()
serverThread.shutdownNow()
override fun stop() {
webServer.stop()
super.stop()
nodeFileLock!!.release()
}
fun makeStorageService(dir: Path): StorageService {
// Load the private identity key, creating it if necessary. The identity key is a long term well known key that
// is distributed to other peers and we use it (or a key signed by it) when we need to do something
// "permissioned". The identity file is what gets distributed and contains the node's legal name along with
// the public key. Obviously in a real system this would need to be a certificate chain of some kind to ensure
// the legal name is actually validated in some way.
val privKeyFile = dir.resolve(PRIVATE_KEY_FILE_NAME)
val pubIdentityFile = dir.resolve(PUBLIC_IDENTITY_FILE_NAME)
val (identity, keypair) = if (!Files.exists(privKeyFile)) {
log.info("Identity key not found, generating fresh key!")
val keypair: KeyPair = generateKeyPair()
keypair.serialize().writeToFile(privKeyFile)
val myIdentity = Party(configuration.myLegalName, keypair.public)
// We include the Party class with the file here to help catch mixups when admins provide files of the
// wrong type by mistake.
myIdentity.serialize(includeClassName = true).writeToFile(pubIdentityFile)
Pair(myIdentity, keypair)
} else {
// Check that the identity in the config file matches the identity file we have stored to disk.
// This is just a sanity check. It shouldn't fail unless the admin has fiddled with the files and messed
// things up for us.
val myIdentity = Files.readAllBytes(pubIdentityFile).deserialize<Party>(includeClassName = true)
if (myIdentity.name != configuration.myLegalName)
throw ConfigurationException("The legal name in the config file doesn't match the stored identity file:" +
"${configuration.myLegalName} vs ${myIdentity.name}")
// Load the private key.
val keypair = Files.readAllBytes(privKeyFile).deserialize<KeyPair>()
Pair(myIdentity, keypair)
}
log.info("Node owned by ${identity.name} starting up ...")
val ss = object : StorageService {
private val tables = HashMap<String, MutableMap<Any, Any>>()
@Suppress("UNCHECKED_CAST")
override fun <K, V> getMap(tableName: String): MutableMap<K, V> {
// TODO: This should become a database.
synchronized(tables) {
return tables.getOrPut(tableName) { Collections.synchronizedMap(HashMap<Any, Any>()) } as MutableMap<K, V>
}
}
override val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
get() = getMap("validated-transactions")
override val contractPrograms = contractFactory
override val myLegalIdentity = identity
override val myLegalIdentityKey = keypair
}
return ss
}
private fun alreadyRunningNodeCheck() {
// Write out our process ID (which may or may not resemble a UNIX process id - to us it's just a string) to a
// file that we'll do our best to delete on exit. But if we don't, it'll be overwritten next time. If it already
@ -216,12 +102,4 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
if (nodeFileLock == null)
nodeFileLock = RandomAccessFile(file, "rw").channel.lock()
}
companion object {
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
/** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */
val DEFAULT_PORT = 31337
}
}

View File

@ -0,0 +1,115 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.node
import com.google.common.annotations.VisibleForTesting
import com.google.common.hash.Hashing
import com.google.common.hash.HashingInputStream
import com.google.common.io.CountingInputStream
import core.Attachment
import core.AttachmentStorage
import core.crypto.SecureHash
import core.utilities.loggerFor
import java.io.FilterInputStream
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.util.*
import java.util.jar.JarInputStream
import javax.annotation.concurrent.ThreadSafe
/**
* Stores attachments in the specified local directory, which must exist. Doesn't allow new attachments to be uploaded.
*/
@ThreadSafe
class NodeAttachmentStorage(val storePath: Path) : AttachmentStorage {
private val log = loggerFor<NodeAttachmentStorage>()
@VisibleForTesting
var checkAttachmentsOnLoad = true
init {
require(Files.isDirectory(storePath)) { "$storePath must be a directory" }
}
class OnDiskHashMismatch(val file: Path, val actual: SecureHash) : Exception() {
override fun toString() = "File $file hashed to $actual: corruption in attachment store?"
}
/**
* Wraps a stream and hashes data as it is read: if the entire stream is consumed, then at the end the hash of
* the read data is compared to the [expected] hash and [OnDiskHashMismatch] is thrown by [close] if they didn't
* match. The goal of this is to detect cases where attachments in the store have been tampered with or corrupted
* and no longer match their file name. It won't always work: if we read a zip for our own uses and skip around
* inside it, we haven't read the whole file, so we can't check the hash. But when copying it over the network
* this will provide an additional safety check against user error.
*/
private class HashCheckingStream(val expected: SecureHash.SHA256,
val filePath: Path,
input: InputStream,
private val counter: CountingInputStream = CountingInputStream(input),
private val stream: HashingInputStream = HashingInputStream(Hashing.sha256(), counter)) : FilterInputStream(stream) {
private val expectedSize = Files.size(filePath)
override fun close() {
super.close()
if (counter.count != expectedSize) return
val actual = SecureHash.SHA256(stream.hash().asBytes())
if (actual != expected)
throw OnDiskHashMismatch(filePath, actual)
}
}
override fun openAttachment(id: SecureHash): Attachment? {
val path = storePath.resolve(id.toString())
if (!Files.exists(path)) return null
return object : Attachment {
override fun open(): InputStream {
var stream = Files.newInputStream(path)
// This is just an optional safety check. If it slows things down too much it can be disabled.
if (id is SecureHash.SHA256 && checkAttachmentsOnLoad)
stream = HashCheckingStream(id, path, stream)
log.debug("Opening attachment $id")
return stream
}
override val id: SecureHash = id
override fun equals(other: Any?) = other is Attachment && other.id == id
override fun hashCode(): Int = id.hashCode()
}
}
override fun importAttachment(jar: InputStream): SecureHash {
require(jar !is JarInputStream)
val hs = HashingInputStream(Hashing.sha256(), jar)
val tmp = storePath.resolve("tmp.${UUID.randomUUID()}")
Files.copy(hs, tmp)
checkIsAValidJAR(tmp)
val id = SecureHash.SHA256(hs.hash().asBytes())
val finalPath = storePath.resolve(id.toString())
try {
// Move into place atomically or fail if that isn't possible. We don't want a half moved attachment to
// be exposed to parallel threads. This gives us thread safety.
Files.move(tmp, finalPath, StandardCopyOption.ATOMIC_MOVE)
} finally {
Files.deleteIfExists(tmp)
}
log.info("Stored new attachment $id")
return id
}
private fun checkIsAValidJAR(path: Path) {
// Just iterate over the entries with verification enabled: should be good enough to catch mistakes.
JarInputStream(Files.newInputStream(path), true).use { stream ->
var cursor = stream.nextJarEntry
while (cursor != null) cursor = stream.nextJarEntry
}
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.node
import java.util.*
interface NodeConfiguration {
val myLegalName: String
}
/**
* A simple wrapper around a plain old Java .properties file. The keys have the same name as in the source code.
*
* TODO: Replace Java properties file with a better config file format (maybe yaml).
* We want to be able to configure via a GUI too, so an ability to round-trip whitespace, comments etc when machine
* editing the file is a must-have.
*/
class NodeConfigurationFromProperties(private val properties: Properties) : NodeConfiguration {
override val myLegalName: String by properties
}

View File

@ -90,7 +90,7 @@ fun main(args: Array<String>) {
LegallyIdentifiableNode(ArtemisMessagingService.makeRecipient(addr), party)
} else null
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId) }
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId).start() }
if (listening) {
val buyer = TraderDemoProtocolBuyer()
@ -251,7 +251,7 @@ private fun loadConfigFile(configFile: Path): NodeConfiguration {
Properties().apply { load(it) }
}
val config = NodeConfiguration(configProps)
val config = NodeConfigurationFromProperties(configProps)
// Make sure admin did actually edit at least the legal name.
if (config.myLegalName == defaultLegalName)

View File

@ -0,0 +1,51 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.node.servlets
import core.StorageService
import core.utilities.loggerFor
import org.apache.commons.fileupload.servlet.ServletFileUpload
import javax.servlet.http.HttpServlet
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
class AttachmentUploadServlet : HttpServlet() {
private val log = loggerFor<AttachmentUploadServlet>()
override fun doPost(req: HttpServletRequest, resp: HttpServletResponse) {
@Suppress("DEPRECATION") // Bogus warning due to superclass static method being deprecated.
val isMultipart = ServletFileUpload.isMultipartContent(req)
if (!isMultipart) {
log.error("Got a non-file upload request to the attachments servlet")
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "This end point is for file uploads only.")
return
}
val upload = ServletFileUpload()
val iterator = upload.getItemIterator(req)
while (iterator.hasNext()) {
val item = iterator.next()
if (!item.name.endsWith(".jar")) {
log.error("Attempted upload of a non-JAR attachment: mime=${item.contentType} filename=${item.name}")
resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
"${item.name}: Must be have a MIME type of application/java-archive and a filename ending in .jar")
return
}
log.info("Receiving ${item.name}")
val storage = servletContext.getAttribute("storage") as StorageService
item.openStream().use {
val id = storage.attachments.importAttachment(it)
log.info("${item.name} successfully inserted into the attachment store with id $id")
}
}
}
}

View File

@ -8,10 +8,7 @@
package core
import core.crypto.DigitalSignature
import core.crypto.SecureHash
import core.crypto.generateKeyPair
import core.crypto.signWithECDSA
import core.crypto.*
import core.messaging.MessagingService
import core.messaging.MockNetworkMap
import core.messaging.NetworkMap
@ -24,6 +21,10 @@ import core.testutils.TEST_KEYS_TO_CORP_MAP
import core.testutils.TEST_PROGRAM_MAP
import core.testutils.TEST_TX_TIME
import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.File
import java.io.InputStream
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
@ -31,6 +32,7 @@ import java.time.Clock
import java.time.Duration
import java.time.ZoneId
import java.util.*
import java.util.jar.JarInputStream
import javax.annotation.concurrent.ThreadSafe
/**
@ -67,6 +69,34 @@ class MockWalletService(val states: List<StateAndRef<OwnableState>>) : WalletSer
override val currentWallet = Wallet(states)
}
class MockAttachmentStorage : AttachmentStorage {
val files = HashMap<SecureHash, ByteArray>()
override fun openAttachment(id: SecureHash): Attachment? {
val f = files[id] ?: return null
return object : Attachment {
override fun open(): JarInputStream = JarInputStream(ByteArrayInputStream(f))
override val id: SecureHash = id
}
}
override fun importAttachment(jar: InputStream): SecureHash {
// JIS makes read()/readBytes() return bytes of the current file, but we want to hash the entire container here.
require(jar !is JarInputStream)
val bytes = run {
val s = ByteArrayOutputStream()
jar.copyTo(s)
s.toByteArray()
}
val sha256 = bytes.sha256()
if (files.containsKey(sha256))
throw FileAlreadyExistsException(File("!! MOCK FILE NAME"))
files[sha256] = bytes
return sha256
}
}
@ThreadSafe
class MockStorageService(val recordingAs: Map<String, String>? = null) : StorageService {
override val myLegalIdentityKey: KeyPair = generateKeyPair()
@ -79,6 +109,8 @@ class MockStorageService(val recordingAs: Map<String, String>? = null) : Storage
override val contractPrograms = MockContractFactory
override val attachments: AttachmentStorage = MockAttachmentStorage()
@Suppress("UNCHECKED_CAST")
override fun <K, V> getMap(tableName: String): MutableMap<K, V> {
synchronized(tables) {

View File

@ -0,0 +1,119 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.messaging
import contracts.protocols.FetchAttachmentsProtocol
import contracts.protocols.FetchDataProtocol
import core.Attachment
import core.crypto.SecureHash
import core.crypto.sha256
import core.node.MockNetwork
import core.node.NodeAttachmentStorage
import core.serialization.OpaqueBytes
import core.testutils.rootCauseExceptions
import core.utilities.BriefLogFormatter
import org.junit.Before
import org.junit.Test
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import java.util.jar.JarOutputStream
import java.util.zip.ZipEntry
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class AttachmentTests {
lateinit var network: MockNetwork
init {
BriefLogFormatter.init()
}
@Before
fun setUp() {
network = MockNetwork()
}
fun fakeAttachment(): ByteArray {
val bs = ByteArrayOutputStream()
val js = JarOutputStream(bs)
js.putNextEntry(ZipEntry("file1.txt"))
js.writer().append("Some useful content")
js.closeEntry()
js.close()
return bs.toByteArray()
}
@Test
fun `download and store`() {
val (n0, n1) = network.createTwoNodes()
// Insert an attachment into node zero's store directly.
val id = n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
// Get node one to run a protocol to fetch it and insert it.
val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.net.myAddress))
network.runNetwork()
assertEquals(0, f1.get().fromDisk.size)
// Verify it was inserted into node one's store.
val attachment = n1.storage.attachments.openAttachment(id)!!
assertEquals(id, attachment.open().readBytes().sha256())
// Shut down node zero and ensure node one can still resolve the attachment.
n0.stop()
val response: FetchDataProtocol.Result<Attachment> = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.net.myAddress)).get()
assertEquals(attachment, response.fromDisk[0])
}
@Test
fun `missing`() {
val (n0, n1) = network.createTwoNodes()
// Get node one to fetch a non-existent attachment.
val hash = SecureHash.randomSHA256()
val f1 = n1.smm.add("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.net.myAddress))
network.runNetwork()
val e = assertFailsWith<FetchDataProtocol.HashNotFound> { rootCauseExceptions { f1.get() } }
assertEquals(hash, e.requested)
}
@Test
fun maliciousResponse() {
// Make a node that doesn't do sanity checking at load time.
val n0 = network.createNode(null) { path, config, net, ts ->
object : MockNetwork.MockNode(path, config, net, ts) {
override fun start(): MockNetwork.MockNode {
super.start()
(storage.attachments as NodeAttachmentStorage).checkAttachmentsOnLoad = false
return this
}
}
}
val n1 = network.createNode(n0.legallyIdentifableAddress)
// Insert an attachment into node zero's store directly.
val id = n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
// Corrupt its store.
val writer = Files.newByteChannel(network.filesystem.getPath("/nodes/0/attachments/$id"), StandardOpenOption.WRITE)
writer.write(ByteBuffer.wrap(OpaqueBytes.of(99, 99, 99, 99).bits))
writer.close()
// Get n1 to fetch the attachment. Should receive corrupted bytes.
val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.net.myAddress))
network.runNetwork()
assertFailsWith<FetchDataProtocol.DownloadedVsRequestedDataMismatch> {
rootCauseExceptions { f1.get() }
}
}
}

View File

@ -24,35 +24,35 @@ import javax.annotation.concurrent.ThreadSafe
import kotlin.concurrent.thread
/**
* An in-memory network allows you to manufacture [InMemoryNode]s for a set of participants. Each
* [InMemoryNode] maintains a queue of messages it has received, and a background thread that dispatches
* An in-memory network allows you to manufacture [InMemoryMessaging]s for a set of participants. Each
* [InMemoryMessaging] maintains a queue of messages it has received, and a background thread that dispatches
* messages one by one to registered handlers. Alternatively, a messaging system may be manually pumped, in which
* case no thread is created and a caller is expected to force delivery one at a time (this is useful for unit
* testing).
*/
@ThreadSafe
class InMemoryNetwork {
class InMemoryMessagingNetwork {
private var counter = 0 // -1 means stopped.
private val handleNodeMap = HashMap<Handle, InMemoryNode>()
private val handleEndpointMap = HashMap<Handle, InMemoryMessaging>()
// All messages are kept here until the messages are pumped off the queue by a caller to the node class.
// Queues are created on-demand when a message is sent to an address: the receiving node doesn't have to have
// been created yet. If the node identified by the given handle has gone away/been shut down then messages
// stack up here waiting for it to come back. The intent of this is to simulate a reliable messaging network.
private val messageQueues = HashMap<Handle, LinkedBlockingQueue<Message>>()
val nodes: List<InMemoryNode> @Synchronized get() = handleNodeMap.values.toList()
val endpoints: List<InMemoryMessaging> @Synchronized get() = handleEndpointMap.values.toList()
/**
* Creates a node and returns the new object that identifies its location on the network to senders, and the
* [InMemoryNode] that the recipient/in-memory node uses to receive messages and send messages itself.
* [InMemoryMessaging] that the recipient/in-memory node uses to receive messages and send messages itself.
*
* If [manuallyPumped] is set to true, then you are expected to call the [InMemoryNode.pump] method on the [InMemoryNode]
* If [manuallyPumped] is set to true, then you are expected to call the [InMemoryMessaging.pump] method on the [InMemoryMessaging]
* in order to cause the delivery of a single message, which will occur on the thread of the caller. If set to false
* then this class will set up a background thread to deliver messages asynchronously, if the handler specifies no
* executor.
*/
@Synchronized
fun createNode(manuallyPumped: Boolean): Pair<Handle, MessagingServiceBuilder<InMemoryNode>> {
fun createNode(manuallyPumped: Boolean): Pair<Handle, MessagingServiceBuilder<InMemoryMessaging>> {
check(counter >= 0) { "In memory network stopped: please recreate."}
val builder = createNodeWithID(manuallyPumped, counter) as Builder
counter++
@ -61,19 +61,19 @@ class InMemoryNetwork {
}
/** Creates a node at the given address: useful if you want to recreate a node to simulate a restart */
fun createNodeWithID(manuallyPumped: Boolean, id: Int): MessagingServiceBuilder<InMemoryNode> {
fun createNodeWithID(manuallyPumped: Boolean, id: Int): MessagingServiceBuilder<InMemoryMessaging> {
return Builder(manuallyPumped, Handle(id))
}
@Synchronized
private fun netSend(message: Message, recipients: MessageRecipients) {
private fun msgSend(message: Message, recipients: MessageRecipients) {
when (recipients) {
is Handle -> getQueueForHandle(recipients).add(message)
is AllPossibleRecipients -> {
// This means all possible recipients _that the network knows about at the time_, not literally everyone
// who joins into the indefinite future.
for (handle in handleNodeMap.keys)
for (handle in handleEndpointMap.keys)
getQueueForHandle(handle).add(message)
}
else -> throw IllegalArgumentException("Unknown type of recipient handle")
@ -82,7 +82,7 @@ class InMemoryNetwork {
@Synchronized
private fun netNodeHasShutdown(handle: Handle) {
handleNodeMap.remove(handle)
handleEndpointMap.remove(handle)
}
@Synchronized
@ -93,21 +93,21 @@ class InMemoryNetwork {
fun stop() {
val nodes = synchronized(this) {
counter = -1
handleNodeMap.values.toList()
handleEndpointMap.values.toList()
}
for (node in nodes)
node.stop()
handleNodeMap.clear()
handleEndpointMap.clear()
messageQueues.clear()
}
inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingServiceBuilder<InMemoryNode> {
override fun start(): ListenableFuture<InMemoryNode> {
synchronized(this@InMemoryNetwork) {
val node = InMemoryNode(manuallyPumped, id)
handleNodeMap[id] = node
inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingServiceBuilder<InMemoryMessaging> {
override fun start(): ListenableFuture<InMemoryMessaging> {
synchronized(this@InMemoryMessagingNetwork) {
val node = InMemoryMessaging(manuallyPumped, id)
handleEndpointMap[id] = node
return Futures.immediateFuture(node)
}
}
@ -122,7 +122,7 @@ class InMemoryNetwork {
private var timestampingAdvert: LegallyIdentifiableNode? = null
@Synchronized
fun setupTimestampingNode(manuallyPumped: Boolean): Pair<LegallyIdentifiableNode, InMemoryNode> {
fun setupTimestampingNode(manuallyPumped: Boolean): Pair<LegallyIdentifiableNode, InMemoryMessaging> {
check(timestampingAdvert == null)
val (handle, builder) = createNode(manuallyPumped)
val node = builder.start().get()
@ -132,14 +132,14 @@ class InMemoryNetwork {
}
/**
* An [InMemoryNode] provides a [MessagingService] that isn't backed by any kind of network or disk storage
* An [InMemoryMessaging] provides a [MessagingService] that isn't backed by any kind of network or disk storage
* system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience
* when all entities on 'the network' are being simulated in-process.
*
* An instance can be obtained by creating a builder and then using the start method.
*/
@ThreadSafe
inner class InMemoryNode(private val manuallyPumped: Boolean, private val handle: Handle): MessagingService {
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle): MessagingService {
inner class Handler(val executor: Executor?, val topic: String,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@Volatile
@ -172,7 +172,7 @@ class InMemoryNetwork {
Pair(handler, items)
}
for (it in items)
netSend(it, handle)
msgSend(it, handle)
return handler
}
@ -183,7 +183,7 @@ class InMemoryNetwork {
override fun send(message: Message, target: MessageRecipients) {
check(running)
netSend(message, target)
msgSend(message, target)
}
override fun stop() {
@ -245,7 +245,7 @@ class InMemoryNetwork {
try {
handler.callback(message, handler)
} catch(e: Exception) {
loggerFor<InMemoryNetwork>().error("Caught exception in handler for $this/${handler.topic}", e)
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topic}", e)
}
}
}

View File

@ -21,10 +21,10 @@ import kotlin.test.assertFalse
import kotlin.test.assertTrue
open class TestWithInMemoryNetwork {
val nodes: MutableMap<InMemoryNetwork.Handle, InMemoryNetwork.InMemoryNode> = HashMap()
lateinit var network: InMemoryNetwork
val nodes: MutableMap<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging> = HashMap()
lateinit var network: InMemoryMessagingNetwork
fun makeNode(inBackground: Boolean = false): Pair<InMemoryNetwork.Handle, InMemoryNetwork.InMemoryNode> {
fun makeNode(inBackground: Boolean = false): Pair<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging> {
// The manuallyPumped = true bit means that we must call the pump method on the system in order to
val (address, builder) = network.createNode(!inBackground)
val node = builder.start().get()
@ -34,7 +34,7 @@ open class TestWithInMemoryNetwork {
@Before
fun setupNetwork() {
network = InMemoryNetwork()
network = InMemoryMessagingNetwork()
nodes.clear()
}
@ -43,7 +43,7 @@ open class TestWithInMemoryNetwork {
network.stop()
}
fun pumpAll(blocking: Boolean) = network.nodes.map { it.pump(blocking) }
fun pumpAll(blocking: Boolean) = network.endpoints.map { it.pump(blocking) }
// Keep calling "pump" in rounds until every node in the network reports that it had nothing to do
fun <T> runNetwork(body: () -> T): T {

View File

@ -24,7 +24,7 @@ import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
// TODO: Refactor this test some more to cut down on messy setup duplication.
// TODO: Refactor this test to use the MockNode class, which will clean this file up significantly.
/**
* In this example, Alice wishes to sell her commercial paper to Bob in return for $1,000,000 and they wish to do

View File

@ -0,0 +1,105 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.node
import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs
import com.google.common.util.concurrent.MoreExecutors
import core.messaging.InMemoryMessagingNetwork
import core.messaging.LegallyIdentifiableNode
import core.messaging.MessagingService
import core.utilities.loggerFor
import org.slf4j.Logger
import java.nio.file.Files
import java.nio.file.Path
import java.util.*
import java.util.concurrent.ExecutorService
/**
* A mock node brings up a suite of in-memory services in a fast manner suitable for unit testing.
* Components that do IO are either swapped out for mocks, or pointed to a [Jimfs] in memory filesystem.
*
* Mock network nodes require manual pumping by default: they will not run asynchronous. This means that
* for message exchanges to take place (and associated handlers to run), you must call the [runNetwork]
* method.
*/
class MockNetwork {
private var counter = 0
val filesystem = Jimfs.newFileSystem(Configuration.unix())
val messagingNetwork = InMemoryMessagingNetwork()
private val _nodes = ArrayList<MockNode>()
/** A read only view of the current set of executing nodes. */
val nodes: List<MockNode> = _nodes
init {
Files.createDirectory(filesystem.getPath("/nodes"))
}
open class MockNode(dir: Path, config: NodeConfiguration, val network: InMemoryMessagingNetwork,
withTimestamper: LegallyIdentifiableNode?) : AbstractNode(dir, config, withTimestamper) {
override val log: Logger = loggerFor<MockNode>()
override val serverThread: ExecutorService = MoreExecutors.newDirectExecutorService()
// We only need to override the messaging service here, as currently everything that hits disk does so
// through the java.nio API which we are already mocking via Jimfs.
override fun makeMessagingService(): MessagingService {
return network.createNode(true).second.start().get()
}
override fun start(): MockNode {
super.start()
return this
}
}
fun createNode(withTimestamper: LegallyIdentifiableNode?,
factory: (Path, NodeConfiguration, network: InMemoryMessagingNetwork, LegallyIdentifiableNode?) -> MockNode = { p, n, n2, l -> MockNode(p, n, n2, l) }): MockNode {
val path = filesystem.getPath("/nodes/$counter")
Files.createDirectory(path)
val config = object : NodeConfiguration {
override val myLegalName: String = "Mock Company $counter"
}
val node = factory(path, config, messagingNetwork, withTimestamper).start()
_nodes.add(node)
counter++
return node
}
/**
* Asks every node in order to process any queued up inbound messages. This may in turn result in nodes
* sending more messages to each other, thus, a typical usage is to call runNetwork with the [rounds]
* parameter set to -1 (the default) which simply runs as many rounds as necessary to result in network
* stability (no nodes sent any messages in the last round).
*/
fun runNetwork(rounds: Int = -1) {
fun pumpAll() = messagingNetwork.endpoints.map { it.pump(false) }
if (rounds == -1)
while (pumpAll().any { it }) {}
else
repeat(rounds) { pumpAll() }
}
/**
* Sets up a two node network in which the first node runs a timestamping service and the other doesn't.
*/
fun createTwoNodes(): Pair<MockNode, MockNode> {
require(nodes.isEmpty())
return Pair(createNode(null), createNode(nodes[0].legallyIdentifableAddress))
}
}

View File

@ -0,0 +1,101 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.node
import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs
import core.crypto.SecureHash
import core.use
import org.junit.Before
import org.junit.Test
import java.nio.charset.Charset
import java.nio.file.FileSystem
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.util.jar.JarEntry
import java.util.jar.JarOutputStream
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNull
class NodeAttachmentStorageTest {
// Use an in memory file system for testing attachment storage.
lateinit var fs: FileSystem
@Before
fun setUp() {
fs = Jimfs.newFileSystem(Configuration.unix())
}
@Test
fun `insert and retrieve`() {
val testJar = makeTestJar()
val expectedHash = SecureHash.sha256(Files.readAllBytes(testJar))
val storage = NodeAttachmentStorage(fs.getPath("/"))
val id = testJar.use { storage.importAttachment(it) }
assertEquals(expectedHash, id)
assertNull(storage.openAttachment(SecureHash.randomSHA256()))
val stream = storage.openAttachment(expectedHash)!!.openAsJAR()
val e1 = stream.nextJarEntry!!
assertEquals("test1.txt", e1.name)
assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "This is some useful content")
val e2 = stream.nextJarEntry!!
assertEquals("test2.txt", e2.name)
assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "Some more useful content")
}
@Test
fun `duplicates not allowed`() {
val testJar = makeTestJar()
val storage = NodeAttachmentStorage(fs.getPath("/"))
testJar.use { storage.importAttachment(it) }
assertFailsWith<java.nio.file.FileAlreadyExistsException> {
testJar.use { storage.importAttachment(it) }
}
}
@Test
fun `corrupt entry throws exception`() {
val testJar = makeTestJar()
val storage = NodeAttachmentStorage(fs.getPath("/"))
val id = testJar.use { storage.importAttachment(it) }
// Corrupt the file in the store.
Files.write(fs.getPath("/", id.toString()), "arggghhhh".toByteArray(), StandardOpenOption.WRITE)
val e = assertFailsWith<NodeAttachmentStorage.OnDiskHashMismatch> {
storage.openAttachment(id)!!.open().use { it.readBytes() }
}
assertEquals(e.file, storage.storePath.resolve(id.toString()))
// But if we skip around and read a single entry, no exception is thrown.
storage.openAttachment(id)!!.openAsJAR().use {
it.nextJarEntry
it.readBytes()
}
}
private var counter = 0
private fun makeTestJar(): Path {
counter++
val f = fs.getPath("$counter.jar")
JarOutputStream(Files.newOutputStream(f)).use {
it.putNextEntry(JarEntry("test1.txt"))
it.write("This is some useful content".toByteArray())
it.closeEntry()
it.putNextEntry(JarEntry("test2.txt"))
it.write("Some more useful content".toByteArray())
it.closeEntry()
}
return f
}
}

View File

@ -27,9 +27,11 @@ import java.time.ZoneId
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
// TODO: Refactor this to use MockNode.
class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
lateinit var myNode: Pair<InMemoryNetwork.Handle, InMemoryNetwork.InMemoryNode>
lateinit var serviceNode: Pair<InMemoryNetwork.Handle, InMemoryNetwork.InMemoryNode>
lateinit var myMessaging: Pair<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging>
lateinit var serviceMessaging: Pair<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging>
lateinit var service: TimestamperNodeService
val ptx = TransactionBuilder().apply {
@ -47,16 +49,16 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
@Before
fun setup() {
myNode = makeNode()
serviceNode = makeNode()
mockServices = MockServices(net = serviceNode.second, storage = MockStorageService())
myMessaging = makeNode()
serviceMessaging = makeNode()
mockServices = MockServices(net = serviceMessaging.second, storage = MockStorageService())
val timestampingNodeID = network.setupTimestampingNode(true).first
(mockServices.networkMapService as MockNetworkMap).timestampingNodes.add(timestampingNodeID)
serverKey = timestampingNodeID.identity.owningKey
// And a separate one to be tested directly, to make the unit tests a bit faster.
service = TimestamperNodeService(serviceNode.second, Party("Unit test suite", ALICE), ALICE_KEY)
service = TimestamperNodeService(serviceMessaging.second, Party("Unit test suite", ALICE), ALICE_KEY)
}
class TestPSM(val server: LegallyIdentifiableNode, val now: Instant) : ProtocolLogic<Boolean>() {
@ -78,7 +80,7 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
@Test
fun successWithNetwork() {
val psm = runNetwork {
val smm = StateMachineManager(MockServices(net = myNode.second), RunOnCallerThread)
val smm = StateMachineManager(MockServices(net = myMessaging.second), RunOnCallerThread)
val logName = TimestamperNodeService.TIMESTAMPING_PROTOCOL_TOPIC
val psm = TestPSM(mockServices.networkMapService.timestampingNodes[0], clock.instant())
smm.add(logName, psm)
@ -91,14 +93,14 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
// Zero commands is not OK.
assertFailsWith(TimestampingError.RequiresExactlyOneCommand::class) {
val wtx = ptx.toWireTransaction()
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myNode.first, "ignored"))
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myMessaging.first, "ignored"))
}
// More than one command is not OK.
assertFailsWith(TimestampingError.RequiresExactlyOneCommand::class) {
ptx.addCommand(TimestampCommand(clock.instant(), 30.seconds), ALICE)
ptx.addCommand(TimestampCommand(clock.instant(), 40.seconds), ALICE)
val wtx = ptx.toWireTransaction()
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myNode.first, "ignored"))
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myMessaging.first, "ignored"))
}
}
@ -108,7 +110,7 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
val now = clock.instant()
ptx.addCommand(TimestampCommand(now - 60.seconds, now - 40.seconds), ALICE)
val wtx = ptx.toWireTransaction()
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myNode.first, "ignored"))
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myMessaging.first, "ignored"))
}
}
@ -118,7 +120,7 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
val now = clock.instant()
ptx.addCommand(TimestampCommand(now - 60.seconds, now - 40.seconds), ALICE)
val wtx = ptx.toWireTransaction()
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myNode.first, "ignored"))
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myMessaging.first, "ignored"))
}
}
@ -127,7 +129,7 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
val now = clock.instant()
ptx.addCommand(TimestampCommand(now - 20.seconds, now + 20.seconds), ALICE)
val wtx = ptx.toWireTransaction()
val sig = service.processRequest(TimestampingMessages.Request(wtx.serialize(), myNode.first, "ignored"))
val sig = service.processRequest(TimestampingMessages.Request(wtx.serialize(), myMessaging.first, "ignored"))
ptx.checkAndAddSignature(sig)
ptx.toSignedTransaction(false).verifySignatures()
}