From ea2e55c62cac7a1f7b0b6d37ac98d8d5c47606aa Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Mon, 15 Jul 2013 17:08:20 +0930 Subject: [PATCH] Simplify state code for swapping to MDP transfer --- rhizome_fetch.c | 259 ++++-------------------------------------------- 1 file changed, 22 insertions(+), 237 deletions(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 3744cde5..889f963c 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -89,7 +89,6 @@ struct rhizome_fetch_slot { static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot); static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot); -static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot); /* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size * is less than a given threshold. @@ -303,57 +302,8 @@ int rhizome_any_fetch_queued() return 0; } -/* As defined below uses 64KB */ -#define RHIZOME_VERSION_CACHE_NYBLS 2 /* 256=2^8=2nybls */ -#define RHIZOME_VERSION_CACHE_SHIFT 1 -#define RHIZOME_VERSION_CACHE_SIZE 128 -#define RHIZOME_VERSION_CACHE_ASSOCIATIVITY 16 - -struct rhizome_manifest_version_cache_slot { - unsigned char idprefix[24]; - int64_t version; -}; - -struct rhizome_manifest_version_cache_slot rhizome_manifest_version_cache[RHIZOME_VERSION_CACHE_SIZE][RHIZOME_VERSION_CACHE_ASSOCIATIVITY]; - -int rhizome_manifest_version_cache_store(rhizome_manifest *m) -{ - int bin=0; - int slot; - int i; - - char *id=rhizome_manifest_get(m,"id",NULL,0); - if (!id) return 1; // dodgy manifest, so don't suggest that we want to RX it. - - /* Work out bin number in cache */ - for(i=0;i>RHIZOME_VERSION_CACHE_SHIFT; - - slot=random()%RHIZOME_VERSION_CACHE_ASSOCIATIVITY; - struct rhizome_manifest_version_cache_slot *entry - =&rhizome_manifest_version_cache[bin][slot]; - unsigned long long manifest_version = rhizome_manifest_get_ll(m,"version"); - - entry->version=manifest_version; - for(i=0;i<24;i++) - { - int byte=(hexvalue(id[(i*2)])<<4)|hexvalue(id[(i*2)+1]); - entry->idprefix[i]=byte; - } - - return 0; -} - int rhizome_manifest_version_cache_lookup(rhizome_manifest *m) { - int bin=0; - int slot; - int i; - char id[RHIZOME_MANIFEST_ID_STRLEN + 1]; if (!rhizome_manifest_get(m, "id", id, sizeof id)) // dodgy manifest, we don't want to receive it @@ -361,9 +311,6 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m) str_toupper_inplace(id); m->version = rhizome_manifest_get_ll(m, "version"); - // TODO, work out why the cache was failing and fix it, then prove that it is faster than accessing the database. - - // skip the cache for now int64_t dbVersion = -1; if (sqlite_exec_int64(&dbVersion, "SELECT version FROM MANIFESTS WHERE id='%s';", id) == -1) return WHY("Select failure"); @@ -372,112 +319,6 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m) return -1; } return 0; - - /* Work out bin number in cache */ - for(i=0;i>RHIZOME_VERSION_CACHE_SHIFT; - - for(slot=0;slotidprefix[i]) break; - } - if (i==24) { - /* Entries match -- so check version */ - int64_t rev = rhizome_manifest_get_ll(m,"version"); - if (1) DEBUGF("cached version %"PRId64" vs manifest version %"PRId64, entry->version,rev); - if (rev > entry->version) { - /* If we only have an old version, try refreshing the cache - by querying the database */ - if (sqlite_exec_int64(&entry->version, "select version from manifests where id='%s'", id) != 1) - return WHY("failed to select stored manifest version"); - DEBUGF("Refreshed stored version from database: entry->version=%"PRId64, entry->version); - } - if (rev < entry->version) { - /* the presented manifest is older than we have. - This allows the caller to know that they can tell whoever gave them the - manifest it's time to get with the times. May or not ever be - implemented, but it would be nice. XXX */ - WHYF("cached version is NEWER than presented version (%"PRId64" is newer than %"PRId64")", - entry->version,rev); - return -2; - } else if (rev<=entry->version) { - /* the presented manifest is already stored. */ - if (1) DEBUG("cached version is NEWER/SAME as presented version"); - return -1; - } else { - /* the presented manifest is newer than we have */ - DEBUG("cached version is older than presented version"); - return 0; - } - } - } - - DEBUG("Not in manifest cache"); - - /* Not in cache, so all is well, well, maybe. - What we do know is that it is unlikely to be in the database, so it probably - doesn't hurt to try to receive it. - - Of course, we can just ask the database if it is there already, and populate - the cache in the process if we find it. The tradeoff is that the whole point - of the cache is to AVOID database lookups, not incurr them whenever the cache - has a negative result. But if we don't ask the database, then we can waste - more effort fetching the file associated with the manifest, and will ultimately - incurr a database lookup (and more), so while it seems a little false economy - we need to do the lookup now. - - What this all suggests is that we need fairly high associativity so that misses - are rare events. But high associativity then introduces a linear search cost, - although that is unlikely to be nearly as much cost as even thinking about a - database query. - - It also says that on a busy network that things will eventually go pear-shaped - and require regular database queries, and that memory allowing, we should use - a fairly large cache here. - */ - int64_t manifest_version = rhizome_manifest_get_ll(m, "version"); - int64_t count; - switch (sqlite_exec_int64(&count, "select count(*) from manifests where id='%s' and version>=%lld", id, manifest_version)) { - case -1: - return WHY("database error reading stored manifest version"); - case 1: - if (count) { - /* Okay, we have a stored version which is newer, so update the cache - using a random replacement strategy. */ - int64_t stored_version; - if (sqlite_exec_int64(&stored_version, "select version from manifests where id='%s'", id) < 1) - return WHY("database error reading stored manifest version"); // database is broken, we can't confirm that it is here - DEBUGF("stored version=%"PRId64", manifest_version=%"PRId64" (not fetching; remembering in cache)", - stored_version,manifest_version); - slot=random()%RHIZOME_VERSION_CACHE_ASSOCIATIVITY; - struct rhizome_manifest_version_cache_slot *entry = &rhizome_manifest_version_cache[bin][slot]; - entry->version=stored_version; - for(i=0;i<24;i++) - { - int byte=(hexvalue(id[(i*2)])<<4)|hexvalue(id[(i*2)+1]); - entry->idprefix[i]=byte; - } - /* Finally, say that it isn't worth RXing this manifest */ - return stored_version > manifest_version ? -2 : -1; - } - break; - default: - return WHY("bad select result"); - } - /* At best we hold an older version of this manifest, and at worst we - don't hold any copy. */ - return 0; } typedef struct ignored_manifest { @@ -581,6 +422,9 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) slot->request_ofs = 0; slot->state = RHIZOME_FETCH_CONNECTING; + slot->alarm.function = rhizome_fetch_poll; + fetch_stats.name = "rhizome_fetch_poll"; + slot->alarm.stats = &fetch_stats; if (slot->peer_ipandport.sin_family == AF_INET && slot->peer_ipandport.sin_port) { /* Transfer via HTTP over IPv4 */ @@ -616,9 +460,6 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) ); slot->alarm.poll.fd = sock; /* Watch for activity on the socket */ - slot->alarm.function = rhizome_fetch_poll; - fetch_stats.name = "rhizome_fetch_poll"; - slot->alarm.stats = &fetch_stats; slot->alarm.poll.events = POLLIN|POLLOUT; watch(&slot->alarm); /* And schedule a timeout alarm */ @@ -951,7 +792,6 @@ int rhizome_fetch_has_queue_space(unsigned char log2_size){ * @author Andrew Bettison */ struct profile_total rsnqf_stats={.name="rhizome_start_next_queued_fetches"}; -struct profile_total rfmsc_stats={.name="rhizome_fetch_mdp_slot_callback"}; int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]) { @@ -1104,7 +944,6 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) close(slot->alarm.poll.fd); } slot->alarm.poll.fd = -1; - slot->alarm.function=NULL; /* Free ephemeral data */ if (slot->manifest) @@ -1129,13 +968,6 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) IN(); struct rhizome_fetch_slot *slot=(struct rhizome_fetch_slot*)alarm; - if (slot->state!=5) { - DEBUGF("Stale alarm triggered on idle/reclaimed slot. Ignoring"); - unschedule(alarm); - OUT(); - return; - } - long long now=gettime_ms(); if (now-slot->last_write_time>slot->mdpIdleTimeout) { DEBUGF("MDP connection timed out: last RX %lldms ago (read %"PRId64" of %"PRId64" bytes)", @@ -1148,10 +980,7 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) if (config.debug.rhizome_rx) DEBUGF("Timeout: Resending request for slot=0x%p (%"PRId64" of %"PRId64" received)", slot,slot->write_state.file_offset + slot->write_state.data_size,slot->write_state.file_length); - if (slot->bidP) - rhizome_fetch_mdp_requestblocks(slot); - else - rhizome_fetch_mdp_requestmanifest(slot); + rhizome_fetch_mdp_requestblocks(slot); OUT(); } @@ -1165,8 +994,6 @@ static int rhizome_fetch_mdp_touch_timeout(struct rhizome_fetch_slot *slot) // For now, we will just make the timeout 1 second from the time of the last // received block. unschedule(&slot->alarm); - slot->alarm.stats=&rfmsc_stats; - slot->alarm.function = rhizome_fetch_mdp_slot_callback; slot->alarm.alarm=gettime_ms()+1000; slot->alarm.deadline=slot->alarm.alarm+500; schedule(&slot->alarm); @@ -1218,46 +1045,6 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) OUT(); } -static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) -{ - if (slot->prefix_length<1||slot->prefix_length>32) { - // invalid request - WARNF("invalid MDP Rhizome request"); - return rhizome_fetch_close(slot); - } - - if ((gettime_ms()-slot->last_write_time)>slot->mdpIdleTimeout) { - // connection timed out - DEBUGF("MDP connection timedout"); - return rhizome_fetch_close(slot); - } - - overlay_mdp_frame mdp; - - bzero(&mdp,sizeof(mdp)); - assert(my_subscriber); - assert(my_subscriber->sid); - bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); - mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; - bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); - mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; - mdp.out.ttl=1; - mdp.packetTypeAndFlags=MDP_TX; - - mdp.out.queue=OQ_ORDINARY; - mdp.out.payload_length=slot->prefix_length; - bcopy(slot->prefix,&mdp.out.payload[0],slot->prefix_length); - - overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); - - slot->alarm.function = rhizome_fetch_mdp_slot_callback; - slot->alarm.alarm=gettime_ms()+100; - slot->alarm.deadline=slot->alarm.alarm+500; - schedule(&slot->alarm); - - return 0; -} - static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) { /* In Rhizome Direct we use the same fetch slot system, but we aren't actually @@ -1296,7 +1083,6 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) slot->state=RHIZOME_FETCH_RXFILEMDP; slot->last_write_time=gettime_ms(); - if (slot->bidP) { /* We are requesting a file. The http request may have already received some of the file, so take that into account when setting up ring buffer. Then send the request for the next block of data, and set our alarm to @@ -1309,17 +1095,10 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) down too much. Much careful thought is required to optimise this transport. */ - slot->mdpIdleTimeout=config.rhizome.idle_timeout; // give up if nothing received for 5 seconds - slot->mdpRXBitmap=0x00000000; // no blocks received yet - slot->mdpRXBlockLength=config.rhizome.rhizome_mdp_block_size; // Rhizome over MDP block size - rhizome_fetch_mdp_requestblocks(slot); - } else { - /* We are requesting a manifest, which is stateless, except that we eventually - give up. All we need to do now is send the request, and set our alarm to - try again in case we haven't heard anything back. */ - slot->mdpIdleTimeout=config.rhizome.idle_timeout; - rhizome_fetch_mdp_requestmanifest(slot); - } + slot->mdpIdleTimeout=config.rhizome.idle_timeout; // give up if nothing received for 5 seconds + slot->mdpRXBitmap=0x00000000; // no blocks received yet + slot->mdpRXBlockLength=config.rhizome.rhizome_mdp_block_size; // Rhizome over MDP block size + rhizome_fetch_mdp_requestblocks(slot); RETURN(0); OUT(); @@ -1533,8 +1312,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm) unschedule(&slot->alarm); slot->alarm.alarm=gettime_ms() + config.rhizome.idle_timeout; slot->alarm.deadline = slot->alarm.alarm + config.rhizome.idle_timeout; - slot->alarm.function = rhizome_fetch_poll; - schedule(&slot->alarm); + schedule(&slot->alarm); return; } else { if (config.debug.rhizome_rx) @@ -1602,7 +1380,6 @@ void rhizome_fetch_poll(struct sched_ent *alarm) unschedule(&slot->alarm); slot->alarm.alarm=gettime_ms() + config.rhizome.idle_timeout; slot->alarm.deadline = slot->alarm.alarm + config.rhizome.idle_timeout; - slot->alarm.function = rhizome_fetch_poll; schedule(&slot->alarm); return; @@ -1617,12 +1394,20 @@ void rhizome_fetch_poll(struct sched_ent *alarm) } } } + if (alarm->poll.revents==0 || alarm->poll.revents & (POLLHUP | POLLERR)){ - // timeout or socket error, close the socket - if (config.debug.rhizome_rx) - DEBUGF("Closing due to timeout or error %x (%x %x)", alarm->poll.revents, POLLHUP, POLLERR); - if (slot->state!=RHIZOME_FETCH_FREE) - rhizome_fetch_close(slot); + switch (slot->state){ + case RHIZOME_FETCH_RXFILEMDP: + rhizome_fetch_mdp_slot_callback(alarm); + break; + + default: + // timeout or socket error, close the socket + if (config.debug.rhizome_rx) + DEBUGF("Closing due to timeout or error %x (%x %x)", alarm->poll.revents, POLLHUP, POLLERR); + if (slot->state!=RHIZOME_FETCH_FREE) + rhizome_fetch_close(slot); + } } }