#include "serval.h"
#include "dataformats.h"
#include "rhizome.h"
#include "message_ply.h"
#include "log.h"
#include "debug.h"
#include "conf.h"
#include "overlay_buffer.h"

static int message_ply_load_manifest(const keyring_identity *id, struct message_ply *ply, rhizome_manifest *m)
{
  assert(ply->known_bid);
  if (rhizome_retrieve_manifest(&ply->bundle_id, m) != RHIZOME_BUNDLE_STATUS_SAME)
    return 1;
  rhizome_authenticate_author(m);
  if (!m->haveSecret || m->authorship != AUTHOR_AUTHENTIC)
    return -1;
  assert(m->author_identity == id);
  ply->author=m->author;
  return 0;
}

static int message_ply_fill_manifest(const keyring_identity *id, const sid_t *recipient, struct message_ply *ply, rhizome_manifest *m)
{
  assert(!ply->found);
  rhizome_manifest_set_sender(m, id->box_pk);
  rhizome_manifest_set_recipient(m, recipient);
  rhizome_manifest_set_filesize(m, 0);
  rhizome_manifest_set_tail(m, 0);
  rhizome_manifest_set_author_identity(m, id);
  int ret=-1;
  struct rhizome_bundle_result result = rhizome_fill_manifest(m, NULL);
  switch (result.status) {
    case RHIZOME_BUNDLE_STATUS_NEW:
    case RHIZOME_BUNDLE_STATUS_SAME:
    case RHIZOME_BUNDLE_STATUS_DUPLICATE:
      ret = 0;
      break;
    case RHIZOME_BUNDLE_STATUS_ERROR:
    case RHIZOME_BUNDLE_STATUS_INVALID:
    case RHIZOME_BUNDLE_STATUS_INCONSISTENT:
      WHYF("Error creating ply manifest: %s", alloca_rhizome_bundle_result(result));
      break;
    case RHIZOME_BUNDLE_STATUS_BUSY:
      // TODO
    case RHIZOME_BUNDLE_STATUS_OLD:
    case RHIZOME_BUNDLE_STATUS_FAKE:
    case RHIZOME_BUNDLE_STATUS_NO_ROOM:
    case RHIZOME_BUNDLE_STATUS_MANIFEST_TOO_BIG:
      WARNF("Cannot create ply manifest: %s", alloca_rhizome_bundle_result(result));
      break;
    case RHIZOME_BUNDLE_STATUS_READONLY:
      INFOF("Cannot create ply manifest: %s", alloca_rhizome_bundle_result(result));
      break;
  }
  rhizome_bundle_result_free(&result);
  if (ret==0){
    assert(m->haveSecret);
    assert(!recipient || m->payloadEncryption == PAYLOAD_ENCRYPTED);
    ply->bundle_id = m->keypair.public_key;
    ply->author = m->author;
    ply->found = ply->known_bid = 1;
  }
  return ret;
}

int message_ply_write_open(
  struct message_ply_write *ply_write,
  const struct keyring_identity *id,
  const char *service,
  const sid_t *recipient,
  struct message_ply *ply,
  const char *name,
  unsigned nassignments,
  const struct rhizome_manifest_field_assignment *assignments,
  uint64_t advance_by)
{
  bzero(ply_write, sizeof(*ply_write));
  ply_write->m = rhizome_new_manifest();
  if (!ply_write->m)
    return -1;

  if (ply->known_bid){
    switch(message_ply_load_manifest(id, ply, ply_write->m)){
      case 0:
	ply->found = 1;
	break;
      case 1:
	ply->found = 0;
	break;
      default:
	return -1;
    }
  }

  // TODO add sender name?
  // if recipient, actual sender & name should be encrypted...
  if (name)
    rhizome_manifest_set_name(ply_write->m, name);

  struct rhizome_bundle_result result = rhizome_apply_assignments(ply_write->m, nassignments, assignments);
  if (result.status != RHIZOME_BUNDLE_STATUS_NEW){
    WARNF("Cannot create message ply manifest: %s", alloca_rhizome_bundle_result(result));
    rhizome_bundle_result_free(&result);
    return -1;
  }
  rhizome_bundle_result_free(&result);

  if (!ply->found){
    rhizome_manifest_set_service(ply_write->m, service);
    if (ply->known_bid)
      rhizome_manifest_set_id(ply_write->m, &ply->bundle_id);
    if (message_ply_fill_manifest(id, recipient, ply, ply_write->m)!=0)
      return -1;
  }

  enum rhizome_payload_status status = rhizome_write_open_journal(&ply_write->write, ply_write->m, advance_by, RHIZOME_SIZE_UNSET);
  if (status != RHIZOME_PAYLOAD_STATUS_NEW)
    return -1;

  return 0;
}

