centos8 binaries for libhiredis and libredis++

This commit is contained in:
Grant Limberg 2020-05-12 12:58:09 -07:00
parent aab96964b6
commit 5babd01d40
22 changed files with 12246 additions and 0 deletions

Binary file not shown.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,180 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_COMMAND_ARGS_H
#define SEWENEW_REDISPLUSPLUS_COMMAND_ARGS_H
#include <vector>
#include <list>
#include <string>
#include <tuple>
#include "utils.h"
namespace sw {
namespace redis {
class CmdArgs {
public:
template <typename Arg>
CmdArgs& append(Arg &&arg);
template <typename Arg, typename ...Args>
CmdArgs& append(Arg &&arg, Args &&...args);
// All overloads of operator<< are for internal use only.
CmdArgs& operator<<(const StringView &arg);
template <typename T,
typename std::enable_if<std::is_arithmetic<typename std::decay<T>::type>::value,
int>::type = 0>
CmdArgs& operator<<(T &&arg);
template <typename Iter>
CmdArgs& operator<<(const std::pair<Iter, Iter> &range);
template <std::size_t N, typename ...Args>
auto operator<<(const std::tuple<Args...> &) ->
typename std::enable_if<N == sizeof...(Args), CmdArgs&>::type {
return *this;
}
template <std::size_t N = 0, typename ...Args>
auto operator<<(const std::tuple<Args...> &arg) ->
typename std::enable_if<N < sizeof...(Args), CmdArgs&>::type;
const char** argv() {
return _argv.data();
}
const std::size_t* argv_len() {
return _argv_len.data();
}
std::size_t size() const {
return _argv.size();
}
private:
// Deep copy.
CmdArgs& _append(std::string arg);
// Shallow copy.
CmdArgs& _append(const StringView &arg);
// Shallow copy.
CmdArgs& _append(const char *arg);
template <typename T,
typename std::enable_if<std::is_arithmetic<typename std::decay<T>::type>::value,
int>::type = 0>
CmdArgs& _append(T &&arg) {
return operator<<(std::forward<T>(arg));
}
template <typename Iter>
CmdArgs& _append(std::true_type, const std::pair<Iter, Iter> &range);
template <typename Iter>
CmdArgs& _append(std::false_type, const std::pair<Iter, Iter> &range);
std::vector<const char *> _argv;
std::vector<std::size_t> _argv_len;
std::list<std::string> _args;
};
template <typename Arg>
inline CmdArgs& CmdArgs::append(Arg &&arg) {
return _append(std::forward<Arg>(arg));
}
template <typename Arg, typename ...Args>
inline CmdArgs& CmdArgs::append(Arg &&arg, Args &&...args) {
_append(std::forward<Arg>(arg));
return append(std::forward<Args>(args)...);
}
inline CmdArgs& CmdArgs::operator<<(const StringView &arg) {
_argv.push_back(arg.data());
_argv_len.push_back(arg.size());
return *this;
}
template <typename Iter>
inline CmdArgs& CmdArgs::operator<<(const std::pair<Iter, Iter> &range) {
return _append(IsKvPair<typename std::decay<decltype(*std::declval<Iter>())>::type>(), range);
}
template <typename T,
typename std::enable_if<std::is_arithmetic<typename std::decay<T>::type>::value,
int>::type>
inline CmdArgs& CmdArgs::operator<<(T &&arg) {
return _append(std::to_string(std::forward<T>(arg)));
}
template <std::size_t N, typename ...Args>
auto CmdArgs::operator<<(const std::tuple<Args...> &arg) ->
typename std::enable_if<N < sizeof...(Args), CmdArgs&>::type {
operator<<(std::get<N>(arg));
return operator<<<N + 1, Args...>(arg);
}
inline CmdArgs& CmdArgs::_append(std::string arg) {
_args.push_back(std::move(arg));
return operator<<(_args.back());
}
inline CmdArgs& CmdArgs::_append(const StringView &arg) {
return operator<<(arg);
}
inline CmdArgs& CmdArgs::_append(const char *arg) {
return operator<<(arg);
}
template <typename Iter>
CmdArgs& CmdArgs::_append(std::false_type, const std::pair<Iter, Iter> &range) {
auto first = range.first;
auto last = range.second;
while (first != last) {
*this << *first;
++first;
}
return *this;
}
template <typename Iter>
CmdArgs& CmdArgs::_append(std::true_type, const std::pair<Iter, Iter> &range) {
auto first = range.first;
auto last = range.second;
while (first != last) {
*this << first->first << first->second;
++first;
}
return *this;
}
}
}
#endif // end SEWENEW_REDISPLUSPLUS_COMMAND_ARGS_H

View File

@ -0,0 +1,211 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_COMMAND_OPTIONS_H
#define SEWENEW_REDISPLUSPLUS_COMMAND_OPTIONS_H
#include <string>
#include "utils.h"
namespace sw {
namespace redis {
enum class UpdateType {
EXIST,
NOT_EXIST,
ALWAYS
};
enum class InsertPosition {
BEFORE,
AFTER
};
enum class BoundType {
CLOSED,
OPEN,
LEFT_OPEN,
RIGHT_OPEN
};
// (-inf, +inf)
template <typename T>
class UnboundedInterval;
// [min, max], (min, max), (min, max], [min, max)
template <typename T>
class BoundedInterval;
// [min, +inf), (min, +inf)
template <typename T>
class LeftBoundedInterval;
// (-inf, max], (-inf, max)
template <typename T>
class RightBoundedInterval;
template <>
class UnboundedInterval<double> {
public:
const std::string& min() const;
const std::string& max() const;
};
template <>
class BoundedInterval<double> {
public:
BoundedInterval(double min, double max, BoundType type);
const std::string& min() const {
return _min;
}
const std::string& max() const {
return _max;
}
private:
std::string _min;
std::string _max;
};
template <>
class LeftBoundedInterval<double> {
public:
LeftBoundedInterval(double min, BoundType type);
const std::string& min() const {
return _min;
}
const std::string& max() const;
private:
std::string _min;
};
template <>
class RightBoundedInterval<double> {
public:
RightBoundedInterval(double max, BoundType type);
const std::string& min() const;
const std::string& max() const {
return _max;
}
private:
std::string _max;
};
template <>
class UnboundedInterval<std::string> {
public:
const std::string& min() const;
const std::string& max() const;
};
template <>
class BoundedInterval<std::string> {
public:
BoundedInterval(const std::string &min, const std::string &max, BoundType type);
const std::string& min() const {
return _min;
}
const std::string& max() const {
return _max;
}
private:
std::string _min;
std::string _max;
};
template <>
class LeftBoundedInterval<std::string> {
public:
LeftBoundedInterval(const std::string &min, BoundType type);
const std::string& min() const {
return _min;
}
const std::string& max() const;
private:
std::string _min;
};
template <>
class RightBoundedInterval<std::string> {
public:
RightBoundedInterval(const std::string &max, BoundType type);
const std::string& min() const;
const std::string& max() const {
return _max;
}
private:
std::string _max;
};
struct LimitOptions {
long long offset = 0;
long long count = -1;
};
enum class Aggregation {
SUM,
MIN,
MAX
};
enum class BitOp {
AND,
OR,
XOR,
NOT
};
enum class GeoUnit {
M,
KM,
MI,
FT
};
template <typename T>
struct WithCoord : TupleWithType<std::pair<double, double>, T> {};
template <typename T>
struct WithDist : TupleWithType<double, T> {};
template <typename T>
struct WithHash : TupleWithType<long long, T> {};
}
}
#endif // end SEWENEW_REDISPLUSPLUS_COMMAND_OPTIONS_H

