mirror of
https://github.com/corda/corda.git
synced 2025-06-13 04:38:19 +00:00
Add message to uses of require(...) (#4192)
This commit is contained in:
committed by
Michele Sollecito
parent
51e66af2fd
commit
8f463c46a9
@ -39,7 +39,7 @@ private class MultiplexingReactiveArtemisConsumer(private val queueNames: Set<St
|
||||
override fun start() {
|
||||
|
||||
synchronized(this) {
|
||||
require(!startedFlag)
|
||||
require(!startedFlag) { "Must not be started" }
|
||||
connect()
|
||||
startedFlag = true
|
||||
}
|
||||
@ -59,7 +59,7 @@ private class MultiplexingReactiveArtemisConsumer(private val queueNames: Set<St
|
||||
override fun connect() {
|
||||
|
||||
synchronized(this) {
|
||||
require(!connected)
|
||||
require(!connected) { "Must not be connected" }
|
||||
queueNames.forEach { queue ->
|
||||
createSession().apply {
|
||||
start()
|
||||
|
@ -72,8 +72,9 @@ class ImmutableClassSerializer<T : Any>(val klass: KClass<T>) : Serializer<T>()
|
||||
val constructor = klass.primaryConstructor!!
|
||||
|
||||
init {
|
||||
// Verify that this class is immutable (all properties are final)
|
||||
require(props.none { it is KMutableProperty<*> })
|
||||
props.forEach {
|
||||
require(it !is KMutableProperty<*>) { "$it mutable property of class: ${klass} is unsupported" }
|
||||
}
|
||||
}
|
||||
|
||||
// Just a utility to help us catch cases where nodes are running out of sync versions.
|
||||
|
@ -265,8 +265,8 @@ data class SecurityConfiguration(val authService: SecurityConfiguration.AuthServ
|
||||
val users: List<User>? = null) {
|
||||
init {
|
||||
when (type) {
|
||||
AuthDataSourceType.INMEMORY -> require(users != null && connection == null)
|
||||
AuthDataSourceType.DB -> require(users == null && connection != null)
|
||||
AuthDataSourceType.INMEMORY -> require(users != null && connection == null) { "In-memory authentication must specify a user list, and must not configure a database" }
|
||||
AuthDataSourceType.DB -> require(users == null && connection != null) { "Database-backed authentication must not specify a user list, and must configure a database" }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -569,7 +569,7 @@ private class P2PMessagingConsumer(
|
||||
override fun start() {
|
||||
|
||||
synchronized(this) {
|
||||
require(!startedFlag)
|
||||
require(!startedFlag){"Must not already be started"}
|
||||
drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { initialAndExistingConsumer.switchTo(existingOnlyConsumer) }.subscribe()
|
||||
drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { existingOnlyConsumer.switchTo(initialAndExistingConsumer) }.subscribe()
|
||||
subscriptions += existingOnlyConsumer.messages.doOnNext(messages::onNext).subscribe()
|
||||
|
@ -253,7 +253,7 @@ class RPCServer(
|
||||
private fun bindingRemovalArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
lifeCycle.requireState(State.STARTED)
|
||||
val notificationType = artemisMessage.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
|
||||
require(notificationType == CoreNotificationType.BINDING_REMOVED.name)
|
||||
require(notificationType == CoreNotificationType.BINDING_REMOVED.name){"Message contained notification type of $notificationType instead of expected ${CoreNotificationType.BINDING_REMOVED.name}"}
|
||||
val clientAddress = artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME)
|
||||
log.warn("Detected RPC client disconnect on address $clientAddress, scheduling for reaping")
|
||||
invalidateClient(SimpleString(clientAddress))
|
||||
@ -262,7 +262,7 @@ class RPCServer(
|
||||
private fun bindingAdditionArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
lifeCycle.requireState(State.STARTED)
|
||||
val notificationType = artemisMessage.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
|
||||
require(notificationType == CoreNotificationType.BINDING_ADDED.name)
|
||||
require(notificationType == CoreNotificationType.BINDING_ADDED.name){"Message contained notification type of $notificationType instead of expected ${CoreNotificationType.BINDING_ADDED.name}"}
|
||||
val clientAddress = SimpleString(artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME))
|
||||
log.debug("RPC client queue created on address $clientAddress")
|
||||
|
||||
|
@ -300,7 +300,7 @@ class NodeAttachmentService(
|
||||
private fun import(jar: InputStream, uploader: String?, filename: String?): AttachmentId {
|
||||
return database.transaction {
|
||||
withContractsInJar(jar) { contractClassNames, inputStream ->
|
||||
require(inputStream !is JarInputStream)
|
||||
require(inputStream !is JarInputStream){"Input stream must not be a JarInputStream"}
|
||||
|
||||
// Read the file into RAM and then calculate its hash. The attachment must fit into memory.
|
||||
// TODO: Switch to a two-phase insert so we can handle attachments larger than RAM.
|
||||
|
@ -1,7 +1,7 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import co.paralleluniverse.strands.concurrent.AbstractQueuedSynchronizer
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.strands.concurrent.AbstractQueuedSynchronizer
|
||||
|
||||
/**
|
||||
* Quasar-compatible latch that may be incremented.
|
||||
@ -56,7 +56,7 @@ class CountUpDownLatch(initialValue: Int) {
|
||||
}
|
||||
|
||||
fun countDown(number: Int = 1) {
|
||||
require(number > 0)
|
||||
require(number > 0){"Number to count down by must be greater than 0"}
|
||||
sync.releaseShared(number)
|
||||
}
|
||||
|
||||
|
@ -193,7 +193,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
"Transaction context is missing. This might happen if a suspendable method is not annotated with @Suspendable annotation."
|
||||
}
|
||||
} else {
|
||||
require(contextTransactionOrNull == null)
|
||||
require(contextTransactionOrNull == null){"Transaction is marked as not present, but is not null"}
|
||||
}
|
||||
}
|
||||
|
||||
@ -388,7 +388,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
isDbTransactionOpenOnEntry = true,
|
||||
isDbTransactionOpenOnExit = false
|
||||
)
|
||||
require(continuation == FlowContinuation.ProcessEvents)
|
||||
require(continuation == FlowContinuation.ProcessEvents){"Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation "}
|
||||
unpark(SERIALIZER_BLOCKER)
|
||||
}
|
||||
return uncheckedCast(processEventsUntilFlowIsResumed(
|
||||
|
@ -172,7 +172,7 @@ class SingleThreadedStateMachineManager(
|
||||
* @param allowedUnsuspendedFiberCount Optional parameter is used in some tests.
|
||||
*/
|
||||
override fun stop(allowedUnsuspendedFiberCount: Int) {
|
||||
require(allowedUnsuspendedFiberCount >= 0)
|
||||
require(allowedUnsuspendedFiberCount >= 0){"allowedUnsuspendedFiberCount must be greater than or equal to zero"}
|
||||
mutex.locked {
|
||||
if (stopping) throw IllegalStateException("Already stopping!")
|
||||
stopping = true
|
||||
@ -775,10 +775,10 @@ class SingleThreadedStateMachineManager(
|
||||
) {
|
||||
drainFlowEventQueue(flow)
|
||||
// final sanity checks
|
||||
require(lastState.pendingDeduplicationHandlers.isEmpty())
|
||||
require(lastState.isRemoved)
|
||||
require(lastState.checkpoint.subFlowStack.size == 1)
|
||||
require(flow.fiber.id !in sessionToFlow.values)
|
||||
require(lastState.pendingDeduplicationHandlers.isEmpty()) { "Flow cannot be removed until all pending deduplications have completed" }
|
||||
require(lastState.isRemoved) { "Flow must be in removable state before removal" }
|
||||
require(lastState.checkpoint.subFlowStack.size == 1) { "Checkpointed stack must be empty" }
|
||||
require(flow.fiber.id !in sessionToFlow.values) { "Flow fibre must not be needed by an existing session" }
|
||||
flow.resultFuture.set(removalReason.flowReturnValue)
|
||||
lastState.flowLogic.progressTracker?.currentStep = ProgressTracker.DONE
|
||||
changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Success(removalReason.flowReturnValue)))
|
||||
|
@ -2,17 +2,11 @@ package net.corda.node.services.statemachine.interceptors
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.internal.CheckpointSerializationContext
|
||||
import net.corda.core.serialization.internal.checkpointDeserialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.statemachine.ActionExecutor
|
||||
import net.corda.node.services.statemachine.Event
|
||||
import net.corda.node.services.statemachine.FlowFiber
|
||||
import net.corda.node.services.statemachine.FlowState
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.services.statemachine.StateMachineState
|
||||
import net.corda.node.services.statemachine.TransitionExecutor
|
||||
import net.corda.node.services.statemachine.*
|
||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||
import net.corda.node.services.statemachine.transitions.TransitionResult
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
@ -69,7 +63,7 @@ class FiberDeserializationChecker {
|
||||
private var foundUnrestorableFibers: Boolean = false
|
||||
|
||||
fun start(checkpointSerializationContext: CheckpointSerializationContext) {
|
||||
require(checkerThread == null)
|
||||
require(checkerThread == null){"Checking thread must not already be started"}
|
||||
checkerThread = thread(name = "FiberDeserializationChecker") {
|
||||
while (true) {
|
||||
val job = jobQueue.take()
|
||||
|
@ -372,7 +372,7 @@ class NodeRegistrationHelper(
|
||||
private class FixedPeriodLimitedRetrialStrategy(times: Int, private val period: Duration) : (Duration?) -> Duration? {
|
||||
|
||||
init {
|
||||
require(times > 0)
|
||||
require(times > 0){"Retry attempts must be larger than zero"}
|
||||
}
|
||||
|
||||
private var counter = times
|
||||
|
Reference in New Issue
Block a user