#include "RabbitMQ.hpp"

#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <stdexcept>
#include <cstring>

namespace ZeroTier
{

RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
    : _mqc(cfg)
    , _qName(queueName)
    , _socket(NULL)
    , _status(0)
{   
}

RabbitMQ::~RabbitMQ()
{
    amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
    amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(_conn);
}

void RabbitMQ::init()
{
    struct timeval tval;
    memset(&tval, 0, sizeof(struct timeval));
    tval.tv_sec = 5;

    fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
    _conn = amqp_new_connection();
    _socket = amqp_tcp_socket_new(_conn);
    if (!_socket) {
        throw std::runtime_error("Can't create socket for RabbitMQ");
    }
    
    _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
    if (_status) {
        throw std::runtime_error("Can't connect to RabbitMQ");
    }
    
    amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
        _mqc->username, _mqc->password);
    if (r.reply_type != AMQP_RESPONSE_NORMAL) {
        throw std::runtime_error("RabbitMQ Login Error");
    }

    static int chan = 0;
    _channel = ++chan;
    amqp_channel_open(_conn, _channel);
    r = amqp_get_rpc_reply(_conn);
    if(r.reply_type != AMQP_RESPONSE_NORMAL) {
        throw std::runtime_error("Error opening communication channel");
    }
    
    _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
    r = amqp_get_rpc_reply(_conn);
    if (r.reply_type != AMQP_RESPONSE_NORMAL) {
        throw std::runtime_error("Error declaring queue " + std::string(_qName));
    }

    amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
    r = amqp_get_rpc_reply(_conn);
    if (r.reply_type != AMQP_RESPONSE_NORMAL) {
        throw std::runtime_error("Error consuming queue " + std::string(_qName));
    }
    fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
}

std::string RabbitMQ::consume()
{
    amqp_rpc_reply_t res;
    amqp_envelope_t envelope;
    amqp_maybe_release_buffers(_conn);

    res = amqp_consume_message(_conn, &envelope, NULL, 0);
    if (res.reply_type != AMQP_RESPONSE_NORMAL) {
        throw std::runtime_error("Error getting message");
    }

    std::string msg(
        (const char*)envelope.message.body.bytes,
        envelope.message.body.len
    );
    amqp_destroy_envelope(&envelope);
    return msg;
}

}