Ack incoming feeds for threading

This commit is contained in:
Jeremy Lakeman 2017-02-14 10:45:06 +10:30
parent ecc77b4088
commit 940b8c2d4b
6 changed files with 114 additions and 23 deletions

108
meshmb.c
View File

@ -26,6 +26,9 @@ struct meshmb_feeds{
struct tree_root root;
keyring_identity *id;
sign_keypair_t bundle_keypair;
sign_keypair_t ack_bundle_keypair;
rhizome_manifest *ack_manifest;
struct rhizome_write ack_writer;
bool_t dirty;
uint8_t generation;
};
@ -34,7 +37,34 @@ struct meshmb_feeds{
#define MAX_NAME_LEN (256) // ??
#define MAX_MSG_LEN (256) // ??
static void update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metadata, struct message_ply_read *reader)
static int finish_ack_writing(struct meshmb_feeds *feeds){
if (!feeds->ack_manifest)
return 0;
int ret=-1;
DEBUGF(meshmb, "Completing private ply for ack thread");
enum rhizome_payload_status status = rhizome_finish_write(&feeds->ack_writer);
status = rhizome_finish_store(&feeds->ack_writer, feeds->ack_manifest, status);
if (status == RHIZOME_PAYLOAD_STATUS_NEW){
rhizome_manifest *mout=NULL;
struct rhizome_bundle_result result = rhizome_manifest_finalise(feeds->ack_manifest, &mout, 0);
if (mout && mout!=feeds->ack_manifest)
rhizome_manifest_free(mout);
if (result.status == RHIZOME_BUNDLE_STATUS_NEW)
ret = 0;
rhizome_bundle_result_free(&result);
}
bzero(&feeds->ack_writer, sizeof feeds->ack_writer);
rhizome_manifest_free(feeds->ack_manifest);
feeds->ack_manifest = NULL;
return ret;
}
static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metadata, struct message_ply_read *reader)
{
if (!metadata->ply.found){
// get the current size from the db
@ -45,17 +75,18 @@ static void update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metad
END) == SQLITE_ROW)
metadata->ply.found = 1;
else
return;
return -1;
}
DEBUGF(meshmb, "Size from %u to %u", metadata->size, metadata->ply.size);
if (metadata->size == metadata->ply.size)
return;
return 0;
if (!message_ply_is_open(reader)
&& message_ply_read_open(reader, &metadata->ply.bundle_id)!=0)
return;
return -1;
// TODO remember if user has overridden the name?
// TODO allow the user to specify an overridden name?
if (metadata->details.name){
free((void*)metadata->details.name);
metadata->details.name = NULL;
@ -71,7 +102,11 @@ static void update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metad
reader->read.offset = reader->read.length;
time_s_t timestamp = 0;
uint64_t last_offset = metadata->last_message_offset;
while (message_ply_read_prev(reader) == 0){
if (reader->record_end_offset <= metadata->size)
break;
if (reader->type == MESSAGE_BLOCK_TYPE_TIME){
if (reader->record_length<4){
WARN("Malformed ply, expected 4 byte timestamp");
@ -83,7 +118,7 @@ static void update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metad
if (metadata->last_message_offset == reader->record_end_offset)
break;
metadata->last_message_offset = reader->record_end_offset;
last_offset = reader->record_end_offset;
if (metadata->details.last_message)
free((void*)metadata->details.last_message);
@ -96,13 +131,57 @@ static void update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metad
}
}
DEBUGF(meshmb, "Last message from %u to %u", metadata->last_message_offset, last_offset);
if (last_offset > metadata->last_message_offset){
// add an ack to our journal to thread new messages
if (!feeds->ack_manifest){
rhizome_manifest *m = rhizome_new_manifest();
DEBUGF(meshmb, "Opening private ply for ack thread");
struct rhizome_bundle_result result = rhizome_private_bundle(m, &feeds->ack_bundle_keypair);
switch(result.status){
case RHIZOME_BUNDLE_STATUS_NEW:
rhizome_manifest_set_tail(m, 0);
rhizome_manifest_set_filesize(m, 0);
case RHIZOME_BUNDLE_STATUS_SAME:
{
enum rhizome_payload_status pstatus = rhizome_write_open_journal(&feeds->ack_writer, m, 0, RHIZOME_SIZE_UNSET);
if (pstatus==RHIZOME_PAYLOAD_STATUS_NEW)
break;
}
// fallthrough
case RHIZOME_BUNDLE_STATUS_BUSY:
rhizome_bundle_result_free(&result);
rhizome_manifest_free(m);
return -1;
default:
// everything else should be impossible.
FATALF("Cannot create manifest: %s", alloca_rhizome_bundle_result(result));
}
rhizome_bundle_result_free(&result);
feeds->ack_manifest = m;
}
{
struct overlay_buffer *b = ob_new();
message_ply_append_ack(b, last_offset, metadata->last_message_offset, &metadata->ply.bundle_id);
int r = rhizome_write_buffer(&feeds->ack_writer, ob_ptr(b), ob_position(b));
DEBUGF(meshmb, "Acked incoming messages");
ob_free(b);
if (r == -1)
return -1;
}
}
metadata->last_message_offset = last_offset;
metadata->size = metadata->ply.size;
// TODO assemble ACK list for unified reading....?
feeds->dirty=1;
return;
return 1;
}
// TODO, might be quicker to fetch all meshmb bundles and test if they are in the feed list
@ -182,6 +261,8 @@ static int write_metadata(void **record, void *context)
int meshmb_flush(struct meshmb_feeds *feeds)
{
finish_ack_writing(feeds);
if (!feeds->dirty){
DEBUGF(meshmb, "Ignoring flush, not dirty");
return feeds->generation;
@ -378,9 +459,16 @@ int meshmb_open(keyring_identity *id, struct meshmb_feeds **feeds)
(*feeds)->id = id;
rhizome_manifest *m = rhizome_new_manifest();
if (m){
// deterministic bundle id's for storing active follow / ignore state;
crypto_seed_keypair(&(*feeds)->bundle_keypair,
"91656c3d62e9fe2678a1a81fabe3f413%s5a37120ca55d911634560e4d4dc1283f",
alloca_tohex(id->sign_keypair->private_key.binary, sizeof id->sign_keypair->private_key));
// and for threading incoming feed messages;
crypto_seed_keypair(&(*feeds)->ack_bundle_keypair,
"de3f2e21d9735d41b1fd7ddf03a58f2b%s937a440c12f9478d026bbf579ab115c0",
alloca_tohex(id->sign_keypair->private_key.binary, sizeof id->sign_keypair->private_key));
struct rhizome_bundle_result result = rhizome_private_bundle(m, &(*feeds)->bundle_keypair);
DEBUGF(meshmb, "Private bundle %s, %s",
alloca_tohex_identity_t(&(*feeds)->bundle_keypair.public_key),

View File

@ -304,7 +304,7 @@ static enum meshms_status update_conversation(const keyring_identity *id, struct
unsigned char buffer[30];
struct overlay_buffer *b = ob_static(buffer, sizeof buffer);
message_ply_append_ack(b, conv->metadata.their_last_message, conv->metadata.my_last_ack);
message_ply_append_ack(b, conv->metadata.their_last_message, conv->metadata.my_last_ack, NULL);
message_ply_append_timestamp(b);
assert(!ob_overrun(b));
@ -566,7 +566,7 @@ static enum meshms_status write_known_conversations(rhizome_manifest *m, struct
rhizome_manifest_set_filehash(m, &write.id);
rhizome_manifest_set_filesize(m, write.file_length);
struct rhizome_bundle_result result = rhizome_manifest_finalise(m, &mout, 1);
struct rhizome_bundle_result result = rhizome_manifest_finalise(m, &mout, 0);
switch (result.status) {
case RHIZOME_BUNDLE_STATUS_ERROR:
// error is already logged
@ -917,7 +917,7 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie
uint8_t ack = (c->metadata.my_last_ack < c->metadata.their_last_message) ? 1:0;
DEBUGF(meshms,"Our ack %"PRIu64", their message %"PRIu64, c->metadata.my_last_ack, c->metadata.their_last_message);
if (ack)
message_ply_append_ack(b, c->metadata.their_last_message, c->metadata.my_last_ack);
message_ply_append_ack(b, c->metadata.their_last_message, c->metadata.my_last_ack, NULL);
message_ply_append_message(b, message, message_len);
message_ply_append_timestamp(b);

View File

@ -287,13 +287,15 @@ void message_ply_append_timestamp(struct overlay_buffer *b)
append_footer(b, MESSAGE_BLOCK_TYPE_TIME);
}
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset)
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, rhizome_bid_t *bid)
{
ob_checkpoint(b);
ob_append_packed_ui64(b, message_offset);
// append the number of bytes acked (should be smaller than an absolute offset)
if (previous_ack_offset)
if (previous_ack_offset || bid)
ob_append_packed_ui64(b, message_offset - previous_ack_offset);
if (bid)
ob_append_bytes(b, bid->binary, sizeof *bid);
append_footer(b, MESSAGE_BLOCK_TYPE_ACK);
}

View File

@ -41,7 +41,6 @@ struct message_ply_read {
struct message_ply_write{
rhizome_manifest *m;
struct rhizome_write write;
};
int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid);
@ -65,7 +64,7 @@ int message_ply_write_open(
int message_ply_write_finish(struct message_ply_write *write);
void message_ply_write_close(struct message_ply_write *write);
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset);
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, rhizome_bid_t *bid);
void message_ply_append_timestamp(struct overlay_buffer *b);
void message_ply_append_message(struct overlay_buffer *b, const char *message, size_t message_len);
int message_ply_append(const struct keyring_identity *id, const char *service, const sid_t *recipient, struct message_ply *ply, struct overlay_buffer *b,

View File

@ -875,6 +875,7 @@ enum rhizome_payload_status rhizome_write_open_manifest(struct rhizome_write *wr
enum rhizome_payload_status rhizome_write_open_journal(struct rhizome_write *write, rhizome_manifest *m, uint64_t advance_by, uint64_t append_size);
int rhizome_write_file(struct rhizome_write *write, const char *filename, off_t offset, uint64_t length);
void rhizome_fail_write(struct rhizome_write *write);
int is_rhizome_write_open(const struct rhizome_write *write);
enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write);
enum rhizome_payload_status rhizome_finish_store(struct rhizome_write *write, rhizome_manifest *m, enum rhizome_payload_status status);
enum rhizome_payload_status rhizome_import_payload_from_file(rhizome_manifest *m, const char *filepath);

View File

@ -690,6 +690,11 @@ int rhizome_write_file(struct rhizome_write *write, const char *filename, off_t
return ret;
}
int is_rhizome_write_open(const struct rhizome_write *write)
{
return write->temp_id ? 1:0;
}
void rhizome_fail_write(struct rhizome_write *write)
{
if (write->blob_fd != -1){
@ -711,6 +716,7 @@ void rhizome_fail_write(struct rhizome_write *write)
write->buffer_list=n->_next;
free(n);
}
write->temp_id=0;
}
static int keep_hash(struct rhizome_write *write_state, struct crypto_hash_sha512_state *hash_state)
@ -1827,12 +1833,6 @@ enum rhizome_payload_status rhizome_write_open_journal(struct rhizome_write *wri
enum rhizome_payload_status rhizome_finish_store(struct rhizome_write *write, rhizome_manifest *m, enum rhizome_payload_status status)
{
DEBUGF(rhizome, "write=%p m=manifest %p, status=%d %s", write, m, status, rhizome_payload_status_message_nonnull(status));
switch (status) {
case RHIZOME_PAYLOAD_STATUS_NEW:
break;
default:
break;
}
int status_valid = 0;
switch (status) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
@ -1890,6 +1890,7 @@ enum rhizome_payload_status rhizome_append_journal_buffer(rhizome_manifest *m, u
{
struct rhizome_write write;
bzero(&write, sizeof write);
assert(advance_by || (buffer && len));
enum rhizome_payload_status status = rhizome_write_open_journal(&write, m, advance_by, (uint64_t) len);
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
return status;