diff --git a/rhizome_sync.c b/rhizome_sync.c index ffb78c15..7ce46e33 100644 --- a/rhizome_sync.c +++ b/rhizome_sync.c @@ -132,7 +132,7 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi if (state->bar_count >= CACHE_BARS) return; - if (state->next_request<=gettime_ms()){ + if (state->next_request<=now){ if (state->sync_end < state->highest_seen){ rhizome_sync_request(subscriber, state->sync_end, 1); }else if(state->sync_start >0){ @@ -142,7 +142,7 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi if (config.debug.rhizome) DEBUGF("BAR sync with %s complete", alloca_tohex_sid(subscriber->sid)); } - state->next_request = gettime_ms()+500; + state->next_request = now+5000; } } @@ -181,18 +181,26 @@ int rhizome_sync_bundle_inserted(const unsigned char *bar) return 0; } -static void sync_cache_bar(struct rhizome_sync *state, unsigned char *bar, uint64_t token) +static int sync_cache_bar(struct rhizome_sync *state, unsigned char *bar, uint64_t token) { + int ret=0; + // check the database before adding the BAR to the list if (token!=0 && rhizome_is_bar_interesting(bar)!=0){ bcopy(bar, state->bars[state->bar_count].bar, RHIZOME_BAR_BYTES); state->bars[state->bar_count].next_request = gettime_ms(); state->bar_count++; + ret=1; } - if (state->sync_end < token) + if (state->sync_end < token){ state->sync_end = token; - if (state->sync_start > token) + ret=1; + } + if (state->sync_start > token){ state->sync_start = token; + ret=1; + } + return ret; } static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_sync *state, struct overlay_buffer *b) @@ -255,20 +263,25 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_ // extend the set of BARs we have synced from this peer // we require the list of BARs to be either ASC or DESC and include BARs for *all* manifests in that range // TODO stop if we are taking too much CPU time. - for (i=mid_point; ibar_count < BARS_PER_RESPONSE; i++) - sync_cache_bar(state, bars[i], bar_tokens[i]); - for (i=mid_point -1; i>=0 && state->bar_count < BARS_PER_RESPONSE; i--) - sync_cache_bar(state, bars[i], bar_tokens[i]); + int added=0; + for (i=mid_point; ibar_count < BARS_PER_RESPONSE; i++){ + if (sync_cache_bar(state, bars[i], bar_tokens[i])) + added=1; + } + for (i=mid_point -1; i>=0 && state->bar_count < BARS_PER_RESPONSE; i--){ + if (sync_cache_bar(state, bars[i], bar_tokens[i])) + added=1; + } if (config.debug.rhizome) DEBUGF("Synced %llu - %llu with %s", state->sync_start, state->sync_end, alloca_tohex_sid(subscriber->sid)); - state->next_request = gettime_ms(); + if (added) + state->next_request = gettime_ms(); } } static int append_response(struct overlay_buffer *b, uint64_t token, const unsigned char *bar) { - ob_checkpoint(b); if (ob_append_packed_ui64(b, token)) return -1; if (bar){ @@ -280,6 +293,7 @@ static int append_response(struct overlay_buffer *b, uint64_t token, const unsig return -1; bzero(ptr, RHIZOME_BAR_BYTES); } + ob_checkpoint(b); return 0; } @@ -310,6 +324,7 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t struct overlay_buffer *b = ob_static(mdp.out.payload, sizeof(mdp.out.payload)); ob_append_byte(b, MSG_TYPE_BARS); + ob_checkpoint(b); sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; sqlite3_stmt *statement; @@ -343,9 +358,12 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t // make sure we include the exact rowid that was requested, even if we just deleted / replaced the manifest if (count==0 && rowid!=token){ if (token!=HEAD_FLAG){ - append_response(b, token, NULL); - count++; - last = token; + if (append_response(b, token, NULL)) + ob_rewind(b); + else{ + count++; + last = token; + } }else token = rowid; } @@ -367,9 +385,12 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t // send a zero lower bound if we reached the end of our manifest list if (count && count < BARS_PER_RESPONSE && !forwards){ - append_response(b, 0, NULL); - last = 0; - count++; + if (append_response(b, 0, NULL)) + ob_rewind(b); + else { + last = 0; + count++; + } } sqlite3_finalize(statement);