mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2025-01-03 03:36:41 +00:00
135 lines
3.7 KiB
C++
135 lines
3.7 KiB
C++
/*
|
|
* ZeroTier One - Network Virtualization Everywhere
|
|
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*
|
|
* --
|
|
*
|
|
* You can be released from the requirements of the license by purchasing
|
|
* a commercial license. Buying such a license is mandatory as soon as you
|
|
* develop commercial closed-source software that incorporates or links
|
|
* directly against ZeroTier software without disclosing the source code
|
|
* of your own application.
|
|
*/
|
|
|
|
|
|
#include "RabbitMQ.hpp"
|
|
|
|
#ifdef ZT_CONTROLLER_USE_LIBPQ
|
|
|
|
#include <amqp.h>
|
|
#include <amqp_tcp_socket.h>
|
|
#include <stdexcept>
|
|
#include <cstring>
|
|
|
|
namespace ZeroTier
|
|
{
|
|
|
|
RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
|
|
: _mqc(cfg)
|
|
, _qName(queueName)
|
|
, _socket(NULL)
|
|
, _status(0)
|
|
{
|
|
}
|
|
|
|
RabbitMQ::~RabbitMQ()
|
|
{
|
|
amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
|
|
amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
|
|
amqp_destroy_connection(_conn);
|
|
}
|
|
|
|
void RabbitMQ::init()
|
|
{
|
|
struct timeval tval;
|
|
memset(&tval, 0, sizeof(struct timeval));
|
|
tval.tv_sec = 5;
|
|
|
|
fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
|
|
_conn = amqp_new_connection();
|
|
_socket = amqp_tcp_socket_new(_conn);
|
|
if (!_socket) {
|
|
throw std::runtime_error("Can't create socket for RabbitMQ");
|
|
}
|
|
|
|
_status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
|
|
if (_status) {
|
|
throw std::runtime_error("Can't connect to RabbitMQ");
|
|
}
|
|
|
|
amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
|
|
_mqc->username, _mqc->password);
|
|
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
|
throw std::runtime_error("RabbitMQ Login Error");
|
|
}
|
|
|
|
static int chan = 0;
|
|
{
|
|
Mutex::Lock l(_chan_m);
|
|
_channel = ++chan;
|
|
}
|
|
amqp_channel_open(_conn, _channel);
|
|
r = amqp_get_rpc_reply(_conn);
|
|
if(r.reply_type != AMQP_RESPONSE_NORMAL) {
|
|
throw std::runtime_error("Error opening communication channel");
|
|
}
|
|
|
|
_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
|
|
r = amqp_get_rpc_reply(_conn);
|
|
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
|
throw std::runtime_error("Error declaring queue " + std::string(_qName));
|
|
}
|
|
|
|
amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
|
|
r = amqp_get_rpc_reply(_conn);
|
|
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
|
throw std::runtime_error("Error consuming queue " + std::string(_qName));
|
|
}
|
|
fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
|
|
}
|
|
|
|
std::string RabbitMQ::consume()
|
|
{
|
|
amqp_rpc_reply_t res;
|
|
amqp_envelope_t envelope;
|
|
amqp_maybe_release_buffers(_conn);
|
|
|
|
struct timeval timeout;
|
|
timeout.tv_sec = 1;
|
|
timeout.tv_usec = 0;
|
|
|
|
res = amqp_consume_message(_conn, &envelope, &timeout, 0);
|
|
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
|
|
if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
|
|
// timeout waiting for message. Return empty string
|
|
return "";
|
|
} else {
|
|
throw std::runtime_error("Error getting message");
|
|
}
|
|
}
|
|
|
|
std::string msg(
|
|
(const char*)envelope.message.body.bytes,
|
|
envelope.message.body.len
|
|
);
|
|
amqp_destroy_envelope(&envelope);
|
|
return msg;
|
|
}
|
|
|
|
}
|
|
|
|
#endif // ZT_CONTROLLER_USE_LIBPQ
|