Rework detection of rhizome add from cli to trigger bundle_add

This commit is contained in:
Jeremy Lakeman 2016-03-08 13:33:23 +10:30
parent 49ec0f61c4
commit edc1658cfd
4 changed files with 47 additions and 17 deletions

View File

@ -614,7 +614,7 @@ static int monitor_help(const struct cli_parsed *parsed, struct cli_context *con
return 0;
}
int monitor_announce_bundle(rhizome_manifest *m)
static void monitor_announce_bundle(rhizome_manifest *m)
{
char msg[1024];
int len = snprintf(msg,1024,"\n*%zd:BUNDLE:%s\n",
@ -624,8 +624,8 @@ int monitor_announce_bundle(rhizome_manifest *m)
len+=m->manifest_all_bytes;
msg[len++]='\n';
monitor_tell_clients(msg, len, MONITOR_RHIZOME);
return 0;
}
DEFINE_TRIGGER(bundle_add, monitor_announce_bundle);
int monitor_announce_peer(const sid_t *sidp)
{

View File

@ -648,7 +648,6 @@ int rhizome_delete_file(const rhizome_filehash_t *hashp);
#define RHIZOME_VERIFY 1
int rhizome_fetching_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int monitor_announce_bundle(rhizome_manifest *m);
enum rhizome_secret_disposition {
FOUND_RHIZOME_SECRET = 0,
IDENTITY_NOT_FOUND,

View File

@ -1362,11 +1362,6 @@ int rhizome_store_manifest(rhizome_manifest *m)
m->version
);
CALL_TRIGGER(bundle_add, m);
monitor_announce_bundle(m);
if (serverMode){
time_ms_t now = gettime_ms();
RESCHEDULE(&ALARM_STRUCT(rhizome_sync_announce), now, now, TIME_MS_NEVER_WILL);
}
return 0;
}
rollback:

View File

@ -65,6 +65,8 @@ struct rhizome_sync
int bar_count;
};
static uint64_t max_token=0;
DEFINE_ALARM(rhizome_sync_announce);
void rhizome_sync_status_html(struct strbuf *b, struct subscriber *subscriber)
@ -248,8 +250,45 @@ static int sync_bundle_inserted(struct subscriber *subscriber, void *context)
return 0;
}
static void annouce_cli_bundle_add(uint64_t row_id)
{
if (row_id<=max_token)
return;
if (max_token!=0){
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare_bind(&retry,
"SELECT manifest FROM manifests WHERE rowid > ? AND rowid <= ? ORDER BY rowid ASC;",
INT64, max_token, INT64, row_id, END);
while (sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
const void *blob = sqlite3_column_blob(statement, 0);
size_t blob_length = sqlite3_column_bytes(statement, 0);
rhizome_manifest *m = rhizome_new_manifest();
if (m) {
memcpy(m->manifestdata, blob, blob_length);
m->manifest_all_bytes = blob_length;
if ( rhizome_manifest_parse(m) != -1
&& rhizome_manifest_validate(m)
&& rhizome_manifest_verify(m)
) {
assert(m->finalised);
CALL_TRIGGER(bundle_add, m);
}
rhizome_manifest_free(m);
}
}
sqlite3_finalize(statement);
}
max_token = row_id;
}
static void rhizome_sync_bundle_inserted(rhizome_manifest *m)
{
annouce_cli_bundle_add(m->rowid - 1);
if (m->rowid > max_token)
max_token = m->rowid;
rhizome_bar_t bar;
rhizome_manifest_to_bar(m, &bar);
enum_subscribers(NULL, sync_bundle_inserted, (void *)&bar);
@ -406,7 +445,6 @@ static void append_response(struct overlay_buffer *b, uint64_t token, const unsi
}
}
static uint64_t max_token=0;
static void sync_send_response(struct subscriber *dest, int forwards, uint64_t token, int max_count)
{
IN();
@ -458,10 +496,6 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
if (bar_size != RHIZOME_BAR_BYTES)
continue;
// TODO call trigger bundle_add??
if (rowid>max_token)
enum_subscribers(NULL, sync_bundle_inserted, (void *)bar);
if (count < max_count){
// make sure we include the exact rowid that was requested, even if we just deleted / replaced the manifest
if (count==0 && rowid!=token){
@ -490,8 +524,12 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
break;
}
if (token != HEAD_FLAG && token > max_token)
max_token = token;
sqlite3_finalize(statement);
if (token != HEAD_FLAG && token > max_token){
// report bundles added by cli
annouce_cli_bundle_add(token);
}
// send a zero lower bound if we reached the end of our manifest list
if (count && count < max_count && !forwards){
@ -505,8 +543,6 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
}
}
sqlite3_finalize(statement);
if (count){
DEBUGF(rhizome_sync, "Sending %d BARs from %"PRIu64" to %"PRIu64, count, token, last);
ob_flip(b);