View File

@ -0,0 +1,194 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_CONNECTION_H
#define SEWENEW_REDISPLUSPLUS_CONNECTION_H
#include <cerrno>
#include <cstring>
#include <memory>
#include <string>
#include <sstream>
#include <chrono>
#include <hiredis/hiredis.h>
#include "errors.h"
#include "reply.h"
#include "utils.h"
namespace sw {
namespace redis {
enum class ConnectionType {
TCP = 0,
UNIX
};
struct ConnectionOptions {
public:
ConnectionOptions() = default;
explicit ConnectionOptions(const std::string &uri);
ConnectionOptions(const ConnectionOptions &) = default;
ConnectionOptions& operator=(const ConnectionOptions &) = default;
ConnectionOptions(ConnectionOptions &&) = default;
ConnectionOptions& operator=(ConnectionOptions &&) = default;
~ConnectionOptions() = default;
ConnectionType type = ConnectionType::TCP;
std::string host;
int port = 6379;
std::string path;
std::string password;
int db = 0;
bool keep_alive = false;
std::chrono::milliseconds connect_timeout{0};
std::chrono::milliseconds socket_timeout{0};
private:
ConnectionOptions _parse_options(const std::string &uri) const;
ConnectionOptions _parse_tcp_options(const std::string &path) const;
ConnectionOptions _parse_unix_options(const std::string &path) const;
auto _split_string(const std::string &str, const std::string &delimiter) const ->
std::pair<std::string, std::string>;
};
class CmdArgs;
class Connection {
public:
explicit Connection(const ConnectionOptions &opts);
Connection(const Connection &) = delete;
Connection& operator=(const Connection &) = delete;
Connection(Connection &&) = default;
Connection& operator=(Connection &&) = default;
~Connection() = default;
// Check if the connection is broken. Client needs to do this check
// before sending some command to the connection. If it's broken,
// client needs to reconnect it.
bool broken() const noexcept {
return _ctx->err != REDIS_OK;
}
void reset() noexcept {
_ctx->err = 0;
}
void reconnect();
auto last_active() const
-> std::chrono::time_point<std::chrono::steady_clock> {
return _last_active;
}
template <typename ...Args>
void send(const char *format, Args &&...args);
void send(int argc, const char **argv, const std::size_t *argv_len);
void send(CmdArgs &args);
ReplyUPtr recv();
const ConnectionOptions& options() const {
return _opts;
}
friend void swap(Connection &lhs, Connection &rhs) noexcept;
private:
class Connector;
struct ContextDeleter {
void operator()(redisContext *context) const {
if (context != nullptr) {
redisFree(context);
}
};
};
using ContextUPtr = std::unique_ptr<redisContext, ContextDeleter>;
void _set_options();
void _auth();
void _select_db();
redisContext* _context();
ContextUPtr _ctx;
// The time that the connection is created or the time that
// the connection is used, i.e. *context()* is called.
std::chrono::time_point<std::chrono::steady_clock> _last_active{};
ConnectionOptions _opts;
};
using ConnectionSPtr = std::shared_ptr<Connection>;
enum class Role {
MASTER,
SLAVE
};
// Inline implementaions.
template <typename ...Args>
inline void Connection::send(const char *format, Args &&...args) {
auto ctx = _context();
assert(ctx != nullptr);
if (redisAppendCommand(ctx,
format,
std::forward<Args>(args)...) != REDIS_OK) {
throw_error(*ctx, "Failed to send command");
}
assert(!broken());
}
inline redisContext* Connection::_context() {
_last_active = std::chrono::steady_clock::now();
return _ctx.get();
}
}
}
#endif // end SEWENEW_REDISPLUSPLUS_CONNECTION_H

View File

@ -0,0 +1,115 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_CONNECTION_POOL_H
#define SEWENEW_REDISPLUSPLUS_CONNECTION_POOL_H
#include <chrono>
#include <mutex>
#include <memory>
#include <condition_variable>
#include <deque>
#include "connection.h"
#include "sentinel.h"
namespace sw {
namespace redis {
struct ConnectionPoolOptions {
// Max number of connections, including both in-use and idle ones.
std::size_t size = 1;
// Max time to wait for a connection. 0ms means client waits forever.
std::chrono::milliseconds wait_timeout{0};
// Max lifetime of a connection. 0ms means we never expire the connection.
std::chrono::milliseconds connection_lifetime{0};
};
class ConnectionPool {
public:
ConnectionPool(const ConnectionPoolOptions &pool_opts,
const ConnectionOptions &connection_opts);
ConnectionPool(SimpleSentinel sentinel,
const ConnectionPoolOptions &pool_opts,
const ConnectionOptions &connection_opts);
ConnectionPool() = default;
ConnectionPool(ConnectionPool &&that);
ConnectionPool& operator=(ConnectionPool &&that);
ConnectionPool(const ConnectionPool &) = delete;
ConnectionPool& operator=(const ConnectionPool &) = delete;
~ConnectionPool() = default;
// Fetch a connection from pool.
Connection fetch();
ConnectionOptions connection_options();
void release(Connection connection);
// Create a new connection.
Connection create();
private:
void _move(ConnectionPool &&that);
// NOT thread-safe
Connection _create();
Connection _create(SimpleSentinel &sentinel, const ConnectionOptions &opts, bool locked);
Connection _fetch();
void _wait_for_connection(std::unique_lock<std::mutex> &lock);
bool _need_reconnect(const Connection &connection,
const std::chrono::milliseconds &connection_lifetime) const;
void _update_connection_opts(const std::string &host, int port) {
_opts.host = host;
_opts.port = port;
}
bool _role_changed(const ConnectionOptions &opts) const {
return opts.port != _opts.port || opts.host != _opts.host;
}
ConnectionOptions _opts;
ConnectionPoolOptions _pool_opts;
std::deque<Connection> _pool;
std::size_t _used_connections = 0;
std::mutex _mutex;
std::condition_variable _cv;
SimpleSentinel _sentinel;
};
}
}
#endif // end SEWENEW_REDISPLUSPLUS_CONNECTION_POOL_H

