diff --git a/constants.h b/constants.h index 901dd259..b36241b2 100644 --- a/constants.h +++ b/constants.h @@ -137,6 +137,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #define MDP_PORT_RHIZOME_REQUEST 13 #define MDP_PORT_RHIZOME_RESPONSE 14 #define MDP_PORT_DIRECTORY 15 +#define MDP_PORT_RHIZOME_MANIFEST_REQUEST 16 #define MDP_PORT_NOREPLY 0x3f #define MDP_TYPE_MASK 0xff diff --git a/overlay.c b/overlay.c index 88d8922d..ceca7861 100644 --- a/overlay.c +++ b/overlay.c @@ -150,6 +150,9 @@ schedule(&_sched_##X); } /* Periodically update route table. */ SCHEDULE(overlay_route_tick, 100, 100); + /* Periodically advertise bundles */ + SCHEDULE(overlay_rhizome_advertise, 1000, 10000); + /* Show CPU usage stats periodically */ if (config.debug.timing){ SCHEDULE(fd_periodicstats, 3000, 500); diff --git a/overlay_mdp.c b/overlay_mdp.c index 0366e468..49de8b9b 100644 --- a/overlay_mdp.c +++ b/overlay_mdp.c @@ -528,6 +528,7 @@ int overlay_mdp_check_binding(struct subscriber *subscriber, int port, int userG case MDP_PORT_DNALOOKUP: case MDP_PORT_RHIZOME_RESPONSE: case MDP_PORT_RHIZOME_REQUEST: + case MDP_PORT_RHIZOME_MANIFEST_REQUEST: case MDP_PORT_PROBE: case MDP_PORT_STUNREQ: case MDP_PORT_STUN: diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index 233ff219..b779c2f1 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -300,6 +300,27 @@ int overlay_mdp_service_echo(overlay_mdp_frame *mdp) RETURN(0); } +static int overlay_mdp_service_manifest_response(overlay_mdp_frame *mdp){ + int offset=0; + char id_hex[RHIZOME_MANIFEST_ID_STRLEN]; + rhizome_manifest *m = rhizome_new_manifest(); + if (!m) + return WHY("Unable to allocate manifest"); + + while (offsetout.payload_length){ + unsigned char *bar=&mdp->out.payload[offset]; + tohex(id_hex, &bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES); + strcat(id_hex, "%"); + if (!rhizome_retrieve_manifest(id_hex, m)){ + rhizome_advertise_manifest(m); + } + offset+=RHIZOME_BAR_BYTES; + } + + rhizome_manifest_free(m); + return 0; +} + int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp) { IN(); @@ -316,6 +337,7 @@ int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp) RETURN(overlay_mdp_service_rhizomerequest(mdp)); } else break; case MDP_PORT_RHIZOME_RESPONSE: RETURN(overlay_mdp_service_rhizomeresponse(mdp)); + case MDP_PORT_RHIZOME_MANIFEST_REQUEST: RETURN(overlay_mdp_service_manifest_response(mdp)); } /* Unbound socket. We won't be sending ICMP style connection refused diff --git a/overlay_queue.c b/overlay_queue.c index bf53a9f0..f7b85835 100644 --- a/overlay_queue.c +++ b/overlay_queue.c @@ -41,7 +41,6 @@ struct outgoing_packet{ overlay_interface *interface; int i; struct subscriber *unicast_subscriber; - int add_advertisements; struct sockaddr_in dest; int header_length; struct overlay_buffer *buffer; @@ -240,7 +239,6 @@ overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destinati packet->i = (interface - overlay_interfaces); packet->dest=addr; packet->buffer=ob_new(); - packet->add_advertisements=1; if (unicast) packet->unicast_subscriber = destination; ob_limitsize(packet->buffer, packet->interface->mtu); @@ -452,10 +450,6 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim goto skip; } - // don't send rhizome adverts if the packet contains a voice payload - if (frame->queue==OQ_ISOCHRONOUS_VOICE) - packet->add_advertisements=0; - sent: if (config.debug.overlayframes){ DEBUGF("Sent payload type %x len %d for %s via %s", frame->type, ob_position(frame->payload), @@ -521,10 +515,6 @@ overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) { if(packet->buffer){ if (ob_position(packet->buffer) > packet->header_length){ - // stuff rhizome announcements at the last moment - if (packet->add_advertisements) - overlay_rhizome_add_advertisements(&packet->context, packet->i,packet->buffer); - if (config.debug.packetconstruction) ob_dump(packet->buffer,"assembled packet"); diff --git a/rhizome.h b/rhizome.h index 8dab27f5..6deb5a48 100644 --- a/rhizome.h +++ b/rhizome.h @@ -306,12 +306,14 @@ int rhizome_manifest_extract_signature(rhizome_manifest *m,int *ofs); int rhizome_update_file_priority(const char *fileid); int rhizome_find_duplicate(const rhizome_manifest *m, rhizome_manifest **found, int check_author); int rhizome_manifest_to_bar(rhizome_manifest *m,unsigned char *bar); -long long rhizome_bar_version(unsigned char *bar); +int64_t rhizome_bar_version(unsigned char *bar); unsigned long long rhizome_bar_bidprefix_ll(unsigned char *bar); +int rhizome_is_bar_interesting(unsigned char *bar); int rhizome_list_manifests(const char *service, const char *name, const char *sender_sid, const char *recipient_sid, int limit, int offset, char count_rows); int rhizome_retrieve_manifest(const char *manifestid, rhizome_manifest *m); +int rhizome_advertise_manifest(rhizome_manifest *m); #define RHIZOME_DONTVERIFY 0 #define RHIZOME_VERIFY 1 @@ -349,8 +351,8 @@ int rhizome_sign_hash_with_key(rhizome_manifest *m,const unsigned char *sk, int rhizome_verify_bundle_privatekey(rhizome_manifest *m, const unsigned char *sk, const unsigned char *pk); int rhizome_find_bundle_author(rhizome_manifest *m); -int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout); -int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]); +int rhizome_queue_ignore_manifest(unsigned char *bid_prefix, int prefix_len, int timeout); +int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len); /* one manifest is required per candidate, plus a few spare. so MAX_RHIZOME_MANIFESTS must be > MAX_CANDIDATES. diff --git a/rhizome_database.c b/rhizome_database.c index 0615a76f..79bdd06e 100644 --- a/rhizome_database.c +++ b/rhizome_database.c @@ -287,7 +287,6 @@ int rhizome_opendb() verify_bundles(); sqlite_exec_void_loglevel(LOG_LEVEL_WARN, "PRAGMA user_version=2;"); } - // TODO recreate tables with collate nocase on hex columns /* Future schema updates should be performed here. @@ -843,22 +842,22 @@ int rhizome_store_bundle(rhizome_manifest *m) sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;") != SQLITE_OK) return WHY("Failed to begin transaction"); - + sqlite3_stmt *stmt; if ((stmt = sqlite_prepare(&retry, "INSERT OR REPLACE INTO MANIFESTS(id,manifest,version,inserttime,bar,filesize,filehash,author,service,name,sender,recipient) VALUES(?,?,?,?,?,?,?,?,?,?,?,?);")) == NULL) goto rollback; - if (!( sqlite_code_ok(sqlite3_bind_text(stmt, 1, manifestid, -1, SQLITE_TRANSIENT)) - && sqlite_code_ok(sqlite3_bind_blob(stmt, 2, m->manifestdata, m->manifest_bytes, SQLITE_TRANSIENT)) + if (!( sqlite_code_ok(sqlite3_bind_text(stmt, 1, manifestid, -1, SQLITE_STATIC)) + && sqlite_code_ok(sqlite3_bind_blob(stmt, 2, m->manifestdata, m->manifest_bytes, SQLITE_STATIC)) && sqlite_code_ok(sqlite3_bind_int64(stmt, 3, m->version)) && sqlite_code_ok(sqlite3_bind_int64(stmt, 4, (long long) gettime_ms())) - && sqlite_code_ok(sqlite3_bind_blob(stmt, 5, bar, RHIZOME_BAR_BYTES, SQLITE_TRANSIENT)) + && sqlite_code_ok(sqlite3_bind_blob(stmt, 5, bar, RHIZOME_BAR_BYTES, SQLITE_STATIC)) && sqlite_code_ok(sqlite3_bind_int64(stmt, 6, m->fileLength)) - && sqlite_code_ok(sqlite3_bind_text(stmt, 7, filehash, -1, SQLITE_TRANSIENT)) - && sqlite_code_ok(sqlite3_bind_text(stmt, 8, author, -1, SQLITE_TRANSIENT)) - && sqlite_code_ok(sqlite3_bind_text(stmt, 9, service, -1, SQLITE_TRANSIENT)) - && sqlite_code_ok(sqlite3_bind_text(stmt, 10, name, -1, SQLITE_TRANSIENT)) - && sqlite_code_ok(sqlite3_bind_text(stmt, 11, sender, -1, SQLITE_TRANSIENT)) - && sqlite_code_ok(sqlite3_bind_text(stmt, 12, recipient, -1, SQLITE_TRANSIENT)) + && sqlite_code_ok(sqlite3_bind_text(stmt, 7, filehash, -1, SQLITE_STATIC)) + && sqlite_code_ok(sqlite3_bind_text(stmt, 8, author, -1, SQLITE_STATIC)) + && sqlite_code_ok(sqlite3_bind_text(stmt, 9, service, -1, SQLITE_STATIC)) + && sqlite_code_ok(sqlite3_bind_text(stmt, 10, name, -1, SQLITE_STATIC)) + && sqlite_code_ok(sqlite3_bind_text(stmt, 11, sender, -1, SQLITE_STATIC)) + && sqlite_code_ok(sqlite3_bind_text(stmt, 12, recipient, -1, SQLITE_STATIC)) )) { WHYF("query failed, %s: %s", sqlite3_errmsg(rhizome_db), sqlite3_sql(stmt)); goto rollback; @@ -1375,7 +1374,7 @@ int rhizome_retrieve_manifest(const char *manifestid, rhizome_manifest *m){ sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; - sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT manifest, version, inserttime, author FROM manifests WHERE id = ?"); + sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT manifest, version, inserttime, author FROM manifests WHERE id like ?"); if (!statement) return -1; @@ -1410,3 +1409,36 @@ done: sqlite3_finalize(statement); return ret; } + +int rhizome_is_bar_interesting(unsigned char *bar){ + int64_t version = rhizome_bar_version(bar); + int ret=1; + char id_hex[RHIZOME_MANIFEST_ID_STRLEN]; + tohex(id_hex, &bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES); + strcat(id_hex, "%"); + + // are we ignoring this manifest? + if (rhizome_ignore_manifest_check(&bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES)){ + DEBUGF("Ignoring %s", id_hex); + return 0; + } + + // do we have this bundle [or later]? + sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; + sqlite3_stmt *statement = sqlite_prepare(&retry, + "SELECT id, version FROM manifests WHERE id like ? and version >= ?"); + + sqlite3_bind_text(statement, 1, id_hex, -1, SQLITE_STATIC); + sqlite3_bind_int64(statement, 2, version); + + if (sqlite_step_retry(&retry, statement) == SQLITE_ROW){ + if (0){ + const char *q_id = (const char *) sqlite3_column_text(statement, 0); + long long q_version = (long long) sqlite3_column_int64(statement, 1); + DEBUGF("Already have %s, %lld (vs %s, %lld)", q_id, q_version, id_hex, version); + } + ret=0; + } + sqlite3_finalize(statement); + return ret; +} \ No newline at end of file diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 9edc8f78..8920dc65 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -425,9 +425,7 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m) } typedef struct ignored_manifest { - unsigned char bid[crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES]; - struct sockaddr_in peer_ipandport; - unsigned char peer_sid[SID_SIZE]; + unsigned char bid[RHIZOME_BAR_PREFIX_BYTES]; time_ms_t timeout; } ignored_manifest; @@ -447,15 +445,18 @@ typedef struct ignored_manifest_cache { a collision is exceedingly remote */ ignored_manifest_cache ignored; -int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char *peersid) +int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len) { - int bin = m->cryptoSignPublic[0]>>(8-IGNORED_BIN_BITS); + if (prefix_len < RHIZOME_BAR_PREFIX_BYTES) + FATAL("Prefix length is too short"); + + int bin = bid_prefix[0]>>(8-IGNORED_BIN_BITS); int slot; for(slot = 0; slot != IGNORED_BIN_SIZE; ++slot) { if (!memcmp(ignored.bins[bin].m[slot].bid, - m->cryptoSignPublic, - crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES)) + bid_prefix, + RHIZOME_BAR_PREFIX_BYTES)) { if (ignored.bins[bin].m[slot].timeout>gettime_ms()) return 1; @@ -466,31 +467,28 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in return 0; } -int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout) +int rhizome_queue_ignore_manifest(unsigned char *bid_prefix, int prefix_len, int timeout) { + if (prefix_len < RHIZOME_BAR_PREFIX_BYTES) + FATAL("Prefix length is too short"); + /* The supplied manifest from a given IP has errors, so remember that it isn't worth considering */ - int bin = m->cryptoSignPublic[0]>>(8-IGNORED_BIN_BITS); + int bin = bid_prefix[0]>>(8-IGNORED_BIN_BITS); int slot; for(slot = 0; slot != IGNORED_BIN_SIZE; ++slot) { if (!memcmp(ignored.bins[bin].m[slot].bid, - m->cryptoSignPublic, - crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES)) + bid_prefix, + RHIZOME_BAR_PREFIX_BYTES)) break; } if (slot>=IGNORED_BIN_SIZE) slot=random()%IGNORED_BIN_SIZE; - bcopy(&m->cryptoSignPublic[0], + bcopy(&bid_prefix[0], &ignored.bins[bin].m[slot].bid[0], - crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES); + RHIZOME_BAR_PREFIX_BYTES); /* ignore for a while */ ignored.bins[bin].m[slot].timeout=gettime_ms()+timeout; - bcopy(peerip, - &ignored.bins[bin].m[slot].peer_ipandport, - sizeof(struct sockaddr_in)); - bcopy(peersid, - ignored.bins[bin].m[slot].peer_sid, - SID_SIZE); return 0; } @@ -897,7 +895,8 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock if (rhizome_manifest_verify(m) != 0) { WHY("Error verifying manifest when considering for import"); /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m, peerip, peersid, 60000); + rhizome_queue_ignore_manifest(m->cryptoSignPublic, + crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES, 60000); rhizome_manifest_free(m); RETURN(-1); } @@ -931,7 +930,8 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock if (!m->selfSigned && rhizome_manifest_verify(m)) { WHY("Error verifying manifest when considering queuing for import"); /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m, peerip, peersid, 60000); + rhizome_queue_ignore_manifest(m->cryptoSignPublic, + crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES, 60000); rhizome_manifest_free(m); RETURN(-1); } @@ -957,7 +957,8 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock if (!m->selfSigned && rhizome_manifest_verify(m)) { WHY("Error verifying manifest when considering queuing for import"); /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m, peerip, peersid, 60000); + rhizome_queue_ignore_manifest(m->cryptoSignPublic, + crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES, 60000); rhizome_manifest_free(m); RETURN(-1); } diff --git a/rhizome_packetformats.c b/rhizome_packetformats.c index c30d9b42..9a0284ed 100644 --- a/rhizome_packetformats.c +++ b/rhizome_packetformats.c @@ -113,11 +113,12 @@ int rhizome_manifest_to_bar(rhizome_manifest *m,unsigned char *bar) RETURN(0); } -long long rhizome_bar_version(unsigned char *bar) +int64_t rhizome_bar_version(unsigned char *bar) { - long long version=0; + int64_t version=0; int i; - for(i=0;i<7;i++) version|=bar[RHIZOME_BAR_VERSION_OFFSET+6-i]<<(8LL*i); + for(i=0;i<7;i++) + version|=((int64_t)(bar[RHIZOME_BAR_VERSION_OFFSET+6-i]))<<(8LL*i); return version; } @@ -132,194 +133,123 @@ unsigned long long rhizome_bar_bidprefix_ll(unsigned char *bar) return bidprefix; } -struct advertisement_state{ - long long bundles_available; - int64_t bundle_last_rowid[2]; -}; -struct advertisement_state advert_state[OVERLAY_MAX_INTERFACES]; +static int append_bars(struct overlay_buffer *e, sqlite_retry_state *retry, const char *sql, long long *last_rowid){ + int count=0; + + sqlite3_stmt *statement=sqlite_prepare(retry, sql, *last_rowid); + + while(sqlite_step_retry(retry, statement) == SQLITE_ROW) { + count++; + if (sqlite3_column_type(statement, 0)!=SQLITE_BLOB) + continue; + + const void *data = sqlite3_column_blob(statement, 0); + int blob_bytes = sqlite3_column_bytes(statement, 0); + int64_t rowid = sqlite3_column_int64(statement, 1); + + if (blob_bytes!=RHIZOME_BAR_BYTES) { + if (config.debug.rhizome_ads) + DEBUG("Found a BAR that is the wrong size - ignoring"); + continue; + } + + if (ob_append_bytes(e, (unsigned char *)data, blob_bytes)){ + // out of room + count--; + break; + } + + *last_rowid=rowid; + } + + if (statement) + sqlite3_finalize(statement); + + return count; +} -int overlay_rhizome_add_advertisements(struct decode_context *context, int interface_number, struct overlay_buffer *e) -{ - IN(); - - /* We need to change manifest table to include payload length to make our life - easy here (also would let us order advertisements by size of payload). - For now, we will just advertised only occassionally. - - XXX We will move all processing of Rhizome into a separate process - so that the CPU delays caused by Rhizome verifying signatures isn't a problem. +/* Periodically queue BAR advertisements + Always advertise the most recent 3 manifests in the table, cycle through the rest of the table, adding 17 BAR's at a time */ - if (!is_rhizome_advertise_enabled()) - RETURN(0); +void overlay_rhizome_advertise(struct sched_ent *alarm){ + static long long bundles_available=0; + static int64_t bundle_last_rowid=INT64_MAX; - struct advertisement_state *state=&advert_state[interface_number]; + if (!is_rhizome_advertise_enabled()) + return; - int pass; - - /* XXX Should add priority bundles here. - XXX Should prioritise bundles for subscribed groups, Serval-authorised files - etc over common bundles. - XXX Should wait a while after going through bundle list so that we don't waste - CPU on db queries if there are not many bundles. Actually, we probably just - shouldn't be sending bundles blindly on every tick. - XXX How do we indicate group membership with BARs? Or do groups actively poll? - - XXX XXX XXX We should cache database results so that we don't waste all our time - and energy asking the database much the same questions possibly many times per - second. - */ - - // TODO Group handling not completely thought out here yet. - int (*oldfunc)() = sqlite_set_tracefunc(is_debug_rhizome_ads); sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; - /* Get number of bundles available if required */ - long long tmp = 0; - if (sqlite_exec_int64_retry(&retry, &tmp, "SELECT COUNT(BAR) FROM MANIFESTS;") != 1) { - sqlite_set_tracefunc(oldfunc); - RETURN(WHY("Could not count BARs for advertisement")); + /* Get number of bundles available */ + if (sqlite_exec_int64_retry(&retry, &bundles_available, "SELECT COUNT(BAR) FROM MANIFESTS;") != 1){ + WHY("Could not count BARs for advertisement"); + goto end; } - if (state->bundles_available!=tmp){ - state->bundle_last_rowid[0]=INT64_MAX; - state->bundle_last_rowid[1]=INT64_MAX; - } + if (bundles_available<1) + goto end; - state->bundles_available = tmp; - if (tmp<1) - RETURN(0); + struct overlay_frame *frame = malloc(sizeof(struct overlay_frame)); + bzero(frame,sizeof(struct overlay_frame)); + frame->type = OF_TYPE_RHIZOME_ADVERT; + frame->source = my_subscriber; + frame->ttl = 1; + frame->queue = OQ_OPPORTUNISTIC; + frame->payload = ob_new(); + ob_limitsize(frame->payload, 800); - /* Randomly choose whether to advertise manifests or BARs first. */ - int skipmanifests=random()&1; + ob_append_byte(frame->payload, 2); + ob_append_ui16(frame->payload, rhizome_http_server_port); - if (config.debug.rhizome_ads) - DEBUGF("%lld bundles in database, starting from %lld, %lld.",state->bundles_available, - state->bundle_last_rowid[0], state->bundle_last_rowid[1]); + long long rowid=0; + int count = append_bars(frame->payload, &retry, + "SELECT BAR,ROWID FROM MANIFESTS ORDER BY ROWID DESC LIMIT 3", + &rowid); - sqlite3_stmt *statement=NULL; - - ob_checkpoint(e); - - if (overlay_frame_build_header(context, e, - 0, OF_TYPE_RHIZOME_ADVERT, 0, 1, - NULL, NULL, - NULL, my_subscriber)){ - ob_rewind(e); - RETURN(-1); - } - - if (ob_append_rfs(e, 2)){ - ob_rewind(e); - RETURN(-1); - } - - /* Version of rhizome advert block (1 byte): - 1 = manifests then BARs, - 2 = BARs only, - 3 = HTTP port then manifests then BARs, - 4 = HTTP port then BARs only - */ - if (ob_append_byte(e,3+skipmanifests)){ - ob_rewind(e); - RETURN(-1); - } - /* Rhizome HTTP server port number (2 bytes) */ - if (ob_append_ui16(e, rhizome_http_server_port)){ - ob_rewind(e); - RETURN(-1); - } - - for(pass=skipmanifests;pass<2;pass++) { - ob_checkpoint(e); - switch(pass) { - case 0: /* Full manifests */ - statement = sqlite_prepare(&retry, "SELECT MANIFEST,ROWID,ID FROM MANIFESTS WHERE ROWID < %lld ORDER BY ROWID DESC LIMIT 30", - state->bundle_last_rowid[pass]); - break; - case 1: /* BARs */ - statement = sqlite_prepare(&retry, "SELECT BAR,ROWID,ID FROM MANIFESTS WHERE ROWID < %lld ORDER BY ROWID DESC LIMIT 30", - state->bundle_last_rowid[pass]); - break; - } - if (!statement) { - sqlite_set_tracefunc(oldfunc); - WHY("Could not prepare sql statement for fetching BARs for advertisement"); - goto stopStuffing; - } - int count=0; + if (count>=3){ + if (bundle_last_rowid>rowid || bundle_last_rowid<=0) + bundle_last_rowid=rowid; - while(sqlite_step_retry(&retry, statement) == SQLITE_ROW) { - count++; - if (sqlite3_column_type(statement, 0)!=SQLITE_BLOB) - continue; - - const void *data = sqlite3_column_blob(statement, 0); - int blob_bytes = sqlite3_column_bytes(statement, 0); - int64_t rowid = sqlite3_column_int64(statement, 1); - const unsigned char *manifestId = sqlite3_column_text(statement, 2); - - if (config.debug.rhizome_ads) - DEBUGF("Considering manifest %s", manifestId); - - if (pass&&(blob_bytes!=RHIZOME_BAR_BYTES)) { - if (config.debug.rhizome_ads) - DEBUG("Found a BAR that is the wrong size - ignoring"); - continue; - } - - /* Only include manifests that are <=1KB inline. - Longer ones are only advertised by BAR */ - if (blob_bytes>1024) { - WARN("ignoring manifest > 1k"); - continue; - } - - int overhead=(!pass)?2:0; - - /* make sure there's enough room for the blob, its length, - the 0xFF end marker and 1 spare for the rfs length to increase */ - if (ob_makespace(e,overhead+blob_bytes+1)) - goto stopStuffing; - - if (!pass) { - /* include manifest length field */ - ob_append_ui16(e, blob_bytes); - } - - if (ob_append_bytes(e, (unsigned char *)data, blob_bytes)){ - WHY("Failed to append data into buffer"); - goto stopStuffing; - } - - state->bundle_last_rowid[pass]=rowid; - - ob_checkpoint(e); - } - - if (count<30){ - // if we hit the end of the cursor, before the end of the packey buffer, restart next time at the beginning. - state->bundle_last_rowid[pass]=INT64_MAX; - } - - stopStuffing: - if (statement) - sqlite3_finalize(statement); - statement = NULL; - - ob_rewind(e); - - if (!pass) { - /* Mark end of whole manifests by writing 0xff, which is more than the MSB - of a manifest's length is allowed to be. */ - ob_append_byte(e,0xff); - } + count = append_bars(frame->payload, &retry, + "SELECT BAR,ROWID FROM MANIFESTS WHERE ROWID < %lld ORDER BY ROWID DESC LIMIT 17", + &bundle_last_rowid); + if (count<17) + bundle_last_rowid=INT64_MAX; } - - ob_patch_rfs(e); - + + if (overlay_payload_enqueue(frame)) + op_free(frame); + +end: sqlite_set_tracefunc(oldfunc); - RETURN(0); + alarm->alarm = gettime_ms()+500; + alarm->deadline = alarm->alarm+10000; + schedule(alarm); +} + +/* Queue an advertisment for a single manifest */ +int rhizome_advertise_manifest(rhizome_manifest *m){ + struct overlay_frame *frame = malloc(sizeof(struct overlay_frame)); + bzero(frame,sizeof(struct overlay_frame)); + frame->type = OF_TYPE_RHIZOME_ADVERT; + frame->source = my_subscriber; + frame->ttl = 1; + frame->queue = OQ_OPPORTUNISTIC; + frame->payload = ob_new(); + ob_limitsize(frame->payload, 800); + + if (ob_append_byte(frame->payload, 3)) goto error; + if (ob_append_ui16(frame->payload, rhizome_http_server_port)) goto error; + if (ob_append_ui16(frame->payload, m->manifest_all_bytes)) goto error; + if (ob_append_bytes(frame->payload, m->manifestdata, m->manifest_all_bytes)) goto error; + ob_append_byte(frame->payload, 0xFF); + if (overlay_payload_enqueue(frame)) goto error; + return 0; + +error: + op_free(frame); + return -1; } int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long now) @@ -338,115 +268,153 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long int (*oldfunc)() = sqlite_set_tracefunc(is_debug_rhizome_ads); - switch (ad_frame_type) { - case 3: - /* The same as type=1, but includes the source HTTP port number */ - httpaddr.sin_port = htons(ob_get_ui16(f->payload)); - // FALL THROUGH ... - case 1: - /* Extract whole manifests */ - while(f->payload->position < f->payload->sizeLimit) { - if (ob_getbyte(f->payload, f->payload->position)==0xff){ - f->payload->position++; - break; - } - - manifest_length=ob_get_ui16(f->payload); - if (manifest_length==0) continue; - - unsigned char *data = ob_get_bytes_ptr(f->payload, manifest_length); - if (!data) { - assert(inet_ntop(AF_INET, &httpaddr.sin_addr, httpaddrtxt, sizeof(httpaddrtxt)) != NULL); - WHYF("Illegal manifest length field in rhizome advertisement frame %d vs %d.", - manifest_length, f->payload->sizeLimit - f->payload->position); - break; - } - - /* Read manifest without verifying signatures (which would waste lots of - energy, everytime we see a manifest that we already have). - In fact, it would be better here to do a really rough and ready parser - to get the id and version fields out, and avoid the memory copies that - otherwise happen. - But we do need to make sure that at least one signature is there. - */ - m = rhizome_new_manifest(); - if (!m) { - WHY("Out of manifests"); - sqlite_set_tracefunc(oldfunc); - RETURN(0); - } - - if (rhizome_read_manifest_file(m, (char *)data, manifest_length) == -1) { - WHY("Error importing manifest body"); - rhizome_manifest_free(m); - sqlite_set_tracefunc(oldfunc); - RETURN(0); - } - - char manifest_id_prefix[RHIZOME_MANIFEST_ID_STRLEN + 1]; - if (rhizome_manifest_get(m, "id", manifest_id_prefix, sizeof manifest_id_prefix) == NULL) { - WHY("Manifest does not contain 'id' field"); - rhizome_manifest_free(m); - sqlite_set_tracefunc(oldfunc); - RETURN(0); - } - /* trim manifest ID to a prefix for ease of debugging - (that is the only use of this */ - manifest_id_prefix[8]=0; - long long version = rhizome_manifest_get_ll(m, "version"); - if (config.debug.rhizome_ads) - DEBUGF("manifest id=%s* version=%lld", manifest_id_prefix, version); - - /* Crude signature presence test */ - for(i=m->manifest_all_bytes-1;i>0;i--) - if (!m->manifestdata[i]) { - /* A null in the middle says we have a signature */ - break; - } - if (!i) { - /* ignore the announcement, but don't ignore other people - offering the same manifest */ - WARN("Ignoring manifest announcment with no signature"); - rhizome_manifest_free(m); - sqlite_set_tracefunc(oldfunc); - RETURN(0); - } - - if (rhizome_ignore_manifest_check(m, &httpaddr,f->source->sid)) - { - /* Ignoring manifest that has caused us problems recently */ - if (1) WARNF("Ignoring manifest with errors: %s*", manifest_id_prefix); - } - else if (m->errors == 0) - { - /* Manifest is okay, so see if it is worth storing */ - if (rhizome_manifest_version_cache_lookup(m)) { - /* We already have this version or newer */ - if (config.debug.rhizome_ads) - DEBUG("We already have that manifest or newer."); - } else { - if (config.debug.rhizome_ads) - DEBUG("Not seen before."); - rhizome_suggest_queue_manifest_import(m, &httpaddr,f->source->sid); - // the above function will free the manifest structure, make sure we don't free it again - m=NULL; - } - } - else - { - if (config.debug.rhizome_ads) - DEBUG("Unverified manifest has errors - so not processing any further."); - /* Don't waste any time on this manifest in future attempts for at least - a minute. */ - rhizome_queue_ignore_manifest(m, &httpaddr,f->source->sid, 60000); - } - if (m) { - rhizome_manifest_free(m); - m = NULL; - } + if (ad_frame_type & 2){ + httpaddr.sin_port = htons(ob_get_ui16(f->payload)); + } + + if (ad_frame_type & 1){ + /* Extract whole manifests */ + while(f->payload->position < f->payload->sizeLimit) { + if (ob_getbyte(f->payload, f->payload->position)==0xff){ + f->payload->position++; + break; } + + manifest_length=ob_get_ui16(f->payload); + if (manifest_length==0) continue; + + unsigned char *data = ob_get_bytes_ptr(f->payload, manifest_length); + if (!data) { + assert(inet_ntop(AF_INET, &httpaddr.sin_addr, httpaddrtxt, sizeof(httpaddrtxt)) != NULL); + WHYF("Illegal manifest length field in rhizome advertisement frame %d vs %d.", + manifest_length, f->payload->sizeLimit - f->payload->position); + break; + } + + /* Read manifest without verifying signatures (which would waste lots of + energy, everytime we see a manifest that we already have). + In fact, it would be better here to do a really rough and ready parser + to get the id and version fields out, and avoid the memory copies that + otherwise happen. + But we do need to make sure that at least one signature is there. + */ + m = rhizome_new_manifest(); + if (!m) { + WHY("Out of manifests"); + sqlite_set_tracefunc(oldfunc); + RETURN(0); + } + + if (rhizome_read_manifest_file(m, (char *)data, manifest_length) == -1) { + WHY("Error importing manifest body"); + rhizome_manifest_free(m); + sqlite_set_tracefunc(oldfunc); + RETURN(0); + } + + char manifest_id_prefix[RHIZOME_MANIFEST_ID_STRLEN + 1]; + if (rhizome_manifest_get(m, "id", manifest_id_prefix, sizeof manifest_id_prefix) == NULL) { + WHY("Manifest does not contain 'id' field"); + rhizome_manifest_free(m); + sqlite_set_tracefunc(oldfunc); + RETURN(0); + } + /* trim manifest ID to a prefix for ease of debugging + (that is the only use of this */ + manifest_id_prefix[8]=0; + if (config.debug.rhizome_ads){ + long long version = rhizome_manifest_get_ll(m, "version"); + DEBUGF("manifest id=%s* version=%lld", manifest_id_prefix, version); + } + /* Crude signature presence test */ + for(i=m->manifest_all_bytes-1;i>0;i--) + if (!m->manifestdata[i]) { + /* A null in the middle says we have a signature */ + break; + } + if (!i) { + /* ignore the announcement, but don't ignore other people + offering the same manifest */ + WARN("Ignoring manifest announcment with no signature"); + rhizome_manifest_free(m); + sqlite_set_tracefunc(oldfunc); + RETURN(0); + } + + if (rhizome_ignore_manifest_check(m->cryptoSignPublic, + crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES)) + { + /* Ignoring manifest that has caused us problems recently */ + if (1) WARNF("Ignoring manifest with errors: %s*", manifest_id_prefix); + } + else if (m->errors == 0) + { + /* Manifest is okay, so see if it is worth storing */ + if (rhizome_manifest_version_cache_lookup(m)) { + /* We already have this version or newer */ + if (config.debug.rhizome_ads) + DEBUG("We already have that manifest or newer."); + } else { + if (config.debug.rhizome_ads) + DEBUG("Not seen before."); + rhizome_suggest_queue_manifest_import(m, &httpaddr,f->source->sid); + // the above function will free the manifest structure, make sure we don't free it again + m=NULL; + } + } + else + { + if (config.debug.rhizome_ads) + DEBUG("Unverified manifest has errors - so not processing any further."); + /* Don't waste any time on this manifest in future attempts for at least + a minute. */ + rhizome_queue_ignore_manifest(m->cryptoSignPublic, + crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES, 60000); + } + if (m) { + rhizome_manifest_free(m); + m = NULL; + } + } + } + + overlay_mdp_frame mdp; + + bzero(&mdp,sizeof(mdp)); + mdp.out.payload_length=0; + + // parse BAR's + while(ob_remaining(f->payload)>0){ + unsigned char *bar=ob_get_bytes_ptr(f->payload, RHIZOME_BAR_BYTES); + if (!bar){ + WARNF("Expected whole BAR @%d (only %d remains)", ob_position(f->payload), ob_remaining(f->payload)); + dump("buffer", ob_ptr(f->payload), ob_limit(f->payload)); break; } + if (rhizome_is_bar_interesting(bar)==1){ + // add a request for the manifest + if (mdp.out.payload_length==0){ + bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); + mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; + bcopy(f->source->sid,mdp.out.dst.sid,SID_SIZE); + mdp.out.dst.port=MDP_PORT_RHIZOME_MANIFEST_REQUEST; + if (f->source->reachable&REACHABLE_DIRECT) + mdp.out.ttl=1; + else + mdp.out.ttl=64; + mdp.packetTypeAndFlags=MDP_TX; + + mdp.out.queue=OQ_ORDINARY; + } + DEBUGF("Requesting manifest for BAR %s", alloca_tohex(bar, RHIZOME_BAR_BYTES)); + bcopy(bar, &mdp.out.payload[mdp.out.payload_length], RHIZOME_BAR_BYTES); + mdp.out.payload_length+=RHIZOME_BAR_BYTES; + } + } + + if (mdp.out.payload_length>0) + overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); + sqlite_set_tracefunc(oldfunc); RETURN(0); } diff --git a/serval.h b/serval.h index 09187411..e04b47c6 100644 --- a/serval.h +++ b/serval.h @@ -491,7 +491,7 @@ int overlay_frame_build_header(struct decode_context *context, struct overlay_bu struct broadcast *broadcast, struct subscriber *next_hop, struct subscriber *destination, struct subscriber *source); int overlay_interface_args(const char *arg); -int overlay_rhizome_add_advertisements(struct decode_context *context, int interface_number, struct overlay_buffer *e); +void overlay_rhizome_advertise(struct sched_ent *alarm); int overlay_add_local_identity(unsigned char *s); extern int overlay_interface_count;