CORDA-3871: New integration test for handshake timeout in AMQPClient ()

* CORDA-3871: Import external code

Compiles, but does not work for various reasons

* CORDA-3871: More improvements to imported code

Currently fails due to keystores not being found

* CORDA-3871: Initialise keystores for the server

Currently fails due to keystores for client not being found

* CORDA-3871: Configure certificates to client

The program started to run

* CORDA-3871: Improve debug output

* CORDA-3871: Few more minor changes

* CORDA-3871: Add AMQClient test

Currently fails due to `localCert` not being set

* CORDA-3871: Configure server to demand client to present its certificate

* CORDA-3871: Changes to the test to make it pass

ACK status is not delivered as server is not talking AMQP

* CORDA-3871: Add delayed handshake scenario

* CORDA-3871: Tidy-up imported classes

* CORDA-3871: Hide thread creation inside `ServerThread`

* CORDA-3871: Test description

* CORDA-3871: Detekt baseline update

* CORDA-3871: Trigger repeated execution of new tests

To make sure they are not flaky

* CORDA-3871: Improve robustness of the newly introduced tests

* CORDA-3871: Improve robustness of the newly introduced tests

* CORDA-3871: New tests proven to be stable - reduce number of iterations to 1

* CORDA-3871: Adding Alex Karnezis to the list of contributors
This commit is contained in:
Viktor Kolomeyko 2020-07-31 09:26:32 +01:00 committed by GitHub
parent 68feb1c35f
commit c498c5bf7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1098 additions and 79 deletions
CONTRIBUTORS.mddetekt-baseline.xml
node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty
node/src/integration-test

View File

@ -13,6 +13,7 @@ see changes to this list.
* agoldvarg
* Ajitha Thayaharan (BCS Technology International)
* Alberto Arri (R3)
* Alex Karnezis
* amiracam
* Amol Pednekar
* Andras Slemmer (R3)

View File

