Simplify state code for swapping to MDP transfer

This commit is contained in:
Jeremy Lakeman 2013-07-15 17:08:20 +09:30
parent a73814571d
commit ea2e55c62c

View File

@ -89,7 +89,6 @@ struct rhizome_fetch_slot {
static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot);
static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot);
static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot);
/* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size
* is less than a given threshold.
@ -303,57 +302,8 @@ int rhizome_any_fetch_queued()
return 0;
}
/* Geometry of the in-memory manifest version cache.
   As defined below uses 64KB: 128 bins x 16 slots x 32 bytes per slot
   (24-byte id prefix + 8-byte version, assuming the compiler adds no padding). */
/* Number of leading hex digits of the manifest id used to choose a bin. */
#define RHIZOME_VERSION_CACHE_NYBLS 2 /* 256=2^8=2nybls */
/* Right shift applied to the value of those digits: 256 >> 1 = 128 bins. */
#define RHIZOME_VERSION_CACHE_SHIFT 1
/* Number of bins (first dimension of the cache array). */
#define RHIZOME_VERSION_CACHE_SIZE 128
/* Number of slots per bin (second dimension of the cache array). */
#define RHIZOME_VERSION_CACHE_ASSOCIATIVITY 16
/* One cache entry: the first 24 bytes of a manifest id, decoded from hex,
   together with the version recorded for that manifest. */
struct rhizome_manifest_version_cache_slot {
unsigned char idprefix[24];
int64_t version;
};
/* The cache itself, indexed as [bin][slot]. */
struct rhizome_manifest_version_cache_slot rhizome_manifest_version_cache[RHIZOME_VERSION_CACHE_SIZE][RHIZOME_VERSION_CACHE_ASSOCIATIVITY];
/* Remember the version of manifest m in the in-memory manifest version cache.
 *
 * The bin is chosen from the first RHIZOME_VERSION_CACHE_NYBLS hex digits of
 * the manifest id, shifted right by RHIZOME_VERSION_CACHE_SHIFT.  The slot
 * within the bin is chosen at random and whatever it held is overwritten.
 *
 * The id is fully validated and decoded BEFORE the cache is touched, so that a
 * short or malformed id can neither read past the end of the id string, nor
 * index outside the cache array, nor leave a half-written entry behind.
 *
 * Returns 0 once the entry has been written, or 1 if the manifest has no
 * usable id (missing, shorter than 48 hex digits, or not decodable), in which
 * case the cache is left untouched.
 */
