Docs: improve appearance of the Kafka design doc.

This commit is contained in:
Mike Hearn 2018-05-14 16:57:58 +02:00
parent 3a80cdfb68
commit 9868a18361
10 changed files with 166 additions and 114 deletions

View File

@ -1,5 +1,7 @@
# High availability support
.. important:: This design document describes a feature of Corda Enterprise.
## Overview
### Background

View File

@ -0,0 +1,50 @@
# Design Decision: Storage engine for committed state index
## Background / Context
The storage engine for the committed state index needs to support a single operation: "insert all values with unique
keys, or abort if any key conflict found". A wide range of solutions could be used for that, from embedded key-value
stores to full-fledged relational databases. However, since we don't need any extra features a RDBMS provides over a
simple key-value store, we'll only consider lightweight embedded solutions to avoid extra operational costs.
Most RDBMSs are also generally optimised for read performance (use B-tree based storage engines like InnoDB, MyISAM).
Our workload is write-heavy and uses "random" primary keys (state references), which leads to particularly poor write
performance for those types of engines as we have seen with our Galera-based notary service. One exception is the
MyRocks storage engine, which is based on RocksDB and can handle write workloads well, and is supported by Percona
Server, and MariaDB. It is easier, however, to just use RocksDB directly.
## Options Analysis
### A. RocksDB
An embedded key-value store based on log-structured merge-trees (LSM). It's highly configurable, provides lots of
configuration options for performance tuning. E.g. can be tuned to run on different hardware flash, hard disks or
entirely in-memory.
### B. LMDB
An embedded key-value store using B+ trees, has ACID semantics and support for transactions.
### C. MapDB
An embedded Java database engine, providing persistent collection implementations. Uses memory mapped files. Simple to
use, implements Java collection interfaces. Provides a HashMap implementation that we can use for storing committed
states.
### D. MVStore
An embedded log structured key-value store. Provides a simple persistent map abstraction. Supports multiple map
implementations (B-tree, R-tree, concurrent B-tree).
## Recommendation and justification
Performance test results when running on a Macbook Pro with Intel Core i7-4980HQ CPU @ 2.80GHz, 16 GB RAM, SSD:
![Comparison](../images/store-comparison.png)
Multiple tests were run with varying number of transactions and input states per transaction: "1m x 1" denotes a million
transactions with one input state.
Proceed with Option A, as RocksDB provides most tuning options and achieves by far the best write performance.
Note that the index storage engine can be replaced in the future with minimal changes required on the notary service.

View File

