mirror of
https://github.com/corda/corda.git
synced 2025-01-21 03:55:00 +00:00
More memory to attachment demo test nodes
This commit is contained in:
parent
9d115a2111
commit
762b2a5123
@ -8,6 +8,7 @@ import net.corda.nodeapi.internal.ServiceInfo
|
||||
import net.corda.testing.DUMMY_BANK_A
|
||||
import net.corda.testing.DUMMY_BANK_B
|
||||
import net.corda.testing.DUMMY_NOTARY
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.driver.driver
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.CompletableFuture.supplyAsync
|
||||
@ -16,11 +17,11 @@ class AttachmentDemoTest {
|
||||
// run with a 10,000,000 bytes in-memory zip file. In practice, a slightly bigger file will be used (~10,002,000 bytes).
|
||||
@Test fun `attachment demo using a 10MB zip file`() {
|
||||
val numOfExpectedBytes = 10_000_000
|
||||
driver(dsl = {
|
||||
driver(isDebug = true, portAllocation = PortAllocation.Incremental(20000)) {
|
||||
val demoUser = listOf(User("demo", "demo", setOf(startFlowPermission<AttachmentDemoFlow>())))
|
||||
val (nodeA, nodeB) = listOf(
|
||||
startNode(providedName = DUMMY_BANK_A.name, rpcUsers = demoUser),
|
||||
startNode(providedName = DUMMY_BANK_B.name, rpcUsers = demoUser),
|
||||
startNode(providedName = DUMMY_BANK_A.name, rpcUsers = demoUser, maximumHeapSize = "1g"),
|
||||
startNode(providedName = DUMMY_BANK_B.name, rpcUsers = demoUser, maximumHeapSize = "1g"),
|
||||
startNode(providedName = DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type))))
|
||||
.map { it.getOrThrow() }
|
||||
startWebserver(nodeB).getOrThrow()
|
||||
@ -39,6 +40,6 @@ class AttachmentDemoTest {
|
||||
|
||||
senderThread.getOrThrow()
|
||||
recipientThread.getOrThrow()
|
||||
}, isDebug = true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -93,6 +93,7 @@ private fun sender(rpc: CordaRPCOps, inputStream: InputStream, hash: SecureHash.
|
||||
// Make sure we have the file in storage
|
||||
if (!rpc.attachmentExists(hash)) {
|
||||
inputStream.use {
|
||||
val avail = inputStream.available()
|
||||
val id = rpc.uploadAttachment(it)
|
||||
require(hash == id) { "Id was '$id' instead of '$hash'" }
|
||||
}
|
||||
|
@ -94,7 +94,8 @@ interface DriverDSLExposedInterface : CordformContext {
|
||||
rpcUsers: List<User> = defaultParameters.rpcUsers,
|
||||
verifierType: VerifierType = defaultParameters.verifierType,
|
||||
customOverrides: Map<String, Any?> = defaultParameters.customOverrides,
|
||||
startInSameProcess: Boolean? = defaultParameters.startInSameProcess): CordaFuture<NodeHandle>
|
||||
startInSameProcess: Boolean? = defaultParameters.startInSameProcess,
|
||||
maximumHeapSize: String = defaultParameters.maximumHeapSize): CordaFuture<NodeHandle>
|
||||
|
||||
/**
|
||||
* Helper function for starting a [node] with custom parameters from Java.
|
||||
@ -109,7 +110,8 @@ interface DriverDSLExposedInterface : CordformContext {
|
||||
|
||||
fun startNodes(
|
||||
nodes: List<CordformNode>,
|
||||
startInSameProcess: Boolean? = null
|
||||
startInSameProcess: Boolean? = null,
|
||||
maximumHeapSize: String = "200m"
|
||||
): List<CordaFuture<NodeHandle>>
|
||||
|
||||
/**
|
||||
@ -137,7 +139,7 @@ interface DriverDSLExposedInterface : CordformContext {
|
||||
*
|
||||
* @param handle The handle for the node that this webserver connects to via RPC.
|
||||
*/
|
||||
fun startWebserver(handle: NodeHandle): CordaFuture<WebserverHandle>
|
||||
fun startWebserver(handle: NodeHandle, maximumHeapSize: String = "200m"): CordaFuture<WebserverHandle>
|
||||
|
||||
/**
|
||||
* Starts a network map service node. Note that only a single one should ever be running, so you will probably want
|
||||
@ -145,7 +147,7 @@ interface DriverDSLExposedInterface : CordformContext {
|
||||
* @param startInProcess Determines if the node should be started inside this process. If null the Driver-level
|
||||
* value will be used.
|
||||
*/
|
||||
fun startDedicatedNetworkMapService(startInProcess: Boolean? = null): CordaFuture<NodeHandle>
|
||||
fun startDedicatedNetworkMapService(startInProcess: Boolean? = null, maximumHeapSize: String = "200m"): CordaFuture<NodeHandle>
|
||||
|
||||
fun waitForAllNodesToFinish()
|
||||
|
||||
@ -250,7 +252,7 @@ sealed class PortAllocation {
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper builder for configuring a [node] from Java.
|
||||
* Helper builder for configuring a [Node] from Java.
|
||||
*/
|
||||
data class NodeParameters(
|
||||
val providedName: CordaX500Name? = null,
|
||||
@ -258,7 +260,8 @@ data class NodeParameters(
|
||||
val rpcUsers: List<User> = emptyList(),
|
||||
val verifierType: VerifierType = VerifierType.InMemory,
|
||||
val customOverrides: Map<String, Any?> = emptyMap(),
|
||||
val startInSameProcess: Boolean? = null
|
||||
val startInSameProcess: Boolean? = null,
|
||||
val maximumHeapSize: String = "200m"
|
||||
) {
|
||||
fun setProvidedName(providedName: CordaX500Name?) = copy(providedName = providedName)
|
||||
fun setAdvertisedServices(advertisedServices: Set<ServiceInfo>) = copy(advertisedServices = advertisedServices)
|
||||
@ -266,6 +269,7 @@ data class NodeParameters(
|
||||
fun setVerifierType(verifierType: VerifierType) = copy(verifierType = verifierType)
|
||||
fun setCustomerOverrides(customOverrides: Map<String, Any?>) = copy(customOverrides = customOverrides)
|
||||
fun setStartInSameProcess(startInSameProcess: Boolean?) = copy(startInSameProcess = startInSameProcess)
|
||||
fun setMaximumHeapSize(maximumHeapSize: String) = copy(maximumHeapSize = maximumHeapSize)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -674,7 +678,8 @@ class DriverDSL(
|
||||
rpcUsers: List<User>,
|
||||
verifierType: VerifierType,
|
||||
customOverrides: Map<String, Any?>,
|
||||
startInSameProcess: Boolean?
|
||||
startInSameProcess: Boolean?,
|
||||
maximumHeapSize: String
|
||||
): CordaFuture<NodeHandle> {
|
||||
val p2pAddress = portAllocation.nextHostAndPort()
|
||||
val rpcAddress = portAllocation.nextHostAndPort()
|
||||
@ -700,10 +705,10 @@ class DriverDSL(
|
||||
"verifierType" to verifierType.name
|
||||
) + customOverrides
|
||||
)
|
||||
return startNodeInternal(config, webAddress, startInSameProcess)
|
||||
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
|
||||
}
|
||||
|
||||
override fun startNodes(nodes: List<CordformNode>, startInSameProcess: Boolean?): List<CordaFuture<NodeHandle>> {
|
||||
override fun startNodes(nodes: List<CordformNode>, startInSameProcess: Boolean?, maximumHeapSize: String): List<CordaFuture<NodeHandle>> {
|
||||
val networkMapServiceConfigLookup = networkMapServiceConfigLookup(nodes)
|
||||
return nodes.map { node ->
|
||||
portAllocation.nextHostAndPort() // rpcAddress
|
||||
@ -720,7 +725,7 @@ class DriverDSL(
|
||||
"notaryClusterAddresses" to node.notaryClusterAddresses
|
||||
)
|
||||
)
|
||||
startNodeInternal(config, webAddress, startInSameProcess)
|
||||
startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
|
||||
}
|
||||
}
|
||||
|
||||
@ -781,9 +786,9 @@ class DriverDSL(
|
||||
throw IllegalStateException("Webserver at ${handle.webAddress} has died")
|
||||
}
|
||||
|
||||
override fun startWebserver(handle: NodeHandle): CordaFuture<WebserverHandle> {
|
||||
override fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle> {
|
||||
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
|
||||
val processFuture = DriverDSL.startWebserver(executorService, handle, debugPort)
|
||||
val processFuture = DriverDSL.startWebserver(executorService, handle, debugPort, maximumHeapSize)
|
||||
registerProcess(processFuture)
|
||||
return processFuture.map { queryWebserver(handle, it) }
|
||||
}
|
||||
@ -806,7 +811,7 @@ class DriverDSL(
|
||||
|
||||
override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
|
||||
|
||||
override fun startDedicatedNetworkMapService(startInProcess: Boolean?): CordaFuture<NodeHandle> {
|
||||
override fun startDedicatedNetworkMapService(startInProcess: Boolean?, maximumHeapSize: String): CordaFuture<NodeHandle> {
|
||||
val webAddress = portAllocation.nextHostAndPort()
|
||||
val networkMapLegalName = networkMapStartStrategy.legalName
|
||||
val config = ConfigHelper.loadConfig(
|
||||
@ -822,10 +827,10 @@ class DriverDSL(
|
||||
"extraAdvertisedServiceIds" to listOf(ServiceInfo(NetworkMapService.type).toString())
|
||||
)
|
||||
)
|
||||
return startNodeInternal(config, webAddress, startInProcess)
|
||||
return startNodeInternal(config, webAddress, startInProcess, maximumHeapSize)
|
||||
}
|
||||
|
||||
private fun startNodeInternal(config: Config, webAddress: NetworkHostAndPort, startInProcess: Boolean?): CordaFuture<NodeHandle> {
|
||||
private fun startNodeInternal(config: Config, webAddress: NetworkHostAndPort, startInProcess: Boolean?, maximumHeapSize: String): CordaFuture<NodeHandle> {
|
||||
val nodeConfiguration = config.parseAs<FullNodeConfiguration>()
|
||||
if (startInProcess ?: startNodesInProcess) {
|
||||
val nodeAndThreadFuture = startInProcessNode(executorService, nodeConfiguration, config)
|
||||
@ -846,7 +851,7 @@ class DriverDSL(
|
||||
}
|
||||
} else {
|
||||
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
|
||||
val processFuture = startOutOfProcessNode(executorService, nodeConfiguration, config, quasarJarPath, debugPort, systemProperties, packagesToScanString.joinToString(","))
|
||||
val processFuture = startOutOfProcessNode(executorService, nodeConfiguration, config, quasarJarPath, debugPort, systemProperties, packagesToScanString.joinToString(","), maximumHeapSize)
|
||||
registerProcess(processFuture)
|
||||
return processFuture.flatMap { process ->
|
||||
val processDeathFuture = poll(executorService, "process death") {
|
||||
@ -910,7 +915,8 @@ class DriverDSL(
|
||||
quasarJarPath: String,
|
||||
debugPort: Int?,
|
||||
overriddenSystemProperties: Map<String, String>,
|
||||
packagesToScanString: String
|
||||
packagesToScanString: String,
|
||||
maximumHeapSize: String
|
||||
): CordaFuture<Process> {
|
||||
val processFuture = executorService.fork {
|
||||
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}")
|
||||
@ -939,7 +945,8 @@ class DriverDSL(
|
||||
jdwpPort = debugPort,
|
||||
extraJvmArguments = extraJvmArguments,
|
||||
errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",
|
||||
workingDirectory = nodeConf.baseDirectory
|
||||
workingDirectory = nodeConf.baseDirectory,
|
||||
maximumHeapSize = maximumHeapSize
|
||||
)
|
||||
}
|
||||
return processFuture.flatMap { process ->
|
||||
@ -950,7 +957,8 @@ class DriverDSL(
|
||||
private fun startWebserver(
|
||||
executorService: ScheduledExecutorService,
|
||||
handle: NodeHandle,
|
||||
debugPort: Int?
|
||||
debugPort: Int?,
|
||||
maximumHeapSize: String
|
||||
): CordaFuture<Process> {
|
||||
return executorService.fork {
|
||||
val className = "net.corda.webserver.WebServer"
|
||||
@ -962,7 +970,9 @@ class DriverDSL(
|
||||
"-Dname=node-${handle.configuration.p2pAddress}-webserver",
|
||||
"-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process
|
||||
),
|
||||
errorLogPath = Paths.get("error.$className.log")
|
||||
errorLogPath = Paths.get("error.$className.log"),
|
||||
workingDirectory = null,
|
||||
maximumHeapSize = maximumHeapSize
|
||||
)
|
||||
}.flatMap { process -> addressMustBeBoundFuture(executorService, handle.webAddress, process).map { process } }
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ object ProcessUtilities {
|
||||
arguments: List<String>,
|
||||
jdwpPort: Int? = null
|
||||
): Process {
|
||||
return startJavaProcessImpl(C::class.java.name, arguments, defaultClassPath, jdwpPort, emptyList(), null, null)
|
||||
return startJavaProcessImpl(C::class.java.name, arguments, defaultClassPath, jdwpPort, emptyList(), null, null, null)
|
||||
}
|
||||
|
||||
fun startCordaProcess(
|
||||
@ -19,11 +19,12 @@ object ProcessUtilities {
|
||||
jdwpPort: Int?,
|
||||
extraJvmArguments: List<String>,
|
||||
errorLogPath: Path?,
|
||||
workingDirectory: Path? = null
|
||||
workingDirectory: Path?,
|
||||
maximumHeapSize: String
|
||||
): Process {
|
||||
// FIXME: Instead of hacking our classpath, use the correct classpath for className.
|
||||
val classpath = defaultClassPath.split(pathSeparator).filter { !(it / "log4j2-test.xml").exists() }.joinToString(pathSeparator)
|
||||
return startJavaProcessImpl(className, arguments, classpath, jdwpPort, extraJvmArguments, errorLogPath, workingDirectory)
|
||||
return startJavaProcessImpl(className, arguments, classpath, jdwpPort, extraJvmArguments, errorLogPath, workingDirectory, maximumHeapSize)
|
||||
}
|
||||
|
||||
fun startJavaProcessImpl(
|
||||
@ -33,12 +34,13 @@ object ProcessUtilities {
|
||||
jdwpPort: Int?,
|
||||
extraJvmArguments: List<String>,
|
||||
errorLogPath: Path?,
|
||||
workingDirectory: Path?
|
||||
workingDirectory: Path?,
|
||||
maximumHeapSize: String?
|
||||
): Process {
|
||||
val command = mutableListOf<String>().apply {
|
||||
add((System.getProperty("java.home") / "bin" / "java").toString())
|
||||
(jdwpPort != null) && add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$jdwpPort")
|
||||
add("-Xmx200m")
|
||||
if (maximumHeapSize != null) add("-Xmx$maximumHeapSize")
|
||||
add("-XX:+UseG1GC")
|
||||
addAll(extraJvmArguments)
|
||||
add("-cp")
|
||||
|
Loading…
Reference in New Issue
Block a user