Restart announcements when new bundles added, use more efficient sql, remember position per interface

This commit is contained in:
Jeremy Lakeman 2013-01-09 14:47:28 +10:30
parent c155ebeef2
commit 6de54bc0f1

View File

@ -132,8 +132,12 @@ unsigned long long rhizome_bar_bidprefix_ll(unsigned char *bar)
return bidprefix;
}
int bundles_available=-1;
int bundle_offset[2]={0,0};
struct advertisement_state{
long long bundles_available;
int64_t bundle_last_rowid[2];
};
struct advertisement_state advert_state[OVERLAY_MAX_INTERFACES];
int overlay_rhizome_add_advertisements(struct decode_context *context, int interface_number, struct overlay_buffer *e)
{
IN();
@ -148,17 +152,9 @@ int overlay_rhizome_add_advertisements(struct decode_context *context, int inter
if (!is_rhizome_advertise_enabled())
RETURN(0);
struct advertisement_state *state=&advert_state[interface_number];
int pass;
int bytes=e->sizeLimit-e->position;
int overhead=1+11+1+2+2; /* maximum overhead */
int slots=(bytes-overhead)/RHIZOME_BAR_BYTES;
if (slots>30) slots=30;
int bundles_advertised=0;
if (slots<1) { RETURN(WHY("No room for node advertisements")); }
/* Randomly choose whether to advertise manifests or BARs first. */
int skipmanifests=random()&1;
/* XXX Should add priority bundles here.
XXX Should prioritise bundles for subscribed groups, Serval-authorised files
@ -184,17 +180,24 @@ int overlay_rhizome_add_advertisements(struct decode_context *context, int inter
sqlite_set_tracefunc(oldfunc);
RETURN(WHY("Could not count BARs for advertisement"));
}
bundles_available = (int) tmp;
if (bundles_available==-1||(bundle_offset[0]>=bundles_available))
bundle_offset[0]=0;
if (bundles_available==-1||(bundle_offset[1]>=bundles_available))
bundle_offset[1]=0;
if (state->bundles_available!=tmp){
state->bundle_last_rowid[0]=INT64_MAX;
state->bundle_last_rowid[1]=INT64_MAX;
}
state->bundles_available = tmp;
if (tmp<1)
RETURN(0);
/* Randomly choose whether to advertise manifests or BARs first. */
int skipmanifests=random()&1;
if (config.debug.rhizome_ads)
DEBUGF("%d bundles in database (%d %d), slots=%d.",bundles_available,
bundle_offset[0],bundle_offset[1],slots);
DEBUGF("%lld bundles in database, starting from %lld, %lld.",state->bundles_available,
state->bundle_last_rowid[0], state->bundle_last_rowid[1]);
sqlite3_stmt *statement=NULL;
sqlite3_blob *blob=NULL;
ob_checkpoint(e);
@ -226,10 +229,12 @@ int overlay_rhizome_add_advertisements(struct decode_context *context, int inter
ob_checkpoint(e);
switch(pass) {
case 0: /* Full manifests */
statement = sqlite_prepare(&retry, "SELECT MANIFEST,ROWID FROM MANIFESTS LIMIT %d,%d", bundle_offset[pass], slots);
statement = sqlite_prepare(&retry, "SELECT MANIFEST,ROWID,ID FROM MANIFESTS WHERE ROWID < %lld ORDER BY ROWID DESC LIMIT 30",
state->bundle_last_rowid[pass]);
break;
case 1: /* BARs */
statement = sqlite_prepare(&retry, "SELECT BAR,ROWID FROM MANIFESTS LIMIT %d,%d", bundle_offset[pass], slots);
statement = sqlite_prepare(&retry, "SELECT BAR,ROWID,ID FROM MANIFESTS WHERE ROWID < %lld ORDER BY ROWID DESC LIMIT 30",
state->bundle_last_rowid[pass]);
break;
}
if (!statement) {
@ -237,82 +242,62 @@ int overlay_rhizome_add_advertisements(struct decode_context *context, int inter
WHY("Could not prepare sql statement for fetching BARs for advertisement");
goto stopStuffing;
}
int count=0;
while( sqlite_step_retry(&retry, statement) == SQLITE_ROW
&& e->position+RHIZOME_BAR_BYTES<=e->sizeLimit
) {
int column_type=sqlite3_column_type(statement, 0);
switch(column_type) {
case SQLITE_BLOB:
if (blob)
sqlite3_blob_close(blob);
blob = NULL;
int ret;
int64_t rowid = sqlite3_column_int64(statement, 1);
do ret = sqlite3_blob_open(rhizome_db, "main", "manifests", pass?"bar":"manifest", rowid, 0 /* read only */, &blob);
while (sqlite_code_busy(ret) && sqlite_retry(&retry, "sqlite3_blob_open"));
if (!sqlite_code_ok(ret)) {
WHYF("sqlite3_blob_open() failed, %s", sqlite3_errmsg(rhizome_db));
continue;
}
sqlite_retry_done(&retry, "sqlite3_blob_open");
while(sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
count++;
if (sqlite3_column_type(statement, 0)!=SQLITE_BLOB)
continue;
int blob_bytes=sqlite3_blob_bytes(blob);
if (pass&&(blob_bytes!=RHIZOME_BAR_BYTES)) {
if (config.debug.rhizome_ads)
DEBUG("Found a BAR that is the wrong size - ignoring");
sqlite3_blob_close(blob);
blob=NULL;
continue;
}
const void *data = sqlite3_column_blob(statement, 0);
int blob_bytes = sqlite3_column_bytes(statement, 0);
int64_t rowid = sqlite3_column_int64(statement, 1);
const unsigned char *manifestId = sqlite3_column_text(statement, 2);
/* Only include manifests that are <=1KB inline.
Longer ones are only advertised by BAR */
if (blob_bytes>1024) {
WARN("ignoring manifest > 1k");
sqlite3_blob_close(blob);
blob = NULL;
bundle_offset[pass]++;
continue;
}
if (config.debug.rhizome_ads)
DEBUGF("Considering manifest %s", manifestId);
int overhead=0;
if (!pass) overhead=2;
/* make sure there's enough room for the blob, its length,
the 0xFF end marker and 1 spare for the rfs length to increase */
if (ob_makespace(e,overhead+blob_bytes+2))
goto stopStuffing;
if (!pass) {
/* include manifest length field */
ob_append_ui16(e, blob_bytes);
}
unsigned char *dest=ob_append_space(e, blob_bytes);
if (!dest){
WHY("Reading blob will overflow overlay_buffer");
goto stopStuffing;
}
if (sqlite3_blob_read(blob,dest,blob_bytes,0) != SQLITE_OK) {
WHYF("sqlite3_blob_read() failed, %s", sqlite3_errmsg(rhizome_db));
goto stopStuffing;
}
bundles_advertised++;
bundle_offset[pass]++;
sqlite3_blob_close(blob);
blob=NULL;
ob_checkpoint(e);
if (pass&&(blob_bytes!=RHIZOME_BAR_BYTES)) {
if (config.debug.rhizome_ads)
DEBUG("Found a BAR that is the wrong size - ignoring");
continue;
}
/* Only include manifests that are <=1KB inline.
Longer ones are only advertised by BAR */
if (blob_bytes>1024) {
WARN("ignoring manifest > 1k");
continue;
}
int overhead=(!pass)?2:0;
/* make sure there's enough room for the blob, its length,
the 0xFF end marker and 1 spare for the rfs length to increase */
if (ob_makespace(e,overhead+blob_bytes+1))
goto stopStuffing;
if (!pass) {
/* include manifest length field */
ob_append_ui16(e, blob_bytes);
}
if (ob_append_bytes(e, (unsigned char *)data, blob_bytes)){
WHY("Failed to append data into buffer");
goto stopStuffing;
}
state->bundle_last_rowid[pass]=rowid;
ob_checkpoint(e);
}
if (count<30){
// if we hit the end of the cursor, before the end of the packey buffer, restart next time at the beginning.
state->bundle_last_rowid[pass]=INT64_MAX;
}
stopStuffing:
if (blob)
sqlite3_blob_close(blob);
blob = NULL;
if (statement)
sqlite3_finalize(statement);
statement = NULL;