@ -1,12 +1,10 @@
![Corda](https://www.corda.net/wp-content/uploads/2016/11/fg005_corda_b.png)
--------------------------------------------
Design Decision: Replication framework
================================
# Design Decision: Replication framework
## Background / Context
Multiple libraries/platforms exist for implementing fault-tolerant systems. In existing CFT notary implementations we experimented with using a traditional relational database with active replication, as well as a pure state machine replication approach based on CFT consensus algorithms.
Multiple libraries/platforms exist for implementing fault-tolerant systems. In existing CFT notary implementations we
experimented with using a traditional relational database with active replication, as well as a pure state machine
replication approach based on CFT consensus algorithms.
## Options Analysis
@ -14,7 +12,12 @@ Multiple libraries/platforms exist for implementing fault-tolerant systems. In e
*Raft-based fault-tolerant distributed coordination framework.*
Our first CFT notary notary implementation was based on Atomix. Atomix can be easily embedded into a Corda node and provides abstractions for implementing custom replicated state machines. In our case the state machine manages committed Corda contract states. When notarisation requests are sent to Atomix, they get forwarded to the leader node. The leader persists the request to a log, and replicates it to all followers. Once the majority of followers acknowledge receipt, it applies the request to the user-defined state machine. In our case we commit all input states in the request to a JDBC-backed map, or return an error if conflicts occur.
Our first CFT notary notary implementation was based on Atomix. Atomix can be easily embedded into a Corda node and
provides abstractions for implementing custom replicated state machines. In our case the state machine manages committed
Corda contract states. When notarisation requests are sent to Atomix, they get forwarded to the leader node. The leader
persists the request to a log, and replicates it to all followers. Once the majority of followers acknowledge receipt,
it applies the request to the user-defined state machine. In our case we commit all input states in the request to a
JDBC-backed map, or return an error if conflicts occur.
#### Advantages
@ -32,7 +35,8 @@ Our first CFT notary notary implementation was based on Atomix. Atomix can be ea
*Java persistence layer with a built-in Raft-based replicated key-value store.*
Conceptually similar to Atomix, but persists the state machine instead of the request log. Built around an abstract persistent key-value store: requests get cleaned up after replication and processing.
Conceptually similar to Atomix, but persists the state machine instead of the request log. Built around an abstract
persistent key-value store: requests get cleaned up after replication and processing.
#### Advantages
@ -52,7 +56,12 @@ Conceptually similar to Atomix, but persists the state machine instead of the re
*Paxos-based distributed streaming platform.*
Atomix and Permazen implement both the replicated request log and the state machine, but Kafka only provides the log component. In theory that means more complexity having to implement request log processing and state machine management, but for our use case it's fairly straightforward: consume requests and insert input states into a database, marking the position of the last processed request. If the database is lost, we can just replay the log from the beginning. The main benefit of this approach is that it gives a more granular control and performance tuning opportunities in different parts of the system.
Atomix and Permazen implement both the replicated request log and the state machine, but Kafka only provides the log
component. In theory that means more complexity having to implement request log processing and state machine management,
but for our use case it's fairly straightforward: consume requests and insert input states into a database, marking the
position of the last processed request. If the database is lost, we can just replay the log from the beginning. The main
benefit of this approach is that it gives a more granular control and performance tuning opportunities in different
parts of the system.
#### Advantages
@ -67,11 +76,16 @@ Atomix and Permazen implement both the replicated request log and the state mach
### D. Custom Raft-based implementation
For even more granular control, we could replace Kafka with our own replicated log implementation. Kafka was started before the Raft consensus algorithm was introduced, and is using Zookeeper for coordination, which is based on Paxos for consensus. Paxos is known to be complex to understand and implement, and the main driver behind Raft was to create a much simpler algorithm with equivalent functionality. Hence, while reimplementing Zookeeper would be an onerous task, building a Raft-based alternative from scratch is somewhat feasible.
For even more granular control, we could replace Kafka with our own replicated log implementation. Kafka was started
before the Raft consensus algorithm was introduced, and is using Zookeeper for coordination, which is based on Paxos for
consensus. Paxos is known to be complex to understand and implement, and the main driver behind Raft was to create a
much simpler algorithm with equivalent functionality. Hence, while reimplementing Zookeeper would be an onerous task,
building a Raft-based alternative from scratch is somewhat feasible.
#### Advantages
Most of the implementations above have many extra features our use-case does not require. We can implement a relatively simple clean optimised solution that will most likely outperform others (Thomas Schroeter already built a prototype).
Most of the implementations above have many extra features our use-case does not require. We can implement a relatively
simple clean optimised solution that will most likely outperform others (Thomas Schroeter already built a prototype).
#### Disadvantages
@ -81,9 +95,17 @@ Large effort required to make it highly performant and reliable.
*Synchronous replication plugin for MySQL, uses certification-based replication.*
All of the options discussed so far were based on abstract state machine replication. Another approach is simply using a more traditional RDBMS with active replication support. Note that most relational databases support some form replication in general, however, very few provide strong consistency guarantees and ensure no data loss. Galera is a plugin for MySQL enabling synchronous multi-master replication.
All of the options discussed so far were based on abstract state machine replication. Another approach is simply using a
more traditional RDBMS with active replication support. Note that most relational databases support some form
replication in general, however, very few provide strong consistency guarantees and ensure no data loss. Galera is a
plugin for MySQL enabling synchronous multi-master replication.
Galera uses certification-based replication, which operates on write-sets: a database server executes the (database) transaction, and only performs replication if the transaction requires write operations. If it does, the transaction is broadcasted to all other servers (using atomic broadcast). On delivery, each server executes a deterministic certification phase, which decides if the transaction can commit or must abort. If a conflict occurs, the entire cluster rolls back the transaction. This type of technique is quite efficient in low-conflict situations and allows read scaling (the latter is mostly irrelevant for our use case).
Galera uses certification-based replication, which operates on write-sets: a database server executes the (database)
transaction, and only performs replication if the transaction requires write operations. If it does, the transaction is
broadcasted to all other servers (using atomic broadcast). On delivery, each server executes a deterministic
certification phase, which decides if the transaction can commit or must abort. If a conflict occurs, the entire cluster
rolls back the transaction. This type of technique is quite efficient in low-conflict situations and allows read scaling
(the latter is mostly irrelevant for our use case).
#### Advantages
@ -100,7 +122,11 @@ Galera uses certification-based replication, which operates on write-sets: a dat
*Distributed SQL database built on a transactional and strongly-consistent key-value store. Uses Raft-based replication.*
On paper, CockroachDB looks like a great candidate, but it relies on sharding: data is automatically split into partitions, and each partition is replicated using Raft. It performs great for single-shard database transactions, and also natively supports cross-shard atomic commits. However, the majority of Corda transactions are likely to have more than one input state, which means that most transaction commits will require cross-shard database transactions. In our tests we were only able to achieve up to 30 TPS in a 3 DC deployment.
On paper, CockroachDB looks like a great candidate, but it relies on sharding: data is automatically split into
partitions, and each partition is replicated using Raft. It performs great for single-shard database transactions, and
also natively supports cross-shard atomic commits. However, the majority of Corda transactions are likely to have more
than one input state, which means that most transaction commits will require cross-shard database transactions. In our
tests we were only able to achieve up to 30 TPS in a 3 DC deployment.
#### Advantages
@ -114,4 +140,5 @@ On paper, CockroachDB looks like a great candidate, but it relies on sharding: d
## Recommendation and justification
Proceed with Option C. A Kafka-based solution strikes the best balance between performance and the required effort to build a production-ready solution.
Proceed with Option C. A Kafka-based solution strikes the best balance between performance and the required effort to
build a production-ready solution.

View File

@ -1,46 +1,18 @@
![Corda](https://www.corda.net/wp-content/uploads/2016/11/fg005_corda_b.png)
# High Performance CFT Notary Service
DOCUMENT MANAGEMENT
---
## Document Control
| Title | High Performance CFT Notary Service |
| -------------------- | ------------------------------------------------------------ |
| Date | 27 March 2018 |
| Author | Andrius Dagys, Thomas Schroeter |
| Distribution | Design Review Board, Product Management, Services - Technical (Consulting), Platform Delivery |
| Corda target version | Enterprise |
| JIRA reference | https://r3-cev.atlassian.net/browse/CID-294 |
## Approvals
#### Document Sign-off
| Author | Andrius Dagys |
| ----------------- | -------------------------------------------------- |
| Reviewer(s) | (GitHub PR reviewers) |
| Final approver(s) | (GitHub PR approver(s) from Design Approval Board) |
#### Design Decisions
| Description | Recommendation | Approval |
| ---------------------------------------- | --------------- | ----------------------- |
| [Replication framework](decisions/replicated-storage.md) | Option C | (Design Approval Board) |
| [Index storage engine](decisions/index-storage.md) | Option A |(Design Approval Board) |
HIGH LEVEL DESIGN
---
.. important:: This design document describes a feature of Corda Enterprise.
## Overview
This proposal describes the architecture and an implementation for a high performance crash fault-tolerant notary service, operated by a single party.
This proposal describes the architecture and an implementation for a high performance crash fault-tolerant notary
service, operated by a single party.
## Background
For initial deployments, we expect to operate a single non-validating CFT notary service. The current Raft and Galera implementations cannot handle more than 100-200 TPS, which is likely to be a serious bottleneck in the near future. To support our clients and compete with other platforms we need a notary service that can handle TPS in the order of 1,000s.
For initial deployments, we expect to operate a single non-validating CFT notary service. The current Raft and Galera
implementations cannot handle more than 100-200 TPS, which is likely to be a serious bottleneck in the near future. To
support our clients and compete with other platforms we need a notary service that can handle TPS in the order of
1,000s.
## Scope
@ -69,28 +41,59 @@ The notary service should be able to:
- Tolerate single datacenter failure.
- Tolerate single disk failure/corruption.
## Target Solution
Having explored different solutions for implementing notaries we propose the following architecture for a CFT notary, consisting of two components:
1. A central replicated request log, which orders and stores all notarisation requests. Efficient append-only log storage can be used along with batched replication, making performance mainly dependent on network throughput.
2. Worker nodes that service clients and maintain a consumed state index. The state index is a simple key-value store containing committed state references and pointers to the corresponding request positions in the log. If lost, it can be reconstructed by replaying and applying request log entries. There is a range of fast key-value stores that can be used for implementation.
![High level architecture](./images/high-level.svg)
At high level, client notarisation requests first get forwarded to a central replicated request log. The requests are then applied in order to the consumed state index in each worker to verify input state uniqueness. Each individual request outcome (success/conflict) is then sent back to the initiating client by the worker responsible for it. To emphasise, each worker will process _all_ notarisation requests, but only respond to the ones it received directly.
Messages (requests) in the request log are persisted and retained forever. The state index has a relatively low footprint and can in theory be kept entirely in memory. However, when a worker crashes, replaying the log to recover the index may take too long depending on the SLAs. Additionally, we expect applying the requests to the index to be much faster than consuming request batches even with persistence enabled.
_Technically_, the request log can also be kept entirely in memory, and the cluster will still be able to tolerate up to $f < n/2$ node failures. However, if for some reason the entire cluster is shut down (e.g. administrator error), all requests will be forever lost! Therefore, we should avoid it.
The request log does not need to be a separate cluster, and the worker nodes _could_ maintain the request log replicas locally. This would allow workers to consume ordered requests from the local copy rather than from a leader node across the network. It is hard to say, however, if this would have a significant performance impact without performing tests in the specific network environment (e.g. the bottleneck could be the replication step).
One advantage of hosting the request log in a separate cluster is that it makes it easier to independently scale the number of worker nodes. If, for example, if transaction validation and resolution is required when receiving a notarisation request, we might find that a significant number of receivers is required to generate enough incoming traffic to the request log. On the flipside, increasing the number of workers adds additional consumers and load on the request log, so a balance needs to be found.
## Design Decisions
As the design decision documents below discuss, the most suitable platform for managing the request log was chosen to be [Apache Kafka](https://kafka.apache.org/), and [RocksDB](http://rocksdb.org/) as the storage engine for the committed state index.
.. toctree::
:maxdepth: 2
decisions/replicated-storage.md
decisions/index-storage.md
## Target Solution
Having explored different solutions for implementing notaries we propose the following architecture for a CFT notary,
consisting of two components:
1. A central replicated request log, which orders and stores all notarisation requests. Efficient append-only log
storage can be used along with batched replication, making performance mainly dependent on network throughput.
2. Worker nodes that service clients and maintain a consumed state index. The state index is a simple key-value store
containing committed state references and pointers to the corresponding request positions in the log. If lost, it can be
reconstructed by replaying and applying request log entries. There is a range of fast key-value stores that can be used
for implementation.
![High level architecture](./images/high-level.svg)
At high level, client notarisation requests first get forwarded to a central replicated request log. The requests are
then applied in order to the consumed state index in each worker to verify input state uniqueness. Each individual
request outcome (success/conflict) is then sent back to the initiating client by the worker responsible for it. To
emphasise, each worker will process _all_ notarisation requests, but only respond to the ones it received directly.
Messages (requests) in the request log are persisted and retained forever. The state index has a relatively low
footprint and can in theory be kept entirely in memory. However, when a worker crashes, replaying the log to recover the
index may take too long depending on the SLAs. Additionally, we expect applying the requests to the index to be much
faster than consuming request batches even with persistence enabled.
_Technically_, the request log can also be kept entirely in memory, and the cluster will still be able to tolerate up to
$f < n/2$ node failures. However, if for some reason the entire cluster is shut down (e.g. administrator error), all
requests will be forever lost! Therefore, we should avoid it.
The request log does not need to be a separate cluster, and the worker nodes _could_ maintain the request log replicas
locally. This would allow workers to consume ordered requests from the local copy rather than from a leader node across
the network. It is hard to say, however, if this would have a significant performance impact without performing tests in
the specific network environment (e.g. the bottleneck could be the replication step).
One advantage of hosting the request log in a separate cluster is that it makes it easier to independently scale the
number of worker nodes. If, for example, if transaction validation and resolution is required when receiving a
notarisation request, we might find that a significant number of receivers is required to generate enough incoming
traffic to the request log. On the flipside, increasing the number of workers adds additional consumers and load on the
request log, so a balance needs to be found.
## Design Decisions
As the design decision documents below discuss, the most suitable platform for managing the request log was chosen to be
[Apache Kafka](https://kafka.apache.org/), and [RocksDB](http://rocksdb.org/) as the storage engine for the committed
state index.
| Heading | Recommendation |
| ---------------------------------------- | -------------- |
@ -106,13 +109,23 @@ A Kafka-based notary service does not deviate much from the high-level target so
![Kafka overview](./images/kafka-high-level.svg)
For our purposes we can view Kafka as a replicated durable queue we can push messages (_records_) to and consume from. Consuming a record just increments the consumer's position pointer, and does not delete it. Old records eventually expire and get cleaned up, but the expiry time can be set to "indefinite" so all data is retained (it's a supported use-case).
For our purposes we can view Kafka as a replicated durable queue we can push messages (_records_) to and consume from.
Consuming a record just increments the consumer's position pointer, and does not delete it. Old records eventually
expire and get cleaned up, but the expiry time can be set to "indefinite" so all data is retained (it's a supported
use-case).
The main caveat is that Kafka does not allow consuming records from replicas directly all communication has to be routed via a single leader node.
The main caveat is that Kafka does not allow consuming records from replicas directly all communication has to be
routed via a single leader node.
In Kafka, logical queues are called _topics_. Each topic can be split into multiple partitions. Topics are assigned a _replication factor_, which specifies how many replicas Kafka should create for each partition. Each replicated partition has an assigned leader node which producers and consumers can connect to. Partitioning topics and evenly distributing partition leadership allows Kafka to scale well horizontally.
In Kafka, logical queues are called _topics_. Each topic can be split into multiple partitions. Topics are assigned a
_replication factor_, which specifies how many replicas Kafka should create for each partition. Each replicated
partition has an assigned leader node which producers and consumers can connect to. Partitioning topics and evenly
distributing partition leadership allows Kafka to scale well horizontally.
In our use-case, however, we can only use a single-partition topic for notarisation requests, which limits the total capacity and throughput to a single machine. Partitioning requests would break global transaction ordering guarantees for consumers. There is a [proposal](#kafka-throughput-scaling-via-partitioning) from Rick Parker on how we _could_ use partitioning to potentially avoid traffic contention on the single leader node.
In our use-case, however, we can only use a single-partition topic for notarisation requests, which limits the total
capacity and throughput to a single machine. Partitioning requests would break global transaction ordering guarantees
for consumers. There is a [proposal](#kafka-throughput-scaling-via-partitioning) from Rick Parker on how we _could_ use
partitioning to potentially avoid traffic contention on the single leader node.
### Data model

View File

Before

Width:  |  Height:  |  Size: 19 KiB

After

Width:  |  Height:  |  Size: 19 KiB

View File

Before

Width:  |  Height:  |  Size: 16 KiB

After

Width:  |  Height:  |  Size: 16 KiB

View File

Before

Width:  |  Height:  |  Size: 37 KiB

After

Width:  |  Height:  |  Size: 37 KiB

View File

Before

Width:  |  Height:  |  Size: 40 KiB

After

Width:  |  Height:  |  Size: 40 KiB

View File

@ -1,41 +0,0 @@
![Corda](https://www.corda.net/wp-content/uploads/2016/11/fg005_corda_b.png)
--------------------------------------------
Design Decision: Storage engine for committed state index
============================================
## Background / Context
The storage engine for the committed state index needs to support a single operation: "insert all values with unique keys, or abort if any key conflict found". A wide range of solutions could be used for that, from embedded key-value stores to full-fledged relational databases. However, since we don't need any extra features a RDBMS provides over a simple key-value store, we'll only consider lightweight embedded solutions to avoid extra operational costs.
Most RDBMSs are also generally optimised for read performance (use B-tree based storage engines like InnoDB, MyISAM). Our workload is write-heavy and uses "random" primary keys (state references), which leads to particularly poor write performance for those types of engines as we have seen with our Galera-based notary service. One exception is the MyRocks storage engine, which is based on RocksDB and can handle write workloads well, and is supported by Percona Server, and MariaDB. It is easier, however, to just use RocksDB directly.
## Options Analysis
### A. RocksDB
An embedded key-value store based on log-structured merge-trees (LSM). It's highly configurable, provides lots of configuration options for performance tuning. E.g. can be tuned to run on different hardware flash, hard disks or entirely in-memory.
### B. LMDB
An embedded key-value store using B+ trees, has ACID semantics and support for transactions.
### C. MapDB
An embedded Java database engine, providing persistent collection implementations. Uses memory mapped files. Simple to use, implements Java collection interfaces. Provides a HashMap implementation that we can use for storing committed states.
### D. MVStore
An embedded log structured key-value store. Provides a simple persistent map abstraction. Supports multiple map implementations (B-tree, R-tree, concurrent B-tree).
## Recommendation and justification
Performance test results when running on a Macbook Pro with Intel Core i7-4980HQ CPU @ 2.80GHz, 16 GB RAM, SSD:
![Comparison](../images/store-comparison.png)
Multiple tests were run with varying number of transactions and input states per transaction: "1m x 1" denotes a million transactions with one input state.
Proceed with Option A, as RocksDB provides most tuning options and achieves by far the best write performance.
Note that the index storage engine can be replaced in the future with minimal changes required on the notary service.

View File

@ -61,6 +61,7 @@ We look forward to seeing what you can do with Corda!
design/failure-detection-master-election/design.md
design/float/design.md
design/hadr/design.md
design/kafka-notary/design.md
.. toctree::
:caption: Participate