mirror of
https://github.com/corda/corda.git
synced 2024-12-20 05:28:21 +00:00
Make webserver reconnect to rebooted node. (#662)
This commit is contained in:
parent
8cb7c5eab1
commit
1df29ab06f
1
.gitignore
vendored
1
.gitignore
vendored
@ -65,6 +65,7 @@ lib/dokka.jar
|
|||||||
|
|
||||||
# IntelliJ
|
# IntelliJ
|
||||||
/out/
|
/out/
|
||||||
|
/classes/
|
||||||
|
|
||||||
# mpeltonen/sbt-idea plugin
|
# mpeltonen/sbt-idea plugin
|
||||||
.idea_modules/
|
.idea_modules/
|
||||||
|
@ -6,6 +6,7 @@ import com.esotericsoftware.kryo.io.Input
|
|||||||
import com.esotericsoftware.kryo.io.Output
|
import com.esotericsoftware.kryo.io.Output
|
||||||
import com.esotericsoftware.kryo.pool.KryoPool
|
import com.esotericsoftware.kryo.pool.KryoPool
|
||||||
import com.google.common.util.concurrent.Futures
|
import com.google.common.util.concurrent.Futures
|
||||||
|
import net.corda.core.getOrThrow
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
import net.corda.core.millis
|
import net.corda.core.millis
|
||||||
import net.corda.core.random63BitValue
|
import net.corda.core.random63BitValue
|
||||||
@ -21,6 +22,7 @@ import rx.subjects.UnicastSubject
|
|||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
|
|
||||||
class RPCStabilityTests {
|
class RPCStabilityTests {
|
||||||
@ -57,6 +59,31 @@ class RPCStabilityTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface ReconnectOps : RPCOps {
|
||||||
|
fun ping(): String
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `client reconnects to rebooted server`() {
|
||||||
|
rpcDriver {
|
||||||
|
val ops = object : ReconnectOps {
|
||||||
|
override val protocolVersion = 0
|
||||||
|
override fun ping() = "pong"
|
||||||
|
}
|
||||||
|
val serverFollower = shutdownManager.follower()
|
||||||
|
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().hostAndPort
|
||||||
|
serverFollower.unfollow()
|
||||||
|
val clientFollower = shutdownManager.follower()
|
||||||
|
val client = startRpcClient<ReconnectOps>(serverPort).getOrThrow()
|
||||||
|
clientFollower.unfollow()
|
||||||
|
assertEquals("pong", client.ping())
|
||||||
|
serverFollower.shutdown()
|
||||||
|
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
|
||||||
|
assertEquals("pong", client.ping())
|
||||||
|
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
interface TrackSubscriberOps : RPCOps {
|
interface TrackSubscriberOps : RPCOps {
|
||||||
fun subscribe(): Observable<Unit>
|
fun subscribe(): Observable<Unit>
|
||||||
}
|
}
|
||||||
|
@ -53,10 +53,12 @@ data class RPCClientConfiguration(
|
|||||||
val connectionRetryIntervalMultiplier: Double,
|
val connectionRetryIntervalMultiplier: Double,
|
||||||
/** Maximum retry interval */
|
/** Maximum retry interval */
|
||||||
val connectionMaxRetryInterval: Duration,
|
val connectionMaxRetryInterval: Duration,
|
||||||
|
val maxReconnectAttempts: Int,
|
||||||
/** Maximum file size */
|
/** Maximum file size */
|
||||||
val maxFileSize: Int
|
val maxFileSize: Int
|
||||||
) {
|
) {
|
||||||
companion object {
|
companion object {
|
||||||
|
val unlimitedReconnectAttempts = -1
|
||||||
@JvmStatic
|
@JvmStatic
|
||||||
val default = RPCClientConfiguration(
|
val default = RPCClientConfiguration(
|
||||||
minimumServerProtocolVersion = 0,
|
minimumServerProtocolVersion = 0,
|
||||||
@ -68,6 +70,7 @@ data class RPCClientConfiguration(
|
|||||||
connectionRetryInterval = 5.seconds,
|
connectionRetryInterval = 5.seconds,
|
||||||
connectionRetryIntervalMultiplier = 1.5,
|
connectionRetryIntervalMultiplier = 1.5,
|
||||||
connectionMaxRetryInterval = 3.minutes,
|
connectionMaxRetryInterval = 3.minutes,
|
||||||
|
maxReconnectAttempts = unlimitedReconnectAttempts,
|
||||||
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
|
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
|
||||||
maxFileSize = 10485760
|
maxFileSize = 10485760
|
||||||
)
|
)
|
||||||
@ -139,6 +142,7 @@ class RPCClient<I : RPCOps>(
|
|||||||
retryInterval = rpcConfiguration.connectionRetryInterval.toMillis()
|
retryInterval = rpcConfiguration.connectionRetryInterval.toMillis()
|
||||||
retryIntervalMultiplier = rpcConfiguration.connectionRetryIntervalMultiplier
|
retryIntervalMultiplier = rpcConfiguration.connectionRetryIntervalMultiplier
|
||||||
maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval.toMillis()
|
maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval.toMillis()
|
||||||
|
reconnectAttempts = rpcConfiguration.maxReconnectAttempts
|
||||||
minLargeMessageSize = rpcConfiguration.maxFileSize
|
minLargeMessageSize = rpcConfiguration.maxFileSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,6 +381,28 @@ class ShutdownManager(private val executorService: ExecutorService) {
|
|||||||
}
|
}
|
||||||
registerShutdown(processShutdown)
|
registerShutdown(processShutdown)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface Follower {
|
||||||
|
fun unfollow()
|
||||||
|
fun shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
fun follower() = object : Follower {
|
||||||
|
private val start = state.locked { registeredShutdowns.size }
|
||||||
|
private val end = AtomicInteger(start - 1)
|
||||||
|
override fun unfollow() = end.set(state.locked { registeredShutdowns.size })
|
||||||
|
override fun shutdown() = end.get().let { end ->
|
||||||
|
start > end && throw IllegalStateException("You haven't called unfollow.")
|
||||||
|
state.locked {
|
||||||
|
registeredShutdowns.subList(start, end).listIterator(end - start).run {
|
||||||
|
while (hasPrevious()) {
|
||||||
|
previous().getOrThrow().invoke()
|
||||||
|
set(Futures.immediateFuture {}) // Don't break other followers by doing a remove.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class DriverDSL(
|
class DriverDSL(
|
||||||
|
@ -108,6 +108,7 @@ interface RPCDriverExposedDSLInterface : DriverDSLExposedInterface {
|
|||||||
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
|
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
|
||||||
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE,
|
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE,
|
||||||
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
|
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
|
||||||
|
customPort: HostAndPort? = null,
|
||||||
ops : I
|
ops : I
|
||||||
) : ListenableFuture<RpcServerHandle>
|
) : ListenableFuture<RpcServerHandle>
|
||||||
|
|
||||||
@ -340,9 +341,10 @@ data class RPCDriverDSL(
|
|||||||
maxFileSize: Int,
|
maxFileSize: Int,
|
||||||
maxBufferedBytesPerClient: Long,
|
maxBufferedBytesPerClient: Long,
|
||||||
configuration: RPCServerConfiguration,
|
configuration: RPCServerConfiguration,
|
||||||
|
customPort: HostAndPort?,
|
||||||
ops: I
|
ops: I
|
||||||
): ListenableFuture<RpcServerHandle> {
|
): ListenableFuture<RpcServerHandle> {
|
||||||
val hostAndPort = driverDSL.portAllocation.nextHostAndPort()
|
val hostAndPort = customPort ?: driverDSL.portAllocation.nextHostAndPort()
|
||||||
addressMustNotBeBound(driverDSL.executorService, hostAndPort)
|
addressMustNotBeBound(driverDSL.executorService, hostAndPort)
|
||||||
return driverDSL.executorService.submit<RpcServerHandle> {
|
return driverDSL.executorService.submit<RpcServerHandle> {
|
||||||
val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort)
|
val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort)
|
||||||
|
Loading…
Reference in New Issue
Block a user