mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
Merge pull request #1650 from corda/aslemmer-rpc-delay-arg-deserialisation
Delay RPC arguments deserialisation to allow routing of errors
This commit is contained in:
@ -7,6 +7,7 @@ import net.corda.core.internal.concurrent.fork
|
|||||||
import net.corda.core.internal.concurrent.transpose
|
import net.corda.core.internal.concurrent.transpose
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
import net.corda.core.serialization.SerializationDefaults
|
import net.corda.core.serialization.SerializationDefaults
|
||||||
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.*
|
||||||
import net.corda.node.services.messaging.RPCServerConfiguration
|
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||||
import net.corda.nodeapi.RPCApi
|
import net.corda.nodeapi.RPCApi
|
||||||
@ -315,9 +316,9 @@ class RPCStabilityTests {
|
|||||||
clientAddress = SimpleString(myQueue),
|
clientAddress = SimpleString(myQueue),
|
||||||
id = RPCApi.RpcRequestId(random63BitValue()),
|
id = RPCApi.RpcRequestId(random63BitValue()),
|
||||||
methodName = SlowConsumerRPCOps::streamAtInterval.name,
|
methodName = SlowConsumerRPCOps::streamAtInterval.name,
|
||||||
arguments = listOf(10.millis, 123456)
|
serialisedArguments = listOf(10.millis, 123456).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT).bytes
|
||||||
)
|
)
|
||||||
request.writeToClientMessage(SerializationDefaults.RPC_SERVER_CONTEXT, message)
|
request.writeToClientMessage(message)
|
||||||
producer.send(message)
|
producer.send(message)
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@ import net.corda.core.internal.LifeCycle
|
|||||||
import net.corda.core.internal.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
import net.corda.core.serialization.SerializationContext
|
import net.corda.core.serialization.SerializationContext
|
||||||
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.Try
|
import net.corda.core.utilities.Try
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
@ -208,11 +209,12 @@ class RPCClientProxyHandler(
|
|||||||
val rpcId = RPCApi.RpcRequestId(random63BitValue())
|
val rpcId = RPCApi.RpcRequestId(random63BitValue())
|
||||||
callSiteMap?.set(rpcId.toLong, Throwable("<Call site of root RPC '${method.name}'>"))
|
callSiteMap?.set(rpcId.toLong, Throwable("<Call site of root RPC '${method.name}'>"))
|
||||||
try {
|
try {
|
||||||
val request = RPCApi.ClientToServer.RpcRequest(clientAddress, rpcId, method.name, arguments?.toList() ?: emptyList())
|
val serialisedArguments = (arguments?.toList() ?: emptyList()).serialize(context = serializationContextWithObservableContext)
|
||||||
|
val request = RPCApi.ClientToServer.RpcRequest(clientAddress, rpcId, method.name, serialisedArguments.bytes)
|
||||||
val replyFuture = SettableFuture.create<Any>()
|
val replyFuture = SettableFuture.create<Any>()
|
||||||
sessionAndProducerPool.run {
|
sessionAndProducerPool.run {
|
||||||
val message = it.session.createMessage(false)
|
val message = it.session.createMessage(false)
|
||||||
request.writeToClientMessage(serializationContextWithObservableContext, message)
|
request.writeToClientMessage(message)
|
||||||
|
|
||||||
log.debug {
|
log.debug {
|
||||||
val argumentsString = arguments?.joinToString() ?: ""
|
val argumentsString = arguments?.joinToString() ?: ""
|
||||||
|
@ -546,7 +546,6 @@ public class FlowCookbookJava {
|
|||||||
// DOCSTART 37
|
// DOCSTART 37
|
||||||
twiceSignedTx.checkSignaturesAreValid();
|
twiceSignedTx.checkSignaturesAreValid();
|
||||||
// DOCEND 37
|
// DOCEND 37
|
||||||
|
|
||||||
} catch (GeneralSecurityException e) {
|
} catch (GeneralSecurityException e) {
|
||||||
// Handle this as required.
|
// Handle this as required.
|
||||||
}
|
}
|
||||||
|
@ -97,20 +97,20 @@ object RPCApi {
|
|||||||
* @param clientAddress return address to contact the client at.
|
* @param clientAddress return address to contact the client at.
|
||||||
* @param id a unique ID for the request, which the server will use to identify its response with.
|
* @param id a unique ID for the request, which the server will use to identify its response with.
|
||||||
* @param methodName name of the method (procedure) to be called.
|
* @param methodName name of the method (procedure) to be called.
|
||||||
* @param arguments arguments to pass to the method, if any.
|
* @param serialisedArguments Serialised arguments to pass to the method, if any.
|
||||||
*/
|
*/
|
||||||
data class RpcRequest(
|
data class RpcRequest(
|
||||||
val clientAddress: SimpleString,
|
val clientAddress: SimpleString,
|
||||||
val id: RpcRequestId,
|
val id: RpcRequestId,
|
||||||
val methodName: String,
|
val methodName: String,
|
||||||
val arguments: List<Any?>
|
val serialisedArguments: ByteArray
|
||||||
) : ClientToServer() {
|
) : ClientToServer() {
|
||||||
fun writeToClientMessage(context: SerializationContext, message: ClientMessage) {
|
fun writeToClientMessage(message: ClientMessage) {
|
||||||
MessageUtil.setJMSReplyTo(message, clientAddress)
|
MessageUtil.setJMSReplyTo(message, clientAddress)
|
||||||
message.putIntProperty(TAG_FIELD_NAME, Tag.RPC_REQUEST.ordinal)
|
message.putIntProperty(TAG_FIELD_NAME, Tag.RPC_REQUEST.ordinal)
|
||||||
message.putLongProperty(RPC_ID_FIELD_NAME, id.toLong)
|
message.putLongProperty(RPC_ID_FIELD_NAME, id.toLong)
|
||||||
message.putStringProperty(METHOD_NAME_FIELD_NAME, methodName)
|
message.putStringProperty(METHOD_NAME_FIELD_NAME, methodName)
|
||||||
message.bodyBuffer.writeBytes(arguments.serialize(context = context).bytes)
|
message.bodyBuffer.writeBytes(serialisedArguments)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,14 +128,14 @@ object RPCApi {
|
|||||||
}
|
}
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
fun fromClientMessage(context: SerializationContext, message: ClientMessage): ClientToServer {
|
fun fromClientMessage(message: ClientMessage): ClientToServer {
|
||||||
val tag = Tag.values()[message.getIntProperty(TAG_FIELD_NAME)]
|
val tag = Tag.values()[message.getIntProperty(TAG_FIELD_NAME)]
|
||||||
return when (tag) {
|
return when (tag) {
|
||||||
RPCApi.ClientToServer.Tag.RPC_REQUEST -> RpcRequest(
|
RPCApi.ClientToServer.Tag.RPC_REQUEST -> RpcRequest(
|
||||||
clientAddress = MessageUtil.getJMSReplyTo(message),
|
clientAddress = MessageUtil.getJMSReplyTo(message),
|
||||||
id = RpcRequestId(message.getLongProperty(RPC_ID_FIELD_NAME)),
|
id = RpcRequestId(message.getLongProperty(RPC_ID_FIELD_NAME)),
|
||||||
methodName = message.getStringProperty(METHOD_NAME_FIELD_NAME),
|
methodName = message.getStringProperty(METHOD_NAME_FIELD_NAME),
|
||||||
arguments = message.getBodyAsByteArray().deserialize(context = context)
|
serialisedArguments = message.getBodyAsByteArray()
|
||||||
)
|
)
|
||||||
RPCApi.ClientToServer.Tag.OBSERVABLES_CLOSED -> {
|
RPCApi.ClientToServer.Tag.OBSERVABLES_CLOSED -> {
|
||||||
val ids = ArrayList<ObservableId>()
|
val ids = ArrayList<ObservableId>()
|
||||||
|
@ -19,6 +19,7 @@ import net.corda.core.internal.LifeCycle
|
|||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
import net.corda.core.serialization.SerializationContext
|
import net.corda.core.serialization.SerializationContext
|
||||||
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
|
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
|
||||||
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.utilities.Try
|
import net.corda.core.utilities.Try
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
import net.corda.core.utilities.loggerFor
|
import net.corda.core.utilities.loggerFor
|
||||||
@ -260,18 +261,28 @@ class RPCServer(
|
|||||||
|
|
||||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||||
lifeCycle.requireState(State.STARTED)
|
lifeCycle.requireState(State.STARTED)
|
||||||
val clientToServer = RPCApi.ClientToServer.fromClientMessage(RPC_SERVER_CONTEXT, artemisMessage)
|
val clientToServer = RPCApi.ClientToServer.fromClientMessage(artemisMessage)
|
||||||
log.debug { "-> RPC -> $clientToServer" }
|
log.debug { "-> RPC -> $clientToServer" }
|
||||||
when (clientToServer) {
|
when (clientToServer) {
|
||||||
is RPCApi.ClientToServer.RpcRequest -> {
|
is RPCApi.ClientToServer.RpcRequest -> {
|
||||||
val rpcContext = RpcContext(
|
val arguments = Try.on {
|
||||||
currentUser = getUser(artemisMessage)
|
clientToServer.serialisedArguments.deserialize<List<Any?>>(context = RPC_SERVER_CONTEXT)
|
||||||
)
|
}
|
||||||
|
when (arguments) {
|
||||||
|
is Try.Success -> {
|
||||||
|
val rpcContext = RpcContext(currentUser = getUser(artemisMessage))
|
||||||
rpcExecutor!!.submit {
|
rpcExecutor!!.submit {
|
||||||
val result = invokeRpc(rpcContext, clientToServer.methodName, clientToServer.arguments)
|
val result = invokeRpc(rpcContext, clientToServer.methodName, arguments.value)
|
||||||
sendReply(clientToServer.id, clientToServer.clientAddress, result)
|
sendReply(clientToServer.id, clientToServer.clientAddress, result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
is Try.Failure -> {
|
||||||
|
// We failed to deserialise the arguments, route back the error
|
||||||
|
log.warn("Inbound RPC failed", arguments.exception)
|
||||||
|
sendReply(clientToServer.id, clientToServer.clientAddress, arguments)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
is RPCApi.ClientToServer.ObservablesClosed -> {
|
is RPCApi.ClientToServer.ObservablesClosed -> {
|
||||||
observableMap.invalidateAll(clientToServer.ids)
|
observableMap.invalidateAll(clientToServer.ids)
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import net.corda.nodeapi.internal.ServiceInfo
|
|||||||
import net.corda.testing.DUMMY_BANK_A
|
import net.corda.testing.DUMMY_BANK_A
|
||||||
import net.corda.testing.DUMMY_BANK_B
|
import net.corda.testing.DUMMY_BANK_B
|
||||||
import net.corda.testing.DUMMY_NOTARY
|
import net.corda.testing.DUMMY_NOTARY
|
||||||
|
import net.corda.testing.driver.PortAllocation
|
||||||
import net.corda.testing.driver.driver
|
import net.corda.testing.driver.driver
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import java.util.concurrent.CompletableFuture.supplyAsync
|
import java.util.concurrent.CompletableFuture.supplyAsync
|
||||||
@ -16,11 +17,11 @@ class AttachmentDemoTest {
|
|||||||
// run with a 10,000,000 bytes in-memory zip file. In practice, a slightly bigger file will be used (~10,002,000 bytes).
|
// run with a 10,000,000 bytes in-memory zip file. In practice, a slightly bigger file will be used (~10,002,000 bytes).
|
||||||
@Test fun `attachment demo using a 10MB zip file`() {
|
@Test fun `attachment demo using a 10MB zip file`() {
|
||||||
val numOfExpectedBytes = 10_000_000
|
val numOfExpectedBytes = 10_000_000
|
||||||
driver(dsl = {
|
driver(isDebug = true, portAllocation = PortAllocation.Incremental(20000)) {
|
||||||
val demoUser = listOf(User("demo", "demo", setOf(startFlowPermission<AttachmentDemoFlow>())))
|
val demoUser = listOf(User("demo", "demo", setOf(startFlowPermission<AttachmentDemoFlow>())))
|
||||||
val (nodeA, nodeB) = listOf(
|
val (nodeA, nodeB) = listOf(
|
||||||
startNode(providedName = DUMMY_BANK_A.name, rpcUsers = demoUser),
|
startNode(providedName = DUMMY_BANK_A.name, rpcUsers = demoUser, maximumHeapSize = "1g"),
|
||||||
startNode(providedName = DUMMY_BANK_B.name, rpcUsers = demoUser),
|
startNode(providedName = DUMMY_BANK_B.name, rpcUsers = demoUser, maximumHeapSize = "1g"),
|
||||||
startNode(providedName = DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type))))
|
startNode(providedName = DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type))))
|
||||||
.map { it.getOrThrow() }
|
.map { it.getOrThrow() }
|
||||||
startWebserver(nodeB).getOrThrow()
|
startWebserver(nodeB).getOrThrow()
|
||||||
@ -39,6 +40,6 @@ class AttachmentDemoTest {
|
|||||||
|
|
||||||
senderThread.getOrThrow()
|
senderThread.getOrThrow()
|
||||||
recipientThread.getOrThrow()
|
recipientThread.getOrThrow()
|
||||||
}, isDebug = true)
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -93,6 +93,7 @@ private fun sender(rpc: CordaRPCOps, inputStream: InputStream, hash: SecureHash.
|
|||||||
// Make sure we have the file in storage
|
// Make sure we have the file in storage
|
||||||
if (!rpc.attachmentExists(hash)) {
|
if (!rpc.attachmentExists(hash)) {
|
||||||
inputStream.use {
|
inputStream.use {
|
||||||
|
val avail = inputStream.available()
|
||||||
val id = rpc.uploadAttachment(it)
|
val id = rpc.uploadAttachment(it)
|
||||||
require(hash == id) { "Id was '$id' instead of '$hash'" }
|
require(hash == id) { "Id was '$id' instead of '$hash'" }
|
||||||
}
|
}
|
||||||
|
@ -92,7 +92,8 @@ interface DriverDSLExposedInterface : CordformContext {
|
|||||||
rpcUsers: List<User> = defaultParameters.rpcUsers,
|
rpcUsers: List<User> = defaultParameters.rpcUsers,
|
||||||
verifierType: VerifierType = defaultParameters.verifierType,
|
verifierType: VerifierType = defaultParameters.verifierType,
|
||||||
customOverrides: Map<String, Any?> = defaultParameters.customOverrides,
|
customOverrides: Map<String, Any?> = defaultParameters.customOverrides,
|
||||||
startInSameProcess: Boolean? = defaultParameters.startInSameProcess): CordaFuture<NodeHandle>
|
startInSameProcess: Boolean? = defaultParameters.startInSameProcess,
|
||||||
|
maximumHeapSize: String = defaultParameters.maximumHeapSize): CordaFuture<NodeHandle>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper function for starting a [node] with custom parameters from Java.
|
* Helper function for starting a [node] with custom parameters from Java.
|
||||||
@ -107,7 +108,8 @@ interface DriverDSLExposedInterface : CordformContext {
|
|||||||
|
|
||||||
fun startNodes(
|
fun startNodes(
|
||||||
nodes: List<CordformNode>,
|
nodes: List<CordformNode>,
|
||||||
startInSameProcess: Boolean? = null
|
startInSameProcess: Boolean? = null,
|
||||||
|
maximumHeapSize: String = "200m"
|
||||||
): List<CordaFuture<NodeHandle>>
|
): List<CordaFuture<NodeHandle>>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -135,7 +137,7 @@ interface DriverDSLExposedInterface : CordformContext {
|
|||||||
*
|
*
|
||||||
* @param handle The handle for the node that this webserver connects to via RPC.
|
* @param handle The handle for the node that this webserver connects to via RPC.
|
||||||
*/
|
*/
|
||||||
fun startWebserver(handle: NodeHandle): CordaFuture<WebserverHandle>
|
fun startWebserver(handle: NodeHandle, maximumHeapSize: String = "200m"): CordaFuture<WebserverHandle>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts a network map service node. Note that only a single one should ever be running, so you will probably want
|
* Starts a network map service node. Note that only a single one should ever be running, so you will probably want
|
||||||
@ -143,7 +145,7 @@ interface DriverDSLExposedInterface : CordformContext {
|
|||||||
* @param startInProcess Determines if the node should be started inside this process. If null the Driver-level
|
* @param startInProcess Determines if the node should be started inside this process. If null the Driver-level
|
||||||
* value will be used.
|
* value will be used.
|
||||||
*/
|
*/
|
||||||
fun startDedicatedNetworkMapService(startInProcess: Boolean? = null): CordaFuture<NodeHandle>
|
fun startDedicatedNetworkMapService(startInProcess: Boolean? = null, maximumHeapSize: String = "200m"): CordaFuture<NodeHandle>
|
||||||
|
|
||||||
fun waitForAllNodesToFinish()
|
fun waitForAllNodesToFinish()
|
||||||
|
|
||||||
@ -252,7 +254,7 @@ sealed class PortAllocation {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper builder for configuring a [node] from Java.
|
* Helper builder for configuring a [Node] from Java.
|
||||||
*/
|
*/
|
||||||
data class NodeParameters(
|
data class NodeParameters(
|
||||||
val providedName: CordaX500Name? = null,
|
val providedName: CordaX500Name? = null,
|
||||||
@ -260,7 +262,8 @@ data class NodeParameters(
|
|||||||
val rpcUsers: List<User> = emptyList(),
|
val rpcUsers: List<User> = emptyList(),
|
||||||
val verifierType: VerifierType = VerifierType.InMemory,
|
val verifierType: VerifierType = VerifierType.InMemory,
|
||||||
val customOverrides: Map<String, Any?> = emptyMap(),
|
val customOverrides: Map<String, Any?> = emptyMap(),
|
||||||
val startInSameProcess: Boolean? = null
|
val startInSameProcess: Boolean? = null,
|
||||||
|
val maximumHeapSize: String = "200m"
|
||||||
) {
|
) {
|
||||||
fun setProvidedName(providedName: CordaX500Name?) = copy(providedName = providedName)
|
fun setProvidedName(providedName: CordaX500Name?) = copy(providedName = providedName)
|
||||||
fun setAdvertisedServices(advertisedServices: Set<ServiceInfo>) = copy(advertisedServices = advertisedServices)
|
fun setAdvertisedServices(advertisedServices: Set<ServiceInfo>) = copy(advertisedServices = advertisedServices)
|
||||||
@ -268,6 +271,7 @@ data class NodeParameters(
|
|||||||
fun setVerifierType(verifierType: VerifierType) = copy(verifierType = verifierType)
|
fun setVerifierType(verifierType: VerifierType) = copy(verifierType = verifierType)
|
||||||
fun setCustomerOverrides(customOverrides: Map<String, Any?>) = copy(customOverrides = customOverrides)
|
fun setCustomerOverrides(customOverrides: Map<String, Any?>) = copy(customOverrides = customOverrides)
|
||||||
fun setStartInSameProcess(startInSameProcess: Boolean?) = copy(startInSameProcess = startInSameProcess)
|
fun setStartInSameProcess(startInSameProcess: Boolean?) = copy(startInSameProcess = startInSameProcess)
|
||||||
|
fun setMaximumHeapSize(maximumHeapSize: String) = copy(maximumHeapSize = maximumHeapSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -677,7 +681,8 @@ class DriverDSL(
|
|||||||
rpcUsers: List<User>,
|
rpcUsers: List<User>,
|
||||||
verifierType: VerifierType,
|
verifierType: VerifierType,
|
||||||
customOverrides: Map<String, Any?>,
|
customOverrides: Map<String, Any?>,
|
||||||
startInSameProcess: Boolean?
|
startInSameProcess: Boolean?,
|
||||||
|
maximumHeapSize: String
|
||||||
): CordaFuture<NodeHandle> {
|
): CordaFuture<NodeHandle> {
|
||||||
val p2pAddress = portAllocation.nextHostAndPort()
|
val p2pAddress = portAllocation.nextHostAndPort()
|
||||||
val rpcAddress = portAllocation.nextHostAndPort()
|
val rpcAddress = portAllocation.nextHostAndPort()
|
||||||
@ -703,10 +708,10 @@ class DriverDSL(
|
|||||||
"verifierType" to verifierType.name
|
"verifierType" to verifierType.name
|
||||||
) + customOverrides
|
) + customOverrides
|
||||||
)
|
)
|
||||||
return startNodeInternal(config, webAddress, startInSameProcess)
|
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun startNodes(nodes: List<CordformNode>, startInSameProcess: Boolean?): List<CordaFuture<NodeHandle>> {
|
override fun startNodes(nodes: List<CordformNode>, startInSameProcess: Boolean?, maximumHeapSize: String): List<CordaFuture<NodeHandle>> {
|
||||||
val networkMapServiceConfigLookup = networkMapServiceConfigLookup(nodes)
|
val networkMapServiceConfigLookup = networkMapServiceConfigLookup(nodes)
|
||||||
return nodes.map { node ->
|
return nodes.map { node ->
|
||||||
portAllocation.nextHostAndPort() // rpcAddress
|
portAllocation.nextHostAndPort() // rpcAddress
|
||||||
@ -723,7 +728,7 @@ class DriverDSL(
|
|||||||
"notaryClusterAddresses" to node.notaryClusterAddresses
|
"notaryClusterAddresses" to node.notaryClusterAddresses
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
startNodeInternal(config, webAddress, startInSameProcess)
|
startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -784,9 +789,9 @@ class DriverDSL(
|
|||||||
throw IllegalStateException("Webserver at ${handle.webAddress} has died")
|
throw IllegalStateException("Webserver at ${handle.webAddress} has died")
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun startWebserver(handle: NodeHandle): CordaFuture<WebserverHandle> {
|
override fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle> {
|
||||||
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
|
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
|
||||||
val processFuture = DriverDSL.startWebserver(executorService, handle, debugPort)
|
val processFuture = DriverDSL.startWebserver(executorService, handle, debugPort, maximumHeapSize)
|
||||||
registerProcess(processFuture)
|
registerProcess(processFuture)
|
||||||
return processFuture.map { queryWebserver(handle, it) }
|
return processFuture.map { queryWebserver(handle, it) }
|
||||||
}
|
}
|
||||||
@ -809,7 +814,7 @@ class DriverDSL(
|
|||||||
|
|
||||||
override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
|
override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
|
||||||
|
|
||||||
override fun startDedicatedNetworkMapService(startInProcess: Boolean?): CordaFuture<NodeHandle> {
|
override fun startDedicatedNetworkMapService(startInProcess: Boolean?, maximumHeapSize: String): CordaFuture<NodeHandle> {
|
||||||
val webAddress = portAllocation.nextHostAndPort()
|
val webAddress = portAllocation.nextHostAndPort()
|
||||||
val rpcAddress = portAllocation.nextHostAndPort()
|
val rpcAddress = portAllocation.nextHostAndPort()
|
||||||
val networkMapLegalName = networkMapStartStrategy.legalName
|
val networkMapLegalName = networkMapStartStrategy.legalName
|
||||||
@ -828,10 +833,10 @@ class DriverDSL(
|
|||||||
"extraAdvertisedServiceIds" to listOf(ServiceInfo(NetworkMapService.type).toString())
|
"extraAdvertisedServiceIds" to listOf(ServiceInfo(NetworkMapService.type).toString())
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return startNodeInternal(config, webAddress, startInProcess)
|
return startNodeInternal(config, webAddress, startInProcess, maximumHeapSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun startNodeInternal(config: Config, webAddress: NetworkHostAndPort, startInProcess: Boolean?): CordaFuture<NodeHandle> {
|
private fun startNodeInternal(config: Config, webAddress: NetworkHostAndPort, startInProcess: Boolean?, maximumHeapSize: String): CordaFuture<NodeHandle> {
|
||||||
val nodeConfiguration = config.parseAs<FullNodeConfiguration>()
|
val nodeConfiguration = config.parseAs<FullNodeConfiguration>()
|
||||||
if (startInProcess ?: startNodesInProcess) {
|
if (startInProcess ?: startNodesInProcess) {
|
||||||
val nodeAndThreadFuture = startInProcessNode(executorService, nodeConfiguration, config)
|
val nodeAndThreadFuture = startInProcessNode(executorService, nodeConfiguration, config)
|
||||||
@ -852,7 +857,7 @@ class DriverDSL(
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
|
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
|
||||||
val processFuture = startOutOfProcessNode(executorService, nodeConfiguration, config, quasarJarPath, debugPort, systemProperties, packagesToScanString.joinToString(","))
|
val processFuture = startOutOfProcessNode(executorService, nodeConfiguration, config, quasarJarPath, debugPort, systemProperties, packagesToScanString.joinToString(","), maximumHeapSize)
|
||||||
registerProcess(processFuture)
|
registerProcess(processFuture)
|
||||||
return processFuture.flatMap { process ->
|
return processFuture.flatMap { process ->
|
||||||
val processDeathFuture = poll(executorService, "process death") {
|
val processDeathFuture = poll(executorService, "process death") {
|
||||||
@ -917,7 +922,8 @@ class DriverDSL(
|
|||||||
quasarJarPath: String,
|
quasarJarPath: String,
|
||||||
debugPort: Int?,
|
debugPort: Int?,
|
||||||
overriddenSystemProperties: Map<String, String>,
|
overriddenSystemProperties: Map<String, String>,
|
||||||
packagesToScanString: String
|
packagesToScanString: String,
|
||||||
|
maximumHeapSize: String
|
||||||
): CordaFuture<Process> {
|
): CordaFuture<Process> {
|
||||||
val processFuture = executorService.fork {
|
val processFuture = executorService.fork {
|
||||||
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}")
|
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}")
|
||||||
@ -946,7 +952,8 @@ class DriverDSL(
|
|||||||
jdwpPort = debugPort,
|
jdwpPort = debugPort,
|
||||||
extraJvmArguments = extraJvmArguments,
|
extraJvmArguments = extraJvmArguments,
|
||||||
errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",
|
errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",
|
||||||
workingDirectory = nodeConf.baseDirectory
|
workingDirectory = nodeConf.baseDirectory,
|
||||||
|
maximumHeapSize = maximumHeapSize
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
return processFuture.flatMap { process ->
|
return processFuture.flatMap { process ->
|
||||||
@ -957,7 +964,8 @@ class DriverDSL(
|
|||||||
private fun startWebserver(
|
private fun startWebserver(
|
||||||
executorService: ScheduledExecutorService,
|
executorService: ScheduledExecutorService,
|
||||||
handle: NodeHandle,
|
handle: NodeHandle,
|
||||||
debugPort: Int?
|
debugPort: Int?,
|
||||||
|
maximumHeapSize: String
|
||||||
): CordaFuture<Process> {
|
): CordaFuture<Process> {
|
||||||
return executorService.fork {
|
return executorService.fork {
|
||||||
val className = "net.corda.webserver.WebServer"
|
val className = "net.corda.webserver.WebServer"
|
||||||
@ -969,7 +977,9 @@ class DriverDSL(
|
|||||||
"-Dname=node-${handle.configuration.p2pAddress}-webserver",
|
"-Dname=node-${handle.configuration.p2pAddress}-webserver",
|
||||||
"-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process
|
"-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process
|
||||||
),
|
),
|
||||||
errorLogPath = Paths.get("error.$className.log")
|
errorLogPath = Paths.get("error.$className.log"),
|
||||||
|
workingDirectory = null,
|
||||||
|
maximumHeapSize = maximumHeapSize
|
||||||
)
|
)
|
||||||
}.flatMap { process -> addressMustBeBoundFuture(executorService, handle.webAddress, process).map { process } }
|
}.flatMap { process -> addressMustBeBoundFuture(executorService, handle.webAddress, process).map { process } }
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ object ProcessUtilities {
|
|||||||
arguments: List<String>,
|
arguments: List<String>,
|
||||||
jdwpPort: Int? = null
|
jdwpPort: Int? = null
|
||||||
): Process {
|
): Process {
|
||||||
return startJavaProcessImpl(C::class.java.name, arguments, defaultClassPath, jdwpPort, emptyList(), null, null)
|
return startJavaProcessImpl(C::class.java.name, arguments, defaultClassPath, jdwpPort, emptyList(), null, null, null)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun startCordaProcess(
|
fun startCordaProcess(
|
||||||
@ -19,11 +19,12 @@ object ProcessUtilities {
|
|||||||
jdwpPort: Int?,
|
jdwpPort: Int?,
|
||||||
extraJvmArguments: List<String>,
|
extraJvmArguments: List<String>,
|
||||||
errorLogPath: Path?,
|
errorLogPath: Path?,
|
||||||
workingDirectory: Path? = null
|
workingDirectory: Path?,
|
||||||
|
maximumHeapSize: String
|
||||||
): Process {
|
): Process {
|
||||||
// FIXME: Instead of hacking our classpath, use the correct classpath for className.
|
// FIXME: Instead of hacking our classpath, use the correct classpath for className.
|
||||||
val classpath = defaultClassPath.split(pathSeparator).filter { !(it / "log4j2-test.xml").exists() }.joinToString(pathSeparator)
|
val classpath = defaultClassPath.split(pathSeparator).filter { !(it / "log4j2-test.xml").exists() }.joinToString(pathSeparator)
|
||||||
return startJavaProcessImpl(className, arguments, classpath, jdwpPort, extraJvmArguments, errorLogPath, workingDirectory)
|
return startJavaProcessImpl(className, arguments, classpath, jdwpPort, extraJvmArguments, errorLogPath, workingDirectory, maximumHeapSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun startJavaProcessImpl(
|
fun startJavaProcessImpl(
|
||||||
@ -33,12 +34,13 @@ object ProcessUtilities {
|
|||||||
jdwpPort: Int?,
|
jdwpPort: Int?,
|
||||||
extraJvmArguments: List<String>,
|
extraJvmArguments: List<String>,
|
||||||
errorLogPath: Path?,
|
errorLogPath: Path?,
|
||||||
workingDirectory: Path?
|
workingDirectory: Path?,
|
||||||
|
maximumHeapSize: String?
|
||||||
): Process {
|
): Process {
|
||||||
val command = mutableListOf<String>().apply {
|
val command = mutableListOf<String>().apply {
|
||||||
add((System.getProperty("java.home") / "bin" / "java").toString())
|
add((System.getProperty("java.home") / "bin" / "java").toString())
|
||||||
(jdwpPort != null) && add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$jdwpPort")
|
(jdwpPort != null) && add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$jdwpPort")
|
||||||
add("-Xmx200m")
|
if (maximumHeapSize != null) add("-Xmx$maximumHeapSize")
|
||||||
add("-XX:+UseG1GC")
|
add("-XX:+UseG1GC")
|
||||||
addAll(extraJvmArguments)
|
addAll(extraJvmArguments)
|
||||||
add("-cp")
|
add("-cp")
|
||||||
|
Reference in New Issue
Block a user