CORDA-3139: Cater for port already bound scenario during port allocation (#5361)

* CORDA-3139: Cater for port already bound scenario during port allocation

Also moved `SharedMemoryIncremental` into a separate file as it getting bigger
and improved readability of logic and added some logging.

* CORDA-3139: Fix the unit test

* CORDA-3139: Improve logging when failing

* CORDA-3139: Improve stability of the test
This commit is contained in:
Viktor Kolomeyko
2019-08-12 10:38:15 +01:00
committed by Matthew Nesbit
parent b8e278680d
commit 6db3ded032
5 changed files with 153 additions and 98 deletions

View File

@ -16,6 +16,7 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.DUMMY_NOTARY_NAME import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.driver.internal.SharedMemoryIncremental
import net.corda.testing.driver.internal.incrementalPortAllocation import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.driver.internal.internalServices import net.corda.testing.driver.internal.internalServices
import net.corda.testing.node.NotarySpec import net.corda.testing.node.NotarySpec
@ -26,12 +27,6 @@ import net.corda.testing.node.internal.genericDriver
import net.corda.testing.node.internal.getTimestampAsDirectoryName import net.corda.testing.node.internal.getTimestampAsDirectoryName
import net.corda.testing.node.internal.newContext import net.corda.testing.node.internal.newContext
import rx.Observable import rx.Observable
import sun.misc.Unsafe
import sun.nio.ch.DirectBuffer
import java.io.File
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
@ -110,6 +105,7 @@ data class WebserverHandle(
) )
@DoNotImplement @DoNotImplement
// Unfortunately cannot be an interface due to `defaultAllocator`
abstract class PortAllocation { abstract class PortAllocation {
companion object { companion object {
@ -119,7 +115,6 @@ abstract class PortAllocation {
const val FIRST_EPHEMERAL_PORT = 30_000 const val FIRST_EPHEMERAL_PORT = 30_000
} }
/** Get the next available port via [nextPort] and then return a [NetworkHostAndPort] **/ /** Get the next available port via [nextPort] and then return a [NetworkHostAndPort] **/
fun nextHostAndPort(): NetworkHostAndPort = NetworkHostAndPort("localhost", nextPort()) fun nextHostAndPort(): NetworkHostAndPort = NetworkHostAndPort("localhost", nextPort())
@ -138,56 +133,8 @@ abstract class PortAllocation {
return SharedMemoryIncremental.INSTANCE.nextPort() return SharedMemoryIncremental.INSTANCE.nextPort()
} }
} }
private class SharedMemoryIncremental private constructor(startPort: Int, endPort: Int, file: File = File(System.getProperty("user.home"), "corda-$startPort-to-$endPort-port-allocator.bin")) : PortAllocation() {
private val startingPoint: Int = startPort
private val endPoint: Int = endPort
private val backingFile: RandomAccessFile = RandomAccessFile(file, "rw")
private val mb: MappedByteBuffer
private val startingAddress: Long
/**
* An implementation of [PortAllocation] which allocates ports sequentially
*/
companion object {
private val UNSAFE: Unsafe = getUnsafe()
private fun getUnsafe(): Unsafe {
val f = Unsafe::class.java.getDeclaredField("theUnsafe")
f.isAccessible = true
return f.get(null) as Unsafe
}
val INSTANCE = SharedMemoryIncremental(DEFAULT_START_PORT, FIRST_EPHEMERAL_PORT)
}
override fun nextPort(): Int {
var oldValue: Long
var newValue: Long
do {
oldValue = UNSAFE.getLongVolatile(null, startingAddress)
newValue = if (oldValue + 1 >= endPoint || oldValue < startingPoint) {
startingPoint.toLong()
} else {
(oldValue + 1)
}
} while (!UNSAFE.compareAndSwapLong(null, startingAddress, oldValue, newValue))
return newValue.toInt()
}
init {
mb = backingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 16)
startingAddress = (mb as DirectBuffer).address()
}
}
} }
/** /**
* A class containing configuration information for Jolokia JMX, to be used when creating a node via the [driver]. * A class containing configuration information for Jolokia JMX, to be used when creating a node via the [driver].
* *

View File

@ -0,0 +1,74 @@
package net.corda.testing.driver.internal
import net.corda.core.utilities.contextLogger
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.isLocalPortBound
import sun.misc.Unsafe
import sun.nio.ch.DirectBuffer
import java.io.File
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
/**
* It uses backing file to store information about last allocated port.
* Implementation note:
* The (small)file is read into memory and then `Unsafe` operation is used to work directly with that memory
* performing atomic compare and swap operations as necessary
* This enables the same file to be used my multiple processed running on the same machine such that they will be
* able to concurrently allocate ports without clashing with each other.
*/
internal class SharedMemoryIncremental
private constructor(private val startPort: Int, private val endPort: Int,
file: File = File(System.getProperty("user.home"), "corda-$startPort-to-$endPort-port-allocator.bin")) : PortAllocation() {
private val backingFile: RandomAccessFile = RandomAccessFile(file, "rw")
private val mb: MappedByteBuffer
private val memoryOffsetAddress: Long
init {
mb = backingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 16) // TODO: Do we really need 16 bytes? Given that we care about Int it should be enough to have 4
memoryOffsetAddress = (mb as DirectBuffer).address()
}
/**
* An implementation of [PortAllocation] which allocates ports sequentially
*/
companion object {
private val UNSAFE: Unsafe = getUnsafe()
private fun getUnsafe(): Unsafe {
val f = Unsafe::class.java.getDeclaredField("theUnsafe")
f.isAccessible = true
return f.get(null) as Unsafe
}
val INSTANCE = SharedMemoryIncremental(DEFAULT_START_PORT, FIRST_EPHEMERAL_PORT)
val logger = contextLogger()
}
override fun nextPort(): Int {
var newValue: Long
do {
val oldValue = UNSAFE.getLongVolatile(null, memoryOffsetAddress)
newValue = if (oldValue + 1 >= endPort || oldValue < startPort) {
logger.warn("Port allocation rolling over: oldValue=$oldValue, startPort=$startPort, endPort=$endPort")
startPort.toLong()
} else {
(oldValue + 1)
}
val compareAndSwapSuccess = UNSAFE.compareAndSwapLong(null, memoryOffsetAddress, oldValue, newValue)
val success = if (!compareAndSwapSuccess) false else {
val alreadyBound = isLocalPortBound(newValue.toInt())
if (alreadyBound) {
logger.warn("Port $newValue appears to be bound. Allocator will skip it.")
}
!alreadyBound
}
} while (!success)
return newValue.toInt()
}
}