int message_ply_write_finish(struct message_ply_write *ply_write, struct message_ply *ply)
{
  enum rhizome_payload_status status = rhizome_finish_write(&ply_write->write);
  status = rhizome_finish_store(&ply_write->write, ply_write->m, status);
  // There's a chance that the new payload will actually be identical to one that already exists
  if (status != RHIZOME_PAYLOAD_STATUS_NEW && status != RHIZOME_PAYLOAD_STATUS_STORED){
    WHYF("Failed to write payload: %s", rhizome_payload_status_message(status));
    return -1;
  }
  rhizome_manifest *mout = NULL;
  struct rhizome_bundle_result result = rhizome_manifest_finalise(ply_write->m, &mout, 1);
  if (result.status != RHIZOME_BUNDLE_STATUS_NEW){
    WHYF("Cannot create message ply manifest: %s", alloca_rhizome_bundle_result(result));
    rhizome_bundle_result_free(&result);
    return -1;
  }

  if (ply){
    ply->version = ply_write->m->version;
    ply->tail = ply_write->m->tail;
    ply->size = ply_write->m->filesize;
  }

  rhizome_bundle_result_free(&result);
  if (mout && mout!=ply_write->m){
    rhizome_manifest_free(ply_write->m);
    ply_write->m = mout;
  }
  return 0;
}

void message_ply_write_close(struct message_ply_write *ply_write)
{
  rhizome_fail_write(&ply_write->write);
  if (ply_write->m)
    rhizome_manifest_free(ply_write->m);
  ply_write->m = NULL;
}

int message_ply_append(const keyring_identity *id, const char *service, const sid_t *recipient, struct message_ply *ply, struct overlay_buffer *b,
  const char *name, unsigned nassignments, const struct rhizome_manifest_field_assignment *assignments)
{
  struct message_ply_write write;
  int ret=-1;

  assert(!ob_overrun(b));

  if (message_ply_write_open(&write, id, service, recipient, ply, name, nassignments, assignments, 0) == -1)
    goto end;
  DEBUGF2(meshms, meshmb, "Appending %zu bytes @%"PRIu64,
    ob_position(b), write.write.written_offset);
  if (rhizome_write_buffer(&write.write, ob_ptr(b), ob_position(b)) == -1)
    goto end;
  ret = message_ply_write_finish(&write, ply);
end:
  message_ply_write_close(&write);
  return ret;
}

int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid, const sign_keypair_t *keypair)
{

  rhizome_manifest *m = rhizome_new_manifest();
  if (!m)
    return -1;

  int ret=-1;
  enum rhizome_bundle_status status;
  if (keypair){
    DEBUGF2(meshms, meshmb, "Opening ply %s", alloca_tohex_rhizome_bid_t(keypair->public_key));
    struct rhizome_bundle_result result;
    result = rhizome_private_bundle(m, keypair);
    status = result.status;
    rhizome_bundle_result_free(&result);
  }else{
    DEBUGF2(meshms, meshmb, "Opening ply %s", alloca_tohex_rhizome_bid_t(*bid));
    status = rhizome_retrieve_manifest(bid, m);
  }

  if (status == RHIZOME_BUNDLE_STATUS_SAME
    && rhizome_open_decrypt_read(m, &ply->read) == RHIZOME_PAYLOAD_STATUS_STORED){

    assert(m->filesize != RHIZOME_SIZE_UNSET);
    ply->bundle_id = m->keypair.public_key;
    rhizome_lookup_author(m);
    if (m->authorship == AUTHOR_LOCAL || m->authorship == AUTHOR_REMOTE)
      ply->author = m->author;
    else if(m->has_sender)
      ply->author = m->sender;
    else
      bzero(&ply->author, sizeof(ply->author));
    ply->read.offset = ply->read.length = m->filesize;
    if (m->name && *m->name)
      ply->name = str_edup(m->name);
    ret = 0;
  }
  rhizome_manifest_free(m);
  return ret;
}

void message_ply_read_rewind(struct message_ply_read *ply)
{
  ply->read.offset = ply->read.length;
}

int message_ply_is_open(struct message_ply_read *ply)
{
  return ply->read.length>0;
}

void message_ply_read_close(struct message_ply_read *ply)
{
  if (ply->record){
    free(ply->record);
    ply->record=NULL;
  }
  if (ply->name){
    free((void*)ply->name);
    ply->name = NULL;
  }
  ply->record_size=0;
  ply->buff.len=0;
  rhizome_read_close(&ply->read);
}

