From bf7d0d5b16ecd75233a9e090c932e5837fcd1ccb Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Fri, 16 Aug 2013 14:57:28 +0930 Subject: [PATCH] Cache rhizome read state when serving content via MDP --- overlay_mdp_services.c | 161 +++++++++++++++++------------------------ rhizome.h | 3 + rhizome_database.c | 2 + rhizome_store.c | 132 +++++++++++++++++++++++++++++++++ server.c | 3 + 5 files changed, 205 insertions(+), 96 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index 29549d19..960f4fb5 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -29,111 +29,84 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "crypto.h" #include "log.h" - - int rhizome_mdp_send_block(struct subscriber *dest, unsigned char *id, uint64_t version, uint64_t fileOffset, uint32_t bitmap, uint16_t blockLength) { IN(); if (blockLength>1024) RETURN(-1); - char *id_str = alloca_tohex_bid(id); - if (config.debug.rhizome_tx) - DEBUGF("Requested blocks for %s @%"PRIx64, id_str, fileOffset); - - /* Find manifest that corresponds to BID and version. - If we don't have this combination, then do nothing. - If we do have the combination, then find the associated file, - and open the blob so that we can send some of it. - - TODO: If we have a newer version of the manifest, and the manifest is a - journal, then the newer version is okay to use to service this request. - */ - - char filehash[SHA512_DIGEST_STRING_LENGTH]; - if (rhizome_database_filehash_from_id(id_str, version, filehash)<=0) - RETURN(-1); - - struct rhizome_read read; - bzero(&read, sizeof read); - - int ret=rhizome_open_read(&read, filehash, 0); - - if (!ret){ - overlay_mdp_frame reply; - bzero(&reply,sizeof(reply)); - // Reply is broadcast, so we cannot authcrypt, and signing is too time consuming - // for low devices. The result is that an attacker can prevent rhizome transfers - // if they want to by injecting fake blocks. The alternative is to not broadcast - // back replies, and then we can authcrypt. - // multiple receivers starting at different times, we really need merkle-tree hashing. - // so multiple receivers is not realistic for now. So use non-broadcast unicode - // for now would seem the safest. But that would stop us from allowing multiple - // receivers in the special case where additional nodes begin listening in from the - // beginning. - reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN; - bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE); - reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE; + DEBUGF("Requested blocks for %s @%"PRIx64, alloca_tohex_bid(id), fileOffset); - if (dest && dest->reachable&REACHABLE_UNICAST){ - // if we get a request from a peer that we can only talk to via unicast, send data via unicast too. - bcopy(dest->sid, reply.out.dst.sid, SID_SIZE); - }else{ - // send replies to broadcast so that others can hear blocks and record them - // (not that preemptive listening is implemented yet). - memset(reply.out.dst.sid,0xff,SID_SIZE); - reply.out.ttl=1; - } - - reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE; - reply.out.queue=OQ_OPPORTUNISTIC; - reply.out.payload[0]='B'; // reply contains blocks - // include 16 bytes of BID prefix for identification - bcopy(id, &reply.out.payload[1], 16); - // and version of manifest - bcopy(&version, &reply.out.payload[1+16], sizeof(uint64_t)); - - int i; - for(i=0;i<32;i++){ - if (bitmap&(1<<(31-i))) - continue; - - if (overlay_queue_remaining(reply.out.queue) < 10) - break; - - // calculate and set offset of block - read.offset = fileOffset+i*blockLength; - - // stop if we passed the length of the file - // (but we may not know the file length until we attempt a read) - if (read.length!=-1 && read.offset>read.length) - break; - - write_uint64(&reply.out.payload[1+16+8], read.offset); - - int bytes_read = rhizome_read(&read, &reply.out.payload[1+16+8+8], blockLength); - if (bytes_read<=0) - break; - - reply.out.payload_length=1+16+8+8+bytes_read; - - // Mark the last block of the file, if required - if (read.offset >= read.length) - reply.out.payload[0]='T'; - - // send packet - if (overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0)) - break; - } + overlay_mdp_frame reply; + bzero(&reply,sizeof(reply)); + // Reply is broadcast, so we cannot authcrypt, and signing is too time consuming + // for low devices. The result is that an attacker can prevent rhizome transfers + // if they want to by injecting fake blocks. The alternative is to not broadcast + // back replies, and then we can authcrypt. + // multiple receivers starting at different times, we really need merkle-tree hashing. + // so multiple receivers is not realistic for now. So use non-broadcast unicode + // for now would seem the safest. But that would stop us from allowing multiple + // receivers in the special case where additional nodes begin listening in from the + // beginning. + reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN; + bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE); + reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE; + + if (dest && dest->reachable&REACHABLE_UNICAST){ + // if we get a request from a peer that we can only talk to via unicast, send data via unicast too. + bcopy(dest->sid, reply.out.dst.sid, SID_SIZE); + }else{ + // send replies to broadcast so that others can hear blocks and record them + // (not that preemptive listening is implemented yet). + memset(reply.out.dst.sid,0xff,SID_SIZE); + reply.out.ttl=1; + } + + reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE; + reply.out.queue=OQ_OPPORTUNISTIC; + reply.out.payload[0]='B'; // reply contains blocks + // include 16 bytes of BID prefix for identification + bcopy(id, &reply.out.payload[1], 16); + // and version of manifest + bcopy(&version, &reply.out.payload[1+16], sizeof(uint64_t)); + + int i; + for(i=0;i<32;i++){ + if (bitmap&(1<<(31-i))) + continue; + + if (overlay_queue_remaining(reply.out.queue) < 10) + break; + + // calculate and set offset of block + uint64_t offset = fileOffset+i*blockLength; + + write_uint64(&reply.out.payload[1+16+8], offset); + + int bytes_read = rhizome_read_cached(id, version, gettime_ms()+2000, offset, &reply.out.payload[1+16+8+8], blockLength); + if (bytes_read<=0) + break; + + reply.out.payload_length=1+16+8+8+bytes_read; + + // Mark the last block of the file, if required + if (bytes_read < blockLength) + reply.out.payload[0]='T'; + + // send packet + if (overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0)) + break; } - rhizome_read_close(&read); - RETURN(ret); + RETURN(0); OUT(); } int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) { + if (!is_rhizome_mdp_server_running()) + return -1; + uint64_t version= read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]); uint64_t fileOffset= @@ -405,11 +378,7 @@ int overlay_mdp_try_interal_services(struct overlay_frame *frame, overlay_mdp_fr case MDP_PORT_PROBE: RETURN(overlay_mdp_service_probe(frame, mdp)); case MDP_PORT_STUNREQ: RETURN(overlay_mdp_service_stun_req(mdp)); case MDP_PORT_STUN: RETURN(overlay_mdp_service_stun(mdp)); - case MDP_PORT_RHIZOME_REQUEST: - if (is_rhizome_mdp_server_running()) { - RETURN(overlay_mdp_service_rhizomerequest(mdp)); - } - break; + case MDP_PORT_RHIZOME_REQUEST: RETURN(overlay_mdp_service_rhizomerequest(mdp)); case MDP_PORT_RHIZOME_RESPONSE: RETURN(overlay_mdp_service_rhizomeresponse(mdp)); case MDP_PORT_RHIZOME_MANIFEST_REQUEST: RETURN(overlay_mdp_service_manifest_response(mdp)); case MDP_PORT_RHIZOME_SYNC: RETURN(overlay_mdp_service_rhizome_sync(frame, mdp)); diff --git a/rhizome.h b/rhizome.h index 1945e43f..33af3593 100644 --- a/rhizome.h +++ b/rhizome.h @@ -725,6 +725,9 @@ int rhizome_store_delete(const char *id); int rhizome_open_decrypt_read(rhizome_manifest *m, rhizome_bk_t *bsk, struct rhizome_read *read_state, int hash); int rhizome_extract_file(rhizome_manifest *m, const char *filepath, rhizome_bk_t *bsk); int rhizome_dump_file(const char *id, const char *filepath, int64_t *length); +int rhizome_read_cached(unsigned char *bundle_id, uint64_t version, time_ms_t timeout, + uint64_t fileOffset, unsigned char *buffer, int length); +int rhizome_cache_close(); int rhizome_database_filehash_from_id(const char *id, uint64_t version, char hash[SHA512_DIGEST_STRING_LENGTH]); diff --git a/rhizome_database.c b/rhizome_database.c index 4e4ec7a3..c5506c22 100644 --- a/rhizome_database.c +++ b/rhizome_database.c @@ -288,6 +288,8 @@ int rhizome_close_db() { IN(); if (rhizome_db) { + rhizome_cache_close(); + if (!sqlite3_get_autocommit(rhizome_db)){ WHY("Uncommitted transaction!"); sqlite_exec_void("ROLLBACK;"); diff --git a/rhizome_store.c b/rhizome_store.c index f398579b..99a1c79a 100644 --- a/rhizome_store.c +++ b/rhizome_store.c @@ -831,6 +831,138 @@ int rhizome_read_close(struct rhizome_read *read) return 0; } +struct cache_entry{ + struct cache_entry *_left; + struct cache_entry *_right; + unsigned char bundle_id[RHIZOME_MANIFEST_ID_BYTES]; + uint64_t version; + struct rhizome_read read_state; + time_ms_t expires; +}; +struct cache_entry *root; + +static struct cache_entry ** find_entry_location(struct cache_entry **ptr, unsigned char *bundle_id, uint64_t version) +{ + while(*ptr){ + struct cache_entry *entry = *ptr; + int cmp = memcmp(bundle_id, entry->bundle_id, sizeof entry->bundle_id); + if (cmp==0){ + if (entry->version==version) + break; + if (version < entry->version) + ptr = &entry->_left; + else + ptr = &entry->_right; + continue; + } + if (cmp<0) + ptr = &entry->_left; + else + ptr = &entry->_right; + } + return ptr; +} + +static time_ms_t close_entries(struct cache_entry **entry, time_ms_t timeout) +{ + if (!*entry) + return 0; + + time_ms_t ret = close_entries(&(*entry)->_left, timeout); + time_ms_t t_right = close_entries(&(*entry)->_right, timeout); + if (t_right!=0 && (t_right < ret || ret==0)) + ret=t_right; + + if ((*entry)->expires < timeout || timeout==0){ + rhizome_read_close(&(*entry)->read_state); + // remember the two children + struct cache_entry *left=(*entry)->_left; + struct cache_entry *right=(*entry)->_right; + // free this entry + free(*entry); + // re-add both children to the tree + *entry=left; + if (right){ + entry = find_entry_location(entry, right->bundle_id, right->version); + *entry=right; + } + }else{ + if ((*entry)->expires < ret || ret==0) + ret=(*entry)->expires; + } + return ret; +} + +// close any expired cache entries +static void rhizome_cache_alarm(struct sched_ent *alarm) +{ + alarm->alarm = close_entries(&root, gettime_ms()); + if (alarm->alarm){ + alarm->deadline = alarm->alarm + 1000; + schedule(alarm); + } +} + +static struct profile_total cache_alarm_stats={ + .name="rhizome_cache_alarm", +}; +static struct sched_ent cache_alarm={ + .function = rhizome_cache_alarm, + .stats = &cache_alarm_stats, +}; + +// close all cache entries +int rhizome_cache_close() +{ + close_entries(&root, 0); + unschedule(&cache_alarm); + return 0; +} + +// read a block of data, caching meta data for reuse +int rhizome_read_cached(unsigned char *bundle_id, uint64_t version, time_ms_t timeout, + uint64_t fileOffset, unsigned char *buffer, int length) +{ + // look for a cached entry + struct cache_entry **ptr = find_entry_location(&root, bundle_id, version); + struct cache_entry *entry = *ptr; + + // if we don't have one yet, create one and open it + if (!entry){ + char *id_str = alloca_tohex_bid(bundle_id); + + char filehash[SHA512_DIGEST_STRING_LENGTH]; + if (rhizome_database_filehash_from_id(id_str, version, filehash)<=0) + return -1; + + entry = emalloc_zero(sizeof(struct cache_entry)); + + if (rhizome_open_read(&entry->read_state, filehash, 0)){ + free(entry); + return -1; + } + bcopy(bundle_id, entry->bundle_id, sizeof(entry->bundle_id)); + entry->version = version; + *ptr = entry; + } + + entry->read_state.offset = fileOffset; + if (entry->read_state.length !=-1 && fileOffset >= entry->read_state.length) + return 0; + + if (entry->expires < timeout){ + entry->expires = timeout; + + if (!cache_alarm.alarm){ + cache_alarm.alarm = timeout; + cache_alarm.deadline = timeout + 1000; + schedule(&cache_alarm); + } + } + + return rhizome_read(&entry->read_state, buffer, length); +} + /* Returns -1 on error, 0 on success. */ static int write_file(struct rhizome_read *read, const char *filepath){ diff --git a/server.c b/server.c index 31e52110..1fb28db2 100644 --- a/server.c +++ b/server.c @@ -245,6 +245,9 @@ void serverCleanUp() if (FORM_SERVAL_INSTANCE_PATH(filename, "mdp.socket")) { unlink(filename); } + + rhizome_close_db(); + dna_helper_shutdown(); }