From f060e4dc597e04c4462f19f47df599aa2dc09eed Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Wed, 22 Mar 2017 12:52:18 +1030 Subject: [PATCH] Add Java api for meshmb restful activity --- .../servaldna/ServalDClient.java | 11 +++ .../servaldna/meshmb/MeshMBActivityList.java | 59 +++++++++++++ .../meshmb/MeshMBActivityMessage.java | 51 +++++++++++ .../servaldna/meshmb/MeshMBCommon.java | 4 +- .../servaldna/meshmb/PlyMessage.java | 4 +- meshmb.c | 86 +++++++++++-------- meshmb.h | 3 +- meshmb_cli.c | 4 +- meshmb_restful.c | 43 ++++++---- message_ply.c | 7 +- 10 files changed, 211 insertions(+), 61 deletions(-) create mode 100644 java-api/src/org/servalproject/servaldna/meshmb/MeshMBActivityList.java create mode 100644 java-api/src/org/servalproject/servaldna/meshmb/MeshMBActivityMessage.java diff --git a/java-api/src/org/servalproject/servaldna/ServalDClient.java b/java-api/src/org/servalproject/servaldna/ServalDClient.java index b91e29db..398d9a7d 100644 --- a/java-api/src/org/servalproject/servaldna/ServalDClient.java +++ b/java-api/src/org/servalproject/servaldna/ServalDClient.java @@ -24,6 +24,7 @@ import org.servalproject.codec.Base64; import org.servalproject.servaldna.keyring.KeyringCommon; import org.servalproject.servaldna.keyring.KeyringIdentity; import org.servalproject.servaldna.keyring.KeyringIdentityList; +import org.servalproject.servaldna.meshmb.MeshMBActivityList; import org.servalproject.servaldna.meshmb.MeshMBCommon; import org.servalproject.servaldna.meshmb.MeshMBSubscriptionList; import org.servalproject.servaldna.meshmb.MessagePlyList; @@ -219,6 +220,16 @@ public class ServalDClient implements ServalDHttpConnectionFactory { return list; } + public MeshMBActivityList meshmbActivity(Subscriber identity) throws IOException, ServalDInterfaceException { + return meshmbActivity(identity, null); + } + + public MeshMBActivityList meshmbActivity(Subscriber identity, String token) throws IOException, ServalDInterfaceException { + MeshMBActivityList list = new MeshMBActivityList(this, identity, token); + list.connect(); + return list; + } + // interface ServalDHttpConnectionFactory public HttpURLConnection newServalDHttpConnection(String path) throws ServalDInterfaceException, IOException { diff --git a/java-api/src/org/servalproject/servaldna/meshmb/MeshMBActivityList.java b/java-api/src/org/servalproject/servaldna/meshmb/MeshMBActivityList.java new file mode 100644 index 00000000..fee69015 --- /dev/null +++ b/java-api/src/org/servalproject/servaldna/meshmb/MeshMBActivityList.java @@ -0,0 +1,59 @@ +package org.servalproject.servaldna.meshmb; + +import org.servalproject.json.JSONTableScanner; +import org.servalproject.servaldna.AbstractJsonList; +import org.servalproject.servaldna.ServalDHttpConnectionFactory; +import org.servalproject.servaldna.ServalDInterfaceException; +import org.servalproject.servaldna.SigningKey; +import org.servalproject.servaldna.Subscriber; +import org.servalproject.servaldna.SubscriberId; + +import java.io.IOException; +import java.util.Map; + +/** + * Created by jeremy on 21/03/17. + */ + +public class MeshMBActivityList extends AbstractJsonList { + private final Subscriber identity; + private final String token; + + public MeshMBActivityList(ServalDHttpConnectionFactory httpConnector, Subscriber identity, String token) { + super(httpConnector, new JSONTableScanner() + .addColumn(".token", String.class) + .addColumn("ack_offset", Long.class) + .addColumn("id", SigningKey.class) + .addColumn("author", SubscriberId.class) + .addColumn("name", String.class) + .addColumn("timestamp", Long.class) + .addColumn("offset", Long.class) + .addColumn("message", String.class)); + this.identity = identity; + this.token = token; + } + + @Override + protected String getUrl() { + if (token == null) + return "/restful/meshmb/" + identity.signingKey.toHex() + "/activity.json"; + if (token.equals("")) + return "/restful/meshmb/" + identity.signingKey.toHex() + "/activity/activity.json"; + return "/restful/meshmb/" + identity.signingKey.toHex() + "/activity/"+token+"/activity.json"; + } + + @Override + protected MeshMBActivityMessage factory(Map row, long rowCount) throws ServalDInterfaceException { + return new MeshMBActivityMessage( + (String) row.get(".token"), + (Long) row.get("ack_offset"), + new Subscriber((SubscriberId)row.get("author"), + (SigningKey) row.get("id"), + true), + (String) row.get("name"), + (Long) row.get("timestamp"), + (Long) row.get("offset"), + (String) row.get("message") + ); + } +} diff --git a/java-api/src/org/servalproject/servaldna/meshmb/MeshMBActivityMessage.java b/java-api/src/org/servalproject/servaldna/meshmb/MeshMBActivityMessage.java new file mode 100644 index 00000000..0ee6a39d --- /dev/null +++ b/java-api/src/org/servalproject/servaldna/meshmb/MeshMBActivityMessage.java @@ -0,0 +1,51 @@ +package org.servalproject.servaldna.meshmb; + +import org.servalproject.servaldna.Subscriber; + +import java.util.Date; + +/** + * Created by jeremy on 21/03/17. + */ + +public class MeshMBActivityMessage implements Comparable{ + public final String token; + public final long ack_offset; + public final Subscriber subscriber; + public final String name; + public final long offset; + public final Date date; + public final long timestamp; + public final String text; + + public MeshMBActivityMessage(String token, + long ack_offset, + Subscriber subscriber, + String name, + long timestamp, + long offset, + String text){ + this.token = token; + this.ack_offset = ack_offset; + this.subscriber = subscriber; + this.name = name; + this.offset = offset; + this.date = new Date(timestamp * 1000); + this.timestamp = timestamp; + this.text = text; + } + + @Override + public int compareTo(MeshMBActivityMessage message) { + if (this.ack_offset < message.ack_offset) + return 1; + if (this.ack_offset > message.ack_offset) + return -1; + if (this.offset < message.offset) + return 1; + if (this.offset > message.offset) + return -1; + return 0; + } + +} diff --git a/java-api/src/org/servalproject/servaldna/meshmb/MeshMBCommon.java b/java-api/src/org/servalproject/servaldna/meshmb/MeshMBCommon.java index bbd1ea97..391fc7da 100644 --- a/java-api/src/org/servalproject/servaldna/meshmb/MeshMBCommon.java +++ b/java-api/src/org/servalproject/servaldna/meshmb/MeshMBCommon.java @@ -29,14 +29,14 @@ public class MeshMBCommon { } public static int ignore(ServalDHttpConnectionFactory connector, Subscriber id, SigningKey peer) throws ServalDInterfaceException, IOException { - HttpURLConnection conn = connector.newServalDHttpConnection("/restful/meshmb/" + id.signingKey.toHex() + "/follow/" + peer.toHex()); + HttpURLConnection conn = connector.newServalDHttpConnection("/restful/meshmb/" + id.signingKey.toHex() + "/ignore/" + peer.toHex()); conn.setRequestMethod("POST"); conn.connect(); return conn.getResponseCode(); } public static int follow(ServalDHttpConnectionFactory connector, Subscriber id, SigningKey peer) throws ServalDInterfaceException, IOException { - HttpURLConnection conn = connector.newServalDHttpConnection("/restful/meshmb/" + id.signingKey.toHex() + "/ignore/" + peer.toHex()); + HttpURLConnection conn = connector.newServalDHttpConnection("/restful/meshmb/" + id.signingKey.toHex() + "/follow/" + peer.toHex()); conn.setRequestMethod("POST"); conn.connect(); return conn.getResponseCode(); diff --git a/java-api/src/org/servalproject/servaldna/meshmb/PlyMessage.java b/java-api/src/org/servalproject/servaldna/meshmb/PlyMessage.java index 2b18960e..5bd918d4 100644 --- a/java-api/src/org/servalproject/servaldna/meshmb/PlyMessage.java +++ b/java-api/src/org/servalproject/servaldna/meshmb/PlyMessage.java @@ -21,6 +21,8 @@ public class PlyMessage implements Comparable{ @Override public int compareTo(PlyMessage plyMessage) { - return (this.offset < plyMessage.offset) ? -1 : 0; + if (this.offset == plyMessage.offset) + return 0; + return (this.offset < plyMessage.offset) ? 1 : -1; } } diff --git a/meshmb.c b/meshmb.c index eed130be..c025540b 100644 --- a/meshmb.c +++ b/meshmb.c @@ -12,7 +12,7 @@ struct feed_metadata{ size_t tree_depth; - struct message_ply ply; // (ply starts with a rhizome_bid_t, so this is consistent with a nibble tree) + rhizome_bid_t bundle_id; struct meshmb_feed_details details; // what is the offset of their last message uint64_t last_message_offset; @@ -63,23 +63,25 @@ static int activity_next_ack(struct meshmb_activity_iterator *i){ if (message_ply_parse_ack(&i->ack_reader, &ack) == -1) return -1; - DEBUGF(meshmb, "Found ack for %s, %u to %u", + DEBUGF(meshmb, "Found ack for %s, %"PRIu64" to %"PRIu64, alloca_tohex(ack.binary, ack.binary_length), ack.start_offset, ack.end_offset); struct feed_metadata *metadata; if (tree_find(&i->feeds->root, (void**)&metadata, ack.binary, ack.binary_length, NULL, NULL)==TREE_FOUND){ if (i->metadata == metadata){ // shortcut for consecutive acks for the same incoming feed - DEBUGF(meshmb, "Ply still open @%u", + DEBUGF(meshmb, "Ply still open @%"PRIu64, i->msg_reader.read.offset); }else{ message_ply_read_close(&i->msg_reader); - if (message_ply_read_open(&i->msg_reader, &metadata->ply.bundle_id, NULL)==-1){ + if (message_ply_read_open(&i->msg_reader, &metadata->bundle_id, NULL)==-1){ i->metadata = NULL; continue; } i->metadata = metadata; } + }else{ + WARNF("Failed to find metadata for %s", alloca_tohex(ack.binary, ack.binary_length)); } i->ack_start = ack.start_offset; @@ -113,7 +115,7 @@ int meshmb_activity_next(struct meshmb_activity_iterator *i){ // can we read another message? if (message_ply_is_open(&i->msg_reader) && i->msg_reader.read.offset > i->ack_start){ - DEBUGF(meshmb, "Reading next incoming record from %u", + DEBUGF(meshmb, "Reading next incoming record from %"PRIu64, i->msg_reader.read.offset); if (message_ply_read_prev(&i->msg_reader)!=-1 && i->msg_reader.read.offset >= i->ack_start) @@ -146,6 +148,9 @@ static int finish_ack_writing(struct meshmb_feeds *feeds){ { struct overlay_buffer *b = ob_new(); message_ply_append_timestamp(b); + assert(!ob_overrun(b)); + DEBUGF2(meshms, meshmb, "Appending %zu bytes @%"PRIu64, + ob_position(b), feeds->ack_writer.file_offset); ret = rhizome_write_buffer(&feeds->ack_writer, ob_ptr(b), ob_position(b)); ob_free(b); } @@ -178,24 +183,27 @@ static int finish_ack_writing(struct meshmb_feeds *feeds){ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metadata, struct message_ply_read *reader) { - if (!metadata->ply.found){ + if (!metadata->details.ply.found){ // get the current size from the db sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; - if (sqlite_exec_uint64_retry(&retry, &metadata->ply.size, + if (sqlite_exec_uint64_retry(&retry, &metadata->details.ply.size, "SELECT filesize FROM manifests WHERE id = ?", - RHIZOME_BID_T, &metadata->ply.bundle_id, - END) == SQLITE_ROW) - metadata->ply.found = 1; + RHIZOME_BID_T, &metadata->bundle_id, + END) == SQLITE_ROW){ + metadata->details.ply.found = 1; + } else return -1; } - DEBUGF(meshmb, "Size from %u to %u", metadata->size, metadata->ply.size); - if (metadata->size == metadata->ply.size) + DEBUGF(meshmb, "Size of %s from %"PRIu64" to %"PRIu64, + alloca_tohex_rhizome_bid_t(metadata->bundle_id), + metadata->size, metadata->details.ply.size); + if (metadata->size == metadata->details.ply.size) return 0; if (!message_ply_is_open(reader) - && message_ply_read_open(reader, &metadata->ply.bundle_id, NULL)!=0) + && message_ply_read_open(reader, &metadata->bundle_id, NULL)!=0) return -1; // TODO allow the user to specify an overridden name? @@ -203,7 +211,7 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada free((void*)metadata->details.name); metadata->details.name = NULL; } - metadata->details.author = reader->author; + metadata->details.ply.author = reader->author; if (reader->name){ size_t len = strlen(reader->name); @@ -242,7 +250,7 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada } } - DEBUGF(meshmb, "Last message from %u to %u", metadata->last_message_offset, last_offset); + DEBUGF(meshmb, "Last message from %"PRIu64" to %"PRIu64, metadata->last_message_offset, last_offset); if (last_offset > metadata->last_message_offset){ // add an ack to our journal to thread new messages if (!feeds->ack_manifest){ @@ -283,11 +291,14 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada bzero(&ack, sizeof ack); ack.start_offset = metadata->size; - ack.end_offset = metadata->ply.size; - ack.binary = metadata->ply.bundle_id.binary; + ack.end_offset = metadata->details.ply.size; + ack.binary = metadata->bundle_id.binary; ack.binary_length = (metadata->tree_depth >> 3) + 3; message_ply_append_ack(b, &ack); + assert(!ob_overrun(b)); + DEBUGF2(meshms, meshmb, "Appending %zu bytes @%"PRIu64, + ob_position(b), feeds->ack_writer.file_offset); int r = rhizome_write_buffer(&feeds->ack_writer, ob_ptr(b), ob_position(b)); DEBUGF(meshmb, "Acked incoming messages"); ob_free(b); @@ -297,7 +308,7 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada } metadata->last_message_offset = last_offset; - metadata->size = metadata->ply.size; + metadata->size = metadata->details.ply.size; feeds->dirty=1; return 1; @@ -320,11 +331,11 @@ int meshmb_bundle_update(struct meshmb_feeds *feeds, rhizome_manifest *m, struct { struct feed_metadata *metadata; if (strcmp(m->service, RHIZOME_SERVICE_MESHMB) == 0 - && tree_find(&feeds->root, (void**)&metadata, m->keypair.public_key.binary, sizeof m->keypair.public_key.binary, NULL, NULL)==0){ + && tree_find(&feeds->root, (void**)&metadata, m->keypair.public_key.binary, sizeof m->keypair.public_key.binary, NULL, NULL)==TREE_FOUND){ - metadata->ply.found = 1; - if (metadata->ply.size != m->filesize){ - metadata->ply.size = m->filesize; + metadata->details.ply.found = 1; + if (metadata->details.ply.size != m->filesize){ + metadata->details.ply.size = m->filesize; if (update_stats(feeds, metadata, reader)==-1) return -1; } @@ -352,9 +363,9 @@ static int write_metadata(void **record, void *context) uint8_t buffer[sizeof (rhizome_bid_t) + sizeof (sid_t) + 1 + 12*4 + name_len + msg_len]; size_t len = 0; - bcopy(metadata->ply.bundle_id.binary, &buffer[len], sizeof (rhizome_bid_t)); + bcopy(metadata->bundle_id.binary, &buffer[len], sizeof (rhizome_bid_t)); len += sizeof (rhizome_bid_t); - bcopy(metadata->details.author.binary, &buffer[len], sizeof (sid_t)); + bcopy(metadata->details.ply.author.binary, &buffer[len], sizeof (sid_t)); len += sizeof (sid_t); buffer[len++]=0;// flags? len+=pack_uint(&buffer[len], metadata->size); @@ -372,10 +383,10 @@ static int write_metadata(void **record, void *context) buffer[len]=0; len+=msg_len; assert(len < sizeof buffer); - DEBUGF(meshmb, "Write %u bytes of metadata for %s/%s", + DEBUGF(meshmb, "Write %zu bytes of metadata for %s/%s", len, - alloca_tohex_rhizome_bid_t(metadata->ply.bundle_id), - alloca_tohex_sid_t(metadata->details.author) + alloca_tohex_rhizome_bid_t(metadata->bundle_id), + alloca_tohex_sid_t(metadata->details.ply.author) ); return rhizome_write_buffer(write, buffer, len); } @@ -468,7 +479,10 @@ static void* alloc_feed(void *context, const uint8_t *binary, size_t UNUSED(bin_ struct meshmb_feeds *feeds = (struct meshmb_feeds *)context; struct feed_metadata *feed = emalloc_zero(sizeof(struct feed_metadata)); if (feed){ - feed->ply.bundle_id = *(rhizome_bid_t *)binary; + struct tree_record *tree = (struct tree_record *)feed; + assert(&tree->binary[0] == &feed->bundle_id.binary[0]); + feed->details.ply.bundle_id = *(rhizome_bid_t *)binary; + feed->bundle_id = *(rhizome_bid_t *)binary; feeds->dirty = 1; DEBUGF(meshmb, "Allocated feed"); } @@ -490,7 +504,7 @@ static int read_metadata(struct meshmb_feeds *feeds, struct rhizome_read *read) while(1){ ssize_t bytes = rhizome_read_buffered(read, &buff, buffer, sizeof buffer); - if (bytes==0) + if (bytes<=0) break; uint64_t delta=0; @@ -545,7 +559,9 @@ static int read_metadata(struct meshmb_feeds *feeds, struct rhizome_read *read) goto error; } - read->offset += offset - bytes; + DEBUGF(meshmb, "Seeking backwards %"PRIu64", %u, %zu", read->offset, offset, bytes); + read->offset = (read->offset - bytes) + offset; + struct feed_metadata *result; if (tree_find(&feeds->root, (void**)&result, bid->binary, sizeof *bid, alloc_feed, feeds)<0) return WHY("Failed to allocate metadata"); @@ -553,16 +569,15 @@ static int read_metadata(struct meshmb_feeds *feeds, struct rhizome_read *read) result->last_message_offset = last_message_offset; result->last_seen = last_seen; result->size = size; - result->details.bundle_id = *bid; - result->details.author = *author; + result->details.ply.author = *author; result->details.name = (name && *name) ? str_edup(name) : NULL; result->details.last_message = (msg && *msg) ? str_edup(msg) : NULL; result->details.timestamp = timestamp; - DEBUGF(meshmb, "Processed %u bytes of metadata for %s", + DEBUGF(meshmb, "Processed %u bytes of metadata for %s (%s)", offset, - alloca_tohex_rhizome_bid_t(result->ply.bundle_id), - alloca_tohex_sid_t(result->details.author) + alloca_tohex_rhizome_bid_t(*bid), + alloca_tohex_sid_t(*author) ); } feeds->dirty = 0; @@ -698,7 +713,6 @@ int meshmb_send(const keyring_identity *id, const char *message, size_t message_ struct overlay_buffer *b = ob_new(); message_ply_append_message(b, message, message_len); message_ply_append_timestamp(b); - assert(!ob_overrun(b)); keyring_identity_extract(id, &did, &name); int ret = message_ply_append(id, RHIZOME_SERVICE_MESHMB, NULL, &ply, b, name, nassignments, assignments); diff --git a/meshmb.h b/meshmb.h index a377941b..6e850bb7 100644 --- a/meshmb.h +++ b/meshmb.h @@ -10,8 +10,7 @@ enum meshmb_send_status{ // details of a feed that you are following struct meshmb_feed_details{ - rhizome_bid_t bundle_id; - sid_t author; + struct message_ply ply; const char *name; const char *last_message; time_s_t timestamp; diff --git a/meshmb_cli.c b/meshmb_cli.c index 6f81e54e..1bbfb6af 100644 --- a/meshmb_cli.c +++ b/meshmb_cli.c @@ -256,8 +256,8 @@ static int list_callback(struct meshmb_feed_details *details, void *context) struct cli_enum_context *enum_context = context; enum_context->rowcount++; cli_put_long(enum_context->context, enum_context->rowcount, ":"); - cli_put_string(enum_context->context, alloca_tohex_rhizome_bid_t(details->bundle_id), ":"); - cli_put_string(enum_context->context, alloca_tohex_sid_t(details->author), ":"); + cli_put_string(enum_context->context, alloca_tohex_rhizome_bid_t(details->ply.bundle_id), ":"); + cli_put_string(enum_context->context, alloca_tohex_sid_t(details->ply.author), ":"); cli_put_string(enum_context->context, details->name, ":"); cli_put_long(enum_context->context, details->timestamp ? (long)(gettime() - details->timestamp) : (long)-1, ":"); cli_put_string(enum_context->context, details->last_message, "\n"); diff --git a/meshmb_restful.c b/meshmb_restful.c index e404556d..ce656fdb 100644 --- a/meshmb_restful.c +++ b/meshmb_restful.c @@ -26,7 +26,7 @@ static int send_part_end(struct http_request *hr) if (r->u.sendmsg.message.length == 0) return http_response_form_part(r, 400, "Invalid (empty)", PART_MESSAGE, NULL, 0); r->u.sendmsg.received_message = 1; - DEBUGF(httpd, "received %s = %s", PART_MESSAGE, alloca_toprint(-1, r->u.sendmsg.message.buffer, r->u.sendmsg.message.length)); + DEBUGF(meshmb, "received %s = %s", PART_MESSAGE, alloca_toprint(-1, r->u.sendmsg.message.buffer, r->u.sendmsg.message.length)); } else FATALF("current_part = %s", alloca_str_toprint(r->u.sendmsg.current_part)); r->u.sendmsg.current_part = NULL; @@ -165,6 +165,8 @@ static int strn_to_activity_token(const char *str, httpd_request *r, const char && **afterp=='/'){ (*afterp)++; } else { + r->u.meshmb_feeds.end_ack_offset=0; + r->u.meshmb_feeds.end_msg_offset=0; *afterp=str; } return 1; @@ -181,12 +183,12 @@ static int next_ply_message(httpd_request *r){ if (r->u.plylist.current_offset) r->u.plylist.ply_reader.read.offset = r->u.plylist.current_offset; - DEBUGF(httpd, "Opened ply @%"PRIu64, r->u.plylist.ply_reader.read.offset); + DEBUGF(meshmb, "Opened ply @%"PRIu64, r->u.plylist.ply_reader.read.offset); } if (r->u.plylist.current_offset==0){ // enumerate everything from the top - DEBUGF(httpd, "Started reading @%"PRIu64, r->u.plylist.ply_reader.read.length); + DEBUGF(meshmb, "Started reading @%"PRIu64, r->u.plylist.ply_reader.read.length); r->u.plylist.current_offset = r->u.plylist.start_offset = r->u.plylist.ply_reader.read.offset = @@ -196,7 +198,7 @@ static int next_ply_message(httpd_request *r){ while(message_ply_read_prev(&r->u.plylist.ply_reader) == 0){ r->u.plylist.current_offset = r->u.plylist.ply_reader.record_end_offset; if (r->u.plylist.current_offset <= r->u.plylist.end_offset){ - DEBUGF(httpd, "Hit end %"PRIu64" @%"PRIu64, + DEBUGF(meshmb, "Hit end %"PRIu64" @%"PRIu64, r->u.plylist.end_offset, r->u.plylist.current_offset); break; } @@ -239,7 +241,7 @@ static int restful_meshmb_list_json_content_chunk(struct http_request *hr, strbu "timestamp" }; - DEBUGF(httpd, "Phase %d", r->u.plylist.phase); + DEBUGF(meshmb, "Phase %d", r->u.plylist.phase); switch (r->u.plylist.phase) { case LIST_HEADER: @@ -535,9 +537,9 @@ static int restful_feedlist_enum(struct meshmb_feed_details *details, void *cont if (state->request->u.meshmb_feeds.rowcount!=0) strbuf_putc(state->buffer, ','); strbuf_puts(state->buffer, "\n["); - strbuf_json_hex(state->buffer, details->bundle_id.binary, sizeof details->bundle_id.binary); + strbuf_json_hex(state->buffer, details->ply.bundle_id.binary, sizeof details->ply.bundle_id.binary); strbuf_puts(state->buffer, ","); - strbuf_json_hex(state->buffer, details->author.binary, sizeof details->author.binary); + strbuf_json_hex(state->buffer, details->ply.author.binary, sizeof details->ply.author.binary); strbuf_puts(state->buffer, ","); strbuf_json_string(state->buffer, details->name); strbuf_puts(state->buffer, ","); @@ -551,7 +553,7 @@ static int restful_feedlist_enum(struct meshmb_feed_details *details, void *cont return 1; }else{ ++state->request->u.meshmb_feeds.rowcount; - state->request->u.meshmb_feeds.bundle_id = details->bundle_id; + state->request->u.meshmb_feeds.bundle_id = details->ply.bundle_id; return 0; } } @@ -567,7 +569,7 @@ static int restful_meshmb_feedlist_json_content_chunk(struct http_request *hr, s "last_message" }; - DEBUGF(httpd, "Phase %d", r->u.meshmb_feeds.phase); + DEBUGF(meshmb, "Phase %d", r->u.meshmb_feeds.phase); switch (r->u.meshmb_feeds.phase) { case LIST_HEADER: @@ -620,7 +622,7 @@ static void feedlist_on_rhizome_add(httpd_request *r, rhizome_manifest *m) if (ret!=1) return; - // TODO short timer? Syncing with a new neighbour might update a lot of subscribed feeds. + // TODO delay until resumed? int gen = meshmb_flush(r->u.meshmb_feeds.session->feeds); if (gen>=0 && gen != r->u.meshmb_feeds.generation){ if (r->u.meshmb_feeds.iterator){ @@ -673,7 +675,7 @@ static void activity_test_end(httpd_request *r){ r->u.meshmb_feeds.phase = LIST_ROWS; - DEBUGF(httpd,"Iterator @ack %"PRIu64", @msg %"PRIu64, + DEBUGF(meshmb,"Iterator @ack %"PRIu64", @msg %"PRIu64, r->u.meshmb_feeds.current_ack_offset, r->u.meshmb_feeds.current_msg_offset); return; @@ -722,6 +724,7 @@ static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, s httpd_request *r = (httpd_request *) hr; const char *headers[] = { ".token", + "ack_offset", "id", "author", "name", @@ -730,7 +733,7 @@ static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, s "message" }; - DEBUGF(httpd, "Phase %d", r->u.meshmb_feeds.phase); + DEBUGF(meshmb, "Phase %d", r->u.meshmb_feeds.phase); switch (r->u.meshmb_feeds.phase) { case LIST_HEADER: @@ -750,9 +753,11 @@ static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, s case LIST_ROWS: case LIST_FIRST: -ROWS: { activity_iterator_open(r); + if (r->u.meshmb_feeds.phase == LIST_END) + return 1; + struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator; if (r->u.meshmb_feeds.rowcount!=0) @@ -760,6 +765,8 @@ ROWS: strbuf_puts(b, "\n[\""); activity_token_to_str(b, r); strbuf_puts(b, "\","); + strbuf_sprintf(b, "%"PRIu64, iterator->ack_reader.record_end_offset); + strbuf_puts(b, ","); strbuf_json_hex(b, iterator->msg_reader.bundle_id.binary, sizeof iterator->msg_reader.bundle_id.binary); strbuf_puts(b, ","); strbuf_json_hex(b, iterator->msg_reader.author.binary, sizeof iterator->msg_reader.author.binary); @@ -768,13 +775,13 @@ ROWS: strbuf_puts(b, ","); strbuf_sprintf(b, "%d", iterator->ack_timestamp); strbuf_puts(b, ","); - strbuf_sprintf(b, "%lu", iterator->msg_reader.record_end_offset); + strbuf_sprintf(b, "%"PRIu64, iterator->msg_reader.record_end_offset); strbuf_puts(b, ","); strbuf_json_string(b, (const char *)iterator->msg_reader.record); strbuf_puts(b, "]"); if (!strbuf_overrun(b)){ r->u.meshmb_feeds.rowcount++; - DEBUGF(httpd, "Wrote record %u (%s)", r->u.meshmb_feeds.rowcount, (const char *)iterator->msg_reader.record); + DEBUGF(meshmb, "Wrote record %u (%s)", r->u.meshmb_feeds.rowcount, (const char *)iterator->msg_reader.record); activity_next(r); } return 1; @@ -793,14 +800,14 @@ ROWS: struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator; if (iterator && iterator->ack_reader.read.length > r->u.meshmb_feeds.start_ack_offset){ - DEBUGF(httpd, "Seeking back to ack %"PRIu64", msg (0) to resume now", iterator->ack_reader.read.length); + DEBUGF(meshmb, "Seeking back to ack %"PRIu64", msg (0) to resume now", iterator->ack_reader.read.length); r->u.meshmb_feeds.start_ack_offset = iterator->ack_reader.read.length; meshmb_activity_seek(iterator, r->u.meshmb_feeds.start_ack_offset, 0); r->u.meshmb_feeds.current_ack_offset = iterator->ack_reader.record_end_offset; r->u.meshmb_feeds.current_msg_offset = iterator->msg_reader.record_end_offset; if (iterator->msg_reader.type != MESSAGE_BLOCK_TYPE_MESSAGE) activity_next(r); - goto ROWS; + return 1; } r->u.meshmb_feeds.start_ack_offset = 0; @@ -846,6 +853,8 @@ static int restful_meshmb_activity(httpd_request *r, const char *remainder) r->u.meshmb_feeds.session = session; r->u.meshmb_feeds.iterator = NULL; r->u.meshmb_feeds.generation = meshmb_flush(session->feeds); + r->u.meshmb_feeds.current_ack_offset = 0; + r->u.meshmb_feeds.current_msg_offset = 0; bzero(&r->u.meshmb_feeds.bundle_id, sizeof r->u.meshmb_feeds.bundle_id); http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_meshmb_activity_json_content); diff --git a/message_ply.c b/message_ply.c index fd583528..347b711c 100644 --- a/message_ply.c +++ b/message_ply.c @@ -157,8 +157,12 @@ int message_ply_append(const keyring_identity *id, const char *service, const si struct message_ply_write write; int ret=-1; + assert(!ob_overrun(b)); + if (message_ply_write_open(&write, id, service, recipient, ply, name, nassignments, assignments, 0) == -1) goto end; + DEBUGF2(meshms, meshmb, "Appending %zu bytes @%"PRIu64, + ob_position(b), write.write.written_offset); if (rhizome_write_buffer(&write.write, ob_ptr(b), ob_position(b)) == -1) goto end; ret = message_ply_write_finish(&write); @@ -249,7 +253,8 @@ int message_ply_read_prev(struct message_ply_read *ply) ply->type = r & 0xF; ply->record_length = r >> 4; } - DEBUGF2(meshms, meshmb, "Found record %d, length %d @%"PRId64, ply->type, ply->record_length, ply->record_end_offset); + DEBUGF2(meshms, meshmb, "Found record %d, length %d @%"PRId64" - @%"PRId64, + ply->type, ply->record_length, ply->record_end_offset - (ply->record_length + sizeof footer), ply->record_end_offset); // need to allow for advancing the tail and cutting a message in half. if (ply->record_length + sizeof footer > ply->read.offset){ DEBUGF2(meshms, meshmb, "EOF");