View File

@ -0,0 +1,159 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_ERRORS_H
#define SEWENEW_REDISPLUSPLUS_ERRORS_H
#include <exception>
#include <string>
#include <hiredis/hiredis.h>
namespace sw {
namespace redis {
enum ReplyErrorType {
ERR,
MOVED,
ASK
};
class Error : public std::exception {
public:
explicit Error(const std::string &msg) : _msg(msg) {}
Error(const Error &) = default;
Error& operator=(const Error &) = default;
Error(Error &&) = default;
Error& operator=(Error &&) = default;
virtual ~Error() = default;
virtual const char* what() const noexcept {
return _msg.data();
}
private:
std::string _msg;
};
class IoError : public Error {
public:
explicit IoError(const std::string &msg) : Error(msg) {}
IoError(const IoError &) = default;
IoError& operator=(const IoError &) = default;
IoError(IoError &&) = default;
IoError& operator=(IoError &&) = default;
virtual ~IoError() = default;
};
class TimeoutError : public IoError {
public:
explicit TimeoutError(const std::string &msg) : IoError(msg) {}
TimeoutError(const TimeoutError &) = default;
TimeoutError& operator=(const TimeoutError &) = default;
TimeoutError(TimeoutError &&) = default;
TimeoutError& operator=(TimeoutError &&) = default;
virtual ~TimeoutError() = default;
};
class ClosedError : public Error {
public:
explicit ClosedError(const std::string &msg) : Error(msg) {}
ClosedError(const ClosedError &) = default;
ClosedError& operator=(const ClosedError &) = default;
ClosedError(ClosedError &&) = default;
ClosedError& operator=(ClosedError &&) = default;
virtual ~ClosedError() = default;
};
class ProtoError : public Error {
public:
explicit ProtoError(const std::string &msg) : Error(msg) {}
ProtoError(const ProtoError &) = default;
ProtoError& operator=(const ProtoError &) = default;
ProtoError(ProtoError &&) = default;
ProtoError& operator=(ProtoError &&) = default;
virtual ~ProtoError() = default;
};
class OomError : public Error {
public:
explicit OomError(const std::string &msg) : Error(msg) {}
OomError(const OomError &) = default;
OomError& operator=(const OomError &) = default;
OomError(OomError &&) = default;
OomError& operator=(OomError &&) = default;
virtual ~OomError() = default;
};
class ReplyError : public Error {
public:
explicit ReplyError(const std::string &msg) : Error(msg) {}
ReplyError(const ReplyError &) = default;
ReplyError& operator=(const ReplyError &) = default;
ReplyError(ReplyError &&) = default;
ReplyError& operator=(ReplyError &&) = default;
virtual ~ReplyError() = default;
};
class WatchError : public Error {
public:
explicit WatchError() : Error("Watched key has been modified") {}
WatchError(const WatchError &) = default;
WatchError& operator=(const WatchError &) = default;
WatchError(WatchError &&) = default;
WatchError& operator=(WatchError &&) = default;
virtual ~WatchError() = default;
};
// MovedError and AskError are defined in shards.h
class MovedError;
class AskError;
void throw_error(redisContext &context, const std::string &err_info);
void throw_error(const redisReply &reply);
}
}
#endif // end SEWENEW_REDISPLUSPLUS_ERRORS_H

View File

@ -0,0 +1,49 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_PIPELINE_H
#define SEWENEW_REDISPLUSPLUS_PIPELINE_H
#include <cassert>
#include <vector>
#include "connection.h"
namespace sw {
namespace redis {
class PipelineImpl {
public:
template <typename Cmd, typename ...Args>
void command(Connection &connection, Cmd cmd, Args &&...args) {
assert(!connection.broken());
cmd(connection, std::forward<Args>(args)...);
}
std::vector<ReplyUPtr> exec(Connection &connection, std::size_t cmd_num);
void discard(Connection &connection, std::size_t /*cmd_num*/) {
// Reconnect to Redis to discard all commands.
connection.reconnect();
}
};
}
}
#endif // end SEWENEW_REDISPLUSPLUS_PIPELINE_H

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,208 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_QUEUED_REDIS_HPP
#define SEWENEW_REDISPLUSPLUS_QUEUED_REDIS_HPP
namespace sw {
namespace redis {
template <typename Impl>
template <typename ...Args>
QueuedRedis<Impl>::QueuedRedis(const ConnectionSPtr &connection, Args &&...args) :
_connection(connection),
_impl(std::forward<Args>(args)...) {
assert(_connection);
}
template <typename Impl>
Redis QueuedRedis<Impl>::redis() {
return Redis(_connection);
}
template <typename Impl>
template <typename Cmd, typename ...Args>
auto QueuedRedis<Impl>::command(Cmd cmd, Args &&...args)
-> typename std::enable_if<!std::is_convertible<Cmd, StringView>::value,
QueuedRedis<Impl>&>::type {
try {
_sanity_check();
_impl.command(*_connection, cmd, std::forward<Args>(args)...);
++_cmd_num;
} catch (const Error &e) {
_invalidate();
throw;
}
return *this;
}
template <typename Impl>
template <typename ...Args>
QueuedRedis<Impl>& QueuedRedis<Impl>::command(const StringView &cmd_name, Args &&...args) {
auto cmd = [](Connection &connection, const StringView &cmd_name, Args &&...args) {
CmdArgs cmd_args;
cmd_args.append(cmd_name, std::forward<Args>(args)...);
connection.send(cmd_args);
};
return command(cmd, cmd_name, std::forward<Args>(args)...);
}
template <typename Impl>
template <typename Input>
auto QueuedRedis<Impl>::command(Input first, Input last)
-> typename std::enable_if<IsIter<Input>::value, QueuedRedis<Impl>&>::type {
if (first == last) {
throw Error("command: empty range");
}
auto cmd = [](Connection &connection, Input first, Input last) {
CmdArgs cmd_args;
while (first != last) {
cmd_args.append(*first);
++first;
}
connection.send(cmd_args);
};
return command(cmd, first, last);
}
template <typename Impl>
QueuedReplies QueuedRedis<Impl>::exec() {
try {
_sanity_check();
auto replies = _impl.exec(*_connection, _cmd_num);
_rewrite_replies(replies);
_reset();
return QueuedReplies(std::move(replies));
} catch (const Error &e) {
_invalidate();
throw;
}
}
template <typename Impl>
void QueuedRedis<Impl>::discard() {
try {
_sanity_check();
_impl.discard(*_connection, _cmd_num);
_reset();
} catch (const Error &e) {
_invalidate();
throw;
}
}
template <typename Impl>
void QueuedRedis<Impl>::_sanity_check() const {
if (!_valid) {
throw Error("Not in valid state");
}
if (_connection->broken()) {
throw Error("Connection is broken");
}
}
template <typename Impl>
inline void QueuedRedis<Impl>::_reset() {
_cmd_num = 0;
_set_cmd_indexes.clear();
_georadius_cmd_indexes.clear();
}
template <typename Impl>
void QueuedRedis<Impl>::_invalidate() {
_valid = false;
_reset();
}
template <typename Impl>
void QueuedRedis<Impl>::_rewrite_replies(std::vector<ReplyUPtr> &replies) const {
_rewrite_replies(_set_cmd_indexes, reply::rewrite_set_reply, replies);
_rewrite_replies(_georadius_cmd_indexes, reply::rewrite_georadius_reply, replies);
}
template <typename Impl>
template <typename Func>
void QueuedRedis<Impl>::_rewrite_replies(const std::vector<std::size_t> &indexes,
Func rewriter,
std::vector<ReplyUPtr> &replies) const {
for (auto idx : indexes) {
assert(idx < replies.size());
auto &reply = replies[idx];
assert(reply);
rewriter(*reply);
}
}
inline std::size_t QueuedReplies::size() const {
return _replies.size();
}
inline redisReply& QueuedReplies::get(std::size_t idx) {
_index_check(idx);
auto &reply = _replies[idx];
assert(reply);
return *reply;
}
template <typename Result>
inline Result QueuedReplies::get(std::size_t idx) {
auto &reply = get(idx);
return reply::parse<Result>(reply);
}
template <typename Output>
inline void QueuedReplies::get(std::size_t idx, Output output) {
auto &reply = get(idx);
reply::to_array(reply, output);
}
inline void QueuedReplies::_index_check(std::size_t idx) const {
if (idx >= size()) {
throw Error("Out of range");
}
}
}
}
#endif // end SEWENEW_REDISPLUSPLUS_QUEUED_REDIS_HPP

