diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt
index 18e4f878cb..031b990b0a 100644
--- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt
+++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt
@@ -26,6 +26,7 @@ import net.corda.testing.node.internal.rpcDriver
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assertions.assertThatThrownBy
 import org.junit.Test
+import java.lang.RuntimeException
 import java.lang.Thread.sleep
 import java.time.Duration
 import java.util.concurrent.CountDownLatch
@@ -163,6 +164,47 @@ class CordaRPCClientReconnectionTest {
         }
     }
 
+    @Test(timeout=300_000)
+    fun `when user code throws an error on a reconnecting observable, then onError is invoked and observable is unsubscribed successfully`() {
+        driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS)) {
+            val normalLatch = CountDownLatch(1)
+            val errorLatch = CountDownLatch(1)
+            var observedEvents = 0
+
+            fun startNode(address: NetworkHostAndPort): NodeHandle {
+                return startNode(
+                        providedName = CHARLIE_NAME,
+                        rpcUsers = listOf(CordaRPCClientTest.rpcUser),
+                        customOverrides = mapOf("rpcSettings.address" to address.toString())
+                ).getOrThrow()
+            }
+
+            val addresses = listOf(NetworkHostAndPort("localhost", portAllocator.nextPort()), NetworkHostAndPort("localhost", portAllocator.nextPort()))
+
+            startNode(addresses[0])
+            val client = CordaRPCClient(addresses)
+
+            (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
+                val rpcOps = it.proxy as ReconnectingCordaRPCOps
+                val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
+                val subscription = cashStatesFeed.updates.subscribe ({
+                    normalLatch.countDown()
+                    observedEvents++
+                    throw RuntimeException()
+                }, {
+                    errorLatch.countDown()
+                })
+                rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
+                rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
+
+                assertTrue { normalLatch.await(2, TimeUnit.SECONDS) }
+                assertTrue { errorLatch.await(2, TimeUnit.SECONDS) }
+                assertThat(subscription.isUnsubscribed).isTrue()
+                assertThat(observedEvents).isEqualTo(1)
+            }
+        }
+    }
+
     @Test(timeout=300_000)
     fun `an RPC call fails, when the maximum number of attempts is exceeded`() {
         driver(DriverParameters(cordappsForAllNodes = emptyList())) {
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt
index b5052d9bbf..fe3c03f453 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt
@@ -1,5 +1,6 @@
 package net.corda.client.rpc.internal
 
+import net.corda.client.rpc.ConnectionFailureException
 import net.corda.core.messaging.DataFeed
 import rx.Observable
 import rx.Subscriber
@@ -54,16 +55,30 @@ class ReconnectingObservable<T> private constructor(subscriber: ReconnectingSubs
             }
         }
 
+        /**
+         * Depending on the type of error, the reaction is different:
+         * - If the error is coming from a connection disruption, we establish a new connection and re-wire the observable
+         *   without letting the client notice at all.
+         * - In any other case, we let the error propagate to the client's observable. Both of the observables
+         *   (this one and the client's one) will be automatically unsubscribed, since that's the semantics of onError.
+         */
         private fun scheduleResubscribe(error: Throwable) {
             if (unsubscribed) return
-            reconnectingRPCConnection.observersPool.execute {
-                if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute
-                reconnectingRPCConnection.reconnectOnError(error)
-                // It can take a while to reconnect so we might find that we've shutdown in in the meantime
-                if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute
-                val newDataFeed = createDataFeed()
-                subscribeImmediately(newDataFeed)
+
+            if (error is ConnectionFailureException) {
+                reconnectingRPCConnection.observersPool.execute {
+                    if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute
+                    reconnectingRPCConnection.reconnectOnError(error)
+                    // It can take a while to reconnect so we might find that we've shutdown in in the meantime
+                    if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute
+                    val newDataFeed = createDataFeed()
+                    subscribeImmediately(newDataFeed)
+                }
+            } else {
+                val subscriber = checkNotNull(this.subscriber.get())
+                subscriber.onError(error)
             }
+
         }
     }
 }
\ No newline at end of file
diff --git a/detekt-baseline.xml b/detekt-baseline.xml
index 2c6cf777c8..39017a0c2b 100644
--- a/detekt-baseline.xml
+++ b/detekt-baseline.xml
@@ -1656,6 +1656,7 @@
     <ID>TooGenericExceptionThrown:ClassLoadingUtilsTest.kt$ClassLoadingUtilsTest$throw RuntimeException()</ID>
     <ID>TooGenericExceptionThrown:CommandParsers.kt$AzureParser.RegionConverter$throw Error("Unknown azure region: $value")</ID>
     <ID>TooGenericExceptionThrown:ContractHierarchyTest.kt$ContractHierarchyTest.IndirectContractParent$throw RuntimeException("Boom!")</ID>
+    <ID>TooGenericExceptionThrown:CordaRPCClientReconnectionTest.kt$CordaRPCClientReconnectionTest$throw RuntimeException()</ID>
     <ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated exit of ${request.amount} from $issuer, however there is no cash to exit!" )</ID>
     <ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from $issuer, " + "however they only have $issuerQuantity!" )</ID>
     <ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from ${node.mainIdentity}, " + "however there is no cash from $issuer!" )</ID>