int rhizome_manifest_version_cache_store(rhizome_manifest *m)
{
  unsigned char idprefix[24];
  int bin = 0;
  int i;

  char *id = rhizome_manifest_get(m, "id", NULL, 0);
  if (!id)
    return 1; // dodgy manifest, so don't suggest that we want to RX it.

  /* Decode the 24-byte id prefix into a local buffer first. */
  for (i = 0; i < 24; i++) {
    /* The high digit is tested before the low digit is read, so a short id
       never causes a read beyond its terminating NUL. */
    if (id[i * 2] == '\0' || id[i * 2 + 1] == '\0')
      return 1;
    int hi = hexvalue(id[i * 2]);
    int lo = hexvalue(id[i * 2 + 1]);
    /* NOTE(review): assumes hexvalue() reports a non-hex character with a
       negative value -- confirm against its definition. */
    if (hi < 0 || lo < 0)
      return 1;
    idprefix[i] = (unsigned char)((hi << 4) | lo);
  }

  /* Work out bin number in cache.  The digits read here were validated above. */
  for (i = 0; i < RHIZOME_VERSION_CACHE_NYBLS; i++)
    bin = (bin << 4) | hexvalue(id[i]);
  bin >>= RHIZOME_VERSION_CACHE_SHIFT;
  /* Defensive: never index outside the cache, whatever hexvalue() returned. */
  if (bin < 0 || bin >= RHIZOME_VERSION_CACHE_SIZE)
    return 1;

  /* Random replacement within the bin. */
  int slot = random() % RHIZOME_VERSION_CACHE_ASSOCIATIVITY;
  struct rhizome_manifest_version_cache_slot *entry
    = &rhizome_manifest_version_cache[bin][slot];

  entry->version = rhizome_manifest_get_ll(m, "version");
  for (i = 0; i < 24; i++)
    entry->idprefix[i] = idprefix[i];
  return 0;
}
int rhizome_manifest_version_cache_lookup(rhizome_manifest *m)
{
int bin=0;
int slot;
int i;
char id[RHIZOME_MANIFEST_ID_STRLEN + 1];
if (!rhizome_manifest_get(m, "id", id, sizeof id))
// dodgy manifest, we don't want to receive it
@ -361,9 +311,6 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m)
str_toupper_inplace(id);
m->version = rhizome_manifest_get_ll(m, "version");
// TODO, work out why the cache was failing and fix it, then prove that it is faster than accessing the database.
// skip the cache for now
int64_t dbVersion = -1;
if (sqlite_exec_int64(&dbVersion, "SELECT version FROM MANIFESTS WHERE id='%s';", id) == -1)
return WHY("Select failure");
@ -372,112 +319,6 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m)
return -1;
}
return 0;
/* Work out bin number in cache */
for(i=0;i<RHIZOME_VERSION_CACHE_NYBLS;i++)
{
int nybl=hexvalue(id[i]);
bin=(bin<<4)|nybl;
}
bin=bin>>RHIZOME_VERSION_CACHE_SHIFT;
for(slot=0;slot<RHIZOME_VERSION_CACHE_ASSOCIATIVITY;slot++)
{
struct rhizome_manifest_version_cache_slot *entry
=&rhizome_manifest_version_cache[bin][slot];
for(i=0;i<24;i++)
{
int byte=
(hexvalue(id[(i*2)])<<4)
|hexvalue(id[(i*2)+1]);
if (byte!=entry->idprefix[i]) break;
}
if (i==24) {
/* Entries match -- so check version */
int64_t rev = rhizome_manifest_get_ll(m,"version");
if (1) DEBUGF("cached version %"PRId64" vs manifest version %"PRId64, entry->version,rev);
if (rev > entry->version) {
/* If we only have an old version, try refreshing the cache
by querying the database */
if (sqlite_exec_int64(&entry->version, "select version from manifests where id='%s'", id) != 1)
return WHY("failed to select stored manifest version");
DEBUGF("Refreshed stored version from database: entry->version=%"PRId64, entry->version);
}
if (rev < entry->version) {
/* the presented manifest is older than we have.
This allows the caller to know that they can tell whoever gave them the
manifest it's time to get with the times. May or may not ever be
implemented, but it would be nice. XXX */
WHYF("cached version is NEWER than presented version (%"PRId64" is newer than %"PRId64")",
entry->version,rev);
return -2;
} else if (rev<=entry->version) {
/* the presented manifest is already stored. */
if (1) DEBUG("cached version is NEWER/SAME as presented version");
return -1;
} else {
/* the presented manifest is newer than we have */
DEBUG("cached version is older than presented version");
return 0;
}
}
}
DEBUG("Not in manifest cache");
/* Not in cache, so all is well, well, maybe.
What we do know is that it is unlikely to be in the database, so it probably
doesn't hurt to try to receive it.
Of course, we can just ask the database if it is there already, and populate
the cache in the process if we find it. The tradeoff is that the whole point
of the cache is to AVOID database lookups, not incur them whenever the cache
has a negative result. But if we don't ask the database, then we can waste
more effort fetching the file associated with the manifest, and will ultimately
incur a database lookup (and more), so while it seems a little false economy
we need to do the lookup now.
What this all suggests is that we need fairly high associativity so that misses
are rare events. But high associativity then introduces a linear search cost,
although that is unlikely to be nearly as much cost as even thinking about a
database query.
It also says that on a busy network that things will eventually go pear-shaped
and require regular database queries, and that memory allowing, we should use
a fairly large cache here.
*/
int64_t manifest_version = rhizome_manifest_get_ll(m, "version");
int64_t count;
switch (sqlite_exec_int64(&count, "select count(*) from manifests where id='%s' and version>=%lld", id, manifest_version)) {
case -1:
return WHY("database error reading stored manifest version");
case 1:
if (count) {
/* Okay, we have a stored version which is newer, so update the cache
using a random replacement strategy. */
int64_t stored_version;
if (sqlite_exec_int64(&stored_version, "select version from manifests where id='%s'", id) < 1)
return WHY("database error reading stored manifest version"); // database is broken, we can't confirm that it is here
DEBUGF("stored version=%"PRId64", manifest_version=%"PRId64" (not fetching; remembering in cache)",
stored_version,manifest_version);
slot=random()%RHIZOME_VERSION_CACHE_ASSOCIATIVITY;
struct rhizome_manifest_version_cache_slot *entry = &rhizome_manifest_version_cache[bin][slot];
entry->version=stored_version;
for(i=0;i<24;i++)
{
int byte=(hexvalue(id[(i*2)])<<4)|hexvalue(id[(i*2)+1]);
entry->idprefix[i]=byte;
}
/* Finally, say that it isn't worth RXing this manifest */
return stored_version > manifest_version ? -2 : -1;
}
break;
default:
return WHY("bad select result");
}
/* At best we hold an older version of this manifest, and at worst we
don't hold any copy. */
return 0;
}
typedef struct ignored_manifest {
@ -581,6 +422,9 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
slot->request_ofs = 0;
slot->state = RHIZOME_FETCH_CONNECTING;
slot->alarm.function = rhizome_fetch_poll;
fetch_stats.name = "rhizome_fetch_poll";
slot->alarm.stats = &fetch_stats;
if (slot->peer_ipandport.sin_family == AF_INET && slot->peer_ipandport.sin_port) {
/* Transfer via HTTP over IPv4 */
@ -616,9 +460,6 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
);
slot->alarm.poll.fd = sock;
/* Watch for activity on the socket */
slot->alarm.function = rhizome_fetch_poll;
fetch_stats.name = "rhizome_fetch_poll";
slot->alarm.stats = &fetch_stats;
slot->alarm.poll.events = POLLIN|POLLOUT;
watch(&slot->alarm);
/* And schedule a timeout alarm */
@ -951,7 +792,6 @@ int rhizome_fetch_has_queue_space(unsigned char log2_size){
* @author Andrew Bettison <andrew@servalproject.com>
*/
struct profile_total rsnqf_stats={.name="rhizome_start_next_queued_fetches"};
struct profile_total rfmsc_stats={.name="rhizome_fetch_mdp_slot_callback"};
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE])
{
@ -1104,7 +944,6 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
close(slot->alarm.poll.fd);
}
slot->alarm.poll.fd = -1;
slot->alarm.function=NULL;
/* Free ephemeral data */
if (slot->manifest)
@ -1129,13 +968,6 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm)
IN();
struct rhizome_fetch_slot *slot=(struct rhizome_fetch_slot*)alarm;
if (slot->state!=5) {
DEBUGF("Stale alarm triggered on idle/reclaimed slot. Ignoring");
unschedule(alarm);
OUT();
return;
}
long long now=gettime_ms();
if (now-slot->last_write_time>slot->mdpIdleTimeout) {
DEBUGF("MDP connection timed out: last RX %lldms ago (read %"PRId64" of %"PRId64" bytes)",
@ -1148,10 +980,7 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm)
if (config.debug.rhizome_rx)
DEBUGF("Timeout: Resending request for slot=0x%p (%"PRId64" of %"PRId64" received)",
slot,slot->write_state.file_offset + slot->write_state.data_size,slot->write_state.file_length);
if (slot->bidP)
rhizome_fetch_mdp_requestblocks(slot);
else
rhizome_fetch_mdp_requestmanifest(slot);
rhizome_fetch_mdp_requestblocks(slot);
OUT();
}
@ -1165,8 +994,6 @@ static int rhizome_fetch_mdp_touch_timeout(struct rhizome_fetch_slot *slot)
// For now, we will just make the timeout 1 second from the time of the last
// received block.
unschedule(&slot->alarm);
slot->alarm.stats=&rfmsc_stats;
slot->alarm.function = rhizome_fetch_mdp_slot_callback;
slot->alarm.alarm=gettime_ms()+1000;
slot->alarm.deadline=slot->alarm.alarm+500;
schedule(&slot->alarm);
@ -1218,46 +1045,6 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
OUT();
}
/* Send one manifest request for this slot over MDP and arm the retry alarm.
 *
 * The request payload is the first slot->prefix_length bytes of slot->prefix,
 * sent from our MDP_PORT_RHIZOME_RESPONSE port to the peer's
 * MDP_PORT_RHIZOME_REQUEST port with a TTL of 1.  The result of
 * overlay_mdp_dispatch() is not examined.
 *
 * The slot is closed (and the result of rhizome_fetch_close() returned) if the
 * prefix length is outside 1..32, or if more than slot->mdpIdleTimeout ms have
 * passed since slot->last_write_time.  Otherwise the slot's alarm is scheduled
 * to call rhizome_fetch_mdp_slot_callback in 100ms (deadline 500ms after
 * that) and 0 is returned.
 */