View File

@ -0,0 +1,25 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_REDISPLUSPLUS_H
#define SEWENEW_REDISPLUSPLUS_REDISPLUSPLUS_H
#include "redis.h"
#include "redis_cluster.h"
#include "queued_redis.h"
#include "sentinel.h"
#endif // end SEWENEW_REDISPLUSPLUS_REDISPLUSPLUS_H

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,363 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_REPLY_H
#define SEWENEW_REDISPLUSPLUS_REPLY_H
#include <cassert>
#include <string>
#include <memory>
#include <functional>
#include <tuple>
#include <hiredis/hiredis.h>
#include "errors.h"
#include "utils.h"
namespace sw {
namespace redis {
struct ReplyDeleter {
void operator()(redisReply *reply) const {
if (reply != nullptr) {
freeReplyObject(reply);
}
}
};
using ReplyUPtr = std::unique_ptr<redisReply, ReplyDeleter>;
namespace reply {
template <typename T>
struct ParseTag {};
template <typename T>
inline T parse(redisReply &reply) {
return parse(ParseTag<T>(), reply);
}
void parse(ParseTag<void>, redisReply &reply);
std::string parse(ParseTag<std::string>, redisReply &reply);
long long parse(ParseTag<long long>, redisReply &reply);
double parse(ParseTag<double>, redisReply &reply);
bool parse(ParseTag<bool>, redisReply &reply);
template <typename T>
Optional<T> parse(ParseTag<Optional<T>>, redisReply &reply);
template <typename T, typename U>
std::pair<T, U> parse(ParseTag<std::pair<T, U>>, redisReply &reply);
template <typename ...Args>
std::tuple<Args...> parse(ParseTag<std::tuple<Args...>>, redisReply &reply);
template <typename T, typename std::enable_if<IsSequenceContainer<T>::value, int>::type = 0>
T parse(ParseTag<T>, redisReply &reply);
template <typename T, typename std::enable_if<IsAssociativeContainer<T>::value, int>::type = 0>
T parse(ParseTag<T>, redisReply &reply);
template <typename Output>
long long parse_scan_reply(redisReply &reply, Output output);
inline bool is_error(redisReply &reply) {
return reply.type == REDIS_REPLY_ERROR;
}
inline bool is_nil(redisReply &reply) {
return reply.type == REDIS_REPLY_NIL;
}
inline bool is_string(redisReply &reply) {
return reply.type == REDIS_REPLY_STRING;
}
inline bool is_status(redisReply &reply) {
return reply.type == REDIS_REPLY_STATUS;
}
inline bool is_integer(redisReply &reply) {
return reply.type == REDIS_REPLY_INTEGER;
}
inline bool is_array(redisReply &reply) {
return reply.type == REDIS_REPLY_ARRAY;
}
std::string to_status(redisReply &reply);
template <typename Output>
void to_array(redisReply &reply, Output output);
// Rewrite set reply to bool type
void rewrite_set_reply(redisReply &reply);
// Rewrite georadius reply to OptionalLongLong type
void rewrite_georadius_reply(redisReply &reply);
template <typename Output>
auto parse_xpending_reply(redisReply &reply, Output output)
-> std::tuple<long long, OptionalString, OptionalString>;
}
// Inline implementations.
namespace reply {
namespace detail {
template <typename Output>
void to_array(redisReply &reply, Output output) {
if (!is_array(reply)) {
throw ProtoError("Expect ARRAY reply");
}
if (reply.element == nullptr) {
// Empty array.
return;
}
for (std::size_t idx = 0; idx != reply.elements; ++idx) {
auto *sub_reply = reply.element[idx];
if (sub_reply == nullptr) {
throw ProtoError("Null array element reply");
}
*output = parse<typename IterType<Output>::type>(*sub_reply);
++output;
}
}
bool is_flat_array(redisReply &reply);
template <typename Output>
void to_flat_array(redisReply &reply, Output output) {
if (reply.element == nullptr) {
// Empty array.
return;
}
if (reply.elements % 2 != 0) {
throw ProtoError("Not string pair array reply");
}
for (std::size_t idx = 0; idx != reply.elements; idx += 2) {
auto *key_reply = reply.element[idx];
auto *val_reply = reply.element[idx + 1];
if (key_reply == nullptr || val_reply == nullptr) {
throw ProtoError("Null string array reply");
}
using Pair = typename IterType<Output>::type;
using FirstType = typename std::decay<typename Pair::first_type>::type;
using SecondType = typename std::decay<typename Pair::second_type>::type;
*output = std::make_pair(parse<FirstType>(*key_reply),
parse<SecondType>(*val_reply));
++output;
}
}
template <typename Output>
void to_array(std::true_type, redisReply &reply, Output output) {
if (is_flat_array(reply)) {
to_flat_array(reply, output);
} else {
to_array(reply, output);
}
}
template <typename Output>
void to_array(std::false_type, redisReply &reply, Output output) {
to_array(reply, output);
}
template <typename T>
std::tuple<T> parse_tuple(redisReply **reply, std::size_t idx) {
assert(reply != nullptr);
auto *sub_reply = reply[idx];
if (sub_reply == nullptr) {
throw ProtoError("Null reply");
}
return std::make_tuple(parse<T>(*sub_reply));
}
template <typename T, typename ...Args>
auto parse_tuple(redisReply **reply, std::size_t idx) ->
typename std::enable_if<sizeof...(Args) != 0, std::tuple<T, Args...>>::type {
assert(reply != nullptr);
return std::tuple_cat(parse_tuple<T>(reply, idx),
parse_tuple<Args...>(reply, idx + 1));
}
}
template <typename T>
Optional<T> parse(ParseTag<Optional<T>>, redisReply &reply) {
if (reply::is_nil(reply)) {
return {};
}
return Optional<T>(parse<T>(reply));
}
template <typename T, typename U>
std::pair<T, U> parse(ParseTag<std::pair<T, U>>, redisReply &reply) {
if (!is_array(reply)) {
throw ProtoError("Expect ARRAY reply");
}
if (reply.elements != 2) {
throw ProtoError("NOT key-value PAIR reply");
}
if (reply.element == nullptr) {
throw ProtoError("Null PAIR reply");
}
auto *first = reply.element[0];
auto *second = reply.element[1];
if (first == nullptr || second == nullptr) {
throw ProtoError("Null pair reply");
}
return std::make_pair(parse<typename std::decay<T>::type>(*first),
parse<typename std::decay<U>::type>(*second));
}
template <typename ...Args>
std::tuple<Args...> parse(ParseTag<std::tuple<Args...>>, redisReply &reply) {
constexpr auto size = sizeof...(Args);
static_assert(size > 0, "DO NOT support parsing tuple with 0 element");
if (!is_array(reply)) {
throw ProtoError("Expect ARRAY reply");
}
if (reply.elements != size) {
throw ProtoError("Expect tuple reply with " + std::to_string(size) + "elements");
}
if (reply.element == nullptr) {
throw ProtoError("Null TUPLE reply");
}
return detail::parse_tuple<Args...>(reply.element, 0);
}
template <typename T, typename std::enable_if<IsSequenceContainer<T>::value, int>::type>
T parse(ParseTag<T>, redisReply &reply) {
if (!is_array(reply)) {
throw ProtoError("Expect ARRAY reply");
}
T container;
to_array(reply, std::back_inserter(container));
return container;
}
template <typename T, typename std::enable_if<IsAssociativeContainer<T>::value, int>::type>
T parse(ParseTag<T>, redisReply &reply) {
if (!is_array(reply)) {
throw ProtoError("Expect ARRAY reply");
}
T container;
to_array(reply, std::inserter(container, container.end()));
return container;
}
template <typename Output>
long long parse_scan_reply(redisReply &reply, Output output) {
if (reply.elements != 2 || reply.element == nullptr) {
throw ProtoError("Invalid scan reply");
}
auto *cursor_reply = reply.element[0];
auto *data_reply = reply.element[1];
if (cursor_reply == nullptr || data_reply == nullptr) {
throw ProtoError("Invalid cursor reply or data reply");
}
auto cursor_str = reply::parse<std::string>(*cursor_reply);
auto new_cursor = 0;
try {
new_cursor = std::stoll(cursor_str);
} catch (const std::exception &e) {
throw ProtoError("Invalid cursor reply: " + cursor_str);
}
reply::to_array(*data_reply, output);
return new_cursor;
}
template <typename Output>
void to_array(redisReply &reply, Output output) {
if (!is_array(reply)) {
throw ProtoError("Expect ARRAY reply");
}
detail::to_array(typename IsKvPairIter<Output>::type(), reply, output);
}
template <typename Output>
auto parse_xpending_reply(redisReply &reply, Output output)
-> std::tuple<long long, OptionalString, OptionalString> {
if (!is_array(reply) || reply.elements != 4) {
throw ProtoError("expect array reply with 4 elements");
}
for (std::size_t idx = 0; idx != reply.elements; ++idx) {
if (reply.element[idx] == nullptr) {
throw ProtoError("null array reply");
}
}
auto num = parse<long long>(*(reply.element[0]));
auto start = parse<OptionalString>(*(reply.element[1]));
auto end = parse<OptionalString>(*(reply.element[2]));
auto &entry_reply = *(reply.element[3]);
if (!is_nil(entry_reply)) {
to_array(entry_reply, output);
}
return std::make_tuple(num, std::move(start), std::move(end));
}
}
}
}
#endif // end SEWENEW_REDISPLUSPLUS_REPLY_H

