From 6db3ded032c8d52fa44986112fcd0d7663281ada Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Mon, 12 Aug 2019 10:38:15 +0100 Subject: [PATCH] CORDA-3139: Cater for port already bound scenario during port allocation (#5361) * CORDA-3139: Cater for port already bound scenario during port allocation Also moved `SharedMemoryIncremental` into a separate file as it getting bigger and improved readability of logic and added some logging. * CORDA-3139: Fix the unit test * CORDA-3139: Improve logging when failing * CORDA-3139: Improve stability of the test --- .../kotlin/net/corda/testing/driver/Driver.kt | 57 +--------- .../internal/SharedMemoryIncremental.kt | 74 +++++++++++++ .../testing/node/PortAllocationRunner.java | 3 +- .../testing/driver/PortAllocationTest.kt | 100 ++++++++++-------- .../testing/internal/InternalTestUtils.kt | 17 +++ 5 files changed, 153 insertions(+), 98 deletions(-) create mode 100644 testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/SharedMemoryIncremental.kt diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt index 393a29f4be..40e79afdf9 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -16,6 +16,7 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.getOrThrow import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.driver.internal.SharedMemoryIncremental import net.corda.testing.driver.internal.incrementalPortAllocation import net.corda.testing.driver.internal.internalServices import net.corda.testing.node.NotarySpec @@ -26,12 +27,6 @@ import net.corda.testing.node.internal.genericDriver import net.corda.testing.node.internal.getTimestampAsDirectoryName import net.corda.testing.node.internal.newContext import rx.Observable -import sun.misc.Unsafe -import sun.nio.ch.DirectBuffer -import java.io.File -import java.io.RandomAccessFile -import java.nio.MappedByteBuffer -import java.nio.channels.FileChannel import java.nio.file.Path import java.nio.file.Paths import java.util.concurrent.atomic.AtomicInteger @@ -110,6 +105,7 @@ data class WebserverHandle( ) @DoNotImplement +// Unfortunately cannot be an interface due to `defaultAllocator` abstract class PortAllocation { companion object { @@ -119,7 +115,6 @@ abstract class PortAllocation { const val FIRST_EPHEMERAL_PORT = 30_000 } - /** Get the next available port via [nextPort] and then return a [NetworkHostAndPort] **/ fun nextHostAndPort(): NetworkHostAndPort = NetworkHostAndPort("localhost", nextPort()) @@ -138,56 +133,8 @@ abstract class PortAllocation { return SharedMemoryIncremental.INSTANCE.nextPort() } } - - private class SharedMemoryIncremental private constructor(startPort: Int, endPort: Int, file: File = File(System.getProperty("user.home"), "corda-$startPort-to-$endPort-port-allocator.bin")) : PortAllocation() { - - private val startingPoint: Int = startPort - private val endPoint: Int = endPort - - private val backingFile: RandomAccessFile = RandomAccessFile(file, "rw") - private val mb: MappedByteBuffer - private val startingAddress: Long - - /** - * An implementation of [PortAllocation] which allocates ports sequentially - */ - - companion object { - - private val UNSAFE: Unsafe = getUnsafe() - private fun getUnsafe(): Unsafe { - val f = Unsafe::class.java.getDeclaredField("theUnsafe") - f.isAccessible = true - return f.get(null) as Unsafe - } - - val INSTANCE = SharedMemoryIncremental(DEFAULT_START_PORT, FIRST_EPHEMERAL_PORT) - } - - override fun nextPort(): Int { - var oldValue: Long - var newValue: Long - do { - oldValue = UNSAFE.getLongVolatile(null, startingAddress) - newValue = if (oldValue + 1 >= endPoint || oldValue < startingPoint) { - startingPoint.toLong() - } else { - (oldValue + 1) - } - } while (!UNSAFE.compareAndSwapLong(null, startingAddress, oldValue, newValue)) - - return newValue.toInt() - } - - init { - mb = backingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 16) - startingAddress = (mb as DirectBuffer).address() - } - } } - - /** * A class containing configuration information for Jolokia JMX, to be used when creating a node via the [driver]. * diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/SharedMemoryIncremental.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/SharedMemoryIncremental.kt new file mode 100644 index 0000000000..79a99b97e1 --- /dev/null +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/SharedMemoryIncremental.kt @@ -0,0 +1,74 @@ +package net.corda.testing.driver.internal + +import net.corda.core.utilities.contextLogger +import net.corda.testing.driver.PortAllocation +import net.corda.testing.internal.isLocalPortBound +import sun.misc.Unsafe +import sun.nio.ch.DirectBuffer +import java.io.File +import java.io.RandomAccessFile +import java.nio.MappedByteBuffer +import java.nio.channels.FileChannel + +/** + * It uses backing file to store information about last allocated port. + * Implementation note: + * The (small)file is read into memory and then `Unsafe` operation is used to work directly with that memory + * performing atomic compare and swap operations as necessary + * This enables the same file to be used my multiple processed running on the same machine such that they will be + * able to concurrently allocate ports without clashing with each other. + */ +internal class SharedMemoryIncremental +private constructor(private val startPort: Int, private val endPort: Int, + file: File = File(System.getProperty("user.home"), "corda-$startPort-to-$endPort-port-allocator.bin")) : PortAllocation() { + + private val backingFile: RandomAccessFile = RandomAccessFile(file, "rw") + private val mb: MappedByteBuffer + private val memoryOffsetAddress: Long + + init { + mb = backingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 16) // TODO: Do we really need 16 bytes? Given that we care about Int it should be enough to have 4 + memoryOffsetAddress = (mb as DirectBuffer).address() + } + + /** + * An implementation of [PortAllocation] which allocates ports sequentially + */ + companion object { + + private val UNSAFE: Unsafe = getUnsafe() + private fun getUnsafe(): Unsafe { + val f = Unsafe::class.java.getDeclaredField("theUnsafe") + f.isAccessible = true + return f.get(null) as Unsafe + } + + val INSTANCE = SharedMemoryIncremental(DEFAULT_START_PORT, FIRST_EPHEMERAL_PORT) + + val logger = contextLogger() + } + + override fun nextPort(): Int { + var newValue: Long + + do { + val oldValue = UNSAFE.getLongVolatile(null, memoryOffsetAddress) + newValue = if (oldValue + 1 >= endPort || oldValue < startPort) { + logger.warn("Port allocation rolling over: oldValue=$oldValue, startPort=$startPort, endPort=$endPort") + startPort.toLong() + } else { + (oldValue + 1) + } + val compareAndSwapSuccess = UNSAFE.compareAndSwapLong(null, memoryOffsetAddress, oldValue, newValue) + val success = if (!compareAndSwapSuccess) false else { + val alreadyBound = isLocalPortBound(newValue.toInt()) + if (alreadyBound) { + logger.warn("Port $newValue appears to be bound. Allocator will skip it.") + } + !alreadyBound + } + } while (!success) + + return newValue.toInt() + } +} \ No newline at end of file diff --git a/testing/node-driver/src/test/java/net/corda/testing/node/PortAllocationRunner.java b/testing/node-driver/src/test/java/net/corda/testing/node/PortAllocationRunner.java index 90d902abed..904ca17c1a 100644 --- a/testing/node-driver/src/test/java/net/corda/testing/node/PortAllocationRunner.java +++ b/testing/node-driver/src/test/java/net/corda/testing/node/PortAllocationRunner.java @@ -36,7 +36,8 @@ public class PortAllocationRunner { * allocate ports and print out for later consumption by the spawning test */ PortAllocation pa = PortAllocation.getDefaultAllocator(); - for (int i = 0; i < 10000; i++) { + int iterCount = Integer.parseInt(args[2]); + for (int i = 0; i < iterCount; i++) { System.out.println(pa.nextPort()); } } diff --git a/testing/node-driver/src/test/kotlin/net/corda/testing/driver/PortAllocationTest.kt b/testing/node-driver/src/test/kotlin/net/corda/testing/driver/PortAllocationTest.kt index 3d78781033..ebe231290f 100644 --- a/testing/node-driver/src/test/kotlin/net/corda/testing/driver/PortAllocationTest.kt +++ b/testing/node-driver/src/test/kotlin/net/corda/testing/driver/PortAllocationTest.kt @@ -1,25 +1,33 @@ package net.corda.testing.driver +import net.corda.core.utilities.Try +import net.corda.core.utilities.contextLogger import net.corda.testing.node.PortAllocationRunner import org.assertj.core.util.Files import org.hamcrest.CoreMatchers.`is` import org.hamcrest.core.IsNot.not import org.hamcrest.number.OrderingComparison import org.junit.Assert +import org.junit.Assume.assumeFalse import org.junit.Test import java.io.RandomAccessFile import java.nio.channels.FileChannel import java.util.concurrent.TimeUnit +import kotlin.streams.toList class PortAllocationTest { + companion object { + val logger = contextLogger() + } + @Test fun `should allocate a port whilst cycling back round if exceeding start of ephemeral range`() { val startingPoint = PortAllocation.DEFAULT_START_PORT val portAllocator = PortAllocation.defaultAllocator var previous = portAllocator.nextPort() - (0 until 1000000).forEach { _ -> + (0 until 50_000).forEach { _ -> val next = portAllocator.nextPort() Assert.assertThat(next, `is`(not(previous))) Assert.assertThat(next, `is`(OrderingComparison.lessThan(PortAllocation.FIRST_EPHEMERAL_PORT))) @@ -27,7 +35,7 @@ class PortAllocationTest { if (next == startingPoint) { Assert.assertThat(previous, `is`(PortAllocation.FIRST_EPHEMERAL_PORT - 1)) } else { - Assert.assertThat(next, `is`(previous + 1)) + Assert.assertTrue(next >= previous + 1) } previous = next } @@ -35,50 +43,60 @@ class PortAllocationTest { @Test(timeout = 120_000) fun `should support multiprocess port allocation`() { - if (!System.getProperty("os.name").toLowerCase().contains("windows")) { - println("Starting multiprocess port allocation test") - val spinnerFile = Files.newTemporaryFile().also { it.deleteOnExit() }.absolutePath - val process1 = buildJvmProcess(spinnerFile, 1) - val process2 = buildJvmProcess(spinnerFile, 2) + assumeFalse(System.getProperty("os.name").toLowerCase().contains("windows")) - println("Started child processes") + logger.info("Starting multiprocess port allocation test") + val spinnerFile = Files.newTemporaryFile().also { it.deleteOnExit() }.absolutePath + val iterCount = 8_000 // Default port range 10000-30000 since we will have 2 processes we want to make sure there is enough leg room + // If we rollover, we may well receive the ports that were already given to a different process + val process1 = buildJvmProcess(spinnerFile, 1, iterCount) + val process2 = buildJvmProcess(spinnerFile, 2, iterCount) - val processes = listOf(process1, process2) + logger.info("Started child processes") - val spinnerBackingFile = RandomAccessFile(spinnerFile, "rw") - println("Mapped spinner file") - val spinnerBuffer = spinnerBackingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 512) - println("Created spinner buffer") + val processes = listOf(process1, process2) - var timeWaited = 0L + val spinnerBackingFile = RandomAccessFile(spinnerFile, "rw") + logger.info("Mapped spinner file at: $spinnerFile") + val spinnerBuffer = spinnerBackingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 512) + logger.info("Created spinner buffer") - while (spinnerBuffer.getShort(1) != 10.toShort() && spinnerBuffer.getShort(2) != 10.toShort() && timeWaited < 60_000) { - println("Waiting to childProcesses to report back. waited ${timeWaited}ms") - Thread.sleep(1000) - timeWaited += 1000 - } + var timeWaited = 0L - //GO! - println("Instructing child processes to start allocating ports") - spinnerBuffer.putShort(0, 8) - println("Waiting for child processes to terminate") - processes.forEach { it.waitFor(1, TimeUnit.MINUTES) } - println("child processes terminated") - - val process1Output = process1.inputStream.reader().readLines().toSet() - val process2Output = process2.inputStream.reader().readLines().toSet() - - println("child process out captured") - - Assert.assertThat(process1Output.size, `is`(10_000)) - Assert.assertThat(process2Output.size, `is`(10_000)) - - //there should be no overlap between the outputs as each process should have been allocated a unique set of ports - Assert.assertThat(process1Output.intersect(process2Output), `is`(emptySet())) + while (spinnerBuffer.getShort(1) != 10.toShort() && spinnerBuffer.getShort(2) != 10.toShort() && timeWaited < 60_000) { + logger.info("Waiting to childProcesses to report back. waited ${timeWaited}ms") + Thread.sleep(1000) + timeWaited += 1000 } + + //GO! + logger.info("Instructing child processes to start allocating ports") + spinnerBuffer.putShort(0, 8) + logger.info("Waiting for child processes to terminate") + val terminationStatuses = processes.parallelStream().map { if(it.waitFor(1, TimeUnit.MINUTES)) "OK" else "STILL RUNNING" }.toList() + logger.info("child processes terminated: $terminationStatuses") + + fun List.setOfPorts() : Set { + // May include warnings when ports are busy + return map { Try.on { Integer.parseInt(it)} }.filter { it.isSuccess }.map { it.getOrThrow() }.toSet() + } + + val lines1 = process1.inputStream.reader().readLines() + val portsAllocated1 = lines1.setOfPorts() + val lines2 = process2.inputStream.reader().readLines() + val portsAllocated2 = lines2.setOfPorts() + + logger.info("child process out captured") + + Assert.assertThat(lines1.joinToString(), portsAllocated1.size, `is`(iterCount)) + Assert.assertThat(lines2.joinToString(), portsAllocated2.size, `is`(iterCount)) + + //there should be no overlap between the outputs as each process should have been allocated a unique set of ports + val intersect = portsAllocated1.intersect(portsAllocated2) + Assert.assertThat(intersect.joinToString(), intersect, `is`(emptySet())) } - private fun buildJvmProcess(spinnerFile: String, reportingIndex: Int): Process { + private fun buildJvmProcess(spinnerFile: String, reportingIndex: Int, iterCount: Int): Process { val separator = System.getProperty("file.separator") val classpath = System.getProperty("java.class.path") val path = (System.getProperty("java.home") @@ -87,11 +105,9 @@ class PortAllocationTest { classpath, PortAllocationRunner::class.java.name, spinnerFile, - reportingIndex.toString()) + reportingIndex.toString(), + iterCount.toString()) return processBuilder.start() } -} - - - +} \ No newline at end of file diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalTestUtils.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalTestUtils.kt index d76f1dfb73..b07ffa817d 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalTestUtils.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalTestUtils.kt @@ -40,6 +40,8 @@ import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.TestIdentity import net.corda.testing.internal.stubs.CertificateStoreStubs import java.io.ByteArrayOutputStream +import java.io.IOException +import java.net.ServerSocket import java.nio.file.Files import java.nio.file.Path import java.security.KeyPair @@ -229,3 +231,18 @@ fun withTestSerializationEnvIfNotSet(block: () -> R): R { createTestSerializationEnv().asTestContextEnv { block() } } } + +/** + * Used to check if particular port is already bound i.e. not vacant + */ +fun isLocalPortBound(port: Int) : Boolean { + return try { + ServerSocket(port).use { + // Successful means that the port was vacant + false + } + } catch (e: IOException) { + // Failed to open server socket means that it is already bound by someone + true + } +} \ No newline at end of file