View File

@ -36,7 +36,8 @@ public class PortAllocationRunner {
* allocate ports and print out for later consumption by the spawning test * allocate ports and print out for later consumption by the spawning test
*/ */
PortAllocation pa = PortAllocation.getDefaultAllocator(); PortAllocation pa = PortAllocation.getDefaultAllocator();
for (int i = 0; i < 10000; i++) { int iterCount = Integer.parseInt(args[2]);
for (int i = 0; i < iterCount; i++) {
System.out.println(pa.nextPort()); System.out.println(pa.nextPort());
} }
} }

View File

@ -1,25 +1,33 @@
package net.corda.testing.driver package net.corda.testing.driver
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import net.corda.testing.node.PortAllocationRunner import net.corda.testing.node.PortAllocationRunner
import org.assertj.core.util.Files import org.assertj.core.util.Files
import org.hamcrest.CoreMatchers.`is` import org.hamcrest.CoreMatchers.`is`
import org.hamcrest.core.IsNot.not import org.hamcrest.core.IsNot.not
import org.hamcrest.number.OrderingComparison import org.hamcrest.number.OrderingComparison
import org.junit.Assert import org.junit.Assert
import org.junit.Assume.assumeFalse
import org.junit.Test import org.junit.Test
import java.io.RandomAccessFile import java.io.RandomAccessFile
import java.nio.channels.FileChannel import java.nio.channels.FileChannel
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.streams.toList
class PortAllocationTest { class PortAllocationTest {
companion object {
val logger = contextLogger()
}
@Test @Test
fun `should allocate a port whilst cycling back round if exceeding start of ephemeral range`() { fun `should allocate a port whilst cycling back round if exceeding start of ephemeral range`() {
val startingPoint = PortAllocation.DEFAULT_START_PORT val startingPoint = PortAllocation.DEFAULT_START_PORT
val portAllocator = PortAllocation.defaultAllocator val portAllocator = PortAllocation.defaultAllocator
var previous = portAllocator.nextPort() var previous = portAllocator.nextPort()
(0 until 1000000).forEach { _ -> (0 until 50_000).forEach { _ ->
val next = portAllocator.nextPort() val next = portAllocator.nextPort()
Assert.assertThat(next, `is`(not(previous))) Assert.assertThat(next, `is`(not(previous)))
Assert.assertThat(next, `is`(OrderingComparison.lessThan(PortAllocation.FIRST_EPHEMERAL_PORT))) Assert.assertThat(next, `is`(OrderingComparison.lessThan(PortAllocation.FIRST_EPHEMERAL_PORT)))
@ -27,7 +35,7 @@ class PortAllocationTest {
if (next == startingPoint) { if (next == startingPoint) {
Assert.assertThat(previous, `is`(PortAllocation.FIRST_EPHEMERAL_PORT - 1)) Assert.assertThat(previous, `is`(PortAllocation.FIRST_EPHEMERAL_PORT - 1))
} else { } else {
Assert.assertThat(next, `is`(previous + 1)) Assert.assertTrue(next >= previous + 1)
} }
previous = next previous = next
} }
@ -35,50 +43,60 @@ class PortAllocationTest {
@Test(timeout = 120_000) @Test(timeout = 120_000)
fun `should support multiprocess port allocation`() { fun `should support multiprocess port allocation`() {
if (!System.getProperty("os.name").toLowerCase().contains("windows")) { assumeFalse(System.getProperty("os.name").toLowerCase().contains("windows"))
println("Starting multiprocess port allocation test")
val spinnerFile = Files.newTemporaryFile().also { it.deleteOnExit() }.absolutePath
val process1 = buildJvmProcess(spinnerFile, 1)
val process2 = buildJvmProcess(spinnerFile, 2)
println("Started child processes") logger.info("Starting multiprocess port allocation test")
val spinnerFile = Files.newTemporaryFile().also { it.deleteOnExit() }.absolutePath
val iterCount = 8_000 // Default port range 10000-30000 since we will have 2 processes we want to make sure there is enough leg room
// If we rollover, we may well receive the ports that were already given to a different process
val process1 = buildJvmProcess(spinnerFile, 1, iterCount)
val process2 = buildJvmProcess(spinnerFile, 2, iterCount)
val processes = listOf(process1, process2) logger.info("Started child processes")
val spinnerBackingFile = RandomAccessFile(spinnerFile, "rw") val processes = listOf(process1, process2)
println("Mapped spinner file")
val spinnerBuffer = spinnerBackingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 512)
println("Created spinner buffer")
var timeWaited = 0L val spinnerBackingFile = RandomAccessFile(spinnerFile, "rw")
logger.info("Mapped spinner file at: $spinnerFile")
val spinnerBuffer = spinnerBackingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 512)
logger.info("Created spinner buffer")
while (spinnerBuffer.getShort(1) != 10.toShort() && spinnerBuffer.getShort(2) != 10.toShort() && timeWaited < 60_000) { var timeWaited = 0L
println("Waiting to childProcesses to report back. waited ${timeWaited}ms")
Thread.sleep(1000)
timeWaited += 1000
}
//GO! while (spinnerBuffer.getShort(1) != 10.toShort() && spinnerBuffer.getShort(2) != 10.toShort() && timeWaited < 60_000) {
println("Instructing child processes to start allocating ports") logger.info("Waiting to childProcesses to report back. waited ${timeWaited}ms")
spinnerBuffer.putShort(0, 8) Thread.sleep(1000)
println("Waiting for child processes to terminate") timeWaited += 1000
processes.forEach { it.waitFor(1, TimeUnit.MINUTES) }
println("child processes terminated")
val process1Output = process1.inputStream.reader().readLines().toSet()
val process2Output = process2.inputStream.reader().readLines().toSet()
println("child process out captured")
Assert.assertThat(process1Output.size, `is`(10_000))
Assert.assertThat(process2Output.size, `is`(10_000))
//there should be no overlap between the outputs as each process should have been allocated a unique set of ports
Assert.assertThat(process1Output.intersect(process2Output), `is`(emptySet()))
} }
//GO!
logger.info("Instructing child processes to start allocating ports")
spinnerBuffer.putShort(0, 8)
logger.info("Waiting for child processes to terminate")
val terminationStatuses = processes.parallelStream().map { if(it.waitFor(1, TimeUnit.MINUTES)) "OK" else "STILL RUNNING" }.toList()
logger.info("child processes terminated: $terminationStatuses")
fun List<String>.setOfPorts() : Set<Int> {
// May include warnings when ports are busy
return map { Try.on { Integer.parseInt(it)} }.filter { it.isSuccess }.map { it.getOrThrow() }.toSet()
}
val lines1 = process1.inputStream.reader().readLines()
val portsAllocated1 = lines1.setOfPorts()
val lines2 = process2.inputStream.reader().readLines()
val portsAllocated2 = lines2.setOfPorts()
logger.info("child process out captured")
Assert.assertThat(lines1.joinToString(), portsAllocated1.size, `is`(iterCount))
Assert.assertThat(lines2.joinToString(), portsAllocated2.size, `is`(iterCount))
//there should be no overlap between the outputs as each process should have been allocated a unique set of ports
val intersect = portsAllocated1.intersect(portsAllocated2)
Assert.assertThat(intersect.joinToString(), intersect, `is`(emptySet()))
} }
private fun buildJvmProcess(spinnerFile: String, reportingIndex: Int): Process { private fun buildJvmProcess(spinnerFile: String, reportingIndex: Int, iterCount: Int): Process {
val separator = System.getProperty("file.separator") val separator = System.getProperty("file.separator")
val classpath = System.getProperty("java.class.path") val classpath = System.getProperty("java.class.path")
val path = (System.getProperty("java.home") val path = (System.getProperty("java.home")
@ -87,11 +105,9 @@ class PortAllocationTest {
classpath, classpath,
PortAllocationRunner::class.java.name, PortAllocationRunner::class.java.name,
spinnerFile, spinnerFile,
reportingIndex.toString()) reportingIndex.toString(),
iterCount.toString())
return processBuilder.start() return processBuilder.start()
} }
} }

View File

@ -40,6 +40,8 @@ import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.stubs.CertificateStoreStubs import net.corda.testing.internal.stubs.CertificateStoreStubs
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.io.IOException
import java.net.ServerSocket
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.security.KeyPair import java.security.KeyPair
@ -229,3 +231,18 @@ fun <R> withTestSerializationEnvIfNotSet(block: () -> R): R {
createTestSerializationEnv().asTestContextEnv { block() } createTestSerializationEnv().asTestContextEnv { block() }
} }
} }
/**
* Used to check if particular port is already bound i.e. not vacant
*/
fun isLocalPortBound(port: Int) : Boolean {
return try {
ServerSocket(port).use {
// Successful means that the port was vacant
false
}
} catch (e: IOException) {
// Failed to open server socket means that it is already bound by someone
true
}
}