View File

@ -0,0 +1,138 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_SENTINEL_H
#define SEWENEW_REDISPLUSPLUS_SENTINEL_H
#include <string>
#include <list>
#include <vector>
#include <memory>
#include <mutex>
#include "connection.h"
#include "shards.h"
#include "reply.h"
namespace sw {
namespace redis {
struct SentinelOptions {
std::vector<std::pair<std::string, int>> nodes;
std::string password;
bool keep_alive = true;
std::chrono::milliseconds connect_timeout{100};
std::chrono::milliseconds socket_timeout{100};
std::chrono::milliseconds retry_interval{100};
std::size_t max_retry = 2;
};
class Sentinel {
public:
explicit Sentinel(const SentinelOptions &sentinel_opts);
Sentinel(const Sentinel &) = delete;
Sentinel& operator=(const Sentinel &) = delete;
Sentinel(Sentinel &&) = delete;
Sentinel& operator=(Sentinel &&) = delete;
~Sentinel() = default;
private:
Connection master(const std::string &master_name, const ConnectionOptions &opts);
Connection slave(const std::string &master_name, const ConnectionOptions &opts);
class Iterator;
friend class SimpleSentinel;
std::list<ConnectionOptions> _parse_options(const SentinelOptions &opts) const;
Optional<Node> _get_master_addr_by_name(Connection &connection, const StringView &name);
std::vector<Node> _get_slave_addr_by_name(Connection &connection, const StringView &name);
Connection _connect_redis(const Node &node, ConnectionOptions opts);
Role _get_role(Connection &connection);
std::vector<Node> _parse_slave_info(redisReply &reply) const;
std::list<Connection> _healthy_sentinels;
std::list<ConnectionOptions> _broken_sentinels;
SentinelOptions _sentinel_opts;
std::mutex _mutex;
};
class SimpleSentinel {
public:
SimpleSentinel(const std::shared_ptr<Sentinel> &sentinel,
const std::string &master_name,
Role role);
SimpleSentinel() = default;
SimpleSentinel(const SimpleSentinel &) = default;
SimpleSentinel& operator=(const SimpleSentinel &) = default;
SimpleSentinel(SimpleSentinel &&) = default;
SimpleSentinel& operator=(SimpleSentinel &&) = default;
~SimpleSentinel() = default;
explicit operator bool() const {
return bool(_sentinel);
}
Connection create(const ConnectionOptions &opts);
private:
std::shared_ptr<Sentinel> _sentinel;
std::string _master_name;
Role _role = Role::MASTER;
};
class StopIterError : public Error {
public:
StopIterError() : Error("StopIterError") {}
StopIterError(const StopIterError &) = default;
StopIterError& operator=(const StopIterError &) = default;
StopIterError(StopIterError &&) = default;
StopIterError& operator=(StopIterError &&) = default;
virtual ~StopIterError() = default;
};
}
}
#endif // end SEWENEW_REDISPLUSPLUS_SENTINEL_H

