Add Java api for meshmb restful activity

This commit is contained in:
Jeremy Lakeman 2017-03-22 12:52:18 +10:30
parent 4b98def664
commit f060e4dc59
10 changed files with 211 additions and 61 deletions

View File

@ -24,6 +24,7 @@ import org.servalproject.codec.Base64;
import org.servalproject.servaldna.keyring.KeyringCommon;
import org.servalproject.servaldna.keyring.KeyringIdentity;
import org.servalproject.servaldna.keyring.KeyringIdentityList;
import org.servalproject.servaldna.meshmb.MeshMBActivityList;
import org.servalproject.servaldna.meshmb.MeshMBCommon;
import org.servalproject.servaldna.meshmb.MeshMBSubscriptionList;
import org.servalproject.servaldna.meshmb.MessagePlyList;
@ -219,6 +220,16 @@ public class ServalDClient implements ServalDHttpConnectionFactory {
return list;
}
public MeshMBActivityList meshmbActivity(Subscriber identity) throws IOException, ServalDInterfaceException {
return meshmbActivity(identity, null);
}
public MeshMBActivityList meshmbActivity(Subscriber identity, String token) throws IOException, ServalDInterfaceException {
MeshMBActivityList list = new MeshMBActivityList(this, identity, token);
list.connect();
return list;
}
// interface ServalDHttpConnectionFactory
public HttpURLConnection newServalDHttpConnection(String path) throws ServalDInterfaceException, IOException
{

View File

@ -0,0 +1,59 @@
package org.servalproject.servaldna.meshmb;
import org.servalproject.json.JSONTableScanner;
import org.servalproject.servaldna.AbstractJsonList;
import org.servalproject.servaldna.ServalDHttpConnectionFactory;
import org.servalproject.servaldna.ServalDInterfaceException;
import org.servalproject.servaldna.SigningKey;
import org.servalproject.servaldna.Subscriber;
import org.servalproject.servaldna.SubscriberId;
import java.io.IOException;
import java.util.Map;
/**
* Created by jeremy on 21/03/17.
*/
public class MeshMBActivityList extends AbstractJsonList<MeshMBActivityMessage, IOException> {
private final Subscriber identity;
private final String token;
public MeshMBActivityList(ServalDHttpConnectionFactory httpConnector, Subscriber identity, String token) {
super(httpConnector, new JSONTableScanner()
.addColumn(".token", String.class)
.addColumn("ack_offset", Long.class)
.addColumn("id", SigningKey.class)
.addColumn("author", SubscriberId.class)
.addColumn("name", String.class)
.addColumn("timestamp", Long.class)
.addColumn("offset", Long.class)
.addColumn("message", String.class));
this.identity = identity;
this.token = token;
}
@Override
protected String getUrl() {
if (token == null)
return "/restful/meshmb/" + identity.signingKey.toHex() + "/activity.json";
if (token.equals(""))
return "/restful/meshmb/" + identity.signingKey.toHex() + "/activity/activity.json";
return "/restful/meshmb/" + identity.signingKey.toHex() + "/activity/"+token+"/activity.json";
}
@Override
protected MeshMBActivityMessage factory(Map<String, Object> row, long rowCount) throws ServalDInterfaceException {
return new MeshMBActivityMessage(
(String) row.get(".token"),
(Long) row.get("ack_offset"),
new Subscriber((SubscriberId)row.get("author"),
(SigningKey) row.get("id"),
true),
(String) row.get("name"),
(Long) row.get("timestamp"),
(Long) row.get("offset"),
(String) row.get("message")
);
}
}

View File

@ -0,0 +1,51 @@
package org.servalproject.servaldna.meshmb;
import org.servalproject.servaldna.Subscriber;
import java.util.Date;
/**
* Created by jeremy on 21/03/17.
*/
public class MeshMBActivityMessage implements Comparable<MeshMBActivityMessage>{
public final String token;
public final long ack_offset;
public final Subscriber subscriber;
public final String name;
public final long offset;
public final Date date;
public final long timestamp;
public final String text;
public MeshMBActivityMessage(String token,
long ack_offset,
Subscriber subscriber,
String name,
long timestamp,
long offset,
String text){
this.token = token;
this.ack_offset = ack_offset;
this.subscriber = subscriber;
this.name = name;
this.offset = offset;
this.date = new Date(timestamp * 1000);
this.timestamp = timestamp;
this.text = text;
}
@Override
public int compareTo(MeshMBActivityMessage message) {
if (this.ack_offset < message.ack_offset)
return 1;
if (this.ack_offset > message.ack_offset)
return -1;
if (this.offset < message.offset)
return 1;
if (this.offset > message.offset)
return -1;
return 0;
}
}

View File

@ -29,14 +29,14 @@ public class MeshMBCommon {
}
public static int ignore(ServalDHttpConnectionFactory connector, Subscriber id, SigningKey peer) throws ServalDInterfaceException, IOException {
HttpURLConnection conn = connector.newServalDHttpConnection("/restful/meshmb/" + id.signingKey.toHex() + "/follow/" + peer.toHex());
HttpURLConnection conn = connector.newServalDHttpConnection("/restful/meshmb/" + id.signingKey.toHex() + "/ignore/" + peer.toHex());
conn.setRequestMethod("POST");
conn.connect();
return conn.getResponseCode();
}
public static int follow(ServalDHttpConnectionFactory connector, Subscriber id, SigningKey peer) throws ServalDInterfaceException, IOException {
HttpURLConnection conn = connector.newServalDHttpConnection("/restful/meshmb/" + id.signingKey.toHex() + "/ignore/" + peer.toHex());
HttpURLConnection conn = connector.newServalDHttpConnection("/restful/meshmb/" + id.signingKey.toHex() + "/follow/" + peer.toHex());
conn.setRequestMethod("POST");
conn.connect();
return conn.getResponseCode();

View File

@ -21,6 +21,8 @@ public class PlyMessage implements Comparable<PlyMessage>{
@Override
public int compareTo(PlyMessage plyMessage) {
return (this.offset < plyMessage.offset) ? -1 : 0;
if (this.offset == plyMessage.offset)
return 0;
return (this.offset < plyMessage.offset) ? 1 : -1;
}
}

View File

@ -12,7 +12,7 @@
struct feed_metadata{
size_t tree_depth;
struct message_ply ply; // (ply starts with a rhizome_bid_t, so this is consistent with a nibble tree)
rhizome_bid_t bundle_id;
struct meshmb_feed_details details;
// what is the offset of their last message
uint64_t last_message_offset;
@ -63,23 +63,25 @@ static int activity_next_ack(struct meshmb_activity_iterator *i){
if (message_ply_parse_ack(&i->ack_reader, &ack) == -1)
return -1;
DEBUGF(meshmb, "Found ack for %s, %u to %u",
DEBUGF(meshmb, "Found ack for %s, %"PRIu64" to %"PRIu64,
alloca_tohex(ack.binary, ack.binary_length), ack.start_offset, ack.end_offset);
struct feed_metadata *metadata;
if (tree_find(&i->feeds->root, (void**)&metadata, ack.binary, ack.binary_length, NULL, NULL)==TREE_FOUND){
if (i->metadata == metadata){
// shortcut for consecutive acks for the same incoming feed
DEBUGF(meshmb, "Ply still open @%u",
DEBUGF(meshmb, "Ply still open @%"PRIu64,
i->msg_reader.read.offset);
}else{
message_ply_read_close(&i->msg_reader);
if (message_ply_read_open(&i->msg_reader, &metadata->ply.bundle_id, NULL)==-1){
if (message_ply_read_open(&i->msg_reader, &metadata->bundle_id, NULL)==-1){
i->metadata = NULL;
continue;
}
i->metadata = metadata;
}
}else{
WARNF("Failed to find metadata for %s", alloca_tohex(ack.binary, ack.binary_length));
}
i->ack_start = ack.start_offset;
@ -113,7 +115,7 @@ int meshmb_activity_next(struct meshmb_activity_iterator *i){
// can we read another message?
if (message_ply_is_open(&i->msg_reader)
&& i->msg_reader.read.offset > i->ack_start){
DEBUGF(meshmb, "Reading next incoming record from %u",
DEBUGF(meshmb, "Reading next incoming record from %"PRIu64,
i->msg_reader.read.offset);
if (message_ply_read_prev(&i->msg_reader)!=-1
&& i->msg_reader.read.offset >= i->ack_start)
@ -146,6 +148,9 @@ static int finish_ack_writing(struct meshmb_feeds *feeds){
{
struct overlay_buffer *b = ob_new();
message_ply_append_timestamp(b);
assert(!ob_overrun(b));
DEBUGF2(meshms, meshmb, "Appending %zu bytes @%"PRIu64,
ob_position(b), feeds->ack_writer.file_offset);
ret = rhizome_write_buffer(&feeds->ack_writer, ob_ptr(b), ob_position(b));
ob_free(b);
}
@ -178,24 +183,27 @@ static int finish_ack_writing(struct meshmb_feeds *feeds){
static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metadata, struct message_ply_read *reader)
{
if (!metadata->ply.found){
if (!metadata->details.ply.found){
// get the current size from the db
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (sqlite_exec_uint64_retry(&retry, &metadata->ply.size,
if (sqlite_exec_uint64_retry(&retry, &metadata->details.ply.size,
"SELECT filesize FROM manifests WHERE id = ?",
RHIZOME_BID_T, &metadata->ply.bundle_id,
END) == SQLITE_ROW)
metadata->ply.found = 1;
RHIZOME_BID_T, &metadata->bundle_id,
END) == SQLITE_ROW){
metadata->details.ply.found = 1;
}
else
return -1;
}
DEBUGF(meshmb, "Size from %u to %u", metadata->size, metadata->ply.size);
if (metadata->size == metadata->ply.size)
DEBUGF(meshmb, "Size of %s from %"PRIu64" to %"PRIu64,
alloca_tohex_rhizome_bid_t(metadata->bundle_id),
metadata->size, metadata->details.ply.size);
if (metadata->size == metadata->details.ply.size)
return 0;
if (!message_ply_is_open(reader)
&& message_ply_read_open(reader, &metadata->ply.bundle_id, NULL)!=0)
&& message_ply_read_open(reader, &metadata->bundle_id, NULL)!=0)
return -1;
// TODO allow the user to specify an overridden name?
@ -203,7 +211,7 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada
free((void*)metadata->details.name);
metadata->details.name = NULL;
}
metadata->details.author = reader->author;
metadata->details.ply.author = reader->author;
if (reader->name){
size_t len = strlen(reader->name);
@ -242,7 +250,7 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada
}
}
DEBUGF(meshmb, "Last message from %u to %u", metadata->last_message_offset, last_offset);
DEBUGF(meshmb, "Last message from %"PRIu64" to %"PRIu64, metadata->last_message_offset, last_offset);
if (last_offset > metadata->last_message_offset){
// add an ack to our journal to thread new messages
if (!feeds->ack_manifest){
@ -283,11 +291,14 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada
bzero(&ack, sizeof ack);
ack.start_offset = metadata->size;
ack.end_offset = metadata->ply.size;
ack.binary = metadata->ply.bundle_id.binary;
ack.end_offset = metadata->details.ply.size;
ack.binary = metadata->bundle_id.binary;
ack.binary_length = (metadata->tree_depth >> 3) + 3;
message_ply_append_ack(b, &ack);
assert(!ob_overrun(b));
DEBUGF2(meshms, meshmb, "Appending %zu bytes @%"PRIu64,
ob_position(b), feeds->ack_writer.file_offset);
int r = rhizome_write_buffer(&feeds->ack_writer, ob_ptr(b), ob_position(b));
DEBUGF(meshmb, "Acked incoming messages");
ob_free(b);
@ -297,7 +308,7 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada
}
metadata->last_message_offset = last_offset;
metadata->size = metadata->ply.size;
metadata->size = metadata->details.ply.size;
feeds->dirty=1;
return 1;
@ -320,11 +331,11 @@ int meshmb_bundle_update(struct meshmb_feeds *feeds, rhizome_manifest *m, struct
{
struct feed_metadata *metadata;
if (strcmp(m->service, RHIZOME_SERVICE_MESHMB) == 0
&& tree_find(&feeds->root, (void**)&metadata, m->keypair.public_key.binary, sizeof m->keypair.public_key.binary, NULL, NULL)==0){
&& tree_find(&feeds->root, (void**)&metadata, m->keypair.public_key.binary, sizeof m->keypair.public_key.binary, NULL, NULL)==TREE_FOUND){
metadata->ply.found = 1;
if (metadata->ply.size != m->filesize){
metadata->ply.size = m->filesize;
metadata->details.ply.found = 1;
if (metadata->details.ply.size != m->filesize){
metadata->details.ply.size = m->filesize;
if (update_stats(feeds, metadata, reader)==-1)
return -1;
}
@ -352,9 +363,9 @@ static int write_metadata(void **record, void *context)
uint8_t buffer[sizeof (rhizome_bid_t) + sizeof (sid_t) + 1 + 12*4 + name_len + msg_len];
size_t len = 0;
bcopy(metadata->ply.bundle_id.binary, &buffer[len], sizeof (rhizome_bid_t));
bcopy(metadata->bundle_id.binary, &buffer[len], sizeof (rhizome_bid_t));
len += sizeof (rhizome_bid_t);
bcopy(metadata->details.author.binary, &buffer[len], sizeof (sid_t));
bcopy(metadata->details.ply.author.binary, &buffer[len], sizeof (sid_t));
len += sizeof (sid_t);
buffer[len++]=0;// flags?
len+=pack_uint(&buffer[len], metadata->size);
@ -372,10 +383,10 @@ static int write_metadata(void **record, void *context)
buffer[len]=0;
len+=msg_len;
assert(len < sizeof buffer);
DEBUGF(meshmb, "Write %u bytes of metadata for %s/%s",
DEBUGF(meshmb, "Write %zu bytes of metadata for %s/%s",
len,
alloca_tohex_rhizome_bid_t(metadata->ply.bundle_id),
alloca_tohex_sid_t(metadata->details.author)
alloca_tohex_rhizome_bid_t(metadata->bundle_id),
alloca_tohex_sid_t(metadata->details.ply.author)
);
return rhizome_write_buffer(write, buffer, len);
}
@ -468,7 +479,10 @@ static void* alloc_feed(void *context, const uint8_t *binary, size_t UNUSED(bin_
struct meshmb_feeds *feeds = (struct meshmb_feeds *)context;
struct feed_metadata *feed = emalloc_zero(sizeof(struct feed_metadata));
if (feed){
feed->ply.bundle_id = *(rhizome_bid_t *)binary;
struct tree_record *tree = (struct tree_record *)feed;
assert(&tree->binary[0] == &feed->bundle_id.binary[0]);
feed->details.ply.bundle_id = *(rhizome_bid_t *)binary;
feed->bundle_id = *(rhizome_bid_t *)binary;
feeds->dirty = 1;
DEBUGF(meshmb, "Allocated feed");
}
@ -490,7 +504,7 @@ static int read_metadata(struct meshmb_feeds *feeds, struct rhizome_read *read)
while(1){
ssize_t bytes = rhizome_read_buffered(read, &buff, buffer, sizeof buffer);
if (bytes==0)
if (bytes<=0)
break;
uint64_t delta=0;
@ -545,7 +559,9 @@ static int read_metadata(struct meshmb_feeds *feeds, struct rhizome_read *read)
goto error;
}
read->offset += offset - bytes;
DEBUGF(meshmb, "Seeking backwards %"PRIu64", %u, %zu", read->offset, offset, bytes);
read->offset = (read->offset - bytes) + offset;
struct feed_metadata *result;
if (tree_find(&feeds->root, (void**)&result, bid->binary, sizeof *bid, alloc_feed, feeds)<0)
return WHY("Failed to allocate metadata");
@ -553,16 +569,15 @@ static int read_metadata(struct meshmb_feeds *feeds, struct rhizome_read *read)
result->last_message_offset = last_message_offset;
result->last_seen = last_seen;
result->size = size;
result->details.bundle_id = *bid;
result->details.author = *author;
result->details.ply.author = *author;
result->details.name = (name && *name) ? str_edup(name) : NULL;
result->details.last_message = (msg && *msg) ? str_edup(msg) : NULL;
result->details.timestamp = timestamp;
DEBUGF(meshmb, "Processed %u bytes of metadata for %s",
DEBUGF(meshmb, "Processed %u bytes of metadata for %s (%s)",
offset,
alloca_tohex_rhizome_bid_t(result->ply.bundle_id),
alloca_tohex_sid_t(result->details.author)
alloca_tohex_rhizome_bid_t(*bid),
alloca_tohex_sid_t(*author)
);
}
feeds->dirty = 0;
@ -698,7 +713,6 @@ int meshmb_send(const keyring_identity *id, const char *message, size_t message_
struct overlay_buffer *b = ob_new();
message_ply_append_message(b, message, message_len);
message_ply_append_timestamp(b);
assert(!ob_overrun(b));
keyring_identity_extract(id, &did, &name);
int ret = message_ply_append(id, RHIZOME_SERVICE_MESHMB, NULL, &ply, b, name, nassignments, assignments);

View File

@ -10,8 +10,7 @@ enum meshmb_send_status{
// details of a feed that you are following
struct meshmb_feed_details{
rhizome_bid_t bundle_id;
sid_t author;
struct message_ply ply;
const char *name;
const char *last_message;
time_s_t timestamp;

View File

@ -256,8 +256,8 @@ static int list_callback(struct meshmb_feed_details *details, void *context)
struct cli_enum_context *enum_context = context;
enum_context->rowcount++;
cli_put_long(enum_context->context, enum_context->rowcount, ":");
cli_put_string(enum_context->context, alloca_tohex_rhizome_bid_t(details->bundle_id), ":");
cli_put_string(enum_context->context, alloca_tohex_sid_t(details->author), ":");
cli_put_string(enum_context->context, alloca_tohex_rhizome_bid_t(details->ply.bundle_id), ":");
cli_put_string(enum_context->context, alloca_tohex_sid_t(details->ply.author), ":");
cli_put_string(enum_context->context, details->name, ":");
cli_put_long(enum_context->context, details->timestamp ? (long)(gettime() - details->timestamp) : (long)-1, ":");
cli_put_string(enum_context->context, details->last_message, "\n");

View File

@ -26,7 +26,7 @@ static int send_part_end(struct http_request *hr)
if (r->u.sendmsg.message.length == 0)
return http_response_form_part(r, 400, "Invalid (empty)", PART_MESSAGE, NULL, 0);
r->u.sendmsg.received_message = 1;
DEBUGF(httpd, "received %s = %s", PART_MESSAGE, alloca_toprint(-1, r->u.sendmsg.message.buffer, r->u.sendmsg.message.length));
DEBUGF(meshmb, "received %s = %s", PART_MESSAGE, alloca_toprint(-1, r->u.sendmsg.message.buffer, r->u.sendmsg.message.length));
} else
FATALF("current_part = %s", alloca_str_toprint(r->u.sendmsg.current_part));
r->u.sendmsg.current_part = NULL;
@ -165,6 +165,8 @@ static int strn_to_activity_token(const char *str, httpd_request *r, const char
&& **afterp=='/'){
(*afterp)++;
} else {
r->u.meshmb_feeds.end_ack_offset=0;
r->u.meshmb_feeds.end_msg_offset=0;
*afterp=str;
}
return 1;
@ -181,12 +183,12 @@ static int next_ply_message(httpd_request *r){
if (r->u.plylist.current_offset)
r->u.plylist.ply_reader.read.offset = r->u.plylist.current_offset;
DEBUGF(httpd, "Opened ply @%"PRIu64, r->u.plylist.ply_reader.read.offset);
DEBUGF(meshmb, "Opened ply @%"PRIu64, r->u.plylist.ply_reader.read.offset);
}
if (r->u.plylist.current_offset==0){
// enumerate everything from the top
DEBUGF(httpd, "Started reading @%"PRIu64, r->u.plylist.ply_reader.read.length);
DEBUGF(meshmb, "Started reading @%"PRIu64, r->u.plylist.ply_reader.read.length);
r->u.plylist.current_offset =
r->u.plylist.start_offset =
r->u.plylist.ply_reader.read.offset =
@ -196,7 +198,7 @@ static int next_ply_message(httpd_request *r){
while(message_ply_read_prev(&r->u.plylist.ply_reader) == 0){
r->u.plylist.current_offset = r->u.plylist.ply_reader.record_end_offset;
if (r->u.plylist.current_offset <= r->u.plylist.end_offset){
DEBUGF(httpd, "Hit end %"PRIu64" @%"PRIu64,
DEBUGF(meshmb, "Hit end %"PRIu64" @%"PRIu64,
r->u.plylist.end_offset, r->u.plylist.current_offset);
break;
}
@ -239,7 +241,7 @@ static int restful_meshmb_list_json_content_chunk(struct http_request *hr, strbu
"timestamp"
};
DEBUGF(httpd, "Phase %d", r->u.plylist.phase);
DEBUGF(meshmb, "Phase %d", r->u.plylist.phase);
switch (r->u.plylist.phase) {
case LIST_HEADER:
@ -535,9 +537,9 @@ static int restful_feedlist_enum(struct meshmb_feed_details *details, void *cont
if (state->request->u.meshmb_feeds.rowcount!=0)
strbuf_putc(state->buffer, ',');
strbuf_puts(state->buffer, "\n[");
strbuf_json_hex(state->buffer, details->bundle_id.binary, sizeof details->bundle_id.binary);
strbuf_json_hex(state->buffer, details->ply.bundle_id.binary, sizeof details->ply.bundle_id.binary);
strbuf_puts(state->buffer, ",");
strbuf_json_hex(state->buffer, details->author.binary, sizeof details->author.binary);
strbuf_json_hex(state->buffer, details->ply.author.binary, sizeof details->ply.author.binary);
strbuf_puts(state->buffer, ",");
strbuf_json_string(state->buffer, details->name);
strbuf_puts(state->buffer, ",");
@ -551,7 +553,7 @@ static int restful_feedlist_enum(struct meshmb_feed_details *details, void *cont
return 1;
}else{
++state->request->u.meshmb_feeds.rowcount;
state->request->u.meshmb_feeds.bundle_id = details->bundle_id;
state->request->u.meshmb_feeds.bundle_id = details->ply.bundle_id;
return 0;
}
}
@ -567,7 +569,7 @@ static int restful_meshmb_feedlist_json_content_chunk(struct http_request *hr, s
"last_message"
};
DEBUGF(httpd, "Phase %d", r->u.meshmb_feeds.phase);
DEBUGF(meshmb, "Phase %d", r->u.meshmb_feeds.phase);
switch (r->u.meshmb_feeds.phase) {
case LIST_HEADER:
@ -620,7 +622,7 @@ static void feedlist_on_rhizome_add(httpd_request *r, rhizome_manifest *m)
if (ret!=1)
return;
// TODO short timer? Syncing with a new neighbour might update a lot of subscribed feeds.
// TODO delay until resumed?
int gen = meshmb_flush(r->u.meshmb_feeds.session->feeds);
if (gen>=0 && gen != r->u.meshmb_feeds.generation){
if (r->u.meshmb_feeds.iterator){
@ -673,7 +675,7 @@ static void activity_test_end(httpd_request *r){
r->u.meshmb_feeds.phase = LIST_ROWS;
DEBUGF(httpd,"Iterator @ack %"PRIu64", @msg %"PRIu64,
DEBUGF(meshmb,"Iterator @ack %"PRIu64", @msg %"PRIu64,
r->u.meshmb_feeds.current_ack_offset,
r->u.meshmb_feeds.current_msg_offset);
return;
@ -722,6 +724,7 @@ static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, s
httpd_request *r = (httpd_request *) hr;
const char *headers[] = {
".token",
"ack_offset",
"id",
"author",
"name",
@ -730,7 +733,7 @@ static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, s
"message"
};
DEBUGF(httpd, "Phase %d", r->u.meshmb_feeds.phase);
DEBUGF(meshmb, "Phase %d", r->u.meshmb_feeds.phase);
switch (r->u.meshmb_feeds.phase) {
case LIST_HEADER:
@ -750,9 +753,11 @@ static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, s
case LIST_ROWS:
case LIST_FIRST:
ROWS:
{
activity_iterator_open(r);
if (r->u.meshmb_feeds.phase == LIST_END)
return 1;
struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator;
if (r->u.meshmb_feeds.rowcount!=0)
@ -760,6 +765,8 @@ ROWS:
strbuf_puts(b, "\n[\"");
activity_token_to_str(b, r);
strbuf_puts(b, "\",");
strbuf_sprintf(b, "%"PRIu64, iterator->ack_reader.record_end_offset);
strbuf_puts(b, ",");
strbuf_json_hex(b, iterator->msg_reader.bundle_id.binary, sizeof iterator->msg_reader.bundle_id.binary);
strbuf_puts(b, ",");
strbuf_json_hex(b, iterator->msg_reader.author.binary, sizeof iterator->msg_reader.author.binary);
@ -768,13 +775,13 @@ ROWS:
strbuf_puts(b, ",");
strbuf_sprintf(b, "%d", iterator->ack_timestamp);
strbuf_puts(b, ",");
strbuf_sprintf(b, "%lu", iterator->msg_reader.record_end_offset);
strbuf_sprintf(b, "%"PRIu64, iterator->msg_reader.record_end_offset);
strbuf_puts(b, ",");
strbuf_json_string(b, (const char *)iterator->msg_reader.record);
strbuf_puts(b, "]");
if (!strbuf_overrun(b)){
r->u.meshmb_feeds.rowcount++;
DEBUGF(httpd, "Wrote record %u (%s)", r->u.meshmb_feeds.rowcount, (const char *)iterator->msg_reader.record);
DEBUGF(meshmb, "Wrote record %u (%s)", r->u.meshmb_feeds.rowcount, (const char *)iterator->msg_reader.record);
activity_next(r);
}
return 1;
@ -793,14 +800,14 @@ ROWS:
struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator;
if (iterator && iterator->ack_reader.read.length > r->u.meshmb_feeds.start_ack_offset){
DEBUGF(httpd, "Seeking back to ack %"PRIu64", msg (0) to resume now", iterator->ack_reader.read.length);
DEBUGF(meshmb, "Seeking back to ack %"PRIu64", msg (0) to resume now", iterator->ack_reader.read.length);
r->u.meshmb_feeds.start_ack_offset = iterator->ack_reader.read.length;
meshmb_activity_seek(iterator, r->u.meshmb_feeds.start_ack_offset, 0);
r->u.meshmb_feeds.current_ack_offset = iterator->ack_reader.record_end_offset;
r->u.meshmb_feeds.current_msg_offset = iterator->msg_reader.record_end_offset;
if (iterator->msg_reader.type != MESSAGE_BLOCK_TYPE_MESSAGE)
activity_next(r);
goto ROWS;
return 1;
}
r->u.meshmb_feeds.start_ack_offset = 0;
@ -846,6 +853,8 @@ static int restful_meshmb_activity(httpd_request *r, const char *remainder)
r->u.meshmb_feeds.session = session;
r->u.meshmb_feeds.iterator = NULL;
r->u.meshmb_feeds.generation = meshmb_flush(session->feeds);
r->u.meshmb_feeds.current_ack_offset = 0;
r->u.meshmb_feeds.current_msg_offset = 0;
bzero(&r->u.meshmb_feeds.bundle_id, sizeof r->u.meshmb_feeds.bundle_id);
http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_meshmb_activity_json_content);

View File

@ -157,8 +157,12 @@ int message_ply_append(const keyring_identity *id, const char *service, const si
struct message_ply_write write;
int ret=-1;
assert(!ob_overrun(b));
if (message_ply_write_open(&write, id, service, recipient, ply, name, nassignments, assignments, 0) == -1)
goto end;
DEBUGF2(meshms, meshmb, "Appending %zu bytes @%"PRIu64,
ob_position(b), write.write.written_offset);
if (rhizome_write_buffer(&write.write, ob_ptr(b), ob_position(b)) == -1)
goto end;
ret = message_ply_write_finish(&write);
@ -249,7 +253,8 @@ int message_ply_read_prev(struct message_ply_read *ply)
ply->type = r & 0xF;
ply->record_length = r >> 4;
}
DEBUGF2(meshms, meshmb, "Found record %d, length %d @%"PRId64, ply->type, ply->record_length, ply->record_end_offset);
DEBUGF2(meshms, meshmb, "Found record %d, length %d @%"PRId64" - @%"PRId64,
ply->type, ply->record_length, ply->record_end_offset - (ply->record_length + sizeof footer), ply->record_end_offset);
// need to allow for advancing the tail and cutting a message in half.
if (ply->record_length + sizeof footer > ply->read.offset){
DEBUGF2(meshms, meshmb, "EOF");