// read the next record from the ply (backwards)
// returns -1 if there is an error, or if at the end of records
int message_ply_read_prev(struct message_ply_read *ply)
{
  ply->record_end_offset = ply->read.offset;
  uint8_t footer[2];
  if (ply->read.offset <= sizeof footer) {
    DEBUGF2(meshms, meshmb, "EOF");
    return -1;
  }
  ply->read.offset -= sizeof footer;
  ssize_t read = rhizome_read_buffered(&ply->read, &ply->buff, footer, sizeof footer);
  if (read == -1)
    return WHYF("rhizome_read_buffered() failed");
  if ((size_t) read != sizeof footer)
    return WHYF("Expected %zu bytes read, got %zu", (size_t) sizeof footer, (size_t) read);
  // (rhizome_read automatically advances the offset by the number of bytes read)
  {
    uint16_t r = read_uint16(footer);
    ply->type = r & 0xF;
    ply->record_length = r >> 4;
  }
  DEBUGF2(meshms, meshmb, "Found record %d, length %d @%"PRId64" - @%"PRId64,
    ply->type, ply->record_length, ply->record_end_offset - (ply->record_length + sizeof footer), ply->record_end_offset);
  // need to allow for advancing the tail and cutting a message in half.
  if (ply->record_length + sizeof footer > ply->read.offset){
    DEBUGF2(meshms, meshmb, "EOF");
    return -1;
  }
  ply->read.offset -= ply->record_length + sizeof(footer);
  uint64_t record_start = ply->read.offset;
  if (ply->record_size < ply->record_length){
    ply->record_size = ply->record_length;
    unsigned char *b = erealloc(ply->record, ply->record_size);
    if (!b)
      return -1;
    ply->record = b;
  }
  read = rhizome_read_buffered(&ply->read, &ply->buff, ply->record, ply->record_length);
  if (read == -1)
    return WHYF("rhizome_read_buffered() failed");
  if ((size_t) read != ply->record_length)
    return WHYF("Expected %u bytes read, got %zu", ply->record_length, (size_t) read);
  ply->read.offset = record_start;
  return 0;
}

int message_ply_parse_timestamp(struct message_ply_read *ply, time_s_t *timestamp)
{
  assert(ply->type == MESSAGE_BLOCK_TYPE_TIME);
  if (ply->record_length<4)
    return -1;
  *timestamp = read_uint32(ply->record);
  return 0;
}

int message_ply_parse_ack(struct message_ply_read *ply, struct message_ply_ack *ack)
{
  int ofs=0;
  bzero(ack, sizeof *ack);

  int r = unpack_uint(&ply->record[ofs], ply->record_length, &ack->end_offset);
  if (r == -1)
    return -1;
  ofs+=r;
  uint64_t length;
  r = unpack_uint(&ply->record[ofs], ply->record_length - ofs, &length);
  if (r == -1)
    return 0;
  ack->start_offset = ack->end_offset - length;
  ofs += r;
  if (ofs < ply->record_length){
    ack->binary = &ply->record[ofs];
    ack->binary_length = ply->record_length - ofs;
  }
  return 0;
}

// keep reading past messages until you find this type.
int message_ply_find_prev(struct message_ply_read *ply, const char message_type)
{
  int ret;
  while ((ret = message_ply_read_prev(ply)) == 0 && ply->type != message_type)
    ;
  return ret;
}

static void append_footer(struct overlay_buffer *b, char type)
{
  size_t message_len = ob_position(b) - ob_mark(b);
  assert(message_len <= MESSAGE_PLY_MAX_LEN);
  ob_append_ui16_rv(b, (message_len << 4) | (type&0xF));
}

// append a timestamp as a uint32_t with 1s precision
void message_ply_append_timestamp(struct overlay_buffer *b)
{
  if (!config.rhizome.reliable_clock)
    return;
  ob_checkpoint(b);
  ob_append_ui32_rv(b, gettime());
  append_footer(b, MESSAGE_BLOCK_TYPE_TIME);
}

void message_ply_append_ack(struct overlay_buffer *b, const struct message_ply_ack *ack)
{
  assert(ack->binary_length == 0 || ack->binary);
  ob_checkpoint(b);
  ob_append_packed_ui64(b, ack->end_offset);
  // append the number of bytes acked (should be smaller than an absolute offset)
  if (ack->start_offset || ack->binary_length)
    ob_append_packed_ui64(b, ack->end_offset - ack->start_offset);
  if (ack->binary_length)
    ob_append_bytes(b, ack->binary, ack->binary_length);
  append_footer(b, MESSAGE_BLOCK_TYPE_ACK);
}

void message_ply_append_message(struct overlay_buffer *b, const char *message, size_t message_len)
{
  ob_checkpoint(b);
  ob_append_strn(b, message, message_len);
  append_footer(b, MESSAGE_BLOCK_TYPE_MESSAGE);
}