View File

@ -0,0 +1,115 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_SHARDS_H
#define SEWENEW_REDISPLUSPLUS_SHARDS_H
#include <string>
#include <map>
#include "errors.h"
namespace sw {
namespace redis {
using Slot = std::size_t;
struct SlotRange {
Slot min;
Slot max;
};
inline bool operator<(const SlotRange &lhs, const SlotRange &rhs) {
return lhs.max < rhs.max;
}
struct Node {
std::string host;
int port;
};
inline bool operator==(const Node &lhs, const Node &rhs) {
return lhs.host == rhs.host && lhs.port == rhs.port;
}
struct NodeHash {
std::size_t operator()(const Node &node) const noexcept {
auto host_hash = std::hash<std::string>{}(node.host);
auto port_hash = std::hash<int>{}(node.port);
return host_hash ^ (port_hash << 1);
}
};
using Shards = std::map<SlotRange, Node>;
class RedirectionError : public ReplyError {
public:
RedirectionError(const std::string &msg);
RedirectionError(const RedirectionError &) = default;
RedirectionError& operator=(const RedirectionError &) = default;
RedirectionError(RedirectionError &&) = default;
RedirectionError& operator=(RedirectionError &&) = default;
virtual ~RedirectionError() = default;
Slot slot() const {
return _slot;
}
const Node& node() const {
return _node;
}
private:
std::pair<Slot, Node> _parse_error(const std::string &msg) const;
Slot _slot = 0;
Node _node;
};
class MovedError : public RedirectionError {
public:
explicit MovedError(const std::string &msg) : RedirectionError(msg) {}
MovedError(const MovedError &) = default;
MovedError& operator=(const MovedError &) = default;
MovedError(MovedError &&) = default;
MovedError& operator=(MovedError &&) = default;
virtual ~MovedError() = default;
};
class AskError : public RedirectionError {
public:
explicit AskError(const std::string &msg) : RedirectionError(msg) {}
AskError(const AskError &) = default;
AskError& operator=(const AskError &) = default;
AskError(AskError &&) = default;
AskError& operator=(AskError &&) = default;
virtual ~AskError() = default;
};
}
}
#endif // end SEWENEW_REDISPLUSPLUS_SHARDS_H

View File

@ -0,0 +1,137 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_SHARDS_POOL_H
#define SEWENEW_REDISPLUSPLUS_SHARDS_POOL_H
#include <cassert>
#include <unordered_map>
#include <string>
#include <random>
#include <memory>
#include "reply.h"
#include "connection_pool.h"
#include "shards.h"
namespace sw {
namespace redis {
using ConnectionPoolSPtr = std::shared_ptr<ConnectionPool>;
class GuardedConnection {
public:
GuardedConnection(const ConnectionPoolSPtr &pool) : _pool(pool),
_connection(_pool->fetch()) {
assert(!_connection.broken());
}
GuardedConnection(const GuardedConnection &) = delete;
GuardedConnection& operator=(const GuardedConnection &) = delete;
GuardedConnection(GuardedConnection &&) = default;
GuardedConnection& operator=(GuardedConnection &&) = default;
~GuardedConnection() {
_pool->release(std::move(_connection));
}
Connection& connection() {
return _connection;
}
private:
ConnectionPoolSPtr _pool;
Connection _connection;
};
class ShardsPool {
public:
ShardsPool() = default;
ShardsPool(const ShardsPool &that) = delete;
ShardsPool& operator=(const ShardsPool &that) = delete;
ShardsPool(ShardsPool &&that);
ShardsPool& operator=(ShardsPool &&that);
~ShardsPool() = default;
ShardsPool(const ConnectionPoolOptions &pool_opts,
const ConnectionOptions &connection_opts);
// Fetch a connection by key.
GuardedConnection fetch(const StringView &key);
// Randomly pick a connection.
GuardedConnection fetch();
// Fetch a connection by node.
GuardedConnection fetch(const Node &node);
void update();
ConnectionOptions connection_options(const StringView &key);
ConnectionOptions connection_options();
private:
void _move(ShardsPool &&that);
void _init_pool(const Shards &shards);
Shards _cluster_slots(Connection &connection) const;
ReplyUPtr _cluster_slots_command(Connection &connection) const;
Shards _parse_reply(redisReply &reply) const;
std::pair<SlotRange, Node> _parse_slot_info(redisReply &reply) const;
// Get slot by key.
std::size_t _slot(const StringView &key) const;
// Randomly pick a slot.
std::size_t _slot() const;
ConnectionPoolSPtr& _get_pool(Slot slot);
GuardedConnection _fetch(Slot slot);
ConnectionOptions _connection_options(Slot slot);
using NodeMap = std::unordered_map<Node, ConnectionPoolSPtr, NodeHash>;
NodeMap::iterator _add_node(const Node &node);
ConnectionPoolOptions _pool_opts;
ConnectionOptions _connection_opts;
Shards _shards;
NodeMap _pools;
std::mutex _mutex;
static const std::size_t SHARDS = 16383;
};
}
}
#endif // end SEWENEW_REDISPLUSPLUS_SHARDS_POOL_H

View File

