Cache rhizome read state when serving content via MDP

This commit is contained in:
Jeremy Lakeman 2013-08-16 14:57:28 +09:30
parent 771cb4151b
commit bf7d0d5b16
5 changed files with 205 additions and 96 deletions

View File

@ -29,111 +29,84 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "crypto.h"
#include "log.h"
int rhizome_mdp_send_block(struct subscriber *dest, unsigned char *id, uint64_t version, uint64_t fileOffset, uint32_t bitmap, uint16_t blockLength)
{
IN();
if (blockLength>1024) RETURN(-1);
char *id_str = alloca_tohex_bid(id);
if (config.debug.rhizome_tx)
DEBUGF("Requested blocks for %s @%"PRIx64, id_str, fileOffset);
/* Find manifest that corresponds to BID and version.
If we don't have this combination, then do nothing.
If we do have the combination, then find the associated file,
and open the blob so that we can send some of it.
TODO: If we have a newer version of the manifest, and the manifest is a
journal, then the newer version is okay to use to service this request.
*/
char filehash[SHA512_DIGEST_STRING_LENGTH];
if (rhizome_database_filehash_from_id(id_str, version, filehash)<=0)
RETURN(-1);
struct rhizome_read read;
bzero(&read, sizeof read);
int ret=rhizome_open_read(&read, filehash, 0);
if (!ret){
overlay_mdp_frame reply;
bzero(&reply,sizeof(reply));
// Reply is broadcast, so we cannot authcrypt, and signing is too time consuming
// for low devices. The result is that an attacker can prevent rhizome transfers
// if they want to by injecting fake blocks. The alternative is to not broadcast
// back replies, and then we can authcrypt.
// multiple receivers starting at different times, we really need merkle-tree hashing.
// so multiple receivers is not realistic for now. So use non-broadcast unicode
// for now would seem the safest. But that would stop us from allowing multiple
// receivers in the special case where additional nodes begin listening in from the
// beginning.
reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN;
bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE);
reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
DEBUGF("Requested blocks for %s @%"PRIx64, alloca_tohex_bid(id), fileOffset);
if (dest && dest->reachable&REACHABLE_UNICAST){
// if we get a request from a peer that we can only talk to via unicast, send data via unicast too.
bcopy(dest->sid, reply.out.dst.sid, SID_SIZE);
}else{
// send replies to broadcast so that others can hear blocks and record them
// (not that preemptive listening is implemented yet).
memset(reply.out.dst.sid,0xff,SID_SIZE);
reply.out.ttl=1;
}
reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE;
reply.out.queue=OQ_OPPORTUNISTIC;
reply.out.payload[0]='B'; // reply contains blocks
// include 16 bytes of BID prefix for identification
bcopy(id, &reply.out.payload[1], 16);
// and version of manifest
bcopy(&version, &reply.out.payload[1+16], sizeof(uint64_t));
int i;
for(i=0;i<32;i++){
if (bitmap&(1<<(31-i)))
continue;
if (overlay_queue_remaining(reply.out.queue) < 10)
break;
// calculate and set offset of block
read.offset = fileOffset+i*blockLength;
// stop if we passed the length of the file
// (but we may not know the file length until we attempt a read)
if (read.length!=-1 && read.offset>read.length)
break;
write_uint64(&reply.out.payload[1+16+8], read.offset);
int bytes_read = rhizome_read(&read, &reply.out.payload[1+16+8+8], blockLength);
if (bytes_read<=0)
break;
reply.out.payload_length=1+16+8+8+bytes_read;
// Mark the last block of the file, if required
if (read.offset >= read.length)
reply.out.payload[0]='T';
// send packet
if (overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0))
break;
}
overlay_mdp_frame reply;
bzero(&reply,sizeof(reply));
// Reply is broadcast, so we cannot authcrypt, and signing is too time consuming
// for low devices. The result is that an attacker can prevent rhizome transfers
// if they want to by injecting fake blocks. The alternative is to not broadcast
// back replies, and then we can authcrypt.
// multiple receivers starting at different times, we really need merkle-tree hashing.
// so multiple receivers is not realistic for now. So use non-broadcast unicode
// for now would seem the safest. But that would stop us from allowing multiple
// receivers in the special case where additional nodes begin listening in from the
// beginning.
reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN;
bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE);
reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
if (dest && dest->reachable&REACHABLE_UNICAST){
// if we get a request from a peer that we can only talk to via unicast, send data via unicast too.
bcopy(dest->sid, reply.out.dst.sid, SID_SIZE);
}else{
// send replies to broadcast so that others can hear blocks and record them
// (not that preemptive listening is implemented yet).
memset(reply.out.dst.sid,0xff,SID_SIZE);
reply.out.ttl=1;
}
reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE;
reply.out.queue=OQ_OPPORTUNISTIC;
reply.out.payload[0]='B'; // reply contains blocks
// include 16 bytes of BID prefix for identification
bcopy(id, &reply.out.payload[1], 16);
// and version of manifest
bcopy(&version, &reply.out.payload[1+16], sizeof(uint64_t));
int i;
for(i=0;i<32;i++){
if (bitmap&(1<<(31-i)))
continue;
if (overlay_queue_remaining(reply.out.queue) < 10)
break;
// calculate and set offset of block
uint64_t offset = fileOffset+i*blockLength;
write_uint64(&reply.out.payload[1+16+8], offset);
int bytes_read = rhizome_read_cached(id, version, gettime_ms()+2000, offset, &reply.out.payload[1+16+8+8], blockLength);
if (bytes_read<=0)
break;
reply.out.payload_length=1+16+8+8+bytes_read;
// Mark the last block of the file, if required
if (bytes_read < blockLength)
reply.out.payload[0]='T';
// send packet
if (overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0))
break;
}
rhizome_read_close(&read);
RETURN(ret);
RETURN(0);
OUT();
}
int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp)
{
if (!is_rhizome_mdp_server_running())
return -1;
uint64_t version=
read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]);
uint64_t fileOffset=
@ -405,11 +378,7 @@ int overlay_mdp_try_interal_services(struct overlay_frame *frame, overlay_mdp_fr
case MDP_PORT_PROBE: RETURN(overlay_mdp_service_probe(frame, mdp));
case MDP_PORT_STUNREQ: RETURN(overlay_mdp_service_stun_req(mdp));
case MDP_PORT_STUN: RETURN(overlay_mdp_service_stun(mdp));
case MDP_PORT_RHIZOME_REQUEST:
if (is_rhizome_mdp_server_running()) {
RETURN(overlay_mdp_service_rhizomerequest(mdp));
}
break;
case MDP_PORT_RHIZOME_REQUEST: RETURN(overlay_mdp_service_rhizomerequest(mdp));
case MDP_PORT_RHIZOME_RESPONSE: RETURN(overlay_mdp_service_rhizomeresponse(mdp));
case MDP_PORT_RHIZOME_MANIFEST_REQUEST: RETURN(overlay_mdp_service_manifest_response(mdp));
case MDP_PORT_RHIZOME_SYNC: RETURN(overlay_mdp_service_rhizome_sync(frame, mdp));

View File

@ -725,6 +725,9 @@ int rhizome_store_delete(const char *id);
int rhizome_open_decrypt_read(rhizome_manifest *m, rhizome_bk_t *bsk, struct rhizome_read *read_state, int hash);
int rhizome_extract_file(rhizome_manifest *m, const char *filepath, rhizome_bk_t *bsk);
int rhizome_dump_file(const char *id, const char *filepath, int64_t *length);
int rhizome_read_cached(unsigned char *bundle_id, uint64_t version, time_ms_t timeout,
uint64_t fileOffset, unsigned char *buffer, int length);
int rhizome_cache_close();
int rhizome_database_filehash_from_id(const char *id, uint64_t version, char hash[SHA512_DIGEST_STRING_LENGTH]);

View File

@ -288,6 +288,8 @@ int rhizome_close_db()
{
IN();
if (rhizome_db) {
rhizome_cache_close();
if (!sqlite3_get_autocommit(rhizome_db)){
WHY("Uncommitted transaction!");
sqlite_exec_void("ROLLBACK;");

View File

@ -831,6 +831,138 @@ int rhizome_read_close(struct rhizome_read *read)
return 0;
}
struct cache_entry{
struct cache_entry *_left;
struct cache_entry *_right;
unsigned char bundle_id[RHIZOME_MANIFEST_ID_BYTES];
uint64_t version;
struct rhizome_read read_state;
time_ms_t expires;
};
struct cache_entry *root;
static struct cache_entry ** find_entry_location(struct cache_entry **ptr, unsigned char *bundle_id, uint64_t version)
{
while(*ptr){
struct cache_entry *entry = *ptr;
int cmp = memcmp(bundle_id, entry->bundle_id, sizeof entry->bundle_id);
if (cmp==0){
if (entry->version==version)
break;
if (version < entry->version)
ptr = &entry->_left;
else
ptr = &entry->_right;
continue;
}
if (cmp<0)
ptr = &entry->_left;
else
ptr = &entry->_right;
}
return ptr;
}
static time_ms_t close_entries(struct cache_entry **entry, time_ms_t timeout)
{
if (!*entry)
return 0;
time_ms_t ret = close_entries(&(*entry)->_left, timeout);
time_ms_t t_right = close_entries(&(*entry)->_right, timeout);
if (t_right!=0 && (t_right < ret || ret==0))
ret=t_right;
if ((*entry)->expires < timeout || timeout==0){
rhizome_read_close(&(*entry)->read_state);
// remember the two children
struct cache_entry *left=(*entry)->_left;
struct cache_entry *right=(*entry)->_right;
// free this entry
free(*entry);
// re-add both children to the tree
*entry=left;
if (right){
entry = find_entry_location(entry, right->bundle_id, right->version);
*entry=right;
}
}else{
if ((*entry)->expires < ret || ret==0)
ret=(*entry)->expires;
}
return ret;
}
// close any expired cache entries
static void rhizome_cache_alarm(struct sched_ent *alarm)
{
alarm->alarm = close_entries(&root, gettime_ms());
if (alarm->alarm){
alarm->deadline = alarm->alarm + 1000;
schedule(alarm);
}
}
static struct profile_total cache_alarm_stats={
.name="rhizome_cache_alarm",
};
static struct sched_ent cache_alarm={
.function = rhizome_cache_alarm,
.stats = &cache_alarm_stats,
};
// close all cache entries
int rhizome_cache_close()
{
close_entries(&root, 0);
unschedule(&cache_alarm);
return 0;
}
// read a block of data, caching meta data for reuse
int rhizome_read_cached(unsigned char *bundle_id, uint64_t version, time_ms_t timeout,
uint64_t fileOffset, unsigned char *buffer, int length)
{
// look for a cached entry
struct cache_entry **ptr = find_entry_location(&root, bundle_id, version);
struct cache_entry *entry = *ptr;
// if we don't have one yet, create one and open it
if (!entry){
char *id_str = alloca_tohex_bid(bundle_id);
char filehash[SHA512_DIGEST_STRING_LENGTH];
if (rhizome_database_filehash_from_id(id_str, version, filehash)<=0)
return -1;
entry = emalloc_zero(sizeof(struct cache_entry));
if (rhizome_open_read(&entry->read_state, filehash, 0)){
free(entry);
return -1;
}
bcopy(bundle_id, entry->bundle_id, sizeof(entry->bundle_id));
entry->version = version;
*ptr = entry;
}
entry->read_state.offset = fileOffset;
if (entry->read_state.length !=-1 && fileOffset >= entry->read_state.length)
return 0;
if (entry->expires < timeout){
entry->expires = timeout;
if (!cache_alarm.alarm){
cache_alarm.alarm = timeout;
cache_alarm.deadline = timeout + 1000;
schedule(&cache_alarm);
}
}
return rhizome_read(&entry->read_state, buffer, length);
}
/* Returns -1 on error, 0 on success.
*/
static int write_file(struct rhizome_read *read, const char *filepath){

View File

@ -245,6 +245,9 @@ void serverCleanUp()
if (FORM_SERVAL_INSTANCE_PATH(filename, "mdp.socket")) {
unlink(filename);
}
rhizome_close_db();
dna_helper_shutdown();
}