@ -9,23 +9,11 @@
<ID>ClassNaming:BuyerFlow.kt$BuyerFlow$STARTING_BUY : Step</ID>
<ID>ClassNaming:CompositeMemberCompositeSchemaToClassCarpenterTests.kt$I_</ID>
<ID>ClassNaming:CordaServiceTest.kt$CordaServiceTest.DummyServiceFlow.Companion$TEST_STEP : Step</ID>
<ID>ClassNaming:CustomVaultQuery.kt$TopupIssuerFlow.TopupIssuer.Companion$AWAITING_REQUEST : Step</ID>
<ID>ClassNaming:CustomVaultQuery.kt$TopupIssuerFlow.TopupIssuer.Companion$SENDING_TOP_UP_ISSUE_REQUEST : Step</ID>
<ID>ClassNaming:DeserializeNeedingCarpentryTests.kt$DeserializeNeedingCarpentryTests$outer</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$FINALISING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$GENERATING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$SIGNING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$VERIFYING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$EXTRACTING_VAULT_STATES : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$ID_OTHER_NODES : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$OTHER_TX_COMPONENTS : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$SENDING_AND_RECEIVING_DATA : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$SIGS_GATHERING : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_BUILDING : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_SIGNING : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_VERIFICATION : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$VERIFYING_SIGS : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$ResponderFlow.Companion$RECEIVING_AND_SENDING_DATA : Step</ID>
<ID>ClassNaming:FlowFrameworkTests.kt$ExceptionFlow$START_STEP : Step</ID>
<ID>ClassNaming:FlowFrameworkTests.kt$InitiatedReceiveFlow$RECEIVED_STEP : Step</ID>
<ID>ClassNaming:FlowFrameworkTests.kt$InitiatedReceiveFlow$START_STEP : Step</ID>
@ -178,7 +166,6 @@
<ID>ComplexMethod:RPCServer.kt$RPCServer$private fun clientArtemisMessageHandler(artemisMessage: ClientMessage)</ID>
<ID>ComplexMethod:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ReconnectingRPCConnection$ private tailrec fun establishConnectionWithRetry( retryInterval: Duration, roundRobinIndex: Int = 0, retries: Int = -1 ): CordaRPCConnection?</ID>
<ID>ComplexMethod:RemoteTypeCarpenter.kt$SchemaBuildingRemoteTypeCarpenter$override fun carpent(typeInformation: RemoteTypeInformation): Type</ID>
<ID>ComplexMethod:RpcReconnectTests.kt$RpcReconnectTests$ @Test(timeout=300_000) fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`()</ID>
<ID>ComplexMethod:SchemaMigration.kt$SchemaMigration$ private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean): Boolean</ID>
<ID>ComplexMethod:SchemaMigration.kt$SchemaMigration$private fun doRunMigration( run: Boolean, check: Boolean, existingCheckpoints: Boolean? = null )</ID>
<ID>ComplexMethod:SendTransactionFlow.kt$DataVendingFlow$@Suspendable override fun call(): Void?</ID>
@ -310,7 +297,6 @@
<ID>ForbiddenComment:CordappProviderImplTests.kt$CordappProviderImplTests.Companion$// TODO: Cordapp name should differ from the JAR name</ID>
<ID>ForbiddenComment:CoreFlowHandlers.kt$NotaryChangeHandler$// TODO: Right now all nodes will automatically approve the notary change. We need to figure out if stricter controls are necessary.</ID>
<ID>ForbiddenComment:CrossCashTest.kt$CrossCashState$// TODO: Alternative: We may possibly reduce the complexity of the search even further using some form of</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: Check if non-ECC keys satisfy params (i.e. approved/valid RSA modulus size).</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: We currently use SHA256(seed) when retrying, but BIP32 just skips a counter (i) that results to an invalid key.</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: change the val name to a more descriptive one as it's now confusing and looks like a Key type.</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: change val name to SPHINCS256_SHA512. This will break backwards compatibility.</ID>
@ -435,7 +421,6 @@
<ID>ForbiddenComment:RatesFixFlow.kt$RatesFixFlow.FixQueryFlow$// TODO: add deadline to receive</ID>
<ID>ForbiddenComment:ResolveTransactionsFlow.kt$ResolveTransactionsFlow$// TODO: This could be done in parallel with other fetches for extra speed.</ID>
<ID>ForbiddenComment:ResolveTransactionsFlowTest.kt$ResolveTransactionsFlowTest$// TODO: this operation should not require an explicit transaction</ID>
<ID>ForbiddenComment:RestrictedEntityManager.kt$RestrictedEntityManager$// TODO: Figure out which other methods on EntityManager need to be blocked?</ID>
<ID>ForbiddenComment:ScheduledActivityObserver.kt$ScheduledActivityObserver.Companion$// TODO: Beware we are calling dynamically loaded contract code inside here.</ID>
<ID>ForbiddenComment:ScheduledFlowIntegrationTests.kt$ScheduledFlowIntegrationTests$// TODO: the queries below are not atomic so we need to allow enough time for the scheduler to finish. Would be better to query scheduler.</ID>
<ID>ForbiddenComment:SendTransactionFlow.kt$DataVendingFlow$// Security TODO: Check for abnormally large or malformed data requests</ID>
@ -622,7 +607,6 @@
<ID>FunctionNaming:VersionExtractorTest.kt$VersionExtractorTest$@Test(timeout=300_000) fun version_header_extraction_present()</ID>
<ID>LargeClass:AbstractNode.kt$AbstractNode&lt;S&gt; : SingletonSerializeAsToken</ID>
<ID>LargeClass:SingleThreadedStateMachineManager.kt$SingleThreadedStateMachineManager : StateMachineManagerStateMachineManagerInternal</ID>
<ID>LongMethod:FlowCookbook.kt$InitiatorFlow$@Suppress("RemoveExplicitTypeArguments") @Suspendable override fun call()</ID>
<ID>LongMethod:HibernateQueryCriteriaParser.kt$HibernateQueryCriteriaParser$override fun parseCriteria(criteria: CommonQueryCriteria): Collection&lt;Predicate&gt;</ID>
<ID>LongParameterList:AMQPSerializer.kt$AMQPSerializer$(obj: Any, data: Data, type: Type, output: SerializationOutput, context: SerializationContext, debugIndent: Int = 0)</ID>
<ID>LongParameterList:AbstractCashSelection.kt$AbstractCashSelection$(connection: Connection, amount: Amount&lt;Currency&gt;, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set&lt;AbstractParty&gt;, withIssuerRefs: Set&lt;OpaqueBytes&gt;, withResultSet: (ResultSet) -&gt; Boolean)</ID>
@ -634,7 +618,6 @@
<ID>LongParameterList:ArtemisRpcBroker.kt$ArtemisRpcBroker.Companion$(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, securityManager: RPCSecurityManager, maxMessageSize: Int, journalBufferTimeout: Int?, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean)</ID>
<ID>LongParameterList:ArtemisRpcBroker.kt$ArtemisRpcBroker.Companion$(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, sslOptions: BrokerRpcSslOptions, securityManager: RPCSecurityManager, maxMessageSize: Int, journalBufferTimeout: Int?, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean)</ID>
<ID>LongParameterList:ArtemisRpcTests.kt$ArtemisRpcTests$(nodeSSlconfig: MutualSslConfiguration, brokerSslOptions: BrokerRpcSslOptions?, useSslForBroker: Boolean, clientSslOptions: ClientRpcSslOptions?, address: NetworkHostAndPort = ports.nextHostAndPort(), adminAddress: NetworkHostAndPort = ports.nextHostAndPort(), baseDirectory: Path = tempFolder.root.toPath() )</ID>
<ID>LongParameterList:AttachmentsClassLoader.kt$AttachmentsClassLoaderBuilder$(attachments: List&lt;Attachment&gt;, params: NetworkParameters, txId: SecureHash, isAttachmentTrusted: (Attachment) -&gt; Boolean, parent: ClassLoader = ClassLoader.getSystemClassLoader(), block: (ClassLoader) -&gt; T)</ID>
<ID>LongParameterList:BFTSmart.kt$BFTSmart.Replica$( states: List&lt;StateRef&gt;, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List&lt;StateRef&gt; = emptyList() )</ID>
<ID>LongParameterList:BusinessCalendar.kt$BusinessCalendar.Companion$(startDate: LocalDate, period: Frequency, calendar: BusinessCalendar = EMPTY, dateRollConvention: DateRollConvention = DateRollConvention.Following, noOfAdditionalPeriods: Int = Integer.MAX_VALUE, endDate: LocalDate? = null, periodOffset: Int? = null)</ID>
<ID>LongParameterList:Cash.kt$Cash$(inputs: List&lt;State&gt;, outputs: List&lt;State&gt;, tx: LedgerTransaction, issueCommand: CommandWithParties&lt;Commands.Issue&gt;, currency: Currency, issuer: PartyAndReference)</ID>
@ -732,7 +715,6 @@
<ID>LongParameterList:X509Utilities.kt$X509Utilities$(certificateType: CertificateType, issuer: X500Principal, issuerPublicKey: PublicKey, issuerSigner: ContentSigner, subject: X500Principal, subjectPublicKey: PublicKey, validityWindow: Pair&lt;Date, Date&gt;, nameConstraints: NameConstraints? = null, crlDistPoint: String? = null, crlIssuer: X500Name? = null)</ID>
<ID>LongParameterList:X509Utilities.kt$X509Utilities$(certificateType: CertificateType, issuer: X500Principal, issuerPublicKey: PublicKey, subject: X500Principal, subjectPublicKey: PublicKey, validityWindow: Pair&lt;Date, Date&gt;, nameConstraints: NameConstraints? = null, crlDistPoint: String? = null, crlIssuer: X500Name? = null)</ID>
<ID>LongParameterList:X509Utilities.kt$X509Utilities$(certificateType: CertificateType, issuerCertificate: X509Certificate, issuerKeyPair: KeyPair, subject: X500Principal, subjectPublicKey: PublicKey, validityWindow: Pair&lt;Duration, Duration&gt; = DEFAULT_VALIDITY_WINDOW, nameConstraints: NameConstraints? = null, crlDistPoint: String? = null, crlIssuer: X500Name? = null)</ID>
<ID>LongParameterList:internalAccessTestHelpers.kt$( inputs: List&lt;StateAndRef&lt;ContractState&gt;&gt;, outputs: List&lt;TransactionState&lt;ContractState&gt;&gt;, commands: List&lt;CommandWithParties&lt;CommandData&gt;&gt;, attachments: List&lt;Attachment&gt;, id: SecureHash, notary: Party?, timeWindow: TimeWindow?, privacySalt: PrivacySalt, networkParameters: NetworkParameters, references: List&lt;StateAndRef&lt;ContractState&gt;&gt;, componentGroups: List&lt;ComponentGroup&gt;? = null, serializedInputs: List&lt;SerializedStateAndRef&gt;? = null, serializedReferences: List&lt;SerializedStateAndRef&gt;? = null, isAttachmentTrusted: (Attachment) -&gt; Boolean )</ID>
<ID>MagicNumber:AMQPClientSerializationScheme.kt$AMQPClientSerializationScheme.Companion$128</ID>
<ID>MagicNumber:AMQPSerializationScheme.kt$AbstractAMQPSerializationScheme$128</ID>
<ID>MagicNumber:AMQPServer.kt$AMQPServer$100</ID>
@ -753,7 +735,6 @@
<ID>MagicNumber:AttachmentDemo.kt$10009</ID>
<ID>MagicNumber:AttachmentDemo.kt$10010</ID>
<ID>MagicNumber:AttachmentTrustTable.kt$AttachmentTrustTable$3</ID>
<ID>MagicNumber:AttachmentsClassLoader.kt$AttachmentsClassLoader$4</ID>
<ID>MagicNumber:AzureSmbVolume.kt$AzureSmbVolume$5000</ID>
<ID>MagicNumber:BFTSmart.kt$BFTSmart.Client$100</ID>
<ID>MagicNumber:BFTSmart.kt$BFTSmart.Replica.&lt;no name provided&gt;$20000</ID>
@ -775,12 +756,6 @@
<ID>MagicNumber:CashViewer.kt$CashViewer.StateRowGraphic$16</ID>
<ID>MagicNumber:CashViewer.kt$CashViewer.StateRowGraphic$30.0</ID>
<ID>MagicNumber:ClassCarpenter.kt$ClassCarpenterImpl$3</ID>
<ID>MagicNumber:ClientRpcExample.kt$ClientRpcExample$3</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$0.7</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$0.8</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$1000</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$10000</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$2000</ID>
<ID>MagicNumber:CommercialPaperIssueFlow.kt$CommercialPaperIssueFlow$10</ID>
<ID>MagicNumber:CommercialPaperIssueFlow.kt$CommercialPaperIssueFlow$30</ID>
<ID>MagicNumber:CompositeSignature.kt$CompositeSignature$1024</ID>
@ -846,11 +821,6 @@
<ID>MagicNumber:ExchangeRateModel.kt$1.18</ID>
<ID>MagicNumber:ExchangeRateModel.kt$1.31</ID>
<ID>MagicNumber:FixingFlow.kt$FixingFlow.Fixer.&lt;no name provided&gt;$30</ID>
<ID>MagicNumber:FlowCookbook.kt$InitiatorFlow$30</ID>
<ID>MagicNumber:FlowCookbook.kt$InitiatorFlow$45</ID>
<ID>MagicNumber:FlowCookbook.kt$InitiatorFlow$777</ID>
<ID>MagicNumber:FlowCookbook.kt$ResponderFlow$99</ID>
<ID>MagicNumber:FlowCookbook.kt$ResponderFlow.&lt;no name provided&gt;$777</ID>
<ID>MagicNumber:FlowLogic.kt$FlowLogic$300</ID>
<ID>MagicNumber:FlowLogic.kt$FlowLogic.Companion$5</ID>
<ID>MagicNumber:FlowMonitor.kt$FlowMonitor$1000</ID>
@ -864,7 +834,6 @@
<ID>MagicNumber:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$10</ID>
<ID>MagicNumber:HttpUtils.kt$HttpUtils$5</ID>
<ID>MagicNumber:HttpUtils.kt$HttpUtils$60</ID>
<ID>MagicNumber:IOUFlowResponder.kt$IOUFlowResponder.&lt;no name provided&gt;$100</ID>
<ID>MagicNumber:IRS.kt$RatePaymentEvent$360.0</ID>
<ID>MagicNumber:IRS.kt$RatePaymentEvent$4</ID>
<ID>MagicNumber:IRS.kt$RatePaymentEvent$8</ID>
@ -1026,7 +995,6 @@
<ID>MagicNumber:NodeWebServer.kt$NodeWebServer$100</ID>
<ID>MagicNumber:NodeWebServer.kt$NodeWebServer$32768</ID>
<ID>MagicNumber:NodeWebServer.kt$NodeWebServer$40</ID>
<ID>MagicNumber:NonValidatingNotaryFlow.kt$NonValidatingNotaryFlow$4</ID>
<ID>MagicNumber:Notarise.kt$10</ID>
<ID>MagicNumber:Notarise.kt$10003</ID>
<ID>MagicNumber:NullKeys.kt$NullKeys$32</ID>
@ -1143,7 +1111,6 @@
<ID>MagicNumber:StandaloneShell.kt$StandaloneShell$7</ID>
<ID>MagicNumber:StateRevisionFlow.kt$StateRevisionFlow.Requester$30</ID>
<ID>MagicNumber:Structures.kt$PrivacySalt$32</ID>
<ID>MagicNumber:TargetVersionDependentRules.kt$StateContractValidationEnforcementRule$4</ID>
<ID>MagicNumber:TestNodeInfoBuilder.kt$TestNodeInfoBuilder$1234</ID>
<ID>MagicNumber:TestUtils.kt$10000</ID>
<ID>MagicNumber:TestUtils.kt$30000</ID>
@ -1154,7 +1121,6 @@
<ID>MagicNumber:TraderDemoClientApi.kt$TraderDemoClientApi$3</ID>
<ID>MagicNumber:TransactionBuilder.kt$TransactionBuilder$4</ID>
<ID>MagicNumber:TransactionDSLInterpreter.kt$TransactionDSL$30</ID>
<ID>MagicNumber:TransactionUtils.kt$4</ID>
<ID>MagicNumber:TransactionVerificationException.kt$TransactionVerificationException.ConstraintPropagationRejection$3</ID>
<ID>MagicNumber:TransactionViewer.kt$TransactionViewer$15.0</ID>
<ID>MagicNumber:TransactionViewer.kt$TransactionViewer$20.0</ID>
@ -1179,8 +1145,6 @@
<ID>MagicNumber:WebServer.kt$100.0</ID>
<ID>MagicNumber:WebServer.kt$WebServer$500</ID>
<ID>MagicNumber:WireTransaction.kt$WireTransaction$4</ID>
<ID>MagicNumber:WorkflowTransactionBuildTutorial.kt$SubmitCompletionFlow$60</ID>
<ID>MagicNumber:WorkflowTransactionBuildTutorial.kt$SubmitTradeApprovalFlow$60</ID>
<ID>MagicNumber:X509Utilities.kt$X509Utilities$3650</ID>
<ID>MagicNumber:errorAndTerminate.kt$10</ID>
<ID>MatchingDeclarationName:AMQPSerializerFactories.kt$net.corda.serialization.internal.amqp.AMQPSerializerFactories.kt</ID>
@ -1225,7 +1189,6 @@
<ID>MatchingDeclarationName:TestConstants.kt$net.corda.testing.core.TestConstants.kt</ID>
<ID>MatchingDeclarationName:TestUtils.kt$net.corda.testing.core.TestUtils.kt</ID>
<ID>MatchingDeclarationName:TransactionTypes.kt$net.corda.explorer.model.TransactionTypes.kt</ID>
<ID>MatchingDeclarationName:TutorialFlowStateMachines.kt$net.corda.docs.kotlin.tutorial.flowstatemachines.TutorialFlowStateMachines.kt</ID>
<ID>MatchingDeclarationName:Utils.kt$io.cryptoblk.core.Utils.kt</ID>
<ID>MatchingDeclarationName:VirtualCordapps.kt$net.corda.node.internal.cordapp.VirtualCordapps.kt</ID>
<ID>ModifierOrder:NodeNamedCache.kt$DefaultNamedCacheFactory$open protected</ID>
@ -1317,13 +1280,8 @@
<ID>SpreadOperator:FlowFrameworkTripartyTests.kt$FlowFrameworkTripartyTests$(*expected)</ID>
<ID>SpreadOperator:FlowLogicRefFactoryImpl.kt$FlowLogicRefFactoryImpl$(flowClass, *args)</ID>
<ID>SpreadOperator:FlowOverrideTests.kt$FlowOverrideTests$(*nodeAClasses.toTypedArray())</ID>
<ID>SpreadOperator:FlowOverrideTests.kt$FlowOverrideTests$(*nodeBClasses.toTypedArray())</ID>
<ID>SpreadOperator:FlowTestsUtils.kt$(*allSessions)</ID>
<ID>SpreadOperator:FlowTestsUtils.kt$(session, *sessions)</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*ourInputStates.toTypedArray())</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*ourOutputState.map { StateAndContract(it, Cash.PROGRAM_ID) }.toTypedArray())</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*theirInputStates.toTypedArray())</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*theirOutputState.map { StateAndContract(it, Cash.PROGRAM_ID) }.toTypedArray())</ID>
<ID>SpreadOperator:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$(OpaqueBytes(request.encoded), "Platform-Version" to "${versionInfo.platformVersion}", "Client-Version" to versionInfo.releaseVersion, "Private-Network-Map" to (config.pnm?.toString() ?: ""), *(config.csrToken?.let { arrayOf(CENM_SUBMISSION_TOKEN to it) } ?: arrayOf()))</ID>
<ID>SpreadOperator:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$(*leftPredicates.toTypedArray())</ID>
<ID>SpreadOperator:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$(*leftPredicates.toTypedArray(), *rightPredicates.toTypedArray())</ID>
@ -1453,7 +1411,6 @@
<ID>ThrowsCount:StructuresTests.kt$AttachmentTest$@Test(timeout=300_000) fun `openAsJAR does not leak file handle if attachment has corrupted manifest`()</ID>
<ID>ThrowsCount:TransactionVerifierServiceInternal.kt$Verifier$ private fun getUniqueContractAttachmentsByContract(): Map&lt;ContractClassName, ContractAttachment&gt;</ID>
<ID>ThrowsCount:TransactionVerifierServiceInternal.kt$Verifier$// Using basic graph theory, a full cycle of encumbered (co-dependent) states should exist to achieve bi-directional // encumbrances. This property is important to ensure that no states involved in an encumbrance-relationship // can be spent on their own. Briefly, if any of the states is having more than one encumbrance references by // other states, a full cycle detection will fail. As a result, all of the encumbered states must be present // as "from" and "to" only once (or zero times if no encumbrance takes place). For instance, // a -&gt; b // c -&gt; b and a -&gt; b // b -&gt; a b -&gt; c // do not satisfy the bi-directionality (full cycle) property. // // In the first example "b" appears twice in encumbrance ("to") list and "c" exists in the encumbered ("from") list only. // Due the above, one could consume "a" and "b" in the same transaction and then, because "b" is already consumed, "c" cannot be spent. // // Similarly, the second example does not form a full cycle because "a" and "c" exist in one of the lists only. // As a result, one can consume "b" and "c" in the same transactions, which will make "a" impossible to be spent. // // On other hand the following are valid constructions: // a -&gt; b a -&gt; c // b -&gt; c and c -&gt; b // c -&gt; a b -&gt; a // and form a full cycle, meaning that the bi-directionality property is satisfied. private fun checkBidirectionalOutputEncumbrances(statesAndEncumbrance: List&lt;Pair&lt;Int, Int&gt;&gt;)</ID>
<ID>ThrowsCount:WireTransaction.kt$WireTransaction$private fun toLedgerTransactionInternal( resolveIdentity: (PublicKey) -&gt; Party?, resolveAttachment: (SecureHash) -&gt; Attachment?, resolveStateRefAsSerialized: (StateRef) -&gt; SerializedBytes&lt;TransactionState&lt;ContractState&gt;&gt;?, resolveParameters: (SecureHash?) -&gt; NetworkParameters?, isAttachmentTrusted: (Attachment) -&gt; Boolean ): LedgerTransaction</ID>
<ID>ThrowsCount:WireTransaction.kt$WireTransaction.Companion$ @CordaInternal fun resolveStateRefBinaryComponent(stateRef: StateRef, services: ServicesForResolution): SerializedBytes&lt;TransactionState&lt;ContractState&gt;&gt;?</ID>
<ID>TooGenericExceptionCaught:AMQPChannelHandler.kt$AMQPChannelHandler$ex: Exception</ID>
<ID>TooGenericExceptionCaught:AMQPExceptions.kt$th: Throwable</ID>
@ -1502,7 +1459,6 @@
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$DriverDSLImpl.Companion$th: Throwable</ID>
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$exception: Throwable</ID>
<ID>TooGenericExceptionCaught:DriverTests.kt$DriverTests$e: Exception</ID>
<ID>TooGenericExceptionCaught:ErrorCodeLoggingTests.kt$e: Exception</ID>
<ID>TooGenericExceptionCaught:ErrorHandling.kt$ErrorHandling.CheckpointAfterErrorFlow$t: Throwable</ID>
<ID>TooGenericExceptionCaught:EventProcessor.kt$EventProcessor$ex: Exception</ID>
<ID>TooGenericExceptionCaught:Eventually.kt$e: Exception</ID>
@ -1574,7 +1530,6 @@
<ID>TooGenericExceptionCaught:NotaryUtils.kt$e: Exception</ID>
<ID>TooGenericExceptionCaught:ObjectDiffer.kt$ObjectDiffer$throwable: Exception</ID>
<ID>TooGenericExceptionCaught:P2PMessagingClient.kt$P2PMessagingClient$e: Exception</ID>
<ID>TooGenericExceptionCaught:PersistentIdentityMigrationNewTableTest.kt$PersistentIdentityMigrationNewTableTest$e: Exception</ID>
<ID>TooGenericExceptionCaught:PersistentUniquenessProvider.kt$PersistentUniquenessProvider$e: Exception</ID>
<ID>TooGenericExceptionCaught:ProfileController.kt$ProfileController$e: Exception</ID>
<ID>TooGenericExceptionCaught:PropertyValidationTest.kt$PropertyValidationTest$e: Exception</ID>
@ -1715,6 +1670,7 @@
<ID>TooManyFunctions:RPCApi.kt$net.corda.nodeapi.RPCApi.kt</ID>
<ID>TooManyFunctions:RPCClientProxyHandler.kt$RPCClientProxyHandler : InvocationHandler</ID>
<ID>TooManyFunctions:RPCServer.kt$RPCServer</ID>
<ID>TooManyFunctions:SSLHelper.kt$net.corda.nodeapi.internal.protonwrapper.netty.SSLHelper.kt</ID>
<ID>TooManyFunctions:SerializationHelper.kt$net.corda.serialization.internal.amqp.SerializationHelper.kt</ID>
<ID>TooManyFunctions:ServiceHub.kt$ServiceHub : ServicesForResolution</ID>
<ID>TooManyFunctions:SignedTransaction.kt$SignedTransaction : TransactionWithSignatures</ID>
@ -1742,8 +1698,6 @@
<ID>UnusedImports:Amount.kt$import net.corda.core.crypto.CompositeKey</ID>
<ID>UnusedImports:Amount.kt$import net.corda.core.identity.Party</ID>
<ID>UnusedImports:DummyLinearStateSchemaV1.kt$import net.corda.core.contracts.ContractState</ID>
<ID>UnusedImports:FlowsExecutionModeRpcTest.kt$import net.corda.core.internal.packageName</ID>
<ID>UnusedImports:FlowsExecutionModeRpcTest.kt$import net.corda.finance.schemas.CashSchemaV1</ID>
<ID>UnusedImports:InternalTestUtils.kt$import java.nio.file.Files</ID>
<ID>UnusedImports:InternalTestUtils.kt$import net.corda.nodeapi.internal.loadDevCaTrustStore</ID>
<ID>UnusedImports:NetworkMap.kt$import net.corda.core.node.NodeInfo</ID>
@ -2012,8 +1966,6 @@
<ID>WildcardImport:CordaModule.kt$import net.corda.core.identity.*</ID>
<ID>WildcardImport:CordaModule.kt$import net.corda.core.transactions.*</ID>
<ID>WildcardImport:CordaRPCOps.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import org.assertj.core.api.Assertions.*</ID>
<ID>WildcardImport:CordaServiceTest.kt$import kotlin.test.*</ID>
<ID>WildcardImport:CordaViewModel.kt$import tornadofx.*</ID>
<ID>WildcardImport:Cordapp.kt$import net.corda.core.cordapp.Cordapp.Info.*</ID>
@ -2031,10 +1983,6 @@
<ID>WildcardImport:CryptoSignUtils.kt$import net.corda.core.crypto.*</ID>
<ID>WildcardImport:CryptoUtilsTest.kt$import kotlin.test.*</ID>
<ID>WildcardImport:CustomCordapp.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:CustomVaultQuery.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:CustomVaultQuery.kt$import net.corda.core.utilities.*</ID>
<ID>WildcardImport:CustomVaultQueryTest.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:CustomVaultQueryTest.kt$import net.corda.finance.*</ID>
<ID>WildcardImport:DBNetworkParametersStorage.kt$import javax.persistence.*</ID>
<ID>WildcardImport:DBRunnerExtension.kt$import org.junit.jupiter.api.extension.*</ID>
<ID>WildcardImport:DBTransactionStorage.kt$import javax.persistence.*</ID>
@ -2057,7 +2005,6 @@
<ID>WildcardImport:DeserializeSimpleTypesTests.kt$import net.corda.serialization.internal.amqp.testutils.*</ID>
<ID>WildcardImport:DigitalSignatureWithCert.kt$import java.security.cert.*</ID>
<ID>WildcardImport:DistributedServiceTests.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:DoRemainingWorkTransition.kt$import net.corda.node.services.statemachine.*</ID>
<ID>WildcardImport:DockerInstantiator.kt$import com.github.dockerjava.api.model.*</ID>
<ID>WildcardImport:DriverDSLImpl.kt$import net.corda.testing.driver.*</ID>
<ID>WildcardImport:DummyContract.kt$import net.corda.core.contracts.*</ID>
@ -2076,8 +2023,6 @@
<ID>WildcardImport:EvolutionSerializerFactoryTests.kt$import kotlin.test.*</ID>
<ID>WildcardImport:EvolutionSerializerFactoryTests.kt$import net.corda.serialization.internal.amqp.testutils.*</ID>
<ID>WildcardImport:Explorer.kt$import tornadofx.*</ID>
<ID>WildcardImport:FiberDeserializationCheckingInterceptor.kt$import net.corda.node.services.statemachine.*</ID>
<ID>WildcardImport:FinalityFlowMigration.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FinalityFlowTests.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:FinalityFlowTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FinalityHandlerTest.kt$import net.corda.node.services.statemachine.StaffedFlowHospital.*</ID>
@ -2089,11 +2034,7 @@
<ID>WildcardImport:FlowCheckpointCordapp.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowCheckpointVersionNodeStartupCheckTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowCheckpointVersionNodeStartupCheckTest.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:FlowCookbook.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:FlowCookbook.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowFrameworkPersistenceTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FlowFrameworkTests.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowFrameworkTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FlowFrameworkTripartyTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FlowLogicRefFactoryImpl.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowMatchers.kt$import net.corda.coretesting.internal.matchers.*</ID>
@ -2101,10 +2042,7 @@
<ID>WildcardImport:FlowRetryTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStackSnapshotTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStateMachine.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStateMachineImpl.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStateMachineImpl.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:FlowsDrainingModeContentionTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FxTransactionBuildTutorialTest.kt$import net.corda.finance.*</ID>
<ID>WildcardImport:GenericsTests.kt$import net.corda.serialization.internal.amqp.testutils.*</ID>
<ID>WildcardImport:Gui.kt$import tornadofx.*</ID>
<ID>WildcardImport:GuiUtilities.kt$import tornadofx.*</ID>
@ -2121,8 +2059,6 @@
<ID>WildcardImport:HibernateQueryCriteriaParser.kt$import net.corda.core.node.services.vault.EqualityComparisonOperator.*</ID>
<ID>WildcardImport:HibernateQueryCriteriaParser.kt$import net.corda.core.node.services.vault.LikenessOperator.*</ID>
<ID>WildcardImport:HibernateStatistics.kt$import org.hibernate.stat.*</ID>
<ID>WildcardImport:IOUContract.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:IOUFlowResponder.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:IRS.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:IRS.kt$import net.corda.finance.contracts.*</ID>
<ID>WildcardImport:IRSState.kt$import net.corda.core.contracts.*</ID>
@ -2169,7 +2105,6 @@
<ID>WildcardImport:JarSignatureCollectorTest.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:KeyStoreUtilities.kt$import java.security.*</ID>
<ID>WildcardImport:KeyStoreUtilities.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:KotlinIntegrationTestingTutorial.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:Kryo.kt$import com.esotericsoftware.kryo.*</ID>
<ID>WildcardImport:Kryo.kt$import net.corda.core.transactions.*</ID>
<ID>WildcardImport:KryoStreamsTest.kt$import java.io.*</ID>
@ -2420,8 +2355,6 @@
<ID>WildcardImport:SignedTransaction.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:SignedTransaction.kt$import net.corda.core.crypto.*</ID>
<ID>WildcardImport:SimpleMQClient.kt$import org.apache.activemq.artemis.api.core.client.*</ID>
<ID>WildcardImport:SingleThreadedStateMachineManager.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:SingleThreadedStateMachineManager.kt$import net.corda.node.services.statemachine.interceptors.*</ID>
<ID>WildcardImport:SpringDriver.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.node.services.vault.*</ID>
@ -2473,8 +2406,6 @@
<ID>WildcardImport:TransactionViewer.kt$import net.corda.client.jfx.utils.*</ID>
<ID>WildcardImport:TransactionViewer.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:TransactionViewer.kt$import tornadofx.*</ID>
<ID>WildcardImport:TutorialContract.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:TutorialTestDSL.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:TwoPartyDealFlow.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:TwoPartyTradeFlow.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:TwoPartyTradeFlow.kt$import net.corda.core.flows.*</ID>
@ -2502,7 +2433,6 @@
<ID>WildcardImport:ValidatingNotaryServiceTests.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:ValidatingNotaryServiceTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:VaultFiller.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:VaultFlowTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.*</ID>
<ID>WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.vault.QueryCriteria.*</ID>

View File

@ -200,10 +200,7 @@ internal fun createClientSslHelper(target: NetworkHostAndPort,
expectedRemoteLegalNames: Set<CordaX500Name>,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers
val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java).map { LoggingTrustManagerWrapper(it) }.toTypedArray()
sslContext.init(keyManagers, trustManagers, newSecureRandom())
val sslContext = createAndInitSslContext(keyManagerFactory, trustManagerFactory)
val sslEngine = sslContext.createSSLEngine(target.host, target.port)
sslEngine.useClientMode = true
sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray()
@ -239,10 +236,7 @@ internal fun createClientOpenSslHandler(target: NetworkHostAndPort,
internal fun createServerSslHandler(keyStore: CertificateStore,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers
val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java).map { LoggingTrustManagerWrapper(it) }.toTypedArray()
sslContext.init(keyManagers, trustManagers, newSecureRandom())
val sslContext = createAndInitSslContext(keyManagerFactory, trustManagerFactory)
val sslEngine = sslContext.createSSLEngine()
sslEngine.useClientMode = false
sslEngine.needClientAuth = true
@ -256,6 +250,15 @@ internal fun createServerSslHandler(keyStore: CertificateStore,
return SslHandler(sslEngine, false, LoggingImmediateExecutor)
}
fun createAndInitSslContext(keyManagerFactory: KeyManagerFactory, trustManagerFactory: TrustManagerFactory): SSLContext {
val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers
val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java)
.map { LoggingTrustManagerWrapper(it) }.toTypedArray()
sslContext.init(keyManagers, trustManagers, newSecureRandom())
return sslContext
}
@VisibleForTesting
fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateStore, revocationConfig: RevocationConfig): ManagerFactoryParameters {
val pkixParams = PKIXBuilderParameters(trustStore.value.internal, X509CertSelector())

View File

@ -0,0 +1,216 @@
package net.corda.node.amqp;
import net.corda.nodeapi.internal.protonwrapper.netty.SSLHelperKt;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManagerFactory;
/**
* An SSL/TLS client that connects to a server using its IP address and port.
* <p/>
* After initialization of a {@link NioSslClient} object, {@link NioSslClient#connect()} should be called,
* in order to establish connection with the server.
* <p/>
* When the connection between the client and the object is established, {@link NioSslClient} provides
* a public write and read method, in order to communicate with its peer.
*
* @author <a href="mailto:alex.a.karnezis@gmail.com">Alex Karnezis</a>
*/
public class NioSslClient extends NioSslPeer {
/**
* The remote address of the server this client is configured to connect to.
*/
private final String remoteAddress;
/**
* The port of the server this client is configured to connect to.
*/
private final int port;
/**
* The engine that will be used to encrypt/decrypt data between this client and the server.
*/
private final SSLEngine engine;
/**
* The socket channel that will be used as the transport link between this client and the server.
*/
private SocketChannel socketChannel;
/**
* Initiates the engine to run as a client using peer information, and allocates space for the
* buffers that will be used by the engine.
*
* @param remoteAddress The IP address of the peer.
* @param port The peer's port that will be used.
*/
public NioSslClient(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, String remoteAddress, int port) {
this.remoteAddress = remoteAddress;
this.port = port;
SSLContext context = SSLHelperKt.createAndInitSslContext(keyManagerFactory, trustManagerFactory);
engine = context.createSSLEngine(remoteAddress, port);
engine.setUseClientMode(true);
SSLSession session = engine.getSession();
myAppData = ByteBuffer.allocate(1024);
myNetData = ByteBuffer.allocate(session.getPacketBufferSize());
peerAppData = ByteBuffer.allocate(1024);
peerNetData = ByteBuffer.allocate(session.getPacketBufferSize());
}
/**
* Opens a socket channel to communicate with the configured server and tries to complete the handshake protocol.
*
* @return True if client established a connection with the server, false otherwise.
*/
public boolean connect() throws Exception {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(remoteAddress, port));
while (!socketChannel.finishConnect()) {
// can do something here...
}
engine.beginHandshake();
return doHandshake(socketChannel, engine);
}
/**
* Public method to send a message to the server.
*
* @param message - message to be sent to the server.
* @throws IOException if an I/O error occurs to the socket channel.
*/
public void write(String message) throws IOException {
write(socketChannel, engine, message);
}
/**
* Implements the write method that sends a message to the server the client is connected to,
* but should not be called by the user, since socket channel and engine are inner class' variables.
* {@link NioSslClient#write(String)} should be called instead.
*
* @param message - message to be sent to the server.
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
* @throws IOException if an I/O error occurs to the socket channel.
*/
@Override
protected void write(SocketChannel socketChannel, SSLEngine engine, String message) throws IOException {
log.debug("About to write to the server...");
myAppData.clear();
myAppData.put(message.getBytes());
myAppData.flip();
while (myAppData.hasRemaining()) {
// The loop has a meaning for (outgoing) messages larger than 16KB.
// Every wrap call will remove 16KB from the original message and send it to the remote peer.
myNetData.clear();
SSLEngineResult result = engine.wrap(myAppData, myNetData);
switch (result.getStatus()) {
case OK:
myNetData.flip();
while (myNetData.hasRemaining()) {
socketChannel.write(myNetData);
}
log.debug("Message sent to the server: " + message);
break;
case BUFFER_OVERFLOW:
myNetData = enlargePacketBuffer(engine, myNetData);
break;
case BUFFER_UNDERFLOW:
throw new SSLException("Buffer underflow occured after a wrap. I don't think we should ever get here.");
case CLOSED:
closeConnection(socketChannel, engine);
return;
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
}
}
/**
* Public method to try to read from the server.
*/
public void read() throws Exception {
read(socketChannel, engine);
}
/**
* Will wait for response from the remote peer, until it actually gets something.
* Uses {@link SocketChannel#read(ByteBuffer)}, which is non-blocking, and if
* it gets nothing from the peer, waits for {@code waitToReadMillis} and tries again.
* <p/>
* Just like {@link NioSslPeer#read(SocketChannel, SSLEngine)} it uses inner class' socket channel
* and engine and should not be used by the client. {@link NioSslClient#read()} should be called instead.
*
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
*/
@Override
protected void read(SocketChannel socketChannel, SSLEngine engine) throws Exception {
log.debug("About to read from the server...");
peerNetData.clear();
int waitToReadMillis = 50;
boolean exitReadLoop = false;
while (!exitReadLoop) {
int bytesRead = socketChannel.read(peerNetData);
if (bytesRead > 0) {
peerNetData.flip();
while (peerNetData.hasRemaining()) {
peerAppData.clear();
SSLEngineResult result = engine.unwrap(peerNetData, peerAppData);
switch (result.getStatus()) {
case OK:
peerAppData.flip();
log.debug("Server response: " + peerAppDataAsString());
exitReadLoop = true;
break;
case BUFFER_OVERFLOW:
peerAppData = enlargeApplicationBuffer(engine, peerAppData);
break;
case BUFFER_UNDERFLOW:
peerNetData = handleBufferUnderflow(engine, peerNetData);
break;
case CLOSED:
closeConnection(socketChannel, engine);
return;
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
}
} else if (bytesRead < 0) {
handleEndOfStream(socketChannel, engine);
return;
}
Thread.sleep(waitToReadMillis);
}
}
/**
* Should be called when the client wants to explicitly close the connection to the server.
*
* @throws IOException if an I/O error occurs to the socket channel.
*/
public void shutdown() throws IOException {
log.debug("About to close connection with the server...");
closeConnection(socketChannel, engine);
executor.shutdown();
log.debug("Goodbye!");
}
}

View File

@ -0,0 +1,329 @@
package net.corda.node.amqp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* A class that represents an SSL/TLS peer, and can be extended to create a client or a server.
* <p/>
* It makes use of the JSSE framework, and specifically the {@link SSLEngine} logic, which
* is described by Oracle as "an advanced API, not appropriate for casual use", since
* it requires the user to implement much of the communication establishment procedure himself.
* More information about it can be found here: http://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#SSLEngine
* <p/>
* {@link NioSslPeer} implements the handshake protocol, required to establish a connection between two peers,
* which is common for both client and server and provides the abstract {@link NioSslPeer#read(SocketChannel, SSLEngine)} and
* {@link NioSslPeer#write(SocketChannel, SSLEngine, String)} methods, that need to be implemented by the specific SSL/TLS peer
* that is going to extend this class.
*
* @author <a href="mailto:alex.a.karnezis@gmail.com">Alex Karnezis</a>
*/
public abstract class NioSslPeer {
/**
* Class' logger.
*/
protected final Logger log = LoggerFactory.getLogger(getClass());
/**
* Will contain this peer's application data in plaintext, that will be later encrypted
* using {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)} and sent to the other peer. This buffer can typically
* be of any size, as long as it is large enough to contain this peer's outgoing messages.
* If this peer tries to send a message bigger than buffer's capacity a {@link BufferOverflowException}
* will be thrown.
*/
protected ByteBuffer myAppData;
/**
* Will contain this peer's encrypted data, that will be generated after {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)}
* is applied on {@link NioSslPeer#myAppData}. It should be initialized using {@link SSLSession#getPacketBufferSize()},
* which returns the size up to which, SSL/TLS packets will be generated from the engine under a session.
* All SSLEngine network buffers should be sized at least this large to avoid insufficient space problems when performing wrap and unwrap calls.
*/
protected ByteBuffer myNetData;
/**
* Will contain the other peer's (decrypted) application data. It must be large enough to hold the application data
* from any peer. Can be initialized with {@link SSLSession#getApplicationBufferSize()} for an estimation
* of the other peer's application data and should be enlarged if this size is not enough.
*/
protected ByteBuffer peerAppData;
/**
* Will contain the other peer's encrypted data. The SSL/TLS protocols specify that implementations should produce packets containing at most 16 KB of plaintext,
* so a buffer sized to this value should normally cause no capacity problems. However, some implementations violate the specification and generate large records up to 32 KB.
* If the {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer)} detects large inbound packets, the buffer sizes returned by SSLSession will be updated dynamically, so the this peer
* should check for overflow conditions and enlarge the buffer using the session's (updated) buffer size.
*/
protected ByteBuffer peerNetData;
/**
* Will be used to execute tasks that may emerge during handshake in parallel with the server's main thread.
*/
protected ExecutorService executor = Executors.newSingleThreadExecutor();
protected abstract void read(SocketChannel socketChannel, SSLEngine engine) throws Exception;
protected abstract void write(SocketChannel socketChannel, SSLEngine engine, String message) throws Exception;
/**
* Implements the handshake protocol between two peers, required for the establishment of the SSL/TLS connection.
* During the handshake, encryption configuration information - such as the list of available cipher suites - will be exchanged
* and if the handshake is successful will lead to an established SSL/TLS session.
*
* <p/>
* A typical handshake will usually contain the following steps:
*
* <ul>
* <li>1. wrap: ClientHello</li>
* <li>2. unwrap: ServerHello/Cert/ServerHelloDone</li>
* <li>3. wrap: ClientKeyExchange</li>
* <li>4. wrap: ChangeCipherSpec</li>
* <li>5. wrap: Finished</li>
* <li>6. unwrap: ChangeCipherSpec</li>
* <li>7. unwrap: Finished</li>
* </ul>
* <p/>
* Handshake is also used during the end of the session, in order to properly close the connection between the two peers.
* A proper connection close will typically include the one peer sending a CLOSE message to another, and then wait for
* the other's CLOSE message to close the transport link. The other peer from his perspective would read a CLOSE message
* from his peer and then enter the handshake procedure to send his own CLOSE message as well.
*
* @param socketChannel - the socket channel that connects the two peers.
* @param engine - the engine that will be used for encryption/decryption of the data exchanged with the other peer.
* @return True if the connection handshake was successful or false if an error occurred.
* @throws IOException - if an error occurs during read/write to the socket channel.
*/
protected boolean doHandshake(SocketChannel socketChannel, SSLEngine engine) throws IOException {
log.debug("About to do handshake...");
SSLEngineResult result;
HandshakeStatus handshakeStatus;
// NioSslPeer's fields myAppData and peerAppData are supposed to be large enough to hold all message data the peer
// will send and expects to receive from the other peer respectively. Since the messages to be exchanged will usually be less
// than 16KB long the capacity of these fields should also be smaller. Here we initialize these two local buffers
// to be used for the handshake, while keeping client's buffers at the same size.
int appBufferSize = engine.getSession().getApplicationBufferSize();
ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize);
ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize);
myNetData.clear();
peerNetData.clear();
handshakeStatus = engine.getHandshakeStatus();
while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED && handshakeStatus != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
switch (handshakeStatus) {
case NEED_UNWRAP:
if (socketChannel.read(peerNetData) < 0) {
if (engine.isInboundDone() && engine.isOutboundDone()) {
return false;
}
try {
engine.closeInbound();
} catch (SSLException e) {
log.error("This engine was forced to close inbound, without having received the proper SSL/TLS close " +
"notification message from the peer, due to end of stream.", e);
}
engine.closeOutbound();
// After closeOutbound the engine will be set to WRAP state, in order to try to send a close message to the client.
handshakeStatus = engine.getHandshakeStatus();
break;
}
peerNetData.flip();
try {
result = engine.unwrap(peerNetData, peerAppData);
peerNetData.compact();
handshakeStatus = result.getHandshakeStatus();
} catch (SSLException sslException) {
log.error("A problem was encountered while processing the data that caused the SSLEngine to abort." +
" Will try to properly close connection...", sslException);
engine.closeOutbound();
handshakeStatus = engine.getHandshakeStatus();
break;
}
switch (result.getStatus()) {
case OK:
break;
case BUFFER_OVERFLOW:
// Will occur when peerAppData's capacity is smaller than the data derived from peerNetData's unwrap.
peerAppData = enlargeApplicationBuffer(engine, peerAppData);
break;
case BUFFER_UNDERFLOW:
// Will occur either when no data was read from the peer or when the peerNetData buffer was too small to hold all peer's data.
peerNetData = handleBufferUnderflow(engine, peerNetData);
break;
case CLOSED:
if (engine.isOutboundDone()) {
return false;
} else {
engine.closeOutbound();
handshakeStatus = engine.getHandshakeStatus();
break;
}
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
break;
case NEED_WRAP:
myNetData.clear();
try {
result = engine.wrap(myAppData, myNetData);
handshakeStatus = result.getHandshakeStatus();
} catch (SSLException sslException) {
log.error("A problem was encountered while processing the data that caused the SSLEngine to abort." +
"Will try to properly close connection...", sslException);
engine.closeOutbound();
handshakeStatus = engine.getHandshakeStatus();
break;
}
switch (result.getStatus()) {
case OK :
myNetData.flip();
while (myNetData.hasRemaining()) {
socketChannel.write(myNetData);
}
break;
case BUFFER_OVERFLOW:
// Will occur if there is not enough space in myNetData buffer to write all the data that would be generated by the method wrap.
// Since myNetData is set to session's packet size we should not get to this point because SSLEngine is supposed
// to produce messages smaller or equal to that, but a general handling would be the following:
myNetData = enlargePacketBuffer(engine, myNetData);
break;
case BUFFER_UNDERFLOW:
throw new SSLException("Buffer underflow occurred after a wrap. I don't think we should ever get here.");
case CLOSED:
try {
myNetData.flip();
while (myNetData.hasRemaining()) {
socketChannel.write(myNetData);
}
// At this point the handshake status will probably be NEED_UNWRAP so we make sure that peerNetData is clear to read.
peerNetData.clear();
} catch (Exception e) {
log.error("Failed to send server's CLOSE message due to socket channel's failure.");
handshakeStatus = engine.getHandshakeStatus();
}
break;
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
break;
case NEED_TASK:
Runnable task;
while ((task = engine.getDelegatedTask()) != null) {
executor.execute(task);
}
handshakeStatus = engine.getHandshakeStatus();
break;
default:
throw new IllegalStateException("Invalid SSL status: " + handshakeStatus);
}
}
log.debug("Handshake status: " + handshakeStatus);
return true;
}
protected ByteBuffer enlargePacketBuffer(SSLEngine engine, ByteBuffer buffer) {
return enlargeBuffer(buffer, engine.getSession().getPacketBufferSize());
}
protected ByteBuffer enlargeApplicationBuffer(SSLEngine engine, ByteBuffer buffer) {
return enlargeBuffer(buffer, engine.getSession().getApplicationBufferSize());
}
/**
* Compares <code>sessionProposedCapacity<code> with buffer's capacity. If buffer's capacity is smaller,
* returns a buffer with the proposed capacity. If it's equal or larger, returns a buffer
* with capacity twice the size of the initial one.
*
* @param buffer - the buffer to be enlarged.
* @param sessionProposedCapacity - the minimum size of the new buffer, proposed by {@link SSLSession}.
* @return A new buffer with a larger capacity.
*/
protected ByteBuffer enlargeBuffer(ByteBuffer buffer, int sessionProposedCapacity) {
if (sessionProposedCapacity > buffer.capacity()) {
buffer = ByteBuffer.allocate(sessionProposedCapacity);
} else {
buffer = ByteBuffer.allocate(buffer.capacity() * 2);
}
return buffer;
}
/**
* Handles {@link SSLEngineResult.Status#BUFFER_UNDERFLOW}. Will check if the buffer is already filled, and if there is no space problem
* will return the same buffer, so the client tries to read again. If the buffer is already filled will try to enlarge the buffer either to
* session's proposed size or to a larger capacity. A buffer underflow can happen only after an unwrap, so the buffer will always be a
* peerNetData buffer.
*
* @param buffer - will always be peerNetData buffer.
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
* @return The same buffer if there is no space problem or a new buffer with the same data but more space.
*/
protected ByteBuffer handleBufferUnderflow(SSLEngine engine, ByteBuffer buffer) {
if (engine.getSession().getPacketBufferSize() < buffer.limit()) {
return buffer;
} else {
ByteBuffer replaceBuffer = enlargePacketBuffer(engine, buffer);
buffer.flip();
replaceBuffer.put(buffer);
return replaceBuffer;
}
}
/**
* This method should be called when this peer wants to explicitly close the connection
* or when a close message has arrived from the other peer, in order to provide an orderly shutdown.
* <p/>
* It first calls {@link SSLEngine#closeOutbound()} which prepares this peer to send its own close message and
* sets {@link SSLEngine} to the <code>NEED_WRAP</code> state. Then, it delegates the exchange of close messages
* to the handshake method and finally, it closes socket channel.
*
* @param socketChannel - the transport link used between the two peers.
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
* @throws IOException if an I/O error occurs to the socket channel.
*/
protected void closeConnection(SocketChannel socketChannel, SSLEngine engine) throws IOException {
engine.closeOutbound();
doHandshake(socketChannel, engine);
socketChannel.close();
}
/**
* In addition to orderly shutdowns, an unorderly shutdown may occur, when the transport link (socket channel)
* is severed before close messages are exchanged. This may happen by getting an -1 or {@link IOException}
* when trying to read from the socket channel, or an {@link IOException} when trying to write to it.
* In both cases {@link SSLEngine#closeInbound()} should be called and then try to follow the standard procedure.
*
* @param socketChannel - the transport link used between the two peers.
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
* @throws IOException if an I/O error occurs to the socket channel.
*/
protected void handleEndOfStream(SocketChannel socketChannel, SSLEngine engine) throws IOException {
try {
engine.closeInbound();
} catch (Exception e) {
log.error("This engine was forced to close inbound, without having received the proper SSL/TLS close notification message from the peer, due to end of stream.");
}
closeConnection(socketChannel, engine);
}
protected String peerAppDataAsString() {
return new String(Arrays.copyOf(peerAppData.array(), peerAppData.limit()));
}
}

View File

@ -0,0 +1,263 @@
package net.corda.node.amqp;
import net.corda.nodeapi.internal.protonwrapper.netty.SSLHelperKt;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.time.Duration;
import java.util.Iterator;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManagerFactory;
/**
* An SSL/TLS server, that will listen to a specific address and port and serve SSL/TLS connections
* compatible with the protocol it applies.
* <p/>
* After initialization {@link NioSslServer#start()} should be called so the server starts to listen to
* new connection requests. At this point, start is blocking, so, in order to be able to gracefully stop
* the server, a {@link Runnable} containing a server object should be created. This runnable should
* start the server in its run method and also provide a stop method, which will call {@link NioSslServer#stop()}.
* </p>
* NioSslServer makes use of Java NIO, and specifically listens to new connection requests with a {@link ServerSocketChannel}, which will
* create new {@link SocketChannel}s and a {@link Selector} which serves all the connections in one thread.
*
* @author <a href="mailto:alex.a.karnezis@gmail.com">Alex Karnezis</a>
*/
public class NioSslServer extends NioSslPeer {
private final Duration handshakeDelay;
/**
* Declares if the server is active to serve and create new connections.
*/
private boolean active;
/**
* The context will be initialized with a specific SSL/TLS protocol and will then be used
* to create {@link SSLEngine} classes for each new connection that arrives to the server.
*/
private final SSLContext context;
/**
* A part of Java NIO that will be used to serve all connections to the server in one thread.
*/
private final Selector selector;
/**
* Server is designed to apply an SSL/TLS protocol and listen to an IP address and port.
*
* @param hostAddress - the IP address this server will listen to.
* @param port - the port this server will listen to.
* @param handshakeDelay - if not [null] specifies for how long the handshake should be delayed
*/
public NioSslServer(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, String hostAddress, int port,
Duration handshakeDelay) throws Exception {
context = SSLHelperKt.createAndInitSslContext(keyManagerFactory, trustManagerFactory);
SSLSession dummySession = context.createSSLEngine().getSession();
myAppData = ByteBuffer.allocate(dummySession.getApplicationBufferSize());
myNetData = ByteBuffer.allocate(dummySession.getPacketBufferSize());
peerAppData = ByteBuffer.allocate(dummySession.getApplicationBufferSize());
peerNetData = ByteBuffer.allocate(dummySession.getPacketBufferSize());
dummySession.invalidate();
selector = SelectorProvider.provider().openSelector();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(hostAddress, port));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
this.handshakeDelay = handshakeDelay;
active = true;
}
/**
* Should be called in order the server to start listening to new connections.
* This method will run in a loop as long as the server is active. In order to stop the server
* you should use {@link NioSslServer#stop()} which will set it to inactive state
* and also wake up the listener, which may be in blocking select() state.
*/
public void start() throws Exception {
log.debug("Initialized and waiting for new connections...");
while (isActive()) {
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read((SocketChannel) key.channel(), (SSLEngine) key.attachment());
}
}
}
log.debug("Goodbye!");
}
/**
* Sets the server to an inactive state, in order to exit the reading loop in {@link NioSslServer#start()}
* and also wakes up the selector, which may be in select() blocking state.
*/
public void stop() {
log.debug("Will now close server...");
active = false;
executor.shutdown();
selector.wakeup();
}
/**
* Will be called after a new connection request arrives to the server. Creates the {@link SocketChannel} that will
* be used as the network layer link, and the {@link SSLEngine} that will encrypt and decrypt all the data
* that will be exchanged during the session with this specific client.
*
* @param key - the key dedicated to the {@link ServerSocketChannel} used by the server to listen to new connection requests.
*/
private void accept(SelectionKey key) throws Exception {
log.debug("New connection request!");
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
socketChannel.configureBlocking(false);
SSLEngine engine = context.createSSLEngine();
engine.setUseClientMode(false);
// Demand client to present its certificate
engine.setNeedClientAuth(true);
engine.beginHandshake();
if (handshakeDelay != null) {
log.info("Deliberately sleeping during handshake for: " + handshakeDelay);
Thread.sleep(handshakeDelay.toMillis());
}
if (doHandshake(socketChannel, engine)) {
socketChannel.register(selector, SelectionKey.OP_READ, engine);
} else {
socketChannel.close();
log.debug("Connection closed due to handshake failure.");
}
}
/**
* Will be called by the selector when the specific socket channel has data to be read.
* As soon as the server reads these data, it will call {@link NioSslServer#write(SocketChannel, SSLEngine, String)}
* to send back a trivial response.
*
* @param socketChannel - the transport link used between the two peers.
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
* @throws IOException if an I/O error occurs to the socket channel.
*/
@Override
protected void read(SocketChannel socketChannel, SSLEngine engine) throws IOException {
log.debug("About to read from a client...");
peerNetData.clear();
int bytesRead = socketChannel.read(peerNetData);
if (bytesRead > 0) {
peerNetData.flip();
while (peerNetData.hasRemaining()) {
peerAppData.clear();
SSLEngineResult result = engine.unwrap(peerNetData, peerAppData);
switch (result.getStatus()) {
case OK:
peerAppData.flip();
log.debug("Incoming message: " + peerAppDataAsString());
break;
case BUFFER_OVERFLOW:
peerAppData = enlargeApplicationBuffer(engine, peerAppData);
break;
case BUFFER_UNDERFLOW:
peerNetData = handleBufferUnderflow(engine, peerNetData);
break;
case CLOSED:
log.debug("Client wants to close connection...");
closeConnection(socketChannel, engine);
log.debug("Goodbye client!");
return;
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
}
write(socketChannel, engine, "Hello! I am your server!");
} else if (bytesRead < 0) {
log.error("Received end of stream. Will try to close connection with client...");
handleEndOfStream(socketChannel, engine);
log.debug("Goodbye client!");
}
}
/**
* Will send a message back to a client.
*
* @param message - the message to be sent.
* @throws IOException if an I/O error occurs to the socket channel.
*/
@Override
protected void write(SocketChannel socketChannel, SSLEngine engine, String message) throws IOException {
log.debug("About to write to a client...");
myAppData.clear();
myAppData.put(message.getBytes());
myAppData.flip();
while (myAppData.hasRemaining()) {
// The loop has a meaning for (outgoing) messages larger than 16KB.
// Every wrap call will remove 16KB from the original message and send it to the remote peer.
myNetData.clear();
SSLEngineResult result = engine.wrap(myAppData, myNetData);
switch (result.getStatus()) {
case OK:
myNetData.flip();
while (myNetData.hasRemaining()) {
socketChannel.write(myNetData);
}
log.debug("Message sent to the client: " + message);
break;
case BUFFER_OVERFLOW:
myNetData = enlargePacketBuffer(engine, myNetData);
break;
case BUFFER_UNDERFLOW:
throw new SSLException("Buffer underflow occurred after a wrap. I don't think we should ever get here.");
case CLOSED:
closeConnection(socketChannel, engine);
return;
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
}
}
/**
* Determines if the the server is active or not.
*
* @return if the server is active or not.
*/
public boolean isActive() {
return active;
}
}

View File

@ -0,0 +1,69 @@
package net.corda.node.amqp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import java.time.Duration;
/**
* This class provides a runnable that can be used to initialize a {@link NioSslServer} thread.
* <p/>
* Run starts the server, which will start listening to the configured IP address and port for
* new SSL/TLS connections and serve the ones already connected to it.
* <p/>
* Also a stop method is provided in order to gracefully close the server and stop the thread.
*
* @author <a href="mailto:alex.a.karnezis@gmail.com">Alex Karnezis</a>
*/
public class ServerThread implements AutoCloseable {
private final static Logger log = LoggerFactory.getLogger(ServerThread.class);
private static final long JOIN_TIMEOUT_MS = 10000;
private final NioSslServer server;
private Thread serverThread;
public ServerThread(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, int port) throws Exception {
this(keyManagerFactory, trustManagerFactory, port, null);
}
public ServerThread(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, int port, @Nullable Duration handshakeDelay) throws Exception {
server = new NioSslServer(keyManagerFactory, trustManagerFactory, "localhost", port, handshakeDelay);
}
public void start() {
Runnable serverRunnable = () -> {
try {
server.start();
} catch (Exception e) {
log.error("Exception starting server", e);
}
};
serverThread = new Thread(serverRunnable, this.getClass().getSimpleName() + "-ServerThread");
serverThread.start();
}
/**
* Should be called in order to gracefully stop the server.
*/
public void stop() throws InterruptedException {
server.stop();
serverThread.join(JOIN_TIMEOUT_MS);
}
public boolean isActive() {
return server.isActive();
}
@Override
public void close() throws Exception {
stop();
}
}

View File

@ -0,0 +1,208 @@
package net.corda.node.amqp
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.internal.div
import net.corda.core.toFuture
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.init
import net.corda.nodeapi.internal.protonwrapper.netty.initialiseTrustStoreAndEnableCrlChecking
import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.driver.internal.incrementalPortAllocation
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.TrustManagerFactory
import kotlin.test.assertFalse
import kotlin.test.assertTrue
/**
* This test verifies some edge case scenarios like handshake timeouts when [AMQPClient] connected to the server
*
* In order to have control over handshake internals a simple TLS server is created which may have a configurable handshake delay.
*/
@RunWith(Parameterized::class)
class AMQPClientSslErrorsTest(@Suppress("unused") private val iteration: Int) {
companion object {
private const val MAX_MESSAGE_SIZE = 10 * 1024
private val log = contextLogger()
@JvmStatic
@Parameterized.Parameters(name = "iteration = {0}")
fun iterations(): Iterable<Array<Int>> {
// It is possible to change this value to a greater number
// to ensure that the test is not flaking when executed on CI
val repsCount = 1
return (1..repsCount).map { arrayOf(it) }
}
}
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
private val portAllocation = incrementalPortAllocation()
private lateinit var serverKeyManagerFactory: KeyManagerFactory
private lateinit var serverTrustManagerFactory: TrustManagerFactory
private lateinit var clientKeyManagerFactory: KeyManagerFactory
private lateinit var clientTrustManagerFactory: TrustManagerFactory
private lateinit var clientAmqpConfig: AMQPConfiguration
@Before
fun setup() {
setupServerCertificates()
setupClientCertificates()
}
private fun setupServerCertificates() {
val baseDirectory = temporaryFolder.root.toPath() / "server"
val certificatesDirectory = baseDirectory / "certificates"
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val serverConfig = mock<NodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(ALICE_NAME).whenever(it).myLegalName
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
}
serverConfig.configureWithDevSSLCertificate()
val keyStore = serverConfig.p2pSslOptions.keyStore.get()
val serverAmqpConfig = object : AMQPConfiguration {
override val keyStore = keyStore
override val trustStore = serverConfig.p2pSslOptions.trustStore.get()
override val revocationConfig = true.toRevocationConfig()
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
}
serverKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
serverTrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
serverKeyManagerFactory.init(keyStore)
serverTrustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(serverAmqpConfig.trustStore, serverAmqpConfig.revocationConfig))
}
private fun setupClientCertificates() {
val baseDirectory = temporaryFolder.root.toPath() / "client"
val certificatesDirectory = baseDirectory / "certificates"
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val clientConfig = mock<NodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(BOB_NAME).whenever(it).myLegalName
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
doReturn(true).whenever(it).crlCheckSoftFail
}
clientConfig.configureWithDevSSLCertificate()
//val nodeCert = (signingCertificateStore to p2pSslConfiguration).recreateNodeCaAndTlsCertificates(nodeCrlDistPoint, tlsCrlDistPoint)
val keyStore = clientConfig.p2pSslOptions.keyStore.get()
clientAmqpConfig = object : AMQPConfiguration {
override val keyStore = keyStore
override val trustStore = clientConfig.p2pSslOptions.trustStore.get()
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
override val sslHandshakeTimeout: Long = 3000
}
clientKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
clientTrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
clientKeyManagerFactory.init(keyStore)
clientTrustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(clientAmqpConfig.trustStore, clientAmqpConfig.revocationConfig))
}
@Test(timeout = 300_000)
fun trivialClientServerExchange() {
val serverPort = portAllocation.nextPort()
val serverThread = ServerThread(serverKeyManagerFactory, serverTrustManagerFactory, serverPort).also { it.start() }
//System.setProperty("javax.net.debug", "all");
serverThread.use {
val client = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
client.connect()
client.write("Hello! I am a client!")
client.read()
client.shutdown()
val client2 = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
val client3 = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
val client4 = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
client2.connect()
client2.write("Hello! I am another client!")
client2.read()
client2.shutdown()
client3.connect()
client4.connect()
client3.write("Hello from client3!!!")
client4.write("Hello from client4!!!")
client3.read()
client4.read()
client3.shutdown()
client4.shutdown()
}
assertFalse(serverThread.isActive)
}
@Test(timeout = 300_000)
fun amqpClientServerConnect() {
val serverPort = portAllocation.nextPort()
val serverThread = ServerThread(serverKeyManagerFactory, serverTrustManagerFactory, serverPort)
.also { it.start() }
serverThread.use {
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", serverPort)), setOf(ALICE_NAME), clientAmqpConfig)
amqpClient.use {
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
val clientConnect = clientConnected.get()
assertTrue(clientConnect.connected)
log.info("Confirmed connected")
}
}
assertFalse(serverThread.isActive)
}
@Test(timeout = 300_000)
fun amqpClientServerHandshakeTimeout() {
val serverPort = portAllocation.nextPort()
val serverThread = ServerThread(serverKeyManagerFactory, serverTrustManagerFactory, serverPort, 5.seconds)
.also { it.start() }
serverThread.use {
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", serverPort)), setOf(ALICE_NAME), clientAmqpConfig)
amqpClient.use {
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
val clientConnect = clientConnected.get()
assertFalse(clientConnect.connected)
// Not a badCert, but a timeout during handshake
assertFalse(clientConnect.badCert)
}
}
assertFalse(serverThread.isActive)
}
}