ZeroTierOne/controller/RabbitMQ.cpp

121 lines
3.0 KiB
C++
Raw Normal View History

/*
2019-08-23 16:23:39 +00:00
* Copyright (c)2019 ZeroTier, Inc.
*
2019-08-23 16:23:39 +00:00
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file in the project's root directory.
*
2019-08-23 16:23:39 +00:00
* Change Date: 2023-01-01
*
2019-08-23 16:23:39 +00:00
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2.0 of the Apache License.
*/
2019-08-23 16:23:39 +00:00
/****/
2019-03-05 23:11:50 +00:00
#include "RabbitMQ.hpp"
2019-03-11 18:16:44 +00:00
#ifdef ZT_CONTROLLER_USE_LIBPQ
2019-03-05 23:11:50 +00:00
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <stdexcept>
#include <cstring>
namespace ZeroTier
{
RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
2019-08-06 12:51:50 +00:00
: _mqc(cfg)
, _qName(queueName)
, _socket(NULL)
, _status(0)
2019-08-23 16:23:39 +00:00
{
2019-03-05 23:11:50 +00:00
}
RabbitMQ::~RabbitMQ()
{
2019-08-06 12:51:50 +00:00
amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(_conn);
2019-03-05 23:11:50 +00:00
}
void RabbitMQ::init()
{
2019-08-06 12:51:50 +00:00
struct timeval tval;
memset(&tval, 0, sizeof(struct timeval));
tval.tv_sec = 5;
2019-03-05 23:11:50 +00:00
2019-08-06 12:51:50 +00:00
fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
_conn = amqp_new_connection();
_socket = amqp_tcp_socket_new(_conn);
if (!_socket) {
throw std::runtime_error("Can't create socket for RabbitMQ");
}
2019-08-23 16:23:39 +00:00
2019-08-06 12:51:50 +00:00
_status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
if (_status) {
throw std::runtime_error("Can't connect to RabbitMQ");
}
2019-08-23 16:23:39 +00:00
2019-08-06 12:51:50 +00:00
amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
_mqc->username, _mqc->password);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("RabbitMQ Login Error");
}
2019-03-05 23:11:50 +00:00
2019-08-06 12:51:50 +00:00
static int chan = 0;
2019-03-08 18:29:36 +00:00
{
Mutex::Lock l(_chan_m);
2019-08-06 12:51:50 +00:00
_channel = ++chan;
}
amqp_channel_open(_conn, _channel);
r = amqp_get_rpc_reply(_conn);
if(r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error opening communication channel");
}
2019-08-23 16:23:39 +00:00
2019-08-06 12:51:50 +00:00
_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
r = amqp_get_rpc_reply(_conn);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error declaring queue " + std::string(_qName));
2019-03-08 18:29:36 +00:00
}
2019-03-05 23:11:50 +00:00
2019-08-06 12:51:50 +00:00
amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
r = amqp_get_rpc_reply(_conn);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error consuming queue " + std::string(_qName));
}
fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
2019-03-05 23:11:50 +00:00
}
std::string RabbitMQ::consume()
{
2019-08-06 12:51:50 +00:00
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(_conn);
2019-03-05 23:11:50 +00:00
2019-08-06 12:51:50 +00:00
struct timeval timeout;
timeout.tv_sec = 1;
timeout.tv_usec = 0;
2019-08-06 12:51:50 +00:00
res = amqp_consume_message(_conn, &envelope, &timeout, 0);
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
// timeout waiting for message. Return empty string
return "";
} else {
throw std::runtime_error("Error getting message");
}
}
2019-03-05 23:11:50 +00:00
2019-08-06 12:51:50 +00:00
std::string msg(
(const char*)envelope.message.body.bytes,
envelope.message.body.len
);
amqp_destroy_envelope(&envelope);
return msg;
2019-03-05 23:11:50 +00:00
}
2019-03-08 18:29:36 +00:00
}
2019-03-11 18:16:44 +00:00
#endif // ZT_CONTROLLER_USE_LIBPQ