static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot)
{
  if (!(slot->prefix_length >= 1 && slot->prefix_length <= 32)) {
    // invalid request
    WARNF("invalid MDP Rhizome request");
    return rhizome_fetch_close(slot);
  }

  if (slot->mdpIdleTimeout < (gettime_ms() - slot->last_write_time)) {
    // connection timed out
    DEBUGF("MDP connection timedout");
    return rhizome_fetch_close(slot);
  }

  /* Build the request frame from a zeroed structure. */
  overlay_mdp_frame request;
  bzero(&request, sizeof(request));

  assert(my_subscriber);
  assert(my_subscriber->sid);

  request.packetTypeAndFlags = MDP_TX;
  request.out.queue = OQ_ORDINARY;
  request.out.ttl = 1;

  /* Source: us, on the port where Rhizome responses are expected. */
  bcopy(my_subscriber->sid, request.out.src.sid, SID_SIZE);
  request.out.src.port = MDP_PORT_RHIZOME_RESPONSE;

  /* Destination: the peer's Rhizome request port. */
  bcopy(slot->peer_sid, request.out.dst.sid, SID_SIZE);
  request.out.dst.port = MDP_PORT_RHIZOME_REQUEST;

  /* Payload: the prefix being asked for. */
  request.out.payload_length = slot->prefix_length;
  bcopy(slot->prefix, &request.out.payload[0], slot->prefix_length);

  overlay_mdp_dispatch(&request, 0 /* system generated */, NULL, 0);

  /* Try again shortly in case nothing comes back. */
  slot->alarm.function = rhizome_fetch_mdp_slot_callback;
  slot->alarm.alarm = gettime_ms() + 100;
  slot->alarm.deadline = slot->alarm.alarm + 500;
  schedule(&slot->alarm);

  return 0;
}
static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
{
/* In Rhizome Direct we use the same fetch slot system, but we aren't actually
@ -1296,7 +1083,6 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
slot->state=RHIZOME_FETCH_RXFILEMDP;
slot->last_write_time=gettime_ms();
if (slot->bidP) {
/* We are requesting a file. The http request may have already received
some of the file, so take that into account when setting up ring buffer.
Then send the request for the next block of data, and set our alarm to
@ -1309,17 +1095,10 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
down too much. Much careful thought is required to optimise this
transport.
*/
slot->mdpIdleTimeout=config.rhizome.idle_timeout; // give up if nothing received for 5 seconds
slot->mdpRXBitmap=0x00000000; // no blocks received yet
slot->mdpRXBlockLength=config.rhizome.rhizome_mdp_block_size; // Rhizome over MDP block size
rhizome_fetch_mdp_requestblocks(slot);
} else {
/* We are requesting a manifest, which is stateless, except that we eventually
give up. All we need to do now is send the request, and set our alarm to
try again in case we haven't heard anything back. */
slot->mdpIdleTimeout=config.rhizome.idle_timeout;
rhizome_fetch_mdp_requestmanifest(slot);
}
slot->mdpIdleTimeout=config.rhizome.idle_timeout; // give up if nothing received for 5 seconds
slot->mdpRXBitmap=0x00000000; // no blocks received yet
slot->mdpRXBlockLength=config.rhizome.rhizome_mdp_block_size; // Rhizome over MDP block size
rhizome_fetch_mdp_requestblocks(slot);
RETURN(0);
OUT();
@ -1533,8 +1312,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
unschedule(&slot->alarm);
slot->alarm.alarm=gettime_ms() + config.rhizome.idle_timeout;
slot->alarm.deadline = slot->alarm.alarm + config.rhizome.idle_timeout;
slot->alarm.function = rhizome_fetch_poll;
schedule(&slot->alarm);
schedule(&slot->alarm);
return;
} else {
if (config.debug.rhizome_rx)
@ -1602,7 +1380,6 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
unschedule(&slot->alarm);
slot->alarm.alarm=gettime_ms() + config.rhizome.idle_timeout;
slot->alarm.deadline = slot->alarm.alarm + config.rhizome.idle_timeout;
slot->alarm.function = rhizome_fetch_poll;
schedule(&slot->alarm);
return;
@ -1617,12 +1394,20 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
}
}
}
if (alarm->poll.revents==0 || alarm->poll.revents & (POLLHUP | POLLERR)){
// timeout or socket error, close the socket
if (config.debug.rhizome_rx)
DEBUGF("Closing due to timeout or error %x (%x %x)", alarm->poll.revents, POLLHUP, POLLERR);
if (slot->state!=RHIZOME_FETCH_FREE)
rhizome_fetch_close(slot);
switch (slot->state){
case RHIZOME_FETCH_RXFILEMDP:
rhizome_fetch_mdp_slot_callback(alarm);
break;
default:
// timeout or socket error, close the socket
if (config.debug.rhizome_rx)
DEBUGF("Closing due to timeout or error %x (%x %x)", alarm->poll.revents, POLLHUP, POLLERR);
if (slot->state!=RHIZOME_FETCH_FREE)
rhizome_fetch_close(slot);
}
}
}