mirror of
https://github.com/corda/corda.git
synced 2025-03-13 07:54:32 +00:00
ENT-3658, ENT-3660: Add timeouts and hospital flow handling to CryptoServices (#5226)
This commit is contained in:
parent
2bfd2c8cb5
commit
6df142bf7a
@ -2,10 +2,14 @@ package net.corda.nodeapi.internal.cryptoservice
|
||||
|
||||
import net.corda.core.DoNotImplement
|
||||
import net.corda.core.crypto.SignatureScheme
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import org.bouncycastle.operator.ContentSigner
|
||||
import java.security.KeyPair
|
||||
import java.security.PublicKey
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
/**
|
||||
* Unlike [CryptoService] can only perform "read-only" operations but never create new key pairs.
|
||||
@ -48,7 +52,27 @@ interface SignOnlyCryptoService {
|
||||
* Fully-powered crypto service which can sign as well as create new key pairs.
|
||||
*/
|
||||
@DoNotImplement
|
||||
interface CryptoService : SignOnlyCryptoService {
|
||||
abstract class CryptoService(private val timeout: Duration? = null) : AutoCloseable, SignOnlyCryptoService {
|
||||
private val executor = Executors.newCachedThreadPool()
|
||||
|
||||
override fun close() {
|
||||
executor.shutdown()
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a timeout for the given [func].
|
||||
* @param timeout The time to wait on the function completing (in milliseconds)
|
||||
* @param func The call that we're waiting on
|
||||
* @return the return value of the function call
|
||||
* @throws TimedCryptoServiceException if we reach the timeout
|
||||
*/
|
||||
private fun <A> withTimeout(timeout: Duration?, func: () -> A) : A {
|
||||
try {
|
||||
return executor.submit(func).getOrThrow(timeout)
|
||||
} catch (e: TimeoutException) {
|
||||
throw TimedCryptoServiceException("Timed-out while waiting for ${timeout?.toMillis()} milliseconds")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate and store a new [KeyPair].
|
||||
@ -58,7 +82,39 @@ interface CryptoService : SignOnlyCryptoService {
|
||||
*
|
||||
* Returns the [PublicKey] of the generated [KeyPair].
|
||||
*/
|
||||
fun generateKeyPair(alias: String, scheme: SignatureScheme): PublicKey
|
||||
fun generateKeyPair(alias: String, scheme: SignatureScheme): PublicKey =
|
||||
withTimeout(timeout) { _generateKeyPair(alias, scheme) }
|
||||
protected abstract fun _generateKeyPair(alias: String, scheme: SignatureScheme): PublicKey
|
||||
|
||||
/** Check if this [CryptoService] has a private key entry for the input alias. */
|
||||
override fun containsKey(alias: String): Boolean =
|
||||
withTimeout(timeout) { _containsKey(alias) }
|
||||
protected abstract fun _containsKey(alias: String): Boolean
|
||||
|
||||
/**
|
||||
* Returns the [PublicKey] of the input alias or null if it doesn't exist.
|
||||
*/
|
||||
override fun getPublicKey(alias: String): PublicKey =
|
||||
withTimeout(timeout) { _getPublicKey(alias) }
|
||||
protected abstract fun _getPublicKey(alias: String): PublicKey
|
||||
|
||||
/**
|
||||
* Sign a [ByteArray] using the private key identified by the input alias.
|
||||
* Returns the signature bytes formatted according to the signature scheme.
|
||||
* The signAlgorithm if specified determines the signature scheme used for signing, if
|
||||
* not specified then the signature scheme is based on the private key scheme.
|
||||
*/
|
||||
override fun sign(alias: String, data: ByteArray, signAlgorithm: String?): ByteArray =
|
||||
withTimeout(timeout) { _sign(alias, data, signAlgorithm) }
|
||||
protected abstract fun _sign(alias: String, data: ByteArray, signAlgorithm: String?): ByteArray
|
||||
|
||||
/**
|
||||
* Returns [ContentSigner] for the key identified by the input alias.
|
||||
*/
|
||||
override fun getSigner(alias: String): ContentSigner =
|
||||
withTimeout(timeout) { _getSigner(alias) }
|
||||
protected abstract fun _getSigner(alias: String): ContentSigner
|
||||
}
|
||||
|
||||
open class CryptoServiceException(message: String?, cause: Throwable? = null) : Exception(message, cause)
|
||||
class TimedCryptoServiceException(message: String?, cause: Throwable? = null) : CryptoServiceException(message, cause)
|
@ -3,11 +3,16 @@ package net.corda.nodeapi.internal.cryptoservice
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
|
||||
import net.corda.nodeapi.internal.cryptoservice.bouncycastle.BCCryptoService
|
||||
import java.nio.file.Path
|
||||
import java.time.Duration
|
||||
|
||||
class CryptoServiceFactory {
|
||||
companion object {
|
||||
fun makeCryptoService(cryptoServiceName: SupportedCryptoServices, legalName: CordaX500Name, signingCertificateStore: FileBasedCertificateStoreSupplier? = null, cryptoServiceConf: Path? = null): CryptoService {
|
||||
fun makeCryptoService(
|
||||
cryptoServiceName: SupportedCryptoServices,
|
||||
legalName: CordaX500Name,
|
||||
signingCertificateStore: FileBasedCertificateStoreSupplier? = null,
|
||||
timeout: Duration? = null
|
||||
): CryptoService {
|
||||
// The signing certificate store can be null for other services as only BCC requires is at the moment.
|
||||
if (cryptoServiceName != SupportedCryptoServices.BC_SIMPLE || signingCertificateStore == null) {
|
||||
throw IllegalArgumentException("Currently only BouncyCastle is used as a crypto service. A valid signing certificate store is required.")
|
||||
|
@ -23,13 +23,14 @@ import javax.security.auth.x500.X500Principal
|
||||
* and a Java KeyStore in the form of [CertificateStore] to store private keys.
|
||||
* This service reuses the [NodeConfiguration.signingCertificateStore] to store keys.
|
||||
*/
|
||||
class BCCryptoService(private val legalName: X500Principal, private val certificateStoreSupplier: CertificateStoreSupplier) : CryptoService {
|
||||
class BCCryptoService(private val legalName: X500Principal,
|
||||
private val certificateStoreSupplier: CertificateStoreSupplier) : CryptoService() {
|
||||
|
||||
// TODO check if keyStore exists.
|
||||
// TODO make it private when E2ETestKeyManagementService does not require direct access to the private key.
|
||||
var certificateStore: CertificateStore = certificateStoreSupplier.get(true)
|
||||
|
||||
override fun generateKeyPair(alias: String, scheme: SignatureScheme): PublicKey {
|
||||
override fun _generateKeyPair(alias: String, scheme: SignatureScheme): PublicKey {
|
||||
try {
|
||||
val keyPair = Crypto.generateKeyPair(scheme)
|
||||
importKey(alias, keyPair)
|
||||
@ -39,11 +40,11 @@ class BCCryptoService(private val legalName: X500Principal, private val certific
|
||||
}
|
||||
}
|
||||
|
||||
override fun containsKey(alias: String): Boolean {
|
||||
override fun _containsKey(alias: String): Boolean {
|
||||
return certificateStore.contains(alias)
|
||||
}
|
||||
|
||||
override fun getPublicKey(alias: String): PublicKey {
|
||||
override fun _getPublicKey(alias: String): PublicKey {
|
||||
try {
|
||||
return certificateStore.query { getPublicKey(alias) }
|
||||
} catch (e: Exception) {
|
||||
@ -51,8 +52,7 @@ class BCCryptoService(private val legalName: X500Principal, private val certific
|
||||
}
|
||||
}
|
||||
|
||||
@JvmOverloads
|
||||
override fun sign(alias: String, data: ByteArray, signAlgorithm: String?): ByteArray {
|
||||
override fun _sign(alias: String, data: ByteArray, signAlgorithm: String?): ByteArray {
|
||||
try {
|
||||
return when(signAlgorithm) {
|
||||
null -> Crypto.doSign(certificateStore.query { getPrivateKey(alias, certificateStore.entryPassword) }, data)
|
||||
@ -71,7 +71,7 @@ class BCCryptoService(private val legalName: X500Principal, private val certific
|
||||
return signature.sign()
|
||||
}
|
||||
|
||||
override fun getSigner(alias: String): ContentSigner {
|
||||
override fun _getSigner(alias: String): ContentSigner {
|
||||
try {
|
||||
val privateKey = certificateStore.query { getPrivateKey(alias, certificateStore.entryPassword) }
|
||||
val signatureScheme = Crypto.findSignatureScheme(privateKey)
|
||||
|
@ -0,0 +1,68 @@
|
||||
package net.corda.nodeapi.internal.cryptoservice
|
||||
|
||||
import net.corda.core.crypto.SignatureScheme
|
||||
import net.corda.core.internal.times
|
||||
import org.bouncycastle.operator.ContentSigner
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.security.PublicKey
|
||||
import java.time.Duration
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.expect
|
||||
|
||||
class CryptoServiceTest {
|
||||
|
||||
private val TEST_TIMEOUT = Duration.ofMillis(500)
|
||||
private var sleepTime = TEST_TIMEOUT
|
||||
private lateinit var stub: CryptoService
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
stub = CryptoServiceStub()
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
stub.close()
|
||||
}
|
||||
|
||||
inner class CryptoServiceStub : CryptoService(TEST_TIMEOUT) {
|
||||
private fun sleeper() {
|
||||
Thread.sleep(sleepTime.toMillis())
|
||||
}
|
||||
override fun _generateKeyPair(alias: String, scheme: SignatureScheme): PublicKey {
|
||||
throw NotImplementedError("Not needed for this test")
|
||||
}
|
||||
|
||||
override fun _containsKey(alias: String): Boolean {
|
||||
sleeper()
|
||||
return true
|
||||
}
|
||||
|
||||
override fun _getPublicKey(alias: String): PublicKey {
|
||||
throw NotImplementedError("Not needed for this test")
|
||||
}
|
||||
|
||||
override fun _sign(alias: String, data: ByteArray, signAlgorithm: String?): ByteArray {
|
||||
throw NotImplementedError("Not needed for this test")
|
||||
}
|
||||
|
||||
override fun _getSigner(alias: String): ContentSigner {
|
||||
throw NotImplementedError("Not needed for this test")
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `if no timeout is reached then correct value is returned`() {
|
||||
sleepTime = Duration.ZERO
|
||||
expect(true) { stub.containsKey("Test") }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `when timeout is reached the correct exception is thrown`() {
|
||||
sleepTime = TEST_TIMEOUT.times(2)
|
||||
assertFailsWith(TimedCryptoServiceException::class) { stub.containsKey("Test") }
|
||||
}
|
||||
}
|
||||
|
@ -111,7 +111,6 @@ import java.util.concurrent.TimeUnit.MINUTES
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
import java.util.function.Consumer
|
||||
import javax.persistence.EntityManager
|
||||
import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
|
||||
|
||||
/**
|
||||
* A base node implementation that can be customised either for production (with real implementations that do real
|
||||
@ -176,7 +175,11 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
val transactionStorage = makeTransactionStorage(configuration.transactionCacheSizeBytes).tokenize()
|
||||
val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, versionInfo) }
|
||||
val attachments = NodeAttachmentService(metricRegistry, cacheFactory, database, configuration.devMode).tokenize()
|
||||
val cryptoService = CryptoServiceFactory.makeCryptoService(SupportedCryptoServices.BC_SIMPLE, configuration.myLegalName, configuration.signingCertificateStore)
|
||||
val cryptoService = CryptoServiceFactory.makeCryptoService(
|
||||
SupportedCryptoServices.BC_SIMPLE,
|
||||
configuration.myLegalName,
|
||||
configuration.signingCertificateStore
|
||||
).closeOnStop()
|
||||
@Suppress("LeakingThis")
|
||||
val networkParametersStorage = makeNetworkParametersStorage()
|
||||
val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(configuration.cordappDirectories), attachments).tokenize()
|
||||
|
@ -11,6 +11,7 @@ import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.FinalityHandler
|
||||
import net.corda.nodeapi.internal.cryptoservice.TimedCryptoServiceException
|
||||
import org.hibernate.exception.ConstraintViolationException
|
||||
import rx.subjects.PublishSubject
|
||||
import java.sql.SQLException
|
||||
@ -25,7 +26,7 @@ import kotlin.math.pow
|
||||
class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) {
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, FinalityDoctor)
|
||||
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, CryptoServiceTimeout, FinalityDoctor)
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(object {
|
||||
@ -303,6 +304,24 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Restarts [TimedFlow], keeping track of the number of retries and making sure it does not
|
||||
* exceed the limit specified by the [FlowTimeoutException].
|
||||
*/
|
||||
object CryptoServiceTimeout : Staff {
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||
return if (newError is TimedCryptoServiceException) {
|
||||
if (history.notDischargedForTheSameThingMoreThan(2, this, currentState)) {
|
||||
Diagnosis.DISCHARGE
|
||||
} else {
|
||||
Diagnosis.OVERNIGHT_OBSERVATION
|
||||
}
|
||||
} else {
|
||||
Diagnosis.NOT_MY_SPECIALTY
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object FinalityDoctor : Staff {
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||
return if (currentState.flowLogic is FinalityHandler || isFromReceiveFinalityFlow(newError)) {
|
||||
|
@ -26,6 +26,7 @@ import net.corda.core.utilities.ProgressTracker.Change
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.services.persistence.checkpoints
|
||||
import net.corda.nodeapi.internal.cryptoservice.TimedCryptoServiceException
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.contracts.DummyState
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
@ -424,6 +425,25 @@ class FlowFrameworkTests {
|
||||
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `timed out CryptoService is sent to the flow hospital`() {
|
||||
bobNode.registerCordappFlowFactory(ReceiveFlow::class) {
|
||||
ExceptionFlow { TimedCryptoServiceException("We timed out!") }
|
||||
}
|
||||
|
||||
aliceNode.services.startFlow(ReceiveFlow(bob)).resultFuture
|
||||
|
||||
mockNet.runNetwork()
|
||||
|
||||
assertThat(receivedSessionMessages.filter { it.message is ExistingSessionMessage && it.message.payload is ErrorSessionMessage }).hasSize(1)
|
||||
val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot
|
||||
|
||||
// We expect three discharges and then overnight observation (in that order)
|
||||
assertThat(medicalRecords).hasSize(4)
|
||||
assertThat(medicalRecords.filter { it.outcome == StaffedFlowHospital.Outcome.DISCHARGE }).hasSize(3)
|
||||
assertThat(medicalRecords.last().outcome == StaffedFlowHospital.Outcome.OVERNIGHT_OBSERVATION)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `non-FlowException thrown on other side`() {
|
||||
val erroringFlowFuture = bobNode.registerCordappFlowFactory(ReceiveFlow::class) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user