@ -0,0 +1,231 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_SUBSCRIBER_H
#define SEWENEW_REDISPLUSPLUS_SUBSCRIBER_H
#include <unordered_map>
#include <string>
#include <functional>
#include "connection.h"
#include "reply.h"
#include "command.h"
#include "utils.h"
namespace sw {
namespace redis {
// @NOTE: Subscriber is NOT thread-safe.
// Subscriber uses callbacks to handle messages. There are 6 kinds of messages:
// 1) MESSAGE: message sent to a channel.
// 2) PMESSAGE: message sent to channels of a given pattern.
// 3) SUBSCRIBE: meta message sent when we successfully subscribe to a channel.
// 4) UNSUBSCRIBE: meta message sent when we successfully unsubscribe to a channel.
// 5) PSUBSCRIBE: meta message sent when we successfully subscribe to a channel pattern.
// 6) PUNSUBSCRIBE: meta message sent when we successfully unsubscribe to a channel pattern.
//
// Use Subscriber::on_message(MsgCallback) to set the callback function for message of
// *MESSAGE* type, and the callback interface is:
// void (std::string channel, std::string msg)
//
// Use Subscriber::on_pmessage(PatternMsgCallback) to set the callback function for message of
// *PMESSAGE* type, and the callback interface is:
// void (std::string pattern, std::string channel, std::string msg)
//
// Messages of other types are called *META MESSAGE*, they have the same callback interface.
// Use Subscriber::on_meta(MetaCallback) to set the callback function:
// void (Subscriber::MsgType type, OptionalString channel, long long num)
//
// NOTE: If we haven't subscribe/psubscribe to any channel/pattern, and try to
// unsubscribe/punsubscribe without any parameter, i.e. unsubscribe/punsubscribe all
// channels/patterns, *channel* will be null. So the second parameter of meta callback
// is of type *OptionalString*.
//
// All these callback interfaces pass std::string by value, and you can take their ownership
// (i.e. std::move) safely.
//
// If you don't set callback for a specific kind of message, Subscriber::consume() will
// receive the message, and ignore it, i.e. no callback will be called.
class Subscriber {
public:
Subscriber(const Subscriber &) = delete;
Subscriber& operator=(const Subscriber &) = delete;
Subscriber(Subscriber &&) = default;
Subscriber& operator=(Subscriber &&) = default;
~Subscriber() = default;
enum class MsgType {
SUBSCRIBE,
UNSUBSCRIBE,
PSUBSCRIBE,
PUNSUBSCRIBE,
MESSAGE,
PMESSAGE
};
template <typename MsgCb>
void on_message(MsgCb msg_callback);
template <typename PMsgCb>
void on_pmessage(PMsgCb pmsg_callback);
template <typename MetaCb>
void on_meta(MetaCb meta_callback);
void subscribe(const StringView &channel);
template <typename Input>
void subscribe(Input first, Input last);
template <typename T>
void subscribe(std::initializer_list<T> channels) {
subscribe(channels.begin(), channels.end());
}
void unsubscribe();
void unsubscribe(const StringView &channel);
template <typename Input>
void unsubscribe(Input first, Input last);
template <typename T>
void unsubscribe(std::initializer_list<T> channels) {
unsubscribe(channels.begin(), channels.end());
}
void psubscribe(const StringView &pattern);
template <typename Input>
void psubscribe(Input first, Input last);
template <typename T>
void psubscribe(std::initializer_list<T> channels) {
psubscribe(channels.begin(), channels.end());
}
void punsubscribe();
void punsubscribe(const StringView &channel);
template <typename Input>
void punsubscribe(Input first, Input last);
template <typename T>
void punsubscribe(std::initializer_list<T> channels) {
punsubscribe(channels.begin(), channels.end());
}
void consume();
private:
friend class Redis;
friend class RedisCluster;
explicit Subscriber(Connection connection);
MsgType _msg_type(redisReply *reply) const;
void _check_connection();
void _handle_message(redisReply &reply);
void _handle_pmessage(redisReply &reply);
void _handle_meta(MsgType type, redisReply &reply);
using MsgCallback = std::function<void (std::string channel, std::string msg)>;
using PatternMsgCallback = std::function<void (std::string pattern,
std::string channel,
std::string msg)>;
using MetaCallback = std::function<void (MsgType type,
OptionalString channel,
long long num)>;
using TypeIndex = std::unordered_map<std::string, MsgType>;
static const TypeIndex _msg_type_index;
Connection _connection;
MsgCallback _msg_callback = nullptr;
PatternMsgCallback _pmsg_callback = nullptr;
MetaCallback _meta_callback = nullptr;
};
template <typename MsgCb>
void Subscriber::on_message(MsgCb msg_callback) {
_msg_callback = msg_callback;
}
template <typename PMsgCb>
void Subscriber::on_pmessage(PMsgCb pmsg_callback) {
_pmsg_callback = pmsg_callback;
}
template <typename MetaCb>
void Subscriber::on_meta(MetaCb meta_callback) {
_meta_callback = meta_callback;
}
template <typename Input>
void Subscriber::subscribe(Input first, Input last) {
if (first == last) {
return;
}
_check_connection();
cmd::subscribe_range(_connection, first, last);
}
template <typename Input>
void Subscriber::unsubscribe(Input first, Input last) {
_check_connection();
cmd::unsubscribe_range(_connection, first, last);
}
template <typename Input>
void Subscriber::psubscribe(Input first, Input last) {
if (first == last) {
return;
}
_check_connection();
cmd::psubscribe_range(_connection, first, last);
}
template <typename Input>
void Subscriber::punsubscribe(Input first, Input last) {
_check_connection();
cmd::punsubscribe_range(_connection, first, last);
}
}
}
#endif // end SEWENEW_REDISPLUSPLUS_SUBSCRIBER_H

View File

@ -0,0 +1,77 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_TRANSACTION_H
#define SEWENEW_REDISPLUSPLUS_TRANSACTION_H
#include <cassert>
#include <vector>
#include "connection.h"
#include "errors.h"
namespace sw {
namespace redis {
class TransactionImpl {
public:
explicit TransactionImpl(bool piped) : _piped(piped) {}
template <typename Cmd, typename ...Args>
void command(Connection &connection, Cmd cmd, Args &&...args);
std::vector<ReplyUPtr> exec(Connection &connection, std::size_t cmd_num);
void discard(Connection &connection, std::size_t cmd_num);
private:
void _open_transaction(Connection &connection);
void _close_transaction();
void _get_queued_reply(Connection &connection);
void _get_queued_replies(Connection &connection, std::size_t cmd_num);
std::vector<ReplyUPtr> _exec(Connection &connection);
void _discard(Connection &connection);
bool _in_transaction = false;
bool _piped;
};
template <typename Cmd, typename ...Args>
void TransactionImpl::command(Connection &connection, Cmd cmd, Args &&...args) {
assert(!connection.broken());
if (!_in_transaction) {
_open_transaction(connection);
}
cmd(connection, std::forward<Args>(args)...);
if (!_piped) {
_get_queued_reply(connection);
}
}
}
}
#endif // end SEWENEW_REDISPLUSPLUS_TRANSACTION_H

View File

