Merge remote-tracking branch 'corda-public/master'

This commit is contained in:
Chris Rankin
2017-02-02 14:00:32 +00:00
1318 changed files with 10223 additions and 6461 deletions

View File

@ -1,5 +1,6 @@
package net.corda.node.driver
import com.google.common.net.HostAndPort
import net.corda.core.getOrThrow
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
@ -25,6 +26,14 @@ class DriverTests {
// Check that the port is bound
addressMustNotBeBound(executorService, hostAndPort)
}
fun webserverMustBeUp(webserverAddr: HostAndPort) {
addressMustBeBound(executorService, webserverAddr)
}
fun webserverMustBeDown(webserverAddr: HostAndPort) {
addressMustNotBeBound(executorService, webserverAddr)
}
}
@Test
@ -60,4 +69,15 @@ class DriverTests {
}
nodeMustBeDown(nodeInfo.nodeInfo)
}
@Test
fun `starting a node and independent web server works`() {
val addr = driver {
val node = startNode("test").getOrThrow()
val webserverAddr = startWebserver(node).getOrThrow()
webserverMustBeUp(webserverAddr)
webserverAddr
}
webserverMustBeDown(addr)
}
}

View File

@ -5,14 +5,15 @@ import net.corda.core.contracts.Amount
import net.corda.core.contracts.POUNDS
import net.corda.core.contracts.issuedBy
import net.corda.core.crypto.Party
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.toFuture
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.CashFlowResult
import net.corda.node.driver.DriverBasedTest
import net.corda.node.driver.NodeHandle
import net.corda.node.driver.driver
@ -137,13 +138,13 @@ class DistributedServiceTests : DriverBasedTest() {
val issueHandle = aliceProxy.startFlow(
::CashFlow,
CashCommand.IssueCash(amount, OpaqueBytes.of(0), alice.nodeInfo.legalIdentity, raftNotaryIdentity))
require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
issueHandle.returnValue.toFuture().getOrThrow()
}
private fun paySelf(amount: Amount<Currency>) {
val payHandle = aliceProxy.startFlow(
::CashFlow,
CashCommand.PayCash(amount.issuedBy(alice.nodeInfo.legalIdentity.ref(0)), alice.nodeInfo.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
payHandle.returnValue.toFuture().getOrThrow()
}
}

View File

@ -21,6 +21,7 @@ class ArgsParser {
.withRequiredArg()
.defaultsTo("node.conf")
private val logToConsoleArg = optionParser.accepts("log-to-console", "If set, prints logging to the console as well as to a file.")
private val isWebserverArg = optionParser.accepts("webserver")
private val helpArg = optionParser.accepts("help").forHelp()
fun parse(vararg args: String): CmdLineOptions {
@ -30,13 +31,14 @@ class ArgsParser {
}
val baseDirectory = Paths.get(optionSet.valueOf(baseDirectoryArg)).normalize().toAbsolutePath()
val configFile = baseDirectory / optionSet.valueOf(configFileArg)
return CmdLineOptions(baseDirectory, configFile, optionSet.has(helpArg), optionSet.has(logToConsoleArg))
val isWebserver = optionSet.has(isWebserverArg)
return CmdLineOptions(baseDirectory, configFile, optionSet.has(helpArg), optionSet.has(logToConsoleArg), isWebserver)
}
fun printHelp(sink: PrintStream) = optionParser.printHelpOn(sink)
}
data class CmdLineOptions(val baseDirectory: Path, val configFile: Path?, val help: Boolean, val logToConsole: Boolean) {
data class CmdLineOptions(val baseDirectory: Path, val configFile: Path?, val help: Boolean, val logToConsole: Boolean, val isWebserver: Boolean) {
fun loadConfig(allowMissingConfig: Boolean = false, configOverrides: Map<String, Any?> = emptyMap()): Config {
return ConfigHelper.loadConfig(baseDirectory, configFile, allowMissingConfig, configOverrides)
}

View File

@ -7,6 +7,7 @@ import net.corda.core.utilities.Emoji
import net.corda.node.internal.Node
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.utilities.ANSIProgressObserver
import net.corda.node.webserver.WebServer
import org.fusesource.jansi.Ansi
import org.fusesource.jansi.AnsiConsole
import org.slf4j.LoggerFactory
@ -57,7 +58,8 @@ fun main(args: Array<String>) {
drawBanner()
System.setProperty("log-path", (cmdlineOptions.baseDirectory / "logs").toString())
val logDir = if (cmdlineOptions.isWebserver) "logs/web" else "logs"
System.setProperty("log-path", (cmdlineOptions.baseDirectory / logDir).toString())
val log = LoggerFactory.getLogger("Main")
printBasicNodeInfo("Logs can be found in", System.getProperty("log-path"))
@ -78,33 +80,39 @@ fun main(args: Array<String>) {
log.info("VM ${info.vmName} ${info.vmVendor} ${info.vmVersion}")
log.info("Machine: ${InetAddress.getLocalHost().hostName}")
log.info("Working Directory: ${cmdlineOptions.baseDirectory}")
if(cmdlineOptions.isWebserver) {
log.info("Starting as webserver on ${conf.webAddress}")
} else {
log.info("Starting as node on ${conf.artemisAddress}")
}
try {
cmdlineOptions.baseDirectory.createDirectories()
val node = conf.createNode()
node.start()
printPluginsAndServices(node)
// TODO: Webserver should be split and start from inside a WAR container
if (!cmdlineOptions.isWebserver) {
val node = conf.createNode()
node.start()
printPluginsAndServices(node)
thread {
Thread.sleep(30.seconds.toMillis())
while (!node.networkMapRegistrationFuture.isDone) {
printBasicNodeInfo("Waiting for response from network map ...")
Thread.sleep(30.seconds.toMillis())
node.networkMapRegistrationFuture.success {
val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0
printBasicNodeInfo("Node started up and registered in $elapsed sec")
if (renderBasicInfoToConsole)
ANSIProgressObserver(node.smm)
} failure {
log.error("Error during network map registration", it)
exitProcess(1)
}
}
node.networkMapRegistrationFuture.success {
node.run()
} else {
val server = WebServer(conf)
server.start()
val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0
printBasicNodeInfo("Node started up and registered in $elapsed sec")
if (renderBasicInfoToConsole)
ANSIProgressObserver(node.smm)
} failure {
log.error("Error during network map registration", it)
exitProcess(1)
printBasicNodeInfo("Webserver started up in $elapsed sec")
server.run()
}
node.run()
} catch (e: Exception) {
log.error("Exception during node startup", e)
exitProcess(1)

View File

@ -1,153 +0,0 @@
package net.corda.node.api
import net.corda.core.contracts.*
import net.corda.node.api.StatesQuery
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.SerializedBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import java.time.Instant
import java.time.LocalDateTime
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response
/**
* Top level interface to external interaction with the distributed ledger.
*
* Wherever a list is returned by a fetchXXX method that corresponds with an input list, that output list will have optional elements
* where a null indicates "missing" and the elements returned will be in the order corresponding with the input list.
*
*/
@Path("")
interface APIServer {
/**
* Report current UTC time as understood by the platform.
*/
@GET
@Path("servertime")
@Produces(MediaType.APPLICATION_JSON)
fun serverTime(): LocalDateTime
/**
* Report whether this node is started up or not.
*/
@GET
@Path("status")
@Produces(MediaType.TEXT_PLAIN)
fun status(): Response
/**
* Report this node's configuration and identities.
* Currently tunnels the NodeInfo as an encoding of the Kryo serialised form.
* TODO this functionality should be available via the RPC
*/
@GET
@Path("info")
@Produces(MediaType.APPLICATION_JSON)
fun info(): NodeInfo
/**
* Query your "local" states (containing only outputs involving you) and return the hashes & indexes associated with them
* to probably be later inflated by fetchLedgerTransactions() or fetchStates() although because immutable you can cache them
* to avoid calling fetchLedgerTransactions() many times.
*
* @param query Some "where clause" like expression.
* @return Zero or more matching States.
*/
fun queryStates(query: StatesQuery): List<StateRef>
fun fetchStates(states: List<StateRef>): Map<StateRef, TransactionState<ContractState>?>
/**
* Query for immutable transactions (results can be cached indefinitely by their id/hash).
*
* @param txs The hashes (from [StateRef.txhash] returned from [queryStates]) you would like full transactions for.
* @return null values indicate missing transactions from the requested list.
*/
fun fetchTransactions(txs: List<SecureHash>): Map<SecureHash, SignedTransaction?>
/**
* TransactionBuildSteps would be invocations of contract.generateXXX() methods that all share a common TransactionBuilder
* and a common contract type (e.g. Cash or CommercialPaper)
* which would automatically be passed as the first argument (we'd need that to be a criteria/pattern of the generateXXX methods).
*/
fun buildTransaction(type: ContractDefRef, steps: List<TransactionBuildStep>): SerializedBytes<WireTransaction>
/**
* Generate a signature for this transaction signed by us.
*/
fun generateTransactionSignature(tx: SerializedBytes<WireTransaction>): DigitalSignature.WithKey
/**
* Attempt to commit transaction (returned from build transaction) with the necessary signatures for that to be
* successful, otherwise exception is thrown.
*/
fun commitTransaction(tx: SerializedBytes<WireTransaction>, signatures: List<DigitalSignature.WithKey>): SecureHash
/**
* This method would not return until the flow is finished (hence the "Sync").
*
* Longer term we'd add an Async version that returns some kind of FlowInvocationRef that could be queried and
* would appear on some kind of event message that is broadcast informing of progress.
*
* Will throw exception if flow fails.
*/
fun invokeFlowSync(type: FlowRef, args: Map<String, Any?>): Any?
// fun invokeFlowAsync(type: FlowRef, args: Map<String, Any?>): FlowInstanceRef
/**
* Fetch flows that require a response to some prompt/question by a human (on the "bank" side).
*/
fun fetchFlowsRequiringAttention(query: StatesQuery): Map<StateRef, FlowRequiringAttention>
/**
* Provide the response that a flow is waiting for.
*
* @param flow Should refer to a previously supplied FlowRequiringAttention.
* @param stepId Which step of the flow are we referring too.
* @param choice Should be one of the choices presented in the FlowRequiringAttention.
* @param args Any arguments required.
*/
fun provideFlowResponse(flow: FlowInstanceRef, choice: SecureHash, args: Map<String, Any?>)
}
/**
* Encapsulates the contract type. e.g. Cash or CommercialPaper etc.
*/
interface ContractDefRef {
}
data class ContractClassRef(val className: String) : ContractDefRef
data class ContractLedgerRef(val hash: SecureHash) : ContractDefRef
/**
* Encapsulates the flow to be instantiated. e.g. TwoPartyTradeFlow.Buyer.
*/
interface FlowRef {
}
data class FlowClassRef(val className: String) : FlowRef
data class FlowInstanceRef(val flowInstance: SecureHash, val flowClass: FlowClassRef, val flowStepId: String)
/**
* Thinking that Instant is OK for short lived flow deadlines.
*/
data class FlowRequiringAttention(val ref: FlowInstanceRef, val prompt: String, val choiceIdsToMessages: Map<SecureHash, String>, val dueBy: Instant)
/**
* Encapsulate a generateXXX method call on a contract.
*/
data class TransactionBuildStep(val generateMethodName: String, val args: Map<String, Any?>)

View File

@ -2,8 +2,6 @@
package net.corda.node.driver
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
@ -12,6 +10,7 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
@ -19,12 +18,15 @@ import net.corda.core.utilities.loggerFor
import net.corda.node.services.User
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.SSLConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.JsonSupport
import net.corda.node.utilities.ServiceIdentityGenerator
import okhttp3.OkHttpClient
import okhttp3.Request
import org.slf4j.Logger
import java.io.File
import java.net.*
@ -34,14 +36,12 @@ import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.*
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
/**
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
*
@ -84,6 +84,13 @@ interface DriverDSLExposedInterface {
type: ServiceType = RaftValidatingNotaryService.type,
rpcUsers: List<User> = emptyList()): Future<Pair<Party, List<NodeHandle>>>
/**
* Starts a web server for a node
*
* @param handle The handle for the node that this webserver connects to via RPC.
*/
fun startWebserver(handle: NodeHandle): ListenableFuture<HostAndPort>
fun waitForAllNodesToFinish()
}
@ -94,6 +101,7 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface {
data class NodeHandle(
val nodeInfo: NodeInfo,
val rpc: CordaRPCOps,
val configuration: FullNodeConfiguration,
val process: Process
) {
@ -187,6 +195,7 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
return returnValue
} catch (exception: Throwable) {
println("Driver shutting down because of exception $exception")
exception.printStackTrace()
throw exception
} finally {
driverDsl.shutdown()
@ -318,24 +327,16 @@ open class DriverDSL(
executorService.shutdown()
}
private fun queryNodeInfo(webAddress: HostAndPort): NodeInfo? {
val url = URL("http://$webAddress/api/info")
try {
val conn = url.openConnection() as HttpURLConnection
conn.requestMethod = "GET"
if (conn.responseCode != 200) {
log.error("Received response code ${conn.responseCode} from $url during startup.")
return null
private fun establishRpc(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): ListenableFuture<CordaRPCOps> {
val client = CordaRPCClient(nodeAddress, sslConfig)
return poll(executorService, "for RPC connection") {
try {
client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
return@poll client.proxy()
} catch(e: Exception) {
log.error("Exception $e, Retrying RPC connection at $nodeAddress")
null
}
// For now the NodeInfo is tunneled in its Kryo format over the Node's Web interface.
val om = ObjectMapper()
val module = SimpleModule("NodeInfo")
module.addDeserializer(NodeInfo::class.java, JsonSupport.NodeInfoDeserializer)
om.registerModule(module)
return om.readValue(conn.inputStream, NodeInfo::class.java)
} catch(e: Exception) {
log.error("Could not query node info at $url due to an exception.", e)
return null
}
}
@ -375,10 +376,14 @@ open class DriverDSL(
)
)
val startNode = startNode(executorService, configuration, quasarJarPath, debugPort)
registerProcess(startNode)
return startNode.map {
NodeHandle(queryNodeInfo(apiAddress)!!, configuration, it)
val processFuture = startNode(executorService, configuration, quasarJarPath, debugPort)
registerProcess(processFuture)
return processFuture.flatMap { process ->
establishRpc(messagingAddress, configuration).flatMap { rpc ->
rpc.waitUntilRegisteredWithNetworkMap().toFuture().map {
NodeHandle(rpc.nodeIdentity(), rpc, configuration, process)
}
}
}
}
@ -413,12 +418,39 @@ open class DriverDSL(
}
}
private fun queryWebserver(configuration: FullNodeConfiguration): HostAndPort? {
val protocol = if (configuration.useHTTPS) {
"https://"
} else {
"http://"
}
val url = URL(protocol + configuration.webAddress.toString() + "/api/status")
val client = OkHttpClient.Builder().connectTimeout(5, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build()
while (true) try {
val response = client.newCall(Request.Builder().url(url).build()).execute()
if (response.isSuccessful && (response.body().string() == "started")) {
return configuration.webAddress
}
} catch(e: ConnectException) {
log.debug("Retrying webserver info at ${configuration.webAddress}")
}
}
override fun startWebserver(handle: NodeHandle): ListenableFuture<HostAndPort> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
return future {
registerProcess(DriverDSL.startWebserver(executorService, handle.configuration, debugPort))
queryWebserver(handle.configuration)!!
}
}
override fun start() {
startNetworkMapService()
}
private fun startNetworkMapService(): ListenableFuture<Process> {
val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val baseDirectory = driverDirectory / networkMapLegalName
@ -428,7 +460,6 @@ open class DriverDSL(
configOverrides = mapOf(
"myLegalName" to networkMapLegalName,
"artemisAddress" to networkMapAddress.toString(),
"webAddress" to apiAddress.toString(),
"extraAdvertisedServiceIds" to "",
"useTestClock" to useTestClock
)
@ -468,30 +499,66 @@ open class DriverDSL(
else
""
val javaArgs = listOf(
path,
"-Dname=${nodeConf.myLegalName}",
"-javaagent:$quasarJarPath",
debugPortArg,
"-Dvisualvm.display.name=Corda",
"-Xmx200m",
"-XX:+UseG1GC",
"-cp", classpath,
className,
"--base-directory=${nodeConf.baseDirectory}"
).filter(String::isNotEmpty)
val additionalKeys = listOf("amq.delivery.delay.ms")
val systemArgs = mutableMapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "Corda"
)
for (key in additionalKeys) {
if (System.getProperty(key) != null) {
systemArgs.set(key, System.getProperty(key))
}
}
val javaArgs = listOf(path) +
systemArgs.map { "-D${it.key}=${it.value}" } +
listOf(
"-javaagent:$quasarJarPath",
debugPortArg,
"-Xmx200m",
"-XX:+UseG1GC",
"-cp", classpath,
className,
"--base-directory=${nodeConf.baseDirectory}"
).filter(String::isNotEmpty)
val builder = ProcessBuilder(javaArgs)
builder.redirectError(Paths.get("error.$className.log").toFile())
builder.inheritIO()
builder.directory(nodeConf.baseDirectory.toFile())
val process = builder.start()
return Futures.allAsList(
addressMustBeBound(executorService, nodeConf.artemisAddress),
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
// the handlers for the advertised services are not yet registered. A hacky workaround is that we wait for
// the web api address to be bound as well, as that starts after the services. Needs rethinking.
addressMustBeBound(executorService, nodeConf.webAddress)
).map { process }
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
// the handlers for the advertised services are not yet registered. Needs rethinking.
return addressMustBeBound(executorService, nodeConf.artemisAddress).map { process }
}
private fun startWebserver(
executorService: ScheduledExecutorService,
nodeConf: FullNodeConfiguration,
debugPort: Int?): ListenableFuture<Process> {
val className = "net.corda.node.Corda" // cannot directly get class for this, so just use string
val separator = System.getProperty("file.separator")
val classpath = System.getProperty("java.class.path")
val path = System.getProperty("java.home") + separator + "bin" + separator + "java"
val debugPortArg = if (debugPort != null)
listOf("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort")
else
emptyList()
val javaArgs = listOf(path) +
listOf("-Dname=node-${nodeConf.artemisAddress}-webserver") + debugPortArg +
listOf(
"-cp", classpath, className,
"--base-directory", nodeConf.baseDirectory.toString(),
"--webserver")
val builder = ProcessBuilder(javaArgs)
builder.redirectError(Paths.get("error.$className.log").toFile())
builder.inheritIO()
builder.directory(nodeConf.baseDirectory.toFile())
val process = builder.start()
return addressMustBeBound(executorService, nodeConf.webAddress).map { process }
}
}
}

View File

@ -1,92 +0,0 @@
package net.corda.node.internal
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.DealState
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.linearHeadsOfType
import net.corda.core.serialization.SerializedBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.node.api.*
import java.time.LocalDateTime
import javax.ws.rs.core.Response
class APIServerImpl(val node: AbstractNode) : APIServer {
override fun serverTime(): LocalDateTime = LocalDateTime.now(node.services.clock)
override fun status(): Response {
return if (node.started) {
Response.ok("started").build()
} else {
Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("not started").build()
}
}
override fun info() = node.services.myInfo
override fun queryStates(query: StatesQuery): List<StateRef> {
// We're going to hard code two options here for now and assume that all LinearStates are deals
// Would like to maybe move to a model where we take something like a JEXL string, although don't want to develop
// something we can't later implement against a persistent store (i.e. need to pick / build a query engine)
if (query is StatesQuery.Selection) {
if (query.criteria is StatesQuery.Criteria.AllDeals) {
val states = node.services.vaultService.linearHeads
return states.values.map { it.ref }
} else if (query.criteria is StatesQuery.Criteria.Deal) {
val states = node.services.vaultService.linearHeadsOfType<DealState>().filterValues {
it.state.data.ref == query.criteria.ref
}
return states.values.map { it.ref }
}
}
return emptyList()
}
override fun fetchStates(states: List<StateRef>): Map<StateRef, TransactionState<ContractState>?> {
return node.services.vaultService.statesForRefs(states)
}
override fun fetchTransactions(txs: List<SecureHash>): Map<SecureHash, SignedTransaction?> {
throw UnsupportedOperationException()
}
override fun buildTransaction(type: ContractDefRef, steps: List<TransactionBuildStep>): SerializedBytes<WireTransaction> {
throw UnsupportedOperationException()
}
override fun generateTransactionSignature(tx: SerializedBytes<WireTransaction>): DigitalSignature.WithKey {
throw UnsupportedOperationException()
}
override fun commitTransaction(tx: SerializedBytes<WireTransaction>, signatures: List<DigitalSignature.WithKey>): SecureHash {
throw UnsupportedOperationException()
}
override fun invokeFlowSync(type: FlowRef, args: Map<String, Any?>): Any? {
return invokeFlowAsync(type, args).get()
}
private fun invokeFlowAsync(type: FlowRef, args: Map<String, Any?>): ListenableFuture<out Any?> {
if (type is FlowClassRef) {
val flowLogicRef = node.services.flowLogicRefFactory.createKotlin(type.className, args)
val flowInstance = node.services.flowLogicRefFactory.toFlowLogic(flowLogicRef)
return node.services.startFlow(flowInstance).resultFuture
} else {
throw UnsupportedOperationException("Unsupported FlowRef type: $type")
}
}
override fun fetchFlowsRequiringAttention(query: StatesQuery): Map<StateRef, FlowRequiringAttention> {
throw UnsupportedOperationException()
}
override fun provideFlowResponse(flow: FlowInstanceRef, choice: SecureHash, args: Map<String, Any?>) {
throw UnsupportedOperationException()
}
}

View File

@ -24,7 +24,6 @@ import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.FinalityFlow
import net.corda.flows.sendRequest
import net.corda.node.api.APIServer
import net.corda.node.services.api.*
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
@ -52,6 +51,7 @@ import net.corda.node.utilities.databaseTransaction
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
import org.slf4j.Logger
import java.io.File
import java.nio.file.FileAlreadyExistsException
import java.nio.file.Path
import java.security.KeyPair
@ -104,11 +104,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
// low-performance prototyping period.
protected abstract val serverThread: AffinityExecutor
// Objects in this list will be scanned by the DataUploadServlet and can be handed new data via HTTP.
// Don't mutate this after startup.
protected val _servicesThatAcceptUploads = ArrayList<AcceptsFileUpload>()
val servicesThatAcceptUploads: List<AcceptsFileUpload> = _servicesThatAcceptUploads
private val flowFactories = ConcurrentHashMap<Class<*>, (Party) -> FlowLogic<*>>()
protected val partyKeys = mutableSetOf<KeyPair>()
@ -163,7 +158,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
lateinit var identity: IdentityService
lateinit var net: MessagingServiceInternal
lateinit var netMapCache: NetworkMapCache
lateinit var api: APIServer
lateinit var scheduler: NodeSchedulerService
lateinit var flowLogicFactory: FlowLogicRefFactory
lateinit var schemas: SchemaService
@ -219,7 +213,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
// the identity key. But the infrastructure to make that easy isn't here yet.
keyManagement = makeKeyManagementService()
api = APIServerImpl(this@AbstractNode)
flowLogicFactory = initialiseFlowLogicFactory()
scheduler = NodeSchedulerService(database, services, flowLogicFactory, unfinishedSchedules = busyNodeLatch)
@ -228,6 +221,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
customServices.clear()
customServices.addAll(buildPluginServices(tokenizableServices))
val uploaders: List<FileUploader> = listOf(storageServices.first.attachments as NodeAttachmentService) +
customServices.filterIsInstance(AcceptsFileUpload::class.java)
(storage as StorageServiceImpl).initUploaders(uploaders)
// TODO: uniquenessProvider creation should be inside makeNotaryService(), but notary service initialisation
// depends on smm, while smm depends on tokenizableServices, which uniquenessProvider is part of
advertisedServices.singleOrNull { it.type.isNotary() }?.let {
@ -275,16 +272,16 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
private fun makeInfo(): NodeInfo {
val services = makeServiceEntries()
val advertisedServiceEntries = makeServiceEntries()
val legalIdentity = obtainLegalIdentity()
return NodeInfo(net.myAddress, legalIdentity, services, findMyLocation())
return NodeInfo(net.myAddress, legalIdentity, advertisedServiceEntries, findMyLocation())
}
/**
* A service entry contains the advertised [ServiceInfo] along with the service identity. The identity *name* is
* taken from the configuration or, if non specified, generated by combining the node's legal name and the service id.
*/
protected fun makeServiceEntries(): List<ServiceEntry> {
protected open fun makeServiceEntries(): List<ServiceEntry> {
return advertisedServices.map {
val serviceId = it.type.id
val serviceName = it.name ?: "$serviceId|${configuration.myLegalName}"
@ -353,9 +350,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val service = serviceConstructor.apply(services)
serviceList.add(service)
tokenizableServices.add(service)
if (service is AcceptsFileUpload) {
_servicesThatAcceptUploads += service
}
}
return serviceList
}
@ -485,7 +479,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val attachments = makeAttachmentStorage(dir)
val checkpointStorage = DBCheckpointStorage()
val transactionStorage = DBTransactionStorage()
_servicesThatAcceptUploads += attachments
val stateMachineTransactionMappingStorage = DBTransactionMappingStorage()
return Pair(
constructStorageService(attachments, transactionStorage, stateMachineTransactionMappingStorage),

View File

@ -104,7 +104,14 @@ class CordaRPCOpsImpl(
override fun attachmentExists(id: SecureHash) = services.storageService.attachments.openAttachment(id) != null
override fun uploadAttachment(jar: InputStream) = services.storageService.attachments.importAttachment(jar)
override fun currentNodeTime(): Instant = Instant.now(services.clock)
override fun uploadFile(dataType: String, name: String?, file: InputStream): String {
val acceptor = services.storageService.uploaders.firstOrNull { it.accepts(dataType) }
return databaseTransaction(database) {
acceptor?.upload(file) ?: throw RuntimeException("Cannot find file upload acceptor for $dataType")
}
}
override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered.toObservable()
override fun partyFromKey(key: CompositeKey) = services.identityService.partyFromKey(key)
override fun partyFromName(name: String) = services.identityService.partyFromName(name)

View File

@ -6,13 +6,12 @@ import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.div
import net.corda.core.flatMap
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.getOrThrow
import net.corda.core.messaging.RPCOps
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.success
import net.corda.core.utilities.loggerFor
import net.corda.node.printBasicNodeInfo
import net.corda.node.serialization.NodeClock
@ -20,39 +19,19 @@ import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.servlets.AttachmentDownloadServlet
import net.corda.node.servlets.Config
import net.corda.node.servlets.DataUploadServlet
import net.corda.node.servlets.ResponseFilter
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.databaseTransaction
import org.eclipse.jetty.server.*
import org.eclipse.jetty.server.handler.HandlerCollection
import org.eclipse.jetty.servlet.DefaultServlet
import org.eclipse.jetty.servlet.FilterHolder
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
import org.eclipse.jetty.util.ssl.SslContextFactory
import org.eclipse.jetty.webapp.WebAppContext
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.server.ServerProperties
import org.glassfish.jersey.servlet.ServletContainer
import org.jetbrains.exposed.sql.Database
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.lang.reflect.InvocationTargetException
import java.net.InetAddress
import java.nio.channels.FileLock
import java.time.Clock
import java.util.*
import javax.management.ObjectName
import javax.servlet.*
import kotlin.concurrent.thread
@ -111,7 +90,6 @@ class Node(override val configuration: FullNodeConfiguration,
// serialisation/deserialisation work.
override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1)
lateinit var webServer: Server
var messageBroker: ArtemisMessagingServer? = null
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
@ -157,116 +135,6 @@ class Node(override val configuration: FullNodeConfiguration,
return networkMapConnection.flatMap { super.registerWithNetworkMap() }
}
// TODO: add flag to enable/disable webserver
private fun initWebServer(localRpc: CordaRPCOps): Server {
// Note that the web server handlers will all run concurrently, and not on the node thread.
val handlerCollection = HandlerCollection()
// Export JMX monitoring statistics and data over REST/JSON.
if (configuration.exportJMXto.split(',').contains("http")) {
val classpath = System.getProperty("java.class.path").split(System.getProperty("path.separator"))
val warpath = classpath.firstOrNull { it.contains("jolokia-agent-war-2") && it.endsWith(".war") }
if (warpath != null) {
handlerCollection.addHandler(WebAppContext().apply {
// Find the jolokia WAR file on the classpath.
contextPath = "/monitoring/json"
setInitParameter("mimeType", "application/json")
war = warpath
})
} else {
log.warn("Unable to locate Jolokia WAR on classpath")
}
}
// API, data upload and download to services (attachments, rates oracles etc)
handlerCollection.addHandler(buildServletContextHandler(localRpc))
val server = Server()
val connector = if (configuration.useHTTPS) {
val httpsConfiguration = HttpConfiguration()
httpsConfiguration.outputBufferSize = 32768
httpsConfiguration.addCustomizer(SecureRequestCustomizer())
val sslContextFactory = SslContextFactory()
sslContextFactory.keyStorePath = configuration.keyStoreFile.toString()
sslContextFactory.setKeyStorePassword(configuration.keyStorePassword)
sslContextFactory.setKeyManagerPassword(configuration.keyStorePassword)
sslContextFactory.setTrustStorePath(configuration.trustStoreFile.toString())
sslContextFactory.setTrustStorePassword(configuration.trustStorePassword)
sslContextFactory.setExcludeProtocols("SSL.*", "TLSv1", "TLSv1.1")
sslContextFactory.setIncludeProtocols("TLSv1.2")
sslContextFactory.setExcludeCipherSuites(".*NULL.*", ".*RC4.*", ".*MD5.*", ".*DES.*", ".*DSS.*")
sslContextFactory.setIncludeCipherSuites(".*AES.*GCM.*")
val sslConnector = ServerConnector(server, SslConnectionFactory(sslContextFactory, "http/1.1"), HttpConnectionFactory(httpsConfiguration))
sslConnector.port = configuration.webAddress.port
sslConnector
} else {
val httpConfiguration = HttpConfiguration()
httpConfiguration.outputBufferSize = 32768
val httpConnector = ServerConnector(server, HttpConnectionFactory(httpConfiguration))
httpConnector.port = configuration.webAddress.port
httpConnector
}
server.connectors = arrayOf<Connector>(connector)
server.handler = handlerCollection
runOnStop += Runnable { server.stop() }
server.start()
printBasicNodeInfo("Embedded web server is listening on", "http://${InetAddress.getLocalHost().hostAddress}:${connector.port}/")
return server
}
private fun buildServletContextHandler(localRpc: CordaRPCOps): ServletContextHandler {
return ServletContextHandler().apply {
contextPath = "/"
setAttribute("node", this@Node)
addServlet(DataUploadServlet::class.java, "/upload/*")
addServlet(AttachmentDownloadServlet::class.java, "/attachments/*")
val resourceConfig = ResourceConfig()
// Add your API provider classes (annotated for JAX-RS) here
resourceConfig.register(Config(services))
resourceConfig.register(ResponseFilter())
resourceConfig.register(api)
val webAPIsOnClasspath = pluginRegistries.flatMap { x -> x.webApis }
for (webapi in webAPIsOnClasspath) {
log.info("Add plugin web API from attachment $webapi")
val customAPI = try {
webapi.apply(localRpc)
} catch (ex: InvocationTargetException) {
log.error("Constructor $webapi threw an error: ", ex.targetException)
continue
}
resourceConfig.register(customAPI)
}
val staticDirMaps = pluginRegistries.map { x -> x.staticServeDirs }
val staticDirs = staticDirMaps.flatMap { it.keys }.zip(staticDirMaps.flatMap { it.values })
staticDirs.forEach {
val staticDir = ServletHolder(DefaultServlet::class.java)
staticDir.setInitParameter("resourceBase", it.second)
staticDir.setInitParameter("dirAllowed", "true")
staticDir.setInitParameter("pathInfoOnly", "true")
addServlet(staticDir, "/web/${it.first}/*")
}
// Give the app a slightly better name in JMX rather than a randomly generated one and enable JMX
resourceConfig.addProperties(mapOf(ServerProperties.APPLICATION_NAME to "node.api",
ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED to "true"))
val container = ServletContainer(resourceConfig)
val jerseyServlet = ServletHolder(container)
addServlet(jerseyServlet, "/api/*")
jerseyServlet.initOrder = 0 // Initialise at server start
// Wrap all API calls in a database transaction.
val filterHolder = FilterHolder(DatabaseTransactionFilter(database))
addFilter(filterHolder, "/api/*", EnumSet.of(DispatcherType.REQUEST))
addFilter(filterHolder, "/upload/*", EnumSet.of(DispatcherType.REQUEST))
}
}
override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider {
return when (type) {
RaftValidatingNotaryService.type -> with(configuration) {
@ -305,43 +173,27 @@ class Node(override val configuration: FullNodeConfiguration,
super.initialiseDatabasePersistence(insideTransaction)
}
private fun connectLocalRpcAsNodeUser(): CordaRPCOps {
val client = CordaRPCClient(configuration.artemisAddress, configuration)
client.start(NODE_USER, NODE_USER)
return client.proxy()
}
override fun start(): Node {
alreadyRunningNodeCheck()
super.start()
// Only start the service API requests once the network map registration is successfully complete
networkMapRegistrationFuture.success {
// This needs to be in a seperate thread so that we can reply to our own request to become RPC clients
thread(name = "WebServer") {
try {
webServer = initWebServer(connectLocalRpcAsNodeUser())
} catch(ex: Exception) {
// TODO: We need to decide if this is a fatal error, given the API is unavailable, or whether the API
// is not critical and we continue anyway.
log.error("Web server startup failed", ex)
}
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("net.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
}
// Only start the service API requests once the network map registration is complete
thread(name = "WebServer") {
networkMapRegistrationFuture.getOrThrow()
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("net.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
}
shutdownThread = thread(start = false) {
@ -420,6 +272,7 @@ class Node(override val configuration: FullNodeConfiguration,
chain.doFilter(request, response)
}
}
override fun init(filterConfig: FilterConfig?) {}
override fun destroy() {}
}

View File

@ -1,5 +1,7 @@
package net.corda.node.services.api
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.FileUploader
import java.io.InputStream
/**
@ -7,16 +9,12 @@ import java.io.InputStream
*
* TODO: In future, also accept uploads over the MQ interface too.
*/
interface AcceptsFileUpload {
interface AcceptsFileUpload: FileUploader {
/** A string that prefixes the URLs, e.g. "attachments" or "interest-rates". Should be OK for URLs. */
val dataTypePrefix: String
/** What file extensions are acceptable for the file to be handed to upload() */
val acceptableFileExtensions: List<String>
/**
* Accepts the data in the given input stream, and returns some sort of useful return message that will be sent
* back to the user in the response.
*/
fun upload(data: InputStream): String
override fun accepts(prefix: String) = prefix == dataTypePrefix
}

View File

@ -4,6 +4,8 @@ import com.google.common.net.HostAndPort
import net.corda.core.ThreadBox
import net.corda.core.logElapsedTime
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.minutes
import net.corda.core.seconds
import net.corda.core.utilities.loggerFor
import net.corda.node.services.config.SSLConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
@ -11,6 +13,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
import org.apache.activemq.artemis.api.core.client.ServerLocator
import rx.Observable
import java.io.Closeable
import java.time.Duration
@ -24,7 +27,7 @@ import javax.annotation.concurrent.ThreadSafe
* @param config If specified, the SSL configuration to use. If not specified, SSL will be disabled and the node will not be authenticated, nor will RPC traffic be encrypted.
*/
@ThreadSafe
class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration?) : Closeable, ArtemisMessagingComponent() {
class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration?, val serviceConfigurationOverride: (ServerLocator.() -> Unit)? = null) : Closeable, ArtemisMessagingComponent() {
private companion object {
val log = loggerFor<CordaRPCClient>()
}
@ -49,11 +52,15 @@ class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguratio
check(!running)
log.logElapsedTime("Startup") {
checkStorePasswords()
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port))
serverLocator.threadPoolMaxSize = 1
// TODO: Configure session reconnection, confirmation window sizes and other Artemis features.
// This will allow reconnection in case of server restart/network outages/IP address changes, etc.
// See http://activemq.apache.org/artemis/docs/1.5.0/client-reconnection.html
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port)).apply {
// TODO: Put these in config file or make it user configurable?
threadPoolMaxSize = 1
confirmationWindowSize = 100000 // a guess
retryInterval = 5.seconds.toMillis()
retryIntervalMultiplier = 1.5 // Exponential backoff
maxRetryInterval = 3.minutes.toMillis()
serviceConfigurationOverride?.invoke(this)
}
sessionFactory = serverLocator.createSessionFactory()
session = sessionFactory.createSession(username, password, false, true, true, serverLocator.isPreAcknowledge, serverLocator.ackBatchSize)
session.start()

View File

@ -15,10 +15,10 @@ import net.corda.node.services.RPCUserService
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.*
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
import org.apache.activemq.artemis.api.core.Message.*
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.bouncycastle.asn1.x500.X500Name
@ -67,6 +67,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// confusion.
const val TOPIC_PROPERTY = "platform-topic"
const val SESSION_ID_PROPERTY = "session-id"
val AMQ_DELAY = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0"))
}
private class InnerState {
@ -102,12 +103,12 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(
object : AbstractJDBCHashSet<UUID, Table>(Table, loadOnInit = true) {
override fun elementFromRow(row: ResultRow): UUID = row[table.uuid]
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry
}
})
object : AbstractJDBCHashSet<UUID, Table>(Table, loadOnInit = true) {
override fun elementFromRow(row: ResultRow): UUID = row[table.uuid]
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry
}
})
fun start(rpcOps: RPCOps, userService: RPCUserService) {
state.locked {
@ -368,6 +369,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
writeBodyBufferBytes(message.data)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString()))
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
if (AMQ_DELAY > 0 && message.topicSession.topic == StateMachineManager.sessionTopic.topic) {
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + AMQ_DELAY);
}
}
log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} " +
"uuid: ${message.uniqueMessageId}")
@ -376,6 +382,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
private fun getMQAddress(target: MessageRecipients): String {
return if (target == myAddress) {
// If we are sending to ourselves then route the message directly to our P2P queue.

View File

@ -28,7 +28,6 @@ import net.corda.core.node.services.*
import net.corda.core.serialization.*
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.flows.CashFlowResult
import net.corda.node.internal.AbstractNode
import net.corda.node.services.User
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
@ -36,6 +35,7 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAdd
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.commons.fileupload.MultipartStream
import org.objenesis.strategy.StdInstantiatorStrategy
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@ -146,6 +146,7 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(BufferedInputStream::class.java, InputStreamSerializer)
register(Class.forName("sun.net.www.protocol.jar.JarURLConnection\$JarURLInputStream"), InputStreamSerializer)
register(MultipartStream.ItemInputStream::class.java, InputStreamSerializer)
noReferencesWithin<WireTransaction>()
@ -190,8 +191,6 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(Cash.Clauses.ConserveAmount::class.java)
register(listOf(Unit).javaClass) // SingletonList
register(setOf(Unit).javaClass) // SingletonSet
register(CashFlowResult.Success::class.java)
register(CashFlowResult.Failed::class.java)
register(ServiceEntry::class.java)
register(NodeInfo::class.java)
register(PhysicalLocation::class.java)
@ -208,6 +207,7 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(SimpleString::class.java)
register(ServiceEntry::class.java)
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
register(RuntimeException::class.java)
register(IllegalArgumentException::class.java)
register(ArrayIndexOutOfBoundsException::class.java)
register(IndexOutOfBoundsException::class.java)

View File

@ -2,16 +2,17 @@ package net.corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PluginServiceHub
import net.corda.core.node.recordTransactions
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.*
import net.corda.core.node.CordaPluginRegistry
import java.io.InputStream
import javax.annotation.concurrent.ThreadSafe
import java.util.function.Function
import javax.annotation.concurrent.ThreadSafe
object DataVending {
@ -33,55 +34,40 @@ object DataVending {
*/
@ThreadSafe
class Service(services: PluginServiceHub) : SingletonSerializeAsToken() {
companion object {
val logger = loggerFor<DataVending.Service>()
}
init {
services.registerFlowInitiator(FetchTransactionsFlow::class, ::FetchTransactionsHandler)
services.registerFlowInitiator(FetchAttachmentsFlow::class, ::FetchAttachmentsHandler)
services.registerFlowInitiator(BroadcastTransactionFlow::class, ::NotifyTransactionHandler)
}
private class FetchTransactionsHandler(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
require(it.hashes.isNotEmpty())
it
}
val txs = request.hashes.map {
val tx = serviceHub.storageService.validatedTransactions.getTransaction(it)
if (tx == null)
logger.info("Got request for unknown tx $it")
tx
}
send(otherParty, txs)
private class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {
override fun getData(id: SecureHash): SignedTransaction? {
return serviceHub.storageService.validatedTransactions.getTransaction(id)
}
}
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
private class FetchAttachmentsHandler(val otherParty: Party) : FlowLogic<Unit>() {
private class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
override fun getData(id: SecureHash): ByteArray? {
return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes()
}
}
private abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
@Throws(FetchDataFlow.HashNotFound::class)
override fun call() {
val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
require(it.hashes.isNotEmpty())
if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
it
}
val attachments = request.hashes.map {
val jar: InputStream? = serviceHub.storageService.attachments.openAttachment(it)?.open()
if (jar == null) {
logger.info("Got request for unknown attachment $it")
null
} else {
jar.readBytes()
}
val response = request.hashes.map {
getData(it) ?: throw FetchDataFlow.HashNotFound(it)
}
send(otherParty, attachments)
send(otherParty, response)
}
protected abstract fun getData(id: SecureHash): T?
}

View File

@ -1,12 +1,15 @@
package net.corda.node.services.persistence
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage
import net.corda.core.node.services.TransactionStorage
import net.corda.core.node.services.TxWritableStorageService
import net.corda.core.node.services.*
import net.corda.core.serialization.SingletonSerializeAsToken
open class StorageServiceImpl(override val attachments: AttachmentStorage,
override val validatedTransactions: TransactionStorage,
override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage)
: SingletonSerializeAsToken(), TxWritableStorageService
: SingletonSerializeAsToken(), TxWritableStorageService {
lateinit override var uploaders: List<FileUploader>
fun initUploaders(uploadersList: List<FileUploader>) {
uploaders = uploadersList
}
}

View File

@ -30,7 +30,7 @@ import java.util.concurrent.ExecutionException
class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val logic: FlowLogic<R>,
scheduler: FiberScheduler) : Fiber<R>("flow", scheduler), FlowStateMachine<R> {
scheduler: FiberScheduler) : Fiber<Unit>("flow", scheduler), FlowStateMachine<R> {
companion object {
// Used to work around a small limitation in Quasar.
private val QUASAR_UNBLOCKER = run {
@ -49,7 +49,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Transient override lateinit var serviceHub: ServiceHubInternal
@Transient internal lateinit var database: Database
@Transient internal lateinit var actionOnSuspend: (FlowIORequest) -> Unit
@Transient internal lateinit var actionOnEnd: () -> Unit
@Transient internal lateinit var actionOnEnd: (Pair<FlowException, Boolean>?) -> Unit
@Transient internal var fromCheckpoint: Boolean = false
@Transient private var txTrampoline: Transaction? = null
@ -80,26 +80,35 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
override fun run(): R {
override fun run() {
createTransaction()
val result = try {
logic.call()
} catch (e: FlowException) {
// Check if the FlowException was propagated by looking at where the stack trace originates (see suspendAndExpectReceive).
val propagated = e.stackTrace[0].className == javaClass.name
actionOnEnd(Pair(e, propagated))
_resultFuture?.setException(e)
return
} catch (t: Throwable) {
actionOnEnd()
actionOnEnd(null)
_resultFuture?.setException(t)
throw ExecutionException(t)
}
// Only sessions which have a single send and nothing else will block here
openSessions.values
.filter { it.state is FlowSessionState.Initiating }
.forEach { it.waitForConfirmation() }
// This is to prevent actionOnEnd being called twice if it throws an exception
actionOnEnd()
actionOnEnd(null)
_resultFuture?.set(result)
return result
}
private fun createTransaction() {
// Make sure we have a database transaction
createDatabaseTransaction(database)
logger.trace { "Starting database transaction ${TransactionManager.currentOrNull()} on ${Strand.currentStrand()}." }
logger.trace { "Starting database transaction ${TransactionManager.currentOrNull()} on ${Strand.currentStrand()}" }
}
internal fun commitTransaction() {
@ -121,34 +130,48 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
otherParty: Party,
payload: Any,
sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
val (session, new) = getSession(otherParty, sessionFlow, payload)
val receivedSessionData = if (new) {
val session = getConfirmedSession(otherParty, sessionFlow)
return if (session == null) {
// Only do a receive here as the session init has carried the payload
receiveInternal<SessionData>(session)
receiveInternal<SessionData>(startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = true))
} else {
val sendSessionData = createSessionData(session, payload)
sendAndReceiveInternal<SessionData>(session, sendSessionData)
}
return receivedSessionData.checkPayloadIs(receiveType)
sendAndReceiveInternal<SessionData>(session, createSessionData(session, payload))
}.checkPayloadIs(receiveType)
}
@Suspendable
override fun <T : Any> receive(receiveType: Class<T>,
otherParty: Party,
sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
val session = getSession(otherParty, sessionFlow, null).first
val session = getConfirmedSession(otherParty, sessionFlow) ?: startNewSession(otherParty, sessionFlow, null, waitForConfirmation = true)
return receiveInternal<SessionData>(session).checkPayloadIs(receiveType)
}
@Suspendable
override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) {
val (session, new) = getSession(otherParty, sessionFlow, payload)
if (!new) {
val session = getConfirmedSession(otherParty, sessionFlow)
if (session == null) {
// Don't send the payload again if it was already piggy-backed on a session init
startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = false)
} else {
sendInternal(session, createSessionData(session, payload))
}
}
/**
* This method will suspend the state machine and wait for incoming session init response from other party.
*/
@Suspendable
private fun FlowSession.waitForConfirmation() {
val (peerParty, sessionInitResponse) = receiveInternal<SessionInitResponse>(this)
if (sessionInitResponse is SessionConfirm) {
state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId)
} else {
sessionInitResponse as SessionReject
throw FlowException("Party ${state.sendToParty} rejected session request: ${sessionInitResponse.errorMessage}")
}
}
private fun createSessionData(session: FlowSession, payload: Any): SessionData {
val sessionState = session.state
val peerSessionId = when (sessionState) {
@ -174,12 +197,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
private fun getSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?): Pair<FlowSession, Boolean> {
val session = openSessions[Pair(sessionFlow, otherParty)]
return if (session != null) {
Pair(session, false)
} else {
Pair(startNewSession(otherParty, sessionFlow, firstPayload), true)
private fun getConfirmedSession(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSession? {
return openSessions[Pair(sessionFlow, otherParty)]?.apply {
if (state is FlowSessionState.Initiating) {
// Session still initiating, try to retrieve the init response.
waitForConfirmation()
}
}
}
@ -190,24 +213,21 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
* multiple public keys, but we **don't support multiple nodes advertising the same legal identity**.
*/
@Suspendable
private fun startNewSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?): FlowSession {
private fun startNewSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?, waitForConfirmation: Boolean): FlowSession {
logger.trace { "Initiating a new session with $otherParty" }
val session = FlowSession(sessionFlow, random63BitValue(), FlowSessionState.Initiating(otherParty))
val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty))
openSessions[Pair(sessionFlow, otherParty)] = session
val counterpartyFlow = sessionFlow.getCounterpartyMarker(otherParty).name
val sessionInit = SessionInit(session.ourSessionId, counterpartyFlow, firstPayload)
val (peerParty, sessionInitResponse) = sendAndReceiveInternal<SessionInitResponse>(session, sessionInit)
if (sessionInitResponse is SessionConfirm) {
require(session.state is FlowSessionState.Initiating)
session.state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId)
return session
} else {
sessionInitResponse as SessionReject
throw FlowException("Party $otherParty rejected session request: ${sessionInitResponse.errorMessage}")
sendInternal(session, sessionInit)
if (waitForConfirmation) {
session.waitForConfirmation()
}
return session
}
@Suspendable
@Suppress("UNCHECKED_CAST", "PLATFORM_CLASS_MAPPED_TO_KOTLIN")
private fun <M : ExistingSessionMessage> suspendAndExpectReceive(receiveRequest: ReceiveRequest<M>): ReceivedSessionMessage<M> {
val session = receiveRequest.session
fun getReceivedMessage(): ReceivedSessionMessage<ExistingSessionMessage>? = session.receivedMessages.poll()
@ -224,19 +244,23 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
suspend(receiveRequest)
getReceivedMessage() ?:
throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but instead " +
"got nothing: $receiveRequest")
"got nothing for $receiveRequest")
}
if (receivedMessage.message is SessionEnd) {
openSessions.values.remove(session)
throw FlowException("Party ${session.state.sendToParty} has ended their flow but we were expecting to " +
"receive ${receiveRequest.receiveType.simpleName} from them")
} else if (receiveRequest.receiveType.isInstance(receivedMessage.message)) {
@Suppress("UNCHECKED_CAST")
if (receiveRequest.receiveType.isInstance(receivedMessage.message)) {
return receivedMessage as ReceivedSessionMessage<M>
} else if (receivedMessage.message is SessionEnd) {
openSessions.values.remove(session)
if (receivedMessage.message.errorResponse != null) {
(receivedMessage.message.errorResponse as java.lang.Throwable).fillInStackTrace()
throw receivedMessage.message.errorResponse
} else {
throw FlowSessionException("${session.state.sendToParty} has ended their flow but we were expecting " +
"to receive ${receiveRequest.receiveType.simpleName} from them")
}
} else {
throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but instead got " +
"${receivedMessage.message}: $receiveRequest")
"${receivedMessage.message} for $receiveRequest")
}
}

View File

@ -29,7 +29,7 @@ data class SessionData(override val recipientSessionId: Long, val payload: Any)
}
}
data class SessionEnd(override val recipientSessionId: Long) : ExistingSessionMessage
data class SessionEnd(override val recipientSessionId: Long, val errorResponse: FlowException?) : ExistingSessionMessage
data class ReceivedSessionMessage<out M : ExistingSessionMessage>(val sender: Party, val message: M)
@ -37,7 +37,9 @@ fun <T> ReceivedSessionMessage<SessionData>.checkPayloadIs(type: Class<T>): Untr
if (type.isInstance(message.payload)) {
return UntrustworthyData(type.cast(message.payload))
} else {
throw FlowException("We were expecting a ${type.name} from $sender but we instead got a " +
throw FlowSessionException("We were expecting a ${type.name} from $sender but we instead got a " +
"${message.payload.javaClass.name} (${message.payload})")
}
}
}
class FlowSessionException(message: String) : RuntimeException(message)

View File

@ -12,6 +12,7 @@ import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.Party
import net.corda.core.crypto.commonName
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId
@ -194,7 +195,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
checkpointStorage.forEach {
// If a flow is added before start() then don't attempt to restore it
if (!stateMachines.containsValue(it)) {
val fiber = deserializeFiber(it.serializedFiber)
val fiber = deserializeFiber(it)
initFiber(fiber)
stateMachines[fiber] = it
}
@ -256,7 +257,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
if (peerParty != null) {
if (message is SessionConfirm) {
logger.debug { "Received session confirmation but associated fiber has already terminated, so sending session end" }
sendSessionMessage(peerParty, SessionEnd(message.initiatedSessionId))
sendSessionMessage(peerParty, SessionEnd(message.initiatedSessionId, null))
} else {
logger.trace { "Ignoring session end message for already closed session: $message" }
}
@ -269,30 +270,44 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun onSessionInit(sessionInit: SessionInit, sender: Party) {
logger.trace { "Received $sessionInit $sender" }
val otherPartySessionId = sessionInit.initiatorSessionId
try {
val markerClass = Class.forName(sessionInit.flowName)
val flowFactory = serviceHub.getFlowFactory(markerClass)
if (flowFactory != null) {
val flow = flowFactory(sender)
val fiber = createFiber(flow)
val session = FlowSession(flow, random63BitValue(), FlowSessionState.Initiated(sender, otherPartySessionId))
if (sessionInit.firstPayload != null) {
session.receivedMessages += ReceivedSessionMessage(sender, SessionData(session.ourSessionId, sessionInit.firstPayload))
}
openSessions[session.ourSessionId] = session
fiber.openSessions[Pair(flow, sender)] = session
updateCheckpoint(fiber)
sendSessionMessage(sender, SessionConfirm(otherPartySessionId, session.ourSessionId), fiber)
fiber.logger.debug { "Initiated from $sessionInit on $session" }
startFiber(fiber)
} else {
logger.warn("Unknown flow marker class in $sessionInit")
sendSessionMessage(sender, SessionReject(otherPartySessionId, "Don't know ${markerClass.name}"))
}
fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(otherPartySessionId, message))
val markerClass = try {
Class.forName(sessionInit.flowName)
} catch (e: Exception) {
logger.warn("Received invalid $sessionInit", e)
sendSessionMessage(sender, SessionReject(otherPartySessionId, "Unable to establish session"))
sendSessionReject("Don't know ${sessionInit.flowName}")
return
}
val flowFactory = serviceHub.getFlowFactory(markerClass)
if (flowFactory == null) {
logger.warn("Unknown flow marker class in $sessionInit")
sendSessionReject("Don't know ${markerClass.name}")
return
}
val session = try {
val flow = flowFactory(sender)
val fiber = createFiber(flow)
val session = FlowSession(flow, random63BitValue(), sender, FlowSessionState.Initiated(sender, otherPartySessionId))
if (sessionInit.firstPayload != null) {
session.receivedMessages += ReceivedSessionMessage(sender, SessionData(session.ourSessionId, sessionInit.firstPayload))
}
openSessions[session.ourSessionId] = session
fiber.openSessions[Pair(flow, sender)] = session
updateCheckpoint(fiber)
session
} catch (e: Exception) {
logger.warn("Couldn't start session for $sessionInit", e)
sendSessionReject("Unable to establish session")
return
}
sendSessionMessage(sender, SessionConfirm(otherPartySessionId, session.ourSessionId), session.fiber)
session.fiber.logger.debug { "Initiated from $sessionInit on $session" }
startFiber(session.fiber)
}
private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes<FlowStateMachineImpl<*>> {
@ -302,11 +317,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
return fiber.serialize(kryo)
}
private fun deserializeFiber(serialisedFiber: SerializedBytes<FlowStateMachineImpl<*>>): FlowStateMachineImpl<*> {
private fun deserializeFiber(checkpoint: Checkpoint): FlowStateMachineImpl<*> {
val kryo = quasarKryo()
// put the map of token -> tokenized into the kryo context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
return serialisedFiber.deserialize(kryo).apply { fromCheckpoint = true }
return checkpoint.serializedFiber.deserialize(kryo).apply { fromCheckpoint = true }
}
private fun quasarKryo(): Kryo {
@ -330,14 +345,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
processIORequest(ioRequest)
decrementLiveFibers()
}
fiber.actionOnEnd = {
fiber.actionOnEnd = { errorResponse: Pair<FlowException, Boolean>? ->
try {
fiber.logic.progressTracker?.currentStep = ProgressTracker.DONE
mutex.locked {
stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) }
notifyChangeObservers(fiber, AddOrRemove.REMOVE)
}
endAllFiberSessions(fiber)
endAllFiberSessions(fiber, errorResponse)
} finally {
fiber.commitTransaction()
decrementLiveFibers()
@ -352,14 +367,13 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>) {
private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, errorResponse: Pair<FlowException, Boolean>?) {
// TODO Blanking the stack trace prevents the receiving flow from filling in its own stack trace
// @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
// (errorResponse?.first as java.lang.Throwable?)?.stackTrace = emptyArray()
openSessions.values.removeIf { session ->
if (session.fiber == fiber) {
val initiatedState = session.state as? FlowSessionState.Initiated
if (initiatedState != null) {
sendSessionMessage(initiatedState.peerParty, SessionEnd(initiatedState.peerSessionId), fiber)
recentlyClosedSessions[session.ourSessionId] = initiatedState.peerParty
}
session.endSession(errorResponse)
true
} else {
false
@ -367,6 +381,25 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun FlowSession.endSession(errorResponse: Pair<FlowException, Boolean>?) {
val initiatedState = state as? Initiated ?: return
val propagatedException = errorResponse?.let {
val (exception, propagated) = it
if (propagated) {
// This exception was propagated to us. We only propagate it down the invocation chain to the flow that
// initiated us, not to flows we've started sessions with.
if (initiatingParty != null) exception else null
} else {
exception // Our local flow threw the exception so propagate it
}
}
sendSessionMessage(
initiatedState.peerParty,
SessionEnd(initiatedState.peerSessionId, propagatedException),
fiber)
recentlyClosedSessions[ourSessionId] = initiatedState.peerParty
}
private fun startFiber(fiber: FlowStateMachineImpl<*>) {
try {
resumeFiber(fiber)
@ -483,6 +516,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
data class FlowSession(
val flow: FlowLogic<*>,
val ourSessionId: Long,
val initiatingParty: Party?,
var state: FlowSessionState,
@Volatile var waitingForResponse: Boolean = false
) {

View File

@ -0,0 +1,9 @@
package net.corda.node.utilities
import com.google.common.net.HostAndPort
import com.typesafe.config.Config
import java.nio.file.Path
import java.nio.file.Paths
fun Config.getHostAndPort(name: String): HostAndPort = HostAndPort.fromString(getString(name))
fun Config.getPath(name: String): Path = Paths.get(getString(name))

View File

@ -11,11 +11,12 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import net.corda.core.contracts.BusinessCalendar
import net.corda.core.crypto.*
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.i2p.crypto.eddsa.EdDSAPublicKey
import net.corda.core.node.services.IdentityService
import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
@ -23,9 +24,25 @@ import java.time.LocalDateTime
/**
* Utilities and serialisers for working with JSON representations of basic types. This adds Jackson support for
* the java.time API, some core types, and Kotlin data classes.
*
* TODO: This does not belong in node. It should be moved to the client module or a dedicated webserver module.
*/
object JsonSupport {
val javaTimeModule : Module by lazy {
interface PartyObjectMapper {
fun partyFromName(partyName: String): Party?
}
class RpcObjectMapper(val rpc: CordaRPCOps) : PartyObjectMapper, ObjectMapper() {
override fun partyFromName(partyName: String): Party? = rpc.partyFromName(partyName)
}
class IdentityObjectMapper(val identityService: IdentityService) : PartyObjectMapper, ObjectMapper(){
override fun partyFromName(partyName: String) = identityService.partyFromName(partyName)
}
class NoPartyObjectMapper: PartyObjectMapper, ObjectMapper() {
override fun partyFromName(partyName: String) = throw UnsupportedOperationException()
}
val javaTimeModule: Module by lazy {
SimpleModule("java.time").apply {
addSerializer(LocalDate::class.java, ToStringSerializer)
addDeserializer(LocalDate::class.java, LocalDateDeserializer)
@ -34,7 +51,7 @@ object JsonSupport {
}
}
val cordaModule : Module by lazy {
val cordaModule: Module by lazy {
SimpleModule("core").apply {
addSerializer(Party::class.java, PartySerializer)
addDeserializer(Party::class.java, PartyDeserializer)
@ -61,18 +78,24 @@ object JsonSupport {
}
}
fun createDefaultMapper(identities: IdentityService): ObjectMapper =
ServiceHubObjectMapper(identities).apply {
enable(SerializationFeature.INDENT_OUTPUT)
enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
/* Mapper requiring RPC support to deserialise parties from names */
fun createDefaultMapper(rpc: CordaRPCOps): ObjectMapper = configureMapper(RpcObjectMapper(rpc))
registerModule(javaTimeModule)
registerModule(cordaModule)
registerModule(KotlinModule())
}
/* For testing or situations where deserialising parties is not required */
fun createNonRpcMapper(): ObjectMapper = configureMapper(NoPartyObjectMapper())
class ServiceHubObjectMapper(val identities: IdentityService) : ObjectMapper()
/* For testing with an in memory identity service */
fun createInMemoryMapper(identityService: IdentityService) = configureMapper(IdentityObjectMapper(identityService))
private fun configureMapper(mapper: ObjectMapper): ObjectMapper = mapper.apply {
enable(SerializationFeature.INDENT_OUTPUT)
enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
registerModule(javaTimeModule)
registerModule(cordaModule)
registerModule(KotlinModule())
}
object ToStringSerializer : JsonSerializer<Any>() {
override fun serialize(obj: Any, generator: JsonGenerator, provider: SerializerProvider) {
@ -108,9 +131,10 @@ object JsonSupport {
if (parser.currentToken == JsonToken.FIELD_NAME) {
parser.nextToken()
}
val mapper = parser.codec as ServiceHubObjectMapper
val mapper = parser.codec as PartyObjectMapper
// TODO this needs to use some industry identifier(s) not just these human readable names
return mapper.identities.partyFromName(parser.text) ?: throw JsonParseException(parser, "Could not find a Party with name: ${parser.text}")
return mapper.partyFromName(parser.text) ?: throw JsonParseException(parser, "Could not find a Party with name ${parser.text}")
}
}

View File

@ -0,0 +1,179 @@
package net.corda.node.webserver
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.utilities.loggerFor
import net.corda.node.printBasicNodeInfo
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.webserver.internal.APIServerImpl
import net.corda.node.webserver.servlets.AttachmentDownloadServlet
import net.corda.node.webserver.servlets.DataUploadServlet
import net.corda.node.webserver.servlets.ObjectMapperConfig
import net.corda.node.webserver.servlets.ResponseFilter
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.eclipse.jetty.server.*
import org.eclipse.jetty.server.handler.HandlerCollection
import org.eclipse.jetty.servlet.DefaultServlet
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
import org.eclipse.jetty.util.ssl.SslContextFactory
import org.eclipse.jetty.webapp.WebAppContext
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.server.ServerProperties
import org.glassfish.jersey.servlet.ServletContainer
import java.lang.reflect.InvocationTargetException
import java.net.InetAddress
import java.util.*
// TODO: Split into a separate module under client that packages into WAR formats.
class WebServer(val config: FullNodeConfiguration) {
private companion object {
val log = loggerFor<WebServer>()
val retryDelay = 1000L // Milliseconds
}
val address = config.webAddress
private lateinit var server: Server
fun start() {
printBasicNodeInfo("Starting as webserver: ${config.webAddress}")
server = initWebServer(retryConnectLocalRpc())
}
fun run() {
while (server.isRunning) {
Thread.sleep(100) // TODO: Redesign
}
}
private fun initWebServer(localRpc: CordaRPCOps): Server {
// Note that the web server handlers will all run concurrently, and not on the node thread.
val handlerCollection = HandlerCollection()
// TODO: Move back into the node itself.
// Export JMX monitoring statistics and data over REST/JSON.
if (config.exportJMXto.split(',').contains("http")) {
val classpath = System.getProperty("java.class.path").split(System.getProperty("path.separator"))
val warpath = classpath.firstOrNull { it.contains("jolokia-agent-war-2") && it.endsWith(".war") }
if (warpath != null) {
handlerCollection.addHandler(WebAppContext().apply {
// Find the jolokia WAR file on the classpath.
contextPath = "/monitoring/json"
setInitParameter("mimeType", "application/json")
war = warpath
})
} else {
log.warn("Unable to locate Jolokia WAR on classpath")
}
}
// API, data upload and download to services (attachments, rates oracles etc)
handlerCollection.addHandler(buildServletContextHandler(localRpc))
val server = Server()
val connector = if (config.useHTTPS) {
val httpsConfiguration = HttpConfiguration()
httpsConfiguration.outputBufferSize = 32768
httpsConfiguration.addCustomizer(SecureRequestCustomizer())
val sslContextFactory = SslContextFactory()
sslContextFactory.keyStorePath = config.keyStoreFile.toString()
sslContextFactory.setKeyStorePassword(config.keyStorePassword)
sslContextFactory.setKeyManagerPassword(config.keyStorePassword)
sslContextFactory.setTrustStorePath(config.trustStoreFile.toString())
sslContextFactory.setTrustStorePassword(config.trustStorePassword)
sslContextFactory.setExcludeProtocols("SSL.*", "TLSv1", "TLSv1.1")
sslContextFactory.setIncludeProtocols("TLSv1.2")
sslContextFactory.setExcludeCipherSuites(".*NULL.*", ".*RC4.*", ".*MD5.*", ".*DES.*", ".*DSS.*")
sslContextFactory.setIncludeCipherSuites(".*AES.*GCM.*")
val sslConnector = ServerConnector(server, SslConnectionFactory(sslContextFactory, "http/1.1"), HttpConnectionFactory(httpsConfiguration))
sslConnector.port = address.port
sslConnector
} else {
val httpConfiguration = HttpConfiguration()
httpConfiguration.outputBufferSize = 32768
val httpConnector = ServerConnector(server, HttpConnectionFactory(httpConfiguration))
log.info("Starting webserver on address $address")
httpConnector.port = address.port
httpConnector
}
server.connectors = arrayOf<Connector>(connector)
server.handler = handlerCollection
//runOnStop += Runnable { server.stop() }
server.start()
log.info("Server started")
log.info("Embedded web server is listening on", "http://${InetAddress.getLocalHost().hostAddress}:${connector.port}/")
return server
}
private fun buildServletContextHandler(localRpc: CordaRPCOps): ServletContextHandler {
return ServletContextHandler().apply {
contextPath = "/"
setAttribute("rpc", localRpc)
addServlet(DataUploadServlet::class.java, "/upload/*")
addServlet(AttachmentDownloadServlet::class.java, "/attachments/*")
val resourceConfig = ResourceConfig()
resourceConfig.register(ObjectMapperConfig(localRpc))
resourceConfig.register(ResponseFilter())
resourceConfig.register(APIServerImpl(localRpc))
val webAPIsOnClasspath = pluginRegistries.flatMap { x -> x.webApis }
for (webapi in webAPIsOnClasspath) {
log.info("Add plugin web API from attachment $webapi")
val customAPI = try {
webapi.apply(localRpc)
} catch (ex: InvocationTargetException) {
log.error("Constructor $webapi threw an error: ", ex.targetException)
continue
}
resourceConfig.register(customAPI)
}
val staticDirMaps = pluginRegistries.map { x -> x.staticServeDirs }
val staticDirs = staticDirMaps.flatMap { it.keys }.zip(staticDirMaps.flatMap { it.values })
staticDirs.forEach {
val staticDir = ServletHolder(DefaultServlet::class.java)
staticDir.setInitParameter("resourceBase", it.second)
staticDir.setInitParameter("dirAllowed", "true")
staticDir.setInitParameter("pathInfoOnly", "true")
addServlet(staticDir, "/web/${it.first}/*")
}
// Give the app a slightly better name in JMX rather than a randomly generated one and enable JMX
resourceConfig.addProperties(mapOf(ServerProperties.APPLICATION_NAME to "node.api",
ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED to "true"))
val container = ServletContainer(resourceConfig)
val jerseyServlet = ServletHolder(container)
addServlet(jerseyServlet, "/api/*")
jerseyServlet.initOrder = 0 // Initialise at server start
}
}
private fun retryConnectLocalRpc(): CordaRPCOps {
while (true) {
try {
return connectLocalRpcAsNodeUser()
} catch (e: ActiveMQNotConnectedException) {
log.debug("Could not connect to ${config.artemisAddress} due to exception: ", e)
Thread.sleep(retryDelay)
}
}
}
private fun connectLocalRpcAsNodeUser(): CordaRPCOps {
log.info("Connecting to node at ${config.artemisAddress} as node user")
val client = CordaRPCClient(config.artemisAddress, config)
client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
return client.proxy()
}
/** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */
val pluginRegistries: List<CordaPluginRegistry> by lazy {
ServiceLoader.load(CordaPluginRegistry::class.java).toList()
}
}

View File

@ -0,0 +1,43 @@
package net.corda.node.webserver.api
import net.corda.core.node.NodeInfo
import java.time.LocalDateTime
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response
/**
* Top level interface to external interaction with the distributed ledger.
*
* Wherever a list is returned by a fetchXXX method that corresponds with an input list, that output list will have optional elements
* where a null indicates "missing" and the elements returned will be in the order corresponding with the input list.
*
*/
@Path("")
interface APIServer {
/**
* Report current UTC time as understood by the platform.
*/
@GET
@Path("servertime")
@Produces(MediaType.APPLICATION_JSON)
fun serverTime(): LocalDateTime
/**
* Report whether this node is started up or not.
*/
@GET
@Path("status")
@Produces(MediaType.TEXT_PLAIN)
fun status(): Response
/**
* Report this node's configuration and identities.
*/
@GET
@Path("info")
@Produces(MediaType.APPLICATION_JSON)
fun info(): NodeInfo
}

View File

@ -1,4 +1,4 @@
package net.corda.node.api
package net.corda.node.webserver.api
/**
* Extremely rudimentary query language which should most likely be replaced with a product.

View File

@ -0,0 +1,23 @@
package net.corda.node.webserver.internal
import net.corda.core.messaging.CordaRPCOps
import net.corda.node.webserver.api.*
import java.time.LocalDateTime
import java.time.ZoneId
import javax.ws.rs.core.Response
class APIServerImpl(val rpcOps: CordaRPCOps) : APIServer {
override fun serverTime(): LocalDateTime {
return LocalDateTime.ofInstant(rpcOps.currentNodeTime(), ZoneId.of("UTC"))
}
/**
* This endpoint is for polling if the webserver is serving. It will always return 200.
*/
override fun status(): Response {
return Response.ok("started").build()
}
override fun info() = rpcOps.nodeIdentity()
}

View File

@ -1,4 +1,4 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.StorageService

View File

@ -1,5 +1,6 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.Node
import net.corda.node.services.api.AcceptsFileUpload
@ -12,26 +13,19 @@ import javax.servlet.http.HttpServletResponse
/**
* Accepts binary streams, finds the right [AcceptsFileUpload] implementor and hands the stream off to it.
*/
class DataUploadServlet : HttpServlet() {
class DataUploadServlet: HttpServlet() {
private val log = loggerFor<DataUploadServlet>()
override fun doPost(req: HttpServletRequest, resp: HttpServletResponse) {
val node = servletContext.getAttribute("node") as Node
@Suppress("DEPRECATION") // Bogus warning due to superclass static method being deprecated.
val isMultipart = ServletFileUpload.isMultipartContent(req)
val rpc = servletContext.getAttribute("rpc") as CordaRPCOps
if (!isMultipart) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "This end point is for data uploads only.")
return
}
val acceptor: AcceptsFileUpload? = findAcceptor(node, req)
if (acceptor == null) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Got a file upload request for an unknown data type")
return
}
val upload = ServletFileUpload()
val iterator = upload.getItemIterator(req)
val messages = ArrayList<String>()
@ -43,18 +37,15 @@ class DataUploadServlet : HttpServlet() {
while (iterator.hasNext()) {
val item = iterator.next()
if (item.name != null && !acceptor.acceptableFileExtensions.any { item.name.endsWith(it) }) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
"${item.name}: Must be have a filename ending in one of: ${acceptor.acceptableFileExtensions}")
return
}
log.info("Receiving ${item.name}")
item.openStream().use {
val message = acceptor.upload(it)
log.info("${item.name} successfully accepted: $message")
messages += message
try {
val dataType = req.pathInfo.substring(1).substringBefore('/')
messages += rpc.uploadFile(dataType, item.name, item.openStream())
log.info("${item.name} successfully accepted: ${messages.last()}")
} catch(e: RuntimeException) {
println(e)
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Got a file upload request for an unknown data type")
}
}
@ -62,8 +53,4 @@ class DataUploadServlet : HttpServlet() {
val writer = resp.writer
messages.forEach { writer.println(it) }
}
private fun findAcceptor(node: Node, req: HttpServletRequest): AcceptsFileUpload? {
return node.servicesThatAcceptUploads.firstOrNull { req.pathInfo.substring(1).substringBefore('/') == it.dataTypePrefix }
}
}

View File

@ -1,6 +1,7 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import com.fasterxml.jackson.databind.ObjectMapper
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.ServiceHub
import net.corda.node.utilities.JsonSupport
import javax.ws.rs.ext.ContextResolver
@ -11,7 +12,7 @@ import javax.ws.rs.ext.Provider
* and to organise serializers / deserializers for java.time.* classes as necessary.
*/
@Provider
class Config(val services: ServiceHub) : ContextResolver<ObjectMapper> {
val defaultObjectMapper = JsonSupport.createDefaultMapper(services.identityService)
class ObjectMapperConfig(rpc: CordaRPCOps) : ContextResolver<ObjectMapper> {
val defaultObjectMapper = JsonSupport.createDefaultMapper(rpc)
override fun getContext(type: Class<*>) = defaultObjectMapper
}

View File

@ -1,4 +1,4 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import javax.ws.rs.container.ContainerRequestContext
import javax.ws.rs.container.ContainerResponseContext

View File

@ -17,7 +17,8 @@ class ArgsParserTest {
baseDirectory = workingDirectory,
configFile = workingDirectory / "node.conf",
help = false,
logToConsole = false))
logToConsole = false,
isWebserver = false))
}
@Test
@ -28,7 +29,8 @@ class ArgsParserTest {
baseDirectory = expectedBaseDir,
configFile = expectedBaseDir / "node.conf",
help = false,
logToConsole = false))
logToConsole = false,
isWebserver = false))
}
@Test
@ -39,7 +41,8 @@ class ArgsParserTest {
baseDirectory = baseDirectory,
configFile = baseDirectory / "node.conf",
help = false,
logToConsole = false))
logToConsole = false,
isWebserver = false))
}
@Test
@ -49,7 +52,8 @@ class ArgsParserTest {
baseDirectory = workingDirectory,
configFile = workingDirectory / "different.conf",
help = false,
logToConsole = false))
logToConsole = false,
isWebserver = false))
}
@Test
@ -60,7 +64,19 @@ class ArgsParserTest {
baseDirectory = workingDirectory,
configFile = configFile,
help = false,
logToConsole = false))
logToConsole = false,
isWebserver = false))
}
@Test
fun `just webserver `() {
val cmdLineOptions = parser.parse("--webserver")
assertThat(cmdLineOptions).isEqualTo(CmdLineOptions(
baseDirectory = workingDirectory,
configFile = workingDirectory / "node.conf",
help = false,
logToConsole = false,
isWebserver = true))
}
@Test

View File

@ -15,7 +15,7 @@ import kotlin.test.assertEquals
class JsonSupportTest {
companion object {
val mapper = JsonSupport.createDefaultMapper(MockIdentityService(mutableListOf()))
val mapper = JsonSupport.createNonRpcMapper()
}
@Property

View File

@ -14,11 +14,14 @@ import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.testing.node.MockNetwork
import net.i2p.crypto.eddsa.KeyPairGenerator
import org.junit.Before
import org.junit.Test
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.math.BigInteger
import java.security.KeyPair
import java.security.KeyPairGeneratorSpi
import java.util.jar.JarOutputStream
import java.util.zip.ZipEntry
import kotlin.test.assertEquals
@ -84,8 +87,10 @@ class AttachmentTests {
// Make a node that doesn't do sanity checking at load time.
val n0 = network.createNode(null, -1, object : MockNetwork.Factory {
override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?,
advertisedServices: Set<ServiceInfo>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, keyPair) {
advertisedServices: Set<ServiceInfo>, id: Int,
overrideServices: Map<ServiceInfo, KeyPair>?,
entropyRoot: BigInteger): MockNetwork.MockNode {
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
override fun start(): MockNetwork.MockNode {
super.start()
(storage.attachments as NodeAttachmentService).checkAttachmentsOnLoad = false

View File

@ -20,7 +20,6 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.core.utilities.LogHelper
import net.corda.core.utilities.TEST_TX_TIME
import net.corda.flows.TwoPartyTradeFlow.Buyer
@ -43,6 +42,7 @@ import org.junit.Test
import rx.Observable
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.math.BigInteger
import java.security.KeyPair
import java.util.*
import java.util.concurrent.Future
@ -60,9 +60,6 @@ import kotlin.test.assertTrue
*/
class TwoPartyTradeFlowTests {
lateinit var net: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
lateinit var aliceNode: MockNetwork.MockNode
lateinit var bobNode: MockNetwork.MockNode
@Before
fun before() {
@ -84,9 +81,9 @@ class TwoPartyTradeFlowTests {
net = MockNetwork(false, true)
ledger {
notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
aliceNode = net.createPartyNode(notaryNode.info.address, ALICE.name, ALICE_KEY)
bobNode = net.createPartyNode(notaryNode.info.address, BOB.name, BOB_KEY)
val notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name)
val aliceNode = net.createPartyNode(notaryNode.info.address, ALICE.name)
val bobNode = net.createPartyNode(notaryNode.info.address, BOB.name)
val aliceKey = aliceNode.services.legalIdentityKey
val notaryKey = notaryNode.services.notaryIdentityKey
@ -99,9 +96,10 @@ class TwoPartyTradeFlowTests {
val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null, notaryNode.info.notaryIdentity).second
insertFakeTransactions(alicesFakePaper, aliceNode, aliceKey, notaryKey)
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey, notaryKey)
val (bobStateMachine, aliceResult) = runBuyerAndSeller("alice's paper".outputStateAndRef())
val (bobStateMachine, aliceResult) = runBuyerAndSeller(notaryNode, aliceNode, bobNode,
"alice's paper".outputStateAndRef())
// TODO: Verify that the result was inserted into the transaction database.
// assertEquals(bobResult.get(), aliceNode.storage.validatedTransactions[aliceResult.get().id])
@ -124,9 +122,9 @@ class TwoPartyTradeFlowTests {
@Test
fun `shutdown and restore`() {
ledger {
notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
aliceNode = net.createPartyNode(notaryNode.info.address, ALICE.name, ALICE_KEY)
bobNode = net.createPartyNode(notaryNode.info.address, BOB.name, BOB_KEY)
val notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name)
val aliceNode = net.createPartyNode(notaryNode.info.address, ALICE.name)
var bobNode = net.createPartyNode(notaryNode.info.address, BOB.name)
aliceNode.disableDBCloseOnStop()
bobNode.disableDBCloseOnStop()
val aliceKey = aliceNode.services.legalIdentityKey
@ -142,8 +140,8 @@ class TwoPartyTradeFlowTests {
}
val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null, notaryNode.info.notaryIdentity).second
insertFakeTransactions(alicesFakePaper, aliceNode, aliceKey, notaryKey)
val aliceFuture = runBuyerAndSeller("alice's paper".outputStateAndRef()).sellerResult
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey, notaryKey)
val aliceFuture = runBuyerAndSeller(notaryNode, aliceNode, bobNode, "alice's paper".outputStateAndRef()).sellerResult
// Everything is on this thread so we can now step through the flow one step at a time.
// Seller Alice already sent a message to Buyer Bob. Pump once:
@ -179,10 +177,11 @@ class TwoPartyTradeFlowTests {
// that Bob was waiting on before the reboot occurred.
bobNode = net.createNode(networkMapAddr, bobAddr.id, object : MockNetwork.Factory {
override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?,
advertisedServices: Set<ServiceInfo>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
return MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, bobAddr.id, BOB_KEY)
advertisedServices: Set<ServiceInfo>, id: Int, overrideServices: Map<ServiceInfo, KeyPair>?,
entropyRoot: BigInteger): MockNetwork.MockNode {
return MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, bobAddr.id, overrideServices, entropyRoot)
}
}, true, BOB.name, BOB_KEY)
}, true, BOB.name)
// Find the future representing the result of this state machine again.
val bobFuture = bobNode.smm.findStateMachines(Buyer::class.java).single().second
@ -213,12 +212,16 @@ class TwoPartyTradeFlowTests {
// Creates a mock node with an overridden storage service that uses a RecordingMap, that lets us test the order
// of gets and puts.
private fun makeNodeWithTracking(networkMapAddr: SingleMessageRecipient?, name: String, keyPair: KeyPair): MockNetwork.MockNode {
private fun makeNodeWithTracking(networkMapAddr: SingleMessageRecipient?, name: String, overrideServices: Map<ServiceInfo, KeyPair>? = null): MockNetwork.MockNode {
// Create a node in the mock network ...
return net.createNode(networkMapAddr, -1, object : MockNetwork.Factory {
override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?,
advertisedServices: Set<ServiceInfo>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, keyPair) {
override fun create(config: NodeConfiguration,
network: MockNetwork,
networkMapAddr: SingleMessageRecipient?,
advertisedServices: Set<ServiceInfo>, id: Int,
overrideServices: Map<ServiceInfo, KeyPair>?,
entropyRoot: BigInteger): MockNetwork.MockNode {
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
// That constructs the storage service object in a customised way ...
override fun constructStorageService(
attachments: NodeAttachmentService,
@ -229,14 +232,14 @@ class TwoPartyTradeFlowTests {
}
}
}
}, true, name, keyPair)
}, true, name, overrideServices)
}
@Test
fun `check dependencies of sale asset are resolved`() {
notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name, ALICE_KEY)
bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name, BOB_KEY)
val notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name)
val aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name)
val bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name)
val aliceKey = aliceNode.services.legalIdentityKey
ledger(aliceNode.services) {
@ -250,15 +253,18 @@ class TwoPartyTradeFlowTests {
}
val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray()))
val bobsFakeCash = fillUpForBuyer(false, bobNode.keyManagement.freshKey().public.composite, notaryNode.info.notaryIdentity).second
val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode)
val extraKey = bobNode.keyManagement.freshKey()
val bobsFakeCash = fillUpForBuyer(false, extraKey.public.composite,
DUMMY_CASH_ISSUER.party,
notaryNode.info.notaryIdentity).second
val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode, notaryNode, bobNode.services.legalIdentityKey, extraKey)
val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID, notaryNode.info.notaryIdentity).second
val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode, aliceKey)
val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey)
net.runNetwork() // Clear network map registration messages
runBuyerAndSeller("alice's paper".outputStateAndRef())
runBuyerAndSeller(notaryNode, aliceNode, bobNode, "alice's paper".outputStateAndRef())
net.runNetwork()
@ -326,9 +332,9 @@ class TwoPartyTradeFlowTests {
@Test
fun `track() works`() {
notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name, ALICE_KEY)
bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name, BOB_KEY)
val notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name)
val aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name)
val bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name)
val aliceKey = aliceNode.services.legalIdentityKey
ledger(aliceNode.services) {
@ -342,11 +348,13 @@ class TwoPartyTradeFlowTests {
}
val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray()))
val bobsFakeCash = fillUpForBuyer(false, bobNode.keyManagement.freshKey().public.composite, notaryNode.info.notaryIdentity).second
insertFakeTransactions(bobsFakeCash, bobNode)
val bobsFakeCash = fillUpForBuyer(false, bobNode.keyManagement.freshKey().public.composite,
DUMMY_CASH_ISSUER.party,
notaryNode.info.notaryIdentity).second
insertFakeTransactions(bobsFakeCash, bobNode, notaryNode)
val alicesFakePaper = fillUpForSeller(false, aliceNode.info.legalIdentity.owningKey,
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID, notaryNode.info.notaryIdentity).second
insertFakeTransactions(alicesFakePaper, aliceNode, aliceKey)
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey)
net.runNetwork() // Clear network map registration messages
@ -356,7 +364,8 @@ class TwoPartyTradeFlowTests {
val aliceTxMappings = databaseTransaction(aliceNode.database) {
aliceMappingsStorage.track().second
}
val aliceSmId = runBuyerAndSeller("alice's paper".outputStateAndRef()).sellerId
val aliceSmId = runBuyerAndSeller(notaryNode, aliceNode, bobNode,
"alice's paper".outputStateAndRef()).sellerId
net.runNetwork()
@ -412,12 +421,15 @@ class TwoPartyTradeFlowTests {
val sellerId: StateMachineRunId
)
private fun runBuyerAndSeller(assetToSell: StateAndRef<OwnableState>): RunResult {
val buyerFuture = bobNode.initiateSingleShotFlow(Seller::class) { otherParty ->
private fun runBuyerAndSeller(notaryNode: MockNetwork.MockNode,
sellerNode: MockNetwork.MockNode,
buyerNode: MockNetwork.MockNode,
assetToSell: StateAndRef<OwnableState>): RunResult {
val buyerFuture = buyerNode.initiateSingleShotFlow(Seller::class) { otherParty ->
Buyer(otherParty, notaryNode.info.notaryIdentity, 1000.DOLLARS, CommercialPaper.State::class.java)
}.map { it.stateMachine }
val seller = Seller(bobNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, ALICE_KEY)
val sellerResultFuture = aliceNode.services.startFlow(seller).resultFuture
val seller = Seller(buyerNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, sellerNode.services.legalIdentityKey)
val sellerResultFuture = sellerNode.services.startFlow(seller).resultFuture
return RunResult(buyerFuture, sellerResultFuture, seller.stateMachine.id)
}
@ -426,23 +438,24 @@ class TwoPartyTradeFlowTests {
aliceError: Boolean,
expectedMessageSubstring: String
) {
notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
aliceNode = net.createPartyNode(notaryNode.info.address, ALICE.name, ALICE_KEY)
bobNode = net.createPartyNode(notaryNode.info.address, BOB.name, BOB_KEY)
val notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name)
val aliceNode = net.createPartyNode(notaryNode.info.address, ALICE.name)
val bobNode = net.createPartyNode(notaryNode.info.address, BOB.name)
val aliceKey = aliceNode.services.legalIdentityKey
val bobKey = bobNode.services.legalIdentityKey
val issuer = MEGA_CORP.ref(1, 2, 3)
val bobsBadCash = fillUpForBuyer(bobError, bobKey.public.composite, notaryNode.info.notaryIdentity).second
val bobsBadCash = fillUpForBuyer(bobError, bobKey.public.composite, DUMMY_CASH_ISSUER.party,
notaryNode.info.notaryIdentity).second
val alicesFakePaper = fillUpForSeller(aliceError, aliceNode.info.legalIdentity.owningKey,
1200.DOLLARS `issued by` issuer, null, notaryNode.info.notaryIdentity).second
insertFakeTransactions(bobsBadCash, bobNode, bobKey)
insertFakeTransactions(alicesFakePaper, aliceNode, aliceKey)
insertFakeTransactions(bobsBadCash, bobNode, notaryNode, bobKey)
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, aliceKey)
net.runNetwork() // Clear network map registration messages
val (bobStateMachine, aliceResult) = runBuyerAndSeller("alice's paper".outputStateAndRef())
val (bobStateMachine, aliceResult) = runBuyerAndSeller(notaryNode, aliceNode, bobNode, "alice's paper".outputStateAndRef())
net.runNetwork()
@ -461,9 +474,10 @@ class TwoPartyTradeFlowTests {
private fun insertFakeTransactions(
wtxToSign: List<WireTransaction>,
node: AbstractNode,
notaryNode: MockNetwork.MockNode,
vararg extraKeys: KeyPair): Map<SecureHash, SignedTransaction> {
val signed: List<SignedTransaction> = signAll(wtxToSign, extraKeys.toList() + notaryNode.services.notaryIdentityKey + DUMMY_CASH_ISSUER_KEY)
return databaseTransaction(node.database) {
val signed: List<SignedTransaction> = signAll(wtxToSign, extraKeys.toList() + DUMMY_CASH_ISSUER_KEY)
node.services.recordTransactions(signed)
val validatedTransactions = node.services.storageService.validatedTransactions
if (validatedTransactions is RecordingTransactionStorage) {
@ -475,20 +489,21 @@ class TwoPartyTradeFlowTests {
private fun LedgerDSL<TestTransactionDSLInterpreter, TestLedgerDSLInterpreter>.fillUpForBuyer(
withError: Boolean,
owner: CompositeKey = BOB_PUBKEY,
owner: CompositeKey,
issuer: Party,
notary: Party): Pair<Vault, List<WireTransaction>> {
val issuer = DUMMY_CASH_ISSUER
val interimOwnerKey = MEGA_CORP_PUBKEY
// Bob (Buyer) has some cash he got from the Bank of Elbonia, Alice (Seller) has some commercial paper she
// wants to sell to Bob.
val eb1 = transaction(transactionBuilder = TransactionBuilder(notary = notary)) {
// Issued money to itself.
output("elbonian money 1", notary = notary) { 800.DOLLARS.CASH `issued by` issuer `owned by` MEGA_CORP_PUBKEY }
output("elbonian money 2", notary = notary) { 1000.DOLLARS.CASH `issued by` issuer `owned by` MEGA_CORP_PUBKEY }
output("elbonian money 1", notary = notary) { 800.DOLLARS.CASH `issued by` issuer `owned by` interimOwnerKey }
output("elbonian money 2", notary = notary) { 1000.DOLLARS.CASH `issued by` issuer `owned by` interimOwnerKey }
if (!withError) {
command(DUMMY_CASH_ISSUER_KEY.public.composite) { Cash.Commands.Issue() }
command(issuer.owningKey) { Cash.Commands.Issue() }
} else {
// Put a broken command on so at least a signature is created
command(DUMMY_CASH_ISSUER_KEY.public.composite) { Cash.Commands.Move() }
command(issuer.owningKey) { Cash.Commands.Move() }
}
timestamp(TEST_TX_TIME)
if (withError) {
@ -502,15 +517,15 @@ class TwoPartyTradeFlowTests {
val bc1 = transaction(transactionBuilder = TransactionBuilder(notary = notary)) {
input("elbonian money 1")
output("bob cash 1", notary = notary) { 800.DOLLARS.CASH `issued by` issuer `owned by` owner }
command(MEGA_CORP_PUBKEY) { Cash.Commands.Move() }
command(interimOwnerKey) { Cash.Commands.Move() }
this.verifies()
}
val bc2 = transaction(transactionBuilder = TransactionBuilder(notary = notary)) {
input("elbonian money 2")
output("bob cash 2", notary = notary) { 300.DOLLARS.CASH `issued by` issuer `owned by` owner }
output(notary = notary) { 700.DOLLARS.CASH `issued by` issuer `owned by` MEGA_CORP_PUBKEY } // Change output.
command(MEGA_CORP_PUBKEY) { Cash.Commands.Move() }
output(notary = notary) { 700.DOLLARS.CASH `issued by` issuer `owned by` interimOwnerKey } // Change output.
command(interimOwnerKey) { Cash.Commands.Move() }
this.verifies()
}

View File

@ -1,7 +1,5 @@
package net.corda.node.services
import net.corda.core.crypto.composite
import net.corda.core.crypto.generateKeyPair
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.node.services.network.NetworkMapService
@ -9,6 +7,7 @@ import net.corda.node.utilities.databaseTransaction
import net.corda.testing.expect
import net.corda.testing.node.MockNetwork
import org.junit.Test
import java.math.BigInteger
import kotlin.test.assertEquals
class InMemoryNetworkMapCacheTest {
@ -24,19 +23,23 @@ class InMemoryNetworkMapCacheTest {
@Test
fun `key collision`() {
val keyPair = generateKeyPair()
val nodeA = network.createNode(null, -1, MockNetwork.DefaultFactory, true, "Node A", keyPair, ServiceInfo(NetworkMapService.type))
val nodeB = network.createNode(null, -1, MockNetwork.DefaultFactory, true, "Node B", keyPair, ServiceInfo(NetworkMapService.type))
val entropy = BigInteger.valueOf(24012017L)
val nodeA = network.createNode(null, -1, MockNetwork.DefaultFactory, true, "Node A", null, entropy, ServiceInfo(NetworkMapService.type))
val nodeB = network.createNode(null, -1, MockNetwork.DefaultFactory, true, "Node B", null, entropy, ServiceInfo(NetworkMapService.type))
assertEquals(nodeA.info.legalIdentity, nodeB.info.legalIdentity)
// Node A currently knows only about itself, so this returns node A
assertEquals(nodeA.netMapCache.getNodeByLegalIdentityKey(keyPair.public.composite), nodeA.info)
assertEquals(nodeA.netMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeA.info)
databaseTransaction(nodeA.database) {
nodeA.netMapCache.addNode(nodeB.info)
}
// Now both nodes match, so it throws an error
expect<IllegalStateException> {
nodeA.netMapCache.getNodeByLegalIdentityKey(keyPair.public.composite)
nodeA.netMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey)
}
expect<IllegalStateException> {
nodeA.netMapCache.getNodeByLegalIdentityKey(nodeB.info.legalIdentity.owningKey)
}
}
}

View File

@ -11,18 +11,16 @@ import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.flows.NotaryChangeFlow.Instigator
import net.corda.flows.StateReplacementException
import net.corda.flows.StateReplacementRefused
import net.corda.node.internal.AbstractNode
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.testing.node.MockNetwork
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Before
import org.junit.Test
import java.time.Instant
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class NotaryChangeTests {
@ -37,7 +35,6 @@ class NotaryChangeTests {
net = MockNetwork()
oldNotaryNode = net.createNode(
legalName = DUMMY_NOTARY.name,
keyPair = DUMMY_NOTARY_KEY,
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(SimpleNotaryService.type)))
clientNodeA = net.createNode(networkMapAddress = oldNotaryNode.info.address)
clientNodeB = net.createNode(networkMapAddress = oldNotaryNode.info.address)
@ -84,8 +81,9 @@ class NotaryChangeTests {
net.runNetwork()
val ex = assertFailsWith(StateReplacementException::class) { future.resultFuture.getOrThrow() }
assertThat(ex.error).isInstanceOf(StateReplacementRefused::class.java)
assertThatExceptionOfType(StateReplacementException::class.java).isThrownBy {
future.resultFuture.getOrThrow()
}
}
@Test

View File

@ -23,6 +23,7 @@ import net.corda.testing.node.MockNetwork
import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
import org.junit.Test
import java.security.KeyPair
import java.time.Instant
import java.util.*
import kotlin.test.assertEquals
@ -32,14 +33,15 @@ class NotaryServiceTests {
lateinit var net: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
lateinit var clientNode: MockNetwork.MockNode
lateinit var clientKeyPair: KeyPair
@Before fun setup() {
net = MockNetwork()
notaryNode = net.createNode(
legalName = DUMMY_NOTARY.name,
keyPair = DUMMY_NOTARY_KEY,
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(SimpleNotaryService.type)))
clientNode = net.createNode(networkMapAddress = notaryNode.info.address, keyPair = MINI_CORP_KEY)
clientNode = net.createNode(networkMapAddress = notaryNode.info.address)
clientKeyPair = clientNode.keyManagement.toKeyPair(clientNode.info.legalIdentity.owningKey.keys.single())
net.runNetwork() // Clear network map registration messages
}
@ -48,7 +50,7 @@ class NotaryServiceTests {
val inputState = issueState(clientNode)
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
tx.setTime(Instant.now(), 30.seconds)
tx.signWith(clientNode.keyPair!!)
tx.signWith(clientKeyPair)
tx.toSignedTransaction(false)
}
@ -61,7 +63,7 @@ class NotaryServiceTests {
val stx = run {
val inputState = issueState(clientNode)
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
tx.signWith(clientNode.keyPair!!)
tx.signWith(clientKeyPair)
tx.toSignedTransaction(false)
}
@ -75,7 +77,7 @@ class NotaryServiceTests {
val inputState = issueState(clientNode)
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
tx.setTime(Instant.now().plusSeconds(3600), 30.seconds)
tx.signWith(clientNode.keyPair!!)
tx.signWith(clientKeyPair)
tx.toSignedTransaction(false)
}
@ -89,7 +91,7 @@ class NotaryServiceTests {
val stx = run {
val inputState = issueState(clientNode)
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
tx.signWith(clientNode.keyPair!!)
tx.signWith(clientKeyPair)
tx.toSignedTransaction(false)
}
@ -107,13 +109,13 @@ class NotaryServiceTests {
val inputState = issueState(clientNode)
val stx = run {
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
tx.signWith(clientNode.keyPair!!)
tx.signWith(clientKeyPair)
tx.toSignedTransaction(false)
}
val stx2 = run {
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
tx.addInputState(issueState(clientNode))
tx.signWith(clientNode.keyPair!!)
tx.signWith(clientKeyPair)
tx.toSignedTransaction(false)
}

View File

@ -14,7 +14,9 @@ import net.corda.testing.node.MockNetwork
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.math.BigInteger
import java.security.KeyPair
import java.security.KeyPairGeneratorSpi
/**
* This class mirrors [InMemoryNetworkMapServiceTest] but switches in a [PersistentNetworkMapService] and
@ -55,8 +57,10 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
private object NodeFactory : MockNetwork.Factory {
override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?,
advertisedServices: Set<ServiceInfo>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, keyPair) {
advertisedServices: Set<ServiceInfo>, id: Int,
overrideServices: Map<ServiceInfo, KeyPair>?,
entropyRoot: BigInteger): MockNetwork.MockNode {
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
override fun makeNetworkMapService() {
inNodeNetworkMapService = SwizzleNetworkMapService(services)

View File

@ -98,7 +98,6 @@ class ScheduledFlowTests {
net = MockNetwork(threadPerNode = true)
notaryNode = net.createNode(
legalName = DUMMY_NOTARY.name,
keyPair = DUMMY_NOTARY_KEY,
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type)))
nodeA = net.createNode(notaryNode.info.address, start = false)
nodeB = net.createNode(notaryNode.info.address, start = false)

View File

@ -34,10 +34,9 @@ class ValidatingNotaryServiceTests {
net = MockNetwork()
notaryNode = net.createNode(
legalName = DUMMY_NOTARY.name,
keyPair = DUMMY_NOTARY_KEY,
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type))
)
clientNode = net.createNode(networkMapAddress = notaryNode.info.address, keyPair = MINI_CORP_KEY)
clientNode = net.createNode(networkMapAddress = notaryNode.info.address)
net.runNetwork() // Clear network map registration messages
}
@ -45,7 +44,8 @@ class ValidatingNotaryServiceTests {
val stx = run {
val inputState = issueInvalidState(clientNode, notaryNode.info.notaryIdentity)
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
tx.signWith(clientNode.keyPair!!)
val keyPair = clientNode.services.keyManagementService.toKeyPair(clientNode.info.legalIdentity.owningKey.keys.single())
tx.signWith(keyPair)
tx.toSignedTransaction(false)
}
@ -62,7 +62,8 @@ class ValidatingNotaryServiceTests {
val command = Command(DummyContract.Commands.Move(), expectedMissingKey)
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState, command)
tx.signWith(clientNode.keyPair!!)
val keyPair = clientNode.services.keyManagementService.toKeyPair(clientNode.info.legalIdentity.owningKey.keys.single())
tx.signWith(keyPair)
tx.toSignedTransaction(false)
}

View File

@ -13,6 +13,8 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.getOrThrow
import net.corda.core.map
import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.random63BitValue
import net.corda.core.rootCause
import net.corda.core.serialization.OpaqueBytes
@ -21,6 +23,7 @@ import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.NotaryFlow
import net.corda.node.services.persistence.checkpoints
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.expect
import net.corda.testing.expectEvents
@ -31,7 +34,8 @@ import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import net.corda.testing.sequence
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -55,10 +59,12 @@ class StateMachineManagerTests {
node1 = nodes.first
node2 = nodes.second
val notaryKeyPair = generateKeyPair()
val notaryService = ServiceInfo(ValidatingNotaryService.type, "notary-service-2000")
val overrideServices = mapOf(Pair(notaryService, notaryKeyPair))
// Note that these notaries don't operate correctly as they don't share their state. They are only used for testing
// service addressing.
notary1 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, keyPair = notaryKeyPair, serviceName = "notary-service-2000")
notary2 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, keyPair = notaryKeyPair, serviceName = "notary-service-2000")
notary1 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, overrideServices = overrideServices, serviceName = "notary-service-2000")
notary2 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, overrideServices = overrideServices, serviceName = "notary-service-2000")
net.messagingNetwork.receivedMessages.toSessionTransfers().forEach { sessionTransfers += it }
net.runNetwork()
@ -244,14 +250,14 @@ class StateMachineManagerTests {
assertSessionTransfers(node2,
node1 sent sessionInit(SendFlow::class, payload) to node2,
node2 sent sessionConfirm to node1,
node1 sent sessionEnd to node2
node1 sent sessionEnd() to node2
//There's no session end from the other flows as they're manually suspended
)
assertSessionTransfers(node3,
node1 sent sessionInit(SendFlow::class, payload) to node3,
node3 sent sessionConfirm to node1,
node1 sent sessionEnd to node3
node1 sent sessionEnd() to node3
//There's no session end from the other flows as they're manually suspended
)
@ -278,14 +284,14 @@ class StateMachineManagerTests {
node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm to node1,
node2 sent sessionData(node2Payload) to node1,
node2 sent sessionEnd to node1
node2 sent sessionEnd() to node1
)
assertSessionTransfers(node3,
node1 sent sessionInit(ReceiveFlow::class) to node3,
node3 sent sessionConfirm to node1,
node3 sent sessionData(node3Payload) to node1,
node3 sent sessionEnd to node1
node3 sent sessionEnd() to node1
)
}
@ -301,7 +307,7 @@ class StateMachineManagerTests {
node2 sent sessionData(20L) to node1,
node1 sent sessionData(11L) to node2,
node2 sent sessionData(21L) to node1,
node1 sent sessionEnd to node2
node1 sent sessionEnd() to node2
)
}
@ -321,6 +327,8 @@ class StateMachineManagerTests {
net.runNetwork()
}
val endpoint = net.messagingNetwork.endpoint(notary1.net.myAddress as InMemoryMessagingNetwork.PeerHandle)!!
val party1Info = notary1.services.networkMapCache.getPartyInfo(notary1.info.notaryIdentity)!!
assert(party1Info is PartyInfo.Service)
val notary1Address: MessageRecipients = endpoint.getAddressOfParty(notary1.services.networkMapCache.getPartyInfo(notary1.info.notaryIdentity)!!)
assert(notary1Address is InMemoryMessagingNetwork.ServiceHandle)
assertEquals(notary1Address, endpoint.getAddressOfParty(notary2.services.networkMapCache.getPartyInfo(notary2.info.notaryIdentity)!!))
@ -361,18 +369,119 @@ class StateMachineManagerTests {
}
@Test
fun `exception thrown on other side`() {
val erroringFiber = node2.initiateSingleShotFlow(ReceiveFlow::class) { ExceptionFlow }.map { it.stateMachine as FlowStateMachineImpl }
fun `FlowException thrown on other side`() {
val erroringFlowFuture = node2.initiateSingleShotFlow(ReceiveFlow::class) {
ExceptionFlow { MyFlowException("Nothing useful") }
}
val receivingFiber = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)) as FlowStateMachineImpl
net.runNetwork()
assertThatThrownBy { receivingFiber.resultFuture.getOrThrow() }.isInstanceOf(FlowException::class.java)
assertThatExceptionOfType(MyFlowException::class.java)
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
.withMessage("Nothing useful")
.withStackTraceContaining("ReceiveFlow") // Make sure the stack trace is that of the receiving flow
databaseTransaction(node2.database) {
assertThat(node2.checkpointStorage.checkpoints()).isEmpty()
}
val errorFlow = erroringFlowFuture.getOrThrow()
assertThat(receivingFiber.isTerminated).isTrue()
assertThat(erroringFiber.getOrThrow().isTerminated).isTrue()
assertThat((errorFlow.stateMachine as FlowStateMachineImpl).isTerminated).isTrue()
assertSessionTransfers(
node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm to node1,
node2 sent sessionEnd to node1
node2 sent sessionEnd(errorFlow.exceptionThrown) to node1
)
// TODO see StateMachineManager.endAllFiberSessions
// // Make sure the original stack trace isn't sent down the wire
// assertThat((sessionTransfers.last().message as SessionEnd).errorResponse!!.stackTrace).isEmpty()
}
@Test
fun `FlowException propagated in invocation chain`() {
val node3 = net.createNode(node1.info.address)
net.runNetwork()
node3.initiateSingleShotFlow(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Chain") } }
node2.initiateSingleShotFlow(ReceiveFlow::class) { ReceiveFlow(node3.info.legalIdentity) }
val receivingFiber = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity))
net.runNetwork()
assertThatExceptionOfType(MyFlowException::class.java)
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
.withMessage("Chain")
}
private class SendAndReceiveFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
sendAndReceive<Any>(otherParty, payload)
}
}
@Test
fun `FlowException thrown and there is a 3rd unrelated party flow`() {
val node3 = net.createNode(node1.info.address)
net.runNetwork()
// Node 2 will send its payload and then block waiting for the receive from node 1. Meanwhile node 1 will move
// onto node 3 which will throw the exception
val node2Fiber = node2
.initiateSingleShotFlow(ReceiveFlow::class) { SendAndReceiveFlow(it, "Hello") }
.map { it.stateMachine }
node3.initiateSingleShotFlow(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Nothing useful") } }
val node1Fiber = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity, node3.info.legalIdentity)) as FlowStateMachineImpl
net.runNetwork()
// Node 1 will terminate with the error it received from node 3 but it won't propagate that to node 2 (as it's
// not relevant to it) but it will end its session with it
assertThatExceptionOfType(MyFlowException::class.java).isThrownBy {
node1Fiber.resultFuture.getOrThrow()
}
val node2ResultFuture = node2Fiber.getOrThrow().resultFuture
assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy {
node2ResultFuture.getOrThrow()
}
assertSessionTransfers(node2,
node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm to node1,
node2 sent sessionData("Hello") to node1,
node1 sent sessionEnd() to node2 // Unexpected session-end
)
}
private class ConditionalExceptionFlow(val otherParty: Party, val sendPayload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val throwException = receive<Boolean>(otherParty).unwrap { it }
if (throwException) {
throw MyFlowException("Throwing exception as requested")
}
send(otherParty, sendPayload)
}
}
@Test
fun `retry subFlow due to receiving FlowException`() {
class AskForExceptionFlow(val otherParty: Party, val throwException: Boolean) : FlowLogic<String>() {
@Suspendable
override fun call(): String = sendAndReceive<String>(otherParty, throwException).unwrap { it }
}
class RetryOnExceptionFlow(val otherParty: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
return try {
subFlow(AskForExceptionFlow(otherParty, throwException = true))
} catch (e: MyFlowException) {
subFlow(AskForExceptionFlow(otherParty, throwException = false))
}
}
}
node2.services.registerFlowInitiator(AskForExceptionFlow::class) { ConditionalExceptionFlow(it, "Hello") }
val resultFuture = node1.services.startFlow(RetryOnExceptionFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello")
}
private inline fun <reified P : FlowLogic<*>> MockNode.restartAndGetRestoredFlow(
@ -396,15 +505,16 @@ class StateMachineManagerTests {
private fun sessionData(payload: Any) = SessionData(0, payload)
private val sessionEnd = SessionEnd(0)
private fun sessionEnd(error: FlowException? = null) = SessionEnd(0, error)
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
assertThat(sessionTransfers).containsExactly(*expected)
}
private fun assertSessionTransfers(node: MockNode, vararg expected: SessionTransfer) {
private fun assertSessionTransfers(node: MockNode, vararg expected: SessionTransfer): List<SessionTransfer> {
val actualForNode = sessionTransfers.filter { it.from == node.id || it.to == node.net.myAddress }
assertThat(actualForNode).containsExactly(*expected)
return actualForNode
}
private data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) {
@ -433,7 +543,6 @@ class StateMachineManagerTests {
private infix fun MockNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(id, message)
private infix fun Pair<Int, SessionMessage>.to(node: MockNode): SessionTransfer = SessionTransfer(first, second, node.net.myAddress)
private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>() {
@Transient var flowStarted = false
@ -491,7 +600,16 @@ class StateMachineManagerTests {
}
}
private object ExceptionFlow : FlowLogic<Nothing>() {
override fun call(): Nothing = throw Exception()
private class ExceptionFlow<E : Exception>(val exception: () -> E) : FlowLogic<Nothing>() {
lateinit var exceptionThrown: E
override fun call(): Nothing {
exceptionThrown = exception()
throw exceptionThrown
}
}
private class MyFlowException(message: String) : FlowException(message) {
override fun equals(other: Any?): Boolean = other is MyFlowException && other.message == this.message
override fun hashCode(): Int = message?.hashCode() ?: 31
}
}