diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md
index 761ec63330..5a8f885e93 100644
--- a/CONTRIBUTORS.md
+++ b/CONTRIBUTORS.md
@@ -13,6 +13,7 @@ see changes to this list.
* agoldvarg
* Ajitha Thayaharan (BCS Technology International)
* Alberto Arri (R3)
+* Alex Karnezis
* amiracam
* Amol Pednekar
* Andras Slemmer (R3)
diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt
index 288f931c1e..1d13b53c66 100644
--- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt
+++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt
@@ -96,5 +96,5 @@ class FinalityFlowTests : WithFinality {
}
/** "Old" CorDapp which will force its node to keep its FinalityHandler enabled */
- private fun tokenOldCordapp() = cordappWithPackages("com.template").copy(targetPlatformVersion = 3)
+ private fun tokenOldCordapp() = cordappWithPackages().copy(targetPlatformVersion = 3)
}
diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowIsKilledTest.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowIsKilledTest.kt
index fb5e3bc34a..35a1639714 100644
--- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowIsKilledTest.kt
+++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowIsKilledTest.kt
@@ -116,7 +116,7 @@ class FlowIsKilledTest {
}
@Test(timeout = 300_000)
- fun `manually handle killed flows using checkForIsNotKilled`() {
+ fun `manually handle killed flows using checkFlowIsNotKilled`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.let { rpc ->
@@ -135,7 +135,7 @@ class FlowIsKilledTest {
}
@Test(timeout = 300_000)
- fun `manually handle killed flows using checkForIsNotKilled with lazy message`() {
+ fun `manually handle killed flows using checkFlowIsNotKilled with lazy message`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.let { rpc ->
diff --git a/detekt-baseline.xml b/detekt-baseline.xml
index 974e679f57..401dfbe681 100644
--- a/detekt-baseline.xml
+++ b/detekt-baseline.xml
@@ -9,23 +9,11 @@
ClassNaming:BuyerFlow.kt$BuyerFlow$STARTING_BUY : Step
ClassNaming:CompositeMemberCompositeSchemaToClassCarpenterTests.kt$I_
ClassNaming:CordaServiceTest.kt$CordaServiceTest.DummyServiceFlow.Companion$TEST_STEP : Step
- ClassNaming:CustomVaultQuery.kt$TopupIssuerFlow.TopupIssuer.Companion$AWAITING_REQUEST : Step
- ClassNaming:CustomVaultQuery.kt$TopupIssuerFlow.TopupIssuer.Companion$SENDING_TOP_UP_ISSUE_REQUEST : Step
ClassNaming:DeserializeNeedingCarpentryTests.kt$DeserializeNeedingCarpentryTests$outer
ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$FINALISING_TRANSACTION : Step
ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$GENERATING_TRANSACTION : Step
ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$SIGNING_TRANSACTION : Step
ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$VERIFYING_TRANSACTION : Step
- ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$EXTRACTING_VAULT_STATES : Step
- ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$ID_OTHER_NODES : Step
- ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$OTHER_TX_COMPONENTS : Step
- ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$SENDING_AND_RECEIVING_DATA : Step
- ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$SIGS_GATHERING : Step
- ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_BUILDING : Step
- ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_SIGNING : Step
- ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_VERIFICATION : Step
- ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$VERIFYING_SIGS : Step
- ClassNaming:FlowCookbook.kt$ResponderFlow.Companion$RECEIVING_AND_SENDING_DATA : Step
ClassNaming:FlowFrameworkTests.kt$ExceptionFlow$START_STEP : Step
ClassNaming:FlowFrameworkTests.kt$InitiatedReceiveFlow$RECEIVED_STEP : Step
ClassNaming:FlowFrameworkTests.kt$InitiatedReceiveFlow$START_STEP : Step
@@ -178,7 +166,6 @@
ComplexMethod:RPCServer.kt$RPCServer$private fun clientArtemisMessageHandler(artemisMessage: ClientMessage)
ComplexMethod:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ReconnectingRPCConnection$ private tailrec fun establishConnectionWithRetry( retryInterval: Duration, roundRobinIndex: Int = 0, retries: Int = -1 ): CordaRPCConnection?
ComplexMethod:RemoteTypeCarpenter.kt$SchemaBuildingRemoteTypeCarpenter$override fun carpent(typeInformation: RemoteTypeInformation): Type
- ComplexMethod:RpcReconnectTests.kt$RpcReconnectTests$ @Test(timeout=300_000) fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`()
ComplexMethod:SchemaMigration.kt$SchemaMigration$ private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean): Boolean
ComplexMethod:SchemaMigration.kt$SchemaMigration$private fun doRunMigration( run: Boolean, check: Boolean, existingCheckpoints: Boolean? = null )
ComplexMethod:SendTransactionFlow.kt$DataVendingFlow$@Suspendable override fun call(): Void?
@@ -310,7 +297,6 @@
ForbiddenComment:CordappProviderImplTests.kt$CordappProviderImplTests.Companion$// TODO: Cordapp name should differ from the JAR name
ForbiddenComment:CoreFlowHandlers.kt$NotaryChangeHandler$// TODO: Right now all nodes will automatically approve the notary change. We need to figure out if stricter controls are necessary.
ForbiddenComment:CrossCashTest.kt$CrossCashState$// TODO: Alternative: We may possibly reduce the complexity of the search even further using some form of
- ForbiddenComment:Crypto.kt$Crypto$// TODO: Check if non-ECC keys satisfy params (i.e. approved/valid RSA modulus size).
ForbiddenComment:Crypto.kt$Crypto$// TODO: We currently use SHA256(seed) when retrying, but BIP32 just skips a counter (i) that results to an invalid key.
ForbiddenComment:Crypto.kt$Crypto$// TODO: change the val name to a more descriptive one as it's now confusing and looks like a Key type.
ForbiddenComment:Crypto.kt$Crypto$// TODO: change val name to SPHINCS256_SHA512. This will break backwards compatibility.
@@ -435,7 +421,6 @@
ForbiddenComment:RatesFixFlow.kt$RatesFixFlow.FixQueryFlow$// TODO: add deadline to receive
ForbiddenComment:ResolveTransactionsFlow.kt$ResolveTransactionsFlow$// TODO: This could be done in parallel with other fetches for extra speed.
ForbiddenComment:ResolveTransactionsFlowTest.kt$ResolveTransactionsFlowTest$// TODO: this operation should not require an explicit transaction
- ForbiddenComment:RestrictedEntityManager.kt$RestrictedEntityManager$// TODO: Figure out which other methods on EntityManager need to be blocked?
ForbiddenComment:ScheduledActivityObserver.kt$ScheduledActivityObserver.Companion$// TODO: Beware we are calling dynamically loaded contract code inside here.
ForbiddenComment:ScheduledFlowIntegrationTests.kt$ScheduledFlowIntegrationTests$// TODO: the queries below are not atomic so we need to allow enough time for the scheduler to finish. Would be better to query scheduler.
ForbiddenComment:SendTransactionFlow.kt$DataVendingFlow$// Security TODO: Check for abnormally large or malformed data requests
@@ -622,7 +607,6 @@
FunctionNaming:VersionExtractorTest.kt$VersionExtractorTest$@Test(timeout=300_000) fun version_header_extraction_present()
LargeClass:AbstractNode.kt$AbstractNode<S> : SingletonSerializeAsToken
LargeClass:SingleThreadedStateMachineManager.kt$SingleThreadedStateMachineManager : StateMachineManagerStateMachineManagerInternal
- LongMethod:FlowCookbook.kt$InitiatorFlow$@Suppress("RemoveExplicitTypeArguments") @Suspendable override fun call()
LongMethod:HibernateQueryCriteriaParser.kt$HibernateQueryCriteriaParser$override fun parseCriteria(criteria: CommonQueryCriteria): Collection<Predicate>
LongParameterList:AMQPSerializer.kt$AMQPSerializer$(obj: Any, data: Data, type: Type, output: SerializationOutput, context: SerializationContext, debugIndent: Int = 0)
LongParameterList:AbstractCashSelection.kt$AbstractCashSelection$(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, withResultSet: (ResultSet) -> Boolean)
@@ -634,7 +618,6 @@
LongParameterList:ArtemisRpcBroker.kt$ArtemisRpcBroker.Companion$(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, securityManager: RPCSecurityManager, maxMessageSize: Int, journalBufferTimeout: Int?, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean)
LongParameterList:ArtemisRpcBroker.kt$ArtemisRpcBroker.Companion$(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, sslOptions: BrokerRpcSslOptions, securityManager: RPCSecurityManager, maxMessageSize: Int, journalBufferTimeout: Int?, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean)
LongParameterList:ArtemisRpcTests.kt$ArtemisRpcTests$(nodeSSlconfig: MutualSslConfiguration, brokerSslOptions: BrokerRpcSslOptions?, useSslForBroker: Boolean, clientSslOptions: ClientRpcSslOptions?, address: NetworkHostAndPort = ports.nextHostAndPort(), adminAddress: NetworkHostAndPort = ports.nextHostAndPort(), baseDirectory: Path = tempFolder.root.toPath() )
- LongParameterList:AttachmentsClassLoader.kt$AttachmentsClassLoaderBuilder$(attachments: List<Attachment>, params: NetworkParameters, txId: SecureHash, isAttachmentTrusted: (Attachment) -> Boolean, parent: ClassLoader = ClassLoader.getSystemClassLoader(), block: (ClassLoader) -> T)
LongParameterList:BFTSmart.kt$BFTSmart.Replica$( states: List<StateRef>, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef> = emptyList() )
LongParameterList:BusinessCalendar.kt$BusinessCalendar.Companion$(startDate: LocalDate, period: Frequency, calendar: BusinessCalendar = EMPTY, dateRollConvention: DateRollConvention = DateRollConvention.Following, noOfAdditionalPeriods: Int = Integer.MAX_VALUE, endDate: LocalDate? = null, periodOffset: Int? = null)
LongParameterList:Cash.kt$Cash$(inputs: List<State>, outputs: List<State>, tx: LedgerTransaction, issueCommand: CommandWithParties<Commands.Issue>, currency: Currency, issuer: PartyAndReference)
@@ -732,7 +715,6 @@
LongParameterList:X509Utilities.kt$X509Utilities$(certificateType: CertificateType, issuer: X500Principal, issuerPublicKey: PublicKey, issuerSigner: ContentSigner, subject: X500Principal, subjectPublicKey: PublicKey, validityWindow: Pair<Date, Date>, nameConstraints: NameConstraints? = null, crlDistPoint: String? = null, crlIssuer: X500Name? = null)
LongParameterList:X509Utilities.kt$X509Utilities$(certificateType: CertificateType, issuer: X500Principal, issuerPublicKey: PublicKey, subject: X500Principal, subjectPublicKey: PublicKey, validityWindow: Pair<Date, Date>, nameConstraints: NameConstraints? = null, crlDistPoint: String? = null, crlIssuer: X500Name? = null)
LongParameterList:X509Utilities.kt$X509Utilities$(certificateType: CertificateType, issuerCertificate: X509Certificate, issuerKeyPair: KeyPair, subject: X500Principal, subjectPublicKey: PublicKey, validityWindow: Pair<Duration, Duration> = DEFAULT_VALIDITY_WINDOW, nameConstraints: NameConstraints? = null, crlDistPoint: String? = null, crlIssuer: X500Name? = null)
- LongParameterList:internalAccessTestHelpers.kt$( inputs: List<StateAndRef<ContractState>>, outputs: List<TransactionState<ContractState>>, commands: List<CommandWithParties<CommandData>>, attachments: List<Attachment>, id: SecureHash, notary: Party?, timeWindow: TimeWindow?, privacySalt: PrivacySalt, networkParameters: NetworkParameters, references: List<StateAndRef<ContractState>>, componentGroups: List<ComponentGroup>? = null, serializedInputs: List<SerializedStateAndRef>? = null, serializedReferences: List<SerializedStateAndRef>? = null, isAttachmentTrusted: (Attachment) -> Boolean )
MagicNumber:AMQPClientSerializationScheme.kt$AMQPClientSerializationScheme.Companion$128
MagicNumber:AMQPSerializationScheme.kt$AbstractAMQPSerializationScheme$128
MagicNumber:AMQPServer.kt$AMQPServer$100
@@ -753,7 +735,6 @@
MagicNumber:AttachmentDemo.kt$10009
MagicNumber:AttachmentDemo.kt$10010
MagicNumber:AttachmentTrustTable.kt$AttachmentTrustTable$3
- MagicNumber:AttachmentsClassLoader.kt$AttachmentsClassLoader$4
MagicNumber:AzureSmbVolume.kt$AzureSmbVolume$5000
MagicNumber:BFTSmart.kt$BFTSmart.Client$100
MagicNumber:BFTSmart.kt$BFTSmart.Replica.<no name provided>$20000
@@ -775,12 +756,6 @@
MagicNumber:CashViewer.kt$CashViewer.StateRowGraphic$16
MagicNumber:CashViewer.kt$CashViewer.StateRowGraphic$30.0
MagicNumber:ClassCarpenter.kt$ClassCarpenterImpl$3
- MagicNumber:ClientRpcExample.kt$ClientRpcExample$3
- MagicNumber:ClientRpcTutorial.kt$0.7
- MagicNumber:ClientRpcTutorial.kt$0.8
- MagicNumber:ClientRpcTutorial.kt$1000
- MagicNumber:ClientRpcTutorial.kt$10000
- MagicNumber:ClientRpcTutorial.kt$2000
MagicNumber:CommercialPaperIssueFlow.kt$CommercialPaperIssueFlow$10
MagicNumber:CommercialPaperIssueFlow.kt$CommercialPaperIssueFlow$30
MagicNumber:CompositeSignature.kt$CompositeSignature$1024
@@ -846,11 +821,6 @@
MagicNumber:ExchangeRateModel.kt$1.18
MagicNumber:ExchangeRateModel.kt$1.31
MagicNumber:FixingFlow.kt$FixingFlow.Fixer.<no name provided>$30
- MagicNumber:FlowCookbook.kt$InitiatorFlow$30
- MagicNumber:FlowCookbook.kt$InitiatorFlow$45
- MagicNumber:FlowCookbook.kt$InitiatorFlow$777
- MagicNumber:FlowCookbook.kt$ResponderFlow$99
- MagicNumber:FlowCookbook.kt$ResponderFlow.<no name provided>$777
MagicNumber:FlowLogic.kt$FlowLogic$300
MagicNumber:FlowLogic.kt$FlowLogic.Companion$5
MagicNumber:FlowMonitor.kt$FlowMonitor$1000
@@ -864,7 +834,6 @@
MagicNumber:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$10
MagicNumber:HttpUtils.kt$HttpUtils$5
MagicNumber:HttpUtils.kt$HttpUtils$60
- MagicNumber:IOUFlowResponder.kt$IOUFlowResponder.<no name provided>$100
MagicNumber:IRS.kt$RatePaymentEvent$360.0
MagicNumber:IRS.kt$RatePaymentEvent$4
MagicNumber:IRS.kt$RatePaymentEvent$8
@@ -1026,7 +995,6 @@
MagicNumber:NodeWebServer.kt$NodeWebServer$100
MagicNumber:NodeWebServer.kt$NodeWebServer$32768
MagicNumber:NodeWebServer.kt$NodeWebServer$40
- MagicNumber:NonValidatingNotaryFlow.kt$NonValidatingNotaryFlow$4
MagicNumber:Notarise.kt$10
MagicNumber:Notarise.kt$10003
MagicNumber:NullKeys.kt$NullKeys$32
@@ -1143,7 +1111,6 @@
MagicNumber:StandaloneShell.kt$StandaloneShell$7
MagicNumber:StateRevisionFlow.kt$StateRevisionFlow.Requester$30
MagicNumber:Structures.kt$PrivacySalt$32
- MagicNumber:TargetVersionDependentRules.kt$StateContractValidationEnforcementRule$4
MagicNumber:TestNodeInfoBuilder.kt$TestNodeInfoBuilder$1234
MagicNumber:TestUtils.kt$10000
MagicNumber:TestUtils.kt$30000
@@ -1154,7 +1121,6 @@
MagicNumber:TraderDemoClientApi.kt$TraderDemoClientApi$3
MagicNumber:TransactionBuilder.kt$TransactionBuilder$4
MagicNumber:TransactionDSLInterpreter.kt$TransactionDSL$30
- MagicNumber:TransactionUtils.kt$4
MagicNumber:TransactionVerificationException.kt$TransactionVerificationException.ConstraintPropagationRejection$3
MagicNumber:TransactionViewer.kt$TransactionViewer$15.0
MagicNumber:TransactionViewer.kt$TransactionViewer$20.0
@@ -1179,8 +1145,6 @@
MagicNumber:WebServer.kt$100.0
MagicNumber:WebServer.kt$WebServer$500
MagicNumber:WireTransaction.kt$WireTransaction$4
- MagicNumber:WorkflowTransactionBuildTutorial.kt$SubmitCompletionFlow$60
- MagicNumber:WorkflowTransactionBuildTutorial.kt$SubmitTradeApprovalFlow$60
MagicNumber:X509Utilities.kt$X509Utilities$3650
MagicNumber:errorAndTerminate.kt$10
MatchingDeclarationName:AMQPSerializerFactories.kt$net.corda.serialization.internal.amqp.AMQPSerializerFactories.kt
@@ -1225,7 +1189,6 @@
MatchingDeclarationName:TestConstants.kt$net.corda.testing.core.TestConstants.kt
MatchingDeclarationName:TestUtils.kt$net.corda.testing.core.TestUtils.kt
MatchingDeclarationName:TransactionTypes.kt$net.corda.explorer.model.TransactionTypes.kt
- MatchingDeclarationName:TutorialFlowStateMachines.kt$net.corda.docs.kotlin.tutorial.flowstatemachines.TutorialFlowStateMachines.kt
MatchingDeclarationName:Utils.kt$io.cryptoblk.core.Utils.kt
MatchingDeclarationName:VirtualCordapps.kt$net.corda.node.internal.cordapp.VirtualCordapps.kt
ModifierOrder:NodeNamedCache.kt$DefaultNamedCacheFactory$open protected
@@ -1317,13 +1280,8 @@
SpreadOperator:FlowFrameworkTripartyTests.kt$FlowFrameworkTripartyTests$(*expected)
SpreadOperator:FlowLogicRefFactoryImpl.kt$FlowLogicRefFactoryImpl$(flowClass, *args)
SpreadOperator:FlowOverrideTests.kt$FlowOverrideTests$(*nodeAClasses.toTypedArray())
- SpreadOperator:FlowOverrideTests.kt$FlowOverrideTests$(*nodeBClasses.toTypedArray())
SpreadOperator:FlowTestsUtils.kt$(*allSessions)
SpreadOperator:FlowTestsUtils.kt$(session, *sessions)
- SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*ourInputStates.toTypedArray())
- SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*ourOutputState.map { StateAndContract(it, Cash.PROGRAM_ID) }.toTypedArray())
- SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*theirInputStates.toTypedArray())
- SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*theirOutputState.map { StateAndContract(it, Cash.PROGRAM_ID) }.toTypedArray())
SpreadOperator:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$(OpaqueBytes(request.encoded), "Platform-Version" to "${versionInfo.platformVersion}", "Client-Version" to versionInfo.releaseVersion, "Private-Network-Map" to (config.pnm?.toString() ?: ""), *(config.csrToken?.let { arrayOf(CENM_SUBMISSION_TOKEN to it) } ?: arrayOf()))
SpreadOperator:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$(*leftPredicates.toTypedArray())
SpreadOperator:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$(*leftPredicates.toTypedArray(), *rightPredicates.toTypedArray())
@@ -1453,7 +1411,6 @@
ThrowsCount:StructuresTests.kt$AttachmentTest$@Test(timeout=300_000) fun `openAsJAR does not leak file handle if attachment has corrupted manifest`()
ThrowsCount:TransactionVerifierServiceInternal.kt$Verifier$ private fun getUniqueContractAttachmentsByContract(): Map<ContractClassName, ContractAttachment>
ThrowsCount:TransactionVerifierServiceInternal.kt$Verifier$// Using basic graph theory, a full cycle of encumbered (co-dependent) states should exist to achieve bi-directional // encumbrances. This property is important to ensure that no states involved in an encumbrance-relationship // can be spent on their own. Briefly, if any of the states is having more than one encumbrance references by // other states, a full cycle detection will fail. As a result, all of the encumbered states must be present // as "from" and "to" only once (or zero times if no encumbrance takes place). For instance, // a -> b // c -> b and a -> b // b -> a b -> c // do not satisfy the bi-directionality (full cycle) property. // // In the first example "b" appears twice in encumbrance ("to") list and "c" exists in the encumbered ("from") list only. // Due the above, one could consume "a" and "b" in the same transaction and then, because "b" is already consumed, "c" cannot be spent. // // Similarly, the second example does not form a full cycle because "a" and "c" exist in one of the lists only. // As a result, one can consume "b" and "c" in the same transactions, which will make "a" impossible to be spent. // // On other hand the following are valid constructions: // a -> b a -> c // b -> c and c -> b // c -> a b -> a // and form a full cycle, meaning that the bi-directionality property is satisfied. private fun checkBidirectionalOutputEncumbrances(statesAndEncumbrance: List<Pair<Int, Int>>)
- ThrowsCount:WireTransaction.kt$WireTransaction$private fun toLedgerTransactionInternal( resolveIdentity: (PublicKey) -> Party?, resolveAttachment: (SecureHash) -> Attachment?, resolveStateRefAsSerialized: (StateRef) -> SerializedBytes<TransactionState<ContractState>>?, resolveParameters: (SecureHash?) -> NetworkParameters?, isAttachmentTrusted: (Attachment) -> Boolean ): LedgerTransaction
ThrowsCount:WireTransaction.kt$WireTransaction.Companion$ @CordaInternal fun resolveStateRefBinaryComponent(stateRef: StateRef, services: ServicesForResolution): SerializedBytes<TransactionState<ContractState>>?
TooGenericExceptionCaught:AMQPChannelHandler.kt$AMQPChannelHandler$ex: Exception
TooGenericExceptionCaught:AMQPExceptions.kt$th: Throwable
@@ -1502,7 +1459,6 @@
TooGenericExceptionCaught:DriverDSLImpl.kt$DriverDSLImpl.Companion$th: Throwable
TooGenericExceptionCaught:DriverDSLImpl.kt$exception: Throwable
TooGenericExceptionCaught:DriverTests.kt$DriverTests$e: Exception
- TooGenericExceptionCaught:ErrorCodeLoggingTests.kt$e: Exception
TooGenericExceptionCaught:ErrorHandling.kt$ErrorHandling.CheckpointAfterErrorFlow$t: Throwable
TooGenericExceptionCaught:EventProcessor.kt$EventProcessor$ex: Exception
TooGenericExceptionCaught:Eventually.kt$e: Exception
@@ -1574,7 +1530,6 @@
TooGenericExceptionCaught:NotaryUtils.kt$e: Exception
TooGenericExceptionCaught:ObjectDiffer.kt$ObjectDiffer$throwable: Exception
TooGenericExceptionCaught:P2PMessagingClient.kt$P2PMessagingClient$e: Exception
- TooGenericExceptionCaught:PersistentIdentityMigrationNewTableTest.kt$PersistentIdentityMigrationNewTableTest$e: Exception
TooGenericExceptionCaught:PersistentUniquenessProvider.kt$PersistentUniquenessProvider$e: Exception
TooGenericExceptionCaught:ProfileController.kt$ProfileController$e: Exception
TooGenericExceptionCaught:PropertyValidationTest.kt$PropertyValidationTest$e: Exception
@@ -1715,6 +1670,7 @@
TooManyFunctions:RPCApi.kt$net.corda.nodeapi.RPCApi.kt
TooManyFunctions:RPCClientProxyHandler.kt$RPCClientProxyHandler : InvocationHandler
TooManyFunctions:RPCServer.kt$RPCServer
+ TooManyFunctions:SSLHelper.kt$net.corda.nodeapi.internal.protonwrapper.netty.SSLHelper.kt
TooManyFunctions:SerializationHelper.kt$net.corda.serialization.internal.amqp.SerializationHelper.kt
TooManyFunctions:ServiceHub.kt$ServiceHub : ServicesForResolution
TooManyFunctions:SignedTransaction.kt$SignedTransaction : TransactionWithSignatures
@@ -1742,8 +1698,6 @@
UnusedImports:Amount.kt$import net.corda.core.crypto.CompositeKey
UnusedImports:Amount.kt$import net.corda.core.identity.Party
UnusedImports:DummyLinearStateSchemaV1.kt$import net.corda.core.contracts.ContractState
- UnusedImports:FlowsExecutionModeRpcTest.kt$import net.corda.core.internal.packageName
- UnusedImports:FlowsExecutionModeRpcTest.kt$import net.corda.finance.schemas.CashSchemaV1
UnusedImports:InternalTestUtils.kt$import java.nio.file.Files
UnusedImports:InternalTestUtils.kt$import net.corda.nodeapi.internal.loadDevCaTrustStore
UnusedImports:NetworkMap.kt$import net.corda.core.node.NodeInfo
@@ -2012,8 +1966,6 @@
WildcardImport:CordaModule.kt$import net.corda.core.identity.*
WildcardImport:CordaModule.kt$import net.corda.core.transactions.*
WildcardImport:CordaRPCOps.kt$import net.corda.core.node.services.vault.*
- WildcardImport:CordaRPCOpsImplTest.kt$import net.corda.core.messaging.*
- WildcardImport:CordaRPCOpsImplTest.kt$import org.assertj.core.api.Assertions.*
WildcardImport:CordaServiceTest.kt$import kotlin.test.*
WildcardImport:CordaViewModel.kt$import tornadofx.*
WildcardImport:Cordapp.kt$import net.corda.core.cordapp.Cordapp.Info.*
@@ -2031,10 +1983,6 @@
WildcardImport:CryptoSignUtils.kt$import net.corda.core.crypto.*
WildcardImport:CryptoUtilsTest.kt$import kotlin.test.*
WildcardImport:CustomCordapp.kt$import net.corda.core.internal.*
- WildcardImport:CustomVaultQuery.kt$import net.corda.core.flows.*
- WildcardImport:CustomVaultQuery.kt$import net.corda.core.utilities.*
- WildcardImport:CustomVaultQueryTest.kt$import net.corda.core.node.services.vault.*
- WildcardImport:CustomVaultQueryTest.kt$import net.corda.finance.*
WildcardImport:DBNetworkParametersStorage.kt$import javax.persistence.*
WildcardImport:DBRunnerExtension.kt$import org.junit.jupiter.api.extension.*
WildcardImport:DBTransactionStorage.kt$import javax.persistence.*
@@ -2057,7 +2005,6 @@
WildcardImport:DeserializeSimpleTypesTests.kt$import net.corda.serialization.internal.amqp.testutils.*
WildcardImport:DigitalSignatureWithCert.kt$import java.security.cert.*
WildcardImport:DistributedServiceTests.kt$import net.corda.testing.core.*
- WildcardImport:DoRemainingWorkTransition.kt$import net.corda.node.services.statemachine.*
WildcardImport:DockerInstantiator.kt$import com.github.dockerjava.api.model.*
WildcardImport:DriverDSLImpl.kt$import net.corda.testing.driver.*
WildcardImport:DummyContract.kt$import net.corda.core.contracts.*
@@ -2076,8 +2023,6 @@
WildcardImport:EvolutionSerializerFactoryTests.kt$import kotlin.test.*
WildcardImport:EvolutionSerializerFactoryTests.kt$import net.corda.serialization.internal.amqp.testutils.*
WildcardImport:Explorer.kt$import tornadofx.*
- WildcardImport:FiberDeserializationCheckingInterceptor.kt$import net.corda.node.services.statemachine.*
- WildcardImport:FinalityFlowMigration.kt$import net.corda.core.flows.*
WildcardImport:FinalityFlowTests.kt$import net.corda.testing.core.*
WildcardImport:FinalityFlowTests.kt$import net.corda.testing.node.internal.*
WildcardImport:FinalityHandlerTest.kt$import net.corda.node.services.statemachine.StaffedFlowHospital.*
@@ -2089,11 +2034,7 @@
WildcardImport:FlowCheckpointCordapp.kt$import net.corda.core.flows.*
WildcardImport:FlowCheckpointVersionNodeStartupCheckTest.kt$import net.corda.core.flows.*
WildcardImport:FlowCheckpointVersionNodeStartupCheckTest.kt$import net.corda.core.internal.*
- WildcardImport:FlowCookbook.kt$import net.corda.core.contracts.*
- WildcardImport:FlowCookbook.kt$import net.corda.core.flows.*
WildcardImport:FlowFrameworkPersistenceTests.kt$import net.corda.testing.node.internal.*
- WildcardImport:FlowFrameworkTests.kt$import net.corda.core.flows.*
- WildcardImport:FlowFrameworkTests.kt$import net.corda.testing.node.internal.*
WildcardImport:FlowFrameworkTripartyTests.kt$import net.corda.testing.node.internal.*
WildcardImport:FlowLogicRefFactoryImpl.kt$import net.corda.core.flows.*
WildcardImport:FlowMatchers.kt$import net.corda.coretesting.internal.matchers.*
@@ -2101,10 +2042,7 @@
WildcardImport:FlowRetryTest.kt$import net.corda.core.flows.*
WildcardImport:FlowStackSnapshotTest.kt$import net.corda.core.flows.*
WildcardImport:FlowStateMachine.kt$import net.corda.core.flows.*
- WildcardImport:FlowStateMachineImpl.kt$import net.corda.core.flows.*
- WildcardImport:FlowStateMachineImpl.kt$import net.corda.core.internal.*
WildcardImport:FlowsDrainingModeContentionTest.kt$import net.corda.core.flows.*
- WildcardImport:FxTransactionBuildTutorialTest.kt$import net.corda.finance.*
WildcardImport:GenericsTests.kt$import net.corda.serialization.internal.amqp.testutils.*
WildcardImport:Gui.kt$import tornadofx.*
WildcardImport:GuiUtilities.kt$import tornadofx.*
@@ -2121,8 +2059,6 @@
WildcardImport:HibernateQueryCriteriaParser.kt$import net.corda.core.node.services.vault.EqualityComparisonOperator.*
WildcardImport:HibernateQueryCriteriaParser.kt$import net.corda.core.node.services.vault.LikenessOperator.*
WildcardImport:HibernateStatistics.kt$import org.hibernate.stat.*
- WildcardImport:IOUContract.kt$import net.corda.core.contracts.*
- WildcardImport:IOUFlowResponder.kt$import net.corda.core.flows.*
WildcardImport:IRS.kt$import net.corda.core.contracts.*
WildcardImport:IRS.kt$import net.corda.finance.contracts.*
WildcardImport:IRSState.kt$import net.corda.core.contracts.*
@@ -2169,7 +2105,6 @@
WildcardImport:JarSignatureCollectorTest.kt$import net.corda.core.internal.*
WildcardImport:KeyStoreUtilities.kt$import java.security.*
WildcardImport:KeyStoreUtilities.kt$import net.corda.core.internal.*
- WildcardImport:KotlinIntegrationTestingTutorial.kt$import net.corda.testing.core.*
WildcardImport:Kryo.kt$import com.esotericsoftware.kryo.*
WildcardImport:Kryo.kt$import net.corda.core.transactions.*
WildcardImport:KryoStreamsTest.kt$import java.io.*
@@ -2420,8 +2355,6 @@
WildcardImport:SignedTransaction.kt$import net.corda.core.contracts.*
WildcardImport:SignedTransaction.kt$import net.corda.core.crypto.*
WildcardImport:SimpleMQClient.kt$import org.apache.activemq.artemis.api.core.client.*
- WildcardImport:SingleThreadedStateMachineManager.kt$import net.corda.core.internal.*
- WildcardImport:SingleThreadedStateMachineManager.kt$import net.corda.node.services.statemachine.interceptors.*
WildcardImport:SpringDriver.kt$import net.corda.testing.node.internal.*
WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.messaging.*
WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.node.services.vault.*
@@ -2473,8 +2406,6 @@
WildcardImport:TransactionViewer.kt$import net.corda.client.jfx.utils.*
WildcardImport:TransactionViewer.kt$import net.corda.core.contracts.*
WildcardImport:TransactionViewer.kt$import tornadofx.*
- WildcardImport:TutorialContract.kt$import net.corda.core.contracts.*
- WildcardImport:TutorialTestDSL.kt$import net.corda.testing.core.*
WildcardImport:TwoPartyDealFlow.kt$import net.corda.core.flows.*
WildcardImport:TwoPartyTradeFlow.kt$import net.corda.core.contracts.*
WildcardImport:TwoPartyTradeFlow.kt$import net.corda.core.flows.*
@@ -2502,7 +2433,6 @@
WildcardImport:ValidatingNotaryServiceTests.kt$import net.corda.core.flows.*
WildcardImport:ValidatingNotaryServiceTests.kt$import net.corda.testing.node.internal.*
WildcardImport:VaultFiller.kt$import net.corda.core.contracts.*
- WildcardImport:VaultFlowTest.kt$import net.corda.core.flows.*
WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.*
WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.vault.*
WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.vault.QueryCriteria.*
diff --git a/lib/quasar.jar b/lib/quasar.jar
index 789576ab93..6f1e8c2fca 100644
Binary files a/lib/quasar.jar and b/lib/quasar.jar differ
diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt
index 9efd7d10dc..233b19a712 100644
--- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt
+++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt
@@ -200,10 +200,7 @@ internal fun createClientSslHelper(target: NetworkHostAndPort,
expectedRemoteLegalNames: Set,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
- val sslContext = SSLContext.getInstance("TLS")
- val keyManagers = keyManagerFactory.keyManagers
- val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java).map { LoggingTrustManagerWrapper(it) }.toTypedArray()
- sslContext.init(keyManagers, trustManagers, newSecureRandom())
+ val sslContext = createAndInitSslContext(keyManagerFactory, trustManagerFactory)
val sslEngine = sslContext.createSSLEngine(target.host, target.port)
sslEngine.useClientMode = true
sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray()
@@ -239,10 +236,7 @@ internal fun createClientOpenSslHandler(target: NetworkHostAndPort,
internal fun createServerSslHandler(keyStore: CertificateStore,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
- val sslContext = SSLContext.getInstance("TLS")
- val keyManagers = keyManagerFactory.keyManagers
- val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java).map { LoggingTrustManagerWrapper(it) }.toTypedArray()
- sslContext.init(keyManagers, trustManagers, newSecureRandom())
+ val sslContext = createAndInitSslContext(keyManagerFactory, trustManagerFactory)
val sslEngine = sslContext.createSSLEngine()
sslEngine.useClientMode = false
sslEngine.needClientAuth = true
@@ -256,6 +250,15 @@ internal fun createServerSslHandler(keyStore: CertificateStore,
return SslHandler(sslEngine, false, LoggingImmediateExecutor)
}
+fun createAndInitSslContext(keyManagerFactory: KeyManagerFactory, trustManagerFactory: TrustManagerFactory): SSLContext {
+ val sslContext = SSLContext.getInstance("TLS")
+ val keyManagers = keyManagerFactory.keyManagers
+ val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java)
+ .map { LoggingTrustManagerWrapper(it) }.toTypedArray()
+ sslContext.init(keyManagers, trustManagers, newSecureRandom())
+ return sslContext
+}
+
@VisibleForTesting
fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateStore, revocationConfig: RevocationConfig): ManagerFactoryParameters {
val pkixParams = PKIXBuilderParameters(trustStore.value.internal, X509CertSelector())
diff --git a/node/src/integration-test/java/net/corda/node/amqp/NioSslClient.java b/node/src/integration-test/java/net/corda/node/amqp/NioSslClient.java
new file mode 100644
index 0000000000..3a1de2d0a1
--- /dev/null
+++ b/node/src/integration-test/java/net/corda/node/amqp/NioSslClient.java
@@ -0,0 +1,216 @@
+package net.corda.node.amqp;
+
+import net.corda.nodeapi.internal.protonwrapper.netty.SSLHelperKt;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManagerFactory;
+
+/**
+ * An SSL/TLS client that connects to a server using its IP address and port.
+ *
+ * After initialization of a {@link NioSslClient} object, {@link NioSslClient#connect()} should be called,
+ * in order to establish connection with the server.
+ *
+ * When the connection between the client and the object is established, {@link NioSslClient} provides
+ * a public write and read method, in order to communicate with its peer.
+ *
+ * @author Alex Karnezis
+ */
+public class NioSslClient extends NioSslPeer {
+
+ /**
+ * The remote address of the server this client is configured to connect to.
+ */
+ private final String remoteAddress;
+
+ /**
+ * The port of the server this client is configured to connect to.
+ */
+ private final int port;
+
+ /**
+ * The engine that will be used to encrypt/decrypt data between this client and the server.
+ */
+ private final SSLEngine engine;
+
+ /**
+ * The socket channel that will be used as the transport link between this client and the server.
+ */
+ private SocketChannel socketChannel;
+
+
+ /**
+ * Initiates the engine to run as a client using peer information, and allocates space for the
+ * buffers that will be used by the engine.
+ *
+ * @param remoteAddress The IP address of the peer.
+ * @param port The peer's port that will be used.
+ */
+ public NioSslClient(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, String remoteAddress, int port) {
+ this.remoteAddress = remoteAddress;
+ this.port = port;
+
+ SSLContext context = SSLHelperKt.createAndInitSslContext(keyManagerFactory, trustManagerFactory);
+ engine = context.createSSLEngine(remoteAddress, port);
+ engine.setUseClientMode(true);
+
+ SSLSession session = engine.getSession();
+ myAppData = ByteBuffer.allocate(1024);
+ myNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+ peerAppData = ByteBuffer.allocate(1024);
+ peerNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+ }
+
+ /**
+ * Opens a socket channel to communicate with the configured server and tries to complete the handshake protocol.
+ *
+ * @return True if client established a connection with the server, false otherwise.
+ */
+ public boolean connect() throws Exception {
+ socketChannel = SocketChannel.open();
+ socketChannel.configureBlocking(false);
+ socketChannel.connect(new InetSocketAddress(remoteAddress, port));
+ while (!socketChannel.finishConnect()) {
+ // can do something here...
+ }
+
+ engine.beginHandshake();
+ return doHandshake(socketChannel, engine);
+ }
+
+ /**
+ * Public method to send a message to the server.
+ *
+ * @param message - message to be sent to the server.
+ * @throws IOException if an I/O error occurs to the socket channel.
+ */
+ public void write(String message) throws IOException {
+ write(socketChannel, engine, message);
+ }
+
+ /**
+ * Implements the write method that sends a message to the server the client is connected to,
+ * but should not be called by the user, since socket channel and engine are inner class' variables.
+ * {@link NioSslClient#write(String)} should be called instead.
+ *
+ * @param message - message to be sent to the server.
+ * @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
+ * @throws IOException if an I/O error occurs to the socket channel.
+ */
+ @Override
+ protected void write(SocketChannel socketChannel, SSLEngine engine, String message) throws IOException {
+
+ log.debug("About to write to the server...");
+
+ myAppData.clear();
+ myAppData.put(message.getBytes());
+ myAppData.flip();
+ while (myAppData.hasRemaining()) {
+ // The loop has a meaning for (outgoing) messages larger than 16KB.
+ // Every wrap call will remove 16KB from the original message and send it to the remote peer.
+ myNetData.clear();
+ SSLEngineResult result = engine.wrap(myAppData, myNetData);
+ switch (result.getStatus()) {
+ case OK:
+ myNetData.flip();
+ while (myNetData.hasRemaining()) {
+ socketChannel.write(myNetData);
+ }
+ log.debug("Message sent to the server: " + message);
+ break;
+ case BUFFER_OVERFLOW:
+ myNetData = enlargePacketBuffer(engine, myNetData);
+ break;
+ case BUFFER_UNDERFLOW:
+ throw new SSLException("Buffer underflow occured after a wrap. I don't think we should ever get here.");
+ case CLOSED:
+ closeConnection(socketChannel, engine);
+ return;
+ default:
+ throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
+ }
+ }
+
+ }
+
+ /**
+ * Public method to try to read from the server.
+ */
+ public void read() throws Exception {
+ read(socketChannel, engine);
+ }
+
+ /**
+ * Will wait for response from the remote peer, until it actually gets something.
+ * Uses {@link SocketChannel#read(ByteBuffer)}, which is non-blocking, and if
+ * it gets nothing from the peer, waits for {@code waitToReadMillis} and tries again.
+ *
+ * Just like {@link NioSslPeer#read(SocketChannel, SSLEngine)} it uses inner class' socket channel
+ * and engine and should not be used by the client. {@link NioSslClient#read()} should be called instead.
+ *
+ * @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
+ */
+ @Override
+ protected void read(SocketChannel socketChannel, SSLEngine engine) throws Exception {
+
+ log.debug("About to read from the server...");
+
+ peerNetData.clear();
+ int waitToReadMillis = 50;
+ boolean exitReadLoop = false;
+ while (!exitReadLoop) {
+ int bytesRead = socketChannel.read(peerNetData);
+ if (bytesRead > 0) {
+ peerNetData.flip();
+ while (peerNetData.hasRemaining()) {
+ peerAppData.clear();
+ SSLEngineResult result = engine.unwrap(peerNetData, peerAppData);
+ switch (result.getStatus()) {
+ case OK:
+ peerAppData.flip();
+ log.debug("Server response: " + peerAppDataAsString());
+ exitReadLoop = true;
+ break;
+ case BUFFER_OVERFLOW:
+ peerAppData = enlargeApplicationBuffer(engine, peerAppData);
+ break;
+ case BUFFER_UNDERFLOW:
+ peerNetData = handleBufferUnderflow(engine, peerNetData);
+ break;
+ case CLOSED:
+ closeConnection(socketChannel, engine);
+ return;
+ default:
+ throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
+ }
+ }
+ } else if (bytesRead < 0) {
+ handleEndOfStream(socketChannel, engine);
+ return;
+ }
+ Thread.sleep(waitToReadMillis);
+ }
+ }
+
+ /**
+ * Should be called when the client wants to explicitly close the connection to the server.
+ *
+ * @throws IOException if an I/O error occurs to the socket channel.
+ */
+ public void shutdown() throws IOException {
+ log.debug("About to close connection with the server...");
+ closeConnection(socketChannel, engine);
+ executor.shutdown();
+ log.debug("Goodbye!");
+ }
+}
\ No newline at end of file
diff --git a/node/src/integration-test/java/net/corda/node/amqp/NioSslPeer.java b/node/src/integration-test/java/net/corda/node/amqp/NioSslPeer.java
new file mode 100644
index 0000000000..060f918846
--- /dev/null
+++ b/node/src/integration-test/java/net/corda/node/amqp/NioSslPeer.java
@@ -0,0 +1,329 @@
+package net.corda.node.amqp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A class that represents an SSL/TLS peer, and can be extended to create a client or a server.
+ *
+ * It makes use of the JSSE framework, and specifically the {@link SSLEngine} logic, which
+ * is described by Oracle as "an advanced API, not appropriate for casual use", since
+ * it requires the user to implement much of the communication establishment procedure himself.
+ * More information about it can be found here: http://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#SSLEngine
+ *
+ * {@link NioSslPeer} implements the handshake protocol, required to establish a connection between two peers,
+ * which is common for both client and server and provides the abstract {@link NioSslPeer#read(SocketChannel, SSLEngine)} and
+ * {@link NioSslPeer#write(SocketChannel, SSLEngine, String)} methods, that need to be implemented by the specific SSL/TLS peer
+ * that is going to extend this class.
+ *
+ * @author Alex Karnezis
+ */
+public abstract class NioSslPeer {
+
+ /**
+ * Class' logger.
+ */
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ /**
+ * Will contain this peer's application data in plaintext, that will be later encrypted
+ * using {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)} and sent to the other peer. This buffer can typically
+ * be of any size, as long as it is large enough to contain this peer's outgoing messages.
+ * If this peer tries to send a message bigger than buffer's capacity a {@link BufferOverflowException}
+ * will be thrown.
+ */
+ protected ByteBuffer myAppData;
+
+ /**
+ * Will contain this peer's encrypted data, that will be generated after {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)}
+ * is applied on {@link NioSslPeer#myAppData}. It should be initialized using {@link SSLSession#getPacketBufferSize()},
+ * which returns the size up to which, SSL/TLS packets will be generated from the engine under a session.
+ * All SSLEngine network buffers should be sized at least this large to avoid insufficient space problems when performing wrap and unwrap calls.
+ */
+ protected ByteBuffer myNetData;
+
+ /**
+ * Will contain the other peer's (decrypted) application data. It must be large enough to hold the application data
+ * from any peer. Can be initialized with {@link SSLSession#getApplicationBufferSize()} for an estimation
+ * of the other peer's application data and should be enlarged if this size is not enough.
+ */
+ protected ByteBuffer peerAppData;
+
+ /**
+ * Will contain the other peer's encrypted data. The SSL/TLS protocols specify that implementations should produce packets containing at most 16 KB of plaintext,
+ * so a buffer sized to this value should normally cause no capacity problems. However, some implementations violate the specification and generate large records up to 32 KB.
+ * If the {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer)} detects large inbound packets, the buffer sizes returned by SSLSession will be updated dynamically, so the this peer
+ * should check for overflow conditions and enlarge the buffer using the session's (updated) buffer size.
+ */
+ protected ByteBuffer peerNetData;
+
+ /**
+ * Will be used to execute tasks that may emerge during handshake in parallel with the server's main thread.
+ */
+ protected ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ protected abstract void read(SocketChannel socketChannel, SSLEngine engine) throws Exception;
+
+ protected abstract void write(SocketChannel socketChannel, SSLEngine engine, String message) throws Exception;
+
+ /**
+ * Implements the handshake protocol between two peers, required for the establishment of the SSL/TLS connection.
+ * During the handshake, encryption configuration information - such as the list of available cipher suites - will be exchanged
+ * and if the handshake is successful will lead to an established SSL/TLS session.
+ *
+ *
+ * A typical handshake will usually contain the following steps:
+ *
+ *
+ * - 1. wrap: ClientHello
+ * - 2. unwrap: ServerHello/Cert/ServerHelloDone
+ * - 3. wrap: ClientKeyExchange
+ * - 4. wrap: ChangeCipherSpec
+ * - 5. wrap: Finished
+ * - 6. unwrap: ChangeCipherSpec
+ * - 7. unwrap: Finished
+ *
+ *
+ * Handshake is also used during the end of the session, in order to properly close the connection between the two peers.
+ * A proper connection close will typically include the one peer sending a CLOSE message to another, and then wait for
+ * the other's CLOSE message to close the transport link. The other peer from his perspective would read a CLOSE message
+ * from his peer and then enter the handshake procedure to send his own CLOSE message as well.
+ *
+ * @param socketChannel - the socket channel that connects the two peers.
+ * @param engine - the engine that will be used for encryption/decryption of the data exchanged with the other peer.
+ * @return True if the connection handshake was successful or false if an error occurred.
+ * @throws IOException - if an error occurs during read/write to the socket channel.
+ */
+ protected boolean doHandshake(SocketChannel socketChannel, SSLEngine engine) throws IOException {
+
+ log.debug("About to do handshake...");
+
+ SSLEngineResult result;
+ HandshakeStatus handshakeStatus;
+
+ // NioSslPeer's fields myAppData and peerAppData are supposed to be large enough to hold all message data the peer
+ // will send and expects to receive from the other peer respectively. Since the messages to be exchanged will usually be less
+ // than 16KB long the capacity of these fields should also be smaller. Here we initialize these two local buffers
+ // to be used for the handshake, while keeping client's buffers at the same size.
+ int appBufferSize = engine.getSession().getApplicationBufferSize();
+ ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize);
+ ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize);
+ myNetData.clear();
+ peerNetData.clear();
+
+ handshakeStatus = engine.getHandshakeStatus();
+ while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED && handshakeStatus != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+ switch (handshakeStatus) {
+ case NEED_UNWRAP:
+ if (socketChannel.read(peerNetData) < 0) {
+ if (engine.isInboundDone() && engine.isOutboundDone()) {
+ return false;
+ }
+ try {
+ engine.closeInbound();
+ } catch (SSLException e) {
+ log.error("This engine was forced to close inbound, without having received the proper SSL/TLS close " +
+ "notification message from the peer, due to end of stream.", e);
+ }
+ engine.closeOutbound();
+ // After closeOutbound the engine will be set to WRAP state, in order to try to send a close message to the client.
+ handshakeStatus = engine.getHandshakeStatus();
+ break;
+ }
+ peerNetData.flip();
+ try {
+ result = engine.unwrap(peerNetData, peerAppData);
+ peerNetData.compact();
+ handshakeStatus = result.getHandshakeStatus();
+ } catch (SSLException sslException) {
+ log.error("A problem was encountered while processing the data that caused the SSLEngine to abort." +
+ " Will try to properly close connection...", sslException);
+ engine.closeOutbound();
+ handshakeStatus = engine.getHandshakeStatus();
+ break;
+ }
+ switch (result.getStatus()) {
+ case OK:
+ break;
+ case BUFFER_OVERFLOW:
+ // Will occur when peerAppData's capacity is smaller than the data derived from peerNetData's unwrap.
+ peerAppData = enlargeApplicationBuffer(engine, peerAppData);
+ break;
+ case BUFFER_UNDERFLOW:
+ // Will occur either when no data was read from the peer or when the peerNetData buffer was too small to hold all peer's data.
+ peerNetData = handleBufferUnderflow(engine, peerNetData);
+ break;
+ case CLOSED:
+ if (engine.isOutboundDone()) {
+ return false;
+ } else {
+ engine.closeOutbound();
+ handshakeStatus = engine.getHandshakeStatus();
+ break;
+ }
+ default:
+ throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
+ }
+ break;
+ case NEED_WRAP:
+ myNetData.clear();
+ try {
+ result = engine.wrap(myAppData, myNetData);
+ handshakeStatus = result.getHandshakeStatus();
+ } catch (SSLException sslException) {
+ log.error("A problem was encountered while processing the data that caused the SSLEngine to abort." +
+ "Will try to properly close connection...", sslException);
+ engine.closeOutbound();
+ handshakeStatus = engine.getHandshakeStatus();
+ break;
+ }
+ switch (result.getStatus()) {
+ case OK :
+ myNetData.flip();
+ while (myNetData.hasRemaining()) {
+ socketChannel.write(myNetData);
+ }
+ break;
+ case BUFFER_OVERFLOW:
+ // Will occur if there is not enough space in myNetData buffer to write all the data that would be generated by the method wrap.
+ // Since myNetData is set to session's packet size we should not get to this point because SSLEngine is supposed
+ // to produce messages smaller or equal to that, but a general handling would be the following:
+ myNetData = enlargePacketBuffer(engine, myNetData);
+ break;
+ case BUFFER_UNDERFLOW:
+ throw new SSLException("Buffer underflow occurred after a wrap. I don't think we should ever get here.");
+ case CLOSED:
+ try {
+ myNetData.flip();
+ while (myNetData.hasRemaining()) {
+ socketChannel.write(myNetData);
+ }
+ // At this point the handshake status will probably be NEED_UNWRAP so we make sure that peerNetData is clear to read.
+ peerNetData.clear();
+ } catch (Exception e) {
+ log.error("Failed to send server's CLOSE message due to socket channel's failure.");
+ handshakeStatus = engine.getHandshakeStatus();
+ }
+ break;
+ default:
+ throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
+ }
+ break;
+ case NEED_TASK:
+ Runnable task;
+ while ((task = engine.getDelegatedTask()) != null) {
+ executor.execute(task);
+ }
+ handshakeStatus = engine.getHandshakeStatus();
+ break;
+ default:
+ throw new IllegalStateException("Invalid SSL status: " + handshakeStatus);
+ }
+ }
+ log.debug("Handshake status: " + handshakeStatus);
+
+ return true;
+
+ }
+
+ protected ByteBuffer enlargePacketBuffer(SSLEngine engine, ByteBuffer buffer) {
+ return enlargeBuffer(buffer, engine.getSession().getPacketBufferSize());
+ }
+
+ protected ByteBuffer enlargeApplicationBuffer(SSLEngine engine, ByteBuffer buffer) {
+ return enlargeBuffer(buffer, engine.getSession().getApplicationBufferSize());
+ }
+
+ /**
+ * Compares sessionProposedCapacity with buffer's capacity. If buffer's capacity is smaller,
+ * returns a buffer with the proposed capacity. If it's equal or larger, returns a buffer
+ * with capacity twice the size of the initial one.
+ *
+ * @param buffer - the buffer to be enlarged.
+ * @param sessionProposedCapacity - the minimum size of the new buffer, proposed by {@link SSLSession}.
+ * @return A new buffer with a larger capacity.
+ */
+ protected ByteBuffer enlargeBuffer(ByteBuffer buffer, int sessionProposedCapacity) {
+ if (sessionProposedCapacity > buffer.capacity()) {
+ buffer = ByteBuffer.allocate(sessionProposedCapacity);
+ } else {
+ buffer = ByteBuffer.allocate(buffer.capacity() * 2);
+ }
+ return buffer;
+ }
+
+ /**
+ * Handles {@link SSLEngineResult.Status#BUFFER_UNDERFLOW}. Will check if the buffer is already filled, and if there is no space problem
+ * will return the same buffer, so the client tries to read again. If the buffer is already filled will try to enlarge the buffer either to
+ * session's proposed size or to a larger capacity. A buffer underflow can happen only after an unwrap, so the buffer will always be a
+ * peerNetData buffer.
+ *
+ * @param buffer - will always be peerNetData buffer.
+ * @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
+ * @return The same buffer if there is no space problem or a new buffer with the same data but more space.
+ */
+ protected ByteBuffer handleBufferUnderflow(SSLEngine engine, ByteBuffer buffer) {
+ if (engine.getSession().getPacketBufferSize() < buffer.limit()) {
+ return buffer;
+ } else {
+ ByteBuffer replaceBuffer = enlargePacketBuffer(engine, buffer);
+ buffer.flip();
+ replaceBuffer.put(buffer);
+ return replaceBuffer;
+ }
+ }
+
+ /**
+ * This method should be called when this peer wants to explicitly close the connection
+ * or when a close message has arrived from the other peer, in order to provide an orderly shutdown.
+ *
+ * It first calls {@link SSLEngine#closeOutbound()} which prepares this peer to send its own close message and
+ * sets {@link SSLEngine} to the NEED_WRAP
state. Then, it delegates the exchange of close messages
+ * to the handshake method and finally, it closes socket channel.
+ *
+ * @param socketChannel - the transport link used between the two peers.
+ * @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
+ * @throws IOException if an I/O error occurs to the socket channel.
+ */
+ protected void closeConnection(SocketChannel socketChannel, SSLEngine engine) throws IOException {
+ engine.closeOutbound();
+ doHandshake(socketChannel, engine);
+ socketChannel.close();
+ }
+
+ /**
+ * In addition to orderly shutdowns, an unorderly shutdown may occur, when the transport link (socket channel)
+ * is severed before close messages are exchanged. This may happen by getting an -1 or {@link IOException}
+ * when trying to read from the socket channel, or an {@link IOException} when trying to write to it.
+ * In both cases {@link SSLEngine#closeInbound()} should be called and then try to follow the standard procedure.
+ *
+ * @param socketChannel - the transport link used between the two peers.
+ * @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
+ * @throws IOException if an I/O error occurs to the socket channel.
+ */
+ protected void handleEndOfStream(SocketChannel socketChannel, SSLEngine engine) throws IOException {
+ try {
+ engine.closeInbound();
+ } catch (Exception e) {
+ log.error("This engine was forced to close inbound, without having received the proper SSL/TLS close notification message from the peer, due to end of stream.");
+ }
+ closeConnection(socketChannel, engine);
+ }
+
+ protected String peerAppDataAsString() {
+ return new String(Arrays.copyOf(peerAppData.array(), peerAppData.limit()));
+ }
+}
\ No newline at end of file
diff --git a/node/src/integration-test/java/net/corda/node/amqp/NioSslServer.java b/node/src/integration-test/java/net/corda/node/amqp/NioSslServer.java
new file mode 100644
index 0000000000..d316c62c0b
--- /dev/null
+++ b/node/src/integration-test/java/net/corda/node/amqp/NioSslServer.java
@@ -0,0 +1,263 @@
+package net.corda.node.amqp;
+
+import net.corda.nodeapi.internal.protonwrapper.netty.SSLHelperKt;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.time.Duration;
+import java.util.Iterator;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManagerFactory;
+
+/**
+ * An SSL/TLS server, that will listen to a specific address and port and serve SSL/TLS connections
+ * compatible with the protocol it applies.
+ *
+ * After initialization {@link NioSslServer#start()} should be called so the server starts to listen to
+ * new connection requests. At this point, start is blocking, so, in order to be able to gracefully stop
+ * the server, a {@link Runnable} containing a server object should be created. This runnable should
+ * start the server in its run method and also provide a stop method, which will call {@link NioSslServer#stop()}.
+ *
+ * NioSslServer makes use of Java NIO, and specifically listens to new connection requests with a {@link ServerSocketChannel}, which will
+ * create new {@link SocketChannel}s and a {@link Selector} which serves all the connections in one thread.
+ *
+ * @author Alex Karnezis
+ */
+public class NioSslServer extends NioSslPeer {
+
+ private final Duration handshakeDelay;
+ /**
+ * Declares if the server is active to serve and create new connections.
+ */
+ private boolean active;
+
+ /**
+ * The context will be initialized with a specific SSL/TLS protocol and will then be used
+ * to create {@link SSLEngine} classes for each new connection that arrives to the server.
+ */
+ private final SSLContext context;
+
+ /**
+ * A part of Java NIO that will be used to serve all connections to the server in one thread.
+ */
+ private final Selector selector;
+
+
+ /**
+ * Server is designed to apply an SSL/TLS protocol and listen to an IP address and port.
+ *
+ * @param hostAddress - the IP address this server will listen to.
+ * @param port - the port this server will listen to.
+ * @param handshakeDelay - if not [null] specifies for how long the handshake should be delayed
+ */
+ public NioSslServer(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, String hostAddress, int port,
+ Duration handshakeDelay) throws Exception {
+
+ context = SSLHelperKt.createAndInitSslContext(keyManagerFactory, trustManagerFactory);
+
+ SSLSession dummySession = context.createSSLEngine().getSession();
+ myAppData = ByteBuffer.allocate(dummySession.getApplicationBufferSize());
+ myNetData = ByteBuffer.allocate(dummySession.getPacketBufferSize());
+ peerAppData = ByteBuffer.allocate(dummySession.getApplicationBufferSize());
+ peerNetData = ByteBuffer.allocate(dummySession.getPacketBufferSize());
+ dummySession.invalidate();
+
+ selector = SelectorProvider.provider().openSelector();
+ ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(false);
+ serverSocketChannel.socket().bind(new InetSocketAddress(hostAddress, port));
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+ this.handshakeDelay = handshakeDelay;
+
+ active = true;
+
+ }
+
+ /**
+ * Should be called in order the server to start listening to new connections.
+ * This method will run in a loop as long as the server is active. In order to stop the server
+ * you should use {@link NioSslServer#stop()} which will set it to inactive state
+ * and also wake up the listener, which may be in blocking select() state.
+ */
+ public void start() throws Exception {
+
+ log.debug("Initialized and waiting for new connections...");
+
+ while (isActive()) {
+ selector.select();
+ Iterator selectedKeys = selector.selectedKeys().iterator();
+ while (selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ selectedKeys.remove();
+ if (!key.isValid()) {
+ continue;
+ }
+ if (key.isAcceptable()) {
+ accept(key);
+ } else if (key.isReadable()) {
+ read((SocketChannel) key.channel(), (SSLEngine) key.attachment());
+ }
+ }
+ }
+
+ log.debug("Goodbye!");
+
+ }
+
+ /**
+ * Sets the server to an inactive state, in order to exit the reading loop in {@link NioSslServer#start()}
+ * and also wakes up the selector, which may be in select() blocking state.
+ */
+ public void stop() {
+ log.debug("Will now close server...");
+ active = false;
+ executor.shutdown();
+ selector.wakeup();
+ }
+
+ /**
+ * Will be called after a new connection request arrives to the server. Creates the {@link SocketChannel} that will
+ * be used as the network layer link, and the {@link SSLEngine} that will encrypt and decrypt all the data
+ * that will be exchanged during the session with this specific client.
+ *
+ * @param key - the key dedicated to the {@link ServerSocketChannel} used by the server to listen to new connection requests.
+ */
+ private void accept(SelectionKey key) throws Exception {
+
+ log.debug("New connection request!");
+
+ SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
+ socketChannel.configureBlocking(false);
+
+ SSLEngine engine = context.createSSLEngine();
+ engine.setUseClientMode(false);
+ // Demand client to present its certificate
+ engine.setNeedClientAuth(true);
+ engine.beginHandshake();
+
+ if (handshakeDelay != null) {
+ log.info("Deliberately sleeping during handshake for: " + handshakeDelay);
+ Thread.sleep(handshakeDelay.toMillis());
+ }
+
+ if (doHandshake(socketChannel, engine)) {
+ socketChannel.register(selector, SelectionKey.OP_READ, engine);
+ } else {
+ socketChannel.close();
+ log.debug("Connection closed due to handshake failure.");
+ }
+ }
+
+ /**
+ * Will be called by the selector when the specific socket channel has data to be read.
+ * As soon as the server reads these data, it will call {@link NioSslServer#write(SocketChannel, SSLEngine, String)}
+ * to send back a trivial response.
+ *
+ * @param socketChannel - the transport link used between the two peers.
+ * @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
+ * @throws IOException if an I/O error occurs to the socket channel.
+ */
+ @Override
+ protected void read(SocketChannel socketChannel, SSLEngine engine) throws IOException {
+
+ log.debug("About to read from a client...");
+
+ peerNetData.clear();
+ int bytesRead = socketChannel.read(peerNetData);
+ if (bytesRead > 0) {
+ peerNetData.flip();
+ while (peerNetData.hasRemaining()) {
+ peerAppData.clear();
+ SSLEngineResult result = engine.unwrap(peerNetData, peerAppData);
+ switch (result.getStatus()) {
+ case OK:
+ peerAppData.flip();
+ log.debug("Incoming message: " + peerAppDataAsString());
+ break;
+ case BUFFER_OVERFLOW:
+ peerAppData = enlargeApplicationBuffer(engine, peerAppData);
+ break;
+ case BUFFER_UNDERFLOW:
+ peerNetData = handleBufferUnderflow(engine, peerNetData);
+ break;
+ case CLOSED:
+ log.debug("Client wants to close connection...");
+ closeConnection(socketChannel, engine);
+ log.debug("Goodbye client!");
+ return;
+ default:
+ throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
+ }
+ }
+
+ write(socketChannel, engine, "Hello! I am your server!");
+
+ } else if (bytesRead < 0) {
+ log.error("Received end of stream. Will try to close connection with client...");
+ handleEndOfStream(socketChannel, engine);
+ log.debug("Goodbye client!");
+ }
+ }
+
+ /**
+ * Will send a message back to a client.
+ *
+ * @param message - the message to be sent.
+ * @throws IOException if an I/O error occurs to the socket channel.
+ */
+ @Override
+ protected void write(SocketChannel socketChannel, SSLEngine engine, String message) throws IOException {
+
+ log.debug("About to write to a client...");
+
+ myAppData.clear();
+ myAppData.put(message.getBytes());
+ myAppData.flip();
+ while (myAppData.hasRemaining()) {
+ // The loop has a meaning for (outgoing) messages larger than 16KB.
+ // Every wrap call will remove 16KB from the original message and send it to the remote peer.
+ myNetData.clear();
+ SSLEngineResult result = engine.wrap(myAppData, myNetData);
+ switch (result.getStatus()) {
+ case OK:
+ myNetData.flip();
+ while (myNetData.hasRemaining()) {
+ socketChannel.write(myNetData);
+ }
+ log.debug("Message sent to the client: " + message);
+ break;
+ case BUFFER_OVERFLOW:
+ myNetData = enlargePacketBuffer(engine, myNetData);
+ break;
+ case BUFFER_UNDERFLOW:
+ throw new SSLException("Buffer underflow occurred after a wrap. I don't think we should ever get here.");
+ case CLOSED:
+ closeConnection(socketChannel, engine);
+ return;
+ default:
+ throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
+ }
+ }
+ }
+
+ /**
+ * Determines if the the server is active or not.
+ *
+ * @return if the server is active or not.
+ */
+ public boolean isActive() {
+ return active;
+ }
+}
\ No newline at end of file
diff --git a/node/src/integration-test/java/net/corda/node/amqp/ServerThread.java b/node/src/integration-test/java/net/corda/node/amqp/ServerThread.java
new file mode 100644
index 0000000000..ee108893fb
--- /dev/null
+++ b/node/src/integration-test/java/net/corda/node/amqp/ServerThread.java
@@ -0,0 +1,69 @@
+package net.corda.node.amqp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+import java.time.Duration;
+
+/**
+ * This class provides a runnable that can be used to initialize a {@link NioSslServer} thread.
+ *
+ * Run starts the server, which will start listening to the configured IP address and port for
+ * new SSL/TLS connections and serve the ones already connected to it.
+ *
+ * Also a stop method is provided in order to gracefully close the server and stop the thread.
+ *
+ * @author Alex Karnezis
+ */
+public class ServerThread implements AutoCloseable {
+
+ private final static Logger log = LoggerFactory.getLogger(ServerThread.class);
+
+ private static final long JOIN_TIMEOUT_MS = 10000;
+
+ private final NioSslServer server;
+
+ private Thread serverThread;
+
+ public ServerThread(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, int port) throws Exception {
+ this(keyManagerFactory, trustManagerFactory, port, null);
+ }
+
+ public ServerThread(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, int port, @Nullable Duration handshakeDelay) throws Exception {
+ server = new NioSslServer(keyManagerFactory, trustManagerFactory, "localhost", port, handshakeDelay);
+ }
+
+ public void start() {
+
+ Runnable serverRunnable = () -> {
+ try {
+ server.start();
+ } catch (Exception e) {
+ log.error("Exception starting server", e);
+ }
+ };
+
+ serverThread = new Thread(serverRunnable, this.getClass().getSimpleName() + "-ServerThread");
+ serverThread.start();
+ }
+
+ /**
+ * Should be called in order to gracefully stop the server.
+ */
+ public void stop() throws InterruptedException {
+ server.stop();
+ serverThread.join(JOIN_TIMEOUT_MS);
+ }
+
+ public boolean isActive() {
+ return server.isActive();
+ }
+
+ @Override
+ public void close() throws Exception {
+ stop();
+ }
+}
\ No newline at end of file
diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPClientSslErrorsTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPClientSslErrorsTest.kt
new file mode 100644
index 0000000000..27a8a28189
--- /dev/null
+++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPClientSslErrorsTest.kt
@@ -0,0 +1,208 @@
+package net.corda.node.amqp
+
+import com.nhaarman.mockito_kotlin.doReturn
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.whenever
+import net.corda.core.internal.div
+import net.corda.core.toFuture
+import net.corda.core.utilities.NetworkHostAndPort
+import net.corda.core.utilities.contextLogger
+import net.corda.core.utilities.seconds
+import net.corda.coretesting.internal.stubs.CertificateStoreStubs
+import net.corda.node.services.config.NodeConfiguration
+import net.corda.node.services.config.configureWithDevSSLCertificate
+import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
+import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
+import net.corda.nodeapi.internal.protonwrapper.netty.init
+import net.corda.nodeapi.internal.protonwrapper.netty.initialiseTrustStoreAndEnableCrlChecking
+import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig
+import net.corda.testing.core.ALICE_NAME
+import net.corda.testing.core.BOB_NAME
+import net.corda.testing.driver.internal.incrementalPortAllocation
+import org.junit.Before
+import org.junit.Rule
+import org.junit.Test
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import javax.net.ssl.KeyManagerFactory
+import javax.net.ssl.TrustManagerFactory
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+/**
+ * This test verifies some edge case scenarios like handshake timeouts when [AMQPClient] connected to the server
+ *
+ * In order to have control over handshake internals a simple TLS server is created which may have a configurable handshake delay.
+ */
+@RunWith(Parameterized::class)
+class AMQPClientSslErrorsTest(@Suppress("unused") private val iteration: Int) {
+
+ companion object {
+ private const val MAX_MESSAGE_SIZE = 10 * 1024
+ private val log = contextLogger()
+
+ @JvmStatic
+ @Parameterized.Parameters(name = "iteration = {0}")
+ fun iterations(): Iterable> {
+ // It is possible to change this value to a greater number
+ // to ensure that the test is not flaking when executed on CI
+ val repsCount = 1
+ return (1..repsCount).map { arrayOf(it) }
+ }
+ }
+
+ @Rule
+ @JvmField
+ val temporaryFolder = TemporaryFolder()
+
+ private val portAllocation = incrementalPortAllocation()
+
+ private lateinit var serverKeyManagerFactory: KeyManagerFactory
+ private lateinit var serverTrustManagerFactory: TrustManagerFactory
+
+ private lateinit var clientKeyManagerFactory: KeyManagerFactory
+ private lateinit var clientTrustManagerFactory: TrustManagerFactory
+
+ private lateinit var clientAmqpConfig: AMQPConfiguration
+
+ @Before
+ fun setup() {
+ setupServerCertificates()
+ setupClientCertificates()
+ }
+
+ private fun setupServerCertificates() {
+ val baseDirectory = temporaryFolder.root.toPath() / "server"
+ val certificatesDirectory = baseDirectory / "certificates"
+ val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
+ val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
+ val serverConfig = mock().also {
+ doReturn(baseDirectory).whenever(it).baseDirectory
+ doReturn(certificatesDirectory).whenever(it).certificatesDirectory
+ doReturn(ALICE_NAME).whenever(it).myLegalName
+ doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
+ doReturn(signingCertificateStore).whenever(it).signingCertificateStore
+ }
+ serverConfig.configureWithDevSSLCertificate()
+ val keyStore = serverConfig.p2pSslOptions.keyStore.get()
+ val serverAmqpConfig = object : AMQPConfiguration {
+ override val keyStore = keyStore
+ override val trustStore = serverConfig.p2pSslOptions.trustStore.get()
+ override val revocationConfig = true.toRevocationConfig()
+ override val maxMessageSize: Int = MAX_MESSAGE_SIZE
+ }
+
+ serverKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
+ serverTrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
+
+ serverKeyManagerFactory.init(keyStore)
+ serverTrustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(serverAmqpConfig.trustStore, serverAmqpConfig.revocationConfig))
+ }
+
+ private fun setupClientCertificates() {
+ val baseDirectory = temporaryFolder.root.toPath() / "client"
+ val certificatesDirectory = baseDirectory / "certificates"
+ val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
+ val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
+ val clientConfig = mock().also {
+ doReturn(baseDirectory).whenever(it).baseDirectory
+ doReturn(certificatesDirectory).whenever(it).certificatesDirectory
+ doReturn(BOB_NAME).whenever(it).myLegalName
+ doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
+ doReturn(signingCertificateStore).whenever(it).signingCertificateStore
+ doReturn(true).whenever(it).crlCheckSoftFail
+ }
+ clientConfig.configureWithDevSSLCertificate()
+ //val nodeCert = (signingCertificateStore to p2pSslConfiguration).recreateNodeCaAndTlsCertificates(nodeCrlDistPoint, tlsCrlDistPoint)
+ val keyStore = clientConfig.p2pSslOptions.keyStore.get()
+
+ clientAmqpConfig = object : AMQPConfiguration {
+ override val keyStore = keyStore
+ override val trustStore = clientConfig.p2pSslOptions.trustStore.get()
+ override val maxMessageSize: Int = MAX_MESSAGE_SIZE
+ override val sslHandshakeTimeout: Long = 3000
+ }
+
+ clientKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
+ clientTrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
+
+ clientKeyManagerFactory.init(keyStore)
+ clientTrustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(clientAmqpConfig.trustStore, clientAmqpConfig.revocationConfig))
+ }
+
+ @Test(timeout = 300_000)
+ fun trivialClientServerExchange() {
+ val serverPort = portAllocation.nextPort()
+ val serverThread = ServerThread(serverKeyManagerFactory, serverTrustManagerFactory, serverPort).also { it.start() }
+
+ //System.setProperty("javax.net.debug", "all");
+
+ serverThread.use {
+ val client = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
+ client.connect()
+ client.write("Hello! I am a client!")
+ client.read()
+ client.shutdown()
+
+ val client2 = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
+ val client3 = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
+ val client4 = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
+
+ client2.connect()
+ client2.write("Hello! I am another client!")
+ client2.read()
+ client2.shutdown()
+
+ client3.connect()
+ client4.connect()
+ client3.write("Hello from client3!!!")
+ client4.write("Hello from client4!!!")
+ client3.read()
+ client4.read()
+ client3.shutdown()
+ client4.shutdown()
+ }
+ assertFalse(serverThread.isActive)
+ }
+
+ @Test(timeout = 300_000)
+ fun amqpClientServerConnect() {
+ val serverPort = portAllocation.nextPort()
+ val serverThread = ServerThread(serverKeyManagerFactory, serverTrustManagerFactory, serverPort)
+ .also { it.start() }
+ serverThread.use {
+ val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", serverPort)), setOf(ALICE_NAME), clientAmqpConfig)
+
+ amqpClient.use {
+ val clientConnected = amqpClient.onConnection.toFuture()
+ amqpClient.start()
+ val clientConnect = clientConnected.get()
+ assertTrue(clientConnect.connected)
+
+ log.info("Confirmed connected")
+ }
+ }
+ assertFalse(serverThread.isActive)
+ }
+
+ @Test(timeout = 300_000)
+ fun amqpClientServerHandshakeTimeout() {
+ val serverPort = portAllocation.nextPort()
+ val serverThread = ServerThread(serverKeyManagerFactory, serverTrustManagerFactory, serverPort, 5.seconds)
+ .also { it.start() }
+ serverThread.use {
+ val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", serverPort)), setOf(ALICE_NAME), clientAmqpConfig)
+
+ amqpClient.use {
+ val clientConnected = amqpClient.onConnection.toFuture()
+ amqpClient.start()
+ val clientConnect = clientConnected.get()
+ assertFalse(clientConnect.connected)
+ // Not a badCert, but a timeout during handshake
+ assertFalse(clientConnect.badCert)
+ }
+ }
+ assertFalse(serverThread.isActive)
+ }
+}
\ No newline at end of file
diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt
index 34a1b672f2..30b242e879 100644
--- a/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt
@@ -1,8 +1,10 @@
package net.corda.node.services.network
import net.corda.core.crypto.random63BitValue
+import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.messaging.ParametersUpdateInfo
+import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.getOrThrow
@@ -11,6 +13,7 @@ import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.SignedNetworkParameters
+import net.corda.testing.common.internal.addNotary
import net.corda.testing.common.internal.eventually
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.*
@@ -74,7 +77,6 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
)
}
-
@Before
fun start() {
networkMapServer = NetworkMapServer(cacheTimeout, portAllocation.nextHostAndPort())
@@ -142,6 +144,106 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
}
}
+ @Test(timeout = 300_000)
+ fun `Can hotload parameters if the notary changes`() {
+ internalDriver(
+ portAllocation = portAllocation,
+ compatibilityZone = compatibilityZone,
+ notarySpecs = emptyList(),
+ allowHibernateToManageAppSchema = false
+ ) {
+
+ val notary: Party = TestIdentity.fresh("test notary").party
+ val oldParams = networkMapServer.networkParameters
+ val paramsWithNewNotary = oldParams.copy(
+ epoch = 3,
+ modifiedTime = Instant.ofEpochMilli(random63BitValue())).addNotary(notary)
+
+ val alice = startNodeAndRunFlagDay(paramsWithNewNotary)
+ eventually { assertEquals(paramsWithNewNotary, alice.rpc.networkParameters) }
+
+ }
+ }
+
+ @Test(timeout = 300_000)
+ fun `If only the notary changes but parameters were not accepted, the node will still shut down on the flag day`() {
+ internalDriver(
+ portAllocation = portAllocation,
+ compatibilityZone = compatibilityZone,
+ notarySpecs = emptyList(),
+ allowHibernateToManageAppSchema = false
+ ) {
+
+ val notary: Party = TestIdentity.fresh("test notary").party
+ val oldParams = networkMapServer.networkParameters
+ val paramsWithNewNotary = oldParams.copy(
+ epoch = 3,
+ modifiedTime = Instant.ofEpochMilli(random63BitValue())).addNotary(notary)
+
+ val alice = startNode(providedName = ALICE_NAME, devMode = false).getOrThrow() as NodeHandleInternal
+ networkMapServer.scheduleParametersUpdate(paramsWithNewNotary, "Next parameters", Instant.ofEpochMilli(random63BitValue()))
+ // Wait for network map client to poll for the next update.
+ Thread.sleep(cacheTimeout.toMillis() * 2)
+ networkMapServer.advertiseNewParameters()
+ eventually { assertThatThrownBy { alice.rpc.networkParameters }.hasMessageContaining("Connection failure detected") }
+
+ }
+ }
+
+ @Test(timeout = 300_000)
+ fun `Can not hotload parameters if non-hotloadable parameter changes and the node will shut down`() {
+ internalDriver(
+ portAllocation = portAllocation,
+ compatibilityZone = compatibilityZone,
+ notarySpecs = emptyList(),
+ allowHibernateToManageAppSchema = false
+ ) {
+
+ val oldParams = networkMapServer.networkParameters
+ val paramsWithUpdatedMaxMessageSize = oldParams.copy(
+ epoch = 3,
+ modifiedTime = Instant.ofEpochMilli(random63BitValue()),
+ maxMessageSize = oldParams.maxMessageSize + 1)
+ val alice = startNodeAndRunFlagDay(paramsWithUpdatedMaxMessageSize)
+ eventually { assertThatThrownBy { alice.rpc.networkParameters }.hasMessageContaining("Connection failure detected") }
+ }
+ }
+
+ @Test(timeout = 300_000)
+ fun `Can not hotload parameters if notary and a non-hotloadable parameter changes and the node will shut down`() {
+ internalDriver(
+ portAllocation = portAllocation,
+ compatibilityZone = compatibilityZone,
+ notarySpecs = emptyList(),
+ allowHibernateToManageAppSchema = false
+ ) {
+
+ val oldParams = networkMapServer.networkParameters
+ val notary: Party = TestIdentity.fresh("test notary").party
+ val paramsWithUpdatedMaxMessageSizeAndNotary = oldParams.copy(
+ epoch = 3,
+ modifiedTime = Instant.ofEpochMilli(random63BitValue()),
+ maxMessageSize = oldParams.maxMessageSize + 1).addNotary(notary)
+ val alice = startNodeAndRunFlagDay(paramsWithUpdatedMaxMessageSizeAndNotary)
+ eventually { assertThatThrownBy { alice.rpc.networkParameters }.hasMessageContaining("Connection failure detected") }
+ }
+ }
+
+ private fun DriverDSLImpl.startNodeAndRunFlagDay(newParams: NetworkParameters): NodeHandleInternal {
+
+ val alice = startNode(providedName = ALICE_NAME, devMode = false).getOrThrow() as NodeHandleInternal
+ val nextHash = newParams.serialize().hash
+
+ networkMapServer.scheduleParametersUpdate(newParams, "Next parameters", Instant.ofEpochMilli(random63BitValue()))
+ // Wait for network map client to poll for the next update.
+ Thread.sleep(cacheTimeout.toMillis() * 2)
+ alice.rpc.acceptNewNetworkParameters(nextHash)
+ assertEquals(nextHash, networkMapServer.latestParametersAccepted(alice.nodeInfo.legalIdentities.first().owningKey))
+ assertEquals(networkMapServer.networkParameters, alice.rpc.networkParameters)
+ networkMapServer.advertiseNewParameters()
+ return alice
+ }
+
@Test(timeout=300_000)
fun `nodes process additions and removals from the network map correctly (and also download the network parameters)`() {
internalDriver(
diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt
index e2fc51282b..a6b04d6441 100644
--- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt
@@ -33,13 +33,19 @@ class FlowVersioningTest : NodeBasedTest() {
private class PretendInitiatingCoreFlow(val initiatedParty: Party) : FlowLogic>() {
@Suspendable
override fun call(): Pair {
- // Execute receive() outside of the Pair constructor to avoid Kotlin/Quasar instrumentation bug.
val session = initiateFlow(initiatedParty)
- val alicePlatformVersionAccordingToBob = session.receive().unwrap { it }
- return Pair(
- alicePlatformVersionAccordingToBob,
- session.getCounterpartyFlowInfo().flowVersion
- )
+ return try {
+ // Get counterparty flow info before we receive Alice's data, to ensure the flow is still open
+ val bobPlatformVersionAccordingToAlice = session.getCounterpartyFlowInfo().flowVersion
+ // Execute receive() outside of the Pair constructor to avoid Kotlin/Quasar instrumentation bug.
+ val alicePlatformVersionAccordingToBob = session.receive().unwrap { it }
+ Pair(
+ alicePlatformVersionAccordingToBob,
+ bobPlatformVersionAccordingToAlice
+ )
+ } finally {
+ session.close()
+ }
}
}
diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
index cca55dd407..1ff856ddef 100644
--- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
+++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
@@ -109,6 +109,8 @@ import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.NetworkMapClient
import net.corda.node.services.network.NetworkMapUpdater
+import net.corda.node.services.network.NetworkParameterUpdateListener
+import net.corda.node.services.network.NetworkParametersHotloader
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.AbstractPartyDescriptor
@@ -524,6 +526,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
Node.printBasicNodeInfo("CorDapp schemas synchronised")
}
+ @Suppress("ComplexMethod")
open fun start(): S {
check(started == null) { "Node has already been started" }
@@ -549,7 +552,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
startShell()
networkMapClient?.start(trustRoot)
- val (netParams, signedNetParams) = NetworkParametersReader(trustRoot, networkMapClient, configuration.baseDirectory).read()
+ val networkParametersReader = NetworkParametersReader(trustRoot, networkMapClient, configuration.baseDirectory)
+ val (netParams, signedNetParams) = networkParametersReader.read()
log.info("Loaded network parameters: $netParams")
check(netParams.minimumPlatformVersion <= versionInfo.platformVersion) {
"Node's platform version is lower than network's required minimumPlatformVersion"
@@ -570,13 +574,27 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val (nodeInfo, signedNodeInfo) = nodeInfoAndSigned
identityService.ourNames = nodeInfo.legalIdentities.map { it.name }.toSet()
services.start(nodeInfo, netParams)
+
+ val networkParametersHotloader = if (networkMapClient == null) {
+ null
+ } else {
+ NetworkParametersHotloader(networkMapClient, trustRoot, netParams, networkParametersReader, networkParametersStorage).also {
+ it.addNotaryUpdateListener(networkMapCache)
+ it.addNotaryUpdateListener(identityService)
+ it.addNetworkParametersChangedListeners(services)
+ it.addNetworkParametersChangedListeners(networkMapUpdater)
+ }
+ }
+
networkMapUpdater.start(
trustRoot,
signedNetParams.raw.hash,
signedNodeInfo,
netParams,
keyManagementService,
- configuration.networkParameterAcceptanceSettings!!)
+ configuration.networkParameterAcceptanceSettings!!,
+ networkParametersHotloader)
+
try {
startMessagingService(rpcOps, nodeInfo, myNotaryIdentity, netParams)
} catch (e: Exception) {
@@ -1221,7 +1239,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
- inner class ServiceHubInternalImpl : SingletonSerializeAsToken(), ServiceHubInternal, ServicesForResolution by servicesForResolution {
+ inner class ServiceHubInternalImpl : SingletonSerializeAsToken(), ServiceHubInternal, ServicesForResolution by servicesForResolution, NetworkParameterUpdateListener {
override val rpcFlows = ArrayList>>()
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage(database)
override val identityService: IdentityService get() = this@AbstractNode.identityService
@@ -1254,6 +1272,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache get() = this@AbstractNode.attachmentsClassLoaderCache
+ @Volatile
private lateinit var _networkParameters: NetworkParameters
override val networkParameters: NetworkParameters get() = _networkParameters
@@ -1340,6 +1359,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val ledgerTransaction = servicesForResolution.specialise(ltx)
return verifierFactoryService.apply(ledgerTransaction)
}
+
+ override fun onNewNetworkParameters(networkParameters: NetworkParameters) {
+ this._networkParameters = networkParameters
+ }
}
}
diff --git a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt
index ac91bdec68..d9d8906861 100644
--- a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt
@@ -12,6 +12,7 @@ import net.corda.core.internal.CertRole
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.hash
import net.corda.core.internal.toSet
+import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.UnknownAnonymousPartyException
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
@@ -19,6 +20,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.keys.BasicHSMKeyManagementService
+import net.corda.node.services.network.NotaryUpdateListener
import net.corda.node.services.persistence.PublicKeyHashToExternalId
import net.corda.node.services.persistence.WritablePublicKeyToOwningIdentityCache
import net.corda.node.utilities.AppendOnlyPersistentMap
@@ -53,7 +55,8 @@ import kotlin.streams.toList
* cached for efficient lookup.
*/
@ThreadSafe
-class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSerializeAsToken(), IdentityServiceInternal {
+@Suppress("TooManyFunctions")
+class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSerializeAsToken(), IdentityServiceInternal, NotaryUpdateListener {
companion object {
private val log = contextLogger()
@@ -197,7 +200,8 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
override val trustAnchor: TrustAnchor get() = _trustAnchor
/** Stores notary identities obtained from the network parameters, for which we don't need to perform a database lookup. */
- private val notaryIdentityCache = HashSet()
+ @Volatile
+ private var notaryIdentityCache = HashSet()
// CordaPersistence is not a c'tor parameter to work around the cyclic dependency
lateinit var database: CordaPersistence
@@ -453,4 +457,8 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
keys
}
}
+
+ override fun onNewNotaryList(notaries: List) {
+ notaryIdentityCache = HashSet(notaries.map { it.identity })
+ }
}
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt
index 712efce341..b36f34d632 100644
--- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt
+++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt
@@ -1,6 +1,7 @@
package net.corda.node.services.network
import com.google.common.util.concurrent.MoreExecutors
+import net.corda.cliutils.ExitCodes
import net.corda.core.CordaRuntimeException
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
@@ -62,7 +63,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val baseDirectory: Path,
private val extraNetworkMapKeys: List,
private val networkParametersStorage: NetworkParametersStorage
-) : AutoCloseable {
+) : AutoCloseable, NetworkParameterUpdateListener {
companion object {
private val logger = contextLogger()
private val defaultRetryInterval = 1.minutes
@@ -77,12 +78,15 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val fileWatcherSubscription = AtomicReference()
private var autoAcceptNetworkParameters: Boolean = true
private lateinit var trustRoot: X509Certificate
+ @Volatile
private lateinit var currentParametersHash: SecureHash
private lateinit var ourNodeInfo: SignedNodeInfo
private lateinit var ourNodeInfoHash: SecureHash
+
private lateinit var networkParameters: NetworkParameters
private lateinit var keyManagementService: KeyManagementService
private lateinit var excludedAutoAcceptNetworkParameters: Set
+ private var networkParametersHotloader: NetworkParametersHotloader? = null
override fun close() {
fileWatcherSubscription.updateAndGet { subscription ->
@@ -95,13 +99,15 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
}
MoreExecutors.shutdownAndAwaitTermination(networkMapPoller, 50, TimeUnit.SECONDS)
}
-
+ @Suppress("LongParameterList")
fun start(trustRoot: X509Certificate,
currentParametersHash: SecureHash,
ourNodeInfo: SignedNodeInfo,
networkParameters: NetworkParameters,
keyManagementService: KeyManagementService,
- networkParameterAcceptanceSettings: NetworkParameterAcceptanceSettings) {
+ networkParameterAcceptanceSettings: NetworkParameterAcceptanceSettings,
+ networkParametersHotloader: NetworkParametersHotloader?
+ ) {
fileWatcherSubscription.updateAndGet { subscription ->
require(subscription == null) { "Should not call this method twice" }
this.trustRoot = trustRoot
@@ -112,6 +118,8 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
this.keyManagementService = keyManagementService
this.autoAcceptNetworkParameters = networkParameterAcceptanceSettings.autoAcceptEnabled
this.excludedAutoAcceptNetworkParameters = networkParameterAcceptanceSettings.excludedAutoAcceptableParameters
+ this.networkParametersHotloader = networkParametersHotloader
+
val autoAcceptNetworkParametersNames = autoAcceptablePropertyNames - excludedAutoAcceptNetworkParameters
if (autoAcceptNetworkParameters && autoAcceptNetworkParametersNames.isNotEmpty()) {
@@ -180,7 +188,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
val additionalHashes = getPrivateNetworkNodeHashes(version)
val allHashesFromNetworkMap = (globalNetworkMap.nodeInfoHashes + additionalHashes).toSet()
if (currentParametersHash != globalNetworkMap.networkParameterHash) {
- exitOnParametersMismatch(globalNetworkMap)
+ hotloadOrExitOnParametersMismatch(globalNetworkMap)
}
// Calculate any nodes that are now gone and remove _only_ them from the cache
// NOTE: We won't remove them until after the add/update cycle as only then will we definitely know which nodes are no longer
@@ -276,22 +284,26 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
}
}
- private fun exitOnParametersMismatch(networkMap: NetworkMap) {
+ private fun hotloadOrExitOnParametersMismatch(networkMap: NetworkMap) {
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
- val acceptedHash = if (updatesFile.exists()) updatesFile.readObject().raw.hash else null
- val exitCode = if (acceptedHash == networkMap.networkParameterHash) {
- logger.info("Flag day occurred. Network map switched to the new network parameters: " +
- "${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
- 0
- } else {
- // TODO This needs special handling (node omitted update process or didn't accept new parameters)
+ val newParameterHash = networkMap.networkParameterHash
+ val nodeAcceptedNewParameters = updatesFile.exists() && newParameterHash == updatesFile.readObject().raw.hash
+
+ if (!nodeAcceptedNewParameters) {
logger.error(
"""Node is using network parameters with hash $currentParametersHash but the network map is advertising ${networkMap.networkParameterHash}.
To resolve this mismatch, and move to the current parameters, delete the $NETWORK_PARAMS_FILE_NAME file from the node's directory and restart.
The node will shutdown now.""")
- 1
+ exitProcess(ExitCodes.FAILURE)
}
- exitProcess(exitCode)
+
+ val hotloadSucceeded = networkParametersHotloader!!.attemptHotload(newParameterHash)
+ if (!hotloadSucceeded) {
+ logger.info("Flag day occurred. Network map switched to the new network parameters: " +
+ "${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
+ exitProcess(ExitCodes.SUCCESS)
+ }
+ currentParametersHash = newParameterHash
}
private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) {
@@ -340,6 +352,10 @@ The node will shutdown now.""")
throw OutdatedNetworkParameterHashException(parametersHash, newParametersHash)
}
}
+
+ override fun onNewNetworkParameters(networkParameters: NetworkParameters) {
+ this.networkParameters = networkParameters
+ }
}
private val memberPropertyPartition = NetworkParameters::class.declaredMemberProperties.partition { it.isAutoAcceptable() }
@@ -360,8 +376,8 @@ internal fun NetworkParameters.canAutoAccept(newNetworkParameters: NetworkParame
private fun KProperty1.isAutoAcceptable(): Boolean = findAnnotation() != null
-private fun NetworkParameters.valueChanged(newNetworkParameters: NetworkParameters, getter: Method?): Boolean {
+internal fun NetworkParameters.valueChanged(newNetworkParameters: NetworkParameters, getter: Method?): Boolean {
val propertyValue = getter?.invoke(this)
val newPropertyValue = getter?.invoke(newNetworkParameters)
return propertyValue != newPropertyValue
-}
\ No newline at end of file
+}
diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkParameterUpdateListener.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkParameterUpdateListener.kt
new file mode 100644
index 0000000000..bce669e919
--- /dev/null
+++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkParameterUpdateListener.kt
@@ -0,0 +1,11 @@
+package net.corda.node.services.network
+
+import net.corda.core.node.NetworkParameters
+
+/**
+ * When network parameters change on a flag day, onNewNetworkParameters will be invoked with the new parameters.
+ * Used inside {@link net.corda.node.services.network.NetworkParametersUpdater}
+ */
+interface NetworkParameterUpdateListener {
+ fun onNewNetworkParameters(networkParameters: NetworkParameters)
+}
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkParametersHotloader.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkParametersHotloader.kt
new file mode 100644
index 0000000000..5268e4f641
--- /dev/null
+++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkParametersHotloader.kt
@@ -0,0 +1,88 @@
+package net.corda.node.services.network
+
+import net.corda.core.crypto.SecureHash
+import net.corda.core.internal.NetworkParametersStorage
+import net.corda.core.node.NetworkParameters
+import net.corda.core.node.NotaryInfo
+import net.corda.core.utilities.contextLogger
+import net.corda.node.internal.NetworkParametersReader
+import net.corda.nodeapi.internal.network.verifiedNetworkParametersCert
+import java.security.cert.X509Certificate
+import kotlin.reflect.full.declaredMemberProperties
+import kotlin.reflect.jvm.javaGetter
+
+/**
+ * This class is responsible for hotloading new network parameters or shut down the node if it's not possible.
+ * Currently only hotloading notary changes are supported.
+ */
+class NetworkParametersHotloader(private val networkMapClient: NetworkMapClient,
+ private val trustRoot: X509Certificate,
+ @Volatile private var networkParameters: NetworkParameters,
+ private val networkParametersReader: NetworkParametersReader,
+ private val networkParametersStorage: NetworkParametersStorage) {
+ companion object {
+ private val logger = contextLogger()
+ private val alwaysHotloadable = listOf(NetworkParameters::epoch, NetworkParameters::modifiedTime)
+ }
+
+ private val networkParameterUpdateListeners = mutableListOf()
+ private val notaryUpdateListeners = mutableListOf()
+
+ fun addNetworkParametersChangedListeners(listener: NetworkParameterUpdateListener) {
+ networkParameterUpdateListeners.add(listener)
+ }
+
+ fun addNotaryUpdateListener(listener: NotaryUpdateListener) {
+ notaryUpdateListeners.add(listener)
+ }
+
+ private fun notifyListenersFor(notaries: List) = notaryUpdateListeners.forEach { it.onNewNotaryList(notaries) }
+ private fun notifyListenersFor(networkParameters: NetworkParameters) = networkParameterUpdateListeners.forEach { it.onNewNetworkParameters(networkParameters) }
+
+ fun attemptHotload(newNetworkParameterHash: SecureHash): Boolean {
+
+ val newSignedNetParams = networkMapClient.getNetworkParameters(newNetworkParameterHash)
+ val newNetParams = newSignedNetParams.verifiedNetworkParametersCert(trustRoot)
+
+ if (canHotload(newNetParams)) {
+ logger.info("All changed parameters are hotloadable")
+ hotloadParameters(newNetParams)
+ return true
+ } else {
+ return false
+ }
+ }
+
+ /**
+ * Ignoring always hotloadable properties (epoch, modifiedTime) return true if the notary is the only property that is different in the new network parameters
+ */
+ private fun canHotload(newNetworkParameters: NetworkParameters): Boolean {
+
+ val propertiesChanged = NetworkParameters::class.declaredMemberProperties
+ .minus(alwaysHotloadable)
+ .filter { networkParameters.valueChanged(newNetworkParameters, it.javaGetter) }
+
+ logger.info("Updated NetworkParameters properties: $propertiesChanged")
+
+ val noPropertiesChanged = propertiesChanged.isEmpty()
+ val onlyNotariesChanged = propertiesChanged == listOf(NetworkParameters::notaries)
+ return when {
+ noPropertiesChanged -> true
+ onlyNotariesChanged -> true
+ else -> false
+ }
+ }
+
+ /**
+ * Update local networkParameters and currentParametersHash with new values.
+ * Notify all listeners for network parameter changes
+ */
+ private fun hotloadParameters(newNetworkParameters: NetworkParameters) {
+
+ networkParameters = newNetworkParameters
+ val networkParametersAndSigned = networkParametersReader.read()
+ networkParametersStorage.setCurrentParameters(networkParametersAndSigned.signed, trustRoot)
+ notifyListenersFor(newNetworkParameters)
+ notifyListenersFor(newNetworkParameters.notaries)
+ }
+}
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/services/network/NotaryUpdateListener.kt b/node/src/main/kotlin/net/corda/node/services/network/NotaryUpdateListener.kt
new file mode 100644
index 0000000000..6cd4570b71
--- /dev/null
+++ b/node/src/main/kotlin/net/corda/node/services/network/NotaryUpdateListener.kt
@@ -0,0 +1,11 @@
+package net.corda.node.services.network
+
+import net.corda.core.node.NotaryInfo
+
+/**
+ * When notaries inside network parameters change on a flag day, onNewNotaryList will be invoked with the new notary list.
+ * Used inside {@link net.corda.node.services.network.NetworkParametersUpdater}
+ */
+interface NotaryUpdateListener {
+ fun onNewNotaryList(notaries: List)
+}
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt
index f09d9aa8f6..709e415cdd 100644
--- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt
+++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt
@@ -38,9 +38,10 @@ import javax.persistence.PersistenceException
/** Database-based network map cache. */
@ThreadSafe
+@Suppress("TooManyFunctions")
open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
private val database: CordaPersistence,
- private val identityService: IdentityService) : NetworkMapCacheInternal, SingletonSerializeAsToken() {
+ private val identityService: IdentityService) : NetworkMapCacheInternal, SingletonSerializeAsToken(), NotaryUpdateListener {
companion object {
private val logger = contextLogger()
@@ -53,6 +54,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
override val nodeReady: OpenFuture = openFuture()
+ @Volatile
private lateinit var notaries: List
override val notaryIdentities: List get() = notaries.map { it.identity }
@@ -386,4 +388,8 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
for (nodeInfo in result) session.remove(nodeInfo)
}
}
+
+ override fun onNewNotaryList(notaries: List) {
+ this.notaries = notaries
+ }
}
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt
index 3c638584e9..a44a66b14a 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt
@@ -139,7 +139,7 @@ sealed class Event {
data class AsyncOperationCompletion(val returnValue: Any?) : Event()
/**
- * Signals the faiure of a [FlowAsyncOperation].
+ * Signals the failure of a [FlowAsyncOperation].
*
* Scheduling is triggered by the service that completes the future returned by the async operation.
*
@@ -179,6 +179,13 @@ sealed class Event {
override fun toString() = "WakeUpSleepyFlow"
}
+ /**
+ * Terminate the specified [sessions], removing them from in-memory datastructures.
+ *
+ * @param sessions The sessions to terminate
+ */
+ data class TerminateSessions(val sessions: Set) : Event()
+
/**
* Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow,
* even if it has not yet been processed and placed on the pending de-duplication handlers list.
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt
index cea423134f..0911ed18a4 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt
@@ -41,47 +41,18 @@ class StartedFlowTransition(
continuation = FlowContinuation.Throw(errorsToThrow[0])
)
}
- val sessionsToBeTerminated = findSessionsToBeTerminated(startingState)
- // if there are sessions to be closed, we close them as part of this transition and normal processing will continue on the next transition.
- return if (sessionsToBeTerminated.isNotEmpty()) {
- terminateSessions(sessionsToBeTerminated)
- } else {
- when (flowIORequest) {
- is FlowIORequest.Send -> sendTransition(flowIORequest)
- is FlowIORequest.Receive -> receiveTransition(flowIORequest)
- is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest)
- is FlowIORequest.CloseSessions -> closeSessionTransition(flowIORequest)
- is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest)
- is FlowIORequest.Sleep -> sleepTransition(flowIORequest)
- is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
- is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
- is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
- FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
- }
- }
- }
-
- private fun findSessionsToBeTerminated(startingState: StateMachineState): SessionMap {
- return startingState.checkpoint.checkpointState.sessionsToBeClosed.mapNotNull { sessionId ->
- val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!! as SessionState.Initiated
- if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is EndSessionMessage) {
- sessionId to sessionState
- } else {
- null
- }
- }.toMap()
- }
-
- private fun terminateSessions(sessionsToBeTerminated: SessionMap): TransitionResult {
- return builder {
- val sessionsToRemove = sessionsToBeTerminated.keys
- val newCheckpoint = currentState.checkpoint.removeSessions(sessionsToRemove)
- .removeSessionsToBeClosed(sessionsToRemove)
- currentState = currentState.copy(checkpoint = newCheckpoint)
- actions.add(Action.RemoveSessionBindings(sessionsToRemove))
- actions.add(Action.ScheduleEvent(Event.DoRemainingWork))
- FlowContinuation.ProcessEvents
- }
+ return when (flowIORequest) {
+ is FlowIORequest.Send -> sendTransition(flowIORequest)
+ is FlowIORequest.Receive -> receiveTransition(flowIORequest)
+ is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest)
+ is FlowIORequest.CloseSessions -> closeSessionTransition(flowIORequest)
+ is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest)
+ is FlowIORequest.Sleep -> sleepTransition(flowIORequest)
+ is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
+ is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
+ is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
+ FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
+ }.let { scheduleTerminateSessionsIfRequired(it) }
}
private fun waitForSessionConfirmationsTransition(): TransitionResult {
@@ -158,6 +129,7 @@ class StartedFlowTransition(
}
}
+ @Suppress("TooGenericExceptionCaught")
private fun sendAndReceiveTransition(flowIORequest: FlowIORequest.SendAndReceive): TransitionResult {
val sessionIdToMessage = LinkedHashMap>()
val sessionIdToSession = LinkedHashMap()
@@ -171,18 +143,23 @@ class StartedFlowTransition(
if (isErrored()) {
FlowContinuation.ProcessEvents
} else {
- val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
- if (receivedMap == null) {
- // We don't yet have the messages, change the suspension to be on Receive
- val newIoRequest = FlowIORequest.Receive(flowIORequest.sessionToMessage.keys.toNonEmptySet())
- currentState = currentState.copy(
+ try {
+ val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
+ if (receivedMap == null) {
+ // We don't yet have the messages, change the suspension to be on Receive
+ val newIoRequest = FlowIORequest.Receive(flowIORequest.sessionToMessage.keys.toNonEmptySet())
+ currentState = currentState.copy(
checkpoint = currentState.checkpoint.copy(
- flowState = FlowState.Started(newIoRequest, started.frozenFiber)
+ flowState = FlowState.Started(newIoRequest, started.frozenFiber)
)
- )
- FlowContinuation.ProcessEvents
- } else {
- resumeFlowLogic(receivedMap)
+ )
+ FlowContinuation.ProcessEvents
+ } else {
+ resumeFlowLogic(receivedMap)
+ }
+ } catch (t: Throwable) {
+ // E.g. A session end message received while expecting a data session message
+ resumeFlowLogic(t)
}
}
}
@@ -216,6 +193,7 @@ class StartedFlowTransition(
}
}
+ @Suppress("TooGenericExceptionCaught")
private fun receiveTransition(flowIORequest: FlowIORequest.Receive): TransitionResult {
return builder {
val sessionIdToSession = LinkedHashMap()
@@ -224,11 +202,16 @@ class StartedFlowTransition(
}
// send initialises to uninitialised sessions
sendInitialSessionMessagesIfNeeded(sessionIdToSession.keys)
- val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
- if (receivedMap == null) {
- FlowContinuation.ProcessEvents
- } else {
- resumeFlowLogic(receivedMap)
+ try {
+ val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
+ if (receivedMap == null) {
+ FlowContinuation.ProcessEvents
+ } else {
+ resumeFlowLogic(receivedMap)
+ }
+ } catch (t: Throwable) {
+ // E.g. A session end message received while expecting a data session message
+ resumeFlowLogic(t)
}
}
}
@@ -253,6 +236,8 @@ class StartedFlowTransition(
val messages: Map>,
val newSessionMap: SessionMap
)
+
+ @Suppress("ComplexMethod", "NestedBlockDepth")
private fun pollSessionMessages(sessions: SessionMap, sessionIds: Set): PollResult? {
val newSessionMessages = LinkedHashMap(sessions)
val resultMessages = LinkedHashMap>()
@@ -267,7 +252,11 @@ class StartedFlowTransition(
} else {
newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toList())
// at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message.
- resultMessages[sessionId] = (messages[0] as DataSessionMessage).payload
+ resultMessages[sessionId] = if (messages[0] is EndSessionMessage) {
+ throw UnexpectedFlowEndException("Received session end message instead of a data session message. Mismatched send and receive?")
+ } else {
+ (messages[0] as DataSessionMessage).payload
+ }
}
}
else -> {
@@ -537,4 +526,25 @@ class StartedFlowTransition(
private fun executeForceCheckpoint(): TransitionResult {
return builder { resumeFlowLogic(Unit) }
}
+
+ private fun scheduleTerminateSessionsIfRequired(transition: TransitionResult): TransitionResult {
+ // If there are sessions to be closed, close them on a following transition
+ val sessionsToBeTerminated = findSessionsToBeTerminated(transition.newState)
+ return if (sessionsToBeTerminated.isNotEmpty()) {
+ transition.copy(actions = transition.actions + Action.ScheduleEvent(Event.TerminateSessions(sessionsToBeTerminated.keys)))
+ } else {
+ transition
+ }
+ }
+
+ private fun findSessionsToBeTerminated(startingState: StateMachineState): SessionMap {
+ return startingState.checkpoint.checkpointState.sessionsToBeClosed.mapNotNull { sessionId ->
+ val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!! as SessionState.Initiated
+ if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is EndSessionMessage) {
+ sessionId to sessionState
+ } else {
+ null
+ }
+ }.toMap()
+ }
}
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
index 169148108e..998625105f 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
@@ -62,6 +62,7 @@ class TopLevelTransition(
is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition()
is Event.OvernightObservation -> overnightObservationTransition()
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
+ is Event.TerminateSessions -> terminateSessionsTransition(event)
}
}
@@ -366,4 +367,16 @@ class TopLevelTransition(
resumeFlowLogic(Unit)
}
}
+
+ private fun terminateSessionsTransition(event: Event.TerminateSessions): TransitionResult {
+ return builder {
+ val sessions = event.sessions
+ val newCheckpoint = currentState.checkpoint
+ .removeSessions(sessions)
+ .removeSessionsToBeClosed(sessions)
+ currentState = currentState.copy(checkpoint = newCheckpoint)
+ actions.add(Action.RemoveSessionBindings(sessions))
+ FlowContinuation.ProcessEvents
+ }
+ }
}
diff --git a/node/src/test/kotlin/net/corda/node/internal/CordaServiceTest.kt b/node/src/test/kotlin/net/corda/node/internal/CordaServiceTest.kt
index 8e76c1ef5c..1d06c470c8 100644
--- a/node/src/test/kotlin/net/corda/node/internal/CordaServiceTest.kt
+++ b/node/src/test/kotlin/net/corda/node/internal/CordaServiceTest.kt
@@ -6,6 +6,8 @@ import net.corda.core.context.InvocationOrigin
import net.corda.core.contracts.ContractState
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByService
+import net.corda.core.identity.CordaX500Name
+import net.corda.core.internal.packageName
import net.corda.core.node.AppServiceHub
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.CordaService
@@ -16,12 +18,20 @@ import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueFlow
+import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.cordapp.DummyRPCFlow
+import net.corda.testing.core.BOC_NAME
+import net.corda.testing.core.DUMMY_NOTARY_NAME
+import net.corda.testing.core.TestIdentity
+import net.corda.testing.internal.vault.DummyLinearStateSchemaV1
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetworkParameters
+import net.corda.testing.node.MockServices
import net.corda.testing.node.StartedMockNode
import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.enclosedCordapp
+import net.corda.testing.node.makeTestIdentityService
+import org.assertj.core.api.Assertions
import org.junit.After
import org.junit.Before
import org.junit.Test
@@ -100,6 +110,22 @@ class CordaServiceTest {
nodeA.services.cordaService(EntityManagerService::class.java)
}
+ @Test(timeout=300_000)
+ fun `MockServices when initialized with package name not on classpath throws ClassNotFoundException`() {
+ val cordappPackages = listOf(
+ "com.r3.corda.sdk.tokens.money",
+ "net.corda.finance.contracts",
+ CashSchemaV1::class.packageName,
+ DummyLinearStateSchemaV1::class.packageName)
+ val bankOfCorda = TestIdentity(BOC_NAME)
+ val dummyCashIssuer = TestIdentity(CordaX500Name("Snake Oil Issuer", "London", "GB"), 10)
+ val dummyNotary = TestIdentity(DUMMY_NOTARY_NAME, 20)
+ val identityService = makeTestIdentityService(dummyNotary.identity)
+
+ Assertions.assertThatThrownBy { MockServices(cordappPackages, dummyNotary, identityService, dummyCashIssuer.keyPair, bankOfCorda.keyPair) }
+ .isInstanceOf(ClassNotFoundException::class.java).hasMessage("Could not create jar file as the given package is not found on the classpath: com.r3.corda.sdk.tokens.money")
+ }
+
@StartableByService
class DummyServiceFlow : FlowLogic() {
companion object {
diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt
index a406bd9be6..8e36a97de9 100644
--- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt
+++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt
@@ -78,7 +78,6 @@ class NetworkMapUpdaterTest {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
-
private val cacheExpiryMs = 1000
private val privateNetUUID = UUID.randomUUID()
private val fs = Jimfs.newFileSystem(unix())
@@ -120,12 +119,13 @@ class NetworkMapUpdaterTest {
networkParameters: NetworkParameters = server.networkParameters,
autoAcceptNetworkParameters: Boolean = true,
excludedAutoAcceptNetworkParameters: Set = emptySet()) {
+
updater!!.start(DEV_ROOT_CA.certificate,
server.networkParameters.serialize().hash,
ourNodeInfo,
networkParameters,
MockKeyManagementService(makeTestIdentityService(), ourKeyPair),
- NetworkParameterAcceptanceSettings(autoAcceptNetworkParameters, excludedAutoAcceptNetworkParameters))
+ NetworkParameterAcceptanceSettings(autoAcceptNetworkParameters, excludedAutoAcceptNetworkParameters), null)
}
@Test(timeout=300_000)
diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkParametersHotloaderTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkParametersHotloaderTest.kt
new file mode 100644
index 0000000000..1fcd40cf42
--- /dev/null
+++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkParametersHotloaderTest.kt
@@ -0,0 +1,125 @@
+package net.corda.node.services.network
+
+import com.nhaarman.mockito_kotlin.any
+import com.nhaarman.mockito_kotlin.never
+import com.nhaarman.mockito_kotlin.verify
+import net.corda.core.identity.Party
+import net.corda.core.internal.NetworkParametersStorage
+import net.corda.core.node.NetworkParameters
+import net.corda.core.node.NotaryInfo
+import net.corda.core.serialization.serialize
+import net.corda.coretesting.internal.DEV_ROOT_CA
+import net.corda.node.internal.NetworkParametersReader
+import net.corda.nodeapi.internal.createDevNetworkMapCa
+import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
+import net.corda.testing.common.internal.addNotary
+import net.corda.testing.common.internal.testNetworkParameters
+import net.corda.testing.core.SerializationEnvironmentRule
+import net.corda.testing.core.TestIdentity
+import org.junit.Assert
+import org.junit.Rule
+import org.junit.Test
+import org.mockito.Mockito
+
+class NetworkParametersHotloaderTest {
+ @Rule
+ @JvmField
+ val testSerialization = SerializationEnvironmentRule(true)
+ private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa()
+ private val trustRoot = DEV_ROOT_CA.certificate
+
+ private val originalNetworkParameters = testNetworkParameters()
+ private val notary: Party = TestIdentity.fresh("test notary").party
+ private val networkParametersWithNotary = originalNetworkParameters.addNotary(notary)
+ private val networkParametersStorage = Mockito.mock(NetworkParametersStorage::class.java)
+
+ @Test(timeout = 300_000)
+ fun `can hotload if notary changes`() {
+ `can hotload`(networkParametersWithNotary)
+ }
+
+ @Test(timeout = 300_000)
+ fun `can not hotload if notary changes but another non-hotloadable property also changes`() {
+
+ val newnetParamsWithNewNotaryAndMaxMsgSize = networkParametersWithNotary.copy(maxMessageSize = networkParametersWithNotary.maxMessageSize + 1)
+ `can not hotload`(newnetParamsWithNewNotaryAndMaxMsgSize)
+ }
+
+ @Test(timeout = 300_000)
+ fun `can hotload if only always hotloadable properties change`() {
+
+ val newParametersWithAlwaysHotloadableProperties = originalNetworkParameters.copy(epoch = originalNetworkParameters.epoch + 1, modifiedTime = originalNetworkParameters.modifiedTime.plusSeconds(60))
+ `can hotload`(newParametersWithAlwaysHotloadableProperties)
+ }
+
+ @Test(timeout = 300_000)
+ fun `can not hotload if maxMessageSize changes`() {
+
+ val parametersWithNewMaxMessageSize = originalNetworkParameters.copy(maxMessageSize = originalNetworkParameters.maxMessageSize + 1)
+ `can not hotload`(parametersWithNewMaxMessageSize)
+ }
+
+ @Test(timeout = 300_000)
+ fun `can not hotload if maxTransactionSize changes`() {
+
+ val parametersWithNewMaxTransactionSize = originalNetworkParameters.copy(maxTransactionSize = originalNetworkParameters.maxMessageSize + 1)
+ `can not hotload`(parametersWithNewMaxTransactionSize)
+ }
+
+ @Test(timeout = 300_000)
+ fun `can not hotload if minimumPlatformVersion changes`() {
+
+ val parametersWithNewMinimumPlatformVersion = originalNetworkParameters.copy(minimumPlatformVersion = originalNetworkParameters.minimumPlatformVersion + 1)
+ `can not hotload`(parametersWithNewMinimumPlatformVersion)
+ }
+
+ private fun `can hotload`(newNetworkParameters: NetworkParameters) {
+ val notaryUpdateListener = Mockito.spy(object : NotaryUpdateListener {
+ override fun onNewNotaryList(notaries: List) {
+ }
+ })
+
+ val networkParametersChangedListener = Mockito.spy(object : NetworkParameterUpdateListener {
+ override fun onNewNetworkParameters(networkParameters: NetworkParameters) {
+ }
+ })
+ val networkParametersHotloader = createHotloaderWithMockedServices(newNetworkParameters).also {
+ it.addNotaryUpdateListener(notaryUpdateListener)
+ it.addNetworkParametersChangedListeners(networkParametersChangedListener)
+ }
+
+ Assert.assertTrue(networkParametersHotloader.attemptHotload(newNetworkParameters.serialize().hash))
+ verify(notaryUpdateListener).onNewNotaryList(newNetworkParameters.notaries)
+ verify(networkParametersChangedListener).onNewNetworkParameters(newNetworkParameters)
+ }
+
+ private fun `can not hotload`(newNetworkParameters: NetworkParameters) {
+ val notaryUpdateListener = Mockito.spy(object : NotaryUpdateListener {
+ override fun onNewNotaryList(notaries: List) {
+ }
+ })
+
+ val networkParametersChangedListener = Mockito.spy(object : NetworkParameterUpdateListener {
+ override fun onNewNetworkParameters(networkParameters: NetworkParameters) {
+ }
+ })
+ val networkParametersHotloader = createHotloaderWithMockedServices(newNetworkParameters).also {
+ it.addNotaryUpdateListener(notaryUpdateListener)
+ it.addNetworkParametersChangedListeners(networkParametersChangedListener)
+ }
+ Assert.assertFalse(networkParametersHotloader.attemptHotload(newNetworkParameters.serialize().hash))
+ verify(notaryUpdateListener, never()).onNewNotaryList(any());
+ verify(networkParametersChangedListener, never()).onNewNetworkParameters(any());
+ }
+
+ private fun createHotloaderWithMockedServices(newNetworkParameters: NetworkParameters): NetworkParametersHotloader {
+ val signedNetworkParameters = networkMapCertAndKeyPair.sign(newNetworkParameters)
+ val networkMapClient = Mockito.mock(NetworkMapClient::class.java)
+ Mockito.`when`(networkMapClient.getNetworkParameters(newNetworkParameters.serialize().hash)).thenReturn(signedNetworkParameters)
+ val networkParametersReader = Mockito.mock(NetworkParametersReader::class.java)
+ Mockito.`when`(networkParametersReader.read())
+ .thenReturn(NetworkParametersReader.NetworkParametersAndSigned(signedNetworkParameters, trustRoot))
+ return NetworkParametersHotloader(networkMapClient, trustRoot, originalNetworkParameters, networkParametersReader, networkParametersStorage)
+ }
+}
+
diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
index ee93d937d2..061345efe7 100644
--- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
+++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
@@ -97,7 +97,7 @@ class RetryFlowMockTest {
}
@Test(timeout=300_000)
- fun `Restart does not set senderUUID`() {
+ fun `Restart does not set senderUUID and early end session message does not hang receiving flow`() {
val messagesSent = Collections.synchronizedList(mutableListOf())
val partyB = nodeB.info.legalIdentities.first()
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {
diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/CustomCordapp.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/CustomCordapp.kt
index 9d59807e95..37590387b3 100644
--- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/CustomCordapp.kt
+++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/CustomCordapp.kt
@@ -37,11 +37,6 @@ data class CustomCordapp(
val signingInfo: SigningInfo? = null,
override val config: Map = emptyMap()
) : TestCordappInternal() {
- init {
- require(packages.isNotEmpty() || classes.isNotEmpty() || fixups.isNotEmpty()) {
- "At least one package or class must be specified"
- }
- }
override val jarFile: Path get() = getJarFile(this)
@@ -55,7 +50,7 @@ data class CustomCordapp(
@VisibleForTesting
internal fun packageAsJar(file: Path) {
val classGraph = ClassGraph()
- if (packages.isNotEmpty()) {
+ if(packages.isNotEmpty()){
classGraph.whitelistPaths(*packages.map { it.replace('.', '/') }.toTypedArray())
}
if (classes.isNotEmpty()) {
@@ -78,6 +73,10 @@ data class CustomCordapp(
}
}
+ if (scanResult.allResources.isEmpty()){
+ throw ClassNotFoundException("Could not create jar file as the given package is not found on the classpath: ${packages.toList()[0]}")
+ }
+
// The same resource may be found in different locations (this will happen when running from gradle) so just
// pick the first one found.
scanResult.allResources.asMap().forEach { path, resourceList ->
@@ -178,8 +177,8 @@ data class CustomCordapp(
val jarFile = cordappsDirectory.createDirectories() / filename
if (it.fixups.isNotEmpty()) {
it.createFixupJar(jarFile)
- } else {
- it.packageAsJar(jarFile)
+ } else if(it.packages.isNotEmpty() || it.classes.isNotEmpty() || it.fixups.isNotEmpty()) {
+ it.packageAsJar(jarFile)
}
it.signJar(jarFile)
logger.debug { "$it packaged into $jarFile" }
diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappInternal.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappInternal.kt
index 3380c37b5d..d04eb9f147 100644
--- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappInternal.kt
+++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappInternal.kt
@@ -30,14 +30,17 @@ abstract class TestCordappInternal : TestCordapp() {
// Precedence is given to node-specific CorDapps
val allCordapps = nodeSpecificCordapps + generalCordapps.filter { it.withOnlyJarContents() !in nodeSpecificCordappsWithoutMeta }
// Ignore any duplicate jar files
- val jarToCordapp = allCordapps.associateBy { it.jarFile }
+ val jarToCordapp = allCordapps.filter {
+ it !is CustomCordapp || it.packages.isNotEmpty() || it.classes.isNotEmpty() || it.fixups.isNotEmpty() }.associateBy { it.jarFile }
val cordappsDir = baseDirectory / "cordapps"
val configDir = (cordappsDir / "config").createDirectories()
jarToCordapp.forEach { jar, cordapp ->
try {
- jar.copyToDirectory(cordappsDir)
+ if (jar.toFile().exists()) {
+ jar.copyToDirectory(cordappsDir)
+ }
} catch (e: FileAlreadyExistsException) {
// Ignore if the node already has the same CorDapp jar. This can happen if the node is being restarted.
}
diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt
index 88620bb1d7..6e3f5797a2 100644
--- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt
+++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt
@@ -117,7 +117,7 @@ class NetworkMapServer(private val pollInterval: Duration,
// Mapping from the UUID of the network (null for global one) to hashes of the nodes in network
private val networkMaps = mutableMapOf>()
val latestAcceptedParametersMap = mutableMapOf()
- private val signedNetParams by lazy { networkMapCertAndKeyPair.sign(networkParameters) }
+ private val signedNetParams get() = networkMapCertAndKeyPair.sign(networkParameters)
@POST
@Path("publish")
diff --git a/testing/node-driver/src/test/kotlin/net/corda/testing/node/internal/TestResponseFlowInIsolation.kt b/testing/node-driver/src/test/kotlin/net/corda/testing/node/internal/TestResponseFlowInIsolation.kt
index ce96875ef7..9806c56e11 100644
--- a/testing/node-driver/src/test/kotlin/net/corda/testing/node/internal/TestResponseFlowInIsolation.kt
+++ b/testing/node-driver/src/test/kotlin/net/corda/testing/node/internal/TestResponseFlowInIsolation.kt
@@ -22,7 +22,7 @@ import org.junit.Test
*/
class TestResponseFlowInIsolation {
- private val network: MockNetwork = MockNetwork(MockNetworkParameters(cordappsForAllNodes = cordappsForPackages("com.template")))
+ private val network: MockNetwork = MockNetwork(MockNetworkParameters(cordappsForAllNodes = cordappsForPackages()))
private val a = network.createNode()
private val b = network.createNode()
diff --git a/testing/node-driver/src/test/kotlin/net/corda/testing/node/internal/TestResponseFlowInIsolationInJava.java b/testing/node-driver/src/test/kotlin/net/corda/testing/node/internal/TestResponseFlowInIsolationInJava.java
index 8aead69f18..14947872ec 100644
--- a/testing/node-driver/src/test/kotlin/net/corda/testing/node/internal/TestResponseFlowInIsolationInJava.java
+++ b/testing/node-driver/src/test/kotlin/net/corda/testing/node/internal/TestResponseFlowInIsolationInJava.java
@@ -28,7 +28,7 @@ import static org.hamcrest.Matchers.instanceOf;
*/
public class TestResponseFlowInIsolationInJava {
- private final MockNetwork network = new MockNetwork(new MockNetworkParameters().withCordappsForAllNodes(cordappsForPackages("com.template")));
+ private final MockNetwork network = new MockNetwork(new MockNetworkParameters().withCordappsForAllNodes(cordappsForPackages()));
private final StartedMockNode a = network.createNode();
private final StartedMockNode b = network.createNode();
diff --git a/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/HashLookupCommandTest.kt b/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/HashLookupCommandTest.kt
deleted file mode 100644
index 2376081a4a..0000000000
--- a/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/HashLookupCommandTest.kt
+++ /dev/null
@@ -1,96 +0,0 @@
-package net.corda.tools.shell
-
-import co.paralleluniverse.fibers.Suspendable
-import com.google.common.io.Files
-import com.jcraft.jsch.ChannelExec
-import com.jcraft.jsch.JSch
-import com.jcraft.jsch.Session
-import net.corda.core.crypto.SecureHash
-import net.corda.core.crypto.sha256
-import net.corda.core.flows.FlowLogic
-import net.corda.core.flows.StartableByRPC
-import net.corda.core.identity.Party
-import net.corda.core.messaging.startFlow
-import net.corda.core.utilities.getOrThrow
-import net.corda.node.services.Permissions
-import net.corda.testing.contracts.DummyContract
-import net.corda.testing.core.ALICE_NAME
-import net.corda.testing.driver.DriverParameters
-import net.corda.testing.driver.NodeHandle
-import net.corda.testing.driver.driver
-import net.corda.testing.node.User
-import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
-import org.bouncycastle.util.io.Streams
-import org.junit.Test
-import kotlin.test.assertTrue
-
-class HashLookupCommandTest {
-
- @Test(timeout=300_000)
- fun `hash lookup command returns correct response`() {
- val user = User("u", "p", setOf(Permissions.all()))
-
- driver(DriverParameters(notarySpecs = emptyList(), cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP))) {
- val nodeFuture = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), startInSameProcess = true)
- val node = nodeFuture.getOrThrow()
- val txId = issueTransaction(node)
- val txIdHashed = txId.sha256()
-
- testCommand(user, node, command = "hashLookup $txId", expected = "Found a matching transaction with Id: $txId")
- testCommand(user, node, command = "hashLookup $txIdHashed", expected = "Found a matching transaction with Id: $txId")
- testCommand(user, node, command = "hashLookup ${SecureHash.randomSHA256()}", expected = "No matching transaction found")
- }
- }
-
- private fun testCommand(user: User, node: NodeHandle, command: String, expected: String) {
- val session = connectToShell(user, node)
- val channel = session.openChannel("exec") as ChannelExec
- channel.setCommand(command)
- channel.connect(5000)
-
- assertTrue(channel.isConnected)
-
- val response = String(Streams.readAll(channel.inputStream))
- val matchFound = response.lines().any { line ->
- line.contains(expected)
- }
- channel.disconnect()
- assertTrue(matchFound)
- session.disconnect()
- }
-
- private fun connectToShell(user: User, node: NodeHandle): Session {
- val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(),
- user = user.username, password = user.password,
- hostAndPort = node.rpcAddress,
- sshdPort = 2224)
-
- InteractiveShell.startShell(conf)
- InteractiveShell.nodeInfo()
-
- val session = JSch().getSession("u", "localhost", 2224)
- session.setConfig("StrictHostKeyChecking", "no")
- session.setPassword("p")
- session.connect()
-
- assertTrue(session.isConnected)
- return session
- }
-
- private fun issueTransaction(node: NodeHandle): SecureHash {
- return node.rpc.startFlow(::DummyIssue).returnValue.get()
- }
-}
-
-@StartableByRPC
-internal class DummyIssue : FlowLogic() {
- @Suspendable
- override fun call(): SecureHash {
- val me = serviceHub.myInfo.legalIdentities.first().ref(0)
- val fakeNotary = me.party
- val builder = DummyContract.generateInitial(1, fakeNotary as Party, me)
- val stx = serviceHub.signInitialTransaction(builder)
- serviceHub.recordTransactions(stx)
- return stx.id
- }
-}
diff --git a/tools/shell/src/main/java/net/corda/tools/shell/HashLookupShellCommand.java b/tools/shell/src/main/java/net/corda/tools/shell/HashLookupShellCommand.java
index 3fcd803d8e..7d09802088 100644
--- a/tools/shell/src/main/java/net/corda/tools/shell/HashLookupShellCommand.java
+++ b/tools/shell/src/main/java/net/corda/tools/shell/HashLookupShellCommand.java
@@ -1,6 +1,7 @@
package net.corda.tools.shell;
import net.corda.core.crypto.SecureHash;
+import net.corda.core.internal.VisibleForTesting;
import net.corda.core.messaging.CordaRPCOps;
import net.corda.core.messaging.StateMachineTransactionMapping;
import org.crsh.cli.Argument;
@@ -13,13 +14,14 @@ import org.crsh.text.Decoration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
@Named("hashLookup")
public class HashLookupShellCommand extends InteractiveShellCommand {
private static Logger logger = LoggerFactory.getLogger(HashLookupShellCommand.class);
- final private String manualText ="Checks if a transaction matching a specified Id hash value is recorded on this node.\n\n" +
+ private static final String manualText ="Checks if a transaction matching a specified Id hash value is recorded on this node.\n\n" +
"Both the transaction Id and the hashed value of a transaction Id (as returned by the Notary in case of a double-spend) is a valid input.\n" +
"This is mainly intended to be used for troubleshooting notarisation issues when a\n" +
"state is claimed to be already consumed by another transaction.\n\n" +
@@ -29,25 +31,32 @@ public class HashLookupShellCommand extends InteractiveShellCommand {
@Man(manualText)
public void main(@Usage("A transaction Id or a hexadecimal SHA-256 hash value representing the hashed transaction Id") @Argument(unquote = false) String txIdHash) {
+ CordaRPCOps proxy = ops();
+ try {
+ hashLookup(out, proxy, txIdHash);
+ } catch (IllegalArgumentException ex) {
+ out.println(manualText);
+ out.println(ex.getMessage(), Decoration.bold, Color.red);
+ }
+ }
+
+ @VisibleForTesting
+ protected static void hashLookup(PrintWriter out, CordaRPCOps proxy, String txIdHash) throws IllegalArgumentException {
logger.info("Executing command \"hashLookup\".");
if (txIdHash == null) {
out.println(manualText);
- out.println("Please provide a hexadecimal transaction Id hash value or a transaction Id", Decoration.bold, Color.red);
- return;
+ throw new IllegalArgumentException("Please provide a hexadecimal transaction Id hash value or a transaction Id");
}
- CordaRPCOps proxy = ops();
- List mapping = proxy.stateMachineRecordedTransactionMappingSnapshot();
-
SecureHash txIdHashParsed;
try {
txIdHashParsed = SecureHash.parse(txIdHash);
} catch (IllegalArgumentException e) {
- out.println("The provided string is not a valid hexadecimal SHA-256 hash value", Decoration.bold, Color.red);
- return;
+ throw new IllegalArgumentException("The provided string is not a valid hexadecimal SHA-256 hash value");
}
+ List mapping = proxy.stateMachineRecordedTransactionMappingSnapshot();
Optional match = mapping.stream()
.map(StateMachineTransactionMapping::getTransactionId)
.filter(
@@ -59,7 +68,7 @@ public class HashLookupShellCommand extends InteractiveShellCommand {
SecureHash found = match.get();
out.println("Found a matching transaction with Id: " + found.toString());
} else {
- out.println("No matching transaction found", Decoration.bold, Color.red);
+ throw new IllegalArgumentException("No matching transaction found");
}
}
}
diff --git a/tools/shell/src/test/kotlin/net/corda/tools/shell/HashLookupCommandTest.kt b/tools/shell/src/test/kotlin/net/corda/tools/shell/HashLookupCommandTest.kt
new file mode 100644
index 0000000000..15b4e951d8
--- /dev/null
+++ b/tools/shell/src/test/kotlin/net/corda/tools/shell/HashLookupCommandTest.kt
@@ -0,0 +1,67 @@
+package net.corda.tools.shell
+
+import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.sha256
+import net.corda.core.flows.StateMachineRunId
+import net.corda.core.messaging.CordaRPCOps
+import net.corda.core.messaging.StateMachineTransactionMapping
+import org.hamcrest.MatcherAssert
+import org.hamcrest.core.StringContains
+import org.junit.Test
+import org.mockito.Mockito
+import java.io.CharArrayWriter
+import java.io.PrintWriter
+import java.util.UUID
+import kotlin.test.assertFailsWith
+
+class HashLookupCommandTest {
+ companion object {
+ private val DEFAULT_TXID: SecureHash = SecureHash.randomSHA256()
+
+ private fun ops(vararg txIds: SecureHash): CordaRPCOps? {
+ val snapshot: List = txIds.map { txId ->
+ StateMachineTransactionMapping(StateMachineRunId(UUID.randomUUID()), txId)
+ }
+ return Mockito.mock(CordaRPCOps::class.java).apply {
+ Mockito.`when`(stateMachineRecordedTransactionMappingSnapshot()).thenReturn(snapshot)
+ }
+ }
+
+ private fun runCommand(ops: CordaRPCOps?, txIdHash: String): String {
+ val arrayWriter = CharArrayWriter()
+ return PrintWriter(arrayWriter).use {
+ HashLookupShellCommand.hashLookup(it, ops, txIdHash)
+ it.flush()
+ arrayWriter.toString()
+ }
+ }
+ }
+
+ @Test(timeout=300_000)
+ fun `hash lookup command returns correct response`() {
+ val ops = ops(DEFAULT_TXID)
+ var response = runCommand(ops, DEFAULT_TXID.toString())
+
+ MatcherAssert.assertThat(response, StringContains.containsString("Found a matching transaction with Id: $DEFAULT_TXID"))
+
+ // Verify the hash of the TX ID also works
+ response = runCommand(ops, DEFAULT_TXID.sha256().toString())
+ MatcherAssert.assertThat(response, StringContains.containsString("Found a matching transaction with Id: $DEFAULT_TXID"))
+ }
+
+ @Test(timeout=300_000)
+ fun `should reject invalid txid`() {
+ val ops = ops(DEFAULT_TXID)
+ assertFailsWith("The provided string is not a valid hexadecimal SHA-256 hash value") {
+ runCommand(ops, "abcdefgh")
+ }
+ }
+
+ @Test(timeout=300_000)
+ fun `should reject unknown txid`() {
+ val ops = ops(DEFAULT_TXID)
+ assertFailsWith("No matching transaction found") {
+ runCommand(ops, SecureHash.randomSHA256().toString())
+ }
+ }
+}
\ No newline at end of file