@ -0,0 +1,269 @@
/**************************************************************************
Copyright (c) 2017 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/
#ifndef SEWENEW_REDISPLUSPLUS_UTILS_H
#define SEWENEW_REDISPLUSPLUS_UTILS_H
#include <cstring>
#include <string>
#include <type_traits>
namespace sw {
namespace redis {
// By now, not all compilers support std::string_view,
// so we make our own implementation.
class StringView {
public:
constexpr StringView() noexcept = default;
constexpr StringView(const char *data, std::size_t size) : _data(data), _size(size) {}
StringView(const char *data) : _data(data), _size(std::strlen(data)) {}
StringView(const std::string &str) : _data(str.data()), _size(str.size()) {}
constexpr StringView(const StringView &) noexcept = default;
StringView& operator=(const StringView &) noexcept = default;
constexpr const char* data() const noexcept {
return _data;
}
constexpr std::size_t size() const noexcept {
return _size;
}
private:
const char *_data = nullptr;
std::size_t _size = 0;
};
template <typename T>
class Optional {
public:
Optional() = default;
Optional(const Optional &) = default;
Optional& operator=(const Optional &) = default;
Optional(Optional &&) = default;
Optional& operator=(Optional &&) = default;
~Optional() = default;
template <typename ...Args>
explicit Optional(Args &&...args) : _value(true, T(std::forward<Args>(args)...)) {}
explicit operator bool() const {
return _value.first;
}
T& value() {
return _value.second;
}
const T& value() const {
return _value.second;
}
T* operator->() {
return &(_value.second);
}
const T* operator->() const {
return &(_value.second);
}
T& operator*() {
return _value.second;
}
const T& operator*() const {
return _value.second;
}
private:
std::pair<bool, T> _value;
};
using OptionalString = Optional<std::string>;
using OptionalLongLong = Optional<long long>;
using OptionalDouble = Optional<double>;
using OptionalStringPair = Optional<std::pair<std::string, std::string>>;
template <typename ...>
struct IsKvPair : std::false_type {};
template <typename T, typename U>
struct IsKvPair<std::pair<T, U>> : std::true_type {};
template <typename ...>
using Void = void;
template <typename T, typename U = Void<>>
struct IsInserter : std::false_type {};
template <typename T>
//struct IsInserter<T, Void<typename T::container_type>> : std::true_type {};
struct IsInserter<T,
typename std::enable_if<!std::is_void<typename T::container_type>::value>::type>
: std::true_type {};
template <typename Iter, typename T = Void<>>
struct IterType {
using type = typename std::iterator_traits<Iter>::value_type;
};
template <typename Iter>
//struct IterType<Iter, Void<typename Iter::container_type>> {
struct IterType<Iter,
//typename std::enable_if<std::is_void<typename Iter::value_type>::value>::type> {
typename std::enable_if<IsInserter<Iter>::value>::type> {
using type = typename std::decay<typename Iter::container_type::value_type>::type;
};
template <typename Iter, typename T = Void<>>
struct IsIter : std::false_type {};
template <typename Iter>
struct IsIter<Iter, typename std::enable_if<IsInserter<Iter>::value>::type> : std::true_type {};
template <typename Iter>
//struct IsIter<Iter, Void<typename std::iterator_traits<Iter>::iterator_category>>
struct IsIter<Iter,
typename std::enable_if<!std::is_void<
typename std::iterator_traits<Iter>::value_type>::value>::type>
: std::integral_constant<bool, !std::is_convertible<Iter, StringView>::value> {};
template <typename T>
struct IsKvPairIter : IsKvPair<typename IterType<T>::type> {};
template <typename T, typename Tuple>
struct TupleWithType : std::false_type {};
template <typename T>
struct TupleWithType<T, std::tuple<>> : std::false_type {};
template <typename T, typename U, typename ...Args>
struct TupleWithType<T, std::tuple<U, Args...>> : TupleWithType<T, std::tuple<Args...>> {};
template <typename T, typename ...Args>
struct TupleWithType<T, std::tuple<T, Args...>> : std::true_type {};
template <std::size_t ...Is>
struct IndexSequence {};
template <std::size_t I, std::size_t ...Is>
struct MakeIndexSequence : MakeIndexSequence<I - 1, I - 1, Is...> {};
template <std::size_t ...Is>
struct MakeIndexSequence<0, Is...> : IndexSequence<Is...> {};
// NthType and NthValue are taken from
// https://stackoverflow.com/questions/14261183
template <std::size_t I, typename ...Args>
struct NthType {};
template <typename Arg, typename ...Args>
struct NthType<0, Arg, Args...> {
using type = Arg;
};
template <std::size_t I, typename Arg, typename ...Args>
struct NthType<I, Arg, Args...> {
using type = typename NthType<I - 1, Args...>::type;
};
template <typename ...Args>
struct LastType {
using type = typename NthType<sizeof...(Args) - 1, Args...>::type;
};
struct InvalidLastType {};
template <>
struct LastType<> {
using type = InvalidLastType;
};
template <std::size_t I, typename Arg, typename ...Args>
auto NthValue(Arg &&arg, Args &&...)
-> typename std::enable_if<(I == 0), decltype(std::forward<Arg>(arg))>::type {
return std::forward<Arg>(arg);
}
template <std::size_t I, typename Arg, typename ...Args>
auto NthValue(Arg &&, Args &&...args)
-> typename std::enable_if<(I > 0),
decltype(std::forward<typename NthType<I, Arg, Args...>::type>(
std::declval<typename NthType<I, Arg, Args...>::type>()))>::type {
return std::forward<typename NthType<I, Arg, Args...>::type>(
NthValue<I - 1>(std::forward<Args>(args)...));
}
template <typename ...Args>
auto LastValue(Args &&...args)
-> decltype(std::forward<typename LastType<Args...>::type>(
std::declval<typename LastType<Args...>::type>())) {
return std::forward<typename LastType<Args...>::type>(
NthValue<sizeof...(Args) - 1>(std::forward<Args>(args)...));
}
template <typename T, typename = Void<>>
struct HasPushBack : std::false_type {};
template <typename T>
struct HasPushBack<T,
typename std::enable_if<
std::is_void<decltype(
std::declval<T>().push_back(std::declval<typename T::value_type>())
)>::value>::type> : std::true_type {};
template <typename T, typename = Void<>>
struct HasInsert : std::false_type {};
template <typename T>
struct HasInsert<T,
typename std::enable_if<
std::is_same<
decltype(std::declval<T>().insert(std::declval<typename T::const_iterator>(),
std::declval<typename T::value_type>())),
typename T::iterator>::value>::type> : std::true_type {};
template <typename T>
struct IsSequenceContainer
: std::integral_constant<bool,
HasPushBack<T>::value
&& !std::is_same<typename std::decay<T>::type, std::string>::value> {};
template <typename T>
struct IsAssociativeContainer
: std::integral_constant<bool,
HasInsert<T>::value && !HasPushBack<T>::value> {};
uint16_t crc16(const char *buf, int len);
}
}
#endif // end SEWENEW_REDISPLUSPLUS_UTILS_H