Only advertise BAR's on an alarm

- we no longer hit the database for every outgoing packet, attempting to announce bundles
- we no longer advertise manifests periodically
- when an interesting bar arrives, we ask for the manifest to be announced, which uses the existing packet format
This commit is contained in:
Jeremy Lakeman 2013-02-12 10:34:04 +10:30
parent c7caec488a
commit ba1800012e
10 changed files with 349 additions and 329 deletions

View File

@ -137,6 +137,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MDP_PORT_RHIZOME_REQUEST 13
#define MDP_PORT_RHIZOME_RESPONSE 14
#define MDP_PORT_DIRECTORY 15
#define MDP_PORT_RHIZOME_MANIFEST_REQUEST 16
#define MDP_PORT_NOREPLY 0x3f
#define MDP_TYPE_MASK 0xff

View File

@ -150,6 +150,9 @@ schedule(&_sched_##X); }
/* Periodically update route table. */
SCHEDULE(overlay_route_tick, 100, 100);
/* Periodically advertise bundles */
SCHEDULE(overlay_rhizome_advertise, 1000, 10000);
/* Show CPU usage stats periodically */
if (config.debug.timing){
SCHEDULE(fd_periodicstats, 3000, 500);

View File

@ -528,6 +528,7 @@ int overlay_mdp_check_binding(struct subscriber *subscriber, int port, int userG
case MDP_PORT_DNALOOKUP:
case MDP_PORT_RHIZOME_RESPONSE:
case MDP_PORT_RHIZOME_REQUEST:
case MDP_PORT_RHIZOME_MANIFEST_REQUEST:
case MDP_PORT_PROBE:
case MDP_PORT_STUNREQ:
case MDP_PORT_STUN:

View File

@ -300,6 +300,27 @@ int overlay_mdp_service_echo(overlay_mdp_frame *mdp)
RETURN(0);
}
static int overlay_mdp_service_manifest_response(overlay_mdp_frame *mdp){
int offset=0;
char id_hex[RHIZOME_MANIFEST_ID_STRLEN];
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
return WHY("Unable to allocate manifest");
while (offset<mdp->out.payload_length){
unsigned char *bar=&mdp->out.payload[offset];
tohex(id_hex, &bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES);
strcat(id_hex, "%");
if (!rhizome_retrieve_manifest(id_hex, m)){
rhizome_advertise_manifest(m);
}
offset+=RHIZOME_BAR_BYTES;
}
rhizome_manifest_free(m);
return 0;
}
int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp)
{
IN();
@ -316,6 +337,7 @@ int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp)
RETURN(overlay_mdp_service_rhizomerequest(mdp));
} else break;
case MDP_PORT_RHIZOME_RESPONSE: RETURN(overlay_mdp_service_rhizomeresponse(mdp));
case MDP_PORT_RHIZOME_MANIFEST_REQUEST: RETURN(overlay_mdp_service_manifest_response(mdp));
}
/* Unbound socket. We won't be sending ICMP style connection refused

View File

@ -41,7 +41,6 @@ struct outgoing_packet{
overlay_interface *interface;
int i;
struct subscriber *unicast_subscriber;
int add_advertisements;
struct sockaddr_in dest;
int header_length;
struct overlay_buffer *buffer;
@ -240,7 +239,6 @@ overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destinati
packet->i = (interface - overlay_interfaces);
packet->dest=addr;
packet->buffer=ob_new();
packet->add_advertisements=1;
if (unicast)
packet->unicast_subscriber = destination;
ob_limitsize(packet->buffer, packet->interface->mtu);
@ -452,10 +450,6 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
goto skip;
}
// don't send rhizome adverts if the packet contains a voice payload
if (frame->queue==OQ_ISOCHRONOUS_VOICE)
packet->add_advertisements=0;
sent:
if (config.debug.overlayframes){
DEBUGF("Sent payload type %x len %d for %s via %s", frame->type, ob_position(frame->payload),
@ -521,10 +515,6 @@ overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) {
if(packet->buffer){
if (ob_position(packet->buffer) > packet->header_length){
// stuff rhizome announcements at the last moment
if (packet->add_advertisements)
overlay_rhizome_add_advertisements(&packet->context, packet->i,packet->buffer);
if (config.debug.packetconstruction)
ob_dump(packet->buffer,"assembled packet");

View File

@ -306,12 +306,14 @@ int rhizome_manifest_extract_signature(rhizome_manifest *m,int *ofs);
int rhizome_update_file_priority(const char *fileid);
int rhizome_find_duplicate(const rhizome_manifest *m, rhizome_manifest **found, int check_author);
int rhizome_manifest_to_bar(rhizome_manifest *m,unsigned char *bar);
long long rhizome_bar_version(unsigned char *bar);
int64_t rhizome_bar_version(unsigned char *bar);
unsigned long long rhizome_bar_bidprefix_ll(unsigned char *bar);
int rhizome_is_bar_interesting(unsigned char *bar);
int rhizome_list_manifests(const char *service, const char *name,
const char *sender_sid, const char *recipient_sid,
int limit, int offset, char count_rows);
int rhizome_retrieve_manifest(const char *manifestid, rhizome_manifest *m);
int rhizome_advertise_manifest(rhizome_manifest *m);
#define RHIZOME_DONTVERIFY 0
#define RHIZOME_VERIFY 1
@ -349,8 +351,8 @@ int rhizome_sign_hash_with_key(rhizome_manifest *m,const unsigned char *sk,
int rhizome_verify_bundle_privatekey(rhizome_manifest *m, const unsigned char *sk,
const unsigned char *pk);
int rhizome_find_bundle_author(rhizome_manifest *m);
int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout);
int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]);
int rhizome_queue_ignore_manifest(unsigned char *bid_prefix, int prefix_len, int timeout);
int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len);
/* one manifest is required per candidate, plus a few spare.
so MAX_RHIZOME_MANIFESTS must be > MAX_CANDIDATES.

View File

@ -287,7 +287,6 @@ int rhizome_opendb()
verify_bundles();
sqlite_exec_void_loglevel(LOG_LEVEL_WARN, "PRAGMA user_version=2;");
}
// TODO recreate tables with collate nocase on hex columns
/* Future schema updates should be performed here.
@ -843,22 +842,22 @@ int rhizome_store_bundle(rhizome_manifest *m)
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;") != SQLITE_OK)
return WHY("Failed to begin transaction");
sqlite3_stmt *stmt;
if ((stmt = sqlite_prepare(&retry, "INSERT OR REPLACE INTO MANIFESTS(id,manifest,version,inserttime,bar,filesize,filehash,author,service,name,sender,recipient) VALUES(?,?,?,?,?,?,?,?,?,?,?,?);")) == NULL)
goto rollback;
if (!( sqlite_code_ok(sqlite3_bind_text(stmt, 1, manifestid, -1, SQLITE_TRANSIENT))
&& sqlite_code_ok(sqlite3_bind_blob(stmt, 2, m->manifestdata, m->manifest_bytes, SQLITE_TRANSIENT))
if (!( sqlite_code_ok(sqlite3_bind_text(stmt, 1, manifestid, -1, SQLITE_STATIC))
&& sqlite_code_ok(sqlite3_bind_blob(stmt, 2, m->manifestdata, m->manifest_bytes, SQLITE_STATIC))
&& sqlite_code_ok(sqlite3_bind_int64(stmt, 3, m->version))
&& sqlite_code_ok(sqlite3_bind_int64(stmt, 4, (long long) gettime_ms()))
&& sqlite_code_ok(sqlite3_bind_blob(stmt, 5, bar, RHIZOME_BAR_BYTES, SQLITE_TRANSIENT))
&& sqlite_code_ok(sqlite3_bind_blob(stmt, 5, bar, RHIZOME_BAR_BYTES, SQLITE_STATIC))
&& sqlite_code_ok(sqlite3_bind_int64(stmt, 6, m->fileLength))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 7, filehash, -1, SQLITE_TRANSIENT))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 8, author, -1, SQLITE_TRANSIENT))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 9, service, -1, SQLITE_TRANSIENT))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 10, name, -1, SQLITE_TRANSIENT))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 11, sender, -1, SQLITE_TRANSIENT))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 12, recipient, -1, SQLITE_TRANSIENT))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 7, filehash, -1, SQLITE_STATIC))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 8, author, -1, SQLITE_STATIC))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 9, service, -1, SQLITE_STATIC))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 10, name, -1, SQLITE_STATIC))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 11, sender, -1, SQLITE_STATIC))
&& sqlite_code_ok(sqlite3_bind_text(stmt, 12, recipient, -1, SQLITE_STATIC))
)) {
WHYF("query failed, %s: %s", sqlite3_errmsg(rhizome_db), sqlite3_sql(stmt));
goto rollback;
@ -1375,7 +1374,7 @@ int rhizome_retrieve_manifest(const char *manifestid, rhizome_manifest *m){
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT manifest, version, inserttime, author FROM manifests WHERE id = ?");
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT manifest, version, inserttime, author FROM manifests WHERE id like ?");
if (!statement)
return -1;
@ -1410,3 +1409,36 @@ done:
sqlite3_finalize(statement);
return ret;
}
int rhizome_is_bar_interesting(unsigned char *bar){
int64_t version = rhizome_bar_version(bar);
int ret=1;
char id_hex[RHIZOME_MANIFEST_ID_STRLEN];
tohex(id_hex, &bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES);
strcat(id_hex, "%");
// are we ignoring this manifest?
if (rhizome_ignore_manifest_check(&bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES)){
DEBUGF("Ignoring %s", id_hex);
return 0;
}
// do we have this bundle [or later]?
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry,
"SELECT id, version FROM manifests WHERE id like ? and version >= ?");
sqlite3_bind_text(statement, 1, id_hex, -1, SQLITE_STATIC);
sqlite3_bind_int64(statement, 2, version);
if (sqlite_step_retry(&retry, statement) == SQLITE_ROW){
if (0){
const char *q_id = (const char *) sqlite3_column_text(statement, 0);
long long q_version = (long long) sqlite3_column_int64(statement, 1);
DEBUGF("Already have %s, %lld (vs %s, %lld)", q_id, q_version, id_hex, version);
}
ret=0;
}
sqlite3_finalize(statement);
return ret;
}

View File

@ -425,9 +425,7 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m)
}
typedef struct ignored_manifest {
unsigned char bid[crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES];
struct sockaddr_in peer_ipandport;
unsigned char peer_sid[SID_SIZE];
unsigned char bid[RHIZOME_BAR_PREFIX_BYTES];
time_ms_t timeout;
} ignored_manifest;
@ -447,15 +445,18 @@ typedef struct ignored_manifest_cache {
a collision is exceedingly remote */
ignored_manifest_cache ignored;
int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char *peersid)
int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len)
{
int bin = m->cryptoSignPublic[0]>>(8-IGNORED_BIN_BITS);
if (prefix_len < RHIZOME_BAR_PREFIX_BYTES)
FATAL("Prefix length is too short");
int bin = bid_prefix[0]>>(8-IGNORED_BIN_BITS);
int slot;
for(slot = 0; slot != IGNORED_BIN_SIZE; ++slot)
{
if (!memcmp(ignored.bins[bin].m[slot].bid,
m->cryptoSignPublic,
crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES))
bid_prefix,
RHIZOME_BAR_PREFIX_BYTES))
{
if (ignored.bins[bin].m[slot].timeout>gettime_ms())
return 1;
@ -466,31 +467,28 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in
return 0;
}
int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout)
int rhizome_queue_ignore_manifest(unsigned char *bid_prefix, int prefix_len, int timeout)
{
if (prefix_len < RHIZOME_BAR_PREFIX_BYTES)
FATAL("Prefix length is too short");
/* The supplied manifest from a given IP has errors, so remember
that it isn't worth considering */
int bin = m->cryptoSignPublic[0]>>(8-IGNORED_BIN_BITS);
int bin = bid_prefix[0]>>(8-IGNORED_BIN_BITS);
int slot;
for(slot = 0; slot != IGNORED_BIN_SIZE; ++slot)
{
if (!memcmp(ignored.bins[bin].m[slot].bid,
m->cryptoSignPublic,
crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES))
bid_prefix,
RHIZOME_BAR_PREFIX_BYTES))
break;
}
if (slot>=IGNORED_BIN_SIZE) slot=random()%IGNORED_BIN_SIZE;
bcopy(&m->cryptoSignPublic[0],
bcopy(&bid_prefix[0],
&ignored.bins[bin].m[slot].bid[0],
crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES);
RHIZOME_BAR_PREFIX_BYTES);
/* ignore for a while */
ignored.bins[bin].m[slot].timeout=gettime_ms()+timeout;
bcopy(peerip,
&ignored.bins[bin].m[slot].peer_ipandport,
sizeof(struct sockaddr_in));
bcopy(peersid,
ignored.bins[bin].m[slot].peer_sid,
SID_SIZE);
return 0;
}
@ -897,7 +895,8 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
if (rhizome_manifest_verify(m) != 0) {
WHY("Error verifying manifest when considering for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, peersid, 60000);
rhizome_queue_ignore_manifest(m->cryptoSignPublic,
crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
@ -931,7 +930,8 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
if (!m->selfSigned && rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, peersid, 60000);
rhizome_queue_ignore_manifest(m->cryptoSignPublic,
crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
@ -957,7 +957,8 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
if (!m->selfSigned && rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, peersid, 60000);
rhizome_queue_ignore_manifest(m->cryptoSignPublic,
crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}

View File

@ -113,11 +113,12 @@ int rhizome_manifest_to_bar(rhizome_manifest *m,unsigned char *bar)
RETURN(0);
}
long long rhizome_bar_version(unsigned char *bar)
int64_t rhizome_bar_version(unsigned char *bar)
{
long long version=0;
int64_t version=0;
int i;
for(i=0;i<7;i++) version|=bar[RHIZOME_BAR_VERSION_OFFSET+6-i]<<(8LL*i);
for(i=0;i<7;i++)
version|=((int64_t)(bar[RHIZOME_BAR_VERSION_OFFSET+6-i]))<<(8LL*i);
return version;
}
@ -132,194 +133,123 @@ unsigned long long rhizome_bar_bidprefix_ll(unsigned char *bar)
return bidprefix;
}
struct advertisement_state{
long long bundles_available;
int64_t bundle_last_rowid[2];
};
struct advertisement_state advert_state[OVERLAY_MAX_INTERFACES];
static int append_bars(struct overlay_buffer *e, sqlite_retry_state *retry, const char *sql, long long *last_rowid){
int count=0;
sqlite3_stmt *statement=sqlite_prepare(retry, sql, *last_rowid);
while(sqlite_step_retry(retry, statement) == SQLITE_ROW) {
count++;
if (sqlite3_column_type(statement, 0)!=SQLITE_BLOB)
continue;
const void *data = sqlite3_column_blob(statement, 0);
int blob_bytes = sqlite3_column_bytes(statement, 0);
int64_t rowid = sqlite3_column_int64(statement, 1);
if (blob_bytes!=RHIZOME_BAR_BYTES) {
if (config.debug.rhizome_ads)
DEBUG("Found a BAR that is the wrong size - ignoring");
continue;
}
if (ob_append_bytes(e, (unsigned char *)data, blob_bytes)){
// out of room
count--;
break;
}
*last_rowid=rowid;
}
if (statement)
sqlite3_finalize(statement);
return count;
}
int overlay_rhizome_add_advertisements(struct decode_context *context, int interface_number, struct overlay_buffer *e)
{
IN();
/* We need to change manifest table to include payload length to make our life
easy here (also would let us order advertisements by size of payload).
For now, we will just advertised only occassionally.
XXX We will move all processing of Rhizome into a separate process
so that the CPU delays caused by Rhizome verifying signatures isn't a problem.
/* Periodically queue BAR advertisements
Always advertise the most recent 3 manifests in the table, cycle through the rest of the table, adding 17 BAR's at a time
*/
if (!is_rhizome_advertise_enabled())
RETURN(0);
void overlay_rhizome_advertise(struct sched_ent *alarm){
static long long bundles_available=0;
static int64_t bundle_last_rowid=INT64_MAX;
struct advertisement_state *state=&advert_state[interface_number];
if (!is_rhizome_advertise_enabled())
return;
int pass;
/* XXX Should add priority bundles here.
XXX Should prioritise bundles for subscribed groups, Serval-authorised files
etc over common bundles.
XXX Should wait a while after going through bundle list so that we don't waste
CPU on db queries if there are not many bundles. Actually, we probably just
shouldn't be sending bundles blindly on every tick.
XXX How do we indicate group membership with BARs? Or do groups actively poll?
XXX XXX XXX We should cache database results so that we don't waste all our time
and energy asking the database much the same questions possibly many times per
second.
*/
// TODO Group handling not completely thought out here yet.
int (*oldfunc)() = sqlite_set_tracefunc(is_debug_rhizome_ads);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
/* Get number of bundles available if required */
long long tmp = 0;
if (sqlite_exec_int64_retry(&retry, &tmp, "SELECT COUNT(BAR) FROM MANIFESTS;") != 1) {
sqlite_set_tracefunc(oldfunc);
RETURN(WHY("Could not count BARs for advertisement"));
/* Get number of bundles available */
if (sqlite_exec_int64_retry(&retry, &bundles_available, "SELECT COUNT(BAR) FROM MANIFESTS;") != 1){
WHY("Could not count BARs for advertisement");
goto end;
}
if (state->bundles_available!=tmp){
state->bundle_last_rowid[0]=INT64_MAX;
state->bundle_last_rowid[1]=INT64_MAX;
}
if (bundles_available<1)
goto end;
state->bundles_available = tmp;
if (tmp<1)
RETURN(0);
struct overlay_frame *frame = malloc(sizeof(struct overlay_frame));
bzero(frame,sizeof(struct overlay_frame));
frame->type = OF_TYPE_RHIZOME_ADVERT;
frame->source = my_subscriber;
frame->ttl = 1;
frame->queue = OQ_OPPORTUNISTIC;
frame->payload = ob_new();
ob_limitsize(frame->payload, 800);
/* Randomly choose whether to advertise manifests or BARs first. */
int skipmanifests=random()&1;
ob_append_byte(frame->payload, 2);
ob_append_ui16(frame->payload, rhizome_http_server_port);
if (config.debug.rhizome_ads)
DEBUGF("%lld bundles in database, starting from %lld, %lld.",state->bundles_available,
state->bundle_last_rowid[0], state->bundle_last_rowid[1]);
long long rowid=0;
int count = append_bars(frame->payload, &retry,
"SELECT BAR,ROWID FROM MANIFESTS ORDER BY ROWID DESC LIMIT 3",
&rowid);
sqlite3_stmt *statement=NULL;
ob_checkpoint(e);
if (overlay_frame_build_header(context, e,
0, OF_TYPE_RHIZOME_ADVERT, 0, 1,
NULL, NULL,
NULL, my_subscriber)){
ob_rewind(e);
RETURN(-1);
}
if (ob_append_rfs(e, 2)){
ob_rewind(e);
RETURN(-1);
}
/* Version of rhizome advert block (1 byte):
1 = manifests then BARs,
2 = BARs only,
3 = HTTP port then manifests then BARs,
4 = HTTP port then BARs only
*/
if (ob_append_byte(e,3+skipmanifests)){
ob_rewind(e);
RETURN(-1);
}
/* Rhizome HTTP server port number (2 bytes) */
if (ob_append_ui16(e, rhizome_http_server_port)){
ob_rewind(e);
RETURN(-1);
}
for(pass=skipmanifests;pass<2;pass++) {
ob_checkpoint(e);
switch(pass) {
case 0: /* Full manifests */
statement = sqlite_prepare(&retry, "SELECT MANIFEST,ROWID,ID FROM MANIFESTS WHERE ROWID < %lld ORDER BY ROWID DESC LIMIT 30",
state->bundle_last_rowid[pass]);
break;
case 1: /* BARs */
statement = sqlite_prepare(&retry, "SELECT BAR,ROWID,ID FROM MANIFESTS WHERE ROWID < %lld ORDER BY ROWID DESC LIMIT 30",
state->bundle_last_rowid[pass]);
break;
}
if (!statement) {
sqlite_set_tracefunc(oldfunc);
WHY("Could not prepare sql statement for fetching BARs for advertisement");
goto stopStuffing;
}
int count=0;
if (count>=3){
if (bundle_last_rowid>rowid || bundle_last_rowid<=0)
bundle_last_rowid=rowid;
while(sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
count++;
if (sqlite3_column_type(statement, 0)!=SQLITE_BLOB)
continue;
const void *data = sqlite3_column_blob(statement, 0);
int blob_bytes = sqlite3_column_bytes(statement, 0);
int64_t rowid = sqlite3_column_int64(statement, 1);
const unsigned char *manifestId = sqlite3_column_text(statement, 2);
if (config.debug.rhizome_ads)
DEBUGF("Considering manifest %s", manifestId);
if (pass&&(blob_bytes!=RHIZOME_BAR_BYTES)) {
if (config.debug.rhizome_ads)
DEBUG("Found a BAR that is the wrong size - ignoring");
continue;
}
/* Only include manifests that are <=1KB inline.
Longer ones are only advertised by BAR */
if (blob_bytes>1024) {
WARN("ignoring manifest > 1k");
continue;
}
int overhead=(!pass)?2:0;
/* make sure there's enough room for the blob, its length,
the 0xFF end marker and 1 spare for the rfs length to increase */
if (ob_makespace(e,overhead+blob_bytes+1))
goto stopStuffing;
if (!pass) {
/* include manifest length field */
ob_append_ui16(e, blob_bytes);
}
if (ob_append_bytes(e, (unsigned char *)data, blob_bytes)){
WHY("Failed to append data into buffer");
goto stopStuffing;
}
state->bundle_last_rowid[pass]=rowid;
ob_checkpoint(e);
}
if (count<30){
// if we hit the end of the cursor, before the end of the packey buffer, restart next time at the beginning.
state->bundle_last_rowid[pass]=INT64_MAX;
}
stopStuffing:
if (statement)
sqlite3_finalize(statement);
statement = NULL;
ob_rewind(e);
if (!pass) {
/* Mark end of whole manifests by writing 0xff, which is more than the MSB
of a manifest's length is allowed to be. */
ob_append_byte(e,0xff);
}
count = append_bars(frame->payload, &retry,
"SELECT BAR,ROWID FROM MANIFESTS WHERE ROWID < %lld ORDER BY ROWID DESC LIMIT 17",
&bundle_last_rowid);
if (count<17)
bundle_last_rowid=INT64_MAX;
}
ob_patch_rfs(e);
if (overlay_payload_enqueue(frame))
op_free(frame);
end:
sqlite_set_tracefunc(oldfunc);
RETURN(0);
alarm->alarm = gettime_ms()+500;
alarm->deadline = alarm->alarm+10000;
schedule(alarm);
}
/* Queue an advertisment for a single manifest */
int rhizome_advertise_manifest(rhizome_manifest *m){
struct overlay_frame *frame = malloc(sizeof(struct overlay_frame));
bzero(frame,sizeof(struct overlay_frame));
frame->type = OF_TYPE_RHIZOME_ADVERT;
frame->source = my_subscriber;
frame->ttl = 1;
frame->queue = OQ_OPPORTUNISTIC;
frame->payload = ob_new();
ob_limitsize(frame->payload, 800);
if (ob_append_byte(frame->payload, 3)) goto error;
if (ob_append_ui16(frame->payload, rhizome_http_server_port)) goto error;
if (ob_append_ui16(frame->payload, m->manifest_all_bytes)) goto error;
if (ob_append_bytes(frame->payload, m->manifestdata, m->manifest_all_bytes)) goto error;
ob_append_byte(frame->payload, 0xFF);
if (overlay_payload_enqueue(frame)) goto error;
return 0;
error:
op_free(frame);
return -1;
}
int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long now)
@ -338,115 +268,153 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
int (*oldfunc)() = sqlite_set_tracefunc(is_debug_rhizome_ads);
switch (ad_frame_type) {
case 3:
/* The same as type=1, but includes the source HTTP port number */
httpaddr.sin_port = htons(ob_get_ui16(f->payload));
// FALL THROUGH ...
case 1:
/* Extract whole manifests */
while(f->payload->position < f->payload->sizeLimit) {
if (ob_getbyte(f->payload, f->payload->position)==0xff){
f->payload->position++;
break;
}
manifest_length=ob_get_ui16(f->payload);
if (manifest_length==0) continue;
unsigned char *data = ob_get_bytes_ptr(f->payload, manifest_length);
if (!data) {
assert(inet_ntop(AF_INET, &httpaddr.sin_addr, httpaddrtxt, sizeof(httpaddrtxt)) != NULL);
WHYF("Illegal manifest length field in rhizome advertisement frame %d vs %d.",
manifest_length, f->payload->sizeLimit - f->payload->position);
break;
}
/* Read manifest without verifying signatures (which would waste lots of
energy, everytime we see a manifest that we already have).
In fact, it would be better here to do a really rough and ready parser
to get the id and version fields out, and avoid the memory copies that
otherwise happen.
But we do need to make sure that at least one signature is there.
*/
m = rhizome_new_manifest();
if (!m) {
WHY("Out of manifests");
sqlite_set_tracefunc(oldfunc);
RETURN(0);
}
if (rhizome_read_manifest_file(m, (char *)data, manifest_length) == -1) {
WHY("Error importing manifest body");
rhizome_manifest_free(m);
sqlite_set_tracefunc(oldfunc);
RETURN(0);
}
char manifest_id_prefix[RHIZOME_MANIFEST_ID_STRLEN + 1];
if (rhizome_manifest_get(m, "id", manifest_id_prefix, sizeof manifest_id_prefix) == NULL) {
WHY("Manifest does not contain 'id' field");
rhizome_manifest_free(m);
sqlite_set_tracefunc(oldfunc);
RETURN(0);
}
/* trim manifest ID to a prefix for ease of debugging
(that is the only use of this */
manifest_id_prefix[8]=0;
long long version = rhizome_manifest_get_ll(m, "version");
if (config.debug.rhizome_ads)
DEBUGF("manifest id=%s* version=%lld", manifest_id_prefix, version);
/* Crude signature presence test */
for(i=m->manifest_all_bytes-1;i>0;i--)
if (!m->manifestdata[i]) {
/* A null in the middle says we have a signature */
break;
}
if (!i) {
/* ignore the announcement, but don't ignore other people
offering the same manifest */
WARN("Ignoring manifest announcment with no signature");
rhizome_manifest_free(m);
sqlite_set_tracefunc(oldfunc);
RETURN(0);
}
if (rhizome_ignore_manifest_check(m, &httpaddr,f->source->sid))
{
/* Ignoring manifest that has caused us problems recently */
if (1) WARNF("Ignoring manifest with errors: %s*", manifest_id_prefix);
}
else if (m->errors == 0)
{
/* Manifest is okay, so see if it is worth storing */
if (rhizome_manifest_version_cache_lookup(m)) {
/* We already have this version or newer */
if (config.debug.rhizome_ads)
DEBUG("We already have that manifest or newer.");
} else {
if (config.debug.rhizome_ads)
DEBUG("Not seen before.");
rhizome_suggest_queue_manifest_import(m, &httpaddr,f->source->sid);
// the above function will free the manifest structure, make sure we don't free it again
m=NULL;
}
}
else
{
if (config.debug.rhizome_ads)
DEBUG("Unverified manifest has errors - so not processing any further.");
/* Don't waste any time on this manifest in future attempts for at least
a minute. */
rhizome_queue_ignore_manifest(m, &httpaddr,f->source->sid, 60000);
}
if (m) {
rhizome_manifest_free(m);
m = NULL;
}
if (ad_frame_type & 2){
httpaddr.sin_port = htons(ob_get_ui16(f->payload));
}
if (ad_frame_type & 1){
/* Extract whole manifests */
while(f->payload->position < f->payload->sizeLimit) {
if (ob_getbyte(f->payload, f->payload->position)==0xff){
f->payload->position++;
break;
}
manifest_length=ob_get_ui16(f->payload);
if (manifest_length==0) continue;
unsigned char *data = ob_get_bytes_ptr(f->payload, manifest_length);
if (!data) {
assert(inet_ntop(AF_INET, &httpaddr.sin_addr, httpaddrtxt, sizeof(httpaddrtxt)) != NULL);
WHYF("Illegal manifest length field in rhizome advertisement frame %d vs %d.",
manifest_length, f->payload->sizeLimit - f->payload->position);
break;
}
/* Read manifest without verifying signatures (which would waste lots of
energy, everytime we see a manifest that we already have).
In fact, it would be better here to do a really rough and ready parser
to get the id and version fields out, and avoid the memory copies that
otherwise happen.
But we do need to make sure that at least one signature is there.
*/
m = rhizome_new_manifest();
if (!m) {
WHY("Out of manifests");
sqlite_set_tracefunc(oldfunc);
RETURN(0);
}
if (rhizome_read_manifest_file(m, (char *)data, manifest_length) == -1) {
WHY("Error importing manifest body");
rhizome_manifest_free(m);
sqlite_set_tracefunc(oldfunc);
RETURN(0);
}
char manifest_id_prefix[RHIZOME_MANIFEST_ID_STRLEN + 1];
if (rhizome_manifest_get(m, "id", manifest_id_prefix, sizeof manifest_id_prefix) == NULL) {
WHY("Manifest does not contain 'id' field");
rhizome_manifest_free(m);
sqlite_set_tracefunc(oldfunc);
RETURN(0);
}
/* trim manifest ID to a prefix for ease of debugging
(that is the only use of this */
manifest_id_prefix[8]=0;
if (config.debug.rhizome_ads){
long long version = rhizome_manifest_get_ll(m, "version");
DEBUGF("manifest id=%s* version=%lld", manifest_id_prefix, version);
}
/* Crude signature presence test */
for(i=m->manifest_all_bytes-1;i>0;i--)
if (!m->manifestdata[i]) {
/* A null in the middle says we have a signature */
break;
}
if (!i) {
/* ignore the announcement, but don't ignore other people
offering the same manifest */
WARN("Ignoring manifest announcment with no signature");
rhizome_manifest_free(m);
sqlite_set_tracefunc(oldfunc);
RETURN(0);
}
if (rhizome_ignore_manifest_check(m->cryptoSignPublic,
crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES))
{
/* Ignoring manifest that has caused us problems recently */
if (1) WARNF("Ignoring manifest with errors: %s*", manifest_id_prefix);
}
else if (m->errors == 0)
{
/* Manifest is okay, so see if it is worth storing */
if (rhizome_manifest_version_cache_lookup(m)) {
/* We already have this version or newer */
if (config.debug.rhizome_ads)
DEBUG("We already have that manifest or newer.");
} else {
if (config.debug.rhizome_ads)
DEBUG("Not seen before.");
rhizome_suggest_queue_manifest_import(m, &httpaddr,f->source->sid);
// the above function will free the manifest structure, make sure we don't free it again
m=NULL;
}
}
else
{
if (config.debug.rhizome_ads)
DEBUG("Unverified manifest has errors - so not processing any further.");
/* Don't waste any time on this manifest in future attempts for at least
a minute. */
rhizome_queue_ignore_manifest(m->cryptoSignPublic,
crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES, 60000);
}
if (m) {
rhizome_manifest_free(m);
m = NULL;
}
}
}
overlay_mdp_frame mdp;
bzero(&mdp,sizeof(mdp));
mdp.out.payload_length=0;
// parse BAR's
while(ob_remaining(f->payload)>0){
unsigned char *bar=ob_get_bytes_ptr(f->payload, RHIZOME_BAR_BYTES);
if (!bar){
WARNF("Expected whole BAR @%d (only %d remains)", ob_position(f->payload), ob_remaining(f->payload));
dump("buffer", ob_ptr(f->payload), ob_limit(f->payload));
break;
}
if (rhizome_is_bar_interesting(bar)==1){
// add a request for the manifest
if (mdp.out.payload_length==0){
bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE);
mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
bcopy(f->source->sid,mdp.out.dst.sid,SID_SIZE);
mdp.out.dst.port=MDP_PORT_RHIZOME_MANIFEST_REQUEST;
if (f->source->reachable&REACHABLE_DIRECT)
mdp.out.ttl=1;
else
mdp.out.ttl=64;
mdp.packetTypeAndFlags=MDP_TX;
mdp.out.queue=OQ_ORDINARY;
}
DEBUGF("Requesting manifest for BAR %s", alloca_tohex(bar, RHIZOME_BAR_BYTES));
bcopy(bar, &mdp.out.payload[mdp.out.payload_length], RHIZOME_BAR_BYTES);
mdp.out.payload_length+=RHIZOME_BAR_BYTES;
}
}
if (mdp.out.payload_length>0)
overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0);
sqlite_set_tracefunc(oldfunc);
RETURN(0);
}

View File

@ -491,7 +491,7 @@ int overlay_frame_build_header(struct decode_context *context, struct overlay_bu
struct broadcast *broadcast, struct subscriber *next_hop,
struct subscriber *destination, struct subscriber *source);
int overlay_interface_args(const char *arg);
int overlay_rhizome_add_advertisements(struct decode_context *context, int interface_number, struct overlay_buffer *e);
void overlay_rhizome_advertise(struct sched_ent *alarm);
int overlay_add_local_identity(unsigned char *s);
extern int overlay_interface_count;