mirror of
https://github.com/servalproject/serval-dna.git
synced 2025-02-20 09:26:37 +00:00
Improve debug info for rhizome sync
This commit is contained in:
parent
75d488b339
commit
31dbfa4e58
@ -256,6 +256,7 @@ ATOM(bool_t, rhizome, 0, boolean,, "")
|
||||
ATOM(bool_t, rhizome_manifest, 0, boolean,, "")
|
||||
ATOM(bool_t, rhizome_sql_bind, 0, boolean,, "")
|
||||
ATOM(bool_t, rhizome_store, 0, boolean,, "")
|
||||
ATOM(bool_t, rhizome_sync, 0, boolean,, "")
|
||||
ATOM(bool_t, rhizome_tx, 0, boolean,, "")
|
||||
ATOM(bool_t, rhizome_rx, 0, boolean,, "")
|
||||
ATOM(bool_t, rhizome_ads, 0, boolean,, "")
|
||||
|
@ -610,6 +610,7 @@ void rhizome_list_release(struct rhizome_list_cursor *);
|
||||
|
||||
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct socket_address *addr, const struct subscriber *peer);
|
||||
rhizome_manifest * rhizome_fetch_search(const unsigned char *id, int prefix_length);
|
||||
int rhizome_fetch_bar_queued(const rhizome_bar_t *bar);
|
||||
|
||||
/* Rhizome file storage api */
|
||||
struct rhizome_write_buffer
|
||||
@ -835,7 +836,7 @@ int rhizome_cache_close();
|
||||
int rhizome_database_filehash_from_id(const rhizome_bid_t *bidp, uint64_t version, rhizome_filehash_t *hashp);
|
||||
|
||||
int overlay_mdp_service_rhizome_sync(struct internal_mdp_header *header, struct overlay_buffer *payload);
|
||||
int rhizome_sync_bundle_inserted(const unsigned char *bar);
|
||||
void rhizome_sync_status();
|
||||
|
||||
DECLARE_ALARM(rhizome_fetch_status);
|
||||
#endif //__SERVAL_DNA__RHIZOME_H
|
||||
|
@ -155,8 +155,10 @@ static const char * fetch_state(int state)
|
||||
DEFINE_ALARM(rhizome_fetch_status);
|
||||
void rhizome_fetch_status(struct sched_ent *alarm)
|
||||
{
|
||||
if (!config.debug.rhizome)
|
||||
return;
|
||||
|
||||
unsigned i;
|
||||
DEBUGF("==== Fetch status");
|
||||
for(i=0;i<NQUEUES;i++){
|
||||
struct rhizome_fetch_queue *q=&rhizome_fetch_queues[i];
|
||||
unsigned candidates=0;
|
||||
@ -177,7 +179,7 @@ void rhizome_fetch_status(struct sched_ent *alarm)
|
||||
q->active.state==RHIZOME_FETCH_FREE?0:q->active.write_state.file_offset,
|
||||
q->active.manifest?q->active.manifest->filesize:0);
|
||||
}
|
||||
|
||||
rhizome_sync_status();
|
||||
time_ms_t now = gettime_ms();
|
||||
RESCHEDULE(alarm, now + 3000, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL);
|
||||
}
|
||||
@ -295,6 +297,17 @@ rhizome_manifest * rhizome_fetch_search(const unsigned char *id, int prefix_leng
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int rhizome_fetch_bar_queued(const rhizome_bar_t *bar)
|
||||
{
|
||||
const uint8_t *prefix = rhizome_bar_prefix(bar);
|
||||
uint64_t version = rhizome_bar_version(bar);
|
||||
|
||||
rhizome_manifest *m=rhizome_fetch_search(prefix, RHIZOME_BAR_PREFIX_BYTES);
|
||||
if (m && m->version >= version)
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Insert a candidate into a given queue at a given position. All candidates succeeding the given
|
||||
* position are copied backward in the queue to open up an empty element at the given position. If
|
||||
* the queue was full, then the tail element is discarded, freeing the manifest it points to.
|
||||
|
@ -311,10 +311,8 @@ next:
|
||||
if (log2_size!=0xFF && rhizome_fetch_has_queue_space(log2_size)!=1)
|
||||
continue;
|
||||
|
||||
uint64_t version = rhizome_bar_version(bar);
|
||||
// are we already fetching this bundle [or later]?
|
||||
rhizome_manifest *m=rhizome_fetch_search(rhizome_bar_prefix(bar), RHIZOME_BAR_PREFIX_BYTES);
|
||||
if (m && m->version >= version)
|
||||
if (rhizome_fetch_bar_queued(bar))
|
||||
continue;
|
||||
|
||||
bar_count++;
|
||||
|
@ -77,6 +77,27 @@ void rhizome_sync_status_html(struct strbuf *b, struct subscriber *subscriber)
|
||||
state->bars_skipped);
|
||||
}
|
||||
|
||||
static int sync_status(struct subscriber *subscriber, void *UNUSED(context))
|
||||
{
|
||||
if (!subscriber->sync_state)
|
||||
return 0;
|
||||
struct rhizome_sync *state=subscriber->sync_state;
|
||||
DEBUGF("%s seen %u BARs [%"PRId64" to %"PRId64" of %"PRId64"], %d interesting, %d skipped",
|
||||
alloca_tohex_sid_t(subscriber->sid),
|
||||
state->bars_seen,
|
||||
state->sync_start,
|
||||
state->sync_end,
|
||||
state->highest_seen,
|
||||
state->bar_count,
|
||||
state->bars_skipped);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void rhizome_sync_status()
|
||||
{
|
||||
enum_subscribers(NULL, sync_status, NULL);
|
||||
}
|
||||
|
||||
static void rhizome_sync_request(struct subscriber *subscriber, uint64_t token, unsigned char forwards)
|
||||
{
|
||||
struct internal_mdp_header header;
|
||||
@ -93,7 +114,7 @@ static void rhizome_sync_request(struct subscriber *subscriber, uint64_t token,
|
||||
ob_append_byte(b, forwards);
|
||||
ob_append_packed_ui64(b, token);
|
||||
|
||||
if (config.debug.rhizome)
|
||||
if (config.debug.rhizome_sync)
|
||||
DEBUGF("Sending request to %s for BARs from %"PRIu64" %s", alloca_tohex_sid_t(subscriber->sid), token, forwards?"forwards":"backwards");
|
||||
|
||||
ob_flip(b);
|
||||
@ -124,12 +145,9 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
|
||||
unsigned char log2_size = rhizome_bar_log_size(&state->bars[i].bar);
|
||||
if (log2_size!=0xFF && rhizome_fetch_has_queue_space(log2_size)!=1)
|
||||
continue;
|
||||
|
||||
uint64_t version = rhizome_bar_version(&state->bars[i].bar);
|
||||
// are we already fetching this bundle [or later]?
|
||||
rhizome_manifest *m=rhizome_fetch_search(prefix, RHIZOME_BAR_PREFIX_BYTES);
|
||||
if (m && m->version >= version){
|
||||
state->bars[i].next_request = now+5000;
|
||||
|
||||
if (rhizome_fetch_bar_queued(&state->bars[i].bar)){
|
||||
state->bars[i].next_request = now+2000;
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -146,7 +164,7 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
|
||||
if (ob_remaining(payload)<RHIZOME_BAR_BYTES)
|
||||
break;
|
||||
|
||||
if (config.debug.rhizome)
|
||||
if (config.debug.rhizome_sync)
|
||||
DEBUGF("Requesting manifest for BAR %s", alloca_tohex_rhizome_bar_t(&state->bars[i].bar));
|
||||
|
||||
ob_append_bytes(payload, state->bars[i].bar.binary, RHIZOME_BAR_BYTES);
|
||||
@ -155,7 +173,7 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
|
||||
state->bars[i].next_request = now+5000;
|
||||
if (!state->bars[i].tries){
|
||||
// remove this BAR and shift the last BAR down to this position if required.
|
||||
if (config.debug.rhizome)
|
||||
if (config.debug.rhizome_sync)
|
||||
DEBUGF("Giving up on fetching BAR %s", alloca_tohex_rhizome_bar_t(&state->bars[i].bar));
|
||||
state->bar_count --;
|
||||
if (i<state->bar_count)
|
||||
@ -192,7 +210,7 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
|
||||
}else if(!state->sync_complete){
|
||||
state->sync_complete = 1;
|
||||
state->completed = gettime_ms();
|
||||
if (config.debug.rhizome)
|
||||
if (config.debug.rhizome_sync)
|
||||
DEBUGF("BAR sync with %s complete", alloca_tohex_sid_t(subscriber->sid));
|
||||
}
|
||||
state->next_request = now+5000;
|
||||
@ -216,7 +234,7 @@ static int sync_bundle_inserted(struct subscriber *subscriber, void *context)
|
||||
uint64_t this_version = rhizome_bar_version(this_bar);
|
||||
if (memcmp(this_id, id, RHIZOME_BAR_PREFIX_BYTES)==0 && version >= this_version){
|
||||
// remove this BAR and shift the last BAR down to this position if required.
|
||||
if (config.debug.rhizome)
|
||||
if (config.debug.rhizome_sync)
|
||||
DEBUGF("Removing BAR %s from queue", alloca_tohex_rhizome_bar_t(this_bar));
|
||||
state->bar_count --;
|
||||
if (i<state->bar_count)
|
||||
@ -231,7 +249,7 @@ static int sync_bundle_inserted(struct subscriber *subscriber, void *context)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int rhizome_sync_bundle_inserted(const unsigned char *bar)
|
||||
static int rhizome_sync_bundle_inserted(const rhizome_bar_t *bar)
|
||||
{
|
||||
enum_subscribers(NULL, sync_bundle_inserted, (void *)bar);
|
||||
return 0;
|
||||
@ -250,6 +268,9 @@ static int sync_cache_bar(struct rhizome_sync *state, const rhizome_bar_t *bar,
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (config.debug.rhizome_sync)
|
||||
DEBUGF("Remembering BAR %s", alloca_tohex_rhizome_bar_t(bar));
|
||||
|
||||
state->bars[state->bar_count].bar = *bar;
|
||||
state->bars[state->bar_count].next_request = gettime_ms();
|
||||
state->bars[state->bar_count].tries = MAX_TRIES;
|
||||
@ -330,7 +351,7 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_
|
||||
|
||||
if (bar_count>0 && state->sync_end == 0 && bar_tokens[0]>=bar_tokens[bar_count -1]){
|
||||
// make sure we start syncing from the end
|
||||
if (config.debug.rhizome)
|
||||
if (config.debug.rhizome_sync)
|
||||
DEBUGF("Starting BAR sync with %s", alloca_tohex_sid_t(subscriber->sid));
|
||||
state->sync_start = state->sync_end = state->highest_seen;
|
||||
mid_point=0;
|
||||
@ -359,7 +380,7 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_
|
||||
if (r==1)
|
||||
added=1;
|
||||
}
|
||||
if (config.debug.rhizome)
|
||||
if (config.debug.rhizome_sync)
|
||||
DEBUGF("Synced %"PRIu64" - %"PRIu64" with %s", state->sync_start, state->sync_end, alloca_tohex_sid_t(subscriber->sid));
|
||||
if (added)
|
||||
state->next_request = gettime_ms();
|
||||
@ -430,7 +451,7 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
|
||||
|
||||
if (rowid>max_token){
|
||||
// a new bundle has been imported
|
||||
rhizome_sync_bundle_inserted(bar);
|
||||
rhizome_sync_bundle_inserted((const rhizome_bar_t *)bar);
|
||||
}
|
||||
|
||||
if (count < max_count){
|
||||
@ -479,7 +500,7 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
|
||||
sqlite3_finalize(statement);
|
||||
|
||||
if (count){
|
||||
if (config.debug.rhizome_ads)
|
||||
if (config.debug.rhizome_sync)
|
||||
DEBUGF("Sending %d BARs from %"PRIu64" to %"PRIu64, count, token, last);
|
||||
ob_flip(b);
|
||||
overlay_send_frame(&header, b);
|
||||
|
2
server.c
2
server.c
@ -393,6 +393,8 @@ void cf_on_config_change()
|
||||
if (config.rhizome.enable){
|
||||
rhizome_opendb();
|
||||
RESCHEDULE(&ALARM_STRUCT(rhizome_clean_db), now + 30*60*1000, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL);
|
||||
if (config.debug.rhizome)
|
||||
RESCHEDULE(&ALARM_STRUCT(rhizome_fetch_status), now + 3000, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL);
|
||||
}else if(rhizome_db){
|
||||
